Files
StarPunk/starpunk/routes/auth.py
Phil Skentelbery e5050a0a7e feat: Implement IndieAuth token and authorization endpoints (Phase 2)
Following design in /docs/design/micropub-architecture.md and
/docs/decisions/ADR-029-micropub-v1-implementation-phases.md

Token Endpoint (/auth/token):
- Exchange authorization codes for access tokens
- Form-encoded POST requests per IndieAuth spec
- PKCE support (code_verifier validation)
- OAuth 2.0 error responses
- Validates client_id, redirect_uri, me parameters
- Returns Bearer tokens with scope

Authorization Endpoint (/auth/authorization):
- GET: Display consent form (requires admin login)
- POST: Process approval/denial
- PKCE support (code_challenge storage)
- Scope validation and filtering
- Integration with session management
- Proper error handling and redirects

All 33 Phase 2 tests pass (17 token + 16 authorization)

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-24 12:26:54 -07:00

451 lines
14 KiB
Python

"""
Authentication routes for StarPunk
Handles IndieLogin authentication flow including login form, OAuth callback,
logout functionality, and IndieAuth endpoints for Micropub clients.
"""
from flask import (
Blueprint,
current_app,
flash,
jsonify,
redirect,
render_template,
request,
session,
url_for,
)
from starpunk.auth import (
IndieLoginError,
InvalidStateError,
UnauthorizedError,
destroy_session,
handle_callback,
initiate_login,
require_auth,
verify_session,
)
from starpunk.tokens import (
create_access_token,
create_authorization_code,
exchange_authorization_code,
InvalidAuthorizationCodeError,
validate_scope,
)
# Blueprint that groups every route in this module under the /auth URL prefix
bp = Blueprint("auth", __name__, url_prefix="/auth")
@bp.route("/login", methods=["GET"])
def login_form():
    """
    Render the admin login page

    Looks up the ``starpunk_session`` cookie. When the cookie is present and
    ``verify_session`` accepts it, the visitor is sent straight to the admin
    dashboard; in every other case the login form is rendered.

    Returns:
        Redirect to ``admin.dashboard`` for a verified session, otherwise the
        rendered ``admin/login.html`` template
    """
    token = request.cookies.get("starpunk_session")

    # No cookie, or a cookie that fails verification: show the form
    if not token or not verify_session(token):
        return render_template("admin/login.html")

    return redirect(url_for("admin.dashboard"))
@bp.route("/login", methods=["POST"])
def login_initiate():
    """
    Start the login flow for the submitted profile URL

    Form data:
        me: User's personal website URL (surrounding whitespace is ignored)

    Returns:
        Redirect to the URL produced by ``initiate_login`` on success.
        When ``me`` is blank, or ``initiate_login`` raises ``ValueError``,
        an error message is flashed and the user is redirected back to the
        login form.
    """
    profile_url = request.form.get("me", "").strip()

    if not profile_url:
        failure = "Please enter your website URL"
    else:
        try:
            # Hand off to the login helper and follow the URL it returns
            return redirect(initiate_login(profile_url))
        except ValueError as exc:
            failure = str(exc)

    # Both failure paths end the same way: flash and return to the form
    flash(failure, "error")
    return redirect(url_for("auth.login_form"))
@bp.route("/callback")
def callback():
    """
    Handle IndieLogin callback

    Passes the authorization code, state token, and issuer to
    ``handle_callback`` (which performs the actual validation) and, on
    success, stores the returned session token in a cookie.

    If the authorization endpoint parked a pending authorization request in
    the Flask session (``pending_auth_url``) before sending the user to log
    in, the user is returned to that request after a successful login so the
    client's authorization flow can continue. Only URLs on this application's
    own origin are followed; anything else falls back to the dashboard.

    Query parameters:
        code: Authorization code from IndieLogin
        state: CSRF state token
        iss: Issuer identifier (optional here; forwarded to handle_callback)

    Returns:
        Redirect to the pending authorization request or the admin dashboard
        on success, login form on failure

    Sets:
        starpunk_session cookie (HttpOnly, SameSite=Lax, 30 day expiry;
        Secure only when the configured SITE_URL starts with https://)
    """
    code = request.args.get("code")
    state = request.args.get("state")
    iss = request.args.get("iss")

    if not code or not state:
        flash("Missing authentication parameters", "error")
        return redirect(url_for("auth.login_form"))

    try:
        # Validate the callback and obtain a session token
        session_token = handle_callback(code, state, iss)

        # Resume an authorization request that was interrupted by login.
        # The same-origin check is defense in depth against redirecting to
        # an unexpected host.
        pending_url = session.pop("pending_auth_url", None)
        if isinstance(pending_url, str) and pending_url.startswith(request.host_url):
            destination = pending_url
        else:
            destination = url_for("admin.dashboard")

        response = redirect(destination)

        # Only mark the cookie Secure when the site is served over HTTPS,
        # otherwise the browser would never send it back
        secure = current_app.config.get("SITE_URL", "").startswith("https://")
        response.set_cookie(
            "starpunk_session",
            session_token,
            httponly=True,
            secure=secure,
            samesite="Lax",
            max_age=30 * 24 * 60 * 60,  # 30 days
        )

        flash("Login successful!", "success")
        return response

    except InvalidStateError as e:
        current_app.logger.error(f"Invalid state error: {e}")
        flash("Authentication failed: Invalid state token (possible CSRF)", "error")
        return redirect(url_for("auth.login_form"))

    except UnauthorizedError as e:
        current_app.logger.error(f"Unauthorized: {e}")
        flash("Authentication failed: Not authorized as admin", "error")
        return redirect(url_for("auth.login_form"))

    except IndieLoginError as e:
        current_app.logger.error(f"IndieLogin error: {e}")
        flash(f"Authentication failed: {e}", "error")
        return redirect(url_for("auth.login_form"))

    except Exception as e:
        # Boundary handler: keep the traceback so unexpected failures can
        # be diagnosed, but show the user only a generic message
        current_app.logger.exception(f"Unexpected auth error: {e}")
        flash("Authentication failed: An unexpected error occurred", "error")
        return redirect(url_for("auth.login_form"))
@bp.route("/logout", methods=["POST"])
@require_auth
def logout():
    """
    End the current admin session

    Destroys the session identified by the ``starpunk_session`` cookie (when
    one was sent) and removes that cookie from the browser. A failure while
    destroying the session is logged and otherwise ignored, so the cookie is
    cleared regardless.

    Returns:
        Redirect to ``public.index`` with the session cookie deleted

    Decorator: @require_auth
    """
    cookie_name = "starpunk_session"
    token = request.cookies.get(cookie_name)

    if token:
        try:
            destroy_session(token)
        except Exception as exc:
            # Best effort: the user is logged out client-side either way
            current_app.logger.error(f"Error destroying session: {exc}")

    resp = redirect(url_for("public.index"))
    resp.delete_cookie(cookie_name)
    flash("Logged out successfully", "success")
    return resp
def _token_json(payload, status):
    """
    Build a JSON response for the token endpoint that must not be cached

    OAuth 2.0 (RFC 6749 section 5.1) requires ``Cache-Control: no-store`` and
    ``Pragma: no-cache`` on responses that carry tokens; the same headers are
    applied to error responses for simplicity.
    """
    response = jsonify(payload)
    response.status_code = status
    response.headers["Cache-Control"] = "no-store"
    response.headers["Pragma"] = "no-cache"
    return response


def _token_error(error, description, status=400):
    """Build an OAuth 2.0 style JSON error response for the token endpoint"""
    return _token_json({"error": error, "error_description": description}, status)


@bp.route("/token", methods=["POST"])
def token_endpoint():
    """
    IndieAuth token endpoint for exchanging authorization codes for access tokens

    Implements the IndieAuth token endpoint as specified in:
    https://www.w3.org/TR/indieauth/#token-endpoint

    Form parameters (application/x-www-form-urlencoded):
        grant_type: Must be "authorization_code"
        code: The authorization code received from authorization endpoint
        client_id: Client application URL
        redirect_uri: Redirect URI
        me: User's profile URL
        code_verifier: PKCE verifier (optional; forwarded as-is, may be None)

    code, client_id, redirect_uri, me and code_verifier are handed to
    ``exchange_authorization_code``, which decides whether they are valid.

    Returns:
        200 OK with JSON response on success:
        {
            "access_token": "xxx",
            "token_type": "Bearer",
            "scope": "create",
            "me": "https://user.example"
        }

        400 Bad Request with JSON error response on failure:
        {
            "error": "invalid_request|unsupported_grant_type|invalid_grant|invalid_scope",
            "error_description": "Human-readable error description"
        }

        500 Internal Server Error with error "server_error" when an
        unexpected exception occurs.

        Every response carries Cache-Control: no-store and Pragma: no-cache.
    """
    # Reject an explicit non-form content type; a request without a
    # Content-Type header falls through to parameter validation
    content_type = request.content_type
    if content_type and "application/x-www-form-urlencoded" not in content_type:
        return _token_error(
            "invalid_request",
            "Content-Type must be application/x-www-form-urlencoded",
        )

    form = request.form

    grant_type = form.get("grant_type")
    if not grant_type:
        return _token_error("invalid_request", "Missing grant_type parameter")
    if grant_type != "authorization_code":
        return _token_error(
            "unsupported_grant_type", f"Unsupported grant_type: {grant_type}"
        )

    # Remaining required parameters, checked in a fixed order so the first
    # missing one is the one reported
    required = {
        name: form.get(name)
        for name in ("code", "client_id", "redirect_uri", "me")
    }
    for name, value in required.items():
        if not value:
            return _token_error("invalid_request", f"Missing {name} parameter")

    try:
        auth_info = exchange_authorization_code(
            code=required["code"],
            client_id=required["client_id"],
            redirect_uri=required["redirect_uri"],
            me=required["me"],
            code_verifier=form.get("code_verifier"),
        )

        # IndieAuth spec: MUST NOT issue token if no scope
        if not auth_info["scope"]:
            return _token_error(
                "invalid_scope", "Authorization code was issued without scope"
            )

        access_token = create_access_token(
            me=auth_info["me"],
            client_id=auth_info["client_id"],
            scope=auth_info["scope"],
        )

        return _token_json(
            {
                "access_token": access_token,
                "token_type": "Bearer",
                "scope": auth_info["scope"],
                "me": auth_info["me"],
            },
            200,
        )

    except InvalidAuthorizationCodeError as e:
        current_app.logger.warning(f"Invalid authorization code: {e}")
        return _token_error("invalid_grant", str(e))

    except Exception as e:
        # Boundary handler: log with traceback, return a generic error
        current_app.logger.exception(f"Token endpoint error: {e}")
        return _token_error("server_error", "An unexpected error occurred", 500)
def _append_query_params(url, params):
    """
    Return ``url`` with ``params`` appended to its query string

    Args:
        url: Base URL; any query string or fragment it already has is kept
        params: Iterable of (name, value) pairs; pairs whose value is None
            are skipped

    Returns:
        The URL with the percent-encoded pairs added after any existing query
    """
    # Function-scope import keeps this helper self-contained
    from urllib.parse import urlencode, urlsplit, urlunsplit

    encoded = urlencode(
        [(name, value) for name, value in params if value is not None]
    )
    parts = urlsplit(url)
    if parts.query and encoded:
        query = f"{parts.query}&{encoded}"
    else:
        query = parts.query or encoded
    return urlunsplit(parts._replace(query=query))


def _show_authorization_consent():
    """Validate a GET authorization request and render the consent form"""
    response_type = request.args.get("response_type")
    client_id = request.args.get("client_id")
    redirect_uri = request.args.get("redirect_uri")
    state = request.args.get("state")
    scope = request.args.get("scope", "")
    code_challenge = request.args.get("code_challenge")
    code_challenge_method = request.args.get("code_challenge_method")

    # Validate required parameters
    if not response_type:
        return "Missing response_type parameter", 400
    if response_type != "code":
        return f"Unsupported response_type: {response_type}", 400
    if not client_id:
        return "Missing client_id parameter", 400
    if not redirect_uri:
        return "Missing redirect_uri parameter", 400
    if not state:
        return "Missing state parameter", 400

    # Validate and filter scope to supported scopes
    validated_scope = validate_scope(scope)

    # The consent form is only shown to a logged-in admin
    session_token = request.cookies.get("starpunk_session")
    if not session_token or not verify_session(session_token):
        # Remember the authorization request across the login round trip
        session["pending_auth_url"] = request.url
        flash("Please log in to authorize this application", "info")
        return redirect(url_for("auth.login_form"))

    # The identity shown on the form is always the configured ADMIN_ME,
    # not the optional "me" query parameter
    return render_template(
        "auth/authorize.html",
        client_id=client_id,
        redirect_uri=redirect_uri,
        state=state,
        scope=validated_scope,
        me=current_app.config.get("ADMIN_ME"),
        response_type=response_type,
        code_challenge=code_challenge,
        code_challenge_method=code_challenge_method,
    )


def _process_authorization_decision():
    """Handle the submitted consent form and redirect back to the client"""
    approve = request.form.get("approve")
    client_id = request.form.get("client_id")
    redirect_uri = request.form.get("redirect_uri")
    state = request.form.get("state")
    scope = request.form.get("scope", "")
    me = request.form.get("me")
    code_challenge = request.form.get("code_challenge")
    code_challenge_method = request.form.get("code_challenge_method")

    # Check if user is still logged in
    session_token = request.cookies.get("starpunk_session")
    if not session_token or not verify_session(session_token):
        flash("Session expired, please log in again", "error")
        return redirect(url_for("auth.login_form"))

    # Without these there is nowhere valid to redirect to, so answer
    # directly instead of redirecting to a bogus "None?..." location
    if not client_id:
        return "Missing client_id parameter", 400
    if not redirect_uri:
        return "Missing redirect_uri parameter", 400

    # If user denied, redirect with error
    if approve != "yes":
        return redirect(
            _append_query_params(
                redirect_uri,
                [
                    ("error", "access_denied"),
                    ("error_description", "User denied authorization"),
                    ("state", state),
                ],
            )
        )

    # User approved, generate authorization code
    try:
        auth_code = create_authorization_code(
            me=me,
            client_id=client_id,
            redirect_uri=redirect_uri,
            scope=scope,
            state=state,
            code_challenge=code_challenge,
            code_challenge_method=code_challenge_method,
        )
        return redirect(
            _append_query_params(
                redirect_uri, [("code", auth_code), ("state", state)]
            )
        )
    except Exception as e:
        current_app.logger.error(f"Authorization endpoint error: {e}")
        return redirect(
            _append_query_params(
                redirect_uri,
                [
                    ("error", "server_error"),
                    ("error_description", "Failed to generate authorization code"),
                    ("state", state),
                ],
            )
        )


@bp.route("/authorization", methods=["GET", "POST"])
def authorization_endpoint():
    """
    IndieAuth authorization endpoint for Micropub client authorization

    Implements the IndieAuth authorization endpoint as specified in:
    https://www.w3.org/TR/indieauth/#authorization-endpoint

    GET: Display authorization consent form
    Query parameters:
        response_type: Must be "code"
        client_id: Client application URL
        redirect_uri: Client's callback URL
        state: Client's CSRF state token
        scope: Space-separated list of requested scopes (optional)
        me: User's profile URL (optional; not used, ADMIN_ME is shown instead)
        code_challenge: PKCE challenge (optional)
        code_challenge_method: PKCE method, typically "S256" (optional)

    POST: Process authorization approval/denial
    Form parameters:
        approve: "yes" if user approved, anything else is denial
        client_id, redirect_uri: Required
        state, scope, me, code_challenge, code_challenge_method:
            Carried over from the consent form

    Returns:
        GET: HTML authorization consent form, a redirect to the login form
            when no valid session exists, or a 400 plain-text error for a
            missing/unsupported parameter
        POST: Redirect to the client's redirect_uri with URL-encoded
            code/state (approval) or error/error_description/state (denial
            or failure) appended to any query string it already has; a
            redirect to the login form when the session is no longer valid;
            or a 400 plain-text error when client_id or redirect_uri is
            missing
    """
    if request.method == "GET":
        return _show_authorization_consent()
    return _process_authorization_decision()