1 // 2 // Copyright 2017 gRPC authors. 3 // 4 // Licensed under the Apache License, Version 2.0 (the "License"); 5 // you may not use this file except in compliance with the License. 6 // You may obtain a copy of the License at 7 // 8 // http://www.apache.org/licenses/LICENSE-2.0 9 // 10 // Unless required by applicable law or agreed to in writing, software 11 // distributed under the License is distributed on an "AS IS" BASIS, 12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 // See the License for the specific language governing permissions and 14 // limitations under the License. 15 // 16 17 #ifndef GRPC_TEST_CPP_END2END_XDS_XDS_SERVER_H 18 #define GRPC_TEST_CPP_END2END_XDS_XDS_SERVER_H 19 20 #include <deque> 21 #include <set> 22 #include <string> 23 #include <thread> 24 #include <vector> 25 26 #include "absl/types/optional.h" 27 28 #include <grpc/support/log.h> 29 #include <grpcpp/support/status.h> 30 31 #include "src/core/lib/address_utils/parse_address.h" 32 #include "src/core/lib/gprpp/crash.h" 33 #include "src/core/lib/gprpp/sync.h" 34 #include "src/proto/grpc/testing/xds/v3/ads.grpc.pb.h" 35 #include "src/proto/grpc/testing/xds/v3/cluster.grpc.pb.h" 36 #include "src/proto/grpc/testing/xds/v3/discovery.grpc.pb.h" 37 #include "src/proto/grpc/testing/xds/v3/discovery.pb.h" 38 #include "src/proto/grpc/testing/xds/v3/endpoint.grpc.pb.h" 39 #include "src/proto/grpc/testing/xds/v3/listener.grpc.pb.h" 40 #include "src/proto/grpc/testing/xds/v3/lrs.grpc.pb.h" 41 #include "src/proto/grpc/testing/xds/v3/route.grpc.pb.h" 42 #include "test/core/util/test_config.h" 43 #include "test/cpp/end2end/counted_service.h" 44 45 namespace grpc { 46 namespace testing { 47 48 constexpr char kLdsTypeUrl[] = 49 "type.googleapis.com/envoy.config.listener.v3.Listener"; 50 constexpr char kRdsTypeUrl[] = 51 "type.googleapis.com/envoy.config.route.v3.RouteConfiguration"; 52 constexpr char kCdsTypeUrl[] = 53 "type.googleapis.com/envoy.config.cluster.v3.Cluster"; 54 constexpr char kEdsTypeUrl[] = 55 "type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment"; 56 57 // An ADS service implementation. 58 class AdsServiceImpl 59 : public CountedService< 60 ::envoy::service::discovery::v3::AggregatedDiscoveryService::Service>, 61 public std::enable_shared_from_this<AdsServiceImpl> { 62 public: 63 using DiscoveryRequest = ::envoy::service::discovery::v3::DiscoveryRequest; 64 using DiscoveryResponse = ::envoy::service::discovery::v3::DiscoveryResponse; 65 66 // State for a given xDS resource type. 67 struct ResponseState { 68 enum State { 69 ACKED, // ACK received. 70 NACKED, // NACK received; error_message will contain the error. 71 }; 72 State state = ACKED; 73 std::string error_message; 74 }; 75 76 explicit AdsServiceImpl( 77 std::function<void(const DiscoveryRequest& request)> check_first_request = 78 nullptr, 79 std::function<void(absl::StatusCode)> check_nack_status_code = nullptr, 80 absl::string_view debug_label = "") check_first_request_(std::move (check_first_request))81 : check_first_request_(std::move(check_first_request)), 82 check_nack_status_code_(std::move(check_nack_status_code)), 83 debug_label_(absl::StrFormat( 84 "%p%s%s", this, debug_label.empty() ? "" : ":", debug_label)) {} 85 set_wrap_resources(bool wrap_resources)86 void set_wrap_resources(bool wrap_resources) { 87 grpc_core::MutexLock lock(&ads_mu_); 88 wrap_resources_ = wrap_resources; 89 } 90 91 // Sets a resource to a particular value, overwriting any previous value. 92 void SetResource(google::protobuf::Any resource, const std::string& type_url, 93 const std::string& name); 94 95 // Removes a resource from the server's state. 96 void UnsetResource(const std::string& type_url, const std::string& name); 97 SetLdsResource(const::envoy::config::listener::v3::Listener & listener)98 void SetLdsResource(const ::envoy::config::listener::v3::Listener& listener) { 99 google::protobuf::Any resource; 100 resource.PackFrom(listener); 101 SetResource(std::move(resource), kLdsTypeUrl, listener.name()); 102 } 103 SetRdsResource(const::envoy::config::route::v3::RouteConfiguration & route)104 void SetRdsResource( 105 const ::envoy::config::route::v3::RouteConfiguration& route) { 106 google::protobuf::Any resource; 107 resource.PackFrom(route); 108 SetResource(std::move(resource), kRdsTypeUrl, route.name()); 109 } 110 SetCdsResource(const::envoy::config::cluster::v3::Cluster & cluster)111 void SetCdsResource(const ::envoy::config::cluster::v3::Cluster& cluster) { 112 google::protobuf::Any resource; 113 resource.PackFrom(cluster); 114 SetResource(std::move(resource), kCdsTypeUrl, cluster.name()); 115 } 116 SetEdsResource(const::envoy::config::endpoint::v3::ClusterLoadAssignment & assignment)117 void SetEdsResource( 118 const ::envoy::config::endpoint::v3::ClusterLoadAssignment& assignment) { 119 google::protobuf::Any resource; 120 resource.PackFrom(assignment); 121 SetResource(std::move(resource), kEdsTypeUrl, assignment.cluster_name()); 122 } 123 124 // Tells the server to ignore requests from the client for a given 125 // resource type. IgnoreResourceType(const std::string & type_url)126 void IgnoreResourceType(const std::string& type_url) { 127 grpc_core::MutexLock lock(&ads_mu_); 128 resource_types_to_ignore_.emplace(type_url); 129 } 130 131 // Sets a callback to be invoked on request messages with respoonse_nonce 132 // set. The callback is passed the resource type and version. SetCheckVersionCallback(std::function<void (absl::string_view,int)> check_version_callack)133 void SetCheckVersionCallback( 134 std::function<void(absl::string_view, int)> check_version_callack) { 135 grpc_core::MutexLock lock(&ads_mu_); 136 check_version_callack_ = std::move(check_version_callack); 137 } 138 139 // Get the list of response state for each resource type. 140 // TODO(roth): Consider adding an absl::Notification-based mechanism 141 // here to avoid the need for tests to poll the response state. GetResponseState(const std::string & type_url)142 absl::optional<ResponseState> GetResponseState(const std::string& type_url) { 143 grpc_core::MutexLock lock(&ads_mu_); 144 if (resource_type_response_state_[type_url].empty()) { 145 return absl::nullopt; 146 } 147 auto response = resource_type_response_state_[type_url].front(); 148 resource_type_response_state_[type_url].pop_front(); 149 return response; 150 } lds_response_state()151 absl::optional<ResponseState> lds_response_state() { 152 return GetResponseState(kLdsTypeUrl); 153 } rds_response_state()154 absl::optional<ResponseState> rds_response_state() { 155 return GetResponseState(kRdsTypeUrl); 156 } cds_response_state()157 absl::optional<ResponseState> cds_response_state() { 158 return GetResponseState(kCdsTypeUrl); 159 } eds_response_state()160 absl::optional<ResponseState> eds_response_state() { 161 return GetResponseState(kEdsTypeUrl); 162 } 163 164 // Starts the service. 165 void Start(); 166 167 // Shuts down the service. 168 void Shutdown(); 169 170 // Returns the peer names of clients currently connected to the service. clients()171 std::set<std::string> clients() { 172 grpc_core::MutexLock lock(&clients_mu_); 173 return clients_; 174 } 175 ForceADSFailure(Status status)176 void ForceADSFailure(Status status) { 177 grpc_core::MutexLock lock(&ads_mu_); 178 forced_ads_failure_ = std::move(status); 179 } 180 ClearADSFailure()181 void ClearADSFailure() { 182 grpc_core::MutexLock lock(&ads_mu_); 183 forced_ads_failure_ = absl::nullopt; 184 } 185 186 private: 187 // A queue of resource type/name pairs that have changed since the client 188 // subscribed to them. 189 using UpdateQueue = std::deque< 190 std::pair<std::string /* type url */, std::string /* resource name */>>; 191 192 // A struct representing a client's subscription to a particular resource. 193 struct SubscriptionState { 194 // The queue upon which to place updates when the resource is updated. 195 UpdateQueue* update_queue; 196 }; 197 198 // A struct representing the a client's subscription to all the resources. 199 using SubscriptionNameMap = 200 std::map<std::string /* resource_name */, SubscriptionState>; 201 using SubscriptionMap = 202 std::map<std::string /* type_url */, SubscriptionNameMap>; 203 204 // Sent state for a given resource type. 205 struct SentState { 206 int nonce = 0; 207 int resource_type_version = 0; 208 }; 209 210 // A struct representing the current state for an individual resource. 211 struct ResourceState { 212 // The resource itself, if present. 213 absl::optional<google::protobuf::Any> resource; 214 // The resource type version that this resource was last updated in. 215 int resource_type_version = 0; 216 // A list of subscriptions to this resource. 217 std::set<SubscriptionState*> subscriptions; 218 }; 219 220 // The current state for all individual resources of a given type. 221 using ResourceNameMap = 222 std::map<std::string /* resource_name */, ResourceState>; 223 224 struct ResourceTypeState { 225 int resource_type_version = 0; 226 ResourceNameMap resource_name_map; 227 }; 228 229 using ResourceMap = std::map<std::string /* type_url */, ResourceTypeState>; 230 231 using Stream = ServerReaderWriter<DiscoveryResponse, DiscoveryRequest>; 232 StreamAggregatedResources(ServerContext * context,Stream * stream)233 Status StreamAggregatedResources(ServerContext* context, 234 Stream* stream) override { 235 gpr_log(GPR_INFO, "ADS[%s]: StreamAggregatedResources starts", 236 debug_label_.c_str()); 237 { 238 grpc_core::MutexLock lock(&ads_mu_); 239 if (forced_ads_failure_.has_value()) { 240 gpr_log(GPR_INFO, 241 "ADS[%s]: StreamAggregatedResources forcing early failure " 242 "with status code: %d, message: %s", 243 debug_label_.c_str(), forced_ads_failure_.value().error_code(), 244 forced_ads_failure_.value().error_message().c_str()); 245 return forced_ads_failure_.value(); 246 } 247 } 248 AddClient(context->peer()); 249 // Take a reference of the AdsServiceImpl object, which will go 250 // out of scope when this request handler returns. This ensures 251 // that the parent won't be destroyed until this stream is complete. 252 std::shared_ptr<AdsServiceImpl> ads_service_impl = shared_from_this(); 253 // Resources (type/name pairs) that have changed since the client 254 // subscribed to them. 255 UpdateQueue update_queue; 256 // Resources that the client will be subscribed to keyed by resource type 257 // url. 258 SubscriptionMap subscription_map; 259 // Sent state for each resource type. 260 std::map<std::string /*type_url*/, SentState> sent_state_map; 261 // Spawn a thread to read requests from the stream. 262 // Requests will be delivered to this thread in a queue. 263 std::deque<DiscoveryRequest> requests; 264 bool stream_closed = false; 265 std::thread reader(std::bind(&AdsServiceImpl::BlockingRead, this, stream, 266 &requests, &stream_closed)); 267 // Main loop to process requests and updates. 268 while (true) { 269 // Boolean to keep track if the loop received any work to do: a 270 // request or an update; regardless whether a response was actually 271 // sent out. 272 bool did_work = false; 273 // Look for new requests and decide what to handle. 274 absl::optional<DiscoveryResponse> response; 275 { 276 grpc_core::MutexLock lock(&ads_mu_); 277 // If the stream has been closed or our parent is being shut 278 // down, stop immediately. 279 if (stream_closed || ads_done_) break; 280 // Otherwise, see if there's a request to read from the queue. 281 if (!requests.empty()) { 282 DiscoveryRequest request = std::move(requests.front()); 283 requests.pop_front(); 284 did_work = true; 285 gpr_log(GPR_INFO, 286 "ADS[%s]: Received request for type %s with content %s", 287 debug_label_.c_str(), request.type_url().c_str(), 288 request.DebugString().c_str()); 289 SentState& sent_state = sent_state_map[request.type_url()]; 290 // Process request. 291 ProcessRequest(request, &update_queue, &subscription_map, &sent_state, 292 &response); 293 } 294 } 295 if (response.has_value()) { 296 gpr_log(GPR_INFO, "ADS[%s]: Sending response: %s", debug_label_.c_str(), 297 response->DebugString().c_str()); 298 stream->Write(response.value()); 299 } 300 response.reset(); 301 // Look for updates and decide what to handle. 302 { 303 grpc_core::MutexLock lock(&ads_mu_); 304 if (!update_queue.empty()) { 305 const std::string resource_type = 306 std::move(update_queue.front().first); 307 const std::string resource_name = 308 std::move(update_queue.front().second); 309 update_queue.pop_front(); 310 did_work = true; 311 SentState& sent_state = sent_state_map[resource_type]; 312 ProcessUpdate(resource_type, resource_name, &subscription_map, 313 &sent_state, &response); 314 } 315 } 316 if (response.has_value()) { 317 gpr_log(GPR_INFO, "ADS[%s]: Sending update response: %s", 318 debug_label_.c_str(), response->DebugString().c_str()); 319 stream->Write(response.value()); 320 } 321 { 322 grpc_core::MutexLock lock(&ads_mu_); 323 if (ads_done_) { 324 break; 325 } 326 } 327 // If we didn't find anything to do, delay before the next loop 328 // iteration; otherwise, check whether we should exit and then 329 // immediately continue. 330 gpr_sleep_until(grpc_timeout_milliseconds_to_deadline(did_work ? 0 : 10)); 331 } 332 // Done with main loop. Clean up before returning. 333 // Join reader thread. 334 reader.join(); 335 // Clean up any subscriptions that were still active when the call 336 // finished. 337 { 338 grpc_core::MutexLock lock(&ads_mu_); 339 for (auto& p : subscription_map) { 340 const std::string& type_url = p.first; 341 SubscriptionNameMap& subscription_name_map = p.second; 342 for (auto& q : subscription_name_map) { 343 const std::string& resource_name = q.first; 344 SubscriptionState& subscription_state = q.second; 345 ResourceNameMap& resource_name_map = 346 resource_map_[type_url].resource_name_map; 347 ResourceState& resource_state = resource_name_map[resource_name]; 348 resource_state.subscriptions.erase(&subscription_state); 349 } 350 } 351 } 352 gpr_log(GPR_INFO, "ADS[%s]: StreamAggregatedResources done", 353 debug_label_.c_str()); 354 RemoveClient(context->peer()); 355 return Status::OK; 356 } 357 358 // Processes a response read from the client. 359 // Populates response if needed. ProcessRequest(const DiscoveryRequest & request,UpdateQueue * update_queue,SubscriptionMap * subscription_map,SentState * sent_state,absl::optional<DiscoveryResponse> * response)360 void ProcessRequest(const DiscoveryRequest& request, 361 UpdateQueue* update_queue, 362 SubscriptionMap* subscription_map, SentState* sent_state, 363 absl::optional<DiscoveryResponse>* response) 364 ABSL_EXCLUSIVE_LOCKS_REQUIRED(ads_mu_) { 365 // Check the nonce sent by the client, if any. 366 // (This will be absent on the first request on a stream.) 367 if (request.response_nonce().empty()) { 368 int client_resource_type_version = 0; 369 if (!request.version_info().empty()) { 370 GPR_ASSERT(absl::SimpleAtoi(request.version_info(), 371 &client_resource_type_version)); 372 } 373 if (check_version_callack_ != nullptr) { 374 check_version_callack_(request.type_url(), 375 client_resource_type_version); 376 } 377 } else { 378 int client_nonce; 379 GPR_ASSERT(absl::SimpleAtoi(request.response_nonce(), &client_nonce)); 380 // Check for ACK or NACK. 381 ResponseState response_state; 382 if (!request.has_error_detail()) { 383 response_state.state = ResponseState::ACKED; 384 gpr_log(GPR_INFO, "ADS[%s]: client ACKed resource_type=%s version=%s", 385 debug_label_.c_str(), request.type_url().c_str(), 386 request.version_info().c_str()); 387 } else { 388 response_state.state = ResponseState::NACKED; 389 if (check_nack_status_code_ != nullptr) { 390 check_nack_status_code_( 391 static_cast<absl::StatusCode>(request.error_detail().code())); 392 } 393 response_state.error_message = request.error_detail().message(); 394 gpr_log(GPR_INFO, 395 "ADS[%s]: client NACKed resource_type=%s version=%s: %s", 396 debug_label_.c_str(), request.type_url().c_str(), 397 request.version_info().c_str(), 398 response_state.error_message.c_str()); 399 } 400 resource_type_response_state_[request.type_url()].emplace_back( 401 std::move(response_state)); 402 // Ignore requests with stale nonces. 403 if (client_nonce < sent_state->nonce) return; 404 } 405 // Ignore resource types as requested by tests. 406 if (resource_types_to_ignore_.find(request.type_url()) != 407 resource_types_to_ignore_.end()) { 408 return; 409 } 410 // Look at all the resource names in the request. 411 auto& subscription_name_map = (*subscription_map)[request.type_url()]; 412 auto& resource_type_state = resource_map_[request.type_url()]; 413 auto& resource_name_map = resource_type_state.resource_name_map; 414 std::set<std::string> resources_in_current_request; 415 std::set<std::string> resources_added_to_response; 416 for (const std::string& resource_name : request.resource_names()) { 417 resources_in_current_request.emplace(resource_name); 418 auto& subscription_state = subscription_name_map[resource_name]; 419 auto& resource_state = resource_name_map[resource_name]; 420 // Subscribe if needed. 421 // Send the resource in the response if either (a) this is 422 // a new subscription or (b) there is an updated version of 423 // this resource to send. 424 if (MaybeSubscribe(request.type_url(), resource_name, &subscription_state, 425 &resource_state, update_queue) || 426 ClientNeedsResourceUpdate(resource_type_state, resource_state, 427 sent_state->resource_type_version)) { 428 gpr_log(GPR_INFO, "ADS[%s]: Sending update for type=%s name=%s", 429 debug_label_.c_str(), request.type_url().c_str(), 430 resource_name.c_str()); 431 resources_added_to_response.emplace(resource_name); 432 if (!response->has_value()) response->emplace(); 433 if (resource_state.resource.has_value()) { 434 auto* resource = (*response)->add_resources(); 435 resource->CopyFrom(resource_state.resource.value()); 436 if (wrap_resources_) { 437 envoy::service::discovery::v3::Resource resource_wrapper; 438 *resource_wrapper.mutable_resource() = std::move(*resource); 439 resource->PackFrom(resource_wrapper); 440 } 441 } 442 } else { 443 gpr_log(GPR_INFO, 444 "ADS[%s]: client does not need update for type=%s name=%s", 445 debug_label_.c_str(), request.type_url().c_str(), 446 resource_name.c_str()); 447 } 448 } 449 // Process unsubscriptions for any resource no longer 450 // present in the request's resource list. 451 ProcessUnsubscriptions(request.type_url(), resources_in_current_request, 452 &subscription_name_map, &resource_name_map); 453 // Construct response if needed. 454 if (!resources_added_to_response.empty()) { 455 CompleteBuildingDiscoveryResponse( 456 request.type_url(), resource_type_state.resource_type_version, 457 subscription_name_map, resources_added_to_response, sent_state, 458 &response->value()); 459 } 460 } 461 462 // Processes a resource update from the test. 463 // Populates response if needed. ProcessUpdate(const std::string & resource_type,const std::string & resource_name,SubscriptionMap * subscription_map,SentState * sent_state,absl::optional<DiscoveryResponse> * response)464 void ProcessUpdate(const std::string& resource_type, 465 const std::string& resource_name, 466 SubscriptionMap* subscription_map, SentState* sent_state, 467 absl::optional<DiscoveryResponse>* response) 468 ABSL_EXCLUSIVE_LOCKS_REQUIRED(ads_mu_) { 469 gpr_log(GPR_INFO, "ADS[%s]: Received update for type=%s name=%s", 470 debug_label_.c_str(), resource_type.c_str(), resource_name.c_str()); 471 auto& subscription_name_map = (*subscription_map)[resource_type]; 472 auto& resource_type_state = resource_map_[resource_type]; 473 auto& resource_name_map = resource_type_state.resource_name_map; 474 auto it = subscription_name_map.find(resource_name); 475 if (it != subscription_name_map.end()) { 476 ResourceState& resource_state = resource_name_map[resource_name]; 477 if (ClientNeedsResourceUpdate(resource_type_state, resource_state, 478 sent_state->resource_type_version)) { 479 gpr_log(GPR_INFO, "ADS[%s]: Sending update for type=%s name=%s", 480 debug_label_.c_str(), resource_type.c_str(), 481 resource_name.c_str()); 482 response->emplace(); 483 if (resource_state.resource.has_value()) { 484 auto* resource = (*response)->add_resources(); 485 resource->CopyFrom(resource_state.resource.value()); 486 } 487 CompleteBuildingDiscoveryResponse( 488 resource_type, resource_type_state.resource_type_version, 489 subscription_name_map, {resource_name}, sent_state, 490 &response->value()); 491 } 492 } 493 } 494 495 // Starting a thread to do blocking read on the stream until cancel. BlockingRead(Stream * stream,std::deque<DiscoveryRequest> * requests,bool * stream_closed)496 void BlockingRead(Stream* stream, std::deque<DiscoveryRequest>* requests, 497 bool* stream_closed) { 498 DiscoveryRequest request; 499 bool seen_first_request = false; 500 while (stream->Read(&request)) { 501 if (!seen_first_request) { 502 if (check_first_request_ != nullptr) { 503 check_first_request_(request); 504 } 505 seen_first_request = true; 506 } 507 { 508 grpc_core::MutexLock lock(&ads_mu_); 509 requests->emplace_back(std::move(request)); 510 } 511 } 512 gpr_log(GPR_INFO, "ADS[%s]: Null read, stream closed", 513 debug_label_.c_str()); 514 grpc_core::MutexLock lock(&ads_mu_); 515 *stream_closed = true; 516 } 517 518 // Completing the building a DiscoveryResponse by adding common information 519 // for all resources and by adding all subscribed resources for LDS and CDS. CompleteBuildingDiscoveryResponse(const std::string & resource_type,const int version,const SubscriptionNameMap & subscription_name_map,const std::set<std::string> & resources_added_to_response,SentState * sent_state,DiscoveryResponse * response)520 void CompleteBuildingDiscoveryResponse( 521 const std::string& resource_type, const int version, 522 const SubscriptionNameMap& subscription_name_map, 523 const std::set<std::string>& resources_added_to_response, 524 SentState* sent_state, DiscoveryResponse* response) 525 ABSL_EXCLUSIVE_LOCKS_REQUIRED(ads_mu_) { 526 response->set_type_url(resource_type); 527 response->set_version_info(std::to_string(version)); 528 response->set_nonce(std::to_string(++sent_state->nonce)); 529 if (resource_type == kLdsTypeUrl || resource_type == kCdsTypeUrl) { 530 // For LDS and CDS we must send back all subscribed resources 531 // (even the unchanged ones) 532 for (const auto& p : subscription_name_map) { 533 const std::string& resource_name = p.first; 534 if (resources_added_to_response.find(resource_name) == 535 resources_added_to_response.end()) { 536 ResourceNameMap& resource_name_map = 537 resource_map_[resource_type].resource_name_map; 538 const ResourceState& resource_state = 539 resource_name_map[resource_name]; 540 if (resource_state.resource.has_value()) { 541 auto* resource = response->add_resources(); 542 resource->CopyFrom(resource_state.resource.value()); 543 } 544 } 545 } 546 } 547 sent_state->resource_type_version = version; 548 } 549 550 // Checks whether the client needs to receive a newer version of 551 // the resource. 552 static bool ClientNeedsResourceUpdate( 553 const ResourceTypeState& resource_type_state, 554 const ResourceState& resource_state, int client_resource_type_version); 555 556 // Subscribes to a resource if not already subscribed: 557 // 1. Sets the update_queue field in subscription_state. 558 // 2. Adds subscription_state to resource_state->subscriptions. 559 bool MaybeSubscribe(const std::string& resource_type, 560 const std::string& resource_name, 561 SubscriptionState* subscription_state, 562 ResourceState* resource_state, UpdateQueue* update_queue); 563 564 // Removes subscriptions for resources no longer present in the 565 // current request. 566 void ProcessUnsubscriptions( 567 const std::string& resource_type, 568 const std::set<std::string>& resources_in_current_request, 569 SubscriptionNameMap* subscription_name_map, 570 ResourceNameMap* resource_name_map); 571 AddClient(const std::string & client)572 void AddClient(const std::string& client) { 573 grpc_core::MutexLock lock(&clients_mu_); 574 clients_.insert(client); 575 } 576 RemoveClient(const std::string & client)577 void RemoveClient(const std::string& client) { 578 grpc_core::MutexLock lock(&clients_mu_); 579 clients_.erase(client); 580 } 581 582 std::function<void(const DiscoveryRequest& request)> check_first_request_; 583 std::function<void(absl::StatusCode)> check_nack_status_code_; 584 std::string debug_label_; 585 586 grpc_core::CondVar ads_cond_; 587 grpc_core::Mutex ads_mu_; 588 bool ads_done_ ABSL_GUARDED_BY(ads_mu_) = false; 589 std::map<std::string /* type_url */, std::deque<ResponseState>> 590 resource_type_response_state_ ABSL_GUARDED_BY(ads_mu_); 591 std::set<std::string /*resource_type*/> resource_types_to_ignore_ 592 ABSL_GUARDED_BY(ads_mu_); 593 std::function<void(absl::string_view, int)> check_version_callack_ 594 ABSL_GUARDED_BY(ads_mu_); 595 // An instance data member containing the current state of all resources. 596 // Note that an entry will exist whenever either of the following is true: 597 // - The resource exists (i.e., has been created by SetResource() and has not 598 // yet been destroyed by UnsetResource()). 599 // - There is at least one subscription for the resource. 600 ResourceMap resource_map_ ABSL_GUARDED_BY(ads_mu_); 601 absl::optional<Status> forced_ads_failure_ ABSL_GUARDED_BY(ads_mu_); 602 bool wrap_resources_ ABSL_GUARDED_BY(ads_mu_) = false; 603 604 grpc_core::Mutex clients_mu_; 605 std::set<std::string> clients_ ABSL_GUARDED_BY(clients_mu_); 606 }; 607 608 // An LRS service implementation. 609 class LrsServiceImpl 610 : public CountedService< 611 ::envoy::service::load_stats::v3::LoadReportingService::Service>, 612 public std::enable_shared_from_this<LrsServiceImpl> { 613 public: 614 using LoadStatsRequest = ::envoy::service::load_stats::v3::LoadStatsRequest; 615 using LoadStatsResponse = ::envoy::service::load_stats::v3::LoadStatsResponse; 616 617 // Stats reported by client. 618 class ClientStats { 619 public: 620 // Stats for a given locality. 621 struct LocalityStats { 622 struct LoadMetric { 623 uint64_t num_requests_finished_with_metric; 624 double total_metric_value; 625 LoadMetric& operator+=(const LoadMetric& other) { 626 num_requests_finished_with_metric += 627 other.num_requests_finished_with_metric; 628 total_metric_value += other.total_metric_value; 629 return *this; 630 } 631 }; 632 LocalityStatsLocalityStats633 LocalityStats() {} 634 635 // Converts from proto message class. LocalityStatsLocalityStats636 explicit LocalityStats( 637 const ::envoy::config::endpoint::v3::UpstreamLocalityStats& 638 upstream_locality_stats) 639 : total_successful_requests( 640 upstream_locality_stats.total_successful_requests()), 641 total_requests_in_progress( 642 upstream_locality_stats.total_requests_in_progress()), 643 total_error_requests( 644 upstream_locality_stats.total_error_requests()), 645 total_issued_requests( 646 upstream_locality_stats.total_issued_requests()) { 647 for (const auto& s : upstream_locality_stats.load_metric_stats()) { 648 load_metrics[s.metric_name()] += LoadMetric{ 649 s.num_requests_finished_with_metric(), s.total_metric_value()}; 650 } 651 } 652 653 LocalityStats& operator+=(const LocalityStats& other) { 654 total_successful_requests += other.total_successful_requests; 655 total_requests_in_progress += other.total_requests_in_progress; 656 total_error_requests += other.total_error_requests; 657 total_issued_requests += other.total_issued_requests; 658 for (const auto& p : other.load_metrics) { 659 load_metrics[p.first] += p.second; 660 } 661 return *this; 662 } 663 664 uint64_t total_successful_requests = 0; 665 uint64_t total_requests_in_progress = 0; 666 uint64_t total_error_requests = 0; 667 uint64_t total_issued_requests = 0; 668 std::map<std::string, LoadMetric> load_metrics; 669 }; 670 ClientStats()671 ClientStats() {} 672 673 // Converts from proto message class. ClientStats(const::envoy::config::endpoint::v3::ClusterStats & cluster_stats)674 explicit ClientStats( 675 const ::envoy::config::endpoint::v3::ClusterStats& cluster_stats) 676 : cluster_name_(cluster_stats.cluster_name()), 677 eds_service_name_(cluster_stats.cluster_service_name()), 678 total_dropped_requests_(cluster_stats.total_dropped_requests()) { 679 for (const auto& input_locality_stats : 680 cluster_stats.upstream_locality_stats()) { 681 locality_stats_.emplace(input_locality_stats.locality().sub_zone(), 682 LocalityStats(input_locality_stats)); 683 } 684 for (const auto& input_dropped_requests : 685 cluster_stats.dropped_requests()) { 686 dropped_requests_.emplace(input_dropped_requests.category(), 687 input_dropped_requests.dropped_count()); 688 } 689 } 690 cluster_name()691 const std::string& cluster_name() const { return cluster_name_; } eds_service_name()692 const std::string& eds_service_name() const { return eds_service_name_; } 693 locality_stats()694 const std::map<std::string, LocalityStats>& locality_stats() const { 695 return locality_stats_; 696 } 697 698 uint64_t total_successful_requests() const; 699 uint64_t total_requests_in_progress() const; 700 uint64_t total_error_requests() const; 701 uint64_t total_issued_requests() const; 702 total_dropped_requests()703 uint64_t total_dropped_requests() const { return total_dropped_requests_; } 704 705 uint64_t dropped_requests(const std::string& category) const; 706 707 ClientStats& operator+=(const ClientStats& other); 708 709 private: 710 std::string cluster_name_; 711 std::string eds_service_name_; 712 std::map<std::string, LocalityStats> locality_stats_; 713 uint64_t total_dropped_requests_ = 0; 714 std::map<std::string, uint64_t> dropped_requests_; 715 }; 716 717 LrsServiceImpl(int client_load_reporting_interval_seconds, 718 std::set<std::string> cluster_names, 719 std::function<void()> stream_started_callback = nullptr, 720 std::function<void(const LoadStatsRequest& request)> 721 check_first_request = nullptr, 722 absl::string_view debug_label = "") client_load_reporting_interval_seconds_(client_load_reporting_interval_seconds)723 : client_load_reporting_interval_seconds_( 724 client_load_reporting_interval_seconds), 725 cluster_names_(std::move(cluster_names)), 726 stream_started_callback_(std::move(stream_started_callback)), 727 check_first_request_(std::move(check_first_request)), 728 debug_label_(absl::StrFormat( 729 "%p%s%s", this, debug_label.empty() ? "" : ":", debug_label)) {} 730 731 // Must be called before the LRS call is started. set_send_all_clusters(bool send_all_clusters)732 void set_send_all_clusters(bool send_all_clusters) { 733 send_all_clusters_ = send_all_clusters; 734 } set_cluster_names(const std::set<std::string> & cluster_names)735 void set_cluster_names(const std::set<std::string>& cluster_names) { 736 cluster_names_ = cluster_names; 737 } 738 739 void Start() ABSL_LOCKS_EXCLUDED(lrs_mu_, load_report_mu_); 740 741 void Shutdown(); 742 743 // Returns an empty vector if the timeout elapses with no load report. 744 // TODO(roth): Change the default here to a finite duration and verify 745 // that it doesn't cause failures in any existing tests. 746 std::vector<ClientStats> WaitForLoadReport( 747 absl::Duration timeout = absl::InfiniteDuration()); 748 749 private: 750 using Stream = ServerReaderWriter<LoadStatsResponse, LoadStatsRequest>; 751 StreamLoadStats(ServerContext *,Stream * stream)752 Status StreamLoadStats(ServerContext* /*context*/, Stream* stream) override { 753 gpr_log(GPR_INFO, "LRS[%s]: StreamLoadStats starts", debug_label_.c_str()); 754 if (stream_started_callback_ != nullptr) stream_started_callback_(); 755 // Take a reference of the LrsServiceImpl object, reference will go 756 // out of scope after this method exits. 757 std::shared_ptr<LrsServiceImpl> lrs_service_impl = shared_from_this(); 758 // Read initial request. 759 LoadStatsRequest request; 760 if (stream->Read(&request)) { 761 IncreaseRequestCount(); 762 if (check_first_request_ != nullptr) check_first_request_(request); 763 // Send initial response. 764 LoadStatsResponse response; 765 if (send_all_clusters_) { 766 response.set_send_all_clusters(true); 767 } else { 768 for (const std::string& cluster_name : cluster_names_) { 769 response.add_clusters(cluster_name); 770 } 771 } 772 response.mutable_load_reporting_interval()->set_seconds( 773 client_load_reporting_interval_seconds_ * 774 grpc_test_slowdown_factor()); 775 stream->Write(response); 776 IncreaseResponseCount(); 777 // Wait for report. 778 request.Clear(); 779 while (stream->Read(&request)) { 780 gpr_log(GPR_INFO, "LRS[%s]: received client load report message: %s", 781 debug_label_.c_str(), request.DebugString().c_str()); 782 std::vector<ClientStats> stats; 783 for (const auto& cluster_stats : request.cluster_stats()) { 784 stats.emplace_back(cluster_stats); 785 } 786 grpc_core::MutexLock lock(&load_report_mu_); 787 result_queue_.emplace_back(std::move(stats)); 788 if (load_report_cond_ != nullptr) { 789 load_report_cond_->Signal(); 790 } 791 } 792 // Wait until notified done. 793 grpc_core::MutexLock lock(&lrs_mu_); 794 while (!lrs_done_) { 795 lrs_cv_.Wait(&lrs_mu_); 796 } 797 } 798 gpr_log(GPR_INFO, "LRS[%s]: StreamLoadStats done", debug_label_.c_str()); 799 return Status::OK; 800 } 801 802 const int client_load_reporting_interval_seconds_; 803 bool send_all_clusters_ = false; 804 std::set<std::string> cluster_names_; 805 std::function<void()> stream_started_callback_; 806 std::function<void(const LoadStatsRequest& request)> check_first_request_; 807 std::string debug_label_; 808 809 grpc_core::CondVar lrs_cv_; 810 grpc_core::Mutex lrs_mu_; 811 bool lrs_done_ ABSL_GUARDED_BY(lrs_mu_) = false; 812 813 grpc_core::Mutex load_report_mu_; 814 grpc_core::CondVar* load_report_cond_ ABSL_GUARDED_BY(load_report_mu_) = 815 nullptr; 816 std::deque<std::vector<ClientStats>> result_queue_ 817 ABSL_GUARDED_BY(load_report_mu_); 818 }; 819 820 } // namespace testing 821 } // namespace grpc 822 823 #endif // GRPC_TEST_CPP_END2END_XDS_XDS_SERVER_H 824