xref: /aosp_15_r20/external/grpc-grpc/test/cpp/end2end/xds/xds_server.h (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1 //
2 // Copyright 2017 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //     http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16 
17 #ifndef GRPC_TEST_CPP_END2END_XDS_XDS_SERVER_H
18 #define GRPC_TEST_CPP_END2END_XDS_XDS_SERVER_H
19 
20 #include <deque>
21 #include <set>
22 #include <string>
23 #include <thread>
24 #include <vector>
25 
26 #include "absl/types/optional.h"
27 
28 #include <grpc/support/log.h>
29 #include <grpcpp/support/status.h>
30 
31 #include "src/core/lib/address_utils/parse_address.h"
32 #include "src/core/lib/gprpp/crash.h"
33 #include "src/core/lib/gprpp/sync.h"
34 #include "src/proto/grpc/testing/xds/v3/ads.grpc.pb.h"
35 #include "src/proto/grpc/testing/xds/v3/cluster.grpc.pb.h"
36 #include "src/proto/grpc/testing/xds/v3/discovery.grpc.pb.h"
37 #include "src/proto/grpc/testing/xds/v3/discovery.pb.h"
38 #include "src/proto/grpc/testing/xds/v3/endpoint.grpc.pb.h"
39 #include "src/proto/grpc/testing/xds/v3/listener.grpc.pb.h"
40 #include "src/proto/grpc/testing/xds/v3/lrs.grpc.pb.h"
41 #include "src/proto/grpc/testing/xds/v3/route.grpc.pb.h"
42 #include "test/core/util/test_config.h"
43 #include "test/cpp/end2end/counted_service.h"
44 
45 namespace grpc {
46 namespace testing {
47 
// Type URLs naming the xDS resource types used by the services below.
// Listener resources (LDS).
constexpr char kLdsTypeUrl[] =
    "type.googleapis.com/envoy.config.listener.v3.Listener";
// RouteConfiguration resources (RDS).
constexpr char kRdsTypeUrl[] =
    "type.googleapis.com/envoy.config.route.v3.RouteConfiguration";
// Cluster resources (CDS).
constexpr char kCdsTypeUrl[] =
    "type.googleapis.com/envoy.config.cluster.v3.Cluster";
// ClusterLoadAssignment resources (EDS).
constexpr char kEdsTypeUrl[] =
    "type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment";
56 
57 // An ADS service implementation.
58 class AdsServiceImpl
59     : public CountedService<
60           ::envoy::service::discovery::v3::AggregatedDiscoveryService::Service>,
61       public std::enable_shared_from_this<AdsServiceImpl> {
62  public:
63   using DiscoveryRequest = ::envoy::service::discovery::v3::DiscoveryRequest;
64   using DiscoveryResponse = ::envoy::service::discovery::v3::DiscoveryResponse;
65 
// The client's verdict (ACK or NACK) on a response for a given xDS
// resource type.
struct ResponseState {
  enum State {
    ACKED,   // ACK received.
    NACKED,  // NACK received; error_message will contain the error.
  };
  // Defaults to ACKED.
  State state = ACKED;
  // Error text reported by the client; left empty for an ACK.
  std::string error_message;
};
75 
76   explicit AdsServiceImpl(
77       std::function<void(const DiscoveryRequest& request)> check_first_request =
78           nullptr,
79       std::function<void(absl::StatusCode)> check_nack_status_code = nullptr,
80       absl::string_view debug_label = "")
check_first_request_(std::move (check_first_request))81       : check_first_request_(std::move(check_first_request)),
82         check_nack_status_code_(std::move(check_nack_status_code)),
83         debug_label_(absl::StrFormat(
84             "%p%s%s", this, debug_label.empty() ? "" : ":", debug_label)) {}
85 
set_wrap_resources(bool wrap_resources)86   void set_wrap_resources(bool wrap_resources) {
87     grpc_core::MutexLock lock(&ads_mu_);
88     wrap_resources_ = wrap_resources;
89   }
90 
91   // Sets a resource to a particular value, overwriting any previous value.
92   void SetResource(google::protobuf::Any resource, const std::string& type_url,
93                    const std::string& name);
94 
95   // Removes a resource from the server's state.
96   void UnsetResource(const std::string& type_url, const std::string& name);
97 
SetLdsResource(const::envoy::config::listener::v3::Listener & listener)98   void SetLdsResource(const ::envoy::config::listener::v3::Listener& listener) {
99     google::protobuf::Any resource;
100     resource.PackFrom(listener);
101     SetResource(std::move(resource), kLdsTypeUrl, listener.name());
102   }
103 
SetRdsResource(const::envoy::config::route::v3::RouteConfiguration & route)104   void SetRdsResource(
105       const ::envoy::config::route::v3::RouteConfiguration& route) {
106     google::protobuf::Any resource;
107     resource.PackFrom(route);
108     SetResource(std::move(resource), kRdsTypeUrl, route.name());
109   }
110 
SetCdsResource(const::envoy::config::cluster::v3::Cluster & cluster)111   void SetCdsResource(const ::envoy::config::cluster::v3::Cluster& cluster) {
112     google::protobuf::Any resource;
113     resource.PackFrom(cluster);
114     SetResource(std::move(resource), kCdsTypeUrl, cluster.name());
115   }
116 
SetEdsResource(const::envoy::config::endpoint::v3::ClusterLoadAssignment & assignment)117   void SetEdsResource(
118       const ::envoy::config::endpoint::v3::ClusterLoadAssignment& assignment) {
119     google::protobuf::Any resource;
120     resource.PackFrom(assignment);
121     SetResource(std::move(resource), kEdsTypeUrl, assignment.cluster_name());
122   }
123 
124   // Tells the server to ignore requests from the client for a given
125   // resource type.
IgnoreResourceType(const std::string & type_url)126   void IgnoreResourceType(const std::string& type_url) {
127     grpc_core::MutexLock lock(&ads_mu_);
128     resource_types_to_ignore_.emplace(type_url);
129   }
130 
131   // Sets a callback to be invoked on request messages with respoonse_nonce
132   // set.  The callback is passed the resource type and version.
SetCheckVersionCallback(std::function<void (absl::string_view,int)> check_version_callack)133   void SetCheckVersionCallback(
134       std::function<void(absl::string_view, int)> check_version_callack) {
135     grpc_core::MutexLock lock(&ads_mu_);
136     check_version_callack_ = std::move(check_version_callack);
137   }
138 
139   // Get the list of response state for each resource type.
140   // TODO(roth): Consider adding an absl::Notification-based mechanism
141   // here to avoid the need for tests to poll the response state.
GetResponseState(const std::string & type_url)142   absl::optional<ResponseState> GetResponseState(const std::string& type_url) {
143     grpc_core::MutexLock lock(&ads_mu_);
144     if (resource_type_response_state_[type_url].empty()) {
145       return absl::nullopt;
146     }
147     auto response = resource_type_response_state_[type_url].front();
148     resource_type_response_state_[type_url].pop_front();
149     return response;
150   }
// Per-type shorthands for GetResponseState(); each call consumes one queued
// entry for its resource type.
// Listener (LDS) responses.
absl::optional<ResponseState> lds_response_state() {
  return GetResponseState(kLdsTypeUrl);
}
// RouteConfiguration (RDS) responses.
absl::optional<ResponseState> rds_response_state() {
  return GetResponseState(kRdsTypeUrl);
}
// Cluster (CDS) responses.
absl::optional<ResponseState> cds_response_state() {
  return GetResponseState(kCdsTypeUrl);
}
// ClusterLoadAssignment (EDS) responses.
absl::optional<ResponseState> eds_response_state() {
  return GetResponseState(kEdsTypeUrl);
}
163 
164   // Starts the service.
165   void Start();
166 
167   // Shuts down the service.
168   void Shutdown();
169 
170   // Returns the peer names of clients currently connected to the service.
clients()171   std::set<std::string> clients() {
172     grpc_core::MutexLock lock(&clients_mu_);
173     return clients_;
174   }
175 
ForceADSFailure(Status status)176   void ForceADSFailure(Status status) {
177     grpc_core::MutexLock lock(&ads_mu_);
178     forced_ads_failure_ = std::move(status);
179   }
180 
ClearADSFailure()181   void ClearADSFailure() {
182     grpc_core::MutexLock lock(&ads_mu_);
183     forced_ads_failure_ = absl::nullopt;
184   }
185 
186  private:
// A queue of resource type/name pairs that have changed since the client
// subscribed to them.
using UpdateQueue = std::deque<
    std::pair<std::string /* type url */, std::string /* resource name */>>;

// A struct representing a client's subscription to a particular resource.
struct SubscriptionState {
  // The queue upon which to place updates when the resource is updated.
  UpdateQueue* update_queue;
};

// Maps representing a client's subscriptions to all resources: per-name
// subscription state, grouped by resource type.
using SubscriptionNameMap =
    std::map<std::string /* resource_name */, SubscriptionState>;
using SubscriptionMap =
    std::map<std::string /* type_url */, SubscriptionNameMap>;

// Sent state for a given resource type.
struct SentState {
  // Nonce of the most recent response built for this type; incremented for
  // every response.
  int nonce = 0;
  // Resource type version carried by the most recent response for this type.
  int resource_type_version = 0;
};

// A struct representing the current state for an individual resource.
struct ResourceState {
  // The resource itself, if present.
  absl::optional<google::protobuf::Any> resource;
  // The resource type version that this resource was last updated in.
  int resource_type_version = 0;
  // A list of subscriptions to this resource.
  std::set<SubscriptionState*> subscriptions;
};

// The current state for all individual resources of a given type.
using ResourceNameMap =
    std::map<std::string /* resource_name */, ResourceState>;

// The state shared by all resources of one type.
struct ResourceTypeState {
  // Version reported in responses for this type.
  // NOTE(review): presumably advanced whenever a resource of this type is
  // set or unset -- confirm against the out-of-line definitions.
  int resource_type_version = 0;
  ResourceNameMap resource_name_map;
};

// The current state for all resources, keyed by resource type.
using ResourceMap = std::map<std::string /* type_url */, ResourceTypeState>;

// The server-side handle for the bidirectional ADS stream.
using Stream = ServerReaderWriter<DiscoveryResponse, DiscoveryRequest>;
232 
StreamAggregatedResources(ServerContext * context,Stream * stream)233   Status StreamAggregatedResources(ServerContext* context,
234                                    Stream* stream) override {
235     gpr_log(GPR_INFO, "ADS[%s]: StreamAggregatedResources starts",
236             debug_label_.c_str());
237     {
238       grpc_core::MutexLock lock(&ads_mu_);
239       if (forced_ads_failure_.has_value()) {
240         gpr_log(GPR_INFO,
241                 "ADS[%s]: StreamAggregatedResources forcing early failure "
242                 "with status code: %d, message: %s",
243                 debug_label_.c_str(), forced_ads_failure_.value().error_code(),
244                 forced_ads_failure_.value().error_message().c_str());
245         return forced_ads_failure_.value();
246       }
247     }
248     AddClient(context->peer());
249     // Take a reference of the AdsServiceImpl object, which will go
250     // out of scope when this request handler returns.  This ensures
251     // that the parent won't be destroyed until this stream is complete.
252     std::shared_ptr<AdsServiceImpl> ads_service_impl = shared_from_this();
253     // Resources (type/name pairs) that have changed since the client
254     // subscribed to them.
255     UpdateQueue update_queue;
256     // Resources that the client will be subscribed to keyed by resource type
257     // url.
258     SubscriptionMap subscription_map;
259     // Sent state for each resource type.
260     std::map<std::string /*type_url*/, SentState> sent_state_map;
261     // Spawn a thread to read requests from the stream.
262     // Requests will be delivered to this thread in a queue.
263     std::deque<DiscoveryRequest> requests;
264     bool stream_closed = false;
265     std::thread reader(std::bind(&AdsServiceImpl::BlockingRead, this, stream,
266                                  &requests, &stream_closed));
267     // Main loop to process requests and updates.
268     while (true) {
269       // Boolean to keep track if the loop received any work to do: a
270       // request or an update; regardless whether a response was actually
271       // sent out.
272       bool did_work = false;
273       // Look for new requests and decide what to handle.
274       absl::optional<DiscoveryResponse> response;
275       {
276         grpc_core::MutexLock lock(&ads_mu_);
277         // If the stream has been closed or our parent is being shut
278         // down, stop immediately.
279         if (stream_closed || ads_done_) break;
280         // Otherwise, see if there's a request to read from the queue.
281         if (!requests.empty()) {
282           DiscoveryRequest request = std::move(requests.front());
283           requests.pop_front();
284           did_work = true;
285           gpr_log(GPR_INFO,
286                   "ADS[%s]: Received request for type %s with content %s",
287                   debug_label_.c_str(), request.type_url().c_str(),
288                   request.DebugString().c_str());
289           SentState& sent_state = sent_state_map[request.type_url()];
290           // Process request.
291           ProcessRequest(request, &update_queue, &subscription_map, &sent_state,
292                          &response);
293         }
294       }
295       if (response.has_value()) {
296         gpr_log(GPR_INFO, "ADS[%s]: Sending response: %s", debug_label_.c_str(),
297                 response->DebugString().c_str());
298         stream->Write(response.value());
299       }
300       response.reset();
301       // Look for updates and decide what to handle.
302       {
303         grpc_core::MutexLock lock(&ads_mu_);
304         if (!update_queue.empty()) {
305           const std::string resource_type =
306               std::move(update_queue.front().first);
307           const std::string resource_name =
308               std::move(update_queue.front().second);
309           update_queue.pop_front();
310           did_work = true;
311           SentState& sent_state = sent_state_map[resource_type];
312           ProcessUpdate(resource_type, resource_name, &subscription_map,
313                         &sent_state, &response);
314         }
315       }
316       if (response.has_value()) {
317         gpr_log(GPR_INFO, "ADS[%s]: Sending update response: %s",
318                 debug_label_.c_str(), response->DebugString().c_str());
319         stream->Write(response.value());
320       }
321       {
322         grpc_core::MutexLock lock(&ads_mu_);
323         if (ads_done_) {
324           break;
325         }
326       }
327       // If we didn't find anything to do, delay before the next loop
328       // iteration; otherwise, check whether we should exit and then
329       // immediately continue.
330       gpr_sleep_until(grpc_timeout_milliseconds_to_deadline(did_work ? 0 : 10));
331     }
332     // Done with main loop.  Clean up before returning.
333     // Join reader thread.
334     reader.join();
335     // Clean up any subscriptions that were still active when the call
336     // finished.
337     {
338       grpc_core::MutexLock lock(&ads_mu_);
339       for (auto& p : subscription_map) {
340         const std::string& type_url = p.first;
341         SubscriptionNameMap& subscription_name_map = p.second;
342         for (auto& q : subscription_name_map) {
343           const std::string& resource_name = q.first;
344           SubscriptionState& subscription_state = q.second;
345           ResourceNameMap& resource_name_map =
346               resource_map_[type_url].resource_name_map;
347           ResourceState& resource_state = resource_name_map[resource_name];
348           resource_state.subscriptions.erase(&subscription_state);
349         }
350       }
351     }
352     gpr_log(GPR_INFO, "ADS[%s]: StreamAggregatedResources done",
353             debug_label_.c_str());
354     RemoveClient(context->peer());
355     return Status::OK;
356   }
357 
358   // Processes a response read from the client.
359   // Populates response if needed.
ProcessRequest(const DiscoveryRequest & request,UpdateQueue * update_queue,SubscriptionMap * subscription_map,SentState * sent_state,absl::optional<DiscoveryResponse> * response)360   void ProcessRequest(const DiscoveryRequest& request,
361                       UpdateQueue* update_queue,
362                       SubscriptionMap* subscription_map, SentState* sent_state,
363                       absl::optional<DiscoveryResponse>* response)
364       ABSL_EXCLUSIVE_LOCKS_REQUIRED(ads_mu_) {
365     // Check the nonce sent by the client, if any.
366     // (This will be absent on the first request on a stream.)
367     if (request.response_nonce().empty()) {
368       int client_resource_type_version = 0;
369       if (!request.version_info().empty()) {
370         GPR_ASSERT(absl::SimpleAtoi(request.version_info(),
371                                     &client_resource_type_version));
372       }
373       if (check_version_callack_ != nullptr) {
374         check_version_callack_(request.type_url(),
375                                client_resource_type_version);
376       }
377     } else {
378       int client_nonce;
379       GPR_ASSERT(absl::SimpleAtoi(request.response_nonce(), &client_nonce));
380       // Check for ACK or NACK.
381       ResponseState response_state;
382       if (!request.has_error_detail()) {
383         response_state.state = ResponseState::ACKED;
384         gpr_log(GPR_INFO, "ADS[%s]: client ACKed resource_type=%s version=%s",
385                 debug_label_.c_str(), request.type_url().c_str(),
386                 request.version_info().c_str());
387       } else {
388         response_state.state = ResponseState::NACKED;
389         if (check_nack_status_code_ != nullptr) {
390           check_nack_status_code_(
391               static_cast<absl::StatusCode>(request.error_detail().code()));
392         }
393         response_state.error_message = request.error_detail().message();
394         gpr_log(GPR_INFO,
395                 "ADS[%s]: client NACKed resource_type=%s version=%s: %s",
396                 debug_label_.c_str(), request.type_url().c_str(),
397                 request.version_info().c_str(),
398                 response_state.error_message.c_str());
399       }
400       resource_type_response_state_[request.type_url()].emplace_back(
401           std::move(response_state));
402       // Ignore requests with stale nonces.
403       if (client_nonce < sent_state->nonce) return;
404     }
405     // Ignore resource types as requested by tests.
406     if (resource_types_to_ignore_.find(request.type_url()) !=
407         resource_types_to_ignore_.end()) {
408       return;
409     }
410     // Look at all the resource names in the request.
411     auto& subscription_name_map = (*subscription_map)[request.type_url()];
412     auto& resource_type_state = resource_map_[request.type_url()];
413     auto& resource_name_map = resource_type_state.resource_name_map;
414     std::set<std::string> resources_in_current_request;
415     std::set<std::string> resources_added_to_response;
416     for (const std::string& resource_name : request.resource_names()) {
417       resources_in_current_request.emplace(resource_name);
418       auto& subscription_state = subscription_name_map[resource_name];
419       auto& resource_state = resource_name_map[resource_name];
420       // Subscribe if needed.
421       // Send the resource in the response if either (a) this is
422       // a new subscription or (b) there is an updated version of
423       // this resource to send.
424       if (MaybeSubscribe(request.type_url(), resource_name, &subscription_state,
425                          &resource_state, update_queue) ||
426           ClientNeedsResourceUpdate(resource_type_state, resource_state,
427                                     sent_state->resource_type_version)) {
428         gpr_log(GPR_INFO, "ADS[%s]: Sending update for type=%s name=%s",
429                 debug_label_.c_str(), request.type_url().c_str(),
430                 resource_name.c_str());
431         resources_added_to_response.emplace(resource_name);
432         if (!response->has_value()) response->emplace();
433         if (resource_state.resource.has_value()) {
434           auto* resource = (*response)->add_resources();
435           resource->CopyFrom(resource_state.resource.value());
436           if (wrap_resources_) {
437             envoy::service::discovery::v3::Resource resource_wrapper;
438             *resource_wrapper.mutable_resource() = std::move(*resource);
439             resource->PackFrom(resource_wrapper);
440           }
441         }
442       } else {
443         gpr_log(GPR_INFO,
444                 "ADS[%s]: client does not need update for type=%s name=%s",
445                 debug_label_.c_str(), request.type_url().c_str(),
446                 resource_name.c_str());
447       }
448     }
449     // Process unsubscriptions for any resource no longer
450     // present in the request's resource list.
451     ProcessUnsubscriptions(request.type_url(), resources_in_current_request,
452                            &subscription_name_map, &resource_name_map);
453     // Construct response if needed.
454     if (!resources_added_to_response.empty()) {
455       CompleteBuildingDiscoveryResponse(
456           request.type_url(), resource_type_state.resource_type_version,
457           subscription_name_map, resources_added_to_response, sent_state,
458           &response->value());
459     }
460   }
461 
462   // Processes a resource update from the test.
463   // Populates response if needed.
ProcessUpdate(const std::string & resource_type,const std::string & resource_name,SubscriptionMap * subscription_map,SentState * sent_state,absl::optional<DiscoveryResponse> * response)464   void ProcessUpdate(const std::string& resource_type,
465                      const std::string& resource_name,
466                      SubscriptionMap* subscription_map, SentState* sent_state,
467                      absl::optional<DiscoveryResponse>* response)
468       ABSL_EXCLUSIVE_LOCKS_REQUIRED(ads_mu_) {
469     gpr_log(GPR_INFO, "ADS[%s]: Received update for type=%s name=%s",
470             debug_label_.c_str(), resource_type.c_str(), resource_name.c_str());
471     auto& subscription_name_map = (*subscription_map)[resource_type];
472     auto& resource_type_state = resource_map_[resource_type];
473     auto& resource_name_map = resource_type_state.resource_name_map;
474     auto it = subscription_name_map.find(resource_name);
475     if (it != subscription_name_map.end()) {
476       ResourceState& resource_state = resource_name_map[resource_name];
477       if (ClientNeedsResourceUpdate(resource_type_state, resource_state,
478                                     sent_state->resource_type_version)) {
479         gpr_log(GPR_INFO, "ADS[%s]: Sending update for type=%s name=%s",
480                 debug_label_.c_str(), resource_type.c_str(),
481                 resource_name.c_str());
482         response->emplace();
483         if (resource_state.resource.has_value()) {
484           auto* resource = (*response)->add_resources();
485           resource->CopyFrom(resource_state.resource.value());
486         }
487         CompleteBuildingDiscoveryResponse(
488             resource_type, resource_type_state.resource_type_version,
489             subscription_name_map, {resource_name}, sent_state,
490             &response->value());
491       }
492     }
493   }
494 
495   // Starting a thread to do blocking read on the stream until cancel.
BlockingRead(Stream * stream,std::deque<DiscoveryRequest> * requests,bool * stream_closed)496   void BlockingRead(Stream* stream, std::deque<DiscoveryRequest>* requests,
497                     bool* stream_closed) {
498     DiscoveryRequest request;
499     bool seen_first_request = false;
500     while (stream->Read(&request)) {
501       if (!seen_first_request) {
502         if (check_first_request_ != nullptr) {
503           check_first_request_(request);
504         }
505         seen_first_request = true;
506       }
507       {
508         grpc_core::MutexLock lock(&ads_mu_);
509         requests->emplace_back(std::move(request));
510       }
511     }
512     gpr_log(GPR_INFO, "ADS[%s]: Null read, stream closed",
513             debug_label_.c_str());
514     grpc_core::MutexLock lock(&ads_mu_);
515     *stream_closed = true;
516   }
517 
518   // Completing the building a DiscoveryResponse by adding common information
519   // for all resources and by adding all subscribed resources for LDS and CDS.
CompleteBuildingDiscoveryResponse(const std::string & resource_type,const int version,const SubscriptionNameMap & subscription_name_map,const std::set<std::string> & resources_added_to_response,SentState * sent_state,DiscoveryResponse * response)520   void CompleteBuildingDiscoveryResponse(
521       const std::string& resource_type, const int version,
522       const SubscriptionNameMap& subscription_name_map,
523       const std::set<std::string>& resources_added_to_response,
524       SentState* sent_state, DiscoveryResponse* response)
525       ABSL_EXCLUSIVE_LOCKS_REQUIRED(ads_mu_) {
526     response->set_type_url(resource_type);
527     response->set_version_info(std::to_string(version));
528     response->set_nonce(std::to_string(++sent_state->nonce));
529     if (resource_type == kLdsTypeUrl || resource_type == kCdsTypeUrl) {
530       // For LDS and CDS we must send back all subscribed resources
531       // (even the unchanged ones)
532       for (const auto& p : subscription_name_map) {
533         const std::string& resource_name = p.first;
534         if (resources_added_to_response.find(resource_name) ==
535             resources_added_to_response.end()) {
536           ResourceNameMap& resource_name_map =
537               resource_map_[resource_type].resource_name_map;
538           const ResourceState& resource_state =
539               resource_name_map[resource_name];
540           if (resource_state.resource.has_value()) {
541             auto* resource = response->add_resources();
542             resource->CopyFrom(resource_state.resource.value());
543           }
544         }
545       }
546     }
547     sent_state->resource_type_version = version;
548   }
549 
550   // Checks whether the client needs to receive a newer version of
551   // the resource.
552   static bool ClientNeedsResourceUpdate(
553       const ResourceTypeState& resource_type_state,
554       const ResourceState& resource_state, int client_resource_type_version);
555 
556   // Subscribes to a resource if not already subscribed:
557   // 1. Sets the update_queue field in subscription_state.
558   // 2. Adds subscription_state to resource_state->subscriptions.
559   bool MaybeSubscribe(const std::string& resource_type,
560                       const std::string& resource_name,
561                       SubscriptionState* subscription_state,
562                       ResourceState* resource_state, UpdateQueue* update_queue);
563 
564   // Removes subscriptions for resources no longer present in the
565   // current request.
566   void ProcessUnsubscriptions(
567       const std::string& resource_type,
568       const std::set<std::string>& resources_in_current_request,
569       SubscriptionNameMap* subscription_name_map,
570       ResourceNameMap* resource_name_map);
571 
AddClient(const std::string & client)572   void AddClient(const std::string& client) {
573     grpc_core::MutexLock lock(&clients_mu_);
574     clients_.insert(client);
575   }
576 
RemoveClient(const std::string & client)577   void RemoveClient(const std::string& client) {
578     grpc_core::MutexLock lock(&clients_mu_);
579     clients_.erase(client);
580   }
581 
// Optional hook run on the first request read on each stream.
std::function<void(const DiscoveryRequest& request)> check_first_request_;
// Optional hook run with the status code of each NACK.
std::function<void(absl::StatusCode)> check_nack_status_code_;
// Label used as the "ADS[...]" prefix in log lines.
std::string debug_label_;

// NOTE(review): ads_cond_ is not referenced by the inline code in this
// header -- confirm whether it is still needed.
grpc_core::CondVar ads_cond_;
// Protects the ADS state declared below.
grpc_core::Mutex ads_mu_;
// When true, stream handlers leave their main loop.
bool ads_done_ ABSL_GUARDED_BY(ads_mu_) = false;
// ACK/NACK states recorded per resource type and not yet consumed by
// GetResponseState().
std::map<std::string /* type_url */, std::deque<ResponseState>>
    resource_type_response_state_ ABSL_GUARDED_BY(ads_mu_);
// Resource types registered through IgnoreResourceType().
std::set<std::string /*resource_type*/> resource_types_to_ignore_
    ABSL_GUARDED_BY(ads_mu_);
// Optional hook run for requests that carry no response nonce.
std::function<void(absl::string_view, int)> check_version_callack_
    ABSL_GUARDED_BY(ads_mu_);
// An instance data member containing the current state of all resources.
// Note that an entry will exist whenever either of the following is true:
// - The resource exists (i.e., has been created by SetResource() and has not
//   yet been destroyed by UnsetResource()).
// - There is at least one subscription for the resource.
ResourceMap resource_map_ ABSL_GUARDED_BY(ads_mu_);
// When set, new streams are failed immediately with this status.
absl::optional<Status> forced_ads_failure_ ABSL_GUARDED_BY(ads_mu_);
// Whether resources sent in reply to requests are wrapped in a discovery
// Resource message.
bool wrap_resources_ ABSL_GUARDED_BY(ads_mu_) = false;

// Protects clients_.
grpc_core::Mutex clients_mu_;
// Peer names of the clients with a stream currently being served.
std::set<std::string> clients_ ABSL_GUARDED_BY(clients_mu_);
606 };
607 
608 // An LRS service implementation.
609 class LrsServiceImpl
610     : public CountedService<
611           ::envoy::service::load_stats::v3::LoadReportingService::Service>,
612       public std::enable_shared_from_this<LrsServiceImpl> {
613  public:
614   using LoadStatsRequest = ::envoy::service::load_stats::v3::LoadStatsRequest;
615   using LoadStatsResponse = ::envoy::service::load_stats::v3::LoadStatsResponse;
616 
617   // Stats reported by client.
618   class ClientStats {
619    public:
620     // Stats for a given locality.
621     struct LocalityStats {
// Accumulated values for one named load metric.
struct LoadMetric {
  // Both fields start at zero so that a default-initialized LoadMetric is a
  // well-defined starting point for operator+=.
  uint64_t num_requests_finished_with_metric = 0;
  double total_metric_value = 0;
  // Adds |other|'s values to this object and returns *this.
  LoadMetric& operator+=(const LoadMetric& other) {
    num_requests_finished_with_metric +=
        other.num_requests_finished_with_metric;
    total_metric_value += other.total_metric_value;
    return *this;
  }
};
632 
LocalityStatsLocalityStats633       LocalityStats() {}
634 
635       // Converts from proto message class.
LocalityStatsLocalityStats636       explicit LocalityStats(
637           const ::envoy::config::endpoint::v3::UpstreamLocalityStats&
638               upstream_locality_stats)
639           : total_successful_requests(
640                 upstream_locality_stats.total_successful_requests()),
641             total_requests_in_progress(
642                 upstream_locality_stats.total_requests_in_progress()),
643             total_error_requests(
644                 upstream_locality_stats.total_error_requests()),
645             total_issued_requests(
646                 upstream_locality_stats.total_issued_requests()) {
647         for (const auto& s : upstream_locality_stats.load_metric_stats()) {
648           load_metrics[s.metric_name()] += LoadMetric{
649               s.num_requests_finished_with_metric(), s.total_metric_value()};
650         }
651       }
652 
653       LocalityStats& operator+=(const LocalityStats& other) {
654         total_successful_requests += other.total_successful_requests;
655         total_requests_in_progress += other.total_requests_in_progress;
656         total_error_requests += other.total_error_requests;
657         total_issued_requests += other.total_issued_requests;
658         for (const auto& p : other.load_metrics) {
659           load_metrics[p.first] += p.second;
660         }
661         return *this;
662       }
663 
664       uint64_t total_successful_requests = 0;
665       uint64_t total_requests_in_progress = 0;
666       uint64_t total_error_requests = 0;
667       uint64_t total_issued_requests = 0;
668       std::map<std::string, LoadMetric> load_metrics;
669     };
670 
ClientStats()671     ClientStats() {}
672 
673     // Converts from proto message class.
ClientStats(const::envoy::config::endpoint::v3::ClusterStats & cluster_stats)674     explicit ClientStats(
675         const ::envoy::config::endpoint::v3::ClusterStats& cluster_stats)
676         : cluster_name_(cluster_stats.cluster_name()),
677           eds_service_name_(cluster_stats.cluster_service_name()),
678           total_dropped_requests_(cluster_stats.total_dropped_requests()) {
679       for (const auto& input_locality_stats :
680            cluster_stats.upstream_locality_stats()) {
681         locality_stats_.emplace(input_locality_stats.locality().sub_zone(),
682                                 LocalityStats(input_locality_stats));
683       }
684       for (const auto& input_dropped_requests :
685            cluster_stats.dropped_requests()) {
686         dropped_requests_.emplace(input_dropped_requests.category(),
687                                   input_dropped_requests.dropped_count());
688       }
689     }
690 
cluster_name()691     const std::string& cluster_name() const { return cluster_name_; }
eds_service_name()692     const std::string& eds_service_name() const { return eds_service_name_; }
693 
locality_stats()694     const std::map<std::string, LocalityStats>& locality_stats() const {
695       return locality_stats_;
696     }
697 
// Request-count accessors. Only declared in this part of the file; the
// definitions are elsewhere.
// NOTE(review): presumably each one sums the same-named LocalityStats field
// over every entry of locality_stats_ - confirm against the definitions.
698     uint64_t total_successful_requests() const;
699     uint64_t total_requests_in_progress() const;
700     uint64_t total_error_requests() const;
701     uint64_t total_issued_requests() const;
702 
total_dropped_requests()703     uint64_t total_dropped_requests() const { return total_dropped_requests_; }
704 
// Drop count for one drop category. Only declared here.
// NOTE(review): presumably looks `category` up in dropped_requests_; how an
// unknown category is handled must be checked in the definition.
705     uint64_t dropped_requests(const std::string& category) const;
706 
// Accumulation operator for whole-cluster stats. Only declared here.
// NOTE(review): presumably merges `other` into this object the way
// LocalityStats::operator+= does for a single locality - confirm against the
// definition.
707     ClientStats& operator+=(const ClientStats& other);
708 
709    private:
// Cluster name, taken from ClusterStats::cluster_name().
710     std::string cluster_name_;
// EDS service name, taken from ClusterStats::cluster_service_name().
711     std::string eds_service_name_;
// Per-locality stats keyed by the locality's sub_zone.
712     std::map<std::string, LocalityStats> locality_stats_;
// Total drop count, taken from ClusterStats::total_dropped_requests().
713     uint64_t total_dropped_requests_ = 0;
// Drop counts keyed by drop category.
714     std::map<std::string, uint64_t> dropped_requests_;
715   };
716 
717   LrsServiceImpl(int client_load_reporting_interval_seconds,
718                  std::set<std::string> cluster_names,
719                  std::function<void()> stream_started_callback = nullptr,
720                  std::function<void(const LoadStatsRequest& request)>
721                      check_first_request = nullptr,
722                  absl::string_view debug_label = "")
client_load_reporting_interval_seconds_(client_load_reporting_interval_seconds)723       : client_load_reporting_interval_seconds_(
724             client_load_reporting_interval_seconds),
725         cluster_names_(std::move(cluster_names)),
726         stream_started_callback_(std::move(stream_started_callback)),
727         check_first_request_(std::move(check_first_request)),
728         debug_label_(absl::StrFormat(
729             "%p%s%s", this, debug_label.empty() ? "" : ":", debug_label)) {}
730 
731   // Must be called before the LRS call is started.
set_send_all_clusters(bool send_all_clusters)732   void set_send_all_clusters(bool send_all_clusters) {
733     send_all_clusters_ = send_all_clusters;
734   }
set_cluster_names(const std::set<std::string> & cluster_names)735   void set_cluster_names(const std::set<std::string>& cluster_names) {
736     cluster_names_ = cluster_names;
737   }
738 
// Only declared here; the definition is elsewhere. Per the annotation, the
// caller must not hold lrs_mu_ or load_report_mu_ when calling this.
// NOTE(review): presumably resets the shutdown flag and the report queue for
// a new run - confirm against the definition.
739   void Start() ABSL_LOCKS_EXCLUDED(lrs_mu_, load_report_mu_);
740 
// Only declared here; the definition is elsewhere.
// NOTE(review): StreamLoadStats blocks on lrs_cv_ until lrs_done_ is true;
// presumably this is what sets the flag and wakes it - confirm.
741   void Shutdown();
742 
743   // Returns an empty vector if the timeout elapses with no load report.
744   // TODO(roth): Change the default here to a finite duration and verify
745   // that it doesn't cause failures in any existing tests.
// With the default argument the wait has no time limit.
// NOTE(review): presumably returns the oldest entry of result_queue_, i.e.
// one ClientStats per cluster in a single load report - confirm against the
// definition.
746   std::vector<ClientStats> WaitForLoadReport(
747       absl::Duration timeout = absl::InfiniteDuration());
748 
749  private:
// Server-side stream type of the LRS call: StreamLoadStats reads
// LoadStatsRequest messages from it and writes LoadStatsResponse messages.
750   using Stream = ServerReaderWriter<LoadStatsResponse, LoadStatsRequest>;
751 
StreamLoadStats(ServerContext *,Stream * stream)752   Status StreamLoadStats(ServerContext* /*context*/, Stream* stream) override {
753     gpr_log(GPR_INFO, "LRS[%s]: StreamLoadStats starts", debug_label_.c_str());
754     if (stream_started_callback_ != nullptr) stream_started_callback_();
755     // Take a reference of the LrsServiceImpl object, reference will go
756     // out of scope after this method exits.
757     std::shared_ptr<LrsServiceImpl> lrs_service_impl = shared_from_this();
758     // Read initial request.
759     LoadStatsRequest request;
760     if (stream->Read(&request)) {
761       IncreaseRequestCount();
762       if (check_first_request_ != nullptr) check_first_request_(request);
763       // Send initial response.
764       LoadStatsResponse response;
765       if (send_all_clusters_) {
766         response.set_send_all_clusters(true);
767       } else {
768         for (const std::string& cluster_name : cluster_names_) {
769           response.add_clusters(cluster_name);
770         }
771       }
772       response.mutable_load_reporting_interval()->set_seconds(
773           client_load_reporting_interval_seconds_ *
774           grpc_test_slowdown_factor());
775       stream->Write(response);
776       IncreaseResponseCount();
777       // Wait for report.
778       request.Clear();
779       while (stream->Read(&request)) {
780         gpr_log(GPR_INFO, "LRS[%s]: received client load report message: %s",
781                 debug_label_.c_str(), request.DebugString().c_str());
782         std::vector<ClientStats> stats;
783         for (const auto& cluster_stats : request.cluster_stats()) {
784           stats.emplace_back(cluster_stats);
785         }
786         grpc_core::MutexLock lock(&load_report_mu_);
787         result_queue_.emplace_back(std::move(stats));
788         if (load_report_cond_ != nullptr) {
789           load_report_cond_->Signal();
790         }
791       }
792       // Wait until notified done.
793       grpc_core::MutexLock lock(&lrs_mu_);
794       while (!lrs_done_) {
795         lrs_cv_.Wait(&lrs_mu_);
796       }
797     }
798     gpr_log(GPR_INFO, "LRS[%s]: StreamLoadStats done", debug_label_.c_str());
799     return Status::OK;
800   }
801 
// Base load reporting interval in seconds. StreamLoadStats multiplies it by
// grpc_test_slowdown_factor() before sending it in the initial response.
802   const int client_load_reporting_interval_seconds_;
// When true, the initial response sets send_all_clusters instead of listing
// cluster_names_.
803   bool send_all_clusters_ = false;
// Cluster names listed in the initial response when send_all_clusters_ is
// false.
804   std::set<std::string> cluster_names_;
// If non-null, invoked at the start of every StreamLoadStats call.
805   std::function<void()> stream_started_callback_;
// If non-null, invoked with the first request read from a stream.
806   std::function<void(const LoadStatsRequest& request)> check_first_request_;
// Label used in log messages: this object's address, followed by ':' and
// the caller-supplied label when one was given.
807   std::string debug_label_;
808 
// Shutdown handshake: once a stream stops yielding requests, StreamLoadStats
// waits on lrs_cv_ (holding lrs_mu_) until lrs_done_ is true.
// NOTE(review): the code that sets lrs_done_ and signals lrs_cv_ is not in
// this part of the file (presumably Shutdown()) - confirm.
809   grpc_core::CondVar lrs_cv_;
810   grpc_core::Mutex lrs_mu_;
811   bool lrs_done_ ABSL_GUARDED_BY(lrs_mu_) = false;
812 
// Guards load_report_cond_ and result_queue_.
813   grpc_core::Mutex load_report_mu_;
// If non-null, StreamLoadStats signals it after queuing a load report.
// NOTE(review): presumably pointed at a waiter's condition variable by
// WaitForLoadReport - confirm against its definition.
814   grpc_core::CondVar* load_report_cond_ ABSL_GUARDED_BY(load_report_mu_) =
815       nullptr;
// Received load reports in arrival order. Each entry holds one ClientStats
// per cluster_stats element of a single load report message.
816   std::deque<std::vector<ClientStats>> result_queue_
817       ABSL_GUARDED_BY(load_report_mu_);
818 };
819 
820 }  // namespace testing
821 }  // namespace grpc
822 
823 #endif  // GRPC_TEST_CPP_END2END_XDS_XDS_SERVER_H
824