Compare commits
3 Commits
v1.0.0-rc.
...
9b50f359a6
| Author | SHA1 | Date | |
|---|---|---|---|
| 9b50f359a6 | |||
| 8dddc73826 | |||
| 052d3ad3e1 |
@@ -1,17 +1,28 @@
|
||||
"""Authorization endpoint for OAuth 2.0 / IndieAuth authorization code flow."""
|
||||
"""Authorization endpoint for OAuth 2.0 / IndieAuth authorization code flow.
|
||||
|
||||
Supports both IndieAuth flows per W3C specification:
|
||||
- Authentication (response_type=id): Returns user identity only, code redeemed at authorization endpoint
|
||||
- Authorization (response_type=code): Returns access token, code redeemed at token endpoint
|
||||
"""
|
||||
import logging
|
||||
from datetime import datetime
|
||||
from typing import Optional
|
||||
from urllib.parse import urlencode
|
||||
|
||||
from fastapi import APIRouter, Depends, Form, Request
|
||||
from fastapi.responses import HTMLResponse, RedirectResponse
|
||||
from fastapi import APIRouter, Depends, Form, Request, Response
|
||||
from fastapi.responses import HTMLResponse, JSONResponse, RedirectResponse
|
||||
from fastapi.templating import Jinja2Templates
|
||||
from pydantic import BaseModel
|
||||
from sqlalchemy import text
|
||||
|
||||
from gondulf.database.connection import Database
|
||||
from gondulf.dependencies import get_database, get_happ_parser, get_verification_service
|
||||
from gondulf.dependencies import get_code_storage, get_database, get_happ_parser, get_verification_service
|
||||
from gondulf.services.domain_verification import DomainVerificationService
|
||||
from gondulf.services.happ_parser import HAppParser
|
||||
from gondulf.storage import CodeStore
|
||||
from gondulf.utils.validation import (
|
||||
extract_domain_from_url,
|
||||
mask_email,
|
||||
normalize_client_id,
|
||||
validate_redirect_uri,
|
||||
)
|
||||
@@ -21,6 +32,76 @@ logger = logging.getLogger("gondulf.authorization")
|
||||
router = APIRouter()
|
||||
templates = Jinja2Templates(directory="src/gondulf/templates")
|
||||
|
||||
# Valid response types per IndieAuth spec
|
||||
VALID_RESPONSE_TYPES = {"id", "code"}
|
||||
|
||||
|
||||
class AuthenticationResponse(BaseModel):
|
||||
"""
|
||||
IndieAuth authentication response (response_type=id flow).
|
||||
|
||||
Per W3C IndieAuth specification Section 5.3.3:
|
||||
https://www.w3.org/TR/indieauth/#authentication-response
|
||||
"""
|
||||
me: str
|
||||
|
||||
|
||||
async def check_domain_verified(database: Database, domain: str) -> bool:
|
||||
"""
|
||||
Check if domain is verified in the database.
|
||||
|
||||
Args:
|
||||
database: Database service
|
||||
domain: Domain to check (e.g., "example.com")
|
||||
|
||||
Returns:
|
||||
True if domain is verified, False otherwise
|
||||
"""
|
||||
try:
|
||||
engine = database.get_engine()
|
||||
with engine.connect() as conn:
|
||||
result = conn.execute(
|
||||
text("SELECT verified FROM domains WHERE domain = :domain AND verified = 1"),
|
||||
{"domain": domain}
|
||||
)
|
||||
row = result.fetchone()
|
||||
return row is not None
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to check domain verification: {e}")
|
||||
return False
|
||||
|
||||
|
||||
async def store_verified_domain(database: Database, domain: str, email: str) -> None:
|
||||
"""
|
||||
Store verified domain in database.
|
||||
|
||||
Args:
|
||||
database: Database service
|
||||
domain: Verified domain
|
||||
email: Email used for verification (for audit)
|
||||
"""
|
||||
try:
|
||||
engine = database.get_engine()
|
||||
now = datetime.utcnow()
|
||||
with engine.begin() as conn:
|
||||
# Use INSERT OR REPLACE for SQLite
|
||||
conn.execute(
|
||||
text("""
|
||||
INSERT OR REPLACE INTO domains
|
||||
(domain, email, verification_code, verified, verified_at, two_factor)
|
||||
VALUES (:domain, :email, '', 1, :verified_at, 1)
|
||||
"""),
|
||||
{
|
||||
"domain": domain,
|
||||
"email": email,
|
||||
"verified_at": now
|
||||
}
|
||||
)
|
||||
logger.info(f"Stored verified domain: {domain}")
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to store verified domain: {e}")
|
||||
raise
|
||||
|
||||
|
||||
@router.get("/authorize")
|
||||
async def authorize_get(
|
||||
@@ -34,7 +115,8 @@ async def authorize_get(
|
||||
scope: str | None = None,
|
||||
me: str | None = None,
|
||||
database: Database = Depends(get_database),
|
||||
happ_parser: HAppParser = Depends(get_happ_parser)
|
||||
happ_parser: HAppParser = Depends(get_happ_parser),
|
||||
verification_service: DomainVerificationService = Depends(get_verification_service)
|
||||
) -> HTMLResponse:
|
||||
"""
|
||||
Handle authorization request (GET).
|
||||
@@ -42,20 +124,26 @@ async def authorize_get(
|
||||
Validates client_id, redirect_uri, and required parameters.
|
||||
Shows consent form if domain is verified, or verification form if not.
|
||||
|
||||
Supports two IndieAuth flows per W3C specification:
|
||||
- response_type=id (default): Authentication only, returns user identity
|
||||
- response_type=code: Authorization, returns access token
|
||||
|
||||
Args:
|
||||
request: FastAPI request object
|
||||
client_id: Client application identifier
|
||||
redirect_uri: Callback URI for client
|
||||
response_type: Must be "code"
|
||||
response_type: "id" (default) for authentication, "code" for authorization
|
||||
state: Client state parameter
|
||||
code_challenge: PKCE code challenge
|
||||
code_challenge_method: PKCE method (S256)
|
||||
scope: Requested scope
|
||||
scope: Requested scope (only meaningful for response_type=code)
|
||||
me: User identity URL
|
||||
database: Database service
|
||||
happ_parser: H-app parser for client metadata
|
||||
verification_service: Domain verification service
|
||||
|
||||
Returns:
|
||||
HTML response with consent form or error page
|
||||
HTML response with consent form, verification form, or error page
|
||||
"""
|
||||
# Validate required parameters (pre-client validation)
|
||||
if not client_id:
|
||||
@@ -108,11 +196,13 @@ async def authorize_get(
|
||||
|
||||
# From here on, redirect errors to client via OAuth error redirect
|
||||
|
||||
# Validate response_type
|
||||
if response_type != "code":
|
||||
# Validate response_type - default to "id" if not provided (per IndieAuth spec)
|
||||
effective_response_type = response_type or "id"
|
||||
|
||||
if effective_response_type not in VALID_RESPONSE_TYPES:
|
||||
error_params = {
|
||||
"error": "unsupported_response_type",
|
||||
"error_description": "Only response_type=code is supported",
|
||||
"error_description": f"response_type must be 'id' or 'code', got '{response_type}'",
|
||||
"state": state or ""
|
||||
}
|
||||
redirect_url = f"{redirect_uri}?{urlencode(error_params)}"
|
||||
@@ -148,9 +238,9 @@ async def authorize_get(
|
||||
redirect_url = f"{redirect_uri}?{urlencode(error_params)}"
|
||||
return RedirectResponse(url=redirect_url, status_code=302)
|
||||
|
||||
# Validate me URL format
|
||||
# Validate me URL format and extract domain
|
||||
try:
|
||||
extract_domain_from_url(me)
|
||||
domain = extract_domain_from_url(me)
|
||||
except ValueError:
|
||||
error_params = {
|
||||
"error": "invalid_request",
|
||||
@@ -160,11 +250,71 @@ async def authorize_get(
|
||||
redirect_url = f"{redirect_uri}?{urlencode(error_params)}"
|
||||
return RedirectResponse(url=redirect_url, status_code=302)
|
||||
|
||||
# Check if domain is verified
|
||||
# For Phase 2, we'll show consent form immediately (domain verification happens separately)
|
||||
# In Phase 3, we'll check database for verified domains
|
||||
# SECURITY FIX: Check if domain is verified before showing consent
|
||||
is_verified = await check_domain_verified(database, domain)
|
||||
|
||||
if not is_verified:
|
||||
logger.info(f"Domain {domain} not verified, starting verification")
|
||||
|
||||
# Start two-factor verification
|
||||
result = verification_service.start_verification(domain, me)
|
||||
|
||||
if not result["success"]:
|
||||
# Verification cannot start (DNS failed, no rel=me, etc)
|
||||
error_message = result.get("error", "verification_failed")
|
||||
|
||||
# Map error codes to user-friendly messages
|
||||
error_messages = {
|
||||
"dns_verification_failed": "DNS verification failed. Please add the required TXT record.",
|
||||
"email_discovery_failed": "Could not find an email address on your homepage. Please add a rel='me' link to your email.",
|
||||
"invalid_email_format": "The email address discovered on your homepage is invalid.",
|
||||
"email_send_failed": "Failed to send verification email. Please try again."
|
||||
}
|
||||
friendly_error = error_messages.get(error_message, error_message)
|
||||
|
||||
logger.warning(f"Verification start failed for domain={domain}: {error_message}")
|
||||
|
||||
return templates.TemplateResponse(
|
||||
"verification_error.html",
|
||||
{
|
||||
"request": request,
|
||||
"error": friendly_error,
|
||||
"domain": domain,
|
||||
"client_id": normalized_client_id,
|
||||
"redirect_uri": redirect_uri,
|
||||
"response_type": effective_response_type,
|
||||
"state": state or "",
|
||||
"code_challenge": code_challenge,
|
||||
"code_challenge_method": code_challenge_method,
|
||||
"scope": scope or "",
|
||||
"me": me
|
||||
},
|
||||
status_code=200
|
||||
)
|
||||
|
||||
# Verification started - show code entry form
|
||||
logger.info(f"Verification code sent for domain={domain}")
|
||||
|
||||
return templates.TemplateResponse(
|
||||
"verify_code.html",
|
||||
{
|
||||
"request": request,
|
||||
"masked_email": result["email"],
|
||||
"domain": domain,
|
||||
"client_id": normalized_client_id,
|
||||
"redirect_uri": redirect_uri,
|
||||
"response_type": effective_response_type,
|
||||
"state": state or "",
|
||||
"code_challenge": code_challenge,
|
||||
"code_challenge_method": code_challenge_method,
|
||||
"scope": scope or "",
|
||||
"me": me
|
||||
}
|
||||
)
|
||||
|
||||
# Domain is verified - fetch client metadata and show consent form
|
||||
logger.info(f"Domain {domain} is verified, showing consent page")
|
||||
|
||||
# Fetch client metadata (h-app microformat)
|
||||
client_metadata = None
|
||||
try:
|
||||
client_metadata = await happ_parser.fetch_and_parse(normalized_client_id)
|
||||
@@ -180,6 +330,7 @@ async def authorize_get(
|
||||
"request": request,
|
||||
"client_id": normalized_client_id,
|
||||
"redirect_uri": redirect_uri,
|
||||
"response_type": effective_response_type,
|
||||
"state": state or "",
|
||||
"code_challenge": code_challenge,
|
||||
"code_challenge_method": code_challenge_method,
|
||||
@@ -190,11 +341,124 @@ async def authorize_get(
|
||||
)
|
||||
|
||||
|
||||
@router.post("/authorize/verify-code")
|
||||
async def authorize_verify_code(
|
||||
request: Request,
|
||||
domain: str = Form(...),
|
||||
code: str = Form(...),
|
||||
client_id: str = Form(...),
|
||||
redirect_uri: str = Form(...),
|
||||
response_type: str = Form("id"),
|
||||
state: str = Form(...),
|
||||
code_challenge: str = Form(...),
|
||||
code_challenge_method: str = Form(...),
|
||||
scope: str = Form(""),
|
||||
me: str = Form(...),
|
||||
database: Database = Depends(get_database),
|
||||
verification_service: DomainVerificationService = Depends(get_verification_service),
|
||||
happ_parser: HAppParser = Depends(get_happ_parser)
|
||||
) -> HTMLResponse:
|
||||
"""
|
||||
Handle verification code submission during authorization flow.
|
||||
|
||||
This endpoint is called when user submits the 6-digit email verification code.
|
||||
On success, shows consent page. On failure, shows code entry form with error.
|
||||
|
||||
Args:
|
||||
request: FastAPI request object
|
||||
domain: Domain being verified
|
||||
code: 6-digit verification code from email
|
||||
client_id: Client application identifier
|
||||
redirect_uri: Callback URI
|
||||
response_type: "id" for authentication, "code" for authorization
|
||||
state: Client state parameter
|
||||
code_challenge: PKCE code challenge
|
||||
code_challenge_method: PKCE method
|
||||
scope: Requested scope
|
||||
me: User identity URL
|
||||
database: Database service
|
||||
verification_service: Domain verification service
|
||||
happ_parser: H-app parser for client metadata
|
||||
|
||||
Returns:
|
||||
HTML response: consent page on success, code form with error on failure
|
||||
"""
|
||||
logger.info(f"Verification code submission for domain={domain}")
|
||||
|
||||
# Verify the code
|
||||
result = verification_service.verify_email_code(domain, code)
|
||||
|
||||
if not result["success"]:
|
||||
logger.warning(f"Verification code invalid for domain={domain}: {result.get('error')}")
|
||||
|
||||
# Get masked email for display
|
||||
email = verification_service.code_storage.get(f"email_addr:{domain}")
|
||||
masked = mask_email(email) if email else "unknown"
|
||||
|
||||
# Map error codes to user-friendly messages
|
||||
error_messages = {
|
||||
"invalid_code": "Invalid verification code. Please check and try again.",
|
||||
"email_not_found": "Verification session expired. Please start over."
|
||||
}
|
||||
error_message = result.get("error", "invalid_code")
|
||||
friendly_error = error_messages.get(error_message, error_message)
|
||||
|
||||
return templates.TemplateResponse(
|
||||
"verify_code.html",
|
||||
{
|
||||
"request": request,
|
||||
"error": friendly_error,
|
||||
"masked_email": masked,
|
||||
"domain": domain,
|
||||
"client_id": client_id,
|
||||
"redirect_uri": redirect_uri,
|
||||
"response_type": response_type,
|
||||
"state": state,
|
||||
"code_challenge": code_challenge,
|
||||
"code_challenge_method": code_challenge_method,
|
||||
"scope": scope,
|
||||
"me": me
|
||||
},
|
||||
status_code=200
|
||||
)
|
||||
|
||||
# Code valid - store verified domain in database
|
||||
email = result.get("email", "")
|
||||
await store_verified_domain(database, domain, email)
|
||||
|
||||
logger.info(f"Domain verified successfully: {domain}")
|
||||
|
||||
# Fetch client metadata for consent page
|
||||
client_metadata = None
|
||||
try:
|
||||
client_metadata = await happ_parser.fetch_and_parse(client_id)
|
||||
except Exception as e:
|
||||
logger.warning(f"Failed to fetch client metadata: {e}")
|
||||
|
||||
# Show consent form
|
||||
return templates.TemplateResponse(
|
||||
"authorize.html",
|
||||
{
|
||||
"request": request,
|
||||
"client_id": client_id,
|
||||
"redirect_uri": redirect_uri,
|
||||
"response_type": response_type,
|
||||
"state": state,
|
||||
"code_challenge": code_challenge,
|
||||
"code_challenge_method": code_challenge_method,
|
||||
"scope": scope,
|
||||
"me": me,
|
||||
"client_metadata": client_metadata
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
@router.post("/authorize/consent")
|
||||
async def authorize_consent(
|
||||
request: Request,
|
||||
client_id: str = Form(...),
|
||||
redirect_uri: str = Form(...),
|
||||
response_type: str = Form("id"), # Default to "id" for authentication flow
|
||||
state: str = Form(...),
|
||||
code_challenge: str = Form(...),
|
||||
code_challenge_method: str = Form(...),
|
||||
@@ -211,6 +475,7 @@ async def authorize_consent(
|
||||
request: FastAPI request object
|
||||
client_id: Client application identifier
|
||||
redirect_uri: Callback URI
|
||||
response_type: "id" for authentication, "code" for authorization
|
||||
state: Client state
|
||||
code_challenge: PKCE challenge
|
||||
code_challenge_method: PKCE method
|
||||
@@ -221,9 +486,9 @@ async def authorize_consent(
|
||||
Returns:
|
||||
Redirect to client callback with authorization code
|
||||
"""
|
||||
logger.info(f"Authorization consent granted for client_id={client_id}")
|
||||
logger.info(f"Authorization consent granted for client_id={client_id} response_type={response_type}")
|
||||
|
||||
# Create authorization code
|
||||
# Create authorization code with response_type metadata
|
||||
authorization_code = verification_service.create_authorization_code(
|
||||
client_id=client_id,
|
||||
redirect_uri=redirect_uri,
|
||||
@@ -231,7 +496,8 @@ async def authorize_consent(
|
||||
code_challenge=code_challenge,
|
||||
code_challenge_method=code_challenge_method,
|
||||
scope=scope,
|
||||
me=me
|
||||
me=me,
|
||||
response_type=response_type
|
||||
)
|
||||
|
||||
# Build redirect URL with authorization code
|
||||
@@ -243,3 +509,161 @@ async def authorize_consent(
|
||||
|
||||
logger.info(f"Redirecting to {redirect_uri} with authorization code")
|
||||
return RedirectResponse(url=redirect_url, status_code=302)
|
||||
|
||||
|
||||
@router.post("/authorize")
|
||||
async def authorize_post(
|
||||
response: Response,
|
||||
code: str = Form(...),
|
||||
client_id: str = Form(...),
|
||||
redirect_uri: Optional[str] = Form(None),
|
||||
code_verifier: Optional[str] = Form(None),
|
||||
code_storage: CodeStore = Depends(get_code_storage)
|
||||
) -> JSONResponse:
|
||||
"""
|
||||
Handle authorization code verification for authentication flow (response_type=id).
|
||||
|
||||
Per W3C IndieAuth specification Section 5.3.3:
|
||||
https://www.w3.org/TR/indieauth/#redeeming-the-authorization-code-id
|
||||
|
||||
This endpoint is used ONLY for the authentication flow (response_type=id).
|
||||
For the authorization flow (response_type=code), clients must use the token endpoint.
|
||||
|
||||
Request (application/x-www-form-urlencoded):
|
||||
code: Authorization code from /authorize redirect
|
||||
client_id: Client application URL (must match original request)
|
||||
redirect_uri: Original redirect URI (optional but recommended)
|
||||
code_verifier: PKCE verifier (optional, for PKCE validation)
|
||||
|
||||
Response (200 OK):
|
||||
{
|
||||
"me": "https://user.example.com/"
|
||||
}
|
||||
|
||||
Error Response (400 Bad Request):
|
||||
{
|
||||
"error": "invalid_grant",
|
||||
"error_description": "..."
|
||||
}
|
||||
|
||||
Returns:
|
||||
JSONResponse with user identity or error
|
||||
"""
|
||||
# Set cache headers (OAuth 2.0 best practice)
|
||||
response.headers["Cache-Control"] = "no-store"
|
||||
response.headers["Pragma"] = "no-cache"
|
||||
|
||||
logger.info(f"Authorization code verification request from client: {client_id}")
|
||||
|
||||
# STEP 1: Retrieve authorization code from storage
|
||||
storage_key = f"authz:{code}"
|
||||
code_data = code_storage.get(storage_key)
|
||||
|
||||
if code_data is None:
|
||||
logger.warning(f"Authorization code not found or expired: {code[:8]}...")
|
||||
return JSONResponse(
|
||||
status_code=400,
|
||||
content={
|
||||
"error": "invalid_grant",
|
||||
"error_description": "Authorization code is invalid or has expired"
|
||||
},
|
||||
headers={"Cache-Control": "no-store", "Pragma": "no-cache"}
|
||||
)
|
||||
|
||||
# Validate code_data is a dict
|
||||
if not isinstance(code_data, dict):
|
||||
logger.error(f"Authorization code metadata is not a dict: {type(code_data)}")
|
||||
return JSONResponse(
|
||||
status_code=400,
|
||||
content={
|
||||
"error": "invalid_grant",
|
||||
"error_description": "Authorization code is malformed"
|
||||
},
|
||||
headers={"Cache-Control": "no-store", "Pragma": "no-cache"}
|
||||
)
|
||||
|
||||
# STEP 2: Validate this code was issued for response_type=id
|
||||
stored_response_type = code_data.get('response_type', 'id')
|
||||
if stored_response_type != 'id':
|
||||
logger.warning(
|
||||
f"Code redemption at authorization endpoint for response_type={stored_response_type}"
|
||||
)
|
||||
return JSONResponse(
|
||||
status_code=400,
|
||||
content={
|
||||
"error": "invalid_grant",
|
||||
"error_description": "Authorization code must be redeemed at the token endpoint"
|
||||
},
|
||||
headers={"Cache-Control": "no-store", "Pragma": "no-cache"}
|
||||
)
|
||||
|
||||
# STEP 3: Validate client_id matches
|
||||
if code_data.get('client_id') != client_id:
|
||||
logger.warning(
|
||||
f"Client ID mismatch: expected {code_data.get('client_id')}, got {client_id}"
|
||||
)
|
||||
return JSONResponse(
|
||||
status_code=400,
|
||||
content={
|
||||
"error": "invalid_client",
|
||||
"error_description": "Client ID does not match authorization code"
|
||||
},
|
||||
headers={"Cache-Control": "no-store", "Pragma": "no-cache"}
|
||||
)
|
||||
|
||||
# STEP 4: Validate redirect_uri if provided
|
||||
if redirect_uri and code_data.get('redirect_uri') != redirect_uri:
|
||||
logger.warning(
|
||||
f"Redirect URI mismatch: expected {code_data.get('redirect_uri')}, got {redirect_uri}"
|
||||
)
|
||||
return JSONResponse(
|
||||
status_code=400,
|
||||
content={
|
||||
"error": "invalid_grant",
|
||||
"error_description": "Redirect URI does not match authorization request"
|
||||
},
|
||||
headers={"Cache-Control": "no-store", "Pragma": "no-cache"}
|
||||
)
|
||||
|
||||
# STEP 5: Check if code already used (prevent replay)
|
||||
if code_data.get('used'):
|
||||
logger.warning(f"Authorization code replay detected: {code[:8]}...")
|
||||
return JSONResponse(
|
||||
status_code=400,
|
||||
content={
|
||||
"error": "invalid_grant",
|
||||
"error_description": "Authorization code has already been used"
|
||||
},
|
||||
headers={"Cache-Control": "no-store", "Pragma": "no-cache"}
|
||||
)
|
||||
|
||||
# STEP 6: Extract user identity
|
||||
me = code_data.get('me')
|
||||
if not me:
|
||||
logger.error("Authorization code missing 'me' parameter")
|
||||
return JSONResponse(
|
||||
status_code=400,
|
||||
content={
|
||||
"error": "invalid_grant",
|
||||
"error_description": "Authorization code is malformed"
|
||||
},
|
||||
headers={"Cache-Control": "no-store", "Pragma": "no-cache"}
|
||||
)
|
||||
|
||||
# STEP 7: PKCE validation (optional for authentication flow)
|
||||
if code_verifier:
|
||||
logger.debug(f"PKCE code_verifier provided but not validated (v1.0.0)")
|
||||
# v1.1.0 will validate: SHA256(code_verifier) == code_challenge
|
||||
|
||||
# STEP 8: Delete authorization code (single-use enforcement)
|
||||
code_storage.delete(storage_key)
|
||||
logger.info(f"Authorization code verified and deleted: {code[:8]}...")
|
||||
|
||||
# STEP 9: Return authentication response with user identity
|
||||
logger.info(f"Authentication successful for {me} (client: {client_id})")
|
||||
|
||||
return JSONResponse(
|
||||
status_code=200,
|
||||
content={"me": me},
|
||||
headers={"Cache-Control": "no-store", "Pragma": "no-cache"}
|
||||
)
|
||||
|
||||
@@ -29,9 +29,9 @@ async def get_metadata(config: Config = Depends(get_config)) -> Response:
|
||||
"issuer": config.BASE_URL,
|
||||
"authorization_endpoint": f"{config.BASE_URL}/authorize",
|
||||
"token_endpoint": f"{config.BASE_URL}/token",
|
||||
"response_types_supported": ["code"],
|
||||
"response_types_supported": ["code", "id"],
|
||||
"grant_types_supported": ["authorization_code"],
|
||||
"code_challenge_methods_supported": [],
|
||||
"code_challenge_methods_supported": ["S256"],
|
||||
"token_endpoint_auth_methods_supported": ["none"],
|
||||
"revocation_endpoint_auth_methods_supported": ["none"],
|
||||
"scopes_supported": []
|
||||
|
||||
@@ -156,6 +156,21 @@ async def token_exchange(
|
||||
}
|
||||
)
|
||||
|
||||
# STEP 4.5: Validate this code was issued for response_type=code
|
||||
# Codes with response_type=id must be redeemed at the authorization endpoint
|
||||
stored_response_type = code_data.get('response_type', 'id')
|
||||
if stored_response_type != 'code':
|
||||
logger.warning(
|
||||
f"Code redemption at token endpoint for response_type={stored_response_type}"
|
||||
)
|
||||
raise HTTPException(
|
||||
status_code=400,
|
||||
detail={
|
||||
"error": "invalid_grant",
|
||||
"error_description": "Authorization code must be redeemed at the authorization endpoint"
|
||||
}
|
||||
)
|
||||
|
||||
# STEP 5: Check if code already used (prevent replay)
|
||||
if code_data.get('used'):
|
||||
logger.error(f"Authorization code replay detected: {code[:8]}...")
|
||||
|
||||
@@ -212,7 +212,8 @@ class DomainVerificationService:
|
||||
code_challenge: str,
|
||||
code_challenge_method: str,
|
||||
scope: str,
|
||||
me: str
|
||||
me: str,
|
||||
response_type: str = "id"
|
||||
) -> str:
|
||||
"""
|
||||
Create authorization code with metadata.
|
||||
@@ -225,6 +226,7 @@ class DomainVerificationService:
|
||||
code_challenge_method: PKCE method (S256)
|
||||
scope: Requested scope
|
||||
me: Verified user identity
|
||||
response_type: "id" for authentication, "code" for authorization
|
||||
|
||||
Returns:
|
||||
Authorization code
|
||||
@@ -232,7 +234,7 @@ class DomainVerificationService:
|
||||
# Generate authorization code
|
||||
authorization_code = self._generate_authorization_code()
|
||||
|
||||
# Create metadata
|
||||
# Create metadata including response_type for flow determination during redemption
|
||||
metadata = {
|
||||
"client_id": client_id,
|
||||
"redirect_uri": redirect_uri,
|
||||
@@ -241,6 +243,7 @@ class DomainVerificationService:
|
||||
"code_challenge_method": code_challenge_method,
|
||||
"scope": scope,
|
||||
"me": me,
|
||||
"response_type": response_type,
|
||||
"created_at": int(time.time()),
|
||||
"expires_at": int(time.time()) + 600,
|
||||
"used": False
|
||||
|
||||
@@ -36,6 +36,7 @@
|
||||
<form method="POST" action="/authorize/consent">
|
||||
<input type="hidden" name="client_id" value="{{ client_id }}">
|
||||
<input type="hidden" name="redirect_uri" value="{{ redirect_uri }}">
|
||||
<input type="hidden" name="response_type" value="{{ response_type }}">
|
||||
<input type="hidden" name="state" value="{{ state }}">
|
||||
<input type="hidden" name="code_challenge" value="{{ code_challenge }}">
|
||||
<input type="hidden" name="code_challenge_method" value="{{ code_challenge_method }}">
|
||||
|
||||
40
src/gondulf/templates/verification_error.html
Normal file
40
src/gondulf/templates/verification_error.html
Normal file
@@ -0,0 +1,40 @@
|
||||
{% extends "base.html" %}
|
||||
|
||||
{% block title %}Verification Failed - Gondulf{% endblock %}
|
||||
|
||||
{% block content %}
|
||||
<h1>Verification Failed</h1>
|
||||
|
||||
<div class="error">
|
||||
<p>{{ error }}</p>
|
||||
</div>
|
||||
|
||||
{% if "DNS" in error or "dns" in error %}
|
||||
<div class="instructions">
|
||||
<h2>How to Fix</h2>
|
||||
<p>Add the following DNS TXT record to your domain:</p>
|
||||
<code>
|
||||
Type: TXT<br>
|
||||
Name: _gondulf.{{ domain }}<br>
|
||||
Value: gondulf-verify-domain
|
||||
</code>
|
||||
<p>DNS changes may take up to 24 hours to propagate.</p>
|
||||
</div>
|
||||
{% endif %}
|
||||
|
||||
{% if "email" in error.lower() or "rel" in error.lower() %}
|
||||
<div class="instructions">
|
||||
<h2>How to Fix</h2>
|
||||
<p>Add a rel="me" link to your homepage pointing to your email:</p>
|
||||
<code><link rel="me" href="mailto:you@example.com"></code>
|
||||
<p>Or as an anchor tag:</p>
|
||||
<code><a rel="me" href="mailto:you@example.com">Email me</a></code>
|
||||
</div>
|
||||
{% endif %}
|
||||
|
||||
<p>
|
||||
<a href="/authorize?client_id={{ client_id }}&redirect_uri={{ redirect_uri }}&response_type={{ response_type }}&state={{ state }}&code_challenge={{ code_challenge }}&code_challenge_method={{ code_challenge_method }}&scope={{ scope }}&me={{ me }}">
|
||||
Try Again
|
||||
</a>
|
||||
</p>
|
||||
{% endblock %}
|
||||
51
src/gondulf/templates/verify_code.html
Normal file
51
src/gondulf/templates/verify_code.html
Normal file
@@ -0,0 +1,51 @@
|
||||
{% extends "base.html" %}
|
||||
|
||||
{% block title %}Verify Your Identity - Gondulf{% endblock %}
|
||||
|
||||
{% block content %}
|
||||
<h1>Verify Your Identity</h1>
|
||||
|
||||
<p>To sign in as <strong>{{ domain }}</strong>, please enter the verification code sent to <strong>{{ masked_email }}</strong>.</p>
|
||||
|
||||
{% if error %}
|
||||
<div class="error">
|
||||
<p>{{ error }}</p>
|
||||
</div>
|
||||
{% endif %}
|
||||
|
||||
<form method="POST" action="/authorize/verify-code">
|
||||
<!-- Pass through authorization parameters -->
|
||||
<input type="hidden" name="domain" value="{{ domain }}">
|
||||
<input type="hidden" name="client_id" value="{{ client_id }}">
|
||||
<input type="hidden" name="redirect_uri" value="{{ redirect_uri }}">
|
||||
<input type="hidden" name="response_type" value="{{ response_type }}">
|
||||
<input type="hidden" name="state" value="{{ state }}">
|
||||
<input type="hidden" name="code_challenge" value="{{ code_challenge }}">
|
||||
<input type="hidden" name="code_challenge_method" value="{{ code_challenge_method }}">
|
||||
<input type="hidden" name="scope" value="{{ scope }}">
|
||||
<input type="hidden" name="me" value="{{ me }}">
|
||||
|
||||
<div class="form-group">
|
||||
<label for="code">Verification Code:</label>
|
||||
<input type="text"
|
||||
id="code"
|
||||
name="code"
|
||||
placeholder="000000"
|
||||
maxlength="6"
|
||||
pattern="[0-9]{6}"
|
||||
inputmode="numeric"
|
||||
autocomplete="one-time-code"
|
||||
required
|
||||
autofocus>
|
||||
</div>
|
||||
|
||||
<button type="submit">Verify</button>
|
||||
</form>
|
||||
|
||||
<p class="help-text">
|
||||
Did not receive a code? Check your spam folder.
|
||||
<a href="/authorize?client_id={{ client_id }}&redirect_uri={{ redirect_uri }}&response_type={{ response_type }}&state={{ state }}&code_challenge={{ code_challenge }}&code_challenge_method={{ code_challenge_method }}&scope={{ scope }}&me={{ me }}">
|
||||
Request a new code
|
||||
</a>
|
||||
</p>
|
||||
{% endblock %}
|
||||
@@ -131,7 +131,7 @@ def test_code_storage():
|
||||
@pytest.fixture
|
||||
def valid_auth_code(test_code_storage) -> tuple[str, dict]:
|
||||
"""
|
||||
Create a valid authorization code with metadata.
|
||||
Create a valid authorization code with metadata (authorization flow).
|
||||
|
||||
Args:
|
||||
test_code_storage: Code storage fixture
|
||||
@@ -143,6 +143,7 @@ def valid_auth_code(test_code_storage) -> tuple[str, dict]:
|
||||
metadata = {
|
||||
"client_id": "https://client.example.com",
|
||||
"redirect_uri": "https://client.example.com/callback",
|
||||
"response_type": "code", # Authorization flow - exchange at token endpoint
|
||||
"state": "xyz123",
|
||||
"me": "https://user.example.com",
|
||||
"scope": "",
|
||||
@@ -159,7 +160,7 @@ def valid_auth_code(test_code_storage) -> tuple[str, dict]:
|
||||
@pytest.fixture
|
||||
def expired_auth_code(test_code_storage) -> tuple[str, dict]:
|
||||
"""
|
||||
Create an expired authorization code.
|
||||
Create an expired authorization code (authorization flow).
|
||||
|
||||
Returns:
|
||||
Tuple of (code, metadata) where the code is expired
|
||||
@@ -169,6 +170,7 @@ def expired_auth_code(test_code_storage) -> tuple[str, dict]:
|
||||
metadata = {
|
||||
"client_id": "https://client.example.com",
|
||||
"redirect_uri": "https://client.example.com/callback",
|
||||
"response_type": "code", # Authorization flow
|
||||
"state": "xyz123",
|
||||
"me": "https://user.example.com",
|
||||
"scope": "",
|
||||
@@ -186,7 +188,7 @@ def expired_auth_code(test_code_storage) -> tuple[str, dict]:
|
||||
@pytest.fixture
|
||||
def used_auth_code(test_code_storage) -> tuple[str, dict]:
|
||||
"""
|
||||
Create an already-used authorization code.
|
||||
Create an already-used authorization code (authorization flow).
|
||||
|
||||
Returns:
|
||||
Tuple of (code, metadata) where the code is marked as used
|
||||
@@ -195,6 +197,7 @@ def used_auth_code(test_code_storage) -> tuple[str, dict]:
|
||||
metadata = {
|
||||
"client_id": "https://client.example.com",
|
||||
"redirect_uri": "https://client.example.com/callback",
|
||||
"response_type": "code", # Authorization flow
|
||||
"state": "xyz123",
|
||||
"me": "https://user.example.com",
|
||||
"scope": "",
|
||||
@@ -474,13 +477,13 @@ def malicious_client() -> dict[str, Any]:
|
||||
@pytest.fixture
|
||||
def valid_auth_request() -> dict[str, str]:
|
||||
"""
|
||||
Complete valid authorization request parameters.
|
||||
Complete valid authorization request parameters (for authorization flow).
|
||||
|
||||
Returns:
|
||||
Dict with all required authorization parameters
|
||||
"""
|
||||
return {
|
||||
"response_type": "code",
|
||||
"response_type": "code", # Authorization flow - exchange at token endpoint
|
||||
"client_id": "https://app.example.com",
|
||||
"redirect_uri": "https://app.example.com/callback",
|
||||
"state": "random_state_12345",
|
||||
|
||||
@@ -76,6 +76,7 @@ class TestCompleteAuthorizationFlow:
|
||||
consent_data = {
|
||||
"client_id": "https://app.example.com",
|
||||
"redirect_uri": "https://app.example.com/callback",
|
||||
"response_type": "code", # Authorization flow - exchange at token endpoint
|
||||
"state": "e2e_test_state_12345",
|
||||
"code_challenge": "E9Melhoa2OwvFrEMTJguCHaoeK1t8URWbuGJSstw-cM",
|
||||
"code_challenge_method": "S256",
|
||||
@@ -139,6 +140,7 @@ class TestCompleteAuthorizationFlow:
|
||||
data={
|
||||
"client_id": "https://app.example.com",
|
||||
"redirect_uri": "https://app.example.com/callback",
|
||||
"response_type": "code", # For state preservation test
|
||||
"state": state,
|
||||
"code_challenge": "abc123",
|
||||
"code_challenge_method": "S256",
|
||||
@@ -163,6 +165,7 @@ class TestCompleteAuthorizationFlow:
|
||||
data={
|
||||
"client_id": "https://app.example.com",
|
||||
"redirect_uri": "https://app.example.com/callback",
|
||||
"response_type": "code", # Authorization flow - exchange at token endpoint
|
||||
"state": f"flow_{i}",
|
||||
"code_challenge": "abc123",
|
||||
"code_challenge_method": "S256",
|
||||
@@ -217,6 +220,7 @@ class TestErrorScenariosE2E:
|
||||
metadata = {
|
||||
"client_id": "https://app.example.com",
|
||||
"redirect_uri": "https://app.example.com/callback",
|
||||
"response_type": "code", # Authorization flow - exchange at token endpoint
|
||||
"state": "test",
|
||||
"me": "https://user.example.com",
|
||||
"scope": "",
|
||||
@@ -255,6 +259,7 @@ class TestErrorScenariosE2E:
|
||||
data={
|
||||
"client_id": "https://app.example.com",
|
||||
"redirect_uri": "https://app.example.com/callback",
|
||||
"response_type": "code", # Authorization flow - exchange at token endpoint
|
||||
"state": "test",
|
||||
"code_challenge": "abc123",
|
||||
"code_challenge_method": "S256",
|
||||
@@ -292,6 +297,7 @@ class TestErrorScenariosE2E:
|
||||
data={
|
||||
"client_id": "https://app.example.com",
|
||||
"redirect_uri": "https://app.example.com/callback",
|
||||
"response_type": "code", # Authorization flow - exchange at token endpoint
|
||||
"state": "test",
|
||||
"code_challenge": "abc123",
|
||||
"code_challenge_method": "S256",
|
||||
@@ -327,6 +333,7 @@ class TestTokenUsageE2E:
|
||||
data={
|
||||
"client_id": "https://app.example.com",
|
||||
"redirect_uri": "https://app.example.com/callback",
|
||||
"response_type": "code", # Authorization flow - exchange at token endpoint
|
||||
"state": "test",
|
||||
"code_challenge": "abc123",
|
||||
"code_challenge_method": "S256",
|
||||
@@ -362,6 +369,7 @@ class TestTokenUsageE2E:
|
||||
data={
|
||||
"client_id": "https://app.example.com",
|
||||
"redirect_uri": "https://app.example.com/callback",
|
||||
"response_type": "code", # Authorization flow - exchange at token endpoint
|
||||
"state": "test",
|
||||
"code_challenge": "abc123",
|
||||
"code_challenge_method": "S256",
|
||||
|
||||
532
tests/integration/api/test_authorization_verification.py
Normal file
532
tests/integration/api/test_authorization_verification.py
Normal file
@@ -0,0 +1,532 @@
|
||||
"""
|
||||
Integration tests for authorization endpoint domain verification.
|
||||
|
||||
Tests the security fix that requires domain verification before showing the consent page.
|
||||
"""
|
||||
|
||||
from datetime import datetime
|
||||
from unittest.mock import AsyncMock, Mock, patch
|
||||
|
||||
import pytest
|
||||
from fastapi.testclient import TestClient
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def valid_auth_params():
|
||||
"""Valid authorization request parameters."""
|
||||
return {
|
||||
"client_id": "https://app.example.com",
|
||||
"redirect_uri": "https://app.example.com/callback",
|
||||
"response_type": "code",
|
||||
"state": "test123",
|
||||
"code_challenge": "E9Melhoa2OwvFrEMTJguCHaoeK1t8URWbuGJSstw-cM",
|
||||
"code_challenge_method": "S256",
|
||||
"me": "https://user.example.com",
|
||||
}
|
||||
|
||||
|
||||
def create_mock_verification_service(start_success=True, verify_success=True, start_error="dns_verification_failed"):
|
||||
"""Create a mock verification service with configurable behavior."""
|
||||
mock_service = Mock()
|
||||
|
||||
if start_success:
|
||||
mock_service.start_verification.return_value = {
|
||||
"success": True,
|
||||
"email": "t***@example.com",
|
||||
"verification_method": "email"
|
||||
}
|
||||
else:
|
||||
mock_service.start_verification.return_value = {
|
||||
"success": False,
|
||||
"error": start_error
|
||||
}
|
||||
|
||||
if verify_success:
|
||||
mock_service.verify_email_code.return_value = {
|
||||
"success": True,
|
||||
"email": "test@example.com"
|
||||
}
|
||||
else:
|
||||
mock_service.verify_email_code.return_value = {
|
||||
"success": False,
|
||||
"error": "invalid_code"
|
||||
}
|
||||
|
||||
mock_service.code_storage = Mock()
|
||||
mock_service.code_storage.get.return_value = "test@example.com"
|
||||
mock_service.create_authorization_code.return_value = "test_auth_code_12345"
|
||||
|
||||
return mock_service
|
||||
|
||||
|
||||
def create_mock_happ_parser():
|
||||
"""Create a mock h-app parser."""
|
||||
from gondulf.services.happ_parser import ClientMetadata
|
||||
|
||||
mock_parser = Mock()
|
||||
mock_parser.fetch_and_parse = AsyncMock(return_value=ClientMetadata(
|
||||
name="Test Application",
|
||||
url="https://app.example.com",
|
||||
logo="https://app.example.com/logo.png"
|
||||
))
|
||||
return mock_parser
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def configured_app(monkeypatch, tmp_path):
|
||||
"""Create a fully configured app with fresh database."""
|
||||
db_path = tmp_path / "test.db"
|
||||
|
||||
monkeypatch.setenv("GONDULF_SECRET_KEY", "a" * 32)
|
||||
monkeypatch.setenv("GONDULF_BASE_URL", "https://auth.example.com")
|
||||
monkeypatch.setenv("GONDULF_DATABASE_URL", f"sqlite:///{db_path}")
|
||||
monkeypatch.setenv("GONDULF_DEBUG", "true")
|
||||
|
||||
from gondulf.main import app
|
||||
return app, db_path
|
||||
|
||||
|
||||
class TestUnverifiedDomainTriggersVerification:
|
||||
"""Tests that unverified domains trigger the verification flow."""
|
||||
|
||||
def test_unverified_domain_shows_verification_form(
|
||||
self, configured_app, valid_auth_params
|
||||
):
|
||||
"""Test that an unverified domain shows the verification code form."""
|
||||
app, _ = configured_app
|
||||
from gondulf.dependencies import get_verification_service, get_happ_parser
|
||||
|
||||
mock_service = create_mock_verification_service(start_success=True)
|
||||
mock_parser = create_mock_happ_parser()
|
||||
|
||||
app.dependency_overrides[get_verification_service] = lambda: mock_service
|
||||
app.dependency_overrides[get_happ_parser] = lambda: mock_parser
|
||||
|
||||
try:
|
||||
with TestClient(app) as client:
|
||||
response = client.get("/authorize", params=valid_auth_params)
|
||||
|
||||
assert response.status_code == 200
|
||||
# Should show verification form, not consent form
|
||||
assert "Verify Your Identity" in response.text
|
||||
assert "verification code" in response.text.lower()
|
||||
# Should show masked email
|
||||
assert "t***@example.com" in response.text
|
||||
finally:
|
||||
app.dependency_overrides.clear()
|
||||
|
||||
def test_unverified_domain_preserves_auth_params(
|
||||
self, configured_app, valid_auth_params
|
||||
):
|
||||
"""Test that authorization parameters are preserved in verification form."""
|
||||
app, _ = configured_app
|
||||
from gondulf.dependencies import get_verification_service, get_happ_parser
|
||||
|
||||
mock_service = create_mock_verification_service(start_success=True)
|
||||
mock_parser = create_mock_happ_parser()
|
||||
|
||||
app.dependency_overrides[get_verification_service] = lambda: mock_service
|
||||
app.dependency_overrides[get_happ_parser] = lambda: mock_parser
|
||||
|
||||
try:
|
||||
with TestClient(app) as client:
|
||||
response = client.get("/authorize", params=valid_auth_params)
|
||||
|
||||
assert response.status_code == 200
|
||||
# Check hidden fields contain auth params
|
||||
assert 'name="client_id"' in response.text
|
||||
assert 'name="redirect_uri"' in response.text
|
||||
assert 'name="state"' in response.text
|
||||
assert 'name="code_challenge"' in response.text
|
||||
finally:
|
||||
app.dependency_overrides.clear()
|
||||
|
||||
def test_unverified_domain_does_not_show_consent(
|
||||
self, configured_app, valid_auth_params
|
||||
):
|
||||
"""Test that unverified domain does NOT show consent form directly."""
|
||||
app, _ = configured_app
|
||||
from gondulf.dependencies import get_verification_service, get_happ_parser
|
||||
|
||||
mock_service = create_mock_verification_service(start_success=True)
|
||||
mock_parser = create_mock_happ_parser()
|
||||
|
||||
app.dependency_overrides[get_verification_service] = lambda: mock_service
|
||||
app.dependency_overrides[get_happ_parser] = lambda: mock_parser
|
||||
|
||||
try:
|
||||
with TestClient(app) as client:
|
||||
response = client.get("/authorize", params=valid_auth_params)
|
||||
|
||||
assert response.status_code == 200
|
||||
# Should NOT show consent/authorization form
|
||||
assert "Authorization Request" not in response.text
|
||||
finally:
|
||||
app.dependency_overrides.clear()
|
||||
|
||||
|
||||
class TestVerifiedDomainShowsConsent:
|
||||
"""Tests that verified domains skip verification and show consent."""
|
||||
|
||||
def test_verified_domain_shows_consent_page(
|
||||
self, configured_app, valid_auth_params
|
||||
):
|
||||
"""Test that a verified domain shows consent page directly."""
|
||||
app, db_path = configured_app
|
||||
from gondulf.dependencies import get_happ_parser, get_database
|
||||
from gondulf.database.connection import Database
|
||||
from sqlalchemy import text
|
||||
|
||||
# Create database and insert verified domain
|
||||
db = Database(f"sqlite:///{db_path}")
|
||||
db.initialize()
|
||||
|
||||
with db.get_engine().begin() as conn:
|
||||
conn.execute(
|
||||
text("""
|
||||
INSERT INTO domains (domain, email, verification_code, verified, verified_at, two_factor)
|
||||
VALUES (:domain, :email, '', 1, :verified_at, 1)
|
||||
"""),
|
||||
{"domain": "user.example.com", "email": "test@example.com", "verified_at": datetime.utcnow()}
|
||||
)
|
||||
|
||||
# Override database to use same instance
|
||||
app.dependency_overrides[get_database] = lambda: db
|
||||
mock_parser = create_mock_happ_parser()
|
||||
app.dependency_overrides[get_happ_parser] = lambda: mock_parser
|
||||
|
||||
try:
|
||||
with TestClient(app) as client:
|
||||
response = client.get("/authorize", params=valid_auth_params)
|
||||
|
||||
# Should show consent page
|
||||
assert response.status_code == 200
|
||||
assert "Authorization Request" in response.text
|
||||
assert 'action="/authorize/consent"' in response.text
|
||||
finally:
|
||||
app.dependency_overrides.clear()
|
||||
|
||||
|
||||
class TestVerificationCodeValidation:
|
||||
"""Tests for the verification code submission endpoint."""
|
||||
|
||||
def test_valid_code_shows_consent(
|
||||
self, configured_app, valid_auth_params
|
||||
):
|
||||
"""Test that valid verification code shows consent page."""
|
||||
app, _ = configured_app
|
||||
from gondulf.dependencies import get_verification_service, get_happ_parser
|
||||
|
||||
mock_service = create_mock_verification_service(verify_success=True)
|
||||
mock_parser = create_mock_happ_parser()
|
||||
|
||||
app.dependency_overrides[get_verification_service] = lambda: mock_service
|
||||
app.dependency_overrides[get_happ_parser] = lambda: mock_parser
|
||||
|
||||
try:
|
||||
with TestClient(app) as client:
|
||||
form_data = {
|
||||
"domain": "user.example.com",
|
||||
"code": "123456",
|
||||
"client_id": valid_auth_params["client_id"],
|
||||
"redirect_uri": valid_auth_params["redirect_uri"],
|
||||
"response_type": valid_auth_params["response_type"],
|
||||
"state": valid_auth_params["state"],
|
||||
"code_challenge": valid_auth_params["code_challenge"],
|
||||
"code_challenge_method": valid_auth_params["code_challenge_method"],
|
||||
"scope": "",
|
||||
"me": valid_auth_params["me"],
|
||||
}
|
||||
|
||||
response = client.post("/authorize/verify-code", data=form_data)
|
||||
|
||||
assert response.status_code == 200
|
||||
# Should show consent page after successful verification
|
||||
assert "Authorization Request" in response.text or "Authorize" in response.text
|
||||
finally:
|
||||
app.dependency_overrides.clear()
|
||||
|
||||
def test_invalid_code_shows_error_with_retry(
|
||||
self, configured_app, valid_auth_params
|
||||
):
|
||||
"""Test that invalid code shows error and allows retry."""
|
||||
app, _ = configured_app
|
||||
from gondulf.dependencies import get_verification_service, get_happ_parser
|
||||
|
||||
mock_service = create_mock_verification_service(verify_success=False)
|
||||
mock_parser = create_mock_happ_parser()
|
||||
|
||||
app.dependency_overrides[get_verification_service] = lambda: mock_service
|
||||
app.dependency_overrides[get_happ_parser] = lambda: mock_parser
|
||||
|
||||
try:
|
||||
with TestClient(app) as client:
|
||||
form_data = {
|
||||
"domain": "user.example.com",
|
||||
"code": "000000",
|
||||
"client_id": valid_auth_params["client_id"],
|
||||
"redirect_uri": valid_auth_params["redirect_uri"],
|
||||
"response_type": valid_auth_params["response_type"],
|
||||
"state": valid_auth_params["state"],
|
||||
"code_challenge": valid_auth_params["code_challenge"],
|
||||
"code_challenge_method": valid_auth_params["code_challenge_method"],
|
||||
"scope": "",
|
||||
"me": valid_auth_params["me"],
|
||||
}
|
||||
|
||||
response = client.post("/authorize/verify-code", data=form_data)
|
||||
|
||||
assert response.status_code == 200
|
||||
# Should show verify_code page with error
|
||||
assert "Invalid verification code" in response.text or "invalid" in response.text.lower()
|
||||
# Should still have the form for retry
|
||||
assert 'name="code"' in response.text
|
||||
finally:
|
||||
app.dependency_overrides.clear()
|
||||
|
||||
|
||||
class TestDNSFailureHandling:
|
||||
"""Tests for DNS verification failure scenarios."""
|
||||
|
||||
def test_dns_failure_shows_instructions(
|
||||
self, configured_app, valid_auth_params
|
||||
):
|
||||
"""Test that DNS verification failure shows helpful instructions."""
|
||||
app, db_path = configured_app
|
||||
from gondulf.dependencies import get_verification_service, get_happ_parser, get_database
|
||||
from gondulf.database.connection import Database
|
||||
from sqlalchemy import text
|
||||
|
||||
# Clear any pre-existing verified domain to ensure test isolation
|
||||
db = Database(f"sqlite:///{db_path}")
|
||||
db.initialize()
|
||||
with db.get_engine().begin() as conn:
|
||||
conn.execute(text("DELETE FROM domains WHERE domain = :domain"), {"domain": "user.example.com"})
|
||||
|
||||
app.dependency_overrides[get_database] = lambda: db
|
||||
|
||||
mock_service = create_mock_verification_service(start_success=False, start_error="dns_verification_failed")
|
||||
mock_parser = create_mock_happ_parser()
|
||||
|
||||
app.dependency_overrides[get_verification_service] = lambda: mock_service
|
||||
app.dependency_overrides[get_happ_parser] = lambda: mock_parser
|
||||
|
||||
try:
|
||||
with TestClient(app) as client:
|
||||
response = client.get("/authorize", params=valid_auth_params)
|
||||
|
||||
assert response.status_code == 200
|
||||
# Should show error page with DNS instructions
|
||||
assert "DNS" in response.text or "dns" in response.text.lower()
|
||||
assert "TXT" in response.text
|
||||
assert "_gondulf" in response.text
|
||||
finally:
|
||||
app.dependency_overrides.clear()
|
||||
|
||||
|
||||
class TestEmailFailureHandling:
|
||||
"""Tests for email discovery failure scenarios."""
|
||||
|
||||
def test_email_discovery_failure_shows_instructions(
|
||||
self, configured_app, valid_auth_params
|
||||
):
|
||||
"""Test that email discovery failure shows helpful instructions."""
|
||||
app, db_path = configured_app
|
||||
from gondulf.dependencies import get_verification_service, get_happ_parser, get_database
|
||||
from gondulf.database.connection import Database
|
||||
from sqlalchemy import text
|
||||
|
||||
# Clear any pre-existing verified domain to ensure test isolation
|
||||
db = Database(f"sqlite:///{db_path}")
|
||||
db.initialize()
|
||||
with db.get_engine().begin() as conn:
|
||||
conn.execute(text("DELETE FROM domains WHERE domain = :domain"), {"domain": "user.example.com"})
|
||||
|
||||
app.dependency_overrides[get_database] = lambda: db
|
||||
|
||||
mock_service = create_mock_verification_service(start_success=False, start_error="email_discovery_failed")
|
||||
mock_parser = create_mock_happ_parser()
|
||||
|
||||
app.dependency_overrides[get_verification_service] = lambda: mock_service
|
||||
app.dependency_overrides[get_happ_parser] = lambda: mock_parser
|
||||
|
||||
try:
|
||||
with TestClient(app) as client:
|
||||
response = client.get("/authorize", params=valid_auth_params)
|
||||
|
||||
assert response.status_code == 200
|
||||
# Should show error page with email instructions
|
||||
assert "email" in response.text.lower()
|
||||
finally:
|
||||
app.dependency_overrides.clear()
|
||||
|
||||
|
||||
class TestFullVerificationFlow:
|
||||
"""Integration tests for the complete verification flow."""
|
||||
|
||||
def test_full_flow_new_domain(
|
||||
self, configured_app, valid_auth_params
|
||||
):
|
||||
"""Test complete flow: unverified domain -> verify code -> consent."""
|
||||
app, db_path = configured_app
|
||||
from gondulf.dependencies import get_verification_service, get_happ_parser, get_database
|
||||
from gondulf.database.connection import Database
|
||||
from sqlalchemy import text
|
||||
|
||||
# Clear any pre-existing verified domain to ensure test isolation
|
||||
db = Database(f"sqlite:///{db_path}")
|
||||
db.initialize()
|
||||
with db.get_engine().begin() as conn:
|
||||
conn.execute(text("DELETE FROM domains WHERE domain = :domain"), {"domain": "user.example.com"})
|
||||
|
||||
app.dependency_overrides[get_database] = lambda: db
|
||||
|
||||
mock_service = create_mock_verification_service(start_success=True, verify_success=True)
|
||||
mock_parser = create_mock_happ_parser()
|
||||
|
||||
app.dependency_overrides[get_verification_service] = lambda: mock_service
|
||||
app.dependency_overrides[get_happ_parser] = lambda: mock_parser
|
||||
|
||||
try:
|
||||
with TestClient(app) as client:
|
||||
# Step 1: GET /authorize -> should show verification form
|
||||
response1 = client.get("/authorize", params=valid_auth_params)
|
||||
|
||||
assert response1.status_code == 200
|
||||
assert "Verify Your Identity" in response1.text
|
||||
|
||||
# Step 2: POST /authorize/verify-code -> should show consent
|
||||
form_data = {
|
||||
"domain": "user.example.com",
|
||||
"code": "123456",
|
||||
"client_id": valid_auth_params["client_id"],
|
||||
"redirect_uri": valid_auth_params["redirect_uri"],
|
||||
"response_type": valid_auth_params["response_type"],
|
||||
"state": valid_auth_params["state"],
|
||||
"code_challenge": valid_auth_params["code_challenge"],
|
||||
"code_challenge_method": valid_auth_params["code_challenge_method"],
|
||||
"scope": "",
|
||||
"me": valid_auth_params["me"],
|
||||
}
|
||||
|
||||
response2 = client.post("/authorize/verify-code", data=form_data)
|
||||
|
||||
assert response2.status_code == 200
|
||||
# Should show consent page
|
||||
assert "Authorization Request" in response2.text or "Authorize" in response2.text
|
||||
finally:
|
||||
app.dependency_overrides.clear()
|
||||
|
||||
def test_verification_code_retry_with_correct_code(
|
||||
self, configured_app, valid_auth_params
|
||||
):
|
||||
"""Test that user can retry with correct code after failure."""
|
||||
app, _ = configured_app
|
||||
from gondulf.dependencies import get_verification_service, get_happ_parser
|
||||
|
||||
mock_service = Mock()
|
||||
# First verify_email_code call fails, second succeeds
|
||||
mock_service.verify_email_code.side_effect = [
|
||||
{"success": False, "error": "invalid_code"},
|
||||
{"success": True, "email": "test@example.com"}
|
||||
]
|
||||
mock_service.code_storage = Mock()
|
||||
mock_service.code_storage.get.return_value = "test@example.com"
|
||||
|
||||
mock_parser = create_mock_happ_parser()
|
||||
|
||||
app.dependency_overrides[get_verification_service] = lambda: mock_service
|
||||
app.dependency_overrides[get_happ_parser] = lambda: mock_parser
|
||||
|
||||
try:
|
||||
with TestClient(app) as client:
|
||||
form_data = {
|
||||
"domain": "user.example.com",
|
||||
"code": "000000", # Wrong code
|
||||
"client_id": valid_auth_params["client_id"],
|
||||
"redirect_uri": valid_auth_params["redirect_uri"],
|
||||
"response_type": valid_auth_params["response_type"],
|
||||
"state": valid_auth_params["state"],
|
||||
"code_challenge": valid_auth_params["code_challenge"],
|
||||
"code_challenge_method": valid_auth_params["code_challenge_method"],
|
||||
"scope": "",
|
||||
"me": valid_auth_params["me"],
|
||||
}
|
||||
|
||||
# First attempt with wrong code
|
||||
response1 = client.post("/authorize/verify-code", data=form_data)
|
||||
assert response1.status_code == 200
|
||||
assert "Invalid" in response1.text or "invalid" in response1.text.lower()
|
||||
|
||||
# Second attempt with correct code
|
||||
form_data["code"] = "123456"
|
||||
response2 = client.post("/authorize/verify-code", data=form_data)
|
||||
assert response2.status_code == 200
|
||||
assert "Authorization Request" in response2.text or "Authorize" in response2.text
|
||||
finally:
|
||||
app.dependency_overrides.clear()
|
||||
|
||||
|
||||
class TestSecurityRequirements:
|
||||
"""Tests for security requirements of the fix."""
|
||||
|
||||
def test_unverified_domain_never_sees_consent_directly(
|
||||
self, configured_app, valid_auth_params
|
||||
):
|
||||
"""Critical: Unverified domains must NEVER see consent page directly."""
|
||||
app, db_path = configured_app
|
||||
from gondulf.dependencies import get_verification_service, get_happ_parser, get_database
|
||||
from gondulf.database.connection import Database
|
||||
from sqlalchemy import text
|
||||
|
||||
# Clear any pre-existing verified domain to ensure test isolation
|
||||
db = Database(f"sqlite:///{db_path}")
|
||||
db.initialize()
|
||||
with db.get_engine().begin() as conn:
|
||||
conn.execute(text("DELETE FROM domains WHERE domain = :domain"), {"domain": "user.example.com"})
|
||||
|
||||
app.dependency_overrides[get_database] = lambda: db
|
||||
|
||||
mock_service = create_mock_verification_service(start_success=True)
|
||||
mock_parser = create_mock_happ_parser()
|
||||
|
||||
app.dependency_overrides[get_verification_service] = lambda: mock_service
|
||||
app.dependency_overrides[get_happ_parser] = lambda: mock_parser
|
||||
|
||||
try:
|
||||
with TestClient(app) as client:
|
||||
response = client.get("/authorize", params=valid_auth_params)
|
||||
|
||||
# The consent page should NOT be shown
|
||||
assert "Authorization Request" not in response.text
|
||||
# Verify code page should be shown instead
|
||||
assert "Verify Your Identity" in response.text
|
||||
finally:
|
||||
app.dependency_overrides.clear()
|
||||
|
||||
def test_state_parameter_preserved_through_flow(
|
||||
self, configured_app, valid_auth_params
|
||||
):
|
||||
"""Test that state parameter is preserved through verification flow."""
|
||||
app, _ = configured_app
|
||||
from gondulf.dependencies import get_verification_service, get_happ_parser
|
||||
|
||||
mock_service = create_mock_verification_service(start_success=True)
|
||||
mock_parser = create_mock_happ_parser()
|
||||
|
||||
app.dependency_overrides[get_verification_service] = lambda: mock_service
|
||||
app.dependency_overrides[get_happ_parser] = lambda: mock_parser
|
||||
|
||||
try:
|
||||
unique_state = "unique_state_abc123xyz"
|
||||
params = valid_auth_params.copy()
|
||||
params["state"] = unique_state
|
||||
|
||||
with TestClient(app) as client:
|
||||
response = client.get("/authorize", params=params)
|
||||
|
||||
assert response.status_code == 200
|
||||
# State should be in hidden form field
|
||||
assert f'value="{unique_state}"' in response.text or f"value='{unique_state}'" in response.text
|
||||
finally:
|
||||
app.dependency_overrides.clear()
|
||||
433
tests/integration/api/test_response_type_flows.py
Normal file
433
tests/integration/api/test_response_type_flows.py
Normal file
@@ -0,0 +1,433 @@
|
||||
"""
|
||||
Integration tests for IndieAuth response_type flows.
|
||||
|
||||
Tests the two IndieAuth flows per W3C specification:
|
||||
- Authentication flow (response_type=id): Code redeemed at authorization endpoint
|
||||
- Authorization flow (response_type=code): Code redeemed at token endpoint
|
||||
"""
|
||||
|
||||
from unittest.mock import AsyncMock, patch
|
||||
|
||||
import pytest
|
||||
from fastapi.testclient import TestClient
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def flow_app(monkeypatch, tmp_path):
|
||||
"""Create app for flow testing."""
|
||||
db_path = tmp_path / "test.db"
|
||||
|
||||
monkeypatch.setenv("GONDULF_SECRET_KEY", "a" * 32)
|
||||
monkeypatch.setenv("GONDULF_BASE_URL", "https://auth.example.com")
|
||||
monkeypatch.setenv("GONDULF_DATABASE_URL", f"sqlite:///{db_path}")
|
||||
monkeypatch.setenv("GONDULF_DEBUG", "true")
|
||||
|
||||
from gondulf.main import app
|
||||
return app
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def flow_client(flow_app):
|
||||
"""Create test client for flow tests."""
|
||||
with TestClient(flow_app) as client:
|
||||
yield client
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mock_happ_fetch():
|
||||
"""Mock h-app parser to avoid network calls."""
|
||||
from gondulf.services.happ_parser import ClientMetadata
|
||||
|
||||
metadata = ClientMetadata(
|
||||
name="Test Application",
|
||||
url="https://app.example.com",
|
||||
logo="https://app.example.com/logo.png"
|
||||
)
|
||||
|
||||
with patch('gondulf.services.happ_parser.HAppParser.fetch_and_parse', new_callable=AsyncMock) as mock:
|
||||
mock.return_value = metadata
|
||||
yield mock
|
||||
|
||||
|
||||
class TestResponseTypeValidation:
|
||||
"""Tests for response_type parameter validation."""
|
||||
|
||||
@pytest.fixture
|
||||
def base_params(self):
|
||||
"""Base authorization parameters without response_type."""
|
||||
return {
|
||||
"client_id": "https://app.example.com",
|
||||
"redirect_uri": "https://app.example.com/callback",
|
||||
"state": "test123",
|
||||
"code_challenge": "E9Melhoa2OwvFrEMTJguCHaoeK1t8URWbuGJSstw-cM",
|
||||
"code_challenge_method": "S256",
|
||||
"me": "https://user.example.com",
|
||||
}
|
||||
|
||||
def test_response_type_id_accepted(self, flow_client, base_params, mock_happ_fetch):
|
||||
"""Test response_type=id is accepted."""
|
||||
params = base_params.copy()
|
||||
params["response_type"] = "id"
|
||||
|
||||
response = flow_client.get("/authorize", params=params)
|
||||
assert response.status_code == 200
|
||||
assert "text/html" in response.headers["content-type"]
|
||||
|
||||
def test_response_type_code_accepted(self, flow_client, base_params, mock_happ_fetch):
|
||||
"""Test response_type=code is accepted."""
|
||||
params = base_params.copy()
|
||||
params["response_type"] = "code"
|
||||
|
||||
response = flow_client.get("/authorize", params=params)
|
||||
assert response.status_code == 200
|
||||
assert "text/html" in response.headers["content-type"]
|
||||
|
||||
def test_response_type_defaults_to_id(self, flow_client, base_params, mock_happ_fetch):
|
||||
"""Test missing response_type defaults to 'id'."""
|
||||
# No response_type in params
|
||||
response = flow_client.get("/authorize", params=base_params)
|
||||
assert response.status_code == 200
|
||||
# Form should contain response_type=id
|
||||
assert 'value="id"' in response.text
|
||||
|
||||
def test_invalid_response_type_rejected(self, flow_client, base_params, mock_happ_fetch):
|
||||
"""Test invalid response_type redirects with error."""
|
||||
params = base_params.copy()
|
||||
params["response_type"] = "token" # Invalid
|
||||
|
||||
response = flow_client.get("/authorize", params=params, follow_redirects=False)
|
||||
|
||||
assert response.status_code == 302
|
||||
location = response.headers["location"]
|
||||
assert "error=unsupported_response_type" in location
|
||||
assert "state=test123" in location
|
||||
|
||||
def test_consent_form_includes_response_type(self, flow_client, base_params, mock_happ_fetch):
|
||||
"""Test consent form includes response_type hidden field."""
|
||||
params = base_params.copy()
|
||||
params["response_type"] = "code"
|
||||
|
||||
response = flow_client.get("/authorize", params=params)
|
||||
|
||||
assert response.status_code == 200
|
||||
assert 'name="response_type"' in response.text
|
||||
assert 'value="code"' in response.text
|
||||
|
||||
|
||||
class TestAuthenticationFlow:
|
||||
"""Tests for authentication flow (response_type=id)."""
|
||||
|
||||
@pytest.fixture
|
||||
def auth_code_id_flow(self, flow_client):
|
||||
"""Create an authorization code for the authentication flow."""
|
||||
consent_data = {
|
||||
"client_id": "https://app.example.com",
|
||||
"redirect_uri": "https://app.example.com/callback",
|
||||
"response_type": "id", # Authentication flow
|
||||
"state": "test123",
|
||||
"code_challenge": "E9Melhoa2OwvFrEMTJguCHaoeK1t8URWbuGJSstw-cM",
|
||||
"code_challenge_method": "S256",
|
||||
"scope": "",
|
||||
"me": "https://user.example.com",
|
||||
}
|
||||
|
||||
response = flow_client.post(
|
||||
"/authorize/consent",
|
||||
data=consent_data,
|
||||
follow_redirects=False
|
||||
)
|
||||
|
||||
assert response.status_code == 302
|
||||
location = response.headers["location"]
|
||||
|
||||
from tests.conftest import extract_code_from_redirect
|
||||
code = extract_code_from_redirect(location)
|
||||
return code, consent_data
|
||||
|
||||
def test_auth_code_redemption_at_authorization_endpoint(self, flow_client, auth_code_id_flow):
|
||||
"""Test authentication flow code is redeemed at authorization endpoint."""
|
||||
code, consent_data = auth_code_id_flow
|
||||
|
||||
response = flow_client.post(
|
||||
"/authorize",
|
||||
data={
|
||||
"code": code,
|
||||
"client_id": consent_data["client_id"],
|
||||
}
|
||||
)
|
||||
|
||||
assert response.status_code == 200
|
||||
data = response.json()
|
||||
assert "me" in data
|
||||
assert data["me"] == "https://user.example.com"
|
||||
# Should NOT have access_token
|
||||
assert "access_token" not in data
|
||||
|
||||
def test_auth_flow_returns_only_me(self, flow_client, auth_code_id_flow):
|
||||
"""Test authentication response contains only 'me' field."""
|
||||
code, consent_data = auth_code_id_flow
|
||||
|
||||
response = flow_client.post(
|
||||
"/authorize",
|
||||
data={
|
||||
"code": code,
|
||||
"client_id": consent_data["client_id"],
|
||||
}
|
||||
)
|
||||
|
||||
data = response.json()
|
||||
assert set(data.keys()) == {"me"}
|
||||
|
||||
def test_auth_flow_code_single_use(self, flow_client, auth_code_id_flow):
|
||||
"""Test authentication code can only be used once."""
|
||||
code, consent_data = auth_code_id_flow
|
||||
|
||||
# First use - should succeed
|
||||
response1 = flow_client.post(
|
||||
"/authorize",
|
||||
data={
|
||||
"code": code,
|
||||
"client_id": consent_data["client_id"],
|
||||
}
|
||||
)
|
||||
assert response1.status_code == 200
|
||||
|
||||
# Second use - should fail
|
||||
response2 = flow_client.post(
|
||||
"/authorize",
|
||||
data={
|
||||
"code": code,
|
||||
"client_id": consent_data["client_id"],
|
||||
}
|
||||
)
|
||||
assert response2.status_code == 400
|
||||
assert response2.json()["error"] == "invalid_grant"
|
||||
|
||||
def test_auth_flow_client_id_mismatch_rejected(self, flow_client, auth_code_id_flow):
|
||||
"""Test wrong client_id is rejected."""
|
||||
code, _ = auth_code_id_flow
|
||||
|
||||
response = flow_client.post(
|
||||
"/authorize",
|
||||
data={
|
||||
"code": code,
|
||||
"client_id": "https://wrong.example.com",
|
||||
}
|
||||
)
|
||||
|
||||
assert response.status_code == 400
|
||||
assert response.json()["error"] == "invalid_client"
|
||||
|
||||
def test_auth_flow_redirect_uri_mismatch_rejected(self, flow_client, auth_code_id_flow):
|
||||
"""Test wrong redirect_uri is rejected when provided."""
|
||||
code, consent_data = auth_code_id_flow
|
||||
|
||||
response = flow_client.post(
|
||||
"/authorize",
|
||||
data={
|
||||
"code": code,
|
||||
"client_id": consent_data["client_id"],
|
||||
"redirect_uri": "https://wrong.example.com/callback",
|
||||
}
|
||||
)
|
||||
|
||||
assert response.status_code == 400
|
||||
assert response.json()["error"] == "invalid_grant"
|
||||
|
||||
def test_auth_flow_id_code_rejected_at_token_endpoint(self, flow_client, auth_code_id_flow):
|
||||
"""Test authentication flow code is rejected at token endpoint."""
|
||||
code, consent_data = auth_code_id_flow
|
||||
|
||||
response = flow_client.post(
|
||||
"/token",
|
||||
data={
|
||||
"grant_type": "authorization_code",
|
||||
"code": code,
|
||||
"client_id": consent_data["client_id"],
|
||||
"redirect_uri": consent_data["redirect_uri"],
|
||||
}
|
||||
)
|
||||
|
||||
assert response.status_code == 400
|
||||
# Should indicate wrong endpoint
|
||||
data = response.json()["detail"]
|
||||
assert data["error"] == "invalid_grant"
|
||||
assert "authorization endpoint" in data["error_description"]
|
||||
|
||||
def test_auth_flow_cache_headers(self, flow_client, auth_code_id_flow):
|
||||
"""Test authentication response has no-cache headers."""
|
||||
code, consent_data = auth_code_id_flow
|
||||
|
||||
response = flow_client.post(
|
||||
"/authorize",
|
||||
data={
|
||||
"code": code,
|
||||
"client_id": consent_data["client_id"],
|
||||
}
|
||||
)
|
||||
|
||||
assert response.headers.get("Cache-Control") == "no-store"
|
||||
assert response.headers.get("Pragma") == "no-cache"
|
||||
|
||||
|
||||
class TestAuthorizationFlow:
|
||||
"""Tests for authorization flow (response_type=code)."""
|
||||
|
||||
@pytest.fixture
|
||||
def auth_code_code_flow(self, flow_client):
|
||||
"""Create an authorization code for the authorization flow."""
|
||||
consent_data = {
|
||||
"client_id": "https://app.example.com",
|
||||
"redirect_uri": "https://app.example.com/callback",
|
||||
"response_type": "code", # Authorization flow
|
||||
"state": "test456",
|
||||
"code_challenge": "E9Melhoa2OwvFrEMTJguCHaoeK1t8URWbuGJSstw-cM",
|
||||
"code_challenge_method": "S256",
|
||||
"scope": "profile",
|
||||
"me": "https://user.example.com",
|
||||
}
|
||||
|
||||
response = flow_client.post(
|
||||
"/authorize/consent",
|
||||
data=consent_data,
|
||||
follow_redirects=False
|
||||
)
|
||||
|
||||
assert response.status_code == 302
|
||||
location = response.headers["location"]
|
||||
|
||||
from tests.conftest import extract_code_from_redirect
|
||||
code = extract_code_from_redirect(location)
|
||||
return code, consent_data
|
||||
|
||||
def test_code_flow_redemption_at_token_endpoint(self, flow_client, auth_code_code_flow):
|
||||
"""Test authorization flow code is redeemed at token endpoint."""
|
||||
code, consent_data = auth_code_code_flow
|
||||
|
||||
response = flow_client.post(
|
||||
"/token",
|
||||
data={
|
||||
"grant_type": "authorization_code",
|
||||
"code": code,
|
||||
"client_id": consent_data["client_id"],
|
||||
"redirect_uri": consent_data["redirect_uri"],
|
||||
}
|
||||
)
|
||||
|
||||
assert response.status_code == 200
|
||||
data = response.json()
|
||||
assert "access_token" in data
|
||||
assert "me" in data
|
||||
assert data["me"] == "https://user.example.com"
|
||||
assert data["token_type"] == "Bearer"
|
||||
|
||||
def test_code_flow_code_rejected_at_authorization_endpoint(self, flow_client, auth_code_code_flow):
|
||||
"""Test authorization flow code is rejected at authorization endpoint."""
|
||||
code, consent_data = auth_code_code_flow
|
||||
|
||||
response = flow_client.post(
|
||||
"/authorize",
|
||||
data={
|
||||
"code": code,
|
||||
"client_id": consent_data["client_id"],
|
||||
}
|
||||
)
|
||||
|
||||
assert response.status_code == 400
|
||||
# Should indicate wrong endpoint
|
||||
data = response.json()
|
||||
assert data["error"] == "invalid_grant"
|
||||
assert "token endpoint" in data["error_description"]
|
||||
|
||||
def test_code_flow_single_use(self, flow_client, auth_code_code_flow):
|
||||
"""Test authorization code can only be used once."""
|
||||
code, consent_data = auth_code_code_flow
|
||||
|
||||
# First use - should succeed
|
||||
response1 = flow_client.post(
|
||||
"/token",
|
||||
data={
|
||||
"grant_type": "authorization_code",
|
||||
"code": code,
|
||||
"client_id": consent_data["client_id"],
|
||||
"redirect_uri": consent_data["redirect_uri"],
|
||||
}
|
||||
)
|
||||
assert response1.status_code == 200
|
||||
|
||||
# Second use - should fail
|
||||
response2 = flow_client.post(
|
||||
"/token",
|
||||
data={
|
||||
"grant_type": "authorization_code",
|
||||
"code": code,
|
||||
"client_id": consent_data["client_id"],
|
||||
"redirect_uri": consent_data["redirect_uri"],
|
||||
}
|
||||
)
|
||||
assert response2.status_code == 400
|
||||
|
||||
|
||||
class TestMetadataEndpoint:
|
||||
"""Tests for server metadata endpoint."""
|
||||
|
||||
def test_metadata_includes_both_response_types(self, flow_client):
|
||||
"""Test metadata advertises both response types."""
|
||||
response = flow_client.get("/.well-known/oauth-authorization-server")
|
||||
|
||||
assert response.status_code == 200
|
||||
data = response.json()
|
||||
assert "response_types_supported" in data
|
||||
assert "code" in data["response_types_supported"]
|
||||
assert "id" in data["response_types_supported"]
|
||||
|
||||
def test_metadata_includes_code_challenge_method(self, flow_client):
|
||||
"""Test metadata advertises S256 code challenge method."""
|
||||
response = flow_client.get("/.well-known/oauth-authorization-server")
|
||||
|
||||
assert response.status_code == 200
|
||||
data = response.json()
|
||||
assert "code_challenge_methods_supported" in data
|
||||
assert "S256" in data["code_challenge_methods_supported"]
|
||||
|
||||
|
||||
class TestErrorScenarios:
|
||||
"""Tests for error handling in both flows."""
|
||||
|
||||
def test_invalid_code_at_authorization_endpoint(self, flow_client):
|
||||
"""Test invalid code returns error at authorization endpoint."""
|
||||
response = flow_client.post(
|
||||
"/authorize",
|
||||
data={
|
||||
"code": "invalid_code_12345",
|
||||
"client_id": "https://app.example.com",
|
||||
}
|
||||
)
|
||||
|
||||
assert response.status_code == 400
|
||||
data = response.json()
|
||||
assert data["error"] == "invalid_grant"
|
||||
|
||||
def test_missing_code_at_authorization_endpoint(self, flow_client):
|
||||
"""Test missing code returns validation error."""
|
||||
response = flow_client.post(
|
||||
"/authorize",
|
||||
data={
|
||||
"client_id": "https://app.example.com",
|
||||
}
|
||||
)
|
||||
|
||||
# FastAPI returns 422 for missing required form field
|
||||
assert response.status_code == 422
|
||||
|
||||
def test_missing_client_id_at_authorization_endpoint(self, flow_client):
|
||||
"""Test missing client_id returns validation error."""
|
||||
response = flow_client.post(
|
||||
"/authorize",
|
||||
data={
|
||||
"code": "some_code",
|
||||
}
|
||||
)
|
||||
|
||||
# FastAPI returns 422 for missing required form field
|
||||
assert response.status_code == 422
|
||||
@@ -32,13 +32,14 @@ def token_client(token_app):
|
||||
|
||||
@pytest.fixture
|
||||
def setup_auth_code(token_app, test_code_storage):
|
||||
"""Setup a valid authorization code for testing."""
|
||||
"""Setup a valid authorization code for testing (authorization flow)."""
|
||||
from gondulf.dependencies import get_code_storage
|
||||
|
||||
code = "integration_test_code_12345"
|
||||
metadata = {
|
||||
"client_id": "https://app.example.com",
|
||||
"redirect_uri": "https://app.example.com/callback",
|
||||
"response_type": "code", # Authorization flow - exchange at token endpoint
|
||||
"state": "xyz123",
|
||||
"me": "https://user.example.com",
|
||||
"scope": "",
|
||||
@@ -212,6 +213,7 @@ class TestTokenExchangeErrors:
|
||||
metadata = {
|
||||
"client_id": "https://app.example.com",
|
||||
"redirect_uri": "https://app.example.com/callback",
|
||||
"response_type": "code", # Authorization flow
|
||||
"state": "xyz123",
|
||||
"me": "https://user.example.com",
|
||||
"scope": "",
|
||||
|
||||
@@ -78,11 +78,11 @@ class TestMetadataEndpoint:
|
||||
assert data["token_endpoint"] == "https://auth.example.com/token"
|
||||
|
||||
def test_metadata_response_types_supported(self, client):
|
||||
"""Test response_types_supported contains only 'code'."""
|
||||
"""Test response_types_supported contains both 'code' and 'id'."""
|
||||
response = client.get("/.well-known/oauth-authorization-server")
|
||||
data = response.json()
|
||||
|
||||
assert data["response_types_supported"] == ["code"]
|
||||
assert data["response_types_supported"] == ["code", "id"]
|
||||
|
||||
def test_metadata_grant_types_supported(self, client):
|
||||
"""Test grant_types_supported contains only 'authorization_code'."""
|
||||
@@ -91,12 +91,12 @@ class TestMetadataEndpoint:
|
||||
|
||||
assert data["grant_types_supported"] == ["authorization_code"]
|
||||
|
||||
def test_metadata_code_challenge_methods_empty(self, client):
|
||||
"""Test code_challenge_methods_supported is empty array."""
|
||||
def test_metadata_code_challenge_methods_supported(self, client):
|
||||
"""Test code_challenge_methods_supported contains S256."""
|
||||
response = client.get("/.well-known/oauth-authorization-server")
|
||||
data = response.json()
|
||||
|
||||
assert data["code_challenge_methods_supported"] == []
|
||||
assert data["code_challenge_methods_supported"] == ["S256"]
|
||||
|
||||
def test_metadata_token_endpoint_auth_methods(self, client):
|
||||
"""Test token_endpoint_auth_methods_supported contains 'none'."""
|
||||
|
||||
@@ -71,11 +71,12 @@ def client(test_config, test_database, test_code_storage, test_token_service):
|
||||
|
||||
@pytest.fixture
|
||||
def valid_auth_code(test_code_storage):
|
||||
"""Create a valid authorization code."""
|
||||
"""Create a valid authorization code (authorization flow)."""
|
||||
code = "test_auth_code_12345"
|
||||
metadata = {
|
||||
"client_id": "https://client.example.com",
|
||||
"redirect_uri": "https://client.example.com/callback",
|
||||
"response_type": "code", # Authorization flow - exchange at token endpoint
|
||||
"state": "xyz123",
|
||||
"me": "https://user.example.com",
|
||||
"scope": "",
|
||||
|
||||
Reference in New Issue
Block a user