""" Unit tests for Token Endpoint. Tests token exchange endpoint including validation, error handling, and security. """ import os import pytest from fastapi.testclient import TestClient from gondulf.database.connection import Database from gondulf.services.token_service import TokenService from gondulf.storage import CodeStore @pytest.fixture(scope="function") def test_config(monkeypatch): """Configure test environment.""" # Set required environment variables monkeypatch.setenv("GONDULF_SECRET_KEY", "test_secret_key_" + "x" * 32) monkeypatch.setenv("GONDULF_BASE_URL", "https://auth.example.com") monkeypatch.setenv("GONDULF_DATABASE_URL", "sqlite:///:memory:") # Import after environment is set from gondulf.config import Config Config.load() Config.validate() return Config @pytest.fixture def test_database(tmp_path): """Create test database.""" db_path = tmp_path / "test.db" db = Database(f"sqlite:///{db_path}") db.ensure_database_directory() db.run_migrations() return db @pytest.fixture def test_code_storage(): """Create test code storage.""" return CodeStore(ttl_seconds=600) @pytest.fixture def test_token_service(test_database): """Create test token service.""" return TokenService( database=test_database, token_length=32, token_ttl=3600 ) @pytest.fixture def client(test_config, test_database, test_code_storage, test_token_service): """Create test client with dependency overrides.""" # Import app after config is set from gondulf.dependencies import get_code_storage, get_database, get_token_service from gondulf.main import app app.dependency_overrides[get_database] = lambda: test_database app.dependency_overrides[get_code_storage] = lambda: test_code_storage app.dependency_overrides[get_token_service] = lambda: test_token_service yield TestClient(app) app.dependency_overrides.clear() @pytest.fixture def valid_auth_code(test_code_storage): """Create a valid authorization code (authorization flow).""" code = "test_auth_code_12345" metadata = { "client_id": "https://client.example.com", "redirect_uri": "https://client.example.com/callback", "response_type": "code", # Authorization flow - exchange at token endpoint "state": "xyz123", "me": "https://user.example.com", "scope": "", "code_challenge": "abc123", "code_challenge_method": "S256", "created_at": 1234567890, "expires_at": 1234568490, "used": False } test_code_storage.store(f"authz:{code}", metadata) return code, metadata class TestTokenExchangeSuccess: """Tests for successful token exchange.""" def test_token_exchange_success(self, client, valid_auth_code): """Test successful token exchange returns access token.""" code, metadata = valid_auth_code response = client.post( "/token", data={ "grant_type": "authorization_code", "code": code, "client_id": metadata["client_id"], "redirect_uri": metadata["redirect_uri"] } ) assert response.status_code == 200 data = response.json() assert "access_token" in data assert data["token_type"] == "Bearer" assert data["me"] == metadata["me"] assert data["scope"] == metadata["scope"] def test_token_exchange_response_format(self, client, valid_auth_code): """Test token response matches OAuth 2.0 format.""" code, metadata = valid_auth_code response = client.post( "/token", data={ "grant_type": "authorization_code", "code": code, "client_id": metadata["client_id"], "redirect_uri": metadata["redirect_uri"] } ) assert response.status_code == 200 data = response.json() # Required fields per OAuth 2.0 assert "access_token" in data assert "token_type" in data assert "me" in data assert isinstance(data["access_token"], str) assert len(data["access_token"]) == 43 # base64url encoded def test_token_exchange_cache_headers(self, client, valid_auth_code): """Test OAuth 2.0 cache headers are set.""" code, metadata = valid_auth_code response = client.post( "/token", data={ "grant_type": "authorization_code", "code": code, "client_id": metadata["client_id"], "redirect_uri": metadata["redirect_uri"] } ) assert response.headers["Cache-Control"] == "no-store" assert response.headers["Pragma"] == "no-cache" def test_token_exchange_deletes_code(self, client, valid_auth_code, test_code_storage): """Test authorization code is deleted after exchange.""" code, metadata = valid_auth_code client.post( "/token", data={ "grant_type": "authorization_code", "code": code, "client_id": metadata["client_id"], "redirect_uri": metadata["redirect_uri"] } ) # Code should be deleted assert test_code_storage.get(f"authz:{code}") is None class TestTokenExchangeErrors: """Tests for error conditions.""" def test_invalid_grant_type(self, client, valid_auth_code): """Test unsupported grant_type returns error.""" code, metadata = valid_auth_code response = client.post( "/token", data={ "grant_type": "password", # Wrong grant type "code": code, "client_id": metadata["client_id"], "redirect_uri": metadata["redirect_uri"] } ) assert response.status_code == 400 data = response.json() assert data["detail"]["error"] == "unsupported_grant_type" def test_code_not_found(self, client): """Test invalid authorization code returns error.""" response = client.post( "/token", data={ "grant_type": "authorization_code", "code": "invalid_code_123", "client_id": "https://client.example.com", "redirect_uri": "https://client.example.com/callback" } ) assert response.status_code == 400 data = response.json() assert data["detail"]["error"] == "invalid_grant" def test_client_id_mismatch(self, client, valid_auth_code): """Test client_id mismatch returns error.""" code, metadata = valid_auth_code response = client.post( "/token", data={ "grant_type": "authorization_code", "code": code, "client_id": "https://wrong-client.example.com", # Wrong client "redirect_uri": metadata["redirect_uri"] } ) assert response.status_code == 400 data = response.json() assert data["detail"]["error"] == "invalid_client" def test_redirect_uri_mismatch(self, client, valid_auth_code): """Test redirect_uri mismatch returns error.""" code, metadata = valid_auth_code response = client.post( "/token", data={ "grant_type": "authorization_code", "code": code, "client_id": metadata["client_id"], "redirect_uri": "https://wrong-uri.example.com/callback" # Wrong URI } ) assert response.status_code == 400 data = response.json() assert data["detail"]["error"] == "invalid_grant" def test_code_replay_prevention(self, client, valid_auth_code, test_code_storage): """Test authorization code cannot be used twice.""" code, metadata = valid_auth_code # Mark code as used metadata["used"] = True test_code_storage.store(f"authz:{code}", metadata) response = client.post( "/token", data={ "grant_type": "authorization_code", "code": code, "client_id": metadata["client_id"], "redirect_uri": metadata["redirect_uri"] } ) assert response.status_code == 400 data = response.json() assert data["detail"]["error"] == "invalid_grant" class TestPKCEHandling: """Tests for PKCE parameter handling.""" def test_code_verifier_accepted_but_not_validated(self, client, valid_auth_code): """Test code_verifier is accepted but not validated in v1.0.0.""" code, metadata = valid_auth_code response = client.post( "/token", data={ "grant_type": "authorization_code", "code": code, "client_id": metadata["client_id"], "redirect_uri": metadata["redirect_uri"], "code_verifier": "some_verifier_string" } ) # Should still succeed (PKCE not validated in v1.0.0) assert response.status_code == 200 class TestSecurityValidation: """Tests for security validations.""" def test_token_generated_via_service(self, client, valid_auth_code, test_token_service): """Test token is generated through token service.""" code, metadata = valid_auth_code response = client.post( "/token", data={ "grant_type": "authorization_code", "code": code, "client_id": metadata["client_id"], "redirect_uri": metadata["redirect_uri"] } ) assert response.status_code == 200 data = response.json() # Validate token was actually stored token_metadata = test_token_service.validate_token(data["access_token"]) assert token_metadata is not None assert token_metadata["me"] == metadata["me"]