StarPunk/starpunk/utils.py
Phil Skentelbery d4f1bfb198 feat: Implement Phase 3 authentication module with IndieLogin support
Implement complete authentication system following ADR-010 and Phase 3 design specs.
This is a MINOR version increment (0.3.0 -> 0.4.0) as it adds new functionality.

Authentication Features:
- IndieLogin authentication flow via indielogin.com
- Secure session management with SHA-256 token hashing
- CSRF protection with single-use state tokens
- Session lifecycle (create, verify, destroy)
- require_auth decorator for protected routes (see the sketch after this list)
- Automatic cleanup of expired sessions
- IP address and user agent tracking
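
As a rough illustration of the decorator-protected route flow, here is a minimal sketch assuming a Flask-style app; the verify_session() stub, the "session" cookie name, and the "auth.login" endpoint are placeholders, not the actual code in starpunk/auth.py:

```python
# Hypothetical sketch only (assumed Flask API); verify_session() stands in for
# whatever session check starpunk/auth.py actually performs.
from functools import wraps

from flask import redirect, request, url_for


def verify_session(token: str) -> bool:
    """Placeholder: look up the hashed token in the sessions table."""
    raise NotImplementedError


def require_auth(view):
    """Redirect to login unless the request carries a valid session token."""

    @wraps(view)
    def wrapped(*args, **kwargs):
        token = request.cookies.get("session")  # assumed cookie name
        if not token or not verify_session(token):
            return redirect(url_for("auth.login"))  # assumed endpoint name
        return view(*args, **kwargs)

    return wrapped
```

A protected admin route would then simply be decorated with @require_auth beneath its route registration.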

Security Measures:
- Cryptographically secure token generation (secrets module)
- Token hashing for storage (never plaintext; see the sketch after this list)
- SQL injection prevention (prepared statements)
- Single-use CSRF state tokens
- 30-day session expiry with activity refresh
- Comprehensive security logging
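
The token handling described above might look roughly like this; the function name and token length are illustrative assumptions, not necessarily what starpunk/auth.py does:

```python
# Illustrative sketch: generate a session token with the secrets module and
# store only its SHA-256 hash, never the plaintext value.
import hashlib
import secrets


def create_session_token() -> tuple[str, str]:
    """Return (plaintext_token, token_hash); only the hash is persisted."""
    token = secrets.token_urlsafe(32)  # cryptographically secure random token
    token_hash = hashlib.sha256(token.encode("utf-8")).hexdigest()
    return token, token_hash
```

Verifying a session then means hashing the presented token and comparing it against the stored hash, so a leaked sessions table never exposes usable tokens.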

Implementation Details:
- starpunk/auth.py: 406 lines, 6 core functions, 4 helpers, 4 exceptions
- tests/test_auth.py: 648 lines, 37 tests, 96% coverage
- Database schema updates for sessions and auth_state tables
- URL validation utility added to utils.py

Test Coverage:
- 37 authentication tests
- 96% code coverage (exceeds 90% target)
- All security features tested
- Edge cases and error paths covered

Documentation:
- Implementation report in docs/reports/
- Updated CHANGELOG.md with detailed changes
- Version incremented to 0.4.0
- ADR-010 and Phase 3 design docs included

Follows project standards:
- Black code formatting (88 char lines)
- Flake8 linting (no errors)
- Python coding standards
- Type hints on all functions
- Comprehensive docstrings

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-18 20:35:36 -07:00


"""
Core utility functions for StarPunk
This module provides essential utilities for slug generation, file operations,
hashing, and date/time handling. These utilities are used throughout the
application and have no dependencies beyond the standard library.
"""
# Standard library imports
import hashlib
import re
import secrets
import shutil
from datetime import datetime
from pathlib import Path
from typing import Optional
# Constants - Slug configuration
MAX_SLUG_LENGTH = 100
MIN_SLUG_LENGTH = 1
SLUG_WORDS_COUNT = 5
RANDOM_SUFFIX_LENGTH = 4
# Reserved slugs (system routes)
RESERVED_SLUGS = {"admin", "api", "static", "auth", "feed", "login", "logout"}
# File operations
TEMP_FILE_SUFFIX = ".tmp"
TRASH_DIR_NAME = ".trash"
# Hashing
CONTENT_HASH_ALGORITHM = "sha256"
# Regex patterns
SLUG_PATTERN = re.compile(r"^[a-z0-9]+(?:-[a-z0-9]+)*$")
SAFE_SLUG_PATTERN = re.compile(r"[^a-z0-9-]")
MULTIPLE_HYPHENS_PATTERN = re.compile(r"-+")
URL_PATTERN = re.compile(
r"^https?://" # http:// or https://
r"(?:(?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+[A-Z]{2,6}\.?|" # domain...
r"localhost|" # localhost...
r"\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})" # ...or ip
r"(?::\d+)?" # optional port
r"(?:/?|[/?]\S+)$",
re.IGNORECASE,
)
# Character set for random suffix generation
RANDOM_CHARS = "abcdefghijklmnopqrstuvwxyz0123456789"
# Helper Functions
def is_valid_url(url: str) -> bool:
"""
Validate URL format
Checks if a string is a valid HTTP or HTTPS URL.
Args:
url: URL string to validate
Returns:
True if valid URL, False otherwise
Examples:
>>> is_valid_url("https://example.com")
True
>>> is_valid_url("http://localhost:5000")
True
>>> is_valid_url("not-a-url")
False
>>> is_valid_url("ftp://example.com")
False
"""
if not url or not isinstance(url, str):
return False
return bool(URL_PATTERN.match(url))
def extract_first_words(text: str, max_words: int = 5) -> str:
"""
Extract first N words from text
Helper function for slug generation. Splits text on whitespace
and returns first N non-empty words.
Args:
text: Text to extract words from
max_words: Maximum number of words to extract (default: 5)
Returns:
Space-separated string of first N words
Examples:
>>> extract_first_words("Hello world this is a test", 3)
'Hello world this'
>>> extract_first_words(" Multiple spaces ", 2)
'Multiple spaces'
"""
words = text.strip().split()
return " ".join(words[:max_words])
def normalize_slug_text(text: str) -> str:
"""
Normalize text for use in slug
Converts to lowercase, replaces spaces with hyphens, removes
special characters, and collapses multiple hyphens.
Args:
text: Text to normalize
Returns:
Normalized slug-safe text
Examples:
>>> normalize_slug_text("Hello World!")
'hello-world'
>>> normalize_slug_text("Testing... with -- special chars!")
'testing-with-special-chars'
"""
# Convert to lowercase
text = text.lower()
# Replace spaces with hyphens
text = text.replace(" ", "-")
# Remove all non-alphanumeric characters except hyphens
text = SAFE_SLUG_PATTERN.sub("", text)
# Collapse multiple hyphens to single hyphen
text = MULTIPLE_HYPHENS_PATTERN.sub("-", text)
# Strip leading/trailing hyphens
text = text.strip("-")
return text
def generate_random_suffix(length: int = 4) -> str:
"""
Generate random alphanumeric suffix
Creates a secure random string for making slugs unique.
Uses lowercase letters and numbers only.
Args:
length: Length of suffix (default: 4)
Returns:
Random alphanumeric string
Examples:
>>> suffix = generate_random_suffix()
>>> len(suffix)
4
>>> suffix.isalnum()
True
"""
return "".join(secrets.choice(RANDOM_CHARS) for _ in range(length))
# Slug Functions
def generate_slug(content: str, created_at: Optional[datetime] = None) -> str:
"""
Generate URL-safe slug from note content
Creates a slug by extracting the first few words from the content and
normalizing them to lowercase with hyphens. If content is insufficient,
falls back to timestamp-based slug.
Args:
content: The note content (markdown text)
created_at: Optional timestamp for fallback slug (defaults to now)
Returns:
URL-safe slug string (lowercase, alphanumeric + hyphens only)
Raises:
ValueError: If content is empty or contains only whitespace
Examples:
>>> generate_slug("Hello World! This is my first note.")
'hello-world-this-is-my'
>>> generate_slug("Testing... with special chars!@#")
'testing-with-special-chars'
>>> generate_slug("A") # Too short, uses timestamp
'20241118-143022'
Notes:
- This function does NOT check for uniqueness
- Caller must verify slug doesn't exist in database
- Use make_slug_unique() to add random suffix if needed
"""
# Validate input
if not content or not content.strip():
raise ValueError("Content cannot be empty or whitespace-only")
# Extract first N words from content
first_words = extract_first_words(content, SLUG_WORDS_COUNT)
# Normalize to slug format
slug = normalize_slug_text(first_words)
# If slug is empty or too short, use timestamp fallback
if len(slug) < MIN_SLUG_LENGTH:
if created_at is None:
created_at = datetime.utcnow()
slug = created_at.strftime("%Y%m%d-%H%M%S")
# Truncate to maximum length, trimming any hyphen left at the cut point
slug = slug[:MAX_SLUG_LENGTH].rstrip("-")
return slug
def make_slug_unique(base_slug: str, existing_slugs: set[str]) -> str:
"""
Make a slug unique by adding random suffix if needed
If the base_slug already exists in the provided set, appends a random
alphanumeric suffix until a unique slug is found.
Args:
base_slug: The base slug to make unique
existing_slugs: Set of existing slugs to check against
Returns:
Unique slug (base_slug or base_slug-{random})
Examples:
>>> make_slug_unique("test-note", set())
'test-note'
>>> make_slug_unique("test-note", {"test-note"})
'test-note-a7c9' # Random suffix
>>> make_slug_unique("test-note", {"test-note", "test-note-a7c9"})
'test-note-x3k2' # Different random suffix
Notes:
- Random suffix is 4 lowercase alphanumeric characters
- Extremely low collision probability (36^4 ≈ 1.68 million combinations)
- Will retry up to 100 times if collision occurs (should never happen)
"""
# If base slug doesn't exist, return it unchanged
if base_slug not in existing_slugs:
return base_slug
# Generate unique slug with random suffix
max_attempts = 100
for _ in range(max_attempts):
suffix = generate_random_suffix(RANDOM_SUFFIX_LENGTH)
unique_slug = f"{base_slug}-{suffix}"
if unique_slug not in existing_slugs:
return unique_slug
# This should never happen with 36^4 combinations
raise RuntimeError(
f"Failed to generate unique slug after {max_attempts} attempts. "
f"This is extremely unlikely and may indicate a problem."
)
def validate_slug(slug: str) -> bool:
"""
Validate that a slug meets all requirements
Checks that slug contains only allowed characters and is within
length limits. Also checks against reserved slugs.
Args:
slug: The slug to validate
Returns:
True if slug is valid, False otherwise
Rules:
- Must contain only: a-z, 0-9, hyphen (-)
- Must be between 1 and 100 characters
- Cannot start or end with hyphen
- Cannot contain consecutive hyphens
- Cannot be a reserved slug
Examples:
>>> validate_slug("hello-world")
True
>>> validate_slug("Hello-World") # Uppercase
False
>>> validate_slug("-hello") # Leading hyphen
False
>>> validate_slug("hello--world") # Double hyphen
False
>>> validate_slug("admin") # Reserved slug
False
"""
# Check basic constraints
if not slug:
return False
if len(slug) < MIN_SLUG_LENGTH or len(slug) > MAX_SLUG_LENGTH:
return False
# Check against reserved slugs
if slug in RESERVED_SLUGS:
return False
# Check pattern (lowercase alphanumeric with single hyphens)
return bool(SLUG_PATTERN.match(slug))
# Content Hashing
def calculate_content_hash(content: str) -> str:
"""
Calculate SHA-256 hash of content
Generates a cryptographic hash of the content for change detection
and cache invalidation. Uses UTF-8 encoding.
Args:
content: The content to hash (markdown text)
Returns:
Hexadecimal hash string (64 characters)
Examples:
>>> calculate_content_hash("Hello World")
'a591a6d40bf420404a011733cfb7b190d62c65bf0bcda32b57b277d9ad9f146e'
>>> calculate_content_hash("")
'e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855'
Notes:
- Same content always produces same hash
- Hash is deterministic across systems
- Useful for detecting external file modifications
- SHA-256 chosen for security and wide support
"""
content_bytes = content.encode("utf-8")
hash_obj = hashlib.sha256(content_bytes)
return hash_obj.hexdigest()
# File Path Operations
def generate_note_path(slug: str, created_at: datetime, data_dir: Path) -> Path:
"""
Generate file path for a note
Creates path following pattern: data/notes/YYYY/MM/slug.md
Args:
slug: URL-safe slug for the note
created_at: Creation timestamp (determines YYYY/MM)
data_dir: Base data directory path
Returns:
Full Path object for the note file
Raises:
ValueError: If slug is invalid
Examples:
>>> from datetime import datetime
>>> from pathlib import Path
>>> dt = datetime(2024, 11, 18, 14, 30)
>>> generate_note_path("test-note", dt, Path("data"))
PosixPath('data/notes/2024/11/test-note.md')
Notes:
- Does NOT create directories (use ensure_note_directory)
- Does NOT check if file exists
- Validates slug before generating path
"""
# Validate slug before generating path
if not validate_slug(slug):
raise ValueError(f"Invalid slug: {slug}")
# Extract year and month from created_at
year = created_at.strftime("%Y")
month = created_at.strftime("%m")
# Build path: data_dir/notes/YYYY/MM/slug.md
note_path = data_dir / "notes" / year / month / f"{slug}.md"
return note_path
def ensure_note_directory(note_path: Path) -> Path:
"""
Ensure directory exists for note file
Creates parent directories if they don't exist. Safe to call
even if directories already exist.
Args:
note_path: Full path to note file
Returns:
Parent directory path
Raises:
OSError: If directory cannot be created (permissions, etc.)
Examples:
>>> note_path = Path("data/notes/2024/11/test-note.md")
>>> ensure_note_directory(note_path)
PosixPath('data/notes/2024/11')
"""
# Create parent directories if they don't exist
parent_dir = note_path.parent
parent_dir.mkdir(parents=True, exist_ok=True)
return parent_dir
def validate_note_path(file_path: Path, data_dir: Path) -> bool:
"""
Validate that file path is within data directory
Security check to prevent path traversal attacks. Ensures the
resolved path is within the allowed data directory.
Args:
file_path: Path to validate
data_dir: Base data directory that must contain file_path
Returns:
True if path is safe, False otherwise
Examples:
>>> validate_note_path(
... Path("data/notes/2024/11/note.md"),
... Path("data")
... )
True
>>> validate_note_path(
... Path("data/notes/../../etc/passwd"),
... Path("data")
... )
False
Security:
- Resolves symlinks and relative paths
- Checks if resolved path is child of data_dir
- Prevents directory traversal attacks
"""
# Resolve both paths to absolute
try:
resolved_file = file_path.resolve()
resolved_data_dir = data_dir.resolve()
# Check if file_path is relative to data_dir
return resolved_file.is_relative_to(resolved_data_dir)
except (ValueError, OSError):
# If resolve() fails or is_relative_to() raises an error
return False
# Atomic File Operations
def write_note_file(file_path: Path, content: str) -> None:
"""
Write note content to file atomically
Writes to temporary file first, then atomically renames to final path.
This prevents corruption if write is interrupted.
Args:
file_path: Destination file path
content: Content to write (markdown text)
Raises:
OSError: If file cannot be written
ValueError: If file_path is invalid
Examples:
>>> write_note_file(Path("data/notes/2024/11/test.md"), "# Test")
Implementation:
1. Create temp file: {file_path}.tmp
2. Write content to temp file
3. Atomically rename temp to final path
4. If any step fails, clean up temp file
Notes:
- Atomic rename is guaranteed on POSIX systems
- Temp file created in same directory as target
- UTF-8 encoding used for all text
"""
# Create temp file path
temp_path = file_path.with_suffix(file_path.suffix + TEMP_FILE_SUFFIX)
try:
# Write to temp file
temp_path.write_text(content, encoding="utf-8")
# Atomically rename temp to final path
temp_path.replace(file_path)
except Exception:
# Clean up temp file if it exists
if temp_path.exists():
temp_path.unlink()
# Re-raise the exception
raise
def read_note_file(file_path: Path) -> str:
"""
Read note content from file
Args:
file_path: Path to note file
Returns:
File content as string
Raises:
FileNotFoundError: If file doesn't exist
OSError: If file cannot be read
Examples:
>>> content = read_note_file(Path("data/notes/2024/11/test.md"))
>>> print(content)
# Test Note
"""
return file_path.read_text(encoding="utf-8")
def delete_note_file(
file_path: Path, soft: bool = False, data_dir: Optional[Path] = None
) -> None:
"""
Delete note file from filesystem
Supports soft delete (move to trash) or hard delete (permanent removal).
Args:
file_path: Path to note file
soft: If True, move to .trash/ directory; if False, delete permanently
data_dir: Required if soft=True, base data directory
Raises:
FileNotFoundError: If file doesn't exist
ValueError: If soft=True but data_dir not provided
OSError: If file cannot be deleted or moved
Examples:
>>> # Hard delete
>>> delete_note_file(Path("data/notes/2024/11/test.md"))
>>> # Soft delete (move to trash)
>>> delete_note_file(
... Path("data/notes/2024/11/test.md"),
... soft=True,
... data_dir=Path("data")
... )
"""
if soft:
# Soft delete: move to trash
if data_dir is None:
raise ValueError("data_dir is required for soft delete")
# Extract year/month from file path
# Assuming path structure: data_dir/notes/YYYY/MM/slug.md
parts = file_path.parts
try:
# Find the year and month in the path
notes_idx = parts.index("notes")
year = parts[notes_idx + 1]
month = parts[notes_idx + 2]
except (ValueError, IndexError):
# If path doesn't follow expected structure, use current date
now = datetime.utcnow()
year = now.strftime("%Y")
month = now.strftime("%m")
# Create trash directory path
trash_dir = data_dir / TRASH_DIR_NAME / year / month
trash_dir.mkdir(parents=True, exist_ok=True)
# Move file to trash
trash_path = trash_dir / file_path.name
shutil.move(str(file_path), str(trash_path))
else:
# Hard delete: permanent removal
file_path.unlink()
# Date/Time Utilities
def format_rfc822(dt: datetime) -> str:
"""
Format datetime as RFC-822 string
Converts datetime to RFC-822 format required by RSS 2.0 specification.
Assumes UTC timezone.
Args:
dt: Datetime to format (assumed UTC)
Returns:
RFC-822 formatted string
Examples:
>>> from datetime import datetime
>>> dt = datetime(2024, 11, 18, 14, 30, 45)
>>> format_rfc822(dt)
'Mon, 18 Nov 2024 14:30:45 +0000'
References:
- RSS 2.0 spec: https://www.rssboard.org/rss-specification
- RFC-822 date format
"""
return dt.strftime("%a, %d %b %Y %H:%M:%S +0000")
def format_iso8601(dt: datetime) -> str:
"""
Format datetime as ISO 8601 string
Converts datetime to ISO 8601 format for timestamps and APIs. Assumes UTC;
a literal "Z" suffix is appended.
Args:
dt: Datetime to format
Returns:
ISO 8601 formatted string
Examples:
>>> from datetime import datetime
>>> dt = datetime(2024, 11, 18, 14, 30, 45)
>>> format_iso8601(dt)
'2024-11-18T14:30:45Z'
"""
return dt.strftime("%Y-%m-%dT%H:%M:%SZ")
def parse_iso8601(date_string: str) -> datetime:
"""
Parse ISO 8601 string to datetime
Args:
date_string: ISO 8601 formatted string
Returns:
Naive datetime object (assumed UTC)
Raises:
ValueError: If string is not valid ISO 8601 format
Examples:
>>> parse_iso8601("2024-11-18T14:30:45Z")
datetime.datetime(2024, 11, 18, 14, 30, 45)
"""
# Remove 'Z' suffix if present
if date_string.endswith("Z"):
date_string = date_string[:-1]
# Parse using fromisoformat
return datetime.fromisoformat(date_string)