
Load Testing WebSocket Applications with LoadForge

Introduction

WebSocket applications power some of the most demanding real-time systems on the web: chat platforms, trading dashboards, multiplayer games, collaborative editors, live notifications, IoT telemetry streams, and support tooling. Unlike traditional request/response APIs, WebSockets keep long-lived connections open and allow bidirectional communication between clients and servers. That changes how you approach load testing, performance testing, and stress testing.

If you only test your HTTP endpoints, you may miss the real bottlenecks in your WebSocket stack: connection establishment, authentication handshakes, message fan-out, subscription management, heartbeat traffic, backpressure, and connection stability under scale. A system that looks healthy at 100 users making REST calls can fail quickly when 20,000 clients maintain persistent WebSocket sessions and exchange messages every few seconds.

In this guide, you’ll learn how to load test WebSocket applications with LoadForge using Locust-based Python scripts. We’ll cover connection scale, message throughput, authentication flows, subscriptions, and realistic real-time usage patterns. You’ll also see how LoadForge helps with distributed testing, real-time reporting, cloud-based infrastructure, CI/CD integration, and global test locations when validating WebSocket performance from multiple regions.

Prerequisites

Before you start load testing a WebSocket application with LoadForge, make sure you have:

  • A WebSocket endpoint, such as:
    • wss://api.example.com/ws/chat
    • wss://stream.example.com/v1/realtime
    • wss://app.example.com/socket
  • A clear understanding of your protocol:
    • Raw JSON messages
    • STOMP over WebSocket
    • Socket.IO-style semantics
    • GraphQL subscriptions over WebSocket
  • Test credentials or a token generation flow
  • Expected usage patterns:
    • Number of concurrent connections
    • Message frequency per user
    • Subscription count per connection
    • Expected server push volume
  • Success criteria, such as:
    • Connection success rate above 99.5%
    • Median subscription acknowledgment under 200 ms
    • Message delivery latency under 500 ms at 10,000 concurrent users
    • Stable memory and CPU during a 30-minute soak test

You should also know whether your application requires:

  • JWT authentication in query parameters or headers
  • Session cookies from a prior login request
  • Periodic ping/pong heartbeats
  • Channel or topic subscriptions after connect
  • Reconnection behavior on disconnect

Because LoadForge uses Locust, your scripts can combine standard HTTP login flows with custom WebSocket behavior in Python. That makes it practical to simulate realistic end-to-end user journeys.

Understanding WebSocket Under Load

WebSocket systems behave differently from stateless HTTP APIs under load. Instead of short-lived requests, each connected user consumes ongoing server resources:

  • Open TCP socket
  • Memory for session state
  • Authentication context
  • Subscription tracking
  • Queued outbound messages
  • Heartbeat handling

This means your bottlenecks often shift from request throughput to connection lifecycle management and message distribution.

Common WebSocket bottlenecks

Connection establishment

A sudden spike in users connecting at once can overwhelm:

  • TLS termination
  • Load balancers
  • Authentication services
  • Session stores
  • WebSocket upgrade handling

If your app performs token validation or database lookups during the handshake, connection latency can rise sharply.

Message fan-out

Many systems send one event to thousands of subscribers. For example:

  • A stock price update to all users watching NASDAQ:AAPL
  • A chat message to everyone in a room
  • A live sports score update to all connected clients

Fan-out can become CPU- and memory-intensive, especially if serialization happens per recipient.

Backpressure and slow consumers

Some clients consume messages slowly. If your server buffers too much per connection, memory usage can grow rapidly under load.

Heartbeats and idle traffic

Even “idle” WebSocket users often send ping frames or app-level heartbeat messages. At scale, heartbeat traffic becomes significant.
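A quick back-of-envelope calculation shows why. The interval and frame size below are illustrative assumptions; plug in your own numbers:

```python
# Back-of-envelope heartbeat volume for an otherwise idle fleet.
# Interval and frame size are assumptions -- substitute your own values.
connections = 20_000
heartbeat_interval_s = 30   # one app-level ping per client every 30 seconds
frame_bytes = 60            # assumed JSON payload plus WebSocket framing

pings_per_sec = connections / heartbeat_interval_s        # ~667 messages/s inbound
bandwidth_kbit = pings_per_sec * frame_bytes * 8 / 1000   # ~320 kbit/s, before pongs

print(f"{pings_per_sec:.0f} pings/s, {bandwidth_kbit:.0f} kbit/s")
```

And that is one direction only: if the server answers every ping with a pong, double it.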

Broadcast storms and subscription churn

Applications with frequent subscribe/unsubscribe activity can stress routing layers, Redis pub/sub backplanes, Kafka consumers, or in-memory channel maps.

A proper WebSocket load test should validate more than “can I connect?” It should measure:

  • Concurrent connection capacity
  • Connect and authenticate latency
  • Subscription acknowledgment time
  • Message send throughput
  • Server push latency
  • Disconnect and reconnect behavior
  • Stability over time

Writing Your First Load Test

Let’s start with a realistic but simple WebSocket load test. In this example, users authenticate with a bearer token in the query string, connect to a chat endpoint, subscribe to a room, send periodic messages, and track response timing.

This pattern is common in chat, support, and collaboration applications.

Basic WebSocket chat load test

python
from locust import User, task, between, events
import json
import random
import string
import time
from websocket import create_connection, WebSocketTimeoutException
 
class WebSocketChatUser(User):
    wait_time = between(2, 5)
 
    def on_start(self):
        self.user_id = f"user-{random.randint(10000, 99999)}"
        self.room_id = "support-room-42"
        self.token = f"demo-jwt-token-{self.user_id}"
        self.ws = None
        self.connect()
 
    def connect(self):
        start_time = time.time()
        try:
            self.ws = create_connection(
                f"wss://api.example.com/ws/chat?token={self.token}",
                timeout=5
            )
 
            connect_time = int((time.time() - start_time) * 1000)
            events.request.fire(
                request_type="WS",
                name="connect /ws/chat",
                response_time=connect_time,
                response_length=0,
                exception=None
            )
 
            subscribe_payload = {
                "action": "subscribe",
                "channel": f"room:{self.room_id}",
                "userId": self.user_id
            }
 
            sub_start = time.time()
            self.ws.send(json.dumps(subscribe_payload))
            response = self.ws.recv()
            sub_time = int((time.time() - sub_start) * 1000)
 
            data = json.loads(response)
            if data.get("type") == "subscription_confirmed":
                events.request.fire(
                    request_type="WS",
                    name="subscribe room",
                    response_time=sub_time,
                    response_length=len(response),
                    exception=None
                )
            else:
                raise Exception(f"Unexpected subscribe response: {response}")
 
        except Exception as e:
            events.request.fire(
                request_type="WS",
                name="connect /ws/chat",
                response_time=int((time.time() - start_time) * 1000),
                response_length=0,
                exception=e
            )
 
    @task
    def send_chat_message(self):
        if not self.ws:
            return
 
        message = {
            "action": "publish",
            "channel": f"room:{self.room_id}",
            "message": {
                "id": ''.join(random.choices(string.ascii_lowercase + string.digits, k=12)),
                "senderId": self.user_id,
                "text": random.choice([
                    "Can someone help me with my order?",
                    "I need an update on ticket #5821",
                    "Is the checkout issue resolved?",
                    "Thanks for the quick response!"
                ]),
                "sentAt": int(time.time() * 1000)
            }
        }
 
        start_time = time.time()
        try:
            self.ws.send(json.dumps(message))
            response = self.ws.recv()
            elapsed = int((time.time() - start_time) * 1000)
 
            data = json.loads(response)
            if data.get("type") in ["message_ack", "message_delivered"]:
                events.request.fire(
                    request_type="WS",
                    name="publish chat message",
                    response_time=elapsed,
                    response_length=len(response),
                    exception=None
                )
            else:
                raise Exception(f"Unexpected message response: {response}")
 
        except WebSocketTimeoutException as e:
            events.request.fire(
                request_type="WS",
                name="publish chat message",
                response_time=int((time.time() - start_time) * 1000),
                response_length=0,
                exception=e
            )
        except Exception as e:
            events.request.fire(
                request_type="WS",
                name="publish chat message",
                response_time=int((time.time() - start_time) * 1000),
                response_length=0,
                exception=e
            )
 
    def on_stop(self):
        if self.ws:
            try:
                self.ws.close()
            except Exception:
                pass

What this test covers

This first script simulates a common WebSocket workflow:

  1. Open a WebSocket connection to /ws/chat
  2. Authenticate using a token
  3. Subscribe to a room
  4. Send chat messages
  5. Measure connection and message performance

This is a good starting point for performance testing because it validates:

  • WebSocket upgrade success
  • Authentication overhead
  • Subscription handling
  • Message acknowledgment latency

In LoadForge, these custom events.request.fire() calls appear in your metrics just like HTTP requests, making it easy to track response times and failures in real time.
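As your scripts grow, the time-the-operation-then-fire pattern used in every task above can be factored into a small reusable helper. One possible sketch, assuming a Locust-style `events.request` hook:

```python
import time
from contextlib import contextmanager

class WSTiming:
    """Mutable holder the caller can update inside the `with` block."""
    def __init__(self):
        self.response_length = 0
        self.exception = None

@contextmanager
def timed_ws(events, name):
    """Time one WebSocket operation and report it through a Locust-style
    `events.request` hook (the same hook `locust.events` exposes).
    Exceptions raised inside the block are recorded and reported as
    failures rather than re-raised, mirroring the try/except pattern above."""
    timing = WSTiming()
    start = time.time()
    try:
        yield timing
    except Exception as exc:
        timing.exception = exc
    finally:
        events.request.fire(
            request_type="WS",
            name=name,
            response_time=int((time.time() - start) * 1000),
            response_length=timing.response_length,
            exception=timing.exception,
        )
```

A task body then shrinks to `with timed_ws(events, "publish chat message") as t:` around the send/recv pair, with `t.response_length = len(response)` set after `recv()`.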

Advanced Load Testing Scenarios

Basic connection tests are useful, but most production WebSocket systems have more complex behavior. Let’s look at realistic advanced scenarios developers often need to validate.

Scenario 1: Authenticate via HTTP, then connect to a real-time notifications stream

Many applications don’t pass tokens directly in the WebSocket URL. Instead, users log in via REST, receive a JWT, and then connect to a notifications stream.

This example simulates a user logging in, opening a WebSocket to /v1/realtime/notifications, subscribing to account and order events, and handling heartbeat traffic.

python
from locust import HttpUser, task, between, events
import json
import random
import time
from websocket import create_connection
 
class RealtimeNotificationsUser(HttpUser):
    wait_time = between(5, 10)
 
    def on_start(self):
        self.account_id = random.choice(["acct_102938", "acct_847261", "acct_564738"])
        self.ws = None
        self.access_token = self.login()
        if self.access_token:
            self.connect_and_subscribe()
 
    def login(self):
        payload = {
            "email": f"loadtest+{random.randint(1000,9999)}@example.com",
            "password": "TestPassword123!"
        }
 
        with self.client.post("/api/v1/auth/login", json=payload, catch_response=True, name="POST /api/v1/auth/login") as response:
            if response.status_code == 200:
                token = response.json().get("access_token")
                if token:
                    response.success()
                    return token
                response.failure("Missing access token in login response")
            else:
                response.failure(f"Login failed: {response.status_code}")
        return None
 
    def connect_and_subscribe(self):
        connect_start = time.time()
        try:
            self.ws = create_connection(
                "wss://stream.example.com/v1/realtime/notifications",
                header=[
                    f"Authorization: Bearer {self.access_token}",
                    "X-Client-Version: web-2.14.0"
                ],
                timeout=5
            )
 
            events.request.fire(
                request_type="WS",
                name="connect /v1/realtime/notifications",
                response_time=int((time.time() - connect_start) * 1000),
                response_length=0,
                exception=None
            )
 
            subscribe_message = {
                "type": "subscribe",
                "topics": [
                    f"account.{self.account_id}.notifications",
                    f"account.{self.account_id}.orders",
                    f"account.{self.account_id}.billing"
                ]
            }
 
            sub_start = time.time()
            self.ws.send(json.dumps(subscribe_message))
            response = self.ws.recv()
            elapsed = int((time.time() - sub_start) * 1000)
 
            data = json.loads(response)
            if data.get("type") == "subscribed":
                events.request.fire(
                    request_type="WS",
                    name="subscribe notifications topics",
                    response_time=elapsed,
                    response_length=len(response),
                    exception=None
                )
            else:
                raise Exception(f"Subscription failed: {response}")
 
        except Exception as e:
            events.request.fire(
                request_type="WS",
                name="connect /v1/realtime/notifications",
                response_time=int((time.time() - connect_start) * 1000),
                response_length=0,
                exception=e
            )
 
    @task(3)
    def send_heartbeat(self):
        if not self.ws:
            return
 
        payload = {
            "type": "ping",
            "ts": int(time.time() * 1000)
        }
 
        start = time.time()
        try:
            self.ws.send(json.dumps(payload))
            response = self.ws.recv()
            elapsed = int((time.time() - start) * 1000)
 
            data = json.loads(response)
            if data.get("type") == "pong":
                events.request.fire(
                    request_type="WS",
                    name="heartbeat ping/pong",
                    response_time=elapsed,
                    response_length=len(response),
                    exception=None
                )
            else:
                raise Exception(f"Unexpected heartbeat response: {response}")
        except Exception as e:
            events.request.fire(
                request_type="WS",
                name="heartbeat ping/pong",
                response_time=int((time.time() - start) * 1000),
                response_length=0,
                exception=e
            )
 
    @task(1)
    def wait_for_server_event(self):
        if not self.ws:
            return
 
        start = time.time()
        try:
            self.ws.settimeout(10)
            response = self.ws.recv()
            elapsed = int((time.time() - start) * 1000)
 
            data = json.loads(response)
            if data.get("type") in ["notification", "order_update", "invoice_ready"]:
                events.request.fire(
                    request_type="WS",
                    name="receive server event",
                    response_time=elapsed,
                    response_length=len(response),
                    exception=None
                )
            else:
                events.request.fire(
                    request_type="WS",
                    name="receive server event",
                    response_time=elapsed,
                    response_length=len(response),
                    exception=Exception(f"Unexpected event: {response}")
                )
        except Exception as e:
            events.request.fire(
                request_type="WS",
                name="receive server event",
                response_time=int((time.time() - start) * 1000),
                response_length=0,
                exception=e
            )
 
    def on_stop(self):
        if self.ws:
            try:
                self.ws.close()
            except Exception:
                pass

This is a strong example of real-world WebSocket load testing because it combines:

  • HTTP authentication
  • Authenticated WebSocket connection
  • Multi-topic subscriptions
  • Heartbeat validation
  • Passive server event consumption

Scenario 2: Load test a market data stream with high-frequency subscriptions

Real-time dashboards and trading systems are especially sensitive to message throughput and connection stability. In this scenario, users subscribe to several symbols and process quote updates.

python
from locust import User, task, between, events
import json
import random
import time
from websocket import create_connection
 
class MarketDataUser(User):
    wait_time = between(1, 2)
 
    SYMBOLS = [
        "NASDAQ:AAPL", "NASDAQ:MSFT", "NASDAQ:NVDA",
        "NYSE:TSLA", "NYSE:IBM", "NASDAQ:AMZN"
    ]
 
    def on_start(self):
        self.client_id = f"terminal-{random.randint(100000, 999999)}"
        self.ws = None
        self.subscribed_symbols = random.sample(self.SYMBOLS, 3)
        self.connect()
 
    def connect(self):
        start = time.time()
        try:
            self.ws = create_connection(
                f"wss://md.example.com/ws/quotes?clientId={self.client_id}&format=json",
                timeout=5
            )
 
            events.request.fire(
                request_type="WS",
                name="connect /ws/quotes",
                response_time=int((time.time() - start) * 1000),
                response_length=0,
                exception=None
            )
 
            subscribe = {
                "op": "subscribe",
                "streams": [f"quotes.{symbol}" for symbol in self.subscribed_symbols],
                "snapshot": True
            }
 
            sub_start = time.time()
            self.ws.send(json.dumps(subscribe))
            response = self.ws.recv()
            elapsed = int((time.time() - sub_start) * 1000)
 
            data = json.loads(response)
            if data.get("op") == "subscribed":
                events.request.fire(
                    request_type="WS",
                    name="subscribe market streams",
                    response_time=elapsed,
                    response_length=len(response),
                    exception=None
                )
            else:
                raise Exception(f"Subscription failed: {response}")
 
        except Exception as e:
            events.request.fire(
                request_type="WS",
                name="connect /ws/quotes",
                response_time=int((time.time() - start) * 1000),
                response_length=0,
                exception=e
            )
 
    @task
    def receive_quotes(self):
        if not self.ws:
            return
 
        start = time.time()
        try:
            self.ws.settimeout(3)
            response = self.ws.recv()
            elapsed = int((time.time() - start) * 1000)
            data = json.loads(response)
 
            if data.get("type") == "quote":
                events.request.fire(
                    request_type="WS",
                    name="receive quote update",
                    response_time=elapsed,
                    response_length=len(response),
                    exception=None
                )
            elif data.get("type") == "snapshot":
                events.request.fire(
                    request_type="WS",
                    name="receive quote snapshot",
                    response_time=elapsed,
                    response_length=len(response),
                    exception=None
                )
            else:
                raise Exception(f"Unexpected market data payload: {response}")
 
        except Exception as e:
            events.request.fire(
                request_type="WS",
                name="receive quote update",
                response_time=int((time.time() - start) * 1000),
                response_length=0,
                exception=e
            )
 
    def on_stop(self):
        if self.ws:
            try:
                self.ws.close()
            except Exception:
                pass

This kind of stress testing is useful for validating:

  • High-frequency server push delivery
  • Subscription routing efficiency
  • Latency under message bursts
  • Stability of long-lived connections

Scenario 3: Test reconnect behavior and session recovery

A WebSocket application can appear healthy until network interruptions occur. Reconnection logic is especially important for mobile apps, dashboards, and collaboration tools.

python
from locust import User, task, between, events
import json
import random
import time
from websocket import create_connection
 
class ReconnectingCollaborationUser(User):
    wait_time = between(3, 6)
 
    def on_start(self):
        self.document_id = random.choice(["doc_5012", "doc_7821", "doc_9944"])
        self.session_token = f"collab-session-{random.randint(100000,999999)}"
        self.ws = None
        self.connect()
 
    def connect(self):
        start = time.time()
        try:
            self.ws = create_connection(
                f"wss://app.example.com/ws/collaboration?session={self.session_token}",
                timeout=5
            )
 
            join_payload = {
                "action": "join_document",
                "documentId": self.document_id,
                "presence": {
                    "cursorColor": random.choice(["blue", "green", "purple"]),
                    "displayName": f"Load User {random.randint(1, 999)}"
                }
            }
 
            self.ws.send(json.dumps(join_payload))
            response = self.ws.recv()
            elapsed = int((time.time() - start) * 1000)
 
            data = json.loads(response)
            if data.get("type") == "document_joined":
                events.request.fire(
                    request_type="WS",
                    name="connect and join document",
                    response_time=elapsed,
                    response_length=len(response),
                    exception=None
                )
            else:
                raise Exception(f"Join failed: {response}")
 
        except Exception as e:
            events.request.fire(
                request_type="WS",
                name="connect and join document",
                response_time=int((time.time() - start) * 1000),
                response_length=0,
                exception=e
            )
 
    @task(4)
    def send_edit_operation(self):
        if not self.ws:
            return
 
        operation = {
            "action": "apply_operation",
            "documentId": self.document_id,
            "operation": {
                "type": "insert_text",
                "position": random.randint(0, 500),
                "text": random.choice(["hello ", "world ", "update ", "test "])
            },
            "clientTimestamp": int(time.time() * 1000)
        }
 
        start = time.time()
        try:
            self.ws.send(json.dumps(operation))
            response = self.ws.recv()
            elapsed = int((time.time() - start) * 1000)
 
            data = json.loads(response)
            if data.get("type") == "operation_applied":
                events.request.fire(
                    request_type="WS",
                    name="apply document operation",
                    response_time=elapsed,
                    response_length=len(response),
                    exception=None
                )
            else:
                raise Exception(f"Operation failed: {response}")
        except Exception as e:
            events.request.fire(
                request_type="WS",
                name="apply document operation",
                response_time=int((time.time() - start) * 1000),
                response_length=0,
                exception=e
            )
 
    @task(1)
    def simulate_reconnect(self):
        if self.ws:
            self.ws.close()
            self.ws = None
 
        reconnect_start = time.time()
        try:
            self.ws = create_connection(
                f"wss://app.example.com/ws/collaboration?session={self.session_token}",
                timeout=5
            )
 
            resume_payload = {
                "action": "resume_session",
                "documentId": self.document_id,
                "lastSequence": random.randint(1000, 5000)
            }
 
            self.ws.send(json.dumps(resume_payload))
            response = self.ws.recv()
            elapsed = int((time.time() - reconnect_start) * 1000)
 
            data = json.loads(response)
            if data.get("type") == "session_resumed":
                events.request.fire(
                    request_type="WS",
                    name="reconnect and resume session",
                    response_time=elapsed,
                    response_length=len(response),
                    exception=None
                )
            else:
                raise Exception(f"Session resume failed: {response}")
 
        except Exception as e:
            events.request.fire(
                request_type="WS",
                name="reconnect and resume session",
                response_time=int((time.time() - reconnect_start) * 1000),
                response_length=0,
                exception=e
            )
 
    def on_stop(self):
        if self.ws:
            try:
                self.ws.close()
            except Exception:
                pass

This scenario helps uncover issues with:

  • Reconnection storms
  • Session recovery logic
  • Missed event replay
  • Load balancer stickiness problems
  • Stateful backend synchronization

Analyzing Your Results

After running your WebSocket load test in LoadForge, focus on metrics that reflect real-time behavior rather than just traditional request counts.

Key metrics to review

Connection success rate

If connection failures rise as user count increases, you may have issues in:

  • WebSocket upgrade handling
  • TLS termination
  • Authentication dependencies
  • Load balancer connection limits

Connect latency

Track how long it takes to establish a WebSocket session. Rising connect times often indicate pressure on:

  • Auth services
  • Session stores
  • Reverse proxies
  • Connection pools

Subscription acknowledgment time

For pub/sub systems, measure how quickly the server confirms topic or room subscriptions. Slow acknowledgments may reveal bottlenecks in routing or broker registration.

Message latency

Look at the response time for:

  • Message publish acknowledgments
  • Heartbeats
  • Server event delivery
  • Reconnect flows

High p95 or p99 latency often matters more than averages in WebSocket performance testing.
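To see why, consider a batch of illustrative publish-acknowledgment timings where one slow outlier hides behind a healthy-looking average:

```python
import math
import statistics

# Illustrative publish-ack latencies (ms): mostly fast, one slow outlier.
latencies_ms = [42, 43, 44, 44, 45, 46, 47, 48, 52, 950]

mean_ms = statistics.mean(latencies_ms)  # 136.1 -- looks tolerable on a dashboard
# Nearest-rank p95: the value 95% of samples fall at or below.
p95_ms = sorted(latencies_ms)[math.ceil(0.95 * len(latencies_ms)) - 1]  # 950

print(mean_ms, p95_ms)  # the tail latency is what users actually feel
```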

Error patterns

Inspect errors by type:

  • Timeouts
  • Authentication failures
  • Unexpected message formats
  • Connection resets
  • Closed socket exceptions

These patterns tell you whether the issue is protocol, infrastructure, or application logic.

Use LoadForge reporting effectively

LoadForge’s real-time reporting helps you monitor your WebSocket test as it runs, which is especially useful for long soak tests and ramp-up scenarios. With distributed testing and global test locations, you can also see whether users in different regions experience different connection or message latencies.

For team workflows, CI/CD integration lets you run repeatable WebSocket load testing as part of deployment validation, helping prevent regressions in real-time systems.

Performance Optimization Tips

When load testing reveals bottlenecks in your WebSocket application, these optimizations often help:

Reduce handshake overhead

If authentication during connect is expensive, consider:

  • Caching token validation results
  • Using stateless JWT verification
  • Avoiding database reads on every connection
  • Offloading auth to a dedicated gateway layer

Tune connection infrastructure

Review limits and settings for:

  • NGINX or Envoy WebSocket proxying
  • Idle timeout values
  • File descriptor limits
  • Maximum concurrent connections per node
  • Keepalive and TCP tuning

Optimize fan-out

For high-volume publish/subscribe systems:

  • Use efficient broker-backed routing
  • Avoid per-recipient serialization when possible
  • Batch outbound messages where appropriate
  • Partition hot channels or topics
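The per-recipient serialization point is worth a concrete sketch. Here `ws.send` stands in for whatever send interface your server framework exposes; the idea is simply to encode once and reuse the bytes:

```python
import json

def broadcast(subscribers, event):
    """Fan one event out to many connections. Serializing once up front
    avoids repeating identical JSON-encoding CPU work per recipient --
    at 10,000 subscribers that is one dumps() call instead of 10,000."""
    frame = json.dumps(event).encode("utf-8")  # encode once
    for ws in subscribers:
        ws.send(frame)                         # reuse the same bytes for everyone
```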

Control heartbeat frequency

Heartbeats that are too frequent waste resources. Heartbeats that are too slow delay failure detection. Find the right balance for your traffic profile.

Handle slow consumers safely

Protect the system from memory pressure by:

  • Enforcing outbound buffer limits
  • Dropping stale clients
  • Applying backpressure controls
  • Compressing large payloads carefully
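One way to enforce an outbound buffer limit is a capped per-connection queue. A minimal sketch, assuming you track drops to decide when to disconnect a persistently slow client:

```python
from collections import deque

class BoundedOutbox:
    """Per-connection outbound queue with a hard cap. A deque with maxlen
    silently evicts the oldest entry when full, so a slow consumer costs
    bounded memory; the drop counter tells you when to cut the client off."""
    def __init__(self, max_messages=1000):
        self._queue = deque(maxlen=max_messages)
        self.dropped = 0

    def enqueue(self, message):
        if len(self._queue) == self._queue.maxlen:
            self.dropped += 1        # oldest message is about to be evicted
        self._queue.append(message)

    def drain(self):
        while self._queue:
            yield self._queue.popleft()
```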

Minimize payload size

Large JSON messages increase CPU, bandwidth, and latency. Remove unnecessary fields and consider compact schemas for high-frequency streams.
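For example, the same quote event can travel in a verbose self-describing schema or a compact positional one (the field names and values here are made up for illustration; the positional layout must be agreed between client and server out of band):

```python
import json

# One quote event, two encodings.
verbose = {
    "messageType": "quote_update",
    "exchangeSymbol": "NASDAQ:AAPL",
    "bidPrice": 189.41,
    "askPrice": 189.43,
    "serverTimestampMillis": 1700000000000,
}
compact = ["q", "AAPL", 189.41, 189.43, 1700000000000]  # schema agreed out of band

verbose_len = len(json.dumps(verbose))
compact_len = len(json.dumps(compact))
print(verbose_len, compact_len)  # the compact form is a fraction of the size
```

At tens of thousands of messages per second, that size difference compounds into real CPU and bandwidth savings.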

Common Pitfalls to Avoid

WebSocket load testing is easy to get wrong if you treat it like standard HTTP testing.

Testing only connection setup

A test that only opens sockets does not validate message throughput, fan-out, or long-lived connection stability. Always include realistic message behavior.

Ignoring server push traffic

Many WebSocket systems are mostly server-driven. If your test only sends messages but never consumes them, you may miss the real bottleneck.

Unrealistic user pacing

Real users do not all send messages every second forever. Model realistic behavior:

  • Some users are mostly idle
  • Some subscribe to many channels
  • Some reconnect occasionally
  • Some only receive events
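One simple way to model such a mix is to assign each spawned virtual user a weighted behavior profile. The profile names and weights below are assumptions to tune against your production analytics:

```python
import random

# Assumed behavior mix -- adjust weights to match real traffic.
PROFILES = {
    "idle_watcher": 6,      # connected, mostly just receives events
    "active_chatter": 3,    # sends a few messages per minute
    "heavy_subscriber": 1,  # many channels, frequent publishes and reconnects
}

def pick_profile(rng=random):
    """Choose a behavior profile for a newly spawned virtual user."""
    names = list(PROFILES)
    return rng.choices(names, weights=[PROFILES[n] for n in names], k=1)[0]
```

In Locust itself, the same effect is more commonly achieved by declaring one `User` subclass per profile and setting each class's `weight` attribute.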

Forgetting heartbeats

If production clients send ping/pong or app-level keepalives, your load test should too. Otherwise, connection behavior may differ from real usage.

Not measuring reconnect behavior

Temporary disconnects are normal in real-world networks. If reconnect logic is critical, test it explicitly.

Overlooking infrastructure limits

Sometimes the application is fine, but:

  • Load balancers cap concurrent upgraded connections
  • Firewall idle timeouts kill sessions
  • Containers run out of file descriptors
  • NAT gateways exhaust ephemeral ports

Running from a single location only

Real-time performance can vary by geography. LoadForge’s cloud-based infrastructure and global test locations help you validate latency and stability from the regions your users actually occupy.

Conclusion

WebSocket applications require a different approach to load testing than traditional APIs. To validate real-time performance, you need to test more than request throughput—you need to measure concurrent connection capacity, subscription handling, message latency, heartbeat behavior, and reconnect resilience.

Using LoadForge with Locust-based Python scripts gives you the flexibility to model realistic WebSocket workloads, from chat systems and notification streams to market data feeds and collaborative applications. With distributed testing, real-time reporting, CI/CD integration, and global cloud infrastructure, LoadForge makes it much easier to performance test and stress test WebSocket systems at production scale.

If you’re ready to uncover bottlenecks in your real-time architecture, try LoadForge and start building WebSocket load tests that reflect how your users actually connect, subscribe, and communicate.
