This guide demonstrates how to test Server-Sent Events (SSE) endpoints with LoadForge, including automatic reconnection logic and proper event parsing.
Use Cases
- Testing real-time data streams (stock prices, chat messages, notifications)
- Validating SSE endpoint reliability and reconnection behavior
- Load testing streaming APIs with persistent connections
- Testing event-driven architectures
Basic SSE Testing
from locust import HttpUser, task, between
import time
import json
import re
class SSEUser(HttpUser):
    """Locust user that load-tests Server-Sent Events (SSE) endpoints.

    Per-user state (set in ``on_start``):
        connection_id: initialised to ``None``; not otherwise used here.
        last_event_id: most recent event id seen (always stored as ``str``);
            sent back to the server as the ``Last-Event-ID`` header.
        events_received: running total of ``data`` lines seen by this user.
    """

    wait_time = between(1, 3)

    def on_start(self):
        """Initialize SSE connection tracking."""
        self.connection_id = None
        self.last_event_id = None
        self.events_received = 0

    @staticmethod
    def _split_sse_line(line):
        """Split one raw stream line into ``(field, value)``.

        Returns ``(None, None)`` for empty lines and for comment lines
        (lines starting with ``:``). A single space after the colon is
        optional and is stripped when present, so ``data:x`` and ``data: x``
        both yield ``('data', 'x')``.
        """
        # iter_lines() yields bytes when the response has no known encoding.
        if isinstance(line, bytes):
            line = line.decode('utf-8', errors='replace')
        if not line or line.startswith(':'):
            return None, None
        field, _, value = line.partition(':')
        if value.startswith(' '):
            value = value[1:]
        return field, value

    @staticmethod
    def _close_stream(response):
        """Release a streamed response's connection if it has one.

        With ``stream=True`` the connection is only handed back to the pool
        once the body is fully read or the response is closed. Responses that
        never reached the network have no ``raw`` object, so they are skipped.
        """
        if getattr(response, 'raw', None) is not None:
            response.close()

    @task(3)
    def test_sse_connection(self):
        """Test basic SSE connection and event reception.

        Marks the request successful only if at least one ``data`` line was
        received on *this* connection.
        """
        headers = {
            'Accept': 'text/event-stream',
            'Cache-Control': 'no-cache',
        }
        # Add Last-Event-ID for reconnection
        if self.last_event_id:
            headers['Last-Event-ID'] = self.last_event_id

        with self.client.get('/api/events/stream',
                             headers=headers,
                             stream=True,
                             catch_response=True) as response:
            try:
                if response.status_code != 200:
                    response.failure(f"SSE connection failed: {response.status_code}")
                    return

                # Compare against the count at connect time; the counter is
                # cumulative across this user's whole lifetime.
                events_before = self.events_received

                # Process SSE stream for limited time
                start_time = time.time()
                timeout = 10  # 10 seconds max per connection
                try:
                    # NOTE: the deadline is only checked when a line arrives;
                    # iter_lines() itself blocks while the stream is silent.
                    for line in response.iter_lines(decode_unicode=True):
                        if time.time() - start_time > timeout:
                            break
                        if line:
                            self._process_sse_line(line)
                    if self.events_received > events_before:
                        response.success()
                    else:
                        response.failure("No events received")
                except Exception as e:
                    response.failure(f"SSE processing error: {str(e)}")
            finally:
                self._close_stream(response)

    def _process_sse_line(self, line):
        """Process an individual SSE line and update the user's counters.

        Every ``data`` line counts as one received event, whether its payload
        is JSON or plain text. The last event id is taken from an ``id`` field
        line, or from an ``"id"`` key when the payload is a JSON object.
        Comment lines and other fields (such as ``event``) are ignored.
        """
        field, value = self._split_sse_line(line)
        if field == 'data':
            self.events_received += 1
            try:
                event_data = json.loads(value)
            except json.JSONDecodeError:
                # Plain text event: already counted, nothing more to extract.
                return
            # Valid JSON is not necessarily an object (e.g. ``42`` or a list).
            if isinstance(event_data, dict) and event_data.get('id') is not None:
                # Header values must be strings, so normalise here.
                self.last_event_id = str(event_data['id'])
        elif field == 'id':
            # Track event ID
            self.last_event_id = value

    @task(2)
    def test_sse_with_filters(self):
        """Test SSE with query parameters and filters.

        Reads until three ``data`` lines have been seen (or the stream ends)
        and then disconnects. Any 200 response is reported as a success.
        """
        # Derive a user id from the host part of the base URL; fall back to
        # the whole string when it carries no scheme separator.
        url_parts = self.client.base_url.split("//")
        host_part = url_parts[1] if len(url_parts) > 1 else url_parts[0]
        params = {
            'channel': 'notifications',
            'user_id': f'user_{host_part.replace(".", "_")}',
            'types': 'message,alert,update'
        }
        headers = {
            'Accept': 'text/event-stream',
            'Authorization': 'Bearer test-token-123'
        }
        with self.client.get('/api/events/filtered',
                             params=params,
                             headers=headers,
                             stream=True,
                             catch_response=True) as response:
            try:
                if response.status_code != 200:
                    response.failure(f"Filtered SSE failed: {response.status_code}")
                    return

                # Process a few events
                event_count = 0
                for line in response.iter_lines(decode_unicode=True):
                    field, _ = self._split_sse_line(line)
                    if field == 'data':
                        event_count += 1
                        if event_count >= 3:  # Process 3 events then disconnect
                            break
                response.success()
            finally:
                self._close_stream(response)

    @task(1)
    def test_sse_reconnection(self):
        """Test SSE reconnection after connection drop.

        Opens the stream, reads until two ``data`` lines have arrived, drops
        the connection, waits one second and reconnects, sending the last
        seen event id (if any) as ``Last-Event-ID``.
        """
        headers = {
            'Accept': 'text/event-stream',
            'Connection': 'keep-alive'
        }
        # First connection
        with self.client.get('/api/events/stream',
                             headers=headers,
                             stream=True,
                             catch_response=True) as response:
            try:
                if response.status_code != 200:
                    response.failure("Initial SSE connection failed")
                    return

                # Simulate connection drop after receiving some events.
                # Lines go through _process_sse_line so that ids seen on this
                # connection are available for the reconnect below.
                events_before = self.events_received
                for line in response.iter_lines(decode_unicode=True):
                    if line:
                        self._process_sse_line(line)
                    if self.events_received - events_before >= 2:
                        break  # Simulate connection drop
            finally:
                self._close_stream(response)

        # Wait before reconnection
        time.sleep(1)

        # Reconnection attempt
        reconnect_headers = headers.copy()
        if self.last_event_id:
            reconnect_headers['Last-Event-ID'] = self.last_event_id
        with self.client.get('/api/events/stream',
                             headers=reconnect_headers,
                             stream=True,
                             catch_response=True) as response:
            try:
                if response.status_code == 200:
                    response.success()
                else:
                    response.failure(f"SSE reconnection failed: {response.status_code}")
            finally:
                # The body is never read here, so close explicitly.
                self._close_stream(response)
Advanced SSE Testing
from locust import HttpUser, task, between
import time
import json
import threading
from queue import Queue
class AdvancedSSEUser(HttpUser):
    """Locust user covering multi-channel and heartbeat SSE scenarios.

    Per-user state (set in ``on_start``):
        event_queue: an empty ``Queue``; not otherwise used in this class.
        connection_stats: counters printed by ``on_stop``. ``reconnections``
            is initialised but never incremented by the tasks defined here.
    """

    wait_time = between(2, 5)

    def on_start(self):
        """Initialize advanced SSE tracking."""
        self.event_queue = Queue()
        self.connection_stats = {
            'connections': 0,
            'reconnections': 0,
            'events_received': 0,
            'connection_errors': 0
        }

    @staticmethod
    def _as_text(line):
        """Return *line* as ``str``.

        ``iter_lines()`` yields bytes when the response has no known encoding,
        so those are decoded as UTF-8 here.
        """
        if isinstance(line, bytes):
            return line.decode('utf-8', errors='replace')
        return line

    @staticmethod
    def _close_stream(response):
        """Release a streamed response's connection if it has one.

        With ``stream=True`` the connection is only handed back to the pool
        once the body is fully read or the response is closed. Responses that
        never reached the network have no ``raw`` object, so they are skipped.
        """
        if getattr(response, 'raw', None) is not None:
            response.close()

    @task(2)
    def test_multiple_sse_channels(self):
        """Test several SSE channels, one after another."""
        channels = ['notifications', 'updates', 'alerts']
        for channel in channels:
            self._test_channel(channel)
            time.sleep(0.5)  # Stagger connections

    def _test_channel(self, channel):
        """Test an individual SSE channel for roughly five seconds.

        Counts ``data`` lines into ``connection_stats['events_received']``.
        A non-200 status or an error while reading the stream is recorded in
        ``connection_stats['connection_errors']`` and reported as a failure.
        """
        headers = {
            'Accept': 'text/event-stream',
            'X-Channel': channel
        }
        with self.client.get(f'/api/events/{channel}',
                             headers=headers,
                             stream=True,
                             catch_response=True) as response:
            try:
                if response.status_code != 200:
                    self.connection_stats['connection_errors'] += 1
                    response.failure(f"Channel {channel} failed: {response.status_code}")
                    return

                self.connection_stats['connections'] += 1

                # Process events for short duration. A single iterator is used
                # for the whole window: iter_lines() is not reentrant-safe, so
                # restarting it would drop data already buffered.
                start_time = time.time()
                try:
                    for line in response.iter_lines(decode_unicode=True, chunk_size=1):
                        if line and self._as_text(line).startswith('data: '):
                            self.connection_stats['events_received'] += 1
                        # Only checked when a line arrives; a silent stream
                        # keeps iter_lines() blocked.
                        if time.time() - start_time >= 5:
                            break
                except Exception as exc:
                    # Catch Exception only, so greenlet shutdown and Ctrl-C
                    # still propagate.
                    self.connection_stats['connection_errors'] += 1
                    response.failure(f"Channel {channel} stream error: {exc}")
                else:
                    response.success()
            finally:
                self._close_stream(response)

    @task(1)
    def test_sse_heartbeat(self):
        """Test SSE heartbeat and keep-alive.

        Reads the stream for about 15 seconds and succeeds if at least one
        line containing ``heartbeat`` (case-insensitive) was seen.
        """
        headers = {
            'Accept': 'text/event-stream',
            'X-Heartbeat': 'true'
        }
        with self.client.get('/api/events/heartbeat',
                             headers=headers,
                             stream=True,
                             catch_response=True) as response:
            try:
                if response.status_code != 200:
                    response.failure("Heartbeat SSE connection failed")
                    return

                heartbeat_count = 0
                start_time = time.time()
                for line in response.iter_lines(decode_unicode=True):
                    if time.time() - start_time > 15:  # 15 second test
                        break
                    if line and 'heartbeat' in self._as_text(line).lower():
                        heartbeat_count += 1

                if heartbeat_count > 0:
                    response.success()
                else:
                    response.failure("No heartbeat events received")
            finally:
                self._close_stream(response)

    def on_stop(self):
        """Log connection statistics."""
        print(f"SSE Stats: {self.connection_stats}")
Key Testing Points
- Connection Establishment: Verify SSE endpoints accept proper headers
- Event Processing: Parse and validate event data format
- Reconnection Logic: Test automatic reconnection with Last-Event-ID
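- Multiple Channels: Test SSE connections across several channels (each simulated user opens them one after another; concurrency comes from running many users)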
- Error Handling: Validate behavior on connection failures
- Performance: Monitor connection overhead and event throughput
Common SSE Patterns
- Real-time Notifications: User-specific event streams
- Live Data Feeds: Stock prices, sports scores, sensor data
- Chat Applications: Message broadcasting and presence
- Progress Updates: Long-running task status updates
- System Monitoring: Real-time metrics and alerts
This guide provides comprehensive SSE testing patterns for real-time applications with proper connection management and event handling.