
Kafka Load Testing

How to customize LoadForge tests to stress test Apache Kafka

Overview

By default, LoadForge tests plain HTTP/HTTPS sites. However, because test definitions are written in Python, it is easy to expand beyond that.

The example below lets you load test an Apache Kafka installation. This use case is not officially supported, but the script below, taken from the Locust community, shows how it can be done.

Code

from locust_plugins import run_single_user
from locust_plugins.users import KafkaUser
from locust import task
import os


class MyUser(KafkaUser):
    # Broker address(es) in host:port form, read from the environment
    bootstrap_servers = os.environ["LOCUST_KAFKA_SERVERS"]

    @task
    def t(self):
        # Produce one message to the "lafp_test" topic
        self.client.send("lafp_test", b"payload")
        # If you don't poll immediately after sending a message, your timings will be incorrect
        # (but if throughput matters most, you may want to delay the poll)
        self.client.producer.poll(1)


        # How to set up a (global) consumer and read the last message. Consider this as inspiration, it might not work for you.
        # And it is probably out of date. Probably best to ignore this.
        #
        # @events.init.add_listener
        # def on_locust_init(environment, **_kwargs):
        #     consumer = KafkaConsumer(MyUser.bootstrap_servers)
        #     tp = TopicPartition("my_topic", 0)
        #     consumer.assign([tp])
        #     last_offset = consumer.position(tp)
        #     consumer.seek(tp, last_offset - 1)
        #     last_message = next(consumer)
        #     last = someProtobufObject()
        #     last.ParseFromString(last_message.value)
        #     environment.events.request_success.fire(
        #         request_type="CONSUME", name="retrans1", response_time=0, response_length=0,
        #     )
        #     control_consumer(environment)
        #     gevent.spawn(wait_for_retrans, environment, consumer)
        #
        # def wait_for_retrans(environment: Environment, consumer):
        #     for message in consumer:
        #         with sema:
        #             control_message = someProtobufObject().FromString(message.value)
        #             environment.events.request_success.fire(
        #                 request_type="CONSUME",
        #                 name="retrans2",
        #                 response_time=0,
        #                 response_length=0,
        #             )

if __name__ == "__main__":
    run_single_user(MyUser)
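
The comment in the task above says that timings are wrong unless you poll right after sending. The sketch below illustrates why, under the assumption that the client reports delivery through callbacks that only run while poll() is executing (as callback-based producers such as confluent-kafka do). FakeProducer and timed_send are hypothetical names invented for this illustration; they are not part of locust-plugins or any Kafka library, and no broker is contacted.

```python
import time
from collections import deque


class FakeProducer:
    """Hypothetical stand-in for a callback-based producer; not a real Kafka client."""

    def __init__(self):
        self._pending = deque()

    def produce(self, topic, value, on_delivery):
        # Queue the delivery report; nothing is reported until poll() runs.
        self._pending.append((topic, value, on_delivery))

    def poll(self, timeout=0):
        # Fire every queued delivery callback and return how many were served.
        served = 0
        while self._pending:
            topic, value, on_delivery = self._pending.popleft()
            on_delivery(topic, value)
            served += 1
        return served


def timed_send(producer, topic, value, delay_before_poll=0.0):
    """Send one message and return the measured response time in milliseconds."""
    measured = []
    start = time.perf_counter()

    def on_delivery(_topic, _value):
        # The clock only stops when the callback runs, i.e. inside poll().
        measured.append((time.perf_counter() - start) * 1000)

    producer.produce(topic, value, on_delivery)
    time.sleep(delay_before_poll)  # simulates other work done before polling
    producer.poll(1)
    return measured[0]


if __name__ == "__main__":
    producer = FakeProducer()
    immediate = timed_send(producer, "lafp_test", b"payload")
    delayed = timed_send(producer, "lafp_test", b"payload", delay_before_poll=0.05)
    print(f"poll immediately: {immediate:.2f} ms")
    print(f"poll after 50 ms: {delayed:.2f} ms")
```

In this simulation the delayed variant reports roughly 50 ms of extra "response time" that has nothing to do with the broker, which is the distortion the comment warns about. Delaying the poll can still be a reasonable trade-off when raw throughput matters more than per-message timing.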


This guide is part of the LoadForge Directory, an index of locustfiles for use with LoadForge website and API load tests. We also provide a wizard to generate tests, and onboarding assistance for clients. Contact us should you have any questions.

LoadForge provides load testing and stress testing for websites, APIs and servers. It uses your cloud account to rapidly scale up large numbers of simulated users to load test your website, store, API, or application cheaply: just cents per test!

For more help on tests, please see our official documentation. Logged-in users can also use our wizard to generate a locustfile, or you can record your browser steps.
