xref: /aosp_15_r20/external/grpc-grpc/src/core/lib/transport/transport.cc (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1 //
2 //
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #include <grpc/support/port_platform.h>
20 
21 #include "src/core/lib/transport/transport.h"
22 
23 #include <string.h>
24 
25 #include <memory>
26 #include <new>
27 
28 #include "absl/status/status.h"
29 #include "absl/strings/str_cat.h"
30 #include "absl/strings/string_view.h"
31 
32 #include <grpc/event_engine/event_engine.h>
33 #include <grpc/grpc.h>
34 
35 #include "src/core/lib/event_engine/default_event_engine.h"
36 #include "src/core/lib/gprpp/time.h"
37 #include "src/core/lib/iomgr/exec_ctx.h"
38 #include "src/core/lib/promise/for_each.h"
39 #include "src/core/lib/promise/promise.h"
40 #include "src/core/lib/promise/try_seq.h"
41 #include "src/core/lib/slice/slice.h"
42 #include "src/core/lib/transport/error_utils.h"
43 
44 grpc_core::DebugOnlyTraceFlag grpc_trace_stream_refcount(false,
45                                                          "stream_refcount");
46 
grpc_stream_destroy(grpc_stream_refcount * refcount)47 void grpc_stream_destroy(grpc_stream_refcount* refcount) {
48   if ((grpc_core::ExecCtx::Get()->flags() &
49        GRPC_EXEC_CTX_FLAG_THREAD_RESOURCE_LOOP)) {
50     // Ick.
51     // The thread we're running on MAY be owned (indirectly) by a call-stack.
52     // If that's the case, destroying the call-stack MAY try to destroy the
53     // thread, which is a tangled mess that we just don't want to ever have to
54     // cope with.
55     // Throw this over to the executor (on a core-owned thread) and process it
56     // there.
57     grpc_event_engine::experimental::GetDefaultEventEngine()->Run([refcount] {
58       grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
59       grpc_core::ExecCtx exec_ctx;
60       grpc_core::ExecCtx::Run(DEBUG_LOCATION, &refcount->destroy,
61                               absl::OkStatus());
62     });
63   } else {
64     grpc_core::ExecCtx::Run(DEBUG_LOCATION, &refcount->destroy,
65                             absl::OkStatus());
66   }
67 }
68 
slice_stream_destroy(void * arg)69 void slice_stream_destroy(void* arg) {
70   grpc_stream_destroy(static_cast<grpc_stream_refcount*>(arg));
71 }
72 
73 #ifndef NDEBUG
grpc_stream_ref_init(grpc_stream_refcount * refcount,int,grpc_iomgr_cb_func cb,void * cb_arg,const char * object_type)74 void grpc_stream_ref_init(grpc_stream_refcount* refcount, int /*initial_refs*/,
75                           grpc_iomgr_cb_func cb, void* cb_arg,
76                           const char* object_type) {
77   refcount->object_type = object_type;
78 #else
79 void grpc_stream_ref_init(grpc_stream_refcount* refcount, int /*initial_refs*/,
80                           grpc_iomgr_cb_func cb, void* cb_arg) {
81 #endif
82   GRPC_CLOSURE_INIT(&refcount->destroy, cb, cb_arg, grpc_schedule_on_exec_ctx);
83 
84   new (&refcount->refs) grpc_core::RefCount(
85       1, GRPC_TRACE_FLAG_ENABLED(grpc_trace_stream_refcount) ? "stream_refcount"
86                                                              : nullptr);
87 }
88 
89 namespace grpc_core {
90 void Transport::SetPollingEntity(grpc_stream* stream,
91                                  grpc_polling_entity* pollset_or_pollset_set) {
92   if (auto* pollset = grpc_polling_entity_pollset(pollset_or_pollset_set)) {
93     SetPollset(stream, pollset);
94   } else if (auto* pollset_set =
95                  grpc_polling_entity_pollset_set(pollset_or_pollset_set)) {
96     SetPollsetSet(stream, pollset_set);
97   } else {
98     // No-op for empty pollset. Empty pollset is possible when using
99     // non-fd-based event engines such as CFStream.
100   }
101 }
102 }  // namespace grpc_core
103 
104 // This comment should be sung to the tune of
105 // "Supercalifragilisticexpialidocious":
106 //
107 // grpc_transport_stream_op_batch_finish_with_failure
108 // is a function that must always unref cancel_error
109 // though it lives in lib, it handles transport stream ops sure
110 // it's grpc_transport_stream_op_batch_finish_with_failure
111 void grpc_transport_stream_op_batch_finish_with_failure(
112     grpc_transport_stream_op_batch* batch, grpc_error_handle error,
113     grpc_core::CallCombiner* call_combiner) {
114   grpc_core::CallCombinerClosureList closures;
115   grpc_transport_stream_op_batch_queue_finish_with_failure(batch, error,
116                                                            &closures);
117   // Execute closures.
118   closures.RunClosures(call_combiner);
119 }
120 
121 void grpc_transport_stream_op_batch_queue_finish_with_failure(
122     grpc_transport_stream_op_batch* batch, grpc_error_handle error,
123     grpc_core::CallCombinerClosureList* closures) {
124   // Construct a list of closures to execute.
125   if (batch->recv_initial_metadata) {
126     closures->Add(
127         batch->payload->recv_initial_metadata.recv_initial_metadata_ready,
128         error, "failing recv_initial_metadata_ready");
129   }
130   if (batch->recv_message) {
131     closures->Add(batch->payload->recv_message.recv_message_ready, error,
132                   "failing recv_message_ready");
133   }
134   if (batch->recv_trailing_metadata) {
135     closures->Add(
136         batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready,
137         error, "failing recv_trailing_metadata_ready");
138   }
139   if (batch->on_complete != nullptr) {
140     closures->Add(batch->on_complete, error, "failing on_complete");
141   }
142 }
143 
144 void grpc_transport_stream_op_batch_finish_with_failure_from_transport(
145     grpc_transport_stream_op_batch* batch, grpc_error_handle error) {
146   // Construct a list of closures to execute.
147   if (batch->recv_initial_metadata) {
148     grpc_core::ExecCtx::Run(
149         DEBUG_LOCATION,
150         batch->payload->recv_initial_metadata.recv_initial_metadata_ready,
151         error);
152   }
153   if (batch->recv_message) {
154     grpc_core::ExecCtx::Run(
155         DEBUG_LOCATION, batch->payload->recv_message.recv_message_ready, error);
156   }
157   if (batch->recv_trailing_metadata) {
158     grpc_core::ExecCtx::Run(
159         DEBUG_LOCATION,
160         batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready,
161         error);
162   }
163   if (batch->on_complete != nullptr) {
164     grpc_core::ExecCtx::Run(DEBUG_LOCATION, batch->on_complete, error);
165   }
166 }
167 
168 struct made_transport_op {
169   grpc_closure outer_on_complete;
170   grpc_closure* inner_on_complete = nullptr;
171   grpc_transport_op op;
172   made_transport_op() {
173     memset(&outer_on_complete, 0, sizeof(outer_on_complete));
174   }
175 };
176 
177 static void destroy_made_transport_op(void* arg, grpc_error_handle error) {
178   made_transport_op* op = static_cast<made_transport_op*>(arg);
179   grpc_core::ExecCtx::Run(DEBUG_LOCATION, op->inner_on_complete, error);
180   delete op;
181 }
182 
183 grpc_transport_op* grpc_make_transport_op(grpc_closure* on_complete) {
184   made_transport_op* op = new made_transport_op();
185   GRPC_CLOSURE_INIT(&op->outer_on_complete, destroy_made_transport_op, op,
186                     grpc_schedule_on_exec_ctx);
187   op->inner_on_complete = on_complete;
188   op->op.on_consumed = &op->outer_on_complete;
189   return &op->op;
190 }
191 
192 struct made_transport_stream_op {
193   grpc_closure outer_on_complete;
194   grpc_closure* inner_on_complete = nullptr;
195   grpc_transport_stream_op_batch op;
196   grpc_transport_stream_op_batch_payload payload{nullptr};
197 };
198 static void destroy_made_transport_stream_op(void* arg,
199                                              grpc_error_handle error) {
200   made_transport_stream_op* op = static_cast<made_transport_stream_op*>(arg);
201   grpc_closure* c = op->inner_on_complete;
202   delete op;
203   if (c != nullptr) {
204     grpc_core::Closure::Run(DEBUG_LOCATION, c, error);
205   }
206 }
207 
208 grpc_transport_stream_op_batch* grpc_make_transport_stream_op(
209     grpc_closure* on_complete) {
210   made_transport_stream_op* op = new made_transport_stream_op();
211   op->op.payload = &op->payload;
212   GRPC_CLOSURE_INIT(&op->outer_on_complete, destroy_made_transport_stream_op,
213                     op, grpc_schedule_on_exec_ctx);
214   op->inner_on_complete = on_complete;
215   op->op.on_complete = &op->outer_on_complete;
216   return &op->op;
217 }
218