Following design approved by architect in docs/design/v1.4.0/

Changes to starpunk/media.py:
- Update delete_media() to fetch and delete variant files from disk
- Query media_variants table before deletion for file paths
- Use best-effort cleanup with try/except for each file
- Log individual file deletion failures as warnings
- Update final log to show total files deleted (original + variants)
- Update module docstring to reflect v1.4.0 capabilities:
  * 50MB max upload, 10MB max output
  * Image variants (thumb, small, medium, large)
  * Tiered resize strategy

This fixes the issue where variant files were left orphaned on disk when
media was deleted. The database CASCADE already deleted variant records,
but the physical files remained.

All tests pass: uv run pytest tests/test_media_upload.py -v (23/23)

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
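A minimal sketch of the cleanup path described above (db, media_id, media_dir, and the media_variants schema are assumed from this module's context; the full implementation is delete_media() at the end of the file below):

    # Fetch variant paths first, since the CASCADE delete removes the rows
    variant_rows = db.execute(
        "SELECT path FROM media_variants WHERE media_id = ?", (media_id,)
    ).fetchall()
    db.execute("DELETE FROM media WHERE id = ?", (media_id,))
    db.commit()
    # Best-effort removal of each physical file; log failures and keep going
    for variant_row in variant_rows:
        try:
            path = media_dir / variant_row[0]
            if path.exists():
                path.unlink()
        except OSError as e:
            current_app.logger.warning(f"Failed to delete variant file {variant_row[0]}: {e}")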
652 lines · 20 KiB · Python
"""
|
|
Media upload and management for StarPunk
|
|
|
|
Per ADR-057 and ADR-058:
|
|
- Social media attachment model (media at top of note)
|
|
- Pillow-based image optimization
|
|
- 50MB max upload, 10MB max output (v1.4.0)
|
|
- Image variants: thumb, small, medium, large (v1.4.0)
|
|
- Tiered resize strategy based on input size (v1.4.0)
|
|
- 4096x4096 max dimensions
|
|
- 4 images max per note
|
|
"""
|
|
|
|
from PIL import Image, ImageOps
|
|
from pathlib import Path
|
|
from datetime import datetime
|
|
import uuid
|
|
import io
|
|
from typing import Optional, List, Dict, Tuple
|
|
from flask import current_app
|
|
|
|
# Allowed MIME types per Q11
|
|
ALLOWED_MIME_TYPES = {
|
|
'image/jpeg': ['.jpg', '.jpeg'],
|
|
'image/png': ['.png'],
|
|
'image/gif': ['.gif'],
|
|
'image/webp': ['.webp']
|
|
}
|
|
|
|
# Limits per Q&A and ADR-058 (updated in v1.4.0)
|
|
MAX_FILE_SIZE = 50 * 1024 * 1024 # 50MB (v1.4.0)
|
|
MAX_OUTPUT_SIZE = 10 * 1024 * 1024 # 10MB target after optimization (v1.4.0)
|
|
MAX_DIMENSION = 4096 # 4096x4096 max
|
|
RESIZE_DIMENSION = 2048 # Auto-resize to 2048px (default)
|
|
MIN_QUALITY = 70 # Minimum JPEG quality before rejection (v1.4.0)
|
|
MIN_DIMENSION = 640 # Minimum dimension before rejection (v1.4.0)
|
|
MAX_IMAGES_PER_NOTE = 4
|
|
|
|
# Variant specifications (v1.4.0 Phase 2)
|
|
VARIANT_SPECS = {
|
|
'thumb': {'size': (150, 150), 'crop': True},
|
|
'small': {'width': 320, 'crop': False},
|
|
'medium': {'width': 640, 'crop': False},
|
|
'large': {'width': 1280, 'crop': False},
|
|
}
|
|
|
|
|
|
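# Example of how VARIANT_SPECS is applied: a 2048x1536 original produces a
# 150x150 center-cropped 'thumb' plus aspect-preserving 'small', 'medium',
# and 'large' variants of 320x240, 640x480, and 1280x960.
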
def get_optimization_params(file_size: int) -> Tuple[int, int]:
    """
    Determine optimization parameters based on input file size

    Per v1.4.0 tiered resize strategy:
    - <=10MB: 2048px max, 95% quality
    - 10-25MB: 1600px max, 90% quality
    - 25-50MB: 1280px max, 85% quality

    Args:
        file_size: Original file size in bytes

    Returns:
        Tuple of (max_dimension, quality_percent)
    """
    if file_size <= 10 * 1024 * 1024:  # <=10MB
        return (2048, 95)
    elif file_size <= 25 * 1024 * 1024:  # 10-25MB
        return (1600, 90)
    else:  # 25-50MB
        return (1280, 85)

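# Worked examples of the tiers above:
#   get_optimization_params(5 * 1024 * 1024)   -> (2048, 95)
#   get_optimization_params(20 * 1024 * 1024)  -> (1600, 90)
#   get_optimization_params(40 * 1024 * 1024)  -> (1280, 85)
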
def validate_image(file_data: bytes, filename: str) -> Tuple[str, int, int]:
    """
    Validate image file

    Per Q11: Validate MIME type using Pillow
    Per Q6: Reject if >50MB or >4096px (updated v1.4.0)

    Args:
        file_data: Raw file bytes
        filename: Original filename

    Returns:
        Tuple of (mime_type, width, height)

    Raises:
        ValueError: If file is invalid
    """
    # Check file size first (before loading)
    file_size = len(file_data)
    if file_size > MAX_FILE_SIZE:
        raise ValueError("File too large. Maximum size is 50MB")

    # Try to open with Pillow (validates integrity)
    try:
        img = Image.open(io.BytesIO(file_data))
        img.verify()  # Verify it's a valid image

        # Re-open after verify (verify() closes the file)
        img = Image.open(io.BytesIO(file_data))
    except Exception as e:
        raise ValueError(f"Invalid or corrupted image: {e}")

    # Check format is allowed
    if img.format:
        format_lower = img.format.lower()
        mime_type = f'image/{format_lower}'

        # Special case: JPEG format can be reported as 'jpeg'
        if format_lower == 'jpeg':
            mime_type = 'image/jpeg'

        if mime_type not in ALLOWED_MIME_TYPES:
            raise ValueError("Invalid image format. Accepted: JPEG, PNG, GIF, WebP")
    else:
        raise ValueError("Could not determine image format")

    # Check dimensions
    width, height = img.size
    if max(width, height) > MAX_DIMENSION:
        raise ValueError(f"Image dimensions too large. Maximum is {MAX_DIMENSION}x{MAX_DIMENSION} pixels")

    # Check for animated GIF (v1.4.0)
    # Animated GIFs cannot be resized, so reject if >10MB
    if img.format == 'GIF':
        try:
            img.seek(1)  # Try to seek to second frame
            # If successful, it's animated
            if file_size > MAX_OUTPUT_SIZE:
                raise ValueError(
                    "Animated GIF too large. Maximum size for animated GIFs is 10MB. "
                    "Consider using a shorter clip or lower resolution."
                )
            img.seek(0)  # Reset to first frame
        except EOFError:
            # Not animated, continue normally
            pass

    return mime_type, width, height

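# Example outcomes: a 60MB upload or a 5000x3000 image raises ValueError,
# while a valid 3000x2000 JPEG under the limits returns
# ('image/jpeg', 3000, 2000).
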
def optimize_image(image_data: bytes, original_size: Optional[int] = None) -> Tuple[Image.Image, int, int, bytes]:
    """
    Optimize image for web display with size-aware strategy

    Per v1.4.0:
    - Tiered resize strategy based on input size
    - Iterative quality reduction if needed
    - Target output <=10MB

    Args:
        image_data: Raw image bytes
        original_size: Original file size (for tiered optimization)

    Returns:
        Tuple of (optimized_image, width, height, optimized_bytes)

    Raises:
        ValueError: If image cannot be optimized to target size
    """
    if original_size is None:
        original_size = len(image_data)

    # Get initial optimization parameters based on input size
    max_dim, quality = get_optimization_params(original_size)

    img = Image.open(io.BytesIO(image_data))

    # Save original format before any processing (copy() loses this)
    original_format = img.format

    # Correct EXIF orientation (per ADR-058), except for GIFs
    img = ImageOps.exif_transpose(img) if img.format != 'GIF' else img

    # For animated GIFs, return as-is (already validated in validate_image)
    if img.format == 'GIF' and getattr(img, 'is_animated', False):
        # Already checked size in validate_image, just return original
        return img, img.size[0], img.size[1], image_data

    # Iterative optimization loop
    while True:
        # Create copy for this iteration
        work_img = img.copy()

        # Resize if needed
        if max(work_img.size) > max_dim:
            work_img.thumbnail((max_dim, max_dim), Image.Resampling.LANCZOS)

        # Save to bytes to check size
        output = io.BytesIO()
        # Use original format (copy() loses the format attribute)
        save_format = original_format or 'JPEG'
        save_kwargs = {'optimize': True}

        if save_format in ['JPEG', 'JPG']:
            save_kwargs['quality'] = quality
        elif save_format == 'WEBP':
            save_kwargs['quality'] = quality
        # For GIF and PNG, just use optimize flag

        work_img.save(output, format=save_format, **save_kwargs)
        output_bytes = output.getvalue()

        # Check output size
        if len(output_bytes) <= MAX_OUTPUT_SIZE:
            width, height = work_img.size
            return work_img, width, height, output_bytes

        # Need to reduce further
        if quality > MIN_QUALITY:
            # Reduce quality first
            quality -= 5
        else:
            # Already at min quality, reduce dimensions
            max_dim = int(max_dim * 0.8)
            quality = 85  # Reset quality for new dimension

        # Safety check: minimum dimension
        if max_dim < MIN_DIMENSION:
            raise ValueError(
                "Image cannot be optimized to target size. "
                "Please use a smaller or lower-resolution image."
            )

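# Example of the iterative loop: a 30MB JPEG starts at (1280, 85); if the
# encoded result is still over MAX_OUTPUT_SIZE, quality drops in steps of 5
# down to MIN_QUALITY (70), then max_dim shrinks by 20% (1280 -> 1024) with
# quality reset to 85, repeating until the output fits or max_dim falls
# below MIN_DIMENSION (ValueError).
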
def generate_variant(
    img: Image.Image,
    variant_type: str,
    base_path: Path,
    base_filename: str,
    file_ext: str
) -> Dict:
    """
    Generate a single image variant

    Args:
        img: Source PIL Image
        variant_type: One of 'thumb', 'small', 'medium', 'large'
        base_path: Directory to save to
        base_filename: Base filename (UUID without extension)
        file_ext: File extension (e.g., '.jpg')

    Returns:
        Dict with variant metadata (path, width, height, size_bytes)
    """
    spec = VARIANT_SPECS[variant_type]
    work_img = img.copy()

    if spec.get('crop'):
        # Center crop for thumbnails using ImageOps.fit()
        work_img = ImageOps.fit(
            work_img,
            spec['size'],
            method=Image.Resampling.LANCZOS,
            centering=(0.5, 0.5)
        )
    else:
        # Aspect-preserving resize
        target_width = spec['width']
        if work_img.width > target_width:
            ratio = target_width / work_img.width
            new_height = int(work_img.height * ratio)
            work_img = work_img.resize(
                (target_width, new_height),
                Image.Resampling.LANCZOS
            )

    # Generate variant filename
    variant_filename = f"{base_filename}_{variant_type}{file_ext}"
    variant_path = base_path / variant_filename

    # Determine format from extension
    save_format = 'JPEG' if file_ext.lower() in ['.jpg', '.jpeg'] else file_ext[1:].upper()

    # Save with appropriate quality (copies lose the format attribute,
    # so decide from the target save format rather than work_img.format)
    save_kwargs = {'optimize': True}
    if save_format in ['JPEG', 'WEBP']:
        save_kwargs['quality'] = 85

    work_img.save(variant_path, format=save_format, **save_kwargs)

    return {
        'variant_type': variant_type,
        'path': str(variant_path.relative_to(base_path.parent.parent)),  # Relative to media root
        'width': work_img.width,
        'height': work_img.height,
        'size_bytes': variant_path.stat().st_size
    }

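# Example: for base_filename 'a1b2c3' and file_ext '.jpg', the 'small'
# variant is written next to the original as a1b2c3_small.jpg, and its
# recorded path (relative to the media root) is 'YYYY/MM/a1b2c3_small.jpg'.
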
def generate_all_variants(
    img: Image.Image,
    base_path: Path,
    base_filename: str,
    file_ext: str,
    media_id: int,
    year: str,
    month: str,
    optimized_bytes: bytes
) -> List[Dict]:
    """
    Generate all variants for an image and store in database

    Args:
        img: Source PIL Image (the optimized original)
        base_path: Directory containing the original
        base_filename: Base filename (UUID without extension)
        file_ext: File extension
        media_id: ID of parent media record
        year: Year string (e.g., '2025') for path calculation
        month: Month string (e.g., '01') for path calculation
        optimized_bytes: Bytes of optimized original (avoids re-reading file)

    Returns:
        List of variant metadata dicts
    """
    from starpunk.database import get_db

    variants = []
    db = get_db(current_app)
    created_files = []  # Track files for cleanup on failure

    try:
        # Generate each variant type
        for variant_type in ['thumb', 'small', 'medium', 'large']:
            # Skip if image is smaller than target
            spec = VARIANT_SPECS[variant_type]
            target_width = spec.get('width') or spec['size'][0]

            if img.width < target_width and variant_type != 'thumb':
                continue  # Skip variants larger than original

            variant = generate_variant(img, variant_type, base_path, base_filename, file_ext)
            variants.append(variant)
            created_files.append(base_path / f"{base_filename}_{variant_type}{file_ext}")

            # Insert into database
            db.execute(
                """
                INSERT INTO media_variants
                (media_id, variant_type, path, width, height, size_bytes)
                VALUES (?, ?, ?, ?, ?, ?)
                """,
                (media_id, variant['variant_type'], variant['path'],
                 variant['width'], variant['height'], variant['size_bytes'])
            )

        # Also record the original as 'original' variant
        # Use explicit year/month for path calculation (avoids fragile parent traversal)
        original_path = f"{year}/{month}/{base_filename}{file_ext}"
        db.execute(
            """
            INSERT INTO media_variants
            (media_id, variant_type, path, width, height, size_bytes)
            VALUES (?, ?, ?, ?, ?, ?)
            """,
            (media_id, 'original', original_path, img.width, img.height,
             len(optimized_bytes))  # Use passed bytes instead of file I/O
        )

        db.commit()
        return variants

    except Exception:
        # Clean up any created variant files on failure
        for file_path in created_files:
            try:
                if file_path.exists():
                    file_path.unlink()
            except OSError:
                pass  # Best effort cleanup
        raise  # Re-raise the original exception

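# Example of the skip logic above: a 600px-wide optimized original gets
# 'thumb' and 'small' variants plus the 'original' row, while 'medium'
# (640) and 'large' (1280) are skipped because the source is narrower
# than their target widths.
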
def save_media(file_data: bytes, filename: str) -> Dict:
    """
    Save uploaded media file

    Per Q5: UUID-based filename to avoid collisions
    Per Q2: Date-organized path: /media/YYYY/MM/uuid.ext
    Per Q6: Validate, optimize, then save
    Per v1.4.0: Size-aware optimization with iterative quality reduction

    Args:
        file_data: Raw file bytes
        filename: Original filename

    Returns:
        Media metadata dict (for database insert)

    Raises:
        ValueError: If validation fails
    """
    from starpunk.database import get_db

    # Validate image (returns 3-tuple, signature unchanged)
    mime_type, orig_width, orig_height = validate_image(file_data, filename)

    # Compute file size for optimization strategy
    file_size = len(file_data)

    # Optimize image with size-aware strategy (now returns 4-tuple with bytes)
    optimized_img, width, height, optimized_bytes = optimize_image(file_data, file_size)

    # Generate UUID-based filename (per Q5)
    file_ext = Path(filename).suffix.lower()
    if not file_ext:
        # Determine extension from MIME type
        for mime, exts in ALLOWED_MIME_TYPES.items():
            if mime == mime_type:
                file_ext = exts[0]
                break

    stored_filename = f"{uuid.uuid4()}{file_ext}"

    # Create date-based path (per Q2)
    now = datetime.now()
    year = now.strftime('%Y')
    month = now.strftime('%m')
    relative_path = f"{year}/{month}/{stored_filename}"

    # Get media directory from app config
    media_dir = Path(current_app.config.get('DATA_PATH', 'data')) / 'media'
    full_dir = media_dir / year / month
    full_dir.mkdir(parents=True, exist_ok=True)

    # Save optimized image (using bytes from optimize_image to avoid re-encoding)
    full_path = full_dir / stored_filename
    full_path.write_bytes(optimized_bytes)

    # Get actual file size (from optimized bytes)
    actual_size = len(optimized_bytes)

    # Insert into database
    db = get_db(current_app)
    cursor = db.execute(
        """
        INSERT INTO media (filename, stored_filename, path, mime_type, size, width, height)
        VALUES (?, ?, ?, ?, ?, ?, ?)
        """,
        (filename, stored_filename, relative_path, mime_type, actual_size, width, height)
    )
    db.commit()
    media_id = cursor.lastrowid

    # Generate variants (synchronous) - v1.4.0 Phase 2
    # Pass year, month, and optimized_bytes to avoid fragile path traversal and file I/O
    base_filename = stored_filename.rsplit('.', 1)[0]
    variants = generate_all_variants(
        optimized_img,
        full_dir,
        base_filename,
        file_ext,
        media_id,
        year,
        month,
        optimized_bytes
    )

    return {
        'id': media_id,
        'filename': filename,
        'stored_filename': stored_filename,
        'path': relative_path,
        'mime_type': mime_type,
        'size': actual_size,
        'width': width,
        'height': height,
        'variants': variants
    }

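# Example: an upload named 'photo.jpg' saved on 2025-01-15 is written to
# <DATA_PATH>/media/2025/01/<uuid>.jpg, recorded with path
# '2025/01/<uuid>.jpg', and the returned dict includes the variants
# produced by generate_all_variants().
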
def attach_media_to_note(note_id: int, media_ids: List[int], captions: List[str]) -> None:
    """
    Attach media files to note

    Per Q4: Happens after note creation
    Per Q7: Captions are optional per image

    Args:
        note_id: Note to attach to
        media_ids: List of media IDs (max 4)
        captions: List of captions (same length as media_ids)

    Raises:
        ValueError: If more than MAX_IMAGES_PER_NOTE
    """
    from starpunk.database import get_db

    if len(media_ids) > MAX_IMAGES_PER_NOTE:
        raise ValueError(f"Maximum {MAX_IMAGES_PER_NOTE} images per note")

    db = get_db(current_app)

    # Delete existing associations (for edit case)
    db.execute("DELETE FROM note_media WHERE note_id = ?", (note_id,))

    # Insert new associations
    for i, (media_id, caption) in enumerate(zip(media_ids, captions)):
        db.execute(
            """
            INSERT INTO note_media (note_id, media_id, display_order, caption)
            VALUES (?, ?, ?, ?)
            """,
            (note_id, media_id, i, caption or None)
        )

    db.commit()

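# Example: attach_media_to_note(42, [3, 5], ['Sunset', '']) replaces any
# existing rows for note 42 with two rows at display_order 0 and 1; the
# empty caption is stored as NULL via `caption or None`.
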
def get_note_media(note_id: int) -> List[Dict]:
    """
    Get all media attached to a note with variants (v1.4.0)

    Returns list sorted by display_order

    Args:
        note_id: Note ID to get media for

    Returns:
        List of media dicts with metadata (includes 'variants' key if variants exist)
    """
    from starpunk.database import get_db

    db = get_db(current_app)
    rows = db.execute(
        """
        SELECT
            m.id,
            m.filename,
            m.stored_filename,
            m.path,
            m.mime_type,
            m.size,
            m.width,
            m.height,
            nm.caption,
            nm.display_order
        FROM note_media nm
        JOIN media m ON nm.media_id = m.id
        WHERE nm.note_id = ?
        ORDER BY nm.display_order
        """,
        (note_id,)
    ).fetchall()

    media_list = []
    for row in rows:
        media_dict = {
            'id': row[0],
            'filename': row[1],
            'stored_filename': row[2],
            'path': row[3],
            'mime_type': row[4],
            'size': row[5],
            'width': row[6],
            'height': row[7],
            'caption': row[8],
            'display_order': row[9]
        }

        # Fetch variants for this media (v1.4.0 Phase 2)
        variants = db.execute(
            """
            SELECT variant_type, path, width, height, size_bytes
            FROM media_variants
            WHERE media_id = ?
            ORDER BY
                CASE variant_type
                    WHEN 'thumb' THEN 1
                    WHEN 'small' THEN 2
                    WHEN 'medium' THEN 3
                    WHEN 'large' THEN 4
                    WHEN 'original' THEN 5
                END
            """,
            (row[0],)
        ).fetchall()

        # Only add 'variants' key if variants exist (backwards compatibility)
        # Pre-v1.4.0 media won't have variants, and consumers shouldn't
        # expect the key to be present
        if variants:
            media_dict['variants'] = {
                v[0]: {
                    'path': v[1],
                    'width': v[2],
                    'height': v[3],
                    'size_bytes': v[4]
                }
                for v in variants
            }

        media_list.append(media_dict)

    return media_list

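# Example return shape (one attached image that has variants):
#   [{'id': 3, 'filename': 'photo.jpg', ..., 'caption': 'Sunset',
#     'display_order': 0,
#     'variants': {'thumb': {'path': '2025/01/<uuid>_thumb.jpg', ...}, ...}}]
# Pre-v1.4.0 media rows simply omit the 'variants' key.
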
def delete_media(media_id: int) -> None:
    """
    Delete media file, variants, and database record

    Per Q8: Cleanup orphaned files
    Per v1.4.0: Also cleanup variant files

    Args:
        media_id: Media ID to delete
    """
    from starpunk.database import get_db

    db = get_db(current_app)

    # Get media path before deleting
    row = db.execute("SELECT path FROM media WHERE id = ?", (media_id,)).fetchone()
    if not row:
        return

    media_path = row[0]
    media_dir = Path(current_app.config.get('DATA_PATH', 'data')) / 'media'

    # Get variant paths before deleting (v1.4.0)
    variant_rows = db.execute(
        "SELECT path FROM media_variants WHERE media_id = ?",
        (media_id,)
    ).fetchall()

    # Delete database record (cascade will delete media_variants and note_media entries)
    db.execute("DELETE FROM media WHERE id = ?", (media_id,))
    db.commit()

    # Delete files from disk (best-effort cleanup)
    deleted_count = 0

    # Delete original file
    full_path = media_dir / media_path
    try:
        if full_path.exists():
            full_path.unlink()
            deleted_count += 1
    except OSError as e:
        current_app.logger.warning(f"Failed to delete media file {media_path}: {e}")

    # Delete variant files (v1.4.0)
    for variant_row in variant_rows:
        variant_path = media_dir / variant_row[0]
        try:
            if variant_path.exists():
                variant_path.unlink()
                deleted_count += 1
        except OSError as e:
            current_app.logger.warning(f"Failed to delete variant file {variant_row[0]}: {e}")

    current_app.logger.info(f"Deleted media {media_id}: {deleted_count} file(s) removed from disk")