1 //
2 //
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18
19 #include <grpc/support/port_platform.h>
20
21 #include <inttypes.h>
22 #include <string.h>
23
24 #include <initializer_list>
25 #include <string>
26
27 #include "absl/base/attributes.h"
28 #include "absl/status/status.h"
29 #include "absl/strings/str_cat.h"
30 #include "absl/strings/str_format.h"
31 #include "absl/strings/string_view.h"
32
33 #include <grpc/slice.h>
34 #include <grpc/support/log.h>
35
36 #include "src/core/ext/transport/chttp2/transport/flow_control.h"
37 #include "src/core/ext/transport/chttp2/transport/frame.h"
38 #include "src/core/ext/transport/chttp2/transport/frame_data.h"
39 #include "src/core/ext/transport/chttp2/transport/frame_goaway.h"
40 #include "src/core/ext/transport/chttp2/transport/frame_ping.h"
41 #include "src/core/ext/transport/chttp2/transport/frame_rst_stream.h"
42 #include "src/core/ext/transport/chttp2/transport/frame_settings.h"
43 #include "src/core/ext/transport/chttp2/transport/frame_window_update.h"
44 #include "src/core/ext/transport/chttp2/transport/hpack_parser.h"
45 #include "src/core/ext/transport/chttp2/transport/hpack_parser_table.h"
46 #include "src/core/ext/transport/chttp2/transport/http2_settings.h"
47 #include "src/core/ext/transport/chttp2/transport/http_trace.h"
48 #include "src/core/ext/transport/chttp2/transport/internal.h"
49 #include "src/core/ext/transport/chttp2/transport/stream_map.h"
50 #include "src/core/lib/channel/channelz.h"
51 #include "src/core/lib/debug/trace.h"
52 #include "src/core/lib/gprpp/ref_counted_ptr.h"
53 #include "src/core/lib/gprpp/status_helper.h"
54 #include "src/core/lib/gprpp/time.h"
55 #include "src/core/lib/iomgr/closure.h"
56 #include "src/core/lib/iomgr/combiner.h"
57 #include "src/core/lib/iomgr/error.h"
58 #include "src/core/lib/slice/slice.h"
59 #include "src/core/lib/transport/bdp_estimator.h"
60 #include "src/core/lib/transport/error_utils.h"
61 #include "src/core/lib/transport/http2_errors.h"
62 #include "src/core/lib/transport/metadata_batch.h"
63 #include "src/core/lib/transport/transport.h"
64
65 using grpc_core::HPackParser;
66
67 static grpc_error_handle init_frame_parser(grpc_chttp2_transport* t);
68 static grpc_error_handle init_header_frame_parser(grpc_chttp2_transport* t,
69 int is_continuation);
70 static grpc_error_handle init_data_frame_parser(grpc_chttp2_transport* t);
71 static grpc_error_handle init_rst_stream_parser(grpc_chttp2_transport* t);
72 static grpc_error_handle init_settings_frame_parser(grpc_chttp2_transport* t);
73 static grpc_error_handle init_window_update_frame_parser(
74 grpc_chttp2_transport* t);
75 static grpc_error_handle init_ping_parser(grpc_chttp2_transport* t);
76 static grpc_error_handle init_goaway_parser(grpc_chttp2_transport* t);
77 static grpc_error_handle init_non_header_skip_frame_parser(
78 grpc_chttp2_transport* t);
79
80 static grpc_error_handle parse_frame_slice(grpc_chttp2_transport* t,
81 const grpc_slice& slice,
82 int is_last);
83
get_utf8_safe_char(char c)84 static char get_utf8_safe_char(char c) {
85 return static_cast<unsigned char>(c) < 128 ? c : 32;
86 }
87
grpc_chttp2_min_read_progress_size(grpc_chttp2_transport * t)88 uint32_t grpc_chttp2_min_read_progress_size(grpc_chttp2_transport* t) {
89 switch (t->deframe_state) {
90 case GRPC_DTS_CLIENT_PREFIX_0:
91 case GRPC_DTS_CLIENT_PREFIX_1:
92 case GRPC_DTS_CLIENT_PREFIX_2:
93 case GRPC_DTS_CLIENT_PREFIX_3:
94 case GRPC_DTS_CLIENT_PREFIX_4:
95 case GRPC_DTS_CLIENT_PREFIX_5:
96 case GRPC_DTS_CLIENT_PREFIX_6:
97 case GRPC_DTS_CLIENT_PREFIX_7:
98 case GRPC_DTS_CLIENT_PREFIX_8:
99 case GRPC_DTS_CLIENT_PREFIX_9:
100 case GRPC_DTS_CLIENT_PREFIX_10:
101 case GRPC_DTS_CLIENT_PREFIX_11:
102 case GRPC_DTS_CLIENT_PREFIX_12:
103 case GRPC_DTS_CLIENT_PREFIX_13:
104 case GRPC_DTS_CLIENT_PREFIX_14:
105 case GRPC_DTS_CLIENT_PREFIX_15:
106 case GRPC_DTS_CLIENT_PREFIX_16:
107 case GRPC_DTS_CLIENT_PREFIX_17:
108 case GRPC_DTS_CLIENT_PREFIX_18:
109 case GRPC_DTS_CLIENT_PREFIX_19:
110 case GRPC_DTS_CLIENT_PREFIX_20:
111 case GRPC_DTS_CLIENT_PREFIX_21:
112 case GRPC_DTS_CLIENT_PREFIX_22:
113 case GRPC_DTS_CLIENT_PREFIX_23:
114 // Need the client prefix *and* the first fixed header to make progress.
115 return 9 + 24 - (t->deframe_state - GRPC_DTS_CLIENT_PREFIX_0);
116 case GRPC_DTS_FH_0:
117 case GRPC_DTS_FH_1:
118 case GRPC_DTS_FH_2:
119 case GRPC_DTS_FH_3:
120 case GRPC_DTS_FH_4:
121 case GRPC_DTS_FH_5:
122 case GRPC_DTS_FH_6:
123 case GRPC_DTS_FH_7:
124 case GRPC_DTS_FH_8:
125 return 9 - (t->deframe_state - GRPC_DTS_FH_0);
126 case GRPC_DTS_FRAME:
127 return t->incoming_frame_size;
128 }
129 GPR_UNREACHABLE_CODE(return 1);
130 }
131
132 namespace {
133 struct KnownFlag {
134 uint8_t flag;
135 absl::string_view name;
136 };
137
MakeFrameTypeString(absl::string_view frame_type,uint8_t flags,std::initializer_list<KnownFlag> known_flags)138 std::string MakeFrameTypeString(absl::string_view frame_type, uint8_t flags,
139 std::initializer_list<KnownFlag> known_flags) {
140 std::string result(frame_type);
141 for (const KnownFlag& known_flag : known_flags) {
142 if (flags & known_flag.flag) {
143 absl::StrAppend(&result, ":", known_flag.name);
144 flags &= ~known_flag.flag;
145 }
146 }
147 if (flags != 0) {
148 absl::StrAppend(&result, ":UNKNOWN_FLAGS=0x",
149 absl::Hex(flags, absl::kZeroPad2));
150 }
151 return result;
152 }
153
FrameTypeString(uint8_t frame_type,uint8_t flags)154 std::string FrameTypeString(uint8_t frame_type, uint8_t flags) {
155 switch (frame_type) {
156 case GRPC_CHTTP2_FRAME_DATA:
157 return MakeFrameTypeString(
158 "DATA", flags, {{GRPC_CHTTP2_DATA_FLAG_END_STREAM, "END_STREAM"}});
159 case GRPC_CHTTP2_FRAME_HEADER:
160 return MakeFrameTypeString(
161 "HEADERS", flags,
162 {{GRPC_CHTTP2_DATA_FLAG_END_STREAM, "END_STREAM"},
163 {GRPC_CHTTP2_DATA_FLAG_END_HEADERS, "END_HEADERS"},
164 {GRPC_CHTTP2_FLAG_HAS_PRIORITY, "PRIORITY"}});
165 case GRPC_CHTTP2_FRAME_CONTINUATION:
166 return MakeFrameTypeString(
167 "HEADERS", flags,
168 {{GRPC_CHTTP2_DATA_FLAG_END_STREAM, "END_STREAM"},
169 {GRPC_CHTTP2_DATA_FLAG_END_HEADERS, "END_HEADERS"},
170 {GRPC_CHTTP2_FLAG_HAS_PRIORITY, "PRIORITY"}});
171 case GRPC_CHTTP2_FRAME_RST_STREAM:
172 return MakeFrameTypeString("RST_STREAM", flags, {});
173 case GRPC_CHTTP2_FRAME_SETTINGS:
174 return MakeFrameTypeString("SETTINGS", flags,
175 {{GRPC_CHTTP2_FLAG_ACK, "ACK"}});
176 case GRPC_CHTTP2_FRAME_PING:
177 return MakeFrameTypeString("PING", flags,
178 {{GRPC_CHTTP2_FLAG_ACK, "ACK"}});
179 case GRPC_CHTTP2_FRAME_GOAWAY:
180 return MakeFrameTypeString("GOAWAY", flags, {});
181 case GRPC_CHTTP2_FRAME_WINDOW_UPDATE:
182 return MakeFrameTypeString("WINDOW_UPDATE", flags, {});
183 default:
184 return MakeFrameTypeString(
185 absl::StrCat("UNKNOWN_FRAME_TYPE_", static_cast<int>(frame_type)),
186 flags, {});
187 }
188 }
189 } // namespace
190
grpc_chttp2_perform_read(grpc_chttp2_transport * t,const grpc_slice & slice)191 grpc_error_handle grpc_chttp2_perform_read(grpc_chttp2_transport* t,
192 const grpc_slice& slice) {
193 const uint8_t* beg = GRPC_SLICE_START_PTR(slice);
194 const uint8_t* end = GRPC_SLICE_END_PTR(slice);
195 const uint8_t* cur = beg;
196 grpc_error_handle err;
197
198 if (cur == end) return absl::OkStatus();
199
200 switch (t->deframe_state) {
201 case GRPC_DTS_CLIENT_PREFIX_0:
202 case GRPC_DTS_CLIENT_PREFIX_1:
203 case GRPC_DTS_CLIENT_PREFIX_2:
204 case GRPC_DTS_CLIENT_PREFIX_3:
205 case GRPC_DTS_CLIENT_PREFIX_4:
206 case GRPC_DTS_CLIENT_PREFIX_5:
207 case GRPC_DTS_CLIENT_PREFIX_6:
208 case GRPC_DTS_CLIENT_PREFIX_7:
209 case GRPC_DTS_CLIENT_PREFIX_8:
210 case GRPC_DTS_CLIENT_PREFIX_9:
211 case GRPC_DTS_CLIENT_PREFIX_10:
212 case GRPC_DTS_CLIENT_PREFIX_11:
213 case GRPC_DTS_CLIENT_PREFIX_12:
214 case GRPC_DTS_CLIENT_PREFIX_13:
215 case GRPC_DTS_CLIENT_PREFIX_14:
216 case GRPC_DTS_CLIENT_PREFIX_15:
217 case GRPC_DTS_CLIENT_PREFIX_16:
218 case GRPC_DTS_CLIENT_PREFIX_17:
219 case GRPC_DTS_CLIENT_PREFIX_18:
220 case GRPC_DTS_CLIENT_PREFIX_19:
221 case GRPC_DTS_CLIENT_PREFIX_20:
222 case GRPC_DTS_CLIENT_PREFIX_21:
223 case GRPC_DTS_CLIENT_PREFIX_22:
224 case GRPC_DTS_CLIENT_PREFIX_23:
225 while (cur != end && t->deframe_state != GRPC_DTS_FH_0) {
226 if (*cur != GRPC_CHTTP2_CLIENT_CONNECT_STRING[t->deframe_state]) {
227 return GRPC_ERROR_CREATE(absl::StrFormat(
228 "Connect string mismatch: expected '%c' (%d) got '%c' (%d) "
229 "at byte %d",
230 get_utf8_safe_char(
231 GRPC_CHTTP2_CLIENT_CONNECT_STRING[t->deframe_state]),
232 static_cast<int>(static_cast<uint8_t>(
233 GRPC_CHTTP2_CLIENT_CONNECT_STRING[t->deframe_state])),
234 get_utf8_safe_char(*cur), static_cast<int>(*cur),
235 t->deframe_state));
236 }
237 ++cur;
238 // NOLINTNEXTLINE(bugprone-misplaced-widening-cast)
239 t->deframe_state = static_cast<grpc_chttp2_deframe_transport_state>(
240 1 + static_cast<int>(t->deframe_state));
241 }
242 if (cur == end) {
243 return absl::OkStatus();
244 }
245 dts_fh_0:
246 ABSL_FALLTHROUGH_INTENDED;
247 case GRPC_DTS_FH_0:
248 GPR_DEBUG_ASSERT(cur < end);
249 t->incoming_frame_size = (static_cast<uint32_t>(*cur)) << 16;
250 if (++cur == end) {
251 t->deframe_state = GRPC_DTS_FH_1;
252 return absl::OkStatus();
253 }
254 ABSL_FALLTHROUGH_INTENDED;
255 case GRPC_DTS_FH_1:
256 GPR_DEBUG_ASSERT(cur < end);
257 t->incoming_frame_size |= (static_cast<uint32_t>(*cur)) << 8;
258 if (++cur == end) {
259 t->deframe_state = GRPC_DTS_FH_2;
260 return absl::OkStatus();
261 }
262 ABSL_FALLTHROUGH_INTENDED;
263 case GRPC_DTS_FH_2:
264 GPR_DEBUG_ASSERT(cur < end);
265 t->incoming_frame_size |= *cur;
266 if (++cur == end) {
267 t->deframe_state = GRPC_DTS_FH_3;
268 return absl::OkStatus();
269 }
270 ABSL_FALLTHROUGH_INTENDED;
271 case GRPC_DTS_FH_3:
272 GPR_DEBUG_ASSERT(cur < end);
273 t->incoming_frame_type = *cur;
274 if (++cur == end) {
275 t->deframe_state = GRPC_DTS_FH_4;
276 return absl::OkStatus();
277 }
278 ABSL_FALLTHROUGH_INTENDED;
279 case GRPC_DTS_FH_4:
280 GPR_DEBUG_ASSERT(cur < end);
281 t->incoming_frame_flags = *cur;
282 if (++cur == end) {
283 t->deframe_state = GRPC_DTS_FH_5;
284 return absl::OkStatus();
285 }
286 ABSL_FALLTHROUGH_INTENDED;
287 case GRPC_DTS_FH_5:
288 GPR_DEBUG_ASSERT(cur < end);
289 t->incoming_stream_id = ((static_cast<uint32_t>(*cur)) & 0x7f) << 24;
290 if (++cur == end) {
291 t->deframe_state = GRPC_DTS_FH_6;
292 return absl::OkStatus();
293 }
294 ABSL_FALLTHROUGH_INTENDED;
295 case GRPC_DTS_FH_6:
296 GPR_DEBUG_ASSERT(cur < end);
297 t->incoming_stream_id |= (static_cast<uint32_t>(*cur)) << 16;
298 if (++cur == end) {
299 t->deframe_state = GRPC_DTS_FH_7;
300 return absl::OkStatus();
301 }
302 ABSL_FALLTHROUGH_INTENDED;
303 case GRPC_DTS_FH_7:
304 GPR_DEBUG_ASSERT(cur < end);
305 t->incoming_stream_id |= (static_cast<uint32_t>(*cur)) << 8;
306 if (++cur == end) {
307 t->deframe_state = GRPC_DTS_FH_8;
308 return absl::OkStatus();
309 }
310 ABSL_FALLTHROUGH_INTENDED;
311 case GRPC_DTS_FH_8:
312 GPR_DEBUG_ASSERT(cur < end);
313 t->incoming_stream_id |= (static_cast<uint32_t>(*cur));
314 if (grpc_http_trace.enabled()) {
315 gpr_log(GPR_INFO, "INCOMING[%p]: %s len:%d id:0x%08x", t,
316 FrameTypeString(t->incoming_frame_type, t->incoming_frame_flags)
317 .c_str(),
318 t->incoming_frame_size, t->incoming_stream_id);
319 }
320 t->deframe_state = GRPC_DTS_FRAME;
321 err = init_frame_parser(t);
322 if (!err.ok()) {
323 return err;
324 }
325 if (t->incoming_frame_size == 0) {
326 err = parse_frame_slice(t, grpc_empty_slice(), 1);
327 if (!err.ok()) {
328 return err;
329 }
330 t->incoming_stream = nullptr;
331 if (++cur == end) {
332 t->deframe_state = GRPC_DTS_FH_0;
333 return absl::OkStatus();
334 }
335 goto dts_fh_0; // loop
336 } else if (t->incoming_frame_size >
337 t->settings[GRPC_ACKED_SETTINGS]
338 [GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE]) {
339 return GRPC_ERROR_CREATE(
340 absl::StrFormat("Frame size %d is larger than max frame size %d",
341 t->incoming_frame_size,
342 t->settings[GRPC_ACKED_SETTINGS]
343 [GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE]));
344 }
345 if (++cur == end) {
346 return absl::OkStatus();
347 }
348 ABSL_FALLTHROUGH_INTENDED;
349 case GRPC_DTS_FRAME:
350 GPR_DEBUG_ASSERT(cur < end);
351 if (static_cast<uint32_t>(end - cur) == t->incoming_frame_size) {
352 err = parse_frame_slice(
353 t,
354 grpc_slice_sub_no_ref(slice, static_cast<size_t>(cur - beg),
355 static_cast<size_t>(end - beg)),
356 1);
357 if (!err.ok()) {
358 return err;
359 }
360 t->deframe_state = GRPC_DTS_FH_0;
361 t->incoming_stream = nullptr;
362 return absl::OkStatus();
363 } else if (static_cast<uint32_t>(end - cur) > t->incoming_frame_size) {
364 size_t cur_offset = static_cast<size_t>(cur - beg);
365 err = parse_frame_slice(
366 t,
367 grpc_slice_sub_no_ref(slice, cur_offset,
368 cur_offset + t->incoming_frame_size),
369 1);
370 if (!err.ok()) {
371 return err;
372 }
373 cur += t->incoming_frame_size;
374 t->incoming_stream = nullptr;
375 goto dts_fh_0; // loop
376 } else {
377 err = parse_frame_slice(
378 t,
379 grpc_slice_sub_no_ref(slice, static_cast<size_t>(cur - beg),
380 static_cast<size_t>(end - beg)),
381 0);
382 if (!err.ok()) {
383 return err;
384 }
385 t->incoming_frame_size -= static_cast<uint32_t>(end - cur);
386 return absl::OkStatus();
387 }
388 GPR_UNREACHABLE_CODE(return absl::OkStatus());
389 }
390
391 GPR_UNREACHABLE_CODE(return absl::OkStatus());
392 }
393
init_frame_parser(grpc_chttp2_transport * t)394 static grpc_error_handle init_frame_parser(grpc_chttp2_transport* t) {
395 if (t->is_first_frame &&
396 t->incoming_frame_type != GRPC_CHTTP2_FRAME_SETTINGS) {
397 return GRPC_ERROR_CREATE(absl::StrCat(
398 "Expected SETTINGS frame as the first frame, got frame type ",
399 t->incoming_frame_type));
400 }
401 t->is_first_frame = false;
402 if (t->expect_continuation_stream_id != 0) {
403 if (t->incoming_frame_type != GRPC_CHTTP2_FRAME_CONTINUATION) {
404 return GRPC_ERROR_CREATE(
405 absl::StrFormat("Expected CONTINUATION frame, got frame type %02x",
406 t->incoming_frame_type));
407 }
408 if (t->expect_continuation_stream_id != t->incoming_stream_id) {
409 return GRPC_ERROR_CREATE(absl::StrFormat(
410 "Expected CONTINUATION frame for grpc_chttp2_stream %08x, got "
411 "grpc_chttp2_stream %08x",
412 t->expect_continuation_stream_id, t->incoming_stream_id));
413 }
414 return init_header_frame_parser(t, 1);
415 }
416 switch (t->incoming_frame_type) {
417 case GRPC_CHTTP2_FRAME_DATA:
418 return init_data_frame_parser(t);
419 case GRPC_CHTTP2_FRAME_HEADER:
420 return init_header_frame_parser(t, 0);
421 case GRPC_CHTTP2_FRAME_CONTINUATION:
422 return GRPC_ERROR_CREATE("Unexpected CONTINUATION frame");
423 case GRPC_CHTTP2_FRAME_RST_STREAM:
424 return init_rst_stream_parser(t);
425 case GRPC_CHTTP2_FRAME_SETTINGS:
426 return init_settings_frame_parser(t);
427 case GRPC_CHTTP2_FRAME_WINDOW_UPDATE:
428 return init_window_update_frame_parser(t);
429 case GRPC_CHTTP2_FRAME_PING:
430 return init_ping_parser(t);
431 case GRPC_CHTTP2_FRAME_GOAWAY:
432 return init_goaway_parser(t);
433 default:
434 if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace)) {
435 gpr_log(GPR_ERROR, "Unknown frame type %02x", t->incoming_frame_type);
436 }
437 return init_non_header_skip_frame_parser(t);
438 }
439 }
440
skip_parser(void *,grpc_chttp2_transport *,grpc_chttp2_stream *,const grpc_slice &,int)441 static grpc_error_handle skip_parser(void* /*parser*/,
442 grpc_chttp2_transport* /*t*/,
443 grpc_chttp2_stream* /*s*/,
444 const grpc_slice& /*slice*/,
445 int /*is_last*/) {
446 return absl::OkStatus();
447 }
448
hpack_boundary_type(grpc_chttp2_transport * t,bool is_eoh)449 static HPackParser::Boundary hpack_boundary_type(grpc_chttp2_transport* t,
450 bool is_eoh) {
451 if (is_eoh) {
452 if (t->header_eof) {
453 return HPackParser::Boundary::EndOfStream;
454 } else {
455 return HPackParser::Boundary::EndOfHeaders;
456 }
457 } else {
458 return HPackParser::Boundary::None;
459 }
460 }
461
hpack_parser_log_info(grpc_chttp2_transport * t,HPackParser::LogInfo::Type type)462 static HPackParser::LogInfo hpack_parser_log_info(
463 grpc_chttp2_transport* t, HPackParser::LogInfo::Type type) {
464 return HPackParser::LogInfo{
465 t->incoming_stream_id,
466 type,
467 t->is_client,
468 };
469 }
470
init_header_skip_frame_parser(grpc_chttp2_transport * t,HPackParser::Priority priority_type,bool is_eoh)471 static grpc_error_handle init_header_skip_frame_parser(
472 grpc_chttp2_transport* t, HPackParser::Priority priority_type,
473 bool is_eoh) {
474 t->parser = grpc_chttp2_transport::Parser{
475 "header", grpc_chttp2_header_parser_parse, &t->hpack_parser};
476 t->hpack_parser.BeginFrame(
477 nullptr,
478 /*metadata_size_soft_limit=*/
479 t->max_header_list_size_soft_limit,
480 /*metadata_size_hard_limit=*/
481 t->settings[GRPC_ACKED_SETTINGS]
482 [GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE],
483 hpack_boundary_type(t, is_eoh), priority_type,
484 hpack_parser_log_info(t, HPackParser::LogInfo::kDontKnow));
485 return absl::OkStatus();
486 }
487
init_non_header_skip_frame_parser(grpc_chttp2_transport * t)488 static grpc_error_handle init_non_header_skip_frame_parser(
489 grpc_chttp2_transport* t) {
490 t->parser =
491 grpc_chttp2_transport::Parser{"skip_parser", skip_parser, nullptr};
492 return absl::OkStatus();
493 }
494
grpc_chttp2_parsing_become_skip_parser(grpc_chttp2_transport * t)495 void grpc_chttp2_parsing_become_skip_parser(grpc_chttp2_transport* t) {
496 if (t->parser.parser == grpc_chttp2_header_parser_parse) {
497 t->hpack_parser.StopBufferingFrame();
498 } else {
499 t->parser =
500 grpc_chttp2_transport::Parser{"skip_parser", skip_parser, nullptr};
501 }
502 }
503
init_data_frame_parser(grpc_chttp2_transport * t)504 static grpc_error_handle init_data_frame_parser(grpc_chttp2_transport* t) {
505 // Update BDP accounting since we have received a data frame.
506 grpc_core::BdpEstimator* bdp_est = t->flow_control.bdp_estimator();
507 if (bdp_est) {
508 if (t->bdp_ping_blocked) {
509 t->bdp_ping_blocked = false;
510 GRPC_CHTTP2_REF_TRANSPORT(t, "bdp_ping");
511 schedule_bdp_ping_locked(t);
512 }
513 bdp_est->AddIncomingBytes(t->incoming_frame_size);
514 }
515 grpc_chttp2_stream* s =
516 grpc_chttp2_parsing_lookup_stream(t, t->incoming_stream_id);
517 absl::Status status;
518 grpc_core::chttp2::FlowControlAction action;
519 if (s == nullptr) {
520 grpc_core::chttp2::TransportFlowControl::IncomingUpdateContext upd(
521 &t->flow_control);
522 status = upd.RecvData(t->incoming_frame_size);
523 action = upd.MakeAction();
524 } else {
525 grpc_core::chttp2::StreamFlowControl::IncomingUpdateContext upd(
526 &s->flow_control);
527 status = upd.RecvData(t->incoming_frame_size);
528 action = upd.MakeAction();
529 }
530 grpc_chttp2_act_on_flowctl_action(action, t, s);
531 if (!status.ok()) {
532 goto error_handler;
533 }
534 if (s == nullptr) {
535 return init_non_header_skip_frame_parser(t);
536 }
537 s->received_bytes += t->incoming_frame_size;
538 s->stats.incoming.framing_bytes += 9;
539 if (s->read_closed) {
540 return init_non_header_skip_frame_parser(t);
541 }
542 status =
543 grpc_chttp2_data_parser_begin_frame(t->incoming_frame_flags, s->id, s);
544 error_handler:
545 if (status.ok()) {
546 t->incoming_stream = s;
547 t->parser = grpc_chttp2_transport::Parser{
548 "data", grpc_chttp2_data_parser_parse, nullptr};
549 t->ping_state.last_ping_sent_time = grpc_core::Timestamp::InfPast();
550 return absl::OkStatus();
551 } else if (s != nullptr) {
552 // handle stream errors by closing the stream
553 grpc_chttp2_mark_stream_closed(t, s, true, false,
554 absl_status_to_grpc_error(status));
555 grpc_chttp2_add_rst_stream_to_next_write(t, t->incoming_stream_id,
556 GRPC_HTTP2_PROTOCOL_ERROR,
557 &s->stats.outgoing);
558 return init_non_header_skip_frame_parser(t);
559 } else {
560 return absl_status_to_grpc_error(status);
561 }
562 }
563
init_header_frame_parser(grpc_chttp2_transport * t,int is_continuation)564 static grpc_error_handle init_header_frame_parser(grpc_chttp2_transport* t,
565 int is_continuation) {
566 const bool is_eoh =
567 (t->incoming_frame_flags & GRPC_CHTTP2_DATA_FLAG_END_HEADERS) != 0;
568 grpc_chttp2_stream* s;
569
570 // TODO(ctiller): when to increment header_frames_received?
571
572 if (is_eoh) {
573 t->expect_continuation_stream_id = 0;
574 } else {
575 t->expect_continuation_stream_id = t->incoming_stream_id;
576 }
577
578 if (!is_continuation) {
579 t->header_eof =
580 (t->incoming_frame_flags & GRPC_CHTTP2_DATA_FLAG_END_STREAM) != 0;
581 }
582
583 const auto priority_type = !is_continuation && (t->incoming_frame_flags &
584 GRPC_CHTTP2_FLAG_HAS_PRIORITY)
585 ? HPackParser::Priority::Included
586 : HPackParser::Priority::None;
587
588 t->ping_state.last_ping_sent_time = grpc_core::Timestamp::InfPast();
589
590 // could be a new grpc_chttp2_stream or an existing grpc_chttp2_stream
591 s = grpc_chttp2_parsing_lookup_stream(t, t->incoming_stream_id);
592 if (s == nullptr) {
593 if (GPR_UNLIKELY(is_continuation)) {
594 GRPC_CHTTP2_IF_TRACING(
595 gpr_log(GPR_ERROR,
596 "grpc_chttp2_stream disbanded before CONTINUATION received"));
597 return init_header_skip_frame_parser(t, priority_type, is_eoh);
598 }
599 if (t->is_client) {
600 if (GPR_LIKELY((t->incoming_stream_id & 1) &&
601 t->incoming_stream_id < t->next_stream_id)) {
602 // this is an old (probably cancelled) grpc_chttp2_stream
603 } else {
604 GRPC_CHTTP2_IF_TRACING(gpr_log(
605 GPR_ERROR, "ignoring new grpc_chttp2_stream creation on client"));
606 }
607 return init_header_skip_frame_parser(t, priority_type, is_eoh);
608 } else if (GPR_UNLIKELY(t->last_new_stream_id >= t->incoming_stream_id)) {
609 GRPC_CHTTP2_IF_TRACING(gpr_log(
610 GPR_ERROR,
611 "ignoring out of order new grpc_chttp2_stream request on server; "
612 "last grpc_chttp2_stream "
613 "id=%d, new grpc_chttp2_stream id=%d",
614 t->last_new_stream_id, t->incoming_stream_id));
615 return init_header_skip_frame_parser(t, priority_type, is_eoh);
616 } else if (GPR_UNLIKELY((t->incoming_stream_id & 1) == 0)) {
617 GRPC_CHTTP2_IF_TRACING(gpr_log(
618 GPR_ERROR,
619 "ignoring grpc_chttp2_stream with non-client generated index %d",
620 t->incoming_stream_id));
621 return init_header_skip_frame_parser(t, priority_type, is_eoh);
622 } else if (GPR_UNLIKELY(
623 grpc_chttp2_stream_map_size(&t->stream_map) >=
624 t->settings[GRPC_ACKED_SETTINGS]
625 [GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS])) {
626 return GRPC_ERROR_CREATE("Max stream count exceeded");
627 } else if (t->sent_goaway_state == GRPC_CHTTP2_FINAL_GOAWAY_SENT) {
628 GRPC_CHTTP2_IF_TRACING(gpr_log(
629 GPR_INFO,
630 "transport:%p SERVER peer:%s Final GOAWAY sent. Ignoring new "
631 "grpc_chttp2_stream request id=%d, last grpc_chttp2_stream id=%d",
632 t, std::string(t->peer_string.as_string_view()).c_str(),
633 t->incoming_stream_id, t->last_new_stream_id));
634 return init_header_skip_frame_parser(t, priority_type, is_eoh);
635 }
636 t->last_new_stream_id = t->incoming_stream_id;
637 s = t->incoming_stream =
638 grpc_chttp2_parsing_accept_stream(t, t->incoming_stream_id);
639 if (GPR_UNLIKELY(s == nullptr)) {
640 GRPC_CHTTP2_IF_TRACING(
641 gpr_log(GPR_ERROR, "grpc_chttp2_stream not accepted"));
642 return init_header_skip_frame_parser(t, priority_type, is_eoh);
643 }
644 if (t->channelz_socket != nullptr) {
645 t->channelz_socket->RecordStreamStartedFromRemote();
646 }
647 } else {
648 t->incoming_stream = s;
649 }
650 GPR_DEBUG_ASSERT(s != nullptr);
651 s->stats.incoming.framing_bytes += 9;
652 if (GPR_UNLIKELY(s->read_closed)) {
653 GRPC_CHTTP2_IF_TRACING(gpr_log(
654 GPR_ERROR, "skipping already closed grpc_chttp2_stream header"));
655 t->incoming_stream = nullptr;
656 return init_header_skip_frame_parser(t, priority_type, is_eoh);
657 }
658 t->parser = grpc_chttp2_transport::Parser{
659 "header", grpc_chttp2_header_parser_parse, &t->hpack_parser};
660 if (t->header_eof) {
661 s->eos_received = true;
662 }
663 grpc_metadata_batch* incoming_metadata_buffer = nullptr;
664 HPackParser::LogInfo::Type frame_type = HPackParser::LogInfo::kDontKnow;
665 switch (s->header_frames_received) {
666 case 0:
667 if (t->is_client && t->header_eof) {
668 GRPC_CHTTP2_IF_TRACING(gpr_log(GPR_INFO, "parsing Trailers-Only"));
669 if (s->trailing_metadata_available != nullptr) {
670 *s->trailing_metadata_available = true;
671 }
672 s->parsed_trailers_only = true;
673 s->trailing_metadata_buffer.Set(grpc_core::GrpcTrailersOnly(), true);
674 incoming_metadata_buffer = &s->trailing_metadata_buffer;
675 frame_type = HPackParser::LogInfo::kTrailers;
676 } else {
677 GRPC_CHTTP2_IF_TRACING(gpr_log(GPR_INFO, "parsing initial_metadata"));
678 incoming_metadata_buffer = &s->initial_metadata_buffer;
679 frame_type = HPackParser::LogInfo::kHeaders;
680 }
681 break;
682 case 1:
683 GRPC_CHTTP2_IF_TRACING(gpr_log(GPR_INFO, "parsing trailing_metadata"));
684 incoming_metadata_buffer = &s->trailing_metadata_buffer;
685 frame_type = HPackParser::LogInfo::kTrailers;
686 break;
687 case 2:
688 gpr_log(GPR_ERROR, "too many header frames received");
689 return init_header_skip_frame_parser(t, priority_type, is_eoh);
690 }
691 if (frame_type == HPackParser::LogInfo::kTrailers && !t->header_eof) {
692 return GRPC_ERROR_CREATE(
693 "Trailing metadata frame received without an end-o-stream");
694 }
695 t->hpack_parser.BeginFrame(
696 incoming_metadata_buffer,
697 /*metadata_size_soft_limit=*/
698 t->max_header_list_size_soft_limit,
699 /*metadata_size_hard_limit=*/
700 t->settings[GRPC_ACKED_SETTINGS]
701 [GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE],
702 hpack_boundary_type(t, is_eoh), priority_type,
703 hpack_parser_log_info(t, frame_type));
704 return absl::OkStatus();
705 }
706
init_window_update_frame_parser(grpc_chttp2_transport * t)707 static grpc_error_handle init_window_update_frame_parser(
708 grpc_chttp2_transport* t) {
709 grpc_error_handle err = grpc_chttp2_window_update_parser_begin_frame(
710 &t->simple.window_update, t->incoming_frame_size,
711 t->incoming_frame_flags);
712 if (!err.ok()) return err;
713 if (t->incoming_stream_id != 0) {
714 grpc_chttp2_stream* s = t->incoming_stream =
715 grpc_chttp2_parsing_lookup_stream(t, t->incoming_stream_id);
716 if (s == nullptr) {
717 if (grpc_http_trace.enabled()) {
718 gpr_log(GPR_ERROR, "Stream %d not found, ignoring WINDOW_UPDATE",
719 t->incoming_stream_id);
720 }
721 return init_non_header_skip_frame_parser(t);
722 }
723 s->stats.incoming.framing_bytes += 9;
724 }
725 t->parser = grpc_chttp2_transport::Parser{
726 "window_update", grpc_chttp2_window_update_parser_parse,
727 &t->simple.window_update};
728 return absl::OkStatus();
729 }
730
init_ping_parser(grpc_chttp2_transport * t)731 static grpc_error_handle init_ping_parser(grpc_chttp2_transport* t) {
732 grpc_error_handle err = grpc_chttp2_ping_parser_begin_frame(
733 &t->simple.ping, t->incoming_frame_size, t->incoming_frame_flags);
734 if (!err.ok()) return err;
735 t->parser = grpc_chttp2_transport::Parser{
736 "ping", grpc_chttp2_ping_parser_parse, &t->simple.ping};
737 return absl::OkStatus();
738 }
739
init_rst_stream_parser(grpc_chttp2_transport * t)740 static grpc_error_handle init_rst_stream_parser(grpc_chttp2_transport* t) {
741 grpc_error_handle err = grpc_chttp2_rst_stream_parser_begin_frame(
742 &t->simple.rst_stream, t->incoming_frame_size, t->incoming_frame_flags);
743 if (!err.ok()) return err;
744 grpc_chttp2_stream* s = t->incoming_stream =
745 grpc_chttp2_parsing_lookup_stream(t, t->incoming_stream_id);
746 if (!t->incoming_stream) {
747 return init_non_header_skip_frame_parser(t);
748 }
749 s->stats.incoming.framing_bytes += 9;
750 t->parser = grpc_chttp2_transport::Parser{
751 "rst_stream", grpc_chttp2_rst_stream_parser_parse, &t->simple.rst_stream};
752 return absl::OkStatus();
753 }
754
init_goaway_parser(grpc_chttp2_transport * t)755 static grpc_error_handle init_goaway_parser(grpc_chttp2_transport* t) {
756 grpc_error_handle err = grpc_chttp2_goaway_parser_begin_frame(
757 &t->goaway_parser, t->incoming_frame_size, t->incoming_frame_flags);
758 if (!err.ok()) return err;
759 t->parser = grpc_chttp2_transport::Parser{
760 "goaway", grpc_chttp2_goaway_parser_parse, &t->goaway_parser};
761 return absl::OkStatus();
762 }
763
init_settings_frame_parser(grpc_chttp2_transport * t)764 static grpc_error_handle init_settings_frame_parser(grpc_chttp2_transport* t) {
765 if (t->incoming_stream_id != 0) {
766 return GRPC_ERROR_CREATE("Settings frame received for grpc_chttp2_stream");
767 }
768
769 grpc_error_handle err = grpc_chttp2_settings_parser_begin_frame(
770 &t->simple.settings, t->incoming_frame_size, t->incoming_frame_flags,
771 t->settings[GRPC_PEER_SETTINGS]);
772 if (!err.ok()) {
773 return err;
774 }
775 if (t->incoming_frame_flags & GRPC_CHTTP2_FLAG_ACK) {
776 memcpy(t->settings[GRPC_ACKED_SETTINGS], t->settings[GRPC_SENT_SETTINGS],
777 GRPC_CHTTP2_NUM_SETTINGS * sizeof(uint32_t));
778 t->hpack_parser.hpack_table()->SetMaxBytes(
779 t->settings[GRPC_ACKED_SETTINGS]
780 [GRPC_CHTTP2_SETTINGS_HEADER_TABLE_SIZE]);
781 grpc_chttp2_act_on_flowctl_action(
782 t->flow_control.SetAckedInitialWindow(
783 t->settings[GRPC_ACKED_SETTINGS]
784 [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]),
785 t, nullptr);
786 t->sent_local_settings = false;
787 }
788 t->parser = grpc_chttp2_transport::Parser{
789 "settings", grpc_chttp2_settings_parser_parse, &t->simple.settings};
790 return absl::OkStatus();
791 }
792
parse_frame_slice(grpc_chttp2_transport * t,const grpc_slice & slice,int is_last)793 static grpc_error_handle parse_frame_slice(grpc_chttp2_transport* t,
794 const grpc_slice& slice,
795 int is_last) {
796 grpc_chttp2_stream* s = t->incoming_stream;
797 if (grpc_http_trace.enabled()) {
798 gpr_log(GPR_DEBUG,
799 "INCOMING[%p;%p]: Parse %" PRIdPTR "b %sframe fragment with %s", t,
800 s, GRPC_SLICE_LENGTH(slice), is_last ? "last " : "",
801 t->parser.name);
802 }
803 grpc_error_handle err =
804 t->parser.parser(t->parser.user_data, t, s, slice, is_last);
805 intptr_t unused;
806 if (GPR_LIKELY(err.ok())) {
807 return err;
808 }
809 if (grpc_http_trace.enabled()) {
810 gpr_log(GPR_ERROR, "INCOMING[%p;%p]: Parse failed with %s", t, s,
811 err.ToString().c_str());
812 }
813 if (grpc_error_get_int(err, grpc_core::StatusIntProperty::kStreamId,
814 &unused)) {
815 grpc_chttp2_parsing_become_skip_parser(t);
816 if (s) {
817 grpc_chttp2_cancel_stream(t, s, err);
818 }
819 return absl::OkStatus();
820 }
821 return err;
822 }
823
824 typedef void (*maybe_complete_func_type)(grpc_chttp2_transport* t,
825 grpc_chttp2_stream* s);
826 static const maybe_complete_func_type maybe_complete_funcs[] = {
827 grpc_chttp2_maybe_complete_recv_initial_metadata,
828 grpc_chttp2_maybe_complete_recv_trailing_metadata};
829
force_client_rst_stream(void * sp,grpc_error_handle)830 static void force_client_rst_stream(void* sp, grpc_error_handle /*error*/) {
831 grpc_chttp2_stream* s = static_cast<grpc_chttp2_stream*>(sp);
832 grpc_chttp2_transport* t = s->t;
833 if (!s->write_closed) {
834 grpc_chttp2_add_rst_stream_to_next_write(t, s->id, GRPC_HTTP2_NO_ERROR,
835 &s->stats.outgoing);
836 grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_FORCE_RST_STREAM);
837 grpc_chttp2_mark_stream_closed(t, s, true, true, absl::OkStatus());
838 }
839 GRPC_CHTTP2_STREAM_UNREF(s, "final_rst");
840 }
841
grpc_chttp2_header_parser_parse(void * hpack_parser,grpc_chttp2_transport * t,grpc_chttp2_stream * s,const grpc_slice & slice,int is_last)842 grpc_error_handle grpc_chttp2_header_parser_parse(void* hpack_parser,
843 grpc_chttp2_transport* t,
844 grpc_chttp2_stream* s,
845 const grpc_slice& slice,
846 int is_last) {
847 auto* parser = static_cast<grpc_core::HPackParser*>(hpack_parser);
848 if (s != nullptr) {
849 s->stats.incoming.header_bytes += GRPC_SLICE_LENGTH(slice);
850 }
851 grpc_error_handle error = parser->Parse(slice, is_last != 0);
852 if (!error.ok()) {
853 return error;
854 }
855 if (is_last) {
856 // need to check for null stream: this can occur if we receive an invalid
857 // stream id on a header
858 if (s != nullptr) {
859 if (parser->is_boundary()) {
860 if (s->header_frames_received == 2) {
861 return GRPC_ERROR_CREATE("Too many trailer frames");
862 }
863 s->published_metadata[s->header_frames_received] =
864 GRPC_METADATA_PUBLISHED_FROM_WIRE;
865 maybe_complete_funcs[s->header_frames_received](t, s);
866 s->header_frames_received++;
867 }
868 if (parser->is_eof()) {
869 if (t->is_client && !s->write_closed) {
870 // server eof ==> complete closure; we may need to forcefully close
871 // the stream. Wait until the combiner lock is ready to be released
872 // however -- it might be that we receive a RST_STREAM following this
873 // and can avoid the extra write
874 GRPC_CHTTP2_STREAM_REF(s, "final_rst");
875 t->combiner->FinallyRun(
876 GRPC_CLOSURE_CREATE(force_client_rst_stream, s, nullptr),
877 absl::OkStatus());
878 }
879 grpc_chttp2_mark_stream_closed(t, s, true, false, absl::OkStatus());
880 }
881 }
882 parser->FinishFrame();
883 }
884 return absl::OkStatus();
885 }
886