1 //
2 // Copyright 2018 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //     http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16 
17 #include <grpc/support/port_platform.h>
18 
19 #include "src/core/ext/transport/chttp2/transport/chttp2_transport.h"
20 
21 #include <inttypes.h>
22 #include <limits.h>
23 #include <stdio.h>
24 #include <string.h>
25 
26 #include <algorithm>
27 #include <initializer_list>
28 #include <memory>
29 #include <new>
30 #include <string>
31 #include <utility>
32 #include <vector>
33 
34 #include "absl/base/attributes.h"
35 #include "absl/status/status.h"
36 #include "absl/strings/cord.h"
37 #include "absl/strings/str_cat.h"
38 #include "absl/strings/str_format.h"
39 #include "absl/strings/string_view.h"
40 #include "absl/types/optional.h"
41 
42 #include <grpc/event_engine/event_engine.h>
43 #include <grpc/grpc.h>
44 #include <grpc/impl/connectivity_state.h>
45 #include <grpc/slice_buffer.h>
46 #include <grpc/status.h>
47 #include <grpc/support/alloc.h>
48 #include <grpc/support/log.h>
49 #include <grpc/support/time.h>
50 
51 #include "src/core/ext/transport/chttp2/transport/context_list_entry.h"
52 #include "src/core/ext/transport/chttp2/transport/flow_control.h"
53 #include "src/core/ext/transport/chttp2/transport/frame.h"
54 #include "src/core/ext/transport/chttp2/transport/frame_data.h"
55 #include "src/core/ext/transport/chttp2/transport/frame_goaway.h"
56 #include "src/core/ext/transport/chttp2/transport/frame_rst_stream.h"
57 #include "src/core/ext/transport/chttp2/transport/hpack_encoder.h"
58 #include "src/core/ext/transport/chttp2/transport/http2_settings.h"
59 #include "src/core/ext/transport/chttp2/transport/http_trace.h"
60 #include "src/core/ext/transport/chttp2/transport/internal.h"
61 #include "src/core/ext/transport/chttp2/transport/stream_map.h"
62 #include "src/core/ext/transport/chttp2/transport/varint.h"
63 #include "src/core/lib/channel/call_tracer.h"
64 #include "src/core/lib/channel/channel_args.h"
65 #include "src/core/lib/channel/context.h"
66 #include "src/core/lib/debug/stats.h"
67 #include "src/core/lib/debug/stats_data.h"
68 #include "src/core/lib/experiments/experiments.h"
69 #include "src/core/lib/gpr/useful.h"
70 #include "src/core/lib/gprpp/bitset.h"
71 #include "src/core/lib/gprpp/crash.h"
72 #include "src/core/lib/gprpp/debug_location.h"
73 #include "src/core/lib/gprpp/ref_counted.h"
74 #include "src/core/lib/gprpp/status_helper.h"
75 #include "src/core/lib/gprpp/time.h"
76 #include "src/core/lib/http/parser.h"
77 #include "src/core/lib/iomgr/combiner.h"
78 #include "src/core/lib/iomgr/error.h"
79 #include "src/core/lib/iomgr/exec_ctx.h"
80 #include "src/core/lib/iomgr/iomgr_fwd.h"
81 #include "src/core/lib/iomgr/port.h"
82 #include "src/core/lib/promise/poll.h"
83 #include "src/core/lib/resource_quota/arena.h"
84 #include "src/core/lib/resource_quota/memory_quota.h"
85 #include "src/core/lib/resource_quota/resource_quota.h"
86 #include "src/core/lib/resource_quota/trace.h"
87 #include "src/core/lib/slice/slice.h"
88 #include "src/core/lib/slice/slice_buffer.h"
89 #include "src/core/lib/slice/slice_internal.h"
90 #include "src/core/lib/transport/bdp_estimator.h"
91 #include "src/core/lib/transport/connectivity_state.h"
92 #include "src/core/lib/transport/error_utils.h"
93 #include "src/core/lib/transport/http2_errors.h"
94 #include "src/core/lib/transport/metadata_batch.h"
95 #include "src/core/lib/transport/status_conversion.h"
96 #include "src/core/lib/transport/transport.h"
97 #include "src/core/lib/transport/transport_impl.h"
98 
99 #ifdef GRPC_POSIX_SOCKET_TCP
100 #include "src/core/lib/iomgr/ev_posix.h"
101 #endif
102 
103 #define DEFAULT_CONNECTION_WINDOW_TARGET (1024 * 1024)
104 #define MAX_WINDOW 0x7fffffffu
105 #define MAX_WRITE_BUFFER_SIZE (64 * 1024 * 1024)
106 #define DEFAULT_MAX_HEADER_LIST_SIZE (16 * 1024)
107 #define DEFAULT_MAX_HEADER_LIST_SIZE_SOFT_LIMIT (8 * 1024)
108 
109 #define DEFAULT_CLIENT_KEEPALIVE_TIME_MS INT_MAX
110 #define DEFAULT_CLIENT_KEEPALIVE_TIMEOUT_MS 20000  // 20 seconds
111 #define DEFAULT_SERVER_KEEPALIVE_TIME_MS 7200000   // 2 hours
112 #define DEFAULT_SERVER_KEEPALIVE_TIMEOUT_MS 20000  // 20 seconds
113 #define DEFAULT_KEEPALIVE_PERMIT_WITHOUT_CALLS false
114 #define KEEPALIVE_TIME_BACKOFF_MULTIPLIER 2
115 
116 #define DEFAULT_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS 300000  // 5 minutes
117 #define DEFAULT_MAX_PINGS_BETWEEN_DATA 2
118 #define DEFAULT_MAX_PING_STRIKES 2
119 
120 #define DEFAULT_MAX_PENDING_INDUCED_FRAMES 10000
121 
122 static int g_default_client_keepalive_time_ms =
123     DEFAULT_CLIENT_KEEPALIVE_TIME_MS;
124 static int g_default_client_keepalive_timeout_ms =
125     DEFAULT_CLIENT_KEEPALIVE_TIMEOUT_MS;
126 static int g_default_server_keepalive_time_ms =
127     DEFAULT_SERVER_KEEPALIVE_TIME_MS;
128 static int g_default_server_keepalive_timeout_ms =
129     DEFAULT_SERVER_KEEPALIVE_TIMEOUT_MS;
130 static bool g_default_client_keepalive_permit_without_calls =
131     DEFAULT_KEEPALIVE_PERMIT_WITHOUT_CALLS;
132 static bool g_default_server_keepalive_permit_without_calls =
133     DEFAULT_KEEPALIVE_PERMIT_WITHOUT_CALLS;
134 
135 static int g_default_min_recv_ping_interval_without_data_ms =
136     DEFAULT_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS;
137 static int g_default_max_pings_without_data = DEFAULT_MAX_PINGS_BETWEEN_DATA;
138 static int g_default_max_ping_strikes = DEFAULT_MAX_PING_STRIKES;
139 
140 #define MAX_CLIENT_STREAM_ID 0x7fffffffu
141 grpc_core::TraceFlag grpc_keepalive_trace(false, "http_keepalive");
142 grpc_core::DebugOnlyTraceFlag grpc_trace_chttp2_refcount(false,
143                                                          "chttp2_refcount");
144 
145 // forward declarations of various callbacks that we'll build closures around
146 static void write_action_begin_locked(void* t, grpc_error_handle error);
147 static void write_action(void* t, grpc_error_handle error);
148 static void write_action_end(void* t, grpc_error_handle error);
149 static void write_action_end_locked(void* t, grpc_error_handle error);
150 
151 static void read_action(void* t, grpc_error_handle error);
152 static void read_action_locked(void* t, grpc_error_handle error);
153 static void continue_read_action_locked(grpc_chttp2_transport* t);
154 
155 // Set a transport level setting, and push it to our peer
156 static void queue_setting_update(grpc_chttp2_transport* t,
157                                  grpc_chttp2_setting_id id, uint32_t value);
158 
159 static void close_from_api(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
160                            grpc_error_handle error);
161 
162 // Start new streams that have been created if we can
163 static void maybe_start_some_streams(grpc_chttp2_transport* t);
164 
165 static void connectivity_state_set(grpc_chttp2_transport* t,
166                                    grpc_connectivity_state state,
167                                    const absl::Status& status,
168                                    const char* reason);
169 
170 static void benign_reclaimer_locked(void* arg, grpc_error_handle error);
171 static void destructive_reclaimer_locked(void* arg, grpc_error_handle error);
172 
173 static void post_benign_reclaimer(grpc_chttp2_transport* t);
174 static void post_destructive_reclaimer(grpc_chttp2_transport* t);
175 
176 static void close_transport_locked(grpc_chttp2_transport* t,
177                                    grpc_error_handle error);
178 static void end_all_the_calls(grpc_chttp2_transport* t,
179                               grpc_error_handle error);
180 
181 static void start_bdp_ping(void* tp, grpc_error_handle error);
182 static void finish_bdp_ping(void* tp, grpc_error_handle error);
183 static void start_bdp_ping_locked(void* tp, grpc_error_handle error);
184 static void finish_bdp_ping_locked(void* tp, grpc_error_handle error);
185 static void next_bdp_ping_timer_expired(grpc_chttp2_transport* t);
186 static void next_bdp_ping_timer_expired_locked(
187     void* tp, GRPC_UNUSED grpc_error_handle error);
188 
189 static void cancel_pings(grpc_chttp2_transport* t, grpc_error_handle error);
190 static void send_ping_locked(grpc_chttp2_transport* t,
191                              grpc_closure* on_initiate, grpc_closure* on_ack);
192 static void retry_initiate_ping_locked(void* tp,
193                                        GRPC_UNUSED grpc_error_handle error);
194 
195 // keepalive-relevant functions
196 static void init_keepalive_ping(grpc_chttp2_transport* t);
197 static void init_keepalive_ping_locked(void* arg,
198                                        GRPC_UNUSED grpc_error_handle error);
199 static void start_keepalive_ping(void* arg, grpc_error_handle error);
200 static void finish_keepalive_ping(void* arg, grpc_error_handle error);
201 static void start_keepalive_ping_locked(void* arg, grpc_error_handle error);
202 static void finish_keepalive_ping_locked(void* arg, grpc_error_handle error);
203 static void keepalive_watchdog_fired(grpc_chttp2_transport* t);
204 static void keepalive_watchdog_fired_locked(
205     void* arg, GRPC_UNUSED grpc_error_handle error);
206 static void maybe_reset_keepalive_ping_timer_locked(grpc_chttp2_transport* t);
207 
208 namespace {
CallTracerIfEnabled(grpc_chttp2_stream * s)209 grpc_core::CallTracerInterface* CallTracerIfEnabled(grpc_chttp2_stream* s) {
210   if (s->context == nullptr || !grpc_core::IsTraceRecordCallopsEnabled()) {
211     return nullptr;
212   }
213   return static_cast<grpc_core::CallTracerInterface*>(
214       static_cast<grpc_call_context_element*>(
215           s->context)[GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE]
216           .value);
217 }
218 
219 grpc_core::WriteTimestampsCallback g_write_timestamps_callback = nullptr;
220 grpc_core::CopyContextFn g_get_copied_context_fn = nullptr;
221 }  // namespace
222 
223 namespace grpc_core {
224 
225 namespace {
226 TestOnlyGlobalHttp2TransportInitCallback test_only_init_callback = nullptr;
227 TestOnlyGlobalHttp2TransportDestructCallback test_only_destruct_callback =
228     nullptr;
229 bool test_only_disable_transient_failure_state_notification = false;
230 }  // namespace
231 
TestOnlySetGlobalHttp2TransportInitCallback(TestOnlyGlobalHttp2TransportInitCallback callback)232 void TestOnlySetGlobalHttp2TransportInitCallback(
233     TestOnlyGlobalHttp2TransportInitCallback callback) {
234   test_only_init_callback = callback;
235 }
236 
TestOnlySetGlobalHttp2TransportDestructCallback(TestOnlyGlobalHttp2TransportDestructCallback callback)237 void TestOnlySetGlobalHttp2TransportDestructCallback(
238     TestOnlyGlobalHttp2TransportDestructCallback callback) {
239   test_only_destruct_callback = callback;
240 }
241 
TestOnlyGlobalHttp2TransportDisableTransientFailureStateNotification(bool disable)242 void TestOnlyGlobalHttp2TransportDisableTransientFailureStateNotification(
243     bool disable) {
244   test_only_disable_transient_failure_state_notification = disable;
245 }
246 
GrpcHttp2SetWriteTimestampsCallback(WriteTimestampsCallback fn)247 void GrpcHttp2SetWriteTimestampsCallback(WriteTimestampsCallback fn) {
248   g_write_timestamps_callback = fn;
249 }
250 
GrpcHttp2SetCopyContextFn(CopyContextFn fn)251 void GrpcHttp2SetCopyContextFn(CopyContextFn fn) {
252   g_get_copied_context_fn = fn;
253 }
254 
GrpcHttp2GetWriteTimestampsCallback()255 WriteTimestampsCallback GrpcHttp2GetWriteTimestampsCallback() {
256   return g_write_timestamps_callback;
257 }
258 
GrpcHttp2GetCopyContextFn()259 CopyContextFn GrpcHttp2GetCopyContextFn() { return g_get_copied_context_fn; }
260 
261 // For each entry in the passed ContextList, it executes the function set using
262 // GrpcHttp2SetWriteTimestampsCallback method with each context in the list
263 // and \a ts. It also deletes/frees up the passed ContextList after this
264 // operation.
ForEachContextListEntryExecute(void * arg,Timestamps * ts,grpc_error_handle error)265 void ForEachContextListEntryExecute(void* arg, Timestamps* ts,
266                                     grpc_error_handle error) {
267   ContextList* context_list = reinterpret_cast<ContextList*>(arg);
268   if (!context_list) {
269     return;
270   }
271   for (auto it = context_list->begin(); it != context_list->end(); it++) {
272     ContextListEntry& entry = (*it);
273     if (ts) {
274       ts->byte_offset = static_cast<uint32_t>(entry.ByteOffsetInStream());
275     }
276     g_write_timestamps_callback(entry.TraceContext(), ts, error);
277   }
278   delete context_list;
279 }
280 
281 }  // namespace grpc_core
282 
283 //
284 // CONSTRUCTION/DESTRUCTION/REFCOUNTING
285 //
286 
~grpc_chttp2_transport()287 grpc_chttp2_transport::~grpc_chttp2_transport() {
288   size_t i;
289 
290   event_engine.reset();
291 
292   if (channelz_socket != nullptr) {
293     channelz_socket.reset();
294   }
295 
296   grpc_endpoint_destroy(ep);
297 
298   grpc_slice_buffer_destroy(&qbuf);
299 
300   grpc_slice_buffer_destroy(&outbuf);
301 
302   grpc_error_handle error = GRPC_ERROR_CREATE("Transport destroyed");
303   // ContextList::Execute follows semantics of a callback function and does not
304   // take a ref on error
305   if (cl != nullptr) {
306     grpc_core::ForEachContextListEntryExecute(cl, nullptr, error);
307   }
308   cl = nullptr;
309 
310   grpc_slice_buffer_destroy(&read_buffer);
311   grpc_chttp2_goaway_parser_destroy(&goaway_parser);
312 
313   for (i = 0; i < STREAM_LIST_COUNT; i++) {
314     GPR_ASSERT(lists[i].head == nullptr);
315     GPR_ASSERT(lists[i].tail == nullptr);
316   }
317 
318   GPR_ASSERT(grpc_chttp2_stream_map_size(&stream_map) == 0);
319 
320   grpc_chttp2_stream_map_destroy(&stream_map);
321 
322   GRPC_COMBINER_UNREF(combiner, "chttp2_transport");
323 
324   cancel_pings(this, GRPC_ERROR_CREATE("Transport destroyed"));
325 
326   while (write_cb_pool) {
327     grpc_chttp2_write_cb* next = write_cb_pool->next;
328     gpr_free(write_cb_pool);
329     write_cb_pool = next;
330   }
331 
332   gpr_free(ping_acks);
333   if (grpc_core::test_only_destruct_callback != nullptr) {
334     grpc_core::test_only_destruct_callback();
335   }
336 }
337 
338 static const grpc_transport_vtable* get_vtable(void);
339 
read_channel_args(grpc_chttp2_transport * t,const grpc_core::ChannelArgs & channel_args,bool is_client)340 static void read_channel_args(grpc_chttp2_transport* t,
341                               const grpc_core::ChannelArgs& channel_args,
342                               bool is_client) {
343   const int initial_sequence_number =
344       channel_args.GetInt(GRPC_ARG_HTTP2_INITIAL_SEQUENCE_NUMBER).value_or(-1);
345   if (initial_sequence_number > 0) {
346     if ((t->next_stream_id & 1) != (initial_sequence_number & 1)) {
347       gpr_log(GPR_ERROR, "%s: low bit must be %d on %s",
348               GRPC_ARG_HTTP2_INITIAL_SEQUENCE_NUMBER, t->next_stream_id & 1,
349               is_client ? "client" : "server");
350     } else {
351       t->next_stream_id = static_cast<uint32_t>(initial_sequence_number);
352     }
353   }
354 
355   const int max_hpack_table_size =
356       channel_args.GetInt(GRPC_ARG_HTTP2_HPACK_TABLE_SIZE_ENCODER).value_or(-1);
357   if (max_hpack_table_size >= 0) {
358     t->hpack_compressor.SetMaxUsableSize(max_hpack_table_size);
359   }
360 
361   t->ping_policy.max_pings_without_data =
362       std::max(0, channel_args.GetInt(GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA)
363                       .value_or(g_default_max_pings_without_data));
364   t->ping_policy.max_ping_strikes =
365       std::max(0, channel_args.GetInt(GRPC_ARG_HTTP2_MAX_PING_STRIKES)
366                       .value_or(g_default_max_ping_strikes));
367   t->ping_policy.min_recv_ping_interval_without_data =
368       std::max(grpc_core::Duration::Zero(),
369                channel_args
370                    .GetDurationFromIntMillis(
371                        GRPC_ARG_HTTP2_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS)
372                    .value_or(grpc_core::Duration::Milliseconds(
373                        g_default_min_recv_ping_interval_without_data_ms)));
374   t->write_buffer_size =
375       std::max(0, channel_args.GetInt(GRPC_ARG_HTTP2_WRITE_BUFFER_SIZE)
376                       .value_or(grpc_core::chttp2::kDefaultWindow));
377   t->keepalive_time =
378       std::max(grpc_core::Duration::Milliseconds(1),
379                channel_args.GetDurationFromIntMillis(GRPC_ARG_KEEPALIVE_TIME_MS)
380                    .value_or(grpc_core::Duration::Milliseconds(
381                        t->is_client ? g_default_client_keepalive_time_ms
382                                     : g_default_server_keepalive_time_ms)));
383   t->keepalive_timeout = std::max(
384       grpc_core::Duration::Zero(),
385       channel_args.GetDurationFromIntMillis(GRPC_ARG_KEEPALIVE_TIMEOUT_MS)
386           .value_or(grpc_core::Duration::Milliseconds(
387               t->is_client ? g_default_client_keepalive_timeout_ms
388                            : g_default_server_keepalive_timeout_ms)));
389   t->keepalive_permit_without_calls =
390       channel_args.GetBool(GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS)
391           .value_or(false);
392   // Only send the prefered rx frame size http2 setting if we are instructed
393   // to auto size the buffers allocated at tcp level and we also can adjust
394   // sending frame size.
395   t->enable_preferred_rx_crypto_frame_advertisement =
396       channel_args
397           .GetBool(GRPC_ARG_EXPERIMENTAL_HTTP2_PREFERRED_CRYPTO_FRAME_SIZE)
398           .value_or(false);
399 
400   if (channel_args.GetBool(GRPC_ARG_ENABLE_CHANNELZ)
401           .value_or(GRPC_ENABLE_CHANNELZ_DEFAULT)) {
402     t->channelz_socket =
403         grpc_core::MakeRefCounted<grpc_core::channelz::SocketNode>(
404             std::string(grpc_endpoint_get_local_address(t->ep)),
405             std::string(t->peer_string.as_string_view()),
406             absl::StrCat(get_vtable()->name, " ",
407                          t->peer_string.as_string_view()),
408             channel_args
409                 .GetObjectRef<grpc_core::channelz::SocketNode::Security>());
410   }
411 
412   t->ack_pings = channel_args.GetBool("grpc.http2.ack_pings").value_or(true);
413 
414   const int soft_limit =
415       channel_args.GetInt(GRPC_ARG_MAX_METADATA_SIZE).value_or(-1);
416   if (soft_limit < 0) {
417     // Set soft limit to 0.8 * hard limit if this is larger than
418     // `DEFAULT_MAX_HEADER_LIST_SIZE_SOFT_LIMIT` and
419     // `GRPC_ARG_MAX_METADATA_SIZE` is not set.
420     t->max_header_list_size_soft_limit = std::max(
421         DEFAULT_MAX_HEADER_LIST_SIZE_SOFT_LIMIT,
422         static_cast<int>(
423             0.8 * channel_args.GetInt(GRPC_ARG_ABSOLUTE_MAX_METADATA_SIZE)
424                       .value_or(-1)));
425   } else {
426     t->max_header_list_size_soft_limit = soft_limit;
427   }
428 
429   static const struct {
430     absl::string_view channel_arg_name;
431     grpc_chttp2_setting_id setting_id;
432     int default_value;
433     int min;
434     int max;
435     bool availability[2] /* server, client */;
436   } settings_map[] = {{GRPC_ARG_MAX_CONCURRENT_STREAMS,
437                        GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS,
438                        -1,
439                        0,
440                        INT32_MAX,
441                        {true, false}},
442                       {GRPC_ARG_HTTP2_HPACK_TABLE_SIZE_DECODER,
443                        GRPC_CHTTP2_SETTINGS_HEADER_TABLE_SIZE,
444                        -1,
445                        0,
446                        INT32_MAX,
447                        {true, true}},
448                       {GRPC_ARG_ABSOLUTE_MAX_METADATA_SIZE,
449                        GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE,
450                        -1,
451                        0,
452                        INT32_MAX,
453                        {true, true}},
454                       {GRPC_ARG_HTTP2_MAX_FRAME_SIZE,
455                        GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE,
456                        -1,
457                        16384,
458                        16777215,
459                        {true, true}},
460                       {GRPC_ARG_HTTP2_ENABLE_TRUE_BINARY,
461                        GRPC_CHTTP2_SETTINGS_GRPC_ALLOW_TRUE_BINARY_METADATA,
462                        1,
463                        0,
464                        1,
465                        {true, true}},
466                       {GRPC_ARG_HTTP2_STREAM_LOOKAHEAD_BYTES,
467                        GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE,
468                        -1,
469                        5,
470                        INT32_MAX,
471                        {true, true}}};
472 
473   for (size_t i = 0; i < GPR_ARRAY_SIZE(settings_map); i++) {
474     const auto& setting = settings_map[i];
475     if (setting.availability[is_client]) {
476       const int value = channel_args.GetInt(setting.channel_arg_name)
477                             .value_or(setting.default_value);
478       if (value >= 0) {
479         queue_setting_update(t, setting.setting_id,
480                              grpc_core::Clamp(value, setting.min, setting.max));
481       } else if (setting.setting_id ==
482                  GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE) {
483         // Set value to 1.25 * soft limit if this is larger than
484         // `DEFAULT_MAX_HEADER_LIST_SIZE` and
485         // `GRPC_ARG_ABSOLUTE_MAX_METADATA_SIZE` is not set.
486         const int soft_limit = channel_args.GetInt(GRPC_ARG_MAX_METADATA_SIZE)
487                                    .value_or(setting.default_value);
488         const int value = (soft_limit >= 0 && soft_limit < (INT_MAX / 1.25))
489                               ? static_cast<int>(soft_limit * 1.25)
490                               : soft_limit;
491         if (value > DEFAULT_MAX_HEADER_LIST_SIZE) {
492           queue_setting_update(
493               t, setting.setting_id,
494               grpc_core::Clamp(value, setting.min, setting.max));
495         }
496       }
497     } else if (channel_args.Contains(setting.channel_arg_name)) {
498       gpr_log(GPR_DEBUG, "%s is not available on %s",
499               std::string(setting.channel_arg_name).c_str(),
500               is_client ? "clients" : "servers");
501     }
502   }
503 
504   if (t->enable_preferred_rx_crypto_frame_advertisement) {
505     const grpc_chttp2_setting_parameters* sp =
506         &grpc_chttp2_settings_parameters
507             [GRPC_CHTTP2_SETTINGS_GRPC_PREFERRED_RECEIVE_CRYPTO_FRAME_SIZE];
508     queue_setting_update(
509         t, GRPC_CHTTP2_SETTINGS_GRPC_PREFERRED_RECEIVE_CRYPTO_FRAME_SIZE,
510         grpc_core::Clamp(INT_MAX, static_cast<int>(sp->min_value),
511                          static_cast<int>(sp->max_value)));
512   }
513 }
514 
init_transport_keepalive_settings(grpc_chttp2_transport * t)515 static void init_transport_keepalive_settings(grpc_chttp2_transport* t) {
516   if (t->is_client) {
517     t->keepalive_time = g_default_client_keepalive_time_ms == INT_MAX
518                             ? grpc_core::Duration::Infinity()
519                             : grpc_core::Duration::Milliseconds(
520                                   g_default_client_keepalive_time_ms);
521     t->keepalive_timeout = g_default_client_keepalive_timeout_ms == INT_MAX
522                                ? grpc_core::Duration::Infinity()
523                                : grpc_core::Duration::Milliseconds(
524                                      g_default_client_keepalive_timeout_ms);
525     t->keepalive_permit_without_calls =
526         g_default_client_keepalive_permit_without_calls;
527   } else {
528     t->keepalive_time = g_default_server_keepalive_time_ms == INT_MAX
529                             ? grpc_core::Duration::Infinity()
530                             : grpc_core::Duration::Milliseconds(
531                                   g_default_server_keepalive_time_ms);
532     t->keepalive_timeout = g_default_server_keepalive_timeout_ms == INT_MAX
533                                ? grpc_core::Duration::Infinity()
534                                : grpc_core::Duration::Milliseconds(
535                                      g_default_server_keepalive_timeout_ms);
536     t->keepalive_permit_without_calls =
537         g_default_server_keepalive_permit_without_calls;
538   }
539 }
540 
configure_transport_ping_policy(grpc_chttp2_transport * t)541 static void configure_transport_ping_policy(grpc_chttp2_transport* t) {
542   t->ping_policy.max_pings_without_data = g_default_max_pings_without_data;
543   t->ping_policy.max_ping_strikes = g_default_max_ping_strikes;
544   t->ping_policy.min_recv_ping_interval_without_data =
545       grpc_core::Duration::Milliseconds(
546           g_default_min_recv_ping_interval_without_data_ms);
547 }
548 
init_keepalive_pings_if_enabled_locked(void * arg,GRPC_UNUSED grpc_error_handle error)549 static void init_keepalive_pings_if_enabled_locked(
550     void* arg, GRPC_UNUSED grpc_error_handle error) {
551   GPR_DEBUG_ASSERT(error.ok());
552   grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(arg);
553   if (t->keepalive_time != grpc_core::Duration::Infinity()) {
554     t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_WAITING;
555     GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping");
556     t->keepalive_ping_timer_handle =
557         t->event_engine->RunAfter(t->keepalive_time, [t] {
558           grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
559           grpc_core::ExecCtx exec_ctx;
560           init_keepalive_ping(t);
561         });
562   } else {
563     // Use GRPC_CHTTP2_KEEPALIVE_STATE_DISABLED to indicate there are no
564     // inflight keepalive timers
565     t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_DISABLED;
566   }
567 }
568 
grpc_chttp2_transport(const grpc_core::ChannelArgs & channel_args,grpc_endpoint * ep,bool is_client)569 grpc_chttp2_transport::grpc_chttp2_transport(
570     const grpc_core::ChannelArgs& channel_args, grpc_endpoint* ep,
571     bool is_client)
572     : refs(1, GRPC_TRACE_FLAG_ENABLED(grpc_trace_chttp2_refcount)
573                   ? "chttp2_refcount"
574                   : nullptr),
575       ep(ep),
576       peer_string(
577           grpc_core::Slice::FromCopiedString(grpc_endpoint_get_peer(ep))),
578       memory_owner(channel_args.GetObject<grpc_core::ResourceQuota>()
579                        ->memory_quota()
580                        ->CreateMemoryOwner(absl::StrCat(
581                            grpc_endpoint_get_peer(ep), ":client_transport"))),
582       self_reservation(
583           memory_owner.MakeReservation(sizeof(grpc_chttp2_transport))),
584       combiner(grpc_combiner_create()),
585       state_tracker(is_client ? "client_transport" : "server_transport",
586                     GRPC_CHANNEL_READY),
587       is_client(is_client),
588       next_stream_id(is_client ? 1 : 2),
589       flow_control(
590           peer_string.as_string_view(),
591           channel_args.GetBool(GRPC_ARG_HTTP2_BDP_PROBE).value_or(true),
592           &memory_owner),
593       deframe_state(is_client ? GRPC_DTS_FH_0 : GRPC_DTS_CLIENT_PREFIX_0),
594       event_engine(
595           channel_args
596               .GetObjectRef<grpc_event_engine::experimental::EventEngine>()) {
597   cl = new grpc_core::ContextList();
598   GPR_ASSERT(strlen(GRPC_CHTTP2_CLIENT_CONNECT_STRING) ==
599              GRPC_CHTTP2_CLIENT_CONNECT_STRLEN);
600   base.vtable = get_vtable();
601   // 8 is a random stab in the dark as to a good initial size: it's small enough
602   //   that it shouldn't waste memory for infrequently used connections, yet
603   //   large enough that the exponential growth should happen nicely when it's
604   //   needed.
605   //   TODO(ctiller): tune this
606   grpc_chttp2_stream_map_init(&stream_map, 8);
607 
608   grpc_slice_buffer_init(&read_buffer);
609   grpc_slice_buffer_init(&outbuf);
610   if (is_client) {
611     grpc_slice_buffer_add(&outbuf, grpc_slice_from_copied_string(
612                                        GRPC_CHTTP2_CLIENT_CONNECT_STRING));
613   }
614   grpc_slice_buffer_init(&qbuf);
615   // copy in initial settings to all setting sets
616   size_t i;
617   int j;
618   for (i = 0; i < GRPC_CHTTP2_NUM_SETTINGS; i++) {
619     for (j = 0; j < GRPC_NUM_SETTING_SETS; j++) {
620       settings[j][i] = grpc_chttp2_settings_parameters[i].default_value;
621     }
622   }
623   grpc_chttp2_goaway_parser_init(&goaway_parser);
624 
625   // configure http2 the way we like it
626   if (is_client) {
627     queue_setting_update(this, GRPC_CHTTP2_SETTINGS_ENABLE_PUSH, 0);
628     queue_setting_update(this, GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS, 0);
629   }
630   queue_setting_update(this, GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE,
631                        DEFAULT_MAX_HEADER_LIST_SIZE);
632   queue_setting_update(this,
633                        GRPC_CHTTP2_SETTINGS_GRPC_ALLOW_TRUE_BINARY_METADATA, 1);
634 
635   configure_transport_ping_policy(this);
636   init_transport_keepalive_settings(this);
637 
638   read_channel_args(this, channel_args, is_client);
639 
640   // No pings allowed before receiving a header or data frame.
641   ping_state.pings_before_data_required = 0;
642   ping_state.last_ping_sent_time = grpc_core::Timestamp::InfPast();
643 
644   ping_recv_state.last_ping_recv_time = grpc_core::Timestamp::InfPast();
645   ping_recv_state.ping_strikes = 0;
646 
647   grpc_core::ExecCtx exec_ctx;
648   combiner->Run(
649       GRPC_CLOSURE_INIT(&init_keepalive_ping_locked,
650                         init_keepalive_pings_if_enabled_locked, this, nullptr),
651       absl::OkStatus());
652 
653   if (flow_control.bdp_probe()) {
654     bdp_ping_blocked = true;
655     grpc_chttp2_act_on_flowctl_action(flow_control.PeriodicUpdate(), this,
656                                       nullptr);
657   }
658 
659   grpc_chttp2_initiate_write(this, GRPC_CHTTP2_INITIATE_WRITE_INITIAL_WRITE);
660   post_benign_reclaimer(this);
661   if (grpc_core::test_only_init_callback != nullptr) {
662     grpc_core::test_only_init_callback();
663   }
664 
665 #ifdef GRPC_POSIX_SOCKET_TCP
666   closure_barrier_may_cover_write =
667       grpc_event_engine_run_in_background() &&
668               grpc_core::IsScheduleCancellationOverWriteEnabled()
669           ? 0
670           : CLOSURE_BARRIER_MAY_COVER_WRITE;
671 #endif
672 }
673 
destroy_transport_locked(void * tp,grpc_error_handle)674 static void destroy_transport_locked(void* tp, grpc_error_handle /*error*/) {
675   grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp);
676   t->destroying = 1;
677   close_transport_locked(
678       t, grpc_error_set_int(GRPC_ERROR_CREATE("Transport destroyed"),
679                             grpc_core::StatusIntProperty::kOccurredDuringWrite,
680                             t->write_state));
681   t->memory_owner.Reset();
682   // Must be the last line.
683   GRPC_CHTTP2_UNREF_TRANSPORT(t, "destroy");
684 }
685 
destroy_transport(grpc_transport * gt)686 static void destroy_transport(grpc_transport* gt) {
687   grpc_chttp2_transport* t = reinterpret_cast<grpc_chttp2_transport*>(gt);
688   t->combiner->Run(GRPC_CLOSURE_CREATE(destroy_transport_locked, t, nullptr),
689                    absl::OkStatus());
690 }
691 
close_transport_locked(grpc_chttp2_transport * t,grpc_error_handle error)692 static void close_transport_locked(grpc_chttp2_transport* t,
693                                    grpc_error_handle error) {
694   end_all_the_calls(t, error);
695   cancel_pings(t, error);
696   if (t->closed_with_error.ok()) {
697     if (!grpc_error_has_clear_grpc_status(error)) {
698       error =
699           grpc_error_set_int(error, grpc_core::StatusIntProperty::kRpcStatus,
700                              GRPC_STATUS_UNAVAILABLE);
701     }
702     if (t->write_state != GRPC_CHTTP2_WRITE_STATE_IDLE) {
703       if (t->close_transport_on_writes_finished.ok()) {
704         t->close_transport_on_writes_finished =
705             GRPC_ERROR_CREATE("Delayed close due to in-progress write");
706       }
707       t->close_transport_on_writes_finished =
708           grpc_error_add_child(t->close_transport_on_writes_finished, error);
709       return;
710     }
711     GPR_ASSERT(!error.ok());
712     t->closed_with_error = error;
713     connectivity_state_set(t, GRPC_CHANNEL_SHUTDOWN, absl::Status(),
714                            "close_transport");
715     if (t->ping_state.delayed_ping_timer_handle.has_value()) {
716       if (t->event_engine->Cancel(*t->ping_state.delayed_ping_timer_handle)) {
717         GRPC_CHTTP2_UNREF_TRANSPORT(t, "retry_initiate_ping_locked");
718         t->ping_state.delayed_ping_timer_handle.reset();
719       }
720     }
721     if (t->next_bdp_ping_timer_handle.has_value()) {
722       if (t->event_engine->Cancel(*t->next_bdp_ping_timer_handle)) {
723         GRPC_CHTTP2_UNREF_TRANSPORT(t, "bdp_ping");
724         t->next_bdp_ping_timer_handle.reset();
725       }
726     }
727     switch (t->keepalive_state) {
728       case GRPC_CHTTP2_KEEPALIVE_STATE_WAITING:
729         if (t->keepalive_ping_timer_handle.has_value()) {
730           if (t->event_engine->Cancel(*t->keepalive_ping_timer_handle)) {
731             GRPC_CHTTP2_UNREF_TRANSPORT(t, "init keepalive ping");
732             t->keepalive_ping_timer_handle.reset();
733           }
734         }
735         break;
736       case GRPC_CHTTP2_KEEPALIVE_STATE_PINGING:
737         if (t->keepalive_ping_timer_handle.has_value()) {
738           if (t->event_engine->Cancel(*t->keepalive_ping_timer_handle)) {
739             GRPC_CHTTP2_UNREF_TRANSPORT(t, "init keepalive ping");
740             t->keepalive_ping_timer_handle.reset();
741           }
742         }
743         if (t->keepalive_watchdog_timer_handle.has_value()) {
744           if (t->event_engine->Cancel(*t->keepalive_watchdog_timer_handle)) {
745             GRPC_CHTTP2_UNREF_TRANSPORT(t, "keepalive watchdog");
746             t->keepalive_watchdog_timer_handle.reset();
747           }
748         }
749         break;
750       case GRPC_CHTTP2_KEEPALIVE_STATE_DYING:
751       case GRPC_CHTTP2_KEEPALIVE_STATE_DISABLED:
752         // keepalive timers are not set in these two states
753         break;
754     }
755 
756     // flush writable stream list to avoid dangling references
757     grpc_chttp2_stream* s;
758     while (grpc_chttp2_list_pop_writable_stream(t, &s)) {
759       GRPC_CHTTP2_STREAM_UNREF(s, "chttp2_writing:close");
760     }
761     GPR_ASSERT(t->write_state == GRPC_CHTTP2_WRITE_STATE_IDLE);
762     grpc_endpoint_shutdown(t->ep, error);
763   }
764   if (t->notify_on_receive_settings != nullptr) {
765     grpc_core::ExecCtx::Run(DEBUG_LOCATION, t->notify_on_receive_settings,
766                             error);
767     t->notify_on_receive_settings = nullptr;
768   }
769   if (t->notify_on_close != nullptr) {
770     grpc_core::ExecCtx::Run(DEBUG_LOCATION, t->notify_on_close, error);
771     t->notify_on_close = nullptr;
772   }
773 }
774 
775 #ifndef NDEBUG
grpc_chttp2_stream_ref(grpc_chttp2_stream * s,const char * reason)776 void grpc_chttp2_stream_ref(grpc_chttp2_stream* s, const char* reason) {
777   grpc_stream_ref(s->refcount, reason);
778 }
grpc_chttp2_stream_unref(grpc_chttp2_stream * s,const char * reason)779 void grpc_chttp2_stream_unref(grpc_chttp2_stream* s, const char* reason) {
780   grpc_stream_unref(s->refcount, reason);
781 }
782 #else
grpc_chttp2_stream_ref(grpc_chttp2_stream * s)783 void grpc_chttp2_stream_ref(grpc_chttp2_stream* s) {
784   grpc_stream_ref(s->refcount);
785 }
grpc_chttp2_stream_unref(grpc_chttp2_stream * s)786 void grpc_chttp2_stream_unref(grpc_chttp2_stream* s) {
787   grpc_stream_unref(s->refcount);
788 }
789 #endif
790 
Reffer(grpc_chttp2_stream * s)791 grpc_chttp2_stream::Reffer::Reffer(grpc_chttp2_stream* s) {
792   // We reserve one 'active stream' that's dropped when the stream is
793   //   read-closed. The others are for Chttp2IncomingByteStreams that are
794   //   actively reading
795   GRPC_CHTTP2_STREAM_REF(s, "chttp2");
796   GRPC_CHTTP2_REF_TRANSPORT(s->t, "stream");
797 }
798 
grpc_chttp2_stream(grpc_chttp2_transport * t,grpc_stream_refcount * refcount,const void * server_data,grpc_core::Arena * arena)799 grpc_chttp2_stream::grpc_chttp2_stream(grpc_chttp2_transport* t,
800                                        grpc_stream_refcount* refcount,
801                                        const void* server_data,
802                                        grpc_core::Arena* arena)
803     : t(t),
804       refcount(refcount),
805       reffer(this),
806       initial_metadata_buffer(arena),
807       trailing_metadata_buffer(arena),
808       flow_control(&t->flow_control) {
809   if (server_data) {
810     id = static_cast<uint32_t>(reinterpret_cast<uintptr_t>(server_data));
811     if (grpc_http_trace.enabled()) {
812       gpr_log(GPR_DEBUG, "HTTP:%p/%p creating accept stream %d [from %p]", t,
813               this, id, server_data);
814     }
815     *t->accepting_stream = this;
816     grpc_chttp2_stream_map_add(&t->stream_map, id, this);
817     post_destructive_reclaimer(t);
818   }
819 
820   grpc_slice_buffer_init(&frame_storage);
821   grpc_slice_buffer_init(&flow_controlled_buffer);
822 }
823 
~grpc_chttp2_stream()824 grpc_chttp2_stream::~grpc_chttp2_stream() {
825   grpc_chttp2_list_remove_stalled_by_stream(t, this);
826   grpc_chttp2_list_remove_stalled_by_transport(t, this);
827 
828   if (t->channelz_socket != nullptr) {
829     if ((t->is_client && eos_received) || (!t->is_client && eos_sent)) {
830       t->channelz_socket->RecordStreamSucceeded();
831     } else {
832       t->channelz_socket->RecordStreamFailed();
833     }
834   }
835 
836   GPR_ASSERT((write_closed && read_closed) || id == 0);
837   if (id != 0) {
838     GPR_ASSERT(grpc_chttp2_stream_map_find(&t->stream_map, id) == nullptr);
839   }
840 
841   grpc_slice_buffer_destroy(&frame_storage);
842 
843   for (int i = 0; i < STREAM_LIST_COUNT; i++) {
844     if (GPR_UNLIKELY(included.is_set(i))) {
845       grpc_core::Crash(absl::StrFormat("%s stream %d still included in list %d",
846                                        t->is_client ? "client" : "server", id,
847                                        i));
848     }
849   }
850 
851   GPR_ASSERT(send_initial_metadata_finished == nullptr);
852   GPR_ASSERT(send_trailing_metadata_finished == nullptr);
853   GPR_ASSERT(recv_initial_metadata_ready == nullptr);
854   GPR_ASSERT(recv_message_ready == nullptr);
855   GPR_ASSERT(recv_trailing_metadata_finished == nullptr);
856   grpc_slice_buffer_destroy(&flow_controlled_buffer);
857   GRPC_CHTTP2_UNREF_TRANSPORT(t, "stream");
858   grpc_core::ExecCtx::Run(DEBUG_LOCATION, destroy_stream_arg, absl::OkStatus());
859 }
860 
init_stream(grpc_transport * gt,grpc_stream * gs,grpc_stream_refcount * refcount,const void * server_data,grpc_core::Arena * arena)861 static int init_stream(grpc_transport* gt, grpc_stream* gs,
862                        grpc_stream_refcount* refcount, const void* server_data,
863                        grpc_core::Arena* arena) {
864   grpc_chttp2_transport* t = reinterpret_cast<grpc_chttp2_transport*>(gt);
865   new (gs) grpc_chttp2_stream(t, refcount, server_data, arena);
866   return 0;
867 }
868 
destroy_stream_locked(void * sp,grpc_error_handle)869 static void destroy_stream_locked(void* sp, grpc_error_handle /*error*/) {
870   grpc_chttp2_stream* s = static_cast<grpc_chttp2_stream*>(sp);
871   s->~grpc_chttp2_stream();
872 }
873 
destroy_stream(grpc_transport * gt,grpc_stream * gs,grpc_closure * then_schedule_closure)874 static void destroy_stream(grpc_transport* gt, grpc_stream* gs,
875                            grpc_closure* then_schedule_closure) {
876   grpc_chttp2_transport* t = reinterpret_cast<grpc_chttp2_transport*>(gt);
877   grpc_chttp2_stream* s = reinterpret_cast<grpc_chttp2_stream*>(gs);
878 
879   s->destroy_stream_arg = then_schedule_closure;
880   t->combiner->Run(
881       GRPC_CLOSURE_INIT(&s->destroy_stream, destroy_stream_locked, s, nullptr),
882       absl::OkStatus());
883 }
884 
grpc_chttp2_parsing_accept_stream(grpc_chttp2_transport * t,uint32_t id)885 grpc_chttp2_stream* grpc_chttp2_parsing_accept_stream(grpc_chttp2_transport* t,
886                                                       uint32_t id) {
887   if (t->accept_stream_cb == nullptr) {
888     return nullptr;
889   }
890   grpc_chttp2_stream* accepting = nullptr;
891   GPR_ASSERT(t->accepting_stream == nullptr);
892   t->accepting_stream = &accepting;
893   t->accept_stream_cb(t->accept_stream_cb_user_data, &t->base,
894                       reinterpret_cast<void*>(id));
895   t->accepting_stream = nullptr;
896   return accepting;
897 }
898 
899 //
900 // OUTPUT PROCESSING
901 //
902 
write_state_name(grpc_chttp2_write_state st)903 static const char* write_state_name(grpc_chttp2_write_state st) {
904   switch (st) {
905     case GRPC_CHTTP2_WRITE_STATE_IDLE:
906       return "IDLE";
907     case GRPC_CHTTP2_WRITE_STATE_WRITING:
908       return "WRITING";
909     case GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE:
910       return "WRITING+MORE";
911   }
912   GPR_UNREACHABLE_CODE(return "UNKNOWN");
913 }
914 
set_write_state(grpc_chttp2_transport * t,grpc_chttp2_write_state st,const char * reason)915 static void set_write_state(grpc_chttp2_transport* t,
916                             grpc_chttp2_write_state st, const char* reason) {
917   GRPC_CHTTP2_IF_TRACING(
918       gpr_log(GPR_INFO, "W:%p %s [%s] state %s -> %s [%s]", t,
919               t->is_client ? "CLIENT" : "SERVER",
920               std::string(t->peer_string.as_string_view()).c_str(),
921               write_state_name(t->write_state), write_state_name(st), reason));
922   t->write_state = st;
923   // If the state is being reset back to idle, it means a write was just
924   // finished. Make sure all the run_after_write closures are scheduled.
925   //
926   // This is also our chance to close the transport if the transport was marked
927   // to be closed after all writes finish (for example, if we received a go-away
928   // from peer while we had some pending writes)
929   if (st == GRPC_CHTTP2_WRITE_STATE_IDLE) {
930     grpc_core::ExecCtx::RunList(DEBUG_LOCATION, &t->run_after_write);
931     if (!t->close_transport_on_writes_finished.ok()) {
932       grpc_error_handle err = t->close_transport_on_writes_finished;
933       t->close_transport_on_writes_finished = absl::OkStatus();
934       close_transport_locked(t, err);
935     }
936   }
937 }
938 
grpc_chttp2_initiate_write(grpc_chttp2_transport * t,grpc_chttp2_initiate_write_reason reason)939 void grpc_chttp2_initiate_write(grpc_chttp2_transport* t,
940                                 grpc_chttp2_initiate_write_reason reason) {
941   switch (t->write_state) {
942     case GRPC_CHTTP2_WRITE_STATE_IDLE:
943       set_write_state(t, GRPC_CHTTP2_WRITE_STATE_WRITING,
944                       grpc_chttp2_initiate_write_reason_string(reason));
945       GRPC_CHTTP2_REF_TRANSPORT(t, "writing");
946       // Note that the 'write_action_begin_locked' closure is being scheduled
947       // on the 'finally_scheduler' of t->combiner. This means that
948       // 'write_action_begin_locked' is called only *after* all the other
949       // closures (some of which are potentially initiating more writes on the
950       // transport) are executed on the t->combiner.
951       //
952       // The reason for scheduling on finally_scheduler is to make sure we batch
953       // as many writes as possible. 'write_action_begin_locked' is the function
954       // that gathers all the relevant bytes (which are at various places in the
955       // grpc_chttp2_transport structure) and append them to 'outbuf' field in
956       // grpc_chttp2_transport thereby batching what would have been potentially
957       // multiple write operations.
958       //
959       // Also, 'write_action_begin_locked' only gathers the bytes into outbuf.
960       // It does not call the endpoint to write the bytes. That is done by the
961       // 'write_action' (which is scheduled by 'write_action_begin_locked')
962       t->combiner->FinallyRun(
963           GRPC_CLOSURE_INIT(&t->write_action_begin_locked,
964                             write_action_begin_locked, t, nullptr),
965           absl::OkStatus());
966       break;
967     case GRPC_CHTTP2_WRITE_STATE_WRITING:
968       set_write_state(t, GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE,
969                       grpc_chttp2_initiate_write_reason_string(reason));
970       break;
971     case GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE:
972       break;
973   }
974 }
975 
grpc_chttp2_mark_stream_writable(grpc_chttp2_transport * t,grpc_chttp2_stream * s)976 void grpc_chttp2_mark_stream_writable(grpc_chttp2_transport* t,
977                                       grpc_chttp2_stream* s) {
978   if (t->closed_with_error.ok() && grpc_chttp2_list_add_writable_stream(t, s)) {
979     GRPC_CHTTP2_STREAM_REF(s, "chttp2_writing:become");
980   }
981 }
982 
begin_writing_desc(bool partial)983 static const char* begin_writing_desc(bool partial) {
984   if (partial) {
985     return "begin partial write in background";
986   } else {
987     return "begin write in current thread";
988   }
989 }
990 
write_action_begin_locked(void * gt,grpc_error_handle)991 static void write_action_begin_locked(void* gt,
992                                       grpc_error_handle /*error_ignored*/) {
993   grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(gt);
994   GPR_ASSERT(t->write_state != GRPC_CHTTP2_WRITE_STATE_IDLE);
995   grpc_chttp2_begin_write_result r;
996   if (!t->closed_with_error.ok()) {
997     r.writing = false;
998   } else {
999     r = grpc_chttp2_begin_write(t);
1000   }
1001   if (r.writing) {
1002     set_write_state(t,
1003                     r.partial ? GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE
1004                               : GRPC_CHTTP2_WRITE_STATE_WRITING,
1005                     begin_writing_desc(r.partial));
1006     write_action(t, absl::OkStatus());
1007     if (t->reading_paused_on_pending_induced_frames) {
1008       GPR_ASSERT(t->num_pending_induced_frames == 0);
1009       // We had paused reading, because we had many induced frames (SETTINGS
1010       // ACK, PINGS ACK and RST_STREAMS) pending in t->qbuf. Now that we have
1011       // been able to flush qbuf, we can resume reading.
1012       GRPC_CHTTP2_IF_TRACING(gpr_log(
1013           GPR_INFO,
1014           "transport %p : Resuming reading after being paused due to too "
1015           "many unwritten SETTINGS ACK, PINGS ACK and RST_STREAM frames",
1016           t));
1017       t->reading_paused_on_pending_induced_frames = false;
1018       continue_read_action_locked(t);
1019     }
1020   } else {
1021     set_write_state(t, GRPC_CHTTP2_WRITE_STATE_IDLE, "begin writing nothing");
1022     GRPC_CHTTP2_UNREF_TRANSPORT(t, "writing");
1023   }
1024 }
1025 
write_action(void * gt,grpc_error_handle)1026 static void write_action(void* gt, grpc_error_handle /*error*/) {
1027   grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(gt);
1028   void* cl = t->cl;
1029   if (!t->cl->empty()) {
1030     // Transfer the ownership of the context list to the endpoint and create and
1031     // associate a new context list with the transport.
1032     // The old context list is stored in the cl local variable which is passed
1033     // to the endpoint. Its upto the endpoint to manage its lifetime.
1034     t->cl = new grpc_core::ContextList();
1035   } else {
1036     // t->cl is Empty. There is nothing to trace in this endpoint_write. set cl
1037     // to nullptr.
1038     cl = nullptr;
1039   }
1040   // Choose max_frame_size as the prefered rx crypto frame size indicated by the
1041   // peer.
1042   int max_frame_size =
1043       t->settings
1044           [GRPC_PEER_SETTINGS]
1045           [GRPC_CHTTP2_SETTINGS_GRPC_PREFERRED_RECEIVE_CRYPTO_FRAME_SIZE];
1046   // Note: max frame size is 0 if the remote peer does not support adjusting the
1047   // sending frame size.
1048   if (max_frame_size == 0) {
1049     max_frame_size = INT_MAX;
1050   }
1051   grpc_endpoint_write(
1052       t->ep, &t->outbuf,
1053       GRPC_CLOSURE_INIT(&t->write_action_end_locked, write_action_end, t,
1054                         grpc_schedule_on_exec_ctx),
1055       cl, max_frame_size);
1056 }
1057 
write_action_end(void * tp,grpc_error_handle error)1058 static void write_action_end(void* tp, grpc_error_handle error) {
1059   grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp);
1060   t->combiner->Run(GRPC_CLOSURE_INIT(&t->write_action_end_locked,
1061                                      write_action_end_locked, t, nullptr),
1062                    error);
1063 }
1064 
1065 // Callback from the grpc_endpoint after bytes have been written by calling
1066 // sendmsg
write_action_end_locked(void * tp,grpc_error_handle error)1067 static void write_action_end_locked(void* tp, grpc_error_handle error) {
1068   grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp);
1069 
1070   bool closed = false;
1071   if (!error.ok()) {
1072     close_transport_locked(t, error);
1073     closed = true;
1074   }
1075 
1076   if (t->sent_goaway_state == GRPC_CHTTP2_FINAL_GOAWAY_SEND_SCHEDULED) {
1077     t->sent_goaway_state = GRPC_CHTTP2_FINAL_GOAWAY_SENT;
1078     closed = true;
1079     if (grpc_chttp2_stream_map_size(&t->stream_map) == 0) {
1080       close_transport_locked(t, GRPC_ERROR_CREATE("goaway sent"));
1081     }
1082   }
1083 
1084   switch (t->write_state) {
1085     case GRPC_CHTTP2_WRITE_STATE_IDLE:
1086       GPR_UNREACHABLE_CODE(break);
1087     case GRPC_CHTTP2_WRITE_STATE_WRITING:
1088       set_write_state(t, GRPC_CHTTP2_WRITE_STATE_IDLE, "finish writing");
1089       break;
1090     case GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE:
1091       set_write_state(t, GRPC_CHTTP2_WRITE_STATE_WRITING, "continue writing");
1092       GRPC_CHTTP2_REF_TRANSPORT(t, "writing");
1093       // If the transport is closed, we will retry writing on the endpoint
1094       // and next write may contain part of the currently serialized frames.
1095       // So, we should only call the run_after_write callbacks when the next
1096       // write finishes, or the callbacks will be invoked when the stream is
1097       // closed.
1098       if (!closed) {
1099         grpc_core::ExecCtx::RunList(DEBUG_LOCATION, &t->run_after_write);
1100       }
1101       t->combiner->FinallyRun(
1102           GRPC_CLOSURE_INIT(&t->write_action_begin_locked,
1103                             write_action_begin_locked, t, nullptr),
1104           absl::OkStatus());
1105       break;
1106   }
1107 
1108   grpc_chttp2_end_write(t, error);
1109   GRPC_CHTTP2_UNREF_TRANSPORT(t, "writing");
1110 }
1111 
1112 // Dirties an HTTP2 setting to be sent out next time a writing path occurs.
1113 // If the change needs to occur immediately, manually initiate a write.
queue_setting_update(grpc_chttp2_transport * t,grpc_chttp2_setting_id id,uint32_t value)1114 static void queue_setting_update(grpc_chttp2_transport* t,
1115                                  grpc_chttp2_setting_id id, uint32_t value) {
1116   const grpc_chttp2_setting_parameters* sp =
1117       &grpc_chttp2_settings_parameters[id];
1118   uint32_t use_value = grpc_core::Clamp(value, sp->min_value, sp->max_value);
1119   if (use_value != value) {
1120     gpr_log(GPR_INFO, "Requested parameter %s clamped from %d to %d", sp->name,
1121             value, use_value);
1122   }
1123   if (use_value != t->settings[GRPC_LOCAL_SETTINGS][id]) {
1124     t->settings[GRPC_LOCAL_SETTINGS][id] = use_value;
1125     t->dirtied_local_settings = true;
1126   }
1127 }
1128 
1129 // Cancel out streams that haven't yet started if we have received a GOAWAY
cancel_unstarted_streams(grpc_chttp2_transport * t,grpc_error_handle error)1130 static void cancel_unstarted_streams(grpc_chttp2_transport* t,
1131                                      grpc_error_handle error) {
1132   grpc_chttp2_stream* s;
1133   while (grpc_chttp2_list_pop_waiting_for_concurrency(t, &s)) {
1134     s->trailing_metadata_buffer.Set(
1135         grpc_core::GrpcStreamNetworkState(),
1136         grpc_core::GrpcStreamNetworkState::kNotSentOnWire);
1137     grpc_chttp2_cancel_stream(t, s, error);
1138   }
1139 }
1140 
grpc_chttp2_add_incoming_goaway(grpc_chttp2_transport * t,uint32_t goaway_error,uint32_t last_stream_id,absl::string_view goaway_text)1141 void grpc_chttp2_add_incoming_goaway(grpc_chttp2_transport* t,
1142                                      uint32_t goaway_error,
1143                                      uint32_t last_stream_id,
1144                                      absl::string_view goaway_text) {
1145   t->goaway_error = grpc_error_set_str(
1146       grpc_error_set_int(
1147           grpc_error_set_int(
1148               grpc_core::StatusCreate(
1149                   absl::StatusCode::kUnavailable,
1150                   absl::StrFormat(
1151                       "GOAWAY received; Error code: %u; Debug Text: %s",
1152                       goaway_error, goaway_text),
1153                   DEBUG_LOCATION, {}),
1154               grpc_core::StatusIntProperty::kHttp2Error,
1155               static_cast<intptr_t>(goaway_error)),
1156           grpc_core::StatusIntProperty::kRpcStatus, GRPC_STATUS_UNAVAILABLE),
1157       grpc_core::StatusStrProperty::kRawBytes, goaway_text);
1158 
1159   GRPC_CHTTP2_IF_TRACING(
1160       gpr_log(GPR_INFO, "transport %p got goaway with last stream id %d", t,
1161               last_stream_id));
1162   // We want to log this irrespective of whether http tracing is enabled if we
1163   // received a GOAWAY with a non NO_ERROR code.
1164   if (goaway_error != GRPC_HTTP2_NO_ERROR) {
1165     gpr_log(GPR_INFO, "%s: Got goaway [%d] err=%s",
1166             std::string(t->peer_string.as_string_view()).c_str(), goaway_error,
1167             grpc_core::StatusToString(t->goaway_error).c_str());
1168   }
1169   if (t->is_client) {
1170     cancel_unstarted_streams(t, t->goaway_error);
1171     // Cancel all unseen streams
1172     grpc_chttp2_stream_map_for_each(
1173         &t->stream_map,
1174         [](void* user_data, uint32_t /* key */, void* stream) {
1175           uint32_t last_stream_id = *(static_cast<uint32_t*>(user_data));
1176           grpc_chttp2_stream* s = static_cast<grpc_chttp2_stream*>(stream);
1177           if (s->id > last_stream_id) {
1178             s->trailing_metadata_buffer.Set(
1179                 grpc_core::GrpcStreamNetworkState(),
1180                 grpc_core::GrpcStreamNetworkState::kNotSeenByServer);
1181             grpc_chttp2_cancel_stream(s->t, s, s->t->goaway_error);
1182           }
1183         },
1184         &last_stream_id);
1185   }
1186   absl::Status status = grpc_error_to_absl_status(t->goaway_error);
1187   // When a client receives a GOAWAY with error code ENHANCE_YOUR_CALM and debug
1188   // data equal to "too_many_pings", it should log the occurrence at a log level
1189   // that is enabled by default and double the configured KEEPALIVE_TIME used
1190   // for new connections on that channel.
1191   if (GPR_UNLIKELY(t->is_client &&
1192                    goaway_error == GRPC_HTTP2_ENHANCE_YOUR_CALM &&
1193                    goaway_text == "too_many_pings")) {
1194     gpr_log(GPR_ERROR,
1195             "%s: Received a GOAWAY with error code ENHANCE_YOUR_CALM and debug "
1196             "data equal to \"too_many_pings\". Current keepalive time (before "
1197             "throttling): %s",
1198             std::string(t->peer_string.as_string_view()).c_str(),
1199             t->keepalive_time.ToString().c_str());
1200     constexpr int max_keepalive_time_millis =
1201         INT_MAX / KEEPALIVE_TIME_BACKOFF_MULTIPLIER;
1202     int64_t throttled_keepalive_time =
1203         t->keepalive_time.millis() > max_keepalive_time_millis
1204             ? INT_MAX
1205             : t->keepalive_time.millis() * KEEPALIVE_TIME_BACKOFF_MULTIPLIER;
1206     status.SetPayload(grpc_core::kKeepaliveThrottlingKey,
1207                       absl::Cord(std::to_string(throttled_keepalive_time)));
1208   }
1209   // lie: use transient failure from the transport to indicate goaway has been
1210   // received.
1211   if (!grpc_core::test_only_disable_transient_failure_state_notification) {
1212     connectivity_state_set(t, GRPC_CHANNEL_TRANSIENT_FAILURE, status,
1213                            "got_goaway");
1214   }
1215 }
1216 
maybe_start_some_streams(grpc_chttp2_transport * t)1217 static void maybe_start_some_streams(grpc_chttp2_transport* t) {
1218   grpc_chttp2_stream* s;
1219   // maybe cancel out streams that haven't yet started if we have received a
1220   // GOAWAY
1221   if (!t->goaway_error.ok()) {
1222     cancel_unstarted_streams(t, t->goaway_error);
1223     return;
1224   }
1225   // start streams where we have free grpc_chttp2_stream ids and free
1226   // * concurrency
1227   while (t->next_stream_id <= MAX_CLIENT_STREAM_ID &&
1228          grpc_chttp2_stream_map_size(&t->stream_map) <
1229              t->settings[GRPC_PEER_SETTINGS]
1230                         [GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS] &&
1231          grpc_chttp2_list_pop_waiting_for_concurrency(t, &s)) {
1232     // safe since we can't (legally) be parsing this stream yet
1233     GRPC_CHTTP2_IF_TRACING(gpr_log(
1234         GPR_INFO,
1235         "HTTP:%s: Transport %p allocating new grpc_chttp2_stream %p to id %d",
1236         t->is_client ? "CLI" : "SVR", t, s, t->next_stream_id));
1237 
1238     GPR_ASSERT(s->id == 0);
1239     s->id = t->next_stream_id;
1240     t->next_stream_id += 2;
1241 
1242     if (t->next_stream_id >= MAX_CLIENT_STREAM_ID) {
1243       connectivity_state_set(t, GRPC_CHANNEL_TRANSIENT_FAILURE,
1244                              absl::Status(absl::StatusCode::kUnavailable,
1245                                           "Transport Stream IDs exhausted"),
1246                              "no_more_stream_ids");
1247     }
1248 
1249     grpc_chttp2_stream_map_add(&t->stream_map, s->id, s);
1250     post_destructive_reclaimer(t);
1251     grpc_chttp2_mark_stream_writable(t, s);
1252     grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_START_NEW_STREAM);
1253   }
1254   // cancel out streams that will never be started
1255   if (t->next_stream_id >= MAX_CLIENT_STREAM_ID) {
1256     while (grpc_chttp2_list_pop_waiting_for_concurrency(t, &s)) {
1257       s->trailing_metadata_buffer.Set(
1258           grpc_core::GrpcStreamNetworkState(),
1259           grpc_core::GrpcStreamNetworkState::kNotSentOnWire);
1260       grpc_chttp2_cancel_stream(
1261           t, s,
1262           grpc_error_set_int(GRPC_ERROR_CREATE("Stream IDs exhausted"),
1263                              grpc_core::StatusIntProperty::kRpcStatus,
1264                              GRPC_STATUS_UNAVAILABLE));
1265     }
1266   }
1267 }
1268 
add_closure_barrier(grpc_closure * closure)1269 static grpc_closure* add_closure_barrier(grpc_closure* closure) {
1270   closure->next_data.scratch += CLOSURE_BARRIER_FIRST_REF_BIT;
1271   return closure;
1272 }
1273 
null_then_sched_closure(grpc_closure ** closure)1274 static void null_then_sched_closure(grpc_closure** closure) {
1275   grpc_closure* c = *closure;
1276   *closure = nullptr;
1277   // null_then_schedule_closure might be run during a start_batch which might
1278   // subsequently examine the batch for more operations contained within.
1279   // However, the closure run might make it back to the call object, push a
1280   // completion, have the application see it, and make a new operation on the
1281   // call which recycles the batch BEFORE the call to start_batch completes,
1282   // forcing a race.
1283   grpc_core::ExecCtx::Run(DEBUG_LOCATION, c, absl::OkStatus());
1284 }
1285 
grpc_chttp2_complete_closure_step(grpc_chttp2_transport * t,grpc_chttp2_stream * s,grpc_closure ** pclosure,grpc_error_handle error,const char * desc,grpc_core::DebugLocation whence)1286 void grpc_chttp2_complete_closure_step(grpc_chttp2_transport* t,
1287                                        grpc_chttp2_stream* s,
1288                                        grpc_closure** pclosure,
1289                                        grpc_error_handle error,
1290                                        const char* desc,
1291                                        grpc_core::DebugLocation whence) {
1292   grpc_closure* closure = *pclosure;
1293   *pclosure = nullptr;
1294   if (closure == nullptr) {
1295     return;
1296   }
1297   closure->next_data.scratch -= CLOSURE_BARRIER_FIRST_REF_BIT;
1298   if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace)) {
1299     gpr_log(
1300         GPR_INFO,
1301         "complete_closure_step: t=%p %p refs=%d flags=0x%04x desc=%s err=%s "
1302         "write_state=%s whence=%s:%d",
1303         t, closure,
1304         static_cast<int>(closure->next_data.scratch /
1305                          CLOSURE_BARRIER_FIRST_REF_BIT),
1306         static_cast<int>(closure->next_data.scratch %
1307                          CLOSURE_BARRIER_FIRST_REF_BIT),
1308         desc, grpc_core::StatusToString(error).c_str(),
1309         write_state_name(t->write_state), whence.file(), whence.line());
1310   }
1311 
1312   auto* tracer = CallTracerIfEnabled(s);
1313   if (tracer != nullptr) {
1314     tracer->RecordAnnotation(
1315         absl::StrFormat("on_complete: s=%p %p desc=%s err=%s", s, closure, desc,
1316                         grpc_core::StatusToString(error).c_str()));
1317   }
1318 
1319   if (!error.ok()) {
1320     grpc_error_handle cl_err =
1321         grpc_core::internal::StatusMoveFromHeapPtr(closure->error_data.error);
1322     if (cl_err.ok()) {
1323       cl_err = GRPC_ERROR_CREATE(absl::StrCat(
1324           "Error in HTTP transport completing operation: ", desc,
1325           " write_state=", write_state_name(t->write_state), " refs=",
1326           closure->next_data.scratch / CLOSURE_BARRIER_FIRST_REF_BIT, " flags=",
1327           closure->next_data.scratch % CLOSURE_BARRIER_FIRST_REF_BIT));
1328       cl_err = grpc_error_set_str(cl_err,
1329                                   grpc_core::StatusStrProperty::kTargetAddress,
1330                                   std::string(t->peer_string.as_string_view()));
1331     }
1332     cl_err = grpc_error_add_child(cl_err, error);
1333     closure->error_data.error = grpc_core::internal::StatusAllocHeapPtr(cl_err);
1334   }
1335   if (closure->next_data.scratch < CLOSURE_BARRIER_FIRST_REF_BIT) {
1336     if ((t->write_state == GRPC_CHTTP2_WRITE_STATE_IDLE) ||
1337         !(closure->next_data.scratch & CLOSURE_BARRIER_MAY_COVER_WRITE)) {
1338       // Using GRPC_CLOSURE_SCHED instead of GRPC_CLOSURE_RUN to avoid running
1339       // closures earlier than when it is safe to do so.
1340       grpc_error_handle run_error =
1341           grpc_core::internal::StatusMoveFromHeapPtr(closure->error_data.error);
1342       closure->error_data.error = 0;
1343       grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure, run_error);
1344     } else {
1345       grpc_closure_list_append(&t->run_after_write, closure);
1346     }
1347   }
1348 }
1349 
contains_non_ok_status(grpc_metadata_batch * batch)1350 static bool contains_non_ok_status(grpc_metadata_batch* batch) {
1351   return batch->get(grpc_core::GrpcStatusMetadata()).value_or(GRPC_STATUS_OK) !=
1352          GRPC_STATUS_OK;
1353 }
1354 
log_metadata(const grpc_metadata_batch * md_batch,uint32_t id,bool is_client,bool is_initial)1355 static void log_metadata(const grpc_metadata_batch* md_batch, uint32_t id,
1356                          bool is_client, bool is_initial) {
1357   gpr_log(GPR_INFO, "--metadata--");
1358   const std::string prefix = absl::StrCat(
1359       "HTTP:", id, is_initial ? ":HDR" : ":TRL", is_client ? ":CLI:" : ":SVR:");
1360   md_batch->Log([&prefix](absl::string_view key, absl::string_view value) {
1361     gpr_log(GPR_INFO, "%s", absl::StrCat(prefix, key, ": ", value).c_str());
1362   });
1363 }
1364 
perform_stream_op_locked(void * stream_op,grpc_error_handle)1365 static void perform_stream_op_locked(void* stream_op,
1366                                      grpc_error_handle /*error_ignored*/) {
1367   grpc_transport_stream_op_batch* op =
1368       static_cast<grpc_transport_stream_op_batch*>(stream_op);
1369   grpc_chttp2_stream* s =
1370       static_cast<grpc_chttp2_stream*>(op->handler_private.extra_arg);
1371   grpc_transport_stream_op_batch_payload* op_payload = op->payload;
1372   grpc_chttp2_transport* t = s->t;
1373 
1374   s->context = op->payload->context;
1375   s->traced = op->is_traced;
1376   if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace)) {
1377     gpr_log(GPR_INFO,
1378             "perform_stream_op_locked[s=%p; op=%p]: %s; on_complete = %p", s,
1379             op, grpc_transport_stream_op_batch_string(op, false).c_str(),
1380             op->on_complete);
1381     if (op->send_initial_metadata) {
1382       log_metadata(op_payload->send_initial_metadata.send_initial_metadata,
1383                    s->id, t->is_client, true);
1384     }
1385     if (op->send_trailing_metadata) {
1386       log_metadata(op_payload->send_trailing_metadata.send_trailing_metadata,
1387                    s->id, t->is_client, false);
1388     }
1389   }
1390 
1391   auto* tracer = CallTracerIfEnabled(s);
1392   if (tracer != nullptr) {
1393     tracer->RecordAnnotation(absl::StrFormat(
1394         "perform_stream_op_locked[s=%p; op=%p]: %s; on_complete = %p", s, op,
1395         grpc_transport_stream_op_batch_string(op, true).c_str(),
1396         op->on_complete));
1397   }
1398 
1399   grpc_closure* on_complete = op->on_complete;
1400   // on_complete will be null if and only if there are no send ops in the batch.
1401   if (on_complete != nullptr) {
1402     // This batch has send ops. Use final_data as a barrier until enqueue time;
1403     // the initial counter is dropped at the end of this function.
1404     on_complete->next_data.scratch = CLOSURE_BARRIER_FIRST_REF_BIT;
1405     on_complete->error_data.error = 0;
1406   }
1407 
1408   if (op->cancel_stream) {
1409     grpc_chttp2_cancel_stream(t, s, op_payload->cancel_stream.cancel_error);
1410   }
1411 
1412   if (op->send_initial_metadata) {
1413     if (t->is_client && t->channelz_socket != nullptr) {
1414       t->channelz_socket->RecordStreamStartedFromLocal();
1415     }
1416     GPR_ASSERT(s->send_initial_metadata_finished == nullptr);
1417     on_complete->next_data.scratch |= t->closure_barrier_may_cover_write;
1418 
1419     s->send_initial_metadata_finished = add_closure_barrier(on_complete);
1420     s->send_initial_metadata =
1421         op_payload->send_initial_metadata.send_initial_metadata;
1422     if (t->is_client) {
1423       s->deadline = std::min(
1424           s->deadline,
1425           s->send_initial_metadata->get(grpc_core::GrpcTimeoutMetadata())
1426               .value_or(grpc_core::Timestamp::InfFuture()));
1427     }
1428     if (contains_non_ok_status(s->send_initial_metadata)) {
1429       s->seen_error = true;
1430     }
1431     if (!s->write_closed) {
1432       if (t->is_client) {
1433         if (t->closed_with_error.ok()) {
1434           GPR_ASSERT(s->id == 0);
1435           grpc_chttp2_list_add_waiting_for_concurrency(t, s);
1436           maybe_start_some_streams(t);
1437         } else {
1438           s->trailing_metadata_buffer.Set(
1439               grpc_core::GrpcStreamNetworkState(),
1440               grpc_core::GrpcStreamNetworkState::kNotSentOnWire);
1441           grpc_chttp2_cancel_stream(
1442               t, s,
1443               grpc_error_set_int(
1444                   GRPC_ERROR_CREATE_REFERENCING("Transport closed",
1445                                                 &t->closed_with_error, 1),
1446                   grpc_core::StatusIntProperty::kRpcStatus,
1447                   GRPC_STATUS_UNAVAILABLE));
1448         }
1449       } else {
1450         GPR_ASSERT(s->id != 0);
1451         grpc_chttp2_mark_stream_writable(t, s);
1452         if (!(op->send_message &&
1453               (op->payload->send_message.flags & GRPC_WRITE_BUFFER_HINT))) {
1454           grpc_chttp2_initiate_write(
1455               t, GRPC_CHTTP2_INITIATE_WRITE_SEND_INITIAL_METADATA);
1456         }
1457       }
1458     } else {
1459       s->send_initial_metadata = nullptr;
1460       grpc_chttp2_complete_closure_step(
1461           t, s, &s->send_initial_metadata_finished,
1462           GRPC_ERROR_CREATE_REFERENCING(
1463               "Attempt to send initial metadata after stream was closed",
1464               &s->write_closed_error, 1),
1465           "send_initial_metadata_finished");
1466     }
1467   }
1468 
1469   if (op->send_message) {
1470     t->num_messages_in_next_write++;
1471     grpc_core::global_stats().IncrementHttp2SendMessageSize(
1472         op->payload->send_message.send_message->Length());
1473     on_complete->next_data.scratch |= t->closure_barrier_may_cover_write;
1474     s->send_message_finished = add_closure_barrier(op->on_complete);
1475     const uint32_t flags = op_payload->send_message.flags;
1476     if (s->write_closed) {
1477       op->payload->send_message.stream_write_closed = true;
1478       // We should NOT return an error here, so as to avoid a cancel OP being
1479       // started. The surface layer will notice that the stream has been closed
1480       // for writes and fail the send message op.
1481       grpc_chttp2_complete_closure_step(t, s, &s->send_message_finished,
1482                                         absl::OkStatus(),
1483                                         "fetching_send_message_finished");
1484     } else {
1485       uint8_t* frame_hdr = grpc_slice_buffer_tiny_add(
1486           &s->flow_controlled_buffer, GRPC_HEADER_SIZE_IN_BYTES);
1487       frame_hdr[0] = (flags & GRPC_WRITE_INTERNAL_COMPRESS) != 0;
1488       size_t len = op_payload->send_message.send_message->Length();
1489       frame_hdr[1] = static_cast<uint8_t>(len >> 24);
1490       frame_hdr[2] = static_cast<uint8_t>(len >> 16);
1491       frame_hdr[3] = static_cast<uint8_t>(len >> 8);
1492       frame_hdr[4] = static_cast<uint8_t>(len);
1493 
1494       s->next_message_end_offset =
1495           s->flow_controlled_bytes_written +
1496           static_cast<int64_t>(s->flow_controlled_buffer.length) +
1497           static_cast<int64_t>(len);
1498       if (flags & GRPC_WRITE_BUFFER_HINT) {
1499         s->next_message_end_offset -= t->write_buffer_size;
1500         s->write_buffering = true;
1501       } else {
1502         s->write_buffering = false;
1503       }
1504 
1505       grpc_slice* const slices =
1506           op_payload->send_message.send_message->c_slice_buffer()->slices;
1507       grpc_slice* const end =
1508           slices + op_payload->send_message.send_message->Count();
1509       for (grpc_slice* slice = slices; slice != end; slice++) {
1510         grpc_slice_buffer_add(&s->flow_controlled_buffer,
1511                               grpc_core::CSliceRef(*slice));
1512       }
1513 
1514       int64_t notify_offset = s->next_message_end_offset;
1515       if (notify_offset <= s->flow_controlled_bytes_written) {
1516         grpc_chttp2_complete_closure_step(t, s, &s->send_message_finished,
1517                                           absl::OkStatus(),
1518                                           "fetching_send_message_finished");
1519       } else {
1520         grpc_chttp2_write_cb* cb = t->write_cb_pool;
1521         if (cb == nullptr) {
1522           cb = static_cast<grpc_chttp2_write_cb*>(gpr_malloc(sizeof(*cb)));
1523         } else {
1524           t->write_cb_pool = cb->next;
1525         }
1526         cb->call_at_byte = notify_offset;
1527         cb->closure = s->send_message_finished;
1528         s->send_message_finished = nullptr;
1529         grpc_chttp2_write_cb** list = flags & GRPC_WRITE_THROUGH
1530                                           ? &s->on_write_finished_cbs
1531                                           : &s->on_flow_controlled_cbs;
1532         cb->next = *list;
1533         *list = cb;
1534       }
1535 
1536       if (s->id != 0 &&
1537           (!s->write_buffering ||
1538            s->flow_controlled_buffer.length > t->write_buffer_size)) {
1539         grpc_chttp2_mark_stream_writable(t, s);
1540         grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_SEND_MESSAGE);
1541       }
1542     }
1543   }
1544 
1545   if (op->send_trailing_metadata) {
1546     GPR_ASSERT(s->send_trailing_metadata_finished == nullptr);
1547     on_complete->next_data.scratch |= t->closure_barrier_may_cover_write;
1548     s->send_trailing_metadata_finished = add_closure_barrier(on_complete);
1549     s->send_trailing_metadata =
1550         op_payload->send_trailing_metadata.send_trailing_metadata;
1551     s->sent_trailing_metadata_op = op_payload->send_trailing_metadata.sent;
1552     s->write_buffering = false;
1553     if (contains_non_ok_status(s->send_trailing_metadata)) {
1554       s->seen_error = true;
1555     }
1556     if (s->write_closed) {
1557       s->send_trailing_metadata = nullptr;
1558       s->sent_trailing_metadata_op = nullptr;
1559       grpc_chttp2_complete_closure_step(
1560           t, s, &s->send_trailing_metadata_finished,
1561           op->payload->send_trailing_metadata.send_trailing_metadata->empty()
1562               ? absl::OkStatus()
1563               : GRPC_ERROR_CREATE("Attempt to send trailing metadata after "
1564                                   "stream was closed"),
1565           "send_trailing_metadata_finished");
1566     } else if (s->id != 0) {
1567       // TODO(ctiller): check if there's flow control for any outstanding
1568       //   bytes before going writable
1569       grpc_chttp2_mark_stream_writable(t, s);
1570       grpc_chttp2_initiate_write(
1571           t, GRPC_CHTTP2_INITIATE_WRITE_SEND_TRAILING_METADATA);
1572     }
1573   }
1574 
1575   if (op->recv_initial_metadata) {
1576     GPR_ASSERT(s->recv_initial_metadata_ready == nullptr);
1577     s->recv_initial_metadata_ready =
1578         op_payload->recv_initial_metadata.recv_initial_metadata_ready;
1579     s->recv_initial_metadata =
1580         op_payload->recv_initial_metadata.recv_initial_metadata;
1581     s->trailing_metadata_available =
1582         op_payload->recv_initial_metadata.trailing_metadata_available;
1583     if (s->parsed_trailers_only && s->trailing_metadata_available != nullptr) {
1584       *s->trailing_metadata_available = true;
1585     }
1586     grpc_chttp2_maybe_complete_recv_initial_metadata(t, s);
1587   }
1588 
1589   if (op->recv_message) {
1590     GPR_ASSERT(s->recv_message_ready == nullptr);
1591     s->recv_message_ready = op_payload->recv_message.recv_message_ready;
1592     s->recv_message = op_payload->recv_message.recv_message;
1593     s->recv_message->emplace();
1594     s->recv_message_flags = op_payload->recv_message.flags;
1595     s->call_failed_before_recv_message =
1596         op_payload->recv_message.call_failed_before_recv_message;
1597     grpc_chttp2_maybe_complete_recv_trailing_metadata(t, s);
1598   }
1599 
1600   if (op->recv_trailing_metadata) {
1601     GPR_ASSERT(s->collecting_stats == nullptr);
1602     s->collecting_stats = op_payload->recv_trailing_metadata.collect_stats;
1603     GPR_ASSERT(s->recv_trailing_metadata_finished == nullptr);
1604     s->recv_trailing_metadata_finished =
1605         op_payload->recv_trailing_metadata.recv_trailing_metadata_ready;
1606     s->recv_trailing_metadata =
1607         op_payload->recv_trailing_metadata.recv_trailing_metadata;
1608     s->final_metadata_requested = true;
1609     grpc_chttp2_maybe_complete_recv_trailing_metadata(t, s);
1610   }
1611 
1612   if (on_complete != nullptr) {
1613     grpc_chttp2_complete_closure_step(t, s, &on_complete, absl::OkStatus(),
1614                                       "op->on_complete");
1615   }
1616 
1617   GRPC_CHTTP2_STREAM_UNREF(s, "perform_stream_op");
1618 }
1619 
perform_stream_op(grpc_transport * gt,grpc_stream * gs,grpc_transport_stream_op_batch * op)1620 static void perform_stream_op(grpc_transport* gt, grpc_stream* gs,
1621                               grpc_transport_stream_op_batch* op) {
1622   grpc_chttp2_transport* t = reinterpret_cast<grpc_chttp2_transport*>(gt);
1623   grpc_chttp2_stream* s = reinterpret_cast<grpc_chttp2_stream*>(gs);
1624 
1625   if (!t->is_client) {
1626     if (op->send_initial_metadata) {
1627       GPR_ASSERT(!op->payload->send_initial_metadata.send_initial_metadata
1628                       ->get(grpc_core::GrpcTimeoutMetadata())
1629                       .has_value());
1630     }
1631     if (op->send_trailing_metadata) {
1632       GPR_ASSERT(!op->payload->send_trailing_metadata.send_trailing_metadata
1633                       ->get(grpc_core::GrpcTimeoutMetadata())
1634                       .has_value());
1635     }
1636   }
1637 
1638   if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace)) {
1639     gpr_log(GPR_INFO, "perform_stream_op[s=%p; op=%p]: %s", s, op,
1640             grpc_transport_stream_op_batch_string(op, false).c_str());
1641   }
1642 
1643   GRPC_CHTTP2_STREAM_REF(s, "perform_stream_op");
1644   op->handler_private.extra_arg = gs;
1645   t->combiner->Run(GRPC_CLOSURE_INIT(&op->handler_private.closure,
1646                                      perform_stream_op_locked, op, nullptr),
1647                    absl::OkStatus());
1648 }
1649 
cancel_pings(grpc_chttp2_transport * t,grpc_error_handle error)1650 static void cancel_pings(grpc_chttp2_transport* t, grpc_error_handle error) {
1651   // callback remaining pings: they're not allowed to call into the transport,
1652   //   and maybe they hold resources that need to be freed
1653   grpc_chttp2_ping_queue* pq = &t->ping_queue;
1654   GPR_ASSERT(!error.ok());
1655   for (size_t j = 0; j < GRPC_CHTTP2_PCL_COUNT; j++) {
1656     grpc_closure_list_fail_all(&pq->lists[j], error);
1657     grpc_core::ExecCtx::RunList(DEBUG_LOCATION, &pq->lists[j]);
1658   }
1659 }
1660 
send_ping_locked(grpc_chttp2_transport * t,grpc_closure * on_initiate,grpc_closure * on_ack)1661 static void send_ping_locked(grpc_chttp2_transport* t,
1662                              grpc_closure* on_initiate, grpc_closure* on_ack) {
1663   if (!t->closed_with_error.ok()) {
1664     grpc_core::ExecCtx::Run(DEBUG_LOCATION, on_initiate, t->closed_with_error);
1665     grpc_core::ExecCtx::Run(DEBUG_LOCATION, on_ack, t->closed_with_error);
1666     return;
1667   }
1668   grpc_chttp2_ping_queue* pq = &t->ping_queue;
1669   grpc_closure_list_append(&pq->lists[GRPC_CHTTP2_PCL_INITIATE], on_initiate,
1670                            absl::OkStatus());
1671   grpc_closure_list_append(&pq->lists[GRPC_CHTTP2_PCL_NEXT], on_ack,
1672                            absl::OkStatus());
1673 }
1674 
1675 // Specialized form of send_ping_locked for keepalive ping. If there is already
1676 // a ping in progress, the keepalive ping would piggyback onto that ping,
1677 // instead of waiting for that ping to complete and then starting a new ping.
send_keepalive_ping_locked(grpc_chttp2_transport * t)1678 static void send_keepalive_ping_locked(grpc_chttp2_transport* t) {
1679   if (!t->closed_with_error.ok()) {
1680     t->combiner->Run(GRPC_CLOSURE_INIT(&t->start_keepalive_ping_locked,
1681                                        start_keepalive_ping_locked, t, nullptr),
1682                      t->closed_with_error);
1683     t->combiner->Run(
1684         GRPC_CLOSURE_INIT(&t->finish_keepalive_ping_locked,
1685                           finish_keepalive_ping_locked, t, nullptr),
1686         t->closed_with_error);
1687     return;
1688   }
1689   grpc_chttp2_ping_queue* pq = &t->ping_queue;
1690   if (!grpc_closure_list_empty(pq->lists[GRPC_CHTTP2_PCL_INFLIGHT])) {
1691     // There is a ping in flight. Add yourself to the inflight closure list.
1692     t->combiner->Run(GRPC_CLOSURE_INIT(&t->start_keepalive_ping_locked,
1693                                        start_keepalive_ping_locked, t, nullptr),
1694                      t->closed_with_error);
1695     grpc_closure_list_append(
1696         &pq->lists[GRPC_CHTTP2_PCL_INFLIGHT],
1697         GRPC_CLOSURE_INIT(&t->finish_keepalive_ping_locked,
1698                           finish_keepalive_ping, t, grpc_schedule_on_exec_ctx),
1699         absl::OkStatus());
1700     return;
1701   }
1702   grpc_closure_list_append(
1703       &pq->lists[GRPC_CHTTP2_PCL_INITIATE],
1704       GRPC_CLOSURE_INIT(&t->start_keepalive_ping_locked, start_keepalive_ping,
1705                         t, grpc_schedule_on_exec_ctx),
1706       absl::OkStatus());
1707   grpc_closure_list_append(
1708       &pq->lists[GRPC_CHTTP2_PCL_NEXT],
1709       GRPC_CLOSURE_INIT(&t->finish_keepalive_ping_locked, finish_keepalive_ping,
1710                         t, grpc_schedule_on_exec_ctx),
1711       absl::OkStatus());
1712 }
1713 
grpc_chttp2_retry_initiate_ping(grpc_chttp2_transport * t)1714 void grpc_chttp2_retry_initiate_ping(grpc_chttp2_transport* t) {
1715   t->combiner->Run(GRPC_CLOSURE_INIT(&t->retry_initiate_ping_locked,
1716                                      retry_initiate_ping_locked, t, nullptr),
1717                    absl::OkStatus());
1718 }
1719 
retry_initiate_ping_locked(void * tp,GRPC_UNUSED grpc_error_handle error)1720 static void retry_initiate_ping_locked(void* tp,
1721                                        GRPC_UNUSED grpc_error_handle error) {
1722   GPR_DEBUG_ASSERT(error.ok());
1723   grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp);
1724   GPR_ASSERT(t->ping_state.delayed_ping_timer_handle.has_value());
1725   t->ping_state.delayed_ping_timer_handle.reset();
1726   grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_RETRY_SEND_PING);
1727   GRPC_CHTTP2_UNREF_TRANSPORT(t, "retry_initiate_ping_locked");
1728 }
1729 
grpc_chttp2_ack_ping(grpc_chttp2_transport * t,uint64_t id)1730 void grpc_chttp2_ack_ping(grpc_chttp2_transport* t, uint64_t id) {
1731   grpc_chttp2_ping_queue* pq = &t->ping_queue;
1732   if (pq->inflight_id != id) {
1733     gpr_log(GPR_DEBUG, "Unknown ping response from %s: %" PRIx64,
1734             std::string(t->peer_string.as_string_view()).c_str(), id);
1735     return;
1736   }
1737   grpc_core::ExecCtx::RunList(DEBUG_LOCATION,
1738                               &pq->lists[GRPC_CHTTP2_PCL_INFLIGHT]);
1739   if (!grpc_closure_list_empty(pq->lists[GRPC_CHTTP2_PCL_NEXT])) {
1740     grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_CONTINUE_PINGS);
1741   }
1742 }
1743 
1744 namespace {
1745 
1746 // Fire and forget (deletes itself on completion). Does a graceful shutdown by
1747 // sending a GOAWAY frame with the last stream id set to 2^31-1, sending a ping
1748 // and waiting for an ack (effective waiting for an RTT) and then sending a
1749 // final GOAWAY frame with an updated last stream identifier. This helps ensure
1750 // that a connection can be cleanly shut down without losing requests.
1751 // In the event, that the client does not respond to the ping for some reason,
1752 // we add a 20 second deadline, after which we send the second goaway.
1753 class GracefulGoaway : public grpc_core::RefCounted<GracefulGoaway> {
1754  public:
Start(grpc_chttp2_transport * t)1755   static void Start(grpc_chttp2_transport* t) { new GracefulGoaway(t); }
1756 
~GracefulGoaway()1757   ~GracefulGoaway() override {
1758     GRPC_CHTTP2_UNREF_TRANSPORT(t_, "graceful goaway");
1759   }
1760 
1761  private:
1762   using TaskHandle = ::grpc_event_engine::experimental::EventEngine::TaskHandle;
1763 
GracefulGoaway(grpc_chttp2_transport * t)1764   explicit GracefulGoaway(grpc_chttp2_transport* t) : t_(t) {
1765     t->sent_goaway_state = GRPC_CHTTP2_GRACEFUL_GOAWAY;
1766     GRPC_CHTTP2_REF_TRANSPORT(t_, "graceful goaway");
1767     grpc_chttp2_goaway_append((1u << 31) - 1, 0, grpc_empty_slice(), &t->qbuf);
1768     send_ping_locked(
1769         t, nullptr, GRPC_CLOSURE_INIT(&on_ping_ack_, OnPingAck, this, nullptr));
1770     grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_GOAWAY_SENT);
1771     timer_handle_ = t_->event_engine->RunAfter(
1772         grpc_core::Duration::Seconds(20),
1773         [self = Ref(DEBUG_LOCATION, "GoawayTimer")]() mutable {
1774           grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
1775           grpc_core::ExecCtx exec_ctx;
1776           // The ref will be unreffed in the combiner.
1777           auto* ptr = self.release();
1778           ptr->t_->combiner->Run(
1779               GRPC_CLOSURE_INIT(&ptr->on_timer_, OnTimerLocked, ptr, nullptr),
1780               absl::OkStatus());
1781         });
1782   }
1783 
MaybeSendFinalGoawayLocked()1784   void MaybeSendFinalGoawayLocked() {
1785     if (t_->sent_goaway_state != GRPC_CHTTP2_GRACEFUL_GOAWAY) {
1786       // We already sent the final GOAWAY.
1787       return;
1788     }
1789     if (t_->destroying || !t_->closed_with_error.ok()) {
1790       GRPC_CHTTP2_IF_TRACING(
1791           gpr_log(GPR_INFO,
1792                   "transport:%p %s peer:%s Transport already shutting down. "
1793                   "Graceful GOAWAY abandoned.",
1794                   t_, t_->is_client ? "CLIENT" : "SERVER",
1795                   std::string(t_->peer_string.as_string_view()).c_str()));
1796       return;
1797     }
1798     // Ping completed. Send final goaway.
1799     GRPC_CHTTP2_IF_TRACING(
1800         gpr_log(GPR_INFO,
1801                 "transport:%p %s peer:%s Graceful shutdown: Ping received. "
1802                 "Sending final GOAWAY with stream_id:%d",
1803                 t_, t_->is_client ? "CLIENT" : "SERVER",
1804                 std::string(t_->peer_string.as_string_view()).c_str(),
1805                 t_->last_new_stream_id));
1806     t_->sent_goaway_state = GRPC_CHTTP2_FINAL_GOAWAY_SEND_SCHEDULED;
1807     grpc_chttp2_goaway_append(t_->last_new_stream_id, 0, grpc_empty_slice(),
1808                               &t_->qbuf);
1809     grpc_chttp2_initiate_write(t_, GRPC_CHTTP2_INITIATE_WRITE_GOAWAY_SENT);
1810   }
1811 
OnPingAck(void * arg,grpc_error_handle)1812   static void OnPingAck(void* arg, grpc_error_handle /* error */) {
1813     auto* self = static_cast<GracefulGoaway*>(arg);
1814     self->t_->combiner->Run(
1815         GRPC_CLOSURE_INIT(&self->on_ping_ack_, OnPingAckLocked, self, nullptr),
1816         absl::OkStatus());
1817   }
1818 
OnPingAckLocked(void * arg,grpc_error_handle)1819   static void OnPingAckLocked(void* arg, grpc_error_handle /* error */) {
1820     auto* self = static_cast<GracefulGoaway*>(arg);
1821     if (self->timer_handle_ != TaskHandle::kInvalid) {
1822       self->t_->event_engine->Cancel(
1823           std::exchange(self->timer_handle_, TaskHandle::kInvalid));
1824     }
1825     self->MaybeSendFinalGoawayLocked();
1826     self->Unref();
1827   }
1828 
OnTimerLocked(void * arg,grpc_error_handle)1829   static void OnTimerLocked(void* arg, grpc_error_handle /* error */) {
1830     auto* self = static_cast<GracefulGoaway*>(arg);
1831     // Clearing the handle since the timer has fired and the handle is invalid.
1832     self->timer_handle_ = TaskHandle::kInvalid;
1833     self->MaybeSendFinalGoawayLocked();
1834     self->Unref();
1835   }
1836 
1837   grpc_chttp2_transport* t_;
1838   grpc_closure on_ping_ack_;
1839   TaskHandle timer_handle_ = TaskHandle::kInvalid;
1840   grpc_closure on_timer_;
1841 };
1842 
1843 }  // namespace
1844 
send_goaway(grpc_chttp2_transport * t,grpc_error_handle error,bool immediate_disconnect_hint)1845 static void send_goaway(grpc_chttp2_transport* t, grpc_error_handle error,
1846                         bool immediate_disconnect_hint) {
1847   grpc_http2_error_code http_error;
1848   std::string message;
1849   grpc_error_get_status(error, grpc_core::Timestamp::InfFuture(), nullptr,
1850                         &message, &http_error, nullptr);
1851   if (!t->is_client && http_error == GRPC_HTTP2_NO_ERROR &&
1852       !immediate_disconnect_hint) {
1853     // Do a graceful shutdown.
1854     if (t->sent_goaway_state == GRPC_CHTTP2_NO_GOAWAY_SEND) {
1855       GracefulGoaway::Start(t);
1856     } else {
1857       // Graceful GOAWAY is already in progress.
1858     }
1859   } else if (t->sent_goaway_state == GRPC_CHTTP2_NO_GOAWAY_SEND ||
1860              t->sent_goaway_state == GRPC_CHTTP2_GRACEFUL_GOAWAY) {
1861     // We want to log this irrespective of whether http tracing is enabled
1862     gpr_log(GPR_DEBUG, "%s: Sending goaway err=%s",
1863             std::string(t->peer_string.as_string_view()).c_str(),
1864             grpc_core::StatusToString(error).c_str());
1865     t->sent_goaway_state = GRPC_CHTTP2_FINAL_GOAWAY_SEND_SCHEDULED;
1866     grpc_chttp2_goaway_append(
1867         t->last_new_stream_id, static_cast<uint32_t>(http_error),
1868         grpc_slice_from_cpp_string(std::move(message)), &t->qbuf);
1869   } else {
1870     // Final GOAWAY has already been sent.
1871   }
1872   grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_GOAWAY_SENT);
1873 }
1874 
grpc_chttp2_add_ping_strike(grpc_chttp2_transport * t)1875 void grpc_chttp2_add_ping_strike(grpc_chttp2_transport* t) {
1876   if (++t->ping_recv_state.ping_strikes > t->ping_policy.max_ping_strikes &&
1877       t->ping_policy.max_ping_strikes != 0) {
1878     send_goaway(t,
1879                 grpc_error_set_int(GRPC_ERROR_CREATE("too_many_pings"),
1880                                    grpc_core::StatusIntProperty::kHttp2Error,
1881                                    GRPC_HTTP2_ENHANCE_YOUR_CALM),
1882                 /*immediate_disconnect_hint=*/true);
1883     // The transport will be closed after the write is done
1884     close_transport_locked(
1885         t, grpc_error_set_int(GRPC_ERROR_CREATE("Too many pings"),
1886                               grpc_core::StatusIntProperty::kRpcStatus,
1887                               GRPC_STATUS_UNAVAILABLE));
1888   }
1889 }
1890 
grpc_chttp2_reset_ping_clock(grpc_chttp2_transport * t)1891 void grpc_chttp2_reset_ping_clock(grpc_chttp2_transport* t) {
1892   if (!t->is_client) {
1893     t->ping_recv_state.last_ping_recv_time = grpc_core::Timestamp::InfPast();
1894     t->ping_recv_state.ping_strikes = 0;
1895   }
1896   t->ping_state.pings_before_data_required =
1897       t->ping_policy.max_pings_without_data;
1898 }
1899 
perform_transport_op_locked(void * stream_op,grpc_error_handle)1900 static void perform_transport_op_locked(void* stream_op,
1901                                         grpc_error_handle /*error_ignored*/) {
1902   grpc_transport_op* op = static_cast<grpc_transport_op*>(stream_op);
1903   grpc_chttp2_transport* t =
1904       static_cast<grpc_chttp2_transport*>(op->handler_private.extra_arg);
1905 
1906   if (!op->goaway_error.ok()) {
1907     send_goaway(t, op->goaway_error, /*immediate_disconnect_hint=*/false);
1908   }
1909 
1910   if (op->set_accept_stream) {
1911     t->accept_stream_cb = op->set_accept_stream_fn;
1912     t->accept_stream_cb_user_data = op->set_accept_stream_user_data;
1913   }
1914 
1915   if (op->bind_pollset) {
1916     grpc_endpoint_add_to_pollset(t->ep, op->bind_pollset);
1917   }
1918 
1919   if (op->bind_pollset_set) {
1920     grpc_endpoint_add_to_pollset_set(t->ep, op->bind_pollset_set);
1921   }
1922 
1923   if (op->send_ping.on_initiate != nullptr || op->send_ping.on_ack != nullptr) {
1924     send_ping_locked(t, op->send_ping.on_initiate, op->send_ping.on_ack);
1925     grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_APPLICATION_PING);
1926   }
1927 
1928   if (op->start_connectivity_watch != nullptr) {
1929     t->state_tracker.AddWatcher(op->start_connectivity_watch_state,
1930                                 std::move(op->start_connectivity_watch));
1931   }
1932   if (op->stop_connectivity_watch != nullptr) {
1933     t->state_tracker.RemoveWatcher(op->stop_connectivity_watch);
1934   }
1935 
1936   if (!op->disconnect_with_error.ok()) {
1937     send_goaway(t, op->disconnect_with_error,
1938                 /*immediate_disconnect_hint=*/true);
1939     close_transport_locked(t, op->disconnect_with_error);
1940   }
1941 
1942   grpc_core::ExecCtx::Run(DEBUG_LOCATION, op->on_consumed, absl::OkStatus());
1943 
1944   GRPC_CHTTP2_UNREF_TRANSPORT(t, "transport_op");
1945 }
1946 
perform_transport_op(grpc_transport * gt,grpc_transport_op * op)1947 static void perform_transport_op(grpc_transport* gt, grpc_transport_op* op) {
1948   grpc_chttp2_transport* t = reinterpret_cast<grpc_chttp2_transport*>(gt);
1949   if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace)) {
1950     gpr_log(GPR_INFO, "perform_transport_op[t=%p]: %s", t,
1951             grpc_transport_op_string(op).c_str());
1952   }
1953   op->handler_private.extra_arg = gt;
1954   GRPC_CHTTP2_REF_TRANSPORT(t, "transport_op");
1955   t->combiner->Run(GRPC_CLOSURE_INIT(&op->handler_private.closure,
1956                                      perform_transport_op_locked, op, nullptr),
1957                    absl::OkStatus());
1958 }
1959 
1960 //
1961 // INPUT PROCESSING - GENERAL
1962 //
1963 
grpc_chttp2_maybe_complete_recv_initial_metadata(grpc_chttp2_transport * t,grpc_chttp2_stream * s)1964 void grpc_chttp2_maybe_complete_recv_initial_metadata(grpc_chttp2_transport* t,
1965                                                       grpc_chttp2_stream* s) {
1966   if (s->recv_initial_metadata_ready != nullptr &&
1967       s->published_metadata[0] != GRPC_METADATA_NOT_PUBLISHED) {
1968     if (s->seen_error) {
1969       grpc_slice_buffer_reset_and_unref(&s->frame_storage);
1970     }
1971     *s->recv_initial_metadata = std::move(s->initial_metadata_buffer);
1972     s->recv_initial_metadata->Set(grpc_core::PeerString(),
1973                                   t->peer_string.Ref());
1974     // If we didn't receive initial metadata from the wire and instead faked a
1975     // status (due to stream cancellations for example), let upper layers know
1976     // that trailing metadata is immediately available.
1977     if (s->trailing_metadata_available != nullptr &&
1978         s->published_metadata[0] != GRPC_METADATA_PUBLISHED_FROM_WIRE &&
1979         s->published_metadata[1] == GRPC_METADATA_SYNTHESIZED_FROM_FAKE) {
1980       *s->trailing_metadata_available = true;
1981       s->trailing_metadata_available = nullptr;
1982     }
1983     null_then_sched_closure(&s->recv_initial_metadata_ready);
1984   }
1985 }
1986 
grpc_chttp2_maybe_complete_recv_message(grpc_chttp2_transport * t,grpc_chttp2_stream * s)1987 void grpc_chttp2_maybe_complete_recv_message(grpc_chttp2_transport* t,
1988                                              grpc_chttp2_stream* s) {
1989   if (s->recv_message_ready == nullptr) return;
1990 
1991   grpc_core::chttp2::StreamFlowControl::IncomingUpdateContext upd(
1992       &s->flow_control);
1993   grpc_error_handle error;
1994 
1995   // Lambda is immediately invoked as a big scoped section that can be
1996   // exited out of at any point by returning.
1997   [&]() {
1998     if (grpc_http_trace.enabled()) {
1999       gpr_log(GPR_DEBUG,
2000               "maybe_complete_recv_message %p final_metadata_requested=%d "
2001               "seen_error=%d",
2002               s, s->final_metadata_requested, s->seen_error);
2003     }
2004     if (s->final_metadata_requested && s->seen_error) {
2005       grpc_slice_buffer_reset_and_unref(&s->frame_storage);
2006       s->recv_message->reset();
2007     } else {
2008       if (s->frame_storage.length != 0) {
2009         while (true) {
2010           GPR_ASSERT(s->frame_storage.length > 0);
2011           int64_t min_progress_size;
2012           auto r = grpc_deframe_unprocessed_incoming_frames(
2013               s, &min_progress_size, &**s->recv_message, s->recv_message_flags);
2014           if (grpc_http_trace.enabled()) {
2015             gpr_log(GPR_DEBUG, "Deframe data frame: %s",
2016                     grpc_core::PollToString(r, [](absl::Status r) {
2017                       return r.ToString();
2018                     }).c_str());
2019           }
2020           if (r.pending()) {
2021             if (s->read_closed) {
2022               grpc_slice_buffer_reset_and_unref(&s->frame_storage);
2023               s->recv_message->reset();
2024               break;
2025             } else {
2026               upd.SetMinProgressSize(min_progress_size);
2027               return;  // Out of lambda to enclosing function
2028             }
2029           } else {
2030             error = std::move(r.value());
2031             if (!error.ok()) {
2032               s->seen_error = true;
2033               grpc_slice_buffer_reset_and_unref(&s->frame_storage);
2034               break;
2035             } else {
2036               if (t->channelz_socket != nullptr) {
2037                 t->channelz_socket->RecordMessageReceived();
2038               }
2039               break;
2040             }
2041           }
2042         }
2043       } else if (s->read_closed) {
2044         s->recv_message->reset();
2045       } else {
2046         upd.SetMinProgressSize(GRPC_HEADER_SIZE_IN_BYTES);
2047         return;  // Out of lambda to enclosing function
2048       }
2049     }
2050     // save the length of the buffer before handing control back to application
2051     // threads. Needed to support correct flow control bookkeeping
2052     if (error.ok() && s->recv_message->has_value()) {
2053       null_then_sched_closure(&s->recv_message_ready);
2054     } else if (s->published_metadata[1] != GRPC_METADATA_NOT_PUBLISHED) {
2055       if (s->call_failed_before_recv_message != nullptr) {
2056         *s->call_failed_before_recv_message =
2057             (s->published_metadata[1] != GRPC_METADATA_PUBLISHED_AT_CLOSE);
2058       }
2059       null_then_sched_closure(&s->recv_message_ready);
2060     }
2061   }();
2062 
2063   upd.SetPendingSize(s->frame_storage.length);
2064   grpc_chttp2_act_on_flowctl_action(upd.MakeAction(), t, s);
2065 }
2066 
grpc_chttp2_maybe_complete_recv_trailing_metadata(grpc_chttp2_transport * t,grpc_chttp2_stream * s)2067 void grpc_chttp2_maybe_complete_recv_trailing_metadata(grpc_chttp2_transport* t,
2068                                                        grpc_chttp2_stream* s) {
2069   grpc_chttp2_maybe_complete_recv_message(t, s);
2070   if (grpc_http_trace.enabled()) {
2071     gpr_log(GPR_DEBUG,
2072             "maybe_complete_recv_trailing_metadata cli=%d s=%p closure=%p "
2073             "read_closed=%d "
2074             "write_closed=%d %" PRIdPTR,
2075             t->is_client, s, s->recv_trailing_metadata_finished, s->read_closed,
2076             s->write_closed, s->frame_storage.length);
2077   }
2078   if (s->recv_trailing_metadata_finished != nullptr && s->read_closed &&
2079       s->write_closed) {
2080     if (s->seen_error || !t->is_client) {
2081       grpc_slice_buffer_reset_and_unref(&s->frame_storage);
2082     }
2083     if (s->read_closed && s->frame_storage.length == 0 &&
2084         s->recv_trailing_metadata_finished != nullptr) {
2085       grpc_transport_move_stats(&s->stats, s->collecting_stats);
2086       s->collecting_stats = nullptr;
2087       *s->recv_trailing_metadata = std::move(s->trailing_metadata_buffer);
2088       null_then_sched_closure(&s->recv_trailing_metadata_finished);
2089     }
2090   }
2091 }
2092 
remove_stream(grpc_chttp2_transport * t,uint32_t id,grpc_error_handle error)2093 static void remove_stream(grpc_chttp2_transport* t, uint32_t id,
2094                           grpc_error_handle error) {
2095   grpc_chttp2_stream* s = static_cast<grpc_chttp2_stream*>(
2096       grpc_chttp2_stream_map_delete(&t->stream_map, id));
2097   GPR_DEBUG_ASSERT(s);
2098   if (t->incoming_stream == s) {
2099     t->incoming_stream = nullptr;
2100     grpc_chttp2_parsing_become_skip_parser(t);
2101   }
2102 
2103   if (grpc_chttp2_stream_map_size(&t->stream_map) == 0) {
2104     post_benign_reclaimer(t);
2105     if (t->sent_goaway_state == GRPC_CHTTP2_FINAL_GOAWAY_SENT) {
2106       close_transport_locked(
2107           t, GRPC_ERROR_CREATE_REFERENCING(
2108                  "Last stream closed after sending GOAWAY", &error, 1));
2109     }
2110   }
2111   if (grpc_chttp2_list_remove_writable_stream(t, s)) {
2112     GRPC_CHTTP2_STREAM_UNREF(s, "chttp2_writing:remove_stream");
2113   }
2114   grpc_chttp2_list_remove_stalled_by_stream(t, s);
2115   grpc_chttp2_list_remove_stalled_by_transport(t, s);
2116 
2117   maybe_start_some_streams(t);
2118 }
2119 
grpc_chttp2_cancel_stream(grpc_chttp2_transport * t,grpc_chttp2_stream * s,grpc_error_handle due_to_error)2120 void grpc_chttp2_cancel_stream(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
2121                                grpc_error_handle due_to_error) {
2122   if (!t->is_client && !s->sent_trailing_metadata &&
2123       grpc_error_has_clear_grpc_status(due_to_error)) {
2124     close_from_api(t, s, due_to_error);
2125     return;
2126   }
2127 
2128   if (!s->read_closed || !s->write_closed) {
2129     if (s->id != 0) {
2130       grpc_http2_error_code http_error;
2131       grpc_error_get_status(due_to_error, s->deadline, nullptr, nullptr,
2132                             &http_error, nullptr);
2133       grpc_chttp2_add_rst_stream_to_next_write(
2134           t, s->id, static_cast<uint32_t>(http_error), &s->stats.outgoing);
2135       grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_RST_STREAM);
2136     }
2137   }
2138   if (!due_to_error.ok() && !s->seen_error) {
2139     s->seen_error = true;
2140   }
2141   grpc_chttp2_mark_stream_closed(t, s, 1, 1, due_to_error);
2142 }
2143 
grpc_chttp2_fake_status(grpc_chttp2_transport * t,grpc_chttp2_stream * s,grpc_error_handle error)2144 void grpc_chttp2_fake_status(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
2145                              grpc_error_handle error) {
2146   grpc_status_code status;
2147   std::string message;
2148   grpc_error_get_status(error, s->deadline, &status, &message, nullptr,
2149                         nullptr);
2150   if (status != GRPC_STATUS_OK) {
2151     s->seen_error = true;
2152   }
2153   // stream_global->recv_trailing_metadata_finished gives us a
2154   //   last chance replacement: we've received trailing metadata,
2155   //   but something more important has become available to signal
2156   //   to the upper layers - drop what we've got, and then publish
2157   //   what we want - which is safe because we haven't told anyone
2158   //   about the metadata yet
2159   if (s->published_metadata[1] == GRPC_METADATA_NOT_PUBLISHED ||
2160       s->recv_trailing_metadata_finished != nullptr ||
2161       !s->final_metadata_requested) {
2162     s->trailing_metadata_buffer.Set(grpc_core::GrpcStatusMetadata(), status);
2163     if (!message.empty()) {
2164       s->trailing_metadata_buffer.Set(
2165           grpc_core::GrpcMessageMetadata(),
2166           grpc_core::Slice::FromCopiedBuffer(message));
2167     }
2168     s->published_metadata[1] = GRPC_METADATA_SYNTHESIZED_FROM_FAKE;
2169     grpc_chttp2_maybe_complete_recv_trailing_metadata(t, s);
2170   }
2171 }
2172 
add_error(grpc_error_handle error,grpc_error_handle * refs,size_t * nrefs)2173 static void add_error(grpc_error_handle error, grpc_error_handle* refs,
2174                       size_t* nrefs) {
2175   if (error.ok()) return;
2176   for (size_t i = 0; i < *nrefs; i++) {
2177     if (error == refs[i]) {
2178       return;
2179     }
2180   }
2181   refs[*nrefs] = error;
2182   ++*nrefs;
2183 }
2184 
removal_error(grpc_error_handle extra_error,grpc_chttp2_stream * s,const char * main_error_msg)2185 static grpc_error_handle removal_error(grpc_error_handle extra_error,
2186                                        grpc_chttp2_stream* s,
2187                                        const char* main_error_msg) {
2188   grpc_error_handle refs[3];
2189   size_t nrefs = 0;
2190   add_error(s->read_closed_error, refs, &nrefs);
2191   add_error(s->write_closed_error, refs, &nrefs);
2192   add_error(extra_error, refs, &nrefs);
2193   grpc_error_handle error;
2194   if (nrefs > 0) {
2195     error = GRPC_ERROR_CREATE_REFERENCING(main_error_msg, refs, nrefs);
2196   }
2197   return error;
2198 }
2199 
flush_write_list(grpc_chttp2_transport * t,grpc_chttp2_stream * s,grpc_chttp2_write_cb ** list,grpc_error_handle error)2200 static void flush_write_list(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
2201                              grpc_chttp2_write_cb** list,
2202                              grpc_error_handle error) {
2203   while (*list) {
2204     grpc_chttp2_write_cb* cb = *list;
2205     *list = cb->next;
2206     grpc_chttp2_complete_closure_step(t, s, &cb->closure, error,
2207                                       "on_write_finished_cb");
2208     cb->next = t->write_cb_pool;
2209     t->write_cb_pool = cb;
2210   }
2211 }
2212 
grpc_chttp2_fail_pending_writes(grpc_chttp2_transport * t,grpc_chttp2_stream * s,grpc_error_handle error)2213 void grpc_chttp2_fail_pending_writes(grpc_chttp2_transport* t,
2214                                      grpc_chttp2_stream* s,
2215                                      grpc_error_handle error) {
2216   error =
2217       removal_error(error, s, "Pending writes failed due to stream closure");
2218   s->send_initial_metadata = nullptr;
2219   grpc_chttp2_complete_closure_step(t, s, &s->send_initial_metadata_finished,
2220                                     error, "send_initial_metadata_finished");
2221 
2222   s->send_trailing_metadata = nullptr;
2223   s->sent_trailing_metadata_op = nullptr;
2224   grpc_chttp2_complete_closure_step(t, s, &s->send_trailing_metadata_finished,
2225                                     error, "send_trailing_metadata_finished");
2226 
2227   grpc_chttp2_complete_closure_step(t, s, &s->send_message_finished, error,
2228                                     "fetching_send_message_finished");
2229   flush_write_list(t, s, &s->on_write_finished_cbs, error);
2230   flush_write_list(t, s, &s->on_flow_controlled_cbs, error);
2231 }
2232 
grpc_chttp2_mark_stream_closed(grpc_chttp2_transport * t,grpc_chttp2_stream * s,int close_reads,int close_writes,grpc_error_handle error)2233 void grpc_chttp2_mark_stream_closed(grpc_chttp2_transport* t,
2234                                     grpc_chttp2_stream* s, int close_reads,
2235                                     int close_writes, grpc_error_handle error) {
2236   if (grpc_http_trace.enabled()) {
2237     gpr_log(
2238         GPR_DEBUG, "MARK_STREAM_CLOSED: t=%p s=%p(id=%d) %s [%s]", t, s, s->id,
2239         (close_reads && close_writes)
2240             ? "read+write"
2241             : (close_reads ? "read" : (close_writes ? "write" : "nothing??")),
2242         error.ToString().c_str());
2243   }
2244   if (s->read_closed && s->write_closed) {
2245     // already closed, but we should still fake the status if needed.
2246     grpc_error_handle overall_error = removal_error(error, s, "Stream removed");
2247     if (!overall_error.ok()) {
2248       grpc_chttp2_fake_status(t, s, overall_error);
2249     }
2250     grpc_chttp2_maybe_complete_recv_trailing_metadata(t, s);
2251     return;
2252   }
2253   bool closed_read = false;
2254   bool became_closed = false;
2255   if (close_reads && !s->read_closed) {
2256     s->read_closed_error = error;
2257     s->read_closed = true;
2258     closed_read = true;
2259   }
2260   if (close_writes && !s->write_closed) {
2261     s->write_closed_error = error;
2262     s->write_closed = true;
2263     grpc_chttp2_fail_pending_writes(t, s, error);
2264   }
2265   if (s->read_closed && s->write_closed) {
2266     became_closed = true;
2267     grpc_error_handle overall_error = removal_error(error, s, "Stream removed");
2268     if (s->id != 0) {
2269       remove_stream(t, s->id, overall_error);
2270     } else {
2271       // Purge streams waiting on concurrency still waiting for id assignment
2272       grpc_chttp2_list_remove_waiting_for_concurrency(t, s);
2273     }
2274     if (!overall_error.ok()) {
2275       grpc_chttp2_fake_status(t, s, overall_error);
2276     }
2277   }
2278   if (closed_read) {
2279     for (int i = 0; i < 2; i++) {
2280       if (s->published_metadata[i] == GRPC_METADATA_NOT_PUBLISHED) {
2281         s->published_metadata[i] = GRPC_METADATA_PUBLISHED_AT_CLOSE;
2282       }
2283     }
2284     grpc_chttp2_maybe_complete_recv_initial_metadata(t, s);
2285     grpc_chttp2_maybe_complete_recv_message(t, s);
2286   }
2287   if (became_closed) {
2288     s->stats.latency =
2289         gpr_time_sub(gpr_now(GPR_CLOCK_MONOTONIC), s->creation_time);
2290     grpc_chttp2_maybe_complete_recv_trailing_metadata(t, s);
2291     GRPC_CHTTP2_STREAM_UNREF(s, "chttp2");
2292   }
2293 }
2294 
close_from_api(grpc_chttp2_transport * t,grpc_chttp2_stream * s,grpc_error_handle error)2295 static void close_from_api(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
2296                            grpc_error_handle error) {
2297   grpc_slice hdr;
2298   grpc_slice status_hdr;
2299   grpc_slice http_status_hdr;
2300   grpc_slice content_type_hdr;
2301   grpc_slice message_pfx;
2302   uint8_t* p;
2303   uint32_t len = 0;
2304   grpc_status_code grpc_status;
2305   std::string message;
2306   grpc_error_get_status(error, s->deadline, &grpc_status, &message, nullptr,
2307                         nullptr);
2308 
2309   GPR_ASSERT(grpc_status >= 0 && (int)grpc_status < 100);
2310 
2311   // Hand roll a header block.
2312   //   This is unnecessarily ugly - at some point we should find a more
2313   //   elegant solution.
2314   //   It's complicated by the fact that our send machinery would be dead by
2315   //   the time we got around to sending this, so instead we ignore HPACK
2316   //   compression and just write the uncompressed bytes onto the wire.
2317   if (!s->sent_initial_metadata) {
2318     http_status_hdr = GRPC_SLICE_MALLOC(13);
2319     p = GRPC_SLICE_START_PTR(http_status_hdr);
2320     *p++ = 0x00;
2321     *p++ = 7;
2322     *p++ = ':';
2323     *p++ = 's';
2324     *p++ = 't';
2325     *p++ = 'a';
2326     *p++ = 't';
2327     *p++ = 'u';
2328     *p++ = 's';
2329     *p++ = 3;
2330     *p++ = '2';
2331     *p++ = '0';
2332     *p++ = '0';
2333     GPR_ASSERT(p == GRPC_SLICE_END_PTR(http_status_hdr));
2334     len += static_cast<uint32_t> GRPC_SLICE_LENGTH(http_status_hdr);
2335 
2336     content_type_hdr = GRPC_SLICE_MALLOC(31);
2337     p = GRPC_SLICE_START_PTR(content_type_hdr);
2338     *p++ = 0x00;
2339     *p++ = 12;
2340     *p++ = 'c';
2341     *p++ = 'o';
2342     *p++ = 'n';
2343     *p++ = 't';
2344     *p++ = 'e';
2345     *p++ = 'n';
2346     *p++ = 't';
2347     *p++ = '-';
2348     *p++ = 't';
2349     *p++ = 'y';
2350     *p++ = 'p';
2351     *p++ = 'e';
2352     *p++ = 16;
2353     *p++ = 'a';
2354     *p++ = 'p';
2355     *p++ = 'p';
2356     *p++ = 'l';
2357     *p++ = 'i';
2358     *p++ = 'c';
2359     *p++ = 'a';
2360     *p++ = 't';
2361     *p++ = 'i';
2362     *p++ = 'o';
2363     *p++ = 'n';
2364     *p++ = '/';
2365     *p++ = 'g';
2366     *p++ = 'r';
2367     *p++ = 'p';
2368     *p++ = 'c';
2369     GPR_ASSERT(p == GRPC_SLICE_END_PTR(content_type_hdr));
2370     len += static_cast<uint32_t> GRPC_SLICE_LENGTH(content_type_hdr);
2371   }
2372 
2373   status_hdr = GRPC_SLICE_MALLOC(15 + (grpc_status >= 10));
2374   p = GRPC_SLICE_START_PTR(status_hdr);
2375   *p++ = 0x00;  // literal header, not indexed
2376   *p++ = 11;    // len(grpc-status)
2377   *p++ = 'g';
2378   *p++ = 'r';
2379   *p++ = 'p';
2380   *p++ = 'c';
2381   *p++ = '-';
2382   *p++ = 's';
2383   *p++ = 't';
2384   *p++ = 'a';
2385   *p++ = 't';
2386   *p++ = 'u';
2387   *p++ = 's';
2388   if (grpc_status < 10) {
2389     *p++ = 1;
2390     *p++ = static_cast<uint8_t>('0' + grpc_status);
2391   } else {
2392     *p++ = 2;
2393     *p++ = static_cast<uint8_t>('0' + (grpc_status / 10));
2394     *p++ = static_cast<uint8_t>('0' + (grpc_status % 10));
2395   }
2396   GPR_ASSERT(p == GRPC_SLICE_END_PTR(status_hdr));
2397   len += static_cast<uint32_t> GRPC_SLICE_LENGTH(status_hdr);
2398 
2399   size_t msg_len = message.length();
2400   GPR_ASSERT(msg_len <= UINT32_MAX);
2401   grpc_core::VarintWriter<1> msg_len_writer(static_cast<uint32_t>(msg_len));
2402   message_pfx = GRPC_SLICE_MALLOC(14 + msg_len_writer.length());
2403   p = GRPC_SLICE_START_PTR(message_pfx);
2404   *p++ = 0x00;  // literal header, not indexed
2405   *p++ = 12;    // len(grpc-message)
2406   *p++ = 'g';
2407   *p++ = 'r';
2408   *p++ = 'p';
2409   *p++ = 'c';
2410   *p++ = '-';
2411   *p++ = 'm';
2412   *p++ = 'e';
2413   *p++ = 's';
2414   *p++ = 's';
2415   *p++ = 'a';
2416   *p++ = 'g';
2417   *p++ = 'e';
2418   msg_len_writer.Write(0, p);
2419   p += msg_len_writer.length();
2420   GPR_ASSERT(p == GRPC_SLICE_END_PTR(message_pfx));
2421   len += static_cast<uint32_t> GRPC_SLICE_LENGTH(message_pfx);
2422   len += static_cast<uint32_t>(msg_len);
2423 
2424   hdr = GRPC_SLICE_MALLOC(9);
2425   p = GRPC_SLICE_START_PTR(hdr);
2426   *p++ = static_cast<uint8_t>(len >> 16);
2427   *p++ = static_cast<uint8_t>(len >> 8);
2428   *p++ = static_cast<uint8_t>(len);
2429   *p++ = GRPC_CHTTP2_FRAME_HEADER;
2430   *p++ = GRPC_CHTTP2_DATA_FLAG_END_STREAM | GRPC_CHTTP2_DATA_FLAG_END_HEADERS;
2431   *p++ = static_cast<uint8_t>(s->id >> 24);
2432   *p++ = static_cast<uint8_t>(s->id >> 16);
2433   *p++ = static_cast<uint8_t>(s->id >> 8);
2434   *p++ = static_cast<uint8_t>(s->id);
2435   GPR_ASSERT(p == GRPC_SLICE_END_PTR(hdr));
2436 
2437   grpc_slice_buffer_add(&t->qbuf, hdr);
2438   if (!s->sent_initial_metadata) {
2439     grpc_slice_buffer_add(&t->qbuf, http_status_hdr);
2440     grpc_slice_buffer_add(&t->qbuf, content_type_hdr);
2441   }
2442   grpc_slice_buffer_add(&t->qbuf, status_hdr);
2443   grpc_slice_buffer_add(&t->qbuf, message_pfx);
2444   grpc_slice_buffer_add(&t->qbuf,
2445                         grpc_slice_from_cpp_string(std::move(message)));
2446   grpc_chttp2_reset_ping_clock(t);
2447   grpc_chttp2_add_rst_stream_to_next_write(t, s->id, GRPC_HTTP2_NO_ERROR,
2448                                            &s->stats.outgoing);
2449 
2450   grpc_chttp2_mark_stream_closed(t, s, 1, 1, error);
2451   grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_CLOSE_FROM_API);
2452 }
2453 
2454 struct cancel_stream_cb_args {
2455   grpc_error_handle error;
2456   grpc_chttp2_transport* t;
2457 };
2458 
cancel_stream_cb(void * user_data,uint32_t,void * stream)2459 static void cancel_stream_cb(void* user_data, uint32_t /*key*/, void* stream) {
2460   cancel_stream_cb_args* args = static_cast<cancel_stream_cb_args*>(user_data);
2461   grpc_chttp2_stream* s = static_cast<grpc_chttp2_stream*>(stream);
2462   grpc_chttp2_cancel_stream(args->t, s, args->error);
2463 }
2464 
end_all_the_calls(grpc_chttp2_transport * t,grpc_error_handle error)2465 static void end_all_the_calls(grpc_chttp2_transport* t,
2466                               grpc_error_handle error) {
2467   intptr_t http2_error;
2468   // If there is no explicit grpc or HTTP/2 error, set to UNAVAILABLE on server.
2469   if (!t->is_client && !grpc_error_has_clear_grpc_status(error) &&
2470       !grpc_error_get_int(error, grpc_core::StatusIntProperty::kHttp2Error,
2471                           &http2_error)) {
2472     error = grpc_error_set_int(error, grpc_core::StatusIntProperty::kRpcStatus,
2473                                GRPC_STATUS_UNAVAILABLE);
2474   }
2475   cancel_unstarted_streams(t, error);
2476   cancel_stream_cb_args args = {error, t};
2477   grpc_chttp2_stream_map_for_each(&t->stream_map, cancel_stream_cb, &args);
2478 }
2479 
2480 //
2481 // INPUT PROCESSING - PARSING
2482 //
2483 
2484 template <class F>
WithUrgency(grpc_chttp2_transport * t,grpc_core::chttp2::FlowControlAction::Urgency urgency,grpc_chttp2_initiate_write_reason reason,F action)2485 static void WithUrgency(grpc_chttp2_transport* t,
2486                         grpc_core::chttp2::FlowControlAction::Urgency urgency,
2487                         grpc_chttp2_initiate_write_reason reason, F action) {
2488   switch (urgency) {
2489     case grpc_core::chttp2::FlowControlAction::Urgency::NO_ACTION_NEEDED:
2490       break;
2491     case grpc_core::chttp2::FlowControlAction::Urgency::UPDATE_IMMEDIATELY:
2492       grpc_chttp2_initiate_write(t, reason);
2493       ABSL_FALLTHROUGH_INTENDED;
2494     case grpc_core::chttp2::FlowControlAction::Urgency::QUEUE_UPDATE:
2495       action();
2496       break;
2497   }
2498 }
2499 
grpc_chttp2_act_on_flowctl_action(const grpc_core::chttp2::FlowControlAction & action,grpc_chttp2_transport * t,grpc_chttp2_stream * s)2500 void grpc_chttp2_act_on_flowctl_action(
2501     const grpc_core::chttp2::FlowControlAction& action,
2502     grpc_chttp2_transport* t, grpc_chttp2_stream* s) {
2503   WithUrgency(t, action.send_stream_update(),
2504               GRPC_CHTTP2_INITIATE_WRITE_STREAM_FLOW_CONTROL, [t, s]() {
2505                 if (s->id != 0 && !s->read_closed) {
2506                   grpc_chttp2_mark_stream_writable(t, s);
2507                 }
2508               });
2509   WithUrgency(t, action.send_transport_update(),
2510               GRPC_CHTTP2_INITIATE_WRITE_TRANSPORT_FLOW_CONTROL, []() {});
2511   WithUrgency(t, action.send_initial_window_update(),
2512               GRPC_CHTTP2_INITIATE_WRITE_SEND_SETTINGS, [t, &action]() {
2513                 queue_setting_update(t,
2514                                      GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE,
2515                                      action.initial_window_size());
2516               });
2517   WithUrgency(t, action.send_max_frame_size_update(),
2518               GRPC_CHTTP2_INITIATE_WRITE_SEND_SETTINGS, [t, &action]() {
2519                 queue_setting_update(t, GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE,
2520                                      action.max_frame_size());
2521               });
2522   if (t->enable_preferred_rx_crypto_frame_advertisement) {
2523     WithUrgency(
2524         t, action.preferred_rx_crypto_frame_size_update(),
2525         GRPC_CHTTP2_INITIATE_WRITE_SEND_SETTINGS, [t, &action]() {
2526           queue_setting_update(
2527               t, GRPC_CHTTP2_SETTINGS_GRPC_PREFERRED_RECEIVE_CRYPTO_FRAME_SIZE,
2528               action.preferred_rx_crypto_frame_size());
2529         });
2530   }
2531 }
2532 
try_http_parsing(grpc_chttp2_transport * t)2533 static grpc_error_handle try_http_parsing(grpc_chttp2_transport* t) {
2534   grpc_http_parser parser;
2535   size_t i = 0;
2536   grpc_error_handle error;
2537   grpc_http_response response;
2538 
2539   grpc_http_parser_init(&parser, GRPC_HTTP_RESPONSE, &response);
2540 
2541   grpc_error_handle parse_error;
2542   for (; i < t->read_buffer.count && parse_error.ok(); i++) {
2543     parse_error =
2544         grpc_http_parser_parse(&parser, t->read_buffer.slices[i], nullptr);
2545   }
2546   if (parse_error.ok() &&
2547       (parse_error = grpc_http_parser_eof(&parser)) == absl::OkStatus()) {
2548     error = grpc_error_set_int(
2549         grpc_error_set_int(
2550             GRPC_ERROR_CREATE("Trying to connect an http1.x server"),
2551             grpc_core::StatusIntProperty::kHttpStatus, response.status),
2552         grpc_core::StatusIntProperty::kRpcStatus,
2553         grpc_http2_status_to_grpc_status(response.status));
2554   }
2555 
2556   grpc_http_parser_destroy(&parser);
2557   grpc_http_response_destroy(&response);
2558   return error;
2559 }
2560 
read_action(void * tp,grpc_error_handle error)2561 static void read_action(void* tp, grpc_error_handle error) {
2562   grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp);
2563   t->combiner->Run(
2564       GRPC_CLOSURE_INIT(&t->read_action_locked, read_action_locked, t, nullptr),
2565       error);
2566 }
2567 
read_action_locked(void * tp,grpc_error_handle error)2568 static void read_action_locked(void* tp, grpc_error_handle error) {
2569   grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp);
2570 
2571   grpc_error_handle err = error;
2572   if (!err.ok()) {
2573     err = grpc_error_set_int(
2574         GRPC_ERROR_CREATE_REFERENCING("Endpoint read failed", &err, 1),
2575         grpc_core::StatusIntProperty::kOccurredDuringWrite, t->write_state);
2576   }
2577   std::swap(err, error);
2578   if (t->closed_with_error.ok()) {
2579     size_t i = 0;
2580     grpc_error_handle errors[3] = {error, absl::OkStatus(), absl::OkStatus()};
2581     for (; i < t->read_buffer.count && errors[1] == absl::OkStatus(); i++) {
2582       errors[1] = grpc_chttp2_perform_read(t, t->read_buffer.slices[i]);
2583     }
2584     if (errors[1] != absl::OkStatus()) {
2585       errors[2] = try_http_parsing(t);
2586       error = GRPC_ERROR_CREATE_REFERENCING("Failed parsing HTTP/2", errors,
2587                                             GPR_ARRAY_SIZE(errors));
2588     }
2589 
2590     if (t->initial_window_update != 0) {
2591       if (t->initial_window_update > 0) {
2592         grpc_chttp2_stream* s;
2593         while (grpc_chttp2_list_pop_stalled_by_stream(t, &s)) {
2594           grpc_chttp2_mark_stream_writable(t, s);
2595           grpc_chttp2_initiate_write(
2596               t, GRPC_CHTTP2_INITIATE_WRITE_FLOW_CONTROL_UNSTALLED_BY_SETTING);
2597         }
2598       }
2599       t->initial_window_update = 0;
2600     }
2601   }
2602 
2603   bool keep_reading = false;
2604   if (error.ok() && !t->closed_with_error.ok()) {
2605     error = GRPC_ERROR_CREATE_REFERENCING("Transport closed",
2606                                           &t->closed_with_error, 1);
2607   }
2608   if (!error.ok()) {
2609     // If a goaway frame was received, this might be the reason why the read
2610     // failed. Add this info to the error
2611     if (!t->goaway_error.ok()) {
2612       error = grpc_error_add_child(error, t->goaway_error);
2613     }
2614 
2615     close_transport_locked(t, error);
2616     t->endpoint_reading = 0;
2617   } else if (t->closed_with_error.ok()) {
2618     keep_reading = true;
2619     // Since we have read a byte, reset the keepalive timer
2620     if (t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_WAITING) {
2621       maybe_reset_keepalive_ping_timer_locked(t);
2622     }
2623   }
2624   grpc_slice_buffer_reset_and_unref(&t->read_buffer);
2625 
2626   if (keep_reading) {
2627     if (t->num_pending_induced_frames >= DEFAULT_MAX_PENDING_INDUCED_FRAMES) {
2628       t->reading_paused_on_pending_induced_frames = true;
2629       GRPC_CHTTP2_IF_TRACING(
2630           gpr_log(GPR_INFO,
2631                   "transport %p : Pausing reading due to too "
2632                   "many unwritten SETTINGS ACK and RST_STREAM frames",
2633                   t));
2634     } else {
2635       continue_read_action_locked(t);
2636     }
2637   } else {
2638     GRPC_CHTTP2_UNREF_TRANSPORT(t, "reading_action");
2639   }
2640 }
2641 
continue_read_action_locked(grpc_chttp2_transport * t)2642 static void continue_read_action_locked(grpc_chttp2_transport* t) {
2643   const bool urgent = !t->goaway_error.ok();
2644   GRPC_CLOSURE_INIT(&t->read_action_locked, read_action, t,
2645                     grpc_schedule_on_exec_ctx);
2646   grpc_endpoint_read(t->ep, &t->read_buffer, &t->read_action_locked, urgent,
2647                      grpc_chttp2_min_read_progress_size(t));
2648 }
2649 
2650 // t is reffed prior to calling the first time, and once the callback chain
2651 // that kicks off finishes, it's unreffed
schedule_bdp_ping_locked(grpc_chttp2_transport * t)2652 void schedule_bdp_ping_locked(grpc_chttp2_transport* t) {
2653   t->flow_control.bdp_estimator()->SchedulePing();
2654   send_ping_locked(
2655       t,
2656       GRPC_CLOSURE_INIT(&t->start_bdp_ping_locked, start_bdp_ping, t,
2657                         grpc_schedule_on_exec_ctx),
2658       GRPC_CLOSURE_INIT(&t->finish_bdp_ping_locked, finish_bdp_ping, t,
2659                         grpc_schedule_on_exec_ctx));
2660   grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_BDP_PING);
2661 }
2662 
start_bdp_ping(void * tp,grpc_error_handle error)2663 static void start_bdp_ping(void* tp, grpc_error_handle error) {
2664   grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp);
2665   t->combiner->Run(GRPC_CLOSURE_INIT(&t->start_bdp_ping_locked,
2666                                      start_bdp_ping_locked, t, nullptr),
2667                    error);
2668 }
2669 
start_bdp_ping_locked(void * tp,grpc_error_handle error)2670 static void start_bdp_ping_locked(void* tp, grpc_error_handle error) {
2671   grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp);
2672   if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace)) {
2673     gpr_log(GPR_INFO, "%s: Start BDP ping err=%s",
2674             std::string(t->peer_string.as_string_view()).c_str(),
2675             grpc_core::StatusToString(error).c_str());
2676   }
2677   if (!error.ok() || !t->closed_with_error.ok()) {
2678     return;
2679   }
2680   // Reset the keepalive ping timer
2681   if (t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_WAITING) {
2682     maybe_reset_keepalive_ping_timer_locked(t);
2683   }
2684   t->flow_control.bdp_estimator()->StartPing();
2685   t->bdp_ping_started = true;
2686 }
2687 
finish_bdp_ping(void * tp,grpc_error_handle error)2688 static void finish_bdp_ping(void* tp, grpc_error_handle error) {
2689   grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp);
2690   t->combiner->Run(GRPC_CLOSURE_INIT(&t->finish_bdp_ping_locked,
2691                                      finish_bdp_ping_locked, t, nullptr),
2692                    error);
2693 }
2694 
finish_bdp_ping_locked(void * tp,grpc_error_handle error)2695 static void finish_bdp_ping_locked(void* tp, grpc_error_handle error) {
2696   grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp);
2697   if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace)) {
2698     gpr_log(GPR_INFO, "%s: Complete BDP ping err=%s",
2699             std::string(t->peer_string.as_string_view()).c_str(),
2700             grpc_core::StatusToString(error).c_str());
2701   }
2702   if (!error.ok() || !t->closed_with_error.ok()) {
2703     GRPC_CHTTP2_UNREF_TRANSPORT(t, "bdp_ping");
2704     return;
2705   }
2706   if (!t->bdp_ping_started) {
2707     // start_bdp_ping_locked has not been run yet. Schedule
2708     // finish_bdp_ping_locked to be run later.
2709     t->combiner->Run(GRPC_CLOSURE_INIT(&t->finish_bdp_ping_locked,
2710                                        finish_bdp_ping_locked, t, nullptr),
2711                      error);
2712     return;
2713   }
2714   t->bdp_ping_started = false;
2715   grpc_core::Timestamp next_ping =
2716       t->flow_control.bdp_estimator()->CompletePing();
2717   grpc_chttp2_act_on_flowctl_action(t->flow_control.PeriodicUpdate(), t,
2718                                     nullptr);
2719   GPR_ASSERT(!t->next_bdp_ping_timer_handle.has_value());
2720   t->next_bdp_ping_timer_handle =
2721       t->event_engine->RunAfter(next_ping - grpc_core::Timestamp::Now(), [t] {
2722         grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
2723         grpc_core::ExecCtx exec_ctx;
2724         next_bdp_ping_timer_expired(t);
2725       });
2726 }
2727 
next_bdp_ping_timer_expired(grpc_chttp2_transport * t)2728 static void next_bdp_ping_timer_expired(grpc_chttp2_transport* t) {
2729   t->combiner->Run(
2730       GRPC_CLOSURE_INIT(&t->next_bdp_ping_timer_expired_locked,
2731                         next_bdp_ping_timer_expired_locked, t, nullptr),
2732       absl::OkStatus());
2733 }
2734 
next_bdp_ping_timer_expired_locked(void * tp,GRPC_UNUSED grpc_error_handle error)2735 static void next_bdp_ping_timer_expired_locked(
2736     void* tp, GRPC_UNUSED grpc_error_handle error) {
2737   GPR_DEBUG_ASSERT(error.ok());
2738   grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp);
2739   GPR_ASSERT(t->next_bdp_ping_timer_handle.has_value());
2740   t->next_bdp_ping_timer_handle.reset();
2741   if (t->flow_control.bdp_estimator()->accumulator() == 0) {
2742     // Block the bdp ping till we receive more data.
2743     t->bdp_ping_blocked = true;
2744     GRPC_CHTTP2_UNREF_TRANSPORT(t, "bdp_ping");
2745   } else {
2746     schedule_bdp_ping_locked(t);
2747   }
2748 }
2749 
grpc_chttp2_config_default_keepalive_args(grpc_channel_args * args,bool is_client)2750 void grpc_chttp2_config_default_keepalive_args(grpc_channel_args* args,
2751                                                bool is_client) {
2752   size_t i;
2753   if (args) {
2754     for (i = 0; i < args->num_args; i++) {
2755       if (0 == strcmp(args->args[i].key, GRPC_ARG_KEEPALIVE_TIME_MS)) {
2756         const int value = grpc_channel_arg_get_integer(
2757             &args->args[i], {is_client ? g_default_client_keepalive_time_ms
2758                                        : g_default_server_keepalive_time_ms,
2759                              1, INT_MAX});
2760         if (is_client) {
2761           g_default_client_keepalive_time_ms = value;
2762         } else {
2763           g_default_server_keepalive_time_ms = value;
2764         }
2765       } else if (0 ==
2766                  strcmp(args->args[i].key, GRPC_ARG_KEEPALIVE_TIMEOUT_MS)) {
2767         const int value = grpc_channel_arg_get_integer(
2768             &args->args[i], {is_client ? g_default_client_keepalive_timeout_ms
2769                                        : g_default_server_keepalive_timeout_ms,
2770                              0, INT_MAX});
2771         if (is_client) {
2772           g_default_client_keepalive_timeout_ms = value;
2773         } else {
2774           g_default_server_keepalive_timeout_ms = value;
2775         }
2776       } else if (0 == strcmp(args->args[i].key,
2777                              GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS)) {
2778         const bool value = static_cast<uint32_t>(grpc_channel_arg_get_integer(
2779             &args->args[i],
2780             {is_client ? g_default_client_keepalive_permit_without_calls
2781                        : g_default_server_keepalive_timeout_ms,
2782              0, 1}));
2783         if (is_client) {
2784           g_default_client_keepalive_permit_without_calls = value;
2785         } else {
2786           g_default_server_keepalive_permit_without_calls = value;
2787         }
2788       } else if (0 ==
2789                  strcmp(args->args[i].key, GRPC_ARG_HTTP2_MAX_PING_STRIKES)) {
2790         g_default_max_ping_strikes = grpc_channel_arg_get_integer(
2791             &args->args[i], {g_default_max_ping_strikes, 0, INT_MAX});
2792       } else if (0 == strcmp(args->args[i].key,
2793                              GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA)) {
2794         g_default_max_pings_without_data = grpc_channel_arg_get_integer(
2795             &args->args[i], {g_default_max_pings_without_data, 0, INT_MAX});
2796       } else if (0 ==
2797                  strcmp(
2798                      args->args[i].key,
2799                      GRPC_ARG_HTTP2_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS)) {
2800         g_default_min_recv_ping_interval_without_data_ms =
2801             grpc_channel_arg_get_integer(
2802                 &args->args[i],
2803                 {g_default_min_recv_ping_interval_without_data_ms, 0, INT_MAX});
2804       }
2805     }
2806   }
2807 }
2808 
init_keepalive_ping(grpc_chttp2_transport * t)2809 static void init_keepalive_ping(grpc_chttp2_transport* t) {
2810   t->combiner->Run(GRPC_CLOSURE_INIT(&t->init_keepalive_ping_locked,
2811                                      init_keepalive_ping_locked, t, nullptr),
2812                    absl::OkStatus());
2813 }
2814 
init_keepalive_ping_locked(void * arg,GRPC_UNUSED grpc_error_handle error)2815 static void init_keepalive_ping_locked(void* arg,
2816                                        GRPC_UNUSED grpc_error_handle error) {
2817   GPR_DEBUG_ASSERT(error.ok());
2818   grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(arg);
2819   GPR_ASSERT(t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_WAITING);
2820   GPR_ASSERT(t->keepalive_ping_timer_handle.has_value());
2821   t->keepalive_ping_timer_handle.reset();
2822   if (t->destroying || !t->closed_with_error.ok()) {
2823     t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_DYING;
2824   } else {
2825     if (t->keepalive_permit_without_calls ||
2826         grpc_chttp2_stream_map_size(&t->stream_map) > 0) {
2827       t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_PINGING;
2828       GRPC_CHTTP2_REF_TRANSPORT(t, "keepalive ping end");
2829       send_keepalive_ping_locked(t);
2830       grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_KEEPALIVE_PING);
2831     } else {
2832       GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping");
2833       t->keepalive_ping_timer_handle =
2834           t->event_engine->RunAfter(t->keepalive_time, [t] {
2835             grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
2836             grpc_core::ExecCtx exec_ctx;
2837             init_keepalive_ping(t);
2838           });
2839     }
2840   }
2841   GRPC_CHTTP2_UNREF_TRANSPORT(t, "init keepalive ping");
2842 }
2843 
start_keepalive_ping(void * arg,grpc_error_handle error)2844 static void start_keepalive_ping(void* arg, grpc_error_handle error) {
2845   grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(arg);
2846   t->combiner->Run(GRPC_CLOSURE_INIT(&t->start_keepalive_ping_locked,
2847                                      start_keepalive_ping_locked, t, nullptr),
2848                    error);
2849 }
2850 
start_keepalive_ping_locked(void * arg,grpc_error_handle error)2851 static void start_keepalive_ping_locked(void* arg, grpc_error_handle error) {
2852   grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(arg);
2853   if (!error.ok()) {
2854     return;
2855   }
2856   if (t->channelz_socket != nullptr) {
2857     t->channelz_socket->RecordKeepaliveSent();
2858   }
2859   if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace) ||
2860       GRPC_TRACE_FLAG_ENABLED(grpc_keepalive_trace)) {
2861     gpr_log(GPR_INFO, "%s: Start keepalive ping",
2862             std::string(t->peer_string.as_string_view()).c_str());
2863   }
2864   GRPC_CHTTP2_REF_TRANSPORT(t, "keepalive watchdog");
2865   t->keepalive_watchdog_timer_handle =
2866       t->event_engine->RunAfter(t->keepalive_timeout, [t] {
2867         grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
2868         grpc_core::ExecCtx exec_ctx;
2869         keepalive_watchdog_fired(t);
2870       });
2871   t->keepalive_ping_started = true;
2872 }
2873 
finish_keepalive_ping(void * arg,grpc_error_handle error)2874 static void finish_keepalive_ping(void* arg, grpc_error_handle error) {
2875   grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(arg);
2876   t->combiner->Run(GRPC_CLOSURE_INIT(&t->finish_keepalive_ping_locked,
2877                                      finish_keepalive_ping_locked, t, nullptr),
2878                    error);
2879 }
2880 
finish_keepalive_ping_locked(void * arg,grpc_error_handle error)2881 static void finish_keepalive_ping_locked(void* arg, grpc_error_handle error) {
2882   grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(arg);
2883   if (t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_PINGING) {
2884     if (error.ok()) {
2885       if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace) ||
2886           GRPC_TRACE_FLAG_ENABLED(grpc_keepalive_trace)) {
2887         gpr_log(GPR_INFO, "%s: Finish keepalive ping",
2888                 std::string(t->peer_string.as_string_view()).c_str());
2889       }
2890       if (!t->keepalive_ping_started) {
2891         // start_keepalive_ping_locked has not run yet. Reschedule
2892         // finish_keepalive_ping_locked for it to be run later.
2893         t->combiner->Run(
2894             GRPC_CLOSURE_INIT(&t->finish_keepalive_ping_locked,
2895                               finish_keepalive_ping_locked, t, nullptr),
2896             error);
2897         return;
2898       }
2899       t->keepalive_ping_started = false;
2900       t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_WAITING;
2901       if (t->keepalive_watchdog_timer_handle.has_value()) {
2902         if (t->event_engine->Cancel(*t->keepalive_watchdog_timer_handle)) {
2903           GRPC_CHTTP2_UNREF_TRANSPORT(t, "keepalive watchdog");
2904           t->keepalive_watchdog_timer_handle.reset();
2905         }
2906       }
2907       GPR_ASSERT(!t->keepalive_ping_timer_handle.has_value());
2908       GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping");
2909       t->keepalive_ping_timer_handle =
2910           t->event_engine->RunAfter(t->keepalive_time, [t] {
2911             grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
2912             grpc_core::ExecCtx exec_ctx;
2913             init_keepalive_ping(t);
2914           });
2915     }
2916   }
2917   GRPC_CHTTP2_UNREF_TRANSPORT(t, "keepalive ping end");
2918 }
2919 
keepalive_watchdog_fired(grpc_chttp2_transport * t)2920 static void keepalive_watchdog_fired(grpc_chttp2_transport* t) {
2921   t->combiner->Run(
2922       GRPC_CLOSURE_INIT(&t->keepalive_watchdog_fired_locked,
2923                         keepalive_watchdog_fired_locked, t, nullptr),
2924       absl::OkStatus());
2925 }
2926 
keepalive_watchdog_fired_locked(void * arg,GRPC_UNUSED grpc_error_handle error)2927 static void keepalive_watchdog_fired_locked(
2928     void* arg, GRPC_UNUSED grpc_error_handle error) {
2929   GPR_DEBUG_ASSERT(error.ok());
2930   grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(arg);
2931   GPR_ASSERT(t->keepalive_watchdog_timer_handle.has_value());
2932   t->keepalive_watchdog_timer_handle.reset();
2933   if (t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_PINGING) {
2934     gpr_log(GPR_INFO, "%s: Keepalive watchdog fired. Closing transport.",
2935             std::string(t->peer_string.as_string_view()).c_str());
2936     t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_DYING;
2937     close_transport_locked(
2938         t, grpc_error_set_int(GRPC_ERROR_CREATE("keepalive watchdog timeout"),
2939                               grpc_core::StatusIntProperty::kRpcStatus,
2940                               GRPC_STATUS_UNAVAILABLE));
2941   } else {
2942     // If keepalive_state is not PINGING, we consider it as an error. Maybe the
2943     // cancellation failed in finish_keepalive_ping_locked. Users have seen
2944     // other states: https://github.com/grpc/grpc/issues/32085.
2945     gpr_log(GPR_ERROR, "keepalive_ping_end state error: %d (expect: %d)",
2946             t->keepalive_state, GRPC_CHTTP2_KEEPALIVE_STATE_PINGING);
2947   }
2948   GRPC_CHTTP2_UNREF_TRANSPORT(t, "keepalive watchdog");
2949 }
2950 
maybe_reset_keepalive_ping_timer_locked(grpc_chttp2_transport * t)2951 static void maybe_reset_keepalive_ping_timer_locked(grpc_chttp2_transport* t) {
2952   if (t->keepalive_ping_timer_handle.has_value()) {
2953     if (t->event_engine->Cancel(*t->keepalive_ping_timer_handle)) {
2954       // Cancel succeeds, resets the keepalive ping timer. Note that we don't
2955       // need to Ref or Unref here since we still hold the Ref.
2956       if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace) ||
2957           GRPC_TRACE_FLAG_ENABLED(grpc_keepalive_trace)) {
2958         gpr_log(GPR_INFO, "%s: Keepalive ping cancelled. Resetting timer.",
2959                 std::string(t->peer_string.as_string_view()).c_str());
2960       }
2961       t->keepalive_ping_timer_handle =
2962           t->event_engine->RunAfter(t->keepalive_time, [t] {
2963             grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
2964             grpc_core::ExecCtx exec_ctx;
2965             init_keepalive_ping(t);
2966           });
2967     }
2968   }
2969 }
2970 
2971 //
2972 // CALLBACK LOOP
2973 //
2974 
connectivity_state_set(grpc_chttp2_transport * t,grpc_connectivity_state state,const absl::Status & status,const char * reason)2975 static void connectivity_state_set(grpc_chttp2_transport* t,
2976                                    grpc_connectivity_state state,
2977                                    const absl::Status& status,
2978                                    const char* reason) {
2979   GRPC_CHTTP2_IF_TRACING(
2980       gpr_log(GPR_INFO, "transport %p set connectivity_state=%d", t, state));
2981   t->state_tracker.SetState(state, status, reason);
2982 }
2983 
2984 //
2985 // POLLSET STUFF
2986 //
2987 
set_pollset(grpc_transport * gt,grpc_stream *,grpc_pollset * pollset)2988 static void set_pollset(grpc_transport* gt, grpc_stream* /*gs*/,
2989                         grpc_pollset* pollset) {
2990   grpc_chttp2_transport* t = reinterpret_cast<grpc_chttp2_transport*>(gt);
2991   grpc_endpoint_add_to_pollset(t->ep, pollset);
2992 }
2993 
set_pollset_set(grpc_transport * gt,grpc_stream *,grpc_pollset_set * pollset_set)2994 static void set_pollset_set(grpc_transport* gt, grpc_stream* /*gs*/,
2995                             grpc_pollset_set* pollset_set) {
2996   grpc_chttp2_transport* t = reinterpret_cast<grpc_chttp2_transport*>(gt);
2997   grpc_endpoint_add_to_pollset_set(t->ep, pollset_set);
2998 }
2999 
3000 //
3001 // RESOURCE QUOTAS
3002 //
3003 
post_benign_reclaimer(grpc_chttp2_transport * t)3004 static void post_benign_reclaimer(grpc_chttp2_transport* t) {
3005   if (!t->benign_reclaimer_registered) {
3006     t->benign_reclaimer_registered = true;
3007     GRPC_CHTTP2_REF_TRANSPORT(t, "benign_reclaimer");
3008     t->memory_owner.PostReclaimer(
3009         grpc_core::ReclamationPass::kBenign,
3010         [t](absl::optional<grpc_core::ReclamationSweep> sweep) {
3011           if (sweep.has_value()) {
3012             GRPC_CLOSURE_INIT(&t->benign_reclaimer_locked,
3013                               benign_reclaimer_locked, t,
3014                               grpc_schedule_on_exec_ctx);
3015             t->active_reclamation = std::move(*sweep);
3016             t->combiner->Run(&t->benign_reclaimer_locked, absl::OkStatus());
3017           } else {
3018             GRPC_CHTTP2_UNREF_TRANSPORT(t, "benign_reclaimer");
3019           }
3020         });
3021   }
3022 }
3023 
post_destructive_reclaimer(grpc_chttp2_transport * t)3024 static void post_destructive_reclaimer(grpc_chttp2_transport* t) {
3025   if (!t->destructive_reclaimer_registered) {
3026     t->destructive_reclaimer_registered = true;
3027     GRPC_CHTTP2_REF_TRANSPORT(t, "destructive_reclaimer");
3028     t->memory_owner.PostReclaimer(
3029         grpc_core::ReclamationPass::kDestructive,
3030         [t](absl::optional<grpc_core::ReclamationSweep> sweep) {
3031           if (sweep.has_value()) {
3032             GRPC_CLOSURE_INIT(&t->destructive_reclaimer_locked,
3033                               destructive_reclaimer_locked, t,
3034                               grpc_schedule_on_exec_ctx);
3035             t->active_reclamation = std::move(*sweep);
3036             t->combiner->Run(&t->destructive_reclaimer_locked,
3037                              absl::OkStatus());
3038           } else {
3039             GRPC_CHTTP2_UNREF_TRANSPORT(t, "destructive_reclaimer");
3040           }
3041         });
3042   }
3043 }
3044 
benign_reclaimer_locked(void * arg,grpc_error_handle error)3045 static void benign_reclaimer_locked(void* arg, grpc_error_handle error) {
3046   grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(arg);
3047   if (error.ok() && grpc_chttp2_stream_map_size(&t->stream_map) == 0) {
3048     // Channel with no active streams: send a goaway to try and make it
3049     // disconnect cleanly
3050     if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) {
3051       gpr_log(GPR_INFO, "HTTP2: %s - send goaway to free memory",
3052               std::string(t->peer_string.as_string_view()).c_str());
3053     }
3054     send_goaway(t,
3055                 grpc_error_set_int(GRPC_ERROR_CREATE("Buffers full"),
3056                                    grpc_core::StatusIntProperty::kHttp2Error,
3057                                    GRPC_HTTP2_ENHANCE_YOUR_CALM),
3058                 /*immediate_disconnect_hint=*/true);
3059   } else if (error.ok() && GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) {
3060     gpr_log(GPR_INFO,
3061             "HTTP2: %s - skip benign reclamation, there are still %" PRIdPTR
3062             " streams",
3063             std::string(t->peer_string.as_string_view()).c_str(),
3064             grpc_chttp2_stream_map_size(&t->stream_map));
3065   }
3066   t->benign_reclaimer_registered = false;
3067   if (error != absl::CancelledError()) {
3068     t->active_reclamation.Finish();
3069   }
3070   GRPC_CHTTP2_UNREF_TRANSPORT(t, "benign_reclaimer");
3071 }
3072 
destructive_reclaimer_locked(void * arg,grpc_error_handle error)3073 static void destructive_reclaimer_locked(void* arg, grpc_error_handle error) {
3074   grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(arg);
3075   size_t n = grpc_chttp2_stream_map_size(&t->stream_map);
3076   t->destructive_reclaimer_registered = false;
3077   if (error.ok() && n > 0) {
3078     grpc_chttp2_stream* s = static_cast<grpc_chttp2_stream*>(
3079         grpc_chttp2_stream_map_rand(&t->stream_map));
3080     if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) {
3081       gpr_log(GPR_INFO, "HTTP2: %s - abandon stream id %d",
3082               std::string(t->peer_string.as_string_view()).c_str(), s->id);
3083     }
3084     grpc_chttp2_cancel_stream(
3085         t, s,
3086         grpc_error_set_int(GRPC_ERROR_CREATE("Buffers full"),
3087                            grpc_core::StatusIntProperty::kHttp2Error,
3088                            GRPC_HTTP2_ENHANCE_YOUR_CALM));
3089     if (n > 1) {
3090       // Since we cancel one stream per destructive reclamation, if
3091       //   there are more streams left, we can immediately post a new
3092       //   reclaimer in case the resource quota needs to free more
3093       //   memory
3094       post_destructive_reclaimer(t);
3095     }
3096   }
3097   if (error != absl::CancelledError()) {
3098     t->active_reclamation.Finish();
3099   }
3100   GRPC_CHTTP2_UNREF_TRANSPORT(t, "destructive_reclaimer");
3101 }
3102 
3103 //
3104 // MONITORING
3105 //
3106 
grpc_chttp2_initiate_write_reason_string(grpc_chttp2_initiate_write_reason reason)3107 const char* grpc_chttp2_initiate_write_reason_string(
3108     grpc_chttp2_initiate_write_reason reason) {
3109   switch (reason) {
3110     case GRPC_CHTTP2_INITIATE_WRITE_INITIAL_WRITE:
3111       return "INITIAL_WRITE";
3112     case GRPC_CHTTP2_INITIATE_WRITE_START_NEW_STREAM:
3113       return "START_NEW_STREAM";
3114     case GRPC_CHTTP2_INITIATE_WRITE_SEND_MESSAGE:
3115       return "SEND_MESSAGE";
3116     case GRPC_CHTTP2_INITIATE_WRITE_SEND_INITIAL_METADATA:
3117       return "SEND_INITIAL_METADATA";
3118     case GRPC_CHTTP2_INITIATE_WRITE_SEND_TRAILING_METADATA:
3119       return "SEND_TRAILING_METADATA";
3120     case GRPC_CHTTP2_INITIATE_WRITE_RETRY_SEND_PING:
3121       return "RETRY_SEND_PING";
3122     case GRPC_CHTTP2_INITIATE_WRITE_CONTINUE_PINGS:
3123       return "CONTINUE_PINGS";
3124     case GRPC_CHTTP2_INITIATE_WRITE_GOAWAY_SENT:
3125       return "GOAWAY_SENT";
3126     case GRPC_CHTTP2_INITIATE_WRITE_RST_STREAM:
3127       return "RST_STREAM";
3128     case GRPC_CHTTP2_INITIATE_WRITE_CLOSE_FROM_API:
3129       return "CLOSE_FROM_API";
3130     case GRPC_CHTTP2_INITIATE_WRITE_STREAM_FLOW_CONTROL:
3131       return "STREAM_FLOW_CONTROL";
3132     case GRPC_CHTTP2_INITIATE_WRITE_TRANSPORT_FLOW_CONTROL:
3133       return "TRANSPORT_FLOW_CONTROL";
3134     case GRPC_CHTTP2_INITIATE_WRITE_SEND_SETTINGS:
3135       return "SEND_SETTINGS";
3136     case GRPC_CHTTP2_INITIATE_WRITE_SETTINGS_ACK:
3137       return "SETTINGS_ACK";
3138     case GRPC_CHTTP2_INITIATE_WRITE_FLOW_CONTROL_UNSTALLED_BY_SETTING:
3139       return "FLOW_CONTROL_UNSTALLED_BY_SETTING";
3140     case GRPC_CHTTP2_INITIATE_WRITE_FLOW_CONTROL_UNSTALLED_BY_UPDATE:
3141       return "FLOW_CONTROL_UNSTALLED_BY_UPDATE";
3142     case GRPC_CHTTP2_INITIATE_WRITE_APPLICATION_PING:
3143       return "APPLICATION_PING";
3144     case GRPC_CHTTP2_INITIATE_WRITE_BDP_PING:
3145       return "BDP_PING";
3146     case GRPC_CHTTP2_INITIATE_WRITE_KEEPALIVE_PING:
3147       return "KEEPALIVE_PING";
3148     case GRPC_CHTTP2_INITIATE_WRITE_TRANSPORT_FLOW_CONTROL_UNSTALLED:
3149       return "TRANSPORT_FLOW_CONTROL_UNSTALLED";
3150     case GRPC_CHTTP2_INITIATE_WRITE_PING_RESPONSE:
3151       return "PING_RESPONSE";
3152     case GRPC_CHTTP2_INITIATE_WRITE_FORCE_RST_STREAM:
3153       return "FORCE_RST_STREAM";
3154   }
3155   GPR_UNREACHABLE_CODE(return "unknown");
3156 }
3157 
chttp2_get_endpoint(grpc_transport * t)3158 static grpc_endpoint* chttp2_get_endpoint(grpc_transport* t) {
3159   return (reinterpret_cast<grpc_chttp2_transport*>(t))->ep;
3160 }
3161 
3162 static const grpc_transport_vtable vtable = {sizeof(grpc_chttp2_stream),
3163                                              false,
3164                                              "chttp2",
3165                                              init_stream,
3166                                              nullptr,
3167                                              set_pollset,
3168                                              set_pollset_set,
3169                                              perform_stream_op,
3170                                              perform_transport_op,
3171                                              destroy_stream,
3172                                              destroy_transport,
3173                                              chttp2_get_endpoint};
3174 
get_vtable(void)3175 static const grpc_transport_vtable* get_vtable(void) { return &vtable; }
3176 
3177 grpc_core::RefCountedPtr<grpc_core::channelz::SocketNode>
grpc_chttp2_transport_get_socket_node(grpc_transport * transport)3178 grpc_chttp2_transport_get_socket_node(grpc_transport* transport) {
3179   grpc_chttp2_transport* t =
3180       reinterpret_cast<grpc_chttp2_transport*>(transport);
3181   return t->channelz_socket;
3182 }
3183 
grpc_create_chttp2_transport(const grpc_core::ChannelArgs & channel_args,grpc_endpoint * ep,bool is_client)3184 grpc_transport* grpc_create_chttp2_transport(
3185     const grpc_core::ChannelArgs& channel_args, grpc_endpoint* ep,
3186     bool is_client) {
3187   auto t = new grpc_chttp2_transport(channel_args, ep, is_client);
3188   return &t->base;
3189 }
3190 
grpc_chttp2_transport_start_reading(grpc_transport * transport,grpc_slice_buffer * read_buffer,grpc_closure * notify_on_receive_settings,grpc_closure * notify_on_close)3191 void grpc_chttp2_transport_start_reading(
3192     grpc_transport* transport, grpc_slice_buffer* read_buffer,
3193     grpc_closure* notify_on_receive_settings, grpc_closure* notify_on_close) {
3194   grpc_chttp2_transport* t =
3195       reinterpret_cast<grpc_chttp2_transport*>(transport);
3196   GRPC_CHTTP2_REF_TRANSPORT(
3197       t, "reading_action");  // matches unref inside reading_action
3198   if (read_buffer != nullptr) {
3199     grpc_slice_buffer_move_into(read_buffer, &t->read_buffer);
3200     gpr_free(read_buffer);
3201   }
3202   t->combiner->Run(
3203       grpc_core::NewClosure([t, notify_on_receive_settings,
3204                              notify_on_close](grpc_error_handle) {
3205         if (!t->closed_with_error.ok()) {
3206           if (notify_on_receive_settings != nullptr) {
3207             grpc_core::ExecCtx::Run(DEBUG_LOCATION, notify_on_receive_settings,
3208                                     t->closed_with_error);
3209           }
3210           if (notify_on_close != nullptr) {
3211             grpc_core::ExecCtx::Run(DEBUG_LOCATION, notify_on_close,
3212                                     t->closed_with_error);
3213           }
3214           GRPC_CHTTP2_UNREF_TRANSPORT(t, "reading_action");
3215           return;
3216         }
3217         t->notify_on_receive_settings = notify_on_receive_settings;
3218         t->notify_on_close = notify_on_close;
3219         read_action_locked(t, absl::OkStatus());
3220       }),
3221       absl::OkStatus());
3222 }
3223