"""Authorization endpoint for OAuth 2.0 / IndieAuth authorization code flow.""" import logging from urllib.parse import urlencode from fastapi import APIRouter, Depends, Form, Request from fastapi.responses import HTMLResponse, RedirectResponse from fastapi.templating import Jinja2Templates from gondulf.database.connection import Database from gondulf.dependencies import get_database, get_happ_parser, get_verification_service from gondulf.services.domain_verification import DomainVerificationService from gondulf.services.happ_parser import HAppParser from gondulf.utils.validation import ( extract_domain_from_url, normalize_client_id, validate_redirect_uri, ) logger = logging.getLogger("gondulf.authorization") router = APIRouter() templates = Jinja2Templates(directory="src/gondulf/templates") @router.get("/authorize") async def authorize_get( request: Request, client_id: str | None = None, redirect_uri: str | None = None, response_type: str | None = None, state: str | None = None, code_challenge: str | None = None, code_challenge_method: str | None = None, scope: str | None = None, me: str | None = None, database: Database = Depends(get_database), happ_parser: HAppParser = Depends(get_happ_parser) ) -> HTMLResponse: """ Handle authorization request (GET). Validates client_id, redirect_uri, and required parameters. Shows consent form if domain is verified, or verification form if not. Args: request: FastAPI request object client_id: Client application identifier redirect_uri: Callback URI for client response_type: Must be "code" state: Client state parameter code_challenge: PKCE code challenge code_challenge_method: PKCE method (S256) scope: Requested scope me: User identity URL database: Database service Returns: HTML response with consent form or error page """ # Validate required parameters (pre-client validation) if not client_id: return templates.TemplateResponse( "error.html", { "request": request, "error": "Missing required parameter: client_id", "error_code": "invalid_request" }, status_code=400 ) if not redirect_uri: return templates.TemplateResponse( "error.html", { "request": request, "error": "Missing required parameter: redirect_uri", "error_code": "invalid_request" }, status_code=400 ) # Normalize and validate client_id try: normalized_client_id = normalize_client_id(client_id) except ValueError: return templates.TemplateResponse( "error.html", { "request": request, "error": "client_id must use HTTPS", "error_code": "invalid_request" }, status_code=400 ) # Validate redirect_uri against client_id if not validate_redirect_uri(redirect_uri, normalized_client_id): return templates.TemplateResponse( "error.html", { "request": request, "error": "redirect_uri does not match client_id domain", "error_code": "invalid_request" }, status_code=400 ) # From here on, redirect errors to client via OAuth error redirect # Validate response_type if response_type != "code": error_params = { "error": "unsupported_response_type", "error_description": "Only response_type=code is supported", "state": state or "" } redirect_url = f"{redirect_uri}?{urlencode(error_params)}" return RedirectResponse(url=redirect_url, status_code=302) # Validate code_challenge (PKCE required) if not code_challenge: error_params = { "error": "invalid_request", "error_description": "code_challenge is required (PKCE)", "state": state or "" } redirect_url = f"{redirect_uri}?{urlencode(error_params)}" return RedirectResponse(url=redirect_url, status_code=302) # Validate code_challenge_method if code_challenge_method != "S256": error_params = { "error": "invalid_request", "error_description": "code_challenge_method must be S256", "state": state or "" } redirect_url = f"{redirect_uri}?{urlencode(error_params)}" return RedirectResponse(url=redirect_url, status_code=302) # Validate me parameter if not me: error_params = { "error": "invalid_request", "error_description": "me parameter is required", "state": state or "" } redirect_url = f"{redirect_uri}?{urlencode(error_params)}" return RedirectResponse(url=redirect_url, status_code=302) # Validate me URL format try: extract_domain_from_url(me) except ValueError: error_params = { "error": "invalid_request", "error_description": "Invalid me URL", "state": state or "" } redirect_url = f"{redirect_uri}?{urlencode(error_params)}" return RedirectResponse(url=redirect_url, status_code=302) # Check if domain is verified # For Phase 2, we'll show consent form immediately (domain verification happens separately) # In Phase 3, we'll check database for verified domains # Fetch client metadata (h-app microformat) client_metadata = None try: client_metadata = await happ_parser.fetch_and_parse(normalized_client_id) logger.info(f"Fetched client metadata for {normalized_client_id}: {client_metadata.name}") except Exception as e: logger.warning(f"Failed to fetch client metadata for {normalized_client_id}: {e}") # Continue without metadata - will show client_id instead # Show consent form return templates.TemplateResponse( "authorize.html", { "request": request, "client_id": normalized_client_id, "redirect_uri": redirect_uri, "state": state or "", "code_challenge": code_challenge, "code_challenge_method": code_challenge_method, "scope": scope or "", "me": me, "client_metadata": client_metadata } ) @router.post("/authorize/consent") async def authorize_consent( request: Request, client_id: str = Form(...), redirect_uri: str = Form(...), state: str = Form(...), code_challenge: str = Form(...), code_challenge_method: str = Form(...), scope: str = Form(...), me: str = Form(...), verification_service: DomainVerificationService = Depends(get_verification_service) ) -> RedirectResponse: """ Handle authorization consent (POST). Creates authorization code and redirects to client callback. Args: request: FastAPI request object client_id: Client application identifier redirect_uri: Callback URI state: Client state code_challenge: PKCE challenge code_challenge_method: PKCE method scope: Requested scope me: User identity verification_service: Domain verification service Returns: Redirect to client callback with authorization code """ logger.info(f"Authorization consent granted for client_id={client_id}") # Create authorization code authorization_code = verification_service.create_authorization_code( client_id=client_id, redirect_uri=redirect_uri, state=state, code_challenge=code_challenge, code_challenge_method=code_challenge_method, scope=scope, me=me ) # Build redirect URL with authorization code redirect_params = { "code": authorization_code, "state": state } redirect_url = f"{redirect_uri}?{urlencode(redirect_params)}" logger.info(f"Redirecting to {redirect_uri} with authorization code") return RedirectResponse(url=redirect_url, status_code=302)