xref: /aosp_15_r20/external/grpc-grpc/test/core/xds/xds_transport_fake.h (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1 //
2 // Copyright 2022 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //     http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16 
17 #ifndef GRPC_TEST_CORE_XDS_XDS_TRANSPORT_FAKE_H
18 #define GRPC_TEST_CORE_XDS_XDS_TRANSPORT_FAKE_H
19 
20 #include <stddef.h>
21 
22 #include <deque>
23 #include <functional>
24 #include <map>
25 #include <memory>
26 #include <string>
27 #include <utility>
28 
29 #include "absl/base/thread_annotations.h"
30 #include "absl/status/status.h"
31 #include "absl/strings/string_view.h"
32 #include "absl/time/time.h"
33 #include "absl/types/optional.h"
34 
35 #include <grpc/support/port_platform.h>
36 
37 #include "src/core/ext/xds/xds_bootstrap.h"
38 #include "src/core/ext/xds/xds_transport.h"
39 #include "src/core/lib/gprpp/orphanable.h"
40 #include "src/core/lib/gprpp/ref_counted.h"
41 #include "src/core/lib/gprpp/ref_counted_ptr.h"
42 #include "src/core/lib/gprpp/sync.h"
43 
44 namespace grpc_core {
45 
46 class FakeXdsTransportFactory : public XdsTransportFactory {
47  private:
48   class FakeXdsTransport;
49 
50  public:
51   static constexpr char kAdsMethod[] =
52       "/envoy.service.discovery.v3.AggregatedDiscoveryService/"
53       "StreamAggregatedResources";
54   static constexpr char kLrsMethod[] =
55       "/envoy.service.load_stats.v3.LoadReportingService/StreamLoadStats";
56 
57   class FakeStreamingCall : public XdsTransport::StreamingCall {
58    public:
FakeStreamingCall(RefCountedPtr<FakeXdsTransport> transport,const char * method,std::unique_ptr<StreamingCall::EventHandler> event_handler)59     FakeStreamingCall(
60         RefCountedPtr<FakeXdsTransport> transport, const char* method,
61         std::unique_ptr<StreamingCall::EventHandler> event_handler)
62         : transport_(std::move(transport)),
63           method_(method),
64           event_handler_(MakeRefCounted<RefCountedEventHandler>(
65               std::move(event_handler))) {}
66 
67     ~FakeStreamingCall() override;
68 
69     void Orphan() override;
70 
71     void StartRecvMessage() override;
72 
73     using StreamingCall::Ref;  // Make it public.
74 
75     bool HaveMessageFromClient();
76     absl::optional<std::string> WaitForMessageFromClient(
77         absl::Duration timeout);
78 
79     // If FakeXdsTransportFactory::SetAutoCompleteMessagesFromClient()
80     // was called to set the value to false before the creation of the
81     // transport that underlies this stream, then this must be called
82     // to invoke EventHandler::OnRequestSent() for every message read
83     // via WaitForMessageFromClient().
84     void CompleteSendMessageFromClient(bool ok = true);
85 
86     void SendMessageToClient(absl::string_view payload);
87     void MaybeSendStatusToClient(absl::Status status);
88 
89     bool Orphaned();
90 
WaitForReadsStarted(size_t expected,absl::Duration timeout)91     bool WaitForReadsStarted(size_t expected, absl::Duration timeout) {
92       MutexLock lock(&mu_);
93       const absl::Time deadline = absl::Now() + timeout;
94       do {
95         if (reads_started_ == expected) {
96           return true;
97         }
98       } while (!cv_reads_started_.WaitWithDeadline(&mu_, deadline));
99       return false;
100     }
101 
102    private:
103     class RefCountedEventHandler : public RefCounted<RefCountedEventHandler> {
104      public:
RefCountedEventHandler(std::unique_ptr<StreamingCall::EventHandler> event_handler)105       explicit RefCountedEventHandler(
106           std::unique_ptr<StreamingCall::EventHandler> event_handler)
107           : event_handler_(std::move(event_handler)) {}
108 
OnRequestSent(bool ok)109       void OnRequestSent(bool ok) { event_handler_->OnRequestSent(ok); }
OnRecvMessage(absl::string_view payload)110       void OnRecvMessage(absl::string_view payload) {
111         event_handler_->OnRecvMessage(payload);
112       }
OnStatusReceived(absl::Status status)113       void OnStatusReceived(absl::Status status) {
114         event_handler_->OnStatusReceived(std::move(status));
115       }
116 
117      private:
118       std::unique_ptr<StreamingCall::EventHandler> event_handler_;
119     };
120 
121     void SendMessage(std::string payload) override;
122 
123     void CompleteSendMessageFromClientLocked(bool ok)
124         ABSL_EXCLUSIVE_LOCKS_REQUIRED(&mu_);
125     void MaybeDeliverMessageToClient();
126 
127     RefCountedPtr<FakeXdsTransport> transport_;
128     const char* method_;
129 
130     Mutex mu_;
131     CondVar cv_reads_started_;
132     CondVar cv_client_msg_;
133     RefCountedPtr<RefCountedEventHandler> event_handler_ ABSL_GUARDED_BY(&mu_);
134     std::deque<std::string> from_client_messages_ ABSL_GUARDED_BY(&mu_);
135     bool status_sent_ ABSL_GUARDED_BY(&mu_) = false;
136     bool orphaned_ ABSL_GUARDED_BY(&mu_) = false;
137     size_t reads_started_ ABSL_GUARDED_BY(&mu_) = 0;
138     size_t num_pending_reads_ ABSL_GUARDED_BY(&mu_) = 0;
139     std::deque<std::string> to_client_messages_ ABSL_GUARDED_BY(&mu_);
140   };
141 
FakeXdsTransportFactory(std::function<void ()> too_many_pending_reads_callback)142   explicit FakeXdsTransportFactory(
143       std::function<void()> too_many_pending_reads_callback)
144       : too_many_pending_reads_callback_(
145             std::move(too_many_pending_reads_callback)) {}
146 
147   using XdsTransportFactory::Ref;  // Make it public.
148 
149   void TriggerConnectionFailure(const XdsBootstrap::XdsServer& server,
150                                 absl::Status status);
151 
152   // By default, FakeStreamingCall will automatically invoke
153   // EventHandler::OnRequestSent() upon reading a request from the client.
154   // If this is set to false, that behavior will be inhibited, and
155   // EventHandler::OnRequestSent() will not be called until the test
156   // explicitly calls FakeStreamingCall::CompleteSendMessageFromClient().
157   //
158   // This value affects all transports created after this call is
159   // complete.  Any transport that already exists prior to this call
160   // will not be affected.
161   void SetAutoCompleteMessagesFromClient(bool value);
162 
163   // By default, FakeStreamingCall will automatically crash on
164   // destruction if there are messages from the client that have not
165   // been drained from the queue.  If this is set to false, that
166   // behavior will be inhibited.
167   //
168   // This value affects all transports created after this call is
169   // complete.  Any transport that already exists prior to this call
170   // will not be affected.
171   void SetAbortOnUndrainedMessages(bool value);
172 
173   RefCountedPtr<FakeStreamingCall> WaitForStream(
174       const XdsBootstrap::XdsServer& server, const char* method,
175       absl::Duration timeout);
176 
Orphan()177   void Orphan() override { Unref(); }
178 
179  private:
180   class FakeXdsTransport : public XdsTransport {
181    public:
FakeXdsTransport(RefCountedPtr<FakeXdsTransportFactory> factory,const XdsBootstrap::XdsServer & server,std::function<void (absl::Status)> on_connectivity_failure,bool auto_complete_messages_from_client,bool abort_on_undrained_messages)182     FakeXdsTransport(RefCountedPtr<FakeXdsTransportFactory> factory,
183                      const XdsBootstrap::XdsServer& server,
184                      std::function<void(absl::Status)> on_connectivity_failure,
185                      bool auto_complete_messages_from_client,
186                      bool abort_on_undrained_messages)
187         : factory_(std::move(factory)),
188           server_(server),
189           auto_complete_messages_from_client_(
190               auto_complete_messages_from_client),
191           abort_on_undrained_messages_(abort_on_undrained_messages),
192           on_connectivity_failure_(
193               MakeRefCounted<RefCountedOnConnectivityFailure>(
194                   std::move(on_connectivity_failure))) {}
195 
196     void Orphan() override;
197 
auto_complete_messages_from_client()198     bool auto_complete_messages_from_client() const {
199       return auto_complete_messages_from_client_;
200     }
201 
abort_on_undrained_messages()202     bool abort_on_undrained_messages() const {
203       return abort_on_undrained_messages_;
204     }
205 
206     using XdsTransport::Ref;  // Make it public.
207 
208     void TriggerConnectionFailure(absl::Status status);
209 
210     RefCountedPtr<FakeStreamingCall> WaitForStream(const char* method,
211                                                    absl::Duration timeout);
212 
213     void RemoveStream(const char* method, FakeStreamingCall* call);
214 
factory()215     FakeXdsTransportFactory* factory() const { return factory_.get(); }
216 
server()217     const XdsBootstrap::XdsServer* server() const { return &server_; }
218 
219    private:
220     class RefCountedOnConnectivityFailure
221         : public RefCounted<RefCountedOnConnectivityFailure> {
222      public:
RefCountedOnConnectivityFailure(std::function<void (absl::Status)> on_connectivity_failure)223       explicit RefCountedOnConnectivityFailure(
224           std::function<void(absl::Status)> on_connectivity_failure)
225           : on_connectivity_failure_(std::move(on_connectivity_failure)) {}
226 
Run(absl::Status status)227       void Run(absl::Status status) {
228         on_connectivity_failure_(std::move(status));
229       }
230 
231      private:
232       std::function<void(absl::Status)> on_connectivity_failure_;
233     };
234 
235     OrphanablePtr<StreamingCall> CreateStreamingCall(
236         const char* method,
237         std::unique_ptr<StreamingCall::EventHandler> event_handler) override;
238 
ResetBackoff()239     void ResetBackoff() override {}
240 
241     RefCountedPtr<FakeXdsTransportFactory> factory_;
242     const XdsBootstrap::XdsServer& server_;
243     const bool auto_complete_messages_from_client_;
244     const bool abort_on_undrained_messages_;
245 
246     Mutex mu_;
247     CondVar cv_;
248     RefCountedPtr<RefCountedOnConnectivityFailure> on_connectivity_failure_
249         ABSL_GUARDED_BY(&mu_);
250     std::map<std::string /*method*/, RefCountedPtr<FakeStreamingCall>>
251         active_calls_ ABSL_GUARDED_BY(&mu_);
252   };
253 
254   OrphanablePtr<XdsTransport> Create(
255       const XdsBootstrap::XdsServer& server,
256       std::function<void(absl::Status)> on_connectivity_failure,
257       absl::Status* status) override;
258 
259   RefCountedPtr<FakeXdsTransport> GetTransport(
260       const XdsBootstrap::XdsServer& server);
261 
262   Mutex mu_;
263   std::map<std::string /*XdsServer key*/, RefCountedPtr<FakeXdsTransport>>
264       transport_map_ ABSL_GUARDED_BY(&mu_);
265   bool auto_complete_messages_from_client_ ABSL_GUARDED_BY(&mu_) = true;
266   bool abort_on_undrained_messages_ ABSL_GUARDED_BY(&mu_) = true;
267   std::function<void()> too_many_pending_reads_callback_;
268 };
269 
270 }  // namespace grpc_core
271 
272 #endif  // GRPC_TEST_CORE_XDS_XDS_TRANSPORT_FAKE_H
273