xref: /aosp_15_r20/external/grpc-grpc/src/python/grpcio_tests/tests/protoc_plugin/_python_plugin_test.py (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1# Copyright 2015 gRPC authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15import collections
16import contextlib
17import errno
18import itertools
19import os
20import shutil
21import subprocess
22import sys
23import tempfile
24import threading
25import unittest
26
27import grpc
28import grpc.experimental
29
30import tests.protoc_plugin.protos.payload.test_payload_pb2 as payload_pb2
31import tests.protoc_plugin.protos.requests.r.test_requests_pb2 as request_pb2
32import tests.protoc_plugin.protos.responses.test_responses_pb2 as response_pb2
33import tests.protoc_plugin.protos.service.test_service_pb2_grpc as service_pb2_grpc
34from tests.unit import test_common
35from tests.unit.framework.common import test_constants
36
37# Identifiers of entities we expect to find in the generated module.
38STUB_IDENTIFIER = "TestServiceStub"
39SERVICER_IDENTIFIER = "TestServiceServicer"
40ADD_SERVICER_TO_SERVER_IDENTIFIER = "add_TestServiceServicer_to_server"
41
42
43class _ServicerMethods(object):
44    def __init__(self):
45        self._condition = threading.Condition()
46        self._paused = False
47        self._fail = False
48
49    @contextlib.contextmanager
50    def pause(self):  # pylint: disable=invalid-name
51        with self._condition:
52            self._paused = True
53        yield
54        with self._condition:
55            self._paused = False
56            self._condition.notify_all()
57
58    @contextlib.contextmanager
59    def fail(self):  # pylint: disable=invalid-name
60        with self._condition:
61            self._fail = True
62        yield
63        with self._condition:
64            self._fail = False
65
66    def _control(self):  # pylint: disable=invalid-name
67        with self._condition:
68            if self._fail:
69                raise ValueError()
70            while self._paused:
71                self._condition.wait()
72
73    def UnaryCall(self, request, unused_rpc_context):
74        response = response_pb2.SimpleResponse()
75        response.payload.payload_type = payload_pb2.COMPRESSABLE
76        response.payload.payload_compressable = "a" * request.response_size
77        self._control()
78        return response
79
80    def StreamingOutputCall(self, request, unused_rpc_context):
81        for parameter in request.response_parameters:
82            response = response_pb2.StreamingOutputCallResponse()
83            response.payload.payload_type = payload_pb2.COMPRESSABLE
84            response.payload.payload_compressable = "a" * parameter.size
85            self._control()
86            yield response
87
88    def StreamingInputCall(self, request_iter, unused_rpc_context):
89        response = response_pb2.StreamingInputCallResponse()
90        aggregated_payload_size = 0
91        for request in request_iter:
92            aggregated_payload_size += len(request.payload.payload_compressable)
93        response.aggregated_payload_size = aggregated_payload_size
94        self._control()
95        return response
96
97    def FullDuplexCall(self, request_iter, unused_rpc_context):
98        for request in request_iter:
99            for parameter in request.response_parameters:
100                response = response_pb2.StreamingOutputCallResponse()
101                response.payload.payload_type = payload_pb2.COMPRESSABLE
102                response.payload.payload_compressable = "a" * parameter.size
103                self._control()
104                yield response
105
106    def HalfDuplexCall(self, request_iter, unused_rpc_context):
107        responses = []
108        for request in request_iter:
109            for parameter in request.response_parameters:
110                response = response_pb2.StreamingOutputCallResponse()
111                response.payload.payload_type = payload_pb2.COMPRESSABLE
112                response.payload.payload_compressable = "a" * parameter.size
113                self._control()
114                responses.append(response)
115        for response in responses:
116            yield response
117
118
119class _Service(
120    collections.namedtuple(
121        "_Service",
122        (
123            "servicer_methods",
124            "server",
125            "stub",
126        ),
127    )
128):
129    """A live and running service.
130
131    Attributes:
132      servicer_methods: The _ServicerMethods servicing RPCs.
133      server: The grpc.Server servicing RPCs.
134      stub: A stub on which to invoke RPCs.
135    """
136
137
138def _CreateService():
139    """Provides a servicer backend and a stub.
140
141    Returns:
142      A _Service with which to test RPCs.
143    """
144    servicer_methods = _ServicerMethods()
145
146    class Servicer(getattr(service_pb2_grpc, SERVICER_IDENTIFIER)):
147        def UnaryCall(self, request, context):
148            return servicer_methods.UnaryCall(request, context)
149
150        def StreamingOutputCall(self, request, context):
151            return servicer_methods.StreamingOutputCall(request, context)
152
153        def StreamingInputCall(self, request_iterator, context):
154            return servicer_methods.StreamingInputCall(
155                request_iterator, context
156            )
157
158        def FullDuplexCall(self, request_iterator, context):
159            return servicer_methods.FullDuplexCall(request_iterator, context)
160
161        def HalfDuplexCall(self, request_iterator, context):
162            return servicer_methods.HalfDuplexCall(request_iterator, context)
163
164    server = test_common.test_server()
165    getattr(service_pb2_grpc, ADD_SERVICER_TO_SERVER_IDENTIFIER)(
166        Servicer(), server
167    )
168    port = server.add_insecure_port("[::]:0")
169    server.start()
170    channel = grpc.insecure_channel("localhost:{}".format(port))
171    stub = getattr(service_pb2_grpc, STUB_IDENTIFIER)(channel)
172    return _Service(servicer_methods, server, stub)
173
174
175def _CreateIncompleteService():
176    """Provides a servicer backend that fails to implement methods and its stub.
177
178    Returns:
179      A _Service with which to test RPCs. The returned _Service's
180        servicer_methods implements none of the methods required of it.
181    """
182
183    class Servicer(getattr(service_pb2_grpc, SERVICER_IDENTIFIER)):
184        pass
185
186    server = test_common.test_server()
187    getattr(service_pb2_grpc, ADD_SERVICER_TO_SERVER_IDENTIFIER)(
188        Servicer(), server
189    )
190    port = server.add_insecure_port("[::]:0")
191    server.start()
192    channel = grpc.insecure_channel("localhost:{}".format(port))
193    stub = getattr(service_pb2_grpc, STUB_IDENTIFIER)(channel)
194    return _Service(None, server, stub)
195
196
197def _streaming_input_request_iterator():
198    for _ in range(3):
199        request = request_pb2.StreamingInputCallRequest()
200        request.payload.payload_type = payload_pb2.COMPRESSABLE
201        request.payload.payload_compressable = "a"
202        yield request
203
204
205def _streaming_output_request():
206    request = request_pb2.StreamingOutputCallRequest()
207    sizes = [1, 2, 3]
208    request.response_parameters.add(size=sizes[0], interval_us=0)
209    request.response_parameters.add(size=sizes[1], interval_us=0)
210    request.response_parameters.add(size=sizes[2], interval_us=0)
211    return request
212
213
214def _full_duplex_request_iterator():
215    request = request_pb2.StreamingOutputCallRequest()
216    request.response_parameters.add(size=1, interval_us=0)
217    yield request
218    request = request_pb2.StreamingOutputCallRequest()
219    request.response_parameters.add(size=2, interval_us=0)
220    request.response_parameters.add(size=3, interval_us=0)
221    yield request
222
223
224class PythonPluginTest(unittest.TestCase):
225    """Test case for the gRPC Python protoc-plugin.
226
227    While reading these tests, remember that the futures API
228    (`stub.method.future()`) only gives futures for the *response-unary*
229    methods and does not exist for response-streaming methods.
230    """
231
232    def testImportAttributes(self):
233        # check that we can access the generated module and its members.
234        self.assertIsNotNone(getattr(service_pb2_grpc, STUB_IDENTIFIER, None))
235        self.assertIsNotNone(
236            getattr(service_pb2_grpc, SERVICER_IDENTIFIER, None)
237        )
238        self.assertIsNotNone(
239            getattr(service_pb2_grpc, ADD_SERVICER_TO_SERVER_IDENTIFIER, None)
240        )
241
242    def testUpDown(self):
243        service = _CreateService()
244        self.assertIsNotNone(service.servicer_methods)
245        self.assertIsNotNone(service.server)
246        self.assertIsNotNone(service.stub)
247        service.server.stop(None)
248
249    def testIncompleteServicer(self):
250        service = _CreateIncompleteService()
251        request = request_pb2.SimpleRequest(response_size=13)
252        with self.assertRaises(grpc.RpcError) as exception_context:
253            service.stub.UnaryCall(request)
254        self.assertIs(
255            exception_context.exception.code(), grpc.StatusCode.UNIMPLEMENTED
256        )
257        service.server.stop(None)
258
259    def testUnaryCall(self):
260        service = _CreateService()
261        request = request_pb2.SimpleRequest(response_size=13)
262        response = service.stub.UnaryCall(request)
263        expected_response = service.servicer_methods.UnaryCall(
264            request, "not a real context!"
265        )
266        self.assertEqual(expected_response, response)
267        service.server.stop(None)
268
269    def testUnaryCallFuture(self):
270        service = _CreateService()
271        request = request_pb2.SimpleRequest(response_size=13)
272        # Check that the call does not block waiting for the server to respond.
273        with service.servicer_methods.pause():
274            response_future = service.stub.UnaryCall.future(request)
275        response = response_future.result()
276        expected_response = service.servicer_methods.UnaryCall(
277            request, "not a real RpcContext!"
278        )
279        self.assertEqual(expected_response, response)
280        service.server.stop(None)
281
282    def testUnaryCallFutureExpired(self):
283        service = _CreateService()
284        request = request_pb2.SimpleRequest(response_size=13)
285        with service.servicer_methods.pause():
286            response_future = service.stub.UnaryCall.future(
287                request, timeout=test_constants.SHORT_TIMEOUT
288            )
289            with self.assertRaises(grpc.RpcError) as exception_context:
290                response_future.result()
291        self.assertIs(
292            exception_context.exception.code(),
293            grpc.StatusCode.DEADLINE_EXCEEDED,
294        )
295        self.assertIs(response_future.code(), grpc.StatusCode.DEADLINE_EXCEEDED)
296        service.server.stop(None)
297
298    def testUnaryCallFutureCancelled(self):
299        service = _CreateService()
300        request = request_pb2.SimpleRequest(response_size=13)
301        with service.servicer_methods.pause():
302            response_future = service.stub.UnaryCall.future(request)
303            response_future.cancel()
304        self.assertTrue(response_future.cancelled())
305        self.assertIs(response_future.code(), grpc.StatusCode.CANCELLED)
306        service.server.stop(None)
307
308    def testUnaryCallFutureFailed(self):
309        service = _CreateService()
310        request = request_pb2.SimpleRequest(response_size=13)
311        with service.servicer_methods.fail():
312            response_future = service.stub.UnaryCall.future(request)
313            self.assertIsNotNone(response_future.exception())
314        self.assertIs(response_future.code(), grpc.StatusCode.UNKNOWN)
315        service.server.stop(None)
316
317    def testStreamingOutputCall(self):
318        service = _CreateService()
319        request = _streaming_output_request()
320        responses = service.stub.StreamingOutputCall(request)
321        expected_responses = service.servicer_methods.StreamingOutputCall(
322            request, "not a real RpcContext!"
323        )
324        for expected_response, response in itertools.zip_longest(
325            expected_responses, responses
326        ):
327            self.assertEqual(expected_response, response)
328        service.server.stop(None)
329
330    def testStreamingOutputCallExpired(self):
331        service = _CreateService()
332        request = _streaming_output_request()
333        with service.servicer_methods.pause():
334            responses = service.stub.StreamingOutputCall(
335                request, timeout=test_constants.SHORT_TIMEOUT
336            )
337            with self.assertRaises(grpc.RpcError) as exception_context:
338                list(responses)
339        self.assertIs(
340            exception_context.exception.code(),
341            grpc.StatusCode.DEADLINE_EXCEEDED,
342        )
343        service.server.stop(None)
344
345    def testStreamingOutputCallCancelled(self):
346        service = _CreateService()
347        request = _streaming_output_request()
348        responses = service.stub.StreamingOutputCall(request)
349        next(responses)
350        responses.cancel()
351        with self.assertRaises(grpc.RpcError) as exception_context:
352            next(responses)
353        self.assertIs(responses.code(), grpc.StatusCode.CANCELLED)
354        service.server.stop(None)
355
356    def testStreamingOutputCallFailed(self):
357        service = _CreateService()
358        request = _streaming_output_request()
359        with service.servicer_methods.fail():
360            responses = service.stub.StreamingOutputCall(request)
361            self.assertIsNotNone(responses)
362            with self.assertRaises(grpc.RpcError) as exception_context:
363                next(responses)
364        self.assertIs(
365            exception_context.exception.code(), grpc.StatusCode.UNKNOWN
366        )
367        service.server.stop(None)
368
369    def testStreamingInputCall(self):
370        service = _CreateService()
371        response = service.stub.StreamingInputCall(
372            _streaming_input_request_iterator()
373        )
374        expected_response = service.servicer_methods.StreamingInputCall(
375            _streaming_input_request_iterator(), "not a real RpcContext!"
376        )
377        self.assertEqual(expected_response, response)
378        service.server.stop(None)
379
380    def testStreamingInputCallFuture(self):
381        service = _CreateService()
382        with service.servicer_methods.pause():
383            response_future = service.stub.StreamingInputCall.future(
384                _streaming_input_request_iterator()
385            )
386        response = response_future.result()
387        expected_response = service.servicer_methods.StreamingInputCall(
388            _streaming_input_request_iterator(), "not a real RpcContext!"
389        )
390        self.assertEqual(expected_response, response)
391        service.server.stop(None)
392
393    def testStreamingInputCallFutureExpired(self):
394        service = _CreateService()
395        with service.servicer_methods.pause():
396            response_future = service.stub.StreamingInputCall.future(
397                _streaming_input_request_iterator(),
398                timeout=test_constants.SHORT_TIMEOUT,
399            )
400            with self.assertRaises(grpc.RpcError) as exception_context:
401                response_future.result()
402        self.assertIsInstance(response_future.exception(), grpc.RpcError)
403        self.assertIs(
404            response_future.exception().code(),
405            grpc.StatusCode.DEADLINE_EXCEEDED,
406        )
407        self.assertIs(
408            exception_context.exception.code(),
409            grpc.StatusCode.DEADLINE_EXCEEDED,
410        )
411        service.server.stop(None)
412
413    def testStreamingInputCallFutureCancelled(self):
414        service = _CreateService()
415        with service.servicer_methods.pause():
416            response_future = service.stub.StreamingInputCall.future(
417                _streaming_input_request_iterator()
418            )
419            response_future.cancel()
420        self.assertTrue(response_future.cancelled())
421        with self.assertRaises(grpc.FutureCancelledError):
422            response_future.result()
423        service.server.stop(None)
424
425    def testStreamingInputCallFutureFailed(self):
426        service = _CreateService()
427        with service.servicer_methods.fail():
428            response_future = service.stub.StreamingInputCall.future(
429                _streaming_input_request_iterator()
430            )
431            self.assertIsNotNone(response_future.exception())
432            self.assertIs(response_future.code(), grpc.StatusCode.UNKNOWN)
433        service.server.stop(None)
434
435    def testFullDuplexCall(self):
436        service = _CreateService()
437        responses = service.stub.FullDuplexCall(_full_duplex_request_iterator())
438        expected_responses = service.servicer_methods.FullDuplexCall(
439            _full_duplex_request_iterator(), "not a real RpcContext!"
440        )
441        for expected_response, response in itertools.zip_longest(
442            expected_responses, responses
443        ):
444            self.assertEqual(expected_response, response)
445        service.server.stop(None)
446
447    def testFullDuplexCallExpired(self):
448        request_iterator = _full_duplex_request_iterator()
449        service = _CreateService()
450        with service.servicer_methods.pause():
451            responses = service.stub.FullDuplexCall(
452                request_iterator, timeout=test_constants.SHORT_TIMEOUT
453            )
454            with self.assertRaises(grpc.RpcError) as exception_context:
455                list(responses)
456        self.assertIs(
457            exception_context.exception.code(),
458            grpc.StatusCode.DEADLINE_EXCEEDED,
459        )
460        service.server.stop(None)
461
462    def testFullDuplexCallCancelled(self):
463        service = _CreateService()
464        request_iterator = _full_duplex_request_iterator()
465        responses = service.stub.FullDuplexCall(request_iterator)
466        next(responses)
467        responses.cancel()
468        with self.assertRaises(grpc.RpcError) as exception_context:
469            next(responses)
470        self.assertIs(
471            exception_context.exception.code(), grpc.StatusCode.CANCELLED
472        )
473        service.server.stop(None)
474
475    def testFullDuplexCallFailed(self):
476        request_iterator = _full_duplex_request_iterator()
477        service = _CreateService()
478        with service.servicer_methods.fail():
479            responses = service.stub.FullDuplexCall(request_iterator)
480            with self.assertRaises(grpc.RpcError) as exception_context:
481                next(responses)
482        self.assertIs(
483            exception_context.exception.code(), grpc.StatusCode.UNKNOWN
484        )
485        service.server.stop(None)
486
487    def testHalfDuplexCall(self):
488        service = _CreateService()
489
490        def half_duplex_request_iterator():
491            request = request_pb2.StreamingOutputCallRequest()
492            request.response_parameters.add(size=1, interval_us=0)
493            yield request
494            request = request_pb2.StreamingOutputCallRequest()
495            request.response_parameters.add(size=2, interval_us=0)
496            request.response_parameters.add(size=3, interval_us=0)
497            yield request
498
499        responses = service.stub.HalfDuplexCall(half_duplex_request_iterator())
500        expected_responses = service.servicer_methods.HalfDuplexCall(
501            half_duplex_request_iterator(), "not a real RpcContext!"
502        )
503        for expected_response, response in itertools.zip_longest(
504            expected_responses, responses
505        ):
506            self.assertEqual(expected_response, response)
507        service.server.stop(None)
508
509    def testHalfDuplexCallWedged(self):
510        condition = threading.Condition()
511        wait_cell = [False]
512
513        @contextlib.contextmanager
514        def wait():  # pylint: disable=invalid-name
515            # Where's Python 3's 'nonlocal' statement when you need it?
516            with condition:
517                wait_cell[0] = True
518            yield
519            with condition:
520                wait_cell[0] = False
521                condition.notify_all()
522
523        def half_duplex_request_iterator():
524            request = request_pb2.StreamingOutputCallRequest()
525            request.response_parameters.add(size=1, interval_us=0)
526            yield request
527            with condition:
528                while wait_cell[0]:
529                    condition.wait()
530
531        service = _CreateService()
532        with wait():
533            responses = service.stub.HalfDuplexCall(
534                half_duplex_request_iterator(),
535                timeout=test_constants.SHORT_TIMEOUT,
536            )
537            # half-duplex waits for the client to send all info
538            with self.assertRaises(grpc.RpcError) as exception_context:
539                next(responses)
540        self.assertIs(
541            exception_context.exception.code(),
542            grpc.StatusCode.DEADLINE_EXCEEDED,
543        )
544        service.server.stop(None)
545
546    def testRegisteredMethod(self):
547        """Tests that we're setting _registered_call_handle when create call using generated stub."""
548        service = _CreateService()
549        self.assertTrue(service.stub.UnaryCall._registered_call_handle)
550        self.assertTrue(
551            service.stub.StreamingOutputCall._registered_call_handle
552        )
553        self.assertTrue(service.stub.StreamingInputCall._registered_call_handle)
554        self.assertTrue(service.stub.FullDuplexCall._registered_call_handle)
555        service.server.stop(None)
556
557
558@unittest.skipIf(
559    sys.version_info[0] < 3 or sys.version_info[1] < 6,
560    "Unsupported on Python 2.",
561)
562class SimpleStubsPluginTest(unittest.TestCase):
563    servicer_methods = _ServicerMethods()
564
565    class Servicer(service_pb2_grpc.TestServiceServicer):
566        def UnaryCall(self, request, context):
567            return SimpleStubsPluginTest.servicer_methods.UnaryCall(
568                request, context
569            )
570
571        def StreamingOutputCall(self, request, context):
572            return SimpleStubsPluginTest.servicer_methods.StreamingOutputCall(
573                request, context
574            )
575
576        def StreamingInputCall(self, request_iterator, context):
577            return SimpleStubsPluginTest.servicer_methods.StreamingInputCall(
578                request_iterator, context
579            )
580
581        def FullDuplexCall(self, request_iterator, context):
582            return SimpleStubsPluginTest.servicer_methods.FullDuplexCall(
583                request_iterator, context
584            )
585
586        def HalfDuplexCall(self, request_iterator, context):
587            return SimpleStubsPluginTest.servicer_methods.HalfDuplexCall(
588                request_iterator, context
589            )
590
591    def setUp(self):
592        super(SimpleStubsPluginTest, self).setUp()
593        self._server = test_common.test_server()
594        service_pb2_grpc.add_TestServiceServicer_to_server(
595            self.Servicer(), self._server
596        )
597        self._port = self._server.add_insecure_port("[::]:0")
598        self._server.start()
599        self._target = "localhost:{}".format(self._port)
600
601    def tearDown(self):
602        self._server.stop(None)
603        super(SimpleStubsPluginTest, self).tearDown()
604
605    def testUnaryCall(self):
606        request = request_pb2.SimpleRequest(response_size=13)
607        response = service_pb2_grpc.TestService.UnaryCall(
608            request,
609            self._target,
610            channel_credentials=grpc.experimental.insecure_channel_credentials(),
611            wait_for_ready=True,
612        )
613        expected_response = self.servicer_methods.UnaryCall(
614            request, "not a real context!"
615        )
616        self.assertEqual(expected_response, response)
617
618    def testUnaryCallInsecureSugar(self):
619        request = request_pb2.SimpleRequest(response_size=13)
620        response = service_pb2_grpc.TestService.UnaryCall(
621            request, self._target, insecure=True, wait_for_ready=True
622        )
623        expected_response = self.servicer_methods.UnaryCall(
624            request, "not a real context!"
625        )
626        self.assertEqual(expected_response, response)
627
628    def testStreamingOutputCall(self):
629        request = _streaming_output_request()
630        expected_responses = self.servicer_methods.StreamingOutputCall(
631            request, "not a real RpcContext!"
632        )
633        responses = service_pb2_grpc.TestService.StreamingOutputCall(
634            request,
635            self._target,
636            channel_credentials=grpc.experimental.insecure_channel_credentials(),
637            wait_for_ready=True,
638        )
639        for expected_response, response in itertools.zip_longest(
640            expected_responses, responses
641        ):
642            self.assertEqual(expected_response, response)
643
644    def testStreamingInputCall(self):
645        response = service_pb2_grpc.TestService.StreamingInputCall(
646            _streaming_input_request_iterator(),
647            self._target,
648            channel_credentials=grpc.experimental.insecure_channel_credentials(),
649            wait_for_ready=True,
650        )
651        expected_response = self.servicer_methods.StreamingInputCall(
652            _streaming_input_request_iterator(), "not a real RpcContext!"
653        )
654        self.assertEqual(expected_response, response)
655
656    def testFullDuplexCall(self):
657        responses = service_pb2_grpc.TestService.FullDuplexCall(
658            _full_duplex_request_iterator(),
659            self._target,
660            channel_credentials=grpc.experimental.insecure_channel_credentials(),
661            wait_for_ready=True,
662        )
663        expected_responses = self.servicer_methods.FullDuplexCall(
664            _full_duplex_request_iterator(), "not a real RpcContext!"
665        )
666        for expected_response, response in itertools.zip_longest(
667            expected_responses, responses
668        ):
669            self.assertEqual(expected_response, response)
670
671    def testHalfDuplexCall(self):
672        def half_duplex_request_iterator():
673            request = request_pb2.StreamingOutputCallRequest()
674            request.response_parameters.add(size=1, interval_us=0)
675            yield request
676            request = request_pb2.StreamingOutputCallRequest()
677            request.response_parameters.add(size=2, interval_us=0)
678            request.response_parameters.add(size=3, interval_us=0)
679            yield request
680
681        responses = service_pb2_grpc.TestService.HalfDuplexCall(
682            half_duplex_request_iterator(),
683            self._target,
684            channel_credentials=grpc.experimental.insecure_channel_credentials(),
685            wait_for_ready=True,
686        )
687        expected_responses = self.servicer_methods.HalfDuplexCall(
688            half_duplex_request_iterator(), "not a real RpcContext!"
689        )
690        for expected_response, response in itertools.zip_longest(
691            expected_responses, responses
692        ):
693            self.assertEqual(expected_response, response)
694
695
696class ModuleMainTest(unittest.TestCase):
697    """Test case for running `python -m grpc_tools.protoc`."""
698
699    def test_clean_output(self):
700        if sys.executable is None:
701            raise unittest.SkipTest(
702                "Running on a interpreter that cannot be invoked from the CLI."
703            )
704        proto_dir_path = os.path.join("src", "proto")
705        test_proto_path = os.path.join(
706            proto_dir_path, "grpc", "testing", "empty.proto"
707        )
708        streams = tuple(tempfile.TemporaryFile() for _ in range(2))
709        work_dir = tempfile.mkdtemp()
710        try:
711            invocation = (
712                sys.executable,
713                "-m",
714                "grpc_tools.protoc",
715                "--proto_path",
716                proto_dir_path,
717                "--python_out",
718                work_dir,
719                "--grpc_python_out",
720                work_dir,
721                test_proto_path,
722            )
723            proc = subprocess.Popen(
724                invocation, stdout=streams[0], stderr=streams[1]
725            )
726            proc.wait()
727            outs = []
728            for stream in streams:
729                stream.seek(0)
730                self.assertEqual(0, len(stream.read()))
731            self.assertEqual(0, proc.returncode)
732        except Exception:  # pylint: disable=broad-except
733            shutil.rmtree(work_dir)
734
735
736if __name__ == "__main__":
737    unittest.main(verbosity=2)
738