Producer
- The Producer component consists of two elements:
- Kafka Producer Config
- Kafka Producer Sampler
To publish a message to a Kafka topic, add the producer components to the test plan.
- The Kafka Producer Config holds the connection information, including security settings and any other properties required to talk to the broker.
- The Kafka Producer Sampler sends messages to the topic over the connection established by the config element.
Right click on Test Plan -> Add -> Config Element -> Kafka Producer Config
Provide a Variable name to export the connection object (this name is used later in the Sampler element).
Provide the Kafka connection config (a comma-separated list of brokers).
Provide a Client ID (make it unique, so that it identifies where the messages are sent from).
Select the security type used to connect to the brokers (this depends entirely on how security is configured on the Kafka cluster).
For JAAS security, add the following key and value to Additional Properties:
Config key: sasl.jaas.config
Config value: org.apache.kafka.common.security.scram.ScramLoginModule required username="<USERNAME>" password="<PASSWORD>";
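The values entered in the config element correspond to standard Kafka client property keys. The sketch below shows one way that mapping might look, using only `java.util.Properties`. The class name `ProducerConfigSketch`, its method names, the broker addresses, and the choice of `SASL_SSL` with `SCRAM-SHA-512` are assumptions made for illustration; this is not the plugin's actual code.

```java
import java.util.Properties;

public class ProducerConfigSketch {

    // Builds a sasl.jaas.config value in the SCRAM format shown above.
    // Assumption: credentials contain no quotes or backslashes (no escaping is done here).
    static String scramJaasConfig(String username, String password) {
        return "org.apache.kafka.common.security.scram.ScramLoginModule required "
                + "username=\"" + username + "\" password=\"" + password + "\";";
    }

    // Collects the config element's fields under standard Kafka property keys.
    // SASL_SSL and SCRAM-SHA-512 are illustrative choices, not plugin defaults.
    static Properties buildConfig(String brokers, String clientId,
                                  String username, String password) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", brokers);   // comma-separated host:port pairs
        props.setProperty("client.id", clientId);          // unique per sender
        props.setProperty("security.protocol", "SASL_SSL");
        props.setProperty("sasl.mechanism", "SCRAM-SHA-512");
        props.setProperty("sasl.jaas.config", scramJaasConfig(username, password));
        return props;
    }

    public static void main(String[] args) {
        Properties props = buildConfig(
                "broker1:9092,broker2:9092", "jmeter-producer-01", "alice", "secret");
        // Print only the keys so that the password is not written to the console.
        props.stringPropertyNames().forEach(System.out::println);
    }
}
```

In the plugin itself, only the `sasl.jaas.config` entry needs to go into Additional Properties; the other values come from the dedicated fields of the config element.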
Right click on Test Plan -> Add -> Sampler -> Kafka Producer Sampler
Use the same Variable name that was defined in the config element.
Define the name of the topic to send the message to (case sensitive).
Kafka Message - The message to be pushed to the topic.
Partition String (Optional) - Posts messages to a particular partition when you provide the partition number.
Message Headers (Optional) - Adds headers to the messages being pushed (more than one header is supported).
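To make the sampler fields concrete, the sketch below models how the optional partition string and the headers might be interpreted before a message is sent. The class `SamplerInputSketch` and its methods are hypothetical and use only the Java standard library; the plugin may handle these fields differently.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class SamplerInputSketch {

    // Parses the optional "Partition String" field.
    // Returns null when the field is blank; in the Kafka client, a null partition
    // means the configured partitioner chooses the partition.
    static Integer parsePartition(String partitionString) {
        if (partitionString == null || partitionString.trim().isEmpty()) {
            return null;
        }
        int partition = Integer.parseInt(partitionString.trim());
        if (partition < 0) {
            throw new IllegalArgumentException("Partition must be non-negative: " + partition);
        }
        return partition;
    }

    // Collects headers in insertion order; arguments alternate key, value.
    // A map is used for simplicity, although Kafka itself allows repeated header keys.
    static Map<String, String> headers(String... keyValues) {
        if (keyValues.length % 2 != 0) {
            throw new IllegalArgumentException("Headers must be given as key/value pairs");
        }
        Map<String, String> result = new LinkedHashMap<>();
        for (int i = 0; i < keyValues.length; i += 2) {
            result.put(keyValues[i], keyValues[i + 1]);
        }
        return result;
    }

    public static void main(String[] args) {
        String topic = "Orders";                 // topic names are case sensitive
        String message = "{\"orderId\": 42}";    // sample payload, made up for this sketch
        Integer partition = parsePartition("2");
        Map<String, String> hdrs = headers("source", "jmeter", "trace-id", "abc123");
        System.out.println(topic + " partition=" + partition
                + " headers=" + hdrs + " message=" + message);
    }
}
```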
The following producer properties are supported and can be added to the Additional Properties field.
Property | Available Options | Default |
---|---|---|
acks | [0, 1, -1] | 1 |
batch.size | positive integer | 16384 |
bootstrap.servers | comma-separated host:port pairs | localhost:9092 |
buffer.memory | positive long | 33554432 |
client.id | string | "" |
compression.type | [none, gzip, snappy, lz4, zstd] | none |
connections.max.idle.ms | positive long | 540000 |
delivery.timeout.ms | positive long | 120000 |
enable.idempotence | [true, false] | false |
interceptor.classes | fully-qualified class names | [] |
key.serializer | fully-qualified class name | org.apache.kafka.common.serialization.StringSerializer |
linger.ms | non-negative integer | 0 |
max.block.ms | non-negative long | 60000 |
max.in.flight.requests.per.connection | positive integer | 5 |
max.request.size | positive integer | 1048576 |
metadata.fetch.timeout.ms | positive long | 60000 |
metadata.max.age.ms | positive long | 300000 |
partitioner.class | fully-qualified class name | org.apache.kafka.clients.producer.internals.DefaultPartitioner |
receive.buffer.bytes | positive integer | 32768 |
reconnect.backoff.ms | non-negative long | 50 |
request.timeout.ms | positive integer | 30000 |
retries | non-negative integer | 0 |
sasl.jaas.config | string | null |
sasl.kerberos.kinit.cmd | string | /usr/bin/kinit |
sasl.kerberos.min.time.before.relogin | positive long | 60000 |
sasl.kerberos.service.name | string | null |
sasl.mechanism | [GSSAPI, PLAIN, SCRAM-SHA-256, SCRAM-SHA-512] | GSSAPI |
security.protocol | [PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL] | PLAINTEXT |
sender.flush.timeout.ms | non-negative long | 0 |
send.buffer.bytes | positive integer | 131072 |
value.serializer | fully-qualified class name | org.apache.kafka.common.serialization.StringSerializer |
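As an illustration of how Additional Properties interact with the defaults in the table, the sketch below layers a few overrides on top of a handful of those defaults and checks `acks` against its listed options. The class `AdditionalPropertiesSketch`, its methods, and the override values are assumptions for this example only; the plugin's own merging and validation logic is not documented here.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

public class AdditionalPropertiesSketch {

    // Allowed values for acks, as listed in the table above.
    static final List<String> ACKS_OPTIONS = Arrays.asList("0", "1", "-1");

    // A few defaults copied from the table above (not the complete list).
    static Properties tableDefaults() {
        Properties defaults = new Properties();
        defaults.setProperty("acks", "1");
        defaults.setProperty("compression.type", "none");
        defaults.setProperty("linger.ms", "0");
        defaults.setProperty("retries", "0");
        return defaults;
    }

    // Applies user-supplied overrides on top of the defaults and validates acks.
    static Properties withOverrides(Properties overrides) {
        Properties merged = new Properties();
        merged.putAll(tableDefaults());
        merged.putAll(overrides);   // later entries replace the defaults
        String acks = merged.getProperty("acks");
        if (!ACKS_OPTIONS.contains(acks)) {
            throw new IllegalArgumentException("Unsupported acks value: " + acks);
        }
        return merged;
    }

    public static void main(String[] args) {
        Properties overrides = new Properties();
        overrides.setProperty("acks", "-1");              // wait for all in-sync replicas
        overrides.setProperty("compression.type", "lz4");
        overrides.setProperty("linger.ms", "5");

        Properties merged = withOverrides(overrides);
        for (String key : merged.stringPropertyNames()) {
            System.out.println(key + " = " + merged.getProperty(key));
        }
    }
}
```

Properties that are not overridden, such as `retries` in this sketch, keep the default listed in the table.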