1 //
2 //
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18
19 #ifndef GRPC_SRC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_INTERNAL_H
20 #define GRPC_SRC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_INTERNAL_H
21
22 #include <grpc/support/port_platform.h>
23
24 #include <stddef.h>
25 #include <stdint.h>
26
27 #include <memory>
28
29 #include "absl/strings/string_view.h"
30 #include "absl/types/optional.h"
31
32 #include <grpc/event_engine/event_engine.h>
33 #include <grpc/event_engine/memory_allocator.h>
34 #include <grpc/grpc.h>
35 #include <grpc/slice.h>
36 #include <grpc/support/time.h>
37
38 #include "src/core/ext/transport/chttp2/transport/context_list_entry.h"
39 #include "src/core/ext/transport/chttp2/transport/flow_control.h"
40 #include "src/core/ext/transport/chttp2/transport/frame.h"
41 #include "src/core/ext/transport/chttp2/transport/frame_goaway.h"
42 #include "src/core/ext/transport/chttp2/transport/frame_ping.h"
43 #include "src/core/ext/transport/chttp2/transport/frame_rst_stream.h"
44 #include "src/core/ext/transport/chttp2/transport/frame_settings.h"
45 #include "src/core/ext/transport/chttp2/transport/frame_window_update.h"
46 #include "src/core/ext/transport/chttp2/transport/hpack_encoder.h"
47 #include "src/core/ext/transport/chttp2/transport/hpack_parser.h"
48 #include "src/core/ext/transport/chttp2/transport/http2_settings.h"
49 #include "src/core/ext/transport/chttp2/transport/stream_map.h"
50 #include "src/core/lib/channel/channel_args.h"
51 #include "src/core/lib/channel/channelz.h"
52 #include "src/core/lib/debug/trace.h"
53 #include "src/core/lib/gprpp/bitset.h"
54 #include "src/core/lib/gprpp/debug_location.h"
55 #include "src/core/lib/gprpp/ref_counted.h"
56 #include "src/core/lib/gprpp/ref_counted_ptr.h"
57 #include "src/core/lib/gprpp/time.h"
58 #include "src/core/lib/iomgr/closure.h"
59 #include "src/core/lib/iomgr/combiner.h"
60 #include "src/core/lib/iomgr/endpoint.h"
61 #include "src/core/lib/iomgr/error.h"
62 #include "src/core/lib/resource_quota/arena.h"
63 #include "src/core/lib/resource_quota/memory_quota.h"
64 #include "src/core/lib/slice/slice.h"
65 #include "src/core/lib/slice/slice_buffer.h"
66 #include "src/core/lib/surface/init_internally.h"
67 #include "src/core/lib/transport/connectivity_state.h"
68 #include "src/core/lib/transport/metadata_batch.h"
69 #include "src/core/lib/transport/transport.h"
70 #include "src/core/lib/transport/transport_fwd.h"
71 #include "src/core/lib/transport/transport_impl.h"
72
// Flag that this closure barrier may be covering a write in a pollset, and so
// we should not complete this closure until we can prove that the write got
// scheduled
#define CLOSURE_BARRIER_MAY_COVER_WRITE (1 << 0)
// First bit of the reference count, stored in the high order bits (with the low
// bits being used for flags defined above)
#define CLOSURE_BARRIER_FIRST_REF_BIT (1 << 16)
80
// Streams are kept in various linked lists depending on what things need to
// happen to them; this enum labels each list. STREAM_LIST_COUNT is the number
// of lists and is used to size per-list arrays.
typedef enum {
  // If a stream is in the following two lists, an explicit ref is associated
  // with the stream
  GRPC_CHTTP2_LIST_WRITABLE,
  GRPC_CHTTP2_LIST_WRITING,
  // No additional ref is taken for the following lists. Make sure to remove
  // the stream from these lists when the stream is removed.
  GRPC_CHTTP2_LIST_STALLED_BY_TRANSPORT,
  GRPC_CHTTP2_LIST_STALLED_BY_STREAM,
  /// streams that are waiting to start because there are too many concurrent
  /// streams on the connection
  GRPC_CHTTP2_LIST_WAITING_FOR_CONCURRENCY,
  STREAM_LIST_COUNT  // must be last
} grpc_chttp2_stream_list_id;
97
// Write execution state of a transport (see
// grpc_chttp2_transport::write_state, which starts out IDLE).
typedef enum {
  GRPC_CHTTP2_WRITE_STATE_IDLE,
  GRPC_CHTTP2_WRITE_STATE_WRITING,
  GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE,
} grpc_chttp2_write_state;
103
// Optimization target selector: latency or throughput.
typedef enum {
  GRPC_CHTTP2_OPTIMIZE_FOR_LATENCY,
  GRPC_CHTTP2_OPTIMIZE_FOR_THROUGHPUT,
} grpc_chttp2_optimization_target;
108
// Indexes the closure lists kept in grpc_chttp2_ping_queue::lists.
// GRPC_CHTTP2_PCL_COUNT is the number of lists and sizes that array.
typedef enum {
  GRPC_CHTTP2_PCL_INITIATE = 0,
  GRPC_CHTTP2_PCL_NEXT,
  GRPC_CHTTP2_PCL_INFLIGHT,
  GRPC_CHTTP2_PCL_COUNT  // must be last
} grpc_chttp2_ping_closure_list;
115
// Reason codes passed to grpc_chttp2_initiate_write() to say why a write is
// being requested.
typedef enum {
  GRPC_CHTTP2_INITIATE_WRITE_INITIAL_WRITE,
  GRPC_CHTTP2_INITIATE_WRITE_START_NEW_STREAM,
  GRPC_CHTTP2_INITIATE_WRITE_SEND_MESSAGE,
  GRPC_CHTTP2_INITIATE_WRITE_SEND_INITIAL_METADATA,
  GRPC_CHTTP2_INITIATE_WRITE_SEND_TRAILING_METADATA,
  GRPC_CHTTP2_INITIATE_WRITE_RETRY_SEND_PING,
  GRPC_CHTTP2_INITIATE_WRITE_CONTINUE_PINGS,
  GRPC_CHTTP2_INITIATE_WRITE_GOAWAY_SENT,
  GRPC_CHTTP2_INITIATE_WRITE_RST_STREAM,
  GRPC_CHTTP2_INITIATE_WRITE_CLOSE_FROM_API,
  GRPC_CHTTP2_INITIATE_WRITE_STREAM_FLOW_CONTROL,
  GRPC_CHTTP2_INITIATE_WRITE_TRANSPORT_FLOW_CONTROL,
  GRPC_CHTTP2_INITIATE_WRITE_SEND_SETTINGS,
  GRPC_CHTTP2_INITIATE_WRITE_SETTINGS_ACK,
  GRPC_CHTTP2_INITIATE_WRITE_FLOW_CONTROL_UNSTALLED_BY_SETTING,
  GRPC_CHTTP2_INITIATE_WRITE_FLOW_CONTROL_UNSTALLED_BY_UPDATE,
  GRPC_CHTTP2_INITIATE_WRITE_APPLICATION_PING,
  GRPC_CHTTP2_INITIATE_WRITE_BDP_PING,
  GRPC_CHTTP2_INITIATE_WRITE_KEEPALIVE_PING,
  GRPC_CHTTP2_INITIATE_WRITE_TRANSPORT_FLOW_CONTROL_UNSTALLED,
  GRPC_CHTTP2_INITIATE_WRITE_PING_RESPONSE,
  GRPC_CHTTP2_INITIATE_WRITE_FORCE_RST_STREAM,
} grpc_chttp2_initiate_write_reason;

// Maps a grpc_chttp2_initiate_write_reason to a C string; the mapping itself
// lives with the implementation, not in this header.
const char* grpc_chttp2_initiate_write_reason_string(
    grpc_chttp2_initiate_write_reason reason);
143
144 struct grpc_chttp2_ping_queue {
145 grpc_closure_list lists[GRPC_CHTTP2_PCL_COUNT] = {};
146 uint64_t inflight_id = 0;
147 };
148 struct grpc_chttp2_repeated_ping_policy {
149 int max_pings_without_data;
150 int max_ping_strikes;
151 grpc_core::Duration min_recv_ping_interval_without_data;
152 };
153 struct grpc_chttp2_repeated_ping_state {
154 grpc_core::Timestamp last_ping_sent_time;
155 int pings_before_data_required;
156 absl::optional<grpc_event_engine::experimental::EventEngine::TaskHandle>
157 delayed_ping_timer_handle;
158 };
159 struct grpc_chttp2_server_ping_recv_state {
160 grpc_core::Timestamp last_ping_recv_time;
161 int ping_strikes;
162 };
// Deframer state for the overall http2 stream of bytes.
// The enumerators are positional: 24 client-prefix states (one per byte of
// GRPC_CHTTP2_CLIENT_CONNECT_STRING), then 9 frame-header states, then the
// in-frame state. Do not reorder.
typedef enum {
  // prefix: one entry per http2 connection prefix byte
  GRPC_DTS_CLIENT_PREFIX_0 = 0,
  GRPC_DTS_CLIENT_PREFIX_1,
  GRPC_DTS_CLIENT_PREFIX_2,
  GRPC_DTS_CLIENT_PREFIX_3,
  GRPC_DTS_CLIENT_PREFIX_4,
  GRPC_DTS_CLIENT_PREFIX_5,
  GRPC_DTS_CLIENT_PREFIX_6,
  GRPC_DTS_CLIENT_PREFIX_7,
  GRPC_DTS_CLIENT_PREFIX_8,
  GRPC_DTS_CLIENT_PREFIX_9,
  GRPC_DTS_CLIENT_PREFIX_10,
  GRPC_DTS_CLIENT_PREFIX_11,
  GRPC_DTS_CLIENT_PREFIX_12,
  GRPC_DTS_CLIENT_PREFIX_13,
  GRPC_DTS_CLIENT_PREFIX_14,
  GRPC_DTS_CLIENT_PREFIX_15,
  GRPC_DTS_CLIENT_PREFIX_16,
  GRPC_DTS_CLIENT_PREFIX_17,
  GRPC_DTS_CLIENT_PREFIX_18,
  GRPC_DTS_CLIENT_PREFIX_19,
  GRPC_DTS_CLIENT_PREFIX_20,
  GRPC_DTS_CLIENT_PREFIX_21,
  GRPC_DTS_CLIENT_PREFIX_22,
  GRPC_DTS_CLIENT_PREFIX_23,
  // frame header byte 0...
  // must follow from the prefix states
  GRPC_DTS_FH_0,
  GRPC_DTS_FH_1,
  GRPC_DTS_FH_2,
  GRPC_DTS_FH_3,
  GRPC_DTS_FH_4,
  GRPC_DTS_FH_5,
  GRPC_DTS_FH_6,
  GRPC_DTS_FH_7,
  // ... frame header byte 8
  GRPC_DTS_FH_8,
  // inside a http2 frame
  GRPC_DTS_FRAME
} grpc_chttp2_deframe_transport_state;
205
206 struct grpc_chttp2_stream_list {
207 grpc_chttp2_stream* head;
208 grpc_chttp2_stream* tail;
209 };
210 struct grpc_chttp2_stream_link {
211 grpc_chttp2_stream* next;
212 grpc_chttp2_stream* prev;
213 };
// We keep several sets of connection wide parameters; this enum indexes the
// first dimension of grpc_chttp2_transport::settings.
typedef enum {
  // The settings our peer has asked for (and we have acked)
  GRPC_PEER_SETTINGS = 0,
  // The settings we'd like to have
  GRPC_LOCAL_SETTINGS,
  // The settings we've published to our peer
  GRPC_SENT_SETTINGS,
  // The settings the peer has acked
  GRPC_ACKED_SETTINGS,
  // Number of setting sets (array dimension); must be last
  GRPC_NUM_SETTING_SETS
} grpc_chttp2_setting_set;
226
// State of GOAWAY sending for a transport
// (grpc_chttp2_transport::sent_goaway_state starts at NO_GOAWAY_SEND).
typedef enum {
  GRPC_CHTTP2_NO_GOAWAY_SEND,
  GRPC_CHTTP2_GRACEFUL_GOAWAY,
  GRPC_CHTTP2_FINAL_GOAWAY_SEND_SCHEDULED,
  GRPC_CHTTP2_FINAL_GOAWAY_SENT,
} grpc_chttp2_sent_goaway_state;
233
234 typedef struct grpc_chttp2_write_cb {
235 int64_t call_at_byte;
236 grpc_closure* closure;
237 struct grpc_chttp2_write_cb* next;
238 } grpc_chttp2_write_cb;
239
// Keep-alive state machine states (see
// grpc_chttp2_transport::keepalive_state).
typedef enum {
  GRPC_CHTTP2_KEEPALIVE_STATE_WAITING,
  GRPC_CHTTP2_KEEPALIVE_STATE_PINGING,
  GRPC_CHTTP2_KEEPALIVE_STATE_DYING,
  GRPC_CHTTP2_KEEPALIVE_STATE_DISABLED,
} grpc_chttp2_keepalive_state;
246
247 struct grpc_chttp2_transport : public grpc_core::KeepsGrpcInitialized {
248 grpc_chttp2_transport(const grpc_core::ChannelArgs& channel_args,
249 grpc_endpoint* ep, bool is_client);
250 ~grpc_chttp2_transport();
251
252 grpc_transport base; // must be first
253 grpc_core::RefCount refs;
254 grpc_endpoint* ep;
255 grpc_core::Slice peer_string;
256
257 grpc_core::MemoryOwner memory_owner;
258 const grpc_core::MemoryAllocator::Reservation self_reservation;
259 grpc_core::ReclamationSweep active_reclamation;
260
261 grpc_core::Combiner* combiner;
262
263 grpc_closure* notify_on_receive_settings = nullptr;
264 grpc_closure* notify_on_close = nullptr;
265
266 /// write execution state of the transport
267 grpc_chttp2_write_state write_state = GRPC_CHTTP2_WRITE_STATE_IDLE;
268
269 /// is the transport destroying itself?
270 uint8_t destroying = false;
271 /// has the upper layer closed the transport?
272 grpc_error_handle closed_with_error;
273
274 /// is there a read request to the endpoint outstanding?
275 uint8_t endpoint_reading = 1;
276
277 /// various lists of streams
278 grpc_chttp2_stream_list lists[STREAM_LIST_COUNT] = {};
279
280 /// maps stream id to grpc_chttp2_stream objects
281 grpc_chttp2_stream_map stream_map;
282
283 grpc_closure write_action_begin_locked;
284 grpc_closure write_action;
285 grpc_closure write_action_end_locked;
286
287 grpc_closure read_action_locked;
288
289 /// incoming read bytes
290 grpc_slice_buffer read_buffer;
291
292 /// address to place a newly accepted stream - set and unset by
293 /// grpc_chttp2_parsing_accept_stream; used by init_stream to
294 /// publish the accepted server stream
295 grpc_chttp2_stream** accepting_stream = nullptr;
296
297 // accept stream callback
298 void (*accept_stream_cb)(void* user_data, grpc_transport* transport,
299 const void* server_data);
300 void* accept_stream_cb_user_data;
301
302 /// connectivity tracking
303 grpc_core::ConnectivityStateTracker state_tracker;
304
305 /// data to write now
306 grpc_slice_buffer outbuf;
307 /// hpack encoding
308 grpc_core::HPackCompressor hpack_compressor;
309 /// is this a client?
310 bool is_client;
311
312 /// data to write next write
313 grpc_slice_buffer qbuf;
314
315 /// how much data are we willing to buffer when the WRITE_BUFFER_HINT is set?
316 ///
317 uint32_t write_buffer_size = grpc_core::chttp2::kDefaultWindow;
318
319 /// Set to a grpc_error object if a goaway frame is received. By default, set
320 /// to absl::OkStatus()
321 grpc_error_handle goaway_error;
322
323 grpc_chttp2_sent_goaway_state sent_goaway_state = GRPC_CHTTP2_NO_GOAWAY_SEND;
324
325 /// are the local settings dirty and need to be sent?
326 bool dirtied_local_settings = true;
327 /// have local settings been sent?
328 bool sent_local_settings = false;
329 /// bitmask of setting indexes to send out
330 /// Hack: it's common for implementations to assume 65536 bytes initial send
331 /// window -- this should by rights be 0
332 uint32_t force_send_settings = 1 << GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE;
333 /// settings values
334 uint32_t settings[GRPC_NUM_SETTING_SETS][GRPC_CHTTP2_NUM_SETTINGS];
335
336 /// what is the next stream id to be allocated by this peer?
337 /// copied to next_stream_id in parsing when parsing commences
338 uint32_t next_stream_id = 0;
339
340 /// last new stream id
341 uint32_t last_new_stream_id = 0;
342
343 /// ping queues for various ping insertion points
344 grpc_chttp2_ping_queue ping_queue = grpc_chttp2_ping_queue();
345 grpc_chttp2_repeated_ping_policy ping_policy;
346 grpc_chttp2_repeated_ping_state ping_state;
347 uint64_t ping_ctr = 0; // unique id for pings
348 grpc_closure retry_initiate_ping_locked;
349
350 /// ping acks
351 size_t ping_ack_count = 0;
352 size_t ping_ack_capacity = 0;
353 uint64_t* ping_acks = nullptr;
354 grpc_chttp2_server_ping_recv_state ping_recv_state;
355
356 /// parser for headers
357 grpc_core::HPackParser hpack_parser;
358 /// simple one shot parsers
359 union {
360 grpc_chttp2_window_update_parser window_update;
361 grpc_chttp2_settings_parser settings;
362 grpc_chttp2_ping_parser ping;
363 grpc_chttp2_rst_stream_parser rst_stream;
364 } simple;
365 /// parser for goaway frames
366 grpc_chttp2_goaway_parser goaway_parser;
367
368 grpc_core::chttp2::TransportFlowControl flow_control;
369 /// initial window change. This is tracked as we parse settings frames from
370 /// the remote peer. If there is a positive delta, then we will make all
371 /// streams readable since they may have become unstalled
372 int64_t initial_window_update = 0;
373
374 // deframing
375 grpc_chttp2_deframe_transport_state deframe_state = GRPC_DTS_CLIENT_PREFIX_0;
376 uint8_t incoming_frame_type = 0;
377 uint8_t incoming_frame_flags = 0;
378 uint8_t header_eof = 0;
379 bool is_first_frame = true;
380 uint32_t expect_continuation_stream_id = 0;
381 uint32_t incoming_frame_size = 0;
382 uint32_t incoming_stream_id = 0;
383
384 grpc_chttp2_stream* incoming_stream = nullptr;
385 // active parser
386 struct Parser {
387 const char* name;
388 grpc_error_handle (*parser)(void* parser_user_data,
389 grpc_chttp2_transport* t, grpc_chttp2_stream* s,
390 const grpc_slice& slice, int is_last);
391 void* user_data = nullptr;
392 };
393 Parser parser;
394
395 grpc_chttp2_write_cb* write_cb_pool = nullptr;
396
397 // bdp estimator
398 bool bdp_ping_blocked =
399 false; // Is the BDP blocked due to not receiving any data?
400 grpc_closure next_bdp_ping_timer_expired_locked;
401 grpc_closure start_bdp_ping_locked;
402 grpc_closure finish_bdp_ping_locked;
403
404 // if non-NULL, close the transport with this error when writes are finished
405 grpc_error_handle close_transport_on_writes_finished;
406
407 // a list of closures to run after writes are finished
408 grpc_closure_list run_after_write = GRPC_CLOSURE_LIST_INIT;
409
410 // buffer pool state
411 /// have we scheduled a benign cleanup?
412 bool benign_reclaimer_registered = false;
413 /// have we scheduled a destructive cleanup?
414 bool destructive_reclaimer_registered = false;
415 /// benign cleanup closure
416 grpc_closure benign_reclaimer_locked;
417 /// destructive cleanup closure
418 grpc_closure destructive_reclaimer_locked;
419
420 /// If start_bdp_ping_locked has been called
421 bool bdp_ping_started = false;
422 // True if pings should be acked
423 bool ack_pings = true;
424 // next bdp ping timer handle
425 absl::optional<grpc_event_engine::experimental::EventEngine::TaskHandle>
426 next_bdp_ping_timer_handle;
427
428 // keep-alive ping support
429 /// Closure to initialize a keepalive ping
430 grpc_closure init_keepalive_ping_locked;
431 /// Closure to run when the keepalive ping is sent
432 grpc_closure start_keepalive_ping_locked;
433 /// Closure to run when the keepalive ping ack is received
434 grpc_closure finish_keepalive_ping_locked;
435 /// Closure to run when the keepalive ping timeouts
436 grpc_closure keepalive_watchdog_fired_locked;
437 /// timer to initiate ping events
438 absl::optional<grpc_event_engine::experimental::EventEngine::TaskHandle>
439 keepalive_ping_timer_handle;
440 /// watchdog to kill the transport when waiting for the keepalive ping
441 absl::optional<grpc_event_engine::experimental::EventEngine::TaskHandle>
442 keepalive_watchdog_timer_handle;
443 /// time duration in between pings
444 grpc_core::Duration keepalive_time;
445 /// grace period for a ping to complete before watchdog kicks in
446 grpc_core::Duration keepalive_timeout;
447 /// if keepalive pings are allowed when there's no outstanding streams
448 bool keepalive_permit_without_calls = false;
449 /// If start_keepalive_ping_locked has been called
450 bool keepalive_ping_started = false;
451 /// keep-alive state machine state
452 grpc_chttp2_keepalive_state keepalive_state;
453 // Soft limit on max header size.
454 uint32_t max_header_list_size_soft_limit = 0;
455 grpc_core::ContextList* cl = nullptr;
456 grpc_core::RefCountedPtr<grpc_core::channelz::SocketNode> channelz_socket;
457 uint32_t num_messages_in_next_write = 0;
458 /// The number of pending induced frames (SETTINGS_ACK, PINGS_ACK and
459 /// RST_STREAM) in the outgoing buffer (t->qbuf). If this number goes beyond
460 /// DEFAULT_MAX_PENDING_INDUCED_FRAMES, we pause reading new frames. We would
461 /// only continue reading when we are able to write to the socket again,
462 /// thereby reducing the number of induced frames.
463 uint32_t num_pending_induced_frames = 0;
464 bool reading_paused_on_pending_induced_frames = false;
465 /// Based on channel args, preferred_rx_crypto_frame_sizes are advertised to
466 /// the peer
467 bool enable_preferred_rx_crypto_frame_advertisement = false;
468 /// Set to non zero if closures associated with the transport may be
469 /// covering a write in a pollset. Such closures cannot be scheduled until
470 /// we can prove that the write got scheduled.
471 uint8_t closure_barrier_may_cover_write = CLOSURE_BARRIER_MAY_COVER_WRITE;
472
473 std::shared_ptr<grpc_event_engine::experimental::EventEngine> event_engine;
474 };
475
// How (or whether) a metadata batch has been published for a stream; stored
// per entry in grpc_chttp2_stream::published_metadata.
typedef enum {
  GRPC_METADATA_NOT_PUBLISHED,
  GRPC_METADATA_SYNTHESIZED_FROM_FAKE,
  GRPC_METADATA_PUBLISHED_FROM_WIRE,
  GRPC_METADATA_PUBLISHED_AT_CLOSE
} grpc_published_metadata_method;
482
483 struct grpc_chttp2_stream {
484 grpc_chttp2_stream(grpc_chttp2_transport* t, grpc_stream_refcount* refcount,
485 const void* server_data, grpc_core::Arena* arena);
486 ~grpc_chttp2_stream();
487
488 void* context;
489 grpc_chttp2_transport* t;
490 grpc_stream_refcount* refcount;
491 // Reffer is a 0-len structure, simply reffing `t` and `refcount` in its ctor
492 // before initializing the rest of the stream, to avoid cache misses. This
493 // field MUST be right after `t` and `refcount`.
494 struct Reffer {
495 explicit Reffer(grpc_chttp2_stream* s);
496 } reffer;
497
498 grpc_closure destroy_stream;
499 grpc_closure* destroy_stream_arg;
500
501 grpc_chttp2_stream_link links[STREAM_LIST_COUNT];
502 grpc_core::BitSet<STREAM_LIST_COUNT> included;
503
504 /// HTTP2 stream id for this stream, or zero if one has not been assigned
505 uint32_t id = 0;
506
507 /// things the upper layers would like to send
508 grpc_metadata_batch* send_initial_metadata = nullptr;
509 grpc_closure* send_initial_metadata_finished = nullptr;
510 grpc_metadata_batch* send_trailing_metadata = nullptr;
511 // TODO(yashykt): Find a better name for the below field and others in this
512 // struct to betteer distinguish inputs, return values, and
513 // internal state.
514 // sent_trailing_metadata_op allows the transport to fill in to the upper
515 // layer whether this stream was able to send its trailing metadata (used for
516 // detecting cancellation on the server-side)..
517 bool* sent_trailing_metadata_op = nullptr;
518 grpc_closure* send_trailing_metadata_finished = nullptr;
519
520 int64_t next_message_end_offset;
521 int64_t flow_controlled_bytes_written = 0;
522 int64_t flow_controlled_bytes_flowed = 0;
523 grpc_closure* send_message_finished = nullptr;
524
525 grpc_metadata_batch* recv_initial_metadata;
526 grpc_closure* recv_initial_metadata_ready = nullptr;
527 bool* trailing_metadata_available = nullptr;
528 bool parsed_trailers_only = false;
529 absl::optional<grpc_core::SliceBuffer>* recv_message = nullptr;
530 uint32_t* recv_message_flags = nullptr;
531 bool* call_failed_before_recv_message = nullptr;
532 grpc_closure* recv_message_ready = nullptr;
533 grpc_metadata_batch* recv_trailing_metadata;
534 grpc_closure* recv_trailing_metadata_finished = nullptr;
535
536 grpc_transport_stream_stats* collecting_stats = nullptr;
537 grpc_transport_stream_stats stats = grpc_transport_stream_stats();
538
539 /// Is this stream closed for writing.
540 bool write_closed = false;
541 /// Is this stream reading half-closed.
542 bool read_closed = false;
543 /// Are all published incoming byte streams closed.
544 bool all_incoming_byte_streams_finished = false;
545 /// Has this stream seen an error.
546 /// If true, then pending incoming frames can be thrown away.
547 bool seen_error = false;
548 /// Are we buffering writes on this stream? If yes, we won't become writable
549 /// until there's enough queued up in the flow_controlled_buffer
550 bool write_buffering = false;
551
552 // have we sent or received the EOS bit?
553 bool eos_received = false;
554 bool eos_sent = false;
555
556 /// the error that resulted in this stream being read-closed
557 grpc_error_handle read_closed_error;
558 /// the error that resulted in this stream being write-closed
559 grpc_error_handle write_closed_error;
560
561 grpc_published_metadata_method published_metadata[2] = {};
562 bool final_metadata_requested = false;
563
564 grpc_metadata_batch initial_metadata_buffer;
565 grpc_metadata_batch trailing_metadata_buffer;
566
567 grpc_slice_buffer frame_storage; // protected by t combiner
568 bool received_last_frame = false; // protected by t combiner
569
570 grpc_core::Timestamp deadline = grpc_core::Timestamp::InfFuture();
571
572 /// how many header frames have we received?
573 uint8_t header_frames_received = 0;
574 /// number of bytes received - reset at end of parse thread execution
575 int64_t received_bytes = 0;
576
577 bool sent_initial_metadata = false;
578 bool sent_trailing_metadata = false;
579
580 grpc_core::chttp2::StreamFlowControl flow_control;
581
582 grpc_slice_buffer flow_controlled_buffer;
583
584 grpc_chttp2_write_cb* on_flow_controlled_cbs = nullptr;
585 grpc_chttp2_write_cb* on_write_finished_cbs = nullptr;
586 grpc_chttp2_write_cb* finish_after_write = nullptr;
587 size_t sending_bytes = 0;
588
589 /// Whether the bytes needs to be traced using Fathom
590 bool traced = false;
591 /// Byte counter for number of bytes written
592 size_t byte_counter = 0;
593
594 // time this stream was created
595 gpr_timespec creation_time = gpr_now(GPR_CLOCK_MONOTONIC);
596 };
597
598 /// Transport writing call flow:
599 /// grpc_chttp2_initiate_write() is called anywhere that we know bytes need to
600 /// go out on the wire.
601 /// If no other write has been started, a task is enqueued onto our workqueue.
602 /// When that task executes, it obtains the global lock, and gathers the data
603 /// to write.
604 /// The global lock is dropped and we do the syscall to write.
605 /// After writing, a follow-up check is made to see if another round of writing
606 /// should be performed.
607
608 /// The actual call chain is documented in the implementation of this function.
609 ///
610 void grpc_chttp2_initiate_write(grpc_chttp2_transport* t,
611 grpc_chttp2_initiate_write_reason reason);
612
// Result returned by grpc_chttp2_begin_write().
struct grpc_chttp2_begin_write_result {
  /// are we writing?
  bool writing;
  /// if writing: was it a complete flush (false) or a partial flush (true)
  bool partial;
  /// did we queue any completions as part of beginning the write
  bool early_results_scheduled;
};
621 grpc_chttp2_begin_write_result grpc_chttp2_begin_write(
622 grpc_chttp2_transport* t);
623 void grpc_chttp2_end_write(grpc_chttp2_transport* t, grpc_error_handle error);
624
625 /// Process one slice of incoming data; return 1 if the connection is still
626 /// viable after reading, or 0 if the connection should be torn down
627 grpc_error_handle grpc_chttp2_perform_read(grpc_chttp2_transport* t,
628 const grpc_slice& slice);
629
630 bool grpc_chttp2_list_add_writable_stream(grpc_chttp2_transport* t,
631 grpc_chttp2_stream* s);
632 /// Get a writable stream
633 /// returns non-zero if there was a stream available
634 bool grpc_chttp2_list_pop_writable_stream(grpc_chttp2_transport* t,
635 grpc_chttp2_stream** s);
636 bool grpc_chttp2_list_remove_writable_stream(grpc_chttp2_transport* t,
637 grpc_chttp2_stream* s);
638
639 bool grpc_chttp2_list_add_writing_stream(grpc_chttp2_transport* t,
640 grpc_chttp2_stream* s);
641 bool grpc_chttp2_list_have_writing_streams(grpc_chttp2_transport* t);
642 bool grpc_chttp2_list_pop_writing_stream(grpc_chttp2_transport* t,
643 grpc_chttp2_stream** s);
644
645 void grpc_chttp2_list_add_written_stream(grpc_chttp2_transport* t,
646 grpc_chttp2_stream* s);
647 bool grpc_chttp2_list_pop_written_stream(grpc_chttp2_transport* t,
648 grpc_chttp2_stream** s);
649
650 void grpc_chttp2_list_add_waiting_for_concurrency(grpc_chttp2_transport* t,
651 grpc_chttp2_stream* s);
652 bool grpc_chttp2_list_pop_waiting_for_concurrency(grpc_chttp2_transport* t,
653 grpc_chttp2_stream** s);
654 void grpc_chttp2_list_remove_waiting_for_concurrency(grpc_chttp2_transport* t,
655 grpc_chttp2_stream* s);
656
657 void grpc_chttp2_list_add_stalled_by_transport(grpc_chttp2_transport* t,
658 grpc_chttp2_stream* s);
659 bool grpc_chttp2_list_pop_stalled_by_transport(grpc_chttp2_transport* t,
660 grpc_chttp2_stream** s);
661 void grpc_chttp2_list_remove_stalled_by_transport(grpc_chttp2_transport* t,
662 grpc_chttp2_stream* s);
663
664 void grpc_chttp2_list_add_stalled_by_stream(grpc_chttp2_transport* t,
665 grpc_chttp2_stream* s);
666 bool grpc_chttp2_list_pop_stalled_by_stream(grpc_chttp2_transport* t,
667 grpc_chttp2_stream** s);
668 bool grpc_chttp2_list_remove_stalled_by_stream(grpc_chttp2_transport* t,
669 grpc_chttp2_stream* s);
670
671 //******** Flow Control **************
672
673 // Takes in a flow control action and performs all the needed operations.
674 void grpc_chttp2_act_on_flowctl_action(
675 const grpc_core::chttp2::FlowControlAction& action,
676 grpc_chttp2_transport* t, grpc_chttp2_stream* s);
677
678 //******** End of Flow Control **************
679
grpc_chttp2_parsing_lookup_stream(grpc_chttp2_transport * t,uint32_t id)680 inline grpc_chttp2_stream* grpc_chttp2_parsing_lookup_stream(
681 grpc_chttp2_transport* t, uint32_t id) {
682 return static_cast<grpc_chttp2_stream*>(
683 grpc_chttp2_stream_map_find(&t->stream_map, id));
684 }
685 grpc_chttp2_stream* grpc_chttp2_parsing_accept_stream(grpc_chttp2_transport* t,
686 uint32_t id);
687
688 void grpc_chttp2_add_incoming_goaway(grpc_chttp2_transport* t,
689 uint32_t goaway_error,
690 uint32_t last_stream_id,
691 absl::string_view goaway_text);
692
693 void grpc_chttp2_parsing_become_skip_parser(grpc_chttp2_transport* t);
694
695 void grpc_chttp2_complete_closure_step(grpc_chttp2_transport* t,
696 grpc_chttp2_stream* s,
697 grpc_closure** pclosure,
698 grpc_error_handle error,
699 const char* desc,
700 grpc_core::DebugLocation whence = {});
701
#define GRPC_HEADER_SIZE_IN_BYTES 5
// Largest value representable in size_t.
#define MAX_SIZE_T (~(size_t)0)

// HTTP/2 client connection preface and its length in bytes (excluding the
// terminating NUL).
#define GRPC_CHTTP2_CLIENT_CONNECT_STRING "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"
#define GRPC_CHTTP2_CLIENT_CONNECT_STRLEN \
  (sizeof(GRPC_CHTTP2_CLIENT_CONNECT_STRING) - 1)
708
709 // extern grpc_core::TraceFlag grpc_flowctl_trace;
710
// Evaluates `stmt` only when the grpc_http_trace flag is enabled. Wrapped in
// do { } while (0) so it behaves as a single statement.
#define GRPC_CHTTP2_IF_TRACING(stmt)                \
  do {                                              \
    if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace)) { \
      (stmt);                                       \
    }                                               \
  } while (0)
717
718 void grpc_chttp2_fake_status(grpc_chttp2_transport* t,
719 grpc_chttp2_stream* stream,
720 grpc_error_handle error);
721 void grpc_chttp2_mark_stream_closed(grpc_chttp2_transport* t,
722 grpc_chttp2_stream* s, int close_reads,
723 int close_writes, grpc_error_handle error);
724 void grpc_chttp2_start_writing(grpc_chttp2_transport* t);
725
726 #ifndef NDEBUG
727 #define GRPC_CHTTP2_STREAM_REF(stream, reason) \
728 grpc_chttp2_stream_ref(stream, reason)
729 #define GRPC_CHTTP2_STREAM_UNREF(stream, reason) \
730 grpc_chttp2_stream_unref(stream, reason)
731 void grpc_chttp2_stream_ref(grpc_chttp2_stream* s, const char* reason);
732 void grpc_chttp2_stream_unref(grpc_chttp2_stream* s, const char* reason);
733 #else
734 #define GRPC_CHTTP2_STREAM_REF(stream, reason) grpc_chttp2_stream_ref(stream)
735 #define GRPC_CHTTP2_STREAM_UNREF(stream, reason) \
736 grpc_chttp2_stream_unref(stream)
737 void grpc_chttp2_stream_ref(grpc_chttp2_stream* s);
738 void grpc_chttp2_stream_unref(grpc_chttp2_stream* s);
739 #endif
740
741 #ifndef NDEBUG
742 #define GRPC_CHTTP2_REF_TRANSPORT(t, r) \
743 grpc_chttp2_ref_transport(t, r, __FILE__, __LINE__)
744 #define GRPC_CHTTP2_UNREF_TRANSPORT(t, r) \
745 grpc_chttp2_unref_transport(t, r, __FILE__, __LINE__)
grpc_chttp2_unref_transport(grpc_chttp2_transport * t,const char * reason,const char * file,int line)746 inline void grpc_chttp2_unref_transport(grpc_chttp2_transport* t,
747 const char* reason, const char* file,
748 int line) {
749 if (t->refs.Unref(grpc_core::DebugLocation(file, line), reason)) {
750 delete t;
751 }
752 }
grpc_chttp2_ref_transport(grpc_chttp2_transport * t,const char * reason,const char * file,int line)753 inline void grpc_chttp2_ref_transport(grpc_chttp2_transport* t,
754 const char* reason, const char* file,
755 int line) {
756 t->refs.Ref(grpc_core::DebugLocation(file, line), reason);
757 }
758 #else
759 #define GRPC_CHTTP2_REF_TRANSPORT(t, r) grpc_chttp2_ref_transport(t)
760 #define GRPC_CHTTP2_UNREF_TRANSPORT(t, r) grpc_chttp2_unref_transport(t)
grpc_chttp2_unref_transport(grpc_chttp2_transport * t)761 inline void grpc_chttp2_unref_transport(grpc_chttp2_transport* t) {
762 if (t->refs.Unref()) {
763 delete t;
764 }
765 }
grpc_chttp2_ref_transport(grpc_chttp2_transport * t)766 inline void grpc_chttp2_ref_transport(grpc_chttp2_transport* t) {
767 t->refs.Ref();
768 }
769 #endif
770
771 void grpc_chttp2_ack_ping(grpc_chttp2_transport* t, uint64_t id);
772
773 /// Add a new ping strike to ping_recv_state.ping_strikes. If
774 /// ping_recv_state.ping_strikes > ping_policy.max_ping_strikes, it sends GOAWAY
775 /// with error code ENHANCE_YOUR_CALM and additional debug data resembling
776 /// "too_many_pings" followed by immediately closing the connection.
777 void grpc_chttp2_add_ping_strike(grpc_chttp2_transport* t);
778
779 /// Resets ping clock. Should be called when flushing window updates,
780 /// initial/trailing metadata or data frames. For a server, it resets the number
781 /// of ping strikes and the last_ping_recv_time. For a ping sender, it resets
782 /// pings_before_data_required.
783 void grpc_chttp2_reset_ping_clock(grpc_chttp2_transport* t);
784
785 /// add a ref to the stream and add it to the writable list;
786 /// ref will be dropped in writing.c
787 void grpc_chttp2_mark_stream_writable(grpc_chttp2_transport* t,
788 grpc_chttp2_stream* s);
789
790 void grpc_chttp2_cancel_stream(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
791 grpc_error_handle due_to_error);
792
793 void grpc_chttp2_maybe_complete_recv_initial_metadata(grpc_chttp2_transport* t,
794 grpc_chttp2_stream* s);
795 void grpc_chttp2_maybe_complete_recv_message(grpc_chttp2_transport* t,
796 grpc_chttp2_stream* s);
797 void grpc_chttp2_maybe_complete_recv_trailing_metadata(grpc_chttp2_transport* t,
798 grpc_chttp2_stream* s);
799
800 void grpc_chttp2_fail_pending_writes(grpc_chttp2_transport* t,
801 grpc_chttp2_stream* s,
802 grpc_error_handle error);
803
804 /// Set the default keepalive configurations, must only be called at
805 /// initialization
806 void grpc_chttp2_config_default_keepalive_args(grpc_channel_args* args,
807 bool is_client);
808
809 void grpc_chttp2_retry_initiate_ping(grpc_chttp2_transport* t);
810
811 void schedule_bdp_ping_locked(grpc_chttp2_transport* t);
812
813 uint32_t grpc_chttp2_min_read_progress_size(grpc_chttp2_transport* t);
814
815 #endif // GRPC_SRC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_INTERNAL_H
816