"""
Full-text search functionality for StarPunk

This module provides FTS5-based search capabilities for notes. It handles:
- Search query execution with relevance ranking
- FTS index population and maintenance
- Graceful degradation when FTS5 is unavailable

Per developer Q&A Q5:
- FTS5 detection at startup with caching
- Fallback to LIKE-style substring search if FTS5 unavailable
- Same function signature for both implementations

Per developer Q&A Q13:
- Search highlighting with XSS prevention using markupsafe.escape()
- Whitelist only <mark> tags

The FTS index is maintained by application code (not SQL triggers) because
note content is stored in external files that SQLite cannot access.
"""

import sqlite3
import logging
import re
from pathlib import Path
from typing import Optional

from flask import current_app
from markupsafe import escape, Markup

logger = logging.getLogger(__name__)

# Module-level cache for FTS5 availability (per developer Q&A Q5)
_fts5_available: Optional[bool] = None
_fts5_check_done: bool = False

# Maximum number of title characters stored in the FTS index
_TITLE_MAX_LENGTH = 100

# Sentinel markers handed to FTS5 snippet(). Control characters are used
# instead of literal tags so the snippet text can be HTML-escaped first and
# the sentinels swapped for <mark> tags afterwards.
_SNIPPET_MARK_OPEN = "\x02"
_SNIPPET_MARK_CLOSE = "\x03"


def _extract_title(content: str) -> str:
    """Return the first line of content with markdown heading syntax removed."""
    first_line = content.split('\n', 1)[0].strip()
    # Remove markdown heading syntax (# ## ###)
    if first_line.startswith('#'):
        first_line = first_line.lstrip('#').strip()
    return first_line


def _compile_terms_pattern(query: str) -> Optional[re.Pattern]:
    """Build one case-insensitive regex matching any whitespace-separated term.

    Returns None when the query contains no terms.
    """
    # De-duplicate while keeping a deterministic order
    terms = list(dict.fromkeys(query.split()))
    if not terms:
        return None
    # Longest first so "foobar" wins over "foo" when both start at one position
    terms.sort(key=len, reverse=True)
    return re.compile("|".join(re.escape(term) for term in terms), re.IGNORECASE)


def _render_fts_snippet(raw: Optional[str]) -> Markup:
    """Escape an FTS5 snippet and turn the sentinel markers into <mark> tags."""
    # escape() leaves the control-character sentinels untouched
    safe = str(escape(raw or ""))
    safe = safe.replace(_SNIPPET_MARK_OPEN, "<mark>")
    safe = safe.replace(_SNIPPET_MARK_CLOSE, "</mark>")
    return Markup(safe)


def check_fts5_support(db_path: Path) -> bool:
    """
    Check if SQLite was compiled with FTS5 support

    Per developer Q&A Q5:
    - Detection happens at startup with caching
    - Cached result used for all subsequent calls
    - Logs which implementation is active

    Args:
        db_path: Path to SQLite database

    Returns:
        bool: True if FTS5 is available, False otherwise

    Raises:
        sqlite3.OperationalError: If the probe fails for a reason other than
            the fts5 module being missing (the result is not cached then)
    """
    global _fts5_available, _fts5_check_done

    # Return cached result if already checked
    if _fts5_check_done:
        return bool(_fts5_available)

    conn = sqlite3.connect(db_path)
    try:
        # Try to create (and immediately drop) a test FTS5 table
        conn.execute("CREATE VIRTUAL TABLE IF NOT EXISTS _fts5_test USING fts5(content)")
        conn.execute("DROP TABLE IF EXISTS _fts5_test")
    except sqlite3.OperationalError as e:
        if "no such module" in str(e).lower():
            _fts5_available = False
            _fts5_check_done = True
            logger.warning(
                "FTS5 not available in SQLite - using fallback LIKE search: %s", e
            )
            return False
        raise
    finally:
        # Close on every path, including the error paths above
        conn.close()

    _fts5_available = True
    _fts5_check_done = True
    logger.info("FTS5 support detected - using FTS5 search implementation")
    return True


def has_fts_table(db_path: Path) -> bool:
    """
    Check if FTS table exists in database

    Args:
        db_path: Path to SQLite database

    Returns:
        bool: True if notes_fts table exists; False if it does not or if any
            sqlite3 error occurs while checking
    """
    try:
        conn = sqlite3.connect(db_path)
        try:
            cursor = conn.execute(
                "SELECT name FROM sqlite_master WHERE type='table' AND name='notes_fts'"
            )
            return cursor.fetchone() is not None
        finally:
            conn.close()
    except sqlite3.Error:
        return False


def update_fts_index(conn: sqlite3.Connection, note_id: int, slug: str, content: str):
    """
    Update FTS index for a note (insert or replace)

    Extracts title from first line of content and updates the FTS index.
    Uses REPLACE to handle both new notes and updates. The caller is
    responsible for committing the transaction.

    Args:
        conn: SQLite database connection
        note_id: Note ID (used as FTS rowid)
        slug: Note slug
        content: Full markdown content

    Raises:
        sqlite3.Error: If FTS update fails
    """
    title = _extract_title(content)

    # Limit title length
    if len(title) > _TITLE_MAX_LENGTH:
        title = title[:_TITLE_MAX_LENGTH] + '...'

    # Use REPLACE to handle both insert and update
    # rowid explicitly set to match note ID for efficient lookups
    conn.execute(
        "REPLACE INTO notes_fts (rowid, slug, title, content) VALUES (?, ?, ?, ?)",
        (note_id, slug, title, content)
    )


def delete_from_fts_index(conn: sqlite3.Connection, note_id: int):
    """
    Remove note from FTS index

    The caller is responsible for committing the transaction.

    Args:
        conn: SQLite database connection
        note_id: Note ID to remove
    """
    conn.execute("DELETE FROM notes_fts WHERE rowid = ?", (note_id,))


def rebuild_fts_index(db_path: Path, data_dir: Path):
    """
    Rebuild entire FTS index from existing notes

    This is used during migration and can be run manually if the index
    becomes corrupted. Reads all non-deleted notes and re-indexes them.
    Notes whose file is missing or cannot be indexed are logged and counted
    as errors; they do not abort the rebuild.

    Args:
        db_path: Path to SQLite database
        data_dir: Path to data directory containing note files

    Raises:
        sqlite3.Error: If rebuild fails
    """
    logger.info("Rebuilding FTS index from existing notes")

    conn = sqlite3.connect(db_path)
    conn.row_factory = sqlite3.Row

    try:
        # Clear existing index
        conn.execute("DELETE FROM notes_fts")

        # Get all non-deleted notes
        cursor = conn.execute(
            "SELECT id, slug, file_path FROM notes WHERE deleted_at IS NULL"
        )

        indexed_count = 0
        error_count = 0

        for row in cursor:
            try:
                # Read note content from file
                note_path = data_dir / row['file_path']
                if not note_path.exists():
                    logger.warning("Note file not found: %s", note_path)
                    error_count += 1
                    continue

                content = note_path.read_text(encoding='utf-8')

                # Update FTS index
                update_fts_index(conn, row['id'], row['slug'], content)
                indexed_count += 1

            except Exception as e:
                # Best effort: one bad note must not abort the whole rebuild
                logger.error("Failed to index note %s: %s", row['slug'], e)
                error_count += 1

        conn.commit()
        logger.info(
            "FTS index rebuilt: %s notes indexed, %s errors",
            indexed_count, error_count
        )

    except Exception as e:
        conn.rollback()
        logger.error("Failed to rebuild FTS index: %s", e)
        raise
    finally:
        conn.close()


def highlight_search_terms(text: str, query: str) -> str:
    """
    Highlight search terms in text with XSS prevention

    Per developer Q&A Q13:
    - Uses markupsafe.escape() to prevent XSS
    - Whitelist only <mark> tags for highlighting
    - Returns safe Markup object

    Matching is done in a single pass over the raw text and every segment is
    escaped individually. This way terms cannot match inside HTML entities
    produced by escaping (e.g. "amp" in "&amp;") or inside <mark> tags
    inserted for another term.

    Args:
        text: Text to highlight in
        query: Search query (whitespace-separated terms to highlight)

    Returns:
        HTML-safe string (Markup) with each matched term wrapped in <mark>
    """
    pattern = _compile_terms_pattern(query)
    if pattern is None:
        # Nothing to highlight; still return escaped, safe markup
        return Markup(escape(text))

    pieces = []
    last_end = 0
    for match in pattern.finditer(text):
        # Escaped text between the previous match and this one
        pieces.append(str(escape(text[last_end:match.start()])))
        pieces.append("<mark>" + str(escape(match.group(0))) + "</mark>")
        last_end = match.end()
    pieces.append(str(escape(text[last_end:])))

    # Every piece was escaped above, so the joined result is safe HTML
    return Markup("".join(pieces))


def generate_snippet(content: str, query: str, max_length: int = 200) -> str:
    """
    Generate a search snippet from content

    Finds the first occurrence of any search term (case-insensitive) and
    extracts surrounding context. If no term occurs, the snippet starts at
    the beginning of the content. An ellipsis is added on each side where
    content was cut off.

    Args:
        content: Full content to extract snippet from
        query: Search query
        max_length: Maximum snippet length (excluding ellipsis and markup)

    Returns:
        Snippet with highlighted search terms
    """
    pattern = _compile_terms_pattern(query)
    # Search the original string (not a lowercased copy) so the match
    # position is always a valid index into content
    match = pattern.search(content) if pattern is not None else None

    if match is None:
        # No match found, use start of content
        start = 0
    else:
        # Centre the context window on the match
        start = max(0, match.start() - max_length // 2)

    end = min(len(content), start + max_length)
    snippet = content[start:end]

    # Add ellipsis if truncated
    if start > 0:
        snippet = "..." + snippet
    if end < len(content):
        snippet = snippet + "..."

    # Highlight search terms
    return highlight_search_terms(snippet, query)


def search_notes_fts5(
    query: str,
    db_path: Path,
    published_only: bool = True,
    limit: int = 50,
    offset: int = 0
) -> list[dict]:
    """
    Search notes using FTS5 full-text search

    Uses SQLite's FTS5 extension for fast, relevance-ranked search.

    Args:
        query: Search query (FTS5 query syntax supported)
        db_path: Path to SQLite database
        published_only: If True, only return published notes
        limit: Maximum number of results
        offset: Number of results to skip (for pagination)

    Returns:
        List of dicts with keys: id, slug, title, snippet, relevance,
        published, created_at

    Raises:
        sqlite3.Error: If search fails (including invalid FTS5 query syntax)
    """
    conn = sqlite3.connect(db_path)
    conn.row_factory = sqlite3.Row

    try:
        # Build query
        # FTS5 returns results ordered by relevance (rank)
        # Lower rank = better match
        # snippet() column index 2 is the content column of notes_fts
        sql = """
            SELECT
                notes.id,
                notes.slug,
                notes_fts.title,
                notes.published,
                notes.created_at,
                rank AS relevance,
                snippet(notes_fts, 2, ?, ?, '...', 40) AS snippet
            FROM notes_fts
            INNER JOIN notes ON notes_fts.rowid = notes.id
            WHERE notes_fts MATCH ?
              AND notes.deleted_at IS NULL
        """

        # Parameter order follows placeholder order: markers first, then query
        params = [_SNIPPET_MARK_OPEN, _SNIPPET_MARK_CLOSE, query]

        if published_only:
            sql += " AND notes.published = 1"

        sql += " ORDER BY rank LIMIT ? OFFSET ?"
        params.extend([limit, offset])

        cursor = conn.execute(sql, params)

        results = []
        for row in cursor:
            results.append({
                'id': row['id'],
                'slug': row['slug'],
                'title': row['title'],
                # snippet() returns raw note text, so it must be escaped
                # before being marked as safe HTML
                'snippet': _render_fts_snippet(row['snippet']),
                'relevance': row['relevance'],
                'published': bool(row['published']),
                'created_at': row['created_at'],
            })

        return results

    finally:
        conn.close()


def search_notes_fallback(
    query: str,
    db_path: Path,
    published_only: bool = True,
    limit: int = 50,
    offset: int = 0
) -> list[dict]:
    """
    Search notes using substring matching (fallback when FTS5 unavailable)

    Per developer Q&A Q5:
    - Same function signature as FTS5 search
    - Basic LIKE-style (case-insensitive substring) search
    - No relevance ranking (ordered by creation date, newest first)

    A note matches when any search term occurs in its slug or in its file
    content. Note content lives in external files, so matching happens in
    Python. limit and offset are applied to the matching notes, so
    pagination stays consistent. A query without any terms matches every
    note.

    Args:
        query: Search query (words separated by spaces)
        db_path: Path to SQLite database; note files are resolved relative
            to its parent directory
        published_only: If True, only return published notes
        limit: Maximum number of results
        offset: Number of matching results to skip (for pagination)

    Returns:
        List of dicts with keys: id, slug, title, snippet, relevance,
        published, created_at (compatible with FTS5 search results)

    Raises:
        sqlite3.Error: If search fails
    """
    from starpunk.utils import read_note_file

    conn = sqlite3.connect(db_path)
    conn.row_factory = sqlite3.Row

    try:
        sql = """
            SELECT
                id,
                slug,
                file_path,
                published,
                created_at
            FROM notes
            WHERE deleted_at IS NULL
        """

        if published_only:
            sql += " AND published = 1"

        # No LIMIT/OFFSET in SQL: they must count matching notes, and
        # whether a note matches is only known after reading its file
        sql += " ORDER BY created_at DESC"

        cursor = conn.execute(sql)

        terms = [term.lower() for term in query.split()]
        data_dir = Path(db_path).parent

        results = []
        skipped = 0

        for row in cursor:
            # Stop when we have enough results
            if len(results) >= limit:
                break

            try:
                # Load content from file
                content = read_note_file(data_dir / row['file_path'])

                if terms:
                    content_lower = content.lower()
                    slug_lower = row['slug'].lower()
                    matches = any(
                        term in content_lower or term in slug_lower
                        for term in terms
                    )
                    if not matches:
                        continue

                # Honour the offset in terms of matching notes
                if skipped < offset:
                    skipped += 1
                    continue

                results.append({
                    'id': row['id'],
                    'slug': row['slug'],
                    # Fall back to the slug when the first line is empty
                    'title': _extract_title(content) or row['slug'],
                    'snippet': generate_snippet(content, query),
                    'relevance': 0.0,  # No ranking in fallback mode
                    'published': bool(row['published']),
                    'created_at': row['created_at'],
                })

            except Exception as e:
                # Best effort: an unreadable note must not break the search
                logger.warning("Error reading note %s: %s", row['slug'], e)
                continue

        return results

    finally:
        conn.close()


def search_notes(
    query: str,
    db_path: Path,
    published_only: bool = True,
    limit: int = 50,
    offset: int = 0
) -> list[dict]:
    """
    Search notes with automatic FTS5 detection and fallback

    Per developer Q&A Q5:
    - Detects FTS5 support at startup and caches result
    - Uses FTS5 if available, otherwise falls back to substring search
    - Same function signature for both implementations

    Args:
        query: Search query
        db_path: Path to SQLite database
        published_only: If True, only return published notes
        limit: Maximum number of results
        offset: Number of results to skip (for pagination)

    Returns:
        List of dicts with keys: id, slug, title, snippet, relevance,
        published, created_at. Empty list for a blank query.

    Raises:
        sqlite3.Error: If search fails
    """
    # A blank query is a syntax error for FTS5 MATCH; treat it as "no results"
    if not query or not query.strip():
        return []

    # Check FTS5 availability (uses cached result after first check)
    if check_fts5_support(db_path) and has_fts_table(db_path):
        return search_notes_fts5(query, db_path, published_only, limit, offset)
    return search_notes_fallback(query, db_path, published_only, limit, offset)