feat: Implement secure token management for Micropub

Implements token security and management as specified in ADR-029:

Database Changes (BREAKING):
- Add secure tokens table with SHA256 hashed storage
- Add authorization_codes table for IndieAuth token exchange
- Drop old insecure tokens table (invalidates existing tokens)
- Update SCHEMA_SQL to match post-migration state

Token Management (starpunk/tokens.py):
- Generate cryptographically secure tokens
- Hash tokens with SHA256 for secure storage
- Create and verify access tokens
- Create and exchange authorization codes
- PKCE support (optional but recommended)
- Scope validation (V1: only 'create' scope)
- Token expiry and revocation support

Testing:
- Comprehensive test suite for all token operations
- Test authorization code replay protection
- Test PKCE validation
- Test parameter validation
- Test token expiry

Security:
- Tokens never stored in plain text
- Authorization codes single-use with replay protection
- Optional PKCE for enhanced security
- Proper UTC datetime handling for expiry

Related:
- ADR-029: Micropub IndieAuth Integration Strategy
- Migration 002: Secure tokens and authorization codes

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
2025-11-24 11:52:09 -07:00
parent e2333cb31d
commit 3b41029c75
5 changed files with 924 additions and 9 deletions

View File

@@ -0,0 +1,57 @@
-- Migration: Secure token storage and add authorization codes
-- Date: 2025-11-24
-- Version: 0.10.0 (BREAKING CHANGE)
-- ADR: ADR-029 Micropub IndieAuth Integration Strategy
--
-- SECURITY FIX: Migrate tokens table to use SHA256 hashed storage
-- BREAKING CHANGE: All existing tokens will be invalidated
--
-- This migration:
-- 1. Creates new secure tokens table with token_hash column
-- 2. Drops old insecure tokens table (invalidates all existing tokens)
-- 3. Creates authorization_codes table for IndieAuth token exchange
-- 4. Adds appropriate indexes for performance
-- Step 1: Drop the old insecure tokens table
-- This invalidates all existing tokens (necessary security fix)
DROP TABLE IF EXISTS tokens;
-- Step 2: Create new secure tokens table
CREATE TABLE tokens (
id INTEGER PRIMARY KEY AUTOINCREMENT,
token_hash TEXT UNIQUE NOT NULL, -- SHA256 hash of token (never store plain text)
me TEXT NOT NULL, -- User identity URL
client_id TEXT, -- Client application URL
scope TEXT DEFAULT 'create', -- Granted scopes (V1: only 'create')
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
expires_at TIMESTAMP NOT NULL, -- Token expiration (90 days default)
last_used_at TIMESTAMP, -- Track last usage for auditing
revoked_at TIMESTAMP -- Soft revocation support
);
-- Step 3: Create authorization_codes table for token exchange
CREATE TABLE authorization_codes (
id INTEGER PRIMARY KEY AUTOINCREMENT,
code_hash TEXT UNIQUE NOT NULL, -- SHA256 hash of authorization code
me TEXT NOT NULL, -- User identity URL
client_id TEXT NOT NULL, -- Client application URL
redirect_uri TEXT NOT NULL, -- Client's redirect URI (must match on exchange)
scope TEXT, -- Requested scopes (can be empty per IndieAuth spec)
state TEXT, -- Client's state parameter
code_challenge TEXT, -- Optional PKCE code challenge
code_challenge_method TEXT, -- PKCE method (S256 if used)
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
expires_at TIMESTAMP NOT NULL, -- Short expiry (10 minutes default)
used_at TIMESTAMP -- Prevent replay attacks (code can only be used once)
);
-- Step 4: Create indexes for performance
CREATE INDEX idx_tokens_hash ON tokens(token_hash);
CREATE INDEX idx_tokens_me ON tokens(me);
CREATE INDEX idx_tokens_expires ON tokens(expires_at);
CREATE INDEX idx_auth_codes_hash ON authorization_codes(code_hash);
CREATE INDEX idx_auth_codes_expires ON authorization_codes(expires_at);
-- Migration complete
-- Security notice: All users must re-authenticate after this migration

View File

@@ -42,17 +42,41 @@ CREATE INDEX IF NOT EXISTS idx_sessions_token_hash ON sessions(session_token_has
CREATE INDEX IF NOT EXISTS idx_sessions_expires ON sessions(expires_at);
CREATE INDEX IF NOT EXISTS idx_sessions_me ON sessions(me);
-- Micropub access tokens
-- Micropub access tokens (secure storage with hashed tokens)
CREATE TABLE IF NOT EXISTS tokens (
token TEXT PRIMARY KEY,
id INTEGER PRIMARY KEY AUTOINCREMENT,
token_hash TEXT UNIQUE NOT NULL,
me TEXT NOT NULL,
client_id TEXT,
scope TEXT,
scope TEXT DEFAULT 'create',
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
expires_at TIMESTAMP
expires_at TIMESTAMP NOT NULL,
last_used_at TIMESTAMP,
revoked_at TIMESTAMP
);
CREATE INDEX IF NOT EXISTS idx_tokens_hash ON tokens(token_hash);
CREATE INDEX IF NOT EXISTS idx_tokens_me ON tokens(me);
CREATE INDEX IF NOT EXISTS idx_tokens_expires ON tokens(expires_at);
-- Authorization codes for IndieAuth token exchange
CREATE TABLE IF NOT EXISTS authorization_codes (
id INTEGER PRIMARY KEY AUTOINCREMENT,
code_hash TEXT UNIQUE NOT NULL,
me TEXT NOT NULL,
client_id TEXT NOT NULL,
redirect_uri TEXT NOT NULL,
scope TEXT,
state TEXT,
code_challenge TEXT,
code_challenge_method TEXT,
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
expires_at TIMESTAMP NOT NULL,
used_at TIMESTAMP
);
CREATE INDEX IF NOT EXISTS idx_auth_codes_hash ON authorization_codes(code_hash);
CREATE INDEX IF NOT EXISTS idx_auth_codes_expires ON authorization_codes(expires_at);
-- CSRF state tokens (for IndieAuth flow)
CREATE TABLE IF NOT EXISTS auth_state (

View File

@@ -52,7 +52,7 @@ def is_schema_current(conn):
Check if database schema is current (matches SCHEMA_SQL)
Uses heuristic: Check for presence of latest schema features
Currently checks for code_verifier column in auth_state table
Currently checks for authorization_codes table and token_hash column in tokens table
Args:
conn: SQLite connection
@@ -61,11 +61,17 @@ def is_schema_current(conn):
bool: True if schema appears current, False if legacy
"""
try:
cursor = conn.execute("PRAGMA table_info(auth_state)")
columns = [row[1] for row in cursor.fetchall()]
return 'code_verifier' in columns
# Check for authorization_codes table (added in migration 002)
if not table_exists(conn, 'authorization_codes'):
return False
# Check for token_hash column in tokens table (migration 002)
if not column_exists(conn, 'tokens', 'token_hash'):
return False
return True
except sqlite3.OperationalError:
# Table doesn't exist - definitely not current
# Schema check failed - definitely not current
return False

412
starpunk/tokens.py Normal file
View File

@@ -0,0 +1,412 @@
"""
Token management for Micropub IndieAuth integration
Handles:
- Access token generation and verification
- Authorization code generation and exchange
- Token hashing for secure storage (SHA256)
- Scope validation
- Token expiry management
Security:
- Tokens stored as SHA256 hashes (never plain text)
- Authorization codes use single-use pattern with replay protection
- Optional PKCE support for enhanced security
"""
import hashlib
import secrets
from datetime import datetime, timedelta
from typing import Optional, Dict, Any
from flask import current_app
# V1 supported scopes
SUPPORTED_SCOPES = ["create"]
DEFAULT_SCOPE = "create"
# Token and code expiry defaults
TOKEN_EXPIRY_DAYS = 90
AUTH_CODE_EXPIRY_MINUTES = 10
class TokenError(Exception):
"""Base exception for token-related errors"""
pass
class InvalidTokenError(TokenError):
"""Raised when token is invalid or expired"""
pass
class InvalidAuthorizationCodeError(TokenError):
"""Raised when authorization code is invalid, expired, or already used"""
pass
def generate_token() -> str:
"""
Generate a cryptographically secure random token
Returns:
URL-safe base64-encoded random token (43 characters)
"""
return secrets.token_urlsafe(32)
def hash_token(token: str) -> str:
"""
Generate SHA256 hash of token for secure storage
Args:
token: Plain text token
Returns:
Hexadecimal SHA256 hash
"""
return hashlib.sha256(token.encode()).hexdigest()
def create_access_token(me: str, client_id: str, scope: str) -> str:
"""
Create and store an access token in the database
Args:
me: User's identity URL
client_id: Client application URL
scope: Space-separated list of scopes
Returns:
Plain text access token (return to client, never logged or stored)
Raises:
TokenError: If token creation fails
"""
# Generate token
token = generate_token()
token_hash_value = hash_token(token)
# Calculate expiry
# Use UTC to match SQLite's datetime('now') which returns UTC
expires_at = (datetime.utcnow() + timedelta(days=TOKEN_EXPIRY_DAYS)).strftime('%Y-%m-%d %H:%M:%S')
# Store in database
from starpunk.database import get_db
try:
db = get_db(current_app)
db.execute("""
INSERT INTO tokens (token_hash, me, client_id, scope, expires_at)
VALUES (?, ?, ?, ?, ?)
""", (token_hash_value, me, client_id, scope, expires_at))
db.commit()
current_app.logger.info(
f"Created access token for client_id={client_id}, scope={scope}"
)
return token
except Exception as e:
current_app.logger.error(f"Failed to create access token: {e}")
raise TokenError(f"Failed to create access token: {e}")
def verify_token(token: str) -> Optional[Dict[str, Any]]:
"""
Verify an access token and return token information
Args:
token: Plain text token to verify
Returns:
Dictionary with token info: {me, client_id, scope}
None if token is invalid, expired, or revoked
"""
if not token:
return None
# Hash the token for lookup
token_hash_value = hash_token(token)
from starpunk.database import get_db
try:
db = get_db(current_app)
row = db.execute("""
SELECT me, client_id, scope, id
FROM tokens
WHERE token_hash = ?
AND expires_at > datetime('now')
AND revoked_at IS NULL
""", (token_hash_value,)).fetchone()
if row:
# Update last_used_at
db.execute("""
UPDATE tokens
SET last_used_at = datetime('now')
WHERE id = ?
""", (row['id'],))
db.commit()
return {
'me': row['me'],
'client_id': row['client_id'],
'scope': row['scope']
}
return None
except Exception as e:
current_app.logger.error(f"Token verification failed: {e}")
return None
def revoke_token(token: str) -> bool:
"""
Revoke an access token (soft deletion)
Args:
token: Plain text token to revoke
Returns:
True if token was revoked, False if not found
"""
token_hash_value = hash_token(token)
from starpunk.database import get_db
try:
db = get_db(current_app)
cursor = db.execute("""
UPDATE tokens
SET revoked_at = datetime('now')
WHERE token_hash = ?
AND revoked_at IS NULL
""", (token_hash_value,))
db.commit()
return cursor.rowcount > 0
except Exception as e:
current_app.logger.error(f"Token revocation failed: {e}")
return False
def create_authorization_code(
me: str,
client_id: str,
redirect_uri: str,
scope: str = "",
state: Optional[str] = None,
code_challenge: Optional[str] = None,
code_challenge_method: Optional[str] = None
) -> str:
"""
Create and store an authorization code for token exchange
Args:
me: User's identity URL
client_id: Client application URL
redirect_uri: Client's redirect URI (must match during exchange)
scope: Space-separated list of requested scopes (can be empty)
state: Client's state parameter (optional)
code_challenge: PKCE code challenge (optional)
code_challenge_method: PKCE method, typically 'S256' (optional)
Returns:
Plain text authorization code (return to client)
Raises:
TokenError: If code creation fails
"""
# Generate authorization code
code = generate_token()
code_hash_value = hash_token(code)
# Calculate expiry (short-lived)
# Use UTC to match SQLite's datetime('now') which returns UTC
expires_at = (datetime.utcnow() + timedelta(minutes=AUTH_CODE_EXPIRY_MINUTES)).strftime('%Y-%m-%d %H:%M:%S')
# Store in database
from starpunk.database import get_db
try:
db = get_db(current_app)
db.execute("""
INSERT INTO authorization_codes (
code_hash, me, client_id, redirect_uri, scope, state,
code_challenge, code_challenge_method, expires_at
)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
""", (
code_hash_value, me, client_id, redirect_uri, scope, state,
code_challenge, code_challenge_method, expires_at
))
db.commit()
current_app.logger.info(
f"Created authorization code for client_id={client_id}, scope={scope}"
)
return code
except Exception as e:
current_app.logger.error(f"Failed to create authorization code: {e}")
raise TokenError(f"Failed to create authorization code: {e}")
def exchange_authorization_code(
code: str,
client_id: str,
redirect_uri: str,
me: str,
code_verifier: Optional[str] = None
) -> Dict[str, Any]:
"""
Exchange authorization code for access token
Args:
code: Authorization code to exchange
client_id: Client application URL (must match original request)
redirect_uri: Redirect URI (must match original request)
me: User's identity URL (must match original request)
code_verifier: PKCE verifier (required if code_challenge was provided)
Returns:
Dictionary with: {me, client_id, scope}
Raises:
InvalidAuthorizationCodeError: If code is invalid, expired, used, or validation fails
"""
if not code:
raise InvalidAuthorizationCodeError("No authorization code provided")
code_hash_value = hash_token(code)
from starpunk.database import get_db
try:
db = get_db(current_app)
# Look up authorization code
row = db.execute("""
SELECT me, client_id, redirect_uri, scope, code_challenge,
code_challenge_method, used_at
FROM authorization_codes
WHERE code_hash = ?
AND expires_at > datetime('now')
""", (code_hash_value,)).fetchone()
if not row:
raise InvalidAuthorizationCodeError(
"Authorization code is invalid or expired"
)
# Check if already used (prevent replay attacks)
if row['used_at']:
raise InvalidAuthorizationCodeError(
"Authorization code has already been used"
)
# Validate parameters match original authorization request
if row['client_id'] != client_id:
raise InvalidAuthorizationCodeError(
"client_id does not match authorization request"
)
if row['redirect_uri'] != redirect_uri:
raise InvalidAuthorizationCodeError(
"redirect_uri does not match authorization request"
)
if row['me'] != me:
raise InvalidAuthorizationCodeError(
"me parameter does not match authorization request"
)
# Validate PKCE if code_challenge was provided
if row['code_challenge']:
if not code_verifier:
raise InvalidAuthorizationCodeError(
"code_verifier required (PKCE was used during authorization)"
)
# Verify PKCE challenge
if row['code_challenge_method'] == 'S256':
# SHA256 hash of verifier
computed_challenge = hashlib.sha256(
code_verifier.encode()
).hexdigest()
else:
# Plain (not recommended, but spec allows it)
computed_challenge = code_verifier
if computed_challenge != row['code_challenge']:
raise InvalidAuthorizationCodeError(
"code_verifier does not match code_challenge"
)
# Mark code as used
db.execute("""
UPDATE authorization_codes
SET used_at = datetime('now')
WHERE code_hash = ?
""", (code_hash_value,))
db.commit()
# Return authorization info for token creation
return {
'me': row['me'],
'client_id': row['client_id'],
'scope': row['scope']
}
except InvalidAuthorizationCodeError:
# Re-raise validation errors
raise
except Exception as e:
current_app.logger.error(f"Authorization code exchange failed: {e}")
raise InvalidAuthorizationCodeError(f"Code exchange failed: {e}")
def validate_scope(requested_scope: str) -> str:
"""
Validate and filter requested scopes to supported ones
Args:
requested_scope: Space-separated list of requested scopes
Returns:
Space-separated list of valid scopes (may be empty)
"""
if not requested_scope:
return ""
requested = set(requested_scope.split())
supported = set(SUPPORTED_SCOPES)
valid_scopes = requested & supported
return " ".join(sorted(valid_scopes)) if valid_scopes else ""
def check_scope(required: str, granted: str) -> bool:
"""
Check if granted scopes include required scope
Args:
required: Required scope (single scope string)
granted: Granted scopes (space-separated string)
Returns:
True if required scope is in granted scopes
"""
if not granted:
# IndieAuth spec: no scope means no access
return False
granted_scopes = set(granted.split())
return required in granted_scopes

416
tests/test_tokens.py Normal file
View File

@@ -0,0 +1,416 @@
"""
Tests for token management module
Tests:
- Token generation and hashing
- Access token creation and verification
- Authorization code creation and exchange
- PKCE validation
- Scope validation
- Token expiry and revocation
"""
import pytest
from datetime import datetime, timedelta
from starpunk.tokens import (
generate_token,
hash_token,
create_access_token,
verify_token,
revoke_token,
create_authorization_code,
exchange_authorization_code,
validate_scope,
check_scope,
TokenError,
InvalidAuthorizationCodeError
)
def test_generate_token():
"""Test token generation produces unique random tokens"""
token1 = generate_token()
token2 = generate_token()
assert token1 != token2
assert len(token1) == 43 # URL-safe base64 of 32 bytes
assert len(token2) == 43
def test_hash_token():
"""Test token hashing is consistent and deterministic"""
token = "test_token_12345"
hash1 = hash_token(token)
hash2 = hash_token(token)
assert hash1 == hash2
assert len(hash1) == 64 # SHA256 hex is 64 chars
assert hash1 != token # Hash should not be plain text
def test_hash_token_different_inputs():
"""Test different tokens produce different hashes"""
token1 = "token1"
token2 = "token2"
hash1 = hash_token(token1)
hash2 = hash_token(token2)
assert hash1 != hash2
def test_create_access_token(app):
"""Test access token creation and storage"""
with app.app_context():
token = create_access_token(
me="https://user.example",
client_id="https://client.example",
scope="create"
)
# Verify token was returned
assert token is not None
assert len(token) == 43
# Verify token can be looked up
token_info = verify_token(token)
assert token_info is not None
assert token_info['me'] == "https://user.example"
assert token_info['client_id'] == "https://client.example"
assert token_info['scope'] == "create"
def test_verify_token_invalid(app):
"""Test verification fails for invalid token"""
with app.app_context():
# Verify with non-existent token
token_info = verify_token("invalid_token_12345")
assert token_info is None
def test_verify_token_expired(app):
"""Test verification fails for expired token"""
with app.app_context():
from starpunk.database import get_db
# Create expired token
token = generate_token()
token_hash_value = hash_token(token)
expired_at = (datetime.utcnow() - timedelta(days=1)).strftime('%Y-%m-%d %H:%M:%S')
db = get_db(app)
db.execute("""
INSERT INTO tokens (token_hash, me, client_id, scope, expires_at)
VALUES (?, ?, ?, ?, ?)
""", (token_hash_value, "https://user.example", "https://client.example",
"create", expired_at))
db.commit()
# Verify fails for expired token
token_info = verify_token(token)
assert token_info is None
def test_revoke_token(app):
"""Test token revocation"""
with app.app_context():
# Create token
token = create_access_token(
me="https://user.example",
client_id="https://client.example",
scope="create"
)
# Verify token works
assert verify_token(token) is not None
# Revoke token
result = revoke_token(token)
assert result is True
# Verify token no longer works
assert verify_token(token) is None
def test_revoke_nonexistent_token(app):
"""Test revoking non-existent token returns False"""
with app.app_context():
result = revoke_token("nonexistent_token")
assert result is False
def test_create_authorization_code(app):
"""Test authorization code creation"""
with app.app_context():
code = create_authorization_code(
me="https://user.example",
client_id="https://client.example",
redirect_uri="https://client.example/callback",
scope="create",
state="random_state_123"
)
assert code is not None
assert len(code) == 43
def test_exchange_authorization_code(app):
"""Test authorization code exchange for token"""
with app.app_context():
# Create authorization code
code = create_authorization_code(
me="https://user.example",
client_id="https://client.example",
redirect_uri="https://client.example/callback",
scope="create"
)
# Exchange code
auth_info = exchange_authorization_code(
code=code,
client_id="https://client.example",
redirect_uri="https://client.example/callback",
me="https://user.example"
)
assert auth_info['me'] == "https://user.example"
assert auth_info['client_id'] == "https://client.example"
assert auth_info['scope'] == "create"
def test_exchange_authorization_code_invalid(app):
"""Test exchange fails with invalid code"""
with app.app_context():
with pytest.raises(InvalidAuthorizationCodeError):
exchange_authorization_code(
code="invalid_code",
client_id="https://client.example",
redirect_uri="https://client.example/callback",
me="https://user.example"
)
def test_exchange_authorization_code_replay_protection(app):
"""Test authorization code can only be used once"""
with app.app_context():
# Create code
code = create_authorization_code(
me="https://user.example",
client_id="https://client.example",
redirect_uri="https://client.example/callback",
scope="create"
)
# First exchange succeeds
exchange_authorization_code(
code=code,
client_id="https://client.example",
redirect_uri="https://client.example/callback",
me="https://user.example"
)
# Second exchange fails (replay attack)
with pytest.raises(InvalidAuthorizationCodeError,
match="already been used"):
exchange_authorization_code(
code=code,
client_id="https://client.example",
redirect_uri="https://client.example/callback",
me="https://user.example"
)
def test_exchange_authorization_code_client_id_mismatch(app):
"""Test exchange fails if client_id doesn't match"""
with app.app_context():
code = create_authorization_code(
me="https://user.example",
client_id="https://client.example",
redirect_uri="https://client.example/callback",
scope="create"
)
with pytest.raises(InvalidAuthorizationCodeError,
match="client_id does not match"):
exchange_authorization_code(
code=code,
client_id="https://different-client.example",
redirect_uri="https://client.example/callback",
me="https://user.example"
)
def test_exchange_authorization_code_redirect_uri_mismatch(app):
"""Test exchange fails if redirect_uri doesn't match"""
with app.app_context():
code = create_authorization_code(
me="https://user.example",
client_id="https://client.example",
redirect_uri="https://client.example/callback",
scope="create"
)
with pytest.raises(InvalidAuthorizationCodeError,
match="redirect_uri does not match"):
exchange_authorization_code(
code=code,
client_id="https://client.example",
redirect_uri="https://client.example/different-callback",
me="https://user.example"
)
def test_exchange_authorization_code_me_mismatch(app):
"""Test exchange fails if me parameter doesn't match"""
with app.app_context():
code = create_authorization_code(
me="https://user.example",
client_id="https://client.example",
redirect_uri="https://client.example/callback",
scope="create"
)
with pytest.raises(InvalidAuthorizationCodeError,
match="me parameter does not match"):
exchange_authorization_code(
code=code,
client_id="https://client.example",
redirect_uri="https://client.example/callback",
me="https://different-user.example"
)
def test_pkce_code_challenge_validation(app):
"""Test PKCE code challenge/verifier validation"""
with app.app_context():
import hashlib
# Generate verifier and challenge
code_verifier = "test_verifier_with_enough_entropy_12345678"
code_challenge = hashlib.sha256(code_verifier.encode()).hexdigest()
# Create code with PKCE
code = create_authorization_code(
me="https://user.example",
client_id="https://client.example",
redirect_uri="https://client.example/callback",
scope="create",
code_challenge=code_challenge,
code_challenge_method="S256"
)
# Exchange with correct verifier succeeds
auth_info = exchange_authorization_code(
code=code,
client_id="https://client.example",
redirect_uri="https://client.example/callback",
me="https://user.example",
code_verifier=code_verifier
)
assert auth_info is not None
def test_pkce_missing_verifier(app):
"""Test PKCE exchange fails if verifier is missing"""
with app.app_context():
# Create code with PKCE
code = create_authorization_code(
me="https://user.example",
client_id="https://client.example",
redirect_uri="https://client.example/callback",
scope="create",
code_challenge="some_challenge",
code_challenge_method="S256"
)
# Exchange without verifier fails
with pytest.raises(InvalidAuthorizationCodeError,
match="code_verifier required"):
exchange_authorization_code(
code=code,
client_id="https://client.example",
redirect_uri="https://client.example/callback",
me="https://user.example"
)
def test_pkce_wrong_verifier(app):
"""Test PKCE exchange fails with wrong verifier"""
with app.app_context():
import hashlib
code_verifier = "correct_verifier"
code_challenge = hashlib.sha256(code_verifier.encode()).hexdigest()
# Create code with PKCE
code = create_authorization_code(
me="https://user.example",
client_id="https://client.example",
redirect_uri="https://client.example/callback",
scope="create",
code_challenge=code_challenge,
code_challenge_method="S256"
)
# Exchange with wrong verifier fails
with pytest.raises(InvalidAuthorizationCodeError,
match="code_verifier does not match"):
exchange_authorization_code(
code=code,
client_id="https://client.example",
redirect_uri="https://client.example/callback",
me="https://user.example",
code_verifier="wrong_verifier"
)
def test_validate_scope():
"""Test scope validation filters to supported scopes"""
# Valid scope
assert validate_scope("create") == "create"
# Empty scope
assert validate_scope("") == ""
# Unsupported scope filtered out
assert validate_scope("update delete") == ""
# Mixed valid and invalid scopes
assert validate_scope("create update delete") == "create"
def test_check_scope():
"""Test scope checking logic"""
# Scope granted
assert check_scope("create", "create") is True
assert check_scope("create", "create update") is True
# Scope not granted
assert check_scope("update", "create") is False
assert check_scope("create", "") is False
assert check_scope("create", None) is False
def test_empty_scope_authorization(app):
"""Test that empty scope is allowed during authorization per IndieAuth spec"""
with app.app_context():
# Create authorization code with empty scope
code = create_authorization_code(
me="https://user.example",
client_id="https://client.example",
redirect_uri="https://client.example/callback",
scope="" # Empty scope allowed
)
# Exchange should succeed
auth_info = exchange_authorization_code(
code=code,
client_id="https://client.example",
redirect_uri="https://client.example/callback",
me="https://user.example"
)
# But scope should be empty
assert auth_info['scope'] == ""