xref: /aosp_15_r20/external/grpc-grpc/src/ruby/lib/grpc/generic/client_stub.rb (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1# Copyright 2015 gRPC authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15require_relative 'active_call'
16require_relative '../version'
17
18# GRPC contains the General RPC module.
19module GRPC
20  # rubocop:disable Metrics/ParameterLists
21
22  # ClientStub represents a client connection to a gRPC server, and can be used
23  # to send requests.
24  class ClientStub
25    include Core::StatusCodes
26    include Core::TimeConsts
27
28    # Default timeout is infinity.
29    DEFAULT_TIMEOUT = INFINITE_FUTURE
30
    # setup_channel is used by #initialize to construct a channel from its
    # arguments.
33    def self.setup_channel(alt_chan, host, creds, channel_args = {})
34      unless alt_chan.nil?
35        fail(TypeError, '!Channel') unless alt_chan.is_a?(Core::Channel)
36        return alt_chan
37      end
38      if channel_args['grpc.primary_user_agent'].nil?
39        channel_args['grpc.primary_user_agent'] = ''
40      else
41        channel_args['grpc.primary_user_agent'] += ' '
42      end
43      channel_args['grpc.primary_user_agent'] += "grpc-ruby/#{VERSION}"
44      unless creds.is_a?(Core::ChannelCredentials) ||
45             creds.is_a?(Core::XdsChannelCredentials) ||
46             creds.is_a?(Symbol)
47        fail(TypeError, 'creds is not a ChannelCredentials, XdsChannelCredentials, or Symbol')
48      end
49      Core::Channel.new(host, channel_args, creds)
50    end
51
52    # Allows users of the stub to modify the propagate mask.
53    #
54    # This is an advanced feature for use when making calls to another gRPC
55    # server whilst running in the handler of an existing one.
56    attr_writer :propagate_mask
57
58    # Creates a new ClientStub.
59    #
    # Minimally, a stub is created with just the host of the gRPC service
    # it wishes to access, e.g.,
    #
    #   my_stub = ClientStub.new('example.host.com:50505',
    #                            :this_channel_is_insecure)
65    #
66    # If a channel_override argument is passed, it will be used as the
67    # underlying channel. Otherwise, the channel_args argument will be used
68    # to construct a new underlying channel.
69    #
70    # There are some specific keyword args that are not used to configure the
71    # channel:
72    #
73    # - :channel_override
    # when present, this must be a pre-created GRPC::Core::Channel.  If it's
    # present the host and arbitrary keyword args are ignored, and the RPC
    # connection uses this channel.
77    #
78    # - :timeout
79    # when present, this is the default timeout used for calls
80    #
81    # @param host [String] the host the stub connects to
82    # @param creds [Core::ChannelCredentials|Symbol] the channel credentials, or
83    #     :this_channel_is_insecure, which explicitly indicates that the client
84    #     should be created with an insecure connection. Note: this argument is
85    #     ignored if the channel_override argument is provided.
86    # @param channel_override [Core::Channel] a pre-created channel
87    # @param timeout [Number] the default timeout to use in requests
88    # @param propagate_mask [Number] A bitwise combination of flags in
89    #     GRPC::Core::PropagateMasks. Indicates how data should be propagated
90    #     from parent server calls to child client calls if this client is being
91    #     used within a gRPC server.
92    # @param channel_args [Hash] the channel arguments. Note: this argument is
93    #     ignored if the channel_override argument is provided.
94    # @param interceptors [Array<GRPC::ClientInterceptor>] An array of
95    #     GRPC::ClientInterceptor objects that will be used for
96    #     intercepting calls before they are executed
97    #     Interceptors are an EXPERIMENTAL API.
98    def initialize(host, creds,
99                   channel_override: nil,
100                   timeout: nil,
101                   propagate_mask: nil,
102                   channel_args: {},
103                   interceptors: [])
104      @ch = ClientStub.setup_channel(channel_override, host, creds,
105                                     channel_args.dup)
106      alt_host = channel_args[Core::Channel::SSL_TARGET]
107      @host = alt_host.nil? ? host : alt_host
108      @propagate_mask = propagate_mask
109      @timeout = timeout.nil? ? DEFAULT_TIMEOUT : timeout
110      @interceptors = InterceptorRegistry.new(interceptors)
111    end
112
113    # request_response sends a request to a GRPC server, and returns the
114    # response.
115    #
116    # == Flow Control ==
117    # This is a blocking call.
118    #
119    # * it does not return until a response is received.
120    #
    # * the request is sent only when GRPC core's flow control allows it to
    #   be sent.
123    #
124    # == Errors ==
125    # An RuntimeError is raised if
126    #
127    # * the server responds with a non-OK status
128    #
129    # * the deadline is exceeded
130    #
131    # == Return Value ==
132    #
133    # If return_op is false, the call returns the response
134    #
135    # If return_op is true, the call returns an Operation, calling execute
136    # on the Operation returns the response.
137    #
138    # @param method [String] the RPC method to call on the GRPC server
139    # @param req [Object] the request sent to the server
140    # @param marshal [Function] f(obj)->string that marshals requests
141    # @param unmarshal [Function] f(string)->obj that unmarshals responses
142    # @param deadline [Time] (optional) the time the request should complete
143    # @param return_op [true|false] return an Operation if true
144    # @param parent [Core::Call] a prior call whose reserved metadata
145    #   will be propagated by this one.
146    # @param credentials [Core::CallCredentials] credentials to use when making
147    #   the call
148    # @param metadata [Hash] metadata to be sent to the server
    # @return [Object|Operation] the response received from the server, or
    #   an Operation when return_op is true
150    def request_response(method, req, marshal, unmarshal,
151                         deadline: nil,
152                         return_op: false,
153                         parent: nil,
154                         credentials: nil,
155                         metadata: {})
156      c = new_active_call(method, marshal, unmarshal,
157                          deadline: deadline,
158                          parent: parent,
159                          credentials: credentials)
160      interception_context = @interceptors.build_context
161      intercept_args = {
162        method: method,
163        request: req,
164        call: c.interceptable,
165        metadata: metadata
166      }
167      if return_op
168        # return the operation view of the active_call; define #execute as a
169        # new method for this instance that invokes #request_response.
170        c.merge_metadata_to_send(metadata)
171        op = c.operation
172        op.define_singleton_method(:execute) do
173          interception_context.intercept!(:request_response, intercept_args) do
174            c.request_response(req, metadata: metadata)
175          end
176        end
177        op
178      else
179        interception_context.intercept!(:request_response, intercept_args) do
180          c.request_response(req, metadata: metadata)
181        end
182      end
183    end
184
185    # client_streamer sends a stream of requests to a GRPC server, and
186    # returns a single response.
187    #
188    # requests provides an 'iterable' of Requests. I.e. it follows Ruby's
189    # #each enumeration protocol. In the simplest case, requests will be an
190    # array of marshallable objects; in typical case it will be an Enumerable
191    # that allows dynamic construction of the marshallable objects.
192    #
193    # == Flow Control ==
194    # This is a blocking call.
195    #
196    # * it does not return until a response is received.
197    #
    # * each request is sent only when GRPC core's flow control allows it to
    #   be sent.
200    #
201    # == Errors ==
202    # An RuntimeError is raised if
203    #
204    # * the server responds with a non-OK status
205    #
206    # * the deadline is exceeded
207    #
208    # == Return Value ==
209    #
    # If return_op is false, the call consumes the requests and returns
    # the response.
    #
    # If return_op is true, the call returns an Operation, calling execute
    # on the Operation returns the response.
214    #
215    # @param method [String] the RPC method to call on the GRPC server
216    # @param requests [Object] an Enumerable of requests to send
217    # @param marshal [Function] f(obj)->string that marshals requests
218    # @param unmarshal [Function] f(string)->obj that unmarshals responses
219    # @param deadline [Time] (optional) the time the request should complete
220    # @param return_op [true|false] return an Operation if true
221    # @param parent [Core::Call] a prior call whose reserved metadata
222    #   will be propagated by this one.
223    # @param credentials [Core::CallCredentials] credentials to use when making
224    #   the call
225    # @param metadata [Hash] metadata to be sent to the server
226    # @return [Object|Operation] the response received from the server
227    def client_streamer(method, requests, marshal, unmarshal,
228                        deadline: nil,
229                        return_op: false,
230                        parent: nil,
231                        credentials: nil,
232                        metadata: {})
233      c = new_active_call(method, marshal, unmarshal,
234                          deadline: deadline,
235                          parent: parent,
236                          credentials: credentials)
237      interception_context = @interceptors.build_context
238      intercept_args = {
239        method: method,
240        requests: requests,
241        call: c.interceptable,
242        metadata: metadata
243      }
244      if return_op
245        # return the operation view of the active_call; define #execute as a
246        # new method for this instance that invokes #client_streamer.
247        c.merge_metadata_to_send(metadata)
248        op = c.operation
249        op.define_singleton_method(:execute) do
250          interception_context.intercept!(:client_streamer, intercept_args) do
251            c.client_streamer(requests)
252          end
253        end
254        op
255      else
256        interception_context.intercept!(:client_streamer, intercept_args) do
257          c.client_streamer(requests, metadata: metadata)
258        end
259      end
260    end
261
262    # server_streamer sends one request to the GRPC server, which yields a
263    # stream of responses.
264    #
265    # responses provides an enumerator over the streamed responses, i.e. it
266    # follows Ruby's #each iteration protocol.  The enumerator blocks while
267    # waiting for each response, stops when the server signals that no
268    # further responses will be supplied.  If the implicit block is provided,
269    # it is executed with each response as the argument and no result is
270    # returned.
271    #
272    # == Flow Control ==
273    # This is a blocking call.
274    #
275    # * the request is sent only when GRPC core's flow control allows it to
276    #   be sent.
277    #
278    # * the request will not complete until the server sends the final
279    #   response followed by a status message.
280    #
281    # == Errors ==
282    # An RuntimeError is raised if
283    #
    # * the server responds with a non-OK status when any response is
    #   retrieved
286    #
287    # * the deadline is exceeded
288    #
289    # == Return Value ==
290    #
291    # if the return_op is false, the return value is an Enumerator of the
292    # results, unless a block is provided, in which case the block is
293    # executed with each response.
294    #
295    # if return_op is true, the function returns an Operation whose #execute
296    # method runs server streamer call. Again, Operation#execute either
297    # calls the given block with each response or returns an Enumerator of the
298    # responses.
299    #
    # == Keyword Args ==
    #
    # Metadata to be sent to the server is passed through the metadata
    # keyword argument.
304    #
305    # @param method [String] the RPC method to call on the GRPC server
306    # @param req [Object] the request sent to the server
307    # @param marshal [Function] f(obj)->string that marshals requests
308    # @param unmarshal [Function] f(string)->obj that unmarshals responses
309    # @param deadline [Time] (optional) the time the request should complete
    # @param return_op [true|false] return an Operation if true
311    # @param parent [Core::Call] a prior call whose reserved metadata
312    #   will be propagated by this one.
313    # @param credentials [Core::CallCredentials] credentials to use when making
314    #   the call
315    # @param metadata [Hash] metadata to be sent to the server
316    # @param blk [Block] when provided, is executed for each response
317    # @return [Enumerator|Operation|nil] as discussed above
318    def server_streamer(method, req, marshal, unmarshal,
319                        deadline: nil,
320                        return_op: false,
321                        parent: nil,
322                        credentials: nil,
323                        metadata: {},
324                        &blk)
325      c = new_active_call(method, marshal, unmarshal,
326                          deadline: deadline,
327                          parent: parent,
328                          credentials: credentials)
329      interception_context = @interceptors.build_context
330      intercept_args = {
331        method: method,
332        request: req,
333        call: c.interceptable,
334        metadata: metadata
335      }
336      if return_op
337        # return the operation view of the active_call; define #execute
338        # as a new method for this instance that invokes #server_streamer
339        c.merge_metadata_to_send(metadata)
340        op = c.operation
341        op.define_singleton_method(:execute) do
342          interception_context.intercept!(:server_streamer, intercept_args) do
343            c.server_streamer(req, &blk)
344          end
345        end
346        op
347      else
348        interception_context.intercept!(:server_streamer, intercept_args) do
349          c.server_streamer(req, metadata: metadata, &blk)
350        end
351      end
352    end
353
354    # bidi_streamer sends a stream of requests to the GRPC server, and yields
355    # a stream of responses.
356    #
    # This method takes an Enumerable of requests, and returns an enumerable
    # of responses.
359    #
360    # == requests ==
361    #
362    # requests provides an 'iterable' of Requests. I.e. it follows Ruby's
363    # #each enumeration protocol. In the simplest case, requests will be an
364    # array of marshallable objects; in typical case it will be an
365    # Enumerable that allows dynamic construction of the marshallable
366    # objects.
367    #
368    # == responses ==
369    #
370    # This is an enumerator of responses.  I.e, its #next method blocks
371    # waiting for the next response.  Also, if at any point the block needs
372    # to consume all the remaining responses, this can be done using #each or
373    # #collect.  Calling #each or #collect should only be done if
374    # the_call#writes_done has been called, otherwise the block will loop
375    # forever.
376    #
377    # == Flow Control ==
378    # This is a blocking call.
379    #
380    # * the call completes when the next call to provided block returns
381    #   false
382    #
    # * the execution block parameters are two objects for sending and
    #   receiving responses, each of which blocks waiting for flow control.
    #   E.g, calls to bidi_call#remote_send will wait until flow control
    #   allows another write before returning; and obviously calls to
    #   responses#next block until the next response is available.
388    #
389    # == Termination ==
390    #
391    # As well as sending and receiving messages, the block passed to the
392    # function is also responsible for:
393    #
394    # * calling bidi_call#writes_done to indicate no further reqs will be
395    #   sent.
396    #
    # * returning false once the bidi stream is functionally completed.
398    #
    # Note that response#next will indicate that there are no further
    # responses by raising StopIteration, but this can only happen if
    # bidi_call#writes_done is called.
402    #
403    # To properly terminate the RPC, the responses should be completely iterated
404    # through; one way to do this is to loop on responses#next until no further
405    # responses are available.
406    #
407    # == Errors ==
408    # An RuntimeError is raised if
409    #
    # * the server responds with a non-OK status when any response is
    #   retrieved
412    #
413    # * the deadline is exceeded
414    #
415    #
416    # == Return Value ==
417    #
418    # if the return_op is false, the return value is an Enumerator of the
419    # results, unless a block is provided, in which case the block is
420    # executed with each response.
421    #
422    # if return_op is true, the function returns an Operation whose #execute
423    # method runs the Bidi call. Again, Operation#execute either calls a
424    # given block with each response or returns an Enumerator of the
425    # responses.
426    #
427    # @param method [String] the RPC method to call on the GRPC server
428    # @param requests [Object] an Enumerable of requests to send
429    # @param marshal [Function] f(obj)->string that marshals requests
430    # @param unmarshal [Function] f(string)->obj that unmarshals responses
431    # @param deadline [Time] (optional) the time the request should complete
432    # @param return_op [true|false] return an Operation if true
433    # @param parent [Core::Call] a prior call whose reserved metadata
434    #   will be propagated by this one.
435    # @param credentials [Core::CallCredentials] credentials to use when making
436    #   the call
437    # @param metadata [Hash] metadata to be sent to the server
438    # @param blk [Block] when provided, is executed for each response
439    # @return [Enumerator|nil|Operation] as discussed above
440    def bidi_streamer(method, requests, marshal, unmarshal,
441                      deadline: nil,
442                      return_op: false,
443                      parent: nil,
444                      credentials: nil,
445                      metadata: {},
446                      &blk)
447      c = new_active_call(method, marshal, unmarshal,
448                          deadline: deadline,
449                          parent: parent,
450                          credentials: credentials)
451      interception_context = @interceptors.build_context
452      intercept_args = {
453        method: method,
454        requests: requests,
455        call: c.interceptable,
456        metadata: metadata
457      }
458      if return_op
459        # return the operation view of the active_call; define #execute
460        # as a new method for this instance that invokes #bidi_streamer
461        c.merge_metadata_to_send(metadata)
462        op = c.operation
463        op.define_singleton_method(:execute) do
464          interception_context.intercept!(:bidi_streamer, intercept_args) do
465            c.bidi_streamer(requests, &blk)
466          end
467        end
468        op
469      else
470        interception_context.intercept!(:bidi_streamer, intercept_args) do
471          c.bidi_streamer(requests, metadata: metadata, &blk)
472        end
473      end
474    end
475
476    private
477
    # Creates a new active call
479    #
480    # @param method [string] the method being called.
481    # @param marshal [Function] f(obj)->string that marshals requests
482    # @param unmarshal [Function] f(string)->obj that unmarshals responses
483    # @param parent [Grpc::Call] a parent call, available when calls are
484    #   made from server
485    # @param credentials [Core::CallCredentials] credentials to use when making
486    #   the call
487    def new_active_call(method, marshal, unmarshal,
488                        deadline: nil,
489                        parent: nil,
490                        credentials: nil)
491      deadline = from_relative_time(@timeout) if deadline.nil?
492      # Provide each new client call with its own completion queue
493      call = @ch.create_call(parent, # parent call
494                             @propagate_mask, # propagation options
495                             method,
496                             nil, # host use nil,
497                             deadline)
498      call.set_credentials! credentials unless credentials.nil?
499      ActiveCall.new(call, marshal, unmarshal, deadline,
500                     started: false)
501    end
502  end
503end
504