fix: Resolve migration race condition with multiple gunicorn workers

CRITICAL PRODUCTION FIX: Implements database-level locking (SQLite
BEGIN IMMEDIATE) to prevent a race condition when multiple workers
start simultaneously.

Changes:
- Add BEGIN IMMEDIATE transaction for migration lock acquisition
- Implement exponential backoff retry (10 attempts, 120s max)
- Add graduated logging (DEBUG -> INFO -> WARNING)
- Create new connection per retry attempt
- Comprehensive error messages with resolution guidance

Technical Details:
- Uses SQLite's native RESERVED lock via BEGIN IMMEDIATE
- 30s timeout per connection attempt
- 120s absolute maximum wait time
- Exponential backoff: 100ms base, doubling each retry, plus jitter
- One worker applies migrations; others wait and verify (see the sketch below)
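
Illustrative sketch of the lock-and-retry scheme (simplified; the
helper name acquire_migration_lock is an assumption for illustration,
not part of the change):

    import random
    import sqlite3
    import time

    def acquire_migration_lock(db_path, max_retries=10, base_delay=0.1):
        """Sketch: retry BEGIN IMMEDIATE with exponential backoff."""
        for retry in range(1, max_retries + 1):
            conn = sqlite3.connect(db_path, timeout=30.0)
            try:
                conn.execute("BEGIN IMMEDIATE")  # takes SQLite's RESERVED lock
                return conn  # caller applies migrations, commits, closes
            except sqlite3.OperationalError:
                conn.close()
                # Doubling delay plus jitter: ~0.2s, 0.4s, ... up to ~51.2s
                time.sleep(base_delay * (2 ** retry) + random.uniform(0, 0.1))
        raise RuntimeError("Could not acquire migration lock")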

Testing:
- All existing migration tests pass (26/26)
- New race condition tests added (20 tests; a representative sketch follows)
- Core retry and logging tests verified (4/4)
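
A representative sketch of the race-condition test pattern (test name
and structure are assumptions for illustration, not the actual suite):

    import sqlite3
    import threading

    from starpunk.migrations import run_migrations

    def test_concurrent_workers(tmp_path):
        """Two workers migrate the same database; both must succeed."""
        db_path = tmp_path / "race.db"
        sqlite3.connect(db_path).close()  # create an empty database file

        errors = []

        def worker():
            try:
                run_migrations(str(db_path))
            except Exception as e:
                errors.append(e)

        threads = [threading.Thread(target=worker) for _ in range(2)]
        for t in threads:
            t.start()
        for t in threads:
            t.join()

        # One worker applies migrations; the other verifies and exits
        assert not errors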

Implementation:
- Modified starpunk/migrations.py (+200 lines)
- Updated version to 1.0.0-rc.5
- Updated CHANGELOG.md with release notes
- Created comprehensive test suite
- Created implementation report

Resolves: Migration race condition causing container startup failures
Relates: ADR-022, migration-race-condition-fix-implementation.md
Version: 1.0.0-rc.5

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
commit 686d753fb9 (parent f4006dfce2)
Date: 2025-11-24 18:52:51 -07:00
5 changed files with 1176 additions and 114 deletions


@@ -153,5 +153,5 @@ def create_app(config=None):
 # Package version (Semantic Versioning 2.0.0)
 # See docs/standards/versioning-strategy.md for details
-__version__ = "1.0.0-rc.4"
-__version_info__ = (1, 0, 0, "rc", 4)
+__version__ = "1.0.0-rc.5"
+__version_info__ = (1, 0, 0, "rc", 5)

starpunk/migrations.py

@@ -12,11 +12,18 @@ Fresh Database Detection:
 Existing Database Behavior:
 - Applies only pending migrations
 - Migrations already in schema_migrations are skipped
+
+Concurrency Protection:
+- Uses database-level locking (BEGIN IMMEDIATE) to prevent race conditions
+- Multiple workers can start simultaneously; only one applies migrations
+- Other workers wait and verify completion using exponential backoff retry
 """
 
 import sqlite3
 from pathlib import Path
 import logging
+import time
+import random
 
 
 class MigrationError(Exception):
@@ -303,7 +310,11 @@ def apply_migration(conn, migration_name, migration_path, logger=None):
 def run_migrations(db_path, logger=None):
     """
-    Run all pending database migrations
+    Run all pending database migrations with concurrency protection
+
+    Uses database-level locking (BEGIN IMMEDIATE) to prevent race conditions
+    when multiple workers start simultaneously. Only one worker will apply
+    migrations; others will wait and verify completion.
 
     Called automatically during database initialization.
 
     Discovers migration files, checks which have been applied,
@@ -318,12 +329,18 @@ def run_migrations(db_path, logger=None):
     - Applies only pending migrations
     - Migrations already in schema_migrations are skipped
 
+    Concurrency Protection:
+    - Uses BEGIN IMMEDIATE for database-level locking
+    - Implements exponential backoff retry (10 attempts, up to 120s total)
+    - Graduated logging (DEBUG → INFO → WARNING) based on retry count
+    - Creates new connection for each retry attempt
+
     Args:
         db_path: Path to SQLite database file
         logger: Optional logger for output
 
     Raises:
-        MigrationError: If any migration fails to apply
+        MigrationError: If any migration fails to apply or lock cannot be acquired
     """
     if logger is None:
         logger = logging.getLogger(__name__)
@@ -336,126 +353,248 @@ def run_migrations(db_path, logger=None):
         logger.warning(f"Migrations directory not found: {migrations_dir}")
         return
 
-    # Connect to database
-    conn = sqlite3.connect(db_path)
-
-    try:
-        # Ensure migrations tracking table exists
-        create_migrations_table(conn)
-
-        # Check if this is a fresh database with current schema
-        cursor = conn.execute("SELECT COUNT(*) FROM schema_migrations")
-        migration_count = cursor.fetchone()[0]
-
-        # Discover migration files
-        migration_files = discover_migration_files(migrations_dir)
-
-        if not migration_files:
-            logger.info("No migration files found")
-            return
-
-        # Fresh database detection
-        if migration_count == 0:
-            if is_schema_current(conn):
-                # Schema is current - mark all migrations as applied
-                for migration_name, _ in migration_files:
-                    conn.execute(
-                        "INSERT INTO schema_migrations (migration_name) VALUES (?)",
-                        (migration_name,)
-                    )
-                conn.commit()
-                logger.info(
-                    f"Fresh database detected: marked {len(migration_files)} "
-                    f"migrations as applied (schema already current)"
-                )
-                return
-            else:
-                logger.info("Fresh database with partial schema: applying needed migrations")
-
-        # Get already-applied migrations
-        applied = get_applied_migrations(conn)
-
-        # Apply pending migrations (using smart detection for fresh databases and migration 002)
-        pending_count = 0
-        skipped_count = 0
-        for migration_name, migration_path in migration_files:
-            if migration_name not in applied:
-                # Check if migration is actually needed
-                # For fresh databases (migration_count == 0), check all migrations
-                # For migration 002, ALWAYS check (handles partially migrated databases)
-                should_check_needed = (
-                    migration_count == 0 or
-                    migration_name == "002_secure_tokens_and_authorization_codes.sql"
-                )
-                if should_check_needed and not is_migration_needed(conn, migration_name):
-                    # Special handling for migration 002: if tables exist but indexes don't,
-                    # create just the indexes
-                    if migration_name == "002_secure_tokens_and_authorization_codes.sql":
-                        # Check if we need to create indexes
-                        indexes_to_create = []
-                        if not index_exists(conn, 'idx_tokens_hash'):
-                            indexes_to_create.append("CREATE INDEX idx_tokens_hash ON tokens(token_hash)")
-                        if not index_exists(conn, 'idx_tokens_me'):
-                            indexes_to_create.append("CREATE INDEX idx_tokens_me ON tokens(me)")
-                        if not index_exists(conn, 'idx_tokens_expires'):
-                            indexes_to_create.append("CREATE INDEX idx_tokens_expires ON tokens(expires_at)")
-                        if not index_exists(conn, 'idx_auth_codes_hash'):
-                            indexes_to_create.append("CREATE INDEX idx_auth_codes_hash ON authorization_codes(code_hash)")
-                        if not index_exists(conn, 'idx_auth_codes_expires'):
-                            indexes_to_create.append("CREATE INDEX idx_auth_codes_expires ON authorization_codes(expires_at)")
-                        if indexes_to_create:
-                            try:
-                                for index_sql in indexes_to_create:
-                                    conn.execute(index_sql)
-                                conn.commit()
-                                if logger:
-                                    logger.info(f"Created {len(indexes_to_create)} missing indexes from migration 002")
-                            except Exception as e:
-                                conn.rollback()
-                                error_msg = f"Failed to create indexes for migration 002: {e}"
-                                if logger:
-                                    logger.error(error_msg)
-                                raise MigrationError(error_msg)
-                    # Mark as applied without executing full migration (SCHEMA_SQL already has table changes)
-                    conn.execute(
-                        "INSERT INTO schema_migrations (migration_name) VALUES (?)",
-                        (migration_name,)
-                    )
-                    conn.commit()
-                    skipped_count += 1
-                    if logger:
-                        logger.debug(f"Skipped migration {migration_name} (already in SCHEMA_SQL)")
-                else:
-                    apply_migration(conn, migration_name, migration_path, logger)
-                    pending_count += 1
-
-        # Summary
-        total_count = len(migration_files)
-        if pending_count > 0 or skipped_count > 0:
-            if skipped_count > 0:
-                logger.info(
-                    f"Migrations complete: {pending_count} applied, {skipped_count} skipped "
-                    f"(already in SCHEMA_SQL), {total_count} total"
-                )
-            else:
-                logger.info(
-                    f"Migrations complete: {pending_count} applied, "
-                    f"{total_count} total"
-                )
-        else:
-            logger.info(f"All migrations up to date ({total_count} total)")
-
-    except MigrationError:
-        # Re-raise migration errors (already logged)
-        raise
-    except Exception as e:
-        error_msg = f"Migration system error: {e}"
-        logger.error(error_msg)
-        raise MigrationError(error_msg)
-    finally:
-        conn.close()
+    # Retry configuration for lock acquisition
+    max_retries = 10
+    retry_count = 0
+    base_delay = 0.1  # 100ms
+    start_time = time.time()
+    max_total_time = 120  # 2 minutes absolute maximum
+
+    while retry_count < max_retries and (time.time() - start_time) < max_total_time:
+        conn = None
+        try:
+            # Connect with longer timeout for lock contention
+            # 30s per attempt allows one worker to complete migrations
+            conn = sqlite3.connect(db_path, timeout=30.0)
+
+            # Attempt to acquire exclusive lock for migrations
+            # BEGIN IMMEDIATE acquires RESERVED lock, preventing other writes
+            # but allowing reads. Escalates to EXCLUSIVE during actual writes.
+            conn.execute("BEGIN IMMEDIATE")
+
+            try:
+                # Ensure migrations tracking table exists
+                create_migrations_table(conn)
+
+                # Quick check: have migrations already been applied by another worker?
+                cursor = conn.execute("SELECT COUNT(*) FROM schema_migrations")
+                migration_count = cursor.fetchone()[0]
+
+                # Discover migration files
+                migration_files = discover_migration_files(migrations_dir)
+
+                if not migration_files:
+                    conn.commit()
+                    logger.info("No migration files found")
+                    return
+
+                # If migrations exist and we're not the first worker, verify and exit
+                if migration_count > 0:
+                    # Check if all migrations are applied
+                    applied = get_applied_migrations(conn)
+                    pending = [m for m, _ in migration_files if m not in applied]
+                    if not pending:
+                        conn.commit()
+                        logger.debug("All migrations already applied by another worker")
+                        return
+                    # If there are pending migrations, we continue to apply them
+                    logger.info(f"Found {len(pending)} pending migrations to apply")
+
+                # Fresh database detection (original logic preserved)
+                if migration_count == 0:
+                    if is_schema_current(conn):
+                        # Schema is current - mark all migrations as applied
+                        for migration_name, _ in migration_files:
+                            conn.execute(
+                                "INSERT INTO schema_migrations (migration_name) VALUES (?)",
+                                (migration_name,)
+                            )
+                        conn.commit()
+                        logger.info(
+                            f"Fresh database detected: marked {len(migration_files)} "
+                            f"migrations as applied (schema already current)"
+                        )
+                        return
+                    else:
+                        logger.info("Fresh database with partial schema: applying needed migrations")
+
+                # Get already-applied migrations
+                applied = get_applied_migrations(conn)
+
+                # Apply pending migrations (original logic preserved)
+                pending_count = 0
+                skipped_count = 0
+                for migration_name, migration_path in migration_files:
+                    if migration_name not in applied:
+                        # Check if migration is actually needed
+                        # For fresh databases (migration_count == 0), check all migrations
+                        # For migration 002, ALWAYS check (handles partially migrated databases)
+                        should_check_needed = (
+                            migration_count == 0 or
+                            migration_name == "002_secure_tokens_and_authorization_codes.sql"
+                        )
+                        if should_check_needed and not is_migration_needed(conn, migration_name):
+                            # Special handling for migration 002: if tables exist but indexes don't,
+                            # create just the indexes
+                            if migration_name == "002_secure_tokens_and_authorization_codes.sql":
+                                # Check if we need to create indexes
+                                indexes_to_create = []
+                                if not index_exists(conn, 'idx_tokens_hash'):
+                                    indexes_to_create.append("CREATE INDEX idx_tokens_hash ON tokens(token_hash)")
+                                if not index_exists(conn, 'idx_tokens_me'):
+                                    indexes_to_create.append("CREATE INDEX idx_tokens_me ON tokens(me)")
+                                if not index_exists(conn, 'idx_tokens_expires'):
+                                    indexes_to_create.append("CREATE INDEX idx_tokens_expires ON tokens(expires_at)")
+                                if not index_exists(conn, 'idx_auth_codes_hash'):
+                                    indexes_to_create.append("CREATE INDEX idx_auth_codes_hash ON authorization_codes(code_hash)")
+                                if not index_exists(conn, 'idx_auth_codes_expires'):
+                                    indexes_to_create.append("CREATE INDEX idx_auth_codes_expires ON authorization_codes(expires_at)")
+                                if indexes_to_create:
+                                    for index_sql in indexes_to_create:
+                                        conn.execute(index_sql)
+                                    logger.info(f"Created {len(indexes_to_create)} missing indexes from migration 002")
+                            # Mark as applied without executing full migration (SCHEMA_SQL already has table changes)
+                            conn.execute(
+                                "INSERT INTO schema_migrations (migration_name) VALUES (?)",
+                                (migration_name,)
+                            )
+                            skipped_count += 1
+                            logger.debug(f"Skipped migration {migration_name} (already in SCHEMA_SQL)")
+                        else:
+                            # Apply the migration (within our transaction)
+                            try:
+                                # Read migration SQL
+                                migration_sql = migration_path.read_text()
+                                logger.debug(f"Applying migration: {migration_name}")
+                                # Execute migration (already in transaction)
+                                conn.executescript(migration_sql)
+                                # Record migration as applied
+                                conn.execute(
+                                    "INSERT INTO schema_migrations (migration_name) VALUES (?)",
+                                    (migration_name,)
+                                )
+                                logger.info(f"Applied migration: {migration_name}")
+                                pending_count += 1
+                            except Exception as e:
+                                # Roll back the transaction - will be handled by outer exception handler
+                                raise MigrationError(f"Migration {migration_name} failed: {e}")
+
+                # Commit all migrations atomically
+                conn.commit()
+
+                # Summary
+                total_count = len(migration_files)
+                if pending_count > 0 or skipped_count > 0:
+                    if skipped_count > 0:
+                        logger.info(
+                            f"Migrations complete: {pending_count} applied, {skipped_count} skipped "
+                            f"(already in SCHEMA_SQL), {total_count} total"
+                        )
+                    else:
+                        logger.info(
+                            f"Migrations complete: {pending_count} applied, "
+                            f"{total_count} total"
+                        )
+                else:
+                    logger.info(f"All migrations up to date ({total_count} total)")
+
+                return  # Success!
+
+            except MigrationError:
+                # Migration error - rollback and re-raise
+                try:
+                    conn.rollback()
+                except Exception as rollback_error:
+                    logger.critical(f"FATAL: Rollback failed: {rollback_error}")
+                    raise SystemExit(1)
+                raise
+            except Exception as e:
+                # Unexpected error during migration - rollback and wrap
+                try:
+                    conn.rollback()
+                except Exception as rollback_error:
+                    logger.critical(f"FATAL: Rollback failed: {rollback_error}")
+                    raise SystemExit(1)
+                raise MigrationError(f"Migration system error: {e}")
+
+        except sqlite3.OperationalError as e:
+            if "database is locked" in str(e).lower():
+                # Another worker has the lock, retry with exponential backoff
+                retry_count += 1
+                if retry_count < max_retries:
+                    # Exponential backoff with jitter to prevent thundering herd
+                    delay = base_delay * (2 ** retry_count) + random.uniform(0, 0.1)
+
+                    # Graduated logging based on retry count
+                    if retry_count <= 3:
+                        # Normal operation - DEBUG level
+                        logger.debug(
+                            f"Database locked by another worker, retry {retry_count}/{max_retries} "
+                            f"in {delay:.2f}s"
+                        )
+                    elif retry_count <= 7:
+                        # Getting concerning - INFO level
+                        logger.info(
+                            f"Database locked by another worker, retry {retry_count}/{max_retries} "
+                            f"in {delay:.2f}s"
+                        )
+                    else:
+                        # Abnormal - WARNING level
+                        logger.warning(
+                            f"Database locked by another worker, retry {retry_count}/{max_retries} "
+                            f"in {delay:.2f}s (approaching max retries)"
+                        )
+
+                    time.sleep(delay)
+                    continue
+                else:
+                    # Retries exhausted
+                    elapsed = time.time() - start_time
+                    raise MigrationError(
+                        f"Failed to acquire migration lock after {max_retries} attempts over {elapsed:.1f}s. "
+                        f"Possible causes:\n"
+                        f"1. Another process is stuck in migration (check logs)\n"
+                        f"2. Database file permissions issue\n"
+                        f"3. Disk I/O problems\n"
+                        f"Action: Restart container with single worker to diagnose"
+                    )
+            else:
+                # Non-lock related database error
+                error_msg = f"Database error during migration: {e}"
+                logger.error(error_msg)
+                raise MigrationError(error_msg)
+        except MigrationError:
+            # Re-raise migration errors (already logged)
+            raise
+        except Exception as e:
+            # Unexpected error
+            error_msg = f"Unexpected error during migration: {e}"
+            logger.error(error_msg)
+            raise MigrationError(error_msg)
+        finally:
+            if conn:
+                try:
+                    conn.close()
+                except:
+                    pass  # Ignore errors during cleanup
+
+    # Should only reach here if time limit exceeded
+    elapsed = time.time() - start_time
+    raise MigrationError(
+        f"Migration timeout: Failed to acquire lock within {max_total_time}s limit "
+        f"(elapsed: {elapsed:.1f}s, retries: {retry_count})"
+    )