Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Gemfile.lock
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
PATH
remote: .
specs:
artery (1.4.1)
artery (1.4.2)
artery-browser (~> 0.1)
concurrent-ruby (~> 1.0)
multiblock (~> 0.2)
Expand Down
60 changes: 40 additions & 20 deletions lib/artery/publisher.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

module Artery
class Publisher
DISCOVERY_INTERVAL = 5
DISCOVERY_INTERVAL = 30
POLL_INTERVAL = 0.5
BATCH_SIZE = 100

Expand All @@ -27,44 +27,64 @@ def publisher_loop
max_queue: 0,
fallback_policy: :caller_runs
)
@running_models = Concurrent::Set.new
@known_models = Concurrent::Set.new
@busy_models = Concurrent::Set.new
@last_discovery = Time.at(0)

Instrumentation.instrument(:publisher, action: :started)

loop do
models = Artery.model_info_class.pluck(:model)
discover_models if discovery_due?

models.each do |model|
next if @running_models.include?(model)
@known_models.each do |model|
next if @busy_models.include?(model)

@running_models.add(model)
@pool.post { model_loop(model) }
@busy_models.add(model)
@pool.post { process_model(model) }
end

sleep DISCOVERY_INTERVAL
sleep POLL_INTERVAL
end
end

def model_loop(model)
Artery.logger.tagged('Publisher', model) do
def discover_models
current = Artery.model_info_class.pluck(:model)

(@known_models - current).each do |removed|
@known_models.delete(removed)
Instrumentation.instrument(:publisher, action: :model_removed, model: removed)
end

current.each do |model|
next if @known_models.include?(model)

@known_models.add(model)
Artery.model_info_class.ensure_initialized!(model)
Instrumentation.instrument(:publisher, action: :model_started, model: model)
end

@last_discovery = Time.now
end

def discovery_due?
Time.now - @last_discovery >= DISCOVERY_INTERVAL
end

def process_model(model)
Artery.logger.tagged('Publisher', model) do
loop do
published = publish_batch(model)
sleep POLL_INTERVAL if published < BATCH_SIZE
break if published < BATCH_SIZE
end
rescue StandardError => e
Instrumentation.instrument(:publisher, action: :error, model: model, error: e.message)
Artery.handle_error Error.new(
"Publisher error for #{model}: #{e.message}",
original_exception: e
)
sleep POLL_INTERVAL
retry
end
rescue StandardError => e
Instrumentation.instrument(:publisher, action: :error, model: model, error: e.message)
Artery.handle_error Error.new(
"Publisher error for #{model}: #{e.message}",
original_exception: e
)
ensure
@running_models.delete(model)
@busy_models&.delete(model)
end

def publish_batch(model)
Expand Down
2 changes: 1 addition & 1 deletion lib/artery/version.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# frozen_string_literal: true

module Artery
VERSION = '1.4.1'
VERSION = '1.4.2'
end
45 changes: 37 additions & 8 deletions spec/dummy/spec/models/artery/publisher_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,16 @@
RSpec.describe Artery::Publisher do
subject(:publisher) { described_class.new }

describe '#publish_batch (via send)' do
let(:model_name) { 'source' }
let(:model_name) { 'source' }

before do
Artery::ActiveRecord::ModelInfo.find_or_create_by!(model: model_name) do |r|
r.latest_index = 0
r.last_published_id = 0
end
before do
Artery::ActiveRecord::ModelInfo.find_or_create_by!(model: model_name) do |r|
r.latest_index = 0
r.last_published_id = 0
end
end

describe '#publish_batch (via send)' do
it 'returns 0 when no unpublished messages exist' do
result = publisher.send(:publish_batch, model_name)
expect(result).to eq(0)
Expand All @@ -37,7 +37,7 @@
expect(row.last_published_id).to eq(messages.last.id)
end

it 'builds correct _previous_index chain' do
it 'builds correct _previous_index chain' do # rubocop:disable RSpec/MultipleExpectations
3.times { create(:source) }
messages = Artery.message_class.where(model: model_name).order(:id).to_a

Expand Down Expand Up @@ -72,4 +72,33 @@
expect(received[0]['_previous_index']).to eq(first_message.id)
end
end

describe '#process_model (via send)' do
it 'publishes all pending messages and releases' do
3.times { create(:source) }

received = []
Artery.subscribe('test.source.create') { |m| received << m }

publisher.send(:process_model, model_name)

sleep 0.2

expect(received.size).to eq(3)

row = Artery::ActiveRecord::ModelInfo.find_by!(model: model_name)
expect(row.last_published_id).to eq(Artery.message_class.where(model: model_name).maximum(:id))
end

it 'handles errors without raising' do
allow(Artery.model_info_class).to receive(:transaction).and_raise(StandardError, 'db gone')
allow(Artery).to receive(:handle_error)

expect { publisher.send(:process_model, model_name) }.not_to raise_error

expect(Artery).to have_received(:handle_error).with(
an_instance_of(Artery::Error).and(having_attributes(message: /db gone/))
)
end
end
end
Loading