1 //
2 // Copyright 2022 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16
17 // TODO(roth): Add the following tests:
18 // - tests for DumpClientConfigBinary()
19 // - tests for load-reporting APIs? (or maybe move those out of XdsClient?)
20
21 #include "src/core/ext/xds/xds_client.h"
22
23 #include <stdint.h>
24
25 #include <algorithm>
26 #include <deque>
27 #include <fstream>
28 #include <map>
29 #include <memory>
30 #include <optional>
31 #include <string>
32 #include <vector>
33
34 #include <google/protobuf/any.pb.h>
35 #include <google/protobuf/struct.pb.h>
36
37 #include "absl/strings/str_cat.h"
38 #include "absl/time/time.h"
39 #include "absl/types/optional.h"
40 #include "absl/types/variant.h"
41 #include "gmock/gmock.h"
42 #include "gtest/gtest.h"
43 #include "upb/reflection/def.h"
44
45 #include <grpc/grpc.h>
46 #include <grpc/support/json.h>
47 #include <grpc/support/log.h>
48 #include <grpcpp/impl/codegen/config_protobuf.h>
49
50 #include "src/core/ext/xds/xds_bootstrap.h"
51 #include "src/core/ext/xds/xds_resource_type_impl.h"
52 #include "src/core/lib/event_engine/default_event_engine.h"
53 #include "src/core/lib/gprpp/debug_location.h"
54 #include "src/core/lib/gprpp/match.h"
55 #include "src/core/lib/gprpp/sync.h"
56 #include "src/core/lib/json/json.h"
57 #include "src/core/lib/json/json_args.h"
58 #include "src/core/lib/json/json_object_loader.h"
59 #include "src/core/lib/json/json_reader.h"
60 #include "src/core/lib/json/json_writer.h"
61 #include "src/proto/grpc/testing/xds/v3/base.pb.h"
62 #include "src/proto/grpc/testing/xds/v3/discovery.pb.h"
63 #include "test/core/util/scoped_env_var.h"
64 #include "test/core/util/test_config.h"
65 #include "test/core/xds/xds_client_test_peer.h"
66 #include "test/core/xds/xds_transport_fake.h"
67
68 // IWYU pragma: no_include <google/protobuf/message.h>
69 // IWYU pragma: no_include <google/protobuf/stubs/status.h>
70 // IWYU pragma: no_include <google/protobuf/unknown_field_set.h>
71 // IWYU pragma: no_include <google/protobuf/util/json_util.h>
72 // IWYU pragma: no_include "google/protobuf/json/json.h"
73 // IWYU pragma: no_include "google/protobuf/util/json_util.h"
74
75 using envoy::service::discovery::v3::DiscoveryRequest;
76 using envoy::service::discovery::v3::DiscoveryResponse;
77
78 namespace grpc_core {
79 namespace testing {
80 namespace {
81
82 constexpr absl::string_view kDefaultXdsServerUrl = "default_xds_server";
83
84 class XdsClientTest : public ::testing::Test {
85 protected:
86 // A fake bootstrap implementation that allows tests to populate the
87 // fields however they want.
88 class FakeXdsBootstrap : public XdsBootstrap {
89 public:
90 class FakeNode : public Node {
91 public:
FakeNode()92 FakeNode() : id_("xds_client_test") {}
id() const93 const std::string& id() const override { return id_; }
cluster() const94 const std::string& cluster() const override { return cluster_; }
locality_region() const95 const std::string& locality_region() const override {
96 return locality_region_;
97 }
locality_zone() const98 const std::string& locality_zone() const override {
99 return locality_zone_;
100 }
locality_sub_zone() const101 const std::string& locality_sub_zone() const override {
102 return locality_sub_zone_;
103 }
metadata() const104 const Json::Object& metadata() const override { return metadata_; }
105
set_id(std::string id)106 void set_id(std::string id) { id_ = std::move(id); }
set_cluster(std::string cluster)107 void set_cluster(std::string cluster) { cluster_ = std::move(cluster); }
set_locality_region(std::string locality_region)108 void set_locality_region(std::string locality_region) {
109 locality_region_ = std::move(locality_region);
110 }
set_locality_zone(std::string locality_zone)111 void set_locality_zone(std::string locality_zone) {
112 locality_zone_ = std::move(locality_zone);
113 }
set_locality_sub_zone(std::string locality_sub_zone)114 void set_locality_sub_zone(std::string locality_sub_zone) {
115 locality_sub_zone_ = std::move(locality_sub_zone);
116 }
set_metadata(Json::Object metadata)117 void set_metadata(Json::Object metadata) {
118 metadata_ = std::move(metadata);
119 }
120
121 private:
122 std::string id_;
123 std::string cluster_;
124 std::string locality_region_;
125 std::string locality_zone_;
126 std::string locality_sub_zone_;
127 Json::Object metadata_;
128 };
129
130 class FakeXdsServer : public XdsServer {
131 public:
FakeXdsServer(absl::string_view server_uri=kDefaultXdsServerUrl,bool ignore_resource_deletion=false)132 explicit FakeXdsServer(
133 absl::string_view server_uri = kDefaultXdsServerUrl,
134 bool ignore_resource_deletion = false)
135 : server_uri_(server_uri),
136 ignore_resource_deletion_(ignore_resource_deletion) {}
server_uri() const137 const std::string& server_uri() const override { return server_uri_; }
IgnoreResourceDeletion() const138 bool IgnoreResourceDeletion() const override {
139 return ignore_resource_deletion_;
140 }
Equals(const XdsServer & other) const141 bool Equals(const XdsServer& other) const override {
142 const auto& o = static_cast<const FakeXdsServer&>(other);
143 return server_uri_ == o.server_uri_ &&
144 ignore_resource_deletion_ == o.ignore_resource_deletion_;
145 }
Key() const146 std::string Key() const override {
147 return absl::StrCat(server_uri_, "#", ignore_resource_deletion_);
148 }
149
150 private:
151 std::string server_uri_;
152 bool ignore_resource_deletion_ = false;
153 };
154
155 class FakeAuthority : public Authority {
156 public:
servers() const157 std::vector<const XdsServer*> servers() const override {
158 if (server_.has_value()) {
159 return {&*server_};
160 } else {
161 return {};
162 };
163 }
164
set_server(absl::optional<FakeXdsServer> server)165 void set_server(absl::optional<FakeXdsServer> server) {
166 server_ = std::move(server);
167 }
168
169 private:
170 absl::optional<FakeXdsServer> server_;
171 };
172
173 class Builder {
174 public:
Builder()175 Builder() { node_.emplace(); }
set_node_id(std::string id)176 Builder& set_node_id(std::string id) {
177 if (!node_.has_value()) node_.emplace();
178 node_->set_id(std::move(id));
179 return *this;
180 }
AddAuthority(std::string name,FakeAuthority authority)181 Builder& AddAuthority(std::string name, FakeAuthority authority) {
182 authorities_[std::move(name)] = std::move(authority);
183 return *this;
184 }
SetServers(absl::Span<const FakeXdsServer> servers)185 Builder& SetServers(absl::Span<const FakeXdsServer> servers) {
186 servers_.assign(servers.begin(), servers.end());
187 return *this;
188 }
Build()189 std::unique_ptr<XdsBootstrap> Build() {
190 auto bootstrap = std::make_unique<FakeXdsBootstrap>();
191 bootstrap->servers_ = std::move(servers_);
192 bootstrap->node_ = std::move(node_);
193 bootstrap->authorities_ = std::move(authorities_);
194 return bootstrap;
195 }
196
197 private:
198 std::vector<FakeXdsServer> servers_ = {FakeXdsServer()};
199 absl::optional<FakeNode> node_;
200 std::map<std::string, FakeAuthority> authorities_;
201 };
202
ToString() const203 std::string ToString() const override { return "<fake>"; }
204
servers() const205 std::vector<const XdsServer*> servers() const override {
206 std::vector<const XdsServer*> result;
207 result.reserve(servers_.size());
208 for (size_t i = 0; i < servers_.size(); ++i) {
209 result.emplace_back(&servers_[i]);
210 }
211 return result;
212 }
213
node() const214 const Node* node() const override {
215 return node_.has_value() ? &*node_ : nullptr;
216 }
LookupAuthority(const std::string & name) const217 const Authority* LookupAuthority(const std::string& name) const override {
218 auto it = authorities_.find(name);
219 if (it == authorities_.end()) return nullptr;
220 return &it->second;
221 }
222
223 private:
224 std::vector<FakeXdsServer> servers_;
225 absl::optional<FakeNode> node_;
226 std::map<std::string, FakeAuthority> authorities_;
227 };
228
229 // A template for a test xDS resource type with an associated watcher impl.
230 // For simplicity, we use JSON instead of proto for serialization.
231 //
232 // The specified ResourceStruct must provide the following:
233 // - a static JsonLoader() method, as described in json_object_loader.h
234 // - an AsJsonString() method that returns the object in JSON string form
235 // - a static TypeUrl() method that returns the resource type
236 //
237 // The all_resources_required_in_sotw parameter indicates the value
238 // that should be returned by the AllResourcesRequiredInSotW() method.
239 template <typename ResourceStruct, bool all_resources_required_in_sotw>
240 class XdsTestResourceType
241 : public XdsResourceTypeImpl<
242 XdsTestResourceType<ResourceStruct, all_resources_required_in_sotw>,
243 ResourceStruct> {
244 public:
245 struct ResourceAndReadDelayHandle {
246 std::shared_ptr<const ResourceStruct> resource;
247 RefCountedPtr<XdsClient::ReadDelayHandle> read_delay_handle;
248 };
249
250 // A watcher implementation that queues delivered watches.
251 class Watcher : public XdsResourceTypeImpl<
252 XdsTestResourceType<ResourceStruct,
253 all_resources_required_in_sotw>,
254 ResourceStruct>::WatcherInterface {
255 public:
~Watcher()256 ~Watcher() override {
257 MutexLock lock(&mu_);
258 EXPECT_THAT(queue_, ::testing::IsEmpty())
259 << this << " "
260 << Match(
261 queue_[0],
262 [&](const ResourceAndReadDelayHandle& resource) {
263 return absl::StrFormat("Resource %s",
264 resource.resource->name);
265 },
266 [&](const absl::Status& status) {
267 return status.ToString();
268 },
269 [&](const DoesNotExist& /* tag */) -> std::string {
270 return "<Does not exist>";
271 });
272 }
273
274 // Returns true if no event is received during the timeout period.
ExpectNoEvent(absl::Duration timeout)275 bool ExpectNoEvent(absl::Duration timeout) {
276 MutexLock lock(&mu_);
277 return !WaitForEventLocked(timeout);
278 }
279
HasEvent()280 bool HasEvent() {
281 MutexLock lock(&mu_);
282 return !queue_.empty();
283 }
284
WaitForNextResourceAndHandle(absl::Duration timeout=absl::Seconds (1),SourceLocation location=SourceLocation ())285 absl::optional<ResourceAndReadDelayHandle> WaitForNextResourceAndHandle(
286 absl::Duration timeout = absl::Seconds(1),
287 SourceLocation location = SourceLocation()) {
288 MutexLock lock(&mu_);
289 if (!WaitForEventLocked(timeout)) return absl::nullopt;
290 Event& event = queue_.front();
291 if (!absl::holds_alternative<ResourceAndReadDelayHandle>(event)) {
292 EXPECT_TRUE(false)
293 << "got unexpected event "
294 << (absl::holds_alternative<absl::Status>(event)
295 ? "error"
296 : "does-not-exist")
297 << " at " << location.file() << ":" << location.line();
298 return absl::nullopt;
299 }
300 auto foo = std::move(absl::get<ResourceAndReadDelayHandle>(event));
301 queue_.pop_front();
302 return foo;
303 }
304
WaitForNextResource(absl::Duration timeout=absl::Seconds (1),SourceLocation location=SourceLocation ())305 std::shared_ptr<const ResourceStruct> WaitForNextResource(
306 absl::Duration timeout = absl::Seconds(1),
307 SourceLocation location = SourceLocation()) {
308 auto resource_and_handle =
309 WaitForNextResourceAndHandle(timeout, location);
310 if (!resource_and_handle.has_value()) {
311 return nullptr;
312 }
313 return std::move(resource_and_handle->resource);
314 }
315
WaitForNextError(absl::Duration timeout=absl::Seconds (1),SourceLocation location=SourceLocation ())316 absl::optional<absl::Status> WaitForNextError(
317 absl::Duration timeout = absl::Seconds(1),
318 SourceLocation location = SourceLocation()) {
319 MutexLock lock(&mu_);
320 if (!WaitForEventLocked(timeout)) return absl::nullopt;
321 Event& event = queue_.front();
322 if (!absl::holds_alternative<absl::Status>(event)) {
323 EXPECT_TRUE(false)
324 << "got unexpected event "
325 << (absl::holds_alternative<ResourceAndReadDelayHandle>(event)
326 ? "resource"
327 : "does-not-exist")
328 << " at " << location.file() << ":" << location.line();
329 return absl::nullopt;
330 }
331 absl::Status error = std::move(absl::get<absl::Status>(event));
332 queue_.pop_front();
333 return std::move(error);
334 }
335
WaitForDoesNotExist(absl::Duration timeout,SourceLocation location=SourceLocation ())336 bool WaitForDoesNotExist(absl::Duration timeout,
337 SourceLocation location = SourceLocation()) {
338 MutexLock lock(&mu_);
339 if (!WaitForEventLocked(timeout)) return false;
340 Event& event = queue_.front();
341 if (!absl::holds_alternative<DoesNotExist>(event)) {
342 EXPECT_TRUE(false)
343 << "got unexpected event "
344 << (absl::holds_alternative<absl::Status>(event) ? "error"
345 : "resource")
346 << " at " << location.file() << ":" << location.line();
347 return false;
348 }
349 queue_.pop_front();
350 return true;
351 }
352
353 private:
354 struct DoesNotExist {};
355 using Event =
356 absl::variant<ResourceAndReadDelayHandle, absl::Status, DoesNotExist>;
357
OnResourceChanged(std::shared_ptr<const ResourceStruct> foo,RefCountedPtr<XdsClient::ReadDelayHandle> read_delay_handle)358 void OnResourceChanged(std::shared_ptr<const ResourceStruct> foo,
359 RefCountedPtr<XdsClient::ReadDelayHandle>
360 read_delay_handle) override {
361 MutexLock lock(&mu_);
362 ResourceAndReadDelayHandle event_details = {
363 std::move(foo), std::move(read_delay_handle)};
364 queue_.emplace_back(std::move(event_details));
365 cv_.Signal();
366 }
367
OnError(absl::Status status,RefCountedPtr<XdsClient::ReadDelayHandle>)368 void OnError(
369 absl::Status status,
370 RefCountedPtr<XdsClient::ReadDelayHandle> /* read_delay_handle */)
371 override {
372 MutexLock lock(&mu_);
373 queue_.push_back(std::move(status));
374 cv_.Signal();
375 }
376
OnResourceDoesNotExist(RefCountedPtr<XdsClient::ReadDelayHandle>)377 void OnResourceDoesNotExist(
378 RefCountedPtr<XdsClient::ReadDelayHandle> /* read_delay_handle */)
379 override {
380 MutexLock lock(&mu_);
381 queue_.push_back(DoesNotExist());
382 cv_.Signal();
383 }
384
385 // Returns true if an event was received, or false if the timeout
386 // expires before any event is received.
WaitForEventLocked(absl::Duration timeout)387 bool WaitForEventLocked(absl::Duration timeout)
388 ABSL_EXCLUSIVE_LOCKS_REQUIRED(&mu_) {
389 while (queue_.empty()) {
390 if (cv_.WaitWithTimeout(&mu_,
391 timeout * grpc_test_slowdown_factor())) {
392 return false;
393 }
394 }
395 return true;
396 }
397
398 Mutex mu_;
399 CondVar cv_;
400 std::deque<Event> queue_ ABSL_GUARDED_BY(&mu_);
401 };
402
type_url() const403 absl::string_view type_url() const override {
404 return ResourceStruct::TypeUrl();
405 }
Decode(const XdsResourceType::DecodeContext &,absl::string_view serialized_resource) const406 XdsResourceType::DecodeResult Decode(
407 const XdsResourceType::DecodeContext& /*context*/,
408 absl::string_view serialized_resource) const override {
409 auto json = JsonParse(serialized_resource);
410 XdsResourceType::DecodeResult result;
411 if (!json.ok()) {
412 result.resource = json.status();
413 } else {
414 absl::StatusOr<ResourceStruct> foo =
415 LoadFromJson<ResourceStruct>(*json);
416 if (!foo.ok()) {
417 auto it = json->object().find("name");
418 if (it != json->object().end()) {
419 result.name = it->second.string();
420 }
421 result.resource = foo.status();
422 } else {
423 result.name = foo->name;
424 result.resource = std::make_unique<ResourceStruct>(std::move(*foo));
425 }
426 }
427 return result;
428 }
AllResourcesRequiredInSotW() const429 bool AllResourcesRequiredInSotW() const override {
430 return all_resources_required_in_sotw;
431 }
InitUpbSymtab(XdsClient *,upb_DefPool *) const432 void InitUpbSymtab(XdsClient*, upb_DefPool* /*symtab*/) const override {}
433
EncodeAsAny(const ResourceStruct & resource)434 static google::protobuf::Any EncodeAsAny(const ResourceStruct& resource) {
435 google::protobuf::Any any;
436 any.set_type_url(
437 absl::StrCat("type.googleapis.com/", ResourceStruct::TypeUrl()));
438 any.set_value(resource.AsJsonString());
439 return any;
440 }
441 };
442
443 // A fake "Foo" xDS resource type.
444 struct XdsFooResource : public XdsResourceType::ResourceData {
445 std::string name;
446 uint32_t value;
447
448 XdsFooResource() = default;
XdsFooResourcegrpc_core::testing::__anon1b480ec00111::XdsClientTest::XdsFooResource449 XdsFooResource(std::string name, uint32_t value)
450 : name(std::move(name)), value(value) {}
451
operator ==grpc_core::testing::__anon1b480ec00111::XdsClientTest::XdsFooResource452 bool operator==(const XdsFooResource& other) const {
453 return name == other.name && value == other.value;
454 }
455
AsJsonStringgrpc_core::testing::__anon1b480ec00111::XdsClientTest::XdsFooResource456 std::string AsJsonString() const {
457 return absl::StrCat("{\"name\":\"", name, "\",\"value\":", value, "}");
458 }
459
JsonLoadergrpc_core::testing::__anon1b480ec00111::XdsClientTest::XdsFooResource460 static const JsonLoaderInterface* JsonLoader(const JsonArgs&) {
461 static const auto* loader = JsonObjectLoader<XdsFooResource>()
462 .Field("name", &XdsFooResource::name)
463 .Field("value", &XdsFooResource::value)
464 .Finish();
465 return loader;
466 }
467
TypeUrlgrpc_core::testing::__anon1b480ec00111::XdsClientTest::XdsFooResource468 static absl::string_view TypeUrl() { return "test.v3.foo"; }
469 };
470 using XdsFooResourceType = XdsTestResourceType<XdsFooResource, false>;
471
472 // A fake "Bar" xDS resource type.
473 struct XdsBarResource : public XdsResourceType::ResourceData {
474 std::string name;
475 std::string value;
476
477 XdsBarResource() = default;
XdsBarResourcegrpc_core::testing::__anon1b480ec00111::XdsClientTest::XdsBarResource478 XdsBarResource(std::string name, std::string value)
479 : name(std::move(name)), value(std::move(value)) {}
480
operator ==grpc_core::testing::__anon1b480ec00111::XdsClientTest::XdsBarResource481 bool operator==(const XdsBarResource& other) const {
482 return name == other.name && value == other.value;
483 }
484
AsJsonStringgrpc_core::testing::__anon1b480ec00111::XdsClientTest::XdsBarResource485 std::string AsJsonString() const {
486 return absl::StrCat("{\"name\":\"", name, "\",\"value\":\"", value,
487 "\"}");
488 }
489
JsonLoadergrpc_core::testing::__anon1b480ec00111::XdsClientTest::XdsBarResource490 static const JsonLoaderInterface* JsonLoader(const JsonArgs&) {
491 static const auto* loader = JsonObjectLoader<XdsBarResource>()
492 .Field("name", &XdsBarResource::name)
493 .Field("value", &XdsBarResource::value)
494 .Finish();
495 return loader;
496 }
497
TypeUrlgrpc_core::testing::__anon1b480ec00111::XdsClientTest::XdsBarResource498 static absl::string_view TypeUrl() { return "test.v3.bar"; }
499 };
500 using XdsBarResourceType = XdsTestResourceType<XdsBarResource, false>;
501
502 // A fake "WildcardCapable" xDS resource type.
503 // This resource type return true for AllResourcesRequiredInSotW(),
504 // just like LDS and CDS.
505 struct XdsWildcardCapableResource : public XdsResourceType::ResourceData {
506 std::string name;
507 uint32_t value;
508
509 XdsWildcardCapableResource() = default;
XdsWildcardCapableResourcegrpc_core::testing::__anon1b480ec00111::XdsClientTest::XdsWildcardCapableResource510 XdsWildcardCapableResource(std::string name, uint32_t value)
511 : name(std::move(name)), value(value) {}
512
operator ==grpc_core::testing::__anon1b480ec00111::XdsClientTest::XdsWildcardCapableResource513 bool operator==(const XdsWildcardCapableResource& other) const {
514 return name == other.name && value == other.value;
515 }
516
AsJsonStringgrpc_core::testing::__anon1b480ec00111::XdsClientTest::XdsWildcardCapableResource517 std::string AsJsonString() const {
518 return absl::StrCat("{\"name\":\"", name, "\",\"value\":\"", value,
519 "\"}");
520 }
521
JsonLoadergrpc_core::testing::__anon1b480ec00111::XdsClientTest::XdsWildcardCapableResource522 static const JsonLoaderInterface* JsonLoader(const JsonArgs&) {
523 static const auto* loader =
524 JsonObjectLoader<XdsWildcardCapableResource>()
525 .Field("name", &XdsWildcardCapableResource::name)
526 .Field("value", &XdsWildcardCapableResource::value)
527 .Finish();
528 return loader;
529 }
530
TypeUrlgrpc_core::testing::__anon1b480ec00111::XdsClientTest::XdsWildcardCapableResource531 static absl::string_view TypeUrl() { return "test.v3.wildcard_capable"; }
532 };
533 using XdsWildcardCapableResourceType =
534 XdsTestResourceType<XdsWildcardCapableResource,
535 /*all_resources_required_in_sotw=*/true>;
536
537 // A helper class to build and serialize a DiscoveryResponse.
538 class ResponseBuilder {
539 public:
ResponseBuilder(absl::string_view type_url)540 explicit ResponseBuilder(absl::string_view type_url) {
541 response_.set_type_url(absl::StrCat("type.googleapis.com/", type_url));
542 }
543
set_version_info(absl::string_view version_info)544 ResponseBuilder& set_version_info(absl::string_view version_info) {
545 response_.set_version_info(std::string(version_info));
546 return *this;
547 }
set_nonce(absl::string_view nonce)548 ResponseBuilder& set_nonce(absl::string_view nonce) {
549 response_.set_nonce(std::string(nonce));
550 return *this;
551 }
552
553 template <typename ResourceType>
AddResource(const typename ResourceType::ResourceType & resource,bool in_resource_wrapper=false)554 ResponseBuilder& AddResource(
555 const typename ResourceType::ResourceType& resource,
556 bool in_resource_wrapper = false) {
557 auto* res = response_.add_resources();
558 *res = ResourceType::EncodeAsAny(resource);
559 if (in_resource_wrapper) {
560 envoy::service::discovery::v3::Resource resource_wrapper;
561 resource_wrapper.set_name(resource.name);
562 *resource_wrapper.mutable_resource() = std::move(*res);
563 res->PackFrom(resource_wrapper);
564 }
565 return *this;
566 }
567
AddFooResource(const XdsFooResource & resource,bool in_resource_wrapper=false)568 ResponseBuilder& AddFooResource(const XdsFooResource& resource,
569 bool in_resource_wrapper = false) {
570 return AddResource<XdsFooResourceType>(resource, in_resource_wrapper);
571 }
572
AddBarResource(const XdsBarResource & resource,bool in_resource_wrapper=false)573 ResponseBuilder& AddBarResource(const XdsBarResource& resource,
574 bool in_resource_wrapper = false) {
575 return AddResource<XdsBarResourceType>(resource, in_resource_wrapper);
576 }
577
AddWildcardCapableResource(const XdsWildcardCapableResource & resource,bool in_resource_wrapper=false)578 ResponseBuilder& AddWildcardCapableResource(
579 const XdsWildcardCapableResource& resource,
580 bool in_resource_wrapper = false) {
581 return AddResource<XdsWildcardCapableResourceType>(resource,
582 in_resource_wrapper);
583 }
584
AddInvalidResource(absl::string_view type_url,absl::string_view value,absl::string_view resource_wrapper_name="")585 ResponseBuilder& AddInvalidResource(
586 absl::string_view type_url, absl::string_view value,
587 absl::string_view resource_wrapper_name = "") {
588 auto* res = response_.add_resources();
589 res->set_type_url(absl::StrCat("type.googleapis.com/", type_url));
590 res->set_value(std::string(value));
591 if (!resource_wrapper_name.empty()) {
592 envoy::service::discovery::v3::Resource resource_wrapper;
593 resource_wrapper.set_name(std::string(resource_wrapper_name));
594 *resource_wrapper.mutable_resource() = std::move(*res);
595 res->PackFrom(resource_wrapper);
596 }
597 return *this;
598 }
599
AddInvalidResourceWrapper()600 ResponseBuilder& AddInvalidResourceWrapper() {
601 auto* res = response_.add_resources();
602 res->set_type_url(
603 "type.googleapis.com/envoy.service.discovery.v3.Resource");
604 res->set_value(std::string("\0", 1));
605 return *this;
606 }
607
AddEmptyResource()608 ResponseBuilder& AddEmptyResource() {
609 response_.add_resources();
610 return *this;
611 }
612
Serialize()613 std::string Serialize() {
614 std::string serialized_response;
615 EXPECT_TRUE(response_.SerializeToString(&serialized_response));
616 return serialized_response;
617 }
618
619 private:
620 DiscoveryResponse response_;
621 };
622
623 class MetricsReporter : public XdsMetricsReporter {
624 public:
625 using ResourceUpdateMap = std::map<
626 std::pair<std::string /*xds_server*/, std::string /*resource_type*/>,
627 uint64_t>;
628
resource_updates_valid() const629 const ResourceUpdateMap& resource_updates_valid() const {
630 return resource_updates_valid_;
631 }
resource_updates_invalid() const632 const ResourceUpdateMap& resource_updates_invalid() const {
633 return resource_updates_invalid_;
634 }
635
636 private:
ReportResourceUpdates(absl::string_view xds_server,absl::string_view resource_type,uint64_t num_resources_valid,uint64_t num_resources_invalid)637 void ReportResourceUpdates(absl::string_view xds_server,
638 absl::string_view resource_type,
639 uint64_t num_resources_valid,
640 uint64_t num_resources_invalid) override {
641 auto key =
642 std::make_pair(std::string(xds_server), std::string(resource_type));
643 if (num_resources_valid > 0) {
644 resource_updates_valid_[key] += num_resources_valid;
645 }
646 if (num_resources_invalid > 0) {
647 resource_updates_invalid_[key] += num_resources_invalid;
648 }
649 }
650
651 ResourceUpdateMap resource_updates_valid_;
652 ResourceUpdateMap resource_updates_invalid_;
653 };
654
655 using ResourceCounts =
656 std::vector<std::pair<XdsClientTestPeer::ResourceCountLabels, uint64_t>>;
GetResourceCounts()657 ResourceCounts GetResourceCounts() {
658 ResourceCounts resource_counts;
659 XdsClientTestPeer(xds_client_.get())
660 .TestReportResourceCounts(
661 [&](const XdsClientTestPeer::ResourceCountLabels& labels,
662 uint64_t count) {
663 resource_counts.emplace_back(labels, count);
664 });
665 return resource_counts;
666 }
667
668 using ServerConnectionMap = std::map<std::string, bool>;
GetServerConnections()669 ServerConnectionMap GetServerConnections() {
670 ServerConnectionMap server_connection_map;
671 XdsClientTestPeer(xds_client_.get())
672 .TestReportServerConnections(
673 [&](absl::string_view xds_server, bool connected) {
674 std::string server(xds_server);
675 EXPECT_EQ(server_connection_map.find(server),
676 server_connection_map.end());
677 server_connection_map[std::move(server)] = connected;
678 });
679 return server_connection_map;
680 }
681
682 // Sets transport_factory_ and initializes xds_client_ with the
683 // specified bootstrap config.
InitXdsClient(FakeXdsBootstrap::Builder bootstrap_builder=FakeXdsBootstrap::Builder (),Duration resource_request_timeout=Duration::Seconds (15))684 void InitXdsClient(
685 FakeXdsBootstrap::Builder bootstrap_builder = FakeXdsBootstrap::Builder(),
686 Duration resource_request_timeout = Duration::Seconds(15)) {
687 auto transport_factory = MakeOrphanable<FakeXdsTransportFactory>(
688 []() { FAIL() << "Multiple concurrent reads"; });
689 transport_factory_ =
690 transport_factory->Ref().TakeAsSubclass<FakeXdsTransportFactory>();
691 auto metrics_reporter = std::make_unique<MetricsReporter>();
692 metrics_reporter_ = metrics_reporter.get();
693 xds_client_ = MakeRefCounted<XdsClient>(
694 bootstrap_builder.Build(), std::move(transport_factory),
695 grpc_event_engine::experimental::GetDefaultEventEngine(),
696 std::move(metrics_reporter), "foo agent", "foo version",
697 resource_request_timeout * grpc_test_slowdown_factor());
698 }
699
700 // Starts and cancels a watch for a Foo resource.
StartFooWatch(absl::string_view resource_name)701 RefCountedPtr<XdsFooResourceType::Watcher> StartFooWatch(
702 absl::string_view resource_name) {
703 auto watcher = MakeRefCounted<XdsFooResourceType::Watcher>();
704 XdsFooResourceType::StartWatch(xds_client_.get(), resource_name, watcher);
705 return watcher;
706 }
CancelFooWatch(XdsFooResourceType::Watcher * watcher,absl::string_view resource_name,bool delay_unsubscription=false)707 void CancelFooWatch(XdsFooResourceType::Watcher* watcher,
708 absl::string_view resource_name,
709 bool delay_unsubscription = false) {
710 XdsFooResourceType::CancelWatch(xds_client_.get(), resource_name, watcher,
711 delay_unsubscription);
712 }
713
714 // Starts and cancels a watch for a Bar resource.
StartBarWatch(absl::string_view resource_name)715 RefCountedPtr<XdsBarResourceType::Watcher> StartBarWatch(
716 absl::string_view resource_name) {
717 auto watcher = MakeRefCounted<XdsBarResourceType::Watcher>();
718 XdsBarResourceType::StartWatch(xds_client_.get(), resource_name, watcher);
719 return watcher;
720 }
CancelBarWatch(XdsBarResourceType::Watcher * watcher,absl::string_view resource_name,bool delay_unsubscription=false)721 void CancelBarWatch(XdsBarResourceType::Watcher* watcher,
722 absl::string_view resource_name,
723 bool delay_unsubscription = false) {
724 XdsBarResourceType::CancelWatch(xds_client_.get(), resource_name, watcher,
725 delay_unsubscription);
726 }
727
728 // Starts and cancels a watch for a WildcardCapable resource.
729 RefCountedPtr<XdsWildcardCapableResourceType::Watcher>
StartWildcardCapableWatch(absl::string_view resource_name)730 StartWildcardCapableWatch(absl::string_view resource_name) {
731 auto watcher = MakeRefCounted<XdsWildcardCapableResourceType::Watcher>();
732 XdsWildcardCapableResourceType::StartWatch(xds_client_.get(), resource_name,
733 watcher);
734 return watcher;
735 }
CancelWildcardCapableWatch(XdsWildcardCapableResourceType::Watcher * watcher,absl::string_view resource_name,bool delay_unsubscription=false)736 void CancelWildcardCapableWatch(
737 XdsWildcardCapableResourceType::Watcher* watcher,
738 absl::string_view resource_name, bool delay_unsubscription = false) {
739 XdsWildcardCapableResourceType::CancelWatch(
740 xds_client_.get(), resource_name, watcher, delay_unsubscription);
741 }
742
WaitForAdsStream(const XdsBootstrap::XdsServer & xds_server,absl::Duration timeout=absl::Seconds (5))743 RefCountedPtr<FakeXdsTransportFactory::FakeStreamingCall> WaitForAdsStream(
744 const XdsBootstrap::XdsServer& xds_server,
745 absl::Duration timeout = absl::Seconds(5)) {
746 return transport_factory_->WaitForStream(
747 xds_server, FakeXdsTransportFactory::kAdsMethod,
748 timeout * grpc_test_slowdown_factor());
749 }
750
TriggerConnectionFailure(const XdsBootstrap::XdsServer & xds_server,absl::Status status)751 void TriggerConnectionFailure(const XdsBootstrap::XdsServer& xds_server,
752 absl::Status status) {
753 transport_factory_->TriggerConnectionFailure(xds_server, std::move(status));
754 }
755
WaitForAdsStream(absl::Duration timeout=absl::Seconds (5))756 RefCountedPtr<FakeXdsTransportFactory::FakeStreamingCall> WaitForAdsStream(
757 absl::Duration timeout = absl::Seconds(5)) {
758 return WaitForAdsStream(*xds_client_->bootstrap().servers().front(),
759 timeout);
760 }
761
762 // Gets the latest request sent to the fake xDS server.
WaitForRequest(FakeXdsTransportFactory::FakeStreamingCall * stream,absl::Duration timeout=absl::Seconds (3),SourceLocation location=SourceLocation ())763 absl::optional<DiscoveryRequest> WaitForRequest(
764 FakeXdsTransportFactory::FakeStreamingCall* stream,
765 absl::Duration timeout = absl::Seconds(3),
766 SourceLocation location = SourceLocation()) {
767 auto message =
768 stream->WaitForMessageFromClient(timeout * grpc_test_slowdown_factor());
769 if (!message.has_value()) return absl::nullopt;
770 DiscoveryRequest request;
771 bool success = request.ParseFromString(*message);
772 EXPECT_TRUE(success) << "Failed to deserialize DiscoveryRequest at "
773 << location.file() << ":" << location.line();
774 if (!success) return absl::nullopt;
775 return std::move(request);
776 }
777
778 // Helper function to check the fields of a DiscoveryRequest.
CheckRequest(const DiscoveryRequest & request,absl::string_view type_url,absl::string_view version_info,absl::string_view response_nonce,const absl::Status & error_detail,const std::set<absl::string_view> & resource_names,SourceLocation location=SourceLocation ())779 void CheckRequest(const DiscoveryRequest& request, absl::string_view type_url,
780 absl::string_view version_info,
781 absl::string_view response_nonce,
782 const absl::Status& error_detail,
783 const std::set<absl::string_view>& resource_names,
784 SourceLocation location = SourceLocation()) {
785 EXPECT_EQ(request.type_url(),
786 absl::StrCat("type.googleapis.com/", type_url))
787 << location.file() << ":" << location.line();
788 EXPECT_EQ(request.version_info(), version_info)
789 << location.file() << ":" << location.line();
790 EXPECT_EQ(request.response_nonce(), response_nonce)
791 << location.file() << ":" << location.line();
792 if (error_detail.ok()) {
793 EXPECT_FALSE(request.has_error_detail())
794 << location.file() << ":" << location.line();
795 } else {
796 EXPECT_EQ(request.error_detail().code(),
797 static_cast<int>(error_detail.code()))
798 << location.file() << ":" << location.line();
799 EXPECT_EQ(request.error_detail().message(), error_detail.message())
800 << location.file() << ":" << location.line();
801 }
802 EXPECT_THAT(request.resource_names(),
803 ::testing::UnorderedElementsAreArray(resource_names))
804 << location.file() << ":" << location.line();
805 }
806
807 // Helper function to check the contents of the node message in a
808 // request against the client's node info.
CheckRequestNode(const DiscoveryRequest & request,SourceLocation location=SourceLocation ())809 void CheckRequestNode(const DiscoveryRequest& request,
810 SourceLocation location = SourceLocation()) {
811 // These fields come from the bootstrap config.
812 EXPECT_EQ(request.node().id(), xds_client_->bootstrap().node()->id())
813 << location.file() << ":" << location.line();
814 EXPECT_EQ(request.node().cluster(),
815 xds_client_->bootstrap().node()->cluster())
816 << location.file() << ":" << location.line();
817 EXPECT_EQ(request.node().locality().region(),
818 xds_client_->bootstrap().node()->locality_region())
819 << location.file() << ":" << location.line();
820 EXPECT_EQ(request.node().locality().zone(),
821 xds_client_->bootstrap().node()->locality_zone())
822 << location.file() << ":" << location.line();
823 EXPECT_EQ(request.node().locality().sub_zone(),
824 xds_client_->bootstrap().node()->locality_sub_zone())
825 << location.file() << ":" << location.line();
826 if (xds_client_->bootstrap().node()->metadata().empty()) {
827 EXPECT_FALSE(request.node().has_metadata())
828 << location.file() << ":" << location.line();
829 } else {
830 std::string metadata_json_str;
831 auto status =
832 MessageToJsonString(request.node().metadata(), &metadata_json_str,
833 GRPC_CUSTOM_JSONUTIL::JsonPrintOptions());
834 ASSERT_TRUE(status.ok())
835 << status << " on " << location.file() << ":" << location.line();
836 auto metadata_json = JsonParse(metadata_json_str);
837 ASSERT_TRUE(metadata_json.ok())
838 << metadata_json.status() << " on " << location.file() << ":"
839 << location.line();
840 Json expected =
841 Json::FromObject(xds_client_->bootstrap().node()->metadata());
842 EXPECT_EQ(*metadata_json, expected)
843 << location.file() << ":" << location.line()
844 << ":\nexpected: " << JsonDump(expected)
845 << "\nactual: " << JsonDump(*metadata_json);
846 }
847 EXPECT_EQ(request.node().user_agent_name(), "foo agent")
848 << location.file() << ":" << location.line();
849 EXPECT_EQ(request.node().user_agent_version(), "foo version")
850 << location.file() << ":" << location.line();
851 }
852
853 RefCountedPtr<FakeXdsTransportFactory> transport_factory_;
854 RefCountedPtr<XdsClient> xds_client_;
855 MetricsReporter* metrics_reporter_ = nullptr;
856 };
857
858 MATCHER_P3(ResourceCountLabelsEq, xds_authority, resource_type, cache_state,
859 "equals ResourceCountLabels") {
860 bool ok = true;
861 ok &= ::testing::ExplainMatchResult(xds_authority, arg.xds_authority,
862 result_listener);
863 ok &= ::testing::ExplainMatchResult(resource_type, arg.resource_type,
864 result_listener);
865 ok &= ::testing::ExplainMatchResult(cache_state, arg.cache_state,
866 result_listener);
867 return ok;
868 }
869
TEST_F(XdsClientTest,BasicWatch)870 TEST_F(XdsClientTest, BasicWatch) {
871 InitXdsClient();
872 // Metrics should initially be empty.
873 EXPECT_THAT(metrics_reporter_->resource_updates_valid(),
874 ::testing::ElementsAre());
875 EXPECT_THAT(metrics_reporter_->resource_updates_invalid(),
876 ::testing::ElementsAre());
877 EXPECT_THAT(GetResourceCounts(), ::testing::ElementsAre());
878 EXPECT_THAT(GetServerConnections(), ::testing::ElementsAre());
879 // Start a watch for "foo1".
880 auto watcher = StartFooWatch("foo1");
881 // Check metrics.
882 EXPECT_THAT(metrics_reporter_->resource_updates_valid(),
883 ::testing::ElementsAre());
884 EXPECT_THAT(metrics_reporter_->resource_updates_invalid(),
885 ::testing::ElementsAre());
886 EXPECT_THAT(GetServerConnections(), ::testing::ElementsAre(::testing::Pair(
887 kDefaultXdsServerUrl, true)));
888 EXPECT_THAT(GetResourceCounts(),
889 ::testing::ElementsAre(::testing::Pair(
890 ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
891 XdsFooResourceType::Get()->type_url(),
892 "requested"),
893 1)));
894 // Watcher should initially not see any resource reported.
895 EXPECT_FALSE(watcher->HasEvent());
896 // XdsClient should have created an ADS stream.
897 auto stream = WaitForAdsStream();
898 ASSERT_TRUE(stream != nullptr);
899 // XdsClient should have sent a subscription request on the ADS stream.
900 auto request = WaitForRequest(stream.get());
901 ASSERT_TRUE(request.has_value());
902 CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
903 /*version_info=*/"", /*response_nonce=*/"",
904 /*error_detail=*/absl::OkStatus(),
905 /*resource_names=*/{"foo1"});
906 CheckRequestNode(*request); // Should be present on the first request.
907 // Send a response.
908 stream->SendMessageToClient(
909 ResponseBuilder(XdsFooResourceType::Get()->type_url())
910 .set_version_info("1")
911 .set_nonce("A")
912 .AddFooResource(XdsFooResource("foo1", 6))
913 .Serialize());
914 // XdsClient should have delivered the response to the watcher.
915 auto resource = watcher->WaitForNextResource();
916 ASSERT_NE(resource, nullptr);
917 EXPECT_EQ(resource->name, "foo1");
918 EXPECT_EQ(resource->value, 6);
919 // Check metric data.
920 EXPECT_THAT(metrics_reporter_->resource_updates_valid(),
921 ::testing::ElementsAre(::testing::Pair(
922 ::testing::Pair(kDefaultXdsServerUrl,
923 XdsFooResourceType::Get()->type_url()),
924 1)));
925 EXPECT_THAT(metrics_reporter_->resource_updates_invalid(),
926 ::testing::ElementsAre());
927 EXPECT_THAT(
928 GetResourceCounts(),
929 ::testing::ElementsAre(::testing::Pair(
930 ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
931 XdsFooResourceType::Get()->type_url(), "acked"),
932 1)));
933 EXPECT_THAT(GetServerConnections(), ::testing::ElementsAre(::testing::Pair(
934 kDefaultXdsServerUrl, true)));
935 // XdsClient should have sent an ACK message to the xDS server.
936 request = WaitForRequest(stream.get());
937 ASSERT_TRUE(request.has_value());
938 CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
939 /*version_info=*/"1", /*response_nonce=*/"A",
940 /*error_detail=*/absl::OkStatus(),
941 /*resource_names=*/{"foo1"});
942 // Cancel watch.
943 CancelFooWatch(watcher.get(), "foo1");
944 EXPECT_TRUE(stream->Orphaned());
945 // Check metric data.
946 EXPECT_THAT(metrics_reporter_->resource_updates_valid(),
947 ::testing::ElementsAre(::testing::Pair(
948 ::testing::Pair(kDefaultXdsServerUrl,
949 XdsFooResourceType::Get()->type_url()),
950 1)));
951 EXPECT_THAT(metrics_reporter_->resource_updates_invalid(),
952 ::testing::ElementsAre());
953 EXPECT_THAT(GetResourceCounts(), ::testing::ElementsAre());
954 EXPECT_THAT(GetServerConnections(), ::testing::ElementsAre());
955 }
956
TEST_F(XdsClientTest,UpdateFromServer)957 TEST_F(XdsClientTest, UpdateFromServer) {
958 InitXdsClient();
959 // Start a watch for "foo1".
960 auto watcher = StartFooWatch("foo1");
961 // Watcher should initially not see any resource reported.
962 EXPECT_FALSE(watcher->HasEvent());
963 // XdsClient should have created an ADS stream.
964 auto stream = WaitForAdsStream();
965 ASSERT_TRUE(stream != nullptr);
966 // XdsClient should have sent a subscription request on the ADS stream.
967 auto request = WaitForRequest(stream.get());
968 ASSERT_TRUE(request.has_value());
969 CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
970 /*version_info=*/"", /*response_nonce=*/"",
971 /*error_detail=*/absl::OkStatus(),
972 /*resource_names=*/{"foo1"});
973 CheckRequestNode(*request); // Should be present on the first request.
974 // Send a response.
975 stream->SendMessageToClient(
976 ResponseBuilder(XdsFooResourceType::Get()->type_url())
977 .set_version_info("1")
978 .set_nonce("A")
979 .AddFooResource(XdsFooResource("foo1", 6))
980 .Serialize());
981 // XdsClient should have delivered the response to the watcher.
982 auto resource = watcher->WaitForNextResource();
983 ASSERT_NE(resource, nullptr);
984 EXPECT_EQ(resource->name, "foo1");
985 EXPECT_EQ(resource->value, 6);
986 // Check metric data.
987 EXPECT_THAT(metrics_reporter_->resource_updates_valid(),
988 ::testing::ElementsAre(::testing::Pair(
989 ::testing::Pair(kDefaultXdsServerUrl,
990 XdsFooResourceType::Get()->type_url()),
991 1)));
992 EXPECT_THAT(metrics_reporter_->resource_updates_invalid(),
993 ::testing::ElementsAre());
994 EXPECT_THAT(
995 GetResourceCounts(),
996 ::testing::ElementsAre(::testing::Pair(
997 ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
998 XdsFooResourceType::Get()->type_url(), "acked"),
999 1)));
1000 // XdsClient should have sent an ACK message to the xDS server.
1001 request = WaitForRequest(stream.get());
1002 ASSERT_TRUE(request.has_value());
1003 CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
1004 /*version_info=*/"1", /*response_nonce=*/"A",
1005 /*error_detail=*/absl::OkStatus(),
1006 /*resource_names=*/{"foo1"});
1007 // Server sends an updated version of the resource.
1008 stream->SendMessageToClient(
1009 ResponseBuilder(XdsFooResourceType::Get()->type_url())
1010 .set_version_info("2")
1011 .set_nonce("B")
1012 .AddFooResource(XdsFooResource("foo1", 9))
1013 .Serialize());
1014 // XdsClient should have delivered the response to the watcher.
1015 resource = watcher->WaitForNextResource();
1016 ASSERT_NE(resource, nullptr);
1017 EXPECT_EQ(resource->name, "foo1");
1018 EXPECT_EQ(resource->value, 9);
1019 // Check metric data.
1020 EXPECT_THAT(metrics_reporter_->resource_updates_valid(),
1021 ::testing::ElementsAre(::testing::Pair(
1022 ::testing::Pair(kDefaultXdsServerUrl,
1023 XdsFooResourceType::Get()->type_url()),
1024 2)));
1025 EXPECT_THAT(metrics_reporter_->resource_updates_invalid(),
1026 ::testing::ElementsAre());
1027 EXPECT_THAT(
1028 GetResourceCounts(),
1029 ::testing::ElementsAre(::testing::Pair(
1030 ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
1031 XdsFooResourceType::Get()->type_url(), "acked"),
1032 1)));
1033 // XdsClient should have sent an ACK message to the xDS server.
1034 request = WaitForRequest(stream.get());
1035 ASSERT_TRUE(request.has_value());
1036 CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
1037 /*version_info=*/"2", /*response_nonce=*/"B",
1038 /*error_detail=*/absl::OkStatus(),
1039 /*resource_names=*/{"foo1"});
1040 // Cancel watch.
1041 CancelFooWatch(watcher.get(), "foo1");
1042 EXPECT_TRUE(stream->Orphaned());
1043 }
1044
TEST_F(XdsClientTest,MultipleWatchersForSameResource)1045 TEST_F(XdsClientTest, MultipleWatchersForSameResource) {
1046 InitXdsClient();
1047 // Start a watch for "foo1".
1048 auto watcher = StartFooWatch("foo1");
1049 // Watcher should initially not see any resource reported.
1050 EXPECT_FALSE(watcher->HasEvent());
1051 // XdsClient should have created an ADS stream.
1052 auto stream = WaitForAdsStream();
1053 ASSERT_TRUE(stream != nullptr);
1054 // XdsClient should have sent a subscription request on the ADS stream.
1055 auto request = WaitForRequest(stream.get());
1056 ASSERT_TRUE(request.has_value());
1057 CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
1058 /*version_info=*/"", /*response_nonce=*/"",
1059 /*error_detail=*/absl::OkStatus(),
1060 /*resource_names=*/{"foo1"});
1061 CheckRequestNode(*request); // Should be present on the first request.
1062 // Send a response.
1063 stream->SendMessageToClient(
1064 ResponseBuilder(XdsFooResourceType::Get()->type_url())
1065 .set_version_info("1")
1066 .set_nonce("A")
1067 .AddFooResource(XdsFooResource("foo1", 6))
1068 .Serialize());
1069 // XdsClient should have delivered the response to the watcher.
1070 auto resource = watcher->WaitForNextResource();
1071 ASSERT_NE(resource, nullptr);
1072 EXPECT_EQ(resource->name, "foo1");
1073 EXPECT_EQ(resource->value, 6);
1074 // Check metric data.
1075 EXPECT_THAT(metrics_reporter_->resource_updates_valid(),
1076 ::testing::ElementsAre(::testing::Pair(
1077 ::testing::Pair(kDefaultXdsServerUrl,
1078 XdsFooResourceType::Get()->type_url()),
1079 1)));
1080 EXPECT_THAT(metrics_reporter_->resource_updates_invalid(),
1081 ::testing::ElementsAre());
1082 EXPECT_THAT(
1083 GetResourceCounts(),
1084 ::testing::ElementsAre(::testing::Pair(
1085 ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
1086 XdsFooResourceType::Get()->type_url(), "acked"),
1087 1)));
1088 // XdsClient should have sent an ACK message to the xDS server.
1089 request = WaitForRequest(stream.get());
1090 ASSERT_TRUE(request.has_value());
1091 CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
1092 /*version_info=*/"1", /*response_nonce=*/"A",
1093 /*error_detail=*/absl::OkStatus(),
1094 /*resource_names=*/{"foo1"});
1095 // Start a second watcher for the same resource.
1096 auto watcher2 = StartFooWatch("foo1");
1097 // This watcher should get an immediate notification, because the
1098 // resource is already cached.
1099 resource = watcher2->WaitForNextResource();
1100 ASSERT_NE(resource, nullptr);
1101 EXPECT_EQ(resource->name, "foo1");
1102 EXPECT_EQ(resource->value, 6);
1103 // Server should not have seen another request from the client.
1104 ASSERT_FALSE(stream->HaveMessageFromClient());
1105 // Server sends an updated version of the resource.
1106 stream->SendMessageToClient(
1107 ResponseBuilder(XdsFooResourceType::Get()->type_url())
1108 .set_version_info("2")
1109 .set_nonce("B")
1110 .AddFooResource(XdsFooResource("foo1", 9))
1111 .Serialize());
1112 // XdsClient should deliver the response to both watchers.
1113 resource = watcher->WaitForNextResource();
1114 ASSERT_NE(resource, nullptr);
1115 EXPECT_EQ(resource->name, "foo1");
1116 EXPECT_EQ(resource->value, 9);
1117 resource = watcher2->WaitForNextResource();
1118 ASSERT_NE(resource, nullptr);
1119 EXPECT_EQ(resource->name, "foo1");
1120 EXPECT_EQ(resource->value, 9);
1121 // Check metric data.
1122 EXPECT_THAT(metrics_reporter_->resource_updates_valid(),
1123 ::testing::ElementsAre(::testing::Pair(
1124 ::testing::Pair(kDefaultXdsServerUrl,
1125 XdsFooResourceType::Get()->type_url()),
1126 2)));
1127 EXPECT_THAT(metrics_reporter_->resource_updates_invalid(),
1128 ::testing::ElementsAre());
1129 EXPECT_THAT(
1130 GetResourceCounts(),
1131 ::testing::ElementsAre(::testing::Pair(
1132 ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
1133 XdsFooResourceType::Get()->type_url(), "acked"),
1134 1)));
1135 // XdsClient should have sent an ACK message to the xDS server.
1136 request = WaitForRequest(stream.get());
1137 ASSERT_TRUE(request.has_value());
1138 CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
1139 /*version_info=*/"2", /*response_nonce=*/"B",
1140 /*error_detail=*/absl::OkStatus(),
1141 /*resource_names=*/{"foo1"});
1142 // Cancel one of the watchers.
1143 CancelFooWatch(watcher.get(), "foo1");
1144 // The server should not see any new request.
1145 ASSERT_FALSE(WaitForRequest(stream.get()));
1146 // Now cancel the second watcher.
1147 CancelFooWatch(watcher2.get(), "foo1");
1148 EXPECT_TRUE(stream->Orphaned());
1149 }
1150
TEST_F(XdsClientTest,SubscribeToMultipleResources)1151 TEST_F(XdsClientTest, SubscribeToMultipleResources) {
1152 InitXdsClient();
1153 // Start a watch for "foo1".
1154 auto watcher = StartFooWatch("foo1");
1155 // Watcher should initially not see any resource reported.
1156 EXPECT_FALSE(watcher->HasEvent());
1157 // Check metrics.
1158 EXPECT_THAT(metrics_reporter_->resource_updates_valid(),
1159 ::testing::ElementsAre());
1160 EXPECT_THAT(metrics_reporter_->resource_updates_invalid(),
1161 ::testing::ElementsAre());
1162 EXPECT_THAT(GetResourceCounts(),
1163 ::testing::ElementsAre(::testing::Pair(
1164 ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
1165 XdsFooResourceType::Get()->type_url(),
1166 "requested"),
1167 1)));
1168 // XdsClient should have created an ADS stream.
1169 auto stream = WaitForAdsStream();
1170 ASSERT_TRUE(stream != nullptr);
1171 // XdsClient should have sent a subscription request on the ADS stream.
1172 auto request = WaitForRequest(stream.get());
1173 ASSERT_TRUE(request.has_value());
1174 CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
1175 /*version_info=*/"", /*response_nonce=*/"",
1176 /*error_detail=*/absl::OkStatus(),
1177 /*resource_names=*/{"foo1"});
1178 CheckRequestNode(*request); // Should be present on the first request.
1179 // Send a response.
1180 stream->SendMessageToClient(
1181 ResponseBuilder(XdsFooResourceType::Get()->type_url())
1182 .set_version_info("1")
1183 .set_nonce("A")
1184 .AddFooResource(XdsFooResource("foo1", 6))
1185 .Serialize());
1186 // XdsClient should have delivered the response to the watcher.
1187 auto resource = watcher->WaitForNextResource();
1188 ASSERT_NE(resource, nullptr);
1189 EXPECT_EQ(resource->name, "foo1");
1190 EXPECT_EQ(resource->value, 6);
1191 // Check metric data.
1192 EXPECT_THAT(metrics_reporter_->resource_updates_valid(),
1193 ::testing::ElementsAre(::testing::Pair(
1194 ::testing::Pair(kDefaultXdsServerUrl,
1195 XdsFooResourceType::Get()->type_url()),
1196 1)));
1197 EXPECT_THAT(metrics_reporter_->resource_updates_invalid(),
1198 ::testing::ElementsAre());
1199 EXPECT_THAT(
1200 GetResourceCounts(),
1201 ::testing::ElementsAre(::testing::Pair(
1202 ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
1203 XdsFooResourceType::Get()->type_url(), "acked"),
1204 1)));
1205 // XdsClient should have sent an ACK message to the xDS server.
1206 request = WaitForRequest(stream.get());
1207 ASSERT_TRUE(request.has_value());
1208 CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
1209 /*version_info=*/"1", /*response_nonce=*/"A",
1210 /*error_detail=*/absl::OkStatus(),
1211 /*resource_names=*/{"foo1"});
1212 // Start a watch for "foo2".
1213 auto watcher2 = StartFooWatch("foo2");
1214 // Check metric data.
1215 EXPECT_THAT(
1216 GetResourceCounts(),
1217 ::testing::ElementsAre(
1218 ::testing::Pair(ResourceCountLabelsEq(
1219 XdsClient::kOldStyleAuthority,
1220 XdsFooResourceType::Get()->type_url(), "acked"),
1221 1),
1222 ::testing::Pair(
1223 ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
1224 XdsFooResourceType::Get()->type_url(),
1225 "requested"),
1226 1)));
1227 // XdsClient should have sent a subscription request on the ADS stream.
1228 request = WaitForRequest(stream.get());
1229 ASSERT_TRUE(request.has_value());
1230 CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
1231 /*version_info=*/"1", /*response_nonce=*/"A",
1232 /*error_detail=*/absl::OkStatus(),
1233 /*resource_names=*/{"foo1", "foo2"});
1234 // Send a response.
1235 stream->SendMessageToClient(
1236 ResponseBuilder(XdsFooResourceType::Get()->type_url())
1237 .set_version_info("1")
1238 .set_nonce("B")
1239 .AddFooResource(XdsFooResource("foo2", 7))
1240 .Serialize());
1241 // XdsClient should have delivered the response to the watcher.
1242 resource = watcher2->WaitForNextResource();
1243 ASSERT_NE(resource, nullptr);
1244 EXPECT_EQ(resource->name, "foo2");
1245 EXPECT_EQ(resource->value, 7);
1246 // Check metric data.
1247 EXPECT_THAT(metrics_reporter_->resource_updates_valid(),
1248 ::testing::ElementsAre(::testing::Pair(
1249 ::testing::Pair(kDefaultXdsServerUrl,
1250 XdsFooResourceType::Get()->type_url()),
1251 2)));
1252 EXPECT_THAT(metrics_reporter_->resource_updates_invalid(),
1253 ::testing::ElementsAre());
1254 EXPECT_THAT(
1255 GetResourceCounts(),
1256 ::testing::ElementsAre(::testing::Pair(
1257 ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
1258 XdsFooResourceType::Get()->type_url(), "acked"),
1259 2)));
1260 // XdsClient should have sent an ACK message to the xDS server.
1261 request = WaitForRequest(stream.get());
1262 ASSERT_TRUE(request.has_value());
1263 CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
1264 /*version_info=*/"1", /*response_nonce=*/"B",
1265 /*error_detail=*/absl::OkStatus(),
1266 /*resource_names=*/{"foo1", "foo2"});
1267 // Cancel watch for "foo1".
1268 CancelFooWatch(watcher.get(), "foo1");
1269 // Check metric data.
1270 EXPECT_THAT(
1271 GetResourceCounts(),
1272 ::testing::ElementsAre(::testing::Pair(
1273 ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
1274 XdsFooResourceType::Get()->type_url(), "acked"),
1275 1)));
1276 // XdsClient should send an unsubscription request.
1277 request = WaitForRequest(stream.get());
1278 ASSERT_TRUE(request.has_value());
1279 CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
1280 /*version_info=*/"1", /*response_nonce=*/"B",
1281 /*error_detail=*/absl::OkStatus(), /*resource_names=*/{"foo2"});
1282 // Now cancel watch for "foo2".
1283 CancelFooWatch(watcher2.get(), "foo2");
1284 EXPECT_TRUE(stream->Orphaned());
1285 }
1286
TEST_F(XdsClientTest,UpdateContainsOnlyChangedResource)1287 TEST_F(XdsClientTest, UpdateContainsOnlyChangedResource) {
1288 InitXdsClient();
1289 // Start a watch for "foo1".
1290 auto watcher = StartFooWatch("foo1");
1291 // Watcher should initially not see any resource reported.
1292 EXPECT_FALSE(watcher->HasEvent());
1293 // XdsClient should have created an ADS stream.
1294 auto stream = WaitForAdsStream();
1295 ASSERT_TRUE(stream != nullptr);
1296 // XdsClient should have sent a subscription request on the ADS stream.
1297 auto request = WaitForRequest(stream.get());
1298 ASSERT_TRUE(request.has_value());
1299 CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
1300 /*version_info=*/"", /*response_nonce=*/"",
1301 /*error_detail=*/absl::OkStatus(),
1302 /*resource_names=*/{"foo1"});
1303 CheckRequestNode(*request); // Should be present on the first request.
1304 // Send a response.
1305 stream->SendMessageToClient(
1306 ResponseBuilder(XdsFooResourceType::Get()->type_url())
1307 .set_version_info("1")
1308 .set_nonce("A")
1309 .AddFooResource(XdsFooResource("foo1", 6))
1310 .Serialize());
1311 // XdsClient should have delivered the response to the watcher.
1312 auto resource = watcher->WaitForNextResource();
1313 ASSERT_NE(resource, nullptr);
1314 EXPECT_EQ(resource->name, "foo1");
1315 EXPECT_EQ(resource->value, 6);
1316 // XdsClient should have sent an ACK message to the xDS server.
1317 request = WaitForRequest(stream.get());
1318 ASSERT_TRUE(request.has_value());
1319 CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
1320 /*version_info=*/"1", /*response_nonce=*/"A",
1321 /*error_detail=*/absl::OkStatus(),
1322 /*resource_names=*/{"foo1"});
1323 // Start a watch for "foo2".
1324 auto watcher2 = StartFooWatch("foo2");
1325 // XdsClient should have sent a subscription request on the ADS stream.
1326 request = WaitForRequest(stream.get());
1327 ASSERT_TRUE(request.has_value());
1328 CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
1329 /*version_info=*/"1", /*response_nonce=*/"A",
1330 /*error_detail=*/absl::OkStatus(),
1331 /*resource_names=*/{"foo1", "foo2"});
1332 // Send a response.
1333 stream->SendMessageToClient(
1334 ResponseBuilder(XdsFooResourceType::Get()->type_url())
1335 .set_version_info("1")
1336 .set_nonce("B")
1337 .AddFooResource(XdsFooResource("foo2", 7))
1338 .Serialize());
1339 // XdsClient should have delivered the response to the watcher.
1340 resource = watcher2->WaitForNextResource();
1341 ASSERT_NE(resource, nullptr);
1342 EXPECT_EQ(resource->name, "foo2");
1343 EXPECT_EQ(resource->value, 7);
1344 // Check metric data.
1345 EXPECT_THAT(metrics_reporter_->resource_updates_valid(),
1346 ::testing::ElementsAre(::testing::Pair(
1347 ::testing::Pair(kDefaultXdsServerUrl,
1348 XdsFooResourceType::Get()->type_url()),
1349 2)));
1350 EXPECT_THAT(metrics_reporter_->resource_updates_invalid(),
1351 ::testing::ElementsAre());
1352 EXPECT_THAT(
1353 GetResourceCounts(),
1354 ::testing::ElementsAre(::testing::Pair(
1355 ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
1356 XdsFooResourceType::Get()->type_url(), "acked"),
1357 2)));
1358 // XdsClient should have sent an ACK message to the xDS server.
1359 request = WaitForRequest(stream.get());
1360 ASSERT_TRUE(request.has_value());
1361 CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
1362 /*version_info=*/"1", /*response_nonce=*/"B",
1363 /*error_detail=*/absl::OkStatus(),
1364 /*resource_names=*/{"foo1", "foo2"});
1365 // Server sends an update for "foo1". The response does not contain "foo2".
1366 stream->SendMessageToClient(
1367 ResponseBuilder(XdsFooResourceType::Get()->type_url())
1368 .set_version_info("2")
1369 .set_nonce("C")
1370 .AddFooResource(XdsFooResource("foo1", 9))
1371 .Serialize());
1372 // XdsClient should have delivered the response to the watcher.
1373 resource = watcher->WaitForNextResource();
1374 ASSERT_NE(resource, nullptr);
1375 EXPECT_EQ(resource->name, "foo1");
1376 EXPECT_EQ(resource->value, 9);
1377 // Check metric data.
1378 EXPECT_THAT(metrics_reporter_->resource_updates_valid(),
1379 ::testing::ElementsAre(::testing::Pair(
1380 ::testing::Pair(kDefaultXdsServerUrl,
1381 XdsFooResourceType::Get()->type_url()),
1382 3)));
1383 EXPECT_THAT(metrics_reporter_->resource_updates_invalid(),
1384 ::testing::ElementsAre());
1385 EXPECT_THAT(
1386 GetResourceCounts(),
1387 ::testing::ElementsAre(::testing::Pair(
1388 ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
1389 XdsFooResourceType::Get()->type_url(), "acked"),
1390 2)));
1391 // XdsClient should have sent an ACK message to the xDS server.
1392 request = WaitForRequest(stream.get());
1393 ASSERT_TRUE(request.has_value());
1394 CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
1395 /*version_info=*/"2", /*response_nonce=*/"C",
1396 /*error_detail=*/absl::OkStatus(),
1397 /*resource_names=*/{"foo1", "foo2"});
1398 // Cancel watch for "foo1".
1399 CancelFooWatch(watcher.get(), "foo1");
1400 // XdsClient should send an unsubscription request.
1401 request = WaitForRequest(stream.get());
1402 ASSERT_TRUE(request.has_value());
1403 CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
1404 /*version_info=*/"2", /*response_nonce=*/"C",
1405 /*error_detail=*/absl::OkStatus(), /*resource_names=*/{"foo2"});
1406 // Now cancel watch for "foo2".
1407 CancelFooWatch(watcher2.get(), "foo2");
1408 EXPECT_TRUE(stream->Orphaned());
1409 }
1410
TEST_F(XdsClientTest,ResourceValidationFailure)1411 TEST_F(XdsClientTest, ResourceValidationFailure) {
1412 InitXdsClient();
1413 // Start a watch for "foo1".
1414 auto watcher = StartFooWatch("foo1");
1415 // Check metric data.
1416 EXPECT_THAT(metrics_reporter_->resource_updates_valid(),
1417 ::testing::ElementsAre());
1418 EXPECT_THAT(metrics_reporter_->resource_updates_invalid(),
1419 ::testing::ElementsAre());
1420 EXPECT_THAT(GetResourceCounts(),
1421 ::testing::ElementsAre(::testing::Pair(
1422 ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
1423 XdsFooResourceType::Get()->type_url(),
1424 "requested"),
1425 1)));
1426 // Watcher should initially not see any resource reported.
1427 EXPECT_FALSE(watcher->HasEvent());
1428 // XdsClient should have created an ADS stream.
1429 auto stream = WaitForAdsStream();
1430 ASSERT_TRUE(stream != nullptr);
1431 // XdsClient should have sent a subscription request on the ADS stream.
1432 auto request = WaitForRequest(stream.get());
1433 ASSERT_TRUE(request.has_value());
1434 CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
1435 /*version_info=*/"", /*response_nonce=*/"",
1436 /*error_detail=*/absl::OkStatus(),
1437 /*resource_names=*/{"foo1"});
1438 CheckRequestNode(*request); // Should be present on the first request.
1439 // Send a response containing an invalid resource.
1440 stream->SendMessageToClient(
1441 ResponseBuilder(XdsFooResourceType::Get()->type_url())
1442 .set_version_info("1")
1443 .set_nonce("A")
1444 .AddInvalidResource(XdsFooResourceType::Get()->type_url(),
1445 "{\"name\":\"foo1\",\"value\":[]}")
1446 .Serialize());
1447 // XdsClient should deliver an error to the watcher.
1448 auto error = watcher->WaitForNextError();
1449 ASSERT_TRUE(error.has_value());
1450 EXPECT_EQ(error->code(), absl::StatusCode::kUnavailable);
1451 EXPECT_EQ(error->message(),
1452 "invalid resource: INVALID_ARGUMENT: errors validating JSON: "
1453 "[field:value error:is not a number] (node ID:xds_client_test)")
1454 << *error;
1455 // Check metric data.
1456 EXPECT_THAT(metrics_reporter_->resource_updates_valid(),
1457 ::testing::ElementsAre());
1458 EXPECT_THAT(metrics_reporter_->resource_updates_invalid(),
1459 ::testing::ElementsAre(::testing::Pair(
1460 ::testing::Pair(kDefaultXdsServerUrl,
1461 XdsFooResourceType::Get()->type_url()),
1462 1)));
1463 EXPECT_THAT(GetResourceCounts(),
1464 ::testing::ElementsAre(::testing::Pair(
1465 ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
1466 XdsFooResourceType::Get()->type_url(),
1467 "nacked"),
1468 1)));
1469 // XdsClient should NACK the update.
1470 // Note that version_info is not populated in the request.
1471 request = WaitForRequest(stream.get());
1472 ASSERT_TRUE(request.has_value());
1473 CheckRequest(
1474 *request, XdsFooResourceType::Get()->type_url(),
1475 /*version_info=*/"", /*response_nonce=*/"A",
1476 // error_detail=
1477 absl::InvalidArgumentError(
1478 "xDS response validation errors: ["
1479 "resource index 0: foo1: INVALID_ARGUMENT: errors validating JSON: "
1480 "[field:value error:is not a number]]"),
1481 /*resource_names=*/{"foo1"});
1482 // Start a second watch for the same resource. It should immediately
1483 // receive the same error.
1484 auto watcher2 = StartFooWatch("foo1");
1485 error = watcher2->WaitForNextError();
1486 ASSERT_TRUE(error.has_value());
1487 EXPECT_EQ(error->code(), absl::StatusCode::kUnavailable);
1488 EXPECT_EQ(error->message(),
1489 "invalid resource: INVALID_ARGUMENT: errors validating JSON: "
1490 "[field:value error:is not a number] (node ID:xds_client_test)")
1491 << *error;
1492 // Now server sends an updated version of the resource.
1493 stream->SendMessageToClient(
1494 ResponseBuilder(XdsFooResourceType::Get()->type_url())
1495 .set_version_info("2")
1496 .set_nonce("B")
1497 .AddFooResource(XdsFooResource("foo1", 9))
1498 .Serialize());
1499 // XdsClient should deliver the response to both watchers.
1500 auto resource = watcher->WaitForNextResource();
1501 ASSERT_NE(resource, nullptr);
1502 EXPECT_EQ(resource->name, "foo1");
1503 EXPECT_EQ(resource->value, 9);
1504 resource = watcher2->WaitForNextResource();
1505 ASSERT_NE(resource, nullptr);
1506 EXPECT_EQ(resource->name, "foo1");
1507 EXPECT_EQ(resource->value, 9);
1508 // Check metric data.
1509 EXPECT_THAT(metrics_reporter_->resource_updates_valid(),
1510 ::testing::ElementsAre(::testing::Pair(
1511 ::testing::Pair(kDefaultXdsServerUrl,
1512 XdsFooResourceType::Get()->type_url()),
1513 1)));
1514 EXPECT_THAT(metrics_reporter_->resource_updates_invalid(),
1515 ::testing::ElementsAre(::testing::Pair(
1516 ::testing::Pair(kDefaultXdsServerUrl,
1517 XdsFooResourceType::Get()->type_url()),
1518 1)));
1519 EXPECT_THAT(
1520 GetResourceCounts(),
1521 ::testing::ElementsAre(::testing::Pair(
1522 ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
1523 XdsFooResourceType::Get()->type_url(), "acked"),
1524 1)));
1525 // XdsClient should have sent an ACK message to the xDS server.
1526 request = WaitForRequest(stream.get());
1527 ASSERT_TRUE(request.has_value());
1528 CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
1529 /*version_info=*/"2", /*response_nonce=*/"B",
1530 /*error_detail=*/absl::OkStatus(),
1531 /*resource_names=*/{"foo1"});
1532 // Cancel watch.
1533 CancelFooWatch(watcher.get(), "foo1");
1534 CancelFooWatch(watcher2.get(), "foo1");
1535 EXPECT_TRUE(stream->Orphaned());
1536 }
1537
TEST_F(XdsClientTest,ResourceValidationFailureMultipleResources)1538 TEST_F(XdsClientTest, ResourceValidationFailureMultipleResources) {
1539 InitXdsClient();
1540 // Start a watch for "foo1".
1541 auto watcher = StartFooWatch("foo1");
1542 // Watcher should initially not see any resource reported.
1543 EXPECT_FALSE(watcher->HasEvent());
1544 // Check metric data.
1545 EXPECT_THAT(metrics_reporter_->resource_updates_valid(),
1546 ::testing::ElementsAre());
1547 EXPECT_THAT(metrics_reporter_->resource_updates_invalid(),
1548 ::testing::ElementsAre());
1549 EXPECT_THAT(GetResourceCounts(),
1550 ::testing::ElementsAre(::testing::Pair(
1551 ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
1552 XdsFooResourceType::Get()->type_url(),
1553 "requested"),
1554 1)));
1555 // XdsClient should have created an ADS stream.
1556 auto stream = WaitForAdsStream();
1557 ASSERT_TRUE(stream != nullptr);
1558 // XdsClient should have sent a subscription request on the ADS stream.
1559 auto request = WaitForRequest(stream.get());
1560 ASSERT_TRUE(request.has_value());
1561 CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
1562 /*version_info=*/"", /*response_nonce=*/"",
1563 /*error_detail=*/absl::OkStatus(),
1564 /*resource_names=*/{"foo1"});
1565 CheckRequestNode(*request); // Should be present on the first request.
1566 // Before the server responds, add a watch for another resource.
1567 auto watcher2 = StartFooWatch("foo2");
1568 // Check metric data.
1569 EXPECT_THAT(metrics_reporter_->resource_updates_valid(),
1570 ::testing::ElementsAre());
1571 EXPECT_THAT(metrics_reporter_->resource_updates_invalid(),
1572 ::testing::ElementsAre());
1573 EXPECT_THAT(GetResourceCounts(),
1574 ::testing::ElementsAre(::testing::Pair(
1575 ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
1576 XdsFooResourceType::Get()->type_url(),
1577 "requested"),
1578 2)));
1579 // Client should send another request.
1580 request = WaitForRequest(stream.get());
1581 ASSERT_TRUE(request.has_value());
1582 CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
1583 /*version_info=*/"", /*response_nonce=*/"",
1584 /*error_detail=*/absl::OkStatus(),
1585 /*resource_names=*/{"foo1", "foo2"});
1586 // Add a watch for a third resource.
1587 auto watcher3 = StartFooWatch("foo3");
1588 // Check metric data.
1589 EXPECT_THAT(metrics_reporter_->resource_updates_valid(),
1590 ::testing::ElementsAre());
1591 EXPECT_THAT(metrics_reporter_->resource_updates_invalid(),
1592 ::testing::ElementsAre());
1593 EXPECT_THAT(GetResourceCounts(),
1594 ::testing::ElementsAre(::testing::Pair(
1595 ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
1596 XdsFooResourceType::Get()->type_url(),
1597 "requested"),
1598 3)));
1599 // Client should send another request.
1600 request = WaitForRequest(stream.get());
1601 ASSERT_TRUE(request.has_value());
1602 CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
1603 /*version_info=*/"", /*response_nonce=*/"",
1604 /*error_detail=*/absl::OkStatus(),
1605 /*resource_names=*/{"foo1", "foo2", "foo3"});
1606 // Add a watch for a fourth resource.
1607 auto watcher4 = StartFooWatch("foo4");
1608 // Check metric data.
1609 EXPECT_THAT(metrics_reporter_->resource_updates_valid(),
1610 ::testing::ElementsAre());
1611 EXPECT_THAT(metrics_reporter_->resource_updates_invalid(),
1612 ::testing::ElementsAre());
1613 EXPECT_THAT(GetResourceCounts(),
1614 ::testing::ElementsAre(::testing::Pair(
1615 ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
1616 XdsFooResourceType::Get()->type_url(),
1617 "requested"),
1618 4)));
1619 // Client should send another request.
1620 request = WaitForRequest(stream.get());
1621 ASSERT_TRUE(request.has_value());
1622 CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
1623 /*version_info=*/"", /*response_nonce=*/"",
1624 /*error_detail=*/absl::OkStatus(),
1625 /*resource_names=*/{"foo1", "foo2", "foo3", "foo4"});
1626 // Server sends a response containing three invalid resources and one
1627 // valid resource.
1628 stream->SendMessageToClient(
1629 ResponseBuilder(XdsFooResourceType::Get()->type_url())
1630 .set_version_info("1")
1631 .set_nonce("A")
1632 // foo1: JSON parsing succeeds, so we know the resource name,
1633 // but validation fails.
1634 .AddInvalidResource(XdsFooResourceType::Get()->type_url(),
1635 "{\"name\":\"foo1\",\"value\":[]}")
1636 // foo2: JSON parsing fails, and not wrapped in a Resource
1637 // wrapper, so we don't actually know the resource's name.
1638 .AddInvalidResource(XdsFooResourceType::Get()->type_url(),
1639 "{\"name\":\"foo2,\"value\":6}")
1640 // Empty resource. Will be included in NACK but will not
1641 // affect any watchers.
1642 .AddEmptyResource()
1643 // Invalid resource wrapper. Will be included in NACK but
1644 // will not affect any watchers.
1645 .AddInvalidResourceWrapper()
1646 // foo3: JSON parsing fails, but it is wrapped in a Resource
1647 // wrapper, so we do know the resource's name.
1648 .AddInvalidResource(XdsFooResourceType::Get()->type_url(),
1649 "{\"name\":\"foo3,\"value\":6}",
1650 /*resource_wrapper_name=*/"foo3")
1651 // foo4: valid resource.
1652 .AddFooResource(XdsFooResource("foo4", 5))
1653 .Serialize());
1654 // XdsClient should deliver an error to the watchers for foo1 and foo3.
1655 auto error = watcher->WaitForNextError();
1656 ASSERT_TRUE(error.has_value());
1657 EXPECT_EQ(error->code(), absl::StatusCode::kUnavailable);
1658 EXPECT_EQ(error->message(),
1659 "invalid resource: INVALID_ARGUMENT: errors validating JSON: "
1660 "[field:value error:is not a number] (node ID:xds_client_test)")
1661 << *error;
1662 error = watcher3->WaitForNextError();
1663 ASSERT_TRUE(error.has_value());
1664 EXPECT_EQ(error->code(), absl::StatusCode::kUnavailable);
1665 EXPECT_EQ(error->message(),
1666 "invalid resource: INVALID_ARGUMENT: JSON parsing failed: "
1667 "[JSON parse error at index 15] (node ID:xds_client_test)")
1668 << *error;
1669 // It cannot delivery an error for foo2, because the client doesn't know
1670 // that that resource in the response was actually supposed to be foo2.
1671 EXPECT_FALSE(watcher2->HasEvent());
1672 // It will delivery a valid resource update for foo4.
1673 auto resource = watcher4->WaitForNextResource();
1674 ASSERT_NE(resource, nullptr);
1675 EXPECT_EQ(resource->name, "foo4");
1676 EXPECT_EQ(resource->value, 5);
1677 // Check metric data.
1678 EXPECT_THAT(metrics_reporter_->resource_updates_valid(),
1679 ::testing::ElementsAre(::testing::Pair(
1680 ::testing::Pair(kDefaultXdsServerUrl,
1681 XdsFooResourceType::Get()->type_url()),
1682 1)));
1683 EXPECT_THAT(metrics_reporter_->resource_updates_invalid(),
1684 ::testing::ElementsAre(::testing::Pair(
1685 ::testing::Pair(kDefaultXdsServerUrl,
1686 XdsFooResourceType::Get()->type_url()),
1687 5)));
1688 EXPECT_THAT(
1689 GetResourceCounts(),
1690 ::testing::ElementsAre(
1691 // foo4
1692 ::testing::Pair(ResourceCountLabelsEq(
1693 XdsClient::kOldStyleAuthority,
1694 XdsFooResourceType::Get()->type_url(), "acked"),
1695 1),
1696 // foo1 and foo3
1697 ::testing::Pair(ResourceCountLabelsEq(
1698 XdsClient::kOldStyleAuthority,
1699 XdsFooResourceType::Get()->type_url(), "nacked"),
1700 2),
1701 // did not recognize response for foo2
1702 ::testing::Pair(
1703 ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
1704 XdsFooResourceType::Get()->type_url(),
1705 "requested"),
1706 1)));
1707 // XdsClient should NACK the update.
1708 // There was one good resource, so the version will be updated.
1709 request = WaitForRequest(stream.get());
1710 ASSERT_TRUE(request.has_value());
1711 CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
1712 /*version_info=*/"1", /*response_nonce=*/"A",
1713 // error_detail=
1714 absl::InvalidArgumentError(absl::StrCat(
1715 "xDS response validation errors: ["
1716 // foo1
1717 "resource index 0: foo1: "
1718 "INVALID_ARGUMENT: errors validating JSON: "
1719 "[field:value error:is not a number]; "
1720 // foo2 (name not known)
1721 "resource index 1: INVALID_ARGUMENT: JSON parsing failed: "
1722 "[JSON parse error at index 15]; "
1723 // empty resource
1724 "resource index 2: incorrect resource type \"\" "
1725 "(should be \"",
1726 XdsFooResourceType::Get()->type_url(),
1727 "\"); "
1728 // invalid resource wrapper
1729 "resource index 3: Can't decode Resource proto wrapper; "
1730 // foo3
1731 "resource index 4: foo3: "
1732 "INVALID_ARGUMENT: JSON parsing failed: "
1733 "[JSON parse error at index 15]]")),
1734 /*resource_names=*/{"foo1", "foo2", "foo3", "foo4"});
1735 // Cancel watches.
1736 CancelFooWatch(watcher.get(), "foo1", /*delay_unsubscription=*/true);
1737 CancelFooWatch(watcher2.get(), "foo2", /*delay_unsubscription=*/true);
1738 CancelFooWatch(watcher3.get(), "foo3", /*delay_unsubscription=*/true);
1739 CancelFooWatch(watcher4.get(), "foo4");
1740 EXPECT_TRUE(stream->Orphaned());
1741 }
1742
TEST_F(XdsClientTest,ResourceValidationFailureForCachedResource)1743 TEST_F(XdsClientTest, ResourceValidationFailureForCachedResource) {
1744 InitXdsClient();
1745 // Start a watch for "foo1".
1746 auto watcher = StartFooWatch("foo1");
1747 // Watcher should initially not see any resource reported.
1748 EXPECT_FALSE(watcher->HasEvent());
1749 // XdsClient should have created an ADS stream.
1750 auto stream = WaitForAdsStream();
1751 ASSERT_TRUE(stream != nullptr);
1752 // XdsClient should have sent a subscription request on the ADS stream.
1753 auto request = WaitForRequest(stream.get());
1754 ASSERT_TRUE(request.has_value());
1755 CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
1756 /*version_info=*/"", /*response_nonce=*/"",
1757 /*error_detail=*/absl::OkStatus(),
1758 /*resource_names=*/{"foo1"});
1759 CheckRequestNode(*request); // Should be present on the first request.
1760 // Send a response.
1761 stream->SendMessageToClient(
1762 ResponseBuilder(XdsFooResourceType::Get()->type_url())
1763 .set_version_info("1")
1764 .set_nonce("A")
1765 .AddFooResource(XdsFooResource("foo1", 6))
1766 .Serialize());
1767 // XdsClient should have delivered the response to the watcher.
1768 auto resource = watcher->WaitForNextResource();
1769 ASSERT_NE(resource, nullptr);
1770 EXPECT_EQ(resource->name, "foo1");
1771 EXPECT_EQ(resource->value, 6);
1772 // Check metric data.
1773 EXPECT_THAT(metrics_reporter_->resource_updates_valid(),
1774 ::testing::ElementsAre(::testing::Pair(
1775 ::testing::Pair(kDefaultXdsServerUrl,
1776 XdsFooResourceType::Get()->type_url()),
1777 1)));
1778 EXPECT_THAT(metrics_reporter_->resource_updates_invalid(),
1779 ::testing::ElementsAre());
1780 EXPECT_THAT(
1781 GetResourceCounts(),
1782 ::testing::ElementsAre(::testing::Pair(
1783 ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
1784 XdsFooResourceType::Get()->type_url(), "acked"),
1785 1)));
1786 // XdsClient should have sent an ACK message to the xDS server.
1787 request = WaitForRequest(stream.get());
1788 ASSERT_TRUE(request.has_value());
1789 CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
1790 /*version_info=*/"1", /*response_nonce=*/"A",
1791 /*error_detail=*/absl::OkStatus(),
1792 /*resource_names=*/{"foo1"});
1793 // Send an update containing an invalid resource.
1794 stream->SendMessageToClient(
1795 ResponseBuilder(XdsFooResourceType::Get()->type_url())
1796 .set_version_info("2")
1797 .set_nonce("B")
1798 .AddInvalidResource(XdsFooResourceType::Get()->type_url(),
1799 "{\"name\":\"foo1\",\"value\":[]}")
1800 .Serialize());
1801 // XdsClient should deliver an error to the watcher.
1802 auto error = watcher->WaitForNextError();
1803 ASSERT_TRUE(error.has_value());
1804 EXPECT_EQ(error->code(), absl::StatusCode::kUnavailable);
1805 EXPECT_EQ(error->message(),
1806 "invalid resource: INVALID_ARGUMENT: errors validating JSON: "
1807 "[field:value error:is not a number] (node ID:xds_client_test)")
1808 << *error;
1809 // Check metric data.
1810 EXPECT_THAT(metrics_reporter_->resource_updates_valid(),
1811 ::testing::ElementsAre(::testing::Pair(
1812 ::testing::Pair(kDefaultXdsServerUrl,
1813 XdsFooResourceType::Get()->type_url()),
1814 1)));
1815 EXPECT_THAT(metrics_reporter_->resource_updates_invalid(),
1816 ::testing::ElementsAre(::testing::Pair(
1817 ::testing::Pair(kDefaultXdsServerUrl,
1818 XdsFooResourceType::Get()->type_url()),
1819 1)));
1820 EXPECT_THAT(GetResourceCounts(),
1821 ::testing::ElementsAre(::testing::Pair(
1822 ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
1823 XdsFooResourceType::Get()->type_url(),
1824 "nacked_but_cached"),
1825 1)));
1826 // XdsClient should NACK the update.
1827 // Note that version_info is set to the previous version in this request,
1828 // because there were no valid resources in it.
1829 request = WaitForRequest(stream.get());
1830 ASSERT_TRUE(request.has_value());
1831 CheckRequest(
1832 *request, XdsFooResourceType::Get()->type_url(),
1833 /*version_info=*/"1", /*response_nonce=*/"B",
1834 // error_detail=
1835 absl::InvalidArgumentError(
1836 "xDS response validation errors: ["
1837 "resource index 0: foo1: INVALID_ARGUMENT: errors validating JSON: "
1838 "[field:value error:is not a number]]"),
1839 /*resource_names=*/{"foo1"});
1840 // Start a second watcher for the same resource. Even though the last
1841 // update was a NACK, we should still deliver the cached resource to
1842 // the watcher.
1843 // TODO(roth): Consider what the right behavior is here. It seems
1844 // inconsistent that the watcher sees the error if it had started
1845 // before the error was seen but does not if it was started afterwards.
1846 // One option is to not send errors at all for already-cached resources;
1847 // another option is to send the errors even for newly started watchers.
1848 auto watcher2 = StartFooWatch("foo1");
1849 resource = watcher2->WaitForNextResource();
1850 ASSERT_NE(resource, nullptr);
1851 EXPECT_EQ(resource->name, "foo1");
1852 EXPECT_EQ(resource->value, 6);
1853 // Cancel watches.
1854 CancelFooWatch(watcher.get(), "foo1");
1855 CancelFooWatch(watcher2.get(), "foo1");
1856 EXPECT_TRUE(stream->Orphaned());
1857 }
1858
TEST_F(XdsClientTest,WildcardCapableResponseWithEmptyResource)1859 TEST_F(XdsClientTest, WildcardCapableResponseWithEmptyResource) {
1860 InitXdsClient();
1861 // Start a watch for "wc1".
1862 auto watcher = StartWildcardCapableWatch("wc1");
1863 // Watcher should initially not see any resource reported.
1864 EXPECT_FALSE(watcher->HasEvent());
1865 // XdsClient should have created an ADS stream.
1866 auto stream = WaitForAdsStream();
1867 ASSERT_TRUE(stream != nullptr);
1868 // XdsClient should have sent a subscription request on the ADS stream.
1869 auto request = WaitForRequest(stream.get());
1870 ASSERT_TRUE(request.has_value());
1871 CheckRequest(*request, XdsWildcardCapableResourceType::Get()->type_url(),
1872 /*version_info=*/"", /*response_nonce=*/"",
1873 /*error_detail=*/absl::OkStatus(),
1874 /*resource_names=*/{"wc1"});
1875 CheckRequestNode(*request); // Should be present on the first request.
1876 // Server sends a response containing the requested resources plus an
1877 // empty resource.
1878 stream->SendMessageToClient(
1879 ResponseBuilder(XdsWildcardCapableResourceType::Get()->type_url())
1880 .set_version_info("1")
1881 .set_nonce("A")
1882 .AddWildcardCapableResource(XdsWildcardCapableResource("wc1", 6))
1883 .AddEmptyResource()
1884 .Serialize());
1885 // XdsClient will delivery a valid resource update for wc1.
1886 auto resource = watcher->WaitForNextResource();
1887 ASSERT_NE(resource, nullptr);
1888 EXPECT_EQ(resource->name, "wc1");
1889 EXPECT_EQ(resource->value, 6);
1890 // Check metric data.
1891 EXPECT_THAT(
1892 metrics_reporter_->resource_updates_valid(),
1893 ::testing::ElementsAre(::testing::Pair(
1894 ::testing::Pair(kDefaultXdsServerUrl,
1895 XdsWildcardCapableResourceType::Get()->type_url()),
1896 1)));
1897 EXPECT_THAT(
1898 metrics_reporter_->resource_updates_invalid(),
1899 ::testing::ElementsAre(::testing::Pair(
1900 ::testing::Pair(kDefaultXdsServerUrl,
1901 XdsWildcardCapableResourceType::Get()->type_url()),
1902 1)));
1903 EXPECT_THAT(
1904 GetResourceCounts(),
1905 ::testing::ElementsAre(::testing::Pair(
1906 ResourceCountLabelsEq(
1907 XdsClient::kOldStyleAuthority,
1908 XdsWildcardCapableResourceType::Get()->type_url(), "acked"),
1909 1)));
1910 // XdsClient should NACK the update.
1911 // There was one good resource, so the version will be updated.
1912 request = WaitForRequest(stream.get());
1913 ASSERT_TRUE(request.has_value());
1914 CheckRequest(*request, XdsWildcardCapableResourceType::Get()->type_url(),
1915 /*version_info=*/"1", /*response_nonce=*/"A",
1916 // error_detail=
1917 absl::InvalidArgumentError(absl::StrCat(
1918 "xDS response validation errors: ["
1919 "resource index 1: incorrect resource type \"\" "
1920 "(should be \"",
1921 XdsWildcardCapableResourceType::Get()->type_url(), "\")]")),
1922 /*resource_names=*/{"wc1"});
1923 // Cancel watch.
1924 CancelWildcardCapableWatch(watcher.get(), "wc1");
1925 EXPECT_TRUE(stream->Orphaned());
1926 }
1927
1928 // This tests resource removal triggered by the server when using a
1929 // resource type that requires all resources to be present in every
1930 // response, similar to LDS and CDS.
TEST_F(XdsClientTest,ResourceDeletion)1931 TEST_F(XdsClientTest, ResourceDeletion) {
1932 InitXdsClient();
1933 // Start a watch for "wc1".
1934 auto watcher = StartWildcardCapableWatch("wc1");
1935 // Watcher should initially not see any resource reported.
1936 EXPECT_FALSE(watcher->HasEvent());
1937 // XdsClient should have created an ADS stream.
1938 auto stream = WaitForAdsStream();
1939 ASSERT_TRUE(stream != nullptr);
1940 // XdsClient should have sent a subscription request on the ADS stream.
1941 auto request = WaitForRequest(stream.get());
1942 ASSERT_TRUE(request.has_value());
1943 CheckRequest(*request, XdsWildcardCapableResourceType::Get()->type_url(),
1944 /*version_info=*/"", /*response_nonce=*/"",
1945 /*error_detail=*/absl::OkStatus(),
1946 /*resource_names=*/{"wc1"});
1947 CheckRequestNode(*request); // Should be present on the first request.
1948 // Server sends a response.
1949 stream->SendMessageToClient(
1950 ResponseBuilder(XdsWildcardCapableResourceType::Get()->type_url())
1951 .set_version_info("1")
1952 .set_nonce("A")
1953 .AddWildcardCapableResource(XdsWildcardCapableResource("wc1", 6))
1954 .Serialize());
1955 // XdsClient should have delivered the response to the watcher.
1956 auto resource = watcher->WaitForNextResource();
1957 ASSERT_NE(resource, nullptr);
1958 EXPECT_EQ(resource->name, "wc1");
1959 EXPECT_EQ(resource->value, 6);
1960 // Check metric data.
1961 EXPECT_THAT(
1962 metrics_reporter_->resource_updates_valid(),
1963 ::testing::ElementsAre(::testing::Pair(
1964 ::testing::Pair(kDefaultXdsServerUrl,
1965 XdsWildcardCapableResourceType::Get()->type_url()),
1966 1)));
1967 EXPECT_THAT(metrics_reporter_->resource_updates_invalid(),
1968 ::testing::ElementsAre());
1969 EXPECT_THAT(
1970 GetResourceCounts(),
1971 ::testing::ElementsAre(::testing::Pair(
1972 ResourceCountLabelsEq(
1973 XdsClient::kOldStyleAuthority,
1974 XdsWildcardCapableResourceType::Get()->type_url(), "acked"),
1975 1)));
1976 // XdsClient should have sent an ACK message to the xDS server.
1977 request = WaitForRequest(stream.get());
1978 ASSERT_TRUE(request.has_value());
1979 CheckRequest(*request, XdsWildcardCapableResourceType::Get()->type_url(),
1980 /*version_info=*/"1", /*response_nonce=*/"A",
1981 /*error_detail=*/absl::OkStatus(),
1982 /*resource_names=*/{"wc1"});
1983 // Server now sends a response without the resource, thus indicating
1984 // it's been deleted.
1985 stream->SendMessageToClient(
1986 ResponseBuilder(XdsWildcardCapableResourceType::Get()->type_url())
1987 .set_version_info("2")
1988 .set_nonce("B")
1989 .Serialize());
1990 // Watcher should see the does-not-exist event.
1991 EXPECT_TRUE(watcher->WaitForDoesNotExist(absl::Seconds(1)));
1992 // Check metric data.
1993 EXPECT_THAT(
1994 metrics_reporter_->resource_updates_valid(),
1995 ::testing::ElementsAre(::testing::Pair(
1996 ::testing::Pair(kDefaultXdsServerUrl,
1997 XdsWildcardCapableResourceType::Get()->type_url()),
1998 1)));
1999 EXPECT_THAT(metrics_reporter_->resource_updates_invalid(),
2000 ::testing::ElementsAre());
2001 EXPECT_THAT(GetResourceCounts(),
2002 ::testing::ElementsAre(::testing::Pair(
2003 ResourceCountLabelsEq(
2004 XdsClient::kOldStyleAuthority,
2005 XdsWildcardCapableResourceType::Get()->type_url(),
2006 "does_not_exist"),
2007 1)));
2008 // Start a new watcher for the same resource. It should immediately
2009 // receive the same does-not-exist notification.
2010 auto watcher2 = StartWildcardCapableWatch("wc1");
2011 EXPECT_TRUE(watcher2->WaitForDoesNotExist(absl::Seconds(1)));
2012 // XdsClient should have sent an ACK message to the xDS server.
2013 request = WaitForRequest(stream.get());
2014 ASSERT_TRUE(request.has_value());
2015 CheckRequest(*request, XdsWildcardCapableResourceType::Get()->type_url(),
2016 /*version_info=*/"2", /*response_nonce=*/"B",
2017 /*error_detail=*/absl::OkStatus(),
2018 /*resource_names=*/{"wc1"});
2019 // Server sends the resource again.
2020 stream->SendMessageToClient(
2021 ResponseBuilder(XdsWildcardCapableResourceType::Get()->type_url())
2022 .set_version_info("3")
2023 .set_nonce("C")
2024 .AddWildcardCapableResource(XdsWildcardCapableResource("wc1", 7))
2025 .Serialize());
2026 // XdsClient should have delivered the response to the watchers.
2027 resource = watcher->WaitForNextResource();
2028 ASSERT_NE(resource, nullptr);
2029 EXPECT_EQ(resource->name, "wc1");
2030 EXPECT_EQ(resource->value, 7);
2031 resource = watcher2->WaitForNextResource();
2032 ASSERT_NE(resource, nullptr);
2033 EXPECT_EQ(resource->name, "wc1");
2034 EXPECT_EQ(resource->value, 7);
2035 // Check metric data.
2036 EXPECT_THAT(
2037 metrics_reporter_->resource_updates_valid(),
2038 ::testing::ElementsAre(::testing::Pair(
2039 ::testing::Pair(kDefaultXdsServerUrl,
2040 XdsWildcardCapableResourceType::Get()->type_url()),
2041 2)));
2042 EXPECT_THAT(metrics_reporter_->resource_updates_invalid(),
2043 ::testing::ElementsAre());
2044 EXPECT_THAT(
2045 GetResourceCounts(),
2046 ::testing::ElementsAre(::testing::Pair(
2047 ResourceCountLabelsEq(
2048 XdsClient::kOldStyleAuthority,
2049 XdsWildcardCapableResourceType::Get()->type_url(), "acked"),
2050 1)));
2051 // XdsClient should have sent an ACK message to the xDS server.
2052 request = WaitForRequest(stream.get());
2053 ASSERT_TRUE(request.has_value());
2054 CheckRequest(*request, XdsWildcardCapableResourceType::Get()->type_url(),
2055 /*version_info=*/"3", /*response_nonce=*/"C",
2056 /*error_detail=*/absl::OkStatus(),
2057 /*resource_names=*/{"wc1"});
2058 // Cancel watch.
2059 CancelWildcardCapableWatch(watcher.get(), "wc1");
2060 CancelWildcardCapableWatch(watcher2.get(), "wc1");
2061 EXPECT_TRUE(stream->Orphaned());
2062 }
2063
2064 // This tests that when we ignore resource deletions from the server
2065 // when configured to do so.
TEST_F(XdsClientTest,ResourceDeletionIgnoredWhenConfigured)2066 TEST_F(XdsClientTest, ResourceDeletionIgnoredWhenConfigured) {
2067 InitXdsClient(FakeXdsBootstrap::Builder().SetServers(
2068 {FakeXdsBootstrap::FakeXdsServer(kDefaultXdsServerUrl, true)}));
2069 // Start a watch for "wc1".
2070 auto watcher = StartWildcardCapableWatch("wc1");
2071 // Watcher should initially not see any resource reported.
2072 EXPECT_FALSE(watcher->HasEvent());
2073 // XdsClient should have created an ADS stream.
2074 auto stream = WaitForAdsStream();
2075 ASSERT_TRUE(stream != nullptr);
2076 // XdsClient should have sent a subscription request on the ADS stream.
2077 auto request = WaitForRequest(stream.get());
2078 ASSERT_TRUE(request.has_value());
2079 CheckRequest(*request, XdsWildcardCapableResourceType::Get()->type_url(),
2080 /*version_info=*/"", /*response_nonce=*/"",
2081 /*error_detail=*/absl::OkStatus(),
2082 /*resource_names=*/{"wc1"});
2083 CheckRequestNode(*request); // Should be present on the first request.
2084 // Server sends a response.
2085 stream->SendMessageToClient(
2086 ResponseBuilder(XdsWildcardCapableResourceType::Get()->type_url())
2087 .set_version_info("1")
2088 .set_nonce("A")
2089 .AddWildcardCapableResource(XdsWildcardCapableResource("wc1", 6))
2090 .Serialize());
2091 // XdsClient should have delivered the response to the watcher.
2092 auto resource = watcher->WaitForNextResource();
2093 ASSERT_NE(resource, nullptr);
2094 EXPECT_EQ(resource->name, "wc1");
2095 EXPECT_EQ(resource->value, 6);
2096 // Check metric data.
2097 EXPECT_THAT(
2098 metrics_reporter_->resource_updates_valid(),
2099 ::testing::ElementsAre(::testing::Pair(
2100 ::testing::Pair(kDefaultXdsServerUrl,
2101 XdsWildcardCapableResourceType::Get()->type_url()),
2102 1)));
2103 EXPECT_THAT(metrics_reporter_->resource_updates_invalid(),
2104 ::testing::ElementsAre());
2105 EXPECT_THAT(
2106 GetResourceCounts(),
2107 ::testing::ElementsAre(::testing::Pair(
2108 ResourceCountLabelsEq(
2109 XdsClient::kOldStyleAuthority,
2110 XdsWildcardCapableResourceType::Get()->type_url(), "acked"),
2111 1)));
2112 // XdsClient should have sent an ACK message to the xDS server.
2113 request = WaitForRequest(stream.get());
2114 ASSERT_TRUE(request.has_value());
2115 CheckRequest(*request, XdsWildcardCapableResourceType::Get()->type_url(),
2116 /*version_info=*/"1", /*response_nonce=*/"A",
2117 /*error_detail=*/absl::OkStatus(),
2118 /*resource_names=*/{"wc1"});
2119 // Server now sends a response without the resource, thus indicating
2120 // it's been deleted.
2121 stream->SendMessageToClient(
2122 ResponseBuilder(XdsWildcardCapableResourceType::Get()->type_url())
2123 .set_version_info("2")
2124 .set_nonce("B")
2125 .Serialize());
2126 // Watcher should not see any update, since we should have ignored the
2127 // deletion.
2128 EXPECT_TRUE(watcher->ExpectNoEvent(absl::Seconds(1)));
2129 // Check metric data.
2130 EXPECT_THAT(
2131 metrics_reporter_->resource_updates_valid(),
2132 ::testing::ElementsAre(::testing::Pair(
2133 ::testing::Pair(kDefaultXdsServerUrl,
2134 XdsWildcardCapableResourceType::Get()->type_url()),
2135 1)));
2136 EXPECT_THAT(metrics_reporter_->resource_updates_invalid(),
2137 ::testing::ElementsAre());
2138 EXPECT_THAT(
2139 GetResourceCounts(),
2140 ::testing::ElementsAre(::testing::Pair(
2141 ResourceCountLabelsEq(
2142 XdsClient::kOldStyleAuthority,
2143 XdsWildcardCapableResourceType::Get()->type_url(), "acked"),
2144 1)));
2145 // Start a new watcher for the same resource. It should immediately
2146 // receive the cached resource.
2147 auto watcher2 = StartWildcardCapableWatch("wc1");
2148 resource = watcher2->WaitForNextResource();
2149 ASSERT_NE(resource, nullptr);
2150 EXPECT_EQ(resource->name, "wc1");
2151 EXPECT_EQ(resource->value, 6);
2152 // XdsClient should have sent an ACK message to the xDS server.
2153 request = WaitForRequest(stream.get());
2154 ASSERT_TRUE(request.has_value());
2155 CheckRequest(*request, XdsWildcardCapableResourceType::Get()->type_url(),
2156 /*version_info=*/"2", /*response_nonce=*/"B",
2157 /*error_detail=*/absl::OkStatus(),
2158 /*resource_names=*/{"wc1"});
2159 // Server sends a new value for the resource.
2160 stream->SendMessageToClient(
2161 ResponseBuilder(XdsWildcardCapableResourceType::Get()->type_url())
2162 .set_version_info("3")
2163 .set_nonce("C")
2164 .AddWildcardCapableResource(XdsWildcardCapableResource("wc1", 7))
2165 .Serialize());
2166 // XdsClient should have delivered the response to the watchers.
2167 resource = watcher->WaitForNextResource();
2168 ASSERT_NE(resource, nullptr);
2169 EXPECT_EQ(resource->name, "wc1");
2170 EXPECT_EQ(resource->value, 7);
2171 resource = watcher2->WaitForNextResource();
2172 ASSERT_NE(resource, nullptr);
2173 EXPECT_EQ(resource->name, "wc1");
2174 EXPECT_EQ(resource->value, 7);
2175 // Check metric data.
2176 EXPECT_THAT(
2177 metrics_reporter_->resource_updates_valid(),
2178 ::testing::ElementsAre(::testing::Pair(
2179 ::testing::Pair(kDefaultXdsServerUrl,
2180 XdsWildcardCapableResourceType::Get()->type_url()),
2181 2)));
2182 EXPECT_THAT(metrics_reporter_->resource_updates_invalid(),
2183 ::testing::ElementsAre());
2184 EXPECT_THAT(
2185 GetResourceCounts(),
2186 ::testing::ElementsAre(::testing::Pair(
2187 ResourceCountLabelsEq(
2188 XdsClient::kOldStyleAuthority,
2189 XdsWildcardCapableResourceType::Get()->type_url(), "acked"),
2190 1)));
2191 // XdsClient should have sent an ACK message to the xDS server.
2192 request = WaitForRequest(stream.get());
2193 ASSERT_TRUE(request.has_value());
2194 CheckRequest(*request, XdsWildcardCapableResourceType::Get()->type_url(),
2195 /*version_info=*/"3", /*response_nonce=*/"C",
2196 /*error_detail=*/absl::OkStatus(),
2197 /*resource_names=*/{"wc1"});
2198 // Cancel watch.
2199 CancelWildcardCapableWatch(watcher.get(), "wc1");
2200 CancelWildcardCapableWatch(watcher2.get(), "wc1");
2201 EXPECT_TRUE(stream->Orphaned());
2202 }
2203
TEST_F(XdsClientTest,StreamClosedByServer)2204 TEST_F(XdsClientTest, StreamClosedByServer) {
2205 InitXdsClient();
2206 // Metrics should initially be empty.
2207 EXPECT_THAT(GetServerConnections(), ::testing::ElementsAre());
2208 // Start a watch for "foo1".
2209 auto watcher = StartFooWatch("foo1");
2210 // Check metric data.
2211 EXPECT_THAT(GetServerConnections(), ::testing::ElementsAre(::testing::Pair(
2212 kDefaultXdsServerUrl, true)));
2213 // Watcher should initially not see any resource reported.
2214 EXPECT_FALSE(watcher->HasEvent());
2215 // XdsClient should have created an ADS stream.
2216 auto stream = WaitForAdsStream();
2217 ASSERT_TRUE(stream != nullptr);
2218 // XdsClient should have sent a subscription request on the ADS stream.
2219 auto request = WaitForRequest(stream.get());
2220 ASSERT_TRUE(request.has_value());
2221 CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
2222 /*version_info=*/"", /*response_nonce=*/"",
2223 /*error_detail=*/absl::OkStatus(),
2224 /*resource_names=*/{"foo1"});
2225 CheckRequestNode(*request); // Should be present on the first request.
2226 // Server sends a response.
2227 stream->SendMessageToClient(
2228 ResponseBuilder(XdsFooResourceType::Get()->type_url())
2229 .set_version_info("1")
2230 .set_nonce("A")
2231 .AddFooResource(XdsFooResource("foo1", 6))
2232 .Serialize());
2233 // XdsClient should have delivered the response to the watcher.
2234 auto resource = watcher->WaitForNextResource();
2235 ASSERT_NE(resource, nullptr);
2236 EXPECT_EQ(resource->name, "foo1");
2237 EXPECT_EQ(resource->value, 6);
2238 // XdsClient should have sent an ACK message to the xDS server.
2239 request = WaitForRequest(stream.get());
2240 ASSERT_TRUE(request.has_value());
2241 CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
2242 /*version_info=*/"1", /*response_nonce=*/"A",
2243 /*error_detail=*/absl::OkStatus(),
2244 /*resource_names=*/{"foo1"});
2245 // Now server closes the stream.
2246 stream->MaybeSendStatusToClient(absl::OkStatus());
2247 // XdsClient should NOT report error to watcher, because we saw a
2248 // response on the stream before it failed.
2249 // Stream should be orphaned.
2250 EXPECT_TRUE(stream->Orphaned());
2251 // Check metric data.
2252 EXPECT_THAT(GetServerConnections(), ::testing::ElementsAre(::testing::Pair(
2253 kDefaultXdsServerUrl, true)));
2254 // XdsClient should create a new stream.
2255 stream = WaitForAdsStream();
2256 ASSERT_TRUE(stream != nullptr);
2257 // XdsClient sends a subscription request.
2258 // Note that the version persists from the previous stream, but the
2259 // nonce does not.
2260 request = WaitForRequest(stream.get());
2261 ASSERT_TRUE(request.has_value());
2262 CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
2263 /*version_info=*/"1", /*response_nonce=*/"",
2264 /*error_detail=*/absl::OkStatus(),
2265 /*resource_names=*/{"foo1"});
2266 CheckRequestNode(*request); // Should be present on the first request.
2267 // Before the server resends the resource, start a new watcher for the
2268 // same resource. This watcher should immediately receive the cached
2269 // resource.
2270 auto watcher2 = StartFooWatch("foo1");
2271 resource = watcher2->WaitForNextResource();
2272 ASSERT_NE(resource, nullptr);
2273 EXPECT_EQ(resource->name, "foo1");
2274 EXPECT_EQ(resource->value, 6);
2275 // Server now sends the requested resource.
2276 stream->SendMessageToClient(
2277 ResponseBuilder(XdsFooResourceType::Get()->type_url())
2278 .set_version_info("1")
2279 .set_nonce("B")
2280 .AddFooResource(XdsFooResource("foo1", 6))
2281 .Serialize());
2282 // Watcher does NOT get an update, since the resource has not changed.
2283 EXPECT_FALSE(watcher->WaitForNextResource());
2284 // XdsClient sends an ACK.
2285 request = WaitForRequest(stream.get());
2286 ASSERT_TRUE(request.has_value());
2287 CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
2288 /*version_info=*/"1", /*response_nonce=*/"B",
2289 /*error_detail=*/absl::OkStatus(),
2290 /*resource_names=*/{"foo1"});
2291 // Cancel watcher.
2292 CancelFooWatch(watcher.get(), "foo1");
2293 CancelFooWatch(watcher2.get(), "foo1");
2294 EXPECT_TRUE(stream->Orphaned());
2295 }
2296
TEST_F(XdsClientTest,StreamClosedByServerWithoutSeeingResponse)2297 TEST_F(XdsClientTest, StreamClosedByServerWithoutSeeingResponse) {
2298 InitXdsClient();
2299 // Metrics should initially be empty.
2300 EXPECT_THAT(GetServerConnections(), ::testing::ElementsAre());
2301 // Start a watch for "foo1".
2302 auto watcher = StartFooWatch("foo1");
2303 // Check metric data.
2304 EXPECT_THAT(GetServerConnections(), ::testing::ElementsAre(::testing::Pair(
2305 kDefaultXdsServerUrl, true)));
2306 // Watcher should initially not see any resource reported.
2307 EXPECT_FALSE(watcher->HasEvent());
2308 // XdsClient should have created an ADS stream.
2309 auto stream = WaitForAdsStream();
2310 ASSERT_TRUE(stream != nullptr);
2311 // XdsClient should have sent a subscription request on the ADS stream.
2312 auto request = WaitForRequest(stream.get());
2313 ASSERT_TRUE(request.has_value());
2314 CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
2315 /*version_info=*/"", /*response_nonce=*/"",
2316 /*error_detail=*/absl::OkStatus(),
2317 /*resource_names=*/{"foo1"});
2318 CheckRequestNode(*request); // Should be present on the first request.
2319 // Server closes the stream without sending a response.
2320 stream->MaybeSendStatusToClient(absl::UnavailableError("ugh"));
2321 // Check metric data.
2322 EXPECT_THAT(GetServerConnections(), ::testing::ElementsAre(::testing::Pair(
2323 kDefaultXdsServerUrl, false)));
2324 // XdsClient should report an error to the watcher.
2325 auto error = watcher->WaitForNextError();
2326 ASSERT_TRUE(error.has_value());
2327 EXPECT_EQ(error->code(), absl::StatusCode::kUnavailable);
2328 EXPECT_EQ(error->message(),
2329 "xDS channel for server default_xds_server: xDS call failed "
2330 "with no responses received; status: UNAVAILABLE: ugh "
2331 "(node ID:xds_client_test)")
2332 << *error;
2333 // XdsClient should create a new stream.
2334 stream = WaitForAdsStream();
2335 ASSERT_TRUE(stream != nullptr);
2336 // XdsClient sends a subscription request.
2337 request = WaitForRequest(stream.get());
2338 ASSERT_TRUE(request.has_value());
2339 CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
2340 /*version_info=*/"", /*response_nonce=*/"",
2341 /*error_detail=*/absl::OkStatus(),
2342 /*resource_names=*/{"foo1"});
2343 CheckRequestNode(*request); // Should be present on the first request.
2344 // Connection still reported as unhappy until we get a response.
2345 EXPECT_THAT(GetServerConnections(), ::testing::ElementsAre(::testing::Pair(
2346 kDefaultXdsServerUrl, false)));
2347 // Server now sends the requested resource.
2348 stream->SendMessageToClient(
2349 ResponseBuilder(XdsFooResourceType::Get()->type_url())
2350 .set_version_info("1")
2351 .set_nonce("A")
2352 .AddFooResource(XdsFooResource("foo1", 6))
2353 .Serialize());
2354 // Watcher gets the resource.
2355 auto resource = watcher->WaitForNextResource();
2356 ASSERT_NE(resource, nullptr);
2357 EXPECT_EQ(resource->name, "foo1");
2358 EXPECT_EQ(resource->value, 6);
2359 // Connection now reported as happy.
2360 EXPECT_THAT(GetServerConnections(), ::testing::ElementsAre(::testing::Pair(
2361 kDefaultXdsServerUrl, true)));
2362 // XdsClient sends an ACK.
2363 request = WaitForRequest(stream.get());
2364 ASSERT_TRUE(request.has_value());
2365 CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
2366 /*version_info=*/"1", /*response_nonce=*/"A",
2367 /*error_detail=*/absl::OkStatus(),
2368 /*resource_names=*/{"foo1"});
2369 // Cancel watcher.
2370 CancelFooWatch(watcher.get(), "foo1");
2371 EXPECT_TRUE(stream->Orphaned());
2372 }
2373
TEST_F(XdsClientTest,ConnectionFails)2374 TEST_F(XdsClientTest, ConnectionFails) {
2375 // Lower resources-does-not-exist timeout, to make sure that we're not
2376 // triggering that here.
2377 InitXdsClient(FakeXdsBootstrap::Builder(), Duration::Seconds(3));
2378 // Tell transport to let us manually trigger completion of the
2379 // send_message ops to XdsClient.
2380 transport_factory_->SetAutoCompleteMessagesFromClient(false);
2381 // Metrics should initially be empty.
2382 EXPECT_THAT(GetServerConnections(), ::testing::ElementsAre());
2383 // Start a watch for "foo1".
2384 auto watcher = StartFooWatch("foo1");
2385 // Check metric data.
2386 EXPECT_THAT(GetServerConnections(), ::testing::ElementsAre(::testing::Pair(
2387 kDefaultXdsServerUrl, true)));
2388 // Watcher should initially not see any resource reported.
2389 EXPECT_FALSE(watcher->HasEvent());
2390 // XdsClient should have created an ADS stream.
2391 auto stream = WaitForAdsStream();
2392 ASSERT_TRUE(stream != nullptr);
2393 // XdsClient should have sent a subscription request on the ADS stream.
2394 auto request = WaitForRequest(stream.get());
2395 ASSERT_TRUE(request.has_value());
2396 CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
2397 /*version_info=*/"", /*response_nonce=*/"",
2398 /*error_detail=*/absl::OkStatus(),
2399 /*resource_names=*/{"foo1"});
2400 CheckRequestNode(*request); // Should be present on the first request.
2401 // Transport reports connection failure.
2402 TriggerConnectionFailure(*xds_client_->bootstrap().servers().front(),
2403 absl::UnavailableError("connection failed"));
2404 // XdsClient should report an error to the watcher.
2405 auto error = watcher->WaitForNextError();
2406 ASSERT_TRUE(error.has_value());
2407 EXPECT_EQ(error->code(), absl::StatusCode::kUnavailable);
2408 EXPECT_EQ(error->message(),
2409 "xDS channel for server default_xds_server: "
2410 "connection failed (node ID:xds_client_test)")
2411 << *error;
2412 // Connection reported as unhappy.
2413 EXPECT_THAT(GetServerConnections(), ::testing::ElementsAre(::testing::Pair(
2414 kDefaultXdsServerUrl, false)));
2415 // We should not see a resource-does-not-exist event, because the
2416 // timer should not be running while the channel is disconnected.
2417 EXPECT_TRUE(watcher->ExpectNoEvent(absl::Seconds(4)));
2418 // Start a new watch. This watcher should be given the same error,
2419 // since we have not yet recovered.
2420 auto watcher2 = StartFooWatch("foo1");
2421 error = watcher2->WaitForNextError();
2422 ASSERT_TRUE(error.has_value());
2423 EXPECT_EQ(error->code(), absl::StatusCode::kUnavailable);
2424 EXPECT_EQ(error->message(),
2425 "xDS channel for server default_xds_server: "
2426 "connection failed (node ID:xds_client_test)")
2427 << *error;
2428 // Second watcher should not see resource-does-not-exist either.
2429 EXPECT_FALSE(watcher2->HasEvent());
2430 // The ADS stream uses wait_for_ready inside the XdsTransport interface,
2431 // so when the channel reconnects, the already-started stream will proceed.
2432 stream->CompleteSendMessageFromClient();
2433 // Server sends a response.
2434 stream->SendMessageToClient(
2435 ResponseBuilder(XdsFooResourceType::Get()->type_url())
2436 .set_version_info("1")
2437 .set_nonce("A")
2438 .AddFooResource(XdsFooResource("foo1", 6))
2439 .Serialize());
2440 // Connection now reported as happy.
2441 EXPECT_THAT(GetServerConnections(), ::testing::ElementsAre(::testing::Pair(
2442 kDefaultXdsServerUrl, true)));
2443 // XdsClient should have delivered the response to the watchers.
2444 auto resource = watcher->WaitForNextResource();
2445 ASSERT_NE(resource, nullptr);
2446 EXPECT_EQ(resource->name, "foo1");
2447 EXPECT_EQ(resource->value, 6);
2448 resource = watcher2->WaitForNextResource();
2449 ASSERT_NE(resource, nullptr);
2450 EXPECT_EQ(resource->name, "foo1");
2451 EXPECT_EQ(resource->value, 6);
2452 // XdsClient should have sent an ACK message to the xDS server.
2453 request = WaitForRequest(stream.get());
2454 ASSERT_TRUE(request.has_value());
2455 CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
2456 /*version_info=*/"1", /*response_nonce=*/"A",
2457 /*error_detail=*/absl::OkStatus(),
2458 /*resource_names=*/{"foo1"});
2459 stream->CompleteSendMessageFromClient();
2460 // Cancel watches.
2461 CancelFooWatch(watcher.get(), "foo1");
2462 CancelFooWatch(watcher2.get(), "foo1");
2463 EXPECT_TRUE(stream->Orphaned());
2464 }
2465
TEST_F(XdsClientTest,ResourceDoesNotExistUponTimeout)2466 TEST_F(XdsClientTest, ResourceDoesNotExistUponTimeout) {
2467 InitXdsClient(FakeXdsBootstrap::Builder(), Duration::Seconds(1));
2468 // Start a watch for "foo1".
2469 auto watcher = StartFooWatch("foo1");
2470 // Watcher should initially not see any resource reported.
2471 EXPECT_FALSE(watcher->HasEvent());
2472 // Check metric data.
2473 EXPECT_THAT(metrics_reporter_->resource_updates_valid(),
2474 ::testing::ElementsAre());
2475 EXPECT_THAT(metrics_reporter_->resource_updates_invalid(),
2476 ::testing::ElementsAre());
2477 EXPECT_THAT(GetResourceCounts(),
2478 ::testing::ElementsAre(::testing::Pair(
2479 ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
2480 XdsFooResourceType::Get()->type_url(),
2481 "requested"),
2482 1)));
2483 // XdsClient should have created an ADS stream.
2484 auto stream = WaitForAdsStream();
2485 ASSERT_TRUE(stream != nullptr);
2486 // XdsClient should have sent a subscription request on the ADS stream.
2487 auto request = WaitForRequest(stream.get());
2488 ASSERT_TRUE(request.has_value());
2489 CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
2490 /*version_info=*/"", /*response_nonce=*/"",
2491 /*error_detail=*/absl::OkStatus(),
2492 /*resource_names=*/{"foo1"});
2493 CheckRequestNode(*request); // Should be present on the first request.
2494 // Do not send a response, but wait for the resource to be reported as
2495 // not existing.
2496 EXPECT_TRUE(watcher->WaitForDoesNotExist(absl::Seconds(5)));
2497 // Check metric data.
2498 EXPECT_THAT(metrics_reporter_->resource_updates_valid(),
2499 ::testing::ElementsAre());
2500 EXPECT_THAT(metrics_reporter_->resource_updates_invalid(),
2501 ::testing::ElementsAre());
2502 EXPECT_THAT(GetResourceCounts(),
2503 ::testing::ElementsAre(::testing::Pair(
2504 ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
2505 XdsFooResourceType::Get()->type_url(),
2506 "does_not_exist"),
2507 1)));
2508 // Start a new watcher for the same resource. It should immediately
2509 // receive the same does-not-exist notification.
2510 auto watcher2 = StartFooWatch("foo1");
2511 EXPECT_TRUE(watcher2->WaitForDoesNotExist(absl::Seconds(1)));
2512 // Now server sends a response.
2513 stream->SendMessageToClient(
2514 ResponseBuilder(XdsFooResourceType::Get()->type_url())
2515 .set_version_info("1")
2516 .set_nonce("A")
2517 .AddFooResource(XdsFooResource("foo1", 6))
2518 .Serialize());
2519 // XdsClient should have delivered the response to the watchers.
2520 auto resource = watcher->WaitForNextResource();
2521 ASSERT_NE(resource, nullptr);
2522 EXPECT_EQ(resource->name, "foo1");
2523 EXPECT_EQ(resource->value, 6);
2524 resource = watcher2->WaitForNextResource();
2525 ASSERT_NE(resource, nullptr);
2526 EXPECT_EQ(resource->name, "foo1");
2527 EXPECT_EQ(resource->value, 6);
2528 // Check metric data.
2529 EXPECT_THAT(metrics_reporter_->resource_updates_valid(),
2530 ::testing::ElementsAre(::testing::Pair(
2531 ::testing::Pair(kDefaultXdsServerUrl,
2532 XdsFooResourceType::Get()->type_url()),
2533 1)));
2534 EXPECT_THAT(metrics_reporter_->resource_updates_invalid(),
2535 ::testing::ElementsAre());
2536 EXPECT_THAT(
2537 GetResourceCounts(),
2538 ::testing::ElementsAre(::testing::Pair(
2539 ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
2540 XdsFooResourceType::Get()->type_url(), "acked"),
2541 1)));
2542 // XdsClient should have sent an ACK message to the xDS server.
2543 request = WaitForRequest(stream.get());
2544 ASSERT_TRUE(request.has_value());
2545 CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
2546 /*version_info=*/"1", /*response_nonce=*/"A",
2547 /*error_detail=*/absl::OkStatus(),
2548 /*resource_names=*/{"foo1"});
2549 // Cancel watch.
2550 CancelFooWatch(watcher.get(), "foo1");
2551 CancelFooWatch(watcher2.get(), "foo1");
2552 EXPECT_TRUE(stream->Orphaned());
2553 }
2554
TEST_F(XdsClientTest,ResourceDoesNotExistAfterStreamRestart)2555 TEST_F(XdsClientTest, ResourceDoesNotExistAfterStreamRestart) {
2556 // Lower resources-does-not-exist timeout so test finishes faster.
2557 InitXdsClient(FakeXdsBootstrap::Builder(), Duration::Seconds(3));
2558 // Metrics should initially be empty.
2559 EXPECT_THAT(metrics_reporter_->resource_updates_valid(),
2560 ::testing::ElementsAre());
2561 EXPECT_THAT(metrics_reporter_->resource_updates_invalid(),
2562 ::testing::ElementsAre());
2563 EXPECT_THAT(GetResourceCounts(), ::testing::ElementsAre());
2564 // Start a watch for "foo1".
2565 auto watcher = StartFooWatch("foo1");
2566 // Check metric data.
2567 EXPECT_THAT(metrics_reporter_->resource_updates_valid(),
2568 ::testing::ElementsAre());
2569 EXPECT_THAT(metrics_reporter_->resource_updates_invalid(),
2570 ::testing::ElementsAre());
2571 EXPECT_THAT(GetResourceCounts(),
2572 ::testing::ElementsAre(::testing::Pair(
2573 ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
2574 XdsFooResourceType::Get()->type_url(),
2575 "requested"),
2576 1)));
2577 // Watcher should initially not see any resource reported.
2578 EXPECT_FALSE(watcher->HasEvent());
2579 // XdsClient should have created an ADS stream.
2580 auto stream = WaitForAdsStream();
2581 ASSERT_TRUE(stream != nullptr);
2582 // XdsClient should have sent a subscription request on the ADS stream.
2583 auto request = WaitForRequest(stream.get());
2584 ASSERT_TRUE(request.has_value());
2585 CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
2586 /*version_info=*/"", /*response_nonce=*/"",
2587 /*error_detail=*/absl::OkStatus(),
2588 /*resource_names=*/{"foo1"});
2589 CheckRequestNode(*request); // Should be present on the first request.
2590 // Stream fails.
2591 stream->MaybeSendStatusToClient(absl::UnavailableError("ugh"));
2592 // XdsClient should report error to watcher.
2593 auto error = watcher->WaitForNextError();
2594 ASSERT_TRUE(error.has_value());
2595 EXPECT_EQ(error->code(), absl::StatusCode::kUnavailable);
2596 EXPECT_EQ(error->message(),
2597 "xDS channel for server default_xds_server: xDS call failed "
2598 "with no responses received; status: UNAVAILABLE: ugh "
2599 "(node ID:xds_client_test)")
2600 << *error;
2601 // Check metric data.
2602 EXPECT_THAT(metrics_reporter_->resource_updates_valid(),
2603 ::testing::ElementsAre());
2604 EXPECT_THAT(metrics_reporter_->resource_updates_invalid(),
2605 ::testing::ElementsAre());
2606 EXPECT_THAT(GetResourceCounts(),
2607 ::testing::ElementsAre(::testing::Pair(
2608 ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
2609 XdsFooResourceType::Get()->type_url(),
2610 "requested"),
2611 1)));
2612 // XdsClient should create a new stream.
2613 stream = WaitForAdsStream();
2614 ASSERT_TRUE(stream != nullptr);
2615 // XdsClient sends a subscription request.
2616 request = WaitForRequest(stream.get());
2617 ASSERT_TRUE(request.has_value());
2618 CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
2619 /*version_info=*/"", /*response_nonce=*/"",
2620 /*error_detail=*/absl::OkStatus(),
2621 /*resource_names=*/{"foo1"});
2622 CheckRequestNode(*request); // Should be present on the first request.
2623 // Server does NOT send a response immediately.
2624 // Client should receive a resource does-not-exist.
2625 ASSERT_TRUE(watcher->WaitForDoesNotExist(absl::Seconds(4)));
2626 // Check metric data.
2627 EXPECT_THAT(metrics_reporter_->resource_updates_valid(),
2628 ::testing::ElementsAre());
2629 EXPECT_THAT(metrics_reporter_->resource_updates_invalid(),
2630 ::testing::ElementsAre());
2631 EXPECT_THAT(GetResourceCounts(),
2632 ::testing::ElementsAre(::testing::Pair(
2633 ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
2634 XdsFooResourceType::Get()->type_url(),
2635 "does_not_exist"),
2636 1)));
2637 // Server now sends the requested resource.
2638 stream->SendMessageToClient(
2639 ResponseBuilder(XdsFooResourceType::Get()->type_url())
2640 .set_version_info("1")
2641 .set_nonce("A")
2642 .AddFooResource(XdsFooResource("foo1", 6))
2643 .Serialize());
2644 // The resource is delivered to the watcher.
2645 auto resource = watcher->WaitForNextResource();
2646 ASSERT_NE(resource, nullptr);
2647 EXPECT_EQ(resource->name, "foo1");
2648 EXPECT_EQ(resource->value, 6);
2649 // Check metric data.
2650 EXPECT_THAT(metrics_reporter_->resource_updates_valid(),
2651 ::testing::ElementsAre(::testing::Pair(
2652 ::testing::Pair(kDefaultXdsServerUrl,
2653 XdsFooResourceType::Get()->type_url()),
2654 1)));
2655 EXPECT_THAT(metrics_reporter_->resource_updates_invalid(),
2656 ::testing::ElementsAre());
2657 EXPECT_THAT(
2658 GetResourceCounts(),
2659 ::testing::ElementsAre(::testing::Pair(
2660 ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
2661 XdsFooResourceType::Get()->type_url(), "acked"),
2662 1)));
2663 // XdsClient sends an ACK.
2664 request = WaitForRequest(stream.get());
2665 ASSERT_TRUE(request.has_value());
2666 CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
2667 /*version_info=*/"1", /*response_nonce=*/"A",
2668 /*error_detail=*/absl::OkStatus(),
2669 /*resource_names=*/{"foo1"});
2670 // Cancel watcher.
2671 CancelFooWatch(watcher.get(), "foo1");
2672 EXPECT_TRUE(stream->Orphaned());
2673 }
2674
TEST_F(XdsClientTest,DoesNotExistTimerNotStartedUntilSendCompletes)2675 TEST_F(XdsClientTest, DoesNotExistTimerNotStartedUntilSendCompletes) {
2676 // Lower resources-does-not-exist timeout, to make sure that we're not
2677 // triggering that here.
2678 InitXdsClient(FakeXdsBootstrap::Builder(), Duration::Seconds(3));
2679 // Tell transport to let us manually trigger completion of the
2680 // send_message ops to XdsClient.
2681 transport_factory_->SetAutoCompleteMessagesFromClient(false);
2682 // Start a watch for "foo1".
2683 auto watcher = StartFooWatch("foo1");
2684 // Watcher should initially not see any resource reported.
2685 EXPECT_FALSE(watcher->HasEvent());
2686 // XdsClient should have created an ADS stream.
2687 auto stream = WaitForAdsStream();
2688 ASSERT_TRUE(stream != nullptr);
2689 // XdsClient should have sent a subscription request on the ADS stream.
2690 auto request = WaitForRequest(stream.get());
2691 ASSERT_TRUE(request.has_value());
2692 CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
2693 /*version_info=*/"", /*response_nonce=*/"",
2694 /*error_detail=*/absl::OkStatus(),
2695 /*resource_names=*/{"foo1"});
2696 CheckRequestNode(*request); // Should be present on the first request.
2697 // Server does NOT send a response.
2698 // We should not see a resource-does-not-exist event, because the
2699 // timer should not be running while the channel is disconnected.
2700 EXPECT_TRUE(watcher->ExpectNoEvent(absl::Seconds(4)));
2701 // Check metric data.
2702 EXPECT_THAT(GetResourceCounts(),
2703 ::testing::ElementsAre(::testing::Pair(
2704 ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
2705 XdsFooResourceType::Get()->type_url(),
2706 "requested"),
2707 1)));
2708 // The ADS stream uses wait_for_ready inside the XdsTransport interface,
2709 // so when the channel connects, the already-started stream will proceed.
2710 stream->CompleteSendMessageFromClient();
2711 // Server does NOT send a response.
2712 // Watcher should see a does-not-exist event.
2713 EXPECT_TRUE(watcher->WaitForDoesNotExist(absl::Seconds(4)));
2714 // Check metric data.
2715 EXPECT_THAT(GetResourceCounts(),
2716 ::testing::ElementsAre(::testing::Pair(
2717 ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
2718 XdsFooResourceType::Get()->type_url(),
2719 "does_not_exist"),
2720 1)));
2721 // Now server sends a response.
2722 stream->SendMessageToClient(
2723 ResponseBuilder(XdsFooResourceType::Get()->type_url())
2724 .set_version_info("1")
2725 .set_nonce("A")
2726 .AddFooResource(XdsFooResource("foo1", 6))
2727 .Serialize());
2728 // XdsClient should have delivered the response to the watcher.
2729 auto resource = watcher->WaitForNextResource();
2730 ASSERT_NE(resource, nullptr);
2731 EXPECT_EQ(resource->name, "foo1");
2732 EXPECT_EQ(resource->value, 6);
2733 // Check metric data.
2734 EXPECT_THAT(
2735 GetResourceCounts(),
2736 ::testing::ElementsAre(::testing::Pair(
2737 ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
2738 XdsFooResourceType::Get()->type_url(), "acked"),
2739 1)));
2740 // XdsClient should have sent an ACK message to the xDS server.
2741 request = WaitForRequest(stream.get());
2742 ASSERT_TRUE(request.has_value());
2743 CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
2744 /*version_info=*/"1", /*response_nonce=*/"A",
2745 /*error_detail=*/absl::OkStatus(),
2746 /*resource_names=*/{"foo1"});
2747 stream->CompleteSendMessageFromClient();
2748 // Cancel watch.
2749 CancelFooWatch(watcher.get(), "foo1");
2750 EXPECT_TRUE(stream->Orphaned());
2751 }
2752
2753 // In https://github.com/grpc/grpc/issues/29583, we ran into a case
2754 // where we wound up starting a timer after we had already received the
2755 // resource, thus incorrectly reporting the resource as not existing.
2756 // This happened when unsubscribing and then resubscribing to the same
2757 // resource a send_message op was already in flight and then receiving an
2758 // update containing that resource.
TEST_F(XdsClientTest,ResourceDoesNotExistUnsubscribeAndResubscribeWhileSendMessagePending)2759 TEST_F(XdsClientTest,
2760 ResourceDoesNotExistUnsubscribeAndResubscribeWhileSendMessagePending) {
2761 InitXdsClient(FakeXdsBootstrap::Builder(), Duration::Seconds(1));
2762 // Tell transport to let us manually trigger completion of the
2763 // send_message ops to XdsClient.
2764 transport_factory_->SetAutoCompleteMessagesFromClient(false);
2765 // Start a watch for "foo1".
2766 auto watcher = StartFooWatch("foo1");
2767 // Watcher should initially not see any resource reported.
2768 EXPECT_FALSE(watcher->HasEvent());
2769 // XdsClient should have created an ADS stream.
2770 auto stream = WaitForAdsStream();
2771 ASSERT_TRUE(stream != nullptr);
2772 // XdsClient should have sent a subscription request on the ADS stream.
2773 auto request = WaitForRequest(stream.get());
2774 ASSERT_TRUE(request.has_value());
2775 CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
2776 /*version_info=*/"", /*response_nonce=*/"",
2777 /*error_detail=*/absl::OkStatus(),
2778 /*resource_names=*/{"foo1"});
2779 CheckRequestNode(*request); // Should be present on the first request.
2780 stream->CompleteSendMessageFromClient();
2781 // Server sends a response.
2782 stream->SendMessageToClient(
2783 ResponseBuilder(XdsFooResourceType::Get()->type_url())
2784 .set_version_info("1")
2785 .set_nonce("A")
2786 .AddFooResource(XdsFooResource("foo1", 6))
2787 .Serialize());
2788 // XdsClient should have delivered the response to the watchers.
2789 auto resource = watcher->WaitForNextResource();
2790 ASSERT_NE(resource, nullptr);
2791 EXPECT_EQ(resource->name, "foo1");
2792 EXPECT_EQ(resource->value, 6);
2793 // Check metric data.
2794 EXPECT_THAT(metrics_reporter_->resource_updates_valid(),
2795 ::testing::ElementsAre(::testing::Pair(
2796 ::testing::Pair(kDefaultXdsServerUrl,
2797 XdsFooResourceType::Get()->type_url()),
2798 1)));
2799 EXPECT_THAT(metrics_reporter_->resource_updates_invalid(),
2800 ::testing::ElementsAre());
2801 EXPECT_THAT(
2802 GetResourceCounts(),
2803 ::testing::ElementsAre(::testing::Pair(
2804 ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
2805 XdsFooResourceType::Get()->type_url(), "acked"),
2806 1)));
2807 // XdsClient should have sent an ACK message to the xDS server.
2808 request = WaitForRequest(stream.get());
2809 ASSERT_TRUE(request.has_value());
2810 CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
2811 /*version_info=*/"1", /*response_nonce=*/"A",
2812 /*error_detail=*/absl::OkStatus(),
2813 /*resource_names=*/{"foo1"});
2814 stream->CompleteSendMessageFromClient();
2815 // Start a watch for a second resource.
2816 auto watcher2 = StartFooWatch("foo2");
2817 // Watcher should initially not see any resource reported.
2818 EXPECT_FALSE(watcher2->HasEvent());
2819 // Check metric data.
2820 EXPECT_THAT(
2821 GetResourceCounts(),
2822 ::testing::ElementsAre(
2823 ::testing::Pair(ResourceCountLabelsEq(
2824 XdsClient::kOldStyleAuthority,
2825 XdsFooResourceType::Get()->type_url(), "acked"),
2826 1),
2827 ::testing::Pair(
2828 ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
2829 XdsFooResourceType::Get()->type_url(),
2830 "requested"),
2831 1)));
2832 // XdsClient sends a request to subscribe to the new resource.
2833 request = WaitForRequest(stream.get());
2834 ASSERT_TRUE(request.has_value());
2835 CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
2836 /*version_info=*/"1", /*response_nonce=*/"A",
2837 /*error_detail=*/absl::OkStatus(),
2838 /*resource_names=*/{"foo1", "foo2"});
2839 // NOTE: We do NOT yet tell the XdsClient that the send_message op is
2840 // complete.
2841 // Unsubscribe from foo1 and then re-subscribe to it.
2842 CancelFooWatch(watcher.get(), "foo1");
2843 EXPECT_THAT(GetResourceCounts(),
2844 ::testing::ElementsAre(::testing::Pair(
2845 ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
2846 XdsFooResourceType::Get()->type_url(),
2847 "requested"),
2848 1)));
2849 watcher = StartFooWatch("foo1");
2850 EXPECT_THAT(GetResourceCounts(),
2851 ::testing::ElementsAre(::testing::Pair(
2852 ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
2853 XdsFooResourceType::Get()->type_url(),
2854 "requested"),
2855 2)));
2856 // Now send a response from the server containing both foo1 and foo2.
2857 stream->SendMessageToClient(
2858 ResponseBuilder(XdsFooResourceType::Get()->type_url())
2859 .set_version_info("1")
2860 .set_nonce("B")
2861 .AddFooResource(XdsFooResource("foo1", 6))
2862 .AddFooResource(XdsFooResource("foo2", 7))
2863 .Serialize());
2864 // The watcher for foo1 will receive an update even if the resource
2865 // has not changed, since the previous value was removed from the
2866 // cache when we unsubscribed.
2867 resource = watcher->WaitForNextResource();
2868 ASSERT_NE(resource, nullptr);
2869 EXPECT_EQ(resource->name, "foo1");
2870 EXPECT_EQ(resource->value, 6);
2871 // For foo2, the watcher should receive notification for the new resource.
2872 resource = watcher2->WaitForNextResource();
2873 ASSERT_NE(resource, nullptr);
2874 EXPECT_EQ(resource->name, "foo2");
2875 EXPECT_EQ(resource->value, 7);
2876 // Check metric data.
2877 EXPECT_THAT(metrics_reporter_->resource_updates_valid(),
2878 ::testing::ElementsAre(::testing::Pair(
2879 ::testing::Pair(kDefaultXdsServerUrl,
2880 XdsFooResourceType::Get()->type_url()),
2881 3)));
2882 EXPECT_THAT(metrics_reporter_->resource_updates_invalid(),
2883 ::testing::ElementsAre());
2884 EXPECT_THAT(
2885 GetResourceCounts(),
2886 ::testing::ElementsAre(::testing::Pair(
2887 ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
2888 XdsFooResourceType::Get()->type_url(), "acked"),
2889 2)));
2890 // Now we finally tell XdsClient that its previous send_message op is
2891 // complete.
2892 stream->CompleteSendMessageFromClient();
2893 // XdsClient should send an ACK with the updated subscription list
2894 // (which happens to be identical to the old list), and it should not
2895 // restart the does-not-exist timer.
2896 request = WaitForRequest(stream.get());
2897 ASSERT_TRUE(request.has_value());
2898 CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
2899 /*version_info=*/"1", /*response_nonce=*/"B",
2900 /*error_detail=*/absl::OkStatus(),
2901 /*resource_names=*/{"foo1", "foo2"});
2902 stream->CompleteSendMessageFromClient();
2903 // Make sure the watcher for foo1 does not see a does-not-exist event.
2904 EXPECT_TRUE(watcher->ExpectNoEvent(absl::Seconds(5)));
2905 // Cancel watches.
2906 CancelFooWatch(watcher.get(), "foo1", /*delay_unsubscription=*/true);
2907 CancelFooWatch(watcher2.get(), "foo2");
2908 EXPECT_TRUE(stream->Orphaned());
2909 }
2910
TEST_F(XdsClientTest,DoNotSendDoesNotExistForCachedResource)2911 TEST_F(XdsClientTest, DoNotSendDoesNotExistForCachedResource) {
2912 // Lower resources-does-not-exist timeout, to make sure that we're not
2913 // triggering that here.
2914 InitXdsClient(FakeXdsBootstrap::Builder(), Duration::Seconds(3));
2915 // Start a watch for "foo1".
2916 auto watcher = StartFooWatch("foo1");
2917 // Watcher should initially not see any resource reported.
2918 EXPECT_FALSE(watcher->HasEvent());
2919 // XdsClient should have created an ADS stream.
2920 auto stream = WaitForAdsStream();
2921 ASSERT_TRUE(stream != nullptr);
2922 // XdsClient should have sent a subscription request on the ADS stream.
2923 auto request = WaitForRequest(stream.get());
2924 ASSERT_TRUE(request.has_value());
2925 CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
2926 /*version_info=*/"", /*response_nonce=*/"",
2927 /*error_detail=*/absl::OkStatus(),
2928 /*resource_names=*/{"foo1"});
2929 CheckRequestNode(*request); // Should be present on the first request.
2930 // Server sends a response.
2931 stream->SendMessageToClient(
2932 ResponseBuilder(XdsFooResourceType::Get()->type_url())
2933 .set_version_info("1")
2934 .set_nonce("A")
2935 .AddFooResource(XdsFooResource("foo1", 6))
2936 .Serialize());
2937 // XdsClient should have delivered the response to the watcher.
2938 auto resource = watcher->WaitForNextResource();
2939 ASSERT_NE(resource, nullptr);
2940 EXPECT_EQ(resource->name, "foo1");
2941 EXPECT_EQ(resource->value, 6);
2942 // Check metric data.
2943 EXPECT_THAT(metrics_reporter_->resource_updates_valid(),
2944 ::testing::ElementsAre(::testing::Pair(
2945 ::testing::Pair(kDefaultXdsServerUrl,
2946 XdsFooResourceType::Get()->type_url()),
2947 1)));
2948 EXPECT_THAT(metrics_reporter_->resource_updates_invalid(),
2949 ::testing::ElementsAre());
2950 EXPECT_THAT(
2951 GetResourceCounts(),
2952 ::testing::ElementsAre(::testing::Pair(
2953 ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
2954 XdsFooResourceType::Get()->type_url(), "acked"),
2955 1)));
2956 // XdsClient should have sent an ACK message to the xDS server.
2957 request = WaitForRequest(stream.get());
2958 ASSERT_TRUE(request.has_value());
2959 CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
2960 /*version_info=*/"1", /*response_nonce=*/"A",
2961 /*error_detail=*/absl::OkStatus(),
2962 /*resource_names=*/{"foo1"});
2963 // Stream fails because of transport disconnection.
2964 stream->MaybeSendStatusToClient(absl::UnavailableError("connection failed"));
2965 // XdsClient should NOT report error to watcher, because we saw a
2966 // response on the stream before it failed.
2967 // XdsClient creates a new stream.
2968 stream = WaitForAdsStream();
2969 ASSERT_TRUE(stream != nullptr);
2970 // XdsClient should have sent a subscription request on the ADS stream.
2971 request = WaitForRequest(stream.get());
2972 ASSERT_TRUE(request.has_value());
2973 CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
2974 /*version_info=*/"1", /*response_nonce=*/"",
2975 /*error_detail=*/absl::OkStatus(),
2976 /*resource_names=*/{"foo1"});
2977 CheckRequestNode(*request); // Should be present on the first request.
2978 // Server does NOT send a response.
2979 // We should not see a resource-does-not-exist event, because the
2980 // resource was already cached, so the server can optimize by not
2981 // resending it.
2982 EXPECT_TRUE(watcher->ExpectNoEvent(absl::Seconds(4)));
2983 // Check metric data.
2984 EXPECT_THAT(metrics_reporter_->resource_updates_valid(),
2985 ::testing::ElementsAre(::testing::Pair(
2986 ::testing::Pair(kDefaultXdsServerUrl,
2987 XdsFooResourceType::Get()->type_url()),
2988 1)));
2989 EXPECT_THAT(metrics_reporter_->resource_updates_invalid(),
2990 ::testing::ElementsAre());
2991 EXPECT_THAT(
2992 GetResourceCounts(),
2993 ::testing::ElementsAre(::testing::Pair(
2994 ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
2995 XdsFooResourceType::Get()->type_url(), "acked"),
2996 1)));
2997 // Now server sends a response.
2998 stream->SendMessageToClient(
2999 ResponseBuilder(XdsFooResourceType::Get()->type_url())
3000 .set_version_info("1")
3001 .set_nonce("A")
3002 .AddFooResource(XdsFooResource("foo1", 6))
3003 .Serialize());
3004 // Watcher will not see any update, since the resource is unchanged.
3005 EXPECT_TRUE(watcher->ExpectNoEvent(absl::Seconds(1)));
3006 // Check metric data.
3007 EXPECT_THAT(metrics_reporter_->resource_updates_valid(),
3008 ::testing::ElementsAre(::testing::Pair(
3009 ::testing::Pair(kDefaultXdsServerUrl,
3010 XdsFooResourceType::Get()->type_url()),
3011 2)));
3012 EXPECT_THAT(metrics_reporter_->resource_updates_invalid(),
3013 ::testing::ElementsAre());
3014 EXPECT_THAT(
3015 GetResourceCounts(),
3016 ::testing::ElementsAre(::testing::Pair(
3017 ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
3018 XdsFooResourceType::Get()->type_url(), "acked"),
3019 1)));
3020 EXPECT_THAT(GetServerConnections(), ::testing::ElementsAre(::testing::Pair(
3021 kDefaultXdsServerUrl, true)));
3022 // XdsClient should have sent an ACK message to the xDS server.
3023 request = WaitForRequest(stream.get());
3024 ASSERT_TRUE(request.has_value());
3025 CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
3026 /*version_info=*/"1", /*response_nonce=*/"A",
3027 /*error_detail=*/absl::OkStatus(),
3028 /*resource_names=*/{"foo1"});
3029 // Cancel watch.
3030 CancelFooWatch(watcher.get(), "foo1");
3031 EXPECT_TRUE(stream->Orphaned());
3032 }
3033
TEST_F(XdsClientTest,ResourceWrappedInResourceMessage)3034 TEST_F(XdsClientTest, ResourceWrappedInResourceMessage) {
3035 InitXdsClient();
3036 // Start a watch for "foo1".
3037 auto watcher = StartFooWatch("foo1");
3038 // Watcher should initially not see any resource reported.
3039 EXPECT_FALSE(watcher->HasEvent());
3040 // XdsClient should have created an ADS stream.
3041 auto stream = WaitForAdsStream();
3042 ASSERT_TRUE(stream != nullptr);
3043 // XdsClient should have sent a subscription request on the ADS stream.
3044 auto request = WaitForRequest(stream.get());
3045 ASSERT_TRUE(request.has_value());
3046 CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
3047 /*version_info=*/"", /*response_nonce=*/"",
3048 /*error_detail=*/absl::OkStatus(),
3049 /*resource_names=*/{"foo1"});
3050 CheckRequestNode(*request); // Should be present on the first request.
3051 // Send a response with the resource wrapped in a Resource message.
3052 stream->SendMessageToClient(
3053 ResponseBuilder(XdsFooResourceType::Get()->type_url())
3054 .set_version_info("1")
3055 .set_nonce("A")
3056 .AddFooResource(XdsFooResource("foo1", 6),
3057 /*in_resource_wrapper=*/true)
3058 .Serialize());
3059 // XdsClient should have delivered the response to the watcher.
3060 auto resource = watcher->WaitForNextResource();
3061 ASSERT_NE(resource, nullptr);
3062 EXPECT_EQ(resource->name, "foo1");
3063 EXPECT_EQ(resource->value, 6);
3064 // Check metric data.
3065 EXPECT_THAT(metrics_reporter_->resource_updates_valid(),
3066 ::testing::ElementsAre(::testing::Pair(
3067 ::testing::Pair(kDefaultXdsServerUrl,
3068 XdsFooResourceType::Get()->type_url()),
3069 1)));
3070 EXPECT_THAT(metrics_reporter_->resource_updates_invalid(),
3071 ::testing::ElementsAre());
3072 EXPECT_THAT(
3073 GetResourceCounts(),
3074 ::testing::ElementsAre(::testing::Pair(
3075 ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
3076 XdsFooResourceType::Get()->type_url(), "acked"),
3077 1)));
3078 // XdsClient should have sent an ACK message to the xDS server.
3079 request = WaitForRequest(stream.get());
3080 ASSERT_TRUE(request.has_value());
3081 CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
3082 /*version_info=*/"1", /*response_nonce=*/"A",
3083 /*error_detail=*/absl::OkStatus(),
3084 /*resource_names=*/{"foo1"});
3085 // Cancel watch.
3086 CancelFooWatch(watcher.get(), "foo1");
3087 EXPECT_TRUE(stream->Orphaned());
3088 }
3089
TEST_F(XdsClientTest,MultipleResourceTypes)3090 TEST_F(XdsClientTest, MultipleResourceTypes) {
3091 InitXdsClient();
3092 // Start a watch for "foo1".
3093 auto watcher = StartFooWatch("foo1");
3094 // Watcher should initially not see any resource reported.
3095 EXPECT_FALSE(watcher->HasEvent());
3096 // XdsClient should have created an ADS stream.
3097 auto stream = WaitForAdsStream();
3098 ASSERT_TRUE(stream != nullptr);
3099 // XdsClient should have sent a subscription request on the ADS stream.
3100 auto request = WaitForRequest(stream.get());
3101 ASSERT_TRUE(request.has_value());
3102 CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
3103 /*version_info=*/"", /*response_nonce=*/"",
3104 /*error_detail=*/absl::OkStatus(),
3105 /*resource_names=*/{"foo1"});
3106 CheckRequestNode(*request); // Should be present on the first request.
3107 // Send a response.
3108 stream->SendMessageToClient(
3109 ResponseBuilder(XdsFooResourceType::Get()->type_url())
3110 .set_version_info("1")
3111 .set_nonce("A")
3112 .AddFooResource(XdsFooResource("foo1", 6))
3113 .Serialize());
3114 // XdsClient should have delivered the response to the watcher.
3115 auto resource = watcher->WaitForNextResource();
3116 ASSERT_NE(resource, nullptr);
3117 EXPECT_EQ(resource->name, "foo1");
3118 EXPECT_EQ(resource->value, 6);
3119 // Check metric data.
3120 EXPECT_THAT(metrics_reporter_->resource_updates_valid(),
3121 ::testing::ElementsAre(::testing::Pair(
3122 ::testing::Pair(kDefaultXdsServerUrl,
3123 XdsFooResourceType::Get()->type_url()),
3124 1)));
3125 EXPECT_THAT(metrics_reporter_->resource_updates_invalid(),
3126 ::testing::ElementsAre());
3127 EXPECT_THAT(
3128 GetResourceCounts(),
3129 ::testing::ElementsAre(::testing::Pair(
3130 ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
3131 XdsFooResourceType::Get()->type_url(), "acked"),
3132 1)));
3133 // XdsClient should have sent an ACK message to the xDS server.
3134 request = WaitForRequest(stream.get());
3135 ASSERT_TRUE(request.has_value());
3136 CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
3137 /*version_info=*/"1", /*response_nonce=*/"A",
3138 /*error_detail=*/absl::OkStatus(),
3139 /*resource_names=*/{"foo1"});
3140 // Start a watch for "bar1".
3141 auto watcher2 = StartBarWatch("bar1");
3142 // XdsClient should have sent a subscription request on the ADS stream.
3143 // Note that version and nonce here do NOT use the values for Foo,
3144 // since each resource type has its own state.
3145 request = WaitForRequest(stream.get());
3146 ASSERT_TRUE(request.has_value());
3147 CheckRequest(*request, XdsBarResourceType::Get()->type_url(),
3148 /*version_info=*/"", /*response_nonce=*/"",
3149 /*error_detail=*/absl::OkStatus(),
3150 /*resource_names=*/{"bar1"});
3151 // Send a response.
3152 stream->SendMessageToClient(
3153 ResponseBuilder(XdsBarResourceType::Get()->type_url())
3154 .set_version_info("2")
3155 .set_nonce("B")
3156 .AddBarResource(XdsBarResource("bar1", "whee"))
3157 .Serialize());
3158 // XdsClient should have delivered the response to the watcher.
3159 auto resource2 = watcher2->WaitForNextResource();
3160 ASSERT_NE(resource, nullptr);
3161 EXPECT_EQ(resource2->name, "bar1");
3162 EXPECT_EQ(resource2->value, "whee");
3163 // Check metric data.
3164 EXPECT_THAT(metrics_reporter_->resource_updates_valid(),
3165 ::testing::ElementsAre(
3166 ::testing::Pair(
3167 ::testing::Pair(kDefaultXdsServerUrl,
3168 XdsBarResourceType::Get()->type_url()),
3169 1),
3170 ::testing::Pair(
3171 ::testing::Pair(kDefaultXdsServerUrl,
3172 XdsFooResourceType::Get()->type_url()),
3173 1)));
3174 EXPECT_THAT(metrics_reporter_->resource_updates_invalid(),
3175 ::testing::ElementsAre());
3176 EXPECT_THAT(
3177 GetResourceCounts(),
3178 ::testing::UnorderedElementsAre(
3179 ::testing::Pair(ResourceCountLabelsEq(
3180 XdsClient::kOldStyleAuthority,
3181 XdsBarResourceType::Get()->type_url(), "acked"),
3182 1),
3183 ::testing::Pair(ResourceCountLabelsEq(
3184 XdsClient::kOldStyleAuthority,
3185 XdsFooResourceType::Get()->type_url(), "acked"),
3186 1)));
3187 // XdsClient should have sent an ACK message to the xDS server.
3188 request = WaitForRequest(stream.get());
3189 ASSERT_TRUE(request.has_value());
3190 CheckRequest(*request, XdsBarResourceType::Get()->type_url(),
3191 /*version_info=*/"2", /*response_nonce=*/"B",
3192 /*error_detail=*/absl::OkStatus(),
3193 /*resource_names=*/{"bar1"});
3194 // Cancel watch for "foo1".
3195 CancelFooWatch(watcher.get(), "foo1");
3196 // XdsClient should send an unsubscription request.
3197 request = WaitForRequest(stream.get());
3198 ASSERT_TRUE(request.has_value());
3199 CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
3200 /*version_info=*/"1", /*response_nonce=*/"A",
3201 /*error_detail=*/absl::OkStatus(), /*resource_names=*/{});
3202 // Now cancel watch for "bar1".
3203 CancelBarWatch(watcher2.get(), "bar1");
3204 EXPECT_TRUE(stream->Orphaned());
3205 }
3206
TEST_F(XdsClientTest,Federation)3207 TEST_F(XdsClientTest, Federation) {
3208 constexpr char kAuthority[] = "xds.example.com";
3209 const std::string kXdstpResourceName = absl::StrCat(
3210 "xdstp://", kAuthority, "/", XdsFooResource::TypeUrl(), "/foo2");
3211 FakeXdsBootstrap::FakeXdsServer authority_server("other_xds_server");
3212 FakeXdsBootstrap::FakeAuthority authority;
3213 authority.set_server(authority_server);
3214 InitXdsClient(
3215 FakeXdsBootstrap::Builder().AddAuthority(kAuthority, authority));
3216 // Metrics should initially be empty.
3217 EXPECT_THAT(metrics_reporter_->resource_updates_valid(),
3218 ::testing::ElementsAre());
3219 EXPECT_THAT(metrics_reporter_->resource_updates_invalid(),
3220 ::testing::ElementsAre());
3221 EXPECT_THAT(GetResourceCounts(), ::testing::ElementsAre());
3222 EXPECT_THAT(GetServerConnections(), ::testing::ElementsAre());
3223 // Start a watch for "foo1".
3224 auto watcher = StartFooWatch("foo1");
3225 // Watcher should initially not see any resource reported.
3226 EXPECT_FALSE(watcher->HasEvent());
3227 // XdsClient should have created an ADS stream to the top-level xDS server.
3228 auto stream = WaitForAdsStream(*xds_client_->bootstrap().servers().front());
3229 ASSERT_TRUE(stream != nullptr);
3230 // XdsClient should have sent a subscription request on the ADS stream.
3231 auto request = WaitForRequest(stream.get());
3232 ASSERT_TRUE(request.has_value());
3233 CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
3234 /*version_info=*/"", /*response_nonce=*/"",
3235 /*error_detail=*/absl::OkStatus(),
3236 /*resource_names=*/{"foo1"});
3237 CheckRequestNode(*request); // Should be present on the first request.
3238 // Send a response.
3239 stream->SendMessageToClient(
3240 ResponseBuilder(XdsFooResourceType::Get()->type_url())
3241 .set_version_info("1")
3242 .set_nonce("A")
3243 .AddFooResource(XdsFooResource("foo1", 6))
3244 .Serialize());
3245 // XdsClient should have delivered the response to the watcher.
3246 auto resource = watcher->WaitForNextResource();
3247 ASSERT_NE(resource, nullptr);
3248 EXPECT_EQ(resource->name, "foo1");
3249 EXPECT_EQ(resource->value, 6);
3250 // Check metric data.
3251 EXPECT_THAT(metrics_reporter_->resource_updates_valid(),
3252 ::testing::ElementsAre(::testing::Pair(
3253 ::testing::Pair(kDefaultXdsServerUrl,
3254 XdsFooResourceType::Get()->type_url()),
3255 1)));
3256 EXPECT_THAT(metrics_reporter_->resource_updates_invalid(),
3257 ::testing::ElementsAre());
3258 EXPECT_THAT(
3259 GetResourceCounts(),
3260 ::testing::ElementsAre(::testing::Pair(
3261 ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
3262 XdsFooResourceType::Get()->type_url(), "acked"),
3263 1)));
3264 EXPECT_THAT(GetServerConnections(), ::testing::ElementsAre(::testing::Pair(
3265 kDefaultXdsServerUrl, true)));
3266 // XdsClient should have sent an ACK message to the xDS server.
3267 request = WaitForRequest(stream.get());
3268 ASSERT_TRUE(request.has_value());
3269 CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
3270 /*version_info=*/"1", /*response_nonce=*/"A",
3271 /*error_detail=*/absl::OkStatus(),
3272 /*resource_names=*/{"foo1"});
3273 // Start a watch for the xdstp resource name.
3274 auto watcher2 = StartFooWatch(kXdstpResourceName);
3275 // Watcher should initially not see any resource reported.
3276 EXPECT_FALSE(watcher2->HasEvent());
3277 // Check metric data.
3278 EXPECT_THAT(
3279 GetResourceCounts(),
3280 ::testing::ElementsAre(
3281 ::testing::Pair(ResourceCountLabelsEq(
3282 XdsClient::kOldStyleAuthority,
3283 XdsFooResourceType::Get()->type_url(), "acked"),
3284 1),
3285 ::testing::Pair(ResourceCountLabelsEq(
3286 kAuthority, XdsFooResourceType::Get()->type_url(),
3287 "requested"),
3288 1)));
3289 EXPECT_THAT(GetServerConnections(),
3290 ::testing::ElementsAre(
3291 ::testing::Pair(kDefaultXdsServerUrl, true),
3292 ::testing::Pair(authority_server.server_uri(), true)));
3293 // XdsClient will create a new stream to the server for this authority.
3294 auto stream2 = WaitForAdsStream(authority_server);
3295 ASSERT_TRUE(stream2 != nullptr);
3296 // XdsClient should have sent a subscription request on the ADS stream.
3297 // Note that version and nonce here do NOT use the values for Foo,
3298 // since each authority has its own state.
3299 request = WaitForRequest(stream2.get());
3300 ASSERT_TRUE(request.has_value());
3301 CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
3302 /*version_info=*/"", /*response_nonce=*/"",
3303 /*error_detail=*/absl::OkStatus(),
3304 /*resource_names=*/{kXdstpResourceName});
3305 CheckRequestNode(*request); // Should be present on the first request.
3306 // Send a response.
3307 stream2->SendMessageToClient(
3308 ResponseBuilder(XdsFooResourceType::Get()->type_url())
3309 .set_version_info("2")
3310 .set_nonce("B")
3311 .AddFooResource(XdsFooResource(kXdstpResourceName, 3))
3312 .Serialize());
3313 // XdsClient should have delivered the response to the watcher.
3314 resource = watcher2->WaitForNextResource();
3315 ASSERT_NE(resource, nullptr);
3316 EXPECT_EQ(resource->name, kXdstpResourceName);
3317 EXPECT_EQ(resource->value, 3);
3318 // Check metric data.
3319 EXPECT_THAT(metrics_reporter_->resource_updates_valid(),
3320 ::testing::ElementsAre(
3321 ::testing::Pair(
3322 ::testing::Pair(kDefaultXdsServerUrl,
3323 XdsFooResourceType::Get()->type_url()),
3324 1),
3325 ::testing::Pair(
3326 ::testing::Pair(authority_server.server_uri(),
3327 XdsFooResourceType::Get()->type_url()),
3328 1)));
3329 EXPECT_THAT(metrics_reporter_->resource_updates_invalid(),
3330 ::testing::ElementsAre());
3331 EXPECT_THAT(
3332 GetResourceCounts(),
3333 ::testing::ElementsAre(
3334 ::testing::Pair(ResourceCountLabelsEq(
3335 XdsClient::kOldStyleAuthority,
3336 XdsFooResourceType::Get()->type_url(), "acked"),
3337 1),
3338 ::testing::Pair(
3339 ResourceCountLabelsEq(
3340 kAuthority, XdsFooResourceType::Get()->type_url(), "acked"),
3341 1)));
3342 EXPECT_THAT(GetServerConnections(),
3343 ::testing::ElementsAre(
3344 ::testing::Pair(kDefaultXdsServerUrl, true),
3345 ::testing::Pair(authority_server.server_uri(), true)));
3346 // XdsClient should have sent an ACK message to the xDS server.
3347 request = WaitForRequest(stream2.get());
3348 ASSERT_TRUE(request.has_value());
3349 CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
3350 /*version_info=*/"2", /*response_nonce=*/"B",
3351 /*error_detail=*/absl::OkStatus(),
3352 /*resource_names=*/{kXdstpResourceName});
3353 // Cancel watch for "foo1".
3354 CancelFooWatch(watcher.get(), "foo1");
3355 EXPECT_TRUE(stream->Orphaned());
3356 // Now cancel watch for xdstp resource name.
3357 CancelFooWatch(watcher2.get(), kXdstpResourceName);
3358 EXPECT_TRUE(stream2->Orphaned());
3359 }
3360
TEST_F(XdsClientTest,FederationAuthorityDefaultsToTopLevelXdsServer)3361 TEST_F(XdsClientTest, FederationAuthorityDefaultsToTopLevelXdsServer) {
3362 constexpr char kAuthority[] = "xds.example.com";
3363 const std::string kXdstpResourceName = absl::StrCat(
3364 "xdstp://", kAuthority, "/", XdsFooResource::TypeUrl(), "/foo2");
3365 // Authority does not specify any xDS servers, so XdsClient will use
3366 // the top-level xDS server in the bootstrap config for this authority.
3367 InitXdsClient(FakeXdsBootstrap::Builder().AddAuthority(
3368 kAuthority, FakeXdsBootstrap::FakeAuthority()));
3369 // Start a watch for "foo1".
3370 auto watcher = StartFooWatch("foo1");
3371 // Watcher should initially not see any resource reported.
3372 EXPECT_FALSE(watcher->HasEvent());
3373 // XdsClient should have created an ADS stream to the top-level xDS server.
3374 auto stream = WaitForAdsStream(*xds_client_->bootstrap().servers().front());
3375 ASSERT_TRUE(stream != nullptr);
3376 // XdsClient should have sent a subscription request on the ADS stream.
3377 auto request = WaitForRequest(stream.get());
3378 ASSERT_TRUE(request.has_value());
3379 CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
3380 /*version_info=*/"", /*response_nonce=*/"",
3381 /*error_detail=*/absl::OkStatus(),
3382 /*resource_names=*/{"foo1"});
3383 CheckRequestNode(*request); // Should be present on the first request.
3384 // Send a response.
3385 stream->SendMessageToClient(
3386 ResponseBuilder(XdsFooResourceType::Get()->type_url())
3387 .set_version_info("1")
3388 .set_nonce("A")
3389 .AddFooResource(XdsFooResource("foo1", 6))
3390 .Serialize());
3391 // XdsClient should have delivered the response to the watcher.
3392 auto resource = watcher->WaitForNextResource();
3393 ASSERT_NE(resource, nullptr);
3394 EXPECT_EQ(resource->name, "foo1");
3395 EXPECT_EQ(resource->value, 6);
3396 // Check metric data.
3397 EXPECT_THAT(metrics_reporter_->resource_updates_valid(),
3398 ::testing::ElementsAre(::testing::Pair(
3399 ::testing::Pair(kDefaultXdsServerUrl,
3400 XdsFooResourceType::Get()->type_url()),
3401 1)));
3402 EXPECT_THAT(metrics_reporter_->resource_updates_invalid(),
3403 ::testing::ElementsAre());
3404 EXPECT_THAT(
3405 GetResourceCounts(),
3406 ::testing::ElementsAre(::testing::Pair(
3407 ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
3408 XdsFooResourceType::Get()->type_url(), "acked"),
3409 1)));
3410 EXPECT_THAT(GetServerConnections(), ::testing::ElementsAre(::testing::Pair(
3411 kDefaultXdsServerUrl, true)));
3412 // XdsClient should have sent an ACK message to the xDS server.
3413 request = WaitForRequest(stream.get());
3414 ASSERT_TRUE(request.has_value());
3415 CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
3416 /*version_info=*/"1", /*response_nonce=*/"A",
3417 /*error_detail=*/absl::OkStatus(),
3418 /*resource_names=*/{"foo1"});
3419 // Start a watch for the xdstp resource name.
3420 auto watcher2 = StartFooWatch(kXdstpResourceName);
3421 // Watcher should initially not see any resource reported.
3422 EXPECT_FALSE(watcher2->HasEvent());
3423 // XdsClient will send a subscription request on the ADS stream that
3424 // includes both resources, since both are being obtained from the
3425 // same server.
3426 request = WaitForRequest(stream.get());
3427 ASSERT_TRUE(request.has_value());
3428 CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
3429 /*version_info=*/"1", /*response_nonce=*/"A",
3430 /*error_detail=*/absl::OkStatus(),
3431 /*resource_names=*/{"foo1", kXdstpResourceName});
3432 // Send a response.
3433 stream->SendMessageToClient(
3434 ResponseBuilder(XdsFooResourceType::Get()->type_url())
3435 .set_version_info("2")
3436 .set_nonce("B")
3437 .AddFooResource(XdsFooResource(kXdstpResourceName, 3))
3438 .Serialize());
3439 // XdsClient should have delivered the response to the watcher.
3440 resource = watcher2->WaitForNextResource();
3441 ASSERT_NE(resource, nullptr);
3442 EXPECT_EQ(resource->name, kXdstpResourceName);
3443 EXPECT_EQ(resource->value, 3);
3444 // Check metric data.
3445 EXPECT_THAT(metrics_reporter_->resource_updates_valid(),
3446 ::testing::ElementsAre(::testing::Pair(
3447 ::testing::Pair(kDefaultXdsServerUrl,
3448 XdsFooResourceType::Get()->type_url()),
3449 2)));
3450 EXPECT_THAT(metrics_reporter_->resource_updates_invalid(),
3451 ::testing::ElementsAre());
3452 EXPECT_THAT(
3453 GetResourceCounts(),
3454 ::testing::ElementsAre(
3455 ::testing::Pair(ResourceCountLabelsEq(
3456 XdsClient::kOldStyleAuthority,
3457 XdsFooResourceType::Get()->type_url(), "acked"),
3458 1),
3459 ::testing::Pair(
3460 ResourceCountLabelsEq(
3461 kAuthority, XdsFooResourceType::Get()->type_url(), "acked"),
3462 1)));
3463 EXPECT_THAT(GetServerConnections(), ::testing::ElementsAre(::testing::Pair(
3464 kDefaultXdsServerUrl, true)));
3465 // XdsClient should have sent an ACK message to the xDS server.
3466 request = WaitForRequest(stream.get());
3467 ASSERT_TRUE(request.has_value());
3468 CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
3469 /*version_info=*/"2", /*response_nonce=*/"B",
3470 /*error_detail=*/absl::OkStatus(),
3471 /*resource_names=*/{"foo1", kXdstpResourceName});
3472 // Cancel watch for "foo1".
3473 CancelFooWatch(watcher.get(), "foo1");
3474 // XdsClient should send an unsubscription request.
3475 request = WaitForRequest(stream.get());
3476 ASSERT_TRUE(request.has_value());
3477 CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
3478 /*version_info=*/"2", /*response_nonce=*/"B",
3479 /*error_detail=*/absl::OkStatus(),
3480 /*resource_names=*/{kXdstpResourceName});
3481 // Now cancel watch for xdstp resource name.
3482 CancelFooWatch(watcher2.get(), kXdstpResourceName);
3483 EXPECT_TRUE(stream->Orphaned());
3484 }
3485
TEST_F(XdsClientTest,FederationWithUnknownAuthority)3486 TEST_F(XdsClientTest, FederationWithUnknownAuthority) {
3487 constexpr char kAuthority[] = "xds.example.com";
3488 const std::string kXdstpResourceName = absl::StrCat(
3489 "xdstp://", kAuthority, "/", XdsFooResource::TypeUrl(), "/foo2");
3490 // Note: Not adding authority to bootstrap config.
3491 InitXdsClient();
3492 // Start a watch for the xdstp resource name.
3493 auto watcher = StartFooWatch(kXdstpResourceName);
3494 // Watcher should immediately get an error about the unknown authority.
3495 auto error = watcher->WaitForNextError();
3496 ASSERT_TRUE(error.has_value());
3497 EXPECT_EQ(error->code(), absl::StatusCode::kUnavailable);
3498 EXPECT_EQ(error->message(),
3499 "authority \"xds.example.com\" not present in bootstrap config")
3500 << *error;
3501 }
3502
TEST_F(XdsClientTest,FederationWithUnparseableXdstpResourceName)3503 TEST_F(XdsClientTest, FederationWithUnparseableXdstpResourceName) {
3504 // Note: Not adding authority to bootstrap config.
3505 InitXdsClient();
3506 // Start a watch for the xdstp resource name.
3507 auto watcher = StartFooWatch("xdstp://x");
3508 // Watcher should immediately get an error about the unknown authority.
3509 auto error = watcher->WaitForNextError();
3510 ASSERT_TRUE(error.has_value());
3511 EXPECT_EQ(error->code(), absl::StatusCode::kUnavailable);
3512 EXPECT_EQ(error->message(), "Unable to parse resource name xdstp://x")
3513 << *error;
3514 }
3515
3516 // TODO(roth,apolcyn): remove this test when the
3517 // GRPC_EXPERIMENTAL_XDS_FEDERATION env var is removed.
TEST_F(XdsClientTest,FederationDisabledWithNewStyleName)3518 TEST_F(XdsClientTest, FederationDisabledWithNewStyleName) {
3519 testing::ScopedEnvVar env_var("GRPC_EXPERIMENTAL_XDS_FEDERATION", "false");
3520 // We will use this xdstp name, whose authority is not present in
3521 // the bootstrap config. But since federation is not enabled, we
3522 // will treat this as an opaque old-style name, so we'll send it to
3523 // the default server.
3524 constexpr char kXdstpResourceName[] =
3525 "xdstp://xds.example.com/test.v3.foo/foo1";
3526 InitXdsClient();
3527 // Start a watch for the xdstp name.
3528 auto watcher = StartFooWatch(kXdstpResourceName);
3529 // Watcher should initially not see any resource reported.
3530 EXPECT_FALSE(watcher->HasEvent());
3531 // XdsClient should have created an ADS stream.
3532 auto stream = WaitForAdsStream();
3533 ASSERT_TRUE(stream != nullptr);
3534 // XdsClient should have sent a subscription request on the ADS stream.
3535 auto request = WaitForRequest(stream.get());
3536 ASSERT_TRUE(request.has_value());
3537 CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
3538 /*version_info=*/"", /*response_nonce=*/"",
3539 /*error_detail=*/absl::OkStatus(),
3540 /*resource_names=*/{kXdstpResourceName});
3541 CheckRequestNode(*request); // Should be present on the first request.
3542 // Send a response.
3543 stream->SendMessageToClient(
3544 ResponseBuilder(XdsFooResourceType::Get()->type_url())
3545 .set_version_info("1")
3546 .set_nonce("A")
3547 .AddFooResource(XdsFooResource(kXdstpResourceName, 6))
3548 .Serialize());
3549 // XdsClient should have delivered the response to the watcher.
3550 auto resource = watcher->WaitForNextResource();
3551 ASSERT_NE(resource, nullptr);
3552 EXPECT_EQ(resource->name, kXdstpResourceName);
3553 EXPECT_EQ(resource->value, 6);
3554 // XdsClient should have sent an ACK message to the xDS server.
3555 request = WaitForRequest(stream.get());
3556 ASSERT_TRUE(request.has_value());
3557 CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
3558 /*version_info=*/"1", /*response_nonce=*/"A",
3559 /*error_detail=*/absl::OkStatus(),
3560 /*resource_names=*/{kXdstpResourceName});
3561 // Cancel watch.
3562 CancelFooWatch(watcher.get(), kXdstpResourceName);
3563 EXPECT_TRUE(stream->Orphaned());
3564 }
3565
TEST_F(XdsClientTest,FederationChannelFailureReportedToWatchers)3566 TEST_F(XdsClientTest, FederationChannelFailureReportedToWatchers) {
3567 constexpr char kAuthority[] = "xds.example.com";
3568 const std::string kXdstpResourceName = absl::StrCat(
3569 "xdstp://", kAuthority, "/", XdsFooResource::TypeUrl(), "/foo2");
3570 FakeXdsBootstrap::FakeXdsServer authority_server("other_xds_server");
3571 FakeXdsBootstrap::FakeAuthority authority;
3572 authority.set_server(authority_server);
3573 InitXdsClient(
3574 FakeXdsBootstrap::Builder().AddAuthority(kAuthority, authority));
3575 // Start a watch for "foo1".
3576 auto watcher = StartFooWatch("foo1");
3577 // Watcher should initially not see any resource reported.
3578 EXPECT_FALSE(watcher->HasEvent());
3579 // XdsClient should have created an ADS stream to the top-level xDS server.
3580 auto stream = WaitForAdsStream(*xds_client_->bootstrap().servers().front());
3581 ASSERT_TRUE(stream != nullptr);
3582 // XdsClient should have sent a subscription request on the ADS stream.
3583 auto request = WaitForRequest(stream.get());
3584 ASSERT_TRUE(request.has_value());
3585 CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
3586 /*version_info=*/"", /*response_nonce=*/"",
3587 /*error_detail=*/absl::OkStatus(),
3588 /*resource_names=*/{"foo1"});
3589 CheckRequestNode(*request); // Should be present on the first request.
3590 // Send a response.
3591 stream->SendMessageToClient(
3592 ResponseBuilder(XdsFooResourceType::Get()->type_url())
3593 .set_version_info("1")
3594 .set_nonce("A")
3595 .AddFooResource(XdsFooResource("foo1", 6))
3596 .Serialize());
3597 // XdsClient should have delivered the response to the watcher.
3598 auto resource = watcher->WaitForNextResource();
3599 ASSERT_NE(resource, nullptr);
3600 EXPECT_EQ(resource->name, "foo1");
3601 EXPECT_EQ(resource->value, 6);
3602 // Check metric data.
3603 EXPECT_THAT(metrics_reporter_->resource_updates_valid(),
3604 ::testing::ElementsAre(::testing::Pair(
3605 ::testing::Pair(kDefaultXdsServerUrl,
3606 XdsFooResourceType::Get()->type_url()),
3607 1)));
3608 EXPECT_THAT(metrics_reporter_->resource_updates_invalid(),
3609 ::testing::ElementsAre());
3610 EXPECT_THAT(
3611 GetResourceCounts(),
3612 ::testing::ElementsAre(::testing::Pair(
3613 ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
3614 XdsFooResourceType::Get()->type_url(), "acked"),
3615 1)));
3616 EXPECT_THAT(GetServerConnections(), ::testing::ElementsAre(::testing::Pair(
3617 kDefaultXdsServerUrl, true)));
3618 // XdsClient should have sent an ACK message to the xDS server.
3619 request = WaitForRequest(stream.get());
3620 ASSERT_TRUE(request.has_value());
3621 CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
3622 /*version_info=*/"1", /*response_nonce=*/"A",
3623 /*error_detail=*/absl::OkStatus(),
3624 /*resource_names=*/{"foo1"});
3625 // Start a watch for the xdstp resource name.
3626 auto watcher2 = StartFooWatch(kXdstpResourceName);
3627 // Check metric data.
3628 EXPECT_THAT(GetServerConnections(),
3629 ::testing::ElementsAre(
3630 ::testing::Pair(kDefaultXdsServerUrl, true),
3631 ::testing::Pair(authority_server.server_uri(), true)));
3632 // Watcher should initially not see any resource reported.
3633 EXPECT_FALSE(watcher2->HasEvent());
3634 // XdsClient will create a new stream to the server for this authority.
3635 auto stream2 = WaitForAdsStream(authority_server);
3636 ASSERT_TRUE(stream2 != nullptr);
3637 // XdsClient should have sent a subscription request on the ADS stream.
3638 // Note that version and nonce here do NOT use the values for Foo,
3639 // since each authority has its own state.
3640 request = WaitForRequest(stream2.get());
3641 ASSERT_TRUE(request.has_value());
3642 CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
3643 /*version_info=*/"", /*response_nonce=*/"",
3644 /*error_detail=*/absl::OkStatus(),
3645 /*resource_names=*/{kXdstpResourceName});
3646 CheckRequestNode(*request); // Should be present on the first request.
3647 // Send a response.
3648 stream2->SendMessageToClient(
3649 ResponseBuilder(XdsFooResourceType::Get()->type_url())
3650 .set_version_info("2")
3651 .set_nonce("B")
3652 .AddFooResource(XdsFooResource(kXdstpResourceName, 3))
3653 .Serialize());
3654 // XdsClient should have delivered the response to the watcher.
3655 resource = watcher2->WaitForNextResource();
3656 ASSERT_NE(resource, nullptr);
3657 EXPECT_EQ(resource->name, kXdstpResourceName);
3658 EXPECT_EQ(resource->value, 3);
3659 // Check metric data.
3660 EXPECT_THAT(metrics_reporter_->resource_updates_valid(),
3661 ::testing::ElementsAre(
3662 ::testing::Pair(
3663 ::testing::Pair(kDefaultXdsServerUrl,
3664 XdsFooResourceType::Get()->type_url()),
3665 1),
3666 ::testing::Pair(
3667 ::testing::Pair(authority_server.server_uri(),
3668 XdsFooResourceType::Get()->type_url()),
3669 1)));
3670 EXPECT_THAT(metrics_reporter_->resource_updates_invalid(),
3671 ::testing::ElementsAre());
3672 EXPECT_THAT(
3673 GetResourceCounts(),
3674 ::testing::ElementsAre(
3675 ::testing::Pair(ResourceCountLabelsEq(
3676 XdsClient::kOldStyleAuthority,
3677 XdsFooResourceType::Get()->type_url(), "acked"),
3678 1),
3679 ::testing::Pair(
3680 ResourceCountLabelsEq(
3681 kAuthority, XdsFooResourceType::Get()->type_url(), "acked"),
3682 1)));
3683 EXPECT_THAT(GetServerConnections(),
3684 ::testing::ElementsAre(
3685 ::testing::Pair(kDefaultXdsServerUrl, true),
3686 ::testing::Pair(authority_server.server_uri(), true)));
3687 // XdsClient should have sent an ACK message to the xDS server.
3688 request = WaitForRequest(stream2.get());
3689 ASSERT_TRUE(request.has_value());
3690 CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
3691 /*version_info=*/"2", /*response_nonce=*/"B",
3692 /*error_detail=*/absl::OkStatus(),
3693 /*resource_names=*/{kXdstpResourceName});
3694 // Now cause a channel failure on the stream to the authority's xDS server.
3695 TriggerConnectionFailure(authority_server,
3696 absl::UnavailableError("connection failed"));
3697 // The watcher for the xdstp resource name should see the error.
3698 auto error = watcher2->WaitForNextError();
3699 ASSERT_TRUE(error.has_value());
3700 EXPECT_EQ(error->code(), absl::StatusCode::kUnavailable);
3701 EXPECT_EQ(error->message(),
3702 "xDS channel for server other_xds_server: connection failed "
3703 "(node ID:xds_client_test)")
3704 << *error;
3705 // The watcher for "foo1" should not see any error.
3706 EXPECT_FALSE(watcher->HasEvent());
3707 // Check metric data.
3708 EXPECT_THAT(GetServerConnections(),
3709 ::testing::ElementsAre(
3710 ::testing::Pair(kDefaultXdsServerUrl, true),
3711 ::testing::Pair(authority_server.server_uri(), false)));
3712 // Cancel watch for "foo1".
3713 CancelFooWatch(watcher.get(), "foo1");
3714 EXPECT_TRUE(stream->Orphaned());
3715 // Now cancel watch for xdstp resource name.
3716 CancelFooWatch(watcher2.get(), kXdstpResourceName);
3717 EXPECT_TRUE(stream2->Orphaned());
3718 }
3719
TEST_F(XdsClientTest,AdsReadWaitsForHandleRelease)3720 TEST_F(XdsClientTest, AdsReadWaitsForHandleRelease) {
3721 const absl::Duration timeout = absl::Seconds(5) * grpc_test_slowdown_factor();
3722 InitXdsClient();
3723 // Start watches for "foo1" and "foo2".
3724 auto watcher1 = StartFooWatch("foo1");
3725 // XdsClient should have created an ADS stream.
3726 auto stream = WaitForAdsStream();
3727 ASSERT_TRUE(stream != nullptr);
3728 // XdsClient should have sent a subscription request on the ADS stream.
3729 auto request = WaitForRequest(stream.get());
3730 ASSERT_TRUE(request.has_value());
3731 CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
3732 /*version_info=*/"", /*response_nonce=*/"",
3733 /*error_detail=*/absl::OkStatus(),
3734 /*resource_names=*/{"foo1"});
3735 auto watcher2 = StartFooWatch("foo2");
3736 request = WaitForRequest(stream.get());
3737 ASSERT_TRUE(request.has_value());
3738 CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
3739 /*version_info=*/"", /*response_nonce=*/"",
3740 /*error_detail=*/absl::OkStatus(),
3741 /*resource_names=*/{"foo1", "foo2"});
3742 // Send a response with 2 resources.
3743 stream->SendMessageToClient(
3744 ResponseBuilder(XdsFooResourceType::Get()->type_url())
3745 .set_version_info("1")
3746 .set_nonce("A")
3747 .AddFooResource(XdsFooResource("foo1", 6))
3748 .AddFooResource(XdsFooResource("foo2", 10))
3749 .Serialize());
3750 // Send a response with a single resource, will not be read until the handle
3751 // is released
3752 stream->SendMessageToClient(
3753 ResponseBuilder(XdsFooResourceType::Get()->type_url())
3754 .set_version_info("2")
3755 .set_nonce("B")
3756 .AddFooResource(XdsFooResource("foo1", 8))
3757 .Serialize());
3758 // XdsClient should have delivered the response to the watcher.
3759 auto resource1 = watcher1->WaitForNextResourceAndHandle();
3760 ASSERT_NE(resource1, absl::nullopt);
3761 EXPECT_EQ(resource1->resource->name, "foo1");
3762 EXPECT_EQ(resource1->resource->value, 6);
3763 auto resource2 = watcher2->WaitForNextResourceAndHandle();
3764 ASSERT_NE(resource2, absl::nullopt);
3765 EXPECT_EQ(resource2->resource->name, "foo2");
3766 EXPECT_EQ(resource2->resource->value, 10);
3767 // XdsClient should have sent an ACK message to the xDS server.
3768 request = WaitForRequest(stream.get());
3769 ASSERT_TRUE(request.has_value());
3770 CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
3771 /*version_info=*/"1", /*response_nonce=*/"A",
3772 /*error_detail=*/absl::OkStatus(),
3773 /*resource_names=*/{"foo1", "foo2"});
3774 EXPECT_TRUE(stream->WaitForReadsStarted(1, timeout));
3775 resource1->read_delay_handle.reset();
3776 EXPECT_TRUE(stream->WaitForReadsStarted(1, timeout));
3777 resource2->read_delay_handle.reset();
3778 EXPECT_TRUE(stream->WaitForReadsStarted(2, timeout));
3779 resource1 = watcher1->WaitForNextResourceAndHandle();
3780 ASSERT_NE(resource1, absl::nullopt);
3781 EXPECT_EQ(resource1->resource->name, "foo1");
3782 EXPECT_EQ(resource1->resource->value, 8);
3783 EXPECT_EQ(watcher2->WaitForNextResourceAndHandle(), absl::nullopt);
3784 // XdsClient should have sent an ACK message to the xDS server.
3785 request = WaitForRequest(stream.get());
3786 ASSERT_TRUE(request.has_value());
3787 CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
3788 /*version_info=*/"2", /*response_nonce=*/"B",
3789 /*error_detail=*/absl::OkStatus(),
3790 /*resource_names=*/{"foo1", "foo2"});
3791 EXPECT_TRUE(stream->WaitForReadsStarted(2, timeout));
3792 resource1->read_delay_handle.reset();
3793 EXPECT_TRUE(stream->WaitForReadsStarted(3, timeout));
3794 // Cancel watch.
3795 CancelFooWatch(watcher1.get(), "foo1");
3796 request = WaitForRequest(stream.get());
3797 ASSERT_TRUE(request.has_value());
3798 CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
3799 /*version_info=*/"2", /*response_nonce=*/"B",
3800 /*error_detail=*/absl::OkStatus(),
3801 /*resource_names=*/{"foo2"});
3802 CancelFooWatch(watcher2.get(), "foo2");
3803 EXPECT_TRUE(stream->Orphaned());
3804 }
3805
TEST_F(XdsClientTest,FallbackAndRecover)3806 TEST_F(XdsClientTest, FallbackAndRecover) {
3807 FakeXdsBootstrap::FakeXdsServer primary_server(kDefaultXdsServerUrl);
3808 FakeXdsBootstrap::FakeXdsServer fallback_server("fallback_xds_server");
3809 // Regular operation
3810 InitXdsClient(FakeXdsBootstrap::Builder().SetServers(
3811 {primary_server, fallback_server}));
3812 // Start a watch for "foo1".
3813 auto watcher = StartFooWatch("foo1");
3814 EXPECT_THAT(GetServerConnections(), ::testing::ElementsAre(::testing::Pair(
3815 kDefaultXdsServerUrl, true)));
3816 EXPECT_THAT(GetResourceCounts(),
3817 ::testing::ElementsAre(::testing::Pair(
3818 ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
3819 XdsFooResourceType::Get()->type_url(),
3820 "requested"),
3821 1)));
3822 EXPECT_THAT(metrics_reporter_->resource_updates_valid(),
3823 ::testing::IsEmpty());
3824 // XdsClient should have created an ADS stream.
3825 auto stream = WaitForAdsStream();
3826 ASSERT_TRUE(stream != nullptr);
3827 // XdsClient should have sent a subscription request on the ADS stream.
3828 auto request = WaitForRequest(stream.get());
3829 ASSERT_TRUE(request.has_value());
3830 CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
3831 /*version_info=*/"", /*response_nonce=*/"",
3832 /*error_detail=*/absl::OkStatus(),
3833 /*resource_names=*/{"foo1"});
3834 // Input: Get initial response from primary server.
3835 stream->SendMessageToClient(
3836 ResponseBuilder(XdsFooResourceType::Get()->type_url())
3837 .set_version_info("20")
3838 .set_nonce("O")
3839 .AddFooResource(XdsFooResource("foo1", 6))
3840 .Serialize());
3841 // Result (local): Resource is delivered to watcher.
3842 auto resource = watcher->WaitForNextResource();
3843 ASSERT_NE(resource, nullptr);
3844 EXPECT_EQ(resource->name, "foo1");
3845 EXPECT_EQ(resource->value, 6);
3846 // Result (local): Metrics show 1 resource update and 1 cached resource.
3847 EXPECT_THAT(metrics_reporter_->resource_updates_valid(),
3848 ::testing::ElementsAre(::testing::Pair(
3849 ::testing::Pair(kDefaultXdsServerUrl,
3850 XdsFooResourceType::Get()->type_url()),
3851 1)));
3852 EXPECT_THAT(
3853 GetResourceCounts(),
3854 ::testing::ElementsAre(::testing::Pair(
3855 ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
3856 XdsFooResourceType::Get()->type_url(), "acked"),
3857 1)));
3858 // Result (remote): Client sends ACK to server.
3859 request = WaitForRequest(stream.get());
3860 ASSERT_TRUE(request.has_value());
3861 CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
3862 /*version_info=*/"20", /*response_nonce=*/"O",
3863 /*error_detail=*/absl::OkStatus(),
3864 /*resource_names=*/{"foo1"});
3865 // Input: Trigger connection failure to primary.
3866 TriggerConnectionFailure(primary_server,
3867 absl::UnavailableError("Server down"));
3868 // Result (local): The error is reported to the watcher.
3869 auto error = watcher->WaitForNextError();
3870 ASSERT_TRUE(error.has_value());
3871 EXPECT_EQ(error->code(), absl::StatusCode::kUnavailable);
3872 EXPECT_EQ(error->message(),
3873 "xDS channel for server default_xds_server: Server down (node "
3874 "ID:xds_client_test)");
3875 // Result (local): The metrics show the channel as being unhealthy.
3876 EXPECT_THAT(GetServerConnections(), ::testing::ElementsAre(::testing::Pair(
3877 kDefaultXdsServerUrl, false)));
3878 // Input: Trigger stream failure.
3879 stream->MaybeSendStatusToClient(absl::UnavailableError("Stream failure"));
3880 // Result (local): The metrics still show the channel as being unhealthy.
3881 EXPECT_THAT(GetServerConnections(), ::testing::ElementsAre(::testing::Pair(
3882 kDefaultXdsServerUrl, false)));
3883 // Result (remote): The client starts a new stream and sends a subscription
3884 // message. Note that the server does not respond, so the channel will still
3885 // have non-OK status.
3886 stream = WaitForAdsStream();
3887 ASSERT_NE(stream, nullptr);
3888 request = WaitForRequest(stream.get());
3889 ASSERT_TRUE(request.has_value());
3890 CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
3891 /*version_info=*/"20", /*response_nonce=*/"",
3892 /*error_detail=*/absl::OkStatus(),
3893 /*resource_names=*/{"foo1"});
3894 // Input: Start second watch for foo1 (already cached).
3895 auto watcher_cached = StartFooWatch("foo1");
3896 // Result (local): New watcher gets the cached resource.
3897 resource = watcher_cached->WaitForNextResource();
3898 ASSERT_NE(resource, nullptr);
3899 EXPECT_EQ(resource->name, "foo1");
3900 EXPECT_EQ(resource->value, 6);
3901 // Result (local): New watcher gets the error from the channel state.
3902 error = watcher_cached->WaitForNextError();
3903 ASSERT_TRUE(error.has_value());
3904 EXPECT_EQ(error->message(),
3905 "xDS channel for server default_xds_server: Server down (node "
3906 "ID:xds_client_test)")
3907 << error->message();
3908 CancelFooWatch(watcher_cached.get(), "foo1");
3909 // Input: Start watch for foo2 (not already cached).
3910 auto watcher2 = StartFooWatch("foo2");
3911 // Result (local): Metrics show a healthy channel to the fallback server.
3912 EXPECT_THAT(GetServerConnections(),
3913 ::testing::ElementsAre(
3914 ::testing::Pair(kDefaultXdsServerUrl, false),
3915 ::testing::Pair(fallback_server.server_uri(), true)));
3916 // Result (remote): Client sent a new request for both resources on the
3917 // stream to the primary.
3918 request = WaitForRequest(stream.get());
3919 ASSERT_TRUE(request.has_value());
3920 CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
3921 /*version_info=*/"20", /*response_nonce=*/"",
3922 /*error_detail=*/absl::OkStatus(),
3923 /*resource_names=*/{"foo1", "foo2"});
3924 // Result (remote): Client created a stream to the fallback server and sent a
3925 // request on that stream for both resources.
3926 auto stream2 = WaitForAdsStream(fallback_server);
3927 ASSERT_TRUE(stream2 != nullptr);
3928 request = WaitForRequest(stream2.get());
3929 ASSERT_TRUE(request.has_value());
3930 CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
3931 /*version_info=*/"", /*response_nonce=*/"",
3932 /*error_detail=*/absl::OkStatus(),
3933 /*resource_names=*/{"foo1", "foo2"});
3934 // Input: Fallback server sends a response with both resources.
3935 stream2->SendMessageToClient(
3936 ResponseBuilder(XdsFooResourceType::Get()->type_url())
3937 .set_version_info("5")
3938 .set_nonce("A")
3939 .AddFooResource(XdsFooResource("foo1", 20))
3940 .AddFooResource(XdsFooResource("foo2", 30))
3941 .Serialize());
3942 // Result (local): Resources are delivered to watchers.
3943 resource = watcher->WaitForNextResource();
3944 ASSERT_NE(resource, nullptr);
3945 EXPECT_EQ(resource->name, "foo1");
3946 EXPECT_EQ(resource->value, 20);
3947 resource = watcher2->WaitForNextResource();
3948 ASSERT_NE(resource, nullptr);
3949 EXPECT_EQ(resource->name, "foo2");
3950 EXPECT_EQ(resource->value, 30);
3951 // Result (local): Metrics show an update from fallback server.
3952 EXPECT_THAT(metrics_reporter_->resource_updates_valid(),
3953 ::testing::ElementsAre(
3954 ::testing::Pair(
3955 ::testing::Pair(kDefaultXdsServerUrl,
3956 XdsFooResourceType::Get()->type_url()),
3957 1),
3958 ::testing::Pair(
3959 ::testing::Pair(fallback_server.server_uri(),
3960 XdsFooResourceType::Get()->type_url()),
3961 2)));
3962 EXPECT_THAT(GetServerConnections(),
3963 ::testing::ElementsAre(
3964 ::testing::Pair(kDefaultXdsServerUrl, false),
3965 ::testing::Pair(fallback_server.server_uri(), true)));
3966 EXPECT_THAT(
3967 GetResourceCounts(),
3968 ::testing::ElementsAre(::testing::Pair(
3969 ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
3970 XdsFooResourceType::Get()->type_url(), "acked"),
3971 2)));
3972 // Result (remote): Client sends ACK to fallback server.
3973 request = WaitForRequest(stream2.get());
3974 ASSERT_TRUE(request.has_value());
3975 CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
3976 /*version_info=*/"5", /*response_nonce=*/"A",
3977 /*error_detail=*/absl::OkStatus(),
3978 /*resource_names=*/{"foo1", "foo2"});
3979 // Input: Primary server sends a response containing both resources.
3980 stream->SendMessageToClient(
3981 ResponseBuilder(XdsFooResourceType::Get()->type_url())
3982 .set_version_info("15")
3983 .set_nonce("B")
3984 .AddFooResource(XdsFooResource("foo1", 35))
3985 .AddFooResource(XdsFooResource("foo2", 25))
3986 .Serialize());
3987 // Result (local): Metrics show that we've closed the channel to the fallback
3988 // server and received resource updates from the primary server.
3989 EXPECT_THAT(metrics_reporter_->resource_updates_valid(),
3990 ::testing::ElementsAre(
3991 ::testing::Pair(
3992 ::testing::Pair(kDefaultXdsServerUrl,
3993 XdsFooResourceType::Get()->type_url()),
3994 3),
3995 ::testing::Pair(
3996 ::testing::Pair(fallback_server.server_uri(),
3997 XdsFooResourceType::Get()->type_url()),
3998 2)));
3999 EXPECT_THAT(GetServerConnections(), ::testing::ElementsAre(::testing::Pair(
4000 kDefaultXdsServerUrl, true)));
4001 // Result (remote): The stream to the fallback server has been orphaned.
4002 EXPECT_TRUE(stream2->Orphaned());
4003 // Result (local): Resources are delivered to watchers.
4004 resource = watcher->WaitForNextResource();
4005 ASSERT_NE(resource, nullptr);
4006 EXPECT_EQ(resource->name, "foo1");
4007 EXPECT_EQ(resource->value, 35);
4008 resource = watcher2->WaitForNextResource();
4009 ASSERT_NE(resource, nullptr);
4010 EXPECT_EQ(resource->name, "foo2");
4011 EXPECT_EQ(resource->value, 25);
4012 // Result (remote): Client sends ACK to server.
4013 request = WaitForRequest(stream.get());
4014 ASSERT_TRUE(request.has_value());
4015 CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
4016 /*version_info=*/"15", /*response_nonce=*/"B",
4017 /*error_detail=*/absl::OkStatus(),
4018 /*resource_names=*/{"foo1", "foo2"});
4019 // Clean up.
4020 CancelFooWatch(watcher.get(), "foo1", /*delay_unsubscription=*/true);
4021 CancelFooWatch(watcher2.get(), "foo2");
4022 // Result (remote): The stream to the primary server has been orphaned.
4023 EXPECT_TRUE(stream->Orphaned());
4024 }
4025
4026 // Test for both servers being unavailable
TEST_F(XdsClientTest,FallbackReportsError)4027 TEST_F(XdsClientTest, FallbackReportsError) {
4028 FakeXdsBootstrap::FakeXdsServer primary_server(kDefaultXdsServerUrl);
4029 FakeXdsBootstrap::FakeXdsServer fallback_server("fallback_xds_server");
4030 InitXdsClient(FakeXdsBootstrap::Builder().SetServers(
4031 {primary_server, fallback_server}));
4032 auto watcher = StartFooWatch("foo1");
4033 EXPECT_THAT(GetServerConnections(), ::testing::ElementsAre(::testing::Pair(
4034 kDefaultXdsServerUrl, true)));
4035 auto stream = WaitForAdsStream();
4036 ASSERT_TRUE(stream != nullptr);
4037 // XdsClient should have sent a subscription request on the ADS stream.
4038 auto request = WaitForRequest(stream.get());
4039 ASSERT_TRUE(request.has_value());
4040 CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
4041 /*version_info=*/"", /*response_nonce=*/"",
4042 /*error_detail=*/absl::OkStatus(),
4043 /*resource_names=*/{"foo1"});
4044 EXPECT_THAT(GetResourceCounts(),
4045 ::testing::ElementsAre(::testing::Pair(
4046 ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
4047 XdsFooResourceType::Get()->type_url(),
4048 "requested"),
4049 1)));
4050 TriggerConnectionFailure(primary_server,
4051 absl::UnavailableError("Server down"));
4052 EXPECT_THAT(GetServerConnections(),
4053 ::testing::ElementsAre(
4054 ::testing::Pair(kDefaultXdsServerUrl, false),
4055 ::testing::Pair(fallback_server.server_uri(), true)));
4056 // Fallback happens now
4057 stream = WaitForAdsStream(fallback_server);
4058 ASSERT_NE(stream, nullptr);
4059 request = WaitForRequest(stream.get());
4060 ASSERT_TRUE(request.has_value());
4061 CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
4062 /*version_info=*/"", /*response_nonce=*/"",
4063 /*error_detail=*/absl::OkStatus(),
4064 /*resource_names=*/{"foo1"});
4065 TriggerConnectionFailure(fallback_server,
4066 absl::UnavailableError("Another server down"));
4067 EXPECT_THAT(GetServerConnections(),
4068 ::testing::ElementsAre(
4069 ::testing::Pair(kDefaultXdsServerUrl, false),
4070 ::testing::Pair(fallback_server.server_uri(), false)));
4071 auto error = watcher->WaitForNextError();
4072 ASSERT_TRUE(error.has_value());
4073 EXPECT_THAT(error->code(), absl::StatusCode::kUnavailable);
4074 EXPECT_EQ(error->message(),
4075 "xDS channel for server fallback_xds_server: Another server down "
4076 "(node ID:xds_client_test)")
4077 << error->message();
4078 }
4079
TEST_F(XdsClientTest,FallbackOnStartup)4080 TEST_F(XdsClientTest, FallbackOnStartup) {
4081 FakeXdsBootstrap::FakeXdsServer primary_server;
4082 FakeXdsBootstrap::FakeXdsServer fallback_server("fallback_xds_server");
4083 // Regular operation
4084 InitXdsClient(FakeXdsBootstrap::Builder().SetServers(
4085 {primary_server, fallback_server}));
4086 // Start a watch for "foo1".
4087 auto watcher = StartFooWatch("foo1");
4088 auto primary_stream = WaitForAdsStream(primary_server);
4089 ASSERT_NE(primary_stream, nullptr);
4090 // XdsClient should have sent a subscription request on the ADS stream.
4091 auto request = WaitForRequest(primary_stream.get());
4092 ASSERT_TRUE(request.has_value());
4093 CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
4094 /*version_info=*/"", /*response_nonce=*/"",
4095 /*error_detail=*/absl::OkStatus(),
4096 /*resource_names=*/{"foo1"});
4097 TriggerConnectionFailure(primary_server,
4098 absl::UnavailableError("Primary server is down"));
4099 // XdsClient should have created an ADS stream.
4100 auto fallback_stream = WaitForAdsStream(fallback_server);
4101 ASSERT_NE(fallback_stream, nullptr);
4102 // XdsClient should have sent a subscription request on the ADS stream.
4103 request = WaitForRequest(fallback_stream.get());
4104 ASSERT_TRUE(request.has_value());
4105 CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
4106 /*version_info=*/"", /*response_nonce=*/"",
4107 /*error_detail=*/absl::OkStatus(),
4108 /*resource_names=*/{"foo1"});
4109 // Send a response.
4110 fallback_stream->SendMessageToClient(
4111 ResponseBuilder(XdsFooResourceType::Get()->type_url())
4112 .set_version_info("1")
4113 .set_nonce("A")
4114 .AddFooResource(XdsFooResource("foo1", 6))
4115 .Serialize());
4116 EXPECT_THAT(GetServerConnections(),
4117 ::testing::ElementsAre(
4118 ::testing::Pair(kDefaultXdsServerUrl, false),
4119 ::testing::Pair(fallback_server.server_uri(), true)));
4120 // XdsClient should have delivered the response to the watcher.
4121 auto resource = watcher->WaitForNextResource();
4122 ASSERT_NE(resource, nullptr);
4123 EXPECT_EQ(resource->name, "foo1");
4124 EXPECT_EQ(resource->value, 6);
4125 // Client sends an ACK.
4126 request = WaitForRequest(fallback_stream.get());
4127 ASSERT_TRUE(request.has_value());
4128 CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
4129 /*version_info=*/"1", /*response_nonce=*/"A",
4130 /*error_detail=*/absl::OkStatus(),
4131 /*resource_names=*/{"foo1"});
4132 // Recover to primary
4133 primary_stream->SendMessageToClient(
4134 ResponseBuilder(XdsFooResourceType::Get()->type_url())
4135 .set_version_info("5")
4136 .set_nonce("D")
4137 .AddFooResource(XdsFooResource("foo1", 42))
4138 .Serialize());
4139 EXPECT_TRUE(fallback_stream->Orphaned());
4140 resource = watcher->WaitForNextResource();
4141 ASSERT_NE(resource, nullptr);
4142 EXPECT_EQ(resource->name, "foo1");
4143 EXPECT_EQ(resource->value, 42);
4144 EXPECT_THAT(GetServerConnections(), ::testing::ElementsAre(::testing::Pair(
4145 kDefaultXdsServerUrl, true)));
4146 request = WaitForRequest(primary_stream.get());
4147 ASSERT_TRUE(request.has_value());
4148 CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
4149 /*version_info=*/"5", /*response_nonce=*/"D",
4150 /*error_detail=*/absl::OkStatus(),
4151 /*resource_names=*/{"foo1"});
4152 }
4153
4154 } // namespace
4155 } // namespace testing
4156 } // namespace grpc_core
4157
main(int argc,char ** argv)4158 int main(int argc, char** argv) {
4159 ::testing::InitGoogleTest(&argc, argv);
4160 grpc::testing::TestEnvironment env(&argc, argv);
4161 grpc_init();
4162 int ret = RUN_ALL_TESTS();
4163 grpc_shutdown();
4164 return ret;
4165 }
4166