Implements W3C IndieAuth Section 6.3 token verification endpoint. The token endpoint now supports both: - POST: Issue new tokens (authorization code exchange) - GET: Verify existing tokens (resource server validation) Changes: - Added GET handler to /token endpoint - Extracts Bearer token from Authorization header (RFC 6750) - Returns JSON with me, client_id, scope - Returns 401 with WWW-Authenticate for invalid tokens - 11 new tests covering all verification scenarios All 533 tests passing. Resolves critical P0 blocker for v1.0.0. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
551 lines
18 KiB
Python
551 lines
18 KiB
Python
"""
|
|
Unit tests for Token Endpoint.
|
|
|
|
Tests token exchange endpoint including validation, error handling, and security.
|
|
"""
|
|
import os
|
|
import pytest
|
|
from fastapi.testclient import TestClient
|
|
|
|
from gondulf.database.connection import Database
|
|
from gondulf.services.token_service import TokenService
|
|
from gondulf.storage import CodeStore
|
|
|
|
|
|
@pytest.fixture(scope="function")
|
|
def test_config(monkeypatch):
|
|
"""Configure test environment."""
|
|
# Set required environment variables
|
|
monkeypatch.setenv("GONDULF_SECRET_KEY", "test_secret_key_" + "x" * 32)
|
|
monkeypatch.setenv("GONDULF_BASE_URL", "https://auth.example.com")
|
|
monkeypatch.setenv("GONDULF_DATABASE_URL", "sqlite:///:memory:")
|
|
|
|
# Import after environment is set
|
|
from gondulf.config import Config
|
|
Config.load()
|
|
Config.validate()
|
|
return Config
|
|
|
|
|
|
@pytest.fixture
|
|
def test_database(tmp_path):
|
|
"""Create test database."""
|
|
db_path = tmp_path / "test.db"
|
|
db = Database(f"sqlite:///{db_path}")
|
|
db.ensure_database_directory()
|
|
db.run_migrations()
|
|
return db
|
|
|
|
|
|
@pytest.fixture
|
|
def test_code_storage():
|
|
"""Create test code storage."""
|
|
return CodeStore(ttl_seconds=600)
|
|
|
|
|
|
@pytest.fixture
|
|
def test_token_service(test_database):
|
|
"""Create test token service."""
|
|
return TokenService(
|
|
database=test_database,
|
|
token_length=32,
|
|
token_ttl=3600
|
|
)
|
|
|
|
|
|
@pytest.fixture
|
|
def client(test_config, test_database, test_code_storage, test_token_service):
|
|
"""Create test client with dependency overrides."""
|
|
# Import app after config is set
|
|
from gondulf.dependencies import get_code_storage, get_database, get_token_service
|
|
from gondulf.main import app
|
|
|
|
app.dependency_overrides[get_database] = lambda: test_database
|
|
app.dependency_overrides[get_code_storage] = lambda: test_code_storage
|
|
app.dependency_overrides[get_token_service] = lambda: test_token_service
|
|
|
|
yield TestClient(app)
|
|
|
|
app.dependency_overrides.clear()
|
|
|
|
|
|
@pytest.fixture
|
|
def valid_auth_code(test_code_storage):
|
|
"""Create a valid authorization code (authorization flow)."""
|
|
code = "test_auth_code_12345"
|
|
metadata = {
|
|
"client_id": "https://client.example.com",
|
|
"redirect_uri": "https://client.example.com/callback",
|
|
"response_type": "code", # Authorization flow - exchange at token endpoint
|
|
"state": "xyz123",
|
|
"me": "https://user.example.com",
|
|
"scope": "",
|
|
"code_challenge": "abc123",
|
|
"code_challenge_method": "S256",
|
|
"created_at": 1234567890,
|
|
"expires_at": 1234568490,
|
|
"used": False
|
|
}
|
|
test_code_storage.store(f"authz:{code}", metadata)
|
|
return code, metadata
|
|
|
|
|
|
class TestTokenExchangeSuccess:
|
|
"""Tests for successful token exchange."""
|
|
|
|
def test_token_exchange_success(self, client, valid_auth_code):
|
|
"""Test successful token exchange returns access token."""
|
|
code, metadata = valid_auth_code
|
|
|
|
response = client.post(
|
|
"/token",
|
|
data={
|
|
"grant_type": "authorization_code",
|
|
"code": code,
|
|
"client_id": metadata["client_id"],
|
|
"redirect_uri": metadata["redirect_uri"]
|
|
}
|
|
)
|
|
|
|
assert response.status_code == 200
|
|
data = response.json()
|
|
assert "access_token" in data
|
|
assert data["token_type"] == "Bearer"
|
|
assert data["me"] == metadata["me"]
|
|
assert data["scope"] == metadata["scope"]
|
|
|
|
def test_token_exchange_response_format(self, client, valid_auth_code):
|
|
"""Test token response matches OAuth 2.0 format."""
|
|
code, metadata = valid_auth_code
|
|
|
|
response = client.post(
|
|
"/token",
|
|
data={
|
|
"grant_type": "authorization_code",
|
|
"code": code,
|
|
"client_id": metadata["client_id"],
|
|
"redirect_uri": metadata["redirect_uri"]
|
|
}
|
|
)
|
|
|
|
assert response.status_code == 200
|
|
data = response.json()
|
|
|
|
# Required fields per OAuth 2.0
|
|
assert "access_token" in data
|
|
assert "token_type" in data
|
|
assert "me" in data
|
|
assert isinstance(data["access_token"], str)
|
|
assert len(data["access_token"]) == 43 # base64url encoded
|
|
|
|
def test_token_exchange_cache_headers(self, client, valid_auth_code):
|
|
"""Test OAuth 2.0 cache headers are set."""
|
|
code, metadata = valid_auth_code
|
|
|
|
response = client.post(
|
|
"/token",
|
|
data={
|
|
"grant_type": "authorization_code",
|
|
"code": code,
|
|
"client_id": metadata["client_id"],
|
|
"redirect_uri": metadata["redirect_uri"]
|
|
}
|
|
)
|
|
|
|
assert response.headers["Cache-Control"] == "no-store"
|
|
assert response.headers["Pragma"] == "no-cache"
|
|
|
|
def test_token_exchange_deletes_code(self, client, valid_auth_code, test_code_storage):
|
|
"""Test authorization code is deleted after exchange."""
|
|
code, metadata = valid_auth_code
|
|
|
|
client.post(
|
|
"/token",
|
|
data={
|
|
"grant_type": "authorization_code",
|
|
"code": code,
|
|
"client_id": metadata["client_id"],
|
|
"redirect_uri": metadata["redirect_uri"]
|
|
}
|
|
)
|
|
|
|
# Code should be deleted
|
|
assert test_code_storage.get(f"authz:{code}") is None
|
|
|
|
|
|
class TestTokenExchangeErrors:
|
|
"""Tests for error conditions."""
|
|
|
|
def test_invalid_grant_type(self, client, valid_auth_code):
|
|
"""Test unsupported grant_type returns error."""
|
|
code, metadata = valid_auth_code
|
|
|
|
response = client.post(
|
|
"/token",
|
|
data={
|
|
"grant_type": "password", # Wrong grant type
|
|
"code": code,
|
|
"client_id": metadata["client_id"],
|
|
"redirect_uri": metadata["redirect_uri"]
|
|
}
|
|
)
|
|
|
|
assert response.status_code == 400
|
|
data = response.json()
|
|
assert data["detail"]["error"] == "unsupported_grant_type"
|
|
|
|
def test_code_not_found(self, client):
|
|
"""Test invalid authorization code returns error."""
|
|
response = client.post(
|
|
"/token",
|
|
data={
|
|
"grant_type": "authorization_code",
|
|
"code": "invalid_code_123",
|
|
"client_id": "https://client.example.com",
|
|
"redirect_uri": "https://client.example.com/callback"
|
|
}
|
|
)
|
|
|
|
assert response.status_code == 400
|
|
data = response.json()
|
|
assert data["detail"]["error"] == "invalid_grant"
|
|
|
|
def test_client_id_mismatch(self, client, valid_auth_code):
|
|
"""Test client_id mismatch returns error."""
|
|
code, metadata = valid_auth_code
|
|
|
|
response = client.post(
|
|
"/token",
|
|
data={
|
|
"grant_type": "authorization_code",
|
|
"code": code,
|
|
"client_id": "https://wrong-client.example.com", # Wrong client
|
|
"redirect_uri": metadata["redirect_uri"]
|
|
}
|
|
)
|
|
|
|
assert response.status_code == 400
|
|
data = response.json()
|
|
assert data["detail"]["error"] == "invalid_client"
|
|
|
|
def test_redirect_uri_mismatch(self, client, valid_auth_code):
|
|
"""Test redirect_uri mismatch returns error."""
|
|
code, metadata = valid_auth_code
|
|
|
|
response = client.post(
|
|
"/token",
|
|
data={
|
|
"grant_type": "authorization_code",
|
|
"code": code,
|
|
"client_id": metadata["client_id"],
|
|
"redirect_uri": "https://wrong-uri.example.com/callback" # Wrong URI
|
|
}
|
|
)
|
|
|
|
assert response.status_code == 400
|
|
data = response.json()
|
|
assert data["detail"]["error"] == "invalid_grant"
|
|
|
|
def test_code_replay_prevention(self, client, valid_auth_code, test_code_storage):
|
|
"""Test authorization code cannot be used twice."""
|
|
code, metadata = valid_auth_code
|
|
|
|
# Mark code as used
|
|
metadata["used"] = True
|
|
test_code_storage.store(f"authz:{code}", metadata)
|
|
|
|
response = client.post(
|
|
"/token",
|
|
data={
|
|
"grant_type": "authorization_code",
|
|
"code": code,
|
|
"client_id": metadata["client_id"],
|
|
"redirect_uri": metadata["redirect_uri"]
|
|
}
|
|
)
|
|
|
|
assert response.status_code == 400
|
|
data = response.json()
|
|
assert data["detail"]["error"] == "invalid_grant"
|
|
|
|
|
|
class TestPKCEHandling:
|
|
"""Tests for PKCE parameter handling."""
|
|
|
|
def test_code_verifier_accepted_but_not_validated(self, client, valid_auth_code):
|
|
"""Test code_verifier is accepted but not validated in v1.0.0."""
|
|
code, metadata = valid_auth_code
|
|
|
|
response = client.post(
|
|
"/token",
|
|
data={
|
|
"grant_type": "authorization_code",
|
|
"code": code,
|
|
"client_id": metadata["client_id"],
|
|
"redirect_uri": metadata["redirect_uri"],
|
|
"code_verifier": "some_verifier_string"
|
|
}
|
|
)
|
|
|
|
# Should still succeed (PKCE not validated in v1.0.0)
|
|
assert response.status_code == 200
|
|
|
|
|
|
class TestSecurityValidation:
|
|
"""Tests for security validations."""
|
|
|
|
def test_token_generated_via_service(self, client, valid_auth_code, test_token_service):
|
|
"""Test token is generated through token service."""
|
|
code, metadata = valid_auth_code
|
|
|
|
response = client.post(
|
|
"/token",
|
|
data={
|
|
"grant_type": "authorization_code",
|
|
"code": code,
|
|
"client_id": metadata["client_id"],
|
|
"redirect_uri": metadata["redirect_uri"]
|
|
}
|
|
)
|
|
|
|
assert response.status_code == 200
|
|
data = response.json()
|
|
|
|
# Validate token was actually stored
|
|
token_metadata = test_token_service.validate_token(data["access_token"])
|
|
assert token_metadata is not None
|
|
assert token_metadata["me"] == metadata["me"]
|
|
|
|
|
|
class TestTokenVerification:
|
|
"""Tests for GET /token token verification endpoint."""
|
|
|
|
def test_verify_valid_token_success(self, client, test_token_service):
|
|
"""Test successful token verification with valid token."""
|
|
# Generate a token
|
|
token = test_token_service.generate_token(
|
|
me="https://user.example.com",
|
|
client_id="https://client.example.com",
|
|
scope=""
|
|
)
|
|
|
|
# Verify the token
|
|
response = client.get(
|
|
"/token",
|
|
headers={"Authorization": f"Bearer {token}"}
|
|
)
|
|
|
|
assert response.status_code == 200
|
|
data = response.json()
|
|
assert data["me"] == "https://user.example.com"
|
|
assert data["client_id"] == "https://client.example.com"
|
|
assert data["scope"] == ""
|
|
|
|
def test_verify_token_with_scope(self, client, test_token_service):
|
|
"""Test token verification includes scope."""
|
|
# Generate a token with scope
|
|
token = test_token_service.generate_token(
|
|
me="https://user.example.com",
|
|
client_id="https://client.example.com",
|
|
scope="create update"
|
|
)
|
|
|
|
# Verify the token
|
|
response = client.get(
|
|
"/token",
|
|
headers={"Authorization": f"Bearer {token}"}
|
|
)
|
|
|
|
assert response.status_code == 200
|
|
data = response.json()
|
|
assert data["scope"] == "create update"
|
|
|
|
def test_verify_invalid_token(self, client):
|
|
"""Test verification of invalid token returns 401."""
|
|
response = client.get(
|
|
"/token",
|
|
headers={"Authorization": "Bearer invalid_token_xyz123"}
|
|
)
|
|
|
|
assert response.status_code == 401
|
|
data = response.json()
|
|
assert data["detail"]["error"] == "invalid_token"
|
|
assert "WWW-Authenticate" in response.headers
|
|
assert response.headers["WWW-Authenticate"] == "Bearer"
|
|
|
|
def test_verify_missing_authorization_header(self, client):
|
|
"""Test verification without Authorization header returns 401."""
|
|
response = client.get("/token")
|
|
|
|
assert response.status_code == 401
|
|
data = response.json()
|
|
assert data["detail"]["error"] == "invalid_token"
|
|
assert "WWW-Authenticate" in response.headers
|
|
|
|
def test_verify_invalid_auth_scheme(self, client):
|
|
"""Test verification with non-Bearer auth scheme returns 401."""
|
|
response = client.get(
|
|
"/token",
|
|
headers={"Authorization": "Basic dXNlcjpwYXNz"}
|
|
)
|
|
|
|
assert response.status_code == 401
|
|
data = response.json()
|
|
assert data["detail"]["error"] == "invalid_token"
|
|
|
|
def test_verify_empty_token(self, client):
|
|
"""Test verification with empty token returns 401."""
|
|
response = client.get(
|
|
"/token",
|
|
headers={"Authorization": "Bearer "}
|
|
)
|
|
|
|
assert response.status_code == 401
|
|
data = response.json()
|
|
assert data["detail"]["error"] == "invalid_token"
|
|
|
|
def test_verify_case_insensitive_bearer(self, client, test_token_service):
|
|
"""Test Bearer scheme is case-insensitive per RFC 6750."""
|
|
# Generate a token
|
|
token = test_token_service.generate_token(
|
|
me="https://user.example.com",
|
|
client_id="https://client.example.com",
|
|
scope=""
|
|
)
|
|
|
|
# Test with lowercase "bearer"
|
|
response = client.get(
|
|
"/token",
|
|
headers={"Authorization": f"bearer {token}"}
|
|
)
|
|
|
|
assert response.status_code == 200
|
|
data = response.json()
|
|
assert data["me"] == "https://user.example.com"
|
|
|
|
def test_verify_expired_token(self, client, test_database):
|
|
"""Test verification of expired token returns 401."""
|
|
# Create token service with very short TTL
|
|
short_ttl_service = TokenService(
|
|
database=test_database,
|
|
token_length=32,
|
|
token_ttl=0 # Expires immediately
|
|
)
|
|
|
|
# Generate token (will be expired)
|
|
token = short_ttl_service.generate_token(
|
|
me="https://user.example.com",
|
|
client_id="https://client.example.com",
|
|
scope=""
|
|
)
|
|
|
|
# Import app to override dependency temporarily
|
|
from gondulf.dependencies import get_token_service
|
|
from gondulf.main import app
|
|
|
|
# Override with short TTL service
|
|
app.dependency_overrides[get_token_service] = lambda: short_ttl_service
|
|
|
|
try:
|
|
# Verify the token (should be expired)
|
|
response = client.get(
|
|
"/token",
|
|
headers={"Authorization": f"Bearer {token}"}
|
|
)
|
|
|
|
assert response.status_code == 401
|
|
data = response.json()
|
|
assert data["detail"]["error"] == "invalid_token"
|
|
finally:
|
|
# Clean up override
|
|
app.dependency_overrides.clear()
|
|
|
|
|
|
class TestTokenVerificationIntegration:
|
|
"""Integration tests for full token lifecycle."""
|
|
|
|
def test_full_token_lifecycle(self, client, valid_auth_code, test_token_service):
|
|
"""Test complete flow: exchange code, verify token."""
|
|
code, metadata = valid_auth_code
|
|
|
|
# Step 1: Exchange authorization code for token
|
|
exchange_response = client.post(
|
|
"/token",
|
|
data={
|
|
"grant_type": "authorization_code",
|
|
"code": code,
|
|
"client_id": metadata["client_id"],
|
|
"redirect_uri": metadata["redirect_uri"]
|
|
}
|
|
)
|
|
|
|
assert exchange_response.status_code == 200
|
|
token_data = exchange_response.json()
|
|
access_token = token_data["access_token"]
|
|
|
|
# Step 2: Verify the token
|
|
verify_response = client.get(
|
|
"/token",
|
|
headers={"Authorization": f"Bearer {access_token}"}
|
|
)
|
|
|
|
assert verify_response.status_code == 200
|
|
verify_data = verify_response.json()
|
|
assert verify_data["me"] == metadata["me"]
|
|
assert verify_data["client_id"] == metadata["client_id"]
|
|
assert verify_data["scope"] == metadata["scope"]
|
|
|
|
def test_verify_revoked_token(self, client, test_token_service):
|
|
"""Test verification of revoked token returns 401."""
|
|
# Generate a token
|
|
token = test_token_service.generate_token(
|
|
me="https://user.example.com",
|
|
client_id="https://client.example.com",
|
|
scope=""
|
|
)
|
|
|
|
# Revoke the token
|
|
revoked = test_token_service.revoke_token(token)
|
|
assert revoked is True
|
|
|
|
# Try to verify revoked token
|
|
response = client.get(
|
|
"/token",
|
|
headers={"Authorization": f"Bearer {token}"}
|
|
)
|
|
|
|
assert response.status_code == 401
|
|
data = response.json()
|
|
assert data["detail"]["error"] == "invalid_token"
|
|
|
|
def test_verify_cross_client_token(self, client, test_token_service):
|
|
"""Test token verification returns correct client_id."""
|
|
# Generate tokens for two different clients
|
|
token_a = test_token_service.generate_token(
|
|
me="https://user.example.com",
|
|
client_id="https://client-a.example.com",
|
|
scope=""
|
|
)
|
|
|
|
token_b = test_token_service.generate_token(
|
|
me="https://user.example.com",
|
|
client_id="https://client-b.example.com",
|
|
scope=""
|
|
)
|
|
|
|
# Verify token A returns client A
|
|
response_a = client.get(
|
|
"/token",
|
|
headers={"Authorization": f"Bearer {token_a}"}
|
|
)
|
|
assert response_a.status_code == 200
|
|
assert response_a.json()["client_id"] == "https://client-a.example.com"
|
|
|
|
# Verify token B returns client B
|
|
response_b = client.get(
|
|
"/token",
|
|
headers={"Authorization": f"Bearer {token_b}"}
|
|
)
|
|
assert response_b.status_code == 200
|
|
assert response_b.json()["client_id"] == "https://client-b.example.com"
|