""" Authentication Middleware JWT token validation and Authelia header authentication for protected routes """ from fastapi import Depends, HTTPException, status, Request from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials from sqlalchemy.orm import Session from datetime import datetime import uuid from database.connection import get_db from database.models import User from services.auth import decode_token, get_user_by_id, get_user_by_email from app import config security = HTTPBearer(auto_error=False) def is_admin_from_groups(groups: list[str]) -> bool: """Check if user is admin based on Authelia groups""" return config.AUTHELIA_ADMIN_GROUP in groups def get_or_create_authelia_user( email: str, name: str, groups: list[str], db: Session ) -> User: """ Get existing user or create from Authelia headers. Role logic: - If user is in parking_admins group -> admin role (always synced from LLDAP) - Otherwise -> keep existing role, or 'employee' for new users Admin assigns manager role and user assignments via the UI. """ user = get_user_by_email(db, email) is_admin = is_admin_from_groups(groups) if user: # Only sync admin status from LLDAP, other roles managed by app admin if is_admin and user.role != "admin": user.role = "admin" user.updated_at = datetime.utcnow().isoformat() db.commit() db.refresh(user) elif not is_admin and user.role == "admin": # Removed from parking_admins group -> demote to employee user.role = "employee" user.updated_at = datetime.utcnow().isoformat() db.commit() db.refresh(user) # Update name if changed if user.name != name and name: user.name = name user.updated_at = datetime.utcnow().isoformat() db.commit() db.refresh(user) return user # Create new user from Authelia user = User( id=str(uuid.uuid4()), email=email, name=name or email.split("@")[0], role="admin" if is_admin else "employee", password_hash=None, # No password for Authelia users created_at=datetime.utcnow().isoformat(), updated_at=datetime.utcnow().isoformat() ) db.add(user) db.commit() db.refresh(user) return user def get_current_user( request: Request, credentials: HTTPAuthorizationCredentials = Depends(security), db: Session = Depends(get_db) ): """ Extract and validate user from JWT token or Authelia headers. Authelia mode takes precedence when enabled. """ # Authelia mode: trust headers from reverse proxy if config.AUTHELIA_ENABLED: remote_user = request.headers.get(config.AUTHELIA_HEADER_USER) remote_email = request.headers.get(config.AUTHELIA_HEADER_EMAIL, remote_user) remote_name = request.headers.get(config.AUTHELIA_HEADER_NAME, "") remote_groups = request.headers.get(config.AUTHELIA_HEADER_GROUPS, "") print(f"[Authelia] Headers: user={remote_user}, email={remote_email}, name={remote_name}, groups={remote_groups}") if not remote_user: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Not authenticated (Authelia headers missing)" ) # Parse groups (comma-separated) groups = [g.strip() for g in remote_groups.split(",") if g.strip()] # Get or create user return get_or_create_authelia_user(remote_email, remote_name, groups, db) # JWT mode: validate Bearer token if not credentials: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Not authenticated" ) token = credentials.credentials payload = decode_token(token) if not payload: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid or expired token" ) user_id = payload.get("sub") if not user_id: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid token payload" ) user = get_user_by_id(db, user_id) if not user: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="User not found" ) return user def require_admin(user=Depends(get_current_user)): """Require admin role""" if user.role != "admin": raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Admin role required" ) return user def require_manager_or_admin(user=Depends(get_current_user)): """Require manager or admin role""" if user.role not in ["admin", "manager"]: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Manager or admin role required" ) return user def check_manager_access_to_user(current_user, target_user, db: Session) -> bool: """ Check if current_user (manager) has access to target_user. Admins always have access. Managers can only access users they manage. Returns True if access granted, raises HTTPException if not. """ if current_user.role == "admin": return True if current_user.role == "manager": if target_user.manager_id != current_user.id: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="User is not managed by you" ) return True raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Access denied" )