Files
Gondulf/tests/unit/test_token_endpoint.py
Phil Skentelbery 6bb2a4033f feat(token): implement GET /token for token verification
Implements W3C IndieAuth Section 6.3 token verification endpoint.
The token endpoint now supports both:
- POST: Issue new tokens (authorization code exchange)
- GET: Verify existing tokens (resource server validation)

Changes:
- Added GET handler to /token endpoint
- Extracts Bearer token from Authorization header (RFC 6750)
- Returns JSON with me, client_id, scope
- Returns 401 with WWW-Authenticate for invalid tokens
- 11 new tests covering all verification scenarios

All 533 tests passing. Resolves critical P0 blocker for v1.0.0.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-25 08:10:47 -07:00

551 lines
18 KiB
Python

"""
Unit tests for Token Endpoint.
Tests token exchange (POST /token) and token verification (GET /token), including validation, error handling, and security.
"""
import os
import pytest
from fastapi.testclient import TestClient
from gondulf.database.connection import Database
from gondulf.services.token_service import TokenService
from gondulf.storage import CodeStore
@pytest.fixture(scope="function")
def test_config(monkeypatch):
    """Provide a loaded, validated ``Config`` built from test environment variables.

    The ``GONDULF_*`` variables are applied through ``monkeypatch`` before the
    configuration module is imported, loaded and validated.
    """
    required_env = (
        ("GONDULF_SECRET_KEY", "test_secret_key_" + "x" * 32),
        ("GONDULF_BASE_URL", "https://auth.example.com"),
        ("GONDULF_DATABASE_URL", "sqlite:///:memory:"),
    )
    for name, value in required_env:
        monkeypatch.setenv(name, value)

    # Deferred import: the config module is only imported once the
    # environment above is in place.
    from gondulf.config import Config

    Config.load()
    Config.validate()
    return Config
@pytest.fixture
def test_database(tmp_path):
    """Build a migrated ``Database`` backed by a SQLite file under ``tmp_path``."""
    sqlite_file = tmp_path / "test.db"
    database = Database(f"sqlite:///{sqlite_file}")
    # Prepare the on-disk location, then bring the schema up to date.
    database.ensure_database_directory()
    database.run_migrations()
    return database
@pytest.fixture
def test_code_storage():
    """Provide a fresh ``CodeStore`` constructed with ``ttl_seconds=600``."""
    store = CodeStore(ttl_seconds=600)
    return store
@pytest.fixture
def test_token_service(test_database):
    """Provide a ``TokenService`` bound to the per-test database."""
    token_settings = {"token_length": 32, "token_ttl": 3600}
    return TokenService(database=test_database, **token_settings)
@pytest.fixture
def client(test_config, test_database, test_code_storage, test_token_service):
    """Yield a ``TestClient`` whose app dependencies resolve to the test fixtures.

    The database, code storage and token service dependencies are overridden
    for the duration of the test; all overrides are cleared afterwards.
    """
    # The app is imported here, after ``test_config`` has set up configuration.
    from gondulf.dependencies import get_code_storage, get_database, get_token_service
    from gondulf.main import app

    app.dependency_overrides.update(
        {
            get_database: lambda: test_database,
            get_code_storage: lambda: test_code_storage,
            get_token_service: lambda: test_token_service,
        }
    )
    http_client = TestClient(app)
    yield http_client
    app.dependency_overrides.clear()
@pytest.fixture
def valid_auth_code(test_code_storage):
    """Seed the code store with an unused authorization code (authorization flow).

    Returns:
        A ``(code, metadata)`` tuple. The metadata is stored in
        ``test_code_storage`` under the key ``authz:<code>``.
    """
    code = "test_auth_code_12345"
    # NOTE(review): created_at/expires_at are fixed historical epoch values;
    # presumably expiry is governed by the CodeStore TTL rather than these
    # fields -- confirm against the token endpoint.
    metadata = dict(
        client_id="https://client.example.com",
        redirect_uri="https://client.example.com/callback",
        response_type="code",  # Authorization flow - exchange at token endpoint
        state="xyz123",
        me="https://user.example.com",
        scope="",
        code_challenge="abc123",
        code_challenge_method="S256",
        created_at=1234567890,
        expires_at=1234568490,
        used=False,
    )
    storage_key = "authz:" + code
    test_code_storage.store(storage_key, metadata)
    return code, metadata
class TestTokenExchangeSuccess:
    """Tests for successful token exchange (POST /token)."""

    @staticmethod
    def _exchange(client, code, metadata):
        """POST a well-formed authorization_code grant for ``code``.

        Args:
            client: The test client used to issue the request.
            code: Authorization code to redeem.
            metadata: Stored code metadata supplying ``client_id`` and
                ``redirect_uri``.

        Returns:
            The response returned by ``client.post``.
        """
        return client.post(
            "/token",
            data={
                "grant_type": "authorization_code",
                "code": code,
                "client_id": metadata["client_id"],
                "redirect_uri": metadata["redirect_uri"],
            },
        )

    def test_token_exchange_success(self, client, valid_auth_code):
        """Test successful token exchange returns access token."""
        code, metadata = valid_auth_code

        response = self._exchange(client, code, metadata)

        assert response.status_code == 200
        body = response.json()
        assert "access_token" in body
        assert body["token_type"] == "Bearer"
        assert body["me"] == metadata["me"]
        assert body["scope"] == metadata["scope"]

    def test_token_exchange_response_format(self, client, valid_auth_code):
        """Test token response carries the expected fields and token shape."""
        code, metadata = valid_auth_code

        response = self._exchange(client, code, metadata)

        assert response.status_code == 200
        body = response.json()
        # access_token and token_type are the OAuth 2.0 token response fields;
        # "me" is the additional profile URL field used by IndieAuth.
        for field in ("access_token", "token_type", "me"):
            assert field in body
        assert isinstance(body["access_token"], str)
        # base64url encoded; presumably 32 random bytes (token_length=32)
        # without padding, which yields 43 characters.
        assert len(body["access_token"]) == 43

    def test_token_exchange_cache_headers(self, client, valid_auth_code):
        """Test OAuth 2.0 cache headers are set on a successful exchange."""
        code, metadata = valid_auth_code

        response = self._exchange(client, code, metadata)

        # Make sure the headers are checked on the success path, not on an
        # error response that happens to carry the same headers.
        assert response.status_code == 200
        assert response.headers["Cache-Control"] == "no-store"
        assert response.headers["Pragma"] == "no-cache"

    def test_token_exchange_deletes_code(self, client, valid_auth_code, test_code_storage):
        """Test authorization code is deleted after a successful exchange."""
        code, metadata = valid_auth_code

        response = self._exchange(client, code, metadata)

        # Without this check the test would also pass if the exchange failed
        # and the code was removed on an error path.
        assert response.status_code == 200
        # Code should be deleted
        assert test_code_storage.get(f"authz:{code}") is None
class TestTokenExchangeErrors:
    """Tests for error conditions on POST /token."""

    def test_invalid_grant_type(self, client, valid_auth_code):
        """Test unsupported grant_type returns error."""
        code, metadata = valid_auth_code
        form = {
            "grant_type": "password",  # Wrong grant type
            "code": code,
            "client_id": metadata["client_id"],
            "redirect_uri": metadata["redirect_uri"],
        }

        response = client.post("/token", data=form)

        assert response.status_code == 400
        error_body = response.json()
        assert error_body["detail"]["error"] == "unsupported_grant_type"

    def test_code_not_found(self, client):
        """Test invalid authorization code returns error."""
        form = {
            "grant_type": "authorization_code",
            "code": "invalid_code_123",
            "client_id": "https://client.example.com",
            "redirect_uri": "https://client.example.com/callback",
        }

        response = client.post("/token", data=form)

        assert response.status_code == 400
        error_body = response.json()
        assert error_body["detail"]["error"] == "invalid_grant"

    def test_client_id_mismatch(self, client, valid_auth_code):
        """Test client_id mismatch returns error."""
        code, metadata = valid_auth_code
        form = {
            "grant_type": "authorization_code",
            "code": code,
            "client_id": "https://wrong-client.example.com",  # Wrong client
            "redirect_uri": metadata["redirect_uri"],
        }

        response = client.post("/token", data=form)

        assert response.status_code == 400
        error_body = response.json()
        assert error_body["detail"]["error"] == "invalid_client"

    def test_redirect_uri_mismatch(self, client, valid_auth_code):
        """Test redirect_uri mismatch returns error."""
        code, metadata = valid_auth_code
        form = {
            "grant_type": "authorization_code",
            "code": code,
            "client_id": metadata["client_id"],
            "redirect_uri": "https://wrong-uri.example.com/callback",  # Wrong URI
        }

        response = client.post("/token", data=form)

        assert response.status_code == 400
        error_body = response.json()
        assert error_body["detail"]["error"] == "invalid_grant"

    def test_code_replay_prevention(self, client, valid_auth_code, test_code_storage):
        """Test authorization code cannot be used twice."""
        code, metadata = valid_auth_code
        # Mark code as used and write it back under the same storage key.
        metadata["used"] = True
        test_code_storage.store(f"authz:{code}", metadata)
        form = {
            "grant_type": "authorization_code",
            "code": code,
            "client_id": metadata["client_id"],
            "redirect_uri": metadata["redirect_uri"],
        }

        response = client.post("/token", data=form)

        assert response.status_code == 400
        error_body = response.json()
        assert error_body["detail"]["error"] == "invalid_grant"
class TestPKCEHandling:
    """Tests for PKCE parameter handling."""

    def test_code_verifier_accepted_but_not_validated(self, client, valid_auth_code):
        """Test code_verifier is accepted but not validated in v1.0.0."""
        code, metadata = valid_auth_code
        form = {
            "grant_type": "authorization_code",
            "code": code,
            "client_id": metadata["client_id"],
            "redirect_uri": metadata["redirect_uri"],
            "code_verifier": "some_verifier_string",
        }

        response = client.post("/token", data=form)

        # Should still succeed (PKCE not validated in v1.0.0)
        assert response.status_code == 200
class TestSecurityValidation:
    """Tests for security validations."""

    def test_token_generated_via_service(self, client, valid_auth_code, test_token_service):
        """Test token is generated through token service."""
        code, metadata = valid_auth_code
        form = {
            "grant_type": "authorization_code",
            "code": code,
            "client_id": metadata["client_id"],
            "redirect_uri": metadata["redirect_uri"],
        }

        response = client.post("/token", data=form)

        assert response.status_code == 200
        issued = response.json()
        # The issued token must be known to the token service, which shows it
        # was actually stored.
        stored = test_token_service.validate_token(issued["access_token"])
        assert stored is not None
        assert stored["me"] == metadata["me"]
class TestTokenVerification:
    """Tests for GET /token token verification endpoint."""

    def test_verify_valid_token_success(self, client, test_token_service):
        """Test successful token verification with valid token."""
        # Generate a token
        token = test_token_service.generate_token(
            me="https://user.example.com",
            client_id="https://client.example.com",
            scope="",
        )

        # Verify the token
        response = client.get(
            "/token",
            headers={"Authorization": f"Bearer {token}"},
        )

        assert response.status_code == 200
        data = response.json()
        assert data["me"] == "https://user.example.com"
        assert data["client_id"] == "https://client.example.com"
        assert data["scope"] == ""

    def test_verify_token_with_scope(self, client, test_token_service):
        """Test token verification includes scope."""
        # Generate a token with scope
        token = test_token_service.generate_token(
            me="https://user.example.com",
            client_id="https://client.example.com",
            scope="create update",
        )

        # Verify the token
        response = client.get(
            "/token",
            headers={"Authorization": f"Bearer {token}"},
        )

        assert response.status_code == 200
        data = response.json()
        assert data["scope"] == "create update"

    def test_verify_invalid_token(self, client):
        """Test verification of invalid token returns 401."""
        response = client.get(
            "/token",
            headers={"Authorization": "Bearer invalid_token_xyz123"},
        )

        assert response.status_code == 401
        data = response.json()
        assert data["detail"]["error"] == "invalid_token"
        # Presence is checked first so a missing header fails with a clear
        # assertion instead of a KeyError.
        assert "WWW-Authenticate" in response.headers
        assert response.headers["WWW-Authenticate"] == "Bearer"

    def test_verify_missing_authorization_header(self, client):
        """Test verification without Authorization header returns 401."""
        response = client.get("/token")

        assert response.status_code == 401
        data = response.json()
        assert data["detail"]["error"] == "invalid_token"
        assert "WWW-Authenticate" in response.headers

    def test_verify_invalid_auth_scheme(self, client):
        """Test verification with non-Bearer auth scheme returns 401."""
        response = client.get(
            "/token",
            headers={"Authorization": "Basic dXNlcjpwYXNz"},
        )

        assert response.status_code == 401
        data = response.json()
        assert data["detail"]["error"] == "invalid_token"

    def test_verify_empty_token(self, client):
        """Test verification with empty token returns 401."""
        response = client.get(
            "/token",
            headers={"Authorization": "Bearer "},
        )

        assert response.status_code == 401
        data = response.json()
        assert data["detail"]["error"] == "invalid_token"

    def test_verify_case_insensitive_bearer(self, client, test_token_service):
        """Test Bearer scheme is case-insensitive per RFC 6750."""
        # Generate a token
        token = test_token_service.generate_token(
            me="https://user.example.com",
            client_id="https://client.example.com",
            scope="",
        )

        # Test with lowercase "bearer"
        response = client.get(
            "/token",
            headers={"Authorization": f"bearer {token}"},
        )

        assert response.status_code == 200
        data = response.json()
        assert data["me"] == "https://user.example.com"

    def test_verify_expired_token(self, client, test_database):
        """Test verification of expired token returns 401.

        A second ``TokenService`` with ``token_ttl=0`` issues the token and is
        temporarily installed as the app's token service. Only that single
        override is undone afterwards; the other overrides installed by the
        ``client`` fixture are left for the fixture to tear down.
        """
        # Import app to override dependency temporarily
        from gondulf.dependencies import get_token_service
        from gondulf.main import app

        # Create token service with very short TTL
        short_ttl_service = TokenService(
            database=test_database,
            token_length=32,
            token_ttl=0,  # Expires immediately
        )

        # Generate token (will be expired)
        token = short_ttl_service.generate_token(
            me="https://user.example.com",
            client_id="https://client.example.com",
            scope="",
        )

        overrides = app.dependency_overrides
        # Sentinel distinguishes "no previous override" from any stored value.
        missing = object()
        previous = overrides.get(get_token_service, missing)

        # Override with short TTL service
        overrides[get_token_service] = lambda: short_ttl_service
        try:
            # Verify the token (should be expired)
            response = client.get(
                "/token",
                headers={"Authorization": f"Bearer {token}"},
            )

            assert response.status_code == 401
            data = response.json()
            assert data["detail"]["error"] == "invalid_token"
        finally:
            # Restore exactly what was there before instead of clearing every
            # override, so the database and code storage overrides stay in
            # place until the ``client`` fixture removes them.
            if previous is missing:
                overrides.pop(get_token_service, None)
            else:
                overrides[get_token_service] = previous
class TestTokenVerificationIntegration:
    """Integration tests for full token lifecycle."""

    def test_full_token_lifecycle(self, client, valid_auth_code, test_token_service):
        """Test complete flow: exchange code, verify token."""
        code, metadata = valid_auth_code

        # Step 1: Exchange authorization code for token
        form = {
            "grant_type": "authorization_code",
            "code": code,
            "client_id": metadata["client_id"],
            "redirect_uri": metadata["redirect_uri"],
        }
        exchange_response = client.post("/token", data=form)
        assert exchange_response.status_code == 200
        access_token = exchange_response.json()["access_token"]

        # Step 2: Verify the token
        auth_headers = {"Authorization": f"Bearer {access_token}"}
        verify_response = client.get("/token", headers=auth_headers)
        assert verify_response.status_code == 200
        verified = verify_response.json()
        assert verified["me"] == metadata["me"]
        assert verified["client_id"] == metadata["client_id"]
        assert verified["scope"] == metadata["scope"]

    def test_verify_revoked_token(self, client, test_token_service):
        """Test verification of revoked token returns 401."""
        # Generate a token
        token = test_token_service.generate_token(
            me="https://user.example.com",
            client_id="https://client.example.com",
            scope="",
        )

        # Revoke the token
        was_revoked = test_token_service.revoke_token(token)
        assert was_revoked is True

        # Try to verify revoked token
        auth_headers = {"Authorization": f"Bearer {token}"}
        response = client.get("/token", headers=auth_headers)
        assert response.status_code == 401
        error_body = response.json()
        assert error_body["detail"]["error"] == "invalid_token"

    def test_verify_cross_client_token(self, client, test_token_service):
        """Test token verification returns correct client_id."""
        # Generate tokens for two different clients before verifying either.
        token_a = test_token_service.generate_token(
            me="https://user.example.com",
            client_id="https://client-a.example.com",
            scope="",
        )
        token_b = test_token_service.generate_token(
            me="https://user.example.com",
            client_id="https://client-b.example.com",
            scope="",
        )

        # Each token must verify successfully and report its own client.
        expectations = (
            (token_a, "https://client-a.example.com"),
            (token_b, "https://client-b.example.com"),
        )
        for token, expected_client_id in expectations:
            response = client.get(
                "/token",
                headers={"Authorization": f"Bearer {token}"},
            )
            assert response.status_code == 200
            assert response.json()["client_id"] == expected_client_id