How to customize LoadForge tests to test stress test Apache Kafka
By default, LoadForge tests plain HTTP/HTTPS sites. However, as you can use python for test definitions it is easy to expand beyond that.
The below example allows you to load test an Apache Kafka installation. This use case is not officially supported, but you can see the example script below from the locust community.
from locust_plugins import run_single_user
from locust_plugins.users import KafkaUser
from locust import task
import os
class MyUser(KafkaUser):
bootstrap_servers = os.environ["LOCUST_KAFKA_SERVERS"]
@task
def t(self):
self.client.send("lafp_test", b"payload")
# if you dont poll immediately after sending message your timings will be incorrect
# (but if throughput is most important then you may want to delay it)
self.client.producer.poll(1)
# How to set up a (global) consumer and read the last message. Consider this as inspiration, it might not work for you.
# And it is probably out of date. Probably best to ignore this.
#
# @events.init.add_listener
# def on_locust_init(environment, **_kwargs):
# consumer = KafkaConsumer(MyUser.bootstrap_servers)
# tp = TopicPartition("my_topic", 0)
# consumer.assign([tp])
# last_offset = consumer.position(tp)
# consumer.seek(tp, last_offset - 1)
# last_message = next(consumer)
# last = someProtobufObject()
# last.ParseFromString(last_message.value)
# environment.events.request_success.fire(
# request_type="CONSUME", name="retrans1", response_time=0, response_length=0,
# )
# control_consumer(environment)
# gevent.spawn(wait_for_retrans, environment, consumer)
#
# def wait_for_retrans(environment: Environment, consumer):
# for message in consumer:
# with sema:
# control_message = someProtobufObject().FromString(message.value)
# environment.events.request_success.fire(
# request_type="CONSUME",
# name="retrans2",
# response_time=0,
# response_length=0,
# )
if __name__ == "__main__":
run_single_user(MyUser)