iPhones use MPO (Multi-Picture Object) format for depth/portrait photos. This format contains multiple JPEG images (main + depth map). Pillow opens these as MPO format, which wasn't in our allowed types. Added automatic MPO-to-JPEG conversion by extracting the primary image.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
"""
|
|
Media upload and management for StarPunk
|
|
|
|
Per ADR-057 and ADR-058:
|
|
- Social media attachment model (media at top of note)
|
|
- Pillow-based image optimization
|
|
- 50MB max upload, 10MB max output (v1.4.0)
|
|
- Image variants: thumb, small, medium, large (v1.4.0)
|
|
- Tiered resize strategy based on input size (v1.4.0)
|
|
- 4096x4096 max dimensions
|
|
- 4 images max per note
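
Typical usage (illustrative sketch; file_bytes, note_id, and the caption are
placeholder values):
    media = save_media(file_bytes, 'photo.jpg')
    attach_media_to_note(note_id, [media['id']], ['A caption'])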
"""

from PIL import Image, ImageOps
from pathlib import Path
from datetime import datetime
import uuid
import io
from typing import Optional, List, Dict, Tuple
from flask import current_app

# HEIC/HEIF support - import registers with Pillow automatically
try:
    import pillow_heif
    pillow_heif.register_heif_opener()
    HEIC_SUPPORTED = True
except ImportError:
    HEIC_SUPPORTED = False

# Allowed MIME types per Q11
ALLOWED_MIME_TYPES = {
    'image/jpeg': ['.jpg', '.jpeg'],
    'image/png': ['.png'],
    'image/gif': ['.gif'],
    'image/webp': ['.webp']
}

# Limits per Q&A and ADR-058 (updated in v1.4.0)
MAX_FILE_SIZE = 50 * 1024 * 1024  # 50MB (v1.4.0)
MAX_OUTPUT_SIZE = 10 * 1024 * 1024  # 10MB target after optimization (v1.4.0)
MAX_DIMENSION = 4096  # 4096x4096 max
RESIZE_DIMENSION = 2048  # Auto-resize to 2048px (default)
MIN_QUALITY = 70  # Minimum JPEG quality before rejection (v1.4.0)
MIN_DIMENSION = 640  # Minimum dimension before rejection (v1.4.0)
MAX_IMAGES_PER_NOTE = 4

# Variant specifications (v1.4.0 Phase 2)
VARIANT_SPECS = {
    'thumb': {'size': (150, 150), 'crop': True},
    'small': {'width': 320, 'crop': False},
    'medium': {'width': 640, 'crop': False},
    'large': {'width': 1280, 'crop': False},
}


def get_optimization_params(file_size: int) -> Tuple[int, int]:
    """
    Determine optimization parameters based on input file size

    Per v1.4.0 tiered resize strategy:
    - <=10MB: 2048px max, 95% quality
    - 10-25MB: 1600px max, 90% quality
    - 25-50MB: 1280px max, 85% quality

    Args:
        file_size: Original file size in bytes

    Returns:
        Tuple of (max_dimension, quality_percent)
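
    Example (illustrative doctest; a 30MB input falls in the 25-50MB tier):
        >>> get_optimization_params(30 * 1024 * 1024)
        (1280, 85)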
    """
    if file_size <= 10 * 1024 * 1024:  # <=10MB
        return (2048, 95)
    elif file_size <= 25 * 1024 * 1024:  # 10-25MB
        return (1600, 90)
    else:  # 25-50MB
        return (1280, 85)


def validate_image(file_data: bytes, filename: str) -> Tuple[bytes, str, int, int]:
    """
    Validate image file

    Per Q11: Validate MIME type using Pillow
    Per Q6: Reject if >50MB or >4096px (updated v1.4.0)
    Per v1.4.2: Convert HEIC to JPEG (browsers cannot display HEIC)

    Args:
        file_data: Raw file bytes
        filename: Original filename

    Returns:
        Tuple of (file_data, mime_type, width, height)
        Note: file_data may be converted (e.g., HEIC to JPEG)

    Raises:
        ValueError: If file is invalid
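
    Example (illustrative sketch; 'photo.jpg' is a placeholder upload):
        raw = Path('photo.jpg').read_bytes()
        raw, mime_type, width, height = validate_image(raw, 'photo.jpg')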
    """
    # Check file size first (before loading)
    file_size = len(file_data)
    if file_size > MAX_FILE_SIZE:
        raise ValueError("File too large. Maximum size is 50MB")

    # Try to open with Pillow (validates integrity)
    try:
        img = Image.open(io.BytesIO(file_data))
        img.verify()  # Verify it's a valid image

        # Re-open after verify (verify() closes the file)
        img = Image.open(io.BytesIO(file_data))
    except Exception as e:
        # v1.4.2: If Pillow can't open, try explicitly as HEIC
        # iOS sometimes saves HEIC with .jpeg extension
        if HEIC_SUPPORTED:
            try:
                heif_file = pillow_heif.read_heif(file_data)
                img = Image.frombytes(
                    heif_file.mode,
                    heif_file.size,
                    heif_file.data,
                    "raw",
                )
                # Mark as HEIF so conversion happens below
                img.format = 'HEIF'
            except Exception as heic_error:
                # Log the magic bytes and save file for debugging (if in app context)
                try:
                    magic = file_data[:12].hex() if len(file_data) >= 12 else file_data.hex()
                    current_app.logger.warning(
                        f'Media upload failed both Pillow and HEIC: filename="{filename}", '
                        f'magic_bytes={magic}, pillow_error="{e}", heic_error="{heic_error}"'
                    )
                    # Save failed file for analysis
                    debug_dir = Path(current_app.config.get('DATA_PATH', 'data')) / 'debug'
                    debug_dir.mkdir(parents=True, exist_ok=True)
                    debug_file = debug_dir / f"failed_{datetime.now().strftime('%Y%m%d_%H%M%S')}_{filename}"
                    debug_file.write_bytes(file_data)
                    current_app.logger.info(f'Saved failed upload for analysis: {debug_file}')
                except RuntimeError:
                    pass  # Outside app context (e.g., tests)
                raise ValueError(f"Invalid or corrupted image: {e}")
        else:
            raise ValueError(f"Invalid or corrupted image: {e}")

    # HEIC/HEIF conversion (v1.4.2)
    # HEIC cannot be displayed in browsers, convert to JPEG
    if img.format in ('HEIF', 'HEIC'):
        if not HEIC_SUPPORTED:
            raise ValueError(
                "HEIC/HEIF images require pillow-heif library. "
                "Please convert to JPEG before uploading."
            )
        # Convert HEIC to JPEG in memory
        output = io.BytesIO()
        # Convert to RGB if needed (HEIC may have alpha channel)
        if img.mode in ('RGBA', 'P'):
            img = img.convert('RGB')
        img.save(output, format='JPEG', quality=95)
        output.seek(0)
        # Re-open as JPEG for further processing
        file_data = output.getvalue()
        img = Image.open(io.BytesIO(file_data))

    # MPO (Multi-Picture Object) conversion (v1.4.2)
    # MPO is used by iPhones for depth/portrait photos - extract primary image as JPEG
    if img.format == 'MPO':
        output = io.BytesIO()
        # Convert to RGB if needed
        if img.mode in ('RGBA', 'P'):
            img = img.convert('RGB')
        img.save(output, format='JPEG', quality=95)
        output.seek(0)
        file_data = output.getvalue()
        img = Image.open(io.BytesIO(file_data))

    # Check format is allowed
    if img.format:
        format_lower = img.format.lower()
        mime_type = f'image/{format_lower}'

        # Special case: JPEG format can be reported as 'jpeg'
        if format_lower == 'jpeg':
            mime_type = 'image/jpeg'

        if mime_type not in ALLOWED_MIME_TYPES:
            # Log the detected format for debugging (v1.4.2)
            try:
                current_app.logger.warning(
                    f'Media upload rejected format: filename="{filename}", '
                    f'detected_format="{img.format}", mime_type="{mime_type}"'
                )
            except RuntimeError:
                pass  # Outside app context
            raise ValueError(f"Invalid image format '{img.format}'. Accepted: JPEG, PNG, GIF, WebP")
    else:
        raise ValueError("Could not determine image format")

    # Check dimensions
    width, height = img.size
    if max(width, height) > MAX_DIMENSION:
        raise ValueError(f"Image dimensions too large. Maximum is {MAX_DIMENSION}x{MAX_DIMENSION} pixels")

    # Check for animated GIF (v1.4.0)
    # Animated GIFs cannot be resized, so reject if >10MB
    if img.format == 'GIF':
        try:
            img.seek(1)  # Try to seek to second frame
            # If successful, it's animated
            if file_size > MAX_OUTPUT_SIZE:
                raise ValueError(
                    "Animated GIF too large. Maximum size for animated GIFs is 10MB. "
                    "Consider using a shorter clip or lower resolution."
                )
            img.seek(0)  # Reset to first frame
        except EOFError:
            # Not animated, continue normally
            pass

    return file_data, mime_type, width, height


def optimize_image(image_data: bytes, original_size: int = None) -> Tuple[Image.Image, int, int, bytes]:
    """
    Optimize image for web display with size-aware strategy

    Per v1.4.0:
    - Tiered resize strategy based on input size
    - Iterative quality reduction if needed
    - Target output <=10MB

    Args:
        image_data: Raw image bytes
        original_size: Original file size (for tiered optimization)

    Returns:
        Tuple of (optimized_image, width, height, optimized_bytes)

    Raises:
        ValueError: If image cannot be optimized to target size
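
    Example (illustrative; raw is validated image bytes from validate_image):
        img, width, height, out_bytes = optimize_image(raw, original_size=len(raw))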
    """
    if original_size is None:
        original_size = len(image_data)

    # Get initial optimization parameters based on input size
    max_dim, quality = get_optimization_params(original_size)

    img = Image.open(io.BytesIO(image_data))

    # Save original format before any processing (copy() loses this)
    original_format = img.format

    # Correct EXIF orientation (per ADR-058), except for GIFs
    img = ImageOps.exif_transpose(img) if img.format != 'GIF' else img

    # For animated GIFs, return as-is (already validated in validate_image)
    if img.format == 'GIF' and getattr(img, 'is_animated', False):
        # Already checked size in validate_image, just return original
        return img, img.size[0], img.size[1], image_data

    # Iterative optimization loop
    while True:
        # Create copy for this iteration
        work_img = img.copy()

        # Resize if needed
        if max(work_img.size) > max_dim:
            work_img.thumbnail((max_dim, max_dim), Image.Resampling.LANCZOS)

        # Save to bytes to check size
        output = io.BytesIO()
        # Use original format (copy() loses the format attribute)
        save_format = original_format or 'JPEG'
        save_kwargs = {'optimize': True}

        if save_format in ['JPEG', 'JPG']:
            save_kwargs['quality'] = quality
        elif save_format == 'WEBP':
            save_kwargs['quality'] = quality
        # For GIF and PNG, just use optimize flag

        work_img.save(output, format=save_format, **save_kwargs)
        output_bytes = output.getvalue()

        # Check output size
        if len(output_bytes) <= MAX_OUTPUT_SIZE:
            width, height = work_img.size
            return work_img, width, height, output_bytes

        # Need to reduce further
        if quality > MIN_QUALITY:
            # Reduce quality first
            quality -= 5
        else:
            # Already at min quality, reduce dimensions
            max_dim = int(max_dim * 0.8)
            quality = 85  # Reset quality for new dimension

        # Safety check: minimum dimension
        if max_dim < MIN_DIMENSION:
            raise ValueError(
                "Image cannot be optimized to target size. "
                "Please use a smaller or lower-resolution image."
            )


def generate_variant(
    img: Image.Image,
    variant_type: str,
    base_path: Path,
    base_filename: str,
    file_ext: str
) -> Dict:
    """
    Generate a single image variant

    Args:
        img: Source PIL Image
        variant_type: One of 'thumb', 'small', 'medium', 'large'
        base_path: Directory to save to
        base_filename: Base filename (UUID without extension)
        file_ext: File extension (e.g., '.jpg')

    Returns:
        Dict with variant metadata (path, width, height, size_bytes)
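
    Example (illustrative; the directory, UUID stem, and extension are placeholders):
        meta = generate_variant(img, 'medium', Path('data/media/2025/01'), 'abc123', '.jpg')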
    """
    spec = VARIANT_SPECS[variant_type]
    work_img = img.copy()

    if spec.get('crop'):
        # Center crop for thumbnails using ImageOps.fit()
        work_img = ImageOps.fit(
            work_img,
            spec['size'],
            method=Image.Resampling.LANCZOS,
            centering=(0.5, 0.5)
        )
    else:
        # Aspect-preserving resize
        target_width = spec['width']
        if work_img.width > target_width:
            ratio = target_width / work_img.width
            new_height = int(work_img.height * ratio)
            work_img = work_img.resize(
                (target_width, new_height),
                Image.Resampling.LANCZOS
            )

    # Generate variant filename
    variant_filename = f"{base_filename}_{variant_type}{file_ext}"
    variant_path = base_path / variant_filename

    # Save with appropriate quality
    save_kwargs = {'optimize': True}
    if work_img.format in ['JPEG', 'JPG', None]:
        save_kwargs['quality'] = 85

    # Determine format from extension
    save_format = 'JPEG' if file_ext.lower() in ['.jpg', '.jpeg'] else file_ext[1:].upper()
    work_img.save(variant_path, format=save_format, **save_kwargs)

    return {
        'variant_type': variant_type,
        'path': str(variant_path.relative_to(base_path.parent.parent)),  # Relative to media root
        'width': work_img.width,
        'height': work_img.height,
        'size_bytes': variant_path.stat().st_size
    }


def generate_all_variants(
    img: Image.Image,
    base_path: Path,
    base_filename: str,
    file_ext: str,
    media_id: int,
    year: str,
    month: str,
    optimized_bytes: bytes
) -> List[Dict]:
    """
    Generate all variants for an image and store in database

    Args:
        img: Source PIL Image (the optimized original)
        base_path: Directory containing the original
        base_filename: Base filename (UUID without extension)
        file_ext: File extension
        media_id: ID of parent media record
        year: Year string (e.g., '2025') for path calculation
        month: Month string (e.g., '01') for path calculation
        optimized_bytes: Bytes of optimized original (avoids re-reading file)

    Returns:
        List of variant metadata dicts
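
    Example (illustrative; assumes an app context and values produced by save_media):
        variants = generate_all_variants(img, full_dir, 'abc123', '.jpg',
                                         media_id=1, year='2025', month='01',
                                         optimized_bytes=out_bytes)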
    """
    from starpunk.database import get_db

    variants = []
    db = get_db(current_app)
    created_files = []  # Track files for cleanup on failure

    try:
        # Generate each variant type
        for variant_type in ['thumb', 'small', 'medium', 'large']:
            # Skip if image is smaller than target
            spec = VARIANT_SPECS[variant_type]
            target_width = spec.get('width') or spec['size'][0]

            if img.width < target_width and variant_type != 'thumb':
                continue  # Skip variants larger than original

            variant = generate_variant(img, variant_type, base_path, base_filename, file_ext)
            variants.append(variant)
            created_files.append(base_path / f"{base_filename}_{variant_type}{file_ext}")

            # Insert into database
            db.execute(
                """
                INSERT INTO media_variants
                (media_id, variant_type, path, width, height, size_bytes)
                VALUES (?, ?, ?, ?, ?, ?)
                """,
                (media_id, variant['variant_type'], variant['path'],
                 variant['width'], variant['height'], variant['size_bytes'])
            )

        # Also record the original as 'original' variant
        # Use explicit year/month for path calculation (avoids fragile parent traversal)
        original_path = f"{year}/{month}/{base_filename}{file_ext}"
        db.execute(
            """
            INSERT INTO media_variants
            (media_id, variant_type, path, width, height, size_bytes)
            VALUES (?, ?, ?, ?, ?, ?)
            """,
            (media_id, 'original', original_path, img.width, img.height,
             len(optimized_bytes))  # Use passed bytes instead of file I/O
        )

        db.commit()
        return variants

    except Exception as e:
        # Clean up any created variant files on failure
        for file_path in created_files:
            try:
                if file_path.exists():
                    file_path.unlink()
            except OSError:
                pass  # Best effort cleanup
        raise  # Re-raise the original exception


def save_media(file_data: bytes, filename: str) -> Dict:
    """
    Save uploaded media file

    Per Q5: UUID-based filename to avoid collisions
    Per Q2: Date-organized path: /media/YYYY/MM/uuid.ext
    Per Q6: Validate, optimize, then save
    Per v1.4.0: Size-aware optimization with iterative quality reduction

    Args:
        file_data: Raw file bytes
        filename: Original filename

    Returns:
        Media metadata dict (for database insert)

    Raises:
        ValueError: If validation fails
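
    Example (illustrative; assumes a Flask app context, 'upload.jpg' is a placeholder):
        data = Path('upload.jpg').read_bytes()
        media = save_media(data, 'upload.jpg')
        # media['path'] is relative to the media root, e.g. 'YYYY/MM/<uuid>.jpg'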
    """
    from starpunk.database import get_db

    # Capture file size for logging
    file_size = len(file_data)

    try:
        # Validate image (returns 4-tuple with potentially converted bytes)
        try:
            file_data, mime_type, orig_width, orig_height = validate_image(file_data, filename)
        except ValueError as e:
            current_app.logger.warning(
                f'Media upload validation failed: filename="{filename}", '
                f'size={file_size}b, error="{e}"'
            )
            raise

        # Optimize image with size-aware strategy (now returns 4-tuple with bytes)
        try:
            optimized_img, width, height, optimized_bytes = optimize_image(file_data, file_size)
        except ValueError as e:
            current_app.logger.warning(
                f'Media upload optimization failed: filename="{filename}", '
                f'size={file_size}b, error="{e}"'
            )
            raise

        # Generate UUID-based filename (per Q5)
        file_ext = Path(filename).suffix.lower()
        if not file_ext:
            # Determine extension from MIME type
            for mime, exts in ALLOWED_MIME_TYPES.items():
                if mime == mime_type:
                    file_ext = exts[0]
                    break

        stored_filename = f"{uuid.uuid4()}{file_ext}"

        # Create date-based path (per Q2)
        now = datetime.now()
        year = now.strftime('%Y')
        month = now.strftime('%m')
        relative_path = f"{year}/{month}/{stored_filename}"

        # Get media directory from app config
        media_dir = Path(current_app.config.get('DATA_PATH', 'data')) / 'media'
        full_dir = media_dir / year / month
        full_dir.mkdir(parents=True, exist_ok=True)

        # Save optimized image (using bytes from optimize_image to avoid re-encoding)
        full_path = full_dir / stored_filename
        full_path.write_bytes(optimized_bytes)

        # Get actual file size (from optimized bytes)
        actual_size = len(optimized_bytes)

        # Insert into database
        db = get_db(current_app)
        cursor = db.execute(
            """
            INSERT INTO media (filename, stored_filename, path, mime_type, size, width, height)
            VALUES (?, ?, ?, ?, ?, ?, ?)
            """,
            (filename, stored_filename, relative_path, mime_type, actual_size, width, height)
        )
        db.commit()
        media_id = cursor.lastrowid

        # Generate variants (synchronous) - v1.4.0 Phase 2
        # Pass year, month, and optimized_bytes to avoid fragile path traversal and file I/O
        base_filename = stored_filename.rsplit('.', 1)[0]
        variants = []
        try:
            variants = generate_all_variants(
                optimized_img,
                full_dir,
                base_filename,
                file_ext,
                media_id,
                year,
                month,
                optimized_bytes
            )
        except Exception as e:
            current_app.logger.warning(
                f'Media upload variant generation failed: filename="{filename}", '
                f'media_id={media_id}, error="{e}"'
            )
            # Continue - original image is still usable

        # Log success
        was_optimized = len(optimized_bytes) < file_size
        current_app.logger.info(
            f'Media upload successful: filename="{filename}", '
            f'stored="{stored_filename}", size={len(optimized_bytes)}b, '
            f'optimized={was_optimized}, variants={len(variants)}'
        )

        return {
            'id': media_id,
            'filename': filename,
            'stored_filename': stored_filename,
            'path': relative_path,
            'mime_type': mime_type,
            'size': actual_size,
            'width': width,
            'height': height,
            'variants': variants
        }

    except ValueError:
        # Already logged at WARNING level in validation/optimization blocks
        raise

    except Exception as e:
        current_app.logger.error(
            f'Media upload failed unexpectedly: filename="{filename}", '
            f'error_type="{type(e).__name__}", error="{e}"'
        )
        raise


def attach_media_to_note(note_id: int, media_ids: List[int], captions: List[str]) -> None:
    """
    Attach media files to note

    Per Q4: Happens after note creation
    Per Q7: Captions are optional per image

    Args:
        note_id: Note to attach to
        media_ids: List of media IDs (max 4)
        captions: List of captions (same length as media_ids)

    Raises:
        ValueError: If more than MAX_IMAGES_PER_NOTE
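
    Example (illustrative; the note and media IDs are placeholders):
        attach_media_to_note(42, [7, 8], ['Sunset over the bay', ''])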
    """
    from starpunk.database import get_db

    if len(media_ids) > MAX_IMAGES_PER_NOTE:
        raise ValueError(f"Maximum {MAX_IMAGES_PER_NOTE} images per note")

    db = get_db(current_app)

    # Delete existing associations (for edit case)
    db.execute("DELETE FROM note_media WHERE note_id = ?", (note_id,))

    # Insert new associations
    for i, (media_id, caption) in enumerate(zip(media_ids, captions)):
        db.execute(
            """
            INSERT INTO note_media (note_id, media_id, display_order, caption)
            VALUES (?, ?, ?, ?)
            """,
            (note_id, media_id, i, caption or None)
        )

    db.commit()


def get_note_media(note_id: int) -> List[Dict]:
    """
    Get all media attached to a note with variants (v1.4.0)

    Returns list sorted by display_order

    Args:
        note_id: Note ID to get media for

    Returns:
        List of media dicts with metadata (includes 'variants' key if variants exist)
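
    Example (illustrative; falls back to the original path when no variants exist):
        for m in get_note_media(42):
            src = m.get('variants', {}).get('medium', {}).get('path', m['path'])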
    """
    from starpunk.database import get_db

    db = get_db(current_app)
    rows = db.execute(
        """
        SELECT
            m.id,
            m.filename,
            m.stored_filename,
            m.path,
            m.mime_type,
            m.size,
            m.width,
            m.height,
            nm.caption,
            nm.display_order
        FROM note_media nm
        JOIN media m ON nm.media_id = m.id
        WHERE nm.note_id = ?
        ORDER BY nm.display_order
        """,
        (note_id,)
    ).fetchall()

    media_list = []
    for row in rows:
        media_dict = {
            'id': row[0],
            'filename': row[1],
            'stored_filename': row[2],
            'path': row[3],
            'mime_type': row[4],
            'size': row[5],
            'width': row[6],
            'height': row[7],
            'caption': row[8],
            'display_order': row[9]
        }

        # Fetch variants for this media (v1.4.0 Phase 2)
        variants = db.execute(
            """
            SELECT variant_type, path, width, height, size_bytes
            FROM media_variants
            WHERE media_id = ?
            ORDER BY
                CASE variant_type
                    WHEN 'thumb' THEN 1
                    WHEN 'small' THEN 2
                    WHEN 'medium' THEN 3
                    WHEN 'large' THEN 4
                    WHEN 'original' THEN 5
                END
            """,
            (row[0],)
        ).fetchall()

        # Only add 'variants' key if variants exist (backwards compatibility)
        # Pre-v1.4.0 media won't have variants, and consumers shouldn't
        # expect the key to be present
        if variants:
            media_dict['variants'] = {
                v[0]: {
                    'path': v[1],
                    'width': v[2],
                    'height': v[3],
                    'size_bytes': v[4]
                }
                for v in variants
            }

        media_list.append(media_dict)

    return media_list


def delete_media(media_id: int) -> None:
    """
    Delete media file, variants, and database record

    Per Q8: Cleanup orphaned files
    Per v1.4.0: Also cleanup variant files

    Args:
        media_id: Media ID to delete
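
    Example (illustrative; 7 is a placeholder media ID):
        delete_media(7)  # removes the DB rows and best-effort deletes files on disk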
    """
    from starpunk.database import get_db

    db = get_db(current_app)

    # Get media path before deleting
    row = db.execute("SELECT path FROM media WHERE id = ?", (media_id,)).fetchone()
    if not row:
        return

    media_path = row[0]
    media_dir = Path(current_app.config.get('DATA_PATH', 'data')) / 'media'

    # Get variant paths before deleting (v1.4.0)
    variant_rows = db.execute(
        "SELECT path FROM media_variants WHERE media_id = ?",
        (media_id,)
    ).fetchall()

    # Delete database record (cascade will delete media_variants and note_media entries)
    db.execute("DELETE FROM media WHERE id = ?", (media_id,))
    db.commit()

    # Delete files from disk (best-effort cleanup)
    deleted_count = 0

    # Delete original file
    full_path = media_dir / media_path
    try:
        if full_path.exists():
            full_path.unlink()
            deleted_count += 1
    except OSError as e:
        current_app.logger.warning(f"Failed to delete media file {media_path}: {e}")

    # Delete variant files (v1.4.0)
    for variant_row in variant_rows:
        variant_path = media_dir / variant_row[0]
        try:
            if variant_path.exists():
                variant_path.unlink()
                deleted_count += 1
        except OSError as e:
            current_app.logger.warning(f"Failed to delete variant file {variant_row[0]}: {e}")

    current_app.logger.info(f"Deleted media {media_id}: {deleted_count} file(s) removed from disk")