Files
Gondulf/tests/unit/test_rate_limiter.py
Phil Skentelbery 074f74002c feat(phase-2): implement domain verification system
Implements complete domain verification flow with:
- rel=me link verification service
- HTML fetching with security controls
- Rate limiting to prevent abuse
- Email validation utilities
- Authorization and verification API endpoints
- User-facing templates for authorization and verification flows

This completes Phase 2: Domain Verification as designed.

Tests:
- All Phase 2 unit tests passing
- Coverage: 85% overall
- Migration tests updated

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-20 13:44:33 -07:00

172 lines
6.6 KiB
Python

"""Tests for rate limiter service."""
import pytest
import time
from unittest.mock import patch
from gondulf.services.rate_limiter import RateLimiter
class TestRateLimiter:
"""Tests for RateLimiter."""
def test_init_default_params(self):
"""Test initialization with default parameters."""
limiter = RateLimiter()
assert limiter.max_attempts == 3
assert limiter.window_seconds == 3600
def test_init_custom_params(self):
"""Test initialization with custom parameters."""
limiter = RateLimiter(max_attempts=5, window_hours=2)
assert limiter.max_attempts == 5
assert limiter.window_seconds == 7200
def test_check_rate_limit_no_attempts(self):
"""Test rate limit check with no previous attempts."""
limiter = RateLimiter()
assert limiter.check_rate_limit("example.com") is True
def test_check_rate_limit_within_limit(self):
"""Test rate limit check within limit."""
limiter = RateLimiter(max_attempts=3)
limiter.record_attempt("example.com")
limiter.record_attempt("example.com")
assert limiter.check_rate_limit("example.com") is True
def test_check_rate_limit_at_limit(self):
"""Test rate limit check at exact limit."""
limiter = RateLimiter(max_attempts=3)
limiter.record_attempt("example.com")
limiter.record_attempt("example.com")
limiter.record_attempt("example.com")
assert limiter.check_rate_limit("example.com") is False
def test_check_rate_limit_exceeded(self):
"""Test rate limit check when exceeded."""
limiter = RateLimiter(max_attempts=2)
limiter.record_attempt("example.com")
limiter.record_attempt("example.com")
assert limiter.check_rate_limit("example.com") is False
def test_record_attempt_creates_entry(self):
"""Test that record_attempt creates new entry."""
limiter = RateLimiter()
limiter.record_attempt("example.com")
assert "example.com" in limiter._attempts
assert len(limiter._attempts["example.com"]) == 1
def test_record_attempt_appends_to_existing(self):
"""Test that record_attempt appends to existing entry."""
limiter = RateLimiter()
limiter.record_attempt("example.com")
limiter.record_attempt("example.com")
assert len(limiter._attempts["example.com"]) == 2
def test_clean_old_attempts_removes_expired(self):
"""Test that old attempts are cleaned up."""
limiter = RateLimiter(max_attempts=3, window_hours=1)
# Mock time to control timestamps
with patch('time.time', return_value=1000):
limiter.record_attempt("example.com")
# Move time forward past window
with patch('time.time', return_value=1000 + 3700): # 1 hour + 100 seconds
limiter._clean_old_attempts("example.com")
assert "example.com" not in limiter._attempts
def test_clean_old_attempts_preserves_recent(self):
"""Test that recent attempts are preserved."""
limiter = RateLimiter(max_attempts=3, window_hours=1)
with patch('time.time', return_value=1000):
limiter.record_attempt("example.com")
# Move time forward but still within window
with patch('time.time', return_value=1000 + 1800): # 30 minutes
limiter._clean_old_attempts("example.com")
assert "example.com" in limiter._attempts
assert len(limiter._attempts["example.com"]) == 1
def test_check_rate_limit_cleans_old_attempts(self):
"""Test that check_rate_limit cleans old attempts."""
limiter = RateLimiter(max_attempts=2, window_hours=1)
# Record attempts at time 1000
with patch('time.time', return_value=1000):
limiter.record_attempt("example.com")
limiter.record_attempt("example.com")
# Check limit should be False
with patch('time.time', return_value=1000):
assert limiter.check_rate_limit("example.com") is False
# Move time forward past window
with patch('time.time', return_value=1000 + 3700):
# Old attempts should be cleaned, limit should pass
assert limiter.check_rate_limit("example.com") is True
def test_different_domains_independent(self):
"""Test that different domains have independent limits."""
limiter = RateLimiter(max_attempts=2)
limiter.record_attempt("example.com")
limiter.record_attempt("example.com")
limiter.record_attempt("other.com")
assert limiter.check_rate_limit("example.com") is False
assert limiter.check_rate_limit("other.com") is True
def test_get_remaining_attempts_initial(self):
"""Test getting remaining attempts initially."""
limiter = RateLimiter(max_attempts=3)
assert limiter.get_remaining_attempts("example.com") == 3
def test_get_remaining_attempts_after_one(self):
"""Test getting remaining attempts after one attempt."""
limiter = RateLimiter(max_attempts=3)
limiter.record_attempt("example.com")
assert limiter.get_remaining_attempts("example.com") == 2
def test_get_remaining_attempts_exhausted(self):
"""Test getting remaining attempts when exhausted."""
limiter = RateLimiter(max_attempts=3)
limiter.record_attempt("example.com")
limiter.record_attempt("example.com")
limiter.record_attempt("example.com")
assert limiter.get_remaining_attempts("example.com") == 0
def test_get_reset_time_no_attempts(self):
"""Test getting reset time with no attempts."""
limiter = RateLimiter()
assert limiter.get_reset_time("example.com") == 0
def test_get_reset_time_with_attempts(self):
"""Test getting reset time with attempts."""
limiter = RateLimiter(window_hours=1)
with patch('time.time', return_value=1000):
limiter.record_attempt("example.com")
reset_time = limiter.get_reset_time("example.com")
assert reset_time == 1000 + 3600
def test_get_reset_time_multiple_attempts(self):
"""Test getting reset time with multiple attempts (returns oldest)."""
limiter = RateLimiter(window_hours=1)
with patch('time.time', return_value=1000):
limiter.record_attempt("example.com")
with patch('time.time', return_value=2000):
limiter.record_attempt("example.com")
# Reset time should be based on oldest attempt
reset_time = limiter.get_reset_time("example.com")
assert reset_time == 1000 + 3600