-
Notifications
You must be signed in to change notification settings - Fork 796
/
addoffsetstotxn_test.go
122 lines (105 loc) · 2.76 KB
/
addoffsetstotxn_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
package kafka
import (
"context"
"log"
"net"
"os"
"strconv"
"testing"
"time"
ktesting "github.com/segmentio/kafka-go/testing"
)
// TestClientAddOffsetsToTxn exercises Client.AddOffsetsToTxn end to end.
//
// It creates a topic, sets up a consumer group on localhost:9092, looks up
// the transaction coordinator for a freshly generated transactional ID,
// initializes a producer ID on that coordinator and finally sends an
// AddOffsetsToTxn request for the consumer group. The test is skipped when
// ktesting.KafkaIsAtLeast("0.11.0") reports false.
func TestClientAddOffsetsToTxn(t *testing.T) {
	if !ktesting.KafkaIsAtLeast("0.11.0") {
		t.Skip("Skipping test because kafka version is not high enough.")
	}

	topic := makeTopic()
	transactionalID := makeTransactionalID()
	client, shutdown := newLocalClient()
	defer shutdown()

	err := clientCreateTopic(client, topic, 3)
	if err != nil {
		t.Fatal(err)
	}

	ctx, cancel := context.WithTimeout(context.Background(), time.Second*30)
	defer cancel()
	// NOTE(review): this lookup uses the consumer key type with the
	// transactional ID as the key; confirm that this is intended.
	respc, err := waitForCoordinatorIndefinitely(ctx, client, &FindCoordinatorRequest{
		Addr:    client.Addr,
		Key:     transactionalID,
		KeyType: CoordinatorKeyTypeConsumer,
	})
	if err != nil {
		t.Fatal(err)
	}
	// Report the error carried in the response; err is nil at this point.
	if respc.Error != nil {
		t.Fatal(respc.Error)
	}

	groupID := makeGroupID()
	group, err := NewConsumerGroup(ConsumerGroupConfig{
		ID:                groupID,
		Topics:            []string{topic},
		Brokers:           []string{"localhost:9092"},
		HeartbeatInterval: 2 * time.Second,
		RebalanceTimeout:  2 * time.Second,
		RetentionTime:     time.Hour,
		Logger:            log.New(os.Stdout, "cg-test: ", 0),
	})
	if err != nil {
		t.Fatal(err)
	}
	defer group.Close()

	// Each deferred cancel below is bound to the cancel function that is
	// current at the defer statement, so reusing the variable is safe.
	ctx, cancel = context.WithTimeout(context.Background(), time.Second*30)
	defer cancel()
	_, err = group.Next(ctx)
	if err != nil {
		t.Fatal(err)
	}

	ctx, cancel = context.WithTimeout(context.Background(), time.Second*30)
	defer cancel()
	respc, err = waitForCoordinatorIndefinitely(ctx, client, &FindCoordinatorRequest{
		Addr:    client.Addr,
		Key:     transactionalID,
		KeyType: CoordinatorKeyTypeTransaction,
	})
	if err != nil {
		t.Fatal(err)
	}
	// Mirror the check done after the first lookup before reading the
	// coordinator address out of the response.
	if respc.Error != nil {
		t.Fatal(respc.Error)
	}

	// Talk directly to the transaction coordinator from here on.
	transactionCoordinator := TCP(net.JoinHostPort(respc.Coordinator.Host, strconv.Itoa(int(respc.Coordinator.Port))))
	client, shutdown = newClient(transactionCoordinator)
	defer shutdown()

	ipResp, err := client.InitProducerID(ctx, &InitProducerIDRequest{
		TransactionalID:      transactionalID,
		TransactionTimeoutMs: 10000,
	})
	if err != nil {
		t.Fatal(err)
	}
	if ipResp.Error != nil {
		t.Fatal(ipResp.Error)
	}

	// End the transaction with Committed: false when the test finishes.
	defer func() {
		err := clientEndTxn(client, &EndTxnRequest{
			TransactionalID: transactionalID,
			ProducerID:      ipResp.Producer.ProducerID,
			ProducerEpoch:   ipResp.Producer.ProducerEpoch,
			Committed:       false,
		})
		if err != nil {
			t.Fatal(err)
		}
	}()

	ctx, cancel = context.WithTimeout(context.Background(), time.Second*30)
	defer cancel()
	resp, err := client.AddOffsetsToTxn(ctx, &AddOffsetsToTxnRequest{
		TransactionalID: transactionalID,
		ProducerID:      ipResp.Producer.ProducerID,
		ProducerEpoch:   ipResp.Producer.ProducerEpoch,
		GroupID:         groupID,
	})
	if err != nil {
		t.Fatal(err)
	}
	// Report the error carried in the response; err is nil at this point.
	if resp.Error != nil {
		t.Fatal(resp.Error)
	}
}