Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 21 additions & 17 deletions lib/output_workflows/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ module OutputWorkflows
class Client
attr_reader :configuration

def initialize(api_url: nil, api_key: nil, **options)
def initialize(api_url: nil, api_key: nil, **_options)
@configuration = OutputWorkflows.configuration.dup
@configuration.api_url = api_url if api_url
@configuration.api_key = api_key if api_key
Expand Down Expand Up @@ -63,20 +63,26 @@ def workflow_history(workflow_id, run_id: nil, page_size: 50, page_token: nil, i
handle_faraday_error("get history for #{workflow_label(workflow_id, run_id)}", e)
end

# Cancel/stop a running workflow. Returns true if cancelled, false if it doesn't exist.
# Statuses the stop endpoint returns when the run can't be stopped because
# it's already in a terminal-ish state: invalid stop on a finished run
# (400), already cancelled/conflicting (409), gone (404), expired (410).
# These are functionally "already stopped". Other client errors (401 auth,
# 403 forbidden, 408 timeout, 429 rate limit) are real failures that should
# surface, not be silently swallowed as a successful no-op.
ALREADY_STOPPED_STATUSES = [400, 404, 409, 410].freeze

# Cancel/stop a running workflow. Returns true if cancelled, false if it was
# already in a state that can't be stopped (see ALREADY_STOPPED_STATUSES).
def cancel_workflow(workflow_id, run_id: nil)
connection.patch(run_scoped_path(workflow_id, "stop", run_id))
true
rescue Faraday::ResourceNotFound, Faraday::ClientError => e
status = e.response_status if e.respond_to?(:response_status)
if [404, 410].include?(status)
log_info("#{workflow_label(workflow_id, run_id).capitalize} already stopped (#{status})")
false
else
raise
end
rescue Faraday::Error => e
handle_faraday_error("cancel #{workflow_label(workflow_id, run_id)}", e)
status = e.response_status if e.respond_to?(:response_status)
return handle_faraday_error("cancel #{workflow_label(workflow_id, run_id)}", e) unless
ALREADY_STOPPED_STATUSES.include?(status)

log_info("#{workflow_label(workflow_id, run_id).capitalize} already stopped (#{status})")
false
end

# Wait for workflow completion by polling status.
Expand All @@ -89,9 +95,7 @@ def wait_for_completion(workflow_id, poll_interval: nil, timeout: nil, run_id: n

loop do
elapsed = Time.now - start_time
if elapsed > timeout
raise TimeoutError, "Workflow #{workflow_id} timed out after #{timeout} seconds"
end
raise TimeoutError, "Workflow #{workflow_id} timed out after #{timeout} seconds" if elapsed > timeout

status_response = workflow_status(workflow_id, run_id: run_id)
raise WorkflowNotFoundError, "Workflow #{workflow_id} not found" unless status_response
Expand Down Expand Up @@ -168,9 +172,9 @@ def handle_faraday_error(action, error)
end

def log_info(message)
if defined?(::Rails)
::Rails.logger.info(message)
end
return unless defined?(::Rails)

::Rails.logger.info(message)
end
end
end
23 changes: 23 additions & 0 deletions test/test_client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,29 @@ def test_cancel_workflow_with_run_id_hits_run_scoped_endpoint
assert_requested :patch, "http://test.local/workflow/wf_abc/runs/run_xyz/stop"
end

# The stop endpoint returns these when the run can't be stopped because it's
# already terminal/gone/expired — functionally "already stopped". Return
# false, don't raise. (Was failing on a 400 here.)
[400, 404, 409, 410].each do |status|
define_method("test_cancel_workflow_treats_#{status}_as_already_stopped") do
stub_request(:patch, "http://test.local/workflow/wf_abc/stop")
.to_return(status:, body: "", headers: { "Content-Type" => "application/json" })

assert_equal false, @client.cancel_workflow("wf_abc")
end
end

# Other client errors are real failures (auth, forbidden, rate limit) — they
# must surface as APIError, not be swallowed as a successful no-op.
[401, 403, 408, 429].each do |status|
define_method("test_cancel_workflow_raises_on_#{status}") do
stub_request(:patch, "http://test.local/workflow/wf_abc/stop")
.to_return(status:, body: "", headers: { "Content-Type" => "application/json" })

assert_raises(OutputWorkflows::APIError) { @client.cancel_workflow("wf_abc") }
end
end

# --- workflow_result -------------------------------------------------------

def test_workflow_result_without_run_id_hits_unpinned_endpoint
Expand Down