1 //
2 //
3 // Copyright 2017 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18
19 #include <grpc/support/port_platform.h>
20
21 #include "src/core/ext/transport/inproc/inproc_transport.h"
22
23 #include <stdint.h>
24
25 #include <algorithm>
26 #include <memory>
27 #include <new>
28 #include <string>
29 #include <utility>
30
31 #include "absl/status/status.h"
32 #include "absl/status/statusor.h"
33 #include "absl/strings/str_cat.h"
34 #include "absl/strings/string_view.h"
35 #include "absl/types/optional.h"
36
37 #include <grpc/grpc.h>
38 #include <grpc/impl/connectivity_state.h>
39 #include <grpc/status.h>
40 #include <grpc/support/alloc.h>
41 #include <grpc/support/log.h>
42 #include <grpc/support/sync.h>
43
44 #include "src/core/lib/channel/channel_args.h"
45 #include "src/core/lib/channel/channel_args_preconditioning.h"
46 #include "src/core/lib/channel/channelz.h"
47 #include "src/core/lib/config/core_configuration.h"
48 #include "src/core/lib/gprpp/debug_location.h"
49 #include "src/core/lib/gprpp/ref_counted_ptr.h"
50 #include "src/core/lib/gprpp/status_helper.h"
51 #include "src/core/lib/gprpp/time.h"
52 #include "src/core/lib/iomgr/closure.h"
53 #include "src/core/lib/iomgr/endpoint.h"
54 #include "src/core/lib/iomgr/error.h"
55 #include "src/core/lib/iomgr/exec_ctx.h"
56 #include "src/core/lib/iomgr/iomgr_fwd.h"
57 #include "src/core/lib/resource_quota/arena.h"
58 #include "src/core/lib/slice/slice.h"
59 #include "src/core/lib/slice/slice_buffer.h"
60 #include "src/core/lib/surface/api_trace.h"
61 #include "src/core/lib/surface/channel.h"
62 #include "src/core/lib/surface/channel_stack_type.h"
63 #include "src/core/lib/surface/server.h"
64 #include "src/core/lib/transport/connectivity_state.h"
65 #include "src/core/lib/transport/metadata_batch.h"
66 #include "src/core/lib/transport/transport.h"
67 #include "src/core/lib/transport/transport_fwd.h"
68 #include "src/core/lib/transport/transport_impl.h"
69
// Trace-logging helper: forwards its arguments to gpr_log() only while the
// grpc_inproc_trace flag is enabled. The do/while wrapper makes the expansion
// behave as a single statement at the call site.
#define INPROC_LOG(...)                             \
  do {                                              \
    if (GRPC_TRACE_FLAG_ENABLED(grpc_inproc_trace)) \
      gpr_log(__VA_ARGS__);                         \
  } while (false)
76
77 namespace {
78 struct inproc_stream;
79 bool cancel_stream_locked(inproc_stream* s, grpc_error_handle error);
80 void maybe_process_ops_locked(inproc_stream* s, grpc_error_handle error);
81 void op_state_machine_locked(inproc_stream* s, grpc_error_handle error);
82 void log_metadata(const grpc_metadata_batch* md_batch, bool is_client,
83 bool is_initial);
84 void fill_in_metadata(inproc_stream* s, const grpc_metadata_batch* metadata,
85 grpc_metadata_batch* out_md, bool* markfilled);
86
ResetSendMessage(grpc_transport_stream_op_batch * batch)87 void ResetSendMessage(grpc_transport_stream_op_batch* batch) {
88 std::exchange(batch->payload->send_message.send_message, nullptr)->Clear();
89 }
90
91 struct shared_mu {
shared_mu__anon483675c30111::shared_mu92 shared_mu() {
93 // Share one lock between both sides since both sides get affected
94 gpr_mu_init(&mu);
95 gpr_ref_init(&refs, 2);
96 }
97
~shared_mu__anon483675c30111::shared_mu98 ~shared_mu() { gpr_mu_destroy(&mu); }
99
100 gpr_mu mu;
101 gpr_refcount refs;
102 };
103
104 struct inproc_transport {
inproc_transport__anon483675c30111::inproc_transport105 inproc_transport(const grpc_transport_vtable* vtable, shared_mu* mu,
106 bool is_client)
107 : mu(mu),
108 is_client(is_client),
109 state_tracker(is_client ? "inproc_client" : "inproc_server",
110 GRPC_CHANNEL_READY) {
111 base.vtable = vtable;
112 // Start each side of transport with 2 refs since they each have a ref
113 // to the other
114 gpr_ref_init(&refs, 2);
115 }
116
~inproc_transport__anon483675c30111::inproc_transport117 ~inproc_transport() {
118 if (gpr_unref(&mu->refs)) {
119 mu->~shared_mu();
120 gpr_free(mu);
121 }
122 }
123
ref__anon483675c30111::inproc_transport124 void ref() {
125 INPROC_LOG(GPR_INFO, "ref_transport %p", this);
126 gpr_ref(&refs);
127 }
128
unref__anon483675c30111::inproc_transport129 void unref() {
130 INPROC_LOG(GPR_INFO, "unref_transport %p", this);
131 if (!gpr_unref(&refs)) {
132 return;
133 }
134 INPROC_LOG(GPR_INFO, "really_destroy_transport %p", this);
135 this->~inproc_transport();
136 gpr_free(this);
137 }
138
139 grpc_transport base;
140 shared_mu* mu;
141 gpr_refcount refs;
142 bool is_client;
143 grpc_core::ConnectivityStateTracker state_tracker;
144 void (*accept_stream_cb)(void* user_data, grpc_transport* transport,
145 const void* server_data);
146 void* accept_stream_data;
147 bool is_closed = false;
148 struct inproc_transport* other_side;
149 struct inproc_stream* stream_list = nullptr;
150 };
151
152 struct inproc_stream {
inproc_stream__anon483675c30111::inproc_stream153 inproc_stream(inproc_transport* t, grpc_stream_refcount* refcount,
154 const void* server_data, grpc_core::Arena* arena)
155 : t(t), refs(refcount), arena(arena) {
156 // Ref this stream right now for ctor and list.
157 ref("inproc_init_stream:init");
158 ref("inproc_init_stream:list");
159
160 stream_list_prev = nullptr;
161 gpr_mu_lock(&t->mu->mu);
162 stream_list_next = t->stream_list;
163 if (t->stream_list) {
164 t->stream_list->stream_list_prev = this;
165 }
166 t->stream_list = this;
167 gpr_mu_unlock(&t->mu->mu);
168
169 if (!server_data) {
170 t->ref();
171 inproc_transport* st = t->other_side;
172 st->ref();
173 other_side = nullptr; // will get filled in soon
174 // Pass the client-side stream address to the server-side for a ref
175 ref("inproc_init_stream:clt"); // ref it now on behalf of server
176 // side to avoid destruction
177 INPROC_LOG(GPR_INFO, "calling accept stream cb %p %p",
178 st->accept_stream_cb, st->accept_stream_data);
179 (*st->accept_stream_cb)(st->accept_stream_data, &st->base, this);
180 } else {
181 // This is the server-side and is being called through accept_stream_cb
182 inproc_stream* cs = const_cast<inproc_stream*>(
183 static_cast<const inproc_stream*>(server_data));
184 other_side = cs;
185 // Ref the server-side stream on behalf of the client now
186 ref("inproc_init_stream:srv");
187
188 // Now we are about to affect the other side, so lock the transport
189 // to make sure that it doesn't get destroyed
190 gpr_mu_lock(&t->mu->mu);
191 cs->other_side = this;
192 // Now transfer from the other side's write_buffer if any to the to_read
193 // buffer
194 if (cs->write_buffer_initial_md_filled) {
195 fill_in_metadata(this, &cs->write_buffer_initial_md,
196 &to_read_initial_md, &to_read_initial_md_filled);
197 deadline = std::min(deadline, cs->write_buffer_deadline);
198 cs->write_buffer_initial_md.Clear();
199 cs->write_buffer_initial_md_filled = false;
200 }
201 if (cs->write_buffer_trailing_md_filled) {
202 fill_in_metadata(this, &cs->write_buffer_trailing_md,
203 &to_read_trailing_md, &to_read_trailing_md_filled);
204 cs->write_buffer_trailing_md.Clear();
205 cs->write_buffer_trailing_md_filled = false;
206 }
207 if (!cs->write_buffer_cancel_error.ok()) {
208 cancel_other_error = cs->write_buffer_cancel_error;
209 cs->write_buffer_cancel_error = absl::OkStatus();
210 maybe_process_ops_locked(this, cancel_other_error);
211 }
212
213 gpr_mu_unlock(&t->mu->mu);
214 }
215 }
216
~inproc_stream__anon483675c30111::inproc_stream217 ~inproc_stream() { t->unref(); }
218
219 #ifndef NDEBUG
220 #define STREAM_REF(refs, reason) grpc_stream_ref(refs, reason)
221 #define STREAM_UNREF(refs, reason) grpc_stream_unref(refs, reason)
222 #else
223 #define STREAM_REF(refs, reason) grpc_stream_ref(refs)
224 #define STREAM_UNREF(refs, reason) grpc_stream_unref(refs)
225 #endif
ref__anon483675c30111::inproc_stream226 void ref(const char* reason) {
227 INPROC_LOG(GPR_INFO, "ref_stream %p %s", this, reason);
228 STREAM_REF(refs, reason);
229 }
230
unref__anon483675c30111::inproc_stream231 void unref(const char* reason) {
232 INPROC_LOG(GPR_INFO, "unref_stream %p %s", this, reason);
233 STREAM_UNREF(refs, reason);
234 }
235 #undef STREAM_REF
236 #undef STREAM_UNREF
237
238 inproc_transport* t;
239 grpc_stream_refcount* refs;
240 grpc_core::Arena* arena;
241
242 grpc_metadata_batch to_read_initial_md{arena};
243 bool to_read_initial_md_filled = false;
244 grpc_metadata_batch to_read_trailing_md{arena};
245 bool to_read_trailing_md_filled = false;
246 bool ops_needed = false;
247 // Write buffer used only during gap at init time when client-side
248 // stream is set up but server side stream is not yet set up
249 grpc_metadata_batch write_buffer_initial_md{arena};
250 bool write_buffer_initial_md_filled = false;
251 grpc_core::Timestamp write_buffer_deadline =
252 grpc_core::Timestamp::InfFuture();
253 grpc_metadata_batch write_buffer_trailing_md{arena};
254 bool write_buffer_trailing_md_filled = false;
255 grpc_error_handle write_buffer_cancel_error;
256
257 struct inproc_stream* other_side;
258 bool other_side_closed = false; // won't talk anymore
259 bool write_buffer_other_side_closed = false; // on hold
260
261 grpc_transport_stream_op_batch* send_message_op = nullptr;
262 grpc_transport_stream_op_batch* send_trailing_md_op = nullptr;
263 grpc_transport_stream_op_batch* recv_initial_md_op = nullptr;
264 grpc_transport_stream_op_batch* recv_message_op = nullptr;
265 grpc_transport_stream_op_batch* recv_trailing_md_op = nullptr;
266
267 bool initial_md_sent = false;
268 bool trailing_md_sent = false;
269 bool initial_md_recvd = false;
270 bool trailing_md_recvd = false;
271 // The following tracks if the server-side only pretends to have received
272 // trailing metadata since it no longer cares about the RPC. If that is the
273 // case, it is still ok for the client to send trailing metadata (in which
274 // case it will be ignored).
275 bool trailing_md_recvd_implicit_only = false;
276
277 bool closed = false;
278
279 grpc_error_handle cancel_self_error;
280 grpc_error_handle cancel_other_error;
281
282 grpc_core::Timestamp deadline = grpc_core::Timestamp::InfFuture();
283
284 bool listed = true;
285 struct inproc_stream* stream_list_prev;
286 struct inproc_stream* stream_list_next;
287 };
288
log_metadata(const grpc_metadata_batch * md_batch,bool is_client,bool is_initial)289 void log_metadata(const grpc_metadata_batch* md_batch, bool is_client,
290 bool is_initial) {
291 std::string prefix = absl::StrCat(
292 "INPROC:", is_initial ? "HDR:" : "TRL:", is_client ? "CLI:" : "SVR:");
293 md_batch->Log([&prefix](absl::string_view key, absl::string_view value) {
294 gpr_log(GPR_INFO, "%s", absl::StrCat(prefix, key, ": ", value).c_str());
295 });
296 }
297
298 namespace {
299
300 class CopySink {
301 public:
CopySink(grpc_metadata_batch * dst)302 explicit CopySink(grpc_metadata_batch* dst) : dst_(dst) {}
303
Encode(const grpc_core::Slice & key,const grpc_core::Slice & value)304 void Encode(const grpc_core::Slice& key, const grpc_core::Slice& value) {
305 dst_->Append(key.as_string_view(), value.AsOwned(),
306 [](absl::string_view, const grpc_core::Slice&) {});
307 }
308
309 template <class T, class V>
Encode(T trait,V value)310 void Encode(T trait, V value) {
311 dst_->Set(trait, value);
312 }
313
314 template <class T>
Encode(T trait,const grpc_core::Slice & value)315 void Encode(T trait, const grpc_core::Slice& value) {
316 dst_->Set(trait, value.AsOwned());
317 }
318
319 private:
320 grpc_metadata_batch* dst_;
321 };
322
323 } // namespace
324
fill_in_metadata(inproc_stream * s,const grpc_metadata_batch * metadata,grpc_metadata_batch * out_md,bool * markfilled)325 void fill_in_metadata(inproc_stream* s, const grpc_metadata_batch* metadata,
326 grpc_metadata_batch* out_md, bool* markfilled) {
327 if (GRPC_TRACE_FLAG_ENABLED(grpc_inproc_trace)) {
328 log_metadata(metadata, s->t->is_client,
329 metadata->get_pointer(grpc_core::WaitForReady()) != nullptr);
330 }
331
332 if (markfilled != nullptr) {
333 *markfilled = true;
334 }
335
336 // TODO(ctiller): copy the metadata batch, don't rely on a bespoke copy
337 // function. Can only do this once mdelems are out of the way though, too
338 // many edge cases otherwise.
339 out_md->Clear();
340 CopySink sink(out_md);
341 metadata->Encode(&sink);
342 }
343
init_stream(grpc_transport * gt,grpc_stream * gs,grpc_stream_refcount * refcount,const void * server_data,grpc_core::Arena * arena)344 int init_stream(grpc_transport* gt, grpc_stream* gs,
345 grpc_stream_refcount* refcount, const void* server_data,
346 grpc_core::Arena* arena) {
347 INPROC_LOG(GPR_INFO, "init_stream %p %p %p", gt, gs, server_data);
348 inproc_transport* t = reinterpret_cast<inproc_transport*>(gt);
349 new (gs) inproc_stream(t, refcount, server_data, arena);
350 return 0; // return value is not important
351 }
352
close_stream_locked(inproc_stream * s)353 void close_stream_locked(inproc_stream* s) {
354 if (!s->closed) {
355 // Release the metadata that we would have written out
356 s->write_buffer_initial_md.Clear();
357 s->write_buffer_trailing_md.Clear();
358
359 if (s->listed) {
360 inproc_stream* p = s->stream_list_prev;
361 inproc_stream* n = s->stream_list_next;
362 if (p != nullptr) {
363 p->stream_list_next = n;
364 } else {
365 s->t->stream_list = n;
366 }
367 if (n != nullptr) {
368 n->stream_list_prev = p;
369 }
370 s->listed = false;
371 s->unref("close_stream:list");
372 }
373 s->closed = true;
374 s->unref("close_stream:closing");
375 }
376 }
377
378 // This function means that we are done talking/listening to the other side
close_other_side_locked(inproc_stream * s,const char * reason)379 void close_other_side_locked(inproc_stream* s, const char* reason) {
380 if (s->other_side != nullptr) {
381 // First release the metadata that came from the other side's arena
382 s->to_read_initial_md.Clear();
383 s->to_read_trailing_md.Clear();
384
385 s->other_side->unref(reason);
386 s->other_side_closed = true;
387 s->other_side = nullptr;
388 } else if (!s->other_side_closed) {
389 s->write_buffer_other_side_closed = true;
390 }
391 }
392
393 // Call the on_complete closure associated with this stream_op_batch if
394 // this stream_op_batch is only one of the pending operations for this
395 // stream. This is called when one of the pending operations for the stream
396 // is done and about to be NULLed out
complete_if_batch_end_locked(inproc_stream * s,grpc_error_handle error,grpc_transport_stream_op_batch * op,const char * msg)397 void complete_if_batch_end_locked(inproc_stream* s, grpc_error_handle error,
398 grpc_transport_stream_op_batch* op,
399 const char* msg) {
400 int is_sm = static_cast<int>(op == s->send_message_op);
401 int is_stm = static_cast<int>(op == s->send_trailing_md_op);
402 // TODO(vjpai): We should not consider the recv ops here, since they
403 // have their own callbacks. We should invoke a batch's on_complete
404 // as soon as all of the batch's send ops are complete, even if there
405 // are still recv ops pending.
406 int is_rim = static_cast<int>(op == s->recv_initial_md_op);
407 int is_rm = static_cast<int>(op == s->recv_message_op);
408 int is_rtm = static_cast<int>(op == s->recv_trailing_md_op);
409
410 if ((is_sm + is_stm + is_rim + is_rm + is_rtm) == 1) {
411 INPROC_LOG(GPR_INFO, "%s %p %p %p %s", msg, s, op, op->on_complete,
412 grpc_core::StatusToString(error).c_str());
413 grpc_core::ExecCtx::Run(DEBUG_LOCATION, op->on_complete, error);
414 }
415 }
416
maybe_process_ops_locked(inproc_stream * s,grpc_error_handle error)417 void maybe_process_ops_locked(inproc_stream* s, grpc_error_handle error) {
418 if (s && (!error.ok() || s->ops_needed)) {
419 s->ops_needed = false;
420 op_state_machine_locked(s, error);
421 }
422 }
423
fail_helper_locked(inproc_stream * s,grpc_error_handle error)424 void fail_helper_locked(inproc_stream* s, grpc_error_handle error) {
425 INPROC_LOG(GPR_INFO, "op_state_machine %p fail_helper", s);
426 // If we're failing this side, we need to make sure that
427 // we also send or have already sent trailing metadata
428 if (!s->trailing_md_sent) {
429 // Send trailing md to the other side indicating cancellation
430 s->trailing_md_sent = true;
431
432 grpc_metadata_batch fake_md(s->arena);
433 inproc_stream* other = s->other_side;
434 grpc_metadata_batch* dest = (other == nullptr)
435 ? &s->write_buffer_trailing_md
436 : &other->to_read_trailing_md;
437 bool* destfilled = (other == nullptr) ? &s->write_buffer_trailing_md_filled
438 : &other->to_read_trailing_md_filled;
439 fill_in_metadata(s, &fake_md, dest, destfilled);
440
441 if (other != nullptr) {
442 if (other->cancel_other_error.ok()) {
443 other->cancel_other_error = error;
444 }
445 maybe_process_ops_locked(other, error);
446 } else if (s->write_buffer_cancel_error.ok()) {
447 s->write_buffer_cancel_error = error;
448 }
449 }
450 if (s->recv_initial_md_op) {
451 grpc_error_handle err;
452 if (!s->t->is_client) {
453 // If this is a server, provide initial metadata with a path and
454 // authority since it expects that as well as no error yet
455 grpc_metadata_batch fake_md(s->arena);
456 fake_md.Set(grpc_core::HttpPathMetadata(),
457 grpc_core::Slice::FromStaticString("/"));
458 fake_md.Set(grpc_core::HttpAuthorityMetadata(),
459 grpc_core::Slice::FromStaticString("inproc-fail"));
460
461 fill_in_metadata(s, &fake_md,
462 s->recv_initial_md_op->payload->recv_initial_metadata
463 .recv_initial_metadata,
464 nullptr);
465 err = absl::OkStatus();
466 } else {
467 err = error;
468 }
469 if (s->recv_initial_md_op->payload->recv_initial_metadata
470 .trailing_metadata_available != nullptr) {
471 // Set to true unconditionally, because we're failing the call, so even
472 // if we haven't actually seen the send_trailing_metadata op from the
473 // other side, we're going to return trailing metadata anyway.
474 *s->recv_initial_md_op->payload->recv_initial_metadata
475 .trailing_metadata_available = true;
476 }
477 INPROC_LOG(GPR_INFO,
478 "fail_helper %p scheduling initial-metadata-ready %s %s", s,
479 grpc_core::StatusToString(error).c_str(),
480 grpc_core::StatusToString(err).c_str());
481 grpc_core::ExecCtx::Run(
482 DEBUG_LOCATION,
483 s->recv_initial_md_op->payload->recv_initial_metadata
484 .recv_initial_metadata_ready,
485 err);
486 // Last use of err so no need to REF and then UNREF it
487
488 complete_if_batch_end_locked(
489 s, error, s->recv_initial_md_op,
490 "fail_helper scheduling recv-initial-metadata-on-complete");
491 s->recv_initial_md_op = nullptr;
492 }
493 if (s->recv_message_op) {
494 INPROC_LOG(GPR_INFO, "fail_helper %p scheduling message-ready %s", s,
495 grpc_core::StatusToString(error).c_str());
496 if (s->recv_message_op->payload->recv_message
497 .call_failed_before_recv_message != nullptr) {
498 *s->recv_message_op->payload->recv_message
499 .call_failed_before_recv_message = true;
500 }
501 grpc_core::ExecCtx::Run(
502 DEBUG_LOCATION,
503 s->recv_message_op->payload->recv_message.recv_message_ready, error);
504 complete_if_batch_end_locked(
505 s, error, s->recv_message_op,
506 "fail_helper scheduling recv-message-on-complete");
507 s->recv_message_op = nullptr;
508 }
509 if (s->send_message_op) {
510 ResetSendMessage(s->send_message_op);
511 complete_if_batch_end_locked(
512 s, error, s->send_message_op,
513 "fail_helper scheduling send-message-on-complete");
514 s->send_message_op = nullptr;
515 }
516 if (s->send_trailing_md_op) {
517 complete_if_batch_end_locked(
518 s, error, s->send_trailing_md_op,
519 "fail_helper scheduling send-trailng-md-on-complete");
520 s->send_trailing_md_op = nullptr;
521 }
522 if (s->recv_trailing_md_op) {
523 INPROC_LOG(GPR_INFO, "fail_helper %p scheduling trailing-metadata-ready %s",
524 s, grpc_core::StatusToString(error).c_str());
525 grpc_core::ExecCtx::Run(
526 DEBUG_LOCATION,
527 s->recv_trailing_md_op->payload->recv_trailing_metadata
528 .recv_trailing_metadata_ready,
529 error);
530 INPROC_LOG(GPR_INFO, "fail_helper %p scheduling trailing-md-on-complete %s",
531 s, grpc_core::StatusToString(error).c_str());
532 complete_if_batch_end_locked(
533 s, error, s->recv_trailing_md_op,
534 "fail_helper scheduling recv-trailing-metadata-on-complete");
535 s->recv_trailing_md_op = nullptr;
536 }
537 close_other_side_locked(s, "fail_helper:other_side");
538 close_stream_locked(s);
539 }
540
541 // TODO(vjpai): It should not be necessary to drain the incoming byte
542 // stream and create a new one; instead, we should simply pass the byte
543 // stream from the sender directly to the receiver as-is.
544 //
545 // Note that fixing this will also avoid the assumption in this code
546 // that the incoming byte stream's next() call will always return
547 // synchronously. That assumption is true today but may not always be
548 // true in the future.
message_transfer_locked(inproc_stream * sender,inproc_stream * receiver)549 void message_transfer_locked(inproc_stream* sender, inproc_stream* receiver) {
550 *receiver->recv_message_op->payload->recv_message.recv_message =
551 std::move(*sender->send_message_op->payload->send_message.send_message);
552 *receiver->recv_message_op->payload->recv_message.flags =
553 sender->send_message_op->payload->send_message.flags;
554
555 INPROC_LOG(GPR_INFO, "message_transfer_locked %p scheduling message-ready",
556 receiver);
557 grpc_core::ExecCtx::Run(
558 DEBUG_LOCATION,
559 receiver->recv_message_op->payload->recv_message.recv_message_ready,
560 absl::OkStatus());
561 complete_if_batch_end_locked(
562 sender, absl::OkStatus(), sender->send_message_op,
563 "message_transfer scheduling sender on_complete");
564 complete_if_batch_end_locked(
565 receiver, absl::OkStatus(), receiver->recv_message_op,
566 "message_transfer scheduling receiver on_complete");
567
568 receiver->recv_message_op = nullptr;
569 sender->send_message_op = nullptr;
570 }
571
op_state_machine_locked(inproc_stream * s,grpc_error_handle error)572 void op_state_machine_locked(inproc_stream* s, grpc_error_handle error) {
573 // This function gets called when we have contents in the unprocessed reads
574 // Get what we want based on our ops wanted
575 // Schedule our appropriate closures
576 // and then return to ops_needed state if still needed
577
578 grpc_error_handle new_err;
579
580 bool needs_close = false;
581
582 INPROC_LOG(GPR_INFO, "op_state_machine %p", s);
583 // cancellation takes precedence
584 inproc_stream* other = s->other_side;
585
586 if (!s->cancel_self_error.ok()) {
587 fail_helper_locked(s, s->cancel_self_error);
588 goto done;
589 } else if (!s->cancel_other_error.ok()) {
590 fail_helper_locked(s, s->cancel_other_error);
591 goto done;
592 } else if (!error.ok()) {
593 fail_helper_locked(s, error);
594 goto done;
595 }
596
597 if (s->send_message_op && other) {
598 if (other->recv_message_op) {
599 message_transfer_locked(s, other);
600 maybe_process_ops_locked(other, absl::OkStatus());
601 } else if (!s->t->is_client && s->trailing_md_sent) {
602 // A server send will never be matched if the server already sent status
603 ResetSendMessage(s->send_message_op);
604 complete_if_batch_end_locked(
605 s, absl::OkStatus(), s->send_message_op,
606 "op_state_machine scheduling send-message-on-complete case 1");
607 s->send_message_op = nullptr;
608 }
609 }
610 // Pause a send trailing metadata if there is still an outstanding
611 // send message unless we know that the send message will never get
612 // matched to a receive. This happens on the client if the server has
613 // already sent status or on the server if the client has requested
614 // status
615 if (s->send_trailing_md_op &&
616 (!s->send_message_op ||
617 (s->t->is_client &&
618 (s->trailing_md_recvd || s->to_read_trailing_md_filled)) ||
619 (!s->t->is_client && other &&
620 (other->trailing_md_recvd || other->to_read_trailing_md_filled ||
621 other->recv_trailing_md_op)))) {
622 grpc_metadata_batch* dest = (other == nullptr)
623 ? &s->write_buffer_trailing_md
624 : &other->to_read_trailing_md;
625 bool* destfilled = (other == nullptr) ? &s->write_buffer_trailing_md_filled
626 : &other->to_read_trailing_md_filled;
627 if (*destfilled || s->trailing_md_sent) {
628 // The buffer is already in use; that's an error!
629 INPROC_LOG(GPR_INFO, "Extra trailing metadata %p", s);
630 new_err = GRPC_ERROR_CREATE("Extra trailing metadata");
631 fail_helper_locked(s, new_err);
632 goto done;
633 } else {
634 if (!other || !other->closed) {
635 fill_in_metadata(s,
636 s->send_trailing_md_op->payload->send_trailing_metadata
637 .send_trailing_metadata,
638 dest, destfilled);
639 }
640 s->trailing_md_sent = true;
641 if (s->send_trailing_md_op->payload->send_trailing_metadata.sent) {
642 *s->send_trailing_md_op->payload->send_trailing_metadata.sent = true;
643 }
644 if (!s->t->is_client && s->trailing_md_recvd && s->recv_trailing_md_op) {
645 INPROC_LOG(GPR_INFO,
646 "op_state_machine %p scheduling trailing-metadata-ready", s);
647 grpc_core::ExecCtx::Run(
648 DEBUG_LOCATION,
649 s->recv_trailing_md_op->payload->recv_trailing_metadata
650 .recv_trailing_metadata_ready,
651 absl::OkStatus());
652 INPROC_LOG(GPR_INFO,
653 "op_state_machine %p scheduling trailing-md-on-complete", s);
654 grpc_core::ExecCtx::Run(DEBUG_LOCATION,
655 s->recv_trailing_md_op->on_complete,
656 absl::OkStatus());
657 s->recv_trailing_md_op = nullptr;
658 needs_close = true;
659 }
660 }
661 maybe_process_ops_locked(other, absl::OkStatus());
662 complete_if_batch_end_locked(
663 s, absl::OkStatus(), s->send_trailing_md_op,
664 "op_state_machine scheduling send-trailing-metadata-on-complete");
665 s->send_trailing_md_op = nullptr;
666 }
667 if (s->recv_initial_md_op) {
668 if (s->initial_md_recvd) {
669 new_err = GRPC_ERROR_CREATE("Already recvd initial md");
670 INPROC_LOG(
671 GPR_INFO,
672 "op_state_machine %p scheduling on_complete errors for already "
673 "recvd initial md %s",
674 s, grpc_core::StatusToString(new_err).c_str());
675 fail_helper_locked(s, new_err);
676 goto done;
677 }
678
679 if (s->to_read_initial_md_filled) {
680 s->initial_md_recvd = true;
681 fill_in_metadata(s, &s->to_read_initial_md,
682 s->recv_initial_md_op->payload->recv_initial_metadata
683 .recv_initial_metadata,
684 nullptr);
685 if (s->deadline != grpc_core::Timestamp::InfFuture()) {
686 s->recv_initial_md_op->payload->recv_initial_metadata
687 .recv_initial_metadata->Set(grpc_core::GrpcTimeoutMetadata(),
688 s->deadline);
689 }
690 if (s->recv_initial_md_op->payload->recv_initial_metadata
691 .trailing_metadata_available != nullptr) {
692 *s->recv_initial_md_op->payload->recv_initial_metadata
693 .trailing_metadata_available =
694 (other != nullptr && other->send_trailing_md_op != nullptr);
695 }
696 s->to_read_initial_md.Clear();
697 s->to_read_initial_md_filled = false;
698 grpc_core::ExecCtx::Run(
699 DEBUG_LOCATION,
700 std::exchange(s->recv_initial_md_op->payload->recv_initial_metadata
701 .recv_initial_metadata_ready,
702 nullptr),
703 absl::OkStatus());
704 complete_if_batch_end_locked(
705 s, absl::OkStatus(), s->recv_initial_md_op,
706 "op_state_machine scheduling recv-initial-metadata-on-complete");
707 s->recv_initial_md_op = nullptr;
708 }
709 }
710 if (s->recv_message_op) {
711 if (other && other->send_message_op) {
712 message_transfer_locked(other, s);
713 maybe_process_ops_locked(other, absl::OkStatus());
714 }
715 }
716 if (s->to_read_trailing_md_filled) {
717 if (s->trailing_md_recvd) {
718 if (s->trailing_md_recvd_implicit_only) {
719 INPROC_LOG(GPR_INFO,
720 "op_state_machine %p already implicitly received trailing "
721 "metadata, so ignoring new trailing metadata from client",
722 s);
723 s->to_read_trailing_md.Clear();
724 s->to_read_trailing_md_filled = false;
725 s->trailing_md_recvd_implicit_only = false;
726 } else {
727 new_err = GRPC_ERROR_CREATE("Already recvd trailing md");
728 INPROC_LOG(
729 GPR_INFO,
730 "op_state_machine %p scheduling on_complete errors for already "
731 "recvd trailing md %s",
732 s, grpc_core::StatusToString(new_err).c_str());
733 fail_helper_locked(s, new_err);
734 goto done;
735 }
736 }
737 if (s->recv_message_op != nullptr) {
738 // This message needs to be wrapped up because it will never be
739 // satisfied
740 s->recv_message_op->payload->recv_message.recv_message->reset();
741 INPROC_LOG(GPR_INFO, "op_state_machine %p scheduling message-ready", s);
742 grpc_core::ExecCtx::Run(
743 DEBUG_LOCATION,
744 s->recv_message_op->payload->recv_message.recv_message_ready,
745 absl::OkStatus());
746 complete_if_batch_end_locked(
747 s, new_err, s->recv_message_op,
748 "op_state_machine scheduling recv-message-on-complete");
749 s->recv_message_op = nullptr;
750 }
751 if ((s->trailing_md_sent || s->t->is_client) && s->send_message_op) {
752 // Nothing further will try to receive from this stream, so finish off
753 // any outstanding send_message op
754 ResetSendMessage(s->send_message_op);
755 s->send_message_op->payload->send_message.stream_write_closed = true;
756 complete_if_batch_end_locked(
757 s, new_err, s->send_message_op,
758 "op_state_machine scheduling send-message-on-complete case 2");
759 s->send_message_op = nullptr;
760 }
761 if (s->recv_trailing_md_op != nullptr) {
762 // We wanted trailing metadata and we got it
763 s->trailing_md_recvd = true;
764 fill_in_metadata(s, &s->to_read_trailing_md,
765 s->recv_trailing_md_op->payload->recv_trailing_metadata
766 .recv_trailing_metadata,
767 nullptr);
768 s->to_read_trailing_md.Clear();
769 s->to_read_trailing_md_filled = false;
770 s->recv_trailing_md_op->payload->recv_trailing_metadata
771 .recv_trailing_metadata->Set(grpc_core::GrpcStatusFromWire(), true);
772
773 // We should schedule the recv_trailing_md_op completion if
774 // 1. this stream is the client-side
775 // 2. this stream is the server-side AND has already sent its trailing md
776 // (If the server hasn't already sent its trailing md, it doesn't
777 // have
778 // a final status, so don't mark this op complete)
779 if (s->t->is_client || s->trailing_md_sent) {
780 grpc_core::ExecCtx::Run(
781 DEBUG_LOCATION,
782 s->recv_trailing_md_op->payload->recv_trailing_metadata
783 .recv_trailing_metadata_ready,
784 absl::OkStatus());
785 grpc_core::ExecCtx::Run(DEBUG_LOCATION,
786 s->recv_trailing_md_op->on_complete,
787 absl::OkStatus());
788 s->recv_trailing_md_op = nullptr;
789 needs_close = s->trailing_md_sent;
790 }
791 } else if (!s->trailing_md_recvd) {
792 INPROC_LOG(
793 GPR_INFO,
794 "op_state_machine %p has trailing md but not yet waiting for it", s);
795 }
796 }
797 if (!s->t->is_client && s->trailing_md_sent &&
798 (s->recv_trailing_md_op != nullptr)) {
799 // In this case, we don't care to receive the write-close from the client
800 // because we have already sent status and the RPC is over as far as we
801 // are concerned.
802 INPROC_LOG(GPR_INFO, "op_state_machine %p scheduling trailing-md-ready %s",
803 s, grpc_core::StatusToString(new_err).c_str());
804 grpc_core::ExecCtx::Run(
805 DEBUG_LOCATION,
806 s->recv_trailing_md_op->payload->recv_trailing_metadata
807 .recv_trailing_metadata_ready,
808 new_err);
809 complete_if_batch_end_locked(
810 s, new_err, s->recv_trailing_md_op,
811 "op_state_machine scheduling recv-trailing-md-on-complete");
812 s->trailing_md_recvd = true;
813 s->recv_trailing_md_op = nullptr;
814 // Since we are only pretending to have received the trailing MD, it would
815 // be ok (not an error) if the client actually sends it later.
816 s->trailing_md_recvd_implicit_only = true;
817 }
818 if (s->trailing_md_recvd && s->recv_message_op) {
819 // No further message will come on this stream, so finish off the
820 // recv_message_op
821 INPROC_LOG(GPR_INFO, "op_state_machine %p scheduling message-ready", s);
822 s->recv_message_op->payload->recv_message.recv_message->reset();
823 grpc_core::ExecCtx::Run(
824 DEBUG_LOCATION,
825 s->recv_message_op->payload->recv_message.recv_message_ready,
826 absl::OkStatus());
827 complete_if_batch_end_locked(
828 s, new_err, s->recv_message_op,
829 "op_state_machine scheduling recv-message-on-complete");
830 s->recv_message_op = nullptr;
831 }
832 if (s->trailing_md_recvd && s->send_message_op && s->t->is_client) {
833 // Nothing further will try to receive from this stream, so finish off
834 // any outstanding send_message op
835 ResetSendMessage(s->send_message_op);
836 complete_if_batch_end_locked(
837 s, new_err, s->send_message_op,
838 "op_state_machine scheduling send-message-on-complete case 3");
839 s->send_message_op = nullptr;
840 }
841 if (s->send_message_op || s->send_trailing_md_op || s->recv_initial_md_op ||
842 s->recv_message_op || s->recv_trailing_md_op) {
843 // Didn't get the item we wanted so we still need to get
844 // rescheduled
845 INPROC_LOG(
846 GPR_INFO, "op_state_machine %p still needs closure %p %p %p %p %p", s,
847 s->send_message_op, s->send_trailing_md_op, s->recv_initial_md_op,
848 s->recv_message_op, s->recv_trailing_md_op);
849 s->ops_needed = true;
850 }
851 done:
852 if (needs_close) {
853 close_other_side_locked(s, "op_state_machine");
854 close_stream_locked(s);
855 }
856 }
857
// Cancels stream |s| with |error| and closes both sides of the stream.
//
// The cancellation is only recorded the first time it is requested, i.e.
// while s->cancel_self_error is still OK. In that case the error is stored on
// this stream, propagated to the peer stream (or parked in this stream's
// write buffer if the peer does not exist yet), and empty trailing metadata is
// pushed towards the peer to signal the end of the RPC.
//
// Regardless of whether the cancellation was newly recorded,
// close_other_side_locked() and close_stream_locked() are always invoked.
//
// Callers visible in this file invoke this with the transport mutex held.
//
// Returns true if this call recorded the cancellation, false if the stream
// had already been self-cancelled.
bool cancel_stream_locked(inproc_stream* s, grpc_error_handle error) {
  bool ret = false;  // was the cancel accepted
  INPROC_LOG(GPR_INFO, "cancel_stream %p with %s", s,
             grpc_core::StatusToString(error).c_str());
  if (s->cancel_self_error.ok()) {
    ret = true;
    s->cancel_self_error = error;
    // Catch current value of other before it gets closed off
    inproc_stream* other = s->other_side;
    // Give this stream's pending ops a chance to observe the cancellation.
    maybe_process_ops_locked(s, s->cancel_self_error);
    // Send trailing md to the other side indicating cancellation, even if we
    // already have
    s->trailing_md_sent = true;

    // Empty metadata batch used as the "cancelled" trailing metadata.
    grpc_metadata_batch cancel_md(s->arena);

    // With no peer stream yet, stage the metadata in our own write buffer;
    // otherwise deliver it straight into the peer's read-side buffer.
    grpc_metadata_batch* dest = (other == nullptr)
                                    ? &s->write_buffer_trailing_md
                                    : &other->to_read_trailing_md;
    bool* destfilled = (other == nullptr) ? &s->write_buffer_trailing_md_filled
                                          : &other->to_read_trailing_md_filled;
    fill_in_metadata(s, &cancel_md, dest, destfilled);

    if (other != nullptr) {
      // Only the first cancellation seen by the peer is kept.
      if (other->cancel_other_error.ok()) {
        other->cancel_other_error = s->cancel_self_error;
      }
      maybe_process_ops_locked(other, other->cancel_other_error);
    } else if (s->write_buffer_cancel_error.ok()) {
      // No peer yet: remember the error alongside the buffered writes.
      s->write_buffer_cancel_error = s->cancel_self_error;
    }

    // if we are a server and already received trailing md but
    // couldn't complete that because we hadn't yet sent out trailing
    // md, now's the chance
    if (!s->t->is_client && s->trailing_md_recvd && s->recv_trailing_md_op) {
      grpc_core::ExecCtx::Run(
          DEBUG_LOCATION,
          s->recv_trailing_md_op->payload->recv_trailing_metadata
              .recv_trailing_metadata_ready,
          s->cancel_self_error);
      complete_if_batch_end_locked(
          s, s->cancel_self_error, s->recv_trailing_md_op,
          "cancel_stream scheduling trailing-md-on-complete");
      s->recv_trailing_md_op = nullptr;
    }
  }

  // Performed on every call, including repeated cancellations.
  close_other_side_locked(s, "cancel_stream:other_side");
  close_stream_locked(s);

  return ret;
}
911
// Stream-op entry point installed in inproc_vtable.
//
// Handles one batch |op| for stream |gs| while holding the transport mutex:
//   * a cancel_stream op is forwarded to cancel_stream_locked();
//   * if the stream was already self-cancelled, the batch fails with that
//     stored error;
//   * send_initial_metadata is written to the peer's read buffer, or to this
//     stream's write buffer when there is no peer stream yet;
//   * the remaining ops are recorded on the stream and either processed
//     immediately by op_state_machine_locked() or left pending with
//     ops_needed set;
//   * on error (or when the batch has no ops for the state machine), the
//     batch's ready callbacks and on_complete are scheduled directly here.
void perform_stream_op(grpc_transport* gt, grpc_stream* gs,
                       grpc_transport_stream_op_batch* op) {
  INPROC_LOG(GPR_INFO, "perform_stream_op %p %p %p", gt, gs, op);
  inproc_stream* s = reinterpret_cast<inproc_stream*>(gs);
  gpr_mu* mu = &s->t->mu->mu;  // save aside in case s gets closed
  gpr_mu_lock(mu);

  // When tracing is enabled, dump the outgoing metadata in this batch.
  if (GRPC_TRACE_FLAG_ENABLED(grpc_inproc_trace)) {
    if (op->send_initial_metadata) {
      log_metadata(op->payload->send_initial_metadata.send_initial_metadata,
                   s->t->is_client, true);
    }
    if (op->send_trailing_metadata) {
      log_metadata(op->payload->send_trailing_metadata.send_trailing_metadata,
                   s->t->is_client, false);
    }
  }
  // Default-constructed, i.e. OK, until something below fails the batch.
  grpc_error_handle error;
  grpc_closure* on_complete = op->on_complete;
  // TODO(roth): This is a hack needed because we use data inside of the
  // closure itself to do the barrier calculation (i.e., to ensure that
  // we don't schedule the closure until all ops in the batch have been
  // completed).  This can go away once we move to a new C++ closure API
  // that provides the ability to create a barrier closure.
  if (on_complete == nullptr) {
    // Substitute a no-op closure so later code always has one to schedule.
    on_complete = op->on_complete =
        grpc_core::NewClosure([](grpc_error_handle) {});
  }

  if (op->cancel_stream) {
    // Call cancel_stream_locked without ref'ing the cancel_error because
    // this function is responsible to make sure that field gets unref'ed
    cancel_stream_locked(s, op->payload->cancel_stream.cancel_error);
    // this op can complete without an error
  } else if (!s->cancel_self_error.ok()) {
    // already self-canceled so still give it an error
    error = s->cancel_self_error;
  } else {
    INPROC_LOG(GPR_INFO, "perform_stream_op %p %s%s%s%s%s%s%s", s,
               s->t->is_client ? "client" : "server",
               op->send_initial_metadata ? " send_initial_metadata" : "",
               op->send_message ? " send_message" : "",
               op->send_trailing_metadata ? " send_trailing_metadata" : "",
               op->recv_initial_metadata ? " recv_initial_metadata" : "",
               op->recv_message ? " recv_message" : "",
               op->recv_trailing_metadata ? " recv_trailing_metadata" : "");
  }

  // Read the peer after the cancel handling above, which may have closed it.
  inproc_stream* other = s->other_side;
  if (error.ok() && (op->send_initial_metadata || op->send_trailing_metadata)) {
    // Metadata cannot be sent on a transport that has been closed.
    if (s->t->is_closed) {
      error = GRPC_ERROR_CREATE("Endpoint already shutdown");
    }
    if (error.ok() && op->send_initial_metadata) {
      // Target the peer's read buffer, or our own write buffer if the peer
      // stream does not exist yet.
      grpc_metadata_batch* dest = (other == nullptr)
                                      ? &s->write_buffer_initial_md
                                      : &other->to_read_initial_md;
      bool* destfilled = (other == nullptr) ? &s->write_buffer_initial_md_filled
                                            : &other->to_read_initial_md_filled;
      if (*destfilled || s->initial_md_sent) {
        // The buffer is already in use; that's an error!
        INPROC_LOG(GPR_INFO, "Extra initial metadata %p", s);
        error = GRPC_ERROR_CREATE("Extra initial metadata");
      } else {
        // Skip the copy if the other side has already been closed.
        if (!s->other_side_closed) {
          fill_in_metadata(
              s, op->payload->send_initial_metadata.send_initial_metadata, dest,
              destfilled);
        }
        if (s->t->is_client) {
          // Client side only: tighten the deadline seen by the peer (or the
          // buffered deadline) using the timeout metadata, if present.
          grpc_core::Timestamp* dl =
              (other == nullptr) ? &s->write_buffer_deadline : &other->deadline;
          *dl = std::min(
              *dl, op->payload->send_initial_metadata.send_initial_metadata
                       ->get(grpc_core::GrpcTimeoutMetadata())
                       .value_or(grpc_core::Timestamp::InfFuture()));
          s->initial_md_sent = true;
        }
      }
      // NOTE(review): |other| may be nullptr here; presumably
      // maybe_process_ops_locked tolerates a null stream - confirm.
      maybe_process_ops_locked(other, error);
    }
  }

  if (error.ok() && (op->send_message || op->send_trailing_metadata ||
                     op->recv_initial_metadata || op->recv_message ||
                     op->recv_trailing_metadata)) {
    // Mark ops that need to be processed by the state machine
    if (op->send_message) {
      s->send_message_op = op;
    }
    if (op->send_trailing_metadata) {
      s->send_trailing_md_op = op;
    }
    if (op->recv_initial_metadata) {
      s->recv_initial_md_op = op;
    }
    if (op->recv_message) {
      s->recv_message_op = op;
    }
    if (op->recv_trailing_metadata) {
      s->recv_trailing_md_op = op;
    }

    // We want to initiate the state machine if:
    // 1. We want to send a message and the other side wants to receive
    // 2. We want to send trailing metadata and there isn't an unmatched send
    //    or the other side wants trailing metadata
    // 3. We want initial metadata and the other side has sent it
    // 4. We want to receive a message and there is a message ready
    // 5. There is trailing metadata, even if nothing specifically wants
    //    that because that can shut down the receive message as well
    if ((op->send_message && other && other->recv_message_op != nullptr) ||
        (op->send_trailing_metadata &&
         (!s->send_message_op || (other && other->recv_trailing_md_op))) ||
        (op->recv_initial_metadata && s->to_read_initial_md_filled) ||
        (op->recv_message && other && other->send_message_op != nullptr) ||
        (s->to_read_trailing_md_filled || s->trailing_md_recvd)) {
      op_state_machine_locked(s, error);
    } else {
      // Nothing can make progress yet; leave the ops pending.
      s->ops_needed = true;
    }
  } else {
    // Either the batch failed, or it contained nothing for the state machine
    // (e.g. only cancel_stream and/or send_initial_metadata).
    if (!error.ok()) {
      // Consume any send message that was sent here but that we are not
      // pushing to the other side
      if (op->send_message) {
        ResetSendMessage(op);
      }
      // Schedule op's closures that we didn't push to op state machine
      if (op->recv_initial_metadata) {
        if (op->payload->recv_initial_metadata.trailing_metadata_available !=
            nullptr) {
          // Set to true unconditionally, because we're failing the call, so
          // even if we haven't actually seen the send_trailing_metadata op
          // from the other side, we're going to return trailing metadata
          // anyway.
          *op->payload->recv_initial_metadata.trailing_metadata_available =
              true;
        }
        INPROC_LOG(
            GPR_INFO,
            "perform_stream_op error %p scheduling initial-metadata-ready %s",
            s, grpc_core::StatusToString(error).c_str());
        grpc_core::ExecCtx::Run(
            DEBUG_LOCATION,
            op->payload->recv_initial_metadata.recv_initial_metadata_ready,
            error);
      }
      if (op->recv_message) {
        INPROC_LOG(
            GPR_INFO,
            "perform_stream_op error %p scheduling recv message-ready %s", s,
            grpc_core::StatusToString(error).c_str());
        // Tell the receiver (if it asked) that the call failed before any
        // message was delivered.
        if (op->payload->recv_message.call_failed_before_recv_message !=
            nullptr) {
          *op->payload->recv_message.call_failed_before_recv_message = true;
        }
        grpc_core::ExecCtx::Run(DEBUG_LOCATION,
                                op->payload->recv_message.recv_message_ready,
                                error);
      }
      if (op->recv_trailing_metadata) {
        INPROC_LOG(GPR_INFO,
                   "perform_stream_op error %p scheduling "
                   "trailing-metadata-ready %s",
                   s, grpc_core::StatusToString(error).c_str());
        grpc_core::ExecCtx::Run(
            DEBUG_LOCATION,
            op->payload->recv_trailing_metadata.recv_trailing_metadata_ready,
            error);
      }
    }
    // The batch is finished from this transport's point of view; |error| is
    // OK when nothing above failed.
    INPROC_LOG(GPR_INFO, "perform_stream_op %p scheduling on_complete %s", s,
               grpc_core::StatusToString(error).c_str());
    grpc_core::ExecCtx::Run(DEBUG_LOCATION, on_complete, error);
  }
  gpr_mu_unlock(mu);
}
1090
close_transport_locked(inproc_transport * t)1091 void close_transport_locked(inproc_transport* t) {
1092 INPROC_LOG(GPR_INFO, "close_transport %p %d", t, t->is_closed);
1093 t->state_tracker.SetState(GRPC_CHANNEL_SHUTDOWN, absl::Status(),
1094 "close transport");
1095 if (!t->is_closed) {
1096 t->is_closed = true;
1097 // Also end all streams on this transport
1098 while (t->stream_list != nullptr) {
1099 // cancel_stream_locked also adjusts stream list
1100 cancel_stream_locked(
1101 t->stream_list,
1102 grpc_error_set_int(GRPC_ERROR_CREATE("Transport closed"),
1103 grpc_core::StatusIntProperty::kRpcStatus,
1104 GRPC_STATUS_UNAVAILABLE));
1105 }
1106 }
1107 }
1108
perform_transport_op(grpc_transport * gt,grpc_transport_op * op)1109 void perform_transport_op(grpc_transport* gt, grpc_transport_op* op) {
1110 inproc_transport* t = reinterpret_cast<inproc_transport*>(gt);
1111 INPROC_LOG(GPR_INFO, "perform_transport_op %p %p", t, op);
1112 gpr_mu_lock(&t->mu->mu);
1113 if (op->start_connectivity_watch != nullptr) {
1114 t->state_tracker.AddWatcher(op->start_connectivity_watch_state,
1115 std::move(op->start_connectivity_watch));
1116 }
1117 if (op->stop_connectivity_watch != nullptr) {
1118 t->state_tracker.RemoveWatcher(op->stop_connectivity_watch);
1119 }
1120 if (op->set_accept_stream) {
1121 t->accept_stream_cb = op->set_accept_stream_fn;
1122 t->accept_stream_data = op->set_accept_stream_user_data;
1123 }
1124 if (op->on_consumed) {
1125 grpc_core::ExecCtx::Run(DEBUG_LOCATION, op->on_consumed, absl::OkStatus());
1126 }
1127
1128 bool do_close = false;
1129 if (!op->goaway_error.ok()) {
1130 do_close = true;
1131 }
1132 if (!op->disconnect_with_error.ok()) {
1133 do_close = true;
1134 }
1135
1136 if (do_close) {
1137 close_transport_locked(t);
1138 }
1139 gpr_mu_unlock(&t->mu->mu);
1140 }
1141
destroy_stream(grpc_transport * gt,grpc_stream * gs,grpc_closure * then_schedule_closure)1142 void destroy_stream(grpc_transport* gt, grpc_stream* gs,
1143 grpc_closure* then_schedule_closure) {
1144 INPROC_LOG(GPR_INFO, "destroy_stream %p %p", gs, then_schedule_closure);
1145 inproc_transport* t = reinterpret_cast<inproc_transport*>(gt);
1146 inproc_stream* s = reinterpret_cast<inproc_stream*>(gs);
1147 gpr_mu_lock(&t->mu->mu);
1148 close_stream_locked(s);
1149 gpr_mu_unlock(&t->mu->mu);
1150 s->~inproc_stream();
1151 grpc_core::ExecCtx::Run(DEBUG_LOCATION, then_schedule_closure,
1152 absl::OkStatus());
1153 }
1154
destroy_transport(grpc_transport * gt)1155 void destroy_transport(grpc_transport* gt) {
1156 inproc_transport* t = reinterpret_cast<inproc_transport*>(gt);
1157 INPROC_LOG(GPR_INFO, "destroy_transport %p", t);
1158 gpr_mu_lock(&t->mu->mu);
1159 close_transport_locked(t);
1160 gpr_mu_unlock(&t->mu->mu);
1161 t->other_side->unref();
1162 t->unref();
1163 }
1164
1165 //******************************************************************************
1166 // INTEGRATION GLUE
1167 //
1168
set_pollset(grpc_transport *,grpc_stream *,grpc_pollset *)1169 void set_pollset(grpc_transport* /*gt*/, grpc_stream* /*gs*/,
1170 grpc_pollset* /*pollset*/) {
1171 // Nothing to do here
1172 }
1173
set_pollset_set(grpc_transport *,grpc_stream *,grpc_pollset_set *)1174 void set_pollset_set(grpc_transport* /*gt*/, grpc_stream* /*gs*/,
1175 grpc_pollset_set* /*pollset_set*/) {
1176 // Nothing to do here
1177 }
1178
get_endpoint(grpc_transport *)1179 grpc_endpoint* get_endpoint(grpc_transport* /*t*/) { return nullptr; }
1180
// Function table that plugs the in-process transport into the generic
// transport layer. Both transports built by inproc_transports_create() are
// constructed with a pointer to this table.
//
// NOTE(review): the initializer is positional, so the meaning of each slot is
// defined by grpc_transport_vtable, which is declared outside this file. The
// per-entry notes below for the non-function slots are assumptions to
// confirm against that declaration.
const grpc_transport_vtable inproc_vtable = {sizeof(inproc_stream),  // per-stream storage size
                                             true,      // presumably a transport-specific flag; verify
                                             "inproc",  // presumably the transport name; verify
                                             init_stream,
                                             nullptr,  // slot deliberately left unset
                                             set_pollset,
                                             set_pollset_set,
                                             perform_stream_op,
                                             perform_transport_op,
                                             destroy_stream,
                                             destroy_transport,
                                             get_endpoint};
1193
1194 //******************************************************************************
1195 // Main inproc transport functions
1196 //
inproc_transports_create(grpc_transport ** server_transport,grpc_transport ** client_transport)1197 void inproc_transports_create(grpc_transport** server_transport,
1198 grpc_transport** client_transport) {
1199 INPROC_LOG(GPR_INFO, "inproc_transports_create");
1200 shared_mu* mu = new (gpr_malloc(sizeof(*mu))) shared_mu();
1201 inproc_transport* st = new (gpr_malloc(sizeof(*st)))
1202 inproc_transport(&inproc_vtable, mu, /*is_client=*/false);
1203 inproc_transport* ct = new (gpr_malloc(sizeof(*ct)))
1204 inproc_transport(&inproc_vtable, mu, /*is_client=*/true);
1205 st->other_side = ct;
1206 ct->other_side = st;
1207 *server_transport = reinterpret_cast<grpc_transport*>(st);
1208 *client_transport = reinterpret_cast<grpc_transport*>(ct);
1209 }
1210 } // namespace
1211
grpc_inproc_channel_create(grpc_server * server,const grpc_channel_args * args,void *)1212 grpc_channel* grpc_inproc_channel_create(grpc_server* server,
1213 const grpc_channel_args* args,
1214 void* /*reserved*/) {
1215 GRPC_API_TRACE("grpc_inproc_channel_create(server=%p, args=%p)", 2,
1216 (server, args));
1217
1218 grpc_core::ExecCtx exec_ctx;
1219
1220 grpc_core::Server* core_server = grpc_core::Server::FromC(server);
1221 // Remove max_connection_idle and max_connection_age channel arguments since
1222 // those do not apply to inproc transports.
1223 grpc_core::ChannelArgs server_args =
1224 core_server->channel_args()
1225 .Remove(GRPC_ARG_MAX_CONNECTION_IDLE_MS)
1226 .Remove(GRPC_ARG_MAX_CONNECTION_AGE_MS);
1227
1228 // Add a default authority channel argument for the client
1229 grpc_core::ChannelArgs client_args =
1230 grpc_core::CoreConfiguration::Get()
1231 .channel_args_preconditioning()
1232 .PreconditionChannelArgs(args)
1233 .Set(GRPC_ARG_DEFAULT_AUTHORITY, "inproc.authority");
1234 grpc_transport* server_transport;
1235 grpc_transport* client_transport;
1236 inproc_transports_create(&server_transport, &client_transport);
1237
1238 // TODO(ncteisen): design and support channelz GetSocket for inproc.
1239 grpc_error_handle error = core_server->SetupTransport(
1240 server_transport, nullptr, server_args, nullptr);
1241 grpc_channel* channel = nullptr;
1242 if (error.ok()) {
1243 auto new_channel = grpc_core::Channel::Create(
1244 "inproc", client_args, GRPC_CLIENT_DIRECT_CHANNEL, client_transport);
1245 if (!new_channel.ok()) {
1246 GPR_ASSERT(!channel);
1247 gpr_log(GPR_ERROR, "Failed to create client channel: %s",
1248 grpc_core::StatusToString(error).c_str());
1249 intptr_t integer;
1250 grpc_status_code status = GRPC_STATUS_INTERNAL;
1251 if (grpc_error_get_int(error, grpc_core::StatusIntProperty::kRpcStatus,
1252 &integer)) {
1253 status = static_cast<grpc_status_code>(integer);
1254 }
1255 // client_transport was destroyed when grpc_channel_create_internal saw an
1256 // error.
1257 grpc_transport_destroy(server_transport);
1258 channel = grpc_lame_client_channel_create(
1259 nullptr, status, "Failed to create client channel");
1260 } else {
1261 channel = new_channel->release()->c_ptr();
1262 }
1263 } else {
1264 GPR_ASSERT(!channel);
1265 gpr_log(GPR_ERROR, "Failed to create server channel: %s",
1266 grpc_core::StatusToString(error).c_str());
1267 intptr_t integer;
1268 grpc_status_code status = GRPC_STATUS_INTERNAL;
1269 if (grpc_error_get_int(error, grpc_core::StatusIntProperty::kRpcStatus,
1270 &integer)) {
1271 status = static_cast<grpc_status_code>(integer);
1272 }
1273 grpc_transport_destroy(client_transport);
1274 grpc_transport_destroy(server_transport);
1275 channel = grpc_lame_client_channel_create(
1276 nullptr, status, "Failed to create server channel");
1277 }
1278
1279 return channel;
1280 }
1281