
Introduction
Redis is often the performance backbone of modern applications. Teams use it for caching, session storage, rate limiting, queues, leaderboards, and real-time analytics because it delivers extremely fast in-memory operations. But even though Redis is known for speed, it can still become a bottleneck under heavy traffic, poor key design, oversized values, or inefficient command patterns.
That’s why Redis load testing matters. A proper Redis load test helps you measure throughput, latency, error rates, and behavior at high concurrency before production traffic exposes weaknesses. Whether you are validating cache performance for a web application, stress testing session storage, or benchmarking a queue-backed workflow, performance testing Redis gives you the data you need to scale confidently.
In this guide, you’ll learn how to use LoadForge for Redis load testing with realistic Locust-based Python scripts. Because LoadForge uses Locust under the hood, you can build custom tests that simulate real application behavior while taking advantage of cloud-based infrastructure, distributed testing, real-time reporting, CI/CD integration, and global test locations.
Prerequisites
Before you begin load testing Redis with LoadForge, make sure you have the following:
- A Redis instance to test:
  - Redis standalone
  - Redis Sentinel-managed deployment
  - Redis Cluster
  - Managed Redis such as AWS ElastiCache, Azure Cache for Redis, or Redis Enterprise
- Network access from your LoadForge workers or test environment to the Redis endpoint
- Authentication details if required:
  - Password
  - ACL username and password
  - TLS settings if enabled
- A clear test goal, such as:
  - Maximum GET/SET throughput
  - P95 or P99 latency under load
  - Connection saturation limits
  - Session store performance
  - Queue processing under burst traffic
- Basic familiarity with:
  - Redis commands like GET, SET, HGETALL, LPUSH, BRPOP, INCR
  - Python
  - Locust event hooks and user classes
You will also need the Redis Python client in your test environment. If you are running locally first, install dependencies like this:
```bash
pip install locust redis
```
If your Redis deployment uses TLS or ACLs, confirm the connection details beforehand. A small mistake in authentication or SSL configuration can invalidate your test before it starts.
Understanding Redis Under Load
Redis is single-threaded for command execution in many common deployments, which is part of why it is so predictable and fast. However, that design also means you need to understand how command complexity, network overhead, and memory pressure affect performance.
What happens when Redis is under heavy load?
At high concurrency, Redis typically sustains a very high rate of simple operations such as GET and SET. But performance can degrade when:
- Too many clients open connections simultaneously
- Large values increase serialization and network transfer time
- Expensive commands like KEYS or large-range scans are used
- Lua scripts or transactions become long-running
- Persistence settings add disk I/O pressure
- Memory fragmentation increases
- Evictions occur due to maxmemory limits
- Replication lag builds on replicas
Common Redis bottlenecks
Network saturation
Redis is fast enough that the network often becomes the bottleneck before CPU does, especially for small key-value operations.
Connection overhead
If your application creates and destroys Redis connections too often, latency increases and throughput drops. Connection pooling is critical.
Hot keys
A small number of extremely popular keys can create uneven load patterns, especially in clustered environments.
Large payloads
A 200-byte JSON blob behaves very differently from a 200 KB serialized object. Bigger values increase memory use and response times.
Inefficient command patterns
Running many individual commands instead of pipelining, or using blocking commands carelessly, can reduce overall throughput.
A good Redis stress test should reflect your real usage patterns, not just synthetic SET/GET loops. That means testing actual session keys, cache item sizes, TTL behavior, queue operations, and authentication patterns.
Writing Your First Load Test
Let’s start with a basic Redis load testing script using Locust. Since Redis is not an HTTP service, we’ll use Locust’s User class rather than HttpUser. This allows us to simulate Redis operations directly with the Python Redis client while still capturing metrics in Locust and LoadForge.
Basic Redis GET and SET load test
This example simulates a typical cache workload:
- Writing user profile cache entries
- Reading cached profiles
- Applying realistic TTL values
- Recording custom request metrics in Locust
```python
from locust import User, task, between, events
import redis
import json
import random
import time


class RedisCacheUser(User):
    wait_time = between(0.01, 0.2)

    def on_start(self):
        self.client = redis.Redis(
            host="redis-prod.internal.example.com",
            port=6379,
            username="loadtest_user",
            password="SuperSecretPassword!",
            decode_responses=True,
            socket_connect_timeout=5,
            socket_timeout=5
        )

    @task(3)
    def get_user_profile_cache(self):
        user_id = random.randint(1, 50000)
        key = f"cache:user:profile:{user_id}"
        start_time = time.time()
        try:
            value = self.client.get(key)
            response_time = (time.time() - start_time) * 1000
            if value is None:
                events.request.fire(
                    request_type="redis",
                    name="GET cache:user:profile:* [miss]",
                    response_time=response_time,
                    response_length=0,
                    exception=None
                )
            else:
                events.request.fire(
                    request_type="redis",
                    name="GET cache:user:profile:* [hit]",
                    response_time=response_time,
                    response_length=len(value),
                    exception=None
                )
        except Exception as e:
            response_time = (time.time() - start_time) * 1000
            events.request.fire(
                request_type="redis",
                name="GET cache:user:profile:*",
                response_time=response_time,
                response_length=0,
                exception=e
            )

    @task(1)
    def set_user_profile_cache(self):
        user_id = random.randint(1, 50000)
        key = f"cache:user:profile:{user_id}"
        payload = {
            "user_id": user_id,
            "name": f"user-{user_id}",
            "plan": random.choice(["free", "pro", "enterprise"]),
            "last_login": "2026-04-06T10:15:00Z",
            "feature_flags": {
                "beta_dashboard": random.choice([True, False]),
                "priority_support": random.choice([True, False])
            }
        }
        start_time = time.time()
        try:
            value = json.dumps(payload)
            self.client.setex(key, 300, value)
            response_time = (time.time() - start_time) * 1000
            events.request.fire(
                request_type="redis",
                name="SETEX cache:user:profile:*",
                response_time=response_time,
                response_length=len(value),
                exception=None
            )
        except Exception as e:
            response_time = (time.time() - start_time) * 1000
            events.request.fire(
                request_type="redis",
                name="SETEX cache:user:profile:*",
                response_time=response_time,
                response_length=0,
                exception=e
            )
```
What this script tests
This script represents a common Redis caching use case:
- 75% reads
- 25% writes
- JSON values similar to user profile cache objects
- TTL-based cache entries
- Authenticated Redis access using ACL username and password
This is a strong starting point for Redis performance testing because it reflects how many applications actually use Redis: fast key-value retrieval with periodic updates.
Why this matters
A simple GET/SET benchmark can help you answer questions like:
- How many cache operations per second can Redis sustain?
- Does latency remain low at 500, 1,000, or 5,000 concurrent users?
- Do timeouts appear when many clients connect at once?
- Are cache misses significantly affecting application behavior?
In LoadForge, you can run this test across distributed workers to simulate realistic load from multiple regions and watch throughput and latency in real time.
Advanced Load Testing Scenarios
Basic cache tests are useful, but production Redis workloads are usually more complex. Below are several advanced Redis load testing scenarios that better represent real applications.
Scenario 1: Session store load testing with login token validation
Many applications store user sessions in Redis. This test simulates:
- Creating session records after login
- Looking up active sessions
- Refreshing TTL on active sessions
- Deleting sessions on logout
```python
from locust import User, task, between, events
import redis
import json
import random
import string
import time


class RedisSessionUser(User):
    wait_time = between(0.05, 0.3)

    def on_start(self):
        self.client = redis.Redis(
            host="redis-prod.internal.example.com",
            port=6379,
            username="session_service",
            password="AnotherSecretPassword!",
            decode_responses=True,
            socket_connect_timeout=5,
            socket_timeout=5
        )
        self.session_token = None

    def _record(self, command_name, start_time, response_length=0, exception=None):
        events.request.fire(
            request_type="redis",
            name=command_name,
            response_time=(time.time() - start_time) * 1000,
            response_length=response_length,
            exception=exception
        )

    def _generate_token(self, length=40):
        return ''.join(random.choices(string.ascii_letters + string.digits, k=length))

    @task(1)
    def create_session(self):
        user_id = random.randint(1000, 200000)
        self.session_token = self._generate_token()
        key = f"session:{self.session_token}"
        session_data = {
            "user_id": user_id,
            "ip": f"192.168.1.{random.randint(1, 254)}",
            "user_agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7)",
            "scopes": ["read:profile", "read:billing"],
            "created_at": "2026-04-06T10:30:00Z"
        }
        start_time = time.time()
        try:
            payload = json.dumps(session_data)
            self.client.setex(key, 1800, payload)
            self._record("SETEX session:*", start_time, len(payload))
        except Exception as e:
            self._record("SETEX session:*", start_time, exception=e)

    @task(4)
    def validate_session(self):
        if not self.session_token:
            self.create_session()
            return
        key = f"session:{self.session_token}"
        start_time = time.time()
        try:
            value = self.client.get(key)
            if value:
                self.client.expire(key, 1800)
                self._record("GET+EXPIRE session:*", start_time, len(value))
            else:
                self._record("GET+EXPIRE session:* [missing]", start_time, 0)
        except Exception as e:
            self._record("GET+EXPIRE session:*", start_time, exception=e)

    @task(1)
    def logout_session(self):
        if not self.session_token:
            return
        key = f"session:{self.session_token}"
        start_time = time.time()
        try:
            deleted = self.client.delete(key)
            self._record("DEL session:*", start_time, deleted)
            self.session_token = None
        except Exception as e:
            self._record("DEL session:*", start_time, exception=e)
```
When to use this test
This is ideal for:
- SaaS applications
- User login systems
- API gateway token/session validation
- Applications with rolling session expiration
It helps you measure how Redis behaves when session validation traffic spikes, such as after a product launch or peak login event.
Scenario 2: Queue and background job stress testing
Redis is frequently used as a lightweight queue for background jobs. This example simulates producers pushing jobs into a queue and workers consuming them. It is useful for testing burst traffic and queue depth growth.
```python
from locust import User, task, between, events
import redis
import json
import random
import time
import uuid


class RedisQueueUser(User):
    wait_time = between(0.01, 0.1)

    def on_start(self):
        self.client = redis.Redis(
            host="redis-prod.internal.example.com",
            port=6379,
            username="queue_worker",
            password="QueueSecretPassword!",
            decode_responses=True,
            socket_connect_timeout=5,
            socket_timeout=5
        )

    def _record(self, command_name, start_time, response_length=0, exception=None):
        events.request.fire(
            request_type="redis",
            name=command_name,
            response_time=(time.time() - start_time) * 1000,
            response_length=response_length,
            exception=exception
        )

    @task(3)
    def enqueue_email_job(self):
        queue_name = "queue:email:send"
        job = {
            "job_id": str(uuid.uuid4()),
            "template": random.choice(["welcome_email", "password_reset", "invoice_ready"]),
            "recipient": f"user{random.randint(1, 500000)}@example.com",
            "priority": random.choice(["normal", "high"]),
            "payload": {
                "first_name": random.choice(["Ava", "Liam", "Noah", "Emma"]),
                "account_id": random.randint(10000, 99999)
            },
            "created_at": "2026-04-06T10:45:00Z"
        }
        start_time = time.time()
        try:
            payload = json.dumps(job)
            self.client.lpush(queue_name, payload)
            self._record("LPUSH queue:email:send", start_time, len(payload))
        except Exception as e:
            self._record("LPUSH queue:email:send", start_time, exception=e)

    @task(2)
    def dequeue_email_job(self):
        queue_name = "queue:email:send"
        processing_queue = "queue:email:processing"
        start_time = time.time()
        try:
            result = self.client.rpoplpush(queue_name, processing_queue)
            if result:
                self._record("RPOPLPUSH email queue", start_time, len(result))
            else:
                self._record("RPOPLPUSH email queue [empty]", start_time, 0)
        except Exception as e:
            self._record("RPOPLPUSH email queue", start_time, exception=e)

    @task(1)
    def queue_depth_check(self):
        start_time = time.time()
        try:
            depth = self.client.llen("queue:email:send")
            self._record("LLEN queue:email:send", start_time, depth)
        except Exception as e:
            self._record("LLEN queue:email:send", start_time, exception=e)
```
What to look for
With this Redis stress testing scenario, watch for:
- Queue growth over time
- Increased latency as queue depth rises
- Command timeouts during bursts
- Whether consumers keep pace with producers
This kind of test is especially useful before scheduled campaigns, flash sales, or batch-processing windows.
Scenario 3: Hashes, counters, and pipelining for leaderboard and analytics workloads
Redis is also widely used for counters and leaderboard-like features. This scenario simulates:
- Incrementing page view counters
- Updating leaderboard scores
- Reading top-ranked items
- Using pipelining to reduce round-trip overhead
```python
from locust import User, task, between, events
import redis
import random
import time


class RedisAnalyticsUser(User):
    wait_time = between(0.01, 0.15)

    def on_start(self):
        self.client = redis.Redis(
            host="redis-prod.internal.example.com",
            port=6379,
            username="analytics_service",
            password="AnalyticsSecretPassword!",
            decode_responses=True,
            socket_connect_timeout=5,
            socket_timeout=5
        )

    def _record(self, command_name, start_time, response_length=0, exception=None):
        events.request.fire(
            request_type="redis",
            name=command_name,
            response_time=(time.time() - start_time) * 1000,
            response_length=response_length,
            exception=exception
        )

    @task(5)
    def increment_page_metrics(self):
        article_id = random.randint(1, 10000)
        country = random.choice(["us", "uk", "de", "fr", "in", "jp"])
        start_time = time.time()
        try:
            pipe = self.client.pipeline()
            pipe.incr(f"metrics:article:{article_id}:views")
            pipe.hincrby(f"metrics:article:{article_id}:countries", country, 1)
            pipe.zincrby("leaderboard:articles:daily", 1, article_id)
            results = pipe.execute()
            self._record("PIPELINE article metrics update", start_time, len(results))
        except Exception as e:
            self._record("PIPELINE article metrics update", start_time, exception=e)

    @task(2)
    def read_top_articles(self):
        start_time = time.time()
        try:
            top_articles = self.client.zrevrange("leaderboard:articles:daily", 0, 9, withscores=True)
            self._record("ZREVRANGE leaderboard:articles:daily", start_time, len(top_articles))
        except Exception as e:
            self._record("ZREVRANGE leaderboard:articles:daily", start_time, exception=e)

    @task(1)
    def read_article_country_breakdown(self):
        article_id = random.randint(1, 10000)
        start_time = time.time()
        try:
            stats = self.client.hgetall(f"metrics:article:{article_id}:countries")
            self._record("HGETALL metrics:article:*:countries", start_time, len(stats))
        except Exception as e:
            self._record("HGETALL metrics:article:*:countries", start_time, exception=e)
```
Why this scenario is valuable
This test reflects high-volume analytics and ranking use cases. It is especially useful for:
- Media sites
- Gaming leaderboards
- Social engagement counters
- Real-time dashboards
It also demonstrates how pipelining can improve Redis throughput by reducing network round trips, something you should compare directly against non-pipelined workloads during performance testing.
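One way to make that comparison concrete is to run the same batch of counter increments both ways in separate test runs and compare throughput. A minimal, client-agnostic sketch, assuming a redis-py-style client object (the helper names are ours):

```python
def incr_many_unpipelined(client, keys):
    # One network round trip per key.
    return [client.incr(k) for k in keys]


def incr_many_pipelined(client, keys):
    # A single round trip for the whole batch.
    pipe = client.pipeline(transaction=False)
    for k in keys:
        pipe.incr(k)
    return pipe.execute()
```

The difference between the two grows with network latency: at 1 ms round-trip time, 100 unpipelined INCRs spend roughly 100 ms on the wire alone, while the pipelined batch spends about 1 ms.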
Analyzing Your Results
After running your Redis load test in LoadForge, focus on more than just average response time. Redis often looks excellent at the average while hiding serious issues in the tail latencies.
Key metrics to review
Requests per second
This tells you how much Redis traffic your deployment can handle. Compare throughput across:
- Different concurrency levels
- Different command types
- Different payload sizes
- Different infrastructure sizes
P95 and P99 latency
These are often more important than the average. A Redis instance serving most requests in 2 ms but some in 100 ms may still hurt application performance.
Error rate
Look for:
- Connection timeouts
- Authentication failures
- Read/write timeouts
- Redis server busy errors
- Connection resets
Command-specific performance
Segment results by operation:
- GET vs SETEX
- LPUSH vs RPOPLPUSH
- INCR vs HGETALL
- Pipelined vs non-pipelined operations
This helps identify which command patterns break down first.
Concurrency thresholds
Find the point where:
- Latency starts climbing sharply
- Errors begin appearing
- Throughput stops scaling linearly
That threshold is often more actionable than the absolute maximum throughput.
LoadForge-specific advantages
LoadForge makes Redis performance testing easier because you can:
- Run distributed load tests from multiple regions
- Observe real-time reporting as command latency changes
- Compare multiple test runs over time
- Integrate tests into CI/CD pipelines before releases
- Use cloud-based infrastructure to generate much higher concurrency than local machines
For example, you can test whether Redis behaves differently when traffic comes from one region versus several global test locations, which is helpful for managed Redis deployments used by international applications.
Performance Optimization Tips
If your Redis load testing reveals issues, here are some practical optimization steps.
Use connection pooling
Avoid opening a new connection per operation. Reuse connections through pooling in your application and test scripts.
Keep values small
Large JSON blobs or serialized objects can increase latency significantly. Store only what you need.
Prefer pipelining for related operations
If your application performs several commands together, pipelining can dramatically improve throughput.
Set expirations intentionally
TTL-based keys are powerful, but poor expiration strategy can create spikes in evictions or memory churn.
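One common tactic is adding jitter to TTLs, so a cohort of keys written together does not all expire in the same instant. A minimal sketch (the function name and jitter fraction are our assumptions):

```python
import random


def jittered_ttl(base_seconds: int, jitter_fraction: float = 0.1) -> int:
    """Spread expirations over a window so keys written in the same burst
    do not all expire at once and trigger a refill stampede."""
    jitter = int(base_seconds * jitter_fraction)
    return base_seconds + random.randint(0, jitter)

# Usage with a redis-py client (client assumed to exist):
# client.setex(key, jittered_ttl(300), value)
```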
Avoid expensive commands in production paths
Commands like KEYS or very broad scans can damage performance under load.
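If you need to sweep or inspect a keyspace (for example, cleaning up test keys after a run), an incremental SCAN-based pass is far safer than a single KEYS call. A sketch, assuming a redis-py-style client; the helper name is ours:

```python
def delete_matching(client, pattern, batch_size=500):
    """Delete keys matching a glob pattern using SCAN instead of KEYS.

    SCAN walks the keyspace in small batches and never blocks the server
    the way one KEYS call over millions of keys can. UNLINK reclaims the
    memory asynchronously, keeping the command itself cheap.
    """
    pipe = client.pipeline(transaction=False)
    deleted = 0
    for key in client.scan_iter(match=pattern, count=batch_size):
        pipe.unlink(key)
        deleted += 1
        if deleted % batch_size == 0:
            pipe.execute()
    pipe.execute()
    return deleted
```

For example, `delete_matching(client, "cache:user:profile:*")` would clear the profile-cache keys seeded by the first script in this guide.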
Watch memory and eviction policies
If Redis approaches maxmemory, eviction behavior can impact both latency and correctness.
Distribute hot keys
If one key receives disproportionate traffic, consider sharding or redesigning access patterns.
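One common approach for hot counters is to split each logical key across N shard keys on write and sum them on read; in a cluster, the shards can land in different slots. A minimal sketch, where the shard count and key format are assumptions to adapt:

```python
import random

NUM_SHARDS = 8  # assumption: size to how hot the key is


def shard_write_key(base_key: str) -> str:
    """Route each increment to one of N shard keys so no single key
    (or cluster slot) absorbs all the write traffic."""
    return f"{base_key}:shard:{random.randrange(NUM_SHARDS)}"


def shard_read_keys(base_key: str) -> list:
    """All shard keys to fetch and sum on the read path, e.g.
    sum(int(v or 0) for v in client.mget(shard_read_keys(base_key)))."""
    return [f"{base_key}:shard:{i}" for i in range(NUM_SHARDS)]
```

The trade-off is a slightly more expensive read (one MGET over N keys instead of one GET), which is usually worthwhile when the write side is the bottleneck.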
Benchmark realistic payloads
Always test with key names, value sizes, and command ratios that match your real application.
Common Pitfalls to Avoid
Redis load testing is simple to start, but there are several common mistakes that lead to misleading results.
Testing only GET and SET
Real Redis workloads often include hashes, sorted sets, counters, lists, expiration, and pipelining. A simplistic benchmark may not reflect production.
Ignoring authentication and TLS overhead
If production uses ACLs and TLS, your test should too. Otherwise, performance results may be overly optimistic.
Using unrealistic key distributions
A perfectly random key pattern may hide hot-key problems, while a single repeated key may overstate contention. Match production behavior as closely as possible.
Not warming the dataset
If you test cache reads before populating keys, you are really testing misses, not hits. Warm the cache appropriately.
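A simple warm-up pass can seed the same key range the test reads. This sketch assumes a redis-py-style client and the `cache:user:profile:*` key format used earlier in this guide; the helper name and batch size are ours:

```python
def warm_profile_cache(client, count=50000, ttl=300, batch_size=1000):
    """Seed the keys a read-heavy test will request, so the first minutes
    of the run measure cache hits rather than a wall of misses."""
    pipe = client.pipeline(transaction=False)
    for user_id in range(1, count + 1):
        # Placeholder payload; a realistic warm-up should mirror
        # production value sizes, since payload size affects latency.
        pipe.setex(f"cache:user:profile:{user_id}", ttl, "{}")
        if user_id % batch_size == 0:
            pipe.execute()
    pipe.execute()
```

Run it once before starting the load test, and make sure the TTL is long enough that keys do not expire mid-run unless expiry is part of what you are measuring.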
Overlooking client-side bottlenecks
Sometimes the Redis server is fine, but the load generator, Python runtime, or network path becomes the bottleneck. Distributed testing with LoadForge helps reduce this risk.
Forgetting persistence effects
A Redis instance with AOF or RDB snapshots may behave differently under write-heavy load than one with persistence disabled.
Not separating read-heavy and write-heavy tests
A mixed workload is useful, but isolated scenarios help you understand which type of traffic causes degradation.
Conclusion
Redis is fast, but speed alone does not guarantee reliability at scale. Whether you are load testing a cache, stress testing a session store, validating a queue, or benchmarking counters and leaderboards, the right Redis load testing strategy helps you uncover latency spikes, connection issues, and throughput limits before they impact users.
With LoadForge, you can build realistic Locust-based Redis performance tests, run them with distributed cloud infrastructure, monitor results in real time, and integrate them into your release process. If you want to measure cache throughput, response times, and Redis performance at high concurrency with confidence, now is a great time to try LoadForge.
LoadForge Team
LoadForge is a load and performance testing platform built on Locust. Our team has been shipping load tests against production systems since 2018, and we write these guides from real customer engagements.