Files
StarPunk/starpunk/media.py
Phil Skentelbery 1b51c82656 feat: Implement v1.4.0 Phase 1 - Large Image Support
Implement tiered resize strategy for large images per v1.4.0 design:

Changes:
- Increase MAX_FILE_SIZE from 10MB to 50MB
- Add MAX_OUTPUT_SIZE constant (10MB target after optimization)
- Add MIN_QUALITY and MIN_DIMENSION constants
- Add get_optimization_params() for tiered strategy:
  - <=10MB: 2048px max, 95% quality
  - 10-25MB: 1600px max, 90% quality
  - 25-50MB: 1280px max, 85% quality
- Update optimize_image() signature to return 4-tuple (img, w, h, bytes)
- Implement iterative quality reduction if output >10MB
- Add animated GIF detection and size check in validate_image()
- Update save_media() to use new optimize_image() return value
- Fix GIF format preservation during optimization
- Update tests to match new optimize_image() signature

All existing tests pass. Ready for Phase 2 (Image Variants).

Following design in:
/home/phil/Projects/starpunk/docs/design/v1.4.0/media-implementation-design.md

Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-12-10 17:57:44 -07:00

420 lines
12 KiB
Python

"""
Media upload and management for StarPunk
Per ADR-057 and ADR-058:
- Social media attachment model (media at top of note)
- Pillow-based image optimization
- 10MB max file size, 4096x4096 max dimensions
- Auto-resize to 2048px for performance
- 4 images max per note
"""
from PIL import Image, ImageOps
from pathlib import Path
from datetime import datetime
import uuid
import io
from typing import Optional, List, Dict, Tuple
from flask import current_app
# Allowed MIME types per Q11
ALLOWED_MIME_TYPES = {
'image/jpeg': ['.jpg', '.jpeg'],
'image/png': ['.png'],
'image/gif': ['.gif'],
'image/webp': ['.webp']
}
# Limits per Q&A and ADR-058 (updated in v1.4.0)
MAX_FILE_SIZE = 50 * 1024 * 1024 # 50MB (v1.4.0)
MAX_OUTPUT_SIZE = 10 * 1024 * 1024 # 10MB target after optimization (v1.4.0)
MAX_DIMENSION = 4096 # 4096x4096 max
RESIZE_DIMENSION = 2048 # Auto-resize to 2048px (default)
MIN_QUALITY = 70 # Minimum JPEG quality before rejection (v1.4.0)
MIN_DIMENSION = 640 # Minimum dimension before rejection (v1.4.0)
MAX_IMAGES_PER_NOTE = 4
def get_optimization_params(file_size: int) -> Tuple[int, int]:
"""
Determine optimization parameters based on input file size
Per v1.4.0 tiered resize strategy:
- <=10MB: 2048px max, 95% quality
- 10-25MB: 1600px max, 90% quality
- 25-50MB: 1280px max, 85% quality
Args:
file_size: Original file size in bytes
Returns:
Tuple of (max_dimension, quality_percent)
"""
if file_size <= 10 * 1024 * 1024: # <=10MB
return (2048, 95)
elif file_size <= 25 * 1024 * 1024: # 10-25MB
return (1600, 90)
else: # 25-50MB
return (1280, 85)
def validate_image(file_data: bytes, filename: str) -> Tuple[str, int, int]:
"""
Validate image file
Per Q11: Validate MIME type using Pillow
Per Q6: Reject if >50MB or >4096px (updated v1.4.0)
Args:
file_data: Raw file bytes
filename: Original filename
Returns:
Tuple of (mime_type, width, height)
Raises:
ValueError: If file is invalid
"""
# Check file size first (before loading)
file_size = len(file_data)
if file_size > MAX_FILE_SIZE:
raise ValueError("File too large. Maximum size is 50MB")
# Try to open with Pillow (validates integrity)
try:
img = Image.open(io.BytesIO(file_data))
img.verify() # Verify it's a valid image
# Re-open after verify (verify() closes the file)
img = Image.open(io.BytesIO(file_data))
except Exception as e:
raise ValueError(f"Invalid or corrupted image: {e}")
# Check format is allowed
if img.format:
format_lower = img.format.lower()
mime_type = f'image/{format_lower}'
# Special case: JPEG format can be reported as 'jpeg'
if format_lower == 'jpeg':
mime_type = 'image/jpeg'
if mime_type not in ALLOWED_MIME_TYPES:
raise ValueError(f"Invalid image format. Accepted: JPEG, PNG, GIF, WebP")
else:
raise ValueError("Could not determine image format")
# Check dimensions
width, height = img.size
if max(width, height) > MAX_DIMENSION:
raise ValueError(f"Image dimensions too large. Maximum is {MAX_DIMENSION}x{MAX_DIMENSION} pixels")
# Check for animated GIF (v1.4.0)
# Animated GIFs cannot be resized, so reject if >10MB
if img.format == 'GIF':
try:
img.seek(1) # Try to seek to second frame
# If successful, it's animated
if file_size > MAX_OUTPUT_SIZE:
raise ValueError(
"Animated GIF too large. Maximum size for animated GIFs is 10MB. "
"Consider using a shorter clip or lower resolution."
)
img.seek(0) # Reset to first frame
except EOFError:
# Not animated, continue normally
pass
return mime_type, width, height
def optimize_image(image_data: bytes, original_size: int = None) -> Tuple[Image.Image, int, int, bytes]:
"""
Optimize image for web display with size-aware strategy
Per v1.4.0:
- Tiered resize strategy based on input size
- Iterative quality reduction if needed
- Target output <=10MB
Args:
image_data: Raw image bytes
original_size: Original file size (for tiered optimization)
Returns:
Tuple of (optimized_image, width, height, optimized_bytes)
Raises:
ValueError: If image cannot be optimized to target size
"""
if original_size is None:
original_size = len(image_data)
# Get initial optimization parameters based on input size
max_dim, quality = get_optimization_params(original_size)
img = Image.open(io.BytesIO(image_data))
# Save original format before any processing (copy() loses this)
original_format = img.format
# Correct EXIF orientation (per ADR-058), except for GIFs
img = ImageOps.exif_transpose(img) if img.format != 'GIF' else img
# For animated GIFs, return as-is (already validated in validate_image)
if img.format == 'GIF' and getattr(img, 'is_animated', False):
# Already checked size in validate_image, just return original
return img, img.size[0], img.size[1], image_data
# Iterative optimization loop
while True:
# Create copy for this iteration
work_img = img.copy()
# Resize if needed
if max(work_img.size) > max_dim:
work_img.thumbnail((max_dim, max_dim), Image.Resampling.LANCZOS)
# Save to bytes to check size
output = io.BytesIO()
# Use original format (copy() loses the format attribute)
save_format = original_format or 'JPEG'
save_kwargs = {'optimize': True}
if save_format in ['JPEG', 'JPG']:
save_kwargs['quality'] = quality
elif save_format == 'WEBP':
save_kwargs['quality'] = quality
# For GIF and PNG, just use optimize flag
work_img.save(output, format=save_format, **save_kwargs)
output_bytes = output.getvalue()
# Check output size
if len(output_bytes) <= MAX_OUTPUT_SIZE:
width, height = work_img.size
return work_img, width, height, output_bytes
# Need to reduce further
if quality > MIN_QUALITY:
# Reduce quality first
quality -= 5
else:
# Already at min quality, reduce dimensions
max_dim = int(max_dim * 0.8)
quality = 85 # Reset quality for new dimension
# Safety check: minimum dimension
if max_dim < MIN_DIMENSION:
raise ValueError(
"Image cannot be optimized to target size. "
"Please use a smaller or lower-resolution image."
)
def save_media(file_data: bytes, filename: str) -> Dict:
"""
Save uploaded media file
Per Q5: UUID-based filename to avoid collisions
Per Q2: Date-organized path: /media/YYYY/MM/uuid.ext
Per Q6: Validate, optimize, then save
Per v1.4.0: Size-aware optimization with iterative quality reduction
Args:
file_data: Raw file bytes
filename: Original filename
Returns:
Media metadata dict (for database insert)
Raises:
ValueError: If validation fails
"""
from starpunk.database import get_db
# Validate image (returns 3-tuple, signature unchanged)
mime_type, orig_width, orig_height = validate_image(file_data, filename)
# Compute file size for optimization strategy
file_size = len(file_data)
# Optimize image with size-aware strategy (now returns 4-tuple with bytes)
optimized_img, width, height, optimized_bytes = optimize_image(file_data, file_size)
# Generate UUID-based filename (per Q5)
file_ext = Path(filename).suffix.lower()
if not file_ext:
# Determine extension from MIME type
for mime, exts in ALLOWED_MIME_TYPES.items():
if mime == mime_type:
file_ext = exts[0]
break
stored_filename = f"{uuid.uuid4()}{file_ext}"
# Create date-based path (per Q2)
now = datetime.now()
year = now.strftime('%Y')
month = now.strftime('%m')
relative_path = f"{year}/{month}/{stored_filename}"
# Get media directory from app config
media_dir = Path(current_app.config.get('DATA_PATH', 'data')) / 'media'
full_dir = media_dir / year / month
full_dir.mkdir(parents=True, exist_ok=True)
# Save optimized image (using bytes from optimize_image to avoid re-encoding)
full_path = full_dir / stored_filename
full_path.write_bytes(optimized_bytes)
# Get actual file size (from optimized bytes)
actual_size = len(optimized_bytes)
# Insert into database
db = get_db(current_app)
cursor = db.execute(
"""
INSERT INTO media (filename, stored_filename, path, mime_type, size, width, height)
VALUES (?, ?, ?, ?, ?, ?, ?)
""",
(filename, stored_filename, relative_path, mime_type, actual_size, width, height)
)
db.commit()
media_id = cursor.lastrowid
return {
'id': media_id,
'filename': filename,
'stored_filename': stored_filename,
'path': relative_path,
'mime_type': mime_type,
'size': actual_size,
'width': width,
'height': height
}
def attach_media_to_note(note_id: int, media_ids: List[int], captions: List[str]) -> None:
"""
Attach media files to note
Per Q4: Happens after note creation
Per Q7: Captions are optional per image
Args:
note_id: Note to attach to
media_ids: List of media IDs (max 4)
captions: List of captions (same length as media_ids)
Raises:
ValueError: If more than MAX_IMAGES_PER_NOTE
"""
from starpunk.database import get_db
if len(media_ids) > MAX_IMAGES_PER_NOTE:
raise ValueError(f"Maximum {MAX_IMAGES_PER_NOTE} images per note")
db = get_db(current_app)
# Delete existing associations (for edit case)
db.execute("DELETE FROM note_media WHERE note_id = ?", (note_id,))
# Insert new associations
for i, (media_id, caption) in enumerate(zip(media_ids, captions)):
db.execute(
"""
INSERT INTO note_media (note_id, media_id, display_order, caption)
VALUES (?, ?, ?, ?)
""",
(note_id, media_id, i, caption or None)
)
db.commit()
def get_note_media(note_id: int) -> List[Dict]:
"""
Get all media attached to a note
Returns list sorted by display_order
Args:
note_id: Note ID to get media for
Returns:
List of media dicts with metadata
"""
from starpunk.database import get_db
db = get_db(current_app)
rows = db.execute(
"""
SELECT
m.id,
m.filename,
m.stored_filename,
m.path,
m.mime_type,
m.size,
m.width,
m.height,
nm.caption,
nm.display_order
FROM note_media nm
JOIN media m ON nm.media_id = m.id
WHERE nm.note_id = ?
ORDER BY nm.display_order
""",
(note_id,)
).fetchall()
return [
{
'id': row[0],
'filename': row[1],
'stored_filename': row[2],
'path': row[3],
'mime_type': row[4],
'size': row[5],
'width': row[6],
'height': row[7],
'caption': row[8],
'display_order': row[9]
}
for row in rows
]
def delete_media(media_id: int) -> None:
"""
Delete media file and database record
Per Q8: Cleanup orphaned files
Args:
media_id: Media ID to delete
"""
from starpunk.database import get_db
db = get_db(current_app)
# Get media path before deleting
row = db.execute("SELECT path FROM media WHERE id = ?", (media_id,)).fetchone()
if not row:
return
media_path = row[0]
# Delete database record (cascade will delete note_media entries)
db.execute("DELETE FROM media WHERE id = ?", (media_id,))
db.commit()
# Delete file from disk
media_dir = Path(current_app.config.get('DATA_PATH', 'data')) / 'media'
full_path = media_dir / media_path
if full_path.exists():
full_path.unlink()
current_app.logger.info(f"Deleted media file: {media_path}")