Files
StarPunk/starpunk/migrations.py
Phil Skentelbery 3b41029c75 feat: Implement secure token management for Micropub
Implements token security and management as specified in ADR-029:

Database Changes (BREAKING):
- Add secure tokens table with SHA256 hashed storage
- Add authorization_codes table for IndieAuth token exchange
- Drop old insecure tokens table (invalidates existing tokens)
- Update SCHEMA_SQL to match post-migration state

Token Management (starpunk/tokens.py):
- Generate cryptographically secure tokens
- Hash tokens with SHA256 for secure storage
- Create and verify access tokens
- Create and exchange authorization codes
- PKCE support (optional but recommended)
- Scope validation (V1: only 'create' scope)
- Token expiry and revocation support

Testing:
- Comprehensive test suite for all token operations
- Test authorization code replay protection
- Test PKCE validation
- Test parameter validation
- Test token expiry

Security:
- Tokens never stored in plain text
- Authorization codes single-use with replay protection
- Optional PKCE for enhanced security
- Proper UTC datetime handling for expiry

Related:
- ADR-029: Micropub IndieAuth Integration Strategy
- Migration 002: Secure tokens and authorization codes

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-24 11:52:09 -07:00

318 lines
8.7 KiB
Python

"""
Database migration runner for StarPunk
Automatically discovers and applies pending migrations on startup.
Migrations are numbered SQL files in the migrations/ directory.
Fresh Database Detection:
- If schema_migrations table is empty AND schema is current
- Marks all migrations as applied (skip execution)
- This handles databases created with current SCHEMA_SQL
Existing Database Behavior:
- Applies only pending migrations
- Migrations already in schema_migrations are skipped
"""
import sqlite3
from pathlib import Path
import logging
class MigrationError(Exception):
    """Signals that a database migration could not be applied."""
def create_migrations_table(conn):
    """
    Ensure the schema_migrations tracking table and its name index exist.

    Both statements use IF NOT EXISTS, so calling this repeatedly is safe.

    Args:
        conn: SQLite connection
    """
    ddl_statements = (
        """
        CREATE TABLE IF NOT EXISTS schema_migrations (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            migration_name TEXT UNIQUE NOT NULL,
            applied_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
        )
        """,
        """
        CREATE INDEX IF NOT EXISTS idx_schema_migrations_name
        ON schema_migrations(migration_name)
        """,
    )
    for statement in ddl_statements:
        conn.execute(statement)
    conn.commit()
def is_schema_current(conn):
    """
    Heuristically decide whether the database already matches SCHEMA_SQL.

    Probes for the features introduced by migration 002: the
    authorization_codes table and the token_hash column on tokens.

    Args:
        conn: SQLite connection

    Returns:
        bool: True if the schema appears current, False if legacy
    """
    try:
        # Both markers must be present for the schema to count as current.
        return (
            table_exists(conn, 'authorization_codes')
            and column_exists(conn, 'tokens', 'token_hash')
        )
    except sqlite3.OperationalError:
        # A failed probe means the schema is definitely not current.
        return False
def table_exists(conn, table_name):
    """
    Report whether a table with the given name exists in the database.

    Args:
        conn: SQLite connection
        table_name: Name of table to check

    Returns:
        bool: True if the table exists
    """
    row = conn.execute(
        "SELECT name FROM sqlite_master WHERE type='table' AND name=?",
        (table_name,),
    ).fetchone()
    return row is not None
def column_exists(conn, table_name, column_name):
    """
    Report whether a column exists in the given table.

    Args:
        conn: SQLite connection
        table_name: Name of table
        column_name: Name of column

    Returns:
        bool: True if the column exists (False if the table is missing)
    """
    try:
        # PRAGMA statements cannot take bound parameters, hence the
        # f-string; table names here come from code, not user input —
        # NOTE(review): confirm no caller passes untrusted names.
        rows = conn.execute(f"PRAGMA table_info({table_name})").fetchall()
    except sqlite3.OperationalError:
        return False
    # Column name is field index 1 in each table_info row.
    return any(row[1] == column_name for row in rows)
def index_exists(conn, index_name):
    """
    Report whether an index with the given name exists in the database.

    Args:
        conn: SQLite connection
        index_name: Name of index to check

    Returns:
        bool: True if the index exists
    """
    found = conn.execute(
        "SELECT name FROM sqlite_master WHERE type='index' AND name=?",
        (index_name,),
    ).fetchone()
    return found is not None
def get_applied_migrations(conn):
    """
    Return the names of migrations already recorded as applied.

    Args:
        conn: SQLite connection

    Returns:
        set: Migration filenames present in schema_migrations
    """
    rows = conn.execute(
        "SELECT migration_name FROM schema_migrations ORDER BY id"
    )
    return {name for (name,) in rows}
def discover_migration_files(migrations_dir):
    """
    Find all migration files (*.sql) in the migrations directory.

    Args:
        migrations_dir: Path to migrations directory

    Returns:
        list: (filename, full_path) tuples sorted by filename — the
        numeric filename prefix guarantees correct application order.
    """
    if not migrations_dir.exists():
        return []
    return sorted(
        ((path.name, path) for path in migrations_dir.glob("*.sql")),
        key=lambda pair: pair[0],
    )
def apply_migration(conn, migration_name, migration_path, logger=None):
    """
    Apply a single migration file and record it in schema_migrations.

    Atomicity note: sqlite3's executescript() implicitly COMMITs any
    pending transaction before running the script, so an outer BEGIN
    cannot protect the script (the previous implementation issued one,
    which executescript silently committed — a false sense of atomicity).
    A failure mid-script may therefore leave earlier statements applied;
    migration files needing all-or-nothing behavior must contain their
    own BEGIN/COMMIT. The rollback below covers only the bookkeeping
    INSERT into schema_migrations.

    Args:
        conn: SQLite connection
        migration_name: Filename of migration (recorded on success)
        migration_path: Full path to migration file
        logger: Optional logger for output

    Raises:
        MigrationError: If the migration SQL or bookkeeping fails
    """
    try:
        # Read migration SQL
        migration_sql = migration_path.read_text()
        if logger:
            logger.debug(f"Applying migration: {migration_name}")
        # Run the script (auto-commits any pending transaction first).
        conn.executescript(migration_sql)
        # Record migration as applied; the commit covers this INSERT.
        conn.execute(
            "INSERT INTO schema_migrations (migration_name) VALUES (?)",
            (migration_name,)
        )
        conn.commit()
        if logger:
            logger.info(f"Applied migration: {migration_name}")
    except Exception as e:
        # Undo the pending bookkeeping INSERT; the script itself may
        # already be committed (see atomicity note above).
        conn.rollback()
        error_msg = f"Migration {migration_name} failed: {e}"
        if logger:
            logger.error(error_msg)
        # Chain the cause so the original traceback is preserved.
        raise MigrationError(error_msg) from e
def run_migrations(db_path, logger=None):
    """
    Run all pending database migrations.

    Called automatically during database initialization. Discovers
    migration files, checks which have been applied, and applies any
    pending migrations in order.

    Fresh Database Behavior:
        - If schema_migrations is empty AND the schema is already current,
          every migration is marked applied without executing it. This
          handles databases created directly with the current SCHEMA_SQL.

    Existing Database Behavior:
        - Applies only pending migrations; migrations already recorded in
          schema_migrations are skipped.

    Args:
        db_path: Path to SQLite database file
        logger: Optional logger for output

    Raises:
        MigrationError: If any migration fails to apply
    """
    if logger is None:
        logger = logging.getLogger(__name__)

    # Migrations live in <project root>/migrations, sibling to starpunk/.
    migrations_dir = Path(__file__).parent.parent / "migrations"
    if not migrations_dir.exists():
        logger.warning(f"Migrations directory not found: {migrations_dir}")
        return

    conn = sqlite3.connect(db_path)
    try:
        # The tracking table must exist before we can query it.
        create_migrations_table(conn)

        cursor = conn.execute("SELECT COUNT(*) FROM schema_migrations")
        migration_count = cursor.fetchone()[0]

        migration_files = discover_migration_files(migrations_dir)
        if not migration_files:
            logger.info("No migration files found")
            return

        # Fresh-database fast path: nothing recorded yet, but the schema
        # already matches SCHEMA_SQL — just record every migration.
        if migration_count == 0:
            if is_schema_current(conn):
                conn.executemany(
                    "INSERT INTO schema_migrations (migration_name) VALUES (?)",
                    [(migration_name,) for migration_name, _ in migration_files],
                )
                conn.commit()
                logger.info(
                    f"Fresh database detected: marked {len(migration_files)} "
                    f"migrations as applied (schema already current)"
                )
                return
            else:
                logger.info("Legacy database detected: applying all migrations")

        # Apply only migrations not already recorded.
        applied = get_applied_migrations(conn)
        pending_count = 0
        for migration_name, migration_path in migration_files:
            if migration_name not in applied:
                apply_migration(conn, migration_name, migration_path, logger)
                pending_count += 1

        # Summary
        total_count = len(migration_files)
        if pending_count > 0:
            logger.info(
                f"Migrations complete: {pending_count} applied, "
                f"{total_count} total"
            )
        else:
            logger.info(f"All migrations up to date ({total_count} total)")
    except MigrationError:
        # Already logged by apply_migration; propagate unchanged.
        raise
    except Exception as e:
        error_msg = f"Migration system error: {e}"
        logger.error(error_msg)
        # Chain the cause so the original traceback is preserved
        # (the previous code dropped it).
        raise MigrationError(error_msg) from e
    finally:
        conn.close()