1 //
2 // Copyright 2022 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16
17 #include "test/core/xds/xds_transport_fake.h"
18
19 #include <functional>
20 #include <memory>
21 #include <string>
22 #include <thread>
23 #include <type_traits>
24 #include <utility>
25
26 #include <grpc/event_engine/event_engine.h>
27 #include <grpc/support/log.h>
28 #include <grpc/support/port_platform.h>
29
30 #include "src/core/ext/xds/xds_bootstrap.h"
31 #include "src/core/lib/event_engine/default_event_engine.h"
32 #include "src/core/lib/gprpp/orphanable.h"
33 #include "src/core/lib/gprpp/ref_counted_ptr.h"
34 #include "src/core/lib/iomgr/exec_ctx.h"
35 #include "test/core/util/test_config.h"
36
37 using grpc_event_engine::experimental::GetDefaultEventEngine;
38
39 namespace grpc_core {
40
41 //
42 // FakeXdsTransportFactory::FakeStreamingCall
43 //
44
~FakeStreamingCall()45 FakeXdsTransportFactory::FakeStreamingCall::~FakeStreamingCall() {
46 // Tests should not fail to read any messages from the client.
47 {
48 MutexLock lock(&mu_);
49 if (transport_->abort_on_undrained_messages()) {
50 for (const auto& message : from_client_messages_) {
51 gpr_log(GPR_ERROR, "[%s] %p From client message left in queue: %s",
52 transport_->server()->server_uri().c_str(), this,
53 message.c_str());
54 }
55 GPR_ASSERT(from_client_messages_.empty());
56 }
57 }
58 // Can't call event_handler_->OnStatusReceived() or unref event_handler_
59 // synchronously, since those operations will trigger code in
60 // XdsClient that acquires its mutex, but it was already holding its
61 // mutex when it called us, so it would deadlock.
62 GetDefaultEventEngine()->Run([event_handler = std::move(event_handler_),
63 status_sent = status_sent_]() mutable {
64 ExecCtx exec_ctx;
65 if (!status_sent) event_handler->OnStatusReceived(absl::OkStatus());
66 event_handler.reset();
67 });
68 }
69
Orphan()70 void FakeXdsTransportFactory::FakeStreamingCall::Orphan() {
71 {
72 MutexLock lock(&mu_);
73 orphaned_ = true;
74 }
75 transport_->RemoveStream(method_, this);
76 Unref();
77 }
78
SendMessage(std::string payload)79 void FakeXdsTransportFactory::FakeStreamingCall::SendMessage(
80 std::string payload) {
81 MutexLock lock(&mu_);
82 GPR_ASSERT(!orphaned_);
83 from_client_messages_.push_back(std::move(payload));
84 cv_client_msg_.Signal();
85 if (transport_->auto_complete_messages_from_client()) {
86 CompleteSendMessageFromClientLocked(/*ok=*/true);
87 }
88 }
89
HaveMessageFromClient()90 bool FakeXdsTransportFactory::FakeStreamingCall::HaveMessageFromClient() {
91 MutexLock lock(&mu_);
92 return !from_client_messages_.empty();
93 }
94
95 absl::optional<std::string>
WaitForMessageFromClient(absl::Duration timeout)96 FakeXdsTransportFactory::FakeStreamingCall::WaitForMessageFromClient(
97 absl::Duration timeout) {
98 MutexLock lock(&mu_);
99 while (from_client_messages_.empty()) {
100 if (cv_client_msg_.WaitWithTimeout(&mu_,
101 timeout * grpc_test_slowdown_factor())) {
102 return absl::nullopt;
103 }
104 }
105 std::string payload = from_client_messages_.front();
106 from_client_messages_.pop_front();
107 return payload;
108 }
109
110 void FakeXdsTransportFactory::FakeStreamingCall::
CompleteSendMessageFromClientLocked(bool ok)111 CompleteSendMessageFromClientLocked(bool ok) {
112 // Can't call event_handler_->OnRequestSent() synchronously, since that
113 // operation will trigger code in XdsClient that acquires its mutex, but it
114 // was already holding its mutex when it called us, so it would deadlock.
115 GetDefaultEventEngine()->Run(
116 [event_handler = event_handler_->Ref(), ok]() mutable {
117 ExecCtx exec_ctx;
118 event_handler->OnRequestSent(ok);
119 event_handler.reset();
120 });
121 }
122
CompleteSendMessageFromClient(bool ok)123 void FakeXdsTransportFactory::FakeStreamingCall::CompleteSendMessageFromClient(
124 bool ok) {
125 GPR_ASSERT(!transport_->auto_complete_messages_from_client());
126 MutexLock lock(&mu_);
127 CompleteSendMessageFromClientLocked(ok);
128 }
129
StartRecvMessage()130 void FakeXdsTransportFactory::FakeStreamingCall::StartRecvMessage() {
131 MutexLock lock(&mu_);
132 if (num_pending_reads_ > 0) {
133 transport_->factory()->too_many_pending_reads_callback_();
134 }
135 ++reads_started_;
136 ++num_pending_reads_;
137 cv_reads_started_.SignalAll();
138 if (!to_client_messages_.empty()) {
139 // Dispatch pending message (if there's one) on a separate thread to avoid
140 // recursion
141 GetDefaultEventEngine()->Run([call = RefAsSubclass<FakeStreamingCall>()]() {
142 call->MaybeDeliverMessageToClient();
143 });
144 }
145 }
146
SendMessageToClient(absl::string_view payload)147 void FakeXdsTransportFactory::FakeStreamingCall::SendMessageToClient(
148 absl::string_view payload) {
149 {
150 MutexLock lock(&mu_);
151 to_client_messages_.emplace_back(payload);
152 }
153 MaybeDeliverMessageToClient();
154 }
155
MaybeDeliverMessageToClient()156 void FakeXdsTransportFactory::FakeStreamingCall::MaybeDeliverMessageToClient() {
157 RefCountedPtr<RefCountedEventHandler> event_handler;
158 std::string message;
159 // Loop terminates with a break inside
160 while (true) {
161 {
162 MutexLock lock(&mu_);
163 if (num_pending_reads_ == 0 || to_client_messages_.empty()) {
164 break;
165 }
166 --num_pending_reads_;
167 message = std::move(to_client_messages_.front());
168 to_client_messages_.pop_front();
169 event_handler = event_handler_;
170 }
171 ExecCtx exec_ctx;
172 event_handler->OnRecvMessage(message);
173 }
174 }
175
MaybeSendStatusToClient(absl::Status status)176 void FakeXdsTransportFactory::FakeStreamingCall::MaybeSendStatusToClient(
177 absl::Status status) {
178 ExecCtx exec_ctx;
179 RefCountedPtr<RefCountedEventHandler> event_handler;
180 {
181 MutexLock lock(&mu_);
182 if (status_sent_) return;
183 status_sent_ = true;
184 event_handler = event_handler_->Ref();
185 }
186 event_handler->OnStatusReceived(std::move(status));
187 }
188
Orphaned()189 bool FakeXdsTransportFactory::FakeStreamingCall::Orphaned() {
190 MutexLock lock(&mu_);
191 return orphaned_;
192 }
193
194 //
195 // FakeXdsTransportFactory::FakeXdsTransport
196 //
197
TriggerConnectionFailure(absl::Status status)198 void FakeXdsTransportFactory::FakeXdsTransport::TriggerConnectionFailure(
199 absl::Status status) {
200 RefCountedPtr<RefCountedOnConnectivityFailure> on_connectivity_failure;
201 {
202 MutexLock lock(&mu_);
203 on_connectivity_failure = on_connectivity_failure_;
204 }
205 ExecCtx exec_ctx;
206 if (on_connectivity_failure != nullptr) {
207 on_connectivity_failure->Run(std::move(status));
208 }
209 }
210
Orphan()211 void FakeXdsTransportFactory::FakeXdsTransport::Orphan() {
212 {
213 MutexLock lock(&factory_->mu_);
214 auto it = factory_->transport_map_.find(server_.Key());
215 if (it != factory_->transport_map_.end() && it->second == this) {
216 factory_->transport_map_.erase(it);
217 }
218 }
219 factory_.reset();
220 {
221 MutexLock lock(&mu_);
222 // Can't destroy on_connectivity_failure_ synchronously, since that
223 // operation will trigger code in XdsClient that acquires its mutex, but
224 // it was already holding its mutex when it called us, so it would deadlock.
225 GetDefaultEventEngine()->Run([on_connectivity_failure = std::move(
226 on_connectivity_failure_)]() mutable {
227 ExecCtx exec_ctx;
228 on_connectivity_failure.reset();
229 });
230 }
231 Unref();
232 }
233
234 RefCountedPtr<FakeXdsTransportFactory::FakeStreamingCall>
WaitForStream(const char * method,absl::Duration timeout)235 FakeXdsTransportFactory::FakeXdsTransport::WaitForStream(
236 const char* method, absl::Duration timeout) {
237 MutexLock lock(&mu_);
238 auto it = active_calls_.find(method);
239 while (it == active_calls_.end() || it->second == nullptr) {
240 if (cv_.WaitWithTimeout(&mu_, timeout * grpc_test_slowdown_factor())) {
241 return nullptr;
242 }
243 it = active_calls_.find(method);
244 }
245 return it->second;
246 }
247
RemoveStream(const char * method,FakeStreamingCall * call)248 void FakeXdsTransportFactory::FakeXdsTransport::RemoveStream(
249 const char* method, FakeStreamingCall* call) {
250 MutexLock lock(&mu_);
251 auto it = active_calls_.find(method);
252 if (it != active_calls_.end() && it->second.get() == call) {
253 active_calls_.erase(it);
254 }
255 }
256
257 OrphanablePtr<XdsTransportFactory::XdsTransport::StreamingCall>
CreateStreamingCall(const char * method,std::unique_ptr<StreamingCall::EventHandler> event_handler)258 FakeXdsTransportFactory::FakeXdsTransport::CreateStreamingCall(
259 const char* method,
260 std::unique_ptr<StreamingCall::EventHandler> event_handler) {
261 auto call = MakeOrphanable<FakeStreamingCall>(
262 RefAsSubclass<FakeXdsTransport>(), method, std::move(event_handler));
263 MutexLock lock(&mu_);
264 active_calls_[method] = call->Ref().TakeAsSubclass<FakeStreamingCall>();
265 cv_.Signal();
266 return call;
267 }
268
269 //
270 // FakeXdsTransportFactory
271 //
272
273 constexpr char FakeXdsTransportFactory::kAdsMethod[];
274 constexpr char FakeXdsTransportFactory::kLrsMethod[];
275
276 OrphanablePtr<XdsTransportFactory::XdsTransport>
Create(const XdsBootstrap::XdsServer & server,std::function<void (absl::Status)> on_connectivity_failure,absl::Status *)277 FakeXdsTransportFactory::Create(
278 const XdsBootstrap::XdsServer& server,
279 std::function<void(absl::Status)> on_connectivity_failure,
280 absl::Status* /*status*/) {
281 MutexLock lock(&mu_);
282 auto& entry = transport_map_[server.Key()];
283 GPR_ASSERT(entry == nullptr);
284 auto transport = MakeOrphanable<FakeXdsTransport>(
285 RefAsSubclass<FakeXdsTransportFactory>(), server,
286 std::move(on_connectivity_failure), auto_complete_messages_from_client_,
287 abort_on_undrained_messages_);
288 entry = transport->Ref().TakeAsSubclass<FakeXdsTransport>();
289 return transport;
290 }
291
TriggerConnectionFailure(const XdsBootstrap::XdsServer & server,absl::Status status)292 void FakeXdsTransportFactory::TriggerConnectionFailure(
293 const XdsBootstrap::XdsServer& server, absl::Status status) {
294 auto transport = GetTransport(server);
295 if (transport == nullptr) return;
296 transport->TriggerConnectionFailure(std::move(status));
297 }
298
SetAutoCompleteMessagesFromClient(bool value)299 void FakeXdsTransportFactory::SetAutoCompleteMessagesFromClient(bool value) {
300 MutexLock lock(&mu_);
301 auto_complete_messages_from_client_ = value;
302 }
303
SetAbortOnUndrainedMessages(bool value)304 void FakeXdsTransportFactory::SetAbortOnUndrainedMessages(bool value) {
305 MutexLock lock(&mu_);
306 abort_on_undrained_messages_ = value;
307 }
308
309 RefCountedPtr<FakeXdsTransportFactory::FakeStreamingCall>
WaitForStream(const XdsBootstrap::XdsServer & server,const char * method,absl::Duration timeout)310 FakeXdsTransportFactory::WaitForStream(const XdsBootstrap::XdsServer& server,
311 const char* method,
312 absl::Duration timeout) {
313 auto transport = GetTransport(server);
314 if (transport == nullptr) return nullptr;
315 return transport->WaitForStream(method, timeout);
316 }
317
318 RefCountedPtr<FakeXdsTransportFactory::FakeXdsTransport>
GetTransport(const XdsBootstrap::XdsServer & server)319 FakeXdsTransportFactory::GetTransport(const XdsBootstrap::XdsServer& server) {
320 MutexLock lock(&mu_);
321 return transport_map_[server.Key()];
322 }
323
324 } // namespace grpc_core
325