Source code for structum_lab.plugins.database.postgres

# psycopg3 Direct PostgreSQL Implementation
# SPDX-License-Identifier: Apache-2.0

"""
Direct PostgreSQL database implementation using psycopg3.

This module provides a high-performance PostgreSQL backend using psycopg3
(the modern successor to psycopg2). It offers better async support and
native connection pooling.

Example:
    >>> from structum_lab.plugins.database import PostgresDatabase
    >>>
    >>> db = PostgresDatabase(url="postgresql://user:pass@localhost/mydb")
    >>> with db.transaction() as conn:
    ...     conn.execute("SELECT * FROM users")
    ...     users = conn.fetchall()
"""

from __future__ import annotations

import time
from contextlib import contextmanager
from typing import TYPE_CHECKING, Any, Iterator
from urllib.parse import urlparse

from structum_lab.database.interfaces import (
    ConnectionInterface,
    HealthCheckResult,
    HealthStatus,
)

from .base import BaseDatabase

if TYPE_CHECKING:
    from psycopg_pool import ConnectionPool


[docs] class PostgresConnection: """psycopg3 connection wrapper implementing ConnectionInterface."""
[docs] def __init__(self, conn: Any) -> None: self._conn = conn self._cursor: Any = None
[docs] def execute( self, query: str, params: dict[str, Any] | tuple[Any, ...] | None = None, ) -> Any: """Execute SQL query.""" self._cursor = self._conn.cursor() if params: if isinstance(params, dict): # Convert named params to psycopg format self._cursor.execute(query, params) else: self._cursor.execute(query, params) else: self._cursor.execute(query) return self._cursor
[docs] def fetchone(self) -> dict[str, Any] | None: """Fetch one row as dict.""" if self._cursor is None: return None row = self._cursor.fetchone() if row is None: return None # Get column names from cursor description columns = [desc[0] for desc in self._cursor.description] return dict(zip(columns, row))
[docs] def fetchall(self) -> list[dict[str, Any]]: """Fetch all rows as list of dicts.""" if self._cursor is None: return [] rows = self._cursor.fetchall() columns = [desc[0] for desc in self._cursor.description] return [dict(zip(columns, row)) for row in rows]
[docs] def fetchmany(self, size: int) -> list[dict[str, Any]]: """Fetch up to size rows.""" if self._cursor is None: return [] rows = self._cursor.fetchmany(size) columns = [desc[0] for desc in self._cursor.description] return [dict(zip(columns, row)) for row in rows]
[docs] class PostgresDatabase(BaseDatabase): """Direct PostgreSQL database implementation using psycopg3. High-performance PostgreSQL backend with: - Native connection pooling (psycopg_pool) - Better async support than psycopg2 - Modern Python type support Example: >>> db = PostgresDatabase(url="postgresql://...") >>> db.connect() >>> >>> with db.transaction() as conn: ... conn.execute("INSERT INTO users (name) VALUES (%(name)s)", {"name": "John"}) >>> >>> db.close() Note: This backend only supports PostgreSQL. For other databases, use SQLAlchemyDatabase instead. """
[docs] def __init__( self, url: str, *, pool_size: int = 5, pool_timeout: int = 30, echo: bool = False, min_size: int = 1, **kwargs: Any, ) -> None: """Initialize PostgreSQL database. Args: url: PostgreSQL connection URL pool_size: Maximum pool size pool_timeout: Pool acquisition timeout echo: Log SQL queries (not directly supported, use logging) min_size: Minimum pool size **kwargs: Additional psycopg options """ super().__init__( url=url, pool_size=pool_size, pool_timeout=pool_timeout, echo=echo, **kwargs, ) self._min_size = min_size self._pool: ConnectionPool | None = None # Validate URL is PostgreSQL parsed = urlparse(url) if parsed.scheme not in ("postgresql", "postgres"): raise ValueError( f"PostgresDatabase only supports PostgreSQL URLs, got: {parsed.scheme}" )
[docs] def connect(self) -> None: """Create connection pool.""" if self._connected: return from psycopg_pool import ConnectionPool self._pool = ConnectionPool( conninfo=self._url, min_size=self._min_size, max_size=self._pool_size, timeout=self._pool_timeout, ) self._connected = True
[docs] def close(self) -> None: """Close connection pool.""" if self._pool is not None: self._pool.close() self._pool = None self._connected = False
[docs] def get_connection(self) -> ConnectionInterface: """Get a connection from the pool. Note: Prefer using `transaction()` context manager for proper cleanup. """ if not self._connected: self.connect() assert self._pool is not None conn = self._pool.getconn() return PostgresConnection(conn)
[docs] @contextmanager def transaction(self) -> Iterator[ConnectionInterface]: """Execute operations within a transaction. Automatically commits on success, rolls back on exception. Yields: Connection with active transaction """ if not self._connected: self.connect() assert self._pool is not None with self._pool.connection() as conn: pg_conn = PostgresConnection(conn) try: yield pg_conn conn.commit() except Exception: conn.rollback() raise
[docs] def health_check(self) -> HealthCheckResult: """Check database connectivity. Returns: HealthCheckResult with status and latency """ if not self._connected: return HealthCheckResult( status=HealthStatus.UNHEALTHY, message="Database not connected", ) try: start = time.perf_counter() with self.transaction() as conn: conn.execute("SELECT 1") latency_ms = (time.perf_counter() - start) * 1000 return HealthCheckResult( status=HealthStatus.HEALTHY, message="PostgreSQL connection OK", latency_ms=round(latency_ms, 2), ) except Exception as e: return HealthCheckResult( status=HealthStatus.UNHEALTHY, message=f"PostgreSQL error: {e}", )