Files
org-parking/app/routes/offices.py

514 lines
19 KiB
Python

"""
Office Management Routes
Office settings, closing days, guarantees, and exclusions
"""
from datetime import datetime, date
from fastapi import APIRouter, Depends, HTTPException
from pydantic import BaseModel
from sqlalchemy.orm import Session
from sqlalchemy import func
from database.connection import get_db
from database.models import (
User, Office,
OfficeClosingDay, OfficeWeeklyClosingDay,
ParkingGuarantee, ParkingExclusion,
UserRole
)
from utils.auth_middleware import require_admin, require_manager_or_admin, get_current_user
from utils.helpers import generate_uuid
from app import config
router = APIRouter(prefix="/api/offices", tags=["offices"])
# Request/Response Models
class ValidOfficeCreate(BaseModel):
    """Request body accepted by the office creation endpoint."""

    # Name given to the new office.
    name: str
    # Parking quota for the office; 0 when the client omits it.
    parking_quota: int = 0
class ClosingDayCreate(BaseModel):
    """Request body for adding a closing day (or closing period) to an office."""

    # Start date of the closure.
    date: date
    # Optional end date of the closure (inclusive).
    end_date: date | None = None
    # Optional reason for the closure.
    reason: str | None = None
class WeeklyClosingDayCreate(BaseModel):
    """Request body for adding a recurring weekly closing day."""

    # Day of the week: 0=Sunday, 1=Monday, ..., 6=Saturday.
    weekday: int
class GuaranteeCreate(BaseModel):
    """Request body for granting a user a parking guarantee."""

    # Id of the user receiving the guarantee.
    user_id: str
    # Optional start and end of the guarantee period.
    start_date: date | None = None
    end_date: date | None = None
    # Optional free-text notes.
    notes: str | None = None
class ExclusionCreate(BaseModel):
    """Request body for adding a parking exclusion for a user."""

    # Id of the user being excluded.
    user_id: str
    # Optional start and end of the exclusion period.
    start_date: date | None = None
    end_date: date | None = None
    # Optional free-text notes.
    notes: str | None = None
class OfficeSettingsUpdate(BaseModel):
    """Request body for updating office settings.

    Every field is optional; a field left as None is not changed by the
    update endpoint.
    """

    parking_quota: int | None = None
    name: str | None = None
    # Booking-window settings; hour and minute are range-checked by the endpoint.
    booking_window_enabled: bool | None = None
    booking_window_end_hour: int | None = None
    booking_window_end_minute: int | None = None
# Helper check
def check_office_access(user: User, office_id: str):
    """Return True when *user* may act on the office, otherwise raise 403.

    Admins are allowed for any office; managers only for the office whose id
    matches their own ``office_id``.

    Raises:
        HTTPException: 403 for every other role/office combination.
    """
    allowed = user.role == UserRole.ADMIN or (
        user.role == UserRole.MANAGER and user.office_id == office_id
    )
    if not allowed:
        raise HTTPException(status_code=403, detail="Access denied to this office")
    return True
# Office listing and details
@router.get("")
def list_offices(db: Session = Depends(get_db), user=Depends(require_manager_or_admin)):
    """Return every office with its parking quota, spot prefix and user count."""
    all_offices = db.query(Office).all()

    # A single grouped query yields the head count of every office at once.
    count_rows = (
        db.query(User.office_id, func.count(User.id))
        .filter(User.office_id.isnot(None))
        .group_by(User.office_id)
        .all()
    )
    members_by_office = {}
    for owner_office_id, total in count_rows:
        members_by_office[owner_office_id] = total

    payload = []
    for entry in all_offices:
        payload.append(
            {
                "id": entry.id,
                "name": entry.name,
                "parking_quota": entry.parking_quota,
                "spot_prefix": entry.spot_prefix,
                # Offices without any user do not appear in the grouped result.
                "user_count": members_by_office.get(entry.id, 0),
            }
        )
    return payload
def get_next_available_prefix(db: Session) -> str:
    """Return the first office spot prefix that is not in use yet.

    Candidates are tried in order: the single letters A..Z, then the
    two-letter combinations AA..ZZ.

    Raises:
        HTTPException: 400 when every candidate is already taken.
    """
    rows = db.query(Office.spot_prefix).filter(Office.spot_prefix.isnot(None)).all()
    taken = {row[0] for row in rows}

    letters = [chr(code) for code in range(65, 91)]  # "A".."Z"
    pairs = [first + second for first in letters for second in letters]

    for candidate in letters + pairs:
        if candidate not in taken:
            return candidate

    raise HTTPException(status_code=400, detail="No more office prefixes available")
@router.post("")
def create_office(data: ValidOfficeCreate, db: Session = Depends(get_db), user=Depends(require_admin)):
    """Create a new office (admin only).

    The office receives a generated id and the next available spot prefix,
    is committed, and then its parking spots are synced through
    ``sync_office_spots``.

    Raises:
        HTTPException: 400 if ``parking_quota`` is negative, or if
            ``get_next_available_prefix`` finds no free prefix.
    """
    # Enforce the same rule update_office_settings applies, so an office can
    # never be created with a quota that a later update would reject.
    if data.parking_quota < 0:
        raise HTTPException(status_code=400, detail="Parking quota must be non-negative")

    office = Office(
        id=generate_uuid(),
        name=data.name,
        parking_quota=data.parking_quota,
        spot_prefix=get_next_available_prefix(db),
        created_at=datetime.utcnow(),
    )
    db.add(office)
    db.commit()

    # Sync spots.
    # NOTE(review): function-scope import kept as in the original; presumably
    # it avoids a circular import with services.offices - confirm.
    from services.offices import sync_office_spots
    sync_office_spots(office.id, office.parking_quota, office.spot_prefix, db)

    return office
@router.get("/{office_id}")
def get_office_details(office_id: str, db: Session = Depends(get_db), user=Depends(require_manager_or_admin)):
    """Return the settings and user count of a single office.

    Raises:
        HTTPException: 404 if the office does not exist; 403 (via
            ``check_office_access``) if the caller may not access it.
    """
    office = db.query(Office).filter(Office.id == office_id).first()
    if office is None:
        raise HTTPException(status_code=404, detail="Office not found")

    # The existence check runs first, so an unknown id yields 404, not 403.
    check_office_access(user, office_id)

    members = db.query(User).filter(User.office_id == office_id)
    details = {
        "id": office.id,
        "name": office.name,
        "parking_quota": office.parking_quota,
        "spot_prefix": office.spot_prefix,
        "user_count": members.count(),
        "booking_window_enabled": office.booking_window_enabled,
        "booking_window_end_hour": office.booking_window_end_hour,
        "booking_window_end_minute": office.booking_window_end_minute,
    }
    return details
@router.put("/{office_id}")
def update_office_settings(office_id: str, data: OfficeSettingsUpdate, db: Session = Depends(get_db), user=Depends(require_admin)):
    """Update office settings (admin only).

    Only the fields supplied in *data* are applied. The name is changed only
    when it is a non-empty string; the other fields are applied whenever they
    are not None. After the commit the office's parking spots are synced
    through ``sync_office_spots``.

    Raises:
        HTTPException: 404 if the office does not exist; 400 if the quota is
            negative, the hour is outside 0-23 or the minute outside 0-59.
    """
    # Structural changes are restricted to admins through require_admin;
    # managers have no write access here.
    office = db.query(Office).filter(Office.id == office_id).first()
    if office is None:
        raise HTTPException(status_code=404, detail="Office not found")

    if data.name:
        office.name = data.name

    quota = data.parking_quota
    if quota is not None:
        if quota < 0:
            raise HTTPException(status_code=400, detail="Parking quota must be non-negative")
        office.parking_quota = quota

    if data.booking_window_enabled is not None:
        office.booking_window_enabled = data.booking_window_enabled

    end_hour = data.booking_window_end_hour
    if end_hour is not None:
        if end_hour < 0 or end_hour > 23:
            raise HTTPException(status_code=400, detail="Hour must be 0-23")
        office.booking_window_end_hour = end_hour

    end_minute = data.booking_window_end_minute
    if end_minute is not None:
        if end_minute < 0 or end_minute > 59:
            raise HTTPException(status_code=400, detail="Minute must be 0-59")
        office.booking_window_end_minute = end_minute

    office.updated_at = datetime.utcnow()
    db.commit()

    # Sync spots so they follow the (possibly changed) quota.
    from services.offices import sync_office_spots
    sync_office_spots(office.id, office.parking_quota, office.spot_prefix, db)

    updated = {
        "id": office.id,
        "name": office.name,
        "parking_quota": office.parking_quota,
        "spot_prefix": office.spot_prefix,
        "booking_window_enabled": office.booking_window_enabled,
        "booking_window_end_hour": office.booking_window_end_hour,
        "booking_window_end_minute": office.booking_window_end_minute,
    }
    return updated
@router.delete("/{office_id}")
def delete_office(office_id: str, db: Session = Depends(get_db), user=Depends(require_admin)):
    """Delete an office (admin only).

    Raises:
        HTTPException: 404 if no office has the given id.
    """
    target = db.query(Office).filter(Office.id == office_id).first()
    if target is None:
        raise HTTPException(status_code=404, detail="Office not found")

    db.delete(target)
    db.commit()
    return {"message": "Office deleted"}
@router.get("/{office_id}/users")
def get_office_users(office_id: str, db: Session = Depends(get_db), user=Depends(require_manager_or_admin)):
    """Return id, name, email and role of every user assigned to an office.

    Raises:
        HTTPException: 403 if the caller may not access the office; 404 if
            the office does not exist.
    """
    check_office_access(user, office_id)

    office = db.query(Office).filter(Office.id == office_id).first()
    if office is None:
        raise HTTPException(status_code=404, detail="Office not found")

    members = db.query(User).filter(User.office_id == office_id).all()
    listing = []
    for member in members:
        listing.append(
            {"id": member.id, "name": member.name, "email": member.email, "role": member.role}
        )
    return listing
# Closing days
@router.get("/{office_id}/closing-days")
def get_office_closing_days(office_id: str, db: Session = Depends(get_db), user=Depends(get_current_user)):
    """Return the closing days of an office, ordered by start date."""
    # Deliberately no check_office_access here: reading is open to any caller
    # that passes get_current_user (the frontend might need it).
    query = db.query(OfficeClosingDay).filter(OfficeClosingDay.office_id == office_id)
    closures = query.order_by(OfficeClosingDay.date).all()

    result = []
    for closure in closures:
        result.append(
            {
                "id": closure.id,
                "date": closure.date,
                "end_date": closure.end_date,
                "reason": closure.reason,
            }
        )
    return result
@router.post("/{office_id}/closing-days")
def add_office_closing_day(office_id: str, data: ClosingDayCreate, db: Session = Depends(get_db), user=Depends(require_manager_or_admin)):
    """Add a closing day (optionally a date range) to an office.

    Raises:
        HTTPException: 403 if the caller may not access the office; 404 if
            the office does not exist; 400 if a closing day with the same
            start date already exists or the end date precedes the start date.
    """
    check_office_access(user, office_id)

    office = db.query(Office).filter(Office.id == office_id).first()
    if office is None:
        raise HTTPException(status_code=404, detail="Office not found")

    # Duplicates are detected on the start date only.
    same_start = (
        db.query(OfficeClosingDay)
        .filter(
            OfficeClosingDay.office_id == office_id,
            OfficeClosingDay.date == data.date,
        )
        .first()
    )
    if same_start:
        raise HTTPException(status_code=400, detail="Closing day already exists for this date")

    # An end date equal to the start date is accepted (single-day closure).
    if data.end_date is not None and data.end_date < data.date:
        raise HTTPException(status_code=400, detail="End date must be after start date")

    new_closure = OfficeClosingDay(
        id=generate_uuid(),
        office_id=office_id,
        date=data.date,
        end_date=data.end_date,
        reason=data.reason,
    )
    db.add(new_closure)
    db.commit()

    return {"id": new_closure.id, "message": "Closing day added"}
@router.delete("/{office_id}/closing-days/{closing_day_id}")
def remove_office_closing_day(office_id: str, closing_day_id: str, db: Session = Depends(get_db), user=Depends(require_manager_or_admin)):
    """Remove one closing day from an office.

    Raises:
        HTTPException: 403 if the caller may not access the office; 404 if
            the closing day does not exist for this office.
    """
    check_office_access(user, office_id)

    # Matching on both ids keeps a closing day of another office out of reach.
    target = (
        db.query(OfficeClosingDay)
        .filter(
            OfficeClosingDay.id == closing_day_id,
            OfficeClosingDay.office_id == office_id,
        )
        .first()
    )
    if target is None:
        raise HTTPException(status_code=404, detail="Closing day not found")

    db.delete(target)
    db.commit()
    return {"message": "Closing day removed"}
# Weekly closing days
@router.get("/{office_id}/weekly-closing-days")
def get_office_weekly_closing_days(office_id: str, db: Session = Depends(get_db), user=Depends(get_current_user)):
    """Return the recurring weekly closing days of an office."""
    rows = (
        db.query(OfficeWeeklyClosingDay)
        .filter(OfficeWeeklyClosingDay.office_id == office_id)
        .all()
    )
    result = []
    for row in rows:
        result.append({"id": row.id, "weekday": row.weekday})
    return result
@router.post("/{office_id}/weekly-closing-days")
def add_office_weekly_closing_day(office_id: str, data: WeeklyClosingDayCreate, db: Session = Depends(get_db), user=Depends(require_manager_or_admin)):
    """Add a recurring weekly closing day to an office.

    Raises:
        HTTPException: 403 if the caller may not access the office; 404 if
            the office does not exist; 400 if the weekday is outside 0-6 or
            is already registered for this office.
    """
    check_office_access(user, office_id)

    office = db.query(Office).filter(Office.id == office_id).first()
    if office is None:
        raise HTTPException(status_code=404, detail="Office not found")

    if not (0 <= data.weekday <= 6):
        raise HTTPException(status_code=400, detail="Weekday must be 0-6 (0=Sunday, 6=Saturday)")

    duplicate = (
        db.query(OfficeWeeklyClosingDay)
        .filter(
            OfficeWeeklyClosingDay.office_id == office_id,
            OfficeWeeklyClosingDay.weekday == data.weekday,
        )
        .first()
    )
    if duplicate:
        raise HTTPException(status_code=400, detail="Weekly closing day already exists for this weekday")

    new_entry = OfficeWeeklyClosingDay(
        id=generate_uuid(),
        office_id=office_id,
        weekday=data.weekday,
    )
    db.add(new_entry)
    db.commit()

    return {"id": new_entry.id, "message": "Weekly closing day added"}
@router.delete("/{office_id}/weekly-closing-days/{weekly_id}")
def remove_office_weekly_closing_day(office_id: str, weekly_id: str, db: Session = Depends(get_db), user=Depends(require_manager_or_admin)):
    """Remove one recurring weekly closing day from an office.

    Raises:
        HTTPException: 403 if the caller may not access the office; 404 if
            the entry does not exist for this office.
    """
    check_office_access(user, office_id)

    target = (
        db.query(OfficeWeeklyClosingDay)
        .filter(
            OfficeWeeklyClosingDay.id == weekly_id,
            OfficeWeeklyClosingDay.office_id == office_id,
        )
        .first()
    )
    if target is None:
        raise HTTPException(status_code=404, detail="Weekly closing day not found")

    db.delete(target)
    db.commit()
    return {"message": "Weekly closing day removed"}
# Guarantees
@router.get("/{office_id}/guarantees")
def get_office_guarantees(office_id: str, db: Session = Depends(get_db), user=Depends(require_manager_or_admin)):
    """Return the parking guarantees of an office, each with the user's name.

    ``user_name`` is None when no user row matches the guarantee's user id.

    Raises:
        HTTPException: 403 if the caller may not access the office.
    """
    check_office_access(user, office_id)

    records = db.query(ParkingGuarantee).filter(ParkingGuarantee.office_id == office_id).all()

    # Resolve all names with one IN query; skipped when there is nothing to look up.
    names_by_id = {}
    holder_ids = [record.user_id for record in records]
    if holder_ids:
        for holder in db.query(User).filter(User.id.in_(holder_ids)).all():
            names_by_id[holder.id] = holder.name

    result = []
    for record in records:
        result.append(
            {
                "id": record.id,
                "user_id": record.user_id,
                "user_name": names_by_id.get(record.user_id),
                "start_date": record.start_date,
                "end_date": record.end_date,
                "notes": record.notes,
            }
        )
    return result
@router.post("/{office_id}/guarantees")
def add_office_guarantee(office_id: str, data: GuaranteeCreate, db: Session = Depends(get_db), user=Depends(require_manager_or_admin)):
    """Grant a user a parking guarantee in an office.

    Raises:
        HTTPException: 403 if the caller may not access the office; 404 if
            the office or the user does not exist; 400 if the user already
            has a guarantee in this office or the end date precedes the
            start date.
    """
    check_office_access(user, office_id)

    office = db.query(Office).filter(Office.id == office_id).first()
    if office is None:
        raise HTTPException(status_code=404, detail="Office not found")

    beneficiary = db.query(User).filter(User.id == data.user_id).first()
    if not beneficiary:
        raise HTTPException(status_code=404, detail="User not found")

    # At most one guarantee per user and office.
    already_granted = (
        db.query(ParkingGuarantee)
        .filter(
            ParkingGuarantee.office_id == office_id,
            ParkingGuarantee.user_id == data.user_id,
        )
        .first()
    )
    if already_granted:
        raise HTTPException(status_code=400, detail="User already has a parking guarantee")

    # The range is only checked when both ends are given.
    if data.start_date and data.end_date:
        if data.end_date < data.start_date:
            raise HTTPException(status_code=400, detail="End date must be after start date")

    new_guarantee = ParkingGuarantee(
        id=generate_uuid(),
        office_id=office_id,
        user_id=data.user_id,
        start_date=data.start_date,
        end_date=data.end_date,
        notes=data.notes,
        created_at=datetime.utcnow(),
    )
    db.add(new_guarantee)
    db.commit()

    return {"id": new_guarantee.id, "message": "Guarantee added"}
@router.delete("/{office_id}/guarantees/{guarantee_id}")
def remove_office_guarantee(office_id: str, guarantee_id: str, db: Session = Depends(get_db), user=Depends(require_manager_or_admin)):
    """Remove one parking guarantee from an office.

    Raises:
        HTTPException: 403 if the caller may not access the office; 404 if
            the guarantee does not exist for this office.
    """
    check_office_access(user, office_id)

    target = (
        db.query(ParkingGuarantee)
        .filter(
            ParkingGuarantee.id == guarantee_id,
            ParkingGuarantee.office_id == office_id,
        )
        .first()
    )
    if target is None:
        raise HTTPException(status_code=404, detail="Guarantee not found")

    db.delete(target)
    db.commit()
    return {"message": "Guarantee removed"}
# Exclusions
@router.get("/{office_id}/exclusions")
def get_office_exclusions(office_id: str, db: Session = Depends(get_db), user=Depends(require_manager_or_admin)):
    """Return the parking exclusions of an office, each with the user's name.

    ``user_name`` is None when no user row matches the exclusion's user id.

    Raises:
        HTTPException: 403 if the caller may not access the office.
    """
    check_office_access(user, office_id)

    records = db.query(ParkingExclusion).filter(ParkingExclusion.office_id == office_id).all()

    # Resolve all names with one IN query; skipped when there is nothing to look up.
    names_by_id = {}
    excluded_ids = [record.user_id for record in records]
    if excluded_ids:
        for person in db.query(User).filter(User.id.in_(excluded_ids)).all():
            names_by_id[person.id] = person.name

    result = []
    for record in records:
        result.append(
            {
                "id": record.id,
                "user_id": record.user_id,
                "user_name": names_by_id.get(record.user_id),
                "start_date": record.start_date,
                "end_date": record.end_date,
                "notes": record.notes,
            }
        )
    return result
@router.post("/{office_id}/exclusions")
def add_office_exclusion(office_id: str, data: ExclusionCreate, db: Session = Depends(get_db), user=Depends(require_manager_or_admin)):
    """Add a parking exclusion for a user in an office.

    Raises:
        HTTPException: 403 if the caller may not access the office; 404 if
            the office or the user does not exist; 400 if the end date
            precedes the start date, or an end date is given without a
            start date.
    """
    check_office_access(user, office_id)

    office = db.query(Office).filter(Office.id == office_id).first()
    if office is None:
        raise HTTPException(status_code=404, detail="Office not found")

    excluded_user = db.query(User).filter(User.id == data.user_id).first()
    if not excluded_user:
        raise HTTPException(status_code=404, detail="User not found")

    # No uniqueness check on purpose: a user can have multiple exclusions
    # (different periods).

    if data.start_date and data.end_date:
        if data.end_date < data.start_date:
            raise HTTPException(status_code=400, detail="End date must be after start date")

    if data.end_date and not data.start_date:
        raise HTTPException(status_code=400, detail="Start date is required if an end date is specified")

    new_exclusion = ParkingExclusion(
        id=generate_uuid(),
        office_id=office_id,
        user_id=data.user_id,
        start_date=data.start_date,
        end_date=data.end_date,
        notes=data.notes,
        created_at=datetime.utcnow(),
    )
    db.add(new_exclusion)
    db.commit()

    return {"id": new_exclusion.id, "message": "Exclusion added"}
@router.delete("/{office_id}/exclusions/{exclusion_id}")
def remove_office_exclusion(office_id: str, exclusion_id: str, db: Session = Depends(get_db), user=Depends(require_manager_or_admin)):
    """Remove one parking exclusion from an office.

    Raises:
        HTTPException: 403 if the caller may not access the office; 404 if
            the exclusion does not exist for this office.
    """
    check_office_access(user, office_id)

    target = (
        db.query(ParkingExclusion)
        .filter(
            ParkingExclusion.id == exclusion_id,
            ParkingExclusion.office_id == office_id,
        )
        .first()
    )
    if target is None:
        raise HTTPException(status_code=404, detail="Exclusion not found")

    db.delete(target)
    db.commit()
    return {"message": "Exclusion removed"}