1 //
2 //
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #include <grpc/support/port_platform.h>
20 
21 #include "src/core/lib/transport/transport.h"
22 
23 #include <string.h>
24 
25 #include <memory>
26 #include <new>
27 
28 #include "absl/status/status.h"
29 #include "absl/strings/str_cat.h"
30 
31 #include <grpc/event_engine/event_engine.h>
32 #include <grpc/grpc.h>
33 
34 #include "src/core/lib/event_engine/default_event_engine.h"
35 #include "src/core/lib/gpr/alloc.h"
36 #include "src/core/lib/gprpp/time.h"
37 #include "src/core/lib/iomgr/exec_ctx.h"
38 #include "src/core/lib/slice/slice.h"
39 #include "src/core/lib/transport/error_utils.h"
40 #include "src/core/lib/transport/transport_impl.h"
41 
42 grpc_core::DebugOnlyTraceFlag grpc_trace_stream_refcount(false,
43                                                          "stream_refcount");
44 
grpc_stream_destroy(grpc_stream_refcount * refcount)45 void grpc_stream_destroy(grpc_stream_refcount* refcount) {
46   if ((grpc_core::ExecCtx::Get()->flags() &
47        GRPC_EXEC_CTX_FLAG_THREAD_RESOURCE_LOOP)) {
48     // Ick.
49     // The thread we're running on MAY be owned (indirectly) by a call-stack.
50     // If that's the case, destroying the call-stack MAY try to destroy the
51     // thread, which is a tangled mess that we just don't want to ever have to
52     // cope with.
53     // Throw this over to the executor (on a core-owned thread) and process it
54     // there.
55     grpc_event_engine::experimental::GetDefaultEventEngine()->Run([refcount] {
56       grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
57       grpc_core::ExecCtx exec_ctx;
58       grpc_core::ExecCtx::Run(DEBUG_LOCATION, &refcount->destroy,
59                               absl::OkStatus());
60     });
61   } else {
62     grpc_core::ExecCtx::Run(DEBUG_LOCATION, &refcount->destroy,
63                             absl::OkStatus());
64   }
65 }
66 
slice_stream_destroy(void * arg)67 void slice_stream_destroy(void* arg) {
68   grpc_stream_destroy(static_cast<grpc_stream_refcount*>(arg));
69 }
70 
71 #ifndef NDEBUG
grpc_stream_ref_init(grpc_stream_refcount * refcount,int,grpc_iomgr_cb_func cb,void * cb_arg,const char * object_type)72 void grpc_stream_ref_init(grpc_stream_refcount* refcount, int /*initial_refs*/,
73                           grpc_iomgr_cb_func cb, void* cb_arg,
74                           const char* object_type) {
75   refcount->object_type = object_type;
76 #else
77 void grpc_stream_ref_init(grpc_stream_refcount* refcount, int /*initial_refs*/,
78                           grpc_iomgr_cb_func cb, void* cb_arg) {
79 #endif
80   GRPC_CLOSURE_INIT(&refcount->destroy, cb, cb_arg, grpc_schedule_on_exec_ctx);
81 
82   new (&refcount->refs) grpc_core::RefCount(
83       1, GRPC_TRACE_FLAG_ENABLED(grpc_trace_stream_refcount) ? "stream_refcount"
84                                                              : nullptr);
85 }
86 
87 static void move64bits(uint64_t* from, uint64_t* to) {
88   *to += *from;
89   *from = 0;
90 }
91 
92 void grpc_transport_move_one_way_stats(grpc_transport_one_way_stats* from,
93                                        grpc_transport_one_way_stats* to) {
94   move64bits(&from->framing_bytes, &to->framing_bytes);
95   move64bits(&from->data_bytes, &to->data_bytes);
96   move64bits(&from->header_bytes, &to->header_bytes);
97 }
98 
99 void grpc_transport_move_stats(grpc_transport_stream_stats* from,
100                                grpc_transport_stream_stats* to) {
101   grpc_transport_move_one_way_stats(&from->incoming, &to->incoming);
102   grpc_transport_move_one_way_stats(&from->outgoing, &to->outgoing);
103   to->latency = std::exchange(from->latency, gpr_inf_future(GPR_TIMESPAN));
104 }
105 
106 size_t grpc_transport_stream_size(grpc_transport* transport) {
107   return GPR_ROUND_UP_TO_ALIGNMENT_SIZE(transport->vtable->sizeof_stream);
108 }
109 
110 void grpc_transport_destroy(grpc_transport* transport) {
111   transport->vtable->destroy(transport);
112 }
113 
114 int grpc_transport_init_stream(grpc_transport* transport, grpc_stream* stream,
115                                grpc_stream_refcount* refcount,
116                                const void* server_data,
117                                grpc_core::Arena* arena) {
118   return transport->vtable->init_stream(transport, stream, refcount,
119                                         server_data, arena);
120 }
121 
122 void grpc_transport_perform_stream_op(grpc_transport* transport,
123                                       grpc_stream* stream,
124                                       grpc_transport_stream_op_batch* op) {
125   transport->vtable->perform_stream_op(transport, stream, op);
126 }
127 
128 void grpc_transport_perform_op(grpc_transport* transport,
129                                grpc_transport_op* op) {
130   transport->vtable->perform_op(transport, op);
131 }
132 
133 void grpc_transport_set_pops(grpc_transport* transport, grpc_stream* stream,
134                              grpc_polling_entity* pollent) {
135   grpc_pollset* pollset;
136   grpc_pollset_set* pollset_set;
137   if ((pollset = grpc_polling_entity_pollset(pollent)) != nullptr) {
138     transport->vtable->set_pollset(transport, stream, pollset);
139   } else if ((pollset_set = grpc_polling_entity_pollset_set(pollent)) !=
140              nullptr) {
141     transport->vtable->set_pollset_set(transport, stream, pollset_set);
142   } else {
143     // No-op for empty pollset. Empty pollset is possible when using
144     // non-fd-based event engines such as CFStream.
145   }
146 }
147 
148 void grpc_transport_destroy_stream(grpc_transport* transport,
149                                    grpc_stream* stream,
150                                    grpc_closure* then_schedule_closure) {
151   transport->vtable->destroy_stream(transport, stream, then_schedule_closure);
152 }
153 
154 grpc_endpoint* grpc_transport_get_endpoint(grpc_transport* transport) {
155   return transport->vtable->get_endpoint(transport);
156 }
157 
158 // This comment should be sung to the tune of
159 // "Supercalifragilisticexpialidocious":
160 //
161 // grpc_transport_stream_op_batch_finish_with_failure
162 // is a function that must always unref cancel_error
163 // though it lives in lib, it handles transport stream ops sure
164 // it's grpc_transport_stream_op_batch_finish_with_failure
165 void grpc_transport_stream_op_batch_finish_with_failure(
166     grpc_transport_stream_op_batch* batch, grpc_error_handle error,
167     grpc_core::CallCombiner* call_combiner) {
168   grpc_core::CallCombinerClosureList closures;
169   grpc_transport_stream_op_batch_queue_finish_with_failure(batch, error,
170                                                            &closures);
171   // Execute closures.
172   closures.RunClosures(call_combiner);
173 }
174 
175 void grpc_transport_stream_op_batch_queue_finish_with_failure(
176     grpc_transport_stream_op_batch* batch, grpc_error_handle error,
177     grpc_core::CallCombinerClosureList* closures) {
178   // Construct a list of closures to execute.
179   if (batch->recv_initial_metadata) {
180     closures->Add(
181         batch->payload->recv_initial_metadata.recv_initial_metadata_ready,
182         error, "failing recv_initial_metadata_ready");
183   }
184   if (batch->recv_message) {
185     closures->Add(batch->payload->recv_message.recv_message_ready, error,
186                   "failing recv_message_ready");
187   }
188   if (batch->recv_trailing_metadata) {
189     closures->Add(
190         batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready,
191         error, "failing recv_trailing_metadata_ready");
192   }
193   if (batch->on_complete != nullptr) {
194     closures->Add(batch->on_complete, error, "failing on_complete");
195   }
196 }
197 
198 void grpc_transport_stream_op_batch_finish_with_failure_from_transport(
199     grpc_transport_stream_op_batch* batch, grpc_error_handle error) {
200   // Construct a list of closures to execute.
201   if (batch->recv_initial_metadata) {
202     grpc_core::ExecCtx::Run(
203         DEBUG_LOCATION,
204         batch->payload->recv_initial_metadata.recv_initial_metadata_ready,
205         error);
206   }
207   if (batch->recv_message) {
208     grpc_core::ExecCtx::Run(
209         DEBUG_LOCATION, batch->payload->recv_message.recv_message_ready, error);
210   }
211   if (batch->recv_trailing_metadata) {
212     grpc_core::ExecCtx::Run(
213         DEBUG_LOCATION,
214         batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready,
215         error);
216   }
217   if (batch->on_complete != nullptr) {
218     grpc_core::ExecCtx::Run(DEBUG_LOCATION, batch->on_complete, error);
219   }
220 }
221 
222 struct made_transport_op {
223   grpc_closure outer_on_complete;
224   grpc_closure* inner_on_complete = nullptr;
225   grpc_transport_op op;
226   made_transport_op() {
227     memset(&outer_on_complete, 0, sizeof(outer_on_complete));
228   }
229 };
230 
231 static void destroy_made_transport_op(void* arg, grpc_error_handle error) {
232   made_transport_op* op = static_cast<made_transport_op*>(arg);
233   grpc_core::ExecCtx::Run(DEBUG_LOCATION, op->inner_on_complete, error);
234   delete op;
235 }
236 
237 grpc_transport_op* grpc_make_transport_op(grpc_closure* on_complete) {
238   made_transport_op* op = new made_transport_op();
239   GRPC_CLOSURE_INIT(&op->outer_on_complete, destroy_made_transport_op, op,
240                     grpc_schedule_on_exec_ctx);
241   op->inner_on_complete = on_complete;
242   op->op.on_consumed = &op->outer_on_complete;
243   return &op->op;
244 }
245 
246 struct made_transport_stream_op {
247   grpc_closure outer_on_complete;
248   grpc_closure* inner_on_complete = nullptr;
249   grpc_transport_stream_op_batch op;
250   grpc_transport_stream_op_batch_payload payload{nullptr};
251 };
252 static void destroy_made_transport_stream_op(void* arg,
253                                              grpc_error_handle error) {
254   made_transport_stream_op* op = static_cast<made_transport_stream_op*>(arg);
255   grpc_closure* c = op->inner_on_complete;
256   delete op;
257   if (c != nullptr) {
258     grpc_core::Closure::Run(DEBUG_LOCATION, c, error);
259   }
260 }
261 
262 grpc_transport_stream_op_batch* grpc_make_transport_stream_op(
263     grpc_closure* on_complete) {
264   made_transport_stream_op* op = new made_transport_stream_op();
265   op->op.payload = &op->payload;
266   GRPC_CLOSURE_INIT(&op->outer_on_complete, destroy_made_transport_stream_op,
267                     op, grpc_schedule_on_exec_ctx);
268   op->inner_on_complete = on_complete;
269   op->op.on_complete = &op->outer_on_complete;
270   return &op->op;
271 }
272 
273 namespace grpc_core {
274 
275 ServerMetadataHandle ServerMetadataFromStatus(const absl::Status& status,
276                                               Arena* arena) {
277   auto hdl = arena->MakePooled<ServerMetadata>(arena);
278   grpc_status_code code;
279   std::string message;
280   grpc_error_get_status(status, Timestamp::InfFuture(), &code, &message,
281                         nullptr, nullptr);
282   hdl->Set(GrpcStatusMetadata(), code);
283   if (!status.ok()) {
284     hdl->Set(GrpcMessageMetadata(), Slice::FromCopiedString(message));
285   }
286   return hdl;
287 }
288 
289 std::string Message::DebugString() const {
290   std::string out = absl::StrCat(payload_.Length(), "b");
291   auto flags = flags_;
292   auto explain = [&flags, &out](uint32_t flag, absl::string_view name) {
293     if (flags & flag) {
294       flags &= ~flag;
295       absl::StrAppend(&out, ":", name);
296     }
297   };
298   explain(GRPC_WRITE_BUFFER_HINT, "write_buffer");
299   explain(GRPC_WRITE_NO_COMPRESS, "no_compress");
300   explain(GRPC_WRITE_THROUGH, "write_through");
301   explain(GRPC_WRITE_INTERNAL_COMPRESS, "compress");
302   explain(GRPC_WRITE_INTERNAL_TEST_ONLY_WAS_COMPRESSED, "was_compressed");
303   if (flags != 0) {
304     absl::StrAppend(&out, ":huh=0x", absl::Hex(flags));
305   }
306   return out;
307 }
308 
309 }  // namespace grpc_core
310