# Copyright 2015 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

require 'spec_helper'

Thread.abort_on_exception = true

# Runs the given block on a new thread, handing it a fresh GRPC::Notifier.
# Blocks the caller until the notifier's wait returns, then returns the
# thread so that the caller can join it.
def wakey_thread(&blk)
  n = GRPC::Notifier.new
  t = Thread.new do
    blk.call(n)
  end
  t.abort_on_exception = true
  n.wait
  t
end

# Returns the contents of the test CA cert, server key and server cert
# (in that order) read from the 'testdata' directory.
def load_test_certs
  test_root = File.join(File.dirname(File.dirname(__FILE__)), 'testdata')
  files = ['ca.pem', 'server1.key', 'server1.pem']
  # File.read closes the file handle, unlike File.open(...).read
  files.map { |f| File.read(File.join(test_root, f)) }
end

include GRPC::Core::StatusCodes
include GRPC::Core::TimeConsts
include GRPC::Core::CallOps

# check that methods on a finished/closed call don't crash
#
# The given block receives the result of op_view.execute and should
# attempt to read from it.
def check_op_view_of_finished_client_call(op_view,
                                          expected_metadata,
                                          expected_trailing_metadata)
  # use read_response_stream to try to iterate through
  # possible response stream
  fail('need something to attempt reads') unless block_given?
  expect do
    resp = op_view.execute
    yield resp
  end.to raise_error(GRPC::Core::CallError)

  expect { op_view.start_call }.to raise_error(RuntimeError)

  sanity_check_values_of_accessors(op_view,
                                   expected_metadata,
                                   expected_trailing_metadata)

  expect do
    op_view.wait
    op_view.cancel
    op_view.write_flag = 1
  end.to_not raise_error
end

# Checks the accessors of an operation view against an OK status and the
# expected initial and trailing metadata.
def sanity_check_values_of_accessors(op_view,
                                     expected_metadata,
                                     expected_trailing_metadata)
  expected_status = Struct::Status.new
  expected_status.code = 0
  expected_status.details = 'OK'
  expected_status.metadata = expected_trailing_metadata

  expect(op_view.status).to eq(expected_status)
  expect(op_view.metadata).to eq(expected_metadata)
  expect(op_view.trailing_metadata).to eq(expected_trailing_metadata)

  expect(op_view.cancelled?).to be(false)
  expect(op_view.write_flag).to be(nil)

  # The deadline attribute of a call can be either
  # a GRPC::Core::TimeSpec or a Time, which are mutually exclusive.
  # TODO: fix so that the accessor always returns the same type.
  expect(op_view.deadline.is_a?(GRPC::Core::TimeSpec) ||
         op_view.deadline.is_a?(Time)).to be(true)
end

# Marks both the input and output streams of a server-side active call
# as done, via the call's non-public setters.
def close_active_server_call(active_server_call)
  active_server_call.send(:set_input_stream_done)
  active_server_call.send(:set_output_stream_done)
end

describe 'ClientStub' do  # rubocop:disable Metrics/BlockLength
  let(:noop) { proc { |x| x } }

  before(:each) do
    Thread.abort_on_exception = true
    @server = nil
    @method = 'an_rpc_method'
    @pass = OK
    @fail = INTERNAL
    @metadata = { k1: 'v1', k2: 'v2' }
  end

  after(:each) do
    unless @server.nil?
      @server.shutdown_and_notify(from_relative_time(2))
      @server.close
    end
  end

  describe '#new' do
    let(:fake_host) { 'localhost:0' }
    it 'can be created from a host and args' do
      opts = { channel_args: { a_channel_arg: 'an_arg' } }
      blk = proc do
        GRPC::ClientStub.new(fake_host, :this_channel_is_insecure, **opts)
      end
      expect(&blk).not_to raise_error
    end

    it 'can be created with an channel override' do
      opts = {
        channel_args: { a_channel_arg: 'an_arg' },
        channel_override: @ch
      }
      blk = proc do
        GRPC::ClientStub.new(fake_host, :this_channel_is_insecure, **opts)
      end
      expect(&blk).not_to raise_error
    end

    it 'cannot be created with a bad channel override' do
      blk = proc do
        opts = {
          channel_args: { a_channel_arg: 'an_arg' },
          channel_override: Object.new
        }
        GRPC::ClientStub.new(fake_host, :this_channel_is_insecure, **opts)
      end
      expect(&blk).to raise_error
    end

    it 'cannot be created with bad credentials' do
      blk = proc do
        opts = { channel_args: { a_channel_arg: 'an_arg' } }
        GRPC::ClientStub.new(fake_host, Object.new, **opts)
      end
      expect(&blk).to raise_error
    end

    it 'can be created with test test credentials' do
      certs = load_test_certs
      blk = proc do
        opts = {
          channel_args: {
            GRPC::Core::Channel::SSL_TARGET => 'foo.test.google.fr',
            a_channel_arg: 'an_arg'
          }
        }
        creds = GRPC::Core::ChannelCredentials.new(certs[0], nil, nil)
        GRPC::ClientStub.new(fake_host, creds, **opts)
      end
      expect(&blk).to_not raise_error
    end
  end

  describe '#request_response', request_response: true do
    before(:each) do
      @sent_msg, @resp = 'a_msg', 'a_reply'
    end

    shared_examples 'request response' do
      it 'should send a request to/receive a reply from a server' do
        server_port = create_test_server
        th = run_request_response(@sent_msg, @resp, @pass)
        stub = GRPC::ClientStub.new("localhost:#{server_port}",
                                    :this_channel_is_insecure)
        expect(get_response(stub)).to eq(@resp)
        th.join
      end

      # Runs a unary call that sends md as the client metadata and has the
      # server verify that it received each entry.
      def metadata_test(md)
        server_port = create_test_server
        host = "localhost:#{server_port}"
        th = run_request_response(@sent_msg, @resp, @pass,
                                  expected_metadata: md)
        stub = GRPC::ClientStub.new(host, :this_channel_is_insecure)
        @metadata = md
        expect(get_response(stub)).to eq(@resp)
        th.join
      end

      it 'should send metadata to the server ok' do
        metadata_test(k1: 'v1', k2: 'v2')
      end

      # these tests mostly try to exercise when md might be allocated
      # instead of inlined
      it 'should send metadata with multiple large md to the server ok' do
        # %w splits on whitespace only; quotes and commas would become
        # part of the values, so the words are listed bare.
        val_array = %w(
          00000000000000000000000000000000000000000000000000000000000000
          11111111111111111111111111111111111111111111111111111111111111
          22222222222222222222222222222222222222222222222222222222222222
        )
        md = {
          k1: val_array,
          k2: 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa',
          k3: 'bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb',
          k4: 'cccccccccccccccccccccccccccccccccccccccccccccccccccccccccc',
          keeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeey5: 'v5',
          'k66666666666666666666666666666666666666666666666666666' => 'v6',
          'k77777777777777777777777777777777777777777777777777777' => 'v7',
          'k88888888888888888888888888888888888888888888888888888' => 'v8'
        }
        metadata_test(md)
      end

      it 'should send a request when configured using an override channel' do
        server_port = create_test_server
        alt_host = "localhost:#{server_port}"
        th = run_request_response(@sent_msg, @resp, @pass)
        ch = GRPC::Core::Channel.new(alt_host, nil, :this_channel_is_insecure)
        stub = GRPC::ClientStub.new('ignored-host',
                                    :this_channel_is_insecure,
                                    channel_override: ch)
        expect(get_response(stub)).to eq(@resp)
        th.join
      end

      it 'should raise an error if the status is not OK' do
        server_port = create_test_server
        host = "localhost:#{server_port}"
        th = run_request_response(@sent_msg, @resp, @fail)
        stub = GRPC::ClientStub.new(host, :this_channel_is_insecure)
        blk = proc { get_response(stub) }
        expect(&blk).to raise_error(GRPC::BadStatus)
        th.join
      end

      it 'should receive UNAVAILABLE if call credentials plugin fails' do
        server_port = create_secure_test_server
        server_started_notifier = GRPC::Notifier.new
        th = Thread.new do
          @server.start
          server_started_notifier.notify(nil)
          # Poll on the server so that the client connection can proceed.
          # We don't expect the server to actually accept a call though.
          expect { @server.request_call }.to raise_error(GRPC::Core::CallError)
        end
        server_started_notifier.wait

        certs = load_test_certs
        secure_channel_creds = GRPC::Core::ChannelCredentials.new(
          certs[0], nil, nil)
        secure_stub_opts = {
          channel_args: {
            GRPC::Core::Channel::SSL_TARGET => 'foo.test.google.fr'
          }
        }
        stub = GRPC::ClientStub.new("localhost:#{server_port}",
                                    secure_channel_creds, **secure_stub_opts)

        error_message = 'Failing call credentials callback'
        failing_auth = proc do
          fail error_message
        end
        creds = GRPC::Core::CallCredentials.new(failing_auth)

        unavailable_error_occurred = false
        begin
          get_response(stub, credentials: creds)
        rescue GRPC::Unavailable => e
          unavailable_error_occurred = true
          expect(e.details.include?(error_message)).to be true
        end
        expect(unavailable_error_occurred).to eq(true)

        @server.shutdown_and_notify(Time.now + 3)
        th.join
        @server.close
      end

      it 'should raise ArgumentError if metadata contains invalid values' do
        @metadata.merge!(k3: 3)
        server_port = create_test_server
        host = "localhost:#{server_port}"
        stub = GRPC::ClientStub.new(host, :this_channel_is_insecure)
        expect do
          get_response(stub)
        end.to raise_error(ArgumentError,
                           /Header values must be of type string or array/)
      end
    end

    describe 'without a call operation' do
      def get_response(stub, credentials: nil)
        GRPC.logger.info(credentials.inspect)
        stub.request_response(@method, @sent_msg, noop, noop,
                              metadata: @metadata,
                              credentials: credentials)
      end

      it_behaves_like 'request response'
    end

    describe 'via a call operation' do
      after(:each) do
        # make sure op.wait doesn't freeze, even if there's a bad status
        @op.wait
      end

      # Issues the call with return_op: true, stores the operation in @op
      # and returns the result of executing it.
      def get_response(stub, run_start_call_first: false, credentials: nil)
        @op = stub.request_response(@method, @sent_msg, noop, noop,
                                    return_op: true,
                                    metadata: @metadata,
                                    deadline: from_relative_time(2),
                                    credentials: credentials)
        expect(@op).to be_a(GRPC::ActiveCall::Operation)
        @op.start_call if run_start_call_first
        @op.execute
      end

      it_behaves_like 'request response'

      def run_op_view_metadata_test(run_start_call_first)
        server_port = create_test_server
        host = "localhost:#{server_port}"

        @server_initial_md = { 'sk1' => 'sv1', 'sk2' => 'sv2' }
        @server_trailing_md = { 'tk1' => 'tv1', 'tk2' => 'tv2' }
        th = run_request_response(
          @sent_msg, @resp, @pass,
          expected_metadata: @metadata,
          server_initial_md: @server_initial_md,
          server_trailing_md: @server_trailing_md)
        stub = GRPC::ClientStub.new(host, :this_channel_is_insecure)
        expect(
          get_response(stub,
                       run_start_call_first: run_start_call_first)).to eq(@resp)
        th.join
      end

      it 'sends metadata to the server ok when running start_call first' do
        run_op_view_metadata_test(true)
        check_op_view_of_finished_client_call(
          @op, @server_initial_md, @server_trailing_md
        ) { |r| GRPC.logger.info(r) }
      end

      it 'does not crash when used after the call has been finished' do
        run_op_view_metadata_test(false)
        check_op_view_of_finished_client_call(
          @op, @server_initial_md, @server_trailing_md
        ) { |r| GRPC.logger.info(r) }
      end
    end
  end

  describe '#client_streamer', client_streamer: true do
    before(:each) do
      Thread.abort_on_exception = true
      server_port = create_test_server
      host = "localhost:#{server_port}"
      @stub = GRPC::ClientStub.new(host, :this_channel_is_insecure)
      @sent_msgs = Array.new(3) { |i| 'msg_' + (i + 1).to_s }
      @resp = 'a_reply'
    end

    shared_examples 'client streaming' do
      it 'should send requests to/receive a reply from a server' do
        th = run_client_streamer(@sent_msgs, @resp, @pass)
        expect(get_response(@stub)).to eq(@resp)
        th.join
      end

      it 'should send metadata to the server ok' do
        th = run_client_streamer(@sent_msgs, @resp, @pass,
                                 expected_metadata: @metadata)
        expect(get_response(@stub)).to eq(@resp)
        th.join
      end

      it 'should raise an error if the status is not ok' do
        th = run_client_streamer(@sent_msgs, @resp, @fail)
        blk = proc { get_response(@stub) }
        expect(&blk).to raise_error(GRPC::BadStatus)
        th.join
      end

      it 'should raise ArgumentError if metadata contains invalid values' do
        @metadata.merge!(k3: 3)
        expect do
          get_response(@stub)
        end.to raise_error(ArgumentError,
                           /Header values must be of type string or array/)
      end
    end

    describe 'without a call operation' do
      def get_response(stub)
        stub.client_streamer(@method, @sent_msgs, noop, noop,
                             metadata: @metadata)
      end

      it_behaves_like 'client streaming'
    end

    describe 'via a call operation' do
      after(:each) do
        # make sure op.wait doesn't freeze, even if there's a bad status
        @op.wait
      end

      # Issues the call with return_op: true, stores the operation in @op
      # and returns the result of executing it.
      def get_response(stub, run_start_call_first: false)
        @op = stub.client_streamer(@method, @sent_msgs, noop, noop,
                                   return_op: true, metadata: @metadata)
        expect(@op).to be_a(GRPC::ActiveCall::Operation)
        @op.start_call if run_start_call_first
        @op.execute
      end

      it_behaves_like 'client streaming'

      def run_op_view_metadata_test(run_start_call_first)
        @server_initial_md = { 'sk1' => 'sv1', 'sk2' => 'sv2' }
        @server_trailing_md = { 'tk1' => 'tv1', 'tk2' => 'tv2' }
        th = run_client_streamer(
          @sent_msgs, @resp, @pass,
          expected_metadata: @metadata,
          server_initial_md: @server_initial_md,
          server_trailing_md: @server_trailing_md)
        expect(
          get_response(@stub,
                       run_start_call_first: run_start_call_first)).to eq(@resp)
        th.join
      end

      it 'sends metadata to the server ok when running start_call first' do
        run_op_view_metadata_test(true)
        check_op_view_of_finished_client_call(
          @op, @server_initial_md, @server_trailing_md
        ) { |r| GRPC.logger.info(r) }
      end

      it 'does not crash when used after the call has been finished' do
        run_op_view_metadata_test(false)
        check_op_view_of_finished_client_call(
          @op, @server_initial_md, @server_trailing_md
        ) { |r| GRPC.logger.info(r) }
      end
    end
  end

  describe '#server_streamer', server_streamer: true do
    before(:each) do
      @sent_msg = 'a_msg'
      @replys = Array.new(3) { |i| 'reply_' + (i + 1).to_s }
    end

    shared_examples 'server streaming' do
      it 'should send a request to/receive replies from a server' do
        server_port = create_test_server
        host = "localhost:#{server_port}"
        th = run_server_streamer(@sent_msg, @replys, @pass)
        stub = GRPC::ClientStub.new(host, :this_channel_is_insecure)
        expect(get_responses(stub).collect { |r| r }).to eq(@replys)
        th.join
      end

      it 'should raise an error if the status is not ok' do
        server_port = create_test_server
        host = "localhost:#{server_port}"
        th = run_server_streamer(@sent_msg, @replys, @fail)
        stub = GRPC::ClientStub.new(host, :this_channel_is_insecure)
        e = get_responses(stub)
        expect { e.collect { |r| r } }.to raise_error(GRPC::BadStatus)
        th.join
      end

      it 'should send metadata to the server ok' do
        server_port = create_test_server
        host = "localhost:#{server_port}"
        th = run_server_streamer(@sent_msg, @replys, @fail,
                                 expected_metadata: { k1: 'v1', k2: 'v2' })
        stub = GRPC::ClientStub.new(host, :this_channel_is_insecure)
        e = get_responses(stub)
        expect { e.collect { |r| r } }.to raise_error(GRPC::BadStatus)
        th.join
      end

      it 'should raise ArgumentError if metadata contains invalid values' do
        @metadata.merge!(k3: 3)
        server_port = create_test_server
        host = "localhost:#{server_port}"
        stub = GRPC::ClientStub.new(host, :this_channel_is_insecure)
        expect do
          get_responses(stub).collect { |r| r }
        end.to raise_error(ArgumentError,
                           /Header values must be of type string or array/)
      end

      it 'the call terminates when there is an unmarshalling error' do
        server_port = create_test_server
        host = "localhost:#{server_port}"
        th = run_server_streamer_handle_client_cancellation(
          @sent_msg, @replys)
        stub = GRPC::ClientStub.new(host, :this_channel_is_insecure)

        unmarshal = proc { fail(ArgumentError, 'test unmarshalling error') }
        expect do
          get_responses(stub, unmarshal: unmarshal).collect { |r| r }
        end.to raise_error(ArgumentError, 'test unmarshalling error')
        th.join
      end
    end

    describe 'without a call operation' do
      def get_responses(stub, unmarshal: noop)
        e = stub.server_streamer(@method, @sent_msg, noop, unmarshal,
                                 metadata: @metadata)
        expect(e).to be_a(Enumerator)
        e
      end

      it_behaves_like 'server streaming'
    end

    describe 'via a call operation' do
      after(:each) do
        @op.wait # make sure wait doesn't freeze
      end

      # Issues the call with return_op: true, stores the operation in @op
      # and returns the response Enumerator obtained by executing it.
      def get_responses(stub, run_start_call_first: false, unmarshal: noop)
        @op = stub.server_streamer(@method, @sent_msg, noop, unmarshal,
                                   return_op: true,
                                   metadata: @metadata)
        expect(@op).to be_a(GRPC::ActiveCall::Operation)
        @op.start_call if run_start_call_first
        e = @op.execute
        expect(e).to be_a(Enumerator)
        e
      end

      it_behaves_like 'server streaming'

      def run_op_view_metadata_test(run_start_call_first)
        server_port = create_test_server
        host = "localhost:#{server_port}"
        @server_initial_md = { 'sk1' => 'sv1', 'sk2' => 'sv2' }
        @server_trailing_md = { 'tk1' => 'tv1', 'tk2' => 'tv2' }
        th = run_server_streamer(
          @sent_msg, @replys, @pass,
          expected_metadata: @metadata,
          server_initial_md: @server_initial_md,
          server_trailing_md: @server_trailing_md)
        stub = GRPC::ClientStub.new(host, :this_channel_is_insecure)
        e = get_responses(stub, run_start_call_first: run_start_call_first)
        expect(e.collect { |r| r }).to eq(@replys)
        th.join
      end

      it 'should send metadata to the server ok when start_call is run first' do
        run_op_view_metadata_test(true)
        check_op_view_of_finished_client_call(
          @op, @server_initial_md, @server_trailing_md) do |responses|
          responses.each { |r| GRPC.logger.info(r) }
        end
      end

      it 'does not crash when used after the call has been finished' do
        run_op_view_metadata_test(false)
        check_op_view_of_finished_client_call(
          @op, @server_initial_md, @server_trailing_md) do |responses|
          responses.each { |r| GRPC.logger.info(r) }
        end
      end

      it 'raises GRPC::Cancelled after the call has been cancelled' do
        server_port = create_test_server
        host = "localhost:#{server_port}"
        th = run_server_streamer_handle_client_cancellation(
          @sent_msg, @replys)
        stub = GRPC::ClientStub.new(host, :this_channel_is_insecure)
        resp = get_responses(stub, run_start_call_first: false)
        expect(resp.next).to eq('reply_1')
        @op.cancel
        expect { resp.next }.to raise_error(GRPC::Cancelled)
        th.join
      end
    end
  end

  describe '#bidi_streamer', bidi: true do
    before(:each) do
      @sent_msgs = Array.new(3) { |i| 'msg_' + (i + 1).to_s }
      @replys = Array.new(3) { |i| 'reply_' + (i + 1).to_s }
      server_port = create_test_server
      @host = "localhost:#{server_port}"
    end

    shared_examples 'bidi streaming' do
      it 'supports sending all the requests first' do
        th = run_bidi_streamer_handle_inputs_first(@sent_msgs, @replys,
                                                   @pass)
        stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure)
        e = get_responses(stub)
        expect(e.collect { |r| r }).to eq(@replys)
        th.join
      end

      it 'supports client-initiated ping pong' do
        th = run_bidi_streamer_echo_ping_pong(@sent_msgs, @pass, true)
        stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure)
        e = get_responses(stub)
        expect(e.collect { |r| r }).to eq(@sent_msgs)
        th.join
      end

      it 'supports a server-initiated ping pong' do
        th = run_bidi_streamer_echo_ping_pong(@sent_msgs, @pass, false)
        stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure)
        e = get_responses(stub)
        expect(e.collect { |r| r }).to eq(@sent_msgs)
        th.join
      end

      it 'should raise an error if the status is not ok' do
        th = run_bidi_streamer_echo_ping_pong(@sent_msgs, @fail, false)
        stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure)
        e = get_responses(stub)
        expect { e.collect { |r| r } }.to raise_error(GRPC::BadStatus)
        th.join
      end

      it 'should raise ArgumentError if metadata contains invalid values' do
        @metadata.merge!(k3: 3)
        stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure)
        expect do
          get_responses(stub).collect { |r| r }
        end.to raise_error(ArgumentError,
                           /Header values must be of type string or array/)
      end

      it 'terminates if the call fails to start' do
        # don't start the server
        stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure)
        expect do
          get_responses(stub, deadline: from_relative_time(0)).collect { |r| r }
        end.to raise_error(GRPC::BadStatus)
      end

      it 'should send metadata to the server ok' do
        th = run_bidi_streamer_echo_ping_pong(@sent_msgs, @pass, true,
                                              expected_metadata: @metadata)
        stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure)
        e = get_responses(stub)
        expect(e.collect { |r| r }).to eq(@sent_msgs)
        th.join
      end

      # Prompted by grpc/github #10526
      describe 'surfacing of errors when sending requests' do
        def run_server_bidi_send_one_then_read_indefinitely
          @server.start
          recvd_rpc = @server.request_call
          recvd_call = recvd_rpc.call
          server_call = GRPC::ActiveCall.new(
            recvd_call, noop, noop, INFINITE_FUTURE,
            metadata_received: true, started: false)
          server_call.send_initial_metadata
          server_call.remote_send('server response')
          loop do
            m = server_call.remote_read
            break if m.nil?
          end
          # can't fail since initial metadata already sent
          server_call.send_status(@pass, 'OK', true)
          close_active_server_call(server_call)
        end

        # Reads the first response, pushes requests_to_push onto
        # request_queue, drains the response stream and checks that a
        # GRPC::Unknown whose message includes expected_description was
        # raised along the way.
        def verify_error_from_write_thread(stub, requests_to_push,
                                           request_queue, expected_description)
          # TODO: an improvement might be to raise the original exception from
          # bidi call write loops instead of only cancelling the call
          failing_marshal_proc = proc do |req|
            fail req if req.is_a?(StandardError)
            req
          end
          exception = nil
          begin
            responses = get_responses(stub, marshal_proc: failing_marshal_proc)
            first_response = responses.next
            expect(first_response).to eq('server response')
            requests_to_push.each { |req| request_queue.push(req) }
            responses.collect { |r| r }
          rescue GRPC::Unknown => e
            exception = e
          end
          # Fail with a clear message, rather than a NoMethodError on nil,
          # when the call did not raise at all.
          expect(exception).to be_a(GRPC::Unknown)
          expect(exception.message.include?(expected_description)).to be(true)
        end

        # Provides an Enumerable view of a Queue
        class BidiErrorTestingEnumerateForeverQueue
          def initialize(queue)
            @queue = queue
          end

          def each
            loop do
              msg = @queue.pop
              yield msg
            end
          end
        end

        def run_error_in_client_request_stream_test(requests_to_push,
                                                    expected_error_message)
          # start a server that waits on a read indefinitely - it should
          # see a cancellation and be able to break out
          th = Thread.new { run_server_bidi_send_one_then_read_indefinitely }
          stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure)

          request_queue = Queue.new
          @sent_msgs = BidiErrorTestingEnumerateForeverQueue.new(request_queue)

          verify_error_from_write_thread(stub,
                                         requests_to_push,
                                         request_queue,
                                         expected_error_message)
          # the write loop error should cancel the call and end the
          # server's request stream
          th.join
        end

        it 'non-GRPC errors from the write loop surface when raised ' \
          'at the start of a request stream' do
          expected_error_message = 'expect error on first request'
          requests_to_push = [StandardError.new(expected_error_message)]
          run_error_in_client_request_stream_test(requests_to_push,
                                                  expected_error_message)
        end

        it 'non-GRPC errors from the write loop surface when raised ' \
          'during the middle of a request stream' do
          expected_error_message = 'expect error on last request'
          requests_to_push = %w( one two )
          requests_to_push << StandardError.new(expected_error_message)
          run_error_in_client_request_stream_test(requests_to_push,
                                                  expected_error_message)
        end
      end

      # Prompted by grpc/github #14853
      describe 'client-side error handling on bidi streams' do
        # Enumerable view of a Queue that stops at the first nil popped.
        class EnumeratorQueue
          def initialize(queue)
            @queue = queue
          end

          def each
            loop do
              msg = @queue.pop
              break if msg.nil?
              yield msg
            end
          end
        end

        def run_server_bidi_shutdown_after_one_read
          @server.start
          recvd_rpc = @server.request_call
          recvd_call = recvd_rpc.call
          server_call = GRPC::ActiveCall.new(
            recvd_call, noop, noop, INFINITE_FUTURE,
            metadata_received: true, started: false)
          expect(server_call.remote_read).to eq('first message')
          @server.shutdown_and_notify(from_relative_time(0))
          @server.close
        end

        it 'receives a grpc status code when writes to a bidi stream fail' do
          # This test tries to trigger the case when a 'SEND_MESSAGE' op
          # and subsequent 'SEND_CLOSE_FROM_CLIENT' op of a bidi stream fails.
          # In this case, iteration through the response stream should result
          # in a grpc status code, and the writer thread should not raise an
          # exception.
          server_thread = Thread.new do
            run_server_bidi_shutdown_after_one_read
          end
          stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure)
          request_queue = Queue.new
          @sent_msgs = EnumeratorQueue.new(request_queue)
          responses = get_responses(stub)
          request_queue.push('first message')
          # Now wait for the server to shut down.
          server_thread.join
          # Sanity check. This test is not interesting if
          # Thread.abort_on_exception is not set.
          expect(Thread.abort_on_exception).to be(true)
          # An attempt to send a second message should fail now that the
          # server is down.
          request_queue.push('second message')
          request_queue.push(nil)
          expect { responses.next }.to raise_error(GRPC::BadStatus)
        end

        def run_server_bidi_shutdown_after_one_write
          @server.start
          recvd_rpc = @server.request_call
          recvd_call = recvd_rpc.call
          server_call = GRPC::ActiveCall.new(
            recvd_call, noop, noop, INFINITE_FUTURE,
            metadata_received: true, started: false)
          server_call.send_initial_metadata
          server_call.remote_send('message')
          @server.shutdown_and_notify(from_relative_time(0))
          @server.close
        end

        it 'receives a grpc status code when reading from a failed bidi call' do
          server_thread = Thread.new do
            run_server_bidi_shutdown_after_one_write
          end
          stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure)
          request_queue = Queue.new
          @sent_msgs = EnumeratorQueue.new(request_queue)
          responses = get_responses(stub)
          expect(responses.next).to eq('message')
          # Wait for the server to shut down
          server_thread.join
          expect { responses.next }.to raise_error(GRPC::BadStatus)
          # Push a sentinel to allow the writer thread to finish
          request_queue.push(nil)
        end
      end
    end

    describe 'without a call operation' do
      def get_responses(stub, deadline: nil, marshal_proc: noop)
        e = stub.bidi_streamer(@method, @sent_msgs, marshal_proc, noop,
                               metadata: @metadata, deadline: deadline)
        expect(e).to be_a(Enumerator)
        e
      end

      it_behaves_like 'bidi streaming'
    end

    describe 'via a call operation' do
      after(:each) do
        @op.wait # make sure wait doesn't freeze
      end

      # Issues the call with return_op: true, stores the operation in @op
      # and returns the response Enumerator obtained by executing it.
      def get_responses(stub, run_start_call_first: false, deadline: nil,
                        marshal_proc: noop)
        @op = stub.bidi_streamer(@method, @sent_msgs, marshal_proc, noop,
                                 return_op: true,
                                 metadata: @metadata, deadline: deadline)
        expect(@op).to be_a(GRPC::ActiveCall::Operation)
        @op.start_call if run_start_call_first
        e = @op.execute
        expect(e).to be_a(Enumerator)
        e
      end

      it_behaves_like 'bidi streaming'

      def run_op_view_metadata_test(run_start_call_first)
        @server_initial_md = { 'sk1' => 'sv1', 'sk2' => 'sv2' }
        @server_trailing_md = { 'tk1' => 'tv1', 'tk2' => 'tv2' }
        th = run_bidi_streamer_echo_ping_pong(
          @sent_msgs, @pass, true,
          expected_metadata: @metadata,
          server_initial_md: @server_initial_md,
          server_trailing_md: @server_trailing_md)
        stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure)
        e = get_responses(stub, run_start_call_first: run_start_call_first)
        expect(e.collect { |r| r }).to eq(@sent_msgs)
        th.join
      end

      it 'can run start_call before executing the call' do
        run_op_view_metadata_test(true)
        check_op_view_of_finished_client_call(
          @op, @server_initial_md, @server_trailing_md) do |responses|
          responses.each { |r| GRPC.logger.info(r) }
        end
      end

      it 'doesnt crash when op_view used after call has finished' do
        run_op_view_metadata_test(false)
        check_op_view_of_finished_client_call(
          @op, @server_initial_md, @server_trailing_md) do |responses|
          responses.each { |r| GRPC.logger.info(r) }
        end
      end

      def run_server_bidi_expect_client_to_cancel(wait_for_shutdown_ok_callback)
        @server.start
        recvd_rpc = @server.request_call
        recvd_call = recvd_rpc.call
        server_call = GRPC::ActiveCall.new(
          recvd_call, noop, noop, INFINITE_FUTURE,
          metadata_received: true, started: false)
        server_call.send_initial_metadata
        server_call.remote_send('server call received')
        wait_for_shutdown_ok_callback.call
        # since the client is cancelling the call,
        # we should be able to shut down cleanly
        @server.shutdown_and_notify(nil)
        @server.close
      end

      it 'receives a grpc status code when reading from a cancelled bidi call' do
        # This test tries to trigger a 'RECV_INITIAL_METADATA' and/or
        # 'RECV_MESSAGE' op failure.
        # An attempt to read a message might fail; in that case, iteration
        # through the response stream should still result in a grpc status.
        server_can_shutdown = false
        server_can_shutdown_mu = Mutex.new
        server_can_shutdown_cv = ConditionVariable.new
        wait_for_shutdown_ok_callback = proc do
          server_can_shutdown_mu.synchronize do
            server_can_shutdown_cv.wait(server_can_shutdown_mu) until server_can_shutdown
          end
        end
        server_thread = Thread.new do
          run_server_bidi_expect_client_to_cancel(wait_for_shutdown_ok_callback)
        end
        stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure)
        request_queue = Queue.new
        @sent_msgs = EnumeratorQueue.new(request_queue)
        responses = get_responses(stub)
        expect(responses.next).to eq('server call received')
        @op.cancel
        expect { responses.next }.to raise_error(GRPC::Cancelled)
        # Now let the server proceed to shut down.
        server_can_shutdown_mu.synchronize do
          server_can_shutdown = true
          server_can_shutdown_cv.broadcast
        end
        server_thread.join
        # Push a sentinel to allow the writer thread to finish
        request_queue.push(nil)
      end
    end
  end

  # Server side of a server-streaming call: checks the received metadata
  # and request, sends every reply, then sends the given status.
  # Returns the server thread.
  def run_server_streamer(expected_input, replys, status,
                          expected_metadata: {},
                          server_initial_md: {},
                          server_trailing_md: {})
    wanted_metadata = expected_metadata.clone
    wakey_thread do |notifier|
      c = expect_server_to_be_invoked(
        notifier, metadata_to_send: server_initial_md)
      wanted_metadata.each do |k, v|
        expect(c.metadata[k.to_s]).to eq(v)
      end
      expect(c.remote_read).to eq(expected_input)
      replys.each { |r| c.remote_send(r) }
      c.send_status(status, status == @pass ? 'OK' : 'NOK', true,
                    metadata: server_trailing_md)
      close_active_server_call(c)
    end
  end

  # Server side of a bidi call that reads all expected inputs before
  # sending any reply. Returns the server thread.
  def run_bidi_streamer_handle_inputs_first(expected_inputs, replys,
                                            status)
    wakey_thread do |notifier|
      c = expect_server_to_be_invoked(notifier)
      expected_inputs.each { |i| expect(c.remote_read).to eq(i) }
      replys.each { |r| c.remote_send(r) }
      c.send_status(status, status == @pass ? 'OK' : 'NOK', true)
      close_active_server_call(c)
    end
  end

  # Server side of a bidi call that echoes each expected input. When
  # client_starts is true the server reads before it writes; otherwise it
  # writes first. Returns the server thread.
  def run_bidi_streamer_echo_ping_pong(expected_inputs, status, client_starts,
                                       expected_metadata: {},
                                       server_initial_md: {},
                                       server_trailing_md: {})
    wanted_metadata = expected_metadata.clone
    wakey_thread do |notifier|
      c = expect_server_to_be_invoked(
        notifier, metadata_to_send: server_initial_md)
      wanted_metadata.each do |k, v|
        expect(c.metadata[k.to_s]).to eq(v)
      end
      expected_inputs.each do |i|
        if client_starts
          expect(c.remote_read).to eq(i)
          c.remote_send(i)
        else
          c.remote_send(i)
          expect(c.remote_read).to eq(i)
        end
      end
      c.send_status(status, status == @pass ? 'OK' : 'NOK', true,
                    metadata: server_trailing_md)
      close_active_server_call(c)
    end
  end

  # Server side of a client-streaming call: reads every expected input,
  # checks the received metadata, sends one response and then the given
  # status. Returns the server thread.
  def run_client_streamer(expected_inputs, resp, status,
                          expected_metadata: {},
                          server_initial_md: {},
                          server_trailing_md: {})
    wanted_metadata = expected_metadata.clone
    wakey_thread do |notifier|
      c = expect_server_to_be_invoked(
        notifier, metadata_to_send: server_initial_md)
      expected_inputs.each { |i| expect(c.remote_read).to eq(i) }
      wanted_metadata.each do |k, v|
        expect(c.metadata[k.to_s]).to eq(v)
      end
      c.remote_send(resp)
      c.send_status(status, status == @pass ? 'OK' : 'NOK', true,
                    metadata: server_trailing_md)
      close_active_server_call(c)
    end
  end

  # Server side of a server-streaming call whose client is expected to
  # stop reading early; failed writes are tolerated.
  # Returns the server thread.
  def run_server_streamer_handle_client_cancellation(
    expected_input, replys)
    wakey_thread do |notifier|
      c = expect_server_to_be_invoked(notifier)
      expect(c.remote_read).to eq(expected_input)
      begin
        replys.each { |r| c.remote_send(r) }
      rescue GRPC::Core::CallError
        # An attempt to write to the client might fail. This is ok
        # because the client call is expected to cancel the call,
        # and there is a race as for when the server-side call will
        # start to fail.
        p 'remote_send failed (allowed because call expected to cancel)'
      ensure
        c.send_status(OK, 'OK', true)
        close_active_server_call(c)
      end
    end
  end

  # Server side of a unary call: checks the request and the received
  # metadata, sends the response and then the given status.
  # Returns the server thread.
  def run_request_response(expected_input, resp, status,
                           expected_metadata: {},
                           server_initial_md: {},
                           server_trailing_md: {})
    wanted_metadata = expected_metadata.clone
    wakey_thread do |notifier|
      c = expect_server_to_be_invoked(
        notifier, metadata_to_send: server_initial_md)
      expect(c.remote_read).to eq(expected_input)
      wanted_metadata.each do |k, v|
        expect(c.metadata[k.to_s]).to eq(v)
      end
      c.remote_send(resp)
      c.send_status(status, status == @pass ? 'OK' : 'NOK', true,
                    metadata: server_trailing_md)
      close_active_server_call(c)
    end
  end

  # Creates @server with a port that uses the test server credentials.
  # Returns the result of add_http2_port, which callers use as the port.
  def create_secure_test_server
    certs = load_test_certs
    secure_credentials = GRPC::Core::ServerCredentials.new(
      nil, [{ private_key: certs[1], cert_chain: certs[2] }], false)

    @server = new_core_server_for_testing(nil)
    @server.add_http2_port('0.0.0.0:0', secure_credentials)
  end

  # Creates @server with an insecure port.
  # Returns the result of add_http2_port, which callers use as the port.
  def create_test_server
    @server = new_core_server_for_testing(nil)
    @server.add_http2_port('0.0.0.0:0', :this_port_is_insecure)
  end

  # Starts @server, signals the notifier, waits for an incoming call,
  # sends metadata_to_send as the initial metadata and returns a
  # GRPC::ActiveCall wrapping the received call.
  def expect_server_to_be_invoked(notifier, metadata_to_send: nil)
    @server.start
    notifier.notify(nil)
    recvd_rpc = @server.request_call
    recvd_call = recvd_rpc.call
    recvd_call.metadata = recvd_rpc.metadata
    recvd_call.run_batch(SEND_INITIAL_METADATA => metadata_to_send)
    GRPC::ActiveCall.new(recvd_call, noop, noop, INFINITE_FUTURE,
                         metadata_received: true)
  end
end