Clear pre-existing verified domains at the start of each test that expects an unverified domain to ensure proper test isolation. This prevents test failures caused by verified domains persisting from earlier tests in the test session. Fixes: - test_dns_failure_shows_instructions - test_email_discovery_failure_shows_instructions - test_full_flow_new_domain - test_unverified_domain_never_sees_consent_directly 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
533 lines
21 KiB
Python
533 lines
21 KiB
Python
"""
|
|
Integration tests for authorization endpoint domain verification.
|
|
|
|
Tests the security fix that requires domain verification before showing the consent page.
|
|
"""
|
|
|
|
from datetime import datetime
|
|
from unittest.mock import AsyncMock, Mock, patch
|
|
|
|
import pytest
|
|
from fastapi.testclient import TestClient
|
|
|
|
|
|
@pytest.fixture
def valid_auth_params():
    """Provide a complete set of query parameters for an /authorize request.

    The ``me`` value points at user.example.com, the domain the tests in this
    module submit for verification; the code challenge uses method S256.
    """
    client = "https://app.example.com"
    return dict(
        client_id=client,
        redirect_uri=f"{client}/callback",
        response_type="code",
        state="test123",
        code_challenge="E9Melhoa2OwvFrEMTJguCHaoeK1t8URWbuGJSstw-cM",
        code_challenge_method="S256",
        me="https://user.example.com",
    )
|
|
|
|
|
|
def create_mock_verification_service(
    start_success: bool = True,
    verify_success: bool = True,
    start_error: str = "dns_verification_failed",
    verify_error: str = "invalid_code",
) -> Mock:
    """Build a ``Mock`` that stands in for the domain verification service.

    Args:
        start_success: When true, ``start_verification`` returns a success
            payload carrying a masked e-mail address; otherwise it returns a
            failure payload whose ``error`` is ``start_error``.
        verify_success: When true, ``verify_email_code`` returns a success
            payload carrying the full e-mail address; otherwise it returns a
            failure payload whose ``error`` is ``verify_error``.
        start_error: Error code reported by a failing ``start_verification``.
        verify_error: Error code reported by a failing ``verify_email_code``.
            Defaults to ``"invalid_code"``, the value that used to be
            hard-coded, so existing callers are unaffected.

    Returns:
        A ``Mock`` whose ``start_verification``, ``verify_email_code``,
        ``code_storage.get`` and ``create_authorization_code`` return canned
        values.
    """
    if start_success:
        start_result = {
            "success": True,
            "email": "t***@example.com",
            "verification_method": "email",
        }
    else:
        start_result = {"success": False, "error": start_error}

    if verify_success:
        verify_result = {"success": True, "email": "test@example.com"}
    else:
        verify_result = {"success": False, "error": verify_error}

    mock_service = Mock()
    mock_service.start_verification.return_value = start_result
    mock_service.verify_email_code.return_value = verify_result

    # Explicit Mock so code_storage.get() yields a real string rather than
    # an auto-generated child mock.
    mock_service.code_storage = Mock()
    mock_service.code_storage.get.return_value = "test@example.com"
    mock_service.create_authorization_code.return_value = "test_auth_code_12345"

    return mock_service
|
|
|
|
|
|
def create_mock_happ_parser():
    """Return a stand-in h-app parser.

    Its ``fetch_and_parse`` is an ``AsyncMock`` that resolves to a fixed
    ``ClientMetadata`` describing "Test Application".
    """
    # Imported lazily, as in the rest of this module's gondulf imports.
    from gondulf.services.happ_parser import ClientMetadata

    canned_metadata = ClientMetadata(
        name="Test Application",
        url="https://app.example.com",
        logo="https://app.example.com/logo.png",
    )

    parser_double = Mock()
    parser_double.fetch_and_parse = AsyncMock(return_value=canned_metadata)
    return parser_double
|
|
|
|
|
|
@pytest.fixture
def configured_app(monkeypatch, tmp_path):
    """Return ``(app, db_path)`` with GONDULF_* settings patched in.

    The database URL points at ``test.db`` inside pytest's per-test
    ``tmp_path``. The environment is patched before ``gondulf.main`` is
    imported.

    NOTE(review): ``gondulf.main`` is a regular module import, so if it was
    already imported earlier in the session these variables may not be
    re-read -- confirm against the application's configuration loading.
    """
    database_file = tmp_path / "test.db"

    environment = {
        "GONDULF_SECRET_KEY": "a" * 32,
        "GONDULF_BASE_URL": "https://auth.example.com",
        "GONDULF_DATABASE_URL": f"sqlite:///{database_file}",
        "GONDULF_DEBUG": "true",
    }
    for variable, value in environment.items():
        monkeypatch.setenv(variable, value)

    from gondulf.main import app

    return app, database_file
|
|
|
|
|
|
class TestUnverifiedDomainTriggersVerification:
    """An unverified domain must be sent into the verification flow."""

    @staticmethod
    def _install_mocks(app):
        """Point the verification-service and h-app-parser dependencies at mocks.

        The caller is responsible for clearing ``app.dependency_overrides``.
        """
        from gondulf.dependencies import get_happ_parser, get_verification_service

        verifier = create_mock_verification_service(start_success=True)
        parser = create_mock_happ_parser()

        app.dependency_overrides.update(
            {
                get_verification_service: lambda: verifier,
                get_happ_parser: lambda: parser,
            }
        )

    def test_unverified_domain_shows_verification_form(
        self, configured_app, valid_auth_params
    ):
        """GET /authorize renders the verification-code form with the masked e-mail."""
        app, _db_path = configured_app
        self._install_mocks(app)

        try:
            with TestClient(app) as http:
                page = http.get("/authorize", params=valid_auth_params)

                assert page.status_code == 200
                html = page.text
                # The verification form is expected here, not the consent form.
                assert "Verify Your Identity" in html
                assert "verification code" in html.lower()
                # The address reported by the mocked service is shown masked.
                assert "t***@example.com" in html
        finally:
            app.dependency_overrides.clear()

    def test_unverified_domain_preserves_auth_params(
        self, configured_app, valid_auth_params
    ):
        """The verification form carries the authorization parameters as fields."""
        app, _db_path = configured_app
        self._install_mocks(app)

        try:
            with TestClient(app) as http:
                page = http.get("/authorize", params=valid_auth_params)

                assert page.status_code == 200
                # Each of these must be present as a named form field.
                for field in ("client_id", "redirect_uri", "state", "code_challenge"):
                    assert f'name="{field}"' in page.text
        finally:
            app.dependency_overrides.clear()

    def test_unverified_domain_does_not_show_consent(
        self, configured_app, valid_auth_params
    ):
        """The consent form is not rendered directly for an unverified domain."""
        app, _db_path = configured_app
        self._install_mocks(app)

        try:
            with TestClient(app) as http:
                page = http.get("/authorize", params=valid_auth_params)

                assert page.status_code == 200
                # The consent page heading must be absent.
                assert "Authorization Request" not in page.text
        finally:
            app.dependency_overrides.clear()
|
|
|
|
|
|
class TestVerifiedDomainShowsConsent:
    """Tests that verified domains skip verification and show consent."""

    def test_verified_domain_shows_consent_page(
        self, configured_app, valid_auth_params
    ):
        """A domain stored as verified gets the consent page directly.

        A row for user.example.com with ``verified = 1`` is inserted into the
        ``domains`` table, the app is pointed at that same ``Database``
        instance, and GET /authorize must then render the consent form.
        """
        app, db_path = configured_app
        from datetime import timezone

        from gondulf.dependencies import get_happ_parser, get_database
        from gondulf.database.connection import Database
        from sqlalchemy import text

        # Create database and insert verified domain
        db = Database(f"sqlite:///{db_path}")
        db.initialize()

        # datetime.utcnow() is deprecated since Python 3.12. This produces the
        # same naive UTC timestamp without triggering a DeprecationWarning.
        verified_at = datetime.now(timezone.utc).replace(tzinfo=None)

        with db.get_engine().begin() as conn:
            conn.execute(
                text("""
                    INSERT INTO domains (domain, email, verification_code, verified, verified_at, two_factor)
                    VALUES (:domain, :email, '', 1, :verified_at, 1)
                """),
                {
                    "domain": "user.example.com",
                    "email": "test@example.com",
                    "verified_at": verified_at,
                },
            )

        # Override database to use same instance
        app.dependency_overrides[get_database] = lambda: db
        mock_parser = create_mock_happ_parser()
        app.dependency_overrides[get_happ_parser] = lambda: mock_parser

        try:
            with TestClient(app) as client:
                response = client.get("/authorize", params=valid_auth_params)

                # Should show consent page
                assert response.status_code == 200
                assert "Authorization Request" in response.text
                assert 'action="/authorize/consent"' in response.text
        finally:
            app.dependency_overrides.clear()
|
|
|
|
|
|
class TestVerificationCodeValidation:
    """Exercises POST /authorize/verify-code with accepted and rejected codes."""

    @staticmethod
    def _code_submission(auth_params, code):
        """Build the form payload posted to /authorize/verify-code."""
        payload = {"domain": "user.example.com", "code": code}
        passthrough = (
            "client_id",
            "redirect_uri",
            "response_type",
            "state",
            "code_challenge",
            "code_challenge_method",
        )
        payload.update({key: auth_params[key] for key in passthrough})
        payload["scope"] = ""
        payload["me"] = auth_params["me"]
        return payload

    @staticmethod
    def _install_mocks(app, verify_success):
        """Override the verification service and h-app parser with mocks.

        The caller is responsible for clearing ``app.dependency_overrides``.
        """
        from gondulf.dependencies import get_happ_parser, get_verification_service

        verifier = create_mock_verification_service(verify_success=verify_success)
        parser = create_mock_happ_parser()

        app.dependency_overrides.update(
            {
                get_verification_service: lambda: verifier,
                get_happ_parser: lambda: parser,
            }
        )

    def test_valid_code_shows_consent(self, configured_app, valid_auth_params):
        """An accepted verification code leads to the consent page."""
        app, _db_path = configured_app
        self._install_mocks(app, True)

        try:
            with TestClient(app) as http:
                submission = self._code_submission(valid_auth_params, "123456")
                page = http.post("/authorize/verify-code", data=submission)

                assert page.status_code == 200
                # Either consent-page marker is accepted.
                html = page.text
                assert "Authorization Request" in html or "Authorize" in html
        finally:
            app.dependency_overrides.clear()

    def test_invalid_code_shows_error_with_retry(
        self, configured_app, valid_auth_params
    ):
        """A rejected code re-renders the form with an error so the user can retry."""
        app, _db_path = configured_app
        self._install_mocks(app, False)

        try:
            with TestClient(app) as http:
                submission = self._code_submission(valid_auth_params, "000000")
                page = http.post("/authorize/verify-code", data=submission)

                assert page.status_code == 200
                html = page.text
                # An error message about the invalid code must be shown.
                assert "Invalid verification code" in html or "invalid" in html.lower()
                # The code input must still be there for another attempt.
                assert 'name="code"' in html
        finally:
            app.dependency_overrides.clear()
|
|
|
|
|
|
class TestDNSFailureHandling:
    """Covers the page shown when starting verification fails on DNS."""

    def test_dns_failure_shows_instructions(self, configured_app, valid_auth_params):
        """A ``dns_verification_failed`` start error renders DNS TXT instructions."""
        from sqlalchemy import text

        from gondulf.database.connection import Database
        from gondulf.dependencies import (
            get_database,
            get_happ_parser,
            get_verification_service,
        )

        app, db_path = configured_app

        # Remove any leftover row for this domain so the test starts from an
        # unverified state regardless of what ran before it.
        database = Database(f"sqlite:///{db_path}")
        database.initialize()
        with database.get_engine().begin() as connection:
            connection.execute(
                text("DELETE FROM domains WHERE domain = :domain"),
                {"domain": "user.example.com"},
            )
        app.dependency_overrides[get_database] = lambda: database

        failing_service = create_mock_verification_service(
            start_success=False, start_error="dns_verification_failed"
        )
        parser = create_mock_happ_parser()
        app.dependency_overrides[get_verification_service] = lambda: failing_service
        app.dependency_overrides[get_happ_parser] = lambda: parser

        try:
            with TestClient(app) as http:
                page = http.get("/authorize", params=valid_auth_params)

                assert page.status_code == 200
                html = page.text
                # The error page must explain the DNS TXT record to create.
                assert "DNS" in html or "dns" in html.lower()
                assert "TXT" in html
                assert "_gondulf" in html
        finally:
            app.dependency_overrides.clear()
|
|
|
|
|
|
class TestEmailFailureHandling:
    """Covers the page shown when no e-mail address can be discovered."""

    def test_email_discovery_failure_shows_instructions(
        self, configured_app, valid_auth_params
    ):
        """An ``email_discovery_failed`` start error renders e-mail guidance."""
        from sqlalchemy import text

        from gondulf.database.connection import Database
        from gondulf.dependencies import (
            get_database,
            get_happ_parser,
            get_verification_service,
        )

        app, db_path = configured_app

        # Remove any leftover row for this domain so the test starts from an
        # unverified state regardless of what ran before it.
        database = Database(f"sqlite:///{db_path}")
        database.initialize()
        with database.get_engine().begin() as connection:
            connection.execute(
                text("DELETE FROM domains WHERE domain = :domain"),
                {"domain": "user.example.com"},
            )
        app.dependency_overrides[get_database] = lambda: database

        failing_service = create_mock_verification_service(
            start_success=False, start_error="email_discovery_failed"
        )
        parser = create_mock_happ_parser()
        app.dependency_overrides[get_verification_service] = lambda: failing_service
        app.dependency_overrides[get_happ_parser] = lambda: parser

        try:
            with TestClient(app) as http:
                page = http.get("/authorize", params=valid_auth_params)

                assert page.status_code == 200
                # The error page is expected to mention e-mail.
                assert "email" in page.text.lower()
        finally:
            app.dependency_overrides.clear()
|
|
|
|
|
|
class TestFullVerificationFlow:
    """End-to-end checks of the authorize -> verify-code -> consent sequence."""

    @staticmethod
    def _code_submission(auth_params, code):
        """Build the form payload posted to /authorize/verify-code."""
        payload = {"domain": "user.example.com", "code": code}
        passthrough = (
            "client_id",
            "redirect_uri",
            "response_type",
            "state",
            "code_challenge",
            "code_challenge_method",
        )
        payload.update({key: auth_params[key] for key in passthrough})
        payload["scope"] = ""
        payload["me"] = auth_params["me"]
        return payload

    def test_full_flow_new_domain(self, configured_app, valid_auth_params):
        """Unverified domain: verification form first, consent after a valid code."""
        from sqlalchemy import text

        from gondulf.database.connection import Database
        from gondulf.dependencies import (
            get_database,
            get_happ_parser,
            get_verification_service,
        )

        app, db_path = configured_app

        # Remove any leftover row for this domain so the flow starts from an
        # unverified state regardless of what ran before it.
        database = Database(f"sqlite:///{db_path}")
        database.initialize()
        with database.get_engine().begin() as connection:
            connection.execute(
                text("DELETE FROM domains WHERE domain = :domain"),
                {"domain": "user.example.com"},
            )
        app.dependency_overrides[get_database] = lambda: database

        verifier = create_mock_verification_service(
            start_success=True, verify_success=True
        )
        parser = create_mock_happ_parser()
        app.dependency_overrides[get_verification_service] = lambda: verifier
        app.dependency_overrides[get_happ_parser] = lambda: parser

        try:
            with TestClient(app) as http:
                # Step 1: the authorization request yields the verification form.
                first = http.get("/authorize", params=valid_auth_params)

                assert first.status_code == 200
                assert "Verify Your Identity" in first.text

                # Step 2: submitting the code yields the consent page.
                submission = self._code_submission(valid_auth_params, "123456")
                second = http.post("/authorize/verify-code", data=submission)

                assert second.status_code == 200
                consent_html = second.text
                assert "Authorization Request" in consent_html or "Authorize" in consent_html
        finally:
            app.dependency_overrides.clear()

    def test_verification_code_retry_with_correct_code(
        self, configured_app, valid_auth_params
    ):
        """A rejected code can be followed by a successful second submission."""
        from gondulf.dependencies import get_happ_parser, get_verification_service

        app, _db_path = configured_app

        # The service rejects the first submission and accepts the second.
        verifier = Mock()
        verifier.verify_email_code.side_effect = [
            {"success": False, "error": "invalid_code"},
            {"success": True, "email": "test@example.com"},
        ]
        verifier.code_storage = Mock()
        verifier.code_storage.get.return_value = "test@example.com"

        parser = create_mock_happ_parser()

        app.dependency_overrides[get_verification_service] = lambda: verifier
        app.dependency_overrides[get_happ_parser] = lambda: parser

        try:
            with TestClient(app) as http:
                # First attempt: wrong code.
                wrong = self._code_submission(valid_auth_params, "000000")
                first = http.post("/authorize/verify-code", data=wrong)
                assert first.status_code == 200
                assert "Invalid" in first.text or "invalid" in first.text.lower()

                # Second attempt: same payload with the correct code.
                right = self._code_submission(valid_auth_params, "123456")
                second = http.post("/authorize/verify-code", data=right)
                assert second.status_code == 200
                assert "Authorization Request" in second.text or "Authorize" in second.text
        finally:
            app.dependency_overrides.clear()
|
|
|
|
|
|
class TestSecurityRequirements:
    """Security-focused assertions about the domain verification gate."""

    def test_unverified_domain_never_sees_consent_directly(
        self, configured_app, valid_auth_params
    ):
        """Critical: an unverified domain gets the verify page, never consent."""
        from sqlalchemy import text

        from gondulf.database.connection import Database
        from gondulf.dependencies import (
            get_database,
            get_happ_parser,
            get_verification_service,
        )

        app, db_path = configured_app

        # Remove any leftover row for this domain so the test starts from an
        # unverified state regardless of what ran before it.
        database = Database(f"sqlite:///{db_path}")
        database.initialize()
        with database.get_engine().begin() as connection:
            connection.execute(
                text("DELETE FROM domains WHERE domain = :domain"),
                {"domain": "user.example.com"},
            )
        app.dependency_overrides[get_database] = lambda: database

        verifier = create_mock_verification_service(start_success=True)
        parser = create_mock_happ_parser()
        app.dependency_overrides[get_verification_service] = lambda: verifier
        app.dependency_overrides[get_happ_parser] = lambda: parser

        try:
            with TestClient(app) as http:
                page = http.get("/authorize", params=valid_auth_params)
                html = page.text

                # Consent must be withheld...
                assert "Authorization Request" not in html
                # ...and the verification page shown in its place.
                assert "Verify Your Identity" in html
        finally:
            app.dependency_overrides.clear()

    def test_state_parameter_preserved_through_flow(
        self, configured_app, valid_auth_params
    ):
        """The caller's ``state`` value appears as a form field value in the response."""
        from gondulf.dependencies import get_happ_parser, get_verification_service

        app, _db_path = configured_app

        verifier = create_mock_verification_service(start_success=True)
        parser = create_mock_happ_parser()
        app.dependency_overrides[get_verification_service] = lambda: verifier
        app.dependency_overrides[get_happ_parser] = lambda: parser

        try:
            unique_state = "unique_state_abc123xyz"
            # Work on a copy so the fixture's dict is left untouched.
            request_params = {**valid_auth_params, "state": unique_state}

            with TestClient(app) as http:
                page = http.get("/authorize", params=request_params)

                assert page.status_code == 200
                html = page.text
                # Accept either quoting style for the attribute value.
                assert f'value="{unique_state}"' in html or f"value='{unique_state}'" in html
        finally:
            app.dependency_overrides.clear()
|