1 //
2 // Copyright 2022 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //     http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16 
17 #include <grpc/support/port_platform.h>
18 
19 #include "src/core/ext/xds/xds_transport_grpc.h"
20 
21 #include <string.h>
22 
23 #include <functional>
24 #include <memory>
25 #include <utility>
26 
27 #include "absl/strings/str_cat.h"
28 
29 #include <grpc/byte_buffer.h>
30 #include <grpc/byte_buffer_reader.h>
31 #include <grpc/grpc.h>
32 #include <grpc/impl/connectivity_state.h>
33 #include <grpc/impl/propagation_bits.h>
34 #include <grpc/slice.h>
35 #include <grpc/support/log.h>
36 
37 #include "src/core/ext/filters/client_channel/client_channel.h"
38 #include "src/core/ext/xds/xds_bootstrap.h"
39 #include "src/core/ext/xds/xds_bootstrap_grpc.h"
40 #include "src/core/lib/channel/channel_args.h"
41 #include "src/core/lib/channel/channel_fwd.h"
42 #include "src/core/lib/channel/channel_stack.h"
43 #include "src/core/lib/config/core_configuration.h"
44 #include "src/core/lib/gprpp/debug_location.h"
45 #include "src/core/lib/gprpp/orphanable.h"
46 #include "src/core/lib/gprpp/ref_counted_ptr.h"
47 #include "src/core/lib/gprpp/time.h"
48 #include "src/core/lib/iomgr/closure.h"
49 #include "src/core/lib/iomgr/pollset_set.h"
50 #include "src/core/lib/json/json.h"
51 #include "src/core/lib/security/credentials/channel_creds_registry.h"
52 #include "src/core/lib/security/credentials/credentials.h"
53 #include "src/core/lib/slice/slice.h"
54 #include "src/core/lib/slice/slice_internal.h"
55 #include "src/core/lib/surface/call.h"
56 #include "src/core/lib/surface/channel.h"
57 #include "src/core/lib/surface/init_internally.h"
58 #include "src/core/lib/surface/lame_client.h"
59 #include "src/core/lib/transport/connectivity_state.h"
60 
61 namespace grpc_core {
62 
63 //
64 // GrpcXdsTransportFactory::GrpcXdsTransport::GrpcStreamingCall
65 //
66 
GrpcStreamingCall(RefCountedPtr<GrpcXdsTransportFactory> factory,grpc_channel * channel,const char * method,std::unique_ptr<StreamingCall::EventHandler> event_handler)67 GrpcXdsTransportFactory::GrpcXdsTransport::GrpcStreamingCall::GrpcStreamingCall(
68     RefCountedPtr<GrpcXdsTransportFactory> factory, grpc_channel* channel,
69     const char* method,
70     std::unique_ptr<StreamingCall::EventHandler> event_handler)
71     : factory_(std::move(factory)), event_handler_(std::move(event_handler)) {
72   // Create call.
73   call_ = grpc_channel_create_pollset_set_call(
74       channel, nullptr, GRPC_PROPAGATE_DEFAULTS, factory_->interested_parties(),
75       StaticSlice::FromStaticString(method).c_slice(), nullptr,
76       Timestamp::InfFuture(), nullptr);
77   GPR_ASSERT(call_ != nullptr);
78   // Init data associated with the call.
79   grpc_metadata_array_init(&initial_metadata_recv_);
80   grpc_metadata_array_init(&trailing_metadata_recv_);
81   // Initialize closure to be used for sending messages.
82   GRPC_CLOSURE_INIT(&on_request_sent_, OnRequestSent, this, nullptr);
83   // Start ops on the call.
84   grpc_call_error call_error;
85   grpc_op ops[3];
86   memset(ops, 0, sizeof(ops));
87   // Send initial metadata.  No callback for this, since we don't really
88   // care when it finishes.
89   grpc_op* op = ops;
90   op->op = GRPC_OP_SEND_INITIAL_METADATA;
91   op->data.send_initial_metadata.count = 0;
92   op->flags = GRPC_INITIAL_METADATA_WAIT_FOR_READY |
93               GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET;
94   op->reserved = nullptr;
95   op++;
96   call_error = grpc_call_start_batch_and_execute(
97       call_, ops, static_cast<size_t>(op - ops), nullptr);
98   GPR_ASSERT(GRPC_CALL_OK == call_error);
99   // Start a batch with recv_initial_metadata and recv_message.
100   op = ops;
101   op->op = GRPC_OP_RECV_INITIAL_METADATA;
102   op->data.recv_initial_metadata.recv_initial_metadata =
103       &initial_metadata_recv_;
104   op->flags = 0;
105   op->reserved = nullptr;
106   op++;
107   op->op = GRPC_OP_RECV_MESSAGE;
108   op->data.recv_message.recv_message = &recv_message_payload_;
109   op->flags = 0;
110   op->reserved = nullptr;
111   op++;
112   Ref(DEBUG_LOCATION, "OnResponseReceived").release();
113   GRPC_CLOSURE_INIT(&on_response_received_, OnResponseReceived, this, nullptr);
114   call_error = grpc_call_start_batch_and_execute(
115       call_, ops, static_cast<size_t>(op - ops), &on_response_received_);
116   GPR_ASSERT(GRPC_CALL_OK == call_error);
117   // Start a batch for recv_trailing_metadata.
118   op = ops;
119   op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
120   op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv_;
121   op->data.recv_status_on_client.status = &status_code_;
122   op->data.recv_status_on_client.status_details = &status_details_;
123   op->flags = 0;
124   op->reserved = nullptr;
125   op++;
126   // This callback signals the end of the call, so it relies on the initial
127   // ref instead of a new ref. When it's invoked, it's the initial ref that is
128   // unreffed.
129   GRPC_CLOSURE_INIT(&on_status_received_, OnStatusReceived, this, nullptr);
130   call_error = grpc_call_start_batch_and_execute(
131       call_, ops, static_cast<size_t>(op - ops), &on_status_received_);
132   GPR_ASSERT(GRPC_CALL_OK == call_error);
133 }
134 
135 GrpcXdsTransportFactory::GrpcXdsTransport::GrpcStreamingCall::
~GrpcStreamingCall()136     ~GrpcStreamingCall() {
137   grpc_metadata_array_destroy(&initial_metadata_recv_);
138   grpc_metadata_array_destroy(&trailing_metadata_recv_);
139   grpc_byte_buffer_destroy(send_message_payload_);
140   grpc_byte_buffer_destroy(recv_message_payload_);
141   CSliceUnref(status_details_);
142   GPR_ASSERT(call_ != nullptr);
143   grpc_call_unref(call_);
144 }
145 
Orphan()146 void GrpcXdsTransportFactory::GrpcXdsTransport::GrpcStreamingCall::Orphan() {
147   GPR_ASSERT(call_ != nullptr);
148   // If we are here because xds_client wants to cancel the call,
149   // OnStatusReceived() will complete the cancellation and clean up.
150   // Otherwise, we are here because xds_client has to orphan a failed call,
151   // in which case the following cancellation will be a no-op.
152   grpc_call_cancel_internal(call_);
153   // Note that the initial ref is held by OnStatusReceived(), so the
154   // corresponding unref happens there instead of here.
155 }
156 
SendMessage(std::string payload)157 void GrpcXdsTransportFactory::GrpcXdsTransport::GrpcStreamingCall::SendMessage(
158     std::string payload) {
159   // Create payload.
160   grpc_slice slice = grpc_slice_from_cpp_string(std::move(payload));
161   send_message_payload_ = grpc_raw_byte_buffer_create(&slice, 1);
162   CSliceUnref(slice);
163   // Send the message.
164   grpc_op op;
165   memset(&op, 0, sizeof(op));
166   op.op = GRPC_OP_SEND_MESSAGE;
167   op.data.send_message.send_message = send_message_payload_;
168   Ref(DEBUG_LOCATION, "OnRequestSent").release();
169   grpc_call_error call_error =
170       grpc_call_start_batch_and_execute(call_, &op, 1, &on_request_sent_);
171   GPR_ASSERT(GRPC_CALL_OK == call_error);
172 }
173 
174 void GrpcXdsTransportFactory::GrpcXdsTransport::GrpcStreamingCall::
OnRequestSent(void * arg,grpc_error_handle error)175     OnRequestSent(void* arg, grpc_error_handle error) {
176   auto* self = static_cast<GrpcStreamingCall*>(arg);
177   // Clean up the sent message.
178   grpc_byte_buffer_destroy(self->send_message_payload_);
179   self->send_message_payload_ = nullptr;
180   // Invoke request handler.
181   self->event_handler_->OnRequestSent(error.ok());
182   // Drop the ref.
183   self->Unref(DEBUG_LOCATION, "OnRequestSent");
184 }
185 
186 void GrpcXdsTransportFactory::GrpcXdsTransport::GrpcStreamingCall::
OnResponseReceived(void * arg,grpc_error_handle)187     OnResponseReceived(void* arg, grpc_error_handle /*error*/) {
188   auto* self = static_cast<GrpcStreamingCall*>(arg);
189   // If there was no payload, then we received status before we received
190   // another message, so we stop reading.
191   if (self->recv_message_payload_ == nullptr) {
192     self->Unref(DEBUG_LOCATION, "OnResponseReceived");
193     return;
194   }
195   // Process the response.
196   grpc_byte_buffer_reader bbr;
197   grpc_byte_buffer_reader_init(&bbr, self->recv_message_payload_);
198   grpc_slice response_slice = grpc_byte_buffer_reader_readall(&bbr);
199   grpc_byte_buffer_reader_destroy(&bbr);
200   grpc_byte_buffer_destroy(self->recv_message_payload_);
201   self->recv_message_payload_ = nullptr;
202   self->event_handler_->OnRecvMessage(StringViewFromSlice(response_slice));
203   CSliceUnref(response_slice);
204   // Keep reading.
205   grpc_op op;
206   memset(&op, 0, sizeof(op));
207   op.op = GRPC_OP_RECV_MESSAGE;
208   op.data.recv_message.recv_message = &self->recv_message_payload_;
209   GPR_ASSERT(self->call_ != nullptr);
210   // Reuses the "OnResponseReceived" ref taken in ctor.
211   const grpc_call_error call_error = grpc_call_start_batch_and_execute(
212       self->call_, &op, 1, &self->on_response_received_);
213   GPR_ASSERT(GRPC_CALL_OK == call_error);
214 }
215 
216 void GrpcXdsTransportFactory::GrpcXdsTransport::GrpcStreamingCall::
OnStatusReceived(void * arg,grpc_error_handle)217     OnStatusReceived(void* arg, grpc_error_handle /*error*/) {
218   auto* self = static_cast<GrpcStreamingCall*>(arg);
219   self->event_handler_->OnStatusReceived(
220       absl::Status(static_cast<absl::StatusCode>(self->status_code_),
221                    StringViewFromSlice(self->status_details_)));
222   self->Unref(DEBUG_LOCATION, "OnStatusReceived");
223 }
224 
225 //
226 // GrpcXdsTransportFactory::GrpcXdsTransport::StateWatcher
227 //
228 
229 class GrpcXdsTransportFactory::GrpcXdsTransport::StateWatcher
230     : public AsyncConnectivityStateWatcherInterface {
231  public:
StateWatcher(std::function<void (absl::Status)> on_connectivity_failure)232   explicit StateWatcher(
233       std::function<void(absl::Status)> on_connectivity_failure)
234       : on_connectivity_failure_(std::move(on_connectivity_failure)) {}
235 
236  private:
OnConnectivityStateChange(grpc_connectivity_state new_state,const absl::Status & status)237   void OnConnectivityStateChange(grpc_connectivity_state new_state,
238                                  const absl::Status& status) override {
239     if (new_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
240       on_connectivity_failure_(absl::Status(
241           status.code(),
242           absl::StrCat("channel in TRANSIENT_FAILURE: ", status.message())));
243     }
244   }
245 
246   std::function<void(absl::Status)> on_connectivity_failure_;
247 };
248 
249 //
250 // GrpcXdsClient::GrpcXdsTransport
251 //
252 
253 namespace {
254 
CreateXdsChannel(const ChannelArgs & args,const GrpcXdsBootstrap::GrpcXdsServer & server)255 grpc_channel* CreateXdsChannel(const ChannelArgs& args,
256                                const GrpcXdsBootstrap::GrpcXdsServer& server) {
257   RefCountedPtr<grpc_channel_credentials> channel_creds =
258       CoreConfiguration::Get().channel_creds_registry().CreateChannelCreds(
259           server.channel_creds_type(),
260           Json::FromObject(server.channel_creds_config()));
261   return grpc_channel_create(server.server_uri().c_str(), channel_creds.get(),
262                              args.ToC().get());
263 }
264 
IsLameChannel(grpc_channel * channel)265 bool IsLameChannel(grpc_channel* channel) {
266   grpc_channel_element* elem =
267       grpc_channel_stack_last_element(grpc_channel_get_channel_stack(channel));
268   return elem->filter == &LameClientFilter::kFilter;
269 }
270 
271 }  // namespace
272 
GrpcXdsTransport(GrpcXdsTransportFactory * factory,const XdsBootstrap::XdsServer & server,std::function<void (absl::Status)> on_connectivity_failure,absl::Status * status)273 GrpcXdsTransportFactory::GrpcXdsTransport::GrpcXdsTransport(
274     GrpcXdsTransportFactory* factory, const XdsBootstrap::XdsServer& server,
275     std::function<void(absl::Status)> on_connectivity_failure,
276     absl::Status* status)
277     : factory_(factory) {
278   channel_ = CreateXdsChannel(
279       factory->args_,
280       static_cast<const GrpcXdsBootstrap::GrpcXdsServer&>(server));
281   GPR_ASSERT(channel_ != nullptr);
282   if (IsLameChannel(channel_)) {
283     *status = absl::UnavailableError("xds client has a lame channel");
284   } else {
285     ClientChannel* client_channel =
286         ClientChannel::GetFromChannel(Channel::FromC(channel_));
287     GPR_ASSERT(client_channel != nullptr);
288     watcher_ = new StateWatcher(std::move(on_connectivity_failure));
289     client_channel->AddConnectivityWatcher(
290         GRPC_CHANNEL_IDLE,
291         OrphanablePtr<AsyncConnectivityStateWatcherInterface>(watcher_));
292   }
293 }
294 
~GrpcXdsTransport()295 GrpcXdsTransportFactory::GrpcXdsTransport::~GrpcXdsTransport() {
296   grpc_channel_destroy_internal(channel_);
297 }
298 
Orphan()299 void GrpcXdsTransportFactory::GrpcXdsTransport::Orphan() {
300   if (!IsLameChannel(channel_)) {
301     ClientChannel* client_channel =
302         ClientChannel::GetFromChannel(Channel::FromC(channel_));
303     GPR_ASSERT(client_channel != nullptr);
304     client_channel->RemoveConnectivityWatcher(watcher_);
305   }
306   Unref();
307 }
308 
309 OrphanablePtr<XdsTransportFactory::XdsTransport::StreamingCall>
CreateStreamingCall(const char * method,std::unique_ptr<StreamingCall::EventHandler> event_handler)310 GrpcXdsTransportFactory::GrpcXdsTransport::CreateStreamingCall(
311     const char* method,
312     std::unique_ptr<StreamingCall::EventHandler> event_handler) {
313   return MakeOrphanable<GrpcStreamingCall>(
314       factory_->Ref(DEBUG_LOCATION, "StreamingCall"), channel_, method,
315       std::move(event_handler));
316 }
317 
ResetBackoff()318 void GrpcXdsTransportFactory::GrpcXdsTransport::ResetBackoff() {
319   grpc_channel_reset_connect_backoff(channel_);
320 }
321 
322 //
323 // GrpcXdsTransportFactory
324 //
325 
326 namespace {
327 
ModifyChannelArgs(const ChannelArgs & args)328 ChannelArgs ModifyChannelArgs(const ChannelArgs& args) {
329   return args.Set(GRPC_ARG_KEEPALIVE_TIME_MS, Duration::Minutes(5).millis());
330 }
331 
332 }  // namespace
333 
GrpcXdsTransportFactory(const ChannelArgs & args)334 GrpcXdsTransportFactory::GrpcXdsTransportFactory(const ChannelArgs& args)
335     : args_(ModifyChannelArgs(args)),
336       interested_parties_(grpc_pollset_set_create()) {
337   // Calling grpc_init to ensure gRPC does not shut down until the XdsClient is
338   // destroyed.
339   InitInternally();
340 }
341 
~GrpcXdsTransportFactory()342 GrpcXdsTransportFactory::~GrpcXdsTransportFactory() {
343   grpc_pollset_set_destroy(interested_parties_);
344   // Calling grpc_shutdown to ensure gRPC does not shut down until the XdsClient
345   // is destroyed.
346   ShutdownInternally();
347 }
348 
349 OrphanablePtr<XdsTransportFactory::XdsTransport>
Create(const XdsBootstrap::XdsServer & server,std::function<void (absl::Status)> on_connectivity_failure,absl::Status * status)350 GrpcXdsTransportFactory::Create(
351     const XdsBootstrap::XdsServer& server,
352     std::function<void(absl::Status)> on_connectivity_failure,
353     absl::Status* status) {
354   return MakeOrphanable<GrpcXdsTransport>(
355       this, server, std::move(on_connectivity_failure), status);
356 }
357 
358 }  // namespace grpc_core
359