Files
StarPunk/starpunk/auth.py
Phil Skentelbery 5e50330bdf feat: Implement PKCE authentication for IndieLogin.com
This fixes critical IndieAuth authentication by implementing PKCE (Proof Key
for Code Exchange) as required by IndieLogin.com API specification.

Added:
- PKCE code_verifier and code_challenge generation (RFC 7636)
- Database column: auth_state.code_verifier for PKCE support
- Issuer validation for authentication callbacks
- Comprehensive PKCE unit tests (6 tests, all passing)
- Database migration script for code_verifier column

Changed:
- Corrected IndieLogin.com API endpoints (/authorize and /token)
- State token validation now returns code_verifier for token exchange
- Authentication flow follows IndieLogin.com API specification exactly
- Enhanced logging with code_verifier redaction

Removed:
- OAuth metadata endpoint (/.well-known/oauth-authorization-server)
  Added in v0.7.0 but not required by IndieLogin.com
- h-app microformats markup from templates
  Modified in v0.7.1 but not used by IndieLogin.com
- indieauth-metadata link from HTML head

Security:
- PKCE prevents authorization code interception attacks
- Issuer validation prevents token substitution attacks
- Code verifier securely stored, redacted in logs, and single-use

Documentation:
- Version: 0.8.0
- CHANGELOG updated with v0.8.0 entry and v0.7.x notes
- ADR-016 and ADR-017 marked as superseded by ADR-019
- Implementation report created in docs/reports/
- Test update guide created in TODO_TEST_UPDATES.md

Breaking Changes:
- Users mid-authentication will need to restart login after upgrade
- Database migration required before deployment

Related: ADR-019

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-19 15:43:38 -07:00

640 lines
17 KiB
Python

"""
Authentication module for StarPunk
Implements IndieLogin authentication for admin access using indielogin.com
as a delegated authentication provider. No passwords are stored locally.
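Security features:
- CSRF protection via single-use state tokens
- PKCE (RFC 7636, S256 method) on the authorization code exchange
- Issuer validation on the authentication callback (when an issuer is supplied)
- Secure session tokens (cryptographically random)
- Token hashing in database (SHA-256)
- HttpOnly, Secure, SameSite cookies
- Single-admin authorization
- Automatic session cleanup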
Functions:
initiate_login: Start IndieLogin authentication flow
handle_callback: Process IndieLogin callback
create_session: Create authenticated session
verify_session: Check if session is valid
destroy_session: Logout and cleanup
require_auth: Decorator for protected routes
Exceptions:
AuthError: Base authentication exception
InvalidStateError: CSRF state validation failed
UnauthorizedError: User not authorized as admin
IndieLoginError: External service error
"""
import base64
import hashlib
import logging
import secrets
from datetime import datetime, timedelta
from functools import wraps
from typing import Any, Dict, Optional
from urllib.parse import urlencode
import httpx
from flask import current_app, g, redirect, request, session, url_for
from starpunk.database import get_db
from starpunk.utils import is_valid_url
# Custom exceptions
class AuthError(Exception):
    """Root of the authentication exception hierarchy for this module."""
class InvalidStateError(AuthError):
    """Raised when the CSRF state token fails validation."""
class UnauthorizedError(AuthError):
    """Raised when the authenticated user is not authorized as admin."""
class IndieLoginError(AuthError):
    """Raised when the IndieLogin service reports or causes an error."""
# PKCE helper functions
def _generate_pkce_verifier() -> str:
    """
    Produce a fresh PKCE code_verifier (RFC 7636).

    Returns:
        A 43-character URL-safe string: 32 bytes drawn from the
        ``secrets`` CSPRNG, base64-url encoded without padding.
    """
    # 32 bytes encode to exactly 43 unpadded base64-url characters.
    return secrets.token_urlsafe(32)
def _generate_pkce_challenge(verifier: str) -> str:
    """
    Derive the PKCE code_challenge for a code_verifier (RFC 7636, S256).

    Args:
        verifier: The code_verifier string from _generate_pkce_verifier()

    Returns:
        The SHA-256 digest of the UTF-8 encoded verifier, base64-url
        encoded with the trailing '=' padding removed (43 characters)
    """
    sha = hashlib.sha256()
    sha.update(verifier.encode('utf-8'))
    encoded = base64.urlsafe_b64encode(sha.digest())
    # RFC 7636 requires the unpadded form of base64-url.
    return encoded.decode('utf-8').rstrip('=')
# Logging helper functions
def _redact_token(value: str, show_chars: int = 6) -> str:
    """
    Mask a sensitive token so it can be written to logs.

    Keeps the first ``show_chars`` characters and the last 4, with a fixed
    run of eight asterisks between them. Empty values, and values no longer
    than ``show_chars + 4``, are replaced entirely so that nothing of a
    short token is exposed.

    Args:
        value: Token to redact
        show_chars: Number of leading characters to keep (default: 6)

    Returns:
        Redacted string such as "abc123...********...xyz9", or
        "***REDACTED***" when the value is empty or too short
    """
    too_short = not value or len(value) <= show_chars + 4
    if too_short:
        return "***REDACTED***"
    head, tail = value[:show_chars], value[-4:]
    mask = '*' * 8
    return f"{head}...{mask}...{tail}"
def _log_http_request(method: str, url: str, data: dict, headers: dict = None) -> None:
    """
    Write an outgoing HTTP request to the application log at DEBUG level.

    Does nothing unless DEBUG logging is enabled. The ``code``, ``state``
    and ``code_verifier`` parameters are redacted in a copy of ``data``
    (the caller's dict is left untouched), and the ``authorization`` and
    ``cookie`` headers are omitted from the logged headers.

    Args:
        method: HTTP method (GET, POST, etc.)
        url: Request URL
        data: Request data/parameters
        headers: Optional request headers
    """
    log = current_app.logger
    if not log.isEnabledFor(logging.DEBUG):
        return

    # (parameter name, number of leading characters left visible)
    sensitive_fields = (("code", 6), ("state", 8), ("code_verifier", 6))
    redacted = data.copy()
    for field, visible in sensitive_fields:
        if field in redacted:
            redacted[field] = _redact_token(redacted[field], visible)

    log.debug(
        f"IndieAuth HTTP Request:\n"
        f" Method: {method}\n"
        f" URL: {url}\n"
        f" Data: {redacted}"
    )

    if not headers:
        return
    hidden = ["authorization", "cookie"]
    loggable_headers = {}
    for name, header_value in headers.items():
        if name.lower() not in hidden:
            loggable_headers[name] = header_value
    log.debug(f" Headers: {loggable_headers}")
def _log_http_response(status_code: int, headers: dict, body: str) -> None:
    """
    Write an HTTP response to the application log at DEBUG level.

    Does nothing unless DEBUG logging is enabled. When the body parses as
    JSON, its ``access_token`` and ``code`` entries are redacted and the
    result is pretty-printed; otherwise the body is logged unchanged. The
    ``set-cookie`` and ``authorization`` headers are omitted.

    Args:
        status_code: HTTP status code
        headers: Response headers
        body: Response body (JSON string or text)
    """
    log = current_app.logger
    if not log.isEnabledFor(logging.DEBUG):
        return

    import json

    rendered_body = body
    try:
        payload = json.loads(body)
        for secret_key in ("access_token", "code"):
            if secret_key in payload:
                payload[secret_key] = _redact_token(payload[secret_key])
        rendered_body = json.dumps(payload, indent=2)
    except (json.JSONDecodeError, TypeError):
        # Not JSON (or not a redactable shape): fall back to the raw body,
        # which is most likely a plain-text error message.
        pass

    hidden = ["set-cookie", "authorization"]
    loggable_headers = {}
    for name, header_value in headers.items():
        if name.lower() not in hidden:
            loggable_headers[name] = header_value

    log.debug(
        f"IndieAuth HTTP Response:\n"
        f" Status: {status_code}\n"
        f" Headers: {loggable_headers}\n"
        f" Body: {rendered_body}"
    )
# Helper functions
def _hash_token(token: str) -> str:
    """
    Compute the SHA-256 digest of a token.

    Args:
        token: Token to hash

    Returns:
        The digest as a lowercase hexadecimal string
    """
    hasher = hashlib.sha256()
    hasher.update(token.encode())
    return hasher.hexdigest()
def _generate_state_token() -> str:
    """
    Create a random CSRF state token.

    Returns:
        URL-safe token built from 32 bytes of CSPRNG output
    """
    entropy_bytes = 32
    state_token = secrets.token_urlsafe(entropy_bytes)
    return state_token
def _verify_state_token(state: str) -> Optional[str]:
    """
    Verify and consume CSRF state token, returning code_verifier.

    The token is single-use: the row is deleted as part of verification.
    Because the lookup and the delete are separate statements, two
    concurrent callbacks carrying the same state could both pass the
    lookup; only the caller whose DELETE actually removed the row is
    treated as the holder of the token.

    Args:
        state: State token to verify

    Returns:
        code_verifier string if valid, None if the token is missing,
        unknown, expired, or was already consumed by another request
    """
    # An empty/missing state can never match a stored row; skip the query.
    if not state:
        return None

    db = get_db(current_app)

    # Look up an unexpired row for this state and fetch its code_verifier
    row = db.execute(
        """
        SELECT code_verifier FROM auth_state
        WHERE state = ? AND expires_at > datetime('now')
        """,
        (state,),
    ).fetchone()
    if not row:
        return None

    code_verifier = row['code_verifier']

    # Delete state (single-use). rowcount tells us whether *this* request
    # removed the row; zero means a concurrent request consumed it first.
    deleted = db.execute("DELETE FROM auth_state WHERE state = ?", (state,))
    db.commit()
    if deleted.rowcount == 0:
        return None

    return code_verifier
def _cleanup_expired_sessions() -> None:
    """Purge expired rows from the sessions and auth_state tables."""
    purge_statements = (
        # Expired sessions
        """
        DELETE FROM sessions
        WHERE expires_at <= datetime('now')
        """,
        # Expired state tokens
        """
        DELETE FROM auth_state
        WHERE expires_at <= datetime('now')
        """,
    )
    conn = get_db(current_app)
    for statement in purge_statements:
        conn.execute(statement)
    # Both deletes are committed together.
    conn.commit()
# Core authentication functions
def initiate_login(me_url: str) -> str:
    """
    Begin an IndieLogin authentication flow protected by PKCE.

    Generates a CSRF state token and a PKCE verifier/challenge pair, stores
    the state and verifier in the auth_state table with a 5-minute expiry,
    and builds the IndieLogin authorization URL to redirect the user to.

    Args:
        me_url: User's IndieWeb identity URL

    Returns:
        Redirect URL pointing at the IndieLogin /authorize endpoint

    Raises:
        ValueError: Invalid me_url format
    """
    if not is_valid_url(me_url):
        raise ValueError(f"Invalid URL format: {me_url}")

    log = current_app.logger
    log.debug(f"Auth: Validating me URL: {me_url}")

    # CSRF state token
    state = _generate_state_token()
    log.debug(f"Auth: Generated state token: {_redact_token(state, 8)}")

    # PKCE verifier and the challenge derived from it
    code_verifier = _generate_pkce_verifier()
    code_challenge = _generate_pkce_challenge(code_verifier)
    log.debug(
        f"Auth: Generated PKCE pair:\n"
        f" verifier: {_redact_token(code_verifier)}\n"
        f" challenge: {_redact_token(code_challenge)}"
    )

    # Persist state + verifier so the callback can retrieve them (5-minute expiry)
    conn = get_db(current_app)
    expires_at = datetime.utcnow() + timedelta(minutes=5)
    site_url = current_app.config['SITE_URL']
    redirect_uri = f"{site_url}/auth/callback"
    conn.execute(
        """
        INSERT INTO auth_state (state, code_verifier, expires_at, redirect_uri)
        VALUES (?, ?, ?, ?)
        """,
        (state, code_verifier, expires_at, redirect_uri),
    )
    conn.commit()

    # Query parameters for the authorization request, including PKCE
    query = {
        "me": me_url,
        "client_id": site_url,
        "redirect_uri": redirect_uri,
        "state": state,
        "code_challenge": code_challenge,
        "code_challenge_method": "S256",
    }
    log.debug(
        f"Auth: Building authorization URL with params:\n"
        f" me: {me_url}\n"
        f" client_id: {site_url}\n"
        f" redirect_uri: {redirect_uri}\n"
        f" state: {_redact_token(state, 8)}\n"
        f" code_challenge: {_redact_token(code_challenge)}\n"
        f" code_challenge_method: S256"
    )

    # The authorization endpoint is /authorize (not /auth)
    indielogin_url = current_app.config['INDIELOGIN_URL']
    auth_url = f"{indielogin_url}/authorize?{urlencode(query)}"
    log.info(f"Auth: Authentication initiated for {me_url}")
    return auth_url
def handle_callback(code: str, state: str, iss: Optional[str] = None) -> Optional[str]:
    """
    Handle IndieLogin callback with PKCE verification.

    Consumes the single-use state token (which yields the stored PKCE
    code_verifier), checks the issuer when one is supplied, exchanges the
    authorization code at the IndieLogin /token endpoint, and creates a
    session when the returned identity equals the configured ADMIN_ME.

    Args:
        code: Authorization code from IndieLogin
        state: CSRF state token
        iss: Issuer identifier; when present it must equal
            INDIELOGIN_URL followed by "/"

    Returns:
        Session token on success. Every failure path raises, so None is
        not returned in practice despite the Optional annotation.

    Raises:
        InvalidStateError: State token validation failed
        UnauthorizedError: ADMIN_ME is not configured, or the returned
            identity is not the admin
        IndieLoginError: Issuer mismatch, code exchange failed, or the
            response was not a usable JSON object
    """
    current_app.logger.debug(f"Auth: Verifying state token: {_redact_token(state, 8)}")

    # Verify state token and retrieve code_verifier (CSRF protection)
    code_verifier = _verify_state_token(state)
    if not code_verifier:
        current_app.logger.warning(
            "Auth: Invalid state token received (possible CSRF or expired token)"
        )
        raise InvalidStateError("Invalid or expired state token")
    current_app.logger.debug("Auth: State token valid, code_verifier retrieved")

    # Verify issuer (security check). The check only applies when the
    # callback actually carried an issuer.
    expected_iss = f"{current_app.config['INDIELOGIN_URL']}/"
    if iss and iss != expected_iss:
        current_app.logger.warning(
            f"Auth: Invalid issuer received: {iss} (expected {expected_iss})"
        )
        raise IndieLoginError(f"Invalid issuer: {iss}")
    if iss:
        current_app.logger.debug(f"Auth: Issuer verified: {iss}")
    else:
        # Avoid claiming a verification that did not take place.
        current_app.logger.debug("Auth: No issuer supplied, issuer check skipped")

    # Prepare token exchange request with PKCE verifier
    token_url = f"{current_app.config['INDIELOGIN_URL']}/token"
    token_exchange_data = {
        "code": code,
        "client_id": current_app.config["SITE_URL"],
        "redirect_uri": f"{current_app.config['SITE_URL']}/auth/callback",
        "code_verifier": code_verifier,  # PKCE verification
    }

    # Log the request (code, state and code_verifier are redacted by the helper)
    _log_http_request(
        method="POST",
        url=token_url,
        data=token_exchange_data,
    )

    # Exchange code for identity (endpoint is /token)
    try:
        response = httpx.post(
            token_url,
            data=token_exchange_data,
            timeout=10.0,
        )
        # Log before raise_for_status so error responses are captured too
        _log_http_response(
            status_code=response.status_code,
            headers=dict(response.headers),
            body=response.text,
        )
        response.raise_for_status()
    except httpx.RequestError as e:
        current_app.logger.error(f"Auth: IndieLogin request failed: {e}")
        raise IndieLoginError(f"Failed to verify code: {e}") from e
    except httpx.HTTPStatusError as e:
        current_app.logger.error(
            f"Auth: IndieLogin returned error: {e.response.status_code} - {e.response.text}"
        )
        raise IndieLoginError(
            f"IndieLogin returned error: {e.response.status_code}"
        ) from e

    # Parse response. Any decoding failure is converted to the module's
    # own error type, keeping the original exception as the cause.
    try:
        data = response.json()
    except Exception as e:
        current_app.logger.error(f"Auth: Failed to parse IndieLogin response: {e}")
        raise IndieLoginError("Invalid JSON response from IndieLogin") from e

    # Valid JSON that is not an object (list, string, number, null) has no
    # "me" member and would otherwise fail with AttributeError on .get().
    if not isinstance(data, dict):
        current_app.logger.error(
            f"Auth: Unexpected IndieLogin response type: {type(data).__name__}"
        )
        raise IndieLoginError("Invalid JSON response from IndieLogin")

    me = data.get("me")
    if not me:
        current_app.logger.error("Auth: No identity returned from IndieLogin")
        raise IndieLoginError("No identity returned from IndieLogin")
    current_app.logger.debug(f"Auth: Received identity from IndieLogin: {me}")

    # Verify this is the admin user
    admin_me = current_app.config.get("ADMIN_ME")
    if not admin_me:
        current_app.logger.error("Auth: ADMIN_ME not configured")
        raise UnauthorizedError("Admin user not configured")

    current_app.logger.info(f"Auth: Verifying admin authorization for me={me}")
    # Exact string comparison: the identity must match ADMIN_ME character
    # for character.
    if me != admin_me:
        current_app.logger.warning(
            f"Auth: Unauthorized login attempt: {me} (expected {admin_me})"
        )
        raise UnauthorizedError(f"User {me} is not authorized")
    current_app.logger.debug("Auth: Admin verification passed")

    # Create session
    session_token = create_session(me)
    return session_token
def create_session(me: str) -> str:
    """
    Create and persist an authenticated session.

    Only the SHA-256 hash of the token is written to the sessions table,
    together with the identity, expiry, client IP and (truncated)
    User-Agent. Expired sessions and state tokens are purged afterwards.

    Args:
        me: Verified user identity URL

    Returns:
        Session token (plaintext, to be sent as cookie)
    """
    log = current_app.logger

    # Random token for the client; only its hash is stored
    session_token = secrets.token_urlsafe(32)
    token_hash = _hash_token(session_token)
    log.debug("Auth: Session token generated (hash will be stored)")

    # Expiry: SESSION_LIFETIME from config, in days (default 30)
    lifetime_days = current_app.config.get("SESSION_LIFETIME", 30)
    expires_at = datetime.utcnow() + timedelta(days=lifetime_days)
    log.debug(f"Auth: Session expiry: {expires_at} ({lifetime_days} days)")

    # Request metadata; User-Agent is capped at 200 characters for storage
    ip_address = request.remote_addr
    user_agent = request.headers.get("User-Agent", "")[:200]
    log.debug(f"Auth: Request metadata - IP: {ip_address}, User-Agent: {user_agent[:50]}...")

    row = (token_hash, me, expires_at, user_agent, ip_address)
    conn = get_db(current_app)
    conn.execute(
        """
        INSERT INTO sessions
        (session_token_hash, me, expires_at, user_agent, ip_address)
        VALUES (?, ?, ?, ?, ?)
        """,
        row,
    )
    conn.commit()

    # Take the opportunity to purge expired rows
    _cleanup_expired_sessions()

    log.info(f"Auth: Session created for {me}")
    return session_token
def verify_session(token: str) -> Optional[Dict[str, Any]]:
    """
    Check a session token against the sessions table.

    The token is hashed and looked up among unexpired sessions. On a match
    the session's last_used_at is refreshed.

    Args:
        token: Session token from cookie

    Returns:
        Dict with "me", "created_at" and "expires_at" if the session is
        valid, None otherwise
    """
    log = current_app.logger

    if not token:
        log.debug("Auth: No session token provided")
        return None

    log.debug(f"Auth: Verifying session token: {_redact_token(token)}")

    conn = get_db(current_app)
    lookup_params = (_hash_token(token),)
    row = conn.execute(
        """
        SELECT id, me, created_at, expires_at, last_used_at
        FROM sessions
        WHERE session_token_hash = ?
        AND expires_at > datetime('now')
        """,
        lookup_params,
    ).fetchone()

    if not row:
        log.debug("Auth: Session token invalid or expired")
        return None

    log.debug(f"Auth: Session verified for {row['me']}")

    # Record activity on the session
    conn.execute(
        """
        UPDATE sessions
        SET last_used_at = datetime('now')
        WHERE id = ?
        """,
        (row["id"],),
    )
    conn.commit()

    exposed_fields = ("me", "created_at", "expires_at")
    return {field: row[field] for field in exposed_fields}
def destroy_session(token: str) -> None:
    """
    End a session (logout) by deleting its row.

    A falsy token is ignored: nothing is deleted and nothing is logged.

    Args:
        token: Session token to destroy
    """
    if token:
        conn = get_db(current_app)
        conn.execute(
            """
            DELETE FROM sessions
            WHERE session_token_hash = ?
            """,
            (_hash_token(token),),
        )
        conn.commit()
        current_app.logger.info("Session destroyed")
def require_auth(f):
    """
    Route decorator that only lets requests with a valid session through.

    The session token is read from the "starpunk_session" cookie. When it
    verifies, the session info is placed on ``g.user`` and the identity on
    ``g.me`` before the view runs. Otherwise the requested URL is saved in
    ``session["next"]`` and the client is redirected to the login form.

    Usage:
        @app.route('/admin')
        @require_auth
        def admin_dashboard():
            return render_template('admin/dashboard.html')
    """

    @wraps(f)
    def guarded_view(*args, **kwargs):
        cookie_token = request.cookies.get("starpunk_session")
        session_info = verify_session(cookie_token)

        if session_info:
            # Expose the authenticated user to the view
            g.user = session_info
            g.me = session_info["me"]
            return f(*args, **kwargs)

        # Not authenticated: save the intended destination, then go to login
        session["next"] = request.url
        return redirect(url_for("auth.login_form"))

    return guarded_view