This guide demonstrates how to test Server-Sent Events (SSE) endpoints with LoadForge using manual SSE parsing without external dependencies.
Use Cases
- Testing SSE event streams and real-time updates
- Validating event data format and structure
- Load testing streaming endpoints
- Testing SSE reconnection and error handling
- Monitoring event delivery performance
Complete SSE Testing
from locust import HttpUser, task, between
import time
import json
import re
class SSEUser(HttpUser):
wait_time = between(1, 3)
def on_start(self):
"""Initialize SSE testing"""
self.events_received = 0
self.connection_errors = 0
self.last_event_id = None
@task(3)
def test_sse_stream(self):
"""Test SSE stream with manual parsing"""
headers = {
'Accept': 'text/event-stream',
'Cache-Control': 'no-cache',
}
# Add Last-Event-ID header if we have one
if self.last_event_id:
headers['Last-Event-ID'] = self.last_event_id
try:
with self.client.get('/api/events/stream',
headers=headers,
stream=True,
name="SSE Stream Connection") as response:
if response.status_code == 200:
self._process_sse_stream(response)
else:
print(f"SSE connection failed: {response.status_code}")
self.connection_errors += 1
except Exception as e:
print(f"SSE stream error: {str(e)}")
self.connection_errors += 1
def _process_sse_stream(self, response):
"""Process SSE stream manually"""
buffer = ""
events_processed = 0
max_events = 5 # Limit events per connection for load testing
try:
for chunk in response.iter_content(chunk_size=1024, decode_unicode=True):
if chunk:
buffer += chunk
# Process complete events
while '\n\n' in buffer:
event_data, buffer = buffer.split('\n\n', 1)
event = self._parse_sse_event(event_data)
if event:
self._handle_sse_event(event)
events_processed += 1
# Limit events for load testing
if events_processed >= max_events:
return
except Exception as e:
print(f"Error processing SSE stream: {str(e)}")
def _parse_sse_event(self, event_data):
"""Parse SSE event from raw data"""
event = {
'event': None,
'data': '',
'id': None,
'retry': None
}
lines = event_data.strip().split('\n')
for line in lines:
line = line.strip()
if not line or line.startswith(':'):
continue # Skip empty lines and comments
if ':' in line:
field, value = line.split(':', 1)
field = field.strip()
value = value.strip()
if field == 'data':
if event['data']:
event['data'] += '\n' + value
else:
event['data'] = value
elif field in ['event', 'id', 'retry']:
event[field] = value
return event if event['data'] or event['event'] else None
def _handle_sse_event(self, event):
"""Handle parsed SSE event"""
self.events_received += 1
# Store last event ID for reconnection
if event['id']:
self.last_event_id = event['id']
# Process different event types
event_type = event['event'] or 'message'
if event_type == 'heartbeat':
self._handle_heartbeat_event(event)
elif event_type == 'data':
self._handle_data_event(event)
elif event_type == 'notification':
self._handle_notification_event(event)
else:
self._handle_generic_event(event)
def _handle_heartbeat_event(self, event):
"""Handle heartbeat events"""
print(f"Heartbeat received: {event['data']}")
def _handle_data_event(self, event):
"""Handle data events"""
try:
data = json.loads(event['data'])
print(f"Data event: {data.get('type', 'unknown')} - {len(event['data'])} bytes")
except json.JSONDecodeError:
print(f"Non-JSON data event: {len(event['data'])} bytes")
def _handle_notification_event(self, event):
"""Handle notification events"""
try:
notification = json.loads(event['data'])
print(f"Notification: {notification.get('message', 'No message')}")
except json.JSONDecodeError:
print(f"Invalid notification format")
def _handle_generic_event(self, event):
"""Handle generic events"""
print(f"Event '{event['event']}': {event['data'][:100]}...")
@task(1)
def test_sse_with_parameters(self):
"""Test SSE with query parameters"""
params = {
'channel': 'updates',
'filter': 'important',
'format': 'json'
}
headers = {
'Accept': 'text/event-stream',
'Cache-Control': 'no-cache',
}
try:
with self.client.get('/api/events/filtered',
params=params,
headers=headers,
stream=True,
name="SSE Filtered Stream") as response:
if response.status_code == 200:
self._process_sse_stream(response)
else:
print(f"Filtered SSE failed: {response.status_code}")
except Exception as e:
print(f"Filtered SSE error: {str(e)}")
@task(1)
def test_sse_reconnection(self):
"""Test SSE reconnection behavior"""
headers = {
'Accept': 'text/event-stream',
'Cache-Control': 'no-cache',
}
# Simulate reconnection with Last-Event-ID
if self.last_event_id:
headers['Last-Event-ID'] = self.last_event_id
response = self.client.get('/api/events/stream',
headers=headers,
name="SSE Reconnection Test")
if response.status_code == 200:
print(f"SSE reconnection successful with ID: {self.last_event_id}")
else:
print(f"SSE reconnection failed: {response.status_code}")
def on_stop(self):
"""Report SSE testing statistics"""
print(f"\n=== SSE Testing Complete ===")
print(f"Events received: {self.events_received}")
print(f"Connection errors: {self.connection_errors}")
print(f"Last event ID: {self.last_event_id}")
Advanced SSE Testing
from locust import HttpUser, task, between
import time
import json
class AdvancedSSEUser(HttpUser):
wait_time = between(2, 5)
@task(2)
def test_multiple_sse_channels(self):
"""Test multiple SSE channels simultaneously"""
channels = ['notifications', 'updates', 'alerts']
for channel in channels:
headers = {
'Accept': 'text/event-stream',
'Cache-Control': 'no-cache',
}
response = self.client.get(f'/api/events/{channel}',
headers=headers,
stream=True,
name=f"SSE Channel: {channel}")
if response.status_code == 200:
# Process a few events from each channel
self._process_limited_stream(response, max_events=2)
def _process_limited_stream(self, response, max_events=3):
"""Process limited number of events from stream"""
buffer = ""
events_count = 0
try:
for chunk in response.iter_content(chunk_size=512, decode_unicode=True):
if chunk:
buffer += chunk
while '\n\n' in buffer and events_count < max_events:
event_data, buffer = buffer.split('\n\n', 1)
if event_data.strip():
events_count += 1
print(f"Received event {events_count}")
if events_count >= max_events:
break
except Exception as e:
print(f"Stream processing error: {str(e)}")
@task(1)
def test_sse_error_handling(self):
"""Test SSE error scenarios"""
# Test invalid endpoint
response = self.client.get('/api/events/nonexistent',
headers={'Accept': 'text/event-stream'},
name="SSE Invalid Endpoint")
if response.status_code == 404:
print("SSE 404 handling works correctly")
# Test without proper headers
response = self.client.get('/api/events/stream',
name="SSE Without Headers")
if response.status_code != 200:
print(f"SSE requires proper headers: {response.status_code}")
Key Features
- Manual SSE Parsing: No external dependencies, uses only built-in Python
- Event Type Handling: Supports different SSE event types
- Reconnection Support: Implements Last-Event-ID for reconnection
- Error Handling: Robust error handling for connection issues
- Load Testing Optimized: Limits events per connection for performance
- Multiple Channels: Support for testing different SSE channels
This guide provides comprehensive SSE testing using only LoadForge-supported packages with manual event parsing for reliable streaming endpoint validation.