"""
Tests for batch loading functions (v1.5.0 Phase 3)

Tests batch loading of media and tags to verify N+1 query fix in feed generation.
"""

import io

import pytest
from PIL import Image

from starpunk.media import get_media_for_notes, save_media, attach_media_to_note
from starpunk.tags import get_tags_for_notes, add_tags_to_note
from starpunk.notes import create_note


def create_test_image(width=800, height=600, format='JPEG'):
    """
    Generate test image using PIL

    The image is a solid red RGB canvas encoded in the requested format.

    Args:
        width: Image width in pixels
        height: Image height in pixels
        format: Image format (PNG, JPEG, GIF, WEBP)

    Returns:
        Bytes of image data
    """
    img = Image.new('RGB', (width, height), color='red')
    buffer = io.BytesIO()
    img.save(buffer, format=format)
    # getvalue() returns the whole buffer regardless of the stream position,
    # so no seek(0) is needed before reading it back.
    return buffer.getvalue()


# NOTE(review): the `app` and `client` fixtures used below are not defined in
# this module; presumably they come from the project's conftest.py.


class TestBatchMediaLoading:
    """Test get_media_for_notes batch loading function"""

    def test_batch_load_media_empty_list(self, app):
        """Test batch loading with empty note list"""
        with app.app_context():
            result = get_media_for_notes([])
            assert result == {}

    def test_batch_load_media_no_media(self, app):
        """Test batch loading for notes without media"""
        with app.app_context():
            # Create notes without media
            note1 = create_note(content="Test note 1", published=True)
            note2 = create_note(content="Test note 2", published=True)

            result = get_media_for_notes([note1.id, note2.id])

            # Every requested note gets an entry, even when it has no media
            assert len(result) == 2
            assert result[note1.id] == []
            assert result[note2.id] == []

    def test_batch_load_media_with_media(self, app):
        """Test batch loading for notes with media"""
        with app.app_context():
            # Create test notes
            note1 = create_note(content="Test note 1", published=True)
            note2 = create_note(content="Test note 2", published=True)

            # Upload media for note1
            image_data = create_test_image(800, 600, 'JPEG')
            media1 = save_media(image_data, 'test1.jpg')
            attach_media_to_note(note1.id, [media1['id']], ['Caption 1'])

            # Upload media for note2
            image_data2 = create_test_image(640, 480, 'PNG')
            media2 = save_media(image_data2, 'test2.png')
            attach_media_to_note(note2.id, [media2['id']], ['Caption 2'])

            # Batch load media
            result = get_media_for_notes([note1.id, note2.id])

            # Verify results
            assert len(result) == 2
            assert len(result[note1.id]) == 1
            assert len(result[note2.id]) == 1

            # Verify media1
            assert result[note1.id][0]['id'] == media1['id']
            assert result[note1.id][0]['filename'] == 'test1.jpg'
            assert result[note1.id][0]['caption'] == 'Caption 1'
            assert result[note1.id][0]['mime_type'] == 'image/jpeg'

            # Verify media2
            assert result[note2.id][0]['id'] == media2['id']
            assert result[note2.id][0]['filename'] == 'test2.png'
            assert result[note2.id][0]['caption'] == 'Caption 2'
            assert result[note2.id][0]['mime_type'] == 'image/png'

    def test_batch_load_media_with_variants(self, app):
        """Test batch loading includes variants (v1.4.0+)"""
        with app.app_context():
            # Create note with media
            note = create_note(content="Test note with variants", published=True)

            # Upload large image (will generate variants)
            image_data = create_test_image(2000, 1500, 'JPEG')
            media = save_media(image_data, 'large.jpg')
            attach_media_to_note(note.id, [media['id']], ['Large image'])

            # Batch load media
            result = get_media_for_notes([note.id])

            # Verify variants are included
            assert len(result[note.id]) == 1
            media_dict = result[note.id][0]
            assert 'variants' in media_dict

            # Should have thumb, small, medium, large, original variants;
            # only the two checked below are actually asserted here.
            assert 'thumb' in media_dict['variants']
            assert 'original' in media_dict['variants']

    def test_batch_load_media_multiple_per_note(self, app):
        """Test batch loading with multiple media per note"""
        with app.app_context():
            # Create note with multiple media
            note = create_note(content="Test note with multiple media", published=True)

            # Upload multiple images
            media_ids = []
            captions = []
            for i in range(3):
                image_data = create_test_image(800, 600, 'JPEG')
                media = save_media(image_data, f'test{i}.jpg')
                media_ids.append(media['id'])
                captions.append(f'Caption {i}')

            attach_media_to_note(note.id, media_ids, captions)

            # Batch load media
            result = get_media_for_notes([note.id])

            # Verify all media loaded
            assert len(result[note.id]) == 3

            # Verify display order preserved
            for i, media_dict in enumerate(result[note.id]):
                assert media_dict['caption'] == f'Caption {i}'
                assert media_dict['display_order'] == i

    def test_batch_load_media_mixed_notes(self, app):
        """Test batch loading with mix of notes with/without media"""
        with app.app_context():
            # Create notes
            note1 = create_note(content="Note with media", published=True)
            note2 = create_note(content="Note without media", published=True)
            note3 = create_note(content="Another note with media", published=True)

            # Add media to note1 and note3
            image_data = create_test_image(800, 600, 'JPEG')
            media1 = save_media(image_data, 'test1.jpg')
            attach_media_to_note(note1.id, [media1['id']], ['Caption 1'])

            image_data3 = create_test_image(800, 600, 'JPEG')
            media3 = save_media(image_data3, 'test3.jpg')
            attach_media_to_note(note3.id, [media3['id']], ['Caption 3'])

            # Batch load
            result = get_media_for_notes([note1.id, note2.id, note3.id])

            # Verify results
            assert len(result) == 3
            assert len(result[note1.id]) == 1
            assert len(result[note2.id]) == 0  # No media
            assert len(result[note3.id]) == 1


class TestBatchTagLoading:
    """Test get_tags_for_notes batch loading function"""

    def test_batch_load_tags_empty_list(self, app):
        """Test batch loading with empty note list"""
        with app.app_context():
            result = get_tags_for_notes([])
            assert result == {}

    def test_batch_load_tags_no_tags(self, app):
        """Test batch loading for notes without tags"""
        with app.app_context():
            # Create notes without tags
            note1 = create_note(content="Test note 1", published=True)
            note2 = create_note(content="Test note 2", published=True)

            result = get_tags_for_notes([note1.id, note2.id])

            # Every requested note gets an entry, even when it has no tags
            assert len(result) == 2
            assert result[note1.id] == []
            assert result[note2.id] == []

    def test_batch_load_tags_with_tags(self, app):
        """Test batch loading for notes with tags"""
        with app.app_context():
            # Create test notes
            note1 = create_note(content="Test note 1", published=True)
            note2 = create_note(content="Test note 2", published=True)

            # Add tags to notes
            add_tags_to_note(note1.id, ['Python', 'Testing'])
            add_tags_to_note(note2.id, ['IndieWeb', 'Web'])

            # Batch load tags
            result = get_tags_for_notes([note1.id, note2.id])

            # Verify results
            assert len(result) == 2
            assert len(result[note1.id]) == 2
            assert len(result[note2.id]) == 2

            # Verify note1 tags (alphabetical by display_name)
            assert result[note1.id][0]['display_name'] == 'Python'
            assert result[note1.id][0]['name'] == 'python'
            assert result[note1.id][1]['display_name'] == 'Testing'
            assert result[note1.id][1]['name'] == 'testing'

            # Verify note2 tags
            assert result[note2.id][0]['display_name'] == 'IndieWeb'
            assert result[note2.id][0]['name'] == 'indieweb'
            assert result[note2.id][1]['display_name'] == 'Web'
            assert result[note2.id][1]['name'] == 'web'

    def test_batch_load_tags_mixed_notes(self, app):
        """Test batch loading with mix of notes with/without tags"""
        with app.app_context():
            # Create notes
            note1 = create_note(content="Note with tags", published=True)
            note2 = create_note(content="Note without tags", published=True)
            note3 = create_note(content="Another note with tags", published=True)

            # Add tags to note1 and note3
            add_tags_to_note(note1.id, ['Tag1', 'Tag2'])
            add_tags_to_note(note3.id, ['Tag3'])

            # Batch load
            result = get_tags_for_notes([note1.id, note2.id, note3.id])

            # Verify results
            assert len(result) == 3
            assert len(result[note1.id]) == 2
            assert len(result[note2.id]) == 0  # No tags
            assert len(result[note3.id]) == 1

    def test_batch_load_tags_ordering(self, app):
        """Test batch loading preserves alphabetical ordering"""
        with app.app_context():
            # Create note with tags in non-alphabetical order
            note = create_note(content="Test note", published=True)
            add_tags_to_note(note.id, ['Zebra', 'Apple', 'Banana'])

            # Batch load
            result = get_tags_for_notes([note.id])

            # Verify alphabetical order (case-insensitive)
            assert len(result[note.id]) == 3
            assert result[note.id][0]['display_name'] == 'Apple'
            assert result[note.id][1]['display_name'] == 'Banana'
            assert result[note.id][2]['display_name'] == 'Zebra'


class TestBatchLoadingIntegration:
    """Test batch loading integration with feed generation"""

    def test_feed_generation_uses_batch_loading(self, client, app):
        """Test that feed generation correctly uses batch loaded data"""
        with app.app_context():
            # Create multiple notes with media and tags
            for i in range(5):
                note = create_note(content=f"Test note {i}", published=True)

                # Add media
                image_data = create_test_image(800, 600, 'JPEG')
                media = save_media(image_data, f'test{i}.jpg')
                attach_media_to_note(note.id, [media['id']], [f'Caption {i}'])

                # Add tags
                add_tags_to_note(note.id, [f'Tag{i}', 'Common'])

            # Request feed (should use batch loading)
            response = client.get('/feed.rss')
            assert response.status_code == 200

            # Verify feed contains data from batch loaded media/tags
            feed_data = response.data.decode('utf-8')
            assert 'Test note 0' in feed_data
            assert 'Test note 4' in feed_data

            # Media should be in feed (either the original filename or a
            # media/ URL path is accepted)
            assert 'test0.jpg' in feed_data or 'media/' in feed_data

    def test_batch_loading_performance_comparison(self, app):
        """Test that batch loading returns complete data for many notes

        NOTE: despite the name, this test does not count database queries;
        it only checks that one batch call per data type returns an entry
        for every requested note.
        """
        with app.app_context():
            # Create test data
            notes = []
            for i in range(10):
                note = create_note(content=f"Test note {i}", published=True)
                notes.append(note)

                # Add media
                image_data = create_test_image(800, 600, 'JPEG')
                media = save_media(image_data, f'test{i}.jpg')
                attach_media_to_note(note.id, [media['id']], [f'Caption {i}'])

                # Add tags
                add_tags_to_note(note.id, [f'Tag{i}'])

            note_ids = [n.id for n in notes]

            # Batch load (should be 2 queries: media + variants, tags);
            # the query count itself is not measured here.
            media_result = get_media_for_notes(note_ids)
            tags_result = get_tags_for_notes(note_ids)

            # Verify results complete
            assert len(media_result) == 10
            assert len(tags_result) == 10

            # Verify all notes have data
            for note_id in note_ids:
                assert len(media_result[note_id]) == 1
                assert len(tags_result[note_id]) == 1