This guide demonstrates how to crawl your entire website and validate your security header implementation across all discovered pages, ensuring consistent security controls throughout your site. Every crawler in this guide builds on the same core check, sketched right after the use cases below.
Use Cases
- Validating security headers across all website pages
- Ensuring consistent security implementation after deployments
- Detecting missing security headers on specific page types
- Monitoring security header configuration changes
- Compliance checking for security standards
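The core pattern behind both crawlers is simple: request a page with Locust, inspect response.headers, and mark the request as failed when a required header is missing or misconfigured. Here is a minimal sketch of that pattern (the /about path and the single header it checks are placeholders, not part of the full scripts below):

```python
from locust import HttpUser, task, between

class MinimalHeaderCheck(HttpUser):
    wait_time = between(1, 2)

    @task
    def check_single_page(self):
        # Request one page and fail the Locust request if the header is absent
        with self.client.get("/about", catch_response=True) as response:
            if "X-Frame-Options" not in response.headers:
                response.failure("Missing X-Frame-Options")
            else:
                response.success()
```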
Simple Security Headers Crawler
from locust import HttpUser, task, between
import re
import time
from urllib.parse import urlparse
from collections import deque
# CONFIGURATION - Edit these settings for your requirements
REQUIRED_HEADERS = {
'Strict-Transport-Security': True, # HSTS - highly recommended
'X-Frame-Options': True, # Clickjacking protection
'X-Content-Type-Options': True, # MIME type sniffing protection
'Referrer-Policy': True, # Referrer information control
'Content-Security-Policy': False, # CSP - optional by default (can be complex)
    'X-XSS-Protection': False, # Deprecated; modern guidance is to omit it or set it to 0
}
# Optional: Validate specific header values
HEADER_VALUES = {
'X-Content-Type-Options': ['nosniff'],
'X-Frame-Options': ['DENY', 'SAMEORIGIN'],
'Referrer-Policy': ['strict-origin-when-cross-origin', 'same-origin', 'no-referrer', 'strict-origin']
}
class SecurityHeadersCrawler(HttpUser):
wait_time = between(1, 2)
def on_start(self):
"""Initialize security headers crawling"""
self.visited_pages = set()
self.pages_to_check = deque(['/'])
self.security_issues = []
self.pages_checked = 0
self.base_domain = None
print("Starting security headers validation...")
print(f"Required headers: {[h for h, required in REQUIRED_HEADERS.items() if required]}")
@task(5)
def crawl_and_validate_headers(self):
"""Main crawling task to validate security headers"""
if not self.pages_to_check:
return
current_page = self.pages_to_check.popleft()
if current_page in self.visited_pages:
return
self.visited_pages.add(current_page)
self.pages_checked += 1
with self.client.get(current_page, name=f"SECURITY: {current_page}", catch_response=True) as response:
if response.status_code == 200:
# Set base domain on first successful request
if not self.base_domain:
self.base_domain = urlparse(self.client.base_url).netloc
# Validate security headers
missing_headers, invalid_headers = self._validate_security_headers(current_page, response.headers)
# Find more pages to crawl
self._find_internal_pages(response.text)
# Report results
if missing_headers or invalid_headers:
issues = missing_headers + invalid_headers
failure_msg = f"❌ Security issues: {', '.join(issues)}"
response.failure(failure_msg)
else:
response.success()
else:
response.failure(f"Could not access page: HTTP {response.status_code}")
def _validate_security_headers(self, page_url, headers):
"""Validate security headers for a page"""
missing_headers = []
invalid_headers = []
for header_name, is_required in REQUIRED_HEADERS.items():
if is_required:
if header_name not in headers:
missing_headers.append(f"Missing {header_name}")
self._log_security_issue(page_url, 'HIGH', f'Missing required header: {header_name}')
else:
# Check header value if validation rules exist
if header_name in HEADER_VALUES:
header_value = headers[header_name]
valid_values = HEADER_VALUES[header_name]
# For some headers, check if value contains any of the valid options
if header_name == 'Referrer-Policy':
if not any(valid_val in header_value for valid_val in valid_values):
invalid_headers.append(f"Invalid {header_name}")
self._log_security_issue(page_url, 'MEDIUM', f'Invalid {header_name}: {header_value}')
else:
if header_value not in valid_values:
invalid_headers.append(f"Invalid {header_name}")
self._log_security_issue(page_url, 'MEDIUM', f'Invalid {header_name}: {header_value}')
elif header_name in headers:
# Optional header is present - validate it
if header_name in HEADER_VALUES:
header_value = headers[header_name]
valid_values = HEADER_VALUES[header_name]
if header_name == 'Referrer-Policy':
if not any(valid_val in header_value for valid_val in valid_values):
invalid_headers.append(f"Invalid {header_name}")
self._log_security_issue(page_url, 'MEDIUM', f'Invalid optional {header_name}: {header_value}')
else:
if header_value not in valid_values:
invalid_headers.append(f"Invalid {header_name}")
self._log_security_issue(page_url, 'MEDIUM', f'Invalid optional {header_name}: {header_value}')
return missing_headers, invalid_headers
def _find_internal_pages(self, html_content):
"""Find internal pages from current page links"""
        if len(self.pages_to_check) > 50: # Cap the crawl queue size
return
# Find internal links
links = re.findall(r'<a[^>]+href=["\']([^"\']+)["\']', html_content, re.IGNORECASE)
for link in links:
if self._is_internal_page_link(link):
normalized_link = self._normalize_link(link)
if normalized_link and normalized_link not in self.visited_pages:
if normalized_link not in self.pages_to_check:
self.pages_to_check.append(normalized_link)
def _is_internal_page_link(self, link):
"""Check if link is an internal page (not resource)"""
# Skip anchors, external protocols, and resources
if any(skip in link.lower() for skip in ['#', 'mailto:', 'tel:', 'javascript:']):
return False
# Skip common resource extensions
resource_extensions = ['.css', '.js', '.jpg', '.jpeg', '.png', '.gif', '.pdf', '.zip',
'.svg', '.ico', '.mp4', '.mp3', '.woff', '.woff2', '.ttf', '.eot']
if any(link.lower().endswith(ext) for ext in resource_extensions):
return False
# Must be internal (relative or same domain)
        if link.startswith('//'): # Skip protocol-relative links rather than treating them as site-relative paths
            return False
        if link.startswith('/') or not link.startswith('http'):
return True
if link.startswith('http') and self.base_domain:
return urlparse(link).netloc == self.base_domain
return False
def _normalize_link(self, link):
"""Normalize link for checking"""
try:
if link.startswith('/'):
return link.split('#')[0] # Remove fragment
elif not link.startswith('http'):
return '/' + link.lstrip('./')
elif self.base_domain and link.startswith('http'):
parsed = urlparse(link)
if parsed.netloc == self.base_domain:
return parsed.path
return None
        except Exception:
return None
def _log_security_issue(self, page_url, severity, description):
"""Log security header issue"""
issue = {
'page': page_url,
'severity': severity,
'description': description,
'timestamp': time.time()
}
self.security_issues.append(issue)
print(f"SECURITY ISSUE [{severity}]: {description} on {page_url}")
@task(1)
def report_security_status(self):
"""Report current security validation status"""
if self.pages_checked < 3:
return
high_issues = [issue for issue in self.security_issues if issue['severity'] == 'HIGH']
medium_issues = [issue for issue in self.security_issues if issue['severity'] == 'MEDIUM']
print(f"SECURITY HEADERS STATUS: {len(self.security_issues)} total issues "
f"({len(high_issues)} high, {len(medium_issues)} medium) "
f"across {self.pages_checked} pages")
def on_stop(self):
"""Final security headers report"""
print("\n" + "="*50)
print("SECURITY HEADERS VALIDATION COMPLETE")
print("="*50)
print(f"Pages checked: {self.pages_checked}")
print(f"Total security issues: {len(self.security_issues)}")
if self.security_issues:
print(f"\nTOP SECURITY ISSUES:")
for issue in self.security_issues[:10]:
print(f"❌ [{issue['severity']}] {issue['description']}")
print(f" Page: {issue['page']}")
else:
print("✅ All pages have proper security headers!")
Comprehensive Security Headers Validation
from locust import HttpUser, task, between
import re
import time
from urllib.parse import urlparse
from collections import deque, defaultdict
# COMPREHENSIVE CONFIGURATION
SECURITY_HEADERS_CONFIG = {
'required_headers': {
'Strict-Transport-Security': {
'required': True,
'min_max_age': 31536000, # 1 year minimum
'should_include_subdomains': True
},
'X-Frame-Options': {
'required': True,
'valid_values': ['DENY', 'SAMEORIGIN']
},
'X-Content-Type-Options': {
'required': True,
'valid_values': ['nosniff']
},
'Referrer-Policy': {
'required': True,
'valid_values': ['strict-origin-when-cross-origin', 'same-origin', 'no-referrer', 'strict-origin']
},
'Content-Security-Policy': {
'required': False, # Optional but recommended
'check_unsafe_directives': True
},
'Permissions-Policy': {
'required': False, # Modern replacement for Feature-Policy
}
},
'page_type_requirements': {
'login_pages': ['X-Frame-Options', 'Strict-Transport-Security'],
'api_endpoints': ['X-Content-Type-Options', 'Strict-Transport-Security'],
'admin_pages': ['X-Frame-Options', 'Strict-Transport-Security', 'Content-Security-Policy']
}
}
class ComprehensiveSecurityValidator(HttpUser):
wait_time = between(1, 3)
def on_start(self):
"""Initialize comprehensive security validation"""
self.visited_pages = set()
self.pages_to_check = deque(['/'])
self.security_issues = []
self.page_classifications = defaultdict(list)
self.header_stats = defaultdict(int)
self.base_domain = None
print("Starting comprehensive security headers validation...")
@task(5)
def crawl_and_validate_comprehensive(self):
"""Comprehensive security headers validation"""
if not self.pages_to_check:
return
current_page = self.pages_to_check.popleft()
if current_page in self.visited_pages:
return
self.visited_pages.add(current_page)
with self.client.get(current_page, name=f"SECURITY: {current_page}", catch_response=True) as response:
if response.status_code == 200:
if not self.base_domain:
self.base_domain = urlparse(self.client.base_url).netloc
# Classify page type
page_type = self._classify_page_type(current_page, response.text)
# Validate headers based on page type and general requirements
issues = self._comprehensive_header_validation(current_page, response.headers, page_type)
# Find more pages
self._find_internal_pages(response.text)
# Report results
if issues:
failure_msg = f"❌ {len(issues)} security issues found"
response.failure(failure_msg)
else:
response.success()
else:
response.failure(f"Could not access page: HTTP {response.status_code}")
def _classify_page_type(self, page_url, html_content):
"""Classify page type for specific security requirements"""
page_type = 'general'
# Check for login/auth pages
if any(keyword in page_url.lower() for keyword in ['/login', '/signin', '/auth', '/register']):
page_type = 'login_pages'
elif any(keyword in html_content.lower() for keyword in ['<input type="password"', 'login', 'sign in']):
page_type = 'login_pages'
# Check for API endpoints
elif '/api/' in page_url.lower() or page_url.startswith('/api'):
page_type = 'api_endpoints'
# Check for admin pages
elif any(keyword in page_url.lower() for keyword in ['/admin', '/dashboard', '/manage']):
page_type = 'admin_pages'
self.page_classifications[page_type].append(page_url)
return page_type
def _comprehensive_header_validation(self, page_url, headers, page_type):
"""Comprehensive validation of security headers"""
issues = []
config = SECURITY_HEADERS_CONFIG
# Check general required headers
for header_name, header_config in config['required_headers'].items():
if header_config.get('required', False):
if header_name not in headers:
issues.append(f"Missing {header_name}")
self._log_security_issue(page_url, 'HIGH', f'Missing required header: {header_name}')
else:
# Validate specific header requirements
header_value = headers[header_name]
header_issues = self._validate_header_value(header_name, header_value, header_config)
issues.extend(header_issues)
for issue in header_issues:
self._log_security_issue(page_url, 'MEDIUM', f'{header_name}: {issue}')
# Check page-type specific requirements
if page_type in config['page_type_requirements']:
required_for_type = config['page_type_requirements'][page_type]
for required_header in required_for_type:
if required_header not in headers:
issues.append(f"Missing {required_header} (required for {page_type})")
self._log_security_issue(page_url, 'HIGH',
f'Missing {required_header} required for {page_type}')
# Update statistics
for header_name in config['required_headers'].keys():
if header_name in headers:
self.header_stats[f'{header_name}_present'] += 1
else:
self.header_stats[f'{header_name}_missing'] += 1
return issues
def _validate_header_value(self, header_name, header_value, config):
"""Validate specific header value requirements"""
issues = []
if header_name == 'Strict-Transport-Security':
# Check max-age
max_age_match = re.search(r'max-age=(\d+)', header_value)
if max_age_match:
max_age = int(max_age_match.group(1))
min_age = config.get('min_max_age', 31536000)
if max_age < min_age:
issues.append(f'max-age too short: {max_age} (minimum: {min_age})')
else:
issues.append('missing max-age directive')
# Check includeSubDomains
if config.get('should_include_subdomains', False):
if 'includeSubDomains' not in header_value:
issues.append('missing includeSubDomains')
elif header_name == 'Content-Security-Policy' and config.get('check_unsafe_directives', False):
# Check for unsafe CSP directives
unsafe_patterns = ["'unsafe-inline'", "'unsafe-eval'"]
for pattern in unsafe_patterns:
if pattern in header_value:
issues.append(f'contains unsafe directive: {pattern}')
elif 'valid_values' in config:
valid_values = config['valid_values']
if header_name == 'Referrer-Policy':
# Referrer-Policy can have multiple values
if not any(valid_val in header_value for valid_val in valid_values):
issues.append(f'invalid value: {header_value}')
else:
if header_value not in valid_values:
issues.append(f'invalid value: {header_value} (expected: {valid_values})')
return issues
def _find_internal_pages(self, html_content):
"""Find internal pages from current page links"""
        if len(self.pages_to_check) > 100: # Cap the crawl queue size
return
links = re.findall(r'<a[^>]+href=["\']([^"\']+)["\']', html_content, re.IGNORECASE)
for link in links:
if self._is_internal_page_link(link):
normalized_link = self._normalize_link(link)
if normalized_link and normalized_link not in self.visited_pages:
if normalized_link not in self.pages_to_check:
self.pages_to_check.append(normalized_link)
def _is_internal_page_link(self, link):
"""Check if link is an internal page"""
if any(skip in link.lower() for skip in ['#', 'mailto:', 'tel:', 'javascript:']):
return False
resource_extensions = ['.css', '.js', '.jpg', '.jpeg', '.png', '.gif', '.pdf', '.zip',
'.svg', '.ico', '.mp4', '.mp3', '.woff', '.woff2', '.ttf', '.eot']
if any(link.lower().endswith(ext) for ext in resource_extensions):
return False
        if link.startswith('//'): # Skip protocol-relative links rather than treating them as site-relative paths
            return False
        if link.startswith('/') or not link.startswith('http'):
return True
if link.startswith('http') and self.base_domain:
return urlparse(link).netloc == self.base_domain
return False
def _normalize_link(self, link):
"""Normalize link for checking"""
try:
if link.startswith('/'):
return link.split('#')[0]
elif not link.startswith('http'):
return '/' + link.lstrip('./')
elif self.base_domain and link.startswith('http'):
parsed = urlparse(link)
if parsed.netloc == self.base_domain:
return parsed.path
return None
        except Exception:
return None
def _log_security_issue(self, page_url, severity, description):
"""Log security issue with details"""
issue = {
'page': page_url,
'severity': severity,
'description': description,
'timestamp': time.time()
}
self.security_issues.append(issue)
print(f"SECURITY ISSUE [{severity}]: {description} on {page_url}")
@task(1)
def generate_security_report(self):
"""Generate comprehensive security report"""
if len(self.visited_pages) < 5:
return
high_issues = [i for i in self.security_issues if i['severity'] == 'HIGH']
medium_issues = [i for i in self.security_issues if i['severity'] == 'MEDIUM']
print(f"SECURITY REPORT: {len(self.security_issues)} total issues "
f"({len(high_issues)} high, {len(medium_issues)} medium) "
f"across {len(self.visited_pages)} pages")
# Report page type distribution
for page_type, pages in self.page_classifications.items():
print(f" {page_type}: {len(pages)} pages")
def on_stop(self):
"""Final comprehensive security report"""
print("\n" + "="*60)
print("COMPREHENSIVE SECURITY HEADERS VALIDATION COMPLETE")
print("="*60)
print(f"Pages validated: {len(self.visited_pages)}")
print(f"Total security issues: {len(self.security_issues)}")
# Header statistics
print(f"\nHEADER STATISTICS:")
for header, count in self.header_stats.items():
print(f" {header}: {count}")
# Page type breakdown
print(f"\nPAGE TYPE BREAKDOWN:")
for page_type, pages in self.page_classifications.items():
print(f" {page_type}: {len(pages)} pages")
if self.security_issues:
print(f"\nTOP SECURITY ISSUES:")
for issue in self.security_issues[:10]:
print(f"❌ [{issue['severity']}] {issue['description']}")
print(f" Page: {issue['page']}")
else:
print("✅ All pages have proper security headers!")
Key Security Headers Validation Features
- Website Crawling: Discovers and validates all pages automatically
- Configurable Requirements: Simple configuration for required headers
- Pass/Fail Testing: LoadForge integration with clear success/failure status
- Page Type Classification: Different requirements for login, API, and admin pages
- Header Value Validation: Checks not just presence but proper configuration
- Comprehensive Reporting: Detailed statistics and issue classification
Configuration Guide
Edit the configuration at the top of the script to match your security requirements (a stricter example configuration follows the list below):
- Required Headers: Set to True for headers that must be present
- Header Values: Specify valid values for headers that need validation
- Page Types: Define specific requirements for different page types
- HSTS Settings: Configure minimum max-age and subdomain requirements
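For example, a stricter setup for the simple crawler might require Content-Security-Policy and narrow the accepted values; the dictionaries below are only a sketch to adapt to your own policy:

```python
# A stricter example configuration for the simple crawler (adjust to your policy)
REQUIRED_HEADERS = {
    'Strict-Transport-Security': True,
    'X-Frame-Options': True,
    'X-Content-Type-Options': True,
    'Referrer-Policy': True,
    'Content-Security-Policy': True,   # now required
    'X-XSS-Protection': False,
}

HEADER_VALUES = {
    'X-Content-Type-Options': ['nosniff'],
    'X-Frame-Options': ['DENY'],       # only DENY accepted
    'Referrer-Policy': ['strict-origin-when-cross-origin', 'no-referrer'],
}
```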
This guide provides comprehensive security headers validation across your entire website with flexible configuration options.