""" Database migration runner for StarPunk Automatically discovers and applies pending migrations on startup. Migrations are numbered SQL files in the migrations/ directory. Fresh Database Detection: - If schema_migrations table is empty AND schema is current - Marks all migrations as applied (skip execution) - This handles databases created with current SCHEMA_SQL Existing Database Behavior: - Applies only pending migrations - Migrations already in schema_migrations are skipped """ import sqlite3 from pathlib import Path import logging class MigrationError(Exception): """Raised when a migration fails to apply""" pass def create_migrations_table(conn): """ Create schema_migrations tracking table if it doesn't exist Args: conn: SQLite connection """ conn.execute(""" CREATE TABLE IF NOT EXISTS schema_migrations ( id INTEGER PRIMARY KEY AUTOINCREMENT, migration_name TEXT UNIQUE NOT NULL, applied_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ) """) conn.execute(""" CREATE INDEX IF NOT EXISTS idx_schema_migrations_name ON schema_migrations(migration_name) """) conn.commit() def is_schema_current(conn): """ Check if database schema is current (matches SCHEMA_SQL + all migrations) Uses heuristic: Check for presence of latest schema features Checks for: - code_verifier column in auth_state (migration 001 or SCHEMA_SQL >= v0.8.0) - authorization_codes table (migration 002 or SCHEMA_SQL >= v1.0.0-rc.1) - token_hash column in tokens table (migration 002) - Token indexes (migration 002 only, removed from SCHEMA_SQL in v1.0.0-rc.2) Args: conn: SQLite connection Returns: bool: True if schema is fully current (all tables, columns, AND indexes exist) False if any piece is missing (legacy database needing migrations) """ try: # Check for code_verifier column in auth_state (migration 001) # This is also in SCHEMA_SQL, so we can't use it alone if not column_exists(conn, 'auth_state', 'code_verifier'): return False # Check for authorization_codes table (added in migration 002) if not table_exists(conn, 'authorization_codes'): return False # Check for token_hash column in tokens table (migration 002) if not column_exists(conn, 'tokens', 'token_hash'): return False # Check for token indexes (created by migration 002 ONLY) # These indexes were removed from SCHEMA_SQL in v1.0.0-rc.2 # to prevent conflicts when migrations run. # A database with tables/columns but no indexes means: # - SCHEMA_SQL was run (creating tables/columns) # - But migration 002 hasn't run yet (no indexes) # So it's NOT fully current and needs migrations. 
        if not index_exists(conn, 'idx_tokens_hash'):
            return False
        if not index_exists(conn, 'idx_tokens_me'):
            return False
        if not index_exists(conn, 'idx_tokens_expires'):
            return False

        return True

    except sqlite3.OperationalError:
        # Schema check failed - definitely not current
        return False


def table_exists(conn, table_name):
    """
    Check if table exists in database

    Args:
        conn: SQLite connection
        table_name: Name of table to check

    Returns:
        bool: True if table exists
    """
    cursor = conn.execute(
        "SELECT name FROM sqlite_master WHERE type='table' AND name=?",
        (table_name,)
    )
    return cursor.fetchone() is not None


def column_exists(conn, table_name, column_name):
    """
    Check if column exists in table

    Args:
        conn: SQLite connection
        table_name: Name of table
        column_name: Name of column

    Returns:
        bool: True if column exists
    """
    try:
        cursor = conn.execute(f"PRAGMA table_info({table_name})")
        columns = [row[1] for row in cursor.fetchall()]
        return column_name in columns
    except sqlite3.OperationalError:
        return False


def index_exists(conn, index_name):
    """
    Check if index exists in database

    Args:
        conn: SQLite connection
        index_name: Name of index to check

    Returns:
        bool: True if index exists
    """
    cursor = conn.execute(
        "SELECT name FROM sqlite_master WHERE type='index' AND name=?",
        (index_name,)
    )
    return cursor.fetchone() is not None


def is_migration_needed(conn, migration_name):
    """
    Check if a specific migration is needed based on database state

    This is used for fresh databases where SCHEMA_SQL may have already
    included some migration features. We check the actual database state
    rather than just applying all migrations blindly.

    Args:
        conn: SQLite connection
        migration_name: Migration filename to check

    Returns:
        bool: True if migration should be applied,
              False if already applied via SCHEMA_SQL
    """
    # Migration 001: Adds code_verifier column to auth_state
    if migration_name == "001_add_code_verifier_to_auth_state.sql":
        # Check if column already exists (was added to SCHEMA_SQL in v0.8.0)
        return not column_exists(conn, 'auth_state', 'code_verifier')

    # Migration 002: Creates new tokens/authorization_codes tables with indexes
    if migration_name == "002_secure_tokens_and_authorization_codes.sql":
        # This migration drops and recreates the tokens table, so we check if:
        # 1. The new tokens table structure exists (token_hash column)
        # 2. The authorization_codes table exists
        # 3. The indexes exist
        # If tables/columns are missing, this is a truly legacy database -
        # migration needed
        if not table_exists(conn, 'authorization_codes'):
            return True
        if not column_exists(conn, 'tokens', 'token_hash'):
            return True

        # If tables exist with correct structure, check indexes.
        # If indexes are missing but tables exist, this is a fresh database from
        # SCHEMA_SQL that just needs indexes. We CANNOT run the full migration
        # (it will fail trying to CREATE TABLE). Instead, we mark it as not
        # needed and apply indexes separately.
        has_all_indexes = (
            index_exists(conn, 'idx_tokens_hash')
            and index_exists(conn, 'idx_tokens_me')
            and index_exists(conn, 'idx_tokens_expires')
            and index_exists(conn, 'idx_auth_codes_hash')
            and index_exists(conn, 'idx_auth_codes_expires')
        )

        if not has_all_indexes:
            # Tables exist but indexes missing - this is a fresh database
            # from SCHEMA_SQL. We need to create just the indexes, not run
            # the full migration. Return False (don't run migration) and
            # handle indexes separately.
            return False

        # All features exist - migration not needed
        return False

    # Unknown migration - assume it's needed
    return True


def get_applied_migrations(conn):
    """
    Get set of already-applied migration names

    Args:
        conn: SQLite connection

    Returns:
        set: Set of migration filenames that have been applied
    """
    cursor = conn.execute(
        "SELECT migration_name FROM schema_migrations ORDER BY id"
    )
    return set(row[0] for row in cursor.fetchall())


def discover_migration_files(migrations_dir):
    """
    Discover all migration files in migrations directory

    Args:
        migrations_dir: Path to migrations directory

    Returns:
        list: Sorted list of (filename, full_path) tuples
    """
    if not migrations_dir.exists():
        return []

    migration_files = []
    for file_path in migrations_dir.glob("*.sql"):
        migration_files.append((file_path.name, file_path))

    # Sort by filename (numeric prefix ensures correct order)
    migration_files.sort(key=lambda x: x[0])
    return migration_files


def apply_migration(conn, migration_name, migration_path, logger=None):
    """
    Apply a single migration file

    Args:
        conn: SQLite connection
        migration_name: Filename of migration
        migration_path: Full path to migration file
        logger: Optional logger for output

    Raises:
        MigrationError: If migration fails to apply
    """
    try:
        # Read migration SQL
        migration_sql = migration_path.read_text()

        if logger:
            logger.debug(f"Applying migration: {migration_name}")

        # Execute migration script
        # Note: sqlite3's executescript() implicitly commits any pending
        # transaction before running the script, so migration files that
        # need strict atomicity should manage their own BEGIN/COMMIT.
        conn.execute("BEGIN")
        conn.executescript(migration_sql)

        # Record migration as applied
        conn.execute(
            "INSERT INTO schema_migrations (migration_name) VALUES (?)",
            (migration_name,)
        )
        conn.commit()

        if logger:
            logger.info(f"Applied migration: {migration_name}")

    except Exception as e:
        conn.rollback()
        error_msg = f"Migration {migration_name} failed: {e}"
        if logger:
            logger.error(error_msg)
        raise MigrationError(error_msg)


def run_migrations(db_path, logger=None):
    """
    Run all pending database migrations

    Called automatically during database initialization.
    Discovers migration files, checks which have been applied,
    and applies any pending migrations in order.

    Fresh Database Behavior:
    - If schema_migrations table is empty AND schema is current
    - Marks all migrations as applied (skip execution)
    - This handles databases created with current SCHEMA_SQL

    Existing Database Behavior:
    - Applies only pending migrations
    - Migrations already in schema_migrations are skipped

    Args:
        db_path: Path to SQLite database file
        logger: Optional logger for output

    Raises:
        MigrationError: If any migration fails to apply
    """
    if logger is None:
        logger = logging.getLogger(__name__)

    # Determine migrations directory
    # Assumes migrations/ is in project root, sibling to starpunk/
    migrations_dir = Path(__file__).parent.parent / "migrations"

    if not migrations_dir.exists():
        logger.warning(f"Migrations directory not found: {migrations_dir}")
        return

    # Connect to database
    conn = sqlite3.connect(db_path)

    try:
        # Ensure migrations tracking table exists
        create_migrations_table(conn)

        # Check if this is a fresh database with current schema
        cursor = conn.execute("SELECT COUNT(*) FROM schema_migrations")
        migration_count = cursor.fetchone()[0]

        # Discover migration files
        migration_files = discover_migration_files(migrations_dir)

        if not migration_files:
            logger.info("No migration files found")
            return

        # Fresh database detection
        if migration_count == 0:
            if is_schema_current(conn):
                # Schema is current - mark all migrations as applied
                for migration_name, _ in migration_files:
                    conn.execute(
                        "INSERT INTO schema_migrations (migration_name) VALUES (?)",
                        (migration_name,)
                    )
                conn.commit()
                logger.info(
                    f"Fresh database detected: marked {len(migration_files)} "
                    f"migrations as applied (schema already current)"
                )
                return
            else:
                logger.info("Fresh database with partial schema: applying needed migrations")

        # Get already-applied migrations
        applied = get_applied_migrations(conn)

        # Apply pending migrations (using smart detection for fresh databases)
        pending_count = 0
        skipped_count = 0

        for migration_name, migration_path in migration_files:
            if migration_name not in applied:
                # For fresh databases (migration_count == 0), check if migration
                # is actually needed - some migrations may have been included
                # in SCHEMA_SQL
                if migration_count == 0 and not is_migration_needed(conn, migration_name):
                    # Special handling for migration 002: if tables exist but
                    # indexes don't, create just the indexes
                    if migration_name == "002_secure_tokens_and_authorization_codes.sql":
                        # Check if we need to create indexes
                        indexes_to_create = []
                        if not index_exists(conn, 'idx_tokens_hash'):
                            indexes_to_create.append("CREATE INDEX idx_tokens_hash ON tokens(token_hash)")
                        if not index_exists(conn, 'idx_tokens_me'):
                            indexes_to_create.append("CREATE INDEX idx_tokens_me ON tokens(me)")
                        if not index_exists(conn, 'idx_tokens_expires'):
                            indexes_to_create.append("CREATE INDEX idx_tokens_expires ON tokens(expires_at)")
                        if not index_exists(conn, 'idx_auth_codes_hash'):
                            indexes_to_create.append("CREATE INDEX idx_auth_codes_hash ON authorization_codes(code_hash)")
                        if not index_exists(conn, 'idx_auth_codes_expires'):
                            indexes_to_create.append("CREATE INDEX idx_auth_codes_expires ON authorization_codes(expires_at)")

                        if indexes_to_create:
                            try:
                                for index_sql in indexes_to_create:
                                    conn.execute(index_sql)
                                conn.commit()
                                if logger:
                                    logger.info(f"Created {len(indexes_to_create)} missing indexes from migration 002")
                            except Exception as e:
                                conn.rollback()
                                error_msg = f"Failed to create indexes for migration 002: {e}"
                                if logger:
                                    logger.error(error_msg)
                                raise MigrationError(error_msg)

                    # Mark as applied without executing full migration
                    # (SCHEMA_SQL already has table changes)
                    conn.execute(
                        "INSERT INTO schema_migrations (migration_name) VALUES (?)",
                        (migration_name,)
                    )
                    conn.commit()
                    skipped_count += 1
                    if logger:
                        logger.debug(f"Skipped migration {migration_name} (already in SCHEMA_SQL)")
                else:
                    apply_migration(conn, migration_name, migration_path, logger)
                    pending_count += 1

        # Summary
        total_count = len(migration_files)
        if pending_count > 0 or skipped_count > 0:
            if skipped_count > 0:
                logger.info(
                    f"Migrations complete: {pending_count} applied, {skipped_count} skipped "
                    f"(already in SCHEMA_SQL), {total_count} total"
                )
            else:
                logger.info(
                    f"Migrations complete: {pending_count} applied, "
                    f"{total_count} total"
                )
        else:
            logger.info(f"All migrations up to date ({total_count} total)")

    except MigrationError:
        # Re-raise migration errors (already logged)
        raise
    except Exception as e:
        error_msg = f"Migration system error: {e}"
        logger.error(error_msg)
        raise MigrationError(error_msg)
    finally:
        conn.close()
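

# ---------------------------------------------------------------------------
# Usage sketch: a minimal, illustrative way to invoke the runner by hand.
# The database path and logger name below are assumptions for demonstration
# only; the application's real startup code calls run_migrations() from its
# database initialization with its own path and logger.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    example_logger = logging.getLogger("starpunk.migrations")

    # Hypothetical database location used only for this sketch
    example_db_path = Path("data") / "starpunk.db"

    try:
        run_migrations(example_db_path, logger=example_logger)
    except MigrationError as exc:
        # Startup should abort if migrations cannot be applied
        example_logger.error(f"Migration run failed: {exc}")
        raise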