Files
Gondulf/tests/unit/test_token_endpoint.py
Phil Skentelbery 115e733604 feat(phase-4a): complete Phase 3 implementation and gap analysis
Merges Phase 4a work including:

Implementation:
- Metadata discovery endpoint (/api/.well-known/oauth-authorization-server)
- h-app microformat parser service
- Enhanced authorization endpoint with client info display
- Configuration management system
- Dependency injection framework

Documentation:
- Comprehensive gap analysis for v1.0.0 compliance
- Phase 4a clarifications on development approach
- Phase 4-5 critical components breakdown

Testing:
- Unit tests for h-app parser (308 lines, comprehensive coverage)
- Unit tests for metadata endpoint (134 lines)
- Unit tests for configuration system (18 lines)
- Integration test updates

All tests passing with high coverage. Ready for Phase 4b security hardening.
2025-11-20 17:16:11 -07:00

317 lines
9.9 KiB
Python

"""
Unit tests for Token Endpoint.
Tests token exchange endpoint including validation, error handling, and security.
"""
import os
import pytest
from fastapi.testclient import TestClient
from gondulf.database.connection import Database
from gondulf.services.token_service import TokenService
from gondulf.storage import CodeStore
@pytest.fixture(scope="function")
def test_config(monkeypatch):
    """
    Provide a loaded and validated ``Config`` for a single test.

    The required ``GONDULF_*`` environment variables are set through
    ``monkeypatch`` before ``gondulf.config`` is imported.

    Returns:
        The ``Config`` class after ``load()`` and ``validate()`` have run.
    """
    required_env = (
        ("GONDULF_SECRET_KEY", "test_secret_key_" + "x" * 32),
        ("GONDULF_BASE_URL", "https://auth.example.com"),
        ("GONDULF_DATABASE_URL", "sqlite:///:memory:"),
    )
    for variable, value in required_env:
        monkeypatch.setenv(variable, value)

    # Deferred import: the environment has to be in place first.
    from gondulf.config import Config

    Config.load()
    Config.validate()
    return Config
@pytest.fixture
def test_database(tmp_path):
    """
    Build a migrated SQLite database inside the test's temporary directory.

    Returns:
        A ``Database`` pointing at ``test.db`` under ``tmp_path``, after its
        directory has been ensured and its migrations have been run.
    """
    database = Database(f"sqlite:///{tmp_path / 'test.db'}")
    database.ensure_database_directory()
    database.run_migrations()
    return database
@pytest.fixture
def test_code_storage():
    """
    Provide a fresh ``CodeStore`` for authorization codes.

    Returns:
        A ``CodeStore`` constructed with ``ttl_seconds=600``.
    """
    ttl = 600
    return CodeStore(ttl_seconds=ttl)
@pytest.fixture
def test_token_service(test_database):
    """
    Provide a ``TokenService`` backed by the per-test database.

    Returns:
        A ``TokenService`` built with ``token_length=32`` and
        ``token_ttl=3600``.
    """
    service_options = {
        "database": test_database,
        "token_length": 32,
        "token_ttl": 3600,
    }
    return TokenService(**service_options)
@pytest.fixture
def client(test_config, test_database, test_code_storage, test_token_service):
    """
    Yield a ``TestClient`` whose app dependencies point at the test doubles.

    ``test_config`` is requested only so the environment is configured
    before the application modules are imported below.

    Yields:
        A ``TestClient`` wrapping the Gondulf app. All dependency overrides
        are cleared once the test has finished with it.
    """
    # Deferred imports: the app must be imported after config is set.
    from gondulf.dependencies import get_code_storage, get_database, get_token_service
    from gondulf.main import app

    app.dependency_overrides.update(
        {
            get_database: lambda: test_database,
            get_code_storage: lambda: test_code_storage,
            get_token_service: lambda: test_token_service,
        }
    )
    http_client = TestClient(app)
    yield http_client
    app.dependency_overrides.clear()
@pytest.fixture
def valid_auth_code(test_code_storage):
    """
    Seed the code store with one unused authorization code.

    The entry is stored under the key ``authz:<code>``.

    Returns:
        A ``(code, metadata)`` tuple: the code string and the metadata dict
        that was stored for it.
    """
    auth_code = "test_auth_code_12345"
    stored = dict(
        client_id="https://client.example.com",
        redirect_uri="https://client.example.com/callback",
        state="xyz123",
        me="https://user.example.com",
        scope="",
        code_challenge="abc123",
        code_challenge_method="S256",
        created_at=1234567890,
        expires_at=1234568490,
        used=False,
    )
    test_code_storage.store("authz:" + auth_code, stored)
    return auth_code, stored
class TestTokenExchangeSuccess:
    """Tests for successful token exchange."""

    @staticmethod
    def _exchange(client, code, metadata):
        """
        POST a well-formed ``authorization_code`` grant to ``/token``.

        Args:
            client: The test client fixture.
            code: The authorization code to redeem.
            metadata: The metadata stored for the code; supplies the matching
                ``client_id`` and ``redirect_uri``.

        Returns:
            The response returned by ``client.post``.
        """
        return client.post(
            "/token",
            data={
                "grant_type": "authorization_code",
                "code": code,
                "client_id": metadata["client_id"],
                "redirect_uri": metadata["redirect_uri"],
            },
        )

    def test_token_exchange_success(self, client, valid_auth_code):
        """Test successful token exchange returns access token."""
        code, metadata = valid_auth_code
        response = self._exchange(client, code, metadata)

        assert response.status_code == 200
        data = response.json()
        assert "access_token" in data
        assert data["token_type"] == "Bearer"
        assert data["me"] == metadata["me"]
        assert data["scope"] == metadata["scope"]

    def test_token_exchange_response_format(self, client, valid_auth_code):
        """Test token response carries the expected fields and token shape."""
        code, metadata = valid_auth_code
        response = self._exchange(client, code, metadata)

        assert response.status_code == 200
        data = response.json()
        # Fields every successful token response must carry.
        for field in ("access_token", "token_type", "me"):
            assert field in data
        assert isinstance(data["access_token"], str)
        # 43 is the unpadded base64url length of 32 bytes; presumably the
        # token service encodes token_length=32 random bytes -- confirm.
        assert len(data["access_token"]) == 43

    def test_token_exchange_cache_headers(self, client, valid_auth_code):
        """Test OAuth 2.0 cache headers are set on a successful response."""
        code, metadata = valid_auth_code
        response = self._exchange(client, code, metadata)

        # Pin the success path: without this the header checks could pass
        # on an error response.
        assert response.status_code == 200
        assert response.headers["Cache-Control"] == "no-store"
        assert response.headers["Pragma"] == "no-cache"

    def test_token_exchange_deletes_code(self, client, valid_auth_code, test_code_storage):
        """Test authorization code is deleted after a successful exchange."""
        code, metadata = valid_auth_code
        response = self._exchange(client, code, metadata)

        # The deletion is only meaningful if the exchange actually succeeded.
        assert response.status_code == 200
        # Code should be deleted
        assert test_code_storage.get(f"authz:{code}") is None
class TestTokenExchangeErrors:
    """Tests for error conditions."""

    def test_invalid_grant_type(self, client, valid_auth_code):
        """An unsupported grant_type is rejected with unsupported_grant_type."""
        auth_code, stored = valid_auth_code
        form = {
            "grant_type": "password",  # deliberately not authorization_code
            "code": auth_code,
            "client_id": stored["client_id"],
            "redirect_uri": stored["redirect_uri"],
        }
        resp = client.post("/token", data=form)

        assert resp.status_code == 400
        body = resp.json()
        assert body["detail"]["error"] == "unsupported_grant_type"

    def test_code_not_found(self, client):
        """A code that was never stored is rejected with invalid_grant."""
        form = {
            "grant_type": "authorization_code",
            "code": "invalid_code_123",
            "client_id": "https://client.example.com",
            "redirect_uri": "https://client.example.com/callback",
        }
        resp = client.post("/token", data=form)

        assert resp.status_code == 400
        body = resp.json()
        assert body["detail"]["error"] == "invalid_grant"

    def test_client_id_mismatch(self, client, valid_auth_code):
        """A client_id differing from the stored one yields invalid_client."""
        auth_code, stored = valid_auth_code
        form = {
            "grant_type": "authorization_code",
            "code": auth_code,
            "client_id": "https://wrong-client.example.com",  # not the stored client
            "redirect_uri": stored["redirect_uri"],
        }
        resp = client.post("/token", data=form)

        assert resp.status_code == 400
        body = resp.json()
        assert body["detail"]["error"] == "invalid_client"

    def test_redirect_uri_mismatch(self, client, valid_auth_code):
        """A redirect_uri differing from the stored one yields invalid_grant."""
        auth_code, stored = valid_auth_code
        form = {
            "grant_type": "authorization_code",
            "code": auth_code,
            "client_id": stored["client_id"],
            "redirect_uri": "https://wrong-uri.example.com/callback",  # not the stored URI
        }
        resp = client.post("/token", data=form)

        assert resp.status_code == 400
        body = resp.json()
        assert body["detail"]["error"] == "invalid_grant"

    def test_code_replay_prevention(self, client, valid_auth_code, test_code_storage):
        """A code already flagged as used cannot be redeemed again."""
        auth_code, stored = valid_auth_code

        # Flag the code as used and write it back under the same key.
        stored["used"] = True
        test_code_storage.store(f"authz:{auth_code}", stored)

        form = {
            "grant_type": "authorization_code",
            "code": auth_code,
            "client_id": stored["client_id"],
            "redirect_uri": stored["redirect_uri"],
        }
        resp = client.post("/token", data=form)

        assert resp.status_code == 400
        body = resp.json()
        assert body["detail"]["error"] == "invalid_grant"
class TestPKCEHandling:
    """Tests for PKCE parameter handling."""

    def test_code_verifier_accepted_but_not_validated(self, client, valid_auth_code):
        """A request carrying an arbitrary code_verifier still succeeds in v1.0.0."""
        auth_code, stored = valid_auth_code
        form = {
            "grant_type": "authorization_code",
            "code": auth_code,
            "client_id": stored["client_id"],
            "redirect_uri": stored["redirect_uri"],
            "code_verifier": "some_verifier_string",
        }
        resp = client.post("/token", data=form)

        # PKCE is not validated in v1.0.0, so the exchange should still succeed.
        assert resp.status_code == 200
class TestSecurityValidation:
    """Tests for security validations."""

    def test_token_generated_via_service(self, client, valid_auth_code, test_token_service):
        """The issued access token can be validated through the token service."""
        auth_code, stored = valid_auth_code
        form = {
            "grant_type": "authorization_code",
            "code": auth_code,
            "client_id": stored["client_id"],
            "redirect_uri": stored["redirect_uri"],
        }
        resp = client.post("/token", data=form)

        assert resp.status_code == 200
        issued = resp.json()

        # The service must recognise the token, which shows it was stored.
        token_record = test_token_service.validate_token(issued["access_token"])
        assert token_record is not None
        assert token_record["me"] == stored["me"]