Files
Gondulf/tests/unit/test_token_endpoint.py
Phil Skentelbery 05b4ff7a6b feat(phase-3): implement token endpoint and OAuth 2.0 flow
Phase 3 Implementation:
- Token service with secure token generation and validation
- Token endpoint (POST /token) with OAuth 2.0 compliance
- Database migration 003 for tokens table
- Authorization code validation and single-use enforcement

Phase 1 Updates:
- Enhanced CodeStore to support dict values with JSON serialization
- Maintains backward compatibility

Phase 2 Updates:
- Authorization codes now include PKCE fields, used flag, timestamps
- Complete metadata structure for token exchange

Security:
- 256-bit cryptographically secure tokens (secrets.token_urlsafe)
- SHA-256 hashed storage (no plaintext)
- Constant-time comparison for validation
- Single-use code enforcement with replay detection

Testing:
- 226 tests passing (100%)
- 87.27% coverage (exceeds 80% requirement)
- OAuth 2.0 compliance verified

This completes the v1.0.0 MVP with full IndieAuth authorization code flow.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-20 14:24:06 -07:00

316 lines
9.8 KiB
Python

"""
Unit tests for Token Endpoint.
Tests token exchange endpoint including validation, error handling, and security.
"""
import os
import pytest
from fastapi.testclient import TestClient
from gondulf.database.connection import Database
from gondulf.services.token_service import TokenService
from gondulf.storage import CodeStore
@pytest.fixture(scope="function")
def test_config(monkeypatch):
"""Configure test environment."""
# Set required environment variables
monkeypatch.setenv("GONDULF_SECRET_KEY", "test_secret_key_" + "x" * 32)
monkeypatch.setenv("GONDULF_DATABASE_URL", "sqlite:///:memory:")
# Import after environment is set
from gondulf.config import Config
Config.load()
Config.validate()
return Config
@pytest.fixture
def test_database(tmp_path):
"""Create test database."""
db_path = tmp_path / "test.db"
db = Database(f"sqlite:///{db_path}")
db.ensure_database_directory()
db.run_migrations()
return db
@pytest.fixture
def test_code_storage():
"""Create test code storage."""
return CodeStore(ttl_seconds=600)
@pytest.fixture
def test_token_service(test_database):
"""Create test token service."""
return TokenService(
database=test_database,
token_length=32,
token_ttl=3600
)
@pytest.fixture
def client(test_config, test_database, test_code_storage, test_token_service):
"""Create test client with dependency overrides."""
# Import app after config is set
from gondulf.dependencies import get_code_storage, get_database, get_token_service
from gondulf.main import app
app.dependency_overrides[get_database] = lambda: test_database
app.dependency_overrides[get_code_storage] = lambda: test_code_storage
app.dependency_overrides[get_token_service] = lambda: test_token_service
yield TestClient(app)
app.dependency_overrides.clear()
@pytest.fixture
def valid_auth_code(test_code_storage):
"""Create a valid authorization code."""
code = "test_auth_code_12345"
metadata = {
"client_id": "https://client.example.com",
"redirect_uri": "https://client.example.com/callback",
"state": "xyz123",
"me": "https://user.example.com",
"scope": "",
"code_challenge": "abc123",
"code_challenge_method": "S256",
"created_at": 1234567890,
"expires_at": 1234568490,
"used": False
}
test_code_storage.store(f"authz:{code}", metadata)
return code, metadata
class TestTokenExchangeSuccess:
"""Tests for successful token exchange."""
def test_token_exchange_success(self, client, valid_auth_code):
"""Test successful token exchange returns access token."""
code, metadata = valid_auth_code
response = client.post(
"/token",
data={
"grant_type": "authorization_code",
"code": code,
"client_id": metadata["client_id"],
"redirect_uri": metadata["redirect_uri"]
}
)
assert response.status_code == 200
data = response.json()
assert "access_token" in data
assert data["token_type"] == "Bearer"
assert data["me"] == metadata["me"]
assert data["scope"] == metadata["scope"]
def test_token_exchange_response_format(self, client, valid_auth_code):
"""Test token response matches OAuth 2.0 format."""
code, metadata = valid_auth_code
response = client.post(
"/token",
data={
"grant_type": "authorization_code",
"code": code,
"client_id": metadata["client_id"],
"redirect_uri": metadata["redirect_uri"]
}
)
assert response.status_code == 200
data = response.json()
# Required fields per OAuth 2.0
assert "access_token" in data
assert "token_type" in data
assert "me" in data
assert isinstance(data["access_token"], str)
assert len(data["access_token"]) == 43 # base64url encoded
def test_token_exchange_cache_headers(self, client, valid_auth_code):
"""Test OAuth 2.0 cache headers are set."""
code, metadata = valid_auth_code
response = client.post(
"/token",
data={
"grant_type": "authorization_code",
"code": code,
"client_id": metadata["client_id"],
"redirect_uri": metadata["redirect_uri"]
}
)
assert response.headers["Cache-Control"] == "no-store"
assert response.headers["Pragma"] == "no-cache"
def test_token_exchange_deletes_code(self, client, valid_auth_code, test_code_storage):
"""Test authorization code is deleted after exchange."""
code, metadata = valid_auth_code
client.post(
"/token",
data={
"grant_type": "authorization_code",
"code": code,
"client_id": metadata["client_id"],
"redirect_uri": metadata["redirect_uri"]
}
)
# Code should be deleted
assert test_code_storage.get(f"authz:{code}") is None
class TestTokenExchangeErrors:
"""Tests for error conditions."""
def test_invalid_grant_type(self, client, valid_auth_code):
"""Test unsupported grant_type returns error."""
code, metadata = valid_auth_code
response = client.post(
"/token",
data={
"grant_type": "password", # Wrong grant type
"code": code,
"client_id": metadata["client_id"],
"redirect_uri": metadata["redirect_uri"]
}
)
assert response.status_code == 400
data = response.json()
assert data["detail"]["error"] == "unsupported_grant_type"
def test_code_not_found(self, client):
"""Test invalid authorization code returns error."""
response = client.post(
"/token",
data={
"grant_type": "authorization_code",
"code": "invalid_code_123",
"client_id": "https://client.example.com",
"redirect_uri": "https://client.example.com/callback"
}
)
assert response.status_code == 400
data = response.json()
assert data["detail"]["error"] == "invalid_grant"
def test_client_id_mismatch(self, client, valid_auth_code):
"""Test client_id mismatch returns error."""
code, metadata = valid_auth_code
response = client.post(
"/token",
data={
"grant_type": "authorization_code",
"code": code,
"client_id": "https://wrong-client.example.com", # Wrong client
"redirect_uri": metadata["redirect_uri"]
}
)
assert response.status_code == 400
data = response.json()
assert data["detail"]["error"] == "invalid_client"
def test_redirect_uri_mismatch(self, client, valid_auth_code):
"""Test redirect_uri mismatch returns error."""
code, metadata = valid_auth_code
response = client.post(
"/token",
data={
"grant_type": "authorization_code",
"code": code,
"client_id": metadata["client_id"],
"redirect_uri": "https://wrong-uri.example.com/callback" # Wrong URI
}
)
assert response.status_code == 400
data = response.json()
assert data["detail"]["error"] == "invalid_grant"
def test_code_replay_prevention(self, client, valid_auth_code, test_code_storage):
"""Test authorization code cannot be used twice."""
code, metadata = valid_auth_code
# Mark code as used
metadata["used"] = True
test_code_storage.store(f"authz:{code}", metadata)
response = client.post(
"/token",
data={
"grant_type": "authorization_code",
"code": code,
"client_id": metadata["client_id"],
"redirect_uri": metadata["redirect_uri"]
}
)
assert response.status_code == 400
data = response.json()
assert data["detail"]["error"] == "invalid_grant"
class TestPKCEHandling:
"""Tests for PKCE parameter handling."""
def test_code_verifier_accepted_but_not_validated(self, client, valid_auth_code):
"""Test code_verifier is accepted but not validated in v1.0.0."""
code, metadata = valid_auth_code
response = client.post(
"/token",
data={
"grant_type": "authorization_code",
"code": code,
"client_id": metadata["client_id"],
"redirect_uri": metadata["redirect_uri"],
"code_verifier": "some_verifier_string"
}
)
# Should still succeed (PKCE not validated in v1.0.0)
assert response.status_code == 200
class TestSecurityValidation:
"""Tests for security validations."""
def test_token_generated_via_service(self, client, valid_auth_code, test_token_service):
"""Test token is generated through token service."""
code, metadata = valid_auth_code
response = client.post(
"/token",
data={
"grant_type": "authorization_code",
"code": code,
"client_id": metadata["client_id"],
"redirect_uri": metadata["redirect_uri"]
}
)
assert response.status_code == 200
data = response.json()
# Validate token was actually stored
token_metadata = test_token_service.validate_token(data["access_token"])
assert token_metadata is not None
assert token_metadata["me"] == metadata["me"]