Skip to content

Commit 2b01d29

Browse files
committed
Fix handling of timeouts/deadlines.
1 parent 5a64350 commit 2b01d29

4 files changed

Lines changed: 48 additions & 7 deletions

File tree

fixtures/async/grpc/test_interface.rb

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ class TestInterface < Protocol::GRPC::Interface
2222
response_class: Protocol::GRPC::Fixtures::TestMessage, streaming: :bidirectional
2323
rpc :ClientStreamingCall, request_class: Protocol::GRPC::Fixtures::TestMessage,
2424
response_class: Protocol::GRPC::Fixtures::TestMessage, streaming: :client_streaming
25+
rpc :SlowCall
2526
end
2627

2728
# Test service implementation
@@ -53,7 +54,6 @@ def bidirectional_call(input, output, _call)
5354
response = Protocol::GRPC::Fixtures::TestMessage.new(value: "Echo: #{request.value}")
5455
output.write(response)
5556
end
56-
puts "Closing write"
5757
output.close_write
5858
end
5959

@@ -66,6 +66,13 @@ def client_streaming_call(input, output, _call)
6666
response = Protocol::GRPC::Fixtures::TestMessage.new(value: "Received: #{values.join(', ')}")
6767
output.write(response)
6868
end
69+
70+
def slow_call(input, output, _call)
71+
request = input.read
72+
sleep 1 # Simulate a slow operation
73+
response = Protocol::GRPC::Fixtures::TestMessage.new(value: "Slow response: #{request.value}")
74+
output.write(response)
75+
end
6976
end
7077
end
7178
end

lib/async/grpc/dispatcher.rb

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -56,24 +56,31 @@ def invoke_service(service, handler_method, input, output, call)
5656
end
5757

5858
# Mark trailers and add status (if not already set by handler):
59-
if call.response&.headers
60-
call.response.headers.trailer!
61-
59+
if headers = call.response&.headers
6260
# Only add OK status if grpc-status hasn't been set by the handler:
63-
unless call.response.headers["grpc-status"]
64-
Protocol::GRPC::Metadata.add_status!(call.response.headers, status: Protocol::GRPC::Status::OK)
61+
unless headers["grpc-status"]
62+
Protocol::GRPC::Metadata.add_status!(headers, status: Protocol::GRPC::Status::OK)
6563
end
6664
end
6765
end
6866

6967
def dispatch_to_service(service, handler_method, input, output, call, deadline, parent: Async::Task.current)
7068
if deadline
71-
parent.with_timeout(deadline) do
69+
parent.with_timeout(deadline.remaining) do
7270
invoke_service(service, handler_method, input, output, call)
7371
end
7472
else
7573
invoke_service(service, handler_method, input, output, call)
7674
end
75+
rescue Async::TimeoutError
76+
# Close input and output streams:
77+
input.close
78+
output.close_write unless output.closed?
79+
80+
# Set DEADLINE_EXCEEDED status in trailers:
81+
if headers = call.response&.headers
82+
Protocol::GRPC::Metadata.add_status!(headers, status: Protocol::GRPC::Status::DEADLINE_EXCEEDED, message: "Deadline exceeded")
83+
end
7784
end
7885

7986
# Dispatch the request to the appropriate service.

releases.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
# Releases
22

3+
## Unreleased
4+
5+
- Fix handling of timeouts/deadlines.
6+
37
## v0.4.0
48

59
- Fix handling of trailers.

test/async/grpc/dispatcher.rb

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33
# Released under the MIT License.
44
# Copyright, 2025-2026, by Samuel Williams.
55

6+
require "sus/fixtures/async/scheduler_context"
7+
68
require "async/grpc/dispatcher"
79
require "async/grpc/service"
810
require "protocol/http"
@@ -102,5 +104,26 @@
102104

103105
expect(response.status).to be == 404
104106
end
107+
108+
it "handles timeout correctly" do
109+
path = Protocol::GRPC::Methods.build_path(service_name, "SlowCall")
110+
request = Protocol::HTTP::Request.new("http", "localhost", "POST", path, nil, headers, request_body)
111+
request.headers["grpc-timeout"] = "100m" # 100 milliseconds
112+
113+
response = dispatcher.call(request)
114+
115+
expect(response.status).to be == 200
116+
117+
# The response body should be consumed to access trailers:
118+
response_body = Protocol::GRPC::Body::ReadableBody.wrap(response, message_class: Protocol::GRPC::Fixtures::TestMessage)
119+
response_body.finish
120+
121+
# Check that grpc-status is DEADLINE_EXCEEDED (4):
122+
status = Protocol::GRPC::Metadata.extract_status(response.headers)
123+
expect(status).to be == Protocol::GRPC::Status::DEADLINE_EXCEEDED
124+
125+
message = Protocol::GRPC::Metadata.extract_message(response.headers)
126+
expect(message).to be == "Deadline exceeded"
127+
end
105128
end
106129
end

0 commit comments

Comments
 (0)