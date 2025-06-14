This guide demonstrates how to test Server-Sent Events (SSE) endpoints with LoadForge, including automatic reconnection logic and proper event parsing.

Use Cases

Testing real-time data streams (stock prices, chat messages, notifications)

Validating SSE endpoint reliability and reconnection behavior

Load testing streaming APIs with persistent connections

Testing event-driven architectures

Basic SSE Testing

from locust import HttpUser, task, between
import time
import json
import re


class SSEUser(HttpUser):
    """Locust user that exercises Server-Sent Events (SSE) endpoints.

    Each task opens a streaming GET request, reads a bounded number of
    lines from it, reports the outcome to Locust, and closes the stream.
    The most recent event ID seen is kept on the instance so later
    requests can send it as ``Last-Event-ID``.
    """

    wait_time = between(1, 3)

    def on_start(self):
        """Initialize SSE connection tracking."""
        self.connection_id = None
        self.last_event_id = None
        # Running total of ``data`` lines seen by this user, across tasks.
        self.events_received = 0

    @staticmethod
    def _prepare_stream(response):
        """Make ``iter_lines(decode_unicode=True)`` yield ``str`` lines.

        When the server sends no charset, requests leaves
        ``response.encoding`` as ``None`` and ``iter_lines`` yields
        ``bytes``, which breaks every ``str.startswith`` check below.
        SSE streams are UTF-8, so default to that.
        """
        if response.encoding is None:
            response.encoding = 'utf-8'

    @task(3)
    def test_sse_connection(self):
        """Test basic SSE connection and event reception."""
        headers = {
            'Accept': 'text/event-stream',
            'Cache-Control': 'no-cache',
        }

        # Add Last-Event-ID for reconnection
        if self.last_event_id:
            headers['Last-Event-ID'] = self.last_event_id

        with self.client.get('/api/events/stream', headers=headers,
                             stream=True, catch_response=True) as response:
            if response.status_code != 200:
                response.failure(f"SSE connection failed: {response.status_code}")
                return

            self._prepare_stream(response)

            # Process the SSE stream for a limited time. The elapsed-time
            # check runs only when a line arrives, because iter_lines()
            # blocks while the server is silent.
            start_time = time.time()
            timeout = 10  # 10 seconds max per connection
            # Compare against a baseline so that events counted by earlier
            # tasks cannot make a silent connection look successful.
            events_before = self.events_received

            try:
                for line in response.iter_lines(decode_unicode=True):
                    if time.time() - start_time > timeout:
                        break
                    if line:
                        self._process_sse_line(line)

                if self.events_received > events_before:
                    response.success()
                else:
                    response.failure("No events received")
            except Exception as e:
                response.failure(f"SSE processing error: {e}")
            finally:
                # A streamed response keeps its connection checked out
                # until it is fully read or explicitly closed.
                response.close()

    def _process_sse_line(self, line):
        """Process one non-empty SSE line.

        The line is split at the first colon into a field name and a
        value, and a single leading space in the value is dropped. Lines
        that begin with a colon are comments and are ignored.

        * ``data``: counts as one received event. If the value is a JSON
          object containing ``id``, that ID is remembered as well.
        * ``id``: remembered as the last event ID.
        * any other field (``event``, ``retry``, ...): ignored.

        Args:
            line: A decoded (``str``), non-empty line from the stream.
        """
        if line.startswith(':'):
            return  # comment / keep-alive line

        field, _, value = line.partition(':')
        if value.startswith(' '):
            value = value[1:]

        if field == 'data':
            # JSON and plain-text payloads both count as an event.
            self.events_received += 1
            try:
                event_data = json.loads(value)
            except json.JSONDecodeError:
                return
            # Valid JSON need not be an object (e.g. ``42`` or ``null``),
            # so check the type before looking for an embedded ID.
            if isinstance(event_data, dict) and event_data.get('id') is not None:
                # Stored as str because it is later sent as a header value.
                self.last_event_id = str(event_data['id'])
        elif field == 'id':
            self.last_event_id = value

    @task(2)
    def test_sse_with_filters(self):
        """Test SSE with query parameters and filters."""
        params = {
            'channel': 'notifications',
            'user_id': f'user_{self.client.base_url.split("//")[1].replace(".", "_")}',
            'types': 'message,alert,update'
        }

        headers = {
            'Accept': 'text/event-stream',
            'Authorization': 'Bearer test-token-123'
        }

        with self.client.get('/api/events/filtered', params=params,
                             headers=headers, stream=True,
                             catch_response=True) as response:
            if response.status_code != 200:
                response.failure(f"Filtered SSE failed: {response.status_code}")
                return

            self._prepare_stream(response)
            try:
                # Process a few events
                event_count = 0
                for line in response.iter_lines(decode_unicode=True):
                    if line and line.startswith('data:'):
                        event_count += 1
                        if event_count >= 3:  # Process 3 events then disconnect
                            break
                response.success()
            finally:
                response.close()

    @task(1)
    def test_sse_reconnection(self):
        """Test SSE reconnection after connection drop."""
        headers = {
            'Accept': 'text/event-stream',
            'Connection': 'keep-alive'
        }

        # First connection
        with self.client.get('/api/events/stream', headers=headers,
                             stream=True, catch_response=True) as response:
            if response.status_code != 200:
                response.failure("Initial SSE connection failed")
                return

            self._prepare_stream(response)
            try:
                # Read until two events have arrived. Lines go through
                # _process_sse_line so that last_event_id reflects this
                # connection when the reconnect header is built below.
                events_before = self.events_received
                for line in response.iter_lines(decode_unicode=True):
                    if line:
                        self._process_sse_line(line)
                    if self.events_received - events_before >= 2:
                        break
                response.success()
            finally:
                response.close()  # Simulate connection drop

        # Wait before reconnection
        time.sleep(1)

        # Reconnection attempt
        reconnect_headers = headers.copy()
        if self.last_event_id:
            reconnect_headers['Last-Event-ID'] = self.last_event_id

        with self.client.get('/api/events/stream', headers=reconnect_headers,
                             stream=True, catch_response=True) as response:
            if response.status_code == 200:
                response.success()
                # Only the status is checked; release the unread stream.
                response.close()
            else:
                response.failure(f"SSE reconnection failed: {response.status_code}")

Advanced SSE Testing

from locust import HttpUser, task, between
import time
import json
import threading
from queue import Queue


class AdvancedSSEUser(HttpUser):
    """Locust user covering multi-channel and heartbeat SSE scenarios.

    Per-user counters are kept in ``connection_stats`` and printed when
    the user stops.
    """

    wait_time = between(2, 5)

    def on_start(self):
        """Initialize advanced SSE tracking."""
        self.event_queue = Queue()
        self.connection_stats = {
            'connections': 0,
            'reconnections': 0,
            'events_received': 0,
            'connection_errors': 0
        }

    @task(2)
    def test_multiple_sse_channels(self):
        """Test several SSE channels, one after another."""
        channels = ['notifications', 'updates', 'alerts']

        for channel in channels:
            self._test_channel(channel)
            time.sleep(0.5)  # Stagger connections

    def _test_channel(self, channel):
        """Open one channel's stream and count ``data`` lines for ~5 seconds.

        A non-200 status, or an error raised while reading the stream, is
        reported to Locust as a failure and counted in
        ``connection_stats['connection_errors']``.

        Args:
            channel: Channel name; used in the URL path and the
                ``X-Channel`` header.
        """
        headers = {
            'Accept': 'text/event-stream',
            'X-Channel': channel
        }

        with self.client.get(f'/api/events/{channel}', headers=headers,
                             stream=True, catch_response=True) as response:
            if response.status_code != 200:
                self.connection_stats['connection_errors'] += 1
                response.failure(f"Channel {channel} failed: {response.status_code}")
                return

            self.connection_stats['connections'] += 1

            # Without a charset from the server, requests leaves encoding
            # as None and iter_lines() yields bytes; SSE streams are UTF-8.
            if response.encoding is None:
                response.encoding = 'utf-8'

            # Process events for a short duration. A single iterator is
            # used for the whole window: iter_lines() is not reentrant
            # safe, and calling it again discards buffered data.
            deadline = time.time() + 5
            try:
                for line in response.iter_lines(decode_unicode=True, chunk_size=1):
                    if line and line.startswith('data:'):
                        self.connection_stats['events_received'] += 1
                    # Checked only when a line arrives; iter_lines()
                    # blocks while the server is silent.
                    if time.time() >= deadline:
                        break
                response.success()
            except Exception as exc:
                self.connection_stats['connection_errors'] += 1
                response.failure(f"Channel {channel} stream error: {exc}")
            finally:
                # A streamed response holds its connection until it is
                # fully read or explicitly closed.
                response.close()

    @task(1)
    def test_sse_heartbeat(self):
        """Test SSE heartbeat and keep-alive."""
        headers = {
            'Accept': 'text/event-stream',
            'X-Heartbeat': 'true'
        }

        with self.client.get('/api/events/heartbeat', headers=headers,
                             stream=True, catch_response=True) as response:
            if response.status_code != 200:
                response.failure("Heartbeat SSE connection failed")
                return

            if response.encoding is None:
                response.encoding = 'utf-8'

            heartbeat_count = 0
            start_time = time.time()

            try:
                for line in response.iter_lines(decode_unicode=True):
                    if time.time() - start_time > 15:  # 15 second test
                        break
                    # Any line mentioning "heartbeat" counts, whichever
                    # SSE field carries it.
                    if line and 'heartbeat' in line.lower():
                        heartbeat_count += 1

                if heartbeat_count > 0:
                    response.success()
                else:
                    response.failure("No heartbeat events received")
            finally:
                response.close()

    def on_stop(self):
        """Log connection statistics."""
        print(f"SSE Stats: {self.connection_stats}")

Key Testing Points

Connection Establishment: Verify SSE endpoints accept proper headers

Event Processing: Parse and validate event data format

Reconnection Logic: Test automatic reconnection with Last-Event-ID

Multiple Channels: Test concurrent SSE connections

Error Handling: Validate behavior on connection failures

Performance: Monitor connection overhead and event throughput

Common SSE Patterns

Real-time Notifications: User-specific event streams

Live Data Feeds: Stock prices, sports scores, sensor data

Chat Applications: Message broadcasting and presence

Progress Updates: Long-running task status updates

System Monitoring: Real-time metrics and alerts