Compare commits
No commits in common. "25086fc01b4160d5785049452d01a3691b4e6eda" and "eecee074e2dfd1ed3d012658264203875a2d011c" have entirely different histories.
25086fc01b ... eecee074e2
.gitignore (vendored, 15 lines changed)
@@ -1,15 +1,2 @@
*.xml
.python-version
output/
output/*
cache/
*.log
__pycache__/
*.pyc
*.pyo
.pytest_cache/
.coverage
htmlcov/
.env
.venv/
venv/
.python-version
Dockerfile (50 lines changed)
@@ -4,7 +4,7 @@ FROM python:3.12.7-slim-bullseye
# Set the working directory
WORKDIR /app

# Install system dependencies needed for Playwright and gosu
# Install system dependencies needed for Playwright and its browsers
RUN apt-get update && apt-get install -y \
    bash \
    build-essential \
@@ -14,7 +14,6 @@ RUN apt-get update && apt-get install -y \
    ca-certificates \
    wget \
    gnupg \
    gosu \
    libnss3 \
    libatk-bridge2.0-0 \
    libx11-xcb1 \
@@ -37,45 +36,22 @@ RUN apt-get update && apt-get install -y \
    libdrm2 \
    && rm -rf /var/lib/apt/lists/*

# Copy requirements and install Python dependencies
COPY requirements.txt .
# Install Playwright and required Python dependencies
RUN pip install --upgrade pip && \
    pip install -r requirements.txt
    pip install \
    playwright \
    beautifulsoup4 \
    feedgen \
    pytz

# Install Playwright browser binaries
RUN playwright install

# Create an entrypoint script to handle permissions (as root)
RUN echo '#!/bin/bash\n\
# Fix permissions for mounted volumes\n\
if [ -d "/app/output" ]; then\n\
    chmod 777 /app/output 2>/dev/null || true\n\
fi\n\
# Run as scraper user\n\
exec gosu scraper "$@"' > /entrypoint.sh && chmod +x /entrypoint.sh


# Create non-root user for security
RUN useradd -m -u 1001 scraper && \
    mkdir -p /app/output && \
    chown -R scraper:scraper /app && \
    chmod 755 /app/output

# Copy the application code to the container
# Copy the Python script to the container
COPY main.py .
COPY src/ src/
RUN chown -R scraper:scraper main.py src/

# Set environment variables
ENV PYTHONUNBUFFERED=1 \
    PYTHONDONTWRITEBYTECODE=1 \
    PLAYWRIGHT_BROWSERS_PATH=/home/scraper/.cache/ms-playwright
# Set the environment variable to ensure Playwright works in the container
ENV PLAYWRIGHT_BROWSERS_PATH=/root/.cache/ms-playwright

# Don't switch user here - entrypoint will handle it
# USER scraper

# Install Chromium for the scraper user
USER scraper
RUN playwright install chromium
USER root

ENTRYPOINT ["/entrypoint.sh"]
# Command to run the Python script
CMD ["python", "main.py"]
README.md (349 lines changed)
@@ -1,349 +0,0 @@
# Warhammer Community RSS Scraper

A production-ready Python application that scrapes the Warhammer Community website and generates an RSS feed from the latest articles.

## Overview

This project provides a robust, secure, and scalable RSS scraper for the Warhammer Community website. It features comprehensive error handling, caching, rate limiting, and security measures suitable for production deployment.

## Features

### Core Functionality
- Scrapes articles from Warhammer Community website
- Generates properly formatted RSS feeds
- Handles duplicate article detection
- Sorts articles by publication date (newest first)
- Saves both RSS feed and debug HTML

### Production Features
- **Modular Architecture**: Clean separation of concerns with dedicated modules
- **Comprehensive Logging**: Structured logging with configurable levels
- **Configuration Management**: Environment-based configuration
- **Caching**: Intelligent content caching with ETags and conditional requests
- **Rate Limiting**: Respectful scraping with configurable delays
- **Retry Logic**: Exponential backoff for network failures
- **Type Safety**: Full type hints throughout codebase
- **Comprehensive Tests**: Unit tests with pytest framework

### Security Features
- **URL Validation**: Whitelist-based domain validation
- **Content Sanitization**: HTML sanitization using bleach library
- **Path Validation**: Prevention of directory traversal attacks
- **Resource Limits**: Memory and execution time constraints
- **Input Validation**: Comprehensive argument and data validation
- **Non-root Execution**: Secure container execution
- **File Sanitization**: Safe filename handling

## Requirements

- Python 3.12+
- Dependencies listed in `requirements.txt`

## Installation

### Local Setup

1. Install dependencies:
```bash
pip install -r requirements.txt
```

2. Install Playwright browsers:
```bash
playwright install
```

3. Run the scraper:
```bash
# Basic usage
python main.py

# With custom options
python main.py --url https://www.warhammer-community.com/en-gb/ \
               --output-dir ./output \
               --log-level DEBUG \
               --max-scroll 3

# View all options
python main.py --help
```

### Docker Setup

1. Build the Docker image:
```bash
docker build -t warhammer-rss .
```

2. Run the container:
```bash
# Basic usage
docker run -v $(pwd)/output:/app/output warhammer-rss

# With custom configuration
docker run -e MAX_SCROLL_ITERATIONS=3 \
           -e LOG_LEVEL=DEBUG \
           -v $(pwd)/output:/app/output \
           warhammer-rss --no-cache

# With resource limits
docker run --memory=512m --cpu-quota=50000 \
           -v $(pwd)/output:/app/output \
           warhammer-rss
```

## Command Line Options

```bash
Usage: main.py [OPTIONS]

Options:
  --url URL              URL to scrape (default: Warhammer Community)
  --output-dir PATH      Output directory for files
  --max-scroll INT       Maximum scroll iterations (default: 5)
  --log-level LEVEL      Logging level: DEBUG, INFO, WARNING, ERROR
  --log-file PATH        Log file path (default: scraper.log)
  --no-cache             Disable content caching
  --clear-cache          Clear cache before running
  --cache-info           Show cache information and exit
  -h, --help             Show help message
```

## Configuration

### Environment Variables

The application supports extensive configuration via environment variables:

```bash
# Scraping Configuration
MAX_SCROLL_ITERATIONS=5      # Number of scroll iterations
MAX_CONTENT_SIZE=10485760    # Maximum content size (10MB)
SCROLL_DELAY_SECONDS=2.0     # Delay between scrolls
PAGE_TIMEOUT_MS=120000       # Page load timeout

# Security Configuration
ALLOWED_DOMAINS="warhammer-community.com,www.warhammer-community.com"
MAX_TITLE_LENGTH=500         # Maximum title length

# Output Configuration
DEFAULT_OUTPUT_DIR="."       # Default output directory
RSS_FILENAME="warhammer_rss_feed.xml"
DEBUG_HTML_FILENAME="page.html"

# Feed Metadata
FEED_TITLE="Warhammer Community RSS Feed"
FEED_DESCRIPTION="Latest Warhammer Community Articles"
```

### Cache Management

```bash
# View cache status
python main.py --cache-info

# Clear cache
python main.py --clear-cache

# Disable caching for a run
python main.py --no-cache
```

## Project Structure

```
rss_warhammer/
├── main.py                 # CLI entry point
├── src/rss_scraper/        # Main package
│   ├── __init__.py
│   ├── config.py           # Configuration management
│   ├── exceptions.py       # Custom exceptions
│   ├── validation.py       # URL and path validation
│   ├── scraper.py          # Web scraping with Playwright
│   ├── parser.py           # HTML parsing and article extraction
│   ├── rss_generator.py    # RSS feed generation
│   ├── cache.py            # Content caching system
│   ├── security.py         # Security utilities
│   └── retry_utils.py      # Retry logic with backoff
├── tests/                  # Comprehensive test suite
├── cache/                  # Cache directory (auto-created)
├── requirements.txt        # Python dependencies
├── pytest.ini              # Test configuration
├── Dockerfile              # Container configuration
└── README.md               # This file
```

## Output Files

The application generates:
- `warhammer_rss_feed.xml` - RSS feed with extracted articles
- `page.html` - Raw HTML for debugging (optional)
- `scraper.log` - Application logs
- `cache/` - Cached content and ETags

## Testing

Run the comprehensive test suite:

```bash
# Run all tests
pytest

# Run with coverage
pytest --cov=src/rss_scraper

# Run specific test categories
pytest -m unit              # Unit tests only
pytest tests/test_parser.py # Specific module
```

## Error Handling

The application uses specific exit codes for different error types:

- `0` - Success
- `1` - Configuration/Validation error
- `2` - Network error
- `3` - Page loading error
- `4` - Content parsing error
- `5` - File operation error
- `6` - Content size exceeded
- `99` - Unexpected error

## Security Considerations

### Allowed Domains
The scraper only operates on whitelisted domains:
- `warhammer-community.com`
- `www.warhammer-community.com`

### Rate Limiting
- Default: 30 requests per minute
- Minimum delay: 2 seconds between requests
- Configurable via environment variables

### Content Sanitization
- HTML content sanitized using bleach
- Dangerous scripts and patterns removed
- File paths validated against directory traversal
- URL validation against malicious patterns

## Deployment

### Production Deployment

1. **Environment Setup**:
```bash
# Create production environment file
cat > .env << EOF
MAX_SCROLL_ITERATIONS=3
SCROLL_DELAY_SECONDS=3.0
DEFAULT_OUTPUT_DIR=/app/data
LOG_LEVEL=INFO
EOF
```

2. **Docker Compose** (recommended):
```yaml
version: '3.8'
services:
  rss-scraper:
    build: .
    environment:
      - MAX_SCROLL_ITERATIONS=3
      - LOG_LEVEL=INFO
    volumes:
      - ./output:/app/output
      - ./logs:/app/logs
    restart: unless-stopped
    memory: 512m
    cpus: 0.5
```

3. **Cron Schedule**:
```bash
# Add to crontab for regular updates
0 */6 * * * docker run --rm -v /path/to/output:/app/output warhammer-rss
```

## Development

### Setup Development Environment

```bash
# Install development dependencies
pip install -r requirements.txt
pip install pytest pytest-cov black isort

# Install pre-commit hooks (optional)
pre-commit install

# Run tests
pytest

# Format code
black src/ tests/
isort src/ tests/
```

### Adding New Features

1. Follow the modular architecture
2. Add type hints to all functions
3. Include comprehensive error handling
4. Write tests for new functionality
5. Update configuration if needed
6. Document changes in README

## Troubleshooting

### Common Issues

1. **Permission Errors**:
   - Ensure output directory is writable
   - Use proper Docker volume mounting

2. **Memory Issues**:
   - Reduce `MAX_SCROLL_ITERATIONS`
   - Increase Docker memory limits

3. **Rate Limiting**:
   - Increase `SCROLL_DELAY_SECONDS`
   - Check network connectivity

4. **Cache Issues**:
   - Clear cache with `--clear-cache`
   - Check cache directory permissions

### Debug Mode

```bash
# Enable debug logging
python main.py --log-level DEBUG

# Disable caching for testing
python main.py --no-cache --log-level DEBUG
```

## License

This project is provided as-is for educational purposes. Please respect the Warhammer Community website's robots.txt and terms of service.

## Contributing

1. Fork the repository
2. Create a feature branch
3. Add tests for new functionality
4. Ensure all tests pass
5. Submit a pull request

## Changelog

### Version 1.0.0
- Complete rewrite with modular architecture
- Added comprehensive caching system
- Implemented rate limiting and security hardening
- Full test coverage with pytest
- Production-ready Docker container
- Extensive configuration management
- Structured logging and error handling
main.py (315 lines changed)
@@ -1,220 +1,109 @@
import os
import sys
import logging
import argparse
from typing import Optional

from src.rss_scraper.config import Config
from src.rss_scraper.exceptions import (
    ValidationError, NetworkError, PageLoadError,
    ContentSizeError, ParseError, FileOperationError
)
from src.rss_scraper.validation import validate_url
from src.rss_scraper.scraper import load_page_with_retry, clear_cache, get_cache_info
from src.rss_scraper.parser import extract_articles_from_html
from src.rss_scraper.rss_generator import generate_rss_feed, save_rss_feed, save_debug_html

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.StreamHandler(sys.stdout),
        logging.FileHandler('scraper.log')
    ]
)
logger = logging.getLogger(__name__)


def parse_arguments() -> argparse.Namespace:
    """Parse and validate command line arguments."""
    parser = argparse.ArgumentParser(
        description='RSS scraper for Warhammer Community website',
        formatter_class=argparse.ArgumentDefaultsHelpFormatter
    )

    parser.add_argument(
        '--url',
        type=str,
        default=Config.DEFAULT_URL,
        help='URL to scrape for articles'
    )

    parser.add_argument(
        '--output-dir',
        type=str,
        default=None,
        help='Output directory for RSS feed and HTML files'
    )

    parser.add_argument(
        '--max-scroll',
        type=int,
        default=Config.MAX_SCROLL_ITERATIONS,
        help='Maximum number of scroll iterations'
    )

    parser.add_argument(
        '--log-level',
        type=str,
        choices=['DEBUG', 'INFO', 'WARNING', 'ERROR'],
        default='INFO',
        help='Logging level'
    )

    parser.add_argument(
        '--log-file',
        type=str,
        default='scraper.log',
        help='Log file path'
    )

    parser.add_argument(
        '--no-cache',
        action='store_true',
        help='Disable content caching'
    )

    parser.add_argument(
        '--clear-cache',
        action='store_true',
        help='Clear cache before running'
    )

    parser.add_argument(
        '--cache-info',
        action='store_true',
        help='Show cache information and exit'
    )

    args = parser.parse_args()

    # Validate arguments
    if args.max_scroll < 0:
        parser.error("--max-scroll must be non-negative")

    if args.output_dir and not os.path.isabs(args.output_dir):
        # Convert relative path to absolute
        args.output_dir = os.path.abspath(args.output_dir)

    return args


def setup_logging(log_level: str, log_file: str) -> None:
    """Setup logging configuration."""
    # Clear any existing handlers
    for handler in logging.root.handlers[:]:
        logging.root.removeHandler(handler)

    # Set up new configuration
    logging.basicConfig(
        level=getattr(logging, log_level),
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        handlers=[
            logging.StreamHandler(sys.stdout),
            logging.FileHandler(log_file)
        ]
    )

from playwright.sync_api import sync_playwright
from bs4 import BeautifulSoup
from feedgen.feed import FeedGenerator
from datetime import datetime
import pytz
import time

# Function to scrape articles using Playwright and generate an RSS feed
def scrape_and_generate_rss(url: str, output_dir: Optional[str] = None, use_cache: bool = True) -> None:
    """Main function to scrape articles and generate RSS feed."""
    logger.info(f"Starting scrape of {url}")

    # Validate URL first
    validate_url(url)

    # Set default output directory if not provided
    if output_dir is None:
        output_dir = Config.DEFAULT_OUTPUT_DIR

    logger.info(f"Using output directory: {output_dir}")
    logger.info(f"Caching {'enabled' if use_cache else 'disabled'}")

    # Load page content with retry logic
    html = load_page_with_retry(url, use_cache=use_cache)

    # Extract articles from HTML
    articles = extract_articles_from_html(html, url)

    # Generate RSS feed
    rss_content = generate_rss_feed(articles, url)

    # Save RSS feed and debug HTML
    save_rss_feed(rss_content, output_dir)
    save_debug_html(html, output_dir)

    logger.info(f'RSS feed generated successfully with {len(articles)} articles')
def scrape_and_generate_rss(url):
    articles = []
    seen_urls = set()  # Set to track seen URLs and avoid duplicates

if __name__ == "__main__":
    try:
        # Parse command line arguments
        args = parse_arguments()
    # Use Playwright to load the page
    with sync_playwright() as p:
        browser = p.chromium.launch(headless=True)
        page = browser.new_page()

        # Setup logging with parsed arguments
        setup_logging(args.log_level, args.log_file)
        # Set a longer timeout for loading the page
        page.set_default_navigation_timeout(120000)

        # Re-get logger after setup
        logger = logging.getLogger(__name__)
        # Load the Warhammer Community page
        page.goto(url, wait_until="networkidle")

        # Handle cache operations first
        if args.cache_info:
            cache_info = get_cache_info()
            print(f"Cache file: {cache_info['cache_file']}")
            print(f"ETag file: {cache_info['etag_file']}")
            print(f"Cache entries: {cache_info['cache_entries']}")
            print(f"ETag entries: {cache_info['etag_entries']}")
            print(f"Cache size: {cache_info['cache_size_bytes']} bytes")
            sys.exit(0)
        # Simulate scrolling to load more content if needed
        for _ in range(10):
            page.evaluate("window.scrollBy(0, document.body.scrollHeight)")
            time.sleep(2)

        if args.clear_cache:
            logger.info("Clearing cache...")
            clear_cache()
            logger.info("Cache cleared successfully")

        # Validate configuration
        Config.validate_config()
        logger.info("Configuration validation passed")

        # Determine output directory
        output_dir = args.output_dir or os.getenv('OUTPUT_DIR') or Config.DEFAULT_OUTPUT_DIR

        logger.info(f"Starting RSS scraper with URL: {args.url}")
        logger.info(f"Output directory: {output_dir}")
        logger.info(f"Max scroll iterations: {args.max_scroll}")

        # Temporarily override config if max_scroll was provided
        if args.max_scroll != Config.MAX_SCROLL_ITERATIONS:
            Config.MAX_SCROLL_ITERATIONS = args.max_scroll
            logger.info(f"Overriding max scroll iterations to: {args.max_scroll}")

        # Run the function
        use_cache = not args.no_cache
        scrape_and_generate_rss(args.url, output_dir, use_cache)
        logger.info("RSS scraping completed successfully")

    except argparse.ArgumentError as e:
        print(f"Argument error: {e}", file=sys.stderr)
        sys.exit(1)
    except (ValueError, ValidationError) as e:
        print(f"Configuration/Validation error: {e}", file=sys.stderr)
        sys.exit(1)
    except PageLoadError as e:
        logger.error(f"Page loading error: {e}")
        sys.exit(3)
    except NetworkError as e:
        logger.error(f"Network error: {e}")
        sys.exit(2)
    except ParseError as e:
        logger.error(f"Content parsing error: {e}")
        sys.exit(4)
    except FileOperationError as e:
        logger.error(f"File operation error: {e}")
        sys.exit(5)
    except ContentSizeError as e:
        logger.error(f"Content size error: {e}")
        sys.exit(6)
    except Exception as e:
        logger.error(f"Unexpected error: {e}")
        sys.exit(99)
        # Get the fully rendered HTML content
        html = page.content()
        browser.close()

    # Parse the HTML content with BeautifulSoup
    soup = BeautifulSoup(html, 'html.parser')

    # Define a timezone (UTC in this case)
    timezone = pytz.UTC

    # Find all articles in the page
    for article in soup.find_all('article'):
        # Extract the title
        title_tag = article.find('h3', class_='newsCard-title-sm') or article.find('h3', class_='newsCard-title-lg')
        title = title_tag.text.strip() if title_tag else 'No title'

        # Extract the link
        link_tag = article.find('a', href=True)
        link = link_tag['href'] if link_tag else None

        # Skip this entry if the link is None or the URL has already been seen
        if not link or link in seen_urls:
            continue  # Skip duplicates or invalid entries

        seen_urls.add(link)  # Add the URL to the set of seen URLs

        # Extract the publication date and ignore reading time
        date = None
        for time_tag in article.find_all('time'):
            raw_date = time_tag.text.strip()

            # Ignore "min" time blocks (reading time)
            if "min" not in raw_date.lower():
                try:
                    # Parse the actual date (e.g., "02 Oct 24")
                    date = datetime.strptime(raw_date, '%d %b %y')
                    date = timezone.localize(date)  # Localize with UTC
                    break  # Stop after finding the correct date
                except ValueError:
                    continue

        # If no valid date is found, use the current date as a fallback
        if not date:
            date = datetime.now(timezone)

        # Add the article to the list with its publication date
        articles.append({
            'title': title,
            'link': link,
            'date': date
        })

    # Sort the articles by publication date (newest first)
    articles.sort(key=lambda x: x['date'], reverse=True)

    # Initialize the RSS feed generator
    fg = FeedGenerator()
    fg.title('Warhammer Community RSS Feed')
    fg.link(href=url)
    fg.description('Latest Warhammer Community Articles')

    # Add the sorted articles to the RSS feed
    for article in articles:
        fe = fg.add_entry()
        fe.title(article['title'])
        fe.link(href=article['link'])
        fe.pubDate(article['date'])

    # Generate the RSS feed
    rss_feed = fg.rss_str(pretty=True)

    # Save the RSS feed to a file
    with open('/app/output/warhammer_rss_feed.xml', 'wb') as f:
        f.write(rss_feed)

    with open('/app/output/page.html', 'w', encoding='utf-8') as f:
        f.write(soup.prettify())
    print('RSS feed generated and saved as warhammer_rss_feed.xml')

# Run the function
scrape_and_generate_rss('https://www.warhammer-community.com/en-gb/')
output/page.html (6353 lines changed)
File diff suppressed because one or more lines are too long.
pytest.ini (14 lines changed)
@@ -1,14 +0,0 @@
[tool:pytest]
testpaths = tests
python_files = test_*.py
python_classes = Test*
python_functions = test_*
addopts =
    -v
    --tb=short
    --strict-markers
    --disable-warnings
markers =
    unit: Unit tests
    integration: Integration tests
    slow: Slow running tests
requirements.txt
@@ -1,9 +1,5 @@
requests
beautifulsoup4
bs4
feedgen
playwright
pytz
pytest
pytest-mock
pytest-asyncio
bleach
pytz
@@ -1 +0,0 @@
# RSS Scraper package
@@ -1,5 +0,0 @@
"""RSS Scraper for Warhammer Community website."""

__version__ = "1.0.0"
__author__ = "RSS Scraper"
__description__ = "A production-ready RSS scraper for Warhammer Community website"
src/rss_scraper/cache.py
@@ -1,216 +0,0 @@
"""Caching utilities for avoiding redundant scraping."""

import os
import json
import hashlib
import logging
from datetime import datetime, timedelta
from typing import Optional, Dict, Any, List
import requests

from .config import Config
from .exceptions import FileOperationError

logger = logging.getLogger(__name__)


class ContentCache:
    """Cache for storing and retrieving scraped content."""

    def __init__(self, cache_dir: str = "cache"):
        self.cache_dir = cache_dir
        self.cache_file = os.path.join(cache_dir, "content_cache.json")
        self.etag_file = os.path.join(cache_dir, "etags.json")
        self.max_cache_age_hours = 24  # Cache expires after 24 hours

        # Ensure cache directory exists
        os.makedirs(cache_dir, exist_ok=True)

    def _get_cache_key(self, url: str) -> str:
        """Generate cache key from URL."""
        return hashlib.sha256(url.encode()).hexdigest()

    def _load_cache(self) -> Dict[str, Any]:
        """Load cache from file."""
        try:
            if os.path.exists(self.cache_file):
                with open(self.cache_file, 'r') as f:
                    return json.load(f)
        except Exception as e:
            logger.warning(f"Failed to load cache: {e}")
        return {}

    def _save_cache(self, cache_data: Dict[str, Any]) -> None:
        """Save cache to file."""
        try:
            with open(self.cache_file, 'w') as f:
                json.dump(cache_data, f, indent=2, default=str)
        except Exception as e:
            logger.error(f"Failed to save cache: {e}")
            raise FileOperationError(f"Failed to save cache: {e}")

    def _load_etags(self) -> Dict[str, str]:
        """Load ETags from file."""
        try:
            if os.path.exists(self.etag_file):
                with open(self.etag_file, 'r') as f:
                    return json.load(f)
        except Exception as e:
            logger.warning(f"Failed to load ETags: {e}")
        return {}

    def _save_etags(self, etag_data: Dict[str, str]) -> None:
        """Save ETags to file."""
        try:
            with open(self.etag_file, 'w') as f:
                json.dump(etag_data, f, indent=2)
        except Exception as e:
            logger.warning(f"Failed to save ETags: {e}")

    def _is_cache_valid(self, cached_entry: Dict[str, Any]) -> bool:
        """Check if cached entry is still valid."""
        try:
            cached_time = datetime.fromisoformat(cached_entry['timestamp'])
            expiry_time = cached_time + timedelta(hours=self.max_cache_age_hours)
            return datetime.now() < expiry_time
        except (KeyError, ValueError):
            return False

    def check_if_content_changed(self, url: str) -> Optional[Dict[str, str]]:
        """Check if content has changed using conditional requests."""
        etags = self._load_etags()
        cache_key = self._get_cache_key(url)

        headers = {}
        if cache_key in etags:
            headers['If-None-Match'] = etags[cache_key]

        try:
            logger.debug(f"Checking if content changed for {url}")
            response = requests.head(url, headers=headers, timeout=10)

            # 304 means not modified
            if response.status_code == 304:
                logger.info(f"Content not modified for {url}")
                return {'status': 'not_modified'}

            # Update ETag if available
            if 'etag' in response.headers:
                etags[cache_key] = response.headers['etag']
                self._save_etags(etags)
                logger.debug(f"Updated ETag for {url}")

            return {'status': 'modified', 'etag': response.headers.get('etag')}

        except requests.RequestException as e:
            logger.warning(f"Failed to check content modification for {url}: {e}")
            # If we can't check, assume it's modified
            return {'status': 'modified'}

    def get_cached_content(self, url: str) -> Optional[str]:
        """Get cached HTML content if available and valid."""
        cache_data = self._load_cache()
        cache_key = self._get_cache_key(url)

        if cache_key not in cache_data:
            logger.debug(f"No cached content for {url}")
            return None

        cached_entry = cache_data[cache_key]

        if not self._is_cache_valid(cached_entry):
            logger.debug(f"Cached content for {url} has expired")
            # Remove expired entry
            del cache_data[cache_key]
            self._save_cache(cache_data)
            return None

        logger.info(f"Using cached content for {url}")
        return cached_entry['content']

    def cache_content(self, url: str, content: str) -> None:
        """Cache HTML content with timestamp."""
        cache_data = self._load_cache()
        cache_key = self._get_cache_key(url)

        cache_data[cache_key] = {
            'url': url,
            'content': content,
            'timestamp': datetime.now().isoformat(),
            'size': len(content)
        }

        self._save_cache(cache_data)
        logger.info(f"Cached content for {url} ({len(content)} bytes)")

    def get_cached_articles(self, url: str) -> Optional[List[Dict[str, Any]]]:
        """Get cached articles if available and valid."""
        cache_data = self._load_cache()
        cache_key = self._get_cache_key(url) + "_articles"

        if cache_key not in cache_data:
            return None

        cached_entry = cache_data[cache_key]

        if not self._is_cache_valid(cached_entry):
            # Remove expired entry
            del cache_data[cache_key]
            self._save_cache(cache_data)
            return None

        logger.info(f"Using cached articles for {url}")
        return cached_entry['articles']

    def cache_articles(self, url: str, articles: List[Dict[str, Any]]) -> None:
        """Cache extracted articles."""
        cache_data = self._load_cache()
        cache_key = self._get_cache_key(url) + "_articles"

        # Convert datetime objects to strings for JSON serialization
        serializable_articles = []
        for article in articles:
            serializable_article = article.copy()
            if 'date' in serializable_article and hasattr(serializable_article['date'], 'isoformat'):
                serializable_article['date'] = serializable_article['date'].isoformat()
            serializable_articles.append(serializable_article)

        cache_data[cache_key] = {
            'url': url,
            'articles': serializable_articles,
            'timestamp': datetime.now().isoformat(),
            'count': len(articles)
        }

        self._save_cache(cache_data)
        logger.info(f"Cached {len(articles)} articles for {url}")

    def clear_cache(self) -> None:
        """Clear all cached content."""
        try:
            if os.path.exists(self.cache_file):
                os.remove(self.cache_file)
            if os.path.exists(self.etag_file):
                os.remove(self.etag_file)
            logger.info("Cache cleared successfully")
        except Exception as e:
            logger.error(f"Failed to clear cache: {e}")
            raise FileOperationError(f"Failed to clear cache: {e}")

    def get_cache_info(self) -> Dict[str, Any]:
        """Get information about cached content."""
        cache_data = self._load_cache()
        etags = self._load_etags()

        info = {
            'cache_file': self.cache_file,
            'etag_file': self.etag_file,
            'cache_entries': len(cache_data),
            'etag_entries': len(etags),
            'cache_size_bytes': 0
        }

        if os.path.exists(self.cache_file):
            info['cache_size_bytes'] = os.path.getsize(self.cache_file)

        return info
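For reference, a minimal sketch of driving the ContentCache API shown above on its own; it assumes the `src/rss_scraper` package layout from this diff, and the URL and placeholder HTML are illustrative:

```python
from src.rss_scraper.cache import ContentCache

cache = ContentCache(cache_dir="cache")
url = "https://www.warhammer-community.com/en-gb/"  # project default URL, used as an example

# Serve cached HTML only if the conditional HEAD request reports HTTP 304
# and the entry is younger than max_cache_age_hours.
check = cache.check_if_content_changed(url)
html = cache.get_cached_content(url) if check and check["status"] == "not_modified" else None

if html is None:
    html = "<html>...freshly scraped content...</html>"  # placeholder for a real scrape
    cache.cache_content(url, html)

print(cache.get_cache_info())
```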
src/rss_scraper/config.py
@@ -1,77 +0,0 @@
"""Configuration management for RSS Warhammer scraper."""

import os
from typing import List, Optional


class Config:
    """Configuration class for RSS scraper settings."""

    # Security settings
    ALLOWED_DOMAINS: List[str] = [
        'warhammer-community.com',
        'www.warhammer-community.com'
    ]

    # Scraping limits
    MAX_SCROLL_ITERATIONS: int = int(os.getenv('MAX_SCROLL_ITERATIONS', '5'))
    MAX_CONTENT_SIZE: int = int(os.getenv('MAX_CONTENT_SIZE', str(10 * 1024 * 1024)))  # 10MB
    MAX_TITLE_LENGTH: int = int(os.getenv('MAX_TITLE_LENGTH', '500'))

    # Timing settings
    SCROLL_DELAY_SECONDS: float = float(os.getenv('SCROLL_DELAY_SECONDS', '2.0'))
    PAGE_TIMEOUT_MS: int = int(os.getenv('PAGE_TIMEOUT_MS', '120000'))

    # Default URLs and paths
    DEFAULT_URL: str = os.getenv('DEFAULT_URL', 'https://www.warhammer-community.com/en-gb/')
    DEFAULT_OUTPUT_DIR: str = os.getenv('DEFAULT_OUTPUT_DIR', '.')

    # File names
    RSS_FILENAME: str = os.getenv('RSS_FILENAME', 'warhammer_rss_feed.xml')
    DEBUG_HTML_FILENAME: str = os.getenv('DEBUG_HTML_FILENAME', 'page.html')

    # Feed metadata
    FEED_TITLE: str = os.getenv('FEED_TITLE', 'Warhammer Community RSS Feed')
    FEED_DESCRIPTION: str = os.getenv('FEED_DESCRIPTION', 'Latest Warhammer Community Articles')

    # Security patterns to remove from content
    DANGEROUS_PATTERNS: List[str] = [
        '<script', '</script', 'javascript:', 'data:', 'vbscript:'
    ]

    # CSS selectors for article parsing
    TITLE_SELECTORS: List[str] = [
        'h3.newsCard-title-sm',
        'h3.newsCard-title-lg'
    ]

    @classmethod
    def get_output_dir(cls, override: Optional[str] = None) -> str:
        """Get output directory with optional override."""
        return override or cls.DEFAULT_OUTPUT_DIR

    @classmethod
    def get_allowed_domains(cls) -> List[str]:
        """Get list of allowed domains for scraping."""
        env_domains = os.getenv('ALLOWED_DOMAINS')
        if env_domains:
            return [domain.strip() for domain in env_domains.split(',')]
        return cls.ALLOWED_DOMAINS

    @classmethod
    def validate_config(cls) -> None:
        """Validate configuration values."""
        if cls.MAX_SCROLL_ITERATIONS < 0:
            raise ValueError("MAX_SCROLL_ITERATIONS must be non-negative")
        if cls.MAX_CONTENT_SIZE <= 0:
            raise ValueError("MAX_CONTENT_SIZE must be positive")
        if cls.MAX_TITLE_LENGTH <= 0:
            raise ValueError("MAX_TITLE_LENGTH must be positive")
        if cls.SCROLL_DELAY_SECONDS < 0:
            raise ValueError("SCROLL_DELAY_SECONDS must be non-negative")
        if cls.PAGE_TIMEOUT_MS <= 0:
            raise ValueError("PAGE_TIMEOUT_MS must be positive")
        if not cls.DEFAULT_URL.startswith(('http://', 'https://')):
            raise ValueError("DEFAULT_URL must be a valid HTTP/HTTPS URL")
        if not cls.get_allowed_domains():
            raise ValueError("ALLOWED_DOMAINS cannot be empty")
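As a sketch, overriding the class attributes above through environment variables could look like the following; the values are arbitrary examples, and the variables must be set before the module is imported because most attributes read `os.getenv()` at class-definition time:

```python
import os

# Set overrides before importing Config; they are read at import time.
os.environ["MAX_SCROLL_ITERATIONS"] = "3"
os.environ["ALLOWED_DOMAINS"] = "warhammer-community.com,www.warhammer-community.com"

from src.rss_scraper.config import Config

Config.validate_config()             # raises ValueError on invalid settings
print(Config.MAX_SCROLL_ITERATIONS)  # 3
print(Config.get_allowed_domains())  # parsed from ALLOWED_DOMAINS at call time
```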
src/rss_scraper/exceptions.py
@@ -1,41 +0,0 @@
"""Custom exceptions for the RSS scraper."""


class ScrapingError(Exception):
    """Base exception for scraping-related errors."""
    pass


class ValidationError(ScrapingError):
    """Exception raised for validation errors."""
    pass


class NetworkError(ScrapingError):
    """Exception raised for network-related errors."""
    pass


class PageLoadError(NetworkError):
    """Exception raised when page fails to load properly."""
    pass


class ContentSizeError(ScrapingError):
    """Exception raised when content exceeds size limits."""
    pass


class ParseError(ScrapingError):
    """Exception raised when HTML parsing fails."""
    pass


class ConfigurationError(ScrapingError):
    """Exception raised for configuration-related errors."""
    pass


class FileOperationError(ScrapingError):
    """Exception raised for file operation errors."""
    pass
src/rss_scraper/parser.py
@@ -1,111 +0,0 @@
"""HTML parsing and article extraction functionality."""

import logging
from datetime import datetime
from typing import List, Dict, Any, Optional
import pytz
from bs4 import BeautifulSoup

from .config import Config
from .validation import validate_link
from .exceptions import ParseError
from .security import sanitize_text_content, sanitize_html_content

logger = logging.getLogger(__name__)


def sanitize_text(text: Optional[str]) -> str:
    """Sanitize text content to prevent injection attacks"""
    return sanitize_text_content(text)


def extract_articles_from_html(html: str, base_url: str) -> List[Dict[str, Any]]:
    """Extract articles from HTML content."""
    logger.info("Parsing HTML content with BeautifulSoup")

    # Sanitize HTML content first for security
    sanitized_html = sanitize_html_content(html)

    try:
        soup = BeautifulSoup(sanitized_html, 'html.parser')
    except Exception as e:
        raise ParseError(f"Failed to parse HTML content: {e}")

    # Define a timezone (UTC in this case)
    timezone = pytz.UTC

    # Find all articles in the page - look for article elements with shared- classes (all article types)
    all_articles = soup.find_all('article')
    article_elements = []
    for article in all_articles:
        classes = article.get('class', [])
        if classes and any('shared-' in cls for cls in classes):
            article_elements.append(article)
    logger.info(f"Found {len(article_elements)} article elements on page")

    articles: List[Dict[str, Any]] = []
    seen_urls: set = set()  # Set to track seen URLs and avoid duplicates

    for article in article_elements:
        # Extract and sanitize the title
        title_tag = None
        for selector in Config.TITLE_SELECTORS:
            class_name = selector.split('.')[1] if '.' in selector else selector
            title_tag = article.find('h3', class_=class_name)
            if title_tag:
                break

        raw_title = title_tag.text.strip() if title_tag else 'No title'
        title = sanitize_text(raw_title)

        # Extract and validate the link - look for btn-cover class first, then any anchor
        link_tag = article.find('a', class_='btn-cover', href=True) or article.find('a', href=True)
        raw_link = link_tag['href'] if link_tag else None
        link = validate_link(raw_link, base_url)

        # Skip this entry if the link is None or the URL has already been seen
        if not link or link in seen_urls:
            logger.debug(f"Skipping duplicate or invalid article: {title}")
            continue  # Skip duplicates or invalid entries

        seen_urls.add(link)  # Add the URL to the set of seen URLs
        logger.debug(f"Processing article: {title[:50]}...")

        # Extract the publication date and ignore reading time
        date = None
        for time_tag in article.find_all('time'):
            raw_date = time_tag.text.strip()

            # Ignore "min" time blocks (reading time)
            if "min" not in raw_date.lower():
                try:
                    # Parse the actual date (e.g., "05 Jun 25")
                    date = datetime.strptime(raw_date, '%d %b %y')
                    date = timezone.localize(date)  # Localize with UTC
                    break  # Stop after finding the correct date
                except ValueError:
                    # Try alternative date formats if the first one fails
                    try:
                        # Try format like "Jun 05, 2025"
                        date = datetime.strptime(raw_date, '%b %d, %Y')
                        date = timezone.localize(date)
                        break
                    except ValueError:
                        continue

        # If no valid date is found, use the current date as a fallback
        if not date:
            date = datetime.now(timezone)

        # Add the article to the list with its publication date
        articles.append({
            'title': title,
            'link': link,
            'date': date
        })

    # Sort the articles by publication date (newest first)
    articles.sort(key=lambda x: x['date'], reverse=True)
    logger.info(f"Successfully extracted {len(articles)} unique articles")

    return articles
src/rss_scraper/retry_utils.py
@@ -1,124 +0,0 @@
"""Retry utilities with exponential backoff for network operations."""

import time
import logging
from typing import Any, Callable, Optional, Type, Union, Tuple
from functools import wraps

logger = logging.getLogger(__name__)


class RetryConfig:
    """Configuration for retry behavior."""

    def __init__(
        self,
        max_attempts: int = 3,
        base_delay: float = 1.0,
        max_delay: float = 60.0,
        backoff_factor: float = 2.0,
        jitter: bool = True
    ):
        self.max_attempts = max_attempts
        self.base_delay = base_delay
        self.max_delay = max_delay
        self.backoff_factor = backoff_factor
        self.jitter = jitter


def calculate_delay(attempt: int, config: RetryConfig) -> float:
    """Calculate delay for retry attempt with exponential backoff."""
    delay = config.base_delay * (config.backoff_factor ** (attempt - 1))
    delay = min(delay, config.max_delay)

    if config.jitter:
        # Add random jitter to avoid thundering herd
        import random
        jitter_amount = delay * 0.1
        delay += random.uniform(-jitter_amount, jitter_amount)

    return max(0, delay)


def retry_on_exception(
    exceptions: Union[Type[Exception], Tuple[Type[Exception], ...]],
    config: Optional[RetryConfig] = None
) -> Callable:
    """Decorator to retry function calls on specific exceptions.

    Args:
        exceptions: Exception type(s) to retry on
        config: Retry configuration, uses default if None

    Returns:
        Decorated function with retry logic
    """
    if config is None:
        config = RetryConfig()

    def decorator(func: Callable) -> Callable:
        @wraps(func)
        def wrapper(*args, **kwargs) -> Any:
            last_exception = None

            for attempt in range(1, config.max_attempts + 1):
                try:
                    result = func(*args, **kwargs)
                    if attempt > 1:
                        logger.info(f"{func.__name__} succeeded on attempt {attempt}")
                    return result

                except exceptions as e:
                    last_exception = e

                    if attempt == config.max_attempts:
                        logger.error(
                            f"{func.__name__} failed after {config.max_attempts} attempts. "
                            f"Final error: {e}"
                        )
                        raise

                    delay = calculate_delay(attempt, config)
                    logger.warning(
                        f"{func.__name__} attempt {attempt} failed: {e}. "
                        f"Retrying in {delay:.2f} seconds..."
                    )
                    time.sleep(delay)

                except Exception as e:
                    # Don't retry on unexpected exceptions
                    logger.error(f"{func.__name__} failed with unexpected error: {e}")
                    raise

            # This should never be reached, but just in case
            if last_exception:
                raise last_exception

        return wrapper
    return decorator


# Common retry configurations for different scenarios
NETWORK_RETRY_CONFIG = RetryConfig(
    max_attempts=3,
    base_delay=1.0,
    max_delay=30.0,
    backoff_factor=2.0,
    jitter=True
)

PLAYWRIGHT_RETRY_CONFIG = RetryConfig(
    max_attempts=2,
    base_delay=2.0,
    max_delay=10.0,
    backoff_factor=2.0,
    jitter=False
)

FILE_RETRY_CONFIG = RetryConfig(
    max_attempts=3,
    base_delay=0.5,
    max_delay=5.0,
    backoff_factor=1.5,
    jitter=False
)
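A short sketch of applying the decorator above to a flaky network call; the fetch function, its URL, and the choice of `requests.RequestException` are illustrative rather than part of the project:

```python
import requests

from src.rss_scraper.retry_utils import RetryConfig, retry_on_exception

# Retry up to 4 times with delays of roughly 0.5 s, 1 s, 2 s (capped at 10 s), no jitter.
config = RetryConfig(max_attempts=4, base_delay=0.5, max_delay=10.0,
                     backoff_factor=2.0, jitter=False)

@retry_on_exception(requests.RequestException, config)
def fetch_status(url: str) -> int:
    """Illustrative flaky call: raises requests.RequestException on network failure."""
    return requests.head(url, timeout=10).status_code

print(fetch_status("https://www.warhammer-community.com/en-gb/"))
```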
src/rss_scraper/rss_generator.py
@@ -1,59 +0,0 @@
"""RSS feed generation functionality."""

import os
import logging
from typing import List, Dict, Any
from feedgen.feed import FeedGenerator

from .config import Config
from .validation import validate_output_path
from .exceptions import FileOperationError

logger = logging.getLogger(__name__)


def generate_rss_feed(articles: List[Dict[str, Any]], feed_url: str) -> bytes:
    """Generate RSS feed from articles list."""
    logger.info(f"Generating RSS feed for {len(articles)} articles")

    # Initialize the RSS feed generator
    fg = FeedGenerator()
    fg.title(Config.FEED_TITLE)
    fg.link(href=feed_url)
    fg.description(Config.FEED_DESCRIPTION)

    # Add the sorted articles to the RSS feed
    for article in articles:
        fe = fg.add_entry()
        fe.title(article['title'])
        fe.link(href=article['link'])
        fe.pubDate(article['date'])

    # Generate the RSS feed
    return fg.rss_str(pretty=True)


def save_rss_feed(rss_content: bytes, output_dir: str) -> str:
    """Save RSS feed to file."""
    try:
        rss_path = validate_output_path(os.path.join(output_dir, Config.RSS_FILENAME), output_dir)
        with open(rss_path, 'wb') as f:
            f.write(rss_content)
        logger.info(f'RSS feed saved to: {rss_path}')
        return rss_path
    except Exception as e:
        raise FileOperationError(f"Failed to save RSS feed: {e}")


def save_debug_html(html_content: str, output_dir: str) -> None:
    """Save HTML content for debugging purposes."""
    try:
        from bs4 import BeautifulSoup
        soup = BeautifulSoup(html_content, 'html.parser')
        html_path = validate_output_path(os.path.join(output_dir, Config.DEBUG_HTML_FILENAME), output_dir)
        with open(html_path, 'w', encoding='utf-8') as f:
            f.write(soup.prettify())
        logger.info(f'Debug HTML saved to: {html_path}')
    except Exception as e:
        # HTML saving is not critical, just log the error
        logger.warning(f"Failed to save debug HTML: {e}")
src/rss_scraper/scraper.py
@@ -1,112 +0,0 @@
"""Web scraping functionality using Playwright."""

import time
import logging
from playwright.sync_api import sync_playwright
from typing import Optional

from .config import Config
from .exceptions import NetworkError, PageLoadError, ContentSizeError
from .retry_utils import retry_on_exception, PLAYWRIGHT_RETRY_CONFIG
from .cache import ContentCache
from .security import wait_for_rate_limit

logger = logging.getLogger(__name__)

# Global cache instance
_cache = ContentCache()


def load_page_with_retry(url: str, use_cache: bool = True) -> str:
    """Load page content with caching and retry logic for network errors."""
    logger.info(f"Loading page: {url}")

    # Check cache first if enabled
    if use_cache:
        # Check if content has changed using conditional requests
        change_check = _cache.check_if_content_changed(url)
        if change_check and change_check['status'] == 'not_modified':
            cached_content = _cache.get_cached_content(url)
            if cached_content:
                logger.info("Using cached content (not modified)")
                return cached_content

        # Check for valid cached content
        cached_content = _cache.get_cached_content(url)
        if cached_content:
            logger.info("Using cached content")
            return cached_content

    # Load fresh content
    html = _load_page_fresh(url)

    # Cache the content if caching is enabled
    if use_cache:
        _cache.cache_content(url, html)

    return html


@retry_on_exception((NetworkError, PageLoadError), PLAYWRIGHT_RETRY_CONFIG)
def _load_page_fresh(url: str) -> str:
    """Load fresh page content using Playwright."""
    logger.info(f"Loading fresh content from: {url}")

    # Apply rate limiting before making request
    wait_for_rate_limit()

    try:
        with sync_playwright() as p:
            browser = p.chromium.launch(headless=True)
            page = browser.new_page()

            # Set a longer timeout for loading the page
            page.set_default_navigation_timeout(Config.PAGE_TIMEOUT_MS)

            try:
                # Load the page
                page.goto(url, wait_until="networkidle")

                # Simulate scrolling to load more content
                logger.info(f"Scrolling page {Config.MAX_SCROLL_ITERATIONS} times to load content")
                for i in range(Config.MAX_SCROLL_ITERATIONS):
                    logger.debug(f"Scroll iteration {i + 1}/{Config.MAX_SCROLL_ITERATIONS}")
                    page.evaluate("window.scrollBy(0, document.body.scrollHeight)")
                    time.sleep(Config.SCROLL_DELAY_SECONDS)

                # Get the fully rendered HTML content
                html = page.content()

                # Check content size for security
                if len(html) > Config.MAX_CONTENT_SIZE:
                    error_msg = f"Content size {len(html)} exceeds maximum {Config.MAX_CONTENT_SIZE}"
                    logger.error(error_msg)
                    raise ContentSizeError(error_msg)

                logger.info(f"Page loaded successfully, content size: {len(html)} bytes")
                return html

            except Exception as e:
                logger.error(f"Failed to load page content: {e}")
                if "timeout" in str(e).lower() or "network" in str(e).lower():
                    raise NetworkError(f"Network error loading page: {e}")
                else:
                    raise PageLoadError(f"Page load error: {e}")
            finally:
                browser.close()

    except Exception as e:
        if isinstance(e, (NetworkError, PageLoadError, ContentSizeError)):
            raise
        logger.error(f"Unexpected error in Playwright: {e}")
        raise PageLoadError(f"Playwright error: {e}")


def clear_cache() -> None:
    """Clear the content cache."""
    _cache.clear_cache()


def get_cache_info() -> dict:
    """Get information about the cache."""
    return _cache.get_cache_info()
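Putting the pieces above together, a minimal driver for the scraping layer (roughly what main.py in this diff does, with an illustrative URL) might look like:

```python
from src.rss_scraper.scraper import clear_cache, get_cache_info, load_page_with_retry

url = "https://www.warhammer-community.com/en-gb/"

# First call hits the site via Playwright (rate-limited, retried on
# NetworkError/PageLoadError); repeat calls within 24 h reuse the cache.
html = load_page_with_retry(url, use_cache=True)
print(len(html), "bytes of HTML")
print(get_cache_info())

# Force a fresh scrape on the next run.
clear_cache()
```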
@ -1,236 +0,0 @@
|
||||
"""Security utilities for content sanitization and rate limiting."""
|
||||
|
||||
import time
|
||||
import logging
|
||||
import re
|
||||
from typing import Optional, Dict, Any
|
||||
from datetime import datetime, timedelta
|
||||
import bleach
|
||||
|
||||
from .config import Config
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class RateLimiter:
|
||||
"""Rate limiter to prevent excessive requests."""
|
||||
|
||||
def __init__(self, requests_per_minute: int = 30):
|
||||
self.requests_per_minute = requests_per_minute
|
||||
self.request_times: list = []
|
||||
self.min_delay_seconds = 60.0 / requests_per_minute
|
||||
self.last_request_time: Optional[float] = None
|
||||
|
||||
def wait_if_needed(self) -> None:
|
||||
"""Wait if necessary to respect rate limits."""
|
||||
current_time = time.time()
|
||||
|
||||
# Clean old request times (older than 1 minute)
|
||||
cutoff_time = current_time - 60
|
||||
self.request_times = [t for t in self.request_times if t > cutoff_time]
|
||||
|
||||
# Check if we've hit the rate limit
|
||||
if len(self.request_times) >= self.requests_per_minute:
|
||||
sleep_time = 60 - (current_time - self.request_times[0])
|
||||
if sleep_time > 0:
|
||||
logger.info(f"Rate limit reached, sleeping for {sleep_time:.2f} seconds")
|
||||
time.sleep(sleep_time)
|
||||
|
||||
# Ensure minimum delay between requests
|
||||
if self.last_request_time:
|
||||
time_since_last = current_time - self.last_request_time
|
||||
if time_since_last < self.min_delay_seconds:
|
||||
sleep_time = self.min_delay_seconds - time_since_last
|
||||
logger.debug(f"Enforcing minimum delay, sleeping for {sleep_time:.2f} seconds")
|
||||
time.sleep(sleep_time)
|
||||
|
||||
# Record this request
|
||||
self.request_times.append(time.time())
|
||||
self.last_request_time = time.time()
|
||||
|
||||
|
||||
class ContentSanitizer:
|
||||
"""Enhanced content sanitization for security."""
|
||||
|
||||
def __init__(self):
|
||||
# Allowed HTML tags for RSS content (including structural elements for parsing)
|
||||
self.allowed_tags = [
|
||||
'p', 'br', 'strong', 'em', 'b', 'i', 'u', 'span',
|
||||
'h1', 'h2', 'h3', 'h4', 'h5', 'h6',
|
||||
'ul', 'ol', 'li', 'blockquote',
|
||||
'div', 'article', 'section', 'header', 'footer', 'main', 'nav',
|
||||
'a', 'img', 'figure', 'figcaption', 'time'
|
||||
]
|
||||
|
||||
# Allowed attributes
|
||||
self.allowed_attributes = {
|
||||
'*': ['class', 'id'],
|
||||
'a': ['href', 'title', 'class'],
|
||||
'img': ['src', 'alt', 'title', 'width', 'height', 'class'],
|
||||
'time': ['datetime', 'class'],
|
||||
'div': ['class', 'id'],
|
||||
'article': ['class', 'id'],
|
||||
'section': ['class', 'id']
|
||||
}
|
||||
|
||||
# Protocols allowed in URLs
|
||||
self.allowed_protocols = ['http', 'https']
|
||||
|
||||
# Dangerous patterns to remove (pre-compiled for performance)
|
||||
self.dangerous_patterns = [
|
||||
re.compile(r'<script[^>]*>.*?</script>', re.IGNORECASE | re.DOTALL),
|
||||
re.compile(r'<iframe[^>]*>.*?</iframe>', re.IGNORECASE | re.DOTALL),
|
||||
re.compile(r'<object[^>]*>.*?</object>', re.IGNORECASE | re.DOTALL),
|
||||
re.compile(r'<embed[^>]*>.*?</embed>', re.IGNORECASE | re.DOTALL),
|
||||
re.compile(r'<applet[^>]*>.*?</applet>', re.IGNORECASE | re.DOTALL),
|
||||
re.compile(r'<form[^>]*>.*?</form>', re.IGNORECASE | re.DOTALL),
|
||||
re.compile(r'javascript:', re.IGNORECASE),
|
||||
re.compile(r'vbscript:', re.IGNORECASE),
|
||||
re.compile(r'data:', re.IGNORECASE),
|
||||
re.compile(r'on\w+\s*=', re.IGNORECASE), # event handlers like onclick, onload, etc.
|
||||
]
|
||||
|
||||
def sanitize_html(self, html_content: str) -> str:
|
||||
"""Sanitize HTML content using bleach library."""
|
||||
if not html_content:
|
||||
return ""
|
||||
|
||||
try:
|
||||
# First pass: remove obviously dangerous patterns
|
||||
cleaned = html_content
|
||||
for pattern in self.dangerous_patterns:
|
||||
cleaned = pattern.sub('', cleaned)
|
||||
|
||||
# Second pass: use bleach for comprehensive sanitization
|
||||
sanitized = bleach.clean(
|
||||
cleaned,
|
||||
tags=self.allowed_tags,
|
||||
attributes=self.allowed_attributes,
|
||||
protocols=self.allowed_protocols,
|
||||
strip=True,
|
||||
strip_comments=True
|
||||
)
|
||||
|
||||
return sanitized
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error sanitizing HTML: {e}")
|
||||
# If sanitization fails, return empty string for safety
|
||||
return ""
|
||||
|
||||
def sanitize_text(self, text: Optional[str]) -> str:
|
||||
"""Enhanced text sanitization with better security."""
|
||||
if not text:
|
||||
return "No title"
|
||||
|
||||
# Basic cleaning
|
||||
sanitized = text.strip()
|
||||
|
||||
# Remove null bytes and other control characters
|
||||
sanitized = re.sub(r'[\x00-\x08\x0B\x0C\x0E-\x1F\x7F]', '', sanitized)
|
||||
|
||||
# Remove dangerous patterns (case insensitive)
|
||||
for pattern in Config.DANGEROUS_PATTERNS:
|
||||
sanitized = re.sub(re.escape(pattern), '', sanitized, flags=re.IGNORECASE)
|
||||
|
||||
# Limit length
|
||||
sanitized = sanitized[:Config.MAX_TITLE_LENGTH]
|
||||
|
||||
# Remove excessive whitespace
|
||||
sanitized = re.sub(r'\s+', ' ', sanitized).strip()
|
||||
|
||||
return sanitized if sanitized else "No title"
|
||||
|
||||
def validate_url_security(self, url: str) -> bool:
|
||||
"""Enhanced URL validation for security."""
|
||||
if not url:
|
||||
return False
|
||||
|
||||
# Check for dangerous protocols
|
||||
dangerous_protocols = ['javascript:', 'vbscript:', 'data:', 'file:', 'ftp:']
|
||||
url_lower = url.lower()
|
||||
|
||||
for protocol in dangerous_protocols:
|
||||
if url_lower.startswith(protocol):
|
||||
logger.warning(f"Blocked dangerous protocol in URL: {url}")
|
||||
return False
|
||||
|
||||
# Check for suspicious patterns
|
||||
suspicious_patterns = [
|
||||
r'\.\./', # Path traversal
|
||||
r'%2e%2e%2f', # Encoded path traversal
|
||||
r'<script', # Script injection
|
||||
r'javascript:', # JavaScript protocol
|
||||
r'vbscript:', # VBScript protocol
|
||||
]
|
||||
|
||||
for pattern in suspicious_patterns:
|
||||
if re.search(pattern, url, re.IGNORECASE):
|
||||
logger.warning(f"Blocked suspicious pattern in URL: {url}")
|
||||
return False
|
||||
|
||||
# Check URL length (prevent buffer overflow attacks)
|
||||
if len(url) > 2048:
|
||||
logger.warning(f"Blocked excessively long URL (length: {len(url)})")
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
def sanitize_filename(self, filename: str) -> str:
|
||||
"""Sanitize filenames to prevent directory traversal and injection."""
|
||||
if not filename:
|
||||
return "default"
|
||||
|
||||
# Remove path separators and dangerous characters
|
||||
sanitized = re.sub(r'[<>:"|?*\\/]', '_', filename)
|
||||
|
||||
# Remove null bytes and control characters
|
||||
sanitized = re.sub(r'[\x00-\x1F\x7F]', '', sanitized)
|
||||
|
||||
# Remove leading/trailing dots and spaces
|
||||
sanitized = sanitized.strip('. ')
|
||||
|
||||
# Prevent reserved Windows filenames
|
||||
reserved_names = [
|
||||
'CON', 'PRN', 'AUX', 'NUL',
|
||||
'COM1', 'COM2', 'COM3', 'COM4', 'COM5', 'COM6', 'COM7', 'COM8', 'COM9',
|
||||
'LPT1', 'LPT2', 'LPT3', 'LPT4', 'LPT5', 'LPT6', 'LPT7', 'LPT8', 'LPT9'
|
||||
]
|
||||
|
||||
if sanitized.upper() in reserved_names:
|
||||
sanitized = f"file_{sanitized}"
|
||||
|
||||
# Limit length
|
||||
sanitized = sanitized[:255]
|
||||
|
||||
return sanitized if sanitized else "default"
|
||||
|
||||
|
||||
# Global instances
|
||||
_rate_limiter = RateLimiter(requests_per_minute=30)
|
||||
_sanitizer = ContentSanitizer()
|
||||
|
||||
|
||||
def wait_for_rate_limit() -> None:
|
||||
"""Apply rate limiting."""
|
||||
_rate_limiter.wait_if_needed()
|
||||
|
||||
|
||||
def sanitize_html_content(html: str) -> str:
|
||||
"""Sanitize HTML content."""
|
||||
return _sanitizer.sanitize_html(html)
|
||||
|
||||
|
||||
def sanitize_text_content(text: Optional[str]) -> str:
|
||||
"""Sanitize text content."""
|
||||
return _sanitizer.sanitize_text(text)
|
||||
|
||||
|
||||
def validate_url_security(url: str) -> bool:
|
||||
"""Validate URL for security."""
|
||||
return _sanitizer.validate_url_security(url)
|
||||
|
||||
|
||||
def sanitize_filename(filename: str) -> str:
|
||||
"""Sanitize filename."""
|
||||
return _sanitizer.sanitize_filename(filename)
|
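For orientation, here is a minimal usage sketch of how the module-level helpers above were apparently meant to fit together when processing a scraped article. It is not part of the deleted file: the import path is inferred from the `from .security import ...` and `from src.rss_scraper...` imports elsewhere in this diff, and the sample values are hypothetical.

```python
# Illustrative sketch only; assumes the module above lived at
# src/rss_scraper/security.py, as the package imports elsewhere in this diff suggest.
from src.rss_scraper.security import (
    sanitize_html_content,
    sanitize_text_content,
    validate_url_security,
    wait_for_rate_limit,
)

article_url = "https://www.warhammer-community.com/en-gb/sample-article"  # sample URL
raw_title = "Sample title <script>alert('x')</script>"                    # sample title
raw_summary = '<p onclick="run()">Preview <em>text</em></p>'              # sample HTML

wait_for_rate_limit()                    # block until another request is allowed
if validate_url_security(article_url):   # rejects javascript:, data:, traversal patterns, etc.
    title = sanitize_text_content(raw_title)      # strips dangerous patterns, trims, limits length
    summary = sanitize_html_content(raw_summary)  # drops event handlers, keeps allowed tags only
```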
@ -1,113 +0,0 @@
"""URL and path validation utilities."""

import os
import urllib.parse
import logging
from typing import Optional

from .config import Config
from .exceptions import ValidationError, FileOperationError
from .security import validate_url_security, sanitize_filename

logger = logging.getLogger(__name__)


def validate_url(url: str) -> bool:
    """Validate URL against whitelist of allowed domains"""
    try:
        logger.debug(f"Validating URL: {url}")

        # Enhanced security validation first
        if not validate_url_security(url):
            raise ValidationError(f"URL failed security validation: {url}")

        parsed = urllib.parse.urlparse(url)
        if not parsed.scheme or not parsed.netloc:
            raise ValidationError("Invalid URL format")

        # Check if domain is in allowed list
        domain = parsed.netloc.lower()
        allowed_domains = Config.get_allowed_domains()
        if domain not in allowed_domains:
            raise ValidationError(f"Domain {domain} not in allowed list: {allowed_domains}")

        logger.debug(f"URL validation successful for domain: {domain}")
        return True
    except ValidationError:
        raise
    except Exception as e:
        logger.error(f"URL validation failed for {url}: {e}")
        raise ValidationError(f"URL validation failed: {e}")


def validate_output_path(path: str, base_dir: str) -> str:
    """Validate and sanitize output file path"""
    logger.debug(f"Validating output path: {path} in base directory: {base_dir}")

    try:
        # Sanitize the filename component
        dir_part, filename = os.path.split(path)
        if filename:
            sanitized_filename = sanitize_filename(filename)
            path = os.path.join(dir_part, sanitized_filename)
            logger.debug(f"Sanitized filename: {filename} -> {sanitized_filename}")

        # Resolve to absolute path and check if it's safe
        abs_path = os.path.abspath(path)
        abs_base = os.path.abspath(base_dir)

        # Ensure path is within allowed directory
        if not abs_path.startswith(abs_base):
            error_msg = f"Output path {abs_path} is outside allowed directory {abs_base}"
            logger.error(error_msg)
            raise ValidationError(error_msg)

        # Additional security check for suspicious patterns - only check for directory traversal
        # Note: We allow absolute paths since they're resolved safely above
        if '..' in path:
            error_msg = f"Directory traversal detected in path: {path}"
            logger.error(error_msg)
            raise ValidationError(error_msg)

        # Ensure output directory exists
        os.makedirs(abs_base, exist_ok=True)
        logger.debug(f"Output path validated: {abs_path}")

        return abs_path
    except OSError as e:
        raise FileOperationError(f"Failed to create or access directory {base_dir}: {e}")
    except ValidationError:
        raise
    except Exception as e:
        raise FileOperationError(f"Unexpected error validating path: {e}")


def validate_link(link: Optional[str], base_url: str) -> Optional[str]:
    """Validate and sanitize article links"""
    if not link:
        return None

    try:
        # Handle relative URLs
        if link.startswith('/'):
            parsed_base = urllib.parse.urlparse(base_url)
            link = f"{parsed_base.scheme}://{parsed_base.netloc}{link}"

        # Enhanced security validation
        if not validate_url_security(link):
            logger.warning(f"Link failed security validation: {link}")
            return None

        # Validate the resulting URL
        parsed = urllib.parse.urlparse(link)
        if not parsed.scheme or not parsed.netloc:
            return None

        # Ensure it's from allowed domain
        domain = parsed.netloc.lower()
        if domain not in Config.get_allowed_domains():
            return None

        return link
    except Exception:
        return None
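As a quick illustration of how these three validators compose, the sketch below mirrors the flow a caller would plausibly follow before writing the feed. It is an editorial addition, not part of the deleted file; the import path, output directory, and filename are assumptions based on names that appear elsewhere in this diff.

```python
# Illustrative sketch only; assumes the module above lived at
# src/rss_scraper/validation.py and that its package is importable.
import os

from src.rss_scraper.validation import validate_link, validate_output_path, validate_url

base_url = "https://www.warhammer-community.com/en-gb/"  # sample start page
output_dir = "output"                                    # sample output directory

# Raises ValidationError unless the domain is whitelisted and passes security checks.
validate_url(base_url)

# Relative links are resolved against base_url; off-domain or unsafe links come back as None.
link = validate_link("/article/some-article", base_url)

# Returns an absolute path confined to output_dir (creating the directory if missing).
rss_path = validate_output_path(os.path.join(output_dir, "warhammer_rss_feed.xml"), output_dir)
```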
File diff suppressed because one or more lines are too long
@ -1 +0,0 @@
# Tests package
@ -1,116 +0,0 @@
"""Tests for configuration module."""

import pytest
import os
from unittest.mock import patch

from src.rss_scraper.config import Config


class TestConfig:
    """Test configuration functionality."""

    def test_default_values(self):
        """Test that default configuration values are set correctly."""
        assert Config.MAX_SCROLL_ITERATIONS == 5
        assert Config.MAX_CONTENT_SIZE == 10 * 1024 * 1024
        assert Config.MAX_TITLE_LENGTH == 500
        assert Config.SCROLL_DELAY_SECONDS == 2.0
        assert Config.PAGE_TIMEOUT_MS == 120000
        assert Config.DEFAULT_URL == 'https://www.warhammer-community.com/en-gb/'
        assert Config.DEFAULT_OUTPUT_DIR == '.'
        assert Config.RSS_FILENAME == 'warhammer_rss_feed.xml'
        assert Config.DEBUG_HTML_FILENAME == 'page.html'
        assert Config.FEED_TITLE == 'Warhammer Community RSS Feed'
        assert Config.FEED_DESCRIPTION == 'Latest Warhammer Community Articles'

    def test_environment_variable_override(self):
        """Test that environment variables override default values."""
        with patch.dict(os.environ, {
            'MAX_SCROLL_ITERATIONS': '10',
            'MAX_CONTENT_SIZE': '20971520',  # 20MB
            'SCROLL_DELAY_SECONDS': '1.5',
            'DEFAULT_URL': 'https://example.com',
            'RSS_FILENAME': 'custom_feed.xml'
        }):
            # Need to reload the config module (from its real package path) to pick up
            # environment changes
            import importlib
            from src.rss_scraper import config
            importlib.reload(config)

            assert config.Config.MAX_SCROLL_ITERATIONS == 10
            assert config.Config.MAX_CONTENT_SIZE == 20971520
            assert config.Config.SCROLL_DELAY_SECONDS == 1.5
            assert config.Config.DEFAULT_URL == 'https://example.com'
            assert config.Config.RSS_FILENAME == 'custom_feed.xml'

    def test_get_output_dir_with_override(self):
        """Test get_output_dir method with override."""
        result = Config.get_output_dir('/custom/path')
        assert result == '/custom/path'

    def test_get_output_dir_without_override(self):
        """Test get_output_dir method without override."""
        result = Config.get_output_dir()
        assert result == Config.DEFAULT_OUTPUT_DIR

    def test_get_allowed_domains_default(self):
        """Test get_allowed_domains returns default domains."""
        domains = Config.get_allowed_domains()
        assert 'warhammer-community.com' in domains
        assert 'www.warhammer-community.com' in domains

    def test_get_allowed_domains_from_env(self):
        """Test get_allowed_domains reads from environment variable."""
        with patch.dict(os.environ, {
            'ALLOWED_DOMAINS': 'example.com,test.com,another.com'
        }):
            domains = Config.get_allowed_domains()
            assert domains == ['example.com', 'test.com', 'another.com']

    def test_validate_config_success(self):
        """Test that valid configuration passes validation."""
        # Should not raise any exception
        Config.validate_config()

    def test_validate_config_negative_scroll_iterations(self):
        """Test validation fails for negative scroll iterations."""
        with patch.object(Config, 'MAX_SCROLL_ITERATIONS', -1):
            with pytest.raises(ValueError, match="MAX_SCROLL_ITERATIONS must be non-negative"):
                Config.validate_config()

    def test_validate_config_zero_content_size(self):
        """Test validation fails for zero content size."""
        with patch.object(Config, 'MAX_CONTENT_SIZE', 0):
            with pytest.raises(ValueError, match="MAX_CONTENT_SIZE must be positive"):
                Config.validate_config()

    def test_validate_config_zero_title_length(self):
        """Test validation fails for zero title length."""
        with patch.object(Config, 'MAX_TITLE_LENGTH', 0):
            with pytest.raises(ValueError, match="MAX_TITLE_LENGTH must be positive"):
                Config.validate_config()

    def test_validate_config_negative_scroll_delay(self):
        """Test validation fails for negative scroll delay."""
        with patch.object(Config, 'SCROLL_DELAY_SECONDS', -1.0):
            with pytest.raises(ValueError, match="SCROLL_DELAY_SECONDS must be non-negative"):
                Config.validate_config()

    def test_validate_config_zero_timeout(self):
        """Test validation fails for zero timeout."""
        with patch.object(Config, 'PAGE_TIMEOUT_MS', 0):
            with pytest.raises(ValueError, match="PAGE_TIMEOUT_MS must be positive"):
                Config.validate_config()

    def test_validate_config_invalid_url(self):
        """Test validation fails for invalid default URL."""
        with patch.object(Config, 'DEFAULT_URL', 'not-a-url'):
            with pytest.raises(ValueError, match="DEFAULT_URL must be a valid HTTP/HTTPS URL"):
                Config.validate_config()

    def test_validate_config_empty_domains(self):
        """Test validation fails for empty allowed domains."""
        with patch.object(Config, 'get_allowed_domains', return_value=[]):
            with pytest.raises(ValueError, match="ALLOWED_DOMAINS cannot be empty"):
                Config.validate_config()
@ -1,202 +0,0 @@
"""Tests for main module functionality."""

import pytest
import sys
import tempfile
from unittest.mock import patch, MagicMock
from argparse import Namespace

from main import parse_arguments, setup_logging, scrape_and_generate_rss
from src.rss_scraper.exceptions import ValidationError, NetworkError, ParseError


class TestParseArguments:
    """Test command line argument parsing."""

    def test_parse_arguments_defaults(self):
        """Test parsing with default arguments."""
        with patch('sys.argv', ['main.py']):
            args = parse_arguments()

            assert args.url == 'https://www.warhammer-community.com/en-gb/'
            assert args.output_dir is None
            assert args.max_scroll == 5
            assert args.log_level == 'INFO'
            assert args.log_file == 'scraper.log'

    def test_parse_arguments_custom_values(self):
        """Test parsing with custom argument values."""
        test_args = [
            'main.py',
            '--url', 'https://example.com',
            '--output-dir', '/custom/path',
            '--max-scroll', '10',
            '--log-level', 'DEBUG',
            '--log-file', 'custom.log'
        ]

        with patch('sys.argv', test_args):
            args = parse_arguments()

            assert args.url == 'https://example.com'
            assert args.output_dir == '/custom/path'
            assert args.max_scroll == 10
            assert args.log_level == 'DEBUG'
            assert args.log_file == 'custom.log'

    def test_parse_arguments_invalid_max_scroll(self):
        """Test parsing fails with invalid max_scroll value."""
        test_args = ['main.py', '--max-scroll', '-1']

        with patch('sys.argv', test_args):
            with pytest.raises(SystemExit):
                parse_arguments()

    def test_parse_arguments_relative_output_dir(self):
        """Test that relative output directory is converted to absolute."""
        test_args = ['main.py', '--output-dir', 'relative/path']

        with patch('sys.argv', test_args):
            args = parse_arguments()

            assert args.output_dir.startswith('/')  # Should be absolute path
            assert args.output_dir.endswith('relative/path')


class TestSetupLogging:
    """Test logging setup functionality."""

    def test_setup_logging_info_level(self):
        """Test logging setup with INFO level."""
        with tempfile.NamedTemporaryFile(delete=False) as temp_file:
            setup_logging('INFO', temp_file.name)

            import logging
            logger = logging.getLogger('test')
            logger.info("Test message")
            logger.debug("Debug message")  # Should not appear

            # Check that the log file was created and has correct level
            assert logging.getLogger().level == logging.INFO

    def test_setup_logging_debug_level(self):
        """Test logging setup with DEBUG level."""
        with tempfile.NamedTemporaryFile(delete=False) as temp_file:
            setup_logging('DEBUG', temp_file.name)

            import logging
            assert logging.getLogger().level == logging.DEBUG

    def test_setup_logging_clears_existing_handlers(self):
        """Test that setup_logging clears existing handlers."""
        import logging

        # Add a dummy handler
        dummy_handler = logging.StreamHandler()
        logging.getLogger().addHandler(dummy_handler)
        initial_handler_count = len(logging.getLogger().handlers)

        with tempfile.NamedTemporaryFile(delete=False) as temp_file:
            setup_logging('INFO', temp_file.name)

            # Should have exactly 2 handlers (console + file)
            assert len(logging.getLogger().handlers) == 2


class TestScrapeAndGenerateRss:
    """Test main scraping function."""

    @patch('main.save_debug_html')
    @patch('main.save_rss_feed')
    @patch('main.generate_rss_feed')
    @patch('main.extract_articles_from_html')
    @patch('main.load_page_with_retry')
    @patch('main.validate_url')
    def test_scrape_and_generate_rss_success(
        self, mock_validate_url, mock_load_page, mock_extract_articles,
        mock_generate_rss, mock_save_rss, mock_save_html
    ):
        """Test successful RSS scraping and generation."""
        # Setup mocks
        mock_validate_url.return_value = True
        mock_load_page.return_value = "<html>test</html>"
        mock_extract_articles.return_value = [
            {'title': 'Test', 'link': 'http://example.com', 'date': 'date'}
        ]
        mock_generate_rss.return_value = b"<rss>feed</rss>"
        mock_save_rss.return_value = "/path/to/feed.xml"

        url = "https://www.warhammer-community.com/en-gb/"
        output_dir = "/test/output"

        # Should not raise any exception
        scrape_and_generate_rss(url, output_dir)

        # Verify all functions were called
        mock_validate_url.assert_called_once_with(url)
        mock_load_page.assert_called_once_with(url)
        mock_extract_articles.assert_called_once_with("<html>test</html>", url)
        mock_generate_rss.assert_called_once()
        mock_save_rss.assert_called_once()
        mock_save_html.assert_called_once()

    @patch('main.validate_url')
    def test_scrape_and_generate_rss_validation_error(self, mock_validate_url):
        """Test scraping fails with validation error."""
        mock_validate_url.side_effect = ValidationError("Invalid URL")

        with pytest.raises(ValidationError):
            scrape_and_generate_rss("invalid-url")

    @patch('main.load_page_with_retry')
    @patch('main.validate_url')
    def test_scrape_and_generate_rss_network_error(
        self, mock_validate_url, mock_load_page
    ):
        """Test scraping fails with network error."""
        mock_validate_url.return_value = True
        mock_load_page.side_effect = NetworkError("Network error")

        with pytest.raises(NetworkError):
            scrape_and_generate_rss("https://www.warhammer-community.com/en-gb/")

    @patch('main.extract_articles_from_html')
    @patch('main.load_page_with_retry')
    @patch('main.validate_url')
    def test_scrape_and_generate_rss_parse_error(
        self, mock_validate_url, mock_load_page, mock_extract_articles
    ):
        """Test scraping fails with parse error."""
        mock_validate_url.return_value = True
        mock_load_page.return_value = "<html>test</html>"
        mock_extract_articles.side_effect = ParseError("Parse error")

        with pytest.raises(ParseError):
            scrape_and_generate_rss("https://www.warhammer-community.com/en-gb/")

    @patch('main.save_debug_html')
    @patch('main.save_rss_feed')
    @patch('main.generate_rss_feed')
    @patch('main.extract_articles_from_html')
    @patch('main.load_page_with_retry')
    @patch('main.validate_url')
    def test_scrape_and_generate_rss_default_output_dir(
        self, mock_validate_url, mock_load_page, mock_extract_articles,
        mock_generate_rss, mock_save_rss, mock_save_html
    ):
        """Test scraping uses default output directory when none provided."""
        # Setup mocks
        mock_validate_url.return_value = True
        mock_load_page.return_value = "<html>test</html>"
        mock_extract_articles.return_value = []
        mock_generate_rss.return_value = b"<rss>feed</rss>"
        mock_save_rss.return_value = "/path/to/feed.xml"

        url = "https://www.warhammer-community.com/en-gb/"

        # Call without output_dir
        scrape_and_generate_rss(url)

        # Verify functions were called (output_dir would be set to default)
        mock_validate_url.assert_called_once_with(url)
        mock_save_rss.assert_called_once_with(b"<rss>feed</rss>", ".")  # Default output dir
@ -1,208 +0,0 @@
"""Tests for parser module."""

import pytest
from datetime import datetime
import pytz
from unittest.mock import patch

from src.rss_scraper.parser import sanitize_text, extract_articles_from_html
from src.rss_scraper.exceptions import ParseError
from src.rss_scraper.config import Config


class TestSanitizeText:
    """Test text sanitization functionality."""

    def test_sanitize_normal_text(self):
        """Test sanitization of normal text."""
        text = "Normal article title"
        result = sanitize_text(text)
        assert result == "Normal article title"

    def test_sanitize_none_text(self):
        """Test sanitization of None text."""
        result = sanitize_text(None)
        assert result == "No title"

    def test_sanitize_empty_text(self):
        """Test sanitization of empty text."""
        result = sanitize_text("")
        assert result == "No title"

    def test_sanitize_whitespace_text(self):
        """Test sanitization of whitespace-only text."""
        result = sanitize_text(" ")
        assert result == "No title"

    def test_remove_dangerous_patterns(self):
        """Test removal of dangerous patterns."""
        dangerous_text = "Title with <script>alert('xss')</script> content"
        result = sanitize_text(dangerous_text)
        assert "<script" not in result
        assert "</script" not in result
        assert "alert('xss')" in result  # Only script tags should be removed

    def test_length_limit(self):
        """Test that text is limited to max length."""
        long_text = "a" * 1000
        result = sanitize_text(long_text)
        assert len(result) <= Config.MAX_TITLE_LENGTH

    def test_case_insensitive_pattern_removal(self):
        """Test that dangerous patterns are removed case-insensitively."""
        text = "Title with <SCRIPT>alert('xss')</SCRIPT> and javascript: protocol"
        result = sanitize_text(text)
        assert "<SCRIPT" not in result
        assert "</SCRIPT" not in result
        assert "javascript:" not in result


class TestExtractArticlesFromHtml:
    """Test article extraction from HTML."""

    def test_extract_articles_valid_html(self):
        """Test extraction from valid HTML with articles."""
        html = """
        <html>
            <body>
                <article>
                    <h3 class="newsCard-title-sm">Test Article 1</h3>
                    <a href="/article/test-1">Read more</a>
                    <time>01 Jan 24</time>
                </article>
                <article>
                    <h3 class="newsCard-title-lg">Test Article 2</h3>
                    <a href="https://www.warhammer-community.com/article/test-2">Read more</a>
                    <time>02 Jan 24</time>
                </article>
            </body>
        </html>
        """

        base_url = "https://www.warhammer-community.com"
        articles = extract_articles_from_html(html, base_url)

        assert len(articles) == 2
        assert articles[0]['title'] == "Test Article 2"  # Sorted by date, newest first
        assert articles[1]['title'] == "Test Article 1"
        assert "warhammer-community.com" in articles[0]['link']
        assert "warhammer-community.com" in articles[1]['link']

    def test_extract_articles_no_articles(self):
        """Test extraction from HTML with no articles."""
        html = """
        <html>
            <body>
                <div>No articles here</div>
            </body>
        </html>
        """

        base_url = "https://www.warhammer-community.com"
        articles = extract_articles_from_html(html, base_url)

        assert len(articles) == 0

    def test_extract_articles_duplicate_links(self):
        """Test that duplicate links are filtered out."""
        html = """
        <html>
            <body>
                <article>
                    <h3 class="newsCard-title-sm">Test Article 1</h3>
                    <a href="/article/test-1">Read more</a>
                    <time>01 Jan 24</time>
                </article>
                <article>
                    <h3 class="newsCard-title-lg">Test Article 1 Duplicate</h3>
                    <a href="/article/test-1">Read more</a>
                    <time>02 Jan 24</time>
                </article>
            </body>
        </html>
        """

        base_url = "https://www.warhammer-community.com"
        articles = extract_articles_from_html(html, base_url)

        assert len(articles) == 1  # Duplicate should be filtered out
        assert articles[0]['title'] == "Test Article 1"

    def test_extract_articles_invalid_links(self):
        """Test handling of articles with invalid links."""
        html = """
        <html>
            <body>
                <article>
                    <h3 class="newsCard-title-sm">Valid Article</h3>
                    <a href="/article/valid">Read more</a>
                    <time>01 Jan 24</time>
                </article>
                <article>
                    <h3 class="newsCard-title-lg">Invalid Article</h3>
                    <a href="https://malicious-site.com/article">Read more</a>
                    <time>02 Jan 24</time>
                </article>
                <article>
                    <h3 class="newsCard-title-sm">No Link Article</h3>
                    <time>03 Jan 24</time>
                </article>
            </body>
        </html>
        """

        base_url = "https://www.warhammer-community.com"
        articles = extract_articles_from_html(html, base_url)

        assert len(articles) == 1  # Only valid article should be included
        assert articles[0]['title'] == "Valid Article"

    def test_extract_articles_date_parsing(self):
        """Test parsing of various date formats."""
        html = """
        <html>
            <body>
                <article>
                    <h3 class="newsCard-title-sm">Article with good date</h3>
                    <a href="/article/1">Read more</a>
                    <time>15 Mar 24</time>
                </article>
                <article>
                    <h3 class="newsCard-title-lg">Article with bad date</h3>
                    <a href="/article/2">Read more</a>
                    <time>Invalid Date Format</time>
                </article>
                <article>
                    <h3 class="newsCard-title-sm">Article with reading time</h3>
                    <a href="/article/3">Read more</a>
                    <time>5 min read</time>
                    <time>20 Mar 24</time>
                </article>
            </body>
        </html>
        """

        base_url = "https://www.warhammer-community.com"
        articles = extract_articles_from_html(html, base_url)

        assert len(articles) == 3

        # Check that dates are parsed correctly
        for article in articles:
            assert isinstance(article['date'], datetime)
            assert article['date'].tzinfo is not None

    def test_extract_articles_malformed_html(self):
        """Test handling of malformed HTML."""
        malformed_html = "<html><body><article><h3>Unclosed tags"

        base_url = "https://www.warhammer-community.com"
        # Should not raise ParseError - BeautifulSoup handles malformed HTML gracefully
        articles = extract_articles_from_html(malformed_html, base_url)
        assert isinstance(articles, list)

    def test_extract_articles_invalid_html(self):
        """Test handling of completely invalid HTML."""
        with patch('bs4.BeautifulSoup', side_effect=Exception("Parser error")):
            with pytest.raises(ParseError):
                extract_articles_from_html("<html></html>", "https://example.com")
@ -1,162 +0,0 @@
"""Tests for RSS generator module."""

import pytest
import os
import tempfile
from datetime import datetime
import pytz
from unittest.mock import patch, mock_open

from src.rss_scraper.rss_generator import generate_rss_feed, save_rss_feed, save_debug_html
from src.rss_scraper.exceptions import FileOperationError


class TestGenerateRssFeed:
    """Test RSS feed generation functionality."""

    def test_generate_rss_feed_with_articles(self):
        """Test RSS generation with valid articles."""
        timezone = pytz.UTC
        articles = [
            {
                'title': 'Test Article 1',
                'link': 'https://example.com/article1',
                'date': datetime(2024, 1, 1, tzinfo=timezone)
            },
            {
                'title': 'Test Article 2',
                'link': 'https://example.com/article2',
                'date': datetime(2024, 1, 2, tzinfo=timezone)
            }
        ]

        feed_url = "https://example.com"
        rss_content = generate_rss_feed(articles, feed_url)

        assert isinstance(rss_content, bytes)
        rss_str = rss_content.decode('utf-8')
        assert 'Test Article 1' in rss_str
        assert 'Test Article 2' in rss_str
        assert 'https://example.com/article1' in rss_str
        assert 'https://example.com/article2' in rss_str
        assert '<?xml version=' in rss_str
        assert '<rss version=' in rss_str

    def test_generate_rss_feed_empty_articles(self):
        """Test RSS generation with empty articles list."""
        articles = []
        feed_url = "https://example.com"

        rss_content = generate_rss_feed(articles, feed_url)

        assert isinstance(rss_content, bytes)
        rss_str = rss_content.decode('utf-8')
        assert '<?xml version=' in rss_str
        assert '<rss version=' in rss_str
        # Should still contain feed metadata
        assert 'Warhammer Community RSS Feed' in rss_str

    def test_generate_rss_feed_unicode_content(self):
        """Test RSS generation with unicode content."""
        timezone = pytz.UTC
        articles = [
            {
                'title': 'Tëst Artìclé with Ūnïcödë',
                'link': 'https://example.com/unicode',
                'date': datetime(2024, 1, 1, tzinfo=timezone)
            }
        ]

        feed_url = "https://example.com"
        rss_content = generate_rss_feed(articles, feed_url)

        assert isinstance(rss_content, bytes)
        rss_str = rss_content.decode('utf-8')
        assert 'Tëst Artìclé with Ūnïcödë' in rss_str


class TestSaveRssFeed:
    """Test RSS feed saving functionality."""

    def test_save_rss_feed_success(self):
        """Test successful RSS feed saving."""
        rss_content = b'<?xml version="1.0"?><rss>test</rss>'

        with tempfile.TemporaryDirectory() as temp_dir:
            result_path = save_rss_feed(rss_content, temp_dir)

            assert os.path.exists(result_path)
            assert result_path.endswith('warhammer_rss_feed.xml')

            with open(result_path, 'rb') as f:
                saved_content = f.read()
            assert saved_content == rss_content

    def test_save_rss_feed_permission_error(self):
        """Test RSS feed saving with permission error."""
        rss_content = b'<?xml version="1.0"?><rss>test</rss>'

        with patch('builtins.open', side_effect=PermissionError("Permission denied")):
            with pytest.raises(FileOperationError):
                save_rss_feed(rss_content, "/some/path")

    def test_save_rss_feed_creates_directory(self):
        """Test that RSS feed saving creates directory if needed."""
        rss_content = b'<?xml version="1.0"?><rss>test</rss>'

        with tempfile.TemporaryDirectory() as temp_dir:
            new_subdir = os.path.join(temp_dir, "new_subdir")
            result_path = save_rss_feed(rss_content, new_subdir)

            assert os.path.exists(new_subdir)
            assert os.path.exists(result_path)


class TestSaveDebugHtml:
    """Test debug HTML saving functionality."""

    def test_save_debug_html_success(self):
        """Test successful debug HTML saving."""
        html_content = "<html><body>Test content</body></html>"

        with tempfile.TemporaryDirectory() as temp_dir:
            save_debug_html(html_content, temp_dir)

            html_path = os.path.join(temp_dir, "page.html")
            assert os.path.exists(html_path)

            with open(html_path, 'r', encoding='utf-8') as f:
                saved_content = f.read()
            # BeautifulSoup prettifies the content
            assert "Test content" in saved_content

    def test_save_debug_html_permission_error(self):
        """Test debug HTML saving with permission error (should not raise)."""
        html_content = "<html><body>Test content</body></html>"

        with patch('builtins.open', side_effect=PermissionError("Permission denied")):
            # Should not raise exception, just log warning
            save_debug_html(html_content, "/some/path")

    def test_save_debug_html_malformed_content(self):
        """Test debug HTML saving with malformed HTML content."""
        malformed_html = "<html><body>Unclosed tags"

        with tempfile.TemporaryDirectory() as temp_dir:
            # Should handle malformed HTML gracefully
            save_debug_html(malformed_html, temp_dir)

            html_path = os.path.join(temp_dir, "page.html")
            assert os.path.exists(html_path)

    def test_save_debug_html_creates_directory(self):
        """Test that debug HTML saving creates directory if needed."""
        html_content = "<html><body>Test content</body></html>"

        with tempfile.TemporaryDirectory() as temp_dir:
            new_subdir = os.path.join(temp_dir, "new_subdir")
            save_debug_html(html_content, new_subdir)

            assert os.path.exists(new_subdir)
            html_path = os.path.join(new_subdir, "page.html")
            assert os.path.exists(html_path)
@ -1,170 +0,0 @@
"""Tests for validation module."""

import pytest
import os
import tempfile
from unittest.mock import patch

from src.rss_scraper.validation import validate_url, validate_output_path, validate_link
from src.rss_scraper.exceptions import ValidationError, FileOperationError
from src.rss_scraper.config import Config


class TestValidateUrl:
    """Test URL validation functionality."""

    def test_valid_url(self):
        """Test validation of valid URLs."""
        valid_urls = [
            "https://www.warhammer-community.com/en-gb/",
            "https://warhammer-community.com/some/path",
        ]

        for url in valid_urls:
            assert validate_url(url) is True

    def test_invalid_url_format(self):
        """Test validation fails for invalid URL formats."""
        invalid_urls = [
            "not-a-url",
            "ftp://example.com",
            "",
            "http://",
            "https://",
        ]

        for url in invalid_urls:
            with pytest.raises(ValidationError):
                validate_url(url)

    def test_disallowed_domain(self):
        """Test validation fails for disallowed domains."""
        disallowed_urls = [
            "https://malicious-site.com",
            "https://example.com",
            "https://google.com",
        ]

        for url in disallowed_urls:
            with pytest.raises(ValidationError):
                validate_url(url)

    def test_case_insensitive_domain(self):
        """Test domain validation is case insensitive."""
        urls = [
            "https://WWW.WARHAMMER-COMMUNITY.COM",
            "https://Warhammer-Community.com",
        ]

        for url in urls:
            assert validate_url(url) is True


class TestValidateOutputPath:
    """Test output path validation functionality."""

    def test_valid_path_within_base(self):
        """Test validation of valid paths within base directory."""
        with tempfile.TemporaryDirectory() as temp_dir:
            test_path = os.path.join(temp_dir, "output.xml")
            result = validate_output_path(test_path, temp_dir)
            assert result == os.path.abspath(test_path)

    def test_path_outside_base_directory(self):
        """Test validation fails for paths outside base directory."""
        with tempfile.TemporaryDirectory() as temp_dir:
            outside_path = "/tmp/malicious.xml"
            with pytest.raises(ValidationError):
                validate_output_path(outside_path, temp_dir)

    def test_absolute_path_within_base_directory(self):
        """Test that absolute paths within base directory are allowed."""
        with tempfile.TemporaryDirectory() as temp_dir:
            # This should work - absolute path within the base directory
            abs_path = os.path.join(temp_dir, "output.xml")
            result = validate_output_path(abs_path, temp_dir)
            assert result == os.path.abspath(abs_path)

    def test_creates_directory_if_not_exists(self):
        """Test that validation creates directory if it doesn't exist."""
        with tempfile.TemporaryDirectory() as temp_dir:
            new_subdir = os.path.join(temp_dir, "new_subdir")
            test_path = os.path.join(new_subdir, "output.xml")

            result = validate_output_path(test_path, new_subdir)

            assert os.path.exists(new_subdir)
            assert result == os.path.abspath(test_path)

    def test_directory_traversal_protection(self):
        """Test that directory traversal attacks are blocked."""
        with tempfile.TemporaryDirectory() as temp_dir:
            # These should be blocked - either by directory traversal check or outside-base check
            traversal_paths = [
                "../../../etc/passwd",
                "subdir/../../../etc/passwd",
                "normal/../../../dangerous.xml"
            ]

            for path in traversal_paths:
                with pytest.raises(ValidationError):  # Either error type is acceptable
                    validate_output_path(path, temp_dir)

    def test_permission_error(self):
        """Test handling of permission errors."""
        with patch('os.makedirs', side_effect=PermissionError("Permission denied")):
            with pytest.raises(FileOperationError):
                validate_output_path("/some/path/file.xml", "/some/path")


class TestValidateLink:
    """Test link validation functionality."""

    def test_valid_absolute_link(self):
        """Test validation of valid absolute links."""
        base_url = "https://www.warhammer-community.com"
        valid_link = "https://www.warhammer-community.com/article"

        result = validate_link(valid_link, base_url)
        assert result == valid_link

    def test_valid_relative_link(self):
        """Test validation of valid relative links."""
        base_url = "https://www.warhammer-community.com/en-gb/"
        relative_link = "/article/some-article"

        result = validate_link(relative_link, base_url)
        assert result == "https://www.warhammer-community.com/article/some-article"

    def test_none_link(self):
        """Test handling of None link."""
        base_url = "https://www.warhammer-community.com"
        result = validate_link(None, base_url)
        assert result is None

    def test_empty_link(self):
        """Test handling of empty link."""
        base_url = "https://www.warhammer-community.com"
        result = validate_link("", base_url)
        assert result is None

    def test_invalid_domain_link(self):
        """Test rejection of links from invalid domains."""
        base_url = "https://www.warhammer-community.com"
        invalid_link = "https://malicious-site.com/article"

        result = validate_link(invalid_link, base_url)
        assert result is None

    def test_malformed_link(self):
        """Test handling of malformed links."""
        base_url = "https://www.warhammer-community.com"
        malformed_links = [
            "not-a-url",
            "://missing-scheme",
            "https://",
        ]

        for link in malformed_links:
            result = validate_link(link, base_url)
            assert result is None
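A note on running the deleted test suite: the imports above (`from main import ...`, `from src.rss_scraper...`) suggest the tests were meant to be invoked from the repository root. A minimal programmatic runner might look like the sketch below; the `tests/` directory name is an assumption inferred from the `# Tests package` marker above.

```python
# Hypothetical runner sketch; assumes pytest is installed and the deleted test
# modules lived under tests/ at the repository root.
import sys

import pytest

if __name__ == "__main__":
    # pytest.main returns an exit code; -q keeps the output terse.
    sys.exit(pytest.main(["-q", "tests"]))
```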