Skip to content

Commit

Permalink
Fix orphan workers on restart
Browse files Browse the repository at this point in the history
If 1 daemonized DJ worker is running and you execute restart with a
different n value, the existing worker will effectively be orphaned, in
that it continues to run through future stop/start/restart commands
with the new n value. The opposite is also true. If you change from an
n value of 2+ to 1 when you restart then the running workers will be
orphaned. This fixes that problem by looking in the pid_dir to identify
all worker process names and stopping them ALL prior to starting n new
workers.
  • Loading branch information
dan-jensen committed Jun 2, 2019
1 parent 460018e commit 83d496d
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 0 deletions.
29 changes: 29 additions & 0 deletions lib/delayed/command.rb
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ def daemonize # rubocop:disable PerceivedComplexity
end
# rubocop:enable GuardClause
else
stop_start_if_restart
worker_count.times do |worker_index|
process_name = worker_count == 1 ? 'delayed_job' : "delayed_job.#{worker_index}"
run_process(process_name, @options)
Expand Down Expand Up @@ -144,6 +145,18 @@ def run(worker_name = nil, options = {})

private

def args_command
@args.find { |arg| args_command_options.include?(arg) }
end

def args_command_index
@args.index(args_command)
end

def args_command_options
%w[start stop restart run]
end

def parse_worker_pool(pool)
@worker_pools ||= []

Expand All @@ -165,6 +178,22 @@ def rails_logger_defined?
defined?(::Rails.logger)
end

def stop_all_workers
original_command = args_command
@args[args_command_index] = 'stop'
Dir.glob("#{@options[:pid_dir]}/delayed_job*").each do |file_path|
process_name = File.basename(file_path, '.*')
run_process(process_name, @options)
end
@args[args_command_index] = original_command
end

def stop_start_if_restart
return unless args_command == 'restart'
stop_all_workers
@args[args_command_index] = 'start'
end

def exit_with_error_status
exit 1
end
Expand Down
28 changes: 28 additions & 0 deletions spec/delayed/command_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,34 @@
end
end

describe 'restart' do
before(:each) do
Dir.mkdir(run_process_options[:pid_dir])
File.open(pid_filepath, 'w')
end
after(:each) do
File.delete(pid_filepath)
Dir.rmdir(run_process_options[:pid_dir])
end
let(:command) { Delayed::Command.new(['restart']) }
let(:pid_filepath) { "#{run_process_options[:pid_dir]}/delayed_job.orig.pid" }
let(:run_process_options) do
{:log_dir => './log', :pid_dir => './tmp/pids', :quiet => true}
end

it 'sends stop for all workers then start' do
expect(command).to receive(:run_process).with('delayed_job.orig', run_process_options) do
expect(command.instance_variable_get('@args')).to eql(['stop'])
end

expect(command).to receive(:run_process).with('delayed_job', run_process_options) do
expect(command.instance_variable_get('@args')).to eql(['start'])
end

command.daemonize
end
end

describe 'run' do
it 'sets the Delayed::Worker logger' do
expect(Delayed::Worker).to receive(:logger=).with(logger)
Expand Down

0 comments on commit 83d496d

Please sign in to comment.