Structum Auth (structum-auth)
Structum Auth handles JWT authentication, secure password hashing (Argon2), and role-based access control (RBAC).
| Feature | Status | Version |
|---|---|---|
| Status | Alpha | 0.1.0 |
| Namespace | | |
| Dependencies | | |
1. What Is the Auth Plugin
structum-auth is a production-ready authentication system that brings enterprise-grade security to Structum.
1.1 Problem Solved
Before (without the plugin):
# ❌ Plaintext passwords
users_db = {"john": "password123"}
# ❌ No token expiration
# ❌ No secure hashing
# ❌ No RBAC
After (with the plugin):
# ✅ Argon2 password hashing
# ✅ JWT with access + refresh tokens
# ✅ Built-in RBAC
# ✅ Type-safe with Pydantic
auth = JWTAuthProvider.from_config()
tokens = auth.authenticate("john", "password123", user_repo)
1.2 Key Features
| Feature | Description |
|---|---|
| JWT Tokens | Short-lived access token + long-lived refresh token |
| Argon2 Hashing | State-of-the-art password hashing (memory-hard, unlike bcrypt) |
| RBAC | Role-Based Access Control with permission checking |
| Type Safety | Pydantic validation on all models |
| Config Integration | Setup via Dynaconf, with secrets isolated in a separate file |
| Extensible | Protocol-based, so custom implementations can be plugged in |
2. Core Concepts
2.1 JWT Token System
The plugin uses a dual-token system to balance security and usability:
┌─────────────────────────────────────────┐
│ Access Token (short-lived: 15 min)      │
│ - Used for API requests                 │
│ - Contains user ID + roles              │
│ - Not renewable                         │
└─────────────────────────────────────────┘
┌─────────────────────────────────────────┐
│ Refresh Token (long-lived: 7 days)      │
│ - Used only for renewal                 │
│ - Stored securely (HttpOnly cookie)     │
│ - Revocable                             │
└─────────────────────────────────────────┘
Workflow:
1. Login → returns an access token and a refresh token
2. The client uses the access token for API calls
3. Access token expires → the client uses the refresh token to obtain a new access token
4. Refresh token expires → a new login is required
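The workflow above can be sketched with the standard library alone. This is a minimal illustration of the dual-token idea, not the plugin's implementation: the helper names (`encode_token`, `decode_token`, `issue_pair`, `refresh_access`) are hypothetical, the tokens are hand-rolled HS256 JWTs, and the caller is assumed to supply the current time and the user's roles.

```python
import base64
import hashlib
import hmac
import json

ACCESS_TTL_SECONDS = 15 * 60          # 15 minutes
REFRESH_TTL_SECONDS = 7 * 24 * 3600   # 7 days


def _b64url(data: bytes) -> str:
    """Base64url-encode without padding, as compact JWTs do."""
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def _b64url_decode(text: str) -> bytes:
    return base64.urlsafe_b64decode(text + "=" * (-len(text) % 4))


def encode_token(payload: dict, secret: str) -> str:
    """Sign a payload as a compact HS256 JWT."""
    header = {"alg": "HS256", "typ": "JWT"}
    signing_input = ".".join(
        _b64url(json.dumps(part, separators=(",", ":")).encode("utf-8"))
        for part in (header, payload)
    )
    signature = hmac.new(secret.encode(), signing_input.encode(), hashlib.sha256).digest()
    return f"{signing_input}.{_b64url(signature)}"


def decode_token(token: str, secret: str, expected_type: str, now: int) -> dict:
    """Check signature, token type, and expiry, then return the payload."""
    signing_input, _, signature = token.rpartition(".")
    expected = hmac.new(secret.encode(), signing_input.encode(), hashlib.sha256).digest()
    if not hmac.compare_digest(_b64url_decode(signature), expected):
        raise ValueError("invalid signature")
    payload = json.loads(_b64url_decode(signing_input.split(".")[1]))
    if payload.get("type") != expected_type:
        raise ValueError("wrong token type")
    if now >= payload["exp"]:
        raise ValueError("token expired")
    return payload


def issue_pair(user_id, roles, access_secret, refresh_secret, now):
    """Step 1: login returns an access token and a refresh token."""
    access = encode_token(
        {"sub": user_id, "roles": roles, "type": "access",
         "iat": now, "exp": now + ACCESS_TTL_SECONDS},
        access_secret,
    )
    refresh = encode_token(
        {"sub": user_id, "type": "refresh",
         "iat": now, "exp": now + REFRESH_TTL_SECONDS},
        refresh_secret,
    )
    return access, refresh


def refresh_access(refresh_token, roles, access_secret, refresh_secret, now):
    """Step 3: trade a valid refresh token for a new access token."""
    payload = decode_token(refresh_token, refresh_secret, "refresh", now)
    return issue_pair(payload["sub"], roles, access_secret, refresh_secret, now)[0]
```

Signing the two token types with different secrets means a refresh token presented as an access token fails the signature check before its `type` claim is even read.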
2.2 Argon2 Password Hashing
Why Argon2 instead of bcrypt?
| Feature | Argon2 | bcrypt |
|---|---|---|
| Memory-hard | ✅ Yes (resistant to GPU attacks) | ❌ No |
| Configurability | Time + memory + parallelism | Cost factor (iterations) only |
| PHC winner | ✅ Password Hashing Competition, 2015 | Older design (1999) |
| Performance | Tunable along all three parameters | Tunable only through the cost factor |
2.3 RBAC Model
User
├── roles: list[str]              # ["admin", "editor"]
└── has_permission(perm: str)     # Business logic
# Example permission check
if user.has_permission("delete_posts"):
    delete_post()
3. Quick Start (5 Minutes)
Step 1: Installation
pip install -e packages/auth
Step 2: Configuration
File: config/app/auth.toml
[default]
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 15
REFRESH_TOKEN_EXPIRE_DAYS = 7
File: config/.secrets.toml (⚠️ add it to .gitignore)
[auth]
SECRET_KEY = "your-256-bit-secret-key-change-this"
REFRESH_SECRET_KEY = "different-256-bit-refresh-key"
Step 3: Implement the User Model
from dataclasses import dataclass
from structum_lab.auth import UserInterface

@dataclass
class User(UserInterface):
    """User model for your application."""
    id: str
    username: str
    email: str
    hashed_password: str
    roles: list[str]
    is_active: bool = True

    def has_permission(self, permission: str) -> bool:
        """
        Custom permission-checking logic.
        Example: admin has every permission.
        """
        if "admin" in self.roles:
            return True
        # Custom role-to-permission mapping
        role_permissions = {
            "editor": ["edit_posts", "view_posts"],
            "viewer": ["view_posts"],
        }
        for role in self.roles:
            if permission in role_permissions.get(role, []):
                return True
        return False
Step 4: Implement the Repository
from structum_lab.auth import UserRepositoryInterface
from typing import Optional

class UserRepository(UserRepositoryInterface):
    """Repository for user database access."""

    def __init__(self, db_connection):
        self.db = db_connection

    def find_by_username(self, username: str) -> Optional[User]:
        """Find a user by username."""
        # Example with SQL (named parameter, never string formatting)
        result = self.db.execute(
            "SELECT * FROM users WHERE username = :username",
            {"username": username}
        )
        row = result.fetchone()
        if not row:
            return None
        return User(
            id=row["id"],
            username=row["username"],
            email=row["email"],
            hashed_password=row["hashed_password"],
            # Assumes comma-separated roles; skip empty entries so that
            # an empty column yields [] rather than [""]
            roles=[r for r in row["roles"].split(",") if r],
            is_active=row["is_active"]
        )
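To try the repository pattern without a real database, the same lookup can be run against an in-memory SQLite database. The sketch below is an illustration under assumptions: the `users` table layout, the `SqliteUserRepository` name, and the stand-in `User` dataclass are hypothetical and only mirror the fields used in the steps above.

```python
import sqlite3
from dataclasses import dataclass
from typing import Optional


@dataclass
class User:
    """Stand-in user model with the same fields as the quick-start example."""
    id: str
    username: str
    email: str
    hashed_password: str
    roles: list
    is_active: bool = True


class SqliteUserRepository:
    """Repository backed by a sqlite3 connection (hypothetical example)."""

    def __init__(self, conn: sqlite3.Connection):
        conn.row_factory = sqlite3.Row  # allows row["column"] access
        self.conn = conn

    def find_by_username(self, username: str) -> Optional[User]:
        row = self.conn.execute(
            "SELECT * FROM users WHERE username = :username",
            {"username": username},
        ).fetchone()
        if row is None:
            return None
        return User(
            id=row["id"],
            username=row["username"],
            email=row["email"],
            hashed_password=row["hashed_password"],
            roles=[r for r in row["roles"].split(",") if r],
            is_active=bool(row["is_active"]),
        )


# Assumed schema: roles stored as a comma-separated string
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE users (id TEXT PRIMARY KEY, username TEXT UNIQUE, email TEXT,"
    " hashed_password TEXT, roles TEXT, is_active INTEGER)"
)
conn.executemany(
    "INSERT INTO users VALUES (?, ?, ?, ?, ?, ?)",
    [
        ("1", "john", "john@example.com", "$argon2id$...", "editor,viewer", 1),
        ("2", "guest", "guest@example.com", "$argon2id$...", "", 1),
    ],
)
repo = SqliteUserRepository(conn)
```

Calling `repo.find_by_username("john")` returns a `User` whose `roles` is a list, while an unknown username returns `None`, which is the contract the auth provider relies on.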
Step 5: Authenticate
from structum_lab.plugins.auth import JWTAuthProvider
from structum_lab.config import set_config_provider
from structum_lab.plugins.dynaconf import DynaconfConfigProvider

# Set up the config provider
provider = DynaconfConfigProvider(root_path=".")
provider.auto_discover()
set_config_provider(provider)

# Create the auth provider
auth = JWTAuthProvider.from_config()

# Create the repository (db_connection is your application's database connection)
user_repo = UserRepository(db_connection)

# Login
tokens = auth.authenticate("john_doe", "password123", user_repo)
if tokens:
    print("✅ Login successful")
    print(f"Access Token: {tokens.access_token}")
    print(f"Refresh Token: {tokens.refresh_token}")
    print(f"Expires in: {tokens.expires_in} seconds")
else:
    print("❌ Invalid credentials")
4. Implementing the User Model
4.1 UserInterface Protocol
The plugin requires your User model to implement UserInterface:
from typing import Protocol

class UserInterface(Protocol):
    """Protocol that every User model must satisfy."""
    id: str
    username: str
    hashed_password: str
    roles: list[str]

    def has_permission(self, permission: str) -> bool:
        """Check whether the user has a specific permission."""
        ...
4.2 Example with a Dataclass
from dataclasses import dataclass, field
from datetime import datetime, timezone
from structum_lab.auth import UserInterface

@dataclass
class User(UserInterface):
    id: str
    username: str
    email: str
    hashed_password: str
    roles: list[str] = field(default_factory=list)
    is_active: bool = True
    # Timezone-aware timestamp (datetime.utcnow is deprecated since Python 3.12)
    created_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
    last_login: datetime | None = None

    def has_permission(self, permission: str) -> bool:
        # Custom implementation
        return "admin" in self.roles
4.3 Example with Pydantic
from pydantic import BaseModel, Field
from datetime import datetime, timezone

# UserInterface is a Protocol, so this model satisfies it structurally.
# Inheriting from both BaseModel and a Protocol raises a metaclass conflict.
class User(BaseModel):
    """User model with Pydantic validation."""
    id: str = Field(..., min_length=1)
    username: str = Field(..., min_length=3, max_length=50)
    email: str = Field(..., pattern=r'^[\w\.-]+@[\w\.-]+\.\w+$')
    hashed_password: str
    roles: list[str] = Field(default_factory=list)
    is_active: bool = True
    created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))

    def has_permission(self, permission: str) -> bool:
        return "admin" in self.roles
4.4 Example with the SQLAlchemy ORM
from datetime import datetime, timezone
from sqlalchemy import Column, String, Boolean, DateTime, JSON
from sqlalchemy.orm import declarative_base  # moved here in SQLAlchemy 1.4

Base = declarative_base()

# UserInterface is a Protocol, so this model satisfies it structurally.
# Inheriting from both Base and a Protocol raises a metaclass conflict.
class User(Base):
    __tablename__ = "users"
    id = Column(String, primary_key=True)
    username = Column(String(50), unique=True, nullable=False)
    email = Column(String(255), unique=True, nullable=False)
    hashed_password = Column(String(255), nullable=False)
    roles = Column(JSON, default=list)
    is_active = Column(Boolean, default=True)
    created_at = Column(DateTime(timezone=True), default=lambda: datetime.now(timezone.utc))

    def has_permission(self, permission: str) -> bool:
        return "admin" in (self.roles or [])
5. Repository Pattern
5.1 UserRepositoryInterface
from typing import Protocol, Optional

class UserRepositoryInterface(Protocol):
    """Protocol that user repositories must implement."""

    def find_by_username(self, username: str) -> Optional[UserInterface]:
        """Find a user by username. Returns None if not found."""
        ...
5.2 Database Implementation
from typing import Optional
from structum_lab.plugins.database import get_database

class DatabaseUserRepository(UserRepositoryInterface):
    """Repository that uses the Structum Database plugin."""

    def find_by_username(self, username: str) -> Optional[User]:
        db = get_database()
        with db.transaction() as conn:
            result = conn.execute(
                "SELECT * FROM users WHERE username = :u AND is_active = true",
                {"u": username}
            )
            row = result.fetchone()
            if not row:
                return None
            return User(
                id=row["id"],
                username=row["username"],
                email=row["email"],
                hashed_password=row["hashed_password"],
                roles=row["roles"]  # Assumes the column already holds a list
            )
5.3 In-Memory Implementation (Testing)
class InMemoryUserRepository(UserRepositoryInterface):
    """In-memory repository for testing."""

    def __init__(self):
        # Pre-populate with test users
        self.users = {
            "admin": User(
                id="1",
                username="admin",
                email="admin@example.com",
                hashed_password="$argon2id$...",  # Precomputed hash
                roles=["admin"]
            ),
            "john": User(
                id="2",
                username="john",
                email="john@example.com",
                hashed_password="$argon2id$...",
                roles=["editor"]
            )
        }

    def find_by_username(self, username: str) -> Optional[User]:
        return self.users.get(username)
6. JWT Token System
6.1 Token Structure
Access Token Payload:
{
  "sub": "user_id_123",
  "username": "john_doe",
  "roles": ["editor", "viewer"],
  "type": "access",
  "exp": 1705234567,
  "iat": 1705233667
}
Refresh Token Payload:
{
  "sub": "user_id_123",
  "type": "refresh",
  "exp": 1705838467,
  "iat": 1705233667
}
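The `iat` and `exp` claims in the payloads above are Unix timestamps, and their difference is the configured lifetime: 900 seconds (15 minutes) for the access token and 604800 seconds (7 days) for the refresh token. A small sketch, using a hypothetical `build_payloads` helper rather than the plugin's own code, reproduces those numbers:

```python
from datetime import datetime, timedelta, timezone


def build_payloads(user_id, username, roles, issued_at,
                   access_minutes=15, refresh_days=7):
    """Build access and refresh payloads with iat/exp as Unix timestamps."""
    iat = int(issued_at.timestamp())
    access = {
        "sub": user_id,
        "username": username,
        "roles": roles,
        "type": "access",
        "exp": int((issued_at + timedelta(minutes=access_minutes)).timestamp()),
        "iat": iat,
    }
    refresh = {
        "sub": user_id,
        "type": "refresh",
        "exp": int((issued_at + timedelta(days=refresh_days)).timestamp()),
        "iat": iat,
    }
    return access, refresh


# Same issue time as the example payloads
issued_at = datetime.fromtimestamp(1705233667, tz=timezone.utc)
access, refresh = build_payloads(
    "user_id_123", "john_doe", ["editor", "viewer"], issued_at
)
print(access["exp"] - access["iat"])    # 900
print(refresh["exp"] - refresh["iat"])  # 604800
```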
6.2 Token Generation
auth = JWTAuthProvider.from_config()

# Authenticate and get tokens
tokens = auth.authenticate("john", "password", user_repo)

# Tokens object
tokens.access_token   # str: JWT access token
tokens.refresh_token  # str: JWT refresh token
tokens.token_type     # str: "Bearer"
tokens.expires_in     # int: seconds until the access token expires
6.3 Token Verification
# Verify an access token
try:
    payload = auth.verify_access_token(access_token)
    user_id = payload["sub"]
    roles = payload["roles"]
except Exception as e:
    # Token invalid or expired
    print(f"Invalid token: {e}")
6.4 Token Refresh
# Use the refresh token to obtain a new access token
try:
    new_tokens = auth.refresh_access_token(refresh_token, user_repo)
    new_access = new_tokens.access_token
except Exception as e:
    # Refresh token invalid or expired
    print(f"Refresh failed: {e}")
7. Password Hashing
7.1 Hash Password
from structum_lab.plugins.auth import hash_password, verify_password

# During registration
plain_password = "SuperSecret123!"
hashed = hash_password(plain_password)

# Store in the database
user.hashed_password = hashed
Output:
$argon2id$v=19$m=65536,t=3,p=4$randomsalt$hashedvalue
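The encoded hash is self-describing: it carries the Argon2 variant, the version, the cost parameters, the salt, and the digest, separated by `$`. The sketch below splits the example string into its fields. `parse_argon2_hash` is a hypothetical helper written for illustration, not part of the plugin or of argon2-cffi; in a real hash the salt and digest are Base64 strings rather than the placeholders shown here.

```python
def parse_argon2_hash(encoded: str) -> dict:
    """Split an encoded Argon2 hash into its labelled fields."""
    # The leading "$" produces an empty first element
    _, variant, version, params, salt, digest = encoded.split("$")
    costs = dict(item.split("=") for item in params.split(","))
    return {
        "variant": variant,
        "version": int(version.split("=")[1]),
        "memory_kib": int(costs["m"]),   # memory cost is expressed in KiB
        "time_cost": int(costs["t"]),    # number of passes
        "parallelism": int(costs["p"]),  # number of lanes
        "salt": salt,
        "digest": digest,
    }


info = parse_argon2_hash("$argon2id$v=19$m=65536,t=3,p=4$randomsalt$hashedvalue")
print(info["variant"])             # argon2id
print(info["memory_kib"] // 1024)  # 64 (MiB)
```

Because the parameters travel with the hash, a verifier can check passwords hashed under older settings even after the defaults change.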
7.2 Verify Password
# During login
stored_hash = user.hashed_password
input_password = "SuperSecret123!"
if verify_password(input_password, stored_hash):
    print("✅ Password correct")
else:
    print("❌ Password incorrect")
7.3 Custom Argon2 Parameters
# Advanced: custom hashing parameters
from argon2 import PasswordHasher

ph = PasswordHasher(
    time_cost=3,        # Iterations (passes over memory)
    memory_cost=65536,  # 64 MiB (the value is in KiB)
    parallelism=4,      # Parallel lanes
    hash_len=32,        # Output length in bytes
    salt_len=16         # Salt length in bytes
)
hashed = ph.hash("password")
# Returns True on success; raises argon2.exceptions.VerifyMismatchError on mismatch
verified = ph.verify(hashed, "password")
8. RBAC (Role-Based Access Control)
8.1 Role Checker Utility
from structum_lab.plugins.auth import RoleChecker

# Check a single role
if RoleChecker.has_role(user, "admin"):
    print("User is admin")

# Check multiple roles (OR logic)
if RoleChecker.has_any_role(user, ["admin", "moderator"]):
    print("User is admin OR moderator")

# Check all roles (AND logic)
if RoleChecker.has_all_roles(user, ["editor", "reviewer"]):
    print("User is editor AND reviewer")
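The OR and AND semantics of these checks map directly onto Python's `any` and `all`. The following is a guess at how such a checker could be written, not the plugin's source: `SketchRoleChecker` and `DemoUser` are hypothetical names used only for illustration.

```python
from dataclasses import dataclass, field


@dataclass
class DemoUser:
    """Minimal stand-in exposing only the roles attribute."""
    roles: list = field(default_factory=list)


class SketchRoleChecker:
    """Illustrative role checks built on membership tests."""

    @staticmethod
    def has_role(user, role: str) -> bool:
        return role in user.roles

    @staticmethod
    def has_any_role(user, roles) -> bool:
        # OR logic: at least one requested role is present
        return any(role in user.roles for role in roles)

    @staticmethod
    def has_all_roles(user, roles) -> bool:
        # AND logic: every requested role is present
        return all(role in user.roles for role in roles)


editor = DemoUser(roles=["editor", "reviewer"])
print(SketchRoleChecker.has_any_role(editor, ["admin", "editor"]))   # True
print(SketchRoleChecker.has_all_roles(editor, ["admin", "editor"]))  # False
```

One edge case worth knowing with this approach: an empty list of required roles makes `has_all_roles` return `True` and `has_any_role` return `False`, so callers may want to reject empty lists explicitly.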
8.2 Permission Decorator
from structum_lab.plugins.auth import require_permission

@require_permission("delete_posts")
def delete_post(post_id: str, current_user: User):
    """Only users with the delete_posts permission can call this function."""
    # Delete logic
    pass

# Usage
try:
    delete_post("post_123", current_user)
except PermissionError:
    print("User doesn't have delete_posts permission")
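A decorator of this kind can be written in a few lines. The sketch below shows one possible shape, assuming the decorated function receives the user in a parameter named `current_user`; `require_permission_sketch` and `DemoUser` are hypothetical names, and the plugin's own decorator may work differently.

```python
import functools
import inspect
from dataclasses import dataclass, field


@dataclass
class DemoUser:
    """Minimal stand-in user: admins hold every permission."""
    roles: list = field(default_factory=list)

    def has_permission(self, permission: str) -> bool:
        return "admin" in self.roles


def require_permission_sketch(permission: str):
    """Decorator factory that checks current_user before calling the function."""
    def decorator(func):
        signature = inspect.signature(func)

        @functools.wraps(func)  # keep the original name and docstring
        def wrapper(*args, **kwargs):
            # Resolve current_user whether it was passed positionally or by keyword
            bound = signature.bind(*args, **kwargs)
            user = bound.arguments.get("current_user")
            if user is None or not user.has_permission(permission):
                raise PermissionError(f"Permission '{permission}' required")
            return func(*args, **kwargs)
        return wrapper
    return decorator


@require_permission_sketch("delete_posts")
def delete_post(post_id: str, current_user: DemoUser) -> str:
    return f"deleted {post_id}"


print(delete_post("post_123", DemoUser(roles=["admin"])))  # deleted post_123
```

Binding the arguments against the function's signature avoids assuming a fixed argument position for the user.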
8.3 Advanced Permission System
from fnmatch import fnmatchcase

class User(UserInterface):
    # ... other fields...
    ROLE_PERMISSIONS = {
        "admin": ["*"],  # Wildcard: every permission
        "editor": [
            "edit_posts",
            "create_posts",
            "delete_own_posts",
            "view_posts"
        ],
        "viewer": ["view_posts"]
    }

    def has_permission(self, permission: str) -> bool:
        """Check permissions with wildcard and glob support."""
        for role in self.roles:
            perms = self.ROLE_PERMISSIONS.get(role, [])
            # Wildcard check
            if "*" in perms:
                return True
            # Exact match
            if permission in perms:
                return True
            # Glob pattern match (e.g., "delete_*"), case-sensitive on every OS
            if any(fnmatchcase(permission, p) for p in perms):
                return True
        return False
9. Configuration
9.1 Complete Configuration File
File: config/app/auth.toml
[default]
# JWT Algorithm
ALGORITHM = "HS256" # HS256, HS384, HS512, RS256, RS384, RS512
# Token Expiration
ACCESS_TOKEN_EXPIRE_MINUTES = 15
REFRESH_TOKEN_EXPIRE_DAYS = 7
# Argon2 Parameters (optional; defaults are used if omitted)
[default.argon2]
time_cost = 3
memory_cost = 65536 # 64 MiB (value in KiB)
parallelism = 4
hash_len = 32
salt_len = 16
File: config/.secrets.toml
[auth]
# ⚠️ CRITICAL: Keep these secret!
SECRET_KEY = "your-256-bit-secret-key-here"
REFRESH_SECRET_KEY = "different-256-bit-secret-key-for-refresh"
9.2 Generate Secret Keys
# Generate a secure random key (32 random bytes, printed as a 43-character URL-safe string)
python -c "import secrets; print(secrets.token_urlsafe(32))"
# Generate a second, different key for refresh tokens
python -c "import secrets; print(secrets.token_urlsafe(32))"
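The same keys can be generated from a Python script. `secrets.token_urlsafe(32)` draws 32 random bytes (256 bits) and encodes them as Base64url without padding, which yields a 43-character string. The `generate_key` wrapper below is a hypothetical convenience, not part of the plugin.

```python
import secrets


def generate_key(num_bytes: int = 32) -> str:
    """Return a URL-safe random key carrying num_bytes of entropy."""
    return secrets.token_urlsafe(num_bytes)


# Use independent keys for access and refresh tokens
secret_key = generate_key()
refresh_secret_key = generate_key()
print(len(secret_key))  # 43
```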
9.3 Environment Variable Override
# Override in production
export STRUCTUM_AUTH__SECRET_KEY="production-secret-key"
export STRUCTUM_AUTH__REFRESH_SECRET_KEY="production-refresh-key"
export STRUCTUM_AUTH__ACCESS_TOKEN_EXPIRE_MINUTES=30
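The double underscore in these variable names separates nesting levels, so `STRUCTUM_AUTH__SECRET_KEY` targets the `SECRET_KEY` entry of the `AUTH` section. The sketch below approximates that mapping to make the naming convention concrete. It is not Dynaconf's actual parser: `nested_overrides` is a hypothetical helper, and the value handling (digits become integers, everything else stays a string) is a simplification.

```python
def nested_overrides(environ: dict, prefix: str = "STRUCTUM") -> dict:
    """Turn PREFIX_SECTION__KEY variables into a nested dictionary."""
    result = {}
    for name, raw in environ.items():
        if not name.startswith(prefix + "_"):
            continue  # unrelated variable
        path = name[len(prefix) + 1:].split("__")
        node = result
        for part in path[:-1]:
            node = node.setdefault(part, {})
        # Simplified casting: plain digits become int
        node[path[-1]] = int(raw) if raw.isdigit() else raw
    return result


overrides = nested_overrides({
    "STRUCTUM_AUTH__SECRET_KEY": "production-secret-key",
    "STRUCTUM_AUTH__ACCESS_TOKEN_EXPIRE_MINUTES": "30",
    "PATH": "/usr/bin",
})
print(overrides["AUTH"]["ACCESS_TOKEN_EXPIRE_MINUTES"])  # 30
```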
10. FastAPI Integration
10.1 Middleware Setup
from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from structum_lab.plugins.auth import JWTAuthProvider
from structum_lab.plugins.database import get_database

app = FastAPI()
security = HTTPBearer()
auth = JWTAuthProvider.from_config()

async def get_current_user(
    credentials: HTTPAuthorizationCredentials = Depends(security)
) -> User:
    """
    Dependency that extracts the current user from the JWT.
    Use it in routes as: current_user: User = Depends(get_current_user)
    """
    token = credentials.credentials
    try:
        payload = auth.verify_access_token(token)
    except Exception:
        # Token invalid or expired; do not echo internal errors to the client
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Could not validate credentials"
        )
    # Load the user from the database. The repository protocol only defines
    # find_by_username, so look the user up by the username claim.
    user_repo = UserRepository(get_database())
    user = user_repo.find_by_username(payload["username"])
    if not user or not user.is_active:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid authentication credentials"
        )
    return user

@app.get("/protected")
async def protected_route(current_user: User = Depends(get_current_user)):
    """Protected route: requires authentication."""
    return {"message": f"Hello {current_user.username}!"}
10.2 Login Endpoint
from pydantic import BaseModel

class LoginRequest(BaseModel):
    username: str
    password: str

class TokenResponse(BaseModel):
    access_token: str
    refresh_token: str
    token_type: str
    expires_in: int

@app.post("/auth/login", response_model=TokenResponse)
async def login(credentials: LoginRequest):
    """Login endpoint."""
    user_repo = UserRepository(get_database())
    tokens = auth.authenticate(
        credentials.username,
        credentials.password,
        user_repo
    )
    if not tokens:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password"
        )
    return TokenResponse(
        access_token=tokens.access_token,
        refresh_token=tokens.refresh_token,
        token_type=tokens.token_type,
        expires_in=tokens.expires_in
    )
10.3 Refresh Endpoint
class RefreshRequest(BaseModel):
    refresh_token: str

@app.post("/auth/refresh", response_model=TokenResponse)
async def refresh(request: RefreshRequest):
    """Refresh access token using refresh token."""
    user_repo = UserRepository(get_database())
    try:
        new_tokens = auth.refresh_access_token(
            request.refresh_token,
            user_repo
        )
        return TokenResponse(
            access_token=new_tokens.access_token,
            refresh_token=new_tokens.refresh_token,
            token_type=new_tokens.token_type,
            expires_in=new_tokens.expires_in
        )
    except Exception as e:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=f"Invalid refresh token: {e}"
        )
10.4 Permission Guard
def require_permission(permission: str):
    """Dependency factory for permission checking.

    A dependency is used instead of a wrapping decorator because FastAPI
    builds request parameters from the endpoint signature, and a generic
    (*args, **kwargs) wrapper would hide it.
    """
    async def checker(current_user: User = Depends(get_current_user)) -> User:
        if not current_user.has_permission(permission):
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail=f"Permission '{permission}' required"
            )
        return current_user
    return checker

@app.delete("/posts/{post_id}")
async def delete_post(
    post_id: str,
    current_user: User = Depends(require_permission("delete_posts"))
):
    """Only users with the delete_posts permission can delete posts."""
    # Delete logic
    return {"status": "deleted"}
11. Testing
11.1 Test Setup
import pytest
from structum_lab.plugins.auth import JWTAuthProvider, hash_password

@pytest.fixture
def auth_provider():
    """Auth provider configured for testing."""
    # Pass test settings directly, matching the constructor in the API Reference
    return JWTAuthProvider(
        secret_key="test-secret-key",
        refresh_secret_key="test-refresh-secret",
        algorithm="HS256",
        access_token_expire_minutes=15,
        refresh_token_expire_days=7
    )

@pytest.fixture
def test_user():
    """Mock test user."""
    return User(
        id="test_user_1",
        username="testuser",
        email="test@example.com",
        hashed_password=hash_password("testpass123"),
        roles=["user"]
    )

@pytest.fixture
def test_user_repo(test_user):
    """Mock repository."""
    # InMemoryUserRepository takes no constructor arguments,
    # so replace its pre-populated users after creation
    repo = InMemoryUserRepository()
    repo.users = {"testuser": test_user}
    return repo
11.2 Test Authentication
def test_successful_login(auth_provider, test_user_repo):
    """Login with correct credentials."""
    tokens = auth_provider.authenticate("testuser", "testpass123", test_user_repo)
    assert tokens is not None
    assert tokens.access_token
    assert tokens.refresh_token
    assert tokens.token_type == "Bearer"
    assert tokens.expires_in > 0

def test_failed_login_wrong_password(auth_provider, test_user_repo):
    """Login with the wrong password."""
    tokens = auth_provider.authenticate("testuser", "wrongpass", test_user_repo)
    assert tokens is None

def test_failed_login_nonexistent_user(auth_provider, test_user_repo):
    """Login with a nonexistent user."""
    tokens = auth_provider.authenticate("nobody", "anypass", test_user_repo)
    assert tokens is None
11.3 Test Token Verification
import time

def test_verify_valid_token(auth_provider, test_user_repo):
    """Verify a valid token."""
    tokens = auth_provider.authenticate("testuser", "testpass123", test_user_repo)
    payload = auth_provider.verify_access_token(tokens.access_token)
    assert payload["sub"] == "test_user_1"
    assert payload["username"] == "testuser"
    assert "user" in payload["roles"]

def test_verify_invalid_token(auth_provider):
    """Verify an invalid token."""
    with pytest.raises(Exception):
        auth_provider.verify_access_token("invalid.token.here")

def test_verify_expired_token(auth_provider, test_user_repo):
    """Verify an expired token."""
    # Create a provider whose access tokens expire immediately
    fast_auth = JWTAuthProvider(
        secret_key="test-secret-key",
        refresh_secret_key="test-refresh-secret",
        access_token_expire_minutes=0
    )
    tokens = fast_auth.authenticate("testuser", "testpass123", test_user_repo)
    time.sleep(1)  # Wait for expiration
    with pytest.raises(Exception):
        fast_auth.verify_access_token(tokens.access_token)
11.4 Test Password Hashing
from structum_lab.plugins.auth import hash_password, verify_password

def test_password_hashing():
    """Hash and verify a password."""
    password = "SuperSecret123!"
    # Hash
    hashed = hash_password(password)
    assert hashed != password
    assert hashed.startswith("$argon2id$")
    # Verify correct password
    assert verify_password(password, hashed)
    # Verify wrong password
    assert not verify_password("WrongPassword", hashed)
12. Security Best Practices
12.1 Secret Key Management
❌ NEVER do this:
# Hard-coded secrets
SECRET_KEY = "my-secret-123" # ❌ WRONG
✅ Correct approach:
# config/.secrets.toml (in .gitignore!)
[auth]
SECRET_KEY = "random-256-bit-key"
12.2 Password Requirements
# Pydantic v2 API (on Pydantic v1, use @validator instead of @field_validator)
from pydantic import BaseModel, field_validator

class UserRegistration(BaseModel):
    username: str
    password: str

    @field_validator("password")
    @classmethod
    def validate_password(cls, v: str) -> str:
        """Enforce strong passwords."""
        if len(v) < 12:
            raise ValueError("Password must be at least 12 characters")
        if not any(c.isupper() for c in v):
            raise ValueError("Password must contain uppercase letter")
        if not any(c.islower() for c in v):
            raise ValueError("Password must contain lowercase letter")
        if not any(c.isdigit() for c in v):
            raise ValueError("Password must contain digit")
        if not any(c in "!@#$%^&*()_+-=[]{}|;:,.<>?" for c in v):
            raise ValueError("Password must contain special character")
        return v
12.3 Token Storage
Client-side (browser):
✅ Access token: in memory or sessionStorage (NOT localStorage)
✅ Refresh token: HttpOnly Secure cookie
Server-side:
✅ Secrets in environment variables or a vault
❌ Never in version control
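To illustrate the refresh-token recommendation, the sketch below builds a `Set-Cookie` header value with the standard library. The cookie name `refresh_token`, the `SameSite=Strict` policy, and the `/auth/refresh` path restriction are assumptions chosen for the example, and `refresh_cookie_header` is a hypothetical helper.

```python
from http.cookies import SimpleCookie


def refresh_cookie_header(refresh_token: str,
                          max_age_seconds: int = 7 * 24 * 3600) -> str:
    """Build a Set-Cookie value that keeps the refresh token away from JavaScript."""
    cookie = SimpleCookie()
    cookie["refresh_token"] = refresh_token
    morsel = cookie["refresh_token"]
    morsel["httponly"] = True         # not readable from document.cookie
    morsel["secure"] = True           # only sent over HTTPS
    morsel["samesite"] = "Strict"     # not sent on cross-site requests
    morsel["path"] = "/auth/refresh"  # only sent to the refresh endpoint
    morsel["max-age"] = max_age_seconds
    return morsel.OutputString()


header = refresh_cookie_header("abc.def.ghi")
print(header)
```

Restricting the cookie path to the refresh endpoint means the browser does not attach the refresh token to ordinary API calls, which use the access token instead.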
12.4 HTTPS Only
import os
from fastapi.middleware.httpsredirect import HTTPSRedirectMiddleware

# Force HTTPS in production
if os.getenv("ENV") == "production":
    app.add_middleware(HTTPSRedirectMiddleware)
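12.5 Rate Limiting
from fastapi import Request
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.errors import RateLimitExceeded
from slowapi.util import get_remote_address

limiter = Limiter(key_func=get_remote_address)
app.state.limiter = limiter
# Return HTTP 429 when a limit is exceeded
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)

@app.post("/auth/login")
@limiter.limit("5/minute")  # Max 5 login attempts per minute per client address
async def login(request: Request, credentials: LoginRequest):
    # Login logic
    pass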
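The idea behind a "5/minute" limit can be shown with a small sliding-window counter. This is a standalone illustration of the technique, not how slowapi is implemented; `SlidingWindowLimiter` is a hypothetical class, and an in-process dictionary like this one does not share state across multiple workers.

```python
import time
from collections import defaultdict, deque
from typing import Optional


class SlidingWindowLimiter:
    """Allow at most max_attempts per key within the last window_seconds."""

    def __init__(self, max_attempts: int = 5, window_seconds: float = 60.0):
        self.max_attempts = max_attempts
        self.window_seconds = window_seconds
        self._attempts = defaultdict(deque)  # key -> timestamps of recent attempts

    def allow(self, key: str, now: Optional[float] = None) -> bool:
        now = time.monotonic() if now is None else now
        recent = self._attempts[key]
        # Drop attempts that have fallen out of the window
        while recent and now - recent[0] >= self.window_seconds:
            recent.popleft()
        if len(recent) >= self.max_attempts:
            return False
        recent.append(now)
        return True


limiter_sketch = SlidingWindowLimiter(max_attempts=5, window_seconds=60)
results = [limiter_sketch.allow("203.0.113.7", now=float(t)) for t in range(6)]
print(results)  # [True, True, True, True, True, False]
```

Keying the counter by client address mirrors `get_remote_address`; keying by username as well would also slow down attacks spread across many addresses.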
13. API Reference
13.1 JWTAuthProvider
class JWTAuthProvider:
    """Main authentication provider."""

    def __init__(
        self,
        secret_key: str,
        refresh_secret_key: str,
        algorithm: str = "HS256",
        access_token_expire_minutes: int = 15,
        refresh_token_expire_days: int = 7
    ):
        ...

    @classmethod
    def from_config(cls, namespace: str = "auth") -> "JWTAuthProvider":
        """Create from Structum config."""
        ...

    def authenticate(
        self,
        username: str,
        password: str,
        user_repo: UserRepositoryInterface
    ) -> TokenPair | None:
        """Authenticate user and return tokens."""
        ...

    def verify_access_token(self, token: str) -> dict:
        """Verify and decode access token."""
        ...

    def refresh_access_token(
        self,
        refresh_token: str,
        user_repo: UserRepositoryInterface
    ) -> TokenPair:
        """Get new access token from refresh token."""
        ...
13.2 Password Utilities
def hash_password(password: str) -> str:
    """Hash password using Argon2."""
    ...

def verify_password(password: str, hashed: str) -> bool:
    """Verify plain password against hash."""
    ...
13.3 RoleChecker
class RoleChecker:
    """Utility for role-based checking."""

    @staticmethod
    def has_role(user: UserInterface, role: str) -> bool:
        """Check if user has specific role."""
        ...

    @staticmethod
    def has_any_role(user: UserInterface, roles: list[str]) -> bool:
        """Check if user has ANY of the roles (OR logic)."""
        ...

    @staticmethod
    def has_all_roles(user: UserInterface, roles: list[str]) -> bool:
        """Check if user has ALL roles (AND logic)."""
        ...
See Also
Auth Module (Core) - Core interfaces
Dynaconf Plugin - Configuration provider
FastAPI Documentation - Web framework integration
Argon2 Documentation - Password hashing details