fix(security): require domain verification before authorization

CRITICAL SECURITY FIX: The authorization endpoint was bypassing domain
verification entirely, allowing anyone to authenticate as any domain.

Changes:
- Add domain verification check in GET /authorize before showing consent
- Add POST /authorize/verify-code endpoint for code validation
- Add verify_code.html and verification_error.html templates
- Add check_domain_verified() and store_verified_domain() functions
- Preserve OAuth parameters through verification flow

Flow for unverified domains:
1. GET /authorize -> Check DB for verified domain
2. If not verified: start 2FA (DNS + email) -> show code entry form
3. POST /authorize/verify-code -> validate code -> store verified
4. Show consent page
5. POST /authorize/consent -> issue authorization code

Verified domains skip directly to consent page.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
2025-11-22 12:45:59 -07:00
parent 052d3ad3e1
commit 8dddc73826
4 changed files with 825 additions and 8 deletions

View File

@@ -0,0 +1,492 @@
"""
Integration tests for authorization endpoint domain verification.
Tests the security fix that requires domain verification before showing the consent page.
"""
from datetime import datetime
from unittest.mock import AsyncMock, Mock, patch
import pytest
from fastapi.testclient import TestClient
@pytest.fixture
def valid_auth_params():
"""Valid authorization request parameters."""
return {
"client_id": "https://app.example.com",
"redirect_uri": "https://app.example.com/callback",
"response_type": "code",
"state": "test123",
"code_challenge": "E9Melhoa2OwvFrEMTJguCHaoeK1t8URWbuGJSstw-cM",
"code_challenge_method": "S256",
"me": "https://user.example.com",
}
def create_mock_verification_service(start_success=True, verify_success=True, start_error="dns_verification_failed"):
"""Create a mock verification service with configurable behavior."""
mock_service = Mock()
if start_success:
mock_service.start_verification.return_value = {
"success": True,
"email": "t***@example.com",
"verification_method": "email"
}
else:
mock_service.start_verification.return_value = {
"success": False,
"error": start_error
}
if verify_success:
mock_service.verify_email_code.return_value = {
"success": True,
"email": "test@example.com"
}
else:
mock_service.verify_email_code.return_value = {
"success": False,
"error": "invalid_code"
}
mock_service.code_storage = Mock()
mock_service.code_storage.get.return_value = "test@example.com"
mock_service.create_authorization_code.return_value = "test_auth_code_12345"
return mock_service
def create_mock_happ_parser():
"""Create a mock h-app parser."""
from gondulf.services.happ_parser import ClientMetadata
mock_parser = Mock()
mock_parser.fetch_and_parse = AsyncMock(return_value=ClientMetadata(
name="Test Application",
url="https://app.example.com",
logo="https://app.example.com/logo.png"
))
return mock_parser
@pytest.fixture
def configured_app(monkeypatch, tmp_path):
"""Create a fully configured app with fresh database."""
db_path = tmp_path / "test.db"
monkeypatch.setenv("GONDULF_SECRET_KEY", "a" * 32)
monkeypatch.setenv("GONDULF_BASE_URL", "https://auth.example.com")
monkeypatch.setenv("GONDULF_DATABASE_URL", f"sqlite:///{db_path}")
monkeypatch.setenv("GONDULF_DEBUG", "true")
from gondulf.main import app
return app, db_path
class TestUnverifiedDomainTriggersVerification:
"""Tests that unverified domains trigger the verification flow."""
def test_unverified_domain_shows_verification_form(
self, configured_app, valid_auth_params
):
"""Test that an unverified domain shows the verification code form."""
app, _ = configured_app
from gondulf.dependencies import get_verification_service, get_happ_parser
mock_service = create_mock_verification_service(start_success=True)
mock_parser = create_mock_happ_parser()
app.dependency_overrides[get_verification_service] = lambda: mock_service
app.dependency_overrides[get_happ_parser] = lambda: mock_parser
try:
with TestClient(app) as client:
response = client.get("/authorize", params=valid_auth_params)
assert response.status_code == 200
# Should show verification form, not consent form
assert "Verify Your Identity" in response.text
assert "verification code" in response.text.lower()
# Should show masked email
assert "t***@example.com" in response.text
finally:
app.dependency_overrides.clear()
def test_unverified_domain_preserves_auth_params(
self, configured_app, valid_auth_params
):
"""Test that authorization parameters are preserved in verification form."""
app, _ = configured_app
from gondulf.dependencies import get_verification_service, get_happ_parser
mock_service = create_mock_verification_service(start_success=True)
mock_parser = create_mock_happ_parser()
app.dependency_overrides[get_verification_service] = lambda: mock_service
app.dependency_overrides[get_happ_parser] = lambda: mock_parser
try:
with TestClient(app) as client:
response = client.get("/authorize", params=valid_auth_params)
assert response.status_code == 200
# Check hidden fields contain auth params
assert 'name="client_id"' in response.text
assert 'name="redirect_uri"' in response.text
assert 'name="state"' in response.text
assert 'name="code_challenge"' in response.text
finally:
app.dependency_overrides.clear()
def test_unverified_domain_does_not_show_consent(
self, configured_app, valid_auth_params
):
"""Test that unverified domain does NOT show consent form directly."""
app, _ = configured_app
from gondulf.dependencies import get_verification_service, get_happ_parser
mock_service = create_mock_verification_service(start_success=True)
mock_parser = create_mock_happ_parser()
app.dependency_overrides[get_verification_service] = lambda: mock_service
app.dependency_overrides[get_happ_parser] = lambda: mock_parser
try:
with TestClient(app) as client:
response = client.get("/authorize", params=valid_auth_params)
assert response.status_code == 200
# Should NOT show consent/authorization form
assert "Authorization Request" not in response.text
finally:
app.dependency_overrides.clear()
class TestVerifiedDomainShowsConsent:
"""Tests that verified domains skip verification and show consent."""
def test_verified_domain_shows_consent_page(
self, configured_app, valid_auth_params
):
"""Test that a verified domain shows consent page directly."""
app, db_path = configured_app
from gondulf.dependencies import get_happ_parser, get_database
from gondulf.database.connection import Database
from sqlalchemy import text
# Create database and insert verified domain
db = Database(f"sqlite:///{db_path}")
db.initialize()
with db.get_engine().begin() as conn:
conn.execute(
text("""
INSERT INTO domains (domain, email, verification_code, verified, verified_at, two_factor)
VALUES (:domain, :email, '', 1, :verified_at, 1)
"""),
{"domain": "user.example.com", "email": "test@example.com", "verified_at": datetime.utcnow()}
)
# Override database to use same instance
app.dependency_overrides[get_database] = lambda: db
mock_parser = create_mock_happ_parser()
app.dependency_overrides[get_happ_parser] = lambda: mock_parser
try:
with TestClient(app) as client:
response = client.get("/authorize", params=valid_auth_params)
# Should show consent page
assert response.status_code == 200
assert "Authorization Request" in response.text
assert 'action="/authorize/consent"' in response.text
finally:
app.dependency_overrides.clear()
class TestVerificationCodeValidation:
"""Tests for the verification code submission endpoint."""
def test_valid_code_shows_consent(
self, configured_app, valid_auth_params
):
"""Test that valid verification code shows consent page."""
app, _ = configured_app
from gondulf.dependencies import get_verification_service, get_happ_parser
mock_service = create_mock_verification_service(verify_success=True)
mock_parser = create_mock_happ_parser()
app.dependency_overrides[get_verification_service] = lambda: mock_service
app.dependency_overrides[get_happ_parser] = lambda: mock_parser
try:
with TestClient(app) as client:
form_data = {
"domain": "user.example.com",
"code": "123456",
"client_id": valid_auth_params["client_id"],
"redirect_uri": valid_auth_params["redirect_uri"],
"response_type": valid_auth_params["response_type"],
"state": valid_auth_params["state"],
"code_challenge": valid_auth_params["code_challenge"],
"code_challenge_method": valid_auth_params["code_challenge_method"],
"scope": "",
"me": valid_auth_params["me"],
}
response = client.post("/authorize/verify-code", data=form_data)
assert response.status_code == 200
# Should show consent page after successful verification
assert "Authorization Request" in response.text or "Authorize" in response.text
finally:
app.dependency_overrides.clear()
def test_invalid_code_shows_error_with_retry(
self, configured_app, valid_auth_params
):
"""Test that invalid code shows error and allows retry."""
app, _ = configured_app
from gondulf.dependencies import get_verification_service, get_happ_parser
mock_service = create_mock_verification_service(verify_success=False)
mock_parser = create_mock_happ_parser()
app.dependency_overrides[get_verification_service] = lambda: mock_service
app.dependency_overrides[get_happ_parser] = lambda: mock_parser
try:
with TestClient(app) as client:
form_data = {
"domain": "user.example.com",
"code": "000000",
"client_id": valid_auth_params["client_id"],
"redirect_uri": valid_auth_params["redirect_uri"],
"response_type": valid_auth_params["response_type"],
"state": valid_auth_params["state"],
"code_challenge": valid_auth_params["code_challenge"],
"code_challenge_method": valid_auth_params["code_challenge_method"],
"scope": "",
"me": valid_auth_params["me"],
}
response = client.post("/authorize/verify-code", data=form_data)
assert response.status_code == 200
# Should show verify_code page with error
assert "Invalid verification code" in response.text or "invalid" in response.text.lower()
# Should still have the form for retry
assert 'name="code"' in response.text
finally:
app.dependency_overrides.clear()
class TestDNSFailureHandling:
"""Tests for DNS verification failure scenarios."""
def test_dns_failure_shows_instructions(
self, configured_app, valid_auth_params
):
"""Test that DNS verification failure shows helpful instructions."""
app, _ = configured_app
from gondulf.dependencies import get_verification_service, get_happ_parser
mock_service = create_mock_verification_service(start_success=False, start_error="dns_verification_failed")
mock_parser = create_mock_happ_parser()
app.dependency_overrides[get_verification_service] = lambda: mock_service
app.dependency_overrides[get_happ_parser] = lambda: mock_parser
try:
with TestClient(app) as client:
response = client.get("/authorize", params=valid_auth_params)
assert response.status_code == 200
# Should show error page with DNS instructions
assert "DNS" in response.text or "dns" in response.text.lower()
assert "TXT" in response.text
assert "_gondulf" in response.text
finally:
app.dependency_overrides.clear()
class TestEmailFailureHandling:
"""Tests for email discovery failure scenarios."""
def test_email_discovery_failure_shows_instructions(
self, configured_app, valid_auth_params
):
"""Test that email discovery failure shows helpful instructions."""
app, _ = configured_app
from gondulf.dependencies import get_verification_service, get_happ_parser
mock_service = create_mock_verification_service(start_success=False, start_error="email_discovery_failed")
mock_parser = create_mock_happ_parser()
app.dependency_overrides[get_verification_service] = lambda: mock_service
app.dependency_overrides[get_happ_parser] = lambda: mock_parser
try:
with TestClient(app) as client:
response = client.get("/authorize", params=valid_auth_params)
assert response.status_code == 200
# Should show error page with email instructions
assert "email" in response.text.lower()
finally:
app.dependency_overrides.clear()
class TestFullVerificationFlow:
"""Integration tests for the complete verification flow."""
def test_full_flow_new_domain(
self, configured_app, valid_auth_params
):
"""Test complete flow: unverified domain -> verify code -> consent."""
app, _ = configured_app
from gondulf.dependencies import get_verification_service, get_happ_parser
mock_service = create_mock_verification_service(start_success=True, verify_success=True)
mock_parser = create_mock_happ_parser()
app.dependency_overrides[get_verification_service] = lambda: mock_service
app.dependency_overrides[get_happ_parser] = lambda: mock_parser
try:
with TestClient(app) as client:
# Step 1: GET /authorize -> should show verification form
response1 = client.get("/authorize", params=valid_auth_params)
assert response1.status_code == 200
assert "Verify Your Identity" in response1.text
# Step 2: POST /authorize/verify-code -> should show consent
form_data = {
"domain": "user.example.com",
"code": "123456",
"client_id": valid_auth_params["client_id"],
"redirect_uri": valid_auth_params["redirect_uri"],
"response_type": valid_auth_params["response_type"],
"state": valid_auth_params["state"],
"code_challenge": valid_auth_params["code_challenge"],
"code_challenge_method": valid_auth_params["code_challenge_method"],
"scope": "",
"me": valid_auth_params["me"],
}
response2 = client.post("/authorize/verify-code", data=form_data)
assert response2.status_code == 200
# Should show consent page
assert "Authorization Request" in response2.text or "Authorize" in response2.text
finally:
app.dependency_overrides.clear()
def test_verification_code_retry_with_correct_code(
self, configured_app, valid_auth_params
):
"""Test that user can retry with correct code after failure."""
app, _ = configured_app
from gondulf.dependencies import get_verification_service, get_happ_parser
mock_service = Mock()
# First verify_email_code call fails, second succeeds
mock_service.verify_email_code.side_effect = [
{"success": False, "error": "invalid_code"},
{"success": True, "email": "test@example.com"}
]
mock_service.code_storage = Mock()
mock_service.code_storage.get.return_value = "test@example.com"
mock_parser = create_mock_happ_parser()
app.dependency_overrides[get_verification_service] = lambda: mock_service
app.dependency_overrides[get_happ_parser] = lambda: mock_parser
try:
with TestClient(app) as client:
form_data = {
"domain": "user.example.com",
"code": "000000", # Wrong code
"client_id": valid_auth_params["client_id"],
"redirect_uri": valid_auth_params["redirect_uri"],
"response_type": valid_auth_params["response_type"],
"state": valid_auth_params["state"],
"code_challenge": valid_auth_params["code_challenge"],
"code_challenge_method": valid_auth_params["code_challenge_method"],
"scope": "",
"me": valid_auth_params["me"],
}
# First attempt with wrong code
response1 = client.post("/authorize/verify-code", data=form_data)
assert response1.status_code == 200
assert "Invalid" in response1.text or "invalid" in response1.text.lower()
# Second attempt with correct code
form_data["code"] = "123456"
response2 = client.post("/authorize/verify-code", data=form_data)
assert response2.status_code == 200
assert "Authorization Request" in response2.text or "Authorize" in response2.text
finally:
app.dependency_overrides.clear()
class TestSecurityRequirements:
"""Tests for security requirements of the fix."""
def test_unverified_domain_never_sees_consent_directly(
self, configured_app, valid_auth_params
):
"""Critical: Unverified domains must NEVER see consent page directly."""
app, _ = configured_app
from gondulf.dependencies import get_verification_service, get_happ_parser
mock_service = create_mock_verification_service(start_success=True)
mock_parser = create_mock_happ_parser()
app.dependency_overrides[get_verification_service] = lambda: mock_service
app.dependency_overrides[get_happ_parser] = lambda: mock_parser
try:
with TestClient(app) as client:
response = client.get("/authorize", params=valid_auth_params)
# The consent page should NOT be shown
assert "Authorization Request" not in response.text
# Verify code page should be shown instead
assert "Verify Your Identity" in response.text
finally:
app.dependency_overrides.clear()
def test_state_parameter_preserved_through_flow(
self, configured_app, valid_auth_params
):
"""Test that state parameter is preserved through verification flow."""
app, _ = configured_app
from gondulf.dependencies import get_verification_service, get_happ_parser
mock_service = create_mock_verification_service(start_success=True)
mock_parser = create_mock_happ_parser()
app.dependency_overrides[get_verification_service] = lambda: mock_service
app.dependency_overrides[get_happ_parser] = lambda: mock_parser
try:
unique_state = "unique_state_abc123xyz"
params = valid_auth_params.copy()
params["state"] = unique_state
with TestClient(app) as client:
response = client.get("/authorize", params=params)
assert response.status_code == 200
# State should be in hidden form field
assert f'value="{unique_state}"' in response.text or f"value='{unique_state}'" in response.text
finally:
app.dependency_overrides.clear()