
Load Testing LLM APIs with Streaming and Non-Streaming Requests

Introduction

Load testing LLM APIs with streaming and non-streaming requests is no longer optional for teams building AI-powered products. If your application depends on large language model responses for chat, summarization, classification, retrieval-augmented generation, or agent workflows, user experience is directly tied to API performance under load.

Unlike traditional REST APIs, LLM endpoints behave differently depending on whether you request a full response at once or stream tokens back incrementally. Non-streaming requests often optimize for simpler client handling and straightforward metrics like total response time. Streaming requests, on the other hand, can dramatically improve perceived responsiveness by delivering the first token quickly, even if total generation time remains high.

That difference matters during load testing and performance testing. A system that looks healthy under low traffic can degrade quickly when many users request long completions simultaneously. Streaming can hide some latency from end users, but it also introduces new bottlenecks: connection duration, token delivery jitter, server-side buffering, and concurrency pressure on gateways and inference workers.

In this guide, you’ll learn how to load test LLM APIs with both streaming and non-streaming requests using Locust on LoadForge. We’ll cover realistic Python scripts, authentication patterns, prompt payloads, chat-completions style endpoints, and how to compare latency, throughput, and UX-related metrics. We’ll also show where LoadForge helps, including distributed testing, real-time reporting, cloud-based infrastructure, global test locations, and CI/CD integration.

Prerequisites

Before you start load testing LLM APIs, make sure you have:

  • A LoadForge account
  • Access to the LLM API you want to test
  • An API key or bearer token
  • Knowledge of the model endpoint and request schema
  • Permission to generate test traffic against the environment
  • A clear goal, such as:
    • Compare streaming vs non-streaming latency
    • Measure first-token response time
    • Stress test concurrency limits
    • Validate throughput for production-like prompt sizes

You should also know a few LLM-specific variables that affect performance testing:

  • Model name, such as gpt-4o-mini, a Claude or Llama model, or an internal hosted model
  • Prompt length
  • Maximum output tokens
  • Temperature and sampling settings
  • Whether retrieval, tools, or function calling are enabled
  • Whether the endpoint supports server-sent events or chunked streaming

For the examples below, we’ll use realistic chat completion patterns similar to common AI APIs:

  • POST /v1/chat/completions for non-streaming
  • POST /v1/chat/completions with "stream": true for streaming
  • POST /v1/embeddings for a supporting AI workload
  • POST /v1/responses-style orchestration patterns where relevant

Understanding LLM APIs Under Load

LLM APIs behave very differently from conventional CRUD APIs under load. The bottlenecks are often not just web-server related. They can appear across several layers:

Inference latency

The model itself is usually the biggest factor. Larger prompts and longer outputs increase compute time. Under concurrency, GPU workers or inference pods can become saturated, causing queueing delays before generation even starts.

Time to first token vs total completion time

For streaming APIs, the user experience depends heavily on time to first token. A response that starts in 400 ms and finishes in 12 seconds can feel faster than a non-streaming response that arrives fully formed in 8 seconds.

This is why load testing streaming APIs requires more than measuring a single end-to-end response time.
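To make the two milestones concrete, here is a minimal sketch that derives time to first token and total duration from chunk arrival timestamps. The `StreamTiming` class and `summarize_stream` function are illustrative names, not part of Locust or any LLM SDK, and the sample timestamps simply mirror the 400 ms and 12 second example above.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class StreamTiming:
    ttft_ms: Optional[float]  # time to first token; None if no chunk arrived
    total_ms: float           # time until the stream ended


def summarize_stream(request_sent: float, chunk_times: List[float], stream_end: float) -> StreamTiming:
    """Derive both milestones from timestamps given in seconds."""
    ttft = (chunk_times[0] - request_sent) * 1000 if chunk_times else None
    total = (stream_end - request_sent) * 1000
    return StreamTiming(ttft_ms=ttft, total_ms=total)


# First chunk at 0.4 s, stream finished at 12 s (illustrative values).
timing = summarize_stream(0.0, [0.4, 0.9, 5.0, 11.8], 12.0)
print(timing)
```

Reporting both numbers per request, rather than one combined response time, is what lets you see whether load hurts the start of a response, the tail of it, or both.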

Connection duration

Streaming requests keep connections open longer. That affects:

  • Reverse proxies
  • Load balancers
  • API gateways
  • Worker concurrency
  • Client-side connection pools

A backend may handle 500 short non-streaming requests per minute yet fail under 150 simultaneous long-lived streaming sessions, because each open stream holds a connection (and often a worker slot) for the full generation time.
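A rough way to reason about this is Little’s Law: average open connections equal arrival rate multiplied by average duration. The sketch below applies it with assumed durations (2 seconds for non-streaming, 18 seconds for streaming); those durations are illustrative, not measurements.

```python
def concurrent_connections(requests_per_minute: float, avg_duration_s: float) -> float:
    """Little's Law: average open connections = arrival rate x average duration."""
    if requests_per_minute < 0 or avg_duration_s < 0:
        raise ValueError("inputs must be non-negative")
    return (requests_per_minute / 60.0) * avg_duration_s


# Same arrival rate, different connection lifetimes (assumed durations).
non_streaming = concurrent_connections(500, 2.0)
streaming = concurrent_connections(500, 18.0)
print(f"non-streaming: ~{non_streaming:.0f} open connections")
print(f"streaming:     ~{streaming:.0f} open connections")
```

Under these assumptions the same 500 requests per minute hold roughly 17 connections open when responses are short and roughly 150 when each one streams for 18 seconds.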

Token throughput

LLM performance is often constrained by tokens per second rather than requests per second. Two tests with the same RPS can have radically different results depending on prompt size and max token settings.
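The arithmetic behind that claim can be sketched as follows. The function names are hypothetical and the token counts are illustrative; if your API returns a usage object with token counts, those values could feed these functions, but that field is an assumption about your API.

```python
def tokens_per_second(completion_tokens: int, duration_s: float) -> float:
    """Generation rate for a single request."""
    if duration_s <= 0:
        raise ValueError("duration_s must be positive")
    return completion_tokens / duration_s


def aggregate_token_demand(rps: float, avg_prompt_tokens: float, avg_completion_tokens: float) -> float:
    """Total tokens per second the backend must process at a given request rate."""
    return rps * (avg_prompt_tokens + avg_completion_tokens)


# Same request rate, very different token load (illustrative numbers).
light = aggregate_token_demand(rps=5, avg_prompt_tokens=50, avg_completion_tokens=100)
heavy = aggregate_token_demand(rps=5, avg_prompt_tokens=3000, avg_completion_tokens=450)
print(light, heavy)
```

Both workloads run at 5 requests per second, yet the second asks the backend to process more than twenty times as many tokens.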

Shared infrastructure contention

If your AI application also performs retrieval, embeddings, reranking, logging, moderation, or tool execution, those systems can become bottlenecks before the model itself does.

Common failure patterns include:

  • 429 rate limiting
  • 502/503 gateway failures
  • Increased first-token latency
  • Stream interruptions mid-generation
  • Timeouts on long prompts
  • Uneven latency across regions

That’s why load testing and stress testing LLM APIs should model realistic traffic, not just hammer a single short prompt repeatedly.
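One way to avoid hammering a single prompt is to sample from a weighted mix of prompt profiles. This is a minimal sketch: the `PromptProfile` type, the three profiles, and their weights are all assumptions to be replaced with proportions observed in your own traffic.

```python
import random
from typing import Dict, List, NamedTuple


class PromptProfile(NamedTuple):
    label: str
    prompt_words: int  # rough prompt size in words
    max_tokens: int
    weight: int        # relative share of traffic


# Illustrative mix, not measured data.
PROFILES: List[PromptProfile] = [
    PromptProfile("short_chat", 20, 150, 6),
    PromptProfile("summarize_doc", 800, 200, 3),
    PromptProfile("long_report", 1500, 800, 1),
]


def pick_profile(rng: random.Random) -> PromptProfile:
    """Choose a profile with probability proportional to its weight."""
    return rng.choices(PROFILES, weights=[p.weight for p in PROFILES], k=1)[0]


def build_payload(profile: PromptProfile, model: str = "gpt-4o-mini") -> Dict:
    """Build a chat-completions style payload padded to the profile's prompt size."""
    filler = " ".join(["context"] * profile.prompt_words)
    return {
        "model": model,
        "stream": False,
        "max_tokens": profile.max_tokens,
        "messages": [{"role": "user", "content": f"[{profile.label}] {filler}"}],
    }


rng = random.Random(42)  # seeded so a run is repeatable
payload = build_payload(pick_profile(rng))
print(payload["max_tokens"], len(payload["messages"][0]["content"].split()))
```

Inside a Locust task, a helper like `build_payload` could supply the `json=` argument so that each request draws a different prompt size.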

Writing Your First Load Test

Let’s begin with a simple non-streaming load test against a chat completions endpoint. This is useful as a baseline for total response time, request success rate, and throughput.

Basic non-streaming chat completion test

python
from locust import HttpUser, task, between
import os
import json
 
class LLMNonStreamingUser(HttpUser):
    wait_time = between(1, 3)
    host = os.getenv("LLM_API_HOST", "https://api.example-llm.com")
 
    def on_start(self):
        self.api_key = os.getenv("LLM_API_KEY", "replace-me")
        self.headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json"
        }
 
    @task
    def chat_completion_non_streaming(self):
        payload = {
            "model": "gpt-4o-mini",
            "stream": False,
            "temperature": 0.3,
            "max_tokens": 220,
            "messages": [
                {
                    "role": "system",
                    "content": "You are a concise support assistant for an e-commerce platform."
                },
                {
                    "role": "user",
                    "content": (
                        "A customer says: 'My package shows delivered but I never received it. "
                        "What should I do next?' Provide a helpful response in 5 bullet points."
                    )
                }
            ]
        }
 
        with self.client.post(
            "/v1/chat/completions",
            headers=self.headers,
            json=payload,
            name="/v1/chat/completions [non-streaming]",
            catch_response=True,
            timeout=60
        ) as response:
            if response.status_code != 200:
                response.failure(f"Unexpected status code: {response.status_code}")
                return
 
            try:
                data = response.json()
                content = data["choices"][0]["message"]["content"]
                if not content:
                    response.failure("Empty completion returned")
                    return

                response.success()
            except (KeyError, IndexError, TypeError, ValueError) as e:
                # ValueError also covers JSON decoding errors raised by response.json()
                response.failure(f"Invalid response format: {e}")

What this test measures

This script gives you a baseline for:

  • End-to-end latency
  • Success/failure rate
  • Throughput under concurrent users
  • Error patterns like 429s and 5xx responses

It’s a good starting point for performance testing because it isolates the simple request/response path. However, it does not tell you how quickly users begin seeing output, which is critical for chat UX.

Why this matters

Many teams initially test only non-streaming requests because they’re easier to validate. But if your frontend uses token streaming, this can underrepresent real production behavior. Streaming changes connection patterns and server resource usage significantly.

Advanced Load Testing Scenarios

Now let’s move to more realistic AI & LLM scenarios. We’ll compare streaming and non-streaming behavior, add prompt variation, and simulate mixed workloads that resemble production traffic.

Scenario 1: Streaming chat completions with first-token timing

This example measures both total stream duration and time to first token. With stream=True, Locust’s built-in response time typically stops when the response headers arrive, not when the last token does, so we track both milestones manually and report them as custom request events.

python
from locust import HttpUser, task, between, events
import os
import time
import json
 
class LLMStreamingUser(HttpUser):
    wait_time = between(1, 2)
    host = os.getenv("LLM_API_HOST", "https://api.example-llm.com")
 
    def on_start(self):
        self.api_key = os.getenv("LLM_API_KEY", "replace-me")
        self.headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json",
            "Accept": "text/event-stream"
        }
 
    @task
    def chat_completion_streaming(self):
        payload = {
            "model": "gpt-4o-mini",
            "stream": True,
            "temperature": 0.2,
            "max_tokens": 300,
            "messages": [
                {
                    "role": "system",
                    "content": "You are a helpful travel assistant."
                },
                {
                    "role": "user",
                    "content": (
                        "Plan a 3-day trip to Tokyo for a first-time visitor. "
                        "Include neighborhoods, food recommendations, and transit tips."
                    )
                }
            ]
        }
 
        start_time = time.time()
        first_token_time = None
        token_count_estimate = 0
 
        try:
            with self.client.post(
                "/v1/chat/completions",
                headers=self.headers,
                json=payload,
                name="/v1/chat/completions [streaming]",
                catch_response=True,
                stream=True,
                timeout=90
            ) as response:
                if response.status_code != 200:
                    response.failure(f"Unexpected status code: {response.status_code}")
                    return
 
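                # Force UTF-8 so iter_lines yields str even if the server omits a charset
                response.encoding = "utf-8"
                for line in response.iter_lines(decode_unicode=True):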
                    if not line:
                        continue
 
                    if line.startswith("data: "):
                        data_part = line[6:].strip()
 
                        if data_part == "[DONE]":
                            break
 
                        try:
                            event_data = json.loads(data_part)
                            delta = event_data["choices"][0].get("delta", {})
                            content = delta.get("content", "")
 
                            if content and first_token_time is None:
                                first_token_time = time.time()
 
                            if content:
                                token_count_estimate += len(content.split())
 
                        except (json.JSONDecodeError, KeyError, IndexError):
                            continue
 
                total_time_ms = int((time.time() - start_time) * 1000)
 
                if first_token_time:
                    first_token_ms = int((first_token_time - start_time) * 1000)
                    events.request.fire(
                        request_type="STREAM",
                        name="first_token_latency",
                        response_time=first_token_ms,
                        response_length=token_count_estimate,
                        exception=None,
                        context={}
                    )
                    response.success()
                else:
                    response.failure("No streamed tokens received")
 
                events.request.fire(
                    request_type="STREAM",
                    name="stream_total_duration",
                    response_time=total_time_ms,
                    response_length=token_count_estimate,
                    exception=None,
                    context={}
                )
 
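        except Exception as e:
            # Record the elapsed time on failure; a hard-coded 0 ms would skew the stats
            events.request.fire(
                request_type="STREAM",
                name="stream_total_duration",
                response_time=int((time.time() - start_time) * 1000),
                response_length=0,
                exception=e,
                context={}
            )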

Why this script is useful

This test helps you compare:

  • Time to first token
  • Total completion duration
  • Stability of long-lived streaming connections
  • Whether streams terminate cleanly

Because these metrics are fired as request events, first_token_latency and stream_total_duration appear as their own entries in the results. In LoadForge’s real-time reporting, you can compare first-token latency against the standard response time and check whether streaming actually improves user-perceived performance under load.
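The line-by-line parsing in the script above can also be factored into a small helper that is testable without a live endpoint. This is a sketch under the assumption that your API emits chat-completions style SSE events with a `choices[0].delta.content` field and a `[DONE]` sentinel; `parse_sse_line` is a hypothetical name. It accepts `data:` with or without a following space, which the SSE format allows.

```python
import json
from typing import Optional, Tuple

DONE_SENTINEL = "[DONE]"


def parse_sse_line(line: str) -> Tuple[str, Optional[str]]:
    """Classify one SSE line as ("content", text), ("done", None) or ("skip", None)."""
    if not line.startswith("data:"):
        return "skip", None  # comments, other fields, blank keep-alive lines
    data = line[len("data:"):].strip()
    if data == DONE_SENTINEL:
        return "done", None
    try:
        event = json.loads(data)
        choices = event.get("choices") or []
        delta = choices[0].get("delta", {}) if choices else {}
        text = delta.get("content")
    except (ValueError, AttributeError, KeyError, IndexError, TypeError):
        return "skip", None  # malformed or unexpected event shape
    return ("content", text) if text else ("skip", None)


for sample in [
    'data: {"choices":[{"delta":{"content":"Hi"}}]}',
    ": keep-alive",
    "data: [DONE]",
]:
    print(parse_sse_line(sample))
```

Keeping the parser separate from the Locust task makes it easier to check edge cases, such as role-only deltas or keep-alive comments, before running a full test.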

Scenario 2: Mixed streaming and non-streaming traffic with weighted tasks

Most real AI applications don’t serve only one traffic type. Some users want streamed chat. Others request structured JSON outputs or short backend completions that don’t need streaming. This test simulates a more realistic traffic mix.

python
from locust import HttpUser, task, between
import os
import json
 
class MixedLLMWorkloadUser(HttpUser):
    wait_time = between(1, 4)
    host = os.getenv("LLM_API_HOST", "https://api.example-llm.com")
 
    def on_start(self):
        self.api_key = os.getenv("LLM_API_KEY", "replace-me")
        self.base_headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json"
        }
 
    @task(3)
    def non_streaming_summary(self):
        payload = {
            "model": "gpt-4o-mini",
            "stream": False,
            "temperature": 0.1,
            "max_tokens": 180,
            "messages": [
                {
                    "role": "system",
                    "content": "You summarize business meeting notes into action items."
                },
                {
                    "role": "user",
                    "content": (
                        "Meeting notes: Marketing launch delayed by 2 weeks. "
                        "Engineering needs final API schema by Friday. "
                        "Customer success wants updated onboarding docs. "
                        "Summarize into action items with owners and deadlines."
                    )
                }
            ]
        }
 
        with self.client.post(
            "/v1/chat/completions",
            headers=self.base_headers,
            json=payload,
            name="/v1/chat/completions [summary non-stream]",
            catch_response=True,
            timeout=60
        ) as response:
            if response.status_code == 200:
                response.success()
            else:
                response.failure(f"Status {response.status_code}")
 
    @task(2)
    def streaming_chat(self):
        headers = dict(self.base_headers)
        headers["Accept"] = "text/event-stream"
 
        payload = {
            "model": "gpt-4o-mini",
            "stream": True,
            "temperature": 0.7,
            "max_tokens": 350,
            "messages": [
                {
                    "role": "system",
                    "content": "You are an expert coding assistant."
                },
                {
                    "role": "user",
                    "content": (
                        "Explain how to implement exponential backoff for API retries in Python, "
                        "and include a short code example."
                    )
                }
            ]
        }
 
        with self.client.post(
            "/v1/chat/completions",
            headers=headers,
            json=payload,
            name="/v1/chat/completions [chat stream]",
            catch_response=True,
            stream=True,
            timeout=90
        ) as response:
            if response.status_code != 200:
                response.failure(f"Status {response.status_code}")
                return
 
            received_content = False
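            # Force UTF-8 so iter_lines yields str even if the server omits a charset
            response.encoding = "utf-8"
            for line in response.iter_lines(decode_unicode=True):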
                if not line:
                    continue
                if line.startswith("data: "):
                    data_part = line[6:].strip()
                    if data_part == "[DONE]":
                        break
                    try:
                        event_data = json.loads(data_part)
                        delta = event_data["choices"][0].get("delta", {})
                        if delta.get("content"):
                            received_content = True
                    except Exception:
                        pass
 
            if received_content:
                response.success()
            else:
                response.failure("Stream produced no content")
 
    @task(1)
    def embeddings_request(self):
        payload = {
            "model": "text-embedding-3-large",
            "input": [
                "How do I reset my password?",
                "What is your refund policy for annual subscriptions?",
                "Can I export my analytics data to CSV?"
            ]
        }
 
        with self.client.post(
            "/v1/embeddings",
            headers=self.base_headers,
            json=payload,
            name="/v1/embeddings",
            catch_response=True,
            timeout=30
        ) as response:
            if response.status_code == 200:
                response.success()
            else:
                response.failure(f"Status {response.status_code}")

What this mixed workload reveals

This type of load testing is valuable because it reflects production conditions more closely:

  • Shorter non-streaming requests may compete with long streaming sessions
  • Embeddings traffic may contend for shared infrastructure
  • The API gateway may behave differently under mixed workloads
  • Throughput may degrade even if average latency appears acceptable

With LoadForge’s distributed testing, you can run this workload from multiple regions to see whether edge routing, regional inference clusters, or CDN/API gateway layers affect performance.
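The task weights in the script above (3, 2, and 1) determine how often each task is picked relative to the others. The following sketch converts weights into expected shares; `task_shares` is an illustrative helper, not a Locust function.

```python
from typing import Dict


def task_shares(weights: Dict[str, int]) -> Dict[str, float]:
    """Convert relative task weights into expected fractions of task executions."""
    total = sum(weights.values())
    if total <= 0:
        raise ValueError("weights must sum to a positive number")
    return {name: weight / total for name, weight in weights.items()}


shares = task_shares({
    "non_streaming_summary": 3,
    "streaming_chat": 2,
    "embeddings_request": 1,
})
for name, share in shares.items():
    print(f"{name}: {share:.0%}")
```

These are shares of task executions, not of connection time. Because streaming tasks run longer, they may account for a larger fraction of open connections than their weight suggests.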

Scenario 3: Authenticated multi-step AI workflow with session setup and long prompts

Many AI products don’t call the model directly from anonymous traffic. They authenticate users, fetch context, and then send a large prompt. This example simulates a realistic workflow: login, retrieve knowledge context, then call a non-streaming LLM endpoint with a larger payload.

python
from locust import HttpUser, task, between
import os
import random
 
class RAGWorkflowUser(HttpUser):
    wait_time = between(2, 5)
    host = os.getenv("APP_API_HOST", "https://app.example-ai.com")
 
    def on_start(self):
        self.email = os.getenv("TEST_USER_EMAIL", "loadtest@example-ai.com")
        self.password = os.getenv("TEST_USER_PASSWORD", "SuperSecret123!")
        self.access_token = None
        self.authenticate()
 
    def authenticate(self):
        payload = {
            "email": self.email,
            "password": self.password
        }
 
        response = self.client.post(
            "/api/auth/login",
            json=payload,
            name="/api/auth/login",
            timeout=20
        )
        response.raise_for_status()
        self.access_token = response.json()["access_token"]
 
    def auth_headers(self):
        return {
            "Authorization": f"Bearer {self.access_token}",
            "Content-Type": "application/json"
        }
 
    @task
    def ask_with_retrieval_context(self):
        kb_queries = [
            "SOC 2 compliance requirements for data retention",
            "Enterprise SSO setup for Okta SAML",
            "Incident response SLA for premium customers",
            "API rate limits for partner integrations"
        ]
        query = random.choice(kb_queries)
 
        search_payload = {
            "query": query,
            "top_k": 4,
            "filters": {
                "workspace_id": "ws_enterprise_docs"
            }
        }
 
        with self.client.post(
            "/api/search",
            headers=self.auth_headers(),
            json=search_payload,
            name="/api/search",
            catch_response=True,
            timeout=20
        ) as search_response:
            if search_response.status_code != 200:
                search_response.failure(f"Search failed: {search_response.status_code}")
                return
 
            documents = search_response.json().get("documents", [])
            if not documents:
                search_response.failure("No documents returned from search")
                return
 
        context_text = "\n\n".join(
            f"Document {i+1}: {doc.get('title', 'Untitled')}\n{doc.get('content', '')[:1200]}"
            for i, doc in enumerate(documents)
        )
 
        llm_payload = {
            "model": "gpt-4o-mini",
            "stream": False,
            "temperature": 0.0,
            "max_tokens": 450,
            "messages": [
                {
                    "role": "system",
                    "content": (
                        "You are an enterprise support assistant. "
                        "Answer only using the provided documentation context."
                    )
                },
                {
                    "role": "user",
                    "content": (
                        f"Context:\n{context_text}\n\n"
                        f"Question: {query}\n\n"
                        "Provide a precise answer and cite which document sections were used."
                    )
                }
            ]
        }
 
        with self.client.post(
            "/v1/chat/completions",
            headers=self.auth_headers(),
            json=llm_payload,
            name="/v1/chat/completions [RAG non-stream]",
            catch_response=True,
            timeout=90
        ) as llm_response:
            if llm_response.status_code == 401:
                self.authenticate()
                llm_response.failure("Token expired; re-authenticated")
            elif llm_response.status_code != 200:
                llm_response.failure(f"LLM call failed: {llm_response.status_code}")
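            else:
                try:
                    content = llm_response.json()["choices"][0]["message"]["content"]
                except (KeyError, IndexError, TypeError, ValueError) as e:
                    # A 200 with an unexpected body should count as a failure, not crash the task
                    llm_response.failure(f"Invalid LLM response format: {e}")
                    return
                if not content or len(content) < 50:
                    llm_response.failure("LLM response too short")
                else:
                    llm_response.success()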

Why this workflow matters

This scenario is especially useful for AI & LLM performance testing because it surfaces bottlenecks beyond the model itself:

  • Authentication overhead
  • Retrieval latency
  • Large prompt assembly
  • Increased token count from injected context
  • Session expiration under long tests

This is where LoadForge’s cloud-based infrastructure and CI/CD integration become particularly helpful. You can run this workflow automatically after backend changes to catch regressions in retrieval or generation latency before they hit production.

Analyzing Your Results

When load testing LLM APIs, average response time alone is not enough. You should analyze results across several dimensions.

Compare streaming and non-streaming separately

For non-streaming requests, focus on:

  • Median and p95 response time
  • Request throughput
  • Error rate
  • Queueing behavior as concurrency increases

For streaming requests, focus on:

  • Time to first token
  • Total stream duration
  • Stream completion success rate
  • Connection stability under concurrency

A common pattern is that streaming improves UX at low to moderate concurrency, but total infrastructure pressure rises because connections remain open longer.

Watch p95 and p99 latency

LLM workloads often have wide latency variance. Average numbers can look fine while tail latency becomes unacceptable. If your p95 first-token latency jumps from 700 ms to 4 seconds under load, users will notice immediately.
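A small worked example shows how a mean can hide the tail. The sketch uses the nearest-rank percentile definition, which is one of several common definitions, and the latency samples are made up for illustration.

```python
import math
from typing import Sequence


def percentile(samples: Sequence[float], pct: float) -> float:
    """Nearest-rank percentile: the smallest sample with at least pct% of values at or below it."""
    if not samples:
        raise ValueError("samples must not be empty")
    if not 0 < pct <= 100:
        raise ValueError("pct must be in (0, 100]")
    ordered = sorted(samples)
    rank = math.ceil(pct * len(ordered) / 100)
    return ordered[rank - 1]


# Eight fast first-token latencies and two slow ones, in milliseconds (illustrative).
latencies_ms = [650, 700, 720, 690, 710, 680, 705, 695, 4000, 4200]
mean_ms = sum(latencies_ms) / len(latencies_ms)
print(f"mean={mean_ms:.0f} p50={percentile(latencies_ms, 50)} p95={percentile(latencies_ms, 95)}")
```

Here the median stays at 700 ms while the mean is pulled to 1375 ms and the p95 sits at 4200 ms, so only the percentile views show that one request in five is slow.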

Track error classes

Different failures imply different bottlenecks:

  • 429 Too Many Requests: provider or gateway rate limiting
  • 502/503: overloaded upstream inference or proxy instability
  • 504: timeout in gateway or backend
  • Broken streams: network/proxy buffering or worker interruption
  • Empty completions: application-level failures or malformed event handling
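The mapping above can be encoded as a small classifier so that failures are tallied by likely cause rather than by raw status code. The category names and the `classify_outcome` helper are illustrative assumptions, not LoadForge or Locust features.

```python
from collections import Counter
from typing import Iterable, Tuple


def classify_outcome(status_code: int, stream_completed: bool = True, has_content: bool = True) -> str:
    """Map one request outcome to a coarse failure class."""
    if status_code == 429:
        return "rate_limited"
    if status_code in (502, 503):
        return "upstream_overloaded"
    if status_code == 504:
        return "gateway_timeout"
    if status_code != 200:
        return "other_http_error"
    if not stream_completed:
        return "broken_stream"
    if not has_content:
        return "empty_completion"
    return "ok"


def tally(outcomes: Iterable[Tuple[int, bool, bool]]) -> Counter:
    """Count outcomes given as (status_code, stream_completed, has_content) tuples."""
    return Counter(classify_outcome(*outcome) for outcome in outcomes)


print(tally([(200, True, True), (429, True, True), (429, True, True), (200, False, True)]))
```

In a Locust script, the returned label could be used as the failure message so that the error report groups failures by class.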

Evaluate token-heavy scenarios separately

A short prompt benchmark may be misleading. Always test combinations of:

  • Short prompts + short outputs
  • Long prompts + short outputs
  • Long prompts + long outputs

This helps you understand whether bottlenecks are caused by prompt processing, generation time, or both.

Use LoadForge’s reporting strategically

LoadForge’s real-time reporting is useful for spotting inflection points during a test:

  • When first-token latency begins rising
  • When throughput plateaus
  • When errors spike at certain user counts
  • Whether one region performs worse than another

If your application serves global users, run tests from multiple global test locations to identify regional performance gaps or routing issues.

Performance Optimization Tips

Once your load testing reveals bottlenecks, these are the most common optimization opportunities for AI & LLM systems.

Reduce prompt size

Large prompts dramatically increase inference cost and latency. Trim unnecessary system instructions, retrieved context, and conversation history where possible.

Cap output tokens

If users don’t need long-form responses, lower max_tokens. This reduces compute time and improves throughput.

Stream for UX, not blindly

Streaming can improve perceived responsiveness, but it may reduce infrastructure efficiency at scale because connections stay open longer. Test both modes carefully before standardizing on one.

Separate workloads by endpoint or pool

If embeddings and chat completions share infrastructure, one workload can starve the other. Consider isolation by queue, worker pool, or deployment.

Implement backpressure and rate limiting

If your upstream LLM provider enforces quotas, your application should degrade gracefully instead of flooding retries and amplifying failures.
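One common way to avoid flooding retries is exponential backoff with jitter that also honors a Retry-After header when the provider sends one. The sketch below only computes the delay. The function name, base delay, and cap are assumptions, and it handles only the numeric (seconds) form of Retry-After.

```python
import random
from typing import Optional


def retry_delay(attempt: int, retry_after: Optional[str] = None,
                base: float = 0.5, cap: float = 30.0, rng=random) -> float:
    """Seconds to wait before retry number `attempt` (0-based)."""
    if retry_after is not None:
        try:
            # Respect the server's hint, clamped to [0, cap]
            return min(cap, max(0.0, float(retry_after)))
        except ValueError:
            pass  # HTTP-date form of Retry-After is not handled in this sketch
    # "Full jitter": pick uniformly between 0 and the exponential ceiling
    ceiling = min(cap, base * (2 ** attempt))
    return rng.uniform(0, ceiling)


for attempt in range(4):
    print(attempt, round(retry_delay(attempt, rng=random.Random(attempt)), 3))
```

Randomizing the delay spreads retries out in time, which helps prevent many clients from retrying at the same instant after a 429.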

Cache where possible

For repeated prompts, retrieval results, or common system contexts, caching can reduce pressure on both your application and the model backend.
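As a minimal sketch of prompt-level caching, the code below derives a stable key from the request parameters and stores completions in memory. `cache_key` and `CompletionCache` are hypothetical names, and exact-match caching like this is usually only appropriate for deterministic settings such as a temperature of 0.

```python
import hashlib
import json
from typing import Dict, List, Optional


def cache_key(model: str, messages: List[Dict[str, str]], max_tokens: int) -> str:
    """Stable hash of the request parameters that determine the completion."""
    canonical = json.dumps(
        {"model": model, "messages": messages, "max_tokens": max_tokens},
        sort_keys=True,
        separators=(",", ":"),
    )
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


class CompletionCache:
    """In-memory exact-match cache with hit and miss counters."""

    def __init__(self) -> None:
        self._store: Dict[str, str] = {}
        self.hits = 0
        self.misses = 0

    def get(self, key: str) -> Optional[str]:
        value = self._store.get(key)
        if value is None:
            self.misses += 1
        else:
            self.hits += 1
        return value

    def put(self, key: str, completion: str) -> None:
        self._store[key] = completion


cache = CompletionCache()
key = cache_key("gpt-4o-mini", [{"role": "user", "content": "How do I reset my password?"}], 180)
if cache.get(key) is None:
    cache.put(key, "placeholder completion")  # a real system would call the model here
print(cache.get(key), cache.hits, cache.misses)
```

If such a cache sits in front of the model, a load test that repeats one prompt will mostly measure the cache, which is another reason to vary prompts during testing.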

Tune gateway and proxy settings

Streaming often fails because of:

  • Idle timeout settings
  • Buffering behavior
  • Connection limits
  • Keep-alive misconfiguration

Your model may be healthy while your proxy layer breaks the user experience.

Common Pitfalls to Avoid

Testing only one prompt

A single repeated prompt rarely reflects production. Vary prompt size, complexity, and output expectations.

Ignoring first-token latency

For streaming APIs, total duration is not the whole story. If the first token arrives too slowly, users still perceive the application as sluggish.

Using unrealistic token limits

Setting max_tokens too low or too high can distort results. Use values that reflect actual product usage.

Not validating response content

A 200 OK does not guarantee a useful model response. Your Locust scripts should verify that content exists and is structurally valid.
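A validation step along these lines can be kept separate from the Locust task so it is easy to test. This sketch assumes a chat-completions style body with `choices[0].message.content`; the `validate_chat_completion` name and the `min_chars` threshold are illustrative.

```python
from typing import Any, Optional


def validate_chat_completion(data: Any, min_chars: int = 1) -> Optional[str]:
    """Return None if the body looks valid, otherwise a short failure reason."""
    if not isinstance(data, dict):
        return "response is not a JSON object"
    choices = data.get("choices")
    if not isinstance(choices, list) or not choices:
        return "missing or empty choices"
    first = choices[0]
    message = first.get("message") if isinstance(first, dict) else None
    if not isinstance(message, dict):
        return "missing message"
    content = message.get("content")
    if not isinstance(content, str) or len(content.strip()) < min_chars:
        return "empty or too-short content"
    return None


good = {"choices": [{"message": {"role": "assistant", "content": "Contact the carrier first."}}]}
print(validate_chat_completion(good))
print(validate_chat_completion({"choices": []}))
```

In a task, a non-None result could be passed to `response.failure(...)` so that structurally invalid 200 responses are counted as failures.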

Overlooking authentication and upstream dependencies

If production traffic includes login, retrieval, moderation, or tool execution, your load tests should include them too.

Running tests from only one region

AI applications often serve users globally. Latency and reliability can vary significantly by geography, so use distributed testing to understand real-world performance.

Misinterpreting averages

LLM APIs often produce long-tail latency distributions. Always review p95 and p99 metrics, not just averages.

Conclusion

Load testing LLM APIs with streaming and non-streaming requests is essential for understanding real user experience, backend capacity, and infrastructure bottlenecks. Non-streaming tests give you clean end-to-end latency and throughput measurements, while streaming tests reveal first-token responsiveness, long-lived connection behavior, and the true UX impact of token delivery.

By using realistic Locust scripts on LoadForge, you can simulate production-like AI & LLM traffic patterns, compare streaming and non-streaming performance, and identify where your architecture starts to break down. With LoadForge’s distributed testing, cloud-based infrastructure, real-time reporting, global test locations, and CI/CD integration, you can turn LLM performance testing into a repeatable engineering practice instead of a one-off exercise.

If you’re building AI products that depend on fast, reliable model responses, now is the time to test them properly. Try LoadForge and start benchmarking your LLM APIs under real load.
