# Background Jobs Component Design - v0.1.0

**Version**: 0.1.0
**Date**: 2025-12-22
**Status**: Initial Design

## Introduction

This document defines the background job scheduling system for Sneaky Klaus. The system uses APScheduler to handle recurring tasks like reminder emails, automatic exchange completion, data purging, and cleanup operations.

## Requirements

### Functional Requirements

1. **Reminder Scheduling**: Send reminder emails at configured intervals before the exchange date
2. **Auto-Completion**: Automatically mark exchanges as complete after the exchange date
3. **Data Purging**: Delete exchange data 30 days after completion
4. **Token Cleanup**: Purge expired authentication tokens
5. **Session Cleanup**: Remove expired sessions
6. **Rate Limit Cleanup**: Clear expired rate limit entries

### Non-Functional Requirements

1. **Reliability**: Jobs must execute even if the application restarts
2. **Timezone Awareness**: Respect exchange-specific timezones
3. **Error Handling**: Gracefully handle failures without crashing the scheduler
4. **Observability**: Log all job executions for debugging
5. **Performance**: Jobs should not block application requests
6. **Testability**: Jobs must be unit-testable

## Architecture Overview

```mermaid
flowchart TB
    subgraph "Application Startup"
        Init[Application Init]
        Scheduler[Initialize APScheduler]
        Jobs[Register Jobs]
    end

    subgraph "APScheduler"
        JobStore[(SQLAlchemy JobStore)]
        Executor[ThreadPool Executor]
        Triggers[Cron & Interval Triggers]
    end

    subgraph "Job Definitions"
        Reminders[Reminder Emails]
        AutoComplete[Auto-Complete Exchanges]
        DataPurge[Data Purge]
        TokenCleanup[Token Cleanup]
        SessionCleanup[Session Cleanup]
        RateLimitCleanup[Rate Limit Cleanup]
    end

    subgraph "Services"
        NotificationService[Notification Service]
        ExchangeService[Exchange Service]
        DB[(Database)]
    end

    Init --> Scheduler
    Scheduler --> JobStore
    Scheduler --> Executor
    Scheduler --> Triggers
    Scheduler --> Jobs
    Jobs --> Reminders
    Jobs --> AutoComplete
    Jobs --> DataPurge
    Jobs --> TokenCleanup
    Jobs --> SessionCleanup
    Jobs --> RateLimitCleanup
    Reminders --> NotificationService
    AutoComplete --> ExchangeService
    DataPurge --> ExchangeService
    TokenCleanup --> DB
    SessionCleanup --> DB
    RateLimitCleanup --> DB
    NotificationService --> DB
    ExchangeService --> DB
```

## APScheduler Configuration

### Scheduler Setup

```python
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ThreadPoolExecutor
from pytz import utc


def create_scheduler(app):
    """
    Initialize and configure APScheduler.
    Args:
        app: Flask application instance

    Returns:
        Configured BackgroundScheduler
    """
    jobstores = {
        'default': SQLAlchemyJobStore(url=app.config['DATABASE_URL'])
    }
    executors = {
        'default': ThreadPoolExecutor(max_workers=4)
    }
    job_defaults = {
        'coalesce': True,           # Combine missed executions into one
        'max_instances': 1,         # Prevent concurrent runs of same job
        'misfire_grace_time': 300   # 5 minutes grace for missed jobs
    }

    scheduler = BackgroundScheduler(
        jobstores=jobstores,
        executors=executors,
        job_defaults=job_defaults,
        timezone=utc  # All jobs in UTC, converted per-exchange
    )
    return scheduler
```

### Configuration Options

| Option | Value | Rationale |
|--------|-------|-----------|
| `coalesce` | True | Prevents job queue buildup if app was offline |
| `max_instances` | 1 | Prevents concurrent execution of same job |
| `misfire_grace_time` | 300 seconds | Allows up to 5 minutes late execution |
| `timezone` | UTC | Consistent base timezone, converted per-exchange |
| `max_workers` | 4 | Sufficient for low-volume background tasks |

### Job Store Persistence

Jobs are stored in the SQLite database:

- **Table**: `apscheduler_jobs`
- **Persistence**: Jobs survive application restarts
- **Cleanup**: Old job runs automatically removed

## Job Definitions

### 1. Reminder Email Job

**Purpose**: Send reminder emails to participants before exchange date

**Schedule**: Dynamic per exchange based on admin configuration

**Trigger Type**: Date (one-off run at a specific date/time)

**Implementation**:

```python
def schedule_reminder_emails(exchange_id: int, scheduler: BackgroundScheduler):
    """
    Schedule reminder emails for an exchange.

    Called when exchange is matched or reminder preferences updated.
    Args:
        exchange_id: Exchange ID
        scheduler: APScheduler instance
    """
    # Deferred import to avoid circular imports at module load
    from services.notification_service import NotificationService

    exchange = Exchange.query.get(exchange_id)
    if not exchange or exchange.state != ExchangeState.MATCHED:
        return

    # Get reminder intervals (e.g., [7, 3, 1] days before); admin configurable
    reminder_days = get_reminder_intervals(exchange_id)

    # Convert exchange date to exchange timezone
    exchange_tz = pytz.timezone(exchange.timezone)
    exchange_datetime = exchange.exchange_date.replace(tzinfo=pytz.utc).astimezone(exchange_tz)

    for days_before in reminder_days:
        reminder_datetime = exchange_datetime - timedelta(days=days_before)

        # Skip if reminder time is in the past
        if reminder_datetime <= datetime.now(exchange_tz):
            continue

        # Schedule a one-off job for this reminder
        job_id = f"reminder_exchange_{exchange_id}_days_{days_before}"
        scheduler.add_job(
            func=send_reminder_for_exchange,
            args=[exchange_id],
            trigger='date',
            run_date=reminder_datetime,
            id=job_id,
            replace_existing=True,
            misfire_grace_time=3600  # 1 hour grace
        )
        logger.info(f"Scheduled reminder for exchange {exchange_id} at {reminder_datetime}")


def send_reminder_for_exchange(exchange_id: int):
    """
    Job function: Send reminder emails to all opted-in participants.
    Args:
        exchange_id: Exchange ID
    """
    logger.info(f"Executing reminder job for exchange {exchange_id}")

    try:
        # Get exchange and validate state
        exchange = Exchange.query.get(exchange_id)
        if not exchange or exchange.state != ExchangeState.MATCHED:
            logger.warning(f"Skipping reminder for exchange {exchange_id}: invalid state")
            return

        # Get opted-in participants
        participants = Participant.query.filter_by(
            exchange_id=exchange_id,
            reminder_enabled=True,
            withdrawn_at=None
        ).all()

        if not participants:
            logger.info(f"No participants opted in for reminders in exchange {exchange_id}")
            return

        # Send reminders
        notification_service = NotificationService()
        success_count = 0
        failure_count = 0

        for participant in participants:
            result = notification_service.send_reminder_email(participant.id)
            if result.success:
                success_count += 1
            else:
                failure_count += 1
                logger.error(f"Failed to send reminder to participant {participant.id}: {result.error}")

        logger.info(f"Reminder job complete for exchange {exchange_id}: {success_count} sent, {failure_count} failed")

    except Exception as e:
        logger.error(f"Reminder job failed for exchange {exchange_id}: {str(e)}")
        raise  # Re-raise to mark job as failed
```

**Default Reminder Intervals**: 7 days, 3 days, 1 day before exchange (admin configurable)

---

### 2. Auto-Complete Exchange Job

**Purpose**: Automatically mark exchanges as complete after exchange date passes

**Schedule**: Daily at midnight UTC

**Trigger Type**: Cron

**Implementation**:

```python
def register_auto_complete_job(scheduler: BackgroundScheduler):
    """
    Register job to auto-complete exchanges.

    Runs daily at midnight UTC.
    """
    scheduler.add_job(
        func=auto_complete_exchanges,
        trigger='cron',
        hour=0,
        minute=0,
        id='auto_complete_exchanges',
        replace_existing=True
    )


def auto_complete_exchanges():
    """
    Job function: Mark exchanges as complete if exchange date has passed.
""" logger.info("Executing auto-complete exchanges job") try: # Find exchanges in "matched" state with exchange date in the past now = datetime.utcnow() exchanges = Exchange.query.filter( Exchange.state == ExchangeState.MATCHED, Exchange.exchange_date <= now ).all() if not exchanges: logger.info("No exchanges to auto-complete") return completed_count = 0 for exchange in exchanges: try: # Update state exchange.state = ExchangeState.COMPLETED exchange.completed_at = now db.session.add(exchange) db.session.commit() completed_count += 1 logger.info(f"Auto-completed exchange {exchange.id}: {exchange.name}") except Exception as e: db.session.rollback() logger.error(f"Failed to auto-complete exchange {exchange.id}: {str(e)}") logger.info(f"Auto-complete job finished: {completed_count} exchanges completed") except Exception as e: logger.error(f"Auto-complete job failed: {str(e)}") raise ``` --- ### 3. Data Purge Job **Purpose**: Delete exchange data 30 days after completion **Schedule**: Daily at 2:00 AM UTC **Trigger Type**: Cron **Implementation**: ```python def register_data_purge_job(scheduler: BackgroundScheduler): """ Register job to purge old exchange data. Runs daily at 2:00 AM UTC. """ scheduler.add_job( func=purge_old_exchanges, trigger='cron', hour=2, minute=0, id='purge_old_exchanges', replace_existing=True ) def purge_old_exchanges(): """ Job function: Delete exchanges completed more than 30 days ago. Sends warning email to admin 7 days before purge. 
""" logger.info("Executing data purge job") try: now = datetime.utcnow() purge_threshold = now - timedelta(days=30) warning_threshold = now + timedelta(days=7) # Warn 7 days before # Find exchanges eligible for purge exchanges_to_purge = Exchange.query.filter( Exchange.state == ExchangeState.COMPLETED, Exchange.completed_at <= purge_threshold ).all() # Find exchanges to warn about exchanges_to_warn = Exchange.query.filter( Exchange.state == ExchangeState.COMPLETED, Exchange.completed_at <= purge_threshold - timedelta(days=7), # 23 days after completion Exchange.completed_at > purge_threshold # Not yet eligible for purge ).all() # Send warning emails notification_service = NotificationService() for exchange in exchanges_to_warn: try: notification_service.send_data_purge_warning(exchange.id) logger.info(f"Sent purge warning for exchange {exchange.id}") except Exception as e: logger.error(f"Failed to send purge warning for exchange {exchange.id}: {str(e)}") # Purge old exchanges purged_count = 0 for exchange in exchanges_to_purge: try: # Delete exchange (cascades to participants, matches, exclusions, etc.) db.session.delete(exchange) db.session.commit() purged_count += 1 logger.info(f"Purged exchange {exchange.id}: {exchange.name}") except Exception as e: db.session.rollback() logger.error(f"Failed to purge exchange {exchange.id}: {str(e)}") logger.info(f"Data purge job finished: {purged_count} exchanges purged, {len(exchanges_to_warn)} warnings sent") except Exception as e: logger.error(f"Data purge job failed: {str(e)}") raise ``` **Retention Policy**: 30 days after completion **Warning Period**: 7 days before purge --- ### 4. Token Cleanup Job **Purpose**: Purge expired or used authentication tokens **Schedule**: Hourly **Trigger Type**: Interval **Implementation**: ```python def register_token_cleanup_job(scheduler: BackgroundScheduler): """ Register job to clean up expired tokens. Runs every hour. 
""" scheduler.add_job( func=cleanup_expired_tokens, trigger='interval', hours=1, id='cleanup_expired_tokens', replace_existing=True ) def cleanup_expired_tokens(): """ Job function: Delete expired or used authentication tokens. """ logger.info("Executing token cleanup job") try: now = datetime.utcnow() # Delete expired tokens expired_tokens = MagicToken.query.filter( MagicToken.expires_at <= now ).delete() # Delete used tokens older than 24 hours old_used_tokens = MagicToken.query.filter( MagicToken.used_at.isnot(None), MagicToken.used_at <= now - timedelta(hours=24) ).delete() db.session.commit() total_deleted = expired_tokens + old_used_tokens logger.info(f"Token cleanup job finished: {total_deleted} tokens deleted") except Exception as e: db.session.rollback() logger.error(f"Token cleanup job failed: {str(e)}") raise ``` --- ### 5. Session Cleanup Job **Purpose**: Remove expired sessions from database **Schedule**: Daily at 3:00 AM UTC **Trigger Type**: Cron **Implementation**: ```python def register_session_cleanup_job(scheduler: BackgroundScheduler): """ Register job to clean up expired sessions. Runs daily at 3:00 AM UTC. """ scheduler.add_job( func=cleanup_expired_sessions, trigger='cron', hour=3, minute=0, id='cleanup_expired_sessions', replace_existing=True ) def cleanup_expired_sessions(): """ Job function: Delete expired sessions. """ logger.info("Executing session cleanup job") try: now = datetime.utcnow() deleted_count = Session.query.filter( Session.expires_at <= now ).delete() db.session.commit() logger.info(f"Session cleanup job finished: {deleted_count} sessions deleted") except Exception as e: db.session.rollback() logger.error(f"Session cleanup job failed: {str(e)}") raise ``` --- ### 6. 
Rate Limit Cleanup Job **Purpose**: Clear expired rate limit entries **Schedule**: Daily at 4:00 AM UTC **Trigger Type**: Cron **Implementation**: ```python def register_rate_limit_cleanup_job(scheduler: BackgroundScheduler): """ Register job to clean up expired rate limit entries. Runs daily at 4:00 AM UTC. """ scheduler.add_job( func=cleanup_expired_rate_limits, trigger='cron', hour=4, minute=0, id='cleanup_expired_rate_limits', replace_existing=True ) def cleanup_expired_rate_limits(): """ Job function: Delete expired rate limit entries. """ logger.info("Executing rate limit cleanup job") try: now = datetime.utcnow() deleted_count = RateLimit.query.filter( RateLimit.expires_at <= now ).delete() db.session.commit() logger.info(f"Rate limit cleanup job finished: {deleted_count} entries deleted") except Exception as e: db.session.rollback() logger.error(f"Rate limit cleanup job failed: {str(e)}") raise ``` --- ## Job Registration ### Application Initialization ```python # In app.py or scheduler_service.py def initialize_scheduler(app): """ Initialize scheduler and register all jobs. Called during application startup. """ scheduler = create_scheduler(app) # Register recurring jobs register_auto_complete_job(scheduler) register_data_purge_job(scheduler) register_token_cleanup_job(scheduler) register_session_cleanup_job(scheduler) register_rate_limit_cleanup_job(scheduler) # Start scheduler scheduler.start() logger.info("Background scheduler started successfully") # Shutdown scheduler on app exit import atexit atexit.register(lambda: scheduler.shutdown()) return scheduler # In Flask app factory def create_app(): app = Flask(__name__) # ... 
    # other initialization

    # Initialize scheduler
    with app.app_context():
        scheduler = initialize_scheduler(app)
    app.scheduler = scheduler

    return app
```

### Dynamic Job Scheduling

For reminder emails, jobs are scheduled dynamically when an exchange is matched:

```python
# In matching service, after successful matching
def complete_matching_workflow(exchange_id: int):
    # ... matching logic

    if matching_result.success:
        # Schedule reminder emails
        from flask import current_app
        schedule_reminder_emails(exchange_id, current_app.scheduler)

    return result
```

**Re-scheduling**: When the admin updates reminder preferences, cancel old jobs and reschedule:

```python
def update_reminder_preferences(exchange_id: int, new_intervals: list[int]):
    """
    Update reminder intervals and reschedule jobs.
    """
    from flask import current_app

    # Remove existing reminder jobs
    job_prefix = f"reminder_exchange_{exchange_id}_"
    for job in current_app.scheduler.get_jobs():
        if job.id.startswith(job_prefix):
            job.remove()

    # Schedule new reminders
    schedule_reminder_emails(exchange_id, current_app.scheduler)
```

## Error Handling & Retries

### Job Error Handling

All job functions are wrapped with error handling:

```python
import functools


def safe_job_wrapper(job_func):
    """
    Decorator to wrap job functions with error handling.

    Args:
        job_func: Job function to wrap

    Returns:
        Wrapped function
    """
    @functools.wraps(job_func)
    def wrapper(*args, **kwargs):
        try:
            return job_func(*args, **kwargs)
        except Exception as e:
            logger.error(f"Job {job_func.__name__} failed: {str(e)}", exc_info=True)
            # Don't re-raise unless we want APScheduler to mark the run as failed;
            # for now, log and swallow to prevent scheduler issues
    return wrapper
```

### Retry Configuration

APScheduler doesn't have built-in retry logic.
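Where a dependency-free approach is preferred, a small hand-rolled helper with exponential backoff covers the common case. This is a sketch; `run_with_retry` and its parameters are illustrative names, not part of the existing codebase:

```python
import logging
import time

logger = logging.getLogger('background_jobs')


def run_with_retry(operation, attempts=3, base_delay=2.0):
    """Call `operation`, retrying on any exception with exponential backoff.

    With attempts=3 and base_delay=2.0, failed tries wait 2s, then 4s,
    before the third and final attempt.
    """
    for attempt in range(1, attempts + 1):
        try:
            return operation()
        except Exception as exc:
            if attempt == attempts:
                raise  # Retries exhausted; let the caller log the failure
            delay = base_delay * (2 ** (attempt - 1))
            logger.warning(f"Attempt {attempt} failed ({exc}); retrying in {delay:.0f}s")
            time.sleep(delay)
```

A job would wrap only the flaky operation, e.g. `run_with_retry(lambda: notification_service.send_reminder_email(participant.id))`, so work done before the call is not repeated on retry.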
Implement custom retry for critical jobs:

```python
from tenacity import retry, stop_after_attempt, wait_exponential


@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=60))
def send_reminder_with_retry(participant_id: int):
    """Send reminder with retry logic."""
    notification_service = NotificationService()
    result = notification_service.send_reminder_email(participant_id)
    if not result.success:
        raise Exception(f"Failed to send reminder: {result.error}")
    return result
```

### Misfire Handling

**Misfire**: A job missed its scheduled time (e.g., the application was down).

**APScheduler behavior**:

- If `coalesce=True`: missed runs are combined and run once upon restart
- If `misfire_grace_time` is exceeded: the run is skipped
- For reminder emails: a 1-hour grace time is acceptable

**Example**: The exchange is at 6 PM and the reminder is scheduled for noon. If the app is down until 12:30 PM, the reminder still sends (within the 1-hour grace).

## Monitoring & Logging

### Job Execution Logging

```python
import logging

logger = logging.getLogger('background_jobs')

# Log format
logging.basicConfig(
    format='%(asctime)s [%(levelname)s] %(name)s: %(message)s',
    level=logging.INFO
)

# Example log entries
logger.info("Executing reminder job for exchange 123")
logger.error("Failed to send reminder to participant 45: SMTP error")
logger.warning("Skipping reminder for exchange 67: invalid state")
```

### Job Status Tracking (Optional Enhancement)

For production monitoring, create a job execution log table:

```python
class JobExecution(db.Model):
    """Track job execution history."""

    id = db.Column(db.Integer, primary_key=True)
    job_id = db.Column(db.String(255), nullable=False)
    job_type = db.Column(db.String(50), nullable=False)
    started_at = db.Column(db.DateTime, default=datetime.utcnow)
    completed_at = db.Column(db.DateTime, nullable=True)
    success = db.Column(db.Boolean, nullable=True)
    error_message = db.Column(db.Text, nullable=True)
```

Log job execution:

```python
def log_job_execution(job_id: str, job_type: str):
"""Context manager to log job execution.""" from contextlib import contextmanager @contextmanager def _log(): execution = JobExecution(job_id=job_id, job_type=job_type) db.session.add(execution) db.session.commit() try: yield execution execution.completed_at = datetime.utcnow() execution.success = True db.session.commit() except Exception as e: execution.completed_at = datetime.utcnow() execution.success = False execution.error_message = str(e) db.session.commit() raise return _log() ``` ## Testing Strategy ### Unit Tests ```python class TestBackgroundJobs(unittest.TestCase): def test_auto_complete_exchanges(self): """Test auto-complete logic.""" # Setup: Create exchange in matched state with past exchange date exchange = create_test_exchange( state=ExchangeState.MATCHED, exchange_date=datetime.utcnow() - timedelta(days=1) ) # Execute auto_complete_exchanges() # Assert exchange = Exchange.query.get(exchange.id) self.assertEqual(exchange.state, ExchangeState.COMPLETED) self.assertIsNotNone(exchange.completed_at) def test_token_cleanup(self): """Test token cleanup removes expired tokens.""" # Setup: Create expired token token = create_test_magic_token( expires_at=datetime.utcnow() - timedelta(hours=1) ) # Execute cleanup_expired_tokens() # Assert token = MagicToken.query.get(token.id) self.assertIsNone(token) def test_reminder_scheduling(self): """Test reminder jobs are scheduled correctly.""" from flask import current_app exchange = create_matched_exchange() # Execute schedule_reminder_emails(exchange.id, current_app.scheduler) # Assert jobs = current_app.scheduler.get_jobs() reminder_jobs = [j for j in jobs if j.id.startswith(f"reminder_exchange_{exchange.id}")] # Assuming default intervals: 7, 3, 1 days self.assertEqual(len(reminder_jobs), 3) ``` ### Integration Tests ```python class TestSchedulerIntegration(TestCase): def test_scheduler_initialization(self): """Test scheduler starts and jobs are registered.""" from flask import current_app scheduler = 
current_app.scheduler self.assertTrue(scheduler.running) jobs = scheduler.get_jobs() job_ids = {j.id for j in jobs} expected_jobs = { 'auto_complete_exchanges', 'purge_old_exchanges', 'cleanup_expired_tokens', 'cleanup_expired_sessions', 'cleanup_expired_rate_limits' } self.assertTrue(expected_jobs.issubset(job_ids)) ``` ### Manual Testing For development, trigger jobs manually: ```python # In Flask shell or debug endpoint from jobs import auto_complete_exchanges auto_complete_exchanges() # Run job immediately ``` ## Performance Considerations ### Job Execution Time Expected execution times: | Job | Frequency | Expected Time | Notes | |-----|-----------|---------------|-------| | Reminder Emails | Dynamic | 1-30 seconds | Depends on participant count | | Auto-Complete | Daily | <1 second | Few exchanges per day | | Data Purge | Daily | 1-10 seconds | Rare (30-day retention) | | Token Cleanup | Hourly | <1 second | Small dataset | | Session Cleanup | Daily | <1 second | Small dataset | | Rate Limit Cleanup | Daily | <1 second | Small dataset | ### Concurrency **Max Workers**: 4 threads - Sufficient for low-volume tasks - Prevents resource contention - Most jobs run sequentially anyway **Job Isolation**: - Each job operates independently - No shared state between jobs - Database transactions ensure consistency ### Resource Usage **Memory**: ~10-50 MB for scheduler + job execution **CPU**: Minimal (mostly I/O bound: database, email API) **Network**: Email sending (Resend API calls) ## Timezone Handling ### Challenge Exchanges have individual timezones, but scheduler runs in UTC. ### Solution 1. **Store all datetimes in UTC** in database 2. **Convert to exchange timezone** when scheduling jobs 3. 
**APScheduler runs in UTC** (consistent base) **Example**: ```python # Exchange timezone: America/New_York (UTC-5) # Exchange date: 2025-12-25 18:00:00 EST exchange_tz = pytz.timezone("America/New_York") exchange_datetime_local = exchange_tz.localize(datetime(2025, 12, 25, 18, 0, 0)) exchange_datetime_utc = exchange_datetime_local.astimezone(pytz.utc) # Schedule reminder 7 days before (in UTC) reminder_datetime_utc = exchange_datetime_utc - timedelta(days=7) scheduler.add_job( func=send_reminder_for_exchange, trigger='date', run_date=reminder_datetime_utc, # Scheduled in UTC ... ) ``` ## Security Considerations ### Job Execution Context - Jobs run with full database access (no user session) - No authentication required for job functions - Jobs must validate data before operations ### Preventing Abuse - Jobs are internal only (not exposed via API) - No user-controlled job scheduling - Admin cannot arbitrarily schedule jobs (only through defined workflows) ### Data Safety - All database operations wrapped in transactions - Rollback on error prevents partial updates - Deletion jobs log before purging ## Deployment Considerations ### Container Restarts - APScheduler persists jobs to database - On restart, scheduler resumes scheduled jobs - Missed jobs execute if within `misfire_grace_time` ### Scaling **Single Instance**: - Current design assumes single application instance - SQLite doesn't support multi-instance locking **Future Multi-Instance**: - Would require external job store (Redis, PostgreSQL) - Distributed locking to prevent duplicate execution ### Health Checks Monitor scheduler health: ```python def scheduler_health_check(): """Check if scheduler is running.""" from flask import current_app scheduler = current_app.scheduler if not scheduler or not scheduler.running: return {"status": "unhealthy", "reason": "Scheduler not running"} return {"status": "healthy", "jobs": len(scheduler.get_jobs())} ``` ## Future Enhancements 1. 
**Job Retry Policy**: Automatic retry with exponential backoff 2. **Job Dashboard**: Admin UI to view scheduled jobs and execution history 3. **Manual Job Triggers**: Allow admin to manually trigger data purge or reminders 4. **Job Metrics**: Track execution time, success rate, failure patterns 5. **Distributed Scheduler**: Support for multi-instance deployments 6. **Webhook Jobs**: Trigger external webhooks on events ## References - [APScheduler Documentation](https://apscheduler.readthedocs.io/) - [APScheduler User Guide](https://apscheduler.readthedocs.io/en/stable/userguide.html) - [Flask APScheduler Integration](https://github.com/viniciuschiele/flask-apscheduler) - [Timezone Handling in Python](https://pytz.sourceforge.net/) - [Data Model Specification](../data-model.md) - [Notifications Component Design](./notifications.md)