# Background Jobs Component Design - v0.1.0

**Version**: 0.1.0
**Date**: 2025-12-22
**Status**: Initial Design
## Introduction

This document defines the background job scheduling system for Sneaky Klaus. The system uses APScheduler to handle recurring tasks like reminder emails, automatic exchange completion, data purging, and cleanup operations.
## Requirements

### Functional Requirements

1. **Reminder Scheduling**: Send reminder emails at configured intervals before the exchange date
2. **Auto-Completion**: Automatically mark exchanges as complete after the exchange date
3. **Data Purging**: Delete exchange data 30 days after completion
4. **Token Cleanup**: Purge expired authentication tokens
5. **Session Cleanup**: Remove expired sessions
6. **Rate Limit Cleanup**: Clear expired rate limit entries

### Non-Functional Requirements

1. **Reliability**: Jobs must execute even if the application restarts
2. **Timezone Awareness**: Respect exchange-specific timezones
3. **Error Handling**: Gracefully handle failures without crashing the scheduler
4. **Observability**: Log all job executions for debugging
5. **Performance**: Jobs must not block application requests
6. **Testability**: Jobs must be unit-testable
## Architecture Overview

```mermaid
flowchart TB
    subgraph "Application Startup"
        Init[Application Init]
        Scheduler[Initialize APScheduler]
        Jobs[Register Jobs]
    end

    subgraph "APScheduler"
        JobStore[(SQLAlchemy JobStore)]
        Executor[ThreadPool Executor]
        Triggers[Cron & Interval Triggers]
    end

    subgraph "Job Definitions"
        Reminders[Reminder Emails]
        AutoComplete[Auto-Complete Exchanges]
        DataPurge[Data Purge]
        TokenCleanup[Token Cleanup]
        SessionCleanup[Session Cleanup]
        RateLimitCleanup[Rate Limit Cleanup]
    end

    subgraph "Services"
        NotificationService[Notification Service]
        ExchangeService[Exchange Service]
        DB[(Database)]
    end

    Init --> Scheduler
    Scheduler --> JobStore
    Scheduler --> Executor
    Scheduler --> Triggers
    Scheduler --> Jobs

    Jobs --> Reminders
    Jobs --> AutoComplete
    Jobs --> DataPurge
    Jobs --> TokenCleanup
    Jobs --> SessionCleanup
    Jobs --> RateLimitCleanup

    Reminders --> NotificationService
    AutoComplete --> ExchangeService
    DataPurge --> ExchangeService
    TokenCleanup --> DB
    SessionCleanup --> DB
    RateLimitCleanup --> DB

    NotificationService --> DB
    ExchangeService --> DB
```
## APScheduler Configuration

### Scheduler Setup

```python
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ThreadPoolExecutor
from pytz import utc


def create_scheduler(app):
    """
    Initialize and configure APScheduler.

    Args:
        app: Flask application instance

    Returns:
        Configured BackgroundScheduler
    """
    jobstores = {
        'default': SQLAlchemyJobStore(url=app.config['DATABASE_URL'])
    }

    executors = {
        'default': ThreadPoolExecutor(max_workers=4)
    }

    job_defaults = {
        'coalesce': True,  # Combine missed executions into one
        'max_instances': 1,  # Prevent concurrent runs of the same job
        'misfire_grace_time': 300  # 5 minutes grace for missed jobs
    }

    scheduler = BackgroundScheduler(
        jobstores=jobstores,
        executors=executors,
        job_defaults=job_defaults,
        timezone=utc  # All jobs in UTC, converted per-exchange
    )

    return scheduler
```
### Configuration Options

| Option | Value | Rationale |
|--------|-------|-----------|
| `coalesce` | True | Prevents job queue buildup if the app is offline |
| `max_instances` | 1 | Prevents concurrent execution of the same job |
| `misfire_grace_time` | 300 seconds | Allows execution up to 5 minutes late |
| `timezone` | UTC | Consistent base timezone, converted per-exchange |
| `max_workers` | 4 | Sufficient for low-volume background tasks |

### Job Store Persistence

Jobs are stored in the SQLite database:

- **Table**: `apscheduler_jobs`
- **Persistence**: Jobs survive application restarts
- **Cleanup**: Old job entries are removed automatically
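Because `SQLAlchemyJobStore` serializes jobs into the `apscheduler_jobs` table, persisted jobs can also be inspected directly during debugging. A minimal sketch, assuming the job store's default table layout (an `id` column plus a `next_run_time` timestamp):

```python
import sqlite3


def list_persisted_jobs(db_path: str) -> list[str]:
    """Return the IDs of jobs persisted by SQLAlchemyJobStore,
    ordered by their next scheduled run."""
    conn = sqlite3.connect(db_path)
    try:
        rows = conn.execute(
            "SELECT id, next_run_time FROM apscheduler_jobs ORDER BY next_run_time"
        ).fetchall()
        return [row[0] for row in rows]
    finally:
        conn.close()
```

At runtime, `scheduler.get_jobs()` is the supported way to enumerate jobs; raw table access is only useful when the application (and scheduler) is offline.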
## Job Definitions

### 1. Reminder Email Job

**Purpose**: Send reminder emails to participants before the exchange date

**Schedule**: Dynamic per exchange, based on admin configuration

**Trigger Type**: Date (a specific date/time per reminder)

**Implementation**:

```python
def schedule_reminder_emails(exchange_id: int, scheduler: BackgroundScheduler):
    """
    Schedule reminder emails for an exchange.

    Called when an exchange is matched or reminder preferences are updated.

    Args:
        exchange_id: Exchange ID
        scheduler: APScheduler instance
    """
    exchange = Exchange.query.get(exchange_id)
    if not exchange or exchange.state != ExchangeState.MATCHED:
        return

    # Get reminder intervals (e.g., [7, 3, 1] days before)
    reminder_days = get_reminder_intervals(exchange_id)  # Admin configurable

    # Convert exchange date to the exchange timezone
    exchange_tz = pytz.timezone(exchange.timezone)
    exchange_datetime = exchange.exchange_date.replace(tzinfo=pytz.utc).astimezone(exchange_tz)

    for days_before in reminder_days:
        reminder_datetime = exchange_datetime - timedelta(days=days_before)

        # Skip if the reminder time is in the past
        if reminder_datetime <= datetime.now(exchange_tz):
            continue

        # Schedule job
        job_id = f"reminder_exchange_{exchange_id}_days_{days_before}"

        scheduler.add_job(
            func=send_reminder_for_exchange,
            args=[exchange_id],
            trigger='date',
            run_date=reminder_datetime,
            id=job_id,
            replace_existing=True,
            misfire_grace_time=3600  # 1 hour grace
        )

        logger.info(f"Scheduled reminder for exchange {exchange_id} at {reminder_datetime}")


def send_reminder_for_exchange(exchange_id: int):
    """
    Job function: Send reminder emails to all opted-in participants.

    Args:
        exchange_id: Exchange ID
    """
    from services.notification_service import NotificationService

    logger.info(f"Executing reminder job for exchange {exchange_id}")

    try:
        # Get exchange and validate state
        exchange = Exchange.query.get(exchange_id)
        if not exchange or exchange.state != ExchangeState.MATCHED:
            logger.warning(f"Skipping reminder for exchange {exchange_id}: invalid state")
            return

        # Get opted-in participants
        participants = Participant.query.filter_by(
            exchange_id=exchange_id,
            reminder_enabled=True,
            withdrawn_at=None
        ).all()

        if not participants:
            logger.info(f"No participants opted in for reminders in exchange {exchange_id}")
            return

        # Send reminders
        notification_service = NotificationService()
        success_count = 0
        failure_count = 0

        for participant in participants:
            result = notification_service.send_reminder_email(participant.id)
            if result.success:
                success_count += 1
            else:
                failure_count += 1
                logger.error(f"Failed to send reminder to participant {participant.id}: {result.error}")

        logger.info(f"Reminder job complete for exchange {exchange_id}: {success_count} sent, {failure_count} failed")

    except Exception as e:
        logger.error(f"Reminder job failed for exchange {exchange_id}: {str(e)}")
        raise  # Re-raise to mark the job as failed
```

**Default Reminder Intervals**: 7, 3, and 1 days before the exchange (admin configurable)
---

### 2. Auto-Complete Exchange Job

**Purpose**: Automatically mark exchanges as complete after the exchange date passes

**Schedule**: Daily at midnight UTC

**Trigger Type**: Cron

**Implementation**:

```python
def register_auto_complete_job(scheduler: BackgroundScheduler):
    """
    Register job to auto-complete exchanges.

    Runs daily at midnight UTC.
    """
    scheduler.add_job(
        func=auto_complete_exchanges,
        trigger='cron',
        hour=0,
        minute=0,
        id='auto_complete_exchanges',
        replace_existing=True
    )


def auto_complete_exchanges():
    """
    Job function: Mark exchanges as complete if the exchange date has passed.
    """
    logger.info("Executing auto-complete exchanges job")

    try:
        # Find exchanges in the "matched" state with an exchange date in the past
        now = datetime.utcnow()

        exchanges = Exchange.query.filter(
            Exchange.state == ExchangeState.MATCHED,
            Exchange.exchange_date <= now
        ).all()

        if not exchanges:
            logger.info("No exchanges to auto-complete")
            return

        completed_count = 0

        for exchange in exchanges:
            try:
                # Update state
                exchange.state = ExchangeState.COMPLETED
                exchange.completed_at = now

                db.session.add(exchange)
                db.session.commit()

                completed_count += 1
                logger.info(f"Auto-completed exchange {exchange.id}: {exchange.name}")

            except Exception as e:
                db.session.rollback()
                logger.error(f"Failed to auto-complete exchange {exchange.id}: {str(e)}")

        logger.info(f"Auto-complete job finished: {completed_count} exchanges completed")

    except Exception as e:
        logger.error(f"Auto-complete job failed: {str(e)}")
        raise
```
---

### 3. Data Purge Job

**Purpose**: Delete exchange data 30 days after completion

**Schedule**: Daily at 2:00 AM UTC

**Trigger Type**: Cron

**Implementation**:

```python
def register_data_purge_job(scheduler: BackgroundScheduler):
    """
    Register job to purge old exchange data.

    Runs daily at 2:00 AM UTC.
    """
    scheduler.add_job(
        func=purge_old_exchanges,
        trigger='cron',
        hour=2,
        minute=0,
        id='purge_old_exchanges',
        replace_existing=True
    )


def purge_old_exchanges():
    """
    Job function: Delete exchanges completed more than 30 days ago.

    Sends a warning email to the admin 7 days before purge.
    """
    logger.info("Executing data purge job")

    try:
        now = datetime.utcnow()
        purge_threshold = now - timedelta(days=30)
        warning_threshold = purge_threshold + timedelta(days=7)  # Completed 23+ days ago

        # Find exchanges eligible for purge
        exchanges_to_purge = Exchange.query.filter(
            Exchange.state == ExchangeState.COMPLETED,
            Exchange.completed_at <= purge_threshold
        ).all()

        # Find exchanges to warn about (within 7 days of purge, not yet eligible)
        exchanges_to_warn = Exchange.query.filter(
            Exchange.state == ExchangeState.COMPLETED,
            Exchange.completed_at <= warning_threshold,
            Exchange.completed_at > purge_threshold
        ).all()

        # Send warning emails
        notification_service = NotificationService()
        for exchange in exchanges_to_warn:
            try:
                notification_service.send_data_purge_warning(exchange.id)
                logger.info(f"Sent purge warning for exchange {exchange.id}")
            except Exception as e:
                logger.error(f"Failed to send purge warning for exchange {exchange.id}: {str(e)}")

        # Purge old exchanges
        purged_count = 0
        for exchange in exchanges_to_purge:
            try:
                # Delete exchange (cascades to participants, matches, exclusions, etc.)
                db.session.delete(exchange)
                db.session.commit()

                purged_count += 1
                logger.info(f"Purged exchange {exchange.id}: {exchange.name}")

            except Exception as e:
                db.session.rollback()
                logger.error(f"Failed to purge exchange {exchange.id}: {str(e)}")

        logger.info(f"Data purge job finished: {purged_count} exchanges purged, {len(exchanges_to_warn)} warnings sent")

    except Exception as e:
        logger.error(f"Data purge job failed: {str(e)}")
        raise
```

**Retention Policy**: 30 days after completion
**Warning Period**: 7 days before purge
---

### 4. Token Cleanup Job

**Purpose**: Purge expired or used authentication tokens

**Schedule**: Hourly

**Trigger Type**: Interval

**Implementation**:

```python
def register_token_cleanup_job(scheduler: BackgroundScheduler):
    """
    Register job to clean up expired tokens.

    Runs every hour.
    """
    scheduler.add_job(
        func=cleanup_expired_tokens,
        trigger='interval',
        hours=1,
        id='cleanup_expired_tokens',
        replace_existing=True
    )


def cleanup_expired_tokens():
    """
    Job function: Delete expired or used authentication tokens.
    """
    logger.info("Executing token cleanup job")

    try:
        now = datetime.utcnow()

        # Delete expired tokens
        expired_tokens = MagicToken.query.filter(
            MagicToken.expires_at <= now
        ).delete()

        # Delete used tokens older than 24 hours
        old_used_tokens = MagicToken.query.filter(
            MagicToken.used_at.isnot(None),
            MagicToken.used_at <= now - timedelta(hours=24)
        ).delete()

        db.session.commit()

        total_deleted = expired_tokens + old_used_tokens
        logger.info(f"Token cleanup job finished: {total_deleted} tokens deleted")

    except Exception as e:
        db.session.rollback()
        logger.error(f"Token cleanup job failed: {str(e)}")
        raise
```
---

### 5. Session Cleanup Job

**Purpose**: Remove expired sessions from the database

**Schedule**: Daily at 3:00 AM UTC

**Trigger Type**: Cron

**Implementation**:

```python
def register_session_cleanup_job(scheduler: BackgroundScheduler):
    """
    Register job to clean up expired sessions.

    Runs daily at 3:00 AM UTC.
    """
    scheduler.add_job(
        func=cleanup_expired_sessions,
        trigger='cron',
        hour=3,
        minute=0,
        id='cleanup_expired_sessions',
        replace_existing=True
    )


def cleanup_expired_sessions():
    """
    Job function: Delete expired sessions.
    """
    logger.info("Executing session cleanup job")

    try:
        now = datetime.utcnow()

        deleted_count = Session.query.filter(
            Session.expires_at <= now
        ).delete()

        db.session.commit()

        logger.info(f"Session cleanup job finished: {deleted_count} sessions deleted")

    except Exception as e:
        db.session.rollback()
        logger.error(f"Session cleanup job failed: {str(e)}")
        raise
```
---

### 6. Rate Limit Cleanup Job

**Purpose**: Clear expired rate limit entries

**Schedule**: Daily at 4:00 AM UTC

**Trigger Type**: Cron

**Implementation**:

```python
def register_rate_limit_cleanup_job(scheduler: BackgroundScheduler):
    """
    Register job to clean up expired rate limit entries.

    Runs daily at 4:00 AM UTC.
    """
    scheduler.add_job(
        func=cleanup_expired_rate_limits,
        trigger='cron',
        hour=4,
        minute=0,
        id='cleanup_expired_rate_limits',
        replace_existing=True
    )


def cleanup_expired_rate_limits():
    """
    Job function: Delete expired rate limit entries.
    """
    logger.info("Executing rate limit cleanup job")

    try:
        now = datetime.utcnow()

        deleted_count = RateLimit.query.filter(
            RateLimit.expires_at <= now
        ).delete()

        db.session.commit()

        logger.info(f"Rate limit cleanup job finished: {deleted_count} entries deleted")

    except Exception as e:
        db.session.rollback()
        logger.error(f"Rate limit cleanup job failed: {str(e)}")
        raise
```
---

## Job Registration

### Application Initialization

```python
# In app.py or scheduler_service.py

def initialize_scheduler(app):
    """
    Initialize the scheduler and register all jobs.

    Called during application startup.
    """
    scheduler = create_scheduler(app)

    # Register recurring jobs
    register_auto_complete_job(scheduler)
    register_data_purge_job(scheduler)
    register_token_cleanup_job(scheduler)
    register_session_cleanup_job(scheduler)
    register_rate_limit_cleanup_job(scheduler)

    # Start scheduler
    scheduler.start()

    logger.info("Background scheduler started successfully")

    # Shut down the scheduler on app exit
    import atexit
    atexit.register(lambda: scheduler.shutdown())

    return scheduler


# In the Flask app factory
def create_app():
    app = Flask(__name__)
    # ... other initialization

    # Initialize scheduler
    with app.app_context():
        scheduler = initialize_scheduler(app)
        app.scheduler = scheduler

    return app
```
### Dynamic Job Scheduling

For reminder emails, jobs are scheduled dynamically when an exchange is matched:

```python
# In the matching service, after successful matching
def complete_matching_workflow(exchange_id: int):
    # ... matching logic

    if matching_result.success:
        # Schedule reminder emails
        from flask import current_app
        schedule_reminder_emails(exchange_id, current_app.scheduler)

    return result
```

**Re-scheduling**: When an admin updates reminder preferences, cancel the old jobs and reschedule:

```python
def update_reminder_preferences(exchange_id: int, new_intervals: list[int]):
    """
    Update reminder intervals and reschedule jobs.
    """
    from flask import current_app

    # Remove existing reminder jobs
    job_prefix = f"reminder_exchange_{exchange_id}_"
    for job in current_app.scheduler.get_jobs():
        if job.id.startswith(job_prefix):
            job.remove()

    # Schedule new reminders
    schedule_reminder_emails(exchange_id, current_app.scheduler)
```
## Error Handling & Retries

### Job Error Handling

All job functions are wrapped with error handling:

```python
import functools


def safe_job_wrapper(job_func):
    """
    Decorator to wrap job functions with error handling.

    Args:
        job_func: Job function to wrap

    Returns:
        Wrapped function
    """
    @functools.wraps(job_func)
    def wrapper(*args, **kwargs):
        try:
            return job_func(*args, **kwargs)
        except Exception as e:
            logger.error(f"Job {job_func.__name__} failed: {str(e)}", exc_info=True)
            # Don't re-raise unless we want APScheduler to mark the job as failed.
            # For now, log and swallow to avoid scheduler issues.

    return wrapper
```
### Retry Configuration

APScheduler has no built-in retry logic. Implement custom retries for critical jobs:

```python
from tenacity import retry, stop_after_attempt, wait_exponential


@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=60))
def send_reminder_with_retry(participant_id: int):
    """Send a reminder with retry logic."""
    notification_service = NotificationService()
    result = notification_service.send_reminder_email(participant_id)

    if not result.success:
        raise Exception(f"Failed to send reminder: {result.error}")

    return result
```
### Misfire Handling

**Misfire**: A job missed its scheduled time (e.g., the application was down)

**APScheduler behavior**:
- If `coalesce=True`: missed runs are combined and run once upon restart
- If `misfire_grace_time` is exceeded: the job run is skipped
- For reminder emails: a 1-hour grace time is acceptable

**Example**: The exchange is at 6 PM and a reminder is scheduled for noon. If the app is down until 12:30 PM, the reminder still sends (within the 1-hour grace).
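The grace-time decision in the example above can be sketched as a small predicate. This is a simplified model of APScheduler's misfire check for illustration, not its actual implementation:

```python
from datetime import datetime, timedelta


def should_run_after_downtime(scheduled: datetime, now: datetime,
                              grace_seconds: int) -> bool:
    """A missed job still runs if the current time is within
    `misfire_grace_time` seconds of its scheduled run time."""
    return scheduled <= now <= scheduled + timedelta(seconds=grace_seconds)
```

With the noon reminder and a 3600-second grace, coming back online at 12:30 PM runs the job; coming back at 2 PM skips it.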
## Monitoring & Logging

### Job Execution Logging

```python
import logging

logger = logging.getLogger('background_jobs')

# Log format
logging.basicConfig(
    format='%(asctime)s [%(levelname)s] %(name)s: %(message)s',
    level=logging.INFO
)

# Example log entries
logger.info("Executing reminder job for exchange 123")
logger.error("Failed to send reminder to participant 45: SMTP error")
logger.warning("Skipping reminder for exchange 67: invalid state")
```
### Job Status Tracking (Optional Enhancement)

For production monitoring, create a job execution log table:

```python
class JobExecution(db.Model):
    """Track job execution history."""
    id = db.Column(db.Integer, primary_key=True)
    job_id = db.Column(db.String(255), nullable=False)
    job_type = db.Column(db.String(50), nullable=False)
    started_at = db.Column(db.DateTime, default=datetime.utcnow)
    completed_at = db.Column(db.DateTime, nullable=True)
    success = db.Column(db.Boolean, nullable=True)
    error_message = db.Column(db.Text, nullable=True)
```

Log job execution:

```python
from contextlib import contextmanager


@contextmanager
def log_job_execution(job_id: str, job_type: str):
    """Context manager to log job execution."""
    execution = JobExecution(job_id=job_id, job_type=job_type)
    db.session.add(execution)
    db.session.commit()

    try:
        yield execution
        execution.completed_at = datetime.utcnow()
        execution.success = True
        db.session.commit()
    except Exception as e:
        execution.completed_at = datetime.utcnow()
        execution.success = False
        execution.error_message = str(e)
        db.session.commit()
        raise
```
## Testing Strategy

### Unit Tests

```python
import unittest


class TestBackgroundJobs(unittest.TestCase):

    def test_auto_complete_exchanges(self):
        """Test auto-complete logic."""
        # Setup: Create an exchange in the matched state with a past exchange date
        exchange = create_test_exchange(
            state=ExchangeState.MATCHED,
            exchange_date=datetime.utcnow() - timedelta(days=1)
        )

        # Execute
        auto_complete_exchanges()

        # Assert
        exchange = Exchange.query.get(exchange.id)
        self.assertEqual(exchange.state, ExchangeState.COMPLETED)
        self.assertIsNotNone(exchange.completed_at)

    def test_token_cleanup(self):
        """Test token cleanup removes expired tokens."""
        # Setup: Create an expired token
        token = create_test_magic_token(
            expires_at=datetime.utcnow() - timedelta(hours=1)
        )

        # Execute
        cleanup_expired_tokens()

        # Assert
        token = MagicToken.query.get(token.id)
        self.assertIsNone(token)

    def test_reminder_scheduling(self):
        """Test reminder jobs are scheduled correctly."""
        from flask import current_app

        exchange = create_matched_exchange()

        # Execute
        schedule_reminder_emails(exchange.id, current_app.scheduler)

        # Assert
        jobs = current_app.scheduler.get_jobs()
        reminder_jobs = [j for j in jobs if j.id.startswith(f"reminder_exchange_{exchange.id}")]

        # Assuming default intervals: 7, 3, 1 days
        self.assertEqual(len(reminder_jobs), 3)
```
### Integration Tests

```python
class TestSchedulerIntegration(TestCase):

    def test_scheduler_initialization(self):
        """Test that the scheduler starts and jobs are registered."""
        from flask import current_app

        scheduler = current_app.scheduler

        self.assertTrue(scheduler.running)

        jobs = scheduler.get_jobs()
        job_ids = {j.id for j in jobs}

        expected_jobs = {
            'auto_complete_exchanges',
            'purge_old_exchanges',
            'cleanup_expired_tokens',
            'cleanup_expired_sessions',
            'cleanup_expired_rate_limits'
        }

        self.assertTrue(expected_jobs.issubset(job_ids))
```
### Manual Testing

For development, trigger jobs manually:

```python
# In a Flask shell or debug endpoint
from jobs import auto_complete_exchanges

auto_complete_exchanges()  # Run job immediately
```
## Performance Considerations

### Job Execution Time

Expected execution times:

| Job | Frequency | Expected Time | Notes |
|-----|-----------|---------------|-------|
| Reminder Emails | Dynamic | 1-30 seconds | Depends on participant count |
| Auto-Complete | Daily | <1 second | Few exchanges per day |
| Data Purge | Daily | 1-10 seconds | Rare (30-day retention) |
| Token Cleanup | Hourly | <1 second | Small dataset |
| Session Cleanup | Daily | <1 second | Small dataset |
| Rate Limit Cleanup | Daily | <1 second | Small dataset |
### Concurrency

**Max Workers**: 4 threads
- Sufficient for low-volume tasks
- Prevents resource contention
- Most jobs run sequentially anyway

**Job Isolation**:
- Each job operates independently
- No shared state between jobs
- Database transactions ensure consistency

### Resource Usage

**Memory**: ~10-50 MB for the scheduler plus job execution
**CPU**: Minimal (mostly I/O bound: database, email API)
**Network**: Email sending (Resend API calls)
## Timezone Handling

### Challenge

Exchanges have individual timezones, but the scheduler runs in UTC.

### Solution

1. **Store all datetimes in UTC** in the database
2. **Convert to the exchange timezone** when scheduling jobs
3. **APScheduler runs in UTC** (consistent base)

**Example**:

```python
# Exchange timezone: America/New_York (UTC-5)
# Exchange date: 2025-12-25 18:00:00 EST

exchange_tz = pytz.timezone("America/New_York")
exchange_datetime_local = exchange_tz.localize(datetime(2025, 12, 25, 18, 0, 0))
exchange_datetime_utc = exchange_datetime_local.astimezone(pytz.utc)

# Schedule the reminder 7 days before (in UTC)
reminder_datetime_utc = exchange_datetime_utc - timedelta(days=7)

scheduler.add_job(
    func=send_reminder_for_exchange,
    trigger='date',
    run_date=reminder_datetime_utc,  # Scheduled in UTC
    ...
)
```
## Security Considerations

### Job Execution Context

- Jobs run with full database access (no user session)
- No authentication is required for job functions
- Jobs must validate data before operations

### Preventing Abuse

- Jobs are internal only (not exposed via the API)
- No user-controlled job scheduling
- Admins cannot arbitrarily schedule jobs (only through defined workflows)

### Data Safety

- All database operations are wrapped in transactions
- Rollback on error prevents partial updates
- Deletion jobs log before purging
## Deployment Considerations

### Container Restarts

- APScheduler persists jobs to the database
- On restart, the scheduler resumes scheduled jobs
- Missed jobs execute if within `misfire_grace_time`

### Scaling

**Single Instance**:
- The current design assumes a single application instance
- SQLite doesn't support multi-instance locking

**Future Multi-Instance**:
- Would require an external job store (Redis, PostgreSQL)
- Distributed locking to prevent duplicate execution

### Health Checks

Monitor scheduler health:

```python
def scheduler_health_check():
    """Check if the scheduler is running."""
    from flask import current_app

    scheduler = current_app.scheduler

    if not scheduler or not scheduler.running:
        return {"status": "unhealthy", "reason": "Scheduler not running"}

    return {"status": "healthy", "jobs": len(scheduler.get_jobs())}
```
## Future Enhancements

1. **Job Retry Policy**: Automatic retry with exponential backoff
2. **Job Dashboard**: Admin UI to view scheduled jobs and execution history
3. **Manual Job Triggers**: Allow admins to manually trigger data purge or reminders
4. **Job Metrics**: Track execution time, success rate, and failure patterns
5. **Distributed Scheduler**: Support for multi-instance deployments
6. **Webhook Jobs**: Trigger external webhooks on events

## References

- [APScheduler Documentation](https://apscheduler.readthedocs.io/)
- [APScheduler User Guide](https://apscheduler.readthedocs.io/en/stable/userguide.html)
- [Flask APScheduler Integration](https://github.com/viniciuschiele/flask-apscheduler)
- [Timezone Handling in Python](https://pytz.sourceforge.net/)
- [Data Model Specification](../data-model.md)
- [Notifications Component Design](./notifications.md)