Background Jobs Component Design - v0.1.0
Version: 0.1.0
Date: 2025-12-22
Status: Initial Design
Introduction
This document defines the background job scheduling system for Sneaky Klaus. The system uses APScheduler to handle recurring tasks like reminder emails, automatic exchange completion, data purging, and cleanup operations.
Requirements
Functional Requirements
- Reminder Scheduling: Send reminder emails at configured intervals before exchange date
- Auto-Completion: Automatically mark exchanges as complete after exchange date
- Data Purging: Delete exchange data 30 days after completion
- Token Cleanup: Purge expired authentication tokens
- Session Cleanup: Remove expired sessions
- Rate Limit Cleanup: Clear expired rate limit entries
Non-Functional Requirements
- Reliability: Jobs must execute even if application restarts
- Timezone Awareness: Respect exchange-specific timezones
- Error Handling: Gracefully handle failures without crashing scheduler
- Observability: Log all job executions for debugging
- Performance: Jobs should not block application requests
- Testability: Jobs must be unit-testable
Architecture Overview
flowchart TB
subgraph "Application Startup"
Init[Application Init]
Scheduler[Initialize APScheduler]
Jobs[Register Jobs]
end
subgraph "APScheduler"
JobStore[(SQLAlchemy JobStore)]
Executor[ThreadPool Executor]
Triggers[Cron & Interval Triggers]
end
subgraph "Job Definitions"
Reminders[Reminder Emails]
AutoComplete[Auto-Complete Exchanges]
DataPurge[Data Purge]
TokenCleanup[Token Cleanup]
SessionCleanup[Session Cleanup]
RateLimitCleanup[Rate Limit Cleanup]
end
subgraph "Services"
NotificationService[Notification Service]
ExchangeService[Exchange Service]
DB[(Database)]
end
Init --> Scheduler
Scheduler --> JobStore
Scheduler --> Executor
Scheduler --> Triggers
Scheduler --> Jobs
Jobs --> Reminders
Jobs --> AutoComplete
Jobs --> DataPurge
Jobs --> TokenCleanup
Jobs --> SessionCleanup
Jobs --> RateLimitCleanup
Reminders --> NotificationService
AutoComplete --> ExchangeService
DataPurge --> ExchangeService
TokenCleanup --> DB
SessionCleanup --> DB
RateLimitCleanup --> DB
NotificationService --> DB
ExchangeService --> DB
APScheduler Configuration
Scheduler Setup
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ThreadPoolExecutor
from pytz import utc
def create_scheduler(app):
"""
Initialize and configure APScheduler.
Args:
app: Flask application instance
Returns:
Configured BackgroundScheduler
"""
jobstores = {
'default': SQLAlchemyJobStore(url=app.config['DATABASE_URL'])
}
executors = {
'default': ThreadPoolExecutor(max_workers=4)
}
job_defaults = {
'coalesce': True, # Combine missed executions into one
'max_instances': 1, # Prevent concurrent runs of same job
'misfire_grace_time': 300 # 5 minutes grace for missed jobs
}
scheduler = BackgroundScheduler(
jobstores=jobstores,
executors=executors,
job_defaults=job_defaults,
timezone=utc # All jobs in UTC, converted per-exchange
)
return scheduler
Configuration Options
| Option | Value | Rationale |
|---|---|---|
| coalesce | True | Prevents job queue buildup if app offline |
| max_instances | 1 | Prevents concurrent execution of same job |
| misfire_grace_time | 300 seconds | Allows up to 5 minutes late execution |
| timezone | UTC | Consistent base timezone, converted per-exchange |
| max_workers | 4 | Sufficient for low-volume background tasks |
Job Store Persistence
Jobs stored in SQLite database:
- Table: apscheduler_jobs
- Persistence: Jobs survive application restarts
- Cleanup: Old job runs automatically removed
Job Definitions
1. Reminder Email Job
Purpose: Send reminder emails to participants before exchange date
Schedule: Dynamic per exchange based on admin configuration
Trigger Type: Cron (specific date/time)
Implementation:
from datetime import datetime, timedelta

import pytz

def schedule_reminder_emails(exchange_id: int, scheduler: BackgroundScheduler):
"""
Schedule reminder emails for an exchange.
Called when exchange is matched or reminder preferences updated.
Args:
exchange_id: Exchange ID
scheduler: APScheduler instance
"""
from services.notification_service import NotificationService
exchange = Exchange.query.get(exchange_id)
if not exchange or exchange.state != ExchangeState.MATCHED:
return
# Get reminder intervals (e.g., [7, 3, 1] days before)
reminder_days = get_reminder_intervals(exchange_id) # Admin configurable
# Convert exchange date to exchange timezone
exchange_tz = pytz.timezone(exchange.timezone)
exchange_datetime = exchange.exchange_date.replace(tzinfo=pytz.utc).astimezone(exchange_tz)
for days_before in reminder_days:
reminder_datetime = exchange_datetime - timedelta(days=days_before)
# Skip if reminder time is in the past
if reminder_datetime <= datetime.now(exchange_tz):
continue
# Schedule job
job_id = f"reminder_exchange_{exchange_id}_days_{days_before}"
scheduler.add_job(
func=send_reminder_for_exchange,
args=[exchange_id],
trigger='date',
run_date=reminder_datetime,
id=job_id,
replace_existing=True,
misfire_grace_time=3600 # 1 hour grace
)
logger.info(f"Scheduled reminder for exchange {exchange_id} at {reminder_datetime}")
def send_reminder_for_exchange(exchange_id: int):
"""
Job function: Send reminder emails to all opted-in participants.
Args:
exchange_id: Exchange ID
"""
logger.info(f"Executing reminder job for exchange {exchange_id}")
try:
# Get exchange and validate state
exchange = Exchange.query.get(exchange_id)
if not exchange or exchange.state != ExchangeState.MATCHED:
logger.warning(f"Skipping reminder for exchange {exchange_id}: invalid state")
return
# Get opted-in participants
participants = Participant.query.filter_by(
exchange_id=exchange_id,
reminder_enabled=True,
withdrawn_at=None
).all()
if not participants:
logger.info(f"No participants opted in for reminders in exchange {exchange_id}")
return
# Send reminders
notification_service = NotificationService()
success_count = 0
failure_count = 0
for participant in participants:
result = notification_service.send_reminder_email(participant.id)
if result.success:
success_count += 1
else:
failure_count += 1
logger.error(f"Failed to send reminder to participant {participant.id}: {result.error}")
logger.info(f"Reminder job complete for exchange {exchange_id}: {success_count} sent, {failure_count} failed")
except Exception as e:
logger.error(f"Reminder job failed for exchange {exchange_id}: {str(e)}")
raise # Re-raise to mark job as failed
Default Reminder Intervals: 7 days, 3 days, 1 day before exchange (admin configurable)
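The get_reminder_intervals helper used above is not defined in this document. A minimal sketch of one possible implementation, assuming a reminder_intervals column on Exchange that stores a comma-separated list (both the column and its format are assumptions):
DEFAULT_REMINDER_DAYS = [7, 3, 1]

def get_reminder_intervals(exchange_id: int) -> list[int]:
    """Return admin-configured reminder offsets in days before the exchange date."""
    exchange = Exchange.query.get(exchange_id)
    # Assumed column: comma-separated day offsets, e.g. "7,3,1"
    if exchange and exchange.reminder_intervals:
        return sorted({int(d) for d in exchange.reminder_intervals.split(',')}, reverse=True)
    return DEFAULT_REMINDER_DAYS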
2. Auto-Complete Exchange Job
Purpose: Automatically mark exchanges as complete after exchange date passes
Schedule: Daily at midnight UTC
Trigger Type: Cron
Implementation:
def register_auto_complete_job(scheduler: BackgroundScheduler):
"""
Register job to auto-complete exchanges.
Runs daily at midnight UTC.
"""
scheduler.add_job(
func=auto_complete_exchanges,
trigger='cron',
hour=0,
minute=0,
id='auto_complete_exchanges',
replace_existing=True
)
def auto_complete_exchanges():
"""
Job function: Mark exchanges as complete if exchange date has passed.
"""
logger.info("Executing auto-complete exchanges job")
try:
# Find exchanges in "matched" state with exchange date in the past
now = datetime.utcnow()
exchanges = Exchange.query.filter(
Exchange.state == ExchangeState.MATCHED,
Exchange.exchange_date <= now
).all()
if not exchanges:
logger.info("No exchanges to auto-complete")
return
completed_count = 0
for exchange in exchanges:
try:
# Update state
exchange.state = ExchangeState.COMPLETED
exchange.completed_at = now
db.session.add(exchange)
db.session.commit()
completed_count += 1
logger.info(f"Auto-completed exchange {exchange.id}: {exchange.name}")
except Exception as e:
db.session.rollback()
logger.error(f"Failed to auto-complete exchange {exchange.id}: {str(e)}")
logger.info(f"Auto-complete job finished: {completed_count} exchanges completed")
except Exception as e:
logger.error(f"Auto-complete job failed: {str(e)}")
raise
3. Data Purge Job
Purpose: Delete exchange data 30 days after completion
Schedule: Daily at 2:00 AM UTC
Trigger Type: Cron
Implementation:
def register_data_purge_job(scheduler: BackgroundScheduler):
"""
Register job to purge old exchange data.
Runs daily at 2:00 AM UTC.
"""
scheduler.add_job(
func=purge_old_exchanges,
trigger='cron',
hour=2,
minute=0,
id='purge_old_exchanges',
replace_existing=True
)
def purge_old_exchanges():
"""
Job function: Delete exchanges completed more than 30 days ago.
Sends warning email to admin 7 days before purge.
"""
logger.info("Executing data purge job")
try:
now = datetime.utcnow()
purge_threshold = now - timedelta(days=30)
warning_threshold = purge_threshold + timedelta(days=7) # Completed 23+ days ago: warn 7 days before purge
# Find exchanges eligible for purge
exchanges_to_purge = Exchange.query.filter(
Exchange.state == ExchangeState.COMPLETED,
Exchange.completed_at <= purge_threshold
).all()
# Find exchanges to warn about
exchanges_to_warn = Exchange.query.filter(
Exchange.state == ExchangeState.COMPLETED,
Exchange.completed_at <= warning_threshold, # 23+ days after completion
Exchange.completed_at > purge_threshold # Not yet eligible for purge
).all()
# Send warning emails
notification_service = NotificationService()
for exchange in exchanges_to_warn:
try:
notification_service.send_data_purge_warning(exchange.id)
logger.info(f"Sent purge warning for exchange {exchange.id}")
except Exception as e:
logger.error(f"Failed to send purge warning for exchange {exchange.id}: {str(e)}")
# Purge old exchanges
purged_count = 0
for exchange in exchanges_to_purge:
try:
# Delete exchange (cascades to participants, matches, exclusions, etc.)
db.session.delete(exchange)
db.session.commit()
purged_count += 1
logger.info(f"Purged exchange {exchange.id}: {exchange.name}")
except Exception as e:
db.session.rollback()
logger.error(f"Failed to purge exchange {exchange.id}: {str(e)}")
logger.info(f"Data purge job finished: {purged_count} exchanges purged, {len(exchanges_to_warn)} warnings sent")
except Exception as e:
logger.error(f"Data purge job failed: {str(e)}")
raise
Retention Policy: 30 days after completion
Warning Period: 7 days before purge
4. Token Cleanup Job
Purpose: Purge expired or used authentication tokens
Schedule: Hourly
Trigger Type: Interval
Implementation:
def register_token_cleanup_job(scheduler: BackgroundScheduler):
"""
Register job to clean up expired tokens.
Runs every hour.
"""
scheduler.add_job(
func=cleanup_expired_tokens,
trigger='interval',
hours=1,
id='cleanup_expired_tokens',
replace_existing=True
)
def cleanup_expired_tokens():
"""
Job function: Delete expired or used authentication tokens.
"""
logger.info("Executing token cleanup job")
try:
now = datetime.utcnow()
# Delete expired tokens
expired_tokens = MagicToken.query.filter(
MagicToken.expires_at <= now
).delete()
# Delete used tokens older than 24 hours
old_used_tokens = MagicToken.query.filter(
MagicToken.used_at.isnot(None),
MagicToken.used_at <= now - timedelta(hours=24)
).delete()
db.session.commit()
total_deleted = expired_tokens + old_used_tokens
logger.info(f"Token cleanup job finished: {total_deleted} tokens deleted")
except Exception as e:
db.session.rollback()
logger.error(f"Token cleanup job failed: {str(e)}")
raise
5. Session Cleanup Job
Purpose: Remove expired sessions from database
Schedule: Daily at 3:00 AM UTC
Trigger Type: Cron
Implementation:
def register_session_cleanup_job(scheduler: BackgroundScheduler):
"""
Register job to clean up expired sessions.
Runs daily at 3:00 AM UTC.
"""
scheduler.add_job(
func=cleanup_expired_sessions,
trigger='cron',
hour=3,
minute=0,
id='cleanup_expired_sessions',
replace_existing=True
)
def cleanup_expired_sessions():
"""
Job function: Delete expired sessions.
"""
logger.info("Executing session cleanup job")
try:
now = datetime.utcnow()
deleted_count = Session.query.filter(
Session.expires_at <= now
).delete()
db.session.commit()
logger.info(f"Session cleanup job finished: {deleted_count} sessions deleted")
except Exception as e:
db.session.rollback()
logger.error(f"Session cleanup job failed: {str(e)}")
raise
6. Rate Limit Cleanup Job
Purpose: Clear expired rate limit entries
Schedule: Daily at 4:00 AM UTC
Trigger Type: Cron
Implementation:
def register_rate_limit_cleanup_job(scheduler: BackgroundScheduler):
"""
Register job to clean up expired rate limit entries.
Runs daily at 4:00 AM UTC.
"""
scheduler.add_job(
func=cleanup_expired_rate_limits,
trigger='cron',
hour=4,
minute=0,
id='cleanup_expired_rate_limits',
replace_existing=True
)
def cleanup_expired_rate_limits():
"""
Job function: Delete expired rate limit entries.
"""
logger.info("Executing rate limit cleanup job")
try:
now = datetime.utcnow()
deleted_count = RateLimit.query.filter(
RateLimit.expires_at <= now
).delete()
db.session.commit()
logger.info(f"Rate limit cleanup job finished: {deleted_count} entries deleted")
except Exception as e:
db.session.rollback()
logger.error(f"Rate limit cleanup job failed: {str(e)}")
raise
Job Registration
Application Initialization
# In app.py or scheduler_service.py
def initialize_scheduler(app):
"""
Initialize scheduler and register all jobs.
Called during application startup.
"""
scheduler = create_scheduler(app)
# Register recurring jobs
register_auto_complete_job(scheduler)
register_data_purge_job(scheduler)
register_token_cleanup_job(scheduler)
register_session_cleanup_job(scheduler)
register_rate_limit_cleanup_job(scheduler)
# Start scheduler
scheduler.start()
logger.info("Background scheduler started successfully")
# Shutdown scheduler on app exit
import atexit
atexit.register(lambda: scheduler.shutdown())
return scheduler
# In Flask app factory
def create_app():
app = Flask(__name__)
# ... other initialization
# Initialize scheduler
with app.app_context():
scheduler = initialize_scheduler(app)
app.scheduler = scheduler
return app
Dynamic Job Scheduling
For reminder emails, jobs are scheduled dynamically when exchange is matched:
# In matching service after successful matching
def complete_matching_workflow(exchange_id: int):
# ... matching logic
if matching_result.success:
# Schedule reminder emails
from flask import current_app
schedule_reminder_emails(exchange_id, current_app.scheduler)
return result
Re-scheduling: When admin updates reminder preferences, cancel old jobs and reschedule:
def update_reminder_preferences(exchange_id: int, new_intervals: list[int]):
"""
Update reminder intervals and reschedule jobs.
"""
from flask import current_app
# Remove existing reminder jobs
job_prefix = f"reminder_exchange_{exchange_id}_"
for job in current_app.scheduler.get_jobs():
if job.id.startswith(job_prefix):
job.remove()
# Schedule new reminders
schedule_reminder_emails(exchange_id, current_app.scheduler)
Error Handling & Retries
Job Error Handling
All job functions wrapped with error handling:
def safe_job_wrapper(job_func):
"""
Decorator to wrap job functions with error handling.
Args:
job_func: Job function to wrap
Returns:
Wrapped function
"""
@functools.wraps(job_func)
def wrapper(*args, **kwargs):
try:
return job_func(*args, **kwargs)
except Exception as e:
logger.error(f"Job {job_func.__name__} failed: {str(e)}", exc_info=True)
# Don't re-raise unless we want APScheduler to mark as failed
# For now, log and swallow to prevent scheduler issues
return wrapper
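Because the SQLAlchemy jobstore serializes jobs by reference, the wrapper should be applied where the job function is defined; a module-level decorated name still pickles cleanly, whereas wrapping at add_job time would produce an unserializable closure. A sketch:
# Apply at definition time so the jobstore can still reference the
# job by its module-level name:
@safe_job_wrapper
def cleanup_expired_rate_limits():
    ...  # body as defined above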
Retry Configuration
APScheduler doesn't have built-in retry logic. Implement custom retry for critical jobs:
from tenacity import retry, stop_after_attempt, wait_exponential
@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=60))
def send_reminder_with_retry(participant_id: int):
"""Send reminder with retry logic."""
notification_service = NotificationService()
result = notification_service.send_reminder_email(participant_id)
if not result.success:
raise Exception(f"Failed to send reminder: {result.error}")
return result
Misfire Handling
Misfire: Job missed scheduled time (e.g., application was down)
APScheduler Behavior:
- If coalesce=True: Run once immediately upon restart
- If misfire_grace_time exceeded: Skip job
- For reminder emails: 1-hour grace time is acceptable
Example: Exchange is at 6 PM, reminder scheduled for noon. If app is down until 12:30 PM, reminder still sends (within 1-hour grace).
Monitoring & Logging
Job Execution Logging
import logging
logger = logging.getLogger('background_jobs')
# Log format
logging.basicConfig(
format='%(asctime)s [%(levelname)s] %(name)s: %(message)s',
level=logging.INFO
)
# Example log entries
logger.info("Executing reminder job for exchange 123")
logger.error("Failed to send reminder to participant 45: SMTP error")
logger.warning("Skipping reminder for exchange 67: invalid state")
Job Status Tracking (Optional Enhancement)
For production monitoring, create job execution log table:
class JobExecution(db.Model):
"""Track job execution history."""
id = db.Column(db.Integer, primary_key=True)
job_id = db.Column(db.String(255), nullable=False)
job_type = db.Column(db.String(50), nullable=False)
started_at = db.Column(db.DateTime, default=datetime.utcnow)
completed_at = db.Column(db.DateTime, nullable=True)
success = db.Column(db.Boolean, nullable=True)
error_message = db.Column(db.Text, nullable=True)
Log job execution:
from contextlib import contextmanager

@contextmanager
def log_job_execution(job_id: str, job_type: str):
    """Context manager to log job execution."""
    execution = JobExecution(job_id=job_id, job_type=job_type)
    db.session.add(execution)
    db.session.commit()
    try:
        yield execution
        execution.completed_at = datetime.utcnow()
        execution.success = True
        db.session.commit()
    except Exception as e:
        execution.completed_at = datetime.utcnow()
        execution.success = False
        execution.error_message = str(e)
        db.session.commit()
        raise
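A job function would then wrap its body in the context manager, for example (sketch):
def cleanup_expired_sessions():
    with log_job_execution('cleanup_expired_sessions', 'cleanup'):
        now = datetime.utcnow()
        Session.query.filter(Session.expires_at <= now).delete()
        db.session.commit()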
Testing Strategy
Unit Tests
class TestBackgroundJobs(unittest.TestCase):
def test_auto_complete_exchanges(self):
"""Test auto-complete logic."""
# Setup: Create exchange in matched state with past exchange date
exchange = create_test_exchange(
state=ExchangeState.MATCHED,
exchange_date=datetime.utcnow() - timedelta(days=1)
)
# Execute
auto_complete_exchanges()
# Assert
exchange = Exchange.query.get(exchange.id)
self.assertEqual(exchange.state, ExchangeState.COMPLETED)
self.assertIsNotNone(exchange.completed_at)
def test_token_cleanup(self):
"""Test token cleanup removes expired tokens."""
# Setup: Create expired token
token = create_test_magic_token(
expires_at=datetime.utcnow() - timedelta(hours=1)
)
# Execute
cleanup_expired_tokens()
# Assert
token = MagicToken.query.get(token.id)
self.assertIsNone(token)
def test_reminder_scheduling(self):
"""Test reminder jobs are scheduled correctly."""
from flask import current_app
exchange = create_matched_exchange()
# Execute
schedule_reminder_emails(exchange.id, current_app.scheduler)
# Assert
jobs = current_app.scheduler.get_jobs()
reminder_jobs = [j for j in jobs if j.id.startswith(f"reminder_exchange_{exchange.id}")]
# Assuming default intervals: 7, 3, 1 days
self.assertEqual(len(reminder_jobs), 3)
Integration Tests
class TestSchedulerIntegration(TestCase):
def test_scheduler_initialization(self):
"""Test scheduler starts and jobs are registered."""
from flask import current_app
scheduler = current_app.scheduler
self.assertTrue(scheduler.running)
jobs = scheduler.get_jobs()
job_ids = {j.id for j in jobs}
expected_jobs = {
'auto_complete_exchanges',
'purge_old_exchanges',
'cleanup_expired_tokens',
'cleanup_expired_sessions',
'cleanup_expired_rate_limits'
}
self.assertTrue(expected_jobs.issubset(job_ids))
Manual Testing
For development, trigger jobs manually:
# In Flask shell or debug endpoint
from jobs import auto_complete_exchanges
auto_complete_exchanges() # Run job immediately
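The debug endpoint mentioned above could look like this (a sketch; the route and the job registry are assumptions, and the endpoint must be disabled outside development):
from flask import abort

@app.route('/debug/jobs/<job_name>', methods=['POST'])
def trigger_job(job_name):
    """Manually trigger a background job. Development only."""
    if not app.debug:
        abort(404)
    jobs = {'auto_complete_exchanges': auto_complete_exchanges}
    func = jobs.get(job_name)
    if func is None:
        abort(404)
    func()
    return {'status': 'ok', 'job': job_name}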
Performance Considerations
Job Execution Time
Expected execution times:
| Job | Frequency | Expected Time | Notes |
|---|---|---|---|
| Reminder Emails | Dynamic | 1-30 seconds | Depends on participant count |
| Auto-Complete | Daily | <1 second | Few exchanges per day |
| Data Purge | Daily | 1-10 seconds | Rare (30-day retention) |
| Token Cleanup | Hourly | <1 second | Small dataset |
| Session Cleanup | Daily | <1 second | Small dataset |
| Rate Limit Cleanup | Daily | <1 second | Small dataset |
Concurrency
Max Workers: 4 threads
- Sufficient for low-volume tasks
- Prevents resource contention
- Most jobs run sequentially anyway
Job Isolation:
- Each job operates independently
- No shared state between jobs
- Database transactions ensure consistency
Resource Usage
Memory: ~10-50 MB for scheduler + job execution
CPU: Minimal (mostly I/O bound: database, email API)
Network: Email sending (Resend API calls)
Timezone Handling
Challenge
Exchanges have individual timezones, but scheduler runs in UTC.
Solution
- Store all datetimes in UTC in database
- Convert to exchange timezone when scheduling jobs
- APScheduler runs in UTC (consistent base)
Example:
# Exchange timezone: America/New_York (UTC-5)
# Exchange date: 2025-12-25 18:00:00 EST
exchange_tz = pytz.timezone("America/New_York")
exchange_datetime_local = exchange_tz.localize(datetime(2025, 12, 25, 18, 0, 0))
exchange_datetime_utc = exchange_datetime_local.astimezone(pytz.utc)
# Schedule reminder 7 days before (in UTC)
reminder_datetime_utc = exchange_datetime_utc - timedelta(days=7)
scheduler.add_job(
func=send_reminder_for_exchange,
trigger='date',
run_date=reminder_datetime_utc, # Scheduled in UTC
...
)
Security Considerations
Job Execution Context
- Jobs run with full database access (no user session); see the app-context sketch below
- No authentication required for job functions
- Jobs must validate data before operations
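Because scheduler threads run outside any request, Flask-SQLAlchemy queries such as Exchange.query need an active application context. A minimal sketch of one way to provide it (the app import path is an assumption):
def token_cleanup_entrypoint():
    """Scheduler-facing wrapper: push an app context, then run the job."""
    from app import app  # assumed: module-level Flask app instance
    with app.app_context():
        cleanup_expired_tokens()
The scheduler would register token_cleanup_entrypoint in place of cleanup_expired_tokens.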
Preventing Abuse
- Jobs are internal only (not exposed via API)
- No user-controlled job scheduling
- Admin cannot arbitrarily schedule jobs (only through defined workflows)
Data Safety
- All database operations wrapped in transactions
- Rollback on error prevents partial updates
- Deletion jobs log before purging
Deployment Considerations
Container Restarts
- APScheduler persists jobs to database
- On restart, scheduler resumes scheduled jobs
- Missed jobs execute if within misfire_grace_time
Scaling
Single Instance:
- Current design assumes single application instance
- SQLite doesn't support multi-instance locking
Future Multi-Instance:
- Would require external job store (Redis, PostgreSQL), as sketched below
- Distributed locking to prevent duplicate execution
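A sketch of the jobstore swap (the PostgreSQL URL is illustrative):
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore

# Shared job store visible to all application instances
jobstores = {
    'default': SQLAlchemyJobStore(url='postgresql://user:pass@db-host/sneaky_klaus')
}
# NOTE: APScheduler 3.x does not coordinate multiple schedulers against
# one store; run the scheduler on a single designated instance, or add
# external locking (e.g. a PostgreSQL advisory lock) before starting it.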
Health Checks
Monitor scheduler health:
def scheduler_health_check():
"""Check if scheduler is running."""
from flask import current_app
scheduler = current_app.scheduler
if not scheduler or not scheduler.running:
return {"status": "unhealthy", "reason": "Scheduler not running"}
return {"status": "healthy", "jobs": len(scheduler.get_jobs())}
Future Enhancements
- Job Retry Policy: Automatic retry with exponential backoff
- Job Dashboard: Admin UI to view scheduled jobs and execution history
- Manual Job Triggers: Allow admin to manually trigger data purge or reminders
- Job Metrics: Track execution time, success rate, failure patterns
- Distributed Scheduler: Support for multi-instance deployments
- Webhook Jobs: Trigger external webhooks on events