feat: Add detailed IndieAuth logging with security-aware token redaction

- Add logging helper functions with automatic token redaction
- Implement comprehensive logging throughout auth flow
- Add production warning for DEBUG logging
- Add 14 new tests for logging functionality
- Update version to v0.7.0

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
2025-11-19 14:51:30 -07:00
parent 8be079593f
commit 01e66a063e
9 changed files with 2887 additions and 13 deletions

View File

@@ -3,9 +3,54 @@ StarPunk package initialization
Creates and configures the Flask application
"""
import logging
from flask import Flask
def configure_logging(app):
"""
Configure application logging based on LOG_LEVEL
Args:
app: Flask application instance
"""
log_level = app.config.get("LOG_LEVEL", "INFO").upper()
# Set Flask logger level
app.logger.setLevel(getattr(logging, log_level, logging.INFO))
# Configure handler with detailed format for DEBUG
handler = logging.StreamHandler()
if log_level == "DEBUG":
formatter = logging.Formatter(
"[%(asctime)s] %(levelname)s - %(name)s: %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)
# Warn if DEBUG enabled in production
if not app.debug and app.config.get("ENV") != "development":
app.logger.warning(
"=" * 70
+ "\n"
+ "WARNING: DEBUG logging enabled in production!\n"
+ "This logs detailed HTTP requests/responses.\n"
+ "Sensitive data is redacted, but consider using INFO level.\n"
+ "Set LOG_LEVEL=INFO in production for normal operation.\n"
+ "=" * 70
)
else:
formatter = logging.Formatter(
"[%(asctime)s] %(levelname)s: %(message)s", datefmt="%Y-%m-%d %H:%M:%S"
)
handler.setFormatter(formatter)
# Remove existing handlers and add our configured handler
app.logger.handlers.clear()
app.logger.addHandler(handler)
def create_app(config=None):
"""
Application factory for StarPunk
@@ -23,6 +68,9 @@ def create_app(config=None):
load_config(app, config)
# Configure logging
configure_logging(app)
# Initialize database
from starpunk.database import init_db
@@ -105,5 +153,5 @@ def create_app(config=None):
# Package version (Semantic Versioning 2.0.0)
# See docs/standards/versioning-strategy.md for details
__version__ = "0.6.2"
__version_info__ = (0, 6, 2)
__version__ = "0.7.0"
__version_info__ = (0, 7, 0)

View File

@@ -28,6 +28,7 @@ Exceptions:
"""
import hashlib
import logging
import secrets
from datetime import datetime, timedelta
from functools import wraps
@@ -66,6 +67,106 @@ class IndieLoginError(AuthError):
pass
# Logging helper functions
def _redact_token(value: str, show_chars: int = 6) -> str:
"""
Redact sensitive token for logging
Shows first N and last 4 characters with asterisks in between.
Args:
value: Token to redact
show_chars: Number of characters to show at start (default: 6)
Returns:
Redacted token string like "abc123...********...xyz9"
"""
if not value or len(value) <= (show_chars + 4):
return "***REDACTED***"
return f"{value[:show_chars]}...{'*' * 8}...{value[-4:]}"
def _log_http_request(method: str, url: str, data: dict, headers: dict = None) -> None:
"""
Log HTTP request details at DEBUG level
Automatically redacts sensitive parameters (code, state, authorization)
Args:
method: HTTP method (GET, POST, etc.)
url: Request URL
data: Request data/parameters
headers: Optional request headers
"""
if not current_app.logger.isEnabledFor(logging.DEBUG):
return
# Redact sensitive data
safe_data = data.copy()
if "code" in safe_data:
safe_data["code"] = _redact_token(safe_data["code"])
if "state" in safe_data:
safe_data["state"] = _redact_token(safe_data["state"], 8)
current_app.logger.debug(
f"IndieAuth HTTP Request:\n"
f" Method: {method}\n"
f" URL: {url}\n"
f" Data: {safe_data}"
)
if headers:
safe_headers = {
k: v
for k, v in headers.items()
if k.lower() not in ["authorization", "cookie"]
}
current_app.logger.debug(f" Headers: {safe_headers}")
def _log_http_response(status_code: int, headers: dict, body: str) -> None:
"""
Log HTTP response details at DEBUG level
Automatically redacts sensitive response data
Args:
status_code: HTTP status code
headers: Response headers
body: Response body (JSON string or text)
"""
if not current_app.logger.isEnabledFor(logging.DEBUG):
return
# Parse and redact JSON body if present
safe_body = body
try:
import json
data = json.loads(body)
if "access_token" in data:
data["access_token"] = _redact_token(data["access_token"])
if "code" in data:
data["code"] = _redact_token(data["code"])
safe_body = json.dumps(data, indent=2)
except (json.JSONDecodeError, TypeError):
# Not JSON or parsing failed, log as-is (likely error message)
pass
# Redact sensitive headers
safe_headers = {
k: v for k, v in headers.items() if k.lower() not in ["set-cookie", "authorization"]
}
current_app.logger.debug(
f"IndieAuth HTTP Response:\n"
f" Status: {status_code}\n"
f" Headers: {safe_headers}\n"
f" Body: {safe_body}"
)
# Helper functions
def _hash_token(token: str) -> str:
"""
@@ -162,8 +263,11 @@ def initiate_login(me_url: str) -> str:
if not is_valid_url(me_url):
raise ValueError(f"Invalid URL format: {me_url}")
current_app.logger.debug(f"Auth: Validating me URL: {me_url}")
# Generate CSRF state token
state = _generate_state_token()
current_app.logger.debug(f"Auth: Generated state token: {_redact_token(state, 8)}")
# Store state in database (5-minute expiry)
db = get_db(current_app)
@@ -188,10 +292,20 @@ def initiate_login(me_url: str) -> str:
"response_type": "code",
}
current_app.logger.debug(
f"Auth: Building authorization URL with params: {{\n"
f" 'me': '{me_url}',\n"
f" 'client_id': '{current_app.config['SITE_URL']}',\n"
f" 'redirect_uri': '{redirect_uri}',\n"
f" 'state': '{_redact_token(state, 8)}',\n"
f" 'response_type': 'code'\n"
f"}}"
)
auth_url = f"{current_app.config['INDIELOGIN_URL']}/auth?{urlencode(params)}"
# Log authentication attempt
current_app.logger.info(f"Auth initiated for {me_url}")
current_app.logger.info(f"Auth: Authentication initiated for {me_url}")
return auth_url
@@ -212,27 +326,50 @@ def handle_callback(code: str, state: str) -> Optional[str]:
UnauthorizedError: User not authorized as admin
IndieLoginError: Code exchange failed
"""
current_app.logger.debug(f"Auth: Verifying state token: {_redact_token(state, 8)}")
# Verify state token (CSRF protection)
if not _verify_state_token(state):
current_app.logger.warning("Auth: Invalid state token received (possible CSRF or expired token)")
raise InvalidStateError("Invalid or expired state token")
current_app.logger.debug("Auth: State token valid and consumed")
# Prepare token exchange request
token_exchange_data = {
"code": code,
"client_id": current_app.config["SITE_URL"],
"redirect_uri": f"{current_app.config['SITE_URL']}/auth/callback",
}
# Log the request
_log_http_request(
method="POST",
url=f"{current_app.config['INDIELOGIN_URL']}/auth",
data=token_exchange_data,
)
# Exchange code for identity
try:
response = httpx.post(
f"{current_app.config['INDIELOGIN_URL']}/auth",
data={
"code": code,
"client_id": current_app.config["SITE_URL"],
"redirect_uri": f"{current_app.config['SITE_URL']}/auth/callback",
},
data=token_exchange_data,
timeout=10.0,
)
# Log the response
_log_http_response(
status_code=response.status_code,
headers=dict(response.headers),
body=response.text,
)
response.raise_for_status()
except httpx.RequestError as e:
current_app.logger.error(f"IndieLogin request failed: {e}")
current_app.logger.error(f"Auth: IndieLogin request failed: {e}")
raise IndieLoginError(f"Failed to verify code: {e}")
except httpx.HTTPStatusError as e:
current_app.logger.error(f"IndieLogin returned error: {e}")
current_app.logger.error(f"Auth: IndieLogin returned error: {e.response.status_code}")
raise IndieLoginError(f"IndieLogin returned error: {e.response.status_code}")
# Parse response
@@ -240,18 +377,27 @@ def handle_callback(code: str, state: str) -> Optional[str]:
me = data.get("me")
if not me:
current_app.logger.error("Auth: No identity returned from IndieLogin")
raise IndieLoginError("No identity returned from IndieLogin")
current_app.logger.debug(f"Auth: Received identity from IndieLogin: {me}")
# Verify this is the admin user
admin_me = current_app.config.get("ADMIN_ME")
if not admin_me:
current_app.logger.error("ADMIN_ME not configured")
current_app.logger.error("Auth: ADMIN_ME not configured")
raise UnauthorizedError("Admin user not configured")
current_app.logger.info(f"Auth: Verifying admin authorization for me={me}")
if me != admin_me:
current_app.logger.warning(f"Unauthorized login attempt: {me}")
current_app.logger.warning(
f"Auth: Unauthorized login attempt: {me} (expected {admin_me})"
)
raise UnauthorizedError(f"User {me} is not authorized")
current_app.logger.debug("Auth: Admin verification passed")
# Create session
session_token = create_session(me)
@@ -272,14 +418,20 @@ def create_session(me: str) -> str:
session_token = secrets.token_urlsafe(32)
token_hash = _hash_token(session_token)
current_app.logger.debug("Auth: Session token generated (hash will be stored)")
# Calculate expiry (use configured session lifetime or default to 30 days)
session_lifetime = current_app.config.get("SESSION_LIFETIME", 30)
expires_at = datetime.utcnow() + timedelta(days=session_lifetime)
current_app.logger.debug(f"Auth: Session expiry: {expires_at} ({session_lifetime} days)")
# Get request metadata
user_agent = request.headers.get("User-Agent", "")[:200]
ip_address = request.remote_addr
current_app.logger.debug(f"Auth: Request metadata - IP: {ip_address}, User-Agent: {user_agent[:50]}...")
# Store in database
db = get_db(current_app)
db.execute(
@@ -296,7 +448,7 @@ def create_session(me: str) -> str:
_cleanup_expired_sessions()
# Log session creation
current_app.logger.info(f"Session created for {me}")
current_app.logger.info(f"Auth: Session created for {me}")
return session_token
@@ -312,8 +464,11 @@ def verify_session(token: str) -> Optional[Dict[str, Any]]:
Session info dict if valid, None otherwise
"""
if not token:
current_app.logger.debug("Auth: No session token provided")
return None
current_app.logger.debug(f"Auth: Verifying session token: {_redact_token(token)}")
token_hash = _hash_token(token)
db = get_db(current_app)
@@ -328,8 +483,11 @@ def verify_session(token: str) -> Optional[Dict[str, Any]]:
).fetchone()
if not session_data:
current_app.logger.debug("Auth: Session token invalid or expired")
return None
current_app.logger.debug(f"Auth: Session verified for {session_data['me']}")
# Update last_used_at for activity tracking
db.execute(
"""