# Copyright 2015 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tests for the code generated by the gRPC Python protoc plugin."""

import collections
import contextlib
import errno
import itertools
import os
import shutil
import subprocess
import sys
import tempfile
import threading
import unittest

import grpc
import grpc.experimental

import tests.protoc_plugin.protos.payload.test_payload_pb2 as payload_pb2
import tests.protoc_plugin.protos.requests.r.test_requests_pb2 as request_pb2
import tests.protoc_plugin.protos.responses.test_responses_pb2 as response_pb2
import tests.protoc_plugin.protos.service.test_service_pb2_grpc as service_pb2_grpc
from tests.unit import test_common
from tests.unit.framework.common import test_constants

# Identifiers of entities we expect to find in the generated module.
STUB_IDENTIFIER = "TestServiceStub"
SERVICER_IDENTIFIER = "TestServiceServicer"
ADD_SERVICER_TO_SERVER_IDENTIFIER = "add_TestServiceServicer_to_server"


class _ServicerMethods(object):
    """Implementations of the TestService methods with pause/fail hooks.

    Every method calls `_control()` before producing a response, so a test
    can make the handlers block (`pause()`) or raise (`fail()`) on demand.
    """

    def __init__(self):
        # Guards _paused and _fail and wakes handlers blocked in _control().
        self._condition = threading.Condition()
        self._paused = False
        self._fail = False

    @contextlib.contextmanager
    def pause(self):  # pylint: disable=invalid-name
        """Blocks handlers in `_control()` for the duration of the context."""
        with self._condition:
            self._paused = True
        try:
            yield
        finally:
            # Always release blocked handlers, even when the body of the
            # `with` statement raised (e.g. on a failed assertion);
            # otherwise they would wait on the condition forever.
            with self._condition:
                self._paused = False
                self._condition.notify_all()

    @contextlib.contextmanager
    def fail(self):  # pylint: disable=invalid-name
        """Makes `_control()` raise ValueError for the duration of the context."""
        with self._condition:
            self._fail = True
        try:
            yield
        finally:
            # Reset the flag even when the body of the `with` raised.
            with self._condition:
                self._fail = False

    def _control(self):  # pylint: disable=invalid-name
        """Raises ValueError if failing; otherwise waits while paused."""
        with self._condition:
            if self._fail:
                raise ValueError()
            while self._paused:
                self._condition.wait()

    def UnaryCall(self, request, unused_rpc_context):
        """Returns one response carrying `request.response_size` bytes of "a"."""
        response = response_pb2.SimpleResponse()
        response.payload.payload_type = payload_pb2.COMPRESSABLE
        response.payload.payload_compressable = "a" * request.response_size
        self._control()
        return response

    def StreamingOutputCall(self, request, unused_rpc_context):
        """Yields one response per entry in `request.response_parameters`."""
        for parameter in request.response_parameters:
            response = response_pb2.StreamingOutputCallResponse()
            response.payload.payload_type = payload_pb2.COMPRESSABLE
            response.payload.payload_compressable = "a" * parameter.size
            self._control()
            yield response

    def StreamingInputCall(self, request_iter, unused_rpc_context):
        """Returns the summed payload length of all incoming requests."""
        response = response_pb2.StreamingInputCallResponse()
        aggregated_payload_size = 0
        for request in request_iter:
            aggregated_payload_size += len(request.payload.payload_compressable)
        response.aggregated_payload_size = aggregated_payload_size
        self._control()
        return response

    def FullDuplexCall(self, request_iter, unused_rpc_context):
        """Yields responses for each request as soon as it is received."""
        for request in request_iter:
            for parameter in request.response_parameters:
                response = response_pb2.StreamingOutputCallResponse()
                response.payload.payload_type = payload_pb2.COMPRESSABLE
                response.payload.payload_compressable = "a" * parameter.size
                self._control()
                yield response

    def HalfDuplexCall(self, request_iter, unused_rpc_context):
        """Consumes every request first, then yields all the responses."""
        responses = []
        for request in request_iter:
            for parameter in request.response_parameters:
                response = response_pb2.StreamingOutputCallResponse()
                response.payload.payload_type = payload_pb2.COMPRESSABLE
                response.payload.payload_compressable = "a" * parameter.size
                self._control()
                responses.append(response)
        for response in responses:
            yield response


class _Service(
    collections.namedtuple(
        "_Service",
        (
            "servicer_methods",
            "server",
            "stub",
        ),
    )
):
    """A live and running service.

    Attributes:
      servicer_methods: The _ServicerMethods servicing RPCs.
      server: The grpc.Server servicing RPCs.
      stub: A stub on which to invoke RPCs.
    """


def _CreateService():
    """Provides a servicer backend and a stub.

    The server is started on a port chosen by `add_insecure_port("[::]:0")`
    and the stub talks to it over an insecure channel to localhost.

    Returns:
      A _Service with which to test RPCs.
    """
    servicer_methods = _ServicerMethods()

    # Looked up by name so the test also checks the generated identifiers.
    class Servicer(getattr(service_pb2_grpc, SERVICER_IDENTIFIER)):
        def UnaryCall(self, request, context):
            return servicer_methods.UnaryCall(request, context)

        def StreamingOutputCall(self, request, context):
            return servicer_methods.StreamingOutputCall(request, context)

        def StreamingInputCall(self, request_iterator, context):
            return servicer_methods.StreamingInputCall(
                request_iterator, context
            )

        def FullDuplexCall(self, request_iterator, context):
            return servicer_methods.FullDuplexCall(request_iterator, context)

        def HalfDuplexCall(self, request_iterator, context):
            return servicer_methods.HalfDuplexCall(request_iterator, context)

    server = test_common.test_server()
    getattr(service_pb2_grpc, ADD_SERVICER_TO_SERVER_IDENTIFIER)(
        Servicer(), server
    )
    port = server.add_insecure_port("[::]:0")
    server.start()
    channel = grpc.insecure_channel("localhost:{}".format(port))
    stub = getattr(service_pb2_grpc, STUB_IDENTIFIER)(channel)
    return _Service(servicer_methods, server, stub)


def _CreateIncompleteService():
    """Provides a servicer backend that fails to implement methods and its stub.

    Returns:
      A _Service with which to test RPCs. The returned _Service's
        servicer_methods implements none of the methods required of it.
    """

    class Servicer(getattr(service_pb2_grpc, SERVICER_IDENTIFIER)):
        pass

    server = test_common.test_server()
    getattr(service_pb2_grpc, ADD_SERVICER_TO_SERVER_IDENTIFIER)(
        Servicer(), server
    )
    port = server.add_insecure_port("[::]:0")
    server.start()
    channel = grpc.insecure_channel("localhost:{}".format(port))
    stub = getattr(service_pb2_grpc, STUB_IDENTIFIER)(channel)
    return _Service(None, server, stub)


def _streaming_input_request_iterator():
    """Yields three requests, each with a one-character payload."""
    for _ in range(3):
        request = request_pb2.StreamingInputCallRequest()
        request.payload.payload_type = payload_pb2.COMPRESSABLE
        request.payload.payload_compressable = "a"
        yield request


def _streaming_output_request():
    """Returns a request asking for three responses of sizes 1, 2 and 3."""
    request = request_pb2.StreamingOutputCallRequest()
    sizes = [1, 2, 3]
    request.response_parameters.add(size=sizes[0], interval_us=0)
    request.response_parameters.add(size=sizes[1], interval_us=0)
    request.response_parameters.add(size=sizes[2], interval_us=0)
    return request


def _full_duplex_request_iterator():
    """Yields two requests: one asking for size 1, one for sizes 2 and 3."""
    request = request_pb2.StreamingOutputCallRequest()
    request.response_parameters.add(size=1, interval_us=0)
    yield request
    request = request_pb2.StreamingOutputCallRequest()
    request.response_parameters.add(size=2, interval_us=0)
    request.response_parameters.add(size=3, interval_us=0)
    yield request


class PythonPluginTest(unittest.TestCase):
    """Test case for the gRPC Python protoc-plugin.

    While reading these tests, remember that the futures API
    (`stub.method.future()`) only gives futures for the *response-unary*
    methods and does not exist for response-streaming methods.
    """

    def testImportAttributes(self):
        # check that we can access the generated module and its members.
        self.assertIsNotNone(getattr(service_pb2_grpc, STUB_IDENTIFIER, None))
        self.assertIsNotNone(
            getattr(service_pb2_grpc, SERVICER_IDENTIFIER, None)
        )
        self.assertIsNotNone(
            getattr(service_pb2_grpc, ADD_SERVICER_TO_SERVER_IDENTIFIER, None)
        )

    def testUpDown(self):
        service = _CreateService()
        self.assertIsNotNone(service.servicer_methods)
        self.assertIsNotNone(service.server)
        self.assertIsNotNone(service.stub)
        service.server.stop(None)

    def testIncompleteServicer(self):
        service = _CreateIncompleteService()
        request = request_pb2.SimpleRequest(response_size=13)
        with self.assertRaises(grpc.RpcError) as exception_context:
            service.stub.UnaryCall(request)
        self.assertIs(
            exception_context.exception.code(), grpc.StatusCode.UNIMPLEMENTED
        )
        service.server.stop(None)

    def testUnaryCall(self):
        service = _CreateService()
        request = request_pb2.SimpleRequest(response_size=13)
        response = service.stub.UnaryCall(request)
        expected_response = service.servicer_methods.UnaryCall(
            request, "not a real context!"
        )
        self.assertEqual(expected_response, response)
        service.server.stop(None)

    def testUnaryCallFuture(self):
        service = _CreateService()
        request = request_pb2.SimpleRequest(response_size=13)
        # Check that the call does not block waiting for the server to respond.
        with service.servicer_methods.pause():
            response_future = service.stub.UnaryCall.future(request)
        response = response_future.result()
        expected_response = service.servicer_methods.UnaryCall(
            request, "not a real RpcContext!"
        )
        self.assertEqual(expected_response, response)
        service.server.stop(None)

    def testUnaryCallFutureExpired(self):
        service = _CreateService()
        request = request_pb2.SimpleRequest(response_size=13)
        with service.servicer_methods.pause():
            response_future = service.stub.UnaryCall.future(
                request, timeout=test_constants.SHORT_TIMEOUT
            )
            with self.assertRaises(grpc.RpcError) as exception_context:
                response_future.result()
        self.assertIs(
            exception_context.exception.code(),
            grpc.StatusCode.DEADLINE_EXCEEDED,
        )
        self.assertIs(response_future.code(), grpc.StatusCode.DEADLINE_EXCEEDED)
        service.server.stop(None)

    def testUnaryCallFutureCancelled(self):
        service = _CreateService()
        request = request_pb2.SimpleRequest(response_size=13)
        with service.servicer_methods.pause():
            response_future = service.stub.UnaryCall.future(request)
            response_future.cancel()
        self.assertTrue(response_future.cancelled())
        self.assertIs(response_future.code(), grpc.StatusCode.CANCELLED)
        service.server.stop(None)

    def testUnaryCallFutureFailed(self):
        service = _CreateService()
        request = request_pb2.SimpleRequest(response_size=13)
        with service.servicer_methods.fail():
            response_future = service.stub.UnaryCall.future(request)
            self.assertIsNotNone(response_future.exception())
        self.assertIs(response_future.code(), grpc.StatusCode.UNKNOWN)
        service.server.stop(None)

    def testStreamingOutputCall(self):
        service = _CreateService()
        request = _streaming_output_request()
        responses = service.stub.StreamingOutputCall(request)
        expected_responses = service.servicer_methods.StreamingOutputCall(
            request, "not a real RpcContext!"
        )
        for expected_response, response in itertools.zip_longest(
            expected_responses, responses
        ):
            self.assertEqual(expected_response, response)
        service.server.stop(None)

    def testStreamingOutputCallExpired(self):
        service = _CreateService()
        request = _streaming_output_request()
        with service.servicer_methods.pause():
            responses = service.stub.StreamingOutputCall(
                request, timeout=test_constants.SHORT_TIMEOUT
            )
            with self.assertRaises(grpc.RpcError) as exception_context:
                list(responses)
        self.assertIs(
            exception_context.exception.code(),
            grpc.StatusCode.DEADLINE_EXCEEDED,
        )
        service.server.stop(None)

    def testStreamingOutputCallCancelled(self):
        service = _CreateService()
        request = _streaming_output_request()
        responses = service.stub.StreamingOutputCall(request)
        next(responses)
        responses.cancel()
        with self.assertRaises(grpc.RpcError):
            next(responses)
        self.assertIs(responses.code(), grpc.StatusCode.CANCELLED)
        service.server.stop(None)

    def testStreamingOutputCallFailed(self):
        service = _CreateService()
        request = _streaming_output_request()
        with service.servicer_methods.fail():
            responses = service.stub.StreamingOutputCall(request)
            self.assertIsNotNone(responses)
            with self.assertRaises(grpc.RpcError) as exception_context:
                next(responses)
        self.assertIs(
            exception_context.exception.code(), grpc.StatusCode.UNKNOWN
        )
        service.server.stop(None)

    def testStreamingInputCall(self):
        service = _CreateService()
        response = service.stub.StreamingInputCall(
            _streaming_input_request_iterator()
        )
        expected_response = service.servicer_methods.StreamingInputCall(
            _streaming_input_request_iterator(), "not a real RpcContext!"
        )
        self.assertEqual(expected_response, response)
        service.server.stop(None)

    def testStreamingInputCallFuture(self):
        service = _CreateService()
        with service.servicer_methods.pause():
            response_future = service.stub.StreamingInputCall.future(
                _streaming_input_request_iterator()
            )
        response = response_future.result()
        expected_response = service.servicer_methods.StreamingInputCall(
            _streaming_input_request_iterator(), "not a real RpcContext!"
        )
        self.assertEqual(expected_response, response)
        service.server.stop(None)

    def testStreamingInputCallFutureExpired(self):
        service = _CreateService()
        with service.servicer_methods.pause():
            response_future = service.stub.StreamingInputCall.future(
                _streaming_input_request_iterator(),
                timeout=test_constants.SHORT_TIMEOUT,
            )
            with self.assertRaises(grpc.RpcError) as exception_context:
                response_future.result()
        self.assertIsInstance(response_future.exception(), grpc.RpcError)
        self.assertIs(
            response_future.exception().code(),
            grpc.StatusCode.DEADLINE_EXCEEDED,
        )
        self.assertIs(
            exception_context.exception.code(),
            grpc.StatusCode.DEADLINE_EXCEEDED,
        )
        service.server.stop(None)

    def testStreamingInputCallFutureCancelled(self):
        service = _CreateService()
        with service.servicer_methods.pause():
            response_future = service.stub.StreamingInputCall.future(
                _streaming_input_request_iterator()
            )
            response_future.cancel()
        self.assertTrue(response_future.cancelled())
        with self.assertRaises(grpc.FutureCancelledError):
            response_future.result()
        service.server.stop(None)

    def testStreamingInputCallFutureFailed(self):
        service = _CreateService()
        with service.servicer_methods.fail():
            response_future = service.stub.StreamingInputCall.future(
                _streaming_input_request_iterator()
            )
            self.assertIsNotNone(response_future.exception())
            self.assertIs(response_future.code(), grpc.StatusCode.UNKNOWN)
        service.server.stop(None)

    def testFullDuplexCall(self):
        service = _CreateService()
        responses = service.stub.FullDuplexCall(_full_duplex_request_iterator())
        expected_responses = service.servicer_methods.FullDuplexCall(
            _full_duplex_request_iterator(), "not a real RpcContext!"
        )
        for expected_response, response in itertools.zip_longest(
            expected_responses, responses
        ):
            self.assertEqual(expected_response, response)
        service.server.stop(None)

    def testFullDuplexCallExpired(self):
        request_iterator = _full_duplex_request_iterator()
        service = _CreateService()
        with service.servicer_methods.pause():
            responses = service.stub.FullDuplexCall(
                request_iterator, timeout=test_constants.SHORT_TIMEOUT
            )
            with self.assertRaises(grpc.RpcError) as exception_context:
                list(responses)
        self.assertIs(
            exception_context.exception.code(),
            grpc.StatusCode.DEADLINE_EXCEEDED,
        )
        service.server.stop(None)

    def testFullDuplexCallCancelled(self):
        service = _CreateService()
        request_iterator = _full_duplex_request_iterator()
        responses = service.stub.FullDuplexCall(request_iterator)
        next(responses)
        responses.cancel()
        with self.assertRaises(grpc.RpcError) as exception_context:
            next(responses)
        self.assertIs(
            exception_context.exception.code(), grpc.StatusCode.CANCELLED
        )
        service.server.stop(None)

    def testFullDuplexCallFailed(self):
        request_iterator = _full_duplex_request_iterator()
        service = _CreateService()
        with service.servicer_methods.fail():
            responses = service.stub.FullDuplexCall(request_iterator)
            with self.assertRaises(grpc.RpcError) as exception_context:
                next(responses)
        self.assertIs(
            exception_context.exception.code(), grpc.StatusCode.UNKNOWN
        )
        service.server.stop(None)

    def testHalfDuplexCall(self):
        service = _CreateService()

        def half_duplex_request_iterator():
            request = request_pb2.StreamingOutputCallRequest()
            request.response_parameters.add(size=1, interval_us=0)
            yield request
            request = request_pb2.StreamingOutputCallRequest()
            request.response_parameters.add(size=2, interval_us=0)
            request.response_parameters.add(size=3, interval_us=0)
            yield request

        responses = service.stub.HalfDuplexCall(half_duplex_request_iterator())
        expected_responses = service.servicer_methods.HalfDuplexCall(
            half_duplex_request_iterator(), "not a real RpcContext!"
        )
        for expected_response, response in itertools.zip_longest(
            expected_responses, responses
        ):
            self.assertEqual(expected_response, response)
        service.server.stop(None)

    def testHalfDuplexCallWedged(self):
        condition = threading.Condition()
        # One-element list so the nested functions share one mutable flag.
        wait_cell = [False]

        @contextlib.contextmanager
        def wait():  # pylint: disable=invalid-name
            with condition:
                wait_cell[0] = True
            try:
                yield
            finally:
                # Always unblock the request iterator, even if an assertion
                # inside the `with` body failed.
                with condition:
                    wait_cell[0] = False
                    condition.notify_all()

        def half_duplex_request_iterator():
            request = request_pb2.StreamingOutputCallRequest()
            request.response_parameters.add(size=1, interval_us=0)
            yield request
            # Keep the request stream open until wait() exits.
            with condition:
                while wait_cell[0]:
                    condition.wait()

        service = _CreateService()
        with wait():
            responses = service.stub.HalfDuplexCall(
                half_duplex_request_iterator(),
                timeout=test_constants.SHORT_TIMEOUT,
            )
            # half-duplex waits for the client to send all info
            with self.assertRaises(grpc.RpcError) as exception_context:
                next(responses)
        self.assertIs(
            exception_context.exception.code(),
            grpc.StatusCode.DEADLINE_EXCEEDED,
        )
        service.server.stop(None)

    def testRegisteredMethod(self):
        """Tests that _registered_call_handle is set on calls created by the generated stub."""
        service = _CreateService()
        self.assertTrue(service.stub.UnaryCall._registered_call_handle)
        self.assertTrue(
            service.stub.StreamingOutputCall._registered_call_handle
        )
        self.assertTrue(service.stub.StreamingInputCall._registered_call_handle)
        self.assertTrue(service.stub.FullDuplexCall._registered_call_handle)
        service.server.stop(None)


# Tuple comparison: skips exactly the interpreters older than 3.6, without
# mis-classifying a future major version whose minor number is below 6.
@unittest.skipIf(
    sys.version_info < (3, 6),
    "Unsupported on Python versions older than 3.6.",
)
class SimpleStubsPluginTest(unittest.TestCase):
    """Test case for the generated `TestService` simple-stub functions.

    Each test invokes `service_pb2_grpc.TestService.<Method>` against a
    server started in `setUp` and compares the result with a direct call to
    the shared `_ServicerMethods` instance.
    """

    servicer_methods = _ServicerMethods()

    class Servicer(service_pb2_grpc.TestServiceServicer):
        def UnaryCall(self, request, context):
            return SimpleStubsPluginTest.servicer_methods.UnaryCall(
                request, context
            )

        def StreamingOutputCall(self, request, context):
            return SimpleStubsPluginTest.servicer_methods.StreamingOutputCall(
                request, context
            )

        def StreamingInputCall(self, request_iterator, context):
            return SimpleStubsPluginTest.servicer_methods.StreamingInputCall(
                request_iterator, context
            )

        def FullDuplexCall(self, request_iterator, context):
            return SimpleStubsPluginTest.servicer_methods.FullDuplexCall(
                request_iterator, context
            )

        def HalfDuplexCall(self, request_iterator, context):
            return SimpleStubsPluginTest.servicer_methods.HalfDuplexCall(
                request_iterator, context
            )

    def setUp(self):
        super(SimpleStubsPluginTest, self).setUp()
        self._server = test_common.test_server()
        service_pb2_grpc.add_TestServiceServicer_to_server(
            self.Servicer(), self._server
        )
        self._port = self._server.add_insecure_port("[::]:0")
        self._server.start()
        self._target = "localhost:{}".format(self._port)

    def tearDown(self):
        self._server.stop(None)
        super(SimpleStubsPluginTest, self).tearDown()

    def testUnaryCall(self):
        request = request_pb2.SimpleRequest(response_size=13)
        response = service_pb2_grpc.TestService.UnaryCall(
            request,
            self._target,
            channel_credentials=grpc.experimental.insecure_channel_credentials(),
            wait_for_ready=True,
        )
        expected_response = self.servicer_methods.UnaryCall(
            request, "not a real context!"
        )
        self.assertEqual(expected_response, response)

    def testUnaryCallInsecureSugar(self):
        request = request_pb2.SimpleRequest(response_size=13)
        response = service_pb2_grpc.TestService.UnaryCall(
            request, self._target, insecure=True, wait_for_ready=True
        )
        expected_response = self.servicer_methods.UnaryCall(
            request, "not a real context!"
        )
        self.assertEqual(expected_response, response)

    def testStreamingOutputCall(self):
        request = _streaming_output_request()
        expected_responses = self.servicer_methods.StreamingOutputCall(
            request, "not a real RpcContext!"
        )
        responses = service_pb2_grpc.TestService.StreamingOutputCall(
            request,
            self._target,
            channel_credentials=grpc.experimental.insecure_channel_credentials(),
            wait_for_ready=True,
        )
        for expected_response, response in itertools.zip_longest(
            expected_responses, responses
        ):
            self.assertEqual(expected_response, response)

    def testStreamingInputCall(self):
        response = service_pb2_grpc.TestService.StreamingInputCall(
            _streaming_input_request_iterator(),
            self._target,
            channel_credentials=grpc.experimental.insecure_channel_credentials(),
            wait_for_ready=True,
        )
        expected_response = self.servicer_methods.StreamingInputCall(
            _streaming_input_request_iterator(), "not a real RpcContext!"
        )
        self.assertEqual(expected_response, response)

    def testFullDuplexCall(self):
        responses = service_pb2_grpc.TestService.FullDuplexCall(
            _full_duplex_request_iterator(),
            self._target,
            channel_credentials=grpc.experimental.insecure_channel_credentials(),
            wait_for_ready=True,
        )
        expected_responses = self.servicer_methods.FullDuplexCall(
            _full_duplex_request_iterator(), "not a real RpcContext!"
        )
        for expected_response, response in itertools.zip_longest(
            expected_responses, responses
        ):
            self.assertEqual(expected_response, response)

    def testHalfDuplexCall(self):
        def half_duplex_request_iterator():
            request = request_pb2.StreamingOutputCallRequest()
            request.response_parameters.add(size=1, interval_us=0)
            yield request
            request = request_pb2.StreamingOutputCallRequest()
            request.response_parameters.add(size=2, interval_us=0)
            request.response_parameters.add(size=3, interval_us=0)
            yield request

        responses = service_pb2_grpc.TestService.HalfDuplexCall(
            half_duplex_request_iterator(),
            self._target,
            channel_credentials=grpc.experimental.insecure_channel_credentials(),
            wait_for_ready=True,
        )
        expected_responses = self.servicer_methods.HalfDuplexCall(
            half_duplex_request_iterator(), "not a real RpcContext!"
        )
        for expected_response, response in itertools.zip_longest(
            expected_responses, responses
        ):
            self.assertEqual(expected_response, response)


class ModuleMainTest(unittest.TestCase):
    """Test case for running `python -m grpc_tools.protoc`."""

    def test_clean_output(self):
        """Checks that protoc exits with 0 and prints nothing.

        The proto path is relative, so the test assumes the working
        directory contains `src/proto`.
        """
        # sys.executable may be None or an empty string when the interpreter
        # path cannot be determined.
        if not sys.executable:
            raise unittest.SkipTest(
                "Running on a interpreter that cannot be invoked from the CLI."
            )
        proto_dir_path = os.path.join("src", "proto")
        test_proto_path = os.path.join(
            proto_dir_path, "grpc", "testing", "empty.proto"
        )
        invocation = (
            sys.executable,
            "-m",
            "grpc_tools.protoc",
            "--proto_path",
            proto_dir_path,
            "--python_out",
            work_dir_placeholder := None,
        ) if False else None  # placeholder removed below
        work_dir = tempfile.mkdtemp()
        try:
            invocation = (
                sys.executable,
                "-m",
                "grpc_tools.protoc",
                "--proto_path",
                proto_dir_path,
                "--python_out",
                work_dir,
                "--grpc_python_out",
                work_dir,
                test_proto_path,
            )
            with tempfile.TemporaryFile() as stdout_file:
                with tempfile.TemporaryFile() as stderr_file:
                    proc = subprocess.Popen(
                        invocation, stdout=stdout_file, stderr=stderr_file
                    )
                    proc.wait()
                    for stream in (stdout_file, stderr_file):
                        stream.seek(0)
                        # Comparing the bytes themselves makes a failure
                        # show what the tool printed.
                        self.assertEqual(b"", stream.read())
            self.assertEqual(0, proc.returncode)
        finally:
            # Remove the output directory on success and on failure alike,
            # and let assertion failures propagate to the test runner.
            shutil.rmtree(work_dir)


if __name__ == "__main__":
    unittest.main(verbosity=2)