Files
org-parking/app/routes/users.py
Stefano Manfredi ce9e2fdf2a fix landing page
2025-12-02 23:18:43 +00:00

396 lines
15 KiB
Python

"""
User Management Routes
Admin user CRUD and user self-service (profile, settings, password)
"""
from datetime import datetime
from fastapi import APIRouter, Depends, HTTPException
from pydantic import BaseModel, EmailStr
from sqlalchemy.orm import Session
from database.connection import get_db
from database.models import User
from utils.auth_middleware import get_current_user, require_admin
from utils.helpers import (
generate_uuid, is_ldap_user, is_ldap_admin,
validate_password, format_password_errors, get_notification_default
)
from services.auth import hash_password, verify_password
from app import config
router = APIRouter(prefix="/api/users", tags=["users"])
# Request/Response Models
class UserCreate(BaseModel):
email: EmailStr
password: str
name: str | None = None
role: str = "employee"
manager_id: str | None = None
class UserUpdate(BaseModel):
name: str | None = None
role: str | None = None
manager_id: str | None = None
manager_parking_quota: int | None = None
manager_spot_prefix: str | None = None
class ProfileUpdate(BaseModel):
name: str | None = None
class SettingsUpdate(BaseModel):
week_start_day: int | None = None
# Notification preferences
notify_weekly_parking: int | None = None
notify_daily_parking: int | None = None
notify_daily_parking_hour: int | None = None
notify_daily_parking_minute: int | None = None
notify_parking_changes: int | None = None
class ChangePasswordRequest(BaseModel):
current_password: str
new_password: str
class UserResponse(BaseModel):
id: str
email: str
name: str | None
role: str
manager_id: str | None = None
manager_name: str | None = None
manager_parking_quota: int | None = None
manager_spot_prefix: str | None = None
managed_user_count: int | None = None
is_ldap_user: bool = False
is_ldap_admin: bool = False
created_at: str | None
class Config:
from_attributes = True
def user_to_response(user: User, db: Session, manager_lookup: dict = None, managed_counts: dict = None) -> dict:
"""
Convert user to response dict with computed fields.
Args:
user: The user to convert
db: Database session
manager_lookup: Optional pre-fetched dict of manager_id -> name (for batch operations)
managed_counts: Optional pre-fetched dict of user_id -> managed_user_count (for batch operations)
"""
# Get manager name - use lookup if available, otherwise query
manager_name = None
if user.manager_id:
if manager_lookup is not None:
manager_name = manager_lookup.get(user.manager_id)
else:
manager = db.query(User).filter(User.id == user.manager_id).first()
if manager:
manager_name = manager.name
# Count managed users if this user is a manager
managed_user_count = None
if user.role == "manager":
if managed_counts is not None:
managed_user_count = managed_counts.get(user.id, 0)
else:
managed_user_count = db.query(User).filter(User.manager_id == user.id).count()
return {
"id": user.id,
"email": user.email,
"name": user.name,
"role": user.role,
"manager_id": user.manager_id,
"manager_name": manager_name,
"manager_parking_quota": user.manager_parking_quota,
"manager_spot_prefix": user.manager_spot_prefix,
"managed_user_count": managed_user_count,
"is_ldap_user": is_ldap_user(user),
"is_ldap_admin": is_ldap_admin(user),
"created_at": user.created_at
}
# Admin Routes
@router.get("")
def list_users(db: Session = Depends(get_db), user=Depends(require_admin)):
"""List all users (admin only)"""
users = db.query(User).all()
# Build lookups to avoid N+1 queries
# Manager lookup: id -> name
manager_ids = list(set(u.manager_id for u in users if u.manager_id))
managers = db.query(User).filter(User.id.in_(manager_ids)).all() if manager_ids else []
manager_lookup = {m.id: m.name for m in managers}
# Managed user counts for managers
from sqlalchemy import func
manager_user_ids = [u.id for u in users if u.role == "manager"]
if manager_user_ids:
counts = db.query(User.manager_id, func.count(User.id)).filter(
User.manager_id.in_(manager_user_ids)
).group_by(User.manager_id).all()
managed_counts = {manager_id: count for manager_id, count in counts}
else:
managed_counts = {}
return [user_to_response(u, db, manager_lookup, managed_counts) for u in users]
@router.get("/{user_id}")
def get_user(user_id: str, db: Session = Depends(get_db), user=Depends(require_admin)):
"""Get user by ID (admin only)"""
target = db.query(User).filter(User.id == user_id).first()
if not target:
raise HTTPException(status_code=404, detail="User not found")
return user_to_response(target, db)
@router.post("")
def create_user(data: UserCreate, db: Session = Depends(get_db), user=Depends(require_admin)):
"""Create new user (admin only) - only for non-LDAP mode"""
if config.AUTHELIA_ENABLED:
raise HTTPException(status_code=400, detail="User creation disabled in LDAP mode. Users are created on first login.")
if db.query(User).filter(User.email == data.email).first():
raise HTTPException(status_code=400, detail="Email already registered")
if data.role not in ["admin", "manager", "employee"]:
raise HTTPException(status_code=400, detail="Invalid role")
# Validate password strength
password_errors = validate_password(data.password)
if password_errors:
raise HTTPException(status_code=400, detail=format_password_errors(password_errors))
if data.manager_id:
manager = db.query(User).filter(User.id == data.manager_id).first()
if not manager or manager.role != "manager":
raise HTTPException(status_code=400, detail="Invalid manager")
new_user = User(
id=generate_uuid(),
email=data.email,
password_hash=hash_password(data.password),
name=data.name,
role=data.role,
manager_id=data.manager_id,
created_at=datetime.utcnow().isoformat()
)
db.add(new_user)
db.commit()
db.refresh(new_user)
config.logger.info(f"Admin created new user: {data.email}")
return user_to_response(new_user, db)
@router.put("/{user_id}")
def update_user(user_id: str, data: UserUpdate, db: Session = Depends(get_db), user=Depends(require_admin)):
"""Update user (admin only)"""
target = db.query(User).filter(User.id == user_id).first()
if not target:
raise HTTPException(status_code=404, detail="User not found")
# Check if user is LDAP-managed
target_is_ldap = is_ldap_user(target)
target_is_ldap_admin = is_ldap_admin(target)
# Name update - blocked for LDAP users
if data.name is not None:
if target_is_ldap:
raise HTTPException(status_code=400, detail="Name is managed by LDAP")
target.name = data.name
# Role update
if data.role is not None:
if data.role not in ["admin", "manager", "employee"]:
raise HTTPException(status_code=400, detail="Invalid role")
# Can't change admin role for LDAP admins (they get admin from parking_admins group)
if target_is_ldap_admin and data.role != "admin":
raise HTTPException(status_code=400, detail="Admin role is managed by LDAP group (parking_admins)")
# If changing from manager to another role, check for managed users
if target.role == "manager" and data.role != "manager":
managed_count = db.query(User).filter(User.manager_id == user_id).count()
if managed_count > 0:
raise HTTPException(status_code=400, detail=f"Cannot change role: {managed_count} users are assigned to this manager")
# Clear manager-specific fields
target.manager_parking_quota = 0
target.manager_spot_prefix = None
target.role = data.role
# Manager assignment (any user including admins can be assigned to a manager)
if data.manager_id is not None:
if data.manager_id:
manager = db.query(User).filter(User.id == data.manager_id).first()
if not manager or manager.role != "manager":
raise HTTPException(status_code=400, detail="Invalid manager")
if data.manager_id == user_id:
raise HTTPException(status_code=400, detail="User cannot be their own manager")
target.manager_id = data.manager_id if data.manager_id else None
# Manager-specific fields
if data.manager_parking_quota is not None:
if target.role != "manager":
raise HTTPException(status_code=400, detail="Parking quota only for managers")
target.manager_parking_quota = data.manager_parking_quota
if data.manager_spot_prefix is not None:
if target.role != "manager":
raise HTTPException(status_code=400, detail="Spot prefix only for managers")
prefix = data.manager_spot_prefix.upper() if data.manager_spot_prefix else None
if prefix and not prefix.isalpha():
raise HTTPException(status_code=400, detail="Spot prefix must be a letter")
# Check for duplicate prefix
if prefix:
existing = db.query(User).filter(
User.manager_spot_prefix == prefix,
User.id != user_id
).first()
if existing:
raise HTTPException(status_code=400, detail=f"Spot prefix '{prefix}' is already used by another manager")
target.manager_spot_prefix = prefix
target.updated_at = datetime.utcnow().isoformat()
db.commit()
db.refresh(target)
return user_to_response(target, db)
@router.delete("/{user_id}")
def delete_user(user_id: str, db: Session = Depends(get_db), current_user=Depends(require_admin)):
"""Delete user (admin only)"""
if user_id == current_user.id:
raise HTTPException(status_code=400, detail="Cannot delete yourself")
target = db.query(User).filter(User.id == user_id).first()
if not target:
raise HTTPException(status_code=404, detail="User not found")
# Check if user is a manager with managed users
if target.role == "manager":
managed_count = db.query(User).filter(User.manager_id == user_id).count()
if managed_count > 0:
raise HTTPException(status_code=400, detail=f"Cannot delete: {managed_count} users are assigned to this manager")
db.delete(target)
db.commit()
return {"message": "User deleted"}
# Self-service Routes
@router.get("/me/profile")
def get_profile(db: Session = Depends(get_db), current_user=Depends(get_current_user)):
"""Get current user's profile"""
# Get manager name
manager_name = None
if current_user.manager_id:
manager = db.query(User).filter(User.id == current_user.manager_id).first()
if manager:
manager_name = manager.name
return {
"id": current_user.id,
"email": current_user.email,
"name": current_user.name,
"role": current_user.role,
"manager_id": current_user.manager_id,
"manager_name": manager_name,
"is_ldap_user": is_ldap_user(current_user)
}
@router.put("/me/profile")
def update_profile(data: ProfileUpdate, db: Session = Depends(get_db), current_user=Depends(get_current_user)):
"""Update current user's profile (limited fields)"""
if data.name is not None:
if is_ldap_user(current_user):
raise HTTPException(status_code=400, detail="Name is managed by LDAP")
current_user.name = data.name
current_user.updated_at = datetime.utcnow().isoformat()
db.commit()
return {"message": "Profile updated"}
@router.get("/me/settings")
def get_settings(current_user=Depends(get_current_user)):
"""Get current user's settings"""
return {
"week_start_day": get_notification_default(current_user.week_start_day, 0),
"notify_weekly_parking": get_notification_default(current_user.notify_weekly_parking, 1),
"notify_daily_parking": get_notification_default(current_user.notify_daily_parking, 1),
"notify_daily_parking_hour": get_notification_default(current_user.notify_daily_parking_hour, 8),
"notify_daily_parking_minute": get_notification_default(current_user.notify_daily_parking_minute, 0),
"notify_parking_changes": get_notification_default(current_user.notify_parking_changes, 1)
}
@router.put("/me/settings")
def update_settings(data: SettingsUpdate, db: Session = Depends(get_db), current_user=Depends(get_current_user)):
"""Update current user's settings"""
if data.week_start_day is not None:
if data.week_start_day not in [0, 1]:
raise HTTPException(status_code=400, detail="Week start must be 0 (Sunday) or 1 (Monday)")
current_user.week_start_day = data.week_start_day
# Notification preferences
if data.notify_weekly_parking is not None:
current_user.notify_weekly_parking = data.notify_weekly_parking
if data.notify_daily_parking is not None:
current_user.notify_daily_parking = data.notify_daily_parking
if data.notify_daily_parking_hour is not None:
if data.notify_daily_parking_hour < 0 or data.notify_daily_parking_hour > 23:
raise HTTPException(status_code=400, detail="Hour must be 0-23")
current_user.notify_daily_parking_hour = data.notify_daily_parking_hour
if data.notify_daily_parking_minute is not None:
if data.notify_daily_parking_minute < 0 or data.notify_daily_parking_minute > 59:
raise HTTPException(status_code=400, detail="Minute must be 0-59")
current_user.notify_daily_parking_minute = data.notify_daily_parking_minute
if data.notify_parking_changes is not None:
current_user.notify_parking_changes = data.notify_parking_changes
current_user.updated_at = datetime.utcnow().isoformat()
db.commit()
return {
"message": "Settings updated",
"week_start_day": current_user.week_start_day,
"notify_weekly_parking": current_user.notify_weekly_parking,
"notify_daily_parking": current_user.notify_daily_parking,
"notify_daily_parking_hour": current_user.notify_daily_parking_hour,
"notify_daily_parking_minute": current_user.notify_daily_parking_minute,
"notify_parking_changes": current_user.notify_parking_changes
}
@router.post("/me/change-password")
def change_password(data: ChangePasswordRequest, db: Session = Depends(get_db), current_user=Depends(get_current_user)):
"""Change current user's password (not available in LDAP mode)"""
if is_ldap_user(current_user):
raise HTTPException(status_code=400, detail="Password is managed by LDAP")
if not verify_password(data.current_password, current_user.password_hash):
raise HTTPException(status_code=400, detail="Current password is incorrect")
# Validate new password
password_errors = validate_password(data.new_password)
if password_errors:
raise HTTPException(status_code=400, detail=format_password_errors(password_errors))
current_user.password_hash = hash_password(data.new_password)
current_user.updated_at = datetime.utcnow().isoformat()
db.commit()
config.logger.info(f"User {current_user.email} changed password")
return {"message": "Password changed"}