""" Authentication module for StarPunk Implements IndieLogin authentication for admin access using indielogin.com as a delegated authentication provider. No passwords are stored locally. Security features: - CSRF protection via state tokens - Secure session tokens (cryptographically random) - Token hashing in database (SHA-256) - HttpOnly, Secure, SameSite cookies - Single-admin authorization - Automatic session cleanup Functions: initiate_login: Start IndieLogin authentication flow handle_callback: Process IndieLogin callback create_session: Create authenticated session verify_session: Check if session is valid destroy_session: Logout and cleanup require_auth: Decorator for protected routes Exceptions: AuthError: Base authentication exception InvalidStateError: CSRF state validation failed UnauthorizedError: User not authorized as admin IndieLoginError: External service error """ import hashlib import logging import secrets from datetime import datetime, timedelta from functools import wraps from typing import Any, Dict, Optional from urllib.parse import urlencode import httpx from flask import current_app, g, redirect, request, session, url_for from starpunk.database import get_db from starpunk.utils import is_valid_url # Custom exceptions class AuthError(Exception): """Base exception for authentication errors""" pass class InvalidStateError(AuthError): """CSRF state validation failed""" pass class UnauthorizedError(AuthError): """User not authorized as admin""" pass class IndieLoginError(AuthError): """IndieLogin service error""" pass # Logging helper functions def _redact_token(value: str, show_chars: int = 6) -> str: """ Redact sensitive token for logging Shows first N and last 4 characters with asterisks in between. Args: value: Token to redact show_chars: Number of characters to show at start (default: 6) Returns: Redacted token string like "abc123...********...xyz9" """ if not value or len(value) <= (show_chars + 4): return "***REDACTED***" return f"{value[:show_chars]}...{'*' * 8}...{value[-4:]}" def _log_http_request(method: str, url: str, data: dict, headers: dict = None) -> None: """ Log HTTP request details at DEBUG level Automatically redacts sensitive parameters (code, state, authorization) Args: method: HTTP method (GET, POST, etc.) url: Request URL data: Request data/parameters headers: Optional request headers """ if not current_app.logger.isEnabledFor(logging.DEBUG): return # Redact sensitive data safe_data = data.copy() if "code" in safe_data: safe_data["code"] = _redact_token(safe_data["code"]) if "state" in safe_data: safe_data["state"] = _redact_token(safe_data["state"], 8) current_app.logger.debug( f"IndieAuth HTTP Request:\n" f" Method: {method}\n" f" URL: {url}\n" f" Data: {safe_data}" ) if headers: safe_headers = { k: v for k, v in headers.items() if k.lower() not in ["authorization", "cookie"] } current_app.logger.debug(f" Headers: {safe_headers}") def _log_http_response(status_code: int, headers: dict, body: str) -> None: """ Log HTTP response details at DEBUG level Automatically redacts sensitive response data Args: status_code: HTTP status code headers: Response headers body: Response body (JSON string or text) """ if not current_app.logger.isEnabledFor(logging.DEBUG): return # Parse and redact JSON body if present safe_body = body try: import json data = json.loads(body) if "access_token" in data: data["access_token"] = _redact_token(data["access_token"]) if "code" in data: data["code"] = _redact_token(data["code"]) safe_body = json.dumps(data, indent=2) except (json.JSONDecodeError, TypeError): # Not JSON or parsing failed, log as-is (likely error message) pass # Redact sensitive headers safe_headers = { k: v for k, v in headers.items() if k.lower() not in ["set-cookie", "authorization"] } current_app.logger.debug( f"IndieAuth HTTP Response:\n" f" Status: {status_code}\n" f" Headers: {safe_headers}\n" f" Body: {safe_body}" ) # Helper functions def _hash_token(token: str) -> str: """ Hash token using SHA-256 Args: token: Token to hash Returns: Hexadecimal hash string """ return hashlib.sha256(token.encode()).hexdigest() def _generate_state_token() -> str: """ Generate CSRF state token Returns: URL-safe random token """ return secrets.token_urlsafe(32) def _verify_state_token(state: str) -> bool: """ Verify and consume CSRF state token Args: state: State token to verify Returns: True if valid, False otherwise """ db = get_db(current_app) # Check if state exists and not expired result = db.execute( """ SELECT 1 FROM auth_state WHERE state = ? AND expires_at > datetime('now') """, (state,), ).fetchone() if not result: return False # Delete state (single-use) db.execute("DELETE FROM auth_state WHERE state = ?", (state,)) db.commit() return True def _cleanup_expired_sessions() -> None: """Remove expired sessions and state tokens""" db = get_db(current_app) # Delete expired sessions db.execute( """ DELETE FROM sessions WHERE expires_at <= datetime('now') """ ) # Delete expired state tokens db.execute( """ DELETE FROM auth_state WHERE expires_at <= datetime('now') """ ) db.commit() # Core authentication functions def initiate_login(me_url: str) -> str: """ Initiate IndieLogin authentication flow Args: me_url: User's IndieWeb identity URL Returns: Redirect URL to IndieLogin.com Raises: ValueError: Invalid me_url format """ # Validate URL format if not is_valid_url(me_url): raise ValueError(f"Invalid URL format: {me_url}") current_app.logger.debug(f"Auth: Validating me URL: {me_url}") # Generate CSRF state token state = _generate_state_token() current_app.logger.debug(f"Auth: Generated state token: {_redact_token(state, 8)}") # Store state in database (5-minute expiry) db = get_db(current_app) expires_at = datetime.utcnow() + timedelta(minutes=5) redirect_uri = f"{current_app.config['SITE_URL']}/auth/callback" db.execute( """ INSERT INTO auth_state (state, expires_at, redirect_uri) VALUES (?, ?, ?) """, (state, expires_at, redirect_uri), ) db.commit() # Build IndieLogin URL params = { "me": me_url, "client_id": current_app.config["SITE_URL"], "redirect_uri": redirect_uri, "state": state, "response_type": "code", } current_app.logger.debug( f"Auth: Building authorization URL with params: {{\n" f" 'me': '{me_url}',\n" f" 'client_id': '{current_app.config['SITE_URL']}',\n" f" 'redirect_uri': '{redirect_uri}',\n" f" 'state': '{_redact_token(state, 8)}',\n" f" 'response_type': 'code'\n" f"}}" ) auth_url = f"{current_app.config['INDIELOGIN_URL']}/auth?{urlencode(params)}" # Log authentication attempt current_app.logger.info(f"Auth: Authentication initiated for {me_url}") return auth_url def handle_callback(code: str, state: str) -> Optional[str]: """ Handle IndieLogin callback Args: code: Authorization code from IndieLogin state: CSRF state token Returns: Session token if successful, None otherwise Raises: InvalidStateError: State token validation failed UnauthorizedError: User not authorized as admin IndieLoginError: Code exchange failed """ current_app.logger.debug(f"Auth: Verifying state token: {_redact_token(state, 8)}") # Verify state token (CSRF protection) if not _verify_state_token(state): current_app.logger.warning("Auth: Invalid state token received (possible CSRF or expired token)") raise InvalidStateError("Invalid or expired state token") current_app.logger.debug("Auth: State token valid and consumed") # Prepare token exchange request token_exchange_data = { "code": code, "client_id": current_app.config["SITE_URL"], "redirect_uri": f"{current_app.config['SITE_URL']}/auth/callback", } # Log the request _log_http_request( method="POST", url=f"{current_app.config['INDIELOGIN_URL']}/auth", data=token_exchange_data, ) # Exchange code for identity try: response = httpx.post( f"{current_app.config['INDIELOGIN_URL']}/auth", data=token_exchange_data, timeout=10.0, ) # Log the response _log_http_response( status_code=response.status_code, headers=dict(response.headers), body=response.text, ) response.raise_for_status() except httpx.RequestError as e: current_app.logger.error(f"Auth: IndieLogin request failed: {e}") raise IndieLoginError(f"Failed to verify code: {e}") except httpx.HTTPStatusError as e: current_app.logger.error(f"Auth: IndieLogin returned error: {e.response.status_code}") raise IndieLoginError(f"IndieLogin returned error: {e.response.status_code}") # Parse response data = response.json() me = data.get("me") if not me: current_app.logger.error("Auth: No identity returned from IndieLogin") raise IndieLoginError("No identity returned from IndieLogin") current_app.logger.debug(f"Auth: Received identity from IndieLogin: {me}") # Verify this is the admin user admin_me = current_app.config.get("ADMIN_ME") if not admin_me: current_app.logger.error("Auth: ADMIN_ME not configured") raise UnauthorizedError("Admin user not configured") current_app.logger.info(f"Auth: Verifying admin authorization for me={me}") if me != admin_me: current_app.logger.warning( f"Auth: Unauthorized login attempt: {me} (expected {admin_me})" ) raise UnauthorizedError(f"User {me} is not authorized") current_app.logger.debug("Auth: Admin verification passed") # Create session session_token = create_session(me) return session_token def create_session(me: str) -> str: """ Create authenticated session Args: me: Verified user identity URL Returns: Session token (plaintext, to be sent as cookie) """ # Generate secure token session_token = secrets.token_urlsafe(32) token_hash = _hash_token(session_token) current_app.logger.debug("Auth: Session token generated (hash will be stored)") # Calculate expiry (use configured session lifetime or default to 30 days) session_lifetime = current_app.config.get("SESSION_LIFETIME", 30) expires_at = datetime.utcnow() + timedelta(days=session_lifetime) current_app.logger.debug(f"Auth: Session expiry: {expires_at} ({session_lifetime} days)") # Get request metadata user_agent = request.headers.get("User-Agent", "")[:200] ip_address = request.remote_addr current_app.logger.debug(f"Auth: Request metadata - IP: {ip_address}, User-Agent: {user_agent[:50]}...") # Store in database db = get_db(current_app) db.execute( """ INSERT INTO sessions (session_token_hash, me, expires_at, user_agent, ip_address) VALUES (?, ?, ?, ?, ?) """, (token_hash, me, expires_at, user_agent, ip_address), ) db.commit() # Cleanup expired sessions _cleanup_expired_sessions() # Log session creation current_app.logger.info(f"Auth: Session created for {me}") return session_token def verify_session(token: str) -> Optional[Dict[str, Any]]: """ Verify session token Args: token: Session token from cookie Returns: Session info dict if valid, None otherwise """ if not token: current_app.logger.debug("Auth: No session token provided") return None current_app.logger.debug(f"Auth: Verifying session token: {_redact_token(token)}") token_hash = _hash_token(token) db = get_db(current_app) session_data = db.execute( """ SELECT id, me, created_at, expires_at, last_used_at FROM sessions WHERE session_token_hash = ? AND expires_at > datetime('now') """, (token_hash,), ).fetchone() if not session_data: current_app.logger.debug("Auth: Session token invalid or expired") return None current_app.logger.debug(f"Auth: Session verified for {session_data['me']}") # Update last_used_at for activity tracking db.execute( """ UPDATE sessions SET last_used_at = datetime('now') WHERE id = ? """, (session_data["id"],), ) db.commit() return { "me": session_data["me"], "created_at": session_data["created_at"], "expires_at": session_data["expires_at"], } def destroy_session(token: str) -> None: """ Destroy session (logout) Args: token: Session token to destroy """ if not token: return token_hash = _hash_token(token) db = get_db(current_app) db.execute( """ DELETE FROM sessions WHERE session_token_hash = ? """, (token_hash,), ) db.commit() current_app.logger.info("Session destroyed") def require_auth(f): """ Decorator to require authentication for a route Usage: @app.route('/admin') @require_auth def admin_dashboard(): return render_template('admin/dashboard.html') """ @wraps(f) def decorated_function(*args, **kwargs): # Get session token from cookie session_token = request.cookies.get("starpunk_session") # Verify session session_info = verify_session(session_token) if not session_info: # Store intended destination session["next"] = request.url return redirect(url_for("auth.login_form")) # Store user info in g for use in views g.user = session_info g.me = session_info["me"] return f(*args, **kwargs) return decorated_function