1 //
2 //
3 // Copyright 2016 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #include <grpc/support/port_platform.h>
20 
21 #include "src/core/ext/transport/cronet/transport/cronet_transport.h"
22 
23 #include <stdint.h>
24 #include <stdlib.h>
25 #include <string.h>
26 
27 #include <initializer_list>
28 #include <new>
29 #include <string>
30 #include <utility>
31 
32 #include "absl/status/status.h"
33 #include "absl/strings/match.h"
34 #include "absl/strings/str_cat.h"
35 #include "absl/strings/str_format.h"
36 #include "absl/strings/string_view.h"
37 #include "absl/types/optional.h"
38 #include "third_party/objective_c/Cronet/bidirectional_stream_c.h"
39 
40 #include <grpc/slice.h>
41 #include <grpc/status.h>
42 #include <grpc/support/alloc.h>
43 #include <grpc/support/log.h>
44 #include <grpc/support/sync.h>
45 
46 #include "src/core/ext/transport/chttp2/transport/bin_decoder.h"
47 #include "src/core/ext/transport/chttp2/transport/bin_encoder.h"
48 #include "src/core/ext/transport/cronet/transport/cronet_status.h"
49 #include "src/core/lib/debug/trace.h"
50 #include "src/core/lib/gprpp/crash.h"
51 #include "src/core/lib/gprpp/debug_location.h"
52 #include "src/core/lib/gprpp/status_helper.h"
53 #include "src/core/lib/iomgr/closure.h"
54 #include "src/core/lib/iomgr/endpoint.h"
55 #include "src/core/lib/iomgr/error.h"
56 #include "src/core/lib/iomgr/exec_ctx.h"
57 #include "src/core/lib/iomgr/iomgr_fwd.h"
58 #include "src/core/lib/resource_quota/arena.h"
59 #include "src/core/lib/slice/slice.h"
60 #include "src/core/lib/slice/slice_buffer.h"
61 #include "src/core/lib/surface/validate_metadata.h"
62 #include "src/core/lib/transport/metadata_batch.h"
63 #include "src/core/lib/transport/transport.h"
64 #include "src/core/lib/transport/transport_impl.h"
65 
66 // IWYU pragma: no_include <type_traits>
67 
68 #define GRPC_HEADER_SIZE_IN_BYTES 5
69 #define GRPC_FLUSH_READ_SIZE 4096
70 
71 grpc_core::TraceFlag grpc_cronet_trace(false, "cronet");
72 #define CRONET_LOG(...)                                    \
73   do {                                                     \
74     if (grpc_cronet_trace.enabled()) gpr_log(__VA_ARGS__); \
75   } while (0)
76 
77 enum e_op_result {
78   ACTION_TAKEN_WITH_CALLBACK,
79   ACTION_TAKEN_NO_CALLBACK,
80   NO_ACTION_POSSIBLE
81 };
82 
83 enum e_op_id {
84   OP_SEND_INITIAL_METADATA = 0,
85   OP_SEND_MESSAGE,
86   OP_SEND_TRAILING_METADATA,
87   OP_RECV_MESSAGE,
88   OP_RECV_INITIAL_METADATA,
89   OP_RECV_TRAILING_METADATA,
90   OP_CANCEL_ERROR,
91   OP_ON_COMPLETE,
92   OP_FAILED,
93   OP_SUCCEEDED,
94   OP_CANCELED,
95   OP_RECV_MESSAGE_AND_ON_COMPLETE,
96   OP_READ_REQ_MADE,
97   OP_NUM_OPS
98 };
99 
100 // Cronet callbacks. See cronet_c_for_grpc.h for documentation for each.
101 
102 static void on_stream_ready(bidirectional_stream*);
103 static void on_response_headers_received(
104     bidirectional_stream*, const bidirectional_stream_header_array*,
105     const char*);
106 static void on_write_completed(bidirectional_stream*, const char*);
107 static void on_read_completed(bidirectional_stream*, char*, int);
108 static void on_response_trailers_received(
109     bidirectional_stream*, const bidirectional_stream_header_array*);
110 static void on_succeeded(bidirectional_stream*);
111 static void on_failed(bidirectional_stream*, int);
112 static void on_canceled(bidirectional_stream*);
113 static bidirectional_stream_callback cronet_callbacks = {
114     on_stream_ready,
115     on_response_headers_received,
116     on_read_completed,
117     on_write_completed,
118     on_response_trailers_received,
119     on_succeeded,
120     on_failed,
121     on_canceled};
122 
123 // Cronet transport object
124 struct grpc_cronet_transport {
125   grpc_transport base;  // must be first element in this structure
126   stream_engine* engine;
127   char* host;
128   bool use_packet_coalescing;
129 };
130 typedef struct grpc_cronet_transport grpc_cronet_transport;
131 
132 // TODO (makdharma): reorder structure for memory efficiency per
133 // http://www.catb.org/esr/structure-packing/#_structure_reordering:
134 struct read_state {
read_stateread_state135   explicit read_state(grpc_core::Arena* arena)
136       : trailing_metadata(arena), initial_metadata(arena) {}
137 
138   // vars to store data coming from server
139   char* read_buffer = nullptr;
140   bool length_field_received = false;
141   int received_bytes = 0;
142   int remaining_bytes = 0;
143   int length_field = 0;
144   bool compressed = false;
145   char grpc_header_bytes[GRPC_HEADER_SIZE_IN_BYTES] = {};
146   char* payload_field = nullptr;
147   bool read_stream_closed = false;
148 
149   // vars for holding data destined for the application
150   grpc_core::SliceBuffer read_slice_buffer;
151 
152   // vars for trailing metadata
153   grpc_metadata_batch trailing_metadata;
154   bool trailing_metadata_valid = false;
155 
156   // vars for initial metadata
157   grpc_metadata_batch initial_metadata;
158 };
159 
160 struct write_state {
161   char* write_buffer = nullptr;
162 };
163 
164 // track state of one stream op
165 struct op_state {
op_stateop_state166   explicit op_state(grpc_core::Arena* arena) : rs(arena) {}
167 
168   bool state_op_done[OP_NUM_OPS] = {};
169   bool state_callback_received[OP_NUM_OPS] = {};
170   // A non-zero gRPC status code has been seen
171   bool fail_state = false;
172   // Transport is discarding all buffered messages
173   bool flush_read = false;
174   bool flush_cronet_when_ready = false;
175   bool pending_write_for_trailer = false;
176   bool pending_send_message = false;
177   // User requested RECV_TRAILING_METADATA
178   bool pending_recv_trailing_metadata = false;
179   cronet_net_error_code net_error = OK;
180   grpc_error_handle cancel_error;
181   // data structure for storing data coming from server
182   struct read_state rs;
183   // data structure for storing data going to the server
184   struct write_state ws;
185 };
186 
187 struct stream_obj;
188 
189 struct op_and_state {
190   op_and_state(stream_obj* s, const grpc_transport_stream_op_batch& op);
191 
192   grpc_transport_stream_op_batch op;
193   struct op_state state;
194   bool done = false;
195   struct stream_obj* s;  // Pointer back to the stream object
196   // next op_and_state in the linked list
197   struct op_and_state* next = nullptr;
198 };
199 
200 struct op_storage {
201   int num_pending_ops = 0;
202   struct op_and_state* head = nullptr;
203 };
204 
205 struct stream_obj {
206   stream_obj(grpc_transport* gt, grpc_stream* gs,
207              grpc_stream_refcount* refcount, grpc_core::Arena* arena);
208   ~stream_obj();
209 
210   grpc_core::Arena* arena;
211   struct op_and_state* oas = nullptr;
212   grpc_transport_stream_op_batch* curr_op = nullptr;
213   grpc_cronet_transport* curr_ct;
214   grpc_stream* curr_gs;
215   bidirectional_stream* cbs = nullptr;
216   bidirectional_stream_header_array header_array =
217       bidirectional_stream_header_array();  // Zero-initialize the structure.
218 
219   // Stream level state. Some state will be tracked both at stream and stream_op
220   // level
221   struct op_state state;
222 
223   // OP storage
224   struct op_storage storage;
225 
226   // Mutex to protect storage
227   gpr_mu mu;
228 
229   // Refcount object of the stream
230   grpc_stream_refcount* refcount;
231 };
232 
233 #ifndef NDEBUG
234 #define GRPC_CRONET_STREAM_REF(stream, reason) \
235   grpc_cronet_stream_ref((stream), (reason))
236 #define GRPC_CRONET_STREAM_UNREF(stream, reason) \
237   grpc_cronet_stream_unref((stream), (reason))
grpc_cronet_stream_ref(stream_obj * s,const char * reason)238 void grpc_cronet_stream_ref(stream_obj* s, const char* reason) {
239   grpc_stream_ref(s->refcount, reason);
240 }
grpc_cronet_stream_unref(stream_obj * s,const char * reason)241 void grpc_cronet_stream_unref(stream_obj* s, const char* reason) {
242   grpc_stream_unref(s->refcount, reason);
243 }
244 #else
245 #define GRPC_CRONET_STREAM_REF(stream, reason) grpc_cronet_stream_ref((stream))
246 #define GRPC_CRONET_STREAM_UNREF(stream, reason) \
247   grpc_cronet_stream_unref((stream))
grpc_cronet_stream_ref(stream_obj * s)248 void grpc_cronet_stream_ref(stream_obj* s) { grpc_stream_ref(s->refcount); }
grpc_cronet_stream_unref(stream_obj * s)249 void grpc_cronet_stream_unref(stream_obj* s) { grpc_stream_unref(s->refcount); }
250 #endif
251 
252 static enum e_op_result execute_stream_op(struct op_and_state* oas);
253 
254 //
255 // Utility function to translate enum into string for printing
256 //
op_result_string(enum e_op_result i)257 static const char* op_result_string(enum e_op_result i) {
258   switch (i) {
259     case ACTION_TAKEN_WITH_CALLBACK:
260       return "ACTION_TAKEN_WITH_CALLBACK";
261     case ACTION_TAKEN_NO_CALLBACK:
262       return "ACTION_TAKEN_NO_CALLBACK";
263     case NO_ACTION_POSSIBLE:
264       return "NO_ACTION_POSSIBLE";
265   }
266   GPR_UNREACHABLE_CODE(return "UNKNOWN");
267 }
268 
op_id_string(enum e_op_id i)269 static const char* op_id_string(enum e_op_id i) {
270   switch (i) {
271     case OP_SEND_INITIAL_METADATA:
272       return "OP_SEND_INITIAL_METADATA";
273     case OP_SEND_MESSAGE:
274       return "OP_SEND_MESSAGE";
275     case OP_SEND_TRAILING_METADATA:
276       return "OP_SEND_TRAILING_METADATA";
277     case OP_RECV_MESSAGE:
278       return "OP_RECV_MESSAGE";
279     case OP_RECV_INITIAL_METADATA:
280       return "OP_RECV_INITIAL_METADATA";
281     case OP_RECV_TRAILING_METADATA:
282       return "OP_RECV_TRAILING_METADATA";
283     case OP_CANCEL_ERROR:
284       return "OP_CANCEL_ERROR";
285     case OP_ON_COMPLETE:
286       return "OP_ON_COMPLETE";
287     case OP_FAILED:
288       return "OP_FAILED";
289     case OP_SUCCEEDED:
290       return "OP_SUCCEEDED";
291     case OP_CANCELED:
292       return "OP_CANCELED";
293     case OP_RECV_MESSAGE_AND_ON_COMPLETE:
294       return "OP_RECV_MESSAGE_AND_ON_COMPLETE";
295     case OP_READ_REQ_MADE:
296       return "OP_READ_REQ_MADE";
297     case OP_NUM_OPS:
298       return "OP_NUM_OPS";
299   }
300   return "UNKNOWN";
301 }
302 
null_and_maybe_free_read_buffer(stream_obj * s)303 static void null_and_maybe_free_read_buffer(stream_obj* s) {
304   if (s->state.rs.read_buffer &&
305       s->state.rs.read_buffer != s->state.rs.grpc_header_bytes) {
306     gpr_free(s->state.rs.read_buffer);
307   }
308   s->state.rs.read_buffer = nullptr;
309 }
310 
read_grpc_header(stream_obj * s)311 static void read_grpc_header(stream_obj* s) {
312   s->state.rs.read_buffer = s->state.rs.grpc_header_bytes;
313   s->state.rs.remaining_bytes = GRPC_HEADER_SIZE_IN_BYTES;
314   s->state.rs.received_bytes = 0;
315   s->state.rs.compressed = false;
316   CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs);
317   bidirectional_stream_read(s->cbs, s->state.rs.read_buffer,
318                             s->state.rs.remaining_bytes);
319 }
320 
make_error_with_desc(int error_code,int cronet_internal_error_code,const char * desc)321 static grpc_error_handle make_error_with_desc(int error_code,
322                                               int cronet_internal_error_code,
323                                               const char* desc) {
324   return grpc_error_set_int(GRPC_ERROR_CREATE(absl::StrFormat(
325                                 "Cronet error code:%d, Cronet error detail:%s",
326                                 cronet_internal_error_code, desc)),
327                             grpc_core::StatusIntProperty::kRpcStatus,
328                             error_code);
329 }
330 
op_and_state(stream_obj * s,const grpc_transport_stream_op_batch & op)331 inline op_and_state::op_and_state(stream_obj* s,
332                                   const grpc_transport_stream_op_batch& op)
333     : op(op), state(s->arena), s(s) {}
334 
335 //
336 // Add a new stream op to op storage.
337 //
add_to_storage(struct stream_obj * s,grpc_transport_stream_op_batch * op)338 static void add_to_storage(struct stream_obj* s,
339                            grpc_transport_stream_op_batch* op) {
340   struct op_storage* storage = &s->storage;
341   // add new op at the beginning of the linked list. The memory is freed
342   // in remove_from_storage
343   op_and_state* new_op = new op_and_state(s, *op);
344   gpr_mu_lock(&s->mu);
345   new_op->next = storage->head;
346   storage->head = new_op;
347   storage->num_pending_ops++;
348   if (op->send_message) {
349     s->state.pending_send_message = true;
350   }
351   if (op->recv_trailing_metadata) {
352     s->state.pending_recv_trailing_metadata = true;
353   }
354   CRONET_LOG(GPR_DEBUG, "adding new op %p. %d in the queue.", new_op,
355              storage->num_pending_ops);
356   gpr_mu_unlock(&s->mu);
357 }
358 
359 //
360 // Traverse the linked list and delete op and free memory
361 //
remove_from_storage(struct stream_obj * s,struct op_and_state * oas)362 static void remove_from_storage(struct stream_obj* s,
363                                 struct op_and_state* oas) {
364   struct op_and_state* curr;
365   if (s->storage.head == nullptr || oas == nullptr) {
366     return;
367   }
368   if (s->storage.head == oas) {
369     s->storage.head = oas->next;
370     delete oas;
371     s->storage.num_pending_ops--;
372     CRONET_LOG(GPR_DEBUG, "Freed %p. Now %d in the queue", oas,
373                s->storage.num_pending_ops);
374   } else {
375     for (curr = s->storage.head; curr != nullptr; curr = curr->next) {
376       if (curr->next == oas) {
377         curr->next = oas->next;
378         s->storage.num_pending_ops--;
379         CRONET_LOG(GPR_DEBUG, "Freed %p. Now %d in the queue", oas,
380                    s->storage.num_pending_ops);
381         delete oas;
382         break;
383       } else if (GPR_UNLIKELY(curr->next == nullptr)) {
384         CRONET_LOG(GPR_ERROR, "Reached end of LL and did not find op to free");
385       }
386     }
387   }
388 }
389 
390 //
391 // Cycle through ops and try to take next action. Break when either
392 // an action with callback is taken, or no action is possible.
393 // This can get executed from the Cronet network thread via cronet callback
394 // or on the application supplied thread via the perform_stream_op function.
395 //
execute_from_storage(stream_obj * s)396 static void execute_from_storage(stream_obj* s) {
397   gpr_mu_lock(&s->mu);
398   for (struct op_and_state* curr = s->storage.head; curr != nullptr;) {
399     CRONET_LOG(GPR_DEBUG, "calling op at %p. done = %d", curr, curr->done);
400     GPR_ASSERT(!curr->done);
401     enum e_op_result result = execute_stream_op(curr);
402     CRONET_LOG(GPR_DEBUG, "execute_stream_op[%p] returns %s", curr,
403                op_result_string(result));
404     // if this op is done, then remove it and free memory
405     if (curr->done) {
406       struct op_and_state* next = curr->next;
407       remove_from_storage(s, curr);
408       curr = next;
409     } else if (result == NO_ACTION_POSSIBLE) {
410       curr = curr->next;
411     } else if (result == ACTION_TAKEN_WITH_CALLBACK) {
412       // wait for the callback
413       break;
414     }  // continue processing the same op if ACTION_TAKEN_WITHOUT_CALLBACK
415   }
416   gpr_mu_unlock(&s->mu);
417 }
418 
convert_cronet_array_to_metadata(const bidirectional_stream_header_array * header_array,grpc_metadata_batch * mds)419 static void convert_cronet_array_to_metadata(
420     const bidirectional_stream_header_array* header_array,
421     grpc_metadata_batch* mds) {
422   for (size_t i = 0; i < header_array->count; i++) {
423     CRONET_LOG(GPR_DEBUG, "header key=%s, value=%s",
424                header_array->headers[i].key, header_array->headers[i].value);
425     grpc_slice value;
426     if (absl::EndsWith(header_array->headers[i].key, "-bin")) {
427       value = grpc_slice_from_static_string(header_array->headers[i].value);
428       value = grpc_chttp2_base64_decode_with_length(
429           value, grpc_chttp2_base64_infer_length_after_decode(value));
430     } else {
431       value = grpc_slice_from_static_string(header_array->headers[i].value);
432     }
433     mds->Append(header_array->headers[i].key, grpc_core::Slice(value),
434                 [&](absl::string_view error, const grpc_core::Slice& value) {
435                   gpr_log(GPR_DEBUG, "Failed to parse metadata: %s",
436                           absl::StrCat("key=", header_array->headers[i].key,
437                                        " error=", error,
438                                        " value=", value.as_string_view())
439                               .c_str());
440                 });
441   }
442 }
443 
444 //
445 // Cronet callback
446 //
on_failed(bidirectional_stream * stream,int net_error)447 static void on_failed(bidirectional_stream* stream, int net_error) {
448   gpr_log(GPR_ERROR, "on_failed(%p, %d)", stream, net_error);
449   grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
450   grpc_core::ExecCtx exec_ctx;
451 
452   stream_obj* s = static_cast<stream_obj*>(stream->annotation);
453   gpr_mu_lock(&s->mu);
454   bidirectional_stream_destroy(s->cbs);
455   s->state.state_callback_received[OP_FAILED] = true;
456   s->state.net_error = static_cast<cronet_net_error_code>(net_error);
457   s->cbs = nullptr;
458   if (s->header_array.headers) {
459     gpr_free(s->header_array.headers);
460     s->header_array.headers = nullptr;
461   }
462   if (s->state.ws.write_buffer) {
463     gpr_free(s->state.ws.write_buffer);
464     s->state.ws.write_buffer = nullptr;
465   }
466   null_and_maybe_free_read_buffer(s);
467   gpr_mu_unlock(&s->mu);
468   execute_from_storage(s);
469   GRPC_CRONET_STREAM_UNREF(s, "cronet transport");
470 }
471 
472 //
473 // Cronet callback
474 //
on_canceled(bidirectional_stream * stream)475 static void on_canceled(bidirectional_stream* stream) {
476   CRONET_LOG(GPR_DEBUG, "on_canceled(%p)", stream);
477   grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
478   grpc_core::ExecCtx exec_ctx;
479 
480   stream_obj* s = static_cast<stream_obj*>(stream->annotation);
481   gpr_mu_lock(&s->mu);
482   bidirectional_stream_destroy(s->cbs);
483   s->state.state_callback_received[OP_CANCELED] = true;
484   s->cbs = nullptr;
485   if (s->header_array.headers) {
486     gpr_free(s->header_array.headers);
487     s->header_array.headers = nullptr;
488   }
489   if (s->state.ws.write_buffer) {
490     gpr_free(s->state.ws.write_buffer);
491     s->state.ws.write_buffer = nullptr;
492   }
493   null_and_maybe_free_read_buffer(s);
494   gpr_mu_unlock(&s->mu);
495   execute_from_storage(s);
496   GRPC_CRONET_STREAM_UNREF(s, "cronet transport");
497 }
498 
499 //
500 // Cronet callback
501 //
on_succeeded(bidirectional_stream * stream)502 static void on_succeeded(bidirectional_stream* stream) {
503   CRONET_LOG(GPR_DEBUG, "on_succeeded(%p)", stream);
504   grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
505   grpc_core::ExecCtx exec_ctx;
506 
507   stream_obj* s = static_cast<stream_obj*>(stream->annotation);
508   gpr_mu_lock(&s->mu);
509   bidirectional_stream_destroy(s->cbs);
510   s->state.state_callback_received[OP_SUCCEEDED] = true;
511   s->cbs = nullptr;
512   null_and_maybe_free_read_buffer(s);
513   gpr_mu_unlock(&s->mu);
514   execute_from_storage(s);
515   GRPC_CRONET_STREAM_UNREF(s, "cronet transport");
516 }
517 
518 //
519 // Cronet callback
520 //
on_stream_ready(bidirectional_stream * stream)521 static void on_stream_ready(bidirectional_stream* stream) {
522   CRONET_LOG(GPR_DEBUG, "W: on_stream_ready(%p)", stream);
523   grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
524   grpc_core::ExecCtx exec_ctx;
525   stream_obj* s = static_cast<stream_obj*>(stream->annotation);
526   grpc_cronet_transport* t = s->curr_ct;
527   gpr_mu_lock(&s->mu);
528   s->state.state_op_done[OP_SEND_INITIAL_METADATA] = true;
529   s->state.state_callback_received[OP_SEND_INITIAL_METADATA] = true;
530   // Free the memory allocated for headers
531   if (s->header_array.headers) {
532     gpr_free(s->header_array.headers);
533     s->header_array.headers = nullptr;
534   }
535   // Send the initial metadata on wire if there is no SEND_MESSAGE or
536   // SEND_TRAILING_METADATA ops pending
537   if (t->use_packet_coalescing) {
538     if (s->state.flush_cronet_when_ready) {
539       CRONET_LOG(GPR_DEBUG, "cronet_bidirectional_stream_flush (%p)", s->cbs);
540       bidirectional_stream_flush(stream);
541     }
542   }
543   gpr_mu_unlock(&s->mu);
544   execute_from_storage(s);
545 }
546 
547 //
548 // Cronet callback
549 //
on_response_headers_received(bidirectional_stream * stream,const bidirectional_stream_header_array * headers,const char * negotiated_protocol)550 static void on_response_headers_received(
551     bidirectional_stream* stream,
552     const bidirectional_stream_header_array* headers,
553     const char* negotiated_protocol) {
554   grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
555   grpc_core::ExecCtx exec_ctx;
556   CRONET_LOG(GPR_DEBUG, "R: on_response_headers_received(%p, %p, %s)", stream,
557              headers, negotiated_protocol);
558   stream_obj* s = static_cast<stream_obj*>(stream->annotation);
559 
560   // Identify if this is a header or a trailer (in a trailer-only response case)
561   //
562   for (size_t i = 0; i < headers->count; i++) {
563     if (0 == strcmp("grpc-status", headers->headers[i].key)) {
564       on_response_trailers_received(stream, headers);
565 
566       // Do an extra read for a trailer-only stream to trigger on_succeeded()
567       // callback
568       read_grpc_header(s);
569       return;
570     }
571   }
572 
573   gpr_mu_lock(&s->mu);
574   convert_cronet_array_to_metadata(headers, &s->state.rs.initial_metadata);
575   s->state.state_callback_received[OP_RECV_INITIAL_METADATA] = true;
576   if (!(s->state.state_op_done[OP_CANCEL_ERROR] ||
577         s->state.state_callback_received[OP_FAILED])) {
578     // Do an extra read to trigger on_succeeded() callback in case connection
579     // is closed
580     GPR_ASSERT(s->state.rs.length_field_received == false);
581     read_grpc_header(s);
582   }
583   gpr_mu_unlock(&s->mu);
584   execute_from_storage(s);
585 }
586 
587 //
588 // Cronet callback
589 //
on_write_completed(bidirectional_stream * stream,const char * data)590 static void on_write_completed(bidirectional_stream* stream, const char* data) {
591   grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
592   grpc_core::ExecCtx exec_ctx;
593   stream_obj* s = static_cast<stream_obj*>(stream->annotation);
594   CRONET_LOG(GPR_DEBUG, "W: on_write_completed(%p, %s)", stream, data);
595   gpr_mu_lock(&s->mu);
596   if (s->state.ws.write_buffer) {
597     gpr_free(s->state.ws.write_buffer);
598     s->state.ws.write_buffer = nullptr;
599   }
600   s->state.state_callback_received[OP_SEND_MESSAGE] = true;
601   gpr_mu_unlock(&s->mu);
602   execute_from_storage(s);
603 }
604 
605 //
606 // Cronet callback
607 //
on_read_completed(bidirectional_stream * stream,char * data,int count)608 static void on_read_completed(bidirectional_stream* stream, char* data,
609                               int count) {
610   grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
611   grpc_core::ExecCtx exec_ctx;
612   stream_obj* s = static_cast<stream_obj*>(stream->annotation);
613   CRONET_LOG(GPR_DEBUG, "R: on_read_completed(%p, %p, %d)", stream, data,
614              count);
615   gpr_mu_lock(&s->mu);
616   s->state.state_callback_received[OP_RECV_MESSAGE] = true;
617   if (count > 0 && s->state.flush_read) {
618     CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs);
619     bidirectional_stream_read(s->cbs, s->state.rs.read_buffer,
620                               GRPC_FLUSH_READ_SIZE);
621     gpr_mu_unlock(&s->mu);
622   } else if (count > 0) {
623     s->state.rs.received_bytes += count;
624     s->state.rs.remaining_bytes -= count;
625     if (s->state.rs.remaining_bytes > 0) {
626       CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs);
627       s->state.state_op_done[OP_READ_REQ_MADE] = true;
628       bidirectional_stream_read(
629           s->cbs, s->state.rs.read_buffer + s->state.rs.received_bytes,
630           s->state.rs.remaining_bytes);
631       gpr_mu_unlock(&s->mu);
632     } else {
633       gpr_mu_unlock(&s->mu);
634       execute_from_storage(s);
635     }
636   } else {
637     null_and_maybe_free_read_buffer(s);
638     s->state.rs.read_stream_closed = true;
639     gpr_mu_unlock(&s->mu);
640     execute_from_storage(s);
641   }
642 }
643 
644 //
645 // Cronet callback
646 //
on_response_trailers_received(bidirectional_stream * stream,const bidirectional_stream_header_array * trailers)647 static void on_response_trailers_received(
648     bidirectional_stream* stream,
649     const bidirectional_stream_header_array* trailers) {
650   grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
651   grpc_core::ExecCtx exec_ctx;
652   CRONET_LOG(GPR_DEBUG, "R: on_response_trailers_received(%p,%p)", stream,
653              trailers);
654   stream_obj* s = static_cast<stream_obj*>(stream->annotation);
655   grpc_cronet_transport* t = s->curr_ct;
656   gpr_mu_lock(&s->mu);
657   s->state.rs.trailing_metadata_valid = false;
658   convert_cronet_array_to_metadata(trailers, &s->state.rs.trailing_metadata);
659   if (trailers->count > 0) {
660     s->state.rs.trailing_metadata_valid = true;
661   }
662   s->state.state_callback_received[OP_RECV_TRAILING_METADATA] = true;
663   // Send a EOS when server terminates the stream (testServerFinishesRequest) to
664   // trigger on_succeeded
665   if (!s->state.state_op_done[OP_SEND_TRAILING_METADATA] &&
666       !(s->state.state_op_done[OP_CANCEL_ERROR] ||
667         s->state.state_callback_received[OP_FAILED])) {
668     CRONET_LOG(GPR_DEBUG, "bidirectional_stream_write (%p, 0)", s->cbs);
669     s->state.state_callback_received[OP_SEND_MESSAGE] = false;
670     bidirectional_stream_write(s->cbs, "", 0, true);
671     if (t->use_packet_coalescing) {
672       CRONET_LOG(GPR_DEBUG, "bidirectional_stream_flush (%p)", s->cbs);
673       bidirectional_stream_flush(s->cbs);
674     }
675     s->state.state_op_done[OP_SEND_TRAILING_METADATA] = true;
676 
677     gpr_mu_unlock(&s->mu);
678   } else {
679     gpr_mu_unlock(&s->mu);
680     execute_from_storage(s);
681   }
682 }
683 
684 //
685 // Utility function that takes the data from s->write_slice_buffer and assembles
686 // into a contiguous byte stream with 5 byte gRPC header prepended.
687 //
create_grpc_frame(grpc_slice_buffer * write_slice_buffer,char ** pp_write_buffer,size_t * p_write_buffer_size,uint32_t flags)688 static void create_grpc_frame(grpc_slice_buffer* write_slice_buffer,
689                               char** pp_write_buffer,
690                               size_t* p_write_buffer_size, uint32_t flags) {
691   size_t length = write_slice_buffer->length;
692   *p_write_buffer_size = length + GRPC_HEADER_SIZE_IN_BYTES;
693   // This is freed in the on_write_completed callback
694   char* write_buffer =
695       static_cast<char*>(gpr_malloc(length + GRPC_HEADER_SIZE_IN_BYTES));
696   *pp_write_buffer = write_buffer;
697   uint8_t* p = reinterpret_cast<uint8_t*>(write_buffer);
698   // Append 5 byte header
699   // Compressed flag
700   *p++ = static_cast<uint8_t>((flags & GRPC_WRITE_INTERNAL_COMPRESS) ? 1 : 0);
701   // Message length
702   *p++ = static_cast<uint8_t>(length >> 24);
703   *p++ = static_cast<uint8_t>(length >> 16);
704   *p++ = static_cast<uint8_t>(length >> 8);
705   *p++ = static_cast<uint8_t>(length);
706   // append actual data
707   size_t offset = 0;
708   for (size_t i = 0; i < write_slice_buffer->count; ++i) {
709     memcpy(p + offset, GRPC_SLICE_START_PTR(write_slice_buffer->slices[i]),
710            GRPC_SLICE_LENGTH(write_slice_buffer->slices[i]));
711     offset += GRPC_SLICE_LENGTH(write_slice_buffer->slices[i]);
712   }
713 }
714 
715 namespace {
716 class CronetMetadataEncoder {
717  public:
CronetMetadataEncoder(bidirectional_stream_header ** pp_headers,size_t * p_count,const char * host,size_t capacity,const char ** method,std::string * url)718   explicit CronetMetadataEncoder(bidirectional_stream_header** pp_headers,
719                                  size_t* p_count, const char* host,
720                                  size_t capacity, const char** method,
721                                  std::string* url)
722       : host_(host),
723         capacity_(capacity),
724         count_(*p_count),
725         headers_(*pp_headers),
726         method_(method),
727         url_(url) {
728     count_ = 0;
729     headers_ = static_cast<bidirectional_stream_header*>(
730         gpr_malloc(sizeof(bidirectional_stream_header) * capacity_));
731   }
732 
733   CronetMetadataEncoder(const CronetMetadataEncoder&) = delete;
734   CronetMetadataEncoder& operator=(const CronetMetadataEncoder&) = delete;
735 
736   template <class T, class V>
Encode(T,const V & value)737   void Encode(T, const V& value) {
738     Encode(grpc_core::Slice::FromStaticString(T::key()),
739            grpc_core::Slice(T::Encode(value)));
740   }
741 
Encode(grpc_core::HttpSchemeMetadata,grpc_core::HttpSchemeMetadata::ValueType)742   void Encode(grpc_core::HttpSchemeMetadata,
743               grpc_core::HttpSchemeMetadata::ValueType) {
744     // Cronet populates these fields on its own
745   }
Encode(grpc_core::HttpAuthorityMetadata,const grpc_core::HttpAuthorityMetadata::ValueType &)746   void Encode(grpc_core::HttpAuthorityMetadata,
747               const grpc_core::HttpAuthorityMetadata::ValueType&) {
748     // Cronet populates these fields on its own
749   }
750 
Encode(grpc_core::HttpMethodMetadata,grpc_core::HttpMethodMetadata::ValueType method)751   void Encode(grpc_core::HttpMethodMetadata,
752               grpc_core::HttpMethodMetadata::ValueType method) {
753     switch (method) {
754       case grpc_core::HttpMethodMetadata::kPost:
755         *method_ = "POST";
756         break;
757       case grpc_core::HttpMethodMetadata::kInvalid:
758       case grpc_core::HttpMethodMetadata::kGet:
759       case grpc_core::HttpMethodMetadata::kPut:
760         abort();
761     }
762   }
763 
Encode(grpc_core::HttpPathMetadata,const grpc_core::HttpPathMetadata::ValueType & path)764   void Encode(grpc_core::HttpPathMetadata,
765               const grpc_core::HttpPathMetadata::ValueType& path) {
766     // Create URL by appending :path value to the hostname
767     *url_ = absl::StrCat("https://", host_, path.as_string_view());
768   }
769 
Encode(const grpc_core::Slice & key_slice,const grpc_core::Slice & value_slice)770   void Encode(const grpc_core::Slice& key_slice,
771               const grpc_core::Slice& value_slice) {
772     char* key = grpc_slice_to_c_string(key_slice.c_slice());
773     char* value;
774     if (grpc_is_binary_header_internal(key_slice.c_slice())) {
775       grpc_slice wire_value = grpc_chttp2_base64_encode(value_slice.c_slice());
776       value = grpc_slice_to_c_string(wire_value);
777       grpc_core::CSliceUnref(wire_value);
778     } else {
779       value = grpc_slice_to_c_string(value_slice.c_slice());
780     }
781     CRONET_LOG(GPR_DEBUG, "header %s = %s", key, value);
782     GPR_ASSERT(count_ < capacity_);
783     headers_[count_].key = key;
784     headers_[count_].value = value;
785     ++count_;
786   }
787 
788  private:
789   const char* host_;
790   size_t capacity_;
791   size_t& count_;
792   bidirectional_stream_header*& headers_;
793   const char** method_;
794   std::string* url_;
795 };
796 }  // namespace
797 
798 //
799 // Convert metadata in a format that Cronet can consume
800 //
convert_metadata_to_cronet_headers(grpc_metadata_batch * metadata,const char * host,std::string * pp_url,bidirectional_stream_header ** pp_headers,size_t * p_num_headers,const char ** method)801 static void convert_metadata_to_cronet_headers(
802     grpc_metadata_batch* metadata, const char* host, std::string* pp_url,
803     bidirectional_stream_header** pp_headers, size_t* p_num_headers,
804     const char** method) {
805   CronetMetadataEncoder encoder(pp_headers, p_num_headers, host,
806                                 metadata->count(), method, pp_url);
807   metadata->Encode(&encoder);
808 }
809 
parse_grpc_header(const uint8_t * data,int * length,bool * compressed)810 static void parse_grpc_header(const uint8_t* data, int* length,
811                               bool* compressed) {
812   const uint8_t c = *data;
813   const uint8_t* p = data + 1;
814   *compressed = ((c & 0x01) == 0x01);
815   *length = 0;
816   *length |= (*p++) << 24;
817   *length |= (*p++) << 16;
818   *length |= (*p++) << 8;
819   *length |= (*p++);
820 }
821 
header_has_authority(const grpc_metadata_batch * b)822 static bool header_has_authority(const grpc_metadata_batch* b) {
823   return b->get_pointer(grpc_core::HttpAuthorityMetadata()) != nullptr;
824 }
825 
826 //
827 // Op Execution: Decide if one of the actions contained in the stream op can be
828 // executed. This is the heart of the state machine.
829 //
op_can_be_run(grpc_transport_stream_op_batch * curr_op,struct stream_obj * s,struct op_state * op_state,enum e_op_id op_id)830 static bool op_can_be_run(grpc_transport_stream_op_batch* curr_op,
831                           struct stream_obj* s, struct op_state* op_state,
832                           enum e_op_id op_id) {
833   struct op_state* stream_state = &s->state;
834   grpc_cronet_transport* t = s->curr_ct;
835   bool result = true;
836   // When call is canceled, every op can be run, except under following
837   // conditions
838   //
839   bool is_canceled_or_failed = stream_state->state_op_done[OP_CANCEL_ERROR] ||
840                                stream_state->state_callback_received[OP_FAILED];
841   if (is_canceled_or_failed) {
842     if (op_id == OP_SEND_INITIAL_METADATA) {
843       CRONET_LOG(GPR_DEBUG, "Because");
844       result = false;
845     }
846     if (op_id == OP_SEND_MESSAGE) {
847       CRONET_LOG(GPR_DEBUG, "Because");
848       result = false;
849     }
850     if (op_id == OP_SEND_TRAILING_METADATA) {
851       CRONET_LOG(GPR_DEBUG, "Because");
852       result = false;
853     }
854     if (op_id == OP_CANCEL_ERROR) {
855       CRONET_LOG(GPR_DEBUG, "Because");
856       result = false;
857     }
858     // already executed
859     if (op_id == OP_RECV_INITIAL_METADATA &&
860         stream_state->state_op_done[OP_RECV_INITIAL_METADATA]) {
861       CRONET_LOG(GPR_DEBUG, "Because");
862       result = false;
863     }
864     if (op_id == OP_RECV_MESSAGE && op_state->state_op_done[OP_RECV_MESSAGE]) {
865       CRONET_LOG(GPR_DEBUG, "Because");
866       result = false;
867     }
868     if (op_id == OP_RECV_TRAILING_METADATA &&
869         stream_state->state_op_done[OP_RECV_TRAILING_METADATA]) {
870       CRONET_LOG(GPR_DEBUG, "Because");
871       result = false;
872     }
873     // ON_COMPLETE can be processed if one of the following conditions is met:
874     // 1. the stream failed
875     // 2. the stream is cancelled, and the callback is received
876     // 3. the stream succeeded before cancel is effective
877     // 4. the stream is cancelled, and the stream is never started
878     if (op_id == OP_ON_COMPLETE &&
879         !(stream_state->state_callback_received[OP_FAILED] ||
880           stream_state->state_callback_received[OP_CANCELED] ||
881           stream_state->state_callback_received[OP_SUCCEEDED] ||
882           !stream_state->state_op_done[OP_SEND_INITIAL_METADATA])) {
883       CRONET_LOG(GPR_DEBUG, "Because");
884       result = false;
885     }
886   } else if (op_id == OP_SEND_INITIAL_METADATA) {
887     // already executed
888     if (stream_state->state_op_done[OP_SEND_INITIAL_METADATA]) result = false;
889   } else if (op_id == OP_RECV_INITIAL_METADATA) {
890     if (stream_state->state_op_done[OP_RECV_INITIAL_METADATA]) {
891       // already executed
892       result = false;
893     } else if (!stream_state
894                     ->state_callback_received[OP_SEND_INITIAL_METADATA]) {
895       // we haven't sent headers yet.
896       result = false;
897     } else if (!stream_state
898                     ->state_callback_received[OP_RECV_INITIAL_METADATA] &&
899                !stream_state->state_op_done[OP_RECV_TRAILING_METADATA]) {
900       // we haven't received headers yet.
901       result = false;
902     }
903   } else if (op_id == OP_SEND_MESSAGE) {
904     if (op_state->state_op_done[OP_SEND_MESSAGE]) {
905       // already executed (note we're checking op specific state, not stream
906       // state)
907       result = false;
908     } else if (!stream_state
909                     ->state_callback_received[OP_SEND_INITIAL_METADATA]) {
910       // we haven't sent headers yet.
911       result = false;
912     }
913   } else if (op_id == OP_RECV_MESSAGE) {
914     if (op_state->state_op_done[OP_RECV_MESSAGE]) {
915       // already executed
916       result = false;
917     } else if (!stream_state
918                     ->state_callback_received[OP_RECV_INITIAL_METADATA] &&
919                !stream_state->state_op_done[OP_RECV_TRAILING_METADATA]) {
920       // we haven't received headers yet.
921       result = false;
922     }
923   } else if (op_id == OP_RECV_TRAILING_METADATA) {
924     if (stream_state->state_op_done[OP_RECV_TRAILING_METADATA]) {
925       // already executed
926       result = false;
927     } else if (stream_state->state_op_done[OP_READ_REQ_MADE] &&
928                !stream_state->state_op_done[OP_RECV_MESSAGE]) {
929       // we have asked for but haven't received message yet.
930       result = false;
931     } else if (!stream_state
932                     ->state_callback_received[OP_RECV_TRAILING_METADATA]) {
933       // we haven't received trailers  yet.
934       result = false;
935     } else if (!stream_state->state_callback_received[OP_SUCCEEDED]) {
936       // we haven't received on_succeeded  yet.
937       result = false;
938     }
939   } else if (op_id == OP_SEND_TRAILING_METADATA) {
940     if (stream_state->state_op_done[OP_SEND_TRAILING_METADATA]) {
941       // already executed
942       result = false;
943     } else if (!stream_state
944                     ->state_callback_received[OP_SEND_INITIAL_METADATA]) {
945       // we haven't sent initial metadata yet
946       result = false;
947     } else if (stream_state->pending_send_message &&
948                !stream_state->state_op_done[OP_SEND_MESSAGE]) {
949       // we haven't sent message yet
950       result = false;
951     } else if (stream_state->state_op_done[OP_SEND_MESSAGE] &&
952                !stream_state->state_callback_received[OP_SEND_MESSAGE] &&
953                !(t->use_packet_coalescing &&
954                  stream_state->pending_write_for_trailer)) {
955       // we haven't got on_write_completed for the send yet
956       result = false;
957     }
958   } else if (op_id == OP_CANCEL_ERROR) {
959     // already executed
960     if (stream_state->state_op_done[OP_CANCEL_ERROR]) result = false;
961   } else if (op_id == OP_ON_COMPLETE) {
962     if (op_state->state_op_done[OP_ON_COMPLETE]) {
963       // already executed (note we're checking op specific state, not stream
964       // state)
965       CRONET_LOG(GPR_DEBUG, "Because");
966       result = false;
967     }
968     // Check if every op that was asked for is done.
969     // TODO(muxi): We should not consider the recv ops here, since they
970     // have their own callbacks.  We should invoke a batch's on_complete
971     // as soon as all of the batch's send ops are complete, even if
972     // there are still recv ops pending.
973     else if (curr_op->send_initial_metadata &&
974              !stream_state->state_callback_received[OP_SEND_INITIAL_METADATA]) {
975       CRONET_LOG(GPR_DEBUG, "Because");
976       result = false;
977     } else if (curr_op->send_message &&
978                !op_state->state_op_done[OP_SEND_MESSAGE]) {
979       CRONET_LOG(GPR_DEBUG, "Because");
980       result = false;
981     } else if (curr_op->send_message &&
982                !stream_state->state_callback_received[OP_SEND_MESSAGE]) {
983       CRONET_LOG(GPR_DEBUG, "Because");
984       result = false;
985     } else if (curr_op->send_trailing_metadata &&
986                !stream_state->state_op_done[OP_SEND_TRAILING_METADATA]) {
987       CRONET_LOG(GPR_DEBUG, "Because");
988       result = false;
989     } else if (curr_op->recv_initial_metadata &&
990                !stream_state->state_op_done[OP_RECV_INITIAL_METADATA]) {
991       CRONET_LOG(GPR_DEBUG, "Because");
992       result = false;
993     } else if (curr_op->recv_message &&
994                !op_state->state_op_done[OP_RECV_MESSAGE]) {
995       CRONET_LOG(GPR_DEBUG, "Because");
996       result = false;
997     } else if (curr_op->cancel_stream &&
998                !stream_state->state_callback_received[OP_CANCELED]) {
999       CRONET_LOG(GPR_DEBUG, "Because");
1000       result = false;
1001     } else if (curr_op->recv_trailing_metadata) {
1002       // We aren't done with trailing metadata yet
1003       if (!stream_state->state_op_done[OP_RECV_TRAILING_METADATA]) {
1004         CRONET_LOG(GPR_DEBUG, "Because");
1005         result = false;
1006       }
1007       // We've asked for actual message in an earlier op, and it hasn't been
1008       // delivered yet.
1009       else if (stream_state->state_op_done[OP_READ_REQ_MADE]) {
1010         // If this op is not the one asking for read, (which means some earlier
1011         // op has asked), and the read hasn't been delivered.
1012         if (!curr_op->recv_message &&
1013             !stream_state->state_callback_received[OP_SUCCEEDED]) {
1014           CRONET_LOG(GPR_DEBUG, "Because");
1015           result = false;
1016         }
1017       }
1018     }
1019     // We should see at least one on_write_completed for the trailers that we
1020     // sent
1021     else if (curr_op->send_trailing_metadata &&
1022              !stream_state->state_callback_received[OP_SEND_MESSAGE]) {
1023       result = false;
1024     }
1025   }
1026   CRONET_LOG(GPR_DEBUG, "op_can_be_run %s : %s", op_id_string(op_id),
1027              result ? "YES" : "NO");
1028   return result;
1029 }
1030 
1031 //
1032 // TODO (makdharma): Break down this function in smaller chunks for readability.
1033 //
execute_stream_op(struct op_and_state * oas)1034 static enum e_op_result execute_stream_op(struct op_and_state* oas) {
1035   grpc_transport_stream_op_batch* stream_op = &oas->op;
1036   struct stream_obj* s = oas->s;
1037   grpc_cronet_transport* t = s->curr_ct;
1038   struct op_state* stream_state = &s->state;
1039   enum e_op_result result = NO_ACTION_POSSIBLE;
1040   if (stream_op->send_initial_metadata &&
1041       op_can_be_run(stream_op, s, &oas->state, OP_SEND_INITIAL_METADATA)) {
1042     CRONET_LOG(GPR_DEBUG, "running: %p OP_SEND_INITIAL_METADATA", oas);
1043     // Start new cronet stream. It is destroyed in on_succeeded, on_canceled,
1044     // on_failed
1045     GPR_ASSERT(s->cbs == nullptr);
1046     GPR_ASSERT(!stream_state->state_op_done[OP_SEND_INITIAL_METADATA]);
1047     s->cbs =
1048         bidirectional_stream_create(t->engine, s->curr_gs, &cronet_callbacks);
1049     CRONET_LOG(GPR_DEBUG, "%p = bidirectional_stream_create()", s->cbs);
1050     if (t->use_packet_coalescing) {
1051       bidirectional_stream_disable_auto_flush(s->cbs, true);
1052       bidirectional_stream_delay_request_headers_until_flush(s->cbs, true);
1053     }
1054     std::string url;
1055     const char* method = "POST";
1056     s->header_array.headers = nullptr;
1057     convert_metadata_to_cronet_headers(
1058         stream_op->payload->send_initial_metadata.send_initial_metadata,
1059         t->host, &url, &s->header_array.headers, &s->header_array.count,
1060         &method);
1061     s->header_array.capacity = s->header_array.count;
1062     CRONET_LOG(GPR_DEBUG, "bidirectional_stream_start(%p, %s)", s->cbs,
1063                url.c_str());
1064     bidirectional_stream_start(s->cbs, url.c_str(), 0, method, &s->header_array,
1065                                false);
1066     unsigned int header_index;
1067     for (header_index = 0; header_index < s->header_array.count;
1068          header_index++) {
1069       gpr_free(const_cast<char*>(s->header_array.headers[header_index].key));
1070       gpr_free(const_cast<char*>(s->header_array.headers[header_index].value));
1071     }
1072     stream_state->state_op_done[OP_SEND_INITIAL_METADATA] = true;
1073     if (t->use_packet_coalescing) {
1074       if (!stream_op->send_message && !stream_op->send_trailing_metadata) {
1075         s->state.flush_cronet_when_ready = true;
1076       }
1077     }
1078     result = ACTION_TAKEN_WITH_CALLBACK;
1079   } else if (stream_op->send_message &&
1080              op_can_be_run(stream_op, s, &oas->state, OP_SEND_MESSAGE)) {
1081     CRONET_LOG(GPR_DEBUG, "running: %p  OP_SEND_MESSAGE", oas);
1082     stream_state->pending_send_message = false;
1083     if (stream_state->state_op_done[OP_CANCEL_ERROR] ||
1084         stream_state->state_callback_received[OP_FAILED] ||
1085         stream_state->state_callback_received[OP_SUCCEEDED]) {
1086       result = NO_ACTION_POSSIBLE;
1087       CRONET_LOG(GPR_DEBUG, "Stream is either cancelled, failed or finished");
1088     } else {
1089       size_t write_buffer_size;
1090       create_grpc_frame(
1091           stream_op->payload->send_message.send_message->c_slice_buffer(),
1092           &stream_state->ws.write_buffer, &write_buffer_size,
1093           stream_op->payload->send_message.flags);
1094       if (write_buffer_size > 0) {
1095         CRONET_LOG(GPR_DEBUG, "bidirectional_stream_write (%p, %p)", s->cbs,
1096                    stream_state->ws.write_buffer);
1097         stream_state->state_callback_received[OP_SEND_MESSAGE] = false;
1098         bidirectional_stream_write(s->cbs, stream_state->ws.write_buffer,
1099                                    static_cast<int>(write_buffer_size), false);
1100         if (t->use_packet_coalescing) {
1101           if (!stream_op->send_trailing_metadata) {
1102             CRONET_LOG(GPR_DEBUG, "bidirectional_stream_flush (%p)", s->cbs);
1103             bidirectional_stream_flush(s->cbs);
1104             result = ACTION_TAKEN_WITH_CALLBACK;
1105           } else {
1106             stream_state->pending_write_for_trailer = true;
1107             result = ACTION_TAKEN_NO_CALLBACK;
1108           }
1109         } else {
1110           result = ACTION_TAKEN_WITH_CALLBACK;
1111         }
1112       } else {
1113         // Should never reach here
1114         grpc_core::Crash("unreachable");
1115       }
1116     }
1117     stream_state->state_op_done[OP_SEND_MESSAGE] = true;
1118     oas->state.state_op_done[OP_SEND_MESSAGE] = true;
1119   } else if (stream_op->send_trailing_metadata &&
1120              op_can_be_run(stream_op, s, &oas->state,
1121                            OP_SEND_TRAILING_METADATA)) {
1122     CRONET_LOG(GPR_DEBUG, "running: %p  OP_SEND_TRAILING_METADATA", oas);
1123     if (stream_state->state_op_done[OP_CANCEL_ERROR] ||
1124         stream_state->state_callback_received[OP_FAILED] ||
1125         stream_state->state_callback_received[OP_SUCCEEDED]) {
1126       result = NO_ACTION_POSSIBLE;
1127       CRONET_LOG(GPR_DEBUG, "Stream is either cancelled, failed or finished");
1128     } else {
1129       CRONET_LOG(GPR_DEBUG, "bidirectional_stream_write (%p, 0)", s->cbs);
1130       stream_state->state_callback_received[OP_SEND_MESSAGE] = false;
1131       bidirectional_stream_write(s->cbs, "", 0, true);
1132       if (t->use_packet_coalescing) {
1133         CRONET_LOG(GPR_DEBUG, "bidirectional_stream_flush (%p)", s->cbs);
1134         bidirectional_stream_flush(s->cbs);
1135       }
1136       result = ACTION_TAKEN_WITH_CALLBACK;
1137     }
1138     stream_state->state_op_done[OP_SEND_TRAILING_METADATA] = true;
1139   } else if (stream_op->recv_initial_metadata &&
1140              op_can_be_run(stream_op, s, &oas->state,
1141                            OP_RECV_INITIAL_METADATA)) {
1142     CRONET_LOG(GPR_DEBUG, "running: %p  OP_RECV_INITIAL_METADATA", oas);
1143     if (stream_state->state_op_done[OP_CANCEL_ERROR]) {
1144       grpc_core::ExecCtx::Run(
1145           DEBUG_LOCATION,
1146           stream_op->payload->recv_initial_metadata.recv_initial_metadata_ready,
1147           absl::OkStatus());
1148     } else if (stream_state->state_callback_received[OP_FAILED]) {
1149       grpc_core::ExecCtx::Run(
1150           DEBUG_LOCATION,
1151           stream_op->payload->recv_initial_metadata.recv_initial_metadata_ready,
1152           absl::OkStatus());
1153     } else if (stream_state->state_op_done[OP_RECV_TRAILING_METADATA]) {
1154       grpc_core::ExecCtx::Run(
1155           DEBUG_LOCATION,
1156           stream_op->payload->recv_initial_metadata.recv_initial_metadata_ready,
1157           absl::OkStatus());
1158     } else {
1159       *stream_op->payload->recv_initial_metadata.recv_initial_metadata =
1160           std::move(oas->s->state.rs.initial_metadata);
1161       grpc_core::ExecCtx::Run(
1162           DEBUG_LOCATION,
1163           stream_op->payload->recv_initial_metadata.recv_initial_metadata_ready,
1164           absl::OkStatus());
1165     }
1166     stream_state->state_op_done[OP_RECV_INITIAL_METADATA] = true;
1167     result = ACTION_TAKEN_NO_CALLBACK;
1168   } else if (stream_op->recv_message &&
1169              op_can_be_run(stream_op, s, &oas->state, OP_RECV_MESSAGE)) {
1170     CRONET_LOG(GPR_DEBUG, "running: %p  OP_RECV_MESSAGE", oas);
1171     if (stream_state->state_op_done[OP_CANCEL_ERROR]) {
1172       CRONET_LOG(GPR_DEBUG, "Stream is cancelled.");
1173       grpc_core::ExecCtx::Run(
1174           DEBUG_LOCATION, stream_op->payload->recv_message.recv_message_ready,
1175           absl::OkStatus());
1176       stream_state->state_op_done[OP_RECV_MESSAGE] = true;
1177       oas->state.state_op_done[OP_RECV_MESSAGE] = true;
1178       result = ACTION_TAKEN_NO_CALLBACK;
1179     } else if (stream_state->state_callback_received[OP_FAILED]) {
1180       CRONET_LOG(GPR_DEBUG, "Stream failed.");
1181       grpc_core::ExecCtx::Run(
1182           DEBUG_LOCATION, stream_op->payload->recv_message.recv_message_ready,
1183           absl::OkStatus());
1184       stream_state->state_op_done[OP_RECV_MESSAGE] = true;
1185       oas->state.state_op_done[OP_RECV_MESSAGE] = true;
1186       result = ACTION_TAKEN_NO_CALLBACK;
1187     } else if (stream_state->rs.read_stream_closed) {
1188       // No more data will be received
1189       CRONET_LOG(GPR_DEBUG, "read stream closed");
1190       grpc_core::ExecCtx::Run(
1191           DEBUG_LOCATION, stream_op->payload->recv_message.recv_message_ready,
1192           absl::OkStatus());
1193       stream_state->state_op_done[OP_RECV_MESSAGE] = true;
1194       oas->state.state_op_done[OP_RECV_MESSAGE] = true;
1195       result = ACTION_TAKEN_NO_CALLBACK;
1196     } else if (stream_state->flush_read) {
1197       CRONET_LOG(GPR_DEBUG, "flush read");
1198       grpc_core::ExecCtx::Run(
1199           DEBUG_LOCATION, stream_op->payload->recv_message.recv_message_ready,
1200           absl::OkStatus());
1201       stream_state->state_op_done[OP_RECV_MESSAGE] = true;
1202       oas->state.state_op_done[OP_RECV_MESSAGE] = true;
1203       result = ACTION_TAKEN_NO_CALLBACK;
1204     } else if (!stream_state->rs.length_field_received) {
1205       if (stream_state->rs.received_bytes == GRPC_HEADER_SIZE_IN_BYTES &&
1206           stream_state->rs.remaining_bytes == 0) {
1207         // Start a read operation for data
1208         stream_state->rs.length_field_received = true;
1209         parse_grpc_header(
1210             reinterpret_cast<const uint8_t*>(stream_state->rs.read_buffer),
1211             &stream_state->rs.length_field, &stream_state->rs.compressed);
1212         CRONET_LOG(GPR_DEBUG, "length field = %d",
1213                    stream_state->rs.length_field);
1214         if (stream_state->rs.length_field > 0) {
1215           stream_state->rs.read_buffer = static_cast<char*>(
1216               gpr_malloc(static_cast<size_t>(stream_state->rs.length_field)));
1217           GPR_ASSERT(stream_state->rs.read_buffer);
1218           stream_state->rs.remaining_bytes = stream_state->rs.length_field;
1219           stream_state->rs.received_bytes = 0;
1220           CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs);
1221           stream_state->state_op_done[OP_READ_REQ_MADE] =
1222               true;  // Indicates that at least one read request has been made
1223           bidirectional_stream_read(s->cbs, stream_state->rs.read_buffer,
1224                                     stream_state->rs.remaining_bytes);
1225           result = ACTION_TAKEN_WITH_CALLBACK;
1226         } else {
1227           stream_state->rs.remaining_bytes = 0;
1228           CRONET_LOG(GPR_DEBUG, "read operation complete. Empty response.");
1229           // Clean up read_slice_buffer in case there is unread data.
1230           stream_state->rs.read_slice_buffer.Clear();
1231           uint32_t flags = 0;
1232           if (stream_state->rs.compressed) {
1233             flags |= GRPC_WRITE_INTERNAL_COMPRESS;
1234           }
1235           *stream_op->payload->recv_message.flags = flags;
1236           *stream_op->payload->recv_message.recv_message =
1237               std::move(stream_state->rs.read_slice_buffer);
1238           grpc_core::ExecCtx::Run(
1239               DEBUG_LOCATION,
1240               stream_op->payload->recv_message.recv_message_ready,
1241               absl::OkStatus());
1242           stream_state->state_op_done[OP_RECV_MESSAGE] = true;
1243           oas->state.state_op_done[OP_RECV_MESSAGE] = true;
1244 
1245           // Extra read to trigger on_succeed
1246           stream_state->rs.length_field_received = false;
1247           stream_state->state_op_done[OP_READ_REQ_MADE] =
1248               true;  // Indicates that at least one read request has been made
1249           read_grpc_header(s);
1250           result = ACTION_TAKEN_NO_CALLBACK;
1251         }
1252       } else if (stream_state->rs.remaining_bytes == 0) {
1253         // Start a read operation for first 5 bytes (GRPC header)
1254         stream_state->rs.read_buffer = stream_state->rs.grpc_header_bytes;
1255         stream_state->rs.remaining_bytes = GRPC_HEADER_SIZE_IN_BYTES;
1256         stream_state->rs.received_bytes = 0;
1257         stream_state->rs.compressed = false;
1258         CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs);
1259         stream_state->state_op_done[OP_READ_REQ_MADE] =
1260             true;  // Indicates that at least one read request has been made
1261         bidirectional_stream_read(s->cbs, stream_state->rs.read_buffer,
1262                                   stream_state->rs.remaining_bytes);
1263         result = ACTION_TAKEN_WITH_CALLBACK;
1264       } else {
1265         result = NO_ACTION_POSSIBLE;
1266       }
1267     } else if (stream_state->rs.remaining_bytes == 0) {
1268       CRONET_LOG(GPR_DEBUG, "read operation complete");
1269       grpc_slice read_data_slice =
1270           GRPC_SLICE_MALLOC((uint32_t)stream_state->rs.length_field);
1271       uint8_t* dst_p = GRPC_SLICE_START_PTR(read_data_slice);
1272       memcpy(dst_p, stream_state->rs.read_buffer,
1273              static_cast<size_t>(stream_state->rs.length_field));
1274       null_and_maybe_free_read_buffer(s);
1275       // Clean up read_slice_buffer in case there is unread data.
1276       stream_state->rs.read_slice_buffer.Clear();
1277       stream_state->rs.read_slice_buffer.Append(
1278           grpc_core::Slice(read_data_slice));
1279       uint32_t flags = 0;
1280       if (stream_state->rs.compressed) {
1281         flags = GRPC_WRITE_INTERNAL_COMPRESS;
1282       }
1283       *stream_op->payload->recv_message.flags = flags;
1284       *stream_op->payload->recv_message.recv_message =
1285           std::move(stream_state->rs.read_slice_buffer);
1286       grpc_core::ExecCtx::Run(
1287           DEBUG_LOCATION, stream_op->payload->recv_message.recv_message_ready,
1288           absl::OkStatus());
1289       stream_state->state_op_done[OP_RECV_MESSAGE] = true;
1290       oas->state.state_op_done[OP_RECV_MESSAGE] = true;
1291       // Do an extra read to trigger on_succeeded() callback in case connection
1292       // is closed
1293       stream_state->rs.length_field_received = false;
1294       read_grpc_header(s);
1295       result = ACTION_TAKEN_NO_CALLBACK;
1296     }
1297   } else if (stream_op->recv_trailing_metadata &&
1298              op_can_be_run(stream_op, s, &oas->state,
1299                            OP_RECV_TRAILING_METADATA)) {
1300     CRONET_LOG(GPR_DEBUG, "running: %p  OP_RECV_TRAILING_METADATA", oas);
1301     grpc_error_handle error;
1302     if (stream_state->state_op_done[OP_CANCEL_ERROR]) {
1303       error = stream_state->cancel_error;
1304     } else if (stream_state->state_callback_received[OP_FAILED]) {
1305       grpc_status_code grpc_error_code =
1306           cronet_net_error_to_grpc_error(stream_state->net_error);
1307       const char* desc = cronet_net_error_as_string(stream_state->net_error);
1308       error =
1309           make_error_with_desc(grpc_error_code, stream_state->net_error, desc);
1310     } else if (oas->s->state.rs.trailing_metadata_valid) {
1311       *stream_op->payload->recv_trailing_metadata.recv_trailing_metadata =
1312           std::move(oas->s->state.rs.trailing_metadata);
1313       stream_state->rs.trailing_metadata_valid = false;
1314     }
1315     grpc_core::ExecCtx::Run(
1316         DEBUG_LOCATION,
1317         stream_op->payload->recv_trailing_metadata.recv_trailing_metadata_ready,
1318         error);
1319     stream_state->state_op_done[OP_RECV_TRAILING_METADATA] = true;
1320     result = ACTION_TAKEN_NO_CALLBACK;
1321   } else if (stream_op->cancel_stream &&
1322              op_can_be_run(stream_op, s, &oas->state, OP_CANCEL_ERROR)) {
1323     CRONET_LOG(GPR_DEBUG, "running: %p  OP_CANCEL_ERROR", oas);
1324     if (s->cbs) {
1325       CRONET_LOG(GPR_DEBUG, "W: bidirectional_stream_cancel(%p)", s->cbs);
1326       bidirectional_stream_cancel(s->cbs);
1327       result = ACTION_TAKEN_WITH_CALLBACK;
1328     } else {
1329       result = ACTION_TAKEN_NO_CALLBACK;
1330     }
1331     stream_state->state_op_done[OP_CANCEL_ERROR] = true;
1332     if (stream_state->cancel_error.ok()) {
1333       stream_state->cancel_error =
1334           stream_op->payload->cancel_stream.cancel_error;
1335     }
1336   } else if (op_can_be_run(stream_op, s, &oas->state, OP_ON_COMPLETE)) {
1337     CRONET_LOG(GPR_DEBUG, "running: %p  OP_ON_COMPLETE", oas);
1338     if (stream_state->state_op_done[OP_CANCEL_ERROR]) {
1339       if (stream_op->on_complete) {
1340         grpc_core::ExecCtx::Run(DEBUG_LOCATION, stream_op->on_complete,
1341                                 stream_state->cancel_error);
1342       }
1343     } else if (stream_state->state_callback_received[OP_FAILED]) {
1344       if (stream_op->on_complete) {
1345         const char* error_message =
1346             cronet_net_error_as_string(stream_state->net_error);
1347         grpc_status_code grpc_error_code =
1348             cronet_net_error_to_grpc_error(stream_state->net_error);
1349         grpc_core::ExecCtx::Run(
1350             DEBUG_LOCATION, stream_op->on_complete,
1351             make_error_with_desc(grpc_error_code, stream_state->net_error,
1352                                  error_message));
1353       }
1354     } else {
1355       // All actions in this stream_op are complete. Call the on_complete
1356       // callback
1357       //
1358       if (stream_op->on_complete) {
1359         grpc_core::ExecCtx::Run(DEBUG_LOCATION, stream_op->on_complete,
1360                                 absl::OkStatus());
1361       }
1362     }
1363     oas->state.state_op_done[OP_ON_COMPLETE] = true;
1364     oas->done = true;
1365     // reset any send message state, only if this ON_COMPLETE is about a send.
1366     //
1367     if (stream_op->send_message) {
1368       stream_state->state_callback_received[OP_SEND_MESSAGE] = false;
1369       stream_state->state_op_done[OP_SEND_MESSAGE] = false;
1370     }
1371     result = ACTION_TAKEN_NO_CALLBACK;
1372     // If this is the on_complete callback being called for a received message -
1373     // make a note
1374     if (stream_op->recv_message) {
1375       stream_state->state_op_done[OP_RECV_MESSAGE_AND_ON_COMPLETE] = true;
1376     }
1377   } else {
1378     result = NO_ACTION_POSSIBLE;
1379   }
1380   return result;
1381 }
1382 
1383 //
1384 // Functions used by upper layers to access transport functionality.
1385 //
1386 
stream_obj(grpc_transport * gt,grpc_stream * gs,grpc_stream_refcount * refcount,grpc_core::Arena * arena)1387 inline stream_obj::stream_obj(grpc_transport* gt, grpc_stream* gs,
1388                               grpc_stream_refcount* refcount,
1389                               grpc_core::Arena* arena)
1390     : arena(arena),
1391       curr_ct(reinterpret_cast<grpc_cronet_transport*>(gt)),
1392       curr_gs(gs),
1393       state(arena),
1394       refcount(refcount) {
1395   GRPC_CRONET_STREAM_REF(this, "cronet transport");
1396   gpr_mu_init(&mu);
1397 }
1398 
~stream_obj()1399 inline stream_obj::~stream_obj() { null_and_maybe_free_read_buffer(this); }
1400 
init_stream(grpc_transport * gt,grpc_stream * gs,grpc_stream_refcount * refcount,const void *,grpc_core::Arena * arena)1401 static int init_stream(grpc_transport* gt, grpc_stream* gs,
1402                        grpc_stream_refcount* refcount,
1403                        const void* /*server_data*/, grpc_core::Arena* arena) {
1404   new (gs) stream_obj(gt, gs, refcount, arena);
1405   return 0;
1406 }
1407 
set_pollset_do_nothing(grpc_transport *,grpc_stream *,grpc_pollset *)1408 static void set_pollset_do_nothing(grpc_transport* /*gt*/, grpc_stream* /*gs*/,
1409                                    grpc_pollset* /*pollset*/) {}
1410 
set_pollset_set_do_nothing(grpc_transport *,grpc_stream *,grpc_pollset_set *)1411 static void set_pollset_set_do_nothing(grpc_transport* /*gt*/,
1412                                        grpc_stream* /*gs*/,
1413                                        grpc_pollset_set* /*pollset_set*/) {}
1414 
perform_stream_op(grpc_transport *,grpc_stream * gs,grpc_transport_stream_op_batch * op)1415 static void perform_stream_op(grpc_transport* /*gt*/, grpc_stream* gs,
1416                               grpc_transport_stream_op_batch* op) {
1417   CRONET_LOG(GPR_DEBUG, "perform_stream_op");
1418   if (op->send_initial_metadata &&
1419       header_has_authority(
1420           op->payload->send_initial_metadata.send_initial_metadata)) {
1421     // Cronet does not support :authority header field. We cancel the call when
1422     // this field is present in metadata
1423     if (op->recv_initial_metadata) {
1424       grpc_core::ExecCtx::Run(
1425           DEBUG_LOCATION,
1426           op->payload->recv_initial_metadata.recv_initial_metadata_ready,
1427           absl::CancelledError());
1428     }
1429     if (op->recv_message) {
1430       grpc_core::ExecCtx::Run(DEBUG_LOCATION,
1431                               op->payload->recv_message.recv_message_ready,
1432                               absl::CancelledError());
1433     }
1434     if (op->recv_trailing_metadata) {
1435       grpc_core::ExecCtx::Run(
1436           DEBUG_LOCATION,
1437           op->payload->recv_trailing_metadata.recv_trailing_metadata_ready,
1438           absl::CancelledError());
1439     }
1440     grpc_core::ExecCtx::Run(DEBUG_LOCATION, op->on_complete,
1441                             absl::CancelledError());
1442     return;
1443   }
1444   stream_obj* s = reinterpret_cast<stream_obj*>(gs);
1445   add_to_storage(s, op);
1446   execute_from_storage(s);
1447 }
1448 
destroy_stream(grpc_transport *,grpc_stream * gs,grpc_closure * then_schedule_closure)1449 static void destroy_stream(grpc_transport* /*gt*/, grpc_stream* gs,
1450                            grpc_closure* then_schedule_closure) {
1451   stream_obj* s = reinterpret_cast<stream_obj*>(gs);
1452   s->~stream_obj();
1453   grpc_core::ExecCtx::Run(DEBUG_LOCATION, then_schedule_closure,
1454                           absl::OkStatus());
1455 }
1456 
destroy_transport(grpc_transport *)1457 static void destroy_transport(grpc_transport* /*gt*/) {}
1458 
get_endpoint(grpc_transport *)1459 static grpc_endpoint* get_endpoint(grpc_transport* /*gt*/) { return nullptr; }
1460 
perform_op(grpc_transport *,grpc_transport_op *)1461 static void perform_op(grpc_transport* /*gt*/, grpc_transport_op* /*op*/) {}
1462 
1463 static const grpc_transport_vtable grpc_cronet_vtable = {
1464     sizeof(stream_obj),
1465     false,
1466     "cronet_http",
1467     init_stream,
1468     nullptr,
1469     set_pollset_do_nothing,
1470     set_pollset_set_do_nothing,
1471     perform_stream_op,
1472     perform_op,
1473     destroy_stream,
1474     destroy_transport,
1475     get_endpoint};
1476 
grpc_create_cronet_transport(void * engine,const char * target,const grpc_channel_args * args,void *)1477 grpc_transport* grpc_create_cronet_transport(void* engine, const char* target,
1478                                              const grpc_channel_args* args,
1479                                              void* /*reserved*/) {
1480   grpc_cronet_transport* ct = static_cast<grpc_cronet_transport*>(
1481       gpr_malloc(sizeof(grpc_cronet_transport)));
1482   if (!ct) {
1483     goto error;
1484   }
1485   ct->base.vtable = &grpc_cronet_vtable;
1486   ct->engine = static_cast<stream_engine*>(engine);
1487   ct->host = static_cast<char*>(gpr_malloc(strlen(target) + 1));
1488   if (!ct->host) {
1489     goto error;
1490   }
1491   strcpy(ct->host, target);
1492 
1493   ct->use_packet_coalescing = true;
1494   if (args) {
1495     for (size_t i = 0; i < args->num_args; i++) {
1496       if (0 ==
1497           strcmp(args->args[i].key, GRPC_ARG_USE_CRONET_PACKET_COALESCING)) {
1498         if (GPR_UNLIKELY(args->args[i].type != GRPC_ARG_INTEGER)) {
1499           gpr_log(GPR_ERROR, "%s ignored: it must be an integer",
1500                   GRPC_ARG_USE_CRONET_PACKET_COALESCING);
1501         } else {
1502           ct->use_packet_coalescing = (args->args[i].value.integer != 0);
1503         }
1504       }
1505     }
1506   }
1507 
1508   return &ct->base;
1509 
1510 error:
1511   if (ct) {
1512     if (ct->host) {
1513       gpr_free(ct->host);
1514     }
1515     gpr_free(ct);
1516   }
1517 
1518   return nullptr;
1519 }
1520