Skip to content

Commit 0e9a919

Browse files
Support initial bidirectional stream messages.
1 parent 104bf0b commit 0e9a919

3 files changed

Lines changed: 28 additions & 7 deletions

File tree

lib/async/grpc/client.rb

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -113,11 +113,12 @@ def call(request)
113113
# @parameter metadata [Hash] Custom metadata headers
114114
# @parameter timeout [Numeric | Nil] Optional timeout in seconds
115115
# @parameter encoding [String | Nil] Optional compression encoding
116+
# @parameter initial [Object | Array | Nil] Optional initial message(s) for bidirectional streaming
116117
# @yields {|input, output| ...} Block for streaming calls
117118
# @returns [Object | Protocol::GRPC::Body::ReadableBody] Response message or readable body for streaming
118119
# @raises [ArgumentError] If method is unknown or streaming type is invalid
119120
# @raises [Protocol::GRPC::Error] If the gRPC call fails
120-
def invoke(service, method, request = nil, metadata: {}, timeout: nil, encoding: nil, &block)
121+
def invoke(service, method, request = nil, metadata: {}, timeout: nil, encoding: nil, initial: nil, &block)
121122
rpc = service.class.lookup_rpc(method)
122123
raise ArgumentError, "Unknown method: #{method}" unless rpc
123124

@@ -141,7 +142,7 @@ def invoke(service, method, request = nil, metadata: {}, timeout: nil, encoding:
141142
when :client_streaming
142143
client_streaming_call(path, headers, request_class, response_class, encoding, &block)
143144
when :bidirectional
144-
bidirectional_call(path, headers, request_class, response_class, encoding, &block)
145+
bidirectional_call(path, headers, request_class, response_class, encoding, initial: initial, &block)
145146
else
146147
raise ArgumentError, "Unknown streaming type: #{streaming}"
147148
end
@@ -273,14 +274,16 @@ def client_streaming_call(path, headers, request_class, response_class, encoding
273274
# @parameter request_class [Class] Request message class
274275
# @parameter response_class [Class] Response message class
275276
# @parameter encoding [String | Nil] Compression encoding
277+
# @parameter initial [Object | Array | Nil] Optional initial message(s) to write before waiting for the response
276278
# @yields {|input, output| ...} Block to handle bidirectional streaming
277279
# @returns [Protocol::GRPC::Body::ReadableBody] Readable body for streaming messages
278280
# @raises [Protocol::GRPC::Error] If the gRPC call fails
279-
def bidirectional_call(path, headers, request_class, response_class, encoding, &block)
281+
def bidirectional_call(path, headers, request_class, response_class, encoding, initial: nil, &block)
280282
body = Protocol::GRPC::Body::WritableBody.new(
281283
encoding: encoding,
282284
message_class: request_class
283285
)
286+
Array(initial).each{|message| body.write(message)}
284287

285288
http_request = Protocol::HTTP::Request["POST", path, headers, body]
286289
response = call(http_request)

lib/async/grpc/stub.rb

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ def initialize(client, interface)
3636
# Uses snake_case method names (Ruby convention).
3737
# @parameter method_name [Symbol] The method name to call (snake_case)
3838
# @parameter args [Array] Positional arguments (first is the request message)
39-
# @parameter options [Hash] Keyword arguments (metadata, timeout, encoding)
39+
# @parameter options [Hash] Keyword arguments (metadata, timeout, encoding, initial)
4040
# @yields {|input, output| ...} Block for streaming calls
4141
# @returns [Object | Protocol::GRPC::Body::ReadableBody] Response message or readable body
4242
# @raises [NoMethodError] If the method is not found
@@ -47,13 +47,14 @@ def method_missing(method_name, *args, **options, &block)
4747
# Extract request from args (first positional argument):
4848
request = args.first
4949

50-
# Extract metadata, timeout, encoding from options:
50+
# Extract metadata, timeout, encoding and initial messages from options:
5151
metadata = options.delete(:metadata) || {}
5252
timeout = options.delete(:timeout)
5353
encoding = options.delete(:encoding)
54+
initial = options.delete(:initial)
5455

5556
# Delegate to client.invoke with PascalCase method name (for interface lookup):
56-
@client.invoke(@interface, interface_method_name, request, metadata: metadata, timeout: timeout, encoding: encoding, &block)
57+
@client.invoke(@interface, interface_method_name, request, metadata: metadata, timeout: timeout, encoding: encoding, initial: initial, &block)
5758
else
5859
super
5960
end

test/async/grpc/client.rb

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,24 @@
109109
expect(received_messages[1].value).to be == "Echo: message2"
110110
expect(received_messages[2].value).to be == "Echo: message3"
111111
end
112-
112+
it "can send initial bidirectional streaming messages before receiving a response" do
113+
grpc_client = Async::GRPC::Client.new(client)
114+
stub = grpc_client.stub(Async::GRPC::Fixtures::TestInterface, service_name)
115+
initial_messages = [
116+
Protocol::GRPC::Fixtures::TestMessage.new(value: "initial1"),
117+
Protocol::GRPC::Fixtures::TestMessage.new(value: "initial2")
118+
]
119+
received_messages = []
120+
stub.bidirectional_call(initial: initial_messages) do |output, input|
121+
output.close_write
122+
input.each do |response|
123+
received_messages << response
124+
end
125+
end
126+
expect(received_messages.length).to be == 2
127+
expect(received_messages[0].value).to be == "Echo: initial1"
128+
expect(received_messages[1].value).to be == "Echo: initial2"
129+
end
113130
it "handles metadata" do
114131
grpc_client = Async::GRPC::Client.new(client)
115132
stub = grpc_client.stub(Async::GRPC::Fixtures::TestInterface, service_name)

0 commit comments

Comments
 (0)