xref: /aosp_15_r20/external/grpc-grpc/test/core/iomgr/endpoint_pair_test.cc (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1 //
2 //
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #include "src/core/lib/iomgr/endpoint_pair.h"
20 
21 #include <chrono>
22 
23 #include <gtest/gtest.h>
24 
25 #include <grpc/event_engine/event_engine.h>
26 #include <grpc/grpc.h>
27 #include <grpc/support/alloc.h>
28 #include <grpc/support/log.h>
29 #include <grpc/support/time.h>
30 
31 #include "src/core/lib/channel/channel_args.h"
32 #include "src/core/lib/event_engine/channel_args_endpoint_config.h"
33 #include "src/core/lib/event_engine/default_event_engine.h"
34 #include "src/core/lib/event_engine/shim.h"
35 #include "src/core/lib/event_engine/tcp_socket_utils.h"
36 #include "src/core/lib/gpr/useful.h"
37 #include "src/core/lib/gprpp/notification.h"
38 #include "src/core/lib/iomgr/event_engine_shims/endpoint.h"
39 #include "src/core/lib/resource_quota/memory_quota.h"
40 #include "test/core/iomgr/endpoint_tests.h"
41 #include "test/core/util/port.h"
42 #include "test/core/util/test_config.h"
43 
44 using namespace std::chrono_literals;
45 
46 namespace {
47 
48 using ::grpc_event_engine::experimental::ChannelArgsEndpointConfig;
49 using ::grpc_event_engine::experimental::EventEngine;
50 using ::grpc_event_engine::experimental::URIToResolvedAddress;
51 
grpc_iomgr_event_engine_shim_endpoint_pair(grpc_channel_args * c_args)52 grpc_endpoint_pair grpc_iomgr_event_engine_shim_endpoint_pair(
53     grpc_channel_args* c_args) {
54   grpc_core::ExecCtx ctx;
55   grpc_endpoint_pair p;
56   auto ee = grpc_event_engine::experimental::GetDefaultEventEngine();
57   auto memory_quota = std::make_unique<grpc_core::MemoryQuota>("bar");
58   std::string target_addr = absl::StrCat(
59       "ipv6:[::1]:", std::to_string(grpc_pick_unused_port_or_die()));
60   auto resolved_addr = URIToResolvedAddress(target_addr);
61   GPR_ASSERT(resolved_addr.ok());
62   std::unique_ptr<EventEngine::Endpoint> client_endpoint;
63   std::unique_ptr<EventEngine::Endpoint> server_endpoint;
64   grpc_core::Notification client_signal;
65   grpc_core::Notification server_signal;
66 
67   EventEngine::Listener::AcceptCallback accept_cb =
68       [&server_endpoint, &server_signal](
69           std::unique_ptr<EventEngine::Endpoint> ep,
70           grpc_core::MemoryAllocator /*memory_allocator*/) {
71         server_endpoint = std::move(ep);
72         server_signal.Notify();
73       };
74 
75   auto args = grpc_core::CoreConfiguration::Get()
76                   .channel_args_preconditioning()
77                   .PreconditionChannelArgs(c_args);
78   ChannelArgsEndpointConfig config(args);
79   auto listener = *ee->CreateListener(
80       std::move(accept_cb), [](absl::Status /*status*/) {}, config,
81       std::make_unique<grpc_core::MemoryQuota>("foo"));
82 
83   GPR_ASSERT(listener->Bind(*resolved_addr).ok());
84   GPR_ASSERT(listener->Start().ok());
85 
86   ee->Connect(
87       [&client_endpoint, &client_signal](
88           absl::StatusOr<std::unique_ptr<EventEngine::Endpoint>> endpoint) {
89         GPR_ASSERT(endpoint.ok());
90         client_endpoint = std::move(*endpoint);
91         client_signal.Notify();
92       },
93       *resolved_addr, config, memory_quota->CreateMemoryAllocator("conn-1"),
94       24h);
95 
96   client_signal.WaitForNotification();
97   server_signal.WaitForNotification();
98 
99   p.client = grpc_event_engine::experimental::grpc_event_engine_endpoint_create(
100       std::move(client_endpoint));
101   p.server = grpc_event_engine::experimental::grpc_event_engine_endpoint_create(
102       std::move(server_endpoint));
103   return p;
104 }
105 
106 }  // namespace
107 
108 static gpr_mu* g_mu;
109 static grpc_pollset* g_pollset;
110 
// Fixture clean-up hook referenced from `configs`; intentionally a no-op.
static void clean_up() {}
112 
create_fixture_endpoint_pair(size_t slice_size)113 static grpc_endpoint_test_fixture create_fixture_endpoint_pair(
114     size_t slice_size) {
115   grpc_core::ExecCtx exec_ctx;
116   grpc_endpoint_test_fixture f;
117   grpc_arg a[1];
118   a[0].key = const_cast<char*>(GRPC_ARG_TCP_READ_CHUNK_SIZE);
119   a[0].type = GRPC_ARG_INTEGER;
120   a[0].value.integer = static_cast<int>(slice_size);
121   grpc_channel_args args = {GPR_ARRAY_SIZE(a), a};
122   grpc_endpoint_pair p;
123   if (grpc_event_engine::experimental::UseEventEngineClient()) {
124     p = grpc_iomgr_event_engine_shim_endpoint_pair(&args);
125   } else {
126     p = grpc_iomgr_create_endpoint_pair("test", &args);
127   }
128   f.client_ep = p.client;
129   f.server_ep = p.server;
130   grpc_endpoint_add_to_pollset(f.client_ep, g_pollset);
131   grpc_endpoint_add_to_pollset(f.server_ep, g_pollset);
132 
133   return f;
134 }
135 
136 static grpc_endpoint_test_config configs[] = {
137     {"tcp/tcp_socketpair", create_fixture_endpoint_pair, clean_up},
138 };
139 
destroy_pollset(void * p,grpc_error_handle)140 static void destroy_pollset(void* p, grpc_error_handle /*error*/) {
141   grpc_pollset_destroy(static_cast<grpc_pollset*>(p));
142 }
143 
// Runs the shared endpoint test suite against the endpoint-pair fixture in
// configs[0], using a freshly allocated pollset.
TEST(EndpointPairTest, MainTest) {
#ifdef GPR_WINDOWS
  if (grpc_event_engine::experimental::UseEventEngineClient()) {
    gpr_log(GPR_INFO, "Skipping pathological EventEngine test on Windows");
    return;
  }
#endif
  // Declared outside the ExecCtx scope below; presumably so that it is still
  // alive when closures scheduled on the ExecCtx are run -- do not move it
  // into the inner block.
  grpc_closure destroyed;
  grpc_init();
  {
    grpc_core::ExecCtx exec_ctx;
    void* pollset_storage = gpr_zalloc(grpc_pollset_size());
    g_pollset = static_cast<grpc_pollset*>(pollset_storage);
    grpc_pollset_init(g_pollset, &g_mu);
    grpc_endpoint_tests(configs[0], g_pollset, g_mu);
    // Shut the pollset down; destroy_pollset is invoked through `destroyed`.
    grpc_pollset_shutdown(
        g_pollset, GRPC_CLOSURE_INIT(&destroyed, destroy_pollset, g_pollset,
                                     grpc_schedule_on_exec_ctx));
  }
  grpc_shutdown();
  gpr_free(g_pollset);
}
165 
main(int argc,char ** argv)166 int main(int argc, char** argv) {
167   grpc::testing::TestEnvironment env(&argc, argv);
168   ::testing::InitGoogleTest(&argc, argv);
169   return RUN_ALL_TESTS();
170 }
171