Database Plugin API Reference

Database connectivity and ORM integration plugin for Structum Lab.

Database Plugin Package

Database plugin for Structum Lab.

Provides production-ready database connectivity with support for: - SQLAlchemy 2.0+ (PostgreSQL, MySQL, SQLite, etc.) - psycopg3 (direct PostgreSQL, high performance)

Quick Start:
>>> from structum_lab.plugins.database import SQLAlchemyDatabase
>>>
>>> # Direct configuration
>>> db = SQLAlchemyDatabase(url="postgresql://user:pass@localhost/mydb")
>>>
>>> # Or using config provider (Factory Pattern)
>>> db = SQLAlchemyDatabase.from_config()
>>>
>>> with db.transaction() as conn:
...     conn.execute("SELECT * FROM users")
...     users = conn.fetchall()
Installation:

pip install structum-database[sqlalchemy] # SQLAlchemy backend pip install structum-database[postgres] # psycopg3 backend pip install structum-database[all] # Both backends

class structum_lab.plugins.database.DatabaseInterface(*args, **kwargs)[source]

Bases: Protocol

Protocol for database managers in Structum Lab.

This is the main entry point for all database operations. Implementations must provide connection pooling, transaction management, and health monitoring.

The protocol abstracts away database-specific details, allowing applications to work with different backends (PostgreSQL, MySQL, SQLite) through a unified interface.

Implementations:

Example

Basic usage with configuration::

from structum_lab.plugins.database import SQLAlchemyDatabase

# Initialize from config db = SQLAlchemyDatabase.from_config()

# Or with explicit URL db = SQLAlchemyDatabase(url=”postgresql://user:pass@localhost/mydb”)

# Use transaction context manager (recommended) with db.transaction() as conn:

conn.execute(

“INSERT INTO users (name, email) VALUES (:name, :email)”, {“name”: “John”, “email”: “john@example.com”}

)

user_id = conn.execute(“SELECT last_insert_id()”).fetchone()[“id”]

conn.execute(

“INSERT INTO profiles (user_id, bio) VALUES (:uid, :bio)”, {“uid”: user_id, “bio”: “Software engineer”}

) # Commits automatically on success

# Check health health = db.health_check() if health.status != HealthStatus.HEALTHY:

log.warning(“Database issues detected”, result=health)

Note

Always use the transaction() context manager for database operations. It ensures proper connection pooling, automatic commit/rollback, and resource cleanup.

See also

ConnectionInterface: Connection protocol TransactionInterface: Transaction protocol HealthCheckResult: Health check result data class

__init__(*args, **kwargs)
close() None[source]

Close all connections in the pool and release resources.

Should be called during application shutdown to ensure clean termination. Any ongoing transactions should be completed before closing.

Raises:

RuntimeError – If called while transactions are active (implementation-specific).

Example

Application shutdown:

import atexit

db = SQLAlchemyDatabase.from_config ()
atexit.register(db.close)  # Ensure cleanup on exit

# Or in FastAPI lifespan
@asynccontextmanager
async def lifespan(app: FastAPI):
    db.connect()
    yield
    db.close()  # Clean shutdown

Warning

After calling close(), the database instance should not be reused. Create a new instance for additional operations.

See also

connect(): Initialize connection pool

connect() None[source]

Establish the database connection pool.

Creates and initializes the connection pool. Called automatically on first database operation if not already connected.

Raises:

Example

Explicit connection during startup:

db = SQLAlchemyDatabase.from_config()

try:
    db.connect()
    log.info("Database pool created", url=db.url)
except ConnectionError as e:
    log.critical("Cannot connect to database", exc_info=True)
    sys.exit(1)

Note

Usually not needed - the database connects automatically on first use. Explicit connection is useful for fail-fast behavior during startup.

See also

close(): Shutdown connection pool

get_connection() ConnectionInterface[source]

Acquire a connection from the pool.

Returns a connection that must be explicitly returned to the pool after use. Prefer using transaction() instead.

Returns:

A database connection from the pool.

Return type:

ConnectionInterface

Raises:
  • PoolExhaustedError – If no connections are available and pool is at maximum.

  • ConnectionError – If pool is not initialized.

Example

Manual connection management (not recommended):

conn = db.get_connection()
try:
    result = conn.execute("SELECT * FROM users")
    users = result.fetchall()
finally:
    # Must return connection to pool manually
    conn.close()

Warning

Manual connection management is error-prone. Always prefer transaction() context manager which handles connections automatically.

See also

transaction(): Recommended connection API

health_check() HealthCheckResult[source]

Check database connectivity and health status.

Performs a simple query to verify the database is responsive and measures latency. Useful for readiness probes and monitoring.

Returns:

Health status with latency and diagnostic details.

Return type:

HealthCheckResult

Example

Health check endpoint:

@app.get("/health/database")
def database_health():
    result = db.health_check()

    if result.status == HealthStatus.UNHEALTHY:
        raise HTTPException(503, detail=result.message)

    return {
        "status": result.status.value,
        "latency_ms": result.latency_ms,
        "message": result.message
    }

Prometheus metrics:

result = db.health_check()

# Record health status
db_health_gauge.labels(database="main").set(
    1 if result.status == HealthStatus.HEALTHY else 0
)

# Record latency
if result.latency_ms:
    db_latency_histogram.observe(result.latency_ms)

Note

Health checks execute a lightweight query (usually SELECT 1). They should complete quickly (<100ms typically).

See also

HealthCheckResult: Result data class HealthStatus: Health status enumeration

property is_connected : bool

Check if the database connection pool is active.

Returns:

True if connection pool is established, False otherwise.

Return type:

bool

Example

Conditional connection:

if not db.is_connected:
    db.connect()
    log.info("Database connection established")

Note

Most operations call connect() automatically if not connected. Explicit checking is mainly useful for health checks and diagnostics.

transaction() Iterator[ConnectionInterface][source]

Context manager for database transactions.

Provides automatic transaction management: commits on success, rolls back on exception. This is the recommended way to perform database operations.

Yields:

ConnectionInterface – Database connection with active transaction.

Raises:
  • DatabaseError – If transaction fails to start.

  • ConnectionError – If pool is exhausted or not connected.

Example

Recommended usage pattern:

# Single transaction
with db.transaction() as conn:
    conn.execute(
        "UPDATE accounts SET balance = balance - :amount WHERE id = :id",
        {"amount": 100, "id": 1}
    )
    conn.execute(
        "UPDATE accounts SET balance = balance + :amount WHERE id = :id",
        {"amount": 100, "id": 2}
    )
    # Commits automatically if no exception

# Exception triggers rollback
try:
    with db.transaction() as conn:
        conn.execute("DELETE FROM important_data")
        raise ValueError("Validation failed")
except ValueError:
    pass  # Transaction automatically rolled back

Note

Transactions are isolated - changes are not visible to other connections until commit. Isolation level depends on database implementation (usually READ COMMITTED).

See also

TransactionInterface: Transaction protocol ConnectionInterface: Connection protocol

property url : str

Get the database connection URL (sanitized).

Returns:

Database URL with password redacted/masked for security.

Example: "postgresql://user:***@localhost:5432/mydb"

Return type:

str

Example

Logging database configuration:

log.info("Connected to database", url=db.url)
# Logs: postgresql://user:***@localhost/mydb

Note

Passwords are automatically redacted to prevent accidental logging of credentials.

class structum_lab.plugins.database.ConnectionInterface(*args, **kwargs)[source]

Bases: Protocol

Protocol for database connections in Structum Lab.

A connection represents an active link to the database that can execute queries and retrieve results. Connections are typically obtained from a connection pool and should be returned after use.

Implementations:

Example

Using a connection within a transaction:

with db.transaction() as conn:
    # Execute query with named parameters
    conn.execute(
        "INSERT INTO users (name, email) VALUES (:name, :email)",
        {"name": "John", "email": "john@example.com"}
    )

    # Fetch results
    conn.execute("SELECT * FROM users WHERE id > :id", {"id": 100})
    users = conn.fetchall()
    for user in users:
        print(f"{user['name']} - {user['email']}")

Note

Connections are not thread-safe. Use one connection per thread or protect access with locks.

See also

DatabaseInterface: Database manager providing connections DatabaseInterface.transaction(): Recommended way to get connections

__init__(*args, **kwargs)
execute(query: str, params: dict[str, Any] | tuple[Any, ...] | None = None) Any[source]

Execute a SQL query with optional parameters.

Supports both named parameters (dict) and positional parameters (tuple). Use :name syntax for named parameters, ? for positional.

Parameters:
query : str

SQL query string. Use :param_name for named parameters or ? for positional parameters.

params : dict[str, Any] | tuple[Any, ...] | None

Query parameters. Use dict for named parameters, tuple for positional. Defaults to None.

Returns:

Result object (implementation-specific). Use fetch methods

to retrieve rows.

Return type:

Any

Raises:
  • DatabaseError – If query execution fails.

  • ParameterError – If parameters don’t match query placeholders.

Example

Different parameter styles:

# Named parameters (recommended)
conn.execute(
    "SELECT * FROM users WHERE age > :min_age AND city = :city",
    {"min_age": 18, "city": "NYC"}
)

# Positional parameters
conn.execute(
    "SELECT * FROM users WHERE age > ? AND city = ?",
    (18, "NYC")
)

# No parameters
conn.execute("SELECT COUNT(*) FROM users")

Warning

Always use parameterized queries. Never use string interpolation for user input (SQL injection risk).

See also

fetchone(): Retrieve single row fetchall(): Retrieve all rows

fetchall() list[dict[str, Any]][source]

Fetch all remaining rows from the last executed query.

Returns:

List of rows as dictionaries. Empty list

if no rows or cursor exhausted.

Return type:

list[dict[str, Any]]

Example

Fetching multiple rows:

conn.execute("SELECT * FROM users WHERE active = :active", {"active": True})
users = conn.fetchall()

for user in users:
    print(f"{user['id']}: {user['name']}")

print(f"Total active users: {len(users)}")

Warning

For large result sets, consider using fetchmany() to avoid loading all rows into memory at once.

See also

fetchone(): Fetch single row fetchmany(): Fetch rows in batches

fetchmany(size: int) list[dict[str, Any]][source]

Fetch up to size rows from the last executed query.

Useful for processing large result sets in batches to manage memory usage.

Parameters:
size : int

Maximum number of rows to fetch. Must be > 0.

Returns:

List of rows (up to size). May return fewer

rows if result set is exhausted. Empty list if no rows remain.

Return type:

list[dict[str, Any]]

Example

Batch processing large result set:

conn.execute("SELECT * FROM large_table")

batch_size = 100
while True:
    batch = conn.fetchmany(batch_size)
    if not batch:
        break

    process_batch(batch)
    print(f"Processed {len(batch)} rows")

Note

Efficient for iterating large datasets without loading everything into memory.

See also

fetchone(): Fetch single row fetchall(): Fetch all rows

fetchone() dict[str, Any] | None[source]

Fetch the next row from the last executed query.

Returns:

Row as dictionary with column names as keys,

or None if no more rows available.

Return type:

dict[str, Any] | None

Example

Fetching single row:

conn.execute("SELECT * FROM users WHERE id = :id", {"id": 1})
user = conn.fetchone()

if user:
    print(f"Found user: {user['name']}")
else:
    print("User not found")

Note

Call after execute(). Returns None when cursor exhausted.

See also

fetchall(): Fetch all remaining rows fetchmany(): Fetch specific number of rows

class structum_lab.plugins.database.TransactionInterface(*args, **kwargs)[source]

Bases: Protocol

Protocol for database transactions providing ACID guarantees.

Transactions group multiple database operations into a single atomic unit. Either all operations succeed (commit) or all fail (rollback).

Example

Manual transaction management:

tx = db.begin_transaction()
try:
    conn.execute("UPDATE accounts SET balance = balance - 100 WHERE id = 1")
    conn.execute("UPDATE accounts SET balance = balance + 100 WHERE id = 2")
    tx.commit()
except Exception:
    tx.rollback()
    raise

Preferred context manager approach:

# Automatic commit/rollback
with db.transaction() as conn:
    conn.execute("UPDATE accounts SET balance = balance - 100 WHERE id = 1")
    conn.execute("UPDATE accounts SET balance = balance + 100 WHERE id = 2")
    # Commits automatically if no exception

Note

Most applications should use DatabaseInterface.transaction() context manager instead of managing transactions manually.

See also

DatabaseInterface.transaction(): Recommended transaction API

__init__(*args, **kwargs)
commit() None[source]

Commit the current transaction.

Makes all changes since transaction start permanent in the database. After commit, the transaction is no longer active.

Raises:
  • DatabaseError – If commit fails (e.g., constraint violation).

  • TransactionError – If transaction is not active.

Example

Manual commit:

tx = db.begin_transaction()
try:
    conn.execute("INSERT INTO logs VALUES (:msg)", {"msg": "test"})
    tx.commit()
    log.info("Transaction committed successfully")
except DatabaseError as e:
    tx.rollback()
    log.error("Commit failed", exc_info=True)

Warning

After commit, the transaction cannot be reused. Start a new transaction for additional operations.

See also

rollback(): Abort transaction

property is_active : bool

Check if the transaction is still active.

Returns:

True if transaction can accept operations, False if

already committed or rolled back.

Return type:

bool

Example

Checking transaction state:

tx = db.begin_transaction()
assert tx.is_active  # True

tx.commit()
assert not tx.is_active  # False

# This would raise TransactionError
# tx.execute("SELECT 1")

Note

Useful for conditional logic and error handling.

rollback() None[source]

Rollback the current transaction.

Discards all changes made since transaction start. The database state returns to what it was before the transaction began.

Raises:

TransactionError – If transaction is not active.

Example

Manual rollback on error:

tx = db.begin_transaction()
try:
    conn.execute("DELETE FROM important_data")
    # Validation fails
    if not validate_deletion():
        tx.rollback()
        log.warning("Deletion rolled back - validation failed")
        return
    tx.commit()
except Exception:
    tx.rollback()
    raise

Note

Rollback is safe to call multiple times. Subsequent calls are no-ops.

See also

commit(): Persist transaction changes

class structum_lab.plugins.database.HealthCheckResult(status: HealthStatus, message: str, latency_ms: float | None = None, details: dict[str, Any] | None = None)[source]

Bases: object

Data class representing database health check results.

status

Overall health status (HEALTHY, DEGRADED, UNHEALTHY).

Type:

HealthStatus

message

Human-readable status description.

Type:

str

latency_ms

Query latency in milliseconds, if available.

Type:

float | None

details

Additional diagnostic information (e.g., active connections, pool statistics).

Type:

dict[str, Any] | None

Example

Creating and using a health check result:

result = HealthCheckResult(
    status=HealthStatus.HEALTHY,
    message="Database connection OK",
    latency_ms=2.5,
    details={"active_connections": 5, "pool_size": 10}
)

# Use in monitoring/alerts
if result.latency_ms and result.latency_ms > 100:
    alert("High database latency", result.latency_ms)

Note

This class is frozen (immutable) to ensure health check results cannot be modified after creation.

__init__(status: HealthStatus, message: str, latency_ms: float | None = None, details: dict[str, Any] | None = None)
details : dict[str, Any] | None = None
latency_ms : float | None = None
status : HealthStatus
message : str
class structum_lab.plugins.database.HealthStatus(*values)[source]

Bases: Enum

Enumeration of possible database health states.

HEALTHY

Database is fully operational with normal latency.

DEGRADED

Database is responsive but experiencing issues (e.g., high latency, connection warnings).

UNHEALTHY

Database is unreachable or critically impaired.

Example

Checking health status:

result = db.health_check()
if result.status == HealthStatus.HEALTHY:
    log.info("Database OK", latency_ms=result.latency_ms)
elif result.status == HealthStatus.DEGRADED:
    log.warning("Database degraded", message=result.message)
else:
    log.error("Database unhealthy", message=result.message)
HEALTHY = 'healthy'
DEGRADED = 'degraded'
UNHEALTHY = 'unhealthy'
class structum_lab.plugins.database.BaseDatabase(url: str, *, pool_size: int = 5, pool_timeout: int = 30, echo: bool = False, **kwargs: Any)[source]

Bases: ABC

Abstract base class for database implementations.

Provides the Factory Pattern via .from_config() class method. Subclasses must implement the abstract methods for their specific backend.

Example

>>> # Direct instantiation
>>> db = SQLAlchemyDatabase(url="postgresql://...")
>>>
>>> # Factory pattern with config
>>> db = SQLAlchemyDatabase.from_config()
>>> db = SQLAlchemyDatabase.from_config(config_key="secondary_db")
DEFAULT_CONFIG_KEY : ClassVar[str] = 'database'

Default config key prefix for database settings

__enter__() BaseDatabase[source]

Context manager entry.

__exit__(*args: Any) None[source]

Context manager exit.

__init__(url: str, *, pool_size: int = 5, pool_timeout: int = 30, echo: bool = False, **kwargs: Any)[source]

Initialize database with connection parameters.

Parameters:
url: str

Database connection URL (e.g., postgresql://…)

pool_size: int = 5

Connection pool size

pool_timeout: int = 30

Pool timeout in seconds

echo: bool = False

Log SQL queries (debug mode)

**kwargs: Any

Additional backend-specific options

abstractmethod close() None[source]

Close all connections.

abstractmethod connect() None[source]

Establish connection to database.

classmethod from_config(config_key: str | None = None, *, config: ConfigInterface | None = None) BaseDatabase[source]

Factory method to create database from config provider.

This method reads database configuration from the active config provider (e.g., dynaconf), allowing decoupled configuration.

Parameters:
config_key: str | None = None

Config key prefix (default: “database”)

config: ConfigInterface | None = None

Explicit config provider (optional, uses global if None)

Returns:

Configured database instance

Raises:
  • RuntimeError – If no config provider is available

  • KeyError – If required config keys are missing

Example

>>> # Uses default config key "database"
>>> db = SQLAlchemyDatabase.from_config()
>>>
>>> # Uses custom config key for secondary database
>>> db = SQLAlchemyDatabase.from_config("replica_db")
Config Structure:

`toml [database] url = "postgresql://user:pass@localhost/mydb" pool_size = 10 pool_timeout = 30 echo = false `

abstractmethod get_connection() ConnectionInterface[source]

Get a connection from the pool.

abstractmethod health_check() HealthCheckResult[source]

Check database health.

property is_connected : bool

Check if database is connected.

abstractmethod transaction() Iterator[ConnectionInterface][source]

Context manager for transactions.

property url : str

Database URL (sanitized, password hidden).

structum_lab.plugins.database.__getattr__(name: str)[source]

Lazy loading of backends to handle optional dependencies.

Base Database

Base class for database implementations with Factory Pattern support.

The factory pattern allows database backends to be configured via: 1. Direct instantiation: SQLAlchemyDatabase(url=”…”) 2. Config provider: SQLAlchemyDatabase.from_config()

The .from_config() method uses the active config provider (e.g., dynaconf) to read database settings, enabling decoupled configuration.

class structum_lab.plugins.database.base.BaseDatabase(url: str, *, pool_size: int = 5, pool_timeout: int = 30, echo: bool = False, **kwargs: Any)[source]

Bases: ABC

Abstract base class for database implementations.

Provides the Factory Pattern via .from_config() class method. Subclasses must implement the abstract methods for their specific backend.

Example

>>> # Direct instantiation
>>> db = SQLAlchemyDatabase(url="postgresql://...")
>>>
>>> # Factory pattern with config
>>> db = SQLAlchemyDatabase.from_config()
>>> db = SQLAlchemyDatabase.from_config(config_key="secondary_db")
DEFAULT_CONFIG_KEY : ClassVar[str] = 'database'

Default config key prefix for database settings

__init__(url: str, *, pool_size: int = 5, pool_timeout: int = 30, echo: bool = False, **kwargs: Any)[source]

Initialize database with connection parameters.

Parameters:
url: str

Database connection URL (e.g., postgresql://…)

pool_size: int = 5

Connection pool size

pool_timeout: int = 30

Pool timeout in seconds

echo: bool = False

Log SQL queries (debug mode)

**kwargs: Any

Additional backend-specific options

classmethod from_config(config_key: str | None = None, *, config: ConfigInterface | None = None) BaseDatabase[source]

Factory method to create database from config provider.

This method reads database configuration from the active config provider (e.g., dynaconf), allowing decoupled configuration.

Parameters:
config_key: str | None = None

Config key prefix (default: “database”)

config: ConfigInterface | None = None

Explicit config provider (optional, uses global if None)

Returns:

Configured database instance

Raises:
  • RuntimeError – If no config provider is available

  • KeyError – If required config keys are missing

Example

>>> # Uses default config key "database"
>>> db = SQLAlchemyDatabase.from_config()
>>>
>>> # Uses custom config key for secondary database
>>> db = SQLAlchemyDatabase.from_config("replica_db")
Config Structure:

`toml [database] url = "postgresql://user:pass@localhost/mydb" pool_size = 10 pool_timeout = 30 echo = false `

property url : str

Database URL (sanitized, password hidden).

property is_connected : bool

Check if database is connected.

abstractmethod connect() None[source]

Establish connection to database.

abstractmethod close() None[source]

Close all connections.

abstractmethod get_connection() ConnectionInterface[source]

Get a connection from the pool.

abstractmethod transaction() Iterator[ConnectionInterface][source]

Context manager for transactions.

abstractmethod health_check() HealthCheckResult[source]

Check database health.

__enter__() BaseDatabase[source]

Context manager entry.

__exit__(*args: Any) None[source]

Context manager exit.

SQLAlchemy Backend

SQLAlchemy-based database implementation for Structum Lab.

This module provides a production-ready database backend using SQLAlchemy 2.0+. It supports PostgreSQL, MySQL, SQLite, and other databases supported by SQLAlchemy.

Example

>>> from structum_lab.plugins.database import SQLAlchemyDatabase
>>>
>>> db = SQLAlchemyDatabase(url="postgresql://user:pass@localhost/mydb")
>>> with db.transaction() as conn:
...     conn.execute("SELECT * FROM users")
...     users = conn.fetchall()
class structum_lab.plugins.database.sqlalchemy.SQLAlchemyConnection(session: Session)[source]

Bases: object

SQLAlchemy connection wrapper implementing ConnectionInterface.

__init__(session: Session)[source]
execute(query: str, params: dict[str, Any] | tuple[Any, ...] | None = None) Any[source]

Execute SQL query.

fetchone() dict[str, Any] | None[source]

Fetch one row as dict.

fetchall() list[dict[str, Any]][source]

Fetch all rows as list of dicts.

fetchmany(size: int) list[dict[str, Any]][source]

Fetch up to size rows.

class structum_lab.plugins.database.sqlalchemy.SQLAlchemyDatabase(url: str, *, pool_size: int = 5, pool_timeout: int = 30, echo: bool = False, pool_pre_ping: bool = True, **kwargs: Any)[source]

Bases: BaseDatabase

SQLAlchemy-based database implementation.

Production-ready database backend with: - Connection pooling - Transaction management - Health checks with latency measurement - Support for PostgreSQL, MySQL, SQLite, etc.

Example

>>> db = SQLAlchemyDatabase(url="postgresql://...")
>>> db.connect()
>>>
>>> with db.transaction() as conn:
...     conn.execute("INSERT INTO users (name) VALUES (:name)", {"name": "John"})
>>>
>>> db.close()
__init__(url: str, *, pool_size: int = 5, pool_timeout: int = 30, echo: bool = False, pool_pre_ping: bool = True, **kwargs: Any)[source]

Initialize SQLAlchemy database.

Parameters:
url: str

SQLAlchemy connection URL

pool_size: int = 5

Connection pool size

pool_timeout: int = 30

Pool acquisition timeout

echo: bool = False

Log SQL queries

pool_pre_ping: bool = True

Test connections before use (recommended)

**kwargs: Any

Additional SQLAlchemy engine options

connect() None[source]

Create SQLAlchemy engine and connection pool.

close() None[source]

Dispose of connection pool.

get_connection() ConnectionInterface[source]

Get a connection from the pool.

Note

Prefer using transaction() context manager for proper cleanup.

transaction() Iterator[ConnectionInterface][source]

Execute operations within a transaction.

Automatically commits on success, rolls back on exception.

Yields:

Connection with active transaction

health_check() HealthCheckResult[source]

Check database connectivity.

Returns:

HealthCheckResult with status and latency

PostgreSQL Backend

Direct PostgreSQL database implementation using psycopg3.

This module provides a high-performance PostgreSQL backend using psycopg3 (the modern successor to psycopg2). It offers better async support and native connection pooling.

Example

>>> from structum_lab.plugins.database import PostgresDatabase
>>>
>>> db = PostgresDatabase(url="postgresql://user:pass@localhost/mydb")
>>> with db.transaction() as conn:
...     conn.execute("SELECT * FROM users")
...     users = conn.fetchall()
class structum_lab.plugins.database.postgres.PostgresConnection(conn: Any)[source]

Bases: object

psycopg3 connection wrapper implementing ConnectionInterface.

__init__(conn: Any)[source]
execute(query: str, params: dict[str, Any] | tuple[Any, ...] | None = None) Any[source]

Execute SQL query.

fetchone() dict[str, Any] | None[source]

Fetch one row as dict.

fetchall() list[dict[str, Any]][source]

Fetch all rows as list of dicts.

fetchmany(size: int) list[dict[str, Any]][source]

Fetch up to size rows.

class structum_lab.plugins.database.postgres.PostgresDatabase(url: str, *, pool_size: int = 5, pool_timeout: int = 30, echo: bool = False, min_size: int = 1, **kwargs: Any)[source]

Bases: BaseDatabase

Direct PostgreSQL database implementation using psycopg3.

High-performance PostgreSQL backend with: - Native connection pooling (psycopg_pool) - Better async support than psycopg2 - Modern Python type support

Example

>>> db = PostgresDatabase(url="postgresql://...")
>>> db.connect()
>>>
>>> with db.transaction() as conn:
...     conn.execute("INSERT INTO users (name) VALUES (%(name)s)", {"name": "John"})
>>>
>>> db.close()

Note

This backend only supports PostgreSQL. For other databases, use SQLAlchemyDatabase instead.

__init__(url: str, *, pool_size: int = 5, pool_timeout: int = 30, echo: bool = False, min_size: int = 1, **kwargs: Any)[source]

Initialize PostgreSQL database.

Parameters:
url: str

PostgreSQL connection URL

pool_size: int = 5

Maximum pool size

pool_timeout: int = 30

Pool acquisition timeout

echo: bool = False

Log SQL queries (not directly supported, use logging)

min_size: int = 1

Minimum pool size

**kwargs: Any

Additional psycopg options

connect() None[source]

Create connection pool.

close() None[source]

Close connection pool.

get_connection() ConnectionInterface[source]

Get a connection from the pool.

Note

Prefer using transaction() context manager for proper cleanup.

transaction() Iterator[ConnectionInterface][source]

Execute operations within a transaction.

Automatically commits on success, rolls back on exception.

Yields:

Connection with active transaction

health_check() HealthCheckResult[source]

Check database connectivity.

Returns:

HealthCheckResult with status and latency