Core Module API Reference¶
This section documents the core interfaces and protocols that define Structum Lab’s architecture.
Structum Lab: Structum Core¶
Structum Core is the foundational component of Structum Lab. It defines the protocols, base interfaces, and shared utilities of the framework.
🚀 Installation¶
pip install structum-lab
📚 Full Documentation¶
The complete documentation, including API guides and configuration examples, is available at: 👉 https://structum-lab.pages.dev/
✨ Key Features¶
Base Protocols: defines the interfaces for configuration, logging, and other core services.
Shared Utilities: common tools for developing plugins and applications.
Modular Structure: supports the framework's plugin architecture.
Native integration with the Structum ecosystem.
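⚡ Quick Start¶
A minimal quick-start sketch, assuming only the two public entry points documented below (get_config and get_logger):

    from structum_lab.config import get_config
    from structum_lab.logging import get_logger

    log = get_logger(__name__)
    cfg = get_config()

    cfg.set("app.mode", "production")
    log.info("Configuration loaded", mode=cfg.get("app.mode"))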
Core Package¶
Configuration¶
Configuration Interface¶
Structum configuration subsystem.
This module exposes the official public API for accessing and managing configuration. All application code and plugins must interact with configuration exclusively through these functions.
- Example usage:

    from structum.config import get_config

    cfg = get_config()
    cfg.set("app.mode", "production")
- structum_lab.config.ConfigInterface¶
alias of
ConfigProviderInterface
- structum_lab.config.get_config() ConfigProviderInterface¶
Returns the global configuration provider (lazy, dynamic loading).
If no provider has been registered, the fallback JSON provider based on the standard library is instantiated automatically.
- Returns:¶
An implementation of ConfigInterface.
- structum_lab.config.set_config_provider(provider: ConfigProviderInterface | type[ConfigProviderInterface]) None[source]¶
Registers a custom configuration provider.
This function is typically called by a plugin during initialization. The registered provider globally replaces the previous one.
- Parameters:¶
- provider: ConfigProviderInterface | type[ConfigProviderInterface]¶
An instance or class of a provider compatible with ConfigInterface.
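For illustration, a plugin's bootstrap might look like the sketch below; the DictConfigProvider class is hypothetical and deliberately minimal (flat keys only), shown just to demonstrate the registration call:

    from typing import Any

    from structum_lab.config import get_config, set_config_provider

    class DictConfigProvider:
        """Hypothetical in-memory provider compatible with ConfigInterface (flat keys only)."""

        def __init__(self) -> None:
            self._data: dict[str, Any] = {}

        def get(self, key: str, default: Any = None) -> Any:
            return self._data.get(key, default)

        def set(self, key: str, value: Any) -> None:
            self._data[key] = value

        def has(self, key: str) -> bool:
            return key in self._data

        def save(self) -> None:
            pass  # in-memory: nothing to persist

        def reload(self) -> None:
            pass  # in-memory: nothing to reload

    set_config_provider(DictConfigProvider)  # an instance would also be accepted
    get_config().set("app.mode", "test")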
- class structum_lab.config.JSONConfigProvider[source]¶
Bases: object
Configuration provider based on JSON files.
Features:
- Automatic persistence to the filesystem
- Nested key support via dot notation
- No external dependencies
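A brief usage sketch of the fallback provider, assuming the no-argument constructor shown above (where the backing JSON file lives is implementation-defined):

    from structum_lab.config import JSONConfigProvider, set_config_provider

    provider = JSONConfigProvider()           # stdlib-only fallback
    provider.set("database.pool.size", 10)    # nested keys via dot notation
    provider.save()                           # persist to the JSON file

    set_config_provider(provider)             # make it the global provider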
Interface of the Structum configuration system.
This module defines the formal contract that every configuration provider must satisfy. The Structum core depends exclusively on this interface, never on concrete implementations.
The architecture is designed to be extended through plugins: advanced providers (Dynaconf, databases, remote services, etc.) can be registered at bootstrap time without modifying the core.
- class structum_lab.config.interface.ConfigProviderInterface(*args, **kwargs)[source]¶
Bases: Protocol
Protocol defining the interface for configuration providers.
This interface uses typing.Protocol to enable duck typing: explicit inheritance is not required. Any object implementing these methods with compatible signatures is considered a valid provider. This approach maximizes flexibility and reduces coupling between core and plugins.
- Implementations:
JSONConfigProvider (fallback)
Example
Using a configuration provider:
    from structum_lab.config import get_config_provider

    config = get_config_provider()

    # Get value with fallback
    db_host = config.get("database.host", default="localhost")

    # Set value
    config.set("database.port", 5432)

    # Check existence
    if config.has("database.password"):
        password = config.get("database.password")

    # Persist changes
    config.save()

Note
All providers should be thread-safe and support hierarchical key access using dot-notation (e.g., “database.pool.size”).
See also
get_config_provider(): Retrieve the global configuration provider
set_config_provider(): Register a custom provider
- get(key: str, default: Any = None) Any[source]¶
Retrieve a configuration value by key.
Supports dot-notation for nested configuration values (e.g., “database.pool.size” accesses nested dictionaries).
- Parameters:¶
- Returns:¶
The configuration value, or default if key not found.
- Return type:¶
Any
- Raises:¶
KeyError – If key not found and default not provided (implementation-specific behavior).
Example
Retrieving nested configuration:
    # Config: {"database": {"host": "localhost", "port": 5432}}
    host = config.get("database.host")   # "localhost"
    port = config.get("database.port")   # 5432

    # With fallback
    timeout = config.get("database.timeout", 30)   # 30
- set(key: str, value: Any) None[source]¶
Set a configuration value.
Persistence behavior depends on the concrete provider implementation. Changes may be in-memory only until save() is called.
- Parameters:¶
Example
Setting configuration values:
    # Simple value
    config.set("app.debug", True)

    # Nested structure (creates intermediate dicts)
    config.set("database.pool.size", 10)

    # Complex value
    config.set("servers", ["srv1", "srv2", "srv3"])

Warning
Changes are not persisted to storage until save() is called (for file-based providers). In-memory providers lose changes on restart.
- has(key: str) bool[source]¶
Check if a configuration key exists.
Example
Checking key existence:
    if config.has("database.password"):
        password = config.get("database.password")
    else:
        raise ValueError("Database password not configured")

Note
A key can exist with a None value. Use get() to distinguish between missing keys and None values.
- save() None[source]¶
Persist configuration changes to underlying storage.
For file-based providers, writes changes to disk. For remote providers, may trigger a commit or synchronization. For in-memory providers, this may be a no-op.
- Raises:¶
IOError – If unable to write to storage (file permissions, disk full).
RuntimeError – If provider doesn’t support persistence.
Example
Saving configuration:
config.set("app.version", "2.0.0") config.set("app.build", 123) config.save() # Persist both changesWarning
Unsaved changes will be lost on process termination. Call save() periodically for critical configuration updates.
See also
reload(): Discard changes and reload from storage
- reload() None[source]¶
Reload configuration from persistent storage.
Discards all unsaved in-memory changes and reloads the configuration from the underlying storage source.
Example
Reloading configuration:
config.set("temp.value", 123) # In-memory change config.reload() # Discards temp.value # Now config reflects disk state assert not config.has("temp.value")Warning
All unsaved changes will be permanently lost. Consider calling
save()before reload if needed.See also
save(): Persist changes before reloading
- structum_lab.config.interface.get_config_provider() ConfigProviderInterface[source]¶
Returns the global configuration provider (lazy, dynamic loading).
If no provider has been registered, the fallback JSON provider based on the standard library is instantiated automatically.
- Returns:¶
An implementation of ConfigInterface.
- structum_lab.config.interface.set_config_provider(provider: ConfigProviderInterface | type[ConfigProviderInterface]) None[source]¶
Registers a custom configuration provider.
This function is typically called by a plugin during initialization. The registered provider globally replaces the previous one.
- Parameters:¶
- provider: ConfigProviderInterface | type[ConfigProviderInterface]¶
An instance or class of a provider compatible with ConfigInterface.
Configuration Manager¶
JSON configuration provider for Structum.
This implementation is the minimal fallback, based exclusively on the Python standard library. It is meant to guarantee that the framework works out of the box when no advanced plugins are installed.
For complex use cases (multi-source configuration, validation, per-environment overrides), external providers such as Dynaconf are recommended.
- class structum_lab.config.manager.JSONConfigProvider[source]¶
Bases: object
Configuration provider based on JSON files.
Features:
- Automatic persistence to the filesystem
- Nested key support via dot notation
- No external dependencies
- get(key: str, default: Any = None) Any[source]¶
Returns the value associated with a key (dot notation supported).
Logging¶
Logging Interfaces¶
Logging and Metrics interface for Structum. Provides fallback implementations that can be replaced by plugins.
- class structum_lab.logging.StandardLoggerAdapter(logger: Logger)[source]¶
Bases: object
Fallback implementation that adapts the standard logging.Logger to LoggerInterface.
It captures structured arguments (**kwargs) and places them into the ‘extra’ dictionary, creating a rudimentary structured logging experience even without advanced plugins.
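A small sketch of the adapter wrapping a standard-library logger directly (normally get_logger() does this for you):

    import logging

    from structum_lab.logging import StandardLoggerAdapter

    log = StandardLoggerAdapter(logging.getLogger("myapp"))

    # Keyword arguments are captured into the record's 'extra' dictionary
    log.info("Cache warmed", entries=1500, duration_ms=82)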
- structum_lab.logging.get_logger(name: str) LoggerInterface[source]¶
Returns a logger instance conforming to LoggerInterface.
By default, this returns a StandardLoggerAdapter wrapping the stdlib logger. Plugins (like structum_observability) should patch this function to return their own implementation (e.g., a structlog BoundLogger).
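For example (a minimal sketch using the stdlib-backed default):

    from structum_lab.logging import get_logger

    log = get_logger(__name__)
    log.info("Service ready", port=8000)   # structured kwargs are supported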
- class structum_lab.logging.NullMetrics[source]¶
Bases: object
'Null Object' implementation for metrics (fallback).
- structum_lab.logging.set_metrics_collector(collector: MetricsCollectorProtocol) None[source]¶
Allows plugins to register their own metrics collector.
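A sketch of how a plugin might register a collector during initialization; NullMetrics stands in here for the plugin's real MetricsCollectorProtocol implementation:

    from structum_lab.logging import NullMetrics, set_metrics_collector

    # Typically called once by an observability plugin at bootstrap
    set_metrics_collector(NullMetrics())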
- structum_lab.logging.configure_logging(level: str = 'INFO', format: str = 'json') None[source]¶
Applies a basic logging configuration. Plugins should patch this function to provide advanced setup.
- structum_lab.logging.get_logger_backend() str[source]¶
Returns the name of the active logging backend.
- structum_lab.logging.set_context(**kwargs: Any) None[source]¶
Sets global context variables (Fallback: No-op).
- structum_lab.logging.clear_context() None[source]¶
Clears global context variables (Fallback: No-op).
- structum_lab.logging.bind_context(**kwargs: Any)[source]¶
Context manager for temporary context (Fallback: yield).
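A combined sketch of the context helpers; with the stdlib fallback these calls are no-ops, but plugins give them real behavior:

    from structum_lab.logging import bind_context, clear_context, get_logger, set_context

    log = get_logger(__name__)

    set_context(service="billing")             # global context (no-op in the fallback)

    with bind_context(request_id="abc123"):    # temporary context for this block
        log.info("Processing request")

    clear_context()                            # drop the global context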
Logging Interface Definition.
This module defines the LoggerInterface Protocol, which establishes the contract for all loggers in the Structum ecosystem. This ensures that the core framework and plugins can communicate logging intentions without depending on concrete implementations (Dependency Inversion).
- class structum_lab.logging.interfaces.LoggerInterface(*args, **kwargs)[source]¶
Bases: Protocol
Protocol for structured logging in Structum Lab.
All logging implementations (stdlib adapter, structlog wrapper, custom loggers) must conform to this interface. It follows the standard Python logger API but explicitly supports **kwargs for structured contextual data.
The protocol uses @runtime_checkable to enable isinstance() checks at runtime, useful for validation and debugging.
- Implementations:
StructlogAdapter
Standard library fallback (internal)
Example
Basic logging with structured context:
    from structum_lab.logging import get_logger

    log = get_logger(__name__)

    # Simple message
    log.info("Application started")

    # With structured data
    log.info("User logged in", user_id=123, ip="192.168.1.1")

    # Error with exception
    try:
        risky_operation()
    except Exception as e:
        log.error("Operation failed", exc_info=True, operation="risky")

Note
All methods accept **kwargs for structured logging. Keys should be valid Python identifiers and values should be JSON-serializable. Structured data enables powerful log aggregation and analysis.
See also
get_logger(): Get a logger instance
configure_logging(): Configure logging backend
- debug(message: str, **kwargs: Any) None[source]¶
Log a message at DEBUG level.
Debug-level logs are typically used for detailed diagnostic information useful during development and troubleshooting.
- Parameters:¶
Example
Debug logging with context:
log.debug("Cache lookup", key="user:123", hit=True, latency_ms=2.5) log.debug("SQL query", query="SELECT * FROM users", params={"id": 1})Note
Debug logs are typically disabled in production for performance.
- info(message: str, **kwargs: Any) None[source]¶
Log a message at INFO level.
Info-level logs track general application flow and significant events (e.g., service started, configuration loaded, request completed).
- Parameters:¶
Example
Info logging for application events:
log.info("Server started", host="0.0.0.0", port=8000) log.info("Request completed", path="/api/users", status=200, duration_ms=45) log.info("Database connected", driver="postgresql", pool_size=10)Note
This is the default log level for production environments.
- warning(message: str, **kwargs: Any) None[source]¶
Log a message at WARNING level.
Warning-level logs indicate potentially problematic situations that don’t prevent the application from functioning but deserve attention.
- Parameters:¶
Example
Warning logging for recoverable issues:
log.warning("Deprecated API used", endpoint="/old/api", caller="legacy_client") log.warning("High memory usage", usage_mb=1024, threshold_mb=800) log.warning("Retry attempt", operation="db_connect", attempt=2, max_attempts=3)Note
Warnings should be actionable - something that may need fixing but doesn’t require immediate attention.
- error(message: str, **kwargs: Any) None[source]¶
Log a message at ERROR level.
Error-level logs indicate failures that prevent specific operations from completing but don’t crash the application.
- Parameters:¶
Example
Error logging with exception info:
    try:
        process_payment(order_id)
    except PaymentError as e:
        log.error(
            "Payment processing failed",
            exc_info=True,
            order_id=order_id,
            error_code="PAYMENT_DECLINED"
        )

Warning
Errors indicate problems requiring investigation. Set up alerts for error rate thresholds in production.
See also
critical(): For application-critical failures
- critical(message: str, **kwargs: Any) None[source]¶
Log a message at CRITICAL level.
Critical-level logs indicate severe errors that may cause the application to abort or enter an unstable state.
- Parameters:¶
Example
Critical logging for fatal errors:
    log.critical(
        "Database connection pool exhausted",
        active_connections=100,
        max_connections=100,
        pending_requests=50
    )
    log.critical(
        "Configuration file corrupted",
        path="/etc/app/config.toml",
        exc_info=True
    )

Warning
Critical logs should trigger immediate alerts. These indicate situations where the application cannot continue safely.
Note
After logging a critical message, the application may need to shut down gracefully or enter a safe degraded mode.
Monitoring¶
Monitoring Interfaces¶
Monitoring subsystem for Structum Framework.
Provides a pluggable metrics interface following the same pattern as logging. Core provides the interface, plugins provide implementations (Prometheus, etc).
- class structum_lab.monitoring.MetricsInterface(*args, **kwargs)[source]¶
Bases: Protocol
Protocol for metrics emission in Structum Lab.
This interface enables plugins to emit metrics in a backend-agnostic manner. Implementations can target Prometheus, StatsD, Datadog, CloudWatch, or other monitoring systems.
- Implementations:
PrometheusMetrics
StatsDMetrics
NoOpMetrics: Fallback no-op implementation
Example
Basic metrics usage:
    from structum_lab.monitoring import get_metrics

    metrics = get_metrics()

    # Counter: track events
    metrics.increment("api.requests", tags={"endpoint": "/users", "method": "GET"})
    metrics.increment("api.errors", tags={"endpoint": "/users", "status": "500"})

    # Gauge: current values
    metrics.gauge("cache.size", len(cache), tags={"cache": "redis"})
    metrics.gauge("db.connections.active", pool.active_count())

    # Timing: operation duration
    import time
    start = time.time()
    process_request()
    duration = time.time() - start
    metrics.timing("api.duration", duration, tags={"endpoint": "/users"})

    # Histogram: value distributions
    metrics.histogram("response.size", len(response_body), tags={"endpoint": "/users"})

Note
All metric methods are non-blocking and should not raise exceptions. Failed metric emissions should be logged but not disrupt application flow.
See also
LoggerInterface: Logging interface
get_metrics(): Retrieve metrics instance
- gauge(name: str, value: float, tags: dict[str, str] | None = None) None[source]¶
Set a gauge metric to a specific value.
Gauges represent current values that can increase or decrease (e.g., memory usage, active connections, queue size). Each call overwrites the previous value.
- Parameters:¶
Example
System metrics:
    import psutil

    # Memory usage
    mem = psutil.virtual_memory()
    metrics.gauge("system.memory.used", mem.used, tags={"host": hostname})
    metrics.gauge("system.memory.percent", mem.percent)

    # CPU usage
    cpu_percent = psutil.cpu_percent(interval=1)
    metrics.gauge("system.cpu.percent", cpu_percent)

Application metrics:

    # Database connection pool
    metrics.gauge("db.pool.active", db.pool.active_connections())
    metrics.gauge("db.pool.idle", db.pool.idle_connections())

    # Queue size
    metrics.gauge("queue.length", len(task_queue), tags={"queue": "background"})

    # Cache size
    metrics.gauge("cache.entries", cache.size(), tags={"cache": "redis"})

Warning
Gauges represent point-in-time values. For cumulative values, use increment().
Note
Emit gauges periodically (e.g., every minute) for accurate monitoring
Consider using background jobs for system metric collection
See also
increment(): For cumulative counters
- histogram(name: str, value: float, tags: dict[str, str] | None = None) None[source]¶
Record a value in a histogram.
Histograms track value distributions (e.g., request sizes, payload sizes). Unlike gauges, all values are recorded and aggregated into buckets.
- Parameters:¶
Example
Request/response sizes:
    # Request payload size
    request_size = len(request.body)
    metrics.histogram(
        "http.request.size",
        request_size,
        tags={"endpoint": request.path, "content_type": request.content_type}
    )

    # Response size
    response_size = len(response.body)
    metrics.histogram(
        "http.response.size",
        response_size,
        tags={"endpoint": request.path}
    )

Batch sizes:

    # Processing batch sizes
    batch = fetch_batch_from_queue()
    metrics.histogram(
        "processing.batch.size",
        len(batch),
        tags={"queue": "tasks"}
    )

Query result counts:

    rows = db.execute("SELECT * FROM users").fetchall()
    metrics.histogram(
        "db.query.rows",
        len(rows),
        tags={"table": "users"}
    )

Note
Histograms are ideal for analyzing value distributions (percentiles, averages)
Use timing() specifically for duration measurements
Avoid extremely high-cardinality values (>1000 unique values per second)
- increment(name: str, value: float = 1.0, tags: dict[str, str] | None = None) None[source]¶
Increment a counter metric.
Counters track cumulative values that only increase (e.g., request count, error count). Use for counting events over time.
- Parameters:¶
Example
Tracking API requests:
    # Request counter
    metrics.increment("http.requests", tags={
        "method": request.method,
        "endpoint": request.path,
        "status": str(response.status_code)
    })

    # Error counter
    try:
        risky_operation()
    except Exception:
        metrics.increment("operations.errors", tags={"operation": "risky"})
        raise

Cache statistics:

    # Cache hits/misses
    if key in cache:
        metrics.increment("cache.hits", tags={"cache": "redis"})
        return cache[key]
    else:
        metrics.increment("cache.misses", tags={"cache": "redis"})
        return fetch_from_db(key)

Note
Counter values should never decrease
Use consistent tag keys across increments for proper aggregation
Avoid high-cardinality tags (e.g., user IDs) that create too many series
See also
gauge(): For values that can increase/decrease
- timing(name: str, value: float, tags: dict[str, str] | None = None) None[source]¶
Record a timing/duration metric.
- Used for tracking operation latency and performance.
Typically implemented as a histogram with predefined buckets.
- Args:
name (str): Metric name (e.g., api.request.duration).
value (float): Duration in seconds (use fractional seconds for sub-second precision).
tags (Optional[Dict[str, str]]): Labels for the metric. Defaults to None.
- Example:
API endpoint timing:
    import time

    start = time.time()
    try:
        result = handle_request(request)
        return result
    finally:
        duration = time.time() - start
        metrics.timing(
            "api.request.duration",
            duration,
            tags={"endpoint": request.path, "method": request.method}
        )

Database query timing:

    start = time.perf_counter()
    rows = db.execute(query)
    duration = time.perf_counter() - start
    metrics.timing(
        "db.query.duration",
        duration,
        tags={"table": "users", "operation": "select"}
    )

Context manager for timing:

    from contextlib import contextmanager

    @contextmanager
    def track_time(operation: str):
        start = time.time()
        try:
            yield
        finally:
            metrics.timing(f"{operation}.duration", time.time() - start)

    with track_time("data_processing"):
        process_large_dataset()

- Note:
Use seconds as the unit for consistency
time.perf_counter() is more accurate than time.time() for durations
Most backends convert to milliseconds for display
- See Also:
histogram(): For general value distributions
- class structum_lab.monitoring.NoOpMetrics[source]¶
Bases: object
Fallback no-op implementation when no monitoring plugin is installed.
- histogram(name: str, value: float, tags: dict[str, str] | None = None) None[source]¶
No-op histogram.
- structum_lab.monitoring.get_metrics(namespace: str = 'structum') MetricsInterface[source]¶
Get metrics emitter for a namespace.
Example
>>> metrics = get_metrics("structum.config")
>>> metrics.increment("operations.total", tags={"operation": "get"})
- structum_lab.monitoring.set_metrics_backend(backend: MetricsInterface) None[source]¶
Set the global metrics backend.
This is called by monitoring plugins (e.g., structum_observability) to inject their implementation.
- Parameters:¶
- backend: MetricsInterface¶
MetricsInterface implementation
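A sketch of what a monitoring plugin's bootstrap might look like; NoOpMetrics stands in here for the plugin's real MetricsInterface implementation:

    from structum_lab.monitoring import NoOpMetrics, get_metrics, set_metrics_backend

    # Called once at plugin initialization to inject the global backend
    set_metrics_backend(NoOpMetrics())

    metrics = get_metrics("structum.app")
    metrics.increment("startup.total")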
Monitoring Interfaces for Structum Framework.
Provides protocol definitions for metrics emission following the same pattern as LoggerInterface.
- class structum_lab.monitoring.interfaces.MetricsInterface(*args, **kwargs)[source]¶
Bases:
ProtocolProtocol for metrics emission in Structum Lab.
This interface enables plugins to emit metrics in a backend-agnostic manner. Implementations can target Prometheus, StatsD, Datadog, CloudWatch, or other monitoring systems.
- Implementations:
PrometheusMetricsStatsDMetricsNoOpMetrics: Fallback no-op implementation
Example
Basic metrics usage:
from structum_lab.monitoring import get_metrics metrics = get_metrics() # Counter: track events metrics.increment("api.requests", tags={"endpoint": "/users", "method": "GET"}) metrics.increment("api.errors", tags={"endpoint": "/users", "status": "500"}) # Gauge: current values metrics.gauge("cache.size", len(cache), tags={"cache": "redis"}) metrics.gauge("db.connections.active", pool.active_count()) # Timing: operation duration import time start = time.time() process_request() duration = time.time() - start metrics.timing("api.duration", duration, tags={"endpoint": "/users"}) # Histogram: value distributions metrics.histogram("response.size", len(response_body), tags={"endpoint": "/users"})Note
All metric methods are non-blocking and should not raise exceptions. Failed metric emissions should be logged but not disrupt application flow.
See also
LoggerInterface: Logging interfaceget_metrics(): Retrieve metrics instance-
increment(name: str, value: float =
1.0, tags: dict[str, str] | None =None) None[source]¶ Increment a counter metric.
Counters track cumulative values that only increase (e.g., request count, error count). Use for counting events over time.
- Parameters:¶
Example
Tracking API requests:
# Request counter metrics.increment("http.requests", tags={ "method": request.method, "endpoint": request.path, "status": str(response.status_code) }) # Error counter try: risky_operation() except Exception: metrics.increment("operations.errors", tags={"operation": "risky"}) raiseCache statistics:
# Cache hits/misses if key in cache: metrics.increment("cache.hits", tags={"cache": "redis"}) return cache[key] else: metrics.increment("cache.misses", tags={"cache": "redis"}) return fetch_from_db(key)Note
Counter values should never decrease
Use consistent tag keys across increments for proper aggregation
Avoid high-cardinality tags (e.g., user IDs) that create too many series
See also
gauge(): For values that can increase/decrease
-
gauge(name: str, value: float, tags: dict[str, str] | None =
None) None[source]¶ Set a gauge metric to a specific value.
Gauges represent current values that can increase or decrease (e.g., memory usage, active connections, queue size). Each call overwrites the previous value.
- Parameters:¶
Example
System metrics:
import psutil # Memory usage mem = psutil.virtual_memory() metrics.gauge("system.memory.used", mem.used, tags={"host": hostname}) metrics.gauge("system.memory.percent", mem.percent) # CPU usage cpu_percent = psutil.cpu_percent(interval=1) metrics.gauge("system.cpu.percent", cpu_percent)Application metrics:
# Database connection pool metrics.gauge("db.pool.active", db.pool.active_connections()) metrics.gauge("db.pool.idle", db.pool.idle_connections()) # Queue size metrics.gauge("queue.length", len(task_queue), tags={"queue": "background"}) # Cache size metrics.gauge("cache.entries", cache.size(), tags={"cache": "redis"})Warning
Gauges represent point-in-time values. For cumulative values, use
increment().Note
Emit gauges periodically (e.g., every minute) for accurate monitoring
Consider using background jobs for system metric collection
See also
increment(): For cumulative counters
-
timing(name: str, value: float, tags: dict[str, str] | None =
None) None[source]¶ Record a timing/duration metric.
- Used for tracking operation latency and performance.
Typically implemented as a histogram with predefined buckets.
- Args:
name (str): Metric name (e.g.,
api.request.duration). value (float): Duration in seconds (use fractional seconds for sub-second precision). tags (Optional[Dict[str, str]]): Labels for the metric. Defaults to None.- Example:
API endpoint timing:
import time start = time.time() try: result = handle_request(request) return result finally: duration = time.time() - start metrics.timing( "api.request.duration", duration, tags={"endpoint": request.path, "method": request.method} )Database query timing:
start = time.perf_counter() rows = db.execute(query) duration = time.perf_counter() - start metrics.timing( "db.query.duration", duration, tags={"table": "users", "operation": "select"} )Context manager for timing:
from contextlib import contextmanager @contextmanager def track_time(operation: str): start = time.time() try: yield finally: metrics.timing(f"{operation}.duration", time.time() - start) with track_time("data_processing"): process_large_dataset()- Note:
Use seconds as the unit for consistency
time.perf_counter()is more accurate thantime.time()for durationsMost backends convert to milliseconds for display
- See Also:
histogram(): For general value distributions
-
histogram(name: str, value: float, tags: dict[str, str] | None =
None) None[source]¶ Record a value in a histogram.
Histograms track value distributions (e.g., request sizes, payload sizes). Unlike gauges, all values are recorded and aggregated into buckets.
- Parameters:¶
Example
Request/response sizes:
# Request payload size request_size = len(request.body) metrics.histogram( "http.request.size", request_size, tags={"endpoint": request.path, "content_type": request.content_type} ) # Response size response_size = len(response.body) metrics.histogram( "http.response.size", response_size, tags={"endpoint": request.path} )Batch sizes:
# Processing batch sizes batch = fetch_batch_from_queue() metrics.histogram( "processing.batch.size", len(batch), tags={"queue": "tasks"} )Query result counts:
rows = db.execute("SELECT * FROM users").fetchall() metrics.histogram( "db.query.rows", len(rows), tags={"table": "users"} )Note
Histograms are ideal for analyzing value distributions (percentiles, averages)
Use
timing()specifically for duration measurementsAvoid extremely high-cardinality values (>1000 unique values per second)
- class structum_lab.monitoring.interfaces.NoOpMetrics[source]¶
Bases:
objectFallback no-op implementation when no monitoring plugin is installed.
Database¶
Database Interfaces¶
Database interfaces for Structum Lab.
This module provides Protocol definitions for database operations. For concrete implementations, install structum-database:
pip install structum-database
- Usage:
>>> from structum_lab.plugins.database import SQLAlchemyDatabase
>>> db = SQLAlchemyDatabase.from_config()
- class structum_lab.database.DatabaseInterface(*args, **kwargs)[source]¶
Bases: Protocol
Protocol for database managers in Structum Lab.
This is the main entry point for all database operations. Implementations must provide connection pooling, transaction management, and health monitoring.
The protocol abstracts away database-specific details, allowing applications to work with different backends (PostgreSQL, MySQL, SQLite) through a unified interface.
- Implementations:
Example
Basic usage with configuration:

    from structum_lab.plugins.database import SQLAlchemyDatabase

    # Initialize from config
    db = SQLAlchemyDatabase.from_config()

    # Or with explicit URL
    db = SQLAlchemyDatabase(url="postgresql://user:pass@localhost/mydb")

    # Use transaction context manager (recommended)
    with db.transaction() as conn:
        conn.execute(
            "INSERT INTO users (name, email) VALUES (:name, :email)",
            {"name": "John", "email": "john@example.com"}
        )
        user_id = conn.execute("SELECT last_insert_id()").fetchone()["id"]
        conn.execute(
            "INSERT INTO profiles (user_id, bio) VALUES (:uid, :bio)",
            {"uid": user_id, "bio": "Software engineer"}
        )
        # Commits automatically on success

    # Check health
    health = db.health_check()
    if health.status != HealthStatus.HEALTHY:
        log.warning("Database issues detected", result=health)
Note
Always use the transaction() context manager for database operations. It ensures proper connection pooling, automatic commit/rollback, and resource cleanup.
See also
ConnectionInterface: Connection protocol
TransactionInterface: Transaction protocol
HealthCheckResult: Health check result data class
- close() None[source]¶
Close all connections in the pool and release resources.
Should be called during application shutdown to ensure clean termination. Any ongoing transactions should be completed before closing.
- Raises:¶
RuntimeError – If called while transactions are active (implementation-specific).
Example
Application shutdown:
    import atexit

    db = SQLAlchemyDatabase.from_config()
    atexit.register(db.close)   # Ensure cleanup on exit

    # Or in FastAPI lifespan
    @asynccontextmanager
    async def lifespan(app: FastAPI):
        db.connect()
        yield
        db.close()   # Clean shutdown

Warning
After calling close(), the database instance should not be reused. Create a new instance for additional operations.
See also
connect(): Initialize connection pool
- connect() None[source]¶
Establish the database connection pool.
Creates and initializes the connection pool. Called automatically on first database operation if not already connected.
- Raises:¶
ConnectionError – If unable to connect to database.
ConfigurationError – If database URL or settings are invalid.
Example
Explicit connection during startup:
    db = SQLAlchemyDatabase.from_config()
    try:
        db.connect()
        log.info("Database pool created", url=db.url)
    except ConnectionError as e:
        log.critical("Cannot connect to database", exc_info=True)
        sys.exit(1)

Note
Usually not needed - the database connects automatically on first use. Explicit connection is useful for fail-fast behavior during startup.
See also
close(): Shutdown connection pool
- get_connection() ConnectionInterface[source]¶
Acquire a connection from the pool.
Returns a connection that must be explicitly returned to the pool after use. Prefer using transaction() instead.
- Returns:¶
A database connection from the pool.
- Return type:¶
- Raises:¶
PoolExhaustedError – If no connections are available and pool is at maximum.
ConnectionError – If pool is not initialized.
Example
Manual connection management (not recommended):
    conn = db.get_connection()
    try:
        result = conn.execute("SELECT * FROM users")
        users = result.fetchall()
    finally:
        # Must return connection to pool manually
        conn.close()

Warning
Manual connection management is error-prone. Always prefer the transaction() context manager, which handles connections automatically.
See also
transaction(): Recommended connection API
- health_check() HealthCheckResult[source]¶
Check database connectivity and health status.
Performs a simple query to verify the database is responsive and measures latency. Useful for readiness probes and monitoring.
Example
Health check endpoint:
    @app.get("/health/database")
    def database_health():
        result = db.health_check()
        if result.status == HealthStatus.UNHEALTHY:
            raise HTTPException(503, detail=result.message)
        return {
            "status": result.status.value,
            "latency_ms": result.latency_ms,
            "message": result.message
        }

Prometheus metrics:

    result = db.health_check()

    # Record health status
    db_health_gauge.labels(database="main").set(
        1 if result.status == HealthStatus.HEALTHY else 0
    )

    # Record latency
    if result.latency_ms:
        db_latency_histogram.observe(result.latency_ms)

Note
Health checks execute a lightweight query (usually SELECT 1). They should complete quickly (<100ms typically).
See also
HealthCheckResult: Result data classHealthStatus: Health status enumeration
- property is_connected : bool¶
Check if the database connection pool is active.
Example
Conditional connection:
    if not db.is_connected:
        db.connect()
        log.info("Database connection established")

Note
Most operations call connect() automatically if not connected. Explicit checking is mainly useful for health checks and diagnostics.
- transaction() Iterator[ConnectionInterface][source]¶
Context manager for database transactions.
Provides automatic transaction management: commits on success, rolls back on exception. This is the recommended way to perform database operations.
- Yields:¶
ConnectionInterface – Database connection with active transaction.
- Raises:¶
DatabaseError – If transaction fails to start.
ConnectionError – If pool is exhausted or not connected.
Example
Recommended usage pattern:
    # Single transaction
    with db.transaction() as conn:
        conn.execute(
            "UPDATE accounts SET balance = balance - :amount WHERE id = :id",
            {"amount": 100, "id": 1}
        )
        conn.execute(
            "UPDATE accounts SET balance = balance + :amount WHERE id = :id",
            {"amount": 100, "id": 2}
        )
        # Commits automatically if no exception

    # Exception triggers rollback
    try:
        with db.transaction() as conn:
            conn.execute("DELETE FROM important_data")
            raise ValueError("Validation failed")
    except ValueError:
        pass   # Transaction automatically rolled back

Note
Transactions are isolated - changes are not visible to other connections until commit. Isolation level depends on database implementation (usually READ COMMITTED).
See also
TransactionInterface: Transaction protocolConnectionInterface: Connection protocol
- property url : str¶
Get the database connection URL (sanitized).
- Returns:¶
- Database URL with password redacted/masked for security.
Example:
"postgresql://user:***@localhost:5432/mydb"
- Return type:¶
Example
Logging database configuration:
log.info("Connected to database", url=db.url) # Logs: postgresql://user:***@localhost/mydbNote
Passwords are automatically redacted to prevent accidental logging of credentials.
- class structum_lab.database.ConnectionInterface(*args, **kwargs)[source]¶
Bases: Protocol
Protocol for database connections in Structum Lab.
A connection represents an active link to the database that can execute queries and retrieve results. Connections are typically obtained from a connection pool and should be returned after use.
- Implementations:
Example
Using a connection within a transaction:
with db.transaction() as conn: # Execute query with named parameters conn.execute( "INSERT INTO users (name, email) VALUES (:name, :email)", {"name": "John", "email": "john@example.com"} ) # Fetch results conn.execute("SELECT * FROM users WHERE id > :id", {"id": 100}) users = conn.fetchall() for user in users: print(f"{user['name']} - {user['email']}")Note
Connections are not thread-safe. Use one connection per thread or protect access with locks.
See also
DatabaseInterface: Database manager providing connectionsDatabaseInterface.transaction(): Recommended way to get connections-
execute(query: str, params: dict[str, Any] | tuple[Any, ...] | None =
None) Any[source]¶ Execute a SQL query with optional parameters.
Supports both named parameters (dict) and positional parameters (tuple). Use
:namesyntax for named parameters,?for positional.- Parameters:¶
- Returns:¶
- Result object (implementation-specific). Use fetch methods
to retrieve rows.
- Return type:¶
Any
- Raises:¶
DatabaseError – If query execution fails.
ParameterError – If parameters don’t match query placeholders.
Example
Different parameter styles:
# Named parameters (recommended) conn.execute( "SELECT * FROM users WHERE age > :min_age AND city = :city", {"min_age": 18, "city": "NYC"} ) # Positional parameters conn.execute( "SELECT * FROM users WHERE age > ? AND city = ?", (18, "NYC") ) # No parameters conn.execute("SELECT COUNT(*) FROM users")Warning
Always use parameterized queries. Never use string interpolation for user input (SQL injection risk).
See also
fetchone(): Retrieve single rowfetchall(): Retrieve all rows
- fetchall() list[dict[str, Any]][source]¶
Fetch all remaining rows from the last executed query.
Example
Fetching multiple rows:
conn.execute("SELECT * FROM users WHERE active = :active", {"active": True}) users = conn.fetchall() for user in users: print(f"{user['id']}: {user['name']}") print(f"Total active users: {len(users)}")Warning
For large result sets, consider using
fetchmany()to avoid loading all rows into memory at once.See also
fetchone(): Fetch single rowfetchmany(): Fetch rows in batches
- fetchmany(size: int) list[dict[str, Any]][source]¶
Fetch up to
sizerows from the last executed query.Useful for processing large result sets in batches to manage memory usage.
- Parameters:¶
- Returns:¶
- List of rows (up to size). May return fewer
rows if result set is exhausted. Empty list if no rows remain.
- Return type:¶
Example
Batch processing large result set:
conn.execute("SELECT * FROM large_table") batch_size = 100 while True: batch = conn.fetchmany(batch_size) if not batch: break process_batch(batch) print(f"Processed {len(batch)} rows")Note
Efficient for iterating large datasets without loading everything into memory.
See also
fetchone(): Fetch single rowfetchall(): Fetch all rows
- fetchone() dict[str, Any] | None[source]¶
Fetch the next row from the last executed query.
- Returns:¶
- Row as dictionary with column names as keys,
or None if no more rows available.
- Return type:¶
Example
Fetching single row:
conn.execute("SELECT * FROM users WHERE id = :id", {"id": 1}) user = conn.fetchone() if user: print(f"Found user: {user['name']}") else: print("User not found")Note
Call after
execute(). Returns None when cursor exhausted.See also
fetchall(): Fetch all remaining rowsfetchmany(): Fetch specific number of rows
- class structum_lab.database.TransactionInterface(*args, **kwargs)[source]¶
Bases: Protocol
Protocol for database transactions providing ACID guarantees.
Transactions group multiple database operations into a single atomic unit. Either all operations succeed (commit) or all fail (rollback).
Example
Manual transaction management:
tx = db.begin_transaction() try: conn.execute("UPDATE accounts SET balance = balance - 100 WHERE id = 1") conn.execute("UPDATE accounts SET balance = balance + 100 WHERE id = 2") tx.commit() except Exception: tx.rollback() raisePreferred context manager approach:
# Automatic commit/rollback with db.transaction() as conn: conn.execute("UPDATE accounts SET balance = balance - 100 WHERE id = 1") conn.execute("UPDATE accounts SET balance = balance + 100 WHERE id = 2") # Commits automatically if no exceptionNote
Most applications should use
DatabaseInterface.transaction()context manager instead of managing transactions manually.See also
DatabaseInterface.transaction(): Recommended transaction API- commit() None[source]¶
Commit the current transaction.
Makes all changes since transaction start permanent in the database. After commit, the transaction is no longer active.
- Raises:¶
DatabaseError – If commit fails (e.g., constraint violation).
TransactionError – If transaction is not active.
Example
Manual commit:
tx = db.begin_transaction() try: conn.execute("INSERT INTO logs VALUES (:msg)", {"msg": "test"}) tx.commit() log.info("Transaction committed successfully") except DatabaseError as e: tx.rollback() log.error("Commit failed", exc_info=True)Warning
After commit, the transaction cannot be reused. Start a new transaction for additional operations.
See also
rollback(): Abort transaction
- property is_active : bool¶
Check if the transaction is still active.
- Returns:¶
- True if transaction can accept operations, False if
already committed or rolled back.
- Return type:¶
Example
Checking transaction state:
tx = db.begin_transaction() assert tx.is_active # True tx.commit() assert not tx.is_active # False # This would raise TransactionError # tx.execute("SELECT 1")Note
Useful for conditional logic and error handling.
- rollback() None[source]¶
Rollback the current transaction.
Discards all changes made since transaction start. The database state returns to what it was before the transaction began.
- Raises:¶
TransactionError – If transaction is not active.
Example
Manual rollback on error:
tx = db.begin_transaction() try: conn.execute("DELETE FROM important_data") # Validation fails if not validate_deletion(): tx.rollback() log.warning("Deletion rolled back - validation failed") return tx.commit() except Exception: tx.rollback() raiseNote
Rollback is safe to call multiple times. Subsequent calls are no-ops.
See also
commit(): Persist transaction changes
- class structum_lab.database.HealthCheckResult(status: HealthStatus, message: str, latency_ms: float | None = None, details: dict[str, Any] | None = None)[source]¶
Bases: object
Data class representing database health check results.
- details¶
Additional diagnostic information (e.g., active connections, pool statistics).
Example
Creating and using a health check result:
result = HealthCheckResult( status=HealthStatus.HEALTHY, message="Database connection OK", latency_ms=2.5, details={"active_connections": 5, "pool_size": 10} ) # Use in monitoring/alerts if result.latency_ms and result.latency_ms > 100: alert("High database latency", result.latency_ms)Note
This class is frozen (immutable) to ensure health check results cannot be modified after creation.
- __init__(status: HealthStatus, message: str, latency_ms: float | None = None, details: dict[str, Any] | None = None)¶
- status : HealthStatus¶
- class structum_lab.database.HealthStatus(*values)[source]¶
Bases: Enum
Enumeration of possible database health states.
- HEALTHY¶
Database is fully operational with normal latency.
- DEGRADED¶
Database is responsive but experiencing issues (e.g., high latency, connection warnings).
- UNHEALTHY¶
Database is unreachable or critically impaired.
Example
Checking health status:
    result = db.health_check()

    if result.status == HealthStatus.HEALTHY:
        log.info("Database OK", latency_ms=result.latency_ms)
    elif result.status == HealthStatus.DEGRADED:
        log.warning("Database degraded", message=result.message)
    else:
        log.error("Database unhealthy", message=result.message)

- HEALTHY = 'healthy'¶
- DEGRADED = 'degraded'¶
- UNHEALTHY = 'unhealthy'¶
- structum_lab.database.Database¶
alias of
DatabaseInterface
- structum_lab.database.Connection¶
alias of
ConnectionInterface
- structum_lab.database.Transaction¶
alias of
TransactionInterface
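A small sketch showing the aliases used as type hints (the count_users helper is hypothetical):

    from structum_lab.database import Database

    def count_users(db: Database) -> int:
        """Hypothetical helper typed against the protocol alias."""
        with db.transaction() as conn:
            conn.execute("SELECT COUNT(*) AS n FROM users")
            row = conn.fetchone()
            return row["n"] if row else 0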
Core database interfaces for Structum Lab.
This module defines the Protocols that any database implementation must follow. Implementations are provided by the structum-database plugin.
Example
>>> from structum_lab.plugins.database import SQLAlchemyDatabase
>>> db = SQLAlchemyDatabase.from_config()
>>> with db.transaction() as conn:
... conn.execute("SELECT 1")
- class structum_lab.database.interfaces.HealthStatus(*values)[source]¶
Bases:
EnumEnumeration of possible database health states.
- HEALTHY¶
Database is fully operational with normal latency.
- DEGRADED¶
Database is responsive but experiencing issues (e.g., high latency, connection warnings).
- UNHEALTHY¶
Database is unreachable or critically impaired.
Example
Checking health status:
result = db.health_check() if result.status == HealthStatus.HEALTHY: log.info("Database OK", latency_ms=result.latency_ms) elif result.status == HealthStatus.DEGRADED: log.warning("Database degraded", message=result.message) else: log.error("Database unhealthy", message=result.message)-
HEALTHY =
'healthy'¶
-
DEGRADED =
'degraded'¶
-
UNHEALTHY =
'unhealthy'¶
-
class structum_lab.database.interfaces.HealthCheckResult(status: HealthStatus, message: str, latency_ms: float | None =
None, details: dict[str, Any] | None =None)[source]¶ Bases:
objectData class representing database health check results.
- details¶
Additional diagnostic information (e.g., active connections, pool statistics).
Example
Creating and using a health check result:
result = HealthCheckResult( status=HealthStatus.HEALTHY, message="Database connection OK", latency_ms=2.5, details={"active_connections": 5, "pool_size": 10} ) # Use in monitoring/alerts if result.latency_ms and result.latency_ms > 100: alert("High database latency", result.latency_ms)Note
This class is frozen (immutable) to ensure health check results cannot be modified after creation.
- status : HealthStatus¶
- class structum_lab.database.interfaces.ConnectionInterface(*args, **kwargs)[source]¶
Bases:
ProtocolProtocol for database connections in Structum Lab.
A connection represents an active link to the database that can execute queries and retrieve results. Connections are typically obtained from a connection pool and should be returned after use.
- Implementations:
Example
Using a connection within a transaction:
with db.transaction() as conn: # Execute query with named parameters conn.execute( "INSERT INTO users (name, email) VALUES (:name, :email)", {"name": "John", "email": "john@example.com"} ) # Fetch results conn.execute("SELECT * FROM users WHERE id > :id", {"id": 100}) users = conn.fetchall() for user in users: print(f"{user['name']} - {user['email']}")Note
Connections are not thread-safe. Use one connection per thread or protect access with locks.
See also
DatabaseInterface: Database manager providing connectionsDatabaseInterface.transaction(): Recommended way to get connections-
execute(query: str, params: dict[str, Any] | tuple[Any, ...] | None =
None) Any[source]¶ Execute a SQL query with optional parameters.
Supports both named parameters (dict) and positional parameters (tuple). Use
:namesyntax for named parameters,?for positional.- Parameters:¶
- Returns:¶
- Result object (implementation-specific). Use fetch methods
to retrieve rows.
- Return type:¶
Any
- Raises:¶
DatabaseError – If query execution fails.
ParameterError – If parameters don’t match query placeholders.
Example
Different parameter styles:
# Named parameters (recommended) conn.execute( "SELECT * FROM users WHERE age > :min_age AND city = :city", {"min_age": 18, "city": "NYC"} ) # Positional parameters conn.execute( "SELECT * FROM users WHERE age > ? AND city = ?", (18, "NYC") ) # No parameters conn.execute("SELECT COUNT(*) FROM users")Warning
Always use parameterized queries. Never use string interpolation for user input (SQL injection risk).
See also
fetchone(): Retrieve single rowfetchall(): Retrieve all rows
- fetchone() dict[str, Any] | None[source]¶
Fetch the next row from the last executed query.
- Returns:¶
- Row as dictionary with column names as keys,
or None if no more rows available.
- Return type:¶
Example
Fetching single row:
conn.execute("SELECT * FROM users WHERE id = :id", {"id": 1}) user = conn.fetchone() if user: print(f"Found user: {user['name']}") else: print("User not found")Note
Call after
execute(). Returns None when cursor exhausted.See also
fetchall(): Fetch all remaining rowsfetchmany(): Fetch specific number of rows
- fetchall() list[dict[str, Any]][source]¶
Fetch all remaining rows from the last executed query.
Example
Fetching multiple rows:
conn.execute("SELECT * FROM users WHERE active = :active", {"active": True}) users = conn.fetchall() for user in users: print(f"{user['id']}: {user['name']}") print(f"Total active users: {len(users)}")Warning
For large result sets, consider using
fetchmany()to avoid loading all rows into memory at once.See also
fetchone(): Fetch single rowfetchmany(): Fetch rows in batches
- fetchmany(size: int) list[dict[str, Any]][source]¶
Fetch up to
sizerows from the last executed query.Useful for processing large result sets in batches to manage memory usage.
- Parameters:¶
- Returns:¶
- List of rows (up to size). May return fewer
rows if result set is exhausted. Empty list if no rows remain.
- Return type:¶
Example
Batch processing large result set:
conn.execute("SELECT * FROM large_table") batch_size = 100 while True: batch = conn.fetchmany(batch_size) if not batch: break process_batch(batch) print(f"Processed {len(batch)} rows")Note
Efficient for iterating large datasets without loading everything into memory.
See also
fetchone(): Fetch single rowfetchall(): Fetch all rows
- class structum_lab.database.interfaces.TransactionInterface(*args, **kwargs)[source]¶
Bases:
ProtocolProtocol for database transactions providing ACID guarantees.
Transactions group multiple database operations into a single atomic unit. Either all operations succeed (commit) or all fail (rollback).
Example
Manual transaction management:
tx = db.begin_transaction() try: conn.execute("UPDATE accounts SET balance = balance - 100 WHERE id = 1") conn.execute("UPDATE accounts SET balance = balance + 100 WHERE id = 2") tx.commit() except Exception: tx.rollback() raisePreferred context manager approach:
# Automatic commit/rollback with db.transaction() as conn: conn.execute("UPDATE accounts SET balance = balance - 100 WHERE id = 1") conn.execute("UPDATE accounts SET balance = balance + 100 WHERE id = 2") # Commits automatically if no exceptionNote
Most applications should use
DatabaseInterface.transaction()context manager instead of managing transactions manually.See also
DatabaseInterface.transaction(): Recommended transaction API- commit() None[source]¶
Commit the current transaction.
Makes all changes since transaction start permanent in the database. After commit, the transaction is no longer active.
- Raises:¶
DatabaseError – If commit fails (e.g., constraint violation).
TransactionError – If transaction is not active.
Example
Manual commit:
tx = db.begin_transaction() try: conn.execute("INSERT INTO logs VALUES (:msg)", {"msg": "test"}) tx.commit() log.info("Transaction committed successfully") except DatabaseError as e: tx.rollback() log.error("Commit failed", exc_info=True)Warning
After commit, the transaction cannot be reused. Start a new transaction for additional operations.
See also
rollback(): Abort transaction
- rollback() None[source]¶
Rollback the current transaction.
Discards all changes made since transaction start. The database state returns to what it was before the transaction began.
- Raises:¶
TransactionError – If transaction is not active.
Example
Manual rollback on error:
tx = db.begin_transaction() try: conn.execute("DELETE FROM important_data") # Validation fails if not validate_deletion(): tx.rollback() log.warning("Deletion rolled back - validation failed") return tx.commit() except Exception: tx.rollback() raiseNote
Rollback is safe to call multiple times. Subsequent calls are no-ops.
See also
commit(): Persist transaction changes
- property is_active : bool¶
Check if the transaction is still active.
- Returns:¶
- True if transaction can accept operations, False if
already committed or rolled back.
- Return type:¶
Example
Checking transaction state:
tx = db.begin_transaction() assert tx.is_active # True tx.commit() assert not tx.is_active # False # This would raise TransactionError # tx.execute("SELECT 1")Note
Useful for conditional logic and error handling.
- class structum_lab.database.interfaces.DatabaseInterface(*args, **kwargs)[source]¶
Bases: Protocol
Protocol for database managers in Structum Lab.
This is the main entry point for all database operations. Implementations must provide connection pooling, transaction management, and health monitoring.
The protocol abstracts away database-specific details, allowing applications to work with different backends (PostgreSQL, MySQL, SQLite) through a unified interface.
- Implementations:
Example
Basic usage with configuration:

    from structum_lab.plugins.database import SQLAlchemyDatabase

    # Initialize from config
    db = SQLAlchemyDatabase.from_config()

    # Or with explicit URL
    db = SQLAlchemyDatabase(url="postgresql://user:pass@localhost/mydb")

    # Use transaction context manager (recommended)
    with db.transaction() as conn:
        conn.execute(
            "INSERT INTO users (name, email) VALUES (:name, :email)",
            {"name": "John", "email": "john@example.com"}
        )
        user_id = conn.execute("SELECT last_insert_id()").fetchone()["id"]
        conn.execute(
            "INSERT INTO profiles (user_id, bio) VALUES (:uid, :bio)",
            {"uid": user_id, "bio": "Software engineer"}
        )
        # Commits automatically on success

    # Check health
    health = db.health_check()
    if health.status != HealthStatus.HEALTHY:
        log.warning("Database issues detected", result=health)
Note
Always use the
transaction()context manager for database operations. It ensures proper connection pooling, automatic commit/rollback, and resource cleanup.See also
ConnectionInterface: Connection protocolTransactionInterface: Transaction protocolHealthCheckResult: Health check result data class- property url : str¶
Get the database connection URL (sanitized).
- Returns:¶
- Database URL with password redacted/masked for security.
Example:
"postgresql://user:***@localhost:5432/mydb"
- Return type:¶
Example
Logging database configuration:
log.info("Connected to database", url=db.url) # Logs: postgresql://user:***@localhost/mydbNote
Passwords are automatically redacted to prevent accidental logging of credentials.
- property is_connected : bool¶
Check if the database connection pool is active.
Example
Conditional connection:
if not db.is_connected: db.connect() log.info("Database connection established")Note
Most operations call
connect()automatically if not connected. Explicit checking is mainly useful for health checks and diagnostics.
- connect() None[source]¶
Establish the database connection pool.
Creates and initializes the connection pool. Called automatically on first database operation if not already connected.
- Raises:¶
ConnectionError – If unable to connect to database.
ConfigurationError – If database URL or settings are invalid.
Example
Explicit connection during startup:
db = SQLAlchemyDatabase.from_config() try: db.connect() log.info("Database pool created", url=db.url) except ConnectionError as e: log.critical("Cannot connect to database", exc_info=True) sys.exit(1)Note
Usually not needed - the database connects automatically on first use. Explicit connection is useful for fail-fast behavior during startup.
See also
close(): Shutdown connection pool
- close() None[source]¶
Close all connections in the pool and release resources.
Should be called during application shutdown to ensure clean termination. Any ongoing transactions should be completed before closing.
- Raises:¶
RuntimeError – If called while transactions are active (implementation-specific).
Example
Application shutdown:
import atexit db = SQLAlchemyDatabase.from_config () atexit.register(db.close) # Ensure cleanup on exit # Or in FastAPI lifespan @asynccontextmanager async def lifespan(app: FastAPI): db.connect() yield db.close() # Clean shutdownWarning
After calling close(), the database instance should not be reused. Create a new instance for additional operations.
See also
connect(): Initialize connection pool
- get_connection() ConnectionInterface[source]¶
Acquire a connection from the pool.
Returns a connection that must be explicitly returned to the pool after use. Prefer using
transaction()instead.- Returns:¶
A database connection from the pool.
- Return type:¶
- Raises:¶
PoolExhaustedError – If no connections are available and pool is at maximum.
ConnectionError – If pool is not initialized.
Example
Manual connection management (not recommended):
conn = db.get_connection() try: result = conn.execute("SELECT * FROM users") users = result.fetchall() finally: # Must return connection to pool manually conn.close()Warning
Manual connection management is error-prone. Always prefer
transaction()context manager which handles connections automatically.See also
transaction(): Recommended connection API
- transaction() Iterator[ConnectionInterface][source]¶
Context manager for database transactions.
Provides automatic transaction management: commits on success, rolls back on exception. This is the recommended way to perform database operations.
- Yields:¶
ConnectionInterface – Database connection with active transaction.
- Raises:¶
DatabaseError – If transaction fails to start.
ConnectionError – If pool is exhausted or not connected.
Example
Recommended usage pattern:
# Single transaction
with db.transaction() as conn:
    conn.execute(
        "UPDATE accounts SET balance = balance - :amount WHERE id = :id",
        {"amount": 100, "id": 1}
    )
    conn.execute(
        "UPDATE accounts SET balance = balance + :amount WHERE id = :id",
        {"amount": 100, "id": 2}
    )
# Commits automatically if no exception

# Exception triggers rollback
try:
    with db.transaction() as conn:
        conn.execute("DELETE FROM important_data")
        raise ValueError("Validation failed")
except ValueError:
    pass  # Transaction automatically rolled back
Note
Transactions are isolated - changes are not visible to other connections until commit. Isolation level depends on database implementation (usually READ COMMITTED).
See also
TransactionInterface: Transaction protocol
ConnectionInterface: Connection protocol
- health_check() HealthCheckResult[source]¶
Check database connectivity and health status.
Performs a simple query to verify the database is responsive and measures latency. Useful for readiness probes and monitoring.
Example
Health check endpoint:
@app.get("/health/database") def database_health(): result = db.health_check() if result.status == HealthStatus.UNHEALTHY: raise HTTPException(503, detail=result.message) return { "status": result.status.value, "latency_ms": result.latency_ms, "message": result.message }Prometheus metrics:
result = db.health_check()

# Record health status
db_health_gauge.labels(database="main").set(
    1 if result.status == HealthStatus.HEALTHY else 0
)

# Record latency
if result.latency_ms:
    db_latency_histogram.observe(result.latency_ms)
Note
Health checks execute a lightweight query (usually SELECT 1). They should complete quickly (<100ms typically).
See also
HealthCheckResult: Result data class
HealthStatus: Health status enumeration
- structum_lab.database.interfaces.Database¶
alias of
DatabaseInterface
- structum_lab.database.interfaces.Connection¶
alias of
ConnectionInterface
- structum_lab.database.interfaces.Transaction¶
alias of
TransactionInterface
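The aliases above are simply shorter names for the corresponding protocols. As a minimal sketch of how they can be used in application type hints (the run_migration function and its SQL statement are illustrative assumptions, not part of Structum Lab):
from structum_lab.database.interfaces import Connection, Database

def run_migration(db: Database) -> None:
    # Database is an alias of DatabaseInterface, so any compliant implementation works
    with db.transaction() as conn:  # conn satisfies Connection (ConnectionInterface)
        conn.execute("ALTER TABLE users ADD COLUMN last_login TIMESTAMP")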
Authentication¶
Authentication Interfaces¶
Authentication interfaces for Structum Lab.
This module provides Protocol definitions for authentication operations. For concrete implementations, install structum-auth:
pip install structum-auth
- Usage:
>>> from structum_lab.plugins.auth import JWTAuthProvider
>>> auth = JWTAuthProvider.from_config()
- class structum_lab.auth.AuthInterface(*args, **kwargs)[source]¶
Bases:
Protocol
Protocol for authentication providers in Structum Lab.
This is the main entry point for all authentication operations. Implementations provide JWT-based authentication, password hashing, and token management.
The auth system is storage-agnostic - it doesn’t manage users directly. Instead, it uses UserRepositoryInterface to fetch user data, keeping authentication logic decoupled from storage.
- Implementations:
JWTAuthProvider (recommended)
Example
See usage in specific implementations like JWTAuthProvider.
- authenticate(username: str, password: str, user_repo: UserRepositoryInterface) TokenPair | None[source]¶
Authenticate a user.
- refresh(refresh_token: str, user_repo: UserRepositoryInterface) TokenPair | None[source]¶
Refresh access token.
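Neither method carries an inline example here, so the following is a minimal sketch of a login and token-refresh flow; user_repo stands for any UserRepositoryInterface implementation (such as the SQLAlchemy repository shown later), and the error handling is an illustrative assumption:
from structum_lab.plugins.auth import JWTAuthProvider

auth = JWTAuthProvider.from_config()

# Login: exchange credentials for a TokenPair (None means authentication failed)
tokens = auth.authenticate("john", "password123", user_repo)
if tokens is None:
    raise ValueError("Invalid credentials")  # illustrative error handling

# Later: trade the refresh token for a new access token
new_tokens = auth.refresh(tokens.refresh_token, user_repo)
if new_tokens is None:
    # Refresh token expired or revoked - the user must log in again
    ...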
- class structum_lab.auth.UserInterface(*args, **kwargs)[source]¶
Bases:
Protocol
Protocol for authenticated user entities in Structum Lab.
Applications must implement this protocol for their User model to integrate with the authentication system. The auth system never creates or modifies users - it only queries them via UserRepositoryInterface.
Example
Implementing UserInterface:
from dataclasses import dataclass

@dataclass
class User:
    id: str
    username: str
    hashed_password: str
    roles: list[str]
    permissions: set[str]

    def has_permission(self, permission: str) -> bool:
        # Check role-based permissions
        for role in self.roles:
            if permission in ROLE_PERMISSIONS.get(role, set()):
                return True
        # Check user-specific permissions
        return permission in self.permissions
Using with auth:
user = user_repo.find_by_username("john")
if user and user.has_permission("users:write"):
    # Allow operation
    pass
Note
This is a Protocol, not a base class. Your User model doesn’t need to inherit from this - just implement the required properties and methods.
See also
UserRepositoryInterface: Repository for user data access
AuthInterface: Authentication provider using users
- has_permission(permission: str) bool[source]¶
Check if user has a specific permission.
Example
Permission-based authorization:
@app.delete("/users/{user_id}") async def delete_user(user_id: str, current_user: User = Depends(get_current_user)): if not current_user.has_permission("users:delete"): raise HTTPException(403, "Permission denied") # Delete userRole-based permission mapping:
ROLE_PERMISSIONS = {
    "admin": {"users:read", "users:write", "users:delete"},
    "user": {"users:read"},
}

def has_permission(self, permission: str) -> bool:
    return any(
        permission in ROLE_PERMISSIONS.get(role, set())
        for role in self.roles
    )
Note
Permission format is application-defined. Use a consistent naming scheme (e.g., resource:action).
- property hashed_password : str¶
User’s hashed password.
Warning
Never store or transmit plain-text passwords. This property should only return hashed values.
Example
Password verification:
if auth.verify_password(input_password, user.hashed_password):
    # Password matches
    pass
See also
PasswordHasherInterface.hash(): Hash passwords
PasswordHasherInterface.verify(): Verify passwords
- property id : str¶
Unique identifier for the user.
Example
User ID in token payload:
token_payload = {"user_id": user.id, "exp": ...}
- property roles : list[str]¶
List of roles assigned to the user.
Example
Role-based access control:
if "admin" in user.roles: # Allow admin operation passNote
Roles should be lowercase strings. Use has_permission() for fine-grained permission checks.
- class structum_lab.auth.UserRepositoryInterface(*args, **kwargs)[source]¶
Bases:
Protocol
Protocol for user storage and retrieval in Structum Lab.
Applications implement this to connect the auth system with their database or user storage backend. The auth plugin does NOT manage user storage directly - it delegates all user operations to this repository.
This separation ensures the auth system remains storage-agnostic and can work with any database (PostgreSQL, MongoDB, etc.) or user service (LDAP, OAuth providers).
- Implementations:
Database-backed repository (SQLAlchemy, etc.)
External user service adapter (LDAP, Active Directory)
In-memory repository (testing only)
Example
SQLAlchemy repository implementation:
class SQLAlchemyUserRepository:
    def __init__(self, db: DatabaseInterface):
        self.db = db

    def find_by_username(self, username: str) -> User | None:
        with self.db.transaction() as conn:
            conn.execute(
                "SELECT * FROM users WHERE username = :username",
                {"username": username}
            )
            row = conn.fetchone()
            if not row:
                return None
            return User(
                id=row["id"],
                username=row["username"],
                hashed_password=row["password_hash"],
                roles=row.get("roles", []),
            )

    def find_by_id(self, user_id: str) -> User | None:
        with self.db.transaction() as conn:
            conn.execute(
                "SELECT * FROM users WHERE id = :id",
                {"id": user_id}
            )
            row = conn.fetchone()
            return User(**row) if row else None
Using with authentication:
user_repo = SQLAlchemyUserRepository(db)
auth = JWTAuthProvider.from_config()

tokens = auth.authenticate("john", "password123", user_repo)
if tokens:
    print(f"Access token: {tokens.access_token}")
Note
Repository is responsible for mapping storage format to UserInterface. It should handle serialization/deserialization of user data.
See also
UserInterface: User entity protocol
AuthInterface: Authentication provider using repositories
- find_by_id(user_id: str) UserInterface | None[source]¶
Find a user by their unique identifier.
- Parameters:¶
- Returns:¶
User if found, None otherwise.
- Return type:¶
UserInterface | None
Example
Loading user from token:
# After verifying access token
payload = auth.verify_access_token(token)
if payload:
    user = user_repo.find_by_id(payload["user_id"])
    if user:
        # User authenticated
        return user
Note
This method is called frequently (on every authenticated request). Consider caching user data for performance.
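As a sketch of that caching advice, the wrapper below memoizes find_by_id lookups for a short time; the CachedUserRepository class and the use of the third-party cachetools package are illustrative assumptions, not part of Structum Lab:
from cachetools import TTLCache

from structum_lab.auth import UserInterface, UserRepositoryInterface

class CachedUserRepository:
    # Illustrative wrapper: caches find_by_id results to avoid a DB hit on every request
    def __init__(self, inner: UserRepositoryInterface, maxsize: int = 1024, ttl: int = 60):
        self._inner = inner
        self._cache: TTLCache = TTLCache(maxsize=maxsize, ttl=ttl)

    def find_by_id(self, user_id: str) -> UserInterface | None:
        if user_id in self._cache:
            return self._cache[user_id]
        user = self._inner.find_by_id(user_id)
        if user is not None:
            self._cache[user_id] = user
        return user

    def find_by_username(self, username: str) -> UserInterface | None:
        # Username lookups happen only at login, so they are delegated uncached
        return self._inner.find_by_username(username)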
- find_by_username(username: str) UserInterface | None[source]¶
Find a user by username or email.
- Parameters:¶
- Returns:¶
User if found, None otherwise.
- Return type:¶
UserInterface | None
Example
Looking up user for authentication:
user = user_repo.find_by_username("john@example.com")
if user:
    # Verify password
    if auth.verify_password(password, user.hashed_password):
        return auth.create_tokens(user)
else:
    log.warning("Login attempt for unknown user", username=username)
Note
Implementation should normalize username (e.g., lowercase) before lookup. Consider using database indexes on username column for performance.
- class structum_lab.auth.PasswordHasherInterface(*args, **kwargs)[source]¶
Bases:
Protocol
Protocol for secure password hashing in Structum Lab.
Implementations must use cryptographically secure hashing algorithms (e.g., Argon2, bcrypt, scrypt). Never use fast hashes like MD5 or SHA-1 for passwords.
- Implementations:
Argon2Hasher (recommended)
BcryptHasher
- Example:
Using password hasher:
from structum_lab.plugins.auth.password import Argon2Hasher

hasher = Argon2Hasher()

# Hash password during registration
hashed = hasher.hash("user_password_123")
# Store hashed in database: user.hashed_password = hashed

# Verify during login
if hasher.verify("user_password_123", hashed):
    # Password matches
    return create_token(user)
else:
    # Invalid password
    raise AuthenticationError("Invalid credentials")
- Warning:
Never log, display, or store plain-text passwords. Always hash passwords immediately upon receipt.
- See Also:
AuthInterface: Auth provider using password hasher
UserInterface: User entity with hashed_password property
- hash(password: str) str[source]¶
Hash a plain-text password securely.
- Parameters:¶
- Returns:¶
- Hashed password string including algorithm identifier and salt.
Format is implementation-specific (e.g., Argon2:
$argon2id$v=19$...).
- Return type:¶
str
Example
Creating user with hashed password:
# During user registration
plain_password = request.form["password"]
hashed = auth.hash_password(plain_password)

user = User(
    id=generate_id(),
    username=request.form["username"],
    hashed_password=hashed,  # Store this
    roles=["user"]
)
user_repo.save(user)
Warning
Hashing is intentionally slow (100-500ms) to resist brute-force attacks. Do not hash passwords in tight loops or performance-critical paths.
Note
Each call generates a unique hash (due to random salt) even for the same password. This is expected and secure behavior.
- verify(password: str, hashed: str) bool[source]¶
Verify a plain-text password against a hash.
Example
Password verification during login:
# Get user from database
user = user_repo.find_by_username(username)
if not user:
    return None  # User not found

# Verify password
if auth.verify_password(password, user.hashed_password):
    # Authentication successful
    return auth.create_tokens(user)
else:
    # Invalid password
    log.warning("Failed login attempt", username=username)
    return None
Warning
Always use constant-time comparison internally to prevent timing attacks. Most modern hashing libraries handle this automatically.
Note
Returns False for invalid/malformed hashes rather than raising exceptions. This prevents information leakage about hash format.
- class structum_lab.auth.TokenPair(access_token: str, refresh_token: str, token_type: str = 'bearer', expires_at: datetime | None = None)[source]¶
Bases:
object
Data class containing JWT access and refresh token pair.
Example
Creating and using token pair:
tokens = TokenPair(
    access_token="eyJ0eXAiOiJKV1QiLCJhbGc...",
    refresh_token="eyJ0eXAiOiJKV1QiLCJhbGc...",
    token_type="bearer",
    expires_at=datetime.now() + timedelta(hours=1)
)

# Use in HTTP Authorization header
headers = {"Authorization": f"{tokens.token_type} {tokens.access_token}"}
Note
This class is frozen (immutable) to prevent accidental token modification. Tokens should be treated as opaque strings and never parsed by clients.
See also
AuthInterface.authenticate(): Method that returns token pairs
AuthInterface.refresh(): Refresh access tokens
- __init__(access_token: str, refresh_token: str, token_type: str = 'bearer', expires_at: datetime | None = None)¶
- structum_lab.auth.Auth¶
alias of
AuthInterface
- structum_lab.auth.User¶
alias of
UserInterface
- structum_lab.auth.UserRepository¶
alias of
UserRepositoryInterface
- structum_lab.auth.PasswordHasher¶
alias of
PasswordHasherInterface
Core authentication interfaces for Structum Lab.
This module defines the Protocols that any auth implementation must follow. Implementations are provided by the structum-auth plugin.
- Key Design Decision:
Auth does NOT manage database/storage. The application implements UserRepositoryInterface to provide user data. This keeps auth decoupled from any specific storage solution.
Example
>>> from structum_lab.plugins.auth import JWTAuthProvider
>>> auth = JWTAuthProvider.from_config()
>>> tokens = auth.authenticate("user", "password", user_repo)
- class structum_lab.auth.interfaces.TokenPair(access_token: str, refresh_token: str, token_type: str = 'bearer', expires_at: datetime | None = None)[source]¶
Bases:
object
Data class containing JWT access and refresh token pair.
Example
Creating and using token pair:
tokens = TokenPair(
    access_token="eyJ0eXAiOiJKV1QiLCJhbGc...",
    refresh_token="eyJ0eXAiOiJKV1QiLCJhbGc...",
    token_type="bearer",
    expires_at=datetime.now() + timedelta(hours=1)
)

# Use in HTTP Authorization header
headers = {"Authorization": f"{tokens.token_type} {tokens.access_token}"}
Note
This class is frozen (immutable) to prevent accidental token modification. Tokens should be treated as opaque strings and never parsed by clients.
See also
AuthInterface.authenticate(): Method that returns token pairs
AuthInterface.refresh(): Refresh access tokens
- __init__(access_token: str, refresh_token: str, token_type: str = 'bearer', expires_at: datetime | None = None)¶
- class structum_lab.auth.interfaces.UserInterface(*args, **kwargs)[source]¶
Bases:
Protocol
Protocol for authenticated user entities in Structum Lab.
Applications must implement this protocol for their User model to integrate with the authentication system. The auth system never creates or modifies users - it only queries them via UserRepositoryInterface.
Example
Implementing UserInterface:
from dataclasses import dataclass

@dataclass
class User:
    id: str
    username: str
    hashed_password: str
    roles: list[str]
    permissions: set[str]

    def has_permission(self, permission: str) -> bool:
        # Check role-based permissions
        for role in self.roles:
            if permission in ROLE_PERMISSIONS.get(role, set()):
                return True
        # Check user-specific permissions
        return permission in self.permissions
Using with auth:
user = user_repo.find_by_username("john")
if user and user.has_permission("users:write"):
    # Allow operation
    pass
Note
This is a Protocol, not a base class. Your User model doesn’t need to inherit from this - just implement the required properties and methods.
See also
UserRepositoryInterface: Repository for user data access
AuthInterface: Authentication provider using users
- property id : str¶
Unique identifier for the user.
Example
User ID in token payload:
token_payload = {"user_id": user.id, "exp": ...}
- property username : str¶
User’s username or email address.
Example
Username lookup during login:
user = user_repo.find_by_username(username)
if user and auth.verify_password(password, user.hashed_password):
    return auth.authenticate(...)
- property roles : list[str]¶
List of roles assigned to the user.
Example
Role-based access control:
if "admin" in user.roles: # Allow admin operation passNote
Roles should be lowercase strings. Use has_permission() for fine-grained permission checks.
- property hashed_password : str¶
User’s hashed password.
Warning
Never store or transmit plain-text passwords. This property should only return hashed values.
Example
Password verification:
if auth.verify_password(input_password, user.hashed_password):
    # Password matches
    pass
See also
PasswordHasherInterface.hash(): Hash passwords
PasswordHasherInterface.verify(): Verify passwords
- has_permission(permission: str) bool[source]¶
Check if user has a specific permission.
Example
Permission-based authorization:
@app.delete("/users/{user_id}") async def delete_user(user_id: str, current_user: User = Depends(get_current_user)): if not current_user.has_permission("users:delete"): raise HTTPException(403, "Permission denied") # Delete userRole-based permission mapping:
ROLE_PERMISSIONS = {
    "admin": {"users:read", "users:write", "users:delete"},
    "user": {"users:read"},
}

def has_permission(self, permission: str) -> bool:
    return any(
        permission in ROLE_PERMISSIONS.get(role, set())
        for role in self.roles
    )
Note
Permission format is application-defined. Use a consistent naming scheme (e.g., resource:action).
- class structum_lab.auth.interfaces.UserRepositoryInterface(*args, **kwargs)[source]¶
Bases:
Protocol
Protocol for user storage and retrieval in Structum Lab.
Applications implement this to connect the auth system with their database or user storage backend. The auth plugin does NOT manage user storage directly - it delegates all user operations to this repository.
This separation ensures the auth system remains storage-agnostic and can work with any database (PostgreSQL, MongoDB, etc.) or user service (LDAP, OAuth providers).
- Implementations:
Database-backed repository (SQLAlchemy, etc.)
External user service adapter (LDAP, Active Directory)
In-memory repository (testing only)
Example
SQLAlchemy repository implementation:
class SQLAlchemyUserRepository:
    def __init__(self, db: DatabaseInterface):
        self.db = db

    def find_by_username(self, username: str) -> User | None:
        with self.db.transaction() as conn:
            conn.execute(
                "SELECT * FROM users WHERE username = :username",
                {"username": username}
            )
            row = conn.fetchone()
            if not row:
                return None
            return User(
                id=row["id"],
                username=row["username"],
                hashed_password=row["password_hash"],
                roles=row.get("roles", []),
            )

    def find_by_id(self, user_id: str) -> User | None:
        with self.db.transaction() as conn:
            conn.execute(
                "SELECT * FROM users WHERE id = :id",
                {"id": user_id}
            )
            row = conn.fetchone()
            return User(**row) if row else None
Using with authentication:
user_repo = SQLAlchemyUserRepository(db)
auth = JWTAuthProvider.from_config()

tokens = auth.authenticate("john", "password123", user_repo)
if tokens:
    print(f"Access token: {tokens.access_token}")
Note
Repository is responsible for mapping storage format to UserInterface. It should handle serialization/deserialization of user data.
See also
UserInterface: User entity protocol
AuthInterface: Authentication provider using repositories
- find_by_username(username: str) UserInterface | None[source]¶
Find a user by username or email.
- Parameters:¶
- Returns:¶
User if found, None otherwise.
- Return type:¶
UserInterface | None
Example
Looking up user for authentication:
user = user_repo.find_by_username("john@example.com")
if user:
    # Verify password
    if auth.verify_password(password, user.hashed_password):
        return auth.create_tokens(user)
else:
    log.warning("Login attempt for unknown user", username=username)
Note
Implementation should normalize username (e.g., lowercase) before lookup. Consider using database indexes on username column for performance.
- find_by_id(user_id: str) UserInterface | None[source]¶
Find a user by their unique identifier.
- Parameters:¶
- Returns:¶
User if found, None otherwise.
- Return type:¶
UserInterface | None
Example
Loading user from token:
# After verifying access token
payload = auth.verify_access_token(token)
if payload:
    user = user_repo.find_by_id(payload["user_id"])
    if user:
        # User authenticated
        return user
Note
This method is called frequently (on every authenticated request). Consider caching user data for performance.
- class structum_lab.auth.interfaces.PasswordHasherInterface(*args, **kwargs)[source]¶
Bases:
Protocol
Protocol for secure password hashing in Structum Lab.
Implementations must use cryptographically secure hashing algorithms (e.g., Argon2, bcrypt, scrypt). Never use fast hashes like MD5 or SHA-1 for passwords.
- Implementations:
Argon2Hasher (recommended)
BcryptHasher
- Example:
Using password hasher:
from structum_lab.plugins.auth.password import Argon2Hasher

hasher = Argon2Hasher()

# Hash password during registration
hashed = hasher.hash("user_password_123")
# Store hashed in database: user.hashed_password = hashed

# Verify during login
if hasher.verify("user_password_123", hashed):
    # Password matches
    return create_token(user)
else:
    # Invalid password
    raise AuthenticationError("Invalid credentials")
- Warning:
Never log, display, or store plain-text passwords. Always hash passwords immediately upon receipt.
- See Also:
AuthInterface: Auth provider using password hasher
UserInterface: User entity with hashed_password property
- hash(password: str) str[source]¶
Hash a plain-text password securely.
- Parameters:¶
- Returns:¶
- Hashed password string including algorithm identifier and salt.
Format is implementation-specific (e.g., Argon2:
$argon2id$v=19$...).
- Return type:¶
str
Example
Creating user with hashed password:
# During user registration plain_password = request.form["password"] hashed = auth.hash_password(plain_password) user = User( id=generate_id(), username=request.form["username"], hashed_password=hashed, # Store this roles=["user"] ) user_repo.save(user)Warning
Hashing is intentionally slow (100-500ms) to resist brute-force attacks. Do not hash passwords in tight loops or performance-critical paths.
Note
Each call generates a unique hash (due to random salt) even for the same password. This is expected and secure behavior.
- verify(password: str, hashed: str) bool[source]¶
Verify a plain-text password against a hash.
Example
Password verification during login:
# Get user from database
user = user_repo.find_by_username(username)
if not user:
    return None  # User not found

# Verify password
if auth.verify_password(password, user.hashed_password):
    # Authentication successful
    return auth.create_tokens(user)
else:
    # Invalid password
    log.warning("Failed login attempt", username=username)
    return None
Warning
Always use constant-time comparison internally to prevent timing attacks. Most modern hashing libraries handle this automatically.
Note
Returns False for invalid/malformed hashes rather than raising exceptions. This prevents information leakage about hash format.
- class structum_lab.auth.interfaces.AuthInterface(*args, **kwargs)[source]¶
Bases:
Protocol
Protocol for authentication providers in Structum Lab.
This is the main entry point for all authentication operations. Implementations provide JWT-based authentication, password hashing, and token management.
The auth system is storage-agnostic - it doesn’t manage users directly. Instead, it uses UserRepositoryInterface to fetch user data, keeping authentication logic decoupled from storage.
- Implementations:
JWTAuthProvider (recommended)
Example
See usage in specific implementations like JWTAuthProvider.
- authenticate(username: str, password: str, user_repo: UserRepositoryInterface) TokenPair | None[source]¶
Authenticate a user.
- refresh(refresh_token: str, user_repo: UserRepositoryInterface) TokenPair | None[source]¶
Refresh access token.
- structum_lab.auth.interfaces.User¶
alias of
UserInterface
- structum_lab.auth.interfaces.UserRepository¶
alias of
UserRepositoryInterface
- structum_lab.auth.interfaces.Auth¶
alias of
AuthInterface
- structum_lab.auth.interfaces.PasswordHasher¶
alias of
PasswordHasherInterface
Validation¶
Core Validation Interfaces.
This module defines the protocols for system validation. Plugins can implement Validator to participate in the bootstrap process without depending directly on the structum-bootstrap package.
- class structum_lab.validation.ValidationContext(*args, **kwargs)[source]¶
Bases:
Protocol
Context object passed to validators to collect results.
- class structum_lab.validation.Validator(*args, **kwargs)[source]¶
Bases:
Protocol
Protocol for any component that can perform validation.
- validate(context: ValidationContext) None[source]¶
Execute validation logic and update the context.
- Parameters:¶
- context: ValidationContext¶
The bootstrap context to record success/failure/warnings.
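Since no inline example is given for these protocols, here is a minimal sketch of a plugin-side validator that checks required configuration keys during bootstrap; the RequiredSettingsValidator class, the key names, and the add_error reporting method on ValidationContext are assumptions for illustration, as the context's exact recording API is not documented above:
from structum_lab.config import get_config
from structum_lab.validation import ValidationContext, Validator

class RequiredSettingsValidator:
    # Hypothetical validator: reports a failure for every missing required setting
    REQUIRED_KEYS = ("database.url", "logging.level")

    def validate(self, context: ValidationContext) -> None:
        config = get_config()
        for key in self.REQUIRED_KEYS:
            if not config.has(key):
                # add_error is an assumed ValidationContext method for recording failures
                context.add_error(f"Missing required setting: {key}")

# Duck typing: no inheritance needed - any object with a compatible validate() satisfies Validator
validator: Validator = RequiredSettingsValidator()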