Compare commits
2 Commits
052d3ad3e1
...
v1.0.0-rc.
| Author | SHA1 | Date | |
|---|---|---|---|
| 9b50f359a6 | |||
| 8dddc73826 |
@@ -5,6 +5,7 @@ Supports both IndieAuth flows per W3C specification:
|
|||||||
- Authorization (response_type=code): Returns access token, code redeemed at token endpoint
|
- Authorization (response_type=code): Returns access token, code redeemed at token endpoint
|
||||||
"""
|
"""
|
||||||
import logging
|
import logging
|
||||||
|
from datetime import datetime
|
||||||
from typing import Optional
|
from typing import Optional
|
||||||
from urllib.parse import urlencode
|
from urllib.parse import urlencode
|
||||||
|
|
||||||
@@ -12,6 +13,7 @@ from fastapi import APIRouter, Depends, Form, Request, Response
|
|||||||
from fastapi.responses import HTMLResponse, JSONResponse, RedirectResponse
|
from fastapi.responses import HTMLResponse, JSONResponse, RedirectResponse
|
||||||
from fastapi.templating import Jinja2Templates
|
from fastapi.templating import Jinja2Templates
|
||||||
from pydantic import BaseModel
|
from pydantic import BaseModel
|
||||||
|
from sqlalchemy import text
|
||||||
|
|
||||||
from gondulf.database.connection import Database
|
from gondulf.database.connection import Database
|
||||||
from gondulf.dependencies import get_code_storage, get_database, get_happ_parser, get_verification_service
|
from gondulf.dependencies import get_code_storage, get_database, get_happ_parser, get_verification_service
|
||||||
@@ -20,6 +22,7 @@ from gondulf.services.happ_parser import HAppParser
|
|||||||
from gondulf.storage import CodeStore
|
from gondulf.storage import CodeStore
|
||||||
from gondulf.utils.validation import (
|
from gondulf.utils.validation import (
|
||||||
extract_domain_from_url,
|
extract_domain_from_url,
|
||||||
|
mask_email,
|
||||||
normalize_client_id,
|
normalize_client_id,
|
||||||
validate_redirect_uri,
|
validate_redirect_uri,
|
||||||
)
|
)
|
||||||
@@ -43,6 +46,63 @@ class AuthenticationResponse(BaseModel):
|
|||||||
me: str
|
me: str
|
||||||
|
|
||||||
|
|
||||||
|
async def check_domain_verified(database: Database, domain: str) -> bool:
|
||||||
|
"""
|
||||||
|
Check if domain is verified in the database.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
database: Database service
|
||||||
|
domain: Domain to check (e.g., "example.com")
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
True if domain is verified, False otherwise
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
engine = database.get_engine()
|
||||||
|
with engine.connect() as conn:
|
||||||
|
result = conn.execute(
|
||||||
|
text("SELECT verified FROM domains WHERE domain = :domain AND verified = 1"),
|
||||||
|
{"domain": domain}
|
||||||
|
)
|
||||||
|
row = result.fetchone()
|
||||||
|
return row is not None
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Failed to check domain verification: {e}")
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
async def store_verified_domain(database: Database, domain: str, email: str) -> None:
|
||||||
|
"""
|
||||||
|
Store verified domain in database.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
database: Database service
|
||||||
|
domain: Verified domain
|
||||||
|
email: Email used for verification (for audit)
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
engine = database.get_engine()
|
||||||
|
now = datetime.utcnow()
|
||||||
|
with engine.begin() as conn:
|
||||||
|
# Use INSERT OR REPLACE for SQLite
|
||||||
|
conn.execute(
|
||||||
|
text("""
|
||||||
|
INSERT OR REPLACE INTO domains
|
||||||
|
(domain, email, verification_code, verified, verified_at, two_factor)
|
||||||
|
VALUES (:domain, :email, '', 1, :verified_at, 1)
|
||||||
|
"""),
|
||||||
|
{
|
||||||
|
"domain": domain,
|
||||||
|
"email": email,
|
||||||
|
"verified_at": now
|
||||||
|
}
|
||||||
|
)
|
||||||
|
logger.info(f"Stored verified domain: {domain}")
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Failed to store verified domain: {e}")
|
||||||
|
raise
|
||||||
|
|
||||||
|
|
||||||
@router.get("/authorize")
|
@router.get("/authorize")
|
||||||
async def authorize_get(
|
async def authorize_get(
|
||||||
request: Request,
|
request: Request,
|
||||||
@@ -55,7 +115,8 @@ async def authorize_get(
|
|||||||
scope: str | None = None,
|
scope: str | None = None,
|
||||||
me: str | None = None,
|
me: str | None = None,
|
||||||
database: Database = Depends(get_database),
|
database: Database = Depends(get_database),
|
||||||
happ_parser: HAppParser = Depends(get_happ_parser)
|
happ_parser: HAppParser = Depends(get_happ_parser),
|
||||||
|
verification_service: DomainVerificationService = Depends(get_verification_service)
|
||||||
) -> HTMLResponse:
|
) -> HTMLResponse:
|
||||||
"""
|
"""
|
||||||
Handle authorization request (GET).
|
Handle authorization request (GET).
|
||||||
@@ -79,9 +140,10 @@ async def authorize_get(
|
|||||||
me: User identity URL
|
me: User identity URL
|
||||||
database: Database service
|
database: Database service
|
||||||
happ_parser: H-app parser for client metadata
|
happ_parser: H-app parser for client metadata
|
||||||
|
verification_service: Domain verification service
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
HTML response with consent form or error page
|
HTML response with consent form, verification form, or error page
|
||||||
"""
|
"""
|
||||||
# Validate required parameters (pre-client validation)
|
# Validate required parameters (pre-client validation)
|
||||||
if not client_id:
|
if not client_id:
|
||||||
@@ -176,9 +238,9 @@ async def authorize_get(
|
|||||||
redirect_url = f"{redirect_uri}?{urlencode(error_params)}"
|
redirect_url = f"{redirect_uri}?{urlencode(error_params)}"
|
||||||
return RedirectResponse(url=redirect_url, status_code=302)
|
return RedirectResponse(url=redirect_url, status_code=302)
|
||||||
|
|
||||||
# Validate me URL format
|
# Validate me URL format and extract domain
|
||||||
try:
|
try:
|
||||||
extract_domain_from_url(me)
|
domain = extract_domain_from_url(me)
|
||||||
except ValueError:
|
except ValueError:
|
||||||
error_params = {
|
error_params = {
|
||||||
"error": "invalid_request",
|
"error": "invalid_request",
|
||||||
@@ -188,11 +250,71 @@ async def authorize_get(
|
|||||||
redirect_url = f"{redirect_uri}?{urlencode(error_params)}"
|
redirect_url = f"{redirect_uri}?{urlencode(error_params)}"
|
||||||
return RedirectResponse(url=redirect_url, status_code=302)
|
return RedirectResponse(url=redirect_url, status_code=302)
|
||||||
|
|
||||||
# Check if domain is verified
|
# SECURITY FIX: Check if domain is verified before showing consent
|
||||||
# For Phase 2, we'll show consent form immediately (domain verification happens separately)
|
is_verified = await check_domain_verified(database, domain)
|
||||||
# In Phase 3, we'll check database for verified domains
|
|
||||||
|
if not is_verified:
|
||||||
|
logger.info(f"Domain {domain} not verified, starting verification")
|
||||||
|
|
||||||
|
# Start two-factor verification
|
||||||
|
result = verification_service.start_verification(domain, me)
|
||||||
|
|
||||||
|
if not result["success"]:
|
||||||
|
# Verification cannot start (DNS failed, no rel=me, etc)
|
||||||
|
error_message = result.get("error", "verification_failed")
|
||||||
|
|
||||||
|
# Map error codes to user-friendly messages
|
||||||
|
error_messages = {
|
||||||
|
"dns_verification_failed": "DNS verification failed. Please add the required TXT record.",
|
||||||
|
"email_discovery_failed": "Could not find an email address on your homepage. Please add a rel='me' link to your email.",
|
||||||
|
"invalid_email_format": "The email address discovered on your homepage is invalid.",
|
||||||
|
"email_send_failed": "Failed to send verification email. Please try again."
|
||||||
|
}
|
||||||
|
friendly_error = error_messages.get(error_message, error_message)
|
||||||
|
|
||||||
|
logger.warning(f"Verification start failed for domain={domain}: {error_message}")
|
||||||
|
|
||||||
|
return templates.TemplateResponse(
|
||||||
|
"verification_error.html",
|
||||||
|
{
|
||||||
|
"request": request,
|
||||||
|
"error": friendly_error,
|
||||||
|
"domain": domain,
|
||||||
|
"client_id": normalized_client_id,
|
||||||
|
"redirect_uri": redirect_uri,
|
||||||
|
"response_type": effective_response_type,
|
||||||
|
"state": state or "",
|
||||||
|
"code_challenge": code_challenge,
|
||||||
|
"code_challenge_method": code_challenge_method,
|
||||||
|
"scope": scope or "",
|
||||||
|
"me": me
|
||||||
|
},
|
||||||
|
status_code=200
|
||||||
|
)
|
||||||
|
|
||||||
|
# Verification started - show code entry form
|
||||||
|
logger.info(f"Verification code sent for domain={domain}")
|
||||||
|
|
||||||
|
return templates.TemplateResponse(
|
||||||
|
"verify_code.html",
|
||||||
|
{
|
||||||
|
"request": request,
|
||||||
|
"masked_email": result["email"],
|
||||||
|
"domain": domain,
|
||||||
|
"client_id": normalized_client_id,
|
||||||
|
"redirect_uri": redirect_uri,
|
||||||
|
"response_type": effective_response_type,
|
||||||
|
"state": state or "",
|
||||||
|
"code_challenge": code_challenge,
|
||||||
|
"code_challenge_method": code_challenge_method,
|
||||||
|
"scope": scope or "",
|
||||||
|
"me": me
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
# Domain is verified - fetch client metadata and show consent form
|
||||||
|
logger.info(f"Domain {domain} is verified, showing consent page")
|
||||||
|
|
||||||
# Fetch client metadata (h-app microformat)
|
|
||||||
client_metadata = None
|
client_metadata = None
|
||||||
try:
|
try:
|
||||||
client_metadata = await happ_parser.fetch_and_parse(normalized_client_id)
|
client_metadata = await happ_parser.fetch_and_parse(normalized_client_id)
|
||||||
@@ -219,6 +341,118 @@ async def authorize_get(
|
|||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@router.post("/authorize/verify-code")
|
||||||
|
async def authorize_verify_code(
|
||||||
|
request: Request,
|
||||||
|
domain: str = Form(...),
|
||||||
|
code: str = Form(...),
|
||||||
|
client_id: str = Form(...),
|
||||||
|
redirect_uri: str = Form(...),
|
||||||
|
response_type: str = Form("id"),
|
||||||
|
state: str = Form(...),
|
||||||
|
code_challenge: str = Form(...),
|
||||||
|
code_challenge_method: str = Form(...),
|
||||||
|
scope: str = Form(""),
|
||||||
|
me: str = Form(...),
|
||||||
|
database: Database = Depends(get_database),
|
||||||
|
verification_service: DomainVerificationService = Depends(get_verification_service),
|
||||||
|
happ_parser: HAppParser = Depends(get_happ_parser)
|
||||||
|
) -> HTMLResponse:
|
||||||
|
"""
|
||||||
|
Handle verification code submission during authorization flow.
|
||||||
|
|
||||||
|
This endpoint is called when user submits the 6-digit email verification code.
|
||||||
|
On success, shows consent page. On failure, shows code entry form with error.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
request: FastAPI request object
|
||||||
|
domain: Domain being verified
|
||||||
|
code: 6-digit verification code from email
|
||||||
|
client_id: Client application identifier
|
||||||
|
redirect_uri: Callback URI
|
||||||
|
response_type: "id" for authentication, "code" for authorization
|
||||||
|
state: Client state parameter
|
||||||
|
code_challenge: PKCE code challenge
|
||||||
|
code_challenge_method: PKCE method
|
||||||
|
scope: Requested scope
|
||||||
|
me: User identity URL
|
||||||
|
database: Database service
|
||||||
|
verification_service: Domain verification service
|
||||||
|
happ_parser: H-app parser for client metadata
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
HTML response: consent page on success, code form with error on failure
|
||||||
|
"""
|
||||||
|
logger.info(f"Verification code submission for domain={domain}")
|
||||||
|
|
||||||
|
# Verify the code
|
||||||
|
result = verification_service.verify_email_code(domain, code)
|
||||||
|
|
||||||
|
if not result["success"]:
|
||||||
|
logger.warning(f"Verification code invalid for domain={domain}: {result.get('error')}")
|
||||||
|
|
||||||
|
# Get masked email for display
|
||||||
|
email = verification_service.code_storage.get(f"email_addr:{domain}")
|
||||||
|
masked = mask_email(email) if email else "unknown"
|
||||||
|
|
||||||
|
# Map error codes to user-friendly messages
|
||||||
|
error_messages = {
|
||||||
|
"invalid_code": "Invalid verification code. Please check and try again.",
|
||||||
|
"email_not_found": "Verification session expired. Please start over."
|
||||||
|
}
|
||||||
|
error_message = result.get("error", "invalid_code")
|
||||||
|
friendly_error = error_messages.get(error_message, error_message)
|
||||||
|
|
||||||
|
return templates.TemplateResponse(
|
||||||
|
"verify_code.html",
|
||||||
|
{
|
||||||
|
"request": request,
|
||||||
|
"error": friendly_error,
|
||||||
|
"masked_email": masked,
|
||||||
|
"domain": domain,
|
||||||
|
"client_id": client_id,
|
||||||
|
"redirect_uri": redirect_uri,
|
||||||
|
"response_type": response_type,
|
||||||
|
"state": state,
|
||||||
|
"code_challenge": code_challenge,
|
||||||
|
"code_challenge_method": code_challenge_method,
|
||||||
|
"scope": scope,
|
||||||
|
"me": me
|
||||||
|
},
|
||||||
|
status_code=200
|
||||||
|
)
|
||||||
|
|
||||||
|
# Code valid - store verified domain in database
|
||||||
|
email = result.get("email", "")
|
||||||
|
await store_verified_domain(database, domain, email)
|
||||||
|
|
||||||
|
logger.info(f"Domain verified successfully: {domain}")
|
||||||
|
|
||||||
|
# Fetch client metadata for consent page
|
||||||
|
client_metadata = None
|
||||||
|
try:
|
||||||
|
client_metadata = await happ_parser.fetch_and_parse(client_id)
|
||||||
|
except Exception as e:
|
||||||
|
logger.warning(f"Failed to fetch client metadata: {e}")
|
||||||
|
|
||||||
|
# Show consent form
|
||||||
|
return templates.TemplateResponse(
|
||||||
|
"authorize.html",
|
||||||
|
{
|
||||||
|
"request": request,
|
||||||
|
"client_id": client_id,
|
||||||
|
"redirect_uri": redirect_uri,
|
||||||
|
"response_type": response_type,
|
||||||
|
"state": state,
|
||||||
|
"code_challenge": code_challenge,
|
||||||
|
"code_challenge_method": code_challenge_method,
|
||||||
|
"scope": scope,
|
||||||
|
"me": me,
|
||||||
|
"client_metadata": client_metadata
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
@router.post("/authorize/consent")
|
@router.post("/authorize/consent")
|
||||||
async def authorize_consent(
|
async def authorize_consent(
|
||||||
request: Request,
|
request: Request,
|
||||||
|
|||||||
40
src/gondulf/templates/verification_error.html
Normal file
40
src/gondulf/templates/verification_error.html
Normal file
@@ -0,0 +1,40 @@
|
|||||||
|
{% extends "base.html" %}
|
||||||
|
|
||||||
|
{% block title %}Verification Failed - Gondulf{% endblock %}
|
||||||
|
|
||||||
|
{% block content %}
|
||||||
|
<h1>Verification Failed</h1>
|
||||||
|
|
||||||
|
<div class="error">
|
||||||
|
<p>{{ error }}</p>
|
||||||
|
</div>
|
||||||
|
|
||||||
|
{% if "DNS" in error or "dns" in error %}
|
||||||
|
<div class="instructions">
|
||||||
|
<h2>How to Fix</h2>
|
||||||
|
<p>Add the following DNS TXT record to your domain:</p>
|
||||||
|
<code>
|
||||||
|
Type: TXT<br>
|
||||||
|
Name: _gondulf.{{ domain }}<br>
|
||||||
|
Value: gondulf-verify-domain
|
||||||
|
</code>
|
||||||
|
<p>DNS changes may take up to 24 hours to propagate.</p>
|
||||||
|
</div>
|
||||||
|
{% endif %}
|
||||||
|
|
||||||
|
{% if "email" in error.lower() or "rel" in error.lower() %}
|
||||||
|
<div class="instructions">
|
||||||
|
<h2>How to Fix</h2>
|
||||||
|
<p>Add a rel="me" link to your homepage pointing to your email:</p>
|
||||||
|
<code><link rel="me" href="mailto:you@example.com"></code>
|
||||||
|
<p>Or as an anchor tag:</p>
|
||||||
|
<code><a rel="me" href="mailto:you@example.com">Email me</a></code>
|
||||||
|
</div>
|
||||||
|
{% endif %}
|
||||||
|
|
||||||
|
<p>
|
||||||
|
<a href="/authorize?client_id={{ client_id }}&redirect_uri={{ redirect_uri }}&response_type={{ response_type }}&state={{ state }}&code_challenge={{ code_challenge }}&code_challenge_method={{ code_challenge_method }}&scope={{ scope }}&me={{ me }}">
|
||||||
|
Try Again
|
||||||
|
</a>
|
||||||
|
</p>
|
||||||
|
{% endblock %}
|
||||||
51
src/gondulf/templates/verify_code.html
Normal file
51
src/gondulf/templates/verify_code.html
Normal file
@@ -0,0 +1,51 @@
|
|||||||
|
{% extends "base.html" %}
|
||||||
|
|
||||||
|
{% block title %}Verify Your Identity - Gondulf{% endblock %}
|
||||||
|
|
||||||
|
{% block content %}
|
||||||
|
<h1>Verify Your Identity</h1>
|
||||||
|
|
||||||
|
<p>To sign in as <strong>{{ domain }}</strong>, please enter the verification code sent to <strong>{{ masked_email }}</strong>.</p>
|
||||||
|
|
||||||
|
{% if error %}
|
||||||
|
<div class="error">
|
||||||
|
<p>{{ error }}</p>
|
||||||
|
</div>
|
||||||
|
{% endif %}
|
||||||
|
|
||||||
|
<form method="POST" action="/authorize/verify-code">
|
||||||
|
<!-- Pass through authorization parameters -->
|
||||||
|
<input type="hidden" name="domain" value="{{ domain }}">
|
||||||
|
<input type="hidden" name="client_id" value="{{ client_id }}">
|
||||||
|
<input type="hidden" name="redirect_uri" value="{{ redirect_uri }}">
|
||||||
|
<input type="hidden" name="response_type" value="{{ response_type }}">
|
||||||
|
<input type="hidden" name="state" value="{{ state }}">
|
||||||
|
<input type="hidden" name="code_challenge" value="{{ code_challenge }}">
|
||||||
|
<input type="hidden" name="code_challenge_method" value="{{ code_challenge_method }}">
|
||||||
|
<input type="hidden" name="scope" value="{{ scope }}">
|
||||||
|
<input type="hidden" name="me" value="{{ me }}">
|
||||||
|
|
||||||
|
<div class="form-group">
|
||||||
|
<label for="code">Verification Code:</label>
|
||||||
|
<input type="text"
|
||||||
|
id="code"
|
||||||
|
name="code"
|
||||||
|
placeholder="000000"
|
||||||
|
maxlength="6"
|
||||||
|
pattern="[0-9]{6}"
|
||||||
|
inputmode="numeric"
|
||||||
|
autocomplete="one-time-code"
|
||||||
|
required
|
||||||
|
autofocus>
|
||||||
|
</div>
|
||||||
|
|
||||||
|
<button type="submit">Verify</button>
|
||||||
|
</form>
|
||||||
|
|
||||||
|
<p class="help-text">
|
||||||
|
Did not receive a code? Check your spam folder.
|
||||||
|
<a href="/authorize?client_id={{ client_id }}&redirect_uri={{ redirect_uri }}&response_type={{ response_type }}&state={{ state }}&code_challenge={{ code_challenge }}&code_challenge_method={{ code_challenge_method }}&scope={{ scope }}&me={{ me }}">
|
||||||
|
Request a new code
|
||||||
|
</a>
|
||||||
|
</p>
|
||||||
|
{% endblock %}
|
||||||
532
tests/integration/api/test_authorization_verification.py
Normal file
532
tests/integration/api/test_authorization_verification.py
Normal file
@@ -0,0 +1,532 @@
|
|||||||
|
"""
|
||||||
|
Integration tests for authorization endpoint domain verification.
|
||||||
|
|
||||||
|
Tests the security fix that requires domain verification before showing the consent page.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from datetime import datetime
|
||||||
|
from unittest.mock import AsyncMock, Mock, patch
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
from fastapi.testclient import TestClient
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def valid_auth_params():
|
||||||
|
"""Valid authorization request parameters."""
|
||||||
|
return {
|
||||||
|
"client_id": "https://app.example.com",
|
||||||
|
"redirect_uri": "https://app.example.com/callback",
|
||||||
|
"response_type": "code",
|
||||||
|
"state": "test123",
|
||||||
|
"code_challenge": "E9Melhoa2OwvFrEMTJguCHaoeK1t8URWbuGJSstw-cM",
|
||||||
|
"code_challenge_method": "S256",
|
||||||
|
"me": "https://user.example.com",
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def create_mock_verification_service(start_success=True, verify_success=True, start_error="dns_verification_failed"):
|
||||||
|
"""Create a mock verification service with configurable behavior."""
|
||||||
|
mock_service = Mock()
|
||||||
|
|
||||||
|
if start_success:
|
||||||
|
mock_service.start_verification.return_value = {
|
||||||
|
"success": True,
|
||||||
|
"email": "t***@example.com",
|
||||||
|
"verification_method": "email"
|
||||||
|
}
|
||||||
|
else:
|
||||||
|
mock_service.start_verification.return_value = {
|
||||||
|
"success": False,
|
||||||
|
"error": start_error
|
||||||
|
}
|
||||||
|
|
||||||
|
if verify_success:
|
||||||
|
mock_service.verify_email_code.return_value = {
|
||||||
|
"success": True,
|
||||||
|
"email": "test@example.com"
|
||||||
|
}
|
||||||
|
else:
|
||||||
|
mock_service.verify_email_code.return_value = {
|
||||||
|
"success": False,
|
||||||
|
"error": "invalid_code"
|
||||||
|
}
|
||||||
|
|
||||||
|
mock_service.code_storage = Mock()
|
||||||
|
mock_service.code_storage.get.return_value = "test@example.com"
|
||||||
|
mock_service.create_authorization_code.return_value = "test_auth_code_12345"
|
||||||
|
|
||||||
|
return mock_service
|
||||||
|
|
||||||
|
|
||||||
|
def create_mock_happ_parser():
|
||||||
|
"""Create a mock h-app parser."""
|
||||||
|
from gondulf.services.happ_parser import ClientMetadata
|
||||||
|
|
||||||
|
mock_parser = Mock()
|
||||||
|
mock_parser.fetch_and_parse = AsyncMock(return_value=ClientMetadata(
|
||||||
|
name="Test Application",
|
||||||
|
url="https://app.example.com",
|
||||||
|
logo="https://app.example.com/logo.png"
|
||||||
|
))
|
||||||
|
return mock_parser
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def configured_app(monkeypatch, tmp_path):
|
||||||
|
"""Create a fully configured app with fresh database."""
|
||||||
|
db_path = tmp_path / "test.db"
|
||||||
|
|
||||||
|
monkeypatch.setenv("GONDULF_SECRET_KEY", "a" * 32)
|
||||||
|
monkeypatch.setenv("GONDULF_BASE_URL", "https://auth.example.com")
|
||||||
|
monkeypatch.setenv("GONDULF_DATABASE_URL", f"sqlite:///{db_path}")
|
||||||
|
monkeypatch.setenv("GONDULF_DEBUG", "true")
|
||||||
|
|
||||||
|
from gondulf.main import app
|
||||||
|
return app, db_path
|
||||||
|
|
||||||
|
|
||||||
|
class TestUnverifiedDomainTriggersVerification:
|
||||||
|
"""Tests that unverified domains trigger the verification flow."""
|
||||||
|
|
||||||
|
def test_unverified_domain_shows_verification_form(
|
||||||
|
self, configured_app, valid_auth_params
|
||||||
|
):
|
||||||
|
"""Test that an unverified domain shows the verification code form."""
|
||||||
|
app, _ = configured_app
|
||||||
|
from gondulf.dependencies import get_verification_service, get_happ_parser
|
||||||
|
|
||||||
|
mock_service = create_mock_verification_service(start_success=True)
|
||||||
|
mock_parser = create_mock_happ_parser()
|
||||||
|
|
||||||
|
app.dependency_overrides[get_verification_service] = lambda: mock_service
|
||||||
|
app.dependency_overrides[get_happ_parser] = lambda: mock_parser
|
||||||
|
|
||||||
|
try:
|
||||||
|
with TestClient(app) as client:
|
||||||
|
response = client.get("/authorize", params=valid_auth_params)
|
||||||
|
|
||||||
|
assert response.status_code == 200
|
||||||
|
# Should show verification form, not consent form
|
||||||
|
assert "Verify Your Identity" in response.text
|
||||||
|
assert "verification code" in response.text.lower()
|
||||||
|
# Should show masked email
|
||||||
|
assert "t***@example.com" in response.text
|
||||||
|
finally:
|
||||||
|
app.dependency_overrides.clear()
|
||||||
|
|
||||||
|
def test_unverified_domain_preserves_auth_params(
|
||||||
|
self, configured_app, valid_auth_params
|
||||||
|
):
|
||||||
|
"""Test that authorization parameters are preserved in verification form."""
|
||||||
|
app, _ = configured_app
|
||||||
|
from gondulf.dependencies import get_verification_service, get_happ_parser
|
||||||
|
|
||||||
|
mock_service = create_mock_verification_service(start_success=True)
|
||||||
|
mock_parser = create_mock_happ_parser()
|
||||||
|
|
||||||
|
app.dependency_overrides[get_verification_service] = lambda: mock_service
|
||||||
|
app.dependency_overrides[get_happ_parser] = lambda: mock_parser
|
||||||
|
|
||||||
|
try:
|
||||||
|
with TestClient(app) as client:
|
||||||
|
response = client.get("/authorize", params=valid_auth_params)
|
||||||
|
|
||||||
|
assert response.status_code == 200
|
||||||
|
# Check hidden fields contain auth params
|
||||||
|
assert 'name="client_id"' in response.text
|
||||||
|
assert 'name="redirect_uri"' in response.text
|
||||||
|
assert 'name="state"' in response.text
|
||||||
|
assert 'name="code_challenge"' in response.text
|
||||||
|
finally:
|
||||||
|
app.dependency_overrides.clear()
|
||||||
|
|
||||||
|
def test_unverified_domain_does_not_show_consent(
|
||||||
|
self, configured_app, valid_auth_params
|
||||||
|
):
|
||||||
|
"""Test that unverified domain does NOT show consent form directly."""
|
||||||
|
app, _ = configured_app
|
||||||
|
from gondulf.dependencies import get_verification_service, get_happ_parser
|
||||||
|
|
||||||
|
mock_service = create_mock_verification_service(start_success=True)
|
||||||
|
mock_parser = create_mock_happ_parser()
|
||||||
|
|
||||||
|
app.dependency_overrides[get_verification_service] = lambda: mock_service
|
||||||
|
app.dependency_overrides[get_happ_parser] = lambda: mock_parser
|
||||||
|
|
||||||
|
try:
|
||||||
|
with TestClient(app) as client:
|
||||||
|
response = client.get("/authorize", params=valid_auth_params)
|
||||||
|
|
||||||
|
assert response.status_code == 200
|
||||||
|
# Should NOT show consent/authorization form
|
||||||
|
assert "Authorization Request" not in response.text
|
||||||
|
finally:
|
||||||
|
app.dependency_overrides.clear()
|
||||||
|
|
||||||
|
|
||||||
|
class TestVerifiedDomainShowsConsent:
|
||||||
|
"""Tests that verified domains skip verification and show consent."""
|
||||||
|
|
||||||
|
def test_verified_domain_shows_consent_page(
|
||||||
|
self, configured_app, valid_auth_params
|
||||||
|
):
|
||||||
|
"""Test that a verified domain shows consent page directly."""
|
||||||
|
app, db_path = configured_app
|
||||||
|
from gondulf.dependencies import get_happ_parser, get_database
|
||||||
|
from gondulf.database.connection import Database
|
||||||
|
from sqlalchemy import text
|
||||||
|
|
||||||
|
# Create database and insert verified domain
|
||||||
|
db = Database(f"sqlite:///{db_path}")
|
||||||
|
db.initialize()
|
||||||
|
|
||||||
|
with db.get_engine().begin() as conn:
|
||||||
|
conn.execute(
|
||||||
|
text("""
|
||||||
|
INSERT INTO domains (domain, email, verification_code, verified, verified_at, two_factor)
|
||||||
|
VALUES (:domain, :email, '', 1, :verified_at, 1)
|
||||||
|
"""),
|
||||||
|
{"domain": "user.example.com", "email": "test@example.com", "verified_at": datetime.utcnow()}
|
||||||
|
)
|
||||||
|
|
||||||
|
# Override database to use same instance
|
||||||
|
app.dependency_overrides[get_database] = lambda: db
|
||||||
|
mock_parser = create_mock_happ_parser()
|
||||||
|
app.dependency_overrides[get_happ_parser] = lambda: mock_parser
|
||||||
|
|
||||||
|
try:
|
||||||
|
with TestClient(app) as client:
|
||||||
|
response = client.get("/authorize", params=valid_auth_params)
|
||||||
|
|
||||||
|
# Should show consent page
|
||||||
|
assert response.status_code == 200
|
||||||
|
assert "Authorization Request" in response.text
|
||||||
|
assert 'action="/authorize/consent"' in response.text
|
||||||
|
finally:
|
||||||
|
app.dependency_overrides.clear()
|
||||||
|
|
||||||
|
|
||||||
|
class TestVerificationCodeValidation:
|
||||||
|
"""Tests for the verification code submission endpoint."""
|
||||||
|
|
||||||
|
def test_valid_code_shows_consent(
|
||||||
|
self, configured_app, valid_auth_params
|
||||||
|
):
|
||||||
|
"""Test that valid verification code shows consent page."""
|
||||||
|
app, _ = configured_app
|
||||||
|
from gondulf.dependencies import get_verification_service, get_happ_parser
|
||||||
|
|
||||||
|
mock_service = create_mock_verification_service(verify_success=True)
|
||||||
|
mock_parser = create_mock_happ_parser()
|
||||||
|
|
||||||
|
app.dependency_overrides[get_verification_service] = lambda: mock_service
|
||||||
|
app.dependency_overrides[get_happ_parser] = lambda: mock_parser
|
||||||
|
|
||||||
|
try:
|
||||||
|
with TestClient(app) as client:
|
||||||
|
form_data = {
|
||||||
|
"domain": "user.example.com",
|
||||||
|
"code": "123456",
|
||||||
|
"client_id": valid_auth_params["client_id"],
|
||||||
|
"redirect_uri": valid_auth_params["redirect_uri"],
|
||||||
|
"response_type": valid_auth_params["response_type"],
|
||||||
|
"state": valid_auth_params["state"],
|
||||||
|
"code_challenge": valid_auth_params["code_challenge"],
|
||||||
|
"code_challenge_method": valid_auth_params["code_challenge_method"],
|
||||||
|
"scope": "",
|
||||||
|
"me": valid_auth_params["me"],
|
||||||
|
}
|
||||||
|
|
||||||
|
response = client.post("/authorize/verify-code", data=form_data)
|
||||||
|
|
||||||
|
assert response.status_code == 200
|
||||||
|
# Should show consent page after successful verification
|
||||||
|
assert "Authorization Request" in response.text or "Authorize" in response.text
|
||||||
|
finally:
|
||||||
|
app.dependency_overrides.clear()
|
||||||
|
|
||||||
|
def test_invalid_code_shows_error_with_retry(
|
||||||
|
self, configured_app, valid_auth_params
|
||||||
|
):
|
||||||
|
"""Test that invalid code shows error and allows retry."""
|
||||||
|
app, _ = configured_app
|
||||||
|
from gondulf.dependencies import get_verification_service, get_happ_parser
|
||||||
|
|
||||||
|
mock_service = create_mock_verification_service(verify_success=False)
|
||||||
|
mock_parser = create_mock_happ_parser()
|
||||||
|
|
||||||
|
app.dependency_overrides[get_verification_service] = lambda: mock_service
|
||||||
|
app.dependency_overrides[get_happ_parser] = lambda: mock_parser
|
||||||
|
|
||||||
|
try:
|
||||||
|
with TestClient(app) as client:
|
||||||
|
form_data = {
|
||||||
|
"domain": "user.example.com",
|
||||||
|
"code": "000000",
|
||||||
|
"client_id": valid_auth_params["client_id"],
|
||||||
|
"redirect_uri": valid_auth_params["redirect_uri"],
|
||||||
|
"response_type": valid_auth_params["response_type"],
|
||||||
|
"state": valid_auth_params["state"],
|
||||||
|
"code_challenge": valid_auth_params["code_challenge"],
|
||||||
|
"code_challenge_method": valid_auth_params["code_challenge_method"],
|
||||||
|
"scope": "",
|
||||||
|
"me": valid_auth_params["me"],
|
||||||
|
}
|
||||||
|
|
||||||
|
response = client.post("/authorize/verify-code", data=form_data)
|
||||||
|
|
||||||
|
assert response.status_code == 200
|
||||||
|
# Should show verify_code page with error
|
||||||
|
assert "Invalid verification code" in response.text or "invalid" in response.text.lower()
|
||||||
|
# Should still have the form for retry
|
||||||
|
assert 'name="code"' in response.text
|
||||||
|
finally:
|
||||||
|
app.dependency_overrides.clear()
|
||||||
|
|
||||||
|
|
||||||
|
class TestDNSFailureHandling:
|
||||||
|
"""Tests for DNS verification failure scenarios."""
|
||||||
|
|
||||||
|
def test_dns_failure_shows_instructions(
|
||||||
|
self, configured_app, valid_auth_params
|
||||||
|
):
|
||||||
|
"""Test that DNS verification failure shows helpful instructions."""
|
||||||
|
app, db_path = configured_app
|
||||||
|
from gondulf.dependencies import get_verification_service, get_happ_parser, get_database
|
||||||
|
from gondulf.database.connection import Database
|
||||||
|
from sqlalchemy import text
|
||||||
|
|
||||||
|
# Clear any pre-existing verified domain to ensure test isolation
|
||||||
|
db = Database(f"sqlite:///{db_path}")
|
||||||
|
db.initialize()
|
||||||
|
with db.get_engine().begin() as conn:
|
||||||
|
conn.execute(text("DELETE FROM domains WHERE domain = :domain"), {"domain": "user.example.com"})
|
||||||
|
|
||||||
|
app.dependency_overrides[get_database] = lambda: db
|
||||||
|
|
||||||
|
mock_service = create_mock_verification_service(start_success=False, start_error="dns_verification_failed")
|
||||||
|
mock_parser = create_mock_happ_parser()
|
||||||
|
|
||||||
|
app.dependency_overrides[get_verification_service] = lambda: mock_service
|
||||||
|
app.dependency_overrides[get_happ_parser] = lambda: mock_parser
|
||||||
|
|
||||||
|
try:
|
||||||
|
with TestClient(app) as client:
|
||||||
|
response = client.get("/authorize", params=valid_auth_params)
|
||||||
|
|
||||||
|
assert response.status_code == 200
|
||||||
|
# Should show error page with DNS instructions
|
||||||
|
assert "DNS" in response.text or "dns" in response.text.lower()
|
||||||
|
assert "TXT" in response.text
|
||||||
|
assert "_gondulf" in response.text
|
||||||
|
finally:
|
||||||
|
app.dependency_overrides.clear()
|
||||||
|
|
||||||
|
|
||||||
|
class TestEmailFailureHandling:
|
||||||
|
"""Tests for email discovery failure scenarios."""
|
||||||
|
|
||||||
|
def test_email_discovery_failure_shows_instructions(
|
||||||
|
self, configured_app, valid_auth_params
|
||||||
|
):
|
||||||
|
"""Test that email discovery failure shows helpful instructions."""
|
||||||
|
app, db_path = configured_app
|
||||||
|
from gondulf.dependencies import get_verification_service, get_happ_parser, get_database
|
||||||
|
from gondulf.database.connection import Database
|
||||||
|
from sqlalchemy import text
|
||||||
|
|
||||||
|
# Clear any pre-existing verified domain to ensure test isolation
|
||||||
|
db = Database(f"sqlite:///{db_path}")
|
||||||
|
db.initialize()
|
||||||
|
with db.get_engine().begin() as conn:
|
||||||
|
conn.execute(text("DELETE FROM domains WHERE domain = :domain"), {"domain": "user.example.com"})
|
||||||
|
|
||||||
|
app.dependency_overrides[get_database] = lambda: db
|
||||||
|
|
||||||
|
mock_service = create_mock_verification_service(start_success=False, start_error="email_discovery_failed")
|
||||||
|
mock_parser = create_mock_happ_parser()
|
||||||
|
|
||||||
|
app.dependency_overrides[get_verification_service] = lambda: mock_service
|
||||||
|
app.dependency_overrides[get_happ_parser] = lambda: mock_parser
|
||||||
|
|
||||||
|
try:
|
||||||
|
with TestClient(app) as client:
|
||||||
|
response = client.get("/authorize", params=valid_auth_params)
|
||||||
|
|
||||||
|
assert response.status_code == 200
|
||||||
|
# Should show error page with email instructions
|
||||||
|
assert "email" in response.text.lower()
|
||||||
|
finally:
|
||||||
|
app.dependency_overrides.clear()
|
||||||
|
|
||||||
|
|
||||||
|
class TestFullVerificationFlow:
|
||||||
|
"""Integration tests for the complete verification flow."""
|
||||||
|
|
||||||
|
def test_full_flow_new_domain(
|
||||||
|
self, configured_app, valid_auth_params
|
||||||
|
):
|
||||||
|
"""Test complete flow: unverified domain -> verify code -> consent."""
|
||||||
|
app, db_path = configured_app
|
||||||
|
from gondulf.dependencies import get_verification_service, get_happ_parser, get_database
|
||||||
|
from gondulf.database.connection import Database
|
||||||
|
from sqlalchemy import text
|
||||||
|
|
||||||
|
# Clear any pre-existing verified domain to ensure test isolation
|
||||||
|
db = Database(f"sqlite:///{db_path}")
|
||||||
|
db.initialize()
|
||||||
|
with db.get_engine().begin() as conn:
|
||||||
|
conn.execute(text("DELETE FROM domains WHERE domain = :domain"), {"domain": "user.example.com"})
|
||||||
|
|
||||||
|
app.dependency_overrides[get_database] = lambda: db
|
||||||
|
|
||||||
|
mock_service = create_mock_verification_service(start_success=True, verify_success=True)
|
||||||
|
mock_parser = create_mock_happ_parser()
|
||||||
|
|
||||||
|
app.dependency_overrides[get_verification_service] = lambda: mock_service
|
||||||
|
app.dependency_overrides[get_happ_parser] = lambda: mock_parser
|
||||||
|
|
||||||
|
try:
|
||||||
|
with TestClient(app) as client:
|
||||||
|
# Step 1: GET /authorize -> should show verification form
|
||||||
|
response1 = client.get("/authorize", params=valid_auth_params)
|
||||||
|
|
||||||
|
assert response1.status_code == 200
|
||||||
|
assert "Verify Your Identity" in response1.text
|
||||||
|
|
||||||
|
# Step 2: POST /authorize/verify-code -> should show consent
|
||||||
|
form_data = {
|
||||||
|
"domain": "user.example.com",
|
||||||
|
"code": "123456",
|
||||||
|
"client_id": valid_auth_params["client_id"],
|
||||||
|
"redirect_uri": valid_auth_params["redirect_uri"],
|
||||||
|
"response_type": valid_auth_params["response_type"],
|
||||||
|
"state": valid_auth_params["state"],
|
||||||
|
"code_challenge": valid_auth_params["code_challenge"],
|
||||||
|
"code_challenge_method": valid_auth_params["code_challenge_method"],
|
||||||
|
"scope": "",
|
||||||
|
"me": valid_auth_params["me"],
|
||||||
|
}
|
||||||
|
|
||||||
|
response2 = client.post("/authorize/verify-code", data=form_data)
|
||||||
|
|
||||||
|
assert response2.status_code == 200
|
||||||
|
# Should show consent page
|
||||||
|
assert "Authorization Request" in response2.text or "Authorize" in response2.text
|
||||||
|
finally:
|
||||||
|
app.dependency_overrides.clear()
|
||||||
|
|
||||||
|
def test_verification_code_retry_with_correct_code(
|
||||||
|
self, configured_app, valid_auth_params
|
||||||
|
):
|
||||||
|
"""Test that user can retry with correct code after failure."""
|
||||||
|
app, _ = configured_app
|
||||||
|
from gondulf.dependencies import get_verification_service, get_happ_parser
|
||||||
|
|
||||||
|
mock_service = Mock()
|
||||||
|
# First verify_email_code call fails, second succeeds
|
||||||
|
mock_service.verify_email_code.side_effect = [
|
||||||
|
{"success": False, "error": "invalid_code"},
|
||||||
|
{"success": True, "email": "test@example.com"}
|
||||||
|
]
|
||||||
|
mock_service.code_storage = Mock()
|
||||||
|
mock_service.code_storage.get.return_value = "test@example.com"
|
||||||
|
|
||||||
|
mock_parser = create_mock_happ_parser()
|
||||||
|
|
||||||
|
app.dependency_overrides[get_verification_service] = lambda: mock_service
|
||||||
|
app.dependency_overrides[get_happ_parser] = lambda: mock_parser
|
||||||
|
|
||||||
|
try:
|
||||||
|
with TestClient(app) as client:
|
||||||
|
form_data = {
|
||||||
|
"domain": "user.example.com",
|
||||||
|
"code": "000000", # Wrong code
|
||||||
|
"client_id": valid_auth_params["client_id"],
|
||||||
|
"redirect_uri": valid_auth_params["redirect_uri"],
|
||||||
|
"response_type": valid_auth_params["response_type"],
|
||||||
|
"state": valid_auth_params["state"],
|
||||||
|
"code_challenge": valid_auth_params["code_challenge"],
|
||||||
|
"code_challenge_method": valid_auth_params["code_challenge_method"],
|
||||||
|
"scope": "",
|
||||||
|
"me": valid_auth_params["me"],
|
||||||
|
}
|
||||||
|
|
||||||
|
# First attempt with wrong code
|
||||||
|
response1 = client.post("/authorize/verify-code", data=form_data)
|
||||||
|
assert response1.status_code == 200
|
||||||
|
assert "Invalid" in response1.text or "invalid" in response1.text.lower()
|
||||||
|
|
||||||
|
# Second attempt with correct code
|
||||||
|
form_data["code"] = "123456"
|
||||||
|
response2 = client.post("/authorize/verify-code", data=form_data)
|
||||||
|
assert response2.status_code == 200
|
||||||
|
assert "Authorization Request" in response2.text or "Authorize" in response2.text
|
||||||
|
finally:
|
||||||
|
app.dependency_overrides.clear()
|
||||||
|
|
||||||
|
|
||||||
|
class TestSecurityRequirements:
|
||||||
|
"""Tests for security requirements of the fix."""
|
||||||
|
|
||||||
|
def test_unverified_domain_never_sees_consent_directly(
|
||||||
|
self, configured_app, valid_auth_params
|
||||||
|
):
|
||||||
|
"""Critical: Unverified domains must NEVER see consent page directly."""
|
||||||
|
app, db_path = configured_app
|
||||||
|
from gondulf.dependencies import get_verification_service, get_happ_parser, get_database
|
||||||
|
from gondulf.database.connection import Database
|
||||||
|
from sqlalchemy import text
|
||||||
|
|
||||||
|
# Clear any pre-existing verified domain to ensure test isolation
|
||||||
|
db = Database(f"sqlite:///{db_path}")
|
||||||
|
db.initialize()
|
||||||
|
with db.get_engine().begin() as conn:
|
||||||
|
conn.execute(text("DELETE FROM domains WHERE domain = :domain"), {"domain": "user.example.com"})
|
||||||
|
|
||||||
|
app.dependency_overrides[get_database] = lambda: db
|
||||||
|
|
||||||
|
mock_service = create_mock_verification_service(start_success=True)
|
||||||
|
mock_parser = create_mock_happ_parser()
|
||||||
|
|
||||||
|
app.dependency_overrides[get_verification_service] = lambda: mock_service
|
||||||
|
app.dependency_overrides[get_happ_parser] = lambda: mock_parser
|
||||||
|
|
||||||
|
try:
|
||||||
|
with TestClient(app) as client:
|
||||||
|
response = client.get("/authorize", params=valid_auth_params)
|
||||||
|
|
||||||
|
# The consent page should NOT be shown
|
||||||
|
assert "Authorization Request" not in response.text
|
||||||
|
# Verify code page should be shown instead
|
||||||
|
assert "Verify Your Identity" in response.text
|
||||||
|
finally:
|
||||||
|
app.dependency_overrides.clear()
|
||||||
|
|
||||||
|
def test_state_parameter_preserved_through_flow(
|
||||||
|
self, configured_app, valid_auth_params
|
||||||
|
):
|
||||||
|
"""Test that state parameter is preserved through verification flow."""
|
||||||
|
app, _ = configured_app
|
||||||
|
from gondulf.dependencies import get_verification_service, get_happ_parser
|
||||||
|
|
||||||
|
mock_service = create_mock_verification_service(start_success=True)
|
||||||
|
mock_parser = create_mock_happ_parser()
|
||||||
|
|
||||||
|
app.dependency_overrides[get_verification_service] = lambda: mock_service
|
||||||
|
app.dependency_overrides[get_happ_parser] = lambda: mock_parser
|
||||||
|
|
||||||
|
try:
|
||||||
|
unique_state = "unique_state_abc123xyz"
|
||||||
|
params = valid_auth_params.copy()
|
||||||
|
params["state"] = unique_state
|
||||||
|
|
||||||
|
with TestClient(app) as client:
|
||||||
|
response = client.get("/authorize", params=params)
|
||||||
|
|
||||||
|
assert response.status_code == 200
|
||||||
|
# State should be in hidden form field
|
||||||
|
assert f'value="{unique_state}"' in response.text or f"value='{unique_state}'" in response.text
|
||||||
|
finally:
|
||||||
|
app.dependency_overrides.clear()
|
||||||
Reference in New Issue
Block a user