Home / Packages / @ivangrynenko/python-ssrf

@ivangrynenko/python-ssrf

Detect and prevent Server-Side Request Forgery (SSRF) vulnerabilities in Python applications as defined in OWASP Top 10:2021-A10 glob

prpm install @ivangrynenko/python-ssrf
0 total downloads

📄 Full Prompt Content

---
description: Detect and prevent Server-Side Request Forgery (SSRF) vulnerabilities in Python applications as defined in OWASP Top 10:2021-A10
globs: *.py
alwaysApply: false
---
 # Python Server-Side Request Forgery (SSRF) Standards (OWASP A10:2021)

This rule enforces security best practices to prevent Server-Side Request Forgery (SSRF) vulnerabilities in Python applications, as defined in OWASP Top 10:2021-A10.

<rule>
name: python_ssrf
description: Detect and prevent Server-Side Request Forgery (SSRF) vulnerabilities in Python applications as defined in OWASP Top 10:2021-A10
filters:
  - type: file_extension
    pattern: "\\.py$"
  - type: file_path
    pattern: ".*"

actions:
  - type: enforce
    conditions:
      # Pattern 1: Detect direct use of requests library with user input
      - pattern: "requests\\.(get|post|put|delete|head|options|patch)\\([^)]*?\\b(request\\.\\w+|params\\[\\'[^\\']+\\'\\]|data\\[\\'[^\\']+\\'\\]|json\\[\\'[^\\']+\\'\\]|args\\.get|form\\.get)"
        message: "Potential SSRF vulnerability detected. User-controlled input is being used directly in HTTP requests. Implement URL validation and allowlisting."
        
      # Pattern 2: Detect urllib usage with user input
      - pattern: "urllib\\.(request|parse)\\.\\w+\\([^)]*?\\b(request\\.\\w+|params\\[\\'[^\\']+\\'\\]|data\\[\\'[^\\']+\\'\\]|json\\[\\'[^\\']+\\'\\]|args\\.get|form\\.get)"
        message: "Potential SSRF vulnerability detected. User-controlled input is being used directly in urllib functions. Implement URL validation and allowlisting."
        
      # Pattern 3: Detect http.client usage with user input
      - pattern: "http\\.client\\.\\w+\\([^)]*?\\b(request\\.\\w+|params\\[\\'[^\\']+\\'\\]|data\\[\\'[^\\']+\\'\\]|json\\[\\'[^\\']+\\'\\]|args\\.get|form\\.get)"
        message: "Potential SSRF vulnerability detected. User-controlled input is being used directly in http.client functions. Implement URL validation and allowlisting."
        
      # Pattern 4: Detect aiohttp usage with user input
      - pattern: "aiohttp\\.\\w+\\([^)]*?\\b(request\\.\\w+|params\\[\\'[^\\']+\\'\\]|data\\[\\'[^\\']+\\'\\]|json\\[\\'[^\\']+\\'\\]|args\\.get|form\\.get)"
        message: "Potential SSRF vulnerability detected. User-controlled input is being used directly in aiohttp functions. Implement URL validation and allowlisting."
        
      # Pattern 5: Detect httpx usage with user input
      - pattern: "httpx\\.\\w+\\([^)]*?\\b(request\\.\\w+|params\\[\\'[^\\']+\\'\\]|data\\[\\'[^\\']+\\'\\]|json\\[\\'[^\\']+\\'\\]|args\\.get|form\\.get)"
        message: "Potential SSRF vulnerability detected. User-controlled input is being used directly in httpx functions. Implement URL validation and allowlisting."
        
      # Pattern 6: Detect pycurl usage with user input
      - pattern: "pycurl\\.\\w+\\([^)]*?\\b(request\\.\\w+|params\\[\\'[^\\']+\\'\\]|data\\[\\'[^\\']+\\'\\]|json\\[\\'[^\\']+\\'\\]|args\\.get|form\\.get)"
        message: "Potential SSRF vulnerability detected. User-controlled input is being used directly in pycurl functions. Implement URL validation and allowlisting."
        
      # Pattern 7: Detect subprocess calls with user input that might lead to SSRF
      - pattern: "subprocess\\.(Popen|call|run|check_output|check_call)\\([^)]*?\\b(request\\.\\w+|params\\[\\'[^\\']+\\'\\]|data\\[\\'[^\\']+\\'\\]|json\\[\\'[^\\']+\\'\\]|args\\.get|form\\.get)"
        message: "Potential SSRF vulnerability detected. User-controlled input is being used in subprocess calls, which might lead to SSRF. Validate and sanitize input."
        
      # Pattern 8: Detect os.system calls with user input that might lead to SSRF
      - pattern: "os\\.(system|popen|spawn)\\([^)]*?\\b(request\\.\\w+|params\\[\\'[^\\']+\\'\\]|data\\[\\'[^\\']+\\'\\]|json\\[\\'[^\\']+\\'\\]|args\\.get|form\\.get)"
        message: "Potential SSRF vulnerability detected. User-controlled input is being used in OS commands, which might lead to SSRF. Validate and sanitize input."
        
      # Pattern 9: Detect URL construction with user input
      - pattern: "(f|r)[\"\']https?://[^\"\']*?\\{[^\\}]*?\\b(request\\.\\w+|params\\[\\'[^\\']+\\'\\]|data\\[\\'[^\\']+\\'\\]|json\\[\\'[^\\']+\\'\\]|args\\.get|form\\.get)"
        message: "Potential SSRF vulnerability detected. User-controlled input is being used in URL construction. Implement URL validation and allowlisting."
        
      # Pattern 10: Detect URL joining with user input
      - pattern: "urljoin\\([^,]+,[^)]*?\\b(request\\.\\w+|params\\[\\'[^\\']+\\'\\]|data\\[\\'[^\\']+\\'\\]|json\\[\\'[^\\']+\\'\\]|args\\.get|form\\.get)"
        message: "Potential SSRF vulnerability detected. User-controlled input is being used in URL joining. Implement URL validation and allowlisting."
        
      # Pattern 11: Detect file opening with user input (potential local SSRF)
      - pattern: "open\\([^,]*?\\b(request\\.\\w+|params\\[\\'[^\\']+\\'\\]|data\\[\\'[^\\']+\\'\\]|json\\[\\'[^\\']+\\'\\]|args\\.get|form\\.get)"
        message: "Potential local SSRF vulnerability detected. User-controlled input is being used in file operations. Validate file paths and use path sanitization."
        
      # Pattern 12: Detect XML/YAML parsing with user input (potential XXE leading to SSRF)
      - pattern: "(ET\\.fromstring|ET\\.parse|ET\\.XML|minidom\\.parse|parseString|yaml\\.load)\\([^)]*?\\b(request\\.\\w+|params\\[\\'[^\\']+\\'\\]|data\\[\\'[^\\']+\\'\\]|json\\[\\'[^\\']+\\'\\]|args\\.get|form\\.get)"
        message: "Potential XXE vulnerability that could lead to SSRF detected. User-controlled input is being used in XML/YAML parsing. Use safe parsing methods and disable external entities."
        
      # Pattern 13: Detect socket connections with user input
      - pattern: "socket\\.(socket|create_connection)\\([^)]*?\\b(request\\.\\w+|params\\[\\'[^\\']+\\'\\]|data\\[\\'[^\\']+\\'\\]|json\\[\\'[^\\']+\\'\\]|args\\.get|form\\.get)"
        message: "Potential SSRF vulnerability detected. User-controlled input is being used in socket connections. Implement host/port validation and allowlisting."
        
      # Pattern 14: Detect FTP connections with user input
      - pattern: "ftplib\\.FTP\\([^)]*?\\b(request\\.\\w+|params\\[\\'[^\\']+\\'\\]|data\\[\\'[^\\']+\\'\\]|json\\[\\'[^\\']+\\'\\]|args\\.get|form\\.get)"
        message: "Potential SSRF vulnerability detected. User-controlled input is being used in FTP connections. Implement host validation and allowlisting."
        
      # Pattern 15: Detect missing URL validation before making requests
      - pattern: "def\\s+\\w+\\([^)]*?\\):[^\\n]*?\\n(?:[^\\n]*?\\n)*?[^\\n]*?requests\\.(get|post|put|delete|head|options|patch)\\([^)]*?url\\s*=\\s*[^\\n]*?(?!.*?validate_url)"
        message: "Missing URL validation before making HTTP requests. Implement URL validation with allowlisting to prevent SSRF attacks."

  - type: suggest
    message: |
      **Python Server-Side Request Forgery (SSRF) Prevention Best Practices:**
      
      1. **URL Validation and Allowlisting:**
         - Implement strict URL validation
         - Use allowlists for domains, IP ranges, and protocols
         - Example implementation:
           ```python
           import re
           import socket
           import ipaddress
           from urllib.parse import urlparse
           
           def is_valid_url(url, allowed_domains=None, allowed_protocols=None, block_private_ips=True):
               """
               Validate URLs against allowlists and block private IPs.
               
               Args:
                   url (str): The URL to validate
                   allowed_domains (list): List of allowed domains
                   allowed_protocols (list): List of allowed protocols
                   block_private_ips (bool): Whether to block private IPs
                   
               Returns:
                   bool: True if URL is valid according to rules
               """
               if not url:
                   return False
                   
               # Default allowlists if none provided
               if allowed_domains is None:
                   allowed_domains = ["example.com", "api.example.com"]
               if allowed_protocols is None:
                   allowed_protocols = ["https"]
                   
               try:
                   # Parse URL
                   parsed_url = urlparse(url)
                   
                   # Check protocol
                   if parsed_url.scheme not in allowed_protocols:
                       return False
                       
                   # Check domain against allowlist
                   if parsed_url.netloc not in allowed_domains:
                       return False
                       
                   # Block private IPs if enabled
                   if block_private_ips:
                       hostname = parsed_url.netloc.split(':')[0]
                       try:
                           ip_addresses = socket.getaddrinfo(
                               hostname, None, socket.AF_INET, socket.SOCK_STREAM
                           )
                           for family, socktype, proto, canonname, sockaddr in ip_addresses:
                               ip = sockaddr[0]
                               ip_obj = ipaddress.ip_address(ip)
                               if ip_obj.is_private or ip_obj.is_loopback or ip_obj.is_reserved:
                                   return False
                       except socket.gaierror:
                           # DNS resolution failed
                           return False
                           
                   return True
               except Exception:
                   return False
           
           # Usage example
           def fetch_resource(resource_url):
               if not is_valid_url(resource_url):
                   raise ValueError("Invalid or disallowed URL")
                   
               # Proceed with request
               import requests
               return requests.get(resource_url)
           ```
      
      2. **Implement Network-Level Controls:**
         - Use network-level allowlists
         - Configure firewalls to block outbound requests to internal resources
         - Example with proxy configuration:
           ```python
           import requests
           
           def safe_request(url):
               # Configure proxy that implements URL filtering
               proxies = {
                   'http': 'http://ssrf-protecting-proxy:8080',
                   'https': 'http://ssrf-protecting-proxy:8080'
               }
               
               # Set timeout to prevent long-running requests
               timeout = 10
               
               try:
                   return requests.get(url, proxies=proxies, timeout=timeout)
               except requests.exceptions.RequestException as e:
                   # Log the error and handle gracefully
                   logging.error(f"Request failed: {e}")
                   return None
           ```
      
      3. **Use Safe Libraries and Wrappers:**
         - Create wrapper functions for HTTP requests
         - Implement consistent security controls
         - Example wrapper:
           ```python
           import requests
           from urllib.parse import urlparse
           
           class SafeRequestHandler:
               def __init__(self, allowed_domains=None, allowed_protocols=None):
                   self.allowed_domains = allowed_domains or ["api.example.com"]
                   self.allowed_protocols = allowed_protocols or ["https"]
                   
               def validate_url(self, url):
                   parsed_url = urlparse(url)
                   
                   # Validate protocol
                   if parsed_url.scheme not in self.allowed_protocols:
                       return False
                       
                   # Validate domain
                   if parsed_url.netloc not in self.allowed_domains:
                       return False
                       
                   return True
                   
               def request(self, method, url, **kwargs):
                   if not self.validate_url(url):
                       raise ValueError(f"URL validation failed for: {url}")
                       
                   # Set sensible defaults
                   kwargs.setdefault('timeout', 10)
                   
                   # Make the request
                   return requests.request(method, url, **kwargs)
                   
               def get(self, url, **kwargs):
                   return self.request('GET', url, **kwargs)
                   
               def post(self, url, **kwargs):
                   return self.request('POST', url, **kwargs)
           
           # Usage
           safe_requests = SafeRequestHandler()
           response = safe_requests.get('https://api.example.com/data')
           ```
      
      4. **Disable Redirects or Implement Redirect Validation:**
         - Disable automatic redirects
         - Validate each redirect location
         - Example:
           ```python
           import requests
           
           def safe_request_with_redirect_validation(url, allowed_domains):
               # Disable automatic redirects
               session = requests.Session()
               response = session.get(url, allow_redirects=False)
               
               # Handle redirects manually with validation
               redirect_count = 0
               max_redirects = 5
               
               while 300 <= response.status_code < 400 and redirect_count < max_redirects:
                   redirect_url = response.headers.get('Location')
                   
                   # Validate redirect URL
                   parsed_url = urlparse(redirect_url)
                   if parsed_url.netloc not in allowed_domains:
                       raise ValueError(f"Redirect to disallowed domain: {parsed_url.netloc}")
                       
                   # Follow the redirect with validation
                   redirect_count += 1
                   response = session.get(redirect_url, allow_redirects=False)
                   
               return response
           ```
      
      5. **Use Metadata Instead of Direct URLs:**
         - Use resource identifiers instead of URLs
         - Resolve identifiers server-side
         - Example:
           ```python
           def fetch_resource_by_id(resource_id):
               # Map of allowed resources
               resource_map = {
                   "user_profile": "https://api.example.com/profiles/",
                   "product_data": "https://api.example.com/products/",
                   "weather_info": "https://api.weather.com/forecast/"
               }
               
               # Check if resource_id is in allowed list
               if resource_id not in resource_map:
                   raise ValueError(f"Unknown resource ID: {resource_id}")
                   
               # Construct URL from safe base + ID
               base_url = resource_map[resource_id]
               return requests.get(base_url)
           ```
      
      6. **Implement Response Handling Controls:**
         - Sanitize and validate responses
         - Prevent response data from being used in further requests
         - Example:
           ```python
           def safe_request_with_response_validation(url):
               response = requests.get(url)
               
               # Check response size
               if len(response.content) > MAX_RESPONSE_SIZE:
                   raise ValueError("Response too large")
                   
               # Validate content type
               content_type = response.headers.get('Content-Type', '')
               if not content_type.startswith('application/json'):
                   raise ValueError(f"Unexpected content type: {content_type}")
                   
               # Parse and validate JSON structure
               try:
                   data = response.json()
                   # Validate expected structure
                   if 'result' not in data:
                       raise ValueError("Invalid response structure")
                   return data
               except ValueError:
                   raise ValueError("Invalid JSON response")
           ```
      
      7. **Use Timeouts and Circuit Breakers:**
         - Set appropriate timeouts
         - Implement circuit breakers for failing services
         - Example:
           ```python
           import requests
           from requests.exceptions import Timeout, ConnectionError
           
           def request_with_circuit_breaker(url, max_retries=3, timeout=5):
               retries = 0
               while retries < max_retries:
                   try:
                       return requests.get(url, timeout=timeout)
                   except (Timeout, ConnectionError) as e:
                       retries += 1
                       if retries >= max_retries:
                           # Circuit is now open
                           raise ValueError(f"Circuit breaker open for {url}: {str(e)}")
                       # Exponential backoff
                       time.sleep(2 ** retries)
           ```
      
      8. **Implement Proper Logging and Monitoring:**
         - Log all outbound requests
         - Monitor for unusual patterns
         - Example:
           ```python
           import logging
           import requests
           
           def logged_request(url, **kwargs):
               # Log the outbound request
               logging.info(f"Outbound request to: {url}")
               
               try:
                   response = requests.get(url, **kwargs)
                   # Log the response
                   logging.info(f"Response from {url}: status={response.status_code}")
                   return response
               except Exception as e:
                   # Log the error
                   logging.error(f"Request to {url} failed: {str(e)}")
                   raise
           ```
      
      9. **Use DNS Resolution Controls:**
         - Implement DNS resolution controls
         - Block internal DNS names
         - Example:
           ```python
           import socket
           import ipaddress
           
           def is_safe_host(hostname):
               try:
                   # Resolve hostname to IP
                   ip_addresses = socket.getaddrinfo(
                       hostname, None, socket.AF_INET, socket.SOCK_STREAM
                   )
                   
                   for family, socktype, proto, canonname, sockaddr in ip_addresses:
                       ip = sockaddr[0]
                       ip_obj = ipaddress.ip_address(ip)
                       
                       # Check if IP is private/internal
                       if (ip_obj.is_private or ip_obj.is_loopback or 
                           ip_obj.is_link_local or ip_obj.is_reserved):
                           return False
                           
                   return True
               except (socket.gaierror, ValueError):
                   return False
                   
           def safe_request_with_dns_check(url):
               parsed_url = urlparse(url)
               hostname = parsed_url.netloc.split(':')[0]
               
               if not is_safe_host(hostname):
                   raise ValueError(f"Hostname resolves to unsafe IP: {hostname}")
                   
               return requests.get(url)
           ```
      
      10. **Implement Defense in Depth:**
          - Combine multiple protection mechanisms
          - Don't rely on a single control
          - Example comprehensive approach:
            ```python
            class SSRFProtectedClient:
                def __init__(self):
                    self.allowed_domains = ["api.example.com", "cdn.example.com"]
                    self.allowed_protocols = ["https"]
                    self.max_redirects = 3
                    self.timeout = 10
                    
                def is_safe_url(self, url):
                    # URL validation
                    parsed_url = urlparse(url)
                    
                    # Protocol check
                    if parsed_url.scheme not in self.allowed_protocols:
                        return False
                        
                    # Domain check
                    if parsed_url.netloc not in self.allowed_domains:
                        return False
                        
                    # DNS resolution check
                    hostname = parsed_url.netloc.split(':')[0]
                    try:
                        ip_addresses = socket.getaddrinfo(
                            hostname, None, socket.AF_INET, socket.SOCK_STREAM
                        )
                        for family, socktype, proto, canonname, sockaddr in ip_addresses:
                            ip = sockaddr[0]
                            ip_obj = ipaddress.ip_address(ip)
                            if ip_obj.is_private or ip_obj.is_loopback or ip_obj.is_reserved:
                                return False
                    except socket.gaierror:
                        return False
                        
                    return True
                    
                def request(self, method, url, **kwargs):
                    # Validate URL
                    if not self.is_safe_url(url):
                        raise ValueError(f"URL failed security validation: {url}")
                        
                    # Set sensible defaults
                    kwargs.setdefault('timeout', self.timeout)
                    kwargs.setdefault('allow_redirects', False)
                    
                    # Make initial request
                    session = requests.Session()
                    response = session.request(method, url, **kwargs)
                    
                    # Handle redirects manually with validation
                    redirect_count = 0
                    
                    while 300 <= response.status_code < 400 and redirect_count < self.max_redirects:
                        redirect_url = response.headers.get('Location')
                        
                        # Validate redirect URL
                        if not self.is_safe_url(redirect_url):
                            raise ValueError(f"Redirect URL failed security validation: {redirect_url}")
                            
                        # Follow the redirect with validation
                        redirect_count += 1
                        response = session.request(method, redirect_url, **kwargs)
                        
                    # Log the request
                    logging.info(f"{method} request to {url} completed with status {response.status_code}")
                    
                    return response
                    
                def get(self, url, **kwargs):
                    return self.request('GET', url, **kwargs)
                    
                def post(self, url, **kwargs):
                    return self.request('POST', url, **kwargs)
            
            # Usage
            client = SSRFProtectedClient()
            response = client.get('https://api.example.com/data')
            ```

  - type: validate
    conditions:
      # Check 1: URL validation implementation
      - pattern: "def\\s+is_valid_url|def\\s+validate_url"
        message: "URL validation function is implemented."
      
      # Check 2: Allowlist implementation
      - pattern: "allowed_domains|allowed_urls|ALLOWED_HOSTS|whitelist"
        message: "URL allowlisting is implemented."
      
      # Check 3: Safe request wrapper
      - pattern: "class\\s+\\w+Request|def\\s+safe_request"
        message: "Safe request wrapper is implemented."
      
      # Check 4: IP address validation
      - pattern: "ipaddress\\.ip_address|is_private|is_loopback|is_reserved"
        message: "IP address validation is implemented to prevent access to internal resources."

metadata:
  priority: high
  version: 1.0
  tags:
    - security
    - python
    - ssrf
    - owasp
    - language:python
    - framework:django
    - framework:flask
    - framework:fastapi
    - category:security
    - subcategory:ssrf
    - standard:owasp-top10
    - risk:a10-server-side-request-forgery
  references:
    - "https://owasp.org/Top10/A10_2021-Server-Side_Request_Forgery_%28SSRF%29/"
    - "https://cheatsheetseries.owasp.org/cheatsheets/Server_Side_Request_Forgery_Prevention_Cheat_Sheet.html"
    - "https://portswigger.net/web-security/ssrf"
    - "https://docs.python.org/3/library/urllib.request.html"
    - "https://docs.python-requests.org/en/latest/user/advanced/#ssl-cert-verification"
    - "https://docs.python.org/3/library/ipaddress.html"
</rule>

💡 Suggested Test Inputs

Loading suggested inputs...

🎯 Community Test Results

Loading results...

📦 Package Info

Format
cursor
Type
rule
Category
languages
License
MIT