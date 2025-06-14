This guide demonstrates how to test Server-Sent Events (SSE) endpoints with LoadForge using manual SSE parsing without external dependencies.

Use Cases

Testing SSE event streams and real-time updates

Validating event data format and structure

Load testing streaming endpoints

Testing SSE reconnection and error handling

Monitoring event delivery performance

Complete SSE Testing

from locust import HttpUser, task, between import time import json import re class SSEUser(HttpUser): wait_time = between(1, 3) def on_start(self): """Initialize SSE testing""" self.events_received = 0 self.connection_errors = 0 self.last_event_id = None @task(3) def test_sse_stream(self): """Test SSE stream with manual parsing""" headers = { 'Accept': 'text/event-stream', 'Cache-Control': 'no-cache', } # Add Last-Event-ID header if we have one if self.last_event_id: headers['Last-Event-ID'] = self.last_event_id try: with self.client.get('/api/events/stream', headers=headers, stream=True, name="SSE Stream Connection") as response: if response.status_code == 200: self._process_sse_stream(response) else: print(f"SSE connection failed: {response.status_code}") self.connection_errors += 1 except Exception as e: print(f"SSE stream error: {str(e)}") self.connection_errors += 1 def _process_sse_stream(self, response): """Process SSE stream manually""" buffer = "" events_processed = 0 max_events = 5 # Limit events per connection for load testing try: for chunk in response.iter_content(chunk_size=1024, decode_unicode=True): if chunk: buffer += chunk # Process complete events while '



' in buffer: event_data, buffer = buffer.split('



', 1) event = self._parse_sse_event(event_data) if event: self._handle_sse_event(event) events_processed += 1 # Limit events for load testing if events_processed >= max_events: return except Exception as e: print(f"Error processing SSE stream: {str(e)}") def _parse_sse_event(self, event_data): """Parse SSE event from raw data""" event = { 'event': None, 'data': '', 'id': None, 'retry': None } lines = event_data.strip().split('

') for line in lines: line = line.strip() if not line or line.startswith(':'): continue # Skip empty lines and comments if ':' in line: field, value = line.split(':', 1) field = field.strip() value = value.strip() if field == 'data': if event['data']: event['data'] += '

' + value else: event['data'] = value elif field in ['event', 'id', 'retry']: event[field] = value return event if event['data'] or event['event'] else None def _handle_sse_event(self, event): """Handle parsed SSE event""" self.events_received += 1 # Store last event ID for reconnection if event['id']: self.last_event_id = event['id'] # Process different event types event_type = event['event'] or 'message' if event_type == 'heartbeat': self._handle_heartbeat_event(event) elif event_type == 'data': self._handle_data_event(event) elif event_type == 'notification': self._handle_notification_event(event) else: self._handle_generic_event(event) def _handle_heartbeat_event(self, event): """Handle heartbeat events""" print(f"Heartbeat received: {event['data']}") def _handle_data_event(self, event): """Handle data events""" try: data = json.loads(event['data']) print(f"Data event: {data.get('type', 'unknown')} - {len(event['data'])} bytes") except json.JSONDecodeError: print(f"Non-JSON data event: {len(event['data'])} bytes") def _handle_notification_event(self, event): """Handle notification events""" try: notification = json.loads(event['data']) print(f"Notification: {notification.get('message', 'No message')}") except json.JSONDecodeError: print(f"Invalid notification format") def _handle_generic_event(self, event): """Handle generic events""" print(f"Event '{event['event']}': {event['data'][:100]}...") @task(1) def test_sse_with_parameters(self): """Test SSE with query parameters""" params = { 'channel': 'updates', 'filter': 'important', 'format': 'json' } headers = { 'Accept': 'text/event-stream', 'Cache-Control': 'no-cache', } try: with self.client.get('/api/events/filtered', params=params, headers=headers, stream=True, name="SSE Filtered Stream") as response: if response.status_code == 200: self._process_sse_stream(response) else: print(f"Filtered SSE failed: {response.status_code}") except Exception as e: print(f"Filtered SSE error: {str(e)}") @task(1) def test_sse_reconnection(self): """Test SSE reconnection behavior""" headers = { 'Accept': 'text/event-stream', 'Cache-Control': 'no-cache', } # Simulate reconnection with Last-Event-ID if self.last_event_id: headers['Last-Event-ID'] = self.last_event_id response = self.client.get('/api/events/stream', headers=headers, name="SSE Reconnection Test") if response.status_code == 200: print(f"SSE reconnection successful with ID: {self.last_event_id}") else: print(f"SSE reconnection failed: {response.status_code}") def on_stop(self): """Report SSE testing statistics""" print(f"

=== SSE Testing Complete ===") print(f"Events received: {self.events_received}") print(f"Connection errors: {self.connection_errors}") print(f"Last event ID: {self.last_event_id}")

Advanced SSE Testing

from locust import HttpUser, task, between import time import json class AdvancedSSEUser(HttpUser): wait_time = between(2, 5) @task(2) def test_multiple_sse_channels(self): """Test multiple SSE channels simultaneously""" channels = ['notifications', 'updates', 'alerts'] for channel in channels: headers = { 'Accept': 'text/event-stream', 'Cache-Control': 'no-cache', } response = self.client.get(f'/api/events/{channel}', headers=headers, stream=True, name=f"SSE Channel: {channel}") if response.status_code == 200: # Process a few events from each channel self._process_limited_stream(response, max_events=2) def _process_limited_stream(self, response, max_events=3): """Process limited number of events from stream""" buffer = "" events_count = 0 try: for chunk in response.iter_content(chunk_size=512, decode_unicode=True): if chunk: buffer += chunk while '



' in buffer and events_count < max_events: event_data, buffer = buffer.split('



', 1) if event_data.strip(): events_count += 1 print(f"Received event {events_count}") if events_count >= max_events: break except Exception as e: print(f"Stream processing error: {str(e)}") @task(1) def test_sse_error_handling(self): """Test SSE error scenarios""" # Test invalid endpoint response = self.client.get('/api/events/nonexistent', headers={'Accept': 'text/event-stream'}, name="SSE Invalid Endpoint") if response.status_code == 404: print("SSE 404 handling works correctly") # Test without proper headers response = self.client.get('/api/events/stream', name="SSE Without Headers") if response.status_code != 200: print(f"SSE requires proper headers: {response.status_code}")

Key Features

Manual SSE Parsing: No external dependencies, uses only built-in Python Event Type Handling: Supports different SSE event types Reconnection Support: Implements Last-Event-ID for reconnection Error Handling: Robust error handling for connection issues Load Testing Optimized: Limits events per connection for performance Multiple Channels: Support for testing different SSE channels