From 3b41029c75bcfae0c9dce7220e80565b60bafbe0 Mon Sep 17 00:00:00 2001 From: Phil Skentelbery Date: Mon, 24 Nov 2025 11:52:09 -0700 Subject: [PATCH] feat: Implement secure token management for Micropub MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Implements token security and management as specified in ADR-029: Database Changes (BREAKING): - Add secure tokens table with SHA256 hashed storage - Add authorization_codes table for IndieAuth token exchange - Drop old insecure tokens table (invalidates existing tokens) - Update SCHEMA_SQL to match post-migration state Token Management (starpunk/tokens.py): - Generate cryptographically secure tokens - Hash tokens with SHA256 for secure storage - Create and verify access tokens - Create and exchange authorization codes - PKCE support (optional but recommended) - Scope validation (V1: only 'create' scope) - Token expiry and revocation support Testing: - Comprehensive test suite for all token operations - Test authorization code replay protection - Test PKCE validation - Test parameter validation - Test token expiry Security: - Tokens never stored in plain text - Authorization codes single-use with replay protection - Optional PKCE for enhanced security - Proper UTC datetime handling for expiry Related: - ADR-029: Micropub IndieAuth Integration Strategy - Migration 002: Secure tokens and authorization codes 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude --- ..._secure_tokens_and_authorization_codes.sql | 57 +++ starpunk/database.py | 32 +- starpunk/migrations.py | 16 +- starpunk/tokens.py | 412 +++++++++++++++++ tests/test_tokens.py | 416 ++++++++++++++++++ 5 files changed, 924 insertions(+), 9 deletions(-) create mode 100644 migrations/002_secure_tokens_and_authorization_codes.sql create mode 100644 starpunk/tokens.py create mode 100644 tests/test_tokens.py diff --git a/migrations/002_secure_tokens_and_authorization_codes.sql b/migrations/002_secure_tokens_and_authorization_codes.sql new file mode 100644 index 0000000..b6b2934 --- /dev/null +++ b/migrations/002_secure_tokens_and_authorization_codes.sql @@ -0,0 +1,57 @@ +-- Migration: Secure token storage and add authorization codes +-- Date: 2025-11-24 +-- Version: 0.10.0 (BREAKING CHANGE) +-- ADR: ADR-029 Micropub IndieAuth Integration Strategy +-- +-- SECURITY FIX: Migrate tokens table to use SHA256 hashed storage +-- BREAKING CHANGE: All existing tokens will be invalidated +-- +-- This migration: +-- 1. Creates new secure tokens table with token_hash column +-- 2. Drops old insecure tokens table (invalidates all existing tokens) +-- 3. Creates authorization_codes table for IndieAuth token exchange +-- 4. Adds appropriate indexes for performance + +-- Step 1: Drop the old insecure tokens table +-- This invalidates all existing tokens (necessary security fix) +DROP TABLE IF EXISTS tokens; + +-- Step 2: Create new secure tokens table +CREATE TABLE tokens ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + token_hash TEXT UNIQUE NOT NULL, -- SHA256 hash of token (never store plain text) + me TEXT NOT NULL, -- User identity URL + client_id TEXT, -- Client application URL + scope TEXT DEFAULT 'create', -- Granted scopes (V1: only 'create') + created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + expires_at TIMESTAMP NOT NULL, -- Token expiration (90 days default) + last_used_at TIMESTAMP, -- Track last usage for auditing + revoked_at TIMESTAMP -- Soft revocation support +); + +-- Step 3: Create authorization_codes table for token exchange +CREATE TABLE authorization_codes ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + code_hash TEXT UNIQUE NOT NULL, -- SHA256 hash of authorization code + me TEXT NOT NULL, -- User identity URL + client_id TEXT NOT NULL, -- Client application URL + redirect_uri TEXT NOT NULL, -- Client's redirect URI (must match on exchange) + scope TEXT, -- Requested scopes (can be empty per IndieAuth spec) + state TEXT, -- Client's state parameter + code_challenge TEXT, -- Optional PKCE code challenge + code_challenge_method TEXT, -- PKCE method (S256 if used) + created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + expires_at TIMESTAMP NOT NULL, -- Short expiry (10 minutes default) + used_at TIMESTAMP -- Prevent replay attacks (code can only be used once) +); + +-- Step 4: Create indexes for performance +CREATE INDEX idx_tokens_hash ON tokens(token_hash); +CREATE INDEX idx_tokens_me ON tokens(me); +CREATE INDEX idx_tokens_expires ON tokens(expires_at); + +CREATE INDEX idx_auth_codes_hash ON authorization_codes(code_hash); +CREATE INDEX idx_auth_codes_expires ON authorization_codes(expires_at); + +-- Migration complete +-- Security notice: All users must re-authenticate after this migration diff --git a/starpunk/database.py b/starpunk/database.py index ccee6e5..2843a64 100644 --- a/starpunk/database.py +++ b/starpunk/database.py @@ -42,17 +42,41 @@ CREATE INDEX IF NOT EXISTS idx_sessions_token_hash ON sessions(session_token_has CREATE INDEX IF NOT EXISTS idx_sessions_expires ON sessions(expires_at); CREATE INDEX IF NOT EXISTS idx_sessions_me ON sessions(me); --- Micropub access tokens +-- Micropub access tokens (secure storage with hashed tokens) CREATE TABLE IF NOT EXISTS tokens ( - token TEXT PRIMARY KEY, + id INTEGER PRIMARY KEY AUTOINCREMENT, + token_hash TEXT UNIQUE NOT NULL, me TEXT NOT NULL, client_id TEXT, - scope TEXT, + scope TEXT DEFAULT 'create', created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, - expires_at TIMESTAMP + expires_at TIMESTAMP NOT NULL, + last_used_at TIMESTAMP, + revoked_at TIMESTAMP ); +CREATE INDEX IF NOT EXISTS idx_tokens_hash ON tokens(token_hash); CREATE INDEX IF NOT EXISTS idx_tokens_me ON tokens(me); +CREATE INDEX IF NOT EXISTS idx_tokens_expires ON tokens(expires_at); + +-- Authorization codes for IndieAuth token exchange +CREATE TABLE IF NOT EXISTS authorization_codes ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + code_hash TEXT UNIQUE NOT NULL, + me TEXT NOT NULL, + client_id TEXT NOT NULL, + redirect_uri TEXT NOT NULL, + scope TEXT, + state TEXT, + code_challenge TEXT, + code_challenge_method TEXT, + created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + expires_at TIMESTAMP NOT NULL, + used_at TIMESTAMP +); + +CREATE INDEX IF NOT EXISTS idx_auth_codes_hash ON authorization_codes(code_hash); +CREATE INDEX IF NOT EXISTS idx_auth_codes_expires ON authorization_codes(expires_at); -- CSRF state tokens (for IndieAuth flow) CREATE TABLE IF NOT EXISTS auth_state ( diff --git a/starpunk/migrations.py b/starpunk/migrations.py index 3cf1a46..6147f56 100644 --- a/starpunk/migrations.py +++ b/starpunk/migrations.py @@ -52,7 +52,7 @@ def is_schema_current(conn): Check if database schema is current (matches SCHEMA_SQL) Uses heuristic: Check for presence of latest schema features - Currently checks for code_verifier column in auth_state table + Currently checks for authorization_codes table and token_hash column in tokens table Args: conn: SQLite connection @@ -61,11 +61,17 @@ def is_schema_current(conn): bool: True if schema appears current, False if legacy """ try: - cursor = conn.execute("PRAGMA table_info(auth_state)") - columns = [row[1] for row in cursor.fetchall()] - return 'code_verifier' in columns + # Check for authorization_codes table (added in migration 002) + if not table_exists(conn, 'authorization_codes'): + return False + + # Check for token_hash column in tokens table (migration 002) + if not column_exists(conn, 'tokens', 'token_hash'): + return False + + return True except sqlite3.OperationalError: - # Table doesn't exist - definitely not current + # Schema check failed - definitely not current return False diff --git a/starpunk/tokens.py b/starpunk/tokens.py new file mode 100644 index 0000000..e868c8f --- /dev/null +++ b/starpunk/tokens.py @@ -0,0 +1,412 @@ +""" +Token management for Micropub IndieAuth integration + +Handles: +- Access token generation and verification +- Authorization code generation and exchange +- Token hashing for secure storage (SHA256) +- Scope validation +- Token expiry management + +Security: +- Tokens stored as SHA256 hashes (never plain text) +- Authorization codes use single-use pattern with replay protection +- Optional PKCE support for enhanced security +""" + +import hashlib +import secrets +from datetime import datetime, timedelta +from typing import Optional, Dict, Any +from flask import current_app + + +# V1 supported scopes +SUPPORTED_SCOPES = ["create"] +DEFAULT_SCOPE = "create" + +# Token and code expiry defaults +TOKEN_EXPIRY_DAYS = 90 +AUTH_CODE_EXPIRY_MINUTES = 10 + + +class TokenError(Exception): + """Base exception for token-related errors""" + pass + + +class InvalidTokenError(TokenError): + """Raised when token is invalid or expired""" + pass + + +class InvalidAuthorizationCodeError(TokenError): + """Raised when authorization code is invalid, expired, or already used""" + pass + + +def generate_token() -> str: + """ + Generate a cryptographically secure random token + + Returns: + URL-safe base64-encoded random token (43 characters) + """ + return secrets.token_urlsafe(32) + + +def hash_token(token: str) -> str: + """ + Generate SHA256 hash of token for secure storage + + Args: + token: Plain text token + + Returns: + Hexadecimal SHA256 hash + """ + return hashlib.sha256(token.encode()).hexdigest() + + +def create_access_token(me: str, client_id: str, scope: str) -> str: + """ + Create and store an access token in the database + + Args: + me: User's identity URL + client_id: Client application URL + scope: Space-separated list of scopes + + Returns: + Plain text access token (return to client, never logged or stored) + + Raises: + TokenError: If token creation fails + """ + # Generate token + token = generate_token() + token_hash_value = hash_token(token) + + # Calculate expiry + # Use UTC to match SQLite's datetime('now') which returns UTC + expires_at = (datetime.utcnow() + timedelta(days=TOKEN_EXPIRY_DAYS)).strftime('%Y-%m-%d %H:%M:%S') + + # Store in database + from starpunk.database import get_db + + try: + db = get_db(current_app) + db.execute(""" + INSERT INTO tokens (token_hash, me, client_id, scope, expires_at) + VALUES (?, ?, ?, ?, ?) + """, (token_hash_value, me, client_id, scope, expires_at)) + db.commit() + + current_app.logger.info( + f"Created access token for client_id={client_id}, scope={scope}" + ) + + return token + + except Exception as e: + current_app.logger.error(f"Failed to create access token: {e}") + raise TokenError(f"Failed to create access token: {e}") + + +def verify_token(token: str) -> Optional[Dict[str, Any]]: + """ + Verify an access token and return token information + + Args: + token: Plain text token to verify + + Returns: + Dictionary with token info: {me, client_id, scope} + None if token is invalid, expired, or revoked + """ + if not token: + return None + + # Hash the token for lookup + token_hash_value = hash_token(token) + + from starpunk.database import get_db + + try: + db = get_db(current_app) + row = db.execute(""" + SELECT me, client_id, scope, id + FROM tokens + WHERE token_hash = ? + AND expires_at > datetime('now') + AND revoked_at IS NULL + """, (token_hash_value,)).fetchone() + + if row: + # Update last_used_at + db.execute(""" + UPDATE tokens + SET last_used_at = datetime('now') + WHERE id = ? + """, (row['id'],)) + db.commit() + + return { + 'me': row['me'], + 'client_id': row['client_id'], + 'scope': row['scope'] + } + + return None + + except Exception as e: + current_app.logger.error(f"Token verification failed: {e}") + return None + + +def revoke_token(token: str) -> bool: + """ + Revoke an access token (soft deletion) + + Args: + token: Plain text token to revoke + + Returns: + True if token was revoked, False if not found + """ + token_hash_value = hash_token(token) + + from starpunk.database import get_db + + try: + db = get_db(current_app) + cursor = db.execute(""" + UPDATE tokens + SET revoked_at = datetime('now') + WHERE token_hash = ? + AND revoked_at IS NULL + """, (token_hash_value,)) + db.commit() + + return cursor.rowcount > 0 + + except Exception as e: + current_app.logger.error(f"Token revocation failed: {e}") + return False + + +def create_authorization_code( + me: str, + client_id: str, + redirect_uri: str, + scope: str = "", + state: Optional[str] = None, + code_challenge: Optional[str] = None, + code_challenge_method: Optional[str] = None +) -> str: + """ + Create and store an authorization code for token exchange + + Args: + me: User's identity URL + client_id: Client application URL + redirect_uri: Client's redirect URI (must match during exchange) + scope: Space-separated list of requested scopes (can be empty) + state: Client's state parameter (optional) + code_challenge: PKCE code challenge (optional) + code_challenge_method: PKCE method, typically 'S256' (optional) + + Returns: + Plain text authorization code (return to client) + + Raises: + TokenError: If code creation fails + """ + # Generate authorization code + code = generate_token() + code_hash_value = hash_token(code) + + # Calculate expiry (short-lived) + # Use UTC to match SQLite's datetime('now') which returns UTC + expires_at = (datetime.utcnow() + timedelta(minutes=AUTH_CODE_EXPIRY_MINUTES)).strftime('%Y-%m-%d %H:%M:%S') + + # Store in database + from starpunk.database import get_db + + try: + db = get_db(current_app) + db.execute(""" + INSERT INTO authorization_codes ( + code_hash, me, client_id, redirect_uri, scope, state, + code_challenge, code_challenge_method, expires_at + ) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) + """, ( + code_hash_value, me, client_id, redirect_uri, scope, state, + code_challenge, code_challenge_method, expires_at + )) + db.commit() + + current_app.logger.info( + f"Created authorization code for client_id={client_id}, scope={scope}" + ) + + return code + + except Exception as e: + current_app.logger.error(f"Failed to create authorization code: {e}") + raise TokenError(f"Failed to create authorization code: {e}") + + +def exchange_authorization_code( + code: str, + client_id: str, + redirect_uri: str, + me: str, + code_verifier: Optional[str] = None +) -> Dict[str, Any]: + """ + Exchange authorization code for access token + + Args: + code: Authorization code to exchange + client_id: Client application URL (must match original request) + redirect_uri: Redirect URI (must match original request) + me: User's identity URL (must match original request) + code_verifier: PKCE verifier (required if code_challenge was provided) + + Returns: + Dictionary with: {me, client_id, scope} + + Raises: + InvalidAuthorizationCodeError: If code is invalid, expired, used, or validation fails + """ + if not code: + raise InvalidAuthorizationCodeError("No authorization code provided") + + code_hash_value = hash_token(code) + + from starpunk.database import get_db + + try: + db = get_db(current_app) + + # Look up authorization code + row = db.execute(""" + SELECT me, client_id, redirect_uri, scope, code_challenge, + code_challenge_method, used_at + FROM authorization_codes + WHERE code_hash = ? + AND expires_at > datetime('now') + """, (code_hash_value,)).fetchone() + + if not row: + raise InvalidAuthorizationCodeError( + "Authorization code is invalid or expired" + ) + + # Check if already used (prevent replay attacks) + if row['used_at']: + raise InvalidAuthorizationCodeError( + "Authorization code has already been used" + ) + + # Validate parameters match original authorization request + if row['client_id'] != client_id: + raise InvalidAuthorizationCodeError( + "client_id does not match authorization request" + ) + + if row['redirect_uri'] != redirect_uri: + raise InvalidAuthorizationCodeError( + "redirect_uri does not match authorization request" + ) + + if row['me'] != me: + raise InvalidAuthorizationCodeError( + "me parameter does not match authorization request" + ) + + # Validate PKCE if code_challenge was provided + if row['code_challenge']: + if not code_verifier: + raise InvalidAuthorizationCodeError( + "code_verifier required (PKCE was used during authorization)" + ) + + # Verify PKCE challenge + if row['code_challenge_method'] == 'S256': + # SHA256 hash of verifier + computed_challenge = hashlib.sha256( + code_verifier.encode() + ).hexdigest() + else: + # Plain (not recommended, but spec allows it) + computed_challenge = code_verifier + + if computed_challenge != row['code_challenge']: + raise InvalidAuthorizationCodeError( + "code_verifier does not match code_challenge" + ) + + # Mark code as used + db.execute(""" + UPDATE authorization_codes + SET used_at = datetime('now') + WHERE code_hash = ? + """, (code_hash_value,)) + db.commit() + + # Return authorization info for token creation + return { + 'me': row['me'], + 'client_id': row['client_id'], + 'scope': row['scope'] + } + + except InvalidAuthorizationCodeError: + # Re-raise validation errors + raise + + except Exception as e: + current_app.logger.error(f"Authorization code exchange failed: {e}") + raise InvalidAuthorizationCodeError(f"Code exchange failed: {e}") + + +def validate_scope(requested_scope: str) -> str: + """ + Validate and filter requested scopes to supported ones + + Args: + requested_scope: Space-separated list of requested scopes + + Returns: + Space-separated list of valid scopes (may be empty) + """ + if not requested_scope: + return "" + + requested = set(requested_scope.split()) + supported = set(SUPPORTED_SCOPES) + valid_scopes = requested & supported + + return " ".join(sorted(valid_scopes)) if valid_scopes else "" + + +def check_scope(required: str, granted: str) -> bool: + """ + Check if granted scopes include required scope + + Args: + required: Required scope (single scope string) + granted: Granted scopes (space-separated string) + + Returns: + True if required scope is in granted scopes + """ + if not granted: + # IndieAuth spec: no scope means no access + return False + + granted_scopes = set(granted.split()) + return required in granted_scopes diff --git a/tests/test_tokens.py b/tests/test_tokens.py new file mode 100644 index 0000000..090b6fc --- /dev/null +++ b/tests/test_tokens.py @@ -0,0 +1,416 @@ +""" +Tests for token management module + +Tests: +- Token generation and hashing +- Access token creation and verification +- Authorization code creation and exchange +- PKCE validation +- Scope validation +- Token expiry and revocation +""" + +import pytest +from datetime import datetime, timedelta +from starpunk.tokens import ( + generate_token, + hash_token, + create_access_token, + verify_token, + revoke_token, + create_authorization_code, + exchange_authorization_code, + validate_scope, + check_scope, + TokenError, + InvalidAuthorizationCodeError +) + + +def test_generate_token(): + """Test token generation produces unique random tokens""" + token1 = generate_token() + token2 = generate_token() + + assert token1 != token2 + assert len(token1) == 43 # URL-safe base64 of 32 bytes + assert len(token2) == 43 + + +def test_hash_token(): + """Test token hashing is consistent and deterministic""" + token = "test_token_12345" + + hash1 = hash_token(token) + hash2 = hash_token(token) + + assert hash1 == hash2 + assert len(hash1) == 64 # SHA256 hex is 64 chars + assert hash1 != token # Hash should not be plain text + + +def test_hash_token_different_inputs(): + """Test different tokens produce different hashes""" + token1 = "token1" + token2 = "token2" + + hash1 = hash_token(token1) + hash2 = hash_token(token2) + + assert hash1 != hash2 + + +def test_create_access_token(app): + """Test access token creation and storage""" + with app.app_context(): + token = create_access_token( + me="https://user.example", + client_id="https://client.example", + scope="create" + ) + + # Verify token was returned + assert token is not None + assert len(token) == 43 + + # Verify token can be looked up + token_info = verify_token(token) + assert token_info is not None + assert token_info['me'] == "https://user.example" + assert token_info['client_id'] == "https://client.example" + assert token_info['scope'] == "create" + + +def test_verify_token_invalid(app): + """Test verification fails for invalid token""" + with app.app_context(): + # Verify with non-existent token + token_info = verify_token("invalid_token_12345") + assert token_info is None + + +def test_verify_token_expired(app): + """Test verification fails for expired token""" + with app.app_context(): + from starpunk.database import get_db + + # Create expired token + token = generate_token() + token_hash_value = hash_token(token) + expired_at = (datetime.utcnow() - timedelta(days=1)).strftime('%Y-%m-%d %H:%M:%S') + + db = get_db(app) + db.execute(""" + INSERT INTO tokens (token_hash, me, client_id, scope, expires_at) + VALUES (?, ?, ?, ?, ?) + """, (token_hash_value, "https://user.example", "https://client.example", + "create", expired_at)) + db.commit() + + # Verify fails for expired token + token_info = verify_token(token) + assert token_info is None + + +def test_revoke_token(app): + """Test token revocation""" + with app.app_context(): + # Create token + token = create_access_token( + me="https://user.example", + client_id="https://client.example", + scope="create" + ) + + # Verify token works + assert verify_token(token) is not None + + # Revoke token + result = revoke_token(token) + assert result is True + + # Verify token no longer works + assert verify_token(token) is None + + +def test_revoke_nonexistent_token(app): + """Test revoking non-existent token returns False""" + with app.app_context(): + result = revoke_token("nonexistent_token") + assert result is False + + +def test_create_authorization_code(app): + """Test authorization code creation""" + with app.app_context(): + code = create_authorization_code( + me="https://user.example", + client_id="https://client.example", + redirect_uri="https://client.example/callback", + scope="create", + state="random_state_123" + ) + + assert code is not None + assert len(code) == 43 + + +def test_exchange_authorization_code(app): + """Test authorization code exchange for token""" + with app.app_context(): + # Create authorization code + code = create_authorization_code( + me="https://user.example", + client_id="https://client.example", + redirect_uri="https://client.example/callback", + scope="create" + ) + + # Exchange code + auth_info = exchange_authorization_code( + code=code, + client_id="https://client.example", + redirect_uri="https://client.example/callback", + me="https://user.example" + ) + + assert auth_info['me'] == "https://user.example" + assert auth_info['client_id'] == "https://client.example" + assert auth_info['scope'] == "create" + + +def test_exchange_authorization_code_invalid(app): + """Test exchange fails with invalid code""" + with app.app_context(): + with pytest.raises(InvalidAuthorizationCodeError): + exchange_authorization_code( + code="invalid_code", + client_id="https://client.example", + redirect_uri="https://client.example/callback", + me="https://user.example" + ) + + +def test_exchange_authorization_code_replay_protection(app): + """Test authorization code can only be used once""" + with app.app_context(): + # Create code + code = create_authorization_code( + me="https://user.example", + client_id="https://client.example", + redirect_uri="https://client.example/callback", + scope="create" + ) + + # First exchange succeeds + exchange_authorization_code( + code=code, + client_id="https://client.example", + redirect_uri="https://client.example/callback", + me="https://user.example" + ) + + # Second exchange fails (replay attack) + with pytest.raises(InvalidAuthorizationCodeError, + match="already been used"): + exchange_authorization_code( + code=code, + client_id="https://client.example", + redirect_uri="https://client.example/callback", + me="https://user.example" + ) + + +def test_exchange_authorization_code_client_id_mismatch(app): + """Test exchange fails if client_id doesn't match""" + with app.app_context(): + code = create_authorization_code( + me="https://user.example", + client_id="https://client.example", + redirect_uri="https://client.example/callback", + scope="create" + ) + + with pytest.raises(InvalidAuthorizationCodeError, + match="client_id does not match"): + exchange_authorization_code( + code=code, + client_id="https://different-client.example", + redirect_uri="https://client.example/callback", + me="https://user.example" + ) + + +def test_exchange_authorization_code_redirect_uri_mismatch(app): + """Test exchange fails if redirect_uri doesn't match""" + with app.app_context(): + code = create_authorization_code( + me="https://user.example", + client_id="https://client.example", + redirect_uri="https://client.example/callback", + scope="create" + ) + + with pytest.raises(InvalidAuthorizationCodeError, + match="redirect_uri does not match"): + exchange_authorization_code( + code=code, + client_id="https://client.example", + redirect_uri="https://client.example/different-callback", + me="https://user.example" + ) + + +def test_exchange_authorization_code_me_mismatch(app): + """Test exchange fails if me parameter doesn't match""" + with app.app_context(): + code = create_authorization_code( + me="https://user.example", + client_id="https://client.example", + redirect_uri="https://client.example/callback", + scope="create" + ) + + with pytest.raises(InvalidAuthorizationCodeError, + match="me parameter does not match"): + exchange_authorization_code( + code=code, + client_id="https://client.example", + redirect_uri="https://client.example/callback", + me="https://different-user.example" + ) + + +def test_pkce_code_challenge_validation(app): + """Test PKCE code challenge/verifier validation""" + with app.app_context(): + import hashlib + + # Generate verifier and challenge + code_verifier = "test_verifier_with_enough_entropy_12345678" + code_challenge = hashlib.sha256(code_verifier.encode()).hexdigest() + + # Create code with PKCE + code = create_authorization_code( + me="https://user.example", + client_id="https://client.example", + redirect_uri="https://client.example/callback", + scope="create", + code_challenge=code_challenge, + code_challenge_method="S256" + ) + + # Exchange with correct verifier succeeds + auth_info = exchange_authorization_code( + code=code, + client_id="https://client.example", + redirect_uri="https://client.example/callback", + me="https://user.example", + code_verifier=code_verifier + ) + + assert auth_info is not None + + +def test_pkce_missing_verifier(app): + """Test PKCE exchange fails if verifier is missing""" + with app.app_context(): + # Create code with PKCE + code = create_authorization_code( + me="https://user.example", + client_id="https://client.example", + redirect_uri="https://client.example/callback", + scope="create", + code_challenge="some_challenge", + code_challenge_method="S256" + ) + + # Exchange without verifier fails + with pytest.raises(InvalidAuthorizationCodeError, + match="code_verifier required"): + exchange_authorization_code( + code=code, + client_id="https://client.example", + redirect_uri="https://client.example/callback", + me="https://user.example" + ) + + +def test_pkce_wrong_verifier(app): + """Test PKCE exchange fails with wrong verifier""" + with app.app_context(): + import hashlib + + code_verifier = "correct_verifier" + code_challenge = hashlib.sha256(code_verifier.encode()).hexdigest() + + # Create code with PKCE + code = create_authorization_code( + me="https://user.example", + client_id="https://client.example", + redirect_uri="https://client.example/callback", + scope="create", + code_challenge=code_challenge, + code_challenge_method="S256" + ) + + # Exchange with wrong verifier fails + with pytest.raises(InvalidAuthorizationCodeError, + match="code_verifier does not match"): + exchange_authorization_code( + code=code, + client_id="https://client.example", + redirect_uri="https://client.example/callback", + me="https://user.example", + code_verifier="wrong_verifier" + ) + + +def test_validate_scope(): + """Test scope validation filters to supported scopes""" + # Valid scope + assert validate_scope("create") == "create" + + # Empty scope + assert validate_scope("") == "" + + # Unsupported scope filtered out + assert validate_scope("update delete") == "" + + # Mixed valid and invalid scopes + assert validate_scope("create update delete") == "create" + + +def test_check_scope(): + """Test scope checking logic""" + # Scope granted + assert check_scope("create", "create") is True + assert check_scope("create", "create update") is True + + # Scope not granted + assert check_scope("update", "create") is False + assert check_scope("create", "") is False + assert check_scope("create", None) is False + + +def test_empty_scope_authorization(app): + """Test that empty scope is allowed during authorization per IndieAuth spec""" + with app.app_context(): + # Create authorization code with empty scope + code = create_authorization_code( + me="https://user.example", + client_id="https://client.example", + redirect_uri="https://client.example/callback", + scope="" # Empty scope allowed + ) + + # Exchange should succeed + auth_info = exchange_authorization_code( + code=code, + client_id="https://client.example", + redirect_uri="https://client.example/callback", + me="https://user.example" + ) + + # But scope should be empty + assert auth_info['scope'] == ""