feat(media): Make variant generation atomic with database

Per v1.5.0 Phase 4:
- Generate variants to temp directory first
- Perform database inserts in transaction
- Move files to final location before commit
- Clean up temp files on any failure
- Add startup recovery for orphaned temp files
- All media operations now fully atomic

Changes:
- Modified generate_all_variants() to return file moves
- Modified save_media() to handle full atomic operation
- Add cleanup_orphaned_temp_files() for startup recovery
- Added 4 new tests for atomic behavior
- Fixed HEIC variant format detection
- Updated variant failure test for atomic behavior

Fixes:
- No orphaned files on database failures
- No orphaned DB records on file failures
- Startup recovery detects and cleans orphans

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
2025-12-17 11:26:26 -07:00
parent b689e02e64
commit 21fa7acfbb
5 changed files with 951 additions and 87 deletions

View File

@@ -9,6 +9,7 @@ import pytest
from PIL import Image
import io
from pathlib import Path
from datetime import datetime
from starpunk.media import (
validate_image,
@@ -618,7 +619,7 @@ class TestMediaLogging:
assert 'error=' in caplog.text
def test_save_media_logs_variant_failure(self, app, caplog, monkeypatch):
"""Test variant generation failure logs at WARNING level but continues"""
"""Test variant generation failure causes atomic rollback (v1.5.0 Phase 4)"""
import logging
from starpunk import media
@@ -631,20 +632,15 @@ class TestMediaLogging:
image_data = create_test_image(800, 600, 'PNG')
with app.app_context():
with caplog.at_level(logging.INFO): # Need INFO level to capture success log
# Should succeed despite variant failure
media_info = save_media(image_data, 'test.png')
with caplog.at_level(logging.WARNING):
# Should fail due to atomic operation (v1.5.0 Phase 4)
with pytest.raises(RuntimeError, match="Variant generation failed"):
save_media(image_data, 'test.png')
# Check variant failure log
assert "Media upload variant generation failed" in caplog.text
# Check atomic operation failure log
assert "Media upload atomic operation failed" in caplog.text
assert 'filename="test.png"' in caplog.text
assert f'media_id={media_info["id"]}' in caplog.text
assert 'error=' in caplog.text
# But success log should also be present
assert "Media upload successful" in caplog.text
# And variants should be 0
assert 'variants=0' in caplog.text
assert 'error="Variant generation failed"' in caplog.text
def test_save_media_logs_unexpected_error(self, app, caplog, monkeypatch):
"""Test unexpected error logs at ERROR level"""
@@ -680,3 +676,151 @@ def sample_note(app):
with app.app_context():
note = create_note("Test note content", published=True)
yield note
class TestAtomicVariantGeneration:
"""
Test atomic variant generation (v1.5.0 Phase 4)
Tests that variant generation is atomic with database commits,
preventing orphaned files or database records.
"""
def test_atomic_media_save_success(self, app):
"""Test that media save operation is fully atomic on success"""
from pathlib import Path
from starpunk.media import save_media
# Create test image
img_data = create_test_image(1600, 1200, 'JPEG')
with app.app_context():
# Save media
result = save_media(img_data, 'test_atomic.jpg')
# Verify media record was created
assert result['id'] > 0
assert result['filename'] == 'test_atomic.jpg'
# Verify original file exists in final location
media_dir = Path(app.config['DATA_PATH']) / 'media'
original_path = media_dir / result['path']
assert original_path.exists(), "Original file should exist in final location"
# Verify variant files exist in final location
for variant in result['variants']:
variant_path = media_dir / variant['path']
assert variant_path.exists(), f"Variant {variant['variant_type']} should exist"
# Verify no temp files left behind
temp_dir = media_dir / '.tmp'
if temp_dir.exists():
temp_files = list(temp_dir.glob('**/*'))
temp_files = [f for f in temp_files if f.is_file()]
assert len(temp_files) == 0, "No temp files should remain after successful save"
def test_file_move_failure_rolls_back_database(self, app, monkeypatch):
"""Test that file move failure rolls back database transaction"""
from pathlib import Path
from starpunk.media import save_media
import shutil
# Create test image
img_data = create_test_image(1600, 1200, 'JPEG')
with app.app_context():
from starpunk.database import get_db
# Mock shutil.move to fail
original_move = shutil.move
call_count = [0]
def mock_move(src, dst):
call_count[0] += 1
# Fail on first move (original file)
if call_count[0] == 1:
raise OSError("File move failed")
return original_move(src, dst)
monkeypatch.setattr(shutil, 'move', mock_move)
# Count media records before operation
db = get_db(app)
media_count_before = db.execute("SELECT COUNT(*) FROM media").fetchone()[0]
# Try to save media - should fail
with pytest.raises(OSError, match="File move failed"):
save_media(img_data, 'test_rollback.jpg')
# Verify no new media records were added (transaction rolled back)
media_count_after = db.execute("SELECT COUNT(*) FROM media").fetchone()[0]
assert media_count_after == media_count_before, "No media records should be added on failure"
# Verify temp files were cleaned up
media_dir = Path(app.config['DATA_PATH']) / 'media'
temp_dir = media_dir / '.tmp'
if temp_dir.exists():
temp_files = list(temp_dir.glob('**/*'))
temp_files = [f for f in temp_files if f.is_file()]
assert len(temp_files) == 0, "Temp files should be cleaned up after file move failure"
# Restore original move
monkeypatch.setattr(shutil, 'move', original_move)
def test_startup_recovery_cleans_orphaned_temp_files(self, app):
"""Test that startup recovery detects and cleans orphaned temp files"""
from pathlib import Path
from starpunk.media import cleanup_orphaned_temp_files
import logging
with app.app_context():
media_dir = Path(app.config['DATA_PATH']) / 'media'
temp_dir = media_dir / '.tmp'
temp_dir.mkdir(parents=True, exist_ok=True)
# Create orphaned temp subdirectory with files
orphan_dir = temp_dir / 'orphaned_test_12345678'
orphan_dir.mkdir(parents=True, exist_ok=True)
# Create some fake orphaned files
orphan_file1 = orphan_dir / 'test_thumb.jpg'
orphan_file2 = orphan_dir / 'test_small.jpg'
orphan_file1.write_bytes(b'fake image data')
orphan_file2.write_bytes(b'fake image data')
# Run cleanup
with app.test_request_context():
cleanup_orphaned_temp_files(app)
# Verify files were cleaned up
assert not orphan_file1.exists(), "Orphaned file 1 should be deleted"
assert not orphan_file2.exists(), "Orphaned file 2 should be deleted"
assert not orphan_dir.exists(), "Orphaned directory should be deleted"
def test_startup_recovery_logs_orphaned_files(self, app, caplog):
"""Test that startup recovery logs warnings for orphaned files"""
from pathlib import Path
from starpunk.media import cleanup_orphaned_temp_files
import logging
with app.app_context():
media_dir = Path(app.config['DATA_PATH']) / 'media'
temp_dir = media_dir / '.tmp'
temp_dir.mkdir(parents=True, exist_ok=True)
# Create orphaned temp subdirectory with files
orphan_dir = temp_dir / 'orphaned_test_99999999'
orphan_dir.mkdir(parents=True, exist_ok=True)
# Create some fake orphaned files
orphan_file = orphan_dir / 'test_medium.jpg'
orphan_file.write_bytes(b'fake image data')
# Run cleanup with logging
with caplog.at_level(logging.WARNING):
cleanup_orphaned_temp_files(app)
# Verify warning was logged
assert "Found orphaned temp directory from failed operation" in caplog.text
assert "orphaned_test_99999999" in caplog.text