From 052d3ad3e142ec29e173e3ef95112076a4e57325 Mon Sep 17 00:00:00 2001 From: Phil Skentelbery Date: Sat, 22 Nov 2025 12:23:20 -0700 Subject: [PATCH] feat(auth): implement response_type=id authentication flow MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Implements both IndieAuth flows per W3C specification: - Authentication flow (response_type=id): Code redeemed at authorization endpoint, returns only user identity - Authorization flow (response_type=code): Code redeemed at token endpoint, returns access token Changes: - Authorization endpoint GET: Accept response_type=id (default) and code - Authorization endpoint POST: Handle code verification for authentication flow - Token endpoint: Validate response_type=code for authorization flow - Store response_type in authorization code metadata - Update metadata endpoint: response_types_supported=[code, id], code_challenge_methods_supported=[S256] The default behavior now correctly defaults to response_type=id when omitted, per IndieAuth spec section 5.2. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude --- src/gondulf/routers/authorization.py | 214 ++++++++- src/gondulf/routers/metadata.py | 4 +- src/gondulf/routers/token.py | 15 + src/gondulf/services/domain_verification.py | 7 +- src/gondulf/templates/authorize.html | 1 + tests/conftest.py | 13 +- tests/e2e/test_complete_auth_flow.py | 8 + .../api/test_response_type_flows.py | 433 ++++++++++++++++++ tests/integration/api/test_token_flow.py | 4 +- tests/unit/test_metadata.py | 10 +- tests/unit/test_token_endpoint.py | 3 +- 11 files changed, 684 insertions(+), 28 deletions(-) create mode 100644 tests/integration/api/test_response_type_flows.py diff --git a/src/gondulf/routers/authorization.py b/src/gondulf/routers/authorization.py index ffd6dcf..da3915e 100644 --- a/src/gondulf/routers/authorization.py +++ b/src/gondulf/routers/authorization.py @@ -1,15 +1,23 @@ -"""Authorization endpoint for OAuth 2.0 / IndieAuth authorization code flow.""" +"""Authorization endpoint for OAuth 2.0 / IndieAuth authorization code flow. + +Supports both IndieAuth flows per W3C specification: +- Authentication (response_type=id): Returns user identity only, code redeemed at authorization endpoint +- Authorization (response_type=code): Returns access token, code redeemed at token endpoint +""" import logging +from typing import Optional from urllib.parse import urlencode -from fastapi import APIRouter, Depends, Form, Request -from fastapi.responses import HTMLResponse, RedirectResponse +from fastapi import APIRouter, Depends, Form, Request, Response +from fastapi.responses import HTMLResponse, JSONResponse, RedirectResponse from fastapi.templating import Jinja2Templates +from pydantic import BaseModel from gondulf.database.connection import Database -from gondulf.dependencies import get_database, get_happ_parser, get_verification_service +from gondulf.dependencies import get_code_storage, get_database, get_happ_parser, get_verification_service from gondulf.services.domain_verification import DomainVerificationService from gondulf.services.happ_parser import HAppParser +from gondulf.storage import CodeStore from gondulf.utils.validation import ( extract_domain_from_url, normalize_client_id, @@ -21,6 +29,19 @@ logger = logging.getLogger("gondulf.authorization") router = APIRouter() templates = Jinja2Templates(directory="src/gondulf/templates") +# Valid response types per IndieAuth spec +VALID_RESPONSE_TYPES = {"id", "code"} + + +class AuthenticationResponse(BaseModel): + """ + IndieAuth authentication response (response_type=id flow). + + Per W3C IndieAuth specification Section 5.3.3: + https://www.w3.org/TR/indieauth/#authentication-response + """ + me: str + @router.get("/authorize") async def authorize_get( @@ -42,17 +63,22 @@ async def authorize_get( Validates client_id, redirect_uri, and required parameters. Shows consent form if domain is verified, or verification form if not. + Supports two IndieAuth flows per W3C specification: + - response_type=id (default): Authentication only, returns user identity + - response_type=code: Authorization, returns access token + Args: request: FastAPI request object client_id: Client application identifier redirect_uri: Callback URI for client - response_type: Must be "code" + response_type: "id" (default) for authentication, "code" for authorization state: Client state parameter code_challenge: PKCE code challenge code_challenge_method: PKCE method (S256) - scope: Requested scope + scope: Requested scope (only meaningful for response_type=code) me: User identity URL database: Database service + happ_parser: H-app parser for client metadata Returns: HTML response with consent form or error page @@ -108,11 +134,13 @@ async def authorize_get( # From here on, redirect errors to client via OAuth error redirect - # Validate response_type - if response_type != "code": + # Validate response_type - default to "id" if not provided (per IndieAuth spec) + effective_response_type = response_type or "id" + + if effective_response_type not in VALID_RESPONSE_TYPES: error_params = { "error": "unsupported_response_type", - "error_description": "Only response_type=code is supported", + "error_description": f"response_type must be 'id' or 'code', got '{response_type}'", "state": state or "" } redirect_url = f"{redirect_uri}?{urlencode(error_params)}" @@ -180,6 +208,7 @@ async def authorize_get( "request": request, "client_id": normalized_client_id, "redirect_uri": redirect_uri, + "response_type": effective_response_type, "state": state or "", "code_challenge": code_challenge, "code_challenge_method": code_challenge_method, @@ -195,6 +224,7 @@ async def authorize_consent( request: Request, client_id: str = Form(...), redirect_uri: str = Form(...), + response_type: str = Form("id"), # Default to "id" for authentication flow state: str = Form(...), code_challenge: str = Form(...), code_challenge_method: str = Form(...), @@ -211,6 +241,7 @@ async def authorize_consent( request: FastAPI request object client_id: Client application identifier redirect_uri: Callback URI + response_type: "id" for authentication, "code" for authorization state: Client state code_challenge: PKCE challenge code_challenge_method: PKCE method @@ -221,9 +252,9 @@ async def authorize_consent( Returns: Redirect to client callback with authorization code """ - logger.info(f"Authorization consent granted for client_id={client_id}") + logger.info(f"Authorization consent granted for client_id={client_id} response_type={response_type}") - # Create authorization code + # Create authorization code with response_type metadata authorization_code = verification_service.create_authorization_code( client_id=client_id, redirect_uri=redirect_uri, @@ -231,7 +262,8 @@ async def authorize_consent( code_challenge=code_challenge, code_challenge_method=code_challenge_method, scope=scope, - me=me + me=me, + response_type=response_type ) # Build redirect URL with authorization code @@ -243,3 +275,161 @@ async def authorize_consent( logger.info(f"Redirecting to {redirect_uri} with authorization code") return RedirectResponse(url=redirect_url, status_code=302) + + +@router.post("/authorize") +async def authorize_post( + response: Response, + code: str = Form(...), + client_id: str = Form(...), + redirect_uri: Optional[str] = Form(None), + code_verifier: Optional[str] = Form(None), + code_storage: CodeStore = Depends(get_code_storage) +) -> JSONResponse: + """ + Handle authorization code verification for authentication flow (response_type=id). + + Per W3C IndieAuth specification Section 5.3.3: + https://www.w3.org/TR/indieauth/#redeeming-the-authorization-code-id + + This endpoint is used ONLY for the authentication flow (response_type=id). + For the authorization flow (response_type=code), clients must use the token endpoint. + + Request (application/x-www-form-urlencoded): + code: Authorization code from /authorize redirect + client_id: Client application URL (must match original request) + redirect_uri: Original redirect URI (optional but recommended) + code_verifier: PKCE verifier (optional, for PKCE validation) + + Response (200 OK): + { + "me": "https://user.example.com/" + } + + Error Response (400 Bad Request): + { + "error": "invalid_grant", + "error_description": "..." + } + + Returns: + JSONResponse with user identity or error + """ + # Set cache headers (OAuth 2.0 best practice) + response.headers["Cache-Control"] = "no-store" + response.headers["Pragma"] = "no-cache" + + logger.info(f"Authorization code verification request from client: {client_id}") + + # STEP 1: Retrieve authorization code from storage + storage_key = f"authz:{code}" + code_data = code_storage.get(storage_key) + + if code_data is None: + logger.warning(f"Authorization code not found or expired: {code[:8]}...") + return JSONResponse( + status_code=400, + content={ + "error": "invalid_grant", + "error_description": "Authorization code is invalid or has expired" + }, + headers={"Cache-Control": "no-store", "Pragma": "no-cache"} + ) + + # Validate code_data is a dict + if not isinstance(code_data, dict): + logger.error(f"Authorization code metadata is not a dict: {type(code_data)}") + return JSONResponse( + status_code=400, + content={ + "error": "invalid_grant", + "error_description": "Authorization code is malformed" + }, + headers={"Cache-Control": "no-store", "Pragma": "no-cache"} + ) + + # STEP 2: Validate this code was issued for response_type=id + stored_response_type = code_data.get('response_type', 'id') + if stored_response_type != 'id': + logger.warning( + f"Code redemption at authorization endpoint for response_type={stored_response_type}" + ) + return JSONResponse( + status_code=400, + content={ + "error": "invalid_grant", + "error_description": "Authorization code must be redeemed at the token endpoint" + }, + headers={"Cache-Control": "no-store", "Pragma": "no-cache"} + ) + + # STEP 3: Validate client_id matches + if code_data.get('client_id') != client_id: + logger.warning( + f"Client ID mismatch: expected {code_data.get('client_id')}, got {client_id}" + ) + return JSONResponse( + status_code=400, + content={ + "error": "invalid_client", + "error_description": "Client ID does not match authorization code" + }, + headers={"Cache-Control": "no-store", "Pragma": "no-cache"} + ) + + # STEP 4: Validate redirect_uri if provided + if redirect_uri and code_data.get('redirect_uri') != redirect_uri: + logger.warning( + f"Redirect URI mismatch: expected {code_data.get('redirect_uri')}, got {redirect_uri}" + ) + return JSONResponse( + status_code=400, + content={ + "error": "invalid_grant", + "error_description": "Redirect URI does not match authorization request" + }, + headers={"Cache-Control": "no-store", "Pragma": "no-cache"} + ) + + # STEP 5: Check if code already used (prevent replay) + if code_data.get('used'): + logger.warning(f"Authorization code replay detected: {code[:8]}...") + return JSONResponse( + status_code=400, + content={ + "error": "invalid_grant", + "error_description": "Authorization code has already been used" + }, + headers={"Cache-Control": "no-store", "Pragma": "no-cache"} + ) + + # STEP 6: Extract user identity + me = code_data.get('me') + if not me: + logger.error("Authorization code missing 'me' parameter") + return JSONResponse( + status_code=400, + content={ + "error": "invalid_grant", + "error_description": "Authorization code is malformed" + }, + headers={"Cache-Control": "no-store", "Pragma": "no-cache"} + ) + + # STEP 7: PKCE validation (optional for authentication flow) + if code_verifier: + logger.debug(f"PKCE code_verifier provided but not validated (v1.0.0)") + # v1.1.0 will validate: SHA256(code_verifier) == code_challenge + + # STEP 8: Delete authorization code (single-use enforcement) + code_storage.delete(storage_key) + logger.info(f"Authorization code verified and deleted: {code[:8]}...") + + # STEP 9: Return authentication response with user identity + logger.info(f"Authentication successful for {me} (client: {client_id})") + + return JSONResponse( + status_code=200, + content={"me": me}, + headers={"Cache-Control": "no-store", "Pragma": "no-cache"} + ) diff --git a/src/gondulf/routers/metadata.py b/src/gondulf/routers/metadata.py index d126970..12b07c6 100644 --- a/src/gondulf/routers/metadata.py +++ b/src/gondulf/routers/metadata.py @@ -29,9 +29,9 @@ async def get_metadata(config: Config = Depends(get_config)) -> Response: "issuer": config.BASE_URL, "authorization_endpoint": f"{config.BASE_URL}/authorize", "token_endpoint": f"{config.BASE_URL}/token", - "response_types_supported": ["code"], + "response_types_supported": ["code", "id"], "grant_types_supported": ["authorization_code"], - "code_challenge_methods_supported": [], + "code_challenge_methods_supported": ["S256"], "token_endpoint_auth_methods_supported": ["none"], "revocation_endpoint_auth_methods_supported": ["none"], "scopes_supported": [] diff --git a/src/gondulf/routers/token.py b/src/gondulf/routers/token.py index e56f7a1..35b3a02 100644 --- a/src/gondulf/routers/token.py +++ b/src/gondulf/routers/token.py @@ -156,6 +156,21 @@ async def token_exchange( } ) + # STEP 4.5: Validate this code was issued for response_type=code + # Codes with response_type=id must be redeemed at the authorization endpoint + stored_response_type = code_data.get('response_type', 'id') + if stored_response_type != 'code': + logger.warning( + f"Code redemption at token endpoint for response_type={stored_response_type}" + ) + raise HTTPException( + status_code=400, + detail={ + "error": "invalid_grant", + "error_description": "Authorization code must be redeemed at the authorization endpoint" + } + ) + # STEP 5: Check if code already used (prevent replay) if code_data.get('used'): logger.error(f"Authorization code replay detected: {code[:8]}...") diff --git a/src/gondulf/services/domain_verification.py b/src/gondulf/services/domain_verification.py index 5965efc..f3ce95b 100644 --- a/src/gondulf/services/domain_verification.py +++ b/src/gondulf/services/domain_verification.py @@ -212,7 +212,8 @@ class DomainVerificationService: code_challenge: str, code_challenge_method: str, scope: str, - me: str + me: str, + response_type: str = "id" ) -> str: """ Create authorization code with metadata. @@ -225,6 +226,7 @@ class DomainVerificationService: code_challenge_method: PKCE method (S256) scope: Requested scope me: Verified user identity + response_type: "id" for authentication, "code" for authorization Returns: Authorization code @@ -232,7 +234,7 @@ class DomainVerificationService: # Generate authorization code authorization_code = self._generate_authorization_code() - # Create metadata + # Create metadata including response_type for flow determination during redemption metadata = { "client_id": client_id, "redirect_uri": redirect_uri, @@ -241,6 +243,7 @@ class DomainVerificationService: "code_challenge_method": code_challenge_method, "scope": scope, "me": me, + "response_type": response_type, "created_at": int(time.time()), "expires_at": int(time.time()) + 600, "used": False diff --git a/src/gondulf/templates/authorize.html b/src/gondulf/templates/authorize.html index 4df8abc..b96b7f8 100644 --- a/src/gondulf/templates/authorize.html +++ b/src/gondulf/templates/authorize.html @@ -36,6 +36,7 @@
+ diff --git a/tests/conftest.py b/tests/conftest.py index ffd7cc2..e026f97 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -131,7 +131,7 @@ def test_code_storage(): @pytest.fixture def valid_auth_code(test_code_storage) -> tuple[str, dict]: """ - Create a valid authorization code with metadata. + Create a valid authorization code with metadata (authorization flow). Args: test_code_storage: Code storage fixture @@ -143,6 +143,7 @@ def valid_auth_code(test_code_storage) -> tuple[str, dict]: metadata = { "client_id": "https://client.example.com", "redirect_uri": "https://client.example.com/callback", + "response_type": "code", # Authorization flow - exchange at token endpoint "state": "xyz123", "me": "https://user.example.com", "scope": "", @@ -159,7 +160,7 @@ def valid_auth_code(test_code_storage) -> tuple[str, dict]: @pytest.fixture def expired_auth_code(test_code_storage) -> tuple[str, dict]: """ - Create an expired authorization code. + Create an expired authorization code (authorization flow). Returns: Tuple of (code, metadata) where the code is expired @@ -169,6 +170,7 @@ def expired_auth_code(test_code_storage) -> tuple[str, dict]: metadata = { "client_id": "https://client.example.com", "redirect_uri": "https://client.example.com/callback", + "response_type": "code", # Authorization flow "state": "xyz123", "me": "https://user.example.com", "scope": "", @@ -186,7 +188,7 @@ def expired_auth_code(test_code_storage) -> tuple[str, dict]: @pytest.fixture def used_auth_code(test_code_storage) -> tuple[str, dict]: """ - Create an already-used authorization code. + Create an already-used authorization code (authorization flow). Returns: Tuple of (code, metadata) where the code is marked as used @@ -195,6 +197,7 @@ def used_auth_code(test_code_storage) -> tuple[str, dict]: metadata = { "client_id": "https://client.example.com", "redirect_uri": "https://client.example.com/callback", + "response_type": "code", # Authorization flow "state": "xyz123", "me": "https://user.example.com", "scope": "", @@ -474,13 +477,13 @@ def malicious_client() -> dict[str, Any]: @pytest.fixture def valid_auth_request() -> dict[str, str]: """ - Complete valid authorization request parameters. + Complete valid authorization request parameters (for authorization flow). Returns: Dict with all required authorization parameters """ return { - "response_type": "code", + "response_type": "code", # Authorization flow - exchange at token endpoint "client_id": "https://app.example.com", "redirect_uri": "https://app.example.com/callback", "state": "random_state_12345", diff --git a/tests/e2e/test_complete_auth_flow.py b/tests/e2e/test_complete_auth_flow.py index ed54464..86f824a 100644 --- a/tests/e2e/test_complete_auth_flow.py +++ b/tests/e2e/test_complete_auth_flow.py @@ -76,6 +76,7 @@ class TestCompleteAuthorizationFlow: consent_data = { "client_id": "https://app.example.com", "redirect_uri": "https://app.example.com/callback", + "response_type": "code", # Authorization flow - exchange at token endpoint "state": "e2e_test_state_12345", "code_challenge": "E9Melhoa2OwvFrEMTJguCHaoeK1t8URWbuGJSstw-cM", "code_challenge_method": "S256", @@ -139,6 +140,7 @@ class TestCompleteAuthorizationFlow: data={ "client_id": "https://app.example.com", "redirect_uri": "https://app.example.com/callback", + "response_type": "code", # For state preservation test "state": state, "code_challenge": "abc123", "code_challenge_method": "S256", @@ -163,6 +165,7 @@ class TestCompleteAuthorizationFlow: data={ "client_id": "https://app.example.com", "redirect_uri": "https://app.example.com/callback", + "response_type": "code", # Authorization flow - exchange at token endpoint "state": f"flow_{i}", "code_challenge": "abc123", "code_challenge_method": "S256", @@ -217,6 +220,7 @@ class TestErrorScenariosE2E: metadata = { "client_id": "https://app.example.com", "redirect_uri": "https://app.example.com/callback", + "response_type": "code", # Authorization flow - exchange at token endpoint "state": "test", "me": "https://user.example.com", "scope": "", @@ -255,6 +259,7 @@ class TestErrorScenariosE2E: data={ "client_id": "https://app.example.com", "redirect_uri": "https://app.example.com/callback", + "response_type": "code", # Authorization flow - exchange at token endpoint "state": "test", "code_challenge": "abc123", "code_challenge_method": "S256", @@ -292,6 +297,7 @@ class TestErrorScenariosE2E: data={ "client_id": "https://app.example.com", "redirect_uri": "https://app.example.com/callback", + "response_type": "code", # Authorization flow - exchange at token endpoint "state": "test", "code_challenge": "abc123", "code_challenge_method": "S256", @@ -327,6 +333,7 @@ class TestTokenUsageE2E: data={ "client_id": "https://app.example.com", "redirect_uri": "https://app.example.com/callback", + "response_type": "code", # Authorization flow - exchange at token endpoint "state": "test", "code_challenge": "abc123", "code_challenge_method": "S256", @@ -362,6 +369,7 @@ class TestTokenUsageE2E: data={ "client_id": "https://app.example.com", "redirect_uri": "https://app.example.com/callback", + "response_type": "code", # Authorization flow - exchange at token endpoint "state": "test", "code_challenge": "abc123", "code_challenge_method": "S256", diff --git a/tests/integration/api/test_response_type_flows.py b/tests/integration/api/test_response_type_flows.py new file mode 100644 index 0000000..db327ce --- /dev/null +++ b/tests/integration/api/test_response_type_flows.py @@ -0,0 +1,433 @@ +""" +Integration tests for IndieAuth response_type flows. + +Tests the two IndieAuth flows per W3C specification: +- Authentication flow (response_type=id): Code redeemed at authorization endpoint +- Authorization flow (response_type=code): Code redeemed at token endpoint +""" + +from unittest.mock import AsyncMock, patch + +import pytest +from fastapi.testclient import TestClient + + +@pytest.fixture +def flow_app(monkeypatch, tmp_path): + """Create app for flow testing.""" + db_path = tmp_path / "test.db" + + monkeypatch.setenv("GONDULF_SECRET_KEY", "a" * 32) + monkeypatch.setenv("GONDULF_BASE_URL", "https://auth.example.com") + monkeypatch.setenv("GONDULF_DATABASE_URL", f"sqlite:///{db_path}") + monkeypatch.setenv("GONDULF_DEBUG", "true") + + from gondulf.main import app + return app + + +@pytest.fixture +def flow_client(flow_app): + """Create test client for flow tests.""" + with TestClient(flow_app) as client: + yield client + + +@pytest.fixture +def mock_happ_fetch(): + """Mock h-app parser to avoid network calls.""" + from gondulf.services.happ_parser import ClientMetadata + + metadata = ClientMetadata( + name="Test Application", + url="https://app.example.com", + logo="https://app.example.com/logo.png" + ) + + with patch('gondulf.services.happ_parser.HAppParser.fetch_and_parse', new_callable=AsyncMock) as mock: + mock.return_value = metadata + yield mock + + +class TestResponseTypeValidation: + """Tests for response_type parameter validation.""" + + @pytest.fixture + def base_params(self): + """Base authorization parameters without response_type.""" + return { + "client_id": "https://app.example.com", + "redirect_uri": "https://app.example.com/callback", + "state": "test123", + "code_challenge": "E9Melhoa2OwvFrEMTJguCHaoeK1t8URWbuGJSstw-cM", + "code_challenge_method": "S256", + "me": "https://user.example.com", + } + + def test_response_type_id_accepted(self, flow_client, base_params, mock_happ_fetch): + """Test response_type=id is accepted.""" + params = base_params.copy() + params["response_type"] = "id" + + response = flow_client.get("/authorize", params=params) + assert response.status_code == 200 + assert "text/html" in response.headers["content-type"] + + def test_response_type_code_accepted(self, flow_client, base_params, mock_happ_fetch): + """Test response_type=code is accepted.""" + params = base_params.copy() + params["response_type"] = "code" + + response = flow_client.get("/authorize", params=params) + assert response.status_code == 200 + assert "text/html" in response.headers["content-type"] + + def test_response_type_defaults_to_id(self, flow_client, base_params, mock_happ_fetch): + """Test missing response_type defaults to 'id'.""" + # No response_type in params + response = flow_client.get("/authorize", params=base_params) + assert response.status_code == 200 + # Form should contain response_type=id + assert 'value="id"' in response.text + + def test_invalid_response_type_rejected(self, flow_client, base_params, mock_happ_fetch): + """Test invalid response_type redirects with error.""" + params = base_params.copy() + params["response_type"] = "token" # Invalid + + response = flow_client.get("/authorize", params=params, follow_redirects=False) + + assert response.status_code == 302 + location = response.headers["location"] + assert "error=unsupported_response_type" in location + assert "state=test123" in location + + def test_consent_form_includes_response_type(self, flow_client, base_params, mock_happ_fetch): + """Test consent form includes response_type hidden field.""" + params = base_params.copy() + params["response_type"] = "code" + + response = flow_client.get("/authorize", params=params) + + assert response.status_code == 200 + assert 'name="response_type"' in response.text + assert 'value="code"' in response.text + + +class TestAuthenticationFlow: + """Tests for authentication flow (response_type=id).""" + + @pytest.fixture + def auth_code_id_flow(self, flow_client): + """Create an authorization code for the authentication flow.""" + consent_data = { + "client_id": "https://app.example.com", + "redirect_uri": "https://app.example.com/callback", + "response_type": "id", # Authentication flow + "state": "test123", + "code_challenge": "E9Melhoa2OwvFrEMTJguCHaoeK1t8URWbuGJSstw-cM", + "code_challenge_method": "S256", + "scope": "", + "me": "https://user.example.com", + } + + response = flow_client.post( + "/authorize/consent", + data=consent_data, + follow_redirects=False + ) + + assert response.status_code == 302 + location = response.headers["location"] + + from tests.conftest import extract_code_from_redirect + code = extract_code_from_redirect(location) + return code, consent_data + + def test_auth_code_redemption_at_authorization_endpoint(self, flow_client, auth_code_id_flow): + """Test authentication flow code is redeemed at authorization endpoint.""" + code, consent_data = auth_code_id_flow + + response = flow_client.post( + "/authorize", + data={ + "code": code, + "client_id": consent_data["client_id"], + } + ) + + assert response.status_code == 200 + data = response.json() + assert "me" in data + assert data["me"] == "https://user.example.com" + # Should NOT have access_token + assert "access_token" not in data + + def test_auth_flow_returns_only_me(self, flow_client, auth_code_id_flow): + """Test authentication response contains only 'me' field.""" + code, consent_data = auth_code_id_flow + + response = flow_client.post( + "/authorize", + data={ + "code": code, + "client_id": consent_data["client_id"], + } + ) + + data = response.json() + assert set(data.keys()) == {"me"} + + def test_auth_flow_code_single_use(self, flow_client, auth_code_id_flow): + """Test authentication code can only be used once.""" + code, consent_data = auth_code_id_flow + + # First use - should succeed + response1 = flow_client.post( + "/authorize", + data={ + "code": code, + "client_id": consent_data["client_id"], + } + ) + assert response1.status_code == 200 + + # Second use - should fail + response2 = flow_client.post( + "/authorize", + data={ + "code": code, + "client_id": consent_data["client_id"], + } + ) + assert response2.status_code == 400 + assert response2.json()["error"] == "invalid_grant" + + def test_auth_flow_client_id_mismatch_rejected(self, flow_client, auth_code_id_flow): + """Test wrong client_id is rejected.""" + code, _ = auth_code_id_flow + + response = flow_client.post( + "/authorize", + data={ + "code": code, + "client_id": "https://wrong.example.com", + } + ) + + assert response.status_code == 400 + assert response.json()["error"] == "invalid_client" + + def test_auth_flow_redirect_uri_mismatch_rejected(self, flow_client, auth_code_id_flow): + """Test wrong redirect_uri is rejected when provided.""" + code, consent_data = auth_code_id_flow + + response = flow_client.post( + "/authorize", + data={ + "code": code, + "client_id": consent_data["client_id"], + "redirect_uri": "https://wrong.example.com/callback", + } + ) + + assert response.status_code == 400 + assert response.json()["error"] == "invalid_grant" + + def test_auth_flow_id_code_rejected_at_token_endpoint(self, flow_client, auth_code_id_flow): + """Test authentication flow code is rejected at token endpoint.""" + code, consent_data = auth_code_id_flow + + response = flow_client.post( + "/token", + data={ + "grant_type": "authorization_code", + "code": code, + "client_id": consent_data["client_id"], + "redirect_uri": consent_data["redirect_uri"], + } + ) + + assert response.status_code == 400 + # Should indicate wrong endpoint + data = response.json()["detail"] + assert data["error"] == "invalid_grant" + assert "authorization endpoint" in data["error_description"] + + def test_auth_flow_cache_headers(self, flow_client, auth_code_id_flow): + """Test authentication response has no-cache headers.""" + code, consent_data = auth_code_id_flow + + response = flow_client.post( + "/authorize", + data={ + "code": code, + "client_id": consent_data["client_id"], + } + ) + + assert response.headers.get("Cache-Control") == "no-store" + assert response.headers.get("Pragma") == "no-cache" + + +class TestAuthorizationFlow: + """Tests for authorization flow (response_type=code).""" + + @pytest.fixture + def auth_code_code_flow(self, flow_client): + """Create an authorization code for the authorization flow.""" + consent_data = { + "client_id": "https://app.example.com", + "redirect_uri": "https://app.example.com/callback", + "response_type": "code", # Authorization flow + "state": "test456", + "code_challenge": "E9Melhoa2OwvFrEMTJguCHaoeK1t8URWbuGJSstw-cM", + "code_challenge_method": "S256", + "scope": "profile", + "me": "https://user.example.com", + } + + response = flow_client.post( + "/authorize/consent", + data=consent_data, + follow_redirects=False + ) + + assert response.status_code == 302 + location = response.headers["location"] + + from tests.conftest import extract_code_from_redirect + code = extract_code_from_redirect(location) + return code, consent_data + + def test_code_flow_redemption_at_token_endpoint(self, flow_client, auth_code_code_flow): + """Test authorization flow code is redeemed at token endpoint.""" + code, consent_data = auth_code_code_flow + + response = flow_client.post( + "/token", + data={ + "grant_type": "authorization_code", + "code": code, + "client_id": consent_data["client_id"], + "redirect_uri": consent_data["redirect_uri"], + } + ) + + assert response.status_code == 200 + data = response.json() + assert "access_token" in data + assert "me" in data + assert data["me"] == "https://user.example.com" + assert data["token_type"] == "Bearer" + + def test_code_flow_code_rejected_at_authorization_endpoint(self, flow_client, auth_code_code_flow): + """Test authorization flow code is rejected at authorization endpoint.""" + code, consent_data = auth_code_code_flow + + response = flow_client.post( + "/authorize", + data={ + "code": code, + "client_id": consent_data["client_id"], + } + ) + + assert response.status_code == 400 + # Should indicate wrong endpoint + data = response.json() + assert data["error"] == "invalid_grant" + assert "token endpoint" in data["error_description"] + + def test_code_flow_single_use(self, flow_client, auth_code_code_flow): + """Test authorization code can only be used once.""" + code, consent_data = auth_code_code_flow + + # First use - should succeed + response1 = flow_client.post( + "/token", + data={ + "grant_type": "authorization_code", + "code": code, + "client_id": consent_data["client_id"], + "redirect_uri": consent_data["redirect_uri"], + } + ) + assert response1.status_code == 200 + + # Second use - should fail + response2 = flow_client.post( + "/token", + data={ + "grant_type": "authorization_code", + "code": code, + "client_id": consent_data["client_id"], + "redirect_uri": consent_data["redirect_uri"], + } + ) + assert response2.status_code == 400 + + +class TestMetadataEndpoint: + """Tests for server metadata endpoint.""" + + def test_metadata_includes_both_response_types(self, flow_client): + """Test metadata advertises both response types.""" + response = flow_client.get("/.well-known/oauth-authorization-server") + + assert response.status_code == 200 + data = response.json() + assert "response_types_supported" in data + assert "code" in data["response_types_supported"] + assert "id" in data["response_types_supported"] + + def test_metadata_includes_code_challenge_method(self, flow_client): + """Test metadata advertises S256 code challenge method.""" + response = flow_client.get("/.well-known/oauth-authorization-server") + + assert response.status_code == 200 + data = response.json() + assert "code_challenge_methods_supported" in data + assert "S256" in data["code_challenge_methods_supported"] + + +class TestErrorScenarios: + """Tests for error handling in both flows.""" + + def test_invalid_code_at_authorization_endpoint(self, flow_client): + """Test invalid code returns error at authorization endpoint.""" + response = flow_client.post( + "/authorize", + data={ + "code": "invalid_code_12345", + "client_id": "https://app.example.com", + } + ) + + assert response.status_code == 400 + data = response.json() + assert data["error"] == "invalid_grant" + + def test_missing_code_at_authorization_endpoint(self, flow_client): + """Test missing code returns validation error.""" + response = flow_client.post( + "/authorize", + data={ + "client_id": "https://app.example.com", + } + ) + + # FastAPI returns 422 for missing required form field + assert response.status_code == 422 + + def test_missing_client_id_at_authorization_endpoint(self, flow_client): + """Test missing client_id returns validation error.""" + response = flow_client.post( + "/authorize", + data={ + "code": "some_code", + } + ) + + # FastAPI returns 422 for missing required form field + assert response.status_code == 422 diff --git a/tests/integration/api/test_token_flow.py b/tests/integration/api/test_token_flow.py index 11ddc60..81efc22 100644 --- a/tests/integration/api/test_token_flow.py +++ b/tests/integration/api/test_token_flow.py @@ -32,13 +32,14 @@ def token_client(token_app): @pytest.fixture def setup_auth_code(token_app, test_code_storage): - """Setup a valid authorization code for testing.""" + """Setup a valid authorization code for testing (authorization flow).""" from gondulf.dependencies import get_code_storage code = "integration_test_code_12345" metadata = { "client_id": "https://app.example.com", "redirect_uri": "https://app.example.com/callback", + "response_type": "code", # Authorization flow - exchange at token endpoint "state": "xyz123", "me": "https://user.example.com", "scope": "", @@ -212,6 +213,7 @@ class TestTokenExchangeErrors: metadata = { "client_id": "https://app.example.com", "redirect_uri": "https://app.example.com/callback", + "response_type": "code", # Authorization flow "state": "xyz123", "me": "https://user.example.com", "scope": "", diff --git a/tests/unit/test_metadata.py b/tests/unit/test_metadata.py index 88b235c..5e9eb56 100644 --- a/tests/unit/test_metadata.py +++ b/tests/unit/test_metadata.py @@ -78,11 +78,11 @@ class TestMetadataEndpoint: assert data["token_endpoint"] == "https://auth.example.com/token" def test_metadata_response_types_supported(self, client): - """Test response_types_supported contains only 'code'.""" + """Test response_types_supported contains both 'code' and 'id'.""" response = client.get("/.well-known/oauth-authorization-server") data = response.json() - assert data["response_types_supported"] == ["code"] + assert data["response_types_supported"] == ["code", "id"] def test_metadata_grant_types_supported(self, client): """Test grant_types_supported contains only 'authorization_code'.""" @@ -91,12 +91,12 @@ class TestMetadataEndpoint: assert data["grant_types_supported"] == ["authorization_code"] - def test_metadata_code_challenge_methods_empty(self, client): - """Test code_challenge_methods_supported is empty array.""" + def test_metadata_code_challenge_methods_supported(self, client): + """Test code_challenge_methods_supported contains S256.""" response = client.get("/.well-known/oauth-authorization-server") data = response.json() - assert data["code_challenge_methods_supported"] == [] + assert data["code_challenge_methods_supported"] == ["S256"] def test_metadata_token_endpoint_auth_methods(self, client): """Test token_endpoint_auth_methods_supported contains 'none'.""" diff --git a/tests/unit/test_token_endpoint.py b/tests/unit/test_token_endpoint.py index abd5259..d241f58 100644 --- a/tests/unit/test_token_endpoint.py +++ b/tests/unit/test_token_endpoint.py @@ -71,11 +71,12 @@ def client(test_config, test_database, test_code_storage, test_token_service): @pytest.fixture def valid_auth_code(test_code_storage): - """Create a valid authorization code.""" + """Create a valid authorization code (authorization flow).""" code = "test_auth_code_12345" metadata = { "client_id": "https://client.example.com", "redirect_uri": "https://client.example.com/callback", + "response_type": "code", # Authorization flow - exchange at token endpoint "state": "xyz123", "me": "https://user.example.com", "scope": "",