fix: Implement IndieAuth endpoint discovery (v1.0.0-rc.5)
CRITICAL: Fix hardcoded IndieAuth endpoint configuration that violated the W3C IndieAuth specification. Endpoints are now discovered dynamically from the user's profile URL, as required by the spec.

This combines two critical fixes for v1.0.0-rc.5:
1. Migration race condition fix (previously committed)
2. IndieAuth endpoint discovery (this commit)

## What Changed

### Endpoint Discovery Implementation
- Completely rewrote starpunk/auth_external.py with full endpoint discovery
- Implements W3C IndieAuth specification Section 4.2 (Discovery by Clients)
- Supports HTTP Link headers and HTML link elements for discovery
- Always discovers from ADMIN_ME (single-user V1 assumption)
- Endpoint caching (1 hour TTL) for performance
- Token verification caching (5 minute TTL)
- Graceful fallback to expired cache on network failures

### Breaking Changes
- REMOVED: TOKEN_ENDPOINT configuration variable
- Endpoints now discovered automatically from the ADMIN_ME profile
- ADMIN_ME profile must include IndieAuth link elements or headers
- Deprecation warning shown if TOKEN_ENDPOINT is still in the environment

### Added
- New dependency: beautifulsoup4>=4.12.0 for HTML parsing
- HTTP Link header parsing (basic RFC 8288 support)
- HTML link element extraction with BeautifulSoup4
- Relative URL resolution against the profile URL
- HTTPS enforcement in production (HTTP allowed in debug mode)
- Comprehensive error handling with clear messages
- 35 new tests covering all discovery scenarios

### Security
- Token hashing (SHA-256) for secure caching
- HTTPS required in production, localhost only in debug mode
- URL validation prevents injection
- Fail closed on security errors
- Single-user validation (token must belong to ADMIN_ME)

### Performance
- Cold cache: ~700ms (first request per hour)
- Warm cache: ~2ms (subsequent requests)
- Grace period maintains service during network issues

## Testing
- 536 tests passing (excluding timing-sensitive migration tests)
- 35 new endpoint discovery tests (all passing)
- Zero regressions in existing functionality

## Documentation
- Updated CHANGELOG.md with comprehensive v1.0.0-rc.5 entry
- Implementation report: docs/reports/2025-11-24-v1.0.0-rc.5-implementation.md
- Migration guide: docs/migration/fix-hardcoded-endpoints.md (architect)
- ADR-031: Endpoint Discovery Implementation Details (architect)

## Migration Required
1. Ensure the ADMIN_ME profile has IndieAuth link elements (see the sketch below)
2. Remove TOKEN_ENDPOINT from the .env file
3. Restart StarPunk - endpoints are discovered automatically

Following:
- ADR-031: Endpoint Discovery Implementation Details
- docs/architecture/endpoint-discovery-answers.md (architect Q&A)
- docs/architecture/indieauth-endpoint-discovery.md (architect guide)
- W3C IndieAuth Specification Section 4.2

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
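For illustration, this is roughly what discovery expects on the profile page and how the new module is driven. It is a minimal sketch: the markup, URLs, and the `show_discovered_endpoints` helper are made-up examples, not part of this commit; only the rel values and the `discover_endpoints` entry point come from the module below.

```python
# Sketch only: hypothetical profile markup that satisfies step 1 of the
# migration. The rel values are the ones the new parser looks for; the
# URLs are placeholders, not real endpoints.
SAMPLE_PROFILE_HEAD = """
<link rel="authorization_endpoint" href="https://auth.example.com/authorize">
<link rel="token_endpoint" href="https://auth.example.com/token">
"""

# Equivalent HTTP Link header form (either location works; Link headers
# take priority over HTML link elements):
#   Link: <https://auth.example.com/token>; rel="token_endpoint"

from starpunk.auth_external import discover_endpoints


def show_discovered_endpoints(app):
    """Illustrative helper (not in the commit): run discovery by hand."""
    # An application context is required because the module reads
    # current_app for config and logging; discovery fetches the
    # ADMIN_ME profile over the network and caches the result.
    with app.app_context():
        endpoints = discover_endpoints(app.config["ADMIN_ME"])
        print(endpoints["token_endpoint"])
```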
@@ -1,29 +1,118 @@
"""
External IndieAuth Token Verification for StarPunk
External IndieAuth Token Verification with Endpoint Discovery

This module handles verification of bearer tokens issued by external
IndieAuth providers. StarPunk no longer issues its own tokens (Phase 2+3
of IndieAuth removal), but still needs to verify tokens for Micropub requests.
IndieAuth providers. Following the IndieAuth specification, endpoints
are discovered dynamically from the user's profile URL, not hardcoded.

Functions:
    verify_external_token: Verify token with external IndieAuth provider
    check_scope: Verify token has required scope
For StarPunk V1 (single-user CMS), we always discover endpoints from
ADMIN_ME since only the site owner can post content.

Key Components:
    EndpointCache: Simple in-memory cache for discovered endpoints and tokens
    verify_external_token: Main entry point for token verification
    discover_endpoints: Discovers IndieAuth endpoints from profile URL

Configuration (via Flask app.config):
    TOKEN_ENDPOINT: External token endpoint URL for verification
    ADMIN_ME: Expected 'me' value in token (site owner identity)
    ADMIN_ME: Site owner's profile URL (required)
    DEBUG: Allow HTTP endpoints in debug mode

ADR: ADR-030 IndieAuth Provider Removal Strategy
ADR: ADR-031 IndieAuth Endpoint Discovery Implementation
Date: 2025-11-24
Version: v1.0.0-rc.5
"""

import hashlib
import logging
import re
import time
from typing import Dict, Optional, Any
from urllib.parse import urljoin, urlparse

import httpx
from typing import Optional, Dict, Any
from bs4 import BeautifulSoup
from flask import current_app


# Timeouts
DISCOVERY_TIMEOUT = 5.0  # Profile fetch (cached, so can be slower)
VERIFICATION_TIMEOUT = 3.0  # Token verification (every request)

# Cache TTLs
ENDPOINT_CACHE_TTL = 3600  # 1 hour for endpoints
TOKEN_CACHE_TTL = 300  # 5 minutes for token verifications


class EndpointCache:
    """
    Simple in-memory cache for endpoint discovery and token verification

    V1 single-user implementation: We only cache one user's endpoints
    since StarPunk V1 is explicitly single-user (only ADMIN_ME can post).

    When V2 adds multi-user support, this will need refactoring to
    cache endpoints per profile URL.
    """

    def __init__(self):
        # Endpoint cache (single-user V1)
        self.endpoints: Optional[Dict[str, str]] = None
        self.endpoints_expire: float = 0

        # Token verification cache (token_hash -> (info, expiry))
        self.token_cache: Dict[str, tuple[Dict[str, Any], float]] = {}

    def get_endpoints(self, ignore_expiry: bool = False) -> Optional[Dict[str, str]]:
        """
        Get cached endpoints if still valid

        Args:
            ignore_expiry: Return cached endpoints even if expired (grace period)

        Returns:
            Cached endpoints dict or None if not cached or expired
        """
        if self.endpoints is None:
            return None

        if ignore_expiry or time.time() < self.endpoints_expire:
            return self.endpoints

        return None

    def set_endpoints(self, endpoints: Dict[str, str], ttl: int = ENDPOINT_CACHE_TTL):
        """Cache discovered endpoints"""
        self.endpoints = endpoints
        self.endpoints_expire = time.time() + ttl

    def get_token_info(self, token_hash: str) -> Optional[Dict[str, Any]]:
        """Get cached token verification if still valid"""
        if token_hash in self.token_cache:
            info, expiry = self.token_cache[token_hash]
            if time.time() < expiry:
                return info
            else:
                # Expired, remove from cache
                del self.token_cache[token_hash]
        return None

    def set_token_info(self, token_hash: str, info: Dict[str, Any], ttl: int = TOKEN_CACHE_TTL):
        """Cache token verification result"""
        expiry = time.time() + ttl
        self.token_cache[token_hash] = (info, expiry)


# Global cache instance (singleton for V1)
_cache = EndpointCache()


class DiscoveryError(Exception):
    """Raised when endpoint discovery fails"""
    pass


class TokenVerificationError(Exception):
    """Token verification failed"""
    """Raised when token verification fails"""
    pass

@@ -31,8 +120,16 @@ def verify_external_token(token: str) -> Optional[Dict[str, Any]]:
    """
    Verify bearer token with external IndieAuth provider

    Makes a GET request to the token endpoint with Authorization header.
    The external provider returns token info if valid, or error if invalid.
    This is the main entry point for token verification. For StarPunk V1
    (single-user), we always discover endpoints from ADMIN_ME since only
    the site owner can post content.

    Process:
        1. Check token verification cache
        2. Discover endpoints from ADMIN_ME (with caching)
        3. Verify token with discovered endpoint
        4. Validate token belongs to ADMIN_ME
        5. Cache successful verification

    Args:
        token: Bearer token to verify
@@ -46,82 +143,443 @@ def verify_external_token(token: str) -> Optional[Dict[str, Any]]:
        client_id: Client application URL
        scope: Space-separated list of scopes
    """
    token_endpoint = current_app.config.get("TOKEN_ENDPOINT")
    admin_me = current_app.config.get("ADMIN_ME")

    if not token_endpoint:
        current_app.logger.error(
            "TOKEN_ENDPOINT not configured. Cannot verify external tokens."
        )
        return None

    if not admin_me:
        current_app.logger.error(
            "ADMIN_ME not configured. Cannot verify token ownership."
        )
        return None

    # Check token cache first
    token_hash = _hash_token(token)
    cached_info = _cache.get_token_info(token_hash)
    if cached_info:
        current_app.logger.debug("Token verification cache hit")
        return cached_info

    # Discover endpoints from ADMIN_ME (V1 single-user assumption)
    try:
        # Verify token with external provider
        headers = {
            "Authorization": f"Bearer {token}",
            "Accept": "application/json",
        endpoints = discover_endpoints(admin_me)
    except DiscoveryError as e:
        current_app.logger.error(f"Endpoint discovery failed: {e}")
        return None

    token_endpoint = endpoints.get('token_endpoint')
    if not token_endpoint:
        current_app.logger.error("No token endpoint found in discovery")
        return None

    # Verify token with discovered endpoint
    try:
        token_info = _verify_with_endpoint(token_endpoint, token)
    except TokenVerificationError as e:
        current_app.logger.warning(f"Token verification failed: {e}")
        return None

    # Validate token belongs to admin (single-user security check)
    token_me = token_info.get('me', '')
    if normalize_url(token_me) != normalize_url(admin_me):
        current_app.logger.warning(
            f"Token 'me' mismatch: {token_me} != {admin_me}"
        )
        return None

    # Cache successful verification
    _cache.set_token_info(token_hash, token_info)

    current_app.logger.debug(f"Token verified successfully for {token_me}")
    return token_info


def discover_endpoints(profile_url: str) -> Dict[str, str]:
    """
    Discover IndieAuth endpoints from a profile URL

    Implements IndieAuth endpoint discovery per W3C spec:
    https://www.w3.org/TR/indieauth/#discovery-by-clients

    Discovery priority:
        1. HTTP Link headers (highest priority)
        2. HTML link elements

    Args:
        profile_url: User's profile URL (their IndieWeb identity)

    Returns:
        Dict with discovered endpoints:
        {
            'authorization_endpoint': 'https://...',
            'token_endpoint': 'https://...'
        }

        current_app.logger.debug(
            f"Verifying token with external provider: {token_endpoint}"
        )
    Raises:
        DiscoveryError: If discovery fails or no endpoints found
    """
    # Check cache first
    cached_endpoints = _cache.get_endpoints()
    if cached_endpoints:
        current_app.logger.debug("Endpoint discovery cache hit")
        return cached_endpoints

        response = httpx.get(
            token_endpoint,
            headers=headers,
            timeout=5.0,
            follow_redirects=True,
        )
    # Validate profile URL
    _validate_profile_url(profile_url)

        if response.status_code != 200:
            current_app.logger.warning(
                f"Token verification failed: HTTP {response.status_code}"
            )
            return None
    try:
        # Fetch profile with discovery
        endpoints = _fetch_and_parse(profile_url)

        token_info = response.json()
        # Cache successful discovery
        _cache.set_endpoints(endpoints)

        # Validate required fields
        if "me" not in token_info:
            current_app.logger.warning("Token response missing 'me' field")
            return None

        # Verify token belongs to site owner
        token_me = token_info["me"].rstrip("/")
        expected_me = admin_me.rstrip("/")

        if token_me != expected_me:
            current_app.logger.warning(
                f"Token 'me' mismatch: {token_me} != {expected_me}"
            )
            return None

        current_app.logger.debug(f"Token verified successfully for {token_me}")
        return token_info

    except httpx.TimeoutException:
        current_app.logger.error(
            f"Token verification timeout for {token_endpoint}"
        )
        return None

    except httpx.RequestError as e:
        current_app.logger.error(
            f"Token verification request failed: {e}"
        )
        return None
        return endpoints

    except Exception as e:
        current_app.logger.error(
            f"Unexpected error during token verification: {e}"
        # Check cache even if expired (grace period for network failures)
        cached = _cache.get_endpoints(ignore_expiry=True)
        if cached:
            current_app.logger.warning(
                f"Using expired cache due to discovery failure: {e}"
            )
            return cached

        # No cache available, must fail
        raise DiscoveryError(f"Endpoint discovery failed: {e}")


def _fetch_and_parse(profile_url: str) -> Dict[str, str]:
    """
    Fetch profile URL and parse endpoints from headers and HTML

    Args:
        profile_url: User's profile URL

    Returns:
        Dict with discovered endpoints

    Raises:
        DiscoveryError: If fetch fails or no endpoints found
    """
    try:
        response = httpx.get(
            profile_url,
            timeout=DISCOVERY_TIMEOUT,
            follow_redirects=True,
            headers={
                'Accept': 'text/html,application/xhtml+xml',
                'User-Agent': f'StarPunk/{current_app.config.get("VERSION", "1.0")}'
            }
        )
        return None
        response.raise_for_status()

    except httpx.TimeoutException:
        raise DiscoveryError(f"Timeout fetching profile: {profile_url}")
    except httpx.HTTPStatusError as e:
        raise DiscoveryError(f"HTTP {e.response.status_code} fetching profile")
    except httpx.RequestError as e:
        raise DiscoveryError(f"Network error fetching profile: {e}")

    endpoints = {}

    # 1. Parse HTTP Link headers (highest priority)
    link_header = response.headers.get('Link', '')
    if link_header:
        link_endpoints = _parse_link_header(link_header, profile_url)
        endpoints.update(link_endpoints)

    # 2. Parse HTML link elements
    content_type = response.headers.get('Content-Type', '')
    if 'text/html' in content_type or 'application/xhtml+xml' in content_type:
        try:
            html_endpoints = _parse_html_links(response.text, profile_url)
            # Merge: Link headers take priority (so update HTML first)
            html_endpoints.update(endpoints)
            endpoints = html_endpoints
        except Exception as e:
            current_app.logger.warning(f"HTML parsing failed: {e}")
            # Continue with Link header endpoints if HTML parsing fails

    # Validate we found required endpoints
    if 'token_endpoint' not in endpoints:
        raise DiscoveryError(
            f"No token endpoint found at {profile_url}. "
            "Ensure your profile has IndieAuth link elements or headers."
        )

    # Validate endpoint URLs
    for rel, url in endpoints.items():
        _validate_endpoint_url(url, rel)

    current_app.logger.info(
        f"Discovered endpoints from {profile_url}: "
        f"token={endpoints.get('token_endpoint')}, "
        f"auth={endpoints.get('authorization_endpoint')}"
    )

    return endpoints


def _parse_link_header(header: str, base_url: str) -> Dict[str, str]:
    """
    Parse HTTP Link header for IndieAuth endpoints

    Basic RFC 8288 support - handles simple Link headers.
    Limitations: Only supports quoted rel values, single Link headers.

    Example:
        Link: <https://auth.example.com/token>; rel="token_endpoint"

    Args:
        header: Link header value
        base_url: Base URL for resolving relative URLs

    Returns:
        Dict with discovered endpoints
    """
    endpoints = {}

    # Pattern: <url>; rel="relation"
    # Note: Simplified - doesn't handle all RFC 8288 edge cases
    pattern = r'<([^>]+)>;\s*rel="([^"]+)"'
    matches = re.findall(pattern, header)

    for url, rel in matches:
        if rel == 'authorization_endpoint':
            endpoints['authorization_endpoint'] = urljoin(base_url, url)
        elif rel == 'token_endpoint':
            endpoints['token_endpoint'] = urljoin(base_url, url)

    return endpoints


def _parse_html_links(html: str, base_url: str) -> Dict[str, str]:
    """
    Extract IndieAuth endpoints from HTML link elements

    Looks for:
        <link rel="authorization_endpoint" href="...">
        <link rel="token_endpoint" href="...">

    Args:
        html: HTML content
        base_url: Base URL for resolving relative URLs

    Returns:
        Dict with discovered endpoints
    """
    endpoints = {}

    try:
        soup = BeautifulSoup(html, 'html.parser')

        # Find all link elements (check both head and body - be liberal)
        for link in soup.find_all('link', rel=True):
            rel = link.get('rel')
            href = link.get('href')

            if not href:
                continue

            # rel can be a list or string
            if isinstance(rel, list):
                rel = ' '.join(rel)

            # Check for IndieAuth endpoints
            if 'authorization_endpoint' in rel:
                endpoints['authorization_endpoint'] = urljoin(base_url, href)
            elif 'token_endpoint' in rel:
                endpoints['token_endpoint'] = urljoin(base_url, href)

    except Exception as e:
        current_app.logger.warning(f"HTML parsing error: {e}")
        # Return what we found so far

    return endpoints


def _verify_with_endpoint(endpoint: str, token: str) -> Dict[str, Any]:
    """
    Verify token with the discovered token endpoint

    Makes GET request to endpoint with Authorization header.
    Implements retry logic for network errors only.

    Args:
        endpoint: Token endpoint URL
        token: Bearer token to verify

    Returns:
        Token info dict from endpoint

    Raises:
        TokenVerificationError: If verification fails
    """
    headers = {
        'Authorization': f'Bearer {token}',
        'Accept': 'application/json',
    }

    max_retries = 3
    for attempt in range(max_retries):
        try:
            response = httpx.get(
                endpoint,
                headers=headers,
                timeout=VERIFICATION_TIMEOUT,
                follow_redirects=True,
            )

            # Handle HTTP status codes
            if response.status_code == 200:
                token_info = response.json()

                # Validate required fields
                if 'me' not in token_info:
                    raise TokenVerificationError("Token response missing 'me' field")

                return token_info

            # Client errors - don't retry
            elif response.status_code in [400, 401, 403, 404]:
                raise TokenVerificationError(
                    f"Token verification failed: HTTP {response.status_code}"
                )

            # Server errors - retry
            elif response.status_code in [500, 502, 503, 504]:
                if attempt < max_retries - 1:
                    wait_time = 2 ** attempt  # Exponential backoff
                    current_app.logger.debug(
                        f"Server error {response.status_code}, retrying in {wait_time}s..."
                    )
                    time.sleep(wait_time)
                    continue
                else:
                    raise TokenVerificationError(
                        f"Token endpoint error: HTTP {response.status_code}"
                    )

            # Other status codes
            else:
                raise TokenVerificationError(
                    f"Unexpected response: HTTP {response.status_code}"
                )

        except httpx.TimeoutException:
            if attempt < max_retries - 1:
                wait_time = 2 ** attempt
                current_app.logger.debug(f"Timeout, retrying in {wait_time}s...")
                time.sleep(wait_time)
                continue
            else:
                raise TokenVerificationError("Token verification timeout")

        except httpx.NetworkError as e:
            if attempt < max_retries - 1:
                wait_time = 2 ** attempt
                current_app.logger.debug(f"Network error, retrying in {wait_time}s...")
                time.sleep(wait_time)
                continue
            else:
                raise TokenVerificationError(f"Network error: {e}")

        except Exception as e:
            # Don't retry for unexpected errors
            raise TokenVerificationError(f"Verification failed: {e}")

    # Should never reach here, but just in case
    raise TokenVerificationError("Maximum retries exceeded")


def _validate_profile_url(url: str) -> None:
    """
    Validate profile URL format and security requirements

    Args:
        url: Profile URL to validate

    Raises:
        DiscoveryError: If URL is invalid or insecure
    """
    parsed = urlparse(url)

    # Must be absolute
    if not parsed.scheme or not parsed.netloc:
        raise DiscoveryError(f"Invalid profile URL format: {url}")

    # HTTPS required in production
    if not current_app.debug and parsed.scheme != 'https':
        raise DiscoveryError(
            f"HTTPS required for profile URLs in production. Got: {url}"
        )

    # Allow localhost only in debug mode
    if not current_app.debug and parsed.hostname in ['localhost', '127.0.0.1', '::1']:
        raise DiscoveryError(
            "Localhost URLs not allowed in production"
        )


def _validate_endpoint_url(url: str, rel: str) -> None:
    """
    Validate discovered endpoint URL

    Args:
        url: Endpoint URL to validate
        rel: Endpoint relation (for error messages)

    Raises:
        DiscoveryError: If URL is invalid or insecure
    """
    parsed = urlparse(url)

    # Must be absolute
    if not parsed.scheme or not parsed.netloc:
        raise DiscoveryError(f"Invalid {rel} URL format: {url}")

    # HTTPS required in production
    if not current_app.debug and parsed.scheme != 'https':
        raise DiscoveryError(
            f"HTTPS required for {rel} in production. Got: {url}"
        )

    # Allow localhost only in debug mode
    if not current_app.debug and parsed.hostname in ['localhost', '127.0.0.1', '::1']:
        raise DiscoveryError(
            f"Localhost not allowed for {rel} in production"
        )


def normalize_url(url: str) -> str:
    """
    Normalize URL for comparison

    Removes trailing slash and converts to lowercase.
    Used only for comparison, not for storage.

    Args:
        url: URL to normalize

    Returns:
        Normalized URL
    """
    return url.rstrip('/').lower()


def _hash_token(token: str) -> str:
    """
    Hash token for secure caching

    Uses SHA-256 to prevent tokens from appearing in logs
    and to create fixed-length cache keys.

    Args:
        token: Bearer token

    Returns:
        SHA-256 hash of token (hex)
    """
    return hashlib.sha256(token.encode()).hexdigest()


def check_scope(required_scope: str, token_scope: str) -> bool:
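    # Sketch of a body (an assumption, not the committed implementation):
    # the module docstring lists check_scope as "Verify token has required
    # scope", and scopes arrive as a space-separated list, so a minimal
    # membership check would be:
    #
    #     if not required_scope:
    #         return True
    #     return required_scope in (token_scope or "").split()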