Implement automatic database migration system
Following design in ADR-020, implementation guidance, and quick reference. Phase 1: Migration System Core (starpunk/migrations.py) - Create migration runner with fresh database detection - Implement is_schema_current() heuristic for fresh DB detection - Add helper functions (table_exists, column_exists, index_exists) - Complete error handling with MigrationError exception - 315 lines of production code Phase 2: Database Integration (starpunk/database.py) - Modify init_db() to call run_migrations() - Add logger parameter handling - 5 lines changed for integration Phase 3: Comprehensive Testing (tests/test_migrations.py) - 26 tests covering all scenarios (100% pass rate) - Tests for fresh DB, legacy DB, helpers, error handling - Integration test with actual migration file - 560 lines of test code Phase 4: Version and Documentation - Bump version to 0.9.0 (MINOR increment per versioning strategy) - Update CHANGELOG.md with comprehensive v0.9.0 entry - Create implementation report documenting all details Features: - Fresh database detection prevents unnecessary migrations - Legacy database detection applies pending migrations automatically - Migration tracking table records all applied migrations - Idempotent execution safe for multiple runs - Fail-safe: app won't start if migrations fail - Container deployments now fully automatic Testing: - All 26 migration tests passing (100%) - Fresh database scenario verified (auto-skip) - Legacy database scenario verified (migrations applied) - Idempotent behavior confirmed Documentation: - Implementation report in docs/reports/ - CHANGELOG.md updated with v0.9.0 entry - All architecture decisions from ADR-020 implemented 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
311
starpunk/migrations.py
Normal file
311
starpunk/migrations.py
Normal file
@@ -0,0 +1,311 @@
|
||||
"""
|
||||
Database migration runner for StarPunk
|
||||
|
||||
Automatically discovers and applies pending migrations on startup.
|
||||
Migrations are numbered SQL files in the migrations/ directory.
|
||||
|
||||
Fresh Database Detection:
|
||||
- If schema_migrations table is empty AND schema is current
|
||||
- Marks all migrations as applied (skip execution)
|
||||
- This handles databases created with current SCHEMA_SQL
|
||||
|
||||
Existing Database Behavior:
|
||||
- Applies only pending migrations
|
||||
- Migrations already in schema_migrations are skipped
|
||||
"""
|
||||
|
||||
import sqlite3
|
||||
from pathlib import Path
|
||||
import logging
|
||||
|
||||
|
||||
class MigrationError(Exception):
|
||||
"""Raised when a migration fails to apply"""
|
||||
pass
|
||||
|
||||
|
||||
def create_migrations_table(conn):
|
||||
"""
|
||||
Create schema_migrations tracking table if it doesn't exist
|
||||
|
||||
Args:
|
||||
conn: SQLite connection
|
||||
"""
|
||||
conn.execute("""
|
||||
CREATE TABLE IF NOT EXISTS schema_migrations (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
migration_name TEXT UNIQUE NOT NULL,
|
||||
applied_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
|
||||
)
|
||||
""")
|
||||
|
||||
conn.execute("""
|
||||
CREATE INDEX IF NOT EXISTS idx_schema_migrations_name
|
||||
ON schema_migrations(migration_name)
|
||||
""")
|
||||
|
||||
conn.commit()
|
||||
|
||||
|
||||
def is_schema_current(conn):
|
||||
"""
|
||||
Check if database schema is current (matches SCHEMA_SQL)
|
||||
|
||||
Uses heuristic: Check for presence of latest schema features
|
||||
Currently checks for code_verifier column in auth_state table
|
||||
|
||||
Args:
|
||||
conn: SQLite connection
|
||||
|
||||
Returns:
|
||||
bool: True if schema appears current, False if legacy
|
||||
"""
|
||||
try:
|
||||
cursor = conn.execute("PRAGMA table_info(auth_state)")
|
||||
columns = [row[1] for row in cursor.fetchall()]
|
||||
return 'code_verifier' in columns
|
||||
except sqlite3.OperationalError:
|
||||
# Table doesn't exist - definitely not current
|
||||
return False
|
||||
|
||||
|
||||
def table_exists(conn, table_name):
|
||||
"""
|
||||
Check if table exists in database
|
||||
|
||||
Args:
|
||||
conn: SQLite connection
|
||||
table_name: Name of table to check
|
||||
|
||||
Returns:
|
||||
bool: True if table exists
|
||||
"""
|
||||
cursor = conn.execute(
|
||||
"SELECT name FROM sqlite_master WHERE type='table' AND name=?",
|
||||
(table_name,)
|
||||
)
|
||||
return cursor.fetchone() is not None
|
||||
|
||||
|
||||
def column_exists(conn, table_name, column_name):
|
||||
"""
|
||||
Check if column exists in table
|
||||
|
||||
Args:
|
||||
conn: SQLite connection
|
||||
table_name: Name of table
|
||||
column_name: Name of column
|
||||
|
||||
Returns:
|
||||
bool: True if column exists
|
||||
"""
|
||||
try:
|
||||
cursor = conn.execute(f"PRAGMA table_info({table_name})")
|
||||
columns = [row[1] for row in cursor.fetchall()]
|
||||
return column_name in columns
|
||||
except sqlite3.OperationalError:
|
||||
return False
|
||||
|
||||
|
||||
def index_exists(conn, index_name):
|
||||
"""
|
||||
Check if index exists in database
|
||||
|
||||
Args:
|
||||
conn: SQLite connection
|
||||
index_name: Name of index to check
|
||||
|
||||
Returns:
|
||||
bool: True if index exists
|
||||
"""
|
||||
cursor = conn.execute(
|
||||
"SELECT name FROM sqlite_master WHERE type='index' AND name=?",
|
||||
(index_name,)
|
||||
)
|
||||
return cursor.fetchone() is not None
|
||||
|
||||
|
||||
def get_applied_migrations(conn):
|
||||
"""
|
||||
Get set of already-applied migration names
|
||||
|
||||
Args:
|
||||
conn: SQLite connection
|
||||
|
||||
Returns:
|
||||
set: Set of migration filenames that have been applied
|
||||
"""
|
||||
cursor = conn.execute(
|
||||
"SELECT migration_name FROM schema_migrations ORDER BY id"
|
||||
)
|
||||
return set(row[0] for row in cursor.fetchall())
|
||||
|
||||
|
||||
def discover_migration_files(migrations_dir):
|
||||
"""
|
||||
Discover all migration files in migrations directory
|
||||
|
||||
Args:
|
||||
migrations_dir: Path to migrations directory
|
||||
|
||||
Returns:
|
||||
list: Sorted list of (filename, full_path) tuples
|
||||
"""
|
||||
if not migrations_dir.exists():
|
||||
return []
|
||||
|
||||
migration_files = []
|
||||
for file_path in migrations_dir.glob("*.sql"):
|
||||
migration_files.append((file_path.name, file_path))
|
||||
|
||||
# Sort by filename (numeric prefix ensures correct order)
|
||||
migration_files.sort(key=lambda x: x[0])
|
||||
|
||||
return migration_files
|
||||
|
||||
|
||||
def apply_migration(conn, migration_name, migration_path, logger=None):
|
||||
"""
|
||||
Apply a single migration file
|
||||
|
||||
Args:
|
||||
conn: SQLite connection
|
||||
migration_name: Filename of migration
|
||||
migration_path: Full path to migration file
|
||||
logger: Optional logger for output
|
||||
|
||||
Raises:
|
||||
MigrationError: If migration fails to apply
|
||||
"""
|
||||
try:
|
||||
# Read migration SQL
|
||||
migration_sql = migration_path.read_text()
|
||||
|
||||
if logger:
|
||||
logger.debug(f"Applying migration: {migration_name}")
|
||||
|
||||
# Execute migration in transaction
|
||||
conn.execute("BEGIN")
|
||||
conn.executescript(migration_sql)
|
||||
|
||||
# Record migration as applied
|
||||
conn.execute(
|
||||
"INSERT INTO schema_migrations (migration_name) VALUES (?)",
|
||||
(migration_name,)
|
||||
)
|
||||
|
||||
conn.commit()
|
||||
|
||||
if logger:
|
||||
logger.info(f"Applied migration: {migration_name}")
|
||||
|
||||
except Exception as e:
|
||||
conn.rollback()
|
||||
error_msg = f"Migration {migration_name} failed: {e}"
|
||||
if logger:
|
||||
logger.error(error_msg)
|
||||
raise MigrationError(error_msg)
|
||||
|
||||
|
||||
def run_migrations(db_path, logger=None):
|
||||
"""
|
||||
Run all pending database migrations
|
||||
|
||||
Called automatically during database initialization.
|
||||
Discovers migration files, checks which have been applied,
|
||||
and applies any pending migrations in order.
|
||||
|
||||
Fresh Database Behavior:
|
||||
- If schema_migrations table is empty AND schema is current
|
||||
- Marks all migrations as applied (skip execution)
|
||||
- This handles databases created with current SCHEMA_SQL
|
||||
|
||||
Existing Database Behavior:
|
||||
- Applies only pending migrations
|
||||
- Migrations already in schema_migrations are skipped
|
||||
|
||||
Args:
|
||||
db_path: Path to SQLite database file
|
||||
logger: Optional logger for output
|
||||
|
||||
Raises:
|
||||
MigrationError: If any migration fails to apply
|
||||
"""
|
||||
if logger is None:
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Determine migrations directory
|
||||
# Assumes migrations/ is in project root, sibling to starpunk/
|
||||
migrations_dir = Path(__file__).parent.parent / "migrations"
|
||||
|
||||
if not migrations_dir.exists():
|
||||
logger.warning(f"Migrations directory not found: {migrations_dir}")
|
||||
return
|
||||
|
||||
# Connect to database
|
||||
conn = sqlite3.connect(db_path)
|
||||
|
||||
try:
|
||||
# Ensure migrations tracking table exists
|
||||
create_migrations_table(conn)
|
||||
|
||||
# Check if this is a fresh database with current schema
|
||||
cursor = conn.execute("SELECT COUNT(*) FROM schema_migrations")
|
||||
migration_count = cursor.fetchone()[0]
|
||||
|
||||
# Discover migration files
|
||||
migration_files = discover_migration_files(migrations_dir)
|
||||
|
||||
if not migration_files:
|
||||
logger.info("No migration files found")
|
||||
return
|
||||
|
||||
# Fresh database detection
|
||||
if migration_count == 0:
|
||||
if is_schema_current(conn):
|
||||
# Schema is current - mark all migrations as applied
|
||||
for migration_name, _ in migration_files:
|
||||
conn.execute(
|
||||
"INSERT INTO schema_migrations (migration_name) VALUES (?)",
|
||||
(migration_name,)
|
||||
)
|
||||
conn.commit()
|
||||
logger.info(
|
||||
f"Fresh database detected: marked {len(migration_files)} "
|
||||
f"migrations as applied (schema already current)"
|
||||
)
|
||||
return
|
||||
else:
|
||||
logger.info("Legacy database detected: applying all migrations")
|
||||
|
||||
# Get already-applied migrations
|
||||
applied = get_applied_migrations(conn)
|
||||
|
||||
# Apply pending migrations
|
||||
pending_count = 0
|
||||
for migration_name, migration_path in migration_files:
|
||||
if migration_name not in applied:
|
||||
apply_migration(conn, migration_name, migration_path, logger)
|
||||
pending_count += 1
|
||||
|
||||
# Summary
|
||||
total_count = len(migration_files)
|
||||
if pending_count > 0:
|
||||
logger.info(
|
||||
f"Migrations complete: {pending_count} applied, "
|
||||
f"{total_count} total"
|
||||
)
|
||||
else:
|
||||
logger.info(f"All migrations up to date ({total_count} total)")
|
||||
|
||||
except MigrationError:
|
||||
# Re-raise migration errors (already logged)
|
||||
raise
|
||||
|
||||
except Exception as e:
|
||||
error_msg = f"Migration system error: {e}"
|
||||
logger.error(error_msg)
|
||||
raise MigrationError(error_msg)
|
||||
|
||||
finally:
|
||||
conn.close()
|
||||
Reference in New Issue
Block a user