CDN Performance Testing

Comprehensive CDN endpoint testing with edge caching validation and global performance metrics

LoadForge can record your browser, graphically build tests, scan your site with a wizard and more. Sign up now to run your first test.

Sign up now


This guide demonstrates comprehensive CDN performance testing in your locustfile. CDN testing involves validating cache behavior, edge location performance, and content delivery optimization across global regions.

Use Cases

  • Testing CDN cache hit/miss ratios
  • Validating edge location performance
  • Testing cache invalidation and purging
  • Measuring global content delivery performance
  • Testing CDN failover and redundancy
  • Validating cache headers and TTL behavior

Complete CDN Performance Testing Locustfile

import time
import random
import hashlib
import json
from datetime import datetime, timedelta
from urllib.parse import urlparse, urljoin
from locust import HttpUser, task, between
from locust.exception import StopUser
import logging
import requests
import base64

class CDNPerformanceUser(HttpUser):
    wait_time = between(1, 3)
    
    def on_start(self):
        """Initialize CDN performance testing"""
        self.cdn_config = {
            # CDN Provider Settings - Configure these for your setup
            'primary_cdn': 'https://cdn.example.com',
            'backup_cdn': 'https://backup-cdn.example.com',
            'origin_server': 'https://origin.example.com',
            
            # Edge Locations to Test
            'edge_locations': [
                'https://us-east-1.cdn.example.com',
                'https://us-west-1.cdn.example.com',
                'https://eu-west-1.cdn.example.com',
                'https://ap-southeast-1.cdn.example.com',
                'https://ap-northeast-1.cdn.example.com'
            ],
            
            # Content Types to Test
            'content_types': {
                'static_assets': ['/css/main.css', '/js/app.js', '/images/logo.png'],
                'api_responses': ['/api/v1/products', '/api/v1/categories'],
                'large_files': ['/downloads/app.zip', '/videos/demo.mp4'],
                'dynamic_content': ['/api/v1/user/profile', '/api/v1/cart']
            },
            
            # Cache Settings
            'cache_test_params': {
                'cache_busting_enabled': True,
                'test_cache_headers': True,
                'validate_etags': True,
                'test_compression': True
            },
            
            # Performance Thresholds - Adjust these for your requirements
            'performance_thresholds': {
                'cache_hit_response_time': 200,  # ms
                'cache_miss_response_time': 1000,  # ms
                'edge_location_max_time': 500,  # ms
                'origin_fallback_max_time': 2000  # ms
            }
        }
        
        # Initialize metrics tracking
        self.cdn_metrics = {
            'cache_hits': 0,
            'cache_misses': 0,
            'edge_performance': {},
            'compression_ratios': [],
            'cache_headers_valid': 0,
            'failover_events': 0
        }
        
        # Generate unique test session ID
        self.test_session_id = hashlib.md5(f"{time.time()}_{random.randint(1000, 9999)}".encode()).hexdigest()[:8]
        
        # Setup logging
        self.logger = logging.getLogger(__name__)
        
        # Validate CDN endpoints
        self.validate_cdn_endpoints()
    
    def validate_cdn_endpoints(self):
        """Validate CDN endpoint connectivity"""
        for location in self.cdn_config['edge_locations']:
            try:
                response = requests.get(f"{location}/health", timeout=5)
                if response.status_code == 200:
                    self.logger.info(f"CDN edge location validated: {location}")
                else:
                    self.logger.warning(f"CDN edge location issue: {location} - {response.status_code}")
            except Exception as e:
                self.logger.error(f"CDN edge location failed: {location} - {e}")
    
    def detect_cache_status(self, response):
        """Detect cache hit/miss from response headers"""
        cache_headers = [
            'cf-cache-status',  # Cloudflare
            'x-cache',          # AWS CloudFront, Fastly
            'x-served-by',      # Fastly
            'x-cache-status',   # Generic
            'cache-control'     # Standard
        ]
        
        for header in cache_headers:
            if header in response.headers:
                value = response.headers[header].lower()
                if any(hit_indicator in value for hit_indicator in ['hit', 'cached', 'fresh']):
                    return 'hit'
                elif any(miss_indicator in value for miss_indicator in ['miss', 'expired', 'stale']):
                    return 'miss'
        
        # Fallback: detect by response time (heuristic)
        response_time = response.elapsed.total_seconds() * 1000
        return 'hit' if response_time < 100 else 'miss'
    
    def calculate_compression_ratio(self, response):
        """Calculate content compression ratio"""
        content_length = response.headers.get('content-length')
        content_encoding = response.headers.get('content-encoding', '').lower()
        
        if content_length and content_encoding in ['gzip', 'br', 'deflate']:
            compressed_size = int(content_length)
            # Estimate original size (rough approximation)
            if content_encoding == 'gzip':
                estimated_original = compressed_size * 3
            elif content_encoding == 'br':
                estimated_original = compressed_size * 4
            else:  # deflate
                estimated_original = compressed_size * 2.5
            
            compression_ratio = (1 - compressed_size / estimated_original) * 100
            return max(0, min(100, compression_ratio))  # Clamp between 0-100%
        
        return 0
    
    @task(3)
    def test_static_asset_caching(self):
        """Test static asset caching performance"""
        asset_path = random.choice(self.cdn_config['content_types']['static_assets'])
        
        # Test primary CDN
        with self.client.get(
            asset_path,
            name="CDN Static Asset",
            catch_response=True
        ) as response:
            if response.status_code == 200:
                cache_status = self.detect_cache_status(response)
                response_time = response.elapsed.total_seconds() * 1000
                
                if cache_status == 'hit':
                    self.cdn_metrics['cache_hits'] += 1
                    threshold = self.cdn_config['performance_thresholds']['cache_hit_response_time']
                    
                    if response_time <= threshold:
                        response.success()
                        self.logger.info(f"Cache HIT: {asset_path} - {response_time:.0f}ms")
                    else:
                        response.failure(f"Cache hit too slow: {response_time:.0f}ms > {threshold}ms")
                else:
                    self.cdn_metrics['cache_misses'] += 1
                    threshold = self.cdn_config['performance_thresholds']['cache_miss_response_time']
                    
                    if response_time <= threshold:
                        response.success()
                        self.logger.info(f"Cache MISS: {asset_path} - {response_time:.0f}ms")
                    else:
                        response.failure(f"Cache miss too slow: {response_time:.0f}ms > {threshold}ms")
                
                # Test compression
                compression_ratio = self.calculate_compression_ratio(response)
                if compression_ratio > 0:
                    self.cdn_metrics['compression_ratios'].append(compression_ratio)
                    self.logger.info(f"Compression: {compression_ratio:.1f}%")
                
            else:
                response.failure(f"Static asset failed: {response.status_code}")
    
    @task(2)
    def test_edge_location_performance(self):
        """Test performance across different edge locations"""
        edge_location = random.choice(self.cdn_config['edge_locations'])
        test_path = '/api/v1/products'
        
        # Parse edge location for metrics
        location_name = urlparse(edge_location).netloc.split('.')[0]
        
        with self.client.get(
            test_path,
            headers={'Host': urlparse(edge_location).netloc},
            name=f"Edge Location - {location_name}",
            catch_response=True
        ) as response:
            if response.status_code == 200:
                response_time = response.elapsed.total_seconds() * 1000
                threshold = self.cdn_config['performance_thresholds']['edge_location_max_time']
                
                # Store edge performance metrics
                if location_name not in self.cdn_metrics['edge_performance']:
                    self.cdn_metrics['edge_performance'][location_name] = []
                
                self.cdn_metrics['edge_performance'][location_name].append(response_time)
                
                if response_time <= threshold:
                    response.success()
                    self.logger.info(f"Edge {location_name}: {response_time:.0f}ms")
                else:
                    response.failure(f"Edge location too slow: {response_time:.0f}ms > {threshold}ms")
                    
            else:
                response.failure(f"Edge location failed: {response.status_code}")
    
    @task(2)
    def test_large_file_delivery(self):
        """Test large file delivery performance"""
        large_file = random.choice(self.cdn_config['content_types']['large_files'])
        
        start_time = time.time()
        with self.client.get(
            large_file,
            name="CDN Large File",
            catch_response=True
        ) as response:
            download_time = time.time() - start_time
            
            if response.status_code == 200:
                # Calculate throughput
                content_length = response.headers.get('content-length')
                if content_length:
                    file_size_mb = int(content_length) / (1024 * 1024)
                    throughput_mbps = file_size_mb / download_time if download_time > 0 else 0
                    
                    self.logger.info(f"Large file: {file_size_mb:.1f}MB in {download_time:.1f}s ({throughput_mbps:.1f} MB/s)")
                    
                    # Validate reasonable download speed (>1 MB/s)
                    if throughput_mbps >= 1.0:
                        response.success()
                    else:
                        response.failure(f"Download too slow: {throughput_mbps:.1f} MB/s")
                else:
                    # No content-length header, validate by time
                    if download_time < 30:  # 30 second timeout
                        response.success()
                    else:
                        response.failure(f"Download timeout: {download_time:.1f}s")
                        
            elif response.status_code == 206:  # Partial content (byte-range)
                response.success()
                self.logger.info("Byte-range request successful")
            else:
                response.failure(f"Large file delivery failed: {response.status_code}")
    
    @task(1)
    def test_api_response_caching(self):
        """Test API response caching"""
        api_endpoint = random.choice(self.cdn_config['content_types']['api_responses'])
        
        with self.client.get(
            api_endpoint,
            name="CDN API Response",
            catch_response=True
        ) as response:
            if response.status_code == 200:
                cache_status = self.detect_cache_status(response)
                
                # Validate cache headers
                cache_control = response.headers.get('cache-control', '')
                etag = response.headers.get('etag', '')
                
                if cache_control or etag:
                    self.cdn_metrics['cache_headers_valid'] += 1
                    self.logger.info(f"API cache headers valid: {api_endpoint}")
                
                response.success()
                
            else:
                response.failure(f"API response caching failed: {response.status_code}")
    
    @task(1)
    def test_cache_invalidation(self):
        """Test cache invalidation behavior"""
        test_path = '/api/v1/cache-test'
        cache_buster = f"?v={int(time.time())}" if self.cdn_config['cache_test_params']['cache_busting_enabled'] else ""
        
        with self.client.get(
            f"{test_path}{cache_buster}",
            name="Cache Invalidation Test",
            catch_response=True
        ) as response:
            if response.status_code == 200:
                cache_status = self.detect_cache_status(response)
                
                if cache_buster and cache_status == 'miss':
                    response.success()
                    self.logger.info("Cache invalidation working (cache miss with buster)")
                elif not cache_buster:
                    response.success()
                    self.logger.info(f"Cache behavior: {cache_status}")
                else:
                    response.failure("Cache invalidation not working properly")
                    
            else:
                response.failure(f"Cache invalidation test failed: {response.status_code}")
    
    @task(1)
    def test_cdn_failover(self):
        """Test CDN failover to backup or origin"""
        # Simulate primary CDN failure by testing backup
        backup_cdn = self.cdn_config['backup_cdn']
        test_path = '/api/v1/products'
        
        with self.client.get(
            test_path,
            headers={'Host': urlparse(backup_cdn).netloc},
            name="CDN Failover Test",
            catch_response=True
        ) as response:
            if response.status_code == 200:
                response_time = response.elapsed.total_seconds() * 1000
                threshold = self.cdn_config['performance_thresholds']['origin_fallback_max_time']
                
                self.cdn_metrics['failover_events'] += 1
                
                if response_time <= threshold:
                    response.success()
                    self.logger.info(f"CDN failover successful: {response_time:.0f}ms")
                else:
                    response.failure(f"CDN failover too slow: {response_time:.0f}ms > {threshold}ms")
                    
            else:
                response.failure(f"CDN failover failed: {response.status_code}")
    
    @task(1)
    def test_image_optimization(self):
        """Test modern image format delivery (WebP, AVIF)"""
        image_path = '/images/hero.jpg'
        
        # Test WebP support
        with self.client.get(
            image_path,
            headers={'Accept': 'image/webp,image/*,*/*;q=0.8'},
            name="Image Optimization - WebP",
            catch_response=True
        ) as response:
            if response.status_code == 200:
                content_type = response.headers.get('content-type', '').lower()
                
                if 'webp' in content_type:
                    response.success()
                    self.logger.info("WebP optimization active")
                elif 'jpeg' in content_type or 'jpg' in content_type:
                    response.success()
                    self.logger.info("Fallback to JPEG (WebP not supported)")
                else:
                    response.failure(f"Unexpected image format: {content_type}")
                    
            else:
                response.failure(f"Image optimization test failed: {response.status_code}")
    
    @task(1)
    def test_video_streaming(self):
        """Test video streaming with byte-range requests"""
        video_path = '/videos/demo.mp4'
        
        # Test byte-range request
        with self.client.get(
            video_path,
            headers={'Range': 'bytes=0-1023'},  # First 1KB
            name="Video Streaming - Byte Range",
            catch_response=True
        ) as response:
            if response.status_code == 206:  # Partial Content
                content_range = response.headers.get('content-range', '')
                accept_ranges = response.headers.get('accept-ranges', '')
                
                if 'bytes' in accept_ranges.lower():
                    response.success()
                    self.logger.info(f"Video streaming supported: {content_range}")
                else:
                    response.failure("Byte-range requests not supported")
                    
            elif response.status_code == 200:
                # Full content returned (acceptable for small ranges)
                response.success()
                self.logger.info("Video streaming: full content returned")
            else:
                response.failure(f"Video streaming failed: {response.status_code}")

# Advanced CDN testing with multi-provider support
class MultiProviderCDNUser(CDNPerformanceUser):
    """Advanced CDN testing across multiple providers"""
    
    def on_start(self):
        super().on_start()
        
        # Multi-provider configuration
        self.provider_configs = {
            'cloudflare': {
                'endpoint': 'https://cdn-cf.example.com',
                'cache_header': 'cf-cache-status',
                'edge_locations': ['us', 'eu', 'asia']
            },
            'aws_cloudfront': {
                'endpoint': 'https://d123456789.cloudfront.net',
                'cache_header': 'x-cache',
                'edge_locations': ['us-east-1', 'eu-west-1', 'ap-southeast-1']
            },
            'fastly': {
                'endpoint': 'https://cdn-fastly.example.com',
                'cache_header': 'x-served-by',
                'edge_locations': ['us-west', 'eu-central', 'asia-pacific']
            }
        }
        
        self.current_provider = random.choice(list(self.provider_configs.keys()))
        self.logger.info(f"Testing CDN provider: {self.current_provider}")
    
    @task
    def test_provider_specific_features(self):
        """Test provider-specific CDN features"""
        provider_config = self.provider_configs[self.current_provider]
        
        with self.client.get(
            '/api/v1/products',
            headers={'Host': urlparse(provider_config['endpoint']).netloc},
            name=f"Provider Test - {self.current_provider}",
            catch_response=True
        ) as response:
            if response.status_code == 200:
                # Check provider-specific cache header
                cache_header = provider_config['cache_header']
                if cache_header in response.headers:
                    cache_value = response.headers[cache_header]
                    self.logger.info(f"{self.current_provider} cache status: {cache_value}")
                    response.success()
                else:
                    response.failure(f"Missing {self.current_provider} cache header")
            else:
                response.failure(f"{self.current_provider} test failed: {response.status_code}")

Ready to run your test?
Launch your locust test at scale.