1 // Copyright 2022 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 #include "test/core/event_engine/event_engine_test_utils.h"
16
17 #include <stdlib.h>
18
19 #include <algorithm>
20 #include <memory>
21 #include <random>
22 #include <string>
23 #include <utility>
24
25 #include "absl/status/status.h"
26 #include "absl/status/statusor.h"
27 #include "absl/strings/str_cat.h"
28 #include "absl/time/clock.h"
29 #include "absl/time/time.h"
30
31 #include <grpc/event_engine/event_engine.h>
32 #include <grpc/event_engine/memory_allocator.h>
33 #include <grpc/event_engine/slice.h>
34 #include <grpc/event_engine/slice_buffer.h>
35 #include <grpc/slice_buffer.h>
36 #include <grpc/support/log.h>
37
38 #include "src/core/lib/event_engine/channel_args_endpoint_config.h"
39 #include "src/core/lib/event_engine/tcp_socket_utils.h"
40 #include "src/core/lib/gprpp/notification.h"
41 #include "src/core/lib/gprpp/time.h"
42 #include "src/core/lib/resource_quota/memory_quota.h"
43
44 // IWYU pragma: no_include <sys/socket.h>
45
46 namespace grpc_event_engine {
47 namespace experimental {
48
49 namespace {
50 constexpr int kMinMessageSize = 1024;
51 constexpr int kMaxMessageSize = 4096;
52 } // namespace
53
54 // Returns a random message with bounded length.
GetNextSendMessage()55 std::string GetNextSendMessage() {
56 static const char alphanum[] =
57 "0123456789"
58 "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
59 "abcdefghijklmnopqrstuvwxyz";
60 static std::random_device rd;
61 static std::seed_seq seed{rd()};
62 static std::mt19937 gen(seed);
63 static std::uniform_real_distribution<> dis(kMinMessageSize, kMaxMessageSize);
64 static grpc_core::Mutex g_mu;
65 std::string tmp_s;
66 int len;
67 {
68 grpc_core::MutexLock lock(&g_mu);
69 len = dis(gen);
70 }
71 tmp_s.reserve(len);
72 for (int i = 0; i < len; ++i) {
73 tmp_s += alphanum[rand() % (sizeof(alphanum) - 1)];
74 }
75 return tmp_s;
76 }
77
WaitForSingleOwner(std::shared_ptr<EventEngine> engine)78 void WaitForSingleOwner(std::shared_ptr<EventEngine> engine) {
79 while (engine.use_count() > 1) {
80 GRPC_LOG_EVERY_N_SEC(2, GPR_INFO, "engine.use_count() = %ld",
81 engine.use_count());
82 absl::SleepFor(absl::Milliseconds(100));
83 }
84 }
85
AppendStringToSliceBuffer(SliceBuffer * buf,absl::string_view data)86 void AppendStringToSliceBuffer(SliceBuffer* buf, absl::string_view data) {
87 buf->Append(Slice::FromCopiedString(data));
88 }
89
ExtractSliceBufferIntoString(SliceBuffer * buf)90 std::string ExtractSliceBufferIntoString(SliceBuffer* buf) {
91 if (!buf->Length()) {
92 return std::string();
93 }
94 std::string tmp(buf->Length(), '\0');
95 char* bytes = const_cast<char*>(tmp.c_str());
96 grpc_slice_buffer_move_first_into_buffer(buf->c_slice_buffer(), buf->Length(),
97 bytes);
98 return tmp;
99 }
100
SendValidatePayload(absl::string_view data,EventEngine::Endpoint * send_endpoint,EventEngine::Endpoint * receive_endpoint)101 absl::Status SendValidatePayload(absl::string_view data,
102 EventEngine::Endpoint* send_endpoint,
103 EventEngine::Endpoint* receive_endpoint) {
104 GPR_ASSERT(receive_endpoint != nullptr && send_endpoint != nullptr);
105 int num_bytes_written = data.size();
106 grpc_core::Notification read_signal;
107 grpc_core::Notification write_signal;
108 SliceBuffer read_slice_buf;
109 SliceBuffer read_store_buf;
110 SliceBuffer write_slice_buf;
111
112 read_slice_buf.Clear();
113 write_slice_buf.Clear();
114 read_store_buf.Clear();
115 // std::cout << "SendValidatePayload ... " << std::endl;
116 // fflush(stdout);
117
118 AppendStringToSliceBuffer(&write_slice_buf, data);
119 EventEngine::Endpoint::ReadArgs args = {num_bytes_written};
120 std::function<void(absl::Status)> read_cb;
121 read_cb = [receive_endpoint, &read_slice_buf, &read_store_buf, &read_cb,
122 &read_signal, &args](absl::Status status) {
123 GPR_ASSERT(status.ok());
124 if (read_slice_buf.Length() == static_cast<size_t>(args.read_hint_bytes)) {
125 read_slice_buf.MoveFirstNBytesIntoSliceBuffer(read_slice_buf.Length(),
126 read_store_buf);
127 read_signal.Notify();
128 return;
129 }
130 args.read_hint_bytes -= read_slice_buf.Length();
131 read_slice_buf.MoveFirstNBytesIntoSliceBuffer(read_slice_buf.Length(),
132 read_store_buf);
133 if (receive_endpoint->Read(read_cb, &read_slice_buf, &args)) {
134 GPR_ASSERT(read_slice_buf.Length() != 0);
135 read_cb(absl::OkStatus());
136 }
137 };
138 // Start asynchronous reading at the receive_endpoint.
139 if (receive_endpoint->Read(read_cb, &read_slice_buf, &args)) {
140 read_cb(absl::OkStatus());
141 }
142 // Start asynchronous writing at the send_endpoint.
143 if (send_endpoint->Write(
144 [&write_signal](absl::Status status) {
145 GPR_ASSERT(status.ok());
146 write_signal.Notify();
147 },
148 &write_slice_buf, nullptr)) {
149 write_signal.Notify();
150 }
151 write_signal.WaitForNotification();
152 read_signal.WaitForNotification();
153 // Check if data written == data read
154 std::string data_read = ExtractSliceBufferIntoString(&read_store_buf);
155 if (data != data_read) {
156 gpr_log(GPR_INFO, "Data written = %s", data.data());
157 gpr_log(GPR_INFO, "Data read = %s", data_read.c_str());
158 return absl::CancelledError("Data read != Data written");
159 }
160 return absl::OkStatus();
161 }
162
BindAndStartListener(const std::vector<std::string> & addrs,bool listener_type_oracle)163 absl::Status ConnectionManager::BindAndStartListener(
164 const std::vector<std::string>& addrs, bool listener_type_oracle) {
165 grpc_core::MutexLock lock(&mu_);
166 if (addrs.empty()) {
167 return absl::InvalidArgumentError(
168 "Atleast one bind address must be specified");
169 }
170 for (auto& addr : addrs) {
171 if (listeners_.find(addr) != listeners_.end()) {
172 // There is already a listener at this address. Return error.
173 return absl::AlreadyExistsError(
174 absl::StrCat("Listener already existis for address: ", addr));
175 }
176 }
177 EventEngine::Listener::AcceptCallback accept_cb =
178 [this](std::unique_ptr<EventEngine::Endpoint> ep,
179 MemoryAllocator /*memory_allocator*/) {
180 last_in_progress_connection_.SetServerEndpoint(std::move(ep));
181 };
182
183 EventEngine* event_engine = listener_type_oracle ? oracle_event_engine_.get()
184 : test_event_engine_.get();
185
186 ChannelArgsEndpointConfig config;
187 auto status = event_engine->CreateListener(
188 std::move(accept_cb),
189 [](absl::Status status) { GPR_ASSERT(status.ok()); }, config,
190 std::make_unique<grpc_core::MemoryQuota>("foo"));
191 if (!status.ok()) {
192 return status.status();
193 }
194
195 std::shared_ptr<EventEngine::Listener> listener((*status).release());
196 for (auto& addr : addrs) {
197 auto bind_status = listener->Bind(*URIToResolvedAddress(addr));
198 if (!bind_status.ok()) {
199 gpr_log(GPR_ERROR, "Binding listener failed: %s",
200 bind_status.status().ToString().c_str());
201 return bind_status.status();
202 }
203 }
204 GPR_ASSERT(listener->Start().ok());
205 // Insert same listener pointer for all bind addresses after the listener
206 // has started successfully.
207 for (auto& addr : addrs) {
208 listeners_.insert(std::make_pair(addr, listener));
209 }
210 return absl::OkStatus();
211 }
212
213 absl::StatusOr<std::tuple<std::unique_ptr<EventEngine::Endpoint>,
214 std::unique_ptr<EventEngine::Endpoint>>>
CreateConnection(std::string target_addr,EventEngine::Duration timeout,bool client_type_oracle)215 ConnectionManager::CreateConnection(std::string target_addr,
216 EventEngine::Duration timeout,
217 bool client_type_oracle) {
218 // Only allow one CreateConnection call to proceed at a time.
219 grpc_core::MutexLock lock(&mu_);
220 std::string conn_name =
221 absl::StrCat("connection-", std::to_string(num_processed_connections_++));
222 EventEngine* event_engine = client_type_oracle ? oracle_event_engine_.get()
223 : test_event_engine_.get();
224 ChannelArgsEndpointConfig config;
225 event_engine->Connect(
226 [this](absl::StatusOr<std::unique_ptr<EventEngine::Endpoint>> status) {
227 if (!status.ok()) {
228 gpr_log(GPR_ERROR, "Connect failed: %s",
229 status.status().ToString().c_str());
230 last_in_progress_connection_.SetClientEndpoint(nullptr);
231 } else {
232 last_in_progress_connection_.SetClientEndpoint(std::move(*status));
233 }
234 },
235 *URIToResolvedAddress(target_addr), config,
236 memory_quota_->CreateMemoryAllocator(conn_name), timeout);
237
238 auto client_endpoint = last_in_progress_connection_.GetClientEndpoint();
239 if (client_endpoint != nullptr &&
240 listeners_.find(target_addr) != listeners_.end()) {
241 // There is a listener for the specified address. Wait until it
242 // creates a ServerEndpoint after accepting the connection.
243 auto server_endpoint = last_in_progress_connection_.GetServerEndpoint();
244 GPR_ASSERT(server_endpoint != nullptr);
245 // Set last_in_progress_connection_ to nullptr
246 return std::make_tuple(std::move(client_endpoint),
247 std::move(server_endpoint));
248 }
249 return absl::CancelledError("Failed to create connection.");
250 }
251
252 } // namespace experimental
253 } // namespace grpc_event_engine
254