Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add each_iteration callback #22

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion lib/job-iteration/iteration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ module Iteration
define_callbacks :start
define_callbacks :shutdown
define_callbacks :complete
define_callbacks :each_iteration
end

module ClassMethods
Expand All @@ -36,6 +37,14 @@ def on_complete(*filters, &blk)
set_callback(:complete, :after, *filters, &blk)
end

def before_each_iteration(*filters, &blk)
set_callback(:each_iteration, :before, *filters, &blk)
end

def after_each_iteration(*filters, &blk)
set_callback(:each_iteration, :after, *filters, &blk)
end

def supports_interruption?
true
end
Expand Down Expand Up @@ -123,7 +132,9 @@ def iterate_with_enumerator(enumerator, arguments)
arguments = arguments.dup.freeze
enumerator.each do |iteration, index|
record_unit_of_work do
each_iteration(iteration, *arguments)
run_callbacks(:each_iteration) do
each_iteration(iteration, *arguments)
end
self.cursor_position = index
end

Expand Down
51 changes: 51 additions & 0 deletions test/unit/active_job_iteration_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@ class SimpleIterationJob < ActiveJob::Base
self.on_complete_called = 0
cattr_accessor :on_shutdown_called, instance_accessor: false
self.on_shutdown_called = 0
cattr_accessor :before_each_iteration_called, instance_accessor: false
self.before_each_iteration_called = 0
cattr_accessor :after_each_iteration_called, instance_accessor: false
self.after_each_iteration_called = 0

on_start do
self.class.on_start_called += 1
Expand All @@ -26,6 +30,14 @@ class SimpleIterationJob < ActiveJob::Base
on_shutdown do
self.class.on_shutdown_called += 1
end

before_each_iteration do
self.class.before_each_iteration_called += 1
end

after_each_iteration do
self.class.after_each_iteration_called += 1
end
end

class MultiArgumentIterationJob < SimpleIterationJob
Expand Down Expand Up @@ -276,6 +288,9 @@ def setup
klass.on_start_called = 0
klass.on_complete_called = 0
klass.on_shutdown_called = 0
klass.on_shutdown_called = 0
klass.before_each_iteration_called = 0
klass.after_each_iteration_called = 0
end
JobShouldExitJob.records_performed = []
super
Expand Down Expand Up @@ -345,6 +360,16 @@ def test_failing_job
assert_jobs_in_queue 0
end

def test_failing_job_runs_each_iteration_callbacks
push(FailingIterationJob)

work_one_job
assert_jobs_in_queue 1

assert_equal 4, FailingIterationJob.before_each_iteration_called
assert_equal 3, FailingIterationJob.after_each_iteration_called
end

def test_active_record_job
iterate_exact_times(2.times)

Expand Down Expand Up @@ -440,6 +465,15 @@ def test_multiple_columns
assert_equal first_products, MultipleColumnsActiveRecordIterationJob.records_performed
end

def test_multiple_columns_callback_for_each_iteration
iterate_exact_times(3.times)
push(MultipleColumnsActiveRecordIterationJob)
work_one_job

assert_equal 3, MultipleColumnsActiveRecordIterationJob.before_each_iteration_called
assert_equal 3, MultipleColumnsActiveRecordIterationJob.after_each_iteration_called
end

def test_single_iteration
push(SingleIterationJob)

Expand All @@ -452,6 +486,15 @@ def test_single_iteration
assert_equal 1, SingleIterationJob.on_complete_called
end

def test_single_iteration_callback_for_each_iteration
push(SingleIterationJob)
work_one_job
assert_jobs_in_queue 0

assert_equal 1, SingleIterationJob.before_each_iteration_called
assert_equal 1, SingleIterationJob.after_each_iteration_called
end

def test_relation_with_limit
push(LimitActiveRecordIterationJob)

Expand Down Expand Up @@ -557,6 +600,14 @@ def test_log_completion_data
end
end

def test_aborting_in_each_iteration_job_will_execute_each_iteration_callback
push(AbortingActiveRecordIterationJob)
work_one_job
assert_equal 2, AbortingActiveRecordIterationJob.records_performed.size
assert_equal 3, AbortingActiveRecordIterationJob.before_each_iteration_called
assert_equal 2, AbortingActiveRecordIterationJob.after_each_iteration_called
end

def test_aborting_in_each_iteration_job_will_execute_on_complete_callback
push(AbortingActiveRecordIterationJob)
work_one_job
Expand Down