""" Office Management Routes Office settings, closing days, guarantees, and exclusions """ from datetime import datetime, date from fastapi import APIRouter, Depends, HTTPException from pydantic import BaseModel from sqlalchemy.orm import Session from sqlalchemy import func from database.connection import get_db from database.models import ( User, Office, OfficeClosingDay, OfficeWeeklyClosingDay, ParkingGuarantee, ParkingExclusion, UserRole ) from utils.auth_middleware import require_admin, require_manager_or_admin, get_current_user from utils.helpers import generate_uuid from app import config router = APIRouter(prefix="/api/offices", tags=["offices"]) # Request/Response Models class ValidOfficeCreate(BaseModel): name: str parking_quota: int = 0 class ClosingDayCreate(BaseModel): date: date # Start date end_date: date | None = None # Optional end date (inclusive) reason: str | None = None class WeeklyClosingDayCreate(BaseModel): weekday: int # 0=Sunday, 1=Monday, ..., 6=Saturday class GuaranteeCreate(BaseModel): user_id: str start_date: date | None = None end_date: date | None = None notes: str | None = None class ExclusionCreate(BaseModel): user_id: str start_date: date | None = None end_date: date | None = None notes: str | None = None class OfficeSettingsUpdate(BaseModel): parking_quota: int | None = None name: str | None = None booking_window_enabled: bool | None = None booking_window_end_hour: int | None = None booking_window_end_minute: int | None = None # Helper check def check_office_access(user: User, office_id: str): if user.role == UserRole.ADMIN: return True if user.role == UserRole.MANAGER and user.office_id == office_id: return True raise HTTPException(status_code=403, detail="Access denied to this office") # Office listing and details @router.get("") def list_offices(db: Session = Depends(get_db), user=Depends(require_manager_or_admin)): """Get all offices with their user count and parking quota""" offices = db.query(Office).all() # Batch query user counts counts = db.query(User.office_id, func.count(User.id)).filter( User.office_id.isnot(None) ).group_by(User.office_id).all() user_counts = {office_id: count for office_id, count in counts} return [ { "id": office.id, "name": office.name, "parking_quota": office.parking_quota, "spot_prefix": office.spot_prefix, "user_count": user_counts.get(office.id, 0) } for office in offices ] def get_next_available_prefix(db: Session) -> str: """Find the next available office prefix (A, B, C... AA, AB...)""" existing = db.query(Office.spot_prefix).filter(Office.spot_prefix.isnot(None)).all() used_prefixes = {row[0] for row in existing} # Try single letters A-Z for i in range(26): char = chr(65 + i) if char not in used_prefixes: return char # Try double letters AA-ZZ if needed for i in range(26): for j in range(26): char = chr(65 + i) + chr(65 + j) if char not in used_prefixes: return char raise HTTPException(status_code=400, detail="No more office prefixes available") @router.post("") def create_office(data: ValidOfficeCreate, db: Session = Depends(get_db), user=Depends(require_admin)): """Create a new office (admin only)""" office = Office( id=generate_uuid(), name=data.name, parking_quota=data.parking_quota, spot_prefix=get_next_available_prefix(db), created_at=datetime.utcnow() ) db.add(office) db.commit() return office @router.get("/{office_id}") def get_office_details(office_id: str, db: Session = Depends(get_db), user=Depends(require_manager_or_admin)): """Get office details""" office = db.query(Office).filter(Office.id == office_id).first() if not office: raise HTTPException(status_code=404, detail="Office not found") check_office_access(user, office_id) user_count = db.query(User).filter(User.office_id == office_id).count() return { "id": office.id, "name": office.name, "parking_quota": office.parking_quota, "spot_prefix": office.spot_prefix, "user_count": user_count, "booking_window_enabled": office.booking_window_enabled, "booking_window_end_hour": office.booking_window_end_hour, "booking_window_end_minute": office.booking_window_end_minute } @router.put("/{office_id}") def update_office_settings(office_id: str, data: OfficeSettingsUpdate, db: Session = Depends(get_db), user=Depends(require_admin)): """Update office settings (admin only) - Manager can view but usually Admin sets quota""" # Verify access - currently assume admin manages quota. If manager should too, update logic. # User request description: "Admin manage all offices with CRUD... rimodulare posti auto". # So Managers might not edit quota? Or maybe they can? # Keeping it simple: require_admin for structural changes. office = db.query(Office).filter(Office.id == office_id).first() if not office: raise HTTPException(status_code=404, detail="Office not found") if data.name: office.name = data.name if data.parking_quota is not None: if data.parking_quota < 0: raise HTTPException(status_code=400, detail="Parking quota must be non-negative") office.parking_quota = data.parking_quota if data.booking_window_enabled is not None: office.booking_window_enabled = data.booking_window_enabled if data.booking_window_end_hour is not None: if not (0 <= data.booking_window_end_hour <= 23): raise HTTPException(status_code=400, detail="Hour must be 0-23") office.booking_window_end_hour = data.booking_window_end_hour if data.booking_window_end_minute is not None: if not (0 <= data.booking_window_end_minute <= 59): raise HTTPException(status_code=400, detail="Minute must be 0-59") office.booking_window_end_minute = data.booking_window_end_minute office.updated_at = datetime.utcnow() db.commit() return { "id": office.id, "name": office.name, "parking_quota": office.parking_quota, "spot_prefix": office.spot_prefix, "booking_window_enabled": office.booking_window_enabled, "booking_window_end_hour": office.booking_window_end_hour, "booking_window_end_minute": office.booking_window_end_minute } @router.delete("/{office_id}") def delete_office(office_id: str, db: Session = Depends(get_db), user=Depends(require_admin)): """Delete an office (admin only)""" office = db.query(Office).filter(Office.id == office_id).first() if not office: raise HTTPException(status_code=404, detail="Office not found") db.delete(office) db.commit() return {"message": "Office deleted"} @router.get("/{office_id}/users") def get_office_users(office_id: str, db: Session = Depends(get_db), user=Depends(require_manager_or_admin)): """Get all users in an office""" check_office_access(user, office_id) office = db.query(Office).filter(Office.id == office_id).first() if not office: raise HTTPException(status_code=404, detail="Office not found") users = db.query(User).filter(User.office_id == office_id).all() return [{"id": u.id, "name": u.name, "email": u.email, "role": u.role} for u in users] # Closing days @router.get("/{office_id}/closing-days") def get_office_closing_days(office_id: str, db: Session = Depends(get_db), user=Depends(get_current_user)): """Get closing days for an office""" # Any user in the office can read closing days? Or just manager? # check_office_access(user, office_id) # Let's allow read for all authenticated (frontend might need it) days = db.query(OfficeClosingDay).filter( OfficeClosingDay.office_id == office_id ).order_by(OfficeClosingDay.date).all() return [{"id": d.id, "date": d.date, "end_date": d.end_date, "reason": d.reason} for d in days] @router.post("/{office_id}/closing-days") def add_office_closing_day(office_id: str, data: ClosingDayCreate, db: Session = Depends(get_db), user=Depends(require_manager_or_admin)): """Add a closing day for an office""" check_office_access(user, office_id) office = db.query(Office).filter(Office.id == office_id).first() if not office: raise HTTPException(status_code=404, detail="Office not found") existing = db.query(OfficeClosingDay).filter( OfficeClosingDay.office_id == office_id, OfficeClosingDay.date == data.date ).first() if existing: raise HTTPException(status_code=400, detail="Closing day already exists for this date") if data.end_date and data.end_date < data.date: raise HTTPException(status_code=400, detail="End date must be after start date") closing_day = OfficeClosingDay( id=generate_uuid(), office_id=office_id, date=data.date, end_date=data.end_date, reason=data.reason ) db.add(closing_day) db.commit() return {"id": closing_day.id, "message": "Closing day added"} @router.delete("/{office_id}/closing-days/{closing_day_id}") def remove_office_closing_day(office_id: str, closing_day_id: str, db: Session = Depends(get_db), user=Depends(require_manager_or_admin)): """Remove a closing day for an office""" check_office_access(user, office_id) closing_day = db.query(OfficeClosingDay).filter( OfficeClosingDay.id == closing_day_id, OfficeClosingDay.office_id == office_id ).first() if not closing_day: raise HTTPException(status_code=404, detail="Closing day not found") db.delete(closing_day) db.commit() return {"message": "Closing day removed"} # Weekly closing days @router.get("/{office_id}/weekly-closing-days") def get_office_weekly_closing_days(office_id: str, db: Session = Depends(get_db), user=Depends(get_current_user)): """Get weekly closing days for an office""" days = db.query(OfficeWeeklyClosingDay).filter( OfficeWeeklyClosingDay.office_id == office_id ).all() return [{"id": d.id, "weekday": d.weekday} for d in days] @router.post("/{office_id}/weekly-closing-days") def add_office_weekly_closing_day(office_id: str, data: WeeklyClosingDayCreate, db: Session = Depends(get_db), user=Depends(require_manager_or_admin)): """Add a weekly closing day for an office""" check_office_access(user, office_id) office = db.query(Office).filter(Office.id == office_id).first() if not office: raise HTTPException(status_code=404, detail="Office not found") if data.weekday < 0 or data.weekday > 6: raise HTTPException(status_code=400, detail="Weekday must be 0-6 (0=Sunday, 6=Saturday)") existing = db.query(OfficeWeeklyClosingDay).filter( OfficeWeeklyClosingDay.office_id == office_id, OfficeWeeklyClosingDay.weekday == data.weekday ).first() if existing: raise HTTPException(status_code=400, detail="Weekly closing day already exists for this weekday") weekly_closing = OfficeWeeklyClosingDay( id=generate_uuid(), office_id=office_id, weekday=data.weekday ) db.add(weekly_closing) db.commit() return {"id": weekly_closing.id, "message": "Weekly closing day added"} @router.delete("/{office_id}/weekly-closing-days/{weekly_id}") def remove_office_weekly_closing_day(office_id: str, weekly_id: str, db: Session = Depends(get_db), user=Depends(require_manager_or_admin)): """Remove a weekly closing day for an office""" check_office_access(user, office_id) weekly_closing = db.query(OfficeWeeklyClosingDay).filter( OfficeWeeklyClosingDay.id == weekly_id, OfficeWeeklyClosingDay.office_id == office_id ).first() if not weekly_closing: raise HTTPException(status_code=404, detail="Weekly closing day not found") db.delete(weekly_closing) db.commit() return {"message": "Weekly closing day removed"} # Guarantees @router.get("/{office_id}/guarantees") def get_office_guarantees(office_id: str, db: Session = Depends(get_db), user=Depends(require_manager_or_admin)): """Get parking guarantees for an office""" check_office_access(user, office_id) guarantees = db.query(ParkingGuarantee).filter(ParkingGuarantee.office_id == office_id).all() # Batch query to get all user names at once user_ids = [g.user_id for g in guarantees] if user_ids: users = db.query(User).filter(User.id.in_(user_ids)).all() user_lookup = {u.id: u.name for u in users} else: user_lookup = {} return [ { "id": g.id, "user_id": g.user_id, "user_name": user_lookup.get(g.user_id), "start_date": g.start_date, "end_date": g.end_date, "notes": g.notes } for g in guarantees ] @router.post("/{office_id}/guarantees") def add_office_guarantee(office_id: str, data: GuaranteeCreate, db: Session = Depends(get_db), user=Depends(require_manager_or_admin)): """Add parking guarantee for an office""" check_office_access(user, office_id) office = db.query(Office).filter(Office.id == office_id).first() if not office: raise HTTPException(status_code=404, detail="Office not found") if not db.query(User).filter(User.id == data.user_id).first(): raise HTTPException(status_code=404, detail="User not found") existing = db.query(ParkingGuarantee).filter( ParkingGuarantee.office_id == office_id, ParkingGuarantee.user_id == data.user_id ).first() if existing: raise HTTPException(status_code=400, detail="User already has a parking guarantee") if data.start_date and data.end_date and data.end_date < data.start_date: raise HTTPException(status_code=400, detail="End date must be after start date") guarantee = ParkingGuarantee( id=generate_uuid(), office_id=office_id, user_id=data.user_id, start_date=data.start_date, end_date=data.end_date, notes=data.notes, created_at=datetime.utcnow() ) db.add(guarantee) db.commit() return {"id": guarantee.id, "message": "Guarantee added"} @router.delete("/{office_id}/guarantees/{guarantee_id}") def remove_office_guarantee(office_id: str, guarantee_id: str, db: Session = Depends(get_db), user=Depends(require_manager_or_admin)): """Remove parking guarantee for an office""" check_office_access(user, office_id) guarantee = db.query(ParkingGuarantee).filter( ParkingGuarantee.id == guarantee_id, ParkingGuarantee.office_id == office_id ).first() if not guarantee: raise HTTPException(status_code=404, detail="Guarantee not found") db.delete(guarantee) db.commit() return {"message": "Guarantee removed"} # Exclusions @router.get("/{office_id}/exclusions") def get_office_exclusions(office_id: str, db: Session = Depends(get_db), user=Depends(require_manager_or_admin)): """Get parking exclusions for an office""" check_office_access(user, office_id) exclusions = db.query(ParkingExclusion).filter(ParkingExclusion.office_id == office_id).all() # Batch query to get all user names at once user_ids = [e.user_id for e in exclusions] if user_ids: users = db.query(User).filter(User.id.in_(user_ids)).all() user_lookup = {u.id: u.name for u in users} else: user_lookup = {} return [ { "id": e.id, "user_id": e.user_id, "user_name": user_lookup.get(e.user_id), "start_date": e.start_date, "end_date": e.end_date, "notes": e.notes } for e in exclusions ] @router.post("/{office_id}/exclusions") def add_office_exclusion(office_id: str, data: ExclusionCreate, db: Session = Depends(get_db), user=Depends(require_manager_or_admin)): """Add parking exclusion for an office""" check_office_access(user, office_id) office = db.query(Office).filter(Office.id == office_id).first() if not office: raise HTTPException(status_code=404, detail="Office not found") if not db.query(User).filter(User.id == data.user_id).first(): raise HTTPException(status_code=404, detail="User not found") existing = db.query(ParkingExclusion).filter( ParkingExclusion.office_id == office_id, ParkingExclusion.user_id == data.user_id ).first() if existing: raise HTTPException(status_code=400, detail="User already has a parking exclusion") if data.start_date and data.end_date and data.end_date < data.start_date: raise HTTPException(status_code=400, detail="End date must be after start date") exclusion = ParkingExclusion( id=generate_uuid(), office_id=office_id, user_id=data.user_id, start_date=data.start_date, end_date=data.end_date, notes=data.notes, created_at=datetime.utcnow() ) db.add(exclusion) db.commit() return {"id": exclusion.id, "message": "Exclusion added"} @router.delete("/{office_id}/exclusions/{exclusion_id}") def remove_office_exclusion(office_id: str, exclusion_id: str, db: Session = Depends(get_db), user=Depends(require_manager_or_admin)): """Remove parking exclusion for an office""" check_office_access(user, office_id) exclusion = db.query(ParkingExclusion).filter( ParkingExclusion.id == exclusion_id, ParkingExclusion.office_id == office_id ).first() if not exclusion: raise HTTPException(status_code=404, detail="Exclusion not found") db.delete(exclusion) db.commit() return {"message": "Exclusion removed"}