Server-Sent Events (SSE) Testing

Test Server-Sent Events streams without external dependencies using manual parsing

LoadForge can record your browser, graphically build tests, scan your site with a wizard and more. Sign up now to run your first test.

Sign up now


This guide demonstrates how to test Server-Sent Events (SSE) endpoints with LoadForge using manual SSE parsing without external dependencies.

Use Cases

  • Testing SSE event streams and real-time updates
  • Validating event data format and structure
  • Load testing streaming endpoints
  • Testing SSE reconnection and error handling
  • Monitoring event delivery performance

Complete SSE Testing

from locust import HttpUser, task, between
import time
import json
import re

class SSEUser(HttpUser):
    """Locust user that load-tests Server-Sent Events (SSE) endpoints.

    Streams are opened with ``stream=True`` and parsed by hand, so no SSE
    client library is required. Each user keeps running totals of events
    received and connection errors, plus the last event ID it saw so that
    later connections can send a ``Last-Event-ID`` header.
    """

    wait_time = between(1, 3)

    def on_start(self):
        """Initialize the per-user counters and reconnection state."""
        self.events_received = 0
        self.connection_errors = 0
        self.last_event_id = None

    def _sse_headers(self, resume=False):
        """Build the request headers for an SSE connection.

        Args:
            resume: When true and an event ID has been recorded, add a
                ``Last-Event-ID`` header so the server can resume from it.

        Returns:
            A new dict of HTTP headers.
        """
        headers = {
            'Accept': 'text/event-stream',
            'Cache-Control': 'no-cache',
        }
        if resume and self.last_event_id:
            headers['Last-Event-ID'] = self.last_event_id
        return headers

    @staticmethod
    def _close_response(response):
        """Release the connection held by a streamed response.

        A response that never received a body stream (``raw`` is None, e.g.
        when the connection itself failed) has nothing to close, and calling
        ``close()`` on it can raise, so it is skipped.
        """
        if response is not None and getattr(response, 'raw', None) is not None:
            response.close()

    @staticmethod
    def _normalize_newlines(text):
        """Convert CRLF and lone CR line endings in *text* to LF.

        A trailing CR is left untouched because it may be the first half of
        a CRLF pair whose LF arrives in the next chunk.
        """
        held = ''
        if text.endswith('\r'):
            text, held = text[:-1], '\r'
        return text.replace('\r\n', '\n').replace('\r', '\n') + held

    @task(3)
    def test_sse_stream(self):
        """Test SSE stream with manual parsing"""
        response = None
        try:
            response = self.client.get('/api/events/stream',
                                       headers=self._sse_headers(resume=True),
                                       stream=True,
                                       name="SSE Stream Connection")

            if response.status_code == 200:
                self._process_sse_stream(response)
            else:
                print(f"SSE connection failed: {response.status_code}")
                self.connection_errors += 1

        except Exception as e:
            print(f"SSE stream error: {str(e)}")
            self.connection_errors += 1
        finally:
            # Always give the connection back, even when we stop reading early.
            self._close_response(response)

    def _process_sse_stream(self, response, max_events=5):
        """Read events from an open SSE response and dispatch them.

        Args:
            response: A streamed HTTP response whose body is an event stream.
            max_events: Stop after this many events have been handled, so a
                single connection does not run forever during a load test.

        Errors raised while reading or handling events are printed, not
        propagated.
        """
        # Event streams are always UTF-8. Without this, requests falls back to
        # its own guess (ISO-8859-1 for text/* without a charset), or yields
        # undecoded bytes when no encoding can be determined.
        response.encoding = 'utf-8'

        buffer = ""
        events_processed = 0

        try:
            for chunk in response.iter_content(chunk_size=1024, decode_unicode=True):
                if not chunk:
                    continue

                # Normalizing line endings lets the blank-line test below work
                # for LF, CRLF and CR terminated streams alike.
                buffer = self._normalize_newlines(buffer + chunk)

                # A blank line terminates an event; anything after the last
                # blank line is an incomplete event and stays in the buffer.
                while '\n\n' in buffer:
                    event_data, buffer = buffer.split('\n\n', 1)
                    event = self._parse_sse_event(event_data)

                    if event:
                        self._handle_sse_event(event)
                        events_processed += 1

                        # Limit events for load testing
                        if events_processed >= max_events:
                            return

        except Exception as e:
            print(f"Error processing SSE stream: {str(e)}")

    def _parse_sse_event(self, event_data):
        """Parse one raw SSE event block into a dict.

        Args:
            event_data: The text of a single event (the lines between two
                blank lines).

        Returns:
            A dict with the keys ``event``, ``data``, ``id`` and ``retry``,
            or None when the block carries neither data nor an event name
            (for example a comment-only keep-alive).
        """
        event = {
            'event': None,
            'data': '',
            'id': None,
            'retry': None
        }
        data_lines = []

        normalized = event_data.replace('\r\n', '\n').replace('\r', '\n')
        for line in normalized.split('\n'):
            if not line or line.startswith(':'):
                continue  # Skip empty lines and comments

            # A line without a colon is a field name with an empty value.
            field, _, value = line.partition(':')
            # Only a single leading space is a separator; any other
            # whitespace is part of the value and must be preserved.
            if value.startswith(' '):
                value = value[1:]

            if field == 'data':
                data_lines.append(value)
            elif field in ('event', 'id', 'retry'):
                event[field] = value

        # Multiple data lines are joined with newlines, including empty ones.
        event['data'] = '\n'.join(data_lines)

        return event if event['data'] or event['event'] else None

    def _handle_sse_event(self, event):
        """Count a parsed event, remember its ID, and dispatch it by type."""
        self.events_received += 1

        # Store last event ID for reconnection
        if event['id']:
            self.last_event_id = event['id']

        # Events without an explicit type use the default type "message".
        event_type = event['event'] or 'message'

        handlers = {
            'heartbeat': self._handle_heartbeat_event,
            'data': self._handle_data_event,
            'notification': self._handle_notification_event,
        }
        handlers.get(event_type, self._handle_generic_event)(event)

    def _handle_heartbeat_event(self, event):
        """Handle heartbeat events"""
        print(f"Heartbeat received: {event['data']}")

    def _handle_data_event(self, event):
        """Handle data events, reporting the JSON ``type`` field when present."""
        size = len(event['data'])
        try:
            data = json.loads(event['data'])
        except json.JSONDecodeError:
            print(f"Non-JSON data event: {size} bytes")
            return

        # Valid JSON is not necessarily an object (it may be a list, number...).
        kind = data.get('type', 'unknown') if isinstance(data, dict) else 'unknown'
        print(f"Data event: {kind} - {size} bytes")

    def _handle_notification_event(self, event):
        """Handle notification events, which are expected to be JSON objects."""
        try:
            notification = json.loads(event['data'])
        except json.JSONDecodeError:
            notification = None

        if isinstance(notification, dict):
            print(f"Notification: {notification.get('message', 'No message')}")
        else:
            print("Invalid notification format")

    def _handle_generic_event(self, event):
        """Handle events of any other type by printing a short preview."""
        event_type = event['event'] or 'message'
        print(f"Event '{event_type}': {event['data'][:100]}...")

    @task(1)
    def test_sse_with_parameters(self):
        """Test SSE with query parameters"""
        params = {
            'channel': 'updates',
            'filter': 'important',
            'format': 'json'
        }

        response = None
        try:
            response = self.client.get('/api/events/filtered',
                                       params=params,
                                       headers=self._sse_headers(),
                                       stream=True,
                                       name="SSE Filtered Stream")

            if response.status_code == 200:
                self._process_sse_stream(response)
            else:
                print(f"Filtered SSE failed: {response.status_code}")
                self.connection_errors += 1

        except Exception as e:
            print(f"Filtered SSE error: {str(e)}")
            self.connection_errors += 1
        finally:
            self._close_response(response)

    @task(1)
    def test_sse_reconnection(self):
        """Test SSE reconnection behavior"""
        # stream=True is essential here: without it the client would try to
        # read the whole body, which never ends for a live event stream.
        response = self.client.get('/api/events/stream',
                                   headers=self._sse_headers(resume=True),
                                   stream=True,
                                   name="SSE Reconnection Test")
        try:
            if response.status_code == 200:
                print(f"SSE reconnection successful with ID: {self.last_event_id}")
            else:
                print(f"SSE reconnection failed: {response.status_code}")
        finally:
            self._close_response(response)

    def on_stop(self):
        """Report SSE testing statistics"""
        print("\n=== SSE Testing Complete ===")
        print(f"Events received: {self.events_received}")
        print(f"Connection errors: {self.connection_errors}")
        print(f"Last event ID: {self.last_event_id}")

Advanced SSE Testing

from locust import HttpUser, task, between
import time
import json

class AdvancedSSEUser(HttpUser):
    """Locust user covering multi-channel SSE reads and error scenarios."""

    wait_time = between(2, 5)

    @staticmethod
    def _close_response(response):
        """Release the connection held by a streamed response.

        A response that never received a body stream (``raw`` is None, e.g.
        when the connection itself failed) has nothing to close, and calling
        ``close()`` on it can raise, so it is skipped.
        """
        if response is not None and getattr(response, 'raw', None) is not None:
            response.close()

    @staticmethod
    def _normalize_newlines(text):
        """Convert CRLF and lone CR line endings in *text* to LF.

        A trailing CR is left untouched because it may be the first half of
        a CRLF pair whose LF arrives in the next chunk.
        """
        held = ''
        if text.endswith('\r'):
            text, held = text[:-1], '\r'
        return text.replace('\r\n', '\n').replace('\r', '\n') + held

    @task(2)
    def test_multiple_sse_channels(self):
        """Test several SSE channels, one after another."""
        channels = ['notifications', 'updates', 'alerts']
        headers = {
            'Accept': 'text/event-stream',
            'Cache-Control': 'no-cache',
        }

        for channel in channels:
            response = self.client.get(f'/api/events/{channel}',
                                       headers=headers,
                                       stream=True,
                                       name=f"SSE Channel: {channel}")
            try:
                if response.status_code == 200:
                    # Process a few events from each channel
                    self._process_limited_stream(response, max_events=2)
            finally:
                # A streamed response keeps its connection until closed.
                self._close_response(response)

    def _process_limited_stream(self, response, max_events=3):
        """Count events on an open SSE response, stopping at *max_events*.

        Args:
            response: A streamed HTTP response whose body is an event stream.
            max_events: Number of events to read before returning.

        Blocks made up only of comment lines (such as keep-alives) are not
        counted. Errors raised while reading are printed, not propagated.
        """
        if max_events <= 0:
            return  # Nothing to read; avoid blocking on the first chunk.

        # Event streams are always UTF-8. Without this, requests falls back to
        # its own guess (ISO-8859-1 for text/* without a charset), or yields
        # undecoded bytes when no encoding can be determined.
        response.encoding = 'utf-8'

        buffer = ""
        events_count = 0

        try:
            for chunk in response.iter_content(chunk_size=512, decode_unicode=True):
                if not chunk:
                    continue

                # Normalizing line endings lets the blank-line test below work
                # for LF, CRLF and CR terminated streams alike.
                buffer = self._normalize_newlines(buffer + chunk)

                while '\n\n' in buffer and events_count < max_events:
                    event_data, buffer = buffer.split('\n\n', 1)
                    # Lines starting with ':' are comments, not event fields.
                    if any(line and not line.startswith(':')
                           for line in event_data.split('\n')):
                        events_count += 1
                        print(f"Received event {events_count}")

                if events_count >= max_events:
                    break

        except Exception as e:
            print(f"Stream processing error: {str(e)}")

    def _fetch_status(self, url, name, headers=None):
        """Issue a streamed GET and return only its status code.

        The request is streamed so that an endpoint which answers with a
        never-ending event stream cannot block the user while the body is
        read; the connection is closed as soon as the status is known.
        """
        response = self.client.get(url, headers=headers, stream=True, name=name)
        try:
            return response.status_code
        finally:
            self._close_response(response)

    @task(1)
    def test_sse_error_handling(self):
        """Test SSE error scenarios"""
        # Test invalid endpoint
        status = self._fetch_status('/api/events/nonexistent',
                                    name="SSE Invalid Endpoint",
                                    headers={'Accept': 'text/event-stream'})

        if status == 404:
            print("SSE 404 handling works correctly")

        # Test without proper headers
        status = self._fetch_status('/api/events/stream',
                                    name="SSE Without Headers")

        if status != 200:
            print(f"SSE requires proper headers: {status}")

Key Features

  1. Manual SSE Parsing: No SSE client library is required; events are parsed using only the Python standard library
  2. Event Type Handling: Supports different SSE event types
  3. Reconnection Support: Implements Last-Event-ID for reconnection
  4. Error Handling: Robust error handling for connection issues
  5. Load Testing Optimized: Limits events per connection for performance
  6. Multiple Channels: Support for testing different SSE channels

This guide shows how to test SSE endpoints comprehensively using only LoadForge-supported packages, relying on manual event parsing to validate streaming endpoints reliably.

Ready to run your test?
Launch your Locust test at scale.