Source code for structum_lab.plugins.database.sqlalchemy

# SQLAlchemy Database Implementation
# SPDX-License-Identifier: Apache-2.0

"""
SQLAlchemy-based database implementation for Structum Lab.

This module provides a production-ready database backend using SQLAlchemy 2.0+.
It supports PostgreSQL, MySQL, SQLite, and other databases supported by SQLAlchemy.

Example:
    >>> from structum_lab.plugins.database import SQLAlchemyDatabase
    >>>
    >>> db = SQLAlchemyDatabase(url="postgresql://user:pass@localhost/mydb")
    >>> with db.transaction() as conn:
    ...     conn.execute("SELECT * FROM users")
    ...     users = conn.fetchall()
"""

from __future__ import annotations

import time
from contextlib import contextmanager
from typing import TYPE_CHECKING, Any, Iterator

from structum_lab.database.interfaces import (
    ConnectionInterface,
    HealthCheckResult,
    HealthStatus,
)

from .base import BaseDatabase

if TYPE_CHECKING:
    from sqlalchemy import Engine
    from sqlalchemy.orm import Session


[docs] class SQLAlchemyConnection: """SQLAlchemy connection wrapper implementing ConnectionInterface."""
[docs] def __init__(self, session: Session) -> None: self._session = session self._result: Any = None
[docs] def execute( self, query: str, params: dict[str, Any] | tuple[Any, ...] | None = None, ) -> Any: """Execute SQL query.""" from sqlalchemy import text stmt = text(query) if params: if isinstance(params, dict): self._result = self._session.execute(stmt, params) else: self._result = self._session.execute( stmt, {"p" + str(i): v for i, v in enumerate(params)} ) else: self._result = self._session.execute(stmt) return self._result
[docs] def fetchone(self) -> dict[str, Any] | None: """Fetch one row as dict.""" if self._result is None: return None row = self._result.fetchone() if row is None: return None return dict(row._mapping)
[docs] def fetchall(self) -> list[dict[str, Any]]: """Fetch all rows as list of dicts.""" if self._result is None: return [] return [dict(row._mapping) for row in self._result.fetchall()]
[docs] def fetchmany(self, size: int) -> list[dict[str, Any]]: """Fetch up to size rows.""" if self._result is None: return [] return [dict(row._mapping) for row in self._result.fetchmany(size)]
[docs] class SQLAlchemyDatabase(BaseDatabase): """SQLAlchemy-based database implementation. Production-ready database backend with: - Connection pooling - Transaction management - Health checks with latency measurement - Support for PostgreSQL, MySQL, SQLite, etc. Example: >>> db = SQLAlchemyDatabase(url="postgresql://...") >>> db.connect() >>> >>> with db.transaction() as conn: ... conn.execute("INSERT INTO users (name) VALUES (:name)", {"name": "John"}) >>> >>> db.close() """
[docs] def __init__( self, url: str, *, pool_size: int = 5, pool_timeout: int = 30, echo: bool = False, pool_pre_ping: bool = True, **kwargs: Any, ) -> None: """Initialize SQLAlchemy database. Args: url: SQLAlchemy connection URL pool_size: Connection pool size pool_timeout: Pool acquisition timeout echo: Log SQL queries pool_pre_ping: Test connections before use (recommended) **kwargs: Additional SQLAlchemy engine options """ super().__init__( url=url, pool_size=pool_size, pool_timeout=pool_timeout, echo=echo, **kwargs, ) self._pool_pre_ping = pool_pre_ping self._engine: Engine | None = None
[docs] def connect(self) -> None: """Create SQLAlchemy engine and connection pool.""" if self._connected: return from sqlalchemy import create_engine self._engine = create_engine( self._url, pool_size=self._pool_size, pool_timeout=self._pool_timeout, pool_pre_ping=self._pool_pre_ping, echo=self._echo, **self._options, ) self._connected = True
[docs] def close(self) -> None: """Dispose of connection pool.""" if self._engine is not None: self._engine.dispose() self._engine = None self._connected = False
[docs] def get_connection(self) -> ConnectionInterface: """Get a connection from the pool. Note: Prefer using `transaction()` context manager for proper cleanup. """ if not self._connected: self.connect() from sqlalchemy.orm import Session assert self._engine is not None session = Session(self._engine) return SQLAlchemyConnection(session)
[docs] @contextmanager def transaction(self) -> Iterator[ConnectionInterface]: """Execute operations within a transaction. Automatically commits on success, rolls back on exception. Yields: Connection with active transaction """ if not self._connected: self.connect() from sqlalchemy.orm import Session assert self._engine is not None session = Session(self._engine) conn = SQLAlchemyConnection(session) try: yield conn session.commit() except Exception: session.rollback() raise finally: session.close()
[docs] def health_check(self) -> HealthCheckResult: """Check database connectivity. Returns: HealthCheckResult with status and latency """ if not self._connected: return HealthCheckResult( status=HealthStatus.UNHEALTHY, message="Database not connected", ) try: start = time.perf_counter() with self.transaction() as conn: conn.execute("SELECT 1") latency_ms = (time.perf_counter() - start) * 1000 return HealthCheckResult( status=HealthStatus.HEALTHY, message="Database connection OK", latency_ms=round(latency_ms, 2), ) except Exception as e: return HealthCheckResult( status=HealthStatus.UNHEALTHY, message=f"Database error: {e}", )