IndieAuth authentication-only flows should redeem the code at the authorization endpoint, not the token endpoint. The token endpoint is only for authorization flows that need access tokens. - Remove grant_type parameter (only needed for token flows) - Change endpoint from /token to /authorize - Update debug logging to reflect code verification flow 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
678 lines
19 KiB
Python
678 lines
19 KiB
Python
"""
|
|
Authentication module for StarPunk
|
|
|
|
Implements IndieLogin authentication for admin access using indielogin.com
|
|
as a delegated authentication provider. No passwords are stored locally.
|
|
|
|
Security features:
|
|
- CSRF protection via state tokens
|
|
- Secure session tokens (cryptographically random)
|
|
- Token hashing in database (SHA-256)
|
|
- HttpOnly, Secure, SameSite cookies
|
|
- Single-admin authorization
|
|
- Automatic session cleanup
|
|
|
|
Functions:
|
|
initiate_login: Start IndieLogin authentication flow
|
|
handle_callback: Process IndieLogin callback
|
|
create_session: Create authenticated session
|
|
verify_session: Check if session is valid
|
|
destroy_session: Logout and cleanup
|
|
require_auth: Decorator for protected routes
|
|
|
|
Exceptions:
|
|
AuthError: Base authentication exception
|
|
InvalidStateError: CSRF state validation failed
|
|
UnauthorizedError: User not authorized as admin
|
|
IndieLoginError: External service error
|
|
"""
|
|
|
|
import base64
|
|
import hashlib
|
|
import logging
|
|
import secrets
|
|
from datetime import datetime, timedelta
|
|
from functools import wraps
|
|
from typing import Any, Dict, Optional
|
|
from urllib.parse import urlencode
|
|
|
|
import httpx
|
|
from flask import current_app, g, redirect, request, session, url_for
|
|
|
|
from starpunk.database import get_db
|
|
from starpunk.utils import is_valid_url
|
|
|
|
|
|
# Custom exceptions
|
|
class AuthError(Exception):
    """Root of the authentication exception hierarchy.

    Catching this type catches every authentication failure raised by
    this module.
    """
|
|
|
|
|
|
class InvalidStateError(AuthError):
    """Raised when the CSRF state token is unknown, already used or expired."""
|
|
|
|
|
|
class UnauthorizedError(AuthError):
    """Raised when the authenticated identity is not the configured admin."""
|
|
|
|
|
|
class IndieLoginError(AuthError):
    """Raised when the IndieLogin service fails or returns an unusable reply."""
|
|
|
|
|
|
# PKCE helper functions
|
|
def _generate_pkce_verifier() -> str:
|
|
"""
|
|
Generate PKCE code_verifier.
|
|
|
|
Creates a cryptographically random 43-character URL-safe string
|
|
as required by PKCE specification (RFC 7636).
|
|
|
|
Returns:
|
|
URL-safe base64-encoded random string (43 characters)
|
|
"""
|
|
# Generate 32 random bytes = 43 chars when base64-url encoded
|
|
verifier = secrets.token_urlsafe(32)
|
|
return verifier
|
|
|
|
|
|
def _generate_pkce_challenge(verifier: str) -> str:
|
|
"""
|
|
Generate PKCE code_challenge from code_verifier.
|
|
|
|
Creates SHA256 hash of verifier and encodes as base64-url string
|
|
per RFC 7636 S256 method.
|
|
|
|
Args:
|
|
verifier: The code_verifier string from _generate_pkce_verifier()
|
|
|
|
Returns:
|
|
Base64-URL encoded SHA256 hash (43 characters)
|
|
"""
|
|
# SHA256 hash the verifier
|
|
digest = hashlib.sha256(verifier.encode('utf-8')).digest()
|
|
# Base64-URL encode (no padding)
|
|
challenge = base64.urlsafe_b64encode(digest).decode('utf-8').rstrip('=')
|
|
return challenge
|
|
|
|
|
|
# Logging helper functions
|
|
def _redact_token(value: str, show_chars: int = 6) -> str:
|
|
"""
|
|
Redact sensitive token for logging
|
|
|
|
Shows first N and last 4 characters with asterisks in between.
|
|
|
|
Args:
|
|
value: Token to redact
|
|
show_chars: Number of characters to show at start (default: 6)
|
|
|
|
Returns:
|
|
Redacted token string like "abc123...********...xyz9"
|
|
"""
|
|
if not value or len(value) <= (show_chars + 4):
|
|
return "***REDACTED***"
|
|
|
|
return f"{value[:show_chars]}...{'*' * 8}...{value[-4:]}"
|
|
|
|
|
|
def _log_http_request(method: str, url: str, data: dict, headers: Optional[dict] = None) -> None:
    """
    Log HTTP request details at DEBUG level

    The ``code``, ``state`` and ``code_verifier`` entries of ``data`` are
    passed through ``_redact_token`` before being logged, and any
    ``Authorization`` / ``Cookie`` headers are left out of the header log.
    Nothing is built or logged unless DEBUG is enabled on the app logger.

    Args:
        method: HTTP method (GET, POST, etc.)
        url: Request URL
        data: Request data/parameters (shallow-copied; the caller's dict
            is not modified)
        headers: Optional request headers
    """
    logger = current_app.logger
    if not logger.isEnabledFor(logging.DEBUG):
        return

    # (parameter name, number of leading characters left visible)
    sensitive_params = (("code", 6), ("state", 8), ("code_verifier", 6))

    # Work on a copy so redaction never leaks into the real request payload
    safe_data = data.copy()
    for key, show_chars in sensitive_params:
        if key in safe_data:
            safe_data[key] = _redact_token(safe_data[key], show_chars)

    logger.debug(
        f"IndieAuth HTTP Request:\n"
        f" Method: {method}\n"
        f" URL: {url}\n"
        f" Data: {safe_data}"
    )

    if headers:
        safe_headers = {
            k: v
            for k, v in headers.items()
            if k.lower() not in ("authorization", "cookie")
        }
        logger.debug(f" Headers: {safe_headers}")
|
|
|
|
|
|
def _log_http_response(status_code: int, headers: dict, body: str) -> None:
    """
    Log HTTP response details at DEBUG level

    If the body parses as a JSON object, its ``access_token``,
    ``refresh_token`` and ``code`` members are redacted before logging.
    A body that is not JSON is logged unchanged (likely an error message).
    ``Set-Cookie`` and ``Authorization`` headers are left out of the log.

    Args:
        status_code: HTTP status code
        headers: Response headers
        body: Response body (JSON string or text)
    """
    logger = current_app.logger
    if not logger.isEnabledFor(logging.DEBUG):
        return

    # Function-local import, as in the original; bound before the try so
    # the except clause below can always resolve json.JSONDecodeError.
    import json

    sensitive_keys = ("access_token", "refresh_token", "code")

    safe_body = body
    try:
        data = json.loads(body)
    except (json.JSONDecodeError, TypeError):
        # Not JSON or parsing failed, log as-is (likely error message)
        pass
    else:
        # Only JSON objects can carry the named secrets. Non-string values
        # are masked wholesale so a redaction failure can never fall back
        # to logging the raw body.
        if isinstance(data, dict):
            for key in sensitive_keys:
                if key in data:
                    value = data[key]
                    data[key] = (
                        _redact_token(value)
                        if isinstance(value, str)
                        else "***REDACTED***"
                    )
        safe_body = json.dumps(data, indent=2)

    # Redact sensitive headers
    safe_headers = {
        k: v
        for k, v in headers.items()
        if k.lower() not in ("set-cookie", "authorization")
    }

    logger.debug(
        f"IndieAuth HTTP Response:\n"
        f" Status: {status_code}\n"
        f" Headers: {safe_headers}\n"
        f" Body: {safe_body}"
    )
|
|
|
|
|
|
# Helper functions
|
|
def _hash_token(token: str) -> str:
|
|
"""
|
|
Hash token using SHA-256
|
|
|
|
Args:
|
|
token: Token to hash
|
|
|
|
Returns:
|
|
Hexadecimal hash string
|
|
"""
|
|
return hashlib.sha256(token.encode()).hexdigest()
|
|
|
|
|
|
def _generate_state_token() -> str:
|
|
"""
|
|
Generate CSRF state token
|
|
|
|
Returns:
|
|
URL-safe random token
|
|
"""
|
|
return secrets.token_urlsafe(32)
|
|
|
|
|
|
def _verify_state_token(state: str) -> Optional[str]:
    """
    Validate a CSRF state token, consume it, and hand back its code_verifier.

    A state token is single-use: once it has been found it is deleted, so
    a second call with the same value returns None.

    Args:
        state: State token to verify

    Returns:
        code_verifier string if valid, None if invalid or expired
    """
    conn = get_db(current_app)

    # Look up an unexpired row for this state
    lookup_sql = """
        SELECT code_verifier FROM auth_state
        WHERE state = ? AND expires_at > datetime('now')
        """
    row = conn.execute(lookup_sql, (state,)).fetchone()

    if row:
        verifier = row['code_verifier']

        # Consume the token so it cannot be replayed
        conn.execute("DELETE FROM auth_state WHERE state = ?", (state,))
        conn.commit()
        return verifier

    return None
|
|
|
|
|
|
def _cleanup_expired_sessions() -> None:
    """Purge expired rows from the sessions and auth_state tables."""
    purge_statements = (
        # Expired sessions
        """
        DELETE FROM sessions
        WHERE expires_at <= datetime('now')
        """,
        # Expired state tokens
        """
        DELETE FROM auth_state
        WHERE expires_at <= datetime('now')
        """,
    )

    conn = get_db(current_app)
    for statement in purge_statements:
        conn.execute(statement)

    # Both deletions are committed together
    conn.commit()
|
|
|
|
|
|
# Core authentication functions
|
|
def initiate_login(me_url: str) -> str:
    """
    Initiate IndieLogin authentication flow with PKCE.

    Generates a CSRF state token and a PKCE verifier/challenge pair, stores
    the state and verifier in the ``auth_state`` table with a 5-minute
    expiry, and builds the IndieLogin ``/authorize`` URL the browser should
    be redirected to.

    The state token and PKCE values are only ever logged in redacted form.

    Args:
        me_url: User's IndieWeb identity URL

    Returns:
        Redirect URL to IndieLogin.com

    Raises:
        ValueError: Invalid me_url format
    """
    logger = current_app.logger

    # Validate URL format
    if not is_valid_url(me_url):
        raise ValueError(f"Invalid URL format: {me_url}")

    logger.debug(f"Auth: Validating me URL: {me_url}")

    # Generate CSRF state token
    state = _generate_state_token()
    logger.debug(f"Auth: Generated state token: {_redact_token(state, 8)}")

    # Generate PKCE verifier and challenge
    code_verifier = _generate_pkce_verifier()
    code_challenge = _generate_pkce_challenge(code_verifier)
    logger.debug(
        f"Auth: Generated PKCE pair:\n"
        f" verifier: {_redact_token(code_verifier)}\n"
        f" challenge: {_redact_token(code_challenge)}"
    )

    # Store state and verifier in database (5-minute expiry)
    db = get_db(current_app)
    expires_at = datetime.utcnow() + timedelta(minutes=5)
    # NOTE: plain concatenation - SITE_URL must end with "/" for this to
    # produce a well-formed callback URL.
    redirect_uri = f"{current_app.config['SITE_URL']}auth/callback"

    db.execute(
        """
        INSERT INTO auth_state (state, code_verifier, expires_at, redirect_uri)
        VALUES (?, ?, ?, ?)
        """,
        (state, code_verifier, expires_at, redirect_uri),
    )
    db.commit()

    # Build IndieLogin authorization URL with PKCE
    params = {
        "me": me_url,
        "client_id": current_app.config["SITE_URL"],
        "redirect_uri": redirect_uri,
        "state": state,
        "code_challenge": code_challenge,
        "code_challenge_method": "S256",
    }

    logger.debug(
        f"Auth: Building authorization URL with params:\n"
        f" me: {me_url}\n"
        f" client_id: {current_app.config['SITE_URL']}\n"
        f" redirect_uri: {redirect_uri}\n"
        f" state: {_redact_token(state, 8)}\n"
        f" code_challenge: {_redact_token(code_challenge)}\n"
        f" code_challenge_method: S256"
    )

    # CORRECT ENDPOINT: /authorize (not /auth)
    authorize_endpoint = f"{current_app.config['INDIELOGIN_URL']}/authorize"
    auth_url = f"{authorize_endpoint}?{urlencode(params)}"

    # Log the authorization URL for debugging, with the same redaction the
    # rest of this function applies: the real URL carries the state token
    # and code_challenge in plaintext and must not reach the log verbatim.
    loggable_params = dict(
        params,
        state=_redact_token(state, 8),
        code_challenge=_redact_token(code_challenge),
    )
    logger.debug(
        "Auth: Authorization URL (GET request, secrets redacted):\n"
        " %s",
        # safe="*" keeps the redaction mask readable instead of %2A
        f"{authorize_endpoint}?{urlencode(loggable_params, safe='*')}",
    )

    logger.info(f"Auth: Authentication initiated for {me_url}")

    return auth_url
|
|
|
|
|
|
def handle_callback(code: str, state: str, iss: Optional[str] = None) -> Optional[str]:
    """
    Handle IndieLogin callback with PKCE verification.

    Steps, in order:
      1. Verify and consume the CSRF state token, recovering the PKCE
         code_verifier stored with it.
      2. If an issuer was supplied, check it against ``INDIELOGIN_URL``.
      3. POST the code and code_verifier to the IndieLogin ``/authorize``
         endpoint (authentication-only flow, so no grant_type and no token
         endpoint).
      4. Check that the returned ``me`` equals the configured ``ADMIN_ME``.
      5. Create a session and return its token.

    Args:
        code: Authorization code from IndieLogin
        state: CSRF state token
        iss: Issuer identifier (should be https://indielogin.com/)

    Returns:
        Session token if successful; every failure path raises instead

    Raises:
        InvalidStateError: State token validation failed
        UnauthorizedError: User not authorized as admin
        IndieLoginError: Code exchange failed, or the response was not a
            JSON object containing ``me``
    """
    logger = current_app.logger
    config = current_app.config

    logger.debug(f"Auth: Verifying state token: {_redact_token(state, 8)}")

    # Verify state token and retrieve code_verifier (CSRF protection)
    code_verifier = _verify_state_token(state)
    if not code_verifier:
        logger.warning(
            "Auth: Invalid state token received (possible CSRF or expired token)"
        )
        raise InvalidStateError("Invalid or expired state token")

    logger.debug("Auth: State token valid, code_verifier retrieved")

    # Verify issuer (security check). The issuer is optional: it is only
    # compared when the callback actually carried one.
    expected_iss = f"{config['INDIELOGIN_URL']}/"
    if iss and iss != expected_iss:
        logger.warning(
            f"Auth: Invalid issuer received: {iss} (expected {expected_iss})"
        )
        raise IndieLoginError(f"Invalid issuer: {iss}")

    if iss:
        logger.debug(f"Auth: Issuer verified: {iss}")
    else:
        logger.debug("Auth: No issuer supplied in callback; issuer check skipped")

    # Prepare code verification request with PKCE verifier
    # Note: For authentication-only flows (identity verification), we use the
    # authorization endpoint, not the token endpoint. grant_type is not needed.
    # See IndieAuth spec: authorization endpoint for authentication,
    # token endpoint for access tokens.
    token_exchange_data = {
        "code": code,
        "client_id": config["SITE_URL"],
        "redirect_uri": f"{config['SITE_URL']}auth/callback",
        "code_verifier": code_verifier,  # PKCE verification
    }

    # Use authorization endpoint for authentication-only flow (identity verification)
    token_url = f"{config['INDIELOGIN_URL']}/authorize"

    # Log the request (code_verifier will be redacted)
    _log_http_request(
        method="POST",
        url=token_url,
        data=token_exchange_data,
    )

    # Log detailed httpx request info for debugging
    logger.debug(
        "Auth: Sending code verification request to authorization endpoint:\n"
        " Method: POST\n"
        " URL: %s\n"
        " Data: code=%s, client_id=%s, redirect_uri=%s, code_verifier=%s",
        token_url,
        _redact_token(code),
        token_exchange_data["client_id"],
        token_exchange_data["redirect_uri"],
        _redact_token(code_verifier),
    )

    # Exchange code for identity at authorization endpoint (authentication-only flow)
    try:
        response = httpx.post(
            token_url,
            data=token_exchange_data,
            timeout=10.0,
        )

        # Log detailed httpx response info for debugging
        logger.debug(
            "Auth: Received code verification response:\n"
            " Status: %d\n"
            " Headers: %s\n"
            " Body: %s",
            response.status_code,
            {
                k: v
                for k, v in dict(response.headers).items()
                if k.lower() not in ("set-cookie", "authorization")
            },
            _redact_token(response.text) if response.text else "(empty)",
        )

        # Log the response (legacy helper)
        _log_http_response(
            status_code=response.status_code,
            headers=dict(response.headers),
            body=response.text,
        )

        response.raise_for_status()
    except httpx.RequestError as e:
        logger.error(f"Auth: IndieLogin request failed: {e}")
        # Chain the cause so the transport error stays in the traceback
        raise IndieLoginError(f"Failed to verify code: {e}") from e
    except httpx.HTTPStatusError as e:
        logger.error(
            f"Auth: IndieLogin returned error: {e.response.status_code} - {e.response.text}"
        )
        raise IndieLoginError(
            f"IndieLogin returned error: {e.response.status_code}"
        ) from e

    # Parse response
    try:
        data = response.json()
    except ValueError as e:
        # JSON decoding errors (and undecodable bytes) are ValueError subclasses
        logger.error(f"Auth: Failed to parse IndieLogin response: {e}")
        raise IndieLoginError("Invalid JSON response from IndieLogin") from e

    # Valid JSON that is not an object (list, string, number, null) has no
    # "me" member; report it as a service error rather than letting
    # data.get() fail with AttributeError.
    if not isinstance(data, dict):
        logger.error(
            "Auth: IndieLogin response is not a JSON object (got %s)",
            type(data).__name__,
        )
        raise IndieLoginError("Invalid JSON response from IndieLogin")

    me = data.get("me")

    if not me:
        logger.error("Auth: No identity returned from IndieLogin")
        raise IndieLoginError("No identity returned from IndieLogin")

    logger.debug(f"Auth: Received identity from IndieLogin: {me}")

    # Verify this is the admin user
    admin_me = config.get("ADMIN_ME")
    if not admin_me:
        logger.error("Auth: ADMIN_ME not configured")
        raise UnauthorizedError("Admin user not configured")

    logger.info(f"Auth: Verifying admin authorization for me={me}")

    # Exact string comparison: ADMIN_ME must match the returned identity
    # character for character (including any trailing slash).
    if me != admin_me:
        logger.warning(
            f"Auth: Unauthorized login attempt: {me} (expected {admin_me})"
        )
        raise UnauthorizedError(f"User {me} is not authorized")

    logger.debug("Auth: Admin verification passed")

    # Create session
    session_token = create_session(me)

    return session_token
|
|
|
|
|
|
def create_session(me: str) -> str:
    """
    Open a new authenticated session for a verified identity.

    Only the SHA-256 hash of the token is written to the ``sessions``
    table; the plaintext token exists solely in the return value. Expired
    sessions and state tokens are purged as a side effect.

    Args:
        me: Verified user identity URL

    Returns:
        Session token (plaintext, to be sent as cookie)
    """
    log = current_app.logger

    # Fresh random token; the database only ever sees its hash
    session_token = secrets.token_urlsafe(32)
    token_hash = _hash_token(session_token)
    log.debug("Auth: Session token generated (hash will be stored)")

    # Lifetime in days comes from config, falling back to 30
    session_lifetime = current_app.config.get("SESSION_LIFETIME", 30)
    expires_at = datetime.utcnow() + timedelta(days=session_lifetime)
    log.debug(f"Auth: Session expiry: {expires_at} ({session_lifetime} days)")

    # Request metadata recorded with the session (User-Agent capped at 200 chars)
    user_agent = request.headers.get("User-Agent", "")[:200]
    ip_address = request.remote_addr
    log.debug(f"Auth: Request metadata - IP: {ip_address}, User-Agent: {user_agent[:50]}...")

    # Persist the session row
    insert_sql = """
        INSERT INTO sessions
        (session_token_hash, me, expires_at, user_agent, ip_address)
        VALUES (?, ?, ?, ?, ?)
        """
    row_values = (token_hash, me, expires_at, user_agent, ip_address)

    conn = get_db(current_app)
    conn.execute(insert_sql, row_values)
    conn.commit()

    # Housekeeping: drop anything that has expired
    _cleanup_expired_sessions()

    log.info(f"Auth: Session created for {me}")

    return session_token
|
|
|
|
|
|
def verify_session(token: str) -> Optional[Dict[str, Any]]:
    """
    Check a session token and return the session it belongs to.

    The token is hashed and looked up among unexpired rows of the
    ``sessions`` table. A successful lookup also refreshes the row's
    ``last_used_at`` timestamp.

    Args:
        token: Session token from cookie

    Returns:
        Session info dict (``me``, ``created_at``, ``expires_at``) if
        valid, None otherwise
    """
    log = current_app.logger

    if not token:
        log.debug("Auth: No session token provided")
        return None

    log.debug(f"Auth: Verifying session token: {_redact_token(token)}")

    conn = get_db(current_app)
    lookup_sql = """
        SELECT id, me, created_at, expires_at, last_used_at
        FROM sessions
        WHERE session_token_hash = ?
        AND expires_at > datetime('now')
        """
    row = conn.execute(lookup_sql, (_hash_token(token),)).fetchone()

    if not row:
        log.debug("Auth: Session token invalid or expired")
        return None

    log.debug(f"Auth: Session verified for {row['me']}")

    # Record activity on the session
    touch_sql = """
        UPDATE sessions
        SET last_used_at = datetime('now')
        WHERE id = ?
        """
    conn.execute(touch_sql, (row["id"],))
    conn.commit()

    return {field: row[field] for field in ("me", "created_at", "expires_at")}
|
|
|
|
|
|
def destroy_session(token: str) -> None:
    """
    Log out by deleting the session that belongs to a token.

    Does nothing when the token is empty. Otherwise the row whose stored
    hash matches the token is removed from the ``sessions`` table.

    Args:
        token: Session token to destroy
    """
    if token:
        conn = get_db(current_app)
        delete_sql = """
            DELETE FROM sessions
            WHERE session_token_hash = ?
            """
        conn.execute(delete_sql, (_hash_token(token),))
        conn.commit()

        current_app.logger.info("Session destroyed")
|
|
|
|
|
|
def require_auth(f):
    """
    Route decorator that only lets requests with a valid session through.

    The session token is read from the ``starpunk_session`` cookie. With a
    valid session the view runs with ``g.user`` (session info dict) and
    ``g.me`` (identity URL) populated; otherwise the requested URL is
    remembered in ``session["next"]`` and the client is redirected to the
    login form.

    Usage:
        @app.route('/admin')
        @require_auth
        def admin_dashboard():
            return render_template('admin/dashboard.html')
    """

    @wraps(f)
    def wrapper(*args, **kwargs):
        session_info = verify_session(request.cookies.get("starpunk_session"))

        if session_info:
            # Expose the authenticated user to the view
            g.user = session_info
            g.me = session_info["me"]
            return f(*args, **kwargs)

        # Not authenticated: remember the destination, then send to login
        session["next"] = request.url
        return redirect(url_for("auth.login_form"))

    return wrapper
|