Files
org-parking/utils/auth_middleware.py
Stefano Manfredi 7168fa4b72 Refactor to manager-centric model, add team calendar for all users
Key changes:
- Removed office-centric model (deleted offices.py, office-rules)
- Renamed to team-rules, managers are part of their own team
- Team calendar visible to all (read-only for employees)
- Admins can have a manager assigned
2025-12-02 13:30:04 +00:00

182 lines
5.6 KiB
Python

"""
Authentication Middleware
JWT token validation and Authelia header authentication for protected routes
"""
import logging
import uuid
from datetime import datetime

from fastapi import Depends, HTTPException, status, Request
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from sqlalchemy.orm import Session

from database.connection import get_db
from database.models import User
from services.auth import decode_token, get_user_by_id, get_user_by_email
from app import config
# Bearer-token extractor used as a dependency by get_current_user.
# auto_error=False lets a request without an Authorization header reach
# get_current_user (credentials is then falsy) so that function can either
# authenticate via Authelia headers or raise its own 401.
security = HTTPBearer(auto_error=False)
def is_admin_from_groups(groups: list[str]) -> bool:
    """Return True when the configured Authelia admin group is among *groups*."""
    admin_group = config.AUTHELIA_ADMIN_GROUP
    return admin_group in groups
def get_or_create_authelia_user(
    email: str,
    name: str,
    groups: list[str],
    db: Session
) -> User:
    """
    Return the user identified by *email*, creating it on first sight.

    Role logic:
    - Member of the configured Authelia admin group -> role is forced to
      'admin' on every call.
    - Not a member but currently 'admin' -> demoted to 'employee'.
    - Otherwise the stored role is kept ('employee' for new users); the
      manager role and user assignments are handled by an admin via the UI.

    The stored name is overwritten when a non-empty, different *name* is
    supplied. All changes to an existing user are written in one commit.

    Args:
        email: Email used to look the user up (and stored on creation).
        name: Display name from the proxy; may be empty.
        groups: Group names the proxy reported for this user.
        db: Active SQLAlchemy session.

    Returns:
        The persisted (and refreshed) User instance.
    """
    user = get_user_by_email(db, email)
    is_admin = is_admin_from_groups(groups)
    # One timestamp per call so created_at == updated_at for new users and a
    # combined role+name change is stamped consistently.
    # NOTE(review): utcnow() yields a naive timestamp and is deprecated since
    # Python 3.12; kept so the stored string format does not change.
    now = datetime.utcnow().isoformat()

    if not user:
        # First login through Authelia: provision a local account.
        user = User(
            id=str(uuid.uuid4()),
            email=email,
            name=name or email.split("@")[0],
            role="admin" if is_admin else "employee",
            password_hash=None,  # No password for Authelia users
            created_at=now,
            updated_at=now,
        )
        db.add(user)
        db.commit()
        db.refresh(user)
        return user

    changed = False

    # Only admin status is synced from the groups; other roles are managed
    # inside the application.
    if is_admin and user.role != "admin":
        user.role = "admin"
        changed = True
    elif not is_admin and user.role == "admin":
        # Removed from the admin group -> demote to employee
        user.role = "employee"
        changed = True

    # Update name if changed (an empty header never blanks the stored name)
    if name and user.name != name:
        user.name = name
        changed = True

    if changed:
        # Single commit/refresh instead of one round-trip per changed field.
        user.updated_at = now
        db.commit()
        db.refresh(user)

    return user
def get_current_user(
    request: Request,
    credentials: HTTPAuthorizationCredentials = Depends(security),
    db: Session = Depends(get_db)
):
    """
    Extract and validate the user from Authelia headers or a JWT token.

    Authelia mode takes precedence when enabled: the identity headers set by
    the reverse proxy are trusted and the matching user is fetched or
    created. Otherwise the Bearer token is decoded and its "sub" claim is
    used to load the user.

    Args:
        request: Incoming request; only its headers are read.
        credentials: Bearer credentials, falsy when none were sent.
        db: Active SQLAlchemy session.

    Returns:
        The authenticated user object.

    Raises:
        HTTPException: 401 when the Authelia user header is missing, no
            Bearer token is supplied, the token is invalid/expired, the
            payload has no "sub", or the user does not exist.
    """
    # Authelia mode: trust headers from reverse proxy
    if config.AUTHELIA_ENABLED:
        headers = request.headers
        remote_user = headers.get(config.AUTHELIA_HEADER_USER)
        # `or` instead of a .get() default so that a header which is present
        # but empty also falls back to the username, rather than looking up
        # (or creating) a user with an empty email.
        remote_email = headers.get(config.AUTHELIA_HEADER_EMAIL) or remote_user
        remote_name = headers.get(config.AUTHELIA_HEADER_NAME, "")
        remote_groups = headers.get(config.AUTHELIA_HEADER_GROUPS, "")

        # Debug-level logging instead of print(): identity headers should not
        # be written to stdout unconditionally on every request.
        logging.getLogger(__name__).debug(
            "[Authelia] Headers: user=%s, email=%s, name=%s, groups=%s",
            remote_user, remote_email, remote_name, remote_groups,
        )

        if not remote_user:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Not authenticated (Authelia headers missing)"
            )

        # Parse groups (comma-separated), dropping blanks
        groups = [g.strip() for g in remote_groups.split(",") if g.strip()]

        # Get or create user
        return get_or_create_authelia_user(remote_email, remote_name, groups, db)

    # JWT mode: validate Bearer token
    if not credentials:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Not authenticated"
        )

    payload = decode_token(credentials.credentials)
    if not payload:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid or expired token"
        )

    user_id = payload.get("sub")
    if not user_id:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid token payload"
        )

    user = get_user_by_id(db, user_id)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="User not found"
        )

    return user
def require_admin(user=Depends(get_current_user)):
    """
    Dependency that only lets admins through.

    Returns the authenticated user when its role is "admin"; otherwise
    raises HTTPException with status 403.
    """
    if user.role == "admin":
        return user

    raise HTTPException(
        status_code=status.HTTP_403_FORBIDDEN,
        detail="Admin role required",
    )
def require_manager_or_admin(user=Depends(get_current_user)):
    """
    Dependency that lets managers and admins through.

    Returns the authenticated user when its role is "admin" or "manager";
    otherwise raises HTTPException with status 403.
    """
    privileged_roles = ("admin", "manager")
    if user.role in privileged_roles:
        return user

    raise HTTPException(
        status_code=status.HTTP_403_FORBIDDEN,
        detail="Manager or admin role required",
    )
def check_manager_access_to_user(current_user, target_user, db: Session) -> bool:
    """
    Decide whether current_user may act on target_user.

    Admins are always allowed. A manager is allowed only when
    target_user.manager_id equals the manager's own id. Any other role is
    rejected. The db argument is accepted but not used here.

    Returns True when access is granted; raises HTTPException (403)
    otherwise.
    """
    role = current_user.role

    # Admins: unrestricted.
    if role == "admin":
        return True

    # Anyone who is neither admin nor manager has no access at all.
    if role != "manager":
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Access denied",
        )

    # Managers: limited to the users assigned to them.
    if target_user.manager_id != current_user.id:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="User is not managed by you",
        )
    return True