Source code for structum_lab.plugins.dynaconf.features.cache
# src/structum_lab.plugins.dynaconf/cache.py
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: 2025 PythonWoods
"""Smart Caching System for Configuration.
Provides TTL-based caching with LRU eviction and selective invalidation.
"""
from typing import Any, Dict, Optional
from dataclasses import dataclass, field
from datetime import datetime, timedelta
import threading
import logging
# Module-level logger named after this module; the cache emits debug
# messages through it when entries are evicted or invalidated by prefix.
log = logging.getLogger(__name__)
[docs]
@dataclass
class CacheEntry:
    """A cached value bundled with the bookkeeping data the cache needs.

    Attributes:
        value: The cached payload.
        timestamp: Creation time; defaults to ``datetime.now()`` (naive,
            local wall-clock time) when the entry is constructed.
        hit_count: Read counter, starting at 0. ``SmartCache.get``
            increments it on every successful lookup.
    """

    value: Any
    timestamp: datetime = field(default_factory=datetime.now)
    hit_count: int = 0

    def _elapsed(self) -> timedelta:
        """Return the time elapsed between ``timestamp`` and now."""
        return datetime.now() - self.timestamp

    def is_expired(self, ttl: timedelta) -> bool:
        """Tell whether the entry has outlived *ttl*.

        Args:
            ttl: Maximum allowed age.

        Returns:
            ``True`` when the entry's age is strictly greater than *ttl*.
        """
        return self._elapsed() > ttl

    def age_seconds(self) -> float:
        """Return the entry's age in (fractional) seconds."""
        return self._elapsed().total_seconds()
[docs]
class SmartCache:
    """In-memory key/value cache with TTL expiry and a bounded size.

    Every public method runs under a re-entrant lock. Entries older than
    the cache-wide TTL are dropped when they are looked up, and again in
    bulk when room is needed for a new key. When the cache is still full
    after that, one entry is evicted.

    NOTE: although the eviction helper is named "LRU", the victim is the
    entry with the lowest hit count, ties broken by the oldest timestamp
    (i.e. closer to least-frequently-used than least-recently-used).
    """

    def __init__(
        self, max_size: int = 1000, default_ttl: timedelta = timedelta(hours=1)
    ) -> None:
        """Create an empty cache.

        Args:
            max_size: Number of entries at which inserting a new key
                triggers eviction. The value is not validated here.
            default_ttl: Maximum age of an entry before it is treated as
                expired. Applies to every entry in the cache.
        """
        self._cache: Dict[str, CacheEntry] = {}
        self._max_size = max_size
        self._default_ttl = default_ttl
        # Re-entrant so private helpers can run while the lock is held.
        self._lock = threading.RLock()
        self._stats = {"hits": 0, "misses": 0}

    def get(self, key: str) -> Optional[Any]:
        """Return the cached value for *key*, or ``None`` on a miss.

        An expired entry is removed and counted as a miss. A hit bumps
        both the entry's ``hit_count`` and the cache-wide hit counter.

        Args:
            key: Cache key to look up.

        Returns:
            The stored value, or ``None`` when the key is absent or
            expired. A stored ``None`` is therefore indistinguishable
            from a miss for the caller.
        """
        with self._lock:
            entry = self._cache.get(key)
            if entry is not None and entry.is_expired(self._default_ttl):
                # Lazy expiry: drop the stale entry and fall through to
                # the miss path below.
                del self._cache[key]
                entry = None
            if entry is None:
                self._stats["misses"] += 1
                return None
            entry.hit_count += 1
            self._stats["hits"] += 1
            return entry.value

    def set(self, key: str, value: Any) -> None:
        """Store *value* under *key*, replacing any existing entry.

        The stored entry is always a fresh ``CacheEntry``, so overwriting
        a key resets its timestamp and hit count.

        Args:
            key: Cache key.
            value: Value to cache.
        """
        with self._lock:
            # Overwriting an existing key does not grow the cache, so
            # room only has to be made for genuinely new keys.
            if key not in self._cache and len(self._cache) >= self._max_size:
                # Reclaim dead entries first; evict a live one only if
                # that did not free any space.
                self._purge_expired()
                if len(self._cache) >= self._max_size:
                    self._evict_lru()
            self._cache[key] = CacheEntry(value=value)

    def _purge_expired(self) -> int:
        """Remove every expired entry; caller must hold the lock.

        Returns:
            Number of entries removed.
        """
        expired_keys = [
            k for k, e in self._cache.items() if e.is_expired(self._default_ttl)
        ]
        for k in expired_keys:
            del self._cache[k]
        if expired_keys:
            log.debug("Purged %d expired cache entries", len(expired_keys))
        return len(expired_keys)

    def _evict_lru(self) -> None:
        """Evict one entry to make room; caller must hold the lock.

        The victim is the entry with the lowest hit count; among equals,
        the one with the oldest timestamp. Does nothing on an empty cache.
        """
        if not self._cache:
            return
        victim = min(
            self._cache,
            key=lambda k: (self._cache[k].hit_count, self._cache[k].timestamp),
        )
        del self._cache[victim]
        log.debug("Evicted LRU cache entry: %s", victim)

    def invalidate(self, key: str) -> None:
        """Remove *key* from the cache; a missing key is ignored."""
        with self._lock:
            self._cache.pop(key, None)

    def invalidate_prefix(self, prefix: str) -> int:
        """Remove every entry whose key starts with *prefix*.

        Args:
            prefix: Key prefix to match. An empty prefix matches every key.

        Returns:
            Number of entries removed.
        """
        with self._lock:
            # Collect first: the dict must not change size while iterated.
            doomed = [k for k in self._cache if k.startswith(prefix)]
            for k in doomed:
                del self._cache[k]
            if doomed:
                log.debug(
                    "Invalidated %d cache entries with prefix '%s'",
                    len(doomed),
                    prefix,
                )
            return len(doomed)

    def clear(self) -> None:
        """Drop all entries and reset the hit/miss counters."""
        with self._lock:
            self._cache.clear()
            self._stats = {"hits": 0, "misses": 0}

    def get_stats(self) -> Dict[str, Any]:
        """Return a snapshot of cache statistics for monitoring.

        Returns:
            A dict with ``size``, ``max_size``, ``hits``, ``misses``,
            ``hit_rate`` (hits / lookups rounded to 3 places, ``0.0``
            before any lookup) and ``top_entries``: up to ten entries
            with the highest hit counts, each as a dict with ``key``,
            ``hits`` and ``age_seconds``.
        """
        with self._lock:
            hits = self._stats["hits"]
            misses = self._stats["misses"]
            lookups = hits + misses
            hit_rate = hits / lookups if lookups > 0 else 0.0
            # Ten most-read entries, highest hit count first.
            top_entries = sorted(
                self._cache.items(), key=lambda item: item[1].hit_count, reverse=True
            )[:10]
            return {
                "size": len(self._cache),
                "max_size": self._max_size,
                "hits": hits,
                "misses": misses,
                "hit_rate": round(hit_rate, 3),
                "top_entries": [
                    {
                        "key": k,
                        "hits": e.hit_count,
                        "age_seconds": round(e.age_seconds(), 2),
                    }
                    for k, e in top_entries
                ],
            }