xref: /aosp_15_r20/external/grpc-grpc/src/python/grpcio_tests/tests/unit/test_common.py (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1# Copyright 2015 gRPC authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""Common code used throughout tests of gRPC."""
15
16import collections
17from concurrent import futures
18import threading
19
20import grpc
21
22INVOCATION_INITIAL_METADATA = (
23    ("0", "abc"),
24    ("1", "def"),
25    ("2", "ghi"),
26)
27SERVICE_INITIAL_METADATA = (
28    ("3", "jkl"),
29    ("4", "mno"),
30    ("5", "pqr"),
31)
32SERVICE_TERMINAL_METADATA = (
33    ("6", "stu"),
34    ("7", "vwx"),
35    ("8", "yza"),
36)
37DETAILS = "test details"
38
39
40def metadata_transmitted(original_metadata, transmitted_metadata):
41    """Judges whether or not metadata was acceptably transmitted.
42
43    gRPC is allowed to insert key-value pairs into the metadata values given by
44    applications and to reorder key-value pairs with different keys but it is not
45    allowed to alter existing key-value pairs or to reorder key-value pairs with
46    the same key.
47
48    Args:
49      original_metadata: A metadata value used in a test of gRPC. An iterable over
50        iterables of length 2.
51      transmitted_metadata: A metadata value corresponding to original_metadata
52        after having been transmitted via gRPC. An iterable over iterables of
53        length 2.
54
55    Returns:
56       A boolean indicating whether transmitted_metadata accurately reflects
57        original_metadata after having been transmitted via gRPC.
58    """
59    original = collections.defaultdict(list)
60    for key, value in original_metadata:
61        original[key].append(value)
62    transmitted = collections.defaultdict(list)
63    for key, value in transmitted_metadata:
64        transmitted[key].append(value)
65
66    for key, values in original.items():
67        transmitted_values = transmitted[key]
68        transmitted_iterator = iter(transmitted_values)
69        try:
70            for value in values:
71                while True:
72                    transmitted_value = next(transmitted_iterator)
73                    if value == transmitted_value:
74                        break
75        except StopIteration:
76            return False
77    else:
78        return True
79
80
81def test_secure_channel(target, channel_credentials, server_host_override):
82    """Creates an insecure Channel to a remote host.
83
84    Args:
85      host: The name of the remote host to which to connect.
86      port: The port of the remote host to which to connect.
87      channel_credentials: The implementations.ChannelCredentials with which to
88        connect.
89      server_host_override: The target name used for SSL host name checking.
90
91    Returns:
92      An implementations.Channel to the remote host through which RPCs may be
93        conducted.
94    """
95    channel = grpc.secure_channel(
96        target,
97        channel_credentials,
98        (
99            (
100                "grpc.ssl_target_name_override",
101                server_host_override,
102            ),
103        ),
104    )
105    return channel
106
107
108def test_server(max_workers=10, reuse_port=False):
109    """Creates an insecure grpc server.
110
111    These servers have SO_REUSEPORT disabled to prevent cross-talk.
112    """
113    return grpc.server(
114        futures.ThreadPoolExecutor(max_workers=max_workers),
115        options=(("grpc.so_reuseport", int(reuse_port)),),
116    )
117
118
119class WaitGroup(object):
120    def __init__(self, n=0):
121        self.count = n
122        self.cv = threading.Condition()
123
124    def add(self, n):
125        self.cv.acquire()
126        self.count += n
127        self.cv.release()
128
129    def done(self):
130        self.cv.acquire()
131        self.count -= 1
132        if self.count == 0:
133            self.cv.notify_all()
134        self.cv.release()
135
136    def wait(self):
137        self.cv.acquire()
138        while self.count > 0:
139            self.cv.wait()
140        self.cv.release()
141
142
143def running_under_gevent():
144    try:
145        from gevent import monkey
146        import gevent.socket
147    except ImportError:
148        return False
149    else:
150        import socket
151
152        return socket.socket is gevent.socket.socket
153