1 //
2 // Copyright 2018 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16
17 #include <grpc/support/port_platform.h>
18
19 #include "src/core/ext/transport/chttp2/transport/chttp2_transport.h"
20
21 #include <inttypes.h>
22 #include <limits.h>
23 #include <stdio.h>
24 #include <string.h>
25
26 #include <algorithm>
27 #include <initializer_list>
28 #include <memory>
29 #include <new>
30 #include <string>
31 #include <utility>
32 #include <vector>
33
34 #include "absl/base/attributes.h"
35 #include "absl/status/status.h"
36 #include "absl/strings/cord.h"
37 #include "absl/strings/str_cat.h"
38 #include "absl/strings/str_format.h"
39 #include "absl/strings/string_view.h"
40 #include "absl/types/optional.h"
41
42 #include <grpc/event_engine/event_engine.h>
43 #include <grpc/grpc.h>
44 #include <grpc/impl/connectivity_state.h>
45 #include <grpc/slice_buffer.h>
46 #include <grpc/status.h>
47 #include <grpc/support/alloc.h>
48 #include <grpc/support/log.h>
49 #include <grpc/support/time.h>
50
51 #include "src/core/ext/transport/chttp2/transport/context_list_entry.h"
52 #include "src/core/ext/transport/chttp2/transport/flow_control.h"
53 #include "src/core/ext/transport/chttp2/transport/frame.h"
54 #include "src/core/ext/transport/chttp2/transport/frame_data.h"
55 #include "src/core/ext/transport/chttp2/transport/frame_goaway.h"
56 #include "src/core/ext/transport/chttp2/transport/frame_rst_stream.h"
57 #include "src/core/ext/transport/chttp2/transport/hpack_encoder.h"
58 #include "src/core/ext/transport/chttp2/transport/http2_settings.h"
59 #include "src/core/ext/transport/chttp2/transport/http_trace.h"
60 #include "src/core/ext/transport/chttp2/transport/internal.h"
61 #include "src/core/ext/transport/chttp2/transport/stream_map.h"
62 #include "src/core/ext/transport/chttp2/transport/varint.h"
63 #include "src/core/lib/channel/call_tracer.h"
64 #include "src/core/lib/channel/channel_args.h"
65 #include "src/core/lib/channel/context.h"
66 #include "src/core/lib/debug/stats.h"
67 #include "src/core/lib/debug/stats_data.h"
68 #include "src/core/lib/experiments/experiments.h"
69 #include "src/core/lib/gpr/useful.h"
70 #include "src/core/lib/gprpp/bitset.h"
71 #include "src/core/lib/gprpp/crash.h"
72 #include "src/core/lib/gprpp/debug_location.h"
73 #include "src/core/lib/gprpp/ref_counted.h"
74 #include "src/core/lib/gprpp/status_helper.h"
75 #include "src/core/lib/gprpp/time.h"
76 #include "src/core/lib/http/parser.h"
77 #include "src/core/lib/iomgr/combiner.h"
78 #include "src/core/lib/iomgr/error.h"
79 #include "src/core/lib/iomgr/exec_ctx.h"
80 #include "src/core/lib/iomgr/iomgr_fwd.h"
81 #include "src/core/lib/iomgr/port.h"
82 #include "src/core/lib/promise/poll.h"
83 #include "src/core/lib/resource_quota/arena.h"
84 #include "src/core/lib/resource_quota/memory_quota.h"
85 #include "src/core/lib/resource_quota/resource_quota.h"
86 #include "src/core/lib/resource_quota/trace.h"
87 #include "src/core/lib/slice/slice.h"
88 #include "src/core/lib/slice/slice_buffer.h"
89 #include "src/core/lib/slice/slice_internal.h"
90 #include "src/core/lib/transport/bdp_estimator.h"
91 #include "src/core/lib/transport/connectivity_state.h"
92 #include "src/core/lib/transport/error_utils.h"
93 #include "src/core/lib/transport/http2_errors.h"
94 #include "src/core/lib/transport/metadata_batch.h"
95 #include "src/core/lib/transport/status_conversion.h"
96 #include "src/core/lib/transport/transport.h"
97 #include "src/core/lib/transport/transport_impl.h"
98
99 #ifdef GRPC_POSIX_SOCKET_TCP
100 #include "src/core/lib/iomgr/ev_posix.h"
101 #endif
102
// Size-related constants for the transport. The two header-list values are the
// defaults used when the corresponding metadata-size channel args are unset:
// the hard limit is advertised to the peer, the soft limit is kept locally.
#define DEFAULT_CONNECTION_WINDOW_TARGET (1024 * 1024)
#define MAX_WINDOW 0x7fffffffu
#define MAX_WRITE_BUFFER_SIZE (64 * 1024 * 1024)
#define DEFAULT_MAX_HEADER_LIST_SIZE (16 * 1024)
#define DEFAULT_MAX_HEADER_LIST_SIZE_SOFT_LIMIT (8 * 1024)

// Compile-time keepalive defaults, in milliseconds. INT_MAX acts as a sentinel:
// init_transport_keepalive_settings() converts it to an infinite duration.
#define DEFAULT_CLIENT_KEEPALIVE_TIME_MS INT_MAX
#define DEFAULT_CLIENT_KEEPALIVE_TIMEOUT_MS 20000  // 20 seconds
#define DEFAULT_SERVER_KEEPALIVE_TIME_MS 7200000   // 2 hours
#define DEFAULT_SERVER_KEEPALIVE_TIMEOUT_MS 20000  // 20 seconds
#define DEFAULT_KEEPALIVE_PERMIT_WITHOUT_CALLS false
#define KEEPALIVE_TIME_BACKOFF_MULTIPLIER 2

// Compile-time defaults for the ping policy applied to each transport.
#define DEFAULT_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS 300000  // 5 minutes
#define DEFAULT_MAX_PINGS_BETWEEN_DATA 2
#define DEFAULT_MAX_PING_STRIKES 2

#define DEFAULT_MAX_PENDING_INDUCED_FRAMES 10000

// Mutable, file-local copies of the keepalive defaults above. Every new
// transport seeds its keepalive fields from these values.
// NOTE(review): the code that changes them at runtime is not in this part of
// the file -- confirm where they are written before relying on thread-safety.
static int g_default_client_keepalive_time_ms =
    DEFAULT_CLIENT_KEEPALIVE_TIME_MS;
static int g_default_client_keepalive_timeout_ms =
    DEFAULT_CLIENT_KEEPALIVE_TIMEOUT_MS;
static int g_default_server_keepalive_time_ms =
    DEFAULT_SERVER_KEEPALIVE_TIME_MS;
static int g_default_server_keepalive_timeout_ms =
    DEFAULT_SERVER_KEEPALIVE_TIMEOUT_MS;
static bool g_default_client_keepalive_permit_without_calls =
    DEFAULT_KEEPALIVE_PERMIT_WITHOUT_CALLS;
static bool g_default_server_keepalive_permit_without_calls =
    DEFAULT_KEEPALIVE_PERMIT_WITHOUT_CALLS;

// Mutable, file-local copies of the ping-policy defaults; read by
// configure_transport_ping_policy() and used as fallbacks in
// read_channel_args().
static int g_default_min_recv_ping_interval_without_data_ms =
    DEFAULT_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS;
static int g_default_max_pings_without_data = DEFAULT_MAX_PINGS_BETWEEN_DATA;
static int g_default_max_ping_strikes = DEFAULT_MAX_PING_STRIKES;

#define MAX_CLIENT_STREAM_ID 0x7fffffffu
// Trace flags owned by this file; both start disabled.
grpc_core::TraceFlag grpc_keepalive_trace(false, "http_keepalive");
grpc_core::DebugOnlyTraceFlag grpc_trace_chttp2_refcount(false,
                                                         "chttp2_refcount");
144
// Forward declarations of the callbacks that closures are built around, plus
// the file-local helpers that are used before their definitions.
// NOTE(review): functions taking (void*, grpc_error_handle) have the closure
// callback shape; the "_locked" ones are presumably meant to run on the
// transport combiner, as destroy_transport_locked does -- confirm at each
// definition.

// Write path.
static void write_action_begin_locked(void* t, grpc_error_handle error);
static void write_action(void* t, grpc_error_handle error);
static void write_action_end(void* t, grpc_error_handle error);
static void write_action_end_locked(void* t, grpc_error_handle error);

// Read path.
static void read_action(void* t, grpc_error_handle error);
static void read_action_locked(void* t, grpc_error_handle error);
static void continue_read_action_locked(grpc_chttp2_transport* t);

// Set a transport level setting, and push it to our peer
static void queue_setting_update(grpc_chttp2_transport* t,
                                 grpc_chttp2_setting_id id, uint32_t value);

static void close_from_api(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
                           grpc_error_handle error);

// Start new streams that have been created if we can
static void maybe_start_some_streams(grpc_chttp2_transport* t);

static void connectivity_state_set(grpc_chttp2_transport* t,
                                   grpc_connectivity_state state,
                                   const absl::Status& status,
                                   const char* reason);

// Memory reclamation callbacks and the helpers that register them.
static void benign_reclaimer_locked(void* arg, grpc_error_handle error);
static void destructive_reclaimer_locked(void* arg, grpc_error_handle error);

static void post_benign_reclaimer(grpc_chttp2_transport* t);
static void post_destructive_reclaimer(grpc_chttp2_transport* t);

// Transport shutdown helpers.
static void close_transport_locked(grpc_chttp2_transport* t,
                                   grpc_error_handle error);
static void end_all_the_calls(grpc_chttp2_transport* t,
                              grpc_error_handle error);

// BDP ping callbacks.
static void start_bdp_ping(void* tp, grpc_error_handle error);
static void finish_bdp_ping(void* tp, grpc_error_handle error);
static void start_bdp_ping_locked(void* tp, grpc_error_handle error);
static void finish_bdp_ping_locked(void* tp, grpc_error_handle error);
static void next_bdp_ping_timer_expired(grpc_chttp2_transport* t);
static void next_bdp_ping_timer_expired_locked(
    void* tp, GRPC_UNUSED grpc_error_handle error);

// Generic ping helpers.
static void cancel_pings(grpc_chttp2_transport* t, grpc_error_handle error);
static void send_ping_locked(grpc_chttp2_transport* t,
                             grpc_closure* on_initiate, grpc_closure* on_ack);
static void retry_initiate_ping_locked(void* tp,
                                       GRPC_UNUSED grpc_error_handle error);

// keepalive-relevant functions
static void init_keepalive_ping(grpc_chttp2_transport* t);
static void init_keepalive_ping_locked(void* arg,
                                       GRPC_UNUSED grpc_error_handle error);
static void start_keepalive_ping(void* arg, grpc_error_handle error);
static void finish_keepalive_ping(void* arg, grpc_error_handle error);
static void start_keepalive_ping_locked(void* arg, grpc_error_handle error);
static void finish_keepalive_ping_locked(void* arg, grpc_error_handle error);
static void keepalive_watchdog_fired(grpc_chttp2_transport* t);
static void keepalive_watchdog_fired_locked(
    void* arg, GRPC_UNUSED grpc_error_handle error);
static void maybe_reset_keepalive_ping_timer_locked(grpc_chttp2_transport* t);
207
208 namespace {
CallTracerIfEnabled(grpc_chttp2_stream * s)209 grpc_core::CallTracerInterface* CallTracerIfEnabled(grpc_chttp2_stream* s) {
210 if (s->context == nullptr || !grpc_core::IsTraceRecordCallopsEnabled()) {
211 return nullptr;
212 }
213 return static_cast<grpc_core::CallTracerInterface*>(
214 static_cast<grpc_call_context_element*>(
215 s->context)[GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE]
216 .value);
217 }
218
219 grpc_core::WriteTimestampsCallback g_write_timestamps_callback = nullptr;
220 grpc_core::CopyContextFn g_get_copied_context_fn = nullptr;
221 } // namespace
222
223 namespace grpc_core {
224
225 namespace {
226 TestOnlyGlobalHttp2TransportInitCallback test_only_init_callback = nullptr;
227 TestOnlyGlobalHttp2TransportDestructCallback test_only_destruct_callback =
228 nullptr;
229 bool test_only_disable_transient_failure_state_notification = false;
230 } // namespace
231
TestOnlySetGlobalHttp2TransportInitCallback(TestOnlyGlobalHttp2TransportInitCallback callback)232 void TestOnlySetGlobalHttp2TransportInitCallback(
233 TestOnlyGlobalHttp2TransportInitCallback callback) {
234 test_only_init_callback = callback;
235 }
236
TestOnlySetGlobalHttp2TransportDestructCallback(TestOnlyGlobalHttp2TransportDestructCallback callback)237 void TestOnlySetGlobalHttp2TransportDestructCallback(
238 TestOnlyGlobalHttp2TransportDestructCallback callback) {
239 test_only_destruct_callback = callback;
240 }
241
TestOnlyGlobalHttp2TransportDisableTransientFailureStateNotification(bool disable)242 void TestOnlyGlobalHttp2TransportDisableTransientFailureStateNotification(
243 bool disable) {
244 test_only_disable_transient_failure_state_notification = disable;
245 }
246
GrpcHttp2SetWriteTimestampsCallback(WriteTimestampsCallback fn)247 void GrpcHttp2SetWriteTimestampsCallback(WriteTimestampsCallback fn) {
248 g_write_timestamps_callback = fn;
249 }
250
GrpcHttp2SetCopyContextFn(CopyContextFn fn)251 void GrpcHttp2SetCopyContextFn(CopyContextFn fn) {
252 g_get_copied_context_fn = fn;
253 }
254
GrpcHttp2GetWriteTimestampsCallback()255 WriteTimestampsCallback GrpcHttp2GetWriteTimestampsCallback() {
256 return g_write_timestamps_callback;
257 }
258
GrpcHttp2GetCopyContextFn()259 CopyContextFn GrpcHttp2GetCopyContextFn() { return g_get_copied_context_fn; }
260
261 // For each entry in the passed ContextList, it executes the function set using
262 // GrpcHttp2SetWriteTimestampsCallback method with each context in the list
263 // and \a ts. It also deletes/frees up the passed ContextList after this
264 // operation.
ForEachContextListEntryExecute(void * arg,Timestamps * ts,grpc_error_handle error)265 void ForEachContextListEntryExecute(void* arg, Timestamps* ts,
266 grpc_error_handle error) {
267 ContextList* context_list = reinterpret_cast<ContextList*>(arg);
268 if (!context_list) {
269 return;
270 }
271 for (auto it = context_list->begin(); it != context_list->end(); it++) {
272 ContextListEntry& entry = (*it);
273 if (ts) {
274 ts->byte_offset = static_cast<uint32_t>(entry.ByteOffsetInStream());
275 }
276 g_write_timestamps_callback(entry.TraceContext(), ts, error);
277 }
278 delete context_list;
279 }
280
281 } // namespace grpc_core
282
283 //
284 // CONSTRUCTION/DESTRUCTION/REFCOUNTING
285 //
286
~grpc_chttp2_transport()287 grpc_chttp2_transport::~grpc_chttp2_transport() {
288 size_t i;
289
290 event_engine.reset();
291
292 if (channelz_socket != nullptr) {
293 channelz_socket.reset();
294 }
295
296 grpc_endpoint_destroy(ep);
297
298 grpc_slice_buffer_destroy(&qbuf);
299
300 grpc_slice_buffer_destroy(&outbuf);
301
302 grpc_error_handle error = GRPC_ERROR_CREATE("Transport destroyed");
303 // ContextList::Execute follows semantics of a callback function and does not
304 // take a ref on error
305 if (cl != nullptr) {
306 grpc_core::ForEachContextListEntryExecute(cl, nullptr, error);
307 }
308 cl = nullptr;
309
310 grpc_slice_buffer_destroy(&read_buffer);
311 grpc_chttp2_goaway_parser_destroy(&goaway_parser);
312
313 for (i = 0; i < STREAM_LIST_COUNT; i++) {
314 GPR_ASSERT(lists[i].head == nullptr);
315 GPR_ASSERT(lists[i].tail == nullptr);
316 }
317
318 GPR_ASSERT(grpc_chttp2_stream_map_size(&stream_map) == 0);
319
320 grpc_chttp2_stream_map_destroy(&stream_map);
321
322 GRPC_COMBINER_UNREF(combiner, "chttp2_transport");
323
324 cancel_pings(this, GRPC_ERROR_CREATE("Transport destroyed"));
325
326 while (write_cb_pool) {
327 grpc_chttp2_write_cb* next = write_cb_pool->next;
328 gpr_free(write_cb_pool);
329 write_cb_pool = next;
330 }
331
332 gpr_free(ping_acks);
333 if (grpc_core::test_only_destruct_callback != nullptr) {
334 grpc_core::test_only_destruct_callback();
335 }
336 }
337
338 static const grpc_transport_vtable* get_vtable(void);
339
read_channel_args(grpc_chttp2_transport * t,const grpc_core::ChannelArgs & channel_args,bool is_client)340 static void read_channel_args(grpc_chttp2_transport* t,
341 const grpc_core::ChannelArgs& channel_args,
342 bool is_client) {
343 const int initial_sequence_number =
344 channel_args.GetInt(GRPC_ARG_HTTP2_INITIAL_SEQUENCE_NUMBER).value_or(-1);
345 if (initial_sequence_number > 0) {
346 if ((t->next_stream_id & 1) != (initial_sequence_number & 1)) {
347 gpr_log(GPR_ERROR, "%s: low bit must be %d on %s",
348 GRPC_ARG_HTTP2_INITIAL_SEQUENCE_NUMBER, t->next_stream_id & 1,
349 is_client ? "client" : "server");
350 } else {
351 t->next_stream_id = static_cast<uint32_t>(initial_sequence_number);
352 }
353 }
354
355 const int max_hpack_table_size =
356 channel_args.GetInt(GRPC_ARG_HTTP2_HPACK_TABLE_SIZE_ENCODER).value_or(-1);
357 if (max_hpack_table_size >= 0) {
358 t->hpack_compressor.SetMaxUsableSize(max_hpack_table_size);
359 }
360
361 t->ping_policy.max_pings_without_data =
362 std::max(0, channel_args.GetInt(GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA)
363 .value_or(g_default_max_pings_without_data));
364 t->ping_policy.max_ping_strikes =
365 std::max(0, channel_args.GetInt(GRPC_ARG_HTTP2_MAX_PING_STRIKES)
366 .value_or(g_default_max_ping_strikes));
367 t->ping_policy.min_recv_ping_interval_without_data =
368 std::max(grpc_core::Duration::Zero(),
369 channel_args
370 .GetDurationFromIntMillis(
371 GRPC_ARG_HTTP2_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS)
372 .value_or(grpc_core::Duration::Milliseconds(
373 g_default_min_recv_ping_interval_without_data_ms)));
374 t->write_buffer_size =
375 std::max(0, channel_args.GetInt(GRPC_ARG_HTTP2_WRITE_BUFFER_SIZE)
376 .value_or(grpc_core::chttp2::kDefaultWindow));
377 t->keepalive_time =
378 std::max(grpc_core::Duration::Milliseconds(1),
379 channel_args.GetDurationFromIntMillis(GRPC_ARG_KEEPALIVE_TIME_MS)
380 .value_or(grpc_core::Duration::Milliseconds(
381 t->is_client ? g_default_client_keepalive_time_ms
382 : g_default_server_keepalive_time_ms)));
383 t->keepalive_timeout = std::max(
384 grpc_core::Duration::Zero(),
385 channel_args.GetDurationFromIntMillis(GRPC_ARG_KEEPALIVE_TIMEOUT_MS)
386 .value_or(grpc_core::Duration::Milliseconds(
387 t->is_client ? g_default_client_keepalive_timeout_ms
388 : g_default_server_keepalive_timeout_ms)));
389 t->keepalive_permit_without_calls =
390 channel_args.GetBool(GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS)
391 .value_or(false);
392 // Only send the prefered rx frame size http2 setting if we are instructed
393 // to auto size the buffers allocated at tcp level and we also can adjust
394 // sending frame size.
395 t->enable_preferred_rx_crypto_frame_advertisement =
396 channel_args
397 .GetBool(GRPC_ARG_EXPERIMENTAL_HTTP2_PREFERRED_CRYPTO_FRAME_SIZE)
398 .value_or(false);
399
400 if (channel_args.GetBool(GRPC_ARG_ENABLE_CHANNELZ)
401 .value_or(GRPC_ENABLE_CHANNELZ_DEFAULT)) {
402 t->channelz_socket =
403 grpc_core::MakeRefCounted<grpc_core::channelz::SocketNode>(
404 std::string(grpc_endpoint_get_local_address(t->ep)),
405 std::string(t->peer_string.as_string_view()),
406 absl::StrCat(get_vtable()->name, " ",
407 t->peer_string.as_string_view()),
408 channel_args
409 .GetObjectRef<grpc_core::channelz::SocketNode::Security>());
410 }
411
412 t->ack_pings = channel_args.GetBool("grpc.http2.ack_pings").value_or(true);
413
414 const int soft_limit =
415 channel_args.GetInt(GRPC_ARG_MAX_METADATA_SIZE).value_or(-1);
416 if (soft_limit < 0) {
417 // Set soft limit to 0.8 * hard limit if this is larger than
418 // `DEFAULT_MAX_HEADER_LIST_SIZE_SOFT_LIMIT` and
419 // `GRPC_ARG_MAX_METADATA_SIZE` is not set.
420 t->max_header_list_size_soft_limit = std::max(
421 DEFAULT_MAX_HEADER_LIST_SIZE_SOFT_LIMIT,
422 static_cast<int>(
423 0.8 * channel_args.GetInt(GRPC_ARG_ABSOLUTE_MAX_METADATA_SIZE)
424 .value_or(-1)));
425 } else {
426 t->max_header_list_size_soft_limit = soft_limit;
427 }
428
429 static const struct {
430 absl::string_view channel_arg_name;
431 grpc_chttp2_setting_id setting_id;
432 int default_value;
433 int min;
434 int max;
435 bool availability[2] /* server, client */;
436 } settings_map[] = {{GRPC_ARG_MAX_CONCURRENT_STREAMS,
437 GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS,
438 -1,
439 0,
440 INT32_MAX,
441 {true, false}},
442 {GRPC_ARG_HTTP2_HPACK_TABLE_SIZE_DECODER,
443 GRPC_CHTTP2_SETTINGS_HEADER_TABLE_SIZE,
444 -1,
445 0,
446 INT32_MAX,
447 {true, true}},
448 {GRPC_ARG_ABSOLUTE_MAX_METADATA_SIZE,
449 GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE,
450 -1,
451 0,
452 INT32_MAX,
453 {true, true}},
454 {GRPC_ARG_HTTP2_MAX_FRAME_SIZE,
455 GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE,
456 -1,
457 16384,
458 16777215,
459 {true, true}},
460 {GRPC_ARG_HTTP2_ENABLE_TRUE_BINARY,
461 GRPC_CHTTP2_SETTINGS_GRPC_ALLOW_TRUE_BINARY_METADATA,
462 1,
463 0,
464 1,
465 {true, true}},
466 {GRPC_ARG_HTTP2_STREAM_LOOKAHEAD_BYTES,
467 GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE,
468 -1,
469 5,
470 INT32_MAX,
471 {true, true}}};
472
473 for (size_t i = 0; i < GPR_ARRAY_SIZE(settings_map); i++) {
474 const auto& setting = settings_map[i];
475 if (setting.availability[is_client]) {
476 const int value = channel_args.GetInt(setting.channel_arg_name)
477 .value_or(setting.default_value);
478 if (value >= 0) {
479 queue_setting_update(t, setting.setting_id,
480 grpc_core::Clamp(value, setting.min, setting.max));
481 } else if (setting.setting_id ==
482 GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE) {
483 // Set value to 1.25 * soft limit if this is larger than
484 // `DEFAULT_MAX_HEADER_LIST_SIZE` and
485 // `GRPC_ARG_ABSOLUTE_MAX_METADATA_SIZE` is not set.
486 const int soft_limit = channel_args.GetInt(GRPC_ARG_MAX_METADATA_SIZE)
487 .value_or(setting.default_value);
488 const int value = (soft_limit >= 0 && soft_limit < (INT_MAX / 1.25))
489 ? static_cast<int>(soft_limit * 1.25)
490 : soft_limit;
491 if (value > DEFAULT_MAX_HEADER_LIST_SIZE) {
492 queue_setting_update(
493 t, setting.setting_id,
494 grpc_core::Clamp(value, setting.min, setting.max));
495 }
496 }
497 } else if (channel_args.Contains(setting.channel_arg_name)) {
498 gpr_log(GPR_DEBUG, "%s is not available on %s",
499 std::string(setting.channel_arg_name).c_str(),
500 is_client ? "clients" : "servers");
501 }
502 }
503
504 if (t->enable_preferred_rx_crypto_frame_advertisement) {
505 const grpc_chttp2_setting_parameters* sp =
506 &grpc_chttp2_settings_parameters
507 [GRPC_CHTTP2_SETTINGS_GRPC_PREFERRED_RECEIVE_CRYPTO_FRAME_SIZE];
508 queue_setting_update(
509 t, GRPC_CHTTP2_SETTINGS_GRPC_PREFERRED_RECEIVE_CRYPTO_FRAME_SIZE,
510 grpc_core::Clamp(INT_MAX, static_cast<int>(sp->min_value),
511 static_cast<int>(sp->max_value)));
512 }
513 }
514
init_transport_keepalive_settings(grpc_chttp2_transport * t)515 static void init_transport_keepalive_settings(grpc_chttp2_transport* t) {
516 if (t->is_client) {
517 t->keepalive_time = g_default_client_keepalive_time_ms == INT_MAX
518 ? grpc_core::Duration::Infinity()
519 : grpc_core::Duration::Milliseconds(
520 g_default_client_keepalive_time_ms);
521 t->keepalive_timeout = g_default_client_keepalive_timeout_ms == INT_MAX
522 ? grpc_core::Duration::Infinity()
523 : grpc_core::Duration::Milliseconds(
524 g_default_client_keepalive_timeout_ms);
525 t->keepalive_permit_without_calls =
526 g_default_client_keepalive_permit_without_calls;
527 } else {
528 t->keepalive_time = g_default_server_keepalive_time_ms == INT_MAX
529 ? grpc_core::Duration::Infinity()
530 : grpc_core::Duration::Milliseconds(
531 g_default_server_keepalive_time_ms);
532 t->keepalive_timeout = g_default_server_keepalive_timeout_ms == INT_MAX
533 ? grpc_core::Duration::Infinity()
534 : grpc_core::Duration::Milliseconds(
535 g_default_server_keepalive_timeout_ms);
536 t->keepalive_permit_without_calls =
537 g_default_server_keepalive_permit_without_calls;
538 }
539 }
540
configure_transport_ping_policy(grpc_chttp2_transport * t)541 static void configure_transport_ping_policy(grpc_chttp2_transport* t) {
542 t->ping_policy.max_pings_without_data = g_default_max_pings_without_data;
543 t->ping_policy.max_ping_strikes = g_default_max_ping_strikes;
544 t->ping_policy.min_recv_ping_interval_without_data =
545 grpc_core::Duration::Milliseconds(
546 g_default_min_recv_ping_interval_without_data_ms);
547 }
548
init_keepalive_pings_if_enabled_locked(void * arg,GRPC_UNUSED grpc_error_handle error)549 static void init_keepalive_pings_if_enabled_locked(
550 void* arg, GRPC_UNUSED grpc_error_handle error) {
551 GPR_DEBUG_ASSERT(error.ok());
552 grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(arg);
553 if (t->keepalive_time != grpc_core::Duration::Infinity()) {
554 t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_WAITING;
555 GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping");
556 t->keepalive_ping_timer_handle =
557 t->event_engine->RunAfter(t->keepalive_time, [t] {
558 grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
559 grpc_core::ExecCtx exec_ctx;
560 init_keepalive_ping(t);
561 });
562 } else {
563 // Use GRPC_CHTTP2_KEEPALIVE_STATE_DISABLED to indicate there are no
564 // inflight keepalive timers
565 t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_DISABLED;
566 }
567 }
568
grpc_chttp2_transport(const grpc_core::ChannelArgs & channel_args,grpc_endpoint * ep,bool is_client)569 grpc_chttp2_transport::grpc_chttp2_transport(
570 const grpc_core::ChannelArgs& channel_args, grpc_endpoint* ep,
571 bool is_client)
572 : refs(1, GRPC_TRACE_FLAG_ENABLED(grpc_trace_chttp2_refcount)
573 ? "chttp2_refcount"
574 : nullptr),
575 ep(ep),
576 peer_string(
577 grpc_core::Slice::FromCopiedString(grpc_endpoint_get_peer(ep))),
578 memory_owner(channel_args.GetObject<grpc_core::ResourceQuota>()
579 ->memory_quota()
580 ->CreateMemoryOwner(absl::StrCat(
581 grpc_endpoint_get_peer(ep), ":client_transport"))),
582 self_reservation(
583 memory_owner.MakeReservation(sizeof(grpc_chttp2_transport))),
584 combiner(grpc_combiner_create()),
585 state_tracker(is_client ? "client_transport" : "server_transport",
586 GRPC_CHANNEL_READY),
587 is_client(is_client),
588 next_stream_id(is_client ? 1 : 2),
589 flow_control(
590 peer_string.as_string_view(),
591 channel_args.GetBool(GRPC_ARG_HTTP2_BDP_PROBE).value_or(true),
592 &memory_owner),
593 deframe_state(is_client ? GRPC_DTS_FH_0 : GRPC_DTS_CLIENT_PREFIX_0),
594 event_engine(
595 channel_args
596 .GetObjectRef<grpc_event_engine::experimental::EventEngine>()) {
597 cl = new grpc_core::ContextList();
598 GPR_ASSERT(strlen(GRPC_CHTTP2_CLIENT_CONNECT_STRING) ==
599 GRPC_CHTTP2_CLIENT_CONNECT_STRLEN);
600 base.vtable = get_vtable();
601 // 8 is a random stab in the dark as to a good initial size: it's small enough
602 // that it shouldn't waste memory for infrequently used connections, yet
603 // large enough that the exponential growth should happen nicely when it's
604 // needed.
605 // TODO(ctiller): tune this
606 grpc_chttp2_stream_map_init(&stream_map, 8);
607
608 grpc_slice_buffer_init(&read_buffer);
609 grpc_slice_buffer_init(&outbuf);
610 if (is_client) {
611 grpc_slice_buffer_add(&outbuf, grpc_slice_from_copied_string(
612 GRPC_CHTTP2_CLIENT_CONNECT_STRING));
613 }
614 grpc_slice_buffer_init(&qbuf);
615 // copy in initial settings to all setting sets
616 size_t i;
617 int j;
618 for (i = 0; i < GRPC_CHTTP2_NUM_SETTINGS; i++) {
619 for (j = 0; j < GRPC_NUM_SETTING_SETS; j++) {
620 settings[j][i] = grpc_chttp2_settings_parameters[i].default_value;
621 }
622 }
623 grpc_chttp2_goaway_parser_init(&goaway_parser);
624
625 // configure http2 the way we like it
626 if (is_client) {
627 queue_setting_update(this, GRPC_CHTTP2_SETTINGS_ENABLE_PUSH, 0);
628 queue_setting_update(this, GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS, 0);
629 }
630 queue_setting_update(this, GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE,
631 DEFAULT_MAX_HEADER_LIST_SIZE);
632 queue_setting_update(this,
633 GRPC_CHTTP2_SETTINGS_GRPC_ALLOW_TRUE_BINARY_METADATA, 1);
634
635 configure_transport_ping_policy(this);
636 init_transport_keepalive_settings(this);
637
638 read_channel_args(this, channel_args, is_client);
639
640 // No pings allowed before receiving a header or data frame.
641 ping_state.pings_before_data_required = 0;
642 ping_state.last_ping_sent_time = grpc_core::Timestamp::InfPast();
643
644 ping_recv_state.last_ping_recv_time = grpc_core::Timestamp::InfPast();
645 ping_recv_state.ping_strikes = 0;
646
647 grpc_core::ExecCtx exec_ctx;
648 combiner->Run(
649 GRPC_CLOSURE_INIT(&init_keepalive_ping_locked,
650 init_keepalive_pings_if_enabled_locked, this, nullptr),
651 absl::OkStatus());
652
653 if (flow_control.bdp_probe()) {
654 bdp_ping_blocked = true;
655 grpc_chttp2_act_on_flowctl_action(flow_control.PeriodicUpdate(), this,
656 nullptr);
657 }
658
659 grpc_chttp2_initiate_write(this, GRPC_CHTTP2_INITIATE_WRITE_INITIAL_WRITE);
660 post_benign_reclaimer(this);
661 if (grpc_core::test_only_init_callback != nullptr) {
662 grpc_core::test_only_init_callback();
663 }
664
665 #ifdef GRPC_POSIX_SOCKET_TCP
666 closure_barrier_may_cover_write =
667 grpc_event_engine_run_in_background() &&
668 grpc_core::IsScheduleCancellationOverWriteEnabled()
669 ? 0
670 : CLOSURE_BARRIER_MAY_COVER_WRITE;
671 #endif
672 }
673
destroy_transport_locked(void * tp,grpc_error_handle)674 static void destroy_transport_locked(void* tp, grpc_error_handle /*error*/) {
675 grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp);
676 t->destroying = 1;
677 close_transport_locked(
678 t, grpc_error_set_int(GRPC_ERROR_CREATE("Transport destroyed"),
679 grpc_core::StatusIntProperty::kOccurredDuringWrite,
680 t->write_state));
681 t->memory_owner.Reset();
682 // Must be the last line.
683 GRPC_CHTTP2_UNREF_TRANSPORT(t, "destroy");
684 }
685
destroy_transport(grpc_transport * gt)686 static void destroy_transport(grpc_transport* gt) {
687 grpc_chttp2_transport* t = reinterpret_cast<grpc_chttp2_transport*>(gt);
688 t->combiner->Run(GRPC_CLOSURE_CREATE(destroy_transport_locked, t, nullptr),
689 absl::OkStatus());
690 }
691
close_transport_locked(grpc_chttp2_transport * t,grpc_error_handle error)692 static void close_transport_locked(grpc_chttp2_transport* t,
693 grpc_error_handle error) {
694 end_all_the_calls(t, error);
695 cancel_pings(t, error);
696 if (t->closed_with_error.ok()) {
697 if (!grpc_error_has_clear_grpc_status(error)) {
698 error =
699 grpc_error_set_int(error, grpc_core::StatusIntProperty::kRpcStatus,
700 GRPC_STATUS_UNAVAILABLE);
701 }
702 if (t->write_state != GRPC_CHTTP2_WRITE_STATE_IDLE) {
703 if (t->close_transport_on_writes_finished.ok()) {
704 t->close_transport_on_writes_finished =
705 GRPC_ERROR_CREATE("Delayed close due to in-progress write");
706 }
707 t->close_transport_on_writes_finished =
708 grpc_error_add_child(t->close_transport_on_writes_finished, error);
709 return;
710 }
711 GPR_ASSERT(!error.ok());
712 t->closed_with_error = error;
713 connectivity_state_set(t, GRPC_CHANNEL_SHUTDOWN, absl::Status(),
714 "close_transport");
715 if (t->ping_state.delayed_ping_timer_handle.has_value()) {
716 if (t->event_engine->Cancel(*t->ping_state.delayed_ping_timer_handle)) {
717 GRPC_CHTTP2_UNREF_TRANSPORT(t, "retry_initiate_ping_locked");
718 t->ping_state.delayed_ping_timer_handle.reset();
719 }
720 }
721 if (t->next_bdp_ping_timer_handle.has_value()) {
722 if (t->event_engine->Cancel(*t->next_bdp_ping_timer_handle)) {
723 GRPC_CHTTP2_UNREF_TRANSPORT(t, "bdp_ping");
724 t->next_bdp_ping_timer_handle.reset();
725 }
726 }
727 switch (t->keepalive_state) {
728 case GRPC_CHTTP2_KEEPALIVE_STATE_WAITING:
729 if (t->keepalive_ping_timer_handle.has_value()) {
730 if (t->event_engine->Cancel(*t->keepalive_ping_timer_handle)) {
731 GRPC_CHTTP2_UNREF_TRANSPORT(t, "init keepalive ping");
732 t->keepalive_ping_timer_handle.reset();
733 }
734 }
735 break;
736 case GRPC_CHTTP2_KEEPALIVE_STATE_PINGING:
737 if (t->keepalive_ping_timer_handle.has_value()) {
738 if (t->event_engine->Cancel(*t->keepalive_ping_timer_handle)) {
739 GRPC_CHTTP2_UNREF_TRANSPORT(t, "init keepalive ping");
740 t->keepalive_ping_timer_handle.reset();
741 }
742 }
743 if (t->keepalive_watchdog_timer_handle.has_value()) {
744 if (t->event_engine->Cancel(*t->keepalive_watchdog_timer_handle)) {
745 GRPC_CHTTP2_UNREF_TRANSPORT(t, "keepalive watchdog");
746 t->keepalive_watchdog_timer_handle.reset();
747 }
748 }
749 break;
750 case GRPC_CHTTP2_KEEPALIVE_STATE_DYING:
751 case GRPC_CHTTP2_KEEPALIVE_STATE_DISABLED:
752 // keepalive timers are not set in these two states
753 break;
754 }
755
756 // flush writable stream list to avoid dangling references
757 grpc_chttp2_stream* s;
758 while (grpc_chttp2_list_pop_writable_stream(t, &s)) {
759 GRPC_CHTTP2_STREAM_UNREF(s, "chttp2_writing:close");
760 }
761 GPR_ASSERT(t->write_state == GRPC_CHTTP2_WRITE_STATE_IDLE);
762 grpc_endpoint_shutdown(t->ep, error);
763 }
764 if (t->notify_on_receive_settings != nullptr) {
765 grpc_core::ExecCtx::Run(DEBUG_LOCATION, t->notify_on_receive_settings,
766 error);
767 t->notify_on_receive_settings = nullptr;
768 }
769 if (t->notify_on_close != nullptr) {
770 grpc_core::ExecCtx::Run(DEBUG_LOCATION, t->notify_on_close, error);
771 t->notify_on_close = nullptr;
772 }
773 }
774
775 #ifndef NDEBUG
grpc_chttp2_stream_ref(grpc_chttp2_stream * s,const char * reason)776 void grpc_chttp2_stream_ref(grpc_chttp2_stream* s, const char* reason) {
777 grpc_stream_ref(s->refcount, reason);
778 }
grpc_chttp2_stream_unref(grpc_chttp2_stream * s,const char * reason)779 void grpc_chttp2_stream_unref(grpc_chttp2_stream* s, const char* reason) {
780 grpc_stream_unref(s->refcount, reason);
781 }
782 #else
grpc_chttp2_stream_ref(grpc_chttp2_stream * s)783 void grpc_chttp2_stream_ref(grpc_chttp2_stream* s) {
784 grpc_stream_ref(s->refcount);
785 }
grpc_chttp2_stream_unref(grpc_chttp2_stream * s)786 void grpc_chttp2_stream_unref(grpc_chttp2_stream* s) {
787 grpc_stream_unref(s->refcount);
788 }
789 #endif
790
Reffer(grpc_chttp2_stream * s)791 grpc_chttp2_stream::Reffer::Reffer(grpc_chttp2_stream* s) {
792 // We reserve one 'active stream' that's dropped when the stream is
793 // read-closed. The others are for Chttp2IncomingByteStreams that are
794 // actively reading
795 GRPC_CHTTP2_STREAM_REF(s, "chttp2");
796 GRPC_CHTTP2_REF_TRANSPORT(s->t, "stream");
797 }
798
grpc_chttp2_stream(grpc_chttp2_transport * t,grpc_stream_refcount * refcount,const void * server_data,grpc_core::Arena * arena)799 grpc_chttp2_stream::grpc_chttp2_stream(grpc_chttp2_transport* t,
800 grpc_stream_refcount* refcount,
801 const void* server_data,
802 grpc_core::Arena* arena)
803 : t(t),
804 refcount(refcount),
805 reffer(this),
806 initial_metadata_buffer(arena),
807 trailing_metadata_buffer(arena),
808 flow_control(&t->flow_control) {
809 if (server_data) {
810 id = static_cast<uint32_t>(reinterpret_cast<uintptr_t>(server_data));
811 if (grpc_http_trace.enabled()) {
812 gpr_log(GPR_DEBUG, "HTTP:%p/%p creating accept stream %d [from %p]", t,
813 this, id, server_data);
814 }
815 *t->accepting_stream = this;
816 grpc_chttp2_stream_map_add(&t->stream_map, id, this);
817 post_destructive_reclaimer(t);
818 }
819
820 grpc_slice_buffer_init(&frame_storage);
821 grpc_slice_buffer_init(&flow_controlled_buffer);
822 }
823
~grpc_chttp2_stream()824 grpc_chttp2_stream::~grpc_chttp2_stream() {
825 grpc_chttp2_list_remove_stalled_by_stream(t, this);
826 grpc_chttp2_list_remove_stalled_by_transport(t, this);
827
828 if (t->channelz_socket != nullptr) {
829 if ((t->is_client && eos_received) || (!t->is_client && eos_sent)) {
830 t->channelz_socket->RecordStreamSucceeded();
831 } else {
832 t->channelz_socket->RecordStreamFailed();
833 }
834 }
835
836 GPR_ASSERT((write_closed && read_closed) || id == 0);
837 if (id != 0) {
838 GPR_ASSERT(grpc_chttp2_stream_map_find(&t->stream_map, id) == nullptr);
839 }
840
841 grpc_slice_buffer_destroy(&frame_storage);
842
843 for (int i = 0; i < STREAM_LIST_COUNT; i++) {
844 if (GPR_UNLIKELY(included.is_set(i))) {
845 grpc_core::Crash(absl::StrFormat("%s stream %d still included in list %d",
846 t->is_client ? "client" : "server", id,
847 i));
848 }
849 }
850
851 GPR_ASSERT(send_initial_metadata_finished == nullptr);
852 GPR_ASSERT(send_trailing_metadata_finished == nullptr);
853 GPR_ASSERT(recv_initial_metadata_ready == nullptr);
854 GPR_ASSERT(recv_message_ready == nullptr);
855 GPR_ASSERT(recv_trailing_metadata_finished == nullptr);
856 grpc_slice_buffer_destroy(&flow_controlled_buffer);
857 GRPC_CHTTP2_UNREF_TRANSPORT(t, "stream");
858 grpc_core::ExecCtx::Run(DEBUG_LOCATION, destroy_stream_arg, absl::OkStatus());
859 }
860
init_stream(grpc_transport * gt,grpc_stream * gs,grpc_stream_refcount * refcount,const void * server_data,grpc_core::Arena * arena)861 static int init_stream(grpc_transport* gt, grpc_stream* gs,
862 grpc_stream_refcount* refcount, const void* server_data,
863 grpc_core::Arena* arena) {
864 grpc_chttp2_transport* t = reinterpret_cast<grpc_chttp2_transport*>(gt);
865 new (gs) grpc_chttp2_stream(t, refcount, server_data, arena);
866 return 0;
867 }
868
destroy_stream_locked(void * sp,grpc_error_handle)869 static void destroy_stream_locked(void* sp, grpc_error_handle /*error*/) {
870 grpc_chttp2_stream* s = static_cast<grpc_chttp2_stream*>(sp);
871 s->~grpc_chttp2_stream();
872 }
873
destroy_stream(grpc_transport * gt,grpc_stream * gs,grpc_closure * then_schedule_closure)874 static void destroy_stream(grpc_transport* gt, grpc_stream* gs,
875 grpc_closure* then_schedule_closure) {
876 grpc_chttp2_transport* t = reinterpret_cast<grpc_chttp2_transport*>(gt);
877 grpc_chttp2_stream* s = reinterpret_cast<grpc_chttp2_stream*>(gs);
878
879 s->destroy_stream_arg = then_schedule_closure;
880 t->combiner->Run(
881 GRPC_CLOSURE_INIT(&s->destroy_stream, destroy_stream_locked, s, nullptr),
882 absl::OkStatus());
883 }
884
grpc_chttp2_parsing_accept_stream(grpc_chttp2_transport * t,uint32_t id)885 grpc_chttp2_stream* grpc_chttp2_parsing_accept_stream(grpc_chttp2_transport* t,
886 uint32_t id) {
887 if (t->accept_stream_cb == nullptr) {
888 return nullptr;
889 }
890 grpc_chttp2_stream* accepting = nullptr;
891 GPR_ASSERT(t->accepting_stream == nullptr);
892 t->accepting_stream = &accepting;
893 t->accept_stream_cb(t->accept_stream_cb_user_data, &t->base,
894 reinterpret_cast<void*>(id));
895 t->accepting_stream = nullptr;
896 return accepting;
897 }
898
899 //
900 // OUTPUT PROCESSING
901 //
902
write_state_name(grpc_chttp2_write_state st)903 static const char* write_state_name(grpc_chttp2_write_state st) {
904 switch (st) {
905 case GRPC_CHTTP2_WRITE_STATE_IDLE:
906 return "IDLE";
907 case GRPC_CHTTP2_WRITE_STATE_WRITING:
908 return "WRITING";
909 case GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE:
910 return "WRITING+MORE";
911 }
912 GPR_UNREACHABLE_CODE(return "UNKNOWN");
913 }
914
set_write_state(grpc_chttp2_transport * t,grpc_chttp2_write_state st,const char * reason)915 static void set_write_state(grpc_chttp2_transport* t,
916 grpc_chttp2_write_state st, const char* reason) {
917 GRPC_CHTTP2_IF_TRACING(
918 gpr_log(GPR_INFO, "W:%p %s [%s] state %s -> %s [%s]", t,
919 t->is_client ? "CLIENT" : "SERVER",
920 std::string(t->peer_string.as_string_view()).c_str(),
921 write_state_name(t->write_state), write_state_name(st), reason));
922 t->write_state = st;
923 // If the state is being reset back to idle, it means a write was just
924 // finished. Make sure all the run_after_write closures are scheduled.
925 //
926 // This is also our chance to close the transport if the transport was marked
927 // to be closed after all writes finish (for example, if we received a go-away
928 // from peer while we had some pending writes)
929 if (st == GRPC_CHTTP2_WRITE_STATE_IDLE) {
930 grpc_core::ExecCtx::RunList(DEBUG_LOCATION, &t->run_after_write);
931 if (!t->close_transport_on_writes_finished.ok()) {
932 grpc_error_handle err = t->close_transport_on_writes_finished;
933 t->close_transport_on_writes_finished = absl::OkStatus();
934 close_transport_locked(t, err);
935 }
936 }
937 }
938
grpc_chttp2_initiate_write(grpc_chttp2_transport * t,grpc_chttp2_initiate_write_reason reason)939 void grpc_chttp2_initiate_write(grpc_chttp2_transport* t,
940 grpc_chttp2_initiate_write_reason reason) {
941 switch (t->write_state) {
942 case GRPC_CHTTP2_WRITE_STATE_IDLE:
943 set_write_state(t, GRPC_CHTTP2_WRITE_STATE_WRITING,
944 grpc_chttp2_initiate_write_reason_string(reason));
945 GRPC_CHTTP2_REF_TRANSPORT(t, "writing");
946 // Note that the 'write_action_begin_locked' closure is being scheduled
947 // on the 'finally_scheduler' of t->combiner. This means that
948 // 'write_action_begin_locked' is called only *after* all the other
949 // closures (some of which are potentially initiating more writes on the
950 // transport) are executed on the t->combiner.
951 //
952 // The reason for scheduling on finally_scheduler is to make sure we batch
953 // as many writes as possible. 'write_action_begin_locked' is the function
954 // that gathers all the relevant bytes (which are at various places in the
955 // grpc_chttp2_transport structure) and append them to 'outbuf' field in
956 // grpc_chttp2_transport thereby batching what would have been potentially
957 // multiple write operations.
958 //
959 // Also, 'write_action_begin_locked' only gathers the bytes into outbuf.
960 // It does not call the endpoint to write the bytes. That is done by the
961 // 'write_action' (which is scheduled by 'write_action_begin_locked')
962 t->combiner->FinallyRun(
963 GRPC_CLOSURE_INIT(&t->write_action_begin_locked,
964 write_action_begin_locked, t, nullptr),
965 absl::OkStatus());
966 break;
967 case GRPC_CHTTP2_WRITE_STATE_WRITING:
968 set_write_state(t, GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE,
969 grpc_chttp2_initiate_write_reason_string(reason));
970 break;
971 case GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE:
972 break;
973 }
974 }
975
grpc_chttp2_mark_stream_writable(grpc_chttp2_transport * t,grpc_chttp2_stream * s)976 void grpc_chttp2_mark_stream_writable(grpc_chttp2_transport* t,
977 grpc_chttp2_stream* s) {
978 if (t->closed_with_error.ok() && grpc_chttp2_list_add_writable_stream(t, s)) {
979 GRPC_CHTTP2_STREAM_REF(s, "chttp2_writing:become");
980 }
981 }
982
// Returns the trace description used when a write begins; `partial` selects
// the wording for a partial write.
static const char* begin_writing_desc(bool partial) {
  return partial ? "begin partial write in background"
                 : "begin write in current thread";
}
990
write_action_begin_locked(void * gt,grpc_error_handle)991 static void write_action_begin_locked(void* gt,
992 grpc_error_handle /*error_ignored*/) {
993 grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(gt);
994 GPR_ASSERT(t->write_state != GRPC_CHTTP2_WRITE_STATE_IDLE);
995 grpc_chttp2_begin_write_result r;
996 if (!t->closed_with_error.ok()) {
997 r.writing = false;
998 } else {
999 r = grpc_chttp2_begin_write(t);
1000 }
1001 if (r.writing) {
1002 set_write_state(t,
1003 r.partial ? GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE
1004 : GRPC_CHTTP2_WRITE_STATE_WRITING,
1005 begin_writing_desc(r.partial));
1006 write_action(t, absl::OkStatus());
1007 if (t->reading_paused_on_pending_induced_frames) {
1008 GPR_ASSERT(t->num_pending_induced_frames == 0);
1009 // We had paused reading, because we had many induced frames (SETTINGS
1010 // ACK, PINGS ACK and RST_STREAMS) pending in t->qbuf. Now that we have
1011 // been able to flush qbuf, we can resume reading.
1012 GRPC_CHTTP2_IF_TRACING(gpr_log(
1013 GPR_INFO,
1014 "transport %p : Resuming reading after being paused due to too "
1015 "many unwritten SETTINGS ACK, PINGS ACK and RST_STREAM frames",
1016 t));
1017 t->reading_paused_on_pending_induced_frames = false;
1018 continue_read_action_locked(t);
1019 }
1020 } else {
1021 set_write_state(t, GRPC_CHTTP2_WRITE_STATE_IDLE, "begin writing nothing");
1022 GRPC_CHTTP2_UNREF_TRANSPORT(t, "writing");
1023 }
1024 }
1025
write_action(void * gt,grpc_error_handle)1026 static void write_action(void* gt, grpc_error_handle /*error*/) {
1027 grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(gt);
1028 void* cl = t->cl;
1029 if (!t->cl->empty()) {
1030 // Transfer the ownership of the context list to the endpoint and create and
1031 // associate a new context list with the transport.
1032 // The old context list is stored in the cl local variable which is passed
1033 // to the endpoint. Its upto the endpoint to manage its lifetime.
1034 t->cl = new grpc_core::ContextList();
1035 } else {
1036 // t->cl is Empty. There is nothing to trace in this endpoint_write. set cl
1037 // to nullptr.
1038 cl = nullptr;
1039 }
1040 // Choose max_frame_size as the prefered rx crypto frame size indicated by the
1041 // peer.
1042 int max_frame_size =
1043 t->settings
1044 [GRPC_PEER_SETTINGS]
1045 [GRPC_CHTTP2_SETTINGS_GRPC_PREFERRED_RECEIVE_CRYPTO_FRAME_SIZE];
1046 // Note: max frame size is 0 if the remote peer does not support adjusting the
1047 // sending frame size.
1048 if (max_frame_size == 0) {
1049 max_frame_size = INT_MAX;
1050 }
1051 grpc_endpoint_write(
1052 t->ep, &t->outbuf,
1053 GRPC_CLOSURE_INIT(&t->write_action_end_locked, write_action_end, t,
1054 grpc_schedule_on_exec_ctx),
1055 cl, max_frame_size);
1056 }
1057
write_action_end(void * tp,grpc_error_handle error)1058 static void write_action_end(void* tp, grpc_error_handle error) {
1059 grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp);
1060 t->combiner->Run(GRPC_CLOSURE_INIT(&t->write_action_end_locked,
1061 write_action_end_locked, t, nullptr),
1062 error);
1063 }
1064
1065 // Callback from the grpc_endpoint after bytes have been written by calling
1066 // sendmsg
write_action_end_locked(void * tp,grpc_error_handle error)1067 static void write_action_end_locked(void* tp, grpc_error_handle error) {
1068 grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp);
1069
1070 bool closed = false;
1071 if (!error.ok()) {
1072 close_transport_locked(t, error);
1073 closed = true;
1074 }
1075
1076 if (t->sent_goaway_state == GRPC_CHTTP2_FINAL_GOAWAY_SEND_SCHEDULED) {
1077 t->sent_goaway_state = GRPC_CHTTP2_FINAL_GOAWAY_SENT;
1078 closed = true;
1079 if (grpc_chttp2_stream_map_size(&t->stream_map) == 0) {
1080 close_transport_locked(t, GRPC_ERROR_CREATE("goaway sent"));
1081 }
1082 }
1083
1084 switch (t->write_state) {
1085 case GRPC_CHTTP2_WRITE_STATE_IDLE:
1086 GPR_UNREACHABLE_CODE(break);
1087 case GRPC_CHTTP2_WRITE_STATE_WRITING:
1088 set_write_state(t, GRPC_CHTTP2_WRITE_STATE_IDLE, "finish writing");
1089 break;
1090 case GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE:
1091 set_write_state(t, GRPC_CHTTP2_WRITE_STATE_WRITING, "continue writing");
1092 GRPC_CHTTP2_REF_TRANSPORT(t, "writing");
1093 // If the transport is closed, we will retry writing on the endpoint
1094 // and next write may contain part of the currently serialized frames.
1095 // So, we should only call the run_after_write callbacks when the next
1096 // write finishes, or the callbacks will be invoked when the stream is
1097 // closed.
1098 if (!closed) {
1099 grpc_core::ExecCtx::RunList(DEBUG_LOCATION, &t->run_after_write);
1100 }
1101 t->combiner->FinallyRun(
1102 GRPC_CLOSURE_INIT(&t->write_action_begin_locked,
1103 write_action_begin_locked, t, nullptr),
1104 absl::OkStatus());
1105 break;
1106 }
1107
1108 grpc_chttp2_end_write(t, error);
1109 GRPC_CHTTP2_UNREF_TRANSPORT(t, "writing");
1110 }
1111
1112 // Dirties an HTTP2 setting to be sent out next time a writing path occurs.
1113 // If the change needs to occur immediately, manually initiate a write.
queue_setting_update(grpc_chttp2_transport * t,grpc_chttp2_setting_id id,uint32_t value)1114 static void queue_setting_update(grpc_chttp2_transport* t,
1115 grpc_chttp2_setting_id id, uint32_t value) {
1116 const grpc_chttp2_setting_parameters* sp =
1117 &grpc_chttp2_settings_parameters[id];
1118 uint32_t use_value = grpc_core::Clamp(value, sp->min_value, sp->max_value);
1119 if (use_value != value) {
1120 gpr_log(GPR_INFO, "Requested parameter %s clamped from %d to %d", sp->name,
1121 value, use_value);
1122 }
1123 if (use_value != t->settings[GRPC_LOCAL_SETTINGS][id]) {
1124 t->settings[GRPC_LOCAL_SETTINGS][id] = use_value;
1125 t->dirtied_local_settings = true;
1126 }
1127 }
1128
1129 // Cancel out streams that haven't yet started if we have received a GOAWAY
cancel_unstarted_streams(grpc_chttp2_transport * t,grpc_error_handle error)1130 static void cancel_unstarted_streams(grpc_chttp2_transport* t,
1131 grpc_error_handle error) {
1132 grpc_chttp2_stream* s;
1133 while (grpc_chttp2_list_pop_waiting_for_concurrency(t, &s)) {
1134 s->trailing_metadata_buffer.Set(
1135 grpc_core::GrpcStreamNetworkState(),
1136 grpc_core::GrpcStreamNetworkState::kNotSentOnWire);
1137 grpc_chttp2_cancel_stream(t, s, error);
1138 }
1139 }
1140
grpc_chttp2_add_incoming_goaway(grpc_chttp2_transport * t,uint32_t goaway_error,uint32_t last_stream_id,absl::string_view goaway_text)1141 void grpc_chttp2_add_incoming_goaway(grpc_chttp2_transport* t,
1142 uint32_t goaway_error,
1143 uint32_t last_stream_id,
1144 absl::string_view goaway_text) {
1145 t->goaway_error = grpc_error_set_str(
1146 grpc_error_set_int(
1147 grpc_error_set_int(
1148 grpc_core::StatusCreate(
1149 absl::StatusCode::kUnavailable,
1150 absl::StrFormat(
1151 "GOAWAY received; Error code: %u; Debug Text: %s",
1152 goaway_error, goaway_text),
1153 DEBUG_LOCATION, {}),
1154 grpc_core::StatusIntProperty::kHttp2Error,
1155 static_cast<intptr_t>(goaway_error)),
1156 grpc_core::StatusIntProperty::kRpcStatus, GRPC_STATUS_UNAVAILABLE),
1157 grpc_core::StatusStrProperty::kRawBytes, goaway_text);
1158
1159 GRPC_CHTTP2_IF_TRACING(
1160 gpr_log(GPR_INFO, "transport %p got goaway with last stream id %d", t,
1161 last_stream_id));
1162 // We want to log this irrespective of whether http tracing is enabled if we
1163 // received a GOAWAY with a non NO_ERROR code.
1164 if (goaway_error != GRPC_HTTP2_NO_ERROR) {
1165 gpr_log(GPR_INFO, "%s: Got goaway [%d] err=%s",
1166 std::string(t->peer_string.as_string_view()).c_str(), goaway_error,
1167 grpc_core::StatusToString(t->goaway_error).c_str());
1168 }
1169 if (t->is_client) {
1170 cancel_unstarted_streams(t, t->goaway_error);
1171 // Cancel all unseen streams
1172 grpc_chttp2_stream_map_for_each(
1173 &t->stream_map,
1174 [](void* user_data, uint32_t /* key */, void* stream) {
1175 uint32_t last_stream_id = *(static_cast<uint32_t*>(user_data));
1176 grpc_chttp2_stream* s = static_cast<grpc_chttp2_stream*>(stream);
1177 if (s->id > last_stream_id) {
1178 s->trailing_metadata_buffer.Set(
1179 grpc_core::GrpcStreamNetworkState(),
1180 grpc_core::GrpcStreamNetworkState::kNotSeenByServer);
1181 grpc_chttp2_cancel_stream(s->t, s, s->t->goaway_error);
1182 }
1183 },
1184 &last_stream_id);
1185 }
1186 absl::Status status = grpc_error_to_absl_status(t->goaway_error);
1187 // When a client receives a GOAWAY with error code ENHANCE_YOUR_CALM and debug
1188 // data equal to "too_many_pings", it should log the occurrence at a log level
1189 // that is enabled by default and double the configured KEEPALIVE_TIME used
1190 // for new connections on that channel.
1191 if (GPR_UNLIKELY(t->is_client &&
1192 goaway_error == GRPC_HTTP2_ENHANCE_YOUR_CALM &&
1193 goaway_text == "too_many_pings")) {
1194 gpr_log(GPR_ERROR,
1195 "%s: Received a GOAWAY with error code ENHANCE_YOUR_CALM and debug "
1196 "data equal to \"too_many_pings\". Current keepalive time (before "
1197 "throttling): %s",
1198 std::string(t->peer_string.as_string_view()).c_str(),
1199 t->keepalive_time.ToString().c_str());
1200 constexpr int max_keepalive_time_millis =
1201 INT_MAX / KEEPALIVE_TIME_BACKOFF_MULTIPLIER;
1202 int64_t throttled_keepalive_time =
1203 t->keepalive_time.millis() > max_keepalive_time_millis
1204 ? INT_MAX
1205 : t->keepalive_time.millis() * KEEPALIVE_TIME_BACKOFF_MULTIPLIER;
1206 status.SetPayload(grpc_core::kKeepaliveThrottlingKey,
1207 absl::Cord(std::to_string(throttled_keepalive_time)));
1208 }
1209 // lie: use transient failure from the transport to indicate goaway has been
1210 // received.
1211 if (!grpc_core::test_only_disable_transient_failure_state_notification) {
1212 connectivity_state_set(t, GRPC_CHANNEL_TRANSIENT_FAILURE, status,
1213 "got_goaway");
1214 }
1215 }
1216
maybe_start_some_streams(grpc_chttp2_transport * t)1217 static void maybe_start_some_streams(grpc_chttp2_transport* t) {
1218 grpc_chttp2_stream* s;
1219 // maybe cancel out streams that haven't yet started if we have received a
1220 // GOAWAY
1221 if (!t->goaway_error.ok()) {
1222 cancel_unstarted_streams(t, t->goaway_error);
1223 return;
1224 }
1225 // start streams where we have free grpc_chttp2_stream ids and free
1226 // * concurrency
1227 while (t->next_stream_id <= MAX_CLIENT_STREAM_ID &&
1228 grpc_chttp2_stream_map_size(&t->stream_map) <
1229 t->settings[GRPC_PEER_SETTINGS]
1230 [GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS] &&
1231 grpc_chttp2_list_pop_waiting_for_concurrency(t, &s)) {
1232 // safe since we can't (legally) be parsing this stream yet
1233 GRPC_CHTTP2_IF_TRACING(gpr_log(
1234 GPR_INFO,
1235 "HTTP:%s: Transport %p allocating new grpc_chttp2_stream %p to id %d",
1236 t->is_client ? "CLI" : "SVR", t, s, t->next_stream_id));
1237
1238 GPR_ASSERT(s->id == 0);
1239 s->id = t->next_stream_id;
1240 t->next_stream_id += 2;
1241
1242 if (t->next_stream_id >= MAX_CLIENT_STREAM_ID) {
1243 connectivity_state_set(t, GRPC_CHANNEL_TRANSIENT_FAILURE,
1244 absl::Status(absl::StatusCode::kUnavailable,
1245 "Transport Stream IDs exhausted"),
1246 "no_more_stream_ids");
1247 }
1248
1249 grpc_chttp2_stream_map_add(&t->stream_map, s->id, s);
1250 post_destructive_reclaimer(t);
1251 grpc_chttp2_mark_stream_writable(t, s);
1252 grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_START_NEW_STREAM);
1253 }
1254 // cancel out streams that will never be started
1255 if (t->next_stream_id >= MAX_CLIENT_STREAM_ID) {
1256 while (grpc_chttp2_list_pop_waiting_for_concurrency(t, &s)) {
1257 s->trailing_metadata_buffer.Set(
1258 grpc_core::GrpcStreamNetworkState(),
1259 grpc_core::GrpcStreamNetworkState::kNotSentOnWire);
1260 grpc_chttp2_cancel_stream(
1261 t, s,
1262 grpc_error_set_int(GRPC_ERROR_CREATE("Stream IDs exhausted"),
1263 grpc_core::StatusIntProperty::kRpcStatus,
1264 GRPC_STATUS_UNAVAILABLE));
1265 }
1266 }
1267 }
1268
add_closure_barrier(grpc_closure * closure)1269 static grpc_closure* add_closure_barrier(grpc_closure* closure) {
1270 closure->next_data.scratch += CLOSURE_BARRIER_FIRST_REF_BIT;
1271 return closure;
1272 }
1273
null_then_sched_closure(grpc_closure ** closure)1274 static void null_then_sched_closure(grpc_closure** closure) {
1275 grpc_closure* c = *closure;
1276 *closure = nullptr;
1277 // null_then_schedule_closure might be run during a start_batch which might
1278 // subsequently examine the batch for more operations contained within.
1279 // However, the closure run might make it back to the call object, push a
1280 // completion, have the application see it, and make a new operation on the
1281 // call which recycles the batch BEFORE the call to start_batch completes,
1282 // forcing a race.
1283 grpc_core::ExecCtx::Run(DEBUG_LOCATION, c, absl::OkStatus());
1284 }
1285
grpc_chttp2_complete_closure_step(grpc_chttp2_transport * t,grpc_chttp2_stream * s,grpc_closure ** pclosure,grpc_error_handle error,const char * desc,grpc_core::DebugLocation whence)1286 void grpc_chttp2_complete_closure_step(grpc_chttp2_transport* t,
1287 grpc_chttp2_stream* s,
1288 grpc_closure** pclosure,
1289 grpc_error_handle error,
1290 const char* desc,
1291 grpc_core::DebugLocation whence) {
1292 grpc_closure* closure = *pclosure;
1293 *pclosure = nullptr;
1294 if (closure == nullptr) {
1295 return;
1296 }
1297 closure->next_data.scratch -= CLOSURE_BARRIER_FIRST_REF_BIT;
1298 if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace)) {
1299 gpr_log(
1300 GPR_INFO,
1301 "complete_closure_step: t=%p %p refs=%d flags=0x%04x desc=%s err=%s "
1302 "write_state=%s whence=%s:%d",
1303 t, closure,
1304 static_cast<int>(closure->next_data.scratch /
1305 CLOSURE_BARRIER_FIRST_REF_BIT),
1306 static_cast<int>(closure->next_data.scratch %
1307 CLOSURE_BARRIER_FIRST_REF_BIT),
1308 desc, grpc_core::StatusToString(error).c_str(),
1309 write_state_name(t->write_state), whence.file(), whence.line());
1310 }
1311
1312 auto* tracer = CallTracerIfEnabled(s);
1313 if (tracer != nullptr) {
1314 tracer->RecordAnnotation(
1315 absl::StrFormat("on_complete: s=%p %p desc=%s err=%s", s, closure, desc,
1316 grpc_core::StatusToString(error).c_str()));
1317 }
1318
1319 if (!error.ok()) {
1320 grpc_error_handle cl_err =
1321 grpc_core::internal::StatusMoveFromHeapPtr(closure->error_data.error);
1322 if (cl_err.ok()) {
1323 cl_err = GRPC_ERROR_CREATE(absl::StrCat(
1324 "Error in HTTP transport completing operation: ", desc,
1325 " write_state=", write_state_name(t->write_state), " refs=",
1326 closure->next_data.scratch / CLOSURE_BARRIER_FIRST_REF_BIT, " flags=",
1327 closure->next_data.scratch % CLOSURE_BARRIER_FIRST_REF_BIT));
1328 cl_err = grpc_error_set_str(cl_err,
1329 grpc_core::StatusStrProperty::kTargetAddress,
1330 std::string(t->peer_string.as_string_view()));
1331 }
1332 cl_err = grpc_error_add_child(cl_err, error);
1333 closure->error_data.error = grpc_core::internal::StatusAllocHeapPtr(cl_err);
1334 }
1335 if (closure->next_data.scratch < CLOSURE_BARRIER_FIRST_REF_BIT) {
1336 if ((t->write_state == GRPC_CHTTP2_WRITE_STATE_IDLE) ||
1337 !(closure->next_data.scratch & CLOSURE_BARRIER_MAY_COVER_WRITE)) {
1338 // Using GRPC_CLOSURE_SCHED instead of GRPC_CLOSURE_RUN to avoid running
1339 // closures earlier than when it is safe to do so.
1340 grpc_error_handle run_error =
1341 grpc_core::internal::StatusMoveFromHeapPtr(closure->error_data.error);
1342 closure->error_data.error = 0;
1343 grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure, run_error);
1344 } else {
1345 grpc_closure_list_append(&t->run_after_write, closure);
1346 }
1347 }
1348 }
1349
contains_non_ok_status(grpc_metadata_batch * batch)1350 static bool contains_non_ok_status(grpc_metadata_batch* batch) {
1351 return batch->get(grpc_core::GrpcStatusMetadata()).value_or(GRPC_STATUS_OK) !=
1352 GRPC_STATUS_OK;
1353 }
1354
log_metadata(const grpc_metadata_batch * md_batch,uint32_t id,bool is_client,bool is_initial)1355 static void log_metadata(const grpc_metadata_batch* md_batch, uint32_t id,
1356 bool is_client, bool is_initial) {
1357 gpr_log(GPR_INFO, "--metadata--");
1358 const std::string prefix = absl::StrCat(
1359 "HTTP:", id, is_initial ? ":HDR" : ":TRL", is_client ? ":CLI:" : ":SVR:");
1360 md_batch->Log([&prefix](absl::string_view key, absl::string_view value) {
1361 gpr_log(GPR_INFO, "%s", absl::StrCat(prefix, key, ": ", value).c_str());
1362 });
1363 }
1364
perform_stream_op_locked(void * stream_op,grpc_error_handle)1365 static void perform_stream_op_locked(void* stream_op,
1366 grpc_error_handle /*error_ignored*/) {
1367 grpc_transport_stream_op_batch* op =
1368 static_cast<grpc_transport_stream_op_batch*>(stream_op);
1369 grpc_chttp2_stream* s =
1370 static_cast<grpc_chttp2_stream*>(op->handler_private.extra_arg);
1371 grpc_transport_stream_op_batch_payload* op_payload = op->payload;
1372 grpc_chttp2_transport* t = s->t;
1373
1374 s->context = op->payload->context;
1375 s->traced = op->is_traced;
1376 if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace)) {
1377 gpr_log(GPR_INFO,
1378 "perform_stream_op_locked[s=%p; op=%p]: %s; on_complete = %p", s,
1379 op, grpc_transport_stream_op_batch_string(op, false).c_str(),
1380 op->on_complete);
1381 if (op->send_initial_metadata) {
1382 log_metadata(op_payload->send_initial_metadata.send_initial_metadata,
1383 s->id, t->is_client, true);
1384 }
1385 if (op->send_trailing_metadata) {
1386 log_metadata(op_payload->send_trailing_metadata.send_trailing_metadata,
1387 s->id, t->is_client, false);
1388 }
1389 }
1390
1391 auto* tracer = CallTracerIfEnabled(s);
1392 if (tracer != nullptr) {
1393 tracer->RecordAnnotation(absl::StrFormat(
1394 "perform_stream_op_locked[s=%p; op=%p]: %s; on_complete = %p", s, op,
1395 grpc_transport_stream_op_batch_string(op, true).c_str(),
1396 op->on_complete));
1397 }
1398
1399 grpc_closure* on_complete = op->on_complete;
1400 // on_complete will be null if and only if there are no send ops in the batch.
1401 if (on_complete != nullptr) {
1402 // This batch has send ops. Use final_data as a barrier until enqueue time;
1403 // the initial counter is dropped at the end of this function.
1404 on_complete->next_data.scratch = CLOSURE_BARRIER_FIRST_REF_BIT;
1405 on_complete->error_data.error = 0;
1406 }
1407
1408 if (op->cancel_stream) {
1409 grpc_chttp2_cancel_stream(t, s, op_payload->cancel_stream.cancel_error);
1410 }
1411
1412 if (op->send_initial_metadata) {
1413 if (t->is_client && t->channelz_socket != nullptr) {
1414 t->channelz_socket->RecordStreamStartedFromLocal();
1415 }
1416 GPR_ASSERT(s->send_initial_metadata_finished == nullptr);
1417 on_complete->next_data.scratch |= t->closure_barrier_may_cover_write;
1418
1419 s->send_initial_metadata_finished = add_closure_barrier(on_complete);
1420 s->send_initial_metadata =
1421 op_payload->send_initial_metadata.send_initial_metadata;
1422 if (t->is_client) {
1423 s->deadline = std::min(
1424 s->deadline,
1425 s->send_initial_metadata->get(grpc_core::GrpcTimeoutMetadata())
1426 .value_or(grpc_core::Timestamp::InfFuture()));
1427 }
1428 if (contains_non_ok_status(s->send_initial_metadata)) {
1429 s->seen_error = true;
1430 }
1431 if (!s->write_closed) {
1432 if (t->is_client) {
1433 if (t->closed_with_error.ok()) {
1434 GPR_ASSERT(s->id == 0);
1435 grpc_chttp2_list_add_waiting_for_concurrency(t, s);
1436 maybe_start_some_streams(t);
1437 } else {
1438 s->trailing_metadata_buffer.Set(
1439 grpc_core::GrpcStreamNetworkState(),
1440 grpc_core::GrpcStreamNetworkState::kNotSentOnWire);
1441 grpc_chttp2_cancel_stream(
1442 t, s,
1443 grpc_error_set_int(
1444 GRPC_ERROR_CREATE_REFERENCING("Transport closed",
1445 &t->closed_with_error, 1),
1446 grpc_core::StatusIntProperty::kRpcStatus,
1447 GRPC_STATUS_UNAVAILABLE));
1448 }
1449 } else {
1450 GPR_ASSERT(s->id != 0);
1451 grpc_chttp2_mark_stream_writable(t, s);
1452 if (!(op->send_message &&
1453 (op->payload->send_message.flags & GRPC_WRITE_BUFFER_HINT))) {
1454 grpc_chttp2_initiate_write(
1455 t, GRPC_CHTTP2_INITIATE_WRITE_SEND_INITIAL_METADATA);
1456 }
1457 }
1458 } else {
1459 s->send_initial_metadata = nullptr;
1460 grpc_chttp2_complete_closure_step(
1461 t, s, &s->send_initial_metadata_finished,
1462 GRPC_ERROR_CREATE_REFERENCING(
1463 "Attempt to send initial metadata after stream was closed",
1464 &s->write_closed_error, 1),
1465 "send_initial_metadata_finished");
1466 }
1467 }
1468
1469 if (op->send_message) {
1470 t->num_messages_in_next_write++;
1471 grpc_core::global_stats().IncrementHttp2SendMessageSize(
1472 op->payload->send_message.send_message->Length());
1473 on_complete->next_data.scratch |= t->closure_barrier_may_cover_write;
1474 s->send_message_finished = add_closure_barrier(op->on_complete);
1475 const uint32_t flags = op_payload->send_message.flags;
1476 if (s->write_closed) {
1477 op->payload->send_message.stream_write_closed = true;
1478 // We should NOT return an error here, so as to avoid a cancel OP being
1479 // started. The surface layer will notice that the stream has been closed
1480 // for writes and fail the send message op.
1481 grpc_chttp2_complete_closure_step(t, s, &s->send_message_finished,
1482 absl::OkStatus(),
1483 "fetching_send_message_finished");
1484 } else {
1485 uint8_t* frame_hdr = grpc_slice_buffer_tiny_add(
1486 &s->flow_controlled_buffer, GRPC_HEADER_SIZE_IN_BYTES);
1487 frame_hdr[0] = (flags & GRPC_WRITE_INTERNAL_COMPRESS) != 0;
1488 size_t len = op_payload->send_message.send_message->Length();
1489 frame_hdr[1] = static_cast<uint8_t>(len >> 24);
1490 frame_hdr[2] = static_cast<uint8_t>(len >> 16);
1491 frame_hdr[3] = static_cast<uint8_t>(len >> 8);
1492 frame_hdr[4] = static_cast<uint8_t>(len);
1493
1494 s->next_message_end_offset =
1495 s->flow_controlled_bytes_written +
1496 static_cast<int64_t>(s->flow_controlled_buffer.length) +
1497 static_cast<int64_t>(len);
1498 if (flags & GRPC_WRITE_BUFFER_HINT) {
1499 s->next_message_end_offset -= t->write_buffer_size;
1500 s->write_buffering = true;
1501 } else {
1502 s->write_buffering = false;
1503 }
1504
1505 grpc_slice* const slices =
1506 op_payload->send_message.send_message->c_slice_buffer()->slices;
1507 grpc_slice* const end =
1508 slices + op_payload->send_message.send_message->Count();
1509 for (grpc_slice* slice = slices; slice != end; slice++) {
1510 grpc_slice_buffer_add(&s->flow_controlled_buffer,
1511 grpc_core::CSliceRef(*slice));
1512 }
1513
1514 int64_t notify_offset = s->next_message_end_offset;
1515 if (notify_offset <= s->flow_controlled_bytes_written) {
1516 grpc_chttp2_complete_closure_step(t, s, &s->send_message_finished,
1517 absl::OkStatus(),
1518 "fetching_send_message_finished");
1519 } else {
1520 grpc_chttp2_write_cb* cb = t->write_cb_pool;
1521 if (cb == nullptr) {
1522 cb = static_cast<grpc_chttp2_write_cb*>(gpr_malloc(sizeof(*cb)));
1523 } else {
1524 t->write_cb_pool = cb->next;
1525 }
1526 cb->call_at_byte = notify_offset;
1527 cb->closure = s->send_message_finished;
1528 s->send_message_finished = nullptr;
1529 grpc_chttp2_write_cb** list = flags & GRPC_WRITE_THROUGH
1530 ? &s->on_write_finished_cbs
1531 : &s->on_flow_controlled_cbs;
1532 cb->next = *list;
1533 *list = cb;
1534 }
1535
1536 if (s->id != 0 &&
1537 (!s->write_buffering ||
1538 s->flow_controlled_buffer.length > t->write_buffer_size)) {
1539 grpc_chttp2_mark_stream_writable(t, s);
1540 grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_SEND_MESSAGE);
1541 }
1542 }
1543 }
1544
1545 if (op->send_trailing_metadata) {
1546 GPR_ASSERT(s->send_trailing_metadata_finished == nullptr);
1547 on_complete->next_data.scratch |= t->closure_barrier_may_cover_write;
1548 s->send_trailing_metadata_finished = add_closure_barrier(on_complete);
1549 s->send_trailing_metadata =
1550 op_payload->send_trailing_metadata.send_trailing_metadata;
1551 s->sent_trailing_metadata_op = op_payload->send_trailing_metadata.sent;
1552 s->write_buffering = false;
1553 if (contains_non_ok_status(s->send_trailing_metadata)) {
1554 s->seen_error = true;
1555 }
1556 if (s->write_closed) {
1557 s->send_trailing_metadata = nullptr;
1558 s->sent_trailing_metadata_op = nullptr;
1559 grpc_chttp2_complete_closure_step(
1560 t, s, &s->send_trailing_metadata_finished,
1561 op->payload->send_trailing_metadata.send_trailing_metadata->empty()
1562 ? absl::OkStatus()
1563 : GRPC_ERROR_CREATE("Attempt to send trailing metadata after "
1564 "stream was closed"),
1565 "send_trailing_metadata_finished");
1566 } else if (s->id != 0) {
1567 // TODO(ctiller): check if there's flow control for any outstanding
1568 // bytes before going writable
1569 grpc_chttp2_mark_stream_writable(t, s);
1570 grpc_chttp2_initiate_write(
1571 t, GRPC_CHTTP2_INITIATE_WRITE_SEND_TRAILING_METADATA);
1572 }
1573 }
1574
1575 if (op->recv_initial_metadata) {
1576 GPR_ASSERT(s->recv_initial_metadata_ready == nullptr);
1577 s->recv_initial_metadata_ready =
1578 op_payload->recv_initial_metadata.recv_initial_metadata_ready;
1579 s->recv_initial_metadata =
1580 op_payload->recv_initial_metadata.recv_initial_metadata;
1581 s->trailing_metadata_available =
1582 op_payload->recv_initial_metadata.trailing_metadata_available;
1583 if (s->parsed_trailers_only && s->trailing_metadata_available != nullptr) {
1584 *s->trailing_metadata_available = true;
1585 }
1586 grpc_chttp2_maybe_complete_recv_initial_metadata(t, s);
1587 }
1588
1589 if (op->recv_message) {
1590 GPR_ASSERT(s->recv_message_ready == nullptr);
1591 s->recv_message_ready = op_payload->recv_message.recv_message_ready;
1592 s->recv_message = op_payload->recv_message.recv_message;
1593 s->recv_message->emplace();
1594 s->recv_message_flags = op_payload->recv_message.flags;
1595 s->call_failed_before_recv_message =
1596 op_payload->recv_message.call_failed_before_recv_message;
1597 grpc_chttp2_maybe_complete_recv_trailing_metadata(t, s);
1598 }
1599
1600 if (op->recv_trailing_metadata) {
1601 GPR_ASSERT(s->collecting_stats == nullptr);
1602 s->collecting_stats = op_payload->recv_trailing_metadata.collect_stats;
1603 GPR_ASSERT(s->recv_trailing_metadata_finished == nullptr);
1604 s->recv_trailing_metadata_finished =
1605 op_payload->recv_trailing_metadata.recv_trailing_metadata_ready;
1606 s->recv_trailing_metadata =
1607 op_payload->recv_trailing_metadata.recv_trailing_metadata;
1608 s->final_metadata_requested = true;
1609 grpc_chttp2_maybe_complete_recv_trailing_metadata(t, s);
1610 }
1611
1612 if (on_complete != nullptr) {
1613 grpc_chttp2_complete_closure_step(t, s, &on_complete, absl::OkStatus(),
1614 "op->on_complete");
1615 }
1616
1617 GRPC_CHTTP2_STREAM_UNREF(s, "perform_stream_op");
1618 }
1619
perform_stream_op(grpc_transport * gt,grpc_stream * gs,grpc_transport_stream_op_batch * op)1620 static void perform_stream_op(grpc_transport* gt, grpc_stream* gs,
1621 grpc_transport_stream_op_batch* op) {
1622 grpc_chttp2_transport* t = reinterpret_cast<grpc_chttp2_transport*>(gt);
1623 grpc_chttp2_stream* s = reinterpret_cast<grpc_chttp2_stream*>(gs);
1624
1625 if (!t->is_client) {
1626 if (op->send_initial_metadata) {
1627 GPR_ASSERT(!op->payload->send_initial_metadata.send_initial_metadata
1628 ->get(grpc_core::GrpcTimeoutMetadata())
1629 .has_value());
1630 }
1631 if (op->send_trailing_metadata) {
1632 GPR_ASSERT(!op->payload->send_trailing_metadata.send_trailing_metadata
1633 ->get(grpc_core::GrpcTimeoutMetadata())
1634 .has_value());
1635 }
1636 }
1637
1638 if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace)) {
1639 gpr_log(GPR_INFO, "perform_stream_op[s=%p; op=%p]: %s", s, op,
1640 grpc_transport_stream_op_batch_string(op, false).c_str());
1641 }
1642
1643 GRPC_CHTTP2_STREAM_REF(s, "perform_stream_op");
1644 op->handler_private.extra_arg = gs;
1645 t->combiner->Run(GRPC_CLOSURE_INIT(&op->handler_private.closure,
1646 perform_stream_op_locked, op, nullptr),
1647 absl::OkStatus());
1648 }
1649
cancel_pings(grpc_chttp2_transport * t,grpc_error_handle error)1650 static void cancel_pings(grpc_chttp2_transport* t, grpc_error_handle error) {
1651 // callback remaining pings: they're not allowed to call into the transport,
1652 // and maybe they hold resources that need to be freed
1653 grpc_chttp2_ping_queue* pq = &t->ping_queue;
1654 GPR_ASSERT(!error.ok());
1655 for (size_t j = 0; j < GRPC_CHTTP2_PCL_COUNT; j++) {
1656 grpc_closure_list_fail_all(&pq->lists[j], error);
1657 grpc_core::ExecCtx::RunList(DEBUG_LOCATION, &pq->lists[j]);
1658 }
1659 }
1660
send_ping_locked(grpc_chttp2_transport * t,grpc_closure * on_initiate,grpc_closure * on_ack)1661 static void send_ping_locked(grpc_chttp2_transport* t,
1662 grpc_closure* on_initiate, grpc_closure* on_ack) {
1663 if (!t->closed_with_error.ok()) {
1664 grpc_core::ExecCtx::Run(DEBUG_LOCATION, on_initiate, t->closed_with_error);
1665 grpc_core::ExecCtx::Run(DEBUG_LOCATION, on_ack, t->closed_with_error);
1666 return;
1667 }
1668 grpc_chttp2_ping_queue* pq = &t->ping_queue;
1669 grpc_closure_list_append(&pq->lists[GRPC_CHTTP2_PCL_INITIATE], on_initiate,
1670 absl::OkStatus());
1671 grpc_closure_list_append(&pq->lists[GRPC_CHTTP2_PCL_NEXT], on_ack,
1672 absl::OkStatus());
1673 }
1674
1675 // Specialized form of send_ping_locked for keepalive ping. If there is already
1676 // a ping in progress, the keepalive ping would piggyback onto that ping,
1677 // instead of waiting for that ping to complete and then starting a new ping.
send_keepalive_ping_locked(grpc_chttp2_transport * t)1678 static void send_keepalive_ping_locked(grpc_chttp2_transport* t) {
1679 if (!t->closed_with_error.ok()) {
1680 t->combiner->Run(GRPC_CLOSURE_INIT(&t->start_keepalive_ping_locked,
1681 start_keepalive_ping_locked, t, nullptr),
1682 t->closed_with_error);
1683 t->combiner->Run(
1684 GRPC_CLOSURE_INIT(&t->finish_keepalive_ping_locked,
1685 finish_keepalive_ping_locked, t, nullptr),
1686 t->closed_with_error);
1687 return;
1688 }
1689 grpc_chttp2_ping_queue* pq = &t->ping_queue;
1690 if (!grpc_closure_list_empty(pq->lists[GRPC_CHTTP2_PCL_INFLIGHT])) {
1691 // There is a ping in flight. Add yourself to the inflight closure list.
1692 t->combiner->Run(GRPC_CLOSURE_INIT(&t->start_keepalive_ping_locked,
1693 start_keepalive_ping_locked, t, nullptr),
1694 t->closed_with_error);
1695 grpc_closure_list_append(
1696 &pq->lists[GRPC_CHTTP2_PCL_INFLIGHT],
1697 GRPC_CLOSURE_INIT(&t->finish_keepalive_ping_locked,
1698 finish_keepalive_ping, t, grpc_schedule_on_exec_ctx),
1699 absl::OkStatus());
1700 return;
1701 }
1702 grpc_closure_list_append(
1703 &pq->lists[GRPC_CHTTP2_PCL_INITIATE],
1704 GRPC_CLOSURE_INIT(&t->start_keepalive_ping_locked, start_keepalive_ping,
1705 t, grpc_schedule_on_exec_ctx),
1706 absl::OkStatus());
1707 grpc_closure_list_append(
1708 &pq->lists[GRPC_CHTTP2_PCL_NEXT],
1709 GRPC_CLOSURE_INIT(&t->finish_keepalive_ping_locked, finish_keepalive_ping,
1710 t, grpc_schedule_on_exec_ctx),
1711 absl::OkStatus());
1712 }
1713
grpc_chttp2_retry_initiate_ping(grpc_chttp2_transport * t)1714 void grpc_chttp2_retry_initiate_ping(grpc_chttp2_transport* t) {
1715 t->combiner->Run(GRPC_CLOSURE_INIT(&t->retry_initiate_ping_locked,
1716 retry_initiate_ping_locked, t, nullptr),
1717 absl::OkStatus());
1718 }
1719
retry_initiate_ping_locked(void * tp,GRPC_UNUSED grpc_error_handle error)1720 static void retry_initiate_ping_locked(void* tp,
1721 GRPC_UNUSED grpc_error_handle error) {
1722 GPR_DEBUG_ASSERT(error.ok());
1723 grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp);
1724 GPR_ASSERT(t->ping_state.delayed_ping_timer_handle.has_value());
1725 t->ping_state.delayed_ping_timer_handle.reset();
1726 grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_RETRY_SEND_PING);
1727 GRPC_CHTTP2_UNREF_TRANSPORT(t, "retry_initiate_ping_locked");
1728 }
1729
grpc_chttp2_ack_ping(grpc_chttp2_transport * t,uint64_t id)1730 void grpc_chttp2_ack_ping(grpc_chttp2_transport* t, uint64_t id) {
1731 grpc_chttp2_ping_queue* pq = &t->ping_queue;
1732 if (pq->inflight_id != id) {
1733 gpr_log(GPR_DEBUG, "Unknown ping response from %s: %" PRIx64,
1734 std::string(t->peer_string.as_string_view()).c_str(), id);
1735 return;
1736 }
1737 grpc_core::ExecCtx::RunList(DEBUG_LOCATION,
1738 &pq->lists[GRPC_CHTTP2_PCL_INFLIGHT]);
1739 if (!grpc_closure_list_empty(pq->lists[GRPC_CHTTP2_PCL_NEXT])) {
1740 grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_CONTINUE_PINGS);
1741 }
1742 }
1743
1744 namespace {
1745
1746 // Fire and forget (deletes itself on completion). Does a graceful shutdown by
1747 // sending a GOAWAY frame with the last stream id set to 2^31-1, sending a ping
1748 // and waiting for an ack (effective waiting for an RTT) and then sending a
1749 // final GOAWAY frame with an updated last stream identifier. This helps ensure
1750 // that a connection can be cleanly shut down without losing requests.
1751 // In the event, that the client does not respond to the ping for some reason,
1752 // we add a 20 second deadline, after which we send the second goaway.
1753 class GracefulGoaway : public grpc_core::RefCounted<GracefulGoaway> {
1754 public:
Start(grpc_chttp2_transport * t)1755 static void Start(grpc_chttp2_transport* t) { new GracefulGoaway(t); }
1756
~GracefulGoaway()1757 ~GracefulGoaway() override {
1758 GRPC_CHTTP2_UNREF_TRANSPORT(t_, "graceful goaway");
1759 }
1760
1761 private:
1762 using TaskHandle = ::grpc_event_engine::experimental::EventEngine::TaskHandle;
1763
GracefulGoaway(grpc_chttp2_transport * t)1764 explicit GracefulGoaway(grpc_chttp2_transport* t) : t_(t) {
1765 t->sent_goaway_state = GRPC_CHTTP2_GRACEFUL_GOAWAY;
1766 GRPC_CHTTP2_REF_TRANSPORT(t_, "graceful goaway");
1767 grpc_chttp2_goaway_append((1u << 31) - 1, 0, grpc_empty_slice(), &t->qbuf);
1768 send_ping_locked(
1769 t, nullptr, GRPC_CLOSURE_INIT(&on_ping_ack_, OnPingAck, this, nullptr));
1770 grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_GOAWAY_SENT);
1771 timer_handle_ = t_->event_engine->RunAfter(
1772 grpc_core::Duration::Seconds(20),
1773 [self = Ref(DEBUG_LOCATION, "GoawayTimer")]() mutable {
1774 grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
1775 grpc_core::ExecCtx exec_ctx;
1776 // The ref will be unreffed in the combiner.
1777 auto* ptr = self.release();
1778 ptr->t_->combiner->Run(
1779 GRPC_CLOSURE_INIT(&ptr->on_timer_, OnTimerLocked, ptr, nullptr),
1780 absl::OkStatus());
1781 });
1782 }
1783
MaybeSendFinalGoawayLocked()1784 void MaybeSendFinalGoawayLocked() {
1785 if (t_->sent_goaway_state != GRPC_CHTTP2_GRACEFUL_GOAWAY) {
1786 // We already sent the final GOAWAY.
1787 return;
1788 }
1789 if (t_->destroying || !t_->closed_with_error.ok()) {
1790 GRPC_CHTTP2_IF_TRACING(
1791 gpr_log(GPR_INFO,
1792 "transport:%p %s peer:%s Transport already shutting down. "
1793 "Graceful GOAWAY abandoned.",
1794 t_, t_->is_client ? "CLIENT" : "SERVER",
1795 std::string(t_->peer_string.as_string_view()).c_str()));
1796 return;
1797 }
1798 // Ping completed. Send final goaway.
1799 GRPC_CHTTP2_IF_TRACING(
1800 gpr_log(GPR_INFO,
1801 "transport:%p %s peer:%s Graceful shutdown: Ping received. "
1802 "Sending final GOAWAY with stream_id:%d",
1803 t_, t_->is_client ? "CLIENT" : "SERVER",
1804 std::string(t_->peer_string.as_string_view()).c_str(),
1805 t_->last_new_stream_id));
1806 t_->sent_goaway_state = GRPC_CHTTP2_FINAL_GOAWAY_SEND_SCHEDULED;
1807 grpc_chttp2_goaway_append(t_->last_new_stream_id, 0, grpc_empty_slice(),
1808 &t_->qbuf);
1809 grpc_chttp2_initiate_write(t_, GRPC_CHTTP2_INITIATE_WRITE_GOAWAY_SENT);
1810 }
1811
OnPingAck(void * arg,grpc_error_handle)1812 static void OnPingAck(void* arg, grpc_error_handle /* error */) {
1813 auto* self = static_cast<GracefulGoaway*>(arg);
1814 self->t_->combiner->Run(
1815 GRPC_CLOSURE_INIT(&self->on_ping_ack_, OnPingAckLocked, self, nullptr),
1816 absl::OkStatus());
1817 }
1818
OnPingAckLocked(void * arg,grpc_error_handle)1819 static void OnPingAckLocked(void* arg, grpc_error_handle /* error */) {
1820 auto* self = static_cast<GracefulGoaway*>(arg);
1821 if (self->timer_handle_ != TaskHandle::kInvalid) {
1822 self->t_->event_engine->Cancel(
1823 std::exchange(self->timer_handle_, TaskHandle::kInvalid));
1824 }
1825 self->MaybeSendFinalGoawayLocked();
1826 self->Unref();
1827 }
1828
OnTimerLocked(void * arg,grpc_error_handle)1829 static void OnTimerLocked(void* arg, grpc_error_handle /* error */) {
1830 auto* self = static_cast<GracefulGoaway*>(arg);
1831 // Clearing the handle since the timer has fired and the handle is invalid.
1832 self->timer_handle_ = TaskHandle::kInvalid;
1833 self->MaybeSendFinalGoawayLocked();
1834 self->Unref();
1835 }
1836
1837 grpc_chttp2_transport* t_;
1838 grpc_closure on_ping_ack_;
1839 TaskHandle timer_handle_ = TaskHandle::kInvalid;
1840 grpc_closure on_timer_;
1841 };
1842
1843 } // namespace
1844
send_goaway(grpc_chttp2_transport * t,grpc_error_handle error,bool immediate_disconnect_hint)1845 static void send_goaway(grpc_chttp2_transport* t, grpc_error_handle error,
1846 bool immediate_disconnect_hint) {
1847 grpc_http2_error_code http_error;
1848 std::string message;
1849 grpc_error_get_status(error, grpc_core::Timestamp::InfFuture(), nullptr,
1850 &message, &http_error, nullptr);
1851 if (!t->is_client && http_error == GRPC_HTTP2_NO_ERROR &&
1852 !immediate_disconnect_hint) {
1853 // Do a graceful shutdown.
1854 if (t->sent_goaway_state == GRPC_CHTTP2_NO_GOAWAY_SEND) {
1855 GracefulGoaway::Start(t);
1856 } else {
1857 // Graceful GOAWAY is already in progress.
1858 }
1859 } else if (t->sent_goaway_state == GRPC_CHTTP2_NO_GOAWAY_SEND ||
1860 t->sent_goaway_state == GRPC_CHTTP2_GRACEFUL_GOAWAY) {
1861 // We want to log this irrespective of whether http tracing is enabled
1862 gpr_log(GPR_DEBUG, "%s: Sending goaway err=%s",
1863 std::string(t->peer_string.as_string_view()).c_str(),
1864 grpc_core::StatusToString(error).c_str());
1865 t->sent_goaway_state = GRPC_CHTTP2_FINAL_GOAWAY_SEND_SCHEDULED;
1866 grpc_chttp2_goaway_append(
1867 t->last_new_stream_id, static_cast<uint32_t>(http_error),
1868 grpc_slice_from_cpp_string(std::move(message)), &t->qbuf);
1869 } else {
1870 // Final GOAWAY has already been sent.
1871 }
1872 grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_GOAWAY_SENT);
1873 }
1874
grpc_chttp2_add_ping_strike(grpc_chttp2_transport * t)1875 void grpc_chttp2_add_ping_strike(grpc_chttp2_transport* t) {
1876 if (++t->ping_recv_state.ping_strikes > t->ping_policy.max_ping_strikes &&
1877 t->ping_policy.max_ping_strikes != 0) {
1878 send_goaway(t,
1879 grpc_error_set_int(GRPC_ERROR_CREATE("too_many_pings"),
1880 grpc_core::StatusIntProperty::kHttp2Error,
1881 GRPC_HTTP2_ENHANCE_YOUR_CALM),
1882 /*immediate_disconnect_hint=*/true);
1883 // The transport will be closed after the write is done
1884 close_transport_locked(
1885 t, grpc_error_set_int(GRPC_ERROR_CREATE("Too many pings"),
1886 grpc_core::StatusIntProperty::kRpcStatus,
1887 GRPC_STATUS_UNAVAILABLE));
1888 }
1889 }
1890
grpc_chttp2_reset_ping_clock(grpc_chttp2_transport * t)1891 void grpc_chttp2_reset_ping_clock(grpc_chttp2_transport* t) {
1892 if (!t->is_client) {
1893 t->ping_recv_state.last_ping_recv_time = grpc_core::Timestamp::InfPast();
1894 t->ping_recv_state.ping_strikes = 0;
1895 }
1896 t->ping_state.pings_before_data_required =
1897 t->ping_policy.max_pings_without_data;
1898 }
1899
perform_transport_op_locked(void * stream_op,grpc_error_handle)1900 static void perform_transport_op_locked(void* stream_op,
1901 grpc_error_handle /*error_ignored*/) {
1902 grpc_transport_op* op = static_cast<grpc_transport_op*>(stream_op);
1903 grpc_chttp2_transport* t =
1904 static_cast<grpc_chttp2_transport*>(op->handler_private.extra_arg);
1905
1906 if (!op->goaway_error.ok()) {
1907 send_goaway(t, op->goaway_error, /*immediate_disconnect_hint=*/false);
1908 }
1909
1910 if (op->set_accept_stream) {
1911 t->accept_stream_cb = op->set_accept_stream_fn;
1912 t->accept_stream_cb_user_data = op->set_accept_stream_user_data;
1913 }
1914
1915 if (op->bind_pollset) {
1916 grpc_endpoint_add_to_pollset(t->ep, op->bind_pollset);
1917 }
1918
1919 if (op->bind_pollset_set) {
1920 grpc_endpoint_add_to_pollset_set(t->ep, op->bind_pollset_set);
1921 }
1922
1923 if (op->send_ping.on_initiate != nullptr || op->send_ping.on_ack != nullptr) {
1924 send_ping_locked(t, op->send_ping.on_initiate, op->send_ping.on_ack);
1925 grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_APPLICATION_PING);
1926 }
1927
1928 if (op->start_connectivity_watch != nullptr) {
1929 t->state_tracker.AddWatcher(op->start_connectivity_watch_state,
1930 std::move(op->start_connectivity_watch));
1931 }
1932 if (op->stop_connectivity_watch != nullptr) {
1933 t->state_tracker.RemoveWatcher(op->stop_connectivity_watch);
1934 }
1935
1936 if (!op->disconnect_with_error.ok()) {
1937 send_goaway(t, op->disconnect_with_error,
1938 /*immediate_disconnect_hint=*/true);
1939 close_transport_locked(t, op->disconnect_with_error);
1940 }
1941
1942 grpc_core::ExecCtx::Run(DEBUG_LOCATION, op->on_consumed, absl::OkStatus());
1943
1944 GRPC_CHTTP2_UNREF_TRANSPORT(t, "transport_op");
1945 }
1946
perform_transport_op(grpc_transport * gt,grpc_transport_op * op)1947 static void perform_transport_op(grpc_transport* gt, grpc_transport_op* op) {
1948 grpc_chttp2_transport* t = reinterpret_cast<grpc_chttp2_transport*>(gt);
1949 if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace)) {
1950 gpr_log(GPR_INFO, "perform_transport_op[t=%p]: %s", t,
1951 grpc_transport_op_string(op).c_str());
1952 }
1953 op->handler_private.extra_arg = gt;
1954 GRPC_CHTTP2_REF_TRANSPORT(t, "transport_op");
1955 t->combiner->Run(GRPC_CLOSURE_INIT(&op->handler_private.closure,
1956 perform_transport_op_locked, op, nullptr),
1957 absl::OkStatus());
1958 }
1959
1960 //
1961 // INPUT PROCESSING - GENERAL
1962 //
1963
grpc_chttp2_maybe_complete_recv_initial_metadata(grpc_chttp2_transport * t,grpc_chttp2_stream * s)1964 void grpc_chttp2_maybe_complete_recv_initial_metadata(grpc_chttp2_transport* t,
1965 grpc_chttp2_stream* s) {
1966 if (s->recv_initial_metadata_ready != nullptr &&
1967 s->published_metadata[0] != GRPC_METADATA_NOT_PUBLISHED) {
1968 if (s->seen_error) {
1969 grpc_slice_buffer_reset_and_unref(&s->frame_storage);
1970 }
1971 *s->recv_initial_metadata = std::move(s->initial_metadata_buffer);
1972 s->recv_initial_metadata->Set(grpc_core::PeerString(),
1973 t->peer_string.Ref());
1974 // If we didn't receive initial metadata from the wire and instead faked a
1975 // status (due to stream cancellations for example), let upper layers know
1976 // that trailing metadata is immediately available.
1977 if (s->trailing_metadata_available != nullptr &&
1978 s->published_metadata[0] != GRPC_METADATA_PUBLISHED_FROM_WIRE &&
1979 s->published_metadata[1] == GRPC_METADATA_SYNTHESIZED_FROM_FAKE) {
1980 *s->trailing_metadata_available = true;
1981 s->trailing_metadata_available = nullptr;
1982 }
1983 null_then_sched_closure(&s->recv_initial_metadata_ready);
1984 }
1985 }
1986
grpc_chttp2_maybe_complete_recv_message(grpc_chttp2_transport * t,grpc_chttp2_stream * s)1987 void grpc_chttp2_maybe_complete_recv_message(grpc_chttp2_transport* t,
1988 grpc_chttp2_stream* s) {
1989 if (s->recv_message_ready == nullptr) return;
1990
1991 grpc_core::chttp2::StreamFlowControl::IncomingUpdateContext upd(
1992 &s->flow_control);
1993 grpc_error_handle error;
1994
1995 // Lambda is immediately invoked as a big scoped section that can be
1996 // exited out of at any point by returning.
1997 [&]() {
1998 if (grpc_http_trace.enabled()) {
1999 gpr_log(GPR_DEBUG,
2000 "maybe_complete_recv_message %p final_metadata_requested=%d "
2001 "seen_error=%d",
2002 s, s->final_metadata_requested, s->seen_error);
2003 }
2004 if (s->final_metadata_requested && s->seen_error) {
2005 grpc_slice_buffer_reset_and_unref(&s->frame_storage);
2006 s->recv_message->reset();
2007 } else {
2008 if (s->frame_storage.length != 0) {
2009 while (true) {
2010 GPR_ASSERT(s->frame_storage.length > 0);
2011 int64_t min_progress_size;
2012 auto r = grpc_deframe_unprocessed_incoming_frames(
2013 s, &min_progress_size, &**s->recv_message, s->recv_message_flags);
2014 if (grpc_http_trace.enabled()) {
2015 gpr_log(GPR_DEBUG, "Deframe data frame: %s",
2016 grpc_core::PollToString(r, [](absl::Status r) {
2017 return r.ToString();
2018 }).c_str());
2019 }
2020 if (r.pending()) {
2021 if (s->read_closed) {
2022 grpc_slice_buffer_reset_and_unref(&s->frame_storage);
2023 s->recv_message->reset();
2024 break;
2025 } else {
2026 upd.SetMinProgressSize(min_progress_size);
2027 return; // Out of lambda to enclosing function
2028 }
2029 } else {
2030 error = std::move(r.value());
2031 if (!error.ok()) {
2032 s->seen_error = true;
2033 grpc_slice_buffer_reset_and_unref(&s->frame_storage);
2034 break;
2035 } else {
2036 if (t->channelz_socket != nullptr) {
2037 t->channelz_socket->RecordMessageReceived();
2038 }
2039 break;
2040 }
2041 }
2042 }
2043 } else if (s->read_closed) {
2044 s->recv_message->reset();
2045 } else {
2046 upd.SetMinProgressSize(GRPC_HEADER_SIZE_IN_BYTES);
2047 return; // Out of lambda to enclosing function
2048 }
2049 }
2050 // save the length of the buffer before handing control back to application
2051 // threads. Needed to support correct flow control bookkeeping
2052 if (error.ok() && s->recv_message->has_value()) {
2053 null_then_sched_closure(&s->recv_message_ready);
2054 } else if (s->published_metadata[1] != GRPC_METADATA_NOT_PUBLISHED) {
2055 if (s->call_failed_before_recv_message != nullptr) {
2056 *s->call_failed_before_recv_message =
2057 (s->published_metadata[1] != GRPC_METADATA_PUBLISHED_AT_CLOSE);
2058 }
2059 null_then_sched_closure(&s->recv_message_ready);
2060 }
2061 }();
2062
2063 upd.SetPendingSize(s->frame_storage.length);
2064 grpc_chttp2_act_on_flowctl_action(upd.MakeAction(), t, s);
2065 }
2066
grpc_chttp2_maybe_complete_recv_trailing_metadata(grpc_chttp2_transport * t,grpc_chttp2_stream * s)2067 void grpc_chttp2_maybe_complete_recv_trailing_metadata(grpc_chttp2_transport* t,
2068 grpc_chttp2_stream* s) {
2069 grpc_chttp2_maybe_complete_recv_message(t, s);
2070 if (grpc_http_trace.enabled()) {
2071 gpr_log(GPR_DEBUG,
2072 "maybe_complete_recv_trailing_metadata cli=%d s=%p closure=%p "
2073 "read_closed=%d "
2074 "write_closed=%d %" PRIdPTR,
2075 t->is_client, s, s->recv_trailing_metadata_finished, s->read_closed,
2076 s->write_closed, s->frame_storage.length);
2077 }
2078 if (s->recv_trailing_metadata_finished != nullptr && s->read_closed &&
2079 s->write_closed) {
2080 if (s->seen_error || !t->is_client) {
2081 grpc_slice_buffer_reset_and_unref(&s->frame_storage);
2082 }
2083 if (s->read_closed && s->frame_storage.length == 0 &&
2084 s->recv_trailing_metadata_finished != nullptr) {
2085 grpc_transport_move_stats(&s->stats, s->collecting_stats);
2086 s->collecting_stats = nullptr;
2087 *s->recv_trailing_metadata = std::move(s->trailing_metadata_buffer);
2088 null_then_sched_closure(&s->recv_trailing_metadata_finished);
2089 }
2090 }
2091 }
2092
remove_stream(grpc_chttp2_transport * t,uint32_t id,grpc_error_handle error)2093 static void remove_stream(grpc_chttp2_transport* t, uint32_t id,
2094 grpc_error_handle error) {
2095 grpc_chttp2_stream* s = static_cast<grpc_chttp2_stream*>(
2096 grpc_chttp2_stream_map_delete(&t->stream_map, id));
2097 GPR_DEBUG_ASSERT(s);
2098 if (t->incoming_stream == s) {
2099 t->incoming_stream = nullptr;
2100 grpc_chttp2_parsing_become_skip_parser(t);
2101 }
2102
2103 if (grpc_chttp2_stream_map_size(&t->stream_map) == 0) {
2104 post_benign_reclaimer(t);
2105 if (t->sent_goaway_state == GRPC_CHTTP2_FINAL_GOAWAY_SENT) {
2106 close_transport_locked(
2107 t, GRPC_ERROR_CREATE_REFERENCING(
2108 "Last stream closed after sending GOAWAY", &error, 1));
2109 }
2110 }
2111 if (grpc_chttp2_list_remove_writable_stream(t, s)) {
2112 GRPC_CHTTP2_STREAM_UNREF(s, "chttp2_writing:remove_stream");
2113 }
2114 grpc_chttp2_list_remove_stalled_by_stream(t, s);
2115 grpc_chttp2_list_remove_stalled_by_transport(t, s);
2116
2117 maybe_start_some_streams(t);
2118 }
2119
grpc_chttp2_cancel_stream(grpc_chttp2_transport * t,grpc_chttp2_stream * s,grpc_error_handle due_to_error)2120 void grpc_chttp2_cancel_stream(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
2121 grpc_error_handle due_to_error) {
2122 if (!t->is_client && !s->sent_trailing_metadata &&
2123 grpc_error_has_clear_grpc_status(due_to_error)) {
2124 close_from_api(t, s, due_to_error);
2125 return;
2126 }
2127
2128 if (!s->read_closed || !s->write_closed) {
2129 if (s->id != 0) {
2130 grpc_http2_error_code http_error;
2131 grpc_error_get_status(due_to_error, s->deadline, nullptr, nullptr,
2132 &http_error, nullptr);
2133 grpc_chttp2_add_rst_stream_to_next_write(
2134 t, s->id, static_cast<uint32_t>(http_error), &s->stats.outgoing);
2135 grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_RST_STREAM);
2136 }
2137 }
2138 if (!due_to_error.ok() && !s->seen_error) {
2139 s->seen_error = true;
2140 }
2141 grpc_chttp2_mark_stream_closed(t, s, 1, 1, due_to_error);
2142 }
2143
grpc_chttp2_fake_status(grpc_chttp2_transport * t,grpc_chttp2_stream * s,grpc_error_handle error)2144 void grpc_chttp2_fake_status(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
2145 grpc_error_handle error) {
2146 grpc_status_code status;
2147 std::string message;
2148 grpc_error_get_status(error, s->deadline, &status, &message, nullptr,
2149 nullptr);
2150 if (status != GRPC_STATUS_OK) {
2151 s->seen_error = true;
2152 }
2153 // stream_global->recv_trailing_metadata_finished gives us a
2154 // last chance replacement: we've received trailing metadata,
2155 // but something more important has become available to signal
2156 // to the upper layers - drop what we've got, and then publish
2157 // what we want - which is safe because we haven't told anyone
2158 // about the metadata yet
2159 if (s->published_metadata[1] == GRPC_METADATA_NOT_PUBLISHED ||
2160 s->recv_trailing_metadata_finished != nullptr ||
2161 !s->final_metadata_requested) {
2162 s->trailing_metadata_buffer.Set(grpc_core::GrpcStatusMetadata(), status);
2163 if (!message.empty()) {
2164 s->trailing_metadata_buffer.Set(
2165 grpc_core::GrpcMessageMetadata(),
2166 grpc_core::Slice::FromCopiedBuffer(message));
2167 }
2168 s->published_metadata[1] = GRPC_METADATA_SYNTHESIZED_FROM_FAKE;
2169 grpc_chttp2_maybe_complete_recv_trailing_metadata(t, s);
2170 }
2171 }
2172
add_error(grpc_error_handle error,grpc_error_handle * refs,size_t * nrefs)2173 static void add_error(grpc_error_handle error, grpc_error_handle* refs,
2174 size_t* nrefs) {
2175 if (error.ok()) return;
2176 for (size_t i = 0; i < *nrefs; i++) {
2177 if (error == refs[i]) {
2178 return;
2179 }
2180 }
2181 refs[*nrefs] = error;
2182 ++*nrefs;
2183 }
2184
removal_error(grpc_error_handle extra_error,grpc_chttp2_stream * s,const char * main_error_msg)2185 static grpc_error_handle removal_error(grpc_error_handle extra_error,
2186 grpc_chttp2_stream* s,
2187 const char* main_error_msg) {
2188 grpc_error_handle refs[3];
2189 size_t nrefs = 0;
2190 add_error(s->read_closed_error, refs, &nrefs);
2191 add_error(s->write_closed_error, refs, &nrefs);
2192 add_error(extra_error, refs, &nrefs);
2193 grpc_error_handle error;
2194 if (nrefs > 0) {
2195 error = GRPC_ERROR_CREATE_REFERENCING(main_error_msg, refs, nrefs);
2196 }
2197 return error;
2198 }
2199
flush_write_list(grpc_chttp2_transport * t,grpc_chttp2_stream * s,grpc_chttp2_write_cb ** list,grpc_error_handle error)2200 static void flush_write_list(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
2201 grpc_chttp2_write_cb** list,
2202 grpc_error_handle error) {
2203 while (*list) {
2204 grpc_chttp2_write_cb* cb = *list;
2205 *list = cb->next;
2206 grpc_chttp2_complete_closure_step(t, s, &cb->closure, error,
2207 "on_write_finished_cb");
2208 cb->next = t->write_cb_pool;
2209 t->write_cb_pool = cb;
2210 }
2211 }
2212
grpc_chttp2_fail_pending_writes(grpc_chttp2_transport * t,grpc_chttp2_stream * s,grpc_error_handle error)2213 void grpc_chttp2_fail_pending_writes(grpc_chttp2_transport* t,
2214 grpc_chttp2_stream* s,
2215 grpc_error_handle error) {
2216 error =
2217 removal_error(error, s, "Pending writes failed due to stream closure");
2218 s->send_initial_metadata = nullptr;
2219 grpc_chttp2_complete_closure_step(t, s, &s->send_initial_metadata_finished,
2220 error, "send_initial_metadata_finished");
2221
2222 s->send_trailing_metadata = nullptr;
2223 s->sent_trailing_metadata_op = nullptr;
2224 grpc_chttp2_complete_closure_step(t, s, &s->send_trailing_metadata_finished,
2225 error, "send_trailing_metadata_finished");
2226
2227 grpc_chttp2_complete_closure_step(t, s, &s->send_message_finished, error,
2228 "fetching_send_message_finished");
2229 flush_write_list(t, s, &s->on_write_finished_cbs, error);
2230 flush_write_list(t, s, &s->on_flow_controlled_cbs, error);
2231 }
2232
grpc_chttp2_mark_stream_closed(grpc_chttp2_transport * t,grpc_chttp2_stream * s,int close_reads,int close_writes,grpc_error_handle error)2233 void grpc_chttp2_mark_stream_closed(grpc_chttp2_transport* t,
2234 grpc_chttp2_stream* s, int close_reads,
2235 int close_writes, grpc_error_handle error) {
2236 if (grpc_http_trace.enabled()) {
2237 gpr_log(
2238 GPR_DEBUG, "MARK_STREAM_CLOSED: t=%p s=%p(id=%d) %s [%s]", t, s, s->id,
2239 (close_reads && close_writes)
2240 ? "read+write"
2241 : (close_reads ? "read" : (close_writes ? "write" : "nothing??")),
2242 error.ToString().c_str());
2243 }
2244 if (s->read_closed && s->write_closed) {
2245 // already closed, but we should still fake the status if needed.
2246 grpc_error_handle overall_error = removal_error(error, s, "Stream removed");
2247 if (!overall_error.ok()) {
2248 grpc_chttp2_fake_status(t, s, overall_error);
2249 }
2250 grpc_chttp2_maybe_complete_recv_trailing_metadata(t, s);
2251 return;
2252 }
2253 bool closed_read = false;
2254 bool became_closed = false;
2255 if (close_reads && !s->read_closed) {
2256 s->read_closed_error = error;
2257 s->read_closed = true;
2258 closed_read = true;
2259 }
2260 if (close_writes && !s->write_closed) {
2261 s->write_closed_error = error;
2262 s->write_closed = true;
2263 grpc_chttp2_fail_pending_writes(t, s, error);
2264 }
2265 if (s->read_closed && s->write_closed) {
2266 became_closed = true;
2267 grpc_error_handle overall_error = removal_error(error, s, "Stream removed");
2268 if (s->id != 0) {
2269 remove_stream(t, s->id, overall_error);
2270 } else {
2271 // Purge streams waiting on concurrency still waiting for id assignment
2272 grpc_chttp2_list_remove_waiting_for_concurrency(t, s);
2273 }
2274 if (!overall_error.ok()) {
2275 grpc_chttp2_fake_status(t, s, overall_error);
2276 }
2277 }
2278 if (closed_read) {
2279 for (int i = 0; i < 2; i++) {
2280 if (s->published_metadata[i] == GRPC_METADATA_NOT_PUBLISHED) {
2281 s->published_metadata[i] = GRPC_METADATA_PUBLISHED_AT_CLOSE;
2282 }
2283 }
2284 grpc_chttp2_maybe_complete_recv_initial_metadata(t, s);
2285 grpc_chttp2_maybe_complete_recv_message(t, s);
2286 }
2287 if (became_closed) {
2288 s->stats.latency =
2289 gpr_time_sub(gpr_now(GPR_CLOCK_MONOTONIC), s->creation_time);
2290 grpc_chttp2_maybe_complete_recv_trailing_metadata(t, s);
2291 GRPC_CHTTP2_STREAM_UNREF(s, "chttp2");
2292 }
2293 }
2294
close_from_api(grpc_chttp2_transport * t,grpc_chttp2_stream * s,grpc_error_handle error)2295 static void close_from_api(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
2296 grpc_error_handle error) {
2297 grpc_slice hdr;
2298 grpc_slice status_hdr;
2299 grpc_slice http_status_hdr;
2300 grpc_slice content_type_hdr;
2301 grpc_slice message_pfx;
2302 uint8_t* p;
2303 uint32_t len = 0;
2304 grpc_status_code grpc_status;
2305 std::string message;
2306 grpc_error_get_status(error, s->deadline, &grpc_status, &message, nullptr,
2307 nullptr);
2308
2309 GPR_ASSERT(grpc_status >= 0 && (int)grpc_status < 100);
2310
2311 // Hand roll a header block.
2312 // This is unnecessarily ugly - at some point we should find a more
2313 // elegant solution.
2314 // It's complicated by the fact that our send machinery would be dead by
2315 // the time we got around to sending this, so instead we ignore HPACK
2316 // compression and just write the uncompressed bytes onto the wire.
2317 if (!s->sent_initial_metadata) {
2318 http_status_hdr = GRPC_SLICE_MALLOC(13);
2319 p = GRPC_SLICE_START_PTR(http_status_hdr);
2320 *p++ = 0x00;
2321 *p++ = 7;
2322 *p++ = ':';
2323 *p++ = 's';
2324 *p++ = 't';
2325 *p++ = 'a';
2326 *p++ = 't';
2327 *p++ = 'u';
2328 *p++ = 's';
2329 *p++ = 3;
2330 *p++ = '2';
2331 *p++ = '0';
2332 *p++ = '0';
2333 GPR_ASSERT(p == GRPC_SLICE_END_PTR(http_status_hdr));
2334 len += static_cast<uint32_t> GRPC_SLICE_LENGTH(http_status_hdr);
2335
2336 content_type_hdr = GRPC_SLICE_MALLOC(31);
2337 p = GRPC_SLICE_START_PTR(content_type_hdr);
2338 *p++ = 0x00;
2339 *p++ = 12;
2340 *p++ = 'c';
2341 *p++ = 'o';
2342 *p++ = 'n';
2343 *p++ = 't';
2344 *p++ = 'e';
2345 *p++ = 'n';
2346 *p++ = 't';
2347 *p++ = '-';
2348 *p++ = 't';
2349 *p++ = 'y';
2350 *p++ = 'p';
2351 *p++ = 'e';
2352 *p++ = 16;
2353 *p++ = 'a';
2354 *p++ = 'p';
2355 *p++ = 'p';
2356 *p++ = 'l';
2357 *p++ = 'i';
2358 *p++ = 'c';
2359 *p++ = 'a';
2360 *p++ = 't';
2361 *p++ = 'i';
2362 *p++ = 'o';
2363 *p++ = 'n';
2364 *p++ = '/';
2365 *p++ = 'g';
2366 *p++ = 'r';
2367 *p++ = 'p';
2368 *p++ = 'c';
2369 GPR_ASSERT(p == GRPC_SLICE_END_PTR(content_type_hdr));
2370 len += static_cast<uint32_t> GRPC_SLICE_LENGTH(content_type_hdr);
2371 }
2372
2373 status_hdr = GRPC_SLICE_MALLOC(15 + (grpc_status >= 10));
2374 p = GRPC_SLICE_START_PTR(status_hdr);
2375 *p++ = 0x00; // literal header, not indexed
2376 *p++ = 11; // len(grpc-status)
2377 *p++ = 'g';
2378 *p++ = 'r';
2379 *p++ = 'p';
2380 *p++ = 'c';
2381 *p++ = '-';
2382 *p++ = 's';
2383 *p++ = 't';
2384 *p++ = 'a';
2385 *p++ = 't';
2386 *p++ = 'u';
2387 *p++ = 's';
2388 if (grpc_status < 10) {
2389 *p++ = 1;
2390 *p++ = static_cast<uint8_t>('0' + grpc_status);
2391 } else {
2392 *p++ = 2;
2393 *p++ = static_cast<uint8_t>('0' + (grpc_status / 10));
2394 *p++ = static_cast<uint8_t>('0' + (grpc_status % 10));
2395 }
2396 GPR_ASSERT(p == GRPC_SLICE_END_PTR(status_hdr));
2397 len += static_cast<uint32_t> GRPC_SLICE_LENGTH(status_hdr);
2398
2399 size_t msg_len = message.length();
2400 GPR_ASSERT(msg_len <= UINT32_MAX);
2401 grpc_core::VarintWriter<1> msg_len_writer(static_cast<uint32_t>(msg_len));
2402 message_pfx = GRPC_SLICE_MALLOC(14 + msg_len_writer.length());
2403 p = GRPC_SLICE_START_PTR(message_pfx);
2404 *p++ = 0x00; // literal header, not indexed
2405 *p++ = 12; // len(grpc-message)
2406 *p++ = 'g';
2407 *p++ = 'r';
2408 *p++ = 'p';
2409 *p++ = 'c';
2410 *p++ = '-';
2411 *p++ = 'm';
2412 *p++ = 'e';
2413 *p++ = 's';
2414 *p++ = 's';
2415 *p++ = 'a';
2416 *p++ = 'g';
2417 *p++ = 'e';
2418 msg_len_writer.Write(0, p);
2419 p += msg_len_writer.length();
2420 GPR_ASSERT(p == GRPC_SLICE_END_PTR(message_pfx));
2421 len += static_cast<uint32_t> GRPC_SLICE_LENGTH(message_pfx);
2422 len += static_cast<uint32_t>(msg_len);
2423
2424 hdr = GRPC_SLICE_MALLOC(9);
2425 p = GRPC_SLICE_START_PTR(hdr);
2426 *p++ = static_cast<uint8_t>(len >> 16);
2427 *p++ = static_cast<uint8_t>(len >> 8);
2428 *p++ = static_cast<uint8_t>(len);
2429 *p++ = GRPC_CHTTP2_FRAME_HEADER;
2430 *p++ = GRPC_CHTTP2_DATA_FLAG_END_STREAM | GRPC_CHTTP2_DATA_FLAG_END_HEADERS;
2431 *p++ = static_cast<uint8_t>(s->id >> 24);
2432 *p++ = static_cast<uint8_t>(s->id >> 16);
2433 *p++ = static_cast<uint8_t>(s->id >> 8);
2434 *p++ = static_cast<uint8_t>(s->id);
2435 GPR_ASSERT(p == GRPC_SLICE_END_PTR(hdr));
2436
2437 grpc_slice_buffer_add(&t->qbuf, hdr);
2438 if (!s->sent_initial_metadata) {
2439 grpc_slice_buffer_add(&t->qbuf, http_status_hdr);
2440 grpc_slice_buffer_add(&t->qbuf, content_type_hdr);
2441 }
2442 grpc_slice_buffer_add(&t->qbuf, status_hdr);
2443 grpc_slice_buffer_add(&t->qbuf, message_pfx);
2444 grpc_slice_buffer_add(&t->qbuf,
2445 grpc_slice_from_cpp_string(std::move(message)));
2446 grpc_chttp2_reset_ping_clock(t);
2447 grpc_chttp2_add_rst_stream_to_next_write(t, s->id, GRPC_HTTP2_NO_ERROR,
2448 &s->stats.outgoing);
2449
2450 grpc_chttp2_mark_stream_closed(t, s, 1, 1, error);
2451 grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_CLOSE_FROM_API);
2452 }
2453
// Bundle handed to cancel_stream_cb through its void* user_data argument.
// end_all_the_calls aggregate-initialises it as {error, t}, so the member
// order matters.
struct cancel_stream_cb_args {
  // Error each visited stream is cancelled with.
  grpc_error_handle error;
  // Transport forwarded to grpc_chttp2_cancel_stream.
  grpc_chttp2_transport* t;
};
2458
cancel_stream_cb(void * user_data,uint32_t,void * stream)2459 static void cancel_stream_cb(void* user_data, uint32_t /*key*/, void* stream) {
2460 cancel_stream_cb_args* args = static_cast<cancel_stream_cb_args*>(user_data);
2461 grpc_chttp2_stream* s = static_cast<grpc_chttp2_stream*>(stream);
2462 grpc_chttp2_cancel_stream(args->t, s, args->error);
2463 }
2464
end_all_the_calls(grpc_chttp2_transport * t,grpc_error_handle error)2465 static void end_all_the_calls(grpc_chttp2_transport* t,
2466 grpc_error_handle error) {
2467 intptr_t http2_error;
2468 // If there is no explicit grpc or HTTP/2 error, set to UNAVAILABLE on server.
2469 if (!t->is_client && !grpc_error_has_clear_grpc_status(error) &&
2470 !grpc_error_get_int(error, grpc_core::StatusIntProperty::kHttp2Error,
2471 &http2_error)) {
2472 error = grpc_error_set_int(error, grpc_core::StatusIntProperty::kRpcStatus,
2473 GRPC_STATUS_UNAVAILABLE);
2474 }
2475 cancel_unstarted_streams(t, error);
2476 cancel_stream_cb_args args = {error, t};
2477 grpc_chttp2_stream_map_for_each(&t->stream_map, cancel_stream_cb, &args);
2478 }
2479
2480 //
2481 // INPUT PROCESSING - PARSING
2482 //
2483
2484 template <class F>
WithUrgency(grpc_chttp2_transport * t,grpc_core::chttp2::FlowControlAction::Urgency urgency,grpc_chttp2_initiate_write_reason reason,F action)2485 static void WithUrgency(grpc_chttp2_transport* t,
2486 grpc_core::chttp2::FlowControlAction::Urgency urgency,
2487 grpc_chttp2_initiate_write_reason reason, F action) {
2488 switch (urgency) {
2489 case grpc_core::chttp2::FlowControlAction::Urgency::NO_ACTION_NEEDED:
2490 break;
2491 case grpc_core::chttp2::FlowControlAction::Urgency::UPDATE_IMMEDIATELY:
2492 grpc_chttp2_initiate_write(t, reason);
2493 ABSL_FALLTHROUGH_INTENDED;
2494 case grpc_core::chttp2::FlowControlAction::Urgency::QUEUE_UPDATE:
2495 action();
2496 break;
2497 }
2498 }
2499
grpc_chttp2_act_on_flowctl_action(const grpc_core::chttp2::FlowControlAction & action,grpc_chttp2_transport * t,grpc_chttp2_stream * s)2500 void grpc_chttp2_act_on_flowctl_action(
2501 const grpc_core::chttp2::FlowControlAction& action,
2502 grpc_chttp2_transport* t, grpc_chttp2_stream* s) {
2503 WithUrgency(t, action.send_stream_update(),
2504 GRPC_CHTTP2_INITIATE_WRITE_STREAM_FLOW_CONTROL, [t, s]() {
2505 if (s->id != 0 && !s->read_closed) {
2506 grpc_chttp2_mark_stream_writable(t, s);
2507 }
2508 });
2509 WithUrgency(t, action.send_transport_update(),
2510 GRPC_CHTTP2_INITIATE_WRITE_TRANSPORT_FLOW_CONTROL, []() {});
2511 WithUrgency(t, action.send_initial_window_update(),
2512 GRPC_CHTTP2_INITIATE_WRITE_SEND_SETTINGS, [t, &action]() {
2513 queue_setting_update(t,
2514 GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE,
2515 action.initial_window_size());
2516 });
2517 WithUrgency(t, action.send_max_frame_size_update(),
2518 GRPC_CHTTP2_INITIATE_WRITE_SEND_SETTINGS, [t, &action]() {
2519 queue_setting_update(t, GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE,
2520 action.max_frame_size());
2521 });
2522 if (t->enable_preferred_rx_crypto_frame_advertisement) {
2523 WithUrgency(
2524 t, action.preferred_rx_crypto_frame_size_update(),
2525 GRPC_CHTTP2_INITIATE_WRITE_SEND_SETTINGS, [t, &action]() {
2526 queue_setting_update(
2527 t, GRPC_CHTTP2_SETTINGS_GRPC_PREFERRED_RECEIVE_CRYPTO_FRAME_SIZE,
2528 action.preferred_rx_crypto_frame_size());
2529 });
2530 }
2531 }
2532
try_http_parsing(grpc_chttp2_transport * t)2533 static grpc_error_handle try_http_parsing(grpc_chttp2_transport* t) {
2534 grpc_http_parser parser;
2535 size_t i = 0;
2536 grpc_error_handle error;
2537 grpc_http_response response;
2538
2539 grpc_http_parser_init(&parser, GRPC_HTTP_RESPONSE, &response);
2540
2541 grpc_error_handle parse_error;
2542 for (; i < t->read_buffer.count && parse_error.ok(); i++) {
2543 parse_error =
2544 grpc_http_parser_parse(&parser, t->read_buffer.slices[i], nullptr);
2545 }
2546 if (parse_error.ok() &&
2547 (parse_error = grpc_http_parser_eof(&parser)) == absl::OkStatus()) {
2548 error = grpc_error_set_int(
2549 grpc_error_set_int(
2550 GRPC_ERROR_CREATE("Trying to connect an http1.x server"),
2551 grpc_core::StatusIntProperty::kHttpStatus, response.status),
2552 grpc_core::StatusIntProperty::kRpcStatus,
2553 grpc_http2_status_to_grpc_status(response.status));
2554 }
2555
2556 grpc_http_parser_destroy(&parser);
2557 grpc_http_response_destroy(&response);
2558 return error;
2559 }
2560
read_action(void * tp,grpc_error_handle error)2561 static void read_action(void* tp, grpc_error_handle error) {
2562 grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp);
2563 t->combiner->Run(
2564 GRPC_CLOSURE_INIT(&t->read_action_locked, read_action_locked, t, nullptr),
2565 error);
2566 }
2567
// Combiner-side handler for a completed endpoint read.
//
// Wraps a failed read in an "Endpoint read failed" error, feeds every slice
// of the read buffer to the HTTP/2 parser (unless the transport is already
// closed), and then either closes the transport, continues reading, pauses
// reading, or drops the "reading_action" transport ref.
static void read_action_locked(void* tp, grpc_error_handle error) {
  grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp);

  // After the swap below `error` holds the (possibly wrapped) error and `err`
  // holds the value this function was called with.
  grpc_error_handle err = error;
  if (!err.ok()) {
    err = grpc_error_set_int(
        GRPC_ERROR_CREATE_REFERENCING("Endpoint read failed", &err, 1),
        grpc_core::StatusIntProperty::kOccurredDuringWrite, t->write_state);
  }
  std::swap(err, error);
  if (t->closed_with_error.ok()) {
    size_t i = 0;
    // errors[0]: read error, errors[1]: HTTP/2 parse error,
    // errors[2]: result of re-parsing the buffer as HTTP/1.x.
    grpc_error_handle errors[3] = {error, absl::OkStatus(), absl::OkStatus()};
    // Buffered slices are parsed even when the read itself reported an error;
    // parsing stops at the first slice that fails.
    for (; i < t->read_buffer.count && errors[1] == absl::OkStatus(); i++) {
      errors[1] = grpc_chttp2_perform_read(t, t->read_buffer.slices[i]);
    }
    if (errors[1] != absl::OkStatus()) {
      errors[2] = try_http_parsing(t);
      error = GRPC_ERROR_CREATE_REFERENCING("Failed parsing HTTP/2", errors,
                                            GPR_ARRAY_SIZE(errors));
    }

    if (t->initial_window_update != 0) {
      if (t->initial_window_update > 0) {
        // A positive update: every stream stalled by stream-level flow
        // control is made writable again and a write is initiated.
        grpc_chttp2_stream* s;
        while (grpc_chttp2_list_pop_stalled_by_stream(t, &s)) {
          grpc_chttp2_mark_stream_writable(t, s);
          grpc_chttp2_initiate_write(
              t, GRPC_CHTTP2_INITIATE_WRITE_FLOW_CONTROL_UNSTALLED_BY_SETTING);
        }
      }
      t->initial_window_update = 0;
    }
  }

  bool keep_reading = false;
  // No error so far but the transport is closed: report that as the error.
  if (error.ok() && !t->closed_with_error.ok()) {
    error = GRPC_ERROR_CREATE_REFERENCING("Transport closed",
                                          &t->closed_with_error, 1);
  }
  if (!error.ok()) {
    // If a goaway frame was received, this might be the reason why the read
    // failed. Add this info to the error
    if (!t->goaway_error.ok()) {
      error = grpc_error_add_child(error, t->goaway_error);
    }

    close_transport_locked(t, error);
    t->endpoint_reading = 0;
  } else if (t->closed_with_error.ok()) {
    keep_reading = true;
    // Since we have read a byte, reset the keepalive timer
    if (t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_WAITING) {
      maybe_reset_keepalive_ping_timer_locked(t);
    }
  }
  grpc_slice_buffer_reset_and_unref(&t->read_buffer);

  if (keep_reading) {
    // Too many induced frames are still pending: stop issuing reads and
    // record that fact in reading_paused_on_pending_induced_frames.
    if (t->num_pending_induced_frames >= DEFAULT_MAX_PENDING_INDUCED_FRAMES) {
      t->reading_paused_on_pending_induced_frames = true;
      GRPC_CHTTP2_IF_TRACING(
          gpr_log(GPR_INFO,
                  "transport %p : Pausing reading due to too "
                  "many unwritten SETTINGS ACK and RST_STREAM frames",
                  t));
    } else {
      continue_read_action_locked(t);
    }
  } else {
    // Reading stops here, so the ref tagged "reading_action" is released.
    GRPC_CHTTP2_UNREF_TRANSPORT(t, "reading_action");
  }
}
2641
continue_read_action_locked(grpc_chttp2_transport * t)2642 static void continue_read_action_locked(grpc_chttp2_transport* t) {
2643 const bool urgent = !t->goaway_error.ok();
2644 GRPC_CLOSURE_INIT(&t->read_action_locked, read_action, t,
2645 grpc_schedule_on_exec_ctx);
2646 grpc_endpoint_read(t->ep, &t->read_buffer, &t->read_action_locked, urgent,
2647 grpc_chttp2_min_read_progress_size(t));
2648 }
2649
2650 // t is reffed prior to calling the first time, and once the callback chain
2651 // that kicks off finishes, it's unreffed
schedule_bdp_ping_locked(grpc_chttp2_transport * t)2652 void schedule_bdp_ping_locked(grpc_chttp2_transport* t) {
2653 t->flow_control.bdp_estimator()->SchedulePing();
2654 send_ping_locked(
2655 t,
2656 GRPC_CLOSURE_INIT(&t->start_bdp_ping_locked, start_bdp_ping, t,
2657 grpc_schedule_on_exec_ctx),
2658 GRPC_CLOSURE_INIT(&t->finish_bdp_ping_locked, finish_bdp_ping, t,
2659 grpc_schedule_on_exec_ctx));
2660 grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_BDP_PING);
2661 }
2662
start_bdp_ping(void * tp,grpc_error_handle error)2663 static void start_bdp_ping(void* tp, grpc_error_handle error) {
2664 grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp);
2665 t->combiner->Run(GRPC_CLOSURE_INIT(&t->start_bdp_ping_locked,
2666 start_bdp_ping_locked, t, nullptr),
2667 error);
2668 }
2669
start_bdp_ping_locked(void * tp,grpc_error_handle error)2670 static void start_bdp_ping_locked(void* tp, grpc_error_handle error) {
2671 grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp);
2672 if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace)) {
2673 gpr_log(GPR_INFO, "%s: Start BDP ping err=%s",
2674 std::string(t->peer_string.as_string_view()).c_str(),
2675 grpc_core::StatusToString(error).c_str());
2676 }
2677 if (!error.ok() || !t->closed_with_error.ok()) {
2678 return;
2679 }
2680 // Reset the keepalive ping timer
2681 if (t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_WAITING) {
2682 maybe_reset_keepalive_ping_timer_locked(t);
2683 }
2684 t->flow_control.bdp_estimator()->StartPing();
2685 t->bdp_ping_started = true;
2686 }
2687
finish_bdp_ping(void * tp,grpc_error_handle error)2688 static void finish_bdp_ping(void* tp, grpc_error_handle error) {
2689 grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp);
2690 t->combiner->Run(GRPC_CLOSURE_INIT(&t->finish_bdp_ping_locked,
2691 finish_bdp_ping_locked, t, nullptr),
2692 error);
2693 }
2694
finish_bdp_ping_locked(void * tp,grpc_error_handle error)2695 static void finish_bdp_ping_locked(void* tp, grpc_error_handle error) {
2696 grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp);
2697 if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace)) {
2698 gpr_log(GPR_INFO, "%s: Complete BDP ping err=%s",
2699 std::string(t->peer_string.as_string_view()).c_str(),
2700 grpc_core::StatusToString(error).c_str());
2701 }
2702 if (!error.ok() || !t->closed_with_error.ok()) {
2703 GRPC_CHTTP2_UNREF_TRANSPORT(t, "bdp_ping");
2704 return;
2705 }
2706 if (!t->bdp_ping_started) {
2707 // start_bdp_ping_locked has not been run yet. Schedule
2708 // finish_bdp_ping_locked to be run later.
2709 t->combiner->Run(GRPC_CLOSURE_INIT(&t->finish_bdp_ping_locked,
2710 finish_bdp_ping_locked, t, nullptr),
2711 error);
2712 return;
2713 }
2714 t->bdp_ping_started = false;
2715 grpc_core::Timestamp next_ping =
2716 t->flow_control.bdp_estimator()->CompletePing();
2717 grpc_chttp2_act_on_flowctl_action(t->flow_control.PeriodicUpdate(), t,
2718 nullptr);
2719 GPR_ASSERT(!t->next_bdp_ping_timer_handle.has_value());
2720 t->next_bdp_ping_timer_handle =
2721 t->event_engine->RunAfter(next_ping - grpc_core::Timestamp::Now(), [t] {
2722 grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
2723 grpc_core::ExecCtx exec_ctx;
2724 next_bdp_ping_timer_expired(t);
2725 });
2726 }
2727
next_bdp_ping_timer_expired(grpc_chttp2_transport * t)2728 static void next_bdp_ping_timer_expired(grpc_chttp2_transport* t) {
2729 t->combiner->Run(
2730 GRPC_CLOSURE_INIT(&t->next_bdp_ping_timer_expired_locked,
2731 next_bdp_ping_timer_expired_locked, t, nullptr),
2732 absl::OkStatus());
2733 }
2734
next_bdp_ping_timer_expired_locked(void * tp,GRPC_UNUSED grpc_error_handle error)2735 static void next_bdp_ping_timer_expired_locked(
2736 void* tp, GRPC_UNUSED grpc_error_handle error) {
2737 GPR_DEBUG_ASSERT(error.ok());
2738 grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp);
2739 GPR_ASSERT(t->next_bdp_ping_timer_handle.has_value());
2740 t->next_bdp_ping_timer_handle.reset();
2741 if (t->flow_control.bdp_estimator()->accumulator() == 0) {
2742 // Block the bdp ping till we receive more data.
2743 t->bdp_ping_blocked = true;
2744 GRPC_CHTTP2_UNREF_TRANSPORT(t, "bdp_ping");
2745 } else {
2746 schedule_bdp_ping_locked(t);
2747 }
2748 }
2749
grpc_chttp2_config_default_keepalive_args(grpc_channel_args * args,bool is_client)2750 void grpc_chttp2_config_default_keepalive_args(grpc_channel_args* args,
2751 bool is_client) {
2752 size_t i;
2753 if (args) {
2754 for (i = 0; i < args->num_args; i++) {
2755 if (0 == strcmp(args->args[i].key, GRPC_ARG_KEEPALIVE_TIME_MS)) {
2756 const int value = grpc_channel_arg_get_integer(
2757 &args->args[i], {is_client ? g_default_client_keepalive_time_ms
2758 : g_default_server_keepalive_time_ms,
2759 1, INT_MAX});
2760 if (is_client) {
2761 g_default_client_keepalive_time_ms = value;
2762 } else {
2763 g_default_server_keepalive_time_ms = value;
2764 }
2765 } else if (0 ==
2766 strcmp(args->args[i].key, GRPC_ARG_KEEPALIVE_TIMEOUT_MS)) {
2767 const int value = grpc_channel_arg_get_integer(
2768 &args->args[i], {is_client ? g_default_client_keepalive_timeout_ms
2769 : g_default_server_keepalive_timeout_ms,
2770 0, INT_MAX});
2771 if (is_client) {
2772 g_default_client_keepalive_timeout_ms = value;
2773 } else {
2774 g_default_server_keepalive_timeout_ms = value;
2775 }
2776 } else if (0 == strcmp(args->args[i].key,
2777 GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS)) {
2778 const bool value = static_cast<uint32_t>(grpc_channel_arg_get_integer(
2779 &args->args[i],
2780 {is_client ? g_default_client_keepalive_permit_without_calls
2781 : g_default_server_keepalive_timeout_ms,
2782 0, 1}));
2783 if (is_client) {
2784 g_default_client_keepalive_permit_without_calls = value;
2785 } else {
2786 g_default_server_keepalive_permit_without_calls = value;
2787 }
2788 } else if (0 ==
2789 strcmp(args->args[i].key, GRPC_ARG_HTTP2_MAX_PING_STRIKES)) {
2790 g_default_max_ping_strikes = grpc_channel_arg_get_integer(
2791 &args->args[i], {g_default_max_ping_strikes, 0, INT_MAX});
2792 } else if (0 == strcmp(args->args[i].key,
2793 GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA)) {
2794 g_default_max_pings_without_data = grpc_channel_arg_get_integer(
2795 &args->args[i], {g_default_max_pings_without_data, 0, INT_MAX});
2796 } else if (0 ==
2797 strcmp(
2798 args->args[i].key,
2799 GRPC_ARG_HTTP2_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS)) {
2800 g_default_min_recv_ping_interval_without_data_ms =
2801 grpc_channel_arg_get_integer(
2802 &args->args[i],
2803 {g_default_min_recv_ping_interval_without_data_ms, 0, INT_MAX});
2804 }
2805 }
2806 }
2807 }
2808
init_keepalive_ping(grpc_chttp2_transport * t)2809 static void init_keepalive_ping(grpc_chttp2_transport* t) {
2810 t->combiner->Run(GRPC_CLOSURE_INIT(&t->init_keepalive_ping_locked,
2811 init_keepalive_ping_locked, t, nullptr),
2812 absl::OkStatus());
2813 }
2814
init_keepalive_ping_locked(void * arg,GRPC_UNUSED grpc_error_handle error)2815 static void init_keepalive_ping_locked(void* arg,
2816 GRPC_UNUSED grpc_error_handle error) {
2817 GPR_DEBUG_ASSERT(error.ok());
2818 grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(arg);
2819 GPR_ASSERT(t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_WAITING);
2820 GPR_ASSERT(t->keepalive_ping_timer_handle.has_value());
2821 t->keepalive_ping_timer_handle.reset();
2822 if (t->destroying || !t->closed_with_error.ok()) {
2823 t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_DYING;
2824 } else {
2825 if (t->keepalive_permit_without_calls ||
2826 grpc_chttp2_stream_map_size(&t->stream_map) > 0) {
2827 t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_PINGING;
2828 GRPC_CHTTP2_REF_TRANSPORT(t, "keepalive ping end");
2829 send_keepalive_ping_locked(t);
2830 grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_KEEPALIVE_PING);
2831 } else {
2832 GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping");
2833 t->keepalive_ping_timer_handle =
2834 t->event_engine->RunAfter(t->keepalive_time, [t] {
2835 grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
2836 grpc_core::ExecCtx exec_ctx;
2837 init_keepalive_ping(t);
2838 });
2839 }
2840 }
2841 GRPC_CHTTP2_UNREF_TRANSPORT(t, "init keepalive ping");
2842 }
2843
start_keepalive_ping(void * arg,grpc_error_handle error)2844 static void start_keepalive_ping(void* arg, grpc_error_handle error) {
2845 grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(arg);
2846 t->combiner->Run(GRPC_CLOSURE_INIT(&t->start_keepalive_ping_locked,
2847 start_keepalive_ping_locked, t, nullptr),
2848 error);
2849 }
2850
start_keepalive_ping_locked(void * arg,grpc_error_handle error)2851 static void start_keepalive_ping_locked(void* arg, grpc_error_handle error) {
2852 grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(arg);
2853 if (!error.ok()) {
2854 return;
2855 }
2856 if (t->channelz_socket != nullptr) {
2857 t->channelz_socket->RecordKeepaliveSent();
2858 }
2859 if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace) ||
2860 GRPC_TRACE_FLAG_ENABLED(grpc_keepalive_trace)) {
2861 gpr_log(GPR_INFO, "%s: Start keepalive ping",
2862 std::string(t->peer_string.as_string_view()).c_str());
2863 }
2864 GRPC_CHTTP2_REF_TRANSPORT(t, "keepalive watchdog");
2865 t->keepalive_watchdog_timer_handle =
2866 t->event_engine->RunAfter(t->keepalive_timeout, [t] {
2867 grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
2868 grpc_core::ExecCtx exec_ctx;
2869 keepalive_watchdog_fired(t);
2870 });
2871 t->keepalive_ping_started = true;
2872 }
2873
finish_keepalive_ping(void * arg,grpc_error_handle error)2874 static void finish_keepalive_ping(void* arg, grpc_error_handle error) {
2875 grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(arg);
2876 t->combiner->Run(GRPC_CLOSURE_INIT(&t->finish_keepalive_ping_locked,
2877 finish_keepalive_ping_locked, t, nullptr),
2878 error);
2879 }
2880
finish_keepalive_ping_locked(void * arg,grpc_error_handle error)2881 static void finish_keepalive_ping_locked(void* arg, grpc_error_handle error) {
2882 grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(arg);
2883 if (t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_PINGING) {
2884 if (error.ok()) {
2885 if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace) ||
2886 GRPC_TRACE_FLAG_ENABLED(grpc_keepalive_trace)) {
2887 gpr_log(GPR_INFO, "%s: Finish keepalive ping",
2888 std::string(t->peer_string.as_string_view()).c_str());
2889 }
2890 if (!t->keepalive_ping_started) {
2891 // start_keepalive_ping_locked has not run yet. Reschedule
2892 // finish_keepalive_ping_locked for it to be run later.
2893 t->combiner->Run(
2894 GRPC_CLOSURE_INIT(&t->finish_keepalive_ping_locked,
2895 finish_keepalive_ping_locked, t, nullptr),
2896 error);
2897 return;
2898 }
2899 t->keepalive_ping_started = false;
2900 t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_WAITING;
2901 if (t->keepalive_watchdog_timer_handle.has_value()) {
2902 if (t->event_engine->Cancel(*t->keepalive_watchdog_timer_handle)) {
2903 GRPC_CHTTP2_UNREF_TRANSPORT(t, "keepalive watchdog");
2904 t->keepalive_watchdog_timer_handle.reset();
2905 }
2906 }
2907 GPR_ASSERT(!t->keepalive_ping_timer_handle.has_value());
2908 GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping");
2909 t->keepalive_ping_timer_handle =
2910 t->event_engine->RunAfter(t->keepalive_time, [t] {
2911 grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
2912 grpc_core::ExecCtx exec_ctx;
2913 init_keepalive_ping(t);
2914 });
2915 }
2916 }
2917 GRPC_CHTTP2_UNREF_TRANSPORT(t, "keepalive ping end");
2918 }
2919
keepalive_watchdog_fired(grpc_chttp2_transport * t)2920 static void keepalive_watchdog_fired(grpc_chttp2_transport* t) {
2921 t->combiner->Run(
2922 GRPC_CLOSURE_INIT(&t->keepalive_watchdog_fired_locked,
2923 keepalive_watchdog_fired_locked, t, nullptr),
2924 absl::OkStatus());
2925 }
2926
keepalive_watchdog_fired_locked(void * arg,GRPC_UNUSED grpc_error_handle error)2927 static void keepalive_watchdog_fired_locked(
2928 void* arg, GRPC_UNUSED grpc_error_handle error) {
2929 GPR_DEBUG_ASSERT(error.ok());
2930 grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(arg);
2931 GPR_ASSERT(t->keepalive_watchdog_timer_handle.has_value());
2932 t->keepalive_watchdog_timer_handle.reset();
2933 if (t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_PINGING) {
2934 gpr_log(GPR_INFO, "%s: Keepalive watchdog fired. Closing transport.",
2935 std::string(t->peer_string.as_string_view()).c_str());
2936 t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_DYING;
2937 close_transport_locked(
2938 t, grpc_error_set_int(GRPC_ERROR_CREATE("keepalive watchdog timeout"),
2939 grpc_core::StatusIntProperty::kRpcStatus,
2940 GRPC_STATUS_UNAVAILABLE));
2941 } else {
2942 // If keepalive_state is not PINGING, we consider it as an error. Maybe the
2943 // cancellation failed in finish_keepalive_ping_locked. Users have seen
2944 // other states: https://github.com/grpc/grpc/issues/32085.
2945 gpr_log(GPR_ERROR, "keepalive_ping_end state error: %d (expect: %d)",
2946 t->keepalive_state, GRPC_CHTTP2_KEEPALIVE_STATE_PINGING);
2947 }
2948 GRPC_CHTTP2_UNREF_TRANSPORT(t, "keepalive watchdog");
2949 }
2950
maybe_reset_keepalive_ping_timer_locked(grpc_chttp2_transport * t)2951 static void maybe_reset_keepalive_ping_timer_locked(grpc_chttp2_transport* t) {
2952 if (t->keepalive_ping_timer_handle.has_value()) {
2953 if (t->event_engine->Cancel(*t->keepalive_ping_timer_handle)) {
2954 // Cancel succeeds, resets the keepalive ping timer. Note that we don't
2955 // need to Ref or Unref here since we still hold the Ref.
2956 if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace) ||
2957 GRPC_TRACE_FLAG_ENABLED(grpc_keepalive_trace)) {
2958 gpr_log(GPR_INFO, "%s: Keepalive ping cancelled. Resetting timer.",
2959 std::string(t->peer_string.as_string_view()).c_str());
2960 }
2961 t->keepalive_ping_timer_handle =
2962 t->event_engine->RunAfter(t->keepalive_time, [t] {
2963 grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
2964 grpc_core::ExecCtx exec_ctx;
2965 init_keepalive_ping(t);
2966 });
2967 }
2968 }
2969 }
2970
2971 //
2972 // CALLBACK LOOP
2973 //
2974
// Records a new connectivity `state` for transport `t` in its state tracker,
// together with the `status` and human-readable `reason` for the change.
// The new state is logged first when HTTP/2 tracing is on.
static void connectivity_state_set(grpc_chttp2_transport* t,
                                   grpc_connectivity_state state,
                                   const absl::Status& status,
                                   const char* reason) {
  GRPC_CHTTP2_IF_TRACING(
      gpr_log(GPR_INFO, "transport %p set connectivity_state=%d", t, state));
  t->state_tracker.SetState(state, status, reason);
}
2983
2984 //
2985 // POLLSET STUFF
2986 //
2987
set_pollset(grpc_transport * gt,grpc_stream *,grpc_pollset * pollset)2988 static void set_pollset(grpc_transport* gt, grpc_stream* /*gs*/,
2989 grpc_pollset* pollset) {
2990 grpc_chttp2_transport* t = reinterpret_cast<grpc_chttp2_transport*>(gt);
2991 grpc_endpoint_add_to_pollset(t->ep, pollset);
2992 }
2993
set_pollset_set(grpc_transport * gt,grpc_stream *,grpc_pollset_set * pollset_set)2994 static void set_pollset_set(grpc_transport* gt, grpc_stream* /*gs*/,
2995 grpc_pollset_set* pollset_set) {
2996 grpc_chttp2_transport* t = reinterpret_cast<grpc_chttp2_transport*>(gt);
2997 grpc_endpoint_add_to_pollset_set(t->ep, pollset_set);
2998 }
2999
3000 //
3001 // RESOURCE QUOTAS
3002 //
3003
post_benign_reclaimer(grpc_chttp2_transport * t)3004 static void post_benign_reclaimer(grpc_chttp2_transport* t) {
3005 if (!t->benign_reclaimer_registered) {
3006 t->benign_reclaimer_registered = true;
3007 GRPC_CHTTP2_REF_TRANSPORT(t, "benign_reclaimer");
3008 t->memory_owner.PostReclaimer(
3009 grpc_core::ReclamationPass::kBenign,
3010 [t](absl::optional<grpc_core::ReclamationSweep> sweep) {
3011 if (sweep.has_value()) {
3012 GRPC_CLOSURE_INIT(&t->benign_reclaimer_locked,
3013 benign_reclaimer_locked, t,
3014 grpc_schedule_on_exec_ctx);
3015 t->active_reclamation = std::move(*sweep);
3016 t->combiner->Run(&t->benign_reclaimer_locked, absl::OkStatus());
3017 } else {
3018 GRPC_CHTTP2_UNREF_TRANSPORT(t, "benign_reclaimer");
3019 }
3020 });
3021 }
3022 }
3023
post_destructive_reclaimer(grpc_chttp2_transport * t)3024 static void post_destructive_reclaimer(grpc_chttp2_transport* t) {
3025 if (!t->destructive_reclaimer_registered) {
3026 t->destructive_reclaimer_registered = true;
3027 GRPC_CHTTP2_REF_TRANSPORT(t, "destructive_reclaimer");
3028 t->memory_owner.PostReclaimer(
3029 grpc_core::ReclamationPass::kDestructive,
3030 [t](absl::optional<grpc_core::ReclamationSweep> sweep) {
3031 if (sweep.has_value()) {
3032 GRPC_CLOSURE_INIT(&t->destructive_reclaimer_locked,
3033 destructive_reclaimer_locked, t,
3034 grpc_schedule_on_exec_ctx);
3035 t->active_reclamation = std::move(*sweep);
3036 t->combiner->Run(&t->destructive_reclaimer_locked,
3037 absl::OkStatus());
3038 } else {
3039 GRPC_CHTTP2_UNREF_TRANSPORT(t, "destructive_reclaimer");
3040 }
3041 });
3042 }
3043 }
3044
benign_reclaimer_locked(void * arg,grpc_error_handle error)3045 static void benign_reclaimer_locked(void* arg, grpc_error_handle error) {
3046 grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(arg);
3047 if (error.ok() && grpc_chttp2_stream_map_size(&t->stream_map) == 0) {
3048 // Channel with no active streams: send a goaway to try and make it
3049 // disconnect cleanly
3050 if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) {
3051 gpr_log(GPR_INFO, "HTTP2: %s - send goaway to free memory",
3052 std::string(t->peer_string.as_string_view()).c_str());
3053 }
3054 send_goaway(t,
3055 grpc_error_set_int(GRPC_ERROR_CREATE("Buffers full"),
3056 grpc_core::StatusIntProperty::kHttp2Error,
3057 GRPC_HTTP2_ENHANCE_YOUR_CALM),
3058 /*immediate_disconnect_hint=*/true);
3059 } else if (error.ok() && GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) {
3060 gpr_log(GPR_INFO,
3061 "HTTP2: %s - skip benign reclamation, there are still %" PRIdPTR
3062 " streams",
3063 std::string(t->peer_string.as_string_view()).c_str(),
3064 grpc_chttp2_stream_map_size(&t->stream_map));
3065 }
3066 t->benign_reclaimer_registered = false;
3067 if (error != absl::CancelledError()) {
3068 t->active_reclamation.Finish();
3069 }
3070 GRPC_CHTTP2_UNREF_TRANSPORT(t, "benign_reclaimer");
3071 }
3072
destructive_reclaimer_locked(void * arg,grpc_error_handle error)3073 static void destructive_reclaimer_locked(void* arg, grpc_error_handle error) {
3074 grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(arg);
3075 size_t n = grpc_chttp2_stream_map_size(&t->stream_map);
3076 t->destructive_reclaimer_registered = false;
3077 if (error.ok() && n > 0) {
3078 grpc_chttp2_stream* s = static_cast<grpc_chttp2_stream*>(
3079 grpc_chttp2_stream_map_rand(&t->stream_map));
3080 if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) {
3081 gpr_log(GPR_INFO, "HTTP2: %s - abandon stream id %d",
3082 std::string(t->peer_string.as_string_view()).c_str(), s->id);
3083 }
3084 grpc_chttp2_cancel_stream(
3085 t, s,
3086 grpc_error_set_int(GRPC_ERROR_CREATE("Buffers full"),
3087 grpc_core::StatusIntProperty::kHttp2Error,
3088 GRPC_HTTP2_ENHANCE_YOUR_CALM));
3089 if (n > 1) {
3090 // Since we cancel one stream per destructive reclamation, if
3091 // there are more streams left, we can immediately post a new
3092 // reclaimer in case the resource quota needs to free more
3093 // memory
3094 post_destructive_reclaimer(t);
3095 }
3096 }
3097 if (error != absl::CancelledError()) {
3098 t->active_reclamation.Finish();
3099 }
3100 GRPC_CHTTP2_UNREF_TRANSPORT(t, "destructive_reclaimer");
3101 }
3102
3103 //
3104 // MONITORING
3105 //
3106
grpc_chttp2_initiate_write_reason_string(grpc_chttp2_initiate_write_reason reason)3107 const char* grpc_chttp2_initiate_write_reason_string(
3108 grpc_chttp2_initiate_write_reason reason) {
3109 switch (reason) {
3110 case GRPC_CHTTP2_INITIATE_WRITE_INITIAL_WRITE:
3111 return "INITIAL_WRITE";
3112 case GRPC_CHTTP2_INITIATE_WRITE_START_NEW_STREAM:
3113 return "START_NEW_STREAM";
3114 case GRPC_CHTTP2_INITIATE_WRITE_SEND_MESSAGE:
3115 return "SEND_MESSAGE";
3116 case GRPC_CHTTP2_INITIATE_WRITE_SEND_INITIAL_METADATA:
3117 return "SEND_INITIAL_METADATA";
3118 case GRPC_CHTTP2_INITIATE_WRITE_SEND_TRAILING_METADATA:
3119 return "SEND_TRAILING_METADATA";
3120 case GRPC_CHTTP2_INITIATE_WRITE_RETRY_SEND_PING:
3121 return "RETRY_SEND_PING";
3122 case GRPC_CHTTP2_INITIATE_WRITE_CONTINUE_PINGS:
3123 return "CONTINUE_PINGS";
3124 case GRPC_CHTTP2_INITIATE_WRITE_GOAWAY_SENT:
3125 return "GOAWAY_SENT";
3126 case GRPC_CHTTP2_INITIATE_WRITE_RST_STREAM:
3127 return "RST_STREAM";
3128 case GRPC_CHTTP2_INITIATE_WRITE_CLOSE_FROM_API:
3129 return "CLOSE_FROM_API";
3130 case GRPC_CHTTP2_INITIATE_WRITE_STREAM_FLOW_CONTROL:
3131 return "STREAM_FLOW_CONTROL";
3132 case GRPC_CHTTP2_INITIATE_WRITE_TRANSPORT_FLOW_CONTROL:
3133 return "TRANSPORT_FLOW_CONTROL";
3134 case GRPC_CHTTP2_INITIATE_WRITE_SEND_SETTINGS:
3135 return "SEND_SETTINGS";
3136 case GRPC_CHTTP2_INITIATE_WRITE_SETTINGS_ACK:
3137 return "SETTINGS_ACK";
3138 case GRPC_CHTTP2_INITIATE_WRITE_FLOW_CONTROL_UNSTALLED_BY_SETTING:
3139 return "FLOW_CONTROL_UNSTALLED_BY_SETTING";
3140 case GRPC_CHTTP2_INITIATE_WRITE_FLOW_CONTROL_UNSTALLED_BY_UPDATE:
3141 return "FLOW_CONTROL_UNSTALLED_BY_UPDATE";
3142 case GRPC_CHTTP2_INITIATE_WRITE_APPLICATION_PING:
3143 return "APPLICATION_PING";
3144 case GRPC_CHTTP2_INITIATE_WRITE_BDP_PING:
3145 return "BDP_PING";
3146 case GRPC_CHTTP2_INITIATE_WRITE_KEEPALIVE_PING:
3147 return "KEEPALIVE_PING";
3148 case GRPC_CHTTP2_INITIATE_WRITE_TRANSPORT_FLOW_CONTROL_UNSTALLED:
3149 return "TRANSPORT_FLOW_CONTROL_UNSTALLED";
3150 case GRPC_CHTTP2_INITIATE_WRITE_PING_RESPONSE:
3151 return "PING_RESPONSE";
3152 case GRPC_CHTTP2_INITIATE_WRITE_FORCE_RST_STREAM:
3153 return "FORCE_RST_STREAM";
3154 }
3155 GPR_UNREACHABLE_CODE(return "unknown");
3156 }
3157
chttp2_get_endpoint(grpc_transport * t)3158 static grpc_endpoint* chttp2_get_endpoint(grpc_transport* t) {
3159 return (reinterpret_cast<grpc_chttp2_transport*>(t))->ep;
3160 }
3161
// Function table for the "chttp2" transport, handed out by get_vtable().
//
// The initializer is positional, so the entries below must stay in the same
// order as the members of grpc_transport_vtable (declared outside this file).
// The first entry is the size of grpc_chttp2_stream and the third is the
// transport's name string.
// NOTE(review): the meaning of the `false` flag and of the `nullptr` slot
// (an entry this transport leaves unset) cannot be determined from this file;
// confirm against the grpc_transport_vtable declaration before reordering or
// adding entries.
static const grpc_transport_vtable vtable = {sizeof(grpc_chttp2_stream),
                                             false,
                                             "chttp2",
                                             init_stream,
                                             nullptr,
                                             set_pollset,
                                             set_pollset_set,
                                             perform_stream_op,
                                             perform_transport_op,
                                             destroy_stream,
                                             destroy_transport,
                                             chttp2_get_endpoint};
3174
get_vtable(void)3175 static const grpc_transport_vtable* get_vtable(void) { return &vtable; }
3176
3177 grpc_core::RefCountedPtr<grpc_core::channelz::SocketNode>
grpc_chttp2_transport_get_socket_node(grpc_transport * transport)3178 grpc_chttp2_transport_get_socket_node(grpc_transport* transport) {
3179 grpc_chttp2_transport* t =
3180 reinterpret_cast<grpc_chttp2_transport*>(transport);
3181 return t->channelz_socket;
3182 }
3183
grpc_create_chttp2_transport(const grpc_core::ChannelArgs & channel_args,grpc_endpoint * ep,bool is_client)3184 grpc_transport* grpc_create_chttp2_transport(
3185 const grpc_core::ChannelArgs& channel_args, grpc_endpoint* ep,
3186 bool is_client) {
3187 auto t = new grpc_chttp2_transport(channel_args, ep, is_client);
3188 return &t->base;
3189 }
3190
grpc_chttp2_transport_start_reading(grpc_transport * transport,grpc_slice_buffer * read_buffer,grpc_closure * notify_on_receive_settings,grpc_closure * notify_on_close)3191 void grpc_chttp2_transport_start_reading(
3192 grpc_transport* transport, grpc_slice_buffer* read_buffer,
3193 grpc_closure* notify_on_receive_settings, grpc_closure* notify_on_close) {
3194 grpc_chttp2_transport* t =
3195 reinterpret_cast<grpc_chttp2_transport*>(transport);
3196 GRPC_CHTTP2_REF_TRANSPORT(
3197 t, "reading_action"); // matches unref inside reading_action
3198 if (read_buffer != nullptr) {
3199 grpc_slice_buffer_move_into(read_buffer, &t->read_buffer);
3200 gpr_free(read_buffer);
3201 }
3202 t->combiner->Run(
3203 grpc_core::NewClosure([t, notify_on_receive_settings,
3204 notify_on_close](grpc_error_handle) {
3205 if (!t->closed_with_error.ok()) {
3206 if (notify_on_receive_settings != nullptr) {
3207 grpc_core::ExecCtx::Run(DEBUG_LOCATION, notify_on_receive_settings,
3208 t->closed_with_error);
3209 }
3210 if (notify_on_close != nullptr) {
3211 grpc_core::ExecCtx::Run(DEBUG_LOCATION, notify_on_close,
3212 t->closed_with_error);
3213 }
3214 GRPC_CHTTP2_UNREF_TRANSPORT(t, "reading_action");
3215 return;
3216 }
3217 t->notify_on_receive_settings = notify_on_receive_settings;
3218 t->notify_on_close = notify_on_close;
3219 read_action_locked(t, absl::OkStatus());
3220 }),
3221 absl::OkStatus());
3222 }
3223