Modulo Database (structum_lab.database)¶
Protocolli e Interfacce Core per l’astrazione dell’accesso ai dati (Repository/UoW).
| Feature | Dettagli |
|---|---|
| Namespace | structum_lab.database |
| Ruolo | Definizione Interfacce (Protocol) |
Indice¶
1. Cos’è il Modulo Database¶
Il modulo structum_lab.database definisce le interfacce Protocol per le operazioni sul database. Non contiene implementazioni: solo contratti per connection management, transazioni e health monitoring.
1.1 Separation of Concerns¶
┌───────────────────────────────────────────┐
│             Application Layer             │
│  - Uses: get_database(), transaction()    │
│  - Executes: SQL queries                  │
└─────────────────┬─────────────────────────┘
                  │
┌─────────────────▼─────────────────────────┐
│            Core Module (THIS)             │
│  - Defines: Protocols (interfaces)        │
│  - Zero Dependencies                      │
│  - Type Contracts Only                    │
└─────────────────┬─────────────────────────┘
                  │
┌─────────────────▼─────────────────────────┐
│               Plugin Layer                │
│  - Provides: SQLAlchemyDatabase           │
│  - Implements: DatabaseInterface          │
│  - Dependencies: sqlalchemy, psycopg      │
└───────────────────────────────────────────┘
Benefits:
- Testability: Mock database without SQL dependencies
- Flexibility: Switch databases (SQLAlchemy → psycopg3 → custom)
- Zero Lock-in: Core stays database-agnostic
2. Architettura Protocol-Based¶
2.1 Protocol vs Abstract Base Class¶
Structum usa typing.Protocol per structural subtyping:
✅ Protocol Advantage:
from typing import Protocol
class DatabaseInterface(Protocol):
def transaction(self) -> TransactionContext:
...
# NO inheritance required
class MyDatabase: # Automatically compatible!
def transaction(self) -> TransactionContext:
# Implementation
return MyTransactionContext()
# Type checker verifies compatibility
db: DatabaseInterface = MyDatabase() # ✓ Works!
2.2 Static Type Checking¶
from structum_lab.database import DatabaseInterface
def fetch_user(db: DatabaseInterface, user_id: int) -> dict:
"""Type-safe database function."""
with db.transaction() as conn:
result = conn.execute(
"SELECT * FROM users WHERE id = :id",
{"id": user_id}
)
return result.fetchone()
# mypy verifies statically (at type-check time) that db has transaction()
3. DatabaseInterface Protocol¶
3.1 Protocol Definition¶
from typing import Protocol, ContextManager
class DatabaseInterface(Protocol):
"""
Protocol for database connectivity.
Implementations must provide:
- Connection management
- Transaction support
- Health monitoring
"""
def transaction(self) -> ContextManager[ConnectionInterface]:
"""
Get transaction context manager.
Usage:
with db.transaction() as conn:
conn.execute("SELECT ...")
Returns:
Context manager that:
- Acquires connection from pool
- Commits on success
- Rolls back on exception
- Returns connection to pool
"""
...
def get_connection(self) -> ConnectionInterface:
"""
Get raw connection (manual management).
Warning:
MUST call connection.close() manually.
Prefer transaction() context manager.
"""
...
def health_check(self) -> HealthResult:
"""
Perform database health check.
Returns:
HealthResult with status, message, latency
"""
...
def shutdown(self) -> None:
"""
Gracefully shutdown database connections.
Call at application shutdown to:
- Close all pool connections
- Release resources
"""
...
3.2 Usage Example¶
from structum_lab.database import get_database
# Get database instance
db = get_database()
# Execute query in transaction
with db.transaction() as conn:
# Insert
result = conn.execute(
"INSERT INTO users (username, email) VALUES (:u, :e) RETURNING id",
{"u": "john_doe", "e": "john@example.com"}
)
new_id = result.fetchone()["id"]
# Query
result = conn.execute(
"SELECT * FROM users WHERE id = :id",
{"id": new_id}
)
user = result.fetchone()
# Auto-commit on success
# Auto-rollback on exception
# Check health
health = db.health_check()
if health.status == HealthStatus.HEALTHY:
print(f"Database OK - latency: {health.latency_ms}ms")
4. ConnectionInterface Protocol¶
4.1 Protocol Definition¶
from typing import Protocol, Any, Optional
class ConnectionInterface(Protocol):
"""
Protocol for active database connection.
Represents a single connection that can execute queries.
"""
def execute(self, query: str, params: dict = None) -> ResultProxy:
"""
Execute SQL query.
Args:
query: SQL statement (use :param for placeholders)
params: Dictionary of parameter values
Returns:
ResultProxy for fetching results
Example:
result = conn.execute(
"SELECT * FROM users WHERE id = :id",
{"id": 123}
)
user = result.fetchone()
"""
...
def commit(self) -> None:
"""Commit current transaction."""
...
def rollback(self) -> None:
"""Rollback current transaction."""
...
def close(self) -> None:
"""Close connection and return to pool."""
...
4.2 ResultProxy Protocol¶
class ResultProxy(Protocol):
"""Protocol for query results."""
def fetchone(self) -> Optional[dict]:
"""
Fetch single row.
Returns:
Dictionary with column names as keys
None if no rows
"""
...
def fetchall(self) -> list[dict]:
"""
Fetch all remaining rows.
Returns:
List of dictionaries
"""
...
def fetchmany(self, size: int = None) -> list[dict]:
"""
Fetch batch of rows.
Args:
size: Number of rows (default: arraysize)
Returns:
List of dictionaries
"""
...
@property
def rowcount(self) -> int:
"""Number of rows affected by query."""
...
4.3 Usage Examples¶
Select Query:
with db.transaction() as conn:
result = conn.execute("SELECT * FROM users WHERE active = :active", {"active": True})
# Single row
user = result.fetchone()
if user:
print(f"User: {user['username']}")
# All rows
users = result.fetchall()
for user in users:
print(user['username'])
# Batch fetch
batch = result.fetchmany(size=100)
Insert Query:
with db.transaction() as conn:
result = conn.execute(
"INSERT INTO users (username, email) VALUES (:u, :e) RETURNING id",
{"u": "jane", "e": "jane@example.com"}
)
new_id = result.fetchone()["id"]
print(f"Created user with ID: {new_id}")
Update Query:
with db.transaction() as conn:
result = conn.execute(
"UPDATE users SET last_login = :now WHERE id = :id",
{"now": datetime.utcnow(), "id": 123}
)
rows_updated = result.rowcount
print(f"Updated {rows_updated} rows")
5. TransactionContext Protocol¶
5.1 Protocol Definition¶
from typing import Protocol, ContextManager
class TransactionContext(Protocol):
"""Context manager for database transactions."""
def __enter__(self) -> ConnectionInterface:
"""
Acquire connection and start transaction.
Returns:
Connection for executing queries
"""
...
def __exit__(self, exc_type, exc_val, exc_tb) -> bool:
"""
Complete transaction and release connection.
Behavior:
- If exc_type is None: COMMIT
- If exception occurred: ROLLBACK
- Always: Return connection to pool
"""
...
5.2 Transaction Behavior¶
# Success case
with db.transaction() as conn:
conn.execute("INSERT INTO orders (...) VALUES (...)")
conn.execute("INSERT INTO order_items (...) VALUES (...)")
# Automatic COMMIT on clean exit
# Failure case
try:
with db.transaction() as conn:
conn.execute("INSERT INTO orders (...) VALUES (...)")
raise ValueError("Something went wrong!")
conn.execute("INSERT INTO order_items (...) VALUES (...)")
except ValueError:
# Automatic ROLLBACK - no partial data
pass
5.3 Nested Transactions¶
def create_user_with_profile(username: str, email: str):
"""Transactional function."""
with db.transaction() as conn:
# Insert user
result = conn.execute(
"INSERT INTO users (username, email) VALUES (:u, :e) RETURNING id",
{"u": username, "e": email}
)
user_id = result.fetchone()["id"]
# Insert profile (same transaction)
conn.execute(
"INSERT INTO profiles (user_id, bio) VALUES (:id, :bio)",
{"id": user_id, "bio": "New user"}
)
# Both inserts commit atomically
return user_id
6. Health Check System¶
6.1 HealthResult Data Structure¶
from enum import Enum
from dataclasses import dataclass
class HealthStatus(Enum):
"""Health check status enumeration."""
HEALTHY = "healthy"
DEGRADED = "degraded"
UNHEALTHY = "unhealthy"
@dataclass
class HealthResult:
"""Database health check result."""
status: HealthStatus # Overall health status
message: str # Human-readable message
latency_ms: float # Query latency in milliseconds
details: dict = None # Optional detailed info
6.2 Health Check Usage¶
from structum_lab.database import get_database, HealthStatus
db = get_database()
health = db.health_check()
# Check status
if health.status == HealthStatus.HEALTHY:
print(f"✓ Database healthy - latency: {health.latency_ms}ms")
elif health.status == HealthStatus.DEGRADED:
print(f"⚠ Database degraded: {health.message}")
else:
print(f"✗ Database unhealthy: {health.message}")
# Log metrics
from structum_lab.logging import get_logger
logger = get_logger(__name__)
logger.info(
"Database health check",
status=health.status.value,
latency_ms=health.latency_ms,
message=health.message
)
6.3 FastAPI Health Endpoint¶
from fastapi import FastAPI
app = FastAPI()
@app.get("/health")
async def health_check():
"""Application health endpoint."""
db = get_database()
db_health = db.health_check()
return {
"status": "healthy" if db_health.status == HealthStatus.HEALTHY else "unhealthy",
"database": {
"status": db_health.status.value,
"latency_ms": db_health.latency_ms,
"message": db_health.message
}
}
7. Implementazioni Disponibili¶
7.1 Official Plugins¶
| Plugin | Backend | Databases | Features |
|---|---|---|---|
| structum-database | SQLAlchemy 2.0 | PostgreSQL, MySQL, SQLite, Oracle, MSSQL | ORM support, Universal |
| structum-database | psycopg3 | PostgreSQL only | High performance, Async native |
Installation:
# SQLAlchemy backend
pip install -e packages/database[sqlalchemy]
# PostgreSQL backend
pip install -e packages/database[postgres]
# Both
pip install -e packages/database[all]
Setup:
from structum_lab.database import set_database
from structum_lab.plugins.database import SQLAlchemyDatabase
# Initialize and register
db = SQLAlchemyDatabase.from_config()
set_database(db)
# Now available globally
from structum_lab.database import get_database
db = get_database()
7.2 Factory Pattern¶
from structum_lab.plugins.database import SQLAlchemyDatabase
# Method 1: Direct configuration
db = SQLAlchemyDatabase(
url="postgresql://user:pass@localhost/mydb",
pool_size=10
)
# Method 2: From config provider (recommended)
db = SQLAlchemyDatabase.from_config()
# Reads: config.get("database.url"), config.get("database.pool_size"), etc.
# Method 3: Custom config namespace
primary_db = SQLAlchemyDatabase.from_config("primary_db")
replica_db = SQLAlchemyDatabase.from_config("replica_db")
8. Usage Patterns¶
8.1 Global Singleton Pattern¶
from structum_lab.database import get_database, set_database
from structum_lab.plugins.database import SQLAlchemyDatabase
# Setup (once at application startup)
def setup_database():
db = SQLAlchemyDatabase.from_config()
set_database(db)
# Usage (anywhere in application)
def get_user(user_id: int):
db = get_database()
with db.transaction() as conn:
result = conn.execute(
"SELECT * FROM users WHERE id = :id",
{"id": user_id}
)
return result.fetchone()
8.2 Dependency Injection Pattern¶
from structum_lab.database import DatabaseInterface
class UserRepository:
"""Repository with injected database dependency."""
def __init__(self, db: DatabaseInterface):
self.db = db
def find_by_id(self, user_id: int) -> dict | None:
with self.db.transaction() as conn:
result = conn.execute(
"SELECT * FROM users WHERE id = :id",
{"id": user_id}
)
return result.fetchone()
# With DI container
from structum_lab.plugins.di import StructumContainer
class AppContainer(StructumContainer):
database = providers.Singleton(SQLAlchemyDatabase.from_config)
user_repo = providers.Factory(
UserRepository,
db=database
)
8.3 FastAPI Integration¶
from fastapi import FastAPI, Depends
from structum_lab.database import get_database, DatabaseInterface
app = FastAPI()
# Startup/shutdown
@app.on_event("startup")
async def startup():
db = SQLAlchemyDatabase.from_config()
set_database(db)
@app.on_event("shutdown")
async def shutdown():
db = get_database()
db.shutdown()
# Dependency
def get_db() -> DatabaseInterface:
return get_database()
# Route
@app.get("/users/{user_id}")
async def get_user(user_id: int, db: DatabaseInterface = Depends(get_db)):
with db.transaction() as conn:
result = conn.execute(
"SELECT * FROM users WHERE id = :id",
{"id": user_id}
)
user = result.fetchone()
if not user:
raise HTTPException(status_code=404)
return user
9. API Reference¶
9.1 Global Functions¶
def get_database() -> DatabaseInterface:
"""
Get global database singleton.
Raises:
RuntimeError if set_database() not called
"""
...
def set_database(db: DatabaseInterface) -> None:
"""
Set global database instance (call once at startup).
Args:
db: Database implementation instance
"""
...
9.2 Protocols¶
class DatabaseInterface(Protocol):
def transaction(self) -> ContextManager[ConnectionInterface]:
...
def get_connection(self) -> ConnectionInterface:
...
def health_check(self) -> HealthResult:
...
def shutdown(self) -> None:
...
class ConnectionInterface(Protocol):
def execute(self, query: str, params: dict = None) -> ResultProxy:
...
def commit(self) -> None:
...
def rollback(self) -> None:
...
def close(self) -> None:
...
class ResultProxy(Protocol):
def fetchone(self) -> Optional[dict]:
...
def fetchall(self) -> list[dict]:
...
def fetchmany(self, size: int = None) -> list[dict]:
...
@property
def rowcount(self) -> int:
...
9.3 Data Structures¶
class HealthStatus(Enum):
HEALTHY = "healthy"
DEGRADED = "degraded"
UNHEALTHY = "unhealthy"
@dataclass
class HealthResult:
status: HealthStatus
message: str
latency_ms: float
details: dict = None
See Also¶
Database Plugin - Full implementation guide (SQLAlchemy, PostgreSQL)
Configuration Module - Config provider interface
Auth Module - UserRepository pattern example