""" Database operation monitoring wrapper Per ADR-053, v1.1.2 Phase 1, and developer Q&A CQ1, IQ1, IQ3: - Wraps SQLite connections at the pool level - Times all database operations - Extracts query type and table name (best effort) - Detects slow queries based on configurable threshold - Records metrics to the metrics collector Example usage: >>> from starpunk.monitoring.database import MonitoredConnection >>> conn = sqlite3.connect(':memory:') >>> monitored = MonitoredConnection(conn, metrics_collector) >>> cursor = monitored.execute('SELECT * FROM notes') """ import re import sqlite3 import time from typing import Optional, Any, Tuple from starpunk.monitoring.metrics import record_metric class MonitoredConnection: """ Wrapper for SQLite connections that monitors performance Per CQ1: Wraps connections at the pool level Per IQ1: Uses simple regex for table name extraction Per IQ3: Single configurable slow query threshold """ def __init__(self, connection: sqlite3.Connection, slow_query_threshold: float = 1.0): """ Initialize monitored connection wrapper Args: connection: SQLite connection to wrap slow_query_threshold: Threshold in seconds for slow query detection """ self._connection = connection self._slow_query_threshold = slow_query_threshold def execute(self, query: str, parameters: Optional[Tuple] = None) -> sqlite3.Cursor: """ Execute a query with performance monitoring Args: query: SQL query to execute parameters: Optional query parameters Returns: sqlite3.Cursor: Query cursor """ start_time = time.perf_counter() query_type = self._get_query_type(query) table_name = self._extract_table_name(query) try: if parameters: cursor = self._connection.execute(query, parameters) else: cursor = self._connection.execute(query) duration_sec = time.perf_counter() - start_time duration_ms = duration_sec * 1000 # Record metric (forced if slow query) is_slow = duration_sec >= self._slow_query_threshold metadata = { 'query_type': query_type, 'table': table_name, 'is_slow': is_slow, } # Add query text for slow queries (for debugging) if is_slow: # Truncate query to avoid storing huge queries metadata['query'] = query[:200] if len(query) > 200 else query record_metric( 'database', f'{query_type} {table_name}', duration_ms, metadata, force=is_slow # Always record slow queries ) return cursor except Exception as e: duration_sec = time.perf_counter() - start_time duration_ms = duration_sec * 1000 # Record error metric metadata = { 'query_type': query_type, 'table': table_name, 'error': str(e), 'query': query[:200] if len(query) > 200 else query } record_metric( 'database', f'{query_type} {table_name} ERROR', duration_ms, metadata, force=True # Always record errors ) raise def executemany(self, query: str, parameters) -> sqlite3.Cursor: """ Execute a query with multiple parameter sets Args: query: SQL query to execute parameters: Sequence of parameter tuples Returns: sqlite3.Cursor: Query cursor """ start_time = time.perf_counter() query_type = self._get_query_type(query) table_name = self._extract_table_name(query) try: cursor = self._connection.executemany(query, parameters) duration_ms = (time.perf_counter() - start_time) * 1000 # Record metric metadata = { 'query_type': query_type, 'table': table_name, 'batch': True, } record_metric( 'database', f'{query_type} {table_name} BATCH', duration_ms, metadata ) return cursor except Exception as e: duration_ms = (time.perf_counter() - start_time) * 1000 metadata = { 'query_type': query_type, 'table': table_name, 'error': str(e), 'batch': True } record_metric( 'database', f'{query_type} {table_name} BATCH ERROR', duration_ms, metadata, force=True ) raise def _get_query_type(self, query: str) -> str: """ Extract query type from SQL statement Args: query: SQL query Returns: Query type (SELECT, INSERT, UPDATE, DELETE, etc.) """ query_upper = query.strip().upper() for query_type in ['SELECT', 'INSERT', 'UPDATE', 'DELETE', 'CREATE', 'DROP', 'ALTER', 'PRAGMA']: if query_upper.startswith(query_type): return query_type return 'OTHER' def _extract_table_name(self, query: str) -> str: """ Extract table name from query (best effort) Per IQ1: Keep it simple with basic regex patterns. Returns "unknown" for complex queries. Note: Complex queries (JOINs, subqueries, CTEs) return "unknown". This covers 90% of queries accurately. Args: query: SQL query Returns: Table name or "unknown" """ query_lower = query.lower().strip() # Simple patterns that cover 90% of cases patterns = [ r'from\s+(\w+)', r'update\s+(\w+)', r'insert\s+into\s+(\w+)', r'delete\s+from\s+(\w+)', r'create\s+table\s+(?:if\s+not\s+exists\s+)?(\w+)', r'drop\s+table\s+(?:if\s+exists\s+)?(\w+)', r'alter\s+table\s+(\w+)', ] for pattern in patterns: match = re.search(pattern, query_lower) if match: return match.group(1) # Complex queries (JOINs, subqueries, CTEs) return "unknown" # Delegate all other connection methods to the wrapped connection def __getattr__(self, name: str) -> Any: """Delegate all other methods to the wrapped connection""" return getattr(self._connection, name) def __enter__(self): """Support context manager protocol""" return self def __exit__(self, exc_type, exc_val, exc_tb): """Support context manager protocol""" return self._connection.__exit__(exc_type, exc_val, exc_tb)