Skip to content

Commit

Permalink
eventsimple outbox processor is instantiated once
Browse files Browse the repository at this point in the history
  • Loading branch information
desheikh committed Apr 23, 2024
1 parent 0f822af commit 5fab855
Show file tree
Hide file tree
Showing 17 changed files with 154 additions and 36 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## Unreleased

## 1.4.0 - 2024-04-02
### Changed
- The outbox processor is instantiated once and takes the event as an argument.
- Added SIGTERM event handling and fixed shutdown behaviour.

## 1.3.3 - 2024-04-02
### Changed
- add `parent_record` configuration so it can be easily overwritten
Expand Down
4 changes: 4 additions & 0 deletions Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,9 @@

source 'https://rubygems.org'

group :test do
gem 'factory_bot_rails'
end

# Specify your gem's dependencies in eventsimple.gemspec
gemspec
12 changes: 8 additions & 4 deletions Gemfile.lock
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
PATH
remote: .
specs:
eventsimple (1.3.3)
eventsimple (1.4.0)
dry-struct (~> 1.6)
dry-types (~> 1.7)
pg (~> 1.4)
rails (~> 7.0)
retriable (~> 3.1)
with_advisory_lock (>= 5.1)

GEM
remote: https://rubygems.org/
Expand Down Expand Up @@ -176,7 +177,7 @@ GEM
net-imap
net-pop
net-smtp
marcel (1.0.3)
marcel (1.0.4)
method_source (1.0.0)
mini_mime (1.1.5)
minitest (5.22.2)
Expand All @@ -189,7 +190,7 @@ GEM
net-protocol
net-protocol (0.2.2)
timeout
net-smtp (0.4.0.1)
net-smtp (0.5.0)
net-protocol
nio4r (2.7.0)
nokogiri (1.16.2-arm64-darwin)
Expand All @@ -205,7 +206,7 @@ GEM
parser (3.3.0.5)
ast (~> 2.4.1)
racc
pg (1.5.5)
pg (1.5.6)
polyglot (0.3.5)
pry (0.14.2)
coderay (~> 1.1)
Expand Down Expand Up @@ -348,6 +349,9 @@ GEM
websocket-driver (0.7.6)
websocket-extensions (>= 0.1.0)
websocket-extensions (0.1.5)
with_advisory_lock (5.1.0)
activerecord (>= 6.1)
zeitwerk (>= 2.6)
ws-style (7.4.3)
rubocop-rspec (>= 2.2.0)
rubocop-vendor (>= 0.11)
Expand Down
7 changes: 1 addition & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -315,12 +315,7 @@ end
```ruby
module UserComponent
class EventProcessor
def initialize(event)
@event = event
end
attr_reader :event

def call
def call(event)
Rails.logger.info("PROCESSING EVENT: #{event.id}")
end
end
Expand Down
2 changes: 1 addition & 1 deletion eventsimple.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@ Gem::Specification.new do |spec|
spec.add_runtime_dependency 'pg', '~> 1.4'
spec.add_runtime_dependency 'rails', '~> 7.0'
spec.add_runtime_dependency 'retriable', '~> 3.1'
spec.add_runtime_dependency 'with_advisory_lock', '>= 5.1'

spec.add_development_dependency 'bundle-audit'
spec.add_development_dependency 'factory_bot_rails'
spec.add_development_dependency 'fuubar'
spec.add_development_dependency 'git'
spec.add_development_dependency 'guard-rspec'
Expand Down
6 changes: 3 additions & 3 deletions lib/eventsimple/event.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ def drives_events_for(aggregate_klass, aggregate_id:, events_namespace: nil)
class_attribute :_aggregate_id
self._aggregate_id = aggregate_id

class_attribute :_outbox_mode
class_attribute :_outbox_concurrency
class_attribute :_outbox_enabled
class_attribute :_consumer_group_size

class_attribute :_on_invalid_transition
self._on_invalid_transition = ->(error) { raise error }
Expand Down Expand Up @@ -161,7 +161,7 @@ def create!(*args, &block)
end

def with_locks(&block)
if _outbox_mode
if _outbox_enabled
base_class.with_advisory_lock(base_class.name, { transaction: true }, &block)
else
yield
Expand Down
36 changes: 24 additions & 12 deletions lib/eventsimple/outbox/consumer.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# frozen_string_literal: true

require 'with_advisory_lock'
require 'eventsimple/outbox/models/cursor'

module Eventsimple
Expand All @@ -9,41 +10,52 @@ def self.extended(klass)
klass.class_exec do
class_attribute :_event_klass
class_attribute :_processor_klass
class_attribute :_processor
class_attribute :stop_consumer, default: false

Signal.trap('SIGINT') do
self.stop_consumer = true
$stdout.puts('SIGINT received, stopping consumer')
end
end
end

def consumes_event(event_klass, concurrency: 1)
event_klass._outbox_mode = true
event_klass._outbox_concurrency = concurrency
def consumes_event(event_klass, group_size: 1)
event_klass._outbox_enabled = true
event_klass._consumer_group_size = group_size

self._event_klass = event_klass
end

def processor(processor_klass)
self._processor_klass = processor_klass
self._processor = processor_klass.new
end

def start(group_number: 0, stop_at_end: false) # rubocop:disable Metrics/AbcSize
Signal.trap('INT') do
self.stop_consumer = true
$stdout.puts('INT received, stopping consumer')
end
Signal.trap('TERM') do
self.stop_consumer = true
$stdout.puts('TERM received, stopping consumer')
end

run_consumer(group_number: group_number, stop_at_end: stop_at_end)
end

def start # rubocop:disable Metrics/AbcSize
cursor = Outbox::Cursor.fetch(_event_klass, 0)
def run_consumer(group_number:, stop_at_end:)
cursor = Outbox::Cursor.fetch(_event_klass, group_number: group_number)

until stop_consumer
_event_klass.unscoped.in_batches(start: cursor + 1, load: true).each do |batch|
batch.each do |event|
_processor_klass.new(event).call
_processor.call(event)

break if stop_consumer
end

cursor = batch.last.id
Outbox::Cursor.set(_event_klass, 0, cursor)
Outbox::Cursor.set(_event_klass, group_number, cursor)
end

break if stop_at_end
sleep(1)
end
end
Expand Down
2 changes: 1 addition & 1 deletion lib/eventsimple/outbox/models/cursor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ module Outbox
class Cursor < Eventsimple.configuration.parent_record_klass
self.table_name = 'eventsimple_outbox_cursors'

def self.fetch(event_klass, group_number)
def self.fetch(event_klass, group_number: 0)
existing = find_by(event_klass: event_klass.to_s, group_number: group_number)
existing ? existing.cursor : 0
end
Expand Down
2 changes: 1 addition & 1 deletion lib/eventsimple/version.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# frozen_string_literal: true

module Eventsimple
VERSION = '1.3.3'
VERSION = '1.4.0'
end
10 changes: 10 additions & 0 deletions spec/dummy/app/components/user_component/consumer.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
require 'eventsimple/outbox/consumer'

module UserComponent
class Consumer
extend Eventsimple::Outbox::Consumer

consumes_event UserEvent
processor EventProcessor
end
end
7 changes: 7 additions & 0 deletions spec/dummy/app/components/user_component/event_processor.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
module UserComponent
class EventProcessor
def call(event)
# no-op
end
end
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# frozen_string_literal: true

class CreateEventsimpleOutboxCursor < ActiveRecord::Migration[7.1]
def change
create_table :eventsimple_outbox_cursors do |t|
t.string :event_klass, null: false
t.integer :group_number, null: false
t.bigint :cursor, null: false

t.index [:event_klass, :group_number], unique: true
end
end
end
9 changes: 8 additions & 1 deletion spec/dummy/db/schema.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,17 @@
#
# It's strongly recommended that you check this file into your version control system.

ActiveRecord::Schema[7.1].define(version: 2022_09_17_150839) do
ActiveRecord::Schema[7.1].define(version: 2024_04_19_175459) do
# These are extensions that must be enabled in order to support this database
enable_extension "plpgsql"

create_table "eventsimple_outbox_cursors", force: :cascade do |t|
t.string "event_klass", null: false
t.integer "group_number", null: false
t.bigint "cursor", null: false
t.index ["event_klass", "group_number"], name: "idx_on_event_klass_group_number_24bcae0807", unique: true
end

create_table "user_events", force: :cascade do |t|
t.string "aggregate_id", null: false
t.string "idempotency_key"
Expand Down
41 changes: 41 additions & 0 deletions spec/dummy/spec/components/user_component/consumer_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
RSpec.describe UserComponent::Consumer do
let(:mock_processor) { instance_double(UserComponent::EventProcessor, :call) }

before do
described_class._processor = mock_processor
allow(mock_processor).to receive(:call)
end

it 'consumes events' do
expect(described_class._event_klass).to eq(UserEvent)
end

it 'has a processor' do
expect(described_class._processor_klass).to eq(UserComponent::EventProcessor)
end

describe '.run_consumer' do
before do
UserComponent::Events::Created.create!(
user: User.new,
data: {
canonical_id: SecureRandom.uuid,
username: 'test-user',
email: '[email protected]',
},
)
end

it 'records the last processed event position' do
cursor = Eventsimple::Outbox::Cursor.fetch(UserEvent, group_number: 0)
expect(cursor).to be(0)

described_class.run_consumer(group_number: 0, stop_at_end: true)

expect(mock_processor).to have_received(:call).once

cursor = Eventsimple::Outbox::Cursor.fetch(UserEvent, group_number: 0)
expect(cursor).to eq(UserEvent.last.id)
end
end
end
14 changes: 14 additions & 0 deletions spec/dummy/spec/factories/user_event.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# frozen_string_literal: true

FactoryBot.define do
factory :user_event do
user { build(:user) }
data {
{
canonical_id: SecureRandom.uuid,
username: 'test-user',
email: '[email protected]',
}
}
end
end
6 changes: 5 additions & 1 deletion spec/lib/eventsimple/event_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@
end

context 'when entity is stale' do
# disable transactional tests so we can test the retry logic
def self.uses_transaction?(_method) = true
after { UserEvent.delete_all }

it 'retries and successfully writes the event' do
stale_user = User.find_by(canonical_id: user_canonical_id)

Expand Down Expand Up @@ -48,7 +52,7 @@

context 'when an event class no longer exists' do
it 'uses a no-op deleted class' do
UserEvent.insert({ type: 'NonExistentEvent', aggregate_id: user_canonical_id })
UserEvent.create!(type: 'NonExistentEvent', aggregate_id: user_canonical_id)

event = UserEvent.last
expect(event).to be_a(UserEvent::Deleted__NonExistentEvent)
Expand Down
14 changes: 8 additions & 6 deletions spec/spec_helper.rb
Original file line number Diff line number Diff line change
@@ -1,9 +1,15 @@
# frozen_string_literal: true

ENV['RAILS_ENV'] ||= 'test'

require File.expand_path('../spec/dummy/config/environment.rb', __dir__)
ENV['RAILS_ROOT'] ||= "#{File.dirname(__FILE__)}../../../spec/dummy"

require 'pry'
require 'eventsimple'
require 'eventsimple/support/spec_helpers'

require 'retriable'
require 'rspec/rails'

RSpec.configure do |config|
config.expect_with :rspec do |expectations|
Expand All @@ -18,6 +24,7 @@
config.filter_run_when_matching :focus
config.example_status_persistence_file_path = "spec/examples.txt"
config.disable_monkey_patching!
config.use_transactional_fixtures = true

if config.files_to_run.one?
config.default_formatter = "doc"
Expand All @@ -27,11 +34,6 @@

Kernel.srand config.seed

require File.expand_path('../spec/dummy/config/environment.rb', __dir__)
ENV['RAILS_ROOT'] ||= "#{File.dirname(__FILE__)}../../../spec/dummy"

require 'rspec/rails'

ActiveRecord::Migration.maintain_test_schema!

Retriable.configure do |c|
Expand Down

0 comments on commit 5fab855

Please sign in to comment.