This guide demonstrates how to test message queue APIs with LoadForge, covering popular queue systems like RabbitMQ, AWS SQS, and Redis queues.

Use Cases

Testing message queue throughput and reliability

Validating queue API endpoints for publishing and consuming

Load testing async processing systems

Testing queue management operations (create, delete, purge)

Validating message ordering and delivery guarantees

Basic Queue API Testing

import json
import random
import time
import uuid

from locust import HttpUser, task, between


class MessageQueueUser(HttpUser):
    """Simulated client that exercises a generic HTTP message-queue API.

    Each user creates a queue on start, then publishes, consumes,
    batch-publishes, inspects and occasionally purges it, and finally
    deletes the queue on stop. Task weights (4/3/2/1/1) bias the load
    towards publishing and consuming.
    """

    wait_time = between(1, 3)

    def on_start(self):
        """Initialize per-user state and create the test queue."""
        # The name has one-second resolution, so users spawned within the
        # same second share a queue; _create_queue tolerates the resulting
        # 409 response.
        self.queue_name = f"test-queue-{int(time.time())}"
        self.message_count = 0
        self.published_messages = []

        # Create test queue
        self._create_queue()

    def _create_queue(self):
        """Create the test queue, reporting unexpected status codes."""
        queue_config = {
            "name": self.queue_name,
            "durable": True,
            "auto_delete": False,
            "max_length": 1000
        }

        response = self.client.post('/api/queues',
                                    json=queue_config,
                                    headers={'Content-Type': 'application/json'},
                                    name="create_queue")

        if response.status_code not in [200, 201, 409]:  # 409 = already exists
            print(f"Failed to create queue: {response.status_code}")

    @task(4)
    def test_publish_message(self):
        """Publish one randomized message and record its id on success."""
        message_id = str(uuid.uuid4())
        message = {
            "id": message_id,
            "type": random.choice(["order", "notification", "email", "sms"]),
            "payload": {
                "user_id": f"user_{random.randint(1000, 9999)}",
                "action": random.choice(["create", "update", "delete"]),
                "timestamp": int(time.time()),
                "data": {"key": f"value_{random.randint(1, 100)}"}
            },
            "priority": random.randint(1, 10),
            "retry_count": 0,
            "max_retries": 3
        }

        headers = {
            'Content-Type': 'application/json',
            'X-Message-ID': message_id,
            'X-Queue-Name': self.queue_name
        }

        response = self.client.post(f'/api/queues/{self.queue_name}/messages',
                                    json=message,
                                    headers=headers,
                                    name="publish_message")

        if response.status_code in [200, 201, 202]:
            self.message_count += 1
            self.published_messages.append(message_id)

    @task(3)
    def test_consume_message(self):
        """Fetch up to five messages and acknowledge each one."""
        params = {
            'count': random.randint(1, 5),
            'timeout': 10,
            'ack_mode': 'auto'
        }

        response = self.client.get(f'/api/queues/{self.queue_name}/messages',
                                   params=params,
                                   name="consume_message")

        if response.status_code != 200:
            return

        # Only the decode step is guarded, so errors raised while processing
        # messages are not silently swallowed. ValueError is the common base
        # of the JSON decode errors the response object can raise.
        try:
            messages = response.json()
        except ValueError:
            return

        if isinstance(messages, list):
            for message in messages:
                self._process_consumed_message(message)

    def _process_consumed_message(self, message):
        """Simulate processing of one consumed message, then acknowledge it.

        Entries that are not JSON objects or that carry no ``id`` are
        skipped, since there is nothing to acknowledge.
        """
        if not isinstance(message, dict):
            return

        message_id = message.get('id')
        if not message_id:
            return

        # Simulate message processing
        processing_time = random.uniform(0.1, 0.5)
        time.sleep(processing_time)

        # Acknowledge message
        self.client.post(f'/api/queues/{self.queue_name}/ack',
                         json={'message_id': message_id},
                         name="acknowledge_message")

    @task(2)
    def test_batch_publish(self):
        """Publish a batch of 5-20 messages in a single request."""
        batch_size = random.randint(5, 20)
        messages = [
            {
                "id": str(uuid.uuid4()),
                "type": "batch_message",
                "payload": {
                    "batch_index": i,
                    "batch_size": batch_size,
                    "timestamp": int(time.time())
                },
                "priority": 5
            }
            for i in range(batch_size)
        ]

        batch_payload = {
            "queue": self.queue_name,
            "messages": messages,
            "batch_id": str(uuid.uuid4())
        }

        self.client.post('/api/queues/batch-publish',
                         json=batch_payload,
                         headers={'Content-Type': 'application/json'},
                         name="batch_publish")

    @task(1)
    def test_queue_management(self):
        """Request queue stats and, 10% of the time, purge the queue."""
        # Get queue stats
        response = self.client.get(f'/api/queues/{self.queue_name}/stats',
                                   name="queue_stats")

        if response.status_code == 200:
            try:
                response.json()  # only checks that the body decodes as JSON
            except ValueError:
                pass

        # Test queue purge (occasionally)
        if random.random() < 0.1:  # 10% chance
            self.client.delete(f'/api/queues/{self.queue_name}/messages',
                               name="purge_queue")

    @task(1)
    def test_dead_letter_queue(self):
        """Publish a message flagged to fail, then poll the ``-dlq`` queue."""
        # Publish message that will fail processing
        failed_message = {
            "id": str(uuid.uuid4()),
            "type": "failing_message",
            "payload": {
                "should_fail": True,
                "error_type": "processing_error"
            },
            "retry_count": 0,
            "max_retries": 2
        }

        self.client.post(f'/api/queues/{self.queue_name}/messages',
                         json=failed_message,
                         headers={'Content-Type': 'application/json'},
                         name="publish_failing_message")

        # Check dead letter queue after some time
        time.sleep(2)

        self.client.get(f'/api/queues/{self.queue_name}-dlq/messages',
                        params={'count': 1},
                        name="check_dead_letter_queue")

    def on_stop(self):
        """Delete the test queue; cleanup failures are reported, not raised."""
        try:
            self.client.delete(f'/api/queues/{self.queue_name}',
                               name="delete_queue")
        except Exception as exc:
            # Best-effort cleanup: report the problem but let shutdown
            # continue. Unlike a bare ``except``, this does not intercept
            # KeyboardInterrupt or SystemExit.
            print(f"Failed to delete queue: {exc}")

AWS SQS Testing

import json
import random
import time
import uuid

from locust import HttpUser, task, between


class SQSUser(HttpUser):
    """Simulated client that sends SQS-style JSON requests to ``/sqs``.

    Covers SendMessage, ReceiveMessage (followed by DeleteMessage for each
    message received) and SendMessageBatch.
    """

    wait_time = between(1, 2)

    def on_start(self):
        """Set the queue URL and region used by every request."""
        self.queue_url = "https://sqs.us-east-1.amazonaws.com/123456789/test-queue"
        self.aws_region = "us-east-1"

    @task(3)
    def test_sqs_send_message(self):
        """Send one randomized message with two message attributes."""
        message_body = {
            "event": "user_action",
            "user_id": f"user_{random.randint(1000, 9999)}",
            "action": random.choice(["login", "logout", "purchase"]),
            "timestamp": int(time.time())
        }

        payload = {
            "Action": "SendMessage",
            "QueueUrl": self.queue_url,
            "MessageBody": json.dumps(message_body),
            "MessageAttributes": {
                "EventType": {
                    "StringValue": "user_action",
                    "DataType": "String"
                },
                "Priority": {
                    "StringValue": str(random.randint(1, 10)),
                    "DataType": "Number"
                }
            }
        }

        headers = {
            'Content-Type': 'application/x-amz-json-1.0',
            'X-Amz-Target': 'AWSSimpleQueueService.SendMessage'
        }

        self.client.post('/sqs',
                         json=payload,
                         headers=headers,
                         name="sqs_send_message")

    @task(2)
    def test_sqs_receive_message(self):
        """Receive up to ten messages and delete each one received."""
        payload = {
            "Action": "ReceiveMessage",
            "QueueUrl": self.queue_url,
            "MaxNumberOfMessages": random.randint(1, 10),
            "WaitTimeSeconds": 5,
            "MessageAttributeNames": ["All"]
        }

        headers = {
            'Content-Type': 'application/x-amz-json-1.0',
            'X-Amz-Target': 'AWSSimpleQueueService.ReceiveMessage'
        }

        response = self.client.post('/sqs',
                                    json=payload,
                                    headers=headers,
                                    name="sqs_receive_message")

        if response.status_code != 200:
            return

        # ValueError is the common base of the JSON decode errors the
        # response object can raise; only the decode step is guarded.
        try:
            result = response.json()
        except ValueError:
            return

        if not isinstance(result, dict):
            return

        # "Messages" may be missing or null when nothing was received.
        for message in result.get('Messages') or []:
            self._delete_sqs_message(message.get('ReceiptHandle'))

    def _delete_sqs_message(self, receipt_handle):
        """Delete a received message; does nothing without a receipt handle."""
        if not receipt_handle:
            return

        payload = {
            "Action": "DeleteMessage",
            "QueueUrl": self.queue_url,
            "ReceiptHandle": receipt_handle
        }

        headers = {
            'Content-Type': 'application/x-amz-json-1.0',
            'X-Amz-Target': 'AWSSimpleQueueService.DeleteMessage'
        }

        self.client.post('/sqs',
                         json=payload,
                         headers=headers,
                         name="sqs_delete_message")

    @task(1)
    def test_sqs_batch_operations(self):
        """Send a batch of 2-10 messages in one SendMessageBatch request."""
        # Batch send
        entries = [
            {
                "Id": str(i),
                "MessageBody": json.dumps({
                    "batch_index": i,
                    "timestamp": int(time.time())
                })
            }
            for i in range(random.randint(2, 10))
        ]

        batch_payload = {
            "Action": "SendMessageBatch",
            "QueueUrl": self.queue_url,
            "Entries": entries
        }

        headers = {
            'Content-Type': 'application/x-amz-json-1.0',
            'X-Amz-Target': 'AWSSimpleQueueService.SendMessageBatch'
        }

        self.client.post('/sqs',
                         json=batch_payload,
                         headers=headers,
                         name="sqs_batch_send")

Redis Queue Testing

import json
import random
import time
import uuid

from locust import HttpUser, task, between


class RedisQueueUser(HttpUser):
    """Simulated client that posts Redis list commands to ``/redis/queue``.

    Issues LPUSH, BRPOP and LLEN requests against a queue key, weighted
    4/3/1 respectively.
    """

    wait_time = between(0.5, 2)

    def on_start(self):
        """Pick the queue key used for the lifetime of this user."""
        # One-second resolution: users started within the same second
        # operate on the same key.
        self.queue_key = f"queue:test:{int(time.time())}"

    @task(4)
    def test_redis_lpush(self):
        """Test Redis LPUSH (left push) operation"""
        message = {
            "id": str(uuid.uuid4()),
            "data": f"message_{int(time.time())}",
            "priority": random.randint(1, 5)
        }

        payload = {
            "command": "LPUSH",
            "key": self.queue_key,
            "value": json.dumps(message)
        }

        self.client.post('/redis/queue',
                         json=payload,
                         name="redis_lpush")

    @task(3)
    def test_redis_brpop(self):
        """Test Redis BRPOP (blocking right pop) operation"""
        payload = {
            "command": "BRPOP",
            "keys": [self.queue_key],
            "timeout": 5
        }

        self.client.post('/redis/queue',
                         json=payload,
                         name="redis_brpop")

    @task(1)
    def test_redis_llen(self):
        """Test Redis LLEN (list length) operation"""
        payload = {
            "command": "LLEN",
            "key": self.queue_key
        }

        self.client.post('/redis/queue',
                         json=payload,
                         name="redis_llen")

Key Testing Points

Message Publishing: Test message creation and queue insertion

Message Consumption: Test message retrieval and processing

Batch Operations: Test bulk message handling

Queue Management: Test queue creation, deletion, and stats

Error Handling: Test dead letter queues and retry logic

Performance: Monitor throughput and latency