Database Module (structum_lab.database)

Documentation Source Code License: Apache-2.0

Core protocols and interfaces for abstracting data access (Repository/UoW).

Feature      Details
Namespace    structum_lab.database
Role         Interface definitions (Protocol)


Contents

  1. What the Database Module Is

  2. Protocol-Based Architecture

  3. DatabaseInterface Protocol

  4. ConnectionInterface Protocol

  5. TransactionContext Protocol

  6. Health Check System

  7. Available Implementations

  8. Usage Patterns

  9. API Reference


1. What the Database Module Is

The structum_lab.database module defines the Protocol interfaces for database operations. It contains no implementations, only contracts for connection management, transactions, and health monitoring.

1.1 Separation of Concerns

┌───────────────────────────────────────────┐
│ Application Layer                         │
│ - Uses: get_database(), transaction()     │
│ - Executes: SQL queries                   │
└─────────────────┬─────────────────────────┘
                  │
┌─────────────────▼─────────────────────────┐
│ Core Module (THIS)                        │
│ - Defines: Protocols (interfaces)         │
│ - Zero Dependencies                       │
│ - Type Contracts Only                     │
└─────────────────┬─────────────────────────┘
                  │
┌─────────────────▼─────────────────────────┐
│ Plugin Layer                              │
│ - Provides: SQLAlchemyDatabase            │
│ - Implements: DatabaseInterface           │
│ - Dependencies: sqlalchemy, psycopg       │
└───────────────────────────────────────────┘

Benefits:

  • Testability: Mock database without SQL dependencies

  • Flexibility: Switch databases (SQLAlchemy → psycopg3 → custom)

  • Zero Lock-in: Core stays database-agnostic
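The testability benefit can be illustrated with a hand-written fake. The sketch below is not part of structum_lab: FakeResult, FakeConnection, FakeDatabase, and fetch_username are hypothetical names, and only the transaction()/execute()/fetchone() shape is taken from the protocols described in this document.

```python
from contextlib import contextmanager
from typing import Iterator, Optional


class FakeResult:
    """Hypothetical result object: serves pre-canned rows."""

    def __init__(self, rows: list[dict]) -> None:
        self._rows = list(rows)

    def fetchone(self) -> Optional[dict]:
        return self._rows.pop(0) if self._rows else None


class FakeConnection:
    """Hypothetical connection: records queries and returns canned rows."""

    def __init__(self, rows: list[dict]) -> None:
        self._rows = rows
        self.executed: list[tuple[str, Optional[dict]]] = []

    def execute(self, query: str, params: Optional[dict] = None) -> FakeResult:
        self.executed.append((query, params))
        return FakeResult(self._rows)


class FakeDatabase:
    """Hypothetical database: no inheritance, only a matching transaction()."""

    def __init__(self, rows: list[dict]) -> None:
        self.conn = FakeConnection(rows)

    @contextmanager
    def transaction(self) -> Iterator[FakeConnection]:
        yield self.conn


def fetch_username(db, user_id: int) -> Optional[str]:
    """Code under test: depends only on the transaction()/execute() shape."""
    with db.transaction() as conn:
        row = conn.execute(
            "SELECT username FROM users WHERE id = :id", {"id": user_id}
        ).fetchone()
    return row["username"] if row else None


fake = FakeDatabase([{"username": "john_doe"}])
found = fetch_username(fake, 1)
```

Because the fake records every call in `executed`, a test can assert on both the returned value and the parameters that were sent, without installing any SQL driver.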


2. Protocol-Based Architecture

2.1 Protocol vs Abstract Base Class

Structum uses typing.Protocol for structural subtyping:

✅ Protocol Advantage:

from typing import Protocol

class DatabaseInterface(Protocol):
    def transaction(self) -> TransactionContext:
        ...

# NO inheritance required
class MyDatabase:  # Automatically compatible!
    def transaction(self) -> TransactionContext:
        # Implementation
        return MyTransactionContext()

# Type checker verifies compatibility
db: DatabaseInterface = MyDatabase()  # ✓ Works!
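
Structural compatibility is normally verified by the type checker, but the standard library's typing.runtime_checkable also permits an isinstance() check. A minimal sketch, assuming a simplified one-method protocol: SupportsTransaction is a hypothetical name, and this document does not say whether the real DatabaseInterface is decorated this way. Runtime checks confirm only that the method exists, not that its signature matches.

```python
from typing import Any, Protocol, runtime_checkable


@runtime_checkable
class SupportsTransaction(Protocol):
    """Hypothetical, simplified stand-in for DatabaseInterface."""

    def transaction(self) -> Any:
        ...


class GoodDatabase:
    # No inheritance: having a transaction() method is enough.
    def transaction(self) -> Any:
        return object()


class NotADatabase:
    # Lacks transaction(), so it does not satisfy the protocol.
    def connect(self) -> Any:
        return object()


good_ok = isinstance(GoodDatabase(), SupportsTransaction)
bad_ok = isinstance(NotADatabase(), SupportsTransaction)
```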

2.2 Static Type Checking

from structum_lab.database import DatabaseInterface

def fetch_user(db: DatabaseInterface, user_id: int) -> dict | None:
    """Type-safe database function."""
    with db.transaction() as conn:
        result = conn.execute(
            "SELECT * FROM users WHERE id = :id",
            {"id": user_id}
        )
        return result.fetchone()

# mypy verifies that db provides transaction() during static analysis, before the code runs

3. DatabaseInterface Protocol

3.1 Protocol Definition

from typing import Protocol, ContextManager

class DatabaseInterface(Protocol):
    """
    Protocol for database connectivity.
    
    Implementations must provide:
    - Connection management
    - Transaction support
    - Health monitoring
    """
    
    def transaction(self) -> ContextManager[ConnectionInterface]:
        """
        Get transaction context manager.
        
        Usage:
            with db.transaction() as conn:
                conn.execute("SELECT ...")
        
        Returns:
            Context manager that:
            - Acquires connection from pool
            - Commits on success
            - Rolls back on exception
            - Returns connection to pool
        """
        ...
    
    def get_connection(self) -> ConnectionInterface:
        """
        Get raw connection (manual management).
        
        Warning:
            MUST call connection.close() manually.
            Prefer transaction() context manager.
        """
        ...
    
    def health_check(self) -> HealthResult:
        """
        Perform database health check.
        
        Returns:
            HealthResult with status, message, latency
        """
        ...
    
    def shutdown(self) -> None:
        """
        Gracefully shutdown database connections.
        
        Call at application shutdown to:
        - Close all pool connections
        - Release resources
        """
        ...
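
The warning on get_connection() means the caller owns the connection's lifetime. One way to honor it is contextlib.closing from the standard library, which calls close() even when the body raises. The sketch below uses hypothetical stub classes (StubConnection, StubDatabase) in place of a real backend.

```python
from contextlib import closing


class StubConnection:
    """Hypothetical connection that only tracks whether close() ran."""

    def __init__(self) -> None:
        self.closed = False

    def execute(self, query: str, params=None) -> None:
        if query.startswith("BAD"):
            raise RuntimeError("simulated query failure")

    def close(self) -> None:
        self.closed = True


class StubDatabase:
    """Hypothetical database exposing only get_connection()."""

    def __init__(self) -> None:
        self.last = None

    def get_connection(self) -> StubConnection:
        self.last = StubConnection()
        return self.last


db = StubDatabase()

# closing() calls conn.close() on exit, even if the body raises.
try:
    with closing(db.get_connection()) as conn:
        conn.execute("BAD QUERY")
except RuntimeError:
    pass

closed_after_error = db.last.closed
```

Unlike transaction(), this pattern neither commits nor rolls back; it only guarantees that the connection is released.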

3.2 Usage Example

from structum_lab.database import get_database, HealthStatus

# Get database instance
db = get_database()

# Execute query in transaction
with db.transaction() as conn:
    # Insert
    result = conn.execute(
        "INSERT INTO users (username, email) VALUES (:u, :e) RETURNING id",
        {"u": "john_doe", "e": "john@example.com"}
    )
    new_id = result.fetchone()["id"]
    
    # Query
    result = conn.execute(
        "SELECT * FROM users WHERE id = :id",
        {"id": new_id}
    )
    user = result.fetchone()
    
    # Auto-commit on success
    # Auto-rollback on exception

# Check health
health = db.health_check()
if health.status == HealthStatus.HEALTHY:
    print(f"Database OK - latency: {health.latency_ms}ms")

4. ConnectionInterface Protocol

4.1 Protocol Definition

from typing import Protocol, Any, Optional

class ConnectionInterface(Protocol):
    """
    Protocol for active database connection.
    
    Represents a single connection that can execute queries.
    """
    
    def execute(self, query: str, params: Optional[dict] = None) -> ResultProxy:
        """
        Execute SQL query.
        
        Args:
            query: SQL statement (use :param for placeholders)
            params: Dictionary of parameter values
        
        Returns:
            ResultProxy for fetching results
        
        Example:
            result = conn.execute(
                "SELECT * FROM users WHERE id = :id",
                {"id": 123}
            )
            user = result.fetchone()
        """
        ...
    
    def commit(self) -> None:
        """Commit current transaction."""
        ...
    
    def rollback(self) -> None:
        """Rollback current transaction."""
        ...
    
    def close(self) -> None:
        """Close connection and return to pool."""
        ...

4.2 ResultProxy Protocol

class ResultProxy(Protocol):
    """Protocol for query results."""
    
    def fetchone(self) -> Optional[dict]:
        """
        Fetch single row.
        
        Returns:
            Dictionary with column names as keys
            None if no rows
        """
        ...
    
    def fetchall(self) -> list[dict]:
        """
        Fetch all remaining rows.
        
        Returns:
            List of dictionaries
        """
        ...
    
    def fetchmany(self, size: Optional[int] = None) -> list[dict]:
        """
        Fetch batch of rows.
        
        Args:
            size: Number of rows (default: arraysize)
        
        Returns:
            List of dictionaries
        """
        ...
    
    @property
    def rowcount(self) -> int:
        """Number of rows affected by query."""
        ...
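
The fetch methods share a forward-only cursor: each call consumes rows, and fetchall() returns only what remains. A minimal in-memory sketch of that behavior follows. ListResult and its arraysize default are hypothetical, and rowcount here simply reports the number of rows held rather than rows affected.

```python
from typing import Optional


class ListResult:
    """Hypothetical in-memory result with forward-only cursor semantics."""

    def __init__(self, rows: list[dict], arraysize: int = 1) -> None:
        self._rows = rows
        self._pos = 0
        self.arraysize = arraysize

    def fetchone(self) -> Optional[dict]:
        if self._pos >= len(self._rows):
            return None
        row = self._rows[self._pos]
        self._pos += 1
        return row

    def fetchmany(self, size: Optional[int] = None) -> list[dict]:
        count = self.arraysize if size is None else size
        batch = self._rows[self._pos:self._pos + count]
        self._pos += len(batch)
        return batch

    def fetchall(self) -> list[dict]:
        rest = self._rows[self._pos:]
        self._pos = len(self._rows)
        return rest

    @property
    def rowcount(self) -> int:
        return len(self._rows)


result = ListResult([{"id": i} for i in range(1, 6)])
first = result.fetchone()          # consumes row 1
batch = result.fetchmany(size=2)   # consumes rows 2 and 3
rest = result.fetchall()           # consumes rows 4 and 5
exhausted = result.fetchone()      # nothing left, so None
```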

4.3 Usage Examples

Select Query:

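with db.transaction() as conn:
    result = conn.execute("SELECT * FROM users WHERE active = :active", {"active": True})
    
    # Each fetch call consumes rows from the same result,
    # so the calls below return successive, non-overlapping rows.
    
    # Single row (the first row, or None if the result is empty)
    user = result.fetchone()
    if user:
        print(f"User: {user['username']}")
    
    # Batch fetch (up to the next 100 rows)
    batch = result.fetchmany(size=100)
    
    # All remaining rows
    users = result.fetchall()
    for user in users:
        print(user['username'])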

Insert Query:

with db.transaction() as conn:
    result = conn.execute(
        "INSERT INTO users (username, email) VALUES (:u, :e) RETURNING id",
        {"u": "jane", "e": "jane@example.com"}
    )
    new_id = result.fetchone()["id"]
    print(f"Created user with ID: {new_id}")

Update Query:

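from datetime import datetime, timezone

with db.transaction() as conn:
    result = conn.execute(
        "UPDATE users SET last_login = :now WHERE id = :id",
        # Timezone-aware UTC timestamp (datetime.utcnow() is deprecated since Python 3.12)
        {"now": datetime.now(timezone.utc), "id": 123}
    )
    rows_updated = result.rowcount
    print(f"Updated {rows_updated} rows")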

5. TransactionContext Protocol

5.1 Protocol Definition

from typing import Protocol, ContextManager

class TransactionContext(Protocol):
    """Context manager for database transactions."""
    
    def __enter__(self) -> ConnectionInterface:
        """
        Acquire connection and start transaction.
        
        Returns:
            Connection for executing queries
        """
        ...
    
    def __exit__(self, exc_type, exc_val, exc_tb) -> bool:
        """
        Complete transaction and release connection.
        
        Behavior:
            - If exc_type is None: COMMIT
            - If exception occurred: ROLLBACK
            - Always: Return connection to pool
        """
        ...
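
The commit/rollback contract can be demonstrated with the standard library's sqlite3 module. This is a sketch under stated assumptions, not the plugin's implementation: SqliteDatabase is a hypothetical class, it holds a single shared in-memory connection instead of a pool, and it yields a raw sqlite3.Connection rather than a ConnectionInterface.

```python
import sqlite3
from contextlib import contextmanager
from typing import Iterator


class SqliteDatabase:
    """Hypothetical backend: one shared in-memory connection, no pool."""

    def __init__(self) -> None:
        self._conn = sqlite3.connect(":memory:")
        self._conn.execute("CREATE TABLE orders (id INTEGER PRIMARY KEY, item TEXT)")
        self._conn.commit()

    @contextmanager
    def transaction(self) -> Iterator[sqlite3.Connection]:
        try:
            yield self._conn
        except Exception:
            self._conn.rollback()   # exception in the body: undo everything
            raise                   # re-raise so the caller sees the error
        else:
            self._conn.commit()     # clean exit: make the changes durable

    def count_orders(self) -> int:
        return self._conn.execute("SELECT COUNT(*) FROM orders").fetchone()[0]


db = SqliteDatabase()

# Success case: the insert is committed on clean exit.
with db.transaction() as conn:
    conn.execute("INSERT INTO orders (item) VALUES (:item)", {"item": "book"})
after_commit = db.count_orders()

# Failure case: the insert is rolled back when the body raises.
try:
    with db.transaction() as conn:
        conn.execute("INSERT INTO orders (item) VALUES (:item)", {"item": "pen"})
        raise ValueError("Something went wrong!")
except ValueError:
    pass
after_rollback = db.count_orders()
```

Both counts are 1: the first insert persists, and the second leaves no trace because the exception triggered a rollback before it propagated.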

5.2 Transaction Behavior

# Success case
with db.transaction() as conn:
    conn.execute("INSERT INTO orders (...) VALUES (...)")
    conn.execute("INSERT INTO order_items (...) VALUES (...)")
    # Automatic COMMIT on clean exit

# Failure case
try:
    with db.transaction() as conn:
        conn.execute("INSERT INTO orders (...) VALUES (...)")
        raise ValueError("Something went wrong!")
        conn.execute("INSERT INTO order_items (...) VALUES (...)")  # never reached
except ValueError:
    # Automatic ROLLBACK - no partial data
    pass

5.3 Nested Transactions

def create_user_with_profile(username: str, email: str):
    """Transactional function."""
    with db.transaction() as conn:
        # Insert user
        result = conn.execute(
            "INSERT INTO users (username, email) VALUES (:u, :e) RETURNING id",
            {"u": username, "e": email}
        )
        user_id = result.fetchone()["id"]
        
        # Insert profile (same transaction)
        conn.execute(
            "INSERT INTO profiles (user_id, bio) VALUES (:id, :bio)",
            {"id": user_id, "bio": "New user"}
        )
        
        # Both inserts commit atomically
        return user_id

6. Health Check System

6.1 HealthResult Data Structure

from dataclasses import dataclass
from enum import Enum
from typing import Optional

class HealthStatus(Enum):
    """Health check status enumeration."""
    HEALTHY = "healthy"
    DEGRADED = "degraded"
    UNHEALTHY = "unhealthy"

@dataclass
class HealthResult:
    """Database health check result."""
    
    status: HealthStatus              # Overall health status
    message: str                      # Human-readable message
    latency_ms: float                 # Query latency in milliseconds
    details: Optional[dict] = None    # Optional detailed info
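
How a backend fills in these fields is left to the implementation. One plausible approach is to time a cheap probe query and classify the outcome. In the sketch below, run_health_check, the probe callable, and the 100 ms threshold are all hypothetical. HealthStatus and HealthResult are redeclared only so the example is self-contained.

```python
import time
from dataclasses import dataclass
from enum import Enum
from typing import Callable, Optional


class HealthStatus(Enum):
    HEALTHY = "healthy"
    DEGRADED = "degraded"
    UNHEALTHY = "unhealthy"


@dataclass
class HealthResult:
    status: HealthStatus
    message: str
    latency_ms: float
    details: Optional[dict] = None


def run_health_check(
    probe: Callable[[], None],
    degraded_after_ms: float = 100.0,  # hypothetical threshold
) -> HealthResult:
    """Time a probe (for example a SELECT 1) and classify the outcome."""
    start = time.perf_counter()
    try:
        probe()
    except Exception as exc:
        elapsed = (time.perf_counter() - start) * 1000
        return HealthResult(HealthStatus.UNHEALTHY, f"probe failed: {exc}", elapsed)
    elapsed = (time.perf_counter() - start) * 1000
    if elapsed > degraded_after_ms:
        return HealthResult(HealthStatus.DEGRADED, "slow response", elapsed)
    return HealthResult(HealthStatus.HEALTHY, "ok", elapsed)


def failing_probe() -> None:
    raise ConnectionError("connection refused")


ok = run_health_check(lambda: None)
slow = run_health_check(lambda: time.sleep(0.02), degraded_after_ms=5.0)
down = run_health_check(failing_probe)
```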

6.2 Health Check Usage

from structum_lab.database import get_database, HealthStatus

db = get_database()
health = db.health_check()

# Check status
if health.status == HealthStatus.HEALTHY:
    print(f"✓ Database healthy - latency: {health.latency_ms}ms")
elif health.status == HealthStatus.DEGRADED:
    print(f"⚠ Database degraded: {health.message}")
else:
    print(f"✗ Database unhealthy: {health.message}")

# Log metrics
from structum_lab.logging import get_logger
logger = get_logger(__name__)

logger.info(
    "Database health check",
    status=health.status.value,
    latency_ms=health.latency_ms,
    message=health.message
)

6.3 FastAPI Health Endpoint

from fastapi import FastAPI
from structum_lab.database import get_database, HealthStatus

app = FastAPI()

@app.get("/health")
async def health_check():
    """Application health endpoint."""
    db = get_database()
    db_health = db.health_check()
    
    return {
        "status": "healthy" if db_health.status == HealthStatus.HEALTHY else "unhealthy",
        "database": {
            "status": db_health.status.value,
            "latency_ms": db_health.latency_ms,
            "message": db_health.message
        }
    }
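
The endpoint above reports the status in the response body only, so the HTTP status code stays at FastAPI's default of 200. Load balancers and orchestrators usually act on the status code, so a small mapping function may help. The policy below is a hypothetical choice, not something the library prescribes, and HealthStatus is redeclared to keep the sketch self-contained.

```python
from enum import Enum


class HealthStatus(Enum):
    HEALTHY = "healthy"
    DEGRADED = "degraded"
    UNHEALTHY = "unhealthy"


def http_status_for(status: HealthStatus) -> int:
    """Hypothetical policy: only UNHEALTHY takes the instance out of rotation."""
    return 503 if status is HealthStatus.UNHEALTHY else 200


codes = {status.value: http_status_for(status) for status in HealthStatus}
```

In a FastAPI route, the result could be applied by declaring a `response: Response` parameter and assigning `response.status_code` before returning the body.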

7. Available Implementations

7.1 Official Plugins

Plugin             Backend         Databases                                 Features
structum-database  SQLAlchemy 2.0  PostgreSQL, MySQL, SQLite, Oracle, MSSQL  ORM support, Universal
structum-database  psycopg3        PostgreSQL only                           High performance, Async native

Installation:

# Quote the target so shells such as zsh do not expand the brackets

# SQLAlchemy backend
pip install -e "packages/database[sqlalchemy]"

# PostgreSQL backend
pip install -e "packages/database[postgres]"

# Both
pip install -e "packages/database[all]"

Setup:

from structum_lab.database import set_database
from structum_lab.plugins.database import SQLAlchemyDatabase

# Initialize and register
db = SQLAlchemyDatabase.from_config()
set_database(db)

# Now available globally
from structum_lab.database import get_database
db = get_database()

7.2 Factory Pattern

from structum_lab.plugins.database import SQLAlchemyDatabase

# Method 1: Direct configuration
db = SQLAlchemyDatabase(
    url="postgresql://user:pass@localhost/mydb",
    pool_size=10
)

# Method 2: From config provider (recommended)
db = SQLAlchemyDatabase.from_config()
# Reads: config.get("database.url"), config.get("database.pool_size"), etc.

# Method 3: Custom config namespace
primary_db = SQLAlchemyDatabase.from_config("primary_db")
replica_db = SQLAlchemyDatabase.from_config("replica_db")
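
The namespace argument presumably selects which group of configuration keys is read. The following sketch shows one way such a lookup could work against a flat key/value mapping. DatabaseSettings, DEFAULT_POOL_SIZE, and the dictionary-based config are hypothetical and do not reflect the plugin's actual internals.

```python
from dataclasses import dataclass
from typing import Any, Mapping

DEFAULT_POOL_SIZE = 5  # hypothetical default, not taken from the plugin


@dataclass
class DatabaseSettings:
    """Hypothetical settings object built from a flat key/value config."""

    url: str
    pool_size: int

    @classmethod
    def from_config(
        cls, config: Mapping[str, Any], namespace: str = "database"
    ) -> "DatabaseSettings":
        # Keys are looked up as "<namespace>.<option>", e.g. "database.url".
        return cls(
            url=config[f"{namespace}.url"],
            pool_size=int(config.get(f"{namespace}.pool_size", DEFAULT_POOL_SIZE)),
        )


config = {
    "database.url": "sqlite:///app.db",
    "primary_db.url": "postgresql://user:pass@localhost/mydb",
    "primary_db.pool_size": "10",
}

default_settings = DatabaseSettings.from_config(config)
primary_settings = DatabaseSettings.from_config(config, "primary_db")
```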

8. Usage Patterns

8.1 Global Singleton Pattern

from structum_lab.database import get_database, set_database
from structum_lab.plugins.database import SQLAlchemyDatabase

# Setup (once at application startup)
def setup_database():
    db = SQLAlchemyDatabase.from_config()
    set_database(db)

# Usage (anywhere in application)
def get_user(user_id: int):
    db = get_database()
    
    with db.transaction() as conn:
        result = conn.execute(
            "SELECT * FROM users WHERE id = :id",
            {"id": user_id}
        )
        return result.fetchone()

8.2 Dependency Injection Pattern

from structum_lab.database import DatabaseInterface

class UserRepository:
    """Repository with injected database dependency."""
    
    def __init__(self, db: DatabaseInterface):
        self.db = db
    
    def find_by_id(self, user_id: int) -> dict | None:
        with self.db.transaction() as conn:
            result = conn.execute(
                "SELECT * FROM users WHERE id = :id",
                {"id": user_id}
            )
            return result.fetchone()

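# With DI container
# Assumption: `providers` comes from the dependency-injector package,
# on which StructumContainer is presumed to build.
from dependency_injector import providers
from structum_lab.plugins.database import SQLAlchemyDatabase
from structum_lab.plugins.di import StructumContainer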

class AppContainer(StructumContainer):
    database = providers.Singleton(SQLAlchemyDatabase.from_config)
    
    user_repo = providers.Factory(
        UserRepository,
        db=database
    )

8.3 FastAPI Integration

from fastapi import Depends, FastAPI, HTTPException
from structum_lab.database import DatabaseInterface, get_database, set_database
from structum_lab.plugins.database import SQLAlchemyDatabase

app = FastAPI()

# Startup/shutdown
@app.on_event("startup")
async def startup():
    db = SQLAlchemyDatabase.from_config()
    set_database(db)

@app.on_event("shutdown")
async def shutdown():
    db = get_database()
    db.shutdown()

# Dependency
def get_db() -> DatabaseInterface:
    return get_database()

# Route
@app.get("/users/{user_id}")
async def get_user(user_id: int, db: DatabaseInterface = Depends(get_db)):
    with db.transaction() as conn:
        result = conn.execute(
            "SELECT * FROM users WHERE id = :id",
            {"id": user_id}
        )
        user = result.fetchone()
        
        if not user:
            raise HTTPException(status_code=404)
        
        return user

9. API Reference

9.1 Global Functions

def get_database() -> DatabaseInterface:
    """
    Get global database singleton.
    
    Raises:
        RuntimeError if set_database() not called
    """
    ...

def set_database(db: DatabaseInterface) -> None:
    """
    Set global database instance (call once at startup).
    
    Args:
        db: Database implementation instance
    """
    ...
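
The documented behavior (a single global instance, with RuntimeError when it has not been set) is consistent with a module-level registry. The sketch below is one possible shape for it, not the library's source: the `_database` variable and the error message are assumptions.

```python
from typing import Any, Optional

_database: Optional[Any] = None  # module-level slot holding the singleton


def set_database(db: Any) -> None:
    """Register the instance that get_database() will return."""
    global _database
    _database = db


def get_database() -> Any:
    """Return the registered instance, or fail loudly if none was set."""
    if _database is None:
        raise RuntimeError("Database not configured; call set_database() first")
    return _database
```

Failing with an explicit error, instead of returning None, surfaces a missing startup step at the first call site rather than as an AttributeError later on.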

9.2 Protocols

class DatabaseInterface(Protocol):
    def transaction(self) -> ContextManager[ConnectionInterface]:
        ...
    
    def get_connection(self) -> ConnectionInterface:
        ...
    
    def health_check(self) -> HealthResult:
        ...
    
    def shutdown(self) -> None:
        ...

class ConnectionInterface(Protocol):
    def execute(self, query: str, params: Optional[dict] = None) -> ResultProxy:
        ...
    
    def commit(self) -> None:
        ...
    
    def rollback(self) -> None:
        ...
    
    def close(self) -> None:
        ...

class ResultProxy(Protocol):
    def fetchone(self) -> Optional[dict]:
        ...
    
    def fetchall(self) -> list[dict]:
        ...
    
    def fetchmany(self, size: Optional[int] = None) -> list[dict]:
        ...
    
    @property
    def rowcount(self) -> int:
        ...

9.3 Data Structures

class HealthStatus(Enum):
    HEALTHY = "healthy"
    DEGRADED = "degraded"
    UNHEALTHY = "unhealthy"

@dataclass
class HealthResult:
    status: HealthStatus
    message: str
    latency_ms: float
    details: Optional[dict] = None

See Also