Skip to content
This repository has been archived by the owner on Mar 22, 2022. It is now read-only.

Commit

Permalink
Let simulations set topic per request
Browse files Browse the repository at this point in the history
  • Loading branch information
reftel committed Jan 10, 2020
1 parent 2deebc5 commit b320e43
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,16 @@ class KafkaRequestAction[K,V]( val producer: KafkaProducer[K,V],

kafkaAttributes payload session map { payload =>

val topic = kafkaAttributes.topic match {
case Some(k) => k(session).toOption.get
case None => kafkaProtocol.topic
}

val record = kafkaAttributes.key match {
case Some(k) =>
new ProducerRecord[K, V](kafkaProtocol.topic, k(session).toOption.get, payload)
new ProducerRecord[K, V](topic, k(session).toOption.get, payload)
case None =>
new ProducerRecord[K, V](kafkaProtocol.topic, payload)
new ProducerRecord[K, V](topic, payload)
}

val requestStartDate = clock.nowMillis
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,19 @@ import io.gatling.core.session._

case class KafkaAttributes[K,V]( requestName: Expression[String],
key: Option[Expression[K]],
payload: Expression[V] )
payload: Expression[V],
topic: Option[Expression[String]])

case class KafkaRequestBuilder(requestName: Expression[String]) {

def send[V](payload: Expression[V]): KafkaRequestActionBuilder[_,V] = send(payload, None)
def send[V](payload: Expression[V]): KafkaRequestActionBuilder[_,V] = send(payload, None, None)

def send[K,V](key: Expression[K], payload: Expression[V]): KafkaRequestActionBuilder[K,V] = send(payload, Some(key))
def send[K,V](key: Expression[K], payload: Expression[V]): KafkaRequestActionBuilder[K,V] = send(payload, Some(key), None)

private def send[K,V](payload: Expression[V], key: Option[Expression[K]]) =
new KafkaRequestActionBuilder(KafkaAttributes(requestName, key, payload))
def send[K,V](topic: Expression[String], key: Expression[K], payload: Expression[V]): KafkaRequestActionBuilder[K,V] =
send(payload, Some(key), Some(topic))

private def send[K,V](payload: Expression[V], key: Option[Expression[K]], topic: Option[Expression[String]]) =
new KafkaRequestActionBuilder(KafkaAttributes(requestName, key, payload, topic))

}

0 comments on commit b320e43

Please sign in to comment.