Compare commits
2 Commits
052d3ad3e1
...
9b50f359a6
| Author | SHA1 | Date | |
|---|---|---|---|
| 9b50f359a6 | |||
| 8dddc73826 |
@@ -5,6 +5,7 @@ Supports both IndieAuth flows per W3C specification:
|
||||
- Authorization (response_type=code): Returns access token, code redeemed at token endpoint
|
||||
"""
|
||||
import logging
|
||||
from datetime import datetime
|
||||
from typing import Optional
|
||||
from urllib.parse import urlencode
|
||||
|
||||
@@ -12,6 +13,7 @@ from fastapi import APIRouter, Depends, Form, Request, Response
|
||||
from fastapi.responses import HTMLResponse, JSONResponse, RedirectResponse
|
||||
from fastapi.templating import Jinja2Templates
|
||||
from pydantic import BaseModel
|
||||
from sqlalchemy import text
|
||||
|
||||
from gondulf.database.connection import Database
|
||||
from gondulf.dependencies import get_code_storage, get_database, get_happ_parser, get_verification_service
|
||||
@@ -20,6 +22,7 @@ from gondulf.services.happ_parser import HAppParser
|
||||
from gondulf.storage import CodeStore
|
||||
from gondulf.utils.validation import (
|
||||
extract_domain_from_url,
|
||||
mask_email,
|
||||
normalize_client_id,
|
||||
validate_redirect_uri,
|
||||
)
|
||||
@@ -43,6 +46,63 @@ class AuthenticationResponse(BaseModel):
|
||||
me: str
|
||||
|
||||
|
||||
async def check_domain_verified(database: Database, domain: str) -> bool:
|
||||
"""
|
||||
Check if domain is verified in the database.
|
||||
|
||||
Args:
|
||||
database: Database service
|
||||
domain: Domain to check (e.g., "example.com")
|
||||
|
||||
Returns:
|
||||
True if domain is verified, False otherwise
|
||||
"""
|
||||
try:
|
||||
engine = database.get_engine()
|
||||
with engine.connect() as conn:
|
||||
result = conn.execute(
|
||||
text("SELECT verified FROM domains WHERE domain = :domain AND verified = 1"),
|
||||
{"domain": domain}
|
||||
)
|
||||
row = result.fetchone()
|
||||
return row is not None
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to check domain verification: {e}")
|
||||
return False
|
||||
|
||||
|
||||
async def store_verified_domain(database: Database, domain: str, email: str) -> None:
|
||||
"""
|
||||
Store verified domain in database.
|
||||
|
||||
Args:
|
||||
database: Database service
|
||||
domain: Verified domain
|
||||
email: Email used for verification (for audit)
|
||||
"""
|
||||
try:
|
||||
engine = database.get_engine()
|
||||
now = datetime.utcnow()
|
||||
with engine.begin() as conn:
|
||||
# Use INSERT OR REPLACE for SQLite
|
||||
conn.execute(
|
||||
text("""
|
||||
INSERT OR REPLACE INTO domains
|
||||
(domain, email, verification_code, verified, verified_at, two_factor)
|
||||
VALUES (:domain, :email, '', 1, :verified_at, 1)
|
||||
"""),
|
||||
{
|
||||
"domain": domain,
|
||||
"email": email,
|
||||
"verified_at": now
|
||||
}
|
||||
)
|
||||
logger.info(f"Stored verified domain: {domain}")
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to store verified domain: {e}")
|
||||
raise
|
||||
|
||||
|
||||
@router.get("/authorize")
|
||||
async def authorize_get(
|
||||
request: Request,
|
||||
@@ -55,7 +115,8 @@ async def authorize_get(
|
||||
scope: str | None = None,
|
||||
me: str | None = None,
|
||||
database: Database = Depends(get_database),
|
||||
happ_parser: HAppParser = Depends(get_happ_parser)
|
||||
happ_parser: HAppParser = Depends(get_happ_parser),
|
||||
verification_service: DomainVerificationService = Depends(get_verification_service)
|
||||
) -> HTMLResponse:
|
||||
"""
|
||||
Handle authorization request (GET).
|
||||
@@ -79,9 +140,10 @@ async def authorize_get(
|
||||
me: User identity URL
|
||||
database: Database service
|
||||
happ_parser: H-app parser for client metadata
|
||||
verification_service: Domain verification service
|
||||
|
||||
Returns:
|
||||
HTML response with consent form or error page
|
||||
HTML response with consent form, verification form, or error page
|
||||
"""
|
||||
# Validate required parameters (pre-client validation)
|
||||
if not client_id:
|
||||
@@ -176,9 +238,9 @@ async def authorize_get(
|
||||
redirect_url = f"{redirect_uri}?{urlencode(error_params)}"
|
||||
return RedirectResponse(url=redirect_url, status_code=302)
|
||||
|
||||
# Validate me URL format
|
||||
# Validate me URL format and extract domain
|
||||
try:
|
||||
extract_domain_from_url(me)
|
||||
domain = extract_domain_from_url(me)
|
||||
except ValueError:
|
||||
error_params = {
|
||||
"error": "invalid_request",
|
||||
@@ -188,11 +250,71 @@ async def authorize_get(
|
||||
redirect_url = f"{redirect_uri}?{urlencode(error_params)}"
|
||||
return RedirectResponse(url=redirect_url, status_code=302)
|
||||
|
||||
# Check if domain is verified
|
||||
# For Phase 2, we'll show consent form immediately (domain verification happens separately)
|
||||
# In Phase 3, we'll check database for verified domains
|
||||
# SECURITY FIX: Check if domain is verified before showing consent
|
||||
is_verified = await check_domain_verified(database, domain)
|
||||
|
||||
if not is_verified:
|
||||
logger.info(f"Domain {domain} not verified, starting verification")
|
||||
|
||||
# Start two-factor verification
|
||||
result = verification_service.start_verification(domain, me)
|
||||
|
||||
if not result["success"]:
|
||||
# Verification cannot start (DNS failed, no rel=me, etc)
|
||||
error_message = result.get("error", "verification_failed")
|
||||
|
||||
# Map error codes to user-friendly messages
|
||||
error_messages = {
|
||||
"dns_verification_failed": "DNS verification failed. Please add the required TXT record.",
|
||||
"email_discovery_failed": "Could not find an email address on your homepage. Please add a rel='me' link to your email.",
|
||||
"invalid_email_format": "The email address discovered on your homepage is invalid.",
|
||||
"email_send_failed": "Failed to send verification email. Please try again."
|
||||
}
|
||||
friendly_error = error_messages.get(error_message, error_message)
|
||||
|
||||
logger.warning(f"Verification start failed for domain={domain}: {error_message}")
|
||||
|
||||
return templates.TemplateResponse(
|
||||
"verification_error.html",
|
||||
{
|
||||
"request": request,
|
||||
"error": friendly_error,
|
||||
"domain": domain,
|
||||
"client_id": normalized_client_id,
|
||||
"redirect_uri": redirect_uri,
|
||||
"response_type": effective_response_type,
|
||||
"state": state or "",
|
||||
"code_challenge": code_challenge,
|
||||
"code_challenge_method": code_challenge_method,
|
||||
"scope": scope or "",
|
||||
"me": me
|
||||
},
|
||||
status_code=200
|
||||
)
|
||||
|
||||
# Verification started - show code entry form
|
||||
logger.info(f"Verification code sent for domain={domain}")
|
||||
|
||||
return templates.TemplateResponse(
|
||||
"verify_code.html",
|
||||
{
|
||||
"request": request,
|
||||
"masked_email": result["email"],
|
||||
"domain": domain,
|
||||
"client_id": normalized_client_id,
|
||||
"redirect_uri": redirect_uri,
|
||||
"response_type": effective_response_type,
|
||||
"state": state or "",
|
||||
"code_challenge": code_challenge,
|
||||
"code_challenge_method": code_challenge_method,
|
||||
"scope": scope or "",
|
||||
"me": me
|
||||
}
|
||||
)
|
||||
|
||||
# Domain is verified - fetch client metadata and show consent form
|
||||
logger.info(f"Domain {domain} is verified, showing consent page")
|
||||
|
||||
# Fetch client metadata (h-app microformat)
|
||||
client_metadata = None
|
||||
try:
|
||||
client_metadata = await happ_parser.fetch_and_parse(normalized_client_id)
|
||||
@@ -219,6 +341,118 @@ async def authorize_get(
|
||||
)
|
||||
|
||||
|
||||
@router.post("/authorize/verify-code")
|
||||
async def authorize_verify_code(
|
||||
request: Request,
|
||||
domain: str = Form(...),
|
||||
code: str = Form(...),
|
||||
client_id: str = Form(...),
|
||||
redirect_uri: str = Form(...),
|
||||
response_type: str = Form("id"),
|
||||
state: str = Form(...),
|
||||
code_challenge: str = Form(...),
|
||||
code_challenge_method: str = Form(...),
|
||||
scope: str = Form(""),
|
||||
me: str = Form(...),
|
||||
database: Database = Depends(get_database),
|
||||
verification_service: DomainVerificationService = Depends(get_verification_service),
|
||||
happ_parser: HAppParser = Depends(get_happ_parser)
|
||||
) -> HTMLResponse:
|
||||
"""
|
||||
Handle verification code submission during authorization flow.
|
||||
|
||||
This endpoint is called when user submits the 6-digit email verification code.
|
||||
On success, shows consent page. On failure, shows code entry form with error.
|
||||
|
||||
Args:
|
||||
request: FastAPI request object
|
||||
domain: Domain being verified
|
||||
code: 6-digit verification code from email
|
||||
client_id: Client application identifier
|
||||
redirect_uri: Callback URI
|
||||
response_type: "id" for authentication, "code" for authorization
|
||||
state: Client state parameter
|
||||
code_challenge: PKCE code challenge
|
||||
code_challenge_method: PKCE method
|
||||
scope: Requested scope
|
||||
me: User identity URL
|
||||
database: Database service
|
||||
verification_service: Domain verification service
|
||||
happ_parser: H-app parser for client metadata
|
||||
|
||||
Returns:
|
||||
HTML response: consent page on success, code form with error on failure
|
||||
"""
|
||||
logger.info(f"Verification code submission for domain={domain}")
|
||||
|
||||
# Verify the code
|
||||
result = verification_service.verify_email_code(domain, code)
|
||||
|
||||
if not result["success"]:
|
||||
logger.warning(f"Verification code invalid for domain={domain}: {result.get('error')}")
|
||||
|
||||
# Get masked email for display
|
||||
email = verification_service.code_storage.get(f"email_addr:{domain}")
|
||||
masked = mask_email(email) if email else "unknown"
|
||||
|
||||
# Map error codes to user-friendly messages
|
||||
error_messages = {
|
||||
"invalid_code": "Invalid verification code. Please check and try again.",
|
||||
"email_not_found": "Verification session expired. Please start over."
|
||||
}
|
||||
error_message = result.get("error", "invalid_code")
|
||||
friendly_error = error_messages.get(error_message, error_message)
|
||||
|
||||
return templates.TemplateResponse(
|
||||
"verify_code.html",
|
||||
{
|
||||
"request": request,
|
||||
"error": friendly_error,
|
||||
"masked_email": masked,
|
||||
"domain": domain,
|
||||
"client_id": client_id,
|
||||
"redirect_uri": redirect_uri,
|
||||
"response_type": response_type,
|
||||
"state": state,
|
||||
"code_challenge": code_challenge,
|
||||
"code_challenge_method": code_challenge_method,
|
||||
"scope": scope,
|
||||
"me": me
|
||||
},
|
||||
status_code=200
|
||||
)
|
||||
|
||||
# Code valid - store verified domain in database
|
||||
email = result.get("email", "")
|
||||
await store_verified_domain(database, domain, email)
|
||||
|
||||
logger.info(f"Domain verified successfully: {domain}")
|
||||
|
||||
# Fetch client metadata for consent page
|
||||
client_metadata = None
|
||||
try:
|
||||
client_metadata = await happ_parser.fetch_and_parse(client_id)
|
||||
except Exception as e:
|
||||
logger.warning(f"Failed to fetch client metadata: {e}")
|
||||
|
||||
# Show consent form
|
||||
return templates.TemplateResponse(
|
||||
"authorize.html",
|
||||
{
|
||||
"request": request,
|
||||
"client_id": client_id,
|
||||
"redirect_uri": redirect_uri,
|
||||
"response_type": response_type,
|
||||
"state": state,
|
||||
"code_challenge": code_challenge,
|
||||
"code_challenge_method": code_challenge_method,
|
||||
"scope": scope,
|
||||
"me": me,
|
||||
"client_metadata": client_metadata
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
@router.post("/authorize/consent")
|
||||
async def authorize_consent(
|
||||
request: Request,
|
||||
|
||||
40
src/gondulf/templates/verification_error.html
Normal file
40
src/gondulf/templates/verification_error.html
Normal file
@@ -0,0 +1,40 @@
|
||||
{% extends "base.html" %}
|
||||
|
||||
{% block title %}Verification Failed - Gondulf{% endblock %}
|
||||
|
||||
{% block content %}
|
||||
<h1>Verification Failed</h1>
|
||||
|
||||
<div class="error">
|
||||
<p>{{ error }}</p>
|
||||
</div>
|
||||
|
||||
{% if "DNS" in error or "dns" in error %}
|
||||
<div class="instructions">
|
||||
<h2>How to Fix</h2>
|
||||
<p>Add the following DNS TXT record to your domain:</p>
|
||||
<code>
|
||||
Type: TXT<br>
|
||||
Name: _gondulf.{{ domain }}<br>
|
||||
Value: gondulf-verify-domain
|
||||
</code>
|
||||
<p>DNS changes may take up to 24 hours to propagate.</p>
|
||||
</div>
|
||||
{% endif %}
|
||||
|
||||
{% if "email" in error.lower() or "rel" in error.lower() %}
|
||||
<div class="instructions">
|
||||
<h2>How to Fix</h2>
|
||||
<p>Add a rel="me" link to your homepage pointing to your email:</p>
|
||||
<code><link rel="me" href="mailto:you@example.com"></code>
|
||||
<p>Or as an anchor tag:</p>
|
||||
<code><a rel="me" href="mailto:you@example.com">Email me</a></code>
|
||||
</div>
|
||||
{% endif %}
|
||||
|
||||
<p>
|
||||
<a href="/authorize?client_id={{ client_id }}&redirect_uri={{ redirect_uri }}&response_type={{ response_type }}&state={{ state }}&code_challenge={{ code_challenge }}&code_challenge_method={{ code_challenge_method }}&scope={{ scope }}&me={{ me }}">
|
||||
Try Again
|
||||
</a>
|
||||
</p>
|
||||
{% endblock %}
|
||||
51
src/gondulf/templates/verify_code.html
Normal file
51
src/gondulf/templates/verify_code.html
Normal file
@@ -0,0 +1,51 @@
|
||||
{% extends "base.html" %}
|
||||
|
||||
{% block title %}Verify Your Identity - Gondulf{% endblock %}
|
||||
|
||||
{% block content %}
|
||||
<h1>Verify Your Identity</h1>
|
||||
|
||||
<p>To sign in as <strong>{{ domain }}</strong>, please enter the verification code sent to <strong>{{ masked_email }}</strong>.</p>
|
||||
|
||||
{% if error %}
|
||||
<div class="error">
|
||||
<p>{{ error }}</p>
|
||||
</div>
|
||||
{% endif %}
|
||||
|
||||
<form method="POST" action="/authorize/verify-code">
|
||||
<!-- Pass through authorization parameters -->
|
||||
<input type="hidden" name="domain" value="{{ domain }}">
|
||||
<input type="hidden" name="client_id" value="{{ client_id }}">
|
||||
<input type="hidden" name="redirect_uri" value="{{ redirect_uri }}">
|
||||
<input type="hidden" name="response_type" value="{{ response_type }}">
|
||||
<input type="hidden" name="state" value="{{ state }}">
|
||||
<input type="hidden" name="code_challenge" value="{{ code_challenge }}">
|
||||
<input type="hidden" name="code_challenge_method" value="{{ code_challenge_method }}">
|
||||
<input type="hidden" name="scope" value="{{ scope }}">
|
||||
<input type="hidden" name="me" value="{{ me }}">
|
||||
|
||||
<div class="form-group">
|
||||
<label for="code">Verification Code:</label>
|
||||
<input type="text"
|
||||
id="code"
|
||||
name="code"
|
||||
placeholder="000000"
|
||||
maxlength="6"
|
||||
pattern="[0-9]{6}"
|
||||
inputmode="numeric"
|
||||
autocomplete="one-time-code"
|
||||
required
|
||||
autofocus>
|
||||
</div>
|
||||
|
||||
<button type="submit">Verify</button>
|
||||
</form>
|
||||
|
||||
<p class="help-text">
|
||||
Did not receive a code? Check your spam folder.
|
||||
<a href="/authorize?client_id={{ client_id }}&redirect_uri={{ redirect_uri }}&response_type={{ response_type }}&state={{ state }}&code_challenge={{ code_challenge }}&code_challenge_method={{ code_challenge_method }}&scope={{ scope }}&me={{ me }}">
|
||||
Request a new code
|
||||
</a>
|
||||
</p>
|
||||
{% endblock %}
|
||||
532
tests/integration/api/test_authorization_verification.py
Normal file
532
tests/integration/api/test_authorization_verification.py
Normal file
@@ -0,0 +1,532 @@
|
||||
"""
|
||||
Integration tests for authorization endpoint domain verification.
|
||||
|
||||
Tests the security fix that requires domain verification before showing the consent page.
|
||||
"""
|
||||
|
||||
from datetime import datetime
|
||||
from unittest.mock import AsyncMock, Mock, patch
|
||||
|
||||
import pytest
|
||||
from fastapi.testclient import TestClient
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def valid_auth_params():
|
||||
"""Valid authorization request parameters."""
|
||||
return {
|
||||
"client_id": "https://app.example.com",
|
||||
"redirect_uri": "https://app.example.com/callback",
|
||||
"response_type": "code",
|
||||
"state": "test123",
|
||||
"code_challenge": "E9Melhoa2OwvFrEMTJguCHaoeK1t8URWbuGJSstw-cM",
|
||||
"code_challenge_method": "S256",
|
||||
"me": "https://user.example.com",
|
||||
}
|
||||
|
||||
|
||||
def create_mock_verification_service(start_success=True, verify_success=True, start_error="dns_verification_failed"):
|
||||
"""Create a mock verification service with configurable behavior."""
|
||||
mock_service = Mock()
|
||||
|
||||
if start_success:
|
||||
mock_service.start_verification.return_value = {
|
||||
"success": True,
|
||||
"email": "t***@example.com",
|
||||
"verification_method": "email"
|
||||
}
|
||||
else:
|
||||
mock_service.start_verification.return_value = {
|
||||
"success": False,
|
||||
"error": start_error
|
||||
}
|
||||
|
||||
if verify_success:
|
||||
mock_service.verify_email_code.return_value = {
|
||||
"success": True,
|
||||
"email": "test@example.com"
|
||||
}
|
||||
else:
|
||||
mock_service.verify_email_code.return_value = {
|
||||
"success": False,
|
||||
"error": "invalid_code"
|
||||
}
|
||||
|
||||
mock_service.code_storage = Mock()
|
||||
mock_service.code_storage.get.return_value = "test@example.com"
|
||||
mock_service.create_authorization_code.return_value = "test_auth_code_12345"
|
||||
|
||||
return mock_service
|
||||
|
||||
|
||||
def create_mock_happ_parser():
|
||||
"""Create a mock h-app parser."""
|
||||
from gondulf.services.happ_parser import ClientMetadata
|
||||
|
||||
mock_parser = Mock()
|
||||
mock_parser.fetch_and_parse = AsyncMock(return_value=ClientMetadata(
|
||||
name="Test Application",
|
||||
url="https://app.example.com",
|
||||
logo="https://app.example.com/logo.png"
|
||||
))
|
||||
return mock_parser
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def configured_app(monkeypatch, tmp_path):
|
||||
"""Create a fully configured app with fresh database."""
|
||||
db_path = tmp_path / "test.db"
|
||||
|
||||
monkeypatch.setenv("GONDULF_SECRET_KEY", "a" * 32)
|
||||
monkeypatch.setenv("GONDULF_BASE_URL", "https://auth.example.com")
|
||||
monkeypatch.setenv("GONDULF_DATABASE_URL", f"sqlite:///{db_path}")
|
||||
monkeypatch.setenv("GONDULF_DEBUG", "true")
|
||||
|
||||
from gondulf.main import app
|
||||
return app, db_path
|
||||
|
||||
|
||||
class TestUnverifiedDomainTriggersVerification:
|
||||
"""Tests that unverified domains trigger the verification flow."""
|
||||
|
||||
def test_unverified_domain_shows_verification_form(
|
||||
self, configured_app, valid_auth_params
|
||||
):
|
||||
"""Test that an unverified domain shows the verification code form."""
|
||||
app, _ = configured_app
|
||||
from gondulf.dependencies import get_verification_service, get_happ_parser
|
||||
|
||||
mock_service = create_mock_verification_service(start_success=True)
|
||||
mock_parser = create_mock_happ_parser()
|
||||
|
||||
app.dependency_overrides[get_verification_service] = lambda: mock_service
|
||||
app.dependency_overrides[get_happ_parser] = lambda: mock_parser
|
||||
|
||||
try:
|
||||
with TestClient(app) as client:
|
||||
response = client.get("/authorize", params=valid_auth_params)
|
||||
|
||||
assert response.status_code == 200
|
||||
# Should show verification form, not consent form
|
||||
assert "Verify Your Identity" in response.text
|
||||
assert "verification code" in response.text.lower()
|
||||
# Should show masked email
|
||||
assert "t***@example.com" in response.text
|
||||
finally:
|
||||
app.dependency_overrides.clear()
|
||||
|
||||
def test_unverified_domain_preserves_auth_params(
|
||||
self, configured_app, valid_auth_params
|
||||
):
|
||||
"""Test that authorization parameters are preserved in verification form."""
|
||||
app, _ = configured_app
|
||||
from gondulf.dependencies import get_verification_service, get_happ_parser
|
||||
|
||||
mock_service = create_mock_verification_service(start_success=True)
|
||||
mock_parser = create_mock_happ_parser()
|
||||
|
||||
app.dependency_overrides[get_verification_service] = lambda: mock_service
|
||||
app.dependency_overrides[get_happ_parser] = lambda: mock_parser
|
||||
|
||||
try:
|
||||
with TestClient(app) as client:
|
||||
response = client.get("/authorize", params=valid_auth_params)
|
||||
|
||||
assert response.status_code == 200
|
||||
# Check hidden fields contain auth params
|
||||
assert 'name="client_id"' in response.text
|
||||
assert 'name="redirect_uri"' in response.text
|
||||
assert 'name="state"' in response.text
|
||||
assert 'name="code_challenge"' in response.text
|
||||
finally:
|
||||
app.dependency_overrides.clear()
|
||||
|
||||
def test_unverified_domain_does_not_show_consent(
|
||||
self, configured_app, valid_auth_params
|
||||
):
|
||||
"""Test that unverified domain does NOT show consent form directly."""
|
||||
app, _ = configured_app
|
||||
from gondulf.dependencies import get_verification_service, get_happ_parser
|
||||
|
||||
mock_service = create_mock_verification_service(start_success=True)
|
||||
mock_parser = create_mock_happ_parser()
|
||||
|
||||
app.dependency_overrides[get_verification_service] = lambda: mock_service
|
||||
app.dependency_overrides[get_happ_parser] = lambda: mock_parser
|
||||
|
||||
try:
|
||||
with TestClient(app) as client:
|
||||
response = client.get("/authorize", params=valid_auth_params)
|
||||
|
||||
assert response.status_code == 200
|
||||
# Should NOT show consent/authorization form
|
||||
assert "Authorization Request" not in response.text
|
||||
finally:
|
||||
app.dependency_overrides.clear()
|
||||
|
||||
|
||||
class TestVerifiedDomainShowsConsent:
|
||||
"""Tests that verified domains skip verification and show consent."""
|
||||
|
||||
def test_verified_domain_shows_consent_page(
|
||||
self, configured_app, valid_auth_params
|
||||
):
|
||||
"""Test that a verified domain shows consent page directly."""
|
||||
app, db_path = configured_app
|
||||
from gondulf.dependencies import get_happ_parser, get_database
|
||||
from gondulf.database.connection import Database
|
||||
from sqlalchemy import text
|
||||
|
||||
# Create database and insert verified domain
|
||||
db = Database(f"sqlite:///{db_path}")
|
||||
db.initialize()
|
||||
|
||||
with db.get_engine().begin() as conn:
|
||||
conn.execute(
|
||||
text("""
|
||||
INSERT INTO domains (domain, email, verification_code, verified, verified_at, two_factor)
|
||||
VALUES (:domain, :email, '', 1, :verified_at, 1)
|
||||
"""),
|
||||
{"domain": "user.example.com", "email": "test@example.com", "verified_at": datetime.utcnow()}
|
||||
)
|
||||
|
||||
# Override database to use same instance
|
||||
app.dependency_overrides[get_database] = lambda: db
|
||||
mock_parser = create_mock_happ_parser()
|
||||
app.dependency_overrides[get_happ_parser] = lambda: mock_parser
|
||||
|
||||
try:
|
||||
with TestClient(app) as client:
|
||||
response = client.get("/authorize", params=valid_auth_params)
|
||||
|
||||
# Should show consent page
|
||||
assert response.status_code == 200
|
||||
assert "Authorization Request" in response.text
|
||||
assert 'action="/authorize/consent"' in response.text
|
||||
finally:
|
||||
app.dependency_overrides.clear()
|
||||
|
||||
|
||||
class TestVerificationCodeValidation:
|
||||
"""Tests for the verification code submission endpoint."""
|
||||
|
||||
def test_valid_code_shows_consent(
|
||||
self, configured_app, valid_auth_params
|
||||
):
|
||||
"""Test that valid verification code shows consent page."""
|
||||
app, _ = configured_app
|
||||
from gondulf.dependencies import get_verification_service, get_happ_parser
|
||||
|
||||
mock_service = create_mock_verification_service(verify_success=True)
|
||||
mock_parser = create_mock_happ_parser()
|
||||
|
||||
app.dependency_overrides[get_verification_service] = lambda: mock_service
|
||||
app.dependency_overrides[get_happ_parser] = lambda: mock_parser
|
||||
|
||||
try:
|
||||
with TestClient(app) as client:
|
||||
form_data = {
|
||||
"domain": "user.example.com",
|
||||
"code": "123456",
|
||||
"client_id": valid_auth_params["client_id"],
|
||||
"redirect_uri": valid_auth_params["redirect_uri"],
|
||||
"response_type": valid_auth_params["response_type"],
|
||||
"state": valid_auth_params["state"],
|
||||
"code_challenge": valid_auth_params["code_challenge"],
|
||||
"code_challenge_method": valid_auth_params["code_challenge_method"],
|
||||
"scope": "",
|
||||
"me": valid_auth_params["me"],
|
||||
}
|
||||
|
||||
response = client.post("/authorize/verify-code", data=form_data)
|
||||
|
||||
assert response.status_code == 200
|
||||
# Should show consent page after successful verification
|
||||
assert "Authorization Request" in response.text or "Authorize" in response.text
|
||||
finally:
|
||||
app.dependency_overrides.clear()
|
||||
|
||||
def test_invalid_code_shows_error_with_retry(
|
||||
self, configured_app, valid_auth_params
|
||||
):
|
||||
"""Test that invalid code shows error and allows retry."""
|
||||
app, _ = configured_app
|
||||
from gondulf.dependencies import get_verification_service, get_happ_parser
|
||||
|
||||
mock_service = create_mock_verification_service(verify_success=False)
|
||||
mock_parser = create_mock_happ_parser()
|
||||
|
||||
app.dependency_overrides[get_verification_service] = lambda: mock_service
|
||||
app.dependency_overrides[get_happ_parser] = lambda: mock_parser
|
||||
|
||||
try:
|
||||
with TestClient(app) as client:
|
||||
form_data = {
|
||||
"domain": "user.example.com",
|
||||
"code": "000000",
|
||||
"client_id": valid_auth_params["client_id"],
|
||||
"redirect_uri": valid_auth_params["redirect_uri"],
|
||||
"response_type": valid_auth_params["response_type"],
|
||||
"state": valid_auth_params["state"],
|
||||
"code_challenge": valid_auth_params["code_challenge"],
|
||||
"code_challenge_method": valid_auth_params["code_challenge_method"],
|
||||
"scope": "",
|
||||
"me": valid_auth_params["me"],
|
||||
}
|
||||
|
||||
response = client.post("/authorize/verify-code", data=form_data)
|
||||
|
||||
assert response.status_code == 200
|
||||
# Should show verify_code page with error
|
||||
assert "Invalid verification code" in response.text or "invalid" in response.text.lower()
|
||||
# Should still have the form for retry
|
||||
assert 'name="code"' in response.text
|
||||
finally:
|
||||
app.dependency_overrides.clear()
|
||||
|
||||
|
||||
class TestDNSFailureHandling:
|
||||
"""Tests for DNS verification failure scenarios."""
|
||||
|
||||
def test_dns_failure_shows_instructions(
|
||||
self, configured_app, valid_auth_params
|
||||
):
|
||||
"""Test that DNS verification failure shows helpful instructions."""
|
||||
app, db_path = configured_app
|
||||
from gondulf.dependencies import get_verification_service, get_happ_parser, get_database
|
||||
from gondulf.database.connection import Database
|
||||
from sqlalchemy import text
|
||||
|
||||
# Clear any pre-existing verified domain to ensure test isolation
|
||||
db = Database(f"sqlite:///{db_path}")
|
||||
db.initialize()
|
||||
with db.get_engine().begin() as conn:
|
||||
conn.execute(text("DELETE FROM domains WHERE domain = :domain"), {"domain": "user.example.com"})
|
||||
|
||||
app.dependency_overrides[get_database] = lambda: db
|
||||
|
||||
mock_service = create_mock_verification_service(start_success=False, start_error="dns_verification_failed")
|
||||
mock_parser = create_mock_happ_parser()
|
||||
|
||||
app.dependency_overrides[get_verification_service] = lambda: mock_service
|
||||
app.dependency_overrides[get_happ_parser] = lambda: mock_parser
|
||||
|
||||
try:
|
||||
with TestClient(app) as client:
|
||||
response = client.get("/authorize", params=valid_auth_params)
|
||||
|
||||
assert response.status_code == 200
|
||||
# Should show error page with DNS instructions
|
||||
assert "DNS" in response.text or "dns" in response.text.lower()
|
||||
assert "TXT" in response.text
|
||||
assert "_gondulf" in response.text
|
||||
finally:
|
||||
app.dependency_overrides.clear()
|
||||
|
||||
|
||||
class TestEmailFailureHandling:
|
||||
"""Tests for email discovery failure scenarios."""
|
||||
|
||||
def test_email_discovery_failure_shows_instructions(
|
||||
self, configured_app, valid_auth_params
|
||||
):
|
||||
"""Test that email discovery failure shows helpful instructions."""
|
||||
app, db_path = configured_app
|
||||
from gondulf.dependencies import get_verification_service, get_happ_parser, get_database
|
||||
from gondulf.database.connection import Database
|
||||
from sqlalchemy import text
|
||||
|
||||
# Clear any pre-existing verified domain to ensure test isolation
|
||||
db = Database(f"sqlite:///{db_path}")
|
||||
db.initialize()
|
||||
with db.get_engine().begin() as conn:
|
||||
conn.execute(text("DELETE FROM domains WHERE domain = :domain"), {"domain": "user.example.com"})
|
||||
|
||||
app.dependency_overrides[get_database] = lambda: db
|
||||
|
||||
mock_service = create_mock_verification_service(start_success=False, start_error="email_discovery_failed")
|
||||
mock_parser = create_mock_happ_parser()
|
||||
|
||||
app.dependency_overrides[get_verification_service] = lambda: mock_service
|
||||
app.dependency_overrides[get_happ_parser] = lambda: mock_parser
|
||||
|
||||
try:
|
||||
with TestClient(app) as client:
|
||||
response = client.get("/authorize", params=valid_auth_params)
|
||||
|
||||
assert response.status_code == 200
|
||||
# Should show error page with email instructions
|
||||
assert "email" in response.text.lower()
|
||||
finally:
|
||||
app.dependency_overrides.clear()
|
||||
|
||||
|
||||
class TestFullVerificationFlow:
|
||||
"""Integration tests for the complete verification flow."""
|
||||
|
||||
def test_full_flow_new_domain(
|
||||
self, configured_app, valid_auth_params
|
||||
):
|
||||
"""Test complete flow: unverified domain -> verify code -> consent."""
|
||||
app, db_path = configured_app
|
||||
from gondulf.dependencies import get_verification_service, get_happ_parser, get_database
|
||||
from gondulf.database.connection import Database
|
||||
from sqlalchemy import text
|
||||
|
||||
# Clear any pre-existing verified domain to ensure test isolation
|
||||
db = Database(f"sqlite:///{db_path}")
|
||||
db.initialize()
|
||||
with db.get_engine().begin() as conn:
|
||||
conn.execute(text("DELETE FROM domains WHERE domain = :domain"), {"domain": "user.example.com"})
|
||||
|
||||
app.dependency_overrides[get_database] = lambda: db
|
||||
|
||||
mock_service = create_mock_verification_service(start_success=True, verify_success=True)
|
||||
mock_parser = create_mock_happ_parser()
|
||||
|
||||
app.dependency_overrides[get_verification_service] = lambda: mock_service
|
||||
app.dependency_overrides[get_happ_parser] = lambda: mock_parser
|
||||
|
||||
try:
|
||||
with TestClient(app) as client:
|
||||
# Step 1: GET /authorize -> should show verification form
|
||||
response1 = client.get("/authorize", params=valid_auth_params)
|
||||
|
||||
assert response1.status_code == 200
|
||||
assert "Verify Your Identity" in response1.text
|
||||
|
||||
# Step 2: POST /authorize/verify-code -> should show consent
|
||||
form_data = {
|
||||
"domain": "user.example.com",
|
||||
"code": "123456",
|
||||
"client_id": valid_auth_params["client_id"],
|
||||
"redirect_uri": valid_auth_params["redirect_uri"],
|
||||
"response_type": valid_auth_params["response_type"],
|
||||
"state": valid_auth_params["state"],
|
||||
"code_challenge": valid_auth_params["code_challenge"],
|
||||
"code_challenge_method": valid_auth_params["code_challenge_method"],
|
||||
"scope": "",
|
||||
"me": valid_auth_params["me"],
|
||||
}
|
||||
|
||||
response2 = client.post("/authorize/verify-code", data=form_data)
|
||||
|
||||
assert response2.status_code == 200
|
||||
# Should show consent page
|
||||
assert "Authorization Request" in response2.text or "Authorize" in response2.text
|
||||
finally:
|
||||
app.dependency_overrides.clear()
|
||||
|
||||
def test_verification_code_retry_with_correct_code(
|
||||
self, configured_app, valid_auth_params
|
||||
):
|
||||
"""Test that user can retry with correct code after failure."""
|
||||
app, _ = configured_app
|
||||
from gondulf.dependencies import get_verification_service, get_happ_parser
|
||||
|
||||
mock_service = Mock()
|
||||
# First verify_email_code call fails, second succeeds
|
||||
mock_service.verify_email_code.side_effect = [
|
||||
{"success": False, "error": "invalid_code"},
|
||||
{"success": True, "email": "test@example.com"}
|
||||
]
|
||||
mock_service.code_storage = Mock()
|
||||
mock_service.code_storage.get.return_value = "test@example.com"
|
||||
|
||||
mock_parser = create_mock_happ_parser()
|
||||
|
||||
app.dependency_overrides[get_verification_service] = lambda: mock_service
|
||||
app.dependency_overrides[get_happ_parser] = lambda: mock_parser
|
||||
|
||||
try:
|
||||
with TestClient(app) as client:
|
||||
form_data = {
|
||||
"domain": "user.example.com",
|
||||
"code": "000000", # Wrong code
|
||||
"client_id": valid_auth_params["client_id"],
|
||||
"redirect_uri": valid_auth_params["redirect_uri"],
|
||||
"response_type": valid_auth_params["response_type"],
|
||||
"state": valid_auth_params["state"],
|
||||
"code_challenge": valid_auth_params["code_challenge"],
|
||||
"code_challenge_method": valid_auth_params["code_challenge_method"],
|
||||
"scope": "",
|
||||
"me": valid_auth_params["me"],
|
||||
}
|
||||
|
||||
# First attempt with wrong code
|
||||
response1 = client.post("/authorize/verify-code", data=form_data)
|
||||
assert response1.status_code == 200
|
||||
assert "Invalid" in response1.text or "invalid" in response1.text.lower()
|
||||
|
||||
# Second attempt with correct code
|
||||
form_data["code"] = "123456"
|
||||
response2 = client.post("/authorize/verify-code", data=form_data)
|
||||
assert response2.status_code == 200
|
||||
assert "Authorization Request" in response2.text or "Authorize" in response2.text
|
||||
finally:
|
||||
app.dependency_overrides.clear()
|
||||
|
||||
|
||||
class TestSecurityRequirements:
|
||||
"""Tests for security requirements of the fix."""
|
||||
|
||||
def test_unverified_domain_never_sees_consent_directly(
|
||||
self, configured_app, valid_auth_params
|
||||
):
|
||||
"""Critical: Unverified domains must NEVER see consent page directly."""
|
||||
app, db_path = configured_app
|
||||
from gondulf.dependencies import get_verification_service, get_happ_parser, get_database
|
||||
from gondulf.database.connection import Database
|
||||
from sqlalchemy import text
|
||||
|
||||
# Clear any pre-existing verified domain to ensure test isolation
|
||||
db = Database(f"sqlite:///{db_path}")
|
||||
db.initialize()
|
||||
with db.get_engine().begin() as conn:
|
||||
conn.execute(text("DELETE FROM domains WHERE domain = :domain"), {"domain": "user.example.com"})
|
||||
|
||||
app.dependency_overrides[get_database] = lambda: db
|
||||
|
||||
mock_service = create_mock_verification_service(start_success=True)
|
||||
mock_parser = create_mock_happ_parser()
|
||||
|
||||
app.dependency_overrides[get_verification_service] = lambda: mock_service
|
||||
app.dependency_overrides[get_happ_parser] = lambda: mock_parser
|
||||
|
||||
try:
|
||||
with TestClient(app) as client:
|
||||
response = client.get("/authorize", params=valid_auth_params)
|
||||
|
||||
# The consent page should NOT be shown
|
||||
assert "Authorization Request" not in response.text
|
||||
# Verify code page should be shown instead
|
||||
assert "Verify Your Identity" in response.text
|
||||
finally:
|
||||
app.dependency_overrides.clear()
|
||||
|
||||
def test_state_parameter_preserved_through_flow(
|
||||
self, configured_app, valid_auth_params
|
||||
):
|
||||
"""Test that state parameter is preserved through verification flow."""
|
||||
app, _ = configured_app
|
||||
from gondulf.dependencies import get_verification_service, get_happ_parser
|
||||
|
||||
mock_service = create_mock_verification_service(start_success=True)
|
||||
mock_parser = create_mock_happ_parser()
|
||||
|
||||
app.dependency_overrides[get_verification_service] = lambda: mock_service
|
||||
app.dependency_overrides[get_happ_parser] = lambda: mock_parser
|
||||
|
||||
try:
|
||||
unique_state = "unique_state_abc123xyz"
|
||||
params = valid_auth_params.copy()
|
||||
params["state"] = unique_state
|
||||
|
||||
with TestClient(app) as client:
|
||||
response = client.get("/authorize", params=params)
|
||||
|
||||
assert response.status_code == 200
|
||||
# State should be in hidden form field
|
||||
assert f'value="{unique_state}"' in response.text or f"value='{unique_state}'" in response.text
|
||||
finally:
|
||||
app.dependency_overrides.clear()
|
||||
Reference in New Issue
Block a user