Files
StarPunk/starpunk/monitoring/metrics.py
Phil Skentelbery 1e2135a49a fix: Resolve v1.1.2-rc.1 production issues - Static files and metrics
This release candidate fixes two critical production issues discovered in v1.1.2-rc.1:

1. CRITICAL: Static files returning 500 errors
   - HTTP monitoring middleware was accessing response.data on streaming responses
   - Fixed by checking direct_passthrough flag before accessing response data
   - Static files (CSS, JS, images) now load correctly
   - File: starpunk/monitoring/http.py

2. HIGH: Database metrics showing zero
   - Configuration key mismatch: config set METRICS_SAMPLING_RATE (singular),
     buffer read METRICS_SAMPLING_RATES (plural)
   - Fixed by standardizing on singular key name
   - Modified MetricsBuffer to accept both float and dict for flexibility
   - Changed default sampling from 10% to 100% for better visibility
   - Files: starpunk/monitoring/metrics.py, starpunk/config.py

Version: 1.1.2-rc.2

Documentation:
- Investigation report: docs/reports/2025-11-28-v1.1.2-rc.1-production-issues.md
- Architect review: docs/reviews/2025-11-28-v1.1.2-rc.1-architect-review.md
- Implementation report: docs/reports/2025-11-28-v1.1.2-rc.2-fixes.md

Testing: All monitoring tests pass (28/28)

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-28 09:46:31 -07:00

426 lines
13 KiB
Python

"""
Metrics collection and buffering for performance monitoring
Per ADR-053 and developer Q&A Q6, Q12:
- Per-process circular buffers using deque
- Configurable buffer size (default 1000 entries)
- Include process ID in all metrics
- Configuration-based sampling rates
- Operation types: database, http, render
Example usage:
>>> from starpunk.monitoring import record_metric, get_metrics
>>>
>>> # Record a database operation
>>> record_metric('database', 'query', duration_ms=45.2, metadata={'query': 'SELECT * FROM notes'})
>>>
>>> # Get all metrics
>>> metrics = get_metrics()
>>> print(f"Collected {len(metrics)} metrics")
"""
import os
import random
import time
from collections import deque
from dataclasses import dataclass, field, asdict
from datetime import datetime, timezone
from threading import Lock
from typing import Any, Deque, Dict, List, Literal, Optional, Union
# Operation types for categorizing metrics
OperationType = Literal["database", "http", "render"]
# Module-level circular buffer (per-process)
# Each process in a multi-process deployment maintains its own buffer.
# Starts as None and is created lazily by get_buffer() on first use.
_metrics_buffer: Optional["MetricsBuffer"] = None
# Serializes the lazy creation of _metrics_buffer inside get_buffer()
_buffer_lock = Lock()
@dataclass
class Metric:
    """
    One recorded performance measurement.

    Attributes:
        operation_type: Category of the operation (database/http/render)
        operation_name: Name or short description of the operation
        timestamp: Moment the metric was recorded, as an ISO-format string
        duration_ms: How long the operation took, in milliseconds
        process_id: ID of the process that recorded the metric
        metadata: Extra operation-specific details (defaults to an empty dict)
    """

    operation_type: OperationType
    operation_name: str
    timestamp: str
    duration_ms: float
    process_id: int
    # default_factory gives every instance its own dict instead of a shared one
    metadata: Dict[str, Any] = field(default_factory=dict)

    def to_dict(self) -> Dict[str, Any]:
        """
        Return the metric's fields as a plain dictionary.

        Returns:
            Dict keyed by field name, produced by dataclasses.asdict
        """
        serialized = asdict(self)
        return serialized
class MetricsBuffer:
    """
    Circular buffer for storing performance metrics.

    Storage:
        - Backed by a deque created with maxlen=max_size, so the oldest
          entries are evicted automatically once the buffer is full
        - Every access to the deque is guarded by an internal Lock
        - The process ID captured at construction is stamped on each metric

    Sampling:
        - One sampling rate per operation type (0.0-1.0)
        - Operation types without an explicit rate are sampled at 100%
        - record(..., force=True) bypasses sampling entirely (intended for
          slow-query logging)

    Example:
        >>> buffer = MetricsBuffer(max_size=1000)
        >>> buffer.record('database', 'query', 45.2, {'query': 'SELECT ...'})
        True
        >>> metrics = buffer.get_all()
    """

    # Rate applied to any operation type that has no explicit entry.
    _DEFAULT_SAMPLING_RATE = 1.0

    def __init__(
        self,
        max_size: int = 1000,
        sampling_rates: Optional[Union[Dict[OperationType, float], float]] = None
    ):
        """
        Initialize metrics buffer.

        Args:
            max_size: Maximum number of metrics to store
            sampling_rates: Either:
                - float: Global sampling rate for all operation types (0.0-1.0)
                - dict: Mapping operation type to sampling rate; types that
                  are missing from the dict are sampled at 100%
                - None (default): 100% sampling for every type
        """
        self.max_size = max_size
        self._buffer: Deque[Metric] = deque(maxlen=max_size)
        self._lock = Lock()
        self._process_id = os.getpid()

        if sampling_rates is None:
            # Default to 100% sampling for all types
            self._sampling_rates = {
                "database": self._DEFAULT_SAMPLING_RATE,
                "http": self._DEFAULT_SAMPLING_RATE,
                "render": self._DEFAULT_SAMPLING_RATE,
            }
        elif isinstance(sampling_rates, (int, float)):
            # One global rate applied to every type
            rate = float(sampling_rates)
            self._sampling_rates = {
                "database": rate,
                "http": rate,
                "render": rate,
            }
        else:
            # Per-type rates. Copy the mapping so that set_sampling_rate()
            # never mutates the object owned by the caller.
            self._sampling_rates = dict(sampling_rates)

    def record(
        self,
        operation_type: OperationType,
        operation_name: str,
        duration_ms: float,
        metadata: Optional[Dict[str, Any]] = None,
        force: bool = False
    ) -> bool:
        """
        Record a performance metric.

        Args:
            operation_type: Type of operation (database/http/render)
            operation_name: Name/description of operation
            duration_ms: Duration in milliseconds
            metadata: Additional operation-specific data
            force: If True, bypass sampling (for slow query logging)

        Returns:
            True if metric was recorded, False if skipped due to sampling

        Example:
            >>> buffer.record('database', 'SELECT notes', 45.2,
            ...               {'query': 'SELECT * FROM notes LIMIT 10'})
            True
        """
        # Apply sampling (unless forced)
        if not force:
            sampling_rate = self._sampling_rates.get(
                operation_type, self._DEFAULT_SAMPLING_RATE
            )
            # random.random() yields values in [0.0, 1.0). Using ">=" means a
            # rate of 0.0 never records (even when the draw is exactly 0.0)
            # while a rate of 1.0 always records.
            if random.random() >= sampling_rate:
                return False

        # Aware UTC "now" with tzinfo stripped keeps the "<naive ISO>Z" format
        # without relying on the deprecated datetime.utcnow().
        now_utc = datetime.now(timezone.utc).replace(tzinfo=None)
        metric = Metric(
            operation_type=operation_type,
            operation_name=operation_name,
            timestamp=now_utc.isoformat() + "Z",
            duration_ms=duration_ms,
            process_id=self._process_id,
            metadata=metadata or {}
        )

        with self._lock:
            self._buffer.append(metric)
        return True

    def get_all(self) -> List[Metric]:
        """
        Get all metrics from buffer.

        Returns:
            List of metrics (oldest to newest)

        Example:
            >>> metrics = buffer.get_all()
        """
        with self._lock:
            return list(self._buffer)

    def get_recent(self, count: int) -> List[Metric]:
        """
        Get most recent N metrics.

        Args:
            count: Number of recent metrics to return; zero or a negative
                value yields an empty list

        Returns:
            List of most recent metrics (newest first)

        Example:
            >>> recent = buffer.get_recent(10)
        """
        # A negative count would otherwise slice from the wrong end and
        # return "all but the oldest N" instead of nothing.
        if count <= 0:
            return []

        with self._lock:
            newest_first = list(reversed(self._buffer))
        return newest_first[:count]

    def get_by_type(self, operation_type: OperationType) -> List[Metric]:
        """
        Get all metrics of a specific type.

        Args:
            operation_type: Type to filter by (database/http/render)

        Returns:
            List of metrics matching the type (oldest to newest)

        Example:
            >>> db_metrics = buffer.get_by_type('database')
        """
        with self._lock:
            return [m for m in self._buffer if m.operation_type == operation_type]

    def get_slow_operations(
        self,
        threshold_ms: float = 1000.0,
        operation_type: Optional[OperationType] = None
    ) -> List[Metric]:
        """
        Get operations that met or exceeded a duration threshold.

        Args:
            threshold_ms: Duration threshold in milliseconds (inclusive)
            operation_type: Optional type filter; a falsy value means
                "all types"

        Returns:
            List of slow operations (oldest to newest)

        Example:
            >>> slow_queries = buffer.get_slow_operations(1000, 'database')
        """
        # Snapshot under the lock, filter outside it to keep the hold short
        with self._lock:
            snapshot = list(self._buffer)

        return [
            m for m in snapshot
            if m.duration_ms >= threshold_ms
            and (not operation_type or m.operation_type == operation_type)
        ]

    def get_stats(self) -> Dict[str, Any]:
        """
        Get statistics about the buffer.

        Returns:
            Dict with keys:
                - total_count: number of metrics currently stored
                - max_size: configured buffer capacity
                - process_id: process ID captured at construction
                - sampling_rates: copy of the current per-type rates
                - by_type: count/avg/min/max duration for each of
                  database, http and render (zeros when a type is empty)

        Example:
            >>> stats = buffer.get_stats()
            >>> stats['total_count']
            1000
        """
        with self._lock:
            snapshot = list(self._buffer)
            # Hand out a copy so callers cannot alter the live configuration
            rates = dict(self._sampling_rates)

        # Calculate stats per operation type
        type_stats = {}
        for op_type in ("database", "http", "render"):
            durations = [
                m.duration_ms for m in snapshot if m.operation_type == op_type
            ]
            if durations:
                type_stats[op_type] = {
                    "count": len(durations),
                    "avg_duration_ms": sum(durations) / len(durations),
                    "min_duration_ms": min(durations),
                    "max_duration_ms": max(durations),
                }
            else:
                type_stats[op_type] = {
                    "count": 0,
                    "avg_duration_ms": 0.0,
                    "min_duration_ms": 0.0,
                    "max_duration_ms": 0.0,
                }

        return {
            "total_count": len(snapshot),
            "max_size": self.max_size,
            "process_id": self._process_id,
            "sampling_rates": rates,
            "by_type": type_stats,
        }

    def clear(self) -> None:
        """
        Clear all metrics from buffer.

        Example:
            >>> buffer.clear()
        """
        with self._lock:
            self._buffer.clear()

    def set_sampling_rate(
        self,
        operation_type: OperationType,
        rate: float
    ) -> None:
        """
        Update sampling rate for an operation type.

        Args:
            operation_type: Type to update
            rate: New sampling rate (0.0-1.0)

        Raises:
            ValueError: If rate is outside the range 0.0-1.0

        Example:
            >>> buffer.set_sampling_rate('database', 0.5)  # 50% sampling
        """
        if not 0.0 <= rate <= 1.0:
            raise ValueError("Sampling rate must be between 0.0 and 1.0")

        with self._lock:
            self._sampling_rates[operation_type] = rate
def get_buffer() -> MetricsBuffer:
    """
    Return the module-level metrics buffer, creating it on first use.

    The buffer is stored in a module global, so each process (for example
    each worker of a multi-process server) ends up with its own instance.
    On creation the size and sampling rate are read from the Flask app
    config keys METRICS_BUFFER_SIZE and METRICS_SAMPLING_RATE; when Flask
    cannot be imported or no application context is active, the defaults
    (1000 entries, 100% sampling) are used instead.

    Returns:
        MetricsBuffer instance for this process

    Example:
        >>> buffer = get_buffer()
        >>> buffer.record('database', 'query', 45.2)
    """
    global _metrics_buffer

    # Fast path: already created, no locking needed
    if _metrics_buffer is not None:
        return _metrics_buffer

    with _buffer_lock:
        # Re-check under the lock: another thread may have created the
        # buffer while this one was waiting.
        if _metrics_buffer is None:
            try:
                from flask import current_app

                capacity = current_app.config.get('METRICS_BUFFER_SIZE', 1000)
                rate = current_app.config.get('METRICS_SAMPLING_RATE', 1.0)
            except (ImportError, RuntimeError):
                # Flask not available or no app context: use defaults
                capacity = 1000
                rate = 1.0  # 100% sampling

            _metrics_buffer = MetricsBuffer(
                max_size=capacity,
                sampling_rates=rate
            )

    return _metrics_buffer
def record_metric(
    operation_type: OperationType,
    operation_name: str,
    duration_ms: float,
    metadata: Optional[Dict[str, Any]] = None,
    force: bool = False
) -> bool:
    """
    Record a metric in this process's shared buffer.

    Thin wrapper: obtains the buffer via get_buffer() and forwards every
    argument unchanged to its record() method.

    Args:
        operation_type: Type of operation (database/http/render)
        operation_name: Name/description of operation
        duration_ms: Duration in milliseconds
        metadata: Additional operation-specific data
        force: If True, bypass sampling (for slow query logging)

    Returns:
        True if metric was recorded, False if skipped due to sampling

    Example:
        >>> record_metric('database', 'SELECT notes', 45.2,
        ...               {'query': 'SELECT * FROM notes LIMIT 10'})
        True
    """
    return get_buffer().record(
        operation_type,
        operation_name,
        duration_ms,
        metadata,
        force,
    )
def get_metrics() -> List[Metric]:
    """
    Return every metric held in this process's shared buffer.

    Delegates to get_all() on the buffer returned by get_buffer().

    Returns:
        List of metrics (oldest to newest)

    Example:
        >>> metrics = get_metrics()
        >>> print(f"Collected {len(metrics)} metrics")
    """
    return get_buffer().get_all()
def get_metrics_stats() -> Dict[str, Any]:
    """
    Return summary statistics for this process's shared buffer.

    Delegates to get_stats() on the buffer returned by get_buffer().

    Returns:
        Dict with buffer statistics

    Example:
        >>> stats = get_metrics_stats()
        >>> print(f"Total metrics: {stats['total_count']}")
    """
    return get_buffer().get_stats()