Skip to content

Commit

Permalink
eventsimple outbox processor is instantiated once
Browse files Browse the repository at this point in the history
  • Loading branch information
desheikh committed Apr 23, 2024
1 parent 0f822af commit 7620e86
Show file tree
Hide file tree
Showing 17 changed files with 188 additions and 38 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## Unreleased

## 1.4.0 - 2024-04-02
### Changed
- The outbox processor is instantiated once and takes the event as an argument.
- Added SIGTERM event handling and fixed shutdown behaviour.

## 1.3.3 - 2024-04-02
### Changed
- add `parent_record` configuration so it can be easily overwritten
Expand Down
4 changes: 4 additions & 0 deletions Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,9 @@

source 'https://rubygems.org'

group :test do
gem 'factory_bot_rails'
end

# Specify your gem's dependencies in eventsimple.gemspec
gemspec
12 changes: 8 additions & 4 deletions Gemfile.lock
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
PATH
remote: .
specs:
eventsimple (1.3.3)
eventsimple (1.4.0)
dry-struct (~> 1.6)
dry-types (~> 1.7)
pg (~> 1.4)
rails (~> 7.0)
retriable (~> 3.1)
with_advisory_lock (>= 5.1)

GEM
remote: https://rubygems.org/
Expand Down Expand Up @@ -176,7 +177,7 @@ GEM
net-imap
net-pop
net-smtp
marcel (1.0.3)
marcel (1.0.4)
method_source (1.0.0)
mini_mime (1.1.5)
minitest (5.22.2)
Expand All @@ -189,7 +190,7 @@ GEM
net-protocol
net-protocol (0.2.2)
timeout
net-smtp (0.4.0.1)
net-smtp (0.5.0)
net-protocol
nio4r (2.7.0)
nokogiri (1.16.2-arm64-darwin)
Expand All @@ -205,7 +206,7 @@ GEM
parser (3.3.0.5)
ast (~> 2.4.1)
racc
pg (1.5.5)
pg (1.5.6)
polyglot (0.3.5)
pry (0.14.2)
coderay (~> 1.1)
Expand Down Expand Up @@ -348,6 +349,9 @@ GEM
websocket-driver (0.7.6)
websocket-extensions (>= 0.1.0)
websocket-extensions (0.1.5)
with_advisory_lock (5.1.0)
activerecord (>= 6.1)
zeitwerk (>= 2.6)
ws-style (7.4.3)
rubocop-rspec (>= 2.2.0)
rubocop-vendor (>= 0.11)
Expand Down
7 changes: 1 addition & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -315,12 +315,7 @@ end
```ruby
module UserComponent
class EventProcessor
def initialize(event)
@event = event
end
attr_reader :event

def call
def call(event)
Rails.logger.info("PROCESSING EVENT: #{event.id}")
end
end
Expand Down
2 changes: 1 addition & 1 deletion eventsimple.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@ Gem::Specification.new do |spec|
spec.add_runtime_dependency 'pg', '~> 1.4'
spec.add_runtime_dependency 'rails', '~> 7.0'
spec.add_runtime_dependency 'retriable', '~> 3.1'
spec.add_runtime_dependency 'with_advisory_lock', '>= 5.1'

spec.add_development_dependency 'bundle-audit'
spec.add_development_dependency 'factory_bot_rails'
spec.add_development_dependency 'fuubar'
spec.add_development_dependency 'git'
spec.add_development_dependency 'guard-rspec'
Expand Down
6 changes: 3 additions & 3 deletions lib/eventsimple/event.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ def drives_events_for(aggregate_klass, aggregate_id:, events_namespace: nil)
class_attribute :_aggregate_id
self._aggregate_id = aggregate_id

class_attribute :_outbox_mode
class_attribute :_outbox_concurrency
class_attribute :_outbox_enabled
class_attribute :_consumer_group_size

class_attribute :_on_invalid_transition
self._on_invalid_transition = ->(error) { raise error }
Expand Down Expand Up @@ -161,7 +161,7 @@ def create!(*args, &block)
end

def with_locks(&block)
if _outbox_mode
if _outbox_enabled
base_class.with_advisory_lock(base_class.name, { transaction: true }, &block)
else
yield
Expand Down
37 changes: 24 additions & 13 deletions lib/eventsimple/outbox/consumer.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# frozen_string_literal: true

require 'with_advisory_lock'
require 'eventsimple/outbox/models/cursor'

module Eventsimple
Expand All @@ -9,39 +10,49 @@ def self.extended(klass)
klass.class_exec do
class_attribute :_event_klass
class_attribute :_processor_klass
class_attribute :_processor
class_attribute :stop_consumer, default: false

Signal.trap('SIGINT') do
self.stop_consumer = true
$stdout.puts('SIGINT received, stopping consumer')
end
end
end

def consumes_event(event_klass, concurrency: 1)
event_klass._outbox_mode = true
event_klass._outbox_concurrency = concurrency
def consumes_event(event_klass, group_size: 1)
event_klass._outbox_enabled = true
event_klass._consumer_group_size = group_size

self._event_klass = event_klass
end

def processor(processor_klass)
self._processor_klass = processor_klass
self._processor = processor_klass.new
end

def start(group_number: 0) # rubocop:disable Metrics/AbcSize
Signal.trap('INT') do
self.stop_consumer = true
$stdout.puts('INT received, stopping consumer')
end
Signal.trap('TERM') do
self.stop_consumer = true
$stdout.puts('TERM received, stopping consumer')
end

run_consumer(group_number: group_number)
end

def start # rubocop:disable Metrics/AbcSize
cursor = Outbox::Cursor.fetch(_event_klass, 0)
def run_consumer(group_number:)
cursor = Outbox::Cursor.fetch(_event_klass, group_number: group_number)

until stop_consumer
_event_klass.unscoped.in_batches(start: cursor + 1, load: true).each do |batch|
batch.each do |event|
_processor_klass.new(event).call
_processor.call(event)

cursor = event.id
break if stop_consumer
end

cursor = batch.last.id
Outbox::Cursor.set(_event_klass, 0, cursor)
Outbox::Cursor.set(_event_klass, cursor, group_number: group_number)
end

sleep(1)
Expand Down
4 changes: 2 additions & 2 deletions lib/eventsimple/outbox/models/cursor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@ module Outbox
class Cursor < Eventsimple.configuration.parent_record_klass
self.table_name = 'eventsimple_outbox_cursors'

def self.fetch(event_klass, group_number)
def self.fetch(event_klass, group_number: 0)
existing = find_by(event_klass: event_klass.to_s, group_number: group_number)
existing ? existing.cursor : 0
end

def self.set(event_klass, group_number, cursor)
def self.set(event_klass, cursor, group_number: 0)
upsert(
{
event_klass: event_klass.to_s,
Expand Down
2 changes: 1 addition & 1 deletion lib/eventsimple/version.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# frozen_string_literal: true

module Eventsimple
VERSION = '1.3.3'
VERSION = '1.4.0'
end
10 changes: 10 additions & 0 deletions spec/dummy/app/components/user_component/consumer.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
require 'eventsimple/outbox/consumer'

module UserComponent
class Consumer
extend Eventsimple::Outbox::Consumer

consumes_event UserEvent
processor EventProcessor
end
end
7 changes: 7 additions & 0 deletions spec/dummy/app/components/user_component/event_processor.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
module UserComponent
class EventProcessor
def call(event)
# no-op
end
end
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# frozen_string_literal: true

class CreateEventsimpleOutboxCursor < ActiveRecord::Migration[7.1]
def change
create_table :eventsimple_outbox_cursors do |t|
t.string :event_klass, null: false
t.integer :group_number, null: false
t.bigint :cursor, null: false

t.index [:event_klass, :group_number], unique: true
end
end
end
9 changes: 8 additions & 1 deletion spec/dummy/db/schema.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,17 @@
#
# It's strongly recommended that you check this file into your version control system.

ActiveRecord::Schema[7.1].define(version: 2022_09_17_150839) do
ActiveRecord::Schema[7.1].define(version: 2024_04_19_175459) do
# These are extensions that must be enabled in order to support this database
enable_extension "plpgsql"

create_table "eventsimple_outbox_cursors", force: :cascade do |t|
t.string "event_klass", null: false
t.integer "group_number", null: false
t.bigint "cursor", null: false
t.index ["event_klass", "group_number"], name: "idx_on_event_klass_group_number_24bcae0807", unique: true
end

create_table "user_events", force: :cascade do |t|
t.string "aggregate_id", null: false
t.string "idempotency_key"
Expand Down
73 changes: 73 additions & 0 deletions spec/dummy/spec/components/user_component/consumer_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
RSpec.describe UserComponent::Consumer do
subject(:run_consumer) { described_class.run_consumer(group_number: 0) }

before do
allow(described_class).to receive(:sleep) do
described_class.stop_consumer = true
end
end

after do
described_class.stop_consumer = false
end

it 'consumes an event' do
expect(described_class._event_klass).to eq(UserEvent)
end

it 'has a processor' do
expect(described_class._processor_klass).to eq(UserComponent::EventProcessor)
end

describe '.run_consumer' do
it 'records the last processed event position' do
event = create(:user_event)

cursor = Eventsimple::Outbox::Cursor.fetch(UserEvent, group_number: 0)
expect(cursor).to be(0)

expect(described_class._processor).to receive(:call).once

run_consumer

cursor = Eventsimple::Outbox::Cursor.fetch(UserEvent, group_number: 0)
expect(cursor).to eq(event.id)
end

context 'when consumer is stopped inside batch' do
let!(:events) { create_list(:user_event, 5) }

it 'sets the cursor to the last processed event position' do
allow(described_class._processor).to receive(:call) do |e|
expect(e.id).to be_in(events[0..1].map(&:id))

# stop consumer after the second event in the batch is processed
if e.id == events[1].id
described_class.stop_consumer = true
end
end

run_consumer

expect(described_class._processor).to have_received(:call).exactly(2).times
expect(Eventsimple::Outbox::Cursor.fetch(UserEvent)).to eq(events[1].id)
end
end

context 'with an existing cursor' do
let!(:events) { create_list(:user_event, 5) }

before do
Eventsimple::Outbox::Cursor.set(UserEvent, events[2].id)
allow(described_class._processor).to receive(:call)
end

it 'starts after the last processed event position' do
run_consumer

expect(described_class._processor).to have_received(:call).twice
expect(Eventsimple::Outbox::Cursor.fetch(UserEvent)).to eq(events[4].id)
end
end
end
end
15 changes: 15 additions & 0 deletions spec/dummy/spec/factories/user_event.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# frozen_string_literal: true

FactoryBot.define do
factory :user_event do
user
type { 'UserComponent::Events::Created' }
data {
{
canonical_id: SecureRandom.uuid,
username: 'test-user',
email: '[email protected]',
}
}
end
end
6 changes: 5 additions & 1 deletion spec/lib/eventsimple/event_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@
end

context 'when entity is stale' do
# disable transactional tests so we can test the retry logic
def self.uses_transaction?(_method) = true
after { UserEvent.delete_all }

it 'retries and successfully writes the event' do
stale_user = User.find_by(canonical_id: user_canonical_id)

Expand Down Expand Up @@ -48,7 +52,7 @@

context 'when an event class no longer exists' do
it 'uses a no-op deleted class' do
UserEvent.insert({ type: 'NonExistentEvent', aggregate_id: user_canonical_id })
UserEvent.create!(type: 'NonExistentEvent', aggregate_id: user_canonical_id)

event = UserEvent.last
expect(event).to be_a(UserEvent::Deleted__NonExistentEvent)
Expand Down
Loading

0 comments on commit 7620e86

Please sign in to comment.