@ivangrynenko/python-ssrf
Detect and prevent Server-Side Request Forgery (SSRF) vulnerabilities in Python applications as defined in OWASP Top 10:2021-A10 glob
prpm install @ivangrynenko/python-ssrf0 total downloads
📄 Full Prompt Content
---
description: Detect and prevent Server-Side Request Forgery (SSRF) vulnerabilities in Python applications as defined in OWASP Top 10:2021-A10
globs: *.py
alwaysApply: false
---
# Python Server-Side Request Forgery (SSRF) Standards (OWASP A10:2021)
This rule enforces security best practices to prevent Server-Side Request Forgery (SSRF) vulnerabilities in Python applications, as defined in OWASP Top 10:2021-A10.
<rule>
name: python_ssrf
description: Detect and prevent Server-Side Request Forgery (SSRF) vulnerabilities in Python applications as defined in OWASP Top 10:2021-A10
filters:
- type: file_extension
pattern: "\\.py$"
- type: file_path
pattern: ".*"
actions:
- type: enforce
conditions:
# Pattern 1: Detect direct use of requests library with user input
- pattern: "requests\\.(get|post|put|delete|head|options|patch)\\([^)]*?\\b(request\\.\\w+|params\\[\\'[^\\']+\\'\\]|data\\[\\'[^\\']+\\'\\]|json\\[\\'[^\\']+\\'\\]|args\\.get|form\\.get)"
message: "Potential SSRF vulnerability detected. User-controlled input is being used directly in HTTP requests. Implement URL validation and allowlisting."
# Pattern 2: Detect urllib usage with user input
- pattern: "urllib\\.(request|parse)\\.\\w+\\([^)]*?\\b(request\\.\\w+|params\\[\\'[^\\']+\\'\\]|data\\[\\'[^\\']+\\'\\]|json\\[\\'[^\\']+\\'\\]|args\\.get|form\\.get)"
message: "Potential SSRF vulnerability detected. User-controlled input is being used directly in urllib functions. Implement URL validation and allowlisting."
# Pattern 3: Detect http.client usage with user input
- pattern: "http\\.client\\.\\w+\\([^)]*?\\b(request\\.\\w+|params\\[\\'[^\\']+\\'\\]|data\\[\\'[^\\']+\\'\\]|json\\[\\'[^\\']+\\'\\]|args\\.get|form\\.get)"
message: "Potential SSRF vulnerability detected. User-controlled input is being used directly in http.client functions. Implement URL validation and allowlisting."
# Pattern 4: Detect aiohttp usage with user input
- pattern: "aiohttp\\.\\w+\\([^)]*?\\b(request\\.\\w+|params\\[\\'[^\\']+\\'\\]|data\\[\\'[^\\']+\\'\\]|json\\[\\'[^\\']+\\'\\]|args\\.get|form\\.get)"
message: "Potential SSRF vulnerability detected. User-controlled input is being used directly in aiohttp functions. Implement URL validation and allowlisting."
# Pattern 5: Detect httpx usage with user input
- pattern: "httpx\\.\\w+\\([^)]*?\\b(request\\.\\w+|params\\[\\'[^\\']+\\'\\]|data\\[\\'[^\\']+\\'\\]|json\\[\\'[^\\']+\\'\\]|args\\.get|form\\.get)"
message: "Potential SSRF vulnerability detected. User-controlled input is being used directly in httpx functions. Implement URL validation and allowlisting."
# Pattern 6: Detect pycurl usage with user input
- pattern: "pycurl\\.\\w+\\([^)]*?\\b(request\\.\\w+|params\\[\\'[^\\']+\\'\\]|data\\[\\'[^\\']+\\'\\]|json\\[\\'[^\\']+\\'\\]|args\\.get|form\\.get)"
message: "Potential SSRF vulnerability detected. User-controlled input is being used directly in pycurl functions. Implement URL validation and allowlisting."
# Pattern 7: Detect subprocess calls with user input that might lead to SSRF
- pattern: "subprocess\\.(Popen|call|run|check_output|check_call)\\([^)]*?\\b(request\\.\\w+|params\\[\\'[^\\']+\\'\\]|data\\[\\'[^\\']+\\'\\]|json\\[\\'[^\\']+\\'\\]|args\\.get|form\\.get)"
message: "Potential SSRF vulnerability detected. User-controlled input is being used in subprocess calls, which might lead to SSRF. Validate and sanitize input."
# Pattern 8: Detect os.system calls with user input that might lead to SSRF
- pattern: "os\\.(system|popen|spawn)\\([^)]*?\\b(request\\.\\w+|params\\[\\'[^\\']+\\'\\]|data\\[\\'[^\\']+\\'\\]|json\\[\\'[^\\']+\\'\\]|args\\.get|form\\.get)"
message: "Potential SSRF vulnerability detected. User-controlled input is being used in OS commands, which might lead to SSRF. Validate and sanitize input."
# Pattern 9: Detect URL construction with user input
- pattern: "(f|r)[\"\']https?://[^\"\']*?\\{[^\\}]*?\\b(request\\.\\w+|params\\[\\'[^\\']+\\'\\]|data\\[\\'[^\\']+\\'\\]|json\\[\\'[^\\']+\\'\\]|args\\.get|form\\.get)"
message: "Potential SSRF vulnerability detected. User-controlled input is being used in URL construction. Implement URL validation and allowlisting."
# Pattern 10: Detect URL joining with user input
- pattern: "urljoin\\([^,]+,[^)]*?\\b(request\\.\\w+|params\\[\\'[^\\']+\\'\\]|data\\[\\'[^\\']+\\'\\]|json\\[\\'[^\\']+\\'\\]|args\\.get|form\\.get)"
message: "Potential SSRF vulnerability detected. User-controlled input is being used in URL joining. Implement URL validation and allowlisting."
# Pattern 11: Detect file opening with user input (potential local SSRF)
- pattern: "open\\([^,]*?\\b(request\\.\\w+|params\\[\\'[^\\']+\\'\\]|data\\[\\'[^\\']+\\'\\]|json\\[\\'[^\\']+\\'\\]|args\\.get|form\\.get)"
message: "Potential local SSRF vulnerability detected. User-controlled input is being used in file operations. Validate file paths and use path sanitization."
# Pattern 12: Detect XML/YAML parsing with user input (potential XXE leading to SSRF)
- pattern: "(ET\\.fromstring|ET\\.parse|ET\\.XML|minidom\\.parse|parseString|yaml\\.load)\\([^)]*?\\b(request\\.\\w+|params\\[\\'[^\\']+\\'\\]|data\\[\\'[^\\']+\\'\\]|json\\[\\'[^\\']+\\'\\]|args\\.get|form\\.get)"
message: "Potential XXE vulnerability that could lead to SSRF detected. User-controlled input is being used in XML/YAML parsing. Use safe parsing methods and disable external entities."
# Pattern 13: Detect socket connections with user input
- pattern: "socket\\.(socket|create_connection)\\([^)]*?\\b(request\\.\\w+|params\\[\\'[^\\']+\\'\\]|data\\[\\'[^\\']+\\'\\]|json\\[\\'[^\\']+\\'\\]|args\\.get|form\\.get)"
message: "Potential SSRF vulnerability detected. User-controlled input is being used in socket connections. Implement host/port validation and allowlisting."
# Pattern 14: Detect FTP connections with user input
- pattern: "ftplib\\.FTP\\([^)]*?\\b(request\\.\\w+|params\\[\\'[^\\']+\\'\\]|data\\[\\'[^\\']+\\'\\]|json\\[\\'[^\\']+\\'\\]|args\\.get|form\\.get)"
message: "Potential SSRF vulnerability detected. User-controlled input is being used in FTP connections. Implement host validation and allowlisting."
# Pattern 15: Detect missing URL validation before making requests
- pattern: "def\\s+\\w+\\([^)]*?\\):[^\\n]*?\\n(?:[^\\n]*?\\n)*?[^\\n]*?requests\\.(get|post|put|delete|head|options|patch)\\([^)]*?url\\s*=\\s*[^\\n]*?(?!.*?validate_url)"
message: "Missing URL validation before making HTTP requests. Implement URL validation with allowlisting to prevent SSRF attacks."
- type: suggest
message: |
**Python Server-Side Request Forgery (SSRF) Prevention Best Practices:**
1. **URL Validation and Allowlisting:**
- Implement strict URL validation
- Use allowlists for domains, IP ranges, and protocols
- Example implementation:
```python
import re
import socket
import ipaddress
from urllib.parse import urlparse
def is_valid_url(url, allowed_domains=None, allowed_protocols=None, block_private_ips=True):
"""
Validate URLs against allowlists and block private IPs.
Args:
url (str): The URL to validate
allowed_domains (list): List of allowed domains
allowed_protocols (list): List of allowed protocols
block_private_ips (bool): Whether to block private IPs
Returns:
bool: True if URL is valid according to rules
"""
if not url:
return False
# Default allowlists if none provided
if allowed_domains is None:
allowed_domains = ["example.com", "api.example.com"]
if allowed_protocols is None:
allowed_protocols = ["https"]
try:
# Parse URL
parsed_url = urlparse(url)
# Check protocol
if parsed_url.scheme not in allowed_protocols:
return False
# Check domain against allowlist
if parsed_url.netloc not in allowed_domains:
return False
# Block private IPs if enabled
if block_private_ips:
hostname = parsed_url.netloc.split(':')[0]
try:
ip_addresses = socket.getaddrinfo(
hostname, None, socket.AF_INET, socket.SOCK_STREAM
)
for family, socktype, proto, canonname, sockaddr in ip_addresses:
ip = sockaddr[0]
ip_obj = ipaddress.ip_address(ip)
if ip_obj.is_private or ip_obj.is_loopback or ip_obj.is_reserved:
return False
except socket.gaierror:
# DNS resolution failed
return False
return True
except Exception:
return False
# Usage example
def fetch_resource(resource_url):
if not is_valid_url(resource_url):
raise ValueError("Invalid or disallowed URL")
# Proceed with request
import requests
return requests.get(resource_url)
```
2. **Implement Network-Level Controls:**
- Use network-level allowlists
- Configure firewalls to block outbound requests to internal resources
- Example with proxy configuration:
```python
import requests
def safe_request(url):
# Configure proxy that implements URL filtering
proxies = {
'http': 'http://ssrf-protecting-proxy:8080',
'https': 'http://ssrf-protecting-proxy:8080'
}
# Set timeout to prevent long-running requests
timeout = 10
try:
return requests.get(url, proxies=proxies, timeout=timeout)
except requests.exceptions.RequestException as e:
# Log the error and handle gracefully
logging.error(f"Request failed: {e}")
return None
```
3. **Use Safe Libraries and Wrappers:**
- Create wrapper functions for HTTP requests
- Implement consistent security controls
- Example wrapper:
```python
import requests
from urllib.parse import urlparse
class SafeRequestHandler:
def __init__(self, allowed_domains=None, allowed_protocols=None):
self.allowed_domains = allowed_domains or ["api.example.com"]
self.allowed_protocols = allowed_protocols or ["https"]
def validate_url(self, url):
parsed_url = urlparse(url)
# Validate protocol
if parsed_url.scheme not in self.allowed_protocols:
return False
# Validate domain
if parsed_url.netloc not in self.allowed_domains:
return False
return True
def request(self, method, url, **kwargs):
if not self.validate_url(url):
raise ValueError(f"URL validation failed for: {url}")
# Set sensible defaults
kwargs.setdefault('timeout', 10)
# Make the request
return requests.request(method, url, **kwargs)
def get(self, url, **kwargs):
return self.request('GET', url, **kwargs)
def post(self, url, **kwargs):
return self.request('POST', url, **kwargs)
# Usage
safe_requests = SafeRequestHandler()
response = safe_requests.get('https://api.example.com/data')
```
4. **Disable Redirects or Implement Redirect Validation:**
- Disable automatic redirects
- Validate each redirect location
- Example:
```python
import requests
def safe_request_with_redirect_validation(url, allowed_domains):
# Disable automatic redirects
session = requests.Session()
response = session.get(url, allow_redirects=False)
# Handle redirects manually with validation
redirect_count = 0
max_redirects = 5
while 300 <= response.status_code < 400 and redirect_count < max_redirects:
redirect_url = response.headers.get('Location')
# Validate redirect URL
parsed_url = urlparse(redirect_url)
if parsed_url.netloc not in allowed_domains:
raise ValueError(f"Redirect to disallowed domain: {parsed_url.netloc}")
# Follow the redirect with validation
redirect_count += 1
response = session.get(redirect_url, allow_redirects=False)
return response
```
5. **Use Metadata Instead of Direct URLs:**
- Use resource identifiers instead of URLs
- Resolve identifiers server-side
- Example:
```python
def fetch_resource_by_id(resource_id):
# Map of allowed resources
resource_map = {
"user_profile": "https://api.example.com/profiles/",
"product_data": "https://api.example.com/products/",
"weather_info": "https://api.weather.com/forecast/"
}
# Check if resource_id is in allowed list
if resource_id not in resource_map:
raise ValueError(f"Unknown resource ID: {resource_id}")
# Construct URL from safe base + ID
base_url = resource_map[resource_id]
return requests.get(base_url)
```
6. **Implement Response Handling Controls:**
- Sanitize and validate responses
- Prevent response data from being used in further requests
- Example:
```python
def safe_request_with_response_validation(url):
response = requests.get(url)
# Check response size
if len(response.content) > MAX_RESPONSE_SIZE:
raise ValueError("Response too large")
# Validate content type
content_type = response.headers.get('Content-Type', '')
if not content_type.startswith('application/json'):
raise ValueError(f"Unexpected content type: {content_type}")
# Parse and validate JSON structure
try:
data = response.json()
# Validate expected structure
if 'result' not in data:
raise ValueError("Invalid response structure")
return data
except ValueError:
raise ValueError("Invalid JSON response")
```
7. **Use Timeouts and Circuit Breakers:**
- Set appropriate timeouts
- Implement circuit breakers for failing services
- Example:
```python
import requests
from requests.exceptions import Timeout, ConnectionError
def request_with_circuit_breaker(url, max_retries=3, timeout=5):
retries = 0
while retries < max_retries:
try:
return requests.get(url, timeout=timeout)
except (Timeout, ConnectionError) as e:
retries += 1
if retries >= max_retries:
# Circuit is now open
raise ValueError(f"Circuit breaker open for {url}: {str(e)}")
# Exponential backoff
time.sleep(2 ** retries)
```
8. **Implement Proper Logging and Monitoring:**
- Log all outbound requests
- Monitor for unusual patterns
- Example:
```python
import logging
import requests
def logged_request(url, **kwargs):
# Log the outbound request
logging.info(f"Outbound request to: {url}")
try:
response = requests.get(url, **kwargs)
# Log the response
logging.info(f"Response from {url}: status={response.status_code}")
return response
except Exception as e:
# Log the error
logging.error(f"Request to {url} failed: {str(e)}")
raise
```
9. **Use DNS Resolution Controls:**
- Implement DNS resolution controls
- Block internal DNS names
- Example:
```python
import socket
import ipaddress
def is_safe_host(hostname):
try:
# Resolve hostname to IP
ip_addresses = socket.getaddrinfo(
hostname, None, socket.AF_INET, socket.SOCK_STREAM
)
for family, socktype, proto, canonname, sockaddr in ip_addresses:
ip = sockaddr[0]
ip_obj = ipaddress.ip_address(ip)
# Check if IP is private/internal
if (ip_obj.is_private or ip_obj.is_loopback or
ip_obj.is_link_local or ip_obj.is_reserved):
return False
return True
except (socket.gaierror, ValueError):
return False
def safe_request_with_dns_check(url):
parsed_url = urlparse(url)
hostname = parsed_url.netloc.split(':')[0]
if not is_safe_host(hostname):
raise ValueError(f"Hostname resolves to unsafe IP: {hostname}")
return requests.get(url)
```
10. **Implement Defense in Depth:**
- Combine multiple protection mechanisms
- Don't rely on a single control
- Example comprehensive approach:
```python
class SSRFProtectedClient:
def __init__(self):
self.allowed_domains = ["api.example.com", "cdn.example.com"]
self.allowed_protocols = ["https"]
self.max_redirects = 3
self.timeout = 10
def is_safe_url(self, url):
# URL validation
parsed_url = urlparse(url)
# Protocol check
if parsed_url.scheme not in self.allowed_protocols:
return False
# Domain check
if parsed_url.netloc not in self.allowed_domains:
return False
# DNS resolution check
hostname = parsed_url.netloc.split(':')[0]
try:
ip_addresses = socket.getaddrinfo(
hostname, None, socket.AF_INET, socket.SOCK_STREAM
)
for family, socktype, proto, canonname, sockaddr in ip_addresses:
ip = sockaddr[0]
ip_obj = ipaddress.ip_address(ip)
if ip_obj.is_private or ip_obj.is_loopback or ip_obj.is_reserved:
return False
except socket.gaierror:
return False
return True
def request(self, method, url, **kwargs):
# Validate URL
if not self.is_safe_url(url):
raise ValueError(f"URL failed security validation: {url}")
# Set sensible defaults
kwargs.setdefault('timeout', self.timeout)
kwargs.setdefault('allow_redirects', False)
# Make initial request
session = requests.Session()
response = session.request(method, url, **kwargs)
# Handle redirects manually with validation
redirect_count = 0
while 300 <= response.status_code < 400 and redirect_count < self.max_redirects:
redirect_url = response.headers.get('Location')
# Validate redirect URL
if not self.is_safe_url(redirect_url):
raise ValueError(f"Redirect URL failed security validation: {redirect_url}")
# Follow the redirect with validation
redirect_count += 1
response = session.request(method, redirect_url, **kwargs)
# Log the request
logging.info(f"{method} request to {url} completed with status {response.status_code}")
return response
def get(self, url, **kwargs):
return self.request('GET', url, **kwargs)
def post(self, url, **kwargs):
return self.request('POST', url, **kwargs)
# Usage
client = SSRFProtectedClient()
response = client.get('https://api.example.com/data')
```
- type: validate
conditions:
# Check 1: URL validation implementation
- pattern: "def\\s+is_valid_url|def\\s+validate_url"
message: "URL validation function is implemented."
# Check 2: Allowlist implementation
- pattern: "allowed_domains|allowed_urls|ALLOWED_HOSTS|whitelist"
message: "URL allowlisting is implemented."
# Check 3: Safe request wrapper
- pattern: "class\\s+\\w+Request|def\\s+safe_request"
message: "Safe request wrapper is implemented."
# Check 4: IP address validation
- pattern: "ipaddress\\.ip_address|is_private|is_loopback|is_reserved"
message: "IP address validation is implemented to prevent access to internal resources."
metadata:
priority: high
version: 1.0
tags:
- security
- python
- ssrf
- owasp
- language:python
- framework:django
- framework:flask
- framework:fastapi
- category:security
- subcategory:ssrf
- standard:owasp-top10
- risk:a10-server-side-request-forgery
references:
- "https://owasp.org/Top10/A10_2021-Server-Side_Request_Forgery_%28SSRF%29/"
- "https://cheatsheetseries.owasp.org/cheatsheets/Server_Side_Request_Forgery_Prevention_Cheat_Sheet.html"
- "https://portswigger.net/web-security/ssrf"
- "https://docs.python.org/3/library/urllib.request.html"
- "https://docs.python-requests.org/en/latest/user/advanced/#ssl-cert-verification"
- "https://docs.python.org/3/library/ipaddress.html"
</rule>💡 Suggested Test Inputs
Loading suggested inputs...
🎯 Community Test Results
Loading results...
📦 Package Info
- Format
- cursor
- Type
- rule
- Category
- languages
- License
- MIT