Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ OPTIONS:
--redis-host HOST Redis host to connect to (default: 127.0.0.1).
--redis-url URL Redis URL to connect to (e.g.: redis://127.0.0.1:6379/0).
--update-timings Update the global job timings key with the timings of this build. Note: This key is used as the basis for job scheduling.
--timings-key KEY Update KEY instead of the default global timings key.
--file-split-threshold N Split spec files slower than N seconds and schedule them as individual examples.
--early-push-max-jobs N Only emit up to N jobs early
--graceful-shutdown-signal Graceful shutdown worker on signal.
Expand Down Expand Up @@ -100,6 +101,7 @@ $ RSPECQ_BUILD=123 RSPECQ_WORKER=foo1 rspecq spec/
| `RSPECQ_SEED` | RSpec seed |
| `RSPECQ_REDIS` | Redis HOST |
| `RSPECQ_UPDATE_TIMINGS` | Timings |
| `RSPECQ_TIMINGS_KEY` | Timings key to update (reporter only) |
| `RSPECQ_FILE_SPLIT_THRESHOLD` | File split threshold |
| `RSPECQ_EARLY_PUSH_MAX_JOBS` | Push jobs early up to a certain number |
| `RSPECQ_GRACEFUL_SHUTDOWN_SIGNAL` | Graceful worker shutdown signal |
Expand Down
12 changes: 9 additions & 3 deletions bin/rspecq
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,11 @@ OptionParser.new do |o|
o.on("--update-timings", "Update the global job timings key with the " \
"timings of this build. Note: This key is used as the basis for job " \
"scheduling.") do |v|
opts[:timings] = v
opts[:update_timings] = v
end

o.on("--timings-key KEY", "Update KEY instead of the default global job timings key.") do |v|
opts[:timings_key] = v
end

o.on("--file-split-threshold N", Integer, "Split spec files slower than N " \
Expand Down Expand Up @@ -146,7 +150,8 @@ opts[:build] ||= ENV.fetch("RSPECQ_BUILD", nil)
opts[:worker] ||= ENV.fetch("RSPECQ_WORKER", nil)
opts[:seed] ||= ENV.fetch("RSPECQ_SEED", nil)
opts[:redis_host] ||= ENV["RSPECQ_REDIS"] || DEFAULT_REDIS_HOST
opts[:timings] ||= env_set?("RSPECQ_UPDATE_TIMINGS")
opts[:update_timings] ||= env_set?("RSPECQ_UPDATE_TIMINGS")
opts[:timings_key] ||= ENV.fetch("RSPECQ_TIMINGS_KEY", nil)
opts[:file_split_threshold] ||= Integer(ENV["RSPECQ_FILE_SPLIT_THRESHOLD"] || 9_999_999)
opts[:early_push_max_jobs] ||= Integer(ENV["RSPECQ_EARLY_PUSH_MAX_JOBS"]) if ENV["RSPECQ_EARLY_PUSH_MAX_JOBS"]
opts[:graceful_shutdown_signal] ||= ENV.fetch("RSPECQ_GRACEFUL_SHUTDOWN_SIGNAL", DEFAULT_GRACEFUL_SHUTDOWN_SIGNAL)
Expand Down Expand Up @@ -180,6 +185,8 @@ if opts[:report]
build_id: opts[:build],
timeout: opts[:report_timeout],
redis_opts: redis_opts,
update_timings: opts[:update_timings],
timings_key: opts[:timings_key],
queue_wait_timeout: opts[:queue_wait_timeout]
)

Expand All @@ -197,7 +204,6 @@ else

supervisor.worker.tap do |worker|
worker.files_or_dirs_to_run = ARGV if ARGV.any?
worker.populate_timings = opts[:timings]
worker.file_split_threshold = opts[:file_split_threshold]
worker.early_push_max_jobs = opts[:early_push_max_jobs] if opts[:early_push_max_jobs]
worker.max_requeues = opts[:max_requeues]
Expand Down
2 changes: 1 addition & 1 deletion lib/rspecq/formatters/job_timing_recorder.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ def initialize(queue, job)
end

def dump_summary(summary)
@queue.record_timing(@job, Float(summary.duration))
@queue.record_build_timing(@job, Float(summary.duration))
end
end
end
Expand Down
48 changes: 44 additions & 4 deletions lib/rspecq/queue.rb
Original file line number Diff line number Diff line change
Expand Up @@ -233,8 +233,14 @@ def record_non_example_error(job, message)
@redis.hset(key_errors, job, message)
end

def record_timing(job, duration)
@redis.zadd(key_timings, duration, job)
def record_build_timing(job, duration)
@redis.zadd(key_build_timings, duration, job)
end

# Persist build timings to the global timings key, so that they can be
# used for scheduling future builds.
def update_global_timings(dst = key_timings)
@redis.copy(key_build_timings, dst, replace: true)
end

def record_build_time(duration)
Expand Down Expand Up @@ -273,8 +279,23 @@ def become_master
end

# ordered by execution time desc (slowest are in the head)
def timings
@redis.zrevrange(key_timings, 0, -1, withscores: true).to_h
def global_timings
redis_timings = @redis.zrevrange(key_timings, 0, -1, withscores: true).to_h

# Populate timings for whole files that were split into individual
# examples, by summing up the timings of their individual parts.
#
# We need that so that the scheduler will be able to mark them for splitting again
whole_file_timings = populate_splitted_file_timings(redis_timings)
return redis_timings if whole_file_timings.empty?

redis_timings.merge!(whole_file_timings)
redis_timings.sort_by { |_j, d| -d }.to_h
end

# ordered by execution time desc (slowest are in the head)
def build_timings
@redis.zrevrange(key_build_timings, 0, -1, withscores: true).to_h
end

def example_failures
Expand Down Expand Up @@ -486,6 +507,10 @@ def key_timings
"timings"
end

def key_build_timings
key("timings")
end

# redis: LIST<duration>
#
# Last build is at the head of the list.
Expand Down Expand Up @@ -515,5 +540,20 @@ def key(*keys)
def current_time
@redis.time[0]
end

# Given a set splitted file timing entries, we reconstruct the file's
# timings by summing up the timings of its individual parts.
def populate_splitted_file_timings(timings)
whole_file_timings = Hash.new(0)

timings.each do |file, duration|
next if !file.include?("[")

base_file = file.split("[").first
whole_file_timings[base_file] += duration
end

whole_file_timings
end
end
end
22 changes: 21 additions & 1 deletion lib/rspecq/reporter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,21 @@ module RSpecQ
#
# Reporters are readers of the queue.
class Reporter
def initialize(build_id:, timeout:, redis_opts:, queue_wait_timeout: 30)
# If true, job timings will be populated in the global Redis timings key
#
# Defaults to false
attr_accessor :update_timings

def initialize(build_id:, timeout:, redis_opts:,
queue_wait_timeout: 30,
update_timings: false,
timings_key: nil)
@build_id = build_id
@timeout = timeout
@queue = Queue.new(build_id, "reporter", redis_opts)
@queue_wait_timeout = queue_wait_timeout
@update_timings = update_timings
@timings_key = timings_key

# We want feedback to be immediattely printed to CI users, so
# we disable buffering.
Expand Down Expand Up @@ -55,6 +65,16 @@ def report
build_duration = test_durations&.first
@queue.record_build_time(build_duration) if build_duration

if update_timings && @queue.build_successful?
if @timings_key
puts "Updating job timings @ #{@timings_key}"
@queue.update_global_timings(@timings_key)
else
puts "Updating global job timings"
@queue.update_global_timings
end
end

flaky_jobs = @queue.flaky_jobs

puts summary(@queue.example_failures, @queue.non_example_errors,
Expand Down
16 changes: 3 additions & 13 deletions lib/rspecq/worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,6 @@ class Worker
# Defaults to "spec" (similar to RSpec)
attr_accessor :files_or_dirs_to_run

# If true, job timings will be populated in the global Redis timings key
#
# Defaults to false
attr_accessor :populate_timings

# If set, spec files that are known to take more than this value to finish,
# will be split and scheduled on a per-example basis.
#
Expand Down Expand Up @@ -76,7 +71,6 @@ def initialize(build_id:, worker_id:, redis_opts:, shutdown_pipe: nil)
@queue = Queue.new(build_id, worker_id, redis_opts)
@fail_fast = 0
@files_or_dirs_to_run = "spec"
@populate_timings = false
@file_split_threshold = 999_999
@heartbeat_updated_at = nil
@max_requeues = 3
Expand Down Expand Up @@ -143,10 +137,7 @@ def work
RSpec.configuration.add_formatter(Formatters::FailureRecorder.new(queue, job, max_requeues, @worker_id))
RSpec.configuration.add_formatter(Formatters::ExampleCountRecorder.new(queue))
RSpec.configuration.add_formatter(Formatters::WorkerHeartbeatRecorder.new(self))

if populate_timings
RSpec.configuration.add_formatter(Formatters::JobTimingRecorder.new(queue, job))
end
RSpec.configuration.add_formatter(Formatters::JobTimingRecorder.new(queue, job))

options = ["--format", "progress", job]
tags.each { |tag| options.push("--tag", tag) }
Expand All @@ -171,7 +162,7 @@ def update_heartbeat
end

def global_timings
@global_timings ||= queue.timings
@global_timings ||= queue.global_timings
end

def try_publish_queue!(queue)
Expand All @@ -196,7 +187,7 @@ def try_publish_queue!(queue)
if global_timings.empty?
q_size = queue.push_jobs(files_to_run.shuffle, fail_fast)
log_event(
"No timings found! Published queue in random order (size=#{q_size})",
"No global timings found! Published queue in random order (size=#{q_size})",
"warning"
)
return q_size
Expand Down Expand Up @@ -334,7 +325,6 @@ def log_event(msg, level, additional = {})
worker: @worker_id,
queue: queue.inspect,
files_or_dirs_to_run: files_or_dirs_to_run,
populate_timings: populate_timings,
file_split_threshold: file_split_threshold,
heartbeat_updated_at: @heartbeat_updated_at,
object: inspect,
Expand Down
50 changes: 43 additions & 7 deletions test/test_e2e.rb
Original file line number Diff line number Diff line change
Expand Up @@ -77,36 +77,72 @@ def test_non_example_error
assert_equal ["./spec/foo_spec.rb"], queue.non_example_errors.keys
end

def test_timings_update
queue = exec_build("timings", "--update-timings")
def test_build_timings_update
queue = exec_build("timings")

assert queue.build_successful?

assert_equal [
"./spec/very_fast_spec.rb",
"./spec/fast_spec.rb",
"./spec/medium_spec.rb",
"./spec/slow_spec.rb",
"./spec/very_slow_spec.rb",
], queue.build_timings.sort_by { |_, v| v }.map(&:first)
end

def test_global_timings_update_different_key
queue = exec_build("timings")

custom_key = "timings-#{rand_id}"
exec_reporter("--update-timings --timings-key=#{custom_key}", build_id: queue.build_id)

assert queue.build_successful?

redis = Redis.new(REDIS_OPTS)
assert_equal [
"./spec/very_fast_spec.rb",
"./spec/fast_spec.rb",
"./spec/medium_spec.rb",
"./spec/slow_spec.rb",
"./spec/very_slow_spec.rb",
], queue.timings.sort_by { |_, v| v }.map(&:first)
], redis.zrevrange(custom_key, 0, -1, withscores: true).to_h
.sort_by { |_, v| v }.map(&:first)
end

def test_global_timings_update
queue = exec_build("timings")
exec_reporter("--update-timings", build_id: queue.build_id)

assert queue.build_successful?

assert_equal [
"./spec/very_fast_spec.rb",
"./spec/fast_spec.rb",
"./spec/medium_spec.rb",
"./spec/slow_spec.rb",
"./spec/very_slow_spec.rb",
], queue.global_timings.sort_by { |_, v| v }.map(&:first)
end

def test_timings_no_update
queue = exec_build("timings")

assert queue.build_successful?
assert_empty queue.timings
assert_empty queue.global_timings
end

def test_spec_file_splitting
queue = exec_build("spec_file_splitting", "--update-timings")
queue = exec_build("spec_file_splitting")
assert queue.build_successful?
refute_empty queue.timings

exec_reporter("--update-timings", build_id: queue.build_id)
refute_empty queue.global_timings

queue = exec_build("spec_file_splitting", "--file-split-threshold 1")

assert queue.build_successful?
refute_empty queue.timings
refute_empty queue.build_timings
assert_processed_jobs([
"./spec/slow_spec.rb[1:2:1]",
"./spec/slow_spec.rb[1:1]",
Expand Down
4 changes: 2 additions & 2 deletions test/test_helpers.rb
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,8 @@ def start_worker(path, args = "", build_id: nil, worker_id: nil)
# Executes an rspecq reporter.
# @param [String] :build_id By default build_id is generated by the method.
# @return [String] The reporter's output.
def exec_reporter(build_id:)
out = `#{EXEC_CMD} --report --build #{build_id}`
def exec_reporter(args = "", build_id:)
out = `#{EXEC_CMD} --report --build #{build_id} #{args}`
puts out if ENV["RSPECQ_DEBUG"]
out
end
Expand Down
Loading
Loading