"""
Tests for external IndieAuth token verification with endpoint discovery
Tests cover:
- Endpoint discovery from HTTP Link headers
- Endpoint discovery from HTML link elements
- Token verification with discovered endpoints
- Caching behavior for endpoints and tokens
- Error handling and edge cases
- HTTPS validation
- URL normalization
ADR: ADR-031 IndieAuth Endpoint Discovery Implementation
"""
import hashlib
import time
from unittest.mock import Mock, patch
import pytest
import httpx
from starpunk.auth_external import (
verify_external_token,
discover_endpoints,
check_scope,
normalize_url,
_parse_link_header,
_parse_html_links,
_cache,
DiscoveryError,
TokenVerificationError,
ENDPOINT_CACHE_TTL,
TOKEN_CACHE_TTL,
)
# Test Fixtures
# -------------
@pytest.fixture
def mock_profile_html():
"""HTML profile with IndieAuth link elements"""
return """
Test Profile
Hello World
"""
@pytest.fixture
def mock_profile_html_relative():
"""HTML profile with relative URLs"""
return """
"""
@pytest.fixture
def mock_link_headers():
"""HTTP Link headers with IndieAuth endpoints"""
return (
'; rel="authorization_endpoint", '
'; rel="token_endpoint"'
)
@pytest.fixture
def mock_token_response():
"""Valid token verification response"""
return {
'me': 'https://alice.example.com/',
'client_id': 'https://app.example.com/',
'scope': 'create update',
}
@pytest.fixture(autouse=True)
def clear_cache():
"""Clear cache before each test"""
_cache.endpoints = None
_cache.endpoints_expire = 0
_cache.token_cache.clear()
yield
# Clear after test too
_cache.endpoints = None
_cache.endpoints_expire = 0
_cache.token_cache.clear()
# Endpoint Discovery Tests
# -------------------------
def test_parse_link_header_both_endpoints(mock_link_headers):
"""Parse Link header with both authorization and token endpoints"""
endpoints = _parse_link_header(mock_link_headers, 'https://alice.example.com/')
assert endpoints['authorization_endpoint'] == 'https://auth.example.com/authorize'
assert endpoints['token_endpoint'] == 'https://auth.example.com/token'
def test_parse_link_header_single_endpoint():
"""Parse Link header with only token endpoint"""
header = '; rel="token_endpoint"'
endpoints = _parse_link_header(header, 'https://alice.example.com/')
assert endpoints['token_endpoint'] == 'https://auth.example.com/token'
assert 'authorization_endpoint' not in endpoints
def test_parse_link_header_relative_url():
"""Parse Link header with relative URL"""
header = '; rel="token_endpoint"'
endpoints = _parse_link_header(header, 'https://alice.example.com/')
assert endpoints['token_endpoint'] == 'https://alice.example.com/auth/token'
def test_parse_html_links_both_endpoints(mock_profile_html):
"""Parse HTML with both authorization and token endpoints"""
endpoints = _parse_html_links(mock_profile_html, 'https://alice.example.com/')
assert endpoints['authorization_endpoint'] == 'https://auth.example.com/authorize'
assert endpoints['token_endpoint'] == 'https://auth.example.com/token'
def test_parse_html_links_relative_urls(mock_profile_html_relative):
"""Parse HTML with relative endpoint URLs"""
endpoints = _parse_html_links(
mock_profile_html_relative,
'https://alice.example.com/'
)
assert endpoints['authorization_endpoint'] == 'https://alice.example.com/auth/authorize'
assert endpoints['token_endpoint'] == 'https://alice.example.com/auth/token'
def test_parse_html_links_empty():
"""Parse HTML with no IndieAuth links"""
html = ''
endpoints = _parse_html_links(html, 'https://alice.example.com/')
assert endpoints == {}
def test_parse_html_links_malformed():
"""Parse malformed HTML gracefully"""
html = '
'''
endpoints = _parse_html_links(html, 'https://alice.example.com/')
assert endpoints['authorization_endpoint'] == 'https://auth.example.com/authorize'
@patch('starpunk.auth_external.httpx.get')
def test_discover_endpoints_from_html(mock_get, app_with_admin_me, mock_profile_html):
"""Discover endpoints from HTML link elements"""
mock_response = Mock()
mock_response.status_code = 200
mock_response.headers = {'Content-Type': 'text/html'}
mock_response.text = mock_profile_html
mock_get.return_value = mock_response
with app_with_admin_me.app_context():
endpoints = discover_endpoints('https://alice.example.com/')
assert endpoints['token_endpoint'] == 'https://auth.example.com/token'
assert endpoints['authorization_endpoint'] == 'https://auth.example.com/authorize'
@patch('starpunk.auth_external.httpx.get')
def test_discover_endpoints_from_link_header(mock_get, app_with_admin_me, mock_link_headers):
"""Discover endpoints from HTTP Link headers"""
mock_response = Mock()
mock_response.status_code = 200
mock_response.headers = {
'Content-Type': 'text/html',
'Link': mock_link_headers
}
mock_response.text = ''
mock_get.return_value = mock_response
with app_with_admin_me.app_context():
endpoints = discover_endpoints('https://alice.example.com/')
assert endpoints['token_endpoint'] == 'https://auth.example.com/token'
@patch('starpunk.auth_external.httpx.get')
def test_discover_endpoints_link_header_priority(mock_get, app_with_admin_me, mock_profile_html, mock_link_headers):
"""Link headers take priority over HTML link elements"""
mock_response = Mock()
mock_response.status_code = 200
mock_response.headers = {
'Content-Type': 'text/html',
'Link': '; rel="token_endpoint"'
}
# HTML has different endpoint
mock_response.text = mock_profile_html
mock_get.return_value = mock_response
with app_with_admin_me.app_context():
endpoints = discover_endpoints('https://alice.example.com/')
# Link header should win
assert endpoints['token_endpoint'] == 'https://different.example.com/token'
@patch('starpunk.auth_external.httpx.get')
def test_discover_endpoints_no_token_endpoint(mock_get, app_with_admin_me):
"""Raise error if no token endpoint found"""
mock_response = Mock()
mock_response.status_code = 200
mock_response.headers = {'Content-Type': 'text/html'}
mock_response.text = ''
mock_get.return_value = mock_response
with app_with_admin_me.app_context():
with pytest.raises(DiscoveryError) as exc_info:
discover_endpoints('https://alice.example.com/')
assert 'No token endpoint found' in str(exc_info.value)
@patch('starpunk.auth_external.httpx.get')
def test_discover_endpoints_http_error(mock_get, app_with_admin_me):
"""Handle HTTP errors during discovery"""
mock_get.side_effect = httpx.HTTPStatusError(
"404 Not Found",
request=Mock(),
response=Mock(status_code=404)
)
with app_with_admin_me.app_context():
with pytest.raises(DiscoveryError) as exc_info:
discover_endpoints('https://alice.example.com/')
assert 'HTTP 404' in str(exc_info.value)
@patch('starpunk.auth_external.httpx.get')
def test_discover_endpoints_timeout(mock_get, app_with_admin_me):
"""Handle timeout during discovery"""
mock_get.side_effect = httpx.TimeoutException("Timeout")
with app_with_admin_me.app_context():
with pytest.raises(DiscoveryError) as exc_info:
discover_endpoints('https://alice.example.com/')
assert 'Timeout' in str(exc_info.value)
@patch('starpunk.auth_external.httpx.get')
def test_discover_endpoints_network_error(mock_get, app_with_admin_me):
"""Handle network errors during discovery"""
mock_get.side_effect = httpx.NetworkError("Connection failed")
with app_with_admin_me.app_context():
with pytest.raises(DiscoveryError) as exc_info:
discover_endpoints('https://alice.example.com/')
assert 'Network error' in str(exc_info.value)
# HTTPS Validation Tests
# -----------------------
def test_discover_endpoints_http_not_allowed_production(app_with_admin_me):
"""HTTP profile URLs not allowed in production"""
with app_with_admin_me.app_context():
app_with_admin_me.config['DEBUG'] = False
with pytest.raises(DiscoveryError) as exc_info:
discover_endpoints('http://alice.example.com/')
assert 'HTTPS required' in str(exc_info.value)
def test_discover_endpoints_http_allowed_debug(app_with_admin_me):
"""HTTP profile URLs allowed in debug mode"""
with app_with_admin_me.app_context():
app_with_admin_me.config['DEBUG'] = True
# Should validate without raising (mock would be needed for full test)
# Just test validation doesn't raise
from starpunk.auth_external import _validate_profile_url
_validate_profile_url('http://localhost:5000/')
def test_discover_endpoints_localhost_not_allowed_production(app_with_admin_me):
"""Localhost URLs not allowed in production"""
with app_with_admin_me.app_context():
app_with_admin_me.config['DEBUG'] = False
with pytest.raises(DiscoveryError) as exc_info:
discover_endpoints('https://localhost/')
assert 'Localhost' in str(exc_info.value)
# Caching Tests
# -------------
@patch('starpunk.auth_external.httpx.get')
def test_discover_endpoints_caching(mock_get, app_with_admin_me, mock_profile_html):
"""Discovered endpoints are cached"""
mock_response = Mock()
mock_response.status_code = 200
mock_response.headers = {'Content-Type': 'text/html'}
mock_response.text = mock_profile_html
mock_get.return_value = mock_response
with app_with_admin_me.app_context():
# First call - should fetch
endpoints1 = discover_endpoints('https://alice.example.com/')
# Second call - should use cache
endpoints2 = discover_endpoints('https://alice.example.com/')
# Should only call httpx.get once
assert mock_get.call_count == 1
assert endpoints1 == endpoints2
@patch('starpunk.auth_external.httpx.get')
def test_discover_endpoints_cache_expiry(mock_get, app_with_admin_me, mock_profile_html):
"""Endpoint cache expires after TTL"""
mock_response = Mock()
mock_response.status_code = 200
mock_response.headers = {'Content-Type': 'text/html'}
mock_response.text = mock_profile_html
mock_get.return_value = mock_response
with app_with_admin_me.app_context():
# First call
discover_endpoints('https://alice.example.com/')
# Expire cache manually
_cache.endpoints_expire = time.time() - 1
# Second call should fetch again
discover_endpoints('https://alice.example.com/')
assert mock_get.call_count == 2
@patch('starpunk.auth_external.httpx.get')
def test_discover_endpoints_grace_period(mock_get, app_with_admin_me, mock_profile_html):
"""Use expired cache on network failure (grace period)"""
# First call succeeds
mock_response = Mock()
mock_response.status_code = 200
mock_response.headers = {'Content-Type': 'text/html'}
mock_response.text = mock_profile_html
mock_get.return_value = mock_response
with app_with_admin_me.app_context():
endpoints1 = discover_endpoints('https://alice.example.com/')
# Expire cache
_cache.endpoints_expire = time.time() - 1
# Second call fails, but should use expired cache
mock_get.side_effect = httpx.NetworkError("Connection failed")
endpoints2 = discover_endpoints('https://alice.example.com/')
# Should return cached endpoints despite network failure
assert endpoints1 == endpoints2
# Token Verification Tests
# -------------------------
@patch('starpunk.auth_external.discover_endpoints')
@patch('starpunk.auth_external.httpx.get')
def test_verify_external_token_success(mock_get, mock_discover, app_with_admin_me, mock_token_response):
"""Successfully verify token with discovered endpoint"""
# Mock discovery
mock_discover.return_value = {
'token_endpoint': 'https://auth.example.com/token'
}
# Mock token verification
mock_response = Mock()
mock_response.status_code = 200
mock_response.json.return_value = mock_token_response
mock_get.return_value = mock_response
with app_with_admin_me.app_context():
token_info = verify_external_token('test-token-123')
assert token_info is not None
assert token_info['me'] == 'https://alice.example.com/'
assert token_info['scope'] == 'create update'
@patch('starpunk.auth_external.discover_endpoints')
@patch('starpunk.auth_external.httpx.get')
def test_verify_external_token_wrong_me(mock_get, mock_discover, app_with_admin_me):
"""Reject token for different user"""
mock_discover.return_value = {
'token_endpoint': 'https://auth.example.com/token'
}
# Token for wrong user
mock_response = Mock()
mock_response.status_code = 200
mock_response.json.return_value = {
'me': 'https://bob.example.com/', # Not ADMIN_ME
'scope': 'create',
}
mock_get.return_value = mock_response
with app_with_admin_me.app_context():
token_info = verify_external_token('test-token-123')
# Should reject
assert token_info is None
@patch('starpunk.auth_external.discover_endpoints')
@patch('starpunk.auth_external.httpx.get')
def test_verify_external_token_401(mock_get, mock_discover, app_with_admin_me):
"""Handle 401 Unauthorized from token endpoint"""
mock_discover.return_value = {
'token_endpoint': 'https://auth.example.com/token'
}
mock_response = Mock()
mock_response.status_code = 401
mock_get.return_value = mock_response
with app_with_admin_me.app_context():
token_info = verify_external_token('invalid-token')
assert token_info is None
@patch('starpunk.auth_external.discover_endpoints')
@patch('starpunk.auth_external.httpx.get')
def test_verify_external_token_missing_me(mock_get, mock_discover, app_with_admin_me):
"""Reject token response missing 'me' field"""
mock_discover.return_value = {
'token_endpoint': 'https://auth.example.com/token'
}
mock_response = Mock()
mock_response.status_code = 200
mock_response.json.return_value = {
'scope': 'create',
# Missing 'me' field
}
mock_get.return_value = mock_response
with app_with_admin_me.app_context():
token_info = verify_external_token('test-token')
assert token_info is None
@patch('starpunk.auth_external.discover_endpoints')
@patch('starpunk.auth_external.httpx.get')
def test_verify_external_token_retry_on_500(mock_get, mock_discover, app_with_admin_me, mock_token_response):
"""Retry token verification on 500 server error"""
mock_discover.return_value = {
'token_endpoint': 'https://auth.example.com/token'
}
# First call: 500 error
error_response = Mock()
error_response.status_code = 500
# Second call: success
success_response = Mock()
success_response.status_code = 200
success_response.json.return_value = mock_token_response
mock_get.side_effect = [error_response, success_response]
with app_with_admin_me.app_context():
with patch('time.sleep'): # Skip sleep delay
token_info = verify_external_token('test-token')
assert token_info is not None
assert mock_get.call_count == 2
@patch('starpunk.auth_external.discover_endpoints')
@patch('starpunk.auth_external.httpx.get')
def test_verify_external_token_no_retry_on_403(mock_get, mock_discover, app_with_admin_me):
"""Don't retry on 403 Forbidden (client error)"""
mock_discover.return_value = {
'token_endpoint': 'https://auth.example.com/token'
}
mock_response = Mock()
mock_response.status_code = 403
mock_get.return_value = mock_response
with app_with_admin_me.app_context():
token_info = verify_external_token('test-token')
assert token_info is None
# Should only call once (no retries)
assert mock_get.call_count == 1
@patch('starpunk.auth_external.discover_endpoints')
@patch('starpunk.auth_external.httpx.get')
def test_verify_external_token_caching(mock_get, mock_discover, app_with_admin_me, mock_token_response):
"""Token verifications are cached"""
mock_discover.return_value = {
'token_endpoint': 'https://auth.example.com/token'
}
mock_response = Mock()
mock_response.status_code = 200
mock_response.json.return_value = mock_token_response
mock_get.return_value = mock_response
with app_with_admin_me.app_context():
# First call
token_info1 = verify_external_token('test-token')
# Second call should use cache
token_info2 = verify_external_token('test-token')
assert token_info1 == token_info2
# Should only verify once
assert mock_get.call_count == 1
@patch('starpunk.auth_external.discover_endpoints')
def test_verify_external_token_no_admin_me(mock_discover, app):
"""Fail if ADMIN_ME not configured"""
with app.app_context():
# app fixture has no ADMIN_ME
token_info = verify_external_token('test-token')
assert token_info is None
# Should not even attempt discovery
mock_discover.assert_not_called()
# URL Normalization Tests
# ------------------------
def test_normalize_url_removes_trailing_slash():
"""Normalize URL removes trailing slash"""
assert normalize_url('https://example.com/') == 'https://example.com'
assert normalize_url('https://example.com') == 'https://example.com'
def test_normalize_url_lowercase():
"""Normalize URL converts to lowercase"""
assert normalize_url('https://Example.COM/') == 'https://example.com'
assert normalize_url('HTTPS://EXAMPLE.COM') == 'https://example.com'
def test_normalize_url_path_preserved():
"""Normalize URL preserves path"""
assert normalize_url('https://example.com/path/') == 'https://example.com/path'
assert normalize_url('https://Example.com/Path') == 'https://example.com/path'
# Scope Checking Tests
# ---------------------
def test_check_scope_present():
"""Check scope returns True when scope is present"""
assert check_scope('create', 'create update delete') is True
assert check_scope('create', 'create') is True
def test_check_scope_missing():
"""Check scope returns False when scope is missing"""
assert check_scope('create', 'update delete') is False
assert check_scope('create', '') is False
assert check_scope('create', 'created') is False # Partial match
def test_check_scope_empty():
"""Check scope handles empty scope string"""
assert check_scope('create', '') is False
assert check_scope('create', None) is False
# Fixtures
# --------
@pytest.fixture
def app():
"""Create test Flask app without ADMIN_ME"""
from flask import Flask
app = Flask(__name__)
app.config['TESTING'] = True
app.config['DEBUG'] = False
return app
@pytest.fixture
def app_with_admin_me():
"""Create test Flask app with ADMIN_ME configured"""
from flask import Flask
app = Flask(__name__)
app.config['TESTING'] = True
app.config['DEBUG'] = False
app.config['ADMIN_ME'] = 'https://alice.example.com/'
app.config['VERSION'] = '1.0.0-test'
return app