CRITICAL SECURITY FIX: - Email code required EVERY login (authentication, not verification) - DNS TXT check cached separately (domain verification) - New auth_sessions table for per-login state - Codes hashed with SHA-256, constant-time comparison - Max 3 attempts, 10-minute session expiry - OAuth params stored server-side (security improvement) New files: - services/auth_session.py - migrations 004, 005 - ADR-010: domain verification vs user authentication 312 tests passing, 86.21% coverage 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
284 lines
9.7 KiB
Python
284 lines
9.7 KiB
Python
"""
|
|
Unit tests for database connection and migrations.
|
|
|
|
Tests database initialization, migration running, and health checks.
|
|
"""
|
|
|
|
import tempfile
|
|
from pathlib import Path
|
|
|
|
import pytest
|
|
from sqlalchemy import text
|
|
|
|
from gondulf.database.connection import Database, DatabaseError
|
|
|
|
|
|
class TestDatabaseInit:
    """Constructor checks for ``Database``."""

    def test_init_with_valid_url(self):
        """The in-memory SQLite URL is exposed unchanged as ``database_url``."""
        url = "sqlite:///:memory:"
        database = Database(url)
        assert database.database_url == url

    def test_init_with_file_url(self):
        """A file-based SQLite URL is exposed unchanged as ``database_url``."""
        url = "sqlite:///./test.db"
        database = Database(url)
        assert database.database_url == url
|
|
|
|
|
|
class TestDatabaseDirectory:
    """Checks for ``Database.ensure_database_directory``."""

    def test_ensure_directory_creates_parent(self):
        """Missing parent directories of the database file are created."""
        with tempfile.TemporaryDirectory() as workdir:
            target = Path(workdir, "subdir", "nested", "test.db")
            database = Database(f"sqlite:///{target}")

            database.ensure_database_directory()

            assert target.parent.exists()

    def test_ensure_directory_relative_path(self):
        """A relative database path is handled from the current directory."""
        with tempfile.TemporaryDirectory() as workdir:
            # Relative paths only make sense against a known cwd, so switch
            # into the scratch directory for the duration of the check.
            import os

            previous_cwd = os.getcwd()
            try:
                os.chdir(workdir)

                database = Database("sqlite:///./data/test.db")
                database.ensure_database_directory()

                assert Path("data").exists()
            finally:
                # Always restore the cwd so later tests are unaffected.
                os.chdir(previous_cwd)

    def test_ensure_directory_does_not_fail_if_exists(self):
        """Calling the method twice on an existing directory does not raise."""
        with tempfile.TemporaryDirectory() as workdir:
            target = Path(workdir, "test.db")
            database = Database(f"sqlite:///{target}")

            # Two consecutive calls: the second one sees an existing directory.
            for _ in range(2):
                database.ensure_database_directory()
|
|
|
|
|
|
class TestDatabaseEngine:
    """Checks for ``Database.get_engine``."""

    def test_get_engine_creates_engine(self):
        """``get_engine`` yields an engine whose driver name is ``sqlite``."""
        database = Database("sqlite:///:memory:")

        created = database.get_engine()

        assert created is not None
        assert created.url.drivername == "sqlite"

    def test_get_engine_returns_same_instance(self):
        """Repeated ``get_engine`` calls hand back the very same object."""
        database = Database("sqlite:///:memory:")

        first = database.get_engine()
        second = database.get_engine()

        assert first is second

    def test_get_engine_with_invalid_url_raises_error(self):
        """An unusable URL surfaces as ``DatabaseError`` from ``get_engine``."""
        broken = Database("invalid://bad_url")

        with pytest.raises(DatabaseError, match="Failed to create database engine"):
            broken.get_engine()
|
|
|
|
|
|
class TestDatabaseHealth:
    """Checks for ``Database.check_health``."""

    def test_check_health_success(self):
        """``check_health`` returns ``True`` for an in-memory database."""
        database = Database("sqlite:///:memory:")
        database.get_engine()  # make sure the engine exists before probing

        healthy = database.check_health()

        assert healthy is True

    def test_check_health_failure(self):
        """``check_health`` returns ``False`` when the database path is unusable."""
        database = Database("sqlite:////nonexistent/path/db.db")

        # The probe is expected to report failure rather than raise.
        healthy = database.check_health()

        assert healthy is False
|
|
|
|
|
|
class TestDatabaseMigrations:
    """Tests for database migrations."""

    # Migration versions expected after a complete run (001-005). Kept in one
    # place so every test in this class agrees on what "all migrations" means.
    EXPECTED_VERSIONS = (1, 2, 3, 4, 5)

    # Tables these tests expect to find once migrations have been applied.
    EXPECTED_TABLES = frozenset({"migrations", "authorization_codes", "domains"})

    def test_get_applied_migrations_empty(self):
        """Test get_applied_migrations returns empty set for new database."""
        db = Database("sqlite:///:memory:")
        db.get_engine()  # Initialize engine

        migrations = db.get_applied_migrations()

        assert migrations == set()

    def test_get_applied_migrations_after_running(self):
        """Test get_applied_migrations returns versions after running migrations."""
        with tempfile.TemporaryDirectory() as tmpdir:
            db_path = Path(tmpdir) / "test.db"
            db = Database(f"sqlite:///{db_path}")

            # Initialize will run migrations
            db.initialize()

            migrations = db.get_applied_migrations()

            # Every known migration (001-005) should be reported, not just
            # the first two.
            assert set(self.EXPECTED_VERSIONS) <= set(migrations)

    def test_run_migrations_creates_tables(self):
        """Test run_migrations creates expected tables."""
        with tempfile.TemporaryDirectory() as tmpdir:
            db_path = Path(tmpdir) / "test.db"
            db = Database(f"sqlite:///{db_path}")

            db.ensure_database_directory()
            db.run_migrations()

            # Check that tables were created
            engine = db.get_engine()
            with engine.connect() as conn:
                result = conn.execute(text("SELECT name FROM sqlite_master WHERE type='table'"))
                tables = {row[0] for row in result}

            assert self.EXPECTED_TABLES <= tables

    def test_run_migrations_idempotent(self):
        """Test run_migrations can be run multiple times safely."""
        with tempfile.TemporaryDirectory() as tmpdir:
            db_path = Path(tmpdir) / "test.db"
            db = Database(f"sqlite:///{db_path}")

            db.ensure_database_directory()
            db.run_migrations()
            # Run again - should not raise or duplicate
            db.run_migrations()

            engine = db.get_engine()
            with engine.connect() as conn:
                # Check migrations were recorded correctly (001-005)
                result = conn.execute(text("SELECT COUNT(*) FROM migrations"))
                count = result.fetchone()[0]
                assert count == len(self.EXPECTED_VERSIONS)

                # Verify all migrations are present
                result = conn.execute(text("SELECT version FROM migrations ORDER BY version"))
                versions = [row[0] for row in result]
                assert versions == list(self.EXPECTED_VERSIONS)

    def test_initialize_full_setup(self):
        """Test initialize performs full database setup."""
        with tempfile.TemporaryDirectory() as tmpdir:
            db_path = Path(tmpdir) / "test.db"
            db = Database(f"sqlite:///{db_path}")

            db.initialize()

            # Verify database is healthy
            assert db.check_health() is True

            # Verify all migrations ran (001-005)
            migrations = db.get_applied_migrations()
            assert set(self.EXPECTED_VERSIONS) <= set(migrations)

            # Verify tables exist
            engine = db.get_engine()
            with engine.connect() as conn:
                result = conn.execute(text("SELECT name FROM sqlite_master WHERE type='table'"))
                tables = {row[0] for row in result}

            assert self.EXPECTED_TABLES <= tables
|
|
|
|
|
|
class TestMigrationSchemaCorrectness:
    """Column-level checks on the tables produced by the migrations."""

    @staticmethod
    def _table_columns(table_name):
        """Return the column names of ``table_name`` in a freshly initialized DB.

        A throwaway SQLite file is created, ``Database.initialize`` is run on
        it, and ``PRAGMA table_info`` is queried for the requested table.
        """
        with tempfile.TemporaryDirectory() as workdir:
            database = Database(f"sqlite:///{Path(workdir) / 'test.db'}")
            database.initialize()

            with database.get_engine().connect() as connection:
                rows = connection.execute(text(f"PRAGMA table_info({table_name})"))
                # Index 1 of each PRAGMA table_info row is the column name.
                return {row[1] for row in rows}

    def test_authorization_codes_schema(self):
        """The ``authorization_codes`` table has exactly the expected columns."""
        expected = {
            "code",
            "client_id",
            "redirect_uri",
            "state",
            "code_challenge",
            "code_challenge_method",
            "scope",
            "me",
            "created_at",
        }

        assert self._table_columns("authorization_codes") == expected

    def test_domains_schema(self):
        """The ``domains`` table has exactly the expected columns."""
        expected = {
            "domain",
            "email",
            "verification_code",
            "verified",
            "created_at",
            "verified_at",
            "two_factor",
            "last_checked",  # Added in migration 005
        }

        assert self._table_columns("domains") == expected

    def test_migrations_schema(self):
        """The ``migrations`` table has exactly the expected columns."""
        expected = {"version", "description", "applied_at"}

        assert self._table_columns("migrations") == expected
|