# Phase 3: Authentication Design ## Overview This document provides a complete, implementation-ready design for Phase 3 of the StarPunk V1 implementation: Authentication with IndieLogin. The authentication module (`starpunk/auth.py`) implements session-based authentication for the admin interface using IndieLogin.com as a delegated authentication provider. **Priority**: CRITICAL - Required for admin functionality **Estimated Effort**: 4-6 hours **Dependencies**: `starpunk/database.py`, `starpunk/utils.py` **File**: `starpunk/auth.py` ## Design Principles 1. **Zero Password Storage** - No passwords stored or managed locally 2. **Delegated Authentication** - IndieLogin.com handles identity verification 3. **Session-Based** - HTTP-only secure cookies for session management 4. **CSRF Protection** - State tokens prevent cross-site request forgery 5. **Single Admin User** - Simplified authorization (V1 requirement) 6. **Standards Compliance** - Full IndieAuth/OAuth 2.0 compatibility ## Authentication Flow ### Overview StarPunk uses IndieLogin.com for authentication, which implements the IndieAuth protocol. This allows users to authenticate using their personal website as their identity. ``` User → StarPunk → IndieLogin.com → User's Website → IndieLogin.com → StarPunk ``` ### Detailed Flow ```mermaid sequenceDiagram participant U as User participant S as StarPunk participant I as IndieLogin.com participant W as User's Website U->>S: GET /admin/login S->>U: Show login form U->>S: POST /admin/login (me=https://alice.com) S->>S: Generate state token S->>S: Store state in database S->>U: Redirect to IndieLogin U->>I: GET /auth (with params) I->>W: Verify identity (rel=me) I->>U: Show verification options U->>I: Authenticate I->>U: Redirect to callback U->>S: GET /auth/callback (code, state) S->>S: Verify state token S->>I: POST /auth (exchange code) I->>S: Return verified identity S->>S: Verify me == ADMIN_ME S->>S: Create session S->>U: Set cookie, redirect to /admin ``` ## Module Structure ```python """ Authentication module for StarPunk Implements IndieLogin authentication for admin access and session management. All authentication is delegated to IndieLogin.com - no passwords are stored. Functions: initiate_login: Start IndieLogin authentication flow handle_callback: Process IndieLogin callback create_session: Create authenticated session verify_session: Check if session is valid destroy_session: Logout and cleanup require_auth: Decorator for protected routes Classes: AuthError: Base authentication exception InvalidStateError: CSRF state validation failed UnauthorizedError: User not authorized as admin """ # Standard library imports import secrets from datetime import datetime, timedelta from functools import wraps from typing import Optional, Dict, Any from urllib.parse import urlencode, quote_plus # Third-party imports from flask import ( current_app, session, request, redirect, url_for, abort, g ) import httpx # Local imports from starpunk.database import get_db class AuthError(Exception): """Base exception for authentication errors""" pass class InvalidStateError(AuthError): """CSRF state token validation failed""" pass class UnauthorizedError(AuthError): """User is not authorized as admin""" pass ``` ## Core Functions ### 1. initiate_login() ```python def initiate_login(me: str) -> str: """ Initiate IndieLogin authentication flow Generates a CSRF state token, stores it in the database, and returns the authorization URL to redirect the user to IndieLogin.com. Args: me: The user's website URL (their identity) Returns: Authorization URL for redirect Raises: ValueError: If me is not a valid URL AuthError: If state generation fails Example: >>> url = initiate_login("https://alice.example.com") >>> # Redirect user to url """ # 1. Validate URL format if not me.startswith(('http://', 'https://')): raise ValueError(f"Invalid URL format: {me}") # Normalize URL (remove trailing slash) me = me.rstrip('/') # 2. Generate state token (CSRF protection) state = secrets.token_urlsafe(32) # 3. Store state in database with expiry db = get_db() expires_at = datetime.utcnow() + timedelta(minutes=5) try: db.execute( "INSERT INTO auth_state (state, expires_at) VALUES (?, ?)", (state, expires_at) ) db.commit() except Exception as e: raise AuthError(f"Failed to store state: {e}") # 4. Build authorization URL client_id = current_app.config['SITE_URL'] redirect_uri = f"{client_id}/auth/callback" params = { 'me': me, 'client_id': client_id, 'redirect_uri': redirect_uri, 'state': state, 'response_type': 'code' } auth_url = f"https://indielogin.com/auth?{urlencode(params)}" return auth_url ``` ### 2. handle_callback() ```python def handle_callback(code: str, state: str) -> Dict[str, Any]: """ Handle IndieLogin callback and verify identity Validates the state token, exchanges the authorization code for the verified identity, and checks if the user is authorized. Args: code: Authorization code from IndieLogin state: State token for CSRF verification Returns: Dict with verified 'me' URL and any profile info Raises: InvalidStateError: If state doesn't match UnauthorizedError: If user is not the admin AuthError: If verification fails """ # 1. Verify state token db = get_db() # Get stored state and check expiry row = db.execute( """ SELECT expires_at FROM auth_state WHERE state = ? AND expires_at > datetime('now') """, (state,) ).fetchone() if row is None: raise InvalidStateError("Invalid or expired state token") # Delete used state (one-time use) db.execute("DELETE FROM auth_state WHERE state = ?", (state,)) db.commit() # 2. Exchange code for verified identity client_id = current_app.config['SITE_URL'] redirect_uri = f"{client_id}/auth/callback" token_endpoint = "https://indielogin.com/auth" try: with httpx.Client(timeout=10.0) as client: response = client.post( token_endpoint, data={ 'code': code, 'client_id': client_id, 'redirect_uri': redirect_uri }, headers={'Accept': 'application/json'} ) response.raise_for_status() data = response.json() except Exception as e: raise AuthError(f"Failed to verify identity: {e}") # 3. Extract verified identity me = data.get('me', '').rstrip('/') if not me: raise AuthError("No identity returned from IndieLogin") # 4. Check authorization (admin only) admin_me = current_app.config['ADMIN_ME'].rstrip('/') if me != admin_me: raise UnauthorizedError( f"User {me} is not authorized. Only {admin_me} can access admin." ) return { 'me': me, 'profile': data.get('profile', {}) } ``` ### 3. create_session() ```python def create_session(me: str) -> str: """ Create authenticated session Generates a session token, stores it in the database, and returns the token to be set as a secure cookie. Args: me: The verified user identity URL Returns: Session token for cookie Example: >>> token = create_session("https://alice.example.com") >>> # Set cookie with token """ # 1. Generate session token session_token = secrets.token_urlsafe(32) # 2. Store in database db = get_db() created_at = datetime.utcnow() expires_at = created_at + timedelta(days=30) db.execute( """ INSERT INTO sessions (session_token, me, created_at, expires_at, last_accessed) VALUES (?, ?, ?, ?, ?) """, (session_token, me, created_at, expires_at, created_at) ) db.commit() return session_token ``` ### 4. verify_session() ```python def verify_session(session_token: Optional[str] = None) -> Optional[Dict[str, Any]]: """ Verify if session is valid Checks the session token from cookie or parameter, verifies it exists and hasn't expired, and updates last access time. Args: session_token: Token to verify (default: from cookie) Returns: Session data dict if valid, None if invalid Example: >>> session = verify_session() >>> if session: ... print(f"Logged in as {session['me']}") """ # Get token from parameter or cookie if session_token is None: session_token = request.cookies.get('session_token') if not session_token: return None # Query database db = get_db() row = db.execute( """ SELECT id, me, created_at, expires_at FROM sessions WHERE session_token = ? AND expires_at > datetime('now') """, (session_token,) ).fetchone() if row is None: return None # Update last access time db.execute( "UPDATE sessions SET last_accessed = ? WHERE id = ?", (datetime.utcnow(), row['id']) ) db.commit() return { 'id': row['id'], 'me': row['me'], 'created_at': row['created_at'], 'expires_at': row['expires_at'] } ``` ### 5. destroy_session() ```python def destroy_session(session_token: Optional[str] = None) -> None: """ Destroy session (logout) Removes session from database. Token can be provided or taken from cookie. Args: session_token: Token to destroy (default: from cookie) """ if session_token is None: session_token = request.cookies.get('session_token') if session_token: db = get_db() db.execute( "DELETE FROM sessions WHERE session_token = ?", (session_token,) ) db.commit() ``` ### 6. require_auth Decorator ```python def require_auth(f): """ Decorator to protect routes requiring authentication Verifies session and adds user info to g.user. Redirects to login if not authenticated. Example: @app.route('/admin') @require_auth def admin_dashboard(): # g.user contains session info return render_template('admin/dashboard.html') """ @wraps(f) def decorated_function(*args, **kwargs): session = verify_session() if session is None: # Store intended destination session['next'] = request.url return redirect(url_for('admin.login')) # Add to Flask globals g.user = session return f(*args, **kwargs) return decorated_function ``` ## Helper Functions ### cleanup_expired_sessions() ```python def cleanup_expired_sessions() -> int: """ Remove expired sessions and state tokens Should be called periodically (e.g., daily cron job). Returns: Number of records deleted """ db = get_db() # Delete expired sessions result1 = db.execute( "DELETE FROM sessions WHERE expires_at < datetime('now')" ) # Delete expired state tokens result2 = db.execute( "DELETE FROM auth_state WHERE expires_at < datetime('now')" ) db.commit() return result1.rowcount + result2.rowcount ``` ### extend_session() ```python def extend_session(session_token: str, days: int = 30) -> None: """ Extend session expiry time Used to keep active users logged in. Args: session_token: Token to extend days: Number of days to extend """ db = get_db() new_expiry = datetime.utcnow() + timedelta(days=days) db.execute( "UPDATE sessions SET expires_at = ? WHERE session_token = ?", (new_expiry, session_token) ) db.commit() ``` ## Security Considerations ### Session Security 1. **Token Generation**: Use `secrets.token_urlsafe(32)` for cryptographically secure tokens 2. **Cookie Flags**: - `HttpOnly`: Prevent JavaScript access - `Secure`: HTTPS only (production) - `SameSite=Lax`: CSRF protection 3. **Token Storage**: Never store raw tokens, consider hashing in future 4. **Expiry**: 30-day default, extendable on activity ### CSRF Protection 1. **State Tokens**: Random token for each auth attempt 2. **Single Use**: State deleted after verification 3. **Short Expiry**: 5-minute validity window 4. **Database Storage**: Prevents replay attacks ### Authorization 1. **Single Admin**: Only ADMIN_ME from config can authenticate 2. **URL Normalization**: Strip trailing slashes for comparison 3. **Strict Matching**: Exact match required (no wildcards) ## Integration Points ### Configuration Required ```python # In .env file SITE_URL=https://starpunk.example.com ADMIN_ME=https://alice.example.com SESSION_SECRET=<64-character-hex-string> ``` ### Database Tables Used - `sessions`: Store active sessions - `auth_state`: Store CSRF state tokens ### Routes to Implement (Phase 4) ```python # In starpunk/routes/admin.py @app.route('/admin/login', methods=['GET', 'POST']) def login(): if request.method == 'POST': me = request.form.get('me') auth_url = initiate_login(me) return redirect(auth_url) return render_template('admin/login.html') @app.route('/auth/callback') def callback(): code = request.args.get('code') state = request.args.get('state') try: user = handle_callback(code, state) token = create_session(user['me']) response = redirect(url_for('admin.dashboard')) response.set_cookie( 'session_token', token, httponly=True, secure=current_app.config['ENV'] == 'production', samesite='Lax', max_age=30*24*60*60 # 30 days ) return response except AuthError as e: flash(str(e)) return redirect(url_for('admin.login')) @app.route('/admin/logout') @require_auth def logout(): destroy_session() response = redirect(url_for('public.index')) response.delete_cookie('session_token') return response ``` ## Testing Strategy ### Test Categories 1. **Authentication Flow Tests** - State token generation and storage - Callback handling with valid code - Invalid state rejection - Expired state cleanup 2. **Session Management Tests** - Session creation - Session verification - Session destruction - Session expiry 3. **Authorization Tests** - Admin user accepted - Non-admin rejected - URL normalization 4. **Security Tests** - CSRF protection - Token uniqueness - Cookie security flags ### Example Tests ```python def test_initiate_login(app, client): """Test login initiation""" url = initiate_login("https://alice.example.com") assert "indielogin.com" in url assert "state=" in url def test_require_auth_decorator(app, client): """Test auth decorator redirects""" @require_auth def protected(): return "Protected" # Without session response = protected() assert response.status_code == 302 assert "/login" in response.location ``` ## Acceptance Criteria Phase 3 is complete when: - [ ] All authentication functions implemented - [ ] State token CSRF protection working - [ ] Session management functional - [ ] require_auth decorator protects routes - [ ] Expired session cleanup implemented - [ ] Integration with IndieLogin.com tested - [ ] Security measures in place (cookie flags, etc.) - [ ] Error handling comprehensive - [ ] Test coverage >90% - [ ] Documentation complete ## Next Steps After Phase 3: 1. **Phase 4**: Web Routes and Templates 2. **Phase 5**: Micropub Implementation 3. **Phase 6**: RSS Feed Generation Authentication provides the foundation for the admin interface and API security.