diff --git a/README.md b/README.md index ff7f2660b..5fe0de55b 100644 --- a/README.md +++ b/README.md @@ -248,7 +248,13 @@ You can then do the following: RAILS_ENV=production script/delayed_job --queue=tracking start RAILS_ENV=production script/delayed_job --queues=mailers,tasks start - # Use the --pool option to specify a worker pool. You can use this option multiple times to start different numbers of workers for different queues. + # Option --exclude-specified-queues will do inverse of queues processing by skipping ones from --queue, --queues. + # If both --pool=* --exclude-specified-queues given, no exclusions will by applied on "*". + # A worker pool's queue list can be prefixed with a ! which has the same effect as setting + # --exclude-specified-queues but only applies it to that specific worker pool. + + # Use the --pool option to specify a worker pool. + # You can use this option multiple times to start different numbers of workers for different queues. # The following command will start 1 worker for the tracking queue, # 2 workers for the mailers and tasks queues, and 2 workers for any jobs: RAILS_ENV=production script/delayed_job --pool=tracking --pool=mailers,tasks:2 --pool=*:2 start @@ -274,6 +280,9 @@ Work off queues by setting the `QUEUE` or `QUEUES` environment variable. QUEUE=tracking rake jobs:work QUEUES=mailers,tasks rake jobs:work +If EXCLUDE_SPECIFIED_QUEUES set to YES, then queues defined by QUEUE, QUEUES will be skipped instead. +See option --exclude-specified-queues description for special case of queue "*" + Restarting delayed_job ====================== diff --git a/lib/delayed/backend/shared_spec.rb b/lib/delayed/backend/shared_spec.rb index 39f497670..ca76c2aa3 100644 --- a/lib/delayed/backend/shared_spec.rb +++ b/lib/delayed/backend/shared_spec.rb @@ -425,6 +425,65 @@ def create_job(opts = {}) expect(SimpleJob.runs).to eq(3) end end + + context 'when asked to exclude specified queues' do + context 'and worker does not have queue set' do + before(:each) do + worker.queues = [] + worker.exclude_specified_queues = true + end + + it 'works off all jobs' do + expect(SimpleJob.runs).to eq(0) + + create_job(:queue => 'one') + create_job(:queue => 'two') + create_job + worker.work_off + + expect(SimpleJob.runs).to eq(3) + end + end + + context 'and worker has one queue set' do + before(:each) do + worker.queues = ['large'] + worker.exclude_specified_queues = true + end + + it 'only works off jobs which are not from selected queues' do + expect(SimpleJob.runs).to eq(0) + + create_job(:queue => 'large') + create_job(:queue => 'small') + create_job(:queue => 'small 2') + worker.work_off + + expect(SimpleJob.runs).to eq(2) + end + end + + context 'and worker has two queue set' do + before(:each) do + worker.queues = %w[large small] + worker.exclude_specified_queues = true + end + + it 'only works off jobs which are not from selected queues' do + expect(SimpleJob.runs).to eq(0) + + create_job(:queue => 'large') + create_job(:queue => 'small') + create_job(:queue => 'medium') + create_job(:queue => 'medium 2') + create_job + + worker.work_off + + expect(SimpleJob.runs).to eq(3) + end + end + end end context 'max_attempts' do diff --git a/lib/delayed/command.rb b/lib/delayed/command.rb index 281078242..225966476 100644 --- a/lib/delayed/command.rb +++ b/lib/delayed/command.rb @@ -71,6 +71,9 @@ def initialize(args) # rubocop:disable MethodLength opt.on('--queue=queue', 'Specify which queue DJ must look up for jobs') do |queue| @options[:queues] = queue.split(',') end + opt.on('--exclude-specified-queues', 'Exclude looking up of queues specified by --queue[s]=') do + @options[:exclude_specified_queues] = true + end opt.on('--pool=queue1[,queue2][:worker_count]', 'Specify queues and number of workers for a worker pool') do |pool| parse_worker_pool(pool) end @@ -119,6 +122,7 @@ def setup_pools end def run_process(process_name, options = {}) + options = normalize_worker_options(options) Delayed::Worker.before_fork Daemons.run_proc(process_name, :dir => options[:pid_dir], :dir_mode => :normal, :monitor => @monitor, :ARGV => @args) do |*_args| $0 = File.join(options[:prefix], process_name) if @options[:prefix] @@ -153,6 +157,20 @@ def parse_worker_pool(pool) @worker_pools << [queues, worker_count] end + def normalize_worker_options(options) + options = options.dup + + # If we haven't explictly said that we do or don't want to exclude specified queues, treat a leading '!' as a negation indicator for that list of queues + # Otherwise, the ! is treated as part of the queue name itself + if options[:exclude_specified_queues].nil? && options[:queues].present? + queues = options[:queues].map {|queue| queue.sub(/^!/, '') } # remove leading ! from all queues even though we only expect the first to have one, this makes it easier to look for changes after + options[:exclude_specified_queues] = queues != options[:queues] + options[:queues] = queues + end + + options + end + def root @root ||= rails_root_defined? ? ::Rails.root : DIR_PWD end diff --git a/lib/delayed/tasks.rb b/lib/delayed/tasks.rb index 409ba48f8..0e7971099 100644 --- a/lib/delayed/tasks.rb +++ b/lib/delayed/tasks.rb @@ -19,6 +19,7 @@ :min_priority => ENV['MIN_PRIORITY'], :max_priority => ENV['MAX_PRIORITY'], :queues => (ENV['QUEUES'] || ENV['QUEUE'] || '').split(','), + :exclude_specified_queues => ENV['EXCLUDE_SPECIFIED_QUEUES'].to_s.casecmp('YES').zero?, :quiet => ENV['QUIET'] } diff --git a/lib/delayed/worker.rb b/lib/delayed/worker.rb index 7b983a206..f351c8a9d 100644 --- a/lib/delayed/worker.rb +++ b/lib/delayed/worker.rb @@ -10,20 +10,21 @@ module Delayed class Worker # rubocop:disable ClassLength - DEFAULT_LOG_LEVEL = 'info'.freeze - DEFAULT_SLEEP_DELAY = 5 - DEFAULT_MAX_ATTEMPTS = 25 - DEFAULT_MAX_RUN_TIME = 4.hours - DEFAULT_DEFAULT_PRIORITY = 0 - DEFAULT_DELAY_JOBS = true - DEFAULT_QUEUES = [].freeze - DEFAULT_QUEUE_ATTRIBUTES = HashWithIndifferentAccess.new.freeze - DEFAULT_READ_AHEAD = 5 + DEFAULT_LOG_LEVEL = 'info'.freeze + DEFAULT_SLEEP_DELAY = 5 + DEFAULT_MAX_ATTEMPTS = 25 + DEFAULT_MAX_RUN_TIME = 4.hours + DEFAULT_DEFAULT_PRIORITY = 0 + DEFAULT_DELAY_JOBS = true + DEFAULT_QUEUES = [].freeze + DEFAULT_EXCLUDE_SPECIFIED_QUEUES = false + DEFAULT_QUEUE_ATTRIBUTES = HashWithIndifferentAccess.new.freeze + DEFAULT_READ_AHEAD = 5 cattr_accessor :min_priority, :max_priority, :max_attempts, :max_run_time, :default_priority, :sleep_delay, :logger, :delay_jobs, :queues, - :read_ahead, :plugins, :destroy_failed_jobs, :exit_on_complete, - :default_log_level + :exclude_specified_queues, :read_ahead, :plugins, :destroy_failed_jobs, + :exit_on_complete, :default_log_level # Named queue into which jobs are enqueued by default cattr_accessor :default_queue_name @@ -34,16 +35,17 @@ class Worker # rubocop:disable ClassLength attr_accessor :name_prefix def self.reset - self.default_log_level = DEFAULT_LOG_LEVEL - self.sleep_delay = DEFAULT_SLEEP_DELAY - self.max_attempts = DEFAULT_MAX_ATTEMPTS - self.max_run_time = DEFAULT_MAX_RUN_TIME - self.default_priority = DEFAULT_DEFAULT_PRIORITY - self.delay_jobs = DEFAULT_DELAY_JOBS - self.queues = DEFAULT_QUEUES - self.queue_attributes = DEFAULT_QUEUE_ATTRIBUTES - self.read_ahead = DEFAULT_READ_AHEAD - @lifecycle = nil + self.default_log_level = DEFAULT_LOG_LEVEL + self.sleep_delay = DEFAULT_SLEEP_DELAY + self.max_attempts = DEFAULT_MAX_ATTEMPTS + self.max_run_time = DEFAULT_MAX_RUN_TIME + self.default_priority = DEFAULT_DEFAULT_PRIORITY + self.delay_jobs = DEFAULT_DELAY_JOBS + self.queues = DEFAULT_QUEUES + self.exclude_specified_queues = DEFAULT_EXCLUDE_SPECIFIED_QUEUES + self.queue_attributes = DEFAULT_QUEUE_ATTRIBUTES + self.read_ahead = DEFAULT_READ_AHEAD + @lifecycle = nil end # Add or remove plugins in this list before the worker is instantiated @@ -132,7 +134,8 @@ def initialize(options = {}) @quiet = options.key?(:quiet) ? options[:quiet] : true @failed_reserve_count = 0 - [:min_priority, :max_priority, :sleep_delay, :read_ahead, :queues, :exit_on_complete].each do |option| + [:min_priority, :max_priority, :sleep_delay, :read_ahead, :queues, + :exclude_specified_queues, :exit_on_complete].each do |option| self.class.send("#{option}=", options[option]) if options.key?(option) end diff --git a/spec/delayed/backend/test.rb b/spec/delayed/backend/test.rb index 6835a7b49..4b851941b 100644 --- a/spec/delayed/backend/test.rb +++ b/spec/delayed/backend/test.rb @@ -66,7 +66,12 @@ def self.find_available(worker_name, limit = 5, max_run_time = Worker.max_run_ti end jobs.select! { |j| j.priority <= Worker.max_priority } if Worker.max_priority jobs.select! { |j| j.priority >= Worker.min_priority } if Worker.min_priority - jobs.select! { |j| Worker.queues.include?(j.queue) } if Worker.queues.any? + if Worker.queues.any? + jobs.select! do |j| + includes = Worker.queues.include?(j.queue) + Worker.exclude_specified_queues ? !includes : includes + end + end jobs.sort_by! { |j| [j.priority, j.run_at] }[0..limit - 1] end diff --git a/spec/delayed/command_spec.rb b/spec/delayed/command_spec.rb index b57cd6efa..0c22d8521 100644 --- a/spec/delayed/command_spec.rb +++ b/spec/delayed/command_spec.rb @@ -175,5 +175,35 @@ command.daemonize end + + it 'should run with respect of exclude queues' do + command = Delayed::Command.new(['--pool=*:1', '--pool=lage,slow,buggy:2', '--exclude-specified-queues']) + expect(FileUtils).to receive(:mkdir_p).with('./tmp/pids').once + + [ + ['delayed_job.0', {:quiet => true, :pid_dir => './tmp/pids', :log_dir => './log', :queues => [], :exclude_specified_queues => true}], + ['delayed_job.1', {:quiet => true, :pid_dir => './tmp/pids', :log_dir => './log', :queues => %w[lage slow buggy], :exclude_specified_queues => true}], + ['delayed_job.2', {:quiet => true, :pid_dir => './tmp/pids', :log_dir => './log', :queues => %w[lage slow buggy], :exclude_specified_queues => true}] + ].each do |args| + expect(command).to receive(:run_process).with(*args).once + end + + command.daemonize + end + + it 'should set queue exclusion to true if a queue starts with a ! and --exclude_specified_queues has not been specified' do + command = Delayed::Command.new(['--pool=fast:1', '--pool=!lage,slow,buggy:2']) + expect(FileUtils).to receive(:mkdir_p).with('./tmp/pids').once + + [ + ['delayed_job.0', {:quiet => true, :pid_dir => './tmp/pids', :log_dir => './log', :queues => %w[fast], :exclude_specified_queues => false}], + ['delayed_job.1', {:quiet => true, :pid_dir => './tmp/pids', :log_dir => './log', :queues => %w[lage slow buggy], :exclude_specified_queues => true}], + ['delayed_job.2', {:quiet => true, :pid_dir => './tmp/pids', :log_dir => './log', :queues => %w[lage slow buggy], :exclude_specified_queues => true}] + ].each do |args| + expect(command).to receive(:run_process).with(*args).once + end + + command.daemonize + end end end