Source code for structum_lab.database.interfaces

# Database Interfaces - Core Protocols
# SPDX-License-Identifier: Apache-2.0

"""
Core database interfaces for Structum Lab.

This module defines the Protocols that any database implementation must follow.
Implementations are provided by the `structum-database` plugin.

Example:
    >>> from structum_lab.plugins.database import SQLAlchemyDatabase
    >>> db = SQLAlchemyDatabase.from_config()
    >>> with db.transaction() as conn:
    ...     conn.execute("SELECT 1")
"""

from __future__ import annotations

from abc import abstractmethod
from contextlib import contextmanager
from dataclasses import dataclass
from enum import Enum
from typing import Any, ContextManager, Iterator, Protocol, runtime_checkable


class HealthStatus(Enum):
    """
    Possible health states reported by a database health check.

    Attributes:
        HEALTHY: The database is fully operational with normal latency.
        DEGRADED: The database responds but shows problems
            (e.g., high latency, connection warnings).
        UNHEALTHY: The database is unreachable or critically impaired.

    Each member's value is its lowercase name (``"healthy"``, ``"degraded"``,
    ``"unhealthy"``), so ``HealthStatus(result.status.value)`` round-trips a
    serialized status back to the enum member.

    Example:
        Branching on the outcome of a health check::

            result = db.health_check()
            if result.status == HealthStatus.HEALTHY:
                log.info("Database OK", latency_ms=result.latency_ms)
            elif result.status == HealthStatus.DEGRADED:
                log.warning("Database degraded", message=result.message)
            else:
                log.error("Database unhealthy", message=result.message)
    """

    HEALTHY = "healthy"
    DEGRADED = "degraded"
    UNHEALTHY = "unhealthy"


@dataclass(frozen=True)
class HealthCheckResult:
    """
    Data class representing database health check results.

    Attributes:
        status (HealthStatus): Overall health status (HEALTHY, DEGRADED, UNHEALTHY).
        message (str): Human-readable status description.
        latency_ms (float | None): Query latency in milliseconds, if available.
            Defaults to None.
        details (dict[str, Any] | None): Additional diagnostic information
            (e.g., active connections, pool statistics). Defaults to None.

    Example:
        Creating and using a health check result::

            result = HealthCheckResult(
                status=HealthStatus.HEALTHY,
                message="Database connection OK",
                latency_ms=2.5,
                details={"active_connections": 5, "pool_size": 10}
            )

            # Use in monitoring/alerts
            if result.latency_ms is not None and result.latency_ms > 100:
                alert("High database latency", result.latency_ms)

    Note:
        The dataclass is frozen, so its fields cannot be reassigned after
        creation (assignment raises ``dataclasses.FrozenInstanceError``).
        Freezing is shallow: the ``details`` dict itself remains mutable, so
        consumers should treat it as read-only. Because the generated
        ``__hash__`` hashes every field, an instance whose ``details`` is a
        dict is not hashable (``hash()`` raises ``TypeError``); instances with
        ``details=None`` are hashable.
    """

    status: HealthStatus
    message: str
    latency_ms: float | None = None
    details: dict[str, Any] | None = None


@runtime_checkable
class ConnectionInterface(Protocol):
    """
    Structural contract for an active database connection in Structum Lab.

    A connection is a live link to the database. It runs SQL through
    :meth:`execute` and exposes the rows of the most recent statement through
    :meth:`fetchone`, :meth:`fetchmany` and :meth:`fetchall`. Connections are
    typically handed out by a connection pool and go back to it after use.

    Implementations:
        - :class:`~structum_lab.plugins.database.sqlalchemy.SQLAlchemyConnection`
        - :class:`~structum_lab.plugins.database.postgres.PostgresConnection`

    Example:
        Working with a connection inside a transaction::

            with db.transaction() as conn:
                # Write using named parameters
                conn.execute(
                    "INSERT INTO users (name, email) VALUES (:name, :email)",
                    {"name": "John", "email": "john@example.com"}
                )

                # Read back, then iterate over the rows
                conn.execute("SELECT * FROM users WHERE id > :id", {"id": 100})
                for user in conn.fetchall():
                    print(f"{user['name']} - {user['email']}")

    Note:
        Connections are not thread-safe. Give each thread its own connection,
        or guard shared access with a lock.

    See Also:
        :class:`DatabaseInterface`: Database manager that provides connections.
        :meth:`DatabaseInterface.transaction`: Preferred way to obtain one.
    """

    def execute(
        self,
        query: str,
        params: dict[str, Any] | tuple[Any, ...] | None = None,
    ) -> Any:
        """
        Run a SQL statement, optionally binding parameters.

        Two placeholder styles are accepted: ``:name`` placeholders bound from
        a dict, and ``?`` placeholders bound positionally from a tuple.

        Args:
            query (str): SQL text using ``:param_name`` or ``?`` placeholders.
            params (dict[str, Any] | tuple[Any, ...] | None): Placeholder
                values: a dict for named placeholders, a tuple for positional
                ones. Defaults to None (no parameters).

        Returns:
            Any: An implementation-specific result object. Read rows through
            this connection's fetch methods.

        Raises:
            DatabaseError: If the statement fails to execute.
            ParameterError: If ``params`` does not match the placeholders.

        Example:
            The three calling styles::

                # Named parameters (recommended)
                conn.execute(
                    "SELECT * FROM users WHERE age > :min_age AND city = :city",
                    {"min_age": 18, "city": "NYC"}
                )

                # Positional parameters
                conn.execute(
                    "SELECT * FROM users WHERE age > ? AND city = ?",
                    (18, "NYC")
                )

                # No parameters
                conn.execute("SELECT COUNT(*) FROM users")

        Warning:
            Always pass user input through ``params``. Building SQL with string
            interpolation exposes the query to SQL injection.

        See Also:
            :meth:`fetchone`: Read a single row.
            :meth:`fetchall`: Read every remaining row.
        """
        ...

    def fetchone(self) -> dict[str, Any] | None:
        """
        Return the next row produced by the last executed statement.

        Returns:
            dict[str, Any] | None: The row as a mapping of column name to
            value, or None once no rows remain.

        Example:
            Looking up a single row::

                conn.execute("SELECT * FROM users WHERE id = :id", {"id": 1})
                user = conn.fetchone()
                if user:
                    print(f"Found user: {user['name']}")
                else:
                    print("User not found")

        Note:
            Only meaningful after :meth:`execute`. Returns None when the cursor
            is exhausted.

        See Also:
            :meth:`fetchall`: Read every remaining row.
            :meth:`fetchmany`: Read a bounded batch of rows.
        """
        ...

    def fetchall(self) -> list[dict[str, Any]]:
        """
        Return every remaining row produced by the last executed statement.

        Returns:
            list[dict[str, Any]]: The rows as dictionaries; an empty list when
            the statement produced no rows or the cursor is exhausted.

        Example:
            Reading a full result set::

                conn.execute(
                    "SELECT * FROM users WHERE active = :active",
                    {"active": True}
                )
                users = conn.fetchall()

                for user in users:
                    print(f"{user['id']}: {user['name']}")

                print(f"Total active users: {len(users)}")

        Warning:
            Large result sets are loaded into memory all at once; prefer
            :meth:`fetchmany` for those.

        See Also:
            :meth:`fetchone`: Read a single row.
            :meth:`fetchmany`: Read rows in batches.
        """
        ...

    def fetchmany(self, size: int) -> list[dict[str, Any]]:
        """
        Return at most ``size`` rows from the last executed statement.

        Intended for walking large result sets in bounded batches to keep
        memory usage under control.

        Args:
            size (int): Upper bound on the number of rows returned; must be > 0.

        Returns:
            list[dict[str, Any]]: Up to ``size`` rows. Fewer are returned when
            the result set runs out; an empty list when nothing remains.

        Example:
            Processing a large table in batches::

                conn.execute("SELECT * FROM large_table")

                batch_size = 100
                while True:
                    batch = conn.fetchmany(batch_size)
                    if not batch:
                        break

                    process_batch(batch)
                    print(f"Processed {len(batch)} rows")

        Note:
            Lets large datasets be iterated without holding every row in
            memory at once.

        See Also:
            :meth:`fetchone`: Read a single row.
            :meth:`fetchall`: Read every remaining row.
        """
        ...


@runtime_checkable
class TransactionInterface(Protocol):
    """
    Protocol for database transactions providing ACID guarantees.

    Transactions group multiple database operations into a single atomic
    unit: either all operations take effect (commit) or none do (rollback).

    Example:
        Manual transaction management::

            # How ``tx`` and the connection ``conn`` bound to it are obtained
            # is implementation-specific: DatabaseInterface does not declare a
            # method that returns a TransactionInterface.
            try:
                conn.execute("UPDATE accounts SET balance = balance - 100 WHERE id = 1")
                conn.execute("UPDATE accounts SET balance = balance + 100 WHERE id = 2")
                tx.commit()
            except Exception:
                if tx.is_active:
                    tx.rollback()
                raise

        Preferred context manager approach::

            # Automatic commit/rollback
            with db.transaction() as conn:
                conn.execute("UPDATE accounts SET balance = balance - 100 WHERE id = 1")
                conn.execute("UPDATE accounts SET balance = balance + 100 WHERE id = 2")
                # Commits automatically if no exception

    Note:
        Most applications should use the :meth:`DatabaseInterface.transaction`
        context manager instead of managing transactions manually.

    See Also:
        :meth:`DatabaseInterface.transaction`: Recommended transaction API
    """

    def commit(self) -> None:
        """
        Commit the current transaction.

        Makes all changes since the transaction started permanent in the
        database. After commit, the transaction is no longer active.

        Raises:
            DatabaseError: If commit fails (e.g., constraint violation).
            TransactionError: If the transaction is not active.

        Example:
            Manual commit (``tx`` and ``conn`` obtained from the
            implementation)::

                try:
                    conn.execute("INSERT INTO logs VALUES (:msg)", {"msg": "test"})
                    tx.commit()
                    log.info("Transaction committed successfully")
                except DatabaseError:
                    if tx.is_active:
                        tx.rollback()
                    log.error("Commit failed", exc_info=True)

        Warning:
            After commit, the transaction cannot be reused. Start a new
            transaction for additional operations.

        See Also:
            :meth:`rollback`: Abort transaction
        """
        ...

    def rollback(self) -> None:
        """
        Roll back the current transaction.

        Discards all changes made since the transaction started, returning the
        database to the state it was in before the transaction began.

        Raises:
            TransactionError: If the transaction has already been committed.

        Example:
            Manual rollback on a failed validation (``tx`` and ``conn``
            obtained from the implementation)::

                try:
                    conn.execute("DELETE FROM important_data")

                    if not validate_deletion():
                        tx.rollback()
                        log.warning("Deletion rolled back - validation failed")
                        return

                    tx.commit()
                except Exception:
                    if tx.is_active:
                        tx.rollback()
                    raise

        Note:
            Calling rollback() again on a transaction that was already rolled
            back is a no-op, so it is safe to use in cleanup paths.

        See Also:
            :meth:`commit`: Persist transaction changes
        """
        # NOTE(review): confirm implementations raise only after commit() and
        # treat a repeated rollback() as a no-op.
        ...

    @property
    def is_active(self) -> bool:
        """
        Whether the transaction can still accept operations.

        Returns:
            bool: True while the transaction is open; False once it has been
            committed or rolled back.

        Example:
            Checking transaction state::

                assert tx.is_active      # True while open
                tx.commit()
                assert not tx.is_active  # False after commit

                # A further tx.commit() would raise TransactionError

        Note:
            Useful for conditional cleanup, e.g. only calling :meth:`rollback`
            when the transaction is still active.
        """
        ...


@runtime_checkable
class DatabaseInterface(Protocol):
    """
    Protocol for database managers in Structum Lab.

    This is the main entry point for all database operations. Implementations
    must provide connection pooling, transaction management, and health
    monitoring.

    The protocol abstracts away database-specific details, allowing
    applications to work with different backends (PostgreSQL, MySQL, SQLite)
    through a unified interface.

    Implementations:
        - :class:`~structum_lab.plugins.database.sqlalchemy.SQLAlchemyDatabase`
        - :class:`~structum_lab.plugins.database.postgres.PostgresDatabase`

    Example:
        Basic usage with configuration::

            from structum_lab.plugins.database import SQLAlchemyDatabase

            # Initialize from config
            db = SQLAlchemyDatabase.from_config()

            # Or with explicit URL
            db = SQLAlchemyDatabase(url="postgresql://user:pass@localhost/mydb")

            # Use transaction context manager (recommended)
            with db.transaction() as conn:
                conn.execute(
                    "INSERT INTO users (name, email) VALUES (:name, :email)",
                    {"name": "John", "email": "john@example.com"}
                )
                # Rows are read through the connection's fetch methods; the
                # value returned by execute() is implementation-specific.
                conn.execute(
                    "SELECT id FROM users WHERE email = :email",
                    {"email": "john@example.com"}
                )
                user_id = conn.fetchone()["id"]
                conn.execute(
                    "INSERT INTO profiles (user_id, bio) VALUES (:uid, :bio)",
                    {"uid": user_id, "bio": "Software engineer"}
                )
                # Commits automatically on success

            # Check health
            health = db.health_check()
            if health.status != HealthStatus.HEALTHY:
                log.warning("Database issues detected", result=health)

    Note:
        Always use the :meth:`transaction` context manager for database
        operations. It ensures proper connection pooling, automatic
        commit/rollback, and resource cleanup.

    See Also:
        :class:`ConnectionInterface`: Connection protocol
        :class:`TransactionInterface`: Transaction protocol
        :class:`HealthCheckResult`: Health check result data class
    """

    @property
    def url(self) -> str:
        """
        Get the database connection URL (sanitized).

        Returns:
            str: Database URL with the password redacted/masked for security.
            Example: ``"postgresql://user:***@localhost:5432/mydb"``

        Example:
            Logging database configuration::

                log.info("Connected to database", url=db.url)
                # Logs: postgresql://user:***@localhost/mydb

        Note:
            Passwords are redacted to prevent accidental logging of
            credentials.
        """
        ...

    @property
    def is_connected(self) -> bool:
        """
        Check if the database connection pool is active.

        Returns:
            bool: True if the connection pool is established, False otherwise.

        Example:
            Conditional connection::

                if not db.is_connected:
                    db.connect()
                    log.info("Database connection established")

        Note:
            Most operations call :meth:`connect` automatically if not
            connected. Explicit checking is mainly useful for health checks
            and diagnostics.
        """
        ...

    def connect(self) -> None:
        """
        Establish the database connection pool.

        Creates and initializes the connection pool. Called automatically on
        the first database operation if not already connected.

        Raises:
            ConnectionError: If unable to connect to the database.
            ConfigurationError: If the database URL or settings are invalid.

        Example:
            Explicit connection during startup::

                db = SQLAlchemyDatabase.from_config()
                try:
                    db.connect()
                    log.info("Database pool created", url=db.url)
                except ConnectionError:
                    log.critical("Cannot connect to database", exc_info=True)
                    sys.exit(1)

        Note:
            Usually not needed - the database connects automatically on first
            use. Explicit connection is useful for fail-fast behavior during
            startup.

        See Also:
            :meth:`close`: Shutdown connection pool
        """
        ...

    def close(self) -> None:
        """
        Close all connections in the pool and release resources.

        Should be called during application shutdown to ensure clean
        termination. Any ongoing transactions should be completed before
        closing.

        Raises:
            RuntimeError: If called while transactions are active
                (implementation-specific).

        Example:
            Application shutdown::

                import atexit

                db = SQLAlchemyDatabase.from_config()
                atexit.register(db.close)  # Ensure cleanup on exit

                # Or in a FastAPI lifespan
                @asynccontextmanager
                async def lifespan(app: FastAPI):
                    db.connect()
                    yield
                    db.close()  # Clean shutdown

        Warning:
            After calling close(), the database instance should not be reused.
            Create a new instance for additional operations.

        See Also:
            :meth:`connect`: Initialize connection pool
        """
        ...

    def get_connection(self) -> ConnectionInterface:
        """
        Acquire a connection from the pool.

        The caller is responsible for returning the connection to the pool
        after use. **Prefer using** :meth:`transaction` **instead.**

        Returns:
            ConnectionInterface: A database connection from the pool.

        Raises:
            PoolExhaustedError: If no connections are available and the pool
                is at maximum size.
            ConnectionError: If the pool is not initialized.

        Example:
            Manual connection management (not recommended)::

                conn = db.get_connection()
                try:
                    conn.execute("SELECT * FROM users")
                    users = conn.fetchall()
                finally:
                    # Return the connection to the pool. ConnectionInterface
                    # does not declare a release method; this assumes the
                    # implementation provides close().
                    conn.close()

        Warning:
            Manual connection management is error-prone. Prefer the
            :meth:`transaction` context manager, which handles connections
            automatically.

        See Also:
            :meth:`transaction`: Recommended connection API
        """
        ...

    def transaction(self) -> ContextManager[ConnectionInterface]:
        """
        Context manager for database transactions.

        Provides automatic transaction management: commits when the ``with``
        block exits normally and rolls back when it exits with an exception.
        This is the recommended way to perform database operations.

        Returns:
            ContextManager[ConnectionInterface]: A context manager whose
            ``__enter__`` yields a database connection with an active
            transaction.

        Raises:
            DatabaseError: If the transaction fails to start.
            ConnectionError: If the pool is exhausted or not connected.

        Example:
            Recommended usage pattern::

                # Single transaction
                with db.transaction() as conn:
                    conn.execute(
                        "UPDATE accounts SET balance = balance - :amount WHERE id = :id",
                        {"amount": 100, "id": 1}
                    )
                    conn.execute(
                        "UPDATE accounts SET balance = balance + :amount WHERE id = :id",
                        {"amount": 100, "id": 2}
                    )
                    # Commits automatically if no exception

                # Exception triggers rollback
                try:
                    with db.transaction() as conn:
                        conn.execute("DELETE FROM important_data")
                        raise ValueError("Validation failed")
                except ValueError:
                    pass  # Transaction automatically rolled back

        Note:
            Implementations may build the context manager however they like,
            e.g. with :func:`contextlib.contextmanager` on a generator method,
            or with a class defining ``__enter__``/``__exit__``.

            Transactions are isolated: changes are not visible to other
            connections until commit. The isolation level depends on the
            backend and its configuration.

        See Also:
            :class:`TransactionInterface`: Transaction protocol
            :class:`ConnectionInterface`: Connection protocol
        """
        # Declared structurally rather than decorated with @contextmanager:
        # decorating the stub would tie the protocol to generator-based
        # context managers and turn the stub into a manager that fails on
        # __enter__.
        ...

    def health_check(self) -> HealthCheckResult:
        """
        Check database connectivity and health status.

        Performs a simple query to verify the database is responsive and
        measures latency. Useful for readiness probes and monitoring.

        Returns:
            HealthCheckResult: Health status with latency and diagnostic
            details.

        Example:
            Health check endpoint::

                @app.get("/health/database")
                def database_health():
                    result = db.health_check()

                    if result.status == HealthStatus.UNHEALTHY:
                        raise HTTPException(503, detail=result.message)

                    return {
                        "status": result.status.value,
                        "latency_ms": result.latency_ms,
                        "message": result.message
                    }

            Prometheus metrics::

                result = db.health_check()

                # Record health status
                db_health_gauge.labels(database="main").set(
                    1 if result.status == HealthStatus.HEALTHY else 0
                )

                # Record latency (0.0 is a valid measurement, so test for None)
                if result.latency_ms is not None:
                    db_latency_histogram.observe(result.latency_ms)

        Note:
            Health checks execute a lightweight query (usually ``SELECT 1``).
            They should complete quickly (typically under 100 ms).

        See Also:
            :class:`HealthCheckResult`: Result data class
            :class:`HealthStatus`: Health status enumeration
        """
        ...


# Short aliases so type hints can say ``Connection`` / ``Transaction`` /
# ``Database`` instead of the full protocol names.
Connection = ConnectionInterface
Transaction = TransactionInterface
Database = DatabaseInterface