Files
Gondulf/tests/integration/api/test_authorization_verification.py
Phil Skentelbery 9135edfe84 fix(auth): require email authentication every login
CRITICAL SECURITY FIX:
- Email code required EVERY login (authentication, not verification)
- DNS TXT check cached separately (domain verification)
- New auth_sessions table for per-login state
- Codes hashed with SHA-256, constant-time comparison
- Max 3 attempts, 10-minute session expiry
- OAuth params stored server-side (security improvement)

New files:
- services/auth_session.py
- migrations 004, 005
- ADR-010: domain verification vs user authentication

312 tests passing, 86.21% coverage

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-22 15:16:26 -07:00

597 lines
24 KiB
Python

"""
Integration tests for authorization endpoint domain verification.
Tests the authentication flow that requires email verification on EVERY login.
See ADR-010 for the architectural decision.
Key principle: Email code is AUTHENTICATION (every login), not domain verification.
"""
from datetime import datetime, timedelta
from unittest.mock import AsyncMock, Mock, patch
import pytest
from fastapi.testclient import TestClient
@pytest.fixture
def valid_auth_params():
"""Valid authorization request parameters."""
return {
"client_id": "https://app.example.com",
"redirect_uri": "https://app.example.com/callback",
"response_type": "code",
"state": "test123",
"code_challenge": "E9Melhoa2OwvFrEMTJguCHaoeK1t8URWbuGJSstw-cM",
"code_challenge_method": "S256",
"me": "https://user.example.com",
}
def create_mock_dns_service(verify_success=True):
"""Create a mock DNS service."""
mock_service = Mock()
mock_service.verify_txt_record.return_value = verify_success
return mock_service
def create_mock_email_service():
"""Create a mock email service."""
mock_service = Mock()
mock_service.send_verification_code = Mock()
return mock_service
def create_mock_html_fetcher(email="test@example.com"):
"""Create a mock HTML fetcher that returns a page with rel=me email."""
mock_fetcher = Mock()
if email:
html = f'''
<html>
<body>
<a href="mailto:{email}" rel="me">Email</a>
</body>
</html>
'''
else:
html = '<html><body></body></html>'
mock_fetcher.fetch.return_value = html
return mock_fetcher
def create_mock_relme_parser():
"""Create a real RelMeParser (it's simple enough)."""
from gondulf.services.relme_parser import RelMeParser
return RelMeParser()
def create_mock_happ_parser():
"""Create a mock h-app parser."""
from gondulf.services.happ_parser import ClientMetadata
mock_parser = Mock()
mock_parser.fetch_and_parse = AsyncMock(return_value=ClientMetadata(
name="Test Application",
url="https://app.example.com",
logo="https://app.example.com/logo.png"
))
return mock_parser
def create_mock_auth_session_service(session_id="test_session_123", code="123456", verified=False):
"""Create a mock auth session service."""
from gondulf.services.auth_session import (
AuthSessionService,
CodeVerificationError,
SessionNotFoundError,
)
mock_service = Mock(spec=AuthSessionService)
mock_service.create_session.return_value = {
"session_id": session_id,
"verification_code": code,
"expires_at": datetime.utcnow() + timedelta(minutes=10)
}
mock_service.get_session.return_value = {
"session_id": session_id,
"me": "https://user.example.com",
"email": "test@example.com",
"code_verified": verified,
"client_id": "https://app.example.com",
"redirect_uri": "https://app.example.com/callback",
"state": "test123",
"code_challenge": "E9Melhoa2OwvFrEMTJguCHaoeK1t8URWbuGJSstw-cM",
"code_challenge_method": "S256",
"scope": "",
"response_type": "code"
}
mock_service.verify_code.return_value = {
"session_id": session_id,
"me": "https://user.example.com",
"email": "test@example.com",
"code_verified": True,
"client_id": "https://app.example.com",
"redirect_uri": "https://app.example.com/callback",
"state": "test123",
"code_challenge": "E9Melhoa2OwvFrEMTJguCHaoeK1t8URWbuGJSstw-cM",
"code_challenge_method": "S256",
"scope": "",
"response_type": "code"
}
mock_service.is_session_verified.return_value = verified
mock_service.delete_session = Mock()
return mock_service
@pytest.fixture
def configured_app(monkeypatch, tmp_path):
"""Create a fully configured app with fresh database."""
db_path = tmp_path / "test.db"
monkeypatch.setenv("GONDULF_SECRET_KEY", "a" * 32)
monkeypatch.setenv("GONDULF_BASE_URL", "https://auth.example.com")
monkeypatch.setenv("GONDULF_DATABASE_URL", f"sqlite:///{db_path}")
monkeypatch.setenv("GONDULF_DEBUG", "true")
from gondulf.main import app
return app, db_path
class TestUnverifiedDomainTriggersVerification:
"""Tests that any login triggers authentication (email code)."""
def test_unverified_domain_shows_verification_form(
self, configured_app, valid_auth_params
):
"""Test that DNS-verified domain STILL shows verification form (email auth required)."""
app, db_path = configured_app
from gondulf.dependencies import (
get_dns_service, get_email_service, get_html_fetcher,
get_relme_parser, get_happ_parser, get_auth_session_service, get_database
)
from gondulf.database.connection import Database
from sqlalchemy import text
# Setup database with DNS-verified domain
db = Database(f"sqlite:///{db_path}")
db.initialize()
now = datetime.utcnow()
with db.get_engine().begin() as conn:
conn.execute(
text("""
INSERT OR REPLACE INTO domains
(domain, email, verification_code, verified, verified_at, last_checked, two_factor)
VALUES (:domain, '', '', 1, :now, :now, 0)
"""),
{"domain": "user.example.com", "now": now}
)
app.dependency_overrides[get_database] = lambda: db
app.dependency_overrides[get_dns_service] = lambda: create_mock_dns_service(True)
app.dependency_overrides[get_email_service] = lambda: create_mock_email_service()
app.dependency_overrides[get_html_fetcher] = lambda: create_mock_html_fetcher("test@example.com")
app.dependency_overrides[get_relme_parser] = create_mock_relme_parser
app.dependency_overrides[get_happ_parser] = lambda: create_mock_happ_parser()
app.dependency_overrides[get_auth_session_service] = lambda: create_mock_auth_session_service()
try:
with TestClient(app) as client:
response = client.get("/authorize", params=valid_auth_params)
assert response.status_code == 200
# CRITICAL: Even DNS-verified domains require email verification every login
assert "Verify Your Identity" in response.text
assert "verification code" in response.text.lower()
finally:
app.dependency_overrides.clear()
def test_unverified_domain_preserves_auth_params(
self, configured_app, valid_auth_params
):
"""Test that authorization parameters are preserved in verification form."""
app, db_path = configured_app
from gondulf.dependencies import (
get_dns_service, get_email_service, get_html_fetcher,
get_relme_parser, get_happ_parser, get_auth_session_service, get_database
)
from gondulf.database.connection import Database
from sqlalchemy import text
db = Database(f"sqlite:///{db_path}")
db.initialize()
now = datetime.utcnow()
with db.get_engine().begin() as conn:
conn.execute(
text("""
INSERT OR REPLACE INTO domains
(domain, email, verification_code, verified, verified_at, last_checked, two_factor)
VALUES (:domain, '', '', 1, :now, :now, 0)
"""),
{"domain": "user.example.com", "now": now}
)
app.dependency_overrides[get_database] = lambda: db
app.dependency_overrides[get_dns_service] = lambda: create_mock_dns_service(True)
app.dependency_overrides[get_email_service] = lambda: create_mock_email_service()
app.dependency_overrides[get_html_fetcher] = lambda: create_mock_html_fetcher("test@example.com")
app.dependency_overrides[get_relme_parser] = create_mock_relme_parser
app.dependency_overrides[get_happ_parser] = lambda: create_mock_happ_parser()
app.dependency_overrides[get_auth_session_service] = lambda: create_mock_auth_session_service()
try:
with TestClient(app) as client:
response = client.get("/authorize", params=valid_auth_params)
assert response.status_code == 200
# New flow uses session_id instead of passing all params
assert 'name="session_id"' in response.text
finally:
app.dependency_overrides.clear()
class TestVerifiedDomainShowsConsent:
"""Tests that verified sessions (email code verified) show consent."""
def test_verified_domain_shows_consent_page(
self, configured_app, valid_auth_params
):
"""Test that after email verification, consent page is shown."""
app, db_path = configured_app
from gondulf.dependencies import get_happ_parser, get_auth_session_service
from gondulf.services.auth_session import CodeVerificationError
# Mock auth session that succeeds on verify
mock_session = create_mock_auth_session_service(verified=True)
app.dependency_overrides[get_auth_session_service] = lambda: mock_session
app.dependency_overrides[get_happ_parser] = lambda: create_mock_happ_parser()
try:
with TestClient(app) as client:
# Simulate verifying the code
form_data = {
"session_id": "test_session_123",
"code": "123456",
}
response = client.post("/authorize/verify-code", data=form_data)
# Should show consent page
assert response.status_code == 200
assert "Authorization Request" in response.text or "Authorize" in response.text
assert 'action="/authorize/consent"' in response.text
finally:
app.dependency_overrides.clear()
class TestVerificationCodeValidation:
"""Tests for the verification code submission endpoint."""
def test_valid_code_shows_consent(
self, configured_app, valid_auth_params
):
"""Test that valid verification code shows consent page."""
app, _ = configured_app
from gondulf.dependencies import get_happ_parser, get_auth_session_service
mock_session = create_mock_auth_session_service()
mock_parser = create_mock_happ_parser()
app.dependency_overrides[get_auth_session_service] = lambda: mock_session
app.dependency_overrides[get_happ_parser] = lambda: mock_parser
try:
with TestClient(app) as client:
form_data = {
"session_id": "test_session_123",
"code": "123456",
}
response = client.post("/authorize/verify-code", data=form_data)
assert response.status_code == 200
# Should show consent page after successful verification
assert "Authorization Request" in response.text or "Authorize" in response.text
finally:
app.dependency_overrides.clear()
def test_invalid_code_shows_error_with_retry(
self, configured_app, valid_auth_params
):
"""Test that invalid code shows error and allows retry."""
app, _ = configured_app
from gondulf.dependencies import get_happ_parser, get_auth_session_service
from gondulf.services.auth_session import CodeVerificationError
mock_session = create_mock_auth_session_service()
mock_session.verify_code.side_effect = CodeVerificationError("Invalid code")
mock_parser = create_mock_happ_parser()
app.dependency_overrides[get_auth_session_service] = lambda: mock_session
app.dependency_overrides[get_happ_parser] = lambda: mock_parser
try:
with TestClient(app) as client:
form_data = {
"session_id": "test_session_123",
"code": "000000",
}
response = client.post("/authorize/verify-code", data=form_data)
assert response.status_code == 200
# Should show verify_code page with error
assert "Invalid verification code" in response.text or "invalid" in response.text.lower()
# Should still have the form for retry
assert 'name="code"' in response.text
finally:
app.dependency_overrides.clear()
class TestEmailFailureHandling:
"""Tests for email discovery failure scenarios."""
def test_email_discovery_failure_shows_instructions(
self, configured_app, valid_auth_params
):
"""Test that email discovery failure shows helpful instructions."""
app, db_path = configured_app
from gondulf.dependencies import (
get_dns_service, get_email_service, get_html_fetcher,
get_relme_parser, get_happ_parser, get_auth_session_service, get_database
)
from gondulf.database.connection import Database
from sqlalchemy import text
db = Database(f"sqlite:///{db_path}")
db.initialize()
now = datetime.utcnow()
with db.get_engine().begin() as conn:
conn.execute(
text("""
INSERT OR REPLACE INTO domains
(domain, email, verification_code, verified, verified_at, last_checked, two_factor)
VALUES (:domain, '', '', 1, :now, :now, 0)
"""),
{"domain": "user.example.com", "now": now}
)
app.dependency_overrides[get_database] = lambda: db
app.dependency_overrides[get_dns_service] = lambda: create_mock_dns_service(True)
app.dependency_overrides[get_email_service] = lambda: create_mock_email_service()
# HTML fetcher returns page with no email
app.dependency_overrides[get_html_fetcher] = lambda: create_mock_html_fetcher(email=None)
app.dependency_overrides[get_relme_parser] = create_mock_relme_parser
app.dependency_overrides[get_happ_parser] = lambda: create_mock_happ_parser()
app.dependency_overrides[get_auth_session_service] = lambda: create_mock_auth_session_service()
try:
with TestClient(app) as client:
response = client.get("/authorize", params=valid_auth_params)
assert response.status_code == 200
# Should show error page with email instructions
assert "email" in response.text.lower()
finally:
app.dependency_overrides.clear()
class TestFullVerificationFlow:
"""Integration tests for the complete verification flow."""
def test_full_flow_new_domain(
self, configured_app, valid_auth_params
):
"""Test complete flow: authorize -> verify code -> consent."""
app, db_path = configured_app
from gondulf.dependencies import (
get_dns_service, get_email_service, get_html_fetcher,
get_relme_parser, get_happ_parser, get_auth_session_service, get_database
)
from gondulf.database.connection import Database
from sqlalchemy import text
db = Database(f"sqlite:///{db_path}")
db.initialize()
now = datetime.utcnow()
with db.get_engine().begin() as conn:
conn.execute(
text("""
INSERT OR REPLACE INTO domains
(domain, email, verification_code, verified, verified_at, last_checked, two_factor)
VALUES (:domain, '', '', 1, :now, :now, 0)
"""),
{"domain": "user.example.com", "now": now}
)
mock_session = create_mock_auth_session_service()
mock_parser = create_mock_happ_parser()
app.dependency_overrides[get_database] = lambda: db
app.dependency_overrides[get_dns_service] = lambda: create_mock_dns_service(True)
app.dependency_overrides[get_email_service] = lambda: create_mock_email_service()
app.dependency_overrides[get_html_fetcher] = lambda: create_mock_html_fetcher("test@example.com")
app.dependency_overrides[get_relme_parser] = create_mock_relme_parser
app.dependency_overrides[get_happ_parser] = lambda: mock_parser
app.dependency_overrides[get_auth_session_service] = lambda: mock_session
try:
with TestClient(app) as client:
# Step 1: GET /authorize -> should show verification form (always!)
response1 = client.get("/authorize", params=valid_auth_params)
assert response1.status_code == 200
assert "Verify Your Identity" in response1.text
# Step 2: POST /authorize/verify-code -> should show consent
form_data = {
"session_id": "test_session_123",
"code": "123456",
}
response2 = client.post("/authorize/verify-code", data=form_data)
assert response2.status_code == 200
# Should show consent page
assert "Authorization Request" in response2.text or "Authorize" in response2.text
finally:
app.dependency_overrides.clear()
def test_verification_code_retry_with_correct_code(
self, configured_app, valid_auth_params
):
"""Test that user can retry with correct code after failure."""
app, _ = configured_app
from gondulf.dependencies import get_happ_parser, get_auth_session_service
from gondulf.services.auth_session import CodeVerificationError
mock_session = create_mock_auth_session_service()
# First verify_code call fails, second succeeds
mock_session.verify_code.side_effect = [
CodeVerificationError("Invalid code"),
{
"session_id": "test_session_123",
"me": "https://user.example.com",
"email": "test@example.com",
"code_verified": True,
"client_id": "https://app.example.com",
"redirect_uri": "https://app.example.com/callback",
"state": "test123",
"code_challenge": "E9Melhoa2OwvFrEMTJguCHaoeK1t8URWbuGJSstw-cM",
"code_challenge_method": "S256",
"scope": "",
"response_type": "code"
}
]
mock_parser = create_mock_happ_parser()
app.dependency_overrides[get_auth_session_service] = lambda: mock_session
app.dependency_overrides[get_happ_parser] = lambda: mock_parser
try:
with TestClient(app) as client:
form_data = {
"session_id": "test_session_123",
"code": "000000", # Wrong code
}
# First attempt with wrong code
response1 = client.post("/authorize/verify-code", data=form_data)
assert response1.status_code == 200
assert "Invalid" in response1.text or "invalid" in response1.text.lower()
# Second attempt with correct code
form_data["code"] = "123456"
response2 = client.post("/authorize/verify-code", data=form_data)
assert response2.status_code == 200
assert "Authorization Request" in response2.text or "Authorize" in response2.text
finally:
app.dependency_overrides.clear()
class TestSecurityRequirements:
"""Tests for security requirements - email auth required every login."""
def test_unverified_domain_never_sees_consent_directly(
self, configured_app, valid_auth_params
):
"""Critical: Even DNS-verified domains must authenticate via email every time."""
app, db_path = configured_app
from gondulf.dependencies import (
get_dns_service, get_email_service, get_html_fetcher,
get_relme_parser, get_happ_parser, get_auth_session_service, get_database
)
from gondulf.database.connection import Database
from sqlalchemy import text
db = Database(f"sqlite:///{db_path}")
db.initialize()
now = datetime.utcnow()
with db.get_engine().begin() as conn:
conn.execute(
text("""
INSERT OR REPLACE INTO domains
(domain, email, verification_code, verified, verified_at, last_checked, two_factor)
VALUES (:domain, '', '', 1, :now, :now, 0)
"""),
{"domain": "user.example.com", "now": now}
)
mock_session = create_mock_auth_session_service()
app.dependency_overrides[get_database] = lambda: db
app.dependency_overrides[get_dns_service] = lambda: create_mock_dns_service(True)
app.dependency_overrides[get_email_service] = lambda: create_mock_email_service()
app.dependency_overrides[get_html_fetcher] = lambda: create_mock_html_fetcher("test@example.com")
app.dependency_overrides[get_relme_parser] = create_mock_relme_parser
app.dependency_overrides[get_happ_parser] = lambda: create_mock_happ_parser()
app.dependency_overrides[get_auth_session_service] = lambda: mock_session
try:
with TestClient(app) as client:
response = client.get("/authorize", params=valid_auth_params)
# CRITICAL: The consent page should NOT be shown without email verification
assert "Authorization Request" not in response.text
# Verify code page should be shown instead
assert "Verify Your Identity" in response.text
finally:
app.dependency_overrides.clear()
def test_state_parameter_preserved_through_flow(
self, configured_app, valid_auth_params
):
"""Test that state parameter is preserved through verification flow."""
app, db_path = configured_app
from gondulf.dependencies import (
get_dns_service, get_email_service, get_html_fetcher,
get_relme_parser, get_happ_parser, get_auth_session_service, get_database
)
from gondulf.database.connection import Database
from sqlalchemy import text
db = Database(f"sqlite:///{db_path}")
db.initialize()
now = datetime.utcnow()
with db.get_engine().begin() as conn:
conn.execute(
text("""
INSERT OR REPLACE INTO domains
(domain, email, verification_code, verified, verified_at, last_checked, two_factor)
VALUES (:domain, '', '', 1, :now, :now, 0)
"""),
{"domain": "user.example.com", "now": now}
)
mock_session = create_mock_auth_session_service()
app.dependency_overrides[get_database] = lambda: db
app.dependency_overrides[get_dns_service] = lambda: create_mock_dns_service(True)
app.dependency_overrides[get_email_service] = lambda: create_mock_email_service()
app.dependency_overrides[get_html_fetcher] = lambda: create_mock_html_fetcher("test@example.com")
app.dependency_overrides[get_relme_parser] = create_mock_relme_parser
app.dependency_overrides[get_happ_parser] = lambda: create_mock_happ_parser()
app.dependency_overrides[get_auth_session_service] = lambda: mock_session
try:
unique_state = "unique_state_abc123xyz"
params = valid_auth_params.copy()
params["state"] = unique_state
with TestClient(app) as client:
response = client.get("/authorize", params=params)
assert response.status_code == 200
# State is now stored in session, so we check session_id is present
assert 'name="session_id"' in response.text
# The state should be stored in the session service
assert mock_session.create_session.called
finally:
app.dependency_overrides.clear()