xref: /aosp_15_r20/external/grpc-grpc/test/core/xds/xds_transport_fake.cc (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1 //
2 // Copyright 2022 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //     http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16 
17 #include "test/core/xds/xds_transport_fake.h"
18 
19 #include <functional>
20 #include <memory>
21 #include <string>
22 #include <thread>
23 #include <type_traits>
24 #include <utility>
25 
26 #include <grpc/event_engine/event_engine.h>
27 #include <grpc/support/log.h>
28 #include <grpc/support/port_platform.h>
29 
30 #include "src/core/ext/xds/xds_bootstrap.h"
31 #include "src/core/lib/event_engine/default_event_engine.h"
32 #include "src/core/lib/gprpp/orphanable.h"
33 #include "src/core/lib/gprpp/ref_counted_ptr.h"
34 #include "src/core/lib/iomgr/exec_ctx.h"
35 #include "test/core/util/test_config.h"
36 
37 using grpc_event_engine::experimental::GetDefaultEventEngine;
38 
39 namespace grpc_core {
40 
41 //
42 // FakeXdsTransportFactory::FakeStreamingCall
43 //
44 
~FakeStreamingCall()45 FakeXdsTransportFactory::FakeStreamingCall::~FakeStreamingCall() {
46   // Tests should not fail to read any messages from the client.
47   {
48     MutexLock lock(&mu_);
49     if (transport_->abort_on_undrained_messages()) {
50       for (const auto& message : from_client_messages_) {
51         gpr_log(GPR_ERROR, "[%s] %p From client message left in queue: %s",
52                 transport_->server()->server_uri().c_str(), this,
53                 message.c_str());
54       }
55       GPR_ASSERT(from_client_messages_.empty());
56     }
57   }
58   // Can't call event_handler_->OnStatusReceived() or unref event_handler_
59   // synchronously, since those operations will trigger code in
60   // XdsClient that acquires its mutex, but it was already holding its
61   // mutex when it called us, so it would deadlock.
62   GetDefaultEventEngine()->Run([event_handler = std::move(event_handler_),
63                                 status_sent = status_sent_]() mutable {
64     ExecCtx exec_ctx;
65     if (!status_sent) event_handler->OnStatusReceived(absl::OkStatus());
66     event_handler.reset();
67   });
68 }
69 
Orphan()70 void FakeXdsTransportFactory::FakeStreamingCall::Orphan() {
71   {
72     MutexLock lock(&mu_);
73     orphaned_ = true;
74   }
75   transport_->RemoveStream(method_, this);
76   Unref();
77 }
78 
SendMessage(std::string payload)79 void FakeXdsTransportFactory::FakeStreamingCall::SendMessage(
80     std::string payload) {
81   MutexLock lock(&mu_);
82   GPR_ASSERT(!orphaned_);
83   from_client_messages_.push_back(std::move(payload));
84   cv_client_msg_.Signal();
85   if (transport_->auto_complete_messages_from_client()) {
86     CompleteSendMessageFromClientLocked(/*ok=*/true);
87   }
88 }
89 
HaveMessageFromClient()90 bool FakeXdsTransportFactory::FakeStreamingCall::HaveMessageFromClient() {
91   MutexLock lock(&mu_);
92   return !from_client_messages_.empty();
93 }
94 
95 absl::optional<std::string>
WaitForMessageFromClient(absl::Duration timeout)96 FakeXdsTransportFactory::FakeStreamingCall::WaitForMessageFromClient(
97     absl::Duration timeout) {
98   MutexLock lock(&mu_);
99   while (from_client_messages_.empty()) {
100     if (cv_client_msg_.WaitWithTimeout(&mu_,
101                                        timeout * grpc_test_slowdown_factor())) {
102       return absl::nullopt;
103     }
104   }
105   std::string payload = from_client_messages_.front();
106   from_client_messages_.pop_front();
107   return payload;
108 }
109 
110 void FakeXdsTransportFactory::FakeStreamingCall::
CompleteSendMessageFromClientLocked(bool ok)111     CompleteSendMessageFromClientLocked(bool ok) {
112   // Can't call event_handler_->OnRequestSent() synchronously, since that
113   // operation will trigger code in XdsClient that acquires its mutex, but it
114   // was already holding its mutex when it called us, so it would deadlock.
115   GetDefaultEventEngine()->Run(
116       [event_handler = event_handler_->Ref(), ok]() mutable {
117         ExecCtx exec_ctx;
118         event_handler->OnRequestSent(ok);
119         event_handler.reset();
120       });
121 }
122 
CompleteSendMessageFromClient(bool ok)123 void FakeXdsTransportFactory::FakeStreamingCall::CompleteSendMessageFromClient(
124     bool ok) {
125   GPR_ASSERT(!transport_->auto_complete_messages_from_client());
126   MutexLock lock(&mu_);
127   CompleteSendMessageFromClientLocked(ok);
128 }
129 
StartRecvMessage()130 void FakeXdsTransportFactory::FakeStreamingCall::StartRecvMessage() {
131   MutexLock lock(&mu_);
132   if (num_pending_reads_ > 0) {
133     transport_->factory()->too_many_pending_reads_callback_();
134   }
135   ++reads_started_;
136   ++num_pending_reads_;
137   cv_reads_started_.SignalAll();
138   if (!to_client_messages_.empty()) {
139     // Dispatch pending message (if there's one) on a separate thread to avoid
140     // recursion
141     GetDefaultEventEngine()->Run([call = RefAsSubclass<FakeStreamingCall>()]() {
142       call->MaybeDeliverMessageToClient();
143     });
144   }
145 }
146 
SendMessageToClient(absl::string_view payload)147 void FakeXdsTransportFactory::FakeStreamingCall::SendMessageToClient(
148     absl::string_view payload) {
149   {
150     MutexLock lock(&mu_);
151     to_client_messages_.emplace_back(payload);
152   }
153   MaybeDeliverMessageToClient();
154 }
155 
MaybeDeliverMessageToClient()156 void FakeXdsTransportFactory::FakeStreamingCall::MaybeDeliverMessageToClient() {
157   RefCountedPtr<RefCountedEventHandler> event_handler;
158   std::string message;
159   // Loop terminates with a break inside
160   while (true) {
161     {
162       MutexLock lock(&mu_);
163       if (num_pending_reads_ == 0 || to_client_messages_.empty()) {
164         break;
165       }
166       --num_pending_reads_;
167       message = std::move(to_client_messages_.front());
168       to_client_messages_.pop_front();
169       event_handler = event_handler_;
170     }
171     ExecCtx exec_ctx;
172     event_handler->OnRecvMessage(message);
173   }
174 }
175 
MaybeSendStatusToClient(absl::Status status)176 void FakeXdsTransportFactory::FakeStreamingCall::MaybeSendStatusToClient(
177     absl::Status status) {
178   ExecCtx exec_ctx;
179   RefCountedPtr<RefCountedEventHandler> event_handler;
180   {
181     MutexLock lock(&mu_);
182     if (status_sent_) return;
183     status_sent_ = true;
184     event_handler = event_handler_->Ref();
185   }
186   event_handler->OnStatusReceived(std::move(status));
187 }
188 
Orphaned()189 bool FakeXdsTransportFactory::FakeStreamingCall::Orphaned() {
190   MutexLock lock(&mu_);
191   return orphaned_;
192 }
193 
194 //
195 // FakeXdsTransportFactory::FakeXdsTransport
196 //
197 
TriggerConnectionFailure(absl::Status status)198 void FakeXdsTransportFactory::FakeXdsTransport::TriggerConnectionFailure(
199     absl::Status status) {
200   RefCountedPtr<RefCountedOnConnectivityFailure> on_connectivity_failure;
201   {
202     MutexLock lock(&mu_);
203     on_connectivity_failure = on_connectivity_failure_;
204   }
205   ExecCtx exec_ctx;
206   if (on_connectivity_failure != nullptr) {
207     on_connectivity_failure->Run(std::move(status));
208   }
209 }
210 
Orphan()211 void FakeXdsTransportFactory::FakeXdsTransport::Orphan() {
212   {
213     MutexLock lock(&factory_->mu_);
214     auto it = factory_->transport_map_.find(server_.Key());
215     if (it != factory_->transport_map_.end() && it->second == this) {
216       factory_->transport_map_.erase(it);
217     }
218   }
219   factory_.reset();
220   {
221     MutexLock lock(&mu_);
222     // Can't destroy on_connectivity_failure_ synchronously, since that
223     // operation will trigger code in XdsClient that acquires its mutex, but
224     // it was already holding its mutex when it called us, so it would deadlock.
225     GetDefaultEventEngine()->Run([on_connectivity_failure = std::move(
226                                       on_connectivity_failure_)]() mutable {
227       ExecCtx exec_ctx;
228       on_connectivity_failure.reset();
229     });
230   }
231   Unref();
232 }
233 
234 RefCountedPtr<FakeXdsTransportFactory::FakeStreamingCall>
WaitForStream(const char * method,absl::Duration timeout)235 FakeXdsTransportFactory::FakeXdsTransport::WaitForStream(
236     const char* method, absl::Duration timeout) {
237   MutexLock lock(&mu_);
238   auto it = active_calls_.find(method);
239   while (it == active_calls_.end() || it->second == nullptr) {
240     if (cv_.WaitWithTimeout(&mu_, timeout * grpc_test_slowdown_factor())) {
241       return nullptr;
242     }
243     it = active_calls_.find(method);
244   }
245   return it->second;
246 }
247 
RemoveStream(const char * method,FakeStreamingCall * call)248 void FakeXdsTransportFactory::FakeXdsTransport::RemoveStream(
249     const char* method, FakeStreamingCall* call) {
250   MutexLock lock(&mu_);
251   auto it = active_calls_.find(method);
252   if (it != active_calls_.end() && it->second.get() == call) {
253     active_calls_.erase(it);
254   }
255 }
256 
257 OrphanablePtr<XdsTransportFactory::XdsTransport::StreamingCall>
CreateStreamingCall(const char * method,std::unique_ptr<StreamingCall::EventHandler> event_handler)258 FakeXdsTransportFactory::FakeXdsTransport::CreateStreamingCall(
259     const char* method,
260     std::unique_ptr<StreamingCall::EventHandler> event_handler) {
261   auto call = MakeOrphanable<FakeStreamingCall>(
262       RefAsSubclass<FakeXdsTransport>(), method, std::move(event_handler));
263   MutexLock lock(&mu_);
264   active_calls_[method] = call->Ref().TakeAsSubclass<FakeStreamingCall>();
265   cv_.Signal();
266   return call;
267 }
268 
269 //
270 // FakeXdsTransportFactory
271 //
272 
// Out-of-class definitions for the static constexpr method-name members of
// FakeXdsTransportFactory.  Before C++17, a static constexpr data member
// that is odr-used needs a namespace-scope definition like these.
constexpr char FakeXdsTransportFactory::kAdsMethod[];
constexpr char FakeXdsTransportFactory::kLrsMethod[];
275 
276 OrphanablePtr<XdsTransportFactory::XdsTransport>
Create(const XdsBootstrap::XdsServer & server,std::function<void (absl::Status)> on_connectivity_failure,absl::Status *)277 FakeXdsTransportFactory::Create(
278     const XdsBootstrap::XdsServer& server,
279     std::function<void(absl::Status)> on_connectivity_failure,
280     absl::Status* /*status*/) {
281   MutexLock lock(&mu_);
282   auto& entry = transport_map_[server.Key()];
283   GPR_ASSERT(entry == nullptr);
284   auto transport = MakeOrphanable<FakeXdsTransport>(
285       RefAsSubclass<FakeXdsTransportFactory>(), server,
286       std::move(on_connectivity_failure), auto_complete_messages_from_client_,
287       abort_on_undrained_messages_);
288   entry = transport->Ref().TakeAsSubclass<FakeXdsTransport>();
289   return transport;
290 }
291 
TriggerConnectionFailure(const XdsBootstrap::XdsServer & server,absl::Status status)292 void FakeXdsTransportFactory::TriggerConnectionFailure(
293     const XdsBootstrap::XdsServer& server, absl::Status status) {
294   auto transport = GetTransport(server);
295   if (transport == nullptr) return;
296   transport->TriggerConnectionFailure(std::move(status));
297 }
298 
SetAutoCompleteMessagesFromClient(bool value)299 void FakeXdsTransportFactory::SetAutoCompleteMessagesFromClient(bool value) {
300   MutexLock lock(&mu_);
301   auto_complete_messages_from_client_ = value;
302 }
303 
SetAbortOnUndrainedMessages(bool value)304 void FakeXdsTransportFactory::SetAbortOnUndrainedMessages(bool value) {
305   MutexLock lock(&mu_);
306   abort_on_undrained_messages_ = value;
307 }
308 
309 RefCountedPtr<FakeXdsTransportFactory::FakeStreamingCall>
WaitForStream(const XdsBootstrap::XdsServer & server,const char * method,absl::Duration timeout)310 FakeXdsTransportFactory::WaitForStream(const XdsBootstrap::XdsServer& server,
311                                        const char* method,
312                                        absl::Duration timeout) {
313   auto transport = GetTransport(server);
314   if (transport == nullptr) return nullptr;
315   return transport->WaitForStream(method, timeout);
316 }
317 
318 RefCountedPtr<FakeXdsTransportFactory::FakeXdsTransport>
GetTransport(const XdsBootstrap::XdsServer & server)319 FakeXdsTransportFactory::GetTransport(const XdsBootstrap::XdsServer& server) {
320   MutexLock lock(&mu_);
321   return transport_map_[server.Key()];
322 }
323 
324 }  // namespace grpc_core
325