Implements token security and management as specified in ADR-029:

Database Changes (BREAKING):
- Add secure tokens table with SHA256 hashed storage
- Add authorization_codes table for IndieAuth token exchange
- Drop old insecure tokens table (invalidates existing tokens)
- Update SCHEMA_SQL to match post-migration state

Token Management (starpunk/tokens.py):
- Generate cryptographically secure tokens
- Hash tokens with SHA256 for secure storage
- Create and verify access tokens
- Create and exchange authorization codes
- PKCE support (optional but recommended)
- Scope validation (V1: only 'create' scope)
- Token expiry and revocation support

Testing:
- Comprehensive test suite for all token operations
- Test authorization code replay protection
- Test PKCE validation
- Test parameter validation
- Test token expiry

Security:
- Tokens never stored in plain text
- Authorization codes single-use with replay protection
- Optional PKCE for enhanced security
- Proper UTC datetime handling for expiry

Related:
- ADR-029: Micropub IndieAuth Integration Strategy
- Migration 002: Secure tokens and authorization codes

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
417 lines
13 KiB
Python
417 lines
13 KiB
Python
"""
|
|
Tests for token management module
|
|
|
|
Tests:
|
|
- Token generation and hashing
|
|
- Access token creation and verification
|
|
- Authorization code creation and exchange
|
|
- PKCE validation
|
|
- Scope validation
|
|
- Token expiry and revocation
|
|
"""
|
|
|
|
import pytest
|
|
from datetime import datetime, timedelta
|
|
from starpunk.tokens import (
|
|
generate_token,
|
|
hash_token,
|
|
create_access_token,
|
|
verify_token,
|
|
revoke_token,
|
|
create_authorization_code,
|
|
exchange_authorization_code,
|
|
validate_scope,
|
|
check_scope,
|
|
TokenError,
|
|
InvalidAuthorizationCodeError
|
|
)
|
|
|
|
|
|
def test_generate_token():
    """Two freshly generated tokens differ and each has the expected length."""
    first, second = generate_token(), generate_token()

    assert first != second

    # 43 characters: URL-safe base64 of 32 bytes
    for candidate in (first, second):
        assert len(candidate) == 43
|
|
|
|
|
|
def test_hash_token():
    """Hashing the same token twice yields the same 64-character digest."""
    plain = "test_token_12345"

    first_digest = hash_token(plain)
    second_digest = hash_token(plain)

    # Same input, same output
    assert first_digest == second_digest
    # A SHA256 hex digest is 64 characters long
    assert len(first_digest) == 64
    # The digest must not simply echo the plain token
    assert first_digest != plain
|
|
|
|
|
|
def test_hash_token_different_inputs():
    """Distinct tokens must not hash to the same value."""
    assert hash_token("token1") != hash_token("token2")
|
|
|
|
|
|
def test_create_access_token(app):
    """A newly created access token is returned and verifies with its data."""
    issued_for = {
        "me": "https://user.example",
        "client_id": "https://client.example",
        "scope": "create",
    }

    with app.app_context():
        token = create_access_token(**issued_for)

        # The plain token is handed back to the caller
        assert token is not None
        assert len(token) == 43

        # Looking the token up returns the values it was issued with
        token_info = verify_token(token)
        assert token_info is not None
        for field, value in issued_for.items():
            assert token_info[field] == value
|
|
|
|
|
|
def test_verify_token_invalid(app):
    """A token that was never issued does not verify."""
    with app.app_context():
        # Nothing was created for this value, so the lookup must return None
        assert verify_token("invalid_token_12345") is None
|
|
|
|
|
|
def test_verify_token_expired(app):
    """Verification returns None for a token whose expiry lies in the past.

    The row is inserted directly into the ``tokens`` table so that
    ``expires_at`` can be set to one day ago.
    """
    with app.app_context():
        from datetime import timezone

        from starpunk.database import get_db

        # Build a token and its hash by hand so the expiry can be controlled
        token = generate_token()
        token_hash_value = hash_token(token)

        # datetime.utcnow() is deprecated since Python 3.12; a timezone-aware
        # UTC "now" formats to the same wall-clock string.
        one_day_ago = datetime.now(timezone.utc) - timedelta(days=1)
        expired_at = one_day_ago.strftime('%Y-%m-%d %H:%M:%S')

        db = get_db(app)
        db.execute("""
            INSERT INTO tokens (token_hash, me, client_id, scope, expires_at)
            VALUES (?, ?, ?, ?, ?)
        """, (token_hash_value, "https://user.example", "https://client.example",
              "create", expired_at))
        db.commit()

        # The token exists but is expired, so verification must fail
        token_info = verify_token(token)
        assert token_info is None
|
|
|
|
|
|
def test_revoke_token(app):
    """A token verifies until it is revoked, and not afterwards."""
    with app.app_context():
        token = create_access_token(
            me="https://user.example",
            client_id="https://client.example",
            scope="create",
        )

        # Freshly issued: verification succeeds
        assert verify_token(token) is not None

        # Revoking an existing token reports True
        assert revoke_token(token) is True

        # After revocation the same token no longer verifies
        assert verify_token(token) is None
|
|
|
|
|
|
def test_revoke_nonexistent_token(app):
    """Revoking a token that was never issued reports False."""
    with app.app_context():
        assert revoke_token("nonexistent_token") is False
|
|
|
|
|
|
def test_create_authorization_code(app):
    """Creating an authorization code returns a 43-character value."""
    request_params = {
        "me": "https://user.example",
        "client_id": "https://client.example",
        "redirect_uri": "https://client.example/callback",
        "scope": "create",
        "state": "random_state_123",
    }

    with app.app_context():
        code = create_authorization_code(**request_params)

        assert code is not None
        assert len(code) == 43
|
|
|
|
|
|
def test_exchange_authorization_code(app):
    """Exchanging a valid code returns the identity, client and scope."""
    # Parameters shared by the authorization request and the exchange
    client_params = {
        "me": "https://user.example",
        "client_id": "https://client.example",
        "redirect_uri": "https://client.example/callback",
    }

    with app.app_context():
        # Issue the code
        code = create_authorization_code(scope="create", **client_params)

        # Redeem it with matching parameters
        auth_info = exchange_authorization_code(code=code, **client_params)

        expected = {
            "me": "https://user.example",
            "client_id": "https://client.example",
            "scope": "create",
        }
        for field, value in expected.items():
            assert auth_info[field] == value
|
|
|
|
|
|
def test_exchange_authorization_code_invalid(app):
    """Exchanging a code that was never issued raises an error."""
    bogus_request = {
        "code": "invalid_code",
        "client_id": "https://client.example",
        "redirect_uri": "https://client.example/callback",
        "me": "https://user.example",
    }

    with app.app_context():
        with pytest.raises(InvalidAuthorizationCodeError):
            exchange_authorization_code(**bogus_request)
|
|
|
|
|
|
def test_exchange_authorization_code_replay_protection(app):
    """An authorization code is rejected when redeemed a second time."""
    client_params = {
        "me": "https://user.example",
        "client_id": "https://client.example",
        "redirect_uri": "https://client.example/callback",
    }

    with app.app_context():
        code = create_authorization_code(scope="create", **client_params)

        # The first redemption is expected to go through without raising
        exchange_authorization_code(code=code, **client_params)

        # Presenting the same code again (a replay) must be refused
        with pytest.raises(InvalidAuthorizationCodeError,
                           match="already been used"):
            exchange_authorization_code(code=code, **client_params)
|
|
|
|
|
|
def test_exchange_authorization_code_client_id_mismatch(app):
    """Exchange is refused when client_id differs from the one at issuance."""
    issued_with = {
        "me": "https://user.example",
        "client_id": "https://client.example",
        "redirect_uri": "https://client.example/callback",
    }
    # Same request, but presented by a different client
    redeemed_with = dict(issued_with, client_id="https://different-client.example")

    with app.app_context():
        code = create_authorization_code(scope="create", **issued_with)

        with pytest.raises(InvalidAuthorizationCodeError,
                           match="client_id does not match"):
            exchange_authorization_code(code=code, **redeemed_with)
|
|
|
|
|
|
def test_exchange_authorization_code_redirect_uri_mismatch(app):
    """Exchange is refused when redirect_uri differs from the one at issuance."""
    issued_with = {
        "me": "https://user.example",
        "client_id": "https://client.example",
        "redirect_uri": "https://client.example/callback",
    }
    # Same request, but with another callback URL
    redeemed_with = dict(
        issued_with,
        redirect_uri="https://client.example/different-callback",
    )

    with app.app_context():
        code = create_authorization_code(scope="create", **issued_with)

        with pytest.raises(InvalidAuthorizationCodeError,
                           match="redirect_uri does not match"):
            exchange_authorization_code(code=code, **redeemed_with)
|
|
|
|
|
|
def test_exchange_authorization_code_me_mismatch(app):
    """Exchange is refused when the me parameter differs from issuance."""
    issued_with = {
        "me": "https://user.example",
        "client_id": "https://client.example",
        "redirect_uri": "https://client.example/callback",
    }
    # Same request, but claiming a different user identity
    redeemed_with = dict(issued_with, me="https://different-user.example")

    with app.app_context():
        code = create_authorization_code(scope="create", **issued_with)

        with pytest.raises(InvalidAuthorizationCodeError,
                           match="me parameter does not match"):
            exchange_authorization_code(code=code, **redeemed_with)
|
|
|
|
|
|
def test_pkce_code_challenge_validation(app):
    """A PKCE-protected code can be exchanged with the matching verifier."""
    client_params = {
        "me": "https://user.example",
        "client_id": "https://client.example",
        "redirect_uri": "https://client.example/callback",
    }

    with app.app_context():
        import hashlib

        # The challenge is the SHA256 hex digest of the verifier.
        # NOTE(review): RFC 7636 defines S256 as base64url(SHA256(verifier)),
        # not a hex digest - confirm this matches what real clients send.
        verifier = "test_verifier_with_enough_entropy_12345678"
        challenge = hashlib.sha256(verifier.encode()).hexdigest()

        # Issue a code bound to the challenge
        code = create_authorization_code(
            scope="create",
            code_challenge=challenge,
            code_challenge_method="S256",
            **client_params,
        )

        # Redeeming with the verifier the challenge was derived from succeeds
        auth_info = exchange_authorization_code(
            code=code,
            code_verifier=verifier,
            **client_params,
        )

        assert auth_info is not None
|
|
|
|
|
|
def test_pkce_missing_verifier(app):
    """A code issued with a PKCE challenge cannot be exchanged without a verifier."""
    client_params = {
        "me": "https://user.example",
        "client_id": "https://client.example",
        "redirect_uri": "https://client.example/callback",
    }

    with app.app_context():
        # Issue a code that carries a PKCE challenge
        code = create_authorization_code(
            scope="create",
            code_challenge="some_challenge",
            code_challenge_method="S256",
            **client_params,
        )

        # No code_verifier is supplied here, so the exchange must be refused
        with pytest.raises(InvalidAuthorizationCodeError,
                           match="code_verifier required"):
            exchange_authorization_code(code=code, **client_params)
|
|
|
|
|
|
def test_pkce_wrong_verifier(app):
    """A PKCE-protected code is refused when the verifier does not match."""
    client_params = {
        "me": "https://user.example",
        "client_id": "https://client.example",
        "redirect_uri": "https://client.example/callback",
    }

    with app.app_context():
        import hashlib

        # Challenge derived from the verifier the client should present
        expected_verifier = "correct_verifier"
        challenge = hashlib.sha256(expected_verifier.encode()).hexdigest()

        # Issue a code bound to that challenge
        code = create_authorization_code(
            scope="create",
            code_challenge=challenge,
            code_challenge_method="S256",
            **client_params,
        )

        # A different verifier must not be accepted
        with pytest.raises(InvalidAuthorizationCodeError,
                           match="code_verifier does not match"):
            exchange_authorization_code(
                code=code,
                code_verifier="wrong_verifier",
                **client_params,
            )
|
|
|
|
|
|
def test_validate_scope():
    """validate_scope keeps supported scopes and drops everything else."""
    # (requested scope string, expected result)
    cases = [
        ("create", "create"),                # valid scope passes through
        ("", ""),                            # empty scope stays empty
        ("update delete", ""),               # unsupported scopes are removed
        ("create update delete", "create"),  # only the valid part survives
    ]

    for requested, expected in cases:
        assert validate_scope(requested) == expected
|
|
|
|
|
|
def test_check_scope():
    """check_scope reports whether a required scope is among those granted."""
    # (required, granted) pairs where the scope is granted
    granted_cases = [
        ("create", "create"),
        ("create", "create update"),
    ]
    for required, granted in granted_cases:
        assert check_scope(required, granted) is True

    # (required, granted) pairs where the scope is not granted,
    # including an empty and a missing grant
    denied_cases = [
        ("update", "create"),
        ("create", ""),
        ("create", None),
    ]
    for required, granted in denied_cases:
        assert check_scope(required, granted) is False
|
|
|
|
|
|
def test_empty_scope_authorization(app):
    """An authorization code with an empty scope can be issued and exchanged.

    The original test notes that an empty scope is allowed during
    authorization per the IndieAuth spec.
    """
    client_params = {
        "me": "https://user.example",
        "client_id": "https://client.example",
        "redirect_uri": "https://client.example/callback",
    }

    with app.app_context():
        # Issue a code without requesting any scope
        code = create_authorization_code(scope="", **client_params)

        # The exchange itself is expected to succeed
        auth_info = exchange_authorization_code(code=code, **client_params)

        # The returned scope stays empty
        assert auth_info['scope'] == ""
|