xref: /aosp_15_r20/external/grpc-grpc/src/ruby/spec/generic/client_stub_spec.rb (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1# Copyright 2015 gRPC authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15require 'spec_helper'
16
17Thread.abort_on_exception = true
18
# Runs the given block on a new thread, passing it a fresh GRPC::Notifier,
# then waits on that notifier before returning the thread to the caller
# (presumably the block signals the notifier once it is ready).
def wakey_thread(&blk)
  notifier = GRPC::Notifier.new
  worker = Thread.new { blk.call(notifier) }
  worker.abort_on_exception = true
  notifier.wait
  worker
end
28
# Reads the test CA certificate, server key and server certificate from the
# 'testdata' directory located one level above this file's directory.
#
# Returns an Array of the three file contents, in the order
# [ca.pem, server1.key, server1.pem].
def load_test_certs
  test_root = File.join(File.dirname(File.dirname(__FILE__)), 'testdata')
  files = ['ca.pem', 'server1.key', 'server1.pem']
  # File.read opens, reads and closes in one step, so no handle is left open
  # for the garbage collector to clean up.
  files.map { |f| File.read(File.join(test_root, f)) }
end
34
35include GRPC::Core::StatusCodes
36include GRPC::Core::TimeConsts
37include GRPC::Core::CallOps
38
# Checks that methods on a finished/closed call don't crash.
#
# The caller must supply a block; it is handed the result of re-executing the
# call and should try to consume it (for example, iterate a response stream).
def check_op_view_of_finished_client_call(op_view,
                                          expected_metadata,
                                          expected_trailing_metadata)
  fail('need something to attempt reads') unless block_given?

  # executing the finished call again and reading from it must raise
  expect { yield op_view.execute }.to raise_error(GRPC::Core::CallError)

  # the call cannot be started a second time
  expect { op_view.start_call }.to raise_error(RuntimeError)

  sanity_check_values_of_accessors(op_view,
                                   expected_metadata,
                                   expected_trailing_metadata)

  # these remain callable on a finished call
  expect do
    op_view.wait
    op_view.cancel
    op_view.write_flag = 1
  end.not_to raise_error
end
63
# Verifies the read accessors of an operation view for a call that finished
# with an OK status and the given initial/trailing metadata.
def sanity_check_values_of_accessors(op_view,
                                     expected_metadata,
                                     expected_trailing_metadata)
  ok_status = Struct::Status.new.tap do |status|
    status.code = 0
    status.details = 'OK'
    status.metadata = expected_trailing_metadata
  end

  expect(op_view.status).to eq(ok_status)
  expect(op_view.metadata).to eq(expected_metadata)
  expect(op_view.trailing_metadata).to eq(expected_trailing_metadata)

  expect(op_view.cancelled?).to be(false)
  expect(op_view.write_flag).to be(nil)

  # The deadline attribute of a call can be either
  # a GRPC::Core::TimeSpec or a Time, which are mutually exclusive.
  # TODO: fix so that the accessor always returns the same type.
  deadline = op_view.deadline
  acceptable = [GRPC::Core::TimeSpec, Time].any? { |k| deadline.is_a?(k) }
  expect(acceptable).to be(true)
end
85
# Marks both the input and the output stream of a server-side active call as
# done. The two methods are reached through the object's own scope, so this
# also works when they are not public.
def close_active_server_call(active_server_call)
  active_server_call.instance_eval do
    set_input_stream_done
    set_output_stream_done
  end
end
90
91describe 'ClientStub' do  # rubocop:disable Metrics/BlockLength
92  let(:noop) { proc { |x| x } }
93
94  before(:each) do
95    Thread.abort_on_exception = true
96    @server = nil
97    @method = 'an_rpc_method'
98    @pass = OK
99    @fail = INTERNAL
100    @metadata = { k1: 'v1', k2: 'v2' }
101  end
102
103  after(:each) do
104    unless @server.nil?
105      @server.shutdown_and_notify(from_relative_time(2))
106      @server.close
107    end
108  end
109
110  describe '#new' do
111    let(:fake_host) { 'localhost:0' }
112    it 'can be created from a host and args' do
113      opts = { channel_args: { a_channel_arg: 'an_arg' } }
114      blk = proc do
115        GRPC::ClientStub.new(fake_host, :this_channel_is_insecure, **opts)
116      end
117      expect(&blk).not_to raise_error
118    end
119
120    it 'can be created with an channel override' do
121      opts = {
122        channel_args: { a_channel_arg: 'an_arg' },
123        channel_override: @ch
124      }
125      blk = proc do
126        GRPC::ClientStub.new(fake_host, :this_channel_is_insecure, **opts)
127      end
128      expect(&blk).not_to raise_error
129    end
130
131    it 'cannot be created with a bad channel override' do
132      blk = proc do
133        opts = {
134          channel_args: { a_channel_arg: 'an_arg' },
135          channel_override: Object.new
136        }
137        GRPC::ClientStub.new(fake_host, :this_channel_is_insecure, **opts)
138      end
139      expect(&blk).to raise_error
140    end
141
142    it 'cannot be created with bad credentials' do
143      blk = proc do
144        opts = { channel_args: { a_channel_arg: 'an_arg' } }
145        GRPC::ClientStub.new(fake_host, Object.new, **opts)
146      end
147      expect(&blk).to raise_error
148    end
149
150    it 'can be created with test test credentials' do
151      certs = load_test_certs
152      blk = proc do
153        opts = {
154          channel_args: {
155            GRPC::Core::Channel::SSL_TARGET => 'foo.test.google.fr',
156            a_channel_arg: 'an_arg'
157          }
158        }
159        creds = GRPC::Core::ChannelCredentials.new(certs[0], nil, nil)
160        GRPC::ClientStub.new(fake_host, creds,  **opts)
161      end
162      expect(&blk).to_not raise_error
163    end
164  end
165
166  describe '#request_response', request_response: true do
167    before(:each) do
168      @sent_msg, @resp = 'a_msg', 'a_reply'
169    end
170
171    shared_examples 'request response' do
172      it 'should send a request to/receive a reply from a server' do
173        server_port = create_test_server
174        th = run_request_response(@sent_msg, @resp, @pass)
175        stub = GRPC::ClientStub.new("localhost:#{server_port}",
176                                    :this_channel_is_insecure)
177        expect(get_response(stub)).to eq(@resp)
178        th.join
179      end
180
181      def metadata_test(md)
182        server_port = create_test_server
183        host = "localhost:#{server_port}"
184        th = run_request_response(@sent_msg, @resp, @pass,
185                                  expected_metadata: md)
186        stub = GRPC::ClientStub.new(host, :this_channel_is_insecure)
187        @metadata = md
188        expect(get_response(stub)).to eq(@resp)
189        th.join
190      end
191
192      it 'should send metadata to the server ok' do
193        metadata_test(k1: 'v1', k2: 'v2')
194      end
195
196      # these tests mostly try to exercise when md might be allocated
197      # instead of inlined
198      it 'should send metadata with multiple large md to the server ok' do
199        val_array = %w(
200          '00000000000000000000000000000000000000000000000000000000000000',
201          '11111111111111111111111111111111111111111111111111111111111111',
202          '22222222222222222222222222222222222222222222222222222222222222',
203        )
204        md = {
205          k1: val_array,
206          k2: 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa',
207          k3: 'bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb',
208          k4: 'cccccccccccccccccccccccccccccccccccccccccccccccccccccccccc',
209          keeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeey5: 'v5',
210          'k66666666666666666666666666666666666666666666666666666' => 'v6',
211          'k77777777777777777777777777777777777777777777777777777' => 'v7',
212          'k88888888888888888888888888888888888888888888888888888' => 'v8'
213        }
214        metadata_test(md)
215      end
216
217      it 'should send a request when configured using an override channel' do
218        server_port = create_test_server
219        alt_host = "localhost:#{server_port}"
220        th = run_request_response(@sent_msg, @resp, @pass)
221        ch = GRPC::Core::Channel.new(alt_host, nil, :this_channel_is_insecure)
222        stub = GRPC::ClientStub.new('ignored-host',
223                                    :this_channel_is_insecure,
224                                    channel_override: ch)
225        expect(get_response(stub)).to eq(@resp)
226        th.join
227      end
228
229      it 'should raise an error if the status is not OK' do
230        server_port = create_test_server
231        host = "localhost:#{server_port}"
232        th = run_request_response(@sent_msg, @resp, @fail)
233        stub = GRPC::ClientStub.new(host, :this_channel_is_insecure)
234        blk = proc { get_response(stub) }
235        expect(&blk).to raise_error(GRPC::BadStatus)
236        th.join
237      end
238
239      it 'should receive UNAVAILABLE if call credentials plugin fails' do
240        server_port = create_secure_test_server
241        server_started_notifier = GRPC::Notifier.new
242        th = Thread.new do
243          @server.start
244          server_started_notifier.notify(nil)
245          # Poll on the server so that the client connection can proceed.
246          # We don't expect the server to actually accept a call though.
247          expect { @server.request_call }.to raise_error(GRPC::Core::CallError)
248        end
249        server_started_notifier.wait
250
251        certs = load_test_certs
252        secure_channel_creds = GRPC::Core::ChannelCredentials.new(
253          certs[0], nil, nil)
254        secure_stub_opts = {
255          channel_args: {
256            GRPC::Core::Channel::SSL_TARGET => 'foo.test.google.fr'
257          }
258        }
259        stub = GRPC::ClientStub.new("localhost:#{server_port}",
260                                    secure_channel_creds, **secure_stub_opts)
261
262        error_message = 'Failing call credentials callback'
263        failing_auth = proc do
264          fail error_message
265        end
266        creds = GRPC::Core::CallCredentials.new(failing_auth)
267
268        unavailable_error_occurred = false
269        begin
270          get_response(stub, credentials: creds)
271        rescue GRPC::Unavailable => e
272          unavailable_error_occurred = true
273          expect(e.details.include?(error_message)).to be true
274        end
275        expect(unavailable_error_occurred).to eq(true)
276
277        @server.shutdown_and_notify(Time.now + 3)
278        th.join
279        @server.close
280      end
281
282      it 'should raise ArgumentError if metadata contains invalid values' do
283        @metadata.merge!(k3: 3)
284        server_port = create_test_server
285        host = "localhost:#{server_port}"
286        stub = GRPC::ClientStub.new(host, :this_channel_is_insecure)
287        expect do
288          get_response(stub)
289        end.to raise_error(ArgumentError,
290                           /Header values must be of type string or array/)
291      end
292    end
293
294    describe 'without a call operation' do
295      def get_response(stub, credentials: nil)
296        GRPC.logger.info(credentials.inspect)
297        stub.request_response(@method, @sent_msg, noop, noop,
298                              metadata: @metadata,
299                              credentials: credentials)
300      end
301
302      it_behaves_like 'request response'
303    end
304
305    describe 'via a call operation' do
306      after(:each) do
307        # make sure op.wait doesn't freeze, even if there's a bad status
308        @op.wait
309      end
310      def get_response(stub, run_start_call_first: false, credentials: nil)
311        @op = stub.request_response(@method, @sent_msg, noop, noop,
312                                    return_op: true,
313                                    metadata: @metadata,
314                                    deadline: from_relative_time(2),
315                                    credentials: credentials)
316        expect(@op).to be_a(GRPC::ActiveCall::Operation)
317        @op.start_call if run_start_call_first
318        result = @op.execute
319        result
320      end
321
322      it_behaves_like 'request response'
323
324      def run_op_view_metadata_test(run_start_call_first)
325        server_port = create_test_server
326        host = "localhost:#{server_port}"
327
328        @server_initial_md = { 'sk1' => 'sv1', 'sk2' => 'sv2' }
329        @server_trailing_md = { 'tk1' => 'tv1', 'tk2' => 'tv2' }
330        th = run_request_response(
331          @sent_msg, @resp, @pass,
332          expected_metadata: @metadata,
333          server_initial_md: @server_initial_md,
334          server_trailing_md: @server_trailing_md)
335        stub = GRPC::ClientStub.new(host, :this_channel_is_insecure)
336        expect(
337          get_response(stub,
338                       run_start_call_first: run_start_call_first)).to eq(@resp)
339        th.join
340      end
341
342      it 'sends metadata to the server ok when running start_call first' do
343        run_op_view_metadata_test(true)
344        check_op_view_of_finished_client_call(
345          @op, @server_initial_md, @server_trailing_md
346        ) { |r| GRPC.logger.info(r) }
347      end
348
349      it 'does not crash when used after the call has been finished' do
350        run_op_view_metadata_test(false)
351        check_op_view_of_finished_client_call(
352          @op, @server_initial_md, @server_trailing_md
353        ) { |r| GRPC.logger.info(r) }
354      end
355    end
356  end
357
358  describe '#client_streamer', client_streamer: true do
359    before(:each) do
360      Thread.abort_on_exception = true
361      server_port = create_test_server
362      host = "localhost:#{server_port}"
363      @stub = GRPC::ClientStub.new(host, :this_channel_is_insecure)
364      @sent_msgs = Array.new(3) { |i| 'msg_' + (i + 1).to_s }
365      @resp = 'a_reply'
366    end
367
368    shared_examples 'client streaming' do
369      it 'should send requests to/receive a reply from a server' do
370        th = run_client_streamer(@sent_msgs, @resp, @pass)
371        expect(get_response(@stub)).to eq(@resp)
372        th.join
373      end
374
375      it 'should send metadata to the server ok' do
376        th = run_client_streamer(@sent_msgs, @resp, @pass,
377                                 expected_metadata: @metadata)
378        expect(get_response(@stub)).to eq(@resp)
379        th.join
380      end
381
382      it 'should raise an error if the status is not ok' do
383        th = run_client_streamer(@sent_msgs, @resp, @fail)
384        blk = proc { get_response(@stub) }
385        expect(&blk).to raise_error(GRPC::BadStatus)
386        th.join
387      end
388
389      it 'should raise ArgumentError if metadata contains invalid values' do
390        @metadata.merge!(k3: 3)
391        expect do
392          get_response(@stub)
393        end.to raise_error(ArgumentError,
394                           /Header values must be of type string or array/)
395      end
396    end
397
398    describe 'without a call operation' do
399      def get_response(stub)
400        stub.client_streamer(@method, @sent_msgs, noop, noop,
401                             metadata: @metadata)
402      end
403
404      it_behaves_like 'client streaming'
405    end
406
407    describe 'via a call operation' do
408      after(:each) do
409        # make sure op.wait doesn't freeze, even if there's a bad status
410        @op.wait
411      end
412      def get_response(stub, run_start_call_first: false)
413        @op = stub.client_streamer(@method, @sent_msgs, noop, noop,
414                                   return_op: true, metadata: @metadata)
415        expect(@op).to be_a(GRPC::ActiveCall::Operation)
416        @op.start_call if run_start_call_first
417        result = @op.execute
418        result
419      end
420
421      it_behaves_like 'client streaming'
422
423      def run_op_view_metadata_test(run_start_call_first)
424        @server_initial_md = { 'sk1' => 'sv1', 'sk2' => 'sv2' }
425        @server_trailing_md = { 'tk1' => 'tv1', 'tk2' => 'tv2' }
426        th = run_client_streamer(
427          @sent_msgs, @resp, @pass,
428          expected_metadata: @metadata,
429          server_initial_md: @server_initial_md,
430          server_trailing_md: @server_trailing_md)
431        expect(
432          get_response(@stub,
433                       run_start_call_first: run_start_call_first)).to eq(@resp)
434        th.join
435      end
436
437      it 'sends metadata to the server ok when running start_call first' do
438        run_op_view_metadata_test(true)
439        check_op_view_of_finished_client_call(
440          @op, @server_initial_md, @server_trailing_md
441        ) { |r| GRPC.logger.info(r) }
442      end
443
444      it 'does not crash when used after the call has been finished' do
445        run_op_view_metadata_test(false)
446        check_op_view_of_finished_client_call(
447          @op, @server_initial_md, @server_trailing_md
448        ) { |r| GRPC.logger.info(r) }
449      end
450    end
451  end
452
453  describe '#server_streamer', server_streamer: true do
454    before(:each) do
455      @sent_msg = 'a_msg'
456      @replys = Array.new(3) { |i| 'reply_' + (i + 1).to_s }
457    end
458
459    shared_examples 'server streaming' do
460      it 'should send a request to/receive replies from a server' do
461        server_port = create_test_server
462        host = "localhost:#{server_port}"
463        th = run_server_streamer(@sent_msg, @replys, @pass)
464        stub = GRPC::ClientStub.new(host, :this_channel_is_insecure)
465        expect(get_responses(stub).collect { |r| r }).to eq(@replys)
466        th.join
467      end
468
469      it 'should raise an error if the status is not ok' do
470        server_port = create_test_server
471        host = "localhost:#{server_port}"
472        th = run_server_streamer(@sent_msg, @replys, @fail)
473        stub = GRPC::ClientStub.new(host, :this_channel_is_insecure)
474        e = get_responses(stub)
475        expect { e.collect { |r| r } }.to raise_error(GRPC::BadStatus)
476        th.join
477      end
478
479      it 'should send metadata to the server ok' do
480        server_port = create_test_server
481        host = "localhost:#{server_port}"
482        th = run_server_streamer(@sent_msg, @replys, @fail,
483                                 expected_metadata: { k1: 'v1', k2: 'v2' })
484        stub = GRPC::ClientStub.new(host, :this_channel_is_insecure)
485        e = get_responses(stub)
486        expect { e.collect { |r| r } }.to raise_error(GRPC::BadStatus)
487        th.join
488      end
489
490      it 'should raise ArgumentError if metadata contains invalid values' do
491        @metadata.merge!(k3: 3)
492        server_port = create_test_server
493        host = "localhost:#{server_port}"
494        stub = GRPC::ClientStub.new(host, :this_channel_is_insecure)
495        expect do
496          get_responses(stub).collect { |r| r }
497        end.to raise_error(ArgumentError,
498                           /Header values must be of type string or array/)
499      end
500
501      it 'the call terminates when there is an unmarshalling error' do
502        server_port = create_test_server
503        host = "localhost:#{server_port}"
504        th = run_server_streamer_handle_client_cancellation(
505          @sent_msg, @replys)
506        stub = GRPC::ClientStub.new(host, :this_channel_is_insecure)
507
508        unmarshal = proc { fail(ArgumentError, 'test unmarshalling error') }
509        expect do
510          get_responses(stub, unmarshal: unmarshal).collect { |r| r }
511        end.to raise_error(ArgumentError, 'test unmarshalling error')
512        th.join
513      end
514    end
515
516    describe 'without a call operation' do
517      def get_responses(stub, unmarshal: noop)
518        e = stub.server_streamer(@method, @sent_msg, noop, unmarshal,
519                                 metadata: @metadata)
520        expect(e).to be_a(Enumerator)
521        e
522      end
523
524      it_behaves_like 'server streaming'
525    end
526
527    describe 'via a call operation' do
528      after(:each) do
529        @op.wait # make sure wait doesn't freeze
530      end
531      def get_responses(stub, run_start_call_first: false, unmarshal: noop)
532        @op = stub.server_streamer(@method, @sent_msg, noop, unmarshal,
533                                   return_op: true,
534                                   metadata: @metadata)
535        expect(@op).to be_a(GRPC::ActiveCall::Operation)
536        @op.start_call if run_start_call_first
537        e = @op.execute
538        expect(e).to be_a(Enumerator)
539        e
540      end
541
542      it_behaves_like 'server streaming'
543
544      def run_op_view_metadata_test(run_start_call_first)
545        server_port = create_test_server
546        host = "localhost:#{server_port}"
547        @server_initial_md = { 'sk1' => 'sv1', 'sk2' => 'sv2' }
548        @server_trailing_md = { 'tk1' => 'tv1', 'tk2' => 'tv2' }
549        th = run_server_streamer(
550          @sent_msg, @replys, @pass,
551          expected_metadata: @metadata,
552          server_initial_md: @server_initial_md,
553          server_trailing_md: @server_trailing_md)
554        stub = GRPC::ClientStub.new(host, :this_channel_is_insecure)
555        e = get_responses(stub, run_start_call_first: run_start_call_first)
556        expect(e.collect { |r| r }).to eq(@replys)
557        th.join
558      end
559
560      it 'should send metadata to the server ok when start_call is run first' do
561        run_op_view_metadata_test(true)
562        check_op_view_of_finished_client_call(
563          @op, @server_initial_md, @server_trailing_md) do |responses|
564          responses.each { |r| GRPC.logger.info(r) }
565        end
566      end
567
568      it 'does not crash when used after the call has been finished' do
569        run_op_view_metadata_test(false)
570        check_op_view_of_finished_client_call(
571          @op, @server_initial_md, @server_trailing_md) do |responses|
572          responses.each { |r| GRPC.logger.info(r) }
573        end
574      end
575
576      it 'raises GRPC::Cancelled after the call has been cancelled' do
577        server_port = create_test_server
578        host = "localhost:#{server_port}"
579        th = run_server_streamer_handle_client_cancellation(
580          @sent_msg, @replys)
581        stub = GRPC::ClientStub.new(host, :this_channel_is_insecure)
582        resp = get_responses(stub, run_start_call_first: false)
583        expect(resp.next).to eq('reply_1')
584        @op.cancel
585        expect { resp.next }.to raise_error(GRPC::Cancelled)
586        th.join
587      end
588    end
589  end
590
591  describe '#bidi_streamer', bidi: true do
592    before(:each) do
593      @sent_msgs = Array.new(3) { |i| 'msg_' + (i + 1).to_s }
594      @replys = Array.new(3) { |i| 'reply_' + (i + 1).to_s }
595      server_port = create_test_server
596      @host = "localhost:#{server_port}"
597    end
598
599    shared_examples 'bidi streaming' do
600      it 'supports sending all the requests first' do
601        th = run_bidi_streamer_handle_inputs_first(@sent_msgs, @replys,
602                                                   @pass)
603        stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure)
604        e = get_responses(stub)
605        expect(e.collect { |r| r }).to eq(@replys)
606        th.join
607      end
608
609      it 'supports client-initiated ping pong' do
610        th = run_bidi_streamer_echo_ping_pong(@sent_msgs, @pass, true)
611        stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure)
612        e = get_responses(stub)
613        expect(e.collect { |r| r }).to eq(@sent_msgs)
614        th.join
615      end
616
617      it 'supports a server-initiated ping pong' do
618        th = run_bidi_streamer_echo_ping_pong(@sent_msgs, @pass, false)
619        stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure)
620        e = get_responses(stub)
621        expect(e.collect { |r| r }).to eq(@sent_msgs)
622        th.join
623      end
624
625      it 'should raise an error if the status is not ok' do
626        th = run_bidi_streamer_echo_ping_pong(@sent_msgs, @fail, false)
627        stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure)
628        e = get_responses(stub)
629        expect { e.collect { |r| r } }.to raise_error(GRPC::BadStatus)
630        th.join
631      end
632
633      it 'should raise ArgumentError if metadata contains invalid values' do
634        @metadata.merge!(k3: 3)
635        stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure)
636        expect do
637          get_responses(stub).collect { |r| r }
638        end.to raise_error(ArgumentError,
639                           /Header values must be of type string or array/)
640      end
641
642      it 'terminates if the call fails to start' do
643        # don't start the server
644        stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure)
645        expect do
646          get_responses(stub, deadline: from_relative_time(0)).collect { |r| r }
647        end.to raise_error(GRPC::BadStatus)
648      end
649
650      it 'should send metadata to the server ok' do
651        th = run_bidi_streamer_echo_ping_pong(@sent_msgs, @pass, true,
652                                              expected_metadata: @metadata)
653        stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure)
654        e = get_responses(stub)
655        expect(e.collect { |r| r }).to eq(@sent_msgs)
656        th.join
657      end
658
659      # Prompted by grpc/github #10526
660      describe 'surfacing of errors when sending requests' do
661        def run_server_bidi_send_one_then_read_indefinitely
662          @server.start
663          recvd_rpc = @server.request_call
664          recvd_call = recvd_rpc.call
665          server_call = GRPC::ActiveCall.new(
666            recvd_call, noop, noop, INFINITE_FUTURE,
667            metadata_received: true, started: false)
668          server_call.send_initial_metadata
669          server_call.remote_send('server response')
670          loop do
671            m = server_call.remote_read
672            break if m.nil?
673          end
674          # can't fail since initial metadata already sent
675          server_call.send_status(@pass, 'OK', true)
676          close_active_server_call(server_call)
677        end
678
679        def verify_error_from_write_thread(stub, requests_to_push,
680                                           request_queue, expected_description)
681          # TODO: an improvement might be to raise the original exception from
682          # bidi call write loops instead of only cancelling the call
683          failing_marshal_proc = proc do |req|
684            fail req if req.is_a?(StandardError)
685            req
686          end
687          begin
688            e = get_responses(stub, marshal_proc: failing_marshal_proc)
689            first_response = e.next
690            expect(first_response).to eq('server response')
691            requests_to_push.each { |req| request_queue.push(req) }
692            e.collect { |r| r }
693          rescue GRPC::Unknown => e
694            exception = e
695          end
696          expect(exception.message.include?(expected_description)).to be(true)
697        end
698
699        # Provides an Enumerable view of a Queue
700        class BidiErrorTestingEnumerateForeverQueue
701          def initialize(queue)
702            @queue = queue
703          end
704
705          def each
706            loop do
707              msg = @queue.pop
708              yield msg
709            end
710          end
711        end
712
713        def run_error_in_client_request_stream_test(requests_to_push,
714                                                    expected_error_message)
715          # start a server that waits on a read indefinitely - it should
716          # see a cancellation and be able to break out
717          th = Thread.new { run_server_bidi_send_one_then_read_indefinitely }
718          stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure)
719
720          request_queue = Queue.new
721          @sent_msgs = BidiErrorTestingEnumerateForeverQueue.new(request_queue)
722
723          verify_error_from_write_thread(stub,
724                                         requests_to_push,
725                                         request_queue,
726                                         expected_error_message)
727          # the write loop errror should cancel the call and end the
728          # server's request stream
729          th.join
730        end
731
732        it 'non-GRPC errors from the write loop surface when raised ' \
733          'at the start of a request stream' do
734          expected_error_message = 'expect error on first request'
735          requests_to_push = [StandardError.new(expected_error_message)]
736          run_error_in_client_request_stream_test(requests_to_push,
737                                                  expected_error_message)
738        end
739
740        it 'non-GRPC errors from the write loop surface when raised ' \
741          'during the middle of a request stream' do
742          expected_error_message = 'expect error on last request'
743          requests_to_push = %w( one two )
744          requests_to_push << StandardError.new(expected_error_message)
745          run_error_in_client_request_stream_test(requests_to_push,
746                                                  expected_error_message)
747        end
748      end
749
750      # Prompted by grpc/github #14853
751      describe 'client-side error handling on bidi streams' do
752        class EnumeratorQueue
753          def initialize(queue)
754            @queue = queue
755          end
756
757          def each
758            loop do
759              msg = @queue.pop
760              break if msg.nil?
761              yield msg
762            end
763          end
764        end
765
766        def run_server_bidi_shutdown_after_one_read
767          @server.start
768          recvd_rpc = @server.request_call
769          recvd_call = recvd_rpc.call
770          server_call = GRPC::ActiveCall.new(
771            recvd_call, noop, noop, INFINITE_FUTURE,
772            metadata_received: true, started: false)
773          expect(server_call.remote_read).to eq('first message')
774          @server.shutdown_and_notify(from_relative_time(0))
775          @server.close
776        end
777
778        it 'receives a grpc status code when writes to a bidi stream fail' do
779          # This test tries to trigger the case when a 'SEND_MESSAGE' op
780          # and subseqeunt 'SEND_CLOSE_FROM_CLIENT' op of a bidi stream fails.
781          # In this case, iteration through the response stream should result
782          # in a grpc status code, and the writer thread should not raise an
783          # exception.
784          server_thread = Thread.new do
785            run_server_bidi_shutdown_after_one_read
786          end
787          stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure)
788          request_queue = Queue.new
789          @sent_msgs = EnumeratorQueue.new(request_queue)
790          responses = get_responses(stub)
791          request_queue.push('first message')
792          # Now wait for the server to shut down.
793          server_thread.join
794          # Sanity check. This test is not interesting if
795          # Thread.abort_on_exception is not set.
796          expect(Thread.abort_on_exception).to be(true)
797          # An attempt to send a second message should fail now that the
798          # server is down.
799          request_queue.push('second message')
800          request_queue.push(nil)
801          expect { responses.next }.to raise_error(GRPC::BadStatus)
802        end
803
# Server half of a bidi test: starts @server, accepts a single call,
# sends initial metadata and the single message 'message', then shuts
# the server down (passing a zero relative-time deadline) and closes it.
def run_server_bidi_shutdown_after_one_write
  @server.start
  incoming_call = @server.request_call.call
  active_call = GRPC::ActiveCall.new(incoming_call, noop, noop,
                                     INFINITE_FUTURE,
                                     metadata_received: true,
                                     started: false)
  active_call.send_initial_metadata
  active_call.remote_send('message')
  @server.shutdown_and_notify(from_relative_time(0))
  @server.close
end
816
it 'receives a grpc status code when reading from a failed bidi call' do
  server_thd = Thread.new { run_server_bidi_shutdown_after_one_write }
  client = GRPC::ClientStub.new(@host, :this_channel_is_insecure)
  outgoing = Queue.new
  @sent_msgs = EnumeratorQueue.new(outgoing)
  response_stream = get_responses(client)
  # The one message the server writes before shutting down.
  expect(response_stream.next).to eq('message')
  # Block until the server thread has finished shutting the server down.
  server_thd.join
  expect { response_stream.next }.to raise_error(GRPC::BadStatus)
  # A nil sentinel lets the writer thread finish.
  outgoing.push(nil)
end
832      end
833    end
834
describe 'without a call operation' do
  # Invokes bidi_streamer directly on the stub, checks that the result is
  # an Enumerator and returns it.
  def get_responses(stub, deadline: nil, marshal_proc: noop)
    responses = stub.bidi_streamer(
      @method, @sent_msgs, marshal_proc, noop,
      metadata: @metadata, deadline: deadline
    )
    expect(responses).to be_a(Enumerator)
    responses
  end

  it_behaves_like 'bidi streaming'
end
845
846    describe 'via a call operation' do
# After every example, wait on the operation to make sure that
# waiting does not freeze.
after(:each) { @op.wait }
# Requests an operation view of the bidi call (return_op: true), stores it
# in @op, optionally runs start_call first, then executes the operation
# and returns the resulting Enumerator of responses.
def get_responses(stub, run_start_call_first: false, deadline: nil,
                  marshal_proc: noop)
  @op = stub.bidi_streamer(
    @method, @sent_msgs, marshal_proc, noop,
    return_op: true, metadata: @metadata, deadline: deadline
  )
  expect(@op).to be_a(GRPC::ActiveCall::Operation)
  @op.start_call if run_start_call_first
  responses = @op.execute
  expect(responses).to be_a(Enumerator)
  responses
end

it_behaves_like 'bidi streaming'
863
# Runs a full client-starts ping-pong bidi call against a server thread
# that sends the initial/trailing metadata stored in @server_initial_md
# and @server_trailing_md, and checks that every sent message is echoed.
# run_start_call_first is forwarded to get_responses.
def run_op_view_metadata_test(run_start_call_first)
  @server_initial_md = { 'sk1' => 'sv1', 'sk2' => 'sv2' }
  @server_trailing_md = { 'tk1' => 'tv1', 'tk2' => 'tv2' }
  server_thd = run_bidi_streamer_echo_ping_pong(
    @sent_msgs, @pass, true,
    expected_metadata: @metadata,
    server_initial_md: @server_initial_md,
    server_trailing_md: @server_trailing_md
  )
  client = GRPC::ClientStub.new(@host, :this_channel_is_insecure)
  responses = get_responses(client,
                            run_start_call_first: run_start_call_first)
  echoed = responses.map { |reply| reply }
  expect(echoed).to eq(@sent_msgs)
  server_thd.join
end
877
it 'can run start_call before executing the call' do
  run_op_view_metadata_test(true)
  # Block handed to the checker: tries to read through the response
  # stream, logging each response.
  log_all = proc do |responses|
    responses.each { |reply| GRPC.logger.info(reply) }
  end
  check_op_view_of_finished_client_call(
    @op, @server_initial_md, @server_trailing_md, &log_all)
end
885
it 'doesnt crash when op_view used after call has finished' do
  run_op_view_metadata_test(false)
  # Block handed to the checker: tries to read through the response
  # stream, logging each response.
  log_all = proc do |responses|
    responses.each { |reply| GRPC.logger.info(reply) }
  end
  check_op_view_of_finished_client_call(
    @op, @server_initial_md, @server_trailing_md, &log_all)
end
893
# Server half of the cancellation test: starts @server, accepts a single
# call, sends initial metadata and one message, then invokes
# wait_for_shutdown_ok_callback before shutting down and closing @server.
def run_server_bidi_expect_client_to_cancel(wait_for_shutdown_ok_callback)
  @server.start
  incoming_call = @server.request_call.call
  active_call = GRPC::ActiveCall.new(incoming_call, noop, noop,
                                     INFINITE_FUTURE,
                                     metadata_received: true,
                                     started: false)
  active_call.send_initial_metadata
  active_call.remote_send('server call received')
  wait_for_shutdown_ok_callback.call
  # The client is cancelling the call, so a clean shutdown
  # should be possible here.
  @server.shutdown_and_notify(nil)
  @server.close
end
909
it 'receives a grpc status code when reading from a cancelled bidi call' do
  # Aims to provoke a failure of a 'RECV_INITIAL_METADATA' and/or
  # 'RECV_MESSAGE' op. A read attempt might fail; even then, iterating
  # over the response stream should still produce a grpc status.
  shutdown_allowed = false
  shutdown_lock = Mutex.new
  shutdown_signal = ConditionVariable.new
  # Handed to the server thread, which calls it to block until
  # shutdown_allowed has been set further down.
  wait_for_shutdown_ok_callback = proc do
    shutdown_lock.synchronize do
      until shutdown_allowed
        shutdown_signal.wait(shutdown_lock)
      end
    end
  end
  server_thd = Thread.new do
    run_server_bidi_expect_client_to_cancel(wait_for_shutdown_ok_callback)
  end
  client = GRPC::ClientStub.new(@host, :this_channel_is_insecure)
  outgoing = Queue.new
  @sent_msgs = EnumeratorQueue.new(outgoing)
  response_stream = get_responses(client)
  expect(response_stream.next).to eq('server call received')
  @op.cancel
  expect { response_stream.next }.to raise_error(GRPC::Cancelled)
  # Release the server thread so that it can go on to shut down.
  shutdown_lock.synchronize do
    shutdown_allowed = true
    shutdown_signal.broadcast
  end
  server_thd.join
  # A nil sentinel lets the writer thread finish.
  outgoing.push(nil)
end
942    end
943  end
944
# Spawns (via wakey_thread) a server thread for a server-streaming call.
# The thread verifies the received metadata, reads one request and
# compares it with expected_input, sends every entry of replys, and
# finishes with the given status and trailing metadata.
def run_server_streamer(expected_input, replys, status,
                        expected_metadata: {},
                        server_initial_md: {},
                        server_trailing_md: {})
  md_to_check = expected_metadata.clone
  wakey_thread do |notifier|
    call = expect_server_to_be_invoked(notifier,
                                       metadata_to_send: server_initial_md)
    md_to_check.each do |key, value|
      expect(call.metadata[key.to_s]).to eq(value)
    end
    expect(call.remote_read).to eq(expected_input)
    replys.each do |reply|
      call.remote_send(reply)
    end
    details = status == @pass ? 'OK' : 'NOK'
    call.send_status(status, details, true, metadata: server_trailing_md)
    close_active_server_call(call)
  end
end
963
# Spawns (via wakey_thread) a server thread for a bidi call that first
# reads and checks every expected input, only then sends all replys,
# and finishes with the given status.
def run_bidi_streamer_handle_inputs_first(expected_inputs, replys,
                                          status)
  wakey_thread do |notifier|
    call = expect_server_to_be_invoked(notifier)
    expected_inputs.each do |input|
      expect(call.remote_read).to eq(input)
    end
    replys.each do |reply|
      call.remote_send(reply)
    end
    details = status == @pass ? 'OK' : 'NOK'
    call.send_status(status, details, true)
    close_active_server_call(call)
  end
end
974
# Spawns (via wakey_thread) a server thread for a bidi call that
# exchanges messages one at a time. For each expected input the server
# either reads it and then echoes it back (client_starts truthy), or
# sends it first and then expects to read the same value back.
# Received metadata is verified before the exchange; the call finishes
# with the given status and trailing metadata.
def run_bidi_streamer_echo_ping_pong(expected_inputs, status, client_starts,
                                     expected_metadata: {},
                                     server_initial_md: {},
                                     server_trailing_md: {})
  md_to_check = expected_metadata.clone
  wakey_thread do |notifier|
    call = expect_server_to_be_invoked(notifier,
                                       metadata_to_send: server_initial_md)
    md_to_check.each do |key, value|
      expect(call.metadata[key.to_s]).to eq(value)
    end
    expected_inputs.each do |msg|
      # Exactly one of the two sends below runs per message.
      call.remote_send(msg) unless client_starts
      expect(call.remote_read).to eq(msg)
      call.remote_send(msg) if client_starts
    end
    details = status == @pass ? 'OK' : 'NOK'
    call.send_status(status, details, true, metadata: server_trailing_md)
    close_active_server_call(call)
  end
end
1000
# Spawns (via wakey_thread) a server thread for a client-streaming call.
# The thread reads and checks every expected input, then verifies the
# received metadata, sends the single response resp, and finishes with
# the given status and trailing metadata.
def run_client_streamer(expected_inputs, resp, status,
                        expected_metadata: {},
                        server_initial_md: {},
                        server_trailing_md: {})
  md_to_check = expected_metadata.clone
  wakey_thread do |notifier|
    call = expect_server_to_be_invoked(notifier,
                                       metadata_to_send: server_initial_md)
    expected_inputs.each do |input|
      expect(call.remote_read).to eq(input)
    end
    md_to_check.each do |key, value|
      expect(call.metadata[key.to_s]).to eq(value)
    end
    call.remote_send(resp)
    details = status == @pass ? 'OK' : 'NOK'
    call.send_status(status, details, true, metadata: server_trailing_md)
    close_active_server_call(call)
  end
end
1019
# Spawns (via wakey_thread) a server thread for a server-streaming call
# whose client is expected to cancel. The thread reads one request and
# checks it, then tries to send every reply; a GRPC::Core::CallError
# during sending is tolerated. An OK status is sent and the call is
# closed in all cases.
def run_server_streamer_handle_client_cancellation(
  expected_input, replys)
  wakey_thread do |notifier|
    call = expect_server_to_be_invoked(notifier)
    expect(call.remote_read).to eq(expected_input)
    begin
      replys.each do |reply|
        call.remote_send(reply)
      end
    rescue GRPC::Core::CallError
      # Writing to the client may fail, which is acceptable: the client
      # is expected to cancel the call, and the moment at which the
      # server-side call starts failing is subject to a race.
      p 'remote_send failed (allowed because call expected to cancel)'
    ensure
      call.send_status(OK, 'OK', true)
      close_active_server_call(call)
    end
  end
end
1039
# Spawns (via wakey_thread) a server thread for a unary call. The thread
# reads one request and compares it with expected_input, verifies the
# received metadata, sends the single response resp, and finishes with
# the given status and trailing metadata.
def run_request_response(expected_input, resp, status,
                         expected_metadata: {},
                         server_initial_md: {},
                         server_trailing_md: {})
  md_to_check = expected_metadata.clone
  wakey_thread do |notifier|
    call = expect_server_to_be_invoked(notifier,
                                       metadata_to_send: server_initial_md)
    expect(call.remote_read).to eq(expected_input)
    md_to_check.each do |key, value|
      expect(call.metadata[key.to_s]).to eq(value)
    end
    call.remote_send(resp)
    details = status == @pass ? 'OK' : 'NOK'
    call.send_status(status, details, true, metadata: server_trailing_md)
    close_active_server_call(call)
  end
end
1058
# Creates @server with server credentials built from the test
# certificates and adds a port on '0.0.0.0:0' using those credentials.
# Returns whatever add_http2_port returns.
def create_secure_test_server
  # load_test_certs yields [ca, private key, cert chain]; the CA entry
  # is not used here.
  _ca, server_key, server_cert = load_test_certs
  key_cert_pairs = [{ private_key: server_key, cert_chain: server_cert }]
  secure_credentials = GRPC::Core::ServerCredentials.new(
    nil, key_cert_pairs, false)

  @server = new_core_server_for_testing(nil)
  @server.add_http2_port('0.0.0.0:0', secure_credentials)
end
1067
# Creates @server and adds an insecure port on '0.0.0.0:0'.
# Returns whatever add_http2_port returns.
def create_test_server
  server = new_core_server_for_testing(nil)
  @server = server
  server.add_http2_port('0.0.0.0:0', :this_port_is_insecure)
end
1072
# Starts @server, signals the notifier, accepts one incoming call, copies
# the received metadata onto it, sends metadata_to_send as initial
# metadata, and returns a GRPC::ActiveCall wrapping the accepted call.
def expect_server_to_be_invoked(notifier, metadata_to_send: nil)
  @server.start
  # Signal the notifier before blocking on request_call.
  notifier.notify(nil)
  incoming_rpc = @server.request_call
  incoming_call = incoming_rpc.call
  incoming_call.metadata = incoming_rpc.metadata
  incoming_call.run_batch(SEND_INITIAL_METADATA => metadata_to_send)
  GRPC::ActiveCall.new(
    incoming_call, noop, noop, INFINITE_FUTURE,
    metadata_received: true
  )
end
1083end
1084