xref: /aosp_15_r20/external/grpc-grpc/test/core/end2end/end2end_tests.cc (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1 
2 //
3 //
4 // Copyright 2015 gRPC authors.
5 //
6 // Licensed under the Apache License, Version 2.0 (the "License");
7 // you may not use this file except in compliance with the License.
8 // You may obtain a copy of the License at
9 //
10 //     http://www.apache.org/licenses/LICENSE-2.0
11 //
12 // Unless required by applicable law or agreed to in writing, software
13 // distributed under the License is distributed on an "AS IS" BASIS,
14 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 // See the License for the specific language governing permissions and
16 // limitations under the License.
17 //
18 //
19 
20 #include "test/core/end2end/end2end_tests.h"
21 
22 #include <regex>
23 #include <tuple>
24 
25 #include "absl/memory/memory.h"
26 #include "absl/random/random.h"
27 
28 #include <grpc/byte_buffer_reader.h>
29 #include <grpc/compression.h>
30 #include <grpc/grpc.h>
31 
32 #include "src/core/lib/compression/message_compress.h"
33 #include "src/core/lib/config/core_configuration.h"
34 #include "src/core/lib/event_engine/default_event_engine.h"
35 #include "src/core/lib/gprpp/no_destruct.h"
36 #include "test/core/end2end/cq_verifier.h"
37 
38 namespace grpc_core {
39 
// Global flag, false by default. Nothing in the visible part of this file
// reads or writes it after this definition.
// NOTE(review): presumably switched on by the fuzzing harness - confirm.
bool g_is_fuzzing_core_e2e_tests{false};
41 
RandomSlice(size_t length)42 Slice RandomSlice(size_t length) {
43   size_t i;
44   static const char chars[] = "abcdefghijklmnopqrstuvwxyz1234567890";
45   std::vector<char> output;
46   output.resize(length);
47   for (i = 0; i < length; ++i) {
48     output[i] = chars[rand() % static_cast<int>(sizeof(chars) - 1)];
49   }
50   return Slice::FromCopiedBuffer(output);
51 }
52 
RandomBinarySlice(size_t length)53 Slice RandomBinarySlice(size_t length) {
54   size_t i;
55   std::vector<uint8_t> output;
56   output.resize(length);
57   for (i = 0; i < length; ++i) {
58     output[i] = rand();
59   }
60   return Slice::FromCopiedBuffer(output);
61 }
62 
ByteBufferFromSlice(Slice slice)63 ByteBufferUniquePtr ByteBufferFromSlice(Slice slice) {
64   return ByteBufferUniquePtr(
65       grpc_raw_byte_buffer_create(const_cast<grpc_slice*>(&slice.c_slice()), 1),
66       grpc_byte_buffer_destroy);
67 }
68 
69 namespace {
FindInMetadataArray(const grpc_metadata_array & md,absl::string_view key)70 absl::optional<std::string> FindInMetadataArray(const grpc_metadata_array& md,
71                                                 absl::string_view key) {
72   for (size_t i = 0; i < md.count; i++) {
73     if (key == StringViewFromSlice(md.metadata[i].key)) {
74       return std::string(StringViewFromSlice(md.metadata[i].value));
75     }
76   }
77   return absl::nullopt;
78 }
79 }  // namespace
80 
SetUp()81 void CoreEnd2endTest::SetUp() {
82   CoreConfiguration::Reset();
83   initialized_ = false;
84 }
85 
TearDown()86 void CoreEnd2endTest::TearDown() {
87   const bool do_shutdown = fixture_ != nullptr;
88   std::shared_ptr<grpc_event_engine::experimental::EventEngine> ee;
89 // TODO(hork): locate the windows leak so we can enable end2end experiments.
90 #ifndef GPR_WINDOWS
91   if (grpc_is_initialized()) {
92     ee = grpc_event_engine::experimental::GetDefaultEventEngine();
93   }
94 #endif
95   ShutdownAndDestroyClient();
96   ShutdownAndDestroyServer();
97   cq_verifier_.reset();
98   if (cq_ != nullptr) {
99     grpc_completion_queue_shutdown(cq_);
100     grpc_event ev;
101     do {
102       ev = grpc_completion_queue_next(cq_, grpc_timeout_seconds_to_deadline(5),
103                                       nullptr);
104     } while (ev.type != GRPC_QUEUE_SHUTDOWN);
105     grpc_completion_queue_destroy(cq_);
106     cq_ = nullptr;
107   }
108   fixture_.reset();
109   // Creating an EventEngine requires gRPC initialization, which the NoOp test
110   // does not do. Skip the EventEngine check if unnecessary.
111   if (ee != nullptr) {
112     quiesce_event_engine_(std::move(ee));
113   }
114   if (do_shutdown) {
115     grpc_shutdown_blocking();
116     // This will wait until gRPC shutdown has actually happened to make sure
117     // no gRPC resources (such as thread) are active. (timeout = 10s)
118     if (!grpc_wait_until_shutdown(10)) {
119       gpr_log(GPR_ERROR, "Timeout in waiting for gRPC shutdown");
120     }
121   }
122   GPR_ASSERT(client_ == nullptr);
123   GPR_ASSERT(server_ == nullptr);
124   initialized_ = false;
125 }
126 
Get(absl::string_view key) const127 absl::optional<std::string> CoreEnd2endTest::IncomingMetadata::Get(
128     absl::string_view key) const {
129   return FindInMetadataArray(*metadata_, key);
130 }
131 
MakeOp()132 grpc_op CoreEnd2endTest::IncomingMetadata::MakeOp() {
133   grpc_op op;
134   memset(&op, 0, sizeof(op));
135   op.op = GRPC_OP_RECV_INITIAL_METADATA;
136   op.data.recv_initial_metadata.recv_initial_metadata = metadata_.get();
137   return op;
138 }
139 
GetSuccessfulStateString()140 std::string CoreEnd2endTest::IncomingMetadata::GetSuccessfulStateString() {
141   std::string out = "incoming_metadata: {";
142   for (size_t i = 0; i < metadata_->count; i++) {
143     absl::StrAppend(&out, StringViewFromSlice(metadata_->metadata[i].key), ":",
144                     StringViewFromSlice(metadata_->metadata[i].value), ",");
145   }
146   return out + "}";
147 }
148 
payload() const149 std::string CoreEnd2endTest::IncomingMessage::payload() const {
150   Slice out;
151   if (payload_->data.raw.compression > GRPC_COMPRESS_NONE) {
152     grpc_slice_buffer decompressed_buffer;
153     grpc_slice_buffer_init(&decompressed_buffer);
154     GPR_ASSERT(grpc_msg_decompress(payload_->data.raw.compression,
155                                    &payload_->data.raw.slice_buffer,
156                                    &decompressed_buffer));
157     grpc_byte_buffer* rbb = grpc_raw_byte_buffer_create(
158         decompressed_buffer.slices, decompressed_buffer.count);
159     grpc_byte_buffer_reader reader;
160     GPR_ASSERT(grpc_byte_buffer_reader_init(&reader, rbb));
161     out = Slice(grpc_byte_buffer_reader_readall(&reader));
162     grpc_byte_buffer_reader_destroy(&reader);
163     grpc_byte_buffer_destroy(rbb);
164     grpc_slice_buffer_destroy(&decompressed_buffer);
165   } else {
166     grpc_byte_buffer_reader reader;
167     GPR_ASSERT(grpc_byte_buffer_reader_init(&reader, payload_));
168     out = Slice(grpc_byte_buffer_reader_readall(&reader));
169     grpc_byte_buffer_reader_destroy(&reader);
170   }
171   return std::string(out.begin(), out.end());
172 }
173 
MakeOp()174 grpc_op CoreEnd2endTest::IncomingMessage::MakeOp() {
175   grpc_op op;
176   memset(&op, 0, sizeof(op));
177   op.op = GRPC_OP_RECV_MESSAGE;
178   op.data.recv_message.recv_message = &payload_;
179   return op;
180 }
181 
182 absl::optional<std::string>
GetTrailingMetadata(absl::string_view key) const183 CoreEnd2endTest::IncomingStatusOnClient::GetTrailingMetadata(
184     absl::string_view key) const {
185   return FindInMetadataArray(data_->trailing_metadata, key);
186 }
187 
188 std::string
GetSuccessfulStateString()189 CoreEnd2endTest::IncomingStatusOnClient::GetSuccessfulStateString() {
190   std::string out = absl::StrCat(
191       "status_on_client: status=", data_->status,
192       " msg=", data_->status_details.as_string_view(), " trailing_metadata={");
193   for (size_t i = 0; i < data_->trailing_metadata.count; i++) {
194     absl::StrAppend(
195         &out, StringViewFromSlice(data_->trailing_metadata.metadata[i].key),
196         ": ", StringViewFromSlice(data_->trailing_metadata.metadata[i].value),
197         ",");
198   }
199   return out + "}";
200 }
201 
GetSuccessfulStateString()202 std::string CoreEnd2endTest::IncomingMessage::GetSuccessfulStateString() {
203   if (payload_ == nullptr) return "message: empty";
204   return absl::StrCat("message: ", payload().size(), "b uncompressed");
205 }
206 
MakeOp()207 grpc_op CoreEnd2endTest::IncomingStatusOnClient::MakeOp() {
208   grpc_op op;
209   memset(&op, 0, sizeof(op));
210   op.op = GRPC_OP_RECV_STATUS_ON_CLIENT;
211   op.data.recv_status_on_client.trailing_metadata = &data_->trailing_metadata;
212   op.data.recv_status_on_client.status = &data_->status;
213   op.data.recv_status_on_client.status_details =
214       const_cast<grpc_slice*>(&data_->status_details.c_slice());
215   op.data.recv_status_on_client.error_string = &data_->error_string;
216   return op;
217 }
218 
MakeOp()219 grpc_op CoreEnd2endTest::IncomingCloseOnServer::MakeOp() {
220   grpc_op op;
221   memset(&op, 0, sizeof(op));
222   op.op = GRPC_OP_RECV_CLOSE_ON_SERVER;
223   op.data.recv_close_on_server.cancelled = &cancelled_;
224   return op;
225 }
226 
227 CoreEnd2endTest::BatchBuilder&
SendInitialMetadata(std::initializer_list<std::pair<absl::string_view,absl::string_view>> md,uint32_t flags,absl::optional<grpc_compression_level> compression_level)228 CoreEnd2endTest::BatchBuilder::SendInitialMetadata(
229     std::initializer_list<std::pair<absl::string_view, absl::string_view>> md,
230     uint32_t flags, absl::optional<grpc_compression_level> compression_level) {
231   auto& v = Make<std::vector<grpc_metadata>>();
232   for (const auto& p : md) {
233     grpc_metadata m;
234     m.key = Make<Slice>(Slice::FromCopiedString(p.first)).c_slice();
235     m.value = Make<Slice>(Slice::FromCopiedString(p.second)).c_slice();
236     v.push_back(m);
237   }
238   grpc_op op;
239   memset(&op, 0, sizeof(op));
240   op.op = GRPC_OP_SEND_INITIAL_METADATA;
241   op.flags = flags;
242   op.data.send_initial_metadata.count = v.size();
243   op.data.send_initial_metadata.metadata = v.data();
244   if (compression_level.has_value()) {
245     op.data.send_initial_metadata.maybe_compression_level.is_set = 1;
246     op.data.send_initial_metadata.maybe_compression_level.level =
247         compression_level.value();
248   }
249   ops_.push_back(op);
250   return *this;
251 }
252 
SendMessage(Slice payload,uint32_t flags)253 CoreEnd2endTest::BatchBuilder& CoreEnd2endTest::BatchBuilder::SendMessage(
254     Slice payload, uint32_t flags) {
255   grpc_op op;
256   memset(&op, 0, sizeof(op));
257   op.op = GRPC_OP_SEND_MESSAGE;
258   op.data.send_message.send_message =
259       Make<ByteBufferUniquePtr>(ByteBufferFromSlice(std::move(payload))).get();
260   op.flags = flags;
261   ops_.push_back(op);
262   return *this;
263 }
264 
265 CoreEnd2endTest::BatchBuilder&
SendCloseFromClient()266 CoreEnd2endTest::BatchBuilder::SendCloseFromClient() {
267   grpc_op op;
268   memset(&op, 0, sizeof(op));
269   op.op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
270   ops_.push_back(op);
271   return *this;
272 }
273 
274 CoreEnd2endTest::BatchBuilder&
SendStatusFromServer(grpc_status_code status,absl::string_view message,std::initializer_list<std::pair<absl::string_view,absl::string_view>> md)275 CoreEnd2endTest::BatchBuilder::SendStatusFromServer(
276     grpc_status_code status, absl::string_view message,
277     std::initializer_list<std::pair<absl::string_view, absl::string_view>> md) {
278   auto& v = Make<std::vector<grpc_metadata>>();
279   for (const auto& p : md) {
280     grpc_metadata m;
281     m.key = Make<Slice>(Slice::FromCopiedString(p.first)).c_slice();
282     m.value = Make<Slice>(Slice::FromCopiedString(p.second)).c_slice();
283     v.push_back(m);
284   }
285   grpc_op op;
286   memset(&op, 0, sizeof(op));
287   op.op = GRPC_OP_SEND_STATUS_FROM_SERVER;
288   op.data.send_status_from_server.trailing_metadata_count = v.size();
289   op.data.send_status_from_server.trailing_metadata = v.data();
290   op.data.send_status_from_server.status = status;
291   op.data.send_status_from_server.status_details = &Make<grpc_slice>(
292       Make<Slice>(Slice::FromCopiedString(message)).c_slice());
293   ops_.push_back(op);
294   return *this;
295 }
296 
~BatchBuilder()297 CoreEnd2endTest::BatchBuilder::~BatchBuilder() {
298   grpc_call_error err = grpc_call_start_batch(call_, ops_.data(), ops_.size(),
299                                               CqVerifier::tag(tag_), nullptr);
300   EXPECT_EQ(err, GRPC_CALL_OK) << grpc_call_error_to_string(err);
301 }
302 
Create()303 CoreEnd2endTest::Call CoreEnd2endTest::ClientCallBuilder::Create() {
304   if (auto* u = absl::get_if<UnregisteredCall>(&call_selector_)) {
305     absl::optional<Slice> host;
306     if (u->host.has_value()) host = Slice::FromCopiedString(*u->host);
307     test_.ForceInitialized();
308     return Call(
309         grpc_channel_create_call(
310             test_.client(), parent_call_, propagation_mask_, test_.cq(),
311             Slice::FromCopiedString(u->method).c_slice(),
312             host.has_value() ? &host->c_slice() : nullptr, deadline_, nullptr),
313         &test_);
314   } else {
315     return Call(grpc_channel_create_registered_call(
316                     test_.client(), parent_call_, propagation_mask_, test_.cq(),
317                     absl::get<void*>(call_selector_), deadline_, nullptr),
318                 &test_);
319   }
320 }
321 
ServerRegisteredMethod(CoreEnd2endTest * test,absl::string_view name,grpc_server_register_method_payload_handling payload_handling)322 CoreEnd2endTest::ServerRegisteredMethod::ServerRegisteredMethod(
323     CoreEnd2endTest* test, absl::string_view name,
324     grpc_server_register_method_payload_handling payload_handling) {
325   GPR_ASSERT(test->server_ == nullptr);
326   test->pre_server_start_ = [old = std::move(test->pre_server_start_),
327                              handle = handle_, name = std::string(name),
328                              payload_handling](grpc_server* server) mutable {
329     *handle = grpc_server_register_method(server, name.c_str(), nullptr,
330                                           payload_handling, 0);
331     old(server);
332   };
333 }
334 
IncomingCall(CoreEnd2endTest & test,int tag)335 CoreEnd2endTest::IncomingCall::IncomingCall(CoreEnd2endTest& test, int tag)
336     : impl_(std::make_unique<Impl>(&test)) {
337   test.ForceInitialized();
338   EXPECT_EQ(
339       grpc_server_request_call(test.server(), impl_->call.call_ptr(),
340                                &impl_->call_details, &impl_->request_metadata,
341                                test.cq(), test.cq(), CqVerifier::tag(tag)),
342       GRPC_CALL_OK);
343 }
344 
IncomingCall(CoreEnd2endTest & test,void * method,IncomingMessage * message,int tag)345 CoreEnd2endTest::IncomingCall::IncomingCall(CoreEnd2endTest& test, void* method,
346                                             IncomingMessage* message, int tag)
347     : impl_(std::make_unique<Impl>(&test)) {
348   test.ForceInitialized();
349   impl_->call_details.method = grpc_empty_slice();
350   EXPECT_EQ(grpc_server_request_registered_call(
351                 test.server(), method, impl_->call.call_ptr(),
352                 &impl_->call_details.deadline, &impl_->request_metadata,
353                 message == nullptr ? nullptr : &message->payload_, test.cq(),
354                 test.cq(), CqVerifier::tag(tag)),
355             GRPC_CALL_OK);
356 }
357 
GetInitialMetadata(absl::string_view key) const358 absl::optional<std::string> CoreEnd2endTest::IncomingCall::GetInitialMetadata(
359     absl::string_view key) const {
360   return FindInMetadataArray(impl_->request_metadata, key);
361 }
362 
ForceInitialized()363 void CoreEnd2endTest::ForceInitialized() {
364   if (!initialized_) {
365     initialized_ = true;
366     InitServer(ChannelArgs());
367     InitClient(ChannelArgs());
368   }
369 }
370 
RegisterTest(absl::string_view suite,absl::string_view name,MakeTestFn make_test,SourceLocation)371 void CoreEnd2endTestRegistry::RegisterTest(absl::string_view suite,
372                                            absl::string_view name,
373                                            MakeTestFn make_test,
374                                            SourceLocation) {
375   if (absl::StartsWith(name, "DISABLED_")) return;
376   auto& tests = tests_by_suite_[suite];
377   GPR_ASSERT(tests.count(name) == 0);
378   tests[name] = std::move(make_test);
379 }
380 
RegisterSuite(absl::string_view suite,std::vector<const CoreTestConfiguration * > configs,SourceLocation)381 void CoreEnd2endTestRegistry::RegisterSuite(
382     absl::string_view suite, std::vector<const CoreTestConfiguration*> configs,
383     SourceLocation) {
384   GPR_ASSERT(suites_.count(suite) == 0);
385   suites_[suite] = std::move(configs);
386 }
387 
388 namespace {
389 template <typename Map>
KeysFrom(const Map & map)390 std::vector<absl::string_view> KeysFrom(const Map& map) {
391   std::vector<absl::string_view> out;
392   out.reserve(map.size());
393   for (const auto& elem : map) {
394     out.push_back(elem.first);
395   }
396   return out;
397 }
398 }  // namespace
399 
AllTests()400 std::vector<CoreEnd2endTestRegistry::Test> CoreEnd2endTestRegistry::AllTests() {
401   std::vector<Test> tests;
402   // Sort inputs to ensure outputs are deterministic
403   for (auto& suite_configs : suites_) {
404     std::sort(suite_configs.second.begin(), suite_configs.second.end(),
405               [](const auto* a, const auto* b) { return a->name < b->name; });
406   }
407   for (const auto& suite_configs : suites_) {
408     if (suite_configs.second.empty()) {
409       fprintf(
410           stderr, "%s\n",
411           absl::StrCat("Suite ", suite_configs.first, " has no tests").c_str());
412     }
413     for (const auto& test_factory : tests_by_suite_[suite_configs.first]) {
414       for (const auto* config : suite_configs.second) {
415         tests.push_back(Test{suite_configs.first, test_factory.first, config,
416                              test_factory.second});
417       }
418     }
419   }
420   return tests;
421 }
422 
423 }  // namespace grpc_core
424