Following design in ADR-020, implementation guidance, and quick reference. Phase 1: Migration System Core (starpunk/migrations.py) - Create migration runner with fresh database detection - Implement is_schema_current() heuristic for fresh DB detection - Add helper functions (table_exists, column_exists, index_exists) - Complete error handling with MigrationError exception - 315 lines of production code Phase 2: Database Integration (starpunk/database.py) - Modify init_db() to call run_migrations() - Add logger parameter handling - 5 lines changed for integration Phase 3: Comprehensive Testing (tests/test_migrations.py) - 26 tests covering all scenarios (100% pass rate) - Tests for fresh DB, legacy DB, helpers, error handling - Integration test with actual migration file - 560 lines of test code Phase 4: Version and Documentation - Bump version to 0.9.0 (MINOR increment per versioning strategy) - Update CHANGELOG.md with comprehensive v0.9.0 entry - Create implementation report documenting all details Features: - Fresh database detection prevents unnecessary migrations - Legacy database detection applies pending migrations automatically - Migration tracking table records all applied migrations - Idempotent execution safe for multiple runs - Fail-safe: app won't start if migrations fail - Container deployments now fully automatic Testing: - All 26 migration tests passing (100%) - Fresh database scenario verified (auto-skip) - Legacy database scenario verified (migrations applied) - Idempotent behavior confirmed Documentation: - Implementation report in docs/reports/ - CHANGELOG.md updated with v0.9.0 entry - All architecture decisions from ADR-020 implemented 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
312 lines
8.5 KiB
Python
312 lines
8.5 KiB
Python
"""
|
|
Database migration runner for StarPunk
|
|
|
|
Automatically discovers and applies pending migrations on startup.
|
|
Migrations are numbered SQL files in the migrations/ directory.
|
|
|
|
Fresh Database Detection:
|
|
- If schema_migrations table is empty AND schema is current
|
|
- Marks all migrations as applied (skip execution)
|
|
- This handles databases created with current SCHEMA_SQL
|
|
|
|
Existing Database Behavior:
|
|
- Applies only pending migrations
|
|
- Migrations already in schema_migrations are skipped
|
|
"""
|
|
|
|
import sqlite3
|
|
from pathlib import Path
|
|
import logging
|
|
|
|
|
|
class MigrationError(Exception):
|
|
"""Raised when a migration fails to apply"""
|
|
pass
|
|
|
|
|
|
def create_migrations_table(conn):
|
|
"""
|
|
Create schema_migrations tracking table if it doesn't exist
|
|
|
|
Args:
|
|
conn: SQLite connection
|
|
"""
|
|
conn.execute("""
|
|
CREATE TABLE IF NOT EXISTS schema_migrations (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
migration_name TEXT UNIQUE NOT NULL,
|
|
applied_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
|
|
)
|
|
""")
|
|
|
|
conn.execute("""
|
|
CREATE INDEX IF NOT EXISTS idx_schema_migrations_name
|
|
ON schema_migrations(migration_name)
|
|
""")
|
|
|
|
conn.commit()
|
|
|
|
|
|
def is_schema_current(conn):
|
|
"""
|
|
Check if database schema is current (matches SCHEMA_SQL)
|
|
|
|
Uses heuristic: Check for presence of latest schema features
|
|
Currently checks for code_verifier column in auth_state table
|
|
|
|
Args:
|
|
conn: SQLite connection
|
|
|
|
Returns:
|
|
bool: True if schema appears current, False if legacy
|
|
"""
|
|
try:
|
|
cursor = conn.execute("PRAGMA table_info(auth_state)")
|
|
columns = [row[1] for row in cursor.fetchall()]
|
|
return 'code_verifier' in columns
|
|
except sqlite3.OperationalError:
|
|
# Table doesn't exist - definitely not current
|
|
return False
|
|
|
|
|
|
def table_exists(conn, table_name):
|
|
"""
|
|
Check if table exists in database
|
|
|
|
Args:
|
|
conn: SQLite connection
|
|
table_name: Name of table to check
|
|
|
|
Returns:
|
|
bool: True if table exists
|
|
"""
|
|
cursor = conn.execute(
|
|
"SELECT name FROM sqlite_master WHERE type='table' AND name=?",
|
|
(table_name,)
|
|
)
|
|
return cursor.fetchone() is not None
|
|
|
|
|
|
def column_exists(conn, table_name, column_name):
|
|
"""
|
|
Check if column exists in table
|
|
|
|
Args:
|
|
conn: SQLite connection
|
|
table_name: Name of table
|
|
column_name: Name of column
|
|
|
|
Returns:
|
|
bool: True if column exists
|
|
"""
|
|
try:
|
|
cursor = conn.execute(f"PRAGMA table_info({table_name})")
|
|
columns = [row[1] for row in cursor.fetchall()]
|
|
return column_name in columns
|
|
except sqlite3.OperationalError:
|
|
return False
|
|
|
|
|
|
def index_exists(conn, index_name):
|
|
"""
|
|
Check if index exists in database
|
|
|
|
Args:
|
|
conn: SQLite connection
|
|
index_name: Name of index to check
|
|
|
|
Returns:
|
|
bool: True if index exists
|
|
"""
|
|
cursor = conn.execute(
|
|
"SELECT name FROM sqlite_master WHERE type='index' AND name=?",
|
|
(index_name,)
|
|
)
|
|
return cursor.fetchone() is not None
|
|
|
|
|
|
def get_applied_migrations(conn):
|
|
"""
|
|
Get set of already-applied migration names
|
|
|
|
Args:
|
|
conn: SQLite connection
|
|
|
|
Returns:
|
|
set: Set of migration filenames that have been applied
|
|
"""
|
|
cursor = conn.execute(
|
|
"SELECT migration_name FROM schema_migrations ORDER BY id"
|
|
)
|
|
return set(row[0] for row in cursor.fetchall())
|
|
|
|
|
|
def discover_migration_files(migrations_dir):
|
|
"""
|
|
Discover all migration files in migrations directory
|
|
|
|
Args:
|
|
migrations_dir: Path to migrations directory
|
|
|
|
Returns:
|
|
list: Sorted list of (filename, full_path) tuples
|
|
"""
|
|
if not migrations_dir.exists():
|
|
return []
|
|
|
|
migration_files = []
|
|
for file_path in migrations_dir.glob("*.sql"):
|
|
migration_files.append((file_path.name, file_path))
|
|
|
|
# Sort by filename (numeric prefix ensures correct order)
|
|
migration_files.sort(key=lambda x: x[0])
|
|
|
|
return migration_files
|
|
|
|
|
|
def apply_migration(conn, migration_name, migration_path, logger=None):
|
|
"""
|
|
Apply a single migration file
|
|
|
|
Args:
|
|
conn: SQLite connection
|
|
migration_name: Filename of migration
|
|
migration_path: Full path to migration file
|
|
logger: Optional logger for output
|
|
|
|
Raises:
|
|
MigrationError: If migration fails to apply
|
|
"""
|
|
try:
|
|
# Read migration SQL
|
|
migration_sql = migration_path.read_text()
|
|
|
|
if logger:
|
|
logger.debug(f"Applying migration: {migration_name}")
|
|
|
|
# Execute migration in transaction
|
|
conn.execute("BEGIN")
|
|
conn.executescript(migration_sql)
|
|
|
|
# Record migration as applied
|
|
conn.execute(
|
|
"INSERT INTO schema_migrations (migration_name) VALUES (?)",
|
|
(migration_name,)
|
|
)
|
|
|
|
conn.commit()
|
|
|
|
if logger:
|
|
logger.info(f"Applied migration: {migration_name}")
|
|
|
|
except Exception as e:
|
|
conn.rollback()
|
|
error_msg = f"Migration {migration_name} failed: {e}"
|
|
if logger:
|
|
logger.error(error_msg)
|
|
raise MigrationError(error_msg)
|
|
|
|
|
|
def run_migrations(db_path, logger=None):
|
|
"""
|
|
Run all pending database migrations
|
|
|
|
Called automatically during database initialization.
|
|
Discovers migration files, checks which have been applied,
|
|
and applies any pending migrations in order.
|
|
|
|
Fresh Database Behavior:
|
|
- If schema_migrations table is empty AND schema is current
|
|
- Marks all migrations as applied (skip execution)
|
|
- This handles databases created with current SCHEMA_SQL
|
|
|
|
Existing Database Behavior:
|
|
- Applies only pending migrations
|
|
- Migrations already in schema_migrations are skipped
|
|
|
|
Args:
|
|
db_path: Path to SQLite database file
|
|
logger: Optional logger for output
|
|
|
|
Raises:
|
|
MigrationError: If any migration fails to apply
|
|
"""
|
|
if logger is None:
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Determine migrations directory
|
|
# Assumes migrations/ is in project root, sibling to starpunk/
|
|
migrations_dir = Path(__file__).parent.parent / "migrations"
|
|
|
|
if not migrations_dir.exists():
|
|
logger.warning(f"Migrations directory not found: {migrations_dir}")
|
|
return
|
|
|
|
# Connect to database
|
|
conn = sqlite3.connect(db_path)
|
|
|
|
try:
|
|
# Ensure migrations tracking table exists
|
|
create_migrations_table(conn)
|
|
|
|
# Check if this is a fresh database with current schema
|
|
cursor = conn.execute("SELECT COUNT(*) FROM schema_migrations")
|
|
migration_count = cursor.fetchone()[0]
|
|
|
|
# Discover migration files
|
|
migration_files = discover_migration_files(migrations_dir)
|
|
|
|
if not migration_files:
|
|
logger.info("No migration files found")
|
|
return
|
|
|
|
# Fresh database detection
|
|
if migration_count == 0:
|
|
if is_schema_current(conn):
|
|
# Schema is current - mark all migrations as applied
|
|
for migration_name, _ in migration_files:
|
|
conn.execute(
|
|
"INSERT INTO schema_migrations (migration_name) VALUES (?)",
|
|
(migration_name,)
|
|
)
|
|
conn.commit()
|
|
logger.info(
|
|
f"Fresh database detected: marked {len(migration_files)} "
|
|
f"migrations as applied (schema already current)"
|
|
)
|
|
return
|
|
else:
|
|
logger.info("Legacy database detected: applying all migrations")
|
|
|
|
# Get already-applied migrations
|
|
applied = get_applied_migrations(conn)
|
|
|
|
# Apply pending migrations
|
|
pending_count = 0
|
|
for migration_name, migration_path in migration_files:
|
|
if migration_name not in applied:
|
|
apply_migration(conn, migration_name, migration_path, logger)
|
|
pending_count += 1
|
|
|
|
# Summary
|
|
total_count = len(migration_files)
|
|
if pending_count > 0:
|
|
logger.info(
|
|
f"Migrations complete: {pending_count} applied, "
|
|
f"{total_count} total"
|
|
)
|
|
else:
|
|
logger.info(f"All migrations up to date ({total_count} total)")
|
|
|
|
except MigrationError:
|
|
# Re-raise migration errors (already logged)
|
|
raise
|
|
|
|
except Exception as e:
|
|
error_msg = f"Migration system error: {e}"
|
|
logger.error(error_msg)
|
|
raise MigrationError(error_msg)
|
|
|
|
finally:
|
|
conn.close()
|