Implement complete authentication system following ADR-010 and Phase 3 design specs. This is a MINOR version increment (0.3.0 -> 0.4.0) as it adds new functionality. Authentication Features: - IndieLogin authentication flow via indielogin.com - Secure session management with SHA-256 token hashing - CSRF protection with single-use state tokens - Session lifecycle (create, verify, destroy) - require_auth decorator for protected routes - Automatic cleanup of expired sessions - IP address and user agent tracking Security Measures: - Cryptographically secure token generation (secrets module) - Token hashing for storage (never plaintext) - SQL injection prevention (prepared statements) - Single-use CSRF state tokens - 30-day session expiry with activity refresh - Comprehensive security logging Implementation Details: - starpunk/auth.py: 406 lines, 6 core functions, 4 helpers, 4 exceptions - tests/test_auth.py: 648 lines, 37 tests, 96% coverage - Database schema updates for sessions and auth_state tables - URL validation utility added to utils.py Test Coverage: - 37 authentication tests - 96% code coverage (exceeds 90% target) - All security features tested - Edge cases and error paths covered Documentation: - Implementation report in docs/reports/ - Updated CHANGELOG.md with detailed changes - Version incremented to 0.4.0 - ADR-010 and Phase 3 design docs included Follows project standards: - Black code formatting (88 char lines) - Flake8 linting (no errors) - Python coding standards - Type hints on all functions - Comprehensive docstrings 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
684 lines
18 KiB
Python
684 lines
18 KiB
Python
"""
|
|
Core utility functions for StarPunk
|
|
|
|
This module provides essential utilities for slug generation, file operations,
|
|
hashing, and date/time handling. These utilities are used throughout the
|
|
application and have no external dependencies beyond standard library.
|
|
"""
|
|
|
|
# Standard library imports
import hashlib
import re
import secrets
import shutil
from datetime import datetime, timezone
from email.utils import format_datetime
from pathlib import Path
from typing import Optional
|
|
|
|
# Constants - Slug configuration
# Upper bound on slug length: generate_slug() truncates to it and
# validate_slug() rejects anything longer.
MAX_SLUG_LENGTH = 100
# Lower bound on slug length: validate_slug() rejects shorter slugs and
# generate_slug() falls back to a timestamp when the normalized text is shorter.
MIN_SLUG_LENGTH = 1
# Number of leading words of the content that generate_slug() turns into a slug.
SLUG_WORDS_COUNT = 5
# Length of the random suffix make_slug_unique() requests.
RANDOM_SUFFIX_LENGTH = 4

# Reserved slugs (system routes)
# validate_slug() returns False for any of these.
RESERVED_SLUGS = {"admin", "api", "static", "auth", "feed", "login", "logout"}

# File operations
# Appended to the target's own suffix by write_note_file() to name its temp file.
TEMP_FILE_SUFFIX = ".tmp"
# Directory under data_dir that delete_note_file() moves soft-deleted notes into.
TRASH_DIR_NAME = ".trash"

# Hashing
# NOTE(review): not referenced by the code visible in this file;
# calculate_content_hash() calls hashlib.sha256 directly - confirm it is still needed.
CONTENT_HASH_ALGORITHM = "sha256"

# Regex patterns
# A valid slug: runs of lowercase letters/digits separated by single hyphens,
# with no leading or trailing hyphen.
SLUG_PATTERN = re.compile(r"^[a-z0-9]+(?:-[a-z0-9]+)*$")
# Any single character that is NOT a lowercase letter, digit or hyphen
# (substituted away when normalizing slug text).
SAFE_SLUG_PATTERN = re.compile(r"[^a-z0-9-]")
# A run of one or more hyphens (collapsed to a single hyphen when normalizing).
MULTIPLE_HYPHENS_PATTERN = re.compile(r"-+")
|
|
# Matches an absolute http(s) URL whose host is a dotted domain name,
# "localhost", or a dotted-quad IPv4 literal, followed by an optional port and
# an optional path/query. Matching is case-insensitive.
#
# - The TLD may be 2-63 letters (the DNS label limit) or a punycode label
#   ("xn--..."), so long TLDs such as ".technology" are accepted.
# - The pattern ends with \Z rather than $, because $ also matches just before
#   a trailing newline and would accept "https://example.com\n".
URL_PATTERN = re.compile(
    r"^https?://"  # http:// or https://
    r"(?:(?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+"  # dotted domain labels...
    r"(?:[A-Z]{2,63}|XN--[A-Z0-9]{1,59})\.?|"  # ...ending in a TLD
    r"localhost|"  # localhost...
    r"\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})"  # ...or ip
    r"(?::\d+)?"  # optional port
    r"(?:/?|[/?]\S+)\Z",  # optional path/query, anchored at the true end
    re.IGNORECASE,
)
|
|
|
|
# Character set for random suffix generation
# 36 symbols (a-z followed by 0-9); generate_random_suffix() draws each
# character from this string with secrets.choice().
RANDOM_CHARS = "abcdefghijklmnopqrstuvwxyz0123456789"
|
|
|
|
|
|
# Helper Functions
|
|
|
|
|
|
def is_valid_url(url: str) -> bool:
    """
    Validate URL format

    Checks if a string is a valid HTTP or HTTPS URL according to URL_PATTERN.
    The whole string must match: a URL followed by a trailing newline (or any
    other extra character) is rejected.

    Args:
        url: URL string to validate

    Returns:
        True if valid URL, False otherwise (including for empty or
        non-string input)

    Examples:
        >>> is_valid_url("https://example.com")
        True

        >>> is_valid_url("http://localhost:5000")
        True

        >>> is_valid_url("not-a-url")
        False

        >>> is_valid_url("ftp://example.com")
        False

        >>> is_valid_url("https://example.com\\n")
        False
    """
    if not isinstance(url, str) or not url:
        return False
    # fullmatch (not match): "$" in a pattern also matches just before a
    # trailing newline, so match() alone would accept "https://example.com\n".
    return URL_PATTERN.fullmatch(url) is not None
|
|
|
|
|
|
def extract_first_words(text: str, max_words: int = 5) -> str:
    """
    Return the leading words of a text, joined by single spaces

    Used by slug generation. The text is split on runs of whitespace, so
    leading, trailing and repeated whitespace never produce empty words.

    Args:
        text: Text to take words from
        max_words: How many words to keep at most (default: 5)

    Returns:
        The first ``max_words`` words separated by one space each

    Examples:
        >>> extract_first_words("Hello world this is a test", 3)
        'Hello world this'

        >>> extract_first_words("  Multiple   spaces  ", 2)
        'Multiple spaces'
    """
    # str.split() with no separator already ignores surrounding whitespace.
    leading = text.split()[:max_words]
    return " ".join(leading)
|
|
|
|
|
|
def normalize_slug_text(text: str) -> str:
    """
    Normalize text for use in slug

    Converts to lowercase, replaces every run of whitespace (spaces, tabs,
    newlines, ...) with a hyphen, removes all characters other than
    ``a-z``, ``0-9`` and ``-``, collapses repeated hyphens and strips
    leading/trailing hyphens.

    Args:
        text: Text to normalize

    Returns:
        Normalized slug-safe text (may be empty if nothing usable remains)

    Examples:
        >>> normalize_slug_text("Hello World!")
        'hello-world'

        >>> normalize_slug_text("Testing... with -- special chars!")
        'testing-with-special-chars'

        >>> normalize_slug_text("line one\\nline two")
        'line-one-line-two'
    """
    # Split on any whitespace rather than replacing only " ", so words
    # separated by tabs or newlines are not fused together when the
    # separator is stripped as an unsafe character below.
    text = "-".join(text.lower().split())

    # Remove all non-alphanumeric characters except hyphens
    text = SAFE_SLUG_PATTERN.sub("", text)

    # Collapse multiple hyphens to single hyphen
    text = MULTIPLE_HYPHENS_PATTERN.sub("-", text)

    # Strip leading/trailing hyphens
    return text.strip("-")
|
|
|
|
|
|
def generate_random_suffix(length: int = 4) -> str:
    """
    Build a random lowercase-alphanumeric string

    Each character is drawn independently from RANDOM_CHARS using the
    ``secrets`` module. Used to make slugs unique.

    Args:
        length: Number of characters to generate (default: 4)

    Returns:
        Random string of ``length`` characters from RANDOM_CHARS

    Examples:
        >>> suffix = generate_random_suffix()
        >>> len(suffix)
        4
        >>> suffix.isalnum()
        True
    """
    picked = [secrets.choice(RANDOM_CHARS) for _ in range(length)]
    return "".join(picked)
|
|
|
|
|
|
# Slug Functions
|
|
|
|
|
|
def generate_slug(content: str, created_at: Optional[datetime] = None) -> str:
    """
    Generate URL-safe slug from note content

    Creates a slug by extracting the first SLUG_WORDS_COUNT words from the
    content and normalizing them to lowercase with hyphens. If nothing usable
    remains after normalization (shorter than MIN_SLUG_LENGTH), falls back to
    a timestamp-based slug.

    Args:
        content: The note content (markdown text)
        created_at: Optional timestamp for fallback slug (defaults to the
            current UTC time)

    Returns:
        URL-safe slug string (lowercase, alphanumeric + hyphens only), at
        most MAX_SLUG_LENGTH characters and never ending in a hyphen

    Raises:
        ValueError: If content is empty or contains only whitespace

    Examples:
        >>> generate_slug("Hello World! This is my first note.")
        'hello-world-this-is-my'

        >>> generate_slug("Testing... with special chars!@#")
        'testing-with-special-chars'

        >>> generate_slug("A")
        'a'

        >>> from datetime import datetime
        >>> generate_slug("!!!", datetime(2024, 11, 18, 14, 30, 22))
        '20241118-143022'

    Notes:
        - This function does NOT check for uniqueness
        - Caller must verify slug doesn't exist in database
        - Use make_slug_unique() to add random suffix if needed
        - The result may equal a reserved slug (e.g. content "Admin" yields
          'admin'), which validate_slug() rejects
    """
    # Validate input
    if not content or not content.strip():
        raise ValueError("Content cannot be empty or whitespace-only")

    # Extract first N words from content and normalize to slug format
    first_words = extract_first_words(content, SLUG_WORDS_COUNT)
    slug = normalize_slug_text(first_words)

    # If slug is empty or too short, use timestamp fallback
    if len(slug) < MIN_SLUG_LENGTH:
        if created_at is None:
            # Timezone-aware replacement for the deprecated datetime.utcnow();
            # the formatted digits are identical.
            created_at = datetime.now(timezone.utc)
        slug = created_at.strftime("%Y%m%d-%H%M%S")

    # Truncate to maximum length. The cut can land right after a hyphen,
    # which would leave a slug that SLUG_PATTERN rejects, so strip it.
    return slug[:MAX_SLUG_LENGTH].rstrip("-")
|
|
|
|
|
|
def make_slug_unique(base_slug: str, existing_slugs: set[str]) -> str:
    """
    Make a slug unique by adding random suffix if needed

    If the base_slug already exists in the provided set, appends a hyphen and
    a random alphanumeric suffix until a slug not in the set is found. When
    the base slug is too long for the suffix to fit within MAX_SLUG_LENGTH,
    the base is shortened first so the result still passes validate_slug().

    Args:
        base_slug: The base slug to make unique
        existing_slugs: Set of existing slugs to check against

    Returns:
        Unique slug (base_slug or base_slug-{random})

    Raises:
        RuntimeError: If no unused slug is found after 100 attempts

    Examples:
        >>> make_slug_unique("test-note", set())
        'test-note'

        >>> make_slug_unique("test-note", {"test-note"})  # doctest: +SKIP
        'test-note-a7c9'

        >>> make_slug_unique("test-note", {"test-note", "test-note-a7c9"})  # doctest: +SKIP
        'test-note-x3k2'

    Notes:
        - Random suffix is RANDOM_SUFFIX_LENGTH lowercase alphanumeric characters
        - Extremely low collision probability (36^4 = 1.6M combinations)
        - Will retry up to 100 times if collision occurs (should never happen)
    """
    # If base slug doesn't exist, return it unchanged
    if base_slug not in existing_slugs:
        return base_slug

    # Reserve room for "-" plus the suffix so the result never exceeds
    # MAX_SLUG_LENGTH. Only over-long bases are touched; after cutting, drop a
    # dangling hyphen so the joined slug has no "--".
    room = MAX_SLUG_LENGTH - RANDOM_SUFFIX_LENGTH - 1
    stem = base_slug
    if len(stem) > room:
        stem = stem[:room].rstrip("-")

    # Generate unique slug with random suffix
    max_attempts = 100
    for _ in range(max_attempts):
        candidate = f"{stem}-{generate_random_suffix(RANDOM_SUFFIX_LENGTH)}"
        if candidate not in existing_slugs:
            return candidate

    # This should never happen with 36^4 combinations
    raise RuntimeError(
        f"Failed to generate unique slug after {max_attempts} attempts. "
        "This is extremely unlikely and may indicate a problem."
    )
|
|
|
|
|
|
def validate_slug(slug: str) -> bool:
    """
    Validate that a slug meets all requirements

    Checks that slug contains only allowed characters and is within
    length limits. Also checks against reserved slugs.

    Args:
        slug: The slug to validate

    Returns:
        True if slug is valid, False otherwise (including for empty or
        non-string input)

    Rules:
        - Must contain only: a-z, 0-9, hyphen (-)
        - Must be between MIN_SLUG_LENGTH and MAX_SLUG_LENGTH characters
        - Cannot start or end with hyphen
        - Cannot contain consecutive hyphens
        - Cannot be a reserved slug
        - Cannot contain a trailing newline or any other extra character

    Examples:
        >>> validate_slug("hello-world")
        True

        >>> validate_slug("Hello-World")  # Uppercase
        False

        >>> validate_slug("-hello")  # Leading hyphen
        False

        >>> validate_slug("hello--world")  # Double hyphen
        False

        >>> validate_slug("admin")  # Reserved slug
        False

        >>> validate_slug("hello\\n")  # Trailing newline
        False
    """
    # Check basic constraints; non-strings are invalid rather than an error
    if not isinstance(slug, str) or not slug:
        return False

    if not MIN_SLUG_LENGTH <= len(slug) <= MAX_SLUG_LENGTH:
        return False

    # Check against reserved slugs
    if slug in RESERVED_SLUGS:
        return False

    # Check pattern (lowercase alphanumeric with single hyphens).
    # fullmatch (not match): "$" also matches just before a trailing newline,
    # so match() alone would accept "hello\n" and let it into a file name.
    return SLUG_PATTERN.fullmatch(slug) is not None
|
|
|
|
|
|
# Content Hashing
|
|
|
|
|
|
def calculate_content_hash(content: str) -> str:
    """
    Return the SHA-256 digest of a text as lowercase hex

    The text is encoded as UTF-8 before hashing. Intended for change
    detection and cache invalidation.

    Args:
        content: The content to hash (markdown text)

    Returns:
        Hexadecimal hash string (64 characters)

    Examples:
        >>> calculate_content_hash("Hello World")
        'a591a6d40bf420404a011733cfb7b190d62c65bf0bcda32b57b277d9ad9f146e'

        >>> calculate_content_hash("")
        'e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855'

    Notes:
        - Identical content always yields the identical hash
        - Useful for detecting external file modifications
    """
    hasher = hashlib.sha256()
    hasher.update(content.encode("utf-8"))
    return hasher.hexdigest()
|
|
|
|
|
|
# File Path Operations
|
|
|
|
|
|
def generate_note_path(slug: str, created_at: datetime, data_dir: Path) -> Path:
    """
    Build the file path where a note is stored

    The path has the form ``<data_dir>/notes/YYYY/MM/<slug>.md`` where the
    year and month come from ``created_at``.

    Args:
        slug: URL-safe slug for the note
        created_at: Creation timestamp (determines YYYY/MM)
        data_dir: Base data directory path

    Returns:
        Full Path object for the note file

    Raises:
        ValueError: If validate_slug() rejects the slug

    Examples:
        >>> from datetime import datetime
        >>> from pathlib import Path
        >>> dt = datetime(2024, 11, 18, 14, 30)
        >>> generate_note_path("test-note", dt, Path("data"))
        PosixPath('data/notes/2024/11/test-note.md')

    Notes:
        - Does NOT create directories (use ensure_note_directory)
        - Does NOT check if file exists
    """
    # Guard clause: refuse to build a path from an invalid slug.
    if not validate_slug(slug):
        raise ValueError(f"Invalid slug: {slug}")

    # One strftime call yields both directory components.
    year, month = created_at.strftime("%Y %m").split()
    return data_dir / "notes" / year / month / f"{slug}.md"
|
|
|
|
|
|
def ensure_note_directory(note_path: Path) -> Path:
    """
    Create the directory that will hold a note file

    Creates the parent directory of ``note_path`` together with any missing
    ancestors. Calling it when the directory already exists is a no-op.

    Args:
        note_path: Full path to note file

    Returns:
        Parent directory path

    Raises:
        OSError: If directory cannot be created (permissions, etc.)

    Examples:
        >>> note_path = Path("data/notes/2024/11/test-note.md")
        >>> ensure_note_directory(note_path)
        PosixPath('data/notes/2024/11')
    """
    directory = note_path.parent
    directory.mkdir(exist_ok=True, parents=True)
    return directory
|
|
|
|
|
|
def validate_note_path(file_path: Path, data_dir: Path) -> bool:
    """
    Validate that file path is within data directory

    Security check to prevent path traversal attacks. Ensures the
    resolved path is within the allowed data directory. Any failure to
    resolve either path is treated as "not safe" rather than raised.

    Args:
        file_path: Path to validate
        data_dir: Base data directory that must contain file_path

    Returns:
        True if path is safe, False otherwise

    Examples:
        >>> validate_note_path(
        ...     Path("data/notes/2024/11/note.md"),
        ...     Path("data")
        ... )
        True

        >>> validate_note_path(
        ...     Path("data/notes/../../etc/passwd"),
        ...     Path("data")
        ... )
        False

    Security:
        - Resolves symlinks and relative paths
        - Checks if resolved path is child of data_dir
        - Prevents directory traversal attacks
        - Fails closed (returns False) on resolution errors, including
          symlink loops
    """
    try:
        # Resolve both paths to absolute
        resolved_file = file_path.resolve()
        resolved_data_dir = data_dir.resolve()
    except (ValueError, OSError, RuntimeError):
        # RuntimeError: Path.resolve() reports symlink loops this way on
        # Python versions before 3.13; a security check must not let it escape.
        return False

    # Check if file_path is relative to data_dir (component-wise, so
    # "data2/x" is not considered inside "data")
    return resolved_file.is_relative_to(resolved_data_dir)
|
|
|
|
|
|
# Atomic File Operations
|
|
|
|
|
|
def write_note_file(file_path: Path, content: str) -> None:
    """
    Write note content to file atomically

    Writes to temporary file first, then atomically renames to final path.
    This prevents corruption if write is interrupted.

    Args:
        file_path: Destination file path
        content: Content to write (markdown text)

    Raises:
        OSError: If file cannot be written
        ValueError: If file_path is invalid

    Examples:
        >>> write_note_file(Path("data/notes/2024/11/test.md"), "# Test")

    Implementation:
        1. Create temp file: {file_path}.tmp
        2. Write content to temp file
        3. Atomically rename temp to final path
        4. If any step fails, clean up temp file and re-raise

    Notes:
        - Atomic rename is guaranteed on POSIX systems
        - Temp file created in same directory as target
        - UTF-8 encoding used for all text
    """
    # Create temp file path (same directory, target suffix + TEMP_FILE_SUFFIX)
    temp_path = file_path.with_suffix(file_path.suffix + TEMP_FILE_SUFFIX)

    try:
        # Write to temp file
        temp_path.write_text(content, encoding="utf-8")

        # Atomically rename temp to final path
        temp_path.replace(file_path)
    except BaseException:
        # BaseException so an interrupt (e.g. KeyboardInterrupt) does not
        # leave a stray temp file behind; the exception is always re-raised.
        try:
            temp_path.unlink(missing_ok=True)
        except OSError:
            # Best-effort cleanup: a failure here must not replace the
            # original error that the caller needs to see.
            pass
        raise
|
|
|
|
|
|
def read_note_file(file_path: Path) -> str:
    """
    Return the full text of a note file, decoded as UTF-8

    Args:
        file_path: Path to note file

    Returns:
        File content as string

    Raises:
        FileNotFoundError: If file doesn't exist
        OSError: If file cannot be read

    Examples:
        >>> content = read_note_file(Path("data/notes/2024/11/test.md"))
        >>> print(content)
        # Test Note
    """
    with file_path.open(encoding="utf-8") as handle:
        return handle.read()
|
|
|
|
|
|
def delete_note_file(
    file_path: Path, soft: bool = False, data_dir: Optional[Path] = None
) -> None:
    """
    Delete note file from filesystem

    Supports soft delete (move to trash) or hard delete (permanent removal).
    A soft delete moves the file to ``<data_dir>/.trash/YYYY/MM/<name>``.
    YYYY/MM are taken from the ``notes/YYYY/MM`` portion of ``file_path``;
    if the path has no such portion, the current UTC year/month are used.

    Args:
        file_path: Path to note file
        soft: If True, move to .trash/ directory; if False, delete permanently
        data_dir: Required if soft=True, base data directory

    Raises:
        FileNotFoundError: If file doesn't exist
        ValueError: If soft=True but data_dir not provided
        OSError: If file cannot be deleted or moved

    Examples:
        >>> # Hard delete
        >>> delete_note_file(Path("data/notes/2024/11/test.md"))

        >>> # Soft delete (move to trash)
        >>> delete_note_file(
        ...     Path("data/notes/2024/11/test.md"),
        ...     soft=True,
        ...     data_dir=Path("data")
        ... )

    Notes:
        - NOTE(review): a file with the same name already present in the
          trash directory may be replaced by the move - confirm this is
          the desired behavior.
    """
    if not soft:
        # Hard delete: permanent removal
        file_path.unlink()
        return

    # Soft delete: move to trash
    if data_dir is None:
        raise ValueError("data_dir is required for soft delete")

    # Locate "notes/<digits>/<digits>" in the path. Scan right-to-left and
    # require numeric components so that a directory named "notes" higher up
    # (e.g. /srv/notes/data/notes/2024/11/x.md) cannot be mistaken for it.
    year: Optional[str] = None
    month: Optional[str] = None
    parts = file_path.parts
    for idx in range(len(parts) - 3, -1, -1):
        if (
            parts[idx] == "notes"
            and parts[idx + 1].isdigit()
            and parts[idx + 2].isdigit()
        ):
            year, month = parts[idx + 1], parts[idx + 2]
            break

    if year is None or month is None:
        # If path doesn't follow expected structure, use current date
        # (timezone-aware replacement for the deprecated datetime.utcnow()).
        now = datetime.now(timezone.utc)
        year, month = now.strftime("%Y"), now.strftime("%m")

    # Create trash directory path
    trash_dir = data_dir / TRASH_DIR_NAME / year / month
    trash_dir.mkdir(parents=True, exist_ok=True)

    # Move file to trash
    shutil.move(str(file_path), str(trash_dir / file_path.name))
|
|
|
|
|
|
# Date/Time Utilities
|
|
|
|
|
|
def format_rfc822(dt: datetime) -> str:
    """
    Format datetime as RFC-822 string

    Converts datetime to RFC-822 format required by RSS 2.0 specification.
    The output is always expressed in UTC (``+0000``):

    - a naive datetime is assumed to already be UTC
    - a timezone-aware datetime is converted to UTC first

    Day and month names are always English, independent of the process
    locale (``strftime("%a")``/``"%b"`` would follow the locale).

    Args:
        dt: Datetime to format (naive values are assumed UTC)

    Returns:
        RFC-822 formatted string

    Examples:
        >>> from datetime import datetime
        >>> dt = datetime(2024, 11, 18, 14, 30, 45)
        >>> format_rfc822(dt)
        'Mon, 18 Nov 2024 14:30:45 +0000'

    References:
        - RSS 2.0 spec: https://www.rssboard.org/rss-specification
        - RFC-822 date format
    """
    if dt.utcoffset() is None:
        # Naive: label as UTC without shifting the clock time.
        dt = dt.replace(tzinfo=timezone.utc)
    else:
        # Aware: shift to UTC so the "+0000" label is actually true.
        dt = dt.astimezone(timezone.utc)
    return format_datetime(dt)
|
|
|
|
|
|
def format_iso8601(dt: datetime) -> str:
    """
    Format datetime as ISO 8601 string

    Converts datetime to ISO 8601 format for timestamps and APIs. The
    result always carries the ``Z`` (UTC) designator:

    - a naive datetime is assumed to already be UTC
    - a timezone-aware datetime is converted to UTC first

    Sub-second precision is not included in the output.

    Args:
        dt: Datetime to format (naive values are assumed UTC)

    Returns:
        ISO 8601 formatted string

    Examples:
        >>> from datetime import datetime
        >>> dt = datetime(2024, 11, 18, 14, 30, 45)
        >>> format_iso8601(dt)
        '2024-11-18T14:30:45Z'
    """
    if dt.utcoffset() is not None:
        # Aware: shift to UTC so the trailing "Z" is actually true.
        dt = dt.astimezone(timezone.utc)
    return dt.strftime("%Y-%m-%dT%H:%M:%SZ")
|
|
|
|
|
|
def parse_iso8601(date_string: str) -> datetime:
    """
    Parse ISO 8601 string to datetime

    Accepts the formats understood by ``datetime.fromisoformat`` plus a
    trailing ``Z``. The result is always a naive datetime expressed in UTC:

    - a string with ``Z`` or without any offset is taken as UTC as-is
    - a string with an explicit offset (e.g. ``+02:00``) is converted to UTC

    Args:
        date_string: ISO 8601 formatted string

    Returns:
        Datetime object (UTC, without tzinfo)

    Raises:
        ValueError: If string is not valid ISO 8601 format

    Examples:
        >>> parse_iso8601("2024-11-18T14:30:45Z")
        datetime.datetime(2024, 11, 18, 14, 30, 45)

        >>> parse_iso8601("2024-11-18T16:30:45+02:00")
        datetime.datetime(2024, 11, 18, 14, 30, 45)
    """
    # Remove 'Z' suffix if present, then parse using fromisoformat
    parsed = datetime.fromisoformat(date_string.removesuffix("Z"))

    if parsed.utcoffset() is not None:
        # Normalize offset-bearing input to the naive-UTC convention used by
        # this module, so callers never get a mix of naive and aware values.
        parsed = parsed.astimezone(timezone.utc).replace(tzinfo=None)
    return parsed
|