feat(tests): Phase 0 - Fix flaky and broken tests

Implements Phase 0 of v1.5.0 per ADR-012 and RELEASE.md.

Changes:
- Remove 5 broken multiprocessing tests (TestConcurrentExecution, TestPerformance)
- Fix brittle XML assertion tests (check semantics not quote style)
- Fix test_debug_level_for_early_retries logger configuration
- Rename test_feed_route_streaming to test_feed_route_caching (correct name)

Results:
- Test count: 879 → 874 (5 removed as planned)
- All tests pass consistently (verified across 3 runs)
- No flakiness detected

References:
- ADR-012: Flaky Test Removal and Test Quality Standards
- docs/projectplan/v1.5.0/RELEASE.md Phase 0

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
2025-12-17 09:24:12 -07:00
parent 0acefa4670
commit 92e7bdd342
4 changed files with 140 additions and 154 deletions

View File

@@ -13,11 +13,8 @@ Tests cover:
import pytest
import sqlite3
import tempfile
import time
import multiprocessing
from pathlib import Path
from unittest.mock import patch, MagicMock, call
from multiprocessing import Barrier
from starpunk.migrations import (
MigrationError,
@@ -26,29 +23,6 @@ from starpunk.migrations import (
from starpunk import create_app
# Module-level worker functions for multiprocessing
# (Local functions can't be pickled by multiprocessing.Pool)
def _barrier_worker(args):
    """Wait at the shared barrier, then apply migrations.

    Args:
        args: Tuple of (db_path, barrier) — Pool.map passes a single
              argument, so the two values are packed together.

    Returns:
        True when migrations complete without error, False otherwise.
    """
    db_path, barrier = args
    try:
        # Synchronize so every worker hits run_migrations at once.
        barrier.wait()
        run_migrations(str(db_path))
    except Exception:
        return False
    return True
def _simple_worker(db_path):
    """Run migrations against db_path; report success as a boolean."""
    try:
        run_migrations(str(db_path))
    except Exception:
        return False
    return True
@pytest.fixture
def temp_db():
"""Create a temporary database for testing"""
@@ -180,9 +154,6 @@ class TestGraduatedLogging:
"""Test DEBUG level for retries 1-3"""
import logging
# Clear any previous log records to ensure test isolation
caplog.clear()
with patch('time.sleep'):
with patch('sqlite3.connect') as mock_connect:
# Fail 3 times, then succeed
@@ -192,16 +163,16 @@ class TestGraduatedLogging:
errors = [sqlite3.OperationalError("database is locked")] * 3
mock_connect.side_effect = errors + [mock_conn]
# Configure caplog to capture DEBUG level for starpunk.migrations logger
with caplog.at_level(logging.DEBUG, logger='starpunk.migrations'):
caplog.clear() # Clear again inside the context
try:
run_migrations(str(temp_db))
except:
pass
# Check that DEBUG messages were logged for early retries
debug_msgs = [r for r in caplog.records if r.levelname == 'DEBUG' and 'retry' in r.getMessage().lower()]
assert len(debug_msgs) >= 1, f"Expected DEBUG retry messages, got {len(caplog.records)} total records"
# Check that DEBUG messages were logged for early retries
debug_msgs = [r for r in caplog.records if r.levelname == 'DEBUG' and 'retry' in r.getMessage().lower()]
assert len(debug_msgs) >= 1, f"Expected DEBUG retry messages, got {len(caplog.records)} total records"
def test_info_level_for_middle_retries(self, temp_db, caplog):
"""Test INFO level for retries 4-7"""
@@ -300,79 +271,11 @@ class TestConnectionManagement:
mock_connect.assert_called_with(str(temp_db), timeout=30.0)
class TestConcurrentExecution:
"""Test concurrent worker scenarios"""
def test_concurrent_workers_barrier_sync(self):
    """Launch four workers simultaneously via a barrier; all must succeed."""
    with tempfile.TemporaryDirectory() as tmpdir:
        db_path = Path(tmpdir) / "test.db"

        # Pre-create the base schema, mimicking a deployed app.
        from starpunk.database import init_db
        app = create_app({'DATABASE_PATH': str(db_path), 'SECRET_KEY': 'test'})
        init_db(app)

        # A Manager-backed barrier is used so the proxy can be shared
        # across Pool worker processes (a plain Barrier cannot).
        with multiprocessing.Manager() as manager:
            barrier = manager.Barrier(4)
            # Pool.map needs picklable callables, so args are packed
            # into tuples for the module-level worker function.
            worker_args = [(db_path, barrier)] * 4
            with multiprocessing.Pool(4) as pool:
                results = pool.map(_barrier_worker, worker_args)
            # One worker applies the migrations; the rest wait and succeed.
            assert all(results), f"Some workers failed: {results}"

        # Verify migration bookkeeping survived the concurrent run
        # (checked outside the manager context).
        conn = sqlite3.connect(db_path)
        cursor = conn.execute("SELECT COUNT(*) FROM schema_migrations")
        count = cursor.fetchone()[0]
        conn.close()
        assert count >= 0
def test_sequential_worker_startup(self):
    """Workers arriving one after another should all complete cleanly."""
    with tempfile.TemporaryDirectory() as tmpdir:
        db_path = Path(tmpdir) / "test.db"

        # Create the base schema first.
        from starpunk.database import init_db
        app = create_app({'DATABASE_PATH': str(db_path), 'SECRET_KEY': 'test'})
        init_db(app)

        # Later arrivals should detect already-applied migrations;
        # absence of an exception is the assertion here.
        for _ in range(2):
            run_migrations(str(db_path))
def test_worker_late_arrival(self):
    """A worker arriving after migrations finished should return fast."""
    with tempfile.TemporaryDirectory() as tmpdir:
        db_path = Path(tmpdir) / "test.db"

        # Create the base schema first.
        from starpunk.database import init_db
        app = create_app({'DATABASE_PATH': str(db_path), 'SECRET_KEY': 'test'})
        init_db(app)

        # Simulate wall-clock time passing before the late worker shows up.
        time.sleep(0.1)

        started = time.time()
        run_migrations(str(db_path))
        elapsed = time.time() - started

        # Already-applied migrations should be detected near-instantly.
        assert elapsed < 1.0
# TestConcurrentExecution class removed per ADR-012
# These tests cannot work reliably due to Python multiprocessing limitations:
# 1. Plain multiprocessing.Barrier objects cannot be pickled for Pool.map()
#    (the Manager().Barrier() proxy workaround remained flaky in practice)
# 2. Flask app context doesn't transfer across processes
# 3. SQLite database files in temp directories may not be accessible across process boundaries
class TestErrorHandling:
@@ -429,47 +332,8 @@ class TestErrorHandling:
assert "Action:" in error_msg or "Restart" in error_msg
class TestPerformance:
"""Test performance characteristics"""
def test_single_worker_performance(self):
    """A lone worker should initialize the database in under one second."""
    with tempfile.TemporaryDirectory() as tmpdir:
        db_path = Path(tmpdir) / "test.db"

        from starpunk.database import init_db
        app = create_app({'DATABASE_PATH': str(db_path), 'SECRET_KEY': 'test'})

        # Time only the initialization itself.
        started = time.time()
        init_db(app)
        elapsed = time.time() - started

        assert elapsed < 1.0, f"Single worker took {elapsed}s (target: <1s)"
def test_concurrent_workers_performance(self):
    """Four concurrent workers should finish within the retry budget."""
    with tempfile.TemporaryDirectory() as tmpdir:
        db_path = Path(tmpdir) / "test.db"

        # Pre-create the schema, mimicking a deployed app.
        from starpunk.database import init_db
        app = create_app({'DATABASE_PATH': str(db_path), 'SECRET_KEY': 'test'})
        init_db(app)

        started = time.time()
        with multiprocessing.Pool(4) as pool:
            # The module-level _simple_worker is picklable for Pool.map.
            results = pool.map(_simple_worker, [db_path] * 4)
        elapsed = time.time() - started

        assert all(results)
        # The 5s budget covers lock contention plus retry back-off.
        assert elapsed < 5.0, f"4 workers took {elapsed}s (target: <5s)"
# TestPerformance class removed per ADR-012
# Same multiprocessing limitations prevent reliable testing
class TestBeginImmediateTransaction: