StarPunk/starpunk/search.py
Phil Skentelbery b3c1b16617 feat: Add full-text search with FTS5
Implements FTS5-based full-text search for notes as specified in ADR-034.

Changes:
- Created migration 005_add_fts5_search.sql with FTS5 virtual table (see the sketch after this list)
- Created starpunk/search.py module with search functions
- Integrated FTS index updates into create_note() and update_note()
- DELETE trigger automatically removes notes from FTS index
- INSERT/UPDATE handled by application code (note content lives in files, not in the DB)
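For orientation, a sketch of what that migration plausibly contains, inferred from the notes_fts columns and tokenizer features referenced by search.py; the committed 005_add_fts5_search.sql may differ, and the database path and trigger name below are illustrative only:

import sqlite3

# Illustrative sketch, not the committed migration. Column order matches
# snippet(notes_fts, 2, ...) in search.py; tokenizer matches the porter
# stemming / unicode61 features described in this commit.
FTS_MIGRATION_SQL = """
CREATE VIRTUAL TABLE IF NOT EXISTS notes_fts USING fts5(
    slug,
    title,
    content,
    tokenize = 'porter unicode61'
);

CREATE TRIGGER IF NOT EXISTS notes_fts_after_delete AFTER DELETE ON notes BEGIN
    DELETE FROM notes_fts WHERE rowid = old.id;
END;
"""

conn = sqlite3.connect("data/starpunk.db")  # hypothetical path
conn.executescript(FTS_MIGRATION_SQL)
conn.close()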

Features:
- Porter stemming for better English search
- Unicode normalization for international characters
- Relevance ranking with snippets (usage sketched after this list)
- Graceful degradation if FTS5 unavailable
- Helper function to rebuild index if needed
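A rough usage sketch of ranked search with snippet highlighting, built only on this module's public functions (the database path and query string are assumptions, not part of this commit):

from pathlib import Path
from starpunk.search import check_fts5_support, has_fts_table, search_notes

db_path = Path("data/starpunk.db")  # assumed location

# Graceful degradation: only search when FTS5 and the index table exist
if check_fts5_support(db_path) and has_fts_table(db_path):
    results = search_notes("indieweb sqlite", db_path, published_only=True, limit=10)
    for hit in results:
        # Results are ordered by FTS5 rank (lower = better match);
        # snippet contains <mark>-highlighted excerpts of the content.
        print(hit["slug"], hit["relevance"], hit["snippet"])
else:
    print("Search unavailable: SQLite FTS5 support or index missing")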

Note: Initial FTS index population needs to be added to app startup.
Part of v1.1.0 (Phase 3).
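A possible shape for that startup hook, sketched against this module's public functions (the init_search name and wiring are assumptions, not committed code):

from pathlib import Path
from starpunk.search import check_fts5_support, has_fts_table, rebuild_fts_index

def init_search(db_path: Path, data_dir: Path) -> bool:
    """Hypothetical startup hook: populate the FTS index if FTS5 is usable."""
    if not check_fts5_support(db_path):
        return False  # degrade gracefully; search stays disabled
    if has_fts_table(db_path):
        # A real implementation would likely rebuild only when the index
        # is empty or stale rather than on every startup.
        rebuild_fts_index(db_path, data_dir)
        return True
    return False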

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-25 10:03:28 -07:00


"""
Full-text search functionality for StarPunk
This module provides FTS5-based search capabilities for notes. It handles:
- Search query execution with relevance ranking
- FTS index population and maintenance
- Graceful degradation when FTS5 is unavailable
The FTS index is maintained by application code (not SQL triggers) because
note content is stored in external files that SQLite cannot access.
"""
import sqlite3
import logging
from pathlib import Path
from typing import Optional
from flask import current_app
logger = logging.getLogger(__name__)
def check_fts5_support(db_path: Path) -> bool:
"""
Check if SQLite was compiled with FTS5 support
Args:
db_path: Path to SQLite database
Returns:
bool: True if FTS5 is available, False otherwise
"""
try:
conn = sqlite3.connect(db_path)
# Try to create a test FTS5 table
conn.execute("CREATE VIRTUAL TABLE IF NOT EXISTS _fts5_test USING fts5(content)")
conn.execute("DROP TABLE IF EXISTS _fts5_test")
conn.close()
return True
except sqlite3.OperationalError as e:
if "no such module" in str(e).lower():
logger.warning(f"FTS5 not available in SQLite: {e}")
return False
raise
def has_fts_table(db_path: Path) -> bool:
"""
Check if FTS table exists in database
Args:
db_path: Path to SQLite database
Returns:
bool: True if notes_fts table exists
"""
try:
conn = sqlite3.connect(db_path)
cursor = conn.execute(
"SELECT name FROM sqlite_master WHERE type='table' AND name='notes_fts'"
)
exists = cursor.fetchone() is not None
conn.close()
return exists
except sqlite3.Error:
return False
def update_fts_index(conn: sqlite3.Connection, note_id: int, slug: str, content: str):
"""
Update FTS index for a note (insert or replace)
Extracts title from first line of content and updates the FTS index.
Uses REPLACE to handle both new notes and updates.
Args:
conn: SQLite database connection
note_id: Note ID (used as FTS rowid)
slug: Note slug
content: Full markdown content
Raises:
sqlite3.Error: If FTS update fails
"""
# Extract title from first line
lines = content.split('\n', 1)
title = lines[0].strip() if lines else ''
# Remove markdown heading syntax (# ## ###)
if title.startswith('#'):
title = title.lstrip('#').strip()
# Limit title length
if len(title) > 100:
title = title[:100] + '...'
# Use REPLACE to handle both insert and update
# rowid explicitly set to match note ID for efficient lookups
conn.execute(
"REPLACE INTO notes_fts (rowid, slug, title, content) VALUES (?, ?, ?, ?)",
(note_id, slug, title, content)
)
def delete_from_fts_index(conn: sqlite3.Connection, note_id: int):
"""
Remove note from FTS index
Args:
conn: SQLite database connection
note_id: Note ID to remove
"""
conn.execute("DELETE FROM notes_fts WHERE rowid = ?", (note_id,))
def rebuild_fts_index(db_path: Path, data_dir: Path):
"""
Rebuild entire FTS index from existing notes
This is used during migration and can be run manually if the index
becomes corrupted. Reads all notes and re-indexes them.
Args:
db_path: Path to SQLite database
data_dir: Path to data directory containing note files
Raises:
sqlite3.Error: If rebuild fails
"""
logger.info("Rebuilding FTS index from existing notes")
conn = sqlite3.connect(db_path)
conn.row_factory = sqlite3.Row
try:
# Clear existing index
conn.execute("DELETE FROM notes_fts")
# Get all non-deleted notes
cursor = conn.execute(
"SELECT id, slug, file_path FROM notes WHERE deleted_at IS NULL"
)
indexed_count = 0
error_count = 0
for row in cursor:
try:
# Read note content from file
note_path = data_dir / row['file_path']
if not note_path.exists():
logger.warning(f"Note file not found: {note_path}")
error_count += 1
continue
content = note_path.read_text(encoding='utf-8')
# Update FTS index
update_fts_index(conn, row['id'], row['slug'], content)
indexed_count += 1
except Exception as e:
logger.error(f"Failed to index note {row['slug']}: {e}")
error_count += 1
conn.commit()
logger.info(f"FTS index rebuilt: {indexed_count} notes indexed, {error_count} errors")
except Exception as e:
conn.rollback()
logger.error(f"Failed to rebuild FTS index: {e}")
raise
finally:
conn.close()
def search_notes(
query: str,
db_path: Path,
published_only: bool = True,
limit: int = 50,
offset: int = 0
) -> list[dict]:
"""
Search notes using FTS5
Args:
query: Search query (FTS5 query syntax supported)
db_path: Path to SQLite database
published_only: If True, only return published notes
limit: Maximum number of results
offset: Number of results to skip (for pagination)
Returns:
List of dicts with keys: id, slug, title, rank, snippet
Raises:
sqlite3.Error: If search fails
"""
conn = sqlite3.connect(db_path)
conn.row_factory = sqlite3.Row
try:
# Build query
# FTS5 returns results ordered by relevance (rank)
# Lower rank = better match
sql = """
SELECT
notes.id,
notes.slug,
notes_fts.title,
notes.published,
notes.created_at,
rank AS relevance,
snippet(notes_fts, 2, '<mark>', '</mark>', '...', 40) AS snippet
FROM notes_fts
INNER JOIN notes ON notes_fts.rowid = notes.id
WHERE notes_fts MATCH ?
AND notes.deleted_at IS NULL
"""
params = [query]
if published_only:
sql += " AND notes.published = 1"
sql += " ORDER BY rank LIMIT ? OFFSET ?"
params.extend([limit, offset])
cursor = conn.execute(sql, params)
results = []
for row in cursor:
results.append({
'id': row['id'],
'slug': row['slug'],
'title': row['title'],
'snippet': row['snippet'],
'relevance': row['relevance'],
'published': bool(row['published']),
'created_at': row['created_at'],
})
return results
finally:
conn.close()