1 //
2 //
3 // Copyright 2016 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18
19 #include <grpc/support/port_platform.h>
20
21 #include "src/core/ext/transport/cronet/transport/cronet_transport.h"
22
23 #include <stdint.h>
24 #include <stdlib.h>
25 #include <string.h>
26
27 #include <initializer_list>
28 #include <new>
29 #include <string>
30 #include <utility>
31
32 #include "absl/status/status.h"
33 #include "absl/strings/match.h"
34 #include "absl/strings/str_cat.h"
35 #include "absl/strings/str_format.h"
36 #include "absl/strings/string_view.h"
37 #include "absl/types/optional.h"
38 #include "third_party/objective_c/Cronet/bidirectional_stream_c.h"
39
40 #include <grpc/slice.h>
41 #include <grpc/status.h>
42 #include <grpc/support/alloc.h>
43 #include <grpc/support/log.h>
44 #include <grpc/support/sync.h>
45
46 #include "src/core/ext/transport/chttp2/transport/bin_decoder.h"
47 #include "src/core/ext/transport/chttp2/transport/bin_encoder.h"
48 #include "src/core/ext/transport/cronet/transport/cronet_status.h"
49 #include "src/core/lib/debug/trace.h"
50 #include "src/core/lib/gprpp/crash.h"
51 #include "src/core/lib/gprpp/debug_location.h"
52 #include "src/core/lib/gprpp/status_helper.h"
53 #include "src/core/lib/iomgr/closure.h"
54 #include "src/core/lib/iomgr/endpoint.h"
55 #include "src/core/lib/iomgr/error.h"
56 #include "src/core/lib/iomgr/exec_ctx.h"
57 #include "src/core/lib/iomgr/iomgr_fwd.h"
58 #include "src/core/lib/resource_quota/arena.h"
59 #include "src/core/lib/slice/slice.h"
60 #include "src/core/lib/slice/slice_buffer.h"
61 #include "src/core/lib/surface/validate_metadata.h"
62 #include "src/core/lib/transport/metadata_batch.h"
63 #include "src/core/lib/transport/transport.h"
64 #include "src/core/lib/transport/transport_impl.h"
65
66 // IWYU pragma: no_include <type_traits>
67
68 #define GRPC_HEADER_SIZE_IN_BYTES 5
69 #define GRPC_FLUSH_READ_SIZE 4096
70
71 grpc_core::TraceFlag grpc_cronet_trace(false, "cronet");
72 #define CRONET_LOG(...) \
73 do { \
74 if (grpc_cronet_trace.enabled()) gpr_log(__VA_ARGS__); \
75 } while (0)
76
77 enum e_op_result {
78 ACTION_TAKEN_WITH_CALLBACK,
79 ACTION_TAKEN_NO_CALLBACK,
80 NO_ACTION_POSSIBLE
81 };
82
83 enum e_op_id {
84 OP_SEND_INITIAL_METADATA = 0,
85 OP_SEND_MESSAGE,
86 OP_SEND_TRAILING_METADATA,
87 OP_RECV_MESSAGE,
88 OP_RECV_INITIAL_METADATA,
89 OP_RECV_TRAILING_METADATA,
90 OP_CANCEL_ERROR,
91 OP_ON_COMPLETE,
92 OP_FAILED,
93 OP_SUCCEEDED,
94 OP_CANCELED,
95 OP_RECV_MESSAGE_AND_ON_COMPLETE,
96 OP_READ_REQ_MADE,
97 OP_NUM_OPS
98 };
99
100 // Cronet callbacks. See cronet_c_for_grpc.h for documentation for each.
101
102 static void on_stream_ready(bidirectional_stream*);
103 static void on_response_headers_received(
104 bidirectional_stream*, const bidirectional_stream_header_array*,
105 const char*);
106 static void on_write_completed(bidirectional_stream*, const char*);
107 static void on_read_completed(bidirectional_stream*, char*, int);
108 static void on_response_trailers_received(
109 bidirectional_stream*, const bidirectional_stream_header_array*);
110 static void on_succeeded(bidirectional_stream*);
111 static void on_failed(bidirectional_stream*, int);
112 static void on_canceled(bidirectional_stream*);
113 static bidirectional_stream_callback cronet_callbacks = {
114 on_stream_ready,
115 on_response_headers_received,
116 on_read_completed,
117 on_write_completed,
118 on_response_trailers_received,
119 on_succeeded,
120 on_failed,
121 on_canceled};
122
123 // Cronet transport object
124 struct grpc_cronet_transport {
125 grpc_transport base; // must be first element in this structure
126 stream_engine* engine;
127 char* host;
128 bool use_packet_coalescing;
129 };
130 typedef struct grpc_cronet_transport grpc_cronet_transport;
131
132 // TODO (makdharma): reorder structure for memory efficiency per
133 // http://www.catb.org/esr/structure-packing/#_structure_reordering:
134 struct read_state {
read_stateread_state135 explicit read_state(grpc_core::Arena* arena)
136 : trailing_metadata(arena), initial_metadata(arena) {}
137
138 // vars to store data coming from server
139 char* read_buffer = nullptr;
140 bool length_field_received = false;
141 int received_bytes = 0;
142 int remaining_bytes = 0;
143 int length_field = 0;
144 bool compressed = false;
145 char grpc_header_bytes[GRPC_HEADER_SIZE_IN_BYTES] = {};
146 char* payload_field = nullptr;
147 bool read_stream_closed = false;
148
149 // vars for holding data destined for the application
150 grpc_core::SliceBuffer read_slice_buffer;
151
152 // vars for trailing metadata
153 grpc_metadata_batch trailing_metadata;
154 bool trailing_metadata_valid = false;
155
156 // vars for initial metadata
157 grpc_metadata_batch initial_metadata;
158 };
159
160 struct write_state {
161 char* write_buffer = nullptr;
162 };
163
164 // track state of one stream op
165 struct op_state {
op_stateop_state166 explicit op_state(grpc_core::Arena* arena) : rs(arena) {}
167
168 bool state_op_done[OP_NUM_OPS] = {};
169 bool state_callback_received[OP_NUM_OPS] = {};
170 // A non-zero gRPC status code has been seen
171 bool fail_state = false;
172 // Transport is discarding all buffered messages
173 bool flush_read = false;
174 bool flush_cronet_when_ready = false;
175 bool pending_write_for_trailer = false;
176 bool pending_send_message = false;
177 // User requested RECV_TRAILING_METADATA
178 bool pending_recv_trailing_metadata = false;
179 cronet_net_error_code net_error = OK;
180 grpc_error_handle cancel_error;
181 // data structure for storing data coming from server
182 struct read_state rs;
183 // data structure for storing data going to the server
184 struct write_state ws;
185 };
186
187 struct stream_obj;
188
189 struct op_and_state {
190 op_and_state(stream_obj* s, const grpc_transport_stream_op_batch& op);
191
192 grpc_transport_stream_op_batch op;
193 struct op_state state;
194 bool done = false;
195 struct stream_obj* s; // Pointer back to the stream object
196 // next op_and_state in the linked list
197 struct op_and_state* next = nullptr;
198 };
199
200 struct op_storage {
201 int num_pending_ops = 0;
202 struct op_and_state* head = nullptr;
203 };
204
205 struct stream_obj {
206 stream_obj(grpc_transport* gt, grpc_stream* gs,
207 grpc_stream_refcount* refcount, grpc_core::Arena* arena);
208 ~stream_obj();
209
210 grpc_core::Arena* arena;
211 struct op_and_state* oas = nullptr;
212 grpc_transport_stream_op_batch* curr_op = nullptr;
213 grpc_cronet_transport* curr_ct;
214 grpc_stream* curr_gs;
215 bidirectional_stream* cbs = nullptr;
216 bidirectional_stream_header_array header_array =
217 bidirectional_stream_header_array(); // Zero-initialize the structure.
218
219 // Stream level state. Some state will be tracked both at stream and stream_op
220 // level
221 struct op_state state;
222
223 // OP storage
224 struct op_storage storage;
225
226 // Mutex to protect storage
227 gpr_mu mu;
228
229 // Refcount object of the stream
230 grpc_stream_refcount* refcount;
231 };
232
233 #ifndef NDEBUG
234 #define GRPC_CRONET_STREAM_REF(stream, reason) \
235 grpc_cronet_stream_ref((stream), (reason))
236 #define GRPC_CRONET_STREAM_UNREF(stream, reason) \
237 grpc_cronet_stream_unref((stream), (reason))
grpc_cronet_stream_ref(stream_obj * s,const char * reason)238 void grpc_cronet_stream_ref(stream_obj* s, const char* reason) {
239 grpc_stream_ref(s->refcount, reason);
240 }
grpc_cronet_stream_unref(stream_obj * s,const char * reason)241 void grpc_cronet_stream_unref(stream_obj* s, const char* reason) {
242 grpc_stream_unref(s->refcount, reason);
243 }
244 #else
245 #define GRPC_CRONET_STREAM_REF(stream, reason) grpc_cronet_stream_ref((stream))
246 #define GRPC_CRONET_STREAM_UNREF(stream, reason) \
247 grpc_cronet_stream_unref((stream))
grpc_cronet_stream_ref(stream_obj * s)248 void grpc_cronet_stream_ref(stream_obj* s) { grpc_stream_ref(s->refcount); }
grpc_cronet_stream_unref(stream_obj * s)249 void grpc_cronet_stream_unref(stream_obj* s) { grpc_stream_unref(s->refcount); }
250 #endif
251
252 static enum e_op_result execute_stream_op(struct op_and_state* oas);
253
254 //
255 // Utility function to translate enum into string for printing
256 //
op_result_string(enum e_op_result i)257 static const char* op_result_string(enum e_op_result i) {
258 switch (i) {
259 case ACTION_TAKEN_WITH_CALLBACK:
260 return "ACTION_TAKEN_WITH_CALLBACK";
261 case ACTION_TAKEN_NO_CALLBACK:
262 return "ACTION_TAKEN_NO_CALLBACK";
263 case NO_ACTION_POSSIBLE:
264 return "NO_ACTION_POSSIBLE";
265 }
266 GPR_UNREACHABLE_CODE(return "UNKNOWN");
267 }
268
op_id_string(enum e_op_id i)269 static const char* op_id_string(enum e_op_id i) {
270 switch (i) {
271 case OP_SEND_INITIAL_METADATA:
272 return "OP_SEND_INITIAL_METADATA";
273 case OP_SEND_MESSAGE:
274 return "OP_SEND_MESSAGE";
275 case OP_SEND_TRAILING_METADATA:
276 return "OP_SEND_TRAILING_METADATA";
277 case OP_RECV_MESSAGE:
278 return "OP_RECV_MESSAGE";
279 case OP_RECV_INITIAL_METADATA:
280 return "OP_RECV_INITIAL_METADATA";
281 case OP_RECV_TRAILING_METADATA:
282 return "OP_RECV_TRAILING_METADATA";
283 case OP_CANCEL_ERROR:
284 return "OP_CANCEL_ERROR";
285 case OP_ON_COMPLETE:
286 return "OP_ON_COMPLETE";
287 case OP_FAILED:
288 return "OP_FAILED";
289 case OP_SUCCEEDED:
290 return "OP_SUCCEEDED";
291 case OP_CANCELED:
292 return "OP_CANCELED";
293 case OP_RECV_MESSAGE_AND_ON_COMPLETE:
294 return "OP_RECV_MESSAGE_AND_ON_COMPLETE";
295 case OP_READ_REQ_MADE:
296 return "OP_READ_REQ_MADE";
297 case OP_NUM_OPS:
298 return "OP_NUM_OPS";
299 }
300 return "UNKNOWN";
301 }
302
null_and_maybe_free_read_buffer(stream_obj * s)303 static void null_and_maybe_free_read_buffer(stream_obj* s) {
304 if (s->state.rs.read_buffer &&
305 s->state.rs.read_buffer != s->state.rs.grpc_header_bytes) {
306 gpr_free(s->state.rs.read_buffer);
307 }
308 s->state.rs.read_buffer = nullptr;
309 }
310
read_grpc_header(stream_obj * s)311 static void read_grpc_header(stream_obj* s) {
312 s->state.rs.read_buffer = s->state.rs.grpc_header_bytes;
313 s->state.rs.remaining_bytes = GRPC_HEADER_SIZE_IN_BYTES;
314 s->state.rs.received_bytes = 0;
315 s->state.rs.compressed = false;
316 CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs);
317 bidirectional_stream_read(s->cbs, s->state.rs.read_buffer,
318 s->state.rs.remaining_bytes);
319 }
320
make_error_with_desc(int error_code,int cronet_internal_error_code,const char * desc)321 static grpc_error_handle make_error_with_desc(int error_code,
322 int cronet_internal_error_code,
323 const char* desc) {
324 return grpc_error_set_int(GRPC_ERROR_CREATE(absl::StrFormat(
325 "Cronet error code:%d, Cronet error detail:%s",
326 cronet_internal_error_code, desc)),
327 grpc_core::StatusIntProperty::kRpcStatus,
328 error_code);
329 }
330
op_and_state(stream_obj * s,const grpc_transport_stream_op_batch & op)331 inline op_and_state::op_and_state(stream_obj* s,
332 const grpc_transport_stream_op_batch& op)
333 : op(op), state(s->arena), s(s) {}
334
335 //
336 // Add a new stream op to op storage.
337 //
add_to_storage(struct stream_obj * s,grpc_transport_stream_op_batch * op)338 static void add_to_storage(struct stream_obj* s,
339 grpc_transport_stream_op_batch* op) {
340 struct op_storage* storage = &s->storage;
341 // add new op at the beginning of the linked list. The memory is freed
342 // in remove_from_storage
343 op_and_state* new_op = new op_and_state(s, *op);
344 gpr_mu_lock(&s->mu);
345 new_op->next = storage->head;
346 storage->head = new_op;
347 storage->num_pending_ops++;
348 if (op->send_message) {
349 s->state.pending_send_message = true;
350 }
351 if (op->recv_trailing_metadata) {
352 s->state.pending_recv_trailing_metadata = true;
353 }
354 CRONET_LOG(GPR_DEBUG, "adding new op %p. %d in the queue.", new_op,
355 storage->num_pending_ops);
356 gpr_mu_unlock(&s->mu);
357 }
358
359 //
360 // Traverse the linked list and delete op and free memory
361 //
remove_from_storage(struct stream_obj * s,struct op_and_state * oas)362 static void remove_from_storage(struct stream_obj* s,
363 struct op_and_state* oas) {
364 struct op_and_state* curr;
365 if (s->storage.head == nullptr || oas == nullptr) {
366 return;
367 }
368 if (s->storage.head == oas) {
369 s->storage.head = oas->next;
370 delete oas;
371 s->storage.num_pending_ops--;
372 CRONET_LOG(GPR_DEBUG, "Freed %p. Now %d in the queue", oas,
373 s->storage.num_pending_ops);
374 } else {
375 for (curr = s->storage.head; curr != nullptr; curr = curr->next) {
376 if (curr->next == oas) {
377 curr->next = oas->next;
378 s->storage.num_pending_ops--;
379 CRONET_LOG(GPR_DEBUG, "Freed %p. Now %d in the queue", oas,
380 s->storage.num_pending_ops);
381 delete oas;
382 break;
383 } else if (GPR_UNLIKELY(curr->next == nullptr)) {
384 CRONET_LOG(GPR_ERROR, "Reached end of LL and did not find op to free");
385 }
386 }
387 }
388 }
389
390 //
391 // Cycle through ops and try to take next action. Break when either
392 // an action with callback is taken, or no action is possible.
393 // This can get executed from the Cronet network thread via cronet callback
394 // or on the application supplied thread via the perform_stream_op function.
395 //
execute_from_storage(stream_obj * s)396 static void execute_from_storage(stream_obj* s) {
397 gpr_mu_lock(&s->mu);
398 for (struct op_and_state* curr = s->storage.head; curr != nullptr;) {
399 CRONET_LOG(GPR_DEBUG, "calling op at %p. done = %d", curr, curr->done);
400 GPR_ASSERT(!curr->done);
401 enum e_op_result result = execute_stream_op(curr);
402 CRONET_LOG(GPR_DEBUG, "execute_stream_op[%p] returns %s", curr,
403 op_result_string(result));
404 // if this op is done, then remove it and free memory
405 if (curr->done) {
406 struct op_and_state* next = curr->next;
407 remove_from_storage(s, curr);
408 curr = next;
409 } else if (result == NO_ACTION_POSSIBLE) {
410 curr = curr->next;
411 } else if (result == ACTION_TAKEN_WITH_CALLBACK) {
412 // wait for the callback
413 break;
414 } // continue processing the same op if ACTION_TAKEN_WITHOUT_CALLBACK
415 }
416 gpr_mu_unlock(&s->mu);
417 }
418
convert_cronet_array_to_metadata(const bidirectional_stream_header_array * header_array,grpc_metadata_batch * mds)419 static void convert_cronet_array_to_metadata(
420 const bidirectional_stream_header_array* header_array,
421 grpc_metadata_batch* mds) {
422 for (size_t i = 0; i < header_array->count; i++) {
423 CRONET_LOG(GPR_DEBUG, "header key=%s, value=%s",
424 header_array->headers[i].key, header_array->headers[i].value);
425 grpc_slice value;
426 if (absl::EndsWith(header_array->headers[i].key, "-bin")) {
427 value = grpc_slice_from_static_string(header_array->headers[i].value);
428 value = grpc_chttp2_base64_decode_with_length(
429 value, grpc_chttp2_base64_infer_length_after_decode(value));
430 } else {
431 value = grpc_slice_from_static_string(header_array->headers[i].value);
432 }
433 mds->Append(header_array->headers[i].key, grpc_core::Slice(value),
434 [&](absl::string_view error, const grpc_core::Slice& value) {
435 gpr_log(GPR_DEBUG, "Failed to parse metadata: %s",
436 absl::StrCat("key=", header_array->headers[i].key,
437 " error=", error,
438 " value=", value.as_string_view())
439 .c_str());
440 });
441 }
442 }
443
444 //
445 // Cronet callback
446 //
on_failed(bidirectional_stream * stream,int net_error)447 static void on_failed(bidirectional_stream* stream, int net_error) {
448 gpr_log(GPR_ERROR, "on_failed(%p, %d)", stream, net_error);
449 grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
450 grpc_core::ExecCtx exec_ctx;
451
452 stream_obj* s = static_cast<stream_obj*>(stream->annotation);
453 gpr_mu_lock(&s->mu);
454 bidirectional_stream_destroy(s->cbs);
455 s->state.state_callback_received[OP_FAILED] = true;
456 s->state.net_error = static_cast<cronet_net_error_code>(net_error);
457 s->cbs = nullptr;
458 if (s->header_array.headers) {
459 gpr_free(s->header_array.headers);
460 s->header_array.headers = nullptr;
461 }
462 if (s->state.ws.write_buffer) {
463 gpr_free(s->state.ws.write_buffer);
464 s->state.ws.write_buffer = nullptr;
465 }
466 null_and_maybe_free_read_buffer(s);
467 gpr_mu_unlock(&s->mu);
468 execute_from_storage(s);
469 GRPC_CRONET_STREAM_UNREF(s, "cronet transport");
470 }
471
472 //
473 // Cronet callback
474 //
on_canceled(bidirectional_stream * stream)475 static void on_canceled(bidirectional_stream* stream) {
476 CRONET_LOG(GPR_DEBUG, "on_canceled(%p)", stream);
477 grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
478 grpc_core::ExecCtx exec_ctx;
479
480 stream_obj* s = static_cast<stream_obj*>(stream->annotation);
481 gpr_mu_lock(&s->mu);
482 bidirectional_stream_destroy(s->cbs);
483 s->state.state_callback_received[OP_CANCELED] = true;
484 s->cbs = nullptr;
485 if (s->header_array.headers) {
486 gpr_free(s->header_array.headers);
487 s->header_array.headers = nullptr;
488 }
489 if (s->state.ws.write_buffer) {
490 gpr_free(s->state.ws.write_buffer);
491 s->state.ws.write_buffer = nullptr;
492 }
493 null_and_maybe_free_read_buffer(s);
494 gpr_mu_unlock(&s->mu);
495 execute_from_storage(s);
496 GRPC_CRONET_STREAM_UNREF(s, "cronet transport");
497 }
498
499 //
500 // Cronet callback
501 //
on_succeeded(bidirectional_stream * stream)502 static void on_succeeded(bidirectional_stream* stream) {
503 CRONET_LOG(GPR_DEBUG, "on_succeeded(%p)", stream);
504 grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
505 grpc_core::ExecCtx exec_ctx;
506
507 stream_obj* s = static_cast<stream_obj*>(stream->annotation);
508 gpr_mu_lock(&s->mu);
509 bidirectional_stream_destroy(s->cbs);
510 s->state.state_callback_received[OP_SUCCEEDED] = true;
511 s->cbs = nullptr;
512 null_and_maybe_free_read_buffer(s);
513 gpr_mu_unlock(&s->mu);
514 execute_from_storage(s);
515 GRPC_CRONET_STREAM_UNREF(s, "cronet transport");
516 }
517
518 //
519 // Cronet callback
520 //
on_stream_ready(bidirectional_stream * stream)521 static void on_stream_ready(bidirectional_stream* stream) {
522 CRONET_LOG(GPR_DEBUG, "W: on_stream_ready(%p)", stream);
523 grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
524 grpc_core::ExecCtx exec_ctx;
525 stream_obj* s = static_cast<stream_obj*>(stream->annotation);
526 grpc_cronet_transport* t = s->curr_ct;
527 gpr_mu_lock(&s->mu);
528 s->state.state_op_done[OP_SEND_INITIAL_METADATA] = true;
529 s->state.state_callback_received[OP_SEND_INITIAL_METADATA] = true;
530 // Free the memory allocated for headers
531 if (s->header_array.headers) {
532 gpr_free(s->header_array.headers);
533 s->header_array.headers = nullptr;
534 }
535 // Send the initial metadata on wire if there is no SEND_MESSAGE or
536 // SEND_TRAILING_METADATA ops pending
537 if (t->use_packet_coalescing) {
538 if (s->state.flush_cronet_when_ready) {
539 CRONET_LOG(GPR_DEBUG, "cronet_bidirectional_stream_flush (%p)", s->cbs);
540 bidirectional_stream_flush(stream);
541 }
542 }
543 gpr_mu_unlock(&s->mu);
544 execute_from_storage(s);
545 }
546
547 //
548 // Cronet callback
549 //
on_response_headers_received(bidirectional_stream * stream,const bidirectional_stream_header_array * headers,const char * negotiated_protocol)550 static void on_response_headers_received(
551 bidirectional_stream* stream,
552 const bidirectional_stream_header_array* headers,
553 const char* negotiated_protocol) {
554 grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
555 grpc_core::ExecCtx exec_ctx;
556 CRONET_LOG(GPR_DEBUG, "R: on_response_headers_received(%p, %p, %s)", stream,
557 headers, negotiated_protocol);
558 stream_obj* s = static_cast<stream_obj*>(stream->annotation);
559
560 // Identify if this is a header or a trailer (in a trailer-only response case)
561 //
562 for (size_t i = 0; i < headers->count; i++) {
563 if (0 == strcmp("grpc-status", headers->headers[i].key)) {
564 on_response_trailers_received(stream, headers);
565
566 // Do an extra read for a trailer-only stream to trigger on_succeeded()
567 // callback
568 read_grpc_header(s);
569 return;
570 }
571 }
572
573 gpr_mu_lock(&s->mu);
574 convert_cronet_array_to_metadata(headers, &s->state.rs.initial_metadata);
575 s->state.state_callback_received[OP_RECV_INITIAL_METADATA] = true;
576 if (!(s->state.state_op_done[OP_CANCEL_ERROR] ||
577 s->state.state_callback_received[OP_FAILED])) {
578 // Do an extra read to trigger on_succeeded() callback in case connection
579 // is closed
580 GPR_ASSERT(s->state.rs.length_field_received == false);
581 read_grpc_header(s);
582 }
583 gpr_mu_unlock(&s->mu);
584 execute_from_storage(s);
585 }
586
587 //
588 // Cronet callback
589 //
on_write_completed(bidirectional_stream * stream,const char * data)590 static void on_write_completed(bidirectional_stream* stream, const char* data) {
591 grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
592 grpc_core::ExecCtx exec_ctx;
593 stream_obj* s = static_cast<stream_obj*>(stream->annotation);
594 CRONET_LOG(GPR_DEBUG, "W: on_write_completed(%p, %s)", stream, data);
595 gpr_mu_lock(&s->mu);
596 if (s->state.ws.write_buffer) {
597 gpr_free(s->state.ws.write_buffer);
598 s->state.ws.write_buffer = nullptr;
599 }
600 s->state.state_callback_received[OP_SEND_MESSAGE] = true;
601 gpr_mu_unlock(&s->mu);
602 execute_from_storage(s);
603 }
604
605 //
606 // Cronet callback
607 //
on_read_completed(bidirectional_stream * stream,char * data,int count)608 static void on_read_completed(bidirectional_stream* stream, char* data,
609 int count) {
610 grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
611 grpc_core::ExecCtx exec_ctx;
612 stream_obj* s = static_cast<stream_obj*>(stream->annotation);
613 CRONET_LOG(GPR_DEBUG, "R: on_read_completed(%p, %p, %d)", stream, data,
614 count);
615 gpr_mu_lock(&s->mu);
616 s->state.state_callback_received[OP_RECV_MESSAGE] = true;
617 if (count > 0 && s->state.flush_read) {
618 CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs);
619 bidirectional_stream_read(s->cbs, s->state.rs.read_buffer,
620 GRPC_FLUSH_READ_SIZE);
621 gpr_mu_unlock(&s->mu);
622 } else if (count > 0) {
623 s->state.rs.received_bytes += count;
624 s->state.rs.remaining_bytes -= count;
625 if (s->state.rs.remaining_bytes > 0) {
626 CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs);
627 s->state.state_op_done[OP_READ_REQ_MADE] = true;
628 bidirectional_stream_read(
629 s->cbs, s->state.rs.read_buffer + s->state.rs.received_bytes,
630 s->state.rs.remaining_bytes);
631 gpr_mu_unlock(&s->mu);
632 } else {
633 gpr_mu_unlock(&s->mu);
634 execute_from_storage(s);
635 }
636 } else {
637 null_and_maybe_free_read_buffer(s);
638 s->state.rs.read_stream_closed = true;
639 gpr_mu_unlock(&s->mu);
640 execute_from_storage(s);
641 }
642 }
643
644 //
645 // Cronet callback
646 //
on_response_trailers_received(bidirectional_stream * stream,const bidirectional_stream_header_array * trailers)647 static void on_response_trailers_received(
648 bidirectional_stream* stream,
649 const bidirectional_stream_header_array* trailers) {
650 grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
651 grpc_core::ExecCtx exec_ctx;
652 CRONET_LOG(GPR_DEBUG, "R: on_response_trailers_received(%p,%p)", stream,
653 trailers);
654 stream_obj* s = static_cast<stream_obj*>(stream->annotation);
655 grpc_cronet_transport* t = s->curr_ct;
656 gpr_mu_lock(&s->mu);
657 s->state.rs.trailing_metadata_valid = false;
658 convert_cronet_array_to_metadata(trailers, &s->state.rs.trailing_metadata);
659 if (trailers->count > 0) {
660 s->state.rs.trailing_metadata_valid = true;
661 }
662 s->state.state_callback_received[OP_RECV_TRAILING_METADATA] = true;
663 // Send a EOS when server terminates the stream (testServerFinishesRequest) to
664 // trigger on_succeeded
665 if (!s->state.state_op_done[OP_SEND_TRAILING_METADATA] &&
666 !(s->state.state_op_done[OP_CANCEL_ERROR] ||
667 s->state.state_callback_received[OP_FAILED])) {
668 CRONET_LOG(GPR_DEBUG, "bidirectional_stream_write (%p, 0)", s->cbs);
669 s->state.state_callback_received[OP_SEND_MESSAGE] = false;
670 bidirectional_stream_write(s->cbs, "", 0, true);
671 if (t->use_packet_coalescing) {
672 CRONET_LOG(GPR_DEBUG, "bidirectional_stream_flush (%p)", s->cbs);
673 bidirectional_stream_flush(s->cbs);
674 }
675 s->state.state_op_done[OP_SEND_TRAILING_METADATA] = true;
676
677 gpr_mu_unlock(&s->mu);
678 } else {
679 gpr_mu_unlock(&s->mu);
680 execute_from_storage(s);
681 }
682 }
683
684 //
685 // Utility function that takes the data from s->write_slice_buffer and assembles
686 // into a contiguous byte stream with 5 byte gRPC header prepended.
687 //
create_grpc_frame(grpc_slice_buffer * write_slice_buffer,char ** pp_write_buffer,size_t * p_write_buffer_size,uint32_t flags)688 static void create_grpc_frame(grpc_slice_buffer* write_slice_buffer,
689 char** pp_write_buffer,
690 size_t* p_write_buffer_size, uint32_t flags) {
691 size_t length = write_slice_buffer->length;
692 *p_write_buffer_size = length + GRPC_HEADER_SIZE_IN_BYTES;
693 // This is freed in the on_write_completed callback
694 char* write_buffer =
695 static_cast<char*>(gpr_malloc(length + GRPC_HEADER_SIZE_IN_BYTES));
696 *pp_write_buffer = write_buffer;
697 uint8_t* p = reinterpret_cast<uint8_t*>(write_buffer);
698 // Append 5 byte header
699 // Compressed flag
700 *p++ = static_cast<uint8_t>((flags & GRPC_WRITE_INTERNAL_COMPRESS) ? 1 : 0);
701 // Message length
702 *p++ = static_cast<uint8_t>(length >> 24);
703 *p++ = static_cast<uint8_t>(length >> 16);
704 *p++ = static_cast<uint8_t>(length >> 8);
705 *p++ = static_cast<uint8_t>(length);
706 // append actual data
707 size_t offset = 0;
708 for (size_t i = 0; i < write_slice_buffer->count; ++i) {
709 memcpy(p + offset, GRPC_SLICE_START_PTR(write_slice_buffer->slices[i]),
710 GRPC_SLICE_LENGTH(write_slice_buffer->slices[i]));
711 offset += GRPC_SLICE_LENGTH(write_slice_buffer->slices[i]);
712 }
713 }
714
715 namespace {
716 class CronetMetadataEncoder {
717 public:
CronetMetadataEncoder(bidirectional_stream_header ** pp_headers,size_t * p_count,const char * host,size_t capacity,const char ** method,std::string * url)718 explicit CronetMetadataEncoder(bidirectional_stream_header** pp_headers,
719 size_t* p_count, const char* host,
720 size_t capacity, const char** method,
721 std::string* url)
722 : host_(host),
723 capacity_(capacity),
724 count_(*p_count),
725 headers_(*pp_headers),
726 method_(method),
727 url_(url) {
728 count_ = 0;
729 headers_ = static_cast<bidirectional_stream_header*>(
730 gpr_malloc(sizeof(bidirectional_stream_header) * capacity_));
731 }
732
733 CronetMetadataEncoder(const CronetMetadataEncoder&) = delete;
734 CronetMetadataEncoder& operator=(const CronetMetadataEncoder&) = delete;
735
736 template <class T, class V>
Encode(T,const V & value)737 void Encode(T, const V& value) {
738 Encode(grpc_core::Slice::FromStaticString(T::key()),
739 grpc_core::Slice(T::Encode(value)));
740 }
741
Encode(grpc_core::HttpSchemeMetadata,grpc_core::HttpSchemeMetadata::ValueType)742 void Encode(grpc_core::HttpSchemeMetadata,
743 grpc_core::HttpSchemeMetadata::ValueType) {
744 // Cronet populates these fields on its own
745 }
Encode(grpc_core::HttpAuthorityMetadata,const grpc_core::HttpAuthorityMetadata::ValueType &)746 void Encode(grpc_core::HttpAuthorityMetadata,
747 const grpc_core::HttpAuthorityMetadata::ValueType&) {
748 // Cronet populates these fields on its own
749 }
750
Encode(grpc_core::HttpMethodMetadata,grpc_core::HttpMethodMetadata::ValueType method)751 void Encode(grpc_core::HttpMethodMetadata,
752 grpc_core::HttpMethodMetadata::ValueType method) {
753 switch (method) {
754 case grpc_core::HttpMethodMetadata::kPost:
755 *method_ = "POST";
756 break;
757 case grpc_core::HttpMethodMetadata::kInvalid:
758 case grpc_core::HttpMethodMetadata::kGet:
759 case grpc_core::HttpMethodMetadata::kPut:
760 abort();
761 }
762 }
763
Encode(grpc_core::HttpPathMetadata,const grpc_core::HttpPathMetadata::ValueType & path)764 void Encode(grpc_core::HttpPathMetadata,
765 const grpc_core::HttpPathMetadata::ValueType& path) {
766 // Create URL by appending :path value to the hostname
767 *url_ = absl::StrCat("https://", host_, path.as_string_view());
768 }
769
Encode(const grpc_core::Slice & key_slice,const grpc_core::Slice & value_slice)770 void Encode(const grpc_core::Slice& key_slice,
771 const grpc_core::Slice& value_slice) {
772 char* key = grpc_slice_to_c_string(key_slice.c_slice());
773 char* value;
774 if (grpc_is_binary_header_internal(key_slice.c_slice())) {
775 grpc_slice wire_value = grpc_chttp2_base64_encode(value_slice.c_slice());
776 value = grpc_slice_to_c_string(wire_value);
777 grpc_core::CSliceUnref(wire_value);
778 } else {
779 value = grpc_slice_to_c_string(value_slice.c_slice());
780 }
781 CRONET_LOG(GPR_DEBUG, "header %s = %s", key, value);
782 GPR_ASSERT(count_ < capacity_);
783 headers_[count_].key = key;
784 headers_[count_].value = value;
785 ++count_;
786 }
787
788 private:
789 const char* host_;
790 size_t capacity_;
791 size_t& count_;
792 bidirectional_stream_header*& headers_;
793 const char** method_;
794 std::string* url_;
795 };
796 } // namespace
797
798 //
799 // Convert metadata in a format that Cronet can consume
800 //
convert_metadata_to_cronet_headers(grpc_metadata_batch * metadata,const char * host,std::string * pp_url,bidirectional_stream_header ** pp_headers,size_t * p_num_headers,const char ** method)801 static void convert_metadata_to_cronet_headers(
802 grpc_metadata_batch* metadata, const char* host, std::string* pp_url,
803 bidirectional_stream_header** pp_headers, size_t* p_num_headers,
804 const char** method) {
805 CronetMetadataEncoder encoder(pp_headers, p_num_headers, host,
806 metadata->count(), method, pp_url);
807 metadata->Encode(&encoder);
808 }
809
parse_grpc_header(const uint8_t * data,int * length,bool * compressed)810 static void parse_grpc_header(const uint8_t* data, int* length,
811 bool* compressed) {
812 const uint8_t c = *data;
813 const uint8_t* p = data + 1;
814 *compressed = ((c & 0x01) == 0x01);
815 *length = 0;
816 *length |= (*p++) << 24;
817 *length |= (*p++) << 16;
818 *length |= (*p++) << 8;
819 *length |= (*p++);
820 }
821
header_has_authority(const grpc_metadata_batch * b)822 static bool header_has_authority(const grpc_metadata_batch* b) {
823 return b->get_pointer(grpc_core::HttpAuthorityMetadata()) != nullptr;
824 }
825
826 //
827 // Op Execution: Decide if one of the actions contained in the stream op can be
828 // executed. This is the heart of the state machine.
829 //
op_can_be_run(grpc_transport_stream_op_batch * curr_op,struct stream_obj * s,struct op_state * op_state,enum e_op_id op_id)830 static bool op_can_be_run(grpc_transport_stream_op_batch* curr_op,
831 struct stream_obj* s, struct op_state* op_state,
832 enum e_op_id op_id) {
833 struct op_state* stream_state = &s->state;
834 grpc_cronet_transport* t = s->curr_ct;
835 bool result = true;
836 // When call is canceled, every op can be run, except under following
837 // conditions
838 //
839 bool is_canceled_or_failed = stream_state->state_op_done[OP_CANCEL_ERROR] ||
840 stream_state->state_callback_received[OP_FAILED];
841 if (is_canceled_or_failed) {
842 if (op_id == OP_SEND_INITIAL_METADATA) {
843 CRONET_LOG(GPR_DEBUG, "Because");
844 result = false;
845 }
846 if (op_id == OP_SEND_MESSAGE) {
847 CRONET_LOG(GPR_DEBUG, "Because");
848 result = false;
849 }
850 if (op_id == OP_SEND_TRAILING_METADATA) {
851 CRONET_LOG(GPR_DEBUG, "Because");
852 result = false;
853 }
854 if (op_id == OP_CANCEL_ERROR) {
855 CRONET_LOG(GPR_DEBUG, "Because");
856 result = false;
857 }
858 // already executed
859 if (op_id == OP_RECV_INITIAL_METADATA &&
860 stream_state->state_op_done[OP_RECV_INITIAL_METADATA]) {
861 CRONET_LOG(GPR_DEBUG, "Because");
862 result = false;
863 }
864 if (op_id == OP_RECV_MESSAGE && op_state->state_op_done[OP_RECV_MESSAGE]) {
865 CRONET_LOG(GPR_DEBUG, "Because");
866 result = false;
867 }
868 if (op_id == OP_RECV_TRAILING_METADATA &&
869 stream_state->state_op_done[OP_RECV_TRAILING_METADATA]) {
870 CRONET_LOG(GPR_DEBUG, "Because");
871 result = false;
872 }
873 // ON_COMPLETE can be processed if one of the following conditions is met:
874 // 1. the stream failed
875 // 2. the stream is cancelled, and the callback is received
876 // 3. the stream succeeded before cancel is effective
877 // 4. the stream is cancelled, and the stream is never started
878 if (op_id == OP_ON_COMPLETE &&
879 !(stream_state->state_callback_received[OP_FAILED] ||
880 stream_state->state_callback_received[OP_CANCELED] ||
881 stream_state->state_callback_received[OP_SUCCEEDED] ||
882 !stream_state->state_op_done[OP_SEND_INITIAL_METADATA])) {
883 CRONET_LOG(GPR_DEBUG, "Because");
884 result = false;
885 }
886 } else if (op_id == OP_SEND_INITIAL_METADATA) {
887 // already executed
888 if (stream_state->state_op_done[OP_SEND_INITIAL_METADATA]) result = false;
889 } else if (op_id == OP_RECV_INITIAL_METADATA) {
890 if (stream_state->state_op_done[OP_RECV_INITIAL_METADATA]) {
891 // already executed
892 result = false;
893 } else if (!stream_state
894 ->state_callback_received[OP_SEND_INITIAL_METADATA]) {
895 // we haven't sent headers yet.
896 result = false;
897 } else if (!stream_state
898 ->state_callback_received[OP_RECV_INITIAL_METADATA] &&
899 !stream_state->state_op_done[OP_RECV_TRAILING_METADATA]) {
900 // we haven't received headers yet.
901 result = false;
902 }
903 } else if (op_id == OP_SEND_MESSAGE) {
904 if (op_state->state_op_done[OP_SEND_MESSAGE]) {
905 // already executed (note we're checking op specific state, not stream
906 // state)
907 result = false;
908 } else if (!stream_state
909 ->state_callback_received[OP_SEND_INITIAL_METADATA]) {
910 // we haven't sent headers yet.
911 result = false;
912 }
913 } else if (op_id == OP_RECV_MESSAGE) {
914 if (op_state->state_op_done[OP_RECV_MESSAGE]) {
915 // already executed
916 result = false;
917 } else if (!stream_state
918 ->state_callback_received[OP_RECV_INITIAL_METADATA] &&
919 !stream_state->state_op_done[OP_RECV_TRAILING_METADATA]) {
920 // we haven't received headers yet.
921 result = false;
922 }
923 } else if (op_id == OP_RECV_TRAILING_METADATA) {
924 if (stream_state->state_op_done[OP_RECV_TRAILING_METADATA]) {
925 // already executed
926 result = false;
927 } else if (stream_state->state_op_done[OP_READ_REQ_MADE] &&
928 !stream_state->state_op_done[OP_RECV_MESSAGE]) {
929 // we have asked for but haven't received message yet.
930 result = false;
931 } else if (!stream_state
932 ->state_callback_received[OP_RECV_TRAILING_METADATA]) {
933 // we haven't received trailers yet.
934 result = false;
935 } else if (!stream_state->state_callback_received[OP_SUCCEEDED]) {
936 // we haven't received on_succeeded yet.
937 result = false;
938 }
939 } else if (op_id == OP_SEND_TRAILING_METADATA) {
940 if (stream_state->state_op_done[OP_SEND_TRAILING_METADATA]) {
941 // already executed
942 result = false;
943 } else if (!stream_state
944 ->state_callback_received[OP_SEND_INITIAL_METADATA]) {
945 // we haven't sent initial metadata yet
946 result = false;
947 } else if (stream_state->pending_send_message &&
948 !stream_state->state_op_done[OP_SEND_MESSAGE]) {
949 // we haven't sent message yet
950 result = false;
951 } else if (stream_state->state_op_done[OP_SEND_MESSAGE] &&
952 !stream_state->state_callback_received[OP_SEND_MESSAGE] &&
953 !(t->use_packet_coalescing &&
954 stream_state->pending_write_for_trailer)) {
955 // we haven't got on_write_completed for the send yet
956 result = false;
957 }
958 } else if (op_id == OP_CANCEL_ERROR) {
959 // already executed
960 if (stream_state->state_op_done[OP_CANCEL_ERROR]) result = false;
961 } else if (op_id == OP_ON_COMPLETE) {
962 if (op_state->state_op_done[OP_ON_COMPLETE]) {
963 // already executed (note we're checking op specific state, not stream
964 // state)
965 CRONET_LOG(GPR_DEBUG, "Because");
966 result = false;
967 }
968 // Check if every op that was asked for is done.
969 // TODO(muxi): We should not consider the recv ops here, since they
970 // have their own callbacks. We should invoke a batch's on_complete
971 // as soon as all of the batch's send ops are complete, even if
972 // there are still recv ops pending.
973 else if (curr_op->send_initial_metadata &&
974 !stream_state->state_callback_received[OP_SEND_INITIAL_METADATA]) {
975 CRONET_LOG(GPR_DEBUG, "Because");
976 result = false;
977 } else if (curr_op->send_message &&
978 !op_state->state_op_done[OP_SEND_MESSAGE]) {
979 CRONET_LOG(GPR_DEBUG, "Because");
980 result = false;
981 } else if (curr_op->send_message &&
982 !stream_state->state_callback_received[OP_SEND_MESSAGE]) {
983 CRONET_LOG(GPR_DEBUG, "Because");
984 result = false;
985 } else if (curr_op->send_trailing_metadata &&
986 !stream_state->state_op_done[OP_SEND_TRAILING_METADATA]) {
987 CRONET_LOG(GPR_DEBUG, "Because");
988 result = false;
989 } else if (curr_op->recv_initial_metadata &&
990 !stream_state->state_op_done[OP_RECV_INITIAL_METADATA]) {
991 CRONET_LOG(GPR_DEBUG, "Because");
992 result = false;
993 } else if (curr_op->recv_message &&
994 !op_state->state_op_done[OP_RECV_MESSAGE]) {
995 CRONET_LOG(GPR_DEBUG, "Because");
996 result = false;
997 } else if (curr_op->cancel_stream &&
998 !stream_state->state_callback_received[OP_CANCELED]) {
999 CRONET_LOG(GPR_DEBUG, "Because");
1000 result = false;
1001 } else if (curr_op->recv_trailing_metadata) {
1002 // We aren't done with trailing metadata yet
1003 if (!stream_state->state_op_done[OP_RECV_TRAILING_METADATA]) {
1004 CRONET_LOG(GPR_DEBUG, "Because");
1005 result = false;
1006 }
1007 // We've asked for actual message in an earlier op, and it hasn't been
1008 // delivered yet.
1009 else if (stream_state->state_op_done[OP_READ_REQ_MADE]) {
1010 // If this op is not the one asking for read, (which means some earlier
1011 // op has asked), and the read hasn't been delivered.
1012 if (!curr_op->recv_message &&
1013 !stream_state->state_callback_received[OP_SUCCEEDED]) {
1014 CRONET_LOG(GPR_DEBUG, "Because");
1015 result = false;
1016 }
1017 }
1018 }
1019 // We should see at least one on_write_completed for the trailers that we
1020 // sent
1021 else if (curr_op->send_trailing_metadata &&
1022 !stream_state->state_callback_received[OP_SEND_MESSAGE]) {
1023 result = false;
1024 }
1025 }
1026 CRONET_LOG(GPR_DEBUG, "op_can_be_run %s : %s", op_id_string(op_id),
1027 result ? "YES" : "NO");
1028 return result;
1029 }
1030
1031 //
1032 // TODO (makdharma): Break down this function in smaller chunks for readability.
1033 //
execute_stream_op(struct op_and_state * oas)1034 static enum e_op_result execute_stream_op(struct op_and_state* oas) {
1035 grpc_transport_stream_op_batch* stream_op = &oas->op;
1036 struct stream_obj* s = oas->s;
1037 grpc_cronet_transport* t = s->curr_ct;
1038 struct op_state* stream_state = &s->state;
1039 enum e_op_result result = NO_ACTION_POSSIBLE;
1040 if (stream_op->send_initial_metadata &&
1041 op_can_be_run(stream_op, s, &oas->state, OP_SEND_INITIAL_METADATA)) {
1042 CRONET_LOG(GPR_DEBUG, "running: %p OP_SEND_INITIAL_METADATA", oas);
1043 // Start new cronet stream. It is destroyed in on_succeeded, on_canceled,
1044 // on_failed
1045 GPR_ASSERT(s->cbs == nullptr);
1046 GPR_ASSERT(!stream_state->state_op_done[OP_SEND_INITIAL_METADATA]);
1047 s->cbs =
1048 bidirectional_stream_create(t->engine, s->curr_gs, &cronet_callbacks);
1049 CRONET_LOG(GPR_DEBUG, "%p = bidirectional_stream_create()", s->cbs);
1050 if (t->use_packet_coalescing) {
1051 bidirectional_stream_disable_auto_flush(s->cbs, true);
1052 bidirectional_stream_delay_request_headers_until_flush(s->cbs, true);
1053 }
1054 std::string url;
1055 const char* method = "POST";
1056 s->header_array.headers = nullptr;
1057 convert_metadata_to_cronet_headers(
1058 stream_op->payload->send_initial_metadata.send_initial_metadata,
1059 t->host, &url, &s->header_array.headers, &s->header_array.count,
1060 &method);
1061 s->header_array.capacity = s->header_array.count;
1062 CRONET_LOG(GPR_DEBUG, "bidirectional_stream_start(%p, %s)", s->cbs,
1063 url.c_str());
1064 bidirectional_stream_start(s->cbs, url.c_str(), 0, method, &s->header_array,
1065 false);
1066 unsigned int header_index;
1067 for (header_index = 0; header_index < s->header_array.count;
1068 header_index++) {
1069 gpr_free(const_cast<char*>(s->header_array.headers[header_index].key));
1070 gpr_free(const_cast<char*>(s->header_array.headers[header_index].value));
1071 }
1072 stream_state->state_op_done[OP_SEND_INITIAL_METADATA] = true;
1073 if (t->use_packet_coalescing) {
1074 if (!stream_op->send_message && !stream_op->send_trailing_metadata) {
1075 s->state.flush_cronet_when_ready = true;
1076 }
1077 }
1078 result = ACTION_TAKEN_WITH_CALLBACK;
1079 } else if (stream_op->send_message &&
1080 op_can_be_run(stream_op, s, &oas->state, OP_SEND_MESSAGE)) {
1081 CRONET_LOG(GPR_DEBUG, "running: %p OP_SEND_MESSAGE", oas);
1082 stream_state->pending_send_message = false;
1083 if (stream_state->state_op_done[OP_CANCEL_ERROR] ||
1084 stream_state->state_callback_received[OP_FAILED] ||
1085 stream_state->state_callback_received[OP_SUCCEEDED]) {
1086 result = NO_ACTION_POSSIBLE;
1087 CRONET_LOG(GPR_DEBUG, "Stream is either cancelled, failed or finished");
1088 } else {
1089 size_t write_buffer_size;
1090 create_grpc_frame(
1091 stream_op->payload->send_message.send_message->c_slice_buffer(),
1092 &stream_state->ws.write_buffer, &write_buffer_size,
1093 stream_op->payload->send_message.flags);
1094 if (write_buffer_size > 0) {
1095 CRONET_LOG(GPR_DEBUG, "bidirectional_stream_write (%p, %p)", s->cbs,
1096 stream_state->ws.write_buffer);
1097 stream_state->state_callback_received[OP_SEND_MESSAGE] = false;
1098 bidirectional_stream_write(s->cbs, stream_state->ws.write_buffer,
1099 static_cast<int>(write_buffer_size), false);
1100 if (t->use_packet_coalescing) {
1101 if (!stream_op->send_trailing_metadata) {
1102 CRONET_LOG(GPR_DEBUG, "bidirectional_stream_flush (%p)", s->cbs);
1103 bidirectional_stream_flush(s->cbs);
1104 result = ACTION_TAKEN_WITH_CALLBACK;
1105 } else {
1106 stream_state->pending_write_for_trailer = true;
1107 result = ACTION_TAKEN_NO_CALLBACK;
1108 }
1109 } else {
1110 result = ACTION_TAKEN_WITH_CALLBACK;
1111 }
1112 } else {
1113 // Should never reach here
1114 grpc_core::Crash("unreachable");
1115 }
1116 }
1117 stream_state->state_op_done[OP_SEND_MESSAGE] = true;
1118 oas->state.state_op_done[OP_SEND_MESSAGE] = true;
1119 } else if (stream_op->send_trailing_metadata &&
1120 op_can_be_run(stream_op, s, &oas->state,
1121 OP_SEND_TRAILING_METADATA)) {
1122 CRONET_LOG(GPR_DEBUG, "running: %p OP_SEND_TRAILING_METADATA", oas);
1123 if (stream_state->state_op_done[OP_CANCEL_ERROR] ||
1124 stream_state->state_callback_received[OP_FAILED] ||
1125 stream_state->state_callback_received[OP_SUCCEEDED]) {
1126 result = NO_ACTION_POSSIBLE;
1127 CRONET_LOG(GPR_DEBUG, "Stream is either cancelled, failed or finished");
1128 } else {
1129 CRONET_LOG(GPR_DEBUG, "bidirectional_stream_write (%p, 0)", s->cbs);
1130 stream_state->state_callback_received[OP_SEND_MESSAGE] = false;
1131 bidirectional_stream_write(s->cbs, "", 0, true);
1132 if (t->use_packet_coalescing) {
1133 CRONET_LOG(GPR_DEBUG, "bidirectional_stream_flush (%p)", s->cbs);
1134 bidirectional_stream_flush(s->cbs);
1135 }
1136 result = ACTION_TAKEN_WITH_CALLBACK;
1137 }
1138 stream_state->state_op_done[OP_SEND_TRAILING_METADATA] = true;
1139 } else if (stream_op->recv_initial_metadata &&
1140 op_can_be_run(stream_op, s, &oas->state,
1141 OP_RECV_INITIAL_METADATA)) {
1142 CRONET_LOG(GPR_DEBUG, "running: %p OP_RECV_INITIAL_METADATA", oas);
1143 if (stream_state->state_op_done[OP_CANCEL_ERROR]) {
1144 grpc_core::ExecCtx::Run(
1145 DEBUG_LOCATION,
1146 stream_op->payload->recv_initial_metadata.recv_initial_metadata_ready,
1147 absl::OkStatus());
1148 } else if (stream_state->state_callback_received[OP_FAILED]) {
1149 grpc_core::ExecCtx::Run(
1150 DEBUG_LOCATION,
1151 stream_op->payload->recv_initial_metadata.recv_initial_metadata_ready,
1152 absl::OkStatus());
1153 } else if (stream_state->state_op_done[OP_RECV_TRAILING_METADATA]) {
1154 grpc_core::ExecCtx::Run(
1155 DEBUG_LOCATION,
1156 stream_op->payload->recv_initial_metadata.recv_initial_metadata_ready,
1157 absl::OkStatus());
1158 } else {
1159 *stream_op->payload->recv_initial_metadata.recv_initial_metadata =
1160 std::move(oas->s->state.rs.initial_metadata);
1161 grpc_core::ExecCtx::Run(
1162 DEBUG_LOCATION,
1163 stream_op->payload->recv_initial_metadata.recv_initial_metadata_ready,
1164 absl::OkStatus());
1165 }
1166 stream_state->state_op_done[OP_RECV_INITIAL_METADATA] = true;
1167 result = ACTION_TAKEN_NO_CALLBACK;
1168 } else if (stream_op->recv_message &&
1169 op_can_be_run(stream_op, s, &oas->state, OP_RECV_MESSAGE)) {
1170 CRONET_LOG(GPR_DEBUG, "running: %p OP_RECV_MESSAGE", oas);
1171 if (stream_state->state_op_done[OP_CANCEL_ERROR]) {
1172 CRONET_LOG(GPR_DEBUG, "Stream is cancelled.");
1173 grpc_core::ExecCtx::Run(
1174 DEBUG_LOCATION, stream_op->payload->recv_message.recv_message_ready,
1175 absl::OkStatus());
1176 stream_state->state_op_done[OP_RECV_MESSAGE] = true;
1177 oas->state.state_op_done[OP_RECV_MESSAGE] = true;
1178 result = ACTION_TAKEN_NO_CALLBACK;
1179 } else if (stream_state->state_callback_received[OP_FAILED]) {
1180 CRONET_LOG(GPR_DEBUG, "Stream failed.");
1181 grpc_core::ExecCtx::Run(
1182 DEBUG_LOCATION, stream_op->payload->recv_message.recv_message_ready,
1183 absl::OkStatus());
1184 stream_state->state_op_done[OP_RECV_MESSAGE] = true;
1185 oas->state.state_op_done[OP_RECV_MESSAGE] = true;
1186 result = ACTION_TAKEN_NO_CALLBACK;
1187 } else if (stream_state->rs.read_stream_closed) {
1188 // No more data will be received
1189 CRONET_LOG(GPR_DEBUG, "read stream closed");
1190 grpc_core::ExecCtx::Run(
1191 DEBUG_LOCATION, stream_op->payload->recv_message.recv_message_ready,
1192 absl::OkStatus());
1193 stream_state->state_op_done[OP_RECV_MESSAGE] = true;
1194 oas->state.state_op_done[OP_RECV_MESSAGE] = true;
1195 result = ACTION_TAKEN_NO_CALLBACK;
1196 } else if (stream_state->flush_read) {
1197 CRONET_LOG(GPR_DEBUG, "flush read");
1198 grpc_core::ExecCtx::Run(
1199 DEBUG_LOCATION, stream_op->payload->recv_message.recv_message_ready,
1200 absl::OkStatus());
1201 stream_state->state_op_done[OP_RECV_MESSAGE] = true;
1202 oas->state.state_op_done[OP_RECV_MESSAGE] = true;
1203 result = ACTION_TAKEN_NO_CALLBACK;
1204 } else if (!stream_state->rs.length_field_received) {
1205 if (stream_state->rs.received_bytes == GRPC_HEADER_SIZE_IN_BYTES &&
1206 stream_state->rs.remaining_bytes == 0) {
1207 // Start a read operation for data
1208 stream_state->rs.length_field_received = true;
1209 parse_grpc_header(
1210 reinterpret_cast<const uint8_t*>(stream_state->rs.read_buffer),
1211 &stream_state->rs.length_field, &stream_state->rs.compressed);
1212 CRONET_LOG(GPR_DEBUG, "length field = %d",
1213 stream_state->rs.length_field);
1214 if (stream_state->rs.length_field > 0) {
1215 stream_state->rs.read_buffer = static_cast<char*>(
1216 gpr_malloc(static_cast<size_t>(stream_state->rs.length_field)));
1217 GPR_ASSERT(stream_state->rs.read_buffer);
1218 stream_state->rs.remaining_bytes = stream_state->rs.length_field;
1219 stream_state->rs.received_bytes = 0;
1220 CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs);
1221 stream_state->state_op_done[OP_READ_REQ_MADE] =
1222 true; // Indicates that at least one read request has been made
1223 bidirectional_stream_read(s->cbs, stream_state->rs.read_buffer,
1224 stream_state->rs.remaining_bytes);
1225 result = ACTION_TAKEN_WITH_CALLBACK;
1226 } else {
1227 stream_state->rs.remaining_bytes = 0;
1228 CRONET_LOG(GPR_DEBUG, "read operation complete. Empty response.");
1229 // Clean up read_slice_buffer in case there is unread data.
1230 stream_state->rs.read_slice_buffer.Clear();
1231 uint32_t flags = 0;
1232 if (stream_state->rs.compressed) {
1233 flags |= GRPC_WRITE_INTERNAL_COMPRESS;
1234 }
1235 *stream_op->payload->recv_message.flags = flags;
1236 *stream_op->payload->recv_message.recv_message =
1237 std::move(stream_state->rs.read_slice_buffer);
1238 grpc_core::ExecCtx::Run(
1239 DEBUG_LOCATION,
1240 stream_op->payload->recv_message.recv_message_ready,
1241 absl::OkStatus());
1242 stream_state->state_op_done[OP_RECV_MESSAGE] = true;
1243 oas->state.state_op_done[OP_RECV_MESSAGE] = true;
1244
1245 // Extra read to trigger on_succeed
1246 stream_state->rs.length_field_received = false;
1247 stream_state->state_op_done[OP_READ_REQ_MADE] =
1248 true; // Indicates that at least one read request has been made
1249 read_grpc_header(s);
1250 result = ACTION_TAKEN_NO_CALLBACK;
1251 }
1252 } else if (stream_state->rs.remaining_bytes == 0) {
1253 // Start a read operation for first 5 bytes (GRPC header)
1254 stream_state->rs.read_buffer = stream_state->rs.grpc_header_bytes;
1255 stream_state->rs.remaining_bytes = GRPC_HEADER_SIZE_IN_BYTES;
1256 stream_state->rs.received_bytes = 0;
1257 stream_state->rs.compressed = false;
1258 CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs);
1259 stream_state->state_op_done[OP_READ_REQ_MADE] =
1260 true; // Indicates that at least one read request has been made
1261 bidirectional_stream_read(s->cbs, stream_state->rs.read_buffer,
1262 stream_state->rs.remaining_bytes);
1263 result = ACTION_TAKEN_WITH_CALLBACK;
1264 } else {
1265 result = NO_ACTION_POSSIBLE;
1266 }
1267 } else if (stream_state->rs.remaining_bytes == 0) {
1268 CRONET_LOG(GPR_DEBUG, "read operation complete");
1269 grpc_slice read_data_slice =
1270 GRPC_SLICE_MALLOC((uint32_t)stream_state->rs.length_field);
1271 uint8_t* dst_p = GRPC_SLICE_START_PTR(read_data_slice);
1272 memcpy(dst_p, stream_state->rs.read_buffer,
1273 static_cast<size_t>(stream_state->rs.length_field));
1274 null_and_maybe_free_read_buffer(s);
1275 // Clean up read_slice_buffer in case there is unread data.
1276 stream_state->rs.read_slice_buffer.Clear();
1277 stream_state->rs.read_slice_buffer.Append(
1278 grpc_core::Slice(read_data_slice));
1279 uint32_t flags = 0;
1280 if (stream_state->rs.compressed) {
1281 flags = GRPC_WRITE_INTERNAL_COMPRESS;
1282 }
1283 *stream_op->payload->recv_message.flags = flags;
1284 *stream_op->payload->recv_message.recv_message =
1285 std::move(stream_state->rs.read_slice_buffer);
1286 grpc_core::ExecCtx::Run(
1287 DEBUG_LOCATION, stream_op->payload->recv_message.recv_message_ready,
1288 absl::OkStatus());
1289 stream_state->state_op_done[OP_RECV_MESSAGE] = true;
1290 oas->state.state_op_done[OP_RECV_MESSAGE] = true;
1291 // Do an extra read to trigger on_succeeded() callback in case connection
1292 // is closed
1293 stream_state->rs.length_field_received = false;
1294 read_grpc_header(s);
1295 result = ACTION_TAKEN_NO_CALLBACK;
1296 }
1297 } else if (stream_op->recv_trailing_metadata &&
1298 op_can_be_run(stream_op, s, &oas->state,
1299 OP_RECV_TRAILING_METADATA)) {
1300 CRONET_LOG(GPR_DEBUG, "running: %p OP_RECV_TRAILING_METADATA", oas);
1301 grpc_error_handle error;
1302 if (stream_state->state_op_done[OP_CANCEL_ERROR]) {
1303 error = stream_state->cancel_error;
1304 } else if (stream_state->state_callback_received[OP_FAILED]) {
1305 grpc_status_code grpc_error_code =
1306 cronet_net_error_to_grpc_error(stream_state->net_error);
1307 const char* desc = cronet_net_error_as_string(stream_state->net_error);
1308 error =
1309 make_error_with_desc(grpc_error_code, stream_state->net_error, desc);
1310 } else if (oas->s->state.rs.trailing_metadata_valid) {
1311 *stream_op->payload->recv_trailing_metadata.recv_trailing_metadata =
1312 std::move(oas->s->state.rs.trailing_metadata);
1313 stream_state->rs.trailing_metadata_valid = false;
1314 }
1315 grpc_core::ExecCtx::Run(
1316 DEBUG_LOCATION,
1317 stream_op->payload->recv_trailing_metadata.recv_trailing_metadata_ready,
1318 error);
1319 stream_state->state_op_done[OP_RECV_TRAILING_METADATA] = true;
1320 result = ACTION_TAKEN_NO_CALLBACK;
1321 } else if (stream_op->cancel_stream &&
1322 op_can_be_run(stream_op, s, &oas->state, OP_CANCEL_ERROR)) {
1323 CRONET_LOG(GPR_DEBUG, "running: %p OP_CANCEL_ERROR", oas);
1324 if (s->cbs) {
1325 CRONET_LOG(GPR_DEBUG, "W: bidirectional_stream_cancel(%p)", s->cbs);
1326 bidirectional_stream_cancel(s->cbs);
1327 result = ACTION_TAKEN_WITH_CALLBACK;
1328 } else {
1329 result = ACTION_TAKEN_NO_CALLBACK;
1330 }
1331 stream_state->state_op_done[OP_CANCEL_ERROR] = true;
1332 if (stream_state->cancel_error.ok()) {
1333 stream_state->cancel_error =
1334 stream_op->payload->cancel_stream.cancel_error;
1335 }
1336 } else if (op_can_be_run(stream_op, s, &oas->state, OP_ON_COMPLETE)) {
1337 CRONET_LOG(GPR_DEBUG, "running: %p OP_ON_COMPLETE", oas);
1338 if (stream_state->state_op_done[OP_CANCEL_ERROR]) {
1339 if (stream_op->on_complete) {
1340 grpc_core::ExecCtx::Run(DEBUG_LOCATION, stream_op->on_complete,
1341 stream_state->cancel_error);
1342 }
1343 } else if (stream_state->state_callback_received[OP_FAILED]) {
1344 if (stream_op->on_complete) {
1345 const char* error_message =
1346 cronet_net_error_as_string(stream_state->net_error);
1347 grpc_status_code grpc_error_code =
1348 cronet_net_error_to_grpc_error(stream_state->net_error);
1349 grpc_core::ExecCtx::Run(
1350 DEBUG_LOCATION, stream_op->on_complete,
1351 make_error_with_desc(grpc_error_code, stream_state->net_error,
1352 error_message));
1353 }
1354 } else {
1355 // All actions in this stream_op are complete. Call the on_complete
1356 // callback
1357 //
1358 if (stream_op->on_complete) {
1359 grpc_core::ExecCtx::Run(DEBUG_LOCATION, stream_op->on_complete,
1360 absl::OkStatus());
1361 }
1362 }
1363 oas->state.state_op_done[OP_ON_COMPLETE] = true;
1364 oas->done = true;
1365 // reset any send message state, only if this ON_COMPLETE is about a send.
1366 //
1367 if (stream_op->send_message) {
1368 stream_state->state_callback_received[OP_SEND_MESSAGE] = false;
1369 stream_state->state_op_done[OP_SEND_MESSAGE] = false;
1370 }
1371 result = ACTION_TAKEN_NO_CALLBACK;
1372 // If this is the on_complete callback being called for a received message -
1373 // make a note
1374 if (stream_op->recv_message) {
1375 stream_state->state_op_done[OP_RECV_MESSAGE_AND_ON_COMPLETE] = true;
1376 }
1377 } else {
1378 result = NO_ACTION_POSSIBLE;
1379 }
1380 return result;
1381 }
1382
1383 //
1384 // Functions used by upper layers to access transport functionality.
1385 //
1386
stream_obj(grpc_transport * gt,grpc_stream * gs,grpc_stream_refcount * refcount,grpc_core::Arena * arena)1387 inline stream_obj::stream_obj(grpc_transport* gt, grpc_stream* gs,
1388 grpc_stream_refcount* refcount,
1389 grpc_core::Arena* arena)
1390 : arena(arena),
1391 curr_ct(reinterpret_cast<grpc_cronet_transport*>(gt)),
1392 curr_gs(gs),
1393 state(arena),
1394 refcount(refcount) {
1395 GRPC_CRONET_STREAM_REF(this, "cronet transport");
1396 gpr_mu_init(&mu);
1397 }
1398
~stream_obj()1399 inline stream_obj::~stream_obj() { null_and_maybe_free_read_buffer(this); }
1400
init_stream(grpc_transport * gt,grpc_stream * gs,grpc_stream_refcount * refcount,const void *,grpc_core::Arena * arena)1401 static int init_stream(grpc_transport* gt, grpc_stream* gs,
1402 grpc_stream_refcount* refcount,
1403 const void* /*server_data*/, grpc_core::Arena* arena) {
1404 new (gs) stream_obj(gt, gs, refcount, arena);
1405 return 0;
1406 }
1407
set_pollset_do_nothing(grpc_transport *,grpc_stream *,grpc_pollset *)1408 static void set_pollset_do_nothing(grpc_transport* /*gt*/, grpc_stream* /*gs*/,
1409 grpc_pollset* /*pollset*/) {}
1410
set_pollset_set_do_nothing(grpc_transport *,grpc_stream *,grpc_pollset_set *)1411 static void set_pollset_set_do_nothing(grpc_transport* /*gt*/,
1412 grpc_stream* /*gs*/,
1413 grpc_pollset_set* /*pollset_set*/) {}
1414
perform_stream_op(grpc_transport *,grpc_stream * gs,grpc_transport_stream_op_batch * op)1415 static void perform_stream_op(grpc_transport* /*gt*/, grpc_stream* gs,
1416 grpc_transport_stream_op_batch* op) {
1417 CRONET_LOG(GPR_DEBUG, "perform_stream_op");
1418 if (op->send_initial_metadata &&
1419 header_has_authority(
1420 op->payload->send_initial_metadata.send_initial_metadata)) {
1421 // Cronet does not support :authority header field. We cancel the call when
1422 // this field is present in metadata
1423 if (op->recv_initial_metadata) {
1424 grpc_core::ExecCtx::Run(
1425 DEBUG_LOCATION,
1426 op->payload->recv_initial_metadata.recv_initial_metadata_ready,
1427 absl::CancelledError());
1428 }
1429 if (op->recv_message) {
1430 grpc_core::ExecCtx::Run(DEBUG_LOCATION,
1431 op->payload->recv_message.recv_message_ready,
1432 absl::CancelledError());
1433 }
1434 if (op->recv_trailing_metadata) {
1435 grpc_core::ExecCtx::Run(
1436 DEBUG_LOCATION,
1437 op->payload->recv_trailing_metadata.recv_trailing_metadata_ready,
1438 absl::CancelledError());
1439 }
1440 grpc_core::ExecCtx::Run(DEBUG_LOCATION, op->on_complete,
1441 absl::CancelledError());
1442 return;
1443 }
1444 stream_obj* s = reinterpret_cast<stream_obj*>(gs);
1445 add_to_storage(s, op);
1446 execute_from_storage(s);
1447 }
1448
destroy_stream(grpc_transport *,grpc_stream * gs,grpc_closure * then_schedule_closure)1449 static void destroy_stream(grpc_transport* /*gt*/, grpc_stream* gs,
1450 grpc_closure* then_schedule_closure) {
1451 stream_obj* s = reinterpret_cast<stream_obj*>(gs);
1452 s->~stream_obj();
1453 grpc_core::ExecCtx::Run(DEBUG_LOCATION, then_schedule_closure,
1454 absl::OkStatus());
1455 }
1456
destroy_transport(grpc_transport *)1457 static void destroy_transport(grpc_transport* /*gt*/) {}
1458
get_endpoint(grpc_transport *)1459 static grpc_endpoint* get_endpoint(grpc_transport* /*gt*/) { return nullptr; }
1460
perform_op(grpc_transport *,grpc_transport_op *)1461 static void perform_op(grpc_transport* /*gt*/, grpc_transport_op* /*op*/) {}
1462
1463 static const grpc_transport_vtable grpc_cronet_vtable = {
1464 sizeof(stream_obj),
1465 false,
1466 "cronet_http",
1467 init_stream,
1468 nullptr,
1469 set_pollset_do_nothing,
1470 set_pollset_set_do_nothing,
1471 perform_stream_op,
1472 perform_op,
1473 destroy_stream,
1474 destroy_transport,
1475 get_endpoint};
1476
grpc_create_cronet_transport(void * engine,const char * target,const grpc_channel_args * args,void *)1477 grpc_transport* grpc_create_cronet_transport(void* engine, const char* target,
1478 const grpc_channel_args* args,
1479 void* /*reserved*/) {
1480 grpc_cronet_transport* ct = static_cast<grpc_cronet_transport*>(
1481 gpr_malloc(sizeof(grpc_cronet_transport)));
1482 if (!ct) {
1483 goto error;
1484 }
1485 ct->base.vtable = &grpc_cronet_vtable;
1486 ct->engine = static_cast<stream_engine*>(engine);
1487 ct->host = static_cast<char*>(gpr_malloc(strlen(target) + 1));
1488 if (!ct->host) {
1489 goto error;
1490 }
1491 strcpy(ct->host, target);
1492
1493 ct->use_packet_coalescing = true;
1494 if (args) {
1495 for (size_t i = 0; i < args->num_args; i++) {
1496 if (0 ==
1497 strcmp(args->args[i].key, GRPC_ARG_USE_CRONET_PACKET_COALESCING)) {
1498 if (GPR_UNLIKELY(args->args[i].type != GRPC_ARG_INTEGER)) {
1499 gpr_log(GPR_ERROR, "%s ignored: it must be an integer",
1500 GRPC_ARG_USE_CRONET_PACKET_COALESCING);
1501 } else {
1502 ct->use_packet_coalescing = (args->args[i].value.integer != 0);
1503 }
1504 }
1505 }
1506 }
1507
1508 return &ct->base;
1509
1510 error:
1511 if (ct) {
1512 if (ct->host) {
1513 gpr_free(ct->host);
1514 }
1515 gpr_free(ct);
1516 }
1517
1518 return nullptr;
1519 }
1520