""" Media upload and management for StarPunk Per ADR-057 and ADR-058: - Social media attachment model (media at top of note) - Pillow-based image optimization - 10MB max file size, 4096x4096 max dimensions - Auto-resize to 2048px for performance - 4 images max per note """ from PIL import Image, ImageOps from pathlib import Path from datetime import datetime import uuid import io from typing import Optional, List, Dict, Tuple from flask import current_app # Allowed MIME types per Q11 ALLOWED_MIME_TYPES = { 'image/jpeg': ['.jpg', '.jpeg'], 'image/png': ['.png'], 'image/gif': ['.gif'], 'image/webp': ['.webp'] } # Limits per Q&A and ADR-058 MAX_FILE_SIZE = 10 * 1024 * 1024 # 10MB MAX_DIMENSION = 4096 # 4096x4096 max RESIZE_DIMENSION = 2048 # Auto-resize to 2048px MAX_IMAGES_PER_NOTE = 4 def validate_image(file_data: bytes, filename: str) -> Tuple[str, int, int]: """ Validate image file Per Q11: Validate MIME type using Pillow Per Q6: Reject if >10MB or >4096px Args: file_data: Raw file bytes filename: Original filename Returns: Tuple of (mime_type, width, height) Raises: ValueError: If file is invalid """ # Check file size first (before loading) if len(file_data) > MAX_FILE_SIZE: raise ValueError(f"File too large. Maximum size is 10MB") # Try to open with Pillow (validates integrity) try: img = Image.open(io.BytesIO(file_data)) img.verify() # Verify it's a valid image # Re-open after verify (verify() closes the file) img = Image.open(io.BytesIO(file_data)) except Exception as e: raise ValueError(f"Invalid or corrupted image: {e}") # Check format is allowed if img.format: format_lower = img.format.lower() mime_type = f'image/{format_lower}' # Special case: JPEG format can be reported as 'jpeg' if format_lower == 'jpeg': mime_type = 'image/jpeg' if mime_type not in ALLOWED_MIME_TYPES: raise ValueError(f"Invalid image format. Accepted: JPEG, PNG, GIF, WebP") else: raise ValueError("Could not determine image format") # Check dimensions width, height = img.size if max(width, height) > MAX_DIMENSION: raise ValueError(f"Image dimensions too large. Maximum is {MAX_DIMENSION}x{MAX_DIMENSION} pixels") return mime_type, width, height def optimize_image(image_data: bytes) -> Tuple[Image.Image, int, int]: """ Optimize image for web display Per ADR-058: - Auto-resize if >2048px (maintaining aspect ratio) - Correct EXIF orientation - 95% quality Per Q12: Preserve GIF animation during resize Args: image_data: Raw image bytes Returns: Tuple of (optimized_image, width, height) """ img = Image.open(io.BytesIO(image_data)) # Correct EXIF orientation (per ADR-058) img = ImageOps.exif_transpose(img) if img.format != 'GIF' else img # Get original dimensions width, height = img.size # Resize if needed (per ADR-058: >2048px gets resized) if max(width, height) > RESIZE_DIMENSION: # For GIFs, we need special handling to preserve animation if img.format == 'GIF' and getattr(img, 'is_animated', False): # For animated GIFs, just return original # Per Q12: Preserve GIF animation # Note: Resizing animated GIFs is complex, skip for v1.2.0 return img, width, height else: # Calculate new size maintaining aspect ratio img.thumbnail((RESIZE_DIMENSION, RESIZE_DIMENSION), Image.Resampling.LANCZOS) width, height = img.size return img, width, height def save_media(file_data: bytes, filename: str) -> Dict: """ Save uploaded media file Per Q5: UUID-based filename to avoid collisions Per Q2: Date-organized path: /media/YYYY/MM/uuid.ext Per Q6: Validate, optimize, then save Args: file_data: Raw file bytes filename: Original filename Returns: Media metadata dict (for database insert) Raises: ValueError: If validation fails """ from starpunk.database import get_db # Validate image mime_type, orig_width, orig_height = validate_image(file_data, filename) # Optimize image optimized_img, width, height = optimize_image(file_data) # Generate UUID-based filename (per Q5) file_ext = Path(filename).suffix.lower() if not file_ext: # Determine extension from MIME type for mime, exts in ALLOWED_MIME_TYPES.items(): if mime == mime_type: file_ext = exts[0] break stored_filename = f"{uuid.uuid4()}{file_ext}" # Create date-based path (per Q2) now = datetime.now() year = now.strftime('%Y') month = now.strftime('%m') relative_path = f"{year}/{month}/{stored_filename}" # Get media directory from app config media_dir = Path(current_app.config.get('DATA_PATH', 'data')) / 'media' full_dir = media_dir / year / month full_dir.mkdir(parents=True, exist_ok=True) # Save optimized image full_path = full_dir / stored_filename # Determine save format and quality save_format = optimized_img.format or 'PNG' save_kwargs = {'optimize': True} if save_format in ['JPEG', 'JPG']: save_kwargs['quality'] = 95 # Per ADR-058 elif save_format == 'PNG': save_kwargs['optimize'] = True elif save_format == 'WEBP': save_kwargs['quality'] = 95 optimized_img.save(full_path, format=save_format, **save_kwargs) # Get actual file size after optimization actual_size = full_path.stat().st_size # Insert into database db = get_db(current_app) cursor = db.execute( """ INSERT INTO media (filename, stored_filename, path, mime_type, size, width, height) VALUES (?, ?, ?, ?, ?, ?, ?) """, (filename, stored_filename, relative_path, mime_type, actual_size, width, height) ) db.commit() media_id = cursor.lastrowid return { 'id': media_id, 'filename': filename, 'stored_filename': stored_filename, 'path': relative_path, 'mime_type': mime_type, 'size': actual_size, 'width': width, 'height': height } def attach_media_to_note(note_id: int, media_ids: List[int], captions: List[str]) -> None: """ Attach media files to note Per Q4: Happens after note creation Per Q7: Captions are optional per image Args: note_id: Note to attach to media_ids: List of media IDs (max 4) captions: List of captions (same length as media_ids) Raises: ValueError: If more than MAX_IMAGES_PER_NOTE """ from starpunk.database import get_db if len(media_ids) > MAX_IMAGES_PER_NOTE: raise ValueError(f"Maximum {MAX_IMAGES_PER_NOTE} images per note") db = get_db(current_app) # Delete existing associations (for edit case) db.execute("DELETE FROM note_media WHERE note_id = ?", (note_id,)) # Insert new associations for i, (media_id, caption) in enumerate(zip(media_ids, captions)): db.execute( """ INSERT INTO note_media (note_id, media_id, display_order, caption) VALUES (?, ?, ?, ?) """, (note_id, media_id, i, caption or None) ) db.commit() def get_note_media(note_id: int) -> List[Dict]: """ Get all media attached to a note Returns list sorted by display_order Args: note_id: Note ID to get media for Returns: List of media dicts with metadata """ from starpunk.database import get_db db = get_db(current_app) rows = db.execute( """ SELECT m.id, m.filename, m.stored_filename, m.path, m.mime_type, m.size, m.width, m.height, nm.caption, nm.display_order FROM note_media nm JOIN media m ON nm.media_id = m.id WHERE nm.note_id = ? ORDER BY nm.display_order """, (note_id,) ).fetchall() return [ { 'id': row[0], 'filename': row[1], 'stored_filename': row[2], 'path': row[3], 'mime_type': row[4], 'size': row[5], 'width': row[6], 'height': row[7], 'caption': row[8], 'display_order': row[9] } for row in rows ] def delete_media(media_id: int) -> None: """ Delete media file and database record Per Q8: Cleanup orphaned files Args: media_id: Media ID to delete """ from starpunk.database import get_db db = get_db(current_app) # Get media path before deleting row = db.execute("SELECT path FROM media WHERE id = ?", (media_id,)).fetchone() if not row: return media_path = row[0] # Delete database record (cascade will delete note_media entries) db.execute("DELETE FROM media WHERE id = ?", (media_id,)) db.commit() # Delete file from disk media_dir = Path(current_app.config.get('DATA_PATH', 'data')) / 'media' full_path = media_dir / media_path if full_path.exists(): full_path.unlink() current_app.logger.info(f"Deleted media file: {media_path}")