that initial commit
This commit is contained in:
56
starpunk/__init__.py
Normal file
56
starpunk/__init__.py
Normal file
@@ -0,0 +1,56 @@
|
||||
"""
|
||||
StarPunk package initialization
|
||||
Creates and configures the Flask application
|
||||
"""
|
||||
|
||||
from flask import Flask
|
||||
from pathlib import Path
|
||||
|
||||
|
||||
def create_app(config=None):
    """
    Build and configure the StarPunk Flask application

    Args:
        config: Optional dict of configuration values that override the defaults

    Returns:
        The configured Flask application instance
    """
    flask_options = {
        'static_folder': '../static',
        'template_folder': '../templates',
    }
    app = Flask(__name__, **flask_options)

    # Configuration comes first: init_db reads DATABASE_PATH from app.config.
    from starpunk.config import load_config
    load_config(app, config)

    # Create the database file and schema if they do not exist yet.
    from starpunk.database import init_db
    init_db(app)

    # Register blueprints
    # TODO: Implement blueprints in separate modules
    # from starpunk.routes import public, admin, api
    # app.register_blueprint(public.bp)
    # app.register_blueprint(admin.bp)
    # app.register_blueprint(api.bp)

    # Error handlers: each returns a (body, status) pair.
    def not_found(error):
        return {'error': 'Not found'}, 404

    def server_error(error):
        return {'error': 'Internal server error'}, 500

    app.register_error_handler(404, not_found)
    app.register_error_handler(500, server_error)

    return app
|
||||
|
||||
|
||||
# Package version (Semantic Versioning 2.0.0)
# See docs/standards/versioning-strategy.md for details
# The string and tuple forms below describe the same version; update both
# together when bumping.
__version__ = "0.3.0"
__version_info__ = (0, 3, 0)
|
||||
73
starpunk/config.py
Normal file
73
starpunk/config.py
Normal file
@@ -0,0 +1,73 @@
|
||||
"""
|
||||
Configuration management for StarPunk
|
||||
Loads settings from environment variables and .env file
|
||||
"""
|
||||
|
||||
import os
|
||||
from pathlib import Path
|
||||
from dotenv import load_dotenv
|
||||
|
||||
|
||||
def load_config(app, config_override=None):
    """
    Load configuration into Flask app

    Settings are read from environment variables (after loading the .env
    file), then ``config_override`` is applied on top, and only then is the
    result validated and the data directories created. Validating after the
    overrides means a SESSION_SECRET supplied programmatically (for example
    by a test) is accepted.

    Args:
        app: Flask application instance
        config_override: Optional dict to override config values

    Raises:
        ValueError: If SESSION_SECRET is empty after the environment and the
            overrides have been applied, or if SESSION_LIFETIME is not an
            integer
    """
    # Load .env file
    load_dotenv()

    config = app.config

    # Site configuration
    config['SITE_URL'] = os.getenv('SITE_URL', 'http://localhost:5000')
    config['SITE_NAME'] = os.getenv('SITE_NAME', 'StarPunk')
    config['SITE_AUTHOR'] = os.getenv('SITE_AUTHOR', 'Unknown')
    config['SITE_DESCRIPTION'] = os.getenv(
        'SITE_DESCRIPTION',
        'A minimal IndieWeb CMS'
    )

    # Authentication
    config['ADMIN_ME'] = os.getenv('ADMIN_ME')
    config['SESSION_SECRET'] = os.getenv('SESSION_SECRET')
    raw_lifetime = os.getenv('SESSION_LIFETIME', '30')
    try:
        config['SESSION_LIFETIME'] = int(raw_lifetime)
    except ValueError as exc:
        # Still a ValueError, but one that names the offending setting.
        raise ValueError(
            f"SESSION_LIFETIME must be an integer, got {raw_lifetime!r}"
        ) from exc
    config['INDIELOGIN_URL'] = os.getenv(
        'INDIELOGIN_URL',
        'https://indielogin.com'
    )

    # Flask secret key (uses SESSION_SECRET by default)
    config['SECRET_KEY'] = os.getenv(
        'FLASK_SECRET_KEY',
        config['SESSION_SECRET']
    )

    # Data paths
    config['DATA_PATH'] = Path(os.getenv('DATA_PATH', './data'))
    config['NOTES_PATH'] = Path(os.getenv('NOTES_PATH', './data/notes'))
    config['DATABASE_PATH'] = Path(
        os.getenv('DATABASE_PATH', './data/starpunk.db')
    )

    # Flask environment
    config['ENV'] = os.getenv('FLASK_ENV', 'development')
    config['DEBUG'] = os.getenv('FLASK_DEBUG', '1') == '1'

    # Logging
    config['LOG_LEVEL'] = os.getenv('LOG_LEVEL', 'INFO')

    # Apply overrides if provided
    if config_override:
        config.update(config_override)

    # Validate required configuration (after overrides, so they count)
    if not config['SESSION_SECRET']:
        raise ValueError(
            "SESSION_SECRET must be set in .env file. "
            "Generate with: python3 -c \"import secrets; print(secrets.token_hex(32))\""
        )

    # A SESSION_SECRET that arrived via the overrides has not been copied to
    # SECRET_KEY yet; fall back to it when no explicit key is configured.
    if not config.get('SECRET_KEY'):
        config['SECRET_KEY'] = config['SESSION_SECRET']

    # Overrides may supply plain strings; the rest of the code uses Path methods.
    for path_key in ('DATA_PATH', 'NOTES_PATH', 'DATABASE_PATH'):
        config[path_key] = Path(config[path_key])

    # Ensure data directories exist
    config['DATA_PATH'].mkdir(parents=True, exist_ok=True)
    config['NOTES_PATH'].mkdir(parents=True, exist_ok=True)
|
||||
104
starpunk/database.py
Normal file
104
starpunk/database.py
Normal file
@@ -0,0 +1,104 @@
|
||||
"""
|
||||
Database initialization and operations for StarPunk
|
||||
SQLite database for metadata, sessions, and tokens
|
||||
"""
|
||||
|
||||
import sqlite3
|
||||
from pathlib import Path
|
||||
|
||||
|
||||
# Database schema
# Every statement uses IF NOT EXISTS, so the script can be executed again on
# an already-initialized database without error.
SCHEMA_SQL = """
-- Notes metadata (content is in files)
CREATE TABLE IF NOT EXISTS notes (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    slug TEXT UNIQUE NOT NULL,
    file_path TEXT UNIQUE NOT NULL,
    published BOOLEAN DEFAULT 0,
    created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
    deleted_at TIMESTAMP,
    content_hash TEXT
);

CREATE INDEX IF NOT EXISTS idx_notes_created_at ON notes(created_at DESC);
CREATE INDEX IF NOT EXISTS idx_notes_published ON notes(published);
CREATE INDEX IF NOT EXISTS idx_notes_slug ON notes(slug);
CREATE INDEX IF NOT EXISTS idx_notes_deleted_at ON notes(deleted_at);

-- Authentication sessions (IndieLogin)
CREATE TABLE IF NOT EXISTS sessions (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    session_token TEXT UNIQUE NOT NULL,
    me TEXT NOT NULL,
    created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
    expires_at TIMESTAMP NOT NULL,
    last_used_at TIMESTAMP
);

CREATE INDEX IF NOT EXISTS idx_sessions_token ON sessions(session_token);
CREATE INDEX IF NOT EXISTS idx_sessions_expires ON sessions(expires_at);

-- Micropub access tokens
CREATE TABLE IF NOT EXISTS tokens (
    token TEXT PRIMARY KEY,
    me TEXT NOT NULL,
    client_id TEXT,
    scope TEXT,
    created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
    expires_at TIMESTAMP
);

CREATE INDEX IF NOT EXISTS idx_tokens_me ON tokens(me);

-- CSRF state tokens (for IndieAuth flow)
CREATE TABLE IF NOT EXISTS auth_state (
    state TEXT PRIMARY KEY,
    created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
    expires_at TIMESTAMP NOT NULL
);

CREATE INDEX IF NOT EXISTS idx_auth_state_expires ON auth_state(expires_at);
"""


def init_db(app=None):
    """
    Initialize database schema

    Creates the parent directory of the database file if needed, opens the
    database, and executes SCHEMA_SQL.

    Args:
        app: Flask application instance (optional, for config access).
            When omitted, ./data/starpunk.db is used.
    """
    if app:
        # DATABASE_PATH may be a plain string (e.g. from a config override);
        # coerce it so the .parent access below works either way.
        db_path = Path(app.config['DATABASE_PATH'])
    else:
        # Fallback to default path
        db_path = Path('./data/starpunk.db')

    # Ensure parent directory exists
    db_path.parent.mkdir(parents=True, exist_ok=True)

    # Create database and schema
    conn = sqlite3.connect(db_path)
    try:
        conn.executescript(SCHEMA_SQL)
        conn.commit()
        print(f"Database initialized: {db_path}")
    finally:
        # Always release the connection, even if the schema script fails.
        conn.close()
|
||||
|
||||
|
||||
def get_db(app):
    """
    Open a new SQLite connection to the application's database

    Args:
        app: Flask application instance; its config supplies DATABASE_PATH

    Returns:
        sqlite3.Connection whose rows are sqlite3.Row objects
    """
    connection = sqlite3.connect(app.config['DATABASE_PATH'])
    # sqlite3.Row lets callers read columns by name as well as by index.
    connection.row_factory = sqlite3.Row
    return connection
|
||||
1072
starpunk/models.py
Normal file
1072
starpunk/models.py
Normal file
File diff suppressed because it is too large
Load Diff
866
starpunk/notes.py
Normal file
866
starpunk/notes.py
Normal file
@@ -0,0 +1,866 @@
|
||||
"""
|
||||
Notes management for StarPunk
|
||||
|
||||
This module provides CRUD operations for notes with atomic file+database
|
||||
synchronization. All write operations use database transactions to ensure
|
||||
files and database records stay in sync.
|
||||
|
||||
Functions:
|
||||
create_note: Create new note with file and database entry
|
||||
get_note: Retrieve note by slug or ID
|
||||
list_notes: List notes with filtering and pagination
|
||||
update_note: Update note content and/or metadata
|
||||
delete_note: Delete note (soft or hard delete)
|
||||
|
||||
Exceptions:
|
||||
NoteNotFoundError: Note does not exist
|
||||
InvalidNoteDataError: Invalid content or parameters
|
||||
NoteSyncError: File/database synchronization failure
|
||||
NoteError: Base exception for all note operations
|
||||
"""
|
||||
|
||||
# Standard library imports
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
from typing import Optional
|
||||
|
||||
# Third-party imports
|
||||
from flask import current_app
|
||||
|
||||
# Local imports
|
||||
from starpunk.database import get_db
|
||||
from starpunk.models import Note
|
||||
from starpunk.utils import (
|
||||
generate_slug,
|
||||
make_slug_unique,
|
||||
generate_note_path,
|
||||
ensure_note_directory,
|
||||
write_note_file,
|
||||
delete_note_file,
|
||||
calculate_content_hash,
|
||||
validate_note_path,
|
||||
validate_slug
|
||||
)
|
||||
|
||||
|
||||
# Custom Exceptions
|
||||
|
||||
class NoteError(Exception):
    """Common base class for every exception raised by note operations"""
|
||||
|
||||
|
||||
class NoteNotFoundError(NoteError):
    """
    Signals that the requested note does not exist

    Attributes:
        identifier: The slug or ID that was used to look the note up

    The exception text is the ``message`` argument when given, otherwise
    "Note not found: <identifier>".
    """

    def __init__(self, identifier: str | int, message: Optional[str] = None):
        self.identifier = identifier
        text = f"Note not found: {identifier}" if message is None else message
        super().__init__(text)
|
||||
|
||||
|
||||
class InvalidNoteDataError(NoteError, ValueError):
    """
    Raised when note data is invalid

    This exception is raised when attempting to create or update a note
    with invalid data (empty content, invalid slug, etc.). It is also a
    ValueError, so callers catching ValueError will see it.

    Attributes:
        field: The field that failed validation
        value: The invalid value

    The exception text is the ``message`` argument when given, otherwise
    "Invalid <field>: <value>".
    """

    # ``value`` is annotated as ``object``: the original ``any`` named the
    # builtin function, not a type.
    def __init__(self, field: str, value: object, message: Optional[str] = None):
        self.field = field
        self.value = value
        if message is None:
            message = f"Invalid {field}: {value}"
        super().__init__(message)
|
||||
|
||||
|
||||
class NoteSyncError(NoteError):
    """
    Signals that a note's file and its database record could not be kept
    in step (for example, the file was written but the insert failed)

    Attributes:
        operation: The operation that failed ('create', 'update', 'delete')
        details: Additional details about the failure

    The exception text is the ``message`` argument when given, otherwise
    "Sync error during <operation>: <details>".
    """

    def __init__(self, operation: str, details: str, message: Optional[str] = None):
        self.operation = operation
        self.details = details
        text = message if message is not None else f"Sync error during {operation}: {details}"
        super().__init__(text)
|
||||
|
||||
|
||||
# Helper Functions
|
||||
|
||||
def _get_existing_slugs(db) -> set[str]:
    """
    Collect the slug of every row in the notes table

    Args:
        db: Database connection whose rows support access by column name

    Returns:
        Set of existing slug strings
    """
    cursor = db.execute("SELECT slug FROM notes")
    slugs = set()
    for record in cursor.fetchall():
        slugs.add(record['slug'])
    return slugs
|
||||
|
||||
|
||||
# Core CRUD Functions
|
||||
|
||||
def create_note(
    content: str,
    published: bool = False,
    created_at: Optional[datetime] = None
) -> Note:
    """
    Create a new note

    Creates a new note by generating a unique slug, writing the markdown
    content to a file, and inserting a database record. If the database
    insert fails, the file that was just written is removed again.

    Args:
        content: Markdown content for the note (must not be empty)
        published: Whether the note should be published (default: False)
        created_at: Creation timestamp (default: current UTC time)

    Returns:
        Note object built from the freshly inserted database row

    Raises:
        InvalidNoteDataError: If content is empty or whitespace-only, or
            the generated slug fails validation
        NoteSyncError: If the generated path is rejected, the file cannot
            be written (the OSError is attached as the cause), or the
            database insert fails

    Examples:
        >>> # Create unpublished draft
        >>> note = create_note("# My First Note\\n\\nContent here.", published=False)
        >>> print(note.slug)
        'my-first-note'

        >>> # Create with specific timestamp
        >>> from datetime import datetime
        >>> note = create_note(
        ...     "Backdated note",
        ...     created_at=datetime(2024, 1, 1, 12, 0, 0)
        ... )

    Transaction Safety:
        1. Validates content (before any changes)
        2. Generates unique slug (database query)
        3. Computes and validates the file path
        4. Writes file to disk
        5. Inserts database record and commits
        6. If database fails: deletes file, raises NoteSyncError
        7. Database connection is closed in every case

    Notes:
        - created_at and updated_at set to same value initially
        - Content hash calculated and stored for integrity checking
    """
    # 1. VALIDATION (before any changes)
    if not content or not content.strip():
        raise InvalidNoteDataError(
            'content',
            content,
            'Content cannot be empty or whitespace-only'
        )

    # 2. SETUP
    if created_at is None:
        # NOTE(review): utcnow() is deprecated since Python 3.12; kept so the
        # stored timestamps stay naive, as before.
        created_at = datetime.utcnow()

    updated_at = created_at  # Same as created_at for new notes

    data_dir = Path(current_app.config['DATA_PATH'])

    db = get_db(current_app)
    try:
        # 3. GENERATE UNIQUE SLUG
        existing_slugs = _get_existing_slugs(db)
        base_slug = generate_slug(content, created_at)
        slug = make_slug_unique(base_slug, existing_slugs)

        # Validate final slug (defensive check)
        if not validate_slug(slug):
            raise InvalidNoteDataError('slug', slug, f'Generated slug is invalid: {slug}')

        # 4. GENERATE FILE PATH
        note_path = generate_note_path(slug, created_at, data_dir)

        # Security: Validate path stays within data directory
        if not validate_note_path(note_path, data_dir):
            raise NoteSyncError(
                'create',
                f'Generated path outside data directory: {note_path}',
                'Path validation failed'
            )

        # Computed before the file is written so a failure here cannot leave
        # an orphaned file behind.
        file_path_rel = str(note_path.relative_to(data_dir))

        # 5. CALCULATE CONTENT HASH
        content_hash = calculate_content_hash(content)

        # 6. WRITE FILE (before database to fail fast on disk issues)
        try:
            ensure_note_directory(note_path)
            write_note_file(note_path, content)
        except OSError as e:
            # File write failed, nothing to clean up
            raise NoteSyncError(
                'create',
                f'Failed to write file: {e}',
                f'Could not write note file: {note_path}'
            ) from e

        # 7. INSERT DATABASE RECORD
        try:
            cursor = db.execute(
                """
                INSERT INTO notes (slug, file_path, published, created_at, updated_at, content_hash)
                VALUES (?, ?, ?, ?, ?, ?)
                """,
                (slug, file_path_rel, published, created_at, updated_at, content_hash)
            )
            db.commit()
        except Exception as e:
            # Database insert failed, delete the file we created
            try:
                note_path.unlink()
            except OSError:
                # Log warning but don't fail - file cleanup is best effort
                current_app.logger.warning(f'Failed to clean up file after DB error: {note_path}')

            raise NoteSyncError(
                'create',
                f'Database insert failed: {e}',
                f'Failed to create note: {slug}'
            ) from e

        # 8. RETRIEVE AND RETURN NOTE OBJECT
        # lastrowid is the auto-generated ID of the row inserted above.
        row = db.execute(
            "SELECT * FROM notes WHERE id = ?",
            (cursor.lastrowid,)
        ).fetchone()

        return Note.from_row(row, data_dir)
    finally:
        # get_db opens a fresh connection per call; release it here.
        db.close()
|
||||
|
||||
|
||||
def get_note(
    slug: Optional[str] = None,
    id: Optional[int] = None,
    load_content: bool = True
) -> Optional[Note]:
    """
    Get a note by slug or ID

    Retrieves note metadata from database and optionally loads content
    from file. Exactly one of slug or id must be provided.

    Args:
        slug: Note slug (unique identifier in URLs)
        id: Note database ID (primary key)
        load_content: Whether to load file content (default: True)

    Returns:
        Note object with metadata and optionally content, or None if not found

    Raises:
        ValueError: If both slug and id provided, or neither provided

    Examples:
        >>> # Get by slug
        >>> note = get_note(slug="my-first-note")
        >>> if note:
        ...     print(note.content)  # Content loaded
        ... else:
        ...     print("Note not found")

        >>> # Get by ID
        >>> note = get_note(id=42)

        >>> # Get metadata only (no file I/O)
        >>> note = get_note(slug="my-note", load_content=False)

    Notes:
        - Returns None if note not found (does not raise exception)
        - Soft-deleted notes (deleted_at != NULL) are excluded
        - A file that cannot be read is logged as a warning, not raised;
          the Note is still returned
        - Content hash verification is passive (logs warning if mismatch)
    """
    # 1. VALIDATE PARAMETERS
    if slug is None and id is None:
        raise ValueError("Must provide either slug or id")

    if slug is not None and id is not None:
        raise ValueError("Cannot provide both slug and id")

    # 2. QUERY DATABASE
    db = get_db(current_app)
    try:
        if slug is not None:
            row = db.execute(
                "SELECT * FROM notes WHERE slug = ? AND deleted_at IS NULL",
                (slug,)
            ).fetchone()
        else:
            row = db.execute(
                "SELECT * FROM notes WHERE id = ? AND deleted_at IS NULL",
                (id,)
            ).fetchone()
    finally:
        # get_db opens a fresh connection per call; the fetched row stays
        # usable after the connection is closed.
        db.close()

    # 3. CHECK IF FOUND
    if row is None:
        return None

    # 4. CREATE NOTE OBJECT
    data_dir = Path(current_app.config['DATA_PATH'])
    note = Note.from_row(row, data_dir)

    # 5. OPTIONALLY LOAD CONTENT
    if load_content:
        # Access content property to trigger load
        try:
            _ = note.content
        except (FileNotFoundError, OSError) as e:
            current_app.logger.warning(
                f'Failed to load content for note {note.slug}: {e}'
            )

    # 6. OPTIONALLY VERIFY INTEGRITY
    # This is a passive check - log warning but don't fail
    if load_content and note.content_hash:
        try:
            if not note.verify_integrity():
                current_app.logger.warning(
                    f'Content hash mismatch for note {note.slug}. '
                    f'File may have been modified externally.'
                )
        except Exception as e:
            current_app.logger.warning(
                f'Failed to verify integrity for note {note.slug}: {e}'
            )

    # 7. RETURN NOTE
    return note
|
||||
|
||||
|
||||
def list_notes(
    published_only: bool = False,
    limit: int = 50,
    offset: int = 0,
    order_by: str = 'created_at',
    order_dir: str = 'DESC'
) -> list[Note]:
    """
    List notes with filtering and pagination

    Retrieves notes from database with optional filtering by published
    status, sorting, and pagination. This function does not read note
    files; use note.content when the content is needed.

    Args:
        published_only: If True, only return published notes (default: False)
        limit: Maximum number of notes to return (default: 50, max: 1000)
        offset: Number of notes to skip for pagination (default: 0)
        order_by: Field to sort by (default: 'created_at')
        order_dir: Sort direction, 'ASC' or 'DESC', case-insensitive
            (default: 'DESC')

    Returns:
        List of Note objects built from the matching rows

    Raises:
        ValueError: If order_by is not an allowed column name (SQL injection prevention)
        ValueError: If order_dir is not 'ASC' or 'DESC'
        ValueError: If limit is below 1 or above 1000, or offset is negative

    Examples:
        >>> # List recent published notes
        >>> notes = list_notes(published_only=True, limit=10)

        >>> # List all notes, oldest first
        >>> notes = list_notes(order_dir='ASC')

        >>> # Pagination (page 2, 20 per page)
        >>> notes = list_notes(limit=20, offset=20)

    Notes:
        - Excludes soft-deleted notes (deleted_at IS NULL)
        - order_by and order_dir are checked against fixed allow-lists
          because they are interpolated into the SQL text
        - Default sort is newest first (created_at DESC)
    """
    # 1. VALIDATE PARAMETERS
    # Prevent SQL injection - validate order_by column
    ALLOWED_ORDER_FIELDS = ('id', 'slug', 'created_at', 'updated_at', 'published')
    if order_by not in ALLOWED_ORDER_FIELDS:
        raise ValueError(
            f"Invalid order_by field: {order_by}. "
            f"Allowed: {', '.join(ALLOWED_ORDER_FIELDS)}"
        )

    # Validate order direction
    order_dir = order_dir.upper()
    if order_dir not in ('ASC', 'DESC'):
        raise ValueError(f"Invalid order_dir: {order_dir}. Must be 'ASC' or 'DESC'")

    # Validate limit (prevent excessive queries)
    MAX_LIMIT = 1000
    if limit > MAX_LIMIT:
        raise ValueError(f"Limit {limit} exceeds maximum {MAX_LIMIT}")

    if limit < 1:
        raise ValueError("Limit must be >= 1")

    if offset < 0:
        raise ValueError("Offset must be >= 0")

    # 2. BUILD QUERY
    query = "SELECT * FROM notes WHERE deleted_at IS NULL"

    if published_only:
        query += " AND published = 1"

    # Safe to interpolate: both values were validated against allow-lists above
    query += f" ORDER BY {order_by} {order_dir}"

    # Pagination values are bound as parameters
    query += " LIMIT ? OFFSET ?"
    params = [limit, offset]

    # 3. EXECUTE QUERY
    db = get_db(current_app)
    try:
        rows = db.execute(query, params).fetchall()
    finally:
        # get_db opens a fresh connection per call; release it here.
        db.close()

    # 4. CREATE NOTE OBJECTS
    data_dir = Path(current_app.config['DATA_PATH'])
    return [Note.from_row(row, data_dir) for row in rows]
|
||||
|
||||
|
||||
def update_note(
    slug: Optional[str] = None,
    id: Optional[int] = None,
    content: Optional[str] = None,
    published: Optional[bool] = None
) -> Note:
    """
    Update a note's content and/or published status

    Updates note content and/or metadata, keeping file and database in
    step. At least one of content or published must be provided.

    Args:
        slug: Note slug to update (mutually exclusive with id)
        id: Note ID to update (mutually exclusive with slug)
        content: New markdown content (None = no change)
        published: New published status (None = no change)

    Returns:
        Updated Note object, re-read via get_note with content loaded

    Raises:
        ValueError: If both slug and id provided, or neither provided
        ValueError: If neither content nor published provided (no changes)
        NoteNotFoundError: If note doesn't exist
        InvalidNoteDataError: If content is empty/whitespace (when provided)
        NoteSyncError: If the stored path is rejected, the file cannot be
            written (the OSError is attached as the cause), or the database
            update fails

    Examples:
        >>> # Update content only
        >>> note = update_note(
        ...     slug="my-note",
        ...     content="# Updated content\\n\\nNew text here."
        ... )

        >>> # Publish a draft
        >>> note = update_note(slug="draft-note", published=True)

        >>> # Update both content and status
        >>> note = update_note(id=42, content="New content", published=True)

    Transaction Safety:
        1. Validates parameters
        2. Retrieves existing note from database
        3. If content changed: snapshots the current file bytes, then
           writes the new file
        4. Updates database record and commits
        5. If database fails: restores the snapshot (best effort), logs
           the error, raises NoteSyncError
        6. Database connection is closed in every case

    Notes:
        - Slug cannot be changed (use delete + create for that)
        - updated_at is automatically set to current time
        - Content hash recalculated if content changes
    """
    # 1. VALIDATE PARAMETERS
    if slug is None and id is None:
        raise ValueError("Must provide either slug or id")

    if slug is not None and id is not None:
        raise ValueError("Cannot provide both slug and id")

    if content is None and published is None:
        raise ValueError("Must provide at least one of content or published to update")

    # Validate content if provided
    if content is not None:
        if not content or not content.strip():
            raise InvalidNoteDataError(
                'content',
                content,
                'Content cannot be empty or whitespace-only'
            )

    # 2. GET EXISTING NOTE
    existing_note = get_note(slug=slug, id=id, load_content=False)

    if existing_note is None:
        identifier = slug if slug is not None else id
        raise NoteNotFoundError(identifier)

    # 3. SETUP
    # NOTE(review): utcnow() is deprecated since Python 3.12; kept so the
    # stored timestamps stay naive, as before.
    updated_at = datetime.utcnow()
    data_dir = Path(current_app.config['DATA_PATH'])
    note_path = data_dir / existing_note.file_path

    # Validate path (security check)
    if not validate_note_path(note_path, data_dir):
        raise NoteSyncError(
            'update',
            f'Note file path outside data directory: {note_path}',
            'Path validation failed'
        )

    # Open the connection before touching the file, so a connection failure
    # cannot leave a modified file behind.
    db = get_db(current_app)
    try:
        # 4. UPDATE FILE (if content changed)
        new_content_hash = existing_note.content_hash
        previous_bytes = None
        if content is not None:
            # Snapshot the current file so it can be put back if the
            # database update fails. A missing/unreadable file simply means
            # there is nothing to restore.
            try:
                previous_bytes = note_path.read_bytes()
            except OSError:
                previous_bytes = None

            try:
                write_note_file(note_path, content)
                new_content_hash = calculate_content_hash(content)
            except OSError as e:
                raise NoteSyncError(
                    'update',
                    f'Failed to write file: {e}',
                    f'Could not update note file: {note_path}'
                ) from e

        # 5. UPDATE DATABASE
        # Build update query based on what changed
        update_fields = ['updated_at = ?']
        params = [updated_at]

        if content is not None:
            update_fields.append('content_hash = ?')
            params.append(new_content_hash)

        if published is not None:
            update_fields.append('published = ?')
            params.append(published)

        # Add WHERE clause parameter
        if slug is not None:
            where_clause = "slug = ?"
            params.append(slug)
        else:
            where_clause = "id = ?"
            params.append(id)

        # Only fixed column fragments are interpolated; values are bound.
        query = f"UPDATE notes SET {', '.join(update_fields)} WHERE {where_clause}"

        try:
            db.execute(query, params)
            db.commit()
        except Exception as e:
            # Database update failed: put the old file content back so file
            # and database do not drift apart. Best effort only.
            if previous_bytes is not None:
                try:
                    note_path.write_bytes(previous_bytes)
                except OSError:
                    current_app.logger.warning(
                        f'Failed to restore file after DB error: {note_path}'
                    )

            current_app.logger.error(
                f'Database update failed for note {existing_note.slug}: {e}'
            )
            raise NoteSyncError(
                'update',
                f'Database update failed: {e}',
                f'Failed to update note: {existing_note.slug}'
            ) from e
    finally:
        # get_db opens a fresh connection per call; release it here.
        db.close()

    # 6. RETURN UPDATED NOTE
    return get_note(slug=existing_note.slug, load_content=True)
|
||||
|
||||
|
||||
def delete_note(
    slug: Optional[str] = None,
    id: Optional[int] = None,
    soft: bool = True
) -> None:
    """
    Delete a note (soft or hard delete)

    Deletes a note either by marking it as deleted (soft delete) or by
    permanently removing the database record and the file (hard delete).

    Args:
        slug: Note slug to delete (mutually exclusive with id)
        id: Note ID to delete (mutually exclusive with slug)
        soft: If True, soft delete (mark deleted_at); if False, hard delete (default: True)

    Returns:
        None

    Raises:
        ValueError: If both slug and id provided, or neither provided
        NoteSyncError: If the note's file path fails validation against the
            data directory, or if the database update/delete fails. The
            underlying database exception is attached as ``__cause__``.

    Examples:
        >>> # Soft delete (default)
        >>> delete_note(slug="old-note")
        >>> # Note marked as deleted

        >>> # Hard delete
        >>> delete_note(slug="spam-note", soft=False)
        >>> # Database record removed, file removal attempted

        >>> # Delete by ID
        >>> delete_note(id=42, soft=False)

    Soft Delete:
        - Sets deleted_at timestamp in database
        - Then asks delete_note_file(..., soft=True) to move the file away
        - A failure to move the file is logged as a warning, not raised

    Hard Delete:
        - Looks the note up with a direct query so soft-deleted rows are found too
        - Removes database record permanently
        - Then asks delete_note_file(..., soft=False) to remove the file
        - A missing file is logged at info level; any other file error is
          logged as a warning. Neither is raised.

    Transaction Safety:
        In both modes the database is changed and committed first; the file
        operation afterwards is best effort, so a file error never undoes or
        fails a deletion that the database has already recorded.

    Notes:
        - Soft delete is default and recommended
        - Hard delete is permanent and cannot be undone
        - If no matching note is found the function returns silently (idempotent)
    """
    # 1. VALIDATE PARAMETERS
    if slug is None and id is None:
        raise ValueError("Must provide either slug or id")

    if slug is not None and id is not None:
        raise ValueError("Cannot provide both slug and id")

    # 2. GET EXISTING NOTE
    if soft:
        # Soft delete goes through get_note()
        existing_note = get_note(slug=slug, id=id, load_content=False)
    else:
        # Hard delete: query the table directly so soft-deleted notes are included
        db = get_db(current_app)
        if slug is not None:
            row = db.execute(
                "SELECT * FROM notes WHERE slug = ?",
                (slug,)
            ).fetchone()
        else:
            row = db.execute(
                "SELECT * FROM notes WHERE id = ?",
                (id,)
            ).fetchone()

        if row is None:
            existing_note = None
        else:
            existing_note = Note.from_row(
                row, Path(current_app.config['DATA_PATH'])
            )

    # 3. CHECK IF NOTE EXISTS
    if existing_note is None:
        # Nothing to delete (possibly already deleted): return for idempotency
        return

    # 4. SETUP
    data_dir = Path(current_app.config['DATA_PATH'])
    note_path = data_dir / existing_note.file_path

    # Security check: refuse to touch anything outside the data directory
    if not validate_note_path(note_path, data_dir):
        raise NoteSyncError(
            'delete',
            f'Note file path outside data directory: {note_path}',
            'Path validation failed'
        )

    # 5. PERFORM DELETION
    db = get_db(current_app)

    if soft:
        # SOFT DELETE: mark as deleted in database.
        # utcnow() is kept so the stored value stays a naive UTC timestamp.
        deleted_at = datetime.utcnow()

        try:
            db.execute(
                "UPDATE notes SET deleted_at = ? WHERE id = ?",
                (deleted_at, existing_note.id)
            )
            db.commit()
        except Exception as e:
            # Chain the original error so the root cause stays in the traceback
            raise NoteSyncError(
                'delete',
                f'Database update failed: {e}',
                f'Failed to soft delete note: {existing_note.slug}'
            ) from e

        # Best effort: move the file to trash; the database already says "deleted"
        try:
            delete_note_file(note_path, soft=True, data_dir=data_dir)
        except Exception as e:
            current_app.logger.warning(
                f'Failed to move file to trash for note {existing_note.slug}: {e}'
            )
    else:
        # HARD DELETE: remove from database, then from filesystem
        try:
            db.execute(
                "DELETE FROM notes WHERE id = ?",
                (existing_note.id,)
            )
            db.commit()
        except Exception as e:
            raise NoteSyncError(
                'delete',
                f'Database delete failed: {e}',
                f'Failed to delete note: {existing_note.slug}'
            ) from e

        # Best effort: the record is already gone, so file errors are only logged
        try:
            delete_note_file(note_path, soft=False)
        except FileNotFoundError:
            current_app.logger.info(
                f'File already deleted for note {existing_note.slug}'
            )
        except Exception as e:
            current_app.logger.warning(
                f'Failed to delete file for note {existing_note.slug}: {e}'
            )

    # 6. RETURN (no value)
    return None
|
||||
644
starpunk/utils.py
Normal file
644
starpunk/utils.py
Normal file
@@ -0,0 +1,644 @@
|
||||
"""
|
||||
Core utility functions for StarPunk
|
||||
|
||||
This module provides essential utilities for slug generation, file operations,
|
||||
hashing, and date/time handling. These utilities are used throughout the
|
||||
application and have no external dependencies beyond standard library.
|
||||
"""
|
||||
|
||||
# Standard library imports
import hashlib
import re
import secrets
import shutil
from datetime import datetime, timezone
from email.utils import format_datetime
from pathlib import Path
from typing import Optional
|
||||
|
||||
# Constants - Slug configuration
# MAX_SLUG_LENGTH / MIN_SLUG_LENGTH bound the slug length in generate_slug()
# and validate_slug(); SLUG_WORDS_COUNT is how many leading words
# generate_slug() takes from the content; RANDOM_SUFFIX_LENGTH is the suffix
# length make_slug_unique() asks generate_random_suffix() for.
MAX_SLUG_LENGTH = 100
MIN_SLUG_LENGTH = 1
SLUG_WORDS_COUNT = 5
RANDOM_SUFFIX_LENGTH = 4

# Reserved slugs (system routes)
# validate_slug() returns False for any slug in this set.
RESERVED_SLUGS = {"admin", "api", "static", "auth", "feed", "login", "logout"}

# File operations
# TEMP_FILE_SUFFIX is appended to the target file name by write_note_file();
# TRASH_DIR_NAME is the directory under data_dir that delete_note_file()
# moves soft-deleted files into.
TEMP_FILE_SUFFIX = ".tmp"
TRASH_DIR_NAME = ".trash"

# Hashing
# NOTE(review): calculate_content_hash() in this module calls hashlib.sha256
# directly and does not read this constant - confirm whether it is used elsewhere.
CONTENT_HASH_ALGORITHM = "sha256"

# Regex patterns
# SLUG_PATTERN: lowercase alphanumeric groups joined by single hyphens.
# SAFE_SLUG_PATTERN: any character that is not a-z, 0-9 or hyphen.
# MULTIPLE_HYPHENS_PATTERN: one or more consecutive hyphens.
SLUG_PATTERN = re.compile(r"^[a-z0-9]+(?:-[a-z0-9]+)*$")
SAFE_SLUG_PATTERN = re.compile(r"[^a-z0-9-]")
MULTIPLE_HYPHENS_PATTERN = re.compile(r"-+")

# Character set for random suffix generation
RANDOM_CHARS = "abcdefghijklmnopqrstuvwxyz0123456789"
|
||||
|
||||
|
||||
# Helper Functions
|
||||
|
||||
|
||||
def extract_first_words(text: str, max_words: int = 5) -> str:
    """
    Return the leading words of a piece of text

    Helper for slug generation. The text is broken apart on runs of
    whitespace and the first ``max_words`` pieces are kept.

    Args:
        text: Source text to take words from
        max_words: Upper bound on the number of words kept (default: 5)

    Returns:
        The kept words joined by single spaces

    Examples:
        >>> extract_first_words("Hello world this is a test", 3)
        'Hello world this'

        >>> extract_first_words("  Multiple   spaces  ", 2)
        'Multiple spaces'
    """
    # split() without a separator already ignores leading/trailing whitespace
    # and treats consecutive whitespace as one delimiter.
    tokens = text.split()
    leading = tokens[:max_words]
    return " ".join(leading)
|
||||
|
||||
|
||||
def normalize_slug_text(text: str) -> str:
    """
    Normalize text for use in slug

    Converts to lowercase, turns every run of whitespace into a hyphen,
    removes characters other than a-z, 0-9 and hyphen, collapses repeated
    hyphens, and strips hyphens from both ends.

    Args:
        text: Text to normalize

    Returns:
        Normalized slug-safe text (may be empty if nothing slug-safe remains)

    Examples:
        >>> normalize_slug_text("Hello World!")
        'hello-world'

        >>> normalize_slug_text("Testing... with -- special chars!")
        'testing-with-special-chars'

        >>> normalize_slug_text("tab\\tseparated\\nwords")
        'tab-separated-words'
    """
    # Lowercase, then join the whitespace-separated pieces with hyphens.
    # Using split() instead of replacing only " " keeps words separated by
    # tabs or newlines apart; otherwise the whitespace would be stripped by
    # SAFE_SLUG_PATTERN below and the neighbouring words would run together.
    text = "-".join(text.lower().split())

    # Remove all non-alphanumeric characters except hyphens
    text = SAFE_SLUG_PATTERN.sub("", text)

    # Collapse multiple hyphens to single hyphen
    text = MULTIPLE_HYPHENS_PATTERN.sub("-", text)

    # Strip leading/trailing hyphens
    return text.strip("-")
|
||||
|
||||
|
||||
def generate_random_suffix(length: int = 4) -> str:
    """
    Build a random lowercase alphanumeric string

    Each character is drawn independently from RANDOM_CHARS with
    secrets.choice. Used to make slugs unique.

    Args:
        length: Number of characters to produce (default: 4)

    Returns:
        String of ``length`` characters taken from RANDOM_CHARS

    Examples:
        >>> suffix = generate_random_suffix()
        >>> len(suffix)
        4
        >>> suffix.isalnum()
        True
    """
    drawn = [secrets.choice(RANDOM_CHARS) for _ in range(length)]
    return "".join(drawn)
|
||||
|
||||
|
||||
# Slug Functions
|
||||
|
||||
|
||||
def generate_slug(content: str, created_at: Optional[datetime] = None) -> str:
    """
    Generate URL-safe slug from note content

    Creates a slug by extracting the first SLUG_WORDS_COUNT words from the
    content and normalizing them to lowercase with hyphens. If nothing
    slug-safe remains after normalization (result shorter than
    MIN_SLUG_LENGTH), falls back to a timestamp-based slug.

    Args:
        content: The note content (markdown text)
        created_at: Optional timestamp for fallback slug (defaults to the
            current UTC time)

    Returns:
        URL-safe slug string (lowercase, alphanumeric + hyphens only), at
        most MAX_SLUG_LENGTH characters and never ending in a hyphen

    Raises:
        ValueError: If content is empty or contains only whitespace

    Examples:
        >>> generate_slug("Hello World! This is my first note.")
        'hello-world-this-is-my'

        >>> generate_slug("Testing... with special chars!@#")
        'testing-with-special-chars'

        >>> generate_slug("!!!", datetime(2024, 11, 18, 14, 30, 22))  # Nothing slug-safe
        '20241118-143022'

    Notes:
        - This function does NOT check for uniqueness
        - Caller must verify slug doesn't exist in database
        - Use make_slug_unique() to add random suffix if needed
        - The result is not checked against RESERVED_SLUGS, so validate_slug()
          can still reject it (e.g. content that normalizes to "admin")
    """
    # Validate input
    if not content or not content.strip():
        raise ValueError("Content cannot be empty or whitespace-only")

    # Extract first N words from content and normalize to slug format
    first_words = extract_first_words(content, SLUG_WORDS_COUNT)
    slug = normalize_slug_text(first_words)

    # If slug is empty or too short, use timestamp fallback
    if len(slug) < MIN_SLUG_LENGTH:
        if created_at is None:
            # Timezone-aware replacement for the deprecated datetime.utcnow();
            # only the formatted UTC wall-clock time is used.
            created_at = datetime.now(timezone.utc)
        slug = created_at.strftime("%Y%m%d-%H%M%S")

    # Truncate to maximum length. The cut can land directly after a hyphen,
    # which validate_slug() would reject, so drop a trailing hyphen.
    return slug[:MAX_SLUG_LENGTH].rstrip("-")
|
||||
|
||||
|
||||
def make_slug_unique(base_slug: str, existing_slugs: set[str]) -> str:
    """
    Make a slug unique by adding random suffix if needed

    If the base_slug already exists in the provided set, appends a hyphen
    and a random alphanumeric suffix, retrying until a slug not in the set
    is found.

    Args:
        base_slug: The base slug to make unique
        existing_slugs: Set of existing slugs to check against

    Returns:
        Unique slug (base_slug or {base}-{random}). When a suffix is added
        and base_slug is too long for the result to fit in MAX_SLUG_LENGTH,
        the base is shortened first.

    Raises:
        RuntimeError: If no unused slug is found within 100 attempts

    Examples:
        >>> make_slug_unique("test-note", set())
        'test-note'

        >>> make_slug_unique("test-note", {"test-note"})
        'test-note-a7c9'  # Random suffix

        >>> make_slug_unique("test-note", {"test-note", "test-note-a7c9"})
        'test-note-x3k2'  # Different random suffix

    Notes:
        - Random suffix is RANDOM_SUFFIX_LENGTH lowercase alphanumeric characters
        - With 4 characters there are 36^4 = 1,679,616 possible suffixes
        - Will retry up to 100 times if collision occurs
    """
    # If base slug doesn't exist, return it unchanged
    if base_slug not in existing_slugs:
        return base_slug

    # Leave room for "-" plus the suffix so the result stays within
    # MAX_SLUG_LENGTH; otherwise a near-maximum base slug would produce a
    # slug that validate_slug() rejects. Only touch the base when it is
    # actually too long, and avoid ending the shortened base on a hyphen
    # (which would create a double hyphen).
    room = MAX_SLUG_LENGTH - RANDOM_SUFFIX_LENGTH - 1
    if len(base_slug) <= room:
        stem = base_slug
    else:
        stem = base_slug[:room].rstrip("-")

    # Generate unique slug with random suffix
    max_attempts = 100
    for _ in range(max_attempts):
        suffix = generate_random_suffix(RANDOM_SUFFIX_LENGTH)
        unique_slug = f"{stem}-{suffix}"

        if unique_slug not in existing_slugs:
            return unique_slug

    # Only reachable if every attempt collided with an existing slug
    raise RuntimeError(
        f"Failed to generate unique slug after {max_attempts} attempts. "
        "This is extremely unlikely and may indicate a problem."
    )
|
||||
|
||||
|
||||
def validate_slug(slug: str) -> bool:
    """
    Validate that a slug meets all requirements

    Checks that slug contains only allowed characters and is within
    length limits. Also checks against reserved slugs.

    Args:
        slug: The slug to validate

    Returns:
        True if slug is valid, False otherwise

    Rules:
        - Must contain only: a-z, 0-9, hyphen (-)
        - Must be between MIN_SLUG_LENGTH and MAX_SLUG_LENGTH characters
        - Cannot start or end with hyphen
        - Cannot contain consecutive hyphens
        - Cannot be a reserved slug

    Examples:
        >>> validate_slug("hello-world")
        True

        >>> validate_slug("Hello-World")  # Uppercase
        False

        >>> validate_slug("-hello")  # Leading hyphen
        False

        >>> validate_slug("hello--world")  # Double hyphen
        False

        >>> validate_slug("admin")  # Reserved slug
        False

        >>> validate_slug("hello\\n")  # Trailing newline
        False
    """
    # Check basic constraints
    if not slug:
        return False

    if len(slug) < MIN_SLUG_LENGTH or len(slug) > MAX_SLUG_LENGTH:
        return False

    # Check against reserved slugs
    if slug in RESERVED_SLUGS:
        return False

    # Check pattern (lowercase alphanumeric with single hyphens).
    # fullmatch() is required here: with match(), the "$" in SLUG_PATTERN
    # also matches just before a trailing newline, so "hello\n" would be
    # accepted and could end up in a file name.
    return SLUG_PATTERN.fullmatch(slug) is not None
|
||||
|
||||
|
||||
# Content Hashing
|
||||
|
||||
|
||||
def calculate_content_hash(content: str) -> str:
    """
    Return the SHA-256 digest of a text

    The text is encoded as UTF-8 and hashed with hashlib.sha256. The same
    content always yields the same digest.

    Args:
        content: The content to hash (markdown text)

    Returns:
        Hexadecimal digest string (64 characters)

    Examples:
        >>> calculate_content_hash("Hello World")
        'a591a6d40bf420404a011733cfb7b190d62c65bf0bcda32b57b277d9ad9f146e'

        >>> calculate_content_hash("")
        'e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855'
    """
    return hashlib.sha256(content.encode("utf-8")).hexdigest()
|
||||
|
||||
|
||||
# File Path Operations
|
||||
|
||||
|
||||
def generate_note_path(slug: str, created_at: datetime, data_dir: Path) -> Path:
    """
    Build the file path for a note

    The path has the form: data_dir/notes/YYYY/MM/slug.md, where YYYY and
    MM come from ``created_at``.

    Args:
        slug: URL-safe slug for the note
        created_at: Creation timestamp (determines YYYY/MM)
        data_dir: Base data directory path

    Returns:
        Path object for the note file

    Raises:
        ValueError: If validate_slug() rejects the slug

    Examples:
        >>> from datetime import datetime
        >>> from pathlib import Path
        >>> dt = datetime(2024, 11, 18, 14, 30)
        >>> generate_note_path("test-note", dt, Path("data"))
        PosixPath('data/notes/2024/11/test-note.md')

    Notes:
        - Only computes the path; nothing is created or checked on disk
        - The slug is validated before the path is built
    """
    # Reject bad slugs up front so they never become part of a file name
    if not validate_slug(slug):
        raise ValueError(f"Invalid slug: {slug}")

    # Year and month directory names taken from the creation timestamp
    year_dir, month_dir = (created_at.strftime(code) for code in ("%Y", "%m"))

    return data_dir.joinpath("notes", year_dir, month_dir, f"{slug}.md")
|
||||
|
||||
|
||||
def ensure_note_directory(note_path: Path) -> Path:
    """
    Make sure the directory that will hold a note file exists

    Creates the parent directory of ``note_path`` together with any missing
    ancestors. Calling it when the directory already exists is fine.

    Args:
        note_path: Full path to note file

    Returns:
        The parent directory of ``note_path``

    Raises:
        OSError: If directory cannot be created (permissions, etc.)

    Examples:
        >>> note_path = Path("data/notes/2024/11/test-note.md")
        >>> ensure_note_directory(note_path)
        PosixPath('data/notes/2024/11')
    """
    target_dir = note_path.parent
    # parents=True builds missing ancestors; exist_ok=True tolerates reruns
    target_dir.mkdir(parents=True, exist_ok=True)
    return target_dir
|
||||
|
||||
|
||||
def validate_note_path(file_path: Path, data_dir: Path) -> bool:
    """
    Validate that file path is within data directory

    Security check to prevent path traversal attacks. Both paths are
    resolved and the file path must then lie inside (or be equal to) the
    data directory.

    Args:
        file_path: Path to validate
        data_dir: Base data directory that must contain file_path

    Returns:
        True if path is safe, False otherwise (including when the paths
        cannot be resolved)

    Examples:
        >>> validate_note_path(
        ...     Path("data/notes/2024/11/note.md"),
        ...     Path("data")
        ... )
        True

        >>> validate_note_path(
        ...     Path("data/notes/../../etc/passwd"),
        ...     Path("data")
        ... )
        False

    Security:
        - Resolves symlinks and relative paths
        - Checks if resolved path is child of data_dir
        - Prevents directory traversal attacks
    """
    try:
        # Resolve both paths to absolute
        resolved_file = file_path.resolve()
        resolved_data_dir = data_dir.resolve()

        # Check if file_path is relative to data_dir
        return resolved_file.is_relative_to(resolved_data_dir)
    except (ValueError, OSError, RuntimeError):
        # Path.resolve() raises RuntimeError for symlink loops on Python
        # versions before 3.13; a path that cannot be resolved is never
        # treated as safe.
        return False
|
||||
|
||||
|
||||
# Atomic File Operations
|
||||
|
||||
|
||||
def write_note_file(file_path: Path, content: str) -> None:
    """
    Write note content to file via a temporary file

    The content is first written to a sibling temp file and that file is
    then moved over the destination with Path.replace, so the destination
    is never left half-written by this function.

    Args:
        file_path: Destination file path
        content: Content to write (markdown text)

    Raises:
        OSError: If file cannot be written
        ValueError: If file_path is invalid

    Examples:
        >>> write_note_file(Path("data/notes/2024/11/test.md"), "# Test")

    Implementation:
        1. Temp file name is the destination name plus TEMP_FILE_SUFFIX
        2. Content is written to the temp file as UTF-8
        3. Temp file replaces the destination
        4. On any exception the temp file is removed (if present) and the
           exception is re-raised

    Notes:
        - Temp file lives in the same directory as the target
        - The parent directory is not created here
    """
    staging_path = file_path.with_suffix(file_path.suffix + TEMP_FILE_SUFFIX)

    try:
        staging_path.write_text(content, encoding="utf-8")
        # Swap the finished temp file into place
        staging_path.replace(file_path)
    except Exception:
        # Do not leave a stray temp file behind, then surface the error
        if staging_path.exists():
            staging_path.unlink()
        raise
|
||||
|
||||
|
||||
def read_note_file(file_path: Path) -> str:
    """
    Return the text stored in a note file

    The file is opened in text mode and decoded as UTF-8.

    Args:
        file_path: Path to note file

    Returns:
        File content as string

    Raises:
        FileNotFoundError: If file doesn't exist
        OSError: If file cannot be read

    Examples:
        >>> content = read_note_file(Path("data/notes/2024/11/test.md"))
        >>> print(content)
        # Test Note
    """
    with file_path.open(encoding="utf-8") as handle:
        return handle.read()
|
||||
|
||||
|
||||
def delete_note_file(
    file_path: Path, soft: bool = False, data_dir: Optional[Path] = None
) -> None:
    """
    Delete note file from filesystem

    Supports soft delete (move to trash) or hard delete (permanent removal).

    Args:
        file_path: Path to note file
        soft: If True, move to .trash/ directory; if False, delete permanently
        data_dir: Required if soft=True, base data directory

    Raises:
        FileNotFoundError: If file doesn't exist
        ValueError: If soft=True but data_dir not provided
        OSError: If file cannot be deleted or moved

    Examples:
        >>> # Hard delete
        >>> delete_note_file(Path("data/notes/2024/11/test.md"))

        >>> # Soft delete (move to trash)
        >>> delete_note_file(
        ...     Path("data/notes/2024/11/test.md"),
        ...     soft=True,
        ...     data_dir=Path("data")
        ... )

    Soft delete layout:
        The file is moved to data_dir/TRASH_DIR_NAME/YYYY/MM/<file name>,
        where YYYY/MM are the two path components following "notes" in the
        note's path. If the path does not have that structure, the current
        UTC year and month are used instead.
    """
    if not soft:
        # Hard delete: permanent removal
        file_path.unlink()
        return

    # Soft delete: move to trash
    if data_dir is None:
        raise ValueError("data_dir is required for soft delete")

    # Look for the "notes/YYYY/MM" structure relative to data_dir when
    # possible. Searching the whole absolute path would match the first
    # "notes" component anywhere, e.g. inside data_dir's own location, and
    # pick the wrong year/month directories.
    try:
        parts = file_path.relative_to(data_dir).parts
    except ValueError:
        # file_path is not expressed under data_dir; search the full path
        parts = file_path.parts

    try:
        notes_idx = parts.index("notes")
        year = parts[notes_idx + 1]
        month = parts[notes_idx + 2]
    except (ValueError, IndexError):
        # Path doesn't follow the expected structure: use current UTC date
        # (timezone-aware replacement for the deprecated datetime.utcnow()).
        now = datetime.now(timezone.utc)
        year = now.strftime("%Y")
        month = now.strftime("%m")

    # Create trash directory path
    trash_dir = data_dir / TRASH_DIR_NAME / year / month
    trash_dir.mkdir(parents=True, exist_ok=True)

    # Move file to trash
    trash_path = trash_dir / file_path.name
    shutil.move(str(file_path), str(trash_path))
|
||||
|
||||
|
||||
# Date/Time Utilities
|
||||
|
||||
|
||||
def format_rfc822(dt: datetime) -> str:
    """
    Format datetime as RFC-822 string

    Converts datetime to the RFC-822 date format required by the RSS 2.0
    specification, always expressed in UTC (+0000).

    Args:
        dt: Datetime to format. A naive datetime is taken to be UTC; a
            timezone-aware datetime is converted to UTC first.

    Returns:
        RFC-822 formatted string with English day and month names

    Examples:
        >>> from datetime import datetime
        >>> dt = datetime(2024, 11, 18, 14, 30, 45)
        >>> format_rfc822(dt)
        'Mon, 18 Nov 2024 14:30:45 +0000'

    References:
        - RSS 2.0 spec: https://www.rssboard.org/rss-specification
        - RFC-822 date format
    """
    if dt.utcoffset() is None:
        # Naive input: label it as UTC without shifting the clock time
        dt = dt.replace(tzinfo=timezone.utc)
    else:
        # Aware input: shift to UTC so the "+0000" in the output is true
        dt = dt.astimezone(timezone.utc)

    # email.utils.format_datetime writes English names regardless of the
    # process locale, unlike strftime's %a / %b.
    return format_datetime(dt)
|
||||
|
||||
|
||||
def format_iso8601(dt: datetime) -> str:
    """
    Format datetime as ISO 8601 string

    Converts datetime to an ISO 8601 UTC timestamp with a trailing "Z",
    at one-second resolution (microseconds are not included).

    Args:
        dt: Datetime to format. A naive datetime is taken to be UTC; a
            timezone-aware datetime is converted to UTC first.

    Returns:
        ISO 8601 formatted string

    Examples:
        >>> from datetime import datetime
        >>> dt = datetime(2024, 11, 18, 14, 30, 45)
        >>> format_iso8601(dt)
        '2024-11-18T14:30:45Z'
    """
    if dt.utcoffset() is not None:
        # The literal "Z" claims UTC, so aware values must be shifted to
        # UTC before formatting.
        dt = dt.astimezone(timezone.utc)
    return dt.strftime("%Y-%m-%dT%H:%M:%SZ")
|
||||
|
||||
|
||||
def parse_iso8601(date_string: str) -> datetime:
    """
    Parse ISO 8601 string to datetime

    Args:
        date_string: ISO 8601 formatted string, optionally ending in "Z"
            or carrying an explicit UTC offset

    Returns:
        Naive datetime object expressing the time in UTC. Input with an
        explicit offset is converted to UTC before the offset is dropped.

    Raises:
        ValueError: If string is not valid ISO 8601 format

    Examples:
        >>> parse_iso8601("2024-11-18T14:30:45Z")
        datetime.datetime(2024, 11, 18, 14, 30, 45)

        >>> parse_iso8601("2024-11-18T16:30:45+02:00")
        datetime.datetime(2024, 11, 18, 14, 30, 45)
    """
    # Remove 'Z' suffix if present
    if date_string.endswith("Z"):
        date_string = date_string[:-1]

    # Parse using fromisoformat
    parsed = datetime.fromisoformat(date_string)

    # Keep the return type uniform: a string with an offset would otherwise
    # come back timezone-aware, and aware and naive datetimes cannot be
    # compared with each other.
    if parsed.utcoffset() is not None:
        parsed = parsed.astimezone(timezone.utc).replace(tzinfo=None)

    return parsed
|
||||
Reference in New Issue
Block a user