# Copyright 2015 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

require_relative 'active_call'
require_relative '../version'

# GRPC contains the General RPC module.
module GRPC
  # rubocop:disable Metrics/ParameterLists

  # ClientStub represents a client connection to a gRPC server, and can be used
  # to send requests.
  class ClientStub
    include Core::StatusCodes
    include Core::TimeConsts

    # Default timeout is infinity.
    DEFAULT_TIMEOUT = INFINITE_FUTURE

    # setup_channel is used by #initialize to construct a channel from its
    # arguments.
    #
    # When alt_chan is given it is validated and returned as-is; host, creds
    # and channel_args are then not used. Otherwise a new Core::Channel is
    # created, with "grpc-ruby/<VERSION>" appended to the
    # 'grpc.primary_user_agent' channel argument.
    #
    # The caller's channel_args hash is never modified: the user agent is
    # added to a private copy.
    #
    # @param alt_chan [Core::Channel|nil] a pre-created channel, or nil
    # @param host [String] the host the channel connects to
    # @param creds [Core::ChannelCredentials|Core::XdsChannelCredentials|Symbol]
    #   the channel credentials
    # @param channel_args [Hash] the channel arguments
    # @return [Core::Channel] alt_chan when provided, otherwise a new channel
    # @raise [TypeError] if alt_chan is not a Core::Channel, or if creds is
    #   not one of the accepted types
    def self.setup_channel(alt_chan, host, creds, channel_args = {})
      unless alt_chan.nil?
        fail(TypeError, '!Channel') unless alt_chan.is_a?(Core::Channel)
        return alt_chan
      end
      # Work on a copy so that the hash supplied by the caller is left
      # untouched (and may even be frozen).
      args = channel_args.dup
      user_agent = args['grpc.primary_user_agent']
      prefix = user_agent.nil? ? '' : user_agent + ' '
      args['grpc.primary_user_agent'] = prefix + "grpc-ruby/#{VERSION}"
      unless creds.is_a?(Core::ChannelCredentials) ||
             creds.is_a?(Core::XdsChannelCredentials) ||
             creds.is_a?(Symbol)
        fail(TypeError, 'creds is not a ChannelCredentials, XdsChannelCredentials, or Symbol')
      end
      Core::Channel.new(host, args, creds)
    end

    # Allows users of the stub to modify the propagate mask.
    #
    # This is an advanced feature for use when making calls to another gRPC
    # server whilst running in the handler of an existing one.
    attr_writer :propagate_mask

    # Creates a new ClientStub.
    #
    # Minimally, a stub is created with just the host of the gRPC service
    # it wishes to access, e.g.,
    #
    #   my_stub = ClientStub.new('example.host.com:50505',
    #                            :this_channel_is_insecure)
    #
    # If a channel_override argument is passed, it will be used as the
    # underlying channel. Otherwise, the channel_args argument will be used
    # to construct a new underlying channel.
    #
    # There are some specific keyword args that are not used to configure the
    # channel:
    #
    # - :channel_override
    # when present, this must be a pre-created GRPC::Core::Channel.  If it's
    # present the host and arbitrary keyword args are ignored, and the RPC
    # connection uses this channel.
    #
    # - :timeout
    # when present, this is the default timeout used for calls
    #
    # @param host [String] the host the stub connects to
    # @param creds [Core::ChannelCredentials|Core::XdsChannelCredentials|Symbol]
    #   the channel credentials, or :this_channel_is_insecure, which
    #   explicitly indicates that the client should be created with an
    #   insecure connection. Note: this argument is ignored if the
    #   channel_override argument is provided.
    # @param channel_override [Core::Channel] a pre-created channel
    # @param timeout [Number] the default timeout to use in requests
    # @param propagate_mask [Number] A bitwise combination of flags in
    #   GRPC::Core::PropagateMasks. Indicates how data should be propagated
    #   from parent server calls to child client calls if this client is being
    #   used within a gRPC server.
    # @param channel_args [Hash] the channel arguments. Note: this argument is
    #   ignored if the channel_override argument is provided.
    # @param interceptors [Array<GRPC::ClientInterceptor>] An array of
    #   GRPC::ClientInterceptor objects that will be used for
    #   intercepting calls before they are executed
    #   Interceptors are an EXPERIMENTAL API.
    def initialize(host, creds,
                   channel_override: nil,
                   timeout: nil,
                   propagate_mask: nil,
                   channel_args: {},
                   interceptors: [])
      # setup_channel copies channel_args before adding the user agent, so
      # the hash passed in by the caller is not modified.
      @ch = ClientStub.setup_channel(channel_override, host, creds,
                                     channel_args)
      # An SSL target override in the channel args takes precedence over the
      # host given to the constructor.
      alt_host = channel_args[Core::Channel::SSL_TARGET]
      @host = alt_host.nil? ? host : alt_host
      @propagate_mask = propagate_mask
      @timeout = timeout.nil? ? DEFAULT_TIMEOUT : timeout
      @interceptors = InterceptorRegistry.new(interceptors)
    end

    # request_response sends a request to a GRPC server, and returns the
    # response.
    #
    # == Flow Control ==
    # This is a blocking call.
    #
    # * it does not return until a response is received.
    #
    # * the request is sent only when GRPC core's flow control allows it to
    #   be sent.
    #
    # == Errors ==
    # An error is raised if
    #
    # * the server responds with a non-OK status
    #
    # * the deadline is exceeded
    #
    # == Return Value ==
    #
    # If return_op is false, the call returns the response
    #
    # If return_op is true, the call returns an Operation, calling execute
    # on the Operation returns the response.
    #
    # @param method [String] the RPC method to call on the GRPC server
    # @param req [Object] the request sent to the server
    # @param marshal [Function] f(obj)->string that marshals requests
    # @param unmarshal [Function] f(string)->obj that unmarshals responses
    # @param deadline [Time] (optional) the time the request should complete
    # @param return_op [true|false] return an Operation if true
    # @param parent [Core::Call] a prior call whose reserved metadata
    #   will be propagated by this one.
    # @param credentials [Core::CallCredentials] credentials to use when making
    #   the call
    # @param metadata [Hash] metadata to be sent to the server
    # @return [Object|Operation] the response received from the server, or
    #   an Operation when return_op is true
    def request_response(method, req, marshal, unmarshal,
                         deadline: nil,
                         return_op: false,
                         parent: nil,
                         credentials: nil,
                         metadata: {})
      c = new_active_call(method, marshal, unmarshal,
                          deadline: deadline,
                          parent: parent,
                          credentials: credentials)
      interception_context = @interceptors.build_context
      intercept_args = {
        method: method,
        request: req,
        call: c.interceptable,
        metadata: metadata
      }
      if return_op
        # return the operation view of the active_call; define #execute as a
        # new method for this instance that invokes #request_response.
        c.merge_metadata_to_send(metadata)
        op = c.operation
        op.define_singleton_method(:execute) do
          interception_context.intercept!(:request_response, intercept_args) do
            c.request_response(req, metadata: metadata)
          end
        end
        op
      else
        interception_context.intercept!(:request_response, intercept_args) do
          c.request_response(req, metadata: metadata)
        end
      end
    end

    # client_streamer sends a stream of requests to a GRPC server, and
    # returns a single response.
    #
    # requests provides an 'iterable' of Requests. I.e. it follows Ruby's
    # #each enumeration protocol. In the simplest case, requests will be an
    # array of marshallable objects; in typical case it will be an Enumerable
    # that allows dynamic construction of the marshallable objects.
    #
    # == Flow Control ==
    # This is a blocking call.
    #
    # * it does not return until a response is received.
    #
    # * each request is sent only when GRPC core's flow control allows it to
    #   be sent.
    #
    # == Errors ==
    # An error is raised if
    #
    # * the server responds with a non-OK status
    #
    # * the deadline is exceeded
    #
    # == Return Value ==
    #
    # If return_op is false, the call consumes the requests and returns
    # the response.
    #
    # If return_op is true, the call returns an Operation, calling execute
    # on the Operation consumes the requests and returns the response.
    #
    # @param method [String] the RPC method to call on the GRPC server
    # @param requests [Object] an Enumerable of requests to send
    # @param marshal [Function] f(obj)->string that marshals requests
    # @param unmarshal [Function] f(string)->obj that unmarshals responses
    # @param deadline [Time] (optional) the time the request should complete
    # @param return_op [true|false] return an Operation if true
    # @param parent [Core::Call] a prior call whose reserved metadata
    #   will be propagated by this one.
    # @param credentials [Core::CallCredentials] credentials to use when making
    #   the call
    # @param metadata [Hash] metadata to be sent to the server
    # @return [Object|Operation] the response received from the server, or
    #   an Operation when return_op is true
    def client_streamer(method, requests, marshal, unmarshal,
                        deadline: nil,
                        return_op: false,
                        parent: nil,
                        credentials: nil,
                        metadata: {})
      c = new_active_call(method, marshal, unmarshal,
                          deadline: deadline,
                          parent: parent,
                          credentials: credentials)
      interception_context = @interceptors.build_context
      intercept_args = {
        method: method,
        requests: requests,
        call: c.interceptable,
        metadata: metadata
      }
      if return_op
        # return the operation view of the active_call; define #execute as a
        # new method for this instance that invokes #client_streamer.
        # The metadata is handed to the call here, so it is not passed again
        # when #execute runs.
        c.merge_metadata_to_send(metadata)
        op = c.operation
        op.define_singleton_method(:execute) do
          interception_context.intercept!(:client_streamer, intercept_args) do
            c.client_streamer(requests)
          end
        end
        op
      else
        interception_context.intercept!(:client_streamer, intercept_args) do
          c.client_streamer(requests, metadata: metadata)
        end
      end
    end

    # server_streamer sends one request to the GRPC server, which yields a
    # stream of responses.
    #
    # responses provides an enumerator over the streamed responses, i.e. it
    # follows Ruby's #each iteration protocol.  The enumerator blocks while
    # waiting for each response, stops when the server signals that no
    # further responses will be supplied.  If the implicit block is provided,
    # it is executed with each response as the argument and no result is
    # returned.
    #
    # == Flow Control ==
    # This is a blocking call.
    #
    # * the request is sent only when GRPC core's flow control allows it to
    #   be sent.
    #
    # * the request will not complete until the server sends the final
    #   response followed by a status message.
    #
    # == Errors ==
    # An error is raised if
    #
    # * the server responds with a non-OK status when any response is
    #   retrieved
    #
    # * the deadline is exceeded
    #
    # == Return Value ==
    #
    # if the return_op is false, the return value is an Enumerator of the
    # results, unless a block is provided, in which case the block is
    # executed with each response.
    #
    # if return_op is true, the function returns an Operation whose #execute
    # method runs server streamer call. Again, Operation#execute either
    # calls the given block with each response or returns an Enumerator of the
    # responses.
    #
    # == Metadata ==
    #
    # Metadata to be sent to the server is supplied through the metadata
    # keyword argument.
    #
    # @param method [String] the RPC method to call on the GRPC server
    # @param req [Object] the request sent to the server
    # @param marshal [Function] f(obj)->string that marshals requests
    # @param unmarshal [Function] f(string)->obj that unmarshals responses
    # @param deadline [Time] (optional) the time the request should complete
    # @param return_op [true|false] return an Operation if true
    # @param parent [Core::Call] a prior call whose reserved metadata
    #   will be propagated by this one.
    # @param credentials [Core::CallCredentials] credentials to use when making
    #   the call
    # @param metadata [Hash] metadata to be sent to the server
    # @param blk [Block] when provided, is executed for each response
    # @return [Enumerator|Operation|nil] as discussed above
    def server_streamer(method, req, marshal, unmarshal,
                        deadline: nil,
                        return_op: false,
                        parent: nil,
                        credentials: nil,
                        metadata: {},
                        &blk)
      c = new_active_call(method, marshal, unmarshal,
                          deadline: deadline,
                          parent: parent,
                          credentials: credentials)
      interception_context = @interceptors.build_context
      intercept_args = {
        method: method,
        request: req,
        call: c.interceptable,
        metadata: metadata
      }
      if return_op
        # return the operation view of the active_call; define #execute
        # as a new method for this instance that invokes #server_streamer
        # The metadata is handed to the call here, so it is not passed again
        # when #execute runs.
        c.merge_metadata_to_send(metadata)
        op = c.operation
        op.define_singleton_method(:execute) do
          interception_context.intercept!(:server_streamer, intercept_args) do
            c.server_streamer(req, &blk)
          end
        end
        op
      else
        interception_context.intercept!(:server_streamer, intercept_args) do
          c.server_streamer(req, metadata: metadata, &blk)
        end
      end
    end

    # bidi_streamer sends a stream of requests to the GRPC server, and yields
    # a stream of responses.
    #
    # This method takes an Enumerable of requests, and returns an enumerable
    # of responses.
    #
    # == requests ==
    #
    # requests provides an 'iterable' of Requests. I.e. it follows Ruby's
    # #each enumeration protocol. In the simplest case, requests will be an
    # array of marshallable objects; in typical case it will be an
    # Enumerable that allows dynamic construction of the marshallable
    # objects.
    #
    # == responses ==
    #
    # This is an enumerator of responses.  I.e, its #next method blocks
    # waiting for the next response.  Also, if at any point the block needs
    # to consume all the remaining responses, this can be done using #each or
    # #collect.  Calling #each or #collect should only be done if
    # the_call#writes_done has been called, otherwise the block will loop
    # forever.
    #
    # == Flow Control ==
    # This is a blocking call.
    #
    # * the call completes when the next call to provided block returns
    #   false
    #
    # * the execution block parameters are two objects for sending and
    #   receiving responses, each of which blocks waiting for flow control.
    #   E.g, calls to bidi_call#remote_send will wait until flow control
    #   allows another write before returning; and obviously calls to
    #   responses#next block until the next response is available.
    #
    # == Termination ==
    #
    # As well as sending and receiving messages, the block passed to the
    # function is also responsible for:
    #
    # * calling bidi_call#writes_done to indicate no further reqs will be
    #   sent.
    #
    # * returning false once the bidi stream is functionally completed.
    #
    # Note that response#next will indicate that there are no further
    # responses by throwing StopIteration, but this can only happen if
    # bidi_call#writes_done is called.
    #
    # To properly terminate the RPC, the responses should be completely iterated
    # through; one way to do this is to loop on responses#next until no further
    # responses are available.
    #
    # == Errors ==
    # An error is raised if
    #
    # * the server responds with a non-OK status when any response is
    #   retrieved
    #
    # * the deadline is exceeded
    #
    #
    # == Return Value ==
    #
    # if the return_op is false, the return value is an Enumerator of the
    # results, unless a block is provided, in which case the block is
    # executed with each response.
    #
    # if return_op is true, the function returns an Operation whose #execute
    # method runs the Bidi call. Again, Operation#execute either calls a
    # given block with each response or returns an Enumerator of the
    # responses.
    #
    # @param method [String] the RPC method to call on the GRPC server
    # @param requests [Object] an Enumerable of requests to send
    # @param marshal [Function] f(obj)->string that marshals requests
    # @param unmarshal [Function] f(string)->obj that unmarshals responses
    # @param deadline [Time] (optional) the time the request should complete
    # @param return_op [true|false] return an Operation if true
    # @param parent [Core::Call] a prior call whose reserved metadata
    #   will be propagated by this one.
    # @param credentials [Core::CallCredentials] credentials to use when making
    #   the call
    # @param metadata [Hash] metadata to be sent to the server
    # @param blk [Block] when provided, is executed for each response
    # @return [Enumerator|nil|Operation] as discussed above
    def bidi_streamer(method, requests, marshal, unmarshal,
                      deadline: nil,
                      return_op: false,
                      parent: nil,
                      credentials: nil,
                      metadata: {},
                      &blk)
      c = new_active_call(method, marshal, unmarshal,
                          deadline: deadline,
                          parent: parent,
                          credentials: credentials)
      interception_context = @interceptors.build_context
      intercept_args = {
        method: method,
        requests: requests,
        call: c.interceptable,
        metadata: metadata
      }
      if return_op
        # return the operation view of the active_call; define #execute
        # as a new method for this instance that invokes #bidi_streamer
        # The metadata is handed to the call here, so it is not passed again
        # when #execute runs.
        c.merge_metadata_to_send(metadata)
        op = c.operation
        op.define_singleton_method(:execute) do
          interception_context.intercept!(:bidi_streamer, intercept_args) do
            c.bidi_streamer(requests, &blk)
          end
        end
        op
      else
        interception_context.intercept!(:bidi_streamer, intercept_args) do
          c.bidi_streamer(requests, metadata: metadata, &blk)
        end
      end
    end

    private

    # Creates a new active call on the stub's channel.
    #
    # @param method [String] the method being called.
    # @param marshal [Function] f(obj)->string that marshals requests
    # @param unmarshal [Function] f(string)->obj that unmarshals responses
    # @param deadline [Time] (optional) the time the request should complete;
    #   when nil, a deadline is derived from the stub's default timeout
    # @param parent [Grpc::Call] a parent call, available when calls are
    #   made from server
    # @param credentials [Core::CallCredentials] credentials to use when making
    #   the call
    # @return [ActiveCall] a call that has not been started yet
    def new_active_call(method, marshal, unmarshal,
                        deadline: nil,
                        parent: nil,
                        credentials: nil)
      # Fall back to the stub-wide timeout when no explicit deadline is given.
      deadline = from_relative_time(@timeout) if deadline.nil?
      call = @ch.create_call(parent, # parent call
                             @propagate_mask, # propagation options
                             method,
                             nil, # host use nil,
                             deadline)
      call.set_credentials! credentials unless credentials.nil?
      ActiveCall.new(call, marshal, unmarshal, deadline,
                     started: false)
    end
  end
end