Database Plugin API Reference
Database connectivity and ORM integration plugin for Structum Lab.
Database Plugin Package
Database plugin for Structum Lab.
Provides production-ready database connectivity with support for:
- SQLAlchemy 2.0+ (PostgreSQL, MySQL, SQLite, etc.)
- psycopg3 (direct PostgreSQL, high performance)
- Quick Start:
>>> from structum_lab.plugins.database import SQLAlchemyDatabase
>>>
>>> # Direct configuration
>>> db = SQLAlchemyDatabase(url="postgresql://user:pass@localhost/mydb")
>>>
>>> # Or using config provider (Factory Pattern)
>>> db = SQLAlchemyDatabase.from_config()
>>>
>>> with db.transaction() as conn:
...     conn.execute("SELECT * FROM users")
...     users = conn.fetchall()
- Installation:
    pip install structum-database[sqlalchemy]  # SQLAlchemy backend
    pip install structum-database[postgres]    # psycopg3 backend
    pip install structum-database[all]         # Both backends
- class structum_lab.plugins.database.DatabaseInterface(*args, **kwargs)
Bases: Protocol
Protocol for database managers in Structum Lab.
This is the main entry point for all database operations. Implementations must provide connection pooling, transaction management, and health monitoring.
The protocol abstracts away database-specific details, allowing applications to work with different backends (PostgreSQL, MySQL, SQLite) through a unified interface.
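Because the interface is a Protocol, a backend satisfies it by having methods of the right shape rather than by inheriting from it. The sketch below illustrates that idea with a deliberately trimmed-down, hypothetical protocol (SupportsHealthCheck) and two stand-in backends. None of these names belong to Structum Lab, and the real DatabaseInterface has more members.

```python
import sqlite3
from typing import Protocol, runtime_checkable


@runtime_checkable
class SupportsHealthCheck(Protocol):
    """Hypothetical, trimmed-down stand-in for a database protocol."""

    def health_check(self) -> str: ...
    def close(self) -> None: ...


class SQLiteBackend:
    """Satisfies the protocol structurally; note there is no inheritance."""

    def __init__(self, path: str = ":memory:") -> None:
        self._conn = sqlite3.connect(path)

    def health_check(self) -> str:
        self._conn.execute("SELECT 1").fetchone()
        return "healthy"

    def close(self) -> None:
        self._conn.close()


class FakeBackend:
    """In-memory double, e.g. for unit tests."""

    def health_check(self) -> str:
        return "healthy"

    def close(self) -> None:
        pass


def probe(db: SupportsHealthCheck) -> str:
    # Application code depends only on the protocol, not on a backend class.
    return db.health_check()


backends = [SQLiteBackend(), FakeBackend()]
statuses = [probe(backend) for backend in backends]
for backend in backends:
    backend.close()
```

Swapping one backend for another then requires no change to `probe`, which is the property the protocol is meant to provide.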
- Implementations:
Example
Basic usage with configuration:

    from structum_lab.plugins.database import HealthStatus, SQLAlchemyDatabase

    # Initialize from config
    db = SQLAlchemyDatabase.from_config()

    # Or with explicit URL
    db = SQLAlchemyDatabase(url="postgresql://user:pass@localhost/mydb")

    # Use transaction context manager (recommended)
    with db.transaction() as conn:
        # RETURNING is PostgreSQL syntax; other backends need their own
        # way to read back the generated key
        conn.execute(
            "INSERT INTO users (name, email) VALUES (:name, :email) RETURNING id",
            {"name": "John", "email": "john@example.com"},
        )
        user_id = conn.fetchone()["id"]
        conn.execute(
            "INSERT INTO profiles (user_id, bio) VALUES (:uid, :bio)",
            {"uid": user_id, "bio": "Software engineer"},
        )
        # Commits automatically on success

    # Check health (log is assumed to be a structured logger)
    health = db.health_check()
    if health.status != HealthStatus.HEALTHY:
        log.warning("Database issues detected", result=health)
Note
Always use the transaction() context manager for database operations. It ensures proper connection pooling, automatic commit/rollback, and resource cleanup.
See also
ConnectionInterface: Connection protocol
TransactionInterface: Transaction protocol
HealthCheckResult: Health check result data class
- close() -> None
Close all connections in the pool and release resources.
Should be called during application shutdown to ensure clean termination. Any ongoing transactions should be completed before closing.
- Raises:
RuntimeError – If called while transactions are active (implementation-specific).
Example
Application shutdown:
    import atexit
    from contextlib import asynccontextmanager

    from fastapi import FastAPI

    db = SQLAlchemyDatabase.from_config()
    atexit.register(db.close)  # Ensure cleanup on exit

    # Or in a FastAPI lifespan handler
    @asynccontextmanager
    async def lifespan(app: FastAPI):
        db.connect()
        try:
            yield
        finally:
            db.close()  # Clean shutdown, even if the app raised
Warning
After calling close(), do not reuse the database instance. Create a new instance for additional operations.
See also
connect(): Initialize connection pool
- connect() -> None
Establish the database connection pool.
Creates and initializes the connection pool. Called automatically on the first database operation if not already connected.
- Raises:
ConnectionError – If unable to connect to database.
ConfigurationError – If database URL or settings are invalid.
Example
Explicit connection during startup:
    import sys

    db = SQLAlchemyDatabase.from_config()
    try:
        db.connect()
        log.info("Database pool created", url=db.url)
    except ConnectionError:
        log.critical("Cannot connect to database", exc_info=True)
        sys.exit(1)
Note
Usually not needed: the database connects automatically on first use. Explicit connection is useful for fail-fast behavior during startup.
See also
close(): Shutdown connection pool
- get_connection() -> ConnectionInterface
Acquire a connection from the pool.
Returns a connection that must be explicitly returned to the pool after use. Prefer using transaction() instead.
- Returns:
A database connection from the pool.
- Return type:
ConnectionInterface
- Raises:
PoolExhaustedError – If no connections are available and pool is at maximum.
ConnectionError – If pool is not initialized.
Example
Manual connection management (not recommended):
    conn = db.get_connection()
    try:
        conn.execute("SELECT * FROM users")
        users = conn.fetchall()
    finally:
        # Must return connection to pool manually
        conn.close()
Warning
Manual connection management is error-prone. Always prefer the transaction() context manager, which handles connections automatically.
See also
transaction(): Recommended connection API
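To make the pool-exhaustion behaviour concrete, here is a minimal sketch of a bounded pool built on queue.Queue from the standard library. TinyPool, its acquire/release methods, and the local PoolExhaustedError class are hypothetical stand-ins; the plugin's actual pooling is delegated to its backends and may behave differently.

```python
import queue
import sqlite3


class PoolExhaustedError(RuntimeError):
    """Local stand-in for the error named above."""


class TinyPool:
    """Fixed-size pool: acquire() waits up to pool_timeout seconds."""

    def __init__(self, pool_size: int = 2, pool_timeout: float = 0.1) -> None:
        self._timeout = pool_timeout
        self._idle: "queue.Queue[sqlite3.Connection]" = queue.Queue(maxsize=pool_size)
        for _ in range(pool_size):
            self._idle.put(sqlite3.connect(":memory:", check_same_thread=False))

    def acquire(self) -> sqlite3.Connection:
        try:
            return self._idle.get(timeout=self._timeout)
        except queue.Empty:
            raise PoolExhaustedError("no idle connection within timeout") from None

    def release(self, conn: sqlite3.Connection) -> None:
        self._idle.put(conn)


pool = TinyPool(pool_size=2, pool_timeout=0.05)
first = pool.acquire()
second = pool.acquire()

try:
    pool.acquire()  # both connections are checked out
    exhausted = False
except PoolExhaustedError:
    exhausted = True

pool.release(first)  # forgetting this is exactly the leak the warning is about
third = pool.acquire()
reused = third is first
```

A connection that is never released stays checked out forever, which is why the context-manager API is the safer default.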
- health_check() -> HealthCheckResult
Check database connectivity and health status.
Performs a simple query to verify the database is responsive and measures latency. Useful for readiness probes and monitoring.
Example
Health check endpoint:
    @app.get("/health/database")
    def database_health():
        result = db.health_check()
        if result.status == HealthStatus.UNHEALTHY:
            raise HTTPException(503, detail=result.message)
        return {
            "status": result.status.value,
            "latency_ms": result.latency_ms,
            "message": result.message,
        }
Prometheus metrics:
    result = db.health_check()
    # Record health status
    db_health_gauge.labels(database="main").set(
        1 if result.status == HealthStatus.HEALTHY else 0
    )
    # Record latency (compare with None so a 0.0 ms reading is still recorded)
    if result.latency_ms is not None:
        db_latency_histogram.observe(result.latency_ms)
Note
Health checks execute a lightweight query (usually SELECT 1) and should typically complete in under 100 ms.
See also
HealthCheckResult: Result data class
HealthStatus: Health status enumeration
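The description above (run a trivial query, time it, classify the outcome) can be sketched with the standard library alone. Status, CheckResult, check, and the 100 ms threshold are illustrative assumptions mirroring the documented names; this is not the plugin's implementation.

```python
import sqlite3
import time
from dataclasses import dataclass
from enum import Enum
from typing import Optional


class Status(Enum):
    HEALTHY = "healthy"
    DEGRADED = "degraded"
    UNHEALTHY = "unhealthy"


@dataclass(frozen=True)
class CheckResult:
    status: Status
    message: str
    latency_ms: Optional[float] = None


def check(conn: sqlite3.Connection, degraded_above_ms: float = 100.0) -> CheckResult:
    """Time a SELECT 1 round trip and map it to a status."""
    start = time.perf_counter()
    try:
        conn.execute("SELECT 1").fetchone()
    except sqlite3.Error as exc:
        # No latency is reported when the probe query itself fails.
        return CheckResult(Status.UNHEALTHY, f"query failed: {exc}")
    latency_ms = (time.perf_counter() - start) * 1000.0
    if latency_ms > degraded_above_ms:
        return CheckResult(Status.DEGRADED, "slow response", latency_ms)
    return CheckResult(Status.HEALTHY, "ok", latency_ms)


live = sqlite3.connect(":memory:")
ok_result = check(live)
live.close()
closed_result = check(live)  # querying a closed connection raises sqlite3.Error
```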
- property is_connected: bool
Check if the database connection pool is active.
Example
Conditional connection:
    if not db.is_connected:
        db.connect()
        log.info("Database connection established")
Note
Most operations call connect() automatically if not connected. Explicit checking is mainly useful for health checks and diagnostics.
- transaction() -> Iterator[ConnectionInterface]
Context manager for database transactions.
Provides automatic transaction management: commits on success, rolls back on exception. This is the recommended way to perform database operations.
- Yields:
ConnectionInterface – Database connection with active transaction.
- Raises:
DatabaseError – If transaction fails to start.
ConnectionError – If pool is exhausted or not connected.
Example
Recommended usage pattern:
    # Single transaction
    with db.transaction() as conn:
        conn.execute(
            "UPDATE accounts SET balance = balance - :amount WHERE id = :id",
            {"amount": 100, "id": 1},
        )
        conn.execute(
            "UPDATE accounts SET balance = balance + :amount WHERE id = :id",
            {"amount": 100, "id": 2},
        )
        # Commits automatically if no exception

    # Exception triggers rollback
    try:
        with db.transaction() as conn:
            conn.execute("DELETE FROM important_data")
            raise ValueError("Validation failed")
    except ValueError:
        pass  # Transaction automatically rolled back
Note
Transactions are isolated: changes are not visible to other connections until commit. The isolation level depends on the database and its configuration; for example, PostgreSQL defaults to READ COMMITTED, while MySQL's InnoDB defaults to REPEATABLE READ.
See also
TransactionInterface: Transaction protocol
ConnectionInterface: Connection protocol
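The commit-on-success, rollback-on-exception contract can be reproduced in a few lines with contextlib and SQLite. This is a sketch of the pattern only: the free function transaction() and the accounts table are assumptions for illustration, and the plugin's own implementation is not shown in this reference.

```python
import sqlite3
from contextlib import contextmanager
from typing import Iterator


@contextmanager
def transaction(conn: sqlite3.Connection) -> Iterator[sqlite3.Connection]:
    """Commit if the with-block finishes, roll back if it raises."""
    conn.execute("BEGIN")
    try:
        yield conn
    except BaseException:
        conn.execute("ROLLBACK")
        raise
    else:
        conn.execute("COMMIT")


# isolation_level=None puts sqlite3 in autocommit mode, so the only
# transactions are the ones opened explicitly with BEGIN above.
conn = sqlite3.connect(":memory:", isolation_level=None)
conn.execute("CREATE TABLE accounts (id INTEGER PRIMARY KEY, balance INTEGER)")
conn.executemany("INSERT INTO accounts VALUES (?, ?)", [(1, 500), (2, 0)])

# Both updates are committed together.
with transaction(conn) as tx:
    tx.execute(
        "UPDATE accounts SET balance = balance - :amount WHERE id = :id",
        {"amount": 100, "id": 1},
    )
    tx.execute(
        "UPDATE accounts SET balance = balance + :amount WHERE id = :id",
        {"amount": 100, "id": 2},
    )

# The exception undoes the DELETE before it propagates.
try:
    with transaction(conn) as tx:
        tx.execute("DELETE FROM accounts")
        raise ValueError("Validation failed")
except ValueError:
    pass

balances = conn.execute("SELECT id, balance FROM accounts ORDER BY id").fetchall()
```

After both blocks run, the transfer is visible and the rows removed by the failed block are still present.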
- property url: str
Get the database connection URL (sanitized).
- Returns:
Database URL with password redacted/masked for security. Example: "postgresql://user:***@localhost:5432/mydb"
- Return type:
str
Example
Logging database configuration:
    log.info("Connected to database", url=db.url)
    # Logs: postgresql://user:***@localhost/mydb
Note
Passwords are automatically redacted to prevent accidental logging of credentials.
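One way such redaction could be done is with urllib.parse from the standard library, as sketched below. The helper name redact_password and the "***" mask are assumptions chosen to match the example output above; the plugin may sanitize URLs differently.

```python
from urllib.parse import urlsplit


def redact_password(url: str, mask: str = "***") -> str:
    """Replace the password part of a URL's userinfo with a mask."""
    parts = urlsplit(url)
    # netloc looks like "user:password@host:port"; split on the last "@"
    # so that an "@" inside the password does not confuse the parse.
    userinfo, at, hostport = parts.netloc.rpartition("@")
    user, colon, _password = userinfo.partition(":")
    if not at or not colon:
        return url  # no credentials, or a user name without a password
    return parts._replace(netloc=f"{user}:{mask}@{hostport}").geturl()


masked = redact_password("postgresql://user:pass@localhost:5432/mydb")
tricky = redact_password("postgresql://user:p@ss@localhost/mydb")
untouched = redact_password("sqlite:///local.db")
```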
- class structum_lab.plugins.database.ConnectionInterface(*args, **kwargs)
Bases: Protocol
Protocol for database connections in Structum Lab.
A connection represents an active link to the database that can execute queries and retrieve results. Connections are typically obtained from a connection pool and should be returned after use.
- Implementations:
Example
Using a connection within a transaction:
    with db.transaction() as conn:
        # Execute query with named parameters
        conn.execute(
            "INSERT INTO users (name, email) VALUES (:name, :email)",
            {"name": "John", "email": "john@example.com"},
        )
        # Fetch results
        conn.execute("SELECT * FROM users WHERE id > :id", {"id": 100})
        users = conn.fetchall()
        for user in users:
            print(f"{user['name']} - {user['email']}")
Note
Connections are not thread-safe. Use one connection per thread or protect access with locks.
See also
DatabaseInterface: Database manager providing connections
DatabaseInterface.transaction(): Recommended way to get connections
- execute(query: str, params: dict[str, Any] | tuple[Any, ...] | None = None) -> Any
Execute a SQL query with optional parameters.
Supports both named parameters (dict) and positional parameters (tuple). Use :name syntax for named parameters, ? for positional.
- Parameters:
- Returns:
Result object (implementation-specific). Use fetch methods to retrieve rows.
- Return type:
Any
- Raises:
DatabaseError – If query execution fails.
ParameterError – If parameters don’t match query placeholders.
Example
Different parameter styles:
    # Named parameters (recommended)
    conn.execute(
        "SELECT * FROM users WHERE age > :min_age AND city = :city",
        {"min_age": 18, "city": "NYC"},
    )
    # Positional parameters
    conn.execute(
        "SELECT * FROM users WHERE age > ? AND city = ?",
        (18, "NYC"),
    )
    # No parameters
    conn.execute("SELECT COUNT(*) FROM users")
Warning
Always use parameterized queries. Never use string interpolation for user input (SQL injection risk).
See also
fetchone(): Retrieve single row
fetchall(): Retrieve all rows
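The warning above is easy to demonstrate. The sketch below uses the standard-library sqlite3 driver, which happens to accept the same :name and ? placeholder styles, to show that a bound parameter is treated as data while an interpolated string is parsed as SQL. The users table and the hostile input are made up for the illustration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (name TEXT, age INTEGER, city TEXT)")
conn.executemany(
    "INSERT INTO users VALUES (?, ?, ?)",
    [("Ann", 30, "NYC"), ("Bob", 17, "NYC"), ("Cy", 45, "LA")],
)

# Named and positional placeholders produce the same result.
named = conn.execute(
    "SELECT name FROM users WHERE age > :min_age AND city = :city ORDER BY name",
    {"min_age": 18, "city": "NYC"},
).fetchall()
positional = conn.execute(
    "SELECT name FROM users WHERE age > ? AND city = ? ORDER BY name",
    (18, "NYC"),
).fetchall()

hostile = "NYC' OR '1'='1"

# Bound: the whole string is compared against the city column.
bound = conn.execute("SELECT name FROM users WHERE city = ?", (hostile,)).fetchall()

# Interpolated (do not do this): the quote ends the literal and the
# OR clause becomes part of the query, matching every row.
interpolated = conn.execute(
    f"SELECT name FROM users WHERE city = '{hostile}' ORDER BY name"
).fetchall()
```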
- fetchall() -> list[dict[str, Any]]
Fetch all remaining rows from the last executed query.
Example
Fetching multiple rows:
    conn.execute("SELECT * FROM users WHERE active = :active", {"active": True})
    users = conn.fetchall()
    for user in users:
        print(f"{user['id']}: {user['name']}")
    print(f"Total active users: {len(users)}")
Warning
For large result sets, consider using fetchmany() to avoid loading all rows into memory at once.
See also
fetchone(): Fetch single row
fetchmany(): Fetch rows in batches
- fetchmany(size: int) -> list[dict[str, Any]]
Fetch up to size rows from the last executed query.
Useful for processing large result sets in batches to manage memory usage.
- Parameters:
- Returns:
List of rows (up to size). May return fewer rows if the result set is exhausted. Empty list if no rows remain.
- Return type:
list[dict[str, Any]]
Example
Batch processing large result set:
    conn.execute("SELECT * FROM large_table")
    batch_size = 100
    while True:
        batch = conn.fetchmany(batch_size)
        if not batch:
            break
        process_batch(batch)
        print(f"Processed {len(batch)} rows")
Note
Efficient for iterating large datasets without loading everything into memory.
See also
fetchone(): Fetch single row
fetchall(): Fetch all rows
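The batching loop can be wrapped in a generator so callers simply iterate over batches. The sketch below does this against a standard-library sqlite3 cursor, whose fetchmany() has the same "empty list when exhausted" contract; iter_batches is a hypothetical helper, not part of the plugin.

```python
import sqlite3
from typing import Any, Dict, Iterator, List


def iter_batches(cursor: sqlite3.Cursor, size: int) -> Iterator[List[Dict[str, Any]]]:
    """Yield lists of at most `size` rows until the cursor is exhausted."""
    while True:
        batch = cursor.fetchmany(size)
        if not batch:
            return
        yield [dict(row) for row in batch]


conn = sqlite3.connect(":memory:")
conn.row_factory = sqlite3.Row  # rows become addressable by column name
conn.execute("CREATE TABLE large_table (n INTEGER)")
conn.executemany("INSERT INTO large_table VALUES (?)", [(i,) for i in range(250)])

cursor = conn.execute("SELECT n FROM large_table ORDER BY n")
batches = list(iter_batches(cursor, 100))
batch_sizes = [len(batch) for batch in batches]
```

Collecting the batches into a list is done here only so the sizes can be inspected; real code would process each batch as it is yielded.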
- fetchone() -> dict[str, Any] | None
Fetch the next row from the last executed query.
- Returns:
Row as a dictionary with column names as keys, or None if no more rows are available.
- Return type:
dict[str, Any] | None
Example
Fetching single row:
    conn.execute("SELECT * FROM users WHERE id = :id", {"id": 1})
    user = conn.fetchone()
    if user:
        print(f"Found user: {user['name']}")
    else:
        print("User not found")
Note
Call after execute(). Returns None when the cursor is exhausted.
See also
fetchall(): Fetch all remaining rows
fetchmany(): Fetch specific number of rows
- class structum_lab.plugins.database.TransactionInterface(*args, **kwargs)
Bases: Protocol
Protocol for database transactions providing ACID guarantees.
Transactions group multiple database operations into a single atomic unit. Either all operations succeed (commit) or all fail (rollback).
Example
Manual transaction management:
    # Assumes conn is the connection the transaction runs on
    tx = db.begin_transaction()
    try:
        conn.execute("UPDATE accounts SET balance = balance - 100 WHERE id = 1")
        conn.execute("UPDATE accounts SET balance = balance + 100 WHERE id = 2")
        tx.commit()
    except Exception:
        tx.rollback()
        raise
Preferred context manager approach:
    # Automatic commit/rollback
    with db.transaction() as conn:
        conn.execute("UPDATE accounts SET balance = balance - 100 WHERE id = 1")
        conn.execute("UPDATE accounts SET balance = balance + 100 WHERE id = 2")
        # Commits automatically if no exception
Note
Most applications should use the DatabaseInterface.transaction() context manager instead of managing transactions manually.
See also
DatabaseInterface.transaction(): Recommended transaction API
- commit() -> None
Commit the current transaction.
Makes all changes since transaction start permanent in the database. After commit, the transaction is no longer active.
- Raises:
DatabaseError – If commit fails (e.g., constraint violation).
TransactionError – If transaction is not active.
Example
Manual commit:
    # Assumes conn is the connection the transaction runs on
    tx = db.begin_transaction()
    try:
        conn.execute("INSERT INTO logs VALUES (:msg)", {"msg": "test"})
        tx.commit()
        log.info("Transaction committed successfully")
    except DatabaseError:
        tx.rollback()
        log.error("Commit failed", exc_info=True)
Warning
After commit, the transaction cannot be reused. Start a new transaction for additional operations.
See also
rollback(): Abort transaction
- property is_active: bool
Check if the transaction is still active.
- Returns:
True if the transaction can accept operations, False if already committed or rolled back.
- Return type:
bool
Example
Checking transaction state:
    tx = db.begin_transaction()
    assert tx.is_active  # True
    tx.commit()
    assert not tx.is_active  # False
    # A second tx.commit() here would raise TransactionError
Note
Useful for conditional logic and error handling.
- rollback() -> None
Roll back the current transaction.
Discards all changes made since transaction start. The database state returns to what it was before the transaction began.
- Raises:
TransactionError – If transaction is not active.
Example
Manual rollback on error:
    def delete_important_data(db, conn):
        tx = db.begin_transaction()
        try:
            conn.execute("DELETE FROM important_data")
            # Validation fails
            if not validate_deletion():
                tx.rollback()
                log.warning("Deletion rolled back - validation failed")
                return
            tx.commit()
        except Exception:
            tx.rollback()
            raise
Note
Rollback is safe to call multiple times. Subsequent calls are no-ops.
See also
commit(): Persist transaction changes
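The lifecycle described by commit(), rollback(), and is_active amounts to a small state machine. The sketch below models only that state, with no database behind it. SketchTransaction and the local TransactionError are hypothetical, and the sketch follows the note that repeated rollback calls are no-ops, which is one reading of the reference (the Raises entry for rollback() reads differently).

```python
from typing import Optional


class TransactionError(RuntimeError):
    """Local stand-in for the error named above."""


class SketchTransaction:
    """Tracks whether a transaction is still open; performs no I/O."""

    def __init__(self) -> None:
        self._active = True
        self.outcome: Optional[str] = None  # set once the transaction ends

    @property
    def is_active(self) -> bool:
        return self._active

    def commit(self) -> None:
        if not self._active:
            raise TransactionError("transaction is not active")
        self._active = False
        self.outcome = "committed"

    def rollback(self) -> None:
        if not self._active:
            return  # repeated rollback is a no-op in this sketch
        self._active = False
        self.outcome = "rolled back"


tx = SketchTransaction()
tx.rollback()
tx.rollback()  # second call changes nothing

done = SketchTransaction()
done.commit()
try:
    done.commit()  # a finished transaction cannot be committed again
    second_commit_rejected = False
except TransactionError:
    second_commit_rejected = True
done.rollback()  # harmless after commit; the outcome stays "committed"
```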
- class structum_lab.plugins.database.HealthCheckResult(status: HealthStatus, message: str, latency_ms: float | None = None, details: dict[str, Any] | None = None)
Bases: object
Data class representing database health check results.
- details
Additional diagnostic information (e.g., active connections, pool statistics).
Example
Creating and using a health check result:
    result = HealthCheckResult(
        status=HealthStatus.HEALTHY,
        message="Database connection OK",
        latency_ms=2.5,
        details={"active_connections": 5, "pool_size": 10},
    )
    # Use in monitoring/alerts (alert is a placeholder for your alerting hook)
    if result.latency_ms is not None and result.latency_ms > 100:
        alert("High database latency", result.latency_ms)
Note
This class is frozen (immutable) to ensure health check results cannot be modified after creation.
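What "frozen" means in practice can be shown with a plain dataclass. SketchResult below is a hypothetical class with a subset of the documented fields. The sketch also shows two general properties of frozen dataclasses worth knowing: dataclasses.replace() builds a modified copy, and freezing is shallow, so a dict stored in a field can still be mutated.

```python
from dataclasses import FrozenInstanceError, dataclass, replace
from typing import Any, Dict, Optional


@dataclass(frozen=True)
class SketchResult:
    message: str
    latency_ms: Optional[float] = None
    details: Optional[Dict[str, Any]] = None


result = SketchResult("Database connection OK", latency_ms=2.5, details={"pool_size": 10})

try:
    result.latency_ms = 99.0  # attribute assignment is rejected
    assignment_blocked = False
except FrozenInstanceError:
    assignment_blocked = True

# To "change" a field, build a new instance instead.
slower = replace(result, latency_ms=250.0)

# Freezing is shallow: the dict inside the instance is still mutable.
result.details["pool_size"] = 20
```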
- __init__(status: HealthStatus, message: str, latency_ms: float | None = None, details: dict[str, Any] | None = None)
- status: HealthStatus
- class structum_lab.plugins.database.HealthStatus(*values)
Bases: Enum
Enumeration of possible database health states.
- HEALTHY
Database is fully operational with normal latency.
- DEGRADED
Database is responsive but experiencing issues (e.g., high latency, connection warnings).
- UNHEALTHY
Database is unreachable or critically impaired.
Example
Checking health status:
    result = db.health_check()
    if result.status == HealthStatus.HEALTHY:
        log.info("Database OK", latency_ms=result.latency_ms)
    elif result.status == HealthStatus.DEGRADED:
        log.warning("Database degraded", message=result.message)
    else:
        log.error("Database unhealthy", message=result.message)
- HEALTHY = 'healthy'
- DEGRADED = 'degraded'
- UNHEALTHY = 'unhealthy'
- class structum_lab.plugins.database.BaseDatabase(url: str, *, pool_size: int = 5, pool_timeout: int = 30, echo: bool = False, **kwargs: Any)
Bases: ABC
Abstract base class for database implementations.
Provides the Factory Pattern via .from_config() class method. Subclasses must implement the abstract methods for their specific backend.
Example
>>> # Direct instantiation
>>> db = SQLAlchemyDatabase(url="postgresql://...")
>>>
>>> # Factory pattern with config
>>> db = SQLAlchemyDatabase.from_config()
>>> db = SQLAlchemyDatabase.from_config(config_key="secondary_db")
- __enter__() -> BaseDatabase
Context manager entry.
- __init__(url: str, *, pool_size: int = 5, pool_timeout: int = 30, echo: bool = False, **kwargs: Any)
Initialize database with connection parameters.
- classmethod from_config(config_key: str | None = None, *, config: ConfigInterface | None = None) -> BaseDatabase
Factory method to create database from config provider.
This method reads database configuration from the active config provider (e.g., dynaconf), allowing decoupled configuration.
- Parameters:
- Returns:
Configured database instance
- Raises:
RuntimeError – If no config provider is available
KeyError – If required config keys are missing
Example
>>> # Uses default config key "database"
>>> db = SQLAlchemyDatabase.from_config()
>>>
>>> # Uses custom config key for secondary database
>>> db = SQLAlchemyDatabase.from_config("replica_db")
- Config Structure (TOML):
    [database]
    url = "postgresql://user:pass@localhost/mydb"
    pool_size = 10
    pool_timeout = 30
    echo = false
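The factory idea (look up a named section, then pass its keys to the constructor) can be sketched as follows. SketchDatabase is a hypothetical stand-in, and a plain nested dict plays the role of the config provider. Unlike the real from_config(), which falls back to the active provider when config is omitted, this sketch simply raises RuntimeError in that case.

```python
from typing import Any, Mapping, Optional


class SketchDatabase:
    """Hypothetical stand-in; not the real BaseDatabase."""

    DEFAULT_CONFIG_KEY = "database"

    def __init__(self, url: str, *, pool_size: int = 5,
                 pool_timeout: int = 30, echo: bool = False) -> None:
        self.url = url
        self.pool_size = pool_size
        self.pool_timeout = pool_timeout
        self.echo = echo

    @classmethod
    def from_config(
        cls,
        config_key: Optional[str] = None,
        *,
        config: Optional[Mapping[str, Mapping[str, Any]]] = None,
    ) -> "SketchDatabase":
        if config is None:
            raise RuntimeError("no config provider available")
        # A missing section or a missing "url" surfaces as KeyError.
        section = dict(config[config_key or cls.DEFAULT_CONFIG_KEY])
        url = section.pop("url")
        return cls(url, **section)


# Mirrors the TOML structure shown above, plus a second section.
settings = {
    "database": {
        "url": "postgresql://user:pass@localhost/mydb",
        "pool_size": 10,
        "pool_timeout": 30,
        "echo": False,
    },
    "replica_db": {"url": "postgresql://user:pass@replica/mydb"},
}

primary = SketchDatabase.from_config(config=settings)
replica = SketchDatabase.from_config("replica_db", config=settings)
```

Keys omitted from a section fall back to the constructor defaults, so a secondary database only needs to specify what differs.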
- abstractmethod get_connection() -> ConnectionInterface
Get a connection from the pool.
- abstractmethod health_check() -> HealthCheckResult
Check database health.
- abstractmethod transaction() -> Iterator[ConnectionInterface]
Context manager for transactions.
- structum_lab.plugins.database.__getattr__(name: str)
Lazy loading of backends to handle optional dependencies.
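Python supports this through a module-level `__getattr__` (PEP 562), which is called only when normal attribute lookup on the module fails. The sketch below builds a throwaway module object to show the mechanism. The module name sketch_plugin and the mapping to standard-library modules are assumptions for illustration; in a real package the function would be defined at the top level of `__init__.py` and would import the optional backend modules instead.

```python
import importlib
import types

# Attribute name -> module that provides it (stdlib modules stand in for
# the optional backend modules here).
_LAZY = {"Fraction": "fractions", "JSONDecoder": "json"}

plugin = types.ModuleType("sketch_plugin")


def _lazy_getattr(name: str):
    try:
        module_name = _LAZY[name]
    except KeyError:
        raise AttributeError(
            f"module 'sketch_plugin' has no attribute {name!r}"
        ) from None
    value = getattr(importlib.import_module(module_name), name)
    setattr(plugin, name, value)  # cache so later lookups bypass __getattr__
    return value


# Equivalent to writing `def __getattr__(name): ...` inside the module file.
plugin.__getattr__ = _lazy_getattr

cached_before = "Fraction" in vars(plugin)
half = plugin.Fraction(1, 2)  # first access triggers the import
cached_after = "Fraction" in vars(plugin)

try:
    plugin.DoesNotExist
    unknown_rejected = False
except AttributeError:
    unknown_rejected = True
```

With this arrangement, importing the package does not fail when an optional dependency is missing; the import error only appears if the corresponding backend is actually requested.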
Base Database
Base class for database implementations with Factory Pattern support.
The factory pattern allows database backends to be configured via:
1. Direct instantiation: SQLAlchemyDatabase(url="…")
2. Config provider: SQLAlchemyDatabase.from_config()
The .from_config() method uses the active config provider (e.g., dynaconf) to read database settings, enabling decoupled configuration.
- class structum_lab.plugins.database.base.BaseDatabase(url: str, *, pool_size: int = 5, pool_timeout: int = 30, echo: bool = False, **kwargs: Any)
Bases: ABC
Abstract base class for database implementations.
Provides the Factory Pattern via .from_config() class method. Subclasses must implement the abstract methods for their specific backend.
Example
>>> # Direct instantiation
>>> db = SQLAlchemyDatabase(url="postgresql://...")
>>>
>>> # Factory pattern with config
>>> db = SQLAlchemyDatabase.from_config()
>>> db = SQLAlchemyDatabase.from_config(config_key="secondary_db")
- __init__(url: str, *, pool_size: int = 5, pool_timeout: int = 30, echo: bool = False, **kwargs: Any)
Initialize database with connection parameters.
- classmethod from_config(config_key: str | None = None, *, config: ConfigInterface | None = None) -> BaseDatabase
Factory method to create database from config provider.
This method reads database configuration from the active config provider (e.g., dynaconf), allowing decoupled configuration.
- Parameters:
- Returns:
Configured database instance
- Raises:
RuntimeError – If no config provider is available
KeyError – If required config keys are missing
Example
>>> # Uses default config key "database"
>>> db = SQLAlchemyDatabase.from_config()
>>>
>>> # Uses custom config key for secondary database
>>> db = SQLAlchemyDatabase.from_config("replica_db")
- Config Structure (TOML):
    [database]
    url = "postgresql://user:pass@localhost/mydb"
    pool_size = 10
    pool_timeout = 30
    echo = false
- abstractmethod get_connection() -> ConnectionInterface
Get a connection from the pool.
- abstractmethod transaction() -> Iterator[ConnectionInterface]
Context manager for transactions.
- abstractmethod health_check() -> HealthCheckResult
Check database health.
- __enter__() -> BaseDatabase
Context manager entry.
SQLAlchemy Backend
SQLAlchemy-based database implementation for Structum Lab.
This module provides a production-ready database backend using SQLAlchemy 2.0+. It supports PostgreSQL, MySQL, SQLite, and other databases supported by SQLAlchemy.
Example
>>> from structum_lab.plugins.database import SQLAlchemyDatabase
>>>
>>> db = SQLAlchemyDatabase(url="postgresql://user:pass@localhost/mydb")
>>> with db.transaction() as conn:
... conn.execute("SELECT * FROM users")
... users = conn.fetchall()
- class structum_lab.plugins.database.sqlalchemy.SQLAlchemyConnection(session: Session)
Bases: object
SQLAlchemy connection wrapper implementing ConnectionInterface.
- class structum_lab.plugins.database.sqlalchemy.SQLAlchemyDatabase(url: str, *, pool_size: int = 5, pool_timeout: int = 30, echo: bool = False, pool_pre_ping: bool = True, **kwargs: Any)
Bases: BaseDatabase
SQLAlchemy-based database implementation.
Production-ready database backend with:
- Connection pooling
- Transaction management
- Health checks with latency measurement
- Support for PostgreSQL, MySQL, SQLite, etc.
Example
>>> db = SQLAlchemyDatabase(url="postgresql://...")
>>> db.connect()
>>>
>>> with db.transaction() as conn:
...     conn.execute("INSERT INTO users (name) VALUES (:name)", {"name": "John"})
>>>
>>> db.close()
- __init__(url: str, *, pool_size: int = 5, pool_timeout: int = 30, echo: bool = False, pool_pre_ping: bool = True, **kwargs: Any)
Initialize SQLAlchemy database.
- get_connection() -> ConnectionInterface
Get a connection from the pool.
Note
Prefer using transaction() context manager for proper cleanup.
- transaction() -> Iterator[ConnectionInterface]
Execute operations within a transaction.
Automatically commits on success, rolls back on exception.
- Yields:
Connection with active transaction
- health_check() -> HealthCheckResult
Check database connectivity.
- Returns:
HealthCheckResult with status and latency
PostgreSQL Backend
Direct PostgreSQL database implementation using psycopg3.
This module provides a high-performance PostgreSQL backend using psycopg3 (the modern successor to psycopg2). It offers better async support and native connection pooling.
Example
>>> from structum_lab.plugins.database import PostgresDatabase
>>>
>>> db = PostgresDatabase(url="postgresql://user:pass@localhost/mydb")
>>> with db.transaction() as conn:
... conn.execute("SELECT * FROM users")
... users = conn.fetchall()
- class structum_lab.plugins.database.postgres.PostgresConnection(conn: Any)
Bases: object
psycopg3 connection wrapper implementing ConnectionInterface.
- class structum_lab.plugins.database.postgres.PostgresDatabase(url: str, *, pool_size: int = 5, pool_timeout: int = 30, echo: bool = False, min_size: int = 1, **kwargs: Any)
Bases: BaseDatabase
Direct PostgreSQL database implementation using psycopg3.
High-performance PostgreSQL backend with:
- Native connection pooling (psycopg_pool)
- Better async support than psycopg2
- Modern Python type support
Example
>>> db = PostgresDatabase(url="postgresql://...")
>>> db.connect()
>>>
>>> with db.transaction() as conn:
...     conn.execute("INSERT INTO users (name) VALUES (%(name)s)", {"name": "John"})
>>>
>>> db.close()
Note
This backend only supports PostgreSQL. For other databases, use SQLAlchemyDatabase instead.
- __init__(url: str, *, pool_size: int = 5, pool_timeout: int = 30, echo: bool = False, min_size: int = 1, **kwargs: Any)
Initialize PostgreSQL database.
- get_connection() -> ConnectionInterface
Get a connection from the pool.
Note
Prefer using transaction() context manager for proper cleanup.
- transaction() -> Iterator[ConnectionInterface]
Execute operations within a transaction.
Automatically commits on success, rolls back on exception.
- Yields:
Connection with active transaction
- health_check() -> HealthCheckResult
Check database connectivity.
- Returns:
HealthCheckResult with status and latency