This guide demonstrates how to test message queue APIs with LoadForge, covering popular queue systems like RabbitMQ, AWS SQS, and Redis queues.
Use Cases
- Testing message queue throughput and reliability
- Validating queue API endpoints for publishing and consuming
- Load testing async processing systems
- Testing queue management operations (create, delete, purge)
- Validating message ordering and delivery guarantees
Basic Queue API Testing
from locust import HttpUser, task, between
import json
import time
import uuid
import random
class MessageQueueUser(HttpUser):
wait_time = between(1, 3)
def on_start(self):
"""Initialize queue testing"""
self.queue_name = f"test-queue-{int(time.time())}"
self.message_count = 0
self.published_messages = []
# Create test queue
self._create_queue()
def _create_queue(self):
"""Create a test queue"""
queue_config = {
"name": self.queue_name,
"durable": True,
"auto_delete": False,
"max_length": 1000
}
response = self.client.post('/api/queues',
json=queue_config,
headers={'Content-Type': 'application/json'},
name="create_queue")
if response.status_code not in [200, 201, 409]: # 409 = already exists
print(f"Failed to create queue: {response.status_code}")
@task(4)
def test_publish_message(self):
"""Test publishing messages to queue"""
message_id = str(uuid.uuid4())
message = {
"id": message_id,
"type": random.choice(["order", "notification", "email", "sms"]),
"payload": {
"user_id": f"user_{random.randint(1000, 9999)}",
"action": random.choice(["create", "update", "delete"]),
"timestamp": int(time.time()),
"data": {"key": f"value_{random.randint(1, 100)}"}
},
"priority": random.randint(1, 10),
"retry_count": 0,
"max_retries": 3
}
headers = {
'Content-Type': 'application/json',
'X-Message-ID': message_id,
'X-Queue-Name': self.queue_name
}
response = self.client.post(f'/api/queues/{self.queue_name}/messages',
json=message,
headers=headers,
name="publish_message")
if response.status_code in [200, 201, 202]:
self.message_count += 1
self.published_messages.append(message_id)
@task(3)
def test_consume_message(self):
"""Test consuming messages from queue"""
params = {
'count': random.randint(1, 5),
'timeout': 10,
'ack_mode': 'auto'
}
response = self.client.get(f'/api/queues/{self.queue_name}/messages',
params=params,
name="consume_message")
if response.status_code == 200:
try:
messages = response.json()
if isinstance(messages, list):
for message in messages:
self._process_consumed_message(message)
except json.JSONDecodeError:
pass
def _process_consumed_message(self, message):
"""Process consumed message and acknowledge"""
message_id = message.get('id')
if not message_id:
return
# Simulate message processing
processing_time = random.uniform(0.1, 0.5)
time.sleep(processing_time)
# Acknowledge message
ack_response = self.client.post(f'/api/queues/{self.queue_name}/ack',
json={'message_id': message_id},
name="acknowledge_message")
@task(2)
def test_batch_publish(self):
"""Test batch message publishing"""
batch_size = random.randint(5, 20)
messages = []
for i in range(batch_size):
message = {
"id": str(uuid.uuid4()),
"type": "batch_message",
"payload": {
"batch_index": i,
"batch_size": batch_size,
"timestamp": int(time.time())
},
"priority": 5
}
messages.append(message)
batch_payload = {
"queue": self.queue_name,
"messages": messages,
"batch_id": str(uuid.uuid4())
}
response = self.client.post('/api/queues/batch-publish',
json=batch_payload,
headers={'Content-Type': 'application/json'},
name="batch_publish")
@task(1)
def test_queue_management(self):
"""Test queue management operations"""
# Get queue stats
response = self.client.get(f'/api/queues/{self.queue_name}/stats',
name="queue_stats")
if response.status_code == 200:
try:
stats = response.json()
message_count = stats.get('message_count', 0)
consumer_count = stats.get('consumer_count', 0)
except json.JSONDecodeError:
pass
# Test queue purge (occasionally)
if random.random() < 0.1: # 10% chance
purge_response = self.client.delete(f'/api/queues/{self.queue_name}/messages',
name="purge_queue")
@task(1)
def test_dead_letter_queue(self):
"""Test dead letter queue functionality"""
# Publish message that will fail processing
failed_message = {
"id": str(uuid.uuid4()),
"type": "failing_message",
"payload": {
"should_fail": True,
"error_type": "processing_error"
},
"retry_count": 0,
"max_retries": 2
}
response = self.client.post(f'/api/queues/{self.queue_name}/messages',
json=failed_message,
headers={'Content-Type': 'application/json'},
name="publish_failing_message")
# Check dead letter queue after some time
time.sleep(2)
dlq_response = self.client.get(f'/api/queues/{self.queue_name}-dlq/messages',
params={'count': 1},
name="check_dead_letter_queue")
def on_stop(self):
"""Cleanup test queue"""
try:
self.client.delete(f'/api/queues/{self.queue_name}',
name="delete_queue")
except:
pass # Ignore cleanup errors
AWS SQS Testing
from locust import HttpUser, task, between
import json
import time
import uuid
class SQSUser(HttpUser):
wait_time = between(1, 2)
def on_start(self):
"""Initialize SQS testing"""
self.queue_url = "https://sqs.us-east-1.amazonaws.com/123456789/test-queue"
self.aws_region = "us-east-1"
@task(3)
def test_sqs_send_message(self):
"""Test SQS SendMessage API"""
message_body = {
"event": "user_action",
"user_id": f"user_{random.randint(1000, 9999)}",
"action": random.choice(["login", "logout", "purchase"]),
"timestamp": int(time.time())
}
payload = {
"Action": "SendMessage",
"QueueUrl": self.queue_url,
"MessageBody": json.dumps(message_body),
"MessageAttributes": {
"EventType": {
"StringValue": "user_action",
"DataType": "String"
},
"Priority": {
"StringValue": str(random.randint(1, 10)),
"DataType": "Number"
}
}
}
headers = {
'Content-Type': 'application/x-amz-json-1.0',
'X-Amz-Target': 'AWSSimpleQueueService.SendMessage'
}
response = self.client.post('/sqs',
json=payload,
headers=headers,
name="sqs_send_message")
@task(2)
def test_sqs_receive_message(self):
"""Test SQS ReceiveMessage API"""
payload = {
"Action": "ReceiveMessage",
"QueueUrl": self.queue_url,
"MaxNumberOfMessages": random.randint(1, 10),
"WaitTimeSeconds": 5,
"MessageAttributeNames": ["All"]
}
headers = {
'Content-Type': 'application/x-amz-json-1.0',
'X-Amz-Target': 'AWSSimpleQueueService.ReceiveMessage'
}
response = self.client.post('/sqs',
json=payload,
headers=headers,
name="sqs_receive_message")
if response.status_code == 200:
try:
result = response.json()
messages = result.get('Messages', [])
for message in messages:
self._delete_sqs_message(message.get('ReceiptHandle'))
except json.JSONDecodeError:
pass
def _delete_sqs_message(self, receipt_handle):
"""Delete processed SQS message"""
if not receipt_handle:
return
payload = {
"Action": "DeleteMessage",
"QueueUrl": self.queue_url,
"ReceiptHandle": receipt_handle
}
headers = {
'Content-Type': 'application/x-amz-json-1.0',
'X-Amz-Target': 'AWSSimpleQueueService.DeleteMessage'
}
self.client.post('/sqs',
json=payload,
headers=headers,
name="sqs_delete_message")
@task(1)
def test_sqs_batch_operations(self):
"""Test SQS batch send and receive"""
# Batch send
entries = []
for i in range(random.randint(2, 10)):
entry = {
"Id": str(i),
"MessageBody": json.dumps({
"batch_index": i,
"timestamp": int(time.time())
})
}
entries.append(entry)
batch_payload = {
"Action": "SendMessageBatch",
"QueueUrl": self.queue_url,
"Entries": entries
}
headers = {
'Content-Type': 'application/x-amz-json-1.0',
'X-Amz-Target': 'AWSSimpleQueueService.SendMessageBatch'
}
response = self.client.post('/sqs',
json=batch_payload,
headers=headers,
name="sqs_batch_send")
Redis Queue Testing
from locust import HttpUser, task, between
import json
import time
class RedisQueueUser(HttpUser):
wait_time = between(0.5, 2)
def on_start(self):
"""Initialize Redis queue testing"""
self.queue_key = f"queue:test:{int(time.time())}"
@task(4)
def test_redis_lpush(self):
"""Test Redis LPUSH (left push) operation"""
message = {
"id": str(uuid.uuid4()),
"data": f"message_{int(time.time())}",
"priority": random.randint(1, 5)
}
payload = {
"command": "LPUSH",
"key": self.queue_key,
"value": json.dumps(message)
}
response = self.client.post('/redis/queue',
json=payload,
name="redis_lpush")
@task(3)
def test_redis_brpop(self):
"""Test Redis BRPOP (blocking right pop) operation"""
payload = {
"command": "BRPOP",
"keys": [self.queue_key],
"timeout": 5
}
response = self.client.post('/redis/queue',
json=payload,
name="redis_brpop")
@task(1)
def test_redis_llen(self):
"""Test Redis LLEN (list length) operation"""
payload = {
"command": "LLEN",
"key": self.queue_key
}
response = self.client.post('/redis/queue',
json=payload,
name="redis_llen")
Key Testing Points
- Message Publishing: Test message creation and queue insertion
- Message Consumption: Test message retrieval and processing
- Batch Operations: Test bulk message handling
- Queue Management: Test queue creation, deletion, and stats
- Error Handling: Test dead letter queues and retry logic
- Performance: Monitor throughput and latency
This guide provides comprehensive message queue testing patterns for reliable async processing systems with proper error handling and performance monitoring.