Skip to content

Commit

Permalink
Add in config option to purge all queues on startup.
Browse files Browse the repository at this point in the history
  • Loading branch information
Tyler Knappe committed Nov 1, 2016
1 parent 348adc7 commit 08db6b8
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 17 deletions.
27 changes: 16 additions & 11 deletions lib/emque/consuming/adapters/rabbit_mq/worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -33,17 +33,22 @@ def initialize(connection, topic)

self.queue =
channel
.queue(
"emque.#{config.app_name}.#{topic}",
:durable => config.adapter.options[:durable],
:auto_delete => config.adapter.options[:auto_delete],
:arguments => {
"x-dead-letter-exchange" => "#{config.app_name}.error"
}
)
.bind(
channel.fanout(topic, :durable => true, :auto_delete => false)
)
.queue(
"emque.#{config.app_name}.#{topic}",
:durable => config.adapter.options[:durable],
:auto_delete => config.adapter.options[:auto_delete],
:arguments => {
"x-dead-letter-exchange" => "#{config.app_name}.error"
}
)
.bind(
channel.fanout(topic, :durable => true, :auto_delete => false)
)

if config.purge_queues_on_start
logger.info "#{log_prefix} is purging it's queue"
queue.purge
end
end

def start
Expand Down
8 changes: 5 additions & 3 deletions lib/emque/consuming/configuration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ module Consuming
class Configuration
attr_accessor :app_name, :adapter, :auto_shutdown,
:delayed_message_workers, :enable_delayed_message, :error_handlers,
:error_limit, :error_expiration, :retryable_errors,
:retryable_error_limit, :status, :status_port, :status_host,
:socket_path, :shutdown_handlers
:error_limit, :error_expiration, :purge_queues_on_start,
:retryable_errors, :retryable_error_limit, :status, :status_port,
:status_host, :socket_path, :shutdown_handlers
attr_writer :env, :log_level

def initialize
Expand All @@ -19,6 +19,7 @@ def initialize
@error_limit = 5
@error_expiration = 3600 # 60 minutes
@log_level = nil
@purge_queues_on_start = false
@retryable_errors = []
@retryable_error_limit = 3
@status_port = 10000
Expand Down Expand Up @@ -56,6 +57,7 @@ def to_hsh
:error_handlers,
:error_limit,
:error_expiration,
:purge_queues_on_start,
:log_level,
:retryable_errors,
:retryable_error_limit,
Expand Down
6 changes: 3 additions & 3 deletions spec/configuration_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,9 @@
accessors = [
:app_name, :adapter, :auto_shutdown, :delayed_message_workers,
:env, :enable_delayed_message, :error_handlers, :error_limit,
:error_expiration, :log_level, :retryable_errors,
:retryable_error_limit, :status_port, :status_host, :status,
:socket_path, :shutdown_handlers
:error_expiration, :purge_queues_on_start, :log_level,
:retryable_errors, :retryable_error_limit, :status_port, :status_host,
:status, :socket_path, :shutdown_handlers
]
config = Emque::Consuming::Configuration.new

Expand Down

0 comments on commit 08db6b8

Please sign in to comment.