diff --git a/CHANGELOG.md b/CHANGELOG.md index fb65f48..f26c0c1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,9 @@ # Changelog for rabtap +## v1.30 (2021-10-13) + +* new: support header based routing (pub, queue bind, queue unbind) + ## v1.29 (2021-09-15) * new: add a docker image (ghcr.io/jandelgado/rabtap) diff --git a/README.md b/README.md index c56ad6b..ee61dae 100644 --- a/README.md +++ b/README.md @@ -145,7 +145,8 @@ Usage: [(--tls-cert-file=CERTFILE --tls-key-file=KEYFILE)] [--tls-ca-file=CAFILE] rabtap sub QUEUE [--uri URI] [--saveto=DIR] [--format=FORMAT] [--no-auto-ack] [-jksvn] [(--tls-cert-file=CERTFILE --tls-key-file=KEYFILE)] [--tls-ca-file=CAFILE] - rabtap pub [--uri=URI] [SOURCE] [--exchange=EXCHANGE] [--routingkey=KEY] [--format=FORMAT] + rabtap pub [--uri=URI] [SOURCE] [--exchange=EXCHANGE] [--format=FORMAT] + [--routingkey=KEY | (--header=KV)...] [--confirms] [--mandatory] [--delay=DELAY | --speed=FACTOR] [-jkv] [(--tls-cert-file=CERTFILE --tls-key-file=KEYFILE)] [--tls-ca-file=CAFILE] rabtap exchange create EXCHANGE [--uri=URI] [--type=TYPE] [-adkv] @@ -154,9 +155,11 @@ Usage: [(--tls-cert-file=CERTFILE --tls-key-file=KEYFILE)] [--tls-ca-file=CAFILE] rabtap queue create QUEUE [--uri=URI] [-adkv] [(--tls-cert-file=CERTFILE --tls-key-file=KEYFILE)] [--tls-ca-file=CAFILE] - rabtap queue bind QUEUE to EXCHANGE --bindingkey=KEY [--uri=URI] [-kv] + rabtap queue bind QUEUE to EXCHANGE [--uri=URI] [-kv] + (--bindingkey=KEY | (--header=KV)... (--all|--any)) [(--tls-cert-file=CERTFILE --tls-key-file=KEYFILE)] [--tls-ca-file=CAFILE] - rabtap queue unbind QUEUE from EXCHANGE --bindingkey=KEY [--uri=URI] [-kv] + rabtap queue unbind QUEUE from EXCHANGE [--uri=URI] [-kv] + (--bindingkey=KEY | (--header=KV)... (--all|--any)) [(--tls-cert-file=CERTFILE --tls-key-file=KEYFILE)] [--tls-ca-file=CAFILE] rabtap queue rm QUEUE [--uri=URI] [-kv] [(--tls-cert-file=CERTFILE --tls-key-file=KEYFILE)] [--tls-ca-file=CAFILE] rabtap queue purge QUEUE [--uri=URI] [-kv] [(--tls-cert-file=CERTFILE --tls-key-file=KEYFILE)] [--tls-ca-file=CAFILE] @@ -165,7 +168,7 @@ Usage: rabtap --version Arguments and options: - EXCHANGES comma-separated list of exchanges and binding keys, + EXCHANGES comma-separated list of exchanges and optional binding keys, e.g. amq.topic:# or exchange1:key1,exchange2:key2. EXCHANGE name of an exchange, e.g. amq.direct. SOURCE file or directory to publish in pub mode. If omitted, stdin will be read. @@ -173,6 +176,8 @@ Arguments and options: CONNECTION name of a connection. DIR directory to read messages from. -a, --autodelete create auto delete exchange/queue. + --all set x-match=all option in header based routing. + --any set x-match=any option in header based routing. --api=APIURI connect to given API server. If APIURL is omitted, the environment variable RABTAP_APIURI will be used. -b, --bindingkey=KEY binding key to use in bind queue command. @@ -191,6 +196,8 @@ Arguments and options: * for info command: controls generated output format. Valid options are: "text", "dot". Default: text -h, --help print this help. + --header=KV A key value pair in the form of "key=value" used as a + routing- or binding-key. Can occur multiple times. -j, --json deprecated. Use "--format json" instead. -k, --insecure allow insecure TLS connections (no certificate check). --tls-cert-file=CERTFILE A Cert file to use for client authentication. @@ -474,26 +481,40 @@ Example assumes that `RABTAP_AMQPURI` environment variable is set, as the The `pub` command is used to publish messages to an exchange with a routing key. The messages to be published are either read from a file, or from a directory which contains previously recorded messages (e.g. using the -`--saveto` option of the `tap` command). Messages can be published either in -raw format, in which they are send as-is, or in [JSON-format, as described -here](#json-message-format), which includes message metadata and the body in a -single JSON document. +`--saveto` option of the `tap` command). + +Message routing is either specified with a routing key and the `--routingkey` +option or, when header based routing should be used, by specifying the headers +with the `--header` option. Each header is specified in the form `KEY=VALUE`. +Multiple headers can be specified by specifying multiple `--header` options. + +Messages can be published either in raw format, in which they are send as-is, +or in [JSON-format, as described here](#json-message-format), which includes +message metadata and the body in a single JSON document. When multiple messages +are published with metadata, rabtap will calculate the time elapsed of +consecutive recorded messages using the metadata, and delay publishing +accordingly. To set the publishing delay to a fix value, use the `--delay` +option. To publish without delays, use `--delay=0s`. To modify publishing speed +use the `--speed` option, which allows to set a factor to apply to the delays. The general form of the `pub` command is ``` -rabtap pub [--uri=URI] [SOURCE] [--exchange=EXCHANGE] [--routingkey=KEY] - [--confirms] [--mandatory] [--format=FORMAT] - [--delay=DELAY | --speed=FACTOR] [-jkv] +rabtap pub [--uri=URI] [SOURCE] [--exchange=EXCHANGE] [--format=FORMAT] + [--routingkey=KEY | (--header=HEADERKV)...] + [--confirms] [--mandatory] [--delay=DELAY | --speed=FACTOR] [-jkv] ``` -* `$ echo hello | rabtap pub amq.fanout` - publish "hello" to exchange amqp.fanout -* `$ rabtap pub messages.json --format=json` - messages are read from file `messages.json` - in [rabtap JSON format](#json-message-format). Target exchange and routing - keys are read from the messages meta data. The `messages.json` file can - contain multiple JSON documents as it is treated as a JSON stream. Rabtap - will honor the `XRabtapReceived` timestamps of the messages and by default - delay the messages as they were recorded. This behaviour can be overridden - by the `--delay` and `--speed` options. +* `$ echo hello | rabtap pub --exchange amq.fanout` - publish "hello" to + exchange amqp.fanout +* `echo "hello" | rabtap pub --exchange amq.header --header KEY=VAL --header X=Y` - + publish hello to exchange amq.header use set message headers. +* `$ rabtap pub messages.json --format=json` - messages are read from file + `messages.json` in [rabtap JSON format](#json-message-format). Target + exchange and routing keys are read from the messages meta data. The + `messages.json` file can contain multiple JSON documents as it is treated as + a JSON stream. Rabtap will honor the `XRabtapReceived` timestamps of the + messages and by default will delay the messages as they were recorded. This + behaviour can be overridden by the `--delay` and `--speed` options. * `$ rabtap pub --exchange amq.direct -r myKey --format=json messages.json --delay=0s` - as before, but publish messages always to exchange `amq.direct` with routing key `myKey` and without any delays. diff --git a/cmd/rabtap/cmd_publish.go b/cmd/rabtap/cmd_publish.go index 2a57d3a..962924b 100644 --- a/cmd/rabtap/cmd_publish.go +++ b/cmd/rabtap/cmd_publish.go @@ -22,6 +22,7 @@ type CmdPublishArg struct { tlsConfig *tls.Config exchange *string routingKey *string + headers rabtap.KeyValueMap readerFunc MessageReaderFunc speed float64 fixedDelay *time.Duration @@ -55,15 +56,11 @@ func durationBetweenMessages(first, second *RabtapPersistentMessage, // publishMessage publishes a single message on the given exchange with the // provided routingkey func publishMessage(publishChannel rabtap.PublishChannel, - exchange, routingKey string, + routing rabtap.Routing, amqpPublishing amqp.Publishing) { - log.Debugf("publishing message to exchange '%s' with routing key '%s'", - exchange, routingKey) - publishChannel <- &rabtap.PublishMessage{ - Exchange: exchange, - RoutingKey: routingKey, + Routing: routing, Publishing: &amqpPublishing} } @@ -76,10 +73,21 @@ func selectOptionalOrDefault(optionalStr *string, defaultStr string) string { return defaultStr } +// routingFromMessage creates a Routing from a message and optional defaults +// that can override fields in the message +func routingFromMessage(optExchange, optRoutingKey *string, headers rabtap.KeyValueMap, msg RabtapPersistentMessage) rabtap.Routing { + routingKey := selectOptionalOrDefault(optRoutingKey, msg.RoutingKey) + exchange := selectOptionalOrDefault(optExchange, msg.Exchange) + mergedHeaders := rabtap.MergeTables(msg.Headers, rabtap.ToAMQPTable(headers)) + return rabtap.NewRouting(exchange, routingKey, mergedHeaders) +} + // publishMessageStream publishes messages from the provided message stream // provided by readNextMessageFunc. When done closes the publishChannel func publishMessageStream(publishCh rabtap.PublishChannel, - optExchange, optRoutingKey *string, + optExchange *string, + optRoutingKey *string, + headers rabtap.KeyValueMap, readNextMessageFunc MessageReaderFunc, delayFunc DelayFunc) error { @@ -95,9 +103,15 @@ func publishMessageStream(publishCh rabtap.PublishChannel, return nil case nil: delayFunc(lastMsg, &msg) - routingKey := selectOptionalOrDefault(optRoutingKey, msg.RoutingKey) - exchange := selectOptionalOrDefault(optExchange, msg.Exchange) - publishMessage(publishCh, exchange, routingKey, msg.ToAmqpPublishing()) + + // the per-message routing key (in case it was read from a json + // file) can be overriden by the command line, if set. + routing := routingFromMessage(optExchange, optRoutingKey, headers, msg) + + // during publishing, header information in msg.Header will be overriden + // by header information in the routing object (if present). The + // latter are set on the command line using --header K=V options. + publishMessage(publishCh, routing, msg.ToAmqpPublishing()) lastMsg = &msg default: return err @@ -146,7 +160,7 @@ func cmdPublish(ctx context.Context, cmd CmdPublishArg) error { // avoid blocking when e.g. the user presses CTRL+S and then CTRL+C. // TODO find better solution resultCh <- publishMessageStream(publishCh, cmd.exchange, - cmd.routingKey, cmd.readerFunc, delayFunc) + cmd.routingKey, cmd.headers, cmd.readerFunc, delayFunc) }() g.Go(func() error { diff --git a/cmd/rabtap/cmd_publish_test.go b/cmd/rabtap/cmd_publish_test.go index 4e823bb..d45db7d 100644 --- a/cmd/rabtap/cmd_publish_test.go +++ b/cmd/rabtap/cmd_publish_test.go @@ -19,6 +19,31 @@ import ( "github.com/stretchr/testify/require" ) +func TestRoutingFromMessageUsesOptExchangeWhenSpecified(t *testing.T) { + + key := "okey" + exchange := "oexchange" + testMsg := RabtapPersistentMessage{Exchange: "exchange", RoutingKey: "key", Headers: amqp.Table{"A": "B"}} + + tests := []struct { + optKey *string + optExchange *string + msg RabtapPersistentMessage + headers rabtap.KeyValueMap + expected rabtap.Routing + }{ + {optKey: nil, optExchange: nil, msg: testMsg, headers: nil, expected: rabtap.NewRouting("exchange", "key", amqp.Table{"A": "B"})}, + {optKey: nil, optExchange: nil, msg: testMsg, headers: rabtap.KeyValueMap{"A": "X"}, expected: rabtap.NewRouting("exchange", "key", amqp.Table{"A": "X"})}, + {optKey: &key, optExchange: &exchange, msg: testMsg, headers: nil, expected: rabtap.NewRouting("oexchange", "okey", amqp.Table{"A": "B"})}, + } + + for _, tc := range tests { + routing := routingFromMessage(tc.optExchange, tc.optKey, tc.headers, tc.msg) + assert.Equal(t, tc.expected, routing, tc) + } + +} + func TestMultDurationReturnsCorrectValue(t *testing.T) { assert.Equal(t, time.Duration(50), multDuration(time.Duration(100), 0.5)) } @@ -74,14 +99,14 @@ func TestPublishMessageStreamPublishesNextMessage(t *testing.T) { pubCh := make(rabtap.PublishChannel, 1) exchange := "exchange" - routingKey := "key" - err := publishMessageStream(pubCh, &exchange, &routingKey, mockReader, delayer) + key := "key" + err := publishMessageStream(pubCh, &exchange, &key, rabtap.KeyValueMap{}, mockReader, delayer) assert.Nil(t, err) select { case message := <-pubCh: - assert.Equal(t, "exchange", message.Exchange) - assert.Equal(t, "key", message.RoutingKey) + assert.Equal(t, "exchange", message.Routing.Exchange()) + assert.Equal(t, "key", message.Routing.Key()) assert.Equal(t, "hello", string(message.Publishing.Body)) case <-time.After(time.Second * 2): assert.Fail(t, "did not receive message within expected time") @@ -103,8 +128,8 @@ func TestPublishMessageStreamPropagatesMessageReadError(t *testing.T) { pubCh := make(rabtap.PublishChannel) exchange := "" - routingKey := "" - err := publishMessageStream(pubCh, &exchange, &routingKey, mockReader, delayer) + key := "key" + err := publishMessageStream(pubCh, &exchange, &key, rabtap.KeyValueMap{}, mockReader, delayer) assert.Equal(t, errors.New("error"), err) } diff --git a/cmd/rabtap/cmd_queue.go b/cmd/rabtap/cmd_queue.go index 7185d1b..89be01b 100644 --- a/cmd/rabtap/cmd_queue.go +++ b/cmd/rabtap/cmd_queue.go @@ -1,9 +1,8 @@ -// Copyright (C) 2017 Jan Delgado +// rabtap queue related commands +// Copyright (C) 2017-2021 Jan Delgado package main -// exchange related cli command handlers - import ( "crypto/tls" "net/url" @@ -21,6 +20,24 @@ type CmdQueueCreateArg struct { tlsConfig *tls.Config } +type CmdQueueBindArg struct { + amqpURL *url.URL + queue string + exchange string + key string + headers rabtap.KeyValueMap + headerMode HeaderMode + tlsConfig *tls.Config +} + +func amqpHeaderRoutingMode(mode HeaderMode) string { + modes := map[HeaderMode]string{ + HeaderMatchAny: "any", + HeaderMatchAll: "all", + HeaderNone: ""} + return modes[mode] +} + // cmdQueueCreate creates a new queue on the given broker func cmdQueueCreate(cmd CmdQueueCreateArg) { failOnError(rabtap.SimpleAmqpConnector(cmd.amqpURL, @@ -59,27 +76,28 @@ func cmdQueuePurge(amqpURL *url.URL, queueName string, tlsConfig *tls.Config) { } // cmdQueueBindToExchange binds a queue to an exchange -func cmdQueueBindToExchange(amqpURL *url.URL, queueName, key, exchangeName string, - tlsConfig *tls.Config) { - - failOnError(rabtap.SimpleAmqpConnector(amqpURL, - tlsConfig, +func cmdQueueBindToExchange(cmd CmdQueueBindArg) { + failOnError(rabtap.SimpleAmqpConnector(cmd.amqpURL, cmd.tlsConfig, func(session rabtap.Session) error { - log.Debugf("binding queue %s to exchange %s w/ key %s", - queueName, exchangeName, key) - return rabtap.BindQueueToExchange(session, queueName, key, exchangeName) + if cmd.headerMode != HeaderNone { + cmd.headers["x-match"] = amqpHeaderRoutingMode(cmd.headerMode) + } + log.Debugf("binding queue %s to exchange %s w/ key %s and headers %v", + cmd.queue, cmd.exchange, cmd.key, cmd.headers) + + return rabtap.BindQueueToExchange(session, cmd.queue, cmd.key, cmd.exchange, rabtap.ToAMQPTable(cmd.headers)) }), "bind queue failed", os.Exit) } // cmdQueueUnbindFromExchange unbinds a queue from an exchange -func cmdQueueUnbindFromExchange(amqpURL *url.URL, queueName, key, exchangeName string, - tlsConfig *tls.Config) { - - failOnError(rabtap.SimpleAmqpConnector(amqpURL, - tlsConfig, +func cmdQueueUnbindFromExchange(cmd CmdQueueBindArg) { + failOnError(rabtap.SimpleAmqpConnector(cmd.amqpURL, cmd.tlsConfig, func(session rabtap.Session) error { - log.Debugf("unbinding queue %s from exchange %s w/ key %s", - queueName, exchangeName, key) - return rabtap.UnbindQueueFromExchange(session, queueName, key, exchangeName) + if cmd.headerMode != HeaderNone { + cmd.headers["x-match"] = amqpHeaderRoutingMode(cmd.headerMode) + } + log.Debugf("unbinding queue %s from exchange %s w/ key %s and headers %v", + cmd.queue, cmd.exchange, cmd.key, cmd.headers) + return rabtap.UnbindQueueFromExchange(session, cmd.queue, cmd.key, cmd.exchange, rabtap.ToAMQPTable(cmd.headers)) }), "unbind queue failed", os.Exit) } diff --git a/cmd/rabtap/cmd_queue_test.go b/cmd/rabtap/cmd_queue_test.go index 0945e1a..36f708b 100644 --- a/cmd/rabtap/cmd_queue_test.go +++ b/cmd/rabtap/cmd_queue_test.go @@ -18,6 +18,12 @@ import ( "github.com/stretchr/testify/require" ) +func TestAmqpHeaderRoutingModeConverts(t *testing.T) { + assert.Equal(t, "all", amqpHeaderRoutingMode(HeaderMatchAll)) + assert.Equal(t, "any", amqpHeaderRoutingMode(HeaderMatchAny)) + assert.Equal(t, "", amqpHeaderRoutingMode(HeaderNone)) +} + func TestIntegrationCmdQueueCreatePurgeiBindUnbindQueue(t *testing.T) { // integration tests queue creation, bind to exchange, purge, diff --git a/cmd/rabtap/cmd_subscribe_test.go b/cmd/rabtap/cmd_subscribe_test.go index 96a0876..820c036 100644 --- a/cmd/rabtap/cmd_subscribe_test.go +++ b/cmd/rabtap/cmd_subscribe_test.go @@ -92,6 +92,7 @@ func TestCmdSub(t *testing.T) { amqpURL: amqpURL, exchange: &testExchange, routingKey: &testKey, + headers: rabtap.KeyValueMap{}, tlsConfig: tlsConfig, readerFunc: func() (RabtapPersistentMessage, bool, error) { // provide exactly one message diff --git a/cmd/rabtap/command_line.go b/cmd/rabtap/command_line.go index d035603..4120888 100644 --- a/cmd/rabtap/command_line.go +++ b/cmd/rabtap/command_line.go @@ -10,6 +10,7 @@ import ( "fmt" "net/url" "os" + "regexp" "strconv" "time" @@ -36,7 +37,8 @@ Usage: [(--tls-cert-file=CERTFILE --tls-key-file=KEYFILE)] [--tls-ca-file=CAFILE] rabtap sub QUEUE [--uri URI] [--saveto=DIR] [--format=FORMAT] [--no-auto-ack] [-jksvn] [(--tls-cert-file=CERTFILE --tls-key-file=KEYFILE)] [--tls-ca-file=CAFILE] - rabtap pub [--uri=URI] [SOURCE] [--exchange=EXCHANGE] [--routingkey=KEY] [--format=FORMAT] + rabtap pub [--uri=URI] [SOURCE] [--exchange=EXCHANGE] [--format=FORMAT] + [--routingkey=KEY | (--header=KV)...] [--confirms] [--mandatory] [--delay=DELAY | --speed=FACTOR] [-jkv] [(--tls-cert-file=CERTFILE --tls-key-file=KEYFILE)] [--tls-ca-file=CAFILE] rabtap exchange create EXCHANGE [--uri=URI] [--type=TYPE] [-adkv] @@ -45,9 +47,11 @@ Usage: [(--tls-cert-file=CERTFILE --tls-key-file=KEYFILE)] [--tls-ca-file=CAFILE] rabtap queue create QUEUE [--uri=URI] [-adkv] [(--tls-cert-file=CERTFILE --tls-key-file=KEYFILE)] [--tls-ca-file=CAFILE] - rabtap queue bind QUEUE to EXCHANGE --bindingkey=KEY [--uri=URI] [-kv] + rabtap queue bind QUEUE to EXCHANGE [--uri=URI] [-kv] + (--bindingkey=KEY | (--header=KV)... (--all|--any)) [(--tls-cert-file=CERTFILE --tls-key-file=KEYFILE)] [--tls-ca-file=CAFILE] - rabtap queue unbind QUEUE from EXCHANGE --bindingkey=KEY [--uri=URI] [-kv] + rabtap queue unbind QUEUE from EXCHANGE [--uri=URI] [-kv] + (--bindingkey=KEY | (--header=KV)... (--all|--any)) [(--tls-cert-file=CERTFILE --tls-key-file=KEYFILE)] [--tls-ca-file=CAFILE] rabtap queue rm QUEUE [--uri=URI] [-kv] [(--tls-cert-file=CERTFILE --tls-key-file=KEYFILE)] [--tls-ca-file=CAFILE] rabtap queue purge QUEUE [--uri=URI] [-kv] [(--tls-cert-file=CERTFILE --tls-key-file=KEYFILE)] [--tls-ca-file=CAFILE] @@ -56,7 +60,7 @@ Usage: rabtap --version Arguments and options: - EXCHANGES comma-separated list of exchanges and binding keys, + EXCHANGES comma-separated list of exchanges and optional binding keys, e.g. amq.topic:# or exchange1:key1,exchange2:key2. EXCHANGE name of an exchange, e.g. amq.direct. SOURCE file or directory to publish in pub mode. If omitted, stdin will be read. @@ -64,6 +68,8 @@ Arguments and options: CONNECTION name of a connection. DIR directory to read messages from. -a, --autodelete create auto delete exchange/queue. + --all set x-match=all option in header based routing. + --any set x-match=any option in header based routing. --api=APIURI connect to given API server. If APIURL is omitted, the environment variable RABTAP_APIURI will be used. -b, --bindingkey=KEY binding key to use in bind queue command. @@ -82,6 +88,8 @@ Arguments and options: * for info command: controls generated output format. Valid options are: "text", "dot". Default: text -h, --help print this help. + --header=KV A key value pair in the form of "key=value" used as a + routing- or binding-key. Can occur multiple times. -j, --json deprecated. Use "--format json" instead. -k, --insecure allow insecure TLS connections (no certificate check). --tls-cert-file=CERTFILE A Cert file to use for client authentication. @@ -166,6 +174,42 @@ const ( ConnCloseCmd ) +type HeaderMode int + +const ( + // match any headers (--any) + HeaderMatchAny HeaderMode = iota + // match all headers (-all) + HeaderMatchAll + // header based routing is not used + HeaderNone +) + +// parseKeyValue parses an expression of the form "key=value" +func parseKeyValue(expr string) (string, string, error) { + re := regexp.MustCompile(`\s*([^= ]+)\s*=\s*([^= ]+)\s*`) + all := re.FindStringSubmatch(expr) + if all == nil { + return "", "", fmt.Errorf("could not parse key-value expression") + } + return all[1], all[2], nil +} + +func parseKeyValueList(exprs []string) (map[string]string, error) { + if exprs == nil { + return nil, nil + } + res := make(map[string]string, len(exprs)) + for _, expr := range exprs { + k, v, err := parseKeyValue(expr) + if err != nil { + return nil, fmt.Errorf("%s: %w", expr, err) + } + res[k] = v + } + return res, nil +} + type commonArgs struct { TLSCertFile string TLSKeyFile string @@ -185,32 +229,33 @@ type CommandLineArgs struct { TapConfig []rabtap.TapConfiguration // configuration in tap mode APIURL *url.URL - PubExchange *string // pub: exchange to publish to - PubRoutingKey *string // pub: routing key, defaults to "" - Source *string // pub: file to send - Speed float64 // pub: speed factor - Delay *time.Duration // pub: fixed delay in ms - Confirms bool // pub: wait for confirmations - Mandatory bool // pub: set mandatory flag - AutoAck bool // sub: auto ack enabled - QueueName string // queue create, remove, bind, sub - QueueBindingKey string // queue bind - ExchangeName string // exchange name create, remove or queue bind - ExchangeType string // exchange type create, remove or queue bind - ShowConsumers bool // info: also show consumer - InfoMode string // info: byExchange, byConnection - ShowStats bool // info: also show statistics - QueueFilter string // info: optional filter predicate - OmitEmptyExchanges bool // info: do not show exchanges wo/ bindings - ShowDefaultExchange bool // info: show default exchange - Format string // output format, depends on command - Durable bool // queue create, exchange create - Autodelete bool // queue create, exchange create - SaveDir *string // save: optional directory to stores files to - Silent bool // suppress message printing - ConnName string // conn: name of connection - CloseReason string // conn: reason of close - + PubExchange *string // pub: exchange to publish to + PubRoutingKey *string // pub: routing key, defaults to "" + Source *string // pub: file to send + Speed float64 // pub: speed factor + Delay *time.Duration // pub: fixed delay in ms + Confirms bool // pub: wait for confirmations + Mandatory bool // pub: set mandatory flag + AutoAck bool // sub: auto ack enabled + QueueName string // queue create, remove, bind, sub + QueueBindingKey string // queue bind + ExchangeName string // exchange name create, remove or queue bind + ExchangeType string // exchange type create, remove or queue bind + ShowConsumers bool // info: also show consumer + InfoMode string // info: byExchange, byConnection + ShowStats bool // info: also show statistics + QueueFilter string // info: optional filter predicate + OmitEmptyExchanges bool // info: do not show exchanges wo/ bindings + ShowDefaultExchange bool // info: show default exchange + Format string // output format, depends on command + Durable bool // queue create, exchange create + Autodelete bool // queue create, exchange create + SaveDir *string // save: optional directory to stores files to + Silent bool // suppress message printing + ConnName string // conn: name of connection + CloseReason string // conn: reason of close + Headers map[string]string // pub, tap: headers for header based routing + HeaderMode HeaderMode // queue ceate, header based routing } // getAMQPURL returns the ith entry of amqpURLs array or the value @@ -367,6 +412,20 @@ func parseSubCmdArgs(args map[string]interface{}) (CommandLineArgs, error) { return result, nil } +func parseBindingKey(args map[string]interface{}) string { + if key, ok := args["--bindingkey"].(string); ok { + return key + } + return "" +} + +func parseBindingHeaders(args map[string]interface{}) (map[string]string, error) { + if headers, ok := args["--header"].([]string); ok { + return parseKeyValueList(headers) + } + return map[string]string{}, nil +} + func parseQueueCmdArgs(args map[string]interface{}) (CommandLineArgs, error) { result := CommandLineArgs{ commonArgs: parseCommonArgs(args), @@ -384,14 +443,40 @@ func parseQueueCmdArgs(args map[string]interface{}) (CommandLineArgs, error) { case args["rm"].(bool): result.Cmd = QueueRemoveCmd case args["bind"].(bool): - // bind QUEUE to EXCHANGE [--bindingkey key] + // bind QUEUE to EXCHANGE ([--bindingkey key] | (--header KEYVAL)* ) + var err error result.Cmd = QueueBindCmd - result.QueueBindingKey = args["--bindingkey"].(string) + result.QueueBindingKey = parseBindingKey(args) + + result.Headers, err = parseBindingHeaders(args) + if err != nil { + return result, err + } + if args["--any"].(bool) { + result.HeaderMode = HeaderMatchAny + } else if args["--all"].(bool) { + result.HeaderMode = HeaderMatchAll + } else { + result.HeaderMode = HeaderNone + } + result.ExchangeName = args["EXCHANGE"].(string) + case args["unbind"].(bool): // unbind QUEUE from EXCHANGE [--bindingkey key] result.Cmd = QueueUnbindCmd - result.QueueBindingKey = args["--bindingkey"].(string) + result.QueueBindingKey = parseBindingKey(args) + result.Headers, err = parseBindingHeaders(args) + if err != nil { + return result, err + } + if args["--any"].(bool) { + result.HeaderMode = HeaderMatchAny + } else if args["--all"].(bool) { + result.HeaderMode = HeaderMatchAll + } else { + result.HeaderMode = HeaderNone + } result.ExchangeName = args["EXCHANGE"].(string) case args["purge"].(bool): result.Cmd = QueuePurgeCmd @@ -440,6 +525,10 @@ func parsePublishCmdArgs(args map[string]interface{}) (CommandLineArgs, error) { exchange := args["--exchange"].(string) result.PubExchange = &exchange } + result.Headers, err = parseBindingHeaders(args) + if err != nil { + return result, err + } if args["--routingkey"] != nil { routingKey := args["--routingkey"].(string) result.PubRoutingKey = &routingKey diff --git a/cmd/rabtap/command_line_test.go b/cmd/rabtap/command_line_test.go index a32440b..db82bf8 100644 --- a/cmd/rabtap/command_line_test.go +++ b/cmd/rabtap/command_line_test.go @@ -27,6 +27,41 @@ func assertEqualURL(t *testing.T, expected string, actual *url.URL) { assert.Equal(t, expectedURI, actual) } +func TestParseKeyValueExpressions(t *testing.T) { + testcases := []struct { + desc string + probe string + expectedKey, expectedVal string + err bool + }{ + {"empty is invalid", "", "", "", true}, + {"without assignment is invalid", "a b", "", "", true}, + {"without lhs is invalid", "=rhs", "", "", true}, + {"without rhs is invalid", "lhs=", "", "", true}, + {"standard case", "key=value", "key", "value", false}, + {"standard case with whitespace", " key = value ", "key", "value", false}, + {"standard case with whitespace and special chars", " key_1.2 = value%3 ", "key_1.2", "value%3", false}, + } + + for _, tc := range testcases { + k, v, err := parseKeyValue(tc.probe) + assert.Equal(t, tc.expectedKey, k, tc.desc) + assert.Equal(t, tc.expectedVal, v, tc.desc) + assert.Equal(t, tc.err, err != nil, tc.desc) + } +} + +func TestParseKeyValueListParsesValidList(t *testing.T) { + kv, err := parseKeyValueList([]string{"a=b"}) + assert.NoError(t, err) + assert.Equal(t, map[string]string{"a": "b"}, kv) +} + +func TestParseKeyValueListParsesFailsOnInvalidInput(t *testing.T) { + _, err := parseKeyValueList([]string{"a="}) + assert.Error(t, err) +} + func TestParseAMQPURLParsesValidURI(t *testing.T) { // since multple --uri arguments are possible docopt returns an array args := map[string]interface{}{"--uri": []string{"uri"}} @@ -483,7 +518,7 @@ func TestCliUnbindQueue(t *testing.T) { assertEqualURL(t, "uri", args.AMQPURL) } -func TestCliBindQueue(t *testing.T) { +func TestCliBindQueueWithBindingKey(t *testing.T) { args, err := ParseCommandLineArgs( []string{"queue", "bind", "queuename", "to", "exchangename", "--bindingkey", "key", "--uri", "uri"}) @@ -493,6 +528,23 @@ func TestCliBindQueue(t *testing.T) { assert.Equal(t, "queuename", args.QueueName) assert.Equal(t, "exchangename", args.ExchangeName) assert.Equal(t, "key", args.QueueBindingKey) + assert.Equal(t, map[string]string{}, args.Headers) + assert.Equal(t, HeaderNone, args.HeaderMode) + assertEqualURL(t, "uri", args.AMQPURL) +} + +func TestCliBindQueueWithHeaders(t *testing.T) { + args, err := ParseCommandLineArgs( + []string{"queue", "bind", "queuename", "to", "exchangename", + "--header", "a=b", "--header", "c=d", "--any", "--uri", "uri"}) + + assert.Nil(t, err) + assert.Equal(t, QueueBindCmd, args.Cmd) + assert.Equal(t, "queuename", args.QueueName) + assert.Equal(t, "exchangename", args.ExchangeName) + assert.Equal(t, "", args.QueueBindingKey) + assert.Equal(t, map[string]string{"a": "b", "c": "d"}, args.Headers) + assert.Equal(t, HeaderMatchAny, args.HeaderMode) assertEqualURL(t, "uri", args.AMQPURL) } diff --git a/cmd/rabtap/main.go b/cmd/rabtap/main.go index 3dfc012..d1588b9 100644 --- a/cmd/rabtap/main.go +++ b/cmd/rabtap/main.go @@ -138,6 +138,7 @@ func startCmdPublish(ctx context.Context, args CommandLineArgs) { amqpURL: args.AMQPURL, exchange: args.PubExchange, routingKey: args.PubRoutingKey, + headers: args.Headers, fixedDelay: args.Delay, speed: args.Speed, tlsConfig: getTLSConfig(args.InsecureTLS, args.TLSCertFile, args.TLSKeyFile, args.TLSCaFile), @@ -208,11 +209,19 @@ func dispatchCmd(ctx context.Context, args CommandLineArgs, tlsConfig *tls.Confi case QueuePurgeCmd: cmdQueuePurge(args.AMQPURL, args.QueueName, tlsConfig) case QueueBindCmd: - cmdQueueBindToExchange(args.AMQPURL, args.QueueName, - args.QueueBindingKey, args.ExchangeName, tlsConfig) + cmdQueueBindToExchange(CmdQueueBindArg{ + amqpURL: args.AMQPURL, + exchange: args.ExchangeName, + queue: args.QueueName, key: args.QueueBindingKey, + headerMode: args.HeaderMode, headers: args.Headers, + tlsConfig: tlsConfig}) case QueueUnbindCmd: - cmdQueueUnbindFromExchange(args.AMQPURL, args.QueueName, - args.QueueBindingKey, args.ExchangeName, tlsConfig) + cmdQueueUnbindFromExchange(CmdQueueBindArg{ + amqpURL: args.AMQPURL, + exchange: args.ExchangeName, + queue: args.QueueName, key: args.QueueBindingKey, + headerMode: args.HeaderMode, headers: args.Headers, + tlsConfig: tlsConfig}) case ConnCloseCmd: cmdConnClose(ctx, args.APIURL, args.ConnName, args.CloseReason, tlsConfig) diff --git a/inttest/rabbitmq/docker-compose.yml b/inttest/rabbitmq/docker-compose.yml index f852e13..1e85fa0 100644 --- a/inttest/rabbitmq/docker-compose.yml +++ b/inttest/rabbitmq/docker-compose.yml @@ -1,7 +1,7 @@ version: '3' services: rabbitmq: - image: rabbitmq:3.8-management-alpine + image: rabbitmq:3.9.8-management-alpine volumes: - ./definitions.json:/etc/rabbitmq/definitions.json:z - ./rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf:z diff --git a/pkg/key_value.go b/pkg/key_value.go new file mode 100644 index 0000000..2cf4bf1 --- /dev/null +++ b/pkg/key_value.go @@ -0,0 +1,13 @@ +package rabtap + +import "github.com/streadway/amqp" + +type KeyValueMap map[string]string + +func ToAMQPTable(headers KeyValueMap) amqp.Table { + table := amqp.Table{} + for k, v := range headers { + table[k] = v + } + return table +} diff --git a/pkg/publish.go b/pkg/publish.go index 30ccaad..a6fdd54 100644 --- a/pkg/publish.go +++ b/pkg/publish.go @@ -12,11 +12,13 @@ import ( "github.com/streadway/amqp" ) +const timeoutWaitACK = time.Second * 2 +const timeoutWaitServer = time.Second * 1 + // PublishMessage is a message to be published by AmqpPublish via // a PublishChannel type PublishMessage struct { - Exchange string - RoutingKey string + Routing Routing Publishing *amqp.Publishing } @@ -59,17 +61,22 @@ type PublishErrorChannel chan *PublishError func (s *PublishError) Error() string { switch s.Reason { case PublishErrorAckTimeout: - return fmt.Sprintf("timeout waiting for ACK publishing to exchange '%s' with routingkey '%s'", - s.Message.Exchange, s.Message.RoutingKey) + return fmt.Sprintf("publish to %s failed: timeout waiting for ACK", + s.Message.Routing) case PublishErrorNack: - return fmt.Sprintf("NACK from server publishing to exchange '%s' with routingkey '%s'", - s.Message.Exchange, s.Message.RoutingKey) + return fmt.Sprintf("publish to %s failed: NACK", + s.Message.Routing) case PublishErrorPublishFailed: - return fmt.Sprintf("publish to exchange '%s' with routingkey '%s' failed: %s", - s.Message.Exchange, s.Message.RoutingKey, s.Cause) + return fmt.Sprintf("publish to %s failed: %s", + s.Message.Routing, s.Cause) case PublishErrorReturned: - return fmt.Sprintf("server returned message for exchange '%s' with routingkey '%s': %s", - s.ReturnedMessage.Exchange, s.ReturnedMessage.RoutingKey, s.ReturnedMessage.ReplyText) + // note: RabbitMQ seems not to set the headers on a returned message + // when e.g. header based routing was used. + routing := NewRouting(s.ReturnedMessage.Exchange, + s.ReturnedMessage.RoutingKey, + s.ReturnedMessage.Headers) + return fmt.Sprintf("server returned message for %s: %s", + routing, s.ReturnedMessage.ReplyText) case PublishErrorChannelError: return fmt.Sprintf("channel error: %s", s.Cause) } @@ -105,6 +112,8 @@ func NewAmqpPublish(url *url.URL, tlsConfig *tls.Config, // The immedeate flag is not supported since RabbitMQ 3.0, see // https://blog.rabbitmq.com/posts/2012/11/breaking-things-with-rabbitmq-3-0 // +// TODO detect throttling +// TODO simplify func (s *AmqpPublish) createWorkerFunc( publishCh PublishChannel, errorCh PublishErrorChannel) AmqpWorkerFunc { @@ -114,7 +123,7 @@ func (s *AmqpPublish) createWorkerFunc( // errors receives channel errors (e.g. publishing to non-existant exchange) errors := session.Channel.NotifyClose(make(chan *amqp.Error, 1)) // return receivces unroutable messages back from the server - returns := session.NotifyReturn(make(chan amqp.Return)) + returns := session.NotifyReturn(make(chan amqp.Return, 1)) // confirms receives confirmations from the server (if enabled below) confirms := session.NotifyPublish(make(chan amqp.Confirmation, 1)) @@ -123,41 +132,43 @@ func (s *AmqpPublish) createWorkerFunc( s.logger.Errorf("Channel could not be put into confirm mode: %s", err) } } + // wait a while for outstanding errors and returned messages // since these can arrive after we finished publishing. - - // TODO when an error was detected on the errors channel (event loop), - // then the defer needs not to be run (???) defer func() { - s.logger.Debugf("waiting for confirms & returns... ") - timeout := time.After(time.Second * 1) // TODO config/const + if !s.mandatory { + return + } - // wait for pending returned messages from the broker, whem - // e.g. a message could not be routed. in this case the - // message WILL be confirmed (ACK=true), but an async - // return message will be send, for which we wait here. + s.logger.Debugf("waiting for pending server messages ... ") + timeout := time.After(timeoutWaitServer) + + // wait for pending returned messages from the broker, when e.g. a + // message could not be routed. in this case the message WILL be + // confirmed (ACK=true), but an async return message will be send, + // for which we wait here. for { select { case <-timeout: return case returned, more := <-returns: - if !more { - continue + if more { + errorCh <- &PublishError{Reason: PublishErrorReturned, ReturnedMessage: &returned} } - errorCh <- &PublishError{Reason: PublishErrorReturned, ReturnedMessage: &returned} - // these events singal closing of the channel TODO relevant here? case err, more := <-errors: - if !more { - continue + if more { + errorCh <- &PublishError{Reason: PublishErrorChannelError, Cause: err} } - errorCh <- &PublishError{Reason: PublishErrorChannelError, Cause: err} } } }() + ackTimeout := time.NewTimer(timeoutWaitACK) + defer ackTimeout.Stop() + for { select { case err := <-errors: @@ -176,8 +187,12 @@ func (s *AmqpPublish) createWorkerFunc( return doNotReconnect, nil } - err := session.Publish(message.Exchange, - message.RoutingKey, + size := len((*message.Publishing).Body) + s.logger.Debugf("publish message to %s (%d bytes)", message.Routing, size) + message.Publishing.Headers = message.Routing.Headers() + err := session.Publish( + message.Routing.Exchange(), + message.Routing.Key(), s.mandatory, false, // immeadiate flag was removed with RabbitMQ 3 *message.Publishing) @@ -187,19 +202,38 @@ func (s *AmqpPublish) createWorkerFunc( } else { // wait for the confirmation before publishing a new message + // https://www.rabbitmq.com/confirms.html + // TODO batched confirms + // + // "For unroutable messages, the broker will issue a confirm + // once the exchange verifies a message won't route to any + // queue (returns an empty list of queues). If the message + // is also published as mandatory, the basic.return is sent + // to the client before basic.ack. The same is true for + // negative acknowledgements (basic.nack)." if s.confirms { - select { - case <-time.After(2 * time.Second): // TODO MEMORY LEAK when not firing FIXME - errorCh <- &PublishError{Reason: PublishErrorAckTimeout, Message: message} - case confirmed := <-confirms: - if !confirmed.Ack { - errorCh <- &PublishError{Reason: PublishErrorNack, Message: message} - } else { - s.logger.Infof("delivery with delivery tag #%d was ACKed by the server", - confirmed.DeliveryTag) + ackTimeout.Reset(timeoutWaitACK) + Outer: + for { + select { + case <-ackTimeout.C: + errorCh <- &PublishError{Reason: PublishErrorAckTimeout, Message: message} + break Outer + case returned, more := <-returns: + if more { + errorCh <- &PublishError{Reason: PublishErrorReturned, ReturnedMessage: &returned} + } + case confirmed := <-confirms: + if !confirmed.Ack { + errorCh <- &PublishError{Reason: PublishErrorNack, Message: message} + } else { + s.logger.Infof("delivery with delivery tag #%d was ACKed by the server", + confirmed.DeliveryTag) + } + break Outer + case <-ctx.Done(): + return doNotReconnect, nil } - case <-ctx.Done(): - return doNotReconnect, nil } } } diff --git a/pkg/publish_test.go b/pkg/publish_test.go index c7d7c33..b476085 100644 --- a/pkg/publish_test.go +++ b/pkg/publish_test.go @@ -41,14 +41,16 @@ func TestIntegrationAmqpPublishDirectExchange(t *testing.T) { // AmqpPublish now has started a go-routine which handles // connection to broker and expects messages on the publishChannel + key := "queue-1" for i := 0; i < numPublishingMessages; i++ { - publishChannel <- &PublishMessage{Exchange: "direct-exchange", - RoutingKey: "queue-1", + routing := NewRouting("direct-exchange", key, amqp.Table{}) + publishChannel <- &PublishMessage{ + Routing: routing, Publishing: &amqp.Publishing{Body: []byte("Hello")}} } doneChan := make(chan int) - testcommon.VerifyTestMessageOnQueue(t, ch, "consumer", numPublishingMessages, "queue-1", doneChan) + testcommon.VerifyTestMessageOnQueue(t, ch, "consumer", numPublishingMessages, key, doneChan) numReceivedOriginal := <-doneChan assert.Equal(t, numPublishingMessages, numReceivedOriginal) } diff --git a/pkg/queue.go b/pkg/queue.go index 315fe77..91a2be8 100644 --- a/pkg/queue.go +++ b/pkg/queue.go @@ -3,6 +3,8 @@ package rabtap +import "github.com/streadway/amqp" + // CreateQueue creates a new queue // TODO(JD) get rid of bool types func CreateQueue(session Session, queueName string, @@ -31,14 +33,13 @@ func PurgeQueue(session Session, queueName string) (int, error) { } // BindQueueToExchange binds the given queue to the given exchange. -// TODO(JD) support for header based routing func BindQueueToExchange(session Session, - queueName, key, exchangeName string) error { - return session.QueueBind(queueName, key, exchangeName, false /* wait */, nil) + queueName, key, exchangeName string, headers amqp.Table) error { + return session.QueueBind(queueName, key, exchangeName, false /* wait */, headers) } // UnbindQueueFromExchange unbinds a queue from an exchange func UnbindQueueFromExchange(session Session, - queueName, key, exchangeName string) error { - return session.QueueUnbind(queueName, key, exchangeName, nil) + queueName, key, exchangeName string, headers amqp.Table) error { + return session.QueueUnbind(queueName, key, exchangeName, headers) } diff --git a/pkg/queue_test.go b/pkg/queue_test.go index df79f36..2a59d95 100644 --- a/pkg/queue_test.go +++ b/pkg/queue_test.go @@ -16,6 +16,7 @@ import ( "testing" "github.com/jandelgado/rabtap/pkg/testcommon" + "github.com/streadway/amqp" "github.com/stretchr/testify/assert" ) @@ -94,14 +95,14 @@ func TestIntegrationAmqpQueueCreateBindUnbindAndRemove(t *testing.T) { assert.NotEqual(t, -1, findQueue(queueTestName, queues)) // bind queue to exchange - err = BindQueueToExchange(session, queueTestName, keyTestName, exchangeTestName) + err = BindQueueToExchange(session, queueTestName, keyTestName, exchangeTestName, amqp.Table{}) assert.Nil(t, err) bindings, err := client.Bindings(context.TODO()) assert.Nil(t, err) assert.NotEqual(t, -1, findBinding(queueTestName, exchangeTestName, keyTestName, bindings)) // unbind queue from exchange - err = UnbindQueueFromExchange(session, queueTestName, keyTestName, exchangeTestName) + err = UnbindQueueFromExchange(session, queueTestName, keyTestName, exchangeTestName, amqp.Table{}) assert.Nil(t, err) bindings, err = client.Bindings(context.TODO()) assert.Nil(t, err) diff --git a/pkg/routing.go b/pkg/routing.go new file mode 100644 index 0000000..7237136 --- /dev/null +++ b/pkg/routing.go @@ -0,0 +1,58 @@ +package rabtap + +import ( + "fmt" + + "github.com/streadway/amqp" +) + +// Routing describes where a message should be published +type Routing struct { + key string + headers amqp.Table + exchange string +} + +func (s Routing) String() string { + r := fmt.Sprintf("exchange: '%s'", s.exchange) + if len(s.headers) > 0 { + r += fmt.Sprintf(", headers: %v", s.headers) + } + if s.key != "" { + r += fmt.Sprintf(", routingkey: '%s'", s.key) + } + return r +} + +func NewRouting(exchange, key string, headers amqp.Table) Routing { + amqpHeaders := amqp.Table{} + for k, v := range headers { + amqpHeaders[k] = v + } + return Routing{exchange: exchange, key: key, headers: amqpHeaders} +} + +func (s Routing) Exchange() string { + return s.exchange +} + +func (s Routing) Key() string { + return s.key +} + +func (s Routing) Headers() amqp.Table { + return s.headers +} + +// MergeTable merges the given amqp.Table's, the second one overrideing the +// values of the first one +func MergeTables(first, second amqp.Table) amqp.Table { + res := make(amqp.Table, len(first)+len(second)) + for k, v := range first { + res[k] = v + } + for k, v := range second { + res[k] = v + } + return res +} diff --git a/pkg/routing_test.go b/pkg/routing_test.go new file mode 100644 index 0000000..9c94e63 --- /dev/null +++ b/pkg/routing_test.go @@ -0,0 +1,30 @@ +package rabtap + +import ( + "testing" + + "github.com/streadway/amqp" + "github.com/stretchr/testify/assert" +) + +func TestNewRoutingConstructsRoutingWithAmqpTable(t *testing.T) { + + r := NewRouting("exchange", "key", amqp.Table{"A": "B"}) + + assert.Equal(t, "exchange", r.Exchange()) + assert.Equal(t, "key", r.Key()) + assert.Equal(t, amqp.Table{"A": "B"}, r.Headers()) +} + +func TestMergeTableMergesNilTablesIntoAnEmptyTable(t *testing.T) { + assert.Equal(t, amqp.Table{}, MergeTables(nil, nil)) +} + +func TestMergeTableMergesTwoTablesSecondOneOverridingFirstOne(t *testing.T) { + first := amqp.Table{"A": "B", "X": "Y"} + second := amqp.Table{"C": "D", "X": "W"} + + merged := MergeTables(first, second) + + assert.Equal(t, amqp.Table{"A": "B", "C": "D", "X": "W"}, merged) +} diff --git a/pkg/subscribe_test.go b/pkg/subscribe_test.go index c544686..105743e 100644 --- a/pkg/subscribe_test.go +++ b/pkg/subscribe_test.go @@ -10,6 +10,7 @@ import ( "time" "github.com/jandelgado/rabtap/pkg/testcommon" + "github.com/streadway/amqp" ) func TestSubscribe(t *testing.T) { @@ -26,7 +27,7 @@ func TestSubscribe(t *testing.T) { // bound to the connection which created them (other connections get // error RESOURCE_LOCKED (405)). CreateQueue(session, queueName, false /*durable*/, true /*ad*/, false /*excl*/) - BindQueueToExchange(session, queueName, keyName, "subtest-direct-exchange") + BindQueueToExchange(session, queueName, keyName, "subtest-direct-exchange", amqp.Table{}) finishChan := make(chan int) diff --git a/pkg/tap.go b/pkg/tap.go index af1661f..a186a08 100644 --- a/pkg/tap.go +++ b/pkg/tap.go @@ -165,5 +165,5 @@ func (s *AmqpTap) createExchangeToExchangeBinding(session Session, func (s *AmqpTap) bindQueueToExchange(session Session, exchangeName, bindingKey, queueName string) error { - return BindQueueToExchange(session, queueName, bindingKey, exchangeName) + return BindQueueToExchange(session, queueName, bindingKey, exchangeName, amqp.Table{}) }