Files
Org-Parking/app/routes/presence.py
2026-01-13 11:20:12 +01:00

332 lines
11 KiB
Python

"""
Presence Management Routes
User presence marking and admin management
"""
from typing import List
from datetime import datetime, timedelta, date
from fastapi import APIRouter, Depends, HTTPException, Query
from pydantic import BaseModel
from sqlalchemy.orm import Session
from database.connection import get_db
from database.models import UserPresence, User, DailyParkingAssignment, UserRole, PresenceStatus, Office
from utils.auth_middleware import get_current_user, require_manager_or_admin
from utils.helpers import generate_uuid
from services.parking import handle_presence_change, get_spot_display_name
from app import config
router = APIRouter(prefix="/api/presence", tags=["presence"])
# Request/Response Models
class PresenceMarkRequest(BaseModel):
date: date
status: PresenceStatus
class AdminPresenceMarkRequest(BaseModel):
user_id: str
date: date
status: PresenceStatus
class PresenceResponse(BaseModel):
id: str
user_id: str
date: date
status: PresenceStatus
created_at: datetime | None
updated_at: datetime | None
parking_spot_number: str | None = None
class Config:
from_attributes = True
# Helper functions
def check_manager_access(current_user: User, target_user: User, db: Session):
"""Check if current_user has access to target_user"""
if current_user.role == UserRole.ADMIN:
return True
if current_user.role == UserRole.MANAGER:
# Manager can access users in their Office
if target_user.office_id == current_user.office_id:
return True
raise HTTPException(status_code=403, detail="User is not in your office")
raise HTTPException(status_code=403, detail="Access denied")
def _mark_presence_for_user(
user_id: str,
presence_date: date,
status: PresenceStatus,
db: Session,
target_user: User
) -> UserPresence:
"""
Core presence marking logic - shared by user and admin routes.
"""
existing = db.query(UserPresence).filter(
UserPresence.user_id == user_id,
UserPresence.date == presence_date
).first()
now = datetime.utcnow()
old_status = existing.status if existing else None
if existing:
existing.status = status
existing.updated_at = now
db.commit()
db.refresh(existing)
presence = existing
else:
presence = UserPresence(
id=generate_uuid(),
user_id=user_id,
date=presence_date,
status=status,
created_at=now,
updated_at=now
)
db.add(presence)
db.commit()
db.refresh(presence)
# Handle parking assignment (if user is in an office)
if target_user.office_id and old_status != status:
try:
handle_presence_change(
user_id, presence_date,
old_status or PresenceStatus.ABSENT, status,
target_user.office_id, db
)
except Exception as e:
config.logger.warning(f"Parking handler failed for user {user_id} on {presence_date}: {e}")
return presence
def _delete_presence(
user_id: str,
presence_date: date,
db: Session,
target_user: User
) -> dict:
"""
Core presence deletion logic - shared by user and admin routes.
"""
presence = db.query(UserPresence).filter(
UserPresence.user_id == user_id,
UserPresence.date == presence_date
).first()
if not presence:
raise HTTPException(status_code=404, detail="Presence not found")
old_status = presence.status
db.delete(presence)
db.commit()
if target_user.office_id:
try:
handle_presence_change(
user_id, presence_date,
old_status, PresenceStatus.ABSENT,
target_user.office_id, db
)
except Exception as e:
config.logger.warning(f"Parking handler failed for user {user_id} on {presence_date}: {e}")
return {"message": "Presence deleted"}
# User Routes
@router.post("/mark", response_model=PresenceResponse)
def mark_presence(data: PresenceMarkRequest, db: Session = Depends(get_db), current_user=Depends(get_current_user)):
"""Mark presence for a date"""
return _mark_presence_for_user(current_user.id, data.date, data.status, db, current_user)
@router.get("/my-presences", response_model=List[PresenceResponse])
def get_my_presences(start_date: date = None, end_date: date = None, db: Session = Depends(get_db), current_user=Depends(get_current_user)):
"""Get current user's presences"""
query = db.query(UserPresence).filter(UserPresence.user_id == current_user.id)
if start_date:
query = query.filter(UserPresence.date >= start_date)
if end_date:
query = query.filter(UserPresence.date <= end_date)
return query.order_by(UserPresence.date.desc()).all()
@router.delete("/{date_val}")
def delete_presence(date_val: date, db: Session = Depends(get_db), current_user=Depends(get_current_user)):
"""Delete presence for a date"""
return _delete_presence(current_user.id, date_val, db, current_user)
# Admin/Manager Routes
@router.post("/admin/mark", response_model=PresenceResponse)
def admin_mark_presence(data: AdminPresenceMarkRequest, db: Session = Depends(get_db), current_user=Depends(require_manager_or_admin)):
"""Mark presence for any user (manager/admin)"""
target_user = db.query(User).filter(User.id == data.user_id).first()
if not target_user:
raise HTTPException(status_code=404, detail="User not found")
check_manager_access(current_user, target_user, db)
return _mark_presence_for_user(data.user_id, data.date, data.status, db, target_user)
@router.delete("/admin/{user_id}/{date_val}")
def admin_delete_presence(user_id: str, date_val: date, db: Session = Depends(get_db), current_user=Depends(require_manager_or_admin)):
"""Delete presence for any user (manager/admin)"""
target_user = db.query(User).filter(User.id == user_id).first()
if not target_user:
raise HTTPException(status_code=404, detail="User not found")
check_manager_access(current_user, target_user, db)
return _delete_presence(user_id, date_val, db, target_user)
@router.get("/team")
def get_team_presences(start_date: date, end_date: date, office_id: str = None, db: Session = Depends(get_db), current_user=Depends(get_current_user)):
"""Get office presences with parking info.
- Admins can see all users (or filter by office_id)
- Managers see their own office's users
- Employees can see their own office's users (read-only view)
"""
if current_user.role == UserRole.ADMIN:
if office_id:
users = db.query(User).filter(User.office_id == office_id).all()
else:
users = db.query(User).all()
elif current_user.office_id:
# Non-admin users see their office members
users = db.query(User).filter(User.office_id == current_user.office_id).all()
else:
# No office assigned
return []
# Batch query presences and parking for all selected users
user_ids = [u.id for u in users]
if not user_ids:
return []
presences = db.query(UserPresence).filter(
UserPresence.user_id.in_(user_ids),
UserPresence.date >= start_date,
UserPresence.date <= end_date
).all()
parking_assignments = db.query(DailyParkingAssignment).filter(
DailyParkingAssignment.user_id.in_(user_ids),
DailyParkingAssignment.date >= start_date,
DailyParkingAssignment.date <= end_date
).all()
# Build lookups
parking_lookup = {}
parking_info_lookup = {}
for p in parking_assignments:
if p.user_id not in parking_lookup:
parking_lookup[p.user_id] = []
parking_info_lookup[p.user_id] = []
parking_lookup[p.user_id].append(p.date)
spot_display_name = get_spot_display_name(p.spot_id, p.office_id, db)
parking_info_lookup[p.user_id].append({
"id": p.id,
"date": p.date,
"spot_id": p.spot_id,
"spot_display_name": spot_display_name
})
# Build office lookup for display (replacing old manager_lookup)
office_ids = list(set(u.office_id for u in users if u.office_id))
offices = db.query(Office).filter(Office.id.in_(office_ids)).all() if office_ids else []
office_lookup = {o.id: o.name for o in offices}
# Build response
result = []
for user in users:
user_presences = [p for p in presences if p.user_id == user.id]
result.append({
"id": user.id,
"name": user.name,
"office_id": user.office_id,
"office_name": office_lookup.get(user.office_id),
"presences": [{"date": p.date, "status": p.status} for p in user_presences],
"parking_dates": parking_lookup.get(user.id, []),
"parking_info": parking_info_lookup.get(user.id, [])
})
return result
@router.get("/admin/{user_id}")
def get_user_presences(user_id: str, start_date: date = None, end_date: date = None, db: Session = Depends(get_db), current_user=Depends(require_manager_or_admin)):
"""Get any user's presences with parking info (manager/admin)"""
target_user = db.query(User).filter(User.id == user_id).first()
if not target_user:
raise HTTPException(status_code=404, detail="User not found")
check_manager_access(current_user, target_user, db)
query = db.query(UserPresence).filter(UserPresence.user_id == user_id)
if start_date:
query = query.filter(UserPresence.date >= start_date)
if end_date:
query = query.filter(UserPresence.date <= end_date)
presences = query.order_by(UserPresence.date.desc()).all()
# Batch query parking assignments
dates = [p.date for p in presences]
parking_map = {}
if dates:
# Note: Assignments link to user. We can find spot display name by looking up assignment -> office
assignments = db.query(DailyParkingAssignment).filter(
DailyParkingAssignment.user_id == user_id,
DailyParkingAssignment.date.in_(dates)
).all()
for a in assignments:
parking_map[a.date] = get_spot_display_name(a.spot_id, a.office_id, db)
# Build response
result = []
for presence in presences:
result.append({
"id": presence.id,
"user_id": presence.user_id,
"date": presence.date,
"status": presence.status,
"created_at": presence.created_at,
"updated_at": presence.updated_at,
"parking_spot_number": parking_map.get(presence.date)
})
return result