"""Security utilities for content sanitization and rate limiting.""" import time import logging import re from typing import Optional, Dict, Any from datetime import datetime, timedelta import bleach from .config import Config logger = logging.getLogger(__name__) class RateLimiter: """Rate limiter to prevent excessive requests.""" def __init__(self, requests_per_minute: int = 30): self.requests_per_minute = requests_per_minute self.request_times: list = [] self.min_delay_seconds = 60.0 / requests_per_minute self.last_request_time: Optional[float] = None def wait_if_needed(self) -> None: """Wait if necessary to respect rate limits.""" current_time = time.time() # Clean old request times (older than 1 minute) cutoff_time = current_time - 60 self.request_times = [t for t in self.request_times if t > cutoff_time] # Check if we've hit the rate limit if len(self.request_times) >= self.requests_per_minute: sleep_time = 60 - (current_time - self.request_times[0]) if sleep_time > 0: logger.info(f"Rate limit reached, sleeping for {sleep_time:.2f} seconds") time.sleep(sleep_time) # Ensure minimum delay between requests if self.last_request_time: time_since_last = current_time - self.last_request_time if time_since_last < self.min_delay_seconds: sleep_time = self.min_delay_seconds - time_since_last logger.debug(f"Enforcing minimum delay, sleeping for {sleep_time:.2f} seconds") time.sleep(sleep_time) # Record this request self.request_times.append(time.time()) self.last_request_time = time.time() class ContentSanitizer: """Enhanced content sanitization for security.""" def __init__(self): # Allowed HTML tags for RSS content (including structural elements for parsing) self.allowed_tags = [ 'p', 'br', 'strong', 'em', 'b', 'i', 'u', 'span', 'h1', 'h2', 'h3', 'h4', 'h5', 'h6', 'ul', 'ol', 'li', 'blockquote', 'div', 'article', 'section', 'header', 'footer', 'main', 'nav', 'a', 'img', 'figure', 'figcaption', 'time' ] # Allowed attributes self.allowed_attributes = { '*': ['class', 'id'], 'a': ['href', 'title', 'class'], 'img': ['src', 'alt', 'title', 'width', 'height', 'class'], 'time': ['datetime', 'class'], 'div': ['class', 'id'], 'article': ['class', 'id'], 'section': ['class', 'id'] } # Protocols allowed in URLs self.allowed_protocols = ['http', 'https'] # Dangerous patterns to remove (pre-compiled for performance) self.dangerous_patterns = [ re.compile(r']*>.*?', re.IGNORECASE | re.DOTALL), re.compile(r']*>.*?', re.IGNORECASE | re.DOTALL), re.compile(r']*>.*?', re.IGNORECASE | re.DOTALL), re.compile(r']*>.*?', re.IGNORECASE | re.DOTALL), re.compile(r']*>.*?', re.IGNORECASE | re.DOTALL), re.compile(r']*>.*?', re.IGNORECASE | re.DOTALL), re.compile(r'javascript:', re.IGNORECASE), re.compile(r'vbscript:', re.IGNORECASE), re.compile(r'data:', re.IGNORECASE), re.compile(r'on\w+\s*=', re.IGNORECASE), # event handlers like onclick, onload, etc. 
        ]

    def sanitize_html(self, html_content: str) -> str:
        """Sanitize HTML content using bleach library."""
        if not html_content:
            return ""

        try:
            # First pass: remove obviously dangerous patterns
            cleaned = html_content
            for pattern in self.dangerous_patterns:
                cleaned = pattern.sub('', cleaned)

            # Second pass: use bleach for comprehensive sanitization
            sanitized = bleach.clean(
                cleaned,
                tags=self.allowed_tags,
                attributes=self.allowed_attributes,
                protocols=self.allowed_protocols,
                strip=True,
                strip_comments=True
            )

            return sanitized

        except Exception as e:
            logger.error(f"Error sanitizing HTML: {e}")
            # If sanitization fails, return empty string for safety
            return ""

    def sanitize_text(self, text: Optional[str]) -> str:
        """Enhanced text sanitization with better security."""
        if not text:
            return "No title"

        # Basic cleaning
        sanitized = text.strip()

        # Remove null bytes and other control characters
        sanitized = re.sub(r'[\x00-\x08\x0B\x0C\x0E-\x1F\x7F]', '', sanitized)

        # Remove dangerous patterns (case insensitive)
        for pattern in Config.DANGEROUS_PATTERNS:
            sanitized = re.sub(re.escape(pattern), '', sanitized, flags=re.IGNORECASE)

        # Limit length
        sanitized = sanitized[:Config.MAX_TITLE_LENGTH]

        # Remove excessive whitespace
        sanitized = re.sub(r'\s+', ' ', sanitized).strip()

        return sanitized if sanitized else "No title"

    def validate_url_security(self, url: str) -> bool:
        """Enhanced URL validation for security."""
        if not url:
            return False

        # Check for dangerous protocols
        dangerous_protocols = ['javascript:', 'vbscript:', 'data:', 'file:', 'ftp:']
        url_lower = url.lower()
        for protocol in dangerous_protocols:
            if url_lower.startswith(protocol):
                logger.warning(f"Blocked dangerous protocol in URL: {url}")
                return False

        # Check for suspicious patterns
        suspicious_patterns = [
            r'\.\./',      # Path traversal
            r'%2e%2e%2f',  # Encoded path traversal
            r'<script',    # Embedded script markup
        ]
        for pattern in suspicious_patterns:
            if re.search(pattern, url_lower):
                logger.warning(f"Blocked suspicious pattern in URL: {url}")
                return False

        # Reject excessively long URLs
        if len(url) > 2048:
            logger.warning(f"Blocked excessively long URL (length: {len(url)})")
            return False

        return True

    def sanitize_filename(self, filename: str) -> str:
        """Sanitize filenames to prevent directory traversal and injection."""
        if not filename:
            return "default"

        # Remove path separators and dangerous characters
        sanitized = re.sub(r'[<>:"|?*\\/]', '_', filename)

        # Remove null bytes and control characters
        sanitized = re.sub(r'[\x00-\x1F\x7F]', '', sanitized)

        # Remove leading/trailing dots and spaces
        sanitized = sanitized.strip('. ')

        # Prevent reserved Windows filenames
        reserved_names = [
            'CON', 'PRN', 'AUX', 'NUL',
            'COM1', 'COM2', 'COM3', 'COM4', 'COM5', 'COM6', 'COM7', 'COM8', 'COM9',
            'LPT1', 'LPT2', 'LPT3', 'LPT4', 'LPT5', 'LPT6', 'LPT7', 'LPT8', 'LPT9'
        ]
        if sanitized.upper() in reserved_names:
            sanitized = f"file_{sanitized}"

        # Limit length
        sanitized = sanitized[:255]

        return sanitized if sanitized else "default"


# Global instances
_rate_limiter = RateLimiter(requests_per_minute=30)
_sanitizer = ContentSanitizer()


def wait_for_rate_limit() -> None:
    """Apply rate limiting."""
    _rate_limiter.wait_if_needed()


def sanitize_html_content(html: str) -> str:
    """Sanitize HTML content."""
    return _sanitizer.sanitize_html(html)


def sanitize_text_content(text: Optional[str]) -> str:
    """Sanitize text content."""
    return _sanitizer.sanitize_text(text)


def validate_url_security(url: str) -> bool:
    """Validate URL for security."""
    return _sanitizer.validate_url_security(url)


def sanitize_filename(filename: str) -> str:
    """Sanitize filename."""
    return _sanitizer.sanitize_filename(filename)
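

# Illustrative usage sketch (an assumption, not part of the original module): shows the
# expected call pattern for the module-level helpers. Because of the relative `.config`
# import above, run it as a module inside its package, e.g. `python -m <your_package>.security`
# (the package name here is a placeholder).
if __name__ == "__main__":
    wait_for_rate_limit()

    # HTML sanitization: script tags and inline event handlers are stripped
    print(sanitize_html_content('<p onclick="alert(1)">Hello <script>evil()</script>world</p>'))

    # Text sanitization: control characters and excess whitespace are removed
    print(sanitize_text_content("  Example\x00 title  "))

    # URL validation: dangerous protocols are rejected, plain https passes
    print(validate_url_security("javascript:alert(1)"))          # False
    print(validate_url_security("https://example.com/feed.xml"))  # True

    # Filename sanitization: path separators are replaced
    print(sanitize_filename("../../etc/passwd"))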