"""
Integration tests for authorization endpoint flow.
Tests the complete authorization endpoint behavior including parameter validation,
client metadata fetching, consent form rendering, and code generation.
Updated for the session-based authentication flow (ADR-010).
"""
import tempfile
from datetime import datetime, timedelta
from pathlib import Path
from unittest.mock import AsyncMock, Mock, patch
import pytest
from fastapi.testclient import TestClient
@pytest.fixture
def auth_app(monkeypatch, tmp_path):
"""Create app for authorization testing."""
db_path = tmp_path / "test.db"
monkeypatch.setenv("GONDULF_SECRET_KEY", "a" * 32)
monkeypatch.setenv("GONDULF_BASE_URL", "https://auth.example.com")
monkeypatch.setenv("GONDULF_DATABASE_URL", f"sqlite:///{db_path}")
monkeypatch.setenv("GONDULF_DEBUG", "true")
from gondulf.main import app
return app
@pytest.fixture
def auth_client(auth_app):
"""Create test client for authorization tests."""
with TestClient(auth_app) as client:
yield client
@pytest.fixture
def mock_happ_fetch():
"""Mock h-app parser to avoid network calls."""
from gondulf.services.happ_parser import ClientMetadata
metadata = ClientMetadata(
name="Test Application",
url="https://app.example.com",
logo="https://app.example.com/logo.png"
)
with patch('gondulf.services.happ_parser.HAppParser.fetch_and_parse', new_callable=AsyncMock) as mock:
mock.return_value = metadata
yield mock
class TestAuthorizationEndpointValidation:
"""Tests for authorization endpoint parameter validation."""
def test_missing_client_id_returns_error(self, auth_client):
"""Test that missing client_id returns 400 error."""
response = auth_client.get("/authorize", params={
"redirect_uri": "https://app.example.com/callback",
"response_type": "code",
"state": "test123",
})
assert response.status_code == 400
assert "client_id" in response.text.lower()
def test_missing_redirect_uri_returns_error(self, auth_client):
"""Test that missing redirect_uri returns 400 error."""
response = auth_client.get("/authorize", params={
"client_id": "https://app.example.com",
"response_type": "code",
"state": "test123",
})
assert response.status_code == 400
assert "redirect_uri" in response.text.lower()
def test_http_client_id_rejected(self, auth_client):
"""Test that HTTP client_id (non-HTTPS) is rejected."""
response = auth_client.get("/authorize", params={
"client_id": "http://app.example.com", # HTTP not allowed
"redirect_uri": "https://app.example.com/callback",
"response_type": "code",
"state": "test123",
})
assert response.status_code == 400
assert "https" in response.text.lower()
def test_mismatched_redirect_uri_rejected(self, auth_client):
"""Test that redirect_uri not matching client_id domain is rejected."""
response = auth_client.get("/authorize", params={
"client_id": "https://app.example.com",
"redirect_uri": "https://evil.example.com/callback", # Different domain
"response_type": "code",
"state": "test123",
})
assert response.status_code == 400
assert "redirect_uri" in response.text.lower()
class TestAuthorizationEndpointRedirectErrors:
"""Tests for errors that redirect back to the client."""
@pytest.fixture
def valid_params(self):
"""Valid base authorization parameters."""
return {
"client_id": "https://app.example.com",
"redirect_uri": "https://app.example.com/callback",
"state": "test123",
}
def test_invalid_response_type_redirects_with_error(self, auth_client, valid_params, mock_happ_fetch):
"""Test invalid response_type redirects with error parameter."""
params = valid_params.copy()
params["response_type"] = "token" # Invalid - only "code" is supported
response = auth_client.get("/authorize", params=params, follow_redirects=False)
assert response.status_code == 302
location = response.headers["location"]
assert "error=unsupported_response_type" in location
assert "state=test123" in location
def test_missing_code_challenge_redirects_with_error(self, auth_client, valid_params, mock_happ_fetch):
"""Test missing PKCE code_challenge redirects with error."""
params = valid_params.copy()
params["response_type"] = "code"
params["me"] = "https://user.example.com"
# Missing code_challenge
response = auth_client.get("/authorize", params=params, follow_redirects=False)
assert response.status_code == 302
location = response.headers["location"]
assert "error=invalid_request" in location
assert "code_challenge" in location.lower()
def test_invalid_code_challenge_method_redirects_with_error(self, auth_client, valid_params, mock_happ_fetch):
"""Test invalid code_challenge_method redirects with error."""
params = valid_params.copy()
params["response_type"] = "code"
params["me"] = "https://user.example.com"
params["code_challenge"] = "abc123"
params["code_challenge_method"] = "plain" # Invalid - only S256 supported
response = auth_client.get("/authorize", params=params, follow_redirects=False)
assert response.status_code == 302
location = response.headers["location"]
assert "error=invalid_request" in location
assert "S256" in location
def test_missing_me_parameter_redirects_with_error(self, auth_client, valid_params, mock_happ_fetch):
"""Test missing me parameter redirects with error."""
params = valid_params.copy()
params["response_type"] = "code"
params["code_challenge"] = "abc123"
params["code_challenge_method"] = "S256"
# Missing me parameter
response = auth_client.get("/authorize", params=params, follow_redirects=False)
assert response.status_code == 302
location = response.headers["location"]
assert "error=invalid_request" in location
assert "me" in location.lower()
def test_invalid_me_url_redirects_with_error(self, auth_client, valid_params, mock_happ_fetch):
"""Test invalid me URL redirects with error."""
params = valid_params.copy()
params["response_type"] = "code"
params["code_challenge"] = "abc123"
params["code_challenge_method"] = "S256"
params["me"] = "not-a-valid-url"
response = auth_client.get("/authorize", params=params, follow_redirects=False)
assert response.status_code == 302
location = response.headers["location"]
assert "error=invalid_request" in location
class TestAuthorizationConsentPage:
"""Tests for the consent page rendering (after email verification)."""
@pytest.fixture
def complete_params(self):
"""Complete valid authorization parameters."""
return {
"client_id": "https://app.example.com",
"redirect_uri": "https://app.example.com/callback",
"response_type": "code",
"state": "test123",
"code_challenge": "E9Melhoa2OwvFrEMTJguCHaoeK1t8URWbuGJSstw-cM",
"code_challenge_method": "S256",
"me": "https://user.example.com",
}
def test_valid_request_shows_verification_page(self, auth_app, complete_params, mock_happ_fetch):
"""Test valid authorization request shows verification page (not consent directly)."""
from gondulf.dependencies import (
get_dns_service, get_email_service, get_html_fetcher,
get_relme_parser, get_auth_session_service, get_database
)
from gondulf.database.connection import Database
from sqlalchemy import text
import tempfile
with tempfile.TemporaryDirectory() as tmpdir:
db_path = Path(tmpdir) / "test.db"
db = Database(f"sqlite:///{db_path}")
db.initialize()
# Setup DNS-verified domain
now = datetime.utcnow()
with db.get_engine().begin() as conn:
conn.execute(
text("""
INSERT OR REPLACE INTO domains
(domain, email, verification_code, verified, verified_at, last_checked, two_factor)
VALUES (:domain, '', '', 1, :now, :now, 0)
"""),
{"domain": "user.example.com", "now": now}
)
# Create mock services
mock_dns = Mock()
mock_dns.verify_txt_record.return_value = True
mock_email = Mock()
mock_email.send_verification_code = Mock()
mock_html = Mock()
mock_html.fetch.return_value = 'Email'
from gondulf.services.relme_parser import RelMeParser
mock_relme = RelMeParser()
mock_session = Mock()
mock_session.create_session.return_value = {
"session_id": "test_session_123",
"verification_code": "123456",
"expires_at": datetime.utcnow() + timedelta(minutes=10)
}
auth_app.dependency_overrides[get_database] = lambda: db
auth_app.dependency_overrides[get_dns_service] = lambda: mock_dns
auth_app.dependency_overrides[get_email_service] = lambda: mock_email
auth_app.dependency_overrides[get_html_fetcher] = lambda: mock_html
auth_app.dependency_overrides[get_relme_parser] = lambda: mock_relme
auth_app.dependency_overrides[get_auth_session_service] = lambda: mock_session
try:
with TestClient(auth_app) as client:
response = client.get("/authorize", params=complete_params)
assert response.status_code == 200
assert "text/html" in response.headers["content-type"]
# Should show verification page (email auth required every login)
assert "Verify Your Identity" in response.text
finally:
auth_app.dependency_overrides.clear()
class TestAuthorizationConsentSubmission:
"""Tests for consent form submission (via session-based flow)."""
def test_consent_submission_redirects_with_code(self, auth_app):
"""Test consent submission redirects to client with authorization code."""
from gondulf.dependencies import get_auth_session_service, get_code_storage
# Mock verified session
mock_session = Mock()
mock_session.get_session.return_value = {
"session_id": "test_session_123",
"me": "https://user.example.com",
"email": "test@example.com",
"code_verified": True, # Session is verified
"client_id": "https://app.example.com",
"redirect_uri": "https://app.example.com/callback",
"state": "test123",
"code_challenge": "E9Melhoa2OwvFrEMTJguCHaoeK1t8URWbuGJSstw-cM",
"code_challenge_method": "S256",
"scope": "",
"response_type": "code"
}
mock_session.delete_session = Mock()
auth_app.dependency_overrides[get_auth_session_service] = lambda: mock_session
try:
with TestClient(auth_app) as client:
response = client.post(
"/authorize/consent",
data={"session_id": "test_session_123"},
follow_redirects=False
)
assert response.status_code == 302
location = response.headers["location"]
assert location.startswith("https://app.example.com/callback")
assert "code=" in location
assert "state=test123" in location
finally:
auth_app.dependency_overrides.clear()
def test_consent_submission_generates_unique_codes(self, auth_app):
"""Test each consent generates a unique authorization code."""
from gondulf.dependencies import get_auth_session_service
# Mock verified session
mock_session = Mock()
mock_session.get_session.return_value = {
"session_id": "test_session_123",
"me": "https://user.example.com",
"email": "test@example.com",
"code_verified": True,
"client_id": "https://app.example.com",
"redirect_uri": "https://app.example.com/callback",
"state": "test123",
"code_challenge": "E9Melhoa2OwvFrEMTJguCHaoeK1t8URWbuGJSstw-cM",
"code_challenge_method": "S256",
"scope": "",
"response_type": "code"
}
mock_session.delete_session = Mock()
auth_app.dependency_overrides[get_auth_session_service] = lambda: mock_session
try:
with TestClient(auth_app) as client:
# First submission
response1 = client.post(
"/authorize/consent",
data={"session_id": "test_session_123"},
follow_redirects=False
)
location1 = response1.headers["location"]
# Second submission
response2 = client.post(
"/authorize/consent",
data={"session_id": "test_session_123"},
follow_redirects=False
)
location2 = response2.headers["location"]
# Extract codes
from tests.conftest import extract_code_from_redirect
code1 = extract_code_from_redirect(location1)
code2 = extract_code_from_redirect(location2)
assert code1 != code2
finally:
auth_app.dependency_overrides.clear()
def test_authorization_code_stored_for_exchange(self, auth_app):
"""Test authorization code is stored for later token exchange."""
from gondulf.dependencies import get_auth_session_service
# Mock verified session
mock_session = Mock()
mock_session.get_session.return_value = {
"session_id": "test_session_123",
"me": "https://user.example.com",
"email": "test@example.com",
"code_verified": True,
"client_id": "https://app.example.com",
"redirect_uri": "https://app.example.com/callback",
"state": "test123",
"code_challenge": "E9Melhoa2OwvFrEMTJguCHaoeK1t8URWbuGJSstw-cM",
"code_challenge_method": "S256",
"scope": "",
"response_type": "code"
}
mock_session.delete_session = Mock()
auth_app.dependency_overrides[get_auth_session_service] = lambda: mock_session
try:
with TestClient(auth_app) as client:
response = client.post(
"/authorize/consent",
data={"session_id": "test_session_123"},
follow_redirects=False
)
from tests.conftest import extract_code_from_redirect
code = extract_code_from_redirect(response.headers["location"])
# Code should be non-empty and URL-safe
assert code is not None
assert len(code) > 20 # Should be a substantial code
finally:
auth_app.dependency_overrides.clear()
class TestAuthorizationSecurityHeaders:
"""Tests for security headers on authorization endpoints."""
def test_authorization_page_has_security_headers(self, auth_app, mock_happ_fetch):
"""Test authorization page includes security headers."""
from gondulf.dependencies import (
get_dns_service, get_email_service, get_html_fetcher,
get_relme_parser, get_auth_session_service, get_database
)
from gondulf.database.connection import Database
from sqlalchemy import text
import tempfile
with tempfile.TemporaryDirectory() as tmpdir:
db_path = Path(tmpdir) / "test.db"
db = Database(f"sqlite:///{db_path}")
db.initialize()
now = datetime.utcnow()
with db.get_engine().begin() as conn:
conn.execute(
text("""
INSERT OR REPLACE INTO domains
(domain, email, verification_code, verified, verified_at, last_checked, two_factor)
VALUES (:domain, '', '', 1, :now, :now, 0)
"""),
{"domain": "user.example.com", "now": now}
)
mock_dns = Mock()
mock_dns.verify_txt_record.return_value = True
mock_email = Mock()
mock_email.send_verification_code = Mock()
mock_html = Mock()
mock_html.fetch.return_value = 'Email'
from gondulf.services.relme_parser import RelMeParser
mock_relme = RelMeParser()
mock_session = Mock()
mock_session.create_session.return_value = {
"session_id": "test_session_123",
"verification_code": "123456",
"expires_at": datetime.utcnow() + timedelta(minutes=10)
}
auth_app.dependency_overrides[get_database] = lambda: db
auth_app.dependency_overrides[get_dns_service] = lambda: mock_dns
auth_app.dependency_overrides[get_email_service] = lambda: mock_email
auth_app.dependency_overrides[get_html_fetcher] = lambda: mock_html
auth_app.dependency_overrides[get_relme_parser] = lambda: mock_relme
auth_app.dependency_overrides[get_auth_session_service] = lambda: mock_session
try:
params = {
"client_id": "https://app.example.com",
"redirect_uri": "https://app.example.com/callback",
"response_type": "code",
"state": "test123",
"code_challenge": "abc123",
"code_challenge_method": "S256",
"me": "https://user.example.com",
}
with TestClient(auth_app) as client:
response = client.get("/authorize", params=params)
assert "X-Frame-Options" in response.headers
assert "X-Content-Type-Options" in response.headers
assert response.headers["X-Frame-Options"] == "DENY"
finally:
auth_app.dependency_overrides.clear()
def test_error_pages_have_security_headers(self, auth_client):
"""Test error pages include security headers."""
# Request without client_id should return error page
response = auth_client.get("/authorize", params={
"redirect_uri": "https://app.example.com/callback"
})
assert response.status_code == 400
assert "X-Frame-Options" in response.headers
assert "X-Content-Type-Options" in response.headers