- URL validation with domain whitelist
- Path validation to prevent directory traversal
- Resource limits (content size, scroll iterations)
- Content filtering and sanitization
- Non-root Docker execution with gosu
- Configurable output directory via CLI/env vars (see the usage sketch below)
- Fixed Docker volume permission issues

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
225 lines · 7.5 KiB · Python
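The output directory is configurable via a CLI argument or the `OUTPUT_DIR` environment variable, handled in the `__main__` block at the bottom of the file. A minimal usage sketch for calling the scraper from another module, assuming the file is saved under the hypothetical name `warhammer_rss.py`:

```python
# Usage sketch -- the module name "warhammer_rss" is a hypothetical filename
import os

from warhammer_rss import scrape_and_generate_rss

# The output directory can come from the environment, a CLI argument, or a literal path
output_dir = os.getenv("OUTPUT_DIR", "./feeds")

# Only URLs on the whitelisted warhammer-community.com domains pass validation
scrape_and_generate_rss("https://www.warhammer-community.com/en-gb/", output_dir)
```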
from playwright.sync_api import sync_playwright
from bs4 import BeautifulSoup
from feedgen.feed import FeedGenerator
from datetime import datetime
import pytz
import re
import time
import urllib.parse
import os
import sys

# Allowed domains for scraping - security whitelist
ALLOWED_DOMAINS = [
    'warhammer-community.com',
    'www.warhammer-community.com'
]

# Resource limits
MAX_SCROLL_ITERATIONS = 5
MAX_CONTENT_SIZE = 10 * 1024 * 1024  # 10MB

def validate_url(url):
    """Validate URL against whitelist of allowed domains"""
    try:
        parsed = urllib.parse.urlparse(url)
        if not parsed.scheme or not parsed.netloc:
            raise ValueError("Invalid URL format")

        # Check if domain is in allowed list
        domain = parsed.netloc.lower()
        if domain not in ALLOWED_DOMAINS:
            raise ValueError(f"Domain {domain} not in allowed list: {ALLOWED_DOMAINS}")

        return True
    except Exception as e:
        raise ValueError(f"URL validation failed: {e}")

def validate_output_path(path, base_dir):
    """Validate and sanitize output file path"""
    # Resolve to absolute paths and check that the target stays inside base_dir
    abs_path = os.path.abspath(path)
    abs_base = os.path.abspath(base_dir)

    # Ensure the path is within the allowed directory; comparing path components
    # avoids accepting sibling directories that merely share the base as a string prefix
    if os.path.commonpath([abs_path, abs_base]) != abs_base:
        raise ValueError(f"Output path {abs_path} is outside allowed directory {abs_base}")

    # Ensure the output directory exists
    os.makedirs(abs_base, exist_ok=True)

    return abs_path

def sanitize_text(text):
    """Sanitize text content to prevent injection attacks"""
    if not text:
        return "No title"

    # Trim whitespace and limit title length
    sanitized = text.strip()[:500]

    # Remove script tags and risky URL schemes, case-insensitively
    dangerous_patterns = ['<script', '</script', 'javascript:', 'data:', 'vbscript:']
    for pattern in dangerous_patterns:
        sanitized = re.sub(re.escape(pattern), '', sanitized, flags=re.IGNORECASE)

    return sanitized if sanitized else "No title"

def validate_link(link, base_url):
    """Validate and sanitize article links"""
    if not link:
        return None

    try:
        # Handle relative URLs
        if link.startswith('/'):
            parsed_base = urllib.parse.urlparse(base_url)
            link = f"{parsed_base.scheme}://{parsed_base.netloc}{link}"

        # Validate the resulting URL
        parsed = urllib.parse.urlparse(link)
        if not parsed.scheme or not parsed.netloc:
            return None

        # Ensure it's from allowed domain
        domain = parsed.netloc.lower()
        if domain not in ALLOWED_DOMAINS:
            return None

        return link
    except Exception:
        return None

# Function to scrape articles using Playwright and generate an RSS feed
def scrape_and_generate_rss(url, output_dir=None):
    # Validate URL first
    validate_url(url)

    # Set default output directory if not provided
    if output_dir is None:
        output_dir = '.'  # Default to current directory

    articles = []
    seen_urls = set()  # Set to track seen URLs and avoid duplicates

    # Use Playwright to load the page
    with sync_playwright() as p:
        browser = p.chromium.launch(headless=True)
        page = browser.new_page()

        # Set a longer timeout for loading the page
        page.set_default_navigation_timeout(120000)

        # Load the Warhammer Community page
        page.goto(url, wait_until="networkidle")

        # Simulate scrolling to load more content if needed (limited for security)
        for _ in range(MAX_SCROLL_ITERATIONS):
            page.evaluate("window.scrollBy(0, document.body.scrollHeight)")
            time.sleep(2)

        # Get the fully rendered HTML content
        html = page.content()

        # Check content size for security
        if len(html) > MAX_CONTENT_SIZE:
            browser.close()
            raise ValueError(f"Content size {len(html)} exceeds maximum {MAX_CONTENT_SIZE}")

        browser.close()

    # Parse the HTML content with BeautifulSoup
    soup = BeautifulSoup(html, 'html.parser')

    # Define a timezone (UTC in this case)
    timezone = pytz.UTC

    # Find all articles in the page
    for article in soup.find_all('article'):
        # Extract and sanitize the title
        title_tag = article.find('h3', class_='newsCard-title-sm') or article.find('h3', class_='newsCard-title-lg')
        raw_title = title_tag.text.strip() if title_tag else 'No title'
        title = sanitize_text(raw_title)

        # Extract and validate the link
        link_tag = article.find('a', href=True)
        raw_link = link_tag['href'] if link_tag else None
        link = validate_link(raw_link, url)

        # Skip this entry if the link is None or the URL has already been seen
        if not link or link in seen_urls:
            continue  # Skip duplicates or invalid entries

        seen_urls.add(link)  # Add the URL to the set of seen URLs

        # Extract the publication date and ignore reading time
        date = None
        for time_tag in article.find_all('time'):
            raw_date = time_tag.text.strip()

            # Ignore "min" time blocks (reading time)
            if "min" not in raw_date.lower():
                try:
                    # Parse the actual date (e.g., "02 Oct 24")
                    date = datetime.strptime(raw_date, '%d %b %y')
                    date = timezone.localize(date)  # Localize with UTC
                    break  # Stop after finding the correct date
                except ValueError:
                    continue

        # If no valid date is found, use the current date as a fallback
        if not date:
            date = datetime.now(timezone)

        # Add the article to the list with its publication date
        articles.append({
            'title': title,
            'link': link,
            'date': date
        })

    # Sort the articles by publication date (newest first)
    articles.sort(key=lambda x: x['date'], reverse=True)

    # Initialize the RSS feed generator
    fg = FeedGenerator()
    fg.title('Warhammer Community RSS Feed')
    fg.link(href=url)
    fg.description('Latest Warhammer Community Articles')

    # Add the sorted articles to the RSS feed
    for article in articles:
        fe = fg.add_entry()
        fe.title(article['title'])
        fe.link(href=article['link'])
        fe.pubDate(article['date'])

    # Generate the RSS feed
    rss_feed = fg.rss_str(pretty=True)

    # Validate and save the RSS feed to a file
    rss_path = validate_output_path(os.path.join(output_dir, 'warhammer_rss_feed.xml'), output_dir)
    with open(rss_path, 'wb') as f:
        f.write(rss_feed)

    # Validate and save HTML for debugging
    html_path = validate_output_path(os.path.join(output_dir, 'page.html'), output_dir)
    with open(html_path, 'w', encoding='utf-8') as f:
        f.write(soup.prettify())

    print('RSS feed generated and saved as warhammer_rss_feed.xml')

if __name__ == "__main__":
    # Get output directory from environment variable or command line argument
    output_dir = os.getenv('OUTPUT_DIR')

    if len(sys.argv) > 1:
        output_dir = sys.argv[1]

    # Default to current directory if no output specified (avoids permission issues)
    if not output_dir:
        output_dir = '.'

    print(f"Using output directory: {output_dir}")

    # Run the function
    scrape_and_generate_rss('https://www.warhammer-community.com/en-gb/', output_dir)
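As a quick sanity check of the directory-traversal guard, a short sketch exercising `validate_output_path` (again assuming the hypothetical module name `warhammer_rss`):

```python
# Sketch of the path-guard behaviour; "warhammer_rss" is a hypothetical module name
import os
import tempfile

from warhammer_rss import validate_output_path

with tempfile.TemporaryDirectory() as base:
    # A file inside the base directory is accepted and returned as an absolute path
    print(validate_output_path(os.path.join(base, "warhammer_rss_feed.xml"), base))

    # A path that resolves outside the base directory is rejected with ValueError
    try:
        validate_output_path(os.path.join(base, "..", "escape.xml"), base)
    except ValueError as err:
        print(f"rejected: {err}")
```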