xref: /aosp_15_r20/external/grpc-grpc/test/core/xds/xds_client_test.cc (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1 //
2 // Copyright 2022 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //     http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16 
17 // TODO(roth): Add the following tests:
18 // - tests for DumpClientConfigBinary()
19 // - tests for load-reporting APIs?  (or maybe move those out of XdsClient?)
20 
21 #include "src/core/ext/xds/xds_client.h"
22 
23 #include <stdint.h>
24 
25 #include <algorithm>
26 #include <deque>
27 #include <fstream>
28 #include <map>
29 #include <memory>
30 #include <optional>
31 #include <string>
32 #include <vector>
33 
34 #include <google/protobuf/any.pb.h>
35 #include <google/protobuf/struct.pb.h>
36 
37 #include "absl/strings/str_cat.h"
38 #include "absl/time/time.h"
39 #include "absl/types/optional.h"
40 #include "absl/types/variant.h"
41 #include "gmock/gmock.h"
42 #include "gtest/gtest.h"
43 #include "upb/reflection/def.h"
44 
45 #include <grpc/grpc.h>
46 #include <grpc/support/json.h>
47 #include <grpc/support/log.h>
48 #include <grpcpp/impl/codegen/config_protobuf.h>
49 
50 #include "src/core/ext/xds/xds_bootstrap.h"
51 #include "src/core/ext/xds/xds_resource_type_impl.h"
52 #include "src/core/lib/event_engine/default_event_engine.h"
53 #include "src/core/lib/gprpp/debug_location.h"
54 #include "src/core/lib/gprpp/match.h"
55 #include "src/core/lib/gprpp/sync.h"
56 #include "src/core/lib/json/json.h"
57 #include "src/core/lib/json/json_args.h"
58 #include "src/core/lib/json/json_object_loader.h"
59 #include "src/core/lib/json/json_reader.h"
60 #include "src/core/lib/json/json_writer.h"
61 #include "src/proto/grpc/testing/xds/v3/base.pb.h"
62 #include "src/proto/grpc/testing/xds/v3/discovery.pb.h"
63 #include "test/core/util/scoped_env_var.h"
64 #include "test/core/util/test_config.h"
65 #include "test/core/xds/xds_client_test_peer.h"
66 #include "test/core/xds/xds_transport_fake.h"
67 
68 // IWYU pragma: no_include <google/protobuf/message.h>
69 // IWYU pragma: no_include <google/protobuf/stubs/status.h>
70 // IWYU pragma: no_include <google/protobuf/unknown_field_set.h>
71 // IWYU pragma: no_include <google/protobuf/util/json_util.h>
72 // IWYU pragma: no_include "google/protobuf/json/json.h"
73 // IWYU pragma: no_include "google/protobuf/util/json_util.h"
74 
75 using envoy::service::discovery::v3::DiscoveryRequest;
76 using envoy::service::discovery::v3::DiscoveryResponse;
77 
78 namespace grpc_core {
79 namespace testing {
80 namespace {
81 
82 constexpr absl::string_view kDefaultXdsServerUrl = "default_xds_server";
83 
84 class XdsClientTest : public ::testing::Test {
85  protected:
86   // A fake bootstrap implementation that allows tests to populate the
87   // fields however they want.
88   class FakeXdsBootstrap : public XdsBootstrap {
89    public:
90     class FakeNode : public Node {
91      public:
FakeNode()92       FakeNode() : id_("xds_client_test") {}
id() const93       const std::string& id() const override { return id_; }
cluster() const94       const std::string& cluster() const override { return cluster_; }
locality_region() const95       const std::string& locality_region() const override {
96         return locality_region_;
97       }
locality_zone() const98       const std::string& locality_zone() const override {
99         return locality_zone_;
100       }
locality_sub_zone() const101       const std::string& locality_sub_zone() const override {
102         return locality_sub_zone_;
103       }
metadata() const104       const Json::Object& metadata() const override { return metadata_; }
105 
set_id(std::string id)106       void set_id(std::string id) { id_ = std::move(id); }
set_cluster(std::string cluster)107       void set_cluster(std::string cluster) { cluster_ = std::move(cluster); }
set_locality_region(std::string locality_region)108       void set_locality_region(std::string locality_region) {
109         locality_region_ = std::move(locality_region);
110       }
set_locality_zone(std::string locality_zone)111       void set_locality_zone(std::string locality_zone) {
112         locality_zone_ = std::move(locality_zone);
113       }
set_locality_sub_zone(std::string locality_sub_zone)114       void set_locality_sub_zone(std::string locality_sub_zone) {
115         locality_sub_zone_ = std::move(locality_sub_zone);
116       }
set_metadata(Json::Object metadata)117       void set_metadata(Json::Object metadata) {
118         metadata_ = std::move(metadata);
119       }
120 
121      private:
122       std::string id_;
123       std::string cluster_;
124       std::string locality_region_;
125       std::string locality_zone_;
126       std::string locality_sub_zone_;
127       Json::Object metadata_;
128     };
129 
130     class FakeXdsServer : public XdsServer {
131      public:
FakeXdsServer(absl::string_view server_uri=kDefaultXdsServerUrl,bool ignore_resource_deletion=false)132       explicit FakeXdsServer(
133           absl::string_view server_uri = kDefaultXdsServerUrl,
134           bool ignore_resource_deletion = false)
135           : server_uri_(server_uri),
136             ignore_resource_deletion_(ignore_resource_deletion) {}
server_uri() const137       const std::string& server_uri() const override { return server_uri_; }
IgnoreResourceDeletion() const138       bool IgnoreResourceDeletion() const override {
139         return ignore_resource_deletion_;
140       }
Equals(const XdsServer & other) const141       bool Equals(const XdsServer& other) const override {
142         const auto& o = static_cast<const FakeXdsServer&>(other);
143         return server_uri_ == o.server_uri_ &&
144                ignore_resource_deletion_ == o.ignore_resource_deletion_;
145       }
Key() const146       std::string Key() const override {
147         return absl::StrCat(server_uri_, "#", ignore_resource_deletion_);
148       }
149 
150      private:
151       std::string server_uri_;
152       bool ignore_resource_deletion_ = false;
153     };
154 
155     class FakeAuthority : public Authority {
156      public:
servers() const157       std::vector<const XdsServer*> servers() const override {
158         if (server_.has_value()) {
159           return {&*server_};
160         } else {
161           return {};
162         };
163       }
164 
set_server(absl::optional<FakeXdsServer> server)165       void set_server(absl::optional<FakeXdsServer> server) {
166         server_ = std::move(server);
167       }
168 
169      private:
170       absl::optional<FakeXdsServer> server_;
171     };
172 
173     class Builder {
174      public:
Builder()175       Builder() { node_.emplace(); }
set_node_id(std::string id)176       Builder& set_node_id(std::string id) {
177         if (!node_.has_value()) node_.emplace();
178         node_->set_id(std::move(id));
179         return *this;
180       }
AddAuthority(std::string name,FakeAuthority authority)181       Builder& AddAuthority(std::string name, FakeAuthority authority) {
182         authorities_[std::move(name)] = std::move(authority);
183         return *this;
184       }
SetServers(absl::Span<const FakeXdsServer> servers)185       Builder& SetServers(absl::Span<const FakeXdsServer> servers) {
186         servers_.assign(servers.begin(), servers.end());
187         return *this;
188       }
Build()189       std::unique_ptr<XdsBootstrap> Build() {
190         auto bootstrap = std::make_unique<FakeXdsBootstrap>();
191         bootstrap->servers_ = std::move(servers_);
192         bootstrap->node_ = std::move(node_);
193         bootstrap->authorities_ = std::move(authorities_);
194         return bootstrap;
195       }
196 
197      private:
198       std::vector<FakeXdsServer> servers_ = {FakeXdsServer()};
199       absl::optional<FakeNode> node_;
200       std::map<std::string, FakeAuthority> authorities_;
201     };
202 
ToString() const203     std::string ToString() const override { return "<fake>"; }
204 
servers() const205     std::vector<const XdsServer*> servers() const override {
206       std::vector<const XdsServer*> result;
207       result.reserve(servers_.size());
208       for (size_t i = 0; i < servers_.size(); ++i) {
209         result.emplace_back(&servers_[i]);
210       }
211       return result;
212     }
213 
node() const214     const Node* node() const override {
215       return node_.has_value() ? &*node_ : nullptr;
216     }
LookupAuthority(const std::string & name) const217     const Authority* LookupAuthority(const std::string& name) const override {
218       auto it = authorities_.find(name);
219       if (it == authorities_.end()) return nullptr;
220       return &it->second;
221     }
222 
223    private:
224     std::vector<FakeXdsServer> servers_;
225     absl::optional<FakeNode> node_;
226     std::map<std::string, FakeAuthority> authorities_;
227   };
228 
229   // A template for a test xDS resource type with an associated watcher impl.
230   // For simplicity, we use JSON instead of proto for serialization.
231   //
232   // The specified ResourceStruct must provide the following:
233   // - a static JsonLoader() method, as described in json_object_loader.h
234   // - an AsJsonString() method that returns the object in JSON string form
235   // - a static TypeUrl() method that returns the resource type
236   //
237   // The all_resources_required_in_sotw parameter indicates the value
238   // that should be returned by the AllResourcesRequiredInSotW() method.
239   template <typename ResourceStruct, bool all_resources_required_in_sotw>
240   class XdsTestResourceType
241       : public XdsResourceTypeImpl<
242             XdsTestResourceType<ResourceStruct, all_resources_required_in_sotw>,
243             ResourceStruct> {
244    public:
245     struct ResourceAndReadDelayHandle {
246       std::shared_ptr<const ResourceStruct> resource;
247       RefCountedPtr<XdsClient::ReadDelayHandle> read_delay_handle;
248     };
249 
250     // A watcher implementation that queues delivered watches.
251     class Watcher : public XdsResourceTypeImpl<
252                         XdsTestResourceType<ResourceStruct,
253                                             all_resources_required_in_sotw>,
254                         ResourceStruct>::WatcherInterface {
255      public:
~Watcher()256       ~Watcher() override {
257         MutexLock lock(&mu_);
258         EXPECT_THAT(queue_, ::testing::IsEmpty())
259             << this << " "
260             << Match(
261                    queue_[0],
262                    [&](const ResourceAndReadDelayHandle& resource) {
263                      return absl::StrFormat("Resource %s",
264                                             resource.resource->name);
265                    },
266                    [&](const absl::Status& status) {
267                      return status.ToString();
268                    },
269                    [&](const DoesNotExist& /* tag */) -> std::string {
270                      return "<Does not exist>";
271                    });
272       }
273 
274       // Returns true if no event is received during the timeout period.
ExpectNoEvent(absl::Duration timeout)275       bool ExpectNoEvent(absl::Duration timeout) {
276         MutexLock lock(&mu_);
277         return !WaitForEventLocked(timeout);
278       }
279 
HasEvent()280       bool HasEvent() {
281         MutexLock lock(&mu_);
282         return !queue_.empty();
283       }
284 
WaitForNextResourceAndHandle(absl::Duration timeout=absl::Seconds (1),SourceLocation location=SourceLocation ())285       absl::optional<ResourceAndReadDelayHandle> WaitForNextResourceAndHandle(
286           absl::Duration timeout = absl::Seconds(1),
287           SourceLocation location = SourceLocation()) {
288         MutexLock lock(&mu_);
289         if (!WaitForEventLocked(timeout)) return absl::nullopt;
290         Event& event = queue_.front();
291         if (!absl::holds_alternative<ResourceAndReadDelayHandle>(event)) {
292           EXPECT_TRUE(false)
293               << "got unexpected event "
294               << (absl::holds_alternative<absl::Status>(event)
295                       ? "error"
296                       : "does-not-exist")
297               << " at " << location.file() << ":" << location.line();
298           return absl::nullopt;
299         }
300         auto foo = std::move(absl::get<ResourceAndReadDelayHandle>(event));
301         queue_.pop_front();
302         return foo;
303       }
304 
WaitForNextResource(absl::Duration timeout=absl::Seconds (1),SourceLocation location=SourceLocation ())305       std::shared_ptr<const ResourceStruct> WaitForNextResource(
306           absl::Duration timeout = absl::Seconds(1),
307           SourceLocation location = SourceLocation()) {
308         auto resource_and_handle =
309             WaitForNextResourceAndHandle(timeout, location);
310         if (!resource_and_handle.has_value()) {
311           return nullptr;
312         }
313         return std::move(resource_and_handle->resource);
314       }
315 
WaitForNextError(absl::Duration timeout=absl::Seconds (1),SourceLocation location=SourceLocation ())316       absl::optional<absl::Status> WaitForNextError(
317           absl::Duration timeout = absl::Seconds(1),
318           SourceLocation location = SourceLocation()) {
319         MutexLock lock(&mu_);
320         if (!WaitForEventLocked(timeout)) return absl::nullopt;
321         Event& event = queue_.front();
322         if (!absl::holds_alternative<absl::Status>(event)) {
323           EXPECT_TRUE(false)
324               << "got unexpected event "
325               << (absl::holds_alternative<ResourceAndReadDelayHandle>(event)
326                       ? "resource"
327                       : "does-not-exist")
328               << " at " << location.file() << ":" << location.line();
329           return absl::nullopt;
330         }
331         absl::Status error = std::move(absl::get<absl::Status>(event));
332         queue_.pop_front();
333         return std::move(error);
334       }
335 
WaitForDoesNotExist(absl::Duration timeout,SourceLocation location=SourceLocation ())336       bool WaitForDoesNotExist(absl::Duration timeout,
337                                SourceLocation location = SourceLocation()) {
338         MutexLock lock(&mu_);
339         if (!WaitForEventLocked(timeout)) return false;
340         Event& event = queue_.front();
341         if (!absl::holds_alternative<DoesNotExist>(event)) {
342           EXPECT_TRUE(false)
343               << "got unexpected event "
344               << (absl::holds_alternative<absl::Status>(event) ? "error"
345                                                                : "resource")
346               << " at " << location.file() << ":" << location.line();
347           return false;
348         }
349         queue_.pop_front();
350         return true;
351       }
352 
353      private:
354       struct DoesNotExist {};
355       using Event =
356           absl::variant<ResourceAndReadDelayHandle, absl::Status, DoesNotExist>;
357 
OnResourceChanged(std::shared_ptr<const ResourceStruct> foo,RefCountedPtr<XdsClient::ReadDelayHandle> read_delay_handle)358       void OnResourceChanged(std::shared_ptr<const ResourceStruct> foo,
359                              RefCountedPtr<XdsClient::ReadDelayHandle>
360                                  read_delay_handle) override {
361         MutexLock lock(&mu_);
362         ResourceAndReadDelayHandle event_details = {
363             std::move(foo), std::move(read_delay_handle)};
364         queue_.emplace_back(std::move(event_details));
365         cv_.Signal();
366       }
367 
OnError(absl::Status status,RefCountedPtr<XdsClient::ReadDelayHandle>)368       void OnError(
369           absl::Status status,
370           RefCountedPtr<XdsClient::ReadDelayHandle> /* read_delay_handle */)
371           override {
372         MutexLock lock(&mu_);
373         queue_.push_back(std::move(status));
374         cv_.Signal();
375       }
376 
OnResourceDoesNotExist(RefCountedPtr<XdsClient::ReadDelayHandle>)377       void OnResourceDoesNotExist(
378           RefCountedPtr<XdsClient::ReadDelayHandle> /* read_delay_handle */)
379           override {
380         MutexLock lock(&mu_);
381         queue_.push_back(DoesNotExist());
382         cv_.Signal();
383       }
384 
385       // Returns true if an event was received, or false if the timeout
386       // expires before any event is received.
WaitForEventLocked(absl::Duration timeout)387       bool WaitForEventLocked(absl::Duration timeout)
388           ABSL_EXCLUSIVE_LOCKS_REQUIRED(&mu_) {
389         while (queue_.empty()) {
390           if (cv_.WaitWithTimeout(&mu_,
391                                   timeout * grpc_test_slowdown_factor())) {
392             return false;
393           }
394         }
395         return true;
396       }
397 
398       Mutex mu_;
399       CondVar cv_;
400       std::deque<Event> queue_ ABSL_GUARDED_BY(&mu_);
401     };
402 
type_url() const403     absl::string_view type_url() const override {
404       return ResourceStruct::TypeUrl();
405     }
Decode(const XdsResourceType::DecodeContext &,absl::string_view serialized_resource) const406     XdsResourceType::DecodeResult Decode(
407         const XdsResourceType::DecodeContext& /*context*/,
408         absl::string_view serialized_resource) const override {
409       auto json = JsonParse(serialized_resource);
410       XdsResourceType::DecodeResult result;
411       if (!json.ok()) {
412         result.resource = json.status();
413       } else {
414         absl::StatusOr<ResourceStruct> foo =
415             LoadFromJson<ResourceStruct>(*json);
416         if (!foo.ok()) {
417           auto it = json->object().find("name");
418           if (it != json->object().end()) {
419             result.name = it->second.string();
420           }
421           result.resource = foo.status();
422         } else {
423           result.name = foo->name;
424           result.resource = std::make_unique<ResourceStruct>(std::move(*foo));
425         }
426       }
427       return result;
428     }
AllResourcesRequiredInSotW() const429     bool AllResourcesRequiredInSotW() const override {
430       return all_resources_required_in_sotw;
431     }
InitUpbSymtab(XdsClient *,upb_DefPool *) const432     void InitUpbSymtab(XdsClient*, upb_DefPool* /*symtab*/) const override {}
433 
EncodeAsAny(const ResourceStruct & resource)434     static google::protobuf::Any EncodeAsAny(const ResourceStruct& resource) {
435       google::protobuf::Any any;
436       any.set_type_url(
437           absl::StrCat("type.googleapis.com/", ResourceStruct::TypeUrl()));
438       any.set_value(resource.AsJsonString());
439       return any;
440     }
441   };
442 
443   // A fake "Foo" xDS resource type.
444   struct XdsFooResource : public XdsResourceType::ResourceData {
445     std::string name;
446     uint32_t value;
447 
448     XdsFooResource() = default;
XdsFooResourcegrpc_core::testing::__anon1b480ec00111::XdsClientTest::XdsFooResource449     XdsFooResource(std::string name, uint32_t value)
450         : name(std::move(name)), value(value) {}
451 
operator ==grpc_core::testing::__anon1b480ec00111::XdsClientTest::XdsFooResource452     bool operator==(const XdsFooResource& other) const {
453       return name == other.name && value == other.value;
454     }
455 
AsJsonStringgrpc_core::testing::__anon1b480ec00111::XdsClientTest::XdsFooResource456     std::string AsJsonString() const {
457       return absl::StrCat("{\"name\":\"", name, "\",\"value\":", value, "}");
458     }
459 
JsonLoadergrpc_core::testing::__anon1b480ec00111::XdsClientTest::XdsFooResource460     static const JsonLoaderInterface* JsonLoader(const JsonArgs&) {
461       static const auto* loader = JsonObjectLoader<XdsFooResource>()
462                                       .Field("name", &XdsFooResource::name)
463                                       .Field("value", &XdsFooResource::value)
464                                       .Finish();
465       return loader;
466     }
467 
TypeUrlgrpc_core::testing::__anon1b480ec00111::XdsClientTest::XdsFooResource468     static absl::string_view TypeUrl() { return "test.v3.foo"; }
469   };
470   using XdsFooResourceType = XdsTestResourceType<XdsFooResource, false>;
471 
472   // A fake "Bar" xDS resource type.
473   struct XdsBarResource : public XdsResourceType::ResourceData {
474     std::string name;
475     std::string value;
476 
477     XdsBarResource() = default;
XdsBarResourcegrpc_core::testing::__anon1b480ec00111::XdsClientTest::XdsBarResource478     XdsBarResource(std::string name, std::string value)
479         : name(std::move(name)), value(std::move(value)) {}
480 
operator ==grpc_core::testing::__anon1b480ec00111::XdsClientTest::XdsBarResource481     bool operator==(const XdsBarResource& other) const {
482       return name == other.name && value == other.value;
483     }
484 
AsJsonStringgrpc_core::testing::__anon1b480ec00111::XdsClientTest::XdsBarResource485     std::string AsJsonString() const {
486       return absl::StrCat("{\"name\":\"", name, "\",\"value\":\"", value,
487                           "\"}");
488     }
489 
JsonLoadergrpc_core::testing::__anon1b480ec00111::XdsClientTest::XdsBarResource490     static const JsonLoaderInterface* JsonLoader(const JsonArgs&) {
491       static const auto* loader = JsonObjectLoader<XdsBarResource>()
492                                       .Field("name", &XdsBarResource::name)
493                                       .Field("value", &XdsBarResource::value)
494                                       .Finish();
495       return loader;
496     }
497 
TypeUrlgrpc_core::testing::__anon1b480ec00111::XdsClientTest::XdsBarResource498     static absl::string_view TypeUrl() { return "test.v3.bar"; }
499   };
500   using XdsBarResourceType = XdsTestResourceType<XdsBarResource, false>;
501 
502   // A fake "WildcardCapable" xDS resource type.
503   // This resource type returns true for AllResourcesRequiredInSotW(),
504   // just like LDS and CDS.
505   struct XdsWildcardCapableResource : public XdsResourceType::ResourceData {
506     std::string name;
507     uint32_t value;
508 
509     XdsWildcardCapableResource() = default;
XdsWildcardCapableResourcegrpc_core::testing::__anon1b480ec00111::XdsClientTest::XdsWildcardCapableResource510     XdsWildcardCapableResource(std::string name, uint32_t value)
511         : name(std::move(name)), value(value) {}
512 
operator ==grpc_core::testing::__anon1b480ec00111::XdsClientTest::XdsWildcardCapableResource513     bool operator==(const XdsWildcardCapableResource& other) const {
514       return name == other.name && value == other.value;
515     }
516 
AsJsonStringgrpc_core::testing::__anon1b480ec00111::XdsClientTest::XdsWildcardCapableResource517     std::string AsJsonString() const {
518       return absl::StrCat("{\"name\":\"", name, "\",\"value\":\"", value,
519                           "\"}");
520     }
521 
JsonLoadergrpc_core::testing::__anon1b480ec00111::XdsClientTest::XdsWildcardCapableResource522     static const JsonLoaderInterface* JsonLoader(const JsonArgs&) {
523       static const auto* loader =
524           JsonObjectLoader<XdsWildcardCapableResource>()
525               .Field("name", &XdsWildcardCapableResource::name)
526               .Field("value", &XdsWildcardCapableResource::value)
527               .Finish();
528       return loader;
529     }
530 
TypeUrlgrpc_core::testing::__anon1b480ec00111::XdsClientTest::XdsWildcardCapableResource531     static absl::string_view TypeUrl() { return "test.v3.wildcard_capable"; }
532   };
533   using XdsWildcardCapableResourceType =
534       XdsTestResourceType<XdsWildcardCapableResource,
535                           /*all_resources_required_in_sotw=*/true>;
536 
537   // A helper class to build and serialize a DiscoveryResponse.
538   class ResponseBuilder {
539    public:
ResponseBuilder(absl::string_view type_url)540     explicit ResponseBuilder(absl::string_view type_url) {
541       response_.set_type_url(absl::StrCat("type.googleapis.com/", type_url));
542     }
543 
set_version_info(absl::string_view version_info)544     ResponseBuilder& set_version_info(absl::string_view version_info) {
545       response_.set_version_info(std::string(version_info));
546       return *this;
547     }
set_nonce(absl::string_view nonce)548     ResponseBuilder& set_nonce(absl::string_view nonce) {
549       response_.set_nonce(std::string(nonce));
550       return *this;
551     }
552 
553     template <typename ResourceType>
AddResource(const typename ResourceType::ResourceType & resource,bool in_resource_wrapper=false)554     ResponseBuilder& AddResource(
555         const typename ResourceType::ResourceType& resource,
556         bool in_resource_wrapper = false) {
557       auto* res = response_.add_resources();
558       *res = ResourceType::EncodeAsAny(resource);
559       if (in_resource_wrapper) {
560         envoy::service::discovery::v3::Resource resource_wrapper;
561         resource_wrapper.set_name(resource.name);
562         *resource_wrapper.mutable_resource() = std::move(*res);
563         res->PackFrom(resource_wrapper);
564       }
565       return *this;
566     }
567 
AddFooResource(const XdsFooResource & resource,bool in_resource_wrapper=false)568     ResponseBuilder& AddFooResource(const XdsFooResource& resource,
569                                     bool in_resource_wrapper = false) {
570       return AddResource<XdsFooResourceType>(resource, in_resource_wrapper);
571     }
572 
AddBarResource(const XdsBarResource & resource,bool in_resource_wrapper=false)573     ResponseBuilder& AddBarResource(const XdsBarResource& resource,
574                                     bool in_resource_wrapper = false) {
575       return AddResource<XdsBarResourceType>(resource, in_resource_wrapper);
576     }
577 
AddWildcardCapableResource(const XdsWildcardCapableResource & resource,bool in_resource_wrapper=false)578     ResponseBuilder& AddWildcardCapableResource(
579         const XdsWildcardCapableResource& resource,
580         bool in_resource_wrapper = false) {
581       return AddResource<XdsWildcardCapableResourceType>(resource,
582                                                          in_resource_wrapper);
583     }
584 
AddInvalidResource(absl::string_view type_url,absl::string_view value,absl::string_view resource_wrapper_name="")585     ResponseBuilder& AddInvalidResource(
586         absl::string_view type_url, absl::string_view value,
587         absl::string_view resource_wrapper_name = "") {
588       auto* res = response_.add_resources();
589       res->set_type_url(absl::StrCat("type.googleapis.com/", type_url));
590       res->set_value(std::string(value));
591       if (!resource_wrapper_name.empty()) {
592         envoy::service::discovery::v3::Resource resource_wrapper;
593         resource_wrapper.set_name(std::string(resource_wrapper_name));
594         *resource_wrapper.mutable_resource() = std::move(*res);
595         res->PackFrom(resource_wrapper);
596       }
597       return *this;
598     }
599 
AddInvalidResourceWrapper()600     ResponseBuilder& AddInvalidResourceWrapper() {
601       auto* res = response_.add_resources();
602       res->set_type_url(
603           "type.googleapis.com/envoy.service.discovery.v3.Resource");
604       res->set_value(std::string("\0", 1));
605       return *this;
606     }
607 
AddEmptyResource()608     ResponseBuilder& AddEmptyResource() {
609       response_.add_resources();
610       return *this;
611     }
612 
Serialize()613     std::string Serialize() {
614       std::string serialized_response;
615       EXPECT_TRUE(response_.SerializeToString(&serialized_response));
616       return serialized_response;
617     }
618 
619    private:
620     DiscoveryResponse response_;
621   };
622 
623   class MetricsReporter : public XdsMetricsReporter {
624    public:
625     using ResourceUpdateMap = std::map<
626         std::pair<std::string /*xds_server*/, std::string /*resource_type*/>,
627         uint64_t>;
628 
resource_updates_valid() const629     const ResourceUpdateMap& resource_updates_valid() const {
630       return resource_updates_valid_;
631     }
resource_updates_invalid() const632     const ResourceUpdateMap& resource_updates_invalid() const {
633       return resource_updates_invalid_;
634     }
635 
636    private:
ReportResourceUpdates(absl::string_view xds_server,absl::string_view resource_type,uint64_t num_resources_valid,uint64_t num_resources_invalid)637     void ReportResourceUpdates(absl::string_view xds_server,
638                                absl::string_view resource_type,
639                                uint64_t num_resources_valid,
640                                uint64_t num_resources_invalid) override {
641       auto key =
642           std::make_pair(std::string(xds_server), std::string(resource_type));
643       if (num_resources_valid > 0) {
644         resource_updates_valid_[key] += num_resources_valid;
645       }
646       if (num_resources_invalid > 0) {
647         resource_updates_invalid_[key] += num_resources_invalid;
648       }
649     }
650 
651     ResourceUpdateMap resource_updates_valid_;
652     ResourceUpdateMap resource_updates_invalid_;
653   };
654 
655   using ResourceCounts =
656       std::vector<std::pair<XdsClientTestPeer::ResourceCountLabels, uint64_t>>;
GetResourceCounts()657   ResourceCounts GetResourceCounts() {
658     ResourceCounts resource_counts;
659     XdsClientTestPeer(xds_client_.get())
660         .TestReportResourceCounts(
661             [&](const XdsClientTestPeer::ResourceCountLabels& labels,
662                 uint64_t count) {
663               resource_counts.emplace_back(labels, count);
664             });
665     return resource_counts;
666   }
667 
668   using ServerConnectionMap = std::map<std::string, bool>;
GetServerConnections()669   ServerConnectionMap GetServerConnections() {
670     ServerConnectionMap server_connection_map;
671     XdsClientTestPeer(xds_client_.get())
672         .TestReportServerConnections(
673             [&](absl::string_view xds_server, bool connected) {
674               std::string server(xds_server);
675               EXPECT_EQ(server_connection_map.find(server),
676                         server_connection_map.end());
677               server_connection_map[std::move(server)] = connected;
678             });
679     return server_connection_map;
680   }
681 
682   // Sets transport_factory_ and initializes xds_client_ with the
683   // specified bootstrap config.
InitXdsClient(FakeXdsBootstrap::Builder bootstrap_builder=FakeXdsBootstrap::Builder (),Duration resource_request_timeout=Duration::Seconds (15))684   void InitXdsClient(
685       FakeXdsBootstrap::Builder bootstrap_builder = FakeXdsBootstrap::Builder(),
686       Duration resource_request_timeout = Duration::Seconds(15)) {
687     auto transport_factory = MakeOrphanable<FakeXdsTransportFactory>(
688         []() { FAIL() << "Multiple concurrent reads"; });
689     transport_factory_ =
690         transport_factory->Ref().TakeAsSubclass<FakeXdsTransportFactory>();
691     auto metrics_reporter = std::make_unique<MetricsReporter>();
692     metrics_reporter_ = metrics_reporter.get();
693     xds_client_ = MakeRefCounted<XdsClient>(
694         bootstrap_builder.Build(), std::move(transport_factory),
695         grpc_event_engine::experimental::GetDefaultEventEngine(),
696         std::move(metrics_reporter), "foo agent", "foo version",
697         resource_request_timeout * grpc_test_slowdown_factor());
698   }
699 
700   // Starts and cancels a watch for a Foo resource.
StartFooWatch(absl::string_view resource_name)701   RefCountedPtr<XdsFooResourceType::Watcher> StartFooWatch(
702       absl::string_view resource_name) {
703     auto watcher = MakeRefCounted<XdsFooResourceType::Watcher>();
704     XdsFooResourceType::StartWatch(xds_client_.get(), resource_name, watcher);
705     return watcher;
706   }
CancelFooWatch(XdsFooResourceType::Watcher * watcher,absl::string_view resource_name,bool delay_unsubscription=false)707   void CancelFooWatch(XdsFooResourceType::Watcher* watcher,
708                       absl::string_view resource_name,
709                       bool delay_unsubscription = false) {
710     XdsFooResourceType::CancelWatch(xds_client_.get(), resource_name, watcher,
711                                     delay_unsubscription);
712   }
713 
714   // Starts and cancels a watch for a Bar resource.
StartBarWatch(absl::string_view resource_name)715   RefCountedPtr<XdsBarResourceType::Watcher> StartBarWatch(
716       absl::string_view resource_name) {
717     auto watcher = MakeRefCounted<XdsBarResourceType::Watcher>();
718     XdsBarResourceType::StartWatch(xds_client_.get(), resource_name, watcher);
719     return watcher;
720   }
CancelBarWatch(XdsBarResourceType::Watcher * watcher,absl::string_view resource_name,bool delay_unsubscription=false)721   void CancelBarWatch(XdsBarResourceType::Watcher* watcher,
722                       absl::string_view resource_name,
723                       bool delay_unsubscription = false) {
724     XdsBarResourceType::CancelWatch(xds_client_.get(), resource_name, watcher,
725                                     delay_unsubscription);
726   }
727 
728   // Starts and cancels a watch for a WildcardCapable resource.
729   RefCountedPtr<XdsWildcardCapableResourceType::Watcher>
StartWildcardCapableWatch(absl::string_view resource_name)730   StartWildcardCapableWatch(absl::string_view resource_name) {
731     auto watcher = MakeRefCounted<XdsWildcardCapableResourceType::Watcher>();
732     XdsWildcardCapableResourceType::StartWatch(xds_client_.get(), resource_name,
733                                                watcher);
734     return watcher;
735   }
CancelWildcardCapableWatch(XdsWildcardCapableResourceType::Watcher * watcher,absl::string_view resource_name,bool delay_unsubscription=false)736   void CancelWildcardCapableWatch(
737       XdsWildcardCapableResourceType::Watcher* watcher,
738       absl::string_view resource_name, bool delay_unsubscription = false) {
739     XdsWildcardCapableResourceType::CancelWatch(
740         xds_client_.get(), resource_name, watcher, delay_unsubscription);
741   }
742 
WaitForAdsStream(const XdsBootstrap::XdsServer & xds_server,absl::Duration timeout=absl::Seconds (5))743   RefCountedPtr<FakeXdsTransportFactory::FakeStreamingCall> WaitForAdsStream(
744       const XdsBootstrap::XdsServer& xds_server,
745       absl::Duration timeout = absl::Seconds(5)) {
746     return transport_factory_->WaitForStream(
747         xds_server, FakeXdsTransportFactory::kAdsMethod,
748         timeout * grpc_test_slowdown_factor());
749   }
750 
TriggerConnectionFailure(const XdsBootstrap::XdsServer & xds_server,absl::Status status)751   void TriggerConnectionFailure(const XdsBootstrap::XdsServer& xds_server,
752                                 absl::Status status) {
753     transport_factory_->TriggerConnectionFailure(xds_server, std::move(status));
754   }
755 
WaitForAdsStream(absl::Duration timeout=absl::Seconds (5))756   RefCountedPtr<FakeXdsTransportFactory::FakeStreamingCall> WaitForAdsStream(
757       absl::Duration timeout = absl::Seconds(5)) {
758     return WaitForAdsStream(*xds_client_->bootstrap().servers().front(),
759                             timeout);
760   }
761 
762   // Gets the latest request sent to the fake xDS server.
WaitForRequest(FakeXdsTransportFactory::FakeStreamingCall * stream,absl::Duration timeout=absl::Seconds (3),SourceLocation location=SourceLocation ())763   absl::optional<DiscoveryRequest> WaitForRequest(
764       FakeXdsTransportFactory::FakeStreamingCall* stream,
765       absl::Duration timeout = absl::Seconds(3),
766       SourceLocation location = SourceLocation()) {
767     auto message =
768         stream->WaitForMessageFromClient(timeout * grpc_test_slowdown_factor());
769     if (!message.has_value()) return absl::nullopt;
770     DiscoveryRequest request;
771     bool success = request.ParseFromString(*message);
772     EXPECT_TRUE(success) << "Failed to deserialize DiscoveryRequest at "
773                          << location.file() << ":" << location.line();
774     if (!success) return absl::nullopt;
775     return std::move(request);
776   }
777 
778   // Helper function to check the fields of a DiscoveryRequest.
CheckRequest(const DiscoveryRequest & request,absl::string_view type_url,absl::string_view version_info,absl::string_view response_nonce,const absl::Status & error_detail,const std::set<absl::string_view> & resource_names,SourceLocation location=SourceLocation ())779   void CheckRequest(const DiscoveryRequest& request, absl::string_view type_url,
780                     absl::string_view version_info,
781                     absl::string_view response_nonce,
782                     const absl::Status& error_detail,
783                     const std::set<absl::string_view>& resource_names,
784                     SourceLocation location = SourceLocation()) {
785     EXPECT_EQ(request.type_url(),
786               absl::StrCat("type.googleapis.com/", type_url))
787         << location.file() << ":" << location.line();
788     EXPECT_EQ(request.version_info(), version_info)
789         << location.file() << ":" << location.line();
790     EXPECT_EQ(request.response_nonce(), response_nonce)
791         << location.file() << ":" << location.line();
792     if (error_detail.ok()) {
793       EXPECT_FALSE(request.has_error_detail())
794           << location.file() << ":" << location.line();
795     } else {
796       EXPECT_EQ(request.error_detail().code(),
797                 static_cast<int>(error_detail.code()))
798           << location.file() << ":" << location.line();
799       EXPECT_EQ(request.error_detail().message(), error_detail.message())
800           << location.file() << ":" << location.line();
801     }
802     EXPECT_THAT(request.resource_names(),
803                 ::testing::UnorderedElementsAreArray(resource_names))
804         << location.file() << ":" << location.line();
805   }
806 
807   // Helper function to check the contents of the node message in a
808   // request against the client's node info.
CheckRequestNode(const DiscoveryRequest & request,SourceLocation location=SourceLocation ())809   void CheckRequestNode(const DiscoveryRequest& request,
810                         SourceLocation location = SourceLocation()) {
811     // These fields come from the bootstrap config.
812     EXPECT_EQ(request.node().id(), xds_client_->bootstrap().node()->id())
813         << location.file() << ":" << location.line();
814     EXPECT_EQ(request.node().cluster(),
815               xds_client_->bootstrap().node()->cluster())
816         << location.file() << ":" << location.line();
817     EXPECT_EQ(request.node().locality().region(),
818               xds_client_->bootstrap().node()->locality_region())
819         << location.file() << ":" << location.line();
820     EXPECT_EQ(request.node().locality().zone(),
821               xds_client_->bootstrap().node()->locality_zone())
822         << location.file() << ":" << location.line();
823     EXPECT_EQ(request.node().locality().sub_zone(),
824               xds_client_->bootstrap().node()->locality_sub_zone())
825         << location.file() << ":" << location.line();
826     if (xds_client_->bootstrap().node()->metadata().empty()) {
827       EXPECT_FALSE(request.node().has_metadata())
828           << location.file() << ":" << location.line();
829     } else {
830       std::string metadata_json_str;
831       auto status =
832           MessageToJsonString(request.node().metadata(), &metadata_json_str,
833                               GRPC_CUSTOM_JSONUTIL::JsonPrintOptions());
834       ASSERT_TRUE(status.ok())
835           << status << " on " << location.file() << ":" << location.line();
836       auto metadata_json = JsonParse(metadata_json_str);
837       ASSERT_TRUE(metadata_json.ok())
838           << metadata_json.status() << " on " << location.file() << ":"
839           << location.line();
840       Json expected =
841           Json::FromObject(xds_client_->bootstrap().node()->metadata());
842       EXPECT_EQ(*metadata_json, expected)
843           << location.file() << ":" << location.line()
844           << ":\nexpected: " << JsonDump(expected)
845           << "\nactual: " << JsonDump(*metadata_json);
846     }
847     EXPECT_EQ(request.node().user_agent_name(), "foo agent")
848         << location.file() << ":" << location.line();
849     EXPECT_EQ(request.node().user_agent_version(), "foo version")
850         << location.file() << ":" << location.line();
851   }
852 
853   RefCountedPtr<FakeXdsTransportFactory> transport_factory_;
854   RefCountedPtr<XdsClient> xds_client_;
855   MetricsReporter* metrics_reporter_ = nullptr;
856 };
857 
858 MATCHER_P3(ResourceCountLabelsEq, xds_authority, resource_type, cache_state,
859            "equals ResourceCountLabels") {
860   bool ok = true;
861   ok &= ::testing::ExplainMatchResult(xds_authority, arg.xds_authority,
862                                       result_listener);
863   ok &= ::testing::ExplainMatchResult(resource_type, arg.resource_type,
864                                       result_listener);
865   ok &= ::testing::ExplainMatchResult(cache_state, arg.cache_state,
866                                       result_listener);
867   return ok;
868 }
869 
TEST_F(XdsClientTest,BasicWatch)870 TEST_F(XdsClientTest, BasicWatch) {
871   InitXdsClient();
872   // Metrics should initially be empty.
873   EXPECT_THAT(metrics_reporter_->resource_updates_valid(),
874               ::testing::ElementsAre());
875   EXPECT_THAT(metrics_reporter_->resource_updates_invalid(),
876               ::testing::ElementsAre());
877   EXPECT_THAT(GetResourceCounts(), ::testing::ElementsAre());
878   EXPECT_THAT(GetServerConnections(), ::testing::ElementsAre());
879   // Start a watch for "foo1".
880   auto watcher = StartFooWatch("foo1");
881   // Check metrics.
882   EXPECT_THAT(metrics_reporter_->resource_updates_valid(),
883               ::testing::ElementsAre());
884   EXPECT_THAT(metrics_reporter_->resource_updates_invalid(),
885               ::testing::ElementsAre());
886   EXPECT_THAT(GetServerConnections(), ::testing::ElementsAre(::testing::Pair(
887                                           kDefaultXdsServerUrl, true)));
888   EXPECT_THAT(GetResourceCounts(),
889               ::testing::ElementsAre(::testing::Pair(
890                   ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
891                                         XdsFooResourceType::Get()->type_url(),
892                                         "requested"),
893                   1)));
894   // Watcher should initially not see any resource reported.
895   EXPECT_FALSE(watcher->HasEvent());
896   // XdsClient should have created an ADS stream.
897   auto stream = WaitForAdsStream();
898   ASSERT_TRUE(stream != nullptr);
899   // XdsClient should have sent a subscription request on the ADS stream.
900   auto request = WaitForRequest(stream.get());
901   ASSERT_TRUE(request.has_value());
902   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
903                /*version_info=*/"", /*response_nonce=*/"",
904                /*error_detail=*/absl::OkStatus(),
905                /*resource_names=*/{"foo1"});
906   CheckRequestNode(*request);  // Should be present on the first request.
907   // Send a response.
908   stream->SendMessageToClient(
909       ResponseBuilder(XdsFooResourceType::Get()->type_url())
910           .set_version_info("1")
911           .set_nonce("A")
912           .AddFooResource(XdsFooResource("foo1", 6))
913           .Serialize());
914   // XdsClient should have delivered the response to the watcher.
915   auto resource = watcher->WaitForNextResource();
916   ASSERT_NE(resource, nullptr);
917   EXPECT_EQ(resource->name, "foo1");
918   EXPECT_EQ(resource->value, 6);
919   // Check metric data.
920   EXPECT_THAT(metrics_reporter_->resource_updates_valid(),
921               ::testing::ElementsAre(::testing::Pair(
922                   ::testing::Pair(kDefaultXdsServerUrl,
923                                   XdsFooResourceType::Get()->type_url()),
924                   1)));
925   EXPECT_THAT(metrics_reporter_->resource_updates_invalid(),
926               ::testing::ElementsAre());
927   EXPECT_THAT(
928       GetResourceCounts(),
929       ::testing::ElementsAre(::testing::Pair(
930           ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
931                                 XdsFooResourceType::Get()->type_url(), "acked"),
932           1)));
933   EXPECT_THAT(GetServerConnections(), ::testing::ElementsAre(::testing::Pair(
934                                           kDefaultXdsServerUrl, true)));
935   // XdsClient should have sent an ACK message to the xDS server.
936   request = WaitForRequest(stream.get());
937   ASSERT_TRUE(request.has_value());
938   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
939                /*version_info=*/"1", /*response_nonce=*/"A",
940                /*error_detail=*/absl::OkStatus(),
941                /*resource_names=*/{"foo1"});
942   // Cancel watch.
943   CancelFooWatch(watcher.get(), "foo1");
944   EXPECT_TRUE(stream->Orphaned());
945   // Check metric data.
946   EXPECT_THAT(metrics_reporter_->resource_updates_valid(),
947               ::testing::ElementsAre(::testing::Pair(
948                   ::testing::Pair(kDefaultXdsServerUrl,
949                                   XdsFooResourceType::Get()->type_url()),
950                   1)));
951   EXPECT_THAT(metrics_reporter_->resource_updates_invalid(),
952               ::testing::ElementsAre());
953   EXPECT_THAT(GetResourceCounts(), ::testing::ElementsAre());
954   EXPECT_THAT(GetServerConnections(), ::testing::ElementsAre());
955 }
956 
TEST_F(XdsClientTest,UpdateFromServer)957 TEST_F(XdsClientTest, UpdateFromServer) {
958   InitXdsClient();
959   // Start a watch for "foo1".
960   auto watcher = StartFooWatch("foo1");
961   // Watcher should initially not see any resource reported.
962   EXPECT_FALSE(watcher->HasEvent());
963   // XdsClient should have created an ADS stream.
964   auto stream = WaitForAdsStream();
965   ASSERT_TRUE(stream != nullptr);
966   // XdsClient should have sent a subscription request on the ADS stream.
967   auto request = WaitForRequest(stream.get());
968   ASSERT_TRUE(request.has_value());
969   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
970                /*version_info=*/"", /*response_nonce=*/"",
971                /*error_detail=*/absl::OkStatus(),
972                /*resource_names=*/{"foo1"});
973   CheckRequestNode(*request);  // Should be present on the first request.
974   // Send a response.
975   stream->SendMessageToClient(
976       ResponseBuilder(XdsFooResourceType::Get()->type_url())
977           .set_version_info("1")
978           .set_nonce("A")
979           .AddFooResource(XdsFooResource("foo1", 6))
980           .Serialize());
981   // XdsClient should have delivered the response to the watcher.
982   auto resource = watcher->WaitForNextResource();
983   ASSERT_NE(resource, nullptr);
984   EXPECT_EQ(resource->name, "foo1");
985   EXPECT_EQ(resource->value, 6);
986   // Check metric data.
987   EXPECT_THAT(metrics_reporter_->resource_updates_valid(),
988               ::testing::ElementsAre(::testing::Pair(
989                   ::testing::Pair(kDefaultXdsServerUrl,
990                                   XdsFooResourceType::Get()->type_url()),
991                   1)));
992   EXPECT_THAT(metrics_reporter_->resource_updates_invalid(),
993               ::testing::ElementsAre());
994   EXPECT_THAT(
995       GetResourceCounts(),
996       ::testing::ElementsAre(::testing::Pair(
997           ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
998                                 XdsFooResourceType::Get()->type_url(), "acked"),
999           1)));
1000   // XdsClient should have sent an ACK message to the xDS server.
1001   request = WaitForRequest(stream.get());
1002   ASSERT_TRUE(request.has_value());
1003   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
1004                /*version_info=*/"1", /*response_nonce=*/"A",
1005                /*error_detail=*/absl::OkStatus(),
1006                /*resource_names=*/{"foo1"});
1007   // Server sends an updated version of the resource.
1008   stream->SendMessageToClient(
1009       ResponseBuilder(XdsFooResourceType::Get()->type_url())
1010           .set_version_info("2")
1011           .set_nonce("B")
1012           .AddFooResource(XdsFooResource("foo1", 9))
1013           .Serialize());
1014   // XdsClient should have delivered the response to the watcher.
1015   resource = watcher->WaitForNextResource();
1016   ASSERT_NE(resource, nullptr);
1017   EXPECT_EQ(resource->name, "foo1");
1018   EXPECT_EQ(resource->value, 9);
1019   // Check metric data.
1020   EXPECT_THAT(metrics_reporter_->resource_updates_valid(),
1021               ::testing::ElementsAre(::testing::Pair(
1022                   ::testing::Pair(kDefaultXdsServerUrl,
1023                                   XdsFooResourceType::Get()->type_url()),
1024                   2)));
1025   EXPECT_THAT(metrics_reporter_->resource_updates_invalid(),
1026               ::testing::ElementsAre());
1027   EXPECT_THAT(
1028       GetResourceCounts(),
1029       ::testing::ElementsAre(::testing::Pair(
1030           ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
1031                                 XdsFooResourceType::Get()->type_url(), "acked"),
1032           1)));
1033   // XdsClient should have sent an ACK message to the xDS server.
1034   request = WaitForRequest(stream.get());
1035   ASSERT_TRUE(request.has_value());
1036   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
1037                /*version_info=*/"2", /*response_nonce=*/"B",
1038                /*error_detail=*/absl::OkStatus(),
1039                /*resource_names=*/{"foo1"});
1040   // Cancel watch.
1041   CancelFooWatch(watcher.get(), "foo1");
1042   EXPECT_TRUE(stream->Orphaned());
1043 }
1044 
TEST_F(XdsClientTest,MultipleWatchersForSameResource)1045 TEST_F(XdsClientTest, MultipleWatchersForSameResource) {
1046   InitXdsClient();
1047   // Start a watch for "foo1".
1048   auto watcher = StartFooWatch("foo1");
1049   // Watcher should initially not see any resource reported.
1050   EXPECT_FALSE(watcher->HasEvent());
1051   // XdsClient should have created an ADS stream.
1052   auto stream = WaitForAdsStream();
1053   ASSERT_TRUE(stream != nullptr);
1054   // XdsClient should have sent a subscription request on the ADS stream.
1055   auto request = WaitForRequest(stream.get());
1056   ASSERT_TRUE(request.has_value());
1057   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
1058                /*version_info=*/"", /*response_nonce=*/"",
1059                /*error_detail=*/absl::OkStatus(),
1060                /*resource_names=*/{"foo1"});
1061   CheckRequestNode(*request);  // Should be present on the first request.
1062   // Send a response.
1063   stream->SendMessageToClient(
1064       ResponseBuilder(XdsFooResourceType::Get()->type_url())
1065           .set_version_info("1")
1066           .set_nonce("A")
1067           .AddFooResource(XdsFooResource("foo1", 6))
1068           .Serialize());
1069   // XdsClient should have delivered the response to the watcher.
1070   auto resource = watcher->WaitForNextResource();
1071   ASSERT_NE(resource, nullptr);
1072   EXPECT_EQ(resource->name, "foo1");
1073   EXPECT_EQ(resource->value, 6);
1074   // Check metric data.
1075   EXPECT_THAT(metrics_reporter_->resource_updates_valid(),
1076               ::testing::ElementsAre(::testing::Pair(
1077                   ::testing::Pair(kDefaultXdsServerUrl,
1078                                   XdsFooResourceType::Get()->type_url()),
1079                   1)));
1080   EXPECT_THAT(metrics_reporter_->resource_updates_invalid(),
1081               ::testing::ElementsAre());
1082   EXPECT_THAT(
1083       GetResourceCounts(),
1084       ::testing::ElementsAre(::testing::Pair(
1085           ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
1086                                 XdsFooResourceType::Get()->type_url(), "acked"),
1087           1)));
1088   // XdsClient should have sent an ACK message to the xDS server.
1089   request = WaitForRequest(stream.get());
1090   ASSERT_TRUE(request.has_value());
1091   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
1092                /*version_info=*/"1", /*response_nonce=*/"A",
1093                /*error_detail=*/absl::OkStatus(),
1094                /*resource_names=*/{"foo1"});
1095   // Start a second watcher for the same resource.
1096   auto watcher2 = StartFooWatch("foo1");
1097   // This watcher should get an immediate notification, because the
1098   // resource is already cached.
1099   resource = watcher2->WaitForNextResource();
1100   ASSERT_NE(resource, nullptr);
1101   EXPECT_EQ(resource->name, "foo1");
1102   EXPECT_EQ(resource->value, 6);
1103   // Server should not have seen another request from the client.
1104   ASSERT_FALSE(stream->HaveMessageFromClient());
1105   // Server sends an updated version of the resource.
1106   stream->SendMessageToClient(
1107       ResponseBuilder(XdsFooResourceType::Get()->type_url())
1108           .set_version_info("2")
1109           .set_nonce("B")
1110           .AddFooResource(XdsFooResource("foo1", 9))
1111           .Serialize());
1112   // XdsClient should deliver the response to both watchers.
1113   resource = watcher->WaitForNextResource();
1114   ASSERT_NE(resource, nullptr);
1115   EXPECT_EQ(resource->name, "foo1");
1116   EXPECT_EQ(resource->value, 9);
1117   resource = watcher2->WaitForNextResource();
1118   ASSERT_NE(resource, nullptr);
1119   EXPECT_EQ(resource->name, "foo1");
1120   EXPECT_EQ(resource->value, 9);
1121   // Check metric data.
1122   EXPECT_THAT(metrics_reporter_->resource_updates_valid(),
1123               ::testing::ElementsAre(::testing::Pair(
1124                   ::testing::Pair(kDefaultXdsServerUrl,
1125                                   XdsFooResourceType::Get()->type_url()),
1126                   2)));
1127   EXPECT_THAT(metrics_reporter_->resource_updates_invalid(),
1128               ::testing::ElementsAre());
1129   EXPECT_THAT(
1130       GetResourceCounts(),
1131       ::testing::ElementsAre(::testing::Pair(
1132           ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
1133                                 XdsFooResourceType::Get()->type_url(), "acked"),
1134           1)));
1135   // XdsClient should have sent an ACK message to the xDS server.
1136   request = WaitForRequest(stream.get());
1137   ASSERT_TRUE(request.has_value());
1138   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
1139                /*version_info=*/"2", /*response_nonce=*/"B",
1140                /*error_detail=*/absl::OkStatus(),
1141                /*resource_names=*/{"foo1"});
1142   // Cancel one of the watchers.
1143   CancelFooWatch(watcher.get(), "foo1");
1144   // The server should not see any new request.
1145   ASSERT_FALSE(WaitForRequest(stream.get()));
1146   // Now cancel the second watcher.
1147   CancelFooWatch(watcher2.get(), "foo1");
1148   EXPECT_TRUE(stream->Orphaned());
1149 }
1150 
TEST_F(XdsClientTest,SubscribeToMultipleResources)1151 TEST_F(XdsClientTest, SubscribeToMultipleResources) {
1152   InitXdsClient();
1153   // Start a watch for "foo1".
1154   auto watcher = StartFooWatch("foo1");
1155   // Watcher should initially not see any resource reported.
1156   EXPECT_FALSE(watcher->HasEvent());
1157   // Check metrics.
1158   EXPECT_THAT(metrics_reporter_->resource_updates_valid(),
1159               ::testing::ElementsAre());
1160   EXPECT_THAT(metrics_reporter_->resource_updates_invalid(),
1161               ::testing::ElementsAre());
1162   EXPECT_THAT(GetResourceCounts(),
1163               ::testing::ElementsAre(::testing::Pair(
1164                   ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
1165                                         XdsFooResourceType::Get()->type_url(),
1166                                         "requested"),
1167                   1)));
1168   // XdsClient should have created an ADS stream.
1169   auto stream = WaitForAdsStream();
1170   ASSERT_TRUE(stream != nullptr);
1171   // XdsClient should have sent a subscription request on the ADS stream.
1172   auto request = WaitForRequest(stream.get());
1173   ASSERT_TRUE(request.has_value());
1174   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
1175                /*version_info=*/"", /*response_nonce=*/"",
1176                /*error_detail=*/absl::OkStatus(),
1177                /*resource_names=*/{"foo1"});
1178   CheckRequestNode(*request);  // Should be present on the first request.
1179   // Send a response.
1180   stream->SendMessageToClient(
1181       ResponseBuilder(XdsFooResourceType::Get()->type_url())
1182           .set_version_info("1")
1183           .set_nonce("A")
1184           .AddFooResource(XdsFooResource("foo1", 6))
1185           .Serialize());
1186   // XdsClient should have delivered the response to the watcher.
1187   auto resource = watcher->WaitForNextResource();
1188   ASSERT_NE(resource, nullptr);
1189   EXPECT_EQ(resource->name, "foo1");
1190   EXPECT_EQ(resource->value, 6);
1191   // Check metric data.
1192   EXPECT_THAT(metrics_reporter_->resource_updates_valid(),
1193               ::testing::ElementsAre(::testing::Pair(
1194                   ::testing::Pair(kDefaultXdsServerUrl,
1195                                   XdsFooResourceType::Get()->type_url()),
1196                   1)));
1197   EXPECT_THAT(metrics_reporter_->resource_updates_invalid(),
1198               ::testing::ElementsAre());
1199   EXPECT_THAT(
1200       GetResourceCounts(),
1201       ::testing::ElementsAre(::testing::Pair(
1202           ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
1203                                 XdsFooResourceType::Get()->type_url(), "acked"),
1204           1)));
1205   // XdsClient should have sent an ACK message to the xDS server.
1206   request = WaitForRequest(stream.get());
1207   ASSERT_TRUE(request.has_value());
1208   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
1209                /*version_info=*/"1", /*response_nonce=*/"A",
1210                /*error_detail=*/absl::OkStatus(),
1211                /*resource_names=*/{"foo1"});
1212   // Start a watch for "foo2".
1213   auto watcher2 = StartFooWatch("foo2");
1214   // Check metric data.
1215   EXPECT_THAT(
1216       GetResourceCounts(),
1217       ::testing::ElementsAre(
1218           ::testing::Pair(ResourceCountLabelsEq(
1219                               XdsClient::kOldStyleAuthority,
1220                               XdsFooResourceType::Get()->type_url(), "acked"),
1221                           1),
1222           ::testing::Pair(
1223               ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
1224                                     XdsFooResourceType::Get()->type_url(),
1225                                     "requested"),
1226               1)));
1227   // XdsClient should have sent a subscription request on the ADS stream.
1228   request = WaitForRequest(stream.get());
1229   ASSERT_TRUE(request.has_value());
1230   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
1231                /*version_info=*/"1", /*response_nonce=*/"A",
1232                /*error_detail=*/absl::OkStatus(),
1233                /*resource_names=*/{"foo1", "foo2"});
1234   // Send a response.
1235   stream->SendMessageToClient(
1236       ResponseBuilder(XdsFooResourceType::Get()->type_url())
1237           .set_version_info("1")
1238           .set_nonce("B")
1239           .AddFooResource(XdsFooResource("foo2", 7))
1240           .Serialize());
1241   // XdsClient should have delivered the response to the watcher.
1242   resource = watcher2->WaitForNextResource();
1243   ASSERT_NE(resource, nullptr);
1244   EXPECT_EQ(resource->name, "foo2");
1245   EXPECT_EQ(resource->value, 7);
1246   // Check metric data.
1247   EXPECT_THAT(metrics_reporter_->resource_updates_valid(),
1248               ::testing::ElementsAre(::testing::Pair(
1249                   ::testing::Pair(kDefaultXdsServerUrl,
1250                                   XdsFooResourceType::Get()->type_url()),
1251                   2)));
1252   EXPECT_THAT(metrics_reporter_->resource_updates_invalid(),
1253               ::testing::ElementsAre());
1254   EXPECT_THAT(
1255       GetResourceCounts(),
1256       ::testing::ElementsAre(::testing::Pair(
1257           ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
1258                                 XdsFooResourceType::Get()->type_url(), "acked"),
1259           2)));
1260   // XdsClient should have sent an ACK message to the xDS server.
1261   request = WaitForRequest(stream.get());
1262   ASSERT_TRUE(request.has_value());
1263   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
1264                /*version_info=*/"1", /*response_nonce=*/"B",
1265                /*error_detail=*/absl::OkStatus(),
1266                /*resource_names=*/{"foo1", "foo2"});
1267   // Cancel watch for "foo1".
1268   CancelFooWatch(watcher.get(), "foo1");
1269   // Check metric data.
1270   EXPECT_THAT(
1271       GetResourceCounts(),
1272       ::testing::ElementsAre(::testing::Pair(
1273           ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
1274                                 XdsFooResourceType::Get()->type_url(), "acked"),
1275           1)));
1276   // XdsClient should send an unsubscription request.
1277   request = WaitForRequest(stream.get());
1278   ASSERT_TRUE(request.has_value());
1279   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
1280                /*version_info=*/"1", /*response_nonce=*/"B",
1281                /*error_detail=*/absl::OkStatus(), /*resource_names=*/{"foo2"});
1282   // Now cancel watch for "foo2".
1283   CancelFooWatch(watcher2.get(), "foo2");
1284   EXPECT_TRUE(stream->Orphaned());
1285 }
1286 
// Watches "foo1" and "foo2" on a single ADS stream, then has the server send
// a version "2" response that contains only "foo1".  Checks that the "foo1"
// watcher sees the new value, that the "acked" resource count remains 2, and
// that the resulting ACK still lists both "foo1" and "foo2".
TEST_F(XdsClientTest,UpdateContainsOnlyChangedResource)1287 TEST_F(XdsClientTest, UpdateContainsOnlyChangedResource) {
1288   InitXdsClient();
1289   // Start a watch for "foo1".
1290   auto watcher = StartFooWatch("foo1");
1291   // Watcher should initially not see any resource reported.
1292   EXPECT_FALSE(watcher->HasEvent());
1293   // XdsClient should have created an ADS stream.
1294   auto stream = WaitForAdsStream();
1295   ASSERT_TRUE(stream != nullptr);
1296   // XdsClient should have sent a subscription request on the ADS stream.
1297   auto request = WaitForRequest(stream.get());
1298   ASSERT_TRUE(request.has_value());
1299   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
1300                /*version_info=*/"", /*response_nonce=*/"",
1301                /*error_detail=*/absl::OkStatus(),
1302                /*resource_names=*/{"foo1"});
1303   CheckRequestNode(*request);  // Should be present on the first request.
1304   // Send a response.
1305   stream->SendMessageToClient(
1306       ResponseBuilder(XdsFooResourceType::Get()->type_url())
1307           .set_version_info("1")
1308           .set_nonce("A")
1309           .AddFooResource(XdsFooResource("foo1", 6))
1310           .Serialize());
1311   // XdsClient should have delivered the response to the watcher.
1312   auto resource = watcher->WaitForNextResource();
1313   ASSERT_NE(resource, nullptr);
1314   EXPECT_EQ(resource->name, "foo1");
1315   EXPECT_EQ(resource->value, 6);
1316   // XdsClient should have sent an ACK message to the xDS server.
1317   request = WaitForRequest(stream.get());
1318   ASSERT_TRUE(request.has_value());
1319   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
1320                /*version_info=*/"1", /*response_nonce=*/"A",
1321                /*error_detail=*/absl::OkStatus(),
1322                /*resource_names=*/{"foo1"});
1323   // Start a watch for "foo2".
1324   auto watcher2 = StartFooWatch("foo2");
1325   // XdsClient should have sent a subscription request on the ADS stream.
1326   request = WaitForRequest(stream.get());
1327   ASSERT_TRUE(request.has_value());
1328   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
1329                /*version_info=*/"1", /*response_nonce=*/"A",
1330                /*error_detail=*/absl::OkStatus(),
1331                /*resource_names=*/{"foo1", "foo2"});
1332   // Send a response.
1333   stream->SendMessageToClient(
1334       ResponseBuilder(XdsFooResourceType::Get()->type_url())
1335           .set_version_info("1")
1336           .set_nonce("B")
1337           .AddFooResource(XdsFooResource("foo2", 7))
1338           .Serialize());
1339   // XdsClient should have delivered the response to the watcher.
1340   resource = watcher2->WaitForNextResource();
1341   ASSERT_NE(resource, nullptr);
1342   EXPECT_EQ(resource->name, "foo2");
1343   EXPECT_EQ(resource->value, 7);
1344   // Check metric data.
1345   EXPECT_THAT(metrics_reporter_->resource_updates_valid(),
1346               ::testing::ElementsAre(::testing::Pair(
1347                   ::testing::Pair(kDefaultXdsServerUrl,
1348                                   XdsFooResourceType::Get()->type_url()),
1349                   2)));
1350   EXPECT_THAT(metrics_reporter_->resource_updates_invalid(),
1351               ::testing::ElementsAre());
1352   EXPECT_THAT(
1353       GetResourceCounts(),
1354       ::testing::ElementsAre(::testing::Pair(
1355           ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
1356                                 XdsFooResourceType::Get()->type_url(), "acked"),
1357           2)));
1358   // XdsClient should have sent an ACK message to the xDS server.
1359   request = WaitForRequest(stream.get());
1360   ASSERT_TRUE(request.has_value());
1361   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
1362                /*version_info=*/"1", /*response_nonce=*/"B",
1363                /*error_detail=*/absl::OkStatus(),
1364                /*resource_names=*/{"foo1", "foo2"});
1365   // Server sends an update for "foo1".  The response does not contain "foo2".
1366   stream->SendMessageToClient(
1367       ResponseBuilder(XdsFooResourceType::Get()->type_url())
1368           .set_version_info("2")
1369           .set_nonce("C")
1370           .AddFooResource(XdsFooResource("foo1", 9))
1371           .Serialize());
1372   // XdsClient should have delivered the response to the watcher.
1373   resource = watcher->WaitForNextResource();
1374   ASSERT_NE(resource, nullptr);
1375   EXPECT_EQ(resource->name, "foo1");
1376   EXPECT_EQ(resource->value, 9);
1377   // Check metric data; the "acked" resource count is expected to still be 2.
1378   EXPECT_THAT(metrics_reporter_->resource_updates_valid(),
1379               ::testing::ElementsAre(::testing::Pair(
1380                   ::testing::Pair(kDefaultXdsServerUrl,
1381                                   XdsFooResourceType::Get()->type_url()),
1382                   3)));
1383   EXPECT_THAT(metrics_reporter_->resource_updates_invalid(),
1384               ::testing::ElementsAre());
1385   EXPECT_THAT(
1386       GetResourceCounts(),
1387       ::testing::ElementsAre(::testing::Pair(
1388           ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
1389                                 XdsFooResourceType::Get()->type_url(), "acked"),
1390           2)));
1391   // XdsClient should have sent an ACK message to the xDS server.
1392   request = WaitForRequest(stream.get());
1393   ASSERT_TRUE(request.has_value());
1394   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
1395                /*version_info=*/"2", /*response_nonce=*/"C",
1396                /*error_detail=*/absl::OkStatus(),
1397                /*resource_names=*/{"foo1", "foo2"});
1398   // Cancel watch for "foo1".
1399   CancelFooWatch(watcher.get(), "foo1");
1400   // XdsClient should send an unsubscription request.
1401   request = WaitForRequest(stream.get());
1402   ASSERT_TRUE(request.has_value());
1403   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
1404                /*version_info=*/"2", /*response_nonce=*/"C",
1405                /*error_detail=*/absl::OkStatus(), /*resource_names=*/{"foo2"});
1406   // Now cancel watch for "foo2".
1407   CancelFooWatch(watcher2.get(), "foo2");
1408   EXPECT_TRUE(stream->Orphaned());
1409 }
1410 
// Sends an invalid "foo1" resource as the very first response and checks that
// the watcher receives a kUnavailable error, that the invalid-update metric
// and the "nacked" resource count are recorded, and that the client NACKs
// with an empty version_info.  A second watcher started afterwards is
// expected to get the same error right away.  A later valid update
// (version "2") is then expected to reach both watchers and be ACKed.
TEST_F(XdsClientTest,ResourceValidationFailure)1411 TEST_F(XdsClientTest, ResourceValidationFailure) {
1412   InitXdsClient();
1413   // Start a watch for "foo1".
1414   auto watcher = StartFooWatch("foo1");
1415   // Check metric data.
1416   EXPECT_THAT(metrics_reporter_->resource_updates_valid(),
1417               ::testing::ElementsAre());
1418   EXPECT_THAT(metrics_reporter_->resource_updates_invalid(),
1419               ::testing::ElementsAre());
1420   EXPECT_THAT(GetResourceCounts(),
1421               ::testing::ElementsAre(::testing::Pair(
1422                   ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
1423                                         XdsFooResourceType::Get()->type_url(),
1424                                         "requested"),
1425                   1)));
1426   // Watcher should initially not see any resource reported.
1427   EXPECT_FALSE(watcher->HasEvent());
1428   // XdsClient should have created an ADS stream.
1429   auto stream = WaitForAdsStream();
1430   ASSERT_TRUE(stream != nullptr);
1431   // XdsClient should have sent a subscription request on the ADS stream.
1432   auto request = WaitForRequest(stream.get());
1433   ASSERT_TRUE(request.has_value());
1434   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
1435                /*version_info=*/"", /*response_nonce=*/"",
1436                /*error_detail=*/absl::OkStatus(),
1437                /*resource_names=*/{"foo1"});
1438   CheckRequestNode(*request);  // Should be present on the first request.
1439   // Send a response containing an invalid resource.
1440   stream->SendMessageToClient(
1441       ResponseBuilder(XdsFooResourceType::Get()->type_url())
1442           .set_version_info("1")
1443           .set_nonce("A")
1444           .AddInvalidResource(XdsFooResourceType::Get()->type_url(),
1445                               "{\"name\":\"foo1\",\"value\":[]}")
1446           .Serialize());
1447   // XdsClient should deliver an error to the watcher.
1448   auto error = watcher->WaitForNextError();
1449   ASSERT_TRUE(error.has_value());
1450   EXPECT_EQ(error->code(), absl::StatusCode::kUnavailable);
1451   EXPECT_EQ(error->message(),
1452             "invalid resource: INVALID_ARGUMENT: errors validating JSON: "
1453             "[field:value error:is not a number] (node ID:xds_client_test)")
1454       << *error;
1455   // Check metric data.
1456   EXPECT_THAT(metrics_reporter_->resource_updates_valid(),
1457               ::testing::ElementsAre());
1458   EXPECT_THAT(metrics_reporter_->resource_updates_invalid(),
1459               ::testing::ElementsAre(::testing::Pair(
1460                   ::testing::Pair(kDefaultXdsServerUrl,
1461                                   XdsFooResourceType::Get()->type_url()),
1462                   1)));
1463   EXPECT_THAT(GetResourceCounts(),
1464               ::testing::ElementsAre(::testing::Pair(
1465                   ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
1466                                         XdsFooResourceType::Get()->type_url(),
1467                                         "nacked"),
1468                   1)));
1469   // XdsClient should NACK the update.
1470   // Note that version_info is not populated in the request.
1471   request = WaitForRequest(stream.get());
1472   ASSERT_TRUE(request.has_value());
1473   CheckRequest(
1474       *request, XdsFooResourceType::Get()->type_url(),
1475       /*version_info=*/"", /*response_nonce=*/"A",
1476       // error_detail=
1477       absl::InvalidArgumentError(
1478           "xDS response validation errors: ["
1479           "resource index 0: foo1: INVALID_ARGUMENT: errors validating JSON: "
1480           "[field:value error:is not a number]]"),
1481       /*resource_names=*/{"foo1"});
1482   // Start a second watch for the same resource.  It should immediately
1483   // receive the same error.
1484   auto watcher2 = StartFooWatch("foo1");
1485   error = watcher2->WaitForNextError();
1486   ASSERT_TRUE(error.has_value());
1487   EXPECT_EQ(error->code(), absl::StatusCode::kUnavailable);
1488   EXPECT_EQ(error->message(),
1489             "invalid resource: INVALID_ARGUMENT: errors validating JSON: "
1490             "[field:value error:is not a number] (node ID:xds_client_test)")
1491       << *error;
1492   // Now server sends an updated version of the resource.
1493   stream->SendMessageToClient(
1494       ResponseBuilder(XdsFooResourceType::Get()->type_url())
1495           .set_version_info("2")
1496           .set_nonce("B")
1497           .AddFooResource(XdsFooResource("foo1", 9))
1498           .Serialize());
1499   // XdsClient should deliver the response to both watchers.
1500   auto resource = watcher->WaitForNextResource();
1501   ASSERT_NE(resource, nullptr);
1502   EXPECT_EQ(resource->name, "foo1");
1503   EXPECT_EQ(resource->value, 9);
1504   resource = watcher2->WaitForNextResource();
1505   ASSERT_NE(resource, nullptr);
1506   EXPECT_EQ(resource->name, "foo1");
1507   EXPECT_EQ(resource->value, 9);
1508   // Check metric data.
1509   EXPECT_THAT(metrics_reporter_->resource_updates_valid(),
1510               ::testing::ElementsAre(::testing::Pair(
1511                   ::testing::Pair(kDefaultXdsServerUrl,
1512                                   XdsFooResourceType::Get()->type_url()),
1513                   1)));
1514   EXPECT_THAT(metrics_reporter_->resource_updates_invalid(),
1515               ::testing::ElementsAre(::testing::Pair(
1516                   ::testing::Pair(kDefaultXdsServerUrl,
1517                                   XdsFooResourceType::Get()->type_url()),
1518                   1)));
1519   EXPECT_THAT(
1520       GetResourceCounts(),
1521       ::testing::ElementsAre(::testing::Pair(
1522           ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
1523                                 XdsFooResourceType::Get()->type_url(), "acked"),
1524           1)));
1525   // XdsClient should have sent an ACK message to the xDS server.
1526   request = WaitForRequest(stream.get());
1527   ASSERT_TRUE(request.has_value());
1528   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
1529                /*version_info=*/"2", /*response_nonce=*/"B",
1530                /*error_detail=*/absl::OkStatus(),
1531                /*resource_names=*/{"foo1"});
1532   // Cancel watch.
1533   CancelFooWatch(watcher.get(), "foo1");
1534   CancelFooWatch(watcher2.get(), "foo1");
1535   EXPECT_TRUE(stream->Orphaned());
1536 }
1537 
// Starts watches for "foo1" through "foo4" before the server responds, then
// sends a single response mixing several invalid entries with one valid
// resource.  Checks that errors reach the watchers whose resource name could
// be determined ("foo1" and "foo3"), that the "foo2" watcher sees no event,
// that "foo4" is delivered, and that the NACK carries version "1" together
// with an error detail enumerating every failing resource index.
TEST_F(XdsClientTest,ResourceValidationFailureMultipleResources)1538 TEST_F(XdsClientTest, ResourceValidationFailureMultipleResources) {
1539   InitXdsClient();
1540   // Start a watch for "foo1".
1541   auto watcher = StartFooWatch("foo1");
1542   // Watcher should initially not see any resource reported.
1543   EXPECT_FALSE(watcher->HasEvent());
1544   // Check metric data.
1545   EXPECT_THAT(metrics_reporter_->resource_updates_valid(),
1546               ::testing::ElementsAre());
1547   EXPECT_THAT(metrics_reporter_->resource_updates_invalid(),
1548               ::testing::ElementsAre());
1549   EXPECT_THAT(GetResourceCounts(),
1550               ::testing::ElementsAre(::testing::Pair(
1551                   ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
1552                                         XdsFooResourceType::Get()->type_url(),
1553                                         "requested"),
1554                   1)));
1555   // XdsClient should have created an ADS stream.
1556   auto stream = WaitForAdsStream();
1557   ASSERT_TRUE(stream != nullptr);
1558   // XdsClient should have sent a subscription request on the ADS stream.
1559   auto request = WaitForRequest(stream.get());
1560   ASSERT_TRUE(request.has_value());
1561   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
1562                /*version_info=*/"", /*response_nonce=*/"",
1563                /*error_detail=*/absl::OkStatus(),
1564                /*resource_names=*/{"foo1"});
1565   CheckRequestNode(*request);  // Should be present on the first request.
1566   // Before the server responds, add a watch for another resource.
1567   auto watcher2 = StartFooWatch("foo2");
1568   // Check metric data.
1569   EXPECT_THAT(metrics_reporter_->resource_updates_valid(),
1570               ::testing::ElementsAre());
1571   EXPECT_THAT(metrics_reporter_->resource_updates_invalid(),
1572               ::testing::ElementsAre());
1573   EXPECT_THAT(GetResourceCounts(),
1574               ::testing::ElementsAre(::testing::Pair(
1575                   ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
1576                                         XdsFooResourceType::Get()->type_url(),
1577                                         "requested"),
1578                   2)));
1579   // Client should send another request.
1580   request = WaitForRequest(stream.get());
1581   ASSERT_TRUE(request.has_value());
1582   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
1583                /*version_info=*/"", /*response_nonce=*/"",
1584                /*error_detail=*/absl::OkStatus(),
1585                /*resource_names=*/{"foo1", "foo2"});
1586   // Add a watch for a third resource.
1587   auto watcher3 = StartFooWatch("foo3");
1588   // Check metric data.
1589   EXPECT_THAT(metrics_reporter_->resource_updates_valid(),
1590               ::testing::ElementsAre());
1591   EXPECT_THAT(metrics_reporter_->resource_updates_invalid(),
1592               ::testing::ElementsAre());
1593   EXPECT_THAT(GetResourceCounts(),
1594               ::testing::ElementsAre(::testing::Pair(
1595                   ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
1596                                         XdsFooResourceType::Get()->type_url(),
1597                                         "requested"),
1598                   3)));
1599   // Client should send another request.
1600   request = WaitForRequest(stream.get());
1601   ASSERT_TRUE(request.has_value());
1602   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
1603                /*version_info=*/"", /*response_nonce=*/"",
1604                /*error_detail=*/absl::OkStatus(),
1605                /*resource_names=*/{"foo1", "foo2", "foo3"});
1606   // Add a watch for a fourth resource.
1607   auto watcher4 = StartFooWatch("foo4");
1608   // Check metric data.
1609   EXPECT_THAT(metrics_reporter_->resource_updates_valid(),
1610               ::testing::ElementsAre());
1611   EXPECT_THAT(metrics_reporter_->resource_updates_invalid(),
1612               ::testing::ElementsAre());
1613   EXPECT_THAT(GetResourceCounts(),
1614               ::testing::ElementsAre(::testing::Pair(
1615                   ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
1616                                         XdsFooResourceType::Get()->type_url(),
1617                                         "requested"),
1618                   4)));
1619   // Client should send another request.
1620   request = WaitForRequest(stream.get());
1621   ASSERT_TRUE(request.has_value());
1622   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
1623                /*version_info=*/"", /*response_nonce=*/"",
1624                /*error_detail=*/absl::OkStatus(),
1625                /*resource_names=*/{"foo1", "foo2", "foo3", "foo4"});
1626   // Server sends a response containing three invalid resources, an empty
1627   // resource, an invalid resource wrapper, and one valid resource.
1628   stream->SendMessageToClient(
1629       ResponseBuilder(XdsFooResourceType::Get()->type_url())
1630           .set_version_info("1")
1631           .set_nonce("A")
1632           // foo1: JSON parsing succeeds, so we know the resource name,
1633           // but validation fails.
1634           .AddInvalidResource(XdsFooResourceType::Get()->type_url(),
1635                               "{\"name\":\"foo1\",\"value\":[]}")
1636           // foo2: JSON parsing fails, and not wrapped in a Resource
1637           // wrapper, so we don't actually know the resource's name.
1638           .AddInvalidResource(XdsFooResourceType::Get()->type_url(),
1639                               "{\"name\":\"foo2,\"value\":6}")
1640           // Empty resource.  Will be included in NACK but will not
1641           // affect any watchers.
1642           .AddEmptyResource()
1643           // Invalid resource wrapper.  Will be included in NACK but
1644           // will not affect any watchers.
1645           .AddInvalidResourceWrapper()
1646           // foo3: JSON parsing fails, but it is wrapped in a Resource
1647           // wrapper, so we do know the resource's name.
1648           .AddInvalidResource(XdsFooResourceType::Get()->type_url(),
1649                               "{\"name\":\"foo3,\"value\":6}",
1650                               /*resource_wrapper_name=*/"foo3")
1651           // foo4: valid resource.
1652           .AddFooResource(XdsFooResource("foo4", 5))
1653           .Serialize());
1654   // XdsClient should deliver an error to the watchers for foo1 and foo3.
1655   auto error = watcher->WaitForNextError();
1656   ASSERT_TRUE(error.has_value());
1657   EXPECT_EQ(error->code(), absl::StatusCode::kUnavailable);
1658   EXPECT_EQ(error->message(),
1659             "invalid resource: INVALID_ARGUMENT: errors validating JSON: "
1660             "[field:value error:is not a number] (node ID:xds_client_test)")
1661       << *error;
1662   error = watcher3->WaitForNextError();
1663   ASSERT_TRUE(error.has_value());
1664   EXPECT_EQ(error->code(), absl::StatusCode::kUnavailable);
1665   EXPECT_EQ(error->message(),
1666             "invalid resource: INVALID_ARGUMENT: JSON parsing failed: "
1667             "[JSON parse error at index 15] (node ID:xds_client_test)")
1668       << *error;
1669   // It cannot deliver an error for foo2, because the client doesn't know
1670   // that that resource in the response was actually supposed to be foo2.
1671   EXPECT_FALSE(watcher2->HasEvent());
1672   // It will deliver a valid resource update for foo4.
1673   auto resource = watcher4->WaitForNextResource();
1674   ASSERT_NE(resource, nullptr);
1675   EXPECT_EQ(resource->name, "foo4");
1676   EXPECT_EQ(resource->value, 5);
1677   // Check metric data.
1678   EXPECT_THAT(metrics_reporter_->resource_updates_valid(),
1679               ::testing::ElementsAre(::testing::Pair(
1680                   ::testing::Pair(kDefaultXdsServerUrl,
1681                                   XdsFooResourceType::Get()->type_url()),
1682                   1)));
1683   EXPECT_THAT(metrics_reporter_->resource_updates_invalid(),
1684               ::testing::ElementsAre(::testing::Pair(
1685                   ::testing::Pair(kDefaultXdsServerUrl,
1686                                   XdsFooResourceType::Get()->type_url()),
1687                   5)));
1688   EXPECT_THAT(
1689       GetResourceCounts(),
1690       ::testing::ElementsAre(
1691           // foo4
1692           ::testing::Pair(ResourceCountLabelsEq(
1693                               XdsClient::kOldStyleAuthority,
1694                               XdsFooResourceType::Get()->type_url(), "acked"),
1695                           1),
1696           // foo1 and foo3
1697           ::testing::Pair(ResourceCountLabelsEq(
1698                               XdsClient::kOldStyleAuthority,
1699                               XdsFooResourceType::Get()->type_url(), "nacked"),
1700                           2),
1701           // did not recognize response for foo2
1702           ::testing::Pair(
1703               ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
1704                                     XdsFooResourceType::Get()->type_url(),
1705                                     "requested"),
1706               1)));
1707   // XdsClient should NACK the update.
1708   // There was one good resource, so the version will be updated.
1709   request = WaitForRequest(stream.get());
1710   ASSERT_TRUE(request.has_value());
1711   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
1712                /*version_info=*/"1", /*response_nonce=*/"A",
1713                // error_detail=
1714                absl::InvalidArgumentError(absl::StrCat(
1715                    "xDS response validation errors: ["
1716                    // foo1
1717                    "resource index 0: foo1: "
1718                    "INVALID_ARGUMENT: errors validating JSON: "
1719                    "[field:value error:is not a number]; "
1720                    // foo2 (name not known)
1721                    "resource index 1: INVALID_ARGUMENT: JSON parsing failed: "
1722                    "[JSON parse error at index 15]; "
1723                    // empty resource
1724                    "resource index 2: incorrect resource type \"\" "
1725                    "(should be \"",
1726                    XdsFooResourceType::Get()->type_url(),
1727                    "\"); "
1728                    // invalid resource wrapper
1729                    "resource index 3: Can't decode Resource proto wrapper; "
1730                    // foo3
1731                    "resource index 4: foo3: "
1732                    "INVALID_ARGUMENT: JSON parsing failed: "
1733                    "[JSON parse error at index 15]]")),
1734                /*resource_names=*/{"foo1", "foo2", "foo3", "foo4"});
1735   // Cancel watches.
1736   CancelFooWatch(watcher.get(), "foo1", /*delay_unsubscription=*/true);
1737   CancelFooWatch(watcher2.get(), "foo2", /*delay_unsubscription=*/true);
1738   CancelFooWatch(watcher3.get(), "foo3", /*delay_unsubscription=*/true);
1739   CancelFooWatch(watcher4.get(), "foo4");
1740   EXPECT_TRUE(stream->Orphaned());
1741 }
1742 
// Delivers a valid "foo1" (value 6), then sends an invalid update for the
// same resource.  Checks that the existing watcher receives a kUnavailable
// error, that the resource count is reported as "nacked_but_cached", and that
// the NACK carries the previous version "1".  A watcher started after the
// NACK is expected to receive the previously delivered value (6).
TEST_F(XdsClientTest,ResourceValidationFailureForCachedResource)1743 TEST_F(XdsClientTest, ResourceValidationFailureForCachedResource) {
1744   InitXdsClient();
1745   // Start a watch for "foo1".
1746   auto watcher = StartFooWatch("foo1");
1747   // Watcher should initially not see any resource reported.
1748   EXPECT_FALSE(watcher->HasEvent());
1749   // XdsClient should have created an ADS stream.
1750   auto stream = WaitForAdsStream();
1751   ASSERT_TRUE(stream != nullptr);
1752   // XdsClient should have sent a subscription request on the ADS stream.
1753   auto request = WaitForRequest(stream.get());
1754   ASSERT_TRUE(request.has_value());
1755   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
1756                /*version_info=*/"", /*response_nonce=*/"",
1757                /*error_detail=*/absl::OkStatus(),
1758                /*resource_names=*/{"foo1"});
1759   CheckRequestNode(*request);  // Should be present on the first request.
1760   // Send a response.
1761   stream->SendMessageToClient(
1762       ResponseBuilder(XdsFooResourceType::Get()->type_url())
1763           .set_version_info("1")
1764           .set_nonce("A")
1765           .AddFooResource(XdsFooResource("foo1", 6))
1766           .Serialize());
1767   // XdsClient should have delivered the response to the watcher.
1768   auto resource = watcher->WaitForNextResource();
1769   ASSERT_NE(resource, nullptr);
1770   EXPECT_EQ(resource->name, "foo1");
1771   EXPECT_EQ(resource->value, 6);
1772   // Check metric data.
1773   EXPECT_THAT(metrics_reporter_->resource_updates_valid(),
1774               ::testing::ElementsAre(::testing::Pair(
1775                   ::testing::Pair(kDefaultXdsServerUrl,
1776                                   XdsFooResourceType::Get()->type_url()),
1777                   1)));
1778   EXPECT_THAT(metrics_reporter_->resource_updates_invalid(),
1779               ::testing::ElementsAre());
1780   EXPECT_THAT(
1781       GetResourceCounts(),
1782       ::testing::ElementsAre(::testing::Pair(
1783           ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
1784                                 XdsFooResourceType::Get()->type_url(), "acked"),
1785           1)));
1786   // XdsClient should have sent an ACK message to the xDS server.
1787   request = WaitForRequest(stream.get());
1788   ASSERT_TRUE(request.has_value());
1789   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
1790                /*version_info=*/"1", /*response_nonce=*/"A",
1791                /*error_detail=*/absl::OkStatus(),
1792                /*resource_names=*/{"foo1"});
1793   // Send an update containing an invalid resource.
1794   stream->SendMessageToClient(
1795       ResponseBuilder(XdsFooResourceType::Get()->type_url())
1796           .set_version_info("2")
1797           .set_nonce("B")
1798           .AddInvalidResource(XdsFooResourceType::Get()->type_url(),
1799                               "{\"name\":\"foo1\",\"value\":[]}")
1800           .Serialize());
1801   // XdsClient should deliver an error to the watcher.
1802   auto error = watcher->WaitForNextError();
1803   ASSERT_TRUE(error.has_value());
1804   EXPECT_EQ(error->code(), absl::StatusCode::kUnavailable);
1805   EXPECT_EQ(error->message(),
1806             "invalid resource: INVALID_ARGUMENT: errors validating JSON: "
1807             "[field:value error:is not a number] (node ID:xds_client_test)")
1808       << *error;
1809   // Check metric data.
1810   EXPECT_THAT(metrics_reporter_->resource_updates_valid(),
1811               ::testing::ElementsAre(::testing::Pair(
1812                   ::testing::Pair(kDefaultXdsServerUrl,
1813                                   XdsFooResourceType::Get()->type_url()),
1814                   1)));
1815   EXPECT_THAT(metrics_reporter_->resource_updates_invalid(),
1816               ::testing::ElementsAre(::testing::Pair(
1817                   ::testing::Pair(kDefaultXdsServerUrl,
1818                                   XdsFooResourceType::Get()->type_url()),
1819                   1)));
1820   EXPECT_THAT(GetResourceCounts(),
1821               ::testing::ElementsAre(::testing::Pair(
1822                   ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
1823                                         XdsFooResourceType::Get()->type_url(),
1824                                         "nacked_but_cached"),
1825                   1)));
1826   // XdsClient should NACK the update.
1827   // Note that version_info is set to the previous version in this request,
1828   // because there were no valid resources in it.
1829   request = WaitForRequest(stream.get());
1830   ASSERT_TRUE(request.has_value());
1831   CheckRequest(
1832       *request, XdsFooResourceType::Get()->type_url(),
1833       /*version_info=*/"1", /*response_nonce=*/"B",
1834       // error_detail=
1835       absl::InvalidArgumentError(
1836           "xDS response validation errors: ["
1837           "resource index 0: foo1: INVALID_ARGUMENT: errors validating JSON: "
1838           "[field:value error:is not a number]]"),
1839       /*resource_names=*/{"foo1"});
1840   // Start a second watcher for the same resource.  Even though the last
1841   // update was a NACK, we should still deliver the cached resource to
1842   // the watcher.
1843   // TODO(roth): Consider what the right behavior is here.  It seems
1844   // inconsistent that the watcher sees the error if it had started
1845   // before the error was seen but does not if it was started afterwards.
1846   // One option is to not send errors at all for already-cached resources;
1847   // another option is to send the errors even for newly started watchers.
1848   auto watcher2 = StartFooWatch("foo1");
1849   resource = watcher2->WaitForNextResource();
1850   ASSERT_NE(resource, nullptr);
1851   EXPECT_EQ(resource->name, "foo1");
1852   EXPECT_EQ(resource->value, 6);
1853   // Cancel watches.
1854   CancelFooWatch(watcher.get(), "foo1");
1855   CancelFooWatch(watcher2.get(), "foo1");
1856   EXPECT_TRUE(stream->Orphaned());
1857 }
1858 
// Sends a wildcard-capable response that contains the watched "wc1" resource
// followed by an empty resource.  Checks that "wc1" is still delivered, that
// one valid and one invalid update are recorded, and that the client NACKs
// with version "1" and an error detail that cites resource index 1.
TEST_F(XdsClientTest,WildcardCapableResponseWithEmptyResource)1859 TEST_F(XdsClientTest, WildcardCapableResponseWithEmptyResource) {
1860   InitXdsClient();
1861   // Start a watch for "wc1".
1862   auto watcher = StartWildcardCapableWatch("wc1");
1863   // Watcher should initially not see any resource reported.
1864   EXPECT_FALSE(watcher->HasEvent());
1865   // XdsClient should have created an ADS stream.
1866   auto stream = WaitForAdsStream();
1867   ASSERT_TRUE(stream != nullptr);
1868   // XdsClient should have sent a subscription request on the ADS stream.
1869   auto request = WaitForRequest(stream.get());
1870   ASSERT_TRUE(request.has_value());
1871   CheckRequest(*request, XdsWildcardCapableResourceType::Get()->type_url(),
1872                /*version_info=*/"", /*response_nonce=*/"",
1873                /*error_detail=*/absl::OkStatus(),
1874                /*resource_names=*/{"wc1"});
1875   CheckRequestNode(*request);  // Should be present on the first request.
1876   // Server sends a response containing the requested resources plus an
1877   // empty resource.
1878   stream->SendMessageToClient(
1879       ResponseBuilder(XdsWildcardCapableResourceType::Get()->type_url())
1880           .set_version_info("1")
1881           .set_nonce("A")
1882           .AddWildcardCapableResource(XdsWildcardCapableResource("wc1", 6))
1883           .AddEmptyResource()
1884           .Serialize());
1885   // XdsClient will deliver a valid resource update for wc1.
1886   auto resource = watcher->WaitForNextResource();
1887   ASSERT_NE(resource, nullptr);
1888   EXPECT_EQ(resource->name, "wc1");
1889   EXPECT_EQ(resource->value, 6);
1890   // Check metric data.
1891   EXPECT_THAT(
1892       metrics_reporter_->resource_updates_valid(),
1893       ::testing::ElementsAre(::testing::Pair(
1894           ::testing::Pair(kDefaultXdsServerUrl,
1895                           XdsWildcardCapableResourceType::Get()->type_url()),
1896           1)));
1897   EXPECT_THAT(
1898       metrics_reporter_->resource_updates_invalid(),
1899       ::testing::ElementsAre(::testing::Pair(
1900           ::testing::Pair(kDefaultXdsServerUrl,
1901                           XdsWildcardCapableResourceType::Get()->type_url()),
1902           1)));
1903   EXPECT_THAT(
1904       GetResourceCounts(),
1905       ::testing::ElementsAre(::testing::Pair(
1906           ResourceCountLabelsEq(
1907               XdsClient::kOldStyleAuthority,
1908               XdsWildcardCapableResourceType::Get()->type_url(), "acked"),
1909           1)));
1910   // XdsClient should NACK the update.
1911   // There was one good resource, so the version will be updated.
1912   request = WaitForRequest(stream.get());
1913   ASSERT_TRUE(request.has_value());
1914   CheckRequest(*request, XdsWildcardCapableResourceType::Get()->type_url(),
1915                /*version_info=*/"1", /*response_nonce=*/"A",
1916                // error_detail=
1917                absl::InvalidArgumentError(absl::StrCat(
1918                    "xDS response validation errors: ["
1919                    "resource index 1: incorrect resource type \"\" "
1920                    "(should be \"",
1921                    XdsWildcardCapableResourceType::Get()->type_url(), "\")]")),
1922                /*resource_names=*/{"wc1"});
1923   // Cancel watch.
1924   CancelWildcardCapableWatch(watcher.get(), "wc1");
1925   EXPECT_TRUE(stream->Orphaned());
1926 }
1927 
1928 // This tests resource removal triggered by the server when using a
1929 // resource type that requires all resources to be present in every
1930 // response, similar to LDS and CDS.
TEST_F(XdsClientTest,ResourceDeletion)1931 TEST_F(XdsClientTest, ResourceDeletion) {
1932   InitXdsClient();
1933   // Start a watch for "wc1".
1934   auto watcher = StartWildcardCapableWatch("wc1");
1935   // Watcher should initially not see any resource reported.
1936   EXPECT_FALSE(watcher->HasEvent());
1937   // XdsClient should have created an ADS stream.
1938   auto stream = WaitForAdsStream();
1939   ASSERT_TRUE(stream != nullptr);
1940   // XdsClient should have sent a subscription request on the ADS stream.
1941   auto request = WaitForRequest(stream.get());
1942   ASSERT_TRUE(request.has_value());
1943   CheckRequest(*request, XdsWildcardCapableResourceType::Get()->type_url(),
1944                /*version_info=*/"", /*response_nonce=*/"",
1945                /*error_detail=*/absl::OkStatus(),
1946                /*resource_names=*/{"wc1"});
1947   CheckRequestNode(*request);  // Should be present on the first request.
1948   // Server sends a response.
1949   stream->SendMessageToClient(
1950       ResponseBuilder(XdsWildcardCapableResourceType::Get()->type_url())
1951           .set_version_info("1")
1952           .set_nonce("A")
1953           .AddWildcardCapableResource(XdsWildcardCapableResource("wc1", 6))
1954           .Serialize());
1955   // XdsClient should have delivered the response to the watcher.
1956   auto resource = watcher->WaitForNextResource();
1957   ASSERT_NE(resource, nullptr);
1958   EXPECT_EQ(resource->name, "wc1");
1959   EXPECT_EQ(resource->value, 6);
1960   // Check metric data.
1961   EXPECT_THAT(
1962       metrics_reporter_->resource_updates_valid(),
1963       ::testing::ElementsAre(::testing::Pair(
1964           ::testing::Pair(kDefaultXdsServerUrl,
1965                           XdsWildcardCapableResourceType::Get()->type_url()),
1966           1)));
1967   EXPECT_THAT(metrics_reporter_->resource_updates_invalid(),
1968               ::testing::ElementsAre());
1969   EXPECT_THAT(
1970       GetResourceCounts(),
1971       ::testing::ElementsAre(::testing::Pair(
1972           ResourceCountLabelsEq(
1973               XdsClient::kOldStyleAuthority,
1974               XdsWildcardCapableResourceType::Get()->type_url(), "acked"),
1975           1)));
1976   // XdsClient should have sent an ACK message to the xDS server.
1977   request = WaitForRequest(stream.get());
1978   ASSERT_TRUE(request.has_value());
1979   CheckRequest(*request, XdsWildcardCapableResourceType::Get()->type_url(),
1980                /*version_info=*/"1", /*response_nonce=*/"A",
1981                /*error_detail=*/absl::OkStatus(),
1982                /*resource_names=*/{"wc1"});
1983   // Server now sends a response without the resource, thus indicating
1984   // it's been deleted.
1985   stream->SendMessageToClient(
1986       ResponseBuilder(XdsWildcardCapableResourceType::Get()->type_url())
1987           .set_version_info("2")
1988           .set_nonce("B")
1989           .Serialize());
1990   // Watcher should see the does-not-exist event.
1991   EXPECT_TRUE(watcher->WaitForDoesNotExist(absl::Seconds(1)));
1992   // Check metric data.
1993   EXPECT_THAT(
1994       metrics_reporter_->resource_updates_valid(),
1995       ::testing::ElementsAre(::testing::Pair(
1996           ::testing::Pair(kDefaultXdsServerUrl,
1997                           XdsWildcardCapableResourceType::Get()->type_url()),
1998           1)));
1999   EXPECT_THAT(metrics_reporter_->resource_updates_invalid(),
2000               ::testing::ElementsAre());
2001   EXPECT_THAT(GetResourceCounts(),
2002               ::testing::ElementsAre(::testing::Pair(
2003                   ResourceCountLabelsEq(
2004                       XdsClient::kOldStyleAuthority,
2005                       XdsWildcardCapableResourceType::Get()->type_url(),
2006                       "does_not_exist"),
2007                   1)));
2008   // Start a new watcher for the same resource.  It should immediately
2009   // receive the same does-not-exist notification.
2010   auto watcher2 = StartWildcardCapableWatch("wc1");
2011   EXPECT_TRUE(watcher2->WaitForDoesNotExist(absl::Seconds(1)));
2012   // XdsClient should have sent an ACK message to the xDS server.
2013   request = WaitForRequest(stream.get());
2014   ASSERT_TRUE(request.has_value());
2015   CheckRequest(*request, XdsWildcardCapableResourceType::Get()->type_url(),
2016                /*version_info=*/"2", /*response_nonce=*/"B",
2017                /*error_detail=*/absl::OkStatus(),
2018                /*resource_names=*/{"wc1"});
2019   // Server sends the resource again.
2020   stream->SendMessageToClient(
2021       ResponseBuilder(XdsWildcardCapableResourceType::Get()->type_url())
2022           .set_version_info("3")
2023           .set_nonce("C")
2024           .AddWildcardCapableResource(XdsWildcardCapableResource("wc1", 7))
2025           .Serialize());
2026   // XdsClient should have delivered the response to the watchers.
2027   resource = watcher->WaitForNextResource();
2028   ASSERT_NE(resource, nullptr);
2029   EXPECT_EQ(resource->name, "wc1");
2030   EXPECT_EQ(resource->value, 7);
2031   resource = watcher2->WaitForNextResource();
2032   ASSERT_NE(resource, nullptr);
2033   EXPECT_EQ(resource->name, "wc1");
2034   EXPECT_EQ(resource->value, 7);
2035   // Check metric data.
2036   EXPECT_THAT(
2037       metrics_reporter_->resource_updates_valid(),
2038       ::testing::ElementsAre(::testing::Pair(
2039           ::testing::Pair(kDefaultXdsServerUrl,
2040                           XdsWildcardCapableResourceType::Get()->type_url()),
2041           2)));
2042   EXPECT_THAT(metrics_reporter_->resource_updates_invalid(),
2043               ::testing::ElementsAre());
2044   EXPECT_THAT(
2045       GetResourceCounts(),
2046       ::testing::ElementsAre(::testing::Pair(
2047           ResourceCountLabelsEq(
2048               XdsClient::kOldStyleAuthority,
2049               XdsWildcardCapableResourceType::Get()->type_url(), "acked"),
2050           1)));
2051   // XdsClient should have sent an ACK message to the xDS server.
2052   request = WaitForRequest(stream.get());
2053   ASSERT_TRUE(request.has_value());
2054   CheckRequest(*request, XdsWildcardCapableResourceType::Get()->type_url(),
2055                /*version_info=*/"3", /*response_nonce=*/"C",
2056                /*error_detail=*/absl::OkStatus(),
2057                /*resource_names=*/{"wc1"});
2058   // Cancel watch.
2059   CancelWildcardCapableWatch(watcher.get(), "wc1");
2060   CancelWildcardCapableWatch(watcher2.get(), "wc1");
2061   EXPECT_TRUE(stream->Orphaned());
2062 }
2063 
2064 // This tests that when we ignore resource deletions from the server
2065 // when configured to do so.
TEST_F(XdsClientTest,ResourceDeletionIgnoredWhenConfigured)2066 TEST_F(XdsClientTest, ResourceDeletionIgnoredWhenConfigured) {
2067   InitXdsClient(FakeXdsBootstrap::Builder().SetServers(
2068       {FakeXdsBootstrap::FakeXdsServer(kDefaultXdsServerUrl, true)}));
2069   // Start a watch for "wc1".
2070   auto watcher = StartWildcardCapableWatch("wc1");
2071   // Watcher should initially not see any resource reported.
2072   EXPECT_FALSE(watcher->HasEvent());
2073   // XdsClient should have created an ADS stream.
2074   auto stream = WaitForAdsStream();
2075   ASSERT_TRUE(stream != nullptr);
2076   // XdsClient should have sent a subscription request on the ADS stream.
2077   auto request = WaitForRequest(stream.get());
2078   ASSERT_TRUE(request.has_value());
2079   CheckRequest(*request, XdsWildcardCapableResourceType::Get()->type_url(),
2080                /*version_info=*/"", /*response_nonce=*/"",
2081                /*error_detail=*/absl::OkStatus(),
2082                /*resource_names=*/{"wc1"});
2083   CheckRequestNode(*request);  // Should be present on the first request.
2084   // Server sends a response.
2085   stream->SendMessageToClient(
2086       ResponseBuilder(XdsWildcardCapableResourceType::Get()->type_url())
2087           .set_version_info("1")
2088           .set_nonce("A")
2089           .AddWildcardCapableResource(XdsWildcardCapableResource("wc1", 6))
2090           .Serialize());
2091   // XdsClient should have delivered the response to the watcher.
2092   auto resource = watcher->WaitForNextResource();
2093   ASSERT_NE(resource, nullptr);
2094   EXPECT_EQ(resource->name, "wc1");
2095   EXPECT_EQ(resource->value, 6);
2096   // Check metric data.
2097   EXPECT_THAT(
2098       metrics_reporter_->resource_updates_valid(),
2099       ::testing::ElementsAre(::testing::Pair(
2100           ::testing::Pair(kDefaultXdsServerUrl,
2101                           XdsWildcardCapableResourceType::Get()->type_url()),
2102           1)));
2103   EXPECT_THAT(metrics_reporter_->resource_updates_invalid(),
2104               ::testing::ElementsAre());
2105   EXPECT_THAT(
2106       GetResourceCounts(),
2107       ::testing::ElementsAre(::testing::Pair(
2108           ResourceCountLabelsEq(
2109               XdsClient::kOldStyleAuthority,
2110               XdsWildcardCapableResourceType::Get()->type_url(), "acked"),
2111           1)));
2112   // XdsClient should have sent an ACK message to the xDS server.
2113   request = WaitForRequest(stream.get());
2114   ASSERT_TRUE(request.has_value());
2115   CheckRequest(*request, XdsWildcardCapableResourceType::Get()->type_url(),
2116                /*version_info=*/"1", /*response_nonce=*/"A",
2117                /*error_detail=*/absl::OkStatus(),
2118                /*resource_names=*/{"wc1"});
2119   // Server now sends a response without the resource, thus indicating
2120   // it's been deleted.
2121   stream->SendMessageToClient(
2122       ResponseBuilder(XdsWildcardCapableResourceType::Get()->type_url())
2123           .set_version_info("2")
2124           .set_nonce("B")
2125           .Serialize());
2126   // Watcher should not see any update, since we should have ignored the
2127   // deletion.
2128   EXPECT_TRUE(watcher->ExpectNoEvent(absl::Seconds(1)));
2129   // Check metric data.
2130   EXPECT_THAT(
2131       metrics_reporter_->resource_updates_valid(),
2132       ::testing::ElementsAre(::testing::Pair(
2133           ::testing::Pair(kDefaultXdsServerUrl,
2134                           XdsWildcardCapableResourceType::Get()->type_url()),
2135           1)));
2136   EXPECT_THAT(metrics_reporter_->resource_updates_invalid(),
2137               ::testing::ElementsAre());
2138   EXPECT_THAT(
2139       GetResourceCounts(),
2140       ::testing::ElementsAre(::testing::Pair(
2141           ResourceCountLabelsEq(
2142               XdsClient::kOldStyleAuthority,
2143               XdsWildcardCapableResourceType::Get()->type_url(), "acked"),
2144           1)));
2145   // Start a new watcher for the same resource.  It should immediately
2146   // receive the cached resource.
2147   auto watcher2 = StartWildcardCapableWatch("wc1");
2148   resource = watcher2->WaitForNextResource();
2149   ASSERT_NE(resource, nullptr);
2150   EXPECT_EQ(resource->name, "wc1");
2151   EXPECT_EQ(resource->value, 6);
2152   // XdsClient should have sent an ACK message to the xDS server.
2153   request = WaitForRequest(stream.get());
2154   ASSERT_TRUE(request.has_value());
2155   CheckRequest(*request, XdsWildcardCapableResourceType::Get()->type_url(),
2156                /*version_info=*/"2", /*response_nonce=*/"B",
2157                /*error_detail=*/absl::OkStatus(),
2158                /*resource_names=*/{"wc1"});
2159   // Server sends a new value for the resource.
2160   stream->SendMessageToClient(
2161       ResponseBuilder(XdsWildcardCapableResourceType::Get()->type_url())
2162           .set_version_info("3")
2163           .set_nonce("C")
2164           .AddWildcardCapableResource(XdsWildcardCapableResource("wc1", 7))
2165           .Serialize());
2166   // XdsClient should have delivered the response to the watchers.
2167   resource = watcher->WaitForNextResource();
2168   ASSERT_NE(resource, nullptr);
2169   EXPECT_EQ(resource->name, "wc1");
2170   EXPECT_EQ(resource->value, 7);
2171   resource = watcher2->WaitForNextResource();
2172   ASSERT_NE(resource, nullptr);
2173   EXPECT_EQ(resource->name, "wc1");
2174   EXPECT_EQ(resource->value, 7);
2175   // Check metric data.
2176   EXPECT_THAT(
2177       metrics_reporter_->resource_updates_valid(),
2178       ::testing::ElementsAre(::testing::Pair(
2179           ::testing::Pair(kDefaultXdsServerUrl,
2180                           XdsWildcardCapableResourceType::Get()->type_url()),
2181           2)));
2182   EXPECT_THAT(metrics_reporter_->resource_updates_invalid(),
2183               ::testing::ElementsAre());
2184   EXPECT_THAT(
2185       GetResourceCounts(),
2186       ::testing::ElementsAre(::testing::Pair(
2187           ResourceCountLabelsEq(
2188               XdsClient::kOldStyleAuthority,
2189               XdsWildcardCapableResourceType::Get()->type_url(), "acked"),
2190           1)));
2191   // XdsClient should have sent an ACK message to the xDS server.
2192   request = WaitForRequest(stream.get());
2193   ASSERT_TRUE(request.has_value());
2194   CheckRequest(*request, XdsWildcardCapableResourceType::Get()->type_url(),
2195                /*version_info=*/"3", /*response_nonce=*/"C",
2196                /*error_detail=*/absl::OkStatus(),
2197                /*resource_names=*/{"wc1"});
2198   // Cancel watch.
2199   CancelWildcardCapableWatch(watcher.get(), "wc1");
2200   CancelWildcardCapableWatch(watcher2.get(), "wc1");
2201   EXPECT_TRUE(stream->Orphaned());
2202 }
2203 
TEST_F(XdsClientTest,StreamClosedByServer)2204 TEST_F(XdsClientTest, StreamClosedByServer) {
2205   InitXdsClient();
2206   // Metrics should initially be empty.
2207   EXPECT_THAT(GetServerConnections(), ::testing::ElementsAre());
2208   // Start a watch for "foo1".
2209   auto watcher = StartFooWatch("foo1");
2210   // Check metric data.
2211   EXPECT_THAT(GetServerConnections(), ::testing::ElementsAre(::testing::Pair(
2212                                           kDefaultXdsServerUrl, true)));
2213   // Watcher should initially not see any resource reported.
2214   EXPECT_FALSE(watcher->HasEvent());
2215   // XdsClient should have created an ADS stream.
2216   auto stream = WaitForAdsStream();
2217   ASSERT_TRUE(stream != nullptr);
2218   // XdsClient should have sent a subscription request on the ADS stream.
2219   auto request = WaitForRequest(stream.get());
2220   ASSERT_TRUE(request.has_value());
2221   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
2222                /*version_info=*/"", /*response_nonce=*/"",
2223                /*error_detail=*/absl::OkStatus(),
2224                /*resource_names=*/{"foo1"});
2225   CheckRequestNode(*request);  // Should be present on the first request.
2226   // Server sends a response.
2227   stream->SendMessageToClient(
2228       ResponseBuilder(XdsFooResourceType::Get()->type_url())
2229           .set_version_info("1")
2230           .set_nonce("A")
2231           .AddFooResource(XdsFooResource("foo1", 6))
2232           .Serialize());
2233   // XdsClient should have delivered the response to the watcher.
2234   auto resource = watcher->WaitForNextResource();
2235   ASSERT_NE(resource, nullptr);
2236   EXPECT_EQ(resource->name, "foo1");
2237   EXPECT_EQ(resource->value, 6);
2238   // XdsClient should have sent an ACK message to the xDS server.
2239   request = WaitForRequest(stream.get());
2240   ASSERT_TRUE(request.has_value());
2241   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
2242                /*version_info=*/"1", /*response_nonce=*/"A",
2243                /*error_detail=*/absl::OkStatus(),
2244                /*resource_names=*/{"foo1"});
2245   // Now server closes the stream.
2246   stream->MaybeSendStatusToClient(absl::OkStatus());
2247   // XdsClient should NOT report error to watcher, because we saw a
2248   // response on the stream before it failed.
2249   // Stream should be orphaned.
2250   EXPECT_TRUE(stream->Orphaned());
2251   // Check metric data.
2252   EXPECT_THAT(GetServerConnections(), ::testing::ElementsAre(::testing::Pair(
2253                                           kDefaultXdsServerUrl, true)));
2254   // XdsClient should create a new stream.
2255   stream = WaitForAdsStream();
2256   ASSERT_TRUE(stream != nullptr);
2257   // XdsClient sends a subscription request.
2258   // Note that the version persists from the previous stream, but the
2259   // nonce does not.
2260   request = WaitForRequest(stream.get());
2261   ASSERT_TRUE(request.has_value());
2262   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
2263                /*version_info=*/"1", /*response_nonce=*/"",
2264                /*error_detail=*/absl::OkStatus(),
2265                /*resource_names=*/{"foo1"});
2266   CheckRequestNode(*request);  // Should be present on the first request.
2267   // Before the server resends the resource, start a new watcher for the
2268   // same resource.  This watcher should immediately receive the cached
2269   // resource.
2270   auto watcher2 = StartFooWatch("foo1");
2271   resource = watcher2->WaitForNextResource();
2272   ASSERT_NE(resource, nullptr);
2273   EXPECT_EQ(resource->name, "foo1");
2274   EXPECT_EQ(resource->value, 6);
2275   // Server now sends the requested resource.
2276   stream->SendMessageToClient(
2277       ResponseBuilder(XdsFooResourceType::Get()->type_url())
2278           .set_version_info("1")
2279           .set_nonce("B")
2280           .AddFooResource(XdsFooResource("foo1", 6))
2281           .Serialize());
2282   // Watcher does NOT get an update, since the resource has not changed.
2283   EXPECT_FALSE(watcher->WaitForNextResource());
2284   // XdsClient sends an ACK.
2285   request = WaitForRequest(stream.get());
2286   ASSERT_TRUE(request.has_value());
2287   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
2288                /*version_info=*/"1", /*response_nonce=*/"B",
2289                /*error_detail=*/absl::OkStatus(),
2290                /*resource_names=*/{"foo1"});
2291   // Cancel watcher.
2292   CancelFooWatch(watcher.get(), "foo1");
2293   CancelFooWatch(watcher2.get(), "foo1");
2294   EXPECT_TRUE(stream->Orphaned());
2295 }
2296 
TEST_F(XdsClientTest,StreamClosedByServerWithoutSeeingResponse)2297 TEST_F(XdsClientTest, StreamClosedByServerWithoutSeeingResponse) {
2298   InitXdsClient();
2299   // Metrics should initially be empty.
2300   EXPECT_THAT(GetServerConnections(), ::testing::ElementsAre());
2301   // Start a watch for "foo1".
2302   auto watcher = StartFooWatch("foo1");
2303   // Check metric data.
2304   EXPECT_THAT(GetServerConnections(), ::testing::ElementsAre(::testing::Pair(
2305                                           kDefaultXdsServerUrl, true)));
2306   // Watcher should initially not see any resource reported.
2307   EXPECT_FALSE(watcher->HasEvent());
2308   // XdsClient should have created an ADS stream.
2309   auto stream = WaitForAdsStream();
2310   ASSERT_TRUE(stream != nullptr);
2311   // XdsClient should have sent a subscription request on the ADS stream.
2312   auto request = WaitForRequest(stream.get());
2313   ASSERT_TRUE(request.has_value());
2314   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
2315                /*version_info=*/"", /*response_nonce=*/"",
2316                /*error_detail=*/absl::OkStatus(),
2317                /*resource_names=*/{"foo1"});
2318   CheckRequestNode(*request);  // Should be present on the first request.
2319   // Server closes the stream without sending a response.
2320   stream->MaybeSendStatusToClient(absl::UnavailableError("ugh"));
2321   // Check metric data.
2322   EXPECT_THAT(GetServerConnections(), ::testing::ElementsAre(::testing::Pair(
2323                                           kDefaultXdsServerUrl, false)));
2324   // XdsClient should report an error to the watcher.
2325   auto error = watcher->WaitForNextError();
2326   ASSERT_TRUE(error.has_value());
2327   EXPECT_EQ(error->code(), absl::StatusCode::kUnavailable);
2328   EXPECT_EQ(error->message(),
2329             "xDS channel for server default_xds_server: xDS call failed "
2330             "with no responses received; status: UNAVAILABLE: ugh "
2331             "(node ID:xds_client_test)")
2332       << *error;
2333   // XdsClient should create a new stream.
2334   stream = WaitForAdsStream();
2335   ASSERT_TRUE(stream != nullptr);
2336   // XdsClient sends a subscription request.
2337   request = WaitForRequest(stream.get());
2338   ASSERT_TRUE(request.has_value());
2339   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
2340                /*version_info=*/"", /*response_nonce=*/"",
2341                /*error_detail=*/absl::OkStatus(),
2342                /*resource_names=*/{"foo1"});
2343   CheckRequestNode(*request);  // Should be present on the first request.
2344   // Connection still reported as unhappy until we get a response.
2345   EXPECT_THAT(GetServerConnections(), ::testing::ElementsAre(::testing::Pair(
2346                                           kDefaultXdsServerUrl, false)));
2347   // Server now sends the requested resource.
2348   stream->SendMessageToClient(
2349       ResponseBuilder(XdsFooResourceType::Get()->type_url())
2350           .set_version_info("1")
2351           .set_nonce("A")
2352           .AddFooResource(XdsFooResource("foo1", 6))
2353           .Serialize());
2354   // Watcher gets the resource.
2355   auto resource = watcher->WaitForNextResource();
2356   ASSERT_NE(resource, nullptr);
2357   EXPECT_EQ(resource->name, "foo1");
2358   EXPECT_EQ(resource->value, 6);
2359   // Connection now reported as happy.
2360   EXPECT_THAT(GetServerConnections(), ::testing::ElementsAre(::testing::Pair(
2361                                           kDefaultXdsServerUrl, true)));
2362   // XdsClient sends an ACK.
2363   request = WaitForRequest(stream.get());
2364   ASSERT_TRUE(request.has_value());
2365   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
2366                /*version_info=*/"1", /*response_nonce=*/"A",
2367                /*error_detail=*/absl::OkStatus(),
2368                /*resource_names=*/{"foo1"});
2369   // Cancel watcher.
2370   CancelFooWatch(watcher.get(), "foo1");
2371   EXPECT_TRUE(stream->Orphaned());
2372 }
2373 
TEST_F(XdsClientTest,ConnectionFails)2374 TEST_F(XdsClientTest, ConnectionFails) {
2375   // Lower resources-does-not-exist timeout, to make sure that we're not
2376   // triggering that here.
2377   InitXdsClient(FakeXdsBootstrap::Builder(), Duration::Seconds(3));
2378   // Tell transport to let us manually trigger completion of the
2379   // send_message ops to XdsClient.
2380   transport_factory_->SetAutoCompleteMessagesFromClient(false);
2381   // Metrics should initially be empty.
2382   EXPECT_THAT(GetServerConnections(), ::testing::ElementsAre());
2383   // Start a watch for "foo1".
2384   auto watcher = StartFooWatch("foo1");
2385   // Check metric data.
2386   EXPECT_THAT(GetServerConnections(), ::testing::ElementsAre(::testing::Pair(
2387                                           kDefaultXdsServerUrl, true)));
2388   // Watcher should initially not see any resource reported.
2389   EXPECT_FALSE(watcher->HasEvent());
2390   // XdsClient should have created an ADS stream.
2391   auto stream = WaitForAdsStream();
2392   ASSERT_TRUE(stream != nullptr);
2393   // XdsClient should have sent a subscription request on the ADS stream.
2394   auto request = WaitForRequest(stream.get());
2395   ASSERT_TRUE(request.has_value());
2396   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
2397                /*version_info=*/"", /*response_nonce=*/"",
2398                /*error_detail=*/absl::OkStatus(),
2399                /*resource_names=*/{"foo1"});
2400   CheckRequestNode(*request);  // Should be present on the first request.
2401   // Transport reports connection failure.
2402   TriggerConnectionFailure(*xds_client_->bootstrap().servers().front(),
2403                            absl::UnavailableError("connection failed"));
2404   // XdsClient should report an error to the watcher.
2405   auto error = watcher->WaitForNextError();
2406   ASSERT_TRUE(error.has_value());
2407   EXPECT_EQ(error->code(), absl::StatusCode::kUnavailable);
2408   EXPECT_EQ(error->message(),
2409             "xDS channel for server default_xds_server: "
2410             "connection failed (node ID:xds_client_test)")
2411       << *error;
2412   // Connection reported as unhappy.
2413   EXPECT_THAT(GetServerConnections(), ::testing::ElementsAre(::testing::Pair(
2414                                           kDefaultXdsServerUrl, false)));
2415   // We should not see a resource-does-not-exist event, because the
2416   // timer should not be running while the channel is disconnected.
2417   EXPECT_TRUE(watcher->ExpectNoEvent(absl::Seconds(4)));
2418   // Start a new watch.  This watcher should be given the same error,
2419   // since we have not yet recovered.
2420   auto watcher2 = StartFooWatch("foo1");
2421   error = watcher2->WaitForNextError();
2422   ASSERT_TRUE(error.has_value());
2423   EXPECT_EQ(error->code(), absl::StatusCode::kUnavailable);
2424   EXPECT_EQ(error->message(),
2425             "xDS channel for server default_xds_server: "
2426             "connection failed (node ID:xds_client_test)")
2427       << *error;
2428   // Second watcher should not see resource-does-not-exist either.
2429   EXPECT_FALSE(watcher2->HasEvent());
2430   // The ADS stream uses wait_for_ready inside the XdsTransport interface,
2431   // so when the channel reconnects, the already-started stream will proceed.
2432   stream->CompleteSendMessageFromClient();
2433   // Server sends a response.
2434   stream->SendMessageToClient(
2435       ResponseBuilder(XdsFooResourceType::Get()->type_url())
2436           .set_version_info("1")
2437           .set_nonce("A")
2438           .AddFooResource(XdsFooResource("foo1", 6))
2439           .Serialize());
2440   // Connection now reported as happy.
2441   EXPECT_THAT(GetServerConnections(), ::testing::ElementsAre(::testing::Pair(
2442                                           kDefaultXdsServerUrl, true)));
2443   // XdsClient should have delivered the response to the watchers.
2444   auto resource = watcher->WaitForNextResource();
2445   ASSERT_NE(resource, nullptr);
2446   EXPECT_EQ(resource->name, "foo1");
2447   EXPECT_EQ(resource->value, 6);
2448   resource = watcher2->WaitForNextResource();
2449   ASSERT_NE(resource, nullptr);
2450   EXPECT_EQ(resource->name, "foo1");
2451   EXPECT_EQ(resource->value, 6);
2452   // XdsClient should have sent an ACK message to the xDS server.
2453   request = WaitForRequest(stream.get());
2454   ASSERT_TRUE(request.has_value());
2455   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
2456                /*version_info=*/"1", /*response_nonce=*/"A",
2457                /*error_detail=*/absl::OkStatus(),
2458                /*resource_names=*/{"foo1"});
2459   stream->CompleteSendMessageFromClient();
2460   // Cancel watches.
2461   CancelFooWatch(watcher.get(), "foo1");
2462   CancelFooWatch(watcher2.get(), "foo1");
2463   EXPECT_TRUE(stream->Orphaned());
2464 }
2465 
TEST_F(XdsClientTest,ResourceDoesNotExistUponTimeout)2466 TEST_F(XdsClientTest, ResourceDoesNotExistUponTimeout) {
2467   InitXdsClient(FakeXdsBootstrap::Builder(), Duration::Seconds(1));
2468   // Start a watch for "foo1".
2469   auto watcher = StartFooWatch("foo1");
2470   // Watcher should initially not see any resource reported.
2471   EXPECT_FALSE(watcher->HasEvent());
2472   // Check metric data.
2473   EXPECT_THAT(metrics_reporter_->resource_updates_valid(),
2474               ::testing::ElementsAre());
2475   EXPECT_THAT(metrics_reporter_->resource_updates_invalid(),
2476               ::testing::ElementsAre());
2477   EXPECT_THAT(GetResourceCounts(),
2478               ::testing::ElementsAre(::testing::Pair(
2479                   ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
2480                                         XdsFooResourceType::Get()->type_url(),
2481                                         "requested"),
2482                   1)));
2483   // XdsClient should have created an ADS stream.
2484   auto stream = WaitForAdsStream();
2485   ASSERT_TRUE(stream != nullptr);
2486   // XdsClient should have sent a subscription request on the ADS stream.
2487   auto request = WaitForRequest(stream.get());
2488   ASSERT_TRUE(request.has_value());
2489   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
2490                /*version_info=*/"", /*response_nonce=*/"",
2491                /*error_detail=*/absl::OkStatus(),
2492                /*resource_names=*/{"foo1"});
2493   CheckRequestNode(*request);  // Should be present on the first request.
2494   // Do not send a response, but wait for the resource to be reported as
2495   // not existing.
2496   EXPECT_TRUE(watcher->WaitForDoesNotExist(absl::Seconds(5)));
2497   // Check metric data.
2498   EXPECT_THAT(metrics_reporter_->resource_updates_valid(),
2499               ::testing::ElementsAre());
2500   EXPECT_THAT(metrics_reporter_->resource_updates_invalid(),
2501               ::testing::ElementsAre());
2502   EXPECT_THAT(GetResourceCounts(),
2503               ::testing::ElementsAre(::testing::Pair(
2504                   ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
2505                                         XdsFooResourceType::Get()->type_url(),
2506                                         "does_not_exist"),
2507                   1)));
2508   // Start a new watcher for the same resource.  It should immediately
2509   // receive the same does-not-exist notification.
2510   auto watcher2 = StartFooWatch("foo1");
2511   EXPECT_TRUE(watcher2->WaitForDoesNotExist(absl::Seconds(1)));
2512   // Now server sends a response.
2513   stream->SendMessageToClient(
2514       ResponseBuilder(XdsFooResourceType::Get()->type_url())
2515           .set_version_info("1")
2516           .set_nonce("A")
2517           .AddFooResource(XdsFooResource("foo1", 6))
2518           .Serialize());
2519   // XdsClient should have delivered the response to the watchers.
2520   auto resource = watcher->WaitForNextResource();
2521   ASSERT_NE(resource, nullptr);
2522   EXPECT_EQ(resource->name, "foo1");
2523   EXPECT_EQ(resource->value, 6);
2524   resource = watcher2->WaitForNextResource();
2525   ASSERT_NE(resource, nullptr);
2526   EXPECT_EQ(resource->name, "foo1");
2527   EXPECT_EQ(resource->value, 6);
2528   // Check metric data.
2529   EXPECT_THAT(metrics_reporter_->resource_updates_valid(),
2530               ::testing::ElementsAre(::testing::Pair(
2531                   ::testing::Pair(kDefaultXdsServerUrl,
2532                                   XdsFooResourceType::Get()->type_url()),
2533                   1)));
2534   EXPECT_THAT(metrics_reporter_->resource_updates_invalid(),
2535               ::testing::ElementsAre());
2536   EXPECT_THAT(
2537       GetResourceCounts(),
2538       ::testing::ElementsAre(::testing::Pair(
2539           ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
2540                                 XdsFooResourceType::Get()->type_url(), "acked"),
2541           1)));
2542   // XdsClient should have sent an ACK message to the xDS server.
2543   request = WaitForRequest(stream.get());
2544   ASSERT_TRUE(request.has_value());
2545   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
2546                /*version_info=*/"1", /*response_nonce=*/"A",
2547                /*error_detail=*/absl::OkStatus(),
2548                /*resource_names=*/{"foo1"});
2549   // Cancel watch.
2550   CancelFooWatch(watcher.get(), "foo1");
2551   CancelFooWatch(watcher2.get(), "foo1");
2552   EXPECT_TRUE(stream->Orphaned());
2553 }
2554 
TEST_F(XdsClientTest,ResourceDoesNotExistAfterStreamRestart)2555 TEST_F(XdsClientTest, ResourceDoesNotExistAfterStreamRestart) {
2556   // Lower resources-does-not-exist timeout so test finishes faster.
2557   InitXdsClient(FakeXdsBootstrap::Builder(), Duration::Seconds(3));
2558   // Metrics should initially be empty.
2559   EXPECT_THAT(metrics_reporter_->resource_updates_valid(),
2560               ::testing::ElementsAre());
2561   EXPECT_THAT(metrics_reporter_->resource_updates_invalid(),
2562               ::testing::ElementsAre());
2563   EXPECT_THAT(GetResourceCounts(), ::testing::ElementsAre());
2564   // Start a watch for "foo1".
2565   auto watcher = StartFooWatch("foo1");
2566   // Check metric data.
2567   EXPECT_THAT(metrics_reporter_->resource_updates_valid(),
2568               ::testing::ElementsAre());
2569   EXPECT_THAT(metrics_reporter_->resource_updates_invalid(),
2570               ::testing::ElementsAre());
2571   EXPECT_THAT(GetResourceCounts(),
2572               ::testing::ElementsAre(::testing::Pair(
2573                   ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
2574                                         XdsFooResourceType::Get()->type_url(),
2575                                         "requested"),
2576                   1)));
2577   // Watcher should initially not see any resource reported.
2578   EXPECT_FALSE(watcher->HasEvent());
2579   // XdsClient should have created an ADS stream.
2580   auto stream = WaitForAdsStream();
2581   ASSERT_TRUE(stream != nullptr);
2582   // XdsClient should have sent a subscription request on the ADS stream.
2583   auto request = WaitForRequest(stream.get());
2584   ASSERT_TRUE(request.has_value());
2585   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
2586                /*version_info=*/"", /*response_nonce=*/"",
2587                /*error_detail=*/absl::OkStatus(),
2588                /*resource_names=*/{"foo1"});
2589   CheckRequestNode(*request);  // Should be present on the first request.
2590   // Stream fails.
2591   stream->MaybeSendStatusToClient(absl::UnavailableError("ugh"));
2592   // XdsClient should report error to watcher.
2593   auto error = watcher->WaitForNextError();
2594   ASSERT_TRUE(error.has_value());
2595   EXPECT_EQ(error->code(), absl::StatusCode::kUnavailable);
2596   EXPECT_EQ(error->message(),
2597             "xDS channel for server default_xds_server: xDS call failed "
2598             "with no responses received; status: UNAVAILABLE: ugh "
2599             "(node ID:xds_client_test)")
2600       << *error;
2601   // Check metric data.
2602   EXPECT_THAT(metrics_reporter_->resource_updates_valid(),
2603               ::testing::ElementsAre());
2604   EXPECT_THAT(metrics_reporter_->resource_updates_invalid(),
2605               ::testing::ElementsAre());
2606   EXPECT_THAT(GetResourceCounts(),
2607               ::testing::ElementsAre(::testing::Pair(
2608                   ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
2609                                         XdsFooResourceType::Get()->type_url(),
2610                                         "requested"),
2611                   1)));
2612   // XdsClient should create a new stream.
2613   stream = WaitForAdsStream();
2614   ASSERT_TRUE(stream != nullptr);
2615   // XdsClient sends a subscription request.
2616   request = WaitForRequest(stream.get());
2617   ASSERT_TRUE(request.has_value());
2618   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
2619                /*version_info=*/"", /*response_nonce=*/"",
2620                /*error_detail=*/absl::OkStatus(),
2621                /*resource_names=*/{"foo1"});
2622   CheckRequestNode(*request);  // Should be present on the first request.
2623   // Server does NOT send a response immediately.
2624   // Client should receive a resource does-not-exist.
2625   ASSERT_TRUE(watcher->WaitForDoesNotExist(absl::Seconds(4)));
2626   // Check metric data.
2627   EXPECT_THAT(metrics_reporter_->resource_updates_valid(),
2628               ::testing::ElementsAre());
2629   EXPECT_THAT(metrics_reporter_->resource_updates_invalid(),
2630               ::testing::ElementsAre());
2631   EXPECT_THAT(GetResourceCounts(),
2632               ::testing::ElementsAre(::testing::Pair(
2633                   ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
2634                                         XdsFooResourceType::Get()->type_url(),
2635                                         "does_not_exist"),
2636                   1)));
2637   // Server now sends the requested resource.
2638   stream->SendMessageToClient(
2639       ResponseBuilder(XdsFooResourceType::Get()->type_url())
2640           .set_version_info("1")
2641           .set_nonce("A")
2642           .AddFooResource(XdsFooResource("foo1", 6))
2643           .Serialize());
2644   // The resource is delivered to the watcher.
2645   auto resource = watcher->WaitForNextResource();
2646   ASSERT_NE(resource, nullptr);
2647   EXPECT_EQ(resource->name, "foo1");
2648   EXPECT_EQ(resource->value, 6);
2649   // Check metric data.
2650   EXPECT_THAT(metrics_reporter_->resource_updates_valid(),
2651               ::testing::ElementsAre(::testing::Pair(
2652                   ::testing::Pair(kDefaultXdsServerUrl,
2653                                   XdsFooResourceType::Get()->type_url()),
2654                   1)));
2655   EXPECT_THAT(metrics_reporter_->resource_updates_invalid(),
2656               ::testing::ElementsAre());
2657   EXPECT_THAT(
2658       GetResourceCounts(),
2659       ::testing::ElementsAre(::testing::Pair(
2660           ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
2661                                 XdsFooResourceType::Get()->type_url(), "acked"),
2662           1)));
2663   // XdsClient sends an ACK.
2664   request = WaitForRequest(stream.get());
2665   ASSERT_TRUE(request.has_value());
2666   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
2667                /*version_info=*/"1", /*response_nonce=*/"A",
2668                /*error_detail=*/absl::OkStatus(),
2669                /*resource_names=*/{"foo1"});
2670   // Cancel watcher.
2671   CancelFooWatch(watcher.get(), "foo1");
2672   EXPECT_TRUE(stream->Orphaned());
2673 }
2674 
TEST_F(XdsClientTest,DoesNotExistTimerNotStartedUntilSendCompletes)2675 TEST_F(XdsClientTest, DoesNotExistTimerNotStartedUntilSendCompletes) {
2676   // Lower resources-does-not-exist timeout, to make sure that we're not
2677   // triggering that here.
2678   InitXdsClient(FakeXdsBootstrap::Builder(), Duration::Seconds(3));
2679   // Tell transport to let us manually trigger completion of the
2680   // send_message ops to XdsClient.
2681   transport_factory_->SetAutoCompleteMessagesFromClient(false);
2682   // Start a watch for "foo1".
2683   auto watcher = StartFooWatch("foo1");
2684   // Watcher should initially not see any resource reported.
2685   EXPECT_FALSE(watcher->HasEvent());
2686   // XdsClient should have created an ADS stream.
2687   auto stream = WaitForAdsStream();
2688   ASSERT_TRUE(stream != nullptr);
2689   // XdsClient should have sent a subscription request on the ADS stream.
2690   auto request = WaitForRequest(stream.get());
2691   ASSERT_TRUE(request.has_value());
2692   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
2693                /*version_info=*/"", /*response_nonce=*/"",
2694                /*error_detail=*/absl::OkStatus(),
2695                /*resource_names=*/{"foo1"});
2696   CheckRequestNode(*request);  // Should be present on the first request.
2697   // Server does NOT send a response.
2698   // We should not see a resource-does-not-exist event, because the
2699   // timer should not be running while the channel is disconnected.
2700   EXPECT_TRUE(watcher->ExpectNoEvent(absl::Seconds(4)));
2701   // Check metric data.
2702   EXPECT_THAT(GetResourceCounts(),
2703               ::testing::ElementsAre(::testing::Pair(
2704                   ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
2705                                         XdsFooResourceType::Get()->type_url(),
2706                                         "requested"),
2707                   1)));
2708   // The ADS stream uses wait_for_ready inside the XdsTransport interface,
2709   // so when the channel connects, the already-started stream will proceed.
2710   stream->CompleteSendMessageFromClient();
2711   // Server does NOT send a response.
2712   // Watcher should see a does-not-exist event.
2713   EXPECT_TRUE(watcher->WaitForDoesNotExist(absl::Seconds(4)));
2714   // Check metric data.
2715   EXPECT_THAT(GetResourceCounts(),
2716               ::testing::ElementsAre(::testing::Pair(
2717                   ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
2718                                         XdsFooResourceType::Get()->type_url(),
2719                                         "does_not_exist"),
2720                   1)));
2721   // Now server sends a response.
2722   stream->SendMessageToClient(
2723       ResponseBuilder(XdsFooResourceType::Get()->type_url())
2724           .set_version_info("1")
2725           .set_nonce("A")
2726           .AddFooResource(XdsFooResource("foo1", 6))
2727           .Serialize());
2728   // XdsClient should have delivered the response to the watcher.
2729   auto resource = watcher->WaitForNextResource();
2730   ASSERT_NE(resource, nullptr);
2731   EXPECT_EQ(resource->name, "foo1");
2732   EXPECT_EQ(resource->value, 6);
2733   // Check metric data.
2734   EXPECT_THAT(
2735       GetResourceCounts(),
2736       ::testing::ElementsAre(::testing::Pair(
2737           ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
2738                                 XdsFooResourceType::Get()->type_url(), "acked"),
2739           1)));
2740   // XdsClient should have sent an ACK message to the xDS server.
2741   request = WaitForRequest(stream.get());
2742   ASSERT_TRUE(request.has_value());
2743   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
2744                /*version_info=*/"1", /*response_nonce=*/"A",
2745                /*error_detail=*/absl::OkStatus(),
2746                /*resource_names=*/{"foo1"});
2747   stream->CompleteSendMessageFromClient();
2748   // Cancel watch.
2749   CancelFooWatch(watcher.get(), "foo1");
2750   EXPECT_TRUE(stream->Orphaned());
2751 }
2752 
2753 // In https://github.com/grpc/grpc/issues/29583, we ran into a case
2754 // where we wound up starting a timer after we had already received the
2755 // resource, thus incorrectly reporting the resource as not existing.
2756 // This happened when unsubscribing and then resubscribing to the same
2757 // resource a send_message op was already in flight and then receiving an
2758 // update containing that resource.
TEST_F(XdsClientTest,ResourceDoesNotExistUnsubscribeAndResubscribeWhileSendMessagePending)2759 TEST_F(XdsClientTest,
2760        ResourceDoesNotExistUnsubscribeAndResubscribeWhileSendMessagePending) {
2761   InitXdsClient(FakeXdsBootstrap::Builder(), Duration::Seconds(1));
2762   // Tell transport to let us manually trigger completion of the
2763   // send_message ops to XdsClient.
2764   transport_factory_->SetAutoCompleteMessagesFromClient(false);
2765   // Start a watch for "foo1".
2766   auto watcher = StartFooWatch("foo1");
2767   // Watcher should initially not see any resource reported.
2768   EXPECT_FALSE(watcher->HasEvent());
2769   // XdsClient should have created an ADS stream.
2770   auto stream = WaitForAdsStream();
2771   ASSERT_TRUE(stream != nullptr);
2772   // XdsClient should have sent a subscription request on the ADS stream.
2773   auto request = WaitForRequest(stream.get());
2774   ASSERT_TRUE(request.has_value());
2775   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
2776                /*version_info=*/"", /*response_nonce=*/"",
2777                /*error_detail=*/absl::OkStatus(),
2778                /*resource_names=*/{"foo1"});
2779   CheckRequestNode(*request);  // Should be present on the first request.
2780   stream->CompleteSendMessageFromClient();
2781   // Server sends a response.
2782   stream->SendMessageToClient(
2783       ResponseBuilder(XdsFooResourceType::Get()->type_url())
2784           .set_version_info("1")
2785           .set_nonce("A")
2786           .AddFooResource(XdsFooResource("foo1", 6))
2787           .Serialize());
2788   // XdsClient should have delivered the response to the watchers.
2789   auto resource = watcher->WaitForNextResource();
2790   ASSERT_NE(resource, nullptr);
2791   EXPECT_EQ(resource->name, "foo1");
2792   EXPECT_EQ(resource->value, 6);
2793   // Check metric data.
2794   EXPECT_THAT(metrics_reporter_->resource_updates_valid(),
2795               ::testing::ElementsAre(::testing::Pair(
2796                   ::testing::Pair(kDefaultXdsServerUrl,
2797                                   XdsFooResourceType::Get()->type_url()),
2798                   1)));
2799   EXPECT_THAT(metrics_reporter_->resource_updates_invalid(),
2800               ::testing::ElementsAre());
2801   EXPECT_THAT(
2802       GetResourceCounts(),
2803       ::testing::ElementsAre(::testing::Pair(
2804           ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
2805                                 XdsFooResourceType::Get()->type_url(), "acked"),
2806           1)));
2807   // XdsClient should have sent an ACK message to the xDS server.
2808   request = WaitForRequest(stream.get());
2809   ASSERT_TRUE(request.has_value());
2810   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
2811                /*version_info=*/"1", /*response_nonce=*/"A",
2812                /*error_detail=*/absl::OkStatus(),
2813                /*resource_names=*/{"foo1"});
2814   stream->CompleteSendMessageFromClient();
2815   // Start a watch for a second resource.
2816   auto watcher2 = StartFooWatch("foo2");
2817   // Watcher should initially not see any resource reported.
2818   EXPECT_FALSE(watcher2->HasEvent());
2819   // Check metric data.
2820   EXPECT_THAT(
2821       GetResourceCounts(),
2822       ::testing::ElementsAre(
2823           ::testing::Pair(ResourceCountLabelsEq(
2824                               XdsClient::kOldStyleAuthority,
2825                               XdsFooResourceType::Get()->type_url(), "acked"),
2826                           1),
2827           ::testing::Pair(
2828               ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
2829                                     XdsFooResourceType::Get()->type_url(),
2830                                     "requested"),
2831               1)));
2832   // XdsClient sends a request to subscribe to the new resource.
2833   request = WaitForRequest(stream.get());
2834   ASSERT_TRUE(request.has_value());
2835   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
2836                /*version_info=*/"1", /*response_nonce=*/"A",
2837                /*error_detail=*/absl::OkStatus(),
2838                /*resource_names=*/{"foo1", "foo2"});
2839   // NOTE: We do NOT yet tell the XdsClient that the send_message op is
2840   // complete.
2841   // Unsubscribe from foo1 and then re-subscribe to it.
2842   CancelFooWatch(watcher.get(), "foo1");
2843   EXPECT_THAT(GetResourceCounts(),
2844               ::testing::ElementsAre(::testing::Pair(
2845                   ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
2846                                         XdsFooResourceType::Get()->type_url(),
2847                                         "requested"),
2848                   1)));
2849   watcher = StartFooWatch("foo1");
2850   EXPECT_THAT(GetResourceCounts(),
2851               ::testing::ElementsAre(::testing::Pair(
2852                   ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
2853                                         XdsFooResourceType::Get()->type_url(),
2854                                         "requested"),
2855                   2)));
2856   // Now send a response from the server containing both foo1 and foo2.
2857   stream->SendMessageToClient(
2858       ResponseBuilder(XdsFooResourceType::Get()->type_url())
2859           .set_version_info("1")
2860           .set_nonce("B")
2861           .AddFooResource(XdsFooResource("foo1", 6))
2862           .AddFooResource(XdsFooResource("foo2", 7))
2863           .Serialize());
2864   // The watcher for foo1 will receive an update even if the resource
2865   // has not changed, since the previous value was removed from the
2866   // cache when we unsubscribed.
2867   resource = watcher->WaitForNextResource();
2868   ASSERT_NE(resource, nullptr);
2869   EXPECT_EQ(resource->name, "foo1");
2870   EXPECT_EQ(resource->value, 6);
2871   // For foo2, the watcher should receive notification for the new resource.
2872   resource = watcher2->WaitForNextResource();
2873   ASSERT_NE(resource, nullptr);
2874   EXPECT_EQ(resource->name, "foo2");
2875   EXPECT_EQ(resource->value, 7);
2876   // Check metric data.
2877   EXPECT_THAT(metrics_reporter_->resource_updates_valid(),
2878               ::testing::ElementsAre(::testing::Pair(
2879                   ::testing::Pair(kDefaultXdsServerUrl,
2880                                   XdsFooResourceType::Get()->type_url()),
2881                   3)));
2882   EXPECT_THAT(metrics_reporter_->resource_updates_invalid(),
2883               ::testing::ElementsAre());
2884   EXPECT_THAT(
2885       GetResourceCounts(),
2886       ::testing::ElementsAre(::testing::Pair(
2887           ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
2888                                 XdsFooResourceType::Get()->type_url(), "acked"),
2889           2)));
2890   // Now we finally tell XdsClient that its previous send_message op is
2891   // complete.
2892   stream->CompleteSendMessageFromClient();
2893   // XdsClient should send an ACK with the updated subscription list
2894   // (which happens to be identical to the old list), and it should not
2895   // restart the does-not-exist timer.
2896   request = WaitForRequest(stream.get());
2897   ASSERT_TRUE(request.has_value());
2898   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
2899                /*version_info=*/"1", /*response_nonce=*/"B",
2900                /*error_detail=*/absl::OkStatus(),
2901                /*resource_names=*/{"foo1", "foo2"});
2902   stream->CompleteSendMessageFromClient();
2903   // Make sure the watcher for foo1 does not see a does-not-exist event.
2904   EXPECT_TRUE(watcher->ExpectNoEvent(absl::Seconds(5)));
2905   // Cancel watches.
2906   CancelFooWatch(watcher.get(), "foo1", /*delay_unsubscription=*/true);
2907   CancelFooWatch(watcher2.get(), "foo2");
2908   EXPECT_TRUE(stream->Orphaned());
2909 }
2910 
TEST_F(XdsClientTest,DoNotSendDoesNotExistForCachedResource)2911 TEST_F(XdsClientTest, DoNotSendDoesNotExistForCachedResource) {
2912   // Lower resources-does-not-exist timeout, to make sure that we're not
2913   // triggering that here.
2914   InitXdsClient(FakeXdsBootstrap::Builder(), Duration::Seconds(3));
2915   // Start a watch for "foo1".
2916   auto watcher = StartFooWatch("foo1");
2917   // Watcher should initially not see any resource reported.
2918   EXPECT_FALSE(watcher->HasEvent());
2919   // XdsClient should have created an ADS stream.
2920   auto stream = WaitForAdsStream();
2921   ASSERT_TRUE(stream != nullptr);
2922   // XdsClient should have sent a subscription request on the ADS stream.
2923   auto request = WaitForRequest(stream.get());
2924   ASSERT_TRUE(request.has_value());
2925   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
2926                /*version_info=*/"", /*response_nonce=*/"",
2927                /*error_detail=*/absl::OkStatus(),
2928                /*resource_names=*/{"foo1"});
2929   CheckRequestNode(*request);  // Should be present on the first request.
2930   // Server sends a response.
2931   stream->SendMessageToClient(
2932       ResponseBuilder(XdsFooResourceType::Get()->type_url())
2933           .set_version_info("1")
2934           .set_nonce("A")
2935           .AddFooResource(XdsFooResource("foo1", 6))
2936           .Serialize());
2937   // XdsClient should have delivered the response to the watcher.
2938   auto resource = watcher->WaitForNextResource();
2939   ASSERT_NE(resource, nullptr);
2940   EXPECT_EQ(resource->name, "foo1");
2941   EXPECT_EQ(resource->value, 6);
2942   // Check metric data.
2943   EXPECT_THAT(metrics_reporter_->resource_updates_valid(),
2944               ::testing::ElementsAre(::testing::Pair(
2945                   ::testing::Pair(kDefaultXdsServerUrl,
2946                                   XdsFooResourceType::Get()->type_url()),
2947                   1)));
2948   EXPECT_THAT(metrics_reporter_->resource_updates_invalid(),
2949               ::testing::ElementsAre());
2950   EXPECT_THAT(
2951       GetResourceCounts(),
2952       ::testing::ElementsAre(::testing::Pair(
2953           ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
2954                                 XdsFooResourceType::Get()->type_url(), "acked"),
2955           1)));
2956   // XdsClient should have sent an ACK message to the xDS server.
2957   request = WaitForRequest(stream.get());
2958   ASSERT_TRUE(request.has_value());
2959   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
2960                /*version_info=*/"1", /*response_nonce=*/"A",
2961                /*error_detail=*/absl::OkStatus(),
2962                /*resource_names=*/{"foo1"});
2963   // Stream fails because of transport disconnection.
2964   stream->MaybeSendStatusToClient(absl::UnavailableError("connection failed"));
2965   // XdsClient should NOT report error to watcher, because we saw a
2966   // response on the stream before it failed.
2967   // XdsClient creates a new stream.
2968   stream = WaitForAdsStream();
2969   ASSERT_TRUE(stream != nullptr);
2970   // XdsClient should have sent a subscription request on the ADS stream.
2971   request = WaitForRequest(stream.get());
2972   ASSERT_TRUE(request.has_value());
2973   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
2974                /*version_info=*/"1", /*response_nonce=*/"",
2975                /*error_detail=*/absl::OkStatus(),
2976                /*resource_names=*/{"foo1"});
2977   CheckRequestNode(*request);  // Should be present on the first request.
2978   // Server does NOT send a response.
2979   // We should not see a resource-does-not-exist event, because the
2980   // resource was already cached, so the server can optimize by not
2981   // resending it.
2982   EXPECT_TRUE(watcher->ExpectNoEvent(absl::Seconds(4)));
2983   // Check metric data.
2984   EXPECT_THAT(metrics_reporter_->resource_updates_valid(),
2985               ::testing::ElementsAre(::testing::Pair(
2986                   ::testing::Pair(kDefaultXdsServerUrl,
2987                                   XdsFooResourceType::Get()->type_url()),
2988                   1)));
2989   EXPECT_THAT(metrics_reporter_->resource_updates_invalid(),
2990               ::testing::ElementsAre());
2991   EXPECT_THAT(
2992       GetResourceCounts(),
2993       ::testing::ElementsAre(::testing::Pair(
2994           ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
2995                                 XdsFooResourceType::Get()->type_url(), "acked"),
2996           1)));
2997   // Now server sends a response.
2998   stream->SendMessageToClient(
2999       ResponseBuilder(XdsFooResourceType::Get()->type_url())
3000           .set_version_info("1")
3001           .set_nonce("A")
3002           .AddFooResource(XdsFooResource("foo1", 6))
3003           .Serialize());
3004   // Watcher will not see any update, since the resource is unchanged.
3005   EXPECT_TRUE(watcher->ExpectNoEvent(absl::Seconds(1)));
3006   // Check metric data.
3007   EXPECT_THAT(metrics_reporter_->resource_updates_valid(),
3008               ::testing::ElementsAre(::testing::Pair(
3009                   ::testing::Pair(kDefaultXdsServerUrl,
3010                                   XdsFooResourceType::Get()->type_url()),
3011                   2)));
3012   EXPECT_THAT(metrics_reporter_->resource_updates_invalid(),
3013               ::testing::ElementsAre());
3014   EXPECT_THAT(
3015       GetResourceCounts(),
3016       ::testing::ElementsAre(::testing::Pair(
3017           ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
3018                                 XdsFooResourceType::Get()->type_url(), "acked"),
3019           1)));
3020   EXPECT_THAT(GetServerConnections(), ::testing::ElementsAre(::testing::Pair(
3021                                           kDefaultXdsServerUrl, true)));
3022   // XdsClient should have sent an ACK message to the xDS server.
3023   request = WaitForRequest(stream.get());
3024   ASSERT_TRUE(request.has_value());
3025   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
3026                /*version_info=*/"1", /*response_nonce=*/"A",
3027                /*error_detail=*/absl::OkStatus(),
3028                /*resource_names=*/{"foo1"});
3029   // Cancel watch.
3030   CancelFooWatch(watcher.get(), "foo1");
3031   EXPECT_TRUE(stream->Orphaned());
3032 }
3033 
TEST_F(XdsClientTest,ResourceWrappedInResourceMessage)3034 TEST_F(XdsClientTest, ResourceWrappedInResourceMessage) {
3035   InitXdsClient();
3036   // Start a watch for "foo1".
3037   auto watcher = StartFooWatch("foo1");
3038   // Watcher should initially not see any resource reported.
3039   EXPECT_FALSE(watcher->HasEvent());
3040   // XdsClient should have created an ADS stream.
3041   auto stream = WaitForAdsStream();
3042   ASSERT_TRUE(stream != nullptr);
3043   // XdsClient should have sent a subscription request on the ADS stream.
3044   auto request = WaitForRequest(stream.get());
3045   ASSERT_TRUE(request.has_value());
3046   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
3047                /*version_info=*/"", /*response_nonce=*/"",
3048                /*error_detail=*/absl::OkStatus(),
3049                /*resource_names=*/{"foo1"});
3050   CheckRequestNode(*request);  // Should be present on the first request.
3051   // Send a response with the resource wrapped in a Resource message.
3052   stream->SendMessageToClient(
3053       ResponseBuilder(XdsFooResourceType::Get()->type_url())
3054           .set_version_info("1")
3055           .set_nonce("A")
3056           .AddFooResource(XdsFooResource("foo1", 6),
3057                           /*in_resource_wrapper=*/true)
3058           .Serialize());
3059   // XdsClient should have delivered the response to the watcher.
3060   auto resource = watcher->WaitForNextResource();
3061   ASSERT_NE(resource, nullptr);
3062   EXPECT_EQ(resource->name, "foo1");
3063   EXPECT_EQ(resource->value, 6);
3064   // Check metric data.
3065   EXPECT_THAT(metrics_reporter_->resource_updates_valid(),
3066               ::testing::ElementsAre(::testing::Pair(
3067                   ::testing::Pair(kDefaultXdsServerUrl,
3068                                   XdsFooResourceType::Get()->type_url()),
3069                   1)));
3070   EXPECT_THAT(metrics_reporter_->resource_updates_invalid(),
3071               ::testing::ElementsAre());
3072   EXPECT_THAT(
3073       GetResourceCounts(),
3074       ::testing::ElementsAre(::testing::Pair(
3075           ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
3076                                 XdsFooResourceType::Get()->type_url(), "acked"),
3077           1)));
3078   // XdsClient should have sent an ACK message to the xDS server.
3079   request = WaitForRequest(stream.get());
3080   ASSERT_TRUE(request.has_value());
3081   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
3082                /*version_info=*/"1", /*response_nonce=*/"A",
3083                /*error_detail=*/absl::OkStatus(),
3084                /*resource_names=*/{"foo1"});
3085   // Cancel watch.
3086   CancelFooWatch(watcher.get(), "foo1");
3087   EXPECT_TRUE(stream->Orphaned());
3088 }
3089 
TEST_F(XdsClientTest,MultipleResourceTypes)3090 TEST_F(XdsClientTest, MultipleResourceTypes) {
3091   InitXdsClient();
3092   // Start a watch for "foo1".
3093   auto watcher = StartFooWatch("foo1");
3094   // Watcher should initially not see any resource reported.
3095   EXPECT_FALSE(watcher->HasEvent());
3096   // XdsClient should have created an ADS stream.
3097   auto stream = WaitForAdsStream();
3098   ASSERT_TRUE(stream != nullptr);
3099   // XdsClient should have sent a subscription request on the ADS stream.
3100   auto request = WaitForRequest(stream.get());
3101   ASSERT_TRUE(request.has_value());
3102   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
3103                /*version_info=*/"", /*response_nonce=*/"",
3104                /*error_detail=*/absl::OkStatus(),
3105                /*resource_names=*/{"foo1"});
3106   CheckRequestNode(*request);  // Should be present on the first request.
3107   // Send a response.
3108   stream->SendMessageToClient(
3109       ResponseBuilder(XdsFooResourceType::Get()->type_url())
3110           .set_version_info("1")
3111           .set_nonce("A")
3112           .AddFooResource(XdsFooResource("foo1", 6))
3113           .Serialize());
3114   // XdsClient should have delivered the response to the watcher.
3115   auto resource = watcher->WaitForNextResource();
3116   ASSERT_NE(resource, nullptr);
3117   EXPECT_EQ(resource->name, "foo1");
3118   EXPECT_EQ(resource->value, 6);
3119   // Check metric data.
3120   EXPECT_THAT(metrics_reporter_->resource_updates_valid(),
3121               ::testing::ElementsAre(::testing::Pair(
3122                   ::testing::Pair(kDefaultXdsServerUrl,
3123                                   XdsFooResourceType::Get()->type_url()),
3124                   1)));
3125   EXPECT_THAT(metrics_reporter_->resource_updates_invalid(),
3126               ::testing::ElementsAre());
3127   EXPECT_THAT(
3128       GetResourceCounts(),
3129       ::testing::ElementsAre(::testing::Pair(
3130           ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
3131                                 XdsFooResourceType::Get()->type_url(), "acked"),
3132           1)));
3133   // XdsClient should have sent an ACK message to the xDS server.
3134   request = WaitForRequest(stream.get());
3135   ASSERT_TRUE(request.has_value());
3136   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
3137                /*version_info=*/"1", /*response_nonce=*/"A",
3138                /*error_detail=*/absl::OkStatus(),
3139                /*resource_names=*/{"foo1"});
3140   // Start a watch for "bar1".
3141   auto watcher2 = StartBarWatch("bar1");
3142   // XdsClient should have sent a subscription request on the ADS stream.
3143   // Note that version and nonce here do NOT use the values for Foo,
3144   // since each resource type has its own state.
3145   request = WaitForRequest(stream.get());
3146   ASSERT_TRUE(request.has_value());
3147   CheckRequest(*request, XdsBarResourceType::Get()->type_url(),
3148                /*version_info=*/"", /*response_nonce=*/"",
3149                /*error_detail=*/absl::OkStatus(),
3150                /*resource_names=*/{"bar1"});
3151   // Send a response.
3152   stream->SendMessageToClient(
3153       ResponseBuilder(XdsBarResourceType::Get()->type_url())
3154           .set_version_info("2")
3155           .set_nonce("B")
3156           .AddBarResource(XdsBarResource("bar1", "whee"))
3157           .Serialize());
3158   // XdsClient should have delivered the response to the watcher.
3159   auto resource2 = watcher2->WaitForNextResource();
3160   ASSERT_NE(resource, nullptr);
3161   EXPECT_EQ(resource2->name, "bar1");
3162   EXPECT_EQ(resource2->value, "whee");
3163   // Check metric data.
3164   EXPECT_THAT(metrics_reporter_->resource_updates_valid(),
3165               ::testing::ElementsAre(
3166                   ::testing::Pair(
3167                       ::testing::Pair(kDefaultXdsServerUrl,
3168                                       XdsBarResourceType::Get()->type_url()),
3169                       1),
3170                   ::testing::Pair(
3171                       ::testing::Pair(kDefaultXdsServerUrl,
3172                                       XdsFooResourceType::Get()->type_url()),
3173                       1)));
3174   EXPECT_THAT(metrics_reporter_->resource_updates_invalid(),
3175               ::testing::ElementsAre());
3176   EXPECT_THAT(
3177       GetResourceCounts(),
3178       ::testing::UnorderedElementsAre(
3179           ::testing::Pair(ResourceCountLabelsEq(
3180                               XdsClient::kOldStyleAuthority,
3181                               XdsBarResourceType::Get()->type_url(), "acked"),
3182                           1),
3183           ::testing::Pair(ResourceCountLabelsEq(
3184                               XdsClient::kOldStyleAuthority,
3185                               XdsFooResourceType::Get()->type_url(), "acked"),
3186                           1)));
3187   // XdsClient should have sent an ACK message to the xDS server.
3188   request = WaitForRequest(stream.get());
3189   ASSERT_TRUE(request.has_value());
3190   CheckRequest(*request, XdsBarResourceType::Get()->type_url(),
3191                /*version_info=*/"2", /*response_nonce=*/"B",
3192                /*error_detail=*/absl::OkStatus(),
3193                /*resource_names=*/{"bar1"});
3194   // Cancel watch for "foo1".
3195   CancelFooWatch(watcher.get(), "foo1");
3196   // XdsClient should send an unsubscription request.
3197   request = WaitForRequest(stream.get());
3198   ASSERT_TRUE(request.has_value());
3199   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
3200                /*version_info=*/"1", /*response_nonce=*/"A",
3201                /*error_detail=*/absl::OkStatus(), /*resource_names=*/{});
3202   // Now cancel watch for "bar1".
3203   CancelBarWatch(watcher2.get(), "bar1");
3204   EXPECT_TRUE(stream->Orphaned());
3205 }
3206 
TEST_F(XdsClientTest,Federation)3207 TEST_F(XdsClientTest, Federation) {
3208   constexpr char kAuthority[] = "xds.example.com";
3209   const std::string kXdstpResourceName = absl::StrCat(
3210       "xdstp://", kAuthority, "/", XdsFooResource::TypeUrl(), "/foo2");
3211   FakeXdsBootstrap::FakeXdsServer authority_server("other_xds_server");
3212   FakeXdsBootstrap::FakeAuthority authority;
3213   authority.set_server(authority_server);
3214   InitXdsClient(
3215       FakeXdsBootstrap::Builder().AddAuthority(kAuthority, authority));
3216   // Metrics should initially be empty.
3217   EXPECT_THAT(metrics_reporter_->resource_updates_valid(),
3218               ::testing::ElementsAre());
3219   EXPECT_THAT(metrics_reporter_->resource_updates_invalid(),
3220               ::testing::ElementsAre());
3221   EXPECT_THAT(GetResourceCounts(), ::testing::ElementsAre());
3222   EXPECT_THAT(GetServerConnections(), ::testing::ElementsAre());
3223   // Start a watch for "foo1".
3224   auto watcher = StartFooWatch("foo1");
3225   // Watcher should initially not see any resource reported.
3226   EXPECT_FALSE(watcher->HasEvent());
3227   // XdsClient should have created an ADS stream to the top-level xDS server.
3228   auto stream = WaitForAdsStream(*xds_client_->bootstrap().servers().front());
3229   ASSERT_TRUE(stream != nullptr);
3230   // XdsClient should have sent a subscription request on the ADS stream.
3231   auto request = WaitForRequest(stream.get());
3232   ASSERT_TRUE(request.has_value());
3233   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
3234                /*version_info=*/"", /*response_nonce=*/"",
3235                /*error_detail=*/absl::OkStatus(),
3236                /*resource_names=*/{"foo1"});
3237   CheckRequestNode(*request);  // Should be present on the first request.
3238   // Send a response.
3239   stream->SendMessageToClient(
3240       ResponseBuilder(XdsFooResourceType::Get()->type_url())
3241           .set_version_info("1")
3242           .set_nonce("A")
3243           .AddFooResource(XdsFooResource("foo1", 6))
3244           .Serialize());
3245   // XdsClient should have delivered the response to the watcher.
3246   auto resource = watcher->WaitForNextResource();
3247   ASSERT_NE(resource, nullptr);
3248   EXPECT_EQ(resource->name, "foo1");
3249   EXPECT_EQ(resource->value, 6);
3250   // Check metric data.
3251   EXPECT_THAT(metrics_reporter_->resource_updates_valid(),
3252               ::testing::ElementsAre(::testing::Pair(
3253                   ::testing::Pair(kDefaultXdsServerUrl,
3254                                   XdsFooResourceType::Get()->type_url()),
3255                   1)));
3256   EXPECT_THAT(metrics_reporter_->resource_updates_invalid(),
3257               ::testing::ElementsAre());
3258   EXPECT_THAT(
3259       GetResourceCounts(),
3260       ::testing::ElementsAre(::testing::Pair(
3261           ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
3262                                 XdsFooResourceType::Get()->type_url(), "acked"),
3263           1)));
3264   EXPECT_THAT(GetServerConnections(), ::testing::ElementsAre(::testing::Pair(
3265                                           kDefaultXdsServerUrl, true)));
3266   // XdsClient should have sent an ACK message to the xDS server.
3267   request = WaitForRequest(stream.get());
3268   ASSERT_TRUE(request.has_value());
3269   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
3270                /*version_info=*/"1", /*response_nonce=*/"A",
3271                /*error_detail=*/absl::OkStatus(),
3272                /*resource_names=*/{"foo1"});
3273   // Start a watch for the xdstp resource name.
3274   auto watcher2 = StartFooWatch(kXdstpResourceName);
3275   // Watcher should initially not see any resource reported.
3276   EXPECT_FALSE(watcher2->HasEvent());
3277   // Check metric data.
3278   EXPECT_THAT(
3279       GetResourceCounts(),
3280       ::testing::ElementsAre(
3281           ::testing::Pair(ResourceCountLabelsEq(
3282                               XdsClient::kOldStyleAuthority,
3283                               XdsFooResourceType::Get()->type_url(), "acked"),
3284                           1),
3285           ::testing::Pair(ResourceCountLabelsEq(
3286                               kAuthority, XdsFooResourceType::Get()->type_url(),
3287                               "requested"),
3288                           1)));
3289   EXPECT_THAT(GetServerConnections(),
3290               ::testing::ElementsAre(
3291                   ::testing::Pair(kDefaultXdsServerUrl, true),
3292                   ::testing::Pair(authority_server.server_uri(), true)));
3293   // XdsClient will create a new stream to the server for this authority.
3294   auto stream2 = WaitForAdsStream(authority_server);
3295   ASSERT_TRUE(stream2 != nullptr);
3296   // XdsClient should have sent a subscription request on the ADS stream.
3297   // Note that version and nonce here do NOT use the values for Foo,
3298   // since each authority has its own state.
3299   request = WaitForRequest(stream2.get());
3300   ASSERT_TRUE(request.has_value());
3301   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
3302                /*version_info=*/"", /*response_nonce=*/"",
3303                /*error_detail=*/absl::OkStatus(),
3304                /*resource_names=*/{kXdstpResourceName});
3305   CheckRequestNode(*request);  // Should be present on the first request.
3306   // Send a response.
3307   stream2->SendMessageToClient(
3308       ResponseBuilder(XdsFooResourceType::Get()->type_url())
3309           .set_version_info("2")
3310           .set_nonce("B")
3311           .AddFooResource(XdsFooResource(kXdstpResourceName, 3))
3312           .Serialize());
3313   // XdsClient should have delivered the response to the watcher.
3314   resource = watcher2->WaitForNextResource();
3315   ASSERT_NE(resource, nullptr);
3316   EXPECT_EQ(resource->name, kXdstpResourceName);
3317   EXPECT_EQ(resource->value, 3);
3318   // Check metric data.
3319   EXPECT_THAT(metrics_reporter_->resource_updates_valid(),
3320               ::testing::ElementsAre(
3321                   ::testing::Pair(
3322                       ::testing::Pair(kDefaultXdsServerUrl,
3323                                       XdsFooResourceType::Get()->type_url()),
3324                       1),
3325                   ::testing::Pair(
3326                       ::testing::Pair(authority_server.server_uri(),
3327                                       XdsFooResourceType::Get()->type_url()),
3328                       1)));
3329   EXPECT_THAT(metrics_reporter_->resource_updates_invalid(),
3330               ::testing::ElementsAre());
3331   EXPECT_THAT(
3332       GetResourceCounts(),
3333       ::testing::ElementsAre(
3334           ::testing::Pair(ResourceCountLabelsEq(
3335                               XdsClient::kOldStyleAuthority,
3336                               XdsFooResourceType::Get()->type_url(), "acked"),
3337                           1),
3338           ::testing::Pair(
3339               ResourceCountLabelsEq(
3340                   kAuthority, XdsFooResourceType::Get()->type_url(), "acked"),
3341               1)));
3342   EXPECT_THAT(GetServerConnections(),
3343               ::testing::ElementsAre(
3344                   ::testing::Pair(kDefaultXdsServerUrl, true),
3345                   ::testing::Pair(authority_server.server_uri(), true)));
3346   // XdsClient should have sent an ACK message to the xDS server.
3347   request = WaitForRequest(stream2.get());
3348   ASSERT_TRUE(request.has_value());
3349   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
3350                /*version_info=*/"2", /*response_nonce=*/"B",
3351                /*error_detail=*/absl::OkStatus(),
3352                /*resource_names=*/{kXdstpResourceName});
3353   // Cancel watch for "foo1".
3354   CancelFooWatch(watcher.get(), "foo1");
3355   EXPECT_TRUE(stream->Orphaned());
3356   // Now cancel watch for xdstp resource name.
3357   CancelFooWatch(watcher2.get(), kXdstpResourceName);
3358   EXPECT_TRUE(stream2->Orphaned());
3359 }
3360 
TEST_F(XdsClientTest,FederationAuthorityDefaultsToTopLevelXdsServer)3361 TEST_F(XdsClientTest, FederationAuthorityDefaultsToTopLevelXdsServer) {
3362   constexpr char kAuthority[] = "xds.example.com";
3363   const std::string kXdstpResourceName = absl::StrCat(
3364       "xdstp://", kAuthority, "/", XdsFooResource::TypeUrl(), "/foo2");
3365   // Authority does not specify any xDS servers, so XdsClient will use
3366   // the top-level xDS server in the bootstrap config for this authority.
3367   InitXdsClient(FakeXdsBootstrap::Builder().AddAuthority(
3368       kAuthority, FakeXdsBootstrap::FakeAuthority()));
3369   // Start a watch for "foo1".
3370   auto watcher = StartFooWatch("foo1");
3371   // Watcher should initially not see any resource reported.
3372   EXPECT_FALSE(watcher->HasEvent());
3373   // XdsClient should have created an ADS stream to the top-level xDS server.
3374   auto stream = WaitForAdsStream(*xds_client_->bootstrap().servers().front());
3375   ASSERT_TRUE(stream != nullptr);
3376   // XdsClient should have sent a subscription request on the ADS stream.
3377   auto request = WaitForRequest(stream.get());
3378   ASSERT_TRUE(request.has_value());
3379   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
3380                /*version_info=*/"", /*response_nonce=*/"",
3381                /*error_detail=*/absl::OkStatus(),
3382                /*resource_names=*/{"foo1"});
3383   CheckRequestNode(*request);  // Should be present on the first request.
3384   // Send a response.
3385   stream->SendMessageToClient(
3386       ResponseBuilder(XdsFooResourceType::Get()->type_url())
3387           .set_version_info("1")
3388           .set_nonce("A")
3389           .AddFooResource(XdsFooResource("foo1", 6))
3390           .Serialize());
3391   // XdsClient should have delivered the response to the watcher.
3392   auto resource = watcher->WaitForNextResource();
3393   ASSERT_NE(resource, nullptr);
3394   EXPECT_EQ(resource->name, "foo1");
3395   EXPECT_EQ(resource->value, 6);
3396   // Check metric data.
3397   EXPECT_THAT(metrics_reporter_->resource_updates_valid(),
3398               ::testing::ElementsAre(::testing::Pair(
3399                   ::testing::Pair(kDefaultXdsServerUrl,
3400                                   XdsFooResourceType::Get()->type_url()),
3401                   1)));
3402   EXPECT_THAT(metrics_reporter_->resource_updates_invalid(),
3403               ::testing::ElementsAre());
3404   EXPECT_THAT(
3405       GetResourceCounts(),
3406       ::testing::ElementsAre(::testing::Pair(
3407           ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
3408                                 XdsFooResourceType::Get()->type_url(), "acked"),
3409           1)));
3410   EXPECT_THAT(GetServerConnections(), ::testing::ElementsAre(::testing::Pair(
3411                                           kDefaultXdsServerUrl, true)));
3412   // XdsClient should have sent an ACK message to the xDS server.
3413   request = WaitForRequest(stream.get());
3414   ASSERT_TRUE(request.has_value());
3415   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
3416                /*version_info=*/"1", /*response_nonce=*/"A",
3417                /*error_detail=*/absl::OkStatus(),
3418                /*resource_names=*/{"foo1"});
3419   // Start a watch for the xdstp resource name.
3420   auto watcher2 = StartFooWatch(kXdstpResourceName);
3421   // Watcher should initially not see any resource reported.
3422   EXPECT_FALSE(watcher2->HasEvent());
3423   // XdsClient will send a subscription request on the ADS stream that
3424   // includes both resources, since both are being obtained from the
3425   // same server.
3426   request = WaitForRequest(stream.get());
3427   ASSERT_TRUE(request.has_value());
3428   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
3429                /*version_info=*/"1", /*response_nonce=*/"A",
3430                /*error_detail=*/absl::OkStatus(),
3431                /*resource_names=*/{"foo1", kXdstpResourceName});
3432   // Send a response.
3433   stream->SendMessageToClient(
3434       ResponseBuilder(XdsFooResourceType::Get()->type_url())
3435           .set_version_info("2")
3436           .set_nonce("B")
3437           .AddFooResource(XdsFooResource(kXdstpResourceName, 3))
3438           .Serialize());
3439   // XdsClient should have delivered the response to the watcher.
3440   resource = watcher2->WaitForNextResource();
3441   ASSERT_NE(resource, nullptr);
3442   EXPECT_EQ(resource->name, kXdstpResourceName);
3443   EXPECT_EQ(resource->value, 3);
3444   // Check metric data.
3445   EXPECT_THAT(metrics_reporter_->resource_updates_valid(),
3446               ::testing::ElementsAre(::testing::Pair(
3447                   ::testing::Pair(kDefaultXdsServerUrl,
3448                                   XdsFooResourceType::Get()->type_url()),
3449                   2)));
3450   EXPECT_THAT(metrics_reporter_->resource_updates_invalid(),
3451               ::testing::ElementsAre());
3452   EXPECT_THAT(
3453       GetResourceCounts(),
3454       ::testing::ElementsAre(
3455           ::testing::Pair(ResourceCountLabelsEq(
3456                               XdsClient::kOldStyleAuthority,
3457                               XdsFooResourceType::Get()->type_url(), "acked"),
3458                           1),
3459           ::testing::Pair(
3460               ResourceCountLabelsEq(
3461                   kAuthority, XdsFooResourceType::Get()->type_url(), "acked"),
3462               1)));
3463   EXPECT_THAT(GetServerConnections(), ::testing::ElementsAre(::testing::Pair(
3464                                           kDefaultXdsServerUrl, true)));
3465   // XdsClient should have sent an ACK message to the xDS server.
3466   request = WaitForRequest(stream.get());
3467   ASSERT_TRUE(request.has_value());
3468   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
3469                /*version_info=*/"2", /*response_nonce=*/"B",
3470                /*error_detail=*/absl::OkStatus(),
3471                /*resource_names=*/{"foo1", kXdstpResourceName});
3472   // Cancel watch for "foo1".
3473   CancelFooWatch(watcher.get(), "foo1");
3474   // XdsClient should send an unsubscription request.
3475   request = WaitForRequest(stream.get());
3476   ASSERT_TRUE(request.has_value());
3477   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
3478                /*version_info=*/"2", /*response_nonce=*/"B",
3479                /*error_detail=*/absl::OkStatus(),
3480                /*resource_names=*/{kXdstpResourceName});
3481   // Now cancel watch for xdstp resource name.
3482   CancelFooWatch(watcher2.get(), kXdstpResourceName);
3483   EXPECT_TRUE(stream->Orphaned());
3484 }
3485 
// Verifies that watching an xdstp:// resource whose authority is absent
// from the bootstrap config reports an UNAVAILABLE error to the watcher.
TEST_F(XdsClientTest, FederationWithUnknownAuthority) {
  constexpr char kAuthority[] = "xds.example.com";
  const std::string kXdstpResourceName = absl::StrCat(
      "xdstp://", kAuthority, "/", XdsFooResource::TypeUrl(), "/foo2");
  // Note: Not adding authority to bootstrap config.
  InitXdsClient();
  // Start a watch for the xdstp resource name.
  auto watcher = StartFooWatch(kXdstpResourceName);
  // Watcher should immediately get an error about the unknown authority.
  auto error = watcher->WaitForNextError();
  ASSERT_TRUE(error.has_value());
  EXPECT_EQ(error->code(), absl::StatusCode::kUnavailable);
  EXPECT_EQ(error->message(),
            "authority \"xds.example.com\" not present in bootstrap config")
      << *error;
}
3502 
// Verifies that watching a malformed xdstp:// resource name reports an
// UNAVAILABLE error to the watcher saying that the name cannot be parsed.
TEST_F(XdsClientTest, FederationWithUnparseableXdstpResourceName) {
  // Use the default bootstrap config; no authorities are added.
  InitXdsClient();
  // Start a watch for a malformed xdstp resource name.
  auto watcher = StartFooWatch("xdstp://x");
  // Watcher should immediately get an error about the unparseable
  // resource name.
  auto error = watcher->WaitForNextError();
  ASSERT_TRUE(error.has_value());
  EXPECT_EQ(error->code(), absl::StatusCode::kUnavailable);
  EXPECT_EQ(error->message(), "Unable to parse resource name xdstp://x")
      << *error;
}
3515 
3516 // TODO(roth,apolcyn): remove this test when the
3517 // GRPC_EXPERIMENTAL_XDS_FEDERATION env var is removed.
TEST_F(XdsClientTest,FederationDisabledWithNewStyleName)3518 TEST_F(XdsClientTest, FederationDisabledWithNewStyleName) {
3519   testing::ScopedEnvVar env_var("GRPC_EXPERIMENTAL_XDS_FEDERATION", "false");
3520   // We will use this xdstp name, whose authority is not present in
3521   // the bootstrap config.  But since federation is not enabled, we
3522   // will treat this as an opaque old-style name, so we'll send it to
3523   // the default server.
3524   constexpr char kXdstpResourceName[] =
3525       "xdstp://xds.example.com/test.v3.foo/foo1";
3526   InitXdsClient();
3527   // Start a watch for the xdstp name.
3528   auto watcher = StartFooWatch(kXdstpResourceName);
3529   // Watcher should initially not see any resource reported.
3530   EXPECT_FALSE(watcher->HasEvent());
3531   // XdsClient should have created an ADS stream.
3532   auto stream = WaitForAdsStream();
3533   ASSERT_TRUE(stream != nullptr);
3534   // XdsClient should have sent a subscription request on the ADS stream.
3535   auto request = WaitForRequest(stream.get());
3536   ASSERT_TRUE(request.has_value());
3537   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
3538                /*version_info=*/"", /*response_nonce=*/"",
3539                /*error_detail=*/absl::OkStatus(),
3540                /*resource_names=*/{kXdstpResourceName});
3541   CheckRequestNode(*request);  // Should be present on the first request.
3542   // Send a response.
3543   stream->SendMessageToClient(
3544       ResponseBuilder(XdsFooResourceType::Get()->type_url())
3545           .set_version_info("1")
3546           .set_nonce("A")
3547           .AddFooResource(XdsFooResource(kXdstpResourceName, 6))
3548           .Serialize());
3549   // XdsClient should have delivered the response to the watcher.
3550   auto resource = watcher->WaitForNextResource();
3551   ASSERT_NE(resource, nullptr);
3552   EXPECT_EQ(resource->name, kXdstpResourceName);
3553   EXPECT_EQ(resource->value, 6);
3554   // XdsClient should have sent an ACK message to the xDS server.
3555   request = WaitForRequest(stream.get());
3556   ASSERT_TRUE(request.has_value());
3557   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
3558                /*version_info=*/"1", /*response_nonce=*/"A",
3559                /*error_detail=*/absl::OkStatus(),
3560                /*resource_names=*/{kXdstpResourceName});
3561   // Cancel watch.
3562   CancelFooWatch(watcher.get(), kXdstpResourceName);
3563   EXPECT_TRUE(stream->Orphaned());
3564 }
3565 
TEST_F(XdsClientTest,FederationChannelFailureReportedToWatchers)3566 TEST_F(XdsClientTest, FederationChannelFailureReportedToWatchers) {
3567   constexpr char kAuthority[] = "xds.example.com";
3568   const std::string kXdstpResourceName = absl::StrCat(
3569       "xdstp://", kAuthority, "/", XdsFooResource::TypeUrl(), "/foo2");
3570   FakeXdsBootstrap::FakeXdsServer authority_server("other_xds_server");
3571   FakeXdsBootstrap::FakeAuthority authority;
3572   authority.set_server(authority_server);
3573   InitXdsClient(
3574       FakeXdsBootstrap::Builder().AddAuthority(kAuthority, authority));
3575   // Start a watch for "foo1".
3576   auto watcher = StartFooWatch("foo1");
3577   // Watcher should initially not see any resource reported.
3578   EXPECT_FALSE(watcher->HasEvent());
3579   // XdsClient should have created an ADS stream to the top-level xDS server.
3580   auto stream = WaitForAdsStream(*xds_client_->bootstrap().servers().front());
3581   ASSERT_TRUE(stream != nullptr);
3582   // XdsClient should have sent a subscription request on the ADS stream.
3583   auto request = WaitForRequest(stream.get());
3584   ASSERT_TRUE(request.has_value());
3585   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
3586                /*version_info=*/"", /*response_nonce=*/"",
3587                /*error_detail=*/absl::OkStatus(),
3588                /*resource_names=*/{"foo1"});
3589   CheckRequestNode(*request);  // Should be present on the first request.
3590   // Send a response.
3591   stream->SendMessageToClient(
3592       ResponseBuilder(XdsFooResourceType::Get()->type_url())
3593           .set_version_info("1")
3594           .set_nonce("A")
3595           .AddFooResource(XdsFooResource("foo1", 6))
3596           .Serialize());
3597   // XdsClient should have delivered the response to the watcher.
3598   auto resource = watcher->WaitForNextResource();
3599   ASSERT_NE(resource, nullptr);
3600   EXPECT_EQ(resource->name, "foo1");
3601   EXPECT_EQ(resource->value, 6);
3602   // Check metric data.
3603   EXPECT_THAT(metrics_reporter_->resource_updates_valid(),
3604               ::testing::ElementsAre(::testing::Pair(
3605                   ::testing::Pair(kDefaultXdsServerUrl,
3606                                   XdsFooResourceType::Get()->type_url()),
3607                   1)));
3608   EXPECT_THAT(metrics_reporter_->resource_updates_invalid(),
3609               ::testing::ElementsAre());
3610   EXPECT_THAT(
3611       GetResourceCounts(),
3612       ::testing::ElementsAre(::testing::Pair(
3613           ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
3614                                 XdsFooResourceType::Get()->type_url(), "acked"),
3615           1)));
3616   EXPECT_THAT(GetServerConnections(), ::testing::ElementsAre(::testing::Pair(
3617                                           kDefaultXdsServerUrl, true)));
3618   // XdsClient should have sent an ACK message to the xDS server.
3619   request = WaitForRequest(stream.get());
3620   ASSERT_TRUE(request.has_value());
3621   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
3622                /*version_info=*/"1", /*response_nonce=*/"A",
3623                /*error_detail=*/absl::OkStatus(),
3624                /*resource_names=*/{"foo1"});
3625   // Start a watch for the xdstp resource name.
3626   auto watcher2 = StartFooWatch(kXdstpResourceName);
3627   // Check metric data.
3628   EXPECT_THAT(GetServerConnections(),
3629               ::testing::ElementsAre(
3630                   ::testing::Pair(kDefaultXdsServerUrl, true),
3631                   ::testing::Pair(authority_server.server_uri(), true)));
3632   // Watcher should initially not see any resource reported.
3633   EXPECT_FALSE(watcher2->HasEvent());
3634   // XdsClient will create a new stream to the server for this authority.
3635   auto stream2 = WaitForAdsStream(authority_server);
3636   ASSERT_TRUE(stream2 != nullptr);
3637   // XdsClient should have sent a subscription request on the ADS stream.
3638   // Note that version and nonce here do NOT use the values for Foo,
3639   // since each authority has its own state.
3640   request = WaitForRequest(stream2.get());
3641   ASSERT_TRUE(request.has_value());
3642   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
3643                /*version_info=*/"", /*response_nonce=*/"",
3644                /*error_detail=*/absl::OkStatus(),
3645                /*resource_names=*/{kXdstpResourceName});
3646   CheckRequestNode(*request);  // Should be present on the first request.
3647   // Send a response.
3648   stream2->SendMessageToClient(
3649       ResponseBuilder(XdsFooResourceType::Get()->type_url())
3650           .set_version_info("2")
3651           .set_nonce("B")
3652           .AddFooResource(XdsFooResource(kXdstpResourceName, 3))
3653           .Serialize());
3654   // XdsClient should have delivered the response to the watcher.
3655   resource = watcher2->WaitForNextResource();
3656   ASSERT_NE(resource, nullptr);
3657   EXPECT_EQ(resource->name, kXdstpResourceName);
3658   EXPECT_EQ(resource->value, 3);
3659   // Check metric data.
3660   EXPECT_THAT(metrics_reporter_->resource_updates_valid(),
3661               ::testing::ElementsAre(
3662                   ::testing::Pair(
3663                       ::testing::Pair(kDefaultXdsServerUrl,
3664                                       XdsFooResourceType::Get()->type_url()),
3665                       1),
3666                   ::testing::Pair(
3667                       ::testing::Pair(authority_server.server_uri(),
3668                                       XdsFooResourceType::Get()->type_url()),
3669                       1)));
3670   EXPECT_THAT(metrics_reporter_->resource_updates_invalid(),
3671               ::testing::ElementsAre());
3672   EXPECT_THAT(
3673       GetResourceCounts(),
3674       ::testing::ElementsAre(
3675           ::testing::Pair(ResourceCountLabelsEq(
3676                               XdsClient::kOldStyleAuthority,
3677                               XdsFooResourceType::Get()->type_url(), "acked"),
3678                           1),
3679           ::testing::Pair(
3680               ResourceCountLabelsEq(
3681                   kAuthority, XdsFooResourceType::Get()->type_url(), "acked"),
3682               1)));
3683   EXPECT_THAT(GetServerConnections(),
3684               ::testing::ElementsAre(
3685                   ::testing::Pair(kDefaultXdsServerUrl, true),
3686                   ::testing::Pair(authority_server.server_uri(), true)));
3687   // XdsClient should have sent an ACK message to the xDS server.
3688   request = WaitForRequest(stream2.get());
3689   ASSERT_TRUE(request.has_value());
3690   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
3691                /*version_info=*/"2", /*response_nonce=*/"B",
3692                /*error_detail=*/absl::OkStatus(),
3693                /*resource_names=*/{kXdstpResourceName});
3694   // Now cause a channel failure on the stream to the authority's xDS server.
3695   TriggerConnectionFailure(authority_server,
3696                            absl::UnavailableError("connection failed"));
3697   // The watcher for the xdstp resource name should see the error.
3698   auto error = watcher2->WaitForNextError();
3699   ASSERT_TRUE(error.has_value());
3700   EXPECT_EQ(error->code(), absl::StatusCode::kUnavailable);
3701   EXPECT_EQ(error->message(),
3702             "xDS channel for server other_xds_server: connection failed "
3703             "(node ID:xds_client_test)")
3704       << *error;
3705   // The watcher for "foo1" should not see any error.
3706   EXPECT_FALSE(watcher->HasEvent());
3707   // Check metric data.
3708   EXPECT_THAT(GetServerConnections(),
3709               ::testing::ElementsAre(
3710                   ::testing::Pair(kDefaultXdsServerUrl, true),
3711                   ::testing::Pair(authority_server.server_uri(), false)));
3712   // Cancel watch for "foo1".
3713   CancelFooWatch(watcher.get(), "foo1");
3714   EXPECT_TRUE(stream->Orphaned());
3715   // Now cancel watch for xdstp resource name.
3716   CancelFooWatch(watcher2.get(), kXdstpResourceName);
3717   EXPECT_TRUE(stream2->Orphaned());
3718 }
3719 
TEST_F(XdsClientTest,AdsReadWaitsForHandleRelease)3720 TEST_F(XdsClientTest, AdsReadWaitsForHandleRelease) {
3721   const absl::Duration timeout = absl::Seconds(5) * grpc_test_slowdown_factor();
3722   InitXdsClient();
3723   // Start watches for "foo1" and "foo2".
3724   auto watcher1 = StartFooWatch("foo1");
3725   // XdsClient should have created an ADS stream.
3726   auto stream = WaitForAdsStream();
3727   ASSERT_TRUE(stream != nullptr);
3728   // XdsClient should have sent a subscription request on the ADS stream.
3729   auto request = WaitForRequest(stream.get());
3730   ASSERT_TRUE(request.has_value());
3731   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
3732                /*version_info=*/"", /*response_nonce=*/"",
3733                /*error_detail=*/absl::OkStatus(),
3734                /*resource_names=*/{"foo1"});
3735   auto watcher2 = StartFooWatch("foo2");
3736   request = WaitForRequest(stream.get());
3737   ASSERT_TRUE(request.has_value());
3738   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
3739                /*version_info=*/"", /*response_nonce=*/"",
3740                /*error_detail=*/absl::OkStatus(),
3741                /*resource_names=*/{"foo1", "foo2"});
3742   // Send a response with 2 resources.
3743   stream->SendMessageToClient(
3744       ResponseBuilder(XdsFooResourceType::Get()->type_url())
3745           .set_version_info("1")
3746           .set_nonce("A")
3747           .AddFooResource(XdsFooResource("foo1", 6))
3748           .AddFooResource(XdsFooResource("foo2", 10))
3749           .Serialize());
3750   // Send a response with a single resource, will not be read until the handle
3751   // is released
3752   stream->SendMessageToClient(
3753       ResponseBuilder(XdsFooResourceType::Get()->type_url())
3754           .set_version_info("2")
3755           .set_nonce("B")
3756           .AddFooResource(XdsFooResource("foo1", 8))
3757           .Serialize());
3758   // XdsClient should have delivered the response to the watcher.
3759   auto resource1 = watcher1->WaitForNextResourceAndHandle();
3760   ASSERT_NE(resource1, absl::nullopt);
3761   EXPECT_EQ(resource1->resource->name, "foo1");
3762   EXPECT_EQ(resource1->resource->value, 6);
3763   auto resource2 = watcher2->WaitForNextResourceAndHandle();
3764   ASSERT_NE(resource2, absl::nullopt);
3765   EXPECT_EQ(resource2->resource->name, "foo2");
3766   EXPECT_EQ(resource2->resource->value, 10);
3767   // XdsClient should have sent an ACK message to the xDS server.
3768   request = WaitForRequest(stream.get());
3769   ASSERT_TRUE(request.has_value());
3770   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
3771                /*version_info=*/"1", /*response_nonce=*/"A",
3772                /*error_detail=*/absl::OkStatus(),
3773                /*resource_names=*/{"foo1", "foo2"});
3774   EXPECT_TRUE(stream->WaitForReadsStarted(1, timeout));
3775   resource1->read_delay_handle.reset();
3776   EXPECT_TRUE(stream->WaitForReadsStarted(1, timeout));
3777   resource2->read_delay_handle.reset();
3778   EXPECT_TRUE(stream->WaitForReadsStarted(2, timeout));
3779   resource1 = watcher1->WaitForNextResourceAndHandle();
3780   ASSERT_NE(resource1, absl::nullopt);
3781   EXPECT_EQ(resource1->resource->name, "foo1");
3782   EXPECT_EQ(resource1->resource->value, 8);
3783   EXPECT_EQ(watcher2->WaitForNextResourceAndHandle(), absl::nullopt);
3784   // XdsClient should have sent an ACK message to the xDS server.
3785   request = WaitForRequest(stream.get());
3786   ASSERT_TRUE(request.has_value());
3787   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
3788                /*version_info=*/"2", /*response_nonce=*/"B",
3789                /*error_detail=*/absl::OkStatus(),
3790                /*resource_names=*/{"foo1", "foo2"});
3791   EXPECT_TRUE(stream->WaitForReadsStarted(2, timeout));
3792   resource1->read_delay_handle.reset();
3793   EXPECT_TRUE(stream->WaitForReadsStarted(3, timeout));
3794   // Cancel watch.
3795   CancelFooWatch(watcher1.get(), "foo1");
3796   request = WaitForRequest(stream.get());
3797   ASSERT_TRUE(request.has_value());
3798   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
3799                /*version_info=*/"2", /*response_nonce=*/"B",
3800                /*error_detail=*/absl::OkStatus(),
3801                /*resource_names=*/{"foo2"});
3802   CancelFooWatch(watcher2.get(), "foo2");
3803   EXPECT_TRUE(stream->Orphaned());
3804 }
3805 
TEST_F(XdsClientTest,FallbackAndRecover)3806 TEST_F(XdsClientTest, FallbackAndRecover) {
3807   FakeXdsBootstrap::FakeXdsServer primary_server(kDefaultXdsServerUrl);
3808   FakeXdsBootstrap::FakeXdsServer fallback_server("fallback_xds_server");
3809   // Regular operation
3810   InitXdsClient(FakeXdsBootstrap::Builder().SetServers(
3811       {primary_server, fallback_server}));
3812   // Start a watch for "foo1".
3813   auto watcher = StartFooWatch("foo1");
3814   EXPECT_THAT(GetServerConnections(), ::testing::ElementsAre(::testing::Pair(
3815                                           kDefaultXdsServerUrl, true)));
3816   EXPECT_THAT(GetResourceCounts(),
3817               ::testing::ElementsAre(::testing::Pair(
3818                   ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
3819                                         XdsFooResourceType::Get()->type_url(),
3820                                         "requested"),
3821                   1)));
3822   EXPECT_THAT(metrics_reporter_->resource_updates_valid(),
3823               ::testing::IsEmpty());
3824   // XdsClient should have created an ADS stream.
3825   auto stream = WaitForAdsStream();
3826   ASSERT_TRUE(stream != nullptr);
3827   // XdsClient should have sent a subscription request on the ADS stream.
3828   auto request = WaitForRequest(stream.get());
3829   ASSERT_TRUE(request.has_value());
3830   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
3831                /*version_info=*/"", /*response_nonce=*/"",
3832                /*error_detail=*/absl::OkStatus(),
3833                /*resource_names=*/{"foo1"});
3834   // Input: Get initial response from primary server.
3835   stream->SendMessageToClient(
3836       ResponseBuilder(XdsFooResourceType::Get()->type_url())
3837           .set_version_info("20")
3838           .set_nonce("O")
3839           .AddFooResource(XdsFooResource("foo1", 6))
3840           .Serialize());
3841   // Result (local): Resource is delivered to watcher.
3842   auto resource = watcher->WaitForNextResource();
3843   ASSERT_NE(resource, nullptr);
3844   EXPECT_EQ(resource->name, "foo1");
3845   EXPECT_EQ(resource->value, 6);
3846   // Result (local): Metrics show 1 resource update and 1 cached resource.
3847   EXPECT_THAT(metrics_reporter_->resource_updates_valid(),
3848               ::testing::ElementsAre(::testing::Pair(
3849                   ::testing::Pair(kDefaultXdsServerUrl,
3850                                   XdsFooResourceType::Get()->type_url()),
3851                   1)));
3852   EXPECT_THAT(
3853       GetResourceCounts(),
3854       ::testing::ElementsAre(::testing::Pair(
3855           ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
3856                                 XdsFooResourceType::Get()->type_url(), "acked"),
3857           1)));
3858   // Result (remote): Client sends ACK to server.
3859   request = WaitForRequest(stream.get());
3860   ASSERT_TRUE(request.has_value());
3861   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
3862                /*version_info=*/"20", /*response_nonce=*/"O",
3863                /*error_detail=*/absl::OkStatus(),
3864                /*resource_names=*/{"foo1"});
3865   // Input: Trigger connection failure to primary.
3866   TriggerConnectionFailure(primary_server,
3867                            absl::UnavailableError("Server down"));
3868   // Result (local): The error is reported to the watcher.
3869   auto error = watcher->WaitForNextError();
3870   ASSERT_TRUE(error.has_value());
3871   EXPECT_EQ(error->code(), absl::StatusCode::kUnavailable);
3872   EXPECT_EQ(error->message(),
3873             "xDS channel for server default_xds_server: Server down (node "
3874             "ID:xds_client_test)");
3875   // Result (local): The metrics show the channel as being unhealthy.
3876   EXPECT_THAT(GetServerConnections(), ::testing::ElementsAre(::testing::Pair(
3877                                           kDefaultXdsServerUrl, false)));
3878   // Input: Trigger stream failure.
3879   stream->MaybeSendStatusToClient(absl::UnavailableError("Stream failure"));
3880   // Result (local): The metrics still show the channel as being unhealthy.
3881   EXPECT_THAT(GetServerConnections(), ::testing::ElementsAre(::testing::Pair(
3882                                           kDefaultXdsServerUrl, false)));
3883   // Result (remote): The client starts a new stream and sends a subscription
3884   //   message. Note that the server does not respond, so the channel will still
3885   //   have non-OK status.
3886   stream = WaitForAdsStream();
3887   ASSERT_NE(stream, nullptr);
3888   request = WaitForRequest(stream.get());
3889   ASSERT_TRUE(request.has_value());
3890   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
3891                /*version_info=*/"20", /*response_nonce=*/"",
3892                /*error_detail=*/absl::OkStatus(),
3893                /*resource_names=*/{"foo1"});
3894   // Input: Start second watch for foo1 (already cached).
3895   auto watcher_cached = StartFooWatch("foo1");
3896   // Result (local): New watcher gets the cached resource.
3897   resource = watcher_cached->WaitForNextResource();
3898   ASSERT_NE(resource, nullptr);
3899   EXPECT_EQ(resource->name, "foo1");
3900   EXPECT_EQ(resource->value, 6);
3901   // Result (local): New watcher gets the error from the channel state.
3902   error = watcher_cached->WaitForNextError();
3903   ASSERT_TRUE(error.has_value());
3904   EXPECT_EQ(error->message(),
3905             "xDS channel for server default_xds_server: Server down (node "
3906             "ID:xds_client_test)")
3907       << error->message();
3908   CancelFooWatch(watcher_cached.get(), "foo1");
3909   // Input: Start watch for foo2 (not already cached).
3910   auto watcher2 = StartFooWatch("foo2");
3911   // Result (local): Metrics show a healthy channel to the fallback server.
3912   EXPECT_THAT(GetServerConnections(),
3913               ::testing::ElementsAre(
3914                   ::testing::Pair(kDefaultXdsServerUrl, false),
3915                   ::testing::Pair(fallback_server.server_uri(), true)));
3916   // Result (remote): Client sent a new request for both resources on the
3917   //   stream to the primary.
3918   request = WaitForRequest(stream.get());
3919   ASSERT_TRUE(request.has_value());
3920   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
3921                /*version_info=*/"20", /*response_nonce=*/"",
3922                /*error_detail=*/absl::OkStatus(),
3923                /*resource_names=*/{"foo1", "foo2"});
3924   // Result (remote): Client created a stream to the fallback server and sent a
3925   //   request on that stream for both resources.
3926   auto stream2 = WaitForAdsStream(fallback_server);
3927   ASSERT_TRUE(stream2 != nullptr);
3928   request = WaitForRequest(stream2.get());
3929   ASSERT_TRUE(request.has_value());
3930   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
3931                /*version_info=*/"", /*response_nonce=*/"",
3932                /*error_detail=*/absl::OkStatus(),
3933                /*resource_names=*/{"foo1", "foo2"});
3934   // Input: Fallback server sends a response with both resources.
3935   stream2->SendMessageToClient(
3936       ResponseBuilder(XdsFooResourceType::Get()->type_url())
3937           .set_version_info("5")
3938           .set_nonce("A")
3939           .AddFooResource(XdsFooResource("foo1", 20))
3940           .AddFooResource(XdsFooResource("foo2", 30))
3941           .Serialize());
3942   // Result (local): Resources are delivered to watchers.
3943   resource = watcher->WaitForNextResource();
3944   ASSERT_NE(resource, nullptr);
3945   EXPECT_EQ(resource->name, "foo1");
3946   EXPECT_EQ(resource->value, 20);
3947   resource = watcher2->WaitForNextResource();
3948   ASSERT_NE(resource, nullptr);
3949   EXPECT_EQ(resource->name, "foo2");
3950   EXPECT_EQ(resource->value, 30);
3951   // Result (local): Metrics show an update from fallback server.
3952   EXPECT_THAT(metrics_reporter_->resource_updates_valid(),
3953               ::testing::ElementsAre(
3954                   ::testing::Pair(
3955                       ::testing::Pair(kDefaultXdsServerUrl,
3956                                       XdsFooResourceType::Get()->type_url()),
3957                       1),
3958                   ::testing::Pair(
3959                       ::testing::Pair(fallback_server.server_uri(),
3960                                       XdsFooResourceType::Get()->type_url()),
3961                       2)));
3962   EXPECT_THAT(GetServerConnections(),
3963               ::testing::ElementsAre(
3964                   ::testing::Pair(kDefaultXdsServerUrl, false),
3965                   ::testing::Pair(fallback_server.server_uri(), true)));
3966   EXPECT_THAT(
3967       GetResourceCounts(),
3968       ::testing::ElementsAre(::testing::Pair(
3969           ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
3970                                 XdsFooResourceType::Get()->type_url(), "acked"),
3971           2)));
3972   // Result (remote): Client sends ACK to fallback server.
3973   request = WaitForRequest(stream2.get());
3974   ASSERT_TRUE(request.has_value());
3975   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
3976                /*version_info=*/"5", /*response_nonce=*/"A",
3977                /*error_detail=*/absl::OkStatus(),
3978                /*resource_names=*/{"foo1", "foo2"});
3979   // Input: Primary server sends a response containing both resources.
3980   stream->SendMessageToClient(
3981       ResponseBuilder(XdsFooResourceType::Get()->type_url())
3982           .set_version_info("15")
3983           .set_nonce("B")
3984           .AddFooResource(XdsFooResource("foo1", 35))
3985           .AddFooResource(XdsFooResource("foo2", 25))
3986           .Serialize());
3987   // Result (local): Metrics show that we've closed the channel to the fallback
3988   //   server and received resource updates from the primary server.
3989   EXPECT_THAT(metrics_reporter_->resource_updates_valid(),
3990               ::testing::ElementsAre(
3991                   ::testing::Pair(
3992                       ::testing::Pair(kDefaultXdsServerUrl,
3993                                       XdsFooResourceType::Get()->type_url()),
3994                       3),
3995                   ::testing::Pair(
3996                       ::testing::Pair(fallback_server.server_uri(),
3997                                       XdsFooResourceType::Get()->type_url()),
3998                       2)));
3999   EXPECT_THAT(GetServerConnections(), ::testing::ElementsAre(::testing::Pair(
4000                                           kDefaultXdsServerUrl, true)));
4001   // Result (remote): The stream to the fallback server has been orphaned.
4002   EXPECT_TRUE(stream2->Orphaned());
4003   // Result (local): Resources are delivered to watchers.
4004   resource = watcher->WaitForNextResource();
4005   ASSERT_NE(resource, nullptr);
4006   EXPECT_EQ(resource->name, "foo1");
4007   EXPECT_EQ(resource->value, 35);
4008   resource = watcher2->WaitForNextResource();
4009   ASSERT_NE(resource, nullptr);
4010   EXPECT_EQ(resource->name, "foo2");
4011   EXPECT_EQ(resource->value, 25);
4012   // Result (remote): Client sends ACK to server.
4013   request = WaitForRequest(stream.get());
4014   ASSERT_TRUE(request.has_value());
4015   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
4016                /*version_info=*/"15", /*response_nonce=*/"B",
4017                /*error_detail=*/absl::OkStatus(),
4018                /*resource_names=*/{"foo1", "foo2"});
4019   // Clean up.
4020   CancelFooWatch(watcher.get(), "foo1", /*delay_unsubscription=*/true);
4021   CancelFooWatch(watcher2.get(), "foo2");
4022   // Result (remote): The stream to the primary server has been orphaned.
4023   EXPECT_TRUE(stream->Orphaned());
4024 }
4025 
4026 // Test for both servers being unavailable
TEST_F(XdsClientTest,FallbackReportsError)4027 TEST_F(XdsClientTest, FallbackReportsError) {
4028   FakeXdsBootstrap::FakeXdsServer primary_server(kDefaultXdsServerUrl);
4029   FakeXdsBootstrap::FakeXdsServer fallback_server("fallback_xds_server");
4030   InitXdsClient(FakeXdsBootstrap::Builder().SetServers(
4031       {primary_server, fallback_server}));
4032   auto watcher = StartFooWatch("foo1");
4033   EXPECT_THAT(GetServerConnections(), ::testing::ElementsAre(::testing::Pair(
4034                                           kDefaultXdsServerUrl, true)));
4035   auto stream = WaitForAdsStream();
4036   ASSERT_TRUE(stream != nullptr);
4037   // XdsClient should have sent a subscription request on the ADS stream.
4038   auto request = WaitForRequest(stream.get());
4039   ASSERT_TRUE(request.has_value());
4040   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
4041                /*version_info=*/"", /*response_nonce=*/"",
4042                /*error_detail=*/absl::OkStatus(),
4043                /*resource_names=*/{"foo1"});
4044   EXPECT_THAT(GetResourceCounts(),
4045               ::testing::ElementsAre(::testing::Pair(
4046                   ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
4047                                         XdsFooResourceType::Get()->type_url(),
4048                                         "requested"),
4049                   1)));
4050   TriggerConnectionFailure(primary_server,
4051                            absl::UnavailableError("Server down"));
4052   EXPECT_THAT(GetServerConnections(),
4053               ::testing::ElementsAre(
4054                   ::testing::Pair(kDefaultXdsServerUrl, false),
4055                   ::testing::Pair(fallback_server.server_uri(), true)));
4056   // Fallback happens now
4057   stream = WaitForAdsStream(fallback_server);
4058   ASSERT_NE(stream, nullptr);
4059   request = WaitForRequest(stream.get());
4060   ASSERT_TRUE(request.has_value());
4061   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
4062                /*version_info=*/"", /*response_nonce=*/"",
4063                /*error_detail=*/absl::OkStatus(),
4064                /*resource_names=*/{"foo1"});
4065   TriggerConnectionFailure(fallback_server,
4066                            absl::UnavailableError("Another server down"));
4067   EXPECT_THAT(GetServerConnections(),
4068               ::testing::ElementsAre(
4069                   ::testing::Pair(kDefaultXdsServerUrl, false),
4070                   ::testing::Pair(fallback_server.server_uri(), false)));
4071   auto error = watcher->WaitForNextError();
4072   ASSERT_TRUE(error.has_value());
4073   EXPECT_THAT(error->code(), absl::StatusCode::kUnavailable);
4074   EXPECT_EQ(error->message(),
4075             "xDS channel for server fallback_xds_server: Another server down "
4076             "(node ID:xds_client_test)")
4077       << error->message();
4078 }
4079 
TEST_F(XdsClientTest,FallbackOnStartup)4080 TEST_F(XdsClientTest, FallbackOnStartup) {
4081   FakeXdsBootstrap::FakeXdsServer primary_server;
4082   FakeXdsBootstrap::FakeXdsServer fallback_server("fallback_xds_server");
4083   // Regular operation
4084   InitXdsClient(FakeXdsBootstrap::Builder().SetServers(
4085       {primary_server, fallback_server}));
4086   // Start a watch for "foo1".
4087   auto watcher = StartFooWatch("foo1");
4088   auto primary_stream = WaitForAdsStream(primary_server);
4089   ASSERT_NE(primary_stream, nullptr);
4090   // XdsClient should have sent a subscription request on the ADS stream.
4091   auto request = WaitForRequest(primary_stream.get());
4092   ASSERT_TRUE(request.has_value());
4093   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
4094                /*version_info=*/"", /*response_nonce=*/"",
4095                /*error_detail=*/absl::OkStatus(),
4096                /*resource_names=*/{"foo1"});
4097   TriggerConnectionFailure(primary_server,
4098                            absl::UnavailableError("Primary server is down"));
4099   // XdsClient should have created an ADS stream.
4100   auto fallback_stream = WaitForAdsStream(fallback_server);
4101   ASSERT_NE(fallback_stream, nullptr);
4102   // XdsClient should have sent a subscription request on the ADS stream.
4103   request = WaitForRequest(fallback_stream.get());
4104   ASSERT_TRUE(request.has_value());
4105   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
4106                /*version_info=*/"", /*response_nonce=*/"",
4107                /*error_detail=*/absl::OkStatus(),
4108                /*resource_names=*/{"foo1"});
4109   // Send a response.
4110   fallback_stream->SendMessageToClient(
4111       ResponseBuilder(XdsFooResourceType::Get()->type_url())
4112           .set_version_info("1")
4113           .set_nonce("A")
4114           .AddFooResource(XdsFooResource("foo1", 6))
4115           .Serialize());
4116   EXPECT_THAT(GetServerConnections(),
4117               ::testing::ElementsAre(
4118                   ::testing::Pair(kDefaultXdsServerUrl, false),
4119                   ::testing::Pair(fallback_server.server_uri(), true)));
4120   // XdsClient should have delivered the response to the watcher.
4121   auto resource = watcher->WaitForNextResource();
4122   ASSERT_NE(resource, nullptr);
4123   EXPECT_EQ(resource->name, "foo1");
4124   EXPECT_EQ(resource->value, 6);
4125   // Client sends an ACK.
4126   request = WaitForRequest(fallback_stream.get());
4127   ASSERT_TRUE(request.has_value());
4128   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
4129                /*version_info=*/"1", /*response_nonce=*/"A",
4130                /*error_detail=*/absl::OkStatus(),
4131                /*resource_names=*/{"foo1"});
4132   // Recover to primary
4133   primary_stream->SendMessageToClient(
4134       ResponseBuilder(XdsFooResourceType::Get()->type_url())
4135           .set_version_info("5")
4136           .set_nonce("D")
4137           .AddFooResource(XdsFooResource("foo1", 42))
4138           .Serialize());
4139   EXPECT_TRUE(fallback_stream->Orphaned());
4140   resource = watcher->WaitForNextResource();
4141   ASSERT_NE(resource, nullptr);
4142   EXPECT_EQ(resource->name, "foo1");
4143   EXPECT_EQ(resource->value, 42);
4144   EXPECT_THAT(GetServerConnections(), ::testing::ElementsAre(::testing::Pair(
4145                                           kDefaultXdsServerUrl, true)));
4146   request = WaitForRequest(primary_stream.get());
4147   ASSERT_TRUE(request.has_value());
4148   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
4149                /*version_info=*/"5", /*response_nonce=*/"D",
4150                /*error_detail=*/absl::OkStatus(),
4151                /*resource_names=*/{"foo1"});
4152 }
4153 
4154 }  // namespace
4155 }  // namespace testing
4156 }  // namespace grpc_core
4157 
main(int argc,char ** argv)4158 int main(int argc, char** argv) {
4159   ::testing::InitGoogleTest(&argc, argv);
4160   grpc::testing::TestEnvironment env(&argc, argv);
4161   grpc_init();
4162   int ret = RUN_ALL_TESTS();
4163   grpc_shutdown();
4164   return ret;
4165 }
4166