Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Queue for multi-thread consumers. How to organize consumers for multi-thread access? #21

Open
polyakovmyu opened this issue Feb 19, 2024 · 7 comments
Assignees
Labels
enhancement New feature or request not a bug

Comments

@polyakovmyu
Copy link

Hello!
Assume that we have 1 pool of values for multiple threads and we want to get fresh value every time we ask for it.
Example of it is VTS which I used with LoadRunner. HTTP Simple Table Server has the same idea.
When I launch 2 threads I get exception: KafkaConsumer is not safe for multi-threaded access.
How should I configure consumers to get fresh values with multiple threads and is it possible?
consumer
consumer_config
logs_kafka

@rollno748
Copy link
Owner

Hello,

This plugin is designed in such a way to support the multi threaded functionality.

Is it possible for you to attach a sample test plan, I will have a look at it and come back at the earliest?

@polyakovmyu
Copy link
Author

polyakovmyu commented Feb 20, 2024

I have made simple test plan with all used plugins, modified elements were marked with >, except kafka elements.
jmeter version 5.6.3
java version 17
Kafka_playground.zip

@rollno748 rollno748 self-assigned this Feb 21, 2024
@rollno748 rollno748 added the bug Something isn't working label Feb 21, 2024
@rollno748
Copy link
Owner

Thanks for addressing this issue.
I have validated it, it is throwing error while running the consumer with multiple threads

2024-02-21 16:33:09,684 ERROR o.a.j.t.JMeterThread: Error while processing sampler: 'Thread-2-Read'.
java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
	at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:2495) ~[kafka-clients-3.3.1.jar:?]
	at org.apache.kafka.clients.consumer.KafkaConsumer.acquireAndEnsureOpen(KafkaConsumer.java:2479) ~[kafka-clients-3.3.1.jar:?]
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1227) ~[kafka-clients-3.3.1.jar:?]
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1220) ~[kafka-clients-3.3.1.jar:?]
	at com.di.jmeter.kafka.sampler.KafkaConsumerSampler.getConsumerRecords(KafkaConsumerSampler.java:90) ~[di-kafkameter-1.2.jar:?]
	at com.di.jmeter.kafka.sampler.KafkaConsumerSampler.sample(KafkaConsumerSampler.java:73) ~[di-kafkameter-1.2.jar:?]
	at org.apache.jmeter.threads.JMeterThread.doSampling(JMeterThread.java:651) ~[ApacheJMeter_core.jar:5.6.2]
	at org.apache.jmeter.threads.JMeterThread.executeSamplePackage(JMeterThread.java:570) ~[ApacheJMeter_core.jar:5.6.2]
	at org.apache.jmeter.threads.JMeterThread.processSampler(JMeterThread.java:501) [ApacheJMeter_core.jar:5.6.2]
	at org.apache.jmeter.threads.JMeterThread.run(JMeterThread.java:268) [ApacheJMeter_core.jar:5.6.2]
	at java.lang.Thread.run(Thread.java:829) [?:?]

I will release a patch soon to fix this.
Keep supporting by hitting a star and open up issues, if you have bug

@rollno748
Copy link
Owner

Hey @polyakovmyu

After careful read about the consumer group, I came to know that, The Kafka consumer is not thread-safe, and multi-threaded access must be properly synchronized, which can be complex.

Typically, a single-threaded model is used where each consumer in a consumer group is mapped to a partition of the topic.

If you want to use multiple threads, you should ensure that each thread is consuming from a different partition to avoid conflicts and potential data inconsistencies

https://www.confluent.io/blog/kafka-consumer-multi-threaded-messaging/

https://stackoverflow.com/questions/44587416/kafka-single-consumer-group-in-multiple-instances

So, you should try reading each thread to partition instead of reading from a global factor on where the broker decides

I will look for other options to make it multi threaded - until then, will keep this issue open. If you come across any implementation idea, pls lmk

@rollno748 rollno748 added enhancement New feature or request not a bug and removed bug Something isn't working labels Mar 24, 2024
@polyakovmyu
Copy link
Author

My solution is simple table server plugin - enough for my tasks.
It would be a problem in case you want to load kafka itself.

@rollno748
Copy link
Owner

Reading is not always a bottleneck its the insert. Btw, will update here when I implement a logic

I have a couple of options in mind but it won't be a replica of the real time scenario

@amolsr
Copy link

amolsr commented Apr 18, 2024

bump

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or request not a bug
Projects
None yet
Development

No branches or pull requests

3 participants