Files
org-parking/app/routes/parking.py

532 lines
20 KiB
Python

"""
Parking Management Routes

Parking assignments, spot management, and pool initialization.

Manager-centric model:
- Managers own parking spots (defined by manager_parking_quota)
- Spots are named with manager's letter prefix (A1, A2, B1, B2...)
- Assignments reference manager_id directly

NOTE(review): the routes in this module key spots and assignments by
office_id (Office.parking_quota, DailyParkingAssignment.office_id); confirm
whether the manager-centric description above is still current.
"""
from typing import List
from datetime import datetime, date
from fastapi import APIRouter, Depends, HTTPException
from pydantic import BaseModel
from sqlalchemy.orm import Session
from database.connection import get_db
from database.models import DailyParkingAssignment, User, UserRole, Office, OfficeSpot
from utils.auth_middleware import get_current_user, require_manager_or_admin
from services.parking import (
initialize_parking_pool, get_spot_display_name, release_user_spot,
run_batch_allocation, clear_assignments_for_office_date
)
from services.notifications import notify_parking_assigned, notify_parking_released, notify_parking_reassigned
from app import config
router = APIRouter(prefix="/api/parking", tags=["parking"])
# Request/Response Models
class InitPoolRequest(BaseModel):
    """Request body for POST /api/parking/init-office-pool."""

    # Day for which the caller's office pool is initialized.
    date: date
class ManualAssignRequest(BaseModel):
    """Request body for POST /api/parking/manual-assign."""

    # Office on whose behalf the assignment is created.
    office_id: str
    # User who receives the spot.
    user_id: str
    # Identifier of the OfficeSpot being assigned.
    spot_id: str
    # Day the assignment applies to.
    date: date
class ReassignSpotRequest(BaseModel):
    """Request body for POST /api/parking/reassign-spot."""

    # Identifier of the existing DailyParkingAssignment to change.
    assignment_id: str
    # Target of the reassignment:
    #   - a user id  -> the spot is handed to that user
    #   - "auto"     -> the route delegates to release_user_spot
    #   - None       -> the assignment is deleted (spot released)
    new_user_id: str | None
class AssignmentResponse(BaseModel):
    """Serialized view of one spot/assignment pair returned by the parking routes."""

    # Assignment id; get_assignments uses "virtual-<spot id>" for spots
    # that have no assignment row on the requested day.
    id: str
    # Day the entry refers to.
    date: date
    # Identifier of the OfficeSpot.
    spot_id: str
    # Human-readable spot name, when known.
    spot_display_name: str | None = None
    # Holder of the spot; None when the spot is free.
    user_id: str | None
    # Office the spot/assignment belongs to.
    office_id: str
    # Holder's name and e-mail, filled in when a holder could be resolved.
    user_name: str | None = None
    user_email: str | None = None
class RunAllocationRequest(BaseModel):
    """Request body for POST /api/parking/run-allocation."""

    # Day to run the batch allocation for.
    date: date
    # Office whose spots are allocated.
    office_id: str
class ClearAssignmentsRequest(BaseModel):
    """Request body for POST /api/parking/clear-assignments."""

    # Day whose assignments are cleared.
    date: date
    # Office whose assignments are cleared.
    office_id: str
# Routes
@router.post("/init-office-pool")
def init_office_pool(
    request: InitPoolRequest,
    db: Session = Depends(get_db),
    current_user=Depends(require_manager_or_admin),
):
    """Create the parking pool of the caller's own office for one day.

    The office is always taken from ``current_user``; the request only
    carries the date.

    Returns:
        ``{"success": True, "spots": ...}`` with whatever
        ``initialize_parking_pool`` returned, or a ``spots: 0`` payload with
        an explanatory message when the office is missing or has no quota.

    Raises:
        HTTPException: 400 when the caller is not attached to an office.
    """
    caller_office_id = current_user.office_id
    if not caller_office_id:
        raise HTTPException(status_code=400, detail="User does not belong to an office")

    office = db.query(Office).filter(Office.id == caller_office_id).first()
    quota = office.parking_quota if office else None
    if not quota:
        # Nothing to initialize: unknown office or quota unset/zero.
        return {"success": True, "message": "No parking quota configured", "spots": 0}

    created = initialize_parking_pool(office.id, quota, request.date, db)
    return {"success": True, "spots": created}
@router.get("/assignments/{date_val}", response_model=List[AssignmentResponse])
def get_assignments(
    date_val: date,
    office_id: str | None = None,
    db: Session = Depends(get_db),
    current_user=Depends(get_current_user),
):
    """List every available spot for a day, merged with its assignment if any.

    Args:
        date_val: Day to report on.
        office_id: Office to report on. Defaults to the caller's office; when
            neither is set, no office filter is applied and spots of all
            offices are returned.

    Returns:
        One ``AssignmentResponse`` per available ``OfficeSpot``, ordered by
        ``spot_number``. Spots without an assignment row get a synthetic
        ``"virtual-<spot id>"`` id and ``user_id=None`` so the frontend still
        has a stable key.
    """
    target_office_id = office_id or current_user.office_id

    # 1. All available spots, in display order.
    spot_query = db.query(OfficeSpot).filter(OfficeSpot.is_unavailable == False)  # noqa: E712 (SQL expression)
    if target_office_id:
        spot_query = spot_query.filter(OfficeSpot.office_id == target_office_id)
    spots = spot_query.order_by(OfficeSpot.spot_number).all()

    # 2. Existing assignments for the day, indexed by spot for O(1) lookup.
    assign_query = db.query(DailyParkingAssignment).filter(DailyParkingAssignment.date == date_val)
    if target_office_id:
        assign_query = assign_query.filter(DailyParkingAssignment.office_id == target_office_id)
    assignment_by_spot = {a.spot_id: a for a in assign_query.all()}

    paired = [(spot, assignment_by_spot.get(spot.id)) for spot in spots]

    # 3. Resolve all spot holders with a single query instead of one query
    #    per occupied spot.
    holder_ids = {a.user_id for _, a in paired if a and a.user_id}
    users_by_id = {}
    if holder_ids:
        holders = db.query(User).filter(User.id.in_(list(holder_ids))).all()
        users_by_id = {u.id: u for u in holders}

    # 4. Merge spots with their assignment (or a virtual placeholder).
    results = []
    for spot, assignment in paired:
        if assignment:
            holder = users_by_id.get(assignment.user_id) if assignment.user_id else None
            results.append(AssignmentResponse(
                id=assignment.id,
                date=assignment.date,
                spot_id=spot.id,
                spot_display_name=spot.name,
                user_id=assignment.user_id,
                office_id=spot.office_id,
                user_name=holder.name if holder else None,
                user_email=holder.email if holder else None,
            ))
        else:
            results.append(AssignmentResponse(
                id=f"virtual-{spot.id}",
                date=date_val,
                spot_id=spot.id,
                spot_display_name=spot.name,
                user_id=None,
                office_id=spot.office_id,
            ))
    return results
@router.get("/my-assignments", response_model=List[AssignmentResponse])
def get_my_assignments(start_date: date = None, end_date: date = None, db: Session = Depends(get_db), current_user=Depends(get_current_user)):
    """Return the caller's own parking assignments, newest day first.

    Args:
        start_date: Optional inclusive lower bound on the assignment day.
        end_date: Optional inclusive upper bound on the assignment day.

    Returns:
        A list of ``AssignmentResponse`` carrying the caller's name and
        e-mail and the display name resolved by ``get_spot_display_name``.
    """
    mine = db.query(DailyParkingAssignment).filter(
        DailyParkingAssignment.user_id == current_user.id
    )
    if start_date:
        mine = mine.filter(DailyParkingAssignment.date >= start_date)
    if end_date:
        mine = mine.filter(DailyParkingAssignment.date <= end_date)

    rows = mine.order_by(DailyParkingAssignment.date.desc()).all()
    return [
        AssignmentResponse(
            id=row.id,
            date=row.date,
            spot_id=row.spot_id,
            spot_display_name=get_spot_display_name(row.spot_id, row.office_id, db),
            user_id=row.user_id,
            office_id=row.office_id,
            user_name=current_user.name,
            user_email=current_user.email,
        )
        for row in rows
    ]
@router.post("/run-allocation")
def run_fair_allocation(
    data: RunAllocationRequest,
    db: Session = Depends(get_db),
    current_user=Depends(require_manager_or_admin),
):
    """Test tool: run the batch allocation for one office and day on demand.

    Raises:
        HTTPException: 403 when a manager targets an office other than
            their own.
    """
    manages_other_office = (
        current_user.role == UserRole.MANAGER
        and current_user.office_id != data.office_id
    )
    if manages_other_office:
        raise HTTPException(status_code=403, detail="Not authorized for this office")

    outcome = run_batch_allocation(data.office_id, data.date, db)
    return {"message": "Allocation completed", "result": outcome}
@router.post("/clear-assignments")
def clear_assignments(
    data: ClearAssignmentsRequest,
    db: Session = Depends(get_db),
    current_user=Depends(require_manager_or_admin),
):
    """Test tool: remove every assignment of one office for one day.

    Returns:
        A dict with a confirmation message and the count reported by
        ``clear_assignments_for_office_date``.

    Raises:
        HTTPException: 403 when a manager targets an office other than
            their own.
    """
    if current_user.role == UserRole.MANAGER:
        # Managers are limited to their own office.
        if current_user.office_id != data.office_id:
            raise HTTPException(status_code=403, detail="Not authorized for this office")

    removed = clear_assignments_for_office_date(data.office_id, data.date, db)
    return {"message": "Assignments cleared", "count": removed}
@router.post("/manual-assign")
def manual_assign(
    data: ManualAssignRequest,
    db: Session = Depends(get_db),
    current_user=Depends(require_manager_or_admin),
):
    """Manually assign a spot to a user for one day.

    Returns:
        A dict with a confirmation message, the spot id and its display name.

    Raises:
        HTTPException: 404 when the user, office or spot does not exist;
            403 when the caller is neither an admin nor the manager of the
            target office; 400 when the spot belongs to another office, is
            already assigned that day, or the user already holds a spot
            that day.
    """
    # Local import: the id is generated here with the standard library
    # because no id helper is imported into this module.
    from uuid import uuid4

    assign_date = data.date

    user = db.query(User).filter(User.id == data.user_id).first()
    if not user:
        raise HTTPException(status_code=404, detail="User not found")

    office = db.query(Office).filter(Office.id == data.office_id).first()
    if not office:
        raise HTTPException(status_code=404, detail="Office not found")

    # Only an admin or the manager of that office may assign its spots.
    is_manager = (current_user.role == UserRole.MANAGER and current_user.office_id == data.office_id)
    if current_user.role != UserRole.ADMIN and not is_manager:
        raise HTTPException(status_code=403, detail="Not authorized to assign spots for this office")

    spot_def = db.query(OfficeSpot).filter(OfficeSpot.id == data.spot_id).first()
    if not spot_def:
        raise HTTPException(status_code=404, detail="Spot definition not found")
    # The availability check below is scoped to data.office_id, so a spot of
    # a different office would slip through it and could be double-booked.
    if spot_def.office_id != data.office_id:
        raise HTTPException(status_code=400, detail="Spot does not belong to this office")

    existing_assignment = db.query(DailyParkingAssignment).filter(
        DailyParkingAssignment.office_id == data.office_id,
        DailyParkingAssignment.date == assign_date,
        DailyParkingAssignment.spot_id == data.spot_id
    ).first()
    if existing_assignment:
        raise HTTPException(status_code=400, detail="Spot already assigned")

    # A user may hold at most one spot per day, regardless of office.
    user_has_spot = db.query(DailyParkingAssignment).filter(
        DailyParkingAssignment.date == assign_date,
        DailyParkingAssignment.user_id == data.user_id
    ).first()
    if user_has_spot:
        raise HTTPException(status_code=400, detail="User already has a spot for this date")

    new_assignment = DailyParkingAssignment(
        id=str(uuid4()),
        date=assign_date,
        spot_id=data.spot_id,
        user_id=data.user_id,
        office_id=data.office_id,
        created_at=datetime.utcnow()
    )
    db.add(new_assignment)
    db.commit()
    return {"message": "Spot assigned", "spot_id": data.spot_id, "spot_display_name": spot_def.name}
@router.post("/release-my-spot/{assignment_id}")
def release_my_spot(assignment_id: str, db: Session = Depends(get_db), current_user=Depends(get_current_user)):
    """Release a parking spot currently assigned to the caller.

    The assignment row is deleted and a confirmation notification is sent
    to the caller.

    Raises:
        HTTPException: 404 when the assignment does not exist; 403 when it
            belongs to another user.
    """
    assignment = db.query(DailyParkingAssignment).filter(
        DailyParkingAssignment.id == assignment_id
    ).first()
    if not assignment:
        raise HTTPException(status_code=404, detail="Assignment not found")
    if assignment.user_id != current_user.id:
        raise HTTPException(status_code=403, detail="You can only release your own parking spot")

    # Capture everything needed for the notification/log *before* deleting,
    # so nothing is read from the ORM object after its row is gone.
    spot_name = assignment.spot.name if assignment.spot else "Unknown"
    released_date = assignment.date

    db.delete(assignment)
    db.commit()

    # Self-release, so this is only a confirmation to the caller.
    notify_parking_released(current_user, released_date, spot_name)
    config.logger.info(f"User {current_user.email} released parking spot {spot_name} on {released_date}")
    return {"message": "Parking spot released"}
@router.post("/reassign-spot")
def reassign_spot(data: ReassignSpotRequest, db: Session = Depends(get_db), current_user=Depends(get_current_user)):
    """Reassign a spot to another user, hand it to auto-assignment, or release it.

    Allowed for: an admin, a manager of the assignment's office, or the
    current holder of the spot.

    ``data.new_user_id`` selects the action:
      - ``"auto"``: delegate to ``release_user_spot``.
      - a user id: move the assignment to that user.
      - ``None``/empty: delete the assignment.

    Notifications are sent only after the change has been committed, so
    nobody is told about a change that failed to persist.

    Returns:
        An ``AssignmentResponse`` when the spot is moved to a user, otherwise
        a dict with a confirmation message.

    Raises:
        HTTPException: 404 when the assignment or the new user does not
            exist; 403 when the caller lacks permission; 400 when
            auto-reassignment fails or the new user already holds a spot
            that day.
    """
    assignment = db.query(DailyParkingAssignment).filter(
        DailyParkingAssignment.id == data.assignment_id
    ).first()
    if not assignment:
        raise HTTPException(status_code=404, detail="Assignment not found")

    is_admin = current_user.role == UserRole.ADMIN
    is_spot_owner = assignment.user_id == current_user.id
    is_manager = (current_user.role == UserRole.MANAGER and current_user.office_id == assignment.office_id)
    if not (is_admin or is_manager or is_spot_owner):
        raise HTTPException(status_code=403, detail="Not authorized to reassign this spot")

    # Snapshot what notifications/logging need before the row is changed or deleted.
    spot_name = assignment.spot.name if assignment.spot else "Unknown"
    spot_date = assignment.date

    if data.new_user_id == "auto":
        # "Auto" hands the spot back so the system can pick the next person.
        success = release_user_spot(assignment.office_id, assignment.user_id, spot_date, db)
        if not success:
            raise HTTPException(status_code=400, detail="Could not auto-reassign spot")
        return {"message": "Spot released for auto-assignment"}

    # Previous holder (if any), looked up before the assignment is modified.
    old_user_id = assignment.user_id
    old_user = db.query(User).filter(User.id == old_user_id).first() if old_user_id else None

    if data.new_user_id:
        new_user = db.query(User).filter(User.id == data.new_user_id).first()
        if not new_user:
            raise HTTPException(status_code=404, detail="User not found")

        # The new holder must not already have a different spot that day.
        existing = db.query(DailyParkingAssignment).filter(
            DailyParkingAssignment.user_id == data.new_user_id,
            DailyParkingAssignment.date == spot_date,
            DailyParkingAssignment.id != assignment.id
        ).first()
        if existing:
            raise HTTPException(status_code=400, detail="User already has a spot for this date")

        assignment.user_id = data.new_user_id
        db.commit()
        db.refresh(assignment)

        # Persisted: now tell the people involved.
        if old_user and old_user.id != new_user.id:
            notify_parking_reassigned(old_user, spot_date, spot_name, new_user.name)
        notify_parking_assigned(new_user, spot_date, spot_name)
        config.logger.info(f"Parking spot {spot_name} on {spot_date} reassigned from {old_user.email if old_user else 'unassigned'} to {new_user.email}")

        result = AssignmentResponse(
            id=assignment.id,
            date=assignment.date,
            spot_id=assignment.spot_id,
            spot_display_name=spot_name,
            user_id=assignment.user_id,
            office_id=assignment.office_id
        )
        if assignment.user_id:
            result.user_name = new_user.name
            result.user_email = new_user.email
        return result

    # No target user: release the spot by deleting the assignment.
    db.delete(assignment)
    db.commit()
    if old_user:
        notify_parking_released(old_user, spot_date, spot_name)
    config.logger.info(f"Parking spot {spot_name} on {spot_date} released by {old_user.email if old_user else 'unknown'}")
    return {"message": "Spot released"}
@router.get("/eligible-users/{assignment_id}")
def get_eligible_users(assignment_id: str, db: Session = Depends(get_db), current_user=Depends(get_current_user)):
    """List the users a given assignment could be handed over to.

    Candidates are the users of the assignment's office, excluding the
    current holder and anyone who already holds a spot on the same day.

    Returns:
        A list of ``{"id", "name", "email"}`` dicts.

    Raises:
        HTTPException: 404 when the assignment does not exist; 403 unless
            the caller is an admin, a manager of the assignment's office,
            or the current holder.
    """
    assignment = (
        db.query(DailyParkingAssignment)
        .filter(DailyParkingAssignment.id == assignment_id)
        .first()
    )
    if not assignment:
        raise HTTPException(status_code=404, detail="Assignment not found")

    allowed = (
        current_user.role == UserRole.ADMIN
        or (current_user.role == UserRole.MANAGER and current_user.office_id == assignment.office_id)
        or assignment.user_id == current_user.id
    )
    if not allowed:
        raise HTTPException(status_code=403, detail="Not authorized")

    # Everyone in the office except the current holder.
    office_users = db.query(User).filter(
        User.office_id == assignment.office_id,
        User.id != assignment.user_id
    ).all()

    # Ids of users that already hold some other spot on that day.
    taken_rows = db.query(DailyParkingAssignment.user_id).filter(
        DailyParkingAssignment.date == assignment.date,
        DailyParkingAssignment.user_id.isnot(None),
        DailyParkingAssignment.id != assignment.id
    ).all()
    already_parked = {row[0] for row in taken_rows}

    return [
        {"id": candidate.id, "name": candidate.name, "email": candidate.email}
        for candidate in office_users
        if candidate.id not in already_parked
    ]
# Alias for datetime.date. Inside the class body below, the field name
# ``date`` is bound to its default (None) *before* the annotation is
# evaluated, so annotating with the bare name ``date`` would resolve to
# None instead of the date type.
_DateType = date


class TestEmailRequest(BaseModel):
    """Request body for POST /api/parking/test-email."""

    # Day to mention in the test e-mail; when omitted the route picks the
    # next open day itself.
    date: _DateType | None = None
    # Office whose closing rules are used to pick that day.
    office_id: str
@router.post("/test-email")
def send_test_email_tool(data: TestEmailRequest, db: Session = Depends(get_db), current_user=Depends(require_manager_or_admin)):
    """Test tool: send a test e-mail to the caller.

    When ``data.date`` is not given, the first open day after today is used:
    up to 30 days ahead are checked against the office's weekly closing days
    and its dated closures (single days or ``date``..``end_date`` ranges).
    If every checked day is closed, tomorrow is used as a fallback.

    Returns:
        A dict with the send result, the delivery mode ("SMTP" or "FILE"),
        the date used and a human-readable message.

    Raises:
        HTTPException: 403 when a manager targets an office other than
            their own.
    """
    from datetime import timedelta
    from sqlalchemy import or_
    from database.models import OfficeClosingDay, OfficeWeeklyClosingDay
    from services.notifications import send_email

    if current_user.role == UserRole.MANAGER and current_user.office_id != data.office_id:
        raise HTTPException(status_code=403, detail="Not authorized for this office")

    target_date = data.date
    if not target_date:
        # Search starts tomorrow ("after the current day").
        first_candidate = date.today() + timedelta(days=1)

        weekly_rows = db.query(OfficeWeeklyClosingDay.weekday).filter(
            OfficeWeeklyClosingDay.office_id == data.office_id
        ).all()
        closed_weekdays = {row[0] for row in weekly_rows}

        # Load closures that start on/after the first candidate AND ranges
        # that started earlier but are still running (end_date on/after it);
        # filtering on the start date alone would miss the latter.
        # assumes end_date is a mapped column (it is read per row below) — TODO confirm
        closures = db.query(OfficeClosingDay).filter(
            OfficeClosingDay.office_id == data.office_id,
            or_(
                OfficeClosingDay.date >= first_candidate,
                OfficeClosingDay.end_date >= first_candidate,
            ),
        ).all()

        def is_closed(day):
            """Return True when *day* falls on a weekly or dated closure."""
            if day.weekday() in closed_weekdays:
                return True
            return any(c.date <= day <= (c.end_date or c.date) for c in closures)

        # Bounded look-ahead (30 days) so a fully closed calendar cannot loop forever.
        candidates = (first_candidate + timedelta(days=offset) for offset in range(30))
        target_date = next((day for day in candidates if not is_closed(day)), first_candidate)

    subject = f"Test Email - Parking System ({target_date})"
    body_html = f"""
<html>
<body>
<h2>Parking System Test Email</h2>
<p>Hi {current_user.name},</p>
<p>This is a test email triggered from the Parking Manager Test Tools.</p>
<p><strong>Selected Date:</strong> {target_date}</p>
<p><strong>SMTP Status:</strong> {'Enabled' if config.SMTP_ENABLED else 'Disabled (File Logging)'}</p>
<p>If you received this, the notification system is working.</p>
</body>
</html>
"""
    success = send_email(current_user.email, subject, body_html)
    return {
        "success": success,
        "mode": "SMTP" if config.SMTP_ENABLED else "FILE",
        "date": target_date,
        "message": "Email sent successfully" if success else "Failed to send email"
    }