Files
StarPunk/starpunk/auth_external.py
Phil Skentelbery 2bd971f3d6 fix(auth): Implement IndieAuth endpoint discovery per W3C spec
BREAKING: Removes INDIELOGIN_URL config - endpoints are now properly
discovered from user's profile URL as required by W3C IndieAuth spec.

- auth.py: Uses discover_endpoints() to find authorization_endpoint
- config.py: Deprecation warning for obsolete INDIELOGIN_URL setting
- auth_external.py: Relaxed validation (allows auth-only flows)
- tests: Updated to mock endpoint discovery

This fixes a regression where admin login was hardcoded to use
indielogin.com instead of respecting the user's declared endpoints.

Version: 1.5.0-hotfix.1

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-12-17 13:52:36 -07:00

615 lines
18 KiB
Python

"""
External IndieAuth Token Verification with Endpoint Discovery
This module handles verification of bearer tokens issued by external
IndieAuth providers. Following the IndieAuth specification, endpoints
are discovered dynamically from the user's profile URL, not hardcoded.
For StarPunk V1 (single-user CMS), we always discover endpoints from
ADMIN_ME since only the site owner can post content.
Key Components:
EndpointCache: Simple in-memory cache for discovered endpoints and tokens
verify_external_token: Main entry point for token verification
discover_endpoints: Discovers IndieAuth endpoints from profile URL
Configuration (via Flask app.config):
ADMIN_ME: Site owner's profile URL (required)
DEBUG: Allow HTTP endpoints in debug mode
ADR: ADR-031 IndieAuth Endpoint Discovery Implementation
Date: 2025-11-24
Version: v1.0.0-rc.5
"""
import hashlib
import logging
import re
import time
from typing import Dict, Optional, Any
from urllib.parse import urljoin, urlparse
import httpx
from bs4 import BeautifulSoup
from flask import current_app
# Timeouts
DISCOVERY_TIMEOUT = 5.0 # Profile fetch (cached, so can be slower)
VERIFICATION_TIMEOUT = 3.0 # Token verification (every request)
# Cache TTLs
ENDPOINT_CACHE_TTL = 3600 # 1 hour for endpoints
TOKEN_CACHE_TTL = 300 # 5 minutes for token verifications
class EndpointCache:
"""
Simple in-memory cache for endpoint discovery and token verification
V1 single-user implementation: We only cache one user's endpoints
since StarPunk V1 is explicitly single-user (only ADMIN_ME can post).
When V2 adds multi-user support, this will need refactoring to
cache endpoints per profile URL.
"""
def __init__(self):
# Endpoint cache (single-user V1)
self.endpoints: Optional[Dict[str, str]] = None
self.endpoints_expire: float = 0
# Token verification cache (token_hash -> (info, expiry))
self.token_cache: Dict[str, tuple[Dict[str, Any], float]] = {}
def get_endpoints(self, ignore_expiry: bool = False) -> Optional[Dict[str, str]]:
"""
Get cached endpoints if still valid
Args:
ignore_expiry: Return cached endpoints even if expired (grace period)
Returns:
Cached endpoints dict or None if not cached or expired
"""
if self.endpoints is None:
return None
if ignore_expiry or time.time() < self.endpoints_expire:
return self.endpoints
return None
def set_endpoints(self, endpoints: Dict[str, str], ttl: int = ENDPOINT_CACHE_TTL):
"""Cache discovered endpoints"""
self.endpoints = endpoints
self.endpoints_expire = time.time() + ttl
def get_token_info(self, token_hash: str) -> Optional[Dict[str, Any]]:
"""Get cached token verification if still valid"""
if token_hash in self.token_cache:
info, expiry = self.token_cache[token_hash]
if time.time() < expiry:
return info
else:
# Expired, remove from cache
del self.token_cache[token_hash]
return None
def set_token_info(self, token_hash: str, info: Dict[str, Any], ttl: int = TOKEN_CACHE_TTL):
"""Cache token verification result"""
expiry = time.time() + ttl
self.token_cache[token_hash] = (info, expiry)
# Global cache instance (singleton for V1)
_cache = EndpointCache()
class DiscoveryError(Exception):
"""Raised when endpoint discovery fails"""
pass
class TokenVerificationError(Exception):
"""Raised when token verification fails"""
pass
def verify_external_token(token: str) -> Optional[Dict[str, Any]]:
"""
Verify bearer token with external IndieAuth provider
This is the main entry point for token verification. For StarPunk V1
(single-user), we always discover endpoints from ADMIN_ME since only
the site owner can post content.
Process:
1. Check token verification cache
2. Discover endpoints from ADMIN_ME (with caching)
3. Verify token with discovered endpoint
4. Validate token belongs to ADMIN_ME
5. Cache successful verification
Args:
token: Bearer token to verify
Returns:
Dict with token info (me, client_id, scope) if valid
None if token is invalid or verification fails
Token info dict contains:
me: User's profile URL
client_id: Client application URL
scope: Space-separated list of scopes
"""
admin_me = current_app.config.get("ADMIN_ME")
if not admin_me:
current_app.logger.error(
"ADMIN_ME not configured. Cannot verify token ownership."
)
return None
# Check token cache first
token_hash = _hash_token(token)
cached_info = _cache.get_token_info(token_hash)
if cached_info:
current_app.logger.debug("Token verification cache hit")
return cached_info
# Discover endpoints from ADMIN_ME (V1 single-user assumption)
try:
endpoints = discover_endpoints(admin_me)
except DiscoveryError as e:
current_app.logger.error(f"Endpoint discovery failed: {e}")
return None
token_endpoint = endpoints.get('token_endpoint')
if not token_endpoint:
current_app.logger.error("No token endpoint found in discovery")
return None
# Verify token with discovered endpoint
try:
token_info = _verify_with_endpoint(token_endpoint, token)
except TokenVerificationError as e:
current_app.logger.warning(f"Token verification failed: {e}")
return None
# Validate token belongs to admin (single-user security check)
token_me = token_info.get('me', '')
if normalize_url(token_me) != normalize_url(admin_me):
current_app.logger.warning(
f"Token 'me' mismatch: {token_me} != {admin_me}"
)
return None
# Cache successful verification
_cache.set_token_info(token_hash, token_info)
current_app.logger.debug(f"Token verified successfully for {token_me}")
return token_info
def discover_endpoints(profile_url: str) -> Dict[str, str]:
"""
Discover IndieAuth endpoints from a profile URL
Implements IndieAuth endpoint discovery per W3C spec:
https://www.w3.org/TR/indieauth/#discovery-by-clients
Discovery priority:
1. HTTP Link headers (highest priority)
2. HTML link elements
Args:
profile_url: User's profile URL (their IndieWeb identity)
Returns:
Dict with discovered endpoints:
{
'authorization_endpoint': 'https://...',
'token_endpoint': 'https://...'
}
Raises:
DiscoveryError: If discovery fails or no endpoints found
"""
# Check cache first
cached_endpoints = _cache.get_endpoints()
if cached_endpoints:
current_app.logger.debug("Endpoint discovery cache hit")
return cached_endpoints
# Validate profile URL
_validate_profile_url(profile_url)
try:
# Fetch profile with discovery
endpoints = _fetch_and_parse(profile_url)
# Cache successful discovery
_cache.set_endpoints(endpoints)
return endpoints
except Exception as e:
# Check cache even if expired (grace period for network failures)
cached = _cache.get_endpoints(ignore_expiry=True)
if cached:
current_app.logger.warning(
f"Using expired cache due to discovery failure: {e}"
)
return cached
# No cache available, must fail
raise DiscoveryError(f"Endpoint discovery failed: {e}")
def _fetch_and_parse(profile_url: str) -> Dict[str, str]:
"""
Fetch profile URL and parse endpoints from headers and HTML
Args:
profile_url: User's profile URL
Returns:
Dict with discovered endpoints
Raises:
DiscoveryError: If fetch fails or no endpoints found
"""
try:
response = httpx.get(
profile_url,
timeout=DISCOVERY_TIMEOUT,
follow_redirects=True,
headers={
'Accept': 'text/html,application/xhtml+xml',
'User-Agent': f'StarPunk/{current_app.config.get("VERSION", "1.0")}'
}
)
response.raise_for_status()
except httpx.TimeoutException:
raise DiscoveryError(f"Timeout fetching profile: {profile_url}")
except httpx.HTTPStatusError as e:
raise DiscoveryError(f"HTTP {e.response.status_code} fetching profile")
except httpx.RequestError as e:
raise DiscoveryError(f"Network error fetching profile: {e}")
endpoints = {}
# 1. Parse HTTP Link headers (highest priority)
link_header = response.headers.get('Link', '')
if link_header:
link_endpoints = _parse_link_header(link_header, profile_url)
endpoints.update(link_endpoints)
# 2. Parse HTML link elements
content_type = response.headers.get('Content-Type', '')
if 'text/html' in content_type or 'application/xhtml+xml' in content_type:
try:
html_endpoints = _parse_html_links(response.text, profile_url)
# Merge: Link headers take priority (so update HTML first)
html_endpoints.update(endpoints)
endpoints = html_endpoints
except Exception as e:
current_app.logger.warning(f"HTML parsing failed: {e}")
# Continue with Link header endpoints if HTML parsing fails
# Validate we found at least one endpoint
# - authorization_endpoint: Required for authentication-only flows (admin login)
# - token_endpoint: Required for Micropub token verification
# Having at least one allows the appropriate flow to work
if 'token_endpoint' not in endpoints and 'authorization_endpoint' not in endpoints:
raise DiscoveryError(
f"No IndieAuth endpoints found at {profile_url}. "
"Ensure your profile has authorization_endpoint or token_endpoint configured."
)
# Validate endpoint URLs
for rel, url in endpoints.items():
_validate_endpoint_url(url, rel)
current_app.logger.info(
f"Discovered endpoints from {profile_url}: "
f"token={endpoints.get('token_endpoint')}, "
f"auth={endpoints.get('authorization_endpoint')}"
)
return endpoints
def _parse_link_header(header: str, base_url: str) -> Dict[str, str]:
"""
Parse HTTP Link header for IndieAuth endpoints
Basic RFC 8288 support - handles simple Link headers.
Limitations: Only supports quoted rel values, single Link headers.
Example:
Link: <https://auth.example.com/token>; rel="token_endpoint"
Args:
header: Link header value
base_url: Base URL for resolving relative URLs
Returns:
Dict with discovered endpoints
"""
endpoints = {}
# Pattern: <url>; rel="relation"
# Note: Simplified - doesn't handle all RFC 8288 edge cases
pattern = r'<([^>]+)>;\s*rel="([^"]+)"'
matches = re.findall(pattern, header)
for url, rel in matches:
if rel == 'authorization_endpoint':
endpoints['authorization_endpoint'] = urljoin(base_url, url)
elif rel == 'token_endpoint':
endpoints['token_endpoint'] = urljoin(base_url, url)
return endpoints
def _parse_html_links(html: str, base_url: str) -> Dict[str, str]:
"""
Extract IndieAuth endpoints from HTML link elements
Looks for:
<link rel="authorization_endpoint" href="...">
<link rel="token_endpoint" href="...">
Args:
html: HTML content
base_url: Base URL for resolving relative URLs
Returns:
Dict with discovered endpoints
"""
endpoints = {}
try:
soup = BeautifulSoup(html, 'html.parser')
# Find all link elements (check both head and body - be liberal)
for link in soup.find_all('link', rel=True):
rel = link.get('rel')
href = link.get('href')
if not href:
continue
# rel can be a list or string
if isinstance(rel, list):
rel = ' '.join(rel)
# Check for IndieAuth endpoints
if 'authorization_endpoint' in rel:
endpoints['authorization_endpoint'] = urljoin(base_url, href)
elif 'token_endpoint' in rel:
endpoints['token_endpoint'] = urljoin(base_url, href)
except Exception as e:
current_app.logger.warning(f"HTML parsing error: {e}")
# Return what we found so far
return endpoints
def _verify_with_endpoint(endpoint: str, token: str) -> Dict[str, Any]:
"""
Verify token with the discovered token endpoint
Makes GET request to endpoint with Authorization header.
Implements retry logic for network errors only.
Args:
endpoint: Token endpoint URL
token: Bearer token to verify
Returns:
Token info dict from endpoint
Raises:
TokenVerificationError: If verification fails
"""
headers = {
'Authorization': f'Bearer {token}',
'Accept': 'application/json',
}
max_retries = 3
for attempt in range(max_retries):
try:
response = httpx.get(
endpoint,
headers=headers,
timeout=VERIFICATION_TIMEOUT,
follow_redirects=True,
)
# Handle HTTP status codes
if response.status_code == 200:
token_info = response.json()
# Validate required fields
if 'me' not in token_info:
raise TokenVerificationError("Token response missing 'me' field")
return token_info
# Client errors - don't retry
elif response.status_code in [400, 401, 403, 404]:
raise TokenVerificationError(
f"Token verification failed: HTTP {response.status_code}"
)
# Server errors - retry
elif response.status_code in [500, 502, 503, 504]:
if attempt < max_retries - 1:
wait_time = 2 ** attempt # Exponential backoff
current_app.logger.debug(
f"Server error {response.status_code}, retrying in {wait_time}s..."
)
time.sleep(wait_time)
continue
else:
raise TokenVerificationError(
f"Token endpoint error: HTTP {response.status_code}"
)
# Other status codes
else:
raise TokenVerificationError(
f"Unexpected response: HTTP {response.status_code}"
)
except httpx.TimeoutException:
if attempt < max_retries - 1:
wait_time = 2 ** attempt
current_app.logger.debug(f"Timeout, retrying in {wait_time}s...")
time.sleep(wait_time)
continue
else:
raise TokenVerificationError("Token verification timeout")
except httpx.NetworkError as e:
if attempt < max_retries - 1:
wait_time = 2 ** attempt
current_app.logger.debug(f"Network error, retrying in {wait_time}s...")
time.sleep(wait_time)
continue
else:
raise TokenVerificationError(f"Network error: {e}")
except Exception as e:
# Don't retry for unexpected errors
raise TokenVerificationError(f"Verification failed: {e}")
# Should never reach here, but just in case
raise TokenVerificationError("Maximum retries exceeded")
def _validate_profile_url(url: str) -> None:
"""
Validate profile URL format and security requirements
Args:
url: Profile URL to validate
Raises:
DiscoveryError: If URL is invalid or insecure
"""
parsed = urlparse(url)
# Must be absolute
if not parsed.scheme or not parsed.netloc:
raise DiscoveryError(f"Invalid profile URL format: {url}")
# HTTPS required in production
if not current_app.debug and parsed.scheme != 'https':
raise DiscoveryError(
f"HTTPS required for profile URLs in production. Got: {url}"
)
# Allow localhost only in debug mode
if not current_app.debug and parsed.hostname in ['localhost', '127.0.0.1', '::1']:
raise DiscoveryError(
"Localhost URLs not allowed in production"
)
def _validate_endpoint_url(url: str, rel: str) -> None:
"""
Validate discovered endpoint URL
Args:
url: Endpoint URL to validate
rel: Endpoint relation (for error messages)
Raises:
DiscoveryError: If URL is invalid or insecure
"""
parsed = urlparse(url)
# Must be absolute
if not parsed.scheme or not parsed.netloc:
raise DiscoveryError(f"Invalid {rel} URL format: {url}")
# HTTPS required in production
if not current_app.debug and parsed.scheme != 'https':
raise DiscoveryError(
f"HTTPS required for {rel} in production. Got: {url}"
)
# Allow localhost only in debug mode
if not current_app.debug and parsed.hostname in ['localhost', '127.0.0.1', '::1']:
raise DiscoveryError(
f"Localhost not allowed for {rel} in production"
)
def normalize_url(url: str) -> str:
"""
Normalize URL for comparison
Removes trailing slash and converts to lowercase.
Used only for comparison, not for storage.
Args:
url: URL to normalize
Returns:
Normalized URL
"""
return url.rstrip('/').lower()
def _hash_token(token: str) -> str:
"""
Hash token for secure caching
Uses SHA-256 to prevent tokens from appearing in logs
and to create fixed-length cache keys.
Args:
token: Bearer token
Returns:
SHA-256 hash of token (hex)
"""
return hashlib.sha256(token.encode()).hexdigest()
def check_scope(required_scope: str, token_scope: str) -> bool:
"""
Check if token has required scope
Scopes are space-separated in token_scope string.
Any scope in the list satisfies the requirement.
Args:
required_scope: Scope needed (e.g., "create")
token_scope: Space-separated scope string from token
Returns:
True if token has required scope, False otherwise
Examples:
>>> check_scope("create", "create update")
True
>>> check_scope("create", "read")
False
>>> check_scope("create", "")
False
"""
if not token_scope:
return False
scopes = token_scope.split()
return required_scope in scopes