Skip to content

Commit

Permalink
Pass optional arguments to sub command (#68)
Browse files Browse the repository at this point in the history
add --args option to sub command allowing to pass arbitrary options when consuming messages.

auto-ack no longer used, explicitly ack messages

add --offset command, queue-type and lazy aliases
  • Loading branch information
jandelgado authored Nov 26, 2021
1 parent 754f086 commit 140247d
Show file tree
Hide file tree
Showing 19 changed files with 498 additions and 181 deletions.
12 changes: 10 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,9 +1,17 @@
# Changelog for rabtap

## v1.34 (2021-11-24)

* new: specify multiple `--args=KEY=VALUE` options to pass additional arguments
to the `sub` command.
* new: create lazy queue with `queue create ... --lazy`
* new: specify queue type to create with `queue create ... --queue-type=TYPE`
* new: specify offset with `--offset=OFFSET` when reading from streams

## v1.33 (2021-11-14)

* new: specify multiple `--args=key=value` options to pass additional argzuments
to the queue and exchange create functions.
* new: specify multiple `--args=KEY=VALUE` options to pass additional arguments
to the `queue` and `exchange` commands.

## v1.32 (2021-11-13)

Expand Down
90 changes: 62 additions & 28 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ and exchanges, inspect broker.
* [Tap all messages published or delivered](#tap-all-messages-published-or-delivered)
* [Connect to multiple brokers](#connect-to-multiple-brokers)
* [Message recorder](#message-recorder)
* [Messages consumer (subscribe)](#messages-consumer-subscribe)
* [Consume Messages (subscribe)](#consume-messages-subscribe)
* [Publish messages](#publish-messages)
* [Poor mans shovel](#poor-mans-shovel)
* [Close connection](#close-connection)
Expand Down Expand Up @@ -69,7 +69,7 @@ and exchanges, inspect broker.
[RabbitMQ REST management API](https://rawcdn.githack.com/rabbitmq/rabbitmq-management/rabbitmq_v3_6_14/priv/www/api/index.html)
* save messages and meta data for later analysis and replay
* publish messages to exchanges
* consume messages from a queue (subscribe)
* consume messages from queues and streams (subscribe)
* supports TLS
* no runtime dependencies (statically linked golang single file binary)
* simple to use command line tool
Expand Down Expand Up @@ -133,7 +133,6 @@ compile from source.
## Usage

```
rabtap - RabbitMQ wire tap. github.com/jandelgado/rabtap
Usage:
Expand All @@ -146,26 +145,30 @@ Usage:
rabtap (tap --uri=URI EXCHANGES)... [--saveto=DIR] [--format=FORMAT] [--limit=NUM] [-jknsv]
[(--tls-cert-file=CERTFILE --tls-key-file=KEYFILE)] [--tls-ca-file=CAFILE]
rabtap sub QUEUE [--uri URI] [--saveto=DIR] [--format=FORMAT] [--limit=NUM]
[(--reject [--requeue])] [-jksvn]
[--offset=OFFSET] [--args=KV]... [(--reject [--requeue])] [-jksvn]
[(--tls-cert-file=CERTFILE --tls-key-file=KEYFILE)] [--tls-ca-file=CAFILE]
rabtap pub [--uri=URI] [SOURCE] [--exchange=EXCHANGE] [--format=FORMAT]
[--routingkey=KEY | (--header=KV)...]
[--confirms] [--mandatory] [--delay=DELAY | --speed=FACTOR] [-jkv]
[(--tls-cert-file=CERTFILE --tls-key-file=KEYFILE)] [--tls-ca-file=CAFILE]
rabtap exchange create EXCHANGE [--uri=URI] [--type=TYPE] [-adkv]
rabtap exchange create EXCHANGE [--uri=URI] [--type=TYPE] [--args=KV]... [-kv]
[--autodelete] [--durable]
[(--tls-cert-file=CERTFILE --tls-key-file=KEYFILE)] [--tls-ca-file=CAFILE]
rabtap exchange rm EXCHANGE [--uri=URI] [-kv]
[(--tls-cert-file=CERTFILE --tls-key-file=KEYFILE)] [--tls-ca-file=CAFILE]
rabtap queue create QUEUE [--uri=URI] [-adkv]
rabtap queue create QUEUE [--uri=URI] [--queue-type=TYPE] [--args=KV]... [-kv]
[--autodelete] [--durable] [--lazy]
[(--tls-cert-file=CERTFILE --tls-key-file=KEYFILE)] [--tls-ca-file=CAFILE]
rabtap queue bind QUEUE to EXCHANGE [--uri=URI] [-kv]
(--bindingkey=KEY | (--header=KV)... (--all|--any))
[(--tls-cert-file=CERTFILE --tls-key-file=KEYFILE)] [--tls-ca-file=CAFILE]
rabtap queue unbind QUEUE from EXCHANGE [--uri=URI] [-kv]
(--bindingkey=KEY | (--header=KV)... (--all|--any))
[(--tls-cert-file=CERTFILE --tls-key-file=KEYFILE)] [--tls-ca-file=CAFILE]
rabtap queue rm QUEUE [--uri=URI] [-kv] [(--tls-cert-file=CERTFILE --tls-key-file=KEYFILE)] [--tls-ca-file=CAFILE]
rabtap queue purge QUEUE [--uri=URI] [-kv] [(--tls-cert-file=CERTFILE --tls-key-file=KEYFILE)] [--tls-ca-file=CAFILE]
rabtap queue rm QUEUE [--uri=URI] [-kv]
[(--tls-cert-file=CERTFILE --tls-key-file=KEYFILE)] [--tls-ca-file=CAFILE]
rabtap queue purge QUEUE [--uri=URI] [-kv]
[(--tls-cert-file=CERTFILE --tls-key-file=KEYFILE)] [--tls-ca-file=CAFILE]
rabtap conn close CONNECTION [--api=APIURI] [--reason=REASON] [-kv]
[(--tls-cert-file=CERTFILE --tls-key-file=KEYFILE)] [--tls-ca-file=CAFILE]
rabtap --version
Expand All @@ -183,6 +186,8 @@ Arguments and options:
--any set x-match=any option in header based routing.
--api=APIURI connect to given API server. If APIURL is omitted,
the environment variable RABTAP_APIURI will be used.
--args=KV A key value pair in the form of "key=value" passed as
additional arguments. e.g. '--args=x-queue-type=quorum'
-b, --bindingkey=KEY binding key to use in bind queue command.
--by-connection output of info command starts with connections.
--confirms enable publisher confirms and wait for confirmations.
Expand All @@ -204,13 +209,19 @@ Arguments and options:
routing- or binding-key. Can occur multiple times.
-j, --json deprecated. Use "--format json" instead.
-k, --insecure allow insecure TLS connections (no certificate check).
--lazy create a lazy queue.
--limit=NUM Stop afer NUM messages were received. When set to 0, will
run until terminated [default: 0].
--mandatory enable mandatory publishing (messages must be delivered to queue).
--mode=MODE mode for info command. One of "byConnection", "byExchange".
[default: byExchange].
-n, --no-color don't colorize output (see also environment variable NO_COLOR).
--omit-empty don't show echanges without bindings in info command.
--offset=OFFSET Offset when reading from a stream. Can be 'first', 'last',
'next', a duration like '10m', a RFC3339-Timestamp or
an integer index value. Basically it is an alias for
'--args=x-stream-offset=OFFSET'.
--queue-type=TYPE type of queue [default: classic].
--reason=REASON reason why the connection was closed [default: closed by rabtap].
--reject Reject messages. Default behaviour is to acknowledge messages.
--requeue Instruct broker to requeue rejected message
Expand All @@ -222,7 +233,7 @@ Arguments and options:
-s, --silent suppress message output to stdout.
--speed=FACTOR Speed factor to use during publish [default: 1.0].
--stats include statistics in output of info command.
-t, --type=TYPE exchange type [default: fanout].
-t, --type=TYPE type of exchange [default: fanout].
--tls-cert-file=CERTFILE A Cert file to use for client authentication.
--tls-key-file=KEYFILE A Key file to use for client authentication.
--tls-ca-file=CAFILE A CA Cert file to use with TLS.
Expand Down Expand Up @@ -473,34 +484,58 @@ message the body base64 encode. Examples:
Files are created with file name `rabtap-`+`<Unix-Nano-Timestamp>`+ `.` +
`<extension>`.

#### Messages consumer (subscribe)
#### Consume Messages (subscribe)

The `sub` command reads messages from a queue. Note that unlike `tap`, `sub`
will consume messages that are in effect removed from the specified queue.
The `sub` command reads messages from a queue or a stream. The general form
of the `sub` command is:

```
rabtap sub QUEUE [--uri URI] [--saveto=DIR] [--format=FORMAT] [--limit=NUM]
[--offset=OFFSET] [--args=KV]... [(--reject [--requeue])] [-jksvn]
```

Use the `--limit=NUM` option to limit the number of received messages. If
specified, rabtap will terminate, after `NUM` messages were successfully
received.
read.

Use the `--reject` option to 'nack' messages, which in turn will be discarded
by th broker or routed to a configured dead letter exchange (DLX). if
by the broker or routed to a configured dead letter exchange (DLX). if
`--requeue` is also set, the message will be returned to the queue.

Example:
The `--offset=OFFSET` option is used when subscribing to streams. Streams are
append-only data structures with non-destructive semantics and were introduced
with RabbitMQ 3.9. The `OFFSET` parameter specifies where to start reading from the
stream and must be any of: `first`, `last`, `next`, a numerical offset, a
RFC3339-Timestamp or a duration specification like `10m`. Consult the RabbitMQ
documentation for more information on [streams](https://www.rabbitmq.com/streams.html).

Examples:

* `$ rabtap sub somequeue --format json` - will consume messages from queue
* `$ rabtap sub somequeue --format=json` - will consume messages from queue
`somequeue` and print out messages in JSON format. The Example assumes that
`RABTAP_AMQPURI` environment variable is set, as the `--uri AMQPURI`
`RABTAP_AMQPURI` environment variable is set, as the `--uri=AMQPURI`
parameter is omitted.
* `rabtap sub somequeue --limit 1 --reject --requeue` - consume one message
* `rabtap sub somequeue --limit=1 --reject --requeue` - consume one message
from the queue `somequeue` and let the broker requeue the message.
* `rabtap sub mystream --offset=first` - read all messages from stream
`mystream`.
* `rabtap sub mystream --offset=50` - read messages from stream `mystream`
starting with the 50th message.
* `rabtap sub mystream --offset=10m` - read messages from stream `mystream`
which are aged 10 minutes or less.

#### Publish messages

The `pub` command is used to publish messages to an exchange. The messages to
be published are either read from a file, or from a directory which contains
previously recorded messages (e.g. using the `--saveto` option of the `tap`
command).
command). The general form of the `pub` command is:

```
rabtap pub [--uri=URI] [SOURCE] [--exchange=EXCHANGE] [--format=FORMAT]
[--routingkey=KEY | (--header=HEADERKV)...]
[--confirms] [--mandatory] [--delay=DELAY | --speed=FACTOR] [-jkv]
```

Message routing is either specified with a routing key and the `--routingkey`
option or, when header based routing should be used, by specifying the headers
Expand All @@ -524,12 +559,7 @@ When the `--mandatory` option is set, rabtap publishes message in mandatory
mode. If set and a message can not be delivered to a queue, the server returns
the message and rabtap will log an error.

The general form of the `pub` command is
```
rabtap pub [--uri=URI] [SOURCE] [--exchange=EXCHANGE] [--format=FORMAT]
[--routingkey=KEY | (--header=HEADERKV)...]
[--confirms] [--mandatory] [--delay=DELAY | --speed=FACTOR] [-jkv]
```
Examples:

* `$ echo hello | rabtap pub --exchange amq.fanout` - publish "hello" to
exchange amqp.fanout
Expand Down Expand Up @@ -646,9 +676,13 @@ RabbitMQ using the `--args=key=value` syntax. This allows for example to specify
the queue type or mode:

* `rabtap queue create quorum_queue --args=x-queue-type=quorum --durable` -
create a quorum queue named `quorum_queue`
* `rabtap queue create lazy_queue --args=x-queue-mode=lazy` - create a lazy
queue named `lazy_queue`
create a quorum queue named `quorum_queue`. The same can be achieved by using
the `--queue-type` option, which is an alias for setting the arg `x-queue-type`:
`rabtap queue create quorum --queue-type=quorum --durable`
* `rabtap queue create mystream --queue-type=stream --durable` - create a stream
* `rabtap queue create lazy_queue --lazy` - create a classic queue in lazy
mode that is named `lazy_queue`. `--lazy` is an alias for setting the arg
`x-queue-mode`.

## JSON message format

Expand Down
21 changes: 12 additions & 9 deletions cmd/rabtap/cmd_subscribe.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
// Copyright (C) 2017 Jan Delgado
// subscribe cli command handler
// Copyright (C) 2017-2021 Jan Delgado

package main

// subscribe cli command handler

import (
"context"
"crypto/tls"
"fmt"
"net/url"

rabtap "github.com/jandelgado/rabtap/pkg"
Expand All @@ -22,6 +22,7 @@ type CmdSubscribeArg struct {
messageReceiveLoopPred MessageReceiveLoopPred
reject bool
requeue bool
args rabtap.KeyValueMap
}

// cmdSub subscribes to messages from the given queue
Expand All @@ -31,21 +32,23 @@ func cmdSubscribe(ctx context.Context, cmd CmdSubscribeArg) error {
ctx, cancel := context.WithCancel(ctx)
g, ctx := errgroup.WithContext(ctx)

messageChannel := make(rabtap.TapChannel)
config := rabtap.AmqpSubscriberConfig{Exclusive: false, AutoAck: false}
config := rabtap.AmqpSubscriberConfig{
Exclusive: false,
Args: rabtap.ToAMQPTable(cmd.args)}
subscriber := rabtap.NewAmqpSubscriber(config, cmd.amqpURL, cmd.tlsConfig, log)

g.Go(func() error { return subscriber.EstablishSubscription(ctx, cmd.queue, messageChannel) })
messageChannel := make(rabtap.TapChannel)
errorChannel := make(rabtap.SubscribeErrorChannel)
g.Go(func() error { return subscriber.EstablishSubscription(ctx, cmd.queue, messageChannel, errorChannel) })
g.Go(func() error {
acknowledger := createAcknowledgeFunc(cmd.reject, cmd.requeue)
err := messageReceiveLoop(ctx, messageChannel, cmd.messageReceiveFunc, cmd.messageReceiveLoopPred, acknowledger)
err := messageReceiveLoop(ctx, messageChannel, errorChannel, cmd.messageReceiveFunc, cmd.messageReceiveLoopPred, acknowledger)
cancel()
return err
})

if err := g.Wait(); err != nil {
log.Errorf("subscribe failed with %v", err)
return err
return fmt.Errorf("subscribe failed: %w", err)
}
return nil
}
5 changes: 3 additions & 2 deletions cmd/rabtap/cmd_tap.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,17 @@ func cmdTap(ctx context.Context, tapConfig []rabtap.TapConfiguration, tlsConfig
g, ctx := errgroup.WithContext(ctx)

tapMessageChannel := make(rabtap.TapChannel)
errorChannel := make(rabtap.SubscribeErrorChannel)

for _, config := range tapConfig {
tap := rabtap.NewAmqpTap(config.AMQPURL, tlsConfig, log)
g.Go(func() error {
return tap.EstablishTap(ctx, config.Exchanges, tapMessageChannel)
return tap.EstablishTap(ctx, config.Exchanges, tapMessageChannel, errorChannel)
})
}
g.Go(func() error {
acknowledger := createAcknowledgeFunc(false, false) // ACK
err := messageReceiveLoop(ctx, tapMessageChannel, messageReceiveFunc, pred, acknowledger)
err := messageReceiveLoop(ctx, tapMessageChannel, errorChannel, messageReceiveFunc, pred, acknowledger)
cancel()
return err
})
Expand Down
Loading

0 comments on commit 140247d

Please sign in to comment.