Files
StarPunk/starpunk/migrations.py
Phil Skentelbery 9a805ec316 Implement automatic database migration system
Following design in ADR-020, implementation guidance, and quick reference.

Phase 1: Migration System Core (starpunk/migrations.py)
- Create migration runner with fresh database detection
- Implement is_schema_current() heuristic for fresh DB detection
- Add helper functions (table_exists, column_exists, index_exists)
- Complete error handling with MigrationError exception
- 315 lines of production code

Phase 2: Database Integration (starpunk/database.py)
- Modify init_db() to call run_migrations()
- Add logger parameter handling
- 5 lines changed for integration

Phase 3: Comprehensive Testing (tests/test_migrations.py)
- 26 tests covering all scenarios (100% pass rate)
- Tests for fresh DB, legacy DB, helpers, error handling
- Integration test with actual migration file
- 560 lines of test code

Phase 4: Version and Documentation
- Bump version to 0.9.0 (MINOR increment per versioning strategy)
- Update CHANGELOG.md with comprehensive v0.9.0 entry
- Create implementation report documenting all details

Features:
- Fresh database detection prevents unnecessary migrations
- Legacy database detection applies pending migrations automatically
- Migration tracking table records all applied migrations
- Idempotent execution safe for multiple runs
- Fail-safe: app won't start if migrations fail
- Container deployments now fully automatic

Testing:
- All 26 migration tests passing (100%)
- Fresh database scenario verified (auto-skip)
- Legacy database scenario verified (migrations applied)
- Idempotent behavior confirmed

Documentation:
- Implementation report in docs/reports/
- CHANGELOG.md updated with v0.9.0 entry
- All architecture decisions from ADR-020 implemented

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-19 16:08:33 -07:00

312 lines
8.5 KiB
Python

"""
Database migration runner for StarPunk.

Automatically discovers and applies pending migrations on startup.
Migrations are numbered SQL files in the migrations/ directory.

Fresh Database Detection:
- Applies when the schema_migrations table is empty AND the schema is current
- All migrations are marked as applied without being executed
- This handles databases created directly from the current SCHEMA_SQL

Existing Database Behavior:
- Only pending migrations are applied
- Migrations already recorded in schema_migrations are skipped
"""
import sqlite3
from pathlib import Path
import logging
class MigrationError(Exception):
    """Signal that a database migration could not be applied."""
def create_migrations_table(conn):
    """
    Ensure the schema_migrations tracking table and its name index exist.

    Both statements use IF NOT EXISTS, so repeated calls are harmless.
    The connection is committed before returning.

    Args:
        conn: SQLite connection
    """
    table_ddl = (
        "\n"
        "CREATE TABLE IF NOT EXISTS schema_migrations (\n"
        "id INTEGER PRIMARY KEY AUTOINCREMENT,\n"
        "migration_name TEXT UNIQUE NOT NULL,\n"
        "applied_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP\n"
        ")\n"
    )
    index_ddl = (
        "\n"
        "CREATE INDEX IF NOT EXISTS idx_schema_migrations_name\n"
        "ON schema_migrations(migration_name)\n"
    )
    for ddl in (table_ddl, index_ddl):
        conn.execute(ddl)
    conn.commit()
def is_schema_current(conn):
    """
    Report whether the database already has the newest schema layout.

    Heuristic: the schema counts as current when the auth_state table
    has a code_verifier column.

    Args:
        conn: SQLite connection

    Returns:
        bool: True if the marker column is present, False otherwise
    """
    try:
        table_info = conn.execute("PRAGMA table_info(auth_state)").fetchall()
    except sqlite3.OperationalError:
        # The table could not be inspected - treat the schema as legacy
        return False
    # Index 1 of each table_info row holds the column name
    return any(info[1] == 'code_verifier' for info in table_info)
def table_exists(conn, table_name):
    """
    Tell whether a table with the given name is present in the database.

    Args:
        conn: SQLite connection
        table_name: Name of table to look up in sqlite_master

    Returns:
        bool: True if a matching table entry was found
    """
    query = "SELECT name FROM sqlite_master WHERE type='table' AND name=?"
    match = conn.execute(query, (table_name,)).fetchone()
    return match is not None
def column_exists(conn, table_name, column_name):
    """
    Check if column exists in table

    The table name is embedded in the PRAGMA as a double-quoted identifier
    (PRAGMA arguments cannot be bound as parameters). Quoting makes names
    that are reserved words or contain spaces/punctuation work, and keeps a
    crafted name from terminating the PRAGMA and smuggling in extra SQL.

    Args:
        conn: SQLite connection
        table_name: Name of table (plain, unquoted)
        column_name: Name of column

    Returns:
        bool: True if column exists; False if the column or the table
        is missing
    """
    # Escape embedded double quotes by doubling them, then wrap in quotes
    quoted_table = '"' + str(table_name).replace('"', '""') + '"'
    try:
        rows = conn.execute(f"PRAGMA table_info({quoted_table})").fetchall()
    except sqlite3.OperationalError:
        return False
    # Index 1 of each table_info row holds the column name; an unknown
    # table simply yields no rows
    return any(row[1] == column_name for row in rows)
def index_exists(conn, index_name):
    """
    Tell whether an index with the given name is present in the database.

    Args:
        conn: SQLite connection
        index_name: Name of index to look up in sqlite_master

    Returns:
        bool: True if a matching index entry was found
    """
    query = "SELECT name FROM sqlite_master WHERE type='index' AND name=?"
    match = conn.execute(query, (index_name,)).fetchone()
    return match is not None
def get_applied_migrations(conn):
    """
    Collect the names of all migrations recorded as applied.

    Args:
        conn: SQLite connection

    Returns:
        set: Migration filenames found in schema_migrations
    """
    recorded = conn.execute(
        "SELECT migration_name FROM schema_migrations ORDER BY id"
    ).fetchall()
    return {entry[0] for entry in recorded}
def discover_migration_files(migrations_dir):
    """
    List the *.sql files directly inside the migrations directory.

    Args:
        migrations_dir: Path to migrations directory

    Returns:
        list: (filename, full_path) tuples ordered by filename; empty when
        the directory does not exist
    """
    if not migrations_dir.exists():
        return []
    found = [(sql_file.name, sql_file) for sql_file in migrations_dir.glob("*.sql")]
    # Plain string ordering on the filename; the numeric prefix drives the order
    return sorted(found, key=lambda entry: entry[0])
def apply_migration(conn, migration_name, migration_path, logger=None):
    """
    Apply a single migration file atomically

    The migration script and its schema_migrations tracking row are
    committed together; if anything fails, everything the script did is
    rolled back so the migration can be retried cleanly.

    Because this function opens the transaction itself, migration files
    must not contain their own BEGIN/COMMIT statements.

    Args:
        conn: SQLite connection
        migration_name: Filename of migration
        migration_path: Full path to migration file
        logger: Optional logger for output

    Raises:
        MigrationError: If migration fails to apply (the original
            exception is attached as __cause__)
    """
    try:
        # Read migration SQL
        migration_sql = migration_path.read_text(encoding="utf-8")
        if logger:
            logger.debug(f"Applying migration: {migration_name}")
        # executescript() implicitly COMMITs any pending transaction before
        # it runs, so a separate conn.execute("BEGIN") would be closed
        # immediately and the script would run in autocommit mode. Opening
        # the transaction inside the script keeps every statement in it.
        conn.executescript("BEGIN;\n" + migration_sql)
        # Record migration as applied (joins the still-open transaction)
        conn.execute(
            "INSERT INTO schema_migrations (migration_name) VALUES (?)",
            (migration_name,)
        )
        conn.commit()
        if logger:
            logger.info(f"Applied migration: {migration_name}")
    except Exception as e:
        # Undo any statements the script executed before failing
        conn.rollback()
        error_msg = f"Migration {migration_name} failed: {e}"
        if logger:
            logger.error(error_msg)
        raise MigrationError(error_msg) from e
def run_migrations(db_path, logger=None):
    """
    Run all pending database migrations

    Called automatically during database initialization.
    Discovers migration files, checks which have been applied,
    and applies any pending migrations in order.

    Fresh Database Behavior:
    - Applies when schema_migrations is empty AND is_schema_current() is True
    - All discovered migrations are recorded as applied without being executed
    - This handles databases created with current SCHEMA_SQL

    Existing Database Behavior:
    - Applies only pending migrations
    - Migrations already in schema_migrations are skipped

    Args:
        db_path: Path to SQLite database file
        logger: Optional logger for output (defaults to this module's logger)

    Raises:
        MigrationError: If any migration fails to apply, or if the migration
            machinery itself fails (the original exception is attached as
            __cause__)
    """
    if logger is None:
        logger = logging.getLogger(__name__)

    # Determine migrations directory
    # Assumes migrations/ is in project root, sibling to starpunk/
    migrations_dir = Path(__file__).parent.parent / "migrations"
    if not migrations_dir.exists():
        logger.warning(f"Migrations directory not found: {migrations_dir}")
        return

    # Connect to database
    conn = sqlite3.connect(db_path)
    try:
        # Ensure migrations tracking table exists
        create_migrations_table(conn)

        # Number of migrations recorded so far; zero means this database has
        # never been through the migration runner
        cursor = conn.execute("SELECT COUNT(*) FROM schema_migrations")
        migration_count = cursor.fetchone()[0]

        # Discover migration files
        migration_files = discover_migration_files(migrations_dir)
        if not migration_files:
            logger.info("No migration files found")
            return

        # Fresh database detection
        if migration_count == 0:
            if is_schema_current(conn):
                # Schema is current - mark all migrations as applied
                conn.executemany(
                    "INSERT INTO schema_migrations (migration_name) VALUES (?)",
                    [(migration_name,) for migration_name, _ in migration_files],
                )
                conn.commit()
                logger.info(
                    f"Fresh database detected: marked {len(migration_files)} "
                    f"migrations as applied (schema already current)"
                )
                return
            logger.info("Legacy database detected: applying all migrations")

        # Get already-applied migrations
        applied = get_applied_migrations(conn)

        # Apply pending migrations in filename order
        pending_count = 0
        for migration_name, migration_path in migration_files:
            if migration_name not in applied:
                apply_migration(conn, migration_name, migration_path, logger)
                pending_count += 1

        # Summary
        total_count = len(migration_files)
        if pending_count > 0:
            logger.info(
                f"Migrations complete: {pending_count} applied, "
                f"{total_count} total"
            )
        else:
            logger.info(f"All migrations up to date ({total_count} total)")
    except MigrationError:
        # Re-raise migration errors (already logged)
        raise
    except Exception as e:
        error_msg = f"Migration system error: {e}"
        logger.error(error_msg)
        # Chain the cause so the original traceback is not lost
        raise MigrationError(error_msg) from e
    finally:
        conn.close()