""" Integration tests for token endpoint flow. Tests the complete token exchange flow including authorization code validation, PKCE verification, token generation, and error handling. """ import pytest from fastapi.testclient import TestClient @pytest.fixture def token_app(monkeypatch, tmp_path): """Create app for token testing.""" db_path = tmp_path / "test.db" monkeypatch.setenv("GONDULF_SECRET_KEY", "a" * 32) monkeypatch.setenv("GONDULF_BASE_URL", "https://auth.example.com") monkeypatch.setenv("GONDULF_DATABASE_URL", f"sqlite:///{db_path}") monkeypatch.setenv("GONDULF_DEBUG", "true") from gondulf.main import app return app @pytest.fixture def token_client(token_app): """Create test client for token tests.""" with TestClient(token_app) as client: yield client @pytest.fixture def setup_auth_code(token_app, test_code_storage): """Setup a valid authorization code for testing.""" from gondulf.dependencies import get_code_storage code = "integration_test_code_12345" metadata = { "client_id": "https://app.example.com", "redirect_uri": "https://app.example.com/callback", "state": "xyz123", "me": "https://user.example.com", "scope": "", "code_challenge": "E9Melhoa2OwvFrEMTJguCHaoeK1t8URWbuGJSstw-cM", "code_challenge_method": "S256", "created_at": 1234567890, "expires_at": 1234568490, "used": False } # Override the code storage dependency token_app.dependency_overrides[get_code_storage] = lambda: test_code_storage test_code_storage.store(f"authz:{code}", metadata) yield code, metadata, test_code_storage token_app.dependency_overrides.clear() class TestTokenExchangeIntegration: """Integration tests for successful token exchange.""" def test_valid_code_exchange_returns_token(self, token_client, setup_auth_code): """Test valid authorization code exchange returns access token.""" code, metadata, _ = setup_auth_code response = token_client.post("/token", data={ "grant_type": "authorization_code", "code": code, "client_id": metadata["client_id"], "redirect_uri": metadata["redirect_uri"], }) assert response.status_code == 200 data = response.json() assert "access_token" in data assert data["token_type"] == "Bearer" assert data["me"] == metadata["me"] def test_token_response_format_matches_oauth2(self, token_client, setup_auth_code): """Test token response matches OAuth 2.0 specification format.""" code, metadata, _ = setup_auth_code response = token_client.post("/token", data={ "grant_type": "authorization_code", "code": code, "client_id": metadata["client_id"], "redirect_uri": metadata["redirect_uri"], }) assert response.status_code == 200 data = response.json() # Required fields per OAuth 2.0 / IndieAuth assert "access_token" in data assert "token_type" in data assert "me" in data # Token should be substantial assert len(data["access_token"]) >= 32 def test_token_response_includes_cache_headers(self, token_client, setup_auth_code): """Test token response includes required cache headers.""" code, metadata, _ = setup_auth_code response = token_client.post("/token", data={ "grant_type": "authorization_code", "code": code, "client_id": metadata["client_id"], "redirect_uri": metadata["redirect_uri"], }) assert response.status_code == 200 # OAuth 2.0 requires no-store assert response.headers["Cache-Control"] == "no-store" assert response.headers["Pragma"] == "no-cache" def test_authorization_code_single_use(self, token_client, setup_auth_code): """Test authorization code cannot be used twice.""" code, metadata, _ = setup_auth_code # First exchange should succeed response1 = token_client.post("/token", data={ "grant_type": "authorization_code", "code": code, "client_id": metadata["client_id"], "redirect_uri": metadata["redirect_uri"], }) assert response1.status_code == 200 # Second exchange should fail response2 = token_client.post("/token", data={ "grant_type": "authorization_code", "code": code, "client_id": metadata["client_id"], "redirect_uri": metadata["redirect_uri"], }) assert response2.status_code == 400 data = response2.json() assert data["detail"]["error"] == "invalid_grant" class TestTokenExchangeErrors: """Integration tests for token exchange error conditions.""" def test_invalid_grant_type_rejected(self, token_client, setup_auth_code): """Test invalid grant_type returns error.""" code, metadata, _ = setup_auth_code response = token_client.post("/token", data={ "grant_type": "password", # Invalid grant type "code": code, "client_id": metadata["client_id"], "redirect_uri": metadata["redirect_uri"], }) assert response.status_code == 400 data = response.json() assert data["detail"]["error"] == "unsupported_grant_type" def test_invalid_code_rejected(self, token_client, setup_auth_code): """Test invalid authorization code returns error.""" _, metadata, _ = setup_auth_code response = token_client.post("/token", data={ "grant_type": "authorization_code", "code": "nonexistent_code_12345", "client_id": metadata["client_id"], "redirect_uri": metadata["redirect_uri"], }) assert response.status_code == 400 data = response.json() assert data["detail"]["error"] == "invalid_grant" def test_client_id_mismatch_rejected(self, token_client, setup_auth_code): """Test mismatched client_id returns error.""" code, metadata, _ = setup_auth_code response = token_client.post("/token", data={ "grant_type": "authorization_code", "code": code, "client_id": "https://different-client.example.com", # Wrong client "redirect_uri": metadata["redirect_uri"], }) assert response.status_code == 400 data = response.json() assert data["detail"]["error"] == "invalid_client" def test_redirect_uri_mismatch_rejected(self, token_client, setup_auth_code): """Test mismatched redirect_uri returns error.""" code, metadata, _ = setup_auth_code response = token_client.post("/token", data={ "grant_type": "authorization_code", "code": code, "client_id": metadata["client_id"], "redirect_uri": "https://app.example.com/different-callback", # Wrong URI }) assert response.status_code == 400 data = response.json() assert data["detail"]["error"] == "invalid_grant" def test_used_code_rejected(self, token_client, token_app, test_code_storage): """Test already-used authorization code returns error.""" from gondulf.dependencies import get_code_storage code = "used_code_test_12345" metadata = { "client_id": "https://app.example.com", "redirect_uri": "https://app.example.com/callback", "state": "xyz123", "me": "https://user.example.com", "scope": "", "code_challenge": "abc123", "code_challenge_method": "S256", "created_at": 1234567890, "expires_at": 1234568490, "used": True # Already used } token_app.dependency_overrides[get_code_storage] = lambda: test_code_storage test_code_storage.store(f"authz:{code}", metadata) response = token_client.post("/token", data={ "grant_type": "authorization_code", "code": code, "client_id": metadata["client_id"], "redirect_uri": metadata["redirect_uri"], }) assert response.status_code == 400 data = response.json() assert data["detail"]["error"] == "invalid_grant" token_app.dependency_overrides.clear() class TestTokenEndpointSecurity: """Security tests for token endpoint.""" def test_token_endpoint_requires_post(self, token_client): """Test token endpoint only accepts POST requests.""" response = token_client.get("/token") assert response.status_code == 405 # Method Not Allowed def test_token_endpoint_requires_form_data(self, token_client, setup_auth_code): """Test token endpoint requires form-encoded data.""" code, metadata, _ = setup_auth_code # Send JSON instead of form data response = token_client.post("/token", json={ "grant_type": "authorization_code", "code": code, "client_id": metadata["client_id"], "redirect_uri": metadata["redirect_uri"], }) # Should fail because it expects form data assert response.status_code == 422 # Unprocessable Entity def test_token_response_security_headers(self, token_client, setup_auth_code): """Test token response includes security headers.""" code, metadata, _ = setup_auth_code response = token_client.post("/token", data={ "grant_type": "authorization_code", "code": code, "client_id": metadata["client_id"], "redirect_uri": metadata["redirect_uri"], }) # Security headers should be present assert "X-Frame-Options" in response.headers assert "X-Content-Type-Options" in response.headers def test_error_response_format_matches_oauth2(self, token_client): """Test error responses match OAuth 2.0 format.""" response = token_client.post("/token", data={ "grant_type": "authorization_code", "code": "invalid_code", "client_id": "https://app.example.com", "redirect_uri": "https://app.example.com/callback", }) assert response.status_code == 400 data = response.json() # OAuth 2.0 error format assert "detail" in data assert "error" in data["detail"] class TestPKCEHandling: """Tests for PKCE code_verifier handling.""" def test_code_verifier_accepted(self, token_client, setup_auth_code): """Test code_verifier parameter is accepted.""" code, metadata, _ = setup_auth_code response = token_client.post("/token", data={ "grant_type": "authorization_code", "code": code, "client_id": metadata["client_id"], "redirect_uri": metadata["redirect_uri"], "code_verifier": "some_verifier_value", # PKCE verifier }) # Should succeed (PKCE validation deferred per design) assert response.status_code == 200 def test_token_exchange_works_without_verifier(self, token_client, setup_auth_code): """Test token exchange works without code_verifier in v1.0.0.""" code, metadata, _ = setup_auth_code response = token_client.post("/token", data={ "grant_type": "authorization_code", "code": code, "client_id": metadata["client_id"], "redirect_uri": metadata["redirect_uri"], # No code_verifier }) # Should succeed (PKCE not enforced in v1.0.0) assert response.status_code == 200