1 //
2 // Copyright 2022 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //     http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16 
17 #ifndef GRPC_SRC_CORE_EXT_XDS_XDS_TRANSPORT_GRPC_H
18 #define GRPC_SRC_CORE_EXT_XDS_XDS_TRANSPORT_GRPC_H
19 
20 #include <grpc/support/port_platform.h>
21 
22 #include <functional>
23 #include <memory>
24 #include <string>
25 
26 #include "absl/status/status.h"
27 
28 #include <grpc/grpc.h>
29 #include <grpc/slice.h>
30 #include <grpc/status.h>
31 
32 #include "src/core/ext/xds/xds_bootstrap.h"
33 #include "src/core/ext/xds/xds_transport.h"
34 #include "src/core/lib/channel/channel_args.h"
35 #include "src/core/lib/gprpp/orphanable.h"
36 #include "src/core/lib/gprpp/ref_counted_ptr.h"
37 #include "src/core/lib/iomgr/closure.h"
38 #include "src/core/lib/iomgr/error.h"
39 #include "src/core/lib/iomgr/iomgr_fwd.h"
40 
41 namespace grpc_core {
42 
43 class GrpcXdsTransportFactory : public XdsTransportFactory {
44  public:
45   class GrpcXdsTransport;
46 
47   explicit GrpcXdsTransportFactory(const ChannelArgs& args);
48   ~GrpcXdsTransportFactory() override;
49 
Orphan()50   void Orphan() override { Unref(); }
51 
52   OrphanablePtr<XdsTransport> Create(
53       const XdsBootstrap::XdsServer& server,
54       std::function<void(absl::Status)> on_connectivity_failure,
55       absl::Status* status) override;
56 
interested_parties()57   grpc_pollset_set* interested_parties() const { return interested_parties_; }
58 
59  private:
60   ChannelArgs args_;
61   grpc_pollset_set* interested_parties_;
62 };
63 
64 class GrpcXdsTransportFactory::GrpcXdsTransport
65     : public XdsTransportFactory::XdsTransport {
66  public:
67   class GrpcStreamingCall;
68 
69   GrpcXdsTransport(GrpcXdsTransportFactory* factory,
70                    const XdsBootstrap::XdsServer& server,
71                    std::function<void(absl::Status)> on_connectivity_failure,
72                    absl::Status* status);
73   ~GrpcXdsTransport() override;
74 
75   void Orphan() override;
76 
77   OrphanablePtr<StreamingCall> CreateStreamingCall(
78       const char* method,
79       std::unique_ptr<StreamingCall::EventHandler> event_handler) override;
80 
81   void ResetBackoff() override;
82 
83  private:
84   class StateWatcher;
85 
86   GrpcXdsTransportFactory* factory_;  // Not owned.
87   grpc_channel* channel_;
88   StateWatcher* watcher_;
89 };
90 
91 class GrpcXdsTransportFactory::GrpcXdsTransport::GrpcStreamingCall
92     : public XdsTransportFactory::XdsTransport::StreamingCall {
93  public:
94   GrpcStreamingCall(RefCountedPtr<GrpcXdsTransportFactory> factory,
95                     grpc_channel* channel, const char* method,
96                     std::unique_ptr<StreamingCall::EventHandler> event_handler);
97   ~GrpcStreamingCall() override;
98 
99   void Orphan() override;
100 
101   void SendMessage(std::string payload) override;
102 
103  private:
104   static void OnRequestSent(void* arg, grpc_error_handle error);
105   static void OnResponseReceived(void* arg, grpc_error_handle /*error*/);
106   static void OnStatusReceived(void* arg, grpc_error_handle /*error*/);
107 
108   RefCountedPtr<GrpcXdsTransportFactory> factory_;
109 
110   std::unique_ptr<StreamingCall::EventHandler> event_handler_;
111 
112   // Always non-NULL.
113   grpc_call* call_;
114 
115   // recv_initial_metadata
116   grpc_metadata_array initial_metadata_recv_;
117 
118   // send_message
119   grpc_byte_buffer* send_message_payload_ = nullptr;
120   grpc_closure on_request_sent_;
121 
122   // recv_message
123   grpc_byte_buffer* recv_message_payload_ = nullptr;
124   grpc_closure on_response_received_;
125 
126   // recv_trailing_metadata
127   grpc_metadata_array trailing_metadata_recv_;
128   grpc_status_code status_code_;
129   grpc_slice status_details_;
130   grpc_closure on_status_received_;
131 };
132 
133 }  // namespace grpc_core
134 
135 #endif  // GRPC_SRC_CORE_EXT_XDS_XDS_TRANSPORT_GRPC_H
136