Compare commits
3 Commits
v1.0.0-rc.
...
v1.0.0-rc.
| Author | SHA1 | Date | |
|---|---|---|---|
| 9b50f359a6 | |||
| 8dddc73826 | |||
| 052d3ad3e1 |
@@ -1,17 +1,28 @@
|
|||||||
"""Authorization endpoint for OAuth 2.0 / IndieAuth authorization code flow."""
|
"""Authorization endpoint for OAuth 2.0 / IndieAuth authorization code flow.
|
||||||
|
|
||||||
|
Supports both IndieAuth flows per W3C specification:
|
||||||
|
- Authentication (response_type=id): Returns user identity only, code redeemed at authorization endpoint
|
||||||
|
- Authorization (response_type=code): Returns access token, code redeemed at token endpoint
|
||||||
|
"""
|
||||||
import logging
|
import logging
|
||||||
|
from datetime import datetime
|
||||||
|
from typing import Optional
|
||||||
from urllib.parse import urlencode
|
from urllib.parse import urlencode
|
||||||
|
|
||||||
from fastapi import APIRouter, Depends, Form, Request
|
from fastapi import APIRouter, Depends, Form, Request, Response
|
||||||
from fastapi.responses import HTMLResponse, RedirectResponse
|
from fastapi.responses import HTMLResponse, JSONResponse, RedirectResponse
|
||||||
from fastapi.templating import Jinja2Templates
|
from fastapi.templating import Jinja2Templates
|
||||||
|
from pydantic import BaseModel
|
||||||
|
from sqlalchemy import text
|
||||||
|
|
||||||
from gondulf.database.connection import Database
|
from gondulf.database.connection import Database
|
||||||
from gondulf.dependencies import get_database, get_happ_parser, get_verification_service
|
from gondulf.dependencies import get_code_storage, get_database, get_happ_parser, get_verification_service
|
||||||
from gondulf.services.domain_verification import DomainVerificationService
|
from gondulf.services.domain_verification import DomainVerificationService
|
||||||
from gondulf.services.happ_parser import HAppParser
|
from gondulf.services.happ_parser import HAppParser
|
||||||
|
from gondulf.storage import CodeStore
|
||||||
from gondulf.utils.validation import (
|
from gondulf.utils.validation import (
|
||||||
extract_domain_from_url,
|
extract_domain_from_url,
|
||||||
|
mask_email,
|
||||||
normalize_client_id,
|
normalize_client_id,
|
||||||
validate_redirect_uri,
|
validate_redirect_uri,
|
||||||
)
|
)
|
||||||
@@ -21,6 +32,76 @@ logger = logging.getLogger("gondulf.authorization")
|
|||||||
router = APIRouter()
|
router = APIRouter()
|
||||||
templates = Jinja2Templates(directory="src/gondulf/templates")
|
templates = Jinja2Templates(directory="src/gondulf/templates")
|
||||||
|
|
||||||
|
# Valid response types per IndieAuth spec
|
||||||
|
VALID_RESPONSE_TYPES = {"id", "code"}
|
||||||
|
|
||||||
|
|
||||||
|
class AuthenticationResponse(BaseModel):
|
||||||
|
"""
|
||||||
|
IndieAuth authentication response (response_type=id flow).
|
||||||
|
|
||||||
|
Per W3C IndieAuth specification Section 5.3.3:
|
||||||
|
https://www.w3.org/TR/indieauth/#authentication-response
|
||||||
|
"""
|
||||||
|
me: str
|
||||||
|
|
||||||
|
|
||||||
|
async def check_domain_verified(database: Database, domain: str) -> bool:
|
||||||
|
"""
|
||||||
|
Check if domain is verified in the database.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
database: Database service
|
||||||
|
domain: Domain to check (e.g., "example.com")
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
True if domain is verified, False otherwise
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
engine = database.get_engine()
|
||||||
|
with engine.connect() as conn:
|
||||||
|
result = conn.execute(
|
||||||
|
text("SELECT verified FROM domains WHERE domain = :domain AND verified = 1"),
|
||||||
|
{"domain": domain}
|
||||||
|
)
|
||||||
|
row = result.fetchone()
|
||||||
|
return row is not None
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Failed to check domain verification: {e}")
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
async def store_verified_domain(database: Database, domain: str, email: str) -> None:
|
||||||
|
"""
|
||||||
|
Store verified domain in database.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
database: Database service
|
||||||
|
domain: Verified domain
|
||||||
|
email: Email used for verification (for audit)
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
engine = database.get_engine()
|
||||||
|
now = datetime.utcnow()
|
||||||
|
with engine.begin() as conn:
|
||||||
|
# Use INSERT OR REPLACE for SQLite
|
||||||
|
conn.execute(
|
||||||
|
text("""
|
||||||
|
INSERT OR REPLACE INTO domains
|
||||||
|
(domain, email, verification_code, verified, verified_at, two_factor)
|
||||||
|
VALUES (:domain, :email, '', 1, :verified_at, 1)
|
||||||
|
"""),
|
||||||
|
{
|
||||||
|
"domain": domain,
|
||||||
|
"email": email,
|
||||||
|
"verified_at": now
|
||||||
|
}
|
||||||
|
)
|
||||||
|
logger.info(f"Stored verified domain: {domain}")
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Failed to store verified domain: {e}")
|
||||||
|
raise
|
||||||
|
|
||||||
|
|
||||||
@router.get("/authorize")
|
@router.get("/authorize")
|
||||||
async def authorize_get(
|
async def authorize_get(
|
||||||
@@ -34,7 +115,8 @@ async def authorize_get(
|
|||||||
scope: str | None = None,
|
scope: str | None = None,
|
||||||
me: str | None = None,
|
me: str | None = None,
|
||||||
database: Database = Depends(get_database),
|
database: Database = Depends(get_database),
|
||||||
happ_parser: HAppParser = Depends(get_happ_parser)
|
happ_parser: HAppParser = Depends(get_happ_parser),
|
||||||
|
verification_service: DomainVerificationService = Depends(get_verification_service)
|
||||||
) -> HTMLResponse:
|
) -> HTMLResponse:
|
||||||
"""
|
"""
|
||||||
Handle authorization request (GET).
|
Handle authorization request (GET).
|
||||||
@@ -42,20 +124,26 @@ async def authorize_get(
|
|||||||
Validates client_id, redirect_uri, and required parameters.
|
Validates client_id, redirect_uri, and required parameters.
|
||||||
Shows consent form if domain is verified, or verification form if not.
|
Shows consent form if domain is verified, or verification form if not.
|
||||||
|
|
||||||
|
Supports two IndieAuth flows per W3C specification:
|
||||||
|
- response_type=id (default): Authentication only, returns user identity
|
||||||
|
- response_type=code: Authorization, returns access token
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
request: FastAPI request object
|
request: FastAPI request object
|
||||||
client_id: Client application identifier
|
client_id: Client application identifier
|
||||||
redirect_uri: Callback URI for client
|
redirect_uri: Callback URI for client
|
||||||
response_type: Must be "code"
|
response_type: "id" (default) for authentication, "code" for authorization
|
||||||
state: Client state parameter
|
state: Client state parameter
|
||||||
code_challenge: PKCE code challenge
|
code_challenge: PKCE code challenge
|
||||||
code_challenge_method: PKCE method (S256)
|
code_challenge_method: PKCE method (S256)
|
||||||
scope: Requested scope
|
scope: Requested scope (only meaningful for response_type=code)
|
||||||
me: User identity URL
|
me: User identity URL
|
||||||
database: Database service
|
database: Database service
|
||||||
|
happ_parser: H-app parser for client metadata
|
||||||
|
verification_service: Domain verification service
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
HTML response with consent form or error page
|
HTML response with consent form, verification form, or error page
|
||||||
"""
|
"""
|
||||||
# Validate required parameters (pre-client validation)
|
# Validate required parameters (pre-client validation)
|
||||||
if not client_id:
|
if not client_id:
|
||||||
@@ -108,11 +196,13 @@ async def authorize_get(
|
|||||||
|
|
||||||
# From here on, redirect errors to client via OAuth error redirect
|
# From here on, redirect errors to client via OAuth error redirect
|
||||||
|
|
||||||
# Validate response_type
|
# Validate response_type - default to "id" if not provided (per IndieAuth spec)
|
||||||
if response_type != "code":
|
effective_response_type = response_type or "id"
|
||||||
|
|
||||||
|
if effective_response_type not in VALID_RESPONSE_TYPES:
|
||||||
error_params = {
|
error_params = {
|
||||||
"error": "unsupported_response_type",
|
"error": "unsupported_response_type",
|
||||||
"error_description": "Only response_type=code is supported",
|
"error_description": f"response_type must be 'id' or 'code', got '{response_type}'",
|
||||||
"state": state or ""
|
"state": state or ""
|
||||||
}
|
}
|
||||||
redirect_url = f"{redirect_uri}?{urlencode(error_params)}"
|
redirect_url = f"{redirect_uri}?{urlencode(error_params)}"
|
||||||
@@ -148,9 +238,9 @@ async def authorize_get(
|
|||||||
redirect_url = f"{redirect_uri}?{urlencode(error_params)}"
|
redirect_url = f"{redirect_uri}?{urlencode(error_params)}"
|
||||||
return RedirectResponse(url=redirect_url, status_code=302)
|
return RedirectResponse(url=redirect_url, status_code=302)
|
||||||
|
|
||||||
# Validate me URL format
|
# Validate me URL format and extract domain
|
||||||
try:
|
try:
|
||||||
extract_domain_from_url(me)
|
domain = extract_domain_from_url(me)
|
||||||
except ValueError:
|
except ValueError:
|
||||||
error_params = {
|
error_params = {
|
||||||
"error": "invalid_request",
|
"error": "invalid_request",
|
||||||
@@ -160,11 +250,71 @@ async def authorize_get(
|
|||||||
redirect_url = f"{redirect_uri}?{urlencode(error_params)}"
|
redirect_url = f"{redirect_uri}?{urlencode(error_params)}"
|
||||||
return RedirectResponse(url=redirect_url, status_code=302)
|
return RedirectResponse(url=redirect_url, status_code=302)
|
||||||
|
|
||||||
# Check if domain is verified
|
# SECURITY FIX: Check if domain is verified before showing consent
|
||||||
# For Phase 2, we'll show consent form immediately (domain verification happens separately)
|
is_verified = await check_domain_verified(database, domain)
|
||||||
# In Phase 3, we'll check database for verified domains
|
|
||||||
|
if not is_verified:
|
||||||
|
logger.info(f"Domain {domain} not verified, starting verification")
|
||||||
|
|
||||||
|
# Start two-factor verification
|
||||||
|
result = verification_service.start_verification(domain, me)
|
||||||
|
|
||||||
|
if not result["success"]:
|
||||||
|
# Verification cannot start (DNS failed, no rel=me, etc)
|
||||||
|
error_message = result.get("error", "verification_failed")
|
||||||
|
|
||||||
|
# Map error codes to user-friendly messages
|
||||||
|
error_messages = {
|
||||||
|
"dns_verification_failed": "DNS verification failed. Please add the required TXT record.",
|
||||||
|
"email_discovery_failed": "Could not find an email address on your homepage. Please add a rel='me' link to your email.",
|
||||||
|
"invalid_email_format": "The email address discovered on your homepage is invalid.",
|
||||||
|
"email_send_failed": "Failed to send verification email. Please try again."
|
||||||
|
}
|
||||||
|
friendly_error = error_messages.get(error_message, error_message)
|
||||||
|
|
||||||
|
logger.warning(f"Verification start failed for domain={domain}: {error_message}")
|
||||||
|
|
||||||
|
return templates.TemplateResponse(
|
||||||
|
"verification_error.html",
|
||||||
|
{
|
||||||
|
"request": request,
|
||||||
|
"error": friendly_error,
|
||||||
|
"domain": domain,
|
||||||
|
"client_id": normalized_client_id,
|
||||||
|
"redirect_uri": redirect_uri,
|
||||||
|
"response_type": effective_response_type,
|
||||||
|
"state": state or "",
|
||||||
|
"code_challenge": code_challenge,
|
||||||
|
"code_challenge_method": code_challenge_method,
|
||||||
|
"scope": scope or "",
|
||||||
|
"me": me
|
||||||
|
},
|
||||||
|
status_code=200
|
||||||
|
)
|
||||||
|
|
||||||
|
# Verification started - show code entry form
|
||||||
|
logger.info(f"Verification code sent for domain={domain}")
|
||||||
|
|
||||||
|
return templates.TemplateResponse(
|
||||||
|
"verify_code.html",
|
||||||
|
{
|
||||||
|
"request": request,
|
||||||
|
"masked_email": result["email"],
|
||||||
|
"domain": domain,
|
||||||
|
"client_id": normalized_client_id,
|
||||||
|
"redirect_uri": redirect_uri,
|
||||||
|
"response_type": effective_response_type,
|
||||||
|
"state": state or "",
|
||||||
|
"code_challenge": code_challenge,
|
||||||
|
"code_challenge_method": code_challenge_method,
|
||||||
|
"scope": scope or "",
|
||||||
|
"me": me
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
# Domain is verified - fetch client metadata and show consent form
|
||||||
|
logger.info(f"Domain {domain} is verified, showing consent page")
|
||||||
|
|
||||||
# Fetch client metadata (h-app microformat)
|
|
||||||
client_metadata = None
|
client_metadata = None
|
||||||
try:
|
try:
|
||||||
client_metadata = await happ_parser.fetch_and_parse(normalized_client_id)
|
client_metadata = await happ_parser.fetch_and_parse(normalized_client_id)
|
||||||
@@ -180,6 +330,7 @@ async def authorize_get(
|
|||||||
"request": request,
|
"request": request,
|
||||||
"client_id": normalized_client_id,
|
"client_id": normalized_client_id,
|
||||||
"redirect_uri": redirect_uri,
|
"redirect_uri": redirect_uri,
|
||||||
|
"response_type": effective_response_type,
|
||||||
"state": state or "",
|
"state": state or "",
|
||||||
"code_challenge": code_challenge,
|
"code_challenge": code_challenge,
|
||||||
"code_challenge_method": code_challenge_method,
|
"code_challenge_method": code_challenge_method,
|
||||||
@@ -190,11 +341,124 @@ async def authorize_get(
|
|||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@router.post("/authorize/verify-code")
|
||||||
|
async def authorize_verify_code(
|
||||||
|
request: Request,
|
||||||
|
domain: str = Form(...),
|
||||||
|
code: str = Form(...),
|
||||||
|
client_id: str = Form(...),
|
||||||
|
redirect_uri: str = Form(...),
|
||||||
|
response_type: str = Form("id"),
|
||||||
|
state: str = Form(...),
|
||||||
|
code_challenge: str = Form(...),
|
||||||
|
code_challenge_method: str = Form(...),
|
||||||
|
scope: str = Form(""),
|
||||||
|
me: str = Form(...),
|
||||||
|
database: Database = Depends(get_database),
|
||||||
|
verification_service: DomainVerificationService = Depends(get_verification_service),
|
||||||
|
happ_parser: HAppParser = Depends(get_happ_parser)
|
||||||
|
) -> HTMLResponse:
|
||||||
|
"""
|
||||||
|
Handle verification code submission during authorization flow.
|
||||||
|
|
||||||
|
This endpoint is called when user submits the 6-digit email verification code.
|
||||||
|
On success, shows consent page. On failure, shows code entry form with error.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
request: FastAPI request object
|
||||||
|
domain: Domain being verified
|
||||||
|
code: 6-digit verification code from email
|
||||||
|
client_id: Client application identifier
|
||||||
|
redirect_uri: Callback URI
|
||||||
|
response_type: "id" for authentication, "code" for authorization
|
||||||
|
state: Client state parameter
|
||||||
|
code_challenge: PKCE code challenge
|
||||||
|
code_challenge_method: PKCE method
|
||||||
|
scope: Requested scope
|
||||||
|
me: User identity URL
|
||||||
|
database: Database service
|
||||||
|
verification_service: Domain verification service
|
||||||
|
happ_parser: H-app parser for client metadata
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
HTML response: consent page on success, code form with error on failure
|
||||||
|
"""
|
||||||
|
logger.info(f"Verification code submission for domain={domain}")
|
||||||
|
|
||||||
|
# Verify the code
|
||||||
|
result = verification_service.verify_email_code(domain, code)
|
||||||
|
|
||||||
|
if not result["success"]:
|
||||||
|
logger.warning(f"Verification code invalid for domain={domain}: {result.get('error')}")
|
||||||
|
|
||||||
|
# Get masked email for display
|
||||||
|
email = verification_service.code_storage.get(f"email_addr:{domain}")
|
||||||
|
masked = mask_email(email) if email else "unknown"
|
||||||
|
|
||||||
|
# Map error codes to user-friendly messages
|
||||||
|
error_messages = {
|
||||||
|
"invalid_code": "Invalid verification code. Please check and try again.",
|
||||||
|
"email_not_found": "Verification session expired. Please start over."
|
||||||
|
}
|
||||||
|
error_message = result.get("error", "invalid_code")
|
||||||
|
friendly_error = error_messages.get(error_message, error_message)
|
||||||
|
|
||||||
|
return templates.TemplateResponse(
|
||||||
|
"verify_code.html",
|
||||||
|
{
|
||||||
|
"request": request,
|
||||||
|
"error": friendly_error,
|
||||||
|
"masked_email": masked,
|
||||||
|
"domain": domain,
|
||||||
|
"client_id": client_id,
|
||||||
|
"redirect_uri": redirect_uri,
|
||||||
|
"response_type": response_type,
|
||||||
|
"state": state,
|
||||||
|
"code_challenge": code_challenge,
|
||||||
|
"code_challenge_method": code_challenge_method,
|
||||||
|
"scope": scope,
|
||||||
|
"me": me
|
||||||
|
},
|
||||||
|
status_code=200
|
||||||
|
)
|
||||||
|
|
||||||
|
# Code valid - store verified domain in database
|
||||||
|
email = result.get("email", "")
|
||||||
|
await store_verified_domain(database, domain, email)
|
||||||
|
|
||||||
|
logger.info(f"Domain verified successfully: {domain}")
|
||||||
|
|
||||||
|
# Fetch client metadata for consent page
|
||||||
|
client_metadata = None
|
||||||
|
try:
|
||||||
|
client_metadata = await happ_parser.fetch_and_parse(client_id)
|
||||||
|
except Exception as e:
|
||||||
|
logger.warning(f"Failed to fetch client metadata: {e}")
|
||||||
|
|
||||||
|
# Show consent form
|
||||||
|
return templates.TemplateResponse(
|
||||||
|
"authorize.html",
|
||||||
|
{
|
||||||
|
"request": request,
|
||||||
|
"client_id": client_id,
|
||||||
|
"redirect_uri": redirect_uri,
|
||||||
|
"response_type": response_type,
|
||||||
|
"state": state,
|
||||||
|
"code_challenge": code_challenge,
|
||||||
|
"code_challenge_method": code_challenge_method,
|
||||||
|
"scope": scope,
|
||||||
|
"me": me,
|
||||||
|
"client_metadata": client_metadata
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
@router.post("/authorize/consent")
|
@router.post("/authorize/consent")
|
||||||
async def authorize_consent(
|
async def authorize_consent(
|
||||||
request: Request,
|
request: Request,
|
||||||
client_id: str = Form(...),
|
client_id: str = Form(...),
|
||||||
redirect_uri: str = Form(...),
|
redirect_uri: str = Form(...),
|
||||||
|
response_type: str = Form("id"), # Default to "id" for authentication flow
|
||||||
state: str = Form(...),
|
state: str = Form(...),
|
||||||
code_challenge: str = Form(...),
|
code_challenge: str = Form(...),
|
||||||
code_challenge_method: str = Form(...),
|
code_challenge_method: str = Form(...),
|
||||||
@@ -211,6 +475,7 @@ async def authorize_consent(
|
|||||||
request: FastAPI request object
|
request: FastAPI request object
|
||||||
client_id: Client application identifier
|
client_id: Client application identifier
|
||||||
redirect_uri: Callback URI
|
redirect_uri: Callback URI
|
||||||
|
response_type: "id" for authentication, "code" for authorization
|
||||||
state: Client state
|
state: Client state
|
||||||
code_challenge: PKCE challenge
|
code_challenge: PKCE challenge
|
||||||
code_challenge_method: PKCE method
|
code_challenge_method: PKCE method
|
||||||
@@ -221,9 +486,9 @@ async def authorize_consent(
|
|||||||
Returns:
|
Returns:
|
||||||
Redirect to client callback with authorization code
|
Redirect to client callback with authorization code
|
||||||
"""
|
"""
|
||||||
logger.info(f"Authorization consent granted for client_id={client_id}")
|
logger.info(f"Authorization consent granted for client_id={client_id} response_type={response_type}")
|
||||||
|
|
||||||
# Create authorization code
|
# Create authorization code with response_type metadata
|
||||||
authorization_code = verification_service.create_authorization_code(
|
authorization_code = verification_service.create_authorization_code(
|
||||||
client_id=client_id,
|
client_id=client_id,
|
||||||
redirect_uri=redirect_uri,
|
redirect_uri=redirect_uri,
|
||||||
@@ -231,7 +496,8 @@ async def authorize_consent(
|
|||||||
code_challenge=code_challenge,
|
code_challenge=code_challenge,
|
||||||
code_challenge_method=code_challenge_method,
|
code_challenge_method=code_challenge_method,
|
||||||
scope=scope,
|
scope=scope,
|
||||||
me=me
|
me=me,
|
||||||
|
response_type=response_type
|
||||||
)
|
)
|
||||||
|
|
||||||
# Build redirect URL with authorization code
|
# Build redirect URL with authorization code
|
||||||
@@ -243,3 +509,161 @@ async def authorize_consent(
|
|||||||
|
|
||||||
logger.info(f"Redirecting to {redirect_uri} with authorization code")
|
logger.info(f"Redirecting to {redirect_uri} with authorization code")
|
||||||
return RedirectResponse(url=redirect_url, status_code=302)
|
return RedirectResponse(url=redirect_url, status_code=302)
|
||||||
|
|
||||||
|
|
||||||
|
@router.post("/authorize")
|
||||||
|
async def authorize_post(
|
||||||
|
response: Response,
|
||||||
|
code: str = Form(...),
|
||||||
|
client_id: str = Form(...),
|
||||||
|
redirect_uri: Optional[str] = Form(None),
|
||||||
|
code_verifier: Optional[str] = Form(None),
|
||||||
|
code_storage: CodeStore = Depends(get_code_storage)
|
||||||
|
) -> JSONResponse:
|
||||||
|
"""
|
||||||
|
Handle authorization code verification for authentication flow (response_type=id).
|
||||||
|
|
||||||
|
Per W3C IndieAuth specification Section 5.3.3:
|
||||||
|
https://www.w3.org/TR/indieauth/#redeeming-the-authorization-code-id
|
||||||
|
|
||||||
|
This endpoint is used ONLY for the authentication flow (response_type=id).
|
||||||
|
For the authorization flow (response_type=code), clients must use the token endpoint.
|
||||||
|
|
||||||
|
Request (application/x-www-form-urlencoded):
|
||||||
|
code: Authorization code from /authorize redirect
|
||||||
|
client_id: Client application URL (must match original request)
|
||||||
|
redirect_uri: Original redirect URI (optional but recommended)
|
||||||
|
code_verifier: PKCE verifier (optional, for PKCE validation)
|
||||||
|
|
||||||
|
Response (200 OK):
|
||||||
|
{
|
||||||
|
"me": "https://user.example.com/"
|
||||||
|
}
|
||||||
|
|
||||||
|
Error Response (400 Bad Request):
|
||||||
|
{
|
||||||
|
"error": "invalid_grant",
|
||||||
|
"error_description": "..."
|
||||||
|
}
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
JSONResponse with user identity or error
|
||||||
|
"""
|
||||||
|
# Set cache headers (OAuth 2.0 best practice)
|
||||||
|
response.headers["Cache-Control"] = "no-store"
|
||||||
|
response.headers["Pragma"] = "no-cache"
|
||||||
|
|
||||||
|
logger.info(f"Authorization code verification request from client: {client_id}")
|
||||||
|
|
||||||
|
# STEP 1: Retrieve authorization code from storage
|
||||||
|
storage_key = f"authz:{code}"
|
||||||
|
code_data = code_storage.get(storage_key)
|
||||||
|
|
||||||
|
if code_data is None:
|
||||||
|
logger.warning(f"Authorization code not found or expired: {code[:8]}...")
|
||||||
|
return JSONResponse(
|
||||||
|
status_code=400,
|
||||||
|
content={
|
||||||
|
"error": "invalid_grant",
|
||||||
|
"error_description": "Authorization code is invalid or has expired"
|
||||||
|
},
|
||||||
|
headers={"Cache-Control": "no-store", "Pragma": "no-cache"}
|
||||||
|
)
|
||||||
|
|
||||||
|
# Validate code_data is a dict
|
||||||
|
if not isinstance(code_data, dict):
|
||||||
|
logger.error(f"Authorization code metadata is not a dict: {type(code_data)}")
|
||||||
|
return JSONResponse(
|
||||||
|
status_code=400,
|
||||||
|
content={
|
||||||
|
"error": "invalid_grant",
|
||||||
|
"error_description": "Authorization code is malformed"
|
||||||
|
},
|
||||||
|
headers={"Cache-Control": "no-store", "Pragma": "no-cache"}
|
||||||
|
)
|
||||||
|
|
||||||
|
# STEP 2: Validate this code was issued for response_type=id
|
||||||
|
stored_response_type = code_data.get('response_type', 'id')
|
||||||
|
if stored_response_type != 'id':
|
||||||
|
logger.warning(
|
||||||
|
f"Code redemption at authorization endpoint for response_type={stored_response_type}"
|
||||||
|
)
|
||||||
|
return JSONResponse(
|
||||||
|
status_code=400,
|
||||||
|
content={
|
||||||
|
"error": "invalid_grant",
|
||||||
|
"error_description": "Authorization code must be redeemed at the token endpoint"
|
||||||
|
},
|
||||||
|
headers={"Cache-Control": "no-store", "Pragma": "no-cache"}
|
||||||
|
)
|
||||||
|
|
||||||
|
# STEP 3: Validate client_id matches
|
||||||
|
if code_data.get('client_id') != client_id:
|
||||||
|
logger.warning(
|
||||||
|
f"Client ID mismatch: expected {code_data.get('client_id')}, got {client_id}"
|
||||||
|
)
|
||||||
|
return JSONResponse(
|
||||||
|
status_code=400,
|
||||||
|
content={
|
||||||
|
"error": "invalid_client",
|
||||||
|
"error_description": "Client ID does not match authorization code"
|
||||||
|
},
|
||||||
|
headers={"Cache-Control": "no-store", "Pragma": "no-cache"}
|
||||||
|
)
|
||||||
|
|
||||||
|
# STEP 4: Validate redirect_uri if provided
|
||||||
|
if redirect_uri and code_data.get('redirect_uri') != redirect_uri:
|
||||||
|
logger.warning(
|
||||||
|
f"Redirect URI mismatch: expected {code_data.get('redirect_uri')}, got {redirect_uri}"
|
||||||
|
)
|
||||||
|
return JSONResponse(
|
||||||
|
status_code=400,
|
||||||
|
content={
|
||||||
|
"error": "invalid_grant",
|
||||||
|
"error_description": "Redirect URI does not match authorization request"
|
||||||
|
},
|
||||||
|
headers={"Cache-Control": "no-store", "Pragma": "no-cache"}
|
||||||
|
)
|
||||||
|
|
||||||
|
# STEP 5: Check if code already used (prevent replay)
|
||||||
|
if code_data.get('used'):
|
||||||
|
logger.warning(f"Authorization code replay detected: {code[:8]}...")
|
||||||
|
return JSONResponse(
|
||||||
|
status_code=400,
|
||||||
|
content={
|
||||||
|
"error": "invalid_grant",
|
||||||
|
"error_description": "Authorization code has already been used"
|
||||||
|
},
|
||||||
|
headers={"Cache-Control": "no-store", "Pragma": "no-cache"}
|
||||||
|
)
|
||||||
|
|
||||||
|
# STEP 6: Extract user identity
|
||||||
|
me = code_data.get('me')
|
||||||
|
if not me:
|
||||||
|
logger.error("Authorization code missing 'me' parameter")
|
||||||
|
return JSONResponse(
|
||||||
|
status_code=400,
|
||||||
|
content={
|
||||||
|
"error": "invalid_grant",
|
||||||
|
"error_description": "Authorization code is malformed"
|
||||||
|
},
|
||||||
|
headers={"Cache-Control": "no-store", "Pragma": "no-cache"}
|
||||||
|
)
|
||||||
|
|
||||||
|
# STEP 7: PKCE validation (optional for authentication flow)
|
||||||
|
if code_verifier:
|
||||||
|
logger.debug(f"PKCE code_verifier provided but not validated (v1.0.0)")
|
||||||
|
# v1.1.0 will validate: SHA256(code_verifier) == code_challenge
|
||||||
|
|
||||||
|
# STEP 8: Delete authorization code (single-use enforcement)
|
||||||
|
code_storage.delete(storage_key)
|
||||||
|
logger.info(f"Authorization code verified and deleted: {code[:8]}...")
|
||||||
|
|
||||||
|
# STEP 9: Return authentication response with user identity
|
||||||
|
logger.info(f"Authentication successful for {me} (client: {client_id})")
|
||||||
|
|
||||||
|
return JSONResponse(
|
||||||
|
status_code=200,
|
||||||
|
content={"me": me},
|
||||||
|
headers={"Cache-Control": "no-store", "Pragma": "no-cache"}
|
||||||
|
)
|
||||||
|
|||||||
@@ -29,9 +29,9 @@ async def get_metadata(config: Config = Depends(get_config)) -> Response:
|
|||||||
"issuer": config.BASE_URL,
|
"issuer": config.BASE_URL,
|
||||||
"authorization_endpoint": f"{config.BASE_URL}/authorize",
|
"authorization_endpoint": f"{config.BASE_URL}/authorize",
|
||||||
"token_endpoint": f"{config.BASE_URL}/token",
|
"token_endpoint": f"{config.BASE_URL}/token",
|
||||||
"response_types_supported": ["code"],
|
"response_types_supported": ["code", "id"],
|
||||||
"grant_types_supported": ["authorization_code"],
|
"grant_types_supported": ["authorization_code"],
|
||||||
"code_challenge_methods_supported": [],
|
"code_challenge_methods_supported": ["S256"],
|
||||||
"token_endpoint_auth_methods_supported": ["none"],
|
"token_endpoint_auth_methods_supported": ["none"],
|
||||||
"revocation_endpoint_auth_methods_supported": ["none"],
|
"revocation_endpoint_auth_methods_supported": ["none"],
|
||||||
"scopes_supported": []
|
"scopes_supported": []
|
||||||
|
|||||||
@@ -156,6 +156,21 @@ async def token_exchange(
|
|||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# STEP 4.5: Validate this code was issued for response_type=code
|
||||||
|
# Codes with response_type=id must be redeemed at the authorization endpoint
|
||||||
|
stored_response_type = code_data.get('response_type', 'id')
|
||||||
|
if stored_response_type != 'code':
|
||||||
|
logger.warning(
|
||||||
|
f"Code redemption at token endpoint for response_type={stored_response_type}"
|
||||||
|
)
|
||||||
|
raise HTTPException(
|
||||||
|
status_code=400,
|
||||||
|
detail={
|
||||||
|
"error": "invalid_grant",
|
||||||
|
"error_description": "Authorization code must be redeemed at the authorization endpoint"
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
# STEP 5: Check if code already used (prevent replay)
|
# STEP 5: Check if code already used (prevent replay)
|
||||||
if code_data.get('used'):
|
if code_data.get('used'):
|
||||||
logger.error(f"Authorization code replay detected: {code[:8]}...")
|
logger.error(f"Authorization code replay detected: {code[:8]}...")
|
||||||
|
|||||||
@@ -212,7 +212,8 @@ class DomainVerificationService:
|
|||||||
code_challenge: str,
|
code_challenge: str,
|
||||||
code_challenge_method: str,
|
code_challenge_method: str,
|
||||||
scope: str,
|
scope: str,
|
||||||
me: str
|
me: str,
|
||||||
|
response_type: str = "id"
|
||||||
) -> str:
|
) -> str:
|
||||||
"""
|
"""
|
||||||
Create authorization code with metadata.
|
Create authorization code with metadata.
|
||||||
@@ -225,6 +226,7 @@ class DomainVerificationService:
|
|||||||
code_challenge_method: PKCE method (S256)
|
code_challenge_method: PKCE method (S256)
|
||||||
scope: Requested scope
|
scope: Requested scope
|
||||||
me: Verified user identity
|
me: Verified user identity
|
||||||
|
response_type: "id" for authentication, "code" for authorization
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
Authorization code
|
Authorization code
|
||||||
@@ -232,7 +234,7 @@ class DomainVerificationService:
|
|||||||
# Generate authorization code
|
# Generate authorization code
|
||||||
authorization_code = self._generate_authorization_code()
|
authorization_code = self._generate_authorization_code()
|
||||||
|
|
||||||
# Create metadata
|
# Create metadata including response_type for flow determination during redemption
|
||||||
metadata = {
|
metadata = {
|
||||||
"client_id": client_id,
|
"client_id": client_id,
|
||||||
"redirect_uri": redirect_uri,
|
"redirect_uri": redirect_uri,
|
||||||
@@ -241,6 +243,7 @@ class DomainVerificationService:
|
|||||||
"code_challenge_method": code_challenge_method,
|
"code_challenge_method": code_challenge_method,
|
||||||
"scope": scope,
|
"scope": scope,
|
||||||
"me": me,
|
"me": me,
|
||||||
|
"response_type": response_type,
|
||||||
"created_at": int(time.time()),
|
"created_at": int(time.time()),
|
||||||
"expires_at": int(time.time()) + 600,
|
"expires_at": int(time.time()) + 600,
|
||||||
"used": False
|
"used": False
|
||||||
|
|||||||
@@ -36,6 +36,7 @@
|
|||||||
<form method="POST" action="/authorize/consent">
|
<form method="POST" action="/authorize/consent">
|
||||||
<input type="hidden" name="client_id" value="{{ client_id }}">
|
<input type="hidden" name="client_id" value="{{ client_id }}">
|
||||||
<input type="hidden" name="redirect_uri" value="{{ redirect_uri }}">
|
<input type="hidden" name="redirect_uri" value="{{ redirect_uri }}">
|
||||||
|
<input type="hidden" name="response_type" value="{{ response_type }}">
|
||||||
<input type="hidden" name="state" value="{{ state }}">
|
<input type="hidden" name="state" value="{{ state }}">
|
||||||
<input type="hidden" name="code_challenge" value="{{ code_challenge }}">
|
<input type="hidden" name="code_challenge" value="{{ code_challenge }}">
|
||||||
<input type="hidden" name="code_challenge_method" value="{{ code_challenge_method }}">
|
<input type="hidden" name="code_challenge_method" value="{{ code_challenge_method }}">
|
||||||
|
|||||||
40
src/gondulf/templates/verification_error.html
Normal file
40
src/gondulf/templates/verification_error.html
Normal file
@@ -0,0 +1,40 @@
|
|||||||
|
{% extends "base.html" %}
|
||||||
|
|
||||||
|
{% block title %}Verification Failed - Gondulf{% endblock %}
|
||||||
|
|
||||||
|
{% block content %}
|
||||||
|
<h1>Verification Failed</h1>
|
||||||
|
|
||||||
|
<div class="error">
|
||||||
|
<p>{{ error }}</p>
|
||||||
|
</div>
|
||||||
|
|
||||||
|
{% if "DNS" in error or "dns" in error %}
|
||||||
|
<div class="instructions">
|
||||||
|
<h2>How to Fix</h2>
|
||||||
|
<p>Add the following DNS TXT record to your domain:</p>
|
||||||
|
<code>
|
||||||
|
Type: TXT<br>
|
||||||
|
Name: _gondulf.{{ domain }}<br>
|
||||||
|
Value: gondulf-verify-domain
|
||||||
|
</code>
|
||||||
|
<p>DNS changes may take up to 24 hours to propagate.</p>
|
||||||
|
</div>
|
||||||
|
{% endif %}
|
||||||
|
|
||||||
|
{% if "email" in error.lower() or "rel" in error.lower() %}
|
||||||
|
<div class="instructions">
|
||||||
|
<h2>How to Fix</h2>
|
||||||
|
<p>Add a rel="me" link to your homepage pointing to your email:</p>
|
||||||
|
<code><link rel="me" href="mailto:you@example.com"></code>
|
||||||
|
<p>Or as an anchor tag:</p>
|
||||||
|
<code><a rel="me" href="mailto:you@example.com">Email me</a></code>
|
||||||
|
</div>
|
||||||
|
{% endif %}
|
||||||
|
|
||||||
|
<p>
|
||||||
|
<a href="/authorize?client_id={{ client_id }}&redirect_uri={{ redirect_uri }}&response_type={{ response_type }}&state={{ state }}&code_challenge={{ code_challenge }}&code_challenge_method={{ code_challenge_method }}&scope={{ scope }}&me={{ me }}">
|
||||||
|
Try Again
|
||||||
|
</a>
|
||||||
|
</p>
|
||||||
|
{% endblock %}
|
||||||
51
src/gondulf/templates/verify_code.html
Normal file
51
src/gondulf/templates/verify_code.html
Normal file
@@ -0,0 +1,51 @@
|
|||||||
|
{% extends "base.html" %}
|
||||||
|
|
||||||
|
{% block title %}Verify Your Identity - Gondulf{% endblock %}
|
||||||
|
|
||||||
|
{% block content %}
|
||||||
|
<h1>Verify Your Identity</h1>
|
||||||
|
|
||||||
|
<p>To sign in as <strong>{{ domain }}</strong>, please enter the verification code sent to <strong>{{ masked_email }}</strong>.</p>
|
||||||
|
|
||||||
|
{% if error %}
|
||||||
|
<div class="error">
|
||||||
|
<p>{{ error }}</p>
|
||||||
|
</div>
|
||||||
|
{% endif %}
|
||||||
|
|
||||||
|
<form method="POST" action="/authorize/verify-code">
|
||||||
|
<!-- Pass through authorization parameters -->
|
||||||
|
<input type="hidden" name="domain" value="{{ domain }}">
|
||||||
|
<input type="hidden" name="client_id" value="{{ client_id }}">
|
||||||
|
<input type="hidden" name="redirect_uri" value="{{ redirect_uri }}">
|
||||||
|
<input type="hidden" name="response_type" value="{{ response_type }}">
|
||||||
|
<input type="hidden" name="state" value="{{ state }}">
|
||||||
|
<input type="hidden" name="code_challenge" value="{{ code_challenge }}">
|
||||||
|
<input type="hidden" name="code_challenge_method" value="{{ code_challenge_method }}">
|
||||||
|
<input type="hidden" name="scope" value="{{ scope }}">
|
||||||
|
<input type="hidden" name="me" value="{{ me }}">
|
||||||
|
|
||||||
|
<div class="form-group">
|
||||||
|
<label for="code">Verification Code:</label>
|
||||||
|
<input type="text"
|
||||||
|
id="code"
|
||||||
|
name="code"
|
||||||
|
placeholder="000000"
|
||||||
|
maxlength="6"
|
||||||
|
pattern="[0-9]{6}"
|
||||||
|
inputmode="numeric"
|
||||||
|
autocomplete="one-time-code"
|
||||||
|
required
|
||||||
|
autofocus>
|
||||||
|
</div>
|
||||||
|
|
||||||
|
<button type="submit">Verify</button>
|
||||||
|
</form>
|
||||||
|
|
||||||
|
<p class="help-text">
|
||||||
|
Did not receive a code? Check your spam folder.
|
||||||
|
<a href="/authorize?client_id={{ client_id }}&redirect_uri={{ redirect_uri }}&response_type={{ response_type }}&state={{ state }}&code_challenge={{ code_challenge }}&code_challenge_method={{ code_challenge_method }}&scope={{ scope }}&me={{ me }}">
|
||||||
|
Request a new code
|
||||||
|
</a>
|
||||||
|
</p>
|
||||||
|
{% endblock %}
|
||||||
@@ -131,7 +131,7 @@ def test_code_storage():
|
|||||||
@pytest.fixture
|
@pytest.fixture
|
||||||
def valid_auth_code(test_code_storage) -> tuple[str, dict]:
|
def valid_auth_code(test_code_storage) -> tuple[str, dict]:
|
||||||
"""
|
"""
|
||||||
Create a valid authorization code with metadata.
|
Create a valid authorization code with metadata (authorization flow).
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
test_code_storage: Code storage fixture
|
test_code_storage: Code storage fixture
|
||||||
@@ -143,6 +143,7 @@ def valid_auth_code(test_code_storage) -> tuple[str, dict]:
|
|||||||
metadata = {
|
metadata = {
|
||||||
"client_id": "https://client.example.com",
|
"client_id": "https://client.example.com",
|
||||||
"redirect_uri": "https://client.example.com/callback",
|
"redirect_uri": "https://client.example.com/callback",
|
||||||
|
"response_type": "code", # Authorization flow - exchange at token endpoint
|
||||||
"state": "xyz123",
|
"state": "xyz123",
|
||||||
"me": "https://user.example.com",
|
"me": "https://user.example.com",
|
||||||
"scope": "",
|
"scope": "",
|
||||||
@@ -159,7 +160,7 @@ def valid_auth_code(test_code_storage) -> tuple[str, dict]:
|
|||||||
@pytest.fixture
|
@pytest.fixture
|
||||||
def expired_auth_code(test_code_storage) -> tuple[str, dict]:
|
def expired_auth_code(test_code_storage) -> tuple[str, dict]:
|
||||||
"""
|
"""
|
||||||
Create an expired authorization code.
|
Create an expired authorization code (authorization flow).
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
Tuple of (code, metadata) where the code is expired
|
Tuple of (code, metadata) where the code is expired
|
||||||
@@ -169,6 +170,7 @@ def expired_auth_code(test_code_storage) -> tuple[str, dict]:
|
|||||||
metadata = {
|
metadata = {
|
||||||
"client_id": "https://client.example.com",
|
"client_id": "https://client.example.com",
|
||||||
"redirect_uri": "https://client.example.com/callback",
|
"redirect_uri": "https://client.example.com/callback",
|
||||||
|
"response_type": "code", # Authorization flow
|
||||||
"state": "xyz123",
|
"state": "xyz123",
|
||||||
"me": "https://user.example.com",
|
"me": "https://user.example.com",
|
||||||
"scope": "",
|
"scope": "",
|
||||||
@@ -186,7 +188,7 @@ def expired_auth_code(test_code_storage) -> tuple[str, dict]:
|
|||||||
@pytest.fixture
|
@pytest.fixture
|
||||||
def used_auth_code(test_code_storage) -> tuple[str, dict]:
|
def used_auth_code(test_code_storage) -> tuple[str, dict]:
|
||||||
"""
|
"""
|
||||||
Create an already-used authorization code.
|
Create an already-used authorization code (authorization flow).
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
Tuple of (code, metadata) where the code is marked as used
|
Tuple of (code, metadata) where the code is marked as used
|
||||||
@@ -195,6 +197,7 @@ def used_auth_code(test_code_storage) -> tuple[str, dict]:
|
|||||||
metadata = {
|
metadata = {
|
||||||
"client_id": "https://client.example.com",
|
"client_id": "https://client.example.com",
|
||||||
"redirect_uri": "https://client.example.com/callback",
|
"redirect_uri": "https://client.example.com/callback",
|
||||||
|
"response_type": "code", # Authorization flow
|
||||||
"state": "xyz123",
|
"state": "xyz123",
|
||||||
"me": "https://user.example.com",
|
"me": "https://user.example.com",
|
||||||
"scope": "",
|
"scope": "",
|
||||||
@@ -474,13 +477,13 @@ def malicious_client() -> dict[str, Any]:
|
|||||||
@pytest.fixture
|
@pytest.fixture
|
||||||
def valid_auth_request() -> dict[str, str]:
|
def valid_auth_request() -> dict[str, str]:
|
||||||
"""
|
"""
|
||||||
Complete valid authorization request parameters.
|
Complete valid authorization request parameters (for authorization flow).
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
Dict with all required authorization parameters
|
Dict with all required authorization parameters
|
||||||
"""
|
"""
|
||||||
return {
|
return {
|
||||||
"response_type": "code",
|
"response_type": "code", # Authorization flow - exchange at token endpoint
|
||||||
"client_id": "https://app.example.com",
|
"client_id": "https://app.example.com",
|
||||||
"redirect_uri": "https://app.example.com/callback",
|
"redirect_uri": "https://app.example.com/callback",
|
||||||
"state": "random_state_12345",
|
"state": "random_state_12345",
|
||||||
|
|||||||
@@ -76,6 +76,7 @@ class TestCompleteAuthorizationFlow:
|
|||||||
consent_data = {
|
consent_data = {
|
||||||
"client_id": "https://app.example.com",
|
"client_id": "https://app.example.com",
|
||||||
"redirect_uri": "https://app.example.com/callback",
|
"redirect_uri": "https://app.example.com/callback",
|
||||||
|
"response_type": "code", # Authorization flow - exchange at token endpoint
|
||||||
"state": "e2e_test_state_12345",
|
"state": "e2e_test_state_12345",
|
||||||
"code_challenge": "E9Melhoa2OwvFrEMTJguCHaoeK1t8URWbuGJSstw-cM",
|
"code_challenge": "E9Melhoa2OwvFrEMTJguCHaoeK1t8URWbuGJSstw-cM",
|
||||||
"code_challenge_method": "S256",
|
"code_challenge_method": "S256",
|
||||||
@@ -139,6 +140,7 @@ class TestCompleteAuthorizationFlow:
|
|||||||
data={
|
data={
|
||||||
"client_id": "https://app.example.com",
|
"client_id": "https://app.example.com",
|
||||||
"redirect_uri": "https://app.example.com/callback",
|
"redirect_uri": "https://app.example.com/callback",
|
||||||
|
"response_type": "code", # For state preservation test
|
||||||
"state": state,
|
"state": state,
|
||||||
"code_challenge": "abc123",
|
"code_challenge": "abc123",
|
||||||
"code_challenge_method": "S256",
|
"code_challenge_method": "S256",
|
||||||
@@ -163,6 +165,7 @@ class TestCompleteAuthorizationFlow:
|
|||||||
data={
|
data={
|
||||||
"client_id": "https://app.example.com",
|
"client_id": "https://app.example.com",
|
||||||
"redirect_uri": "https://app.example.com/callback",
|
"redirect_uri": "https://app.example.com/callback",
|
||||||
|
"response_type": "code", # Authorization flow - exchange at token endpoint
|
||||||
"state": f"flow_{i}",
|
"state": f"flow_{i}",
|
||||||
"code_challenge": "abc123",
|
"code_challenge": "abc123",
|
||||||
"code_challenge_method": "S256",
|
"code_challenge_method": "S256",
|
||||||
@@ -217,6 +220,7 @@ class TestErrorScenariosE2E:
|
|||||||
metadata = {
|
metadata = {
|
||||||
"client_id": "https://app.example.com",
|
"client_id": "https://app.example.com",
|
||||||
"redirect_uri": "https://app.example.com/callback",
|
"redirect_uri": "https://app.example.com/callback",
|
||||||
|
"response_type": "code", # Authorization flow - exchange at token endpoint
|
||||||
"state": "test",
|
"state": "test",
|
||||||
"me": "https://user.example.com",
|
"me": "https://user.example.com",
|
||||||
"scope": "",
|
"scope": "",
|
||||||
@@ -255,6 +259,7 @@ class TestErrorScenariosE2E:
|
|||||||
data={
|
data={
|
||||||
"client_id": "https://app.example.com",
|
"client_id": "https://app.example.com",
|
||||||
"redirect_uri": "https://app.example.com/callback",
|
"redirect_uri": "https://app.example.com/callback",
|
||||||
|
"response_type": "code", # Authorization flow - exchange at token endpoint
|
||||||
"state": "test",
|
"state": "test",
|
||||||
"code_challenge": "abc123",
|
"code_challenge": "abc123",
|
||||||
"code_challenge_method": "S256",
|
"code_challenge_method": "S256",
|
||||||
@@ -292,6 +297,7 @@ class TestErrorScenariosE2E:
|
|||||||
data={
|
data={
|
||||||
"client_id": "https://app.example.com",
|
"client_id": "https://app.example.com",
|
||||||
"redirect_uri": "https://app.example.com/callback",
|
"redirect_uri": "https://app.example.com/callback",
|
||||||
|
"response_type": "code", # Authorization flow - exchange at token endpoint
|
||||||
"state": "test",
|
"state": "test",
|
||||||
"code_challenge": "abc123",
|
"code_challenge": "abc123",
|
||||||
"code_challenge_method": "S256",
|
"code_challenge_method": "S256",
|
||||||
@@ -327,6 +333,7 @@ class TestTokenUsageE2E:
|
|||||||
data={
|
data={
|
||||||
"client_id": "https://app.example.com",
|
"client_id": "https://app.example.com",
|
||||||
"redirect_uri": "https://app.example.com/callback",
|
"redirect_uri": "https://app.example.com/callback",
|
||||||
|
"response_type": "code", # Authorization flow - exchange at token endpoint
|
||||||
"state": "test",
|
"state": "test",
|
||||||
"code_challenge": "abc123",
|
"code_challenge": "abc123",
|
||||||
"code_challenge_method": "S256",
|
"code_challenge_method": "S256",
|
||||||
@@ -362,6 +369,7 @@ class TestTokenUsageE2E:
|
|||||||
data={
|
data={
|
||||||
"client_id": "https://app.example.com",
|
"client_id": "https://app.example.com",
|
||||||
"redirect_uri": "https://app.example.com/callback",
|
"redirect_uri": "https://app.example.com/callback",
|
||||||
|
"response_type": "code", # Authorization flow - exchange at token endpoint
|
||||||
"state": "test",
|
"state": "test",
|
||||||
"code_challenge": "abc123",
|
"code_challenge": "abc123",
|
||||||
"code_challenge_method": "S256",
|
"code_challenge_method": "S256",
|
||||||
|
|||||||
532
tests/integration/api/test_authorization_verification.py
Normal file
532
tests/integration/api/test_authorization_verification.py
Normal file
@@ -0,0 +1,532 @@
|
|||||||
|
"""
|
||||||
|
Integration tests for authorization endpoint domain verification.
|
||||||
|
|
||||||
|
Tests the security fix that requires domain verification before showing the consent page.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from datetime import datetime
|
||||||
|
from unittest.mock import AsyncMock, Mock, patch
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
from fastapi.testclient import TestClient
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def valid_auth_params():
|
||||||
|
"""Valid authorization request parameters."""
|
||||||
|
return {
|
||||||
|
"client_id": "https://app.example.com",
|
||||||
|
"redirect_uri": "https://app.example.com/callback",
|
||||||
|
"response_type": "code",
|
||||||
|
"state": "test123",
|
||||||
|
"code_challenge": "E9Melhoa2OwvFrEMTJguCHaoeK1t8URWbuGJSstw-cM",
|
||||||
|
"code_challenge_method": "S256",
|
||||||
|
"me": "https://user.example.com",
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def create_mock_verification_service(start_success=True, verify_success=True, start_error="dns_verification_failed"):
|
||||||
|
"""Create a mock verification service with configurable behavior."""
|
||||||
|
mock_service = Mock()
|
||||||
|
|
||||||
|
if start_success:
|
||||||
|
mock_service.start_verification.return_value = {
|
||||||
|
"success": True,
|
||||||
|
"email": "t***@example.com",
|
||||||
|
"verification_method": "email"
|
||||||
|
}
|
||||||
|
else:
|
||||||
|
mock_service.start_verification.return_value = {
|
||||||
|
"success": False,
|
||||||
|
"error": start_error
|
||||||
|
}
|
||||||
|
|
||||||
|
if verify_success:
|
||||||
|
mock_service.verify_email_code.return_value = {
|
||||||
|
"success": True,
|
||||||
|
"email": "test@example.com"
|
||||||
|
}
|
||||||
|
else:
|
||||||
|
mock_service.verify_email_code.return_value = {
|
||||||
|
"success": False,
|
||||||
|
"error": "invalid_code"
|
||||||
|
}
|
||||||
|
|
||||||
|
mock_service.code_storage = Mock()
|
||||||
|
mock_service.code_storage.get.return_value = "test@example.com"
|
||||||
|
mock_service.create_authorization_code.return_value = "test_auth_code_12345"
|
||||||
|
|
||||||
|
return mock_service
|
||||||
|
|
||||||
|
|
||||||
|
def create_mock_happ_parser():
|
||||||
|
"""Create a mock h-app parser."""
|
||||||
|
from gondulf.services.happ_parser import ClientMetadata
|
||||||
|
|
||||||
|
mock_parser = Mock()
|
||||||
|
mock_parser.fetch_and_parse = AsyncMock(return_value=ClientMetadata(
|
||||||
|
name="Test Application",
|
||||||
|
url="https://app.example.com",
|
||||||
|
logo="https://app.example.com/logo.png"
|
||||||
|
))
|
||||||
|
return mock_parser
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def configured_app(monkeypatch, tmp_path):
|
||||||
|
"""Create a fully configured app with fresh database."""
|
||||||
|
db_path = tmp_path / "test.db"
|
||||||
|
|
||||||
|
monkeypatch.setenv("GONDULF_SECRET_KEY", "a" * 32)
|
||||||
|
monkeypatch.setenv("GONDULF_BASE_URL", "https://auth.example.com")
|
||||||
|
monkeypatch.setenv("GONDULF_DATABASE_URL", f"sqlite:///{db_path}")
|
||||||
|
monkeypatch.setenv("GONDULF_DEBUG", "true")
|
||||||
|
|
||||||
|
from gondulf.main import app
|
||||||
|
return app, db_path
|
||||||
|
|
||||||
|
|
||||||
|
class TestUnverifiedDomainTriggersVerification:
|
||||||
|
"""Tests that unverified domains trigger the verification flow."""
|
||||||
|
|
||||||
|
def test_unverified_domain_shows_verification_form(
|
||||||
|
self, configured_app, valid_auth_params
|
||||||
|
):
|
||||||
|
"""Test that an unverified domain shows the verification code form."""
|
||||||
|
app, _ = configured_app
|
||||||
|
from gondulf.dependencies import get_verification_service, get_happ_parser
|
||||||
|
|
||||||
|
mock_service = create_mock_verification_service(start_success=True)
|
||||||
|
mock_parser = create_mock_happ_parser()
|
||||||
|
|
||||||
|
app.dependency_overrides[get_verification_service] = lambda: mock_service
|
||||||
|
app.dependency_overrides[get_happ_parser] = lambda: mock_parser
|
||||||
|
|
||||||
|
try:
|
||||||
|
with TestClient(app) as client:
|
||||||
|
response = client.get("/authorize", params=valid_auth_params)
|
||||||
|
|
||||||
|
assert response.status_code == 200
|
||||||
|
# Should show verification form, not consent form
|
||||||
|
assert "Verify Your Identity" in response.text
|
||||||
|
assert "verification code" in response.text.lower()
|
||||||
|
# Should show masked email
|
||||||
|
assert "t***@example.com" in response.text
|
||||||
|
finally:
|
||||||
|
app.dependency_overrides.clear()
|
||||||
|
|
||||||
|
def test_unverified_domain_preserves_auth_params(
|
||||||
|
self, configured_app, valid_auth_params
|
||||||
|
):
|
||||||
|
"""Test that authorization parameters are preserved in verification form."""
|
||||||
|
app, _ = configured_app
|
||||||
|
from gondulf.dependencies import get_verification_service, get_happ_parser
|
||||||
|
|
||||||
|
mock_service = create_mock_verification_service(start_success=True)
|
||||||
|
mock_parser = create_mock_happ_parser()
|
||||||
|
|
||||||
|
app.dependency_overrides[get_verification_service] = lambda: mock_service
|
||||||
|
app.dependency_overrides[get_happ_parser] = lambda: mock_parser
|
||||||
|
|
||||||
|
try:
|
||||||
|
with TestClient(app) as client:
|
||||||
|
response = client.get("/authorize", params=valid_auth_params)
|
||||||
|
|
||||||
|
assert response.status_code == 200
|
||||||
|
# Check hidden fields contain auth params
|
||||||
|
assert 'name="client_id"' in response.text
|
||||||
|
assert 'name="redirect_uri"' in response.text
|
||||||
|
assert 'name="state"' in response.text
|
||||||
|
assert 'name="code_challenge"' in response.text
|
||||||
|
finally:
|
||||||
|
app.dependency_overrides.clear()
|
||||||
|
|
||||||
|
def test_unverified_domain_does_not_show_consent(
|
||||||
|
self, configured_app, valid_auth_params
|
||||||
|
):
|
||||||
|
"""Test that unverified domain does NOT show consent form directly."""
|
||||||
|
app, _ = configured_app
|
||||||
|
from gondulf.dependencies import get_verification_service, get_happ_parser
|
||||||
|
|
||||||
|
mock_service = create_mock_verification_service(start_success=True)
|
||||||
|
mock_parser = create_mock_happ_parser()
|
||||||
|
|
||||||
|
app.dependency_overrides[get_verification_service] = lambda: mock_service
|
||||||
|
app.dependency_overrides[get_happ_parser] = lambda: mock_parser
|
||||||
|
|
||||||
|
try:
|
||||||
|
with TestClient(app) as client:
|
||||||
|
response = client.get("/authorize", params=valid_auth_params)
|
||||||
|
|
||||||
|
assert response.status_code == 200
|
||||||
|
# Should NOT show consent/authorization form
|
||||||
|
assert "Authorization Request" not in response.text
|
||||||
|
finally:
|
||||||
|
app.dependency_overrides.clear()
|
||||||
|
|
||||||
|
|
||||||
|
class TestVerifiedDomainShowsConsent:
|
||||||
|
"""Tests that verified domains skip verification and show consent."""
|
||||||
|
|
||||||
|
def test_verified_domain_shows_consent_page(
|
||||||
|
self, configured_app, valid_auth_params
|
||||||
|
):
|
||||||
|
"""Test that a verified domain shows consent page directly."""
|
||||||
|
app, db_path = configured_app
|
||||||
|
from gondulf.dependencies import get_happ_parser, get_database
|
||||||
|
from gondulf.database.connection import Database
|
||||||
|
from sqlalchemy import text
|
||||||
|
|
||||||
|
# Create database and insert verified domain
|
||||||
|
db = Database(f"sqlite:///{db_path}")
|
||||||
|
db.initialize()
|
||||||
|
|
||||||
|
with db.get_engine().begin() as conn:
|
||||||
|
conn.execute(
|
||||||
|
text("""
|
||||||
|
INSERT INTO domains (domain, email, verification_code, verified, verified_at, two_factor)
|
||||||
|
VALUES (:domain, :email, '', 1, :verified_at, 1)
|
||||||
|
"""),
|
||||||
|
{"domain": "user.example.com", "email": "test@example.com", "verified_at": datetime.utcnow()}
|
||||||
|
)
|
||||||
|
|
||||||
|
# Override database to use same instance
|
||||||
|
app.dependency_overrides[get_database] = lambda: db
|
||||||
|
mock_parser = create_mock_happ_parser()
|
||||||
|
app.dependency_overrides[get_happ_parser] = lambda: mock_parser
|
||||||
|
|
||||||
|
try:
|
||||||
|
with TestClient(app) as client:
|
||||||
|
response = client.get("/authorize", params=valid_auth_params)
|
||||||
|
|
||||||
|
# Should show consent page
|
||||||
|
assert response.status_code == 200
|
||||||
|
assert "Authorization Request" in response.text
|
||||||
|
assert 'action="/authorize/consent"' in response.text
|
||||||
|
finally:
|
||||||
|
app.dependency_overrides.clear()
|
||||||
|
|
||||||
|
|
||||||
|
class TestVerificationCodeValidation:
|
||||||
|
"""Tests for the verification code submission endpoint."""
|
||||||
|
|
||||||
|
def test_valid_code_shows_consent(
|
||||||
|
self, configured_app, valid_auth_params
|
||||||
|
):
|
||||||
|
"""Test that valid verification code shows consent page."""
|
||||||
|
app, _ = configured_app
|
||||||
|
from gondulf.dependencies import get_verification_service, get_happ_parser
|
||||||
|
|
||||||
|
mock_service = create_mock_verification_service(verify_success=True)
|
||||||
|
mock_parser = create_mock_happ_parser()
|
||||||
|
|
||||||
|
app.dependency_overrides[get_verification_service] = lambda: mock_service
|
||||||
|
app.dependency_overrides[get_happ_parser] = lambda: mock_parser
|
||||||
|
|
||||||
|
try:
|
||||||
|
with TestClient(app) as client:
|
||||||
|
form_data = {
|
||||||
|
"domain": "user.example.com",
|
||||||
|
"code": "123456",
|
||||||
|
"client_id": valid_auth_params["client_id"],
|
||||||
|
"redirect_uri": valid_auth_params["redirect_uri"],
|
||||||
|
"response_type": valid_auth_params["response_type"],
|
||||||
|
"state": valid_auth_params["state"],
|
||||||
|
"code_challenge": valid_auth_params["code_challenge"],
|
||||||
|
"code_challenge_method": valid_auth_params["code_challenge_method"],
|
||||||
|
"scope": "",
|
||||||
|
"me": valid_auth_params["me"],
|
||||||
|
}
|
||||||
|
|
||||||
|
response = client.post("/authorize/verify-code", data=form_data)
|
||||||
|
|
||||||
|
assert response.status_code == 200
|
||||||
|
# Should show consent page after successful verification
|
||||||
|
assert "Authorization Request" in response.text or "Authorize" in response.text
|
||||||
|
finally:
|
||||||
|
app.dependency_overrides.clear()
|
||||||
|
|
||||||
|
def test_invalid_code_shows_error_with_retry(
|
||||||
|
self, configured_app, valid_auth_params
|
||||||
|
):
|
||||||
|
"""Test that invalid code shows error and allows retry."""
|
||||||
|
app, _ = configured_app
|
||||||
|
from gondulf.dependencies import get_verification_service, get_happ_parser
|
||||||
|
|
||||||
|
mock_service = create_mock_verification_service(verify_success=False)
|
||||||
|
mock_parser = create_mock_happ_parser()
|
||||||
|
|
||||||
|
app.dependency_overrides[get_verification_service] = lambda: mock_service
|
||||||
|
app.dependency_overrides[get_happ_parser] = lambda: mock_parser
|
||||||
|
|
||||||
|
try:
|
||||||
|
with TestClient(app) as client:
|
||||||
|
form_data = {
|
||||||
|
"domain": "user.example.com",
|
||||||
|
"code": "000000",
|
||||||
|
"client_id": valid_auth_params["client_id"],
|
||||||
|
"redirect_uri": valid_auth_params["redirect_uri"],
|
||||||
|
"response_type": valid_auth_params["response_type"],
|
||||||
|
"state": valid_auth_params["state"],
|
||||||
|
"code_challenge": valid_auth_params["code_challenge"],
|
||||||
|
"code_challenge_method": valid_auth_params["code_challenge_method"],
|
||||||
|
"scope": "",
|
||||||
|
"me": valid_auth_params["me"],
|
||||||
|
}
|
||||||
|
|
||||||
|
response = client.post("/authorize/verify-code", data=form_data)
|
||||||
|
|
||||||
|
assert response.status_code == 200
|
||||||
|
# Should show verify_code page with error
|
||||||
|
assert "Invalid verification code" in response.text or "invalid" in response.text.lower()
|
||||||
|
# Should still have the form for retry
|
||||||
|
assert 'name="code"' in response.text
|
||||||
|
finally:
|
||||||
|
app.dependency_overrides.clear()
|
||||||
|
|
||||||
|
|
||||||
|
class TestDNSFailureHandling:
|
||||||
|
"""Tests for DNS verification failure scenarios."""
|
||||||
|
|
||||||
|
def test_dns_failure_shows_instructions(
|
||||||
|
self, configured_app, valid_auth_params
|
||||||
|
):
|
||||||
|
"""Test that DNS verification failure shows helpful instructions."""
|
||||||
|
app, db_path = configured_app
|
||||||
|
from gondulf.dependencies import get_verification_service, get_happ_parser, get_database
|
||||||
|
from gondulf.database.connection import Database
|
||||||
|
from sqlalchemy import text
|
||||||
|
|
||||||
|
# Clear any pre-existing verified domain to ensure test isolation
|
||||||
|
db = Database(f"sqlite:///{db_path}")
|
||||||
|
db.initialize()
|
||||||
|
with db.get_engine().begin() as conn:
|
||||||
|
conn.execute(text("DELETE FROM domains WHERE domain = :domain"), {"domain": "user.example.com"})
|
||||||
|
|
||||||
|
app.dependency_overrides[get_database] = lambda: db
|
||||||
|
|
||||||
|
mock_service = create_mock_verification_service(start_success=False, start_error="dns_verification_failed")
|
||||||
|
mock_parser = create_mock_happ_parser()
|
||||||
|
|
||||||
|
app.dependency_overrides[get_verification_service] = lambda: mock_service
|
||||||
|
app.dependency_overrides[get_happ_parser] = lambda: mock_parser
|
||||||
|
|
||||||
|
try:
|
||||||
|
with TestClient(app) as client:
|
||||||
|
response = client.get("/authorize", params=valid_auth_params)
|
||||||
|
|
||||||
|
assert response.status_code == 200
|
||||||
|
# Should show error page with DNS instructions
|
||||||
|
assert "DNS" in response.text or "dns" in response.text.lower()
|
||||||
|
assert "TXT" in response.text
|
||||||
|
assert "_gondulf" in response.text
|
||||||
|
finally:
|
||||||
|
app.dependency_overrides.clear()
|
||||||
|
|
||||||
|
|
||||||
|
class TestEmailFailureHandling:
|
||||||
|
"""Tests for email discovery failure scenarios."""
|
||||||
|
|
||||||
|
def test_email_discovery_failure_shows_instructions(
|
||||||
|
self, configured_app, valid_auth_params
|
||||||
|
):
|
||||||
|
"""Test that email discovery failure shows helpful instructions."""
|
||||||
|
app, db_path = configured_app
|
||||||
|
from gondulf.dependencies import get_verification_service, get_happ_parser, get_database
|
||||||
|
from gondulf.database.connection import Database
|
||||||
|
from sqlalchemy import text
|
||||||
|
|
||||||
|
# Clear any pre-existing verified domain to ensure test isolation
|
||||||
|
db = Database(f"sqlite:///{db_path}")
|
||||||
|
db.initialize()
|
||||||
|
with db.get_engine().begin() as conn:
|
||||||
|
conn.execute(text("DELETE FROM domains WHERE domain = :domain"), {"domain": "user.example.com"})
|
||||||
|
|
||||||
|
app.dependency_overrides[get_database] = lambda: db
|
||||||
|
|
||||||
|
mock_service = create_mock_verification_service(start_success=False, start_error="email_discovery_failed")
|
||||||
|
mock_parser = create_mock_happ_parser()
|
||||||
|
|
||||||
|
app.dependency_overrides[get_verification_service] = lambda: mock_service
|
||||||
|
app.dependency_overrides[get_happ_parser] = lambda: mock_parser
|
||||||
|
|
||||||
|
try:
|
||||||
|
with TestClient(app) as client:
|
||||||
|
response = client.get("/authorize", params=valid_auth_params)
|
||||||
|
|
||||||
|
assert response.status_code == 200
|
||||||
|
# Should show error page with email instructions
|
||||||
|
assert "email" in response.text.lower()
|
||||||
|
finally:
|
||||||
|
app.dependency_overrides.clear()
|
||||||
|
|
||||||
|
|
||||||
|
class TestFullVerificationFlow:
|
||||||
|
"""Integration tests for the complete verification flow."""
|
||||||
|
|
||||||
|
def test_full_flow_new_domain(
|
||||||
|
self, configured_app, valid_auth_params
|
||||||
|
):
|
||||||
|
"""Test complete flow: unverified domain -> verify code -> consent."""
|
||||||
|
app, db_path = configured_app
|
||||||
|
from gondulf.dependencies import get_verification_service, get_happ_parser, get_database
|
||||||
|
from gondulf.database.connection import Database
|
||||||
|
from sqlalchemy import text
|
||||||
|
|
||||||
|
# Clear any pre-existing verified domain to ensure test isolation
|
||||||
|
db = Database(f"sqlite:///{db_path}")
|
||||||
|
db.initialize()
|
||||||
|
with db.get_engine().begin() as conn:
|
||||||
|
conn.execute(text("DELETE FROM domains WHERE domain = :domain"), {"domain": "user.example.com"})
|
||||||
|
|
||||||
|
app.dependency_overrides[get_database] = lambda: db
|
||||||
|
|
||||||
|
mock_service = create_mock_verification_service(start_success=True, verify_success=True)
|
||||||
|
mock_parser = create_mock_happ_parser()
|
||||||
|
|
||||||
|
app.dependency_overrides[get_verification_service] = lambda: mock_service
|
||||||
|
app.dependency_overrides[get_happ_parser] = lambda: mock_parser
|
||||||
|
|
||||||
|
try:
|
||||||
|
with TestClient(app) as client:
|
||||||
|
# Step 1: GET /authorize -> should show verification form
|
||||||
|
response1 = client.get("/authorize", params=valid_auth_params)
|
||||||
|
|
||||||
|
assert response1.status_code == 200
|
||||||
|
assert "Verify Your Identity" in response1.text
|
||||||
|
|
||||||
|
# Step 2: POST /authorize/verify-code -> should show consent
|
||||||
|
form_data = {
|
||||||
|
"domain": "user.example.com",
|
||||||
|
"code": "123456",
|
||||||
|
"client_id": valid_auth_params["client_id"],
|
||||||
|
"redirect_uri": valid_auth_params["redirect_uri"],
|
||||||
|
"response_type": valid_auth_params["response_type"],
|
||||||
|
"state": valid_auth_params["state"],
|
||||||
|
"code_challenge": valid_auth_params["code_challenge"],
|
||||||
|
"code_challenge_method": valid_auth_params["code_challenge_method"],
|
||||||
|
"scope": "",
|
||||||
|
"me": valid_auth_params["me"],
|
||||||
|
}
|
||||||
|
|
||||||
|
response2 = client.post("/authorize/verify-code", data=form_data)
|
||||||
|
|
||||||
|
assert response2.status_code == 200
|
||||||
|
# Should show consent page
|
||||||
|
assert "Authorization Request" in response2.text or "Authorize" in response2.text
|
||||||
|
finally:
|
||||||
|
app.dependency_overrides.clear()
|
||||||
|
|
||||||
|
def test_verification_code_retry_with_correct_code(
|
||||||
|
self, configured_app, valid_auth_params
|
||||||
|
):
|
||||||
|
"""Test that user can retry with correct code after failure."""
|
||||||
|
app, _ = configured_app
|
||||||
|
from gondulf.dependencies import get_verification_service, get_happ_parser
|
||||||
|
|
||||||
|
mock_service = Mock()
|
||||||
|
# First verify_email_code call fails, second succeeds
|
||||||
|
mock_service.verify_email_code.side_effect = [
|
||||||
|
{"success": False, "error": "invalid_code"},
|
||||||
|
{"success": True, "email": "test@example.com"}
|
||||||
|
]
|
||||||
|
mock_service.code_storage = Mock()
|
||||||
|
mock_service.code_storage.get.return_value = "test@example.com"
|
||||||
|
|
||||||
|
mock_parser = create_mock_happ_parser()
|
||||||
|
|
||||||
|
app.dependency_overrides[get_verification_service] = lambda: mock_service
|
||||||
|
app.dependency_overrides[get_happ_parser] = lambda: mock_parser
|
||||||
|
|
||||||
|
try:
|
||||||
|
with TestClient(app) as client:
|
||||||
|
form_data = {
|
||||||
|
"domain": "user.example.com",
|
||||||
|
"code": "000000", # Wrong code
|
||||||
|
"client_id": valid_auth_params["client_id"],
|
||||||
|
"redirect_uri": valid_auth_params["redirect_uri"],
|
||||||
|
"response_type": valid_auth_params["response_type"],
|
||||||
|
"state": valid_auth_params["state"],
|
||||||
|
"code_challenge": valid_auth_params["code_challenge"],
|
||||||
|
"code_challenge_method": valid_auth_params["code_challenge_method"],
|
||||||
|
"scope": "",
|
||||||
|
"me": valid_auth_params["me"],
|
||||||
|
}
|
||||||
|
|
||||||
|
# First attempt with wrong code
|
||||||
|
response1 = client.post("/authorize/verify-code", data=form_data)
|
||||||
|
assert response1.status_code == 200
|
||||||
|
assert "Invalid" in response1.text or "invalid" in response1.text.lower()
|
||||||
|
|
||||||
|
# Second attempt with correct code
|
||||||
|
form_data["code"] = "123456"
|
||||||
|
response2 = client.post("/authorize/verify-code", data=form_data)
|
||||||
|
assert response2.status_code == 200
|
||||||
|
assert "Authorization Request" in response2.text or "Authorize" in response2.text
|
||||||
|
finally:
|
||||||
|
app.dependency_overrides.clear()
|
||||||
|
|
||||||
|
|
||||||
|
class TestSecurityRequirements:
|
||||||
|
"""Tests for security requirements of the fix."""
|
||||||
|
|
||||||
|
def test_unverified_domain_never_sees_consent_directly(
|
||||||
|
self, configured_app, valid_auth_params
|
||||||
|
):
|
||||||
|
"""Critical: Unverified domains must NEVER see consent page directly."""
|
||||||
|
app, db_path = configured_app
|
||||||
|
from gondulf.dependencies import get_verification_service, get_happ_parser, get_database
|
||||||
|
from gondulf.database.connection import Database
|
||||||
|
from sqlalchemy import text
|
||||||
|
|
||||||
|
# Clear any pre-existing verified domain to ensure test isolation
|
||||||
|
db = Database(f"sqlite:///{db_path}")
|
||||||
|
db.initialize()
|
||||||
|
with db.get_engine().begin() as conn:
|
||||||
|
conn.execute(text("DELETE FROM domains WHERE domain = :domain"), {"domain": "user.example.com"})
|
||||||
|
|
||||||
|
app.dependency_overrides[get_database] = lambda: db
|
||||||
|
|
||||||
|
mock_service = create_mock_verification_service(start_success=True)
|
||||||
|
mock_parser = create_mock_happ_parser()
|
||||||
|
|
||||||
|
app.dependency_overrides[get_verification_service] = lambda: mock_service
|
||||||
|
app.dependency_overrides[get_happ_parser] = lambda: mock_parser
|
||||||
|
|
||||||
|
try:
|
||||||
|
with TestClient(app) as client:
|
||||||
|
response = client.get("/authorize", params=valid_auth_params)
|
||||||
|
|
||||||
|
# The consent page should NOT be shown
|
||||||
|
assert "Authorization Request" not in response.text
|
||||||
|
# Verify code page should be shown instead
|
||||||
|
assert "Verify Your Identity" in response.text
|
||||||
|
finally:
|
||||||
|
app.dependency_overrides.clear()
|
||||||
|
|
||||||
|
def test_state_parameter_preserved_through_flow(
|
||||||
|
self, configured_app, valid_auth_params
|
||||||
|
):
|
||||||
|
"""Test that state parameter is preserved through verification flow."""
|
||||||
|
app, _ = configured_app
|
||||||
|
from gondulf.dependencies import get_verification_service, get_happ_parser
|
||||||
|
|
||||||
|
mock_service = create_mock_verification_service(start_success=True)
|
||||||
|
mock_parser = create_mock_happ_parser()
|
||||||
|
|
||||||
|
app.dependency_overrides[get_verification_service] = lambda: mock_service
|
||||||
|
app.dependency_overrides[get_happ_parser] = lambda: mock_parser
|
||||||
|
|
||||||
|
try:
|
||||||
|
unique_state = "unique_state_abc123xyz"
|
||||||
|
params = valid_auth_params.copy()
|
||||||
|
params["state"] = unique_state
|
||||||
|
|
||||||
|
with TestClient(app) as client:
|
||||||
|
response = client.get("/authorize", params=params)
|
||||||
|
|
||||||
|
assert response.status_code == 200
|
||||||
|
# State should be in hidden form field
|
||||||
|
assert f'value="{unique_state}"' in response.text or f"value='{unique_state}'" in response.text
|
||||||
|
finally:
|
||||||
|
app.dependency_overrides.clear()
|
||||||
433
tests/integration/api/test_response_type_flows.py
Normal file
433
tests/integration/api/test_response_type_flows.py
Normal file
@@ -0,0 +1,433 @@
|
|||||||
|
"""
|
||||||
|
Integration tests for IndieAuth response_type flows.
|
||||||
|
|
||||||
|
Tests the two IndieAuth flows per W3C specification:
|
||||||
|
- Authentication flow (response_type=id): Code redeemed at authorization endpoint
|
||||||
|
- Authorization flow (response_type=code): Code redeemed at token endpoint
|
||||||
|
"""
|
||||||
|
|
||||||
|
from unittest.mock import AsyncMock, patch
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
from fastapi.testclient import TestClient
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def flow_app(monkeypatch, tmp_path):
|
||||||
|
"""Create app for flow testing."""
|
||||||
|
db_path = tmp_path / "test.db"
|
||||||
|
|
||||||
|
monkeypatch.setenv("GONDULF_SECRET_KEY", "a" * 32)
|
||||||
|
monkeypatch.setenv("GONDULF_BASE_URL", "https://auth.example.com")
|
||||||
|
monkeypatch.setenv("GONDULF_DATABASE_URL", f"sqlite:///{db_path}")
|
||||||
|
monkeypatch.setenv("GONDULF_DEBUG", "true")
|
||||||
|
|
||||||
|
from gondulf.main import app
|
||||||
|
return app
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def flow_client(flow_app):
|
||||||
|
"""Create test client for flow tests."""
|
||||||
|
with TestClient(flow_app) as client:
|
||||||
|
yield client
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def mock_happ_fetch():
|
||||||
|
"""Mock h-app parser to avoid network calls."""
|
||||||
|
from gondulf.services.happ_parser import ClientMetadata
|
||||||
|
|
||||||
|
metadata = ClientMetadata(
|
||||||
|
name="Test Application",
|
||||||
|
url="https://app.example.com",
|
||||||
|
logo="https://app.example.com/logo.png"
|
||||||
|
)
|
||||||
|
|
||||||
|
with patch('gondulf.services.happ_parser.HAppParser.fetch_and_parse', new_callable=AsyncMock) as mock:
|
||||||
|
mock.return_value = metadata
|
||||||
|
yield mock
|
||||||
|
|
||||||
|
|
||||||
|
class TestResponseTypeValidation:
|
||||||
|
"""Tests for response_type parameter validation."""
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def base_params(self):
|
||||||
|
"""Base authorization parameters without response_type."""
|
||||||
|
return {
|
||||||
|
"client_id": "https://app.example.com",
|
||||||
|
"redirect_uri": "https://app.example.com/callback",
|
||||||
|
"state": "test123",
|
||||||
|
"code_challenge": "E9Melhoa2OwvFrEMTJguCHaoeK1t8URWbuGJSstw-cM",
|
||||||
|
"code_challenge_method": "S256",
|
||||||
|
"me": "https://user.example.com",
|
||||||
|
}
|
||||||
|
|
||||||
|
def test_response_type_id_accepted(self, flow_client, base_params, mock_happ_fetch):
|
||||||
|
"""Test response_type=id is accepted."""
|
||||||
|
params = base_params.copy()
|
||||||
|
params["response_type"] = "id"
|
||||||
|
|
||||||
|
response = flow_client.get("/authorize", params=params)
|
||||||
|
assert response.status_code == 200
|
||||||
|
assert "text/html" in response.headers["content-type"]
|
||||||
|
|
||||||
|
def test_response_type_code_accepted(self, flow_client, base_params, mock_happ_fetch):
|
||||||
|
"""Test response_type=code is accepted."""
|
||||||
|
params = base_params.copy()
|
||||||
|
params["response_type"] = "code"
|
||||||
|
|
||||||
|
response = flow_client.get("/authorize", params=params)
|
||||||
|
assert response.status_code == 200
|
||||||
|
assert "text/html" in response.headers["content-type"]
|
||||||
|
|
||||||
|
def test_response_type_defaults_to_id(self, flow_client, base_params, mock_happ_fetch):
|
||||||
|
"""Test missing response_type defaults to 'id'."""
|
||||||
|
# No response_type in params
|
||||||
|
response = flow_client.get("/authorize", params=base_params)
|
||||||
|
assert response.status_code == 200
|
||||||
|
# Form should contain response_type=id
|
||||||
|
assert 'value="id"' in response.text
|
||||||
|
|
||||||
|
def test_invalid_response_type_rejected(self, flow_client, base_params, mock_happ_fetch):
|
||||||
|
"""Test invalid response_type redirects with error."""
|
||||||
|
params = base_params.copy()
|
||||||
|
params["response_type"] = "token" # Invalid
|
||||||
|
|
||||||
|
response = flow_client.get("/authorize", params=params, follow_redirects=False)
|
||||||
|
|
||||||
|
assert response.status_code == 302
|
||||||
|
location = response.headers["location"]
|
||||||
|
assert "error=unsupported_response_type" in location
|
||||||
|
assert "state=test123" in location
|
||||||
|
|
||||||
|
def test_consent_form_includes_response_type(self, flow_client, base_params, mock_happ_fetch):
|
||||||
|
"""Test consent form includes response_type hidden field."""
|
||||||
|
params = base_params.copy()
|
||||||
|
params["response_type"] = "code"
|
||||||
|
|
||||||
|
response = flow_client.get("/authorize", params=params)
|
||||||
|
|
||||||
|
assert response.status_code == 200
|
||||||
|
assert 'name="response_type"' in response.text
|
||||||
|
assert 'value="code"' in response.text
|
||||||
|
|
||||||
|
|
||||||
|
class TestAuthenticationFlow:
|
||||||
|
"""Tests for authentication flow (response_type=id)."""
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def auth_code_id_flow(self, flow_client):
|
||||||
|
"""Create an authorization code for the authentication flow."""
|
||||||
|
consent_data = {
|
||||||
|
"client_id": "https://app.example.com",
|
||||||
|
"redirect_uri": "https://app.example.com/callback",
|
||||||
|
"response_type": "id", # Authentication flow
|
||||||
|
"state": "test123",
|
||||||
|
"code_challenge": "E9Melhoa2OwvFrEMTJguCHaoeK1t8URWbuGJSstw-cM",
|
||||||
|
"code_challenge_method": "S256",
|
||||||
|
"scope": "",
|
||||||
|
"me": "https://user.example.com",
|
||||||
|
}
|
||||||
|
|
||||||
|
response = flow_client.post(
|
||||||
|
"/authorize/consent",
|
||||||
|
data=consent_data,
|
||||||
|
follow_redirects=False
|
||||||
|
)
|
||||||
|
|
||||||
|
assert response.status_code == 302
|
||||||
|
location = response.headers["location"]
|
||||||
|
|
||||||
|
from tests.conftest import extract_code_from_redirect
|
||||||
|
code = extract_code_from_redirect(location)
|
||||||
|
return code, consent_data
|
||||||
|
|
||||||
|
def test_auth_code_redemption_at_authorization_endpoint(self, flow_client, auth_code_id_flow):
|
||||||
|
"""Test authentication flow code is redeemed at authorization endpoint."""
|
||||||
|
code, consent_data = auth_code_id_flow
|
||||||
|
|
||||||
|
response = flow_client.post(
|
||||||
|
"/authorize",
|
||||||
|
data={
|
||||||
|
"code": code,
|
||||||
|
"client_id": consent_data["client_id"],
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
assert response.status_code == 200
|
||||||
|
data = response.json()
|
||||||
|
assert "me" in data
|
||||||
|
assert data["me"] == "https://user.example.com"
|
||||||
|
# Should NOT have access_token
|
||||||
|
assert "access_token" not in data
|
||||||
|
|
||||||
|
def test_auth_flow_returns_only_me(self, flow_client, auth_code_id_flow):
|
||||||
|
"""Test authentication response contains only 'me' field."""
|
||||||
|
code, consent_data = auth_code_id_flow
|
||||||
|
|
||||||
|
response = flow_client.post(
|
||||||
|
"/authorize",
|
||||||
|
data={
|
||||||
|
"code": code,
|
||||||
|
"client_id": consent_data["client_id"],
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
data = response.json()
|
||||||
|
assert set(data.keys()) == {"me"}
|
||||||
|
|
||||||
|
def test_auth_flow_code_single_use(self, flow_client, auth_code_id_flow):
|
||||||
|
"""Test authentication code can only be used once."""
|
||||||
|
code, consent_data = auth_code_id_flow
|
||||||
|
|
||||||
|
# First use - should succeed
|
||||||
|
response1 = flow_client.post(
|
||||||
|
"/authorize",
|
||||||
|
data={
|
||||||
|
"code": code,
|
||||||
|
"client_id": consent_data["client_id"],
|
||||||
|
}
|
||||||
|
)
|
||||||
|
assert response1.status_code == 200
|
||||||
|
|
||||||
|
# Second use - should fail
|
||||||
|
response2 = flow_client.post(
|
||||||
|
"/authorize",
|
||||||
|
data={
|
||||||
|
"code": code,
|
||||||
|
"client_id": consent_data["client_id"],
|
||||||
|
}
|
||||||
|
)
|
||||||
|
assert response2.status_code == 400
|
||||||
|
assert response2.json()["error"] == "invalid_grant"
|
||||||
|
|
||||||
|
def test_auth_flow_client_id_mismatch_rejected(self, flow_client, auth_code_id_flow):
|
||||||
|
"""Test wrong client_id is rejected."""
|
||||||
|
code, _ = auth_code_id_flow
|
||||||
|
|
||||||
|
response = flow_client.post(
|
||||||
|
"/authorize",
|
||||||
|
data={
|
||||||
|
"code": code,
|
||||||
|
"client_id": "https://wrong.example.com",
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
assert response.status_code == 400
|
||||||
|
assert response.json()["error"] == "invalid_client"
|
||||||
|
|
||||||
|
def test_auth_flow_redirect_uri_mismatch_rejected(self, flow_client, auth_code_id_flow):
|
||||||
|
"""Test wrong redirect_uri is rejected when provided."""
|
||||||
|
code, consent_data = auth_code_id_flow
|
||||||
|
|
||||||
|
response = flow_client.post(
|
||||||
|
"/authorize",
|
||||||
|
data={
|
||||||
|
"code": code,
|
||||||
|
"client_id": consent_data["client_id"],
|
||||||
|
"redirect_uri": "https://wrong.example.com/callback",
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
assert response.status_code == 400
|
||||||
|
assert response.json()["error"] == "invalid_grant"
|
||||||
|
|
||||||
|
def test_auth_flow_id_code_rejected_at_token_endpoint(self, flow_client, auth_code_id_flow):
|
||||||
|
"""Test authentication flow code is rejected at token endpoint."""
|
||||||
|
code, consent_data = auth_code_id_flow
|
||||||
|
|
||||||
|
response = flow_client.post(
|
||||||
|
"/token",
|
||||||
|
data={
|
||||||
|
"grant_type": "authorization_code",
|
||||||
|
"code": code,
|
||||||
|
"client_id": consent_data["client_id"],
|
||||||
|
"redirect_uri": consent_data["redirect_uri"],
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
assert response.status_code == 400
|
||||||
|
# Should indicate wrong endpoint
|
||||||
|
data = response.json()["detail"]
|
||||||
|
assert data["error"] == "invalid_grant"
|
||||||
|
assert "authorization endpoint" in data["error_description"]
|
||||||
|
|
||||||
|
def test_auth_flow_cache_headers(self, flow_client, auth_code_id_flow):
|
||||||
|
"""Test authentication response has no-cache headers."""
|
||||||
|
code, consent_data = auth_code_id_flow
|
||||||
|
|
||||||
|
response = flow_client.post(
|
||||||
|
"/authorize",
|
||||||
|
data={
|
||||||
|
"code": code,
|
||||||
|
"client_id": consent_data["client_id"],
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
assert response.headers.get("Cache-Control") == "no-store"
|
||||||
|
assert response.headers.get("Pragma") == "no-cache"
|
||||||
|
|
||||||
|
|
||||||
|
class TestAuthorizationFlow:
|
||||||
|
"""Tests for authorization flow (response_type=code)."""
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def auth_code_code_flow(self, flow_client):
|
||||||
|
"""Create an authorization code for the authorization flow."""
|
||||||
|
consent_data = {
|
||||||
|
"client_id": "https://app.example.com",
|
||||||
|
"redirect_uri": "https://app.example.com/callback",
|
||||||
|
"response_type": "code", # Authorization flow
|
||||||
|
"state": "test456",
|
||||||
|
"code_challenge": "E9Melhoa2OwvFrEMTJguCHaoeK1t8URWbuGJSstw-cM",
|
||||||
|
"code_challenge_method": "S256",
|
||||||
|
"scope": "profile",
|
||||||
|
"me": "https://user.example.com",
|
||||||
|
}
|
||||||
|
|
||||||
|
response = flow_client.post(
|
||||||
|
"/authorize/consent",
|
||||||
|
data=consent_data,
|
||||||
|
follow_redirects=False
|
||||||
|
)
|
||||||
|
|
||||||
|
assert response.status_code == 302
|
||||||
|
location = response.headers["location"]
|
||||||
|
|
||||||
|
from tests.conftest import extract_code_from_redirect
|
||||||
|
code = extract_code_from_redirect(location)
|
||||||
|
return code, consent_data
|
||||||
|
|
||||||
|
def test_code_flow_redemption_at_token_endpoint(self, flow_client, auth_code_code_flow):
|
||||||
|
"""Test authorization flow code is redeemed at token endpoint."""
|
||||||
|
code, consent_data = auth_code_code_flow
|
||||||
|
|
||||||
|
response = flow_client.post(
|
||||||
|
"/token",
|
||||||
|
data={
|
||||||
|
"grant_type": "authorization_code",
|
||||||
|
"code": code,
|
||||||
|
"client_id": consent_data["client_id"],
|
||||||
|
"redirect_uri": consent_data["redirect_uri"],
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
assert response.status_code == 200
|
||||||
|
data = response.json()
|
||||||
|
assert "access_token" in data
|
||||||
|
assert "me" in data
|
||||||
|
assert data["me"] == "https://user.example.com"
|
||||||
|
assert data["token_type"] == "Bearer"
|
||||||
|
|
||||||
|
def test_code_flow_code_rejected_at_authorization_endpoint(self, flow_client, auth_code_code_flow):
|
||||||
|
"""Test authorization flow code is rejected at authorization endpoint."""
|
||||||
|
code, consent_data = auth_code_code_flow
|
||||||
|
|
||||||
|
response = flow_client.post(
|
||||||
|
"/authorize",
|
||||||
|
data={
|
||||||
|
"code": code,
|
||||||
|
"client_id": consent_data["client_id"],
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
assert response.status_code == 400
|
||||||
|
# Should indicate wrong endpoint
|
||||||
|
data = response.json()
|
||||||
|
assert data["error"] == "invalid_grant"
|
||||||
|
assert "token endpoint" in data["error_description"]
|
||||||
|
|
||||||
|
def test_code_flow_single_use(self, flow_client, auth_code_code_flow):
|
||||||
|
"""Test authorization code can only be used once."""
|
||||||
|
code, consent_data = auth_code_code_flow
|
||||||
|
|
||||||
|
# First use - should succeed
|
||||||
|
response1 = flow_client.post(
|
||||||
|
"/token",
|
||||||
|
data={
|
||||||
|
"grant_type": "authorization_code",
|
||||||
|
"code": code,
|
||||||
|
"client_id": consent_data["client_id"],
|
||||||
|
"redirect_uri": consent_data["redirect_uri"],
|
||||||
|
}
|
||||||
|
)
|
||||||
|
assert response1.status_code == 200
|
||||||
|
|
||||||
|
# Second use - should fail
|
||||||
|
response2 = flow_client.post(
|
||||||
|
"/token",
|
||||||
|
data={
|
||||||
|
"grant_type": "authorization_code",
|
||||||
|
"code": code,
|
||||||
|
"client_id": consent_data["client_id"],
|
||||||
|
"redirect_uri": consent_data["redirect_uri"],
|
||||||
|
}
|
||||||
|
)
|
||||||
|
assert response2.status_code == 400
|
||||||
|
|
||||||
|
|
||||||
|
class TestMetadataEndpoint:
|
||||||
|
"""Tests for server metadata endpoint."""
|
||||||
|
|
||||||
|
def test_metadata_includes_both_response_types(self, flow_client):
|
||||||
|
"""Test metadata advertises both response types."""
|
||||||
|
response = flow_client.get("/.well-known/oauth-authorization-server")
|
||||||
|
|
||||||
|
assert response.status_code == 200
|
||||||
|
data = response.json()
|
||||||
|
assert "response_types_supported" in data
|
||||||
|
assert "code" in data["response_types_supported"]
|
||||||
|
assert "id" in data["response_types_supported"]
|
||||||
|
|
||||||
|
def test_metadata_includes_code_challenge_method(self, flow_client):
|
||||||
|
"""Test metadata advertises S256 code challenge method."""
|
||||||
|
response = flow_client.get("/.well-known/oauth-authorization-server")
|
||||||
|
|
||||||
|
assert response.status_code == 200
|
||||||
|
data = response.json()
|
||||||
|
assert "code_challenge_methods_supported" in data
|
||||||
|
assert "S256" in data["code_challenge_methods_supported"]
|
||||||
|
|
||||||
|
|
||||||
|
class TestErrorScenarios:
|
||||||
|
"""Tests for error handling in both flows."""
|
||||||
|
|
||||||
|
def test_invalid_code_at_authorization_endpoint(self, flow_client):
|
||||||
|
"""Test invalid code returns error at authorization endpoint."""
|
||||||
|
response = flow_client.post(
|
||||||
|
"/authorize",
|
||||||
|
data={
|
||||||
|
"code": "invalid_code_12345",
|
||||||
|
"client_id": "https://app.example.com",
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
assert response.status_code == 400
|
||||||
|
data = response.json()
|
||||||
|
assert data["error"] == "invalid_grant"
|
||||||
|
|
||||||
|
def test_missing_code_at_authorization_endpoint(self, flow_client):
|
||||||
|
"""Test missing code returns validation error."""
|
||||||
|
response = flow_client.post(
|
||||||
|
"/authorize",
|
||||||
|
data={
|
||||||
|
"client_id": "https://app.example.com",
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
# FastAPI returns 422 for missing required form field
|
||||||
|
assert response.status_code == 422
|
||||||
|
|
||||||
|
def test_missing_client_id_at_authorization_endpoint(self, flow_client):
|
||||||
|
"""Test missing client_id returns validation error."""
|
||||||
|
response = flow_client.post(
|
||||||
|
"/authorize",
|
||||||
|
data={
|
||||||
|
"code": "some_code",
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
# FastAPI returns 422 for missing required form field
|
||||||
|
assert response.status_code == 422
|
||||||
@@ -32,13 +32,14 @@ def token_client(token_app):
|
|||||||
|
|
||||||
@pytest.fixture
|
@pytest.fixture
|
||||||
def setup_auth_code(token_app, test_code_storage):
|
def setup_auth_code(token_app, test_code_storage):
|
||||||
"""Setup a valid authorization code for testing."""
|
"""Setup a valid authorization code for testing (authorization flow)."""
|
||||||
from gondulf.dependencies import get_code_storage
|
from gondulf.dependencies import get_code_storage
|
||||||
|
|
||||||
code = "integration_test_code_12345"
|
code = "integration_test_code_12345"
|
||||||
metadata = {
|
metadata = {
|
||||||
"client_id": "https://app.example.com",
|
"client_id": "https://app.example.com",
|
||||||
"redirect_uri": "https://app.example.com/callback",
|
"redirect_uri": "https://app.example.com/callback",
|
||||||
|
"response_type": "code", # Authorization flow - exchange at token endpoint
|
||||||
"state": "xyz123",
|
"state": "xyz123",
|
||||||
"me": "https://user.example.com",
|
"me": "https://user.example.com",
|
||||||
"scope": "",
|
"scope": "",
|
||||||
@@ -212,6 +213,7 @@ class TestTokenExchangeErrors:
|
|||||||
metadata = {
|
metadata = {
|
||||||
"client_id": "https://app.example.com",
|
"client_id": "https://app.example.com",
|
||||||
"redirect_uri": "https://app.example.com/callback",
|
"redirect_uri": "https://app.example.com/callback",
|
||||||
|
"response_type": "code", # Authorization flow
|
||||||
"state": "xyz123",
|
"state": "xyz123",
|
||||||
"me": "https://user.example.com",
|
"me": "https://user.example.com",
|
||||||
"scope": "",
|
"scope": "",
|
||||||
|
|||||||
@@ -78,11 +78,11 @@ class TestMetadataEndpoint:
|
|||||||
assert data["token_endpoint"] == "https://auth.example.com/token"
|
assert data["token_endpoint"] == "https://auth.example.com/token"
|
||||||
|
|
||||||
def test_metadata_response_types_supported(self, client):
|
def test_metadata_response_types_supported(self, client):
|
||||||
"""Test response_types_supported contains only 'code'."""
|
"""Test response_types_supported contains both 'code' and 'id'."""
|
||||||
response = client.get("/.well-known/oauth-authorization-server")
|
response = client.get("/.well-known/oauth-authorization-server")
|
||||||
data = response.json()
|
data = response.json()
|
||||||
|
|
||||||
assert data["response_types_supported"] == ["code"]
|
assert data["response_types_supported"] == ["code", "id"]
|
||||||
|
|
||||||
def test_metadata_grant_types_supported(self, client):
|
def test_metadata_grant_types_supported(self, client):
|
||||||
"""Test grant_types_supported contains only 'authorization_code'."""
|
"""Test grant_types_supported contains only 'authorization_code'."""
|
||||||
@@ -91,12 +91,12 @@ class TestMetadataEndpoint:
|
|||||||
|
|
||||||
assert data["grant_types_supported"] == ["authorization_code"]
|
assert data["grant_types_supported"] == ["authorization_code"]
|
||||||
|
|
||||||
def test_metadata_code_challenge_methods_empty(self, client):
|
def test_metadata_code_challenge_methods_supported(self, client):
|
||||||
"""Test code_challenge_methods_supported is empty array."""
|
"""Test code_challenge_methods_supported contains S256."""
|
||||||
response = client.get("/.well-known/oauth-authorization-server")
|
response = client.get("/.well-known/oauth-authorization-server")
|
||||||
data = response.json()
|
data = response.json()
|
||||||
|
|
||||||
assert data["code_challenge_methods_supported"] == []
|
assert data["code_challenge_methods_supported"] == ["S256"]
|
||||||
|
|
||||||
def test_metadata_token_endpoint_auth_methods(self, client):
|
def test_metadata_token_endpoint_auth_methods(self, client):
|
||||||
"""Test token_endpoint_auth_methods_supported contains 'none'."""
|
"""Test token_endpoint_auth_methods_supported contains 'none'."""
|
||||||
|
|||||||
@@ -71,11 +71,12 @@ def client(test_config, test_database, test_code_storage, test_token_service):
|
|||||||
|
|
||||||
@pytest.fixture
|
@pytest.fixture
|
||||||
def valid_auth_code(test_code_storage):
|
def valid_auth_code(test_code_storage):
|
||||||
"""Create a valid authorization code."""
|
"""Create a valid authorization code (authorization flow)."""
|
||||||
code = "test_auth_code_12345"
|
code = "test_auth_code_12345"
|
||||||
metadata = {
|
metadata = {
|
||||||
"client_id": "https://client.example.com",
|
"client_id": "https://client.example.com",
|
||||||
"redirect_uri": "https://client.example.com/callback",
|
"redirect_uri": "https://client.example.com/callback",
|
||||||
|
"response_type": "code", # Authorization flow - exchange at token endpoint
|
||||||
"state": "xyz123",
|
"state": "xyz123",
|
||||||
"me": "https://user.example.com",
|
"me": "https://user.example.com",
|
||||||
"scope": "",
|
"scope": "",
|
||||||
|
|||||||
Reference in New Issue
Block a user