Files
org-parking/services/parking.py
2026-01-13 11:20:12 +01:00

369 lines
12 KiB
Python

"""
Parking Assignment Service
Office-centric parking spot management with fairness algorithm
Key concepts:
- Offices own parking spots (defined by Office.parking_quota)
- Each office has a spot prefix (A, B, C...) for display names
- Spots are named like A1, A2, B1, B2 based on office prefix
- Fairness: users with lowest parking_days/presence_days ratio get priority
"""
from datetime import datetime, date, timezone, timedelta
from sqlalchemy.orm import Session
from sqlalchemy import or_
from database.models import (
DailyParkingAssignment, User, UserPresence, Office,
ParkingGuarantee, ParkingExclusion, OfficeClosingDay, OfficeWeeklyClosingDay,
UserRole, PresenceStatus
)
from utils.helpers import generate_uuid
from app import config
def get_spot_prefix(office: Office, db: Session) -> str:
    """Return the letter used to prefix an office's spot names.

    An explicitly configured ``office.spot_prefix`` always wins. Otherwise a
    letter is derived on the fly: offices without a prefix are ordered by
    name, and the office at position *k* in that ordering receives the
    *k*-th letter of ``A``..``Z`` that is not already used as a prefix by
    another office. Nothing is persisted by this function.

    Args:
        office: Office whose prefix is wanted.
        db: Session used to look up the other offices.

    Returns:
        The configured prefix, or the derived letter. Falls back to ``'A'``
        when there are not enough unused letters to reach the office's
        position.
    """
    if office.spot_prefix:
        return office.spot_prefix

    # SQLAlchemy renders ``== None`` / ``!= None`` as IS NULL / IS NOT NULL,
    # so the identity operators cannot be used here.
    unprefixed = (
        db.query(Office)
        .filter(Office.spot_prefix == None)  # noqa: E711
        .order_by(Office.name)
        .all()
    )
    prefixed = db.query(Office).filter(
        Office.spot_prefix != None  # noqa: E711
    ).all()
    taken = {other.spot_prefix for other in prefixed}

    # Position of this office among the unprefixed ones (0 when not listed).
    position = 0
    for idx, candidate in enumerate(unprefixed):
        if candidate.id == office.id:
            position = idx
            break

    free_letters = [
        chr(code)
        for code in range(ord('A'), ord('Z') + 1)
        if chr(code) not in taken
    ]
    if position < len(free_letters):
        return free_letters[position]
    # Alphabet exhausted: same fallback letter as before.
    return 'A'
def get_spot_display_name(spot_id: str, office_id: str, db: Session) -> str:
    """Build the human-readable name of a spot, e.g. ``'A3'`` for ``'spot-3'``.

    Args:
        spot_id: Internal spot identifier (``"spot-<n>"``).
        office_id: Id of the office that owns the spot.
        db: Session used to load the office.

    Returns:
        The office prefix followed by ``spot_id`` with every ``"spot-"``
        occurrence removed, or ``spot_id`` unchanged when the office does
        not exist.
    """
    owner = db.query(Office).filter(Office.id == office_id).first()
    if owner:
        letter = get_spot_prefix(owner, db)
        number = spot_id.replace("spot-", "")
        return f"{letter}{number}"
    return spot_id
def is_closing_day(office_id: str, check_date: date, db: Session) -> bool:
    """Tell whether an office is closed on ``check_date``.

    Two sources are consulted, in this order:

    1. ``OfficeClosingDay`` rows: either a row whose ``date`` equals
       ``check_date``, or a row with an ``end_date`` whose
       ``date``..``end_date`` range contains ``check_date``.
    2. ``OfficeWeeklyClosingDay`` rows matching the weekday of
       ``check_date``.

    Args:
        office_id: Office to check.
        check_date: Calendar day to test.
        db: Session used for both lookups.

    Returns:
        ``True`` if either source marks the day as closed.
    """
    on_that_day = OfficeClosingDay.date == check_date
    # ``!= None`` is intentional: SQLAlchemy turns it into IS NOT NULL.
    within_range = (
        (OfficeClosingDay.end_date != None)  # noqa: E711
        & (OfficeClosingDay.date <= check_date)
        & (OfficeClosingDay.end_date >= check_date)
    )
    one_off_closure = (
        db.query(OfficeClosingDay)
        .filter(
            OfficeClosingDay.office_id == office_id,
            or_(on_that_day, within_range),
        )
        .first()
    )
    if one_off_closure:
        return True

    # Stored weekdays use the legacy convention 0=Sunday, 1=Monday, ...
    # isoweekday() is 1=Monday .. 7=Sunday, so ``% 7`` folds Sunday onto 0.
    stored_weekday = check_date.isoweekday() % 7
    recurring_closure = (
        db.query(OfficeWeeklyClosingDay)
        .filter(
            OfficeWeeklyClosingDay.office_id == office_id,
            OfficeWeeklyClosingDay.weekday == stored_weekday,
        )
        .first()
    )
    return recurring_closure is not None
def initialize_parking_pool(office_id: str, quota: int, pool_date: date, db: Session) -> int:
    """Create the unassigned spot rows for one office on one date.

    The call is a no-op when the office is closed that day or when any
    assignment rows already exist for that office and date.

    Args:
        office_id: Office owning the pool.
        quota: Number of spots to create (``spot-1`` .. ``spot-<quota>``).
        pool_date: Day the pool is for.
        db: Session used to query and persist; committed on creation.

    Returns:
        ``0`` on a closing day, the count of existing rows when the pool
        was already there, otherwise ``quota``.
    """
    # No pool on closing days.
    if is_closing_day(office_id, pool_date, db):
        return 0

    already_there = (
        db.query(DailyParkingAssignment)
        .filter(
            DailyParkingAssignment.office_id == office_id,
            DailyParkingAssignment.date == pool_date,
        )
        .count()
    )
    if already_there > 0:
        return already_there

    # Each row starts without a user; assignment happens later.
    empty_spots = [
        DailyParkingAssignment(
            id=generate_uuid(),
            date=pool_date,
            spot_id=f"spot-{number}",
            user_id=None,
            office_id=office_id,
            created_at=datetime.now(timezone.utc),
        )
        for number in range(1, quota + 1)
    ]
    db.add_all(empty_spots)
    db.commit()
    config.logger.debug(f"Initialized {quota} parking spots for office {office_id} on {pool_date}")
    return quota
def get_user_parking_ratio(user_id: str, office_id: str, db: Session) -> float:
    """Compute the fairness ratio ``parking_days / presence_days`` for a user.

    A lower ratio means the user has parked less often relative to how
    often they were present, and therefore ranks higher for the next spot.

    Presence is counted over every ``PRESENT`` record of the user (no date
    or office filter); parking is counted over the user's assignment rows
    for ``office_id``.

    Args:
        user_id: User to score.
        office_id: Office whose assignments count as parking days.
        db: Session used for the two count queries.

    Returns:
        The ratio, or ``0.0`` when the user has no presence records.
    """
    attended = (
        db.query(UserPresence)
        .filter(
            UserPresence.user_id == user_id,
            UserPresence.status == PresenceStatus.PRESENT,
        )
        .count()
    )
    if attended == 0:
        # Never present so far: treated as top priority.
        return 0.0

    parked = (
        db.query(DailyParkingAssignment)
        .filter(
            DailyParkingAssignment.user_id == user_id,
            DailyParkingAssignment.office_id == office_id,
        )
        .count()
    )
    return parked / attended
def is_user_excluded(user_id: str, office_id: str, check_date: date, db: Session) -> bool:
    """Check whether a user is excluded from parking on ``check_date``.

    Every ``ParkingExclusion`` row for the user and office is examined. The
    previous implementation looked only at ``.first()``, i.e. one arbitrary
    row, so a user with several exclusion windows could be reported as not
    excluded even though another window covered the date.

    An exclusion applies when ``check_date`` is not before its
    ``start_date`` and not after its ``end_date``; a missing bound is
    treated as open-ended.

    Args:
        user_id: User to check.
        office_id: Office the exclusion belongs to.
        check_date: Day to test.
        db: Session used to load the exclusions.

    Returns:
        ``True`` if at least one exclusion covers ``check_date``.
    """
    exclusions = db.query(ParkingExclusion).filter(
        ParkingExclusion.office_id == office_id,
        ParkingExclusion.user_id == user_id
    ).all()

    def covers(rule) -> bool:
        # A bound that is unset does not restrict the window on that side.
        if rule.start_date and check_date < rule.start_date:
            return False
        if rule.end_date and check_date > rule.end_date:
            return False
        return True

    return any(covers(rule) for rule in exclusions)
def has_guarantee(user_id: str, office_id: str, check_date: date, db: Session) -> bool:
    """Check whether a user holds a parking guarantee on ``check_date``.

    Every ``ParkingGuarantee`` row for the user and office is examined. The
    previous implementation looked only at ``.first()``, i.e. one arbitrary
    row, so a user with several guarantee windows could lose priority on a
    date that another window covered.

    A guarantee applies when ``check_date`` is not before its
    ``start_date`` and not after its ``end_date``; a missing bound is
    treated as open-ended.

    Args:
        user_id: User to check.
        office_id: Office the guarantee belongs to.
        check_date: Day to test.
        db: Session used to load the guarantees.

    Returns:
        ``True`` if at least one guarantee covers ``check_date``.
    """
    guarantees = db.query(ParkingGuarantee).filter(
        ParkingGuarantee.office_id == office_id,
        ParkingGuarantee.user_id == user_id
    ).all()

    def covers(grant) -> bool:
        # A bound that is unset does not restrict the window on that side.
        if grant.start_date and check_date < grant.start_date:
            return False
        if grant.end_date and check_date > grant.end_date:
            return False
        return True

    return any(covers(grant) for grant in guarantees)
def get_users_wanting_parking(office_id: str, pool_date: date, db: Session) -> list[dict]:
    """List the users still waiting for a spot on ``pool_date``, best first.

    Candidates are users of ``office_id`` with a ``PRESENT`` presence record
    for that date, minus excluded users and minus users who already hold an
    assignment for that date (the assignment lookup is not restricted to
    this office).

    Args:
        office_id: Office whose users are considered.
        pool_date: Day the spots are for.
        db: Session used for all lookups.

    Returns:
        Dicts with keys ``user_id``, ``has_guarantee`` and ``ratio``,
        ordered with guaranteed users first and, within each group, by
        ascending parking ratio.
    """
    attendees = (
        db.query(UserPresence)
        .join(User)
        .filter(
            UserPresence.date == pool_date,
            UserPresence.status == PresenceStatus.PRESENT,
            User.office_id == office_id,
        )
        .all()
    )

    queue = []
    for record in attendees:
        uid = record.user_id

        # Excluded users never enter the queue.
        if is_user_excluded(uid, office_id, pool_date, db):
            continue

        # Neither do users that already hold a spot for the day.
        held_spot = (
            db.query(DailyParkingAssignment)
            .filter(
                DailyParkingAssignment.date == pool_date,
                DailyParkingAssignment.user_id == uid,
            )
            .first()
        )
        if held_spot:
            continue

        guaranteed = has_guarantee(uid, office_id, pool_date, db)
        ratio = get_user_parking_ratio(uid, office_id, db)
        queue.append({"user_id": uid, "has_guarantee": guaranteed, "ratio": ratio})

    # Guaranteed users rank 0, everyone else 1; ties broken by lowest ratio.
    return sorted(
        queue,
        key=lambda entry: (0 if entry["has_guarantee"] else 1, entry["ratio"]),
    )
def assign_parking_fairly(office_id: str, pool_date: date, db: Session) -> dict:
    """Hand out the free spots of an office for one date in fairness order.

    The pool is created if needed, the waiting users are ranked by
    ``get_users_wanting_parking`` and the free spots are given to the
    top-ranked users; everybody else ends up on the waitlist.

    Args:
        office_id: Office whose spots are distributed.
        pool_date: Day to allocate.
        db: Session used to query and persist; committed after assignment.

    Returns:
        ``{"assigned": [user_id, ...], "waitlist": [user_id, ...]}``. Both
        lists are empty when the office is missing or has no quota. On a
        closing day both are empty and ``"closed": True`` is added.
    """
    office = db.query(Office).filter(Office.id == office_id).first()
    if not office or not office.parking_quota:
        return {"assigned": [], "waitlist": []}

    # Nothing to distribute while the office is closed.
    if is_closing_day(office_id, pool_date, db):
        return {"assigned": [], "waitlist": [], "closed": True}

    initialize_parking_pool(office_id, office.parking_quota, pool_date, db)

    ranked_user_ids = [
        entry["user_id"]
        for entry in get_users_wanting_parking(office_id, pool_date, db)
    ]

    # ``== None`` is intentional: SQLAlchemy turns it into IS NULL.
    open_spots = (
        db.query(DailyParkingAssignment)
        .filter(
            DailyParkingAssignment.office_id == office_id,
            DailyParkingAssignment.date == pool_date,
            DailyParkingAssignment.user_id == None,  # noqa: E711
        )
        .all()
    )

    # The first len(open_spots) users get a spot, the rest wait.
    capacity = len(open_spots)
    assigned = ranked_user_ids[:capacity]
    waitlist = ranked_user_ids[capacity:]
    for spot, winner in zip(open_spots, assigned):
        spot.user_id = winner

    db.commit()
    return {"assigned": assigned, "waitlist": waitlist}
def release_user_spot(office_id: str, user_id: str, pool_date: date, db: Session) -> bool:
    """Free a user's spot for a date and pass it to the next user in line.

    The releasing user is skipped when picking the successor. Once their
    spot is cleared they no longer count as "already parked", so if their
    presence record still says ``PRESENT`` they would re-enter the fairness
    queue and could be handed back the very spot they just released.

    Args:
        office_id: Office owning the spot.
        user_id: User giving up the spot.
        pool_date: Day the spot is for.
        db: Session used to query and persist; committed after the release
            and again after a reassignment.

    Returns:
        ``False`` when the user holds no spot for that office and date,
        ``True`` otherwise (whether or not somebody took over the spot).
    """
    assignment = db.query(DailyParkingAssignment).filter(
        DailyParkingAssignment.office_id == office_id,
        DailyParkingAssignment.date == pool_date,
        DailyParkingAssignment.user_id == user_id
    ).first()
    if not assignment:
        return False

    # Release first and commit, so the queue lookup below sees the spot as free.
    assignment.user_id = None
    db.commit()

    # Best-ranked waiting user other than the one who just released.
    successor = next(
        (
            candidate
            for candidate in get_users_wanting_parking(office_id, pool_date, db)
            if candidate["user_id"] != user_id
        ),
        None,
    )
    if successor is not None:
        assignment.user_id = successor["user_id"]
        db.commit()
    return True
def handle_presence_change(user_id: str, change_date: date, old_status: PresenceStatus, new_status: PresenceStatus, office_id: str, db: Session):
    """React to a presence status change by updating parking for that day.

    * Changes on past dates (relative to the current UTC date) are ignored.
    * ``PRESENT`` -> ``REMOTE``/``ABSENT`` releases the user's spot, which
      is then offered to the next user in the fairness queue.
    * Any change to ``PRESENT`` triggers a fair assignment run, unless the
      office uses a booking window and the cutoff has not been reached yet.
      The cutoff is the day before ``change_date`` at
      ``booking_window_end_hour``:``booking_window_end_minute``, compared
      against the current UTC time.

    The current time is taken once via ``datetime.now(timezone.utc)``
    instead of the deprecated ``datetime.utcnow()``, then stripped of its
    tzinfo so it stays comparable with the naive cutoff datetime.

    Args:
        user_id: User whose presence changed.
        change_date: Day the presence applies to.
        old_status: Status before the change.
        new_status: Status after the change.
        office_id: Office whose pool is affected.
        db: Session passed on to the pool and assignment helpers.

    Returns:
        None.
    """
    # Naive UTC "now": same value utcnow() produced, without the deprecation.
    now_utc = datetime.now(timezone.utc).replace(tzinfo=None)

    # Don't process past dates
    if change_date < now_utc.date():
        return

    # The office must exist and actually own parking spots.
    office = db.query(Office).filter(Office.id == office_id).first()
    if not office or not office.parking_quota:
        return

    # Initialize pool if needed
    initialize_parking_pool(office.id, office.parking_quota, change_date, db)

    if old_status == PresenceStatus.PRESENT and new_status in [PresenceStatus.REMOTE, PresenceStatus.ABSENT]:
        # User no longer coming - release their spot (will auto-reassign)
        release_user_spot(office.id, user_id, change_date, db)
    elif new_status == PresenceStatus.PRESENT:
        should_assign = True
        if office.booking_window_enabled:
            # Allocation time is Day-1 at cutoff hour
            cutoff_dt = datetime.combine(change_date - timedelta(days=1), datetime.min.time())
            cutoff_dt = cutoff_dt.replace(
                hour=office.booking_window_end_hour,
                minute=office.booking_window_end_minute
            )
            # If now is before cutoff, do not assign yet (wait for batch job)
            if now_utc < cutoff_dt:
                should_assign = False
                config.logger.debug(f"Queuing parking request for user {user_id} on {change_date} (Window open until {cutoff_dt})")
        if should_assign:
            # User coming in - run fair assignment for this date
            assign_parking_fairly(office.id, change_date, db)
def clear_assignments_for_office_date(office_id: str, pool_date: date, db: Session) -> int:
    """Unassign every occupied spot of an office on one date.

    The spot rows themselves are kept; only their ``user_id`` is reset.

    Args:
        office_id: Office whose spots are cleared.
        pool_date: Day to clear.
        db: Session used to query and persist; committed once at the end.

    Returns:
        How many spots were occupied and have been freed.
    """
    # ``!= None`` is intentional: SQLAlchemy turns it into IS NOT NULL.
    occupied = (
        db.query(DailyParkingAssignment)
        .filter(
            DailyParkingAssignment.office_id == office_id,
            DailyParkingAssignment.date == pool_date,
            DailyParkingAssignment.user_id != None,  # noqa: E711
        )
        .all()
    )

    freed = 0
    for slot in occupied:
        slot.user_id = None
        freed += 1

    db.commit()
    return freed
def run_batch_allocation(office_id: str, pool_date: date, db: Session) -> dict:
    """Redo the allocation of an office's spots for one date from scratch.

    All current assignments for the office and date are wiped first, so the
    fairness ranking is applied to every waiting user rather than only to
    those without a spot.

    Args:
        office_id: Office to allocate for.
        pool_date: Day to allocate.
        db: Session passed on to the clearing and assignment steps.

    Returns:
        Whatever ``assign_parking_fairly`` returns for the fresh run.
    """
    # Step 1: clean slate.
    clear_assignments_for_office_date(office_id, pool_date, db)

    # Step 2: fair allocation over the now-empty pool.
    outcome = assign_parking_fairly(office_id, pool_date, db)
    return outcome