""" Memory monitoring background thread Per v1.1.2 Phase 1 and developer Q&A CQ5, IQ8: - Background daemon thread for continuous memory monitoring - Tracks RSS and VMS memory usage - Detects memory growth and potential leaks - 5-second baseline period after startup (IQ8) - Skipped in test mode (CQ5) Example usage: >>> from starpunk.monitoring.memory import MemoryMonitor >>> monitor = MemoryMonitor(interval=30) >>> monitor.start() # Runs as daemon thread >>> # ... application runs ... >>> monitor.stop() """ import gc import logging import os import sys import threading import time from typing import Dict, Any import psutil from starpunk.monitoring.metrics import record_metric logger = logging.getLogger(__name__) class MemoryMonitor(threading.Thread): """ Background thread for memory monitoring Per CQ5: Daemon thread that auto-terminates with main process Per IQ8: 5-second baseline period after startup """ def __init__(self, interval: int = 30): """ Initialize memory monitor thread Args: interval: Monitoring interval in seconds (default: 30) """ super().__init__(daemon=True) # CQ5: daemon thread self.interval = interval self._stop_event = threading.Event() self._process = psutil.Process() self._baseline_memory = None self._high_water_mark = 0 def run(self): """ Main monitoring loop Per IQ8: Wait 5 seconds for app initialization before setting baseline """ try: # Wait for app initialization (IQ8: 5 seconds) time.sleep(5) # Set baseline memory memory_info = self._get_memory_info() self._baseline_memory = memory_info['rss_mb'] logger.info(f"Memory monitor baseline set: {self._baseline_memory:.2f} MB RSS") # Start monitoring loop while not self._stop_event.is_set(): try: self._collect_metrics() except Exception as e: logger.error(f"Memory monitoring error: {e}", exc_info=True) # Wait for interval or until stop event self._stop_event.wait(self.interval) except Exception as e: logger.error(f"Memory monitor thread failed: {e}", exc_info=True) def _collect_metrics(self): """Collect and record memory metrics""" memory_info = self._get_memory_info() gc_stats = self._get_gc_stats() # Update high water mark if memory_info['rss_mb'] > self._high_water_mark: self._high_water_mark = memory_info['rss_mb'] # Calculate growth rate (MB/hour) if baseline is set growth_rate = 0.0 if self._baseline_memory: growth_rate = memory_info['rss_mb'] - self._baseline_memory # Record metrics metadata = { 'rss_mb': memory_info['rss_mb'], 'vms_mb': memory_info['vms_mb'], 'percent': memory_info['percent'], 'high_water_mb': self._high_water_mark, 'growth_mb': growth_rate, 'gc_collections': gc_stats['collections'], 'gc_collected': gc_stats['collected'], } record_metric( 'render', # Use 'render' operation type for memory metrics 'memory_usage', memory_info['rss_mb'], metadata, force=True # Always record memory metrics ) # Warn if significant growth detected (>10MB growth from baseline) if growth_rate > 10.0: logger.warning( f"Memory growth detected: +{growth_rate:.2f} MB from baseline " f"(current: {memory_info['rss_mb']:.2f} MB, baseline: {self._baseline_memory:.2f} MB)" ) def _get_memory_info(self) -> Dict[str, float]: """ Get current process memory usage Returns: Dict with memory info in MB """ memory = self._process.memory_info() return { 'rss_mb': memory.rss / (1024 * 1024), # Resident Set Size 'vms_mb': memory.vms / (1024 * 1024), # Virtual Memory Size 'percent': self._process.memory_percent(), } def _get_gc_stats(self) -> Dict[str, Any]: """ Get garbage collection statistics Returns: Dict with GC stats """ # Get collection counts per generation counts = gc.get_count() # Perform a quick gen 0 collection and count collected objects collected = gc.collect(0) return { 'collections': { 'gen0': counts[0], 'gen1': counts[1], 'gen2': counts[2], }, 'collected': collected, 'uncollectable': len(gc.garbage), } def stop(self): """ Stop the monitoring thread gracefully Sets the stop event to signal the thread to exit """ logger.info("Stopping memory monitor") self._stop_event.set() def get_stats(self) -> Dict[str, Any]: """ Get current memory statistics Returns: Dict with current memory stats """ if not self._baseline_memory: return {'status': 'initializing'} memory_info = self._get_memory_info() return { 'status': 'running', 'current_rss_mb': memory_info['rss_mb'], 'baseline_rss_mb': self._baseline_memory, 'growth_mb': memory_info['rss_mb'] - self._baseline_memory, 'high_water_mb': self._high_water_mark, 'percent': memory_info['percent'], }