1 //
2 // Copyright 2022 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16
17 #include <grpc/support/port_platform.h>
18
19 #include "src/core/ext/xds/xds_transport_grpc.h"
20
21 #include <string.h>
22
23 #include <functional>
24 #include <memory>
25 #include <utility>
26
27 #include "absl/strings/str_cat.h"
28
29 #include <grpc/byte_buffer.h>
30 #include <grpc/byte_buffer_reader.h>
31 #include <grpc/grpc.h>
32 #include <grpc/impl/connectivity_state.h>
33 #include <grpc/impl/propagation_bits.h>
34 #include <grpc/slice.h>
35 #include <grpc/support/log.h>
36
37 #include "src/core/ext/filters/client_channel/client_channel.h"
38 #include "src/core/ext/xds/xds_bootstrap.h"
39 #include "src/core/ext/xds/xds_bootstrap_grpc.h"
40 #include "src/core/lib/channel/channel_args.h"
41 #include "src/core/lib/channel/channel_fwd.h"
42 #include "src/core/lib/channel/channel_stack.h"
43 #include "src/core/lib/config/core_configuration.h"
44 #include "src/core/lib/gprpp/debug_location.h"
45 #include "src/core/lib/gprpp/orphanable.h"
46 #include "src/core/lib/gprpp/ref_counted_ptr.h"
47 #include "src/core/lib/gprpp/time.h"
48 #include "src/core/lib/iomgr/closure.h"
49 #include "src/core/lib/iomgr/pollset_set.h"
50 #include "src/core/lib/json/json.h"
51 #include "src/core/lib/security/credentials/channel_creds_registry.h"
52 #include "src/core/lib/security/credentials/credentials.h"
53 #include "src/core/lib/slice/slice.h"
54 #include "src/core/lib/slice/slice_internal.h"
55 #include "src/core/lib/surface/call.h"
56 #include "src/core/lib/surface/channel.h"
57 #include "src/core/lib/surface/init_internally.h"
58 #include "src/core/lib/surface/lame_client.h"
59 #include "src/core/lib/transport/connectivity_state.h"
60
61 namespace grpc_core {
62
63 //
64 // GrpcXdsTransportFactory::GrpcXdsTransport::GrpcStreamingCall
65 //
66
GrpcStreamingCall(RefCountedPtr<GrpcXdsTransportFactory> factory,grpc_channel * channel,const char * method,std::unique_ptr<StreamingCall::EventHandler> event_handler)67 GrpcXdsTransportFactory::GrpcXdsTransport::GrpcStreamingCall::GrpcStreamingCall(
68 RefCountedPtr<GrpcXdsTransportFactory> factory, grpc_channel* channel,
69 const char* method,
70 std::unique_ptr<StreamingCall::EventHandler> event_handler)
71 : factory_(std::move(factory)), event_handler_(std::move(event_handler)) {
72 // Create call.
73 call_ = grpc_channel_create_pollset_set_call(
74 channel, nullptr, GRPC_PROPAGATE_DEFAULTS, factory_->interested_parties(),
75 StaticSlice::FromStaticString(method).c_slice(), nullptr,
76 Timestamp::InfFuture(), nullptr);
77 GPR_ASSERT(call_ != nullptr);
78 // Init data associated with the call.
79 grpc_metadata_array_init(&initial_metadata_recv_);
80 grpc_metadata_array_init(&trailing_metadata_recv_);
81 // Initialize closure to be used for sending messages.
82 GRPC_CLOSURE_INIT(&on_request_sent_, OnRequestSent, this, nullptr);
83 // Start ops on the call.
84 grpc_call_error call_error;
85 grpc_op ops[3];
86 memset(ops, 0, sizeof(ops));
87 // Send initial metadata. No callback for this, since we don't really
88 // care when it finishes.
89 grpc_op* op = ops;
90 op->op = GRPC_OP_SEND_INITIAL_METADATA;
91 op->data.send_initial_metadata.count = 0;
92 op->flags = GRPC_INITIAL_METADATA_WAIT_FOR_READY |
93 GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET;
94 op->reserved = nullptr;
95 op++;
96 call_error = grpc_call_start_batch_and_execute(
97 call_, ops, static_cast<size_t>(op - ops), nullptr);
98 GPR_ASSERT(GRPC_CALL_OK == call_error);
99 // Start a batch with recv_initial_metadata and recv_message.
100 op = ops;
101 op->op = GRPC_OP_RECV_INITIAL_METADATA;
102 op->data.recv_initial_metadata.recv_initial_metadata =
103 &initial_metadata_recv_;
104 op->flags = 0;
105 op->reserved = nullptr;
106 op++;
107 op->op = GRPC_OP_RECV_MESSAGE;
108 op->data.recv_message.recv_message = &recv_message_payload_;
109 op->flags = 0;
110 op->reserved = nullptr;
111 op++;
112 Ref(DEBUG_LOCATION, "OnResponseReceived").release();
113 GRPC_CLOSURE_INIT(&on_response_received_, OnResponseReceived, this, nullptr);
114 call_error = grpc_call_start_batch_and_execute(
115 call_, ops, static_cast<size_t>(op - ops), &on_response_received_);
116 GPR_ASSERT(GRPC_CALL_OK == call_error);
117 // Start a batch for recv_trailing_metadata.
118 op = ops;
119 op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
120 op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv_;
121 op->data.recv_status_on_client.status = &status_code_;
122 op->data.recv_status_on_client.status_details = &status_details_;
123 op->flags = 0;
124 op->reserved = nullptr;
125 op++;
126 // This callback signals the end of the call, so it relies on the initial
127 // ref instead of a new ref. When it's invoked, it's the initial ref that is
128 // unreffed.
129 GRPC_CLOSURE_INIT(&on_status_received_, OnStatusReceived, this, nullptr);
130 call_error = grpc_call_start_batch_and_execute(
131 call_, ops, static_cast<size_t>(op - ops), &on_status_received_);
132 GPR_ASSERT(GRPC_CALL_OK == call_error);
133 }
134
135 GrpcXdsTransportFactory::GrpcXdsTransport::GrpcStreamingCall::
~GrpcStreamingCall()136 ~GrpcStreamingCall() {
137 grpc_metadata_array_destroy(&initial_metadata_recv_);
138 grpc_metadata_array_destroy(&trailing_metadata_recv_);
139 grpc_byte_buffer_destroy(send_message_payload_);
140 grpc_byte_buffer_destroy(recv_message_payload_);
141 CSliceUnref(status_details_);
142 GPR_ASSERT(call_ != nullptr);
143 grpc_call_unref(call_);
144 }
145
Orphan()146 void GrpcXdsTransportFactory::GrpcXdsTransport::GrpcStreamingCall::Orphan() {
147 GPR_ASSERT(call_ != nullptr);
148 // If we are here because xds_client wants to cancel the call,
149 // OnStatusReceived() will complete the cancellation and clean up.
150 // Otherwise, we are here because xds_client has to orphan a failed call,
151 // in which case the following cancellation will be a no-op.
152 grpc_call_cancel_internal(call_);
153 // Note that the initial ref is held by OnStatusReceived(), so the
154 // corresponding unref happens there instead of here.
155 }
156
SendMessage(std::string payload)157 void GrpcXdsTransportFactory::GrpcXdsTransport::GrpcStreamingCall::SendMessage(
158 std::string payload) {
159 // Create payload.
160 grpc_slice slice = grpc_slice_from_cpp_string(std::move(payload));
161 send_message_payload_ = grpc_raw_byte_buffer_create(&slice, 1);
162 CSliceUnref(slice);
163 // Send the message.
164 grpc_op op;
165 memset(&op, 0, sizeof(op));
166 op.op = GRPC_OP_SEND_MESSAGE;
167 op.data.send_message.send_message = send_message_payload_;
168 Ref(DEBUG_LOCATION, "OnRequestSent").release();
169 grpc_call_error call_error =
170 grpc_call_start_batch_and_execute(call_, &op, 1, &on_request_sent_);
171 GPR_ASSERT(GRPC_CALL_OK == call_error);
172 }
173
174 void GrpcXdsTransportFactory::GrpcXdsTransport::GrpcStreamingCall::
OnRequestSent(void * arg,grpc_error_handle error)175 OnRequestSent(void* arg, grpc_error_handle error) {
176 auto* self = static_cast<GrpcStreamingCall*>(arg);
177 // Clean up the sent message.
178 grpc_byte_buffer_destroy(self->send_message_payload_);
179 self->send_message_payload_ = nullptr;
180 // Invoke request handler.
181 self->event_handler_->OnRequestSent(error.ok());
182 // Drop the ref.
183 self->Unref(DEBUG_LOCATION, "OnRequestSent");
184 }
185
186 void GrpcXdsTransportFactory::GrpcXdsTransport::GrpcStreamingCall::
OnResponseReceived(void * arg,grpc_error_handle)187 OnResponseReceived(void* arg, grpc_error_handle /*error*/) {
188 auto* self = static_cast<GrpcStreamingCall*>(arg);
189 // If there was no payload, then we received status before we received
190 // another message, so we stop reading.
191 if (self->recv_message_payload_ == nullptr) {
192 self->Unref(DEBUG_LOCATION, "OnResponseReceived");
193 return;
194 }
195 // Process the response.
196 grpc_byte_buffer_reader bbr;
197 grpc_byte_buffer_reader_init(&bbr, self->recv_message_payload_);
198 grpc_slice response_slice = grpc_byte_buffer_reader_readall(&bbr);
199 grpc_byte_buffer_reader_destroy(&bbr);
200 grpc_byte_buffer_destroy(self->recv_message_payload_);
201 self->recv_message_payload_ = nullptr;
202 self->event_handler_->OnRecvMessage(StringViewFromSlice(response_slice));
203 CSliceUnref(response_slice);
204 // Keep reading.
205 grpc_op op;
206 memset(&op, 0, sizeof(op));
207 op.op = GRPC_OP_RECV_MESSAGE;
208 op.data.recv_message.recv_message = &self->recv_message_payload_;
209 GPR_ASSERT(self->call_ != nullptr);
210 // Reuses the "OnResponseReceived" ref taken in ctor.
211 const grpc_call_error call_error = grpc_call_start_batch_and_execute(
212 self->call_, &op, 1, &self->on_response_received_);
213 GPR_ASSERT(GRPC_CALL_OK == call_error);
214 }
215
216 void GrpcXdsTransportFactory::GrpcXdsTransport::GrpcStreamingCall::
OnStatusReceived(void * arg,grpc_error_handle)217 OnStatusReceived(void* arg, grpc_error_handle /*error*/) {
218 auto* self = static_cast<GrpcStreamingCall*>(arg);
219 self->event_handler_->OnStatusReceived(
220 absl::Status(static_cast<absl::StatusCode>(self->status_code_),
221 StringViewFromSlice(self->status_details_)));
222 self->Unref(DEBUG_LOCATION, "OnStatusReceived");
223 }
224
225 //
226 // GrpcXdsTransportFactory::GrpcXdsTransport::StateWatcher
227 //
228
229 class GrpcXdsTransportFactory::GrpcXdsTransport::StateWatcher
230 : public AsyncConnectivityStateWatcherInterface {
231 public:
StateWatcher(std::function<void (absl::Status)> on_connectivity_failure)232 explicit StateWatcher(
233 std::function<void(absl::Status)> on_connectivity_failure)
234 : on_connectivity_failure_(std::move(on_connectivity_failure)) {}
235
236 private:
OnConnectivityStateChange(grpc_connectivity_state new_state,const absl::Status & status)237 void OnConnectivityStateChange(grpc_connectivity_state new_state,
238 const absl::Status& status) override {
239 if (new_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
240 on_connectivity_failure_(absl::Status(
241 status.code(),
242 absl::StrCat("channel in TRANSIENT_FAILURE: ", status.message())));
243 }
244 }
245
246 std::function<void(absl::Status)> on_connectivity_failure_;
247 };
248
249 //
250 // GrpcXdsClient::GrpcXdsTransport
251 //
252
253 namespace {
254
CreateXdsChannel(const ChannelArgs & args,const GrpcXdsBootstrap::GrpcXdsServer & server)255 grpc_channel* CreateXdsChannel(const ChannelArgs& args,
256 const GrpcXdsBootstrap::GrpcXdsServer& server) {
257 RefCountedPtr<grpc_channel_credentials> channel_creds =
258 CoreConfiguration::Get().channel_creds_registry().CreateChannelCreds(
259 server.channel_creds_type(),
260 Json::FromObject(server.channel_creds_config()));
261 return grpc_channel_create(server.server_uri().c_str(), channel_creds.get(),
262 args.ToC().get());
263 }
264
IsLameChannel(grpc_channel * channel)265 bool IsLameChannel(grpc_channel* channel) {
266 grpc_channel_element* elem =
267 grpc_channel_stack_last_element(grpc_channel_get_channel_stack(channel));
268 return elem->filter == &LameClientFilter::kFilter;
269 }
270
271 } // namespace
272
GrpcXdsTransport(GrpcXdsTransportFactory * factory,const XdsBootstrap::XdsServer & server,std::function<void (absl::Status)> on_connectivity_failure,absl::Status * status)273 GrpcXdsTransportFactory::GrpcXdsTransport::GrpcXdsTransport(
274 GrpcXdsTransportFactory* factory, const XdsBootstrap::XdsServer& server,
275 std::function<void(absl::Status)> on_connectivity_failure,
276 absl::Status* status)
277 : factory_(factory) {
278 channel_ = CreateXdsChannel(
279 factory->args_,
280 static_cast<const GrpcXdsBootstrap::GrpcXdsServer&>(server));
281 GPR_ASSERT(channel_ != nullptr);
282 if (IsLameChannel(channel_)) {
283 *status = absl::UnavailableError("xds client has a lame channel");
284 } else {
285 ClientChannel* client_channel =
286 ClientChannel::GetFromChannel(Channel::FromC(channel_));
287 GPR_ASSERT(client_channel != nullptr);
288 watcher_ = new StateWatcher(std::move(on_connectivity_failure));
289 client_channel->AddConnectivityWatcher(
290 GRPC_CHANNEL_IDLE,
291 OrphanablePtr<AsyncConnectivityStateWatcherInterface>(watcher_));
292 }
293 }
294
~GrpcXdsTransport()295 GrpcXdsTransportFactory::GrpcXdsTransport::~GrpcXdsTransport() {
296 grpc_channel_destroy_internal(channel_);
297 }
298
Orphan()299 void GrpcXdsTransportFactory::GrpcXdsTransport::Orphan() {
300 if (!IsLameChannel(channel_)) {
301 ClientChannel* client_channel =
302 ClientChannel::GetFromChannel(Channel::FromC(channel_));
303 GPR_ASSERT(client_channel != nullptr);
304 client_channel->RemoveConnectivityWatcher(watcher_);
305 }
306 Unref();
307 }
308
309 OrphanablePtr<XdsTransportFactory::XdsTransport::StreamingCall>
CreateStreamingCall(const char * method,std::unique_ptr<StreamingCall::EventHandler> event_handler)310 GrpcXdsTransportFactory::GrpcXdsTransport::CreateStreamingCall(
311 const char* method,
312 std::unique_ptr<StreamingCall::EventHandler> event_handler) {
313 return MakeOrphanable<GrpcStreamingCall>(
314 factory_->Ref(DEBUG_LOCATION, "StreamingCall"), channel_, method,
315 std::move(event_handler));
316 }
317
ResetBackoff()318 void GrpcXdsTransportFactory::GrpcXdsTransport::ResetBackoff() {
319 grpc_channel_reset_connect_backoff(channel_);
320 }
321
322 //
323 // GrpcXdsTransportFactory
324 //
325
326 namespace {
327
ModifyChannelArgs(const ChannelArgs & args)328 ChannelArgs ModifyChannelArgs(const ChannelArgs& args) {
329 return args.Set(GRPC_ARG_KEEPALIVE_TIME_MS, Duration::Minutes(5).millis());
330 }
331
332 } // namespace
333
GrpcXdsTransportFactory(const ChannelArgs & args)334 GrpcXdsTransportFactory::GrpcXdsTransportFactory(const ChannelArgs& args)
335 : args_(ModifyChannelArgs(args)),
336 interested_parties_(grpc_pollset_set_create()) {
337 // Calling grpc_init to ensure gRPC does not shut down until the XdsClient is
338 // destroyed.
339 InitInternally();
340 }
341
~GrpcXdsTransportFactory()342 GrpcXdsTransportFactory::~GrpcXdsTransportFactory() {
343 grpc_pollset_set_destroy(interested_parties_);
344 // Calling grpc_shutdown to ensure gRPC does not shut down until the XdsClient
345 // is destroyed.
346 ShutdownInternally();
347 }
348
349 OrphanablePtr<XdsTransportFactory::XdsTransport>
Create(const XdsBootstrap::XdsServer & server,std::function<void (absl::Status)> on_connectivity_failure,absl::Status * status)350 GrpcXdsTransportFactory::Create(
351 const XdsBootstrap::XdsServer& server,
352 std::function<void(absl::Status)> on_connectivity_failure,
353 absl::Status* status) {
354 return MakeOrphanable<GrpcXdsTransport>(
355 this, server, std::move(on_connectivity_failure), status);
356 }
357
358 } // namespace grpc_core
359