Message Queue APIs Testing

Test message queue APIs including RabbitMQ, SQS, and Redis queues

LoadForge can record your browser, graphically build tests, scan your site with a wizard and more. Sign up now to run your first test.

Sign up now


This guide demonstrates how to test message queue APIs with LoadForge, covering popular queue systems like RabbitMQ, AWS SQS, and Redis queues.

Use Cases

  • Testing message queue throughput and reliability
  • Validating queue API endpoints for publishing and consuming
  • Load testing async processing systems
  • Testing queue management operations (create, delete, purge)
  • Validating message ordering and delivery guarantees

Basic Queue API Testing

from locust import HttpUser, task, between
import json
import time
import uuid
import random

class MessageQueueUser(HttpUser):
    wait_time = between(1, 3)
    
    def on_start(self):
        """Initialize queue testing"""
        self.queue_name = f"test-queue-{int(time.time())}"
        self.message_count = 0
        self.published_messages = []
        
        # Create test queue
        self._create_queue()
        
    def _create_queue(self):
        """Create a test queue"""
        queue_config = {
            "name": self.queue_name,
            "durable": True,
            "auto_delete": False,
            "max_length": 1000
        }
        
        response = self.client.post('/api/queues',
                                  json=queue_config,
                                  headers={'Content-Type': 'application/json'},
                                  name="create_queue")
        
        if response.status_code not in [200, 201, 409]:  # 409 = already exists
            print(f"Failed to create queue: {response.status_code}")

    @task(4)
    def test_publish_message(self):
        """Test publishing messages to queue"""
        message_id = str(uuid.uuid4())
        message = {
            "id": message_id,
            "type": random.choice(["order", "notification", "email", "sms"]),
            "payload": {
                "user_id": f"user_{random.randint(1000, 9999)}",
                "action": random.choice(["create", "update", "delete"]),
                "timestamp": int(time.time()),
                "data": {"key": f"value_{random.randint(1, 100)}"}
            },
            "priority": random.randint(1, 10),
            "retry_count": 0,
            "max_retries": 3
        }
        
        headers = {
            'Content-Type': 'application/json',
            'X-Message-ID': message_id,
            'X-Queue-Name': self.queue_name
        }
        
        response = self.client.post(f'/api/queues/{self.queue_name}/messages',
                                  json=message,
                                  headers=headers,
                                  name="publish_message")
        
        if response.status_code in [200, 201, 202]:
            self.message_count += 1
            self.published_messages.append(message_id)

    @task(3)
    def test_consume_message(self):
        """Test consuming messages from queue"""
        params = {
            'count': random.randint(1, 5),
            'timeout': 10,
            'ack_mode': 'auto'
        }
        
        response = self.client.get(f'/api/queues/{self.queue_name}/messages',
                                 params=params,
                                 name="consume_message")
        
        if response.status_code == 200:
            try:
                messages = response.json()
                if isinstance(messages, list):
                    for message in messages:
                        self._process_consumed_message(message)
            except json.JSONDecodeError:
                pass

    def _process_consumed_message(self, message):
        """Process consumed message and acknowledge"""
        message_id = message.get('id')
        if not message_id:
            return
            
        # Simulate message processing
        processing_time = random.uniform(0.1, 0.5)
        time.sleep(processing_time)
        
        # Acknowledge message
        ack_response = self.client.post(f'/api/queues/{self.queue_name}/ack',
                                      json={'message_id': message_id},
                                      name="acknowledge_message")

    @task(2)
    def test_batch_publish(self):
        """Test batch message publishing"""
        batch_size = random.randint(5, 20)
        messages = []
        
        for i in range(batch_size):
            message = {
                "id": str(uuid.uuid4()),
                "type": "batch_message",
                "payload": {
                    "batch_index": i,
                    "batch_size": batch_size,
                    "timestamp": int(time.time())
                },
                "priority": 5
            }
            messages.append(message)
        
        batch_payload = {
            "queue": self.queue_name,
            "messages": messages,
            "batch_id": str(uuid.uuid4())
        }
        
        response = self.client.post('/api/queues/batch-publish',
                                  json=batch_payload,
                                  headers={'Content-Type': 'application/json'},
                                  name="batch_publish")

    @task(1)
    def test_queue_management(self):
        """Test queue management operations"""
        # Get queue stats
        response = self.client.get(f'/api/queues/{self.queue_name}/stats',
                                 name="queue_stats")
        
        if response.status_code == 200:
            try:
                stats = response.json()
                message_count = stats.get('message_count', 0)
                consumer_count = stats.get('consumer_count', 0)
            except json.JSONDecodeError:
                pass
        
        # Test queue purge (occasionally)
        if random.random() < 0.1:  # 10% chance
            purge_response = self.client.delete(f'/api/queues/{self.queue_name}/messages',
                                              name="purge_queue")

    @task(1)
    def test_dead_letter_queue(self):
        """Test dead letter queue functionality"""
        # Publish message that will fail processing
        failed_message = {
            "id": str(uuid.uuid4()),
            "type": "failing_message",
            "payload": {
                "should_fail": True,
                "error_type": "processing_error"
            },
            "retry_count": 0,
            "max_retries": 2
        }
        
        response = self.client.post(f'/api/queues/{self.queue_name}/messages',
                                  json=failed_message,
                                  headers={'Content-Type': 'application/json'},
                                  name="publish_failing_message")
        
        # Check dead letter queue after some time
        time.sleep(2)
        
        dlq_response = self.client.get(f'/api/queues/{self.queue_name}-dlq/messages',
                                     params={'count': 1},
                                     name="check_dead_letter_queue")

    def on_stop(self):
        """Cleanup test queue"""
        try:
            self.client.delete(f'/api/queues/{self.queue_name}',
                             name="delete_queue")
        except:
            pass  # Ignore cleanup errors

AWS SQS Testing

from locust import HttpUser, task, between
import json
import time
import uuid

class SQSUser(HttpUser):
    wait_time = between(1, 2)
    
    def on_start(self):
        """Initialize SQS testing"""
        self.queue_url = "https://sqs.us-east-1.amazonaws.com/123456789/test-queue"
        self.aws_region = "us-east-1"
        
    @task(3)
    def test_sqs_send_message(self):
        """Test SQS SendMessage API"""
        message_body = {
            "event": "user_action",
            "user_id": f"user_{random.randint(1000, 9999)}",
            "action": random.choice(["login", "logout", "purchase"]),
            "timestamp": int(time.time())
        }
        
        payload = {
            "Action": "SendMessage",
            "QueueUrl": self.queue_url,
            "MessageBody": json.dumps(message_body),
            "MessageAttributes": {
                "EventType": {
                    "StringValue": "user_action",
                    "DataType": "String"
                },
                "Priority": {
                    "StringValue": str(random.randint(1, 10)),
                    "DataType": "Number"
                }
            }
        }
        
        headers = {
            'Content-Type': 'application/x-amz-json-1.0',
            'X-Amz-Target': 'AWSSimpleQueueService.SendMessage'
        }
        
        response = self.client.post('/sqs',
                                  json=payload,
                                  headers=headers,
                                  name="sqs_send_message")

    @task(2)
    def test_sqs_receive_message(self):
        """Test SQS ReceiveMessage API"""
        payload = {
            "Action": "ReceiveMessage",
            "QueueUrl": self.queue_url,
            "MaxNumberOfMessages": random.randint(1, 10),
            "WaitTimeSeconds": 5,
            "MessageAttributeNames": ["All"]
        }
        
        headers = {
            'Content-Type': 'application/x-amz-json-1.0',
            'X-Amz-Target': 'AWSSimpleQueueService.ReceiveMessage'
        }
        
        response = self.client.post('/sqs',
                                  json=payload,
                                  headers=headers,
                                  name="sqs_receive_message")
        
        if response.status_code == 200:
            try:
                result = response.json()
                messages = result.get('Messages', [])
                for message in messages:
                    self._delete_sqs_message(message.get('ReceiptHandle'))
            except json.JSONDecodeError:
                pass

    def _delete_sqs_message(self, receipt_handle):
        """Delete processed SQS message"""
        if not receipt_handle:
            return
            
        payload = {
            "Action": "DeleteMessage",
            "QueueUrl": self.queue_url,
            "ReceiptHandle": receipt_handle
        }
        
        headers = {
            'Content-Type': 'application/x-amz-json-1.0',
            'X-Amz-Target': 'AWSSimpleQueueService.DeleteMessage'
        }
        
        self.client.post('/sqs',
                       json=payload,
                       headers=headers,
                       name="sqs_delete_message")

    @task(1)
    def test_sqs_batch_operations(self):
        """Test SQS batch send and receive"""
        # Batch send
        entries = []
        for i in range(random.randint(2, 10)):
            entry = {
                "Id": str(i),
                "MessageBody": json.dumps({
                    "batch_index": i,
                    "timestamp": int(time.time())
                })
            }
            entries.append(entry)
        
        batch_payload = {
            "Action": "SendMessageBatch",
            "QueueUrl": self.queue_url,
            "Entries": entries
        }
        
        headers = {
            'Content-Type': 'application/x-amz-json-1.0',
            'X-Amz-Target': 'AWSSimpleQueueService.SendMessageBatch'
        }
        
        response = self.client.post('/sqs',
                                  json=batch_payload,
                                  headers=headers,
                                  name="sqs_batch_send")

Redis Queue Testing

from locust import HttpUser, task, between
import json
import time

class RedisQueueUser(HttpUser):
    wait_time = between(0.5, 2)
    
    def on_start(self):
        """Initialize Redis queue testing"""
        self.queue_key = f"queue:test:{int(time.time())}"
        
    @task(4)
    def test_redis_lpush(self):
        """Test Redis LPUSH (left push) operation"""
        message = {
            "id": str(uuid.uuid4()),
            "data": f"message_{int(time.time())}",
            "priority": random.randint(1, 5)
        }
        
        payload = {
            "command": "LPUSH",
            "key": self.queue_key,
            "value": json.dumps(message)
        }
        
        response = self.client.post('/redis/queue',
                                  json=payload,
                                  name="redis_lpush")

    @task(3)
    def test_redis_brpop(self):
        """Test Redis BRPOP (blocking right pop) operation"""
        payload = {
            "command": "BRPOP",
            "keys": [self.queue_key],
            "timeout": 5
        }
        
        response = self.client.post('/redis/queue',
                                  json=payload,
                                  name="redis_brpop")

    @task(1)
    def test_redis_llen(self):
        """Test Redis LLEN (list length) operation"""
        payload = {
            "command": "LLEN",
            "key": self.queue_key
        }
        
        response = self.client.post('/redis/queue',
                                  json=payload,
                                  name="redis_llen")

Key Testing Points

  1. Message Publishing: Test message creation and queue insertion
  2. Message Consumption: Test message retrieval and processing
  3. Batch Operations: Test bulk message handling
  4. Queue Management: Test queue creation, deletion, and stats
  5. Error Handling: Test dead letter queues and retry logic
  6. Performance: Monitor throughput and latency

This guide provides comprehensive message queue testing patterns for reliable async processing systems with proper error handling and performance monitoring. 

Ready to run your test?
Run your test today with LoadForge.