Skip to content

Commit

Permalink
support header based routing (#64)
Browse files Browse the repository at this point in the history
header based routing is now supported in pub, queue bind and queue
unbind commands.
  • Loading branch information
jandelgado authored Oct 25, 2021
1 parent a09e3b7 commit a9ff80d
Show file tree
Hide file tree
Showing 20 changed files with 525 additions and 146 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Changelog for rabtap

## v1.30 (2021-10-13)

* new: support header based routing (pub, queue bind, queue unbind)

## v1.29 (2021-09-15)

* new: add a docker image (ghcr.io/jandelgado/rabtap)
Expand Down
59 changes: 40 additions & 19 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,8 @@ Usage:
[(--tls-cert-file=CERTFILE --tls-key-file=KEYFILE)] [--tls-ca-file=CAFILE]
rabtap sub QUEUE [--uri URI] [--saveto=DIR] [--format=FORMAT] [--no-auto-ack] [-jksvn]
[(--tls-cert-file=CERTFILE --tls-key-file=KEYFILE)] [--tls-ca-file=CAFILE]
rabtap pub [--uri=URI] [SOURCE] [--exchange=EXCHANGE] [--routingkey=KEY] [--format=FORMAT]
rabtap pub [--uri=URI] [SOURCE] [--exchange=EXCHANGE] [--format=FORMAT]
[--routingkey=KEY | (--header=KV)...]
[--confirms] [--mandatory] [--delay=DELAY | --speed=FACTOR] [-jkv]
[(--tls-cert-file=CERTFILE --tls-key-file=KEYFILE)] [--tls-ca-file=CAFILE]
rabtap exchange create EXCHANGE [--uri=URI] [--type=TYPE] [-adkv]
Expand All @@ -154,9 +155,11 @@ Usage:
[(--tls-cert-file=CERTFILE --tls-key-file=KEYFILE)] [--tls-ca-file=CAFILE]
rabtap queue create QUEUE [--uri=URI] [-adkv]
[(--tls-cert-file=CERTFILE --tls-key-file=KEYFILE)] [--tls-ca-file=CAFILE]
rabtap queue bind QUEUE to EXCHANGE --bindingkey=KEY [--uri=URI] [-kv]
rabtap queue bind QUEUE to EXCHANGE [--uri=URI] [-kv]
(--bindingkey=KEY | (--header=KV)... (--all|--any))
[(--tls-cert-file=CERTFILE --tls-key-file=KEYFILE)] [--tls-ca-file=CAFILE]
rabtap queue unbind QUEUE from EXCHANGE --bindingkey=KEY [--uri=URI] [-kv]
rabtap queue unbind QUEUE from EXCHANGE [--uri=URI] [-kv]
(--bindingkey=KEY | (--header=KV)... (--all|--any))
[(--tls-cert-file=CERTFILE --tls-key-file=KEYFILE)] [--tls-ca-file=CAFILE]
rabtap queue rm QUEUE [--uri=URI] [-kv] [(--tls-cert-file=CERTFILE --tls-key-file=KEYFILE)] [--tls-ca-file=CAFILE]
rabtap queue purge QUEUE [--uri=URI] [-kv] [(--tls-cert-file=CERTFILE --tls-key-file=KEYFILE)] [--tls-ca-file=CAFILE]
Expand All @@ -165,14 +168,16 @@ Usage:
rabtap --version
Arguments and options:
EXCHANGES comma-separated list of exchanges and binding keys,
EXCHANGES comma-separated list of exchanges and optional binding keys,
e.g. amq.topic:# or exchange1:key1,exchange2:key2.
EXCHANGE name of an exchange, e.g. amq.direct.
SOURCE file or directory to publish in pub mode. If omitted, stdin will be read.
QUEUE name of a queue.
CONNECTION name of a connection.
DIR directory to read messages from.
-a, --autodelete create auto delete exchange/queue.
--all set x-match=all option in header based routing.
--any set x-match=any option in header based routing.
--api=APIURI connect to given API server. If APIURL is omitted,
the environment variable RABTAP_APIURI will be used.
-b, --bindingkey=KEY binding key to use in bind queue command.
Expand All @@ -191,6 +196,8 @@ Arguments and options:
* for info command: controls generated output format. Valid
options are: "text", "dot". Default: text
-h, --help print this help.
--header=KV A key value pair in the form of "key=value" used as a
routing- or binding-key. Can occur multiple times.
-j, --json deprecated. Use "--format json" instead.
-k, --insecure allow insecure TLS connections (no certificate check).
--tls-cert-file=CERTFILE A Cert file to use for client authentication.
Expand Down Expand Up @@ -474,26 +481,40 @@ Example assumes that `RABTAP_AMQPURI` environment variable is set, as the
The `pub` command is used to publish messages to an exchange with a routing
key. The messages to be published are either read from a file, or from a
directory which contains previously recorded messages (e.g. using the
`--saveto` option of the `tap` command). Messages can be published either in
raw format, in which they are send as-is, or in [JSON-format, as described
here](#json-message-format), which includes message metadata and the body in a
single JSON document.
`--saveto` option of the `tap` command).

Message routing is either specified with a routing key and the `--routingkey`
option or, when header based routing should be used, by specifying the headers
with the `--header` option. Each header is specified in the form `KEY=VALUE`.
Multiple headers can be specified by specifying multiple `--header` options.

Messages can be published either in raw format, in which they are send as-is,
or in [JSON-format, as described here](#json-message-format), which includes
message metadata and the body in a single JSON document. When multiple messages
are published with metadata, rabtap will calculate the time elapsed of
consecutive recorded messages using the metadata, and delay publishing
accordingly. To set the publishing delay to a fix value, use the `--delay`
option. To publish without delays, use `--delay=0s`. To modify publishing speed
use the `--speed` option, which allows to set a factor to apply to the delays.

The general form of the `pub` command is
```
rabtap pub [--uri=URI] [SOURCE] [--exchange=EXCHANGE] [--routingkey=KEY]
[--confirms] [--mandatory] [--format=FORMAT]
[--delay=DELAY | --speed=FACTOR] [-jkv]
rabtap pub [--uri=URI] [SOURCE] [--exchange=EXCHANGE] [--format=FORMAT]
[--routingkey=KEY | (--header=HEADERKV)...]
[--confirms] [--mandatory] [--delay=DELAY | --speed=FACTOR] [-jkv]
```

* `$ echo hello | rabtap pub amq.fanout` - publish "hello" to exchange amqp.fanout
* `$ rabtap pub messages.json --format=json` - messages are read from file `messages.json`
in [rabtap JSON format](#json-message-format). Target exchange and routing
keys are read from the messages meta data. The `messages.json` file can
contain multiple JSON documents as it is treated as a JSON stream. Rabtap
will honor the `XRabtapReceived` timestamps of the messages and by default
delay the messages as they were recorded. This behaviour can be overridden
by the `--delay` and `--speed` options.
* `$ echo hello | rabtap pub --exchange amq.fanout` - publish "hello" to
exchange amqp.fanout
* `echo "hello" | rabtap pub --exchange amq.header --header KEY=VAL --header X=Y` -
publish hello to exchange amq.header use set message headers.
* `$ rabtap pub messages.json --format=json` - messages are read from file
`messages.json` in [rabtap JSON format](#json-message-format). Target
exchange and routing keys are read from the messages meta data. The
`messages.json` file can contain multiple JSON documents as it is treated as
a JSON stream. Rabtap will honor the `XRabtapReceived` timestamps of the
messages and by default will delay the messages as they were recorded. This
behaviour can be overridden by the `--delay` and `--speed` options.
* `$ rabtap pub --exchange amq.direct -r myKey --format=json messages.json --delay=0s` - as
before, but publish messages always to exchange `amq.direct` with routing key
`myKey` and without any delays.
Expand Down
36 changes: 25 additions & 11 deletions cmd/rabtap/cmd_publish.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ type CmdPublishArg struct {
tlsConfig *tls.Config
exchange *string
routingKey *string
headers rabtap.KeyValueMap
readerFunc MessageReaderFunc
speed float64
fixedDelay *time.Duration
Expand Down Expand Up @@ -55,15 +56,11 @@ func durationBetweenMessages(first, second *RabtapPersistentMessage,
// publishMessage publishes a single message on the given exchange with the
// provided routingkey
func publishMessage(publishChannel rabtap.PublishChannel,
exchange, routingKey string,
routing rabtap.Routing,
amqpPublishing amqp.Publishing) {

log.Debugf("publishing message to exchange '%s' with routing key '%s'",
exchange, routingKey)

publishChannel <- &rabtap.PublishMessage{
Exchange: exchange,
RoutingKey: routingKey,
Routing: routing,
Publishing: &amqpPublishing}
}

Expand All @@ -76,10 +73,21 @@ func selectOptionalOrDefault(optionalStr *string, defaultStr string) string {
return defaultStr
}

// routingFromMessage creates a Routing from a message and optional defaults
// that can override fields in the message
func routingFromMessage(optExchange, optRoutingKey *string, headers rabtap.KeyValueMap, msg RabtapPersistentMessage) rabtap.Routing {
routingKey := selectOptionalOrDefault(optRoutingKey, msg.RoutingKey)
exchange := selectOptionalOrDefault(optExchange, msg.Exchange)
mergedHeaders := rabtap.MergeTables(msg.Headers, rabtap.ToAMQPTable(headers))
return rabtap.NewRouting(exchange, routingKey, mergedHeaders)
}

// publishMessageStream publishes messages from the provided message stream
// provided by readNextMessageFunc. When done closes the publishChannel
func publishMessageStream(publishCh rabtap.PublishChannel,
optExchange, optRoutingKey *string,
optExchange *string,
optRoutingKey *string,
headers rabtap.KeyValueMap,
readNextMessageFunc MessageReaderFunc,
delayFunc DelayFunc) error {

Expand All @@ -95,9 +103,15 @@ func publishMessageStream(publishCh rabtap.PublishChannel,
return nil
case nil:
delayFunc(lastMsg, &msg)
routingKey := selectOptionalOrDefault(optRoutingKey, msg.RoutingKey)
exchange := selectOptionalOrDefault(optExchange, msg.Exchange)
publishMessage(publishCh, exchange, routingKey, msg.ToAmqpPublishing())

// the per-message routing key (in case it was read from a json
// file) can be overriden by the command line, if set.
routing := routingFromMessage(optExchange, optRoutingKey, headers, msg)

// during publishing, header information in msg.Header will be overriden
// by header information in the routing object (if present). The
// latter are set on the command line using --header K=V options.
publishMessage(publishCh, routing, msg.ToAmqpPublishing())
lastMsg = &msg
default:
return err
Expand Down Expand Up @@ -146,7 +160,7 @@ func cmdPublish(ctx context.Context, cmd CmdPublishArg) error {
// avoid blocking when e.g. the user presses CTRL+S and then CTRL+C.
// TODO find better solution
resultCh <- publishMessageStream(publishCh, cmd.exchange,
cmd.routingKey, cmd.readerFunc, delayFunc)
cmd.routingKey, cmd.headers, cmd.readerFunc, delayFunc)
}()

g.Go(func() error {
Expand Down
37 changes: 31 additions & 6 deletions cmd/rabtap/cmd_publish_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,31 @@ import (
"github.com/stretchr/testify/require"
)

func TestRoutingFromMessageUsesOptExchangeWhenSpecified(t *testing.T) {

key := "okey"
exchange := "oexchange"
testMsg := RabtapPersistentMessage{Exchange: "exchange", RoutingKey: "key", Headers: amqp.Table{"A": "B"}}

tests := []struct {
optKey *string
optExchange *string
msg RabtapPersistentMessage
headers rabtap.KeyValueMap
expected rabtap.Routing
}{
{optKey: nil, optExchange: nil, msg: testMsg, headers: nil, expected: rabtap.NewRouting("exchange", "key", amqp.Table{"A": "B"})},
{optKey: nil, optExchange: nil, msg: testMsg, headers: rabtap.KeyValueMap{"A": "X"}, expected: rabtap.NewRouting("exchange", "key", amqp.Table{"A": "X"})},
{optKey: &key, optExchange: &exchange, msg: testMsg, headers: nil, expected: rabtap.NewRouting("oexchange", "okey", amqp.Table{"A": "B"})},
}

for _, tc := range tests {
routing := routingFromMessage(tc.optExchange, tc.optKey, tc.headers, tc.msg)
assert.Equal(t, tc.expected, routing, tc)
}

}

func TestMultDurationReturnsCorrectValue(t *testing.T) {
assert.Equal(t, time.Duration(50), multDuration(time.Duration(100), 0.5))
}
Expand Down Expand Up @@ -74,14 +99,14 @@ func TestPublishMessageStreamPublishesNextMessage(t *testing.T) {

pubCh := make(rabtap.PublishChannel, 1)
exchange := "exchange"
routingKey := "key"
err := publishMessageStream(pubCh, &exchange, &routingKey, mockReader, delayer)
key := "key"
err := publishMessageStream(pubCh, &exchange, &key, rabtap.KeyValueMap{}, mockReader, delayer)

assert.Nil(t, err)
select {
case message := <-pubCh:
assert.Equal(t, "exchange", message.Exchange)
assert.Equal(t, "key", message.RoutingKey)
assert.Equal(t, "exchange", message.Routing.Exchange())
assert.Equal(t, "key", message.Routing.Key())
assert.Equal(t, "hello", string(message.Publishing.Body))
case <-time.After(time.Second * 2):
assert.Fail(t, "did not receive message within expected time")
Expand All @@ -103,8 +128,8 @@ func TestPublishMessageStreamPropagatesMessageReadError(t *testing.T) {

pubCh := make(rabtap.PublishChannel)
exchange := ""
routingKey := ""
err := publishMessageStream(pubCh, &exchange, &routingKey, mockReader, delayer)
key := "key"
err := publishMessageStream(pubCh, &exchange, &key, rabtap.KeyValueMap{}, mockReader, delayer)
assert.Equal(t, errors.New("error"), err)
}

Expand Down
56 changes: 37 additions & 19 deletions cmd/rabtap/cmd_queue.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
// Copyright (C) 2017 Jan Delgado
// rabtap queue related commands
// Copyright (C) 2017-2021 Jan Delgado

package main

// exchange related cli command handlers

import (
"crypto/tls"
"net/url"
Expand All @@ -21,6 +20,24 @@ type CmdQueueCreateArg struct {
tlsConfig *tls.Config
}

type CmdQueueBindArg struct {
amqpURL *url.URL
queue string
exchange string
key string
headers rabtap.KeyValueMap
headerMode HeaderMode
tlsConfig *tls.Config
}

func amqpHeaderRoutingMode(mode HeaderMode) string {
modes := map[HeaderMode]string{
HeaderMatchAny: "any",
HeaderMatchAll: "all",
HeaderNone: ""}
return modes[mode]
}

// cmdQueueCreate creates a new queue on the given broker
func cmdQueueCreate(cmd CmdQueueCreateArg) {
failOnError(rabtap.SimpleAmqpConnector(cmd.amqpURL,
Expand Down Expand Up @@ -59,27 +76,28 @@ func cmdQueuePurge(amqpURL *url.URL, queueName string, tlsConfig *tls.Config) {
}

// cmdQueueBindToExchange binds a queue to an exchange
func cmdQueueBindToExchange(amqpURL *url.URL, queueName, key, exchangeName string,
tlsConfig *tls.Config) {

failOnError(rabtap.SimpleAmqpConnector(amqpURL,
tlsConfig,
func cmdQueueBindToExchange(cmd CmdQueueBindArg) {
failOnError(rabtap.SimpleAmqpConnector(cmd.amqpURL, cmd.tlsConfig,
func(session rabtap.Session) error {
log.Debugf("binding queue %s to exchange %s w/ key %s",
queueName, exchangeName, key)
return rabtap.BindQueueToExchange(session, queueName, key, exchangeName)
if cmd.headerMode != HeaderNone {
cmd.headers["x-match"] = amqpHeaderRoutingMode(cmd.headerMode)
}
log.Debugf("binding queue %s to exchange %s w/ key %s and headers %v",
cmd.queue, cmd.exchange, cmd.key, cmd.headers)

return rabtap.BindQueueToExchange(session, cmd.queue, cmd.key, cmd.exchange, rabtap.ToAMQPTable(cmd.headers))
}), "bind queue failed", os.Exit)
}

// cmdQueueUnbindFromExchange unbinds a queue from an exchange
func cmdQueueUnbindFromExchange(amqpURL *url.URL, queueName, key, exchangeName string,
tlsConfig *tls.Config) {

failOnError(rabtap.SimpleAmqpConnector(amqpURL,
tlsConfig,
func cmdQueueUnbindFromExchange(cmd CmdQueueBindArg) {
failOnError(rabtap.SimpleAmqpConnector(cmd.amqpURL, cmd.tlsConfig,
func(session rabtap.Session) error {
log.Debugf("unbinding queue %s from exchange %s w/ key %s",
queueName, exchangeName, key)
return rabtap.UnbindQueueFromExchange(session, queueName, key, exchangeName)
if cmd.headerMode != HeaderNone {
cmd.headers["x-match"] = amqpHeaderRoutingMode(cmd.headerMode)
}
log.Debugf("unbinding queue %s from exchange %s w/ key %s and headers %v",
cmd.queue, cmd.exchange, cmd.key, cmd.headers)
return rabtap.UnbindQueueFromExchange(session, cmd.queue, cmd.key, cmd.exchange, rabtap.ToAMQPTable(cmd.headers))
}), "unbind queue failed", os.Exit)
}
6 changes: 6 additions & 0 deletions cmd/rabtap/cmd_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,12 @@ import (
"github.com/stretchr/testify/require"
)

func TestAmqpHeaderRoutingModeConverts(t *testing.T) {
assert.Equal(t, "all", amqpHeaderRoutingMode(HeaderMatchAll))
assert.Equal(t, "any", amqpHeaderRoutingMode(HeaderMatchAny))
assert.Equal(t, "", amqpHeaderRoutingMode(HeaderNone))
}

func TestIntegrationCmdQueueCreatePurgeiBindUnbindQueue(t *testing.T) {

// integration tests queue creation, bind to exchange, purge,
Expand Down
1 change: 1 addition & 0 deletions cmd/rabtap/cmd_subscribe_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ func TestCmdSub(t *testing.T) {
amqpURL: amqpURL,
exchange: &testExchange,
routingKey: &testKey,
headers: rabtap.KeyValueMap{},
tlsConfig: tlsConfig,
readerFunc: func() (RabtapPersistentMessage, bool, error) {
// provide exactly one message
Expand Down
Loading

0 comments on commit a9ff80d

Please sign in to comment.