Test message queue APIs including RabbitMQ, SQS, and Redis queues
This guide demonstrates how to load test message queue APIs with LoadForge, covering popular systems such as RabbitMQ, AWS SQS, and Redis-backed queues. The first locustfile models a generic, RabbitMQ-style HTTP queue API: each simulated user creates its own queue, then publishes, consumes, and acknowledges messages, exercises batch publishing, checks queue statistics, and probes dead-letter handling.
```python
from locust import HttpUser, task, between
import json
import time
import uuid
import random


class MessageQueueUser(HttpUser):
    wait_time = between(1, 3)

    def on_start(self):
        """Initialize queue testing"""
        self.queue_name = f"test-queue-{int(time.time())}"
        self.message_count = 0
        self.published_messages = []

        # Create test queue
        self._create_queue()

    def _create_queue(self):
        """Create a test queue"""
        queue_config = {
            "name": self.queue_name,
            "durable": True,
            "auto_delete": False,
            "max_length": 1000
        }

        response = self.client.post('/api/queues',
                                    json=queue_config,
                                    headers={'Content-Type': 'application/json'},
                                    name="create_queue")

        if response.status_code not in [200, 201, 409]:  # 409 = already exists
            print(f"Failed to create queue: {response.status_code}")

    @task(4)
    def test_publish_message(self):
        """Test publishing messages to queue"""
        message_id = str(uuid.uuid4())
        message = {
            "id": message_id,
            "type": random.choice(["order", "notification", "email", "sms"]),
            "payload": {
                "user_id": f"user_{random.randint(1000, 9999)}",
                "action": random.choice(["create", "update", "delete"]),
                "timestamp": int(time.time()),
                "data": {"key": f"value_{random.randint(1, 100)}"}
            },
            "priority": random.randint(1, 10),
            "retry_count": 0,
            "max_retries": 3
        }

        headers = {
            'Content-Type': 'application/json',
            'X-Message-ID': message_id,
            'X-Queue-Name': self.queue_name
        }

        response = self.client.post(f'/api/queues/{self.queue_name}/messages',
                                    json=message,
                                    headers=headers,
                                    name="publish_message")

        if response.status_code in [200, 201, 202]:
            self.message_count += 1
            self.published_messages.append(message_id)

    @task(3)
    def test_consume_message(self):
        """Test consuming messages from queue"""
        params = {
            'count': random.randint(1, 5),
            'timeout': 10,
            'ack_mode': 'auto'
        }

        response = self.client.get(f'/api/queues/{self.queue_name}/messages',
                                   params=params,
                                   name="consume_message")

        if response.status_code == 200:
            try:
                messages = response.json()
                if isinstance(messages, list):
                    for message in messages:
                        self._process_consumed_message(message)
            except json.JSONDecodeError:
                pass

    def _process_consumed_message(self, message):
        """Process consumed message and acknowledge"""
        message_id = message.get('id')
        if not message_id:
            return

        # Simulate message processing
        processing_time = random.uniform(0.1, 0.5)
        time.sleep(processing_time)

        # Acknowledge message
        self.client.post(f'/api/queues/{self.queue_name}/ack',
                         json={'message_id': message_id},
                         name="acknowledge_message")

    @task(2)
    def test_batch_publish(self):
        """Test batch message publishing"""
        batch_size = random.randint(5, 20)
        messages = []

        for i in range(batch_size):
            message = {
                "id": str(uuid.uuid4()),
                "type": "batch_message",
                "payload": {
                    "batch_index": i,
                    "batch_size": batch_size,
                    "timestamp": int(time.time())
                },
                "priority": 5
            }
            messages.append(message)

        batch_payload = {
            "queue": self.queue_name,
            "messages": messages,
            "batch_id": str(uuid.uuid4())
        }

        self.client.post('/api/queues/batch-publish',
                         json=batch_payload,
                         headers={'Content-Type': 'application/json'},
                         name="batch_publish")

    @task(1)
    def test_queue_management(self):
        """Test queue management operations"""
        # Get queue stats
        response = self.client.get(f'/api/queues/{self.queue_name}/stats',
                                   name="queue_stats")

        if response.status_code == 200:
            try:
                stats = response.json()
                message_count = stats.get('message_count', 0)
                consumer_count = stats.get('consumer_count', 0)
            except json.JSONDecodeError:
                pass

        # Test queue purge (occasionally)
        if random.random() < 0.1:  # 10% chance
            self.client.delete(f'/api/queues/{self.queue_name}/messages',
                               name="purge_queue")

    @task(1)
    def test_dead_letter_queue(self):
        """Test dead letter queue functionality"""
        # Publish a message that will fail processing
        failed_message = {
            "id": str(uuid.uuid4()),
            "type": "failing_message",
            "payload": {
                "should_fail": True,
                "error_type": "processing_error"
            },
            "retry_count": 0,
            "max_retries": 2
        }

        self.client.post(f'/api/queues/{self.queue_name}/messages',
                         json=failed_message,
                         headers={'Content-Type': 'application/json'},
                         name="publish_failing_message")

        # Check dead letter queue after some time
        time.sleep(2)

        self.client.get(f'/api/queues/{self.queue_name}-dlq/messages',
                        params={'count': 1},
                        name="check_dead_letter_queue")

    def on_stop(self):
        """Cleanup test queue"""
        try:
            self.client.delete(f'/api/queues/{self.queue_name}',
                               name="delete_queue")
        except Exception:
            pass  # Ignore cleanup errors
```
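Beyond raw request timings, queue tests usually care about end-to-end delivery latency: how long a message waits in the queue before a consumer picks it up. The sketch below embeds a publish timestamp in each message and reports the delta as a custom entry in the Locust statistics when the message is consumed. It reuses the `/api/queues` endpoints from the example above and assumes the broker echoes the published payload back on consume; the `published_at` field and the `queue_latency` metric name are illustrative, and the queue-creation step is omitted for brevity.

```python
from locust import HttpUser, task, between, events
import json
import time
import uuid


class LatencyProbeUser(HttpUser):
    """Measures publish-to-consume latency by stamping each message with its publish time."""
    wait_time = between(1, 2)

    def on_start(self):
        self.queue_name = f"latency-queue-{int(time.time())}"

    @task(2)
    def publish_stamped_message(self):
        # "published_at" is a field we add ourselves; the broker just stores the payload
        message = {"id": str(uuid.uuid4()), "published_at": time.time()}
        self.client.post(f'/api/queues/{self.queue_name}/messages',
                         json=message, name="publish_stamped")

    @task(1)
    def consume_and_report_latency(self):
        response = self.client.get(f'/api/queues/{self.queue_name}/messages',
                                   params={'count': 1, 'ack_mode': 'auto'},
                                   name="consume_stamped")
        if response.status_code != 200:
            return
        try:
            messages = response.json()
        except json.JSONDecodeError:
            return
        if not isinstance(messages, list):
            return
        for msg in messages:
            published_at = msg.get('published_at') if isinstance(msg, dict) else None
            if published_at:
                latency_ms = (time.time() - published_at) * 1000
                # Report the delivery delay as a custom request in the Locust statistics
                events.request.fire(request_type="QUEUE", name="queue_latency",
                                    response_time=latency_ms, response_length=0,
                                    exception=None, context={})
```

The `queue_latency` entry then shows up alongside the HTTP requests in the results, so you can watch delivery delay grow as load increases.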
The same pattern applies to AWS SQS-style APIs. The locustfile below exercises SendMessage, ReceiveMessage, DeleteMessage, and SendMessageBatch. Note that it posts to a relative `/sqs` endpoint on the load test target (for example an internal gateway or a local mock), since requests to SQS itself would also require SigV4 request signing.

```python
from locust import HttpUser, task, between
import json
import time
import uuid
import random


class SQSUser(HttpUser):
    wait_time = between(1, 2)

    def on_start(self):
        """Initialize SQS testing"""
        self.queue_url = "https://sqs.us-east-1.amazonaws.com/123456789/test-queue"
        self.aws_region = "us-east-1"

    @task(3)
    def test_sqs_send_message(self):
        """Test SQS SendMessage API"""
        message_body = {
            "event": "user_action",
            "user_id": f"user_{random.randint(1000, 9999)}",
            "action": random.choice(["login", "logout", "purchase"]),
            "timestamp": int(time.time())
        }

        payload = {
            "Action": "SendMessage",
            "QueueUrl": self.queue_url,
            "MessageBody": json.dumps(message_body),
            "MessageAttributes": {
                "EventType": {
                    "StringValue": "user_action",
                    "DataType": "String"
                },
                "Priority": {
                    "StringValue": str(random.randint(1, 10)),
                    "DataType": "Number"
                }
            }
        }

        headers = {
            'Content-Type': 'application/x-amz-json-1.0',
            'X-Amz-Target': 'AWSSimpleQueueService.SendMessage'
        }

        self.client.post('/sqs',
                         json=payload,
                         headers=headers,
                         name="sqs_send_message")

    @task(2)
    def test_sqs_receive_message(self):
        """Test SQS ReceiveMessage API"""
        payload = {
            "Action": "ReceiveMessage",
            "QueueUrl": self.queue_url,
            "MaxNumberOfMessages": random.randint(1, 10),
            "WaitTimeSeconds": 5,
            "MessageAttributeNames": ["All"]
        }

        headers = {
            'Content-Type': 'application/x-amz-json-1.0',
            'X-Amz-Target': 'AWSSimpleQueueService.ReceiveMessage'
        }

        response = self.client.post('/sqs',
                                    json=payload,
                                    headers=headers,
                                    name="sqs_receive_message")

        if response.status_code == 200:
            try:
                result = response.json()
                messages = result.get('Messages', [])
                for message in messages:
                    self._delete_sqs_message(message.get('ReceiptHandle'))
            except json.JSONDecodeError:
                pass

    def _delete_sqs_message(self, receipt_handle):
        """Delete processed SQS message"""
        if not receipt_handle:
            return

        payload = {
            "Action": "DeleteMessage",
            "QueueUrl": self.queue_url,
            "ReceiptHandle": receipt_handle
        }

        headers = {
            'Content-Type': 'application/x-amz-json-1.0',
            'X-Amz-Target': 'AWSSimpleQueueService.DeleteMessage'
        }

        self.client.post('/sqs',
                         json=payload,
                         headers=headers,
                         name="sqs_delete_message")

    @task(1)
    def test_sqs_batch_operations(self):
        """Test SQS batch send (SendMessageBatch)"""
        entries = []
        for i in range(random.randint(2, 10)):
            entry = {
                "Id": str(i),
                "MessageBody": json.dumps({
                    "batch_index": i,
                    "timestamp": int(time.time())
                })
            }
            entries.append(entry)

        batch_payload = {
            "Action": "SendMessageBatch",
            "QueueUrl": self.queue_url,
            "Entries": entries
        }

        headers = {
            'Content-Type': 'application/x-amz-json-1.0',
            'X-Amz-Target': 'AWSSimpleQueueService.SendMessageBatch'
        }

        self.client.post('/sqs',
                         json=batch_payload,
                         headers=headers,
                         name="sqs_batch_send")
```
Redis lists are another common queue backend. The locustfile below drives an HTTP wrapper around the Redis list commands LPUSH (enqueue), BRPOP (blocking dequeue), and LLEN (queue depth).

```python
from locust import HttpUser, task, between
import json
import time
import uuid
import random


class RedisQueueUser(HttpUser):
    wait_time = between(0.5, 2)

    def on_start(self):
        """Initialize Redis queue testing"""
        self.queue_key = f"queue:test:{int(time.time())}"

    @task(4)
    def test_redis_lpush(self):
        """Test Redis LPUSH (left push) operation"""
        message = {
            "id": str(uuid.uuid4()),
            "data": f"message_{int(time.time())}",
            "priority": random.randint(1, 5)
        }

        payload = {
            "command": "LPUSH",
            "key": self.queue_key,
            "value": json.dumps(message)
        }

        self.client.post('/redis/queue',
                         json=payload,
                         name="redis_lpush")

    @task(3)
    def test_redis_brpop(self):
        """Test Redis BRPOP (blocking right pop) operation"""
        payload = {
            "command": "BRPOP",
            "keys": [self.queue_key],
            "timeout": 5
        }

        self.client.post('/redis/queue',
                         json=payload,
                         name="redis_brpop")

    @task(1)
    def test_redis_llen(self):
        """Test Redis LLEN (list length) operation"""
        payload = {
            "command": "LLEN",
            "key": self.queue_key
        }

        self.client.post('/redis/queue',
                         json=payload,
                         name="redis_llen")
```
Together these patterns cover publishing, consuming, acknowledgement, batching, queue management, and dead-letter handling, giving you a solid basis for load testing asynchronous processing systems with LoadForge.