1 //
2 //
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18
19 #include <grpc/support/port_platform.h>
20
21 #include "src/core/lib/transport/transport.h"
22
23 #include <string.h>
24
25 #include <memory>
26 #include <new>
27
28 #include "absl/status/status.h"
29 #include "absl/strings/str_cat.h"
30
31 #include <grpc/event_engine/event_engine.h>
32 #include <grpc/grpc.h>
33
34 #include "src/core/lib/event_engine/default_event_engine.h"
35 #include "src/core/lib/gpr/alloc.h"
36 #include "src/core/lib/gprpp/time.h"
37 #include "src/core/lib/iomgr/exec_ctx.h"
38 #include "src/core/lib/slice/slice.h"
39 #include "src/core/lib/transport/error_utils.h"
40 #include "src/core/lib/transport/transport_impl.h"
41
42 grpc_core::DebugOnlyTraceFlag grpc_trace_stream_refcount(false,
43 "stream_refcount");
44
grpc_stream_destroy(grpc_stream_refcount * refcount)45 void grpc_stream_destroy(grpc_stream_refcount* refcount) {
46 if ((grpc_core::ExecCtx::Get()->flags() &
47 GRPC_EXEC_CTX_FLAG_THREAD_RESOURCE_LOOP)) {
48 // Ick.
49 // The thread we're running on MAY be owned (indirectly) by a call-stack.
50 // If that's the case, destroying the call-stack MAY try to destroy the
51 // thread, which is a tangled mess that we just don't want to ever have to
52 // cope with.
53 // Throw this over to the executor (on a core-owned thread) and process it
54 // there.
55 grpc_event_engine::experimental::GetDefaultEventEngine()->Run([refcount] {
56 grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
57 grpc_core::ExecCtx exec_ctx;
58 grpc_core::ExecCtx::Run(DEBUG_LOCATION, &refcount->destroy,
59 absl::OkStatus());
60 });
61 } else {
62 grpc_core::ExecCtx::Run(DEBUG_LOCATION, &refcount->destroy,
63 absl::OkStatus());
64 }
65 }
66
slice_stream_destroy(void * arg)67 void slice_stream_destroy(void* arg) {
68 grpc_stream_destroy(static_cast<grpc_stream_refcount*>(arg));
69 }
70
71 #ifndef NDEBUG
grpc_stream_ref_init(grpc_stream_refcount * refcount,int,grpc_iomgr_cb_func cb,void * cb_arg,const char * object_type)72 void grpc_stream_ref_init(grpc_stream_refcount* refcount, int /*initial_refs*/,
73 grpc_iomgr_cb_func cb, void* cb_arg,
74 const char* object_type) {
75 refcount->object_type = object_type;
76 #else
77 void grpc_stream_ref_init(grpc_stream_refcount* refcount, int /*initial_refs*/,
78 grpc_iomgr_cb_func cb, void* cb_arg) {
79 #endif
80 GRPC_CLOSURE_INIT(&refcount->destroy, cb, cb_arg, grpc_schedule_on_exec_ctx);
81
82 new (&refcount->refs) grpc_core::RefCount(
83 1, GRPC_TRACE_FLAG_ENABLED(grpc_trace_stream_refcount) ? "stream_refcount"
84 : nullptr);
85 }
86
// Adds the counter at `from` onto the counter at `to`, then resets `from` to
// zero.
static void move64bits(uint64_t* from, uint64_t* to) {
  const uint64_t amount = *from;
  *to += amount;
  *from = 0;
}
91
92 void grpc_transport_move_one_way_stats(grpc_transport_one_way_stats* from,
93 grpc_transport_one_way_stats* to) {
94 move64bits(&from->framing_bytes, &to->framing_bytes);
95 move64bits(&from->data_bytes, &to->data_bytes);
96 move64bits(&from->header_bytes, &to->header_bytes);
97 }
98
99 void grpc_transport_move_stats(grpc_transport_stream_stats* from,
100 grpc_transport_stream_stats* to) {
101 grpc_transport_move_one_way_stats(&from->incoming, &to->incoming);
102 grpc_transport_move_one_way_stats(&from->outgoing, &to->outgoing);
103 to->latency = std::exchange(from->latency, gpr_inf_future(GPR_TIMESPAN));
104 }
105
106 size_t grpc_transport_stream_size(grpc_transport* transport) {
107 return GPR_ROUND_UP_TO_ALIGNMENT_SIZE(transport->vtable->sizeof_stream);
108 }
109
110 void grpc_transport_destroy(grpc_transport* transport) {
111 transport->vtable->destroy(transport);
112 }
113
114 int grpc_transport_init_stream(grpc_transport* transport, grpc_stream* stream,
115 grpc_stream_refcount* refcount,
116 const void* server_data,
117 grpc_core::Arena* arena) {
118 return transport->vtable->init_stream(transport, stream, refcount,
119 server_data, arena);
120 }
121
122 void grpc_transport_perform_stream_op(grpc_transport* transport,
123 grpc_stream* stream,
124 grpc_transport_stream_op_batch* op) {
125 transport->vtable->perform_stream_op(transport, stream, op);
126 }
127
128 void grpc_transport_perform_op(grpc_transport* transport,
129 grpc_transport_op* op) {
130 transport->vtable->perform_op(transport, op);
131 }
132
133 void grpc_transport_set_pops(grpc_transport* transport, grpc_stream* stream,
134 grpc_polling_entity* pollent) {
135 grpc_pollset* pollset;
136 grpc_pollset_set* pollset_set;
137 if ((pollset = grpc_polling_entity_pollset(pollent)) != nullptr) {
138 transport->vtable->set_pollset(transport, stream, pollset);
139 } else if ((pollset_set = grpc_polling_entity_pollset_set(pollent)) !=
140 nullptr) {
141 transport->vtable->set_pollset_set(transport, stream, pollset_set);
142 } else {
143 // No-op for empty pollset. Empty pollset is possible when using
144 // non-fd-based event engines such as CFStream.
145 }
146 }
147
148 void grpc_transport_destroy_stream(grpc_transport* transport,
149 grpc_stream* stream,
150 grpc_closure* then_schedule_closure) {
151 transport->vtable->destroy_stream(transport, stream, then_schedule_closure);
152 }
153
154 grpc_endpoint* grpc_transport_get_endpoint(grpc_transport* transport) {
155 return transport->vtable->get_endpoint(transport);
156 }
157
158 // This comment should be sung to the tune of
159 // "Supercalifragilisticexpialidocious":
160 //
161 // grpc_transport_stream_op_batch_finish_with_failure
162 // is a function that must always unref cancel_error
163 // though it lives in lib, it handles transport stream ops sure
164 // it's grpc_transport_stream_op_batch_finish_with_failure
165 void grpc_transport_stream_op_batch_finish_with_failure(
166 grpc_transport_stream_op_batch* batch, grpc_error_handle error,
167 grpc_core::CallCombiner* call_combiner) {
168 grpc_core::CallCombinerClosureList closures;
169 grpc_transport_stream_op_batch_queue_finish_with_failure(batch, error,
170 &closures);
171 // Execute closures.
172 closures.RunClosures(call_combiner);
173 }
174
175 void grpc_transport_stream_op_batch_queue_finish_with_failure(
176 grpc_transport_stream_op_batch* batch, grpc_error_handle error,
177 grpc_core::CallCombinerClosureList* closures) {
178 // Construct a list of closures to execute.
179 if (batch->recv_initial_metadata) {
180 closures->Add(
181 batch->payload->recv_initial_metadata.recv_initial_metadata_ready,
182 error, "failing recv_initial_metadata_ready");
183 }
184 if (batch->recv_message) {
185 closures->Add(batch->payload->recv_message.recv_message_ready, error,
186 "failing recv_message_ready");
187 }
188 if (batch->recv_trailing_metadata) {
189 closures->Add(
190 batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready,
191 error, "failing recv_trailing_metadata_ready");
192 }
193 if (batch->on_complete != nullptr) {
194 closures->Add(batch->on_complete, error, "failing on_complete");
195 }
196 }
197
198 void grpc_transport_stream_op_batch_finish_with_failure_from_transport(
199 grpc_transport_stream_op_batch* batch, grpc_error_handle error) {
200 // Construct a list of closures to execute.
201 if (batch->recv_initial_metadata) {
202 grpc_core::ExecCtx::Run(
203 DEBUG_LOCATION,
204 batch->payload->recv_initial_metadata.recv_initial_metadata_ready,
205 error);
206 }
207 if (batch->recv_message) {
208 grpc_core::ExecCtx::Run(
209 DEBUG_LOCATION, batch->payload->recv_message.recv_message_ready, error);
210 }
211 if (batch->recv_trailing_metadata) {
212 grpc_core::ExecCtx::Run(
213 DEBUG_LOCATION,
214 batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready,
215 error);
216 }
217 if (batch->on_complete != nullptr) {
218 grpc_core::ExecCtx::Run(DEBUG_LOCATION, batch->on_complete, error);
219 }
220 }
221
222 struct made_transport_op {
223 grpc_closure outer_on_complete;
224 grpc_closure* inner_on_complete = nullptr;
225 grpc_transport_op op;
226 made_transport_op() {
227 memset(&outer_on_complete, 0, sizeof(outer_on_complete));
228 }
229 };
230
231 static void destroy_made_transport_op(void* arg, grpc_error_handle error) {
232 made_transport_op* op = static_cast<made_transport_op*>(arg);
233 grpc_core::ExecCtx::Run(DEBUG_LOCATION, op->inner_on_complete, error);
234 delete op;
235 }
236
237 grpc_transport_op* grpc_make_transport_op(grpc_closure* on_complete) {
238 made_transport_op* op = new made_transport_op();
239 GRPC_CLOSURE_INIT(&op->outer_on_complete, destroy_made_transport_op, op,
240 grpc_schedule_on_exec_ctx);
241 op->inner_on_complete = on_complete;
242 op->op.on_consumed = &op->outer_on_complete;
243 return &op->op;
244 }
245
246 struct made_transport_stream_op {
247 grpc_closure outer_on_complete;
248 grpc_closure* inner_on_complete = nullptr;
249 grpc_transport_stream_op_batch op;
250 grpc_transport_stream_op_batch_payload payload{nullptr};
251 };
252 static void destroy_made_transport_stream_op(void* arg,
253 grpc_error_handle error) {
254 made_transport_stream_op* op = static_cast<made_transport_stream_op*>(arg);
255 grpc_closure* c = op->inner_on_complete;
256 delete op;
257 if (c != nullptr) {
258 grpc_core::Closure::Run(DEBUG_LOCATION, c, error);
259 }
260 }
261
262 grpc_transport_stream_op_batch* grpc_make_transport_stream_op(
263 grpc_closure* on_complete) {
264 made_transport_stream_op* op = new made_transport_stream_op();
265 op->op.payload = &op->payload;
266 GRPC_CLOSURE_INIT(&op->outer_on_complete, destroy_made_transport_stream_op,
267 op, grpc_schedule_on_exec_ctx);
268 op->inner_on_complete = on_complete;
269 op->op.on_complete = &op->outer_on_complete;
270 return &op->op;
271 }
272
273 namespace grpc_core {
274
275 ServerMetadataHandle ServerMetadataFromStatus(const absl::Status& status,
276 Arena* arena) {
277 auto hdl = arena->MakePooled<ServerMetadata>(arena);
278 grpc_status_code code;
279 std::string message;
280 grpc_error_get_status(status, Timestamp::InfFuture(), &code, &message,
281 nullptr, nullptr);
282 hdl->Set(GrpcStatusMetadata(), code);
283 if (!status.ok()) {
284 hdl->Set(GrpcMessageMetadata(), Slice::FromCopiedString(message));
285 }
286 return hdl;
287 }
288
289 std::string Message::DebugString() const {
290 std::string out = absl::StrCat(payload_.Length(), "b");
291 auto flags = flags_;
292 auto explain = [&flags, &out](uint32_t flag, absl::string_view name) {
293 if (flags & flag) {
294 flags &= ~flag;
295 absl::StrAppend(&out, ":", name);
296 }
297 };
298 explain(GRPC_WRITE_BUFFER_HINT, "write_buffer");
299 explain(GRPC_WRITE_NO_COMPRESS, "no_compress");
300 explain(GRPC_WRITE_THROUGH, "write_through");
301 explain(GRPC_WRITE_INTERNAL_COMPRESS, "compress");
302 explain(GRPC_WRITE_INTERNAL_TEST_ONLY_WAS_COMPRESSED, "was_compressed");
303 if (flags != 0) {
304 absl::StrAppend(&out, ":huh=0x", absl::Hex(flags));
305 }
306 return out;
307 }
308
309 } // namespace grpc_core
310