Primo commit

This commit is contained in:
2026-01-13 11:20:12 +01:00
parent ce9e2fdf2a
commit 17453f5d13
51 changed files with 3883 additions and 2508 deletions

View File

@@ -3,13 +3,13 @@ Presence Management Routes
User presence marking and admin management
"""
from typing import List
from datetime import datetime, timedelta
from fastapi import APIRouter, Depends, HTTPException
from datetime import datetime, timedelta, date
from fastapi import APIRouter, Depends, HTTPException, Query
from pydantic import BaseModel
from sqlalchemy.orm import Session
from database.connection import get_db
from database.models import UserPresence, User, DailyParkingAssignment
from database.models import UserPresence, User, DailyParkingAssignment, UserRole, PresenceStatus, Office
from utils.auth_middleware import get_current_user, require_manager_or_admin
from utils.helpers import generate_uuid
from services.parking import handle_presence_change, get_spot_display_name
@@ -20,38 +20,26 @@ router = APIRouter(prefix="/api/presence", tags=["presence"])
# Request/Response Models
class PresenceMarkRequest(BaseModel):
date: str # YYYY-MM-DD
status: str # present, remote, absent
date: date
status: PresenceStatus
class AdminPresenceMarkRequest(BaseModel):
user_id: str
date: str
status: str
date: date
status: PresenceStatus
class BulkPresenceRequest(BaseModel):
start_date: str
end_date: str
status: str
days: List[int] | None = None # Optional: [0,1,2,3,4] for Mon-Fri
class AdminBulkPresenceRequest(BaseModel):
user_id: str
start_date: str
end_date: str
status: str
days: List[int] | None = None
class PresenceResponse(BaseModel):
id: str
user_id: str
date: str
status: str
created_at: str | None
updated_at: str | None
date: date
status: PresenceStatus
created_at: datetime | None
updated_at: datetime | None
parking_spot_number: str | None = None
class Config:
@@ -59,51 +47,38 @@ class PresenceResponse(BaseModel):
# Helper functions
def validate_status(status: str):
if status not in ["present", "remote", "absent"]:
raise HTTPException(status_code=400, detail="Status must be: present, remote, or absent")
def parse_date(date_str: str) -> datetime:
try:
return datetime.strptime(date_str, "%Y-%m-%d")
except ValueError:
raise HTTPException(status_code=400, detail="Invalid date format. Use YYYY-MM-DD")
def check_manager_access(current_user: User, target_user: User, db: Session):
"""Check if current_user has access to target_user"""
if current_user.role == "admin":
if current_user.role == UserRole.ADMIN:
return True
if current_user.role == "manager":
# Manager can access users they manage
if target_user.manager_id == current_user.id:
if current_user.role == UserRole.MANAGER:
# Manager can access users in their Office
if target_user.office_id == current_user.office_id:
return True
raise HTTPException(status_code=403, detail="User is not managed by you")
raise HTTPException(status_code=403, detail="User is not in your office")
raise HTTPException(status_code=403, detail="Access denied")
def _mark_presence_for_user(
user_id: str,
date: str,
status: str,
presence_date: date,
status: PresenceStatus,
db: Session,
target_user: User
) -> UserPresence:
"""
Core presence marking logic - shared by user and admin routes.
"""
validate_status(status)
parse_date(date)
existing = db.query(UserPresence).filter(
UserPresence.user_id == user_id,
UserPresence.date == date
UserPresence.date == presence_date
).first()
now = datetime.utcnow().isoformat()
now = datetime.utcnow()
old_status = existing.status if existing else None
if existing:
@@ -116,7 +91,7 @@ def _mark_presence_for_user(
presence = UserPresence(
id=generate_uuid(),
user_id=user_id,
date=date,
date=presence_date,
status=status,
created_at=now,
updated_at=now
@@ -125,114 +100,36 @@ def _mark_presence_for_user(
db.commit()
db.refresh(presence)
# Handle parking assignment
# Use manager_id if user has one, or user's own id if they are a manager
parking_manager_id = target_user.manager_id
if not parking_manager_id and target_user.role == "manager":
# Manager is part of their own team for parking purposes
parking_manager_id = target_user.id
if old_status != status and parking_manager_id:
# Handle parking assignment (if user is in an office)
if target_user.office_id and old_status != status:
try:
handle_presence_change(
user_id, date,
old_status or "absent", status,
parking_manager_id, db
user_id, presence_date,
old_status or PresenceStatus.ABSENT, status,
target_user.office_id, db
)
except Exception as e:
config.logger.warning(f"Parking handler failed for user {user_id} on {date}: {e}")
config.logger.warning(f"Parking handler failed for user {user_id} on {presence_date}: {e}")
return presence
def _bulk_mark_presence(
user_id: str,
start_date: str,
end_date: str,
status: str,
days: List[int] | None,
db: Session,
target_user: User
) -> List[UserPresence]:
"""
Core bulk presence marking logic - shared by user and admin routes.
"""
validate_status(status)
start = parse_date(start_date)
end = parse_date(end_date)
if end < start:
raise HTTPException(status_code=400, detail="End date must be after start date")
if (end - start).days > 90:
raise HTTPException(status_code=400, detail="Range cannot exceed 90 days")
results = []
current_date = start
now = datetime.utcnow().isoformat()
while current_date <= end:
if days is None or current_date.weekday() in days:
date_str = current_date.strftime("%Y-%m-%d")
existing = db.query(UserPresence).filter(
UserPresence.user_id == user_id,
UserPresence.date == date_str
).first()
old_status = existing.status if existing else None
if existing:
existing.status = status
existing.updated_at = now
results.append(existing)
else:
presence = UserPresence(
id=generate_uuid(),
user_id=user_id,
date=date_str,
status=status,
created_at=now,
updated_at=now
)
db.add(presence)
results.append(presence)
# Handle parking for each date
# Use manager_id if user has one, or user's own id if they are a manager
parking_manager_id = target_user.manager_id
if not parking_manager_id and target_user.role == "manager":
parking_manager_id = target_user.id
if old_status != status and parking_manager_id:
try:
handle_presence_change(
user_id, date_str,
old_status or "absent", status,
parking_manager_id, db
)
except Exception as e:
config.logger.warning(f"Parking handler failed for user {user_id} on {date_str}: {e}")
current_date += timedelta(days=1)
db.commit()
return results
def _delete_presence(
user_id: str,
date: str,
presence_date: date,
db: Session,
target_user: User
) -> dict:
"""
Core presence deletion logic - shared by user and admin routes.
"""
parse_date(date)
presence = db.query(UserPresence).filter(
UserPresence.user_id == user_id,
UserPresence.date == date
UserPresence.date == presence_date
).first()
if not presence:
@@ -242,20 +139,15 @@ def _delete_presence(
db.delete(presence)
db.commit()
# Use manager_id if user has one, or user's own id if they are a manager
parking_manager_id = target_user.manager_id
if not parking_manager_id and target_user.role == "manager":
parking_manager_id = target_user.id
if parking_manager_id:
if target_user.office_id:
try:
handle_presence_change(
user_id, date,
old_status, "absent",
parking_manager_id, db
user_id, presence_date,
old_status, PresenceStatus.ABSENT,
target_user.office_id, db
)
except Exception as e:
config.logger.warning(f"Parking handler failed for user {user_id} on {date}: {e}")
config.logger.warning(f"Parking handler failed for user {user_id} on {presence_date}: {e}")
return {"message": "Presence deleted"}
@@ -267,34 +159,26 @@ def mark_presence(data: PresenceMarkRequest, db: Session = Depends(get_db), curr
return _mark_presence_for_user(current_user.id, data.date, data.status, db, current_user)
@router.post("/mark-bulk", response_model=List[PresenceResponse])
def mark_bulk_presence(data: BulkPresenceRequest, db: Session = Depends(get_db), current_user=Depends(get_current_user)):
"""Mark presence for a date range"""
return _bulk_mark_presence(
current_user.id, data.start_date, data.end_date,
data.status, data.days, db, current_user
)
@router.get("/my-presences", response_model=List[PresenceResponse])
def get_my_presences(start_date: str = None, end_date: str = None, db: Session = Depends(get_db), current_user=Depends(get_current_user)):
def get_my_presences(start_date: date = None, end_date: date = None, db: Session = Depends(get_db), current_user=Depends(get_current_user)):
"""Get current user's presences"""
query = db.query(UserPresence).filter(UserPresence.user_id == current_user.id)
if start_date:
parse_date(start_date)
query = query.filter(UserPresence.date >= start_date)
if end_date:
parse_date(end_date)
query = query.filter(UserPresence.date <= end_date)
return query.order_by(UserPresence.date.desc()).all()
@router.delete("/{date}")
def delete_presence(date: str, db: Session = Depends(get_db), current_user=Depends(get_current_user)):
@router.delete("/{date_val}")
def delete_presence(date_val: date, db: Session = Depends(get_db), current_user=Depends(get_current_user)):
"""Delete presence for a date"""
return _delete_presence(current_user.id, date, db, current_user)
return _delete_presence(current_user.id, date_val, db, current_user)
# Admin/Manager Routes
@@ -309,66 +193,47 @@ def admin_mark_presence(data: AdminPresenceMarkRequest, db: Session = Depends(ge
return _mark_presence_for_user(data.user_id, data.date, data.status, db, target_user)
@router.post("/admin/mark-bulk", response_model=List[PresenceResponse])
def admin_mark_bulk_presence(data: AdminBulkPresenceRequest, db: Session = Depends(get_db), current_user=Depends(require_manager_or_admin)):
"""Bulk mark presence for any user (manager/admin)"""
target_user = db.query(User).filter(User.id == data.user_id).first()
if not target_user:
raise HTTPException(status_code=404, detail="User not found")
check_manager_access(current_user, target_user, db)
return _bulk_mark_presence(
data.user_id, data.start_date, data.end_date,
data.status, data.days, db, target_user
)
@router.delete("/admin/{user_id}/{date}")
def admin_delete_presence(user_id: str, date: str, db: Session = Depends(get_db), current_user=Depends(require_manager_or_admin)):
@router.delete("/admin/{user_id}/{date_val}")
def admin_delete_presence(user_id: str, date_val: date, db: Session = Depends(get_db), current_user=Depends(require_manager_or_admin)):
"""Delete presence for any user (manager/admin)"""
target_user = db.query(User).filter(User.id == user_id).first()
if not target_user:
raise HTTPException(status_code=404, detail="User not found")
check_manager_access(current_user, target_user, db)
return _delete_presence(user_id, date, db, target_user)
return _delete_presence(user_id, date_val, db, target_user)
@router.get("/team")
def get_team_presences(start_date: str, end_date: str, manager_id: str = None, db: Session = Depends(get_db), current_user=Depends(get_current_user)):
"""Get team presences with parking info, filtered by manager.
- Admins can see all teams
- Managers see their own team
- Employees can only see their own team (read-only view)
def get_team_presences(start_date: date, end_date: date, office_id: str = None, db: Session = Depends(get_db), current_user=Depends(get_current_user)):
"""Get office presences with parking info.
- Admins can see all users (or filter by office_id)
- Managers see their own office's users
- Employees can see their own office's users (read-only view)
"""
parse_date(start_date)
parse_date(end_date)
# Get users based on permissions and manager filter
# Note: Manager is part of their own team (for parking assignment purposes)
if current_user.role == "employee":
# Employees can only see their own team (users with same manager_id + the manager)
if not current_user.manager_id:
return [] # No manager assigned, no team to show
users = db.query(User).filter(
(User.manager_id == current_user.manager_id) | (User.id == current_user.manager_id)
).all()
elif manager_id:
# Filter by specific manager (for admins/managers) - include the manager themselves
users = db.query(User).filter(
(User.manager_id == manager_id) | (User.id == manager_id)
).all()
elif current_user.role == "admin":
# Admin sees all users
users = db.query(User).all()
if current_user.role == UserRole.ADMIN:
if office_id:
users = db.query(User).filter(User.office_id == office_id).all()
else:
users = db.query(User).all()
elif current_user.office_id:
# Non-admin users see their office members
users = db.query(User).filter(User.office_id == current_user.office_id).all()
else:
# Manager sees their team + themselves
users = db.query(User).filter(
(User.manager_id == current_user.id) | (User.id == current_user.id)
).all()
# No office assigned
return []
# Batch query presences and parking for all users
# Batch query presences and parking for all selected users
user_ids = [u.id for u in users]
if not user_ids:
return []
presences = db.query(UserPresence).filter(
UserPresence.user_id.in_(user_ids),
UserPresence.date >= start_date,
@@ -389,7 +254,7 @@ def get_team_presences(start_date: str, end_date: str, manager_id: str = None, d
parking_lookup[p.user_id] = []
parking_info_lookup[p.user_id] = []
parking_lookup[p.user_id].append(p.date)
spot_display_name = get_spot_display_name(p.spot_id, p.manager_id, db)
spot_display_name = get_spot_display_name(p.spot_id, p.office_id, db)
parking_info_lookup[p.user_id].append({
"id": p.id,
"date": p.date,
@@ -397,10 +262,10 @@ def get_team_presences(start_date: str, end_date: str, manager_id: str = None, d
"spot_display_name": spot_display_name
})
# Build manager lookup for display
manager_ids = list(set(u.manager_id for u in users if u.manager_id))
managers = db.query(User).filter(User.id.in_(manager_ids)).all() if manager_ids else []
manager_lookup = {m.id: m.name for m in managers}
# Build office lookup for display (replacing old manager_lookup)
office_ids = list(set(u.office_id for u in users if u.office_id))
offices = db.query(Office).filter(Office.id.in_(office_ids)).all() if office_ids else []
office_lookup = {o.id: o.name for o in offices}
# Build response
result = []
@@ -410,8 +275,8 @@ def get_team_presences(start_date: str, end_date: str, manager_id: str = None, d
result.append({
"id": user.id,
"name": user.name,
"manager_id": user.manager_id,
"manager_name": manager_lookup.get(user.manager_id),
"office_id": user.office_id,
"office_name": office_lookup.get(user.office_id),
"presences": [{"date": p.date, "status": p.status} for p in user_presences],
"parking_dates": parking_lookup.get(user.id, []),
"parking_info": parking_info_lookup.get(user.id, [])
@@ -421,7 +286,7 @@ def get_team_presences(start_date: str, end_date: str, manager_id: str = None, d
@router.get("/admin/{user_id}")
def get_user_presences(user_id: str, start_date: str = None, end_date: str = None, db: Session = Depends(get_db), current_user=Depends(require_manager_or_admin)):
def get_user_presences(user_id: str, start_date: date = None, end_date: date = None, db: Session = Depends(get_db), current_user=Depends(require_manager_or_admin)):
"""Get any user's presences with parking info (manager/admin)"""
target_user = db.query(User).filter(User.id == user_id).first()
if not target_user:
@@ -432,24 +297,23 @@ def get_user_presences(user_id: str, start_date: str = None, end_date: str = Non
query = db.query(UserPresence).filter(UserPresence.user_id == user_id)
if start_date:
parse_date(start_date)
query = query.filter(UserPresence.date >= start_date)
if end_date:
parse_date(end_date)
query = query.filter(UserPresence.date <= end_date)
presences = query.order_by(UserPresence.date.desc()).all()
# Batch query parking assignments
date_strs = [p.date for p in presences]
dates = [p.date for p in presences]
parking_map = {}
if date_strs:
if dates:
# Note: Assignments link to user. We can find spot display name by looking up assignment -> office
assignments = db.query(DailyParkingAssignment).filter(
DailyParkingAssignment.user_id == user_id,
DailyParkingAssignment.date.in_(date_strs)
DailyParkingAssignment.date.in_(dates)
).all()
for a in assignments:
parking_map[a.date] = get_spot_display_name(a.spot_id, a.manager_id, db)
parking_map[a.date] = get_spot_display_name(a.spot_id, a.office_id, db)
# Build response
result = []