Files
StarPunk/tests/test_auth.py
Phil Skentelbery 01e66a063e feat: Add detailed IndieAuth logging with security-aware token redaction
- Add logging helper functions with automatic token redaction
- Implement comprehensive logging throughout auth flow
- Add production warning for DEBUG logging
- Add 14 new tests for logging functionality
- Update version to v0.7.0

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-19 14:51:30 -07:00

886 lines
31 KiB
Python

"""
Tests for authentication module (starpunk/auth.py)
"""
import hashlib
import secrets
from datetime import datetime, timedelta
from unittest.mock import MagicMock, patch
import httpx
import pytest
from flask import g
from starpunk.auth import (
AuthError,
IndieLoginError,
InvalidStateError,
UnauthorizedError,
_cleanup_expired_sessions,
_generate_state_token,
_hash_token,
_log_http_request,
_log_http_response,
_redact_token,
_verify_state_token,
create_session,
destroy_session,
handle_callback,
initiate_login,
require_auth,
verify_session,
)
@pytest.fixture
def app(tmp_path):
"""Create Flask app for testing"""
from starpunk import create_app
# Create test-specific data directory
test_data_dir = tmp_path / "data"
test_data_dir.mkdir(parents=True, exist_ok=True)
app = create_app(
{
"TESTING": True,
"SITE_URL": "http://localhost:5000",
"ADMIN_ME": "https://example.com",
"SESSION_SECRET": secrets.token_hex(32),
"SESSION_LIFETIME": 30,
"INDIELOGIN_URL": "https://indielogin.com",
"DATA_PATH": test_data_dir,
"NOTES_PATH": test_data_dir / "notes",
"DATABASE_PATH": test_data_dir / "starpunk.db",
}
)
return app
@pytest.fixture
def db(app):
"""Get database connection"""
from starpunk.database import get_db
with app.app_context():
yield get_db(app)
@pytest.fixture
def client(app):
"""Get Flask test client"""
return app.test_client()
# Test helper functions
class TestHelpers:
def test_hash_token(self):
"""Test token hashing"""
token = "test-token-123"
expected = hashlib.sha256(token.encode()).hexdigest()
assert _hash_token(token) == expected
def test_hash_token_consistent(self):
"""Test that hashing is consistent"""
token = "test-token"
hash1 = _hash_token(token)
hash2 = _hash_token(token)
assert hash1 == hash2
def test_hash_token_different_inputs(self):
"""Test that different tokens produce different hashes"""
token1 = "token1"
token2 = "token2"
assert _hash_token(token1) != _hash_token(token2)
def test_generate_state_token(self):
"""Test state token generation"""
token = _generate_state_token()
assert isinstance(token, str)
assert len(token) > 0
def test_generate_state_token_unique(self):
"""Test that generated tokens are unique"""
tokens = [_generate_state_token() for _ in range(10)]
assert len(set(tokens)) == 10
class TestStateTokenVerification:
def test_verify_valid_state_token(self, app, db):
"""Test verifying a valid state token"""
with app.app_context():
state = secrets.token_urlsafe(32)
expires_at = datetime.utcnow() + timedelta(minutes=5)
db.execute(
"INSERT INTO auth_state (state, expires_at) VALUES (?, ?)",
(state, expires_at),
)
db.commit()
assert _verify_state_token(state) is True
# Token should be deleted after verification
result = db.execute(
"SELECT 1 FROM auth_state WHERE state = ?", (state,)
).fetchone()
assert result is None
def test_verify_invalid_state_token(self, app):
"""Test verifying an invalid state token"""
with app.app_context():
assert _verify_state_token("invalid-token") is False
def test_verify_expired_state_token(self, app, db):
"""Test verifying an expired state token"""
with app.app_context():
state = secrets.token_urlsafe(32)
expires_at = datetime.utcnow() - timedelta(minutes=5)
db.execute(
"INSERT INTO auth_state (state, expires_at) VALUES (?, ?)",
(state, expires_at),
)
db.commit()
assert _verify_state_token(state) is False
class TestCleanup:
def test_cleanup_expired_sessions(self, app, db):
"""Test cleanup of expired sessions"""
with app.app_context():
# Create expired session
token_hash = _hash_token("expired-token")
expires_at = datetime.utcnow() - timedelta(days=1)
db.execute(
"""
INSERT INTO sessions (session_token_hash, me, expires_at)
VALUES (?, ?, ?)
""",
(token_hash, "https://example.com", expires_at),
)
db.commit()
_cleanup_expired_sessions()
# Expired session should be deleted
result = db.execute(
"SELECT 1 FROM sessions WHERE session_token_hash = ?", (token_hash,)
).fetchone()
assert result is None
def test_cleanup_expired_auth_state(self, app, db):
"""Test cleanup of expired auth state"""
with app.app_context():
state = secrets.token_urlsafe(32)
expires_at = datetime.utcnow() - timedelta(minutes=10)
db.execute(
"INSERT INTO auth_state (state, expires_at) VALUES (?, ?)",
(state, expires_at),
)
db.commit()
_cleanup_expired_sessions()
# Expired state should be deleted
result = db.execute(
"SELECT 1 FROM auth_state WHERE state = ?", (state,)
).fetchone()
assert result is None
def test_cleanup_keeps_valid_sessions(self, app, db):
"""Test that cleanup keeps valid sessions"""
with app.app_context():
token_hash = _hash_token("valid-token")
expires_at = datetime.utcnow() + timedelta(days=30)
db.execute(
"""
INSERT INTO sessions (session_token_hash, me, expires_at)
VALUES (?, ?, ?)
""",
(token_hash, "https://example.com", expires_at),
)
db.commit()
_cleanup_expired_sessions()
# Valid session should still exist
result = db.execute(
"SELECT 1 FROM sessions WHERE session_token_hash = ?", (token_hash,)
).fetchone()
assert result is not None
class TestInitiateLogin:
def test_initiate_login_success(self, app, db):
"""Test successful login initiation"""
with app.app_context():
me_url = "https://example.com"
auth_url = initiate_login(me_url)
assert "indielogin.com/auth" in auth_url
assert "me=https%3A%2F%2Fexample.com" in auth_url
assert "client_id=" in auth_url
assert "redirect_uri=" in auth_url
assert "state=" in auth_url
assert "response_type=code" in auth_url
# State should be stored in database
result = db.execute("SELECT COUNT(*) as count FROM auth_state").fetchone()
assert result["count"] > 0
def test_initiate_login_invalid_url(self, app):
"""Test login initiation with invalid URL"""
with app.app_context():
with pytest.raises(ValueError, match="Invalid URL format"):
initiate_login("not-a-url")
def test_initiate_login_stores_state(self, app, db):
"""Test that state token is stored"""
with app.app_context():
me_url = "https://example.com"
auth_url = initiate_login(me_url)
# Extract state from URL
state_param = [p for p in auth_url.split("&") if p.startswith("state=")][0]
state = state_param.split("=")[1]
# State should exist in database
result = db.execute(
"SELECT expires_at FROM auth_state WHERE state = ?", (state,)
).fetchone()
assert result is not None
class TestHandleCallback:
@patch("starpunk.auth.httpx.post")
def test_handle_callback_success(self, mock_post, app, db, client):
"""Test successful callback handling"""
with app.test_request_context():
# Setup state token
state = secrets.token_urlsafe(32)
expires_at = datetime.utcnow() + timedelta(minutes=5)
db.execute(
"INSERT INTO auth_state (state, expires_at) VALUES (?, ?)",
(state, expires_at),
)
db.commit()
# Mock IndieLogin response
mock_response = MagicMock()
mock_response.json.return_value = {"me": "https://example.com"}
mock_post.return_value = mock_response
# Handle callback
code = "test-code"
session_token = handle_callback(code, state)
assert session_token is not None
assert isinstance(session_token, str)
# Session should be created
token_hash = _hash_token(session_token)
result = db.execute(
"SELECT me FROM sessions WHERE session_token_hash = ?", (token_hash,)
).fetchone()
assert result is not None
assert result["me"] == "https://example.com"
def test_handle_callback_invalid_state(self, app):
"""Test callback with invalid state"""
with app.app_context():
with pytest.raises(InvalidStateError):
handle_callback("code", "invalid-state")
@patch("starpunk.auth.httpx.post")
def test_handle_callback_unauthorized_user(self, mock_post, app, db):
"""Test callback with unauthorized user"""
with app.app_context():
# Setup state token
state = secrets.token_urlsafe(32)
expires_at = datetime.utcnow() + timedelta(minutes=5)
db.execute(
"INSERT INTO auth_state (state, expires_at) VALUES (?, ?)",
(state, expires_at),
)
db.commit()
# Mock IndieLogin response with different user
mock_response = MagicMock()
mock_response.json.return_value = {"me": "https://attacker.com"}
mock_post.return_value = mock_response
with pytest.raises(UnauthorizedError):
handle_callback("code", state)
@patch("starpunk.auth.httpx.post")
def test_handle_callback_indielogin_error(self, mock_post, app, db):
"""Test callback with IndieLogin error"""
with app.app_context():
# Setup state token
state = secrets.token_urlsafe(32)
expires_at = datetime.utcnow() + timedelta(minutes=5)
db.execute(
"INSERT INTO auth_state (state, expires_at) VALUES (?, ?)",
(state, expires_at),
)
db.commit()
# Mock IndieLogin error
mock_post.side_effect = httpx.RequestError("Connection failed")
with pytest.raises(IndieLoginError):
handle_callback("code", state)
@patch("starpunk.auth.httpx.post")
def test_handle_callback_no_identity(self, mock_post, app, db):
"""Test callback with no identity in response"""
with app.app_context():
# Setup state token
state = secrets.token_urlsafe(32)
expires_at = datetime.utcnow() + timedelta(minutes=5)
db.execute(
"INSERT INTO auth_state (state, expires_at) VALUES (?, ?)",
(state, expires_at),
)
db.commit()
# Mock IndieLogin response without 'me' field
mock_response = MagicMock()
mock_response.json.return_value = {}
mock_post.return_value = mock_response
with pytest.raises(IndieLoginError, match="No identity returned"):
handle_callback("code", state)
class TestCreateSession:
def test_create_session_success(self, app, db, client):
"""Test successful session creation"""
with app.test_request_context():
me = "https://example.com"
session_token = create_session(me)
assert session_token is not None
assert isinstance(session_token, str)
# Session should exist in database
token_hash = _hash_token(session_token)
result = db.execute(
"""
SELECT me, expires_at, created_at
FROM sessions
WHERE session_token_hash = ?
""",
(token_hash,),
).fetchone()
assert result is not None
assert result["me"] == me
assert result["expires_at"] is not None
def test_create_session_metadata(self, app, db, client):
"""Test that session stores metadata"""
with app.test_request_context(
headers={"User-Agent": "Test Browser"},
environ_base={"REMOTE_ADDR": "127.0.0.1"},
):
me = "https://example.com"
session_token = create_session(me)
token_hash = _hash_token(session_token)
result = db.execute(
"""
SELECT user_agent, ip_address
FROM sessions
WHERE session_token_hash = ?
""",
(token_hash,),
).fetchone()
assert result["user_agent"] == "Test Browser"
assert result["ip_address"] == "127.0.0.1"
class TestVerifySession:
def test_verify_valid_session(self, app, db, client):
"""Test verifying a valid session"""
with app.test_request_context():
# Create session
me = "https://example.com"
session_token = create_session(me)
# Verify session
session_info = verify_session(session_token)
assert session_info is not None
assert session_info["me"] == me
assert "created_at" in session_info
assert "expires_at" in session_info
def test_verify_invalid_session(self, app):
"""Test verifying an invalid session"""
with app.app_context():
session_info = verify_session("invalid-token")
assert session_info is None
def test_verify_expired_session(self, app, db):
"""Test verifying an expired session"""
with app.app_context():
# Create expired session
token = secrets.token_urlsafe(32)
token_hash = _hash_token(token)
expires_at = datetime.utcnow() - timedelta(days=1)
db.execute(
"""
INSERT INTO sessions (session_token_hash, me, expires_at)
VALUES (?, ?, ?)
""",
(token_hash, "https://example.com", expires_at),
)
db.commit()
session_info = verify_session(token)
assert session_info is None
def test_verify_session_updates_last_used(self, app, db, client):
"""Test that verification updates last_used_at"""
with app.test_request_context():
# Create session
me = "https://example.com"
session_token = create_session(me)
# Verify session
verify_session(session_token)
# Check last_used_at is set
token_hash = _hash_token(session_token)
result = db.execute(
"SELECT last_used_at FROM sessions WHERE session_token_hash = ?",
(token_hash,),
).fetchone()
assert result["last_used_at"] is not None
def test_verify_empty_token(self, app):
"""Test verifying empty token"""
with app.app_context():
assert verify_session("") is None
assert verify_session(None) is None
class TestDestroySession:
def test_destroy_session_success(self, app, db, client):
"""Test successful session destruction"""
with app.test_request_context():
# Create session
me = "https://example.com"
session_token = create_session(me)
# Destroy session
destroy_session(session_token)
# Session should no longer exist
token_hash = _hash_token(session_token)
result = db.execute(
"SELECT 1 FROM sessions WHERE session_token_hash = ?", (token_hash,)
).fetchone()
assert result is None
def test_destroy_invalid_session(self, app):
"""Test destroying an invalid session (should not raise error)"""
with app.app_context():
destroy_session("invalid-token") # Should not raise
def test_destroy_empty_token(self, app):
"""Test destroying empty token"""
with app.app_context():
destroy_session("") # Should not raise
destroy_session(None) # Should not raise
class TestRequireAuthDecorator:
def test_require_auth_with_valid_session(self, app, db, client):
"""Test require_auth decorator with valid session"""
with app.test_request_context():
# Create session
me = "https://example.com"
session_token = create_session(me)
# Create test route
@require_auth
def protected_route():
return "Protected content"
# Manually set cookie header
environ = {"HTTP_COOKIE": f"starpunk_session={session_token}"}
with app.test_request_context(environ_base=environ):
result = protected_route()
assert result == "Protected content"
assert hasattr(g, "user")
assert g.user["me"] == me
def test_require_auth_without_session(self, app, client):
"""Test require_auth decorator without session"""
# Create test route
@require_auth
def protected_route():
return "Protected content"
# Call protected route without session
with app.test_request_context():
with patch("starpunk.auth.redirect") as mock_redirect:
with patch("starpunk.auth.url_for") as mock_url_for:
mock_url_for.return_value = "/auth/login"
protected_route()
mock_redirect.assert_called_once()
def test_require_auth_with_expired_session(self, app, db, client):
"""Test require_auth decorator with expired session"""
# Create expired session
with app.app_context():
token = secrets.token_urlsafe(32)
token_hash = _hash_token(token)
expires_at = datetime.utcnow() - timedelta(days=1)
db.execute(
"""
INSERT INTO sessions (session_token_hash, me, expires_at)
VALUES (?, ?, ?)
""",
(token_hash, "https://example.com", expires_at),
)
db.commit()
# Create test route
@require_auth
def protected_route():
return "Protected content"
# Call protected route with expired session
environ = {"HTTP_COOKIE": f"starpunk_session={token}"}
with app.test_request_context(environ_base=environ):
with patch("starpunk.auth.redirect") as mock_redirect:
with patch("starpunk.auth.url_for") as mock_url_for:
mock_url_for.return_value = "/auth/login"
protected_route()
mock_redirect.assert_called_once()
class TestSecurityFeatures:
def test_token_hashing_prevents_plaintext_storage(self, app, db, client):
"""Test that tokens are hashed, not stored in plaintext"""
with app.test_request_context():
me = "https://example.com"
session_token = create_session(me)
# Database should not contain plaintext token
result = db.execute("SELECT session_token_hash FROM sessions").fetchone()
assert result["session_token_hash"] != session_token
assert len(result["session_token_hash"]) == 64 # SHA-256 hex length
def test_state_tokens_are_single_use(self, app, db):
"""Test that state tokens can only be used once"""
with app.app_context():
state = secrets.token_urlsafe(32)
expires_at = datetime.utcnow() + timedelta(minutes=5)
db.execute(
"INSERT INTO auth_state (state, expires_at) VALUES (?, ?)",
(state, expires_at),
)
db.commit()
# First verification should succeed
assert _verify_state_token(state) is True
# Second verification should fail (token deleted)
assert _verify_state_token(state) is False
def test_session_expiry(self, app, db, client):
"""Test that sessions expire correctly"""
with app.test_request_context():
# Create session with custom lifetime
app.config["SESSION_LIFETIME"] = 1 # 1 day
me = "https://example.com"
session_token = create_session(me)
token_hash = _hash_token(session_token)
result = db.execute(
"SELECT expires_at FROM sessions WHERE session_token_hash = ?",
(token_hash,),
).fetchone()
expires_at = datetime.fromisoformat(result["expires_at"])
created_at = datetime.utcnow()
# Should expire approximately 1 day from now
# (allow for minor timing differences)
delta = expires_at - created_at
assert delta.total_seconds() >= 86000 # At least 23.8 hours
assert delta.total_seconds() <= 86401 # At most 1 day + 1 second
class TestExceptionHierarchy:
def test_exception_inheritance(self):
"""Test that custom exceptions inherit correctly"""
assert issubclass(InvalidStateError, AuthError)
assert issubclass(UnauthorizedError, AuthError)
assert issubclass(IndieLoginError, AuthError)
assert issubclass(AuthError, Exception)
def test_exception_messages(self):
"""Test that exceptions can carry messages"""
error = InvalidStateError("Test message")
assert str(error) == "Test message"
error = UnauthorizedError("Unauthorized")
assert str(error) == "Unauthorized"
error = IndieLoginError("Service error")
assert str(error) == "Service error"
class TestLoggingHelpers:
def test_redact_token_normal(self):
"""Test token redaction for normal-length tokens"""
token = "abcdefghijklmnopqrstuvwxyz"
result = _redact_token(token, 6)
assert result == "abcdef...********...wxyz"
def test_redact_token_short(self):
"""Test token redaction for short tokens"""
token = "short"
result = _redact_token(token, 6)
assert result == "***REDACTED***"
def test_redact_token_empty(self):
"""Test token redaction for empty tokens"""
result = _redact_token("", 6)
assert result == "***REDACTED***"
result = _redact_token(None, 6)
assert result == "***REDACTED***"
def test_redact_token_custom_length(self):
"""Test token redaction with custom show_chars"""
token = "abcdefghijklmnopqrstuvwxyz"
result = _redact_token(token, 8)
assert result == "abcdefgh...********...wxyz"
def test_log_http_request_redacts_code(self, app, caplog):
"""Test that code parameter is redacted in request logs"""
import logging
with app.app_context():
# Set DEBUG level for logging
app.logger.setLevel(logging.DEBUG)
with caplog.at_level(logging.DEBUG):
_log_http_request(
method="POST",
url="https://indielogin.com/auth",
data={"code": "sensitive_code_12345"},
)
# Should log but with redacted code
assert "sensitive_code_12345" not in caplog.text
assert "sensit...********...2345" in caplog.text
def test_log_http_request_redacts_state(self, app, caplog):
"""Test that state parameter is redacted in request logs"""
import logging
with app.app_context():
app.logger.setLevel(logging.DEBUG)
with caplog.at_level(logging.DEBUG):
_log_http_request(
method="POST",
url="https://indielogin.com/auth",
data={"state": "state_token_123456789"},
)
# Should log but with redacted state (8 chars shown at start)
assert "state_token_123456789" not in caplog.text
assert "state_to...********...6789" in caplog.text
def test_log_http_request_not_logged_at_info(self, app, caplog):
"""Test that HTTP requests are not logged at INFO level"""
import logging
with app.app_context():
app.logger.setLevel(logging.INFO)
with caplog.at_level(logging.INFO):
_log_http_request(
method="POST",
url="https://indielogin.com/auth",
data={"code": "test_code"},
)
# Should not log anything
assert "IndieAuth HTTP Request" not in caplog.text
def test_log_http_response_redacts_tokens(self, app, caplog):
"""Test that response tokens are redacted"""
import logging
with app.app_context():
app.logger.setLevel(logging.DEBUG)
with caplog.at_level(logging.DEBUG):
_log_http_response(
status_code=200,
headers={"content-type": "application/json"},
body='{"access_token": "secret_token_xyz789"}',
)
# Should log but with redacted token
assert "secret_token_xyz789" not in caplog.text
assert "secret...********...z789" in caplog.text
def test_log_http_response_handles_non_json(self, app, caplog):
"""Test that non-JSON responses are logged as-is"""
import logging
with app.app_context():
app.logger.setLevel(logging.DEBUG)
with caplog.at_level(logging.DEBUG):
_log_http_response(
status_code=500, headers={}, body="Internal Server Error"
)
# Should log the plain text body
assert "Internal Server Error" in caplog.text
def test_log_http_response_redacts_sensitive_headers(self, app, caplog):
"""Test that sensitive headers are redacted"""
import logging
with app.app_context():
app.logger.setLevel(logging.DEBUG)
with caplog.at_level(logging.DEBUG):
_log_http_response(
status_code=200,
headers={
"content-type": "application/json",
"set-cookie": "sensitive_cookie",
"authorization": "Bearer token",
},
body='{"me": "https://example.com"}',
)
# Should log content-type but not sensitive headers
assert "content-type" in caplog.text
assert "set-cookie" not in caplog.text
assert "authorization" not in caplog.text
assert "sensitive_cookie" not in caplog.text
class TestLoggingIntegration:
def test_initiate_login_logs_at_debug(self, app, db, caplog):
"""Test that initiate_login logs at DEBUG level"""
import logging
with app.app_context():
app.logger.setLevel(logging.DEBUG)
with caplog.at_level(logging.DEBUG):
me_url = "https://example.com"
initiate_login(me_url)
# Should see DEBUG logs
assert "Validating me URL" in caplog.text
assert "Generated state token" in caplog.text
assert "Building authorization URL" in caplog.text
# Should see INFO log
assert "Authentication initiated" in caplog.text
def test_initiate_login_info_level(self, app, db, caplog):
"""Test that initiate_login only shows milestones at INFO level"""
import logging
with app.app_context():
app.logger.setLevel(logging.INFO)
with caplog.at_level(logging.INFO):
me_url = "https://example.com"
initiate_login(me_url)
# Should see INFO milestone
assert "Authentication initiated" in caplog.text
# Should NOT see DEBUG details
assert "Validating me URL" not in caplog.text
assert "Generated state token" not in caplog.text
@patch("starpunk.auth.httpx.post")
def test_handle_callback_logs_http_details(self, mock_post, app, db, client, caplog):
"""Test that handle_callback logs HTTP request/response at DEBUG"""
import logging
with app.test_request_context():
app.logger.setLevel(logging.DEBUG)
# Setup state token
state = secrets.token_urlsafe(32)
expires_at = datetime.utcnow() + timedelta(minutes=5)
db.execute(
"INSERT INTO auth_state (state, expires_at) VALUES (?, ?)",
(state, expires_at),
)
db.commit()
# Mock IndieLogin response
mock_response = MagicMock()
mock_response.status_code = 200
mock_response.headers = {"content-type": "application/json"}
mock_response.text = '{"me": "https://example.com"}'
mock_response.json.return_value = {"me": "https://example.com"}
mock_post.return_value = mock_response
with caplog.at_level(logging.DEBUG):
code = "test_authorization_code"
handle_callback(code, state)
# Should see HTTP request/response logs
assert "IndieAuth HTTP Request" in caplog.text
assert "IndieAuth HTTP Response" in caplog.text
# Code should be redacted
assert "test_authorization_code" not in caplog.text
assert "test_a...********...code" in caplog.text
def test_create_session_logs_details(self, app, db, client, caplog):
"""Test that create_session logs session details at DEBUG"""
import logging
with app.test_request_context():
app.logger.setLevel(logging.DEBUG)
with caplog.at_level(logging.DEBUG):
me = "https://example.com"
create_session(me)
# Should see DEBUG logs
assert "Session token generated" in caplog.text
assert "Session expiry" in caplog.text
assert "Request metadata" in caplog.text
# Should see INFO log
assert "Session created" in caplog.text