//
//
// Copyright 2017 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
//

#include <grpc/support/port_platform.h>

#include "src/core/ext/transport/inproc/inproc_transport.h"

#include <stdint.h>

#include <algorithm>
#include <memory>
#include <new>
#include <string>
#include <utility>

#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "absl/strings/str_cat.h"
#include "absl/strings/string_view.h"
#include "absl/types/optional.h"

#include <grpc/grpc.h>
#include <grpc/impl/connectivity_state.h>
#include <grpc/status.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/sync.h>

#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_args_preconditioning.h"
#include "src/core/lib/channel/channelz.h"
#include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/gprpp/status_helper.h"
#include "src/core/lib/gprpp/time.h"
#include "src/core/lib/iomgr/closure.h"
#include "src/core/lib/iomgr/endpoint.h"
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/iomgr/iomgr_fwd.h"
#include "src/core/lib/resource_quota/arena.h"
#include "src/core/lib/slice/slice.h"
#include "src/core/lib/slice/slice_buffer.h"
#include "src/core/lib/surface/api_trace.h"
#include "src/core/lib/surface/channel.h"
#include "src/core/lib/surface/channel_stack_type.h"
#include "src/core/lib/surface/server.h"
#include "src/core/lib/transport/connectivity_state.h"
#include "src/core/lib/transport/metadata_batch.h"
#include "src/core/lib/transport/transport.h"
#include "src/core/lib/transport/transport_fwd.h"
#include "src/core/lib/transport/transport_impl.h"

#define INPROC_LOG(...)                               \
  do {                                                \
    if (GRPC_TRACE_FLAG_ENABLED(grpc_inproc_trace)) { \
      gpr_log(__VA_ARGS__);                           \
    }                                                 \
  } while (0)
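
// Example: a call such as
//   INPROC_LOG(GPR_INFO, "ref_transport %p", this);
// reaches gpr_log only when the "inproc" trace flag is enabled (typically
// via the GRPC_TRACE environment variable), so tracing costs nothing in the
// common case.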

namespace {
struct inproc_stream;
bool cancel_stream_locked(inproc_stream* s, grpc_error_handle error);
void maybe_process_ops_locked(inproc_stream* s, grpc_error_handle error);
void op_state_machine_locked(inproc_stream* s, grpc_error_handle error);
void log_metadata(const grpc_metadata_batch* md_batch, bool is_client,
                  bool is_initial);
void fill_in_metadata(inproc_stream* s, const grpc_metadata_batch* metadata,
                      grpc_metadata_batch* out_md, bool* markfilled);

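// Detaches the batch's outgoing message payload (nulling the field so the
// batch no longer references it) and clears the buffered message data.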
void ResetSendMessage(grpc_transport_stream_op_batch* batch) {
  std::exchange(batch->payload->send_message.send_message, nullptr)->Clear();
}

struct shared_mu {
  shared_mu() {
    // Share one lock between both sides since both sides get affected
    gpr_mu_init(&mu);
    gpr_ref_init(&refs, 2);
  }

  ~shared_mu() { gpr_mu_destroy(&mu); }

  gpr_mu mu;
  gpr_refcount refs;
};
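
// The refcount starts at 2 because the client-side and server-side
// transports each hold a reference to the shared mutex; whichever side is
// destroyed last (see ~inproc_transport) frees it.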

struct inproc_transport {
  inproc_transport(const grpc_transport_vtable* vtable, shared_mu* mu,
                   bool is_client)
      : mu(mu),
        is_client(is_client),
        state_tracker(is_client ? "inproc_client" : "inproc_server",
                      GRPC_CHANNEL_READY) {
    base.vtable = vtable;
    // Start each side of transport with 2 refs since they each have a ref
    // to the other
    gpr_ref_init(&refs, 2);
  }

  ~inproc_transport() {
    if (gpr_unref(&mu->refs)) {
      mu->~shared_mu();
      gpr_free(mu);
    }
  }

  void ref() {
    INPROC_LOG(GPR_INFO, "ref_transport %p", this);
    gpr_ref(&refs);
  }

  void unref() {
    INPROC_LOG(GPR_INFO, "unref_transport %p", this);
    if (!gpr_unref(&refs)) {
      return;
    }
    INPROC_LOG(GPR_INFO, "really_destroy_transport %p", this);
    this->~inproc_transport();
    gpr_free(this);
  }

  grpc_transport base;
  shared_mu* mu;
  gpr_refcount refs;
  bool is_client;
  grpc_core::ConnectivityStateTracker state_tracker;
  void (*accept_stream_cb)(void* user_data, grpc_transport* transport,
                           const void* server_data);
  void* accept_stream_data;
  bool is_closed = false;
  struct inproc_transport* other_side;
  struct inproc_stream* stream_list = nullptr;
};

struct inproc_stream {
  inproc_stream(inproc_transport* t, grpc_stream_refcount* refcount,
                const void* server_data, grpc_core::Arena* arena)
      : t(t), refs(refcount), arena(arena) {
    // Ref this stream right now for ctor and list.
    ref("inproc_init_stream:init");
    ref("inproc_init_stream:list");

    stream_list_prev = nullptr;
    gpr_mu_lock(&t->mu->mu);
    stream_list_next = t->stream_list;
    if (t->stream_list) {
      t->stream_list->stream_list_prev = this;
    }
    t->stream_list = this;
    gpr_mu_unlock(&t->mu->mu);

    if (!server_data) {
      t->ref();
      inproc_transport* st = t->other_side;
      st->ref();
      other_side = nullptr;  // will get filled in soon
      // Pass the client-side stream address to the server-side for a ref
      ref("inproc_init_stream:clt");  // ref it now on behalf of server
                                      // side to avoid destruction
      INPROC_LOG(GPR_INFO, "calling accept stream cb %p %p",
                 st->accept_stream_cb, st->accept_stream_data);
      (*st->accept_stream_cb)(st->accept_stream_data, &st->base, this);
    } else {
      // This is the server-side and is being called through accept_stream_cb
      inproc_stream* cs = const_cast<inproc_stream*>(
          static_cast<const inproc_stream*>(server_data));
      other_side = cs;
      // Ref the server-side stream on behalf of the client now
      ref("inproc_init_stream:srv");

      // Now we are about to affect the other side, so lock the transport
      // to make sure that it doesn't get destroyed
      gpr_mu_lock(&t->mu->mu);
      cs->other_side = this;
      // Now transfer from the other side's write_buffer if any to the to_read
      // buffer
      if (cs->write_buffer_initial_md_filled) {
        fill_in_metadata(this, &cs->write_buffer_initial_md,
                         &to_read_initial_md, &to_read_initial_md_filled);
        deadline = std::min(deadline, cs->write_buffer_deadline);
        cs->write_buffer_initial_md.Clear();
        cs->write_buffer_initial_md_filled = false;
      }
      if (cs->write_buffer_trailing_md_filled) {
        fill_in_metadata(this, &cs->write_buffer_trailing_md,
                         &to_read_trailing_md, &to_read_trailing_md_filled);
        cs->write_buffer_trailing_md.Clear();
        cs->write_buffer_trailing_md_filled = false;
      }
      if (!cs->write_buffer_cancel_error.ok()) {
        cancel_other_error = cs->write_buffer_cancel_error;
        cs->write_buffer_cancel_error = absl::OkStatus();
        maybe_process_ops_locked(this, cancel_other_error);
      }

      gpr_mu_unlock(&t->mu->mu);
    }
  }

  ~inproc_stream() { t->unref(); }

#ifndef NDEBUG
#define STREAM_REF(refs, reason) grpc_stream_ref(refs, reason)
#define STREAM_UNREF(refs, reason) grpc_stream_unref(refs, reason)
#else
#define STREAM_REF(refs, reason) grpc_stream_ref(refs)
#define STREAM_UNREF(refs, reason) grpc_stream_unref(refs)
#endif
  void ref(const char* reason) {
    INPROC_LOG(GPR_INFO, "ref_stream %p %s", this, reason);
    STREAM_REF(refs, reason);
  }

  void unref(const char* reason) {
    INPROC_LOG(GPR_INFO, "unref_stream %p %s", this, reason);
    STREAM_UNREF(refs, reason);
  }
#undef STREAM_REF
#undef STREAM_UNREF

  inproc_transport* t;
  grpc_stream_refcount* refs;
  grpc_core::Arena* arena;

  grpc_metadata_batch to_read_initial_md{arena};
  bool to_read_initial_md_filled = false;
  grpc_metadata_batch to_read_trailing_md{arena};
  bool to_read_trailing_md_filled = false;
  bool ops_needed = false;
  // Write buffer used only during gap at init time when client-side
  // stream is set up but server side stream is not yet set up
  grpc_metadata_batch write_buffer_initial_md{arena};
  bool write_buffer_initial_md_filled = false;
  grpc_core::Timestamp write_buffer_deadline =
      grpc_core::Timestamp::InfFuture();
  grpc_metadata_batch write_buffer_trailing_md{arena};
  bool write_buffer_trailing_md_filled = false;
  grpc_error_handle write_buffer_cancel_error;

  struct inproc_stream* other_side;
  bool other_side_closed = false;               // won't talk anymore
  bool write_buffer_other_side_closed = false;  // on hold

  grpc_transport_stream_op_batch* send_message_op = nullptr;
  grpc_transport_stream_op_batch* send_trailing_md_op = nullptr;
  grpc_transport_stream_op_batch* recv_initial_md_op = nullptr;
  grpc_transport_stream_op_batch* recv_message_op = nullptr;
  grpc_transport_stream_op_batch* recv_trailing_md_op = nullptr;

  bool initial_md_sent = false;
  bool trailing_md_sent = false;
  bool initial_md_recvd = false;
  bool trailing_md_recvd = false;
  // The following tracks if the server-side only pretends to have received
  // trailing metadata since it no longer cares about the RPC. If that is the
  // case, it is still ok for the client to send trailing metadata (in which
  // case it will be ignored).
  bool trailing_md_recvd_implicit_only = false;

  bool closed = false;

  grpc_error_handle cancel_self_error;
  grpc_error_handle cancel_other_error;

  grpc_core::Timestamp deadline = grpc_core::Timestamp::InfFuture();

  bool listed = true;
  struct inproc_stream* stream_list_prev;
  struct inproc_stream* stream_list_next;
};

void log_metadata(const grpc_metadata_batch* md_batch, bool is_client,
                  bool is_initial) {
  std::string prefix = absl::StrCat(
      "INPROC:", is_initial ? "HDR:" : "TRL:", is_client ? "CLI:" : "SVR:");
  md_batch->Log([&prefix](absl::string_view key, absl::string_view value) {
    gpr_log(GPR_INFO, "%s", absl::StrCat(prefix, key, ": ", value).c_str());
  });
}
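
// With tracing on, each metadata element is logged on its own line; e.g.
// initial metadata sent by the client would appear as lines like
//   INPROC:HDR:CLI:user-agent: grpc-c/...
// (the key and value shown here are illustrative).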

namespace {

class CopySink {
 public:
  explicit CopySink(grpc_metadata_batch* dst) : dst_(dst) {}

  void Encode(const grpc_core::Slice& key, const grpc_core::Slice& value) {
    dst_->Append(key.as_string_view(), value.AsOwned(),
                 [](absl::string_view, const grpc_core::Slice&) {});
  }

  template <class T, class V>
  void Encode(T trait, V value) {
    dst_->Set(trait, value);
  }

  template <class T>
  void Encode(T trait, const grpc_core::Slice& value) {
    dst_->Set(trait, value.AsOwned());
  }

 private:
  grpc_metadata_batch* dst_;
};

}  // namespace
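
// CopySink acts as a visitor for grpc_metadata_batch::Encode: the three
// Encode overloads cover unknown key/value pairs, trivially copyable trait
// values, and Slice-valued traits (copied via AsOwned so the destination
// batch does not alias the source arena).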

void fill_in_metadata(inproc_stream* s, const grpc_metadata_batch* metadata,
                      grpc_metadata_batch* out_md, bool* markfilled) {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_inproc_trace)) {
    log_metadata(metadata, s->t->is_client,
                 metadata->get_pointer(grpc_core::WaitForReady()) != nullptr);
  }

  if (markfilled != nullptr) {
    *markfilled = true;
  }

  // TODO(ctiller): copy the metadata batch, don't rely on a bespoke copy
  // function. Can only do this once mdelems are out of the way though, too
  // many edge cases otherwise.
  out_md->Clear();
  CopySink sink(out_md);
  metadata->Encode(&sink);
}

int init_stream(grpc_transport* gt, grpc_stream* gs,
                grpc_stream_refcount* refcount, const void* server_data,
                grpc_core::Arena* arena) {
  INPROC_LOG(GPR_INFO, "init_stream %p %p %p", gt, gs, server_data);
  inproc_transport* t = reinterpret_cast<inproc_transport*>(gt);
  new (gs) inproc_stream(t, refcount, server_data, arena);
  return 0;  // return value is not important
}

void close_stream_locked(inproc_stream* s) {
  if (!s->closed) {
    // Release the metadata that we would have written out
    s->write_buffer_initial_md.Clear();
    s->write_buffer_trailing_md.Clear();

    if (s->listed) {
      inproc_stream* p = s->stream_list_prev;
      inproc_stream* n = s->stream_list_next;
      if (p != nullptr) {
        p->stream_list_next = n;
      } else {
        s->t->stream_list = n;
      }
      if (n != nullptr) {
        n->stream_list_prev = p;
      }
      s->listed = false;
      s->unref("close_stream:list");
    }
    s->closed = true;
    s->unref("close_stream:closing");
  }
}

// This function means that we are done talking/listening to the other side
void close_other_side_locked(inproc_stream* s, const char* reason) {
  if (s->other_side != nullptr) {
    // First release the metadata that came from the other side's arena
    s->to_read_initial_md.Clear();
    s->to_read_trailing_md.Clear();

    s->other_side->unref(reason);
    s->other_side_closed = true;
    s->other_side = nullptr;
  } else if (!s->other_side_closed) {
    s->write_buffer_other_side_closed = true;
  }
}

// Call the on_complete closure associated with this stream_op_batch if
// this stream_op_batch occupies exactly one of the stream's pending-op
// slots (i.e., the op now finishing is the batch's last pending operation).
// This is called when one of the pending operations for the stream
// is done and about to be NULLed out.
void complete_if_batch_end_locked(inproc_stream* s, grpc_error_handle error,
                                  grpc_transport_stream_op_batch* op,
                                  const char* msg) {
  int is_sm = static_cast<int>(op == s->send_message_op);
  int is_stm = static_cast<int>(op == s->send_trailing_md_op);
  // TODO(vjpai): We should not consider the recv ops here, since they
  // have their own callbacks.  We should invoke a batch's on_complete
  // as soon as all of the batch's send ops are complete, even if there
  // are still recv ops pending.
  int is_rim = static_cast<int>(op == s->recv_initial_md_op);
  int is_rm = static_cast<int>(op == s->recv_message_op);
  int is_rtm = static_cast<int>(op == s->recv_trailing_md_op);

  if ((is_sm + is_stm + is_rim + is_rm + is_rtm) == 1) {
    INPROC_LOG(GPR_INFO, "%s %p %p %p %s", msg, s, op, op->on_complete,
               grpc_core::StatusToString(error).c_str());
    grpc_core::ExecCtx::Run(DEBUG_LOCATION, op->on_complete, error);
  }
}
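
// Example: a batch carrying both send_message and send_trailing_metadata
// occupies two pending slots; when the send_message part finishes, both
// slots still reference the batch (sum == 2), so on_complete is deferred
// until the send_trailing_metadata part, the last remaining slot, completes.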

void maybe_process_ops_locked(inproc_stream* s, grpc_error_handle error) {
  if (s && (!error.ok() || s->ops_needed)) {
    s->ops_needed = false;
    op_state_machine_locked(s, error);
  }
}

void fail_helper_locked(inproc_stream* s, grpc_error_handle error) {
  INPROC_LOG(GPR_INFO, "op_state_machine %p fail_helper", s);
  // If we're failing this side, we need to make sure that
  // we also send or have already sent trailing metadata
  if (!s->trailing_md_sent) {
    // Send trailing md to the other side indicating cancellation
    s->trailing_md_sent = true;

    grpc_metadata_batch fake_md(s->arena);
    inproc_stream* other = s->other_side;
    grpc_metadata_batch* dest = (other == nullptr)
                                    ? &s->write_buffer_trailing_md
                                    : &other->to_read_trailing_md;
    bool* destfilled = (other == nullptr) ? &s->write_buffer_trailing_md_filled
                                          : &other->to_read_trailing_md_filled;
    fill_in_metadata(s, &fake_md, dest, destfilled);

    if (other != nullptr) {
      if (other->cancel_other_error.ok()) {
        other->cancel_other_error = error;
      }
      maybe_process_ops_locked(other, error);
    } else if (s->write_buffer_cancel_error.ok()) {
      s->write_buffer_cancel_error = error;
    }
  }
  if (s->recv_initial_md_op) {
    grpc_error_handle err;
    if (!s->t->is_client) {
      // If this is a server, provide initial metadata with a path and
      // authority since it expects that as well as no error yet
      grpc_metadata_batch fake_md(s->arena);
      fake_md.Set(grpc_core::HttpPathMetadata(),
                  grpc_core::Slice::FromStaticString("/"));
      fake_md.Set(grpc_core::HttpAuthorityMetadata(),
                  grpc_core::Slice::FromStaticString("inproc-fail"));

      fill_in_metadata(s, &fake_md,
                       s->recv_initial_md_op->payload->recv_initial_metadata
                           .recv_initial_metadata,
                       nullptr);
      err = absl::OkStatus();
    } else {
      err = error;
    }
    if (s->recv_initial_md_op->payload->recv_initial_metadata
            .trailing_metadata_available != nullptr) {
      // Set to true unconditionally, because we're failing the call, so even
      // if we haven't actually seen the send_trailing_metadata op from the
      // other side, we're going to return trailing metadata anyway.
      *s->recv_initial_md_op->payload->recv_initial_metadata
           .trailing_metadata_available = true;
    }
    INPROC_LOG(GPR_INFO,
               "fail_helper %p scheduling initial-metadata-ready %s %s", s,
               grpc_core::StatusToString(error).c_str(),
               grpc_core::StatusToString(err).c_str());
    grpc_core::ExecCtx::Run(
        DEBUG_LOCATION,
        s->recv_initial_md_op->payload->recv_initial_metadata
            .recv_initial_metadata_ready,
        err);
    // Last use of err so no need to REF and then UNREF it

    complete_if_batch_end_locked(
        s, error, s->recv_initial_md_op,
        "fail_helper scheduling recv-initial-metadata-on-complete");
    s->recv_initial_md_op = nullptr;
  }
  if (s->recv_message_op) {
    INPROC_LOG(GPR_INFO, "fail_helper %p scheduling message-ready %s", s,
               grpc_core::StatusToString(error).c_str());
    if (s->recv_message_op->payload->recv_message
            .call_failed_before_recv_message != nullptr) {
      *s->recv_message_op->payload->recv_message
           .call_failed_before_recv_message = true;
    }
    grpc_core::ExecCtx::Run(
        DEBUG_LOCATION,
        s->recv_message_op->payload->recv_message.recv_message_ready, error);
    complete_if_batch_end_locked(
        s, error, s->recv_message_op,
        "fail_helper scheduling recv-message-on-complete");
    s->recv_message_op = nullptr;
  }
  if (s->send_message_op) {
    ResetSendMessage(s->send_message_op);
    complete_if_batch_end_locked(
        s, error, s->send_message_op,
        "fail_helper scheduling send-message-on-complete");
    s->send_message_op = nullptr;
  }
  if (s->send_trailing_md_op) {
    complete_if_batch_end_locked(
        s, error, s->send_trailing_md_op,
        "fail_helper scheduling send-trailing-md-on-complete");
    s->send_trailing_md_op = nullptr;
  }
  if (s->recv_trailing_md_op) {
    INPROC_LOG(GPR_INFO, "fail_helper %p scheduling trailing-metadata-ready %s",
               s, grpc_core::StatusToString(error).c_str());
    grpc_core::ExecCtx::Run(
        DEBUG_LOCATION,
        s->recv_trailing_md_op->payload->recv_trailing_metadata
            .recv_trailing_metadata_ready,
        error);
    INPROC_LOG(GPR_INFO, "fail_helper %p scheduling trailing-md-on-complete %s",
               s, grpc_core::StatusToString(error).c_str());
    complete_if_batch_end_locked(
        s, error, s->recv_trailing_md_op,
        "fail_helper scheduling recv-trailing-metadata-on-complete");
    s->recv_trailing_md_op = nullptr;
  }
  close_other_side_locked(s, "fail_helper:other_side");
  close_stream_locked(s);
}

// TODO(vjpai): It should not be necessary to drain the incoming byte
// stream and create a new one; instead, we should simply pass the byte
// stream from the sender directly to the receiver as-is.
//
// Note that fixing this will also avoid the assumption in this code
// that the incoming byte stream's next() call will always return
// synchronously.  That assumption is true today but may not always be
// true in the future.
void message_transfer_locked(inproc_stream* sender, inproc_stream* receiver) {
  *receiver->recv_message_op->payload->recv_message.recv_message =
      std::move(*sender->send_message_op->payload->send_message.send_message);
  *receiver->recv_message_op->payload->recv_message.flags =
      sender->send_message_op->payload->send_message.flags;

  INPROC_LOG(GPR_INFO, "message_transfer_locked %p scheduling message-ready",
             receiver);
  grpc_core::ExecCtx::Run(
      DEBUG_LOCATION,
      receiver->recv_message_op->payload->recv_message.recv_message_ready,
      absl::OkStatus());
  complete_if_batch_end_locked(
      sender, absl::OkStatus(), sender->send_message_op,
      "message_transfer scheduling sender on_complete");
  complete_if_batch_end_locked(
      receiver, absl::OkStatus(), receiver->recv_message_op,
      "message_transfer scheduling receiver on_complete");

  receiver->recv_message_op = nullptr;
  sender->send_message_op = nullptr;
}
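
// Note that the message payload is moved wholesale, with no serialization
// or copying: the sender's buffer is std::move'd straight into the
// receiver's recv slot, which is what makes the inproc transport cheap.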

void op_state_machine_locked(inproc_stream* s, grpc_error_handle error) {
  // This function gets called when we have contents in the unprocessed reads.
  // Get what we want based on the ops wanted, schedule the appropriate
  // closures, and then return to the ops_needed state if still needed.

  grpc_error_handle new_err;

  bool needs_close = false;

  INPROC_LOG(GPR_INFO, "op_state_machine %p", s);
  // cancellation takes precedence
  inproc_stream* other = s->other_side;

  if (!s->cancel_self_error.ok()) {
    fail_helper_locked(s, s->cancel_self_error);
    goto done;
  } else if (!s->cancel_other_error.ok()) {
    fail_helper_locked(s, s->cancel_other_error);
    goto done;
  } else if (!error.ok()) {
    fail_helper_locked(s, error);
    goto done;
  }

  if (s->send_message_op && other) {
    if (other->recv_message_op) {
      message_transfer_locked(s, other);
      maybe_process_ops_locked(other, absl::OkStatus());
    } else if (!s->t->is_client && s->trailing_md_sent) {
      // A server send will never be matched if the server already sent status
      ResetSendMessage(s->send_message_op);
      complete_if_batch_end_locked(
          s, absl::OkStatus(), s->send_message_op,
          "op_state_machine scheduling send-message-on-complete case 1");
      s->send_message_op = nullptr;
    }
  }
  // Pause a send trailing metadata if there is still an outstanding
  // send message unless we know that the send message will never get
  // matched to a receive. This happens on the client if the server has
  // already sent status or on the server if the client has requested
  // status
  if (s->send_trailing_md_op &&
      (!s->send_message_op ||
       (s->t->is_client &&
        (s->trailing_md_recvd || s->to_read_trailing_md_filled)) ||
       (!s->t->is_client && other &&
        (other->trailing_md_recvd || other->to_read_trailing_md_filled ||
         other->recv_trailing_md_op)))) {
    grpc_metadata_batch* dest = (other == nullptr)
                                    ? &s->write_buffer_trailing_md
                                    : &other->to_read_trailing_md;
    bool* destfilled = (other == nullptr) ? &s->write_buffer_trailing_md_filled
                                          : &other->to_read_trailing_md_filled;
    if (*destfilled || s->trailing_md_sent) {
      // The buffer is already in use; that's an error!
      INPROC_LOG(GPR_INFO, "Extra trailing metadata %p", s);
      new_err = GRPC_ERROR_CREATE("Extra trailing metadata");
      fail_helper_locked(s, new_err);
      goto done;
    } else {
      if (!other || !other->closed) {
        fill_in_metadata(s,
                         s->send_trailing_md_op->payload->send_trailing_metadata
                             .send_trailing_metadata,
                         dest, destfilled);
      }
      s->trailing_md_sent = true;
      if (s->send_trailing_md_op->payload->send_trailing_metadata.sent) {
        *s->send_trailing_md_op->payload->send_trailing_metadata.sent = true;
      }
      if (!s->t->is_client && s->trailing_md_recvd && s->recv_trailing_md_op) {
        INPROC_LOG(GPR_INFO,
                   "op_state_machine %p scheduling trailing-metadata-ready", s);
        grpc_core::ExecCtx::Run(
            DEBUG_LOCATION,
            s->recv_trailing_md_op->payload->recv_trailing_metadata
                .recv_trailing_metadata_ready,
            absl::OkStatus());
        INPROC_LOG(GPR_INFO,
                   "op_state_machine %p scheduling trailing-md-on-complete", s);
        grpc_core::ExecCtx::Run(DEBUG_LOCATION,
                                s->recv_trailing_md_op->on_complete,
                                absl::OkStatus());
        s->recv_trailing_md_op = nullptr;
        needs_close = true;
      }
    }
    maybe_process_ops_locked(other, absl::OkStatus());
    complete_if_batch_end_locked(
        s, absl::OkStatus(), s->send_trailing_md_op,
        "op_state_machine scheduling send-trailing-metadata-on-complete");
    s->send_trailing_md_op = nullptr;
  }
  if (s->recv_initial_md_op) {
    if (s->initial_md_recvd) {
      new_err = GRPC_ERROR_CREATE("Already recvd initial md");
      INPROC_LOG(
          GPR_INFO,
          "op_state_machine %p scheduling on_complete errors for already "
          "recvd initial md %s",
          s, grpc_core::StatusToString(new_err).c_str());
      fail_helper_locked(s, new_err);
      goto done;
    }

    if (s->to_read_initial_md_filled) {
      s->initial_md_recvd = true;
      fill_in_metadata(s, &s->to_read_initial_md,
                       s->recv_initial_md_op->payload->recv_initial_metadata
                           .recv_initial_metadata,
                       nullptr);
      if (s->deadline != grpc_core::Timestamp::InfFuture()) {
        s->recv_initial_md_op->payload->recv_initial_metadata
            .recv_initial_metadata->Set(grpc_core::GrpcTimeoutMetadata(),
                                        s->deadline);
      }
      if (s->recv_initial_md_op->payload->recv_initial_metadata
              .trailing_metadata_available != nullptr) {
        *s->recv_initial_md_op->payload->recv_initial_metadata
             .trailing_metadata_available =
            (other != nullptr && other->send_trailing_md_op != nullptr);
      }
      s->to_read_initial_md.Clear();
      s->to_read_initial_md_filled = false;
      grpc_core::ExecCtx::Run(
          DEBUG_LOCATION,
          std::exchange(s->recv_initial_md_op->payload->recv_initial_metadata
                            .recv_initial_metadata_ready,
                        nullptr),
          absl::OkStatus());
      complete_if_batch_end_locked(
          s, absl::OkStatus(), s->recv_initial_md_op,
          "op_state_machine scheduling recv-initial-metadata-on-complete");
      s->recv_initial_md_op = nullptr;
    }
  }
  if (s->recv_message_op) {
    if (other && other->send_message_op) {
      message_transfer_locked(other, s);
      maybe_process_ops_locked(other, absl::OkStatus());
    }
  }
  if (s->to_read_trailing_md_filled) {
    if (s->trailing_md_recvd) {
      if (s->trailing_md_recvd_implicit_only) {
        INPROC_LOG(GPR_INFO,
                   "op_state_machine %p already implicitly received trailing "
                   "metadata, so ignoring new trailing metadata from client",
                   s);
        s->to_read_trailing_md.Clear();
        s->to_read_trailing_md_filled = false;
        s->trailing_md_recvd_implicit_only = false;
      } else {
        new_err = GRPC_ERROR_CREATE("Already recvd trailing md");
        INPROC_LOG(
            GPR_INFO,
            "op_state_machine %p scheduling on_complete errors for already "
            "recvd trailing md %s",
            s, grpc_core::StatusToString(new_err).c_str());
        fail_helper_locked(s, new_err);
        goto done;
      }
    }
    if (s->recv_message_op != nullptr) {
      // This message needs to be wrapped up because it will never be
      // satisfied
      s->recv_message_op->payload->recv_message.recv_message->reset();
      INPROC_LOG(GPR_INFO, "op_state_machine %p scheduling message-ready", s);
      grpc_core::ExecCtx::Run(
          DEBUG_LOCATION,
          s->recv_message_op->payload->recv_message.recv_message_ready,
          absl::OkStatus());
      complete_if_batch_end_locked(
          s, new_err, s->recv_message_op,
          "op_state_machine scheduling recv-message-on-complete");
      s->recv_message_op = nullptr;
    }
    if ((s->trailing_md_sent || s->t->is_client) && s->send_message_op) {
      // Nothing further will try to receive from this stream, so finish off
      // any outstanding send_message op
      ResetSendMessage(s->send_message_op);
      s->send_message_op->payload->send_message.stream_write_closed = true;
      complete_if_batch_end_locked(
          s, new_err, s->send_message_op,
          "op_state_machine scheduling send-message-on-complete case 2");
      s->send_message_op = nullptr;
    }
    if (s->recv_trailing_md_op != nullptr) {
      // We wanted trailing metadata and we got it
      s->trailing_md_recvd = true;
      fill_in_metadata(s, &s->to_read_trailing_md,
                       s->recv_trailing_md_op->payload->recv_trailing_metadata
                           .recv_trailing_metadata,
                       nullptr);
      s->to_read_trailing_md.Clear();
      s->to_read_trailing_md_filled = false;
      s->recv_trailing_md_op->payload->recv_trailing_metadata
          .recv_trailing_metadata->Set(grpc_core::GrpcStatusFromWire(), true);

      // We should schedule the recv_trailing_md_op completion if
      // 1. this stream is the client-side
      // 2. this stream is the server-side AND has already sent its trailing
      //    md (if the server hasn't already sent its trailing md, it doesn't
      //    have a final status, so don't mark this op complete)
      if (s->t->is_client || s->trailing_md_sent) {
        grpc_core::ExecCtx::Run(
            DEBUG_LOCATION,
            s->recv_trailing_md_op->payload->recv_trailing_metadata
                .recv_trailing_metadata_ready,
            absl::OkStatus());
        grpc_core::ExecCtx::Run(DEBUG_LOCATION,
                                s->recv_trailing_md_op->on_complete,
                                absl::OkStatus());
        s->recv_trailing_md_op = nullptr;
        needs_close = s->trailing_md_sent;
      }
    } else if (!s->trailing_md_recvd) {
      INPROC_LOG(
          GPR_INFO,
          "op_state_machine %p has trailing md but not yet waiting for it", s);
    }
  }
  if (!s->t->is_client && s->trailing_md_sent &&
      (s->recv_trailing_md_op != nullptr)) {
    // In this case, we don't care to receive the write-close from the client
    // because we have already sent status and the RPC is over as far as we
    // are concerned.
    INPROC_LOG(GPR_INFO, "op_state_machine %p scheduling trailing-md-ready %s",
               s, grpc_core::StatusToString(new_err).c_str());
    grpc_core::ExecCtx::Run(
        DEBUG_LOCATION,
        s->recv_trailing_md_op->payload->recv_trailing_metadata
            .recv_trailing_metadata_ready,
        new_err);
    complete_if_batch_end_locked(
        s, new_err, s->recv_trailing_md_op,
        "op_state_machine scheduling recv-trailing-md-on-complete");
    s->trailing_md_recvd = true;
    s->recv_trailing_md_op = nullptr;
    // Since we are only pretending to have received the trailing MD, it would
    // be ok (not an error) if the client actually sends it later.
    s->trailing_md_recvd_implicit_only = true;
  }
  if (s->trailing_md_recvd && s->recv_message_op) {
    // No further message will come on this stream, so finish off the
    // recv_message_op
    INPROC_LOG(GPR_INFO, "op_state_machine %p scheduling message-ready", s);
    s->recv_message_op->payload->recv_message.recv_message->reset();
    grpc_core::ExecCtx::Run(
        DEBUG_LOCATION,
        s->recv_message_op->payload->recv_message.recv_message_ready,
        absl::OkStatus());
    complete_if_batch_end_locked(
        s, new_err, s->recv_message_op,
        "op_state_machine scheduling recv-message-on-complete");
    s->recv_message_op = nullptr;
  }
  if (s->trailing_md_recvd && s->send_message_op && s->t->is_client) {
    // Nothing further will try to receive from this stream, so finish off
    // any outstanding send_message op
    ResetSendMessage(s->send_message_op);
    complete_if_batch_end_locked(
        s, new_err, s->send_message_op,
        "op_state_machine scheduling send-message-on-complete case 3");
    s->send_message_op = nullptr;
  }
  if (s->send_message_op || s->send_trailing_md_op || s->recv_initial_md_op ||
      s->recv_message_op || s->recv_trailing_md_op) {
    // Didn't get the item we wanted so we still need to get
    // rescheduled
    INPROC_LOG(
        GPR_INFO, "op_state_machine %p still needs closure %p %p %p %p %p", s,
        s->send_message_op, s->send_trailing_md_op, s->recv_initial_md_op,
        s->recv_message_op, s->recv_trailing_md_op);
    s->ops_needed = true;
  }
done:
  if (needs_close) {
    close_other_side_locked(s, "op_state_machine");
    close_stream_locked(s);
  }
}

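// Installs a cancellation on this stream: records the error, pushes empty
// trailing metadata to the other side so it sees the call end, and returns
// whether this was the first (accepted) cancel for the stream.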
bool cancel_stream_locked(inproc_stream* s, grpc_error_handle error) {
  bool ret = false;  // was the cancel accepted
  INPROC_LOG(GPR_INFO, "cancel_stream %p with %s", s,
             grpc_core::StatusToString(error).c_str());
  if (s->cancel_self_error.ok()) {
    ret = true;
    s->cancel_self_error = error;
    // Catch current value of other before it gets closed off
    inproc_stream* other = s->other_side;
    maybe_process_ops_locked(s, s->cancel_self_error);
    // Send trailing md to the other side indicating cancellation, even if we
    // already have
    s->trailing_md_sent = true;

    grpc_metadata_batch cancel_md(s->arena);

    grpc_metadata_batch* dest = (other == nullptr)
                                    ? &s->write_buffer_trailing_md
                                    : &other->to_read_trailing_md;
    bool* destfilled = (other == nullptr) ? &s->write_buffer_trailing_md_filled
                                          : &other->to_read_trailing_md_filled;
    fill_in_metadata(s, &cancel_md, dest, destfilled);

    if (other != nullptr) {
      if (other->cancel_other_error.ok()) {
        other->cancel_other_error = s->cancel_self_error;
      }
      maybe_process_ops_locked(other, other->cancel_other_error);
    } else if (s->write_buffer_cancel_error.ok()) {
      s->write_buffer_cancel_error = s->cancel_self_error;
    }

    // if we are a server and already received trailing md but
    // couldn't complete that because we hadn't yet sent out trailing
    // md, now's the chance
    if (!s->t->is_client && s->trailing_md_recvd && s->recv_trailing_md_op) {
      grpc_core::ExecCtx::Run(
          DEBUG_LOCATION,
          s->recv_trailing_md_op->payload->recv_trailing_metadata
              .recv_trailing_metadata_ready,
          s->cancel_self_error);
      complete_if_batch_end_locked(
          s, s->cancel_self_error, s->recv_trailing_md_op,
          "cancel_stream scheduling trailing-md-on-complete");
      s->recv_trailing_md_op = nullptr;
    }
  }

  close_other_side_locked(s, "cancel_stream:other_side");
  close_stream_locked(s);

  return ret;
}

void perform_stream_op(grpc_transport* gt, grpc_stream* gs,
                       grpc_transport_stream_op_batch* op) {
  INPROC_LOG(GPR_INFO, "perform_stream_op %p %p %p", gt, gs, op);
  inproc_stream* s = reinterpret_cast<inproc_stream*>(gs);
  gpr_mu* mu = &s->t->mu->mu;  // save aside in case s gets closed
  gpr_mu_lock(mu);

  if (GRPC_TRACE_FLAG_ENABLED(grpc_inproc_trace)) {
    if (op->send_initial_metadata) {
      log_metadata(op->payload->send_initial_metadata.send_initial_metadata,
                   s->t->is_client, true);
    }
    if (op->send_trailing_metadata) {
      log_metadata(op->payload->send_trailing_metadata.send_trailing_metadata,
                   s->t->is_client, false);
    }
  }
  grpc_error_handle error;
  grpc_closure* on_complete = op->on_complete;
  // TODO(roth): This is a hack needed because we use data inside of the
  // closure itself to do the barrier calculation (i.e., to ensure that
  // we don't schedule the closure until all ops in the batch have been
  // completed).  This can go away once we move to a new C++ closure API
  // that provides the ability to create a barrier closure.
  if (on_complete == nullptr) {
    on_complete = op->on_complete =
        grpc_core::NewClosure([](grpc_error_handle) {});
  }

  if (op->cancel_stream) {
    // Call cancel_stream_locked without ref'ing the cancel_error because
    // this function is responsible for making sure that that field gets
    // unref'ed
    cancel_stream_locked(s, op->payload->cancel_stream.cancel_error);
    // this op can complete without an error
  } else if (!s->cancel_self_error.ok()) {
    // already self-canceled so still give it an error
    error = s->cancel_self_error;
  } else {
    INPROC_LOG(GPR_INFO, "perform_stream_op %p %s%s%s%s%s%s%s", s,
               s->t->is_client ? "client" : "server",
               op->send_initial_metadata ? " send_initial_metadata" : "",
               op->send_message ? " send_message" : "",
               op->send_trailing_metadata ? " send_trailing_metadata" : "",
               op->recv_initial_metadata ? " recv_initial_metadata" : "",
               op->recv_message ? " recv_message" : "",
               op->recv_trailing_metadata ? " recv_trailing_metadata" : "");
  }

  inproc_stream* other = s->other_side;
  if (error.ok() && (op->send_initial_metadata || op->send_trailing_metadata)) {
    if (s->t->is_closed) {
      error = GRPC_ERROR_CREATE("Endpoint already shutdown");
    }
    if (error.ok() && op->send_initial_metadata) {
      grpc_metadata_batch* dest = (other == nullptr)
                                      ? &s->write_buffer_initial_md
                                      : &other->to_read_initial_md;
      bool* destfilled = (other == nullptr) ? &s->write_buffer_initial_md_filled
                                            : &other->to_read_initial_md_filled;
      if (*destfilled || s->initial_md_sent) {
        // The buffer is already in use; that's an error!
        INPROC_LOG(GPR_INFO, "Extra initial metadata %p", s);
        error = GRPC_ERROR_CREATE("Extra initial metadata");
      } else {
        if (!s->other_side_closed) {
          fill_in_metadata(
              s, op->payload->send_initial_metadata.send_initial_metadata, dest,
              destfilled);
        }
        if (s->t->is_client) {
          grpc_core::Timestamp* dl =
              (other == nullptr) ? &s->write_buffer_deadline : &other->deadline;
          *dl = std::min(
              *dl, op->payload->send_initial_metadata.send_initial_metadata
                       ->get(grpc_core::GrpcTimeoutMetadata())
                       .value_or(grpc_core::Timestamp::InfFuture()));
          s->initial_md_sent = true;
        }
      }
      maybe_process_ops_locked(other, error);
    }
  }

  if (error.ok() && (op->send_message || op->send_trailing_metadata ||
                     op->recv_initial_metadata || op->recv_message ||
                     op->recv_trailing_metadata)) {
    // Mark ops that need to be processed by the state machine
    if (op->send_message) {
      s->send_message_op = op;
    }
    if (op->send_trailing_metadata) {
      s->send_trailing_md_op = op;
    }
    if (op->recv_initial_metadata) {
      s->recv_initial_md_op = op;
    }
    if (op->recv_message) {
      s->recv_message_op = op;
    }
    if (op->recv_trailing_metadata) {
      s->recv_trailing_md_op = op;
    }

    // We want to initiate the state machine if:
    // 1. We want to send a message and the other side wants to receive
    // 2. We want to send trailing metadata and there isn't an unmatched send
    //    or the other side wants trailing metadata
    // 3. We want initial metadata and the other side has sent it
    // 4. We want to receive a message and there is a message ready
    // 5. There is trailing metadata, even if nothing specifically wants
    //    that because that can shut down the receive message as well
    if ((op->send_message && other && other->recv_message_op != nullptr) ||
        (op->send_trailing_metadata &&
         (!s->send_message_op || (other && other->recv_trailing_md_op))) ||
        (op->recv_initial_metadata && s->to_read_initial_md_filled) ||
        (op->recv_message && other && other->send_message_op != nullptr) ||
        (s->to_read_trailing_md_filled || s->trailing_md_recvd)) {
      op_state_machine_locked(s, error);
    } else {
      s->ops_needed = true;
    }
  } else {
    if (!error.ok()) {
      // Consume any send message that was sent here but that we are not
      // pushing to the other side
      if (op->send_message) {
        ResetSendMessage(op);
      }
      // Schedule op's closures that we didn't push to op state machine
      if (op->recv_initial_metadata) {
        if (op->payload->recv_initial_metadata.trailing_metadata_available !=
            nullptr) {
          // Set to true unconditionally, because we're failing the call, so
          // even if we haven't actually seen the send_trailing_metadata op
          // from the other side, we're going to return trailing metadata
          // anyway.
          *op->payload->recv_initial_metadata.trailing_metadata_available =
              true;
        }
        INPROC_LOG(
            GPR_INFO,
            "perform_stream_op error %p scheduling initial-metadata-ready %s",
            s, grpc_core::StatusToString(error).c_str());
        grpc_core::ExecCtx::Run(
            DEBUG_LOCATION,
            op->payload->recv_initial_metadata.recv_initial_metadata_ready,
            error);
      }
      if (op->recv_message) {
        INPROC_LOG(
            GPR_INFO,
            "perform_stream_op error %p scheduling recv message-ready %s", s,
            grpc_core::StatusToString(error).c_str());
        if (op->payload->recv_message.call_failed_before_recv_message !=
            nullptr) {
          *op->payload->recv_message.call_failed_before_recv_message = true;
        }
        grpc_core::ExecCtx::Run(DEBUG_LOCATION,
                                op->payload->recv_message.recv_message_ready,
                                error);
      }
      if (op->recv_trailing_metadata) {
        INPROC_LOG(GPR_INFO,
                   "perform_stream_op error %p scheduling "
                   "trailing-metadata-ready %s",
                   s, grpc_core::StatusToString(error).c_str());
        grpc_core::ExecCtx::Run(
            DEBUG_LOCATION,
            op->payload->recv_trailing_metadata.recv_trailing_metadata_ready,
            error);
      }
    }
    INPROC_LOG(GPR_INFO, "perform_stream_op %p scheduling on_complete %s", s,
               grpc_core::StatusToString(error).c_str());
    grpc_core::ExecCtx::Run(DEBUG_LOCATION, on_complete, error);
  }
  gpr_mu_unlock(mu);
}

void close_transport_locked(inproc_transport* t) {
  INPROC_LOG(GPR_INFO, "close_transport %p %d", t, t->is_closed);
  t->state_tracker.SetState(GRPC_CHANNEL_SHUTDOWN, absl::Status(),
                            "close transport");
  if (!t->is_closed) {
    t->is_closed = true;
    // Also end all streams on this transport
    while (t->stream_list != nullptr) {
      // cancel_stream_locked also adjusts stream list
      cancel_stream_locked(
          t->stream_list,
          grpc_error_set_int(GRPC_ERROR_CREATE("Transport closed"),
                             grpc_core::StatusIntProperty::kRpcStatus,
                             GRPC_STATUS_UNAVAILABLE));
    }
  }
}

void perform_transport_op(grpc_transport* gt, grpc_transport_op* op) {
  inproc_transport* t = reinterpret_cast<inproc_transport*>(gt);
  INPROC_LOG(GPR_INFO, "perform_transport_op %p %p", t, op);
  gpr_mu_lock(&t->mu->mu);
  if (op->start_connectivity_watch != nullptr) {
    t->state_tracker.AddWatcher(op->start_connectivity_watch_state,
                                std::move(op->start_connectivity_watch));
  }
  if (op->stop_connectivity_watch != nullptr) {
    t->state_tracker.RemoveWatcher(op->stop_connectivity_watch);
  }
  if (op->set_accept_stream) {
    t->accept_stream_cb = op->set_accept_stream_fn;
    t->accept_stream_data = op->set_accept_stream_user_data;
  }
  if (op->on_consumed) {
    grpc_core::ExecCtx::Run(DEBUG_LOCATION, op->on_consumed, absl::OkStatus());
  }

  bool do_close = false;
  if (!op->goaway_error.ok()) {
    do_close = true;
  }
  if (!op->disconnect_with_error.ok()) {
    do_close = true;
  }

  if (do_close) {
    close_transport_locked(t);
  }
  gpr_mu_unlock(&t->mu->mu);
}

void destroy_stream(grpc_transport* gt, grpc_stream* gs,
                    grpc_closure* then_schedule_closure) {
  INPROC_LOG(GPR_INFO, "destroy_stream %p %p", gs, then_schedule_closure);
  inproc_transport* t = reinterpret_cast<inproc_transport*>(gt);
  inproc_stream* s = reinterpret_cast<inproc_stream*>(gs);
  gpr_mu_lock(&t->mu->mu);
  close_stream_locked(s);
  gpr_mu_unlock(&t->mu->mu);
  s->~inproc_stream();
  grpc_core::ExecCtx::Run(DEBUG_LOCATION, then_schedule_closure,
                          absl::OkStatus());
}

void destroy_transport(grpc_transport* gt) {
  inproc_transport* t = reinterpret_cast<inproc_transport*>(gt);
  INPROC_LOG(GPR_INFO, "destroy_transport %p", t);
  gpr_mu_lock(&t->mu->mu);
  close_transport_locked(t);
  gpr_mu_unlock(&t->mu->mu);
  t->other_side->unref();
  t->unref();
}

//******************************************************************************
// INTEGRATION GLUE
//

void set_pollset(grpc_transport* /*gt*/, grpc_stream* /*gs*/,
                 grpc_pollset* /*pollset*/) {
  // Nothing to do here
}

void set_pollset_set(grpc_transport* /*gt*/, grpc_stream* /*gs*/,
                     grpc_pollset_set* /*pollset_set*/) {
  // Nothing to do here
}

grpc_endpoint* get_endpoint(grpc_transport* /*t*/) { return nullptr; }

const grpc_transport_vtable inproc_vtable = {sizeof(inproc_stream),
                                             true,
                                             "inproc",
                                             init_stream,
                                             nullptr,
                                             set_pollset,
                                             set_pollset_set,
                                             perform_stream_op,
                                             perform_transport_op,
                                             destroy_stream,
                                             destroy_transport,
                                             get_endpoint};

//******************************************************************************
// Main inproc transport functions
//
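// Creates the paired server/client transports. Each side is placement-new'd
// into gpr_malloc'd storage, starts with 2 refs (one for its owner, one held
// by the peer; see the inproc_transport constructor), and is cross-linked
// through other_side.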
void inproc_transports_create(grpc_transport** server_transport,
                              grpc_transport** client_transport) {
  INPROC_LOG(GPR_INFO, "inproc_transports_create");
  shared_mu* mu = new (gpr_malloc(sizeof(*mu))) shared_mu();
  inproc_transport* st = new (gpr_malloc(sizeof(*st)))
      inproc_transport(&inproc_vtable, mu, /*is_client=*/false);
  inproc_transport* ct = new (gpr_malloc(sizeof(*ct)))
      inproc_transport(&inproc_vtable, mu, /*is_client=*/true);
  st->other_side = ct;
  ct->other_side = st;
  *server_transport = reinterpret_cast<grpc_transport*>(st);
  *client_transport = reinterpret_cast<grpc_transport*>(ct);
}
}  // namespace

grpc_channel* grpc_inproc_channel_create(grpc_server* server,
                                         const grpc_channel_args* args,
                                         void* /*reserved*/) {
  GRPC_API_TRACE("grpc_inproc_channel_create(server=%p, args=%p)", 2,
                 (server, args));

  grpc_core::ExecCtx exec_ctx;

  grpc_core::Server* core_server = grpc_core::Server::FromC(server);
  // Remove max_connection_idle and max_connection_age channel arguments since
  // those do not apply to inproc transports.
  grpc_core::ChannelArgs server_args =
      core_server->channel_args()
          .Remove(GRPC_ARG_MAX_CONNECTION_IDLE_MS)
          .Remove(GRPC_ARG_MAX_CONNECTION_AGE_MS);

  // Add a default authority channel argument for the client
  grpc_core::ChannelArgs client_args =
      grpc_core::CoreConfiguration::Get()
          .channel_args_preconditioning()
          .PreconditionChannelArgs(args)
          .Set(GRPC_ARG_DEFAULT_AUTHORITY, "inproc.authority");
  grpc_transport* server_transport;
  grpc_transport* client_transport;
  inproc_transports_create(&server_transport, &client_transport);

  // TODO(ncteisen): design and support channelz GetSocket for inproc.
  grpc_error_handle error = core_server->SetupTransport(
      server_transport, nullptr, server_args, nullptr);
  grpc_channel* channel = nullptr;
  if (error.ok()) {
    auto new_channel = grpc_core::Channel::Create(
        "inproc", client_args, GRPC_CLIENT_DIRECT_CHANNEL, client_transport);
    if (!new_channel.ok()) {
      GPR_ASSERT(!channel);
      gpr_log(GPR_ERROR, "Failed to create client channel: %s",
              grpc_core::StatusToString(new_channel.status()).c_str());
      intptr_t integer;
      grpc_status_code status = GRPC_STATUS_INTERNAL;
      if (grpc_error_get_int(new_channel.status(),
                             grpc_core::StatusIntProperty::kRpcStatus,
                             &integer)) {
        status = static_cast<grpc_status_code>(integer);
      }
      // client_transport was destroyed when grpc_channel_create_internal saw
      // an error.
      grpc_transport_destroy(server_transport);
      channel = grpc_lame_client_channel_create(
          nullptr, status, "Failed to create client channel");
    } else {
      channel = new_channel->release()->c_ptr();
    }
  } else {
    GPR_ASSERT(!channel);
    gpr_log(GPR_ERROR, "Failed to create server channel: %s",
            grpc_core::StatusToString(error).c_str());
    intptr_t integer;
    grpc_status_code status = GRPC_STATUS_INTERNAL;
    if (grpc_error_get_int(error, grpc_core::StatusIntProperty::kRpcStatus,
                           &integer)) {
      status = static_cast<grpc_status_code>(integer);
    }
    grpc_transport_destroy(client_transport);
    grpc_transport_destroy(server_transport);
    channel = grpc_lame_client_channel_create(
        nullptr, status, "Failed to create server channel");
  }

  return channel;
}
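
// Example usage (sketch): given a started grpc_server*, an application can
// wire an in-process client to it with
//   grpc_channel* channel = grpc_inproc_channel_create(server, nullptr,
//                                                      nullptr);
// and release it later with grpc_channel_destroy(channel).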