# SQLAlchemy Database Implementation
# SPDX-License-Identifier: Apache-2.0
"""
SQLAlchemy-based database implementation for Structum Lab.

This module provides a production-ready database backend using SQLAlchemy 2.0+.
It supports PostgreSQL, MySQL, SQLite, and other databases supported by SQLAlchemy.

Example:
    >>> from structum_lab.plugins.database import SQLAlchemyDatabase
    >>>
    >>> db = SQLAlchemyDatabase(url="postgresql://user:pass@localhost/mydb")
    >>> with db.transaction() as conn:
    ...     conn.execute("SELECT * FROM users")
    ...     users = conn.fetchall()
"""
from __future__ import annotations
import time
from contextlib import contextmanager
from typing import TYPE_CHECKING, Any, Iterator
from structum_lab.database.interfaces import (
ConnectionInterface,
HealthCheckResult,
HealthStatus,
)
from .base import BaseDatabase
if TYPE_CHECKING:
from sqlalchemy import Engine
from sqlalchemy.orm import Session
class SQLAlchemyConnection:
    """SQLAlchemy connection wrapper implementing ConnectionInterface.

    Wraps a SQLAlchemy ``Session`` and keeps the result of the most recent
    :meth:`execute` call so that rows can be read back through
    :meth:`fetchone`, :meth:`fetchall` and :meth:`fetchmany`.

    The wrapper also exposes :meth:`commit`, :meth:`rollback` and
    :meth:`close` so that a caller holding only this object can finish its
    unit of work and release the underlying session without reaching into
    the private ``_session`` attribute.
    """

    def __init__(self, session: Session) -> None:
        """Wrap *session*; no statement has been executed yet."""
        self._session = session
        # Result of the last execute(); None until a statement has been run.
        self._result: Any = None

    def execute(
        self,
        query: str,
        params: dict[str, Any] | tuple[Any, ...] | None = None,
    ) -> Any:
        """Execute SQL query.

        Args:
            query: Raw SQL text; it is wrapped with ``sqlalchemy.text``.
            params: Bind parameters. A dict is passed through unchanged.
                Any other non-empty sequence is converted to named
                parameters ``p0``, ``p1``, ... in positional order, so the
                query has to reference them as ``:p0``, ``:p1``, ...

        Returns:
            The object returned by the session's ``execute``; it is also
            retained for the ``fetch*`` methods.
        """
        # Imported lazily so the module can be imported without SQLAlchemy.
        from sqlalchemy import text

        stmt = text(query)
        if not params:
            # None, {} and () all mean "no bind parameters".
            self._result = self._session.execute(stmt)
        else:
            if isinstance(params, dict):
                bind = params
            else:
                bind = {f"p{i}": value for i, value in enumerate(params)}
            self._result = self._session.execute(stmt, bind)
        return self._result

    def fetchone(self) -> dict[str, Any] | None:
        """Fetch one row as dict.

        Returns:
            The next row of the last result as a dict, or ``None`` when
            nothing has been executed or the result has no more rows.
        """
        if self._result is None:
            return None
        row = self._result.fetchone()
        return None if row is None else dict(row._mapping)

    def fetchall(self) -> list[dict[str, Any]]:
        """Fetch all rows as list of dicts.

        Returns:
            Every remaining row of the last result, or an empty list when
            nothing has been executed.
        """
        if self._result is None:
            return []
        return [dict(row._mapping) for row in self._result.fetchall()]

    def fetchmany(self, size: int) -> list[dict[str, Any]]:
        """Fetch up to size rows.

        Args:
            size: Maximum number of rows to request from the last result.

        Returns:
            The rows as dicts, or an empty list when nothing has been
            executed.
        """
        if self._result is None:
            return []
        return [dict(row._mapping) for row in self._result.fetchmany(size)]

    def commit(self) -> None:
        """Commit the wrapped session."""
        self._session.commit()

    def rollback(self) -> None:
        """Roll back the wrapped session."""
        self._session.rollback()

    def close(self) -> None:
        """Close the wrapped session and forget the last result.

        After closing, the ``fetch*`` methods behave as if nothing had been
        executed.
        """
        self._result = None
        self._session.close()
class SQLAlchemyDatabase(BaseDatabase):
    """SQLAlchemy-based database implementation.

    Production-ready database backend with:

    - Connection pooling
    - Transaction management
    - Health checks with latency measurement
    - Support for PostgreSQL, MySQL, SQLite, etc.

    Example:
        >>> db = SQLAlchemyDatabase(url="postgresql://...")
        >>> db.connect()
        >>>
        >>> with db.transaction() as conn:
        ...     conn.execute("INSERT INTO users (name) VALUES (:name)", {"name": "John"})
        >>>
        >>> db.close()
    """

    def __init__(
        self,
        url: str,
        *,
        pool_size: int = 5,
        pool_timeout: int = 30,
        echo: bool = False,
        pool_pre_ping: bool = True,
        **kwargs: Any,
    ) -> None:
        """Initialize SQLAlchemy database.

        The engine itself is not created here; that happens in
        :meth:`connect`.

        Args:
            url: SQLAlchemy connection URL
            pool_size: Connection pool size
            pool_timeout: Pool acquisition timeout
            echo: Log SQL queries
            pool_pre_ping: Test connections before use (recommended)
            **kwargs: Additional SQLAlchemy engine options
        """
        super().__init__(
            url=url,
            pool_size=pool_size,
            pool_timeout=pool_timeout,
            echo=echo,
            **kwargs,
        )
        # No engine exists until connect() is called.
        self._engine: Engine | None = None
        self._pool_pre_ping = pool_pre_ping

    def connect(self) -> None:
        """Create SQLAlchemy engine and connection pool.

        Does nothing when the instance is already marked as connected.

        NOTE(review): the pool arguments are always forwarded to
        ``create_engine``; pool implementations that do not accept them
        (presumably e.g. the one chosen for in-memory SQLite URLs) may
        reject the call -- confirm against the targeted backends.
        """
        if not self._connected:
            # Imported lazily so SQLAlchemy is only required once used.
            from sqlalchemy import create_engine

            self._engine = create_engine(
                self._url,
                pool_size=self._pool_size,
                pool_timeout=self._pool_timeout,
                pool_pre_ping=self._pool_pre_ping,
                echo=self._echo,
                **self._options,
            )
            self._connected = True

    def close(self) -> None:
        """Dispose of connection pool.

        Safe to call when no engine exists; the instance is marked as
        disconnected afterwards.
        """
        engine = self._engine
        if engine is not None:
            engine.dispose()
            self._engine = None
        self._connected = False

    def get_connection(self) -> ConnectionInterface:
        """Get a connection from the pool.

        Connects first if necessary, then wraps a new session bound to the
        engine.

        Note:
            Prefer using `transaction()` context manager for proper cleanup.
        """
        if not self._connected:
            self.connect()

        from sqlalchemy.orm import Session

        assert self._engine is not None
        return SQLAlchemyConnection(Session(self._engine))

    @contextmanager
    def transaction(self) -> Iterator[ConnectionInterface]:
        """Execute operations within a transaction.

        Automatically commits on success, rolls back on exception. The
        session is closed in every case.

        Yields:
            Connection with active transaction
        """
        if not self._connected:
            self.connect()

        from sqlalchemy.orm import Session

        assert self._engine is not None
        db_session = Session(self._engine)
        wrapper = SQLAlchemyConnection(db_session)
        try:
            yield wrapper
            # Reached only when the with-body finished without raising.
            db_session.commit()
        except Exception:
            db_session.rollback()
            raise
        finally:
            db_session.close()

    def health_check(self) -> HealthCheckResult:
        """Check database connectivity.

        Runs ``SELECT 1`` inside a transaction and times it. An instance
        that is not connected is reported as unhealthy without attempting
        to connect.

        Returns:
            HealthCheckResult with status and latency
        """
        if not self._connected:
            return HealthCheckResult(
                status=HealthStatus.UNHEALTHY,
                message="Database not connected",
            )

        try:
            started = time.perf_counter()
            with self.transaction() as probe:
                probe.execute("SELECT 1")
            # perf_counter() is in seconds; convert to milliseconds.
            elapsed_ms = (time.perf_counter() - started) * 1000
            return HealthCheckResult(
                status=HealthStatus.HEALTHY,
                message="Database connection OK",
                latency_ms=round(elapsed_ms, 2),
            )
        except Exception as exc:
            # Any failure is reported as an unhealthy result instead of raising.
            return HealthCheckResult(
                status=HealthStatus.UNHEALTHY,
                message=f"Database error: {exc}",
            )