xref: /aosp_15_r20/external/grpc-grpc/test/core/event_engine/event_engine_test_utils.cc (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1 // Copyright 2022 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "test/core/event_engine/event_engine_test_utils.h"
16 
17 #include <stdlib.h>
18 
19 #include <algorithm>
20 #include <memory>
21 #include <random>
22 #include <string>
23 #include <utility>
24 
25 #include "absl/status/status.h"
26 #include "absl/status/statusor.h"
27 #include "absl/strings/str_cat.h"
28 #include "absl/time/clock.h"
29 #include "absl/time/time.h"
30 
31 #include <grpc/event_engine/event_engine.h>
32 #include <grpc/event_engine/memory_allocator.h>
33 #include <grpc/event_engine/slice.h>
34 #include <grpc/event_engine/slice_buffer.h>
35 #include <grpc/slice_buffer.h>
36 #include <grpc/support/log.h>
37 
38 #include "src/core/lib/event_engine/channel_args_endpoint_config.h"
39 #include "src/core/lib/event_engine/tcp_socket_utils.h"
40 #include "src/core/lib/gprpp/notification.h"
41 #include "src/core/lib/gprpp/time.h"
42 #include "src/core/lib/resource_quota/memory_quota.h"
43 
44 // IWYU pragma: no_include <sys/socket.h>
45 
46 namespace grpc_event_engine {
47 namespace experimental {
48 
namespace {
// Lower bound handed to the message-length distribution in
// GetNextSendMessage().
constexpr int kMinMessageSize{1024};
// Upper bound handed to the message-length distribution in
// GetNextSendMessage().
constexpr int kMaxMessageSize{4096};
}  // namespace
53 
54 // Returns a random message with bounded length.
GetNextSendMessage()55 std::string GetNextSendMessage() {
56   static const char alphanum[] =
57       "0123456789"
58       "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
59       "abcdefghijklmnopqrstuvwxyz";
60   static std::random_device rd;
61   static std::seed_seq seed{rd()};
62   static std::mt19937 gen(seed);
63   static std::uniform_real_distribution<> dis(kMinMessageSize, kMaxMessageSize);
64   static grpc_core::Mutex g_mu;
65   std::string tmp_s;
66   int len;
67   {
68     grpc_core::MutexLock lock(&g_mu);
69     len = dis(gen);
70   }
71   tmp_s.reserve(len);
72   for (int i = 0; i < len; ++i) {
73     tmp_s += alphanum[rand() % (sizeof(alphanum) - 1)];
74   }
75   return tmp_s;
76 }
77 
WaitForSingleOwner(std::shared_ptr<EventEngine> engine)78 void WaitForSingleOwner(std::shared_ptr<EventEngine> engine) {
79   while (engine.use_count() > 1) {
80     GRPC_LOG_EVERY_N_SEC(2, GPR_INFO, "engine.use_count() = %ld",
81                          engine.use_count());
82     absl::SleepFor(absl::Milliseconds(100));
83   }
84 }
85 
AppendStringToSliceBuffer(SliceBuffer * buf,absl::string_view data)86 void AppendStringToSliceBuffer(SliceBuffer* buf, absl::string_view data) {
87   buf->Append(Slice::FromCopiedString(data));
88 }
89 
ExtractSliceBufferIntoString(SliceBuffer * buf)90 std::string ExtractSliceBufferIntoString(SliceBuffer* buf) {
91   if (!buf->Length()) {
92     return std::string();
93   }
94   std::string tmp(buf->Length(), '\0');
95   char* bytes = const_cast<char*>(tmp.c_str());
96   grpc_slice_buffer_move_first_into_buffer(buf->c_slice_buffer(), buf->Length(),
97                                            bytes);
98   return tmp;
99 }
100 
SendValidatePayload(absl::string_view data,EventEngine::Endpoint * send_endpoint,EventEngine::Endpoint * receive_endpoint)101 absl::Status SendValidatePayload(absl::string_view data,
102                                  EventEngine::Endpoint* send_endpoint,
103                                  EventEngine::Endpoint* receive_endpoint) {
104   GPR_ASSERT(receive_endpoint != nullptr && send_endpoint != nullptr);
105   int num_bytes_written = data.size();
106   grpc_core::Notification read_signal;
107   grpc_core::Notification write_signal;
108   SliceBuffer read_slice_buf;
109   SliceBuffer read_store_buf;
110   SliceBuffer write_slice_buf;
111 
112   read_slice_buf.Clear();
113   write_slice_buf.Clear();
114   read_store_buf.Clear();
115   // std::cout << "SendValidatePayload ... " << std::endl;
116   // fflush(stdout);
117 
118   AppendStringToSliceBuffer(&write_slice_buf, data);
119   EventEngine::Endpoint::ReadArgs args = {num_bytes_written};
120   std::function<void(absl::Status)> read_cb;
121   read_cb = [receive_endpoint, &read_slice_buf, &read_store_buf, &read_cb,
122              &read_signal, &args](absl::Status status) {
123     GPR_ASSERT(status.ok());
124     if (read_slice_buf.Length() == static_cast<size_t>(args.read_hint_bytes)) {
125       read_slice_buf.MoveFirstNBytesIntoSliceBuffer(read_slice_buf.Length(),
126                                                     read_store_buf);
127       read_signal.Notify();
128       return;
129     }
130     args.read_hint_bytes -= read_slice_buf.Length();
131     read_slice_buf.MoveFirstNBytesIntoSliceBuffer(read_slice_buf.Length(),
132                                                   read_store_buf);
133     if (receive_endpoint->Read(read_cb, &read_slice_buf, &args)) {
134       GPR_ASSERT(read_slice_buf.Length() != 0);
135       read_cb(absl::OkStatus());
136     }
137   };
138   // Start asynchronous reading at the receive_endpoint.
139   if (receive_endpoint->Read(read_cb, &read_slice_buf, &args)) {
140     read_cb(absl::OkStatus());
141   }
142   // Start asynchronous writing at the send_endpoint.
143   if (send_endpoint->Write(
144           [&write_signal](absl::Status status) {
145             GPR_ASSERT(status.ok());
146             write_signal.Notify();
147           },
148           &write_slice_buf, nullptr)) {
149     write_signal.Notify();
150   }
151   write_signal.WaitForNotification();
152   read_signal.WaitForNotification();
153   // Check if data written == data read
154   std::string data_read = ExtractSliceBufferIntoString(&read_store_buf);
155   if (data != data_read) {
156     gpr_log(GPR_INFO, "Data written = %s", data.data());
157     gpr_log(GPR_INFO, "Data read = %s", data_read.c_str());
158     return absl::CancelledError("Data read != Data written");
159   }
160   return absl::OkStatus();
161 }
162 
BindAndStartListener(const std::vector<std::string> & addrs,bool listener_type_oracle)163 absl::Status ConnectionManager::BindAndStartListener(
164     const std::vector<std::string>& addrs, bool listener_type_oracle) {
165   grpc_core::MutexLock lock(&mu_);
166   if (addrs.empty()) {
167     return absl::InvalidArgumentError(
168         "Atleast one bind address must be specified");
169   }
170   for (auto& addr : addrs) {
171     if (listeners_.find(addr) != listeners_.end()) {
172       // There is already a listener at this address. Return error.
173       return absl::AlreadyExistsError(
174           absl::StrCat("Listener already existis for address: ", addr));
175     }
176   }
177   EventEngine::Listener::AcceptCallback accept_cb =
178       [this](std::unique_ptr<EventEngine::Endpoint> ep,
179              MemoryAllocator /*memory_allocator*/) {
180         last_in_progress_connection_.SetServerEndpoint(std::move(ep));
181       };
182 
183   EventEngine* event_engine = listener_type_oracle ? oracle_event_engine_.get()
184                                                    : test_event_engine_.get();
185 
186   ChannelArgsEndpointConfig config;
187   auto status = event_engine->CreateListener(
188       std::move(accept_cb),
189       [](absl::Status status) { GPR_ASSERT(status.ok()); }, config,
190       std::make_unique<grpc_core::MemoryQuota>("foo"));
191   if (!status.ok()) {
192     return status.status();
193   }
194 
195   std::shared_ptr<EventEngine::Listener> listener((*status).release());
196   for (auto& addr : addrs) {
197     auto bind_status = listener->Bind(*URIToResolvedAddress(addr));
198     if (!bind_status.ok()) {
199       gpr_log(GPR_ERROR, "Binding listener failed: %s",
200               bind_status.status().ToString().c_str());
201       return bind_status.status();
202     }
203   }
204   GPR_ASSERT(listener->Start().ok());
205   // Insert same listener pointer for all bind addresses after the listener
206   // has started successfully.
207   for (auto& addr : addrs) {
208     listeners_.insert(std::make_pair(addr, listener));
209   }
210   return absl::OkStatus();
211 }
212 
213 absl::StatusOr<std::tuple<std::unique_ptr<EventEngine::Endpoint>,
214                           std::unique_ptr<EventEngine::Endpoint>>>
CreateConnection(std::string target_addr,EventEngine::Duration timeout,bool client_type_oracle)215 ConnectionManager::CreateConnection(std::string target_addr,
216                                     EventEngine::Duration timeout,
217                                     bool client_type_oracle) {
218   // Only allow one CreateConnection call to proceed at a time.
219   grpc_core::MutexLock lock(&mu_);
220   std::string conn_name =
221       absl::StrCat("connection-", std::to_string(num_processed_connections_++));
222   EventEngine* event_engine = client_type_oracle ? oracle_event_engine_.get()
223                                                  : test_event_engine_.get();
224   ChannelArgsEndpointConfig config;
225   event_engine->Connect(
226       [this](absl::StatusOr<std::unique_ptr<EventEngine::Endpoint>> status) {
227         if (!status.ok()) {
228           gpr_log(GPR_ERROR, "Connect failed: %s",
229                   status.status().ToString().c_str());
230           last_in_progress_connection_.SetClientEndpoint(nullptr);
231         } else {
232           last_in_progress_connection_.SetClientEndpoint(std::move(*status));
233         }
234       },
235       *URIToResolvedAddress(target_addr), config,
236       memory_quota_->CreateMemoryAllocator(conn_name), timeout);
237 
238   auto client_endpoint = last_in_progress_connection_.GetClientEndpoint();
239   if (client_endpoint != nullptr &&
240       listeners_.find(target_addr) != listeners_.end()) {
241     // There is a listener for the specified address. Wait until it
242     // creates a ServerEndpoint after accepting the connection.
243     auto server_endpoint = last_in_progress_connection_.GetServerEndpoint();
244     GPR_ASSERT(server_endpoint != nullptr);
245     // Set last_in_progress_connection_ to nullptr
246     return std::make_tuple(std::move(client_endpoint),
247                            std::move(server_endpoint));
248   }
249   return absl::CancelledError("Failed to create connection.");
250 }
251 
252 }  // namespace experimental
253 }  // namespace grpc_event_engine
254