# Feed Enhancements Specification - v1.1.2

## Overview

This specification defines the feed system enhancements for StarPunk v1.1.2, including content negotiation, caching, statistics tracking, and OPML export capabilities.

## Requirements

### Functional Requirements

1. **Content Negotiation**
   - Parse HTTP Accept headers
   - Score format preferences
   - Select optimal format
   - Handle quality factors (q=)

2. **Feed Caching**
   - LRU cache with TTL
   - Format-specific caching
   - Invalidation on changes
   - Memory-bounded storage

3. **Statistics Dashboard**
   - Track feed requests
   - Monitor cache performance
   - Analyze client usage
   - Display trends

4. **OPML Export**
   - Generate OPML 2.0
   - Include all feed formats
   - Add feed metadata
   - Validate output

### Non-Functional Requirements

1. **Performance**
   - Cache hit rate >80%
   - Negotiation <1ms
   - Dashboard load <100ms
   - OPML generation <10ms

2. **Scalability**
   - Bounded memory usage
   - Efficient cache eviction
   - Statistical sampling
   - Async processing

## Content Negotiation

### Design

Content negotiation determines the best feed format based on the client's Accept header.
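For instance, `Accept: application/rss+xml, application/atom+xml;q=0.9` ranks RSS (implicit q=1.0) above Atom (q=0.9). A standalone sketch of just the quality-factor parsing step, independent of the full `ContentNegotiator` below (the helper name `parse_accept` is illustrative, not part of the implementation):

```python
from typing import List, Tuple


def parse_accept(header: str) -> List[Tuple[str, float]]:
    """Split an Accept header into (mime_type, quality) pairs, best first."""
    entries = []
    for part in header.split(','):
        part = part.strip()
        if not part:
            continue
        pieces = part.split(';')
        mime_type = pieces[0].strip()
        quality = 1.0  # q defaults to 1.0 when absent
        for param in pieces[1:]:
            param = param.strip()
            if param.startswith('q='):
                try:
                    quality = float(param[2:])
                except ValueError:
                    quality = 1.0
        entries.append((mime_type, quality))
    # Highest quality wins; ties keep header order (sort is stable)
    return sorted(entries, key=lambda e: e[1], reverse=True)


print(parse_accept('application/rss+xml, application/atom+xml;q=0.9'))
# [('application/rss+xml', 1.0), ('application/atom+xml', 0.9)]
```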
```python
from typing import Any, Dict, List


class ContentNegotiator:
    """HTTP content negotiation for feed formats"""

    # MIME type mappings
    MIME_TYPES = {
        'rss': [
            'application/rss+xml',
            'application/xml',
            'text/xml',
            'application/x-rss+xml'
        ],
        'atom': [
            'application/atom+xml',
            'application/x-atom+xml'
        ],
        'json': [
            'application/json',
            'application/feed+json',
            'application/x-json-feed'
        ]
    }

    def negotiate(self, accept_header: str,
                  available_formats: List[str] = None) -> str:
        """Negotiate best format from Accept header

        Args:
            accept_header: HTTP Accept header value
            available_formats: List of enabled formats (default: all)

        Returns:
            Selected format: 'rss', 'atom', or 'json'
        """
        if not available_formats:
            available_formats = ['rss', 'atom', 'json']

        # Parse Accept header
        accept_types = self._parse_accept_header(accept_header)

        # Score each format
        scores = {}
        for format_name in available_formats:
            scores[format_name] = self._score_format(format_name, accept_types)

        # Select highest scoring format
        if scores:
            best_format = max(scores, key=scores.get)
            if scores[best_format] > 0:
                return best_format

        # Default to RSS if no preference
        return 'rss' if 'rss' in available_formats else available_formats[0]

    def _parse_accept_header(self, accept_header: str) -> List[Dict[str, Any]]:
        """Parse Accept header into list of types with quality"""
        if not accept_header:
            return []

        types = []
        for part in accept_header.split(','):
            part = part.strip()
            if not part:
                continue

            # Split type and parameters
            parts = part.split(';')
            mime_type = parts[0].strip()

            # Parse quality factor
            quality = 1.0
            for param in parts[1:]:
                param = param.strip()
                if param.startswith('q='):
                    try:
                        quality = float(param[2:])
                    except ValueError:
                        quality = 1.0

            types.append({
                'type': mime_type,
                'quality': quality
            })

        # Sort by quality descending
        return sorted(types, key=lambda x: x['quality'], reverse=True)

    def _score_format(self, format_name: str, accept_types: List[Dict]) -> float:
        """Score a format against Accept types"""
        mime_types = self.MIME_TYPES.get(format_name, [])
        best_score = 0.0

        for accept in accept_types:
            accept_type = accept['type']
            quality = accept['quality']

            # Check for exact match
            if accept_type in mime_types:
                best_score = max(best_score, quality)

            # Check for wildcard matches
            elif accept_type == '*/*':
                best_score = max(best_score, quality * 0.1)
            elif accept_type == 'application/*':
                if any(m.startswith('application/') for m in mime_types):
                    best_score = max(best_score, quality * 0.5)
            elif accept_type == 'text/*':
                if any(m.startswith('text/') for m in mime_types):
                    best_score = max(best_score, quality * 0.5)

        return best_score
```

### Accept Header Examples

| Accept Header | Selected Format | Reason |
|--------------|-----------------|--------|
| `application/atom+xml` | atom | Exact match |
| `application/json` | json | JSON match |
| `application/rss+xml, application/atom+xml;q=0.9` | rss | Higher quality |
| `text/html, application/*;q=0.9` | rss | Wildcard match, RSS default |
| `*/*` | rss | No preference, use default |
| (empty) | rss | No header, use default |

## Feed Caching

### Cache Design

```python
from collections import OrderedDict
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional
import hashlib


@dataclass
class CacheEntry:
    """Single cache entry with metadata"""
    key: str
    content: str
    content_type: str
    created_at: datetime
    expires_at: datetime
    hit_count: int = 0
    size_bytes: int = 0


class FeedCache:
    """LRU cache with TTL for feed content"""

    def __init__(self, max_size: int = 100, default_ttl: int = 300):
        """Initialize cache

        Args:
            max_size: Maximum number of entries
            default_ttl: Default TTL in seconds
        """
        self.max_size = max_size
        self.default_ttl = default_ttl
        self.cache = OrderedDict()
        self.stats = {
            'hits': 0,
            'misses': 0,
            'evictions': 0,
            'invalidations': 0
        }

    def get(self, format: str, limit: int, checksum: str) -> Optional[CacheEntry]:
        """Get cached feed if available and not expired"""
        key = self._make_key(format, limit, checksum)

        if key not in self.cache:
            self.stats['misses'] += 1
            return None

        entry = self.cache[key]

        # Check expiration
        if datetime.now() > entry.expires_at:
            del self.cache[key]
            self.stats['misses'] += 1
            return None

        # Move to end (LRU)
        self.cache.move_to_end(key)

        # Update stats
        entry.hit_count += 1
        self.stats['hits'] += 1

        return entry

    def set(self, format: str, limit: int, checksum: str,
            content: str, content_type: str, ttl: Optional[int] = None):
        """Store feed in cache"""
        key = self._make_key(format, limit, checksum)
        ttl = ttl or self.default_ttl

        # Create entry
        entry = CacheEntry(
            key=key,
            content=content,
            content_type=content_type,
            created_at=datetime.now(),
            expires_at=datetime.now() + timedelta(seconds=ttl),
            size_bytes=len(content.encode('utf-8'))
        )

        # Add to cache
        self.cache[key] = entry

        # Enforce size limit
        while len(self.cache) > self.max_size:
            # Remove oldest (first) item
            evicted_key = next(iter(self.cache))
            del self.cache[evicted_key]
            self.stats['evictions'] += 1

    def invalidate(self, pattern: Optional[str] = None):
        """Invalidate cache entries matching pattern"""
        if pattern is None:
            # Clear all
            count = len(self.cache)
            self.cache.clear()
            self.stats['invalidations'] += count
        else:
            # Clear matching keys
            keys_to_remove = [
                key for key in self.cache
                if pattern in key
            ]
            for key in keys_to_remove:
                del self.cache[key]
                self.stats['invalidations'] += 1

    def _make_key(self, format: str, limit: int, checksum: str) -> str:
        """Generate cache key"""
        return f"feed:{format}:{limit}:{checksum}"

    def get_stats(self) -> Dict[str, Any]:
        """Get cache statistics"""
        total_requests = self.stats['hits'] + self.stats['misses']
        hit_rate = (self.stats['hits'] / total_requests * 100) if total_requests > 0 else 0

        # Calculate memory usage
        total_bytes = sum(entry.size_bytes for entry in self.cache.values())

        return {
            'entries': len(self.cache),
            'max_entries': self.max_size,
            'memory_mb': total_bytes / (1024 * 1024),
            'hit_rate': hit_rate,
            'hits': self.stats['hits'],
            'misses': self.stats['misses'],
            'evictions': self.stats['evictions'],
            'invalidations': self.stats['invalidations']
        }


class ContentChecksum:
    """Generate checksums for cache invalidation"""

    @staticmethod
    def calculate(notes: List[Note], config: Dict) -> str:
        """Calculate checksum based on content state"""
        # Use latest note timestamp and count
        if notes:
            latest_timestamp = max(n.updated_at or n.created_at for n in notes)
            checksum_data = f"{latest_timestamp.isoformat()}:{len(notes)}"
        else:
            checksum_data = "empty:0"

        # Include configuration that affects output
        config_data = f"{config.get('site_name')}:{config.get('site_url')}"

        # Generate hash
        combined = f"{checksum_data}:{config_data}"
        return hashlib.md5(combined.encode()).hexdigest()[:8]
```

### Cache Integration

```python
# In feed route handler
@app.route('/feed', defaults={'format': 'feed'})
@app.route('/feed.<format>')
def serve_feed(format):
    """Serve feed in requested format"""
    # Content negotiation if format not specified
    if format == 'feed':
        negotiator = ContentNegotiator()
        format = negotiator.negotiate(request.headers.get('Accept'))

    # Get notes and calculate checksum
    notes = get_published_notes()
    checksum = ContentChecksum.calculate(notes, app.config)

    # Check cache
    cached = feed_cache.get(format, limit=50, checksum=checksum)
    if cached:
        return Response(
            cached.content,
            mimetype=cached.content_type,
            headers={'X-Cache': 'HIT'}
        )

    # Generate feed
    if format == 'rss':
        content = rss_generator.generate(notes)
        content_type = 'application/rss+xml'
    elif format == 'atom':
        content = atom_generator.generate(notes)
        content_type = 'application/atom+xml'
    elif format == 'json':
        content = json_generator.generate(notes)
        content_type = 'application/feed+json'
    else:
        abort(404)

    # Cache the result
    feed_cache.set(format, 50, checksum, content, content_type)

    return Response(
        content,
        mimetype=content_type,
        headers={'X-Cache': 'MISS'}
    )
```

## Statistics Dashboard

### Dashboard Design

```python
from collections import defaultdict, deque
from datetime import datetime
from typing import Any, Dict, Optional


class SyndicationStats:
    """Collect and analyze syndication statistics"""

    def __init__(self):
        self.requests = defaultdict(int)  # By format
        self.user_agents = defaultdict(int)
        self.generation_times = defaultdict(list)
        self.errors = deque(maxlen=100)

    def record_request(self, format: str, user_agent: str, cached: bool,
                       generation_time: Optional[float] = None):
        """Record feed request"""
        self.requests[format] += 1
        self.user_agents[self._normalize_user_agent(user_agent)] += 1

        if generation_time is not None:
            self.generation_times[format].append(generation_time)
            # Keep only last 1000 times
            if len(self.generation_times[format]) > 1000:
                self.generation_times[format] = self.generation_times[format][-1000:]

    def record_error(self, format: str, error: str):
        """Record feed generation error"""
        self.errors.append({
            'timestamp': datetime.now(),
            'format': format,
            'error': error
        })

    def get_summary(self) -> Dict[str, Any]:
        """Get statistics summary"""
        total_requests = sum(self.requests.values())

        # Calculate format distribution
        format_distribution = {
            format: (count / total_requests * 100) if total_requests > 0 else 0
            for format, count in self.requests.items()
        }

        # Top user agents
        top_agents = sorted(
            self.user_agents.items(),
            key=lambda x: x[1],
            reverse=True
        )[:10]

        # Generation time stats
        time_stats = {}
        for format, times in self.generation_times.items():
            if times:
                sorted_times = sorted(times)
                time_stats[format] = {
                    'avg': sum(times) / len(times),
                    'p50': sorted_times[len(times) // 2],
                    'p95': sorted_times[int(len(times) * 0.95)],
                    'p99': sorted_times[int(len(times) * 0.99)]
                }

        return {
            'total_requests': total_requests,
            'format_distribution': format_distribution,
            'top_user_agents': top_agents,
            'generation_times': time_stats,
            'recent_errors': list(self.errors)
        }

    def _normalize_user_agent(self, user_agent: str) -> str:
        """Normalize user agent for grouping"""
        if not user_agent:
            return 'Unknown'

        # Common patterns
        patterns = [
            (r'Feedly', 'Feedly'),
            (r'Inoreader', 'Inoreader'),
            (r'NewsBlur', 'NewsBlur'),
            (r'Tiny Tiny RSS', 'Tiny Tiny RSS'),
            (r'FreshRSS', 'FreshRSS'),
            (r'NetNewsWire', 'NetNewsWire'),
            (r'Feedbin', 'Feedbin'),
            (r'bot|Bot|crawler|Crawler', 'Bot/Crawler'),
            (r'Mozilla.*Firefox', 'Firefox'),
            (r'Mozilla.*Chrome', 'Chrome'),
            (r'Mozilla.*Safari', 'Safari')
        ]

        import re
        for pattern, name in patterns:
            if re.search(pattern, user_agent):
                return name

        return 'Other'
```

### Dashboard Template

```html
{% extends "admin/base.html" %}

{% block title %}Syndication Dashboard{% endblock %}

{% block content %}

<h1>Syndication Statistics</h1>

<div class="stats-grid">
  <div class="stat-card">
    <h3>Total Requests</h3>
    <p class="stat-value">{{ stats.total_requests }}</p>
  </div>
  <div class="stat-card">
    <h3>Cache Hit Rate</h3>
    <p class="stat-value">{{ cache_stats.hit_rate|round(1) }}%</p>
  </div>
  <div class="stat-card">
    <h3>Active Formats</h3>
    <p class="stat-value">{{ stats.format_distribution|length }}</p>
  </div>
  <div class="stat-card">
    <h3>Cache Memory</h3>
    <p class="stat-value">{{ cache_stats.memory_mb|round(2) }}MB</p>
  </div>
</div>

<h2>Format Distribution</h2>
<ul>
  {% for format, percentage in stats.format_distribution.items() %}
  <li>{{ format|upper }}: {{ percentage|round(1) }}%</li>
  {% endfor %}
</ul>

<h2>Top Feed Readers</h2>
<table>
  <thead>
    <tr><th>Reader</th><th>Requests</th><th>Percentage</th></tr>
  </thead>
  <tbody>
    {% for agent, count in stats.top_user_agents %}
    <tr>
      <td>{{ agent }}</td>
      <td>{{ count }}</td>
      <td>{{ (count / stats.total_requests * 100)|round(1) }}%</td>
    </tr>
    {% endfor %}
  </tbody>
</table>

<h2>Generation Performance</h2>
<table>
  <thead>
    <tr><th>Format</th><th>Avg (ms)</th><th>P50 (ms)</th><th>P95 (ms)</th><th>P99 (ms)</th></tr>
  </thead>
  <tbody>
    {% for format, times in stats.generation_times.items() %}
    <tr>
      <td>{{ format|upper }}</td>
      <td>{{ (times.avg * 1000)|round(1) }}</td>
      <td>{{ (times.p50 * 1000)|round(1) }}</td>
      <td>{{ (times.p95 * 1000)|round(1) }}</td>
      <td>{{ (times.p99 * 1000)|round(1) }}</td>
    </tr>
    {% endfor %}
  </tbody>
</table>

{% if stats.recent_errors %}
<h2>Recent Errors</h2>
<ul>
  {% for error in stats.recent_errors[-10:] %}
  <li>{{ error.timestamp|timeago }} {{ error.format }} {{ error.error }}</li>
  {% endfor %}
</ul>
{% endif %}

<h2>Available Feeds</h2>
<ul>
  <li>RSS: {{ url_for('serve_feed', format='rss', _external=True) }}</li>
  <li>ATOM: {{ url_for('serve_feed', format='atom', _external=True) }}</li>
  <li>JSON: {{ url_for('serve_feed', format='json', _external=True) }}</li>
  <li>OPML: {{ url_for('export_opml', _external=True) }}</li>
</ul>
{% endblock %}
```

## OPML Export

### OPML Generator

```python
from datetime import datetime, timezone
from typing import List
from xml.etree.ElementTree import Element, SubElement, tostring
from xml.dom import minidom


class OPMLGenerator:
    """Generate OPML 2.0 feed list"""

    def __init__(self, site_url: str, site_name: str,
                 owner_name: str = None, owner_email: str = None):
        self.site_url = site_url.rstrip('/')
        self.site_name = site_name
        self.owner_name = owner_name
        self.owner_email = owner_email

    def generate(self, include_formats: List[str] = None) -> str:
        """Generate OPML document

        Args:
            include_formats: List of formats to include (default: all enabled)

        Returns:
            OPML 2.0 XML string
        """
        if not include_formats:
            include_formats = ['rss', 'atom', 'json']

        # Create root element
        opml = Element('opml', version='2.0')

        # Add head
        head = SubElement(opml, 'head')
        SubElement(head, 'title').text = f"{self.site_name} Feeds"
        SubElement(head, 'dateCreated').text = datetime.now(timezone.utc).strftime(
            '%a, %d %b %Y %H:%M:%S %z'
        )
        SubElement(head, 'dateModified').text = datetime.now(timezone.utc).strftime(
            '%a, %d %b %Y %H:%M:%S %z'
        )

        if self.owner_name:
            SubElement(head, 'ownerName').text = self.owner_name
        if self.owner_email:
            SubElement(head, 'ownerEmail').text = self.owner_email

        # Add body with outlines
        body = SubElement(opml, 'body')

        # Add feed outlines
        if 'rss' in include_formats:
            SubElement(body, 'outline',
                       type='rss',
                       text=f"{self.site_name} - RSS Feed",
                       title=f"{self.site_name} - RSS Feed",
                       xmlUrl=f"{self.site_url}/feed.xml",
                       htmlUrl=self.site_url)

        if 'atom' in include_formats:
            SubElement(body, 'outline',
                       type='atom',
                       text=f"{self.site_name} - ATOM Feed",
                       title=f"{self.site_name} - ATOM Feed",
                       xmlUrl=f"{self.site_url}/feed.atom",
                       htmlUrl=self.site_url)

        if 'json' in include_formats:
            SubElement(body, 'outline',
                       type='json',
                       text=f"{self.site_name} - JSON Feed",
                       title=f"{self.site_name} - JSON Feed",
                       xmlUrl=f"{self.site_url}/feed.json",
                       htmlUrl=self.site_url)

        # Convert to pretty XML
        rough_string = tostring(opml, encoding='unicode')
        reparsed = minidom.parseString(rough_string)
        return reparsed.toprettyxml(indent='  ', encoding='UTF-8').decode('utf-8')
```

### OPML Example Output

```xml
<?xml version="1.0" encoding="UTF-8"?>
<opml version="2.0">
  <head>
    <title>StarPunk Notes Feeds</title>
    <dateCreated>Mon, 25 Nov 2024 12:00:00 +0000</dateCreated>
    <dateModified>Mon, 25 Nov 2024 12:00:00 +0000</dateModified>
    <ownerName>John Doe</ownerName>
    <ownerEmail>john@example.com</ownerEmail>
  </head>
  <body>
    <outline type="rss" text="StarPunk Notes - RSS Feed"
             title="StarPunk Notes - RSS Feed"
             xmlUrl="https://example.com/feed.xml"
             htmlUrl="https://example.com"/>
    <outline type="atom" text="StarPunk Notes - ATOM Feed"
             title="StarPunk Notes - ATOM Feed"
             xmlUrl="https://example.com/feed.atom"
             htmlUrl="https://example.com"/>
    <outline type="json" text="StarPunk Notes - JSON Feed"
             title="StarPunk Notes - JSON Feed"
             xmlUrl="https://example.com/feed.json"
             htmlUrl="https://example.com"/>
  </body>
</opml>
```

## Testing Strategy

### Content Negotiation Tests

```python
def test_content_negotiation():
    """Test Accept header parsing and format selection"""
    negotiator = ContentNegotiator()

    # Test exact matches
    assert negotiator.negotiate('application/atom+xml') == 'atom'
    assert negotiator.negotiate('application/feed+json') == 'json'
    assert negotiator.negotiate('application/rss+xml') == 'rss'

    # Test quality factors
    assert negotiator.negotiate('application/atom+xml;q=0.8, application/rss+xml') == 'rss'

    # Test wildcards
    assert negotiator.negotiate('*/*') == 'rss'  # Default
    assert negotiator.negotiate('application/*') == 'rss'  # First application type

    # Test no preference
    assert negotiator.negotiate('') == 'rss'
    assert negotiator.negotiate('text/html') == 'rss'
```

### Cache Tests

```python
import time


def test_feed_cache():
    """Test LRU cache with TTL"""
    cache = FeedCache(max_size=3, default_ttl=1)

    # Test set and get
    cache.set('rss', 50, 'abc123', 'content', 'application/rss+xml')
    entry = cache.get('rss', 50, 'abc123')
    assert entry is not None
    assert entry.content == 'content'

    # Test expiration
    time.sleep(1.1)
    entry = cache.get('rss', 50, 'abc123')
    assert entry is None

    # Test LRU eviction
    cache.set('rss', 50, 'aaa', 'content1', 'application/rss+xml')
    cache.set('atom', 50, 'bbb', 'content2', 'application/atom+xml')
    cache.set('json', 50, 'ccc', 'content3', 'application/json')
    cache.set('rss', 100, 'ddd', 'content4', 'application/rss+xml')  # Evicts oldest

    assert cache.get('rss', 50, 'aaa') is None      # Evicted
    assert cache.get('atom', 50, 'bbb') is not None  # Still present
```

### Statistics Tests

```python
def test_syndication_stats():
    """Test statistics collection"""
    stats = SyndicationStats()

    # Record requests
    stats.record_request('rss', 'Feedly/1.0', cached=False, generation_time=0.05)
    stats.record_request('atom', 'Inoreader/1.0', cached=True)
    stats.record_request('json', 'NetNewsWire/6.0', cached=False, generation_time=0.03)

    summary = stats.get_summary()
    assert summary['total_requests'] == 3
    assert 'rss' in summary['format_distribution']
    assert len(summary['top_user_agents']) > 0
```

### OPML Tests

```python
def test_opml_generation():
    """Test OPML export"""
    generator = OPMLGenerator(
        site_url='https://example.com',
        site_name='Test Site',
        owner_name='John Doe'
    )

    opml = generator.generate(['rss', 'atom', 'json'])

    # Parse and validate (encode first: fromstring rejects str input
    # that carries an XML encoding declaration)
    import xml.etree.ElementTree as ET
    root = ET.fromstring(opml.encode('utf-8'))
    assert root.tag == 'opml'
    assert root.get('version') == '2.0'

    # Check outlines
    outlines = root.findall('.//outline')
    assert len(outlines) == 3
    assert outlines[0].get('type') == 'rss'
    assert outlines[1].get('type') == 'atom'
    assert outlines[2].get('type') == 'json'
```

## Performance Benchmarks

### Negotiation Performance

```python
import time


def benchmark_content_negotiation():
    """Benchmark negotiation speed"""
    negotiator = ContentNegotiator()
    complex_header = 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8'

    start = time.perf_counter()
    for _ in range(10000):
        negotiator.negotiate(complex_header)
    duration = time.perf_counter() - start

    per_call = (duration / 10000) * 1000  # Convert to ms
    assert per_call < 1.0  # Less than 1ms per negotiation
```

## Configuration

```ini
# Content negotiation
STARPUNK_FEED_NEGOTIATION_ENABLED=true
STARPUNK_FEED_DEFAULT_FORMAT=rss

# Cache settings
STARPUNK_FEED_CACHE_ENABLED=true
STARPUNK_FEED_CACHE_SIZE=100
STARPUNK_FEED_CACHE_TTL=300
STARPUNK_FEED_CACHE_MEMORY_LIMIT=10  # MB

# Statistics
STARPUNK_FEED_STATS_ENABLED=true
STARPUNK_FEED_STATS_RETENTION=7  # days

# OPML
STARPUNK_FEED_OPML_ENABLED=true
STARPUNK_FEED_OPML_OWNER_NAME=
STARPUNK_FEED_OPML_OWNER_EMAIL=
```

## Security Considerations

1. **Cache Poisoning**: Validate all cached content
2. **Header Injection**: Sanitize Accept headers
3. **Memory Exhaustion**: Limit cache size
4. **Statistics Privacy**: Don't log sensitive data
5. **OPML Injection**: Escape all XML content

## Acceptance Criteria

1. ✅ Content negotiation working correctly
2. ✅ Cache hit rate >80% achieved
3. ✅ Statistics dashboard functional
4. ✅ OPML export valid
5. ✅ Memory usage bounded
6. ✅ Performance targets met
7. ✅ All formats properly cached
8. ✅ Invalidation working
9. ✅ User agent detection accurate
10. ✅ Security review passed
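The OPML-injection mitigation (Security Considerations, item 5) falls out of building the document with `xml.etree` rather than string formatting: attribute values passed to `SubElement` are escaped on serialization. A minimal sketch, with a hypothetical hostile site name chosen to contain XML-special characters:

```python
from xml.etree.ElementTree import Element, SubElement, tostring

# Hypothetical hostile site name containing XML-special characters
body = Element('body')
SubElement(body, 'outline',
           text='My "Site" & <script>',
           xmlUrl='https://example.com/feed.xml')

serialized = tostring(body, encoding='unicode')
print(serialized)  # quotes, ampersands, and angle brackets arrive escaped
```

This is why `OPMLGenerator` needs no manual escaping step; hand-assembled f-string XML would.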