1 //
2 //
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #ifndef GRPC_SRC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_INTERNAL_H
20 #define GRPC_SRC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_INTERNAL_H
21 
22 #include <grpc/support/port_platform.h>
23 
24 #include <stddef.h>
25 #include <stdint.h>
26 
27 #include <memory>
28 
29 #include "absl/strings/string_view.h"
30 #include "absl/types/optional.h"
31 
32 #include <grpc/event_engine/event_engine.h>
33 #include <grpc/event_engine/memory_allocator.h>
34 #include <grpc/grpc.h>
35 #include <grpc/slice.h>
36 #include <grpc/support/time.h>
37 
38 #include "src/core/ext/transport/chttp2/transport/context_list_entry.h"
39 #include "src/core/ext/transport/chttp2/transport/flow_control.h"
40 #include "src/core/ext/transport/chttp2/transport/frame.h"
41 #include "src/core/ext/transport/chttp2/transport/frame_goaway.h"
42 #include "src/core/ext/transport/chttp2/transport/frame_ping.h"
43 #include "src/core/ext/transport/chttp2/transport/frame_rst_stream.h"
44 #include "src/core/ext/transport/chttp2/transport/frame_settings.h"
45 #include "src/core/ext/transport/chttp2/transport/frame_window_update.h"
46 #include "src/core/ext/transport/chttp2/transport/hpack_encoder.h"
47 #include "src/core/ext/transport/chttp2/transport/hpack_parser.h"
48 #include "src/core/ext/transport/chttp2/transport/http2_settings.h"
49 #include "src/core/ext/transport/chttp2/transport/stream_map.h"
50 #include "src/core/lib/channel/channel_args.h"
51 #include "src/core/lib/channel/channelz.h"
52 #include "src/core/lib/debug/trace.h"
53 #include "src/core/lib/gprpp/bitset.h"
54 #include "src/core/lib/gprpp/debug_location.h"
55 #include "src/core/lib/gprpp/ref_counted.h"
56 #include "src/core/lib/gprpp/ref_counted_ptr.h"
57 #include "src/core/lib/gprpp/time.h"
58 #include "src/core/lib/iomgr/closure.h"
59 #include "src/core/lib/iomgr/combiner.h"
60 #include "src/core/lib/iomgr/endpoint.h"
61 #include "src/core/lib/iomgr/error.h"
62 #include "src/core/lib/resource_quota/arena.h"
63 #include "src/core/lib/resource_quota/memory_quota.h"
64 #include "src/core/lib/slice/slice.h"
65 #include "src/core/lib/slice/slice_buffer.h"
66 #include "src/core/lib/surface/init_internally.h"
67 #include "src/core/lib/transport/connectivity_state.h"
68 #include "src/core/lib/transport/metadata_batch.h"
69 #include "src/core/lib/transport/transport.h"
70 #include "src/core/lib/transport/transport_fwd.h"
71 #include "src/core/lib/transport/transport_impl.h"
72 
// Closure-barrier word layout: the low bits hold flags, the high bits hold a
// reference count.
//
// Flag that this closure barrier may be covering a write in a pollset, and so
//   we should not complete this closure until we can prove that the write got
//   scheduled
#define CLOSURE_BARRIER_MAY_COVER_WRITE (1 << 0)
// First bit of the reference count, stored in the high order bits (with the low
//   bits being used for flags defined above); i.e. one reference is worth
//   1 << 16 in the barrier word.
#define CLOSURE_BARRIER_FIRST_REF_BIT (1 << 16)
80 
// A stream may sit on several linked lists at once, one per kind of pending
// work. This enum names those lists and is used to index per-list arrays.
enum grpc_chttp2_stream_list_id {
  // Membership in either of the next two lists carries an explicit ref on the
  // stream.
  GRPC_CHTTP2_LIST_WRITABLE,
  GRPC_CHTTP2_LIST_WRITING,
  // The remaining lists take no ref of their own, so a stream must be
  // unlinked from them when the stream is removed.
  GRPC_CHTTP2_LIST_STALLED_BY_TRANSPORT,
  GRPC_CHTTP2_LIST_STALLED_BY_STREAM,
  /// Streams held back from starting because the connection already has too
  /// many concurrent streams.
  GRPC_CHTTP2_LIST_WAITING_FOR_CONCURRENCY,
  STREAM_LIST_COUNT  // number of lists; must stay last
};
97 
// Write execution state of a transport (see
// grpc_chttp2_transport::write_state).
enum grpc_chttp2_write_state {
  GRPC_CHTTP2_WRITE_STATE_IDLE,
  GRPC_CHTTP2_WRITE_STATE_WRITING,
  GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE,
};
103 
// Selects what the transport should optimize for: latency or throughput.
enum grpc_chttp2_optimization_target {
  GRPC_CHTTP2_OPTIMIZE_FOR_LATENCY,
  GRPC_CHTTP2_OPTIMIZE_FOR_THROUGHPUT,
};
108 
// Index into grpc_chttp2_ping_queue::lists; one closure list per entry.
enum grpc_chttp2_ping_closure_list {
  GRPC_CHTTP2_PCL_INITIATE = 0,
  GRPC_CHTTP2_PCL_NEXT,
  GRPC_CHTTP2_PCL_INFLIGHT,
  GRPC_CHTTP2_PCL_COUNT  // number of closure lists; must stay last
};
115 
// Why a write is being started; passed to grpc_chttp2_initiate_write() and
// convertible to text with grpc_chttp2_initiate_write_reason_string().
enum grpc_chttp2_initiate_write_reason {
  GRPC_CHTTP2_INITIATE_WRITE_INITIAL_WRITE,
  GRPC_CHTTP2_INITIATE_WRITE_START_NEW_STREAM,
  GRPC_CHTTP2_INITIATE_WRITE_SEND_MESSAGE,
  GRPC_CHTTP2_INITIATE_WRITE_SEND_INITIAL_METADATA,
  GRPC_CHTTP2_INITIATE_WRITE_SEND_TRAILING_METADATA,
  GRPC_CHTTP2_INITIATE_WRITE_RETRY_SEND_PING,
  GRPC_CHTTP2_INITIATE_WRITE_CONTINUE_PINGS,
  GRPC_CHTTP2_INITIATE_WRITE_GOAWAY_SENT,
  GRPC_CHTTP2_INITIATE_WRITE_RST_STREAM,
  GRPC_CHTTP2_INITIATE_WRITE_CLOSE_FROM_API,
  GRPC_CHTTP2_INITIATE_WRITE_STREAM_FLOW_CONTROL,
  GRPC_CHTTP2_INITIATE_WRITE_TRANSPORT_FLOW_CONTROL,
  GRPC_CHTTP2_INITIATE_WRITE_SEND_SETTINGS,
  GRPC_CHTTP2_INITIATE_WRITE_SETTINGS_ACK,
  GRPC_CHTTP2_INITIATE_WRITE_FLOW_CONTROL_UNSTALLED_BY_SETTING,
  GRPC_CHTTP2_INITIATE_WRITE_FLOW_CONTROL_UNSTALLED_BY_UPDATE,
  GRPC_CHTTP2_INITIATE_WRITE_APPLICATION_PING,
  GRPC_CHTTP2_INITIATE_WRITE_BDP_PING,
  GRPC_CHTTP2_INITIATE_WRITE_KEEPALIVE_PING,
  GRPC_CHTTP2_INITIATE_WRITE_TRANSPORT_FLOW_CONTROL_UNSTALLED,
  GRPC_CHTTP2_INITIATE_WRITE_PING_RESPONSE,
  GRPC_CHTTP2_INITIATE_WRITE_FORCE_RST_STREAM,
};
140 
// Returns a C string for `reason`.
// NOTE(review): presumably a static, human-readable name intended for
// logging/tracing; ownership and lifetime are not visible from this header.
const char* grpc_chttp2_initiate_write_reason_string(
    grpc_chttp2_initiate_write_reason reason);
143 
// Ping closure bookkeeping: one closure list per
// grpc_chttp2_ping_closure_list entry, all starting out empty.
struct grpc_chttp2_ping_queue {
  // Indexed by grpc_chttp2_ping_closure_list.
  grpc_closure_list lists[GRPC_CHTTP2_PCL_COUNT] = {};
  // NOTE(review): presumably the id of the ping currently in flight — confirm.
  uint64_t inflight_id = 0;
};
// Limits governing repeated pings. No default member initializers are given,
// so whoever creates an instance must fill in every field.
struct grpc_chttp2_repeated_ping_policy {
  int max_pings_without_data;
  // Strike threshold; see grpc_chttp2_add_ping_strike().
  int max_ping_strikes;
  grpc_core::Duration min_recv_ping_interval_without_data;
};
// Mutable state of the ping sender.
struct grpc_chttp2_repeated_ping_state {
  grpc_core::Timestamp last_ping_sent_time;
  // Reset by grpc_chttp2_reset_ping_clock() for a ping sender.
  int pings_before_data_required;
  // Handle of a pending delayed-ping timer task; empty when none is set.
  absl::optional<grpc_event_engine::experimental::EventEngine::TaskHandle>
      delayed_ping_timer_handle;
};
// Server-side record of received pings. Both fields are reset by
// grpc_chttp2_reset_ping_clock(); ping_strikes is incremented by
// grpc_chttp2_add_ping_strike().
struct grpc_chttp2_server_ping_recv_state {
  grpc_core::Timestamp last_ping_recv_time;
  int ping_strikes;
};
// State of the deframer that walks the raw HTTP/2 byte stream of a
// connection. Enumerator order is significant: the frame-header states must
// come directly after the client-prefix states.
enum grpc_chttp2_deframe_transport_state {
  // Client connection prefix: one state per prefix byte (24 in total).
  GRPC_DTS_CLIENT_PREFIX_0 = 0,
  GRPC_DTS_CLIENT_PREFIX_1,
  GRPC_DTS_CLIENT_PREFIX_2,
  GRPC_DTS_CLIENT_PREFIX_3,
  GRPC_DTS_CLIENT_PREFIX_4,
  GRPC_DTS_CLIENT_PREFIX_5,
  GRPC_DTS_CLIENT_PREFIX_6,
  GRPC_DTS_CLIENT_PREFIX_7,
  GRPC_DTS_CLIENT_PREFIX_8,
  GRPC_DTS_CLIENT_PREFIX_9,
  GRPC_DTS_CLIENT_PREFIX_10,
  GRPC_DTS_CLIENT_PREFIX_11,
  GRPC_DTS_CLIENT_PREFIX_12,
  GRPC_DTS_CLIENT_PREFIX_13,
  GRPC_DTS_CLIENT_PREFIX_14,
  GRPC_DTS_CLIENT_PREFIX_15,
  GRPC_DTS_CLIENT_PREFIX_16,
  GRPC_DTS_CLIENT_PREFIX_17,
  GRPC_DTS_CLIENT_PREFIX_18,
  GRPC_DTS_CLIENT_PREFIX_19,
  GRPC_DTS_CLIENT_PREFIX_20,
  GRPC_DTS_CLIENT_PREFIX_21,
  GRPC_DTS_CLIENT_PREFIX_22,
  GRPC_DTS_CLIENT_PREFIX_23,
  // Frame header: one state per header byte, byte 0 through byte 8.
  // Must follow on directly from the prefix states.
  GRPC_DTS_FH_0,
  GRPC_DTS_FH_1,
  GRPC_DTS_FH_2,
  GRPC_DTS_FH_3,
  GRPC_DTS_FH_4,
  GRPC_DTS_FH_5,
  GRPC_DTS_FH_6,
  GRPC_DTS_FH_7,
  GRPC_DTS_FH_8,
  // Inside the body of an HTTP/2 frame.
  GRPC_DTS_FRAME
};
205 
// Head and tail pointers of one stream list; the transport keeps one of these
// per grpc_chttp2_stream_list_id.
struct grpc_chttp2_stream_list {
  grpc_chttp2_stream* head;
  grpc_chttp2_stream* tail;
};
// Next/previous pointers linking a stream into one stream list; each stream
// embeds one of these per grpc_chttp2_stream_list_id.
struct grpc_chttp2_stream_link {
  grpc_chttp2_stream* next;
  grpc_chttp2_stream* prev;
};
// Several copies of the connection-wide settings are tracked side by side;
// this enum is the first index into grpc_chttp2_transport::settings.
enum grpc_chttp2_setting_set {
  // What the peer asked for (and we have acked).
  GRPC_PEER_SETTINGS = 0,
  // What we would like to have.
  GRPC_LOCAL_SETTINGS,
  // What we have published to the peer.
  GRPC_SENT_SETTINGS,
  // What the peer has acked.
  GRPC_ACKED_SETTINGS,
  // Number of setting sets.
  GRPC_NUM_SETTING_SETS
};
226 
// Progress of sending GOAWAY on a transport (see
// grpc_chttp2_transport::sent_goaway_state).
enum grpc_chttp2_sent_goaway_state {
  GRPC_CHTTP2_NO_GOAWAY_SEND,
  GRPC_CHTTP2_GRACEFUL_GOAWAY,
  GRPC_CHTTP2_FINAL_GOAWAY_SEND_SCHEDULED,
  GRPC_CHTTP2_FINAL_GOAWAY_SENT,
};
233 
234 typedef struct grpc_chttp2_write_cb {
235   int64_t call_at_byte;
236   grpc_closure* closure;
237   struct grpc_chttp2_write_cb* next;
238 } grpc_chttp2_write_cb;
239 
// States of the keep-alive state machine (see
// grpc_chttp2_transport::keepalive_state).
enum grpc_chttp2_keepalive_state {
  GRPC_CHTTP2_KEEPALIVE_STATE_WAITING,
  GRPC_CHTTP2_KEEPALIVE_STATE_PINGING,
  GRPC_CHTTP2_KEEPALIVE_STATE_DYING,
  GRPC_CHTTP2_KEEPALIVE_STATE_DISABLED,
};
246 
// Per-connection state of a chttp2 (HTTP/2) transport: the endpoint, write
// and read machinery, settings, ping/keepalive bookkeeping, frame parsers and
// flow control.
//
// Members without a default initializer here (e.g. ping_policy,
// keepalive_state, accept_stream_cb) must be set up by the constructor, which
// is defined outside this header.
// NOTE(review): member order is significant in places (see `base`); check the
// constructor before reordering anything.
struct grpc_chttp2_transport : public grpc_core::KeepsGrpcInitialized {
  grpc_chttp2_transport(const grpc_core::ChannelArgs& channel_args,
                        grpc_endpoint* ep, bool is_client);
  ~grpc_chttp2_transport();

  grpc_transport base;  // must be first
  // Reference count; manipulated through GRPC_CHTTP2_REF_TRANSPORT /
  // GRPC_CHTTP2_UNREF_TRANSPORT, the latter deleting the transport when the
  // last ref is dropped.
  grpc_core::RefCount refs;
  grpc_endpoint* ep;
  grpc_core::Slice peer_string;

  grpc_core::MemoryOwner memory_owner;
  const grpc_core::MemoryAllocator::Reservation self_reservation;
  grpc_core::ReclamationSweep active_reclamation;

  grpc_core::Combiner* combiner;

  grpc_closure* notify_on_receive_settings = nullptr;
  grpc_closure* notify_on_close = nullptr;

  /// write execution state of the transport
  grpc_chttp2_write_state write_state = GRPC_CHTTP2_WRITE_STATE_IDLE;

  /// is the transport destroying itself?
  uint8_t destroying = false;
  /// has the upper layer closed the transport?
  /// NOTE(review): presumably holds the error the transport was closed with
  /// and stays OK while the transport is open — confirm.
  grpc_error_handle closed_with_error;

  /// is there a read request to the endpoint outstanding?
  uint8_t endpoint_reading = 1;

  /// various lists of streams, indexed by grpc_chttp2_stream_list_id
  grpc_chttp2_stream_list lists[STREAM_LIST_COUNT] = {};

  /// maps stream id to grpc_chttp2_stream objects
  grpc_chttp2_stream_map stream_map;

  grpc_closure write_action_begin_locked;
  grpc_closure write_action;
  grpc_closure write_action_end_locked;

  grpc_closure read_action_locked;

  /// incoming read bytes
  grpc_slice_buffer read_buffer;

  /// address to place a newly accepted stream - set and unset by
  /// grpc_chttp2_parsing_accept_stream; used by init_stream to
  /// publish the accepted server stream
  grpc_chttp2_stream** accepting_stream = nullptr;

  // accept stream callback (no default initializer; see struct comment)
  void (*accept_stream_cb)(void* user_data, grpc_transport* transport,
                           const void* server_data);
  void* accept_stream_cb_user_data;

  /// connectivity tracking
  grpc_core::ConnectivityStateTracker state_tracker;

  /// data to write now
  grpc_slice_buffer outbuf;
  /// hpack encoding
  grpc_core::HPackCompressor hpack_compressor;
  /// is this a client?
  bool is_client;

  /// data to write next write
  grpc_slice_buffer qbuf;

  /// how much data are we willing to buffer when the WRITE_BUFFER_HINT is set?
  uint32_t write_buffer_size = grpc_core::chttp2::kDefaultWindow;

  /// Set to a grpc_error object if a goaway frame is received. By default, set
  /// to absl::OkStatus()
  grpc_error_handle goaway_error;

  grpc_chttp2_sent_goaway_state sent_goaway_state = GRPC_CHTTP2_NO_GOAWAY_SEND;

  /// are the local settings dirty and need to be sent?
  bool dirtied_local_settings = true;
  /// have local settings been sent?
  bool sent_local_settings = false;
  /// bitmask of setting indexes to send out
  /// Hack: it's common for implementations to assume 65536 bytes initial send
  /// window -- this should by rights be 0
  uint32_t force_send_settings = 1 << GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE;
  /// settings values, indexed by [grpc_chttp2_setting_set][setting id]
  uint32_t settings[GRPC_NUM_SETTING_SETS][GRPC_CHTTP2_NUM_SETTINGS];

  /// what is the next stream id to be allocated by this peer?
  /// copied to next_stream_id in parsing when parsing commences
  uint32_t next_stream_id = 0;

  /// last new stream id
  uint32_t last_new_stream_id = 0;

  /// ping queues for various ping insertion points
  grpc_chttp2_ping_queue ping_queue = grpc_chttp2_ping_queue();
  grpc_chttp2_repeated_ping_policy ping_policy;
  grpc_chttp2_repeated_ping_state ping_state;
  uint64_t ping_ctr = 0;  // unique id for pings
  grpc_closure retry_initiate_ping_locked;

  /// ping acks: array `ping_acks` with `ping_ack_count` entries in use out of
  /// `ping_ack_capacity` allocated
  size_t ping_ack_count = 0;
  size_t ping_ack_capacity = 0;
  uint64_t* ping_acks = nullptr;
  grpc_chttp2_server_ping_recv_state ping_recv_state;

  /// parser for headers
  grpc_core::HPackParser hpack_parser;
  /// simple one shot parsers (a union: only one is live at a time)
  union {
    grpc_chttp2_window_update_parser window_update;
    grpc_chttp2_settings_parser settings;
    grpc_chttp2_ping_parser ping;
    grpc_chttp2_rst_stream_parser rst_stream;
  } simple;
  /// parser for goaway frames
  grpc_chttp2_goaway_parser goaway_parser;

  grpc_core::chttp2::TransportFlowControl flow_control;
  /// initial window change. This is tracked as we parse settings frames from
  /// the remote peer. If there is a positive delta, then we will make all
  /// streams readable since they may have become unstalled
  int64_t initial_window_update = 0;

  // deframing
  grpc_chttp2_deframe_transport_state deframe_state = GRPC_DTS_CLIENT_PREFIX_0;
  uint8_t incoming_frame_type = 0;
  uint8_t incoming_frame_flags = 0;
  uint8_t header_eof = 0;
  bool is_first_frame = true;
  uint32_t expect_continuation_stream_id = 0;
  uint32_t incoming_frame_size = 0;
  uint32_t incoming_stream_id = 0;

  grpc_chttp2_stream* incoming_stream = nullptr;
  // active parser: a name, the parse function, and the opaque pointer handed
  // back to that function as `parser_user_data`
  struct Parser {
    const char* name;
    grpc_error_handle (*parser)(void* parser_user_data,
                                grpc_chttp2_transport* t, grpc_chttp2_stream* s,
                                const grpc_slice& slice, int is_last);
    void* user_data = nullptr;
  };
  Parser parser;

  grpc_chttp2_write_cb* write_cb_pool = nullptr;

  // bdp estimator
  bool bdp_ping_blocked =
      false;  // Is the BDP blocked due to not receiving any data?
  grpc_closure next_bdp_ping_timer_expired_locked;
  grpc_closure start_bdp_ping_locked;
  grpc_closure finish_bdp_ping_locked;

  // if this holds an error (i.e. is not OK), close the transport with this
  // error when writes are finished
  grpc_error_handle close_transport_on_writes_finished;

  // a list of closures to run after writes are finished
  grpc_closure_list run_after_write = GRPC_CLOSURE_LIST_INIT;

  // buffer pool state
  /// have we scheduled a benign cleanup?
  bool benign_reclaimer_registered = false;
  /// have we scheduled a destructive cleanup?
  bool destructive_reclaimer_registered = false;
  /// benign cleanup closure
  grpc_closure benign_reclaimer_locked;
  /// destructive cleanup closure
  grpc_closure destructive_reclaimer_locked;

  /// If start_bdp_ping_locked has been called
  bool bdp_ping_started = false;
  // True if pings should be acked
  bool ack_pings = true;
  // next bdp ping timer handle
  absl::optional<grpc_event_engine::experimental::EventEngine::TaskHandle>
      next_bdp_ping_timer_handle;

  // keep-alive ping support
  /// Closure to initialize a keepalive ping
  grpc_closure init_keepalive_ping_locked;
  /// Closure to run when the keepalive ping is sent
  grpc_closure start_keepalive_ping_locked;
  /// Closure to run when the keepalive ping ack is received
  grpc_closure finish_keepalive_ping_locked;
  /// Closure to run when the keepalive ping times out
  grpc_closure keepalive_watchdog_fired_locked;
  /// timer to initiate ping events
  absl::optional<grpc_event_engine::experimental::EventEngine::TaskHandle>
      keepalive_ping_timer_handle;
  /// watchdog to kill the transport when waiting for the keepalive ping
  absl::optional<grpc_event_engine::experimental::EventEngine::TaskHandle>
      keepalive_watchdog_timer_handle;
  /// time duration in between pings
  grpc_core::Duration keepalive_time;
  /// grace period for a ping to complete before watchdog kicks in
  grpc_core::Duration keepalive_timeout;
  /// if keepalive pings are allowed when there's no outstanding streams
  bool keepalive_permit_without_calls = false;
  /// If start_keepalive_ping_locked has been called
  bool keepalive_ping_started = false;
  /// keep-alive state machine state (no default initializer; see struct
  /// comment)
  grpc_chttp2_keepalive_state keepalive_state;
  // Soft limit on max header size.
  uint32_t max_header_list_size_soft_limit = 0;
  grpc_core::ContextList* cl = nullptr;
  grpc_core::RefCountedPtr<grpc_core::channelz::SocketNode> channelz_socket;
  uint32_t num_messages_in_next_write = 0;
  /// The number of pending induced frames (SETTINGS_ACK, PINGS_ACK and
  /// RST_STREAM) in the outgoing buffer (t->qbuf). If this number goes beyond
  /// DEFAULT_MAX_PENDING_INDUCED_FRAMES, we pause reading new frames. We would
  /// only continue reading when we are able to write to the socket again,
  /// thereby reducing the number of induced frames.
  uint32_t num_pending_induced_frames = 0;
  bool reading_paused_on_pending_induced_frames = false;
  /// Based on channel args, preferred_rx_crypto_frame_sizes are advertised to
  /// the peer
  bool enable_preferred_rx_crypto_frame_advertisement = false;
  /// Set to non zero if closures associated with the transport may be
  /// covering a write in a pollset. Such closures cannot be scheduled until
  /// we can prove that the write got scheduled.
  uint8_t closure_barrier_may_cover_write = CLOSURE_BARRIER_MAY_COVER_WRITE;

  std::shared_ptr<grpc_event_engine::experimental::EventEngine> event_engine;
};
475 
// Whether, and by which route, a batch of metadata has been published; one
// value is kept per entry of grpc_chttp2_stream::published_metadata.
enum grpc_published_metadata_method {
  GRPC_METADATA_NOT_PUBLISHED,
  GRPC_METADATA_SYNTHESIZED_FROM_FAKE,
  GRPC_METADATA_PUBLISHED_FROM_WIRE,
  GRPC_METADATA_PUBLISHED_AT_CLOSE
};
482 
// Per-stream state of a chttp2 transport: the pending send/recv operations
// handed down by upper layers, open/closed flags, buffered metadata and data,
// flow control, and write-completion callbacks.
//
// Members without a default initializer here (e.g. context,
// next_message_end_offset, recv_initial_metadata, recv_trailing_metadata)
// must be set up by the constructor, which is defined outside this header.
struct grpc_chttp2_stream {
  grpc_chttp2_stream(grpc_chttp2_transport* t, grpc_stream_refcount* refcount,
                     const void* server_data, grpc_core::Arena* arena);
  ~grpc_chttp2_stream();

  void* context;
  grpc_chttp2_transport* t;
  grpc_stream_refcount* refcount;
  // Reffer is a 0-len structure, simply reffing `t` and `refcount` in its ctor
  // before initializing the rest of the stream, to avoid cache misses. This
  // field MUST be right after `t` and `refcount`.
  struct Reffer {
    explicit Reffer(grpc_chttp2_stream* s);
  } reffer;

  grpc_closure destroy_stream;
  grpc_closure* destroy_stream_arg;

  // Per-list link pointers and membership bits, both indexed by
  // grpc_chttp2_stream_list_id.
  grpc_chttp2_stream_link links[STREAM_LIST_COUNT];
  grpc_core::BitSet<STREAM_LIST_COUNT> included;

  /// HTTP2 stream id for this stream, or zero if one has not been assigned
  uint32_t id = 0;

  /// things the upper layers would like to send
  grpc_metadata_batch* send_initial_metadata = nullptr;
  grpc_closure* send_initial_metadata_finished = nullptr;
  grpc_metadata_batch* send_trailing_metadata = nullptr;
  // TODO(yashykt): Find a better name for the below field and others in this
  //                struct to better distinguish inputs, return values, and
  //                internal state.
  // sent_trailing_metadata_op allows the transport to fill in to the upper
  // layer whether this stream was able to send its trailing metadata (used for
  // detecting cancellation on the server-side).
  bool* sent_trailing_metadata_op = nullptr;
  grpc_closure* send_trailing_metadata_finished = nullptr;

  int64_t next_message_end_offset;
  int64_t flow_controlled_bytes_written = 0;
  int64_t flow_controlled_bytes_flowed = 0;
  grpc_closure* send_message_finished = nullptr;

  // Pending receive operations: destinations supplied by the upper layer and
  // the closures to run when each is ready.
  grpc_metadata_batch* recv_initial_metadata;
  grpc_closure* recv_initial_metadata_ready = nullptr;
  bool* trailing_metadata_available = nullptr;
  bool parsed_trailers_only = false;
  absl::optional<grpc_core::SliceBuffer>* recv_message = nullptr;
  uint32_t* recv_message_flags = nullptr;
  bool* call_failed_before_recv_message = nullptr;
  grpc_closure* recv_message_ready = nullptr;
  grpc_metadata_batch* recv_trailing_metadata;
  grpc_closure* recv_trailing_metadata_finished = nullptr;

  grpc_transport_stream_stats* collecting_stats = nullptr;
  grpc_transport_stream_stats stats = grpc_transport_stream_stats();

  /// Is this stream closed for writing.
  bool write_closed = false;
  /// Is this stream reading half-closed.
  bool read_closed = false;
  /// Are all published incoming byte streams closed.
  bool all_incoming_byte_streams_finished = false;
  /// Has this stream seen an error.
  /// If true, then pending incoming frames can be thrown away.
  bool seen_error = false;
  /// Are we buffering writes on this stream? If yes, we won't become writable
  /// until there's enough queued up in the flow_controlled_buffer
  bool write_buffering = false;

  // have we sent or received the EOS bit?
  bool eos_received = false;
  bool eos_sent = false;

  /// the error that resulted in this stream being read-closed
  grpc_error_handle read_closed_error;
  /// the error that resulted in this stream being write-closed
  grpc_error_handle write_closed_error;

  // NOTE(review): presumably index 0 is initial metadata and index 1 is
  // trailing metadata — confirm against the users of this array.
  grpc_published_metadata_method published_metadata[2] = {};
  bool final_metadata_requested = false;

  grpc_metadata_batch initial_metadata_buffer;
  grpc_metadata_batch trailing_metadata_buffer;

  grpc_slice_buffer frame_storage;   // protected by t combiner
  bool received_last_frame = false;  // protected by t combiner

  grpc_core::Timestamp deadline = grpc_core::Timestamp::InfFuture();

  /// how many header frames have we received?
  uint8_t header_frames_received = 0;
  /// number of bytes received - reset at end of parse thread execution
  int64_t received_bytes = 0;

  bool sent_initial_metadata = false;
  bool sent_trailing_metadata = false;

  grpc_core::chttp2::StreamFlowControl flow_control;

  grpc_slice_buffer flow_controlled_buffer;

  // Linked lists of write callbacks (see grpc_chttp2_write_cb).
  grpc_chttp2_write_cb* on_flow_controlled_cbs = nullptr;
  grpc_chttp2_write_cb* on_write_finished_cbs = nullptr;
  grpc_chttp2_write_cb* finish_after_write = nullptr;
  size_t sending_bytes = 0;

  /// Whether the bytes needs to be traced using Fathom
  bool traced = false;
  /// Byte counter for number of bytes written
  size_t byte_counter = 0;

  // time this stream was created (monotonic clock, sampled at construction)
  gpr_timespec creation_time = gpr_now(GPR_CLOCK_MONOTONIC);
};
597 
/// Transport writing call flow:
/// grpc_chttp2_initiate_write() is called anywhere that we know bytes need to
/// go out on the wire.
/// If no other write has been started, a task is enqueued onto our workqueue.
/// When that task executes, it obtains the global lock, and gathers the data
/// to write.
/// The global lock is dropped and we do the syscall to write.
/// After writing, a follow-up check is made to see if another round of writing
/// should be performed.
///
/// The actual call chain is documented in the implementation of this function.
///
/// `reason` records why the write was requested (see
/// grpc_chttp2_initiate_write_reason).
void grpc_chttp2_initiate_write(grpc_chttp2_transport* t,
                                grpc_chttp2_initiate_write_reason reason);
612 
/// Outcome of grpc_chttp2_begin_write().
struct grpc_chttp2_begin_write_result {
  /// are we writing?
  bool writing;
  /// if writing: was it a complete flush (false) or a partial flush (true)
  bool partial;
  /// did we queue any completions as part of beginning the write
  bool early_results_scheduled;
};
/// Begins a write on `t` and reports what was started; see
/// grpc_chttp2_begin_write_result for the meaning of each flag.
grpc_chttp2_begin_write_result grpc_chttp2_begin_write(
    grpc_chttp2_transport* t);
/// Finishes a write on `t`; `error` is the result of the write.
/// NOTE(review): presumably must be paired with a preceding
/// grpc_chttp2_begin_write() that reported `writing` — confirm.
void grpc_chttp2_end_write(grpc_chttp2_transport* t, grpc_error_handle error);
624 
/// Process one slice of incoming data. The result is reported as a
/// grpc_error_handle (not a 1/0 flag, as an earlier version of this comment
/// stated).
/// NOTE(review): presumably an OK result means the connection is still viable
/// after reading and a non-OK result means it should be torn down — confirm
/// against the implementation.
grpc_error_handle grpc_chttp2_perform_read(grpc_chttp2_transport* t,
                                           const grpc_slice& slice);
629 
// Stream list operations. Each group below works on one of the transport's
// stream lists (see grpc_chttp2_stream_list_id). `pop` functions hand the
// stream back through the `s` out-parameter.
// NOTE(review): the meaning of the bool results of the add/remove functions is
// not visible in this header (presumably whether list membership changed) —
// confirm before relying on it.

bool grpc_chttp2_list_add_writable_stream(grpc_chttp2_transport* t,
                                          grpc_chttp2_stream* s);
/// Get a writable stream
/// returns non-zero if there was a stream available
bool grpc_chttp2_list_pop_writable_stream(grpc_chttp2_transport* t,
                                          grpc_chttp2_stream** s);
bool grpc_chttp2_list_remove_writable_stream(grpc_chttp2_transport* t,
                                             grpc_chttp2_stream* s);

bool grpc_chttp2_list_add_writing_stream(grpc_chttp2_transport* t,
                                         grpc_chttp2_stream* s);
bool grpc_chttp2_list_have_writing_streams(grpc_chttp2_transport* t);
bool grpc_chttp2_list_pop_writing_stream(grpc_chttp2_transport* t,
                                         grpc_chttp2_stream** s);

// NOTE(review): grpc_chttp2_stream_list_id has no "written" entry; check
// whether these two declarations still have definitions and callers.
void grpc_chttp2_list_add_written_stream(grpc_chttp2_transport* t,
                                         grpc_chttp2_stream* s);
bool grpc_chttp2_list_pop_written_stream(grpc_chttp2_transport* t,
                                         grpc_chttp2_stream** s);

void grpc_chttp2_list_add_waiting_for_concurrency(grpc_chttp2_transport* t,
                                                  grpc_chttp2_stream* s);
bool grpc_chttp2_list_pop_waiting_for_concurrency(grpc_chttp2_transport* t,
                                                  grpc_chttp2_stream** s);
void grpc_chttp2_list_remove_waiting_for_concurrency(grpc_chttp2_transport* t,
                                                     grpc_chttp2_stream* s);

void grpc_chttp2_list_add_stalled_by_transport(grpc_chttp2_transport* t,
                                               grpc_chttp2_stream* s);
bool grpc_chttp2_list_pop_stalled_by_transport(grpc_chttp2_transport* t,
                                               grpc_chttp2_stream** s);
void grpc_chttp2_list_remove_stalled_by_transport(grpc_chttp2_transport* t,
                                                  grpc_chttp2_stream* s);

void grpc_chttp2_list_add_stalled_by_stream(grpc_chttp2_transport* t,
                                            grpc_chttp2_stream* s);
bool grpc_chttp2_list_pop_stalled_by_stream(grpc_chttp2_transport* t,
                                            grpc_chttp2_stream** s);
bool grpc_chttp2_list_remove_stalled_by_stream(grpc_chttp2_transport* t,
                                               grpc_chttp2_stream* s);
670 
//******** Flow Control **************

// Takes in a flow control action and performs all the needed operations.
// `t` is the transport the action applies to and `s` the stream.
// NOTE(review): presumably `s` may be null for transport-only actions —
// confirm against the callers.
void grpc_chttp2_act_on_flowctl_action(
    const grpc_core::chttp2::FlowControlAction& action,
    grpc_chttp2_transport* t, grpc_chttp2_stream* s);

//******** End of Flow Control **************
679 
grpc_chttp2_parsing_lookup_stream(grpc_chttp2_transport * t,uint32_t id)680 inline grpc_chttp2_stream* grpc_chttp2_parsing_lookup_stream(
681     grpc_chttp2_transport* t, uint32_t id) {
682   return static_cast<grpc_chttp2_stream*>(
683       grpc_chttp2_stream_map_find(&t->stream_map, id));
684 }
// Accepts a new incoming stream with HTTP/2 stream id `id`. While it runs it
// sets and then unsets grpc_chttp2_transport::accepting_stream so that stream
// initialization can publish the accepted server stream.
grpc_chttp2_stream* grpc_chttp2_parsing_accept_stream(grpc_chttp2_transport* t,
                                                      uint32_t id);

// Records a GOAWAY received from the peer, with its error code, last stream
// id, and debug text.
void grpc_chttp2_add_incoming_goaway(grpc_chttp2_transport* t,
                                     uint32_t goaway_error,
                                     uint32_t last_stream_id,
                                     absl::string_view goaway_text);

// NOTE(review): presumably switches the transport's active parser to one that
// discards the remainder of the current frame — confirm.
void grpc_chttp2_parsing_become_skip_parser(grpc_chttp2_transport* t);

// Completes one step of the closure pointed to by `pclosure`, with `error` as
// the result of that step. `desc` is a description of the step and `whence`
// defaults to an empty debug location.
// NOTE(review): see the CLOSURE_BARRIER_* macros for the barrier bookkeeping
// this presumably operates on; whether `*pclosure` is cleared is not visible
// here.
void grpc_chttp2_complete_closure_step(grpc_chttp2_transport* t,
                                       grpc_chttp2_stream* s,
                                       grpc_closure** pclosure,
                                       grpc_error_handle error,
                                       const char* desc,
                                       grpc_core::DebugLocation whence = {});
701 
// NOTE(review): presumably the size of the gRPC message header that precedes
// each message on the wire — confirm against the framing code.
#define GRPC_HEADER_SIZE_IN_BYTES 5
// Largest value representable in size_t (all bits set).
#define MAX_SIZE_T (~(size_t)0)

// The HTTP/2 client connection prefix, and its length in bytes excluding the
// terminating NUL (24). The deframer has one GRPC_DTS_CLIENT_PREFIX_* state
// per byte of this string.
#define GRPC_CHTTP2_CLIENT_CONNECT_STRING "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"
#define GRPC_CHTTP2_CLIENT_CONNECT_STRLEN \
  (sizeof(GRPC_CHTTP2_CLIENT_CONNECT_STRING) - 1)
708 
// extern grpc_core::TraceFlag grpc_flowctl_trace;

// Evaluates `stmt` only when the grpc_http_trace flag is enabled. The
// do/while(0) wrapper makes the macro usable as a single statement. Note that
// `stmt` is not evaluated at all when tracing is off, so it must not carry
// side effects the caller depends on.
#define GRPC_CHTTP2_IF_TRACING(stmt)                \
  do {                                              \
    if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace)) { \
      (stmt);                                       \
    }                                               \
  } while (0)
717 
// NOTE(review): presumably synthesizes status metadata for `stream` from
// `error` when none arrived from the wire (cf.
// GRPC_METADATA_SYNTHESIZED_FROM_FAKE) — confirm.
void grpc_chttp2_fake_status(grpc_chttp2_transport* t,
                             grpc_chttp2_stream* stream,
                             grpc_error_handle error);
// Marks stream `s` closed for reading and/or writing. `close_reads` and
// `close_writes` are int flags selecting which direction(s) to close; `error`
// is the reason for the close.
void grpc_chttp2_mark_stream_closed(grpc_chttp2_transport* t,
                                    grpc_chttp2_stream* s, int close_reads,
                                    int close_writes, grpc_error_handle error);
void grpc_chttp2_start_writing(grpc_chttp2_transport* t);
725 
// Stream ref-counting. Always go through the GRPC_CHTTP2_STREAM_REF /
// GRPC_CHTTP2_STREAM_UNREF macros: in debug builds (NDEBUG not defined) the
// underlying functions take a `reason` string, while in release builds the
// macros drop `reason` and the functions take only the stream.
#ifndef NDEBUG
#define GRPC_CHTTP2_STREAM_REF(stream, reason) \
  grpc_chttp2_stream_ref(stream, reason)
#define GRPC_CHTTP2_STREAM_UNREF(stream, reason) \
  grpc_chttp2_stream_unref(stream, reason)
void grpc_chttp2_stream_ref(grpc_chttp2_stream* s, const char* reason);
void grpc_chttp2_stream_unref(grpc_chttp2_stream* s, const char* reason);
#else
#define GRPC_CHTTP2_STREAM_REF(stream, reason) grpc_chttp2_stream_ref(stream)
#define GRPC_CHTTP2_STREAM_UNREF(stream, reason) \
  grpc_chttp2_stream_unref(stream)
void grpc_chttp2_stream_ref(grpc_chttp2_stream* s);
void grpc_chttp2_stream_unref(grpc_chttp2_stream* s);
#endif
740 
741 #ifndef NDEBUG
742 #define GRPC_CHTTP2_REF_TRANSPORT(t, r) \
743   grpc_chttp2_ref_transport(t, r, __FILE__, __LINE__)
744 #define GRPC_CHTTP2_UNREF_TRANSPORT(t, r) \
745   grpc_chttp2_unref_transport(t, r, __FILE__, __LINE__)
grpc_chttp2_unref_transport(grpc_chttp2_transport * t,const char * reason,const char * file,int line)746 inline void grpc_chttp2_unref_transport(grpc_chttp2_transport* t,
747                                         const char* reason, const char* file,
748                                         int line) {
749   if (t->refs.Unref(grpc_core::DebugLocation(file, line), reason)) {
750     delete t;
751   }
752 }
grpc_chttp2_ref_transport(grpc_chttp2_transport * t,const char * reason,const char * file,int line)753 inline void grpc_chttp2_ref_transport(grpc_chttp2_transport* t,
754                                       const char* reason, const char* file,
755                                       int line) {
756   t->refs.Ref(grpc_core::DebugLocation(file, line), reason);
757 }
758 #else
759 #define GRPC_CHTTP2_REF_TRANSPORT(t, r) grpc_chttp2_ref_transport(t)
760 #define GRPC_CHTTP2_UNREF_TRANSPORT(t, r) grpc_chttp2_unref_transport(t)
grpc_chttp2_unref_transport(grpc_chttp2_transport * t)761 inline void grpc_chttp2_unref_transport(grpc_chttp2_transport* t) {
762   if (t->refs.Unref()) {
763     delete t;
764   }
765 }
grpc_chttp2_ref_transport(grpc_chttp2_transport * t)766 inline void grpc_chttp2_ref_transport(grpc_chttp2_transport* t) {
767   t->refs.Ref();
768 }
769 #endif
770 
// Handles the ack for the ping identified by `id`.
// NOTE(review): presumably called when a PING ack frame arrives from the
// peer — confirm.
void grpc_chttp2_ack_ping(grpc_chttp2_transport* t, uint64_t id);

/// Add a new ping strike to ping_recv_state.ping_strikes. If
/// ping_recv_state.ping_strikes > ping_policy.max_ping_strikes, it sends GOAWAY
/// with error code ENHANCE_YOUR_CALM and additional debug data resembling
/// "too_many_pings" followed by immediately closing the connection.
void grpc_chttp2_add_ping_strike(grpc_chttp2_transport* t);

/// Resets ping clock. Should be called when flushing window updates,
/// initial/trailing metadata or data frames. For a server, it resets the number
/// of ping strikes and the last_ping_recv_time. For a ping sender, it resets
/// pings_before_data_required.
void grpc_chttp2_reset_ping_clock(grpc_chttp2_transport* t);

/// add a ref to the stream and add it to the writable list;
/// ref will be dropped in writing.c
void grpc_chttp2_mark_stream_writable(grpc_chttp2_transport* t,
                                      grpc_chttp2_stream* s);

/// Cancels stream `s`; `due_to_error` is the reason for the cancellation.
void grpc_chttp2_cancel_stream(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
                               grpc_error_handle due_to_error);

// The three "maybe complete" functions below correspond to the stream's
// recv_initial_metadata_ready, recv_message_ready and
// recv_trailing_metadata_finished closures respectively.
// NOTE(review): presumably each completes its pending receive operation only
// once the corresponding data is available — confirm.
void grpc_chttp2_maybe_complete_recv_initial_metadata(grpc_chttp2_transport* t,
                                                      grpc_chttp2_stream* s);
void grpc_chttp2_maybe_complete_recv_message(grpc_chttp2_transport* t,
                                             grpc_chttp2_stream* s);
void grpc_chttp2_maybe_complete_recv_trailing_metadata(grpc_chttp2_transport* t,
                                                       grpc_chttp2_stream* s);

/// Fails the pending writes of stream `s` with `error`.
void grpc_chttp2_fail_pending_writes(grpc_chttp2_transport* t,
                                     grpc_chttp2_stream* s,
                                     grpc_error_handle error);

/// Set the default keepalive configurations, must only be called at
/// initialization
void grpc_chttp2_config_default_keepalive_args(grpc_channel_args* args,
                                               bool is_client);

void grpc_chttp2_retry_initiate_ping(grpc_chttp2_transport* t);

// NOTE(review): unlike every other function declared in this header, this
// name lacks the grpc_chttp2_ prefix although it has external linkage.
void schedule_bdp_ping_locked(grpc_chttp2_transport* t);

// NOTE(review): presumably the minimum number of bytes a read must deliver
// before the deframer can make progress — confirm.
uint32_t grpc_chttp2_min_read_progress_size(grpc_chttp2_transport* t);
814 
815 #endif  // GRPC_SRC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_INTERNAL_H
816