Skip to content

Commit

Permalink
Merge pull request #57 from q2wdqea/feat-mq-kpush
Browse files Browse the repository at this point in the history
feat(mq): add key field
  • Loading branch information
kevwan authored Jul 24, 2024
2 parents 59d5072 + 2c75b16 commit 5af862f
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 1 deletion.
5 changes: 4 additions & 1 deletion example/kq/producer/produce.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,13 @@ func main() {
log.Fatal(err)
}

fmt.Println(string(body))
if err := pusher.Push(context.Background(), string(body)); err != nil {
log.Fatal(err)
}

if err := pusher.KPush(context.Background(), "test", string(body)); err != nil {
log.Fatal(err)
}
}

cmdline.EnterToContinue()
Expand Down
13 changes: 13 additions & 0 deletions kq/pusher.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,19 @@ func (p *Pusher) Name() string {
return p.topic
}

// KPush sends a message to the Kafka topic.
func (p *Pusher) KPush(ctx context.Context, k, v string) error {
msg := kafka.Message{
Key: []byte(k), // current timestamp
Value: []byte(v),
}
if p.executor != nil {
return p.executor.Add(msg, len(v))
} else {
return p.producer.WriteMessages(ctx, msg)
}
}

// Push sends a message to the Kafka topic.
func (p *Pusher) Push(ctx context.Context, v string) error {
msg := kafka.Message{
Expand Down

0 comments on commit 5af862f

Please sign in to comment.