This guide shows how to implement comprehensive CDN performance testing in a Locust locustfile. CDN testing involves validating cache behavior, measuring edge location performance, and checking content delivery optimization across global regions.
Use Cases
- Testing CDN cache hit/miss ratios
- Validating edge location performance
- Testing cache invalidation and purging
- Measuring global content delivery performance
- Testing CDN failover and redundancy
- Validating cache headers and TTL behavior
Complete CDN Performance Testing Locustfile
import time
import random
import hashlib
import json
from datetime import datetime, timedelta
from urllib.parse import urlparse, urljoin
from locust import HttpUser, task, between
from locust.exception import StopUser
import logging
import requests
import base64
class CDNPerformanceUser(HttpUser):
    """Locust user that load-tests CDN delivery paths.

    The tasks cover static-asset caching, per-edge latency, large-file
    throughput, API cache headers, cache busting, backup-CDN failover, image
    format negotiation and byte-range video requests. Per-user counters are
    kept in ``self.cdn_metrics``; pass/fail is reported to Locust through
    ``catch_response``.

    Requests made with a relative path go to the host Locust was started
    with. Edge-location and failover tasks request absolute URLs so that the
    configured edge/backup hosts are actually contacted.
    """

    wait_time = between(1, 3)

    # Response headers inspected, in this order, for a cache hit/miss verdict.
    # ``cache-control`` is deliberately not listed: it carries caching
    # directives, not a verdict, and directives such as
    # ``stale-while-revalidate`` would be misread as a miss.
    _CACHE_STATUS_HEADERS = (
        'cf-cache-status',  # Cloudflare
        'x-cache',          # AWS CloudFront, Fastly
        'x-served-by',      # Fastly
        'x-cache-status',   # Generic
    )
    _HIT_MARKERS = ('hit', 'cached', 'fresh')
    _MISS_MARKERS = ('miss', 'expired', 'stale')

    def on_start(self):
        """Initialize CDN performance testing.

        Builds the per-user configuration and metrics dictionaries, creates a
        short session id (used to make cache-busting URLs unique), sets up the
        logger and probes every configured edge location once.
        """
        self.cdn_config = {
            # CDN Provider Settings - Configure these for your setup
            'primary_cdn': 'https://cdn.example.com',
            'backup_cdn': 'https://backup-cdn.example.com',
            'origin_server': 'https://origin.example.com',
            # Edge Locations to Test
            'edge_locations': [
                'https://us-east-1.cdn.example.com',
                'https://us-west-1.cdn.example.com',
                'https://eu-west-1.cdn.example.com',
                'https://ap-southeast-1.cdn.example.com',
                'https://ap-northeast-1.cdn.example.com',
            ],
            # Content Types to Test
            'content_types': {
                'static_assets': ['/css/main.css', '/js/app.js', '/images/logo.png'],
                'api_responses': ['/api/v1/products', '/api/v1/categories'],
                'large_files': ['/downloads/app.zip', '/videos/demo.mp4'],
                'dynamic_content': ['/api/v1/user/profile', '/api/v1/cart'],
            },
            # Cache Settings
            'cache_test_params': {
                'cache_busting_enabled': True,
                'test_cache_headers': True,
                'validate_etags': True,
                'test_compression': True,
            },
            # Performance Thresholds - Adjust these for your requirements
            'performance_thresholds': {
                'cache_hit_response_time': 200,    # ms
                'cache_miss_response_time': 1000,  # ms
                'edge_location_max_time': 500,     # ms
                'origin_fallback_max_time': 2000,  # ms
            },
        }

        # Initialize metrics tracking
        self.cdn_metrics = {
            'cache_hits': 0,
            'cache_misses': 0,
            'edge_performance': {},
            'compression_ratios': [],
            'cache_headers_valid': 0,
            'failover_events': 0,
        }

        # Generate unique test session ID (md5 is used only as a cheap
        # fingerprint here, not for anything security-sensitive).
        seed = f"{time.time()}_{random.randint(1000, 9999)}"
        self.test_session_id = hashlib.md5(seed.encode()).hexdigest()[:8]

        # Setup logging
        self.logger = logging.getLogger(__name__)

        # Validate CDN endpoints
        self.validate_cdn_endpoints()

    @staticmethod
    def _elapsed_ms(response):
        """Return ``response.elapsed`` converted to milliseconds."""
        return response.elapsed.total_seconds() * 1000

    @staticmethod
    def _content_length(response):
        """Return the Content-Length header as an int, or None if absent or malformed."""
        try:
            return int(response.headers.get('content-length', ''))
        except ValueError:
            return None

    def validate_cdn_endpoints(self):
        """Validate CDN endpoint connectivity.

        Sends ``GET <edge>/health`` (5 s timeout) to every configured edge
        location using ``requests`` directly, so these probes are not part of
        the Locust statistics. This is best-effort: outcomes are only logged
        and a failing edge never aborts the user.
        """
        for location in self.cdn_config['edge_locations']:
            try:
                response = requests.get(f"{location}/health", timeout=5)
            except requests.RequestException as e:
                # Connection errors, timeouts, invalid URLs, ...
                self.logger.error(f"CDN edge location failed: {location} - {e}")
                continue

            if response.status_code == 200:
                self.logger.info(f"CDN edge location validated: {location}")
            else:
                self.logger.warning(f"CDN edge location issue: {location} - {response.status_code}")

    def detect_cache_status(self, response):
        """Detect cache hit/miss from response headers.

        Each header in ``_CACHE_STATUS_HEADERS`` is checked in order; the
        first one whose lower-cased value contains a hit marker yields
        ``'hit'``, or a miss marker yields ``'miss'``. When no header gives a
        verdict, the response time is used as a heuristic: under 100 ms is
        treated as a hit.

        Returns:
            ``'hit'`` or ``'miss'``.
        """
        for header in self._CACHE_STATUS_HEADERS:
            raw_value = response.headers.get(header)
            if raw_value is None:
                continue
            value = raw_value.lower()
            if any(marker in value for marker in self._HIT_MARKERS):
                return 'hit'
            if any(marker in value for marker in self._MISS_MARKERS):
                return 'miss'

        # Fallback: detect by response time (heuristic)
        return 'hit' if self._elapsed_ms(response) < 100 else 'miss'

    def calculate_compression_ratio(self, response):
        """Calculate the content compression ratio as a percentage.

        The ratio is measured, not estimated: the Content-Length header is
        taken as the compressed size and ``len(response.content)`` as the
        decoded size.

        Returns:
            A value in the open range (0, 100) when the response is
            gzip/br/deflate encoded and the decoded body is larger than the
            declared Content-Length; otherwise 0 (uncompressed, missing or
            malformed Content-Length, empty body, or the client did not
            decode the body).
        """
        content_encoding = response.headers.get('content-encoding', '').strip().lower()
        if content_encoding not in ('gzip', 'br', 'deflate'):
            return 0

        compressed_size = self._content_length(response)
        if compressed_size is None or compressed_size <= 0:
            # Guards the division below; also covers "Content-Length: 0".
            return 0

        # NOTE(review): assumes the HTTP client has already decoded the body,
        # as requests does for gzip/deflate (and br when a brotli package is
        # installed). If it has not, both sizes match and 0 is returned.
        original_size = len(response.content)
        if original_size <= compressed_size:
            return 0

        return (1 - compressed_size / original_size) * 100

    @task(3)
    def test_static_asset_caching(self):
        """Test static asset caching performance.

        Fetches a random static asset, classifies it as cache hit or miss,
        updates the hit/miss counters and fails the request when it exceeds
        the matching response-time threshold. The compression ratio is
        recorded when it is greater than zero.
        """
        asset_path = random.choice(self.cdn_config['content_types']['static_assets'])
        thresholds = self.cdn_config['performance_thresholds']

        # Test primary CDN
        with self.client.get(
            asset_path,
            name="CDN Static Asset",
            catch_response=True,
        ) as response:
            if response.status_code != 200:
                response.failure(f"Static asset failed: {response.status_code}")
                return

            cache_status = self.detect_cache_status(response)
            response_time = self._elapsed_ms(response)

            if cache_status == 'hit':
                self.cdn_metrics['cache_hits'] += 1
                threshold = thresholds['cache_hit_response_time']
                label, slow_message = "Cache HIT", "Cache hit too slow"
            else:
                self.cdn_metrics['cache_misses'] += 1
                threshold = thresholds['cache_miss_response_time']
                label, slow_message = "Cache MISS", "Cache miss too slow"

            if response_time <= threshold:
                response.success()
                self.logger.info(f"{label}: {asset_path} - {response_time:.0f}ms")
            else:
                response.failure(f"{slow_message}: {response_time:.0f}ms > {threshold}ms")

            # Test compression
            compression_ratio = self.calculate_compression_ratio(response)
            if compression_ratio > 0:
                self.cdn_metrics['compression_ratios'].append(compression_ratio)
                self.logger.info(f"Compression: {compression_ratio:.1f}%")

    @task(2)
    def test_edge_location_performance(self):
        """Test performance across different edge locations.

        Picks a random edge location, requests ``/api/v1/products`` from it
        and records the response time under the edge's short name (first DNS
        label). Fails when the edge is slower than ``edge_location_max_time``.
        """
        edge_location = random.choice(self.cdn_config['edge_locations'])
        test_path = '/api/v1/products'

        # Parse edge location for metrics
        location_name = urlparse(edge_location).netloc.split('.')[0]

        # An absolute URL is used so the request really reaches the chosen
        # edge; overriding only the Host header would still connect to the
        # host Locust was started with.
        with self.client.get(
            f"{edge_location}{test_path}",
            name=f"Edge Location - {location_name}",
            catch_response=True,
        ) as response:
            if response.status_code != 200:
                response.failure(f"Edge location failed: {response.status_code}")
                return

            response_time = self._elapsed_ms(response)
            threshold = self.cdn_config['performance_thresholds']['edge_location_max_time']

            # Store edge performance metrics
            self.cdn_metrics['edge_performance'].setdefault(location_name, []).append(response_time)

            if response_time <= threshold:
                response.success()
                self.logger.info(f"Edge {location_name}: {response_time:.0f}ms")
            else:
                response.failure(f"Edge location too slow: {response_time:.0f}ms > {threshold}ms")

    @task(2)
    def test_large_file_delivery(self):
        """Test large file delivery performance.

        Downloads a random large file and, when a positive Content-Length is
        present, requires a throughput of at least 1 MB/s. Without a usable
        Content-Length the download must finish in under 30 seconds. A 206
        response is accepted as a successful byte-range reply.
        """
        large_file = random.choice(self.cdn_config['content_types']['large_files'])

        # perf_counter is monotonic, so the measurement cannot be skewed by
        # wall-clock adjustments during the download.
        started = time.perf_counter()
        with self.client.get(
            large_file,
            name="CDN Large File",
            catch_response=True,
        ) as response:
            download_time = time.perf_counter() - started

            if response.status_code == 200:
                file_size_bytes = self._content_length(response)
                if file_size_bytes is not None and file_size_bytes > 0:
                    # Calculate throughput (megabytes per second)
                    file_size_mb = file_size_bytes / (1024 * 1024)
                    throughput_mbps = file_size_mb / download_time if download_time > 0 else 0
                    self.logger.info(f"Large file: {file_size_mb:.1f}MB in {download_time:.1f}s ({throughput_mbps:.1f} MB/s)")

                    # Validate reasonable download speed (>1 MB/s)
                    if throughput_mbps >= 1.0:
                        response.success()
                    else:
                        response.failure(f"Download too slow: {throughput_mbps:.1f} MB/s")
                elif download_time < 30:  # No usable content-length: validate by time
                    response.success()
                else:
                    response.failure(f"Download timeout: {download_time:.1f}s")
            elif response.status_code == 206:  # Partial content (byte-range)
                response.success()
                self.logger.info("Byte-range request successful")
            else:
                response.failure(f"Large file delivery failed: {response.status_code}")

    @task(1)
    def test_api_response_caching(self):
        """Test API response caching.

        Any 200 response is reported as a success; when it carries a
        ``cache-control`` or ``etag`` header the ``cache_headers_valid``
        counter is incremented.
        """
        api_endpoint = random.choice(self.cdn_config['content_types']['api_responses'])

        with self.client.get(
            api_endpoint,
            name="CDN API Response",
            catch_response=True,
        ) as response:
            if response.status_code != 200:
                response.failure(f"API response caching failed: {response.status_code}")
                return

            # Validate cache headers
            cache_control = response.headers.get('cache-control', '')
            etag = response.headers.get('etag', '')
            if cache_control or etag:
                self.cdn_metrics['cache_headers_valid'] += 1
                self.logger.info(f"API cache headers valid: {api_endpoint}")

            response.success()

    @task(1)
    def test_cache_invalidation(self):
        """Test cache invalidation behavior.

        With cache busting enabled, a unique query string is appended and the
        response is expected to be a cache miss. With cache busting disabled,
        any 200 response passes and the detected cache status is logged.
        """
        test_path = '/api/v1/cache-test'

        if self.cdn_config['cache_test_params']['cache_busting_enabled']:
            # Session id + microsecond timestamp keeps the URL unique per
            # request; a one-second timestamp would repeat within a second
            # and legitimately hit the cache, failing the check below.
            cache_buster = f"?v={self.test_session_id}-{int(time.time() * 1_000_000)}"
        else:
            cache_buster = ""

        with self.client.get(
            f"{test_path}{cache_buster}",
            name="Cache Invalidation Test",
            catch_response=True,
        ) as response:
            if response.status_code != 200:
                response.failure(f"Cache invalidation test failed: {response.status_code}")
                return

            cache_status = self.detect_cache_status(response)
            if cache_buster and cache_status == 'miss':
                response.success()
                self.logger.info("Cache invalidation working (cache miss with buster)")
            elif not cache_buster:
                response.success()
                self.logger.info(f"Cache behavior: {cache_status}")
            else:
                response.failure("Cache invalidation not working properly")

    @task(1)
    def test_cdn_failover(self):
        """Test CDN failover to backup or origin.

        Simulates a primary-CDN outage by requesting ``/api/v1/products``
        from the backup CDN. Every 200 response counts as a failover event;
        it fails when slower than ``origin_fallback_max_time``.
        """
        backup_cdn = self.cdn_config['backup_cdn']
        test_path = '/api/v1/products'

        # Absolute URL: the request must actually reach the backup CDN, which
        # a Host-header override alone does not achieve.
        with self.client.get(
            f"{backup_cdn}{test_path}",
            name="CDN Failover Test",
            catch_response=True,
        ) as response:
            if response.status_code != 200:
                response.failure(f"CDN failover failed: {response.status_code}")
                return

            response_time = self._elapsed_ms(response)
            threshold = self.cdn_config['performance_thresholds']['origin_fallback_max_time']
            self.cdn_metrics['failover_events'] += 1

            if response_time <= threshold:
                response.success()
                self.logger.info(f"CDN failover successful: {response_time:.0f}ms")
            else:
                response.failure(f"CDN failover too slow: {response_time:.0f}ms > {threshold}ms")

    @task(1)
    def test_image_optimization(self):
        """Test modern image format delivery (WebP, AVIF).

        Requests ``/images/hero.jpg`` while advertising WebP support. A WebP
        or JPEG content type passes; any other content type fails.
        """
        image_path = '/images/hero.jpg'

        # Test WebP support
        with self.client.get(
            image_path,
            headers={'Accept': 'image/webp,image/*,*/*;q=0.8'},
            name="Image Optimization - WebP",
            catch_response=True,
        ) as response:
            if response.status_code != 200:
                response.failure(f"Image optimization test failed: {response.status_code}")
                return

            content_type = response.headers.get('content-type', '').lower()
            if 'webp' in content_type:
                response.success()
                self.logger.info("WebP optimization active")
            elif 'jpeg' in content_type or 'jpg' in content_type:
                response.success()
                self.logger.info("Fallback to JPEG (WebP not supported)")
            else:
                response.failure(f"Unexpected image format: {content_type}")

    @task(1)
    def test_video_streaming(self):
        """Test video streaming with byte-range requests.

        Requests the first 1 KB of ``/videos/demo.mp4``. A 206 response
        passes when it carries a ``bytes`` Content-Range (or advertises
        ``Accept-Ranges: bytes``); a 200 response with the full body is also
        accepted.
        """
        video_path = '/videos/demo.mp4'

        # Test byte-range request
        with self.client.get(
            video_path,
            headers={'Range': 'bytes=0-1023'},  # First 1KB
            name="Video Streaming - Byte Range",
            catch_response=True,
        ) as response:
            if response.status_code == 206:  # Partial Content
                content_range = response.headers.get('content-range', '')
                accept_ranges = response.headers.get('accept-ranges', '')

                # Content-Range is the header that describes a 206 body;
                # Accept-Ranges is optional, so requiring it alone would
                # reject valid partial responses.
                has_byte_range = content_range.strip().lower().startswith('bytes')
                if has_byte_range or 'bytes' in accept_ranges.lower():
                    response.success()
                    self.logger.info(f"Video streaming supported: {content_range}")
                else:
                    response.failure("Byte-range requests not supported")
            elif response.status_code == 200:
                # Full content returned (acceptable for small ranges)
                response.success()
                self.logger.info("Video streaming: full content returned")
            else:
                response.failure(f"Video streaming failed: {response.status_code}")
# Advanced CDN testing with multi-provider support
class MultiProviderCDNUser(CDNPerformanceUser):
    """Advanced CDN testing across multiple providers.

    Each simulated user picks one provider at start-up and, in addition to
    the inherited tasks, checks that the provider's endpoint answers with
    its provider-specific cache header.
    """

    def on_start(self):
        """Run the base initialization, then choose a random CDN provider."""
        super().on_start()

        # Multi-provider configuration
        self.provider_configs = {
            'cloudflare': {
                'endpoint': 'https://cdn-cf.example.com',
                'cache_header': 'cf-cache-status',
                'edge_locations': ['us', 'eu', 'asia'],
            },
            'aws_cloudfront': {
                'endpoint': 'https://d123456789.cloudfront.net',
                'cache_header': 'x-cache',
                'edge_locations': ['us-east-1', 'eu-west-1', 'ap-southeast-1'],
            },
            'fastly': {
                'endpoint': 'https://cdn-fastly.example.com',
                'cache_header': 'x-served-by',
                'edge_locations': ['us-west', 'eu-central', 'asia-pacific'],
            },
        }

        self.current_provider = random.choice(list(self.provider_configs))
        self.logger.info(f"Testing CDN provider: {self.current_provider}")

    @task
    def test_provider_specific_features(self):
        """Test provider-specific CDN features.

        Requests ``/api/v1/products`` from the selected provider's endpoint.
        A 200 response passes only when the provider's cache header is
        present; its value is logged.
        """
        provider_config = self.provider_configs[self.current_provider]

        # Absolute URL: the request must reach the provider's own endpoint.
        # Overriding only the Host header would still connect to the host
        # Locust was started with.
        with self.client.get(
            f"{provider_config['endpoint']}/api/v1/products",
            name=f"Provider Test - {self.current_provider}",
            catch_response=True,
        ) as response:
            if response.status_code != 200:
                response.failure(f"{self.current_provider} test failed: {response.status_code}")
                return

            # Check provider-specific cache header
            cache_header = provider_config['cache_header']
            if cache_header in response.headers:
                cache_value = response.headers[cache_header]
                self.logger.info(f"{self.current_provider} cache status: {cache_value}")
                response.success()
            else:
                response.failure(f"Missing {self.current_provider} cache header")