3 Commits

Author SHA1 Message Date
9b50f359a6 test(auth): fix test isolation for verification tests
Clear pre-existing verified domains at the start of each test that expects
an unverified domain to ensure proper test isolation. This prevents test
failures caused by verified domains persisting from earlier tests in the
test session.

Fixes:
- test_dns_failure_shows_instructions
- test_email_discovery_failure_shows_instructions
- test_full_flow_new_domain
- test_unverified_domain_never_sees_consent_directly

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-22 12:51:00 -07:00
8dddc73826 fix(security): require domain verification before authorization
CRITICAL SECURITY FIX: The authorization endpoint was bypassing domain
verification entirely, allowing anyone to authenticate as any domain.

Changes:
- Add domain verification check in GET /authorize before showing consent
- Add POST /authorize/verify-code endpoint for code validation
- Add verify_code.html and verification_error.html templates
- Add check_domain_verified() and store_verified_domain() functions
- Preserve OAuth parameters through verification flow

Flow for unverified domains:
1. GET /authorize -> Check DB for verified domain
2. If not verified: start 2FA (DNS + email) -> show code entry form
3. POST /authorize/verify-code -> validate code -> store verified
4. Show consent page
5. POST /authorize/consent -> issue authorization code

Verified domains skip directly to consent page.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-22 12:45:59 -07:00
052d3ad3e1 feat(auth): implement response_type=id authentication flow
Implements both IndieAuth flows per W3C specification:
- Authentication flow (response_type=id): Code redeemed at authorization endpoint, returns only user identity
- Authorization flow (response_type=code): Code redeemed at token endpoint, returns access token

Changes:
- Authorization endpoint GET: Accept response_type=id (default) and code
- Authorization endpoint POST: Handle code verification for authentication flow
- Token endpoint: Validate response_type=code for authorization flow
- Store response_type in authorization code metadata
- Update metadata endpoint: response_types_supported=[code, id], code_challenge_methods_supported=[S256]

The default behavior now correctly defaults to response_type=id when omitted, per IndieAuth spec section 5.2.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-22 12:23:20 -07:00
14 changed files with 1549 additions and 36 deletions

View File

@@ -1,17 +1,28 @@
"""Authorization endpoint for OAuth 2.0 / IndieAuth authorization code flow.""" """Authorization endpoint for OAuth 2.0 / IndieAuth authorization code flow.
Supports both IndieAuth flows per W3C specification:
- Authentication (response_type=id): Returns user identity only, code redeemed at authorization endpoint
- Authorization (response_type=code): Returns access token, code redeemed at token endpoint
"""
import logging import logging
from datetime import datetime
from typing import Optional
from urllib.parse import urlencode from urllib.parse import urlencode
from fastapi import APIRouter, Depends, Form, Request from fastapi import APIRouter, Depends, Form, Request, Response
from fastapi.responses import HTMLResponse, RedirectResponse from fastapi.responses import HTMLResponse, JSONResponse, RedirectResponse
from fastapi.templating import Jinja2Templates from fastapi.templating import Jinja2Templates
from pydantic import BaseModel
from sqlalchemy import text
from gondulf.database.connection import Database from gondulf.database.connection import Database
from gondulf.dependencies import get_database, get_happ_parser, get_verification_service from gondulf.dependencies import get_code_storage, get_database, get_happ_parser, get_verification_service
from gondulf.services.domain_verification import DomainVerificationService from gondulf.services.domain_verification import DomainVerificationService
from gondulf.services.happ_parser import HAppParser from gondulf.services.happ_parser import HAppParser
from gondulf.storage import CodeStore
from gondulf.utils.validation import ( from gondulf.utils.validation import (
extract_domain_from_url, extract_domain_from_url,
mask_email,
normalize_client_id, normalize_client_id,
validate_redirect_uri, validate_redirect_uri,
) )
@@ -21,6 +32,76 @@ logger = logging.getLogger("gondulf.authorization")
router = APIRouter() router = APIRouter()
templates = Jinja2Templates(directory="src/gondulf/templates") templates = Jinja2Templates(directory="src/gondulf/templates")
# Valid response types per IndieAuth spec
VALID_RESPONSE_TYPES = {"id", "code"}
class AuthenticationResponse(BaseModel):
"""
IndieAuth authentication response (response_type=id flow).
Per W3C IndieAuth specification Section 5.3.3:
https://www.w3.org/TR/indieauth/#authentication-response
"""
me: str
async def check_domain_verified(database: Database, domain: str) -> bool:
"""
Check if domain is verified in the database.
Args:
database: Database service
domain: Domain to check (e.g., "example.com")
Returns:
True if domain is verified, False otherwise
"""
try:
engine = database.get_engine()
with engine.connect() as conn:
result = conn.execute(
text("SELECT verified FROM domains WHERE domain = :domain AND verified = 1"),
{"domain": domain}
)
row = result.fetchone()
return row is not None
except Exception as e:
logger.error(f"Failed to check domain verification: {e}")
return False
async def store_verified_domain(database: Database, domain: str, email: str) -> None:
"""
Store verified domain in database.
Args:
database: Database service
domain: Verified domain
email: Email used for verification (for audit)
"""
try:
engine = database.get_engine()
now = datetime.utcnow()
with engine.begin() as conn:
# Use INSERT OR REPLACE for SQLite
conn.execute(
text("""
INSERT OR REPLACE INTO domains
(domain, email, verification_code, verified, verified_at, two_factor)
VALUES (:domain, :email, '', 1, :verified_at, 1)
"""),
{
"domain": domain,
"email": email,
"verified_at": now
}
)
logger.info(f"Stored verified domain: {domain}")
except Exception as e:
logger.error(f"Failed to store verified domain: {e}")
raise
@router.get("/authorize") @router.get("/authorize")
async def authorize_get( async def authorize_get(
@@ -34,7 +115,8 @@ async def authorize_get(
scope: str | None = None, scope: str | None = None,
me: str | None = None, me: str | None = None,
database: Database = Depends(get_database), database: Database = Depends(get_database),
happ_parser: HAppParser = Depends(get_happ_parser) happ_parser: HAppParser = Depends(get_happ_parser),
verification_service: DomainVerificationService = Depends(get_verification_service)
) -> HTMLResponse: ) -> HTMLResponse:
""" """
Handle authorization request (GET). Handle authorization request (GET).
@@ -42,20 +124,26 @@ async def authorize_get(
Validates client_id, redirect_uri, and required parameters. Validates client_id, redirect_uri, and required parameters.
Shows consent form if domain is verified, or verification form if not. Shows consent form if domain is verified, or verification form if not.
Supports two IndieAuth flows per W3C specification:
- response_type=id (default): Authentication only, returns user identity
- response_type=code: Authorization, returns access token
Args: Args:
request: FastAPI request object request: FastAPI request object
client_id: Client application identifier client_id: Client application identifier
redirect_uri: Callback URI for client redirect_uri: Callback URI for client
response_type: Must be "code" response_type: "id" (default) for authentication, "code" for authorization
state: Client state parameter state: Client state parameter
code_challenge: PKCE code challenge code_challenge: PKCE code challenge
code_challenge_method: PKCE method (S256) code_challenge_method: PKCE method (S256)
scope: Requested scope scope: Requested scope (only meaningful for response_type=code)
me: User identity URL me: User identity URL
database: Database service database: Database service
happ_parser: H-app parser for client metadata
verification_service: Domain verification service
Returns: Returns:
HTML response with consent form or error page HTML response with consent form, verification form, or error page
""" """
# Validate required parameters (pre-client validation) # Validate required parameters (pre-client validation)
if not client_id: if not client_id:
@@ -108,11 +196,13 @@ async def authorize_get(
# From here on, redirect errors to client via OAuth error redirect # From here on, redirect errors to client via OAuth error redirect
# Validate response_type # Validate response_type - default to "id" if not provided (per IndieAuth spec)
if response_type != "code": effective_response_type = response_type or "id"
if effective_response_type not in VALID_RESPONSE_TYPES:
error_params = { error_params = {
"error": "unsupported_response_type", "error": "unsupported_response_type",
"error_description": "Only response_type=code is supported", "error_description": f"response_type must be 'id' or 'code', got '{response_type}'",
"state": state or "" "state": state or ""
} }
redirect_url = f"{redirect_uri}?{urlencode(error_params)}" redirect_url = f"{redirect_uri}?{urlencode(error_params)}"
@@ -148,9 +238,9 @@ async def authorize_get(
redirect_url = f"{redirect_uri}?{urlencode(error_params)}" redirect_url = f"{redirect_uri}?{urlencode(error_params)}"
return RedirectResponse(url=redirect_url, status_code=302) return RedirectResponse(url=redirect_url, status_code=302)
# Validate me URL format # Validate me URL format and extract domain
try: try:
extract_domain_from_url(me) domain = extract_domain_from_url(me)
except ValueError: except ValueError:
error_params = { error_params = {
"error": "invalid_request", "error": "invalid_request",
@@ -160,11 +250,71 @@ async def authorize_get(
redirect_url = f"{redirect_uri}?{urlencode(error_params)}" redirect_url = f"{redirect_uri}?{urlencode(error_params)}"
return RedirectResponse(url=redirect_url, status_code=302) return RedirectResponse(url=redirect_url, status_code=302)
# Check if domain is verified # SECURITY FIX: Check if domain is verified before showing consent
# For Phase 2, we'll show consent form immediately (domain verification happens separately) is_verified = await check_domain_verified(database, domain)
# In Phase 3, we'll check database for verified domains
if not is_verified:
logger.info(f"Domain {domain} not verified, starting verification")
# Start two-factor verification
result = verification_service.start_verification(domain, me)
if not result["success"]:
# Verification cannot start (DNS failed, no rel=me, etc)
error_message = result.get("error", "verification_failed")
# Map error codes to user-friendly messages
error_messages = {
"dns_verification_failed": "DNS verification failed. Please add the required TXT record.",
"email_discovery_failed": "Could not find an email address on your homepage. Please add a rel='me' link to your email.",
"invalid_email_format": "The email address discovered on your homepage is invalid.",
"email_send_failed": "Failed to send verification email. Please try again."
}
friendly_error = error_messages.get(error_message, error_message)
logger.warning(f"Verification start failed for domain={domain}: {error_message}")
return templates.TemplateResponse(
"verification_error.html",
{
"request": request,
"error": friendly_error,
"domain": domain,
"client_id": normalized_client_id,
"redirect_uri": redirect_uri,
"response_type": effective_response_type,
"state": state or "",
"code_challenge": code_challenge,
"code_challenge_method": code_challenge_method,
"scope": scope or "",
"me": me
},
status_code=200
)
# Verification started - show code entry form
logger.info(f"Verification code sent for domain={domain}")
return templates.TemplateResponse(
"verify_code.html",
{
"request": request,
"masked_email": result["email"],
"domain": domain,
"client_id": normalized_client_id,
"redirect_uri": redirect_uri,
"response_type": effective_response_type,
"state": state or "",
"code_challenge": code_challenge,
"code_challenge_method": code_challenge_method,
"scope": scope or "",
"me": me
}
)
# Domain is verified - fetch client metadata and show consent form
logger.info(f"Domain {domain} is verified, showing consent page")
# Fetch client metadata (h-app microformat)
client_metadata = None client_metadata = None
try: try:
client_metadata = await happ_parser.fetch_and_parse(normalized_client_id) client_metadata = await happ_parser.fetch_and_parse(normalized_client_id)
@@ -180,6 +330,7 @@ async def authorize_get(
"request": request, "request": request,
"client_id": normalized_client_id, "client_id": normalized_client_id,
"redirect_uri": redirect_uri, "redirect_uri": redirect_uri,
"response_type": effective_response_type,
"state": state or "", "state": state or "",
"code_challenge": code_challenge, "code_challenge": code_challenge,
"code_challenge_method": code_challenge_method, "code_challenge_method": code_challenge_method,
@@ -190,11 +341,124 @@ async def authorize_get(
) )
@router.post("/authorize/verify-code")
async def authorize_verify_code(
request: Request,
domain: str = Form(...),
code: str = Form(...),
client_id: str = Form(...),
redirect_uri: str = Form(...),
response_type: str = Form("id"),
state: str = Form(...),
code_challenge: str = Form(...),
code_challenge_method: str = Form(...),
scope: str = Form(""),
me: str = Form(...),
database: Database = Depends(get_database),
verification_service: DomainVerificationService = Depends(get_verification_service),
happ_parser: HAppParser = Depends(get_happ_parser)
) -> HTMLResponse:
"""
Handle verification code submission during authorization flow.
This endpoint is called when user submits the 6-digit email verification code.
On success, shows consent page. On failure, shows code entry form with error.
Args:
request: FastAPI request object
domain: Domain being verified
code: 6-digit verification code from email
client_id: Client application identifier
redirect_uri: Callback URI
response_type: "id" for authentication, "code" for authorization
state: Client state parameter
code_challenge: PKCE code challenge
code_challenge_method: PKCE method
scope: Requested scope
me: User identity URL
database: Database service
verification_service: Domain verification service
happ_parser: H-app parser for client metadata
Returns:
HTML response: consent page on success, code form with error on failure
"""
logger.info(f"Verification code submission for domain={domain}")
# Verify the code
result = verification_service.verify_email_code(domain, code)
if not result["success"]:
logger.warning(f"Verification code invalid for domain={domain}: {result.get('error')}")
# Get masked email for display
email = verification_service.code_storage.get(f"email_addr:{domain}")
masked = mask_email(email) if email else "unknown"
# Map error codes to user-friendly messages
error_messages = {
"invalid_code": "Invalid verification code. Please check and try again.",
"email_not_found": "Verification session expired. Please start over."
}
error_message = result.get("error", "invalid_code")
friendly_error = error_messages.get(error_message, error_message)
return templates.TemplateResponse(
"verify_code.html",
{
"request": request,
"error": friendly_error,
"masked_email": masked,
"domain": domain,
"client_id": client_id,
"redirect_uri": redirect_uri,
"response_type": response_type,
"state": state,
"code_challenge": code_challenge,
"code_challenge_method": code_challenge_method,
"scope": scope,
"me": me
},
status_code=200
)
# Code valid - store verified domain in database
email = result.get("email", "")
await store_verified_domain(database, domain, email)
logger.info(f"Domain verified successfully: {domain}")
# Fetch client metadata for consent page
client_metadata = None
try:
client_metadata = await happ_parser.fetch_and_parse(client_id)
except Exception as e:
logger.warning(f"Failed to fetch client metadata: {e}")
# Show consent form
return templates.TemplateResponse(
"authorize.html",
{
"request": request,
"client_id": client_id,
"redirect_uri": redirect_uri,
"response_type": response_type,
"state": state,
"code_challenge": code_challenge,
"code_challenge_method": code_challenge_method,
"scope": scope,
"me": me,
"client_metadata": client_metadata
}
)
@router.post("/authorize/consent") @router.post("/authorize/consent")
async def authorize_consent( async def authorize_consent(
request: Request, request: Request,
client_id: str = Form(...), client_id: str = Form(...),
redirect_uri: str = Form(...), redirect_uri: str = Form(...),
response_type: str = Form("id"), # Default to "id" for authentication flow
state: str = Form(...), state: str = Form(...),
code_challenge: str = Form(...), code_challenge: str = Form(...),
code_challenge_method: str = Form(...), code_challenge_method: str = Form(...),
@@ -211,6 +475,7 @@ async def authorize_consent(
request: FastAPI request object request: FastAPI request object
client_id: Client application identifier client_id: Client application identifier
redirect_uri: Callback URI redirect_uri: Callback URI
response_type: "id" for authentication, "code" for authorization
state: Client state state: Client state
code_challenge: PKCE challenge code_challenge: PKCE challenge
code_challenge_method: PKCE method code_challenge_method: PKCE method
@@ -221,9 +486,9 @@ async def authorize_consent(
Returns: Returns:
Redirect to client callback with authorization code Redirect to client callback with authorization code
""" """
logger.info(f"Authorization consent granted for client_id={client_id}") logger.info(f"Authorization consent granted for client_id={client_id} response_type={response_type}")
# Create authorization code # Create authorization code with response_type metadata
authorization_code = verification_service.create_authorization_code( authorization_code = verification_service.create_authorization_code(
client_id=client_id, client_id=client_id,
redirect_uri=redirect_uri, redirect_uri=redirect_uri,
@@ -231,7 +496,8 @@ async def authorize_consent(
code_challenge=code_challenge, code_challenge=code_challenge,
code_challenge_method=code_challenge_method, code_challenge_method=code_challenge_method,
scope=scope, scope=scope,
me=me me=me,
response_type=response_type
) )
# Build redirect URL with authorization code # Build redirect URL with authorization code
@@ -243,3 +509,161 @@ async def authorize_consent(
logger.info(f"Redirecting to {redirect_uri} with authorization code") logger.info(f"Redirecting to {redirect_uri} with authorization code")
return RedirectResponse(url=redirect_url, status_code=302) return RedirectResponse(url=redirect_url, status_code=302)
@router.post("/authorize")
async def authorize_post(
response: Response,
code: str = Form(...),
client_id: str = Form(...),
redirect_uri: Optional[str] = Form(None),
code_verifier: Optional[str] = Form(None),
code_storage: CodeStore = Depends(get_code_storage)
) -> JSONResponse:
"""
Handle authorization code verification for authentication flow (response_type=id).
Per W3C IndieAuth specification Section 5.3.3:
https://www.w3.org/TR/indieauth/#redeeming-the-authorization-code-id
This endpoint is used ONLY for the authentication flow (response_type=id).
For the authorization flow (response_type=code), clients must use the token endpoint.
Request (application/x-www-form-urlencoded):
code: Authorization code from /authorize redirect
client_id: Client application URL (must match original request)
redirect_uri: Original redirect URI (optional but recommended)
code_verifier: PKCE verifier (optional, for PKCE validation)
Response (200 OK):
{
"me": "https://user.example.com/"
}
Error Response (400 Bad Request):
{
"error": "invalid_grant",
"error_description": "..."
}
Returns:
JSONResponse with user identity or error
"""
# Set cache headers (OAuth 2.0 best practice)
response.headers["Cache-Control"] = "no-store"
response.headers["Pragma"] = "no-cache"
logger.info(f"Authorization code verification request from client: {client_id}")
# STEP 1: Retrieve authorization code from storage
storage_key = f"authz:{code}"
code_data = code_storage.get(storage_key)
if code_data is None:
logger.warning(f"Authorization code not found or expired: {code[:8]}...")
return JSONResponse(
status_code=400,
content={
"error": "invalid_grant",
"error_description": "Authorization code is invalid or has expired"
},
headers={"Cache-Control": "no-store", "Pragma": "no-cache"}
)
# Validate code_data is a dict
if not isinstance(code_data, dict):
logger.error(f"Authorization code metadata is not a dict: {type(code_data)}")
return JSONResponse(
status_code=400,
content={
"error": "invalid_grant",
"error_description": "Authorization code is malformed"
},
headers={"Cache-Control": "no-store", "Pragma": "no-cache"}
)
# STEP 2: Validate this code was issued for response_type=id
stored_response_type = code_data.get('response_type', 'id')
if stored_response_type != 'id':
logger.warning(
f"Code redemption at authorization endpoint for response_type={stored_response_type}"
)
return JSONResponse(
status_code=400,
content={
"error": "invalid_grant",
"error_description": "Authorization code must be redeemed at the token endpoint"
},
headers={"Cache-Control": "no-store", "Pragma": "no-cache"}
)
# STEP 3: Validate client_id matches
if code_data.get('client_id') != client_id:
logger.warning(
f"Client ID mismatch: expected {code_data.get('client_id')}, got {client_id}"
)
return JSONResponse(
status_code=400,
content={
"error": "invalid_client",
"error_description": "Client ID does not match authorization code"
},
headers={"Cache-Control": "no-store", "Pragma": "no-cache"}
)
# STEP 4: Validate redirect_uri if provided
if redirect_uri and code_data.get('redirect_uri') != redirect_uri:
logger.warning(
f"Redirect URI mismatch: expected {code_data.get('redirect_uri')}, got {redirect_uri}"
)
return JSONResponse(
status_code=400,
content={
"error": "invalid_grant",
"error_description": "Redirect URI does not match authorization request"
},
headers={"Cache-Control": "no-store", "Pragma": "no-cache"}
)
# STEP 5: Check if code already used (prevent replay)
if code_data.get('used'):
logger.warning(f"Authorization code replay detected: {code[:8]}...")
return JSONResponse(
status_code=400,
content={
"error": "invalid_grant",
"error_description": "Authorization code has already been used"
},
headers={"Cache-Control": "no-store", "Pragma": "no-cache"}
)
# STEP 6: Extract user identity
me = code_data.get('me')
if not me:
logger.error("Authorization code missing 'me' parameter")
return JSONResponse(
status_code=400,
content={
"error": "invalid_grant",
"error_description": "Authorization code is malformed"
},
headers={"Cache-Control": "no-store", "Pragma": "no-cache"}
)
# STEP 7: PKCE validation (optional for authentication flow)
if code_verifier:
logger.debug(f"PKCE code_verifier provided but not validated (v1.0.0)")
# v1.1.0 will validate: SHA256(code_verifier) == code_challenge
# STEP 8: Delete authorization code (single-use enforcement)
code_storage.delete(storage_key)
logger.info(f"Authorization code verified and deleted: {code[:8]}...")
# STEP 9: Return authentication response with user identity
logger.info(f"Authentication successful for {me} (client: {client_id})")
return JSONResponse(
status_code=200,
content={"me": me},
headers={"Cache-Control": "no-store", "Pragma": "no-cache"}
)

View File

@@ -29,9 +29,9 @@ async def get_metadata(config: Config = Depends(get_config)) -> Response:
"issuer": config.BASE_URL, "issuer": config.BASE_URL,
"authorization_endpoint": f"{config.BASE_URL}/authorize", "authorization_endpoint": f"{config.BASE_URL}/authorize",
"token_endpoint": f"{config.BASE_URL}/token", "token_endpoint": f"{config.BASE_URL}/token",
"response_types_supported": ["code"], "response_types_supported": ["code", "id"],
"grant_types_supported": ["authorization_code"], "grant_types_supported": ["authorization_code"],
"code_challenge_methods_supported": [], "code_challenge_methods_supported": ["S256"],
"token_endpoint_auth_methods_supported": ["none"], "token_endpoint_auth_methods_supported": ["none"],
"revocation_endpoint_auth_methods_supported": ["none"], "revocation_endpoint_auth_methods_supported": ["none"],
"scopes_supported": [] "scopes_supported": []

View File

@@ -156,6 +156,21 @@ async def token_exchange(
} }
) )
# STEP 4.5: Validate this code was issued for response_type=code
# Codes with response_type=id must be redeemed at the authorization endpoint
stored_response_type = code_data.get('response_type', 'id')
if stored_response_type != 'code':
logger.warning(
f"Code redemption at token endpoint for response_type={stored_response_type}"
)
raise HTTPException(
status_code=400,
detail={
"error": "invalid_grant",
"error_description": "Authorization code must be redeemed at the authorization endpoint"
}
)
# STEP 5: Check if code already used (prevent replay) # STEP 5: Check if code already used (prevent replay)
if code_data.get('used'): if code_data.get('used'):
logger.error(f"Authorization code replay detected: {code[:8]}...") logger.error(f"Authorization code replay detected: {code[:8]}...")

View File

@@ -212,7 +212,8 @@ class DomainVerificationService:
code_challenge: str, code_challenge: str,
code_challenge_method: str, code_challenge_method: str,
scope: str, scope: str,
me: str me: str,
response_type: str = "id"
) -> str: ) -> str:
""" """
Create authorization code with metadata. Create authorization code with metadata.
@@ -225,6 +226,7 @@ class DomainVerificationService:
code_challenge_method: PKCE method (S256) code_challenge_method: PKCE method (S256)
scope: Requested scope scope: Requested scope
me: Verified user identity me: Verified user identity
response_type: "id" for authentication, "code" for authorization
Returns: Returns:
Authorization code Authorization code
@@ -232,7 +234,7 @@ class DomainVerificationService:
# Generate authorization code # Generate authorization code
authorization_code = self._generate_authorization_code() authorization_code = self._generate_authorization_code()
# Create metadata # Create metadata including response_type for flow determination during redemption
metadata = { metadata = {
"client_id": client_id, "client_id": client_id,
"redirect_uri": redirect_uri, "redirect_uri": redirect_uri,
@@ -241,6 +243,7 @@ class DomainVerificationService:
"code_challenge_method": code_challenge_method, "code_challenge_method": code_challenge_method,
"scope": scope, "scope": scope,
"me": me, "me": me,
"response_type": response_type,
"created_at": int(time.time()), "created_at": int(time.time()),
"expires_at": int(time.time()) + 600, "expires_at": int(time.time()) + 600,
"used": False "used": False

View File

@@ -36,6 +36,7 @@
<form method="POST" action="/authorize/consent"> <form method="POST" action="/authorize/consent">
<input type="hidden" name="client_id" value="{{ client_id }}"> <input type="hidden" name="client_id" value="{{ client_id }}">
<input type="hidden" name="redirect_uri" value="{{ redirect_uri }}"> <input type="hidden" name="redirect_uri" value="{{ redirect_uri }}">
<input type="hidden" name="response_type" value="{{ response_type }}">
<input type="hidden" name="state" value="{{ state }}"> <input type="hidden" name="state" value="{{ state }}">
<input type="hidden" name="code_challenge" value="{{ code_challenge }}"> <input type="hidden" name="code_challenge" value="{{ code_challenge }}">
<input type="hidden" name="code_challenge_method" value="{{ code_challenge_method }}"> <input type="hidden" name="code_challenge_method" value="{{ code_challenge_method }}">

View File

@@ -0,0 +1,40 @@
{% extends "base.html" %}
{% block title %}Verification Failed - Gondulf{% endblock %}
{% block content %}
<h1>Verification Failed</h1>
<div class="error">
<p>{{ error }}</p>
</div>
{% if "DNS" in error or "dns" in error %}
<div class="instructions">
<h2>How to Fix</h2>
<p>Add the following DNS TXT record to your domain:</p>
<code>
Type: TXT<br>
Name: _gondulf.{{ domain }}<br>
Value: gondulf-verify-domain
</code>
<p>DNS changes may take up to 24 hours to propagate.</p>
</div>
{% endif %}
{% if "email" in error.lower() or "rel" in error.lower() %}
<div class="instructions">
<h2>How to Fix</h2>
<p>Add a rel="me" link to your homepage pointing to your email:</p>
<code>&lt;link rel="me" href="mailto:you@example.com"&gt;</code>
<p>Or as an anchor tag:</p>
<code>&lt;a rel="me" href="mailto:you@example.com"&gt;Email me&lt;/a&gt;</code>
</div>
{% endif %}
<p>
<a href="/authorize?client_id={{ client_id }}&redirect_uri={{ redirect_uri }}&response_type={{ response_type }}&state={{ state }}&code_challenge={{ code_challenge }}&code_challenge_method={{ code_challenge_method }}&scope={{ scope }}&me={{ me }}">
Try Again
</a>
</p>
{% endblock %}

View File

@@ -0,0 +1,51 @@
{% extends "base.html" %}
{% block title %}Verify Your Identity - Gondulf{% endblock %}
{% block content %}
<h1>Verify Your Identity</h1>
<p>To sign in as <strong>{{ domain }}</strong>, please enter the verification code sent to <strong>{{ masked_email }}</strong>.</p>
{% if error %}
<div class="error">
<p>{{ error }}</p>
</div>
{% endif %}
<form method="POST" action="/authorize/verify-code">
<!-- Pass through authorization parameters -->
<input type="hidden" name="domain" value="{{ domain }}">
<input type="hidden" name="client_id" value="{{ client_id }}">
<input type="hidden" name="redirect_uri" value="{{ redirect_uri }}">
<input type="hidden" name="response_type" value="{{ response_type }}">
<input type="hidden" name="state" value="{{ state }}">
<input type="hidden" name="code_challenge" value="{{ code_challenge }}">
<input type="hidden" name="code_challenge_method" value="{{ code_challenge_method }}">
<input type="hidden" name="scope" value="{{ scope }}">
<input type="hidden" name="me" value="{{ me }}">
<div class="form-group">
<label for="code">Verification Code:</label>
<input type="text"
id="code"
name="code"
placeholder="000000"
maxlength="6"
pattern="[0-9]{6}"
inputmode="numeric"
autocomplete="one-time-code"
required
autofocus>
</div>
<button type="submit">Verify</button>
</form>
<p class="help-text">
Did not receive a code? Check your spam folder.
<a href="/authorize?client_id={{ client_id }}&redirect_uri={{ redirect_uri }}&response_type={{ response_type }}&state={{ state }}&code_challenge={{ code_challenge }}&code_challenge_method={{ code_challenge_method }}&scope={{ scope }}&me={{ me }}">
Request a new code
</a>
</p>
{% endblock %}

View File

@@ -131,7 +131,7 @@ def test_code_storage():
@pytest.fixture @pytest.fixture
def valid_auth_code(test_code_storage) -> tuple[str, dict]: def valid_auth_code(test_code_storage) -> tuple[str, dict]:
""" """
Create a valid authorization code with metadata. Create a valid authorization code with metadata (authorization flow).
Args: Args:
test_code_storage: Code storage fixture test_code_storage: Code storage fixture
@@ -143,6 +143,7 @@ def valid_auth_code(test_code_storage) -> tuple[str, dict]:
metadata = { metadata = {
"client_id": "https://client.example.com", "client_id": "https://client.example.com",
"redirect_uri": "https://client.example.com/callback", "redirect_uri": "https://client.example.com/callback",
"response_type": "code", # Authorization flow - exchange at token endpoint
"state": "xyz123", "state": "xyz123",
"me": "https://user.example.com", "me": "https://user.example.com",
"scope": "", "scope": "",
@@ -159,7 +160,7 @@ def valid_auth_code(test_code_storage) -> tuple[str, dict]:
@pytest.fixture @pytest.fixture
def expired_auth_code(test_code_storage) -> tuple[str, dict]: def expired_auth_code(test_code_storage) -> tuple[str, dict]:
""" """
Create an expired authorization code. Create an expired authorization code (authorization flow).
Returns: Returns:
Tuple of (code, metadata) where the code is expired Tuple of (code, metadata) where the code is expired
@@ -169,6 +170,7 @@ def expired_auth_code(test_code_storage) -> tuple[str, dict]:
metadata = { metadata = {
"client_id": "https://client.example.com", "client_id": "https://client.example.com",
"redirect_uri": "https://client.example.com/callback", "redirect_uri": "https://client.example.com/callback",
"response_type": "code", # Authorization flow
"state": "xyz123", "state": "xyz123",
"me": "https://user.example.com", "me": "https://user.example.com",
"scope": "", "scope": "",
@@ -186,7 +188,7 @@ def expired_auth_code(test_code_storage) -> tuple[str, dict]:
@pytest.fixture @pytest.fixture
def used_auth_code(test_code_storage) -> tuple[str, dict]: def used_auth_code(test_code_storage) -> tuple[str, dict]:
""" """
Create an already-used authorization code. Create an already-used authorization code (authorization flow).
Returns: Returns:
Tuple of (code, metadata) where the code is marked as used Tuple of (code, metadata) where the code is marked as used
@@ -195,6 +197,7 @@ def used_auth_code(test_code_storage) -> tuple[str, dict]:
metadata = { metadata = {
"client_id": "https://client.example.com", "client_id": "https://client.example.com",
"redirect_uri": "https://client.example.com/callback", "redirect_uri": "https://client.example.com/callback",
"response_type": "code", # Authorization flow
"state": "xyz123", "state": "xyz123",
"me": "https://user.example.com", "me": "https://user.example.com",
"scope": "", "scope": "",
@@ -474,13 +477,13 @@ def malicious_client() -> dict[str, Any]:
@pytest.fixture @pytest.fixture
def valid_auth_request() -> dict[str, str]: def valid_auth_request() -> dict[str, str]:
""" """
Complete valid authorization request parameters. Complete valid authorization request parameters (for authorization flow).
Returns: Returns:
Dict with all required authorization parameters Dict with all required authorization parameters
""" """
return { return {
"response_type": "code", "response_type": "code", # Authorization flow - exchange at token endpoint
"client_id": "https://app.example.com", "client_id": "https://app.example.com",
"redirect_uri": "https://app.example.com/callback", "redirect_uri": "https://app.example.com/callback",
"state": "random_state_12345", "state": "random_state_12345",

View File

@@ -76,6 +76,7 @@ class TestCompleteAuthorizationFlow:
consent_data = { consent_data = {
"client_id": "https://app.example.com", "client_id": "https://app.example.com",
"redirect_uri": "https://app.example.com/callback", "redirect_uri": "https://app.example.com/callback",
"response_type": "code", # Authorization flow - exchange at token endpoint
"state": "e2e_test_state_12345", "state": "e2e_test_state_12345",
"code_challenge": "E9Melhoa2OwvFrEMTJguCHaoeK1t8URWbuGJSstw-cM", "code_challenge": "E9Melhoa2OwvFrEMTJguCHaoeK1t8URWbuGJSstw-cM",
"code_challenge_method": "S256", "code_challenge_method": "S256",
@@ -139,6 +140,7 @@ class TestCompleteAuthorizationFlow:
data={ data={
"client_id": "https://app.example.com", "client_id": "https://app.example.com",
"redirect_uri": "https://app.example.com/callback", "redirect_uri": "https://app.example.com/callback",
"response_type": "code", # For state preservation test
"state": state, "state": state,
"code_challenge": "abc123", "code_challenge": "abc123",
"code_challenge_method": "S256", "code_challenge_method": "S256",
@@ -163,6 +165,7 @@ class TestCompleteAuthorizationFlow:
data={ data={
"client_id": "https://app.example.com", "client_id": "https://app.example.com",
"redirect_uri": "https://app.example.com/callback", "redirect_uri": "https://app.example.com/callback",
"response_type": "code", # Authorization flow - exchange at token endpoint
"state": f"flow_{i}", "state": f"flow_{i}",
"code_challenge": "abc123", "code_challenge": "abc123",
"code_challenge_method": "S256", "code_challenge_method": "S256",
@@ -217,6 +220,7 @@ class TestErrorScenariosE2E:
metadata = { metadata = {
"client_id": "https://app.example.com", "client_id": "https://app.example.com",
"redirect_uri": "https://app.example.com/callback", "redirect_uri": "https://app.example.com/callback",
"response_type": "code", # Authorization flow - exchange at token endpoint
"state": "test", "state": "test",
"me": "https://user.example.com", "me": "https://user.example.com",
"scope": "", "scope": "",
@@ -255,6 +259,7 @@ class TestErrorScenariosE2E:
data={ data={
"client_id": "https://app.example.com", "client_id": "https://app.example.com",
"redirect_uri": "https://app.example.com/callback", "redirect_uri": "https://app.example.com/callback",
"response_type": "code", # Authorization flow - exchange at token endpoint
"state": "test", "state": "test",
"code_challenge": "abc123", "code_challenge": "abc123",
"code_challenge_method": "S256", "code_challenge_method": "S256",
@@ -292,6 +297,7 @@ class TestErrorScenariosE2E:
data={ data={
"client_id": "https://app.example.com", "client_id": "https://app.example.com",
"redirect_uri": "https://app.example.com/callback", "redirect_uri": "https://app.example.com/callback",
"response_type": "code", # Authorization flow - exchange at token endpoint
"state": "test", "state": "test",
"code_challenge": "abc123", "code_challenge": "abc123",
"code_challenge_method": "S256", "code_challenge_method": "S256",
@@ -327,6 +333,7 @@ class TestTokenUsageE2E:
data={ data={
"client_id": "https://app.example.com", "client_id": "https://app.example.com",
"redirect_uri": "https://app.example.com/callback", "redirect_uri": "https://app.example.com/callback",
"response_type": "code", # Authorization flow - exchange at token endpoint
"state": "test", "state": "test",
"code_challenge": "abc123", "code_challenge": "abc123",
"code_challenge_method": "S256", "code_challenge_method": "S256",
@@ -362,6 +369,7 @@ class TestTokenUsageE2E:
data={ data={
"client_id": "https://app.example.com", "client_id": "https://app.example.com",
"redirect_uri": "https://app.example.com/callback", "redirect_uri": "https://app.example.com/callback",
"response_type": "code", # Authorization flow - exchange at token endpoint
"state": "test", "state": "test",
"code_challenge": "abc123", "code_challenge": "abc123",
"code_challenge_method": "S256", "code_challenge_method": "S256",

View File

@@ -0,0 +1,532 @@
"""
Integration tests for authorization endpoint domain verification.
Tests the security fix that requires domain verification before showing the consent page.
"""
from datetime import datetime
from unittest.mock import AsyncMock, Mock, patch
import pytest
from fastapi.testclient import TestClient
@pytest.fixture
def valid_auth_params():
"""Valid authorization request parameters."""
return {
"client_id": "https://app.example.com",
"redirect_uri": "https://app.example.com/callback",
"response_type": "code",
"state": "test123",
"code_challenge": "E9Melhoa2OwvFrEMTJguCHaoeK1t8URWbuGJSstw-cM",
"code_challenge_method": "S256",
"me": "https://user.example.com",
}
def create_mock_verification_service(start_success=True, verify_success=True, start_error="dns_verification_failed"):
"""Create a mock verification service with configurable behavior."""
mock_service = Mock()
if start_success:
mock_service.start_verification.return_value = {
"success": True,
"email": "t***@example.com",
"verification_method": "email"
}
else:
mock_service.start_verification.return_value = {
"success": False,
"error": start_error
}
if verify_success:
mock_service.verify_email_code.return_value = {
"success": True,
"email": "test@example.com"
}
else:
mock_service.verify_email_code.return_value = {
"success": False,
"error": "invalid_code"
}
mock_service.code_storage = Mock()
mock_service.code_storage.get.return_value = "test@example.com"
mock_service.create_authorization_code.return_value = "test_auth_code_12345"
return mock_service
def create_mock_happ_parser():
"""Create a mock h-app parser."""
from gondulf.services.happ_parser import ClientMetadata
mock_parser = Mock()
mock_parser.fetch_and_parse = AsyncMock(return_value=ClientMetadata(
name="Test Application",
url="https://app.example.com",
logo="https://app.example.com/logo.png"
))
return mock_parser
@pytest.fixture
def configured_app(monkeypatch, tmp_path):
"""Create a fully configured app with fresh database."""
db_path = tmp_path / "test.db"
monkeypatch.setenv("GONDULF_SECRET_KEY", "a" * 32)
monkeypatch.setenv("GONDULF_BASE_URL", "https://auth.example.com")
monkeypatch.setenv("GONDULF_DATABASE_URL", f"sqlite:///{db_path}")
monkeypatch.setenv("GONDULF_DEBUG", "true")
from gondulf.main import app
return app, db_path
class TestUnverifiedDomainTriggersVerification:
"""Tests that unverified domains trigger the verification flow."""
def test_unverified_domain_shows_verification_form(
self, configured_app, valid_auth_params
):
"""Test that an unverified domain shows the verification code form."""
app, _ = configured_app
from gondulf.dependencies import get_verification_service, get_happ_parser
mock_service = create_mock_verification_service(start_success=True)
mock_parser = create_mock_happ_parser()
app.dependency_overrides[get_verification_service] = lambda: mock_service
app.dependency_overrides[get_happ_parser] = lambda: mock_parser
try:
with TestClient(app) as client:
response = client.get("/authorize", params=valid_auth_params)
assert response.status_code == 200
# Should show verification form, not consent form
assert "Verify Your Identity" in response.text
assert "verification code" in response.text.lower()
# Should show masked email
assert "t***@example.com" in response.text
finally:
app.dependency_overrides.clear()
def test_unverified_domain_preserves_auth_params(
self, configured_app, valid_auth_params
):
"""Test that authorization parameters are preserved in verification form."""
app, _ = configured_app
from gondulf.dependencies import get_verification_service, get_happ_parser
mock_service = create_mock_verification_service(start_success=True)
mock_parser = create_mock_happ_parser()
app.dependency_overrides[get_verification_service] = lambda: mock_service
app.dependency_overrides[get_happ_parser] = lambda: mock_parser
try:
with TestClient(app) as client:
response = client.get("/authorize", params=valid_auth_params)
assert response.status_code == 200
# Check hidden fields contain auth params
assert 'name="client_id"' in response.text
assert 'name="redirect_uri"' in response.text
assert 'name="state"' in response.text
assert 'name="code_challenge"' in response.text
finally:
app.dependency_overrides.clear()
def test_unverified_domain_does_not_show_consent(
self, configured_app, valid_auth_params
):
"""Test that unverified domain does NOT show consent form directly."""
app, _ = configured_app
from gondulf.dependencies import get_verification_service, get_happ_parser
mock_service = create_mock_verification_service(start_success=True)
mock_parser = create_mock_happ_parser()
app.dependency_overrides[get_verification_service] = lambda: mock_service
app.dependency_overrides[get_happ_parser] = lambda: mock_parser
try:
with TestClient(app) as client:
response = client.get("/authorize", params=valid_auth_params)
assert response.status_code == 200
# Should NOT show consent/authorization form
assert "Authorization Request" not in response.text
finally:
app.dependency_overrides.clear()
class TestVerifiedDomainShowsConsent:
"""Tests that verified domains skip verification and show consent."""
def test_verified_domain_shows_consent_page(
self, configured_app, valid_auth_params
):
"""Test that a verified domain shows consent page directly."""
app, db_path = configured_app
from gondulf.dependencies import get_happ_parser, get_database
from gondulf.database.connection import Database
from sqlalchemy import text
# Create database and insert verified domain
db = Database(f"sqlite:///{db_path}")
db.initialize()
with db.get_engine().begin() as conn:
conn.execute(
text("""
INSERT INTO domains (domain, email, verification_code, verified, verified_at, two_factor)
VALUES (:domain, :email, '', 1, :verified_at, 1)
"""),
{"domain": "user.example.com", "email": "test@example.com", "verified_at": datetime.utcnow()}
)
# Override database to use same instance
app.dependency_overrides[get_database] = lambda: db
mock_parser = create_mock_happ_parser()
app.dependency_overrides[get_happ_parser] = lambda: mock_parser
try:
with TestClient(app) as client:
response = client.get("/authorize", params=valid_auth_params)
# Should show consent page
assert response.status_code == 200
assert "Authorization Request" in response.text
assert 'action="/authorize/consent"' in response.text
finally:
app.dependency_overrides.clear()
class TestVerificationCodeValidation:
"""Tests for the verification code submission endpoint."""
def test_valid_code_shows_consent(
self, configured_app, valid_auth_params
):
"""Test that valid verification code shows consent page."""
app, _ = configured_app
from gondulf.dependencies import get_verification_service, get_happ_parser
mock_service = create_mock_verification_service(verify_success=True)
mock_parser = create_mock_happ_parser()
app.dependency_overrides[get_verification_service] = lambda: mock_service
app.dependency_overrides[get_happ_parser] = lambda: mock_parser
try:
with TestClient(app) as client:
form_data = {
"domain": "user.example.com",
"code": "123456",
"client_id": valid_auth_params["client_id"],
"redirect_uri": valid_auth_params["redirect_uri"],
"response_type": valid_auth_params["response_type"],
"state": valid_auth_params["state"],
"code_challenge": valid_auth_params["code_challenge"],
"code_challenge_method": valid_auth_params["code_challenge_method"],
"scope": "",
"me": valid_auth_params["me"],
}
response = client.post("/authorize/verify-code", data=form_data)
assert response.status_code == 200
# Should show consent page after successful verification
assert "Authorization Request" in response.text or "Authorize" in response.text
finally:
app.dependency_overrides.clear()
def test_invalid_code_shows_error_with_retry(
self, configured_app, valid_auth_params
):
"""Test that invalid code shows error and allows retry."""
app, _ = configured_app
from gondulf.dependencies import get_verification_service, get_happ_parser
mock_service = create_mock_verification_service(verify_success=False)
mock_parser = create_mock_happ_parser()
app.dependency_overrides[get_verification_service] = lambda: mock_service
app.dependency_overrides[get_happ_parser] = lambda: mock_parser
try:
with TestClient(app) as client:
form_data = {
"domain": "user.example.com",
"code": "000000",
"client_id": valid_auth_params["client_id"],
"redirect_uri": valid_auth_params["redirect_uri"],
"response_type": valid_auth_params["response_type"],
"state": valid_auth_params["state"],
"code_challenge": valid_auth_params["code_challenge"],
"code_challenge_method": valid_auth_params["code_challenge_method"],
"scope": "",
"me": valid_auth_params["me"],
}
response = client.post("/authorize/verify-code", data=form_data)
assert response.status_code == 200
# Should show verify_code page with error
assert "Invalid verification code" in response.text or "invalid" in response.text.lower()
# Should still have the form for retry
assert 'name="code"' in response.text
finally:
app.dependency_overrides.clear()
class TestDNSFailureHandling:
"""Tests for DNS verification failure scenarios."""
def test_dns_failure_shows_instructions(
self, configured_app, valid_auth_params
):
"""Test that DNS verification failure shows helpful instructions."""
app, db_path = configured_app
from gondulf.dependencies import get_verification_service, get_happ_parser, get_database
from gondulf.database.connection import Database
from sqlalchemy import text
# Clear any pre-existing verified domain to ensure test isolation
db = Database(f"sqlite:///{db_path}")
db.initialize()
with db.get_engine().begin() as conn:
conn.execute(text("DELETE FROM domains WHERE domain = :domain"), {"domain": "user.example.com"})
app.dependency_overrides[get_database] = lambda: db
mock_service = create_mock_verification_service(start_success=False, start_error="dns_verification_failed")
mock_parser = create_mock_happ_parser()
app.dependency_overrides[get_verification_service] = lambda: mock_service
app.dependency_overrides[get_happ_parser] = lambda: mock_parser
try:
with TestClient(app) as client:
response = client.get("/authorize", params=valid_auth_params)
assert response.status_code == 200
# Should show error page with DNS instructions
assert "DNS" in response.text or "dns" in response.text.lower()
assert "TXT" in response.text
assert "_gondulf" in response.text
finally:
app.dependency_overrides.clear()
class TestEmailFailureHandling:
"""Tests for email discovery failure scenarios."""
def test_email_discovery_failure_shows_instructions(
self, configured_app, valid_auth_params
):
"""Test that email discovery failure shows helpful instructions."""
app, db_path = configured_app
from gondulf.dependencies import get_verification_service, get_happ_parser, get_database
from gondulf.database.connection import Database
from sqlalchemy import text
# Clear any pre-existing verified domain to ensure test isolation
db = Database(f"sqlite:///{db_path}")
db.initialize()
with db.get_engine().begin() as conn:
conn.execute(text("DELETE FROM domains WHERE domain = :domain"), {"domain": "user.example.com"})
app.dependency_overrides[get_database] = lambda: db
mock_service = create_mock_verification_service(start_success=False, start_error="email_discovery_failed")
mock_parser = create_mock_happ_parser()
app.dependency_overrides[get_verification_service] = lambda: mock_service
app.dependency_overrides[get_happ_parser] = lambda: mock_parser
try:
with TestClient(app) as client:
response = client.get("/authorize", params=valid_auth_params)
assert response.status_code == 200
# Should show error page with email instructions
assert "email" in response.text.lower()
finally:
app.dependency_overrides.clear()
class TestFullVerificationFlow:
"""Integration tests for the complete verification flow."""
def test_full_flow_new_domain(
self, configured_app, valid_auth_params
):
"""Test complete flow: unverified domain -> verify code -> consent."""
app, db_path = configured_app
from gondulf.dependencies import get_verification_service, get_happ_parser, get_database
from gondulf.database.connection import Database
from sqlalchemy import text
# Clear any pre-existing verified domain to ensure test isolation
db = Database(f"sqlite:///{db_path}")
db.initialize()
with db.get_engine().begin() as conn:
conn.execute(text("DELETE FROM domains WHERE domain = :domain"), {"domain": "user.example.com"})
app.dependency_overrides[get_database] = lambda: db
mock_service = create_mock_verification_service(start_success=True, verify_success=True)
mock_parser = create_mock_happ_parser()
app.dependency_overrides[get_verification_service] = lambda: mock_service
app.dependency_overrides[get_happ_parser] = lambda: mock_parser
try:
with TestClient(app) as client:
# Step 1: GET /authorize -> should show verification form
response1 = client.get("/authorize", params=valid_auth_params)
assert response1.status_code == 200
assert "Verify Your Identity" in response1.text
# Step 2: POST /authorize/verify-code -> should show consent
form_data = {
"domain": "user.example.com",
"code": "123456",
"client_id": valid_auth_params["client_id"],
"redirect_uri": valid_auth_params["redirect_uri"],
"response_type": valid_auth_params["response_type"],
"state": valid_auth_params["state"],
"code_challenge": valid_auth_params["code_challenge"],
"code_challenge_method": valid_auth_params["code_challenge_method"],
"scope": "",
"me": valid_auth_params["me"],
}
response2 = client.post("/authorize/verify-code", data=form_data)
assert response2.status_code == 200
# Should show consent page
assert "Authorization Request" in response2.text or "Authorize" in response2.text
finally:
app.dependency_overrides.clear()
def test_verification_code_retry_with_correct_code(
self, configured_app, valid_auth_params
):
"""Test that user can retry with correct code after failure."""
app, _ = configured_app
from gondulf.dependencies import get_verification_service, get_happ_parser
mock_service = Mock()
# First verify_email_code call fails, second succeeds
mock_service.verify_email_code.side_effect = [
{"success": False, "error": "invalid_code"},
{"success": True, "email": "test@example.com"}
]
mock_service.code_storage = Mock()
mock_service.code_storage.get.return_value = "test@example.com"
mock_parser = create_mock_happ_parser()
app.dependency_overrides[get_verification_service] = lambda: mock_service
app.dependency_overrides[get_happ_parser] = lambda: mock_parser
try:
with TestClient(app) as client:
form_data = {
"domain": "user.example.com",
"code": "000000", # Wrong code
"client_id": valid_auth_params["client_id"],
"redirect_uri": valid_auth_params["redirect_uri"],
"response_type": valid_auth_params["response_type"],
"state": valid_auth_params["state"],
"code_challenge": valid_auth_params["code_challenge"],
"code_challenge_method": valid_auth_params["code_challenge_method"],
"scope": "",
"me": valid_auth_params["me"],
}
# First attempt with wrong code
response1 = client.post("/authorize/verify-code", data=form_data)
assert response1.status_code == 200
assert "Invalid" in response1.text or "invalid" in response1.text.lower()
# Second attempt with correct code
form_data["code"] = "123456"
response2 = client.post("/authorize/verify-code", data=form_data)
assert response2.status_code == 200
assert "Authorization Request" in response2.text or "Authorize" in response2.text
finally:
app.dependency_overrides.clear()
class TestSecurityRequirements:
"""Tests for security requirements of the fix."""
def test_unverified_domain_never_sees_consent_directly(
self, configured_app, valid_auth_params
):
"""Critical: Unverified domains must NEVER see consent page directly."""
app, db_path = configured_app
from gondulf.dependencies import get_verification_service, get_happ_parser, get_database
from gondulf.database.connection import Database
from sqlalchemy import text
# Clear any pre-existing verified domain to ensure test isolation
db = Database(f"sqlite:///{db_path}")
db.initialize()
with db.get_engine().begin() as conn:
conn.execute(text("DELETE FROM domains WHERE domain = :domain"), {"domain": "user.example.com"})
app.dependency_overrides[get_database] = lambda: db
mock_service = create_mock_verification_service(start_success=True)
mock_parser = create_mock_happ_parser()
app.dependency_overrides[get_verification_service] = lambda: mock_service
app.dependency_overrides[get_happ_parser] = lambda: mock_parser
try:
with TestClient(app) as client:
response = client.get("/authorize", params=valid_auth_params)
# The consent page should NOT be shown
assert "Authorization Request" not in response.text
# Verify code page should be shown instead
assert "Verify Your Identity" in response.text
finally:
app.dependency_overrides.clear()
def test_state_parameter_preserved_through_flow(
self, configured_app, valid_auth_params
):
"""Test that state parameter is preserved through verification flow."""
app, _ = configured_app
from gondulf.dependencies import get_verification_service, get_happ_parser
mock_service = create_mock_verification_service(start_success=True)
mock_parser = create_mock_happ_parser()
app.dependency_overrides[get_verification_service] = lambda: mock_service
app.dependency_overrides[get_happ_parser] = lambda: mock_parser
try:
unique_state = "unique_state_abc123xyz"
params = valid_auth_params.copy()
params["state"] = unique_state
with TestClient(app) as client:
response = client.get("/authorize", params=params)
assert response.status_code == 200
# State should be in hidden form field
assert f'value="{unique_state}"' in response.text or f"value='{unique_state}'" in response.text
finally:
app.dependency_overrides.clear()

View File

@@ -0,0 +1,433 @@
"""
Integration tests for IndieAuth response_type flows.
Tests the two IndieAuth flows per W3C specification:
- Authentication flow (response_type=id): Code redeemed at authorization endpoint
- Authorization flow (response_type=code): Code redeemed at token endpoint
"""
from unittest.mock import AsyncMock, patch
import pytest
from fastapi.testclient import TestClient
@pytest.fixture
def flow_app(monkeypatch, tmp_path):
"""Create app for flow testing."""
db_path = tmp_path / "test.db"
monkeypatch.setenv("GONDULF_SECRET_KEY", "a" * 32)
monkeypatch.setenv("GONDULF_BASE_URL", "https://auth.example.com")
monkeypatch.setenv("GONDULF_DATABASE_URL", f"sqlite:///{db_path}")
monkeypatch.setenv("GONDULF_DEBUG", "true")
from gondulf.main import app
return app
@pytest.fixture
def flow_client(flow_app):
"""Create test client for flow tests."""
with TestClient(flow_app) as client:
yield client
@pytest.fixture
def mock_happ_fetch():
"""Mock h-app parser to avoid network calls."""
from gondulf.services.happ_parser import ClientMetadata
metadata = ClientMetadata(
name="Test Application",
url="https://app.example.com",
logo="https://app.example.com/logo.png"
)
with patch('gondulf.services.happ_parser.HAppParser.fetch_and_parse', new_callable=AsyncMock) as mock:
mock.return_value = metadata
yield mock
class TestResponseTypeValidation:
"""Tests for response_type parameter validation."""
@pytest.fixture
def base_params(self):
"""Base authorization parameters without response_type."""
return {
"client_id": "https://app.example.com",
"redirect_uri": "https://app.example.com/callback",
"state": "test123",
"code_challenge": "E9Melhoa2OwvFrEMTJguCHaoeK1t8URWbuGJSstw-cM",
"code_challenge_method": "S256",
"me": "https://user.example.com",
}
def test_response_type_id_accepted(self, flow_client, base_params, mock_happ_fetch):
"""Test response_type=id is accepted."""
params = base_params.copy()
params["response_type"] = "id"
response = flow_client.get("/authorize", params=params)
assert response.status_code == 200
assert "text/html" in response.headers["content-type"]
def test_response_type_code_accepted(self, flow_client, base_params, mock_happ_fetch):
"""Test response_type=code is accepted."""
params = base_params.copy()
params["response_type"] = "code"
response = flow_client.get("/authorize", params=params)
assert response.status_code == 200
assert "text/html" in response.headers["content-type"]
def test_response_type_defaults_to_id(self, flow_client, base_params, mock_happ_fetch):
"""Test missing response_type defaults to 'id'."""
# No response_type in params
response = flow_client.get("/authorize", params=base_params)
assert response.status_code == 200
# Form should contain response_type=id
assert 'value="id"' in response.text
def test_invalid_response_type_rejected(self, flow_client, base_params, mock_happ_fetch):
"""Test invalid response_type redirects with error."""
params = base_params.copy()
params["response_type"] = "token" # Invalid
response = flow_client.get("/authorize", params=params, follow_redirects=False)
assert response.status_code == 302
location = response.headers["location"]
assert "error=unsupported_response_type" in location
assert "state=test123" in location
def test_consent_form_includes_response_type(self, flow_client, base_params, mock_happ_fetch):
"""Test consent form includes response_type hidden field."""
params = base_params.copy()
params["response_type"] = "code"
response = flow_client.get("/authorize", params=params)
assert response.status_code == 200
assert 'name="response_type"' in response.text
assert 'value="code"' in response.text
class TestAuthenticationFlow:
"""Tests for authentication flow (response_type=id)."""
@pytest.fixture
def auth_code_id_flow(self, flow_client):
"""Create an authorization code for the authentication flow."""
consent_data = {
"client_id": "https://app.example.com",
"redirect_uri": "https://app.example.com/callback",
"response_type": "id", # Authentication flow
"state": "test123",
"code_challenge": "E9Melhoa2OwvFrEMTJguCHaoeK1t8URWbuGJSstw-cM",
"code_challenge_method": "S256",
"scope": "",
"me": "https://user.example.com",
}
response = flow_client.post(
"/authorize/consent",
data=consent_data,
follow_redirects=False
)
assert response.status_code == 302
location = response.headers["location"]
from tests.conftest import extract_code_from_redirect
code = extract_code_from_redirect(location)
return code, consent_data
def test_auth_code_redemption_at_authorization_endpoint(self, flow_client, auth_code_id_flow):
"""Test authentication flow code is redeemed at authorization endpoint."""
code, consent_data = auth_code_id_flow
response = flow_client.post(
"/authorize",
data={
"code": code,
"client_id": consent_data["client_id"],
}
)
assert response.status_code == 200
data = response.json()
assert "me" in data
assert data["me"] == "https://user.example.com"
# Should NOT have access_token
assert "access_token" not in data
def test_auth_flow_returns_only_me(self, flow_client, auth_code_id_flow):
"""Test authentication response contains only 'me' field."""
code, consent_data = auth_code_id_flow
response = flow_client.post(
"/authorize",
data={
"code": code,
"client_id": consent_data["client_id"],
}
)
data = response.json()
assert set(data.keys()) == {"me"}
def test_auth_flow_code_single_use(self, flow_client, auth_code_id_flow):
"""Test authentication code can only be used once."""
code, consent_data = auth_code_id_flow
# First use - should succeed
response1 = flow_client.post(
"/authorize",
data={
"code": code,
"client_id": consent_data["client_id"],
}
)
assert response1.status_code == 200
# Second use - should fail
response2 = flow_client.post(
"/authorize",
data={
"code": code,
"client_id": consent_data["client_id"],
}
)
assert response2.status_code == 400
assert response2.json()["error"] == "invalid_grant"
def test_auth_flow_client_id_mismatch_rejected(self, flow_client, auth_code_id_flow):
"""Test wrong client_id is rejected."""
code, _ = auth_code_id_flow
response = flow_client.post(
"/authorize",
data={
"code": code,
"client_id": "https://wrong.example.com",
}
)
assert response.status_code == 400
assert response.json()["error"] == "invalid_client"
def test_auth_flow_redirect_uri_mismatch_rejected(self, flow_client, auth_code_id_flow):
"""Test wrong redirect_uri is rejected when provided."""
code, consent_data = auth_code_id_flow
response = flow_client.post(
"/authorize",
data={
"code": code,
"client_id": consent_data["client_id"],
"redirect_uri": "https://wrong.example.com/callback",
}
)
assert response.status_code == 400
assert response.json()["error"] == "invalid_grant"
def test_auth_flow_id_code_rejected_at_token_endpoint(self, flow_client, auth_code_id_flow):
"""Test authentication flow code is rejected at token endpoint."""
code, consent_data = auth_code_id_flow
response = flow_client.post(
"/token",
data={
"grant_type": "authorization_code",
"code": code,
"client_id": consent_data["client_id"],
"redirect_uri": consent_data["redirect_uri"],
}
)
assert response.status_code == 400
# Should indicate wrong endpoint
data = response.json()["detail"]
assert data["error"] == "invalid_grant"
assert "authorization endpoint" in data["error_description"]
def test_auth_flow_cache_headers(self, flow_client, auth_code_id_flow):
"""Test authentication response has no-cache headers."""
code, consent_data = auth_code_id_flow
response = flow_client.post(
"/authorize",
data={
"code": code,
"client_id": consent_data["client_id"],
}
)
assert response.headers.get("Cache-Control") == "no-store"
assert response.headers.get("Pragma") == "no-cache"
class TestAuthorizationFlow:
"""Tests for authorization flow (response_type=code)."""
@pytest.fixture
def auth_code_code_flow(self, flow_client):
"""Create an authorization code for the authorization flow."""
consent_data = {
"client_id": "https://app.example.com",
"redirect_uri": "https://app.example.com/callback",
"response_type": "code", # Authorization flow
"state": "test456",
"code_challenge": "E9Melhoa2OwvFrEMTJguCHaoeK1t8URWbuGJSstw-cM",
"code_challenge_method": "S256",
"scope": "profile",
"me": "https://user.example.com",
}
response = flow_client.post(
"/authorize/consent",
data=consent_data,
follow_redirects=False
)
assert response.status_code == 302
location = response.headers["location"]
from tests.conftest import extract_code_from_redirect
code = extract_code_from_redirect(location)
return code, consent_data
def test_code_flow_redemption_at_token_endpoint(self, flow_client, auth_code_code_flow):
"""Test authorization flow code is redeemed at token endpoint."""
code, consent_data = auth_code_code_flow
response = flow_client.post(
"/token",
data={
"grant_type": "authorization_code",
"code": code,
"client_id": consent_data["client_id"],
"redirect_uri": consent_data["redirect_uri"],
}
)
assert response.status_code == 200
data = response.json()
assert "access_token" in data
assert "me" in data
assert data["me"] == "https://user.example.com"
assert data["token_type"] == "Bearer"
def test_code_flow_code_rejected_at_authorization_endpoint(self, flow_client, auth_code_code_flow):
"""Test authorization flow code is rejected at authorization endpoint."""
code, consent_data = auth_code_code_flow
response = flow_client.post(
"/authorize",
data={
"code": code,
"client_id": consent_data["client_id"],
}
)
assert response.status_code == 400
# Should indicate wrong endpoint
data = response.json()
assert data["error"] == "invalid_grant"
assert "token endpoint" in data["error_description"]
def test_code_flow_single_use(self, flow_client, auth_code_code_flow):
"""Test authorization code can only be used once."""
code, consent_data = auth_code_code_flow
# First use - should succeed
response1 = flow_client.post(
"/token",
data={
"grant_type": "authorization_code",
"code": code,
"client_id": consent_data["client_id"],
"redirect_uri": consent_data["redirect_uri"],
}
)
assert response1.status_code == 200
# Second use - should fail
response2 = flow_client.post(
"/token",
data={
"grant_type": "authorization_code",
"code": code,
"client_id": consent_data["client_id"],
"redirect_uri": consent_data["redirect_uri"],
}
)
assert response2.status_code == 400
class TestMetadataEndpoint:
"""Tests for server metadata endpoint."""
def test_metadata_includes_both_response_types(self, flow_client):
"""Test metadata advertises both response types."""
response = flow_client.get("/.well-known/oauth-authorization-server")
assert response.status_code == 200
data = response.json()
assert "response_types_supported" in data
assert "code" in data["response_types_supported"]
assert "id" in data["response_types_supported"]
def test_metadata_includes_code_challenge_method(self, flow_client):
"""Test metadata advertises S256 code challenge method."""
response = flow_client.get("/.well-known/oauth-authorization-server")
assert response.status_code == 200
data = response.json()
assert "code_challenge_methods_supported" in data
assert "S256" in data["code_challenge_methods_supported"]
class TestErrorScenarios:
"""Tests for error handling in both flows."""
def test_invalid_code_at_authorization_endpoint(self, flow_client):
"""Test invalid code returns error at authorization endpoint."""
response = flow_client.post(
"/authorize",
data={
"code": "invalid_code_12345",
"client_id": "https://app.example.com",
}
)
assert response.status_code == 400
data = response.json()
assert data["error"] == "invalid_grant"
def test_missing_code_at_authorization_endpoint(self, flow_client):
"""Test missing code returns validation error."""
response = flow_client.post(
"/authorize",
data={
"client_id": "https://app.example.com",
}
)
# FastAPI returns 422 for missing required form field
assert response.status_code == 422
def test_missing_client_id_at_authorization_endpoint(self, flow_client):
"""Test missing client_id returns validation error."""
response = flow_client.post(
"/authorize",
data={
"code": "some_code",
}
)
# FastAPI returns 422 for missing required form field
assert response.status_code == 422

View File

@@ -32,13 +32,14 @@ def token_client(token_app):
@pytest.fixture @pytest.fixture
def setup_auth_code(token_app, test_code_storage): def setup_auth_code(token_app, test_code_storage):
"""Setup a valid authorization code for testing.""" """Setup a valid authorization code for testing (authorization flow)."""
from gondulf.dependencies import get_code_storage from gondulf.dependencies import get_code_storage
code = "integration_test_code_12345" code = "integration_test_code_12345"
metadata = { metadata = {
"client_id": "https://app.example.com", "client_id": "https://app.example.com",
"redirect_uri": "https://app.example.com/callback", "redirect_uri": "https://app.example.com/callback",
"response_type": "code", # Authorization flow - exchange at token endpoint
"state": "xyz123", "state": "xyz123",
"me": "https://user.example.com", "me": "https://user.example.com",
"scope": "", "scope": "",
@@ -212,6 +213,7 @@ class TestTokenExchangeErrors:
metadata = { metadata = {
"client_id": "https://app.example.com", "client_id": "https://app.example.com",
"redirect_uri": "https://app.example.com/callback", "redirect_uri": "https://app.example.com/callback",
"response_type": "code", # Authorization flow
"state": "xyz123", "state": "xyz123",
"me": "https://user.example.com", "me": "https://user.example.com",
"scope": "", "scope": "",

View File

@@ -78,11 +78,11 @@ class TestMetadataEndpoint:
assert data["token_endpoint"] == "https://auth.example.com/token" assert data["token_endpoint"] == "https://auth.example.com/token"
def test_metadata_response_types_supported(self, client): def test_metadata_response_types_supported(self, client):
"""Test response_types_supported contains only 'code'.""" """Test response_types_supported contains both 'code' and 'id'."""
response = client.get("/.well-known/oauth-authorization-server") response = client.get("/.well-known/oauth-authorization-server")
data = response.json() data = response.json()
assert data["response_types_supported"] == ["code"] assert data["response_types_supported"] == ["code", "id"]
def test_metadata_grant_types_supported(self, client): def test_metadata_grant_types_supported(self, client):
"""Test grant_types_supported contains only 'authorization_code'.""" """Test grant_types_supported contains only 'authorization_code'."""
@@ -91,12 +91,12 @@ class TestMetadataEndpoint:
assert data["grant_types_supported"] == ["authorization_code"] assert data["grant_types_supported"] == ["authorization_code"]
def test_metadata_code_challenge_methods_empty(self, client): def test_metadata_code_challenge_methods_supported(self, client):
"""Test code_challenge_methods_supported is empty array.""" """Test code_challenge_methods_supported contains S256."""
response = client.get("/.well-known/oauth-authorization-server") response = client.get("/.well-known/oauth-authorization-server")
data = response.json() data = response.json()
assert data["code_challenge_methods_supported"] == [] assert data["code_challenge_methods_supported"] == ["S256"]
def test_metadata_token_endpoint_auth_methods(self, client): def test_metadata_token_endpoint_auth_methods(self, client):
"""Test token_endpoint_auth_methods_supported contains 'none'.""" """Test token_endpoint_auth_methods_supported contains 'none'."""

View File

@@ -71,11 +71,12 @@ def client(test_config, test_database, test_code_storage, test_token_service):
@pytest.fixture @pytest.fixture
def valid_auth_code(test_code_storage): def valid_auth_code(test_code_storage):
"""Create a valid authorization code.""" """Create a valid authorization code (authorization flow)."""
code = "test_auth_code_12345" code = "test_auth_code_12345"
metadata = { metadata = {
"client_id": "https://client.example.com", "client_id": "https://client.example.com",
"redirect_uri": "https://client.example.com/callback", "redirect_uri": "https://client.example.com/callback",
"response_type": "code", # Authorization flow - exchange at token endpoint
"state": "xyz123", "state": "xyz123",
"me": "https://user.example.com", "me": "https://user.example.com",
"scope": "", "scope": "",