diff --git a/CHANGELOG.md b/CHANGELOG.md index a5e682c..e094499 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,9 +1,17 @@ # Changelog for rabtap +## v1.34 (2021-11-24) + +* new: specify multiple `--args=KEY=VALUE` options to pass additional arguments + to the `sub` command. +* new: create lazy queue with `queue create ... --lazy` +* new: specify queue type to create with `queue create ... --queue-type=TYPE` +* new: specify offset with `--offset=OFFSET` when reading from streams + ## v1.33 (2021-11-14) -* new: specify multiple `--args=key=value` options to pass additional argzuments - to the queue and exchange create functions. +* new: specify multiple `--args=KEY=VALUE` options to pass additional arguments + to the `queue` and `exchange` commands. ## v1.32 (2021-11-13) diff --git a/README.md b/README.md index f00147a..3ce6ea1 100644 --- a/README.md +++ b/README.md @@ -36,7 +36,7 @@ and exchanges, inspect broker. * [Tap all messages published or delivered](#tap-all-messages-published-or-delivered) * [Connect to multiple brokers](#connect-to-multiple-brokers) * [Message recorder](#message-recorder) - * [Messages consumer (subscribe)](#messages-consumer-subscribe) + * [Consume Messages (subscribe)](#consume-messages-subscribe) * [Publish messages](#publish-messages) * [Poor mans shovel](#poor-mans-shovel) * [Close connection](#close-connection) @@ -69,7 +69,7 @@ and exchanges, inspect broker. [RabbitMQ REST management API](https://rawcdn.githack.com/rabbitmq/rabbitmq-management/rabbitmq_v3_6_14/priv/www/api/index.html) * save messages and meta data for later analysis and replay * publish messages to exchanges -* consume messages from a queue (subscribe) +* consume messages from queues and streams (subscribe) * supports TLS * no runtime dependencies (statically linked golang single file binary) * simple to use command line tool @@ -133,7 +133,6 @@ compile from source. ## Usage ``` - rabtap - RabbitMQ wire tap. github.com/jandelgado/rabtap Usage: @@ -146,17 +145,19 @@ Usage: rabtap (tap --uri=URI EXCHANGES)... [--saveto=DIR] [--format=FORMAT] [--limit=NUM] [-jknsv] [(--tls-cert-file=CERTFILE --tls-key-file=KEYFILE)] [--tls-ca-file=CAFILE] rabtap sub QUEUE [--uri URI] [--saveto=DIR] [--format=FORMAT] [--limit=NUM] - [(--reject [--requeue])] [-jksvn] + [--offset=OFFSET] [--args=KV]... [(--reject [--requeue])] [-jksvn] [(--tls-cert-file=CERTFILE --tls-key-file=KEYFILE)] [--tls-ca-file=CAFILE] rabtap pub [--uri=URI] [SOURCE] [--exchange=EXCHANGE] [--format=FORMAT] [--routingkey=KEY | (--header=KV)...] [--confirms] [--mandatory] [--delay=DELAY | --speed=FACTOR] [-jkv] [(--tls-cert-file=CERTFILE --tls-key-file=KEYFILE)] [--tls-ca-file=CAFILE] - rabtap exchange create EXCHANGE [--uri=URI] [--type=TYPE] [-adkv] + rabtap exchange create EXCHANGE [--uri=URI] [--type=TYPE] [--args=KV]... [-kv] + [--autodelete] [--durable] [(--tls-cert-file=CERTFILE --tls-key-file=KEYFILE)] [--tls-ca-file=CAFILE] rabtap exchange rm EXCHANGE [--uri=URI] [-kv] [(--tls-cert-file=CERTFILE --tls-key-file=KEYFILE)] [--tls-ca-file=CAFILE] - rabtap queue create QUEUE [--uri=URI] [-adkv] + rabtap queue create QUEUE [--uri=URI] [--queue-type=TYPE] [--args=KV]... [-kv] + [--autodelete] [--durable] [--lazy] [(--tls-cert-file=CERTFILE --tls-key-file=KEYFILE)] [--tls-ca-file=CAFILE] rabtap queue bind QUEUE to EXCHANGE [--uri=URI] [-kv] (--bindingkey=KEY | (--header=KV)... (--all|--any)) @@ -164,8 +165,10 @@ Usage: rabtap queue unbind QUEUE from EXCHANGE [--uri=URI] [-kv] (--bindingkey=KEY | (--header=KV)... (--all|--any)) [(--tls-cert-file=CERTFILE --tls-key-file=KEYFILE)] [--tls-ca-file=CAFILE] - rabtap queue rm QUEUE [--uri=URI] [-kv] [(--tls-cert-file=CERTFILE --tls-key-file=KEYFILE)] [--tls-ca-file=CAFILE] - rabtap queue purge QUEUE [--uri=URI] [-kv] [(--tls-cert-file=CERTFILE --tls-key-file=KEYFILE)] [--tls-ca-file=CAFILE] + rabtap queue rm QUEUE [--uri=URI] [-kv] + [(--tls-cert-file=CERTFILE --tls-key-file=KEYFILE)] [--tls-ca-file=CAFILE] + rabtap queue purge QUEUE [--uri=URI] [-kv] + [(--tls-cert-file=CERTFILE --tls-key-file=KEYFILE)] [--tls-ca-file=CAFILE] rabtap conn close CONNECTION [--api=APIURI] [--reason=REASON] [-kv] [(--tls-cert-file=CERTFILE --tls-key-file=KEYFILE)] [--tls-ca-file=CAFILE] rabtap --version @@ -183,6 +186,8 @@ Arguments and options: --any set x-match=any option in header based routing. --api=APIURI connect to given API server. If APIURL is omitted, the environment variable RABTAP_APIURI will be used. + --args=KV A key value pair in the form of "key=value" passed as + additional arguments. e.g. '--args=x-queue-type=quorum' -b, --bindingkey=KEY binding key to use in bind queue command. --by-connection output of info command starts with connections. --confirms enable publisher confirms and wait for confirmations. @@ -204,6 +209,7 @@ Arguments and options: routing- or binding-key. Can occur multiple times. -j, --json deprecated. Use "--format json" instead. -k, --insecure allow insecure TLS connections (no certificate check). + --lazy create a lazy queue. --limit=NUM Stop afer NUM messages were received. When set to 0, will run until terminated [default: 0]. --mandatory enable mandatory publishing (messages must be delivered to queue). @@ -211,6 +217,11 @@ Arguments and options: [default: byExchange]. -n, --no-color don't colorize output (see also environment variable NO_COLOR). --omit-empty don't show echanges without bindings in info command. + --offset=OFFSET Offset when reading from a stream. Can be 'first', 'last', + 'next', a duration like '10m', a RFC3339-Timestamp or + an integer index value. Basically it is an alias for + '--args=x-stream-offset=OFFSET'. + --queue-type=TYPE type of queue [default: classic]. --reason=REASON reason why the connection was closed [default: closed by rabtap]. --reject Reject messages. Default behaviour is to acknowledge messages. --requeue Instruct broker to requeue rejected message @@ -222,7 +233,7 @@ Arguments and options: -s, --silent suppress message output to stdout. --speed=FACTOR Speed factor to use during publish [default: 1.0]. --stats include statistics in output of info command. - -t, --type=TYPE exchange type [default: fanout]. + -t, --type=TYPE type of exchange [default: fanout]. --tls-cert-file=CERTFILE A Cert file to use for client authentication. --tls-key-file=KEYFILE A Key file to use for client authentication. --tls-ca-file=CAFILE A CA Cert file to use with TLS. @@ -473,34 +484,58 @@ message the body base64 encode. Examples: Files are created with file name `rabtap-`+``+ `.` + ``. -#### Messages consumer (subscribe) +#### Consume Messages (subscribe) -The `sub` command reads messages from a queue. Note that unlike `tap`, `sub` -will consume messages that are in effect removed from the specified queue. +The `sub` command reads messages from a queue or a stream. The general form +of the `sub` command is: + +``` +rabtap sub QUEUE [--uri URI] [--saveto=DIR] [--format=FORMAT] [--limit=NUM] + [--offset=OFFSET] [--args=KV]... [(--reject [--requeue])] [-jksvn] +``` Use the `--limit=NUM` option to limit the number of received messages. If specified, rabtap will terminate, after `NUM` messages were successfully -received. +read. Use the `--reject` option to 'nack' messages, which in turn will be discarded -by th broker or routed to a configured dead letter exchange (DLX). if +by the broker or routed to a configured dead letter exchange (DLX). if `--requeue` is also set, the message will be returned to the queue. -Example: +The `--offset=OFFSET` option is used when subscribing to streams. Streams are +append-only data structures with non-destructive semantics and were introduced +with RabbitMQ 3.9. The `OFFSET` parameter specifies where to start reading from the +stream and must be any of: `first`, `last`, `next`, a numerical offset, a +RFC3339-Timestamp or a duration specification like `10m`. Consult the RabbitMQ +documentation for more information on [streams](https://www.rabbitmq.com/streams.html). + +Examples: -* `$ rabtap sub somequeue --format json` - will consume messages from queue +* `$ rabtap sub somequeue --format=json` - will consume messages from queue `somequeue` and print out messages in JSON format. The Example assumes that - `RABTAP_AMQPURI` environment variable is set, as the `--uri AMQPURI` + `RABTAP_AMQPURI` environment variable is set, as the `--uri=AMQPURI` parameter is omitted. -* `rabtap sub somequeue --limit 1 --reject --requeue` - consume one message +* `rabtap sub somequeue --limit=1 --reject --requeue` - consume one message from the queue `somequeue` and let the broker requeue the message. +* `rabtap sub mystream --offset=first` - read all messages from stream + `mystream`. +* `rabtap sub mystream --offset=50` - read messages from stream `mystream` + starting with the 50th message. +* `rabtap sub mystream --offset=10m` - read messages from stream `mystream` + which are aged 10 minutes or less. #### Publish messages The `pub` command is used to publish messages to an exchange. The messages to be published are either read from a file, or from a directory which contains previously recorded messages (e.g. using the `--saveto` option of the `tap` -command). +command). The general form of the `pub` command is: + +``` +rabtap pub [--uri=URI] [SOURCE] [--exchange=EXCHANGE] [--format=FORMAT] + [--routingkey=KEY | (--header=HEADERKV)...] + [--confirms] [--mandatory] [--delay=DELAY | --speed=FACTOR] [-jkv] +``` Message routing is either specified with a routing key and the `--routingkey` option or, when header based routing should be used, by specifying the headers @@ -524,12 +559,7 @@ When the `--mandatory` option is set, rabtap publishes message in mandatory mode. If set and a message can not be delivered to a queue, the server returns the message and rabtap will log an error. -The general form of the `pub` command is -``` -rabtap pub [--uri=URI] [SOURCE] [--exchange=EXCHANGE] [--format=FORMAT] - [--routingkey=KEY | (--header=HEADERKV)...] - [--confirms] [--mandatory] [--delay=DELAY | --speed=FACTOR] [-jkv] -``` +Examples: * `$ echo hello | rabtap pub --exchange amq.fanout` - publish "hello" to exchange amqp.fanout @@ -646,9 +676,13 @@ RabbitMQ using the `--args=key=value` syntax. This allows for example to specify the queue type or mode: * `rabtap queue create quorum_queue --args=x-queue-type=quorum --durable` - - create a quorum queue named `quorum_queue` -* `rabtap queue create lazy_queue --args=x-queue-mode=lazy` - create a lazy - queue named `lazy_queue` + create a quorum queue named `quorum_queue`. The same can be achieved by using + the `--queue-type` option, which is an alias for setting the arg `x-queue-type`: + `rabtap queue create quorum --queue-type=quorum --durable` +* `rabtap queue create mystream --queue-type=stream --durable` - create a stream +* `rabtap queue create lazy_queue --lazy` - create a classic queue in lazy + mode that is named `lazy_queue`. `--lazy` is an alias for setting the arg + `x-queue-mode`. ## JSON message format diff --git a/cmd/rabtap/cmd_subscribe.go b/cmd/rabtap/cmd_subscribe.go index 3945f16..3ef5007 100644 --- a/cmd/rabtap/cmd_subscribe.go +++ b/cmd/rabtap/cmd_subscribe.go @@ -1,12 +1,12 @@ -// Copyright (C) 2017 Jan Delgado +// subscribe cli command handler +// Copyright (C) 2017-2021 Jan Delgado package main -// subscribe cli command handler - import ( "context" "crypto/tls" + "fmt" "net/url" rabtap "github.com/jandelgado/rabtap/pkg" @@ -22,6 +22,7 @@ type CmdSubscribeArg struct { messageReceiveLoopPred MessageReceiveLoopPred reject bool requeue bool + args rabtap.KeyValueMap } // cmdSub subscribes to messages from the given queue @@ -31,21 +32,23 @@ func cmdSubscribe(ctx context.Context, cmd CmdSubscribeArg) error { ctx, cancel := context.WithCancel(ctx) g, ctx := errgroup.WithContext(ctx) - messageChannel := make(rabtap.TapChannel) - config := rabtap.AmqpSubscriberConfig{Exclusive: false, AutoAck: false} + config := rabtap.AmqpSubscriberConfig{ + Exclusive: false, + Args: rabtap.ToAMQPTable(cmd.args)} subscriber := rabtap.NewAmqpSubscriber(config, cmd.amqpURL, cmd.tlsConfig, log) - g.Go(func() error { return subscriber.EstablishSubscription(ctx, cmd.queue, messageChannel) }) + messageChannel := make(rabtap.TapChannel) + errorChannel := make(rabtap.SubscribeErrorChannel) + g.Go(func() error { return subscriber.EstablishSubscription(ctx, cmd.queue, messageChannel, errorChannel) }) g.Go(func() error { acknowledger := createAcknowledgeFunc(cmd.reject, cmd.requeue) - err := messageReceiveLoop(ctx, messageChannel, cmd.messageReceiveFunc, cmd.messageReceiveLoopPred, acknowledger) + err := messageReceiveLoop(ctx, messageChannel, errorChannel, cmd.messageReceiveFunc, cmd.messageReceiveLoopPred, acknowledger) cancel() return err }) if err := g.Wait(); err != nil { - log.Errorf("subscribe failed with %v", err) - return err + return fmt.Errorf("subscribe failed: %w", err) } return nil } diff --git a/cmd/rabtap/cmd_tap.go b/cmd/rabtap/cmd_tap.go index 150ffc7..5f24fcd 100644 --- a/cmd/rabtap/cmd_tap.go +++ b/cmd/rabtap/cmd_tap.go @@ -22,16 +22,17 @@ func cmdTap(ctx context.Context, tapConfig []rabtap.TapConfiguration, tlsConfig g, ctx := errgroup.WithContext(ctx) tapMessageChannel := make(rabtap.TapChannel) + errorChannel := make(rabtap.SubscribeErrorChannel) for _, config := range tapConfig { tap := rabtap.NewAmqpTap(config.AMQPURL, tlsConfig, log) g.Go(func() error { - return tap.EstablishTap(ctx, config.Exchanges, tapMessageChannel) + return tap.EstablishTap(ctx, config.Exchanges, tapMessageChannel, errorChannel) }) } g.Go(func() error { acknowledger := createAcknowledgeFunc(false, false) // ACK - err := messageReceiveLoop(ctx, tapMessageChannel, messageReceiveFunc, pred, acknowledger) + err := messageReceiveLoop(ctx, tapMessageChannel, errorChannel, messageReceiveFunc, pred, acknowledger) cancel() return err }) diff --git a/cmd/rabtap/command_line.go b/cmd/rabtap/command_line.go index 5a83d98..9a49b8c 100644 --- a/cmd/rabtap/command_line.go +++ b/cmd/rabtap/command_line.go @@ -37,17 +37,19 @@ Usage: rabtap (tap --uri=URI EXCHANGES)... [--saveto=DIR] [--format=FORMAT] [--limit=NUM] [-jknsv] [(--tls-cert-file=CERTFILE --tls-key-file=KEYFILE)] [--tls-ca-file=CAFILE] rabtap sub QUEUE [--uri URI] [--saveto=DIR] [--format=FORMAT] [--limit=NUM] - [(--reject [--requeue])] [-jksvn] + [--offset=OFFSET] [--args=KV]... [(--reject [--requeue])] [-jksvn] [(--tls-cert-file=CERTFILE --tls-key-file=KEYFILE)] [--tls-ca-file=CAFILE] rabtap pub [--uri=URI] [SOURCE] [--exchange=EXCHANGE] [--format=FORMAT] [--routingkey=KEY | (--header=KV)...] [--confirms] [--mandatory] [--delay=DELAY | --speed=FACTOR] [-jkv] [(--tls-cert-file=CERTFILE --tls-key-file=KEYFILE)] [--tls-ca-file=CAFILE] - rabtap exchange create EXCHANGE [--uri=URI] [--type=TYPE] [--args=KV]... [-adkv] + rabtap exchange create EXCHANGE [--uri=URI] [--type=TYPE] [--args=KV]... [-kv] + [--autodelete] [--durable] [(--tls-cert-file=CERTFILE --tls-key-file=KEYFILE)] [--tls-ca-file=CAFILE] rabtap exchange rm EXCHANGE [--uri=URI] [-kv] [(--tls-cert-file=CERTFILE --tls-key-file=KEYFILE)] [--tls-ca-file=CAFILE] - rabtap queue create QUEUE [--uri=URI] [--args=KV]... [-adkv] + rabtap queue create QUEUE [--uri=URI] [--queue-type=TYPE] [--args=KV]... [-kv] + [--autodelete] [--durable] [--lazy] [(--tls-cert-file=CERTFILE --tls-key-file=KEYFILE)] [--tls-ca-file=CAFILE] rabtap queue bind QUEUE to EXCHANGE [--uri=URI] [-kv] (--bindingkey=KEY | (--header=KV)... (--all|--any)) @@ -55,8 +57,10 @@ Usage: rabtap queue unbind QUEUE from EXCHANGE [--uri=URI] [-kv] (--bindingkey=KEY | (--header=KV)... (--all|--any)) [(--tls-cert-file=CERTFILE --tls-key-file=KEYFILE)] [--tls-ca-file=CAFILE] - rabtap queue rm QUEUE [--uri=URI] [-kv] [(--tls-cert-file=CERTFILE --tls-key-file=KEYFILE)] [--tls-ca-file=CAFILE] - rabtap queue purge QUEUE [--uri=URI] [-kv] [(--tls-cert-file=CERTFILE --tls-key-file=KEYFILE)] [--tls-ca-file=CAFILE] + rabtap queue rm QUEUE [--uri=URI] [-kv] + [(--tls-cert-file=CERTFILE --tls-key-file=KEYFILE)] [--tls-ca-file=CAFILE] + rabtap queue purge QUEUE [--uri=URI] [-kv] + [(--tls-cert-file=CERTFILE --tls-key-file=KEYFILE)] [--tls-ca-file=CAFILE] rabtap conn close CONNECTION [--api=APIURI] [--reason=REASON] [-kv] [(--tls-cert-file=CERTFILE --tls-key-file=KEYFILE)] [--tls-ca-file=CAFILE] rabtap --version @@ -97,6 +101,7 @@ Arguments and options: routing- or binding-key. Can occur multiple times. -j, --json deprecated. Use "--format json" instead. -k, --insecure allow insecure TLS connections (no certificate check). + --lazy create a lazy queue. --limit=NUM Stop afer NUM messages were received. When set to 0, will run until terminated [default: 0]. --mandatory enable mandatory publishing (messages must be delivered to queue). @@ -104,6 +109,11 @@ Arguments and options: [default: byExchange]. -n, --no-color don't colorize output (see also environment variable NO_COLOR). --omit-empty don't show echanges without bindings in info command. + --offset=OFFSET Offset when reading from a stream. Can be 'first', 'last', + 'next', a duration like '10m', a RFC3339-Timestamp or + an integer index value. Basically it is an alias for + '--args=x-stream-offset=OFFSET'. + --queue-type=TYPE type of queue [default: classic]. --reason=REASON reason why the connection was closed [default: closed by rabtap]. --reject Reject messages. Default behaviour is to acknowledge messages. --requeue Instruct broker to requeue rejected message @@ -115,7 +125,7 @@ Arguments and options: -s, --silent suppress message output to stdout. --speed=FACTOR Speed factor to use during publish [default: 1.0]. --stats include statistics in output of info command. - -t, --type=TYPE exchange type [default: fanout]. + -t, --type=TYPE type of exchange [default: fanout]. --tls-cert-file=CERTFILE A Cert file to use for client authentication. --tls-key-file=KEYFILE A Key file to use for client authentication. --tls-ca-file=CAFILE A CA Cert file to use with TLS. @@ -202,7 +212,7 @@ func parseKeyValue(expr string) (string, string, error) { func parseKeyValueList(exprs []string) (map[string]string, error) { if exprs == nil { - return nil, nil + return map[string]string{}, nil } res := make(map[string]string, len(exprs)) for _, expr := range exprs { @@ -249,7 +259,7 @@ type CommandLineArgs struct { QueueName string // queue create, remove, bind, sub QueueBindingKey string // queue bind ExchangeName string // exchange name create, remove or queue bind - ExchangeType string // exchange type create, remove or queue bind + ExchangeType string // exchange type create ShowConsumers bool // info: also show consumer InfoMode string // info: byExchange, byConnection ShowStats bool // info: also show statistics @@ -419,6 +429,10 @@ func parseSubCmdArgs(args map[string]interface{}) (CommandLineArgs, error) { } result.Limit = limit } + result.Args, err = parseKVListOption("--args", args) + if err != nil { + return result, err + } if args["--saveto"] != nil { saveDir := args["--saveto"].(string) @@ -427,6 +441,9 @@ func parseSubCmdArgs(args map[string]interface{}) (CommandLineArgs, error) { if result.AMQPURL, err = parseAMQPURL(args); err != nil { return result, err } + if offset := args["--offset"]; offset != nil { + result.Args["x-stream-offset"] = offset.(string) + } return result, nil } @@ -444,6 +461,17 @@ func parseKVListOption(name string, args map[string]interface{}) (map[string]str return map[string]string{}, nil } +func parseHeaderMode(args map[string]interface{}) HeaderMode { + switch { + case args["--any"].(bool): + return HeaderMatchAny + case args["--all"].(bool): + return HeaderMatchAll + default: + return HeaderNone + } +} + func parseQueueCmdArgs(args map[string]interface{}) (CommandLineArgs, error) { result := CommandLineArgs{ commonArgs: parseCommonArgs(args), @@ -462,6 +490,10 @@ func parseQueueCmdArgs(args map[string]interface{}) (CommandLineArgs, error) { if err != nil { return result, nil } + result.Args["x-queue-type"] = args["--queue-type"].(string) + if args["--lazy"].(bool) { + result.Args["x-queue-mode"] = "lazy" + } case args["rm"].(bool): result.Cmd = QueueRemoveCmd case args["bind"].(bool): @@ -474,14 +506,7 @@ func parseQueueCmdArgs(args map[string]interface{}) (CommandLineArgs, error) { if err != nil { return result, err } - if args["--any"].(bool) { - result.HeaderMode = HeaderMatchAny - } else if args["--all"].(bool) { - result.HeaderMode = HeaderMatchAll - } else { - result.HeaderMode = HeaderNone - } - + result.HeaderMode = parseHeaderMode(args) result.ExchangeName = args["EXCHANGE"].(string) case args["unbind"].(bool): @@ -492,13 +517,7 @@ func parseQueueCmdArgs(args map[string]interface{}) (CommandLineArgs, error) { if err != nil { return result, err } - if args["--any"].(bool) { - result.HeaderMode = HeaderMatchAny - } else if args["--all"].(bool) { - result.HeaderMode = HeaderMatchAll - } else { - result.HeaderMode = HeaderNone - } + result.HeaderMode = parseHeaderMode(args) result.ExchangeName = args["EXCHANGE"].(string) case args["purge"].(bool): result.Cmd = QueuePurgeCmd @@ -509,8 +528,7 @@ func parseQueueCmdArgs(args map[string]interface{}) (CommandLineArgs, error) { func parseExchangeCmdArgs(args map[string]interface{}) (CommandLineArgs, error) { result := CommandLineArgs{ commonArgs: parseCommonArgs(args), - ExchangeName: args["EXCHANGE"].(string), - ExchangeType: args["--type"].(string)} + ExchangeName: args["EXCHANGE"].(string)} var err error if result.AMQPURL, err = parseAMQPURL(args); err != nil { @@ -518,6 +536,7 @@ func parseExchangeCmdArgs(args map[string]interface{}) (CommandLineArgs, error) } switch { case args["create"].(bool): + result.ExchangeType = args["--type"].(string) result.Cmd = ExchangeCreateCmd result.Durable = args["--durable"].(bool) result.Autodelete = args["--autodelete"].(bool) diff --git a/cmd/rabtap/command_line_test.go b/cmd/rabtap/command_line_test.go index 6cfce27..66c5325 100644 --- a/cmd/rabtap/command_line_test.go +++ b/cmd/rabtap/command_line_test.go @@ -457,6 +457,12 @@ func TestCliPubCmdFromStdinWithJsonFormatDeprecatedIsRecognized(t *testing.T) { assert.False(t, args.InsecureTLS) } +func TestCliSubCmdOffsetSetsStreamOffsetArg(t *testing.T) { + args, err := ParseCommandLineArgs([]string{"sub", "queue", "--uri=uri", "--offset=123"}) + assert.NoError(t, err) + assert.Equal(t, args.Args["x-stream-offset"], "123") +} + func TestCliSubCmdInvalidFormatReturnsError(t *testing.T) { _, err := ParseCommandLineArgs([]string{"sub", "queue", "--uri=uri", "--format=invalid"}) assert.Error(t, err) @@ -485,22 +491,25 @@ func TestCliCreateQueue(t *testing.T) { args, err := ParseCommandLineArgs( []string{"queue", "create", "name", "--uri=uri", "--args=x=y"}) - assert.Nil(t, err) + assert.NoError(t, err) assert.Equal(t, QueueCreateCmd, args.Cmd) assert.Equal(t, "name", args.QueueName) assertEqualURL(t, "uri", args.AMQPURL) - assert.Equal(t, map[string]string{"x": "y"}, args.Args) + assert.Equal(t, map[string]string{"x": "y", "x-queue-type": "classic"}, args.Args) assert.False(t, args.Durable) assert.False(t, args.Autodelete) } -func TestCliCreateDurableAutodeleteQueue(t *testing.T) { +func TestCliCreateQueueAllOptsSet(t *testing.T) { args, err := ParseCommandLineArgs( []string{"queue", "create", "name", "--uri=uri", - "--durable", "--autodelete"}) + "--durable", "--autodelete", "--lazy", "--queue-type=quorum"}) - assert.Nil(t, err) + assert.NoError(t, err) assert.Equal(t, QueueCreateCmd, args.Cmd) + assert.Equal(t, "name", args.QueueName) + assertEqualURL(t, "uri", args.AMQPURL) + assert.Equal(t, map[string]string{"x-queue-type": "quorum", "x-queue-mode": "lazy"}, args.Args) assert.True(t, args.Durable) assert.True(t, args.Autodelete) } @@ -509,7 +518,7 @@ func TestCliRemoveQueue(t *testing.T) { args, err := ParseCommandLineArgs( []string{"queue", "rm", "name", "--uri", "uri"}) - assert.Nil(t, err) + assert.NoError(t, err) assert.Equal(t, QueueRemoveCmd, args.Cmd) assert.Equal(t, "name", args.QueueName) assertEqualURL(t, "uri", args.AMQPURL) @@ -519,7 +528,7 @@ func TestCliPurgeQueue(t *testing.T) { args, err := ParseCommandLineArgs( []string{"queue", "purge", "name", "--uri", "uri"}) - assert.Nil(t, err) + assert.NoError(t, err) assert.Equal(t, QueuePurgeCmd, args.Cmd) assert.Equal(t, "name", args.QueueName) assertEqualURL(t, "uri", args.AMQPURL) @@ -530,7 +539,7 @@ func TestCliUnbindQueue(t *testing.T) { []string{"queue", "unbind", "queuename", "from", "exchangename", "--bindingkey", "key", "--uri", "uri"}) - assert.Nil(t, err) + assert.NoError(t, err) assert.Equal(t, QueueUnbindCmd, args.Cmd) assert.Equal(t, "queuename", args.QueueName) assert.Equal(t, "exchangename", args.ExchangeName) @@ -543,7 +552,7 @@ func TestCliBindQueueWithBindingKey(t *testing.T) { []string{"queue", "bind", "queuename", "to", "exchangename", "--bindingkey", "key", "--uri", "uri"}) - assert.Nil(t, err) + assert.NoError(t, err) assert.Equal(t, QueueBindCmd, args.Cmd) assert.Equal(t, "queuename", args.QueueName) assert.Equal(t, "exchangename", args.ExchangeName) diff --git a/cmd/rabtap/main.go b/cmd/rabtap/main.go index 5275faa..9115796 100644 --- a/cmd/rabtap/main.go +++ b/cmd/rabtap/main.go @@ -168,6 +168,7 @@ func startCmdSubscribe(ctx context.Context, args CommandLineArgs) { tlsConfig: getTLSConfig(args.InsecureTLS, args.TLSCertFile, args.TLSKeyFile, args.TLSCaFile), messageReceiveFunc: messageReceiveFunc, messageReceiveLoopPred: pred, + args: args.Args, }) failOnError(err, "error subscribing messages", os.Exit) } diff --git a/cmd/rabtap/subscribe.go b/cmd/rabtap/subscribe.go index ac7b530..a95086c 100644 --- a/cmd/rabtap/subscribe.go +++ b/cmd/rabtap/subscribe.go @@ -29,7 +29,7 @@ type MessageReceiveFuncOptions struct { // MessageReceiveFunc processes receiced messages from a tap. type MessageReceiveFunc func(rabtap.TapMessage) error -//var ErrMessageLoopEnded = errors.New("message loop ended") +// var ErrMessageLoopEnded = errors.New("message loop ended") // messageReceiveLoopPred is called once for each a message that was received. // If it returns true, the subscriber loop continues, otherwise the loop @@ -75,6 +75,7 @@ func createAcknowledgeFunc(reject, requeue bool) AcknowledgeFunc { func messageReceiveLoop(ctx context.Context, messageChan rabtap.TapChannel, + errorChan rabtap.SubscribeErrorChannel, messageReceiveFunc MessageReceiveFunc, pred MessageReceiveLoopPred, acknowledger AcknowledgeFunc) error { @@ -85,10 +86,15 @@ func messageReceiveLoop(ctx context.Context, log.Debugf("subscribe: cancel") return nil + case err, more := <-errorChan: + if more { + log.Errorf("subscribe: %v", err) + } + case message, more := <-messageChan: if !more { log.Debug("subscribe: messageReceiveLoop: channel closed.") - return nil //ErrMessageLoopEnded + return nil // ErrMessageLoopEnded } log.Debugf("subscribe: messageReceiveLoop: new message %+v", message) diff --git a/cmd/rabtap/subscribe_test.go b/cmd/rabtap/subscribe_test.go index 208954e..3beb538 100644 --- a/cmd/rabtap/subscribe_test.go +++ b/cmd/rabtap/subscribe_test.go @@ -9,6 +9,7 @@ import ( "context" "encoding/json" "errors" + "fmt" "io/ioutil" "os" "path" @@ -22,6 +23,68 @@ import ( "github.com/stretchr/testify/require" ) +// a mocked amqp.Acknowldger to test our AcknowledgeFunc +type MockAcknowledger struct { + // store values in a map so being able to manipulate in a value receiver + values map[string]bool +} + +func NewMockAcknowledger() MockAcknowledger { + return MockAcknowledger{values: map[string]bool{}} +} + +func (s MockAcknowledger) isAcked() bool { return s.values["acked"] } +func (s MockAcknowledger) isNacked() bool { return s.values["nacked"] } +func (s MockAcknowledger) isRequeued() bool { return s.values["requeued"] } + +func (s MockAcknowledger) Ack(tag uint64, multiple bool) error { + s.values["acked"] = true + return nil +} + +func (s MockAcknowledger) Nack(tag uint64, multiple, requeue bool) error { + s.values["nacked"] = true + s.values["requeued"] = requeue + return nil +} + +func (s MockAcknowledger) Reject(tag uint64, requeue bool) error { + s.values["nacked"] = true + s.values["requeued"] = requeue + return nil +} + +func TestCreateAcknowledgeFuncReturnedFuncCorreclyAcknowledgesTheMessage(t *testing.T) { + + testcases := []struct { + reject, requeue bool // given + isacked, isnacked, isrequeued bool // expected + }{ + {false, false, true, false, false}, + {false, true, true, false, false}, + {true, false, false, true, false}, + {true, true, false, true, true}, + } + + for i, tc := range testcases { + + // given + info := fmt.Sprintf("testcase %d, %+v", i, tc) + mock := NewMockAcknowledger() + ackFunc := createAcknowledgeFunc(tc.reject, tc.requeue) + msg := rabtap.TapMessage{AmqpMessage: &amqp.Delivery{Acknowledger: mock}} + + // when + err := ackFunc(msg) + + // then + assert.NoError(t, err) + assert.Equal(t, tc.isacked, mock.isAcked(), info) + assert.Equal(t, tc.isnacked, mock.isNacked(), info) + assert.Equal(t, tc.isrequeued, mock.isRequeued(), info) + } +} + func TestCreateCountingMessageReceivePredReturnsTrueIfNumIsZero(t *testing.T) { pred := createCountingMessageReceivePred(0) @@ -182,6 +245,7 @@ func TestCreateMessageReceiveFuncJSONNoPPToFile(t *testing.T) { func TestMessageReceiveLoopForwardsMessagesOnChannel(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) messageChan := make(rabtap.TapChannel) + errorChan := make(rabtap.SubscribeErrorChannel) done := make(chan bool) received := 0 @@ -192,7 +256,7 @@ func TestMessageReceiveLoopForwardsMessagesOnChannel(t *testing.T) { } continuePred := func(rabtap.TapMessage) bool { return true } acknowledger := func(rabtap.TapMessage) error { return nil } - go func() { _ = messageReceiveLoop(ctx, messageChan, receiveFunc, continuePred, acknowledger) }() + go func() { _ = messageReceiveLoop(ctx, messageChan, errorChan, receiveFunc, continuePred, acknowledger) }() messageChan <- rabtap.TapMessage{} <-done // TODO add timeout @@ -203,11 +267,12 @@ func TestMessageReceiveLoopForwardsMessagesOnChannel(t *testing.T) { func TestMessageReceiveLoopExitsOnChannelClose(t *testing.T) { ctx := context.Background() messageChan := make(rabtap.TapChannel) + errorChan := make(rabtap.SubscribeErrorChannel) continuePred := func(rabtap.TapMessage) bool { return true } close(messageChan) acknowledger := func(rabtap.TapMessage) error { return nil } - err := messageReceiveLoop(ctx, messageChan, NullMessageReceiveFunc, continuePred, acknowledger) + err := messageReceiveLoop(ctx, messageChan, errorChan, NullMessageReceiveFunc, continuePred, acknowledger) assert.Nil(t, err) } @@ -215,11 +280,12 @@ func TestMessageReceiveLoopExitsOnChannelClose(t *testing.T) { func TestMessageReceiveLoopExitsWhenLoopPredReturnsFalse(t *testing.T) { ctx := context.Background() messageChan := make(rabtap.TapChannel, 1) + errorChan := make(rabtap.SubscribeErrorChannel) stopPred := func(rabtap.TapMessage) bool { return false } messageChan <- rabtap.TapMessage{} acknowledger := func(rabtap.TapMessage) error { return nil } - err := messageReceiveLoop(ctx, messageChan, NullMessageReceiveFunc, stopPred, acknowledger) + err := messageReceiveLoop(ctx, messageChan, errorChan, NullMessageReceiveFunc, stopPred, acknowledger) assert.Nil(t, err) } diff --git a/inttest/rabbitmq/docker-compose.yml b/inttest/rabbitmq/docker-compose.yml index f852e13..cd1bbec 100644 --- a/inttest/rabbitmq/docker-compose.yml +++ b/inttest/rabbitmq/docker-compose.yml @@ -1,7 +1,7 @@ version: '3' services: rabbitmq: - image: rabbitmq:3.8-management-alpine + image: rabbitmq:3.9-management-alpine volumes: - ./definitions.json:/etc/rabbitmq/definitions.json:z - ./rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf:z diff --git a/pkg/amqp_connector.go b/pkg/amqp_connector.go index 3ffb502..d2309f6 100644 --- a/pkg/amqp_connector.go +++ b/pkg/amqp_connector.go @@ -55,7 +55,7 @@ func (s *AmqpConnector) Connect(ctx context.Context, worker AmqpWorkerFunc) erro sub, more := <-session if !more { // closed. TODO propagate errors from redial() - return errors.New("initial connection failed") + return errors.New("session factory closed") } s.logger.Debugf("got new amqp session ...") action, err := worker(ctx, sub) @@ -63,9 +63,9 @@ func (s *AmqpConnector) Connect(ctx context.Context, worker AmqpWorkerFunc) erro s.logger.Errorf("worker failed with: %v", err) } if !action.shouldReconnect() { - if errClose := sub.Connection.Close(); errClose != nil { - s.logger.Errorf("connection close failed: %v", errClose) - } + // if errClose := sub.Connection.Close(); errClose != nil { + // s.logger.Errorf("connection close failed: %v", errClose) + // } return err } } diff --git a/pkg/amqp_message_loop.go b/pkg/amqp_message_loop.go index ec88904..c0c122b 100644 --- a/pkg/amqp_message_loop.go +++ b/pkg/amqp_message_loop.go @@ -1,9 +1,10 @@ -// Copyright (C) 2017 Jan Delgado +// Copyright (C) 2017-2021 Jan Delgado package rabtap import ( "context" + "fmt" "time" "github.com/streadway/amqp" @@ -12,33 +13,42 @@ import ( // amqpMessageLoop forwards incoming amqp messages from an "in" chan to an "out" // chan, transforming them into TapMessage objects. Can be terminated // using provided ctx or by closing the in chan. -func amqpMessageLoop(ctx context.Context, - out TapChannel, in <-chan interface{}) ReconnectAction { +func amqpMessageLoop( + ctx context.Context, + outCh TapChannel, + errOutCh SubscribeErrorChannel, + inCh <-chan interface{}) (ReconnectAction, error) { for { select { - case message, more := <-in: - if !more { - return doReconnect - } - // in is chan interface{} because we use FanIn.Ch - amqpMessage, ok := message.(amqp.Delivery) - if !ok { - panic("amqp.Delivery expected") + case message, more := <-inCh: + if !more { + return doReconnect, fmt.Errorf("no more messages") } - received := time.Now() - // Avoid blocking write to out when e.g. on the other end of the - // channel the user pressed Ctrl+S to stop console output - select { - case <-ctx.Done(): - return doNotReconnect - case out <- NewTapMessage(&amqpMessage, received): + switch msg := message.(type) { + + case *amqp.Error: + // TODO ctx? + errOutCh <- &SubscribeError{Reason: SubscribeErrorChannelError, Cause: msg} + + case amqp.Delivery: + received := time.Now() + // Avoid blocking write to out when e.g. on the other end of the + // channel the user pressed Ctrl+S to stop console output + // TODO ctx.Done really needed? + select { + case <-ctx.Done(): + return doNotReconnect, nil + case outCh <- NewTapMessage(&msg, received): + } + default: + panic("unknown message type") } case <-ctx.Done(): - return doNotReconnect + return doNotReconnect, nil } } } diff --git a/pkg/amqp_message_loop_test.go b/pkg/amqp_message_loop_test.go index 5abbe57..fce8569 100644 --- a/pkg/amqp_message_loop_test.go +++ b/pkg/amqp_message_loop_test.go @@ -1,3 +1,4 @@ +// (c) copyright jan delgado 2017-2021 package rabtap import ( @@ -14,9 +15,10 @@ func TestAmqpMessageLoopPanicsWithInvalidMessage(t *testing.T) { out := make(TapChannel) in := make(chan interface{}) done := make(chan bool) + errOut := make(SubscribeErrorChannel) go func() { - assert.Panics(t, func() { amqpMessageLoop(ctx, out, in) }, "did not panic") + assert.Panics(t, func() { _, _ = amqpMessageLoop(ctx, out, errOut, in) }, "did not panic") done <- true }() @@ -38,9 +40,11 @@ func TestAmqpMessageLoopTerminatesWhenInputChannelIsClosed(t *testing.T) { out := make(TapChannel) in := make(chan interface{}) done := make(chan ReconnectAction) + errOut := make(SubscribeErrorChannel) go func() { - done <- amqpMessageLoop(ctx, out, in) + result, _ := amqpMessageLoop(ctx, out, errOut, in) + done <- result }() close(in) @@ -58,9 +62,11 @@ func TestAmqpMessageLoopCancelBlockingWrite(t *testing.T) { out := make(TapChannel) in := make(chan interface{}, 5) done := make(chan ReconnectAction) + errOut := make(SubscribeErrorChannel) go func() { - done <- amqpMessageLoop(ctx, out, in) + result, _ := amqpMessageLoop(ctx, out, errOut, in) + done <- result }() in <- amqp.Delivery{} @@ -79,14 +85,64 @@ func TestAmqpMessageLoopCancelBlockingWrite(t *testing.T) { } +func TestAmqpMessageLoopForwardsAMessage(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + out := make(TapChannel) + in := make(chan interface{}) + done := make(chan ReconnectAction) + errOut := make(SubscribeErrorChannel) + + go func() { + result, _ := amqpMessageLoop(ctx, out, errOut, in) + done <- result + }() + + expected := amqp.Delivery{} + in <- expected + + select { + case msg := <-out: + assert.Equal(t, expected, *msg.AmqpMessage) + case <-time.After(2 * time.Second): + assert.Fail(t, "amqpMessageLoop() did not terminate") + } + cancel() +} + +func TestAmqpMessageLoopForwardsAnError(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + out := make(TapChannel) + in := make(chan interface{}) + done := make(chan ReconnectAction) + errOut := make(SubscribeErrorChannel) + + go func() { + result, _ := amqpMessageLoop(ctx, out, errOut, in) + done <- result + }() + + expected := &amqp.Error{} + in <- expected + + select { + case err := <-errOut: + assert.Equal(t, SubscribeError{Reason: SubscribeErrorChannelError, Cause: expected}, *err) + case <-time.After(2 * time.Second): + assert.Fail(t, "amqpMessageLoop() did not terminate") + } + cancel() +} + func TestAmqpMessageLoopCancelBlockingRead(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) out := make(TapChannel) in := make(chan interface{}) done := make(chan ReconnectAction) + errOut := make(SubscribeErrorChannel) go func() { - done <- amqpMessageLoop(ctx, out, in) + result, _ := amqpMessageLoop(ctx, out, errOut, in) + done <- result }() cancel() @@ -97,5 +153,4 @@ func TestAmqpMessageLoopCancelBlockingRead(t *testing.T) { case <-time.After(2 * time.Second): assert.Fail(t, "amqpMessageLoop() did not terminate") } - } diff --git a/pkg/key_value.go b/pkg/key_value.go index 2cf4bf1..31ba02b 100644 --- a/pkg/key_value.go +++ b/pkg/key_value.go @@ -1,13 +1,34 @@ package rabtap -import "github.com/streadway/amqp" +import ( + "strconv" + "time" + "github.com/streadway/amqp" +) + +// KeyValueMap is a string -> string map used to store key value +// pairs defined on the command line. type KeyValueMap map[string]string +// ToAMQPTable converts a KeyValueMap to an amqp.Table, trying to +// infer data types: +// - integers +// - timestamps in RFC3339 format +// - strings (default) func ToAMQPTable(headers KeyValueMap) amqp.Table { table := amqp.Table{} + for k, v := range headers { - table[k] = v + var val interface{} + if i, err := strconv.Atoi(v); err == nil { + val = i + } else if d, err := time.Parse(time.RFC3339, v); err == nil { + val = d + } else { + val = v + } + table[k] = val } return table } diff --git a/pkg/key_value_test.go b/pkg/key_value_test.go new file mode 100644 index 0000000..7e0ff4e --- /dev/null +++ b/pkg/key_value_test.go @@ -0,0 +1,27 @@ +package rabtap + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestToAmqpTableInfersInteger(t *testing.T) { + m := KeyValueMap{"x": "123"} + + assert.Equal(t, int(123), ToAMQPTable(m)["x"]) +} + +func TestToAmqpTableInfersRFC3339Timestamp(t *testing.T) { + m := KeyValueMap{"x": "2021-11-18T23:05:02-02:00"} + ts, _ := time.Parse(time.RFC3339, "2021-11-18T23:05:02-02:00") + + assert.Equal(t, ts, ToAMQPTable(m)["x"]) +} + +func TestToAmqpTableFallsBackToString(t *testing.T) { + m := KeyValueMap{"x": "hello"} + + assert.Equal(t, "hello", ToAMQPTable(m)["x"]) +} diff --git a/pkg/subscribe.go b/pkg/subscribe.go index c2a6c33..013e9eb 100644 --- a/pkg/subscribe.go +++ b/pkg/subscribe.go @@ -1,10 +1,12 @@ -// Copyright (C) 2017 Jan Delgado +// subscribe to message queues +// Copyright (C) 2017-2021 Jan Delgado package rabtap import ( "context" "crypto/tls" + "fmt" "net/url" "time" @@ -18,7 +20,7 @@ const PrefetchSize = 0 // AmqpSubscriberConfig stores configuration of the subscriber type AmqpSubscriberConfig struct { Exclusive bool - AutoAck bool + Args amqp.Table } // AmqpSubscriber allows to tap to subscribe to queues @@ -28,6 +30,31 @@ type AmqpSubscriber struct { logger Logger } +type SubscribeErrorReason int + +const ( + SubscribeErrorChannelError SubscribeErrorReason = iota +) + +// SubscribeError is sent back trough the error channel when there are problems +// during the subsription of messages +type SubscribeError struct { + Reason SubscribeErrorReason + // Cause holds the error when a ChannelError happened + Cause error +} + +type SubscribeErrorChannel chan *SubscribeError + +func (s *SubscribeError) Error() string { + switch s.Reason { + case SubscribeErrorChannelError: + return fmt.Sprintf("channel error: %s", s.Cause) + default: + return "unexpected error" + } +} + // NewAmqpSubscriber returns a new AmqpSubscriber object associated with the // RabbitMQ broker denoted by the uri parameter. func NewAmqpSubscriber(config AmqpSubscriberConfig, url *url.URL, tlsConfig *tls.Config, logger Logger) *AmqpSubscriber { @@ -54,22 +81,34 @@ type TapChannel chan TapMessage // EstablishSubscription sets up the connection to the broker and sets up // the tap, which is bound to the provided consumer function. Typically // this function is run as a go-routine. -func (s *AmqpSubscriber) EstablishSubscription(ctx context.Context, queueName string, tapCh TapChannel) error { - return s.connection.Connect(ctx, s.createWorkerFunc(queueName, tapCh)) +// +// queueName is the queue to subscribe to. tapCh is where the consumed messages +// are sent to. errCh is the channel where errors are sent to. +// +func (s *AmqpSubscriber) EstablishSubscription( + ctx context.Context, + queueName string, + tapCh TapChannel, + errCh SubscribeErrorChannel) error { + return s.connection.Connect(ctx, s.createWorkerFunc(queueName, tapCh, errCh)) } func (s *AmqpSubscriber) createWorkerFunc( - queueName string, tapCh TapChannel) AmqpWorkerFunc { + queueName string, + outCh TapChannel, + errOutCh SubscribeErrorChannel) AmqpWorkerFunc { return func(ctx context.Context, session Session) (ReconnectAction, error) { ch, err := s.consumeMessages(session, queueName) if err != nil { return doNotReconnect, err } - // messageLoop expects Fanin object, which expects array of channels. - var channels []interface{} - fanin := NewFanin(append(channels, ch)) - return amqpMessageLoop(ctx, tapCh, fanin.Ch), nil + + // also subscribe to channel close notifications + amqpErrorCh := session.Channel.NotifyClose(make(chan *amqp.Error, 1)) + fanin := NewFanin([]interface{}{ch, amqpErrorCh}) + + return amqpMessageLoop(ctx, outCh, errOutCh, fanin.Ch) } } @@ -81,17 +120,13 @@ func (s *AmqpSubscriber) consumeMessages(session Session, return nil, err } - msgs, err := session.Consume( + return session.Consume( queueName, "__rabtap-consumer-"+uuid.Must(uuid.NewRandom()).String()[:8], // TODO param - s.config.AutoAck, + false, // no auto-ack s.config.Exclusive, false, // no-local - unsupported false, // wait - nil, // args + s.config.Args, ) - if err != nil { - return nil, err - } - return msgs, nil } diff --git a/pkg/subscribe_test.go b/pkg/subscribe_test.go index 0b4a3b0..0ec3b72 100644 --- a/pkg/subscribe_test.go +++ b/pkg/subscribe_test.go @@ -1,4 +1,4 @@ -// Copyright (C) 2017 Jan Delgado +// Copyright (C) 2017-2021 Jan Delgado // +build integration package rabtap @@ -13,9 +13,12 @@ import ( "github.com/streadway/amqp" ) -func TestSubscribe(t *testing.T) { +func TestSubscribeReceivesMessages(t *testing.T) { + + // given // establish sending exchange. + messagesPerTest := 5 conn, ch := testcommon.IntegrationTestConnection(t, "subtest-direct-exchange", "direct", 0, false) session := Session{conn, ch} defer conn.Close() @@ -31,13 +34,14 @@ func TestSubscribe(t *testing.T) { finishChan := make(chan int) - config := AmqpSubscriberConfig{Exclusive: false, AutoAck: true} + config := AmqpSubscriberConfig{Exclusive: false} log := testcommon.NewTestLogger() subscriber := NewAmqpSubscriber(config, testcommon.IntegrationURIFromEnv(), &tls.Config{}, log) resultChannel := make(TapChannel) + resultErrChannel := make(SubscribeErrorChannel) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - go subscriber.EstablishSubscription(ctx, queueName, resultChannel) + go subscriber.EstablishSubscription(ctx, queueName, resultChannel, resultErrChannel) go func() { numReceived := 0 @@ -50,6 +54,7 @@ func TestSubscribe(t *testing.T) { finishChan <- numReceived return case message := <-resultChannel: + message.AmqpMessage.Ack(false) if message.AmqpMessage != nil { if string(message.AmqpMessage.Body) == "Hello" { numReceived++ @@ -61,7 +66,9 @@ func TestSubscribe(t *testing.T) { time.Sleep(TapReadyDelay) - // inject messages into exchange. - testcommon.PublishTestMessages(t, ch, MessagesPerTest, "subtest-direct-exchange", queueName, nil) - requireIntFromChan(t, finishChan, MessagesPerTest) + // when: inject messages into exchange. + testcommon.PublishTestMessages(t, ch, messagesPerTest, "subtest-direct-exchange", queueName, nil) + + // then + requireIntFromChan(t, finishChan, messagesPerTest) } diff --git a/pkg/tap.go b/pkg/tap.go index 5be7503..69b240c 100644 --- a/pkg/tap.go +++ b/pkg/tap.go @@ -1,4 +1,4 @@ -// Copyright (C) 2017-2019 Jan Delgado +// Copyright (C) 2017-2021 Jan Delgado // RabbitMQ wire-tap. Functions to hook to exchanges and to keep the // connection to the broker alive in case of connection errors. @@ -23,7 +23,7 @@ type AmqpTap struct { // NewAmqpTap returns a new AmqpTap object associated with the RabbitMQ // broker denoted by the uri parameter. func NewAmqpTap(url *url.URL, tlsConfig *tls.Config, logger Logger) *AmqpTap { - config := AmqpSubscriberConfig{Exclusive: true, AutoAck: true} + config := AmqpSubscriberConfig{Exclusive: true} return &AmqpTap{ AmqpSubscriber: NewAmqpSubscriber(config, url, tlsConfig, logger)} } @@ -39,32 +39,38 @@ func getTapQueueNameForExchange(exchange, postfix string) string { // EstablishTap sets up the connection to the broker and sets up // the tap, which is bound to the provided consumer function. Typically // this function is run as a go-routine. -func (s *AmqpTap) EstablishTap(ctx context.Context, exchangeConfigList []ExchangeConfiguration, - tapCh TapChannel) error { - return s.connection.Connect(ctx, s.createWorkerFunc(exchangeConfigList, tapCh)) +func (s *AmqpTap) EstablishTap( + ctx context.Context, + exchangeConfigList []ExchangeConfiguration, + tapCh TapChannel, + errorCh SubscribeErrorChannel) error { + return s.connection.Connect(ctx, s.createWorkerFunc(exchangeConfigList, tapCh, errorCh)) } func (s *AmqpTap) createWorkerFunc( exchangeConfigList []ExchangeConfiguration, - tapCh TapChannel) AmqpWorkerFunc { + outCh TapChannel, + errOutCh SubscribeErrorChannel) AmqpWorkerFunc { return func(ctx context.Context, session Session) (ReconnectAction, error) { - amqpChs, err := s.setupTapsForExchanges(session, exchangeConfigList, tapCh) + tappedChs, err := s.setupTapsForExchanges(session, exchangeConfigList) if err != nil { return doNotReconnect, err } - fanin := NewFanin(amqpChs) + + // also subscribe to channel close notifications + amqpErrorCh := session.Channel.NotifyClose(make(chan *amqp.Error, 1)) + + fanin := NewFanin(append(tappedChs, amqpErrorCh)) defer func() { _ = fanin.Stop() }() - action := amqpMessageLoop(ctx, tapCh, fanin.Ch) - return action, nil + return amqpMessageLoop(ctx, outCh, errOutCh, fanin.Ch) } } func (s *AmqpTap) setupTapsForExchanges( session Session, - exchangeConfigList []ExchangeConfiguration, - tapCh TapChannel) ([]interface{}, error) { + exchangeConfigList []ExchangeConfiguration) ([]interface{}, error) { var channels []interface{} diff --git a/pkg/tap_test.go b/pkg/tap_test.go index 26dca98..fdff120 100644 --- a/pkg/tap_test.go +++ b/pkg/tap_test.go @@ -1,5 +1,6 @@ -// Copyright (C) 2017 Jan Delgado +// Copyright (C) 2017-2021 Jan Delgado // +build integration +// TODO rewrite package rabtap @@ -20,9 +21,8 @@ import ( ) const ( - MessagesPerTest = 5 - ResultTimeout = time.Second * 5 - TapReadyDelay = time.Millisecond * 500 + ResultTimeout = time.Second * 5 + TapReadyDelay = time.Millisecond * 500 ) func TestGetTapQueueNameForExchange(t *testing.T) { @@ -36,6 +36,7 @@ func TestGetTapEchangeNameForExchange(t *testing.T) { assert.Equal(t, "__tap-exchange-for-exchange-1234", getTapExchangeNameForExchange("exchange", "1234")) } + func verifyMessagesOnTap(t *testing.T, consumer string, numExpected int, tapExchangeName, tapQueueName string, success chan<- int) *AmqpTap { @@ -43,13 +44,16 @@ func verifyMessagesOnTap(t *testing.T, consumer string, numExpected int, log := testcommon.NewTestLogger() tap := NewAmqpTap(testcommon.IntegrationURIFromEnv(), &tls.Config{}, log) resultChannel := make(TapChannel) + resultErrChannel := make(SubscribeErrorChannel) + // TODO cancel and return cancel func ctx, cancel := context.WithCancel(context.Background()) go tap.EstablishTap( ctx, []ExchangeConfiguration{ {tapExchangeName, tapQueueName}}, - resultChannel) + resultChannel, + resultErrChannel) func() { numReceived := 0 @@ -62,6 +66,7 @@ func verifyMessagesOnTap(t *testing.T, consumer string, numExpected int, success <- numReceived return case message := <-resultChannel: + message.AmqpMessage.Ack(false) if message.AmqpMessage != nil { if string(message.AmqpMessage.Body) == "Hello" { numReceived++ @@ -86,6 +91,8 @@ func requireIntFromChan(t *testing.T, c <-chan int, expected int) { func TestIntegrationHeadersExchange(t *testing.T) { + messagesPerTest := 5 + // establish sending exchange conn, ch := testcommon.IntegrationTestConnection(t, "headers-exchange", "headers", 2, true) defer conn.Close() @@ -93,7 +100,7 @@ func TestIntegrationHeadersExchange(t *testing.T) { finishChan := make(chan int) // no binding key is needed for the headers exchange - go verifyMessagesOnTap(t, "tap-consumer1", MessagesPerTest, "headers-exchange", "", finishChan) + go verifyMessagesOnTap(t, "tap-consumer1", messagesPerTest, "headers-exchange", "", finishChan) time.Sleep(TapReadyDelay) // inject messages into exchange. Each message should become visible @@ -101,13 +108,13 @@ func TestIntegrationHeadersExchange(t *testing.T) { // must provide a amqp.Table struct with the messages headers, on which // routing is based. See integrationTestConnection() on how the routing // header is constructed. - testcommon.PublishTestMessages(t, ch, MessagesPerTest, "headers-exchange", "", amqp.Table{"header1": "test0"}) + testcommon.PublishTestMessages(t, ch, messagesPerTest, "headers-exchange", "", amqp.Table{"header1": "test0"}) - requireIntFromChan(t, finishChan, MessagesPerTest) + requireIntFromChan(t, finishChan, messagesPerTest) // the original messages should also be delivered. - testcommon.VerifyTestMessageOnQueue(t, ch, "consumer2", MessagesPerTest, "queue-0", finishChan) - requireIntFromChan(t, finishChan, MessagesPerTest) + testcommon.VerifyTestMessageOnQueue(t, ch, "consumer2", messagesPerTest, "queue-0", finishChan) + requireIntFromChan(t, finishChan, messagesPerTest) } func TestIntegrationDirectExchange(t *testing.T) { @@ -119,21 +126,21 @@ func TestIntegrationDirectExchange(t *testing.T) { finishChan := make(chan int) // connect a test-tap and check if we received the test message - MessagesPerTest := MessagesPerTest + messagesPerTest := 5 - go verifyMessagesOnTap(t, "tap-consumer1", MessagesPerTest, "direct-exchange", "queue-0", finishChan) + go verifyMessagesOnTap(t, "tap-consumer1", messagesPerTest, "direct-exchange", "queue-0", finishChan) time.Sleep(TapReadyDelay) // inject messages into exchange. Each message should become visible // in the tap-exchange defined above. - testcommon.PublishTestMessages(t, ch, MessagesPerTest, "direct-exchange", "queue-0", nil) + testcommon.PublishTestMessages(t, ch, messagesPerTest, "direct-exchange", "queue-0", nil) - requireIntFromChan(t, finishChan, MessagesPerTest) + requireIntFromChan(t, finishChan, messagesPerTest) // the original messages should also be delivered. - testcommon.VerifyTestMessageOnQueue(t, ch, "consumer2", MessagesPerTest, "queue-0", finishChan) - requireIntFromChan(t, finishChan, MessagesPerTest) + testcommon.VerifyTestMessageOnQueue(t, ch, "consumer2", messagesPerTest, "queue-0", finishChan) + requireIntFromChan(t, finishChan, messagesPerTest) } // TestIntegrationTopicExchangeTapSingleQueue tests tapping to a topic @@ -148,26 +155,26 @@ func TestIntegrationTopicExchangeTapSingleQueue(t *testing.T) { finishChan := make(chan int) // connect a test-tap and check if we received the test message - MessagesPerTest := MessagesPerTest + messagesPerTest := 5 // tap only messages routed to queue-0 - go verifyMessagesOnTap(t, "tap-consumer1", MessagesPerTest, "topic-exchange", "queue-0", finishChan) + go verifyMessagesOnTap(t, "tap-consumer1", messagesPerTest, "topic-exchange", "queue-0", finishChan) time.Sleep(TapReadyDelay) // inject messages into exchange. Each message should become visible // in the tap-exchange defined above. - testcommon.PublishTestMessages(t, ch, MessagesPerTest, "topic-exchange", "queue-0", nil) - testcommon.PublishTestMessages(t, ch, MessagesPerTest, "topic-exchange", "queue-1", nil) + testcommon.PublishTestMessages(t, ch, messagesPerTest, "topic-exchange", "queue-0", nil) + testcommon.PublishTestMessages(t, ch, messagesPerTest, "topic-exchange", "queue-1", nil) - requireIntFromChan(t, finishChan, MessagesPerTest) + requireIntFromChan(t, finishChan, messagesPerTest) // the original messages should also be delivered. - testcommon.VerifyTestMessageOnQueue(t, ch, "consumer2", MessagesPerTest, "queue-0", finishChan) - requireIntFromChan(t, finishChan, MessagesPerTest) + testcommon.VerifyTestMessageOnQueue(t, ch, "consumer2", messagesPerTest, "queue-0", finishChan) + requireIntFromChan(t, finishChan, messagesPerTest) - testcommon.VerifyTestMessageOnQueue(t, ch, "consumer3", MessagesPerTest, "queue-1", finishChan) - requireIntFromChan(t, finishChan, MessagesPerTest) + testcommon.VerifyTestMessageOnQueue(t, ch, "consumer3", messagesPerTest, "queue-1", finishChan) + requireIntFromChan(t, finishChan, messagesPerTest) } // TestIntegrationTopicExchangeTapWildcard tests tapping to an exechange @@ -181,26 +188,26 @@ func TestIntegrationTopicExchangeTapWildcard(t *testing.T) { finishChan := make(chan int) // connect a test-tap and check if we received the test message - MessagesPerTest := MessagesPerTest + messagesPerTest := 5 // tap all messages on the exchange - go verifyMessagesOnTap(t, "tap-consumer1", MessagesPerTest*2, "topic-exchange", "#", finishChan) + go verifyMessagesOnTap(t, "tap-consumer1", messagesPerTest*2, "topic-exchange", "#", finishChan) time.Sleep(TapReadyDelay) // inject messages into exchange. Each message should become visible // in the tap-exchange defined above. - testcommon.PublishTestMessages(t, ch, MessagesPerTest, "topic-exchange", "queue-0", nil) - testcommon.PublishTestMessages(t, ch, MessagesPerTest, "topic-exchange", "queue-1", nil) + testcommon.PublishTestMessages(t, ch, messagesPerTest, "topic-exchange", "queue-0", nil) + testcommon.PublishTestMessages(t, ch, messagesPerTest, "topic-exchange", "queue-1", nil) - requireIntFromChan(t, finishChan, MessagesPerTest*2) + requireIntFromChan(t, finishChan, messagesPerTest*2) // the original messages should also be delivered. - testcommon.VerifyTestMessageOnQueue(t, ch, "consumer2", MessagesPerTest, "queue-0", finishChan) - requireIntFromChan(t, finishChan, MessagesPerTest) + testcommon.VerifyTestMessageOnQueue(t, ch, "consumer2", messagesPerTest, "queue-0", finishChan) + requireIntFromChan(t, finishChan, messagesPerTest) - testcommon.VerifyTestMessageOnQueue(t, ch, "consumer3", MessagesPerTest, "queue-1", finishChan) - requireIntFromChan(t, finishChan, MessagesPerTest) + testcommon.VerifyTestMessageOnQueue(t, ch, "consumer3", messagesPerTest, "queue-1", finishChan) + requireIntFromChan(t, finishChan, messagesPerTest) } // TestIntegrationInvalidExchange tries to tap to a non existing exhange, we @@ -208,6 +215,7 @@ func TestIntegrationTopicExchangeTapWildcard(t *testing.T) { func TestIntegrationInvalidExchange(t *testing.T) { tapMessages := make(TapChannel) + errChannel := make(SubscribeErrorChannel) log := testcommon.NewTestLogger() tap := NewAmqpTap(testcommon.IntegrationURIFromEnv(), &tls.Config{}, log) ctx := context.Background() @@ -215,7 +223,8 @@ func TestIntegrationInvalidExchange(t *testing.T) { ctx, []ExchangeConfiguration{ {"nonexisting-exchange", "test"}}, - tapMessages) + tapMessages, + errChannel) assert.NotNil(t, err) }