1 // 2 // Copyright 2022 gRPC authors. 3 // 4 // Licensed under the Apache License, Version 2.0 (the "License"); 5 // you may not use this file except in compliance with the License. 6 // You may obtain a copy of the License at 7 // 8 // http://www.apache.org/licenses/LICENSE-2.0 9 // 10 // Unless required by applicable law or agreed to in writing, software 11 // distributed under the License is distributed on an "AS IS" BASIS, 12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 // See the License for the specific language governing permissions and 14 // limitations under the License. 15 // 16 17 #ifndef GRPC_TEST_CORE_XDS_XDS_TRANSPORT_FAKE_H 18 #define GRPC_TEST_CORE_XDS_XDS_TRANSPORT_FAKE_H 19 20 #include <stddef.h> 21 22 #include <deque> 23 #include <functional> 24 #include <map> 25 #include <memory> 26 #include <string> 27 #include <utility> 28 29 #include "absl/base/thread_annotations.h" 30 #include "absl/status/status.h" 31 #include "absl/strings/string_view.h" 32 #include "absl/time/time.h" 33 #include "absl/types/optional.h" 34 35 #include <grpc/support/port_platform.h> 36 37 #include "src/core/ext/xds/xds_bootstrap.h" 38 #include "src/core/ext/xds/xds_transport.h" 39 #include "src/core/lib/gprpp/orphanable.h" 40 #include "src/core/lib/gprpp/ref_counted.h" 41 #include "src/core/lib/gprpp/ref_counted_ptr.h" 42 #include "src/core/lib/gprpp/sync.h" 43 44 namespace grpc_core { 45 46 class FakeXdsTransportFactory : public XdsTransportFactory { 47 private: 48 class FakeXdsTransport; 49 50 public: 51 static constexpr char kAdsMethod[] = 52 "/envoy.service.discovery.v3.AggregatedDiscoveryService/" 53 "StreamAggregatedResources"; 54 static constexpr char kLrsMethod[] = 55 "/envoy.service.load_stats.v3.LoadReportingService/StreamLoadStats"; 56 57 class FakeStreamingCall : public XdsTransport::StreamingCall { 58 public: FakeStreamingCall(RefCountedPtr<FakeXdsTransport> transport,const char * method,std::unique_ptr<StreamingCall::EventHandler> event_handler)59 FakeStreamingCall( 60 RefCountedPtr<FakeXdsTransport> transport, const char* method, 61 std::unique_ptr<StreamingCall::EventHandler> event_handler) 62 : transport_(std::move(transport)), 63 method_(method), 64 event_handler_(MakeRefCounted<RefCountedEventHandler>( 65 std::move(event_handler))) {} 66 67 ~FakeStreamingCall() override; 68 69 void Orphan() override; 70 71 void StartRecvMessage() override; 72 73 using StreamingCall::Ref; // Make it public. 74 75 bool HaveMessageFromClient(); 76 absl::optional<std::string> WaitForMessageFromClient( 77 absl::Duration timeout); 78 79 // If FakeXdsTransportFactory::SetAutoCompleteMessagesFromClient() 80 // was called to set the value to false before the creation of the 81 // transport that underlies this stream, then this must be called 82 // to invoke EventHandler::OnRequestSent() for every message read 83 // via WaitForMessageFromClient(). 84 void CompleteSendMessageFromClient(bool ok = true); 85 86 void SendMessageToClient(absl::string_view payload); 87 void MaybeSendStatusToClient(absl::Status status); 88 89 bool Orphaned(); 90 WaitForReadsStarted(size_t expected,absl::Duration timeout)91 bool WaitForReadsStarted(size_t expected, absl::Duration timeout) { 92 MutexLock lock(&mu_); 93 const absl::Time deadline = absl::Now() + timeout; 94 do { 95 if (reads_started_ == expected) { 96 return true; 97 } 98 } while (!cv_reads_started_.WaitWithDeadline(&mu_, deadline)); 99 return false; 100 } 101 102 private: 103 class RefCountedEventHandler : public RefCounted<RefCountedEventHandler> { 104 public: RefCountedEventHandler(std::unique_ptr<StreamingCall::EventHandler> event_handler)105 explicit RefCountedEventHandler( 106 std::unique_ptr<StreamingCall::EventHandler> event_handler) 107 : event_handler_(std::move(event_handler)) {} 108 OnRequestSent(bool ok)109 void OnRequestSent(bool ok) { event_handler_->OnRequestSent(ok); } OnRecvMessage(absl::string_view payload)110 void OnRecvMessage(absl::string_view payload) { 111 event_handler_->OnRecvMessage(payload); 112 } OnStatusReceived(absl::Status status)113 void OnStatusReceived(absl::Status status) { 114 event_handler_->OnStatusReceived(std::move(status)); 115 } 116 117 private: 118 std::unique_ptr<StreamingCall::EventHandler> event_handler_; 119 }; 120 121 void SendMessage(std::string payload) override; 122 123 void CompleteSendMessageFromClientLocked(bool ok) 124 ABSL_EXCLUSIVE_LOCKS_REQUIRED(&mu_); 125 void MaybeDeliverMessageToClient(); 126 127 RefCountedPtr<FakeXdsTransport> transport_; 128 const char* method_; 129 130 Mutex mu_; 131 CondVar cv_reads_started_; 132 CondVar cv_client_msg_; 133 RefCountedPtr<RefCountedEventHandler> event_handler_ ABSL_GUARDED_BY(&mu_); 134 std::deque<std::string> from_client_messages_ ABSL_GUARDED_BY(&mu_); 135 bool status_sent_ ABSL_GUARDED_BY(&mu_) = false; 136 bool orphaned_ ABSL_GUARDED_BY(&mu_) = false; 137 size_t reads_started_ ABSL_GUARDED_BY(&mu_) = 0; 138 size_t num_pending_reads_ ABSL_GUARDED_BY(&mu_) = 0; 139 std::deque<std::string> to_client_messages_ ABSL_GUARDED_BY(&mu_); 140 }; 141 FakeXdsTransportFactory(std::function<void ()> too_many_pending_reads_callback)142 explicit FakeXdsTransportFactory( 143 std::function<void()> too_many_pending_reads_callback) 144 : too_many_pending_reads_callback_( 145 std::move(too_many_pending_reads_callback)) {} 146 147 using XdsTransportFactory::Ref; // Make it public. 148 149 void TriggerConnectionFailure(const XdsBootstrap::XdsServer& server, 150 absl::Status status); 151 152 // By default, FakeStreamingCall will automatically invoke 153 // EventHandler::OnRequestSent() upon reading a request from the client. 154 // If this is set to false, that behavior will be inhibited, and 155 // EventHandler::OnRequestSent() will not be called until the test 156 // explicitly calls FakeStreamingCall::CompleteSendMessageFromClient(). 157 // 158 // This value affects all transports created after this call is 159 // complete. Any transport that already exists prior to this call 160 // will not be affected. 161 void SetAutoCompleteMessagesFromClient(bool value); 162 163 // By default, FakeStreamingCall will automatically crash on 164 // destruction if there are messages from the client that have not 165 // been drained from the queue. If this is set to false, that 166 // behavior will be inhibited. 167 // 168 // This value affects all transports created after this call is 169 // complete. Any transport that already exists prior to this call 170 // will not be affected. 171 void SetAbortOnUndrainedMessages(bool value); 172 173 RefCountedPtr<FakeStreamingCall> WaitForStream( 174 const XdsBootstrap::XdsServer& server, const char* method, 175 absl::Duration timeout); 176 Orphan()177 void Orphan() override { Unref(); } 178 179 private: 180 class FakeXdsTransport : public XdsTransport { 181 public: FakeXdsTransport(RefCountedPtr<FakeXdsTransportFactory> factory,const XdsBootstrap::XdsServer & server,std::function<void (absl::Status)> on_connectivity_failure,bool auto_complete_messages_from_client,bool abort_on_undrained_messages)182 FakeXdsTransport(RefCountedPtr<FakeXdsTransportFactory> factory, 183 const XdsBootstrap::XdsServer& server, 184 std::function<void(absl::Status)> on_connectivity_failure, 185 bool auto_complete_messages_from_client, 186 bool abort_on_undrained_messages) 187 : factory_(std::move(factory)), 188 server_(server), 189 auto_complete_messages_from_client_( 190 auto_complete_messages_from_client), 191 abort_on_undrained_messages_(abort_on_undrained_messages), 192 on_connectivity_failure_( 193 MakeRefCounted<RefCountedOnConnectivityFailure>( 194 std::move(on_connectivity_failure))) {} 195 196 void Orphan() override; 197 auto_complete_messages_from_client()198 bool auto_complete_messages_from_client() const { 199 return auto_complete_messages_from_client_; 200 } 201 abort_on_undrained_messages()202 bool abort_on_undrained_messages() const { 203 return abort_on_undrained_messages_; 204 } 205 206 using XdsTransport::Ref; // Make it public. 207 208 void TriggerConnectionFailure(absl::Status status); 209 210 RefCountedPtr<FakeStreamingCall> WaitForStream(const char* method, 211 absl::Duration timeout); 212 213 void RemoveStream(const char* method, FakeStreamingCall* call); 214 factory()215 FakeXdsTransportFactory* factory() const { return factory_.get(); } 216 server()217 const XdsBootstrap::XdsServer* server() const { return &server_; } 218 219 private: 220 class RefCountedOnConnectivityFailure 221 : public RefCounted<RefCountedOnConnectivityFailure> { 222 public: RefCountedOnConnectivityFailure(std::function<void (absl::Status)> on_connectivity_failure)223 explicit RefCountedOnConnectivityFailure( 224 std::function<void(absl::Status)> on_connectivity_failure) 225 : on_connectivity_failure_(std::move(on_connectivity_failure)) {} 226 Run(absl::Status status)227 void Run(absl::Status status) { 228 on_connectivity_failure_(std::move(status)); 229 } 230 231 private: 232 std::function<void(absl::Status)> on_connectivity_failure_; 233 }; 234 235 OrphanablePtr<StreamingCall> CreateStreamingCall( 236 const char* method, 237 std::unique_ptr<StreamingCall::EventHandler> event_handler) override; 238 ResetBackoff()239 void ResetBackoff() override {} 240 241 RefCountedPtr<FakeXdsTransportFactory> factory_; 242 const XdsBootstrap::XdsServer& server_; 243 const bool auto_complete_messages_from_client_; 244 const bool abort_on_undrained_messages_; 245 246 Mutex mu_; 247 CondVar cv_; 248 RefCountedPtr<RefCountedOnConnectivityFailure> on_connectivity_failure_ 249 ABSL_GUARDED_BY(&mu_); 250 std::map<std::string /*method*/, RefCountedPtr<FakeStreamingCall>> 251 active_calls_ ABSL_GUARDED_BY(&mu_); 252 }; 253 254 OrphanablePtr<XdsTransport> Create( 255 const XdsBootstrap::XdsServer& server, 256 std::function<void(absl::Status)> on_connectivity_failure, 257 absl::Status* status) override; 258 259 RefCountedPtr<FakeXdsTransport> GetTransport( 260 const XdsBootstrap::XdsServer& server); 261 262 Mutex mu_; 263 std::map<std::string /*XdsServer key*/, RefCountedPtr<FakeXdsTransport>> 264 transport_map_ ABSL_GUARDED_BY(&mu_); 265 bool auto_complete_messages_from_client_ ABSL_GUARDED_BY(&mu_) = true; 266 bool abort_on_undrained_messages_ ABSL_GUARDED_BY(&mu_) = true; 267 std::function<void()> too_many_pending_reads_callback_; 268 }; 269 270 } // namespace grpc_core 271 272 #endif // GRPC_TEST_CORE_XDS_XDS_TRANSPORT_FAKE_H 273