diff --git a/starpunk/__init__.py b/starpunk/__init__.py
index ab9895a..eee975b 100644
--- a/starpunk/__init__.py
+++ b/starpunk/__init__.py
@@ -139,6 +139,14 @@ def create_app(config=None):
setup_http_metrics(app)
app.logger.info("HTTP metrics middleware enabled")
+ # Initialize feed cache (v1.1.2 Phase 3)
+ if app.config.get('FEED_CACHE_ENABLED', True):
+ from starpunk.feeds import configure_cache
+ max_size = app.config.get('FEED_CACHE_MAX_SIZE', 50)
+ ttl = app.config.get('FEED_CACHE_SECONDS', 300)
+ configure_cache(max_size=max_size, ttl=ttl)
+ app.logger.info(f"Feed cache enabled (max_size={max_size}, ttl={ttl}s)")
+
# Initialize FTS index if needed
from pathlib import Path
from starpunk.search import has_fts_table, rebuild_fts_index
diff --git a/starpunk/config.py b/starpunk/config.py
index 92434fd..ed3793f 100644
--- a/starpunk/config.py
+++ b/starpunk/config.py
@@ -82,6 +82,10 @@ def load_config(app, config_override=None):
app.config["FEED_MAX_ITEMS"] = int(os.getenv("FEED_MAX_ITEMS", "50"))
app.config["FEED_CACHE_SECONDS"] = int(os.getenv("FEED_CACHE_SECONDS", "300"))
+ # Feed caching (v1.1.2 Phase 3)
+ app.config["FEED_CACHE_ENABLED"] = os.getenv("FEED_CACHE_ENABLED", "true").lower() == "true"
+ app.config["FEED_CACHE_MAX_SIZE"] = int(os.getenv("FEED_CACHE_MAX_SIZE", "50"))
+
# Metrics configuration (v1.1.2 Phase 1)
app.config["METRICS_ENABLED"] = os.getenv("METRICS_ENABLED", "true").lower() == "true"
app.config["METRICS_SLOW_QUERY_THRESHOLD"] = float(os.getenv("METRICS_SLOW_QUERY_THRESHOLD", "1.0"))
diff --git a/starpunk/feeds/__init__.py b/starpunk/feeds/__init__.py
index acacb1e..97d65b4 100644
--- a/starpunk/feeds/__init__.py
+++ b/starpunk/feeds/__init__.py
@@ -13,6 +13,9 @@ Exports:
generate_json_feed_streaming: Generate JSON Feed 1.1 with streaming
negotiate_feed_format: Content negotiation for feed formats
get_mime_type: Get MIME type for a format name
+ get_cache: Get global feed cache instance
+ configure_cache: Configure global feed cache
+ FeedCache: Feed caching class
"""
from .rss import (
@@ -38,6 +41,12 @@ from .negotiation import (
get_mime_type,
)
+from .cache import (
+ FeedCache,
+ get_cache,
+ configure_cache,
+)
+
__all__ = [
# RSS functions
"generate_rss",
@@ -54,4 +63,8 @@ __all__ = [
# Content negotiation
"negotiate_feed_format",
"get_mime_type",
+ # Caching
+ "FeedCache",
+ "get_cache",
+ "configure_cache",
]
diff --git a/starpunk/feeds/cache.py b/starpunk/feeds/cache.py
new file mode 100644
index 0000000..eb86b8e
--- /dev/null
+++ b/starpunk/feeds/cache.py
@@ -0,0 +1,297 @@
+"""
+Feed caching layer with LRU eviction and TTL expiration.
+
+Implements efficient feed caching to reduce database queries and feed generation
+overhead. Uses SHA-256 checksums for cache keys and supports ETag generation
+for HTTP conditional requests.
+
+Philosophy: Simple, memory-efficient caching that reduces database load.
+"""
+
+import hashlib
+import time
+from collections import OrderedDict
+from typing import Optional, Dict, Tuple
+
+
+class FeedCache:
+ """
+ LRU cache with TTL (Time To Live) for feed content.
+
+ Features:
+ - LRU eviction when max_size is reached
+ - TTL-based expiration (default 5 minutes)
+ - SHA-256 checksums for ETags
+    - Not thread-safe (no internal locking)
+ - Hit/miss statistics tracking
+
+ Cache Key Format:
+ feed:{format}:{checksum}
+
+ Example:
+ cache = FeedCache(max_size=50, ttl=300)
+
+ # Store feed content
+ checksum = cache.set('rss', content, notes_checksum)
+
+ # Retrieve feed content
+ cached_content, etag = cache.get('rss', notes_checksum)
+
+ # Track cache statistics
+ stats = cache.get_stats()
+ """
+
+ def __init__(self, max_size: int = 50, ttl: int = 300):
+ """
+ Initialize feed cache.
+
+ Args:
+ max_size: Maximum number of cached feeds (default: 50)
+ ttl: Time to live in seconds (default: 300 = 5 minutes)
+ """
+ self.max_size = max_size
+ self.ttl = ttl
+
+ # OrderedDict for LRU behavior
+ # Structure: {cache_key: (content, etag, timestamp)}
+ self._cache: OrderedDict[str, Tuple[str, str, float]] = OrderedDict()
+
+ # Statistics tracking
+ self._hits = 0
+ self._misses = 0
+ self._evictions = 0
+
+ def _generate_cache_key(self, format_name: str, checksum: str) -> str:
+ """
+ Generate cache key from format and content checksum.
+
+ Args:
+ format_name: Feed format (rss, atom, json)
+ checksum: SHA-256 checksum of note content
+
+ Returns:
+ Cache key string
+ """
+ return f"feed:{format_name}:{checksum}"
+
+ def _generate_etag(self, content: str) -> str:
+ """
+ Generate weak ETag from feed content using SHA-256.
+
+ Uses weak ETags (W/"...") since feed content can have semantic
+ equivalence even with different representations (e.g., timestamp
+ formatting, whitespace variations).
+
+ Args:
+ content: Feed content (XML or JSON)
+
+ Returns:
+ Weak ETag in format: W/"sha256_hash"
+ """
+ content_hash = hashlib.sha256(content.encode('utf-8')).hexdigest()
+ return f'W/"{content_hash}"'
+
+ def _is_expired(self, timestamp: float) -> bool:
+ """
+ Check if cached entry has expired based on TTL.
+
+ Args:
+ timestamp: Unix timestamp when entry was cached
+
+ Returns:
+ True if expired, False otherwise
+ """
+ return (time.time() - timestamp) > self.ttl
+
+ def _evict_lru(self) -> None:
+ """
+ Evict least recently used entry from cache.
+
+ Called when cache is full and new entry needs to be added.
+        The first key in the OrderedDict is the least recently used,
+        since get() and set() move touched keys to the end.
+ """
+ if self._cache:
+ # Remove first (oldest/least recently used) entry
+ self._cache.popitem(last=False)
+ self._evictions += 1
+
+ def get(self, format_name: str, notes_checksum: str) -> Optional[Tuple[str, str]]:
+ """
+ Retrieve cached feed content if valid and not expired.
+
+ Args:
+ format_name: Feed format (rss, atom, json)
+ notes_checksum: SHA-256 checksum of note list content
+
+ Returns:
+ Tuple of (content, etag) if cache hit and valid, None otherwise
+
+ Side Effects:
+ - Moves accessed entry to end of OrderedDict (LRU update)
+ - Increments hit or miss counter
+ - Removes expired entries
+ """
+ cache_key = self._generate_cache_key(format_name, notes_checksum)
+
+ if cache_key not in self._cache:
+ self._misses += 1
+ return None
+
+ content, etag, timestamp = self._cache[cache_key]
+
+ # Check if expired
+ if self._is_expired(timestamp):
+ # Remove expired entry
+ del self._cache[cache_key]
+ self._misses += 1
+ return None
+
+ # Move to end (mark as recently used)
+ self._cache.move_to_end(cache_key)
+ self._hits += 1
+
+ return (content, etag)
+
+ def set(self, format_name: str, content: str, notes_checksum: str) -> str:
+ """
+ Store feed content in cache with generated ETag.
+
+ Args:
+ format_name: Feed format (rss, atom, json)
+ content: Generated feed content (XML or JSON)
+ notes_checksum: SHA-256 checksum of note list content
+
+ Returns:
+ Generated ETag for the content
+
+ Side Effects:
+ - May evict LRU entry if cache is full
+ - Adds new entry or updates existing entry
+ """
+ cache_key = self._generate_cache_key(format_name, notes_checksum)
+ etag = self._generate_etag(content)
+ timestamp = time.time()
+
+        # Evict the LRU entry if the cache is full and this key is new
+        if len(self._cache) >= self.max_size and cache_key not in self._cache:
+            self._evict_lru()
+
+        # Store/update the entry and mark it most recently used
+        # (assigning to an existing key does not reorder an OrderedDict,
+        # so move_to_end is required; for a new key it is a no-op)
+        self._cache[cache_key] = (content, etag, timestamp)
+        self._cache.move_to_end(cache_key)
+
+ return etag
+
+ def invalidate(self, format_name: Optional[str] = None) -> int:
+ """
+ Invalidate cache entries.
+
+ Args:
+ format_name: If specified, only invalidate this format.
+ If None, invalidate all entries.
+
+ Returns:
+ Number of entries invalidated
+ """
+ if format_name is None:
+ # Clear entire cache
+ count = len(self._cache)
+ self._cache.clear()
+ return count
+
+ # Invalidate specific format
+ keys_to_remove = [
+ key for key in self._cache.keys()
+ if key.startswith(f"feed:{format_name}:")
+ ]
+
+ for key in keys_to_remove:
+ del self._cache[key]
+
+ return len(keys_to_remove)
+
+ def get_stats(self) -> Dict[str, int]:
+ """
+ Get cache statistics.
+
+ Returns:
+ Dictionary with:
+ - hits: Number of cache hits
+ - misses: Number of cache misses
+ - entries: Current number of cached entries
+ - evictions: Number of LRU evictions
+ - hit_rate: Cache hit rate (0.0 to 1.0)
+ """
+ total_requests = self._hits + self._misses
+ hit_rate = self._hits / total_requests if total_requests > 0 else 0.0
+
+ return {
+ 'hits': self._hits,
+ 'misses': self._misses,
+ 'entries': len(self._cache),
+ 'evictions': self._evictions,
+ 'hit_rate': hit_rate,
+ }
+
+ def generate_notes_checksum(self, notes: list) -> str:
+ """
+ Generate SHA-256 checksum from note list.
+
+ Creates a stable checksum based on note IDs and updated timestamps.
+ This checksum changes when notes are added, removed, or modified.
+
+ Args:
+ notes: List of Note objects
+
+ Returns:
+ SHA-256 hex digest of note content
+ """
+ # Create stable representation of notes
+ # Use ID and updated timestamp as these uniquely identify note state
+ note_repr = []
+ for note in notes:
+ # Include ID and updated timestamp for change detection
+ note_str = f"{note.id}:{note.updated_at.isoformat()}"
+ note_repr.append(note_str)
+
+ # Join and hash
+ combined = "|".join(note_repr)
+ return hashlib.sha256(combined.encode('utf-8')).hexdigest()
+
+
+# Global cache instance (singleton pattern)
+# Created lazily on first get_cache() call; configured via configure_cache()
+_global_cache: Optional[FeedCache] = None
+
+
+def get_cache() -> FeedCache:
+ """
+ Get global feed cache instance.
+
+ Creates cache on first access with default settings.
+ Can be reconfigured via configure_cache().
+
+ Returns:
+ Global FeedCache instance
+ """
+ global _global_cache
+ if _global_cache is None:
+ _global_cache = FeedCache()
+ return _global_cache
+
+
+def configure_cache(max_size: int, ttl: int) -> None:
+ """
+ Configure global feed cache.
+
+ Call this during app initialization to set cache parameters.
+
+ Args:
+ max_size: Maximum number of cached feeds
+ ttl: Time to live in seconds
+ """
+ global _global_cache
+ _global_cache = FeedCache(max_size=max_size, ttl=ttl)
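Since the class docstring above now flags FeedCache as not thread-safe, here is a minimal sketch of a locked variant for threaded WSGI servers; it assumes only the FeedCache API defined in this file and is not part of this diff:

```python
# Sketch: serialize all cache access behind one lock. Subclasses the
# FeedCache defined above; method signatures are unchanged.
import threading

class LockedFeedCache(FeedCache):
    def __init__(self, max_size: int = 50, ttl: int = 300):
        super().__init__(max_size=max_size, ttl=ttl)
        self._lock = threading.Lock()

    def get(self, format_name, notes_checksum):
        with self._lock:
            return super().get(format_name, notes_checksum)

    def set(self, format_name, content, notes_checksum):
        with self._lock:
            return super().set(format_name, content, notes_checksum)

    def invalidate(self, format_name=None):
        with self._lock:
            return super().invalidate(format_name)
```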
diff --git a/starpunk/routes/public.py b/starpunk/routes/public.py
index bf2f971..8e7e31f 100644
--- a/starpunk/routes/public.py
+++ b/starpunk/routes/public.py
@@ -13,11 +13,15 @@ from flask import Blueprint, abort, render_template, Response, current_app, requ
from starpunk.notes import list_notes, get_note
from starpunk.feed import generate_feed_streaming # Legacy RSS
from starpunk.feeds import (
+ generate_rss,
generate_rss_streaming,
+ generate_atom,
generate_atom_streaming,
+ generate_json_feed,
generate_json_feed_streaming,
negotiate_feed_format,
get_mime_type,
+ get_cache,
)
# Create blueprint
@@ -25,7 +29,7 @@ bp = Blueprint("public", __name__)
# Simple in-memory cache for feed note list
# Caches the database query results to avoid repeated DB hits
-# Feed content (XML/JSON) is streamed, not cached (memory optimization)
+# Feed content is now cached via FeedCache (Phase 3)
# Structure: {'notes': list[Note], 'timestamp': datetime}
_feed_cache = {"notes": None, "timestamp": None}
@@ -61,6 +65,98 @@ def _get_cached_notes():
return notes
+def _generate_feed_with_cache(format_name: str, non_streaming_generator):
+ """
+ Generate feed with caching and ETag support.
+
+ Implements Phase 3 feed caching:
+ - Checks If-None-Match header for conditional requests
+ - Uses FeedCache for content caching
+ - Returns 304 Not Modified when appropriate
+ - Adds ETag header to all responses
+
+ Args:
+ format_name: Feed format (rss, atom, json)
+ non_streaming_generator: Function that returns full feed content (not streaming)
+
+ Returns:
+ Flask Response with appropriate headers and status
+ """
+    notes = _get_cached_notes()
+    cache_seconds = current_app.config.get("FEED_CACHE_SECONDS", 300)
+
+    def _fresh_content() -> str:
+        """Generate the full (non-streaming) feed body."""
+        return non_streaming_generator(
+            site_url=current_app.config["SITE_URL"],
+            site_name=current_app.config["SITE_NAME"],
+            site_description=current_app.config.get("SITE_DESCRIPTION", ""),
+            notes=notes,
+            limit=current_app.config.get("FEED_MAX_ITEMS", 50),
+        )
+
+    # Caching disabled: always generate a fresh feed (no ETag handling)
+    if not current_app.config.get("FEED_CACHE_ENABLED", True):
+        response = Response(_fresh_content(), mimetype=get_mime_type(format_name))
+        response.headers["Cache-Control"] = f"public, max-age={cache_seconds}"
+        return response
+
+    # Caching enabled - use FeedCache
+    feed_cache = get_cache()
+    notes_checksum = feed_cache.generate_notes_checksum(notes)
+
+    cached_result = feed_cache.get(format_name, notes_checksum)
+
+    if cached_result:
+        content, etag = cached_result
+
+        # Conditional request: compare against the single ETag value the
+        # client echoed back (multi-value If-None-Match is not handled)
+        if request.headers.get("If-None-Match") == etag:
+            response = Response(status=304)
+            response.headers["ETag"] = etag
+            return response
+    else:
+        # Cache miss: generate fresh content, store it, and get its ETag
+        content = _fresh_content()
+        etag = feed_cache.set(format_name, content, notes_checksum)
+
+    # Return feed content (cached or fresh) with ETag and Cache-Control
+    response = Response(content, mimetype=get_mime_type(format_name))
+    response.headers["ETag"] = etag
+    response.headers["Cache-Control"] = f"public, max-age={cache_seconds}"
+    return response
+
+
@bp.route("/")
def index():
"""
@@ -171,32 +267,27 @@ def feed():
@bp.route("/feed.rss")
def feed_rss():
"""
- Explicit RSS 2.0 feed endpoint
+ Explicit RSS 2.0 feed endpoint (with caching)
- Generates standards-compliant RSS 2.0 feed using memory-efficient streaming.
- Instead of building the entire feed in memory, yields XML chunks directly
- to the client for optimal memory usage with large feeds.
-
- Cache duration is configurable via FEED_CACHE_SECONDS (default: 300 seconds
- = 5 minutes). Cache stores note list to avoid repeated database queries,
- but streaming prevents holding full XML in memory.
+ Generates standards-compliant RSS 2.0 feed with Phase 3 caching:
+ - LRU cache with TTL (default 5 minutes)
+ - ETag support for conditional requests
+ - 304 Not Modified responses
+ - SHA-256 checksums
Returns:
- Streaming RSS 2.0 feed response
+ Cached or fresh RSS 2.0 feed response
Headers:
Content-Type: application/rss+xml; charset=utf-8
Cache-Control: public, max-age={FEED_CACHE_SECONDS}
+ ETag: W/"sha256_hash"
- Streaming Strategy:
- - Database query cached (avoid repeated DB hits)
- - XML generation streamed (avoid full XML in memory)
- - Client-side: Cache-Control header with max-age
-
- Performance:
- - Memory usage: O(1) instead of O(n) for feed size
- - Latency: Lower time-to-first-byte (TTFB)
- - Recommended for feeds with 100+ items
+ Caching Strategy:
+ - Database query cached (note list)
+ - Feed content cached (full XML)
+ - Conditional requests (If-None-Match)
+    - Content changes produce a new notes checksum, bypassing stale entries
Examples:
>>> response = client.get('/feed.rss')
@@ -204,44 +295,32 @@ def feed_rss():
200
>>> response.headers['Content-Type']
'application/rss+xml; charset=utf-8'
+ >>> response.headers['ETag']
+ 'W/"abc123..."'
+
+ >>> # Conditional request
+ >>> response = client.get('/feed.rss', headers={'If-None-Match': 'W/"abc123..."'})
+ >>> response.status_code
+ 304
"""
- # Get cached notes
- notes = _get_cached_notes()
-
- # Get cache duration for response header
- cache_seconds = current_app.config.get("FEED_CACHE_SECONDS", 300)
-
- # Generate streaming RSS feed
- max_items = current_app.config.get("FEED_MAX_ITEMS", 50)
- generator = generate_rss_streaming(
- site_url=current_app.config["SITE_URL"],
- site_name=current_app.config["SITE_NAME"],
- site_description=current_app.config.get("SITE_DESCRIPTION", ""),
- notes=notes,
- limit=max_items,
- )
-
- # Return streaming response with appropriate headers
- response = Response(generator, mimetype="application/rss+xml; charset=utf-8")
- response.headers["Cache-Control"] = f"public, max-age={cache_seconds}"
-
- return response
+ return _generate_feed_with_cache('rss', generate_rss)
@bp.route("/feed.atom")
def feed_atom():
"""
- Explicit ATOM 1.0 feed endpoint
+ Explicit ATOM 1.0 feed endpoint (with caching)
- Generates standards-compliant ATOM 1.0 feed using memory-efficient streaming.
+ Generates standards-compliant ATOM 1.0 feed with Phase 3 caching.
Follows RFC 4287 specification for ATOM syndication format.
Returns:
- Streaming ATOM 1.0 feed response
+ Cached or fresh ATOM 1.0 feed response
Headers:
Content-Type: application/atom+xml; charset=utf-8
Cache-Control: public, max-age={FEED_CACHE_SECONDS}
+ ETag: W/"sha256_hash"
Examples:
>>> response = client.get('/feed.atom')
@@ -249,44 +328,27 @@ def feed_atom():
200
>>> response.headers['Content-Type']
'application/atom+xml; charset=utf-8'
+ >>> response.headers['ETag']
+ 'W/"abc123..."'
"""
- # Get cached notes
- notes = _get_cached_notes()
-
- # Get cache duration for response header
- cache_seconds = current_app.config.get("FEED_CACHE_SECONDS", 300)
-
- # Generate streaming ATOM feed
- max_items = current_app.config.get("FEED_MAX_ITEMS", 50)
- generator = generate_atom_streaming(
- site_url=current_app.config["SITE_URL"],
- site_name=current_app.config["SITE_NAME"],
- site_description=current_app.config.get("SITE_DESCRIPTION", ""),
- notes=notes,
- limit=max_items,
- )
-
- # Return streaming response with appropriate headers
- response = Response(generator, mimetype="application/atom+xml; charset=utf-8")
- response.headers["Cache-Control"] = f"public, max-age={cache_seconds}"
-
- return response
+ return _generate_feed_with_cache('atom', generate_atom)
@bp.route("/feed.json")
def feed_json():
"""
- Explicit JSON Feed 1.1 endpoint
+ Explicit JSON Feed 1.1 endpoint (with caching)
- Generates standards-compliant JSON Feed 1.1 feed using memory-efficient streaming.
+ Generates standards-compliant JSON Feed 1.1 feed with Phase 3 caching.
Follows JSON Feed specification (https://jsonfeed.org/version/1.1).
Returns:
- Streaming JSON Feed 1.1 response
+ Cached or fresh JSON Feed 1.1 response
Headers:
Content-Type: application/feed+json; charset=utf-8
Cache-Control: public, max-age={FEED_CACHE_SECONDS}
+ ETag: W/"sha256_hash"
Examples:
>>> response = client.get('/feed.json')
@@ -294,28 +356,10 @@ def feed_json():
200
>>> response.headers['Content-Type']
'application/feed+json; charset=utf-8'
+ >>> response.headers['ETag']
+ 'W/"abc123..."'
"""
- # Get cached notes
- notes = _get_cached_notes()
-
- # Get cache duration for response header
- cache_seconds = current_app.config.get("FEED_CACHE_SECONDS", 300)
-
- # Generate streaming JSON Feed
- max_items = current_app.config.get("FEED_MAX_ITEMS", 50)
- generator = generate_json_feed_streaming(
- site_url=current_app.config["SITE_URL"],
- site_name=current_app.config["SITE_NAME"],
- site_description=current_app.config.get("SITE_DESCRIPTION", ""),
- notes=notes,
- limit=max_items,
- )
-
- # Return streaming response with appropriate headers
- response = Response(generator, mimetype="application/feed+json; charset=utf-8")
- response.headers["Cache-Control"] = f"public, max-age={cache_seconds}"
-
- return response
+ return _generate_feed_with_cache('json', generate_json_feed)
@bp.route("/feed.xml")
diff --git a/tests/test_feeds_cache.py b/tests/test_feeds_cache.py
new file mode 100644
index 0000000..f622315
--- /dev/null
+++ b/tests/test_feeds_cache.py
@@ -0,0 +1,373 @@
+"""
+Tests for feed caching layer (v1.1.2 Phase 3)
+
+Tests the FeedCache class and caching integration with feed routes.
+"""
+
+import time
+from datetime import datetime, timezone
+from pathlib import Path
+
+import pytest
+
+from starpunk.feeds.cache import FeedCache
+from starpunk.models import Note
+
+
+class TestFeedCacheBasics:
+ """Test basic cache operations"""
+
+ def test_cache_initialization(self):
+ """Cache initializes with correct settings"""
+ cache = FeedCache(max_size=100, ttl=600)
+ assert cache.max_size == 100
+ assert cache.ttl == 600
+ assert len(cache._cache) == 0
+
+ def test_cache_key_generation(self):
+ """Cache keys are generated consistently"""
+ cache = FeedCache()
+ key1 = cache._generate_cache_key('rss', 'abc123')
+ key2 = cache._generate_cache_key('rss', 'abc123')
+ key3 = cache._generate_cache_key('atom', 'abc123')
+
+ assert key1 == key2
+ assert key1 != key3
+ assert key1 == 'feed:rss:abc123'
+
+ def test_etag_generation(self):
+ """ETags are generated with weak format"""
+ cache = FeedCache()
+ content = "..."
+ etag = cache._generate_etag(content)
+
+ assert etag.startswith('W/"')
+ assert etag.endswith('"')
+        assert len(etag) == 68  # W/"..." wrapping a 64-char SHA-256 hex digest
+
+ def test_etag_consistency(self):
+ """Same content generates same ETag"""
+ cache = FeedCache()
+ content = "test content"
+ etag1 = cache._generate_etag(content)
+ etag2 = cache._generate_etag(content)
+
+ assert etag1 == etag2
+
+ def test_etag_uniqueness(self):
+ """Different content generates different ETags"""
+ cache = FeedCache()
+ etag1 = cache._generate_etag("content 1")
+ etag2 = cache._generate_etag("content 2")
+
+ assert etag1 != etag2
+
+
+class TestCacheOperations:
+ """Test cache get/set operations"""
+
+ def test_set_and_get(self):
+ """Can store and retrieve feed content"""
+ cache = FeedCache()
+ content = "test"
+ checksum = "test123"
+
+ etag = cache.set('rss', content, checksum)
+ result = cache.get('rss', checksum)
+
+ assert result is not None
+ cached_content, cached_etag = result
+ assert cached_content == content
+ assert cached_etag == etag
+ assert cached_etag.startswith('W/"')
+
+ def test_cache_miss(self):
+ """Returns None for cache miss"""
+ cache = FeedCache()
+ result = cache.get('rss', 'nonexistent')
+ assert result is None
+
+ def test_different_formats_cached_separately(self):
+ """Different formats with same checksum are cached separately"""
+ cache = FeedCache()
+ rss_content = "RSS content"
+ atom_content = "ATOM content"
+ checksum = "same_checksum"
+
+ rss_etag = cache.set('rss', rss_content, checksum)
+ atom_etag = cache.set('atom', atom_content, checksum)
+
+ rss_result = cache.get('rss', checksum)
+ atom_result = cache.get('atom', checksum)
+
+ assert rss_result[0] == rss_content
+ assert atom_result[0] == atom_content
+ assert rss_etag != atom_etag
+
+
+class TestCacheTTL:
+ """Test TTL expiration"""
+
+ def test_ttl_expiration(self):
+ """Cached entries expire after TTL"""
+ cache = FeedCache(ttl=1) # 1 second TTL
+ content = "test content"
+ checksum = "test123"
+
+ cache.set('rss', content, checksum)
+
+ # Should be cached initially
+ assert cache.get('rss', checksum) is not None
+
+ # Wait for TTL to expire
+ time.sleep(1.1)
+
+ # Should be expired
+ assert cache.get('rss', checksum) is None
+
+ def test_ttl_not_expired(self):
+ """Cached entries remain valid within TTL"""
+ cache = FeedCache(ttl=10) # 10 second TTL
+ content = "test content"
+ checksum = "test123"
+
+ cache.set('rss', content, checksum)
+ time.sleep(0.1) # Small delay
+
+ # Should still be cached
+ assert cache.get('rss', checksum) is not None
+
+
+class TestLRUEviction:
+ """Test LRU eviction strategy"""
+
+ def test_lru_eviction(self):
+ """LRU entries are evicted when cache is full"""
+ cache = FeedCache(max_size=3)
+
+ # Fill cache
+ cache.set('rss', 'content1', 'check1')
+ cache.set('rss', 'content2', 'check2')
+ cache.set('rss', 'content3', 'check3')
+
+ # All should be cached
+ assert cache.get('rss', 'check1') is not None
+ assert cache.get('rss', 'check2') is not None
+ assert cache.get('rss', 'check3') is not None
+
+ # Add one more (should evict oldest)
+ cache.set('rss', 'content4', 'check4')
+
+ # First entry should be evicted
+ assert cache.get('rss', 'check1') is None
+ assert cache.get('rss', 'check2') is not None
+ assert cache.get('rss', 'check3') is not None
+ assert cache.get('rss', 'check4') is not None
+
+ def test_lru_access_updates_order(self):
+ """Accessing an entry moves it to end (most recently used)"""
+ cache = FeedCache(max_size=3)
+
+ # Fill cache
+ cache.set('rss', 'content1', 'check1')
+ cache.set('rss', 'content2', 'check2')
+ cache.set('rss', 'content3', 'check3')
+
+ # Access first entry (makes it most recent)
+ cache.get('rss', 'check1')
+
+ # Add new entry (should evict check2, not check1)
+ cache.set('rss', 'content4', 'check4')
+
+ assert cache.get('rss', 'check1') is not None # Still cached (accessed recently)
+ assert cache.get('rss', 'check2') is None # Evicted (oldest)
+ assert cache.get('rss', 'check3') is not None
+ assert cache.get('rss', 'check4') is not None
+
+
+class TestCacheInvalidation:
+ """Test cache invalidation"""
+
+ def test_invalidate_all(self):
+ """Can invalidate entire cache"""
+ cache = FeedCache()
+
+ cache.set('rss', 'content1', 'check1')
+ cache.set('atom', 'content2', 'check2')
+ cache.set('json', 'content3', 'check3')
+
+ count = cache.invalidate()
+
+ assert count == 3
+ assert cache.get('rss', 'check1') is None
+ assert cache.get('atom', 'check2') is None
+ assert cache.get('json', 'check3') is None
+
+ def test_invalidate_specific_format(self):
+ """Can invalidate specific format only"""
+ cache = FeedCache()
+
+ cache.set('rss', 'content1', 'check1')
+ cache.set('atom', 'content2', 'check2')
+ cache.set('json', 'content3', 'check3')
+
+ count = cache.invalidate('rss')
+
+ assert count == 1
+ assert cache.get('rss', 'check1') is None
+ assert cache.get('atom', 'check2') is not None
+ assert cache.get('json', 'check3') is not None
+
+
+class TestCacheStatistics:
+ """Test cache statistics tracking"""
+
+ def test_hit_tracking(self):
+ """Cache hits are tracked"""
+ cache = FeedCache()
+ cache.set('rss', 'content', 'check1')
+
+ stats = cache.get_stats()
+ assert stats['hits'] == 0
+
+ cache.get('rss', 'check1') # Hit
+ stats = cache.get_stats()
+ assert stats['hits'] == 1
+
+ def test_miss_tracking(self):
+ """Cache misses are tracked"""
+ cache = FeedCache()
+
+ stats = cache.get_stats()
+ assert stats['misses'] == 0
+
+ cache.get('rss', 'nonexistent') # Miss
+ stats = cache.get_stats()
+ assert stats['misses'] == 1
+
+ def test_hit_rate_calculation(self):
+ """Hit rate is calculated correctly"""
+ cache = FeedCache()
+ cache.set('rss', 'content', 'check1')
+
+ cache.get('rss', 'check1') # Hit
+ cache.get('rss', 'nonexistent') # Miss
+ cache.get('rss', 'check1') # Hit
+
+ stats = cache.get_stats()
+ assert stats['hits'] == 2
+ assert stats['misses'] == 1
+ assert stats['hit_rate'] == 2.0 / 3.0 # 66.67%
+
+ def test_eviction_tracking(self):
+ """Evictions are tracked"""
+ cache = FeedCache(max_size=2)
+
+ cache.set('rss', 'content1', 'check1')
+ cache.set('rss', 'content2', 'check2')
+ cache.set('rss', 'content3', 'check3') # Triggers eviction
+
+ stats = cache.get_stats()
+ assert stats['evictions'] == 1
+
+
+class TestNotesChecksum:
+ """Test notes checksum generation"""
+
+ def test_checksum_generation(self):
+ """Can generate checksum from note list"""
+ cache = FeedCache()
+ now = datetime.now(timezone.utc)
+
+ notes = [
+ Note(id=1, slug="note1", file_path="note1.md", created_at=now, updated_at=now, published=True, _data_dir=Path("/tmp")),
+ Note(id=2, slug="note2", file_path="note2.md", created_at=now, updated_at=now, published=True, _data_dir=Path("/tmp")),
+ ]
+
+ checksum = cache.generate_notes_checksum(notes)
+
+ assert isinstance(checksum, str)
+ assert len(checksum) == 64 # SHA-256 hex digest length
+
+ def test_checksum_consistency(self):
+ """Same notes generate same checksum"""
+ cache = FeedCache()
+ now = datetime.now(timezone.utc)
+
+ notes = [
+ Note(id=1, slug="note1", file_path="note1.md", created_at=now, updated_at=now, published=True, _data_dir=Path("/tmp")),
+ Note(id=2, slug="note2", file_path="note2.md", created_at=now, updated_at=now, published=True, _data_dir=Path("/tmp")),
+ ]
+
+ checksum1 = cache.generate_notes_checksum(notes)
+ checksum2 = cache.generate_notes_checksum(notes)
+
+ assert checksum1 == checksum2
+
+ def test_checksum_changes_on_note_change(self):
+ """Checksum changes when notes are modified"""
+ cache = FeedCache()
+ now = datetime.now(timezone.utc)
+ later = datetime(2025, 11, 27, 12, 0, 0, tzinfo=timezone.utc)
+
+ notes1 = [
+ Note(id=1, slug="note1", file_path="note1.md", created_at=now, updated_at=now, published=True, _data_dir=Path("/tmp")),
+ ]
+
+ notes2 = [
+ Note(id=1, slug="note1", file_path="note1.md", created_at=now, updated_at=later, published=True, _data_dir=Path("/tmp")),
+ ]
+
+ checksum1 = cache.generate_notes_checksum(notes1)
+ checksum2 = cache.generate_notes_checksum(notes2)
+
+ assert checksum1 != checksum2
+
+ def test_checksum_changes_on_note_addition(self):
+ """Checksum changes when notes are added"""
+ cache = FeedCache()
+ now = datetime.now(timezone.utc)
+
+ notes1 = [
+ Note(id=1, slug="note1", file_path="note1.md", created_at=now, updated_at=now, published=True, _data_dir=Path("/tmp")),
+ ]
+
+ notes2 = [
+ Note(id=1, slug="note1", file_path="note1.md", created_at=now, updated_at=now, published=True, _data_dir=Path("/tmp")),
+ Note(id=2, slug="note2", file_path="note2.md", created_at=now, updated_at=now, published=True, _data_dir=Path("/tmp")),
+ ]
+
+ checksum1 = cache.generate_notes_checksum(notes1)
+ checksum2 = cache.generate_notes_checksum(notes2)
+
+ assert checksum1 != checksum2
+
+
+class TestGlobalCache:
+ """Test global cache instance"""
+
+ def test_get_cache_returns_instance(self):
+ """get_cache() returns FeedCache instance"""
+ from starpunk.feeds.cache import get_cache
+ cache = get_cache()
+ assert isinstance(cache, FeedCache)
+
+ def test_get_cache_returns_same_instance(self):
+ """get_cache() returns singleton instance"""
+ from starpunk.feeds.cache import get_cache
+ cache1 = get_cache()
+ cache2 = get_cache()
+ assert cache1 is cache2
+
+ def test_configure_cache(self):
+ """configure_cache() sets up global cache with params"""
+ from starpunk.feeds.cache import configure_cache, get_cache
+
+ configure_cache(max_size=100, ttl=600)
+ cache = get_cache()
+
+ assert cache.max_size == 100
+ assert cache.ttl == 600
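One caveat on TestGlobalCache: configure_cache() replaces the module-level singleton, so its effects can leak into other tests. A possible isolation fixture (hypothetical, not part of this diff):

```python
# Hypothetical conftest.py fixture: reset the singleton around each test
# so configure_cache() calls cannot leak state between tests.
import pytest

import starpunk.feeds.cache as feed_cache_module

@pytest.fixture(autouse=True)
def _reset_feed_cache():
    feed_cache_module._global_cache = None
    yield
    feed_cache_module._global_cache = None
```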