1073 lines
30 KiB
Python
1073 lines
30 KiB
Python
"""
|
|
Data models for StarPunk
|
|
|
|
This module provides data model classes that wrap database rows and provide
|
|
clean interfaces for working with notes, sessions, tokens, and authentication
|
|
state. All models are immutable and use dataclasses for clean structure.
|
|
"""
|
|
|
|
# Standard library imports
import sqlite3
from dataclasses import dataclass, field, replace
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any, Optional

# Third-party imports
import markdown

# Local imports
from starpunk.utils import (
    read_note_file,
    calculate_content_hash,
    validate_note_path,
)
|
|
|
|
# ---------------------------------------------------------------------------
# Module-level configuration constants
# NOTE(review): only MARKDOWN_EXTENSIONS, MAX_TITLE_LENGTH and EXCERPT_LENGTH
# are referenced by the models in this module; the expiry defaults are
# presumably consumed by the code that creates sessions/tokens -- confirm.
# ---------------------------------------------------------------------------

# Session configuration
DEFAULT_SESSION_EXPIRY_DAYS: int = 30
SESSION_EXTENSION_ON_USE: bool = True

# Auth state configuration
DEFAULT_AUTH_STATE_EXPIRY_MINUTES: int = 5

# Token configuration
DEFAULT_TOKEN_EXPIRY_DAYS: int = 90

# Markdown rendering: extensions handed to markdown.markdown() by Note.html
MARKDOWN_EXTENSIONS: list[str] = ["extra", "codehilite", "nl2br"]

# Content limits: maximum characters kept by Note.title / Note.excerpt
MAX_TITLE_LENGTH: int = 200
EXCERPT_LENGTH: int = 200
|
|
|
|
|
|
@dataclass(frozen=True)
|
|
class Note:
|
|
"""
|
|
Represents a note/post
|
|
|
|
This is an immutable data model that wraps a database row and provides
|
|
access to note metadata and lazy-loaded content. Content is read from
|
|
files on-demand, and HTML rendering is cached.
|
|
|
|
Attributes:
|
|
id: Database ID (primary key)
|
|
slug: URL-safe slug (unique)
|
|
file_path: Path to markdown file (relative to data directory)
|
|
published: Whether note is published (visible publicly)
|
|
created_at: Creation timestamp (UTC)
|
|
updated_at: Last update timestamp (UTC)
|
|
content_hash: SHA-256 hash of content (for integrity checking)
|
|
_data_dir: Base data directory path (used for file loading)
|
|
_cached_content: Cached markdown content (lazy-loaded)
|
|
_cached_html: Cached rendered HTML (lazy-loaded)
|
|
|
|
Properties:
|
|
content: Markdown content (loaded from file, cached)
|
|
html: Rendered HTML content (cached)
|
|
title: Extracted title (first line or slug)
|
|
excerpt: Short excerpt for previews
|
|
permalink: Public URL path
|
|
is_published: Alias for published (more readable)
|
|
|
|
Methods:
|
|
from_row: Create Note from database row
|
|
to_dict: Serialize to dictionary (for JSON)
|
|
verify_integrity: Check if file content matches hash
|
|
|
|
Examples:
|
|
>>> # Create from database row
|
|
>>> row = db.execute("SELECT * FROM notes WHERE slug = ?", (slug,)).fetchone()
|
|
>>> note = Note.from_row(row, data_dir=Path("data"))
|
|
|
|
>>> # Access metadata
|
|
>>> print(note.slug)
|
|
'my-first-note'
|
|
>>> print(note.published)
|
|
True
|
|
|
|
>>> # Lazy-load content
|
|
>>> content = note.content # Reads file on first access
|
|
>>> content = note.content # Returns cached value on subsequent access
|
|
|
|
>>> # Render HTML
|
|
>>> html = note.html # Renders markdown on first access
|
|
>>> html = note.html # Returns cached value
|
|
|
|
>>> # Extract metadata
|
|
>>> title = note.title
|
|
>>> permalink = note.permalink
|
|
"""
|
|
|
|
# Core fields from database
|
|
id: int
|
|
slug: str
|
|
file_path: str
|
|
published: bool
|
|
created_at: datetime
|
|
updated_at: datetime
|
|
|
|
# Internal fields (not from database)
|
|
_data_dir: Path = field(repr=False, compare=False)
|
|
|
|
# Optional fields
|
|
content_hash: Optional[str] = None
|
|
_cached_content: Optional[str] = field(
|
|
default=None, repr=False, compare=False, init=False
|
|
)
|
|
_cached_html: Optional[str] = field(
|
|
default=None, repr=False, compare=False, init=False
|
|
)
|
|
|
|
@classmethod
|
|
def from_row(cls, row: sqlite3.Row | dict[str, Any], data_dir: Path) -> "Note":
|
|
"""
|
|
Create Note instance from database row
|
|
|
|
Args:
|
|
row: Database row (sqlite3.Row or dict with column names)
|
|
data_dir: Base data directory path
|
|
|
|
Returns:
|
|
Note instance
|
|
|
|
Examples:
|
|
>>> row = db.execute("SELECT * FROM notes WHERE id = ?", (1,)).fetchone()
|
|
>>> note = Note.from_row(row, Path("data"))
|
|
"""
|
|
# Handle both sqlite3.Row and dict
|
|
if hasattr(row, "keys"):
|
|
data = {key: row[key] for key in row.keys()}
|
|
else:
|
|
data = dict(row)
|
|
|
|
# Convert timestamps if they are strings
|
|
created_at = data["created_at"]
|
|
if isinstance(created_at, str):
|
|
created_at = datetime.fromisoformat(created_at.replace("Z", "+00:00"))
|
|
|
|
updated_at = data["updated_at"]
|
|
if isinstance(updated_at, str):
|
|
updated_at = datetime.fromisoformat(updated_at.replace("Z", "+00:00"))
|
|
|
|
return cls(
|
|
id=data["id"],
|
|
slug=data["slug"],
|
|
file_path=data["file_path"],
|
|
published=bool(data["published"]),
|
|
created_at=created_at,
|
|
updated_at=updated_at,
|
|
_data_dir=data_dir,
|
|
content_hash=data.get("content_hash"),
|
|
)
|
|
|
|
@property
|
|
def content(self) -> str:
|
|
"""
|
|
Get note content (lazy-loaded from file)
|
|
|
|
Reads markdown content from file on first access, then caches.
|
|
Subsequent accesses return cached value.
|
|
|
|
Returns:
|
|
Markdown content as string
|
|
|
|
Raises:
|
|
FileNotFoundError: If note file doesn't exist
|
|
OSError: If file cannot be read
|
|
|
|
Examples:
|
|
>>> content = note.content
|
|
>>> print(content)
|
|
This is my note content...
|
|
"""
|
|
if self._cached_content is None:
|
|
# Build full path
|
|
full_path = self._data_dir / self.file_path
|
|
|
|
# Validate path for security
|
|
if not validate_note_path(full_path, self._data_dir):
|
|
raise ValueError(
|
|
f"Invalid file path: {self.file_path}. " f"Path traversal detected."
|
|
)
|
|
|
|
# Read content from file
|
|
content = read_note_file(full_path)
|
|
|
|
# Cache it (use object.__setattr__ for frozen dataclass)
|
|
object.__setattr__(self, "_cached_content", content)
|
|
|
|
return self._cached_content
|
|
|
|
@property
|
|
def html(self) -> str:
|
|
"""
|
|
Get rendered HTML content (lazy-rendered and cached)
|
|
|
|
Renders markdown to HTML on first access, then caches.
|
|
Uses Python-Markdown with extensions for code highlighting,
|
|
tables, and other features.
|
|
|
|
Returns:
|
|
Rendered HTML as string
|
|
|
|
Examples:
|
|
>>> html = note.html
|
|
>>> print(html)
|
|
<p>This is my note content...</p>
|
|
"""
|
|
if self._cached_html is None:
|
|
# Render markdown to HTML
|
|
html = markdown.markdown(
|
|
self.content, # This triggers lazy load of content
|
|
extensions=MARKDOWN_EXTENSIONS,
|
|
)
|
|
|
|
# Cache it
|
|
object.__setattr__(self, "_cached_html", html)
|
|
|
|
return self._cached_html
|
|
|
|
@property
|
|
def title(self) -> str:
|
|
"""
|
|
Extract title from content
|
|
|
|
Returns first line of content, or uses slug as fallback.
|
|
Strips markdown heading syntax (# ) if present.
|
|
|
|
Returns:
|
|
Title string
|
|
|
|
Examples:
|
|
>>> # Content: "# My First Note\\n\\nContent here..."
|
|
>>> note.title
|
|
'My First Note'
|
|
|
|
>>> # Content: "Just a note without heading"
|
|
>>> note.title
|
|
'Just a note without heading'
|
|
"""
|
|
try:
|
|
# Get content (may trigger lazy load)
|
|
content = self.content
|
|
|
|
# Split into lines and get first non-empty line
|
|
lines = content.strip().split("\n")
|
|
first_line = ""
|
|
for line in lines:
|
|
if line.strip():
|
|
first_line = line.strip()
|
|
break
|
|
|
|
if not first_line:
|
|
# No content, use slug
|
|
return self.slug
|
|
|
|
# Strip markdown heading syntax
|
|
if first_line.startswith("#"):
|
|
# Remove leading # characters and whitespace
|
|
first_line = first_line.lstrip("#").strip()
|
|
|
|
# Truncate to max length
|
|
if len(first_line) > MAX_TITLE_LENGTH:
|
|
first_line = first_line[:MAX_TITLE_LENGTH]
|
|
|
|
return first_line if first_line else self.slug
|
|
|
|
except (FileNotFoundError, OSError):
|
|
# If file doesn't exist, use slug
|
|
return self.slug
|
|
|
|
@property
|
|
def excerpt(self) -> str:
|
|
"""
|
|
Generate short excerpt for previews
|
|
|
|
Returns first 200 characters of content (plain text, no markdown).
|
|
Strips markdown formatting and adds ellipsis if truncated.
|
|
|
|
Returns:
|
|
Excerpt string
|
|
|
|
Examples:
|
|
>>> note.excerpt
|
|
'This is my note content. It has some interesting points...'
|
|
"""
|
|
try:
|
|
# Get content
|
|
content = self.content
|
|
|
|
# Remove markdown heading syntax from first line
|
|
lines = content.split("\n")
|
|
cleaned_lines = []
|
|
for line in lines:
|
|
if line.strip().startswith("#"):
|
|
# Remove heading syntax
|
|
cleaned_lines.append(line.lstrip("#").strip())
|
|
else:
|
|
cleaned_lines.append(line)
|
|
|
|
cleaned_content = "\n".join(cleaned_lines)
|
|
|
|
# Take first EXCERPT_LENGTH characters
|
|
excerpt = cleaned_content.strip()[:EXCERPT_LENGTH]
|
|
|
|
# Truncate to word boundary
|
|
if len(cleaned_content.strip()) > EXCERPT_LENGTH:
|
|
# Find last space in excerpt
|
|
last_space = excerpt.rfind(" ")
|
|
if last_space > 0:
|
|
excerpt = excerpt[:last_space]
|
|
excerpt += "..."
|
|
|
|
return excerpt
|
|
|
|
except (FileNotFoundError, OSError):
|
|
# If file doesn't exist, return slug
|
|
return self.slug
|
|
|
|
@property
|
|
def permalink(self) -> str:
|
|
"""
|
|
Generate permalink (public URL path)
|
|
|
|
Returns:
|
|
URL path string (e.g., '/note/my-first-note')
|
|
|
|
Examples:
|
|
>>> note.permalink
|
|
'/note/my-first-note'
|
|
"""
|
|
return f"/note/{self.slug}"
|
|
|
|
@property
|
|
def is_published(self) -> bool:
|
|
"""
|
|
Alias for published (more readable)
|
|
|
|
Returns:
|
|
True if note is published, False otherwise
|
|
"""
|
|
return self.published
|
|
|
|
def to_dict(
|
|
self, include_content: bool = False, include_html: bool = False
|
|
) -> dict[str, Any]:
|
|
"""
|
|
Serialize note to dictionary
|
|
|
|
Converts note to dictionary for JSON serialization or template rendering.
|
|
Can optionally include content and rendered HTML.
|
|
|
|
Args:
|
|
include_content: Include markdown content in output
|
|
include_html: Include rendered HTML in output
|
|
|
|
Returns:
|
|
Dictionary with note data
|
|
|
|
Examples:
|
|
>>> note.to_dict()
|
|
{
|
|
'id': 1,
|
|
'slug': 'my-first-note',
|
|
'title': 'My First Note',
|
|
'published': True,
|
|
'created_at': '2024-11-18T14:30:00Z',
|
|
'updated_at': '2024-11-18T14:30:00Z',
|
|
'permalink': '/note/my-first-note'
|
|
}
|
|
|
|
>>> note.to_dict(include_content=True, include_html=True)
|
|
{
|
|
# ... same as above, plus:
|
|
'content': 'Markdown content...',
|
|
'html': '<p>Rendered HTML...</p>'
|
|
}
|
|
"""
|
|
data = {
|
|
"id": self.id,
|
|
"slug": self.slug,
|
|
"title": self.title,
|
|
"published": self.published,
|
|
"created_at": self.created_at.strftime("%Y-%m-%dT%H:%M:%SZ"),
|
|
"updated_at": self.updated_at.strftime("%Y-%m-%dT%H:%M:%SZ"),
|
|
"permalink": self.permalink,
|
|
"excerpt": self.excerpt,
|
|
}
|
|
|
|
if include_content:
|
|
data["content"] = self.content
|
|
|
|
if include_html:
|
|
data["html"] = self.html
|
|
|
|
return data
|
|
|
|
def verify_integrity(self) -> bool:
|
|
"""
|
|
Verify content matches stored hash
|
|
|
|
Reads content from file, calculates hash, and compares with
|
|
stored content_hash. Used to detect external file modifications.
|
|
|
|
Returns:
|
|
True if hash matches, False otherwise
|
|
|
|
Examples:
|
|
>>> note.verify_integrity()
|
|
True # File has not been modified
|
|
|
|
>>> # Someone edits file externally
|
|
>>> note.verify_integrity()
|
|
False # Hash mismatch detected
|
|
"""
|
|
if self.content_hash is None:
|
|
# No hash stored, cannot verify
|
|
return False
|
|
|
|
try:
|
|
# Calculate hash of current content
|
|
current_hash = calculate_content_hash(self.content)
|
|
|
|
# Compare with stored hash
|
|
return current_hash == self.content_hash
|
|
|
|
except (FileNotFoundError, OSError):
|
|
# If file doesn't exist, integrity check fails
|
|
return False
|
|
|
|
|
|
@dataclass(frozen=True)
|
|
class Session:
|
|
"""
|
|
Represents an authenticated session
|
|
|
|
Sessions are created after successful IndieLogin authentication and
|
|
stored in the database. They have a configurable expiry time and can
|
|
be extended on use.
|
|
|
|
Attributes:
|
|
id: Database ID (primary key)
|
|
session_token: Unique session token (stored in cookie)
|
|
me: Authenticated user's URL (from IndieLogin)
|
|
created_at: Session creation timestamp (UTC)
|
|
expires_at: Session expiration timestamp (UTC)
|
|
last_used_at: Last activity timestamp (UTC, nullable)
|
|
|
|
Properties:
|
|
is_expired: Check if session has expired
|
|
is_active: Check if session is not expired
|
|
age: Age of session (timedelta)
|
|
time_until_expiry: Time remaining until expiry (timedelta)
|
|
|
|
Methods:
|
|
from_row: Create Session from database row
|
|
to_dict: Serialize to dictionary
|
|
is_valid: Comprehensive validation (checks expiry, token format)
|
|
with_updated_last_used: Create new session with updated last_used_at
|
|
|
|
Examples:
|
|
>>> # Create from database row
|
|
>>> row = db.execute(
|
|
... "SELECT * FROM sessions WHERE session_token = ?", (token,)
|
|
... ).fetchone()
|
|
>>> session = Session.from_row(row)
|
|
|
|
>>> # Check if expired
|
|
>>> if session.is_expired:
|
|
... print("Session expired")
|
|
|
|
>>> # Check validity
|
|
>>> if session.is_valid():
|
|
... print("Session is valid")
|
|
|
|
>>> # Update last used
|
|
>>> updated_session = session.with_updated_last_used()
|
|
"""
|
|
|
|
# Core fields from database
|
|
id: int
|
|
session_token: str
|
|
me: str
|
|
created_at: datetime
|
|
expires_at: datetime
|
|
last_used_at: Optional[datetime] = None
|
|
|
|
@classmethod
|
|
def from_row(cls, row: sqlite3.Row | dict[str, Any]) -> "Session":
|
|
"""
|
|
Create Session instance from database row
|
|
|
|
Args:
|
|
row: Database row (sqlite3.Row or dict)
|
|
|
|
Returns:
|
|
Session instance
|
|
|
|
Examples:
|
|
>>> row = db.execute("SELECT * FROM sessions WHERE id = ?", (1,)).fetchone()
|
|
>>> session = Session.from_row(row)
|
|
"""
|
|
# Handle both sqlite3.Row and dict
|
|
if hasattr(row, "keys"):
|
|
data = {key: row[key] for key in row.keys()}
|
|
else:
|
|
data = dict(row)
|
|
|
|
# Convert timestamps if they are strings
|
|
created_at = data["created_at"]
|
|
if isinstance(created_at, str):
|
|
created_at = datetime.fromisoformat(created_at.replace("Z", "+00:00"))
|
|
|
|
expires_at = data["expires_at"]
|
|
if isinstance(expires_at, str):
|
|
expires_at = datetime.fromisoformat(expires_at.replace("Z", "+00:00"))
|
|
|
|
last_used_at = data.get("last_used_at")
|
|
if last_used_at and isinstance(last_used_at, str):
|
|
last_used_at = datetime.fromisoformat(last_used_at.replace("Z", "+00:00"))
|
|
|
|
return cls(
|
|
id=data["id"],
|
|
session_token=data["session_token"],
|
|
me=data["me"],
|
|
created_at=created_at,
|
|
expires_at=expires_at,
|
|
last_used_at=last_used_at,
|
|
)
|
|
|
|
@property
|
|
def is_expired(self) -> bool:
|
|
"""
|
|
Check if session has expired
|
|
|
|
Compares expires_at with current UTC time.
|
|
|
|
Returns:
|
|
True if expired, False otherwise
|
|
|
|
Examples:
|
|
>>> session.is_expired
|
|
False
|
|
"""
|
|
return datetime.utcnow() >= self.expires_at
|
|
|
|
@property
|
|
def is_active(self) -> bool:
|
|
"""
|
|
Check if session is active (not expired)
|
|
|
|
Returns:
|
|
True if not expired, False otherwise
|
|
|
|
Examples:
|
|
>>> session.is_active
|
|
True
|
|
"""
|
|
return not self.is_expired
|
|
|
|
@property
|
|
def age(self) -> timedelta:
|
|
"""
|
|
Get age of session
|
|
|
|
Returns:
|
|
Timedelta since session creation
|
|
|
|
Examples:
|
|
>>> session.age
|
|
datetime.timedelta(days=2, seconds=3600)
|
|
"""
|
|
return datetime.utcnow() - self.created_at
|
|
|
|
@property
|
|
def time_until_expiry(self) -> timedelta:
|
|
"""
|
|
Get time remaining until expiry
|
|
|
|
Returns:
|
|
Timedelta until expiry (negative if already expired)
|
|
|
|
Examples:
|
|
>>> session.time_until_expiry
|
|
datetime.timedelta(days=28)
|
|
"""
|
|
return self.expires_at - datetime.utcnow()
|
|
|
|
def is_valid(self) -> bool:
|
|
"""
|
|
Comprehensive session validation
|
|
|
|
Checks:
|
|
- Session is not expired
|
|
- Session token is not empty
|
|
- User 'me' URL is valid
|
|
|
|
Returns:
|
|
True if session is valid, False otherwise
|
|
|
|
Examples:
|
|
>>> session.is_valid()
|
|
True
|
|
"""
|
|
# Check if expired
|
|
if self.is_expired:
|
|
return False
|
|
|
|
# Check if token is not empty
|
|
if not self.session_token or not self.session_token.strip():
|
|
return False
|
|
|
|
# Check if 'me' URL is not empty (basic validation)
|
|
if not self.me or not self.me.strip():
|
|
return False
|
|
|
|
return True
|
|
|
|
def with_updated_last_used(self) -> "Session":
|
|
"""
|
|
Create new session with updated last_used_at
|
|
|
|
Since sessions are immutable, this returns a new Session instance
|
|
with last_used_at set to current UTC time.
|
|
|
|
Returns:
|
|
New Session instance with updated timestamp
|
|
|
|
Examples:
|
|
>>> updated = session.with_updated_last_used()
|
|
>>> updated.last_used_at
|
|
datetime.datetime(2024, 11, 18, 15, 30, 0)
|
|
"""
|
|
return replace(self, last_used_at=datetime.utcnow())
|
|
|
|
def to_dict(self) -> dict[str, Any]:
|
|
"""
|
|
Serialize session to dictionary
|
|
|
|
Returns:
|
|
Dictionary with session data (excludes sensitive token)
|
|
|
|
Examples:
|
|
>>> session.to_dict()
|
|
{
|
|
'id': 1,
|
|
'me': 'https://alice.example.com',
|
|
'created_at': '2024-11-18T14:30:00Z',
|
|
'expires_at': '2024-12-18T14:30:00Z',
|
|
'is_active': True
|
|
}
|
|
"""
|
|
return {
|
|
"id": self.id,
|
|
"me": self.me,
|
|
"created_at": self.created_at.strftime("%Y-%m-%dT%H:%M:%SZ"),
|
|
"expires_at": self.expires_at.strftime("%Y-%m-%dT%H:%M:%SZ"),
|
|
"is_active": self.is_active,
|
|
}
|
|
|
|
|
|
@dataclass(frozen=True)
|
|
class Token:
|
|
"""
|
|
Represents a Micropub access token
|
|
|
|
Tokens are used to authenticate Micropub API requests. They have
|
|
associated scopes that determine what actions the token can perform.
|
|
|
|
Attributes:
|
|
token: Token string (primary key, used in Authorization header)
|
|
me: User's URL
|
|
client_id: Client application URL (optional)
|
|
scope: Space-separated scope string (e.g., "create update delete")
|
|
created_at: Token creation timestamp (UTC)
|
|
expires_at: Token expiration timestamp (UTC, nullable for no expiry)
|
|
|
|
Properties:
|
|
is_expired: Check if token has expired
|
|
is_active: Check if token is not expired
|
|
scopes: List of individual scopes
|
|
|
|
Methods:
|
|
from_row: Create Token from database row
|
|
to_dict: Serialize to dictionary
|
|
has_scope: Check if token has specific scope
|
|
is_valid: Comprehensive validation
|
|
|
|
Examples:
|
|
>>> # Create from database row
|
|
>>> row = db.execute(
|
|
... "SELECT * FROM tokens WHERE token = ?", (token,)
|
|
... ).fetchone()
|
|
>>> token_obj = Token.from_row(row)
|
|
|
|
>>> # Check scope
|
|
>>> if token_obj.has_scope('create'):
|
|
... print("Can create posts")
|
|
|
|
>>> # Check if expired
|
|
>>> if token_obj.is_active:
|
|
... print("Token is active")
|
|
"""
|
|
|
|
# Core fields from database
|
|
token: str
|
|
me: str
|
|
client_id: Optional[str] = None
|
|
scope: Optional[str] = None
|
|
created_at: datetime = field(default_factory=lambda: datetime.utcnow())
|
|
expires_at: Optional[datetime] = None
|
|
|
|
@classmethod
|
|
def from_row(cls, row: sqlite3.Row | dict[str, Any]) -> "Token":
|
|
"""
|
|
Create Token instance from database row
|
|
|
|
Args:
|
|
row: Database row (sqlite3.Row or dict)
|
|
|
|
Returns:
|
|
Token instance
|
|
|
|
Examples:
|
|
>>> row = db.execute(
|
|
... "SELECT * FROM tokens WHERE token = ?", (token,)
|
|
... ).fetchone()
|
|
>>> token_obj = Token.from_row(row)
|
|
"""
|
|
# Handle both sqlite3.Row and dict
|
|
if hasattr(row, "keys"):
|
|
data = {key: row[key] for key in row.keys()}
|
|
else:
|
|
data = dict(row)
|
|
|
|
# Convert timestamps if they are strings
|
|
created_at = data["created_at"]
|
|
if isinstance(created_at, str):
|
|
created_at = datetime.fromisoformat(created_at.replace("Z", "+00:00"))
|
|
|
|
expires_at = data.get("expires_at")
|
|
if expires_at and isinstance(expires_at, str):
|
|
expires_at = datetime.fromisoformat(expires_at.replace("Z", "+00:00"))
|
|
|
|
return cls(
|
|
token=data["token"],
|
|
me=data["me"],
|
|
client_id=data.get("client_id"),
|
|
scope=data.get("scope"),
|
|
created_at=created_at,
|
|
expires_at=expires_at,
|
|
)
|
|
|
|
@property
|
|
def is_expired(self) -> bool:
|
|
"""
|
|
Check if token has expired
|
|
|
|
If expires_at is None, token never expires.
|
|
Otherwise, compares with current UTC time.
|
|
|
|
Returns:
|
|
True if expired, False otherwise
|
|
|
|
Examples:
|
|
>>> token_obj.is_expired
|
|
False
|
|
"""
|
|
if self.expires_at is None:
|
|
# No expiry set, token never expires
|
|
return False
|
|
|
|
return datetime.utcnow() >= self.expires_at
|
|
|
|
@property
|
|
def is_active(self) -> bool:
|
|
"""
|
|
Check if token is active (not expired)
|
|
|
|
Returns:
|
|
True if not expired, False otherwise
|
|
"""
|
|
return not self.is_expired
|
|
|
|
@property
|
|
def scopes(self) -> list[str]:
|
|
"""
|
|
Get list of individual scopes
|
|
|
|
Splits scope string on whitespace.
|
|
|
|
Returns:
|
|
List of scope strings
|
|
|
|
Examples:
|
|
>>> # scope = "create update delete"
|
|
>>> token_obj.scopes
|
|
['create', 'update', 'delete']
|
|
|
|
>>> # scope = None
|
|
>>> token_obj.scopes
|
|
[]
|
|
"""
|
|
if not self.scope:
|
|
return []
|
|
|
|
return self.scope.split()
|
|
|
|
def has_scope(self, required_scope: str) -> bool:
|
|
"""
|
|
Check if token has required scope
|
|
|
|
Args:
|
|
required_scope: Scope to check for (e.g., 'create', 'update')
|
|
|
|
Returns:
|
|
True if token has scope, False otherwise
|
|
|
|
Examples:
|
|
>>> token_obj.has_scope('create')
|
|
True
|
|
|
|
>>> token_obj.has_scope('delete')
|
|
False
|
|
"""
|
|
return required_scope in self.scopes
|
|
|
|
def is_valid(self, required_scope: Optional[str] = None) -> bool:
|
|
"""
|
|
Comprehensive token validation
|
|
|
|
Checks:
|
|
- Token is not expired
|
|
- Token string is not empty
|
|
- If required_scope provided, token has that scope
|
|
|
|
Args:
|
|
required_scope: Optional scope to check
|
|
|
|
Returns:
|
|
True if token is valid, False otherwise
|
|
|
|
Examples:
|
|
>>> token_obj.is_valid()
|
|
True
|
|
|
|
>>> token_obj.is_valid(required_scope='create')
|
|
True
|
|
"""
|
|
# Check if expired
|
|
if self.is_expired:
|
|
return False
|
|
|
|
# Check if token is not empty
|
|
if not self.token or not self.token.strip():
|
|
return False
|
|
|
|
# Check required scope if provided
|
|
if required_scope is not None:
|
|
if not self.has_scope(required_scope):
|
|
return False
|
|
|
|
return True
|
|
|
|
def to_dict(self) -> dict[str, Any]:
|
|
"""
|
|
Serialize token to dictionary
|
|
|
|
Returns:
|
|
Dictionary with token data (excludes sensitive token value)
|
|
|
|
Examples:
|
|
>>> token_obj.to_dict()
|
|
{
|
|
'me': 'https://alice.example.com',
|
|
'client_id': 'https://quill.p3k.io',
|
|
'scope': 'create update',
|
|
'created_at': '2024-11-18T14:30:00Z',
|
|
'is_active': True
|
|
}
|
|
"""
|
|
data = {
|
|
"me": self.me,
|
|
"client_id": self.client_id,
|
|
"scope": self.scope,
|
|
"created_at": self.created_at.strftime("%Y-%m-%dT%H:%M:%SZ"),
|
|
"is_active": self.is_active,
|
|
}
|
|
|
|
if self.expires_at:
|
|
data["expires_at"] = self.expires_at.strftime("%Y-%m-%dT%H:%M:%SZ")
|
|
|
|
return data
|
|
|
|
|
|
@dataclass(frozen=True)
|
|
class AuthState:
|
|
"""
|
|
Represents an OAuth state token (CSRF protection)
|
|
|
|
State tokens are short-lived (5 minutes) and single-use. They prevent
|
|
CSRF attacks during the IndieLogin authentication flow.
|
|
|
|
Attributes:
|
|
state: Random state token string (primary key)
|
|
created_at: Token creation timestamp (UTC)
|
|
expires_at: Token expiration timestamp (UTC)
|
|
|
|
Properties:
|
|
is_expired: Check if state token has expired
|
|
is_active: Check if state token is not expired
|
|
age: Age of state token
|
|
|
|
Methods:
|
|
from_row: Create AuthState from database row
|
|
to_dict: Serialize to dictionary
|
|
is_valid: Comprehensive validation
|
|
|
|
Examples:
|
|
>>> # Create from database row
|
|
>>> row = db.execute(
|
|
... "SELECT * FROM auth_state WHERE state = ?", (state,)
|
|
... ).fetchone()
|
|
>>> auth_state = AuthState.from_row(row)
|
|
|
|
>>> # Check if expired
|
|
>>> if auth_state.is_active:
|
|
... print("State token is still valid")
|
|
|
|
>>> # Check validity
|
|
>>> if auth_state.is_valid():
|
|
... print("Valid state token")
|
|
"""
|
|
|
|
# Core fields from database
|
|
state: str
|
|
created_at: datetime
|
|
expires_at: datetime
|
|
|
|
@classmethod
|
|
def from_row(cls, row: sqlite3.Row | dict[str, Any]) -> "AuthState":
|
|
"""
|
|
Create AuthState instance from database row
|
|
|
|
Args:
|
|
row: Database row (sqlite3.Row or dict)
|
|
|
|
Returns:
|
|
AuthState instance
|
|
|
|
Examples:
|
|
>>> row = db.execute(
|
|
... "SELECT * FROM auth_state WHERE state = ?", (state,)
|
|
... ).fetchone()
|
|
>>> auth_state = AuthState.from_row(row)
|
|
"""
|
|
# Handle both sqlite3.Row and dict
|
|
if hasattr(row, "keys"):
|
|
data = {key: row[key] for key in row.keys()}
|
|
else:
|
|
data = dict(row)
|
|
|
|
# Convert timestamps if they are strings
|
|
created_at = data["created_at"]
|
|
if isinstance(created_at, str):
|
|
created_at = datetime.fromisoformat(created_at.replace("Z", "+00:00"))
|
|
|
|
expires_at = data["expires_at"]
|
|
if isinstance(expires_at, str):
|
|
expires_at = datetime.fromisoformat(expires_at.replace("Z", "+00:00"))
|
|
|
|
return cls(
|
|
state=data["state"],
|
|
created_at=created_at,
|
|
expires_at=expires_at,
|
|
)
|
|
|
|
@property
|
|
def is_expired(self) -> bool:
|
|
"""
|
|
Check if state token has expired
|
|
|
|
State tokens have short expiry (5 minutes default).
|
|
|
|
Returns:
|
|
True if expired, False otherwise
|
|
|
|
Examples:
|
|
>>> auth_state.is_expired
|
|
False
|
|
"""
|
|
return datetime.utcnow() >= self.expires_at
|
|
|
|
@property
|
|
def is_active(self) -> bool:
|
|
"""
|
|
Check if state token is active (not expired)
|
|
|
|
Returns:
|
|
True if not expired, False otherwise
|
|
"""
|
|
return not self.is_expired
|
|
|
|
@property
|
|
def age(self) -> timedelta:
|
|
"""
|
|
Get age of state token
|
|
|
|
Returns:
|
|
Timedelta since token creation
|
|
|
|
Examples:
|
|
>>> auth_state.age
|
|
datetime.timedelta(seconds=120)
|
|
"""
|
|
return datetime.utcnow() - self.created_at
|
|
|
|
def is_valid(self) -> bool:
|
|
"""
|
|
Comprehensive state token validation
|
|
|
|
Checks:
|
|
- Token is not expired
|
|
- Token string is not empty
|
|
|
|
Returns:
|
|
True if valid, False otherwise
|
|
|
|
Examples:
|
|
>>> auth_state.is_valid()
|
|
True
|
|
"""
|
|
# Check if expired
|
|
if self.is_expired:
|
|
return False
|
|
|
|
# Check if state is not empty
|
|
if not self.state or not self.state.strip():
|
|
return False
|
|
|
|
return True
|
|
|
|
def to_dict(self) -> dict[str, Any]:
|
|
"""
|
|
Serialize state token to dictionary
|
|
|
|
Returns:
|
|
Dictionary with state data
|
|
|
|
Examples:
|
|
>>> auth_state.to_dict()
|
|
{
|
|
'created_at': '2024-11-18T14:30:00Z',
|
|
'expires_at': '2024-11-18T14:35:00Z',
|
|
'is_active': True
|
|
}
|
|
"""
|
|
return {
|
|
"created_at": self.created_at.strftime("%Y-%m-%dT%H:%M:%SZ"),
|
|
"expires_at": self.expires_at.strftime("%Y-%m-%dT%H:%M:%SZ"),
|
|
"is_active": self.is_active,
|
|
}
|