fix(auth): make PKCE optional per ADR-003
PKCE was incorrectly required in the /authorize endpoint, contradicting ADR-003 which defers PKCE to v1.1.0. Changes: - PKCE parameters are now optional in /authorize - If code_challenge provided, validates method is S256 - Defaults to S256 if method not specified - Logs when clients don't use PKCE for monitoring - Updated tests for optional PKCE behavior This fixes authentication for clients that don't implement PKCE. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
@@ -127,20 +127,6 @@ class TestAuthorizationEndpointRedirectErrors:
|
||||
assert "error=unsupported_response_type" in location
|
||||
assert "state=test123" in location
|
||||
|
||||
def test_missing_code_challenge_redirects_with_error(self, auth_client, valid_params, mock_happ_fetch):
|
||||
"""Test missing PKCE code_challenge redirects with error."""
|
||||
params = valid_params.copy()
|
||||
params["response_type"] = "code"
|
||||
params["me"] = "https://user.example.com"
|
||||
# Missing code_challenge
|
||||
|
||||
response = auth_client.get("/authorize", params=params, follow_redirects=False)
|
||||
|
||||
assert response.status_code == 302
|
||||
location = response.headers["location"]
|
||||
assert "error=invalid_request" in location
|
||||
assert "code_challenge" in location.lower()
|
||||
|
||||
def test_invalid_code_challenge_method_redirects_with_error(self, auth_client, valid_params, mock_happ_fetch):
|
||||
"""Test invalid code_challenge_method redirects with error."""
|
||||
params = valid_params.copy()
|
||||
@@ -401,6 +387,262 @@ class TestAuthorizationConsentSubmission:
|
||||
auth_app.dependency_overrides.clear()
|
||||
|
||||
|
||||
class TestAuthorizationPKCEOptional:
|
||||
"""Tests for optional PKCE behavior (ADR-003: PKCE deferred to v1.1.0)."""
|
||||
|
||||
@pytest.fixture
|
||||
def valid_params_without_pkce(self):
|
||||
"""Valid authorization parameters WITHOUT PKCE."""
|
||||
return {
|
||||
"client_id": "https://app.example.com",
|
||||
"redirect_uri": "https://app.example.com/callback",
|
||||
"response_type": "code",
|
||||
"state": "test123",
|
||||
"me": "https://user.example.com",
|
||||
}
|
||||
|
||||
@pytest.fixture
|
||||
def valid_params_with_pkce(self):
|
||||
"""Valid authorization parameters WITH PKCE."""
|
||||
return {
|
||||
"client_id": "https://app.example.com",
|
||||
"redirect_uri": "https://app.example.com/callback",
|
||||
"response_type": "code",
|
||||
"state": "test123",
|
||||
"me": "https://user.example.com",
|
||||
"code_challenge": "E9Melhoa2OwvFrEMTJguCHaoeK1t8URWbuGJSstw-cM",
|
||||
"code_challenge_method": "S256",
|
||||
}
|
||||
|
||||
def test_authorization_without_pkce_succeeds(self, auth_app, valid_params_without_pkce, mock_happ_fetch):
|
||||
"""Test authorization request without PKCE succeeds (PKCE is optional)."""
|
||||
from gondulf.dependencies import (
|
||||
get_dns_service, get_email_service, get_html_fetcher,
|
||||
get_relme_parser, get_auth_session_service, get_database
|
||||
)
|
||||
from gondulf.database.connection import Database
|
||||
from sqlalchemy import text
|
||||
import tempfile
|
||||
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
db_path = Path(tmpdir) / "test.db"
|
||||
db = Database(f"sqlite:///{db_path}")
|
||||
db.initialize()
|
||||
|
||||
now = datetime.utcnow()
|
||||
with db.get_engine().begin() as conn:
|
||||
conn.execute(
|
||||
text("""
|
||||
INSERT OR REPLACE INTO domains
|
||||
(domain, email, verification_code, verified, verified_at, last_checked, two_factor)
|
||||
VALUES (:domain, '', '', 1, :now, :now, 0)
|
||||
"""),
|
||||
{"domain": "user.example.com", "now": now}
|
||||
)
|
||||
|
||||
mock_dns = Mock()
|
||||
mock_dns.verify_txt_record.return_value = True
|
||||
|
||||
mock_email = Mock()
|
||||
mock_email.send_verification_code = Mock()
|
||||
|
||||
mock_html = Mock()
|
||||
mock_html.fetch.return_value = '<html><a href="mailto:test@example.com" rel="me">Email</a></html>'
|
||||
|
||||
from gondulf.services.relme_parser import RelMeParser
|
||||
mock_relme = RelMeParser()
|
||||
|
||||
mock_session = Mock()
|
||||
mock_session.create_session.return_value = {
|
||||
"session_id": "test_session_123",
|
||||
"verification_code": "123456",
|
||||
"expires_at": datetime.utcnow() + timedelta(minutes=10)
|
||||
}
|
||||
|
||||
auth_app.dependency_overrides[get_database] = lambda: db
|
||||
auth_app.dependency_overrides[get_dns_service] = lambda: mock_dns
|
||||
auth_app.dependency_overrides[get_email_service] = lambda: mock_email
|
||||
auth_app.dependency_overrides[get_html_fetcher] = lambda: mock_html
|
||||
auth_app.dependency_overrides[get_relme_parser] = lambda: mock_relme
|
||||
auth_app.dependency_overrides[get_auth_session_service] = lambda: mock_session
|
||||
|
||||
try:
|
||||
with TestClient(auth_app) as client:
|
||||
response = client.get("/authorize", params=valid_params_without_pkce)
|
||||
|
||||
# Should succeed and show verification page
|
||||
assert response.status_code == 200
|
||||
assert "Verify Your Identity" in response.text
|
||||
# Session should be created with None for code_challenge
|
||||
mock_session.create_session.assert_called_once()
|
||||
call_kwargs = mock_session.create_session.call_args[1]
|
||||
assert call_kwargs["code_challenge"] is None
|
||||
assert call_kwargs["code_challenge_method"] is None
|
||||
finally:
|
||||
auth_app.dependency_overrides.clear()
|
||||
|
||||
def test_authorization_with_pkce_succeeds(self, auth_app, valid_params_with_pkce, mock_happ_fetch):
|
||||
"""Test authorization request with PKCE succeeds."""
|
||||
from gondulf.dependencies import (
|
||||
get_dns_service, get_email_service, get_html_fetcher,
|
||||
get_relme_parser, get_auth_session_service, get_database
|
||||
)
|
||||
from gondulf.database.connection import Database
|
||||
from sqlalchemy import text
|
||||
import tempfile
|
||||
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
db_path = Path(tmpdir) / "test.db"
|
||||
db = Database(f"sqlite:///{db_path}")
|
||||
db.initialize()
|
||||
|
||||
now = datetime.utcnow()
|
||||
with db.get_engine().begin() as conn:
|
||||
conn.execute(
|
||||
text("""
|
||||
INSERT OR REPLACE INTO domains
|
||||
(domain, email, verification_code, verified, verified_at, last_checked, two_factor)
|
||||
VALUES (:domain, '', '', 1, :now, :now, 0)
|
||||
"""),
|
||||
{"domain": "user.example.com", "now": now}
|
||||
)
|
||||
|
||||
mock_dns = Mock()
|
||||
mock_dns.verify_txt_record.return_value = True
|
||||
|
||||
mock_email = Mock()
|
||||
mock_email.send_verification_code = Mock()
|
||||
|
||||
mock_html = Mock()
|
||||
mock_html.fetch.return_value = '<html><a href="mailto:test@example.com" rel="me">Email</a></html>'
|
||||
|
||||
from gondulf.services.relme_parser import RelMeParser
|
||||
mock_relme = RelMeParser()
|
||||
|
||||
mock_session = Mock()
|
||||
mock_session.create_session.return_value = {
|
||||
"session_id": "test_session_123",
|
||||
"verification_code": "123456",
|
||||
"expires_at": datetime.utcnow() + timedelta(minutes=10)
|
||||
}
|
||||
|
||||
auth_app.dependency_overrides[get_database] = lambda: db
|
||||
auth_app.dependency_overrides[get_dns_service] = lambda: mock_dns
|
||||
auth_app.dependency_overrides[get_email_service] = lambda: mock_email
|
||||
auth_app.dependency_overrides[get_html_fetcher] = lambda: mock_html
|
||||
auth_app.dependency_overrides[get_relme_parser] = lambda: mock_relme
|
||||
auth_app.dependency_overrides[get_auth_session_service] = lambda: mock_session
|
||||
|
||||
try:
|
||||
with TestClient(auth_app) as client:
|
||||
response = client.get("/authorize", params=valid_params_with_pkce)
|
||||
|
||||
# Should succeed and show verification page
|
||||
assert response.status_code == 200
|
||||
assert "Verify Your Identity" in response.text
|
||||
# Session should be created with PKCE parameters
|
||||
mock_session.create_session.assert_called_once()
|
||||
call_kwargs = mock_session.create_session.call_args[1]
|
||||
assert call_kwargs["code_challenge"] == "E9Melhoa2OwvFrEMTJguCHaoeK1t8URWbuGJSstw-cM"
|
||||
assert call_kwargs["code_challenge_method"] == "S256"
|
||||
finally:
|
||||
auth_app.dependency_overrides.clear()
|
||||
|
||||
def test_authorization_with_pkce_defaults_to_s256(self, auth_app, mock_happ_fetch):
|
||||
"""Test authorization with code_challenge but no method defaults to S256."""
|
||||
from gondulf.dependencies import (
|
||||
get_dns_service, get_email_service, get_html_fetcher,
|
||||
get_relme_parser, get_auth_session_service, get_database
|
||||
)
|
||||
from gondulf.database.connection import Database
|
||||
from sqlalchemy import text
|
||||
import tempfile
|
||||
|
||||
params = {
|
||||
"client_id": "https://app.example.com",
|
||||
"redirect_uri": "https://app.example.com/callback",
|
||||
"response_type": "code",
|
||||
"state": "test123",
|
||||
"me": "https://user.example.com",
|
||||
"code_challenge": "E9Melhoa2OwvFrEMTJguCHaoeK1t8URWbuGJSstw-cM",
|
||||
# No code_challenge_method - should default to S256
|
||||
}
|
||||
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
db_path = Path(tmpdir) / "test.db"
|
||||
db = Database(f"sqlite:///{db_path}")
|
||||
db.initialize()
|
||||
|
||||
now = datetime.utcnow()
|
||||
with db.get_engine().begin() as conn:
|
||||
conn.execute(
|
||||
text("""
|
||||
INSERT OR REPLACE INTO domains
|
||||
(domain, email, verification_code, verified, verified_at, last_checked, two_factor)
|
||||
VALUES (:domain, '', '', 1, :now, :now, 0)
|
||||
"""),
|
||||
{"domain": "user.example.com", "now": now}
|
||||
)
|
||||
|
||||
mock_dns = Mock()
|
||||
mock_dns.verify_txt_record.return_value = True
|
||||
|
||||
mock_email = Mock()
|
||||
mock_email.send_verification_code = Mock()
|
||||
|
||||
mock_html = Mock()
|
||||
mock_html.fetch.return_value = '<html><a href="mailto:test@example.com" rel="me">Email</a></html>'
|
||||
|
||||
from gondulf.services.relme_parser import RelMeParser
|
||||
mock_relme = RelMeParser()
|
||||
|
||||
mock_session = Mock()
|
||||
mock_session.create_session.return_value = {
|
||||
"session_id": "test_session_123",
|
||||
"verification_code": "123456",
|
||||
"expires_at": datetime.utcnow() + timedelta(minutes=10)
|
||||
}
|
||||
|
||||
auth_app.dependency_overrides[get_database] = lambda: db
|
||||
auth_app.dependency_overrides[get_dns_service] = lambda: mock_dns
|
||||
auth_app.dependency_overrides[get_email_service] = lambda: mock_email
|
||||
auth_app.dependency_overrides[get_html_fetcher] = lambda: mock_html
|
||||
auth_app.dependency_overrides[get_relme_parser] = lambda: mock_relme
|
||||
auth_app.dependency_overrides[get_auth_session_service] = lambda: mock_session
|
||||
|
||||
try:
|
||||
with TestClient(auth_app) as client:
|
||||
response = client.get("/authorize", params=params)
|
||||
|
||||
# Should succeed and default to S256
|
||||
assert response.status_code == 200
|
||||
mock_session.create_session.assert_called_once()
|
||||
call_kwargs = mock_session.create_session.call_args[1]
|
||||
assert call_kwargs["code_challenge_method"] == "S256"
|
||||
finally:
|
||||
auth_app.dependency_overrides.clear()
|
||||
|
||||
def test_authorization_with_invalid_pkce_method_rejected(self, auth_client, mock_happ_fetch):
|
||||
"""Test authorization with code_challenge and invalid method is rejected."""
|
||||
params = {
|
||||
"client_id": "https://app.example.com",
|
||||
"redirect_uri": "https://app.example.com/callback",
|
||||
"response_type": "code",
|
||||
"state": "test123",
|
||||
"me": "https://user.example.com",
|
||||
"code_challenge": "E9Melhoa2OwvFrEMTJguCHaoeK1t8URWbuGJSstw-cM",
|
||||
"code_challenge_method": "plain", # Invalid - only S256 supported
|
||||
}
|
||||
|
||||
response = auth_client.get("/authorize", params=params, follow_redirects=False)
|
||||
|
||||
# Should redirect with error
|
||||
assert response.status_code == 302
|
||||
location = response.headers["location"]
|
||||
assert "error=invalid_request" in location
|
||||
assert "S256" in location
|
||||
|
||||
|
||||
class TestAuthorizationSecurityHeaders:
|
||||
"""Tests for security headers on authorization endpoints."""
|
||||
|
||||
|
||||
Reference in New Issue
Block a user