365 lines
12 KiB
Python
365 lines
12 KiB
Python
"""
|
|
Parking Assignment Service
|
|
Office-centric parking spot management with fairness algorithm
|
|
|
|
Key concepts:
|
|
- Offices own parking spots (defined by Office.parking_quota)
|
|
- Each office has a spot prefix (A, B, C...) for display names
|
|
- Spots are named like A1, A2, B1, B2 based on office prefix
|
|
- Fairness: users with lowest parking_days/presence_days ratio get priority
|
|
"""
|
|
from datetime import datetime, date, timezone, timedelta
|
|
from sqlalchemy.orm import Session
|
|
from sqlalchemy import or_
|
|
|
|
from database.models import (
|
|
DailyParkingAssignment, User, UserPresence, Office, OfficeSpot,
|
|
ParkingGuarantee, ParkingExclusion, OfficeClosingDay, OfficeWeeklyClosingDay,
|
|
UserRole, PresenceStatus
|
|
)
|
|
from utils.helpers import generate_uuid
|
|
from app import config
|
|
|
|
|
|
def get_spot_prefix(office: Office, db: Session) -> str:
|
|
"""Get the spot prefix for an office (from office.spot_prefix or auto-assign)"""
|
|
# Logic moved to Office creation/update mostly, but keeping helper if needed
|
|
if office.spot_prefix:
|
|
return office.spot_prefix
|
|
return "A" # Fallback
|
|
|
|
|
|
def get_spot_display_name(spot_id: str, office_id: str, db: Session) -> str:
|
|
"""Get display name for a spot"""
|
|
# Now easy: fetch from OfficeSpot
|
|
# But wait: spot_id in assignment IS the OfficeSpot.id
|
|
spot = db.query(OfficeSpot).filter(OfficeSpot.id == spot_id).first()
|
|
if spot:
|
|
return spot.name
|
|
return "Unknown"
|
|
|
|
|
|
def is_closing_day(office_id: str, check_date: date, db: Session) -> bool:
|
|
"""
|
|
Check if date is a closing day for this office.
|
|
Checks both specific closing days and weekly recurring closing days.
|
|
"""
|
|
# Check specific closing day (single day or range)
|
|
specific = db.query(OfficeClosingDay).filter(
|
|
OfficeClosingDay.office_id == office_id,
|
|
or_(
|
|
OfficeClosingDay.date == check_date,
|
|
(OfficeClosingDay.end_date != None) & (OfficeClosingDay.date <= check_date) & (OfficeClosingDay.end_date >= check_date)
|
|
)
|
|
).first()
|
|
if specific:
|
|
return True
|
|
|
|
# Check weekly closing day
|
|
# Python: 0=Monday, 6=Sunday
|
|
# DB/API: 0=Sunday, 1=Monday... (Legacy convention)
|
|
python_weekday = check_date.weekday()
|
|
db_weekday = (python_weekday + 1) % 7
|
|
|
|
weekly = db.query(OfficeWeeklyClosingDay).filter(
|
|
OfficeWeeklyClosingDay.office_id == office_id,
|
|
OfficeWeeklyClosingDay.weekday == db_weekday
|
|
).first()
|
|
|
|
return weekly is not None
|
|
|
|
|
|
def initialize_parking_pool(office_id: str, quota: int, pool_date: date, db: Session) -> int:
|
|
"""
|
|
Get total capacity for the date.
|
|
(Legacy name kept for compatibility, but now it just returns count).
|
|
"""
|
|
if is_closing_day(office_id, pool_date, db):
|
|
return 0
|
|
|
|
return db.query(OfficeSpot).filter(
|
|
OfficeSpot.office_id == office_id,
|
|
OfficeSpot.is_unavailable == False
|
|
).count()
|
|
|
|
|
|
def get_available_spots(office_id: str, pool_date: date, db: Session) -> list[OfficeSpot]:
|
|
"""Get list of unassigned OfficeSpots for a date"""
|
|
# 1. Get all active spots
|
|
all_spots = db.query(OfficeSpot).filter(
|
|
OfficeSpot.office_id == office_id,
|
|
OfficeSpot.is_unavailable == False
|
|
).all()
|
|
|
|
# 2. Get assigned spot IDs
|
|
assigned_ids = db.query(DailyParkingAssignment.spot_id).filter(
|
|
DailyParkingAssignment.office_id == office_id,
|
|
DailyParkingAssignment.date == pool_date
|
|
).all()
|
|
assigned_set = {a[0] for a in assigned_ids}
|
|
|
|
# 3. Filter
|
|
return [s for s in all_spots if s.id not in assigned_set]
|
|
|
|
|
|
def get_user_parking_ratio(user_id: str, office_id: str, db: Session) -> float:
|
|
"""
|
|
Calculate user's parking ratio: parking_days / presence_days
|
|
Lower ratio = higher priority for next parking spot
|
|
"""
|
|
# Count days user was present
|
|
presence_days = db.query(UserPresence).filter(
|
|
UserPresence.user_id == user_id,
|
|
UserPresence.status == PresenceStatus.PRESENT
|
|
).count()
|
|
|
|
if presence_days == 0:
|
|
return 0.0 # New user, highest priority
|
|
|
|
# Count days user got parking
|
|
parking_days = db.query(DailyParkingAssignment).filter(
|
|
DailyParkingAssignment.user_id == user_id,
|
|
DailyParkingAssignment.office_id == office_id
|
|
).count()
|
|
|
|
return parking_days / presence_days
|
|
|
|
|
|
def is_user_excluded(user_id: str, office_id: str, check_date: date, db: Session) -> bool:
|
|
"""Check if user is excluded from parking for this date"""
|
|
exclusions = db.query(ParkingExclusion).filter(
|
|
ParkingExclusion.office_id == office_id,
|
|
ParkingExclusion.user_id == user_id
|
|
).all()
|
|
|
|
if not exclusions:
|
|
return False
|
|
|
|
# Check against all exclusions
|
|
for exclusion in exclusions:
|
|
# If any exclusion covers this date, user is excluded
|
|
|
|
# Check date range
|
|
start_ok = True
|
|
if exclusion.start_date and check_date < exclusion.start_date:
|
|
start_ok = False
|
|
|
|
end_ok = True
|
|
if exclusion.end_date and check_date > exclusion.end_date:
|
|
end_ok = False
|
|
|
|
if start_ok and end_ok:
|
|
return True
|
|
|
|
return False
|
|
|
|
|
|
def has_guarantee(user_id: str, office_id: str, check_date: date, db: Session) -> bool:
|
|
"""Check if user has a parking guarantee for this date"""
|
|
guarantee = db.query(ParkingGuarantee).filter(
|
|
ParkingGuarantee.office_id == office_id,
|
|
ParkingGuarantee.user_id == user_id
|
|
).first()
|
|
|
|
if not guarantee:
|
|
return False
|
|
|
|
# Check date range
|
|
if guarantee.start_date and check_date < guarantee.start_date:
|
|
return False
|
|
if guarantee.end_date and check_date > guarantee.end_date:
|
|
return False
|
|
|
|
return True
|
|
|
|
|
|
def get_users_wanting_parking(office_id: str, pool_date: date, db: Session) -> list[dict]:
|
|
"""
|
|
Get all users who want parking for this date, sorted by fairness priority.
|
|
Returns list of {user_id, has_guarantee, ratio}
|
|
"""
|
|
# Get users who marked "present" for this date:
|
|
# - Users belonging to this office
|
|
present_users = db.query(UserPresence).join(User).filter(
|
|
UserPresence.date == pool_date,
|
|
UserPresence.status == PresenceStatus.PRESENT,
|
|
User.office_id == office_id
|
|
).all()
|
|
|
|
candidates = []
|
|
for presence in present_users:
|
|
user_id = presence.user_id
|
|
|
|
# Skip excluded users
|
|
if is_user_excluded(user_id, office_id, pool_date, db):
|
|
continue
|
|
|
|
# Skip users who already have a spot
|
|
existing = db.query(DailyParkingAssignment).filter(
|
|
DailyParkingAssignment.date == pool_date,
|
|
DailyParkingAssignment.user_id == user_id
|
|
).first()
|
|
if existing:
|
|
continue
|
|
|
|
candidates.append({
|
|
"user_id": user_id,
|
|
"has_guarantee": has_guarantee(user_id, office_id, pool_date, db),
|
|
"ratio": get_user_parking_ratio(user_id, office_id, db)
|
|
})
|
|
|
|
# Sort: guaranteed users first, then by ratio (lowest first for fairness)
|
|
candidates.sort(key=lambda x: (not x["has_guarantee"], x["ratio"]))
|
|
|
|
return candidates
|
|
|
|
def assign_parking_fairly(office_id: str, pool_date: date, db: Session) -> dict:
|
|
"""
|
|
Assign parking spots fairly based on parking ratio.
|
|
Creates new DailyParkingAssignment rows only for assigned users.
|
|
"""
|
|
if is_closing_day(office_id, pool_date, db):
|
|
return {"assigned": [], "waitlist": [], "closed": True}
|
|
|
|
# Get candidates sorted by fairness
|
|
candidates = get_users_wanting_parking(office_id, pool_date, db)
|
|
|
|
# Get available spots (OfficeSpots not yet in assignments table)
|
|
free_spots = get_available_spots(office_id, pool_date, db)
|
|
|
|
assigned = []
|
|
waitlist = []
|
|
|
|
for candidate in candidates:
|
|
if free_spots:
|
|
# Sort spots by number to fill A1, A2... order
|
|
free_spots.sort(key=lambda s: s.spot_number)
|
|
|
|
spot = free_spots.pop(0)
|
|
|
|
# Create assignment
|
|
assignment = DailyParkingAssignment(
|
|
id=generate_uuid(),
|
|
date=pool_date,
|
|
spot_id=spot.id,
|
|
user_id=candidate["user_id"],
|
|
office_id=office_id,
|
|
created_at=datetime.now(timezone.utc)
|
|
)
|
|
db.add(assignment)
|
|
assigned.append(candidate["user_id"])
|
|
else:
|
|
waitlist.append(candidate["user_id"])
|
|
|
|
db.commit()
|
|
return {"assigned": assigned, "waitlist": waitlist}
|
|
|
|
|
|
def release_user_spot(office_id: str, user_id: str, pool_date: date, db: Session) -> bool:
|
|
"""Release a user's parking spot and reassign to next in fairness queue"""
|
|
assignment = db.query(DailyParkingAssignment).filter(
|
|
DailyParkingAssignment.office_id == office_id,
|
|
DailyParkingAssignment.date == pool_date,
|
|
DailyParkingAssignment.user_id == user_id
|
|
).first()
|
|
|
|
if not assignment:
|
|
return False
|
|
|
|
# Capture spot ID before deletion
|
|
spot_id = assignment.spot_id
|
|
|
|
# Release the spot (Delete the row)
|
|
db.delete(assignment)
|
|
db.commit()
|
|
|
|
# Try to assign to next user in fairness queue
|
|
candidates = get_users_wanting_parking(office_id, pool_date, db)
|
|
if candidates:
|
|
top_candidate = candidates[0]
|
|
|
|
# Create new assignment for top candidate
|
|
new_assignment = DailyParkingAssignment(
|
|
id=generate_uuid(),
|
|
date=pool_date,
|
|
spot_id=spot_id,
|
|
user_id=top_candidate["user_id"],
|
|
office_id=office_id,
|
|
created_at=datetime.now(timezone.utc)
|
|
)
|
|
db.add(new_assignment)
|
|
db.commit()
|
|
|
|
return True
|
|
|
|
|
|
def handle_presence_change(user_id: str, change_date: date, old_status: PresenceStatus, new_status: PresenceStatus, office_id: str, db: Session):
|
|
"""
|
|
Handle presence status change and update parking accordingly.
|
|
Uses fairness algorithm for assignment.
|
|
"""
|
|
# Don't process past dates
|
|
if change_date < datetime.utcnow().date():
|
|
return
|
|
|
|
# Get office (must be valid)
|
|
office = db.query(Office).filter(Office.id == office_id).first()
|
|
if not office or not office.parking_quota:
|
|
return
|
|
|
|
# No initialization needed for sparse model
|
|
|
|
if old_status == PresenceStatus.PRESENT and new_status in [PresenceStatus.REMOTE, PresenceStatus.ABSENT]:
|
|
# User no longer coming - release their spot (will auto-reassign)
|
|
release_user_spot(office.id, user_id, change_date, db)
|
|
|
|
elif new_status == PresenceStatus.PRESENT:
|
|
# Check booking window
|
|
should_assign = True
|
|
if office.booking_window_enabled:
|
|
# Allocation time is Day-1 at cutoff hour
|
|
cutoff_dt = datetime.combine(change_date - timedelta(days=1), datetime.min.time())
|
|
cutoff_dt = cutoff_dt.replace(
|
|
hour=office.booking_window_end_hour,
|
|
minute=office.booking_window_end_minute
|
|
)
|
|
|
|
# If now is before cutoff, do not assign yet (wait for batch job)
|
|
if datetime.utcnow() < cutoff_dt:
|
|
should_assign = False
|
|
config.logger.debug(f"Queuing parking request for user {user_id} on {change_date} (Window open until {cutoff_dt})")
|
|
|
|
if should_assign:
|
|
# User coming in - run fair assignment for this date
|
|
assign_parking_fairly(office.id, change_date, db)
|
|
|
|
|
|
def clear_assignments_for_office_date(office_id: str, pool_date: date, db: Session) -> int:
|
|
"""
|
|
Clear all parking assignments for an office on a specific date.
|
|
Returns number of cleared spots.
|
|
"""
|
|
assignments = db.query(DailyParkingAssignment).filter(
|
|
DailyParkingAssignment.office_id == office_id,
|
|
DailyParkingAssignment.date == pool_date
|
|
).all()
|
|
|
|
count = len(assignments)
|
|
for a in assignments:
|
|
db.delete(a)
|
|
|
|
db.commit()
|
|
return count
|
|
|
|
|
|
def run_batch_allocation(office_id: str, pool_date: date, db: Session) -> dict:
|
|
"""
|
|
Run the batch allocation for a specific date.
|
|
Force clears existing assignments to ensure a fair clean-slate allocation.
|
|
"""
|
|
# 1. Clear existing assignments
|
|
clear_assignments_for_office_date(office_id, pool_date, db)
|
|
|
|
# 2. Run fair allocation
|
|
return assign_parking_fairly(office_id, pool_date, db)
|