# Source code for structum_lab.auth.interfaces

# Auth Interfaces - Core Protocols
# SPDX-License-Identifier: Apache-2.0

"""
Core authentication interfaces for Structum Lab.

This module defines the Protocols that any auth implementation must follow.
Implementations are provided by the `structum-auth` plugin.

Key Design Decision:
    Auth does NOT manage database/storage. The application implements
    `UserRepositoryInterface` to provide user data. This keeps auth
    decoupled from any specific storage solution.

Example:
    >>> from structum_lab.plugins.auth import JWTAuthProvider
    >>> auth = JWTAuthProvider.from_config()
    >>> tokens = auth.authenticate("user", "password", user_repo)
"""

from __future__ import annotations

from dataclasses import dataclass
from datetime import datetime
from typing import Any, Protocol, runtime_checkable


@dataclass(frozen=True)
class TokenPair:
    """
    Data class containing JWT access and refresh token pair.

    Attributes:
        access_token (str): Short-lived JWT access token for API requests.
        refresh_token (str): Long-lived token for obtaining new access tokens.
        token_type (str): Token type, typically "bearer" for JWT.
            Defaults to "bearer".
        expires_at (datetime | None): Expiration timestamp for access token,
            if available.

    Example:
        Creating and using token pair::

            tokens = TokenPair(
                access_token="eyJ0eXAiOiJKV1QiLCJhbGc...",
                refresh_token="eyJ0eXAiOiJKV1QiLCJhbGc...",
                token_type="bearer",
                expires_at=datetime.now() + timedelta(hours=1)
            )

            # Use in HTTP Authorization header
            headers = {"Authorization": f"{tokens.token_type} {tokens.access_token}"}

    Note:
        This class is frozen (immutable) to prevent accidental token modification.
        Tokens should be treated as opaque strings and never parsed by clients.

    See Also:
        :meth:`AuthInterface.authenticate`: Method that returns token pairs
        :meth:`AuthInterface.refresh`: Refresh access tokens
    """

    access_token: str
    refresh_token: str
    token_type: str = "bearer"
    expires_at: datetime | None = None
@runtime_checkable
class UserInterface(Protocol):
    """
    Protocol for authenticated user entities in Structum Lab.

    Applications must implement this protocol for their User model to integrate
    with the authentication system. The auth system never creates or modifies
    users - it only queries them via :class:`UserRepositoryInterface`.

    Example:
        Implementing UserInterface::

            from dataclasses import dataclass

            @dataclass
            class User:
                id: str
                username: str
                hashed_password: str
                roles: list[str]
                permissions: set[str]

                def has_permission(self, permission: str) -> bool:
                    # Check role-based permissions
                    for role in self.roles:
                        if permission in ROLE_PERMISSIONS.get(role, set()):
                            return True
                    # Check user-specific permissions
                    return permission in self.permissions

        Using with auth::

            user = user_repo.find_by_username("john")
            if user and user.has_permission("users:write"):
                # Allow operation
                pass

    Note:
        This is a Protocol, not a base class. Your User model doesn't need to
        inherit from this - just implement the required properties and methods.

    See Also:
        :class:`UserRepositoryInterface`: Repository for user data access
        :class:`AuthInterface`: Authentication provider using users
    """

    @property
    def id(self) -> str:
        """
        Unique identifier for the user.

        Returns:
            str: User ID, typically a UUID or database primary key.

        Example:
            User ID in token payload::

                token_payload = {"user_id": user.id, "exp": ...}
        """
        ...

    @property
    def username(self) -> str:
        """
        User's username or email address.

        Returns:
            str: Username, typically used for login and display.

        Example:
            username in login::

                user = user_repo.find_by_username(username)
                if user and auth.verify_password(password, user.hashed_password):
                    return auth.authenticate(...)
        """
        ...

    @property
    def roles(self) -> list[str]:
        """
        List of roles assigned to the user.

        Returns:
            list[str]: Role names (e.g., ["admin", "user", "moderator"]).

        Example:
            Role-based access control::

                if "admin" in user.roles:
                    # Allow admin operation
                    pass

        Note:
            Roles should be lowercase strings.
            Use :meth:`has_permission` for fine-grained permission checks.
        """
        ...

    @property
    def hashed_password(self) -> str:
        """
        User's hashed password.

        Returns:
            str: Securely hashed password (e.g., Argon2, bcrypt).

        Warning:
            Never store or transmit plain-text passwords. This property
            should only return hashed values.

        Example:
            Password verification::

                if auth.verify_password(input_password, user.hashed_password):
                    # Password matches
                    pass

        See Also:
            :meth:`PasswordHasherInterface.hash`: Hash passwords
            :meth:`PasswordHasherInterface.verify`: Verify passwords
        """
        ...

    def has_permission(self, permission: str) -> bool:
        """
        Check if user has a specific permission.

        Args:
            permission (str): Permission string, typically in format
                ``resource:action`` (e.g., "users:write", "posts:delete").

        Returns:
            bool: True if user has the permission, False otherwise.

        Example:
            Permission-based authorization::

                @app.delete("/users/{user_id}")
                async def delete_user(
                    user_id: str,
                    current_user: User = Depends(get_current_user)
                ):
                    if not current_user.has_permission("users:delete"):
                        raise HTTPException(403, "Permission denied")
                    # Delete user

            Role-based permission mapping::

                ROLE_PERMISSIONS = {
                    "admin": {"users:read", "users:write", "users:delete"},
                    "user": {"users:read"},
                }

                def has_permission(self, permission: str) -> bool:
                    return any(
                        permission in ROLE_PERMISSIONS.get(role, set())
                        for role in self.roles
                    )

        Note:
            Permission format is application-defined. Use a consistent naming
            scheme (e.g., resource:action).
        """
        ...
@runtime_checkable
class UserRepositoryInterface(Protocol):
    """
    Protocol for user storage and retrieval in Structum Lab.

    Applications implement this to connect the auth system with their database
    or user storage backend. The auth plugin does NOT manage user storage
    directly - it delegates all user operations to this repository.

    This separation ensures the auth system remains storage-agnostic and can
    work with any database (PostgreSQL, MongoDB, etc.) or user service
    (LDAP, OAuth providers).

    Implementations:
        - Database-backed repository (SQLAlchemy, etc.)
        - External user service adapter (LDAP, Active Directory)
        - In-memory repository (testing only)

    Example:
        SQLAlchemy repository implementation::

            class SQLAlchemyUserRepository:
                def __init__(self, db: DatabaseInterface):
                    self.db = db

                def find_by_username(self, username: str) -> User | None:
                    with self.db.transaction() as conn:
                        conn.execute(
                            "SELECT * FROM users WHERE username = :username",
                            {"username": username}
                        )
                        row = conn.fetchone()
                        if not row:
                            return None
                        return User(
                            id=row["id"],
                            username=row["username"],
                            hashed_password=row["password_hash"],
                            roles=row.get("roles", []),
                        )

                def find_by_id(self, user_id: str) -> User | None:
                    with self.db.transaction() as conn:
                        conn.execute(
                            "SELECT * FROM users WHERE id = :id",
                            {"id": user_id}
                        )
                        row = conn.fetchone()
                        return User(**row) if row else None

        Using with authentication::

            user_repo = SQLAlchemyUserRepository(db)
            auth = JWTAuthProvider.from_config()

            tokens = auth.authenticate("john", "password123", user_repo)
            if tokens:
                print(f"Access token: {tokens.access_token}")

    Note:
        Repository is responsible for mapping storage format to
        :class:`UserInterface`. It should handle serialization/deserialization
        of user data.

    See Also:
        :class:`UserInterface`: User entity protocol
        :class:`AuthInterface`: Authentication provider using repositories
    """

    def find_by_username(self, username: str) -> UserInterface | None:
        """
        Find a user by username or email.

        Args:
            username (str): Username or email to search for. Should be
                case-insensitive in most implementations.

        Returns:
            UserInterface | None: User if found, None otherwise.

        Example:
            Looking up user for authentication::

                user = user_repo.find_by_username("john@example.com")
                if user:
                    # Verify password
                    if auth.verify_password(password, user.hashed_password):
                        return auth.create_tokens(user)
                else:
                    log.warning("Login attempt for unknown user", username=username)

        Note:
            Implementation should normalize username (e.g., lowercase) before
            lookup. Consider using database indexes on username column for
            performance.
        """
        ...

    def find_by_id(self, user_id: str) -> UserInterface | None:
        """
        Find a user by their unique identifier.

        Args:
            user_id (str): User's unique identifier (typically UUID or
                database ID).

        Returns:
            UserInterface | None: User if found, None otherwise.

        Example:
            Loading user from token::

                # After verifying access token
                payload = auth.verify_access_token(token)
                if payload:
                    user = user_repo.find_by_id(payload["user_id"])
                    if user:
                        # User authenticated
                        return user

        Note:
            This method is called frequently (on every authenticated request).
            Consider caching user data for performance.
        """
        ...
@runtime_checkable
class PasswordHasherInterface(Protocol):
    """
    Protocol for secure password hashing in Structum Lab.

    Implementations must use cryptographically secure hashing algorithms
    (e.g., Argon2, bcrypt, scrypt). Never use fast hashes like MD5 or SHA-1
    for passwords.

    Implementations:
        - :class:`~structum_lab.plugins.auth.password.Argon2Hasher` (recommended)
        - :class:`~structum_lab.plugins.auth.password.BcryptHasher`

    Example:
        Using password hasher::

            from structum_lab.plugins.auth.password import Argon2Hasher

            hasher = Argon2Hasher()

            # Hash password during registration
            hashed = hasher.hash("user_password_123")
            # Store hashed in database: user.hashed_password = hashed

            # Verify during login
            if hasher.verify("user_password_123", hashed):
                # Password matches
                return create_token(user)
            else:
                # Invalid password
                raise AuthenticationError("Invalid credentials")

    Warning:
        Never log, display, or store plain-text passwords. Always hash
        passwords immediately upon receipt.

    See Also:
        :class:`AuthInterface`: Auth provider using password hasher
        :class:`UserInterface`: User entity with hashed_password property
    """

    def hash(self, password: str) -> str:
        """
        Hash a plain-text password securely.

        Args:
            password (str): Plain-text password to hash. No length
                restrictions, but implementations may truncate very long
                passwords.

        Returns:
            str: Hashed password string including algorithm identifier and
            salt. Format is implementation-specific
            (e.g., Argon2: ``$argon2id$v=19$...``).

        Example:
            Creating user with hashed password::

                # During user registration
                plain_password = request.form["password"]
                hashed = auth.hash_password(plain_password)

                user = User(
                    id=generate_id(),
                    username=request.form["username"],
                    hashed_password=hashed,  # Store this
                    roles=["user"]
                )
                user_repo.save(user)

        Warning:
            Hashing is intentionally slow (100-500ms) to resist brute-force
            attacks. Do not hash passwords in tight loops or
            performance-critical paths.

        Note:
            Each call generates a unique hash (due to random salt) even for
            the same password. This is expected and secure behavior.
        """
        ...

    def verify(self, password: str, hashed: str) -> bool:
        """
        Verify a plain-text password against a hash.

        Args:
            password (str): Plain-text password to verify.
            hashed (str): Previously hashed password (from database).

        Returns:
            bool: True if password matches hash, False otherwise.

        Example:
            Password verification during login::

                # Get user from database
                user = user_repo.find_by_username(username)
                if not user:
                    return None  # User not found

                # Verify password
                if auth.verify_password(password, user.hashed_password):
                    # Authentication successful
                    return auth.create_tokens(user)
                else:
                    # Invalid password
                    log.warning("Failed login attempt", username=username)
                    return None

        Warning:
            Always use constant-time comparison internally to prevent timing
            attacks. Most modern hashing libraries handle this automatically.

        Note:
            Returns False for invalid/malformed hashes rather than raising
            exceptions. This prevents information leakage about hash format.
        """
        ...
@runtime_checkable
class AuthInterface(Protocol):
    """
    Protocol for authentication providers in Structum Lab.

    This is the main entry point for all authentication operations.
    Implementations provide JWT-based authentication, password hashing,
    and token management.

    The auth system is storage-agnostic - it doesn't manage users directly.
    Instead, it uses :class:`UserRepositoryInterface` to fetch user data,
    keeping authentication logic decoupled from storage.

    Implementations:
        - :class:`~structum_lab.plugins.auth.jwt.JWTAuthProvider` (recommended)
        - :class:`~structum_lab.plugins.auth.oauth.OAuthProvider`

    Example:
        See usage in specific implementations like JWTAuthProvider.
    """

    def authenticate(
        self, username: str, password: str, user_repo: UserRepositoryInterface
    ) -> TokenPair | None:
        """
        Authenticate a user.

        Args:
            username (str): Login name of the user to authenticate.
            password (str): Plain-text password supplied by the caller.
            user_repo (UserRepositoryInterface): Repository the provider uses
                to fetch user data.

        Returns:
            TokenPair | None: A token pair, or None when no tokens are issued.
            The exact failure conditions are implementation-defined.
        """
        ...

    def refresh(
        self, refresh_token: str, user_repo: UserRepositoryInterface
    ) -> TokenPair | None:
        """
        Refresh access token.

        Args:
            refresh_token (str): Refresh token previously issued to the client.
            user_repo (UserRepositoryInterface): Repository the provider uses
                to fetch user data.

        Returns:
            TokenPair | None: A new token pair, or None when the refresh is
            not honoured. The exact failure conditions are
            implementation-defined.
        """
        ...

    def verify_access_token(self, token: str) -> dict[str, Any] | None:
        """
        Verify an access token.

        Args:
            token (str): Access token to verify.

        Returns:
            dict[str, Any] | None: A payload mapping for the token, or None
            when the token is not accepted. The payload keys are
            implementation-defined.
        """
        ...

    def hash_password(self, password: str) -> str:
        """
        Hash a password.

        Args:
            password (str): Plain-text password to hash.

        Returns:
            str: The hashed password string.
        """
        ...

    def verify_password(self, password: str, hashed: str) -> bool:
        """
        Verify a password.

        Args:
            password (str): Plain-text password to check.
            hashed (str): Previously hashed password to compare against.

        Returns:
            bool: Whether ``password`` corresponds to ``hashed``.
        """
        ...
# Type aliases for convenience: shorter names bound to the Protocol classes
# defined earlier in this module.
User = UserInterface
UserRepository = UserRepositoryInterface
Auth = AuthInterface
PasswordHasher = PasswordHasherInterface