""" Tests for metrics instrumentation (v1.1.2 Phase 1) Tests database monitoring, HTTP metrics, memory monitoring, and business metrics. """ import pytest import sqlite3 import time import threading from unittest.mock import Mock, patch, MagicMock from starpunk.monitoring import ( MonitoredConnection, MemoryMonitor, get_metrics, get_metrics_stats, business, ) from starpunk.monitoring.metrics import get_buffer from starpunk.monitoring.http import setup_http_metrics class TestMonitoredConnection: """Tests for database operation monitoring""" def test_execute_records_metric(self): """Test that execute() records a metric""" # Create in-memory database conn = sqlite3.connect(':memory:') conn.execute('CREATE TABLE test (id INTEGER, name TEXT)') # Wrap with monitoring monitored = MonitoredConnection(conn, slow_query_threshold=1.0) # Clear metrics buffer get_buffer().clear() # Execute query monitored.execute('SELECT * FROM test') # Check metric was recorded metrics = get_metrics() # Note: May not be recorded due to sampling, but slow queries are forced # So we'll check stats instead stats = get_metrics_stats() assert stats['total_count'] >= 0 # May be 0 due to sampling def test_slow_query_always_recorded(self): """Test that slow queries are always recorded regardless of sampling""" # Create in-memory database conn = sqlite3.connect(':memory:') # Set very low threshold so any query is "slow" monitored = MonitoredConnection(conn, slow_query_threshold=0.0) # Clear metrics buffer get_buffer().clear() # Execute query (will be considered slow) monitored.execute('SELECT 1') # Check metric was recorded (forced due to being slow) metrics = get_metrics() assert len(metrics) > 0 # Check that is_slow is True in metadata assert any(m.metadata.get('is_slow', False) is True for m in metrics) def test_extract_table_name_select(self): """Test table name extraction from SELECT query""" conn = sqlite3.connect(':memory:') conn.execute('CREATE TABLE notes (id INTEGER)') monitored = MonitoredConnection(conn) table_name = monitored._extract_table_name('SELECT * FROM notes WHERE id = 1') assert table_name == 'notes' def test_extract_table_name_insert(self): """Test table name extraction from INSERT query""" conn = sqlite3.connect(':memory:') monitored = MonitoredConnection(conn) table_name = monitored._extract_table_name('INSERT INTO users (name) VALUES (?)') assert table_name == 'users' def test_extract_table_name_update(self): """Test table name extraction from UPDATE query""" conn = sqlite3.connect(':memory:') monitored = MonitoredConnection(conn) table_name = monitored._extract_table_name('UPDATE posts SET title = ?') assert table_name == 'posts' def test_extract_table_name_unknown(self): """Test that complex queries return 'unknown'""" conn = sqlite3.connect(':memory:') monitored = MonitoredConnection(conn) # Complex query with JOIN table_name = monitored._extract_table_name( 'SELECT a.* FROM notes a JOIN users b ON a.user_id = b.id' ) # Our simple regex will find 'notes' from the first FROM assert table_name in ['notes', 'unknown'] def test_get_query_type(self): """Test query type extraction""" conn = sqlite3.connect(':memory:') monitored = MonitoredConnection(conn) assert monitored._get_query_type('SELECT * FROM notes') == 'SELECT' assert monitored._get_query_type('INSERT INTO notes VALUES (?)') == 'INSERT' assert monitored._get_query_type('UPDATE notes SET x = 1') == 'UPDATE' assert monitored._get_query_type('DELETE FROM notes') == 'DELETE' assert monitored._get_query_type('CREATE TABLE test (id INT)') == 'CREATE' assert monitored._get_query_type('PRAGMA journal_mode=WAL') == 'PRAGMA' def test_execute_with_parameters(self): """Test execute with query parameters""" conn = sqlite3.connect(':memory:') conn.execute('CREATE TABLE test (id INTEGER, name TEXT)') monitored = MonitoredConnection(conn, slow_query_threshold=1.0) # Execute with parameters monitored.execute('INSERT INTO test (id, name) VALUES (?, ?)', (1, 'test')) # Verify data was inserted cursor = monitored.execute('SELECT * FROM test WHERE id = ?', (1,)) rows = cursor.fetchall() assert len(rows) == 1 def test_executemany(self): """Test executemany batch operations""" conn = sqlite3.connect(':memory:') conn.execute('CREATE TABLE test (id INTEGER, name TEXT)') monitored = MonitoredConnection(conn) # Clear metrics get_buffer().clear() # Execute batch insert data = [(1, 'first'), (2, 'second'), (3, 'third')] monitored.executemany('INSERT INTO test (id, name) VALUES (?, ?)', data) # Check metric was recorded metrics = get_metrics() # May not be recorded due to sampling stats = get_metrics_stats() assert stats is not None def test_error_recording(self): """Test that errors are recorded in metrics""" conn = sqlite3.connect(':memory:') monitored = MonitoredConnection(conn) # Clear metrics get_buffer().clear() # Execute invalid query with pytest.raises(sqlite3.OperationalError): monitored.execute('SELECT * FROM nonexistent_table') # Check error was recorded (forced) metrics = get_metrics() assert len(metrics) > 0 assert any('ERROR' in m.operation_name for m in metrics) class TestHTTPMetrics: """Tests for HTTP request/response monitoring""" def test_setup_http_metrics(self, app): """Test HTTP metrics middleware setup""" # Add a simple test route @app.route('/test') def test_route(): return 'OK', 200 setup_http_metrics(app) # Clear metrics get_buffer().clear() # Make a request with app.test_client() as client: response = client.get('/test') assert response.status_code == 200 # Check request ID header was added assert 'X-Request-ID' in response.headers # Check metrics were recorded metrics = get_metrics() # May be sampled, so just check structure stats = get_metrics_stats() assert stats is not None def test_request_id_generation(self, app): """Test that unique request IDs are generated""" # Add a simple test route @app.route('/test') def test_route(): return 'OK', 200 setup_http_metrics(app) request_ids = set() with app.test_client() as client: for _ in range(5): response = client.get('/test') request_id = response.headers.get('X-Request-ID') assert request_id is not None request_ids.add(request_id) # All request IDs should be unique assert len(request_ids) == 5 def test_error_metrics_recorded(self, app): """Test that errors are recorded in metrics""" # Add a simple test route @app.route('/test') def test_route(): return 'OK', 200 setup_http_metrics(app) # Clear metrics get_buffer().clear() with app.test_client() as client: # Request non-existent endpoint response = client.get('/this-does-not-exist') assert response.status_code == 404 # Error metrics should be recorded (forced) # Note: 404 is not necessarily an error in the teardown handler # but will be in metrics as a 404 status code metrics = get_metrics() stats = get_metrics_stats() assert stats is not None class TestMemoryMonitor: """Tests for memory monitoring thread""" def test_memory_monitor_initialization(self): """Test memory monitor can be initialized""" monitor = MemoryMonitor(interval=1) assert monitor.interval == 1 assert monitor.daemon is True # Per CQ5 def test_memory_monitor_starts_and_stops(self): """Test memory monitor thread lifecycle""" monitor = MemoryMonitor(interval=1) # Start monitor monitor.start() assert monitor.is_alive() # Wait a bit for initialization time.sleep(0.5) # Stop monitor gracefully monitor.stop() # Give it time to finish gracefully time.sleep(1.0) monitor.join(timeout=5) # Thread should have stopped # Note: In rare cases daemon thread may still be cleaning up if monitor.is_alive(): # Give it one more second time.sleep(1.0) assert not monitor.is_alive() def test_memory_monitor_collects_metrics(self): """Test that memory monitor collects metrics""" # Clear metrics get_buffer().clear() monitor = MemoryMonitor(interval=1) monitor.start() # Wait for baseline + one collection time.sleep(7) # 5s baseline + 2s for collection # Stop monitor monitor.stop() monitor.join(timeout=2) # Check metrics were collected metrics = get_metrics() memory_metrics = [m for m in metrics if 'memory' in m.operation_name.lower()] # Should have at least one memory metric assert len(memory_metrics) > 0 def test_memory_monitor_stats(self): """Test memory monitor statistics""" monitor = MemoryMonitor(interval=1) monitor.start() # Wait for baseline time.sleep(6) # Get stats stats = monitor.get_stats() assert stats['status'] == 'running' assert 'current_rss_mb' in stats assert 'baseline_rss_mb' in stats assert stats['baseline_rss_mb'] > 0 monitor.stop() monitor.join(timeout=2) class TestBusinessMetrics: """Tests for business metrics tracking""" def test_track_note_created(self): """Test note creation tracking""" get_buffer().clear() business.track_note_created(note_id=123, content_length=500, has_media=False) metrics = get_metrics() assert len(metrics) > 0 note_metrics = [m for m in metrics if 'note_created' in m.operation_name] assert len(note_metrics) > 0 assert note_metrics[0].metadata['note_id'] == 123 assert note_metrics[0].metadata['content_length'] == 500 def test_track_note_updated(self): """Test note update tracking""" get_buffer().clear() business.track_note_updated( note_id=456, content_length=750, fields_changed=['title', 'content'] ) metrics = get_metrics() note_metrics = [m for m in metrics if 'note_updated' in m.operation_name] assert len(note_metrics) > 0 assert note_metrics[0].metadata['note_id'] == 456 def test_track_note_deleted(self): """Test note deletion tracking""" get_buffer().clear() business.track_note_deleted(note_id=789) metrics = get_metrics() note_metrics = [m for m in metrics if 'note_deleted' in m.operation_name] assert len(note_metrics) > 0 assert note_metrics[0].metadata['note_id'] == 789 def test_track_feed_generated(self): """Test feed generation tracking""" get_buffer().clear() business.track_feed_generated( format='rss', item_count=50, duration_ms=45.2, cached=False ) metrics = get_metrics() feed_metrics = [m for m in metrics if 'feed_rss' in m.operation_name] assert len(feed_metrics) > 0 assert feed_metrics[0].metadata['format'] == 'rss' assert feed_metrics[0].metadata['item_count'] == 50 def test_track_cache_hit(self): """Test cache hit tracking""" get_buffer().clear() business.track_cache_hit(cache_type='feed', key='rss:latest') metrics = get_metrics() cache_metrics = [m for m in metrics if 'cache_hit' in m.operation_name] assert len(cache_metrics) > 0 def test_track_cache_miss(self): """Test cache miss tracking""" get_buffer().clear() business.track_cache_miss(cache_type='feed', key='atom:latest') metrics = get_metrics() cache_metrics = [m for m in metrics if 'cache_miss' in m.operation_name] assert len(cache_metrics) > 0 class TestMetricsConfiguration: """Tests for metrics configuration""" def test_metrics_can_be_disabled(self, app): """Test that metrics can be disabled via configuration""" # This would be tested by setting METRICS_ENABLED=False # and verifying no metrics are collected assert 'METRICS_ENABLED' in app.config def test_slow_query_threshold_configurable(self, app): """Test that slow query threshold is configurable""" assert 'METRICS_SLOW_QUERY_THRESHOLD' in app.config assert isinstance(app.config['METRICS_SLOW_QUERY_THRESHOLD'], float) def test_sampling_rate_configurable(self, app): """Test that sampling rate is configurable""" assert 'METRICS_SAMPLING_RATE' in app.config assert isinstance(app.config['METRICS_SAMPLING_RATE'], float) assert 0.0 <= app.config['METRICS_SAMPLING_RATE'] <= 1.0 def test_buffer_size_configurable(self, app): """Test that buffer size is configurable""" assert 'METRICS_BUFFER_SIZE' in app.config assert isinstance(app.config['METRICS_BUFFER_SIZE'], int) assert app.config['METRICS_BUFFER_SIZE'] > 0 def test_memory_interval_configurable(self, app): """Test that memory monitor interval is configurable""" assert 'METRICS_MEMORY_INTERVAL' in app.config assert isinstance(app.config['METRICS_MEMORY_INTERVAL'], int) assert app.config['METRICS_MEMORY_INTERVAL'] > 0 @pytest.fixture def app(): """Create test Flask app with minimal configuration""" from flask import Flask from pathlib import Path import tempfile app = Flask(__name__) # Create temp directory for testing temp_dir = tempfile.mkdtemp() temp_path = Path(temp_dir) # Minimal configuration to avoid migration issues app.config.update({ 'TESTING': True, 'DATABASE_PATH': temp_path / 'test.db', 'DATA_PATH': temp_path, 'NOTES_PATH': temp_path / 'notes', 'SESSION_SECRET': 'test-secret', 'ADMIN_ME': 'https://test.example.com', 'METRICS_ENABLED': True, 'METRICS_SLOW_QUERY_THRESHOLD': 1.0, 'METRICS_SAMPLING_RATE': 1.0, 'METRICS_BUFFER_SIZE': 1000, 'METRICS_MEMORY_INTERVAL': 30, }) return app