Gondulf/docs/designs/phase-2-implementation-guide.md
Phil Skentelbery 6f06aebf40 docs: add Phase 2 domain verification design and clarifications
Add comprehensive Phase 2 documentation:
- Complete design document for two-factor domain verification
- Implementation guide with code examples
- ADR for implementation decisions (ADR-0004)
- ADR for rel="me" email discovery (ADR-008)
- Phase 1 impact assessment
- All 23 clarification questions answered
- Updated architecture docs (indieauth-protocol, security)
- Updated ADR-005 with rel="me" approach
- Updated backlog with technical debt items

Design ready for Phase 2 implementation.

Generated with Claude Code https://claude.com/claude-code

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-20 13:05:09 -07:00

Phase 2 Implementation Guide - Specific Details

Date: 2024-11-20
Architect: Claude (Architect Agent)
Status: Supplementary to Phase 2 Design
Purpose: Provide specific implementation details for Developer clarification questions

This document supplements /docs/designs/phase-2-domain-verification.md with specific implementation decisions from ADR-0004.

1. Rate Limiting Implementation

Approach

Implement actual in-memory rate limiting with timestamp tracking.

Implementation Specifications

Service Structure:

# src/gondulf/rate_limiter.py
from typing import Dict, List
import time

class RateLimiter:
    """In-memory rate limiter for domain verification attempts."""

    def __init__(self, max_attempts: int = 3, window_hours: int = 1):
        """
        Args:
            max_attempts: Maximum attempts per domain in time window (default: 3)
            window_hours: Time window in hours (default: 1)
        """
        self.max_attempts = max_attempts
        self.window_seconds = window_hours * 3600
        self._attempts: Dict[str, List[int]] = {}  # domain -> [timestamp1, timestamp2, ...]

    def check_rate_limit(self, domain: str) -> bool:
        """
        Check if domain has exceeded rate limit.

        Args:
            domain: Domain to check

        Returns:
            True if within rate limit, False if exceeded
        """
        # Clean old timestamps first
        self._clean_old_attempts(domain)

        # Check current count
        if domain not in self._attempts:
            return True

        return len(self._attempts[domain]) < self.max_attempts

    def record_attempt(self, domain: str) -> None:
        """Record a verification attempt for domain."""
        now = int(time.time())
        if domain not in self._attempts:
            self._attempts[domain] = []
        self._attempts[domain].append(now)

    def _clean_old_attempts(self, domain: str) -> None:
        """Remove timestamps older than window."""
        if domain not in self._attempts:
            return

        now = int(time.time())
        cutoff = now - self.window_seconds
        self._attempts[domain] = [ts for ts in self._attempts[domain] if ts > cutoff]

        # Remove domain entirely if no recent attempts
        if not self._attempts[domain]:
            del self._attempts[domain]

Usage in Endpoints:

# In verification endpoint
rate_limiter = get_rate_limiter()
if not rate_limiter.check_rate_limit(domain):
    return {"success": False, "error": "rate_limit_exceeded"}

rate_limiter.record_attempt(domain)
# ... proceed with verification

Consequences:

  • State is held per process and lost on restart (acceptable trade-off for simplicity); multiple worker processes would each keep separate counters
  • No persistence needed
  • Simple dictionary-based implementation
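
The sliding-window behaviour described above can be exercised without waiting an hour by injecting the clock. The sketch below is a condensed restatement for illustration, not the project's module: the class name `SlidingWindowLimiter`, the combined `allow` method, and the `clock` parameter are all assumptions introduced here.

```python
from typing import Callable, Dict, List
import time


class SlidingWindowLimiter:
    """Condensed sliding-window limiter with an injectable clock (illustrative)."""

    def __init__(self, max_attempts: int = 3, window_seconds: int = 3600,
                 clock: Callable[[], float] = time.time):
        self.max_attempts = max_attempts
        self.window_seconds = window_seconds
        self._clock = clock  # hypothetical hook so tests can control time
        self._attempts: Dict[str, List[int]] = {}

    def allow(self, domain: str) -> bool:
        """Check the limit and, if the attempt is allowed, record it."""
        now = int(self._clock())
        cutoff = now - self.window_seconds
        # Keep only timestamps that are still inside the window
        recent = [ts for ts in self._attempts.get(domain, []) if ts > cutoff]
        if len(recent) >= self.max_attempts:
            self._attempts[domain] = recent
            return False
        recent.append(now)
        self._attempts[domain] = recent
        return True


# Drive the limiter with a fake clock instead of sleeping.
fake_now = [1_700_000_000]
limiter = SlidingWindowLimiter(clock=lambda: fake_now[0])

results = [limiter.allow("example.com") for _ in range(4)]  # fourth is rejected
fake_now[0] += 3601  # move just past the one-hour window
allowed_after_window = limiter.allow("example.com")
```

Combining the check and the record in one call keeps the two steps from drifting apart at call sites; whether that fits the guide's two-method interface is a design choice left to the Developer.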

2. Authorization Code Metadata Structure

Approach

Use Phase 1's CodeStorage service with complete metadata structure from the start.

Data Structure Specification

Authorization Code Metadata:

{
    "client_id": "https://client.example.com/",
    "redirect_uri": "https://client.example.com/callback",
    "state": "client_state_value",
    "code_challenge": "base64url_encoded_challenge",
    "code_challenge_method": "S256",
    "scope": "profile email",
    "me": "https://user.example.com/",
    "created_at": 1700000000,  # epoch integer
    "expires_at": 1700000600,  # epoch integer (created_at + 600)
    "used": False  # Include now, consume in Phase 3
}

Storage Implementation:

# Use Phase 1's CodeStorage
import time

code_storage = get_code_storage()
authorization_code = generate_random_code()
now = int(time.time())  # read the clock once so expires_at == created_at + 600
metadata = {
    "client_id": client_id,
    "redirect_uri": redirect_uri,
    "state": state,
    "code_challenge": code_challenge,
    "code_challenge_method": code_challenge_method,
    "scope": scope,
    "me": me,
    "created_at": now,
    "expires_at": now + 600,
    "used": False
}
code_storage.store(f"authz:{authorization_code}", metadata, ttl=600)

Rationale:

  • Epoch integers simpler than datetime objects
  • Include used field now (Phase 3 will check/update it)
  • Reuse existing CodeStorage infrastructure
  • Key prefix authz: distinguishes from verification codes
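
The storage snippet calls `generate_random_code()` without defining it, and the `used` flag is only consumed in Phase 3. As a rough sketch of how those pieces might fit together: the body of `generate_random_code`, the `build_metadata` helper, and the `is_redeemable` check are all assumptions for illustration, and the metadata is reduced to a few fields for brevity.

```python
import secrets
import time
from typing import Any, Dict, Optional

CODE_LIFETIME_SECONDS = 600  # matches the ttl used in the guide


def generate_random_code(num_bytes: int = 32) -> str:
    """Assumed implementation: a URL-safe token from the OS CSPRNG."""
    return secrets.token_urlsafe(num_bytes)


def build_metadata(client_id: str, redirect_uri: str, me: str,
                   now: Optional[int] = None) -> Dict[str, Any]:
    """Build a reduced metadata dict from a single clock reading."""
    created_at = int(time.time()) if now is None else now
    return {
        "client_id": client_id,
        "redirect_uri": redirect_uri,
        "me": me,
        "created_at": created_at,
        "expires_at": created_at + CODE_LIFETIME_SECONDS,
        "used": False,
    }


def is_redeemable(metadata: Dict[str, Any], now: int) -> bool:
    """Hypothetical Phase 3 check: unused and not yet expired."""
    return not metadata["used"] and now < metadata["expires_at"]


meta = build_metadata(
    "https://client.example.com/",
    "https://client.example.com/callback",
    "https://user.example.com/",
    now=1_700_000_000,
)
key = f"authz:{generate_random_code()}"  # same key prefix as the guide
```

Because both timestamps are epoch integers, the expiry test is a single integer comparison.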

3. HTML Template Implementation

Approach

Use Jinja2 templates with separate template files.

Directory Structure

src/gondulf/templates/
├── base.html              # Shared layout
├── verify_email.html      # Email verification form
├── verify_totp.html       # TOTP verification form (future)
├── authorize.html         # Authorization consent page
└── error.html            # Generic error page

Base Template

<!-- src/gondulf/templates/base.html -->
<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>{% block title %}Gondulf IndieAuth{% endblock %}</title>
    <style>
        body {
            font-family: system-ui, -apple-system, sans-serif;
            max-width: 600px;
            margin: 50px auto;
            padding: 20px;
            line-height: 1.6;
        }
        .error { color: #d32f2f; }
        .success { color: #388e3c; }
        form { margin-top: 20px; }
        input, button { font-size: 16px; padding: 8px; }
        button { background: #1976d2; color: white; border: none; cursor: pointer; }
        button:hover { background: #1565c0; }
    </style>
</head>
<body>
    {% block content %}{% endblock %}
</body>
</html>

Email Verification Template

<!-- src/gondulf/templates/verify_email.html -->
{% extends "base.html" %}

{% block title %}Verify Email - Gondulf{% endblock %}

{% block content %}
<h1>Verify Your Email</h1>
<p>A verification code has been sent to <strong>{{ masked_email }}</strong></p>
<p>Please enter the 6-digit code to complete verification:</p>

{% if error %}
<p class="error">{{ error }}</p>
{% endif %}

<form method="POST" action="/verify/email">
    <input type="hidden" name="domain" value="{{ domain }}">
    <input type="text" name="code" placeholder="000000" maxlength="6" required autofocus>
    <button type="submit">Verify</button>
</form>
{% endblock %}

FastAPI Integration

from fastapi import FastAPI, Request
from fastapi.templating import Jinja2Templates

from gondulf.utils.validation import mask_email

templates = Jinja2Templates(directory="src/gondulf/templates")

@app.get("/verify/email")
async def verify_email_page(request: Request, domain: str):
    # discovered_email is the address found by rel="me" discovery for this
    # domain; the lookup itself is outside the scope of this snippet
    masked = mask_email(discovered_email)
    return templates.TemplateResponse("verify_email.html", {
        "request": request,
        "domain": domain,
        "masked_email": masked
    })

Dependencies:

  • Add to pyproject.toml: jinja2 = "^3.1.0"

4. Database Migration Timing

Approach

Apply migration 002 immediately as part of Phase 2 setup.

Execution Order

  1. Developer runs migration: alembic upgrade head
  2. Migration 002 adds two_factor column with default value false
  3. All Phase 2 code assumes column exists
  4. New domains inserted with explicit two_factor value

Migration File (if not already created)

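# migrations/versions/002_add_two_factor_column.py
"""Add two_factor column to domains table

Revision ID: 002
Revises: 001
Create Date: 2024-11-20
"""
from alembic import op
import sqlalchemy as sa

# Revision identifiers read by Alembic; the docstring above is informational only
revision = '002'
down_revision = '001'
branch_labels = None
depends_on = None

def upgrade():
    # server_default backfills existing rows; sa.false() renders the
    # boolean literal appropriate to the database dialect
    op.add_column('domains',
        sa.Column('two_factor', sa.Boolean(), nullable=False, server_default=sa.false())
    )

def downgrade():
    op.drop_column('domains', 'two_factor')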

Rationale:

  • Keep database schema current with code expectations
  • No conditional logic needed in Phase 2 code
  • Clean separation: migration handles existing data, new code uses new schema
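
To see why the server default matters for existing rows, the effect of the migration can be reproduced with the standard-library `sqlite3` module. This is only a sketch: it assumes SQLite (which the test fixtures use) and a deliberately minimal `domains` table, whose real column set is not given in this guide.

```python
import sqlite3

conn = sqlite3.connect(":memory:")

# Assumed minimal Phase 1 shape; the real domains table likely has more columns.
conn.execute("CREATE TABLE domains (domain TEXT PRIMARY KEY, email TEXT)")
conn.execute("INSERT INTO domains VALUES ('old.example', 'user@old.example')")

# Rough SQL equivalent of the migration's add_column with a server default.
conn.execute(
    "ALTER TABLE domains ADD COLUMN two_factor BOOLEAN NOT NULL DEFAULT 0"
)

# New rows set the flag explicitly, as step 4 of the execution order requires.
conn.execute(
    "INSERT INTO domains VALUES ('new.example', 'user@new.example', 1)"
)

rows = conn.execute(
    "SELECT domain, two_factor FROM domains ORDER BY domain"
).fetchall()
conn.close()
```

The pre-existing row picks up the default without any data migration step, which is what allows Phase 2 code to assume the column is always populated.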

5. Client Validation Helper Functions

Approach

Standalone utility functions in shared module.

Module Structure

# src/gondulf/utils/validation.py
"""Client validation and utility functions."""
from urllib.parse import urlparse

def mask_email(email: str) -> str:
    """
    Mask email for display: user@example.com -> u***@example.com

    Args:
        email: Email address to mask

    Returns:
        Masked email string
    """
    if '@' not in email:
        return email

    local, domain = email.split('@', 1)
    if len(local) <= 1:
        return email

    masked_local = local[0] + '***'
    return f"{masked_local}@{domain}"


def normalize_client_id(client_id: str) -> str:
    """
    Normalize client_id URL to canonical form.

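    Rules:
    - Require https:// scheme (any other scheme is rejected, not rewritten)
    - Remove default port (443)
    - Preserve path, query, and fragment

    Args:
        client_id: Client ID URL

    Returns:
        Normalized client_id

    Raises:
        ValueError: If client_id does not use the https scheme
    """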
    parsed = urlparse(client_id)

    # Ensure https
    if parsed.scheme != 'https':
        raise ValueError("client_id must use https scheme")

    # Remove default HTTPS port
    netloc = parsed.netloc
    if netloc.endswith(':443'):
        netloc = netloc[:-4]

    # Reconstruct
    normalized = f"https://{netloc}{parsed.path}"
    if parsed.query:
        normalized += f"?{parsed.query}"
    if parsed.fragment:
        normalized += f"#{parsed.fragment}"

    return normalized


def validate_redirect_uri(redirect_uri: str, client_id: str) -> bool:
    """
    Validate redirect_uri against client_id per IndieAuth spec.

    Rules:
    - Must use https scheme (except localhost)
    - Must share same origin as client_id OR
    - Must be subdomain of client_id domain

    Args:
        redirect_uri: Redirect URI to validate
        client_id: Client ID for comparison

    Returns:
        True if valid, False otherwise
    """
    try:
        redirect_parsed = urlparse(redirect_uri)
        client_parsed = urlparse(client_id)

        # Check scheme (allow http for localhost only)
        if redirect_parsed.scheme != 'https':
            if redirect_parsed.hostname not in ('localhost', '127.0.0.1'):
                return False

        # Same origin check
        if (redirect_parsed.scheme == client_parsed.scheme and
            redirect_parsed.netloc == client_parsed.netloc):
            return True

        # Subdomain check
        redirect_host = redirect_parsed.hostname or ''
        client_host = client_parsed.hostname or ''

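        # Must end with .{client_host}; an empty client_host would reduce the
        # suffix to "." and match any host written with a trailing dot
        if client_host and redirect_host.endswith(f".{client_host}"):
            return True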

        return False

    except Exception:
        return False

Usage:

from gondulf.utils.validation import mask_email, validate_redirect_uri, normalize_client_id

# In verification endpoint
masked = mask_email(discovered_email)

# In authorization endpoint
normalized_client = normalize_client_id(client_id)
if not validate_redirect_uri(redirect_uri, normalized_client):
    return error_response("invalid_redirect_uri")
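
The subdomain rule depends on the leading dot in the suffix comparison. The following sketch isolates just the host comparison to show which look-alike hosts it rejects; the helper name `is_same_host_or_subdomain` is an assumption for illustration, and scheme handling is deliberately left out.

```python
from urllib.parse import urlparse


def is_same_host_or_subdomain(redirect_uri: str, client_id: str) -> bool:
    """Compare hosts only; scheme rules are handled separately in the guide."""
    redirect_host = urlparse(redirect_uri).hostname or ""
    client_host = urlparse(client_id).hostname or ""
    if not redirect_host or not client_host:
        return False  # refuse to compare against a missing host
    # The "." prefix stops "evilclient.example.com" matching "client.example.com"
    return redirect_host == client_host or redirect_host.endswith("." + client_host)


client = "https://client.example.com/"
same_host = is_same_host_or_subdomain("https://client.example.com/callback", client)
subdomain = is_same_host_or_subdomain("https://app.client.example.com/cb", client)
lookalike_prefix = is_same_host_or_subdomain("https://evilclient.example.com/cb", client)
lookalike_suffix = is_same_host_or_subdomain("https://client.example.com.evil.test/cb", client)
missing_client_host = is_same_host_or_subdomain("https://evil.test./cb", "https:///")
```

Only the first two calls return `True`; the remaining three are rejected.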

6. Error Response Format Consistency

Approach

Use format appropriate to endpoint type.

Format Rules by Endpoint Type

Verification Endpoints (/verify/email, /verify/totp):

# Always return 200 OK with JSON
return JSONResponse(
    status_code=200,
    content={"success": False, "error": "invalid_code"}
)

Authorization Endpoint - Pre-Client Validation:

# Return HTML error page if client_id not yet validated
return templates.TemplateResponse("error.html", {
    "request": request,
    "error": "Missing required parameter: client_id",
    "error_code": "invalid_request"
}, status_code=400)

Authorization Endpoint - Post-Client Validation:

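# Return OAuth redirect with error parameter
from urllib.parse import urlencode
from fastapi.responses import RedirectResponse

error_params = {
    "error": "invalid_request",
    "error_description": "Missing code_challenge parameter",
}
# Echo state back only if the client actually sent one
state = request.query_params.get("state")
if state:
    error_params["state"] = state
# Append with "&" when redirect_uri already carries a query string
separator = "&" if "?" in redirect_uri else "?"
redirect_url = f"{redirect_uri}{separator}{urlencode(error_params)}"
return RedirectResponse(url=redirect_url, status_code=302)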

Token Endpoint (Phase 3):

# Always return JSON with appropriate status code
return JSONResponse(
    status_code=400,
    content={
        "error": "invalid_grant",
        "error_description": "Authorization code has expired"
    }
)

Error Flow Decision Tree

Is this a verification endpoint?
  YES -> Return JSON (200 OK) with success:false
  NO -> Continue

Has client_id been validated yet?
  NO -> Return HTML error page
  YES -> Continue

Is redirect_uri valid?
  NO -> Return HTML error page (can't redirect safely)
  YES -> Return OAuth redirect with error
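
The decision tree above can be written as a small pure function, which makes the rule easy to unit test in isolation. This is a sketch only: the `ErrorFormat` enum and the `choose_error_format` name are assumptions, not part of the design.

```python
from enum import Enum


class ErrorFormat(Enum):
    """How an error should be reported back (illustrative names)."""
    JSON_200 = "json_200"              # verification endpoints
    HTML_PAGE = "html_page"            # cannot redirect safely
    OAUTH_REDIRECT = "oauth_redirect"  # redirect with error parameter


def choose_error_format(is_verification_endpoint: bool,
                        client_id_validated: bool,
                        redirect_uri_valid: bool) -> ErrorFormat:
    """Walk the decision tree from top to bottom."""
    if is_verification_endpoint:
        return ErrorFormat.JSON_200
    if not client_id_validated:
        return ErrorFormat.HTML_PAGE
    if not redirect_uri_valid:
        return ErrorFormat.HTML_PAGE
    return ErrorFormat.OAUTH_REDIRECT


verification_case = choose_error_format(True, False, False)
unvalidated_client_case = choose_error_format(False, False, True)
bad_redirect_case = choose_error_format(False, True, False)
redirectable_case = choose_error_format(False, True, True)
```

An OAuth redirect is only chosen when both the client and the redirect target have been validated, so errors are never sent to an untrusted URL.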

7. Dependency Injection Pattern

Approach

Singleton services created on first use through cached factory functions in dependencies.py.

Implementation Structure

Dependencies Module:

# src/gondulf/dependencies.py
"""FastAPI dependency injection for services."""
from functools import lru_cache
from gondulf.config import get_config
from gondulf.database import DatabaseService
from gondulf.code_storage import CodeStorage
from gondulf.email_service import EmailService
from gondulf.dns_service import DNSService
from gondulf.html_fetcher import HTMLFetcherService
from gondulf.relme_parser import RelMeParser
from gondulf.verification_service import DomainVerificationService
from gondulf.rate_limiter import RateLimiter

# Configuration
@lru_cache()
def get_config_singleton():
    """Get singleton configuration instance."""
    return get_config()

# Phase 1 Services
@lru_cache()
def get_database():
    """Get singleton database service."""
    config = get_config_singleton()
    return DatabaseService(config.database_url)

@lru_cache()
def get_code_storage():
    """Get singleton code storage service."""
    return CodeStorage()

@lru_cache()
def get_email_service():
    """Get singleton email service."""
    config = get_config_singleton()
    return EmailService(
        smtp_host=config.smtp_host,
        smtp_port=config.smtp_port,
        smtp_username=config.smtp_username,
        smtp_password=config.smtp_password,
        from_address=config.smtp_from_address
    )

@lru_cache()
def get_dns_service():
    """Get singleton DNS service."""
    config = get_config_singleton()
    return DNSService(nameservers=config.dns_nameservers)

# Phase 2 Services
@lru_cache()
def get_html_fetcher():
    """Get singleton HTML fetcher service."""
    return HTMLFetcherService()

@lru_cache()
def get_relme_parser():
    """Get singleton rel=me parser service."""
    return RelMeParser()

@lru_cache()
def get_rate_limiter():
    """Get singleton rate limiter service."""
    return RateLimiter(max_attempts=3, window_hours=1)

@lru_cache()
def get_verification_service():
    """Get singleton domain verification service."""
    return DomainVerificationService(
        dns_service=get_dns_service(),
        email_service=get_email_service(),
        code_storage=get_code_storage(),
        html_fetcher=get_html_fetcher(),
        relme_parser=get_relme_parser()
    )

Usage in Endpoints:

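from fastapi import Depends, Form
from gondulf.dependencies import get_verification_service, get_rate_limiter
from gondulf.rate_limiter import RateLimiter
from gondulf.verification_service import DomainVerificationService

@app.post("/verify/email")
async def verify_email(
    # Form(...) matches the HTML form in verify_email.html; FastAPI needs the
    # python-multipart package installed to parse form bodies
    domain: str = Form(...),
    code: str = Form(...),
    verification_service: DomainVerificationService = Depends(get_verification_service),
    rate_limiter: RateLimiter = Depends(get_rate_limiter)
):
    # Use injected services
    if not rate_limiter.check_rate_limit(domain):
        return {"success": False, "error": "rate_limit_exceeded"}

    # Count this attempt before verifying, as in the Section 1 usage example
    rate_limiter.record_attempt(domain)
    result = verification_service.verify_email_code(domain, code)
    return {"success": result}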

Rationale:

  • @lru_cache() ensures a single instance per factory function within each process
  • Services are constructed once, on first use, and reused for later requests
  • Consistent with Phase 1 pattern
  • Simple to test (can override dependencies in tests)
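
The singleton behaviour relies only on `functools.lru_cache`, so it can be demonstrated without FastAPI. In this sketch `RateLimiterStub` is a hypothetical stand-in for a real service; only object identity matters.

```python
from functools import lru_cache


class RateLimiterStub:
    """Stand-in for a real service; only identity matters here."""


@lru_cache()
def get_rate_limiter() -> RateLimiterStub:
    """Cached factory: the body runs once, later calls return the cached object."""
    return RateLimiterStub()


first = get_rate_limiter()
second = get_rate_limiter()
same_instance = first is second

# cache_clear() discards the cached instance, e.g. between tests.
get_rate_limiter.cache_clear()
fresh = get_rate_limiter()
replaced_after_clear = fresh is not first
```

One caveat worth keeping in mind: `get_verification_service()` calls the other factories directly rather than through `Depends`, so entries in `app.dependency_overrides` for those factories would not reach the services it builds. Tests that need a fully mocked verification service would likely have to override `get_verification_service` itself.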

8. Test Organization for Authorization Endpoint

Approach

Separate test files per major endpoint with shared fixtures.

File Structure

tests/
├── conftest.py                      # Shared fixtures and configuration
├── test_verification_endpoints.py   # Email/TOTP verification tests
└── test_authorization_endpoint.py   # Authorization flow tests

Shared Fixtures Module

# tests/conftest.py
import pytest
from fastapi.testclient import TestClient
from gondulf.main import app
from gondulf.dependencies import get_database, get_code_storage, get_rate_limiter

@pytest.fixture
def client():
    """FastAPI test client."""
    return TestClient(app)

@pytest.fixture
def mock_database():
    """Mock database service for testing."""
    # Create in-memory test database
    from gondulf.database import DatabaseService
    db = DatabaseService("sqlite:///:memory:")
    db.initialize()
    return db

@pytest.fixture
def mock_code_storage():
    """Mock code storage for testing."""
    from gondulf.code_storage import CodeStorage
    return CodeStorage()

@pytest.fixture
def mock_rate_limiter():
    """Mock rate limiter with clean state."""
    from gondulf.rate_limiter import RateLimiter
    return RateLimiter()

@pytest.fixture
def verified_domain(mock_database):
    """Fixture providing a pre-verified domain."""
    domain = "example.com"
    mock_database.store_verified_domain(
        domain=domain,
        email="user@example.com",
        two_factor=True
    )
    return domain

@pytest.fixture
def override_dependencies(mock_database, mock_code_storage, mock_rate_limiter):
    """Override FastAPI dependencies with test mocks."""
    app.dependency_overrides[get_database] = lambda: mock_database
    app.dependency_overrides[get_code_storage] = lambda: mock_code_storage
    app.dependency_overrides[get_rate_limiter] = lambda: mock_rate_limiter
    yield
    app.dependency_overrides.clear()

Verification Endpoints Tests

# tests/test_verification_endpoints.py
import pytest

class TestEmailVerification:
    """Tests for /verify/email endpoint."""

    def test_email_verification_success(self, client, override_dependencies):
        """Test successful email verification."""
        # Test implementation
        pass

    def test_email_verification_invalid_code(self, client, override_dependencies):
        """Test email verification with invalid code."""
        pass

    def test_email_verification_rate_limit(self, client, override_dependencies):
        """Test rate limiting on email verification."""
        pass

class TestTOTPVerification:
    """Tests for /verify/totp endpoint (future)."""
    pass

Authorization Endpoint Tests

# tests/test_authorization_endpoint.py
import pytest
from urllib.parse import parse_qs, urlparse

class TestAuthorizationEndpoint:
    """Tests for /authorize endpoint."""

    def test_authorize_missing_client_id(self, client, override_dependencies):
        """Test authorization with missing client_id parameter."""
        response = client.get("/authorize")
        assert response.status_code == 400
        assert "client_id" in response.text

    def test_authorize_invalid_redirect_uri(self, client, override_dependencies):
        """Test authorization with mismatched redirect_uri."""
        params = {
            "client_id": "https://client.example.com/",
            "redirect_uri": "https://evil.com/callback",
            "response_type": "code",
            "state": "test_state"
        }
        response = client.get("/authorize", params=params)
        assert response.status_code == 400

    def test_authorize_success_flow(self, client, override_dependencies, verified_domain):
        """Test complete successful authorization flow."""
        # Full flow test with verified domain
        params = {
            "client_id": "https://client.example.com/",
            "redirect_uri": "https://client.example.com/callback",
            "response_type": "code",
            "state": "test_state",
            "code_challenge": "test_challenge",
            "code_challenge_method": "S256",
            "me": f"https://{verified_domain}/"
        }
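        # Starlette's httpx-based TestClient (0.21+) takes follow_redirects;
        # older versions used allow_redirects
        response = client.get("/authorize", params=params, follow_redirects=False)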
        assert response.status_code == 302

        # Verify redirect contains authorization code
        redirect_url = response.headers["location"]
        parsed = urlparse(redirect_url)
        query_params = parse_qs(parsed.query)
        assert "code" in query_params
        assert query_params["state"][0] == "test_state"

Test Organization Rules

  1. One test class per major functionality (email verification, authorization flow)
  2. Test complete flows, not internal methods (black box testing)
  3. Use shared fixtures for common setup (verified domains, mock services)
  4. Test both success and error paths
  5. Test security boundaries (rate limiting, invalid inputs, unauthorized access)

Summary

These implementation decisions provide the Developer with unambiguous direction for Phase 2 implementation. All decisions prioritize simplicity while maintaining security and specification compliance.

Key Principles Applied:

  • Real implementations over stubs (rate limiting, validation)
  • Reuse existing infrastructure (CodeStorage, dependency pattern)
  • Standard tools over custom solutions (Jinja2 templates)
  • Simple data structures (epoch integers, dictionaries)
  • Clear separation of concerns (utility functions, test organization)

Next Steps for Developer:

  1. Review this guide alongside Phase 2 design document
  2. Implement in the order specified by Phase 2 design
  3. Follow patterns and structures defined here
  4. Ask clarification questions if any ambiguity remains before implementation

All architectural decisions are now documented and ready for implementation.