Auth Module (structum_lab.auth)

License: Apache-2.0

Core protocols and interfaces for the authentication and security system.

Feature      Details
Namespace    structum_lab.auth
Role         Interface definitions (Protocol)


Contents

  1. What Is the Auth Module

  2. Protocol-Based Architecture

  3. UserInterface Protocol

  4. AuthInterface Protocol

  5. UserRepositoryInterface Protocol

  6. Token Data Structures

  7. Available Implementations

  8. Usage Patterns

  9. API Reference


1. What Is the Auth Module

The structum_lab.auth module defines the Protocol interfaces for authentication and user management. It contains no implementations, only the contracts that implementations must satisfy.

1.1 Separation of Concerns

┌───────────────────────────────────────────┐
│ Application Layer                         │
│ - Uses: get_auth(), authenticate()        │
│ - Implements: UserRepository              │
└─────────────────┬─────────────────────────┘
                  │
┌─────────────────▼─────────────────────────┐
│ Core Module (THIS)                        │
│ - Defines: Protocols (interfaces)         │
│ - Zero Dependencies                       │
│ - Type Contracts Only                     │
└─────────────────┬─────────────────────────┘
                  │
┌─────────────────▼─────────────────────────┐
│ Plugin Layer                              │
│ - Provides: JWTAuthProvider               │
│ - Implements: AuthInterface               │
│ - Dependencies: PyJWT, argon2-cffi        │
└───────────────────────────────────────────┘

Benefits:

  • Testability: Mock protocols without plugin dependencies

  • Flexibility: Switch auth implementations (JWT → OAuth2 → Custom)

  • Zero Lock-in: Core stays implementation-agnostic
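The flexibility and testability points above can be illustrated with a small sketch. The protocol below is a local, simplified stand-in (not the real structum_lab.auth.AuthInterface, whose authenticate() also takes a user repository), and StaticAuth and DenyAllAuth are hypothetical providers invented for this example.

```python
from typing import Optional, Protocol


class SimpleAuth(Protocol):
    """Simplified stand-in for an auth protocol (hypothetical)."""

    def authenticate(self, username: str, password: str) -> Optional[str]:
        ...


class StaticAuth:
    """Hypothetical provider: accepts one hard-coded credential pair."""

    def authenticate(self, username: str, password: str) -> Optional[str]:
        if (username, password) == ("john", "secret"):
            return "token-for-john"
        return None


class DenyAllAuth:
    """Hypothetical provider: rejects everyone (handy in tests)."""

    def authenticate(self, username: str, password: str) -> Optional[str]:
        return None


def login(auth: SimpleAuth, username: str, password: str) -> str:
    """Application code depends only on the protocol, not on a provider."""
    token = auth.authenticate(username, password)
    return "ok" if token else "denied"


# Either provider can be passed in without changing login().
result_static = login(StaticAuth(), "john", "secret")
result_deny = login(DenyAllAuth(), "john", "secret")
```

Neither provider inherits from SimpleAuth; both are accepted because their method signatures match.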


2. Protocol-Based Architecture

2.1 Protocol vs Abstract Base Class

Structum uses typing.Protocol instead of abstract base classes (ABC):

❌ Traditional ABC:

from abc import ABC, abstractmethod

class AuthBase(ABC):
    @abstractmethod
    def authenticate(self, username, password):
        pass

# Requires inheritance
class MyAuth(AuthBase):  # Tight coupling
    def authenticate(self, username, password):
        ...  # Implementation goes here

✅ Protocol (Structural Subtyping):

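from typing import Protocol

from structum_lab.auth import TokenPair

class AuthInterface(Protocol):
    # Simplified signature for illustration; section 4 shows the full protocol
    def authenticate(self, username, password) -> TokenPair | None:
        ...

# NO inheritance required - structural ("static duck") typing
class MyAuth:  # Loose coupling
    def authenticate(self, username, password) -> TokenPair | None:
        ...  # Implementation goes here

# MyAuth is compatible with AuthInterface automatically!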

2.2 Static Type Checking

Protocols enable full static type checking with mypy:

from structum_lab.auth import AuthInterface, UserRepositoryInterface

def login_user(
    auth_provider: AuthInterface,
    username: str,
    password: str,
    user_repo: UserRepositoryInterface,
) -> str | None:
    """Type-safe function using protocol."""
    tokens = auth_provider.authenticate(username, password, user_repo)

    if tokens:
        return tokens.access_token

    return None

# mypy verifies statically, before the code runs, that auth_provider has
# authenticate() and that it is called with the expected arguments
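mypy performs this check statically. For a quick runtime illustration of the same structural idea, typing.runtime_checkable allows isinstance() checks against a protocol. Note that such a check only confirms the method exists, not that its signature matches. All class names below are hypothetical.

```python
from typing import Protocol, runtime_checkable


@runtime_checkable
class HasAuthenticate(Protocol):
    """Hypothetical protocol with a single required method."""

    def authenticate(self, username: str, password: str) -> object:
        ...


class GoodProvider:
    """Has the required method, so it matches structurally."""

    def authenticate(self, username: str, password: str) -> object:
        return None


class BadProvider:
    """Uses a different method name, so it does not match."""

    def login(self, username: str, password: str) -> object:
        return None


good_ok = isinstance(GoodProvider(), HasAuthenticate)  # method present
bad_ok = isinstance(BadProvider(), HasAuthenticate)    # method missing
```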

3. UserInterface Protocol

3.1 Protocol Definition

from typing import Protocol

class UserInterface(Protocol):
    """
    Protocol for authenticated user objects.
    
    Your User model must implement these attributes/methods
    to be compatible with Structum auth system.
    """
    
    # Required attributes
    id: str
    username: str
    hashed_password: str
    roles: list[str]
    
    # Required methods
    def has_permission(self, permission: str) -> bool:
        """Check if user has specific permission."""
        ...

3.2 Implementation Example

from dataclasses import dataclass

from structum_lab.auth import UserInterface

@dataclass
class User(UserInterface):  # Explicit inheritance is optional with Protocols
    """User implementation with dataclass."""
    
    id: str
    username: str
    email: str
    hashed_password: str
    roles: list[str]
    is_active: bool = True
    
    def has_permission(self, permission: str) -> bool:
        """Check permissions based on roles."""
        if "admin" in self.roles:
            return True
        
        role_permissions = {
            "editor": ["edit_posts", "create_posts"],
            "viewer": ["view_posts"]
        }
        
        for role in self.roles:
            if permission in role_permissions.get(role, []):
                return True
        
        return False
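The role check above could be exercised as follows. The User class is repeated in compact form (without the Protocol base or the email field) so that the snippet runs on its own; the ROLE_PERMISSIONS constant and the sample users are assumptions made for this sketch.

```python
from dataclasses import dataclass, field

# Same role-to-permission mapping as in the example above.
ROLE_PERMISSIONS = {
    "editor": ["edit_posts", "create_posts"],
    "viewer": ["view_posts"],
}


@dataclass
class User:
    id: str
    username: str
    hashed_password: str
    roles: list[str] = field(default_factory=list)

    def has_permission(self, permission: str) -> bool:
        # Admins are granted every permission.
        if "admin" in self.roles:
            return True
        return any(
            permission in ROLE_PERMISSIONS.get(role, [])
            for role in self.roles
        )


editor = User(id="1", username="ed", hashed_password="...", roles=["editor"])
admin = User(id="2", username="root", hashed_password="...", roles=["admin"])

can_edit = editor.has_permission("edit_posts")
can_view = editor.has_permission("view_posts")
admin_any = admin.has_permission("delete_everything")
```

Here the editor can edit but not view, because permissions are not inherited between roles in this mapping, while the admin passes every check.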

3.3 ORM Integration

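from sqlalchemy import JSON
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column

class Base(DeclarativeBase):
    pass

# No explicit UserInterface base: the model satisfies the protocol structurally,
# and combining a Protocol with a declarative base can raise a metaclass conflict
class User(Base):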
    """SQLAlchemy model implementing UserInterface."""
    
    __tablename__ = "users"
    
    id: Mapped[str] = mapped_column(primary_key=True)
    username: Mapped[str] = mapped_column(unique=True)
    email: Mapped[str] = mapped_column(unique=True)
    hashed_password: Mapped[str]
    roles: Mapped[list[str]] = mapped_column(JSON)
    
    def has_permission(self, permission: str) -> bool:
        return "admin" in self.roles

4. AuthInterface Protocol

4.1 Protocol Definition

from typing import Protocol, Optional

class AuthInterface(Protocol):
    """
    Protocol for authentication providers.
    
    Implementations must provide:
    - User authentication (login)
    - Token refresh
    - Token verification
    """
    
    def authenticate(
        self,
        username: str,
        password: str,
        user_repo: UserRepositoryInterface
    ) -> Optional[TokenPair]:
        """
        Authenticate user with username/password.
        
        Returns:
            TokenPair with access/refresh tokens if successful
            None if authentication failed
        """
        ...
    
    def refresh_access_token(
        self,
        refresh_token: str,
        user_repo: UserRepositoryInterface
    ) -> TokenPair:
        """
        Get new access token using refresh token.
        
        Raises:
            Exception if refresh token invalid/expired
        """
        ...
    
    def verify_access_token(self, token: str) -> dict:
        """
        Verify and decode access token.
        
        Returns:
            Decoded token payload
        
        Raises:
            Exception if token invalid/expired
        """
        ...
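To make the three methods concrete, here is a minimal in-memory sketch of a class with the same shape as AuthInterface. It is not how JWTAuthProvider works: it issues random opaque tokens instead of JWTs and compares passwords in plain text purely to keep the example short. OpaqueTokenAuth, DemoUser and DemoRepo are hypothetical names.

```python
import secrets
import time
from dataclasses import dataclass
from typing import Optional


@dataclass
class TokenPair:
    access_token: str
    refresh_token: str
    token_type: str
    expires_in: int


@dataclass
class DemoUser:
    id: str
    username: str
    hashed_password: str
    roles: list


class DemoRepo:
    """Hypothetical repository backed by a dict."""

    def __init__(self, users):
        self._users = {u.username: u for u in users}

    def find_by_username(self, username):
        return self._users.get(username)


class OpaqueTokenAuth:
    """Hypothetical provider issuing random opaque tokens (not JWTs)."""

    ACCESS_TTL = 900  # seconds

    def __init__(self) -> None:
        self._access = {}   # access token -> payload
        self._refresh = {}  # refresh token -> username

    def _issue(self, user) -> TokenPair:
        access = secrets.token_urlsafe(16)
        refresh = secrets.token_urlsafe(16)
        now = int(time.time())
        self._access[access] = {
            "sub": user.id, "roles": user.roles, "type": "access",
            "iat": now, "exp": now + self.ACCESS_TTL,
        }
        self._refresh[refresh] = user.username
        return TokenPair(access, refresh, "Bearer", self.ACCESS_TTL)

    def authenticate(self, username, password, user_repo) -> Optional[TokenPair]:
        user = user_repo.find_by_username(username)
        # Plain comparison for brevity ONLY; real code must verify a hash.
        if user is None or user.hashed_password != password:
            return None
        return self._issue(user)

    def refresh_access_token(self, refresh_token, user_repo) -> TokenPair:
        # Each refresh token is single-use in this sketch.
        username = self._refresh.pop(refresh_token, None)
        user = user_repo.find_by_username(username) if username else None
        if user is None:
            raise ValueError("invalid refresh token")
        return self._issue(user)

    def verify_access_token(self, token) -> dict:
        payload = self._access.get(token)
        if payload is None or payload["exp"] <= time.time():
            raise ValueError("invalid or expired access token")
        return payload


repo = DemoRepo([DemoUser("1", "john", "pw", ["user"])])
auth = OpaqueTokenAuth()
tokens = auth.authenticate("john", "pw", repo)
payload = auth.verify_access_token(tokens.access_token)
renewed = auth.refresh_access_token(tokens.refresh_token, repo)
```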

4.2 Usage Example

from structum_lab.auth import get_auth, UserRepositoryInterface

# Get auth provider (registered at startup via set_auth_provider())
auth = get_auth()

# Authenticate user; MyUserRepository is your application's
# UserRepositoryInterface implementation
user_repo = MyUserRepository()
tokens = auth.authenticate("john_doe", "password123", user_repo)

if tokens:
    print(f"Access token: {tokens.access_token}")
    print(f"Refresh token: {tokens.refresh_token}")
    print(f"Expires in: {tokens.expires_in} seconds")

    # Verify token later (only meaningful when authentication succeeded)
    try:
        payload = auth.verify_access_token(tokens.access_token)
        user_id = payload["sub"]
        roles = payload["roles"]
    except Exception as e:
        print(f"Token invalid: {e}")
else:
    print("Authentication failed")

5. UserRepositoryInterface Protocol

5.1 Protocol Definition

from typing import Protocol, Optional

class UserRepositoryInterface(Protocol):
    """
    Protocol for user data access.
    
    The APPLICATION must implement this to provide
    user lookup functionality to the auth provider.
    """
    
    def find_by_username(self, username: str) -> Optional[UserInterface]:
        """
        Find user by username.
        
        Returns:
            User object if found
            None if not found
        """
        ...

5.2 Implementation Example

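from typing import Optional

from structum_lab.auth import UserRepositoryInterface
from structum_lab.database import get_database

# User is the application model from section 3.2
class DatabaseUserRepository(UserRepositoryInterface):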
    """Repository using Structum database."""
    
    def find_by_username(self, username: str) -> Optional[User]:
        db = get_database()
        
        with db.transaction() as conn:
            result = conn.execute(
                """
                SELECT id, username, email, hashed_password, roles
                FROM users
                WHERE username = :username AND is_active = true
                """,
                {"username": username}
            )
            row = result.fetchone()
        
        if not row:
            return None
        
        return User(
            id=row["id"],
            username=row["username"],
            email=row["email"],
            hashed_password=row["hashed_password"],
            roles=row["roles"]  # Assume JSON array
        )

5.3 In-Memory Implementation (Testing)

class InMemoryUserRepository(UserRepositoryInterface):
    """In-memory repository for testing."""
    
    def __init__(self, users: dict[str, User] | None = None):
        self.users = users or {}
    
    def find_by_username(self, username: str) -> Optional[User]:
        return self.users.get(username)

# Test usage
test_repo = InMemoryUserRepository({
    "john": User(
        id="1",
        username="john",
        email="john@example.com",
        hashed_password="...",
        roles=["user"]
    )
})

6. Token Data Structures

6.1 TokenPair

from dataclasses import dataclass

@dataclass
class TokenPair:
    """JWT token pair returned by authenticate()."""
    
    access_token: str       # Short-lived token for API requests
    refresh_token: str      # Long-lived token for renewal
    token_type: str         # "Bearer"
    expires_in: int         # Seconds until access_token expires

Usage:

tokens = auth.authenticate("user", "pass", repo)

# Use in HTTP Authorization header (authenticate() returns None on failure)
if tokens:
    headers = {
        "Authorization": f"{tokens.token_type} {tokens.access_token}"
    }
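On the receiving side, the header value has to be split back into scheme and token. The helpers below are a minimal sketch of that round trip; build_auth_header and extract_bearer_token are hypothetical names and are not part of structum_lab.auth.

```python
from typing import Optional


def build_auth_header(token_type: str, access_token: str) -> dict:
    """Build the Authorization header from TokenPair-style fields."""
    return {"Authorization": f"{token_type} {access_token}"}


def extract_bearer_token(header_value: str) -> Optional[str]:
    """Return the token from 'Bearer <token>', or None if malformed."""
    scheme, _, token = header_value.partition(" ")
    # The scheme name is compared case-insensitively.
    if scheme.lower() != "bearer" or not token.strip():
        return None
    return token.strip()


header = build_auth_header("Bearer", "abc123")
round_trip = extract_bearer_token(header["Authorization"])
```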

6.2 Token Payload Structure

Access Token Payload:

{
    "sub": "user_id_123",          # Subject (user ID)
    "username": "john_doe",
    "roles": ["editor", "viewer"],
    "type": "access",
    "exp": 1705234567,             # Expiration timestamp
    "iat": 1705233667              # Issued at timestamp
}

Refresh Token Payload:

{
    "sub": "user_id_123",
    "type": "refresh",
    "exp": 1705838467,
    "iat": 1705233667
}
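The exp and iat claims in the two sample payloads imply the token lifetimes. The short sketch below works them out from the values shown above; lifetime_seconds and is_expired are hypothetical helper names.

```python
# Sample payloads copied from the examples above.
access_payload = {
    "sub": "user_id_123",
    "username": "john_doe",
    "roles": ["editor", "viewer"],
    "type": "access",
    "exp": 1705234567,
    "iat": 1705233667,
}
refresh_payload = {
    "sub": "user_id_123",
    "type": "refresh",
    "exp": 1705838467,
    "iat": 1705233667,
}


def lifetime_seconds(payload: dict) -> int:
    """Seconds between issue time and expiry."""
    return payload["exp"] - payload["iat"]


def is_expired(payload: dict, now: int) -> bool:
    """True once the current Unix time reaches the exp claim."""
    return now >= payload["exp"]


access_ttl = lifetime_seconds(access_payload)    # 900 s = 15 minutes
refresh_ttl = lifetime_seconds(refresh_payload)  # 604800 s = 7 days
```

The 900-second access lifetime matches the expires_in=900 value used in the custom implementation example in section 7.2.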

7. Available Implementations

7.1 Official Plugins

Plugin           Implementation     Features
structum-auth    JWTAuthProvider    JWT tokens, Argon2 hashing, RBAC

Installation:

pip install -e packages/auth

Setup:

from structum_lab.auth import set_auth_provider
from structum_lab.plugins.auth import JWTAuthProvider

# Initialize and register
auth = JWTAuthProvider.from_config()
set_auth_provider(auth)

# Now available globally
from structum_lab.auth import get_auth
auth = get_auth()

7.2 Custom Implementation

from typing import Optional

from structum_lab.auth import AuthInterface, TokenPair, UserRepositoryInterface

class CustomAuthProvider(AuthInterface):
    """Custom auth implementation."""
    
    def authenticate(
        self,
        username: str,
        password: str,
        user_repo: UserRepositoryInterface
    ) -> Optional[TokenPair]:
        # Custom authentication logic
        user = user_repo.find_by_username(username)
        
        if not user:
            return None
        
        # Custom password verification
        if not self.verify_password(password, user.hashed_password):
            return None
        
        # Generate custom tokens
        return TokenPair(
            access_token=self.generate_access_token(user),
            refresh_token=self.generate_refresh_token(user),
            token_type="Bearer",
            expires_in=900  # 15 minutes
        )
    
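    def refresh_access_token(self, refresh_token, user_repo):
        # Custom refresh logic; must return a TokenPair or raise
        raise NotImplementedError

    def verify_access_token(self, token):
        # Custom verification logic; must return the decoded payload or raise
        raise NotImplementedError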
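The example above calls self.verify_password() without defining it. One possible shape for such a helper, using only the standard library, is sketched below. It uses PBKDF2 via hashlib rather than the Argon2 hashing that the official plugin is listed as using; the storage format ("salt$digest" in hex) and the iteration count are assumptions made for this sketch.

```python
import hashlib
import hmac
import os

ITERATIONS = 100_000  # assumed work factor for this sketch


def hash_password(password: str) -> str:
    """Return 'salt_hex$digest_hex' for the given password."""
    salt = os.urandom(16)
    digest = hashlib.pbkdf2_hmac("sha256", password.encode(), salt, ITERATIONS)
    return f"{salt.hex()}${digest.hex()}"


def verify_password(password: str, hashed_password: str) -> bool:
    """Recompute the digest with the stored salt and compare safely."""
    try:
        salt_hex, digest_hex = hashed_password.split("$")
        salt = bytes.fromhex(salt_hex)
        expected = bytes.fromhex(digest_hex)
    except ValueError:
        # Malformed stored value: treat as a failed verification.
        return False
    candidate = hashlib.pbkdf2_hmac("sha256", password.encode(), salt, ITERATIONS)
    # Constant-time comparison avoids leaking how many bytes matched.
    return hmac.compare_digest(candidate, expected)


stored = hash_password("s3cret")
```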

8. Usage Patterns

8.1 Global Singleton Pattern

from structum_lab.auth import get_auth, set_auth_provider
from structum_lab.plugins.auth import JWTAuthProvider

# Setup (once at application startup)
def setup_auth():
    auth = JWTAuthProvider.from_config()
    set_auth_provider(auth)

# Your application's UserRepositoryInterface implementation (see section 5.2)
user_repo = DatabaseUserRepository()

# Usage (anywhere in application)
def login_endpoint(username: str, password: str):
    auth = get_auth()
    tokens = auth.authenticate(username, password, user_repo)

    return tokens

8.2 Dependency Injection Pattern

from structum_lab.auth import AuthInterface

class UserService:
    """Service with injected auth dependency."""
    
    def __init__(self, auth: AuthInterface):
        self.auth = auth
    
    def login(self, username: str, password: str, user_repo):
        return self.auth.authenticate(username, password, user_repo)

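# With DI container
# Assumption: StructumContainer builds on the dependency-injector package,
# which is where providers.Singleton and providers.Factory come from
from dependency_injector import providers
from structum_lab.plugins.auth import JWTAuthProvider
from structum_lab.plugins.di import StructumContainer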

class AppContainer(StructumContainer):
    auth = providers.Singleton(JWTAuthProvider.from_config)
    
    user_service = providers.Factory(
        UserService,
        auth=auth
    )
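Constructor injection also makes UserService easy to unit-test without a container. The sketch below wires it by hand with a test double; RecordingAuth is a hypothetical class, and UserService is repeated without type imports so the snippet runs on its own.

```python
class RecordingAuth:
    """Hypothetical test double: records calls, returns a canned result."""

    def __init__(self, result):
        self.result = result
        self.calls = []

    def authenticate(self, username, password, user_repo):
        self.calls.append((username, password))
        return self.result


class UserService:
    """Service with injected auth dependency (as in the example above)."""

    def __init__(self, auth):
        self.auth = auth

    def login(self, username: str, password: str, user_repo):
        return self.auth.authenticate(username, password, user_repo)


fake_auth = RecordingAuth(result="canned-tokens")
service = UserService(auth=fake_auth)
login_result = service.login("john", "pw", user_repo=None)
```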

8.3 FastAPI Integration

from fastapi import FastAPI, Depends, HTTPException
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from structum_lab.auth import get_auth

app = FastAPI()
security = HTTPBearer()

# User and user_repo come from your application (see sections 3.2 and 5.2)
async def get_current_user(
    credentials: HTTPAuthorizationCredentials = Depends(security)
) -> User:
    """Extract user from JWT token."""
    auth = get_auth()

    try:
        payload = auth.verify_access_token(credentials.credentials)
        user_id = payload["sub"]
    except Exception as e:
        raise HTTPException(status_code=401, detail=f"Invalid token: {e}") from e

    # Load user from repository. find_by_id() is an application-specific
    # method; UserRepositoryInterface itself only requires find_by_username().
    user = user_repo.find_by_id(user_id)

    if not user:
        raise HTTPException(status_code=401, detail="Invalid token")

    return user

@app.get("/protected")
async def protected_route(current_user: User = Depends(get_current_user)):
    return {"message": f"Hello {current_user.username}!"}

9. API Reference

9.1 Global Functions

def get_auth() -> AuthInterface:
    """
    Get global auth provider singleton.
    
    Raises:
        RuntimeError if set_auth_provider() not called
    """
    ...

def set_auth_provider(provider: AuthInterface) -> None:
    """
    Set global auth provider (call once at startup).
    
    Args:
        provider: Auth implementation instance
    """
    ...
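The behaviour documented above (one process-wide provider, with a RuntimeError when none has been registered) could be implemented along the following lines. This is a guess at the general shape, not the actual structum_lab source; the _provider variable and the error message are assumptions.

```python
from typing import Any, Optional

# Module-level slot holding the registered provider (assumed design).
_provider: Optional[Any] = None


def set_auth_provider(provider: Any) -> None:
    """Register the provider; intended to be called once at startup."""
    global _provider
    _provider = provider


def get_auth() -> Any:
    """Return the registered provider or fail loudly if none is set."""
    if _provider is None:
        raise RuntimeError(
            "No auth provider registered; call set_auth_provider() first"
        )
    return _provider
```

Failing with an explicit error, rather than returning None, surfaces a missing startup step at the first call site instead of as an AttributeError later on.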

9.2 Protocols

class UserInterface(Protocol):
    id: str
    username: str
    hashed_password: str
    roles: list[str]
    
    def has_permission(self, permission: str) -> bool:
        ...

class AuthInterface(Protocol):
    def authenticate(
        self,
        username: str,
        password: str,
        user_repo: UserRepositoryInterface
    ) -> Optional[TokenPair]:
        ...
    
    def refresh_access_token(
        self,
        refresh_token: str,
        user_repo: UserRepositoryInterface
    ) -> TokenPair:
        ...
    
    def verify_access_token(self, token: str) -> dict:
        ...

class UserRepositoryInterface(Protocol):
    def find_by_username(self, username: str) -> Optional[UserInterface]:
        ...

9.3 Data Structures

@dataclass
class TokenPair:
    access_token: str
    refresh_token: str
    token_type: str
    expires_in: int

See Also