""" Data models for StarPunk This module provides data model classes that wrap database rows and provide clean interfaces for working with notes, sessions, tokens, and authentication state. All models are immutable and use dataclasses for clean structure. """ # Standard library imports from dataclasses import dataclass, field, replace from datetime import datetime, timedelta from pathlib import Path from typing import Any, Optional import sqlite3 # Third-party imports import markdown # Local imports from starpunk.utils import ( read_note_file, calculate_content_hash, validate_note_path, ) # Constants - Session configuration DEFAULT_SESSION_EXPIRY_DAYS = 30 SESSION_EXTENSION_ON_USE = True # Constants - Auth state configuration DEFAULT_AUTH_STATE_EXPIRY_MINUTES = 5 # Constants - Token configuration DEFAULT_TOKEN_EXPIRY_DAYS = 90 # Constants - Markdown rendering MARKDOWN_EXTENSIONS = ["extra", "codehilite", "nl2br"] # Constants - Content limits MAX_TITLE_LENGTH = 200 EXCERPT_LENGTH = 200 @dataclass(frozen=True) class Note: """ Represents a note/post This is an immutable data model that wraps a database row and provides access to note metadata and lazy-loaded content. Content is read from files on-demand, and HTML rendering is cached. Attributes: id: Database ID (primary key) slug: URL-safe slug (unique) file_path: Path to markdown file (relative to data directory) published: Whether note is published (visible publicly) created_at: Creation timestamp (UTC) updated_at: Last update timestamp (UTC) deleted_at: Soft deletion timestamp (UTC, None if not deleted) content_hash: SHA-256 hash of content (for integrity checking) _data_dir: Base data directory path (used for file loading) _cached_content: Cached markdown content (lazy-loaded) _cached_html: Cached rendered HTML (lazy-loaded) Properties: content: Markdown content (loaded from file, cached) html: Rendered HTML content (cached) title: Extracted title (first line or slug) excerpt: Short excerpt for previews permalink: Public URL path is_published: Alias for published (more readable) Methods: from_row: Create Note from database row to_dict: Serialize to dictionary (for JSON) verify_integrity: Check if file content matches hash Examples: >>> # Create from database row >>> row = db.execute("SELECT * FROM notes WHERE slug = ?", (slug,)).fetchone() >>> note = Note.from_row(row, data_dir=Path("data")) >>> # Access metadata >>> print(note.slug) 'my-first-note' >>> print(note.published) True >>> # Lazy-load content >>> content = note.content # Reads file on first access >>> content = note.content # Returns cached value on subsequent access >>> # Render HTML >>> html = note.html # Renders markdown on first access >>> html = note.html # Returns cached value >>> # Extract metadata >>> title = note.title >>> permalink = note.permalink """ # Core fields from database id: int slug: str file_path: str published: bool created_at: datetime updated_at: datetime # Internal fields (not from database) _data_dir: Path = field(repr=False, compare=False) # Optional fields deleted_at: Optional[datetime] = None content_hash: Optional[str] = None _cached_content: Optional[str] = field( default=None, repr=False, compare=False, init=False ) _cached_html: Optional[str] = field( default=None, repr=False, compare=False, init=False ) @classmethod def from_row(cls, row: sqlite3.Row | dict[str, Any], data_dir: Path) -> "Note": """ Create Note instance from database row Args: row: Database row (sqlite3.Row or dict with column names) data_dir: Base data directory path Returns: Note instance Examples: >>> row = db.execute("SELECT * FROM notes WHERE id = ?", (1,)).fetchone() >>> note = Note.from_row(row, Path("data")) """ # Handle both sqlite3.Row and dict if hasattr(row, "keys"): data = {key: row[key] for key in row.keys()} else: data = dict(row) # Convert timestamps if they are strings created_at = data["created_at"] if isinstance(created_at, str): created_at = datetime.fromisoformat(created_at.replace("Z", "+00:00")) updated_at = data["updated_at"] if isinstance(updated_at, str): updated_at = datetime.fromisoformat(updated_at.replace("Z", "+00:00")) deleted_at = data.get("deleted_at") if deleted_at and isinstance(deleted_at, str): deleted_at = datetime.fromisoformat(deleted_at.replace("Z", "+00:00")) return cls( id=data["id"], slug=data["slug"], file_path=data["file_path"], published=bool(data["published"]), created_at=created_at, updated_at=updated_at, deleted_at=deleted_at, _data_dir=data_dir, content_hash=data.get("content_hash"), ) @property def content(self) -> str: """ Get note content (lazy-loaded from file) Reads markdown content from file on first access, then caches. Subsequent accesses return cached value. Returns: Markdown content as string Raises: FileNotFoundError: If note file doesn't exist OSError: If file cannot be read Examples: >>> content = note.content >>> print(content) This is my note content... """ if self._cached_content is None: # Build full path full_path = self._data_dir / self.file_path # Validate path for security if not validate_note_path(full_path, self._data_dir): raise ValueError( f"Invalid file path: {self.file_path}. " f"Path traversal detected." ) # Read content from file content = read_note_file(full_path) # Cache it (use object.__setattr__ for frozen dataclass) object.__setattr__(self, "_cached_content", content) return self._cached_content @property def html(self) -> str: """ Get rendered HTML content (lazy-rendered and cached) Renders markdown to HTML on first access, then caches. Uses Python-Markdown with extensions for code highlighting, tables, and other features. Returns: Rendered HTML as string Examples: >>> html = note.html >>> print(html)

This is my note content...

""" if self._cached_html is None: # Render markdown to HTML html = markdown.markdown( self.content, # This triggers lazy load of content extensions=MARKDOWN_EXTENSIONS, ) # Cache it object.__setattr__(self, "_cached_html", html) return self._cached_html @property def title(self) -> str: """ Extract title from content Returns first line of content, or uses slug as fallback. Strips markdown heading syntax (# ) if present. Returns: Title string Examples: >>> # Content: "# My First Note\\n\\nContent here..." >>> note.title 'My First Note' >>> # Content: "Just a note without heading" >>> note.title 'Just a note without heading' """ try: # Get content (may trigger lazy load) content = self.content # Split into lines and get first non-empty line lines = content.strip().split("\n") first_line = "" for line in lines: if line.strip(): first_line = line.strip() break if not first_line: # No content, use slug return self.slug # Strip markdown heading syntax if first_line.startswith("#"): # Remove leading # characters and whitespace first_line = first_line.lstrip("#").strip() # Truncate to max length if len(first_line) > MAX_TITLE_LENGTH: first_line = first_line[:MAX_TITLE_LENGTH] return first_line if first_line else self.slug except (FileNotFoundError, OSError): # If file doesn't exist, use slug return self.slug @property def excerpt(self) -> str: """ Generate short excerpt for previews Returns first 200 characters of content (plain text, no markdown). Strips markdown formatting and adds ellipsis if truncated. Returns: Excerpt string Examples: >>> note.excerpt 'This is my note content. It has some interesting points...' """ try: # Get content content = self.content # Remove markdown heading syntax from first line lines = content.split("\n") cleaned_lines = [] for line in lines: if line.strip().startswith("#"): # Remove heading syntax cleaned_lines.append(line.lstrip("#").strip()) else: cleaned_lines.append(line) cleaned_content = "\n".join(cleaned_lines) # Take first EXCERPT_LENGTH characters excerpt = cleaned_content.strip()[:EXCERPT_LENGTH] # Truncate to word boundary if len(cleaned_content.strip()) > EXCERPT_LENGTH: # Find last space in excerpt last_space = excerpt.rfind(" ") if last_space > 0: excerpt = excerpt[:last_space] excerpt += "..." return excerpt except (FileNotFoundError, OSError): # If file doesn't exist, return slug return self.slug @property def permalink(self) -> str: """ Generate permalink (public URL path) Returns: URL path string (e.g., '/note/my-first-note') Examples: >>> note.permalink '/note/my-first-note' """ return f"/note/{self.slug}" @property def is_published(self) -> bool: """ Alias for published (more readable) Returns: True if note is published, False otherwise """ return self.published def to_dict( self, include_content: bool = False, include_html: bool = False ) -> dict[str, Any]: """ Serialize note to dictionary Converts note to dictionary for JSON serialization or template rendering. Can optionally include content and rendered HTML. Args: include_content: Include markdown content in output include_html: Include rendered HTML in output Returns: Dictionary with note data Examples: >>> note.to_dict() { 'id': 1, 'slug': 'my-first-note', 'title': 'My First Note', 'published': True, 'created_at': '2024-11-18T14:30:00Z', 'updated_at': '2024-11-18T14:30:00Z', 'permalink': '/note/my-first-note' } >>> note.to_dict(include_content=True, include_html=True) { # ... same as above, plus: 'content': 'Markdown content...', 'html': '

Rendered HTML...

' } """ data = { "id": self.id, "slug": self.slug, "title": self.title, "published": self.published, "created_at": self.created_at.strftime("%Y-%m-%dT%H:%M:%SZ"), "updated_at": self.updated_at.strftime("%Y-%m-%dT%H:%M:%SZ"), "permalink": self.permalink, "excerpt": self.excerpt, } if include_content: data["content"] = self.content if include_html: data["html"] = self.html return data def verify_integrity(self) -> bool: """ Verify content matches stored hash Reads content from file, calculates hash, and compares with stored content_hash. Used to detect external file modifications. Returns: True if hash matches, False otherwise Examples: >>> note.verify_integrity() True # File has not been modified >>> # Someone edits file externally >>> note.verify_integrity() False # Hash mismatch detected """ if self.content_hash is None: # No hash stored, cannot verify return False try: # Calculate hash of current content current_hash = calculate_content_hash(self.content) # Compare with stored hash return current_hash == self.content_hash except (FileNotFoundError, OSError): # If file doesn't exist, integrity check fails return False @dataclass(frozen=True) class Session: """ Represents an authenticated session Sessions are created after successful IndieLogin authentication and stored in the database. They have a configurable expiry time and can be extended on use. Attributes: id: Database ID (primary key) session_token: Unique session token (stored in cookie) me: Authenticated user's URL (from IndieLogin) created_at: Session creation timestamp (UTC) expires_at: Session expiration timestamp (UTC) last_used_at: Last activity timestamp (UTC, nullable) Properties: is_expired: Check if session has expired is_active: Check if session is not expired age: Age of session (timedelta) time_until_expiry: Time remaining until expiry (timedelta) Methods: from_row: Create Session from database row to_dict: Serialize to dictionary is_valid: Comprehensive validation (checks expiry, token format) with_updated_last_used: Create new session with updated last_used_at Examples: >>> # Create from database row >>> row = db.execute( ... "SELECT * FROM sessions WHERE session_token = ?", (token,) ... ).fetchone() >>> session = Session.from_row(row) >>> # Check if expired >>> if session.is_expired: ... print("Session expired") >>> # Check validity >>> if session.is_valid(): ... print("Session is valid") >>> # Update last used >>> updated_session = session.with_updated_last_used() """ # Core fields from database id: int session_token: str me: str created_at: datetime expires_at: datetime last_used_at: Optional[datetime] = None @classmethod def from_row(cls, row: sqlite3.Row | dict[str, Any]) -> "Session": """ Create Session instance from database row Args: row: Database row (sqlite3.Row or dict) Returns: Session instance Examples: >>> row = db.execute("SELECT * FROM sessions WHERE id = ?", (1,)).fetchone() >>> session = Session.from_row(row) """ # Handle both sqlite3.Row and dict if hasattr(row, "keys"): data = {key: row[key] for key in row.keys()} else: data = dict(row) # Convert timestamps if they are strings created_at = data["created_at"] if isinstance(created_at, str): created_at = datetime.fromisoformat(created_at.replace("Z", "+00:00")) expires_at = data["expires_at"] if isinstance(expires_at, str): expires_at = datetime.fromisoformat(expires_at.replace("Z", "+00:00")) last_used_at = data.get("last_used_at") if last_used_at and isinstance(last_used_at, str): last_used_at = datetime.fromisoformat(last_used_at.replace("Z", "+00:00")) return cls( id=data["id"], session_token=data["session_token"], me=data["me"], created_at=created_at, expires_at=expires_at, last_used_at=last_used_at, ) @property def is_expired(self) -> bool: """ Check if session has expired Compares expires_at with current UTC time. Returns: True if expired, False otherwise Examples: >>> session.is_expired False """ return datetime.utcnow() >= self.expires_at @property def is_active(self) -> bool: """ Check if session is active (not expired) Returns: True if not expired, False otherwise Examples: >>> session.is_active True """ return not self.is_expired @property def age(self) -> timedelta: """ Get age of session Returns: Timedelta since session creation Examples: >>> session.age datetime.timedelta(days=2, seconds=3600) """ return datetime.utcnow() - self.created_at @property def time_until_expiry(self) -> timedelta: """ Get time remaining until expiry Returns: Timedelta until expiry (negative if already expired) Examples: >>> session.time_until_expiry datetime.timedelta(days=28) """ return self.expires_at - datetime.utcnow() def is_valid(self) -> bool: """ Comprehensive session validation Checks: - Session is not expired - Session token is not empty - User 'me' URL is valid Returns: True if session is valid, False otherwise Examples: >>> session.is_valid() True """ # Check if expired if self.is_expired: return False # Check if token is not empty if not self.session_token or not self.session_token.strip(): return False # Check if 'me' URL is not empty (basic validation) if not self.me or not self.me.strip(): return False return True def with_updated_last_used(self) -> "Session": """ Create new session with updated last_used_at Since sessions are immutable, this returns a new Session instance with last_used_at set to current UTC time. Returns: New Session instance with updated timestamp Examples: >>> updated = session.with_updated_last_used() >>> updated.last_used_at datetime.datetime(2024, 11, 18, 15, 30, 0) """ return replace(self, last_used_at=datetime.utcnow()) def to_dict(self) -> dict[str, Any]: """ Serialize session to dictionary Returns: Dictionary with session data (excludes sensitive token) Examples: >>> session.to_dict() { 'id': 1, 'me': 'https://alice.example.com', 'created_at': '2024-11-18T14:30:00Z', 'expires_at': '2024-12-18T14:30:00Z', 'is_active': True } """ return { "id": self.id, "me": self.me, "created_at": self.created_at.strftime("%Y-%m-%dT%H:%M:%SZ"), "expires_at": self.expires_at.strftime("%Y-%m-%dT%H:%M:%SZ"), "is_active": self.is_active, } @dataclass(frozen=True) class Token: """ Represents a Micropub access token Tokens are used to authenticate Micropub API requests. They have associated scopes that determine what actions the token can perform. Attributes: token: Token string (primary key, used in Authorization header) me: User's URL client_id: Client application URL (optional) scope: Space-separated scope string (e.g., "create update delete") created_at: Token creation timestamp (UTC) expires_at: Token expiration timestamp (UTC, nullable for no expiry) Properties: is_expired: Check if token has expired is_active: Check if token is not expired scopes: List of individual scopes Methods: from_row: Create Token from database row to_dict: Serialize to dictionary has_scope: Check if token has specific scope is_valid: Comprehensive validation Examples: >>> # Create from database row >>> row = db.execute( ... "SELECT * FROM tokens WHERE token = ?", (token,) ... ).fetchone() >>> token_obj = Token.from_row(row) >>> # Check scope >>> if token_obj.has_scope('create'): ... print("Can create posts") >>> # Check if expired >>> if token_obj.is_active: ... print("Token is active") """ # Core fields from database token: str me: str client_id: Optional[str] = None scope: Optional[str] = None created_at: datetime = field(default_factory=lambda: datetime.utcnow()) expires_at: Optional[datetime] = None @classmethod def from_row(cls, row: sqlite3.Row | dict[str, Any]) -> "Token": """ Create Token instance from database row Args: row: Database row (sqlite3.Row or dict) Returns: Token instance Examples: >>> row = db.execute( ... "SELECT * FROM tokens WHERE token = ?", (token,) ... ).fetchone() >>> token_obj = Token.from_row(row) """ # Handle both sqlite3.Row and dict if hasattr(row, "keys"): data = {key: row[key] for key in row.keys()} else: data = dict(row) # Convert timestamps if they are strings created_at = data["created_at"] if isinstance(created_at, str): created_at = datetime.fromisoformat(created_at.replace("Z", "+00:00")) expires_at = data.get("expires_at") if expires_at and isinstance(expires_at, str): expires_at = datetime.fromisoformat(expires_at.replace("Z", "+00:00")) return cls( token=data["token"], me=data["me"], client_id=data.get("client_id"), scope=data.get("scope"), created_at=created_at, expires_at=expires_at, ) @property def is_expired(self) -> bool: """ Check if token has expired If expires_at is None, token never expires. Otherwise, compares with current UTC time. Returns: True if expired, False otherwise Examples: >>> token_obj.is_expired False """ if self.expires_at is None: # No expiry set, token never expires return False return datetime.utcnow() >= self.expires_at @property def is_active(self) -> bool: """ Check if token is active (not expired) Returns: True if not expired, False otherwise """ return not self.is_expired @property def scopes(self) -> list[str]: """ Get list of individual scopes Splits scope string on whitespace. Returns: List of scope strings Examples: >>> # scope = "create update delete" >>> token_obj.scopes ['create', 'update', 'delete'] >>> # scope = None >>> token_obj.scopes [] """ if not self.scope: return [] return self.scope.split() def has_scope(self, required_scope: str) -> bool: """ Check if token has required scope Args: required_scope: Scope to check for (e.g., 'create', 'update') Returns: True if token has scope, False otherwise Examples: >>> token_obj.has_scope('create') True >>> token_obj.has_scope('delete') False """ return required_scope in self.scopes def is_valid(self, required_scope: Optional[str] = None) -> bool: """ Comprehensive token validation Checks: - Token is not expired - Token string is not empty - If required_scope provided, token has that scope Args: required_scope: Optional scope to check Returns: True if token is valid, False otherwise Examples: >>> token_obj.is_valid() True >>> token_obj.is_valid(required_scope='create') True """ # Check if expired if self.is_expired: return False # Check if token is not empty if not self.token or not self.token.strip(): return False # Check required scope if provided if required_scope is not None: if not self.has_scope(required_scope): return False return True def to_dict(self) -> dict[str, Any]: """ Serialize token to dictionary Returns: Dictionary with token data (excludes sensitive token value) Examples: >>> token_obj.to_dict() { 'me': 'https://alice.example.com', 'client_id': 'https://quill.p3k.io', 'scope': 'create update', 'created_at': '2024-11-18T14:30:00Z', 'is_active': True } """ data = { "me": self.me, "client_id": self.client_id, "scope": self.scope, "created_at": self.created_at.strftime("%Y-%m-%dT%H:%M:%SZ"), "is_active": self.is_active, } if self.expires_at: data["expires_at"] = self.expires_at.strftime("%Y-%m-%dT%H:%M:%SZ") return data @dataclass(frozen=True) class AuthState: """ Represents an OAuth state token (CSRF protection) State tokens are short-lived (5 minutes) and single-use. They prevent CSRF attacks during the IndieLogin authentication flow. Attributes: state: Random state token string (primary key) created_at: Token creation timestamp (UTC) expires_at: Token expiration timestamp (UTC) Properties: is_expired: Check if state token has expired is_active: Check if state token is not expired age: Age of state token Methods: from_row: Create AuthState from database row to_dict: Serialize to dictionary is_valid: Comprehensive validation Examples: >>> # Create from database row >>> row = db.execute( ... "SELECT * FROM auth_state WHERE state = ?", (state,) ... ).fetchone() >>> auth_state = AuthState.from_row(row) >>> # Check if expired >>> if auth_state.is_active: ... print("State token is still valid") >>> # Check validity >>> if auth_state.is_valid(): ... print("Valid state token") """ # Core fields from database state: str created_at: datetime expires_at: datetime @classmethod def from_row(cls, row: sqlite3.Row | dict[str, Any]) -> "AuthState": """ Create AuthState instance from database row Args: row: Database row (sqlite3.Row or dict) Returns: AuthState instance Examples: >>> row = db.execute( ... "SELECT * FROM auth_state WHERE state = ?", (state,) ... ).fetchone() >>> auth_state = AuthState.from_row(row) """ # Handle both sqlite3.Row and dict if hasattr(row, "keys"): data = {key: row[key] for key in row.keys()} else: data = dict(row) # Convert timestamps if they are strings created_at = data["created_at"] if isinstance(created_at, str): created_at = datetime.fromisoformat(created_at.replace("Z", "+00:00")) expires_at = data["expires_at"] if isinstance(expires_at, str): expires_at = datetime.fromisoformat(expires_at.replace("Z", "+00:00")) return cls( state=data["state"], created_at=created_at, expires_at=expires_at, ) @property def is_expired(self) -> bool: """ Check if state token has expired State tokens have short expiry (5 minutes default). Returns: True if expired, False otherwise Examples: >>> auth_state.is_expired False """ return datetime.utcnow() >= self.expires_at @property def is_active(self) -> bool: """ Check if state token is active (not expired) Returns: True if not expired, False otherwise """ return not self.is_expired @property def age(self) -> timedelta: """ Get age of state token Returns: Timedelta since token creation Examples: >>> auth_state.age datetime.timedelta(seconds=120) """ return datetime.utcnow() - self.created_at def is_valid(self) -> bool: """ Comprehensive state token validation Checks: - Token is not expired - Token string is not empty Returns: True if valid, False otherwise Examples: >>> auth_state.is_valid() True """ # Check if expired if self.is_expired: return False # Check if state is not empty if not self.state or not self.state.strip(): return False return True def to_dict(self) -> dict[str, Any]: """ Serialize state token to dictionary Returns: Dictionary with state data Examples: >>> auth_state.to_dict() { 'created_at': '2024-11-18T14:30:00Z', 'expires_at': '2024-11-18T14:35:00Z', 'is_active': True } """ return { "created_at": self.created_at.strftime("%Y-%m-%dT%H:%M:%SZ"), "expires_at": self.expires_at.strftime("%Y-%m-%dT%H:%M:%SZ"), "is_active": self.is_active, }