StarPunk/docs/design/v1.1.2/feed-enhancements-spec.md
Phil Skentelbery b0230b1233 feat: Complete v1.1.2 Phase 1 - Metrics Instrumentation
2025-11-26 14:13:44 -07:00


Feed Enhancements Specification - v1.1.2

Overview

This specification defines the feed system enhancements for StarPunk v1.1.2, including content negotiation, caching, statistics tracking, and OPML export capabilities.

Requirements

Functional Requirements

  1. Content Negotiation

    • Parse HTTP Accept headers
    • Score format preferences
    • Select optimal format
    • Handle quality factors (q=)
  2. Feed Caching

    • LRU cache with TTL
    • Format-specific caching
    • Invalidation on changes
    • Memory-bounded storage
  3. Statistics Dashboard

    • Track feed requests
    • Monitor cache performance
    • Analyze client usage
    • Display trends
  4. OPML Export

    • Generate OPML 2.0
    • Include all feed formats
    • Add feed metadata
    • Validate output

Non-Functional Requirements

  1. Performance

    • Cache hit rate >80%
    • Negotiation <1ms
    • Dashboard load <100ms
    • OPML generation <10ms
  2. Scalability

    • Bounded memory usage
    • Efficient cache eviction
    • Statistical sampling
    • Async processing

Content Negotiation

Design

Content negotiation determines the best feed format based on the client's Accept header.

from typing import Any, Dict, List

class ContentNegotiator:
    """HTTP content negotiation for feed formats"""

    # MIME type mappings
    MIME_TYPES = {
        'rss': [
            'application/rss+xml',
            'application/xml',
            'text/xml',
            'application/x-rss+xml'
        ],
        'atom': [
            'application/atom+xml',
            'application/x-atom+xml'
        ],
        'json': [
            'application/json',
            'application/feed+json',
            'application/x-json-feed'
        ]
    }

    def negotiate(self, accept_header: str, available_formats: List[str] = None) -> str:
        """Negotiate best format from Accept header

        Args:
            accept_header: HTTP Accept header value
            available_formats: List of enabled formats (default: all)

        Returns:
            Selected format: 'rss', 'atom', or 'json'
        """
        if not available_formats:
            available_formats = ['rss', 'atom', 'json']

        # Parse Accept header
        accept_types = self._parse_accept_header(accept_header)

        # Score each format
        scores = {}
        for format_name in available_formats:
            scores[format_name] = self._score_format(format_name, accept_types)

        # Select highest scoring format
        if scores:
            best_format = max(scores, key=scores.get)
            if scores[best_format] > 0:
                return best_format

        # Default to RSS if no preference
        return 'rss' if 'rss' in available_formats else available_formats[0]

    def _parse_accept_header(self, accept_header: str) -> List[Dict[str, Any]]:
        """Parse Accept header into list of types with quality"""
        if not accept_header:
            return []

        types = []
        for part in accept_header.split(','):
            part = part.strip()
            if not part:
                continue

            # Split type and parameters
            parts = part.split(';')
            mime_type = parts[0].strip()

            # Parse quality factor
            quality = 1.0
            for param in parts[1:]:
                param = param.strip()
                if param.startswith('q='):
                    try:
                        quality = float(param[2:])
                    except ValueError:
                        quality = 1.0

            types.append({
                'type': mime_type,
                'quality': quality
            })

        # Sort by quality descending
        return sorted(types, key=lambda x: x['quality'], reverse=True)

    def _score_format(self, format_name: str, accept_types: List[Dict]) -> float:
        """Score a format against Accept types"""
        mime_types = self.MIME_TYPES.get(format_name, [])
        best_score = 0.0

        for accept in accept_types:
            accept_type = accept['type']
            quality = accept['quality']

            # Check for exact match
            if accept_type in mime_types:
                best_score = max(best_score, quality)

            # Check for wildcard matches
            elif accept_type == '*/*':
                best_score = max(best_score, quality * 0.1)

            elif accept_type == 'application/*':
                if any(m.startswith('application/') for m in mime_types):
                    best_score = max(best_score, quality * 0.5)

            elif accept_type == 'text/*':
                if any(m.startswith('text/') for m in mime_types):
                    best_score = max(best_score, quality * 0.5)

        return best_score

Accept Header Examples

| Accept Header | Selected Format | Reason |
|---|---|---|
| `application/atom+xml` | atom | Exact match |
| `application/json` | json | JSON match |
| `application/rss+xml, application/atom+xml;q=0.9` | rss | Higher quality |
| `text/html, application/*;q=0.9` | rss | Wildcard match, RSS default |
| `*/*` | rss | No preference, use default |
| (empty) | rss | No header, use default |
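
The rows above can be reproduced with a short standalone sketch. It is not the `ContentNegotiator` class itself: `parse_accept` and `pick_format` are hypothetical helper names, the MIME mapping is a shortened copy, and the weights (1.0 exact, 0.5 for `application/*` and `text/*`, 0.1 for `*/*`) are taken from `_score_format`.

```python
from typing import Dict, List, Tuple

# Shortened copy of ContentNegotiator.MIME_TYPES, kept small for the example.
MIME_TYPES: Dict[str, List[str]] = {
    "rss": ["application/rss+xml", "application/xml", "text/xml"],
    "atom": ["application/atom+xml"],
    "json": ["application/json", "application/feed+json"],
}


def parse_accept(header: str) -> List[Tuple[str, float]]:
    """Return (media_type, q) pairs; a missing or malformed q counts as 1.0."""
    pairs = []
    for part in header.split(","):
        fields = [f.strip() for f in part.split(";")]
        if not fields[0]:
            continue
        q = 1.0
        for param in fields[1:]:
            if param.startswith("q="):
                try:
                    q = float(param[2:])
                except ValueError:
                    q = 1.0
        pairs.append((fields[0], q))
    return pairs


def pick_format(header: str, default: str = "rss") -> str:
    """Score every format with the weights from _score_format and pick the best."""
    scores = {name: 0.0 for name in MIME_TYPES}
    for media_type, q in parse_accept(header):
        for name, mimes in MIME_TYPES.items():
            if media_type in mimes:
                weight = 1.0
            elif media_type == "application/*" and any(m.startswith("application/") for m in mimes):
                weight = 0.5
            elif media_type == "text/*" and any(m.startswith("text/") for m in mimes):
                weight = 0.5
            elif media_type == "*/*":
                weight = 0.1
            else:
                continue
            scores[name] = max(scores[name], q * weight)
    best = max(scores, key=scores.get)  # ties resolve to the first key, 'rss'
    return best if scores[best] > 0 else default


for header in ["application/atom+xml",
               "application/rss+xml, application/atom+xml;q=0.9",
               "text/html, application/*;q=0.9", "*/*", ""]:
    print(repr(header), "->", pick_format(header))
```

Note that the wildcard rows select RSS only because all three formats tie and RSS is listed first, so the order of `available_formats` acts as the tie-breaker.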

Feed Caching

Cache Design

from collections import OrderedDict
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional
import hashlib

@dataclass
class CacheEntry:
    """Single cache entry with metadata"""
    key: str
    content: str
    content_type: str
    created_at: datetime
    expires_at: datetime
    hit_count: int = 0
    size_bytes: int = 0

class FeedCache:
    """LRU cache with TTL for feed content"""

    def __init__(self, max_size: int = 100, default_ttl: int = 300):
        """Initialize cache

        Args:
            max_size: Maximum number of entries
            default_ttl: Default TTL in seconds
        """
        self.max_size = max_size
        self.default_ttl = default_ttl
        self.cache = OrderedDict()
        self.stats = {
            'hits': 0,
            'misses': 0,
            'evictions': 0,
            'invalidations': 0
        }

    def get(self, format: str, limit: int, checksum: str) -> Optional[CacheEntry]:
        """Get cached feed if available and not expired"""
        key = self._make_key(format, limit, checksum)

        if key not in self.cache:
            self.stats['misses'] += 1
            return None

        entry = self.cache[key]

        # Check expiration
        if datetime.now() > entry.expires_at:
            del self.cache[key]
            self.stats['misses'] += 1
            return None

        # Move to end (LRU)
        self.cache.move_to_end(key)

        # Update stats
        entry.hit_count += 1
        self.stats['hits'] += 1

        return entry

    def set(self, format: str, limit: int, checksum: str, content: str,
            content_type: str, ttl: Optional[int] = None):
        """Store feed in cache"""
        key = self._make_key(format, limit, checksum)
        if ttl is None:
            ttl = self.default_ttl
        now = datetime.now()

        # Create entry
        entry = CacheEntry(
            key=key,
            content=content,
            content_type=content_type,
            created_at=now,
            expires_at=now + timedelta(seconds=ttl),
            size_bytes=len(content.encode('utf-8'))
        )

        # Add to cache; move_to_end so that overwriting an existing key
        # also refreshes its LRU position
        self.cache[key] = entry
        self.cache.move_to_end(key)

        # Enforce size limit
        while len(self.cache) > self.max_size:
            # Remove oldest (first) item
            evicted_key = next(iter(self.cache))
            del self.cache[evicted_key]
            self.stats['evictions'] += 1

    def invalidate(self, pattern: Optional[str] = None):
        """Invalidate cache entries matching pattern"""
        if pattern is None:
            # Clear all
            count = len(self.cache)
            self.cache.clear()
            self.stats['invalidations'] += count
        else:
            # Clear matching keys
            keys_to_remove = [
                key for key in self.cache
                if pattern in key
            ]
            for key in keys_to_remove:
                del self.cache[key]
                self.stats['invalidations'] += 1

    def _make_key(self, format: str, limit: int, checksum: str) -> str:
        """Generate cache key"""
        return f"feed:{format}:{limit}:{checksum}"

    def get_stats(self) -> Dict[str, Any]:
        """Get cache statistics"""
        total_requests = self.stats['hits'] + self.stats['misses']
        hit_rate = (self.stats['hits'] / total_requests * 100) if total_requests > 0 else 0

        # Calculate memory usage
        total_bytes = sum(entry.size_bytes for entry in self.cache.values())

        return {
            'entries': len(self.cache),
            'max_entries': self.max_size,
            'memory_mb': total_bytes / (1024 * 1024),
            'hit_rate': hit_rate,
            'hits': self.stats['hits'],
            'misses': self.stats['misses'],
            'evictions': self.stats['evictions'],
            'invalidations': self.stats['invalidations']
        }

class ContentChecksum:
    """Generate checksums for cache invalidation"""

    @staticmethod
    def calculate(notes: List[Note], config: Dict) -> str:
        """Calculate checksum based on content state"""
        # Use latest note timestamp and count
        if notes:
            latest_timestamp = max(n.updated_at or n.created_at for n in notes)
            checksum_data = f"{latest_timestamp.isoformat()}:{len(notes)}"
        else:
            checksum_data = "empty:0"

        # Include configuration that affects output
        config_data = f"{config.get('site_name')}:{config.get('site_url')}"

        # Generate hash
        combined = f"{checksum_data}:{config_data}"
        return hashlib.md5(combined.encode()).hexdigest()[:8]
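
`FeedCache` bounds the number of entries, while the requirements also call for memory-bounded storage and the configuration defines `STARPUNK_FEED_CACHE_MEMORY_LIMIT`. One possible way to honor a byte budget is sketched below. `ByteBoundedLRU` and `max_bytes` are hypothetical names for illustration and are not part of the specified class.

```python
from collections import OrderedDict
from typing import Optional


class ByteBoundedLRU:
    """LRU mapping that evicts least recently used items once a byte budget is exceeded."""

    def __init__(self, max_bytes: int):
        self.max_bytes = max_bytes
        self.total_bytes = 0
        self.evictions = 0
        self._items: "OrderedDict[str, str]" = OrderedDict()

    @staticmethod
    def _size(content: str) -> int:
        # Same measure as CacheEntry.size_bytes: UTF-8 encoded length
        return len(content.encode("utf-8"))

    def get(self, key: str) -> Optional[str]:
        if key not in self._items:
            return None
        self._items.move_to_end(key)  # mark as most recently used
        return self._items[key]

    def set(self, key: str, content: str) -> None:
        if key in self._items:
            self.total_bytes -= self._size(self._items.pop(key))
        self._items[key] = content
        self.total_bytes += self._size(content)
        # Evict from the least recently used end until the budget is met,
        # but always keep the entry that was just stored.
        while self.total_bytes > self.max_bytes and len(self._items) > 1:
            _, evicted = self._items.popitem(last=False)
            self.total_bytes -= self._size(evicted)
            self.evictions += 1


cache = ByteBoundedLRU(max_bytes=10)
cache.set("a", "12345")
cache.set("b", "12345")
cache.get("a")           # 'a' becomes most recently used
cache.set("c", "12345")  # budget exceeded, so 'b' is evicted
print(cache.get("b"), cache.total_bytes, cache.evictions)
```

The same loop could sit next to the entry-count check in `FeedCache.set`, with the byte budget derived from the configured limit in MB.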

Cache Integration

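# In feed route handler
from flask import Response, abort, request

# Assumption: negotiated requests arrive on a bare /feed URL; with only
# '/feed.<format>' registered, a request without an extension never matches.
@app.route('/feed', defaults={'format': None})
@app.route('/feed.<format>')
def serve_feed(format):
    """Serve feed in requested format"""
    # Content negotiation if format not specified
    if format is None:
        negotiator = ContentNegotiator()
        format = negotiator.negotiate(request.headers.get('Accept', ''))
    elif format not in ('rss', 'atom', 'json'):
        # Reject unknown extensions before touching the cache
        abort(404)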

    # Get notes and calculate checksum
    notes = get_published_notes()
    checksum = ContentChecksum.calculate(notes, app.config)

    # Check cache
    cached = feed_cache.get(format, limit=50, checksum=checksum)
    if cached:
        return Response(
            cached.content,
            mimetype=cached.content_type,
            headers={'X-Cache': 'HIT'}
        )

    # Generate feed
    if format == 'rss':
        content = rss_generator.generate(notes)
        content_type = 'application/rss+xml'
    elif format == 'atom':
        content = atom_generator.generate(notes)
        content_type = 'application/atom+xml'
    elif format == 'json':
        content = json_generator.generate(notes)
        content_type = 'application/feed+json'
    else:
        abort(404)

    # Cache the result
    feed_cache.set(format, 50, checksum, content, content_type)

    return Response(
        content,
        mimetype=content_type,
        headers={'X-Cache': 'MISS'}
    )
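
Because the checksum is part of the cache key, a content change makes the old entry unreachable without an explicit invalidation call. The sketch below illustrates that effect in isolation. The `Note` dataclass is a hypothetical stand-in for the StarPunk note model, and `checksum` and `cache_key` mirror `ContentChecksum.calculate` and `FeedCache._make_key`.

```python
import hashlib
from dataclasses import dataclass
from datetime import datetime
from typing import List, Optional


@dataclass
class Note:
    """Hypothetical stand-in for the StarPunk note model."""
    created_at: datetime
    updated_at: Optional[datetime] = None


def checksum(notes: List[Note], site_name: str, site_url: str) -> str:
    """Same recipe as ContentChecksum.calculate: latest timestamp, count, site config."""
    if notes:
        latest = max(n.updated_at or n.created_at for n in notes)
        state = f"{latest.isoformat()}:{len(notes)}"
    else:
        state = "empty:0"
    combined = f"{state}:{site_name}:{site_url}"
    return hashlib.md5(combined.encode()).hexdigest()[:8]


def cache_key(fmt: str, limit: int, digest: str) -> str:
    return f"feed:{fmt}:{limit}:{digest}"


notes = [Note(created_at=datetime(2025, 11, 1, 12, 0))]
before = cache_key("rss", 50, checksum(notes, "StarPunk", "https://example.com"))

# Editing the note bumps updated_at, which changes the checksum and the key.
notes[0].updated_at = datetime(2025, 11, 2, 9, 30)
after = cache_key("rss", 50, checksum(notes, "StarPunk", "https://example.com"))

print(before)
print(after)
```

Stale entries are not removed by this mechanism; they stay in memory until TTL expiry or LRU eviction, so calling `invalidate()` on writes remains useful for reclaiming space early.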

Statistics Dashboard

Dashboard Design

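from collections import defaultdict, deque
from datetime import datetime
from typing import Any, Dict, Optional

class SyndicationStats: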
    """Collect and analyze syndication statistics"""

    def __init__(self):
        self.requests = defaultdict(int)  # By format
        self.user_agents = defaultdict(int)
        self.generation_times = defaultdict(list)
        self.errors = deque(maxlen=100)

    def record_request(self, format: str, user_agent: str, cached: bool,
                      generation_time: Optional[float] = None):
        """Record feed request"""
        self.requests[format] += 1
        self.user_agents[self._normalize_user_agent(user_agent)] += 1

        if generation_time is not None:
            self.generation_times[format].append(generation_time)
            # Keep only last 1000 times
            if len(self.generation_times[format]) > 1000:
                self.generation_times[format] = self.generation_times[format][-1000:]

    def record_error(self, format: str, error: str):
        """Record feed generation error"""
        self.errors.append({
            'timestamp': datetime.now(),
            'format': format,
            'error': error
        })

    def get_summary(self) -> Dict[str, Any]:
        """Get statistics summary"""
        total_requests = sum(self.requests.values())

        # Calculate format distribution
        format_distribution = {
            format: (count / total_requests * 100) if total_requests > 0 else 0
            for format, count in self.requests.items()
        }

        # Top user agents
        top_agents = sorted(
            self.user_agents.items(),
            key=lambda x: x[1],
            reverse=True
        )[:10]

        # Generation time stats
        time_stats = {}
        for format, times in self.generation_times.items():
            if times:
                sorted_times = sorted(times)
                time_stats[format] = {
                    'avg': sum(times) / len(times),
                    'p50': sorted_times[len(times) // 2],
                    'p95': sorted_times[int(len(times) * 0.95)],
                    'p99': sorted_times[int(len(times) * 0.99)]
                }

        return {
            'total_requests': total_requests,
            'format_distribution': format_distribution,
            'top_user_agents': top_agents,
            'generation_times': time_stats,
            'recent_errors': list(self.errors)
        }

    def _normalize_user_agent(self, user_agent: str) -> str:
        """Normalize user agent for grouping"""
        if not user_agent:
            return 'Unknown'

        # Common patterns
        patterns = [
            (r'Feedly', 'Feedly'),
            (r'Inoreader', 'Inoreader'),
            (r'NewsBlur', 'NewsBlur'),
            (r'Tiny Tiny RSS', 'Tiny Tiny RSS'),
            (r'FreshRSS', 'FreshRSS'),
            (r'NetNewsWire', 'NetNewsWire'),
            (r'Feedbin', 'Feedbin'),
            (r'bot|Bot|crawler|Crawler', 'Bot/Crawler'),
            (r'Mozilla.*Firefox', 'Firefox'),
            (r'Mozilla.*Chrome', 'Chrome'),
            (r'Mozilla.*Safari', 'Safari')
        ]

        import re
        for pattern, name in patterns:
            if re.search(pattern, user_agent):
                return name

        return 'Other'
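
The percentile arithmetic in `get_summary` picks elements by index from the sorted samples. A standalone sketch of the same calculation, with `percentile_summary` as a hypothetical helper name, shows what those indices select for a known data set:

```python
from typing import Dict, List


def percentile_summary(times: List[float]) -> Dict[str, float]:
    """Index-based percentiles, matching the arithmetic in get_summary()."""
    if not times:
        return {}
    ordered = sorted(times)
    n = len(ordered)
    return {
        "avg": sum(ordered) / n,
        "p50": ordered[n // 2],
        "p95": ordered[int(n * 0.95)],
        "p99": ordered[int(n * 0.99)],
    }


# 100 samples of 1..100, written as whole milliseconds to keep the numbers readable
samples = [float(ms) for ms in range(1, 101)]
summary = percentile_summary(samples)
print(summary)
```

With 100 samples the indices are 50, 95 and 99, so the reported values are 51, 96 and 100. For very small sample counts all three percentiles collapse toward the maximum, which is worth keeping in mind when reading the dashboard shortly after a restart.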

Dashboard Template

<!-- templates/admin/syndication.html -->
{% extends "admin/base.html" %}

{% block title %}Syndication Dashboard{% endblock %}

{% block content %}
<div class="syndication-dashboard">
    <h2>Syndication Statistics</h2>

    <!-- Overview Cards -->
    <div class="stats-grid">
        <div class="stat-card">
            <h3>Total Requests</h3>
            <p class="stat-value">{{ stats.total_requests }}</p>
        </div>
        <div class="stat-card">
            <h3>Cache Hit Rate</h3>
            <p class="stat-value">{{ cache_stats.hit_rate|round(1) }}%</p>
        </div>
        <div class="stat-card">
            <h3>Active Formats</h3>
            <p class="stat-value">{{ stats.format_distribution|length }}</p>
        </div>
        <div class="stat-card">
            <h3>Cache Memory</h3>
            <p class="stat-value">{{ cache_stats.memory_mb|round(2) }}MB</p>
        </div>
    </div>

    <!-- Format Distribution -->
    <div class="chart-container">
        <h3>Format Distribution</h3>
        <canvas id="format-chart"></canvas>
    </div>

    <!-- Top User Agents -->
    <div class="table-container">
        <h3>Top Feed Readers</h3>
        <table>
            <thead>
                <tr>
                    <th>Reader</th>
                    <th>Requests</th>
                    <th>Percentage</th>
                </tr>
            </thead>
            <tbody>
                {% for agent, count in stats.top_user_agents %}
                <tr>
                    <td>{{ agent }}</td>
                    <td>{{ count }}</td>
                    <td>{{ (count / stats.total_requests * 100)|round(1) }}%</td>
                </tr>
                {% endfor %}
            </tbody>
        </table>
    </div>

    <!-- Generation Performance -->
    <div class="table-container">
        <h3>Generation Performance</h3>
        <table>
            <thead>
                <tr>
                    <th>Format</th>
                    <th>Avg (ms)</th>
                    <th>P50 (ms)</th>
                    <th>P95 (ms)</th>
                    <th>P99 (ms)</th>
                </tr>
            </thead>
            <tbody>
                {% for format, times in stats.generation_times.items() %}
                <tr>
                    <td>{{ format|upper }}</td>
                    <td>{{ (times.avg * 1000)|round(1) }}</td>
                    <td>{{ (times.p50 * 1000)|round(1) }}</td>
                    <td>{{ (times.p95 * 1000)|round(1) }}</td>
                    <td>{{ (times.p99 * 1000)|round(1) }}</td>
                </tr>
                {% endfor %}
            </tbody>
        </table>
    </div>

    <!-- Recent Errors -->
    {% if stats.recent_errors %}
    <div class="error-log">
        <h3>Recent Errors</h3>
        <ul>
            {% for error in stats.recent_errors[-10:] %}
            <li>
                <span class="timestamp">{{ error.timestamp|timeago }}</span>
                <span class="format">{{ error.format }}</span>
                <span class="error">{{ error.error }}</span>
            </li>
            {% endfor %}
        </ul>
    </div>
    {% endif %}

    <!-- Feed URLs -->
    <div class="feed-urls">
        <h3>Available Feeds</h3>
        <ul>
            <li>RSS: <code>{{ url_for('serve_feed', format='rss', _external=True) }}</code></li>
            <li>ATOM: <code>{{ url_for('serve_feed', format='atom', _external=True) }}</code></li>
            <li>JSON: <code>{{ url_for('serve_feed', format='json', _external=True) }}</code></li>
            <li>OPML: <code>{{ url_for('export_opml', _external=True) }}</code></li>
        </ul>
    </div>
</div>

<script>
// Format distribution pie chart (assumes Chart.js is already loaded by admin/base.html)
const ctx = document.getElementById('format-chart').getContext('2d');
new Chart(ctx, {
    type: 'pie',
    data: {
        labels: {{ stats.format_distribution.keys()|list|tojson }},
        datasets: [{
            data: {{ stats.format_distribution.values()|list|tojson }},
            backgroundColor: ['#FF6384', '#36A2EB', '#FFCE56']
        }]
    }
});
</script>
{% endblock %}
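
The template applies a `timeago` filter to error timestamps. Jinja does not ship a filter with that name, so the application has to register one. A minimal sketch follows, assuming naive local datetimes as produced by `record_error`; the function body and the wording of the output are illustrative choices, not part of the specification.

```python
from datetime import datetime, timedelta
from typing import Optional


def timeago(value: datetime, now: Optional[datetime] = None) -> str:
    """Render a naive datetime as a coarse relative time such as '5 minutes ago'."""
    now = now or datetime.now()
    seconds = int((now - value).total_seconds())
    if seconds < 60:
        return "just now"
    for unit, length in (("day", 86400), ("hour", 3600), ("minute", 60)):
        count = seconds // length
        if count >= 1:
            suffix = "" if count == 1 else "s"
            return f"{count} {unit}{suffix} ago"
    return "just now"


# Registration with Flask would look like this (assumes an existing `app` object):
# app.add_template_filter(timeago, "timeago")

reference = datetime(2025, 11, 26, 12, 0, 0)
print(timeago(reference - timedelta(minutes=5), now=reference))
print(timeago(reference - timedelta(hours=3), now=reference))
```

The optional `now` argument exists only to make the function deterministic in tests.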

OPML Export

OPML Generator

from datetime import datetime, timezone
from typing import List
from xml.etree.ElementTree import Element, SubElement, tostring
from xml.dom import minidom

class OPMLGenerator:
    """Generate OPML 2.0 feed list"""

    def __init__(self, site_url: str, site_name: str, owner_name: str = None,
                 owner_email: str = None):
        self.site_url = site_url.rstrip('/')
        self.site_name = site_name
        self.owner_name = owner_name
        self.owner_email = owner_email

    def generate(self, include_formats: List[str] = None) -> str:
        """Generate OPML document

        Args:
            include_formats: List of formats to include (default: all enabled)

        Returns:
            OPML 2.0 XML string
        """
        if not include_formats:
            include_formats = ['rss', 'atom', 'json']

        # Create root element
        opml = Element('opml', version='2.0')

        # Add head
        head = SubElement(opml, 'head')
        SubElement(head, 'title').text = f"{self.site_name} Feeds"
        SubElement(head, 'dateCreated').text = datetime.now(timezone.utc).strftime(
            '%a, %d %b %Y %H:%M:%S %z'
        )
        SubElement(head, 'dateModified').text = datetime.now(timezone.utc).strftime(
            '%a, %d %b %Y %H:%M:%S %z'
        )

        if self.owner_name:
            SubElement(head, 'ownerName').text = self.owner_name
        if self.owner_email:
            SubElement(head, 'ownerEmail').text = self.owner_email

        # Add body with outlines
        body = SubElement(opml, 'body')

        # Add feed outlines
        if 'rss' in include_formats:
            SubElement(body, 'outline',
                      type='rss',
                      text=f"{self.site_name} - RSS Feed",
                      title=f"{self.site_name} - RSS Feed",
                      xmlUrl=f"{self.site_url}/feed.xml",
                      htmlUrl=self.site_url)

        if 'atom' in include_formats:
            SubElement(body, 'outline',
                      type='atom',
                      text=f"{self.site_name} - ATOM Feed",
                      title=f"{self.site_name} - ATOM Feed",
                      xmlUrl=f"{self.site_url}/feed.atom",
                      htmlUrl=self.site_url)

        if 'json' in include_formats:
            SubElement(body, 'outline',
                      type='json',
                      text=f"{self.site_name} - JSON Feed",
                      title=f"{self.site_name} - JSON Feed",
                      xmlUrl=f"{self.site_url}/feed.json",
                      htmlUrl=self.site_url)

        # Convert to pretty XML
        rough_string = tostring(opml, encoding='unicode')
        reparsed = minidom.parseString(rough_string)
        return reparsed.toprettyxml(indent='  ', encoding='UTF-8').decode('utf-8')

OPML Example Output

<?xml version="1.0" encoding="UTF-8"?>
<opml version="2.0">
  <head>
    <title>StarPunk Notes Feeds</title>
    <dateCreated>Mon, 25 Nov 2024 12:00:00 +0000</dateCreated>
    <dateModified>Mon, 25 Nov 2024 12:00:00 +0000</dateModified>
    <ownerName>John Doe</ownerName>
    <ownerEmail>john@example.com</ownerEmail>
  </head>
  <body>
    <outline type="rss"
             text="StarPunk Notes - RSS Feed"
             title="StarPunk Notes - RSS Feed"
             xmlUrl="https://example.com/feed.xml"
             htmlUrl="https://example.com"/>
    <outline type="atom"
             text="StarPunk Notes - ATOM Feed"
             title="StarPunk Notes - ATOM Feed"
             xmlUrl="https://example.com/feed.atom"
             htmlUrl="https://example.com"/>
    <outline type="json"
             text="StarPunk Notes - JSON Feed"
             title="StarPunk Notes - JSON Feed"
             xmlUrl="https://example.com/feed.json"
             htmlUrl="https://example.com"/>
  </body>
</opml>
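
The "Validate output" requirement could be covered by a small structural check on the generated document. The sketch below is one possible shape for it. `validate_opml` is a hypothetical helper, and the checks are a hand-picked subset (root element, version, title, and `text` plus `xmlUrl` on each outline) rather than a complete OPML 2.0 validation.

```python
import xml.etree.ElementTree as ET
from typing import List


def validate_opml(xml_text: str) -> List[str]:
    """Return a list of problems found; an empty list means the checks passed."""
    try:
        root = ET.fromstring(xml_text)
    except ET.ParseError as exc:
        return [f"not well-formed XML: {exc}"]

    problems = []
    if root.tag != "opml":
        problems.append("root element is not <opml>")
    if root.get("version") != "2.0":
        problems.append("version attribute is not '2.0'")
    if root.find("head/title") is None:
        problems.append("missing <head><title>")

    outlines = root.findall("body/outline")
    if not outlines:
        problems.append("<body> contains no <outline> elements")
    for index, outline in enumerate(outlines):
        for attr in ("text", "xmlUrl"):
            if not outline.get(attr):
                problems.append(f"outline {index} is missing '{attr}'")
    return problems


sample = """<opml version="2.0">
  <head><title>StarPunk Notes Feeds</title></head>
  <body>
    <outline type="rss" text="StarPunk Notes - RSS Feed"
             xmlUrl="https://example.com/feed.xml" htmlUrl="https://example.com"/>
  </body>
</opml>"""

print(validate_opml(sample))
print(validate_opml("<opml version='2.0'><head/><body/></opml>"))
```

Returning a list of messages instead of raising keeps the check usable both in tests and in an admin view that reports every problem at once.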

Testing Strategy

Content Negotiation Tests

def test_content_negotiation():
    """Test Accept header parsing and format selection"""
    negotiator = ContentNegotiator()

    # Test exact matches
    assert negotiator.negotiate('application/atom+xml') == 'atom'
    assert negotiator.negotiate('application/feed+json') == 'json'
    assert negotiator.negotiate('application/rss+xml') == 'rss'

    # Test quality factors
    assert negotiator.negotiate('application/atom+xml;q=0.8, application/rss+xml') == 'rss'

    # Test wildcards
    assert negotiator.negotiate('*/*') == 'rss'  # All formats tie at 0.1; first listed wins
    assert negotiator.negotiate('application/*') == 'rss'  # All formats tie at 0.5; first listed wins

    # Test no preference
    assert negotiator.negotiate('') == 'rss'
    assert negotiator.negotiate('text/html') == 'rss'

Cache Tests

import time

def test_feed_cache():
    """Test LRU cache with TTL"""
    cache = FeedCache(max_size=3, default_ttl=1)

    # Test set and get
    cache.set('rss', 50, 'abc123', '<rss>content</rss>', 'application/rss+xml')
    entry = cache.get('rss', 50, 'abc123')
    assert entry is not None
    assert entry.content == '<rss>content</rss>'

    # Test expiration
    time.sleep(1.1)
    entry = cache.get('rss', 50, 'abc123')
    assert entry is None

    # Test LRU eviction
    cache.set('rss', 50, 'aaa', 'content1', 'application/rss+xml')
    cache.set('atom', 50, 'bbb', 'content2', 'application/atom+xml')
    cache.set('json', 50, 'ccc', 'content3', 'application/feed+json')
    cache.set('rss', 100, 'ddd', 'content4', 'application/rss+xml')  # Evicts oldest

    assert cache.get('rss', 50, 'aaa') is None  # Evicted
    assert cache.get('atom', 50, 'bbb') is not None  # Still present
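
The expiration check above relies on `time.sleep(1.1)`, which slows the suite and can be flaky under load. One alternative is to let the cache take its clock as a parameter so that tests can advance time manually. The sketch below demonstrates the idea on a deliberately tiny store. `TTLStore` and `FakeClock` are hypothetical names, and `FeedCache` as specified does not accept a clock argument.

```python
import time
from typing import Callable, Dict, Optional, Tuple


class TTLStore:
    """Tiny TTL store whose notion of 'now' is supplied by the caller."""

    def __init__(self, ttl: float, clock: Callable[[], float] = time.monotonic):
        self.ttl = ttl
        self.clock = clock
        self._data: Dict[str, Tuple[float, str]] = {}

    def set(self, key: str, value: str) -> None:
        self._data[key] = (self.clock() + self.ttl, value)

    def get(self, key: str) -> Optional[str]:
        item = self._data.get(key)
        if item is None:
            return None
        expires_at, value = item
        if self.clock() > expires_at:
            del self._data[key]  # expired: drop and report a miss
            return None
        return value


class FakeClock:
    """Manually advanced clock for tests."""

    def __init__(self) -> None:
        self.now = 0.0

    def __call__(self) -> float:
        return self.now


def test_expiry_without_sleep() -> None:
    clock = FakeClock()
    store = TTLStore(ttl=1.0, clock=clock)
    store.set("rss", "<rss/>")
    assert store.get("rss") == "<rss/>"
    clock.now += 1.1  # advance time instead of sleeping
    assert store.get("rss") is None


test_expiry_without_sleep()
```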

Statistics Tests

def test_syndication_stats():
    """Test statistics collection"""
    stats = SyndicationStats()

    # Record requests
    stats.record_request('rss', 'Feedly/1.0', cached=False, generation_time=0.05)
    stats.record_request('atom', 'Inoreader/1.0', cached=True)
    stats.record_request('json', 'NetNewsWire/6.0', cached=False, generation_time=0.03)

    summary = stats.get_summary()
    assert summary['total_requests'] == 3
    assert 'rss' in summary['format_distribution']
    assert len(summary['top_user_agents']) > 0

OPML Tests

def test_opml_generation():
    """Test OPML export"""
    generator = OPMLGenerator(
        site_url='https://example.com',
        site_name='Test Site',
        owner_name='John Doe'
    )

    opml = generator.generate(['rss', 'atom', 'json'])

    # Parse and validate
    import xml.etree.ElementTree as ET
    root = ET.fromstring(opml)

    assert root.tag == 'opml'
    assert root.get('version') == '2.0'

    # Check outlines
    outlines = root.findall('.//outline')
    assert len(outlines) == 3
    assert outlines[0].get('type') == 'rss'
    assert outlines[1].get('type') == 'atom'
    assert outlines[2].get('type') == 'json'

Performance Benchmarks

Negotiation Performance

import time

def benchmark_content_negotiation():
    """Benchmark negotiation speed"""
    negotiator = ContentNegotiator()
    complex_header = 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8'

    start = time.perf_counter()
    for _ in range(10000):
        negotiator.negotiate(complex_header)
    duration = time.perf_counter() - start

    per_call = (duration / 10000) * 1000  # Convert to ms
    assert per_call < 1.0  # Less than 1ms per negotiation

Configuration

# Content negotiation
STARPUNK_FEED_NEGOTIATION_ENABLED=true
STARPUNK_FEED_DEFAULT_FORMAT=rss

# Cache settings
STARPUNK_FEED_CACHE_ENABLED=true
STARPUNK_FEED_CACHE_SIZE=100
STARPUNK_FEED_CACHE_TTL=300
STARPUNK_FEED_CACHE_MEMORY_LIMIT=10  # MB

# Statistics
STARPUNK_FEED_STATS_ENABLED=true
STARPUNK_FEED_STATS_RETENTION=7  # days

# OPML
STARPUNK_FEED_OPML_ENABLED=true
STARPUNK_FEED_OPML_OWNER_NAME=
STARPUNK_FEED_OPML_OWNER_EMAIL=
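
These variables arrive as strings, so the application needs to convert them to booleans and integers before use. A minimal sketch of such a loader is shown below. `FeedSettings`, `load_feed_settings` and the set of accepted truthy spellings are assumptions for illustration, and only a subset of the variables is covered.

```python
from dataclasses import dataclass
from typing import Mapping


def _as_bool(value: str) -> bool:
    # Accepted truthy spellings are an assumption, not defined by the spec
    return value.strip().lower() in ("1", "true", "yes", "on")


@dataclass(frozen=True)
class FeedSettings:
    negotiation_enabled: bool
    default_format: str
    cache_enabled: bool
    cache_size: int
    cache_ttl: int
    cache_memory_limit_mb: int


def load_feed_settings(env: Mapping[str, str]) -> FeedSettings:
    """Read STARPUNK_FEED_* variables, falling back to the values listed above."""
    def get(name: str, default: str) -> str:
        return env.get(f"STARPUNK_FEED_{name}", default)

    return FeedSettings(
        negotiation_enabled=_as_bool(get("NEGOTIATION_ENABLED", "true")),
        default_format=get("DEFAULT_FORMAT", "rss"),
        cache_enabled=_as_bool(get("CACHE_ENABLED", "true")),
        cache_size=int(get("CACHE_SIZE", "100")),
        cache_ttl=int(get("CACHE_TTL", "300")),
        cache_memory_limit_mb=int(get("CACHE_MEMORY_LIMIT", "10")),
    )


settings = load_feed_settings({
    "STARPUNK_FEED_CACHE_TTL": "60",
    "STARPUNK_FEED_CACHE_ENABLED": "false",
})
print(settings)
```

In the application the mapping passed in would typically be `os.environ`; taking it as a parameter keeps the loader easy to test.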

Security Considerations

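  1. Cache Poisoning: Validate all cached content; build cache keys only from server-side values (format, limit, checksum)
  2. Header Injection: Sanitize Accept headers; treat them as untrusted input and never echo them into responses
  3. Memory Exhaustion: Limit cache size
  4. Statistics Privacy: Don't log sensitive data; store only normalized user agent names
  5. OPML Injection: Escape all XML content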
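
For the OPML injection item, building the document with `xml.etree.ElementTree` (as `OPMLGenerator` does) already escapes attribute values on serialization. The short check below illustrates this with a deliberately hostile site name; the value itself is made up for the example.

```python
import xml.etree.ElementTree as ET

hostile_name = 'Notes & "Quotes" <script>'

outline = ET.Element(
    "outline",
    type="rss",
    text=f"{hostile_name} - RSS Feed",
    xmlUrl="https://example.com/feed.xml",
)
xml_text = ET.tostring(outline, encoding="unicode")
print(xml_text)

# Round trip: parsing the output recovers the original string unchanged
parsed = ET.fromstring(xml_text)
print(parsed.get("text") == f"{hostile_name} - RSS Feed")
```

The risk returns if OPML is ever assembled with string formatting instead of an XML library, so the escaping guarantee depends on keeping the generator on this code path.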

Acceptance Criteria

  1. Content negotiation working correctly
  2. Cache hit rate >80% achieved
  3. Statistics dashboard functional
  4. OPML export valid
  5. Memory usage bounded
  6. Performance targets met
  7. All formats properly cached
  8. Invalidation working
  9. User agent detection accurate
  10. Security review passed