Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add in config option to purge all queues on startup. #61

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# Emque Consuming CHANGELOG

- [Add in a configuration option to purge queues on startup](https://github.com/emque/emque-consuming/pull/61) 1.3.0
- [Add error logging when an exception is thrown.](https://github.com/emque/emque-consuming/pull/65) 1.2.3
- [Remove double ack when consuming a message and ending up in an error state. This was causing consumers to die silently.](https://github.com/emque/emque-consuming/pull/59) 1.2.1
- [Add in the ability to retry errors and back off with an exponential delay](https://github.com/emque/emque-consuming/pull/55) 1.2.0
Expand Down
2 changes: 2 additions & 0 deletions lib/emque/consuming/adapters/rabbit_mq/manager.rb
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
require "bunny"
require_relative "error_worker"
require_relative "delayed_message_worker"

module Emque
module Consuming
Expand Down
27 changes: 16 additions & 11 deletions lib/emque/consuming/adapters/rabbit_mq/worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -33,17 +33,22 @@ def initialize(connection, topic)

self.queue =
channel
.queue(
"emque.#{config.app_name}.#{topic}",
:durable => config.adapter.options[:durable],
:auto_delete => config.adapter.options[:auto_delete],
:arguments => {
"x-dead-letter-exchange" => "#{config.app_name}.error"
}
)
.bind(
channel.fanout(topic, :durable => true, :auto_delete => false)
)
.queue(
"emque.#{config.app_name}.#{topic}",
:durable => config.adapter.options[:durable],
:auto_delete => config.adapter.options[:auto_delete],
:arguments => {
"x-dead-letter-exchange" => "#{config.app_name}.error"
}
)
.bind(
channel.fanout(topic, :durable => true, :auto_delete => false)
)

if config.purge_queues_on_start
logger.info "#{log_prefix} is purging it's queue"
queue.purge
end
end

def start
Expand Down
8 changes: 5 additions & 3 deletions lib/emque/consuming/configuration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ module Consuming
class Configuration
attr_accessor :app_name, :adapter, :auto_shutdown,
:delayed_message_workers, :enable_delayed_message, :error_handlers,
:error_limit, :error_expiration, :retryable_errors,
:retryable_error_limit, :status, :status_port, :status_host,
:socket_path, :shutdown_handlers
:error_limit, :error_expiration, :purge_queues_on_start,
:retryable_errors, :retryable_error_limit, :status, :status_port,
:status_host, :socket_path, :shutdown_handlers
attr_writer :env, :log_level

def initialize
Expand All @@ -19,6 +19,7 @@ def initialize
@error_limit = 5
@error_expiration = 3600 # 60 minutes
@log_level = nil
@purge_queues_on_start = false
@retryable_errors = []
@retryable_error_limit = 3
@status_port = 10000
Expand Down Expand Up @@ -56,6 +57,7 @@ def to_hsh
:error_handlers,
:error_limit,
:error_expiration,
:purge_queues_on_start,
:log_level,
:retryable_errors,
:retryable_error_limit,
Expand Down
2 changes: 1 addition & 1 deletion lib/emque/consuming/version.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
module Emque
module Consuming
VERSION = "1.2.3"
VERSION = "1.3.0"
end
end
2 changes: 1 addition & 1 deletion lib/templates/config/application.rb.tt
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ module <%= @name %>
[].tap { |options_out|
if @options.has_key?(:app_name)
options_out << "config.app_name = \"#{@options[:app_name]}\""
end
end
if @options.has_key?(:error_limit)
options_out << "config.error_limit = #{@options[:error_limit]}"
end
Expand Down
95 changes: 95 additions & 0 deletions spec/adapters/rabbit_mq/worker/worker_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
require "spec_helper"
require "emque/consuming/adapters/rabbit_mq/worker"
require "pry"

module Emque
module Consuming
module Adapters
module RabbitMq
class Worker
def start
logger.info "#{log_prefix} starting..."
logger.info "RabbitMQ Worker: Skipping consuming during test."
logger.debug "#{log_prefix} started"
end
end
end
end
end
end

describe Emque::Consuming::Adapters::RabbitMq::Worker do
describe "#initialize" do
before do
@connection = Bunny.new
@connection.start
@channel = @connection.create_channel
@channel.queue_delete("emque.dummy.spec")
@fanout = @channel.fanout("dummy.spec",
:durable => true,
:auto_delete => false
)
@queue = @channel
.queue("emque.dummy.spec", {
:durable => true,
:auto_delete => false,
:arguments => {
"x-dead-letter-exchange" => "dummy.error"
}
}).bind(@fanout)
@queue.publish(Oj.dump({
:metadata => {
:topic => "spec",
:type => "dummy.spec"
}
}))
end

after do
@channel.queue_delete("emque.dummy.spec")
@connection.close
end

it "should not purge queues on start" do
Dummy::Application.config.purge_queues_on_start = false
Dummy::Application.config.set_adapter(:rabbit_mq)
Dummy::Application.router.map do
topic "spec" => SpecConsumer do; end
end
app = Dummy::Application.new
connection = Bunny.new
connection.start
connection.with_channel do |channel|
@queue = channel.queue("emque.dummy.spec", :passive => true)
expect(@queue.message_count).to eq(1)
end
app.start
sleep 0.3
connection.with_channel do |channel|
@queue = channel.queue("emque.dummy.spec", :passive => true)
expect(@queue.message_count).to eq(1)
end
end

it "should purge queues on start" do
Dummy::Application.config.purge_queues_on_start = true
Dummy::Application.config.set_adapter(:rabbit_mq)
Dummy::Application.router.map do
topic "spec" => SpecConsumer do; end
end
app = Dummy::Application.new
connection = Bunny.new
connection.start
connection.with_channel do |channel|
@queue = channel.queue("emque.dummy.spec", :passive => true)
expect(@queue.message_count).to eq(1)
end
app.start
sleep 0.3
connection.with_channel do |channel|
@queue = channel.queue("emque.dummy.spec", :passive => true)
expect(@queue.message_count).to eq(0)
end
end
end
end
1 change: 0 additions & 1 deletion spec/application_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@

app.notice_error({ :test => "failure" })
app.notice_error({ :test => "another failure" })

end
end
end
19 changes: 16 additions & 3 deletions spec/configuration_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,19 @@
end
end

describe "purge_queues_on_start" do
it "has a default" do
config = Emque::Consuming::Configuration.new
expect(config.purge_queues_on_start).to eq(false)
end

it "prefers the assigned value" do
config = Emque::Consuming::Configuration.new
config.purge_queues_on_start = true
expect(config.purge_queues_on_start).to eq(true)
end
end

describe "#to_hsh" do
it "returns a hash" do
config = Emque::Consuming::Configuration.new
Expand All @@ -63,9 +76,9 @@
accessors = [
:app_name, :adapter, :auto_shutdown, :delayed_message_workers,
:env, :enable_delayed_message, :error_handlers, :error_limit,
:error_expiration, :log_level, :retryable_errors,
:retryable_error_limit, :status_port, :status_host, :status,
:socket_path, :shutdown_handlers
:error_expiration, :purge_queues_on_start, :log_level,
:retryable_errors, :retryable_error_limit, :status_port, :status_host,
:status, :socket_path, :shutdown_handlers
]
config = Emque::Consuming::Configuration.new

Expand Down
10 changes: 9 additions & 1 deletion spec/dummy/config/application.rb
Original file line number Diff line number Diff line change
@@ -1,12 +1,20 @@
require "emque/consuming"
require "emque/consuming/adapters/rabbit_mq/manager"

ENV["EMQUE_ENV"] = "test"

module Emque
module Consuming
module Adapters
module RabbitMq
def self.default_options; { :durable => true }; end
def self.load; end
def self.manager
Emque::Consuming::Adapters::RabbitMq::Manager
end
end
module TestAdapter
def self.default_options; {}; end
def self.default_options; { :durable => true }; end
def self.load; end
def self.manager
Emque::Consuming::Adapters::TestAdapter::Manager
Expand Down
3 changes: 3 additions & 0 deletions spec/dummy/consumers/spec_consumer.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
class SpecConsumer
include Emque::Consuming.consumer
end
1 change: 1 addition & 0 deletions spec/spec_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
require "timecop"
require "fileutils"
require_relative "dummy/config/application"
require_relative "dummy/consumers/spec_consumer"

module VerifyAndResetHelpers
def verify(object)
Expand Down