Files
org-parking/app/routes/users.py
Stefano Manfredi 7168fa4b72 Refactor to manager-centric model, add team calendar for all users
Key changes:
- Removed office-centric model (deleted offices.py, office-rules)
- Renamed to team-rules, managers are part of their own team
- Team calendar visible to all (read-only for employees)
- Admins can have a manager assigned
2025-12-02 13:30:04 +00:00

374 lines
14 KiB
Python

"""
User Management Routes
Admin user CRUD and user self-service (profile, settings, password)
"""
from typing import List
from datetime import datetime
from fastapi import APIRouter, Depends, HTTPException
from pydantic import BaseModel, EmailStr
from sqlalchemy.orm import Session
import uuid
import re
from database.connection import get_db
from database.models import User
from utils.auth_middleware import get_current_user, require_admin
from services.auth import hash_password, verify_password
from app import config
router = APIRouter(prefix="/api/users", tags=["users"])
# Request/Response Models
class UserCreate(BaseModel):
email: EmailStr
password: str
name: str | None = None
role: str = "employee"
manager_id: str | None = None
class UserUpdate(BaseModel):
name: str | None = None
role: str | None = None
manager_id: str | None = None
manager_parking_quota: int | None = None
manager_spot_prefix: str | None = None
class ProfileUpdate(BaseModel):
name: str | None = None
class SettingsUpdate(BaseModel):
week_start_day: int | None = None
# Notification preferences
notify_weekly_parking: int | None = None
notify_daily_parking: int | None = None
notify_daily_parking_hour: int | None = None
notify_daily_parking_minute: int | None = None
notify_parking_changes: int | None = None
class ChangePasswordRequest(BaseModel):
current_password: str
new_password: str
class UserResponse(BaseModel):
id: str
email: str
name: str | None
role: str
manager_id: str | None = None
manager_name: str | None = None
manager_parking_quota: int | None = None
manager_spot_prefix: str | None = None
managed_user_count: int | None = None
is_ldap_user: bool = False
is_ldap_admin: bool = False
created_at: str | None
class Config:
from_attributes = True
def user_to_response(user: User, db: Session) -> dict:
"""Convert user to response dict with computed fields"""
# Get manager name if user has a manager
manager_name = None
if user.manager_id:
manager = db.query(User).filter(User.id == user.manager_id).first()
if manager:
manager_name = manager.name
# Count managed users if this user is a manager
managed_user_count = None
if user.role == "manager":
managed_user_count = db.query(User).filter(User.manager_id == user.id).count()
# Determine if user is LDAP-managed
is_ldap_user = config.AUTHELIA_ENABLED and user.password_hash is None
is_ldap_admin = is_ldap_user and user.role == "admin"
return {
"id": user.id,
"email": user.email,
"name": user.name,
"role": user.role,
"manager_id": user.manager_id,
"manager_name": manager_name,
"manager_parking_quota": user.manager_parking_quota,
"manager_spot_prefix": user.manager_spot_prefix,
"managed_user_count": managed_user_count,
"is_ldap_user": is_ldap_user,
"is_ldap_admin": is_ldap_admin,
"created_at": user.created_at
}
# Admin Routes
@router.get("")
def list_users(db: Session = Depends(get_db), user=Depends(require_admin)):
"""List all users (admin only)"""
users = db.query(User).all()
return [user_to_response(u, db) for u in users]
@router.get("/{user_id}")
def get_user(user_id: str, db: Session = Depends(get_db), user=Depends(require_admin)):
"""Get user by ID (admin only)"""
target = db.query(User).filter(User.id == user_id).first()
if not target:
raise HTTPException(status_code=404, detail="User not found")
return user_to_response(target, db)
@router.post("")
def create_user(data: UserCreate, db: Session = Depends(get_db), user=Depends(require_admin)):
"""Create new user (admin only) - only for non-LDAP mode"""
if config.AUTHELIA_ENABLED:
raise HTTPException(status_code=400, detail="User creation disabled in LDAP mode. Users are created on first login.")
if db.query(User).filter(User.email == data.email).first():
raise HTTPException(status_code=400, detail="Email already registered")
if data.role not in ["admin", "manager", "employee"]:
raise HTTPException(status_code=400, detail="Invalid role")
if data.manager_id:
manager = db.query(User).filter(User.id == data.manager_id).first()
if not manager or manager.role != "manager":
raise HTTPException(status_code=400, detail="Invalid manager")
new_user = User(
id=str(uuid.uuid4()),
email=data.email,
password_hash=hash_password(data.password),
name=data.name,
role=data.role,
manager_id=data.manager_id,
created_at=datetime.utcnow().isoformat()
)
db.add(new_user)
db.commit()
db.refresh(new_user)
return user_to_response(new_user, db)
@router.put("/{user_id}")
def update_user(user_id: str, data: UserUpdate, db: Session = Depends(get_db), user=Depends(require_admin)):
"""Update user (admin only)"""
target = db.query(User).filter(User.id == user_id).first()
if not target:
raise HTTPException(status_code=404, detail="User not found")
# Check if user is LDAP-managed
is_ldap_user = config.AUTHELIA_ENABLED and target.password_hash is None
is_ldap_admin = is_ldap_user and target.role == "admin"
# Name update - blocked for LDAP users
if data.name is not None:
if is_ldap_user:
raise HTTPException(status_code=400, detail="Name is managed by LDAP")
target.name = data.name
# Role update
if data.role is not None:
if data.role not in ["admin", "manager", "employee"]:
raise HTTPException(status_code=400, detail="Invalid role")
# Can't change admin role for LDAP admins (they get admin from parking_admins group)
if is_ldap_admin and data.role != "admin":
raise HTTPException(status_code=400, detail="Admin role is managed by LDAP group (parking_admins)")
# If changing from manager to another role, check for managed users
if target.role == "manager" and data.role != "manager":
managed_count = db.query(User).filter(User.manager_id == user_id).count()
if managed_count > 0:
raise HTTPException(status_code=400, detail=f"Cannot change role: {managed_count} users are assigned to this manager")
# Clear manager-specific fields
target.manager_parking_quota = 0
target.manager_spot_prefix = None
target.role = data.role
# Manager assignment (any user including admins can be assigned to a manager)
if data.manager_id is not None:
if data.manager_id:
manager = db.query(User).filter(User.id == data.manager_id).first()
if not manager or manager.role != "manager":
raise HTTPException(status_code=400, detail="Invalid manager")
if data.manager_id == user_id:
raise HTTPException(status_code=400, detail="User cannot be their own manager")
target.manager_id = data.manager_id if data.manager_id else None
# Manager-specific fields
if data.manager_parking_quota is not None:
if target.role != "manager":
raise HTTPException(status_code=400, detail="Parking quota only for managers")
target.manager_parking_quota = data.manager_parking_quota
if data.manager_spot_prefix is not None:
if target.role != "manager":
raise HTTPException(status_code=400, detail="Spot prefix only for managers")
prefix = data.manager_spot_prefix.upper() if data.manager_spot_prefix else None
if prefix and not prefix.isalpha():
raise HTTPException(status_code=400, detail="Spot prefix must be a letter")
# Check for duplicate prefix
if prefix:
existing = db.query(User).filter(
User.manager_spot_prefix == prefix,
User.id != user_id
).first()
if existing:
raise HTTPException(status_code=400, detail=f"Spot prefix '{prefix}' is already used by another manager")
target.manager_spot_prefix = prefix
target.updated_at = datetime.utcnow().isoformat()
db.commit()
db.refresh(target)
return user_to_response(target, db)
@router.delete("/{user_id}")
def delete_user(user_id: str, db: Session = Depends(get_db), current_user=Depends(require_admin)):
"""Delete user (admin only)"""
if user_id == current_user.id:
raise HTTPException(status_code=400, detail="Cannot delete yourself")
target = db.query(User).filter(User.id == user_id).first()
if not target:
raise HTTPException(status_code=404, detail="User not found")
# Check if user is a manager with managed users
if target.role == "manager":
managed_count = db.query(User).filter(User.manager_id == user_id).count()
if managed_count > 0:
raise HTTPException(status_code=400, detail=f"Cannot delete: {managed_count} users are assigned to this manager")
db.delete(target)
db.commit()
return {"message": "User deleted"}
# Self-service Routes
@router.get("/me/profile")
def get_profile(db: Session = Depends(get_db), current_user=Depends(get_current_user)):
"""Get current user's profile"""
is_ldap_user = config.AUTHELIA_ENABLED and current_user.password_hash is None
# Get manager name
manager_name = None
if current_user.manager_id:
manager = db.query(User).filter(User.id == current_user.manager_id).first()
if manager:
manager_name = manager.name
return {
"id": current_user.id,
"email": current_user.email,
"name": current_user.name,
"role": current_user.role,
"manager_id": current_user.manager_id,
"manager_name": manager_name,
"is_ldap_user": is_ldap_user
}
@router.put("/me/profile")
def update_profile(data: ProfileUpdate, db: Session = Depends(get_db), current_user=Depends(get_current_user)):
"""Update current user's profile (limited fields)"""
is_ldap_user = config.AUTHELIA_ENABLED and current_user.password_hash is None
if data.name is not None:
if is_ldap_user:
raise HTTPException(status_code=400, detail="Name is managed by LDAP")
current_user.name = data.name
current_user.updated_at = datetime.utcnow().isoformat()
db.commit()
return {"message": "Profile updated"}
@router.get("/me/settings")
def get_settings(current_user=Depends(get_current_user)):
"""Get current user's settings"""
return {
"week_start_day": current_user.week_start_day or 0,
"notify_weekly_parking": current_user.notify_weekly_parking if current_user.notify_weekly_parking is not None else 1,
"notify_daily_parking": current_user.notify_daily_parking if current_user.notify_daily_parking is not None else 1,
"notify_daily_parking_hour": current_user.notify_daily_parking_hour if current_user.notify_daily_parking_hour is not None else 8,
"notify_daily_parking_minute": current_user.notify_daily_parking_minute if current_user.notify_daily_parking_minute is not None else 0,
"notify_parking_changes": current_user.notify_parking_changes if current_user.notify_parking_changes is not None else 1
}
@router.put("/me/settings")
def update_settings(data: SettingsUpdate, db: Session = Depends(get_db), current_user=Depends(get_current_user)):
"""Update current user's settings"""
if data.week_start_day is not None:
if data.week_start_day not in [0, 1]:
raise HTTPException(status_code=400, detail="Week start must be 0 (Sunday) or 1 (Monday)")
current_user.week_start_day = data.week_start_day
# Notification preferences
if data.notify_weekly_parking is not None:
current_user.notify_weekly_parking = data.notify_weekly_parking
if data.notify_daily_parking is not None:
current_user.notify_daily_parking = data.notify_daily_parking
if data.notify_daily_parking_hour is not None:
if data.notify_daily_parking_hour < 0 or data.notify_daily_parking_hour > 23:
raise HTTPException(status_code=400, detail="Hour must be 0-23")
current_user.notify_daily_parking_hour = data.notify_daily_parking_hour
if data.notify_daily_parking_minute is not None:
if data.notify_daily_parking_minute < 0 or data.notify_daily_parking_minute > 59:
raise HTTPException(status_code=400, detail="Minute must be 0-59")
current_user.notify_daily_parking_minute = data.notify_daily_parking_minute
if data.notify_parking_changes is not None:
current_user.notify_parking_changes = data.notify_parking_changes
current_user.updated_at = datetime.utcnow().isoformat()
db.commit()
return {
"message": "Settings updated",
"week_start_day": current_user.week_start_day,
"notify_weekly_parking": current_user.notify_weekly_parking,
"notify_daily_parking": current_user.notify_daily_parking,
"notify_daily_parking_hour": current_user.notify_daily_parking_hour,
"notify_daily_parking_minute": current_user.notify_daily_parking_minute,
"notify_parking_changes": current_user.notify_parking_changes
}
@router.post("/me/change-password")
def change_password(data: ChangePasswordRequest, db: Session = Depends(get_db), current_user=Depends(get_current_user)):
"""Change current user's password (not available in LDAP mode)"""
if config.AUTHELIA_ENABLED and current_user.password_hash is None:
raise HTTPException(status_code=400, detail="Password is managed by LDAP")
if not verify_password(data.current_password, current_user.password_hash):
raise HTTPException(status_code=400, detail="Current password is incorrect")
# Validate new password
password = data.new_password
errors = []
if len(password) < 8:
errors.append("at least 8 characters")
if not re.search(r'[A-Z]', password):
errors.append("one uppercase letter")
if not re.search(r'[a-z]', password):
errors.append("one lowercase letter")
if not re.search(r'[0-9]', password):
errors.append("one number")
if errors:
raise HTTPException(status_code=400, detail=f"Password must contain: {', '.join(errors)}")
current_user.password_hash = hash_password(password)
current_user.updated_at = datetime.utcnow().isoformat()
db.commit()
return {"message": "Password changed"}