Structum Database (structum-database)¶
Structum Database provides a robust abstraction layer over SQLAlchemy (sync/async) and PostgreSQL (psycopg3).
| Feature | Details |
|---|---|
| Status | Alpha |
| Version | 0.1.0 |
| Namespace | |
| Backend | SQLAlchemy, PostgreSQL |
1. What is the Database Plugin¶
structum-database provides production-ready database connectivity with connection pooling, health monitoring, and transaction management.
1.1 Problem Solved¶
Before (without the plugin):
# ❌ Manual connection management
import psycopg2
conn = psycopg2.connect("postgresql://...")
cursor = conn.cursor()
cursor.execute("SELECT * FROM users")
# No pooling
# No health checks
# Manual transaction management
After (with the plugin):
# ✅ Automatic connection pooling
# ✅ Built-in health checks
# ✅ Transaction context manager
# ✅ Type-safe queries
db = SQLAlchemyDatabase.from_config()
with db.transaction() as conn:
    result = conn.execute("SELECT * FROM users WHERE id = :id", {"id": 1})
    user = result.fetchone()
1.2 Key Features¶
| Feature | Description |
|---|---|
| Multiple Backends | SQLAlchemy (universal) or psycopg3 (PostgreSQL-only) |
| Connection Pooling | Automatic pool management with pre-ping |
| Transaction Management | Context managers for ACID guarantees |
| Health Checks | Built-in health monitoring with latency metrics |
| Type Safety | Full type hints for mypy strict |
| Config Integration | Setup via Dynaconf with secrets isolation |
2. Core Concepts¶
2.1 Backend Comparison¶
| Feature | SQLAlchemyDatabase | PostgresDatabase |
|---|---|---|
| Databases Supported | PostgreSQL, MySQL, SQLite, Oracle, MSSQL | PostgreSQL only |
| Performance | Good | Excellent (native driver) |
| ORM Support | ✅ Full SQLAlchemy ORM | ❌ Raw SQL only |
| Async Support | Via asyncio extension | ✅ Native |
| Pool Management | SQLAlchemy Engine | psycopg3 Pool |
| Dependencies | sqlalchemy>=2.0 | psycopg[binary]>=3.1 |
Recommendation:
- Use SQLAlchemyDatabase for multi-database projects or when you need the ORM.
- Use PostgresDatabase for maximum performance in PostgreSQL-only deployments.
2.2 Connection Lifecycle¶
┌─────────────────────────────────────────┐
│ Application Startup │
├─────────────────────────────────────────┤
│ db = SQLAlchemyDatabase.from_config() │
│ └─ Create connection pool │
│ (min_size=2, max_size=10) │
└─────────────────┬───────────────────────┘
│
┌─────────────────▼───────────────────────┐
│ Request Handling │
├─────────────────────────────────────────┤
│ with db.transaction() as conn: │
│ └─ Acquire connection from pool │
│ Execute query │
│ Commit on success / Rollback │
│ Release connection to pool │
└─────────────────┬───────────────────────┘
│
┌─────────────────▼───────────────────────┐
│ Application Shutdown │
├─────────────────────────────────────────┤
│ db.shutdown() │
│ └─ Close all pool connections │
└─────────────────────────────────────────┘
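The lifecycle above can be sketched with a toy pool. This is an illustrative stand-in built from the stdlib (sqlite3 and queue.Queue), not the plugin's actual pool implementation:

```python
import queue
import sqlite3

class TinyPool:
    """Illustrative fixed-size connection pool (a sketch, not the plugin's code)."""

    def __init__(self, size: int):
        # Startup: pre-create all connections
        self._conns = queue.Queue()
        for _ in range(size):
            self._conns.put(sqlite3.connect(":memory:"))

    def acquire(self, timeout: float = 5.0):
        # Blocks (up to timeout) when the pool is exhausted
        return self._conns.get(timeout=timeout)

    def release(self, conn) -> None:
        # Return the connection to the pool instead of closing it
        self._conns.put(conn)

    def shutdown(self) -> None:
        # Shutdown: close every pooled connection
        while not self._conns.empty():
            self._conns.get_nowait().close()

pool = TinyPool(size=2)
conn = pool.acquire()
assert conn.execute("SELECT 1").fetchone()[0] == 1
pool.release(conn)
pool.shutdown()
```

The real pool adds overflow connections, recycling, and pre-ping on top of this acquire/release cycle.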
3. Quick Start (5 Minutes)¶
Step 1: Installation¶
# SQLAlchemy backend
pip install -e packages/database[sqlalchemy]
# PostgreSQL backend
pip install -e packages/database[postgres]
# Both
pip install -e packages/database[all]
Step 2: Configuration¶
File: config/app/database.toml
[default]
url = "postgresql://user:password@localhost:5432/mydb"
pool_size = 10
pool_timeout = 30
echo = false # Set true to log SQL queries
File: config/.secrets.toml
[database]
url = "postgresql://user:YOUR_PASSWORD@prod.db.example.com:5432/production_db"
Step 3: Basic Usage¶
from structum_lab.plugins.database import SQLAlchemyDatabase
from structum_lab.config import set_config_provider
from structum_lab.plugins.dynaconf import DynaconfConfigProvider
# Setup config
provider = DynaconfConfigProvider(root_path=".")
provider.auto_discover()
set_config_provider(provider)
# Create database connection
db = SQLAlchemyDatabase.from_config()
# Execute query in transaction
with db.transaction() as conn:
    result = conn.execute(
        "SELECT * FROM users WHERE id = :user_id",
        {"user_id": 1}
    )
    user = result.fetchone()
    if user:
        print(f"Found user: {user['username']}")
# Shutdown (cleanup)
db.shutdown()
4. Backend SQLAlchemy¶
4.1 Initialization¶
from structum_lab.plugins.database import SQLAlchemyDatabase
# From config
db = SQLAlchemyDatabase.from_config()
# Manual initialization
db = SQLAlchemyDatabase(
    url="postgresql://user:pass@localhost/mydb",
    pool_size=10,
    pool_timeout=30,
    pool_recycle=3600,   # Recycle connections after 1 hour
    echo=False,          # Log SQL queries
    pool_pre_ping=True   # Test connections before use
)
4.2 Query Execution¶
Select Query:
with db.transaction() as conn:
    result = conn.execute(
        "SELECT id, username, email FROM users WHERE active = :active",
        {"active": True}
    )

    # Fetch one
    user = result.fetchone()
    if user:
        print(f"{user['id']}: {user['username']}")

    # Fetch all
    users = result.fetchall()
    for user in users:
        print(f"{user['id']}: {user['username']}")

    # Fetch many
    batch = result.fetchmany(size=100)
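The fetch methods behave like the DB-API cursors they wrap. A runnable sketch using the stdlib sqlite3 driver as a stand-in for the plugin's connection (the table and rows are made up for illustration):

```python
import sqlite3

# sqlite3 also supports :name parameters, and Row gives dict-style access
conn = sqlite3.connect(":memory:")
conn.row_factory = sqlite3.Row
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, username TEXT, active INTEGER)")
conn.executemany(
    "INSERT INTO users (username, active) VALUES (:u, :a)",
    [{"u": "alice", "a": 1}, {"u": "bob", "a": 1}, {"u": "carol", "a": 0}],
)

cur = conn.execute(
    "SELECT id, username FROM users WHERE active = :active", {"active": 1}
)
first = cur.fetchone()   # one row, or None when exhausted
rest = cur.fetchall()    # all remaining rows
assert first["username"] == "alice"
assert [r["username"] for r in rest] == ["bob"]
```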
Insert Query:
from datetime import datetime

with db.transaction() as conn:
    result = conn.execute(
        """
        INSERT INTO users (username, email, created_at)
        VALUES (:username, :email, :created_at)
        RETURNING id
        """,
        {
            "username": "john_doe",
            "email": "john@example.com",
            "created_at": datetime.utcnow()
        }
    )
    new_id = result.fetchone()["id"]
    print(f"Created user with ID: {new_id}")
Update Query:
with db.transaction() as conn:
    result = conn.execute(
        """
        UPDATE users
        SET last_login = :now
        WHERE id = :user_id
        """,
        {
            "now": datetime.utcnow(),
            "user_id": 123
        }
    )
    rows_affected = result.rowcount
    print(f"Updated {rows_affected} rows")
Delete Query:
with db.transaction() as conn:
    result = conn.execute(
        "DELETE FROM sessions WHERE expires_at < :now",
        {"now": datetime.utcnow()}
    )
    deleted_count = result.rowcount
    print(f"Deleted {deleted_count} expired sessions")
4.3 ORM Support¶
from sqlalchemy.orm import Session, DeclarativeBase, Mapped, mapped_column
from sqlalchemy import String, Integer

class Base(DeclarativeBase):
    pass

class User(Base):
    __tablename__ = "users"

    id: Mapped[int] = mapped_column(primary_key=True)
    username: Mapped[str] = mapped_column(String(50))
    email: Mapped[str] = mapped_column(String(255))

# Get ORM session
with db.get_session() as session:
    # Query with ORM
    users = session.query(User).filter(User.username.like("john%")).all()

    # Create new record
    new_user = User(username="jane_doe", email="jane@example.com")
    session.add(new_user)
    session.commit()

    # Update
    user = session.get(User, 1)
    if user:
        user.email = "newemail@example.com"
        session.commit()
5. Backend PostgreSQL¶
5.1 Initialization¶
from structum_lab.plugins.database import PostgresDatabase
# From config
db = PostgresDatabase.from_config()
# Manual
db = PostgresDatabase(
    url="postgresql://user:pass@localhost/mydb",
    pool_size=10,
    min_size=2,    # Minimum connections to maintain
    max_size=20,   # Maximum connections
    timeout=5.0
)
5.2 Query Execution¶
with db.transaction() as conn:
    # Execute with parameters
    result = conn.execute(
        "SELECT * FROM users WHERE id = %s",
        (user_id,)  # Note: tuple for parameters
    )

    # Fetch results
    row = result.fetchone()
    if row:
        print(f"User: {row['username']}")
5.3 Async Support¶
import asyncio
from structum_lab.plugins.database import AsyncPostgresDatabase

async def main():
    db = AsyncPostgresDatabase.from_config()

    async with db.transaction() as conn:
        result = await conn.execute(
            "SELECT * FROM users WHERE id = $1",
            [user_id]
        )
        user = await result.fetchone()
        print(f"User: {user['username']}")

    await db.shutdown()

asyncio.run(main())
6. Connection Pooling¶
6.1 Pool Configuration¶
db = SQLAlchemyDatabase(
    url="postgresql://...",
    pool_size=10,        # Normal connections
    max_overflow=20,     # Extra connections under load
    pool_timeout=30,     # Wait time for connection (seconds)
    pool_recycle=3600,   # Recycle after 1 hour
    pool_pre_ping=True   # Test before use
)
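pool_pre_ping amounts to running a trivial query before handing out a pooled connection and replacing the connection if the query fails. A minimal sketch of the idea, with sqlite3 standing in for a pooled connection (not SQLAlchemy's actual mechanism):

```python
import sqlite3

def checkout_with_preping(conn, factory):
    """Return a live connection: ping with a trivial query, recreate on failure."""
    try:
        conn.execute("SELECT 1")
        return conn
    except sqlite3.Error:
        # Stale or broken connection: discard it and build a fresh one
        return factory()

conn = sqlite3.connect(":memory:")
conn.close()  # simulate a connection that went stale while idle in the pool
fresh = checkout_with_preping(conn, lambda: sqlite3.connect(":memory:"))
assert fresh.execute("SELECT 1").fetchone()[0] == 1
```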
6.2 Pool Monitoring¶
# Get pool status
info = db.get_pool_info()
print(f"Checked out: {info['checked_out']}")
print(f"Available: {info['available']}")
print(f"Total: {info['total']}")
print(f"Overflow: {info['overflow']}")
# Log pool stats
from structum_lab.plugins.observability import get_logger
logger = get_logger(__name__)
logger.info("Database pool status", **info)
6.3 Pool Exhaustion Handling¶
from sqlalchemy.exc import TimeoutError

try:
    with db.transaction() as conn:
        # Query execution
        pass
except TimeoutError:
    logger.error("Database pool exhausted - increase pool_size or check for connection leaks")
    # Alert monitoring
    metrics.increment("database.pool_exhaustion")
7. Transactions¶
7.1 Basic Transaction¶
with db.transaction() as conn:
    # All queries in this block are part of a single transaction
    conn.execute("INSERT INTO users (...) VALUES (...)")
    conn.execute("INSERT INTO profiles (...) VALUES (...)")
    # Automatic COMMIT on success
    # Automatic ROLLBACK on exception
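The commit-on-success / rollback-on-exception contract can be observed with sqlite3's own connection context manager, which behaves analogously to db.transaction():

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT NOT NULL)")

with conn:  # commits on clean exit
    conn.execute("INSERT INTO users (name) VALUES ('alice')")

try:
    with conn:  # rolls back because the block raises
        conn.execute("INSERT INTO users (name) VALUES ('bob')")
        raise RuntimeError("boom")
except RuntimeError:
    pass

# Only the committed row survives
names = [r[0] for r in conn.execute("SELECT name FROM users")]
assert names == ["alice"]
```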
7.2 Explicit Commit/Rollback¶
conn = db.get_connection()
try:
    conn.execute("UPDATE accounts SET balance = balance - 100 WHERE id = 1")
    conn.execute("UPDATE accounts SET balance = balance + 100 WHERE id = 2")
    conn.commit()
except Exception as e:
    conn.rollback()
    logger.error(f"Transaction failed: {e}")
    raise
finally:
    conn.close()
7.3 Savepoints¶
with db.transaction() as conn:
    # Main transaction
    conn.execute("INSERT INTO orders (...) VALUES (...)")

    # Savepoint for partial rollback
    conn.execute("SAVEPOINT sp1")
    try:
        conn.execute("INSERT INTO order_items (...) VALUES (...)")
    except Exception:
        # Roll back only to the savepoint
        conn.execute("ROLLBACK TO SAVEPOINT sp1")
        logger.warning("Order items insert failed, continuing with order")
    # Main transaction still commits
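The same savepoint pattern, runnable end-to-end against sqlite3 (which also supports SAVEPOINT / ROLLBACK TO SAVEPOINT); the tables and the CHECK constraint are made up here just to force a failure:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.isolation_level = None  # autocommit; we manage the transaction explicitly
conn.execute("CREATE TABLE orders (id INTEGER PRIMARY KEY, note TEXT)")
conn.execute(
    "CREATE TABLE order_items "
    "(id INTEGER PRIMARY KEY, order_id INTEGER, qty INTEGER CHECK (qty > 0))"
)

conn.execute("BEGIN")
conn.execute("INSERT INTO orders (note) VALUES ('main order')")
conn.execute("SAVEPOINT sp1")
try:
    # Violates the CHECK constraint on purpose
    conn.execute("INSERT INTO order_items (order_id, qty) VALUES (1, -5)")
except sqlite3.IntegrityError:
    conn.execute("ROLLBACK TO SAVEPOINT sp1")  # undo only the item insert
conn.execute("COMMIT")

# The order committed; the failed item insert was rolled back to the savepoint
assert conn.execute("SELECT COUNT(*) FROM orders").fetchone()[0] == 1
assert conn.execute("SELECT COUNT(*) FROM order_items").fetchone()[0] == 0
```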
7.4 Nested Transactions¶
def create_user_with_profile(username: str, email: str):
    """Transactional function."""
    with db.transaction() as conn:
        # Insert user
        result = conn.execute(
            "INSERT INTO users (username, email) VALUES (:u, :e) RETURNING id",
            {"u": username, "e": email}
        )
        user_id = result.fetchone()["id"]

        # Insert profile (nested transaction)
        create_profile(conn, user_id)

        return user_id

def create_profile(conn, user_id: int):
    """Nested transactional function."""
    conn.execute(
        "INSERT INTO profiles (user_id, bio) VALUES (:id, :bio)",
        {"id": user_id, "bio": "New user"}
    )
8. Health Checks¶
8.1 Basic Health Check¶
from structum_lab.plugins.database import HealthStatus
# Perform health check
result = db.health_check()
print(f"Status: {result.status}") # HealthStatus.HEALTHY
print(f"Message: {result.message}") # "Database connection OK"
print(f"Latency: {result.latency_ms}ms") # 2.5
8.2 Health Check Endpoint¶
from fastapi import FastAPI

app = FastAPI()

@app.get("/health")
async def health():
    """Health check endpoint."""
    db_health = db.health_check()

    return {
        "status": "healthy" if db_health.status == HealthStatus.HEALTHY else "unhealthy",
        "database": {
            "status": db_health.status.value,
            "latency_ms": db_health.latency_ms,
            "message": db_health.message
        }
    }
8.3 Detailed Health Monitoring¶
def monitor_database_health():
    """Periodic health monitoring."""
    result = db.health_check()

    # Log metrics
    metrics.gauge("database.latency_ms", result.latency_ms)

    if result.status != HealthStatus.HEALTHY:
        logger.error(
            "Database unhealthy",
            status=result.status.value,
            message=result.message,
            latency_ms=result.latency_ms
        )
        # Alert
        alert_service.send_alert(
            "Database Health Check Failed",
            f"Status: {result.status.value}\nLatency: {result.latency_ms}ms"
        )
    else:
        logger.debug(f"Database healthy - latency: {result.latency_ms}ms")
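Under the hood, a health check is essentially a trivial query plus a latency measurement. A self-contained sketch of the idea (a plain dict instead of the plugin's HealthResult, sqlite3 as a stand-in backend):

```python
import sqlite3
import time

def health_check(conn) -> dict:
    """Minimal health probe: run a trivial query and time it."""
    start = time.perf_counter()
    try:
        conn.execute("SELECT 1").fetchone()
        status = "healthy"
        message = "Database connection OK"
    except sqlite3.Error as exc:
        status = "unhealthy"
        message = str(exc)
    latency_ms = (time.perf_counter() - start) * 1000
    return {"status": status, "message": message, "latency_ms": latency_ms}

result = health_check(sqlite3.connect(":memory:"))
assert result["status"] == "healthy"
assert result["latency_ms"] >= 0
```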
9. Configuration¶
9.1 Complete Configuration Example¶
File: config/app/database.toml
[default]
url = "postgresql://localhost:5432/dev_db"
pool_size = 5
pool_timeout = 30
pool_recycle = 3600
echo = true # Log SQL in development
[default.options]
pool_pre_ping = true
connect_args = { "connect_timeout": 10 }
[production]
url = "postgresql://prod.db.internal:5432/prod_db"
pool_size = 20
pool_timeout = 60
pool_recycle = 1800
echo = false
[production.options]
pool_pre_ping = true
pool_use_lifo = true # LIFO for better connection reuse
File: config/.secrets.toml
[database]
# Production credentials
url = "postgresql://app_user:STRONG_PASSWORD@prod.db.example.com:5432/production"
9.2 Environment Variable Override¶
# Override database URL
export STRUCTUM_DATABASE__URL="postgresql://localhost/test_db"
# Override pool size
export STRUCTUM_DATABASE__POOL_SIZE=20
# Multiple databases
export STRUCTUM_PRIMARY_DB__URL="postgresql://..."
export STRUCTUM_REPLICA_DB__URL="postgresql://..."
9.3 Multiple Database Connections¶
# Primary database
primary_db = SQLAlchemyDatabase.from_config(namespace="primary_db")

# Read replica
replica_db = SQLAlchemyDatabase.from_config(namespace="replica_db")

# Writes go to the primary
with primary_db.transaction() as conn:
    conn.execute("INSERT INTO users (...) VALUES (...)")

# Reads go to the replica
with replica_db.transaction() as conn:
    result = conn.execute("SELECT * FROM users WHERE id = :id", {"id": 1})
    user = result.fetchone()
10. FastAPI Integration¶
10.1 Dependency Injection¶
from fastapi import FastAPI, Depends, HTTPException
from structum_lab.plugins.database import get_database, set_database, DatabaseInterface

app = FastAPI()

# Database initialization
@app.on_event("startup")
async def startup():
    db = SQLAlchemyDatabase.from_config()
    set_database(db)

@app.on_event("shutdown")
async def shutdown():
    db = get_database()
    db.shutdown()

# Dependency
def get_db() -> DatabaseInterface:
    return get_database()

# Route with injection
@app.get("/users/{user_id}")
async def get_user(user_id: int, db: DatabaseInterface = Depends(get_db)):
    with db.transaction() as conn:
        result = conn.execute(
            "SELECT * FROM users WHERE id = :id",
            {"id": user_id}
        )
        user = result.fetchone()

    if not user:
        raise HTTPException(status_code=404, detail="User not found")

    return {"id": user["id"], "username": user["username"]}
10.2 Transaction Middleware¶
from fastapi import Request
from starlette.middleware.base import BaseHTTPMiddleware

class TransactionMiddleware(BaseHTTPMiddleware):
    """Automatic transaction per request."""

    async def dispatch(self, request, call_next):
        db = get_database()

        with db.transaction() as conn:
            # Attach connection to request state
            request.state.db_conn = conn
            try:
                response = await call_next(request)
                # Transaction auto-commits on success
                return response
            except Exception:
                # Transaction auto-rolls-back on error
                raise

app.add_middleware(TransactionMiddleware)

# Routes can access the connection
@app.post("/users")
async def create_user(user: UserCreate, request: Request):
    conn = request.state.db_conn
    result = conn.execute(
        "INSERT INTO users (username, email) VALUES (:u, :e) RETURNING id",
        {"u": user.username, "e": user.email}
    )
    new_id = result.fetchone()["id"]
    return {"id": new_id}
11. Testing¶
11.1 Test Database Setup¶
import pytest
from structum_lab.plugins.database import SQLAlchemyDatabase

@pytest.fixture(scope="session")
def test_db():
    """Create test database."""
    db = SQLAlchemyDatabase(
        url="postgresql://localhost/test_db",
        pool_size=2
    )

    # Run migrations
    with db.transaction() as conn:
        conn.execute("""
            CREATE TABLE IF NOT EXISTS users (
                id SERIAL PRIMARY KEY,
                username VARCHAR(50) UNIQUE NOT NULL,
                email VARCHAR(255) UNIQUE NOT NULL
            )
        """)

    yield db

    # Cleanup
    with db.transaction() as conn:
        conn.execute("DROP TABLE IF EXISTS users")
    db.shutdown()

@pytest.fixture
def clean_db(test_db):
    """Clean database before each test."""
    with test_db.transaction() as conn:
        conn.execute("TRUNCATE users RESTART IDENTITY CASCADE")
    yield test_db
11.2 Test Queries¶
def test_create_user(clean_db):
    """Test user creation."""
    with clean_db.transaction() as conn:
        result = conn.execute(
            "INSERT INTO users (username, email) VALUES (:u, :e) RETURNING id",
            {"u": "testuser", "e": "test@example.com"}
        )
        user_id = result.fetchone()["id"]
        assert user_id > 0

def test_query_user(clean_db):
    """Test user query."""
    # Insert test data
    with clean_db.transaction() as conn:
        conn.execute(
            "INSERT INTO users (username, email) VALUES (:u, :e)",
            {"u": "testuser", "e": "test@example.com"}
        )

    # Query
    with clean_db.transaction() as conn:
        result = conn.execute(
            "SELECT * FROM users WHERE username = :u",
            {"u": "testuser"}
        )
        user = result.fetchone()

    assert user["username"] == "testuser"
    assert user["email"] == "test@example.com"
11.3 Mock Database¶
class MockConnection:
    """Mock connection that records executed queries."""

    def __init__(self, db):
        self.db = db

    def __enter__(self):
        return self

    def __exit__(self, *exc):
        return False

    def execute(self, query, params=None):
        self.db.queries.append((query, params))

class MockDatabase(DatabaseInterface):
    """Mock database for testing."""

    def __init__(self):
        self.data = {}
        self.queries = []

    def transaction(self):
        return MockConnection(self)

    def health_check(self):
        return HealthResult(
            status=HealthStatus.HEALTHY,
            message="Mock OK",
            latency_ms=0.1
        )

@pytest.fixture
def mock_db():
    return MockDatabase()

def test_with_mock(mock_db):
    """Test using mock database."""
    with mock_db.transaction() as conn:
        conn.execute("SELECT * FROM users")
    assert len(mock_db.queries) == 1
12. Production Best Practices¶
12.1 Connection String Security¶
❌ Never:
# Hard-coded credentials
url = "postgresql://admin:password123@db.example.com/prod"
✅ Always:
# config/.secrets.toml (git-ignored)
[database]
url = "postgresql://app_user:SECURE_PASS@db.internal:5432/prod"
12.2 Pool Sizing¶
# Rule of thumb: pool_size = (num_workers * 2) + spare
#
# Example: 4 workers
#   pool_size = (4 * 2) + 2 = 10
#   max_overflow = pool_size * 0.5 = 5
db = SQLAlchemyDatabase(
    url=db_url,
    pool_size=10,
    max_overflow=5,
    pool_timeout=30
)
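The rule of thumb above can be captured in a small helper (a sketch; the constants are the starting point suggested here, not a universal law, so tune them for your workload):

```python
def recommended_pool_size(num_workers: int, spare: int = 2) -> tuple[int, int]:
    """Rule of thumb: pool_size = workers * 2 + spare; overflow = half the pool."""
    pool_size = num_workers * 2 + spare
    max_overflow = pool_size // 2
    return pool_size, max_overflow

# 4 workers -> pool_size 10, max_overflow 5, matching the example above
assert recommended_pool_size(4) == (10, 5)
```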
12.3 Query Timeouts¶
with db.transaction() as conn:
    # Set statement timeout
    conn.execute("SET statement_timeout = '5000'")  # 5 seconds

    # Long query with timeout protection
    result = conn.execute("SELECT * FROM large_table WHERE ...")
12.4 Connection Leak Detection¶
import atexit

def check_pool_on_exit():
    """Detect connection leaks at shutdown."""
    info = db.get_pool_info()
    if info['checked_out'] > 0:
        logger.warning(
            f"Connection leak detected: {info['checked_out']} connections not returned"
        )

atexit.register(check_pool_on_exit)
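Leak detection presupposes that the pool counts checked-out connections. A minimal sketch of that bookkeeping, with sqlite3 as a stand-in (not the plugin's get_pool_info API):

```python
import sqlite3

class CountingPool:
    """Sketch: track checked-out connections so leaks are visible at shutdown."""

    def __init__(self):
        self.checked_out = 0

    def acquire(self):
        self.checked_out += 1
        return sqlite3.connect(":memory:")

    def release(self, conn) -> None:
        conn.close()
        self.checked_out -= 1

pool = CountingPool()
conn = pool.acquire()   # ...and never released: this is the leak
leaked = pool.checked_out
assert leaked == 1
```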
13. API Reference¶
13.1 SQLAlchemyDatabase¶
class SQLAlchemyDatabase(DatabaseInterface):
    """SQLAlchemy-based database backend."""

    def __init__(
        self,
        url: str,
        pool_size: int = 5,
        max_overflow: int = 10,
        pool_timeout: int = 30,
        pool_recycle: int = 3600,
        echo: bool = False,
        pool_pre_ping: bool = True
    ):
        ...

    @classmethod
    def from_config(cls, namespace: str = "database") -> "SQLAlchemyDatabase":
        """Create from Structum config."""
        ...

    def transaction(self) -> TransactionContext:
        """Get transaction context manager."""
        ...

    def get_session(self) -> Session:
        """Get ORM session."""
        ...

    def health_check(self) -> HealthResult:
        """Perform database health check."""
        ...

    def shutdown(self) -> None:
        """Close all connections."""
        ...
13.2 PostgresDatabase¶
class PostgresDatabase(DatabaseInterface):
    """psycopg3-based PostgreSQL backend."""

    def __init__(
        self,
        url: str,
        pool_size: int = 10,
        min_size: int = 2,
        max_size: int = 20,
        timeout: float = 5.0
    ):
        ...

    @classmethod
    def from_config(cls, namespace: str = "database") -> "PostgresDatabase":
        ...

    def transaction(self) -> TransactionContext:
        ...

    def health_check(self) -> HealthResult:
        ...

    def shutdown(self) -> None:
        ...
See Also¶
Database Module (Core) - Core interfaces
Dynaconf Plugin - Configuration provider
SQLAlchemy Documentation - ORM reference