Server-Sent Events Testing

Test SSE endpoints with automatic reconnection and event handling

LoadForge can record your browser, graphically build tests, scan your site with a wizard and more. Sign up now to run your first test.

Sign up now


This guide demonstrates how to test Server-Sent Events (SSE) endpoints with LoadForge, including automatic reconnection logic and proper event parsing.

Use Cases

  • Testing real-time data streams (stock prices, chat messages, notifications)
  • Validating SSE endpoint reliability and reconnection behavior
  • Load testing streaming APIs with persistent connections
  • Testing event-driven architectures

Basic SSE Testing

from locust import HttpUser, task, between
import time
import json
import re

class SSEUser(HttpUser):
    """Simulated client that exercises Server-Sent Events (SSE) endpoints.

    Per-user state:
        connection_id: reserved for connection tracking; never set by the
            tasks in this class.
        last_event_id: most recent event ID seen (always a string), sent back
            as the ``Last-Event-ID`` header on later connections.
        events_received: running count of ``data`` lines seen by
            ``_process_sse_line`` over the user's lifetime.

    Every streamed response is closed explicitly once a task stops reading,
    so an endless SSE stream is not left open after an early ``break``.
    """

    wait_time = between(1, 3)

    def on_start(self):
        """Initialize SSE connection tracking"""
        self.connection_id = None
        self.last_event_id = None
        self.events_received = 0

    @staticmethod
    def _close_stream(response):
        """Close a streamed response if it has an underlying connection.

        When the request itself failed (connection refused, DNS error, ...)
        the response has no raw stream attached, and calling ``close()`` on
        it would raise, hence the guard.
        """
        if getattr(response, 'raw', None) is not None:
            response.close()

    @task(3)
    def test_sse_connection(self):
        """Test basic SSE connection and event reception.

        Reads the stream for roughly 10 seconds and reports success only if
        at least one ``data`` line arrived on *this* connection.
        """
        headers = {
            'Accept': 'text/event-stream',
            'Cache-Control': 'no-cache',
        }

        # Add Last-Event-ID for reconnection
        if self.last_event_id:
            headers['Last-Event-ID'] = self.last_event_id

        with self.client.get('/api/events/stream',
                             headers=headers,
                             stream=True,
                             catch_response=True) as response:
            try:
                if response.status_code != 200:
                    response.failure(f"SSE connection failed: {response.status_code}")
                    return

                # SSE streams are UTF-8 by specification; without this,
                # requests falls back to ISO-8859-1 for text/* content types.
                response.encoding = 'utf-8'

                # Process SSE stream for limited time.
                # NOTE: the limit is only evaluated when a line arrives, so a
                # completely silent stream still blocks inside iter_lines().
                start_time = time.time()
                timeout = 10  # 10 seconds max per connection

                # events_received is cumulative across tasks, so compare
                # against a snapshot to judge this connection on its own.
                events_before = self.events_received

                try:
                    for line in response.iter_lines(decode_unicode=True):
                        if time.time() - start_time > timeout:
                            break

                        if line:
                            self._process_sse_line(line)
                except Exception as e:
                    response.failure(f"SSE processing error: {str(e)}")
                    return

                if self.events_received > events_before:
                    response.success()
                else:
                    response.failure("No events received")
            finally:
                self._close_stream(response)

    def _process_sse_line(self, line):
        """Process a single non-empty SSE line.

        Handles the ``data`` and ``id`` fields; comment lines (starting with
        ``:``) and all other fields, including ``event``, are ignored.

        Each ``data`` line increments ``events_received`` (so an event whose
        payload spans several ``data`` lines is counted once per line). If
        the payload is a JSON object with a non-null ``id`` member, that
        value is stored, as a string, in ``last_event_id``.

        Args:
            line: One decoded line of the event stream, without its line
                terminator.
        """
        if line.startswith(':'):
            # Comment line, commonly used by servers as a keep-alive.
            return

        field, _, value = line.partition(':')
        # The SSE format strips exactly one leading space after the colon;
        # "data:foo" and "data: foo" carry the same value.
        if value.startswith(' '):
            value = value[1:]

        if field == 'data':
            self.events_received += 1
            try:
                event_data = json.loads(value)
            except json.JSONDecodeError:
                # Plain text payload: counted above, nothing more to extract.
                return

            # JSON payloads may be scalars or arrays; only objects can carry
            # an 'id' member.
            if isinstance(event_data, dict):
                event_id = event_data.get('id')
                if event_id is not None:
                    # Header values must be strings, so normalise here.
                    self.last_event_id = str(event_id)

        elif field == 'id':
            # Track event ID
            self.last_event_id = value

    @task(2)
    def test_sse_with_filters(self):
        """Test SSE with query parameters and filters.

        Connects to the filtered endpoint, reads until three ``data`` lines
        have been seen (or the stream ends), then disconnects.
        """
        params = {
            'channel': 'notifications',
            'user_id': f'user_{self.client.base_url.split("//")[1].replace(".", "_")}',
            'types': 'message,alert,update'
        }

        headers = {
            'Accept': 'text/event-stream',
            'Authorization': 'Bearer test-token-123'
        }

        with self.client.get('/api/events/filtered',
                             params=params,
                             headers=headers,
                             stream=True,
                             catch_response=True) as response:
            try:
                if response.status_code != 200:
                    response.failure(f"Filtered SSE failed: {response.status_code}")
                    return

                response.encoding = 'utf-8'

                # Process a few events
                event_count = 0
                for line in response.iter_lines(decode_unicode=True):
                    if line and line.startswith('data:'):
                        event_count += 1
                        if event_count >= 3:  # Process 3 events then disconnect
                            break

                response.success()
            finally:
                self._close_stream(response)

    @task(1)
    def test_sse_reconnection(self):
        """Test SSE reconnection after connection drop.

        Opens the stream, reads until two ``data`` lines have arrived, drops
        the connection, waits one second and reconnects, sending the last
        event ID seen so far (if any) as ``Last-Event-ID``.
        """
        headers = {
            'Accept': 'text/event-stream',
            'Connection': 'keep-alive'
        }

        # First connection
        with self.client.get('/api/events/stream',
                             headers=headers,
                             stream=True,
                             catch_response=True) as response:
            try:
                if response.status_code != 200:
                    response.failure("Initial SSE connection failed")
                    return

                response.encoding = 'utf-8'

                # Simulate connection drop after receiving some events
                event_count = 0
                for line in response.iter_lines(decode_unicode=True):
                    if not line:
                        continue

                    # Feed every line to the parser so 'id' fields update
                    # last_event_id before we reconnect.
                    self._process_sse_line(line)

                    if line.startswith('data:'):
                        event_count += 1
                        if event_count >= 2:
                            break  # Simulate connection drop
            finally:
                self._close_stream(response)

        # Wait before reconnection
        time.sleep(1)

        # Reconnection attempt
        reconnect_headers = headers.copy()
        if self.last_event_id:
            reconnect_headers['Last-Event-ID'] = self.last_event_id

        with self.client.get('/api/events/stream',
                             headers=reconnect_headers,
                             stream=True,
                             catch_response=True) as response:
            try:
                if response.status_code == 200:
                    response.success()
                else:
                    response.failure(f"SSE reconnection failed: {response.status_code}")
            finally:
                # The body is never read here; close so the stream does not
                # stay open in the background.
                self._close_stream(response)

Advanced SSE Testing

from locust import HttpUser, task, between
import time
import json
import threading
from queue import Queue

class AdvancedSSEUser(HttpUser):
    """Simulated client covering multi-channel and heartbeat SSE scenarios.

    Per-user state:
        event_queue: a ``Queue`` created at start; not used by the tasks in
            this class.
        connection_stats: counters for successful connections, received
            ``data`` lines and connection errors (``reconnections`` is
            initialised but never incremented here). Printed in ``on_stop``.

    Every streamed response is closed explicitly once a task stops reading,
    so an endless SSE stream is not left open after an early ``break``.
    """

    wait_time = between(2, 5)

    def on_start(self):
        """Initialize advanced SSE tracking"""
        self.event_queue = Queue()
        self.connection_stats = {
            'connections': 0,
            'reconnections': 0,
            'events_received': 0,
            'connection_errors': 0
        }

    @staticmethod
    def _close_stream(response):
        """Close a streamed response if it has an underlying connection.

        When the request itself failed (connection refused, DNS error, ...)
        the response has no raw stream attached, and calling ``close()`` on
        it would raise, hence the guard.
        """
        if getattr(response, 'raw', None) is not None:
            response.close()

    @task(2)
    def test_multiple_sse_channels(self):
        """Test several SSE channels, one after another.

        Channels are opened sequentially within a single user, with a short
        pause between them; concurrency comes from running many users.
        """
        channels = ['notifications', 'updates', 'alerts']

        for channel in channels:
            self._test_channel(channel)
            time.sleep(0.5)  # Stagger connections

    def _test_channel(self, channel):
        """Test individual SSE channel.

        Connects to ``/api/events/<channel>``, counts ``data`` lines for
        about five seconds and updates ``connection_stats``.

        Args:
            channel: Channel name; used in the URL path and sent as the
                ``X-Channel`` header.
        """
        headers = {
            'Accept': 'text/event-stream',
            'X-Channel': channel
        }

        with self.client.get(f'/api/events/{channel}',
                             headers=headers,
                             stream=True,
                             catch_response=True) as response:
            try:
                if response.status_code != 200:
                    self.connection_stats['connection_errors'] += 1
                    response.failure(f"Channel {channel} failed: {response.status_code}")
                    return

                self.connection_stats['connections'] += 1

                # SSE streams are UTF-8 by specification; without this,
                # requests falls back to ISO-8859-1 for text/* content types.
                response.encoding = 'utf-8'

                # Process events for short duration. iter_lines() is not
                # reentrant-safe, so iterate it exactly once and check the
                # deadline inside the loop.
                # NOTE: the deadline is only evaluated when a line arrives.
                deadline = time.time() + 5
                try:
                    for line in response.iter_lines(decode_unicode=True):
                        if line and line.startswith('data:'):
                            self.connection_stats['events_received'] += 1
                        if time.time() >= deadline:
                            break
                except Exception as exc:
                    # Mid-stream failure: record it instead of silently
                    # reporting success. 'Exception' (not a bare except)
                    # lets GreenletExit / KeyboardInterrupt propagate so the
                    # user can still be stopped.
                    self.connection_stats['connection_errors'] += 1
                    response.failure(f"Channel {channel} stream error: {exc}")
                else:
                    response.success()
            finally:
                self._close_stream(response)

    @task(1)
    def test_sse_heartbeat(self):
        """Test SSE heartbeat and keep-alive.

        Reads the heartbeat stream for roughly 15 seconds and succeeds if at
        least one line containing "heartbeat" (case-insensitive) was seen.
        """
        headers = {
            'Accept': 'text/event-stream',
            'X-Heartbeat': 'true'
        }

        with self.client.get('/api/events/heartbeat',
                             headers=headers,
                             stream=True,
                             catch_response=True) as response:
            try:
                if response.status_code != 200:
                    response.failure("Heartbeat SSE connection failed")
                    return

                response.encoding = 'utf-8'

                heartbeat_count = 0
                start_time = time.time()

                # NOTE: the time limit is only evaluated when a line arrives.
                for line in response.iter_lines(decode_unicode=True):
                    if time.time() - start_time > 15:  # 15 second test
                        break

                    if line and 'heartbeat' in line.lower():
                        heartbeat_count += 1

                if heartbeat_count > 0:
                    response.success()
                else:
                    response.failure("No heartbeat events received")
            finally:
                self._close_stream(response)

    def on_stop(self):
        """Log connection statistics"""
        print(f"SSE Stats: {self.connection_stats}")

Key Testing Points

  1. Connection Establishment: Verify SSE endpoints accept proper headers
  2. Event Processing: Parse and validate event data format
  3. Reconnection Logic: Test automatic reconnection with Last-Event-ID
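  4. Multiple Channels: Test SSE connections across several channels (each simulated user opens them one after another; concurrency comes from running many users)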
  5. Error Handling: Validate behavior on connection failures
  6. Performance: Monitor connection overhead and event throughput

Common SSE Patterns

  • Real-time Notifications: User-specific event streams
  • Live Data Feeds: Stock prices, sports scores, sensor data
  • Chat Applications: Message broadcasting and presence
  • Progress Updates: Long-running task status updates
  • System Monitoring: Real-time metrics and alerts

This guide provides comprehensive SSE testing patterns for real-time applications with proper connection management and event handling. 

Ready to run your test?
Launch your Locust test at scale.