Core Module API Reference

This section documents the core interfaces and protocols that define Structum Lab’s architecture.

Structum Lab: Structum Core

Documentation Python 3.11+ License: Apache-2.0

Structum Core è il componente fondante di Structum Lab che definisce i protocolli, le interfacce base e le utility condivise.

🚀 Installazione

pip install structum-lab

📚 Documentazione Completa

La documentazione completa, incluse le guide API e gli esempi di configurazione, è disponibile su: 👉 https://structum-lab.pages.dev/

✨ Funzionalità Principali

  • Protocolli Base: Definisce le interfacce per Configurazione, Logging e altri servizi core.

  • Utility Condivise: Strumenti comuni per lo sviluppo di plugin e applicazioni.

  • Struttura Modulare: Supporta l’architettura a plugin del framework.

Integrazione nativa con l’ecosistema Structum.

Core Package

Configuration

Configuration Interface

Sottosistema di configurazione di Structum.

Questo modulo espone l’API pubblica ufficiale per l’accesso e la gestione della configurazione. Tutto il codice applicativo e i plugin devono interagire esclusivamente tramite queste funzioni.

Esempio d’uso:

from structum.config import get_config

cfg = get_config() cfg.set(“app.mode”, “production”)

structum_lab.config.ConfigInterface

alias of ConfigProviderInterface

structum_lab.config.get_config() ConfigProviderInterface

Restituisce il provider di configurazione globale (Caricamento Lazy e Dinamico).

Se nessun provider è stato registrato, viene istanziato automaticamente il provider JSON di fallback basato sulla standard library.

Returns:

Un’implementazione di ConfigInterface.

structum_lab.config.set_config_provider(provider: ConfigProviderInterface | type[ConfigProviderInterface]) None[source]

Registra un provider di configurazione personalizzato.

Questa funzione viene tipicamente chiamata da un plugin in fase di inizializzazione. Il provider registrato sostituisce globalmente quello precedente.

Parameters:
provider: ConfigProviderInterface | type[ConfigProviderInterface]

Istanza o classe di un provider compatibile con ConfigInterface.

class structum_lab.config.JSONConfigProvider[source]

Bases: object

Provider di configurazione basato su file JSON.

Caratteristiche: - Persistenza automatica su filesystem - Supporto a chiavi annidate tramite dot notation - Nessuna dipendenza esterna

__init__()[source]
get(key: str, default: Any = None) Any[source]

Restituisce il valore associato a una chiave (dot notation supportata).

has(key: str) bool[source]

Verifica l’esistenza di una chiave di configurazione.

reload() None[source]

Ricarica la configurazione dal file JSON gestendo errori di corruzione.

save() None[source]

Scrive la configurazione corrente su file JSON.

set(key: str, value: Any) None[source]

Imposta una chiave di configurazione e salva immediatamente su disco.

Interfaccia del sistema di configurazione di Structum.

Questo modulo definisce il contratto formale che tutti i provider di configurazione devono rispettare. Il core di Structum dipende esclusivamente da questa interfaccia e non da implementazioni concrete.

L’architettura è pensata per essere estendibile tramite plugin: provider avanzati (Dynaconf, database, servizi remoti, ecc.) possono essere registrati in fase di bootstrap senza modificare il core.

class structum_lab.config.interface.ConfigProviderInterface(*args, **kwargs)[source]

Bases: Protocol

Protocol defining the interface for configuration providers.

This interface uses typing.Protocol to enable duck typing: explicit inheritance is not required. Any object implementing these methods with compatible signatures is considered a valid provider.

This approach maximizes flexibility and reduces coupling between core and plugins.

Implementations:

Example

Using a configuration provider:

from structum_lab.config import get_config_provider

config = get_config_provider()

# Get value with fallback
db_host = config.get("database.host", default="localhost")

# Set value
config.set("database.port", 5432)

# Check existence
if config.has("database.password"):
    password = config.get("database.password")

# Persist changes
config.save()

Note

All providers should be thread-safe and support hierarchical key access using dot-notation (e.g., “database.pool.size”).

See also

get_config_provider(): Retrieve the global configuration provider set_config_provider(): Register a custom provider

get(key: str, default: Any = None) Any[source]

Retrieve a configuration value by key.

Supports dot-notation for nested configuration values (e.g., “database.pool.size” accesses nested dictionaries).

Parameters:
key : str

Configuration key to retrieve. Supports dot-notation for hierarchical access.

default : Any, optional

Fallback value if key doesn’t exist. Defaults to None.

Returns:

The configuration value, or default if key not found.

Return type:

Any

Raises:

KeyError – If key not found and default not provided (implementation-specific behavior).

Example

Retrieving nested configuration:

# Config: {"database": {"host": "localhost", "port": 5432}}
host = config.get("database.host")  # "localhost"
port = config.get("database.port")  # 5432

# With fallback
timeout = config.get("database.timeout", 30)  # 30

See also

set(): Set a configuration value has(): Check if a key exists

set(key: str, value: Any) None[source]

Set a configuration value.

Persistence behavior depends on the concrete provider implementation. Changes may be in-memory only until save() is called.

Parameters:
key : str

Configuration key to set. Supports dot-notation to create nested structures.

value : Any

Value to associate with the key. Can be any JSON-serializable type.

Example

Setting configuration values:

# Simple value
config.set("app.debug", True)

# Nested structure (creates intermediate dicts)
config.set("database.pool.size", 10)

# Complex value
config.set("servers", ["srv1", "srv2", "srv3"])

Warning

Changes are not persisted to storage until save() is called (for file-based providers). In-memory providers lose changes on restart.

See also

get(): Retrieve a configuration value save(): Persist changes to storage

has(key: str) bool[source]

Check if a configuration key exists.

Parameters:
key : str

Configuration key to check. Supports dot-notation.

Returns:

True if the key exists, False otherwise.

Return type:

bool

Example

Checking key existence:

if config.has("database.password"):
    password = config.get("database.password")
else:
    raise ValueError("Database password not configured")

Note

A key can exist with a None value. Use get() to distinguish between missing keys and None values.

save() None[source]

Persist configuration changes to underlying storage.

For file-based providers, writes changes to disk. For remote providers, may trigger a commit or synchronization. For in-memory providers, this may be a no-op.

Raises:
  • IOError – If unable to write to storage (file permissions, disk full).

  • RuntimeError – If provider doesn’t support persistence.

Example

Saving configuration:

config.set("app.version", "2.0.0")
config.set("app.build", 123)
config.save()  # Persist both changes

Warning

Unsaved changes will be lost on process termination. Call save() periodically for critical configuration updates.

See also

reload(): Discard changes and reload from storage

reload() None[source]

Reload configuration from persistent storage.

Discards all unsaved in-memory changes and reloads the configuration from the underlying storage source.

Raises:

IOError – If unable to read from storage.

Example

Reloading configuration:

config.set("temp.value", 123)  # In-memory change
config.reload()  # Discards temp.value

# Now config reflects disk state
assert not config.has("temp.value")

Warning

All unsaved changes will be permanently lost. Consider calling save() before reload if needed.

See also

save(): Persist changes before reloading

__init__(*args, **kwargs)
structum_lab.config.interface.get_config_provider() ConfigProviderInterface[source]

Restituisce il provider di configurazione globale (Caricamento Lazy e Dinamico).

Se nessun provider è stato registrato, viene istanziato automaticamente il provider JSON di fallback basato sulla standard library.

Returns:

Un’implementazione di ConfigInterface.

structum_lab.config.interface.set_config_provider(provider: ConfigProviderInterface | type[ConfigProviderInterface]) None[source]

Registra un provider di configurazione personalizzato.

Questa funzione viene tipicamente chiamata da un plugin in fase di inizializzazione. Il provider registrato sostituisce globalmente quello precedente.

Parameters:
provider: ConfigProviderInterface | type[ConfigProviderInterface]

Istanza o classe di un provider compatibile con ConfigInterface.

Configuration Manager

Provider di configurazione JSON per Structum.

Questa implementazione rappresenta il fallback minimale basato esclusivamente sulla standard library Python. È pensata per garantire un funzionamento immediato del framework in assenza di plugin avanzati.

Per casi d’uso complessi (multi-source, validazione, override per ambiente) si raccomanda l’uso di provider esterni come Dynaconf.

class structum_lab.config.manager.JSONConfigProvider[source]

Bases: object

Provider di configurazione basato su file JSON.

Caratteristiche: - Persistenza automatica su filesystem - Supporto a chiavi annidate tramite dot notation - Nessuna dipendenza esterna

__init__()[source]
get(key: str, default: Any = None) Any[source]

Restituisce il valore associato a una chiave (dot notation supportata).

set(key: str, value: Any) None[source]

Imposta una chiave di configurazione e salva immediatamente su disco.

has(key: str) bool[source]

Verifica l’esistenza di una chiave di configurazione.

save() None[source]

Scrive la configurazione corrente su file JSON.

reload() None[source]

Ricarica la configurazione dal file JSON gestendo errori di corruzione.

Logging

Logging Interfaces

Logging and Metrics interface for Structum. Provides fallback implementations that can be replaced by plugins.

class structum_lab.logging.StandardLoggerAdapter(logger: Logger)[source]

Bases: object

Fallback implementation that adapts standard logging.Logger to LoggerInterface.

It captures structured arguments (**kwargs) and places them into the ‘extra’ dictionary, creating a rudimentary structured logging experience even without advanced plugins.

__init__(logger: Logger)[source]
debug(message: str, **kwargs: Any) None[source]
info(message: str, **kwargs: Any) None[source]
warning(message: str, **kwargs: Any) None[source]
error(message: str, **kwargs: Any) None[source]
critical(message: str, **kwargs: Any) None[source]
structum_lab.logging.get_logger(name: str) LoggerInterface[source]

Returns a logger instance conforming to LoggerInterface.

By default, this returns a StandardLoggerAdapter wrapping the stdlib logger. Plugins (like structum_observability) should patch this function to return their own implementation (e.g., a structlog BoundLogger).

class structum_lab.logging.MetricsCollectorProtocol(*args, **kwargs)[source]

Bases: Protocol

increment(metric: str, labels: dict[str, str] | None = None) None[source]
observe(metric: str, value: float, labels: dict[str, str] | None = None) None[source]
__init__(*args, **kwargs)
class structum_lab.logging.NullMetrics[source]

Bases: object

Implementazione ‘Null Object’ per le metriche (fallback).

increment(metric: str, labels: dict[str, str] | None = None) None[source]
observe(metric: str, value: float, labels: dict[str, str] | None = None) None[source]
structum_lab.logging.set_metrics_collector(collector: MetricsCollectorProtocol) None[source]

Permette ai plugin di registrare il proprio collettore di metriche.

structum_lab.logging.configure_logging(level: str = 'INFO', format: str = 'json') None[source]

Combines basic logging configuration. Plugins should patch this to provide advanced setup.

structum_lab.logging.get_logger_backend() str[source]

Returns the name of the active logging backend.

structum_lab.logging.set_context(**kwargs: Any) None[source]

Sets global context variables (Fallback: No-op).

structum_lab.logging.clear_context() None[source]

Clears global context variables (Fallback: No-op).

structum_lab.logging.bind_context(**kwargs: Any)[source]

Context manager for temporary context (Fallback: yield).

Logging Interface Definition.

This module defines the LoggerInterface Protocol, which establishes the contract for all loggers in the Structum ecosystem. This ensures that the core framework and plugins can communicate logging intentions without depending on concrete implementations (Dependency Inversion).

class structum_lab.logging.interfaces.LoggerInterface(*args, **kwargs)[source]

Bases: Protocol

Protocol for structured logging in Structum Lab.

All logging implementations (stdlib adapter, structlog wrapper, custom loggers) must conform to this interface. It follows the standard Python logger API but explicitly supports **kwargs for structured contextual data.

The protocol uses @runtime_checkable to enable isinstance() checks at runtime, useful for validation and debugging.

Implementations:
  • StructlogAdapter

  • Standard library fallback (internal)

Example

Basic logging with structured context:

from structum_lab.logging import get_logger

log = get_logger(__name__)

# Simple message
log.info("Application started")

# With structured data
log.info("User logged in", user_id=123, ip="192.168.1.1")

# Error with exception
try:
    risky_operation()
except Exception as e:
    log.error("Operation failed", exc_info=True, operation="risky")

Note

All methods accept **kwargs for structured logging. Keys should be valid Python identifiers and values should be JSON-serializable. Structured data enables powerful log aggregation and analysis.

See also

get_logger(): Get a logger instance configure_logging(): Configure logging backend

debug(message: str, **kwargs: Any) None[source]

Log a message at DEBUG level.

Debug-level logs are typically used for detailed diagnostic information useful during development and troubleshooting.

Parameters:
message : str

The log message.

**kwargs : Any

Structured context data (key-value pairs).

Example

Debug logging with context:

log.debug("Cache lookup", key="user:123", hit=True, latency_ms=2.5)
log.debug("SQL query", query="SELECT * FROM users", params={"id": 1})

Note

Debug logs are typically disabled in production for performance.

info(message: str, **kwargs: Any) None[source]

Log a message at INFO level.

Info-level logs track general application flow and significant events (e.g., service started, configuration loaded, request completed).

Parameters:
message : str

The log message.

**kwargs : Any

Structured context data (key-value pairs).

Example

Info logging for application events:

log.info("Server started", host="0.0.0.0", port=8000)
log.info("Request completed", path="/api/users", status=200, duration_ms=45)
log.info("Database connected", driver="postgresql", pool_size=10)

Note

This is the default log level for production environments.

warning(message: str, **kwargs: Any) None[source]

Log a message at WARNING level.

Warning-level logs indicate potentially problematic situations that don’t prevent the application from functioning but deserve attention.

Parameters:
message : str

The log message.

**kwargs : Any

Structured context data (key-value pairs).

Example

Warning logging for recoverable issues:

log.warning("Deprecated API used", endpoint="/old/api", caller="legacy_client")
log.warning("High memory usage", usage_mb=1024, threshold_mb=800)
log.warning("Retry attempt", operation="db_connect", attempt=2, max_attempts=3)

Note

Warnings should be actionable - something that may need fixing but doesn’t require immediate attention.

error(message: str, **kwargs: Any) None[source]

Log a message at ERROR level.

Error-level logs indicate failures that prevent specific operations from completing but don’t crash the application.

Parameters:
message : str

The log message.

**kwargs : Any

Structured context data. Common keys: - exc_info (bool): Include exception traceback if True. - error_code (str): Application-specific error code.

Example

Error logging with exception info:

try:
    process_payment(order_id)
except PaymentError as e:
    log.error(
        "Payment processing failed",
        exc_info=True,
        order_id=order_id,
        error_code="PAYMENT_DECLINED"
    )

Warning

Errors indicate problems requiring investigation. Set up alerts for error rate thresholds in production.

See also

critical(): For application-critical failures

critical(message: str, **kwargs: Any) None[source]

Log a message at CRITICAL level.

Critical-level logs indicate severe errors that may cause the application to abort or enter an unstable state.

Parameters:
message : str

The log message.

**kwargs : Any

Structured context data (key-value pairs).

Example

Critical logging for fatal errors:

log.critical(
    "Database connection pool exhausted",
    active_connections=100,
    max_connections=100,
    pending_requests=50
)

log.critical(
    "Configuration file corrupted",
    path="/etc/app/config.toml",
    exc_info=True
)

Warning

Critical logs should trigger immediate alerts. These indicate situations where the application cannot continue safely.

Note

After logging a critical message, the application may need to shut down gracefully or enter a safe degraded mode.

__init__(*args, **kwargs)

Monitoring

Monitoring Interfaces

Monitoring subsystem for Structum Framework.

Provides a pluggable metrics interface following the same pattern as logging. Core provides the interface, plugins provide implementations (Prometheus, etc).

class structum_lab.monitoring.MetricsInterface(*args, **kwargs)[source]

Bases: Protocol

Protocol for metrics emission in Structum Lab.

This interface enables plugins to emit metrics in a backend-agnostic manner. Implementations can target Prometheus, StatsD, Datadog, CloudWatch, or other monitoring systems.

Implementations:
  • PrometheusMetrics

  • StatsDMetrics

  • NoOpMetrics: Fallback no-op implementation

Example

Basic metrics usage:

from structum_lab.monitoring import get_metrics

metrics = get_metrics()

# Counter: track events
metrics.increment("api.requests", tags={"endpoint": "/users", "method": "GET"})
metrics.increment("api.errors", tags={"endpoint": "/users", "status": "500"})

# Gauge: current values
metrics.gauge("cache.size", len(cache), tags={"cache": "redis"})
metrics.gauge("db.connections.active", pool.active_count())

# Timing: operation duration
import time
start = time.time()
process_request()
duration = time.time() - start
metrics.timing("api.duration", duration, tags={"endpoint": "/users"})

# Histogram: value distributions
metrics.histogram("response.size", len(response_body), tags={"endpoint": "/users"})

Note

All metric methods are non-blocking and should not raise exceptions. Failed metric emissions should be logged but not disrupt application flow.

See also

LoggerInterface: Logging interface get_metrics(): Retrieve metrics instance

__init__(*args, **kwargs)
gauge(name: str, value: float, tags: dict[str, str] | None = None) None[source]

Set a gauge metric to a specific value.

Gauges represent current values that can increase or decrease (e.g., memory usage, active connections, queue size). Each call overwrites the previous value.

Parameters:
name : str

Metric name (e.g., memory.usage.bytes).

value : float

Current metric value.

tags : Optional[Dict[str, str]]

Labels for the metric. Defaults to None.

Example

System metrics:

import psutil

# Memory usage
mem = psutil.virtual_memory()
metrics.gauge("system.memory.used", mem.used, tags={"host": hostname})
metrics.gauge("system.memory.percent", mem.percent)

# CPU usage
cpu_percent = psutil.cpu_percent(interval=1)
metrics.gauge("system.cpu.percent", cpu_percent)

Application metrics:

# Database connection pool
metrics.gauge("db.pool.active", db.pool.active_connections())
metrics.gauge("db.pool.idle", db.pool.idle_connections())

# Queue size
metrics.gauge("queue.length", len(task_queue), tags={"queue": "background"})

# Cache size
metrics.gauge("cache.entries", cache.size(), tags={"cache": "redis"})

Warning

Gauges represent point-in-time values. For cumulative values, use increment().

Note

  • Emit gauges periodically (e.g., every minute) for accurate monitoring

  • Consider using background jobs for system metric collection

See also

increment(): For cumulative counters

histogram(name: str, value: float, tags: dict[str, str] | None = None) None[source]

Record a value in a histogram.

Histograms track value distributions (e.g., request sizes, payload sizes). Unlike gauges, all values are recorded and aggregated into buckets.

Parameters:
name : str

Metric name (e.g., request.body.size).

value : float

Value to record.

tags : Optional[Dict[str, str]]

Labels for the metric. Defaults to None.

Example

Request/response sizes:

# Request payload size
request_size = len(request.body)
metrics.histogram(
    "http.request.size",
    request_size,
    tags={"endpoint": request.path, "content_type": request.content_type}
)

# Response size
response_size = len(response.body)
metrics.histogram(
    "http.response.size",
    response_size,
    tags={"endpoint": request.path}
)

Batch sizes:

# Processing batch sizes
batch = fetch_batch_from_queue()
metrics.histogram(
    "processing.batch.size",
    len(batch),
    tags={"queue": "tasks"}
)

Query result counts:

rows = db.execute("SELECT * FROM users").fetchall()
metrics.histogram(
    "db.query.rows",
    len(rows),
    tags={"table": "users"}
)

Note

  • Histograms are ideal for analyzing value distributions (percentiles, averages)

  • Use timing() specifically for duration measurements

  • Avoid extremely high-cardinality values (>1000 unique values per second)

See also

timing(): Specialized for duration measurements gauge(): For current point-in-time values

increment(name: str, value: float = 1.0, tags: dict[str, str] | None = None) None[source]

Increment a counter metric.

Counters track cumulative values that only increase (e.g., request count, error count). Use for counting events over time.

Parameters:
name : str

Metric name using dot notation (e.g., api.requests.total).

value : float

Amount to increment by. Defaults to 1.0.

tags : Optional[Dict[str, str]]

Labels/dimensions for the metric (e.g., {"endpoint": "/users", "status": "200"}). Defaults to None.

Example

Tracking API requests:

# Request counter
metrics.increment("http.requests", tags={
    "method": request.method,
    "endpoint": request.path,
    "status": str(response.status_code)
})

# Error counter
try:
    risky_operation()
except Exception:
    metrics.increment("operations.errors", tags={"operation": "risky"})
    raise

Cache statistics:

# Cache hits/misses
if key in cache:
    metrics.increment("cache.hits", tags={"cache": "redis"})
    return cache[key]
else:
    metrics.increment("cache.misses", tags={"cache": "redis"})
    return fetch_from_db(key)

Note

  • Counter values should never decrease

  • Use consistent tag keys across increments for proper aggregation

  • Avoid high-cardinality tags (e.g., user IDs) that create too many series

See also

gauge(): For values that can increase/decrease

timing(name: str, value: float, tags: dict[str, str] | None = None) None[source]

Record a timing/duration metric.

Used for tracking operation latency and performance.

Typically implemented as a histogram with predefined buckets.

Args:

name (str): Metric name (e.g., api.request.duration). value (float): Duration in seconds (use fractional seconds for sub-second precision). tags (Optional[Dict[str, str]]): Labels for the metric. Defaults to None.

Example:

API endpoint timing:

import time

start = time.time()
try:
    result = handle_request(request)
    return result
finally:
    duration = time.time() - start
    metrics.timing(
        "api.request.duration",
        duration,
        tags={"endpoint": request.path, "method": request.method}
    )

Database query timing:

start = time.perf_counter()
rows = db.execute(query)
duration = time.perf_counter() - start

metrics.timing(
    "db.query.duration",
    duration,
    tags={"table": "users", "operation": "select"}
)

Context manager for timing:

from contextlib import contextmanager

@contextmanager
def track_time(operation: str):
    start = time.time()
    try:
        yield
    finally:
        metrics.timing(f"{operation}.duration", time.time() - start)

with track_time("data_processing"):
    process_large_dataset()
Note:
  • Use seconds as the unit for consistency

  • time.perf_counter() is more accurate than time.time() for durations

  • Most backends convert to milliseconds for display

See Also:

histogram(): For general value distributions

class structum_lab.monitoring.NoOpMetrics[source]

Bases: object

Fallback no-op implementation when no monitoring plugin is installed.

gauge(name: str, value: float, tags: dict[str, str] | None = None) None[source]

No-op gauge.

histogram(name: str, value: float, tags: dict[str, str] | None = None) None[source]

No-op histogram.

increment(name: str, value: float = 1.0, tags: dict[str, str] | None = None) None[source]

No-op increment.

timing(name: str, value: float, tags: dict[str, str] | None = None) None[source]

No-op timing.

structum_lab.monitoring.get_metrics(namespace: str = 'structum') MetricsInterface[source]

Get metrics emitter for a namespace.

Parameters:
namespace: str = 'structum'

Metric namespace prefix (e.g., ‘structum.config’)

Returns:

MetricsInterface instance (NoOp by default, or patched by plugin)

Example

>>> metrics = get_metrics("structum.config")
>>> metrics.increment("operations.total", tags={"operation": "get"})
structum_lab.monitoring.set_metrics_backend(backend: MetricsInterface) None[source]

Set the global metrics backend.

This is called by monitoring plugins (e.g., structum_observability) to inject their implementation.

Parameters:
backend: MetricsInterface

MetricsInterface implementation

Monitoring Interfaces for Structum Framework.

Provides protocol definitions for metrics emission following the same pattern as LoggerInterface.

class structum_lab.monitoring.interfaces.MetricsInterface(*args, **kwargs)[source]

Bases: Protocol

Protocol for metrics emission in Structum Lab.

This interface enables plugins to emit metrics in a backend-agnostic manner. Implementations can target Prometheus, StatsD, Datadog, CloudWatch, or other monitoring systems.

Implementations:
  • PrometheusMetrics

  • StatsDMetrics

  • NoOpMetrics: Fallback no-op implementation

Example

Basic metrics usage:

from structum_lab.monitoring import get_metrics

metrics = get_metrics()

# Counter: track events
metrics.increment("api.requests", tags={"endpoint": "/users", "method": "GET"})
metrics.increment("api.errors", tags={"endpoint": "/users", "status": "500"})

# Gauge: current values
metrics.gauge("cache.size", len(cache), tags={"cache": "redis"})
metrics.gauge("db.connections.active", pool.active_count())

# Timing: operation duration
import time
start = time.time()
process_request()
duration = time.time() - start
metrics.timing("api.duration", duration, tags={"endpoint": "/users"})

# Histogram: value distributions
metrics.histogram("response.size", len(response_body), tags={"endpoint": "/users"})

Note

All metric methods are non-blocking and should not raise exceptions. Failed metric emissions should be logged but not disrupt application flow.

See also

LoggerInterface: Logging interface get_metrics(): Retrieve metrics instance

increment(name: str, value: float = 1.0, tags: dict[str, str] | None = None) None[source]

Increment a counter metric.

Counters track cumulative values that only increase (e.g., request count, error count). Use for counting events over time.

Parameters:
name : str

Metric name using dot notation (e.g., api.requests.total).

value : float

Amount to increment by. Defaults to 1.0.

tags : Optional[Dict[str, str]]

Labels/dimensions for the metric (e.g., {"endpoint": "/users", "status": "200"}). Defaults to None.

Example

Tracking API requests:

# Request counter
metrics.increment("http.requests", tags={
    "method": request.method,
    "endpoint": request.path,
    "status": str(response.status_code)
})

# Error counter
try:
    risky_operation()
except Exception:
    metrics.increment("operations.errors", tags={"operation": "risky"})
    raise

Cache statistics:

# Cache hits/misses
if key in cache:
    metrics.increment("cache.hits", tags={"cache": "redis"})
    return cache[key]
else:
    metrics.increment("cache.misses", tags={"cache": "redis"})
    return fetch_from_db(key)

Note

  • Counter values should never decrease

  • Use consistent tag keys across increments for proper aggregation

  • Avoid high-cardinality tags (e.g., user IDs) that create too many series

See also

gauge(): For values that can increase/decrease

gauge(name: str, value: float, tags: dict[str, str] | None = None) None[source]

Set a gauge metric to a specific value.

Gauges represent current values that can increase or decrease (e.g., memory usage, active connections, queue size). Each call overwrites the previous value.

Parameters:
name : str

Metric name (e.g., memory.usage.bytes).

value : float

Current metric value.

tags : Optional[Dict[str, str]]

Labels for the metric. Defaults to None.

Example

System metrics:

import psutil

# Memory usage
mem = psutil.virtual_memory()
metrics.gauge("system.memory.used", mem.used, tags={"host": hostname})
metrics.gauge("system.memory.percent", mem.percent)

# CPU usage
cpu_percent = psutil.cpu_percent(interval=1)
metrics.gauge("system.cpu.percent", cpu_percent)

Application metrics:

# Database connection pool
metrics.gauge("db.pool.active", db.pool.active_connections())
metrics.gauge("db.pool.idle", db.pool.idle_connections())

# Queue size
metrics.gauge("queue.length", len(task_queue), tags={"queue": "background"})

# Cache size
metrics.gauge("cache.entries", cache.size(), tags={"cache": "redis"})

Warning

Gauges represent point-in-time values. For cumulative values, use increment().

Note

  • Emit gauges periodically (e.g., every minute) for accurate monitoring

  • Consider using background jobs for system metric collection

See also

increment(): For cumulative counters

timing(name: str, value: float, tags: dict[str, str] | None = None) None[source]

Record a timing/duration metric.

Used for tracking operation latency and performance.

Typically implemented as a histogram with predefined buckets.

Args:

name (str): Metric name (e.g., api.request.duration). value (float): Duration in seconds (use fractional seconds for sub-second precision). tags (Optional[Dict[str, str]]): Labels for the metric. Defaults to None.

Example:

API endpoint timing:

import time

start = time.time()
try:
    result = handle_request(request)
    return result
finally:
    duration = time.time() - start
    metrics.timing(
        "api.request.duration",
        duration,
        tags={"endpoint": request.path, "method": request.method}
    )

Database query timing:

start = time.perf_counter()
rows = db.execute(query)
duration = time.perf_counter() - start

metrics.timing(
    "db.query.duration",
    duration,
    tags={"table": "users", "operation": "select"}
)

Context manager for timing:

from contextlib import contextmanager

@contextmanager
def track_time(operation: str):
    start = time.time()
    try:
        yield
    finally:
        metrics.timing(f"{operation}.duration", time.time() - start)

with track_time("data_processing"):
    process_large_dataset()
Note:
  • Use seconds as the unit for consistency

  • time.perf_counter() is more accurate than time.time() for durations

  • Most backends convert to milliseconds for display

See Also:

histogram(): For general value distributions

histogram(name: str, value: float, tags: dict[str, str] | None = None) None[source]

Record a value in a histogram.

Histograms track value distributions (e.g., request sizes, payload sizes). Unlike gauges, all values are recorded and aggregated into buckets.

Parameters:
name : str

Metric name (e.g., request.body.size).

value : float

Value to record.

tags : Optional[Dict[str, str]]

Labels for the metric. Defaults to None.

Example

Request/response sizes:

# Request payload size
request_size = len(request.body)
metrics.histogram(
    "http.request.size",
    request_size,
    tags={"endpoint": request.path, "content_type": request.content_type}
)

# Response size
response_size = len(response.body)
metrics.histogram(
    "http.response.size",
    response_size,
    tags={"endpoint": request.path}
)

Batch sizes:

# Processing batch sizes
batch = fetch_batch_from_queue()
metrics.histogram(
    "processing.batch.size",
    len(batch),
    tags={"queue": "tasks"}
)

Query result counts:

rows = db.execute("SELECT * FROM users").fetchall()
metrics.histogram(
    "db.query.rows",
    len(rows),
    tags={"table": "users"}
)

Note

  • Histograms are ideal for analyzing value distributions (percentiles, averages)

  • Use timing() specifically for duration measurements

  • Avoid extremely high-cardinality values (>1000 unique values per second)

See also

timing(): Specialized for duration measurements gauge(): For current point-in-time values

__init__(*args, **kwargs)
class structum_lab.monitoring.interfaces.NoOpMetrics[source]

Bases: object

Fallback no-op implementation when no monitoring plugin is installed.

increment(name: str, value: float = 1.0, tags: dict[str, str] | None = None) None[source]

No-op increment.

gauge(name: str, value: float, tags: dict[str, str] | None = None) None[source]

No-op gauge.

timing(name: str, value: float, tags: dict[str, str] | None = None) None[source]

No-op timing.

histogram(name: str, value: float, tags: dict[str, str] | None = None) None[source]

No-op histogram.

Database

Database Interfaces

Database interfaces for Structum Lab.

This module provides Protocol definitions for database operations. For concrete implementations, install structum-database:

pip install structum-database

Usage:
>>> from structum_lab.plugins.database import SQLAlchemyDatabase
>>> db = SQLAlchemyDatabase.from_config()
class structum_lab.database.DatabaseInterface(*args, **kwargs)[source]

Bases: Protocol

Protocol for database managers in Structum Lab.

This is the main entry point for all database operations. Implementations must provide connection pooling, transaction management, and health monitoring.

The protocol abstracts away database-specific details, allowing applications to work with different backends (PostgreSQL, MySQL, SQLite) through a unified interface.

Implementations:

Example

Basic usage with configuration::

from structum_lab.plugins.database import SQLAlchemyDatabase

# Initialize from config db = SQLAlchemyDatabase.from_config()

# Or with explicit URL db = SQLAlchemyDatabase(url=”postgresql://user:pass@localhost/mydb”)

# Use transaction context manager (recommended) with db.transaction() as conn:

conn.execute(

“INSERT INTO users (name, email) VALUES (:name, :email)”, {“name”: “John”, “email”: “john@example.com”}

)

user_id = conn.execute(“SELECT last_insert_id()”).fetchone()[“id”]

conn.execute(

“INSERT INTO profiles (user_id, bio) VALUES (:uid, :bio)”, {“uid”: user_id, “bio”: “Software engineer”}

) # Commits automatically on success

# Check health health = db.health_check() if health.status != HealthStatus.HEALTHY:

log.warning(“Database issues detected”, result=health)

Note

Always use the transaction() context manager for database operations. It ensures proper connection pooling, automatic commit/rollback, and resource cleanup.

See also

ConnectionInterface: Connection protocol TransactionInterface: Transaction protocol HealthCheckResult: Health check result data class

__init__(*args, **kwargs)
close() None[source]

Close all connections in the pool and release resources.

Should be called during application shutdown to ensure clean termination. Any ongoing transactions should be completed before closing.

Raises:

RuntimeError – If called while transactions are active (implementation-specific).

Example

Application shutdown:

import atexit

db = SQLAlchemyDatabase.from_config ()
atexit.register(db.close)  # Ensure cleanup on exit

# Or in FastAPI lifespan
@asynccontextmanager
async def lifespan(app: FastAPI):
    db.connect()
    yield
    db.close()  # Clean shutdown

Warning

After calling close(), the database instance should not be reused. Create a new instance for additional operations.

See also

connect(): Initialize connection pool

connect() None[source]

Establish the database connection pool.

Creates and initializes the connection pool. Called automatically on first database operation if not already connected.

Raises:

Example

Explicit connection during startup:

db = SQLAlchemyDatabase.from_config()

try:
    db.connect()
    log.info("Database pool created", url=db.url)
except ConnectionError as e:
    log.critical("Cannot connect to database", exc_info=True)
    sys.exit(1)

Note

Usually not needed - the database connects automatically on first use. Explicit connection is useful for fail-fast behavior during startup.

See also

close(): Shutdown connection pool

get_connection() ConnectionInterface[source]

Acquire a connection from the pool.

Returns a connection that must be explicitly returned to the pool after use. Prefer using transaction() instead.

Returns:

A database connection from the pool.

Return type:

ConnectionInterface

Raises:
  • PoolExhaustedError – If no connections are available and pool is at maximum.

  • ConnectionError – If pool is not initialized.

Example

Manual connection management (not recommended):

conn = db.get_connection()
try:
    result = conn.execute("SELECT * FROM users")
    users = result.fetchall()
finally:
    # Must return connection to pool manually
    conn.close()

Warning

Manual connection management is error-prone. Always prefer transaction() context manager which handles connections automatically.

See also

transaction(): Recommended connection API

health_check() HealthCheckResult[source]

Check database connectivity and health status.

Performs a simple query to verify the database is responsive and measures latency. Useful for readiness probes and monitoring.

Returns:

Health status with latency and diagnostic details.

Return type:

HealthCheckResult

Example

Health check endpoint:

@app.get("/health/database")
def database_health():
    result = db.health_check()

    if result.status == HealthStatus.UNHEALTHY:
        raise HTTPException(503, detail=result.message)

    return {
        "status": result.status.value,
        "latency_ms": result.latency_ms,
        "message": result.message
    }

Prometheus metrics:

result = db.health_check()

# Record health status
db_health_gauge.labels(database="main").set(
    1 if result.status == HealthStatus.HEALTHY else 0
)

# Record latency
if result.latency_ms:
    db_latency_histogram.observe(result.latency_ms)

Note

Health checks execute a lightweight query (usually SELECT 1). They should complete quickly (<100ms typically).

See also

HealthCheckResult: Result data class HealthStatus: Health status enumeration

property is_connected : bool

Check if the database connection pool is active.

Returns:

True if connection pool is established, False otherwise.

Return type:

bool

Example

Conditional connection:

if not db.is_connected:
    db.connect()
    log.info("Database connection established")

Note

Most operations call connect() automatically if not connected. Explicit checking is mainly useful for health checks and diagnostics.

transaction() Iterator[ConnectionInterface][source]

Context manager for database transactions.

Provides automatic transaction management: commits on success, rolls back on exception. This is the recommended way to perform database operations.

Yields:

ConnectionInterface – Database connection with active transaction.

Raises:
  • DatabaseError – If transaction fails to start.

  • ConnectionError – If pool is exhausted or not connected.

Example

Recommended usage pattern:

# Single transaction
with db.transaction() as conn:
    conn.execute(
        "UPDATE accounts SET balance = balance - :amount WHERE id = :id",
        {"amount": 100, "id": 1}
    )
    conn.execute(
        "UPDATE accounts SET balance = balance + :amount WHERE id = :id",
        {"amount": 100, "id": 2}
    )
    # Commits automatically if no exception

# Exception triggers rollback
try:
    with db.transaction() as conn:
        conn.execute("DELETE FROM important_data")
        raise ValueError("Validation failed")
except ValueError:
    pass  # Transaction automatically rolled back

Note

Transactions are isolated - changes are not visible to other connections until commit. Isolation level depends on database implementation (usually READ COMMITTED).

See also

TransactionInterface: Transaction protocol ConnectionInterface: Connection protocol

property url : str

Get the database connection URL (sanitized).

Returns:

Database URL with password redacted/masked for security.

Example: "postgresql://user:***@localhost:5432/mydb"

Return type:

str

Example

Logging database configuration:

log.info("Connected to database", url=db.url)
# Logs: postgresql://user:***@localhost/mydb

Note

Passwords are automatically redacted to prevent accidental logging of credentials.

class structum_lab.database.ConnectionInterface(*args, **kwargs)[source]

Bases: Protocol

Protocol for database connections in Structum Lab.

A connection represents an active link to the database that can execute queries and retrieve results. Connections are typically obtained from a connection pool and should be returned after use.

Implementations:

Example

Using a connection within a transaction:

with db.transaction() as conn:
    # Execute query with named parameters
    conn.execute(
        "INSERT INTO users (name, email) VALUES (:name, :email)",
        {"name": "John", "email": "john@example.com"}
    )

    # Fetch results
    conn.execute("SELECT * FROM users WHERE id > :id", {"id": 100})
    users = conn.fetchall()
    for user in users:
        print(f"{user['name']} - {user['email']}")

Note

Connections are not thread-safe. Use one connection per thread or protect access with locks.

See also

DatabaseInterface: Database manager providing connections DatabaseInterface.transaction(): Recommended way to get connections

__init__(*args, **kwargs)
execute(query: str, params: dict[str, Any] | tuple[Any, ...] | None = None) Any[source]

Execute a SQL query with optional parameters.

Supports both named parameters (dict) and positional parameters (tuple). Use :name syntax for named parameters, ? for positional.

Parameters:
query : str

SQL query string. Use :param_name for named parameters or ? for positional parameters.

params : dict[str, Any] | tuple[Any, ...] | None

Query parameters. Use dict for named parameters, tuple for positional. Defaults to None.

Returns:

Result object (implementation-specific). Use fetch methods

to retrieve rows.

Return type:

Any

Raises:
  • DatabaseError – If query execution fails.

  • ParameterError – If parameters don’t match query placeholders.

Example

Different parameter styles:

# Named parameters (recommended)
conn.execute(
    "SELECT * FROM users WHERE age > :min_age AND city = :city",
    {"min_age": 18, "city": "NYC"}
)

# Positional parameters
conn.execute(
    "SELECT * FROM users WHERE age > ? AND city = ?",
    (18, "NYC")
)

# No parameters
conn.execute("SELECT COUNT(*) FROM users")

Warning

Always use parameterized queries. Never use string interpolation for user input (SQL injection risk).

See also

fetchone(): Retrieve single row fetchall(): Retrieve all rows

fetchall() list[dict[str, Any]][source]

Fetch all remaining rows from the last executed query.

Returns:

List of rows as dictionaries. Empty list

if no rows or cursor exhausted.

Return type:

list[dict[str, Any]]

Example

Fetching multiple rows:

conn.execute("SELECT * FROM users WHERE active = :active", {"active": True})
users = conn.fetchall()

for user in users:
    print(f"{user['id']}: {user['name']}")

print(f"Total active users: {len(users)}")

Warning

For large result sets, consider using fetchmany() to avoid loading all rows into memory at once.

See also

fetchone(): Fetch single row fetchmany(): Fetch rows in batches

fetchmany(size: int) list[dict[str, Any]][source]

Fetch up to size rows from the last executed query.

Useful for processing large result sets in batches to manage memory usage.

Parameters:
size : int

Maximum number of rows to fetch. Must be > 0.

Returns:

List of rows (up to size). May return fewer

rows if result set is exhausted. Empty list if no rows remain.

Return type:

list[dict[str, Any]]

Example

Batch processing large result set:

conn.execute("SELECT * FROM large_table")

batch_size = 100
while True:
    batch = conn.fetchmany(batch_size)
    if not batch:
        break

    process_batch(batch)
    print(f"Processed {len(batch)} rows")

Note

Efficient for iterating large datasets without loading everything into memory.

See also

fetchone(): Fetch single row fetchall(): Fetch all rows

fetchone() dict[str, Any] | None[source]

Fetch the next row from the last executed query.

Returns:

Row as dictionary with column names as keys,

or None if no more rows available.

Return type:

dict[str, Any] | None

Example

Fetching single row:

conn.execute("SELECT * FROM users WHERE id = :id", {"id": 1})
user = conn.fetchone()

if user:
    print(f"Found user: {user['name']}")
else:
    print("User not found")

Note

Call after execute(). Returns None when cursor exhausted.

See also

fetchall(): Fetch all remaining rows fetchmany(): Fetch specific number of rows

class structum_lab.database.TransactionInterface(*args, **kwargs)[source]

Bases: Protocol

Protocol for database transactions providing ACID guarantees.

Transactions group multiple database operations into a single atomic unit. Either all operations succeed (commit) or all fail (rollback).

Example

Manual transaction management:

tx = db.begin_transaction()
try:
    conn.execute("UPDATE accounts SET balance = balance - 100 WHERE id = 1")
    conn.execute("UPDATE accounts SET balance = balance + 100 WHERE id = 2")
    tx.commit()
except Exception:
    tx.rollback()
    raise

Preferred context manager approach:

# Automatic commit/rollback
with db.transaction() as conn:
    conn.execute("UPDATE accounts SET balance = balance - 100 WHERE id = 1")
    conn.execute("UPDATE accounts SET balance = balance + 100 WHERE id = 2")
    # Commits automatically if no exception

Note

Most applications should use DatabaseInterface.transaction() context manager instead of managing transactions manually.

See also

DatabaseInterface.transaction(): Recommended transaction API

__init__(*args, **kwargs)
commit() None[source]

Commit the current transaction.

Makes all changes since transaction start permanent in the database. After commit, the transaction is no longer active.

Raises:
  • DatabaseError – If commit fails (e.g., constraint violation).

  • TransactionError – If transaction is not active.

Example

Manual commit:

tx = db.begin_transaction()
try:
    conn.execute("INSERT INTO logs VALUES (:msg)", {"msg": "test"})
    tx.commit()
    log.info("Transaction committed successfully")
except DatabaseError as e:
    tx.rollback()
    log.error("Commit failed", exc_info=True)

Warning

After commit, the transaction cannot be reused. Start a new transaction for additional operations.

See also

rollback(): Abort transaction

property is_active : bool

Check if the transaction is still active.

Returns:

True if transaction can accept operations, False if

already committed or rolled back.

Return type:

bool

Example

Checking transaction state:

tx = db.begin_transaction()
assert tx.is_active  # True

tx.commit()
assert not tx.is_active  # False

# This would raise TransactionError
# tx.execute("SELECT 1")

Note

Useful for conditional logic and error handling.

rollback() None[source]

Rollback the current transaction.

Discards all changes made since transaction start. The database state returns to what it was before the transaction began.

Raises:

TransactionError – If transaction is not active.

Example

Manual rollback on error:

tx = db.begin_transaction()
try:
    conn.execute("DELETE FROM important_data")
    # Validation fails
    if not validate_deletion():
        tx.rollback()
        log.warning("Deletion rolled back - validation failed")
        return
    tx.commit()
except Exception:
    tx.rollback()
    raise

Note

Rollback is safe to call multiple times. Subsequent calls are no-ops.

See also

commit(): Persist transaction changes

class structum_lab.database.HealthCheckResult(status: HealthStatus, message: str, latency_ms: float | None = None, details: dict[str, Any] | None = None)[source]

Bases: object

Data class representing database health check results.

status

Overall health status (HEALTHY, DEGRADED, UNHEALTHY).

Type:

HealthStatus

message

Human-readable status description.

Type:

str

latency_ms

Query latency in milliseconds, if available.

Type:

float | None

details

Additional diagnostic information (e.g., active connections, pool statistics).

Type:

dict[str, Any] | None

Example

Creating and using a health check result:

result = HealthCheckResult(
    status=HealthStatus.HEALTHY,
    message="Database connection OK",
    latency_ms=2.5,
    details={"active_connections": 5, "pool_size": 10}
)

# Use in monitoring/alerts
if result.latency_ms and result.latency_ms > 100:
    alert("High database latency", result.latency_ms)

Note

This class is frozen (immutable) to ensure health check results cannot be modified after creation.

__init__(status: HealthStatus, message: str, latency_ms: float | None = None, details: dict[str, Any] | None = None)
details : dict[str, Any] | None = None
latency_ms : float | None = None
status : HealthStatus
message : str
class structum_lab.database.HealthStatus(*values)[source]

Bases: Enum

Enumeration of possible database health states.

HEALTHY

Database is fully operational with normal latency.

DEGRADED

Database is responsive but experiencing issues (e.g., high latency, connection warnings).

UNHEALTHY

Database is unreachable or critically impaired.

Example

Checking health status:

result = db.health_check()
if result.status == HealthStatus.HEALTHY:
    log.info("Database OK", latency_ms=result.latency_ms)
elif result.status == HealthStatus.DEGRADED:
    log.warning("Database degraded", message=result.message)
else:
    log.error("Database unhealthy", message=result.message)
HEALTHY = 'healthy'
DEGRADED = 'degraded'
UNHEALTHY = 'unhealthy'
structum_lab.database.Database

alias of DatabaseInterface

structum_lab.database.Connection

alias of ConnectionInterface

structum_lab.database.Transaction

alias of TransactionInterface

Core database interfaces for Structum Lab.

This module defines the Protocols that any database implementation must follow. Implementations are provided by the structum-database plugin.

Example

>>> from structum_lab.plugins.database import SQLAlchemyDatabase
>>> db = SQLAlchemyDatabase.from_config()
>>> with db.transaction() as conn:
...     conn.execute("SELECT 1")
class structum_lab.database.interfaces.HealthStatus(*values)[source]

Bases: Enum

Enumeration of possible database health states.

HEALTHY

Database is fully operational with normal latency.

DEGRADED

Database is responsive but experiencing issues (e.g., high latency, connection warnings).

UNHEALTHY

Database is unreachable or critically impaired.

Example

Checking health status:

result = db.health_check()
if result.status == HealthStatus.HEALTHY:
    log.info("Database OK", latency_ms=result.latency_ms)
elif result.status == HealthStatus.DEGRADED:
    log.warning("Database degraded", message=result.message)
else:
    log.error("Database unhealthy", message=result.message)
HEALTHY = 'healthy'
DEGRADED = 'degraded'
UNHEALTHY = 'unhealthy'
class structum_lab.database.interfaces.HealthCheckResult(status: HealthStatus, message: str, latency_ms: float | None = None, details: dict[str, Any] | None = None)[source]

Bases: object

Data class representing database health check results.

status

Overall health status (HEALTHY, DEGRADED, UNHEALTHY).

Type:

HealthStatus

message

Human-readable status description.

Type:

str

latency_ms

Query latency in milliseconds, if available.

Type:

float | None

details

Additional diagnostic information (e.g., active connections, pool statistics).

Type:

dict[str, Any] | None

Example

Creating and using a health check result:

result = HealthCheckResult(
    status=HealthStatus.HEALTHY,
    message="Database connection OK",
    latency_ms=2.5,
    details={"active_connections": 5, "pool_size": 10}
)

# Use in monitoring/alerts
if result.latency_ms and result.latency_ms > 100:
    alert("High database latency", result.latency_ms)

Note

This class is frozen (immutable) to ensure health check results cannot be modified after creation.

status : HealthStatus
message : str
latency_ms : float | None = None
details : dict[str, Any] | None = None
__init__(status: HealthStatus, message: str, latency_ms: float | None = None, details: dict[str, Any] | None = None)
class structum_lab.database.interfaces.ConnectionInterface(*args, **kwargs)[source]

Bases: Protocol

Protocol for database connections in Structum Lab.

A connection represents an active link to the database that can execute queries and retrieve results. Connections are typically obtained from a connection pool and should be returned after use.

Implementations:

Example

Using a connection within a transaction:

with db.transaction() as conn:
    # Execute query with named parameters
    conn.execute(
        "INSERT INTO users (name, email) VALUES (:name, :email)",
        {"name": "John", "email": "john@example.com"}
    )

    # Fetch results
    conn.execute("SELECT * FROM users WHERE id > :id", {"id": 100})
    users = conn.fetchall()
    for user in users:
        print(f"{user['name']} - {user['email']}")

Note

Connections are not thread-safe. Use one connection per thread or protect access with locks.

See also

DatabaseInterface: Database manager providing connections DatabaseInterface.transaction(): Recommended way to get connections

execute(query: str, params: dict[str, Any] | tuple[Any, ...] | None = None) Any[source]

Execute a SQL query with optional parameters.

Supports both named parameters (dict) and positional parameters (tuple). Use :name syntax for named parameters, ? for positional.

Parameters:
query : str

SQL query string. Use :param_name for named parameters or ? for positional parameters.

params : dict[str, Any] | tuple[Any, ...] | None

Query parameters. Use dict for named parameters, tuple for positional. Defaults to None.

Returns:

Result object (implementation-specific). Use fetch methods

to retrieve rows.

Return type:

Any

Raises:
  • DatabaseError – If query execution fails.

  • ParameterError – If parameters don’t match query placeholders.

Example

Different parameter styles:

# Named parameters (recommended)
conn.execute(
    "SELECT * FROM users WHERE age > :min_age AND city = :city",
    {"min_age": 18, "city": "NYC"}
)

# Positional parameters
conn.execute(
    "SELECT * FROM users WHERE age > ? AND city = ?",
    (18, "NYC")
)

# No parameters
conn.execute("SELECT COUNT(*) FROM users")

Warning

Always use parameterized queries. Never use string interpolation for user input (SQL injection risk).

See also

fetchone(): Retrieve single row fetchall(): Retrieve all rows

fetchone() dict[str, Any] | None[source]

Fetch the next row from the last executed query.

Returns:

Row as dictionary with column names as keys,

or None if no more rows available.

Return type:

dict[str, Any] | None

Example

Fetching single row:

conn.execute("SELECT * FROM users WHERE id = :id", {"id": 1})
user = conn.fetchone()

if user:
    print(f"Found user: {user['name']}")
else:
    print("User not found")

Note

Call after execute(). Returns None when cursor exhausted.

See also

fetchall(): Fetch all remaining rows fetchmany(): Fetch specific number of rows

fetchall() list[dict[str, Any]][source]

Fetch all remaining rows from the last executed query.

Returns:

List of rows as dictionaries. Empty list

if no rows or cursor exhausted.

Return type:

list[dict[str, Any]]

Example

Fetching multiple rows:

conn.execute("SELECT * FROM users WHERE active = :active", {"active": True})
users = conn.fetchall()

for user in users:
    print(f"{user['id']}: {user['name']}")

print(f"Total active users: {len(users)}")

Warning

For large result sets, consider using fetchmany() to avoid loading all rows into memory at once.

See also

fetchone(): Fetch single row fetchmany(): Fetch rows in batches

fetchmany(size: int) list[dict[str, Any]][source]

Fetch up to size rows from the last executed query.

Useful for processing large result sets in batches to manage memory usage.

Parameters:
size : int

Maximum number of rows to fetch. Must be > 0.

Returns:

List of rows (up to size). May return fewer

rows if result set is exhausted. Empty list if no rows remain.

Return type:

list[dict[str, Any]]

Example

Batch processing large result set:

conn.execute("SELECT * FROM large_table")

batch_size = 100
while True:
    batch = conn.fetchmany(batch_size)
    if not batch:
        break

    process_batch(batch)
    print(f"Processed {len(batch)} rows")

Note

Efficient for iterating large datasets without loading everything into memory.

See also

fetchone(): Fetch single row fetchall(): Fetch all rows

__init__(*args, **kwargs)
class structum_lab.database.interfaces.TransactionInterface(*args, **kwargs)[source]

Bases: Protocol

Protocol for database transactions providing ACID guarantees.

Transactions group multiple database operations into a single atomic unit. Either all operations succeed (commit) or all fail (rollback).

Example

Manual transaction management:

tx = db.begin_transaction()
try:
    conn.execute("UPDATE accounts SET balance = balance - 100 WHERE id = 1")
    conn.execute("UPDATE accounts SET balance = balance + 100 WHERE id = 2")
    tx.commit()
except Exception:
    tx.rollback()
    raise

Preferred context manager approach:

# Automatic commit/rollback
with db.transaction() as conn:
    conn.execute("UPDATE accounts SET balance = balance - 100 WHERE id = 1")
    conn.execute("UPDATE accounts SET balance = balance + 100 WHERE id = 2")
    # Commits automatically if no exception

Note

Most applications should use DatabaseInterface.transaction() context manager instead of managing transactions manually.

See also

DatabaseInterface.transaction(): Recommended transaction API

commit() None[source]

Commit the current transaction.

Makes all changes since transaction start permanent in the database. After commit, the transaction is no longer active.

Raises:
  • DatabaseError – If commit fails (e.g., constraint violation).

  • TransactionError – If transaction is not active.

Example

Manual commit:

tx = db.begin_transaction()
try:
    conn.execute("INSERT INTO logs VALUES (:msg)", {"msg": "test"})
    tx.commit()
    log.info("Transaction committed successfully")
except DatabaseError as e:
    tx.rollback()
    log.error("Commit failed", exc_info=True)

Warning

After commit, the transaction cannot be reused. Start a new transaction for additional operations.

See also

rollback(): Abort transaction

rollback() None[source]

Rollback the current transaction.

Discards all changes made since transaction start. The database state returns to what it was before the transaction began.

Raises:

TransactionError – If transaction is not active.

Example

Manual rollback on error:

tx = db.begin_transaction()
try:
    conn.execute("DELETE FROM important_data")
    # Validation fails
    if not validate_deletion():
        tx.rollback()
        log.warning("Deletion rolled back - validation failed")
        return
    tx.commit()
except Exception:
    tx.rollback()
    raise

Note

Rollback is safe to call multiple times. Subsequent calls are no-ops.

See also

commit(): Persist transaction changes

property is_active : bool

Check if the transaction is still active.

Returns:

True if transaction can accept operations, False if

already committed or rolled back.

Return type:

bool

Example

Checking transaction state:

tx = db.begin_transaction()
assert tx.is_active  # True

tx.commit()
assert not tx.is_active  # False

# This would raise TransactionError
# tx.execute("SELECT 1")

Note

Useful for conditional logic and error handling.

__init__(*args, **kwargs)
class structum_lab.database.interfaces.DatabaseInterface(*args, **kwargs)[source]

Bases: Protocol

Protocol for database managers in Structum Lab.

This is the main entry point for all database operations. Implementations must provide connection pooling, transaction management, and health monitoring.

The protocol abstracts away database-specific details, allowing applications to work with different backends (PostgreSQL, MySQL, SQLite) through a unified interface.

Implementations:

Example

Basic usage with configuration::

from structum_lab.plugins.database import SQLAlchemyDatabase

# Initialize from config db = SQLAlchemyDatabase.from_config()

# Or with explicit URL db = SQLAlchemyDatabase(url=”postgresql://user:pass@localhost/mydb”)

# Use transaction context manager (recommended) with db.transaction() as conn:

conn.execute(

“INSERT INTO users (name, email) VALUES (:name, :email)”, {“name”: “John”, “email”: “john@example.com”}

)

user_id = conn.execute(“SELECT last_insert_id()”).fetchone()[“id”]

conn.execute(

“INSERT INTO profiles (user_id, bio) VALUES (:uid, :bio)”, {“uid”: user_id, “bio”: “Software engineer”}

) # Commits automatically on success

# Check health health = db.health_check() if health.status != HealthStatus.HEALTHY:

log.warning(“Database issues detected”, result=health)

Note

Always use the transaction() context manager for database operations. It ensures proper connection pooling, automatic commit/rollback, and resource cleanup.

See also

ConnectionInterface: Connection protocol TransactionInterface: Transaction protocol HealthCheckResult: Health check result data class

property url : str

Get the database connection URL (sanitized).

Returns:

Database URL with password redacted/masked for security.

Example: "postgresql://user:***@localhost:5432/mydb"

Return type:

str

Example

Logging database configuration:

log.info("Connected to database", url=db.url)
# Logs: postgresql://user:***@localhost/mydb

Note

Passwords are automatically redacted to prevent accidental logging of credentials.

property is_connected : bool

Check if the database connection pool is active.

Returns:

True if connection pool is established, False otherwise.

Return type:

bool

Example

Conditional connection:

if not db.is_connected:
    db.connect()
    log.info("Database connection established")

Note

Most operations call connect() automatically if not connected. Explicit checking is mainly useful for health checks and diagnostics.

connect() None[source]

Establish the database connection pool.

Creates and initializes the connection pool. Called automatically on first database operation if not already connected.

Raises:

Example

Explicit connection during startup:

db = SQLAlchemyDatabase.from_config()

try:
    db.connect()
    log.info("Database pool created", url=db.url)
except ConnectionError as e:
    log.critical("Cannot connect to database", exc_info=True)
    sys.exit(1)

Note

Usually not needed - the database connects automatically on first use. Explicit connection is useful for fail-fast behavior during startup.

See also

close(): Shutdown connection pool

close() None[source]

Close all connections in the pool and release resources.

Should be called during application shutdown to ensure clean termination. Any ongoing transactions should be completed before closing.

Raises:

RuntimeError – If called while transactions are active (implementation-specific).

Example

Application shutdown:

import atexit

db = SQLAlchemyDatabase.from_config ()
atexit.register(db.close)  # Ensure cleanup on exit

# Or in FastAPI lifespan
@asynccontextmanager
async def lifespan(app: FastAPI):
    db.connect()
    yield
    db.close()  # Clean shutdown

Warning

After calling close(), the database instance should not be reused. Create a new instance for additional operations.

See also

connect(): Initialize connection pool

get_connection() ConnectionInterface[source]

Acquire a connection from the pool.

Returns a connection that must be explicitly returned to the pool after use. Prefer using transaction() instead.

Returns:

A database connection from the pool.

Return type:

ConnectionInterface

Raises:
  • PoolExhaustedError – If no connections are available and pool is at maximum.

  • ConnectionError – If pool is not initialized.

Example

Manual connection management (not recommended):

conn = db.get_connection()
try:
    result = conn.execute("SELECT * FROM users")
    users = result.fetchall()
finally:
    # Must return connection to pool manually
    conn.close()

Warning

Manual connection management is error-prone. Always prefer transaction() context manager which handles connections automatically.

See also

transaction(): Recommended connection API

transaction() Iterator[ConnectionInterface][source]

Context manager for database transactions.

Provides automatic transaction management: commits on success, rolls back on exception. This is the recommended way to perform database operations.

Yields:

ConnectionInterface – Database connection with active transaction.

Raises:
  • DatabaseError – If transaction fails to start.

  • ConnectionError – If pool is exhausted or not connected.

Example

Recommended usage pattern:

# Single transaction
with db.transaction() as conn:
    conn.execute(
        "UPDATE accounts SET balance = balance - :amount WHERE id = :id",
        {"amount": 100, "id": 1}
    )
    conn.execute(
        "UPDATE accounts SET balance = balance + :amount WHERE id = :id",
        {"amount": 100, "id": 2}
    )
    # Commits automatically if no exception

# Exception triggers rollback
try:
    with db.transaction() as conn:
        conn.execute("DELETE FROM important_data")
        raise ValueError("Validation failed")
except ValueError:
    pass  # Transaction automatically rolled back

Note

Transactions are isolated - changes are not visible to other connections until commit. Isolation level depends on database implementation (usually READ COMMITTED).

See also

TransactionInterface: Transaction protocol ConnectionInterface: Connection protocol

health_check() HealthCheckResult[source]

Check database connectivity and health status.

Performs a simple query to verify the database is responsive and measures latency. Useful for readiness probes and monitoring.

Returns:

Health status with latency and diagnostic details.

Return type:

HealthCheckResult

Example

Health check endpoint:

@app.get("/health/database")
def database_health():
    result = db.health_check()

    if result.status == HealthStatus.UNHEALTHY:
        raise HTTPException(503, detail=result.message)

    return {
        "status": result.status.value,
        "latency_ms": result.latency_ms,
        "message": result.message
    }

Prometheus metrics:

result = db.health_check()

# Record health status
db_health_gauge.labels(database="main").set(
    1 if result.status == HealthStatus.HEALTHY else 0
)

# Record latency
if result.latency_ms:
    db_latency_histogram.observe(result.latency_ms)

Note

Health checks execute a lightweight query (usually SELECT 1). They should complete quickly (<100ms typically).

See also

HealthCheckResult: Result data class HealthStatus: Health status enumeration

__init__(*args, **kwargs)
structum_lab.database.interfaces.Database

alias of DatabaseInterface

structum_lab.database.interfaces.Connection

alias of ConnectionInterface

structum_lab.database.interfaces.Transaction

alias of TransactionInterface

Authentication

Authentication Interfaces

Authentication interfaces for Structum Lab.

This module provides Protocol definitions for authentication operations. For concrete implementations, install structum-auth:

pip install structum-auth

Usage:
>>> from structum_lab.plugins.auth import JWTAuthProvider
>>> auth = JWTAuthProvider.from_config()
class structum_lab.auth.AuthInterface(*args, **kwargs)[source]

Bases: Protocol

Protocol for authentication providers in Structum Lab.

This is the main entry point for all authentication operations. Implementations provide JWT-based authentication, password hashing, and token management.

The auth system is storage-agnostic - it doesn’t manage users directly. Instead, it uses UserRepositoryInterface to fetch user data, keeping authentication logic decoupled from storage.

Implementations:

Example

See usage in specific implementations like JWTAuthProvider.

__init__(*args, **kwargs)
authenticate(username: str, password: str, user_repo: UserRepositoryInterface) TokenPair | None[source]

Authenticate a user.

hash_password(password: str) str[source]

Hash a password.

refresh(refresh_token: str, user_repo: UserRepositoryInterface) TokenPair | None[source]

Refresh access token.

verify_access_token(token: str) dict[str, Any] | None[source]

Verify an access token.

verify_password(password: str, hashed: str) bool[source]

Verify a password.

class structum_lab.auth.UserInterface(*args, **kwargs)[source]

Bases: Protocol

Protocol for authenticated user entities in Structum Lab.

Applications must implement this protocol for their User model to integrate with the authentication system. The auth system never creates or modifies users - it only queries them via UserRepositoryInterface.

Example

Implementing UserInterface:

from dataclasses import dataclass

@dataclass
class User:
    id: str
    username: str
    hashed_password: str
    roles: list[str]
    permissions: set[str]

    def has_permission(self, permission: str) -> bool:
        # Check role-based permissions
        for role in self.roles:
            if permission in ROLE_PERMISSIONS.get(role, set()):
                return True
        # Check user-specific permissions
        return permission in self.permissions

Using with auth:

user = user_repo.find_by_username("john")
if user and user.has_permission("users:write"):
    # Allow operation
    pass

Note

This is a Protocol, not a base class. Your User model doesn’t need to inherit from this - just implement the required properties and methods.

See also

UserRepositoryInterface: Repository for user data access AuthInterface: Authentication provider using users

__init__(*args, **kwargs)
has_permission(permission: str) bool[source]

Check if user has a specific permission.

Parameters:
permission : str

Permission string, typically in format resource:action (e.g., “users:write”, “posts:delete”).

Returns:

True if user has the permission, False otherwise.

Return type:

bool

Example

Permission-based authorization:

@app.delete("/users/{user_id}")
async def delete_user(user_id: str, current_user: User = Depends(get_current_user)):
    if not current_user.has_permission("users:delete"):
        raise HTTPException(403, "Permission denied")
    # Delete user

Role-based permission mapping:

ROLE_PERMISSIONS = {
    "admin": {"users:read", "users:write", "users:delete"},
    "user": {"users:read"},
}

def has_permission(self, permission: str) -> bool:
    return any(
        permission in ROLE_PERMISSIONS.get(role, set())
        for role in self.roles
    )

Note

Permission format is application-defined. Use a consistent naming scheme (e.g., resource:action).

property hashed_password : str

User’s hashed password.

Returns:

Securely hashed password (e.g., Argon2, bcrypt).

Return type:

str

Warning

Never store or transmit plain-text passwords. This property should only return hashed values.

Example

Password verification:

if auth.verify_password(input_password, user.hashed_password):
    # Password matches
    pass

See also

PasswordHasherInterface.hash(): Hash passwords PasswordHasherInterface.verify(): Verify passwords

property id : str

Unique identifier for the user.

Returns:

User ID, typically a UUID or database primary key.

Return type:

str

Example

User ID in token payload:

token_payload = {"user_id": user.id, "exp": ...}
property roles : list[str]

List of roles assigned to the user.

Returns:

Role names (e.g., [“admin”, “user”, “moderator”]).

Return type:

list[str]

Example

Role-based access control:

if "admin" in user.roles:
    # Allow admin operation
    pass

Note

Roles should be lowercase strings. Use has_permission() for fine-grained permission checks.

property username : str

User’s username or email address.

Returns:

Username, typically used for login and display.

Return type:

str

Example

username in login:

user = user_repo.find_by_username(username)
if user and auth.verify_password(password, user.hashed_password):
    return auth.authenticate(...)
class structum_lab.auth.UserRepositoryInterface(*args, **kwargs)[source]

Bases: Protocol

Protocol for user storage and retrieval in Structum Lab.

Applications implement this to connect the auth system with their database or user storage backend. The auth plugin does NOT manage user storage directly - it delegates all user operations to this repository.

This separation ensures the auth system remains storage-agnostic and can work with any database (PostgreSQL, MongoDB, etc.) or user service (LDAP, OAuth providers).

Implementations:
  • Database-backed repository (SQLAlchemy, etc.)

  • External user service adapter (LDAP, Active Directory)

  • In-memory repository (testing only)

Example

SQLAlchemy repository implementation:

class SQLAlchemyUserRepository:
    def __init__(self, db: DatabaseInterface):
        self.db = db

    def find_by_username(self, username: str) -> User | None:
        with self.db.transaction() as conn:
            conn.execute(
                "SELECT * FROM users WHERE username = :username",
                {"username": username}
            )
            row = conn.fetchone()
            if not row:
                return None

            return User(
                id=row["id"],
                username=row["username"],
                hashed_password=row["password_hash"],
                roles=row.get("roles", []),
            )

    def find_by_id(self, user_id: str) -> User | None:
        with self.db.transaction() as conn:
            conn.execute(
                "SELECT * FROM users WHERE id = :id",
                {"id": user_id}
            )
            row = conn.fetchone()
            return User(**row) if row else None

Using with authentication:

user_repo = SQLAlchemyUserRepository(db)
auth = JWTAuthProvider.from_config()

tokens = auth.authenticate("john", "password123", user_repo)
if tokens:
    print(f"Access token: {tokens.access_token}")

Note

Repository is responsible for mapping storage format to UserInterface. It should handle serialization/deserialization of user data.

See also

UserInterface: User entity protocol AuthInterface: Authentication provider using repositories

__init__(*args, **kwargs)
find_by_id(user_id: str) UserInterface | None[source]

Find a user by their unique identifier.

Parameters:
user_id : str

User’s unique identifier (typically UUID or database ID).

Returns:

User if found, None otherwise.

Return type:

UserInterface | None

Example

Loading user from token:

# After verifying access token
payload = auth.verify_access_token(token)
if payload:
    user = user_repo.find_by_id(payload["user_id"])
    if user:
        # User authenticated
        return user

Note

This method is called frequently (on every authenticated request). Consider caching user data for performance.

find_by_username(username: str) UserInterface | None[source]

Find a user by username or email.

Parameters:
username : str

Username or email to search for. Should be case-insensitive in most implementations.

Returns:

User if found, None otherwise.

Return type:

UserInterface | None

Example

Looking up user for authentication:

user = user_repo.find_by_username("john@example.com")
if user:
    # Verify password
    if auth.verify_password(password, user.hashed_password):
        return auth.create_tokens(user)
else:
    log.warning("Login attempt for unknown user", username=username)

Note

Implementation should normalize username (e.g., lowercase) before lookup. Consider using database indexes on username column for performance.

class structum_lab.auth.PasswordHasherInterface(*args, **kwargs)[source]

Bases: Protocol

Protocol for secure password hashing in Structum Lab.

Implementations must use cryptographically secure hashing algorithms (e.g., Argon2, b

crypt, scrypt). Never use fast hashes like MD5 or SHA-1

for passwords.

Implementations:
  • Argon2Hasher (recommended)

  • BcryptHasher

Example:

Using password hasher:

from structum_lab.plugins.auth.password import Argon2Hasher

hasher = Argon2Hasher()

# Hash password during registration
hashed = hasher.hash("user_password_123")
# Store hashed in database: user.hashed_password = hashed

# Verify during login
if hasher.verify("user_password_123", hashed):
    # Password matches
    return create_token(user)
else:
    # Invalid password
    raise AuthenticationError("Invalid credentials")
Warning:

Never log, display, or store plain-text passwords. Always hash passwords immediately upon receipt.

See Also:

AuthInterface: Auth provider using password hasher UserInterface: User entity with hashed_password property

__init__(*args, **kwargs)
hash(password: str) str[source]

Hash a plain-text password securely.

Parameters:
password : str

Plain-text password to hash. No length restrictions, but implementations may truncate very long passwords.

Returns:

Hashed password string including algorithm identifier and salt.

Format is implementation-specific (e.g., Argon2: $argon2id$v=19$...).

Return type:

str

Example

Creating user with hashed password:

# During user registration
plain_password = request.form["password"]
hashed = auth.hash_password(plain_password)

user = User(
    id=generate_id(),
    username=request.form["username"],
    hashed_password=hashed,  # Store this
    roles=["user"]
)
user_repo.save(user)

Warning

Hashing is intentionally slow (100-500ms) to resist brute-force attacks. Do not hash passwords in tight loops or performance-critical paths.

Note

Each call generates a unique hash (due to random salt) even for the same password. This is expected and secure behavior.

verify(password: str, hashed: str) bool[source]

Verify a plain-text password against a hash.

Parameters:
password : str

Plain-text password to verify.

hashed : str

Previously hashed password (from database).

Returns:

True if password matches hash, False otherwise.

Return type:

bool

Example

Password verification during login:

# Get user from database
user = user_repo.find_by_username(username)
if not user:
    return None  # User not found

# Verify password
if auth.verify_password(password, user.hashed_password):
    # Authentication successful
    return auth.create_tokens(user)
else:
    # Invalid password
    log.warning("Failed login attempt", username=username)
    return None

Warning

Always use constant-time comparison internally to prevent timing attacks. Most modern hashing libraries handle this automatically.

Note

Returns False for invalid/malformed hashes rather than raising exceptions. This prevents information leakage about hash format.

class structum_lab.auth.TokenPair(access_token: str, refresh_token: str, token_type: str = 'bearer', expires_at: datetime | None = None)[source]

Bases: object

Data class containing JWT access and refresh token pair.

access_token

Short-lived JWT access token for API requests.

Type:

str

refresh_token

Long-lived token for obtaining new access tokens.

Type:

str

token_type

Token type, typically “bearer” for JWT. Defaults to “bearer”.

Type:

str

expires_at

Expiration timestamp for access token, if available.

Type:

datetime | None

Example

Creating and using token pair:

tokens = TokenPair(
    access_token="eyJ0eXAiOiJKV1QiLCJhbGc...",
    refresh_token="eyJ0eXAiOiJKV1QiLCJhbGc...",
    token_type="bearer",
    expires_at=datetime.now() + timedelta(hours=1)
)

# Use in HTTP Authorization header
headers = {"Authorization": f"{tokens.token_type} {tokens.access_token}"}

Note

This class is frozen (immutable) to prevent accidental token modification. Tokens should be treated as opaque strings and never parsed by clients.

See also

AuthInterface.authenticate(): Method that returns token pairs AuthInterface.refresh(): Refresh access tokens

__init__(access_token: str, refresh_token: str, token_type: str = 'bearer', expires_at: datetime | None = None)
expires_at : datetime | None = None
token_type : str = 'bearer'
access_token : str
refresh_token : str
structum_lab.auth.Auth

alias of AuthInterface

structum_lab.auth.User

alias of UserInterface

structum_lab.auth.UserRepository

alias of UserRepositoryInterface

structum_lab.auth.PasswordHasher

alias of PasswordHasherInterface

Core authentication interfaces for Structum Lab.

This module defines the Protocols that any auth implementation must follow. Implementations are provided by the structum-auth plugin.

Key Design Decision:

Auth does NOT manage database/storage. The application implements UserRepositoryInterface to provide user data. This keeps auth decoupled from any specific storage solution.

Example

>>> from structum_lab.plugins.auth import JWTAuthProvider
>>> auth = JWTAuthProvider.from_config()
>>> tokens = auth.authenticate("user", "password", user_repo)
class structum_lab.auth.interfaces.TokenPair(access_token: str, refresh_token: str, token_type: str = 'bearer', expires_at: datetime | None = None)[source]

Bases: object

Data class containing JWT access and refresh token pair.

access_token

Short-lived JWT access token for API requests.

Type:

str

refresh_token

Long-lived token for obtaining new access tokens.

Type:

str

token_type

Token type, typically “bearer” for JWT. Defaults to “bearer”.

Type:

str

expires_at

Expiration timestamp for access token, if available.

Type:

datetime | None

Example

Creating and using token pair:

tokens = TokenPair(
    access_token="eyJ0eXAiOiJKV1QiLCJhbGc...",
    refresh_token="eyJ0eXAiOiJKV1QiLCJhbGc...",
    token_type="bearer",
    expires_at=datetime.now() + timedelta(hours=1)
)

# Use in HTTP Authorization header
headers = {"Authorization": f"{tokens.token_type} {tokens.access_token}"}

Note

This class is frozen (immutable) to prevent accidental token modification. Tokens should be treated as opaque strings and never parsed by clients.

See also

AuthInterface.authenticate(): Method that returns token pairs AuthInterface.refresh(): Refresh access tokens

access_token : str
refresh_token : str
token_type : str = 'bearer'
expires_at : datetime | None = None
__init__(access_token: str, refresh_token: str, token_type: str = 'bearer', expires_at: datetime | None = None)
class structum_lab.auth.interfaces.UserInterface(*args, **kwargs)[source]

Bases: Protocol

Protocol for authenticated user entities in Structum Lab.

Applications must implement this protocol for their User model to integrate with the authentication system. The auth system never creates or modifies users - it only queries them via UserRepositoryInterface.

Example

Implementing UserInterface:

from dataclasses import dataclass

@dataclass
class User:
    id: str
    username: str
    hashed_password: str
    roles: list[str]
    permissions: set[str]

    def has_permission(self, permission: str) -> bool:
        # Check role-based permissions
        for role in self.roles:
            if permission in ROLE_PERMISSIONS.get(role, set()):
                return True
        # Check user-specific permissions
        return permission in self.permissions

Using with auth:

user = user_repo.find_by_username("john")
if user and user.has_permission("users:write"):
    # Allow operation
    pass

Note

This is a Protocol, not a base class. Your User model doesn’t need to inherit from this - just implement the required properties and methods.

See also

UserRepositoryInterface: Repository for user data access AuthInterface: Authentication provider using users

property id : str

Unique identifier for the user.

Returns:

User ID, typically a UUID or database primary key.

Return type:

str

Example

User ID in token payload:

token_payload = {"user_id": user.id, "exp": ...}
property username : str

User’s username or email address.

Returns:

Username, typically used for login and display.

Return type:

str

Example

username in login:

user = user_repo.find_by_username(username)
if user and auth.verify_password(password, user.hashed_password):
    return auth.authenticate(...)
property roles : list[str]

List of roles assigned to the user.

Returns:

Role names (e.g., [“admin”, “user”, “moderator”]).

Return type:

list[str]

Example

Role-based access control:

if "admin" in user.roles:
    # Allow admin operation
    pass

Note

Roles should be lowercase strings. Use has_permission() for fine-grained permission checks.

property hashed_password : str

User’s hashed password.

Returns:

Securely hashed password (e.g., Argon2, bcrypt).

Return type:

str

Warning

Never store or transmit plain-text passwords. This property should only return hashed values.

Example

Password verification:

if auth.verify_password(input_password, user.hashed_password):
    # Password matches
    pass

See also

PasswordHasherInterface.hash(): Hash passwords PasswordHasherInterface.verify(): Verify passwords

has_permission(permission: str) bool[source]

Check if user has a specific permission.

Parameters:
permission : str

Permission string, typically in format resource:action (e.g., “users:write”, “posts:delete”).

Returns:

True if user has the permission, False otherwise.

Return type:

bool

Example

Permission-based authorization:

@app.delete("/users/{user_id}")
async def delete_user(user_id: str, current_user: User = Depends(get_current_user)):
    if not current_user.has_permission("users:delete"):
        raise HTTPException(403, "Permission denied")
    # Delete user

Role-based permission mapping:

ROLE_PERMISSIONS = {
    "admin": {"users:read", "users:write", "users:delete"},
    "user": {"users:read"},
}

def has_permission(self, permission: str) -> bool:
    return any(
        permission in ROLE_PERMISSIONS.get(role, set())
        for role in self.roles
    )

Note

Permission format is application-defined. Use a consistent naming scheme (e.g., resource:action).

__init__(*args, **kwargs)
class structum_lab.auth.interfaces.UserRepositoryInterface(*args, **kwargs)[source]

Bases: Protocol

Protocol for user storage and retrieval in Structum Lab.

Applications implement this to connect the auth system with their database or user storage backend. The auth plugin does NOT manage user storage directly - it delegates all user operations to this repository.

This separation ensures the auth system remains storage-agnostic and can work with any database (PostgreSQL, MongoDB, etc.) or user service (LDAP, OAuth providers).

Implementations:
  • Database-backed repository (SQLAlchemy, etc.)

  • External user service adapter (LDAP, Active Directory)

  • In-memory repository (testing only)

Example

SQLAlchemy repository implementation:

class SQLAlchemyUserRepository:
    def __init__(self, db: DatabaseInterface):
        self.db = db

    def find_by_username(self, username: str) -> User | None:
        with self.db.transaction() as conn:
            conn.execute(
                "SELECT * FROM users WHERE username = :username",
                {"username": username}
            )
            row = conn.fetchone()
            if not row:
                return None

            return User(
                id=row["id"],
                username=row["username"],
                hashed_password=row["password_hash"],
                roles=row.get("roles", []),
            )

    def find_by_id(self, user_id: str) -> User | None:
        with self.db.transaction() as conn:
            conn.execute(
                "SELECT * FROM users WHERE id = :id",
                {"id": user_id}
            )
            row = conn.fetchone()
            return User(**row) if row else None

Using with authentication:

user_repo = SQLAlchemyUserRepository(db)
auth = JWTAuthProvider.from_config()

tokens = auth.authenticate("john", "password123", user_repo)
if tokens:
    print(f"Access token: {tokens.access_token}")

Note

Repository is responsible for mapping storage format to UserInterface. It should handle serialization/deserialization of user data.

See also

UserInterface: User entity protocol AuthInterface: Authentication provider using repositories

find_by_username(username: str) UserInterface | None[source]

Find a user by username or email.

Parameters:
username : str

Username or email to search for. Should be case-insensitive in most implementations.

Returns:

User if found, None otherwise.

Return type:

UserInterface | None

Example

Looking up user for authentication:

user = user_repo.find_by_username("john@example.com")
if user:
    # Verify password
    if auth.verify_password(password, user.hashed_password):
        return auth.create_tokens(user)
else:
    log.warning("Login attempt for unknown user", username=username)

Note

Implementation should normalize username (e.g., lowercase) before lookup. Consider using database indexes on username column for performance.

find_by_id(user_id: str) UserInterface | None[source]

Find a user by their unique identifier.

Parameters:
user_id : str

User’s unique identifier (typically UUID or database ID).

Returns:

User if found, None otherwise.

Return type:

UserInterface | None

Example

Loading user from token:

# After verifying access token
payload = auth.verify_access_token(token)
if payload:
    user = user_repo.find_by_id(payload["user_id"])
    if user:
        # User authenticated
        return user

Note

This method is called frequently (on every authenticated request). Consider caching user data for performance.

__init__(*args, **kwargs)
class structum_lab.auth.interfaces.PasswordHasherInterface(*args, **kwargs)[source]

Bases: Protocol

Protocol for secure password hashing in Structum Lab.

Implementations must use cryptographically secure hashing algorithms (e.g., Argon2, b

crypt, scrypt). Never use fast hashes like MD5 or SHA-1

for passwords.

Implementations:
  • Argon2Hasher (recommended)

  • BcryptHasher

Example:

Using password hasher:

from structum_lab.plugins.auth.password import Argon2Hasher

hasher = Argon2Hasher()

# Hash password during registration
hashed = hasher.hash("user_password_123")
# Store hashed in database: user.hashed_password = hashed

# Verify during login
if hasher.verify("user_password_123", hashed):
    # Password matches
    return create_token(user)
else:
    # Invalid password
    raise AuthenticationError("Invalid credentials")
Warning:

Never log, display, or store plain-text passwords. Always hash passwords immediately upon receipt.

See Also:

AuthInterface: Auth provider using password hasher UserInterface: User entity with hashed_password property

hash(password: str) str[source]

Hash a plain-text password securely.

Parameters:
password : str

Plain-text password to hash. No length restrictions, but implementations may truncate very long passwords.

Returns:

Hashed password string including algorithm identifier and salt.

Format is implementation-specific (e.g., Argon2: $argon2id$v=19$...).

Return type:

str

Example

Creating user with hashed password:

# During user registration
plain_password = request.form["password"]
hashed = auth.hash_password(plain_password)

user = User(
    id=generate_id(),
    username=request.form["username"],
    hashed_password=hashed,  # Store this
    roles=["user"]
)
user_repo.save(user)

Warning

Hashing is intentionally slow (100-500ms) to resist brute-force attacks. Do not hash passwords in tight loops or performance-critical paths.

Note

Each call generates a unique hash (due to random salt) even for the same password. This is expected and secure behavior.

verify(password: str, hashed: str) bool[source]

Verify a plain-text password against a hash.

Parameters:
password : str

Plain-text password to verify.

hashed : str

Previously hashed password (from database).

Returns:

True if password matches hash, False otherwise.

Return type:

bool

Example

Password verification during login:

# Get user from database
user = user_repo.find_by_username(username)
if not user:
    return None  # User not found

# Verify password
if auth.verify_password(password, user.hashed_password):
    # Authentication successful
    return auth.create_tokens(user)
else:
    # Invalid password
    log.warning("Failed login attempt", username=username)
    return None

Warning

Always use constant-time comparison internally to prevent timing attacks. Most modern hashing libraries handle this automatically.

Note

Returns False for invalid/malformed hashes rather than raising exceptions. This prevents information leakage about hash format.

__init__(*args, **kwargs)
class structum_lab.auth.interfaces.AuthInterface(*args, **kwargs)[source]

Bases: Protocol

Protocol for authentication providers in Structum Lab.

This is the main entry point for all authentication operations. Implementations provide JWT-based authentication, password hashing, and token management.

The auth system is storage-agnostic - it doesn’t manage users directly. Instead, it uses UserRepositoryInterface to fetch user data, keeping authentication logic decoupled from storage.

Implementations:

Example

See usage in specific implementations like JWTAuthProvider.

authenticate(username: str, password: str, user_repo: UserRepositoryInterface) TokenPair | None[source]

Authenticate a user.

refresh(refresh_token: str, user_repo: UserRepositoryInterface) TokenPair | None[source]

Refresh access token.

verify_access_token(token: str) dict[str, Any] | None[source]

Verify an access token.

hash_password(password: str) str[source]

Hash a password.

verify_password(password: str, hashed: str) bool[source]

Verify a password.

__init__(*args, **kwargs)
structum_lab.auth.interfaces.User

alias of UserInterface

structum_lab.auth.interfaces.UserRepository

alias of UserRepositoryInterface

structum_lab.auth.interfaces.Auth

alias of AuthInterface

structum_lab.auth.interfaces.PasswordHasher

alias of PasswordHasherInterface

Validation

Core Validation Interfaces.

This module defines the protocols for system validation. Plugins can implement Validator to participate in the bootstrap process without depending directly on the structum-bootstrap package.

class structum_lab.validation.ValidationContext(*args, **kwargs)[source]

Bases: Protocol

Context object passed to validators to collect results.

add_check(name: str, passed: bool, message: str = '') None[source]

Record the result of a specific check.

add_warning(message: str) None[source]

Record a non-fatal warning.

is_valid() bool[source]

Returns True if all checks passed so far.

__init__(*args, **kwargs)
class structum_lab.validation.Validator(*args, **kwargs)[source]

Bases: Protocol

Protocol for any component that can perform validation.

validate(context: ValidationContext) None[source]

Execute validation logic and update the context.

Parameters:
context: ValidationContext

The bootstrap context to record success/failure/warnings.

__init__(*args, **kwargs)

Plugins System