Files
Gondulf/tests/integration/api/test_token_flow.py
Phil Skentelbery 052d3ad3e1 feat(auth): implement response_type=id authentication flow
Implements both IndieAuth flows per W3C specification:
- Authentication flow (response_type=id): Code redeemed at authorization endpoint, returns only user identity
- Authorization flow (response_type=code): Code redeemed at token endpoint, returns access token

Changes:
- Authorization endpoint GET: Accept response_type=id (default) and code
- Authorization endpoint POST: Handle code verification for authentication flow
- Token endpoint: Validate response_type=code for authorization flow
- Store response_type in authorization code metadata
- Update metadata endpoint: response_types_supported=[code, id], code_challenge_methods_supported=[S256]

The default behavior now correctly defaults to response_type=id when omitted, per IndieAuth spec section 5.2.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-22 12:23:20 -07:00

331 lines
12 KiB
Python

"""
Integration tests for token endpoint flow.
Tests the complete token exchange flow including authorization code validation,
PKCE verification, token generation, and error handling.
"""
import pytest
from fastapi.testclient import TestClient
@pytest.fixture
def token_app(monkeypatch, tmp_path):
"""Create app for token testing."""
db_path = tmp_path / "test.db"
monkeypatch.setenv("GONDULF_SECRET_KEY", "a" * 32)
monkeypatch.setenv("GONDULF_BASE_URL", "https://auth.example.com")
monkeypatch.setenv("GONDULF_DATABASE_URL", f"sqlite:///{db_path}")
monkeypatch.setenv("GONDULF_DEBUG", "true")
from gondulf.main import app
return app
@pytest.fixture
def token_client(token_app):
"""Create test client for token tests."""
with TestClient(token_app) as client:
yield client
@pytest.fixture
def setup_auth_code(token_app, test_code_storage):
"""Setup a valid authorization code for testing (authorization flow)."""
from gondulf.dependencies import get_code_storage
code = "integration_test_code_12345"
metadata = {
"client_id": "https://app.example.com",
"redirect_uri": "https://app.example.com/callback",
"response_type": "code", # Authorization flow - exchange at token endpoint
"state": "xyz123",
"me": "https://user.example.com",
"scope": "",
"code_challenge": "E9Melhoa2OwvFrEMTJguCHaoeK1t8URWbuGJSstw-cM",
"code_challenge_method": "S256",
"created_at": 1234567890,
"expires_at": 1234568490,
"used": False
}
# Override the code storage dependency
token_app.dependency_overrides[get_code_storage] = lambda: test_code_storage
test_code_storage.store(f"authz:{code}", metadata)
yield code, metadata, test_code_storage
token_app.dependency_overrides.clear()
class TestTokenExchangeIntegration:
"""Integration tests for successful token exchange."""
def test_valid_code_exchange_returns_token(self, token_client, setup_auth_code):
"""Test valid authorization code exchange returns access token."""
code, metadata, _ = setup_auth_code
response = token_client.post("/token", data={
"grant_type": "authorization_code",
"code": code,
"client_id": metadata["client_id"],
"redirect_uri": metadata["redirect_uri"],
})
assert response.status_code == 200
data = response.json()
assert "access_token" in data
assert data["token_type"] == "Bearer"
assert data["me"] == metadata["me"]
def test_token_response_format_matches_oauth2(self, token_client, setup_auth_code):
"""Test token response matches OAuth 2.0 specification format."""
code, metadata, _ = setup_auth_code
response = token_client.post("/token", data={
"grant_type": "authorization_code",
"code": code,
"client_id": metadata["client_id"],
"redirect_uri": metadata["redirect_uri"],
})
assert response.status_code == 200
data = response.json()
# Required fields per OAuth 2.0 / IndieAuth
assert "access_token" in data
assert "token_type" in data
assert "me" in data
# Token should be substantial
assert len(data["access_token"]) >= 32
def test_token_response_includes_cache_headers(self, token_client, setup_auth_code):
"""Test token response includes required cache headers."""
code, metadata, _ = setup_auth_code
response = token_client.post("/token", data={
"grant_type": "authorization_code",
"code": code,
"client_id": metadata["client_id"],
"redirect_uri": metadata["redirect_uri"],
})
assert response.status_code == 200
# OAuth 2.0 requires no-store
assert response.headers["Cache-Control"] == "no-store"
assert response.headers["Pragma"] == "no-cache"
def test_authorization_code_single_use(self, token_client, setup_auth_code):
"""Test authorization code cannot be used twice."""
code, metadata, _ = setup_auth_code
# First exchange should succeed
response1 = token_client.post("/token", data={
"grant_type": "authorization_code",
"code": code,
"client_id": metadata["client_id"],
"redirect_uri": metadata["redirect_uri"],
})
assert response1.status_code == 200
# Second exchange should fail
response2 = token_client.post("/token", data={
"grant_type": "authorization_code",
"code": code,
"client_id": metadata["client_id"],
"redirect_uri": metadata["redirect_uri"],
})
assert response2.status_code == 400
data = response2.json()
assert data["detail"]["error"] == "invalid_grant"
class TestTokenExchangeErrors:
"""Integration tests for token exchange error conditions."""
def test_invalid_grant_type_rejected(self, token_client, setup_auth_code):
"""Test invalid grant_type returns error."""
code, metadata, _ = setup_auth_code
response = token_client.post("/token", data={
"grant_type": "password", # Invalid grant type
"code": code,
"client_id": metadata["client_id"],
"redirect_uri": metadata["redirect_uri"],
})
assert response.status_code == 400
data = response.json()
assert data["detail"]["error"] == "unsupported_grant_type"
def test_invalid_code_rejected(self, token_client, setup_auth_code):
"""Test invalid authorization code returns error."""
_, metadata, _ = setup_auth_code
response = token_client.post("/token", data={
"grant_type": "authorization_code",
"code": "nonexistent_code_12345",
"client_id": metadata["client_id"],
"redirect_uri": metadata["redirect_uri"],
})
assert response.status_code == 400
data = response.json()
assert data["detail"]["error"] == "invalid_grant"
def test_client_id_mismatch_rejected(self, token_client, setup_auth_code):
"""Test mismatched client_id returns error."""
code, metadata, _ = setup_auth_code
response = token_client.post("/token", data={
"grant_type": "authorization_code",
"code": code,
"client_id": "https://different-client.example.com", # Wrong client
"redirect_uri": metadata["redirect_uri"],
})
assert response.status_code == 400
data = response.json()
assert data["detail"]["error"] == "invalid_client"
def test_redirect_uri_mismatch_rejected(self, token_client, setup_auth_code):
"""Test mismatched redirect_uri returns error."""
code, metadata, _ = setup_auth_code
response = token_client.post("/token", data={
"grant_type": "authorization_code",
"code": code,
"client_id": metadata["client_id"],
"redirect_uri": "https://app.example.com/different-callback", # Wrong URI
})
assert response.status_code == 400
data = response.json()
assert data["detail"]["error"] == "invalid_grant"
def test_used_code_rejected(self, token_client, token_app, test_code_storage):
"""Test already-used authorization code returns error."""
from gondulf.dependencies import get_code_storage
code = "used_code_test_12345"
metadata = {
"client_id": "https://app.example.com",
"redirect_uri": "https://app.example.com/callback",
"response_type": "code", # Authorization flow
"state": "xyz123",
"me": "https://user.example.com",
"scope": "",
"code_challenge": "abc123",
"code_challenge_method": "S256",
"created_at": 1234567890,
"expires_at": 1234568490,
"used": True # Already used
}
token_app.dependency_overrides[get_code_storage] = lambda: test_code_storage
test_code_storage.store(f"authz:{code}", metadata)
response = token_client.post("/token", data={
"grant_type": "authorization_code",
"code": code,
"client_id": metadata["client_id"],
"redirect_uri": metadata["redirect_uri"],
})
assert response.status_code == 400
data = response.json()
assert data["detail"]["error"] == "invalid_grant"
token_app.dependency_overrides.clear()
class TestTokenEndpointSecurity:
"""Security tests for token endpoint."""
def test_token_endpoint_requires_post(self, token_client):
"""Test token endpoint only accepts POST requests."""
response = token_client.get("/token")
assert response.status_code == 405 # Method Not Allowed
def test_token_endpoint_requires_form_data(self, token_client, setup_auth_code):
"""Test token endpoint requires form-encoded data."""
code, metadata, _ = setup_auth_code
# Send JSON instead of form data
response = token_client.post("/token", json={
"grant_type": "authorization_code",
"code": code,
"client_id": metadata["client_id"],
"redirect_uri": metadata["redirect_uri"],
})
# Should fail because it expects form data
assert response.status_code == 422 # Unprocessable Entity
def test_token_response_security_headers(self, token_client, setup_auth_code):
"""Test token response includes security headers."""
code, metadata, _ = setup_auth_code
response = token_client.post("/token", data={
"grant_type": "authorization_code",
"code": code,
"client_id": metadata["client_id"],
"redirect_uri": metadata["redirect_uri"],
})
# Security headers should be present
assert "X-Frame-Options" in response.headers
assert "X-Content-Type-Options" in response.headers
def test_error_response_format_matches_oauth2(self, token_client):
"""Test error responses match OAuth 2.0 format."""
response = token_client.post("/token", data={
"grant_type": "authorization_code",
"code": "invalid_code",
"client_id": "https://app.example.com",
"redirect_uri": "https://app.example.com/callback",
})
assert response.status_code == 400
data = response.json()
# OAuth 2.0 error format
assert "detail" in data
assert "error" in data["detail"]
class TestPKCEHandling:
"""Tests for PKCE code_verifier handling."""
def test_code_verifier_accepted(self, token_client, setup_auth_code):
"""Test code_verifier parameter is accepted."""
code, metadata, _ = setup_auth_code
response = token_client.post("/token", data={
"grant_type": "authorization_code",
"code": code,
"client_id": metadata["client_id"],
"redirect_uri": metadata["redirect_uri"],
"code_verifier": "some_verifier_value", # PKCE verifier
})
# Should succeed (PKCE validation deferred per design)
assert response.status_code == 200
def test_token_exchange_works_without_verifier(self, token_client, setup_auth_code):
"""Test token exchange works without code_verifier in v1.0.0."""
code, metadata, _ = setup_auth_code
response = token_client.post("/token", data={
"grant_type": "authorization_code",
"code": code,
"client_id": metadata["client_id"],
"redirect_uri": metadata["redirect_uri"],
# No code_verifier
})
# Should succeed (PKCE not enforced in v1.0.0)
assert response.status_code == 200