# Database Interfaces - Core Protocols
# SPDX-License-Identifier: Apache-2.0
"""
Core database interfaces for Structum Lab.
This module defines the Protocols that any database implementation must follow.
Implementations are provided by the `structum-database` plugin.
Example:
>>> from structum_lab.plugins.database import SQLAlchemyDatabase
>>> db = SQLAlchemyDatabase.from_config()
>>> with db.transaction() as conn:
... conn.execute("SELECT 1")
"""
from __future__ import annotations
from abc import abstractmethod
from contextlib import contextmanager
from dataclasses import dataclass
from enum import Enum
from typing import Any, ContextManager, Iterator, Protocol, runtime_checkable
class HealthStatus(Enum):
    """
    Health states a database health check can report.

    Members:
        HEALTHY: The database is fully operational with normal latency.
        DEGRADED: The database answers but shows problems
            (for instance high latency or connection warnings).
        UNHEALTHY: The database cannot be reached or is critically impaired.

    Each member's ``value`` is its lowercase name, so
    ``HealthStatus.DEGRADED.value == "degraded"``. This is handy when
    serializing a status into JSON or metrics labels.

    Example:
        Reacting to the outcome of a check::

            result = db.health_check()
            if result.status == HealthStatus.HEALTHY:
                log.info("Database OK", latency_ms=result.latency_ms)
            elif result.status == HealthStatus.DEGRADED:
                log.warning("Database degraded", message=result.message)
            else:
                log.error("Database unhealthy", message=result.message)
    """

    HEALTHY = "healthy"  # normal operation
    DEGRADED = "degraded"  # responsive, but with issues
    UNHEALTHY = "unhealthy"  # unreachable or critically impaired
@dataclass(frozen=True)
class HealthCheckResult:
    """
    Outcome of a database health check.

    Attributes:
        status (HealthStatus): Overall health status (HEALTHY, DEGRADED,
            UNHEALTHY).
        message (str): Human-readable status description.
        latency_ms (float | None): Query latency in milliseconds, or ``None``
            when no latency was measured.
        details (dict[str, Any] | None): Optional extra diagnostics
            (e.g. active connections, pool statistics).

    Example:
        Creating and using a health check result::

            result = HealthCheckResult(
                status=HealthStatus.HEALTHY,
                message="Database connection OK",
                latency_ms=2.5,
                details={"active_connections": 5, "pool_size": 10},
            )

            # Use in monitoring/alerts. Compare against None explicitly so a
            # measured latency of 0.0 is not mistaken for "no measurement".
            if result.latency_ms is not None and result.latency_ms > 100:
                alert("High database latency", result.latency_ms)

    Note:
        The dataclass is frozen. Its fields cannot be reassigned after
        creation, and assignment raises ``dataclasses.FrozenInstanceError``.
        The protection is shallow, however. A ``details`` dict can still be
        mutated in place. Hashing an instance also fails with ``TypeError``
        while ``details`` holds a dict, because dicts are unhashable.
    """

    status: HealthStatus
    message: str
    latency_ms: float | None = None
    details: dict[str, Any] | None = None
@runtime_checkable
class ConnectionInterface(Protocol):
    """
    Structural protocol for an active database connection in Structum Lab.

    A connection runs SQL statements and exposes the rows produced by the
    most recent statement through the ``fetch*`` methods. Connections are
    normally borrowed from a connection pool and handed back after use.

    Implementations:
        - :class:`~structum_lab.plugins.database.sqlalchemy.SQLAlchemyConnection`
        - :class:`~structum_lab.plugins.database.postgres.PostgresConnection`

    Example:
        Writing and reading rows inside a transaction::

            with db.transaction() as conn:
                # Named parameters
                conn.execute(
                    "INSERT INTO users (name, email) VALUES (:name, :email)",
                    {"name": "John", "email": "john@example.com"},
                )

                # Run a query, then pull its rows
                conn.execute("SELECT * FROM users WHERE id > :id", {"id": 100})
                for user in conn.fetchall():
                    print(f"{user['name']} - {user['email']}")

    Note:
        Connections are not thread-safe. Give each thread its own connection,
        or guard shared access with a lock.

    See Also:
        :class:`DatabaseInterface`: Database manager that hands out connections
        :meth:`DatabaseInterface.transaction`: Preferred way to obtain one
    """

    def execute(
        self,
        query: str,
        params: dict[str, Any] | tuple[Any, ...] | None = None,
    ) -> Any:
        """
        Run one SQL statement, optionally binding parameters.

        Two binding styles are accepted:

        * named: ``:name`` placeholders bound from a ``dict``;
        * positional: ``?`` placeholders bound from a ``tuple``.

        Args:
            query (str): The SQL text, using ``:param_name`` or ``?``
                placeholders.
            params (dict[str, Any] | tuple[Any, ...] | None): Values to bind.
                Use a dict for named placeholders and a tuple for positional
                ones. Defaults to ``None`` (no parameters).

        Returns:
            Any: An implementation-specific result object. Read the rows with
            :meth:`fetchone`, :meth:`fetchmany` or :meth:`fetchall`.

        Raises:
            DatabaseError: If the statement fails to execute.
            ParameterError: If ``params`` does not match the placeholders in
                ``query``.

        Example:
            The three calling styles::

                # Named parameters (recommended)
                conn.execute(
                    "SELECT * FROM users WHERE age > :min_age AND city = :city",
                    {"min_age": 18, "city": "NYC"},
                )

                # Positional parameters
                conn.execute(
                    "SELECT * FROM users WHERE age > ? AND city = ?",
                    (18, "NYC"),
                )

                # No parameters
                conn.execute("SELECT COUNT(*) FROM users")

        Warning:
            Always pass user input through ``params``. Interpolating it into
            the SQL string exposes the query to SQL injection.

        See Also:
            :meth:`fetchone`: Read a single row
            :meth:`fetchall`: Read every remaining row
        """
        ...

    def fetchone(self) -> dict[str, Any] | None:
        """
        Return the next row produced by the last :meth:`execute` call.

        Returns:
            dict[str, Any] | None: The row as a column-name-to-value mapping,
            or ``None`` once no rows are left.

        Example:
            Looking up a single record::

                conn.execute("SELECT * FROM users WHERE id = :id", {"id": 1})
                user = conn.fetchone()
                if user:
                    print(f"Found user: {user['name']}")
                else:
                    print("User not found")

        Note:
            Call only after :meth:`execute`. ``None`` signals an exhausted
            cursor.

        See Also:
            :meth:`fetchall`: Read every remaining row
            :meth:`fetchmany`: Read a bounded batch of rows
        """
        ...

    def fetchall(self) -> list[dict[str, Any]]:
        """
        Return every remaining row produced by the last :meth:`execute` call.

        Returns:
            list[dict[str, Any]]: The rows as column-name-to-value mappings.
            The list is empty when there are no rows or the cursor is
            already exhausted.

        Example:
            Reading a whole result set::

                conn.execute(
                    "SELECT * FROM users WHERE active = :active", {"active": True}
                )
                users = conn.fetchall()
                for user in users:
                    print(f"{user['id']}: {user['name']}")
                print(f"Total active users: {len(users)}")

        Warning:
            Every row is loaded into memory at once. For large result sets,
            prefer :meth:`fetchmany`.

        See Also:
            :meth:`fetchone`: Read a single row
            :meth:`fetchmany`: Read rows in batches
        """
        ...

    def fetchmany(self, size: int) -> list[dict[str, Any]]:
        """
        Return at most ``size`` rows from the last :meth:`execute` call.

        Use this to walk a large result set in fixed-size batches and keep
        memory use bounded.

        Args:
            size (int): Upper bound on the number of rows returned. Must be
                greater than 0.

        Returns:
            list[dict[str, Any]]: Up to ``size`` rows. There may be fewer when
            the result set runs out, and the list is empty once no rows
            remain.

        Example:
            Batch processing::

                conn.execute("SELECT * FROM large_table")
                batch_size = 100
                while batch := conn.fetchmany(batch_size):
                    process_batch(batch)
                    print(f"Processed {len(batch)} rows")

        Note:
            Lets large datasets be iterated without materializing them fully.

        See Also:
            :meth:`fetchone`: Read a single row
            :meth:`fetchall`: Read every remaining row
        """
        ...
@runtime_checkable
class TransactionInterface(Protocol):
    """
    Protocol for database transactions providing ACID guarantees.

    A transaction groups several database operations into one atomic unit.
    Either all of them take effect (commit) or none of them do (rollback).

    How a ``TransactionInterface`` object is obtained is implementation-specific.
    :class:`DatabaseInterface` does not declare a method that returns one. The
    ``db.begin_transaction()`` call below is illustrative only.

    Example:
        Manual transaction management::

            # Implementation-specific: not part of DatabaseInterface.
            tx = db.begin_transaction()
            try:
                # ``conn`` is the connection the transaction was started on.
                conn.execute("UPDATE accounts SET balance = balance - 100 WHERE id = 1")
                conn.execute("UPDATE accounts SET balance = balance + 100 WHERE id = 2")
                tx.commit()
            except Exception:
                if tx.is_active:
                    tx.rollback()
                raise

        Preferred context manager approach::

            # Automatic commit/rollback
            with db.transaction() as conn:
                conn.execute("UPDATE accounts SET balance = balance - 100 WHERE id = 1")
                conn.execute("UPDATE accounts SET balance = balance + 100 WHERE id = 2")
            # Commits automatically if no exception was raised

    Note:
        Most applications should use the :meth:`DatabaseInterface.transaction`
        context manager instead of managing transactions by hand.

    See Also:
        :meth:`DatabaseInterface.transaction`: Recommended transaction API
    """

    def commit(self) -> None:
        """
        Commit the current transaction.

        Makes every change made since the transaction started permanent.
        Afterwards the transaction is no longer active (see :attr:`is_active`).

        Raises:
            DatabaseError: If the commit fails (e.g. constraint violation).
            TransactionError: If the transaction is not active.

        Example:
            Manual commit::

                tx = db.begin_transaction()  # implementation-specific
                try:
                    conn.execute("INSERT INTO logs VALUES (:msg)", {"msg": "test"})
                    tx.commit()
                    log.info("Transaction committed successfully")
                except DatabaseError:
                    if tx.is_active:
                        tx.rollback()
                    log.error("Commit failed", exc_info=True)

        Warning:
            A committed transaction cannot be reused. Start a new transaction
            for further operations.

        See Also:
            :meth:`rollback`: Abort the transaction
        """
        ...

    def rollback(self) -> None:
        """
        Roll back the current transaction.

        Discards every change made since the transaction started, returning
        the database to its state before the transaction began. Afterwards
        the transaction is no longer active (see :attr:`is_active`).

        Raises:
            TransactionError: If the transaction is not active.

        Example:
            Manual rollback on error::

                tx = db.begin_transaction()  # implementation-specific
                try:
                    conn.execute("DELETE FROM important_data")
                    if not validate_deletion():
                        tx.rollback()
                        log.warning("Deletion rolled back - validation failed")
                        return
                    tx.commit()
                except Exception:
                    if tx.is_active:
                        tx.rollback()
                    raise

        Note:
            Implementations differ on a second ``rollback()`` call after the
            transaction has already finished. Some ignore it, while others
            raise ``TransactionError`` as documented above. Portable code
            should check :attr:`is_active` before rolling back.

        See Also:
            :meth:`commit`: Persist transaction changes
        """
        ...

    @property
    def is_active(self) -> bool:
        """
        Whether the transaction is still active.

        Returns:
            bool: True if the transaction can still be committed or rolled
            back. False once it has been committed or rolled back.

        Example:
            Checking transaction state::

                tx = db.begin_transaction()  # implementation-specific
                assert tx.is_active
                tx.commit()
                assert not tx.is_active
                # A further tx.commit() would now raise TransactionError.

        Note:
            Useful for guarding :meth:`rollback` in error handlers.
        """
        ...
@runtime_checkable
class DatabaseInterface(Protocol):
    """
    Protocol for database managers in Structum Lab.

    This is the main entry point for all database operations. Implementations
    must provide connection pooling, transaction management, and health
    monitoring.

    The protocol hides database-specific details, so applications can work
    with different backends (PostgreSQL, MySQL, SQLite) through one unified
    interface.

    Implementations:
        - :class:`~structum_lab.plugins.database.sqlalchemy.SQLAlchemyDatabase`
        - :class:`~structum_lab.plugins.database.postgres.PostgresDatabase`

    Example:
        Basic usage with configuration::

            from structum_lab.plugins.database import SQLAlchemyDatabase

            # Initialize from config
            db = SQLAlchemyDatabase.from_config()

            # Or with an explicit URL
            db = SQLAlchemyDatabase(url="postgresql://user:pass@localhost/mydb")

            # Use the transaction context manager (recommended)
            with db.transaction() as conn:
                conn.execute(
                    "INSERT INTO users (name, email) VALUES (:name, :email)",
                    {"name": "John", "email": "john@example.com"},
                )
                # Backend-neutral way to read the new id back
                conn.execute(
                    "SELECT id FROM users WHERE email = :email",
                    {"email": "john@example.com"},
                )
                user_id = conn.fetchone()["id"]

                conn.execute(
                    "INSERT INTO profiles (user_id, bio) VALUES (:uid, :bio)",
                    {"uid": user_id, "bio": "Software engineer"},
                )
            # Commits automatically on success

            # Check health
            health = db.health_check()
            if health.status != HealthStatus.HEALTHY:
                log.warning("Database issues detected", result=health)

    Note:
        Prefer the :meth:`transaction` context manager for database
        operations. It handles connection pooling, automatic commit/rollback,
        and resource cleanup.

    See Also:
        :class:`ConnectionInterface`: Connection protocol
        :class:`TransactionInterface`: Transaction protocol
        :class:`HealthCheckResult`: Health check result data class
    """

    @property
    def url(self) -> str:
        """
        The database connection URL, sanitized for display.

        Returns:
            str: Database URL with the password redacted/masked, for example
            ``"postgresql://user:***@localhost:5432/mydb"``.

        Example:
            Logging the database configuration::

                log.info("Connected to database", url=db.url)
                # Logs: postgresql://user:***@localhost/mydb

        Note:
            Passwords are redacted so credentials are not leaked into logs.
        """
        ...

    @property
    def is_connected(self) -> bool:
        """
        Whether the connection pool has been established.

        Returns:
            bool: True if the connection pool is established, False otherwise.

        Example:
            Conditional connection::

                if not db.is_connected:
                    db.connect()
                    log.info("Database connection established")

        Note:
            Most operations call :meth:`connect` automatically when not yet
            connected. An explicit check is mainly useful for health checks
            and diagnostics.
        """
        ...

    def connect(self) -> None:
        """
        Establish the database connection pool.

        Creates and initializes the connection pool. This happens
        automatically on the first database operation if not done already.

        Raises:
            ConnectionError: If the database cannot be reached.
            ConfigurationError: If the database URL or settings are invalid.

        Example:
            Explicit connection during startup::

                import sys

                db = SQLAlchemyDatabase.from_config()
                try:
                    db.connect()
                    log.info("Database pool created", url=db.url)
                except ConnectionError:
                    log.critical("Cannot connect to database", exc_info=True)
                    sys.exit(1)

        Note:
            Usually not needed, because the database connects on first use.
            Calling it explicitly gives fail-fast behavior at startup.

        See Also:
            :meth:`close`: Shut down the connection pool
        """
        ...

    def close(self) -> None:
        """
        Close all pooled connections and release their resources.

        Call this during application shutdown for a clean termination.
        Finish any ongoing transactions before closing.

        Raises:
            RuntimeError: If called while transactions are active
                (implementation-specific).

        Example:
            Application shutdown::

                import atexit

                db = SQLAlchemyDatabase.from_config()
                atexit.register(db.close)  # Ensure cleanup on exit

                # Or in a FastAPI lifespan handler
                from contextlib import asynccontextmanager

                @asynccontextmanager
                async def lifespan(app: FastAPI):
                    db.connect()
                    yield
                    db.close()  # Clean shutdown

        Warning:
            Do not reuse the instance after ``close()``. Create a new
            instance for further operations.

        See Also:
            :meth:`connect`: Initialize the connection pool
        """
        ...

    def get_connection(self) -> ConnectionInterface:
        """
        Acquire a connection from the pool.

        The caller is responsible for releasing the connection after use.
        **Prefer** :meth:`transaction` **instead.**

        Returns:
            ConnectionInterface: A database connection from the pool.

        Raises:
            PoolExhaustedError: If no connection is available and the pool is
                at its maximum size.
            ConnectionError: If the pool is not initialized.

        Example:
            Manual connection management (not recommended)::

                conn = db.get_connection()
                try:
                    conn.execute("SELECT * FROM users")
                    users = conn.fetchall()
                finally:
                    # ConnectionInterface declares no release method, so how a
                    # connection goes back to the pool is implementation-specific
                    # (for example a ``close()`` method on the concrete class).
                    release(conn)

        Warning:
            Manual connection management is error-prone. Prefer the
            :meth:`transaction` context manager, which acquires and releases
            connections automatically.

        See Also:
            :meth:`transaction`: Recommended connection API
        """
        ...

    # Declared as returning a context manager rather than decorated with
    # @contextmanager. Implementations may still use @contextlib.contextmanager,
    # because its result is a ContextManager, and class-based context managers
    # now satisfy the protocol too. The previously decorated stub produced a
    # context manager whose __enter__ failed on ``next(None)``.
    def transaction(self) -> ContextManager[ConnectionInterface]:
        """
        Open a database transaction as a context manager.

        Commits when the ``with`` block exits normally and rolls back when it
        exits with an exception. This is the recommended way to perform
        database operations.

        Returns:
            ContextManager[ConnectionInterface]: A context manager whose
            ``__enter__`` yields a connection with an active transaction.

        Raises:
            DatabaseError: If the transaction fails to start.
            ConnectionError: If the pool is exhausted or not connected.

        Example:
            Recommended usage pattern::

                # Single transaction
                with db.transaction() as conn:
                    conn.execute(
                        "UPDATE accounts SET balance = balance - :amount WHERE id = :id",
                        {"amount": 100, "id": 1},
                    )
                    conn.execute(
                        "UPDATE accounts SET balance = balance + :amount WHERE id = :id",
                        {"amount": 100, "id": 2},
                    )
                # Commits automatically if no exception was raised

                # An exception triggers rollback
                try:
                    with db.transaction() as conn:
                        conn.execute("DELETE FROM important_data")
                        raise ValueError("Validation failed")
                except ValueError:
                    pass  # Transaction was rolled back

        Note:
            Uncommitted changes are not visible to other connections. The
            exact isolation level depends on the backend and its
            configuration. For example, PostgreSQL defaults to READ
            COMMITTED and MySQL/InnoDB to REPEATABLE READ.

        See Also:
            :class:`TransactionInterface`: Transaction protocol
            :class:`ConnectionInterface`: Connection protocol
        """
        ...

    def health_check(self) -> HealthCheckResult:
        """
        Check database connectivity and health status.

        Runs a simple query to verify that the database responds, and
        measures its latency. Useful for readiness probes and monitoring.

        Returns:
            HealthCheckResult: Health status with latency and diagnostic
            details.

        Example:
            Health check endpoint::

                @app.get("/health/database")
                def database_health():
                    result = db.health_check()
                    if result.status == HealthStatus.UNHEALTHY:
                        raise HTTPException(503, detail=result.message)
                    return {
                        "status": result.status.value,
                        "latency_ms": result.latency_ms,
                        "message": result.message,
                    }

            Prometheus metrics::

                result = db.health_check()

                # Record health status
                db_health_gauge.labels(database="main").set(
                    1 if result.status == HealthStatus.HEALTHY else 0
                )

                # Record latency; a measured 0.0 is still a valid sample
                if result.latency_ms is not None:
                    db_latency_histogram.observe(result.latency_ms)

        Note:
            Health checks run a lightweight query (usually ``SELECT 1``) and
            should complete quickly (typically < 100 ms).

        See Also:
            :class:`HealthCheckResult`: Result data class
            :class:`HealthStatus`: Health status enumeration
        """
        ...
# Short names for the protocols above, for use in type hints. Each alias is
# the very same object as its protocol, not a subclass.
Connection = ConnectionInterface
Transaction = TransactionInterface
Database = DatabaseInterface