Structum Database (structum-database)

Python 3.11+ | License: Apache-2.0

Structum Database provides a robust abstraction layer over SQLAlchemy (sync/async) and PostgreSQL (psycopg3).

Property     Value
Version      0.1.0
Status       Alpha
Namespace    structum_lab.plugins.database
Backends     SQLAlchemy, PostgreSQL


Contents

  1. What Is the Database Plugin

  2. Core Concepts

  3. Quick Start (5 Minutes)

  4. Backend SQLAlchemy

  5. Backend PostgreSQL

  6. Connection Pooling

  7. Transactions

  8. Health Checks

  9. Configuration

  10. FastAPI Integration

  11. Testing

  12. Production Best Practices

  13. API Reference


1. What Is the Database Plugin

structum-database provides production-ready database connectivity with connection pooling, health monitoring, and transaction management.

1.1 Problem Solved

Before (without the plugin):

# ❌ Manual connection management
import psycopg2

conn = psycopg2.connect("postgresql://...")
cursor = conn.cursor()
cursor.execute("SELECT * FROM users")
# No pooling
# No health checks
# Manual transaction management

After (with the plugin):

# ✅ Automatic connection pooling
# ✅ Built-in health checks
# ✅ Transaction context manager
# ✅ Type-safe queries
from structum_lab.plugins.database import SQLAlchemyDatabase

db = SQLAlchemyDatabase.from_config()  # requires a configured config provider (see Quick Start)

with db.transaction() as conn:
    result = conn.execute("SELECT * FROM users WHERE id = :id", {"id": 1})
    user = result.fetchone()

1.2 Key Features

Feature                  Description
Multiple Backends        SQLAlchemy (universal) or psycopg3 (PostgreSQL-only)
Connection Pooling       Automatic pool management with pre-ping
Transaction Management   Context managers with automatic commit/rollback (ACID transactions)
Health Checks            Built-in health monitoring with latency metrics
Type Safety              Full type hints compatible with mypy --strict
Config Integration       Setup via Dynaconf with secrets isolation


2. Core Concepts

2.1 Backend Comparison

Feature               SQLAlchemyDatabase                          PostgresDatabase
Supported databases   PostgreSQL, MySQL, SQLite, Oracle, MSSQL    PostgreSQL only
Performance           Good                                        Excellent (native driver)
ORM support           ✅ Full SQLAlchemy ORM                       ❌ Raw SQL only
Async support         Via the asyncio extension                   ✅ Native
Pool management       SQLAlchemy Engine                           psycopg3 pool
Dependencies          sqlalchemy>=2.0                             psycopg[binary]>=3.1

Recommendation:

  • Use SQLAlchemyDatabase for multi-database projects or when you need the ORM.

  • Use PostgresDatabase for maximum performance in PostgreSQL-only projects.

2.2 Connection Lifecycle

┌─────────────────────────────────────────┐
│ Application Startup                     │
├─────────────────────────────────────────┤
│ db = SQLAlchemyDatabase.from_config()   │
│   └─ Create connection pool             │
│      (min_size=2, max_size=10)          │
└─────────────────┬───────────────────────┘
                  │
┌─────────────────▼───────────────────────┐
│ Request Handling                        │
├─────────────────────────────────────────┤
│ with db.transaction() as conn:          │
│   └─ Acquire connection from pool       │
│      Execute query                      │
│      Commit on success / Rollback       │
│      Release connection to pool         │
└─────────────────┬───────────────────────┘
                  │
┌─────────────────▼───────────────────────┐
│ Application Shutdown                    │
├─────────────────────────────────────────┤
│ db.shutdown()                           │
│   └─ Close all pool connections         │
└─────────────────────────────────────────┘

3. Quick Start (5 Minutes)

Step 1: Installation

# Quotes stop shells such as zsh from expanding the [extras] brackets

# SQLAlchemy backend
pip install -e "packages/database[sqlalchemy]"

# PostgreSQL backend
pip install -e "packages/database[postgres]"

# Both
pip install -e "packages/database[all]"

Step 2: Configuration

File: config/app/database.toml

[default]
url = "postgresql://user:password@localhost:5432/mydb"
pool_size = 10
pool_timeout = 30
echo = false  # Set true to log SQL queries

File: config/.secrets.toml

[database]
url = "postgresql://user:YOUR_PASSWORD@prod.db.example.com:5432/production_db"

Step 3: Basic Usage

from structum_lab.plugins.database import SQLAlchemyDatabase
from structum_lab.config import set_config_provider
from structum_lab.plugins.dynaconf import DynaconfConfigProvider

# Setup config
provider = DynaconfConfigProvider(root_path=".")
provider.auto_discover()
set_config_provider(provider)

# Create database connection
db = SQLAlchemyDatabase.from_config()

try:
    # Execute query in transaction
    with db.transaction() as conn:
        result = conn.execute(
            "SELECT * FROM users WHERE id = :user_id",
            {"user_id": 1}
        )
        user = result.fetchone()

        if user:
            print(f"Found user: {user['username']}")
finally:
    # Shutdown: close pooled connections even if the query raised
    db.shutdown()

4. Backend SQLAlchemy

4.1 Initialization

from structum_lab.plugins.database import SQLAlchemyDatabase

# From config
db = SQLAlchemyDatabase.from_config()

# Manual initialization
db = SQLAlchemyDatabase(
    url="postgresql://user:pass@localhost/mydb",
    pool_size=10,
    pool_timeout=30,
    pool_recycle=3600,  # Recycle connections after 1 hour
    echo=False,  # Set True to log SQL queries
    pool_pre_ping=True  # Test connections before use
)

4.2 Query Execution

Select Query:

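with db.transaction() as conn:
    query = "SELECT id, username, email FROM users WHERE active = :active"
    params = {"active": True}

    # Fetch one row
    user = conn.execute(query, params).fetchone()
    if user:
        print(f"{user['id']}: {user['username']}")

    # Fetch all rows (fresh result: fetch calls consume rows from the cursor)
    users = conn.execute(query, params).fetchall()
    for user in users:
        print(f"{user['id']}: {user['username']}")

    # Fetch in batches of 100 (useful for large result sets)
    result = conn.execute(query, params)
    while batch := result.fetchmany(size=100):
        for user in batch:
            print(f"{user['id']}: {user['username']}")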
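Each fetch call consumes rows from the same cursor, so calling fetchone(), fetchall() and fetchmany() in sequence on one result returns different rows each time. The sketch below shows this with the standard-library sqlite3 module instead of the plugin, purely so it runs without a server; plugin results are assumed, not confirmed, to follow the same DB-API cursor semantics.

```python
import sqlite3


def demo_fetch_semantics() -> tuple:
    """Show that successive fetch* calls on one cursor consume rows in order."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, username TEXT)")
    conn.executemany(
        "INSERT INTO users (username) VALUES (?)",
        [("alice",), ("bob",), ("carol",)],
    )
    cur = conn.execute("SELECT id, username FROM users ORDER BY id")
    first = cur.fetchone()          # consumes row 1
    rest = cur.fetchall()           # returns only rows 2 and 3
    leftover = cur.fetchmany(100)   # cursor is already exhausted
    conn.close()
    return first, rest, leftover


first, rest, leftover = demo_fetch_semantics()
print(first)     # (1, 'alice')
print(rest)      # [(2, 'bob'), (3, 'carol')]
print(leftover)  # []
```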

Insert Query:

from datetime import datetime, timezone

with db.transaction() as conn:
    result = conn.execute(
        """
        INSERT INTO users (username, email, created_at)
        VALUES (:username, :email, :created_at)
        RETURNING id
        """,
        {
            "username": "john_doe",
            "email": "john@example.com",
            # Timezone-aware UTC (datetime.utcnow() is deprecated since Python 3.12)
            "created_at": datetime.now(timezone.utc)
        }
    )

    new_id = result.fetchone()["id"]
    print(f"Created user with ID: {new_id}")

Update Query:

from datetime import datetime, timezone

with db.transaction() as conn:
    result = conn.execute(
        """
        UPDATE users
        SET last_login = :now
        WHERE id = :user_id
        """,
        {
            "now": datetime.now(timezone.utc),
            "user_id": 123
        }
    )

    rows_affected = result.rowcount
    print(f"Updated {rows_affected} rows")

Delete Query:

from datetime import datetime, timezone

with db.transaction() as conn:
    result = conn.execute(
        "DELETE FROM sessions WHERE expires_at < :now",
        {"now": datetime.now(timezone.utc)}
    )

    deleted_count = result.rowcount
    print(f"Deleted {deleted_count} expired sessions")

4.3 ORM Support

from sqlalchemy import String, select
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column

class Base(DeclarativeBase):
    pass

class User(Base):
    __tablename__ = "users"

    id: Mapped[int] = mapped_column(primary_key=True)
    username: Mapped[str] = mapped_column(String(50))
    email: Mapped[str] = mapped_column(String(255))

# Get an ORM session
with db.get_session() as session:
    # Query with the SQLAlchemy 2.0 select() API
    users = session.scalars(
        select(User).where(User.username.like("john%"))
    ).all()

    # Create new record
    new_user = User(username="jane_doe", email="jane@example.com")
    session.add(new_user)
    session.commit()

    # Update an existing record by primary key
    user = session.get(User, 1)
    if user:
        user.email = "newemail@example.com"
        session.commit()

5. Backend PostgreSQL

5.1 Initialization

from structum_lab.plugins.database import PostgresDatabase

# From config
db = PostgresDatabase.from_config()

# Manual
db = PostgresDatabase(
    url="postgresql://user:pass@localhost/mydb",
    pool_size=10,
    min_size=2,  # Minimum connections to maintain
    max_size=20,  # Maximum connections
    timeout=5.0
)

5.2 Query Execution

user_id = 1

with db.transaction() as conn:
    # psycopg uses %s placeholders with a sequence of parameters
    result = conn.execute(
        "SELECT * FROM users WHERE id = %s",
        (user_id,)  # one-element tuple: note the trailing comma
    )

    # Fetch results
    row = result.fetchone()
    if row:
        print(f"User: {row['username']}")

5.3 Async Support

import asyncio
from structum_lab.plugins.database import AsyncPostgresDatabase

async def main(user_id: int) -> None:
    db = AsyncPostgresDatabase.from_config()

    try:
        async with db.transaction() as conn:
            # psycopg3 uses %s placeholders in async code too ($1 is asyncpg syntax)
            result = await conn.execute(
                "SELECT * FROM users WHERE id = %s",
                (user_id,)
            )

            user = await result.fetchone()
            if user:
                print(f"User: {user['username']}")
    finally:
        # Close the pool even if the query failed
        await db.shutdown()

asyncio.run(main(user_id=1))

6. Connection Pooling

6.1 Pool Configuration

db = SQLAlchemyDatabase(
    url="postgresql://...",
    pool_size=10,          # Connections kept open in the pool
    max_overflow=20,       # Extra connections allowed under load
    pool_timeout=30,       # Seconds to wait for a free connection
    pool_recycle=3600,     # Recycle connections after 1 hour
    pool_pre_ping=True     # Test connections before use
)
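To make the interplay of pool_size, max_overflow and pool_timeout concrete, here is a toy pool. It is a simplified, hypothetical model that loosely follows the behaviour SQLAlchemy documents for its QueuePool, not the plugin's or SQLAlchemy's actual implementation; the ToyPool class and its status() keys (chosen to resemble get_pool_info()) are illustrative names. It also shows pool exhaustion: once pool_size + max_overflow connections are checked out, the next checkout waits up to the timeout and then raises TimeoutError.

```python
import queue
import threading


class ToyPool:
    """Hypothetical pool: pool_size reusable slots plus max_overflow extras."""

    def __init__(self, factory, pool_size=2, max_overflow=1, timeout=0.1):
        self._factory = factory                          # creates a new "connection"
        self._idle = queue.Queue(maxsize=pool_size)      # FIFO, like QueuePool's default
        self._max_total = pool_size + max_overflow       # hard cap on open connections
        self._timeout = timeout                          # seconds to wait at the cap
        self._lock = threading.Lock()
        self._opened = 0                                 # idle + checked out
        self._checked_out = 0

    def checkout(self):
        try:
            conn = self._idle.get_nowait()               # reuse an idle connection
        except queue.Empty:
            with self._lock:
                can_open = self._opened < self._max_total
                if can_open:
                    self._opened += 1
            if can_open:
                conn = self._factory()                   # new (possibly overflow) connection
            else:
                try:                                     # at the cap: wait for a check-in
                    conn = self._idle.get(timeout=self._timeout)
                except queue.Empty:
                    raise TimeoutError("pool exhausted") from None
        with self._lock:
            self._checked_out += 1
        return conn

    def checkin(self, conn):
        with self._lock:
            self._checked_out -= 1
        try:
            self._idle.put_nowait(conn)                  # keep up to pool_size idle
        except queue.Full:
            with self._lock:                             # surplus overflow connection;
                self._opened -= 1                        # a real pool would close it here

    def status(self):
        with self._lock:
            return {
                "checked_out": self._checked_out,
                "available": self._idle.qsize(),
                "total": self._opened,
            }


pool = ToyPool(factory=object, pool_size=2, max_overflow=1, timeout=0.05)
held = [pool.checkout() for _ in range(3)]               # 2 pooled + 1 overflow
try:
    pool.checkout()                                      # a 4th request exceeds the cap
except TimeoutError:
    print("pool exhausted")
for conn in held:
    pool.checkin(conn)
print(pool.status())  # {'checked_out': 0, 'available': 2, 'total': 2}
```

Returning the overflow connection shrinks the pool back to pool_size, which is why total drops from 3 to 2 once everything is checked in.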

6.2 Pool Monitoring

# Get pool status
info = db.get_pool_info()

print(f"Checked out: {info['checked_out']}")
print(f"Available: {info['available']}")
print(f"Total: {info['total']}")
print(f"Overflow: {info['overflow']}")

# Log pool stats
from structum_lab.plugins.observability import get_logger

logger = get_logger(__name__)
logger.info("Database pool status", **info)

6.3 Pool Exhaustion Handling

from sqlalchemy.exc import TimeoutError as PoolTimeoutError  # avoid shadowing the builtin

try:
    with db.transaction() as conn:
        ...  # query execution
except PoolTimeoutError:
    logger.error("Database pool exhausted: increase pool_size or check for connection leaks")
    # Alert monitoring (`metrics` stands for your metrics client)
    metrics.increment("database.pool_exhaustion")
    raise

7. Transactions

7.1 Basic Transaction

with db.transaction() as conn:
    # All queries in this block are part of a single transaction
    conn.execute("INSERT INTO users (...) VALUES (...)")
    conn.execute("INSERT INTO profiles (...) VALUES (...)")
    
    # Automatic COMMIT on success
    # Automatic ROLLBACK on exception
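The commit-on-success, rollback-on-exception contract can be sketched with contextlib.contextmanager. The example uses sqlite3 in autocommit mode (isolation_level=None) so BEGIN, COMMIT and ROLLBACK are issued explicitly; the transaction() helper and the table layout are hypothetical stand-ins and say nothing about how the plugin implements db.transaction().

```python
import sqlite3
from contextlib import contextmanager


@contextmanager
def transaction(conn: sqlite3.Connection):
    """Hypothetical helper: COMMIT if the block completes, ROLLBACK if it raises."""
    conn.execute("BEGIN")
    try:
        yield conn
    except BaseException:
        conn.execute("ROLLBACK")
        raise                      # re-raise so the caller still sees the error
    else:
        conn.execute("COMMIT")


conn = sqlite3.connect(":memory:", isolation_level=None)  # autocommit; we issue BEGIN ourselves
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, username TEXT NOT NULL)")
conn.execute("CREATE TABLE profiles (user_id INTEGER NOT NULL, bio TEXT)")

with transaction(conn):
    conn.execute("INSERT INTO users (username) VALUES ('alice')")
    conn.execute("INSERT INTO profiles (user_id, bio) VALUES (1, 'hi')")

try:
    with transaction(conn):
        conn.execute("INSERT INTO users (username) VALUES ('bob')")
        conn.execute("INSERT INTO profiles (user_id, bio) VALUES (NULL, 'oops')")  # NOT NULL violation
except sqlite3.IntegrityError:
    pass

user_count = conn.execute("SELECT COUNT(*) FROM users").fetchone()[0]
print(user_count)  # 1
```

Only alice survives: bob's insert is rolled back together with the failed profile insert.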

7.2 Explicit Commit/Rollback

conn = db.get_connection()
try:
    conn.execute("UPDATE accounts SET balance = balance - 100 WHERE id = 1")
    conn.execute("UPDATE accounts SET balance = balance + 100 WHERE id = 2")
    
    conn.commit()
except Exception as e:
    conn.rollback()
    logger.error(f"Transaction failed: {e}")
    raise
finally:
    conn.close()

7.3 Savepoints

with db.transaction() as conn:
    # Main transaction
    conn.execute("INSERT INTO orders (...) VALUES (...)")

    # Savepoint for partial rollback
    conn.execute("SAVEPOINT sp1")

    try:
        conn.execute("INSERT INTO order_items (...) VALUES (...)")
    except Exception:
        # Roll back only the work done after the savepoint; in PostgreSQL this
        # is required, since any error aborts the transaction until you do
        conn.execute("ROLLBACK TO SAVEPOINT sp1")
        logger.warning("Order items insert failed, continuing with order")
    else:
        conn.execute("RELEASE SAVEPOINT sp1")

    # Main transaction still commits
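The same savepoint flow can be tried locally with sqlite3, which supports SAVEPOINT, ROLLBACK TO and RELEASE. One difference worth knowing: SQLite only rolls back the failing statement, whereas PostgreSQL marks the whole transaction as aborted until you roll back to a savepoint, so there the ROLLBACK TO step is mandatory. The tables below are hypothetical.

```python
import sqlite3

conn = sqlite3.connect(":memory:", isolation_level=None)  # explicit transaction control
conn.execute("CREATE TABLE orders (id INTEGER PRIMARY KEY, customer TEXT NOT NULL)")
conn.execute("CREATE TABLE order_items (order_id INTEGER NOT NULL, sku TEXT NOT NULL)")

conn.execute("BEGIN")
conn.execute("INSERT INTO orders (customer) VALUES ('acme')")
conn.execute("SAVEPOINT sp1")
try:
    conn.execute("INSERT INTO order_items (order_id, sku) VALUES (1, NULL)")  # fails: sku NOT NULL
except sqlite3.IntegrityError:
    conn.execute("ROLLBACK TO SAVEPOINT sp1")   # undo only the work after sp1
else:
    conn.execute("RELEASE SAVEPOINT sp1")       # keep the items, drop the savepoint
conn.execute("COMMIT")                          # the order itself is committed

orders = conn.execute("SELECT COUNT(*) FROM orders").fetchone()[0]
items = conn.execute("SELECT COUNT(*) FROM order_items").fetchone()[0]
print(orders, items)  # 1 0
```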

7.4 Nested Transactions

def create_user_with_profile(username: str, email: str) -> int:
    """Open one transaction and run every step inside it."""
    with db.transaction() as conn:
        # Insert user
        result = conn.execute(
            "INSERT INTO users (username, email) VALUES (:u, :e) RETURNING id",
            {"u": username, "e": email}
        )
        user_id = result.fetchone()["id"]

        # Insert profile on the same connection, so it joins this transaction
        # (if it fails, the user insert is rolled back too)
        create_profile(conn, user_id)

        return user_id

def create_profile(conn, user_id: int) -> None:
    """Runs inside the caller's transaction; never commits on its own."""
    conn.execute(
        "INSERT INTO profiles (user_id, bio) VALUES (:id, :bio)",
        {"id": user_id, "bio": "New user"}
    )
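Because create_profile() receives the caller's connection, a failure inside it also undoes the user insert. The sqlite3 sketch below demonstrates that all-or-nothing behaviour; the table layout is hypothetical, with bio declared NOT NULL so that the second call fails on purpose.

```python
import sqlite3


def create_profile(conn: sqlite3.Connection, user_id: int, bio) -> None:
    # No BEGIN/COMMIT here: it runs inside whatever transaction the caller opened
    conn.execute("INSERT INTO profiles (user_id, bio) VALUES (?, ?)", (user_id, bio))


def create_user_with_profile(conn: sqlite3.Connection, username: str, bio) -> int:
    conn.execute("BEGIN")
    try:
        cur = conn.execute("INSERT INTO users (username) VALUES (?)", (username,))
        create_profile(conn, cur.lastrowid, bio)   # same connection, same transaction
    except Exception:
        conn.execute("ROLLBACK")                   # undoes the user insert too
        raise
    conn.execute("COMMIT")
    return cur.lastrowid


conn = sqlite3.connect(":memory:", isolation_level=None)  # explicit transaction control
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, username TEXT NOT NULL)")
conn.execute("CREATE TABLE profiles (user_id INTEGER NOT NULL, bio TEXT NOT NULL)")

alice_id = create_user_with_profile(conn, "alice", "New user")
try:
    create_user_with_profile(conn, "bob", None)    # NULL bio raises IntegrityError
except sqlite3.IntegrityError:
    pass

users = conn.execute("SELECT username FROM users").fetchall()
print(users)  # [('alice',)]
```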

8. Health Checks

8.1 Basic Health Check

from structum_lab.plugins.database import HealthStatus

# Perform health check
result = db.health_check()

print(f"Status: {result.status}")           # HealthStatus.HEALTHY
print(f"Message: {result.message}")         # "Database connection OK"
print(f"Latency: {result.latency_ms}ms")   # 2.5
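A health check of this kind usually times a trivial round trip such as SELECT 1. The sketch below does that against sqlite3; HealthStatus, HealthResult and health_check() here are hypothetical stand-ins shaped after the fields shown above, not the plugin's definitions (the real enum may have more members, such as a degraded state).

```python
import sqlite3
import time
from dataclasses import dataclass
from enum import Enum


class HealthStatus(Enum):            # hypothetical stand-in for the plugin's enum
    HEALTHY = "healthy"
    UNHEALTHY = "unhealthy"


@dataclass(frozen=True)
class HealthResult:                  # hypothetical stand-in for the plugin's result type
    status: HealthStatus
    message: str
    latency_ms: float


def health_check(conn) -> HealthResult:
    """Time a SELECT 1 round trip and report the outcome."""
    start = time.perf_counter()
    try:
        conn.execute("SELECT 1").fetchone()
    except Exception as exc:
        status, message = HealthStatus.UNHEALTHY, f"Database error: {exc}"
    else:
        status, message = HealthStatus.HEALTHY, "Database connection OK"
    latency_ms = (time.perf_counter() - start) * 1000
    return HealthResult(status, message, round(latency_ms, 2))


conn = sqlite3.connect(":memory:")
print(health_check(conn).status)  # HealthStatus.HEALTHY
conn.close()
print(health_check(conn).status)  # HealthStatus.UNHEALTHY (closed connection)
```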

8.2 Health Check Endpoint

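from fastapi import FastAPI, Response
from structum_lab.plugins.database import HealthStatus

app = FastAPI()

@app.get("/health")
def health(response: Response):
    """Health check endpoint.

    A plain `def` lets FastAPI run the blocking health_check() call in its
    threadpool instead of on the event loop.
    """
    db_health = db.health_check()
    healthy = db_health.status == HealthStatus.HEALTHY

    if not healthy:
        # 503 lets load balancers and orchestrators detect the failure
        response.status_code = 503

    return {
        "status": "healthy" if healthy else "unhealthy",
        "database": {
            "status": db_health.status.value,
            "latency_ms": db_health.latency_ms,
            "message": db_health.message
        }
    }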

8.3 Detailed Health Monitoring

def monitor_database_health():
    """Periodic health monitoring."""
    result = db.health_check()
    
    # Log metrics
    metrics.gauge("database.latency_ms", result.latency_ms)
    
    if result.status != HealthStatus.HEALTHY:
        logger.error(
            "Database unhealthy",
            status=result.status.value,
            message=result.message,
            latency_ms=result.latency_ms
        )
        
        # Alert
        alert_service.send_alert(
            "Database Health Check Failed",
            f"Status: {result.status.value}\nLatency: {result.latency_ms}ms"
        )
    else:
        logger.debug(f"Database healthy - latency: {result.latency_ms}ms")

9. Configuration

9.1 Complete Configuration Example

File: config/app/database.toml

[default]
url = "postgresql://localhost:5432/dev_db"
pool_size = 5
pool_timeout = 30
pool_recycle = 3600
echo = true  # Log SQL in development

[default.options]
pool_pre_ping = true
connect_args = { "connect_timeout": 10 }

[production]
url = "postgresql://prod.db.internal:5432/prod_db"
pool_size = 20
pool_timeout = 60
pool_recycle = 1800
echo = false

[production.options]
pool_pre_ping = true
pool_use_lifo = true  # LIFO for better connection reuse

File: config/.secrets.toml

[database]
# Production credentials
url = "postgresql://app_user:STRONG_PASSWORD@prod.db.example.com:5432/production"

9.2 Environment Variable Override

# Override database URL
export STRUCTUM_DATABASE__URL="postgresql://localhost/test_db"

# Override pool size
export STRUCTUM_DATABASE__POOL_SIZE=20

# Multiple databases
export STRUCTUM_PRIMARY_DB__URL="postgresql://..."
export STRUCTUM_REPLICA_DB__URL="postgresql://..."

9.3 Multiple Database Connections

# Primary database
primary_db = SQLAlchemyDatabase.from_config(namespace="primary_db")

# Read replica
replica_db = SQLAlchemyDatabase.from_config(namespace="replica_db")

# Writes to primary
with primary_db.transaction() as conn:
    conn.execute("INSERT INTO users (...) VALUES (...)")

# Reads from replica
with replica_db.transaction() as conn:
    result = conn.execute("SELECT * FROM users WHERE id = :id", {"id": 1})
    user = result.fetchone()

10. FastAPI Integration

10.1 Dependency Injection

from contextlib import asynccontextmanager

from fastapi import Depends, FastAPI, HTTPException
from structum_lab.plugins.database import (
    DatabaseInterface,
    SQLAlchemyDatabase,
    get_database,
    set_database,
)

# Database initialization and cleanup via the lifespan handler
# (replaces the deprecated @app.on_event("startup"/"shutdown") hooks)
@asynccontextmanager
async def lifespan(app: FastAPI):
    db = SQLAlchemyDatabase.from_config()
    set_database(db)
    yield
    db.shutdown()

app = FastAPI(lifespan=lifespan)

# Dependency
def get_db() -> DatabaseInterface:
    return get_database()

# Route with injection (plain `def`: the blocking query runs in FastAPI's threadpool)
@app.get("/users/{user_id}")
def get_user(user_id: int, db: DatabaseInterface = Depends(get_db)):
    with db.transaction() as conn:
        result = conn.execute(
            "SELECT * FROM users WHERE id = :id",
            {"id": user_id}
        )
        user = result.fetchone()

        if not user:
            raise HTTPException(status_code=404, detail="User not found")

        return {"id": user["id"], "username": user["username"]}

10.2 Transaction Middleware

from fastapi import Request
from pydantic import BaseModel
from starlette.middleware.base import BaseHTTPMiddleware
from structum_lab.plugins.database import get_database

class UserCreate(BaseModel):
    username: str
    email: str

class TransactionMiddleware(BaseHTTPMiddleware):
    """Automatic transaction per request."""

    async def dispatch(self, request, call_next):
        db = get_database()

        # Commits when call_next() returns, rolls back if it raises.
        # HTTPException and other handled errors become responses before they
        # reach this middleware, so those requests still commit.
        with db.transaction() as conn:
            # Attach connection to request state
            request.state.db_conn = conn
            return await call_next(request)

app.add_middleware(TransactionMiddleware)

# Routes can access the connection
@app.post("/users")
async def create_user(user: UserCreate, request: Request):
    conn = request.state.db_conn

    result = conn.execute(
        "INSERT INTO users (username, email) VALUES (:u, :e) RETURNING id",
        {"u": user.username, "e": user.email}
    )

    new_id = result.fetchone()["id"]
    return {"id": new_id}

11. Testing

11.1 Test Database Setup

import pytest
from structum_lab.plugins.database import SQLAlchemyDatabase

@pytest.fixture(scope="session")
def test_db():
    """Create test database."""
    db = SQLAlchemyDatabase(
        url="postgresql://localhost/test_db",
        pool_size=2
    )
    
    # Create the schema (a simple stand-in for real migrations)
    with db.transaction() as conn:
        conn.execute("""
            CREATE TABLE IF NOT EXISTS users (
                id SERIAL PRIMARY KEY,
                username VARCHAR(50) UNIQUE NOT NULL,
                email VARCHAR(255) UNIQUE NOT NULL
            )
        """)
    
    yield db
    
    # Cleanup
    with db.transaction() as conn:
        conn.execute("DROP TABLE IF EXISTS users")
    
    db.shutdown()

@pytest.fixture
def clean_db(test_db):
    """Clean database before each test."""
    with test_db.transaction() as conn:
        conn.execute("TRUNCATE users RESTART IDENTITY CASCADE")
    
    yield test_db

11.2 Test Queries

def test_create_user(clean_db):
    """Test user creation."""
    with clean_db.transaction() as conn:
        result = conn.execute(
            "INSERT INTO users (username, email) VALUES (:u, :e) RETURNING id",
            {"u": "testuser", "e": "test@example.com"}
        )
        
        user_id = result.fetchone()["id"]
        assert user_id > 0

def test_query_user(clean_db):
    """Test user query."""
    # Insert test data
    with clean_db.transaction() as conn:
        conn.execute(
            "INSERT INTO users (username, email) VALUES (:u, :e)",
            {"u": "testuser", "e": "test@example.com"}
        )
    
    # Query
    with clean_db.transaction() as conn:
        result = conn.execute(
            "SELECT * FROM users WHERE username = :u",
            {"u": "testuser"}
        )
        
        user = result.fetchone()
        assert user["username"] == "testuser"
        assert user["email"] == "test@example.com"

11.3 Mock Database

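import pytest
from structum_lab.plugins.database import (
    DatabaseInterface,
    HealthResult,
    HealthStatus,
)

class MockConnection:
    """Context manager that records queries instead of executing them."""

    def __init__(self, db: "MockDatabase"):
        self.db = db

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        return False  # propagate exceptions, like a real transaction

    def execute(self, query, params=None):
        # Return a fake result here if the code under test reads rows
        self.db.queries.append((query, params))

class MockDatabase(DatabaseInterface):
    """Mock database for testing (also implement any other abstract methods
    DatabaseInterface declares)."""

    def __init__(self):
        self.data = {}
        self.queries = []

    def transaction(self):
        return MockConnection(self)

    def health_check(self):
        return HealthResult(
            status=HealthStatus.HEALTHY,
            message="Mock OK",
            latency_ms=0.1
        )

    def shutdown(self):
        pass

@pytest.fixture
def mock_db():
    return MockDatabase()

def test_with_mock(mock_db):
    """Test using mock database."""
    with mock_db.transaction() as conn:
        conn.execute("SELECT * FROM users")

    assert len(mock_db.queries) == 1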

12. Production Best Practices

12.1 Connection String Security

❌ Never:

# Hard-coded credentials
url = "postgresql://admin:password123@db.example.com/prod"

✅ Always:

# config/.secrets.toml (git-ignored)
[database]
url = "postgresql://app_user:SECURE_PASS@db.internal:5432/prod"

12.2 Pool Sizing

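# Rule of thumb: pool_size = (num_workers * 2) + spare
#
# Example: 4 workers
# pool_size = (4 * 2) + 2 = 10
# max_overflow = pool_size * 0.5 = 5
#
# Each process owns its own pool, so PostgreSQL can see up to
# processes * (pool_size + max_overflow) connections in total;
# keep that below the server's max_connections.

db = SQLAlchemyDatabase(
    url=db_url,
    pool_size=10,
    max_overflow=5,
    pool_timeout=30
)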
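The arithmetic in the comment can be wrapped in a small helper that also estimates the worst case the database server sees. This is a sketch: pool_budget is a hypothetical name, it assumes every process creates its own pool, and its default of 100 is PostgreSQL's default max_connections, so substitute your server's actual setting (and leave headroom for admin connections).

```python
def pool_budget(num_workers: int, spare: int = 2, overflow_ratio: float = 0.5,
                processes: int = 1, max_connections: int = 100) -> dict:
    """Apply the rule of thumb and check the server-side total (illustrative)."""
    pool_size = num_workers * 2 + spare
    max_overflow = int(pool_size * overflow_ratio)
    # Every process owns a separate pool, so the worst case multiplies
    peak = processes * (pool_size + max_overflow)
    return {
        "pool_size": pool_size,
        "max_overflow": max_overflow,
        "peak_connections": peak,
        "fits": peak <= max_connections,
    }


print(pool_budget(4))
# {'pool_size': 10, 'max_overflow': 5, 'peak_connections': 15, 'fits': True}
print(pool_budget(4, processes=8)["peak_connections"])  # 120
```

With 8 processes the same per-process settings already exceed the default limit, which is why the per-process multiplication matters when scaling out.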

12.3 Query Timeouts

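with db.transaction() as conn:
    # SET LOCAL scopes the timeout to this transaction, so it is not left
    # behind on the pooled connection (a plain SET persists for the session)
    conn.execute("SET LOCAL statement_timeout = '5s'")

    # Long query with timeout protection
    result = conn.execute("SELECT * FROM large_table WHERE ...")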

12.4 Connection Leak Detection

import atexit

def check_pool_on_exit():
    """Detect connection leaks at shutdown."""
    info = db.get_pool_info()
    
    if info['checked_out'] > 0:
        logger.warning(
            f"Connection leak detected: {info['checked_out']} connections not returned"
        )

atexit.register(check_pool_on_exit)

13. API Reference

13.1 SQLAlchemyDatabase

class SQLAlchemyDatabase(DatabaseInterface):
    """SQLAlchemy-based database backend."""
    
    def __init__(
        self,
        url: str,
        pool_size: int = 5,
        max_overflow: int = 10,
        pool_timeout: int = 30,
        pool_recycle: int = 3600,
        echo: bool = False,
        pool_pre_ping: bool = True
    ):
        ...
    
    @classmethod
    def from_config(cls, namespace: str = "database") -> "SQLAlchemyDatabase":
        """Create from Structum config."""
        ...
    
    def transaction(self) -> TransactionContext:
        """Get transaction context manager."""
        ...
    
    def get_session(self) -> Session:
        """Get ORM session."""
        ...
    
    def health_check(self) -> HealthResult:
        """Perform database health check."""
        ...
    
    def shutdown(self) -> None:
        """Close all connections."""
        ...

13.2 PostgresDatabase

class PostgresDatabase(DatabaseInterface):
    """psycopg3-based PostgreSQL backend."""
    
    def __init__(
        self,
        url: str,
        pool_size: int = 10,
        min_size: int = 2,
        max_size: int = 20,
        timeout: float = 5.0
    ):
        ...
    
    @classmethod
    def from_config(cls, namespace: str = "database") -> "PostgresDatabase":
        ...
    
    def transaction(self) -> TransactionContext:
        ...
    
    def health_check(self) -> HealthResult:
        ...
    
    def shutdown(self) -> None:
        ...

See Also