Initial commit: Parking Manager

Features:
- Manager-centric parking spot management
- Fair assignment algorithm (parking/presence ratio)
- Presence tracking calendar
- Closing days (specific & weekly recurring)
- Guarantees and exclusions
- Authelia/LLDAP integration for SSO

Stack:
- FastAPI backend
- SQLite database
- Vanilla JS frontend
- Docker deployment
This commit is contained in:
Stefano Manfredi
2025-11-26 23:37:50 +00:00
commit c74a0ed350
49 changed files with 9094 additions and 0 deletions

0
services/__init__.py Normal file
View File

79
services/auth.py Normal file
View File

@@ -0,0 +1,79 @@
"""
Authentication Service
JWT token management and password hashing
"""
import uuid
import bcrypt
from datetime import datetime, timedelta
from jose import jwt
from sqlalchemy.orm import Session
from app import config
from database.models import User
def hash_password(password: str) -> str:
"""Hash a password using bcrypt"""
return bcrypt.hashpw(password.encode(), bcrypt.gensalt()).decode()
def verify_password(password: str, hashed: str) -> bool:
"""Verify a password against its hash"""
return bcrypt.checkpw(password.encode(), hashed.encode())
def create_access_token(user_id: str, email: str) -> str:
"""Create a JWT access token"""
expire = datetime.utcnow() + timedelta(minutes=config.ACCESS_TOKEN_EXPIRE_MINUTES)
payload = {
"sub": user_id,
"email": email,
"exp": expire
}
return jwt.encode(payload, config.SECRET_KEY, algorithm=config.ALGORITHM)
def decode_token(token: str) -> dict | None:
"""Decode and validate a JWT token"""
try:
return jwt.decode(token, config.SECRET_KEY, algorithms=[config.ALGORITHM])
except Exception:
return None
def get_user_by_email(db: Session, email: str) -> User | None:
"""Get user by email address"""
return db.query(User).filter(User.email == email).first()
def get_user_by_id(db: Session, user_id: str) -> User | None:
"""Get user by ID"""
return db.query(User).filter(User.id == user_id).first()
def authenticate_user(db: Session, email: str, password: str) -> User | None:
"""Authenticate user with email and password"""
user = get_user_by_email(db, email)
if not user or not user.password_hash:
return None
if not verify_password(password, user.password_hash):
return None
return user
def create_user(db: Session, email: str, password: str, name: str, office_id: str = None, role: str = "employee") -> User:
"""Create a new user"""
user = User(
id=str(uuid.uuid4()),
email=email,
password_hash=hash_password(password),
name=name,
office_id=office_id,
role=role,
created_at=datetime.utcnow().isoformat(),
updated_at=datetime.utcnow().isoformat()
)
db.add(user)
db.commit()
db.refresh(user)
return user

116
services/holidays.py Normal file
View File

@@ -0,0 +1,116 @@
"""
Holiday Service
Configurable holiday calculation for different regions
Currently supports Italian holidays. Can be extended to support other regions
by adding new holiday sets and a configuration option.
"""
from datetime import datetime, date, timedelta
def calculate_easter(year: int) -> date:
"""Calculate Easter Sunday using the Computus algorithm"""
a = year % 19
b = year // 100
c = year % 100
d = b // 4
e = b % 4
f = (b + 8) // 25
g = (b - f + 1) // 3
h = (19 * a + b - d - g + 15) % 30
i = c // 4
k = c % 4
l = (32 + 2 * e + 2 * i - h - k) % 7
m = (a + 11 * h + 22 * l) // 451
month = (h + l - 7 * m + 114) // 31
day = ((h + l - 7 * m + 114) % 31) + 1
return date(year, month, day)
def get_easter_monday(year: int) -> date:
"""Get Easter Monday for a given year"""
easter = calculate_easter(year)
return easter + timedelta(days=1)
# Italian fixed holidays (month, day)
ITALIAN_FIXED_HOLIDAYS = [
(1, 1), # New Year's Day
(1, 6), # Epiphany
(4, 25), # Liberation Day
(5, 1), # Labour Day
(6, 2), # Republic Day
(8, 15), # Assumption
(11, 1), # All Saints
(12, 8), # Immaculate Conception
(12, 25), # Christmas
(12, 26), # St. Stephen's
]
def is_italian_holiday(check_date: date | datetime) -> bool:
"""Check if a date is an Italian public holiday"""
if isinstance(check_date, datetime):
check_date = check_date.date()
year = check_date.year
month = check_date.month
day = check_date.day
# Check fixed holidays
if (month, day) in ITALIAN_FIXED_HOLIDAYS:
return True
# Check Easter Monday
easter_monday = get_easter_monday(year)
if check_date == easter_monday:
return True
return False
def get_holidays_for_year(year: int) -> list[dict]:
"""
Get all holidays for a given year.
Returns list of {date: YYYY-MM-DD, name: string}
"""
holidays = []
# Fixed holidays
holiday_names = [
"New Year's Day", "Epiphany", "Liberation Day", "Labour Day",
"Republic Day", "Assumption", "All Saints", "Immaculate Conception",
"Christmas", "St. Stephen's Day"
]
for (month, day), name in zip(ITALIAN_FIXED_HOLIDAYS, holiday_names):
holidays.append({
"date": f"{year}-{month:02d}-{day:02d}",
"name": name
})
# Easter Monday
easter_monday = get_easter_monday(year)
holidays.append({
"date": easter_monday.strftime("%Y-%m-%d"),
"name": "Easter Monday"
})
# Sort by date
holidays.sort(key=lambda h: h["date"])
return holidays
def is_holiday(check_date: date | datetime | str, region: str = "IT") -> bool:
"""
Check if a date is a holiday for the given region.
Currently only supports IT (Italy).
"""
if isinstance(check_date, str):
check_date = datetime.strptime(check_date, "%Y-%m-%d").date()
if region == "IT":
return is_italian_holiday(check_date)
# Default: no holidays
return False

393
services/notifications.py Normal file
View File

@@ -0,0 +1,393 @@
"""
Notification Service
Handles email notifications for presence reminders and parking assignments
TODO: This service is NOT YET ACTIVE. To enable notifications:
1. Add APScheduler or similar to run run_scheduled_notifications() periodically
2. Configure SMTP environment variables (SMTP_HOST, SMTP_USER, SMTP_PASSWORD, SMTP_FROM)
3. Notifications will be sent for:
- Presence reminders (Thursday at 12:00)
- Weekly parking summary (Friday at 12:00)
- Daily parking reminders (at user's preferred time)
- Immediate parking change notifications (via queue)
"""
import smtplib
import os
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from datetime import datetime, timedelta
from sqlalchemy.orm import Session
import uuid
from database.models import (
User, UserPresence, DailyParkingAssignment,
NotificationLog, NotificationQueue, OfficeMembership
)
from services.parking import get_spot_display_name
# Email configuration (from environment variables)
SMTP_HOST = os.getenv("SMTP_HOST", "localhost")
SMTP_PORT = int(os.getenv("SMTP_PORT", "587"))
SMTP_USER = os.getenv("SMTP_USER", "")
SMTP_PASSWORD = os.getenv("SMTP_PASSWORD", "")
SMTP_FROM = os.getenv("SMTP_FROM", "noreply@parkingmanager.local")
SMTP_USE_TLS = os.getenv("SMTP_USE_TLS", "true").lower() == "true"
def send_email(to_email: str, subject: str, body_html: str, body_text: str = None):
"""Send an email"""
if not SMTP_USER or not SMTP_PASSWORD:
print(f"[NOTIFICATION] Email not configured. Would send to {to_email}: {subject}")
return False
try:
msg = MIMEMultipart("alternative")
msg["Subject"] = subject
msg["From"] = SMTP_FROM
msg["To"] = to_email
if body_text:
msg.attach(MIMEText(body_text, "plain"))
msg.attach(MIMEText(body_html, "html"))
with smtplib.SMTP(SMTP_HOST, SMTP_PORT) as server:
if SMTP_USE_TLS:
server.starttls()
server.login(SMTP_USER, SMTP_PASSWORD)
server.sendmail(SMTP_FROM, to_email, msg.as_string())
print(f"[NOTIFICATION] Email sent to {to_email}: {subject}")
return True
except Exception as e:
print(f"[NOTIFICATION] Failed to send email to {to_email}: {e}")
return False
def get_week_dates(reference_date: datetime):
"""Get Monday-Sunday dates for the week containing reference_date"""
# Find Monday of this week
monday = reference_date - timedelta(days=reference_date.weekday())
return [monday + timedelta(days=i) for i in range(7)]
def get_next_week_dates(reference_date: datetime):
"""Get Monday-Sunday dates for the week after reference_date"""
# Find Monday of next week
days_until_next_monday = 7 - reference_date.weekday()
next_monday = reference_date + timedelta(days=days_until_next_monday)
return [next_monday + timedelta(days=i) for i in range(7)]
def check_week_presence_compiled(user_id: str, week_dates: list, db: Session) -> bool:
"""Check if user has filled presence for all working days in a week"""
date_strs = [d.strftime("%Y-%m-%d") for d in week_dates]
presences = db.query(UserPresence).filter(
UserPresence.user_id == user_id,
UserPresence.date.in_(date_strs)
).all()
# Consider week compiled if at least 5 days have presence marked
# (allowing for weekends or holidays)
return len(presences) >= 5
def get_week_reference(date: datetime) -> str:
"""Get ISO week reference string (e.g., 2024-W48)"""
return date.strftime("%Y-W%W")
def send_presence_reminder(user: User, next_week_dates: list, db: Session) -> bool:
"""Send presence compilation reminder for next week"""
week_ref = get_week_reference(next_week_dates[0])
# Check if already sent today for this week
today = datetime.now().strftime("%Y-%m-%d")
existing = db.query(NotificationLog).filter(
NotificationLog.user_id == user.id,
NotificationLog.notification_type == "presence_reminder",
NotificationLog.reference_date == week_ref,
NotificationLog.sent_at >= today
).first()
if existing:
return False # Already sent today
# Check if week is compiled
if check_week_presence_compiled(user.id, next_week_dates, db):
return False # Already compiled
# Send reminder
start_date = next_week_dates[0].strftime("%B %d")
end_date = next_week_dates[-1].strftime("%B %d, %Y")
subject = f"Reminder: Please fill your presence for {start_date} - {end_date}"
body_html = f"""
<html>
<body>
<h2>Presence Reminder</h2>
<p>Hi {user.name},</p>
<p>This is a friendly reminder to fill your presence for the upcoming week
({start_date} - {end_date}).</p>
<p>Please log in to the Parking Manager to mark your presence.</p>
<p>Best regards,<br>Parking Manager</p>
</body>
</html>
"""
if send_email(user.email, subject, body_html):
# Log the notification
log = NotificationLog(
id=str(uuid.uuid4()),
user_id=user.id,
notification_type="presence_reminder",
reference_date=week_ref,
sent_at=datetime.now().isoformat()
)
db.add(log)
db.commit()
return True
return False
def send_weekly_parking_summary(user: User, next_week_dates: list, db: Session) -> bool:
"""Send weekly parking assignment summary for next week (Friday at 12)"""
if not user.notify_weekly_parking:
return False
week_ref = get_week_reference(next_week_dates[0])
# Check if already sent for this week
existing = db.query(NotificationLog).filter(
NotificationLog.user_id == user.id,
NotificationLog.notification_type == "weekly_parking",
NotificationLog.reference_date == week_ref
).first()
if existing:
return False
# Get parking assignments for next week
date_strs = [d.strftime("%Y-%m-%d") for d in next_week_dates]
assignments = db.query(DailyParkingAssignment).filter(
DailyParkingAssignment.user_id == user.id,
DailyParkingAssignment.date.in_(date_strs)
).all()
if not assignments:
return False # No assignments, no need to notify
# Build assignment list
assignment_lines = []
for a in sorted(assignments, key=lambda x: x.date):
date_obj = datetime.strptime(a.date, "%Y-%m-%d")
day_name = date_obj.strftime("%A")
spot_name = get_spot_display_name(a.spot_id, a.manager_id, db)
assignment_lines.append(f"<li>{day_name}, {date_obj.strftime('%B %d')}: Spot {spot_name}</li>")
start_date = next_week_dates[0].strftime("%B %d")
end_date = next_week_dates[-1].strftime("%B %d, %Y")
subject = f"Your parking spots for {start_date} - {end_date}"
body_html = f"""
<html>
<body>
<h2>Weekly Parking Summary</h2>
<p>Hi {user.name},</p>
<p>Here are your parking spot assignments for the upcoming week:</p>
<ul>
{''.join(assignment_lines)}
</ul>
<p>Parking assignments are now frozen. You can still release or reassign your spots if needed.</p>
<p>Best regards,<br>Parking Manager</p>
</body>
</html>
"""
if send_email(user.email, subject, body_html):
log = NotificationLog(
id=str(uuid.uuid4()),
user_id=user.id,
notification_type="weekly_parking",
reference_date=week_ref,
sent_at=datetime.now().isoformat()
)
db.add(log)
db.commit()
return True
return False
def send_daily_parking_reminder(user: User, date: datetime, db: Session) -> bool:
"""Send daily parking reminder for a specific date"""
if not user.notify_daily_parking:
return False
date_str = date.strftime("%Y-%m-%d")
# Check if already sent for this date
existing = db.query(NotificationLog).filter(
NotificationLog.user_id == user.id,
NotificationLog.notification_type == "daily_parking",
NotificationLog.reference_date == date_str
).first()
if existing:
return False
# Get parking assignment for this date
assignment = db.query(DailyParkingAssignment).filter(
DailyParkingAssignment.user_id == user.id,
DailyParkingAssignment.date == date_str
).first()
if not assignment:
return False # No assignment today
spot_name = get_spot_display_name(assignment.spot_id, assignment.manager_id, db)
day_name = date.strftime("%A, %B %d")
subject = f"Parking reminder for {day_name}"
body_html = f"""
<html>
<body>
<h2>Daily Parking Reminder</h2>
<p>Hi {user.name},</p>
<p>You have a parking spot assigned for today ({day_name}):</p>
<p style="font-size: 18px; font-weight: bold;">Spot {spot_name}</p>
<p>Best regards,<br>Parking Manager</p>
</body>
</html>
"""
if send_email(user.email, subject, body_html):
log = NotificationLog(
id=str(uuid.uuid4()),
user_id=user.id,
notification_type="daily_parking",
reference_date=date_str,
sent_at=datetime.now().isoformat()
)
db.add(log)
db.commit()
return True
return False
def queue_parking_change_notification(
user: User,
date: str,
change_type: str, # "assigned", "released", "reassigned"
spot_name: str,
new_user_name: str = None,
db: Session = None
):
"""Queue an immediate notification for a parking assignment change"""
if not user.notify_parking_changes:
return
date_obj = datetime.strptime(date, "%Y-%m-%d")
day_name = date_obj.strftime("%A, %B %d")
if change_type == "assigned":
subject = f"Parking spot assigned for {day_name}"
body = f"""
<html>
<body>
<h2>Parking Spot Assigned</h2>
<p>Hi {user.name},</p>
<p>You have been assigned a parking spot for {day_name}:</p>
<p style="font-size: 18px; font-weight: bold;">Spot {spot_name}</p>
<p>Best regards,<br>Parking Manager</p>
</body>
</html>
"""
elif change_type == "released":
subject = f"Parking spot released for {day_name}"
body = f"""
<html>
<body>
<h2>Parking Spot Released</h2>
<p>Hi {user.name},</p>
<p>Your parking spot (Spot {spot_name}) for {day_name} has been released.</p>
<p>Best regards,<br>Parking Manager</p>
</body>
</html>
"""
elif change_type == "reassigned":
subject = f"Parking spot reassigned for {day_name}"
body = f"""
<html>
<body>
<h2>Parking Spot Reassigned</h2>
<p>Hi {user.name},</p>
<p>Your parking spot (Spot {spot_name}) for {day_name} has been reassigned to {new_user_name}.</p>
<p>Best regards,<br>Parking Manager</p>
</body>
</html>
"""
else:
return
# Add to queue
notification = NotificationQueue(
id=str(uuid.uuid4()),
user_id=user.id,
notification_type="parking_change",
subject=subject,
body=body,
created_at=datetime.now().isoformat()
)
db.add(notification)
db.commit()
def process_notification_queue(db: Session):
"""Process and send all pending notifications in the queue"""
pending = db.query(NotificationQueue).filter(
NotificationQueue.sent_at.is_(None)
).all()
for notification in pending:
user = db.query(User).filter(User.id == notification.user_id).first()
if user and send_email(user.email, notification.subject, notification.body):
notification.sent_at = datetime.now().isoformat()
db.commit()
def run_scheduled_notifications(db: Session):
"""Run all scheduled notifications - called by a scheduler/cron job"""
now = datetime.now()
today = now.date()
current_hour = now.hour
current_minute = now.minute
current_weekday = now.weekday() # 0=Monday, 6=Sunday
users = db.query(User).all()
for user in users:
# Thursday at 12: Presence reminder (unmanageable)
if current_weekday == 3 and current_hour == 12 and current_minute < 5:
next_week = get_next_week_dates(now)
send_presence_reminder(user, next_week, db)
# Friday at 12: Weekly parking summary
if current_weekday == 4 and current_hour == 12 and current_minute < 5:
next_week = get_next_week_dates(now)
send_weekly_parking_summary(user, next_week, db)
# Daily parking reminder at user's preferred time (working days only)
if current_weekday < 5: # Monday to Friday
user_hour = user.notify_daily_parking_hour or 8
user_minute = user.notify_daily_parking_minute or 0
if current_hour == user_hour and abs(current_minute - user_minute) < 5:
send_daily_parking_reminder(user, now, db)
# Process queued notifications
process_notification_queue(db)

343
services/parking.py Normal file
View File

@@ -0,0 +1,343 @@
"""
Parking Assignment Service
Manager-centric parking spot management with fairness algorithm
Key concepts:
- Managers own parking spots (defined by manager_parking_quota)
- Each manager has a spot prefix (A, B, C...) for display names
- Spots are named like A1, A2, B1, B2 based on manager prefix
- Fairness: users with lowest parking_days/office_days ratio get priority
"""
import uuid
from datetime import datetime, timezone
from sqlalchemy.orm import Session
from sqlalchemy import func
from database.models import (
DailyParkingAssignment, User, OfficeMembership, UserPresence,
ParkingGuarantee, ParkingExclusion, ManagerClosingDay, ManagerWeeklyClosingDay
)
def get_manager_for_office(office_id: str, db: Session) -> User | None:
"""Find the manager responsible for an office"""
membership = db.query(OfficeMembership).filter(
OfficeMembership.office_id == office_id
).first()
if not membership:
return None
return db.query(User).filter(User.id == membership.user_id).first()
def get_spot_prefix(manager: User, db: Session) -> str:
"""Get the spot prefix for a manager (from manager_spot_prefix or auto-assign)"""
if manager.manager_spot_prefix:
return manager.manager_spot_prefix
# Auto-assign based on alphabetical order of managers without prefix
managers = db.query(User).filter(
User.role == "manager",
User.manager_spot_prefix == None
).order_by(User.name).all()
# Find existing prefixes
existing_prefixes = set(
m.manager_spot_prefix for m in db.query(User).filter(
User.role == "manager",
User.manager_spot_prefix != None
).all()
)
# Find first available letter
manager_index = next((i for i, m in enumerate(managers) if m.id == manager.id), 0)
letter = 'A'
count = 0
while letter in existing_prefixes or count < manager_index:
if letter not in existing_prefixes:
count += 1
letter = chr(ord(letter) + 1)
if ord(letter) > ord('Z'):
letter = 'A'
break
return letter
def get_spot_display_name(spot_id: str, manager_id: str, db: Session) -> str:
"""Get display name for a spot (e.g., 'A3' instead of 'spot-3')"""
manager = db.query(User).filter(User.id == manager_id).first()
if not manager:
return spot_id
prefix = get_spot_prefix(manager, db)
spot_number = spot_id.replace("spot-", "")
return f"{prefix}{spot_number}"
def is_closing_day(manager_id: str, date: str, db: Session) -> bool:
"""
Check if date is a closing day for this manager.
Checks both specific closing days and weekly recurring closing days.
"""
# Check specific closing day
specific = db.query(ManagerClosingDay).filter(
ManagerClosingDay.manager_id == manager_id,
ManagerClosingDay.date == date
).first()
if specific:
return True
# Check weekly closing day
date_obj = datetime.strptime(date, "%Y-%m-%d")
weekday = date_obj.weekday() # 0=Monday in Python
# Convert to our format: 0=Sunday, 1=Monday, ..., 6=Saturday
weekday_sunday_start = (weekday + 1) % 7
weekly = db.query(ManagerWeeklyClosingDay).filter(
ManagerWeeklyClosingDay.manager_id == manager_id,
ManagerWeeklyClosingDay.weekday == weekday_sunday_start
).first()
return weekly is not None
def initialize_parking_pool(manager_id: str, quota: int, date: str, db: Session) -> int:
"""Initialize empty parking spots for a manager's pool on a given date.
Returns 0 if it's a closing day (no parking available).
"""
# Don't create pool on closing days
if is_closing_day(manager_id, date, db):
return 0
existing = db.query(DailyParkingAssignment).filter(
DailyParkingAssignment.manager_id == manager_id,
DailyParkingAssignment.date == date
).count()
if existing > 0:
return existing
for i in range(1, quota + 1):
spot = DailyParkingAssignment(
id=str(uuid.uuid4()),
date=date,
spot_id=f"spot-{i}",
user_id=None,
manager_id=manager_id,
created_at=datetime.now(timezone.utc).isoformat()
)
db.add(spot)
db.commit()
return quota
def get_user_parking_ratio(user_id: str, manager_id: str, db: Session) -> float:
"""
Calculate user's parking ratio: parking_days / office_days
Lower ratio = higher priority for next parking spot
"""
# Get offices managed by this manager
managed_office_ids = [
m.office_id for m in db.query(OfficeMembership).filter(
OfficeMembership.user_id == manager_id
).all()
]
# Count days user was present (office_days)
office_days = db.query(UserPresence).filter(
UserPresence.user_id == user_id,
UserPresence.status == "present"
).count()
if office_days == 0:
return 0.0 # New user, highest priority
# Count days user got parking
parking_days = db.query(DailyParkingAssignment).filter(
DailyParkingAssignment.user_id == user_id,
DailyParkingAssignment.manager_id == manager_id
).count()
return parking_days / office_days
def is_user_excluded(user_id: str, manager_id: str, date: str, db: Session) -> bool:
"""Check if user is excluded from parking for this date"""
exclusion = db.query(ParkingExclusion).filter(
ParkingExclusion.manager_id == manager_id,
ParkingExclusion.user_id == user_id
).first()
if not exclusion:
return False
# Check date range
if exclusion.start_date and date < exclusion.start_date:
return False
if exclusion.end_date and date > exclusion.end_date:
return False
return True
def has_guarantee(user_id: str, manager_id: str, date: str, db: Session) -> bool:
"""Check if user has a parking guarantee for this date"""
guarantee = db.query(ParkingGuarantee).filter(
ParkingGuarantee.manager_id == manager_id,
ParkingGuarantee.user_id == user_id
).first()
if not guarantee:
return False
# Check date range
if guarantee.start_date and date < guarantee.start_date:
return False
if guarantee.end_date and date > guarantee.end_date:
return False
return True
def get_users_wanting_parking(manager_id: str, date: str, db: Session) -> list[dict]:
"""
Get all users who want parking for this date, sorted by fairness priority.
Returns list of {user_id, has_guarantee, ratio}
"""
# Get offices managed by this manager
managed_office_ids = [
m.office_id for m in db.query(OfficeMembership).filter(
OfficeMembership.user_id == manager_id
).all()
]
# Get users who marked "present" for this date and belong to managed offices
present_users = db.query(UserPresence).join(User).filter(
UserPresence.date == date,
UserPresence.status == "present",
User.office_id.in_(managed_office_ids)
).all()
candidates = []
for presence in present_users:
user_id = presence.user_id
# Skip excluded users
if is_user_excluded(user_id, manager_id, date, db):
continue
# Skip users who already have a spot
existing = db.query(DailyParkingAssignment).filter(
DailyParkingAssignment.date == date,
DailyParkingAssignment.user_id == user_id
).first()
if existing:
continue
candidates.append({
"user_id": user_id,
"has_guarantee": has_guarantee(user_id, manager_id, date, db),
"ratio": get_user_parking_ratio(user_id, manager_id, db)
})
# Sort: guaranteed users first, then by ratio (lowest first for fairness)
candidates.sort(key=lambda x: (not x["has_guarantee"], x["ratio"]))
return candidates
def assign_parking_fairly(manager_id: str, date: str, db: Session) -> dict:
"""
Assign parking spots fairly based on parking ratio.
Called after presence is set for a date.
Returns {assigned: [...], waitlist: [...]}
"""
manager = db.query(User).filter(User.id == manager_id).first()
if not manager or not manager.manager_parking_quota:
return {"assigned": [], "waitlist": []}
# No parking on closing days
if is_closing_day(manager_id, date, db):
return {"assigned": [], "waitlist": [], "closed": True}
# Initialize pool
initialize_parking_pool(manager_id, manager.manager_parking_quota, date, db)
# Get candidates sorted by fairness
candidates = get_users_wanting_parking(manager_id, date, db)
# Get available spots
free_spots = db.query(DailyParkingAssignment).filter(
DailyParkingAssignment.manager_id == manager_id,
DailyParkingAssignment.date == date,
DailyParkingAssignment.user_id == None
).all()
assigned = []
waitlist = []
for candidate in candidates:
if free_spots:
spot = free_spots.pop(0)
spot.user_id = candidate["user_id"]
assigned.append(candidate["user_id"])
else:
waitlist.append(candidate["user_id"])
db.commit()
return {"assigned": assigned, "waitlist": waitlist}
def release_user_spot(manager_id: str, user_id: str, date: str, db: Session) -> bool:
"""Release a user's parking spot and reassign to next in fairness queue"""
assignment = db.query(DailyParkingAssignment).filter(
DailyParkingAssignment.manager_id == manager_id,
DailyParkingAssignment.date == date,
DailyParkingAssignment.user_id == user_id
).first()
if not assignment:
return False
# Release the spot
assignment.user_id = None
db.commit()
# Try to assign to next user in fairness queue
candidates = get_users_wanting_parking(manager_id, date, db)
if candidates:
assignment.user_id = candidates[0]["user_id"]
db.commit()
return True
def handle_presence_change(user_id: str, date: str, old_status: str, new_status: str, office_id: str, db: Session):
"""
Handle presence status change and update parking accordingly.
Uses fairness algorithm for assignment.
"""
# Don't process past dates
target_date = datetime.strptime(date, "%Y-%m-%d").date()
if target_date < datetime.now().date():
return
# Find manager for this office
manager = get_manager_for_office(office_id, db)
if not manager or not manager.manager_parking_quota:
return
# Initialize pool if needed
initialize_parking_pool(manager.id, manager.manager_parking_quota, date, db)
if old_status == "present" and new_status in ["remote", "absent"]:
# User no longer coming - release their spot (will auto-reassign)
release_user_spot(manager.id, user_id, date, db)
elif new_status == "present":
# User coming in - run fair assignment for this date
assign_parking_fairly(manager.id, date, db)