1 #include "quiche/http2/adapter/oghttp2_session.h"
2
3 #include <cstdint>
4 #include <memory>
5 #include <optional>
6 #include <utility>
7 #include <vector>
8
9 #include "absl/cleanup/cleanup.h"
10 #include "absl/memory/memory.h"
11 #include "absl/strings/escaping.h"
12 #include "quiche/http2/adapter/header_validator.h"
13 #include "quiche/http2/adapter/http2_protocol.h"
14 #include "quiche/http2/adapter/http2_util.h"
15 #include "quiche/http2/adapter/http2_visitor_interface.h"
16 #include "quiche/http2/adapter/noop_header_validator.h"
17 #include "quiche/http2/adapter/oghttp2_util.h"
18 #include "quiche/common/quiche_callbacks.h"
19 #include "quiche/spdy/core/spdy_protocol.h"
20
21 namespace http2 {
22 namespace adapter {
23
24 namespace {
25
26 using ConnectionError = Http2VisitorInterface::ConnectionError;
27 using SpdyFramerError = Http2DecoderAdapter::SpdyFramerError;
28
29 using ::spdy::SpdySettingsIR;
30
// Size limit, in bytes, for METADATA frames.
constexpr uint32_t kMaxAllowedMetadataFrameSize = 65536u;
// HPACK table capacity reported when no encoder is available.
constexpr uint32_t kDefaultHpackTableCapacity = 4096u;
// Upper bound on the HPACK encoder table capacity when the session options do
// not specify one.
constexpr uint32_t kMaximumHpackTableCapacity = 65536u;

// Corresponds to NGHTTP2_ERR_CALLBACK_FAILURE.
constexpr int kSendError = -902;
37
38 constexpr absl::string_view kHeadValue = "HEAD";
39
40 // TODO(birenroy): Consider incorporating spdy::FlagsSerializionVisitor here.
41 class FrameAttributeCollector : public spdy::SpdyFrameVisitor {
42 public:
43 FrameAttributeCollector() = default;
VisitData(const spdy::SpdyDataIR & data)44 void VisitData(const spdy::SpdyDataIR& data) override {
45 frame_type_ = static_cast<uint8_t>(data.frame_type());
46 stream_id_ = data.stream_id();
47 flags_ =
48 (data.fin() ? END_STREAM_FLAG : 0) | (data.padded() ? PADDED_FLAG : 0);
49 }
VisitHeaders(const spdy::SpdyHeadersIR & headers)50 void VisitHeaders(const spdy::SpdyHeadersIR& headers) override {
51 frame_type_ = static_cast<uint8_t>(headers.frame_type());
52 stream_id_ = headers.stream_id();
53 flags_ = END_HEADERS_FLAG | (headers.fin() ? END_STREAM_FLAG : 0) |
54 (headers.padded() ? PADDED_FLAG : 0) |
55 (headers.has_priority() ? PRIORITY_FLAG : 0);
56 }
VisitPriority(const spdy::SpdyPriorityIR & priority)57 void VisitPriority(const spdy::SpdyPriorityIR& priority) override {
58 frame_type_ = static_cast<uint8_t>(priority.frame_type());
59 frame_type_ = 2;
60 stream_id_ = priority.stream_id();
61 }
VisitRstStream(const spdy::SpdyRstStreamIR & rst_stream)62 void VisitRstStream(const spdy::SpdyRstStreamIR& rst_stream) override {
63 frame_type_ = static_cast<uint8_t>(rst_stream.frame_type());
64 frame_type_ = 3;
65 stream_id_ = rst_stream.stream_id();
66 error_code_ = rst_stream.error_code();
67 }
VisitSettings(const spdy::SpdySettingsIR & settings)68 void VisitSettings(const spdy::SpdySettingsIR& settings) override {
69 frame_type_ = static_cast<uint8_t>(settings.frame_type());
70 frame_type_ = 4;
71 flags_ = (settings.is_ack() ? ACK_FLAG : 0);
72 }
VisitPushPromise(const spdy::SpdyPushPromiseIR & push_promise)73 void VisitPushPromise(const spdy::SpdyPushPromiseIR& push_promise) override {
74 frame_type_ = static_cast<uint8_t>(push_promise.frame_type());
75 frame_type_ = 5;
76 stream_id_ = push_promise.stream_id();
77 flags_ = (push_promise.padded() ? PADDED_FLAG : 0);
78 }
VisitPing(const spdy::SpdyPingIR & ping)79 void VisitPing(const spdy::SpdyPingIR& ping) override {
80 frame_type_ = static_cast<uint8_t>(ping.frame_type());
81 frame_type_ = 6;
82 flags_ = (ping.is_ack() ? ACK_FLAG : 0);
83 }
VisitGoAway(const spdy::SpdyGoAwayIR & goaway)84 void VisitGoAway(const spdy::SpdyGoAwayIR& goaway) override {
85 frame_type_ = static_cast<uint8_t>(goaway.frame_type());
86 frame_type_ = 7;
87 error_code_ = goaway.error_code();
88 }
VisitWindowUpdate(const spdy::SpdyWindowUpdateIR & window_update)89 void VisitWindowUpdate(
90 const spdy::SpdyWindowUpdateIR& window_update) override {
91 frame_type_ = static_cast<uint8_t>(window_update.frame_type());
92 frame_type_ = 8;
93 stream_id_ = window_update.stream_id();
94 }
VisitContinuation(const spdy::SpdyContinuationIR & continuation)95 void VisitContinuation(
96 const spdy::SpdyContinuationIR& continuation) override {
97 frame_type_ = static_cast<uint8_t>(continuation.frame_type());
98 stream_id_ = continuation.stream_id();
99 flags_ = continuation.end_headers() ? END_HEADERS_FLAG : 0;
100 }
VisitUnknown(const spdy::SpdyUnknownIR & unknown)101 void VisitUnknown(const spdy::SpdyUnknownIR& unknown) override {
102 frame_type_ = static_cast<uint8_t>(unknown.frame_type());
103 stream_id_ = unknown.stream_id();
104 flags_ = unknown.flags();
105 }
VisitAltSvc(const spdy::SpdyAltSvcIR &)106 void VisitAltSvc(const spdy::SpdyAltSvcIR& /*altsvc*/) override {}
VisitPriorityUpdate(const spdy::SpdyPriorityUpdateIR &)107 void VisitPriorityUpdate(
108 const spdy::SpdyPriorityUpdateIR& /*priority_update*/) override {}
VisitAcceptCh(const spdy::SpdyAcceptChIR &)109 void VisitAcceptCh(const spdy::SpdyAcceptChIR& /*accept_ch*/) override {}
110
stream_id()111 uint32_t stream_id() { return stream_id_; }
error_code()112 uint32_t error_code() { return error_code_; }
frame_type()113 uint8_t frame_type() { return frame_type_; }
flags()114 uint8_t flags() { return flags_; }
115
116 private:
117 uint32_t stream_id_ = 0;
118 uint32_t error_code_ = 0;
119 uint8_t frame_type_ = 0;
120 uint8_t flags_ = 0;
121 };
122
TracePerspectiveAsString(Perspective p)123 absl::string_view TracePerspectiveAsString(Perspective p) {
124 switch (p) {
125 case Perspective::kClient:
126 return "OGHTTP2_CLIENT";
127 case Perspective::kServer:
128 return "OGHTTP2_SERVER";
129 }
130 return "OGHTTP2_SERVER";
131 }
132
GetHttp2ErrorCode(SpdyFramerError error)133 Http2ErrorCode GetHttp2ErrorCode(SpdyFramerError error) {
134 switch (error) {
135 case SpdyFramerError::SPDY_NO_ERROR:
136 return Http2ErrorCode::HTTP2_NO_ERROR;
137 case SpdyFramerError::SPDY_INVALID_STREAM_ID:
138 case SpdyFramerError::SPDY_INVALID_CONTROL_FRAME:
139 case SpdyFramerError::SPDY_INVALID_PADDING:
140 case SpdyFramerError::SPDY_INVALID_DATA_FRAME_FLAGS:
141 case SpdyFramerError::SPDY_UNEXPECTED_FRAME:
142 return Http2ErrorCode::PROTOCOL_ERROR;
143 case SpdyFramerError::SPDY_CONTROL_PAYLOAD_TOO_LARGE:
144 case SpdyFramerError::SPDY_INVALID_CONTROL_FRAME_SIZE:
145 case SpdyFramerError::SPDY_OVERSIZED_PAYLOAD:
146 return Http2ErrorCode::FRAME_SIZE_ERROR;
147 case SpdyFramerError::SPDY_DECOMPRESS_FAILURE:
148 case SpdyFramerError::SPDY_HPACK_INDEX_VARINT_ERROR:
149 case SpdyFramerError::SPDY_HPACK_NAME_LENGTH_VARINT_ERROR:
150 case SpdyFramerError::SPDY_HPACK_VALUE_LENGTH_VARINT_ERROR:
151 case SpdyFramerError::SPDY_HPACK_NAME_TOO_LONG:
152 case SpdyFramerError::SPDY_HPACK_VALUE_TOO_LONG:
153 case SpdyFramerError::SPDY_HPACK_NAME_HUFFMAN_ERROR:
154 case SpdyFramerError::SPDY_HPACK_VALUE_HUFFMAN_ERROR:
155 case SpdyFramerError::SPDY_HPACK_MISSING_DYNAMIC_TABLE_SIZE_UPDATE:
156 case SpdyFramerError::SPDY_HPACK_INVALID_INDEX:
157 case SpdyFramerError::SPDY_HPACK_INVALID_NAME_INDEX:
158 case SpdyFramerError::SPDY_HPACK_DYNAMIC_TABLE_SIZE_UPDATE_NOT_ALLOWED:
159 case SpdyFramerError::
160 SPDY_HPACK_INITIAL_DYNAMIC_TABLE_SIZE_UPDATE_IS_ABOVE_LOW_WATER_MARK:
161 case SpdyFramerError::
162 SPDY_HPACK_DYNAMIC_TABLE_SIZE_UPDATE_IS_ABOVE_ACKNOWLEDGED_SETTING:
163 case SpdyFramerError::SPDY_HPACK_TRUNCATED_BLOCK:
164 case SpdyFramerError::SPDY_HPACK_FRAGMENT_TOO_LONG:
165 case SpdyFramerError::SPDY_HPACK_COMPRESSED_HEADER_SIZE_EXCEEDS_LIMIT:
166 return Http2ErrorCode::COMPRESSION_ERROR;
167 case SpdyFramerError::SPDY_INTERNAL_FRAMER_ERROR:
168 case SpdyFramerError::SPDY_STOP_PROCESSING:
169 case SpdyFramerError::LAST_ERROR:
170 return Http2ErrorCode::INTERNAL_ERROR;
171 }
172 return Http2ErrorCode::INTERNAL_ERROR;
173 }
174
IsResponse(HeaderType type)175 bool IsResponse(HeaderType type) {
176 return type == HeaderType::RESPONSE_100 || type == HeaderType::RESPONSE;
177 }
178
StatusIs1xx(absl::string_view status)179 bool StatusIs1xx(absl::string_view status) {
180 return status.size() == 3 && status[0] == '1';
181 }
182
183 // Returns the upper bound on HPACK encoder table capacity. If not specified in
184 // the Options, a reasonable default upper bound is used.
HpackCapacityBound(const OgHttp2Session::Options & o)185 uint32_t HpackCapacityBound(const OgHttp2Session::Options& o) {
186 return o.max_hpack_encoding_table_capacity.value_or(
187 kMaximumHpackTableCapacity);
188 }
189
IsNonAckSettings(const spdy::SpdyFrameIR & frame)190 bool IsNonAckSettings(const spdy::SpdyFrameIR& frame) {
191 return frame.frame_type() == spdy::SpdyFrameType::SETTINGS &&
192 !reinterpret_cast<const SpdySettingsIR&>(frame).is_ack();
193 }
194
195 } // namespace
196
PassthroughHeadersHandler(OgHttp2Session & session,Http2VisitorInterface & visitor)197 OgHttp2Session::PassthroughHeadersHandler::PassthroughHeadersHandler(
198 OgHttp2Session& session, Http2VisitorInterface& visitor)
199 : session_(session), visitor_(visitor) {
200 if (session_.options_.validate_http_headers) {
201 QUICHE_VLOG(2) << "instantiating regular header validator";
202 validator_ = std::make_unique<HeaderValidator>();
203 if (session_.options_.validate_path) {
204 validator_->SetValidatePath();
205 }
206 if (session_.options_.allow_fragment_in_path) {
207 validator_->SetAllowFragmentInPath();
208 }
209 if (session_.options_.allow_different_host_and_authority) {
210 validator_->SetAllowDifferentHostAndAuthority();
211 }
212 } else {
213 QUICHE_VLOG(2) << "instantiating noop header validator";
214 validator_ = std::make_unique<NoopHeaderValidator>();
215 }
216 }
217
OnHeaderBlockStart()218 void OgHttp2Session::PassthroughHeadersHandler::OnHeaderBlockStart() {
219 Reset();
220 const bool status = visitor_.OnBeginHeadersForStream(stream_id_);
221 if (!status) {
222 QUICHE_VLOG(1)
223 << "Visitor rejected header block, returning HEADER_CONNECTION_ERROR";
224 SetResult(Http2VisitorInterface::HEADER_CONNECTION_ERROR);
225 }
226 validator_->StartHeaderBlock();
227 }
228
InterpretHeaderStatus(HeaderValidator::HeaderStatus status)229 Http2VisitorInterface::OnHeaderResult InterpretHeaderStatus(
230 HeaderValidator::HeaderStatus status) {
231 switch (status) {
232 case HeaderValidator::HEADER_OK:
233 case HeaderValidator::HEADER_SKIP:
234 return Http2VisitorInterface::HEADER_OK;
235 case HeaderValidator::HEADER_FIELD_INVALID:
236 return Http2VisitorInterface::HEADER_FIELD_INVALID;
237 case HeaderValidator::HEADER_FIELD_TOO_LONG:
238 return Http2VisitorInterface::HEADER_RST_STREAM;
239 }
240 return Http2VisitorInterface::HEADER_CONNECTION_ERROR;
241 }
242
OnHeader(absl::string_view key,absl::string_view value)243 void OgHttp2Session::PassthroughHeadersHandler::OnHeader(
244 absl::string_view key, absl::string_view value) {
245 if (error_encountered_) {
246 QUICHE_VLOG(2) << "Early return; status not HEADER_OK";
247 return;
248 }
249 const HeaderValidator::HeaderStatus validation_result =
250 validator_->ValidateSingleHeader(key, value);
251 if (validation_result == HeaderValidator::HEADER_SKIP) {
252 return;
253 }
254 if (validation_result != HeaderValidator::HEADER_OK) {
255 QUICHE_VLOG(2) << "Header validation failed with result "
256 << static_cast<int>(validation_result);
257 SetResult(InterpretHeaderStatus(validation_result));
258 return;
259 }
260 const Http2VisitorInterface::OnHeaderResult result =
261 visitor_.OnHeaderForStream(stream_id_, key, value);
262 SetResult(result);
263 }
264
OnHeaderBlockEnd(size_t,size_t)265 void OgHttp2Session::PassthroughHeadersHandler::OnHeaderBlockEnd(
266 size_t /* uncompressed_header_bytes */,
267 size_t /* compressed_header_bytes */) {
268 if (error_encountered_) {
269 // The error has already been handled.
270 return;
271 }
272 if (!validator_->FinishHeaderBlock(type_)) {
273 QUICHE_VLOG(1) << "FinishHeaderBlock returned false; returning "
274 << "HEADER_HTTP_MESSAGING";
275 SetResult(Http2VisitorInterface::HEADER_HTTP_MESSAGING);
276 return;
277 }
278 if (frame_contains_fin_ && IsResponse(type_) &&
279 StatusIs1xx(status_header())) {
280 QUICHE_VLOG(1) << "Unexpected end of stream without final headers";
281 SetResult(Http2VisitorInterface::HEADER_HTTP_MESSAGING);
282 return;
283 }
284 const bool result = visitor_.OnEndHeadersForStream(stream_id_);
285 if (!result) {
286 session_.fatal_visitor_callback_failure_ = true;
287 session_.decoder_.StopProcessing();
288 }
289 }
290
291 // TODO(diannahu): Add checks for request methods.
CanReceiveBody() const292 bool OgHttp2Session::PassthroughHeadersHandler::CanReceiveBody() const {
293 switch (header_type()) {
294 case HeaderType::REQUEST_TRAILER:
295 case HeaderType::RESPONSE_TRAILER:
296 case HeaderType::RESPONSE_100:
297 return false;
298 case HeaderType::RESPONSE:
299 // 304 responses should not have a body:
300 // https://httpwg.org/specs/rfc7230.html#rfc.section.3.3.2
301 // Neither should 204 responses:
302 // https://httpwg.org/specs/rfc7231.html#rfc.section.6.3.5
303 return status_header() != "304" && status_header() != "204";
304 case HeaderType::REQUEST:
305 return true;
306 }
307 return true;
308 }
309
SetResult(Http2VisitorInterface::OnHeaderResult result)310 void OgHttp2Session::PassthroughHeadersHandler::SetResult(
311 Http2VisitorInterface::OnHeaderResult result) {
312 if (result != Http2VisitorInterface::HEADER_OK) {
313 error_encountered_ = true;
314 session_.OnHeaderStatus(stream_id_, result);
315 }
316 }
317
318 // A visitor that extracts an int64_t from each type of a ProcessBytesResult.
319 struct OgHttp2Session::ProcessBytesResultVisitor {
operator ()http2::adapter::OgHttp2Session::ProcessBytesResultVisitor320 int64_t operator()(const int64_t bytes) const { return bytes; }
321
operator ()http2::adapter::OgHttp2Session::ProcessBytesResultVisitor322 int64_t operator()(const ProcessBytesError error) const {
323 switch (error) {
324 case ProcessBytesError::kUnspecified:
325 return -1;
326 case ProcessBytesError::kInvalidConnectionPreface:
327 return -903; // NGHTTP2_ERR_BAD_CLIENT_MAGIC
328 case ProcessBytesError::kVisitorCallbackFailed:
329 return -902; // NGHTTP2_ERR_CALLBACK_FAILURE
330 }
331 return -1;
332 }
333 };
334
OgHttp2Session(Http2VisitorInterface & visitor,Options options)335 OgHttp2Session::OgHttp2Session(Http2VisitorInterface& visitor, Options options)
336 : visitor_(visitor),
337 options_(options),
338 event_forwarder_([this]() { return !latched_error_; }, *this),
339 receive_logger_(
340 &event_forwarder_, TracePerspectiveAsString(options.perspective),
__anond62579530302() 341 [logging_enabled = GetQuicheFlag(quiche_oghttp2_debug_trace)]() {
342 return logging_enabled;
343 },
344 this),
345 send_logger_(
346 TracePerspectiveAsString(options.perspective),
__anond62579530402() 347 [logging_enabled = GetQuicheFlag(quiche_oghttp2_debug_trace)]() {
348 return logging_enabled;
349 },
350 this),
351 headers_handler_(*this, visitor),
352 noop_headers_handler_(/*listener=*/nullptr),
353 connection_window_manager_(
354 kInitialFlowControlWindowSize,
__anond62579530502(size_t window_update_delta) 355 [this](size_t window_update_delta) {
356 SendWindowUpdate(kConnectionStreamId, window_update_delta);
357 },
358 options.should_window_update_fn,
359 /*update_window_on_notify=*/false),
360 max_outbound_concurrent_streams_(
361 options.remote_max_concurrent_streams.value_or(100u)) {
362 decoder_.set_visitor(&receive_logger_);
363 if (options_.max_header_list_bytes) {
364 // Limit buffering of encoded HPACK data to 2x the decoded limit.
365 decoder_.GetHpackDecoder().set_max_decode_buffer_size_bytes(
366 2 * *options_.max_header_list_bytes);
367 // Limit the total bytes accepted for HPACK decoding to 4x the limit.
368 decoder_.GetHpackDecoder().set_max_header_block_bytes(
369 4 * *options_.max_header_list_bytes);
370 }
371 if (IsServerSession()) {
372 remaining_preface_ = {spdy::kHttp2ConnectionHeaderPrefix,
373 spdy::kHttp2ConnectionHeaderPrefixSize};
374 }
375 if (options_.max_header_field_size.has_value()) {
376 headers_handler_.SetMaxFieldSize(*options_.max_header_field_size);
377 }
378 headers_handler_.SetAllowObsText(options_.allow_obs_text);
379 if (!options_.crumble_cookies) {
380 // As seen in https://github.com/envoyproxy/envoy/issues/32611, some HTTP/2
381 // endpoints don't properly handle multiple `Cookie` header fields.
382 framer_.GetHpackEncoder()->DisableCookieCrumbling();
383 }
384 }
385
~OgHttp2Session()386 OgHttp2Session::~OgHttp2Session() {}
387
SetStreamUserData(Http2StreamId stream_id,void * user_data)388 void OgHttp2Session::SetStreamUserData(Http2StreamId stream_id,
389 void* user_data) {
390 auto it = stream_map_.find(stream_id);
391 if (it != stream_map_.end()) {
392 it->second.user_data = user_data;
393 }
394 }
395
GetStreamUserData(Http2StreamId stream_id)396 void* OgHttp2Session::GetStreamUserData(Http2StreamId stream_id) {
397 auto it = stream_map_.find(stream_id);
398 if (it != stream_map_.end()) {
399 return it->second.user_data;
400 }
401 auto p = pending_streams_.find(stream_id);
402 if (p != pending_streams_.end()) {
403 return p->second.user_data;
404 }
405 return nullptr;
406 }
407
ResumeStream(Http2StreamId stream_id)408 bool OgHttp2Session::ResumeStream(Http2StreamId stream_id) {
409 auto it = stream_map_.find(stream_id);
410 if (it == stream_map_.end() || !HasMoreData(it->second) ||
411 !write_scheduler_.StreamRegistered(stream_id)) {
412 return false;
413 }
414 it->second.data_deferred = false;
415 write_scheduler_.MarkStreamReady(stream_id, /*add_to_front=*/false);
416 return true;
417 }
418
GetStreamSendWindowSize(Http2StreamId stream_id) const419 int OgHttp2Session::GetStreamSendWindowSize(Http2StreamId stream_id) const {
420 auto it = stream_map_.find(stream_id);
421 if (it != stream_map_.end()) {
422 return it->second.send_window;
423 }
424 return -1;
425 }
426
GetStreamReceiveWindowLimit(Http2StreamId stream_id) const427 int OgHttp2Session::GetStreamReceiveWindowLimit(Http2StreamId stream_id) const {
428 auto it = stream_map_.find(stream_id);
429 if (it != stream_map_.end()) {
430 return it->second.window_manager.WindowSizeLimit();
431 }
432 return -1;
433 }
434
GetStreamReceiveWindowSize(Http2StreamId stream_id) const435 int OgHttp2Session::GetStreamReceiveWindowSize(Http2StreamId stream_id) const {
436 auto it = stream_map_.find(stream_id);
437 if (it != stream_map_.end()) {
438 return it->second.window_manager.CurrentWindowSize();
439 }
440 return -1;
441 }
442
GetReceiveWindowSize() const443 int OgHttp2Session::GetReceiveWindowSize() const {
444 return connection_window_manager_.CurrentWindowSize();
445 }
446
GetHpackEncoderDynamicTableSize() const447 int OgHttp2Session::GetHpackEncoderDynamicTableSize() const {
448 const spdy::HpackEncoder* encoder = framer_.GetHpackEncoder();
449 return encoder == nullptr ? 0 : encoder->GetDynamicTableSize();
450 }
451
GetHpackEncoderDynamicTableCapacity() const452 int OgHttp2Session::GetHpackEncoderDynamicTableCapacity() const {
453 const spdy::HpackEncoder* encoder = framer_.GetHpackEncoder();
454 return encoder == nullptr ? kDefaultHpackTableCapacity
455 : encoder->CurrentHeaderTableSizeSetting();
456 }
457
GetHpackDecoderDynamicTableSize() const458 int OgHttp2Session::GetHpackDecoderDynamicTableSize() const {
459 return decoder_.GetHpackDecoder().GetDynamicTableSize();
460 }
461
GetHpackDecoderSizeLimit() const462 int OgHttp2Session::GetHpackDecoderSizeLimit() const {
463 return decoder_.GetHpackDecoder().GetCurrentHeaderTableSizeSetting();
464 }
465
ProcessBytes(absl::string_view bytes)466 int64_t OgHttp2Session::ProcessBytes(absl::string_view bytes) {
467 QUICHE_VLOG(2) << TracePerspectiveAsString(options_.perspective)
468 << " processing [" << absl::CEscape(bytes) << "]";
469 return absl::visit(ProcessBytesResultVisitor(), ProcessBytesImpl(bytes));
470 }
471
472 absl::variant<int64_t, OgHttp2Session::ProcessBytesError>
ProcessBytesImpl(absl::string_view bytes)473 OgHttp2Session::ProcessBytesImpl(absl::string_view bytes) {
474 if (processing_bytes_) {
475 QUICHE_VLOG(1) << "Returning early; already processing bytes.";
476 return 0;
477 }
478 processing_bytes_ = true;
479 auto cleanup = absl::MakeCleanup([this]() { processing_bytes_ = false; });
480
481 if (options_.blackhole_data_on_connection_error && latched_error_) {
482 return static_cast<int64_t>(bytes.size());
483 }
484
485 int64_t preface_consumed = 0;
486 if (!remaining_preface_.empty()) {
487 QUICHE_VLOG(2) << "Preface bytes remaining: " << remaining_preface_.size();
488 // decoder_ does not understand the client connection preface.
489 size_t min_size = std::min(remaining_preface_.size(), bytes.size());
490 if (!absl::StartsWith(remaining_preface_, bytes.substr(0, min_size))) {
491 // Preface doesn't match!
492 QUICHE_DLOG(INFO) << "Preface doesn't match! Expected: ["
493 << absl::CEscape(remaining_preface_) << "], actual: ["
494 << absl::CEscape(bytes) << "]";
495 LatchErrorAndNotify(Http2ErrorCode::PROTOCOL_ERROR,
496 ConnectionError::kInvalidConnectionPreface);
497 return ProcessBytesError::kInvalidConnectionPreface;
498 }
499 remaining_preface_.remove_prefix(min_size);
500 bytes.remove_prefix(min_size);
501 if (!remaining_preface_.empty()) {
502 QUICHE_VLOG(2) << "Preface bytes remaining: "
503 << remaining_preface_.size();
504 return static_cast<int64_t>(min_size);
505 }
506 preface_consumed = min_size;
507 }
508 int64_t result = decoder_.ProcessInput(bytes.data(), bytes.size());
509 QUICHE_VLOG(2) << "ProcessBytes result: " << result;
510 if (fatal_visitor_callback_failure_) {
511 QUICHE_DCHECK(latched_error_);
512 QUICHE_VLOG(2) << "Visitor callback failed while processing bytes.";
513 return ProcessBytesError::kVisitorCallbackFailed;
514 }
515 if (latched_error_ || result < 0) {
516 QUICHE_VLOG(2) << "ProcessBytes encountered an error.";
517 if (options_.blackhole_data_on_connection_error) {
518 return static_cast<int64_t>(bytes.size() + preface_consumed);
519 } else {
520 return ProcessBytesError::kUnspecified;
521 }
522 }
523 return result + preface_consumed;
524 }
525
Consume(Http2StreamId stream_id,size_t num_bytes)526 int OgHttp2Session::Consume(Http2StreamId stream_id, size_t num_bytes) {
527 auto it = stream_map_.find(stream_id);
528 if (it == stream_map_.end()) {
529 QUICHE_LOG(ERROR) << "Stream " << stream_id << " not found when consuming "
530 << num_bytes << " bytes";
531 } else {
532 it->second.window_manager.MarkDataFlushed(num_bytes);
533 }
534 connection_window_manager_.MarkDataFlushed(num_bytes);
535 return 0; // Remove?
536 }
537
StartGracefulShutdown()538 void OgHttp2Session::StartGracefulShutdown() {
539 if (IsServerSession()) {
540 if (!queued_goaway_) {
541 EnqueueFrame(std::make_unique<spdy::SpdyGoAwayIR>(
542 std::numeric_limits<int32_t>::max(), spdy::ERROR_CODE_NO_ERROR,
543 "graceful_shutdown"));
544 }
545 } else {
546 QUICHE_LOG(ERROR) << "Graceful shutdown not needed for clients.";
547 }
548 }
549
EnqueueFrame(std::unique_ptr<spdy::SpdyFrameIR> frame)550 void OgHttp2Session::EnqueueFrame(std::unique_ptr<spdy::SpdyFrameIR> frame) {
551 if (queued_immediate_goaway_) {
552 // Do not allow additional frames to be enqueued after the GOAWAY.
553 return;
554 }
555
556 const bool is_non_ack_settings = IsNonAckSettings(*frame);
557 MaybeSetupPreface(is_non_ack_settings);
558
559 if (frame->frame_type() == spdy::SpdyFrameType::GOAWAY) {
560 queued_goaway_ = true;
561 if (latched_error_) {
562 PrepareForImmediateGoAway();
563 }
564 } else if (frame->fin() ||
565 frame->frame_type() == spdy::SpdyFrameType::RST_STREAM) {
566 auto iter = stream_map_.find(frame->stream_id());
567 if (iter != stream_map_.end()) {
568 iter->second.half_closed_local = true;
569 }
570 if (frame->frame_type() == spdy::SpdyFrameType::RST_STREAM) {
571 // TODO(diannahu): Condition on existence in the stream map?
572 streams_reset_.insert(frame->stream_id());
573 }
574 } else if (frame->frame_type() == spdy::SpdyFrameType::WINDOW_UPDATE) {
575 UpdateReceiveWindow(
576 frame->stream_id(),
577 reinterpret_cast<spdy::SpdyWindowUpdateIR&>(*frame).delta());
578 } else if (is_non_ack_settings) {
579 HandleOutboundSettings(
580 *reinterpret_cast<spdy::SpdySettingsIR*>(frame.get()));
581 }
582 if (frame->stream_id() != 0) {
583 auto result = queued_frames_.insert({frame->stream_id(), 1});
584 if (!result.second) {
585 ++(result.first->second);
586 }
587 }
588 frames_.push_back(std::move(frame));
589 }
590
Send()591 int OgHttp2Session::Send() {
592 if (sending_) {
593 QUICHE_VLOG(1) << TracePerspectiveAsString(options_.perspective)
594 << " returning early; already sending.";
595 return 0;
596 }
597 sending_ = true;
598 auto cleanup = absl::MakeCleanup([this]() { sending_ = false; });
599
600 if (fatal_send_error_) {
601 return kSendError;
602 }
603
604 MaybeSetupPreface(/*sending_outbound_settings=*/false);
605
606 SendResult continue_writing = SendQueuedFrames();
607 if (queued_immediate_goaway_) {
608 // If an immediate GOAWAY was queued, then the above flush either sent the
609 // GOAWAY or buffered it to be sent on the next successful flush. In either
610 // case, return early here to avoid sending other frames.
611 return InterpretSendResult(continue_writing);
612 }
613 // Notify on new/pending streams closed due to GOAWAY receipt.
614 CloseGoAwayRejectedStreams();
615 // Wake streams for writes.
616 while (continue_writing == SendResult::SEND_OK && HasReadyStream()) {
617 const Http2StreamId stream_id = GetNextReadyStream();
618 // TODO(birenroy): Add a return value to indicate write blockage, so streams
619 // aren't woken unnecessarily.
620 QUICHE_VLOG(1) << "Waking stream " << stream_id << " for writes.";
621 continue_writing = WriteForStream(stream_id);
622 }
623 if (continue_writing == SendResult::SEND_OK) {
624 continue_writing = SendQueuedFrames();
625 }
626 return InterpretSendResult(continue_writing);
627 }
628
InterpretSendResult(SendResult result)629 int OgHttp2Session::InterpretSendResult(SendResult result) {
630 if (result == SendResult::SEND_ERROR) {
631 fatal_send_error_ = true;
632 return kSendError;
633 } else {
634 return 0;
635 }
636 }
637
HasReadyStream() const638 bool OgHttp2Session::HasReadyStream() const {
639 return !trailers_ready_.empty() ||
640 (write_scheduler_.HasReadyStreams() && connection_send_window_ > 0);
641 }
642
GetNextReadyStream()643 Http2StreamId OgHttp2Session::GetNextReadyStream() {
644 QUICHE_DCHECK(HasReadyStream());
645 if (!trailers_ready_.empty()) {
646 const Http2StreamId stream_id = *trailers_ready_.begin();
647 // WriteForStream() will re-mark the stream as ready, if necessary.
648 write_scheduler_.MarkStreamNotReady(stream_id);
649 trailers_ready_.erase(trailers_ready_.begin());
650 return stream_id;
651 }
652 return write_scheduler_.PopNextReadyStream();
653 }
654
SubmitRequestInternal(absl::Span<const Header> headers,std::unique_ptr<DataFrameSource> data_source,void * user_data)655 int32_t OgHttp2Session::SubmitRequestInternal(
656 absl::Span<const Header> headers,
657 std::unique_ptr<DataFrameSource> data_source, void* user_data) {
658 // TODO(birenroy): return an error for the incorrect perspective
659 const Http2StreamId stream_id = next_stream_id_;
660 next_stream_id_ += 2;
661 if (!pending_streams_.empty() || !CanCreateStream()) {
662 // TODO(diannahu): There should probably be a limit to the number of allowed
663 // pending streams.
664 pending_streams_.insert(
665 {stream_id, PendingStreamState{ToHeaderBlock(headers),
666 std::move(data_source), user_data}});
667 StartPendingStreams();
668 } else {
669 StartRequest(stream_id, ToHeaderBlock(headers), std::move(data_source),
670 user_data);
671 }
672 return stream_id;
673 }
674
SubmitResponseInternal(Http2StreamId stream_id,absl::Span<const Header> headers,std::unique_ptr<DataFrameSource> data_source)675 int OgHttp2Session::SubmitResponseInternal(
676 Http2StreamId stream_id, absl::Span<const Header> headers,
677 std::unique_ptr<DataFrameSource> data_source) {
678 // TODO(birenroy): return an error for the incorrect perspective
679 auto iter = stream_map_.find(stream_id);
680 if (iter == stream_map_.end()) {
681 QUICHE_LOG(ERROR) << "Unable to find stream " << stream_id;
682 return -501; // NGHTTP2_ERR_INVALID_ARGUMENT
683 }
684 const bool end_stream = data_source == nullptr;
685 if (!end_stream) {
686 // Add data source to stream state
687 iter->second.outbound_body = std::move(data_source);
688 write_scheduler_.MarkStreamReady(stream_id, false);
689 }
690 SendHeaders(stream_id, ToHeaderBlock(headers), end_stream);
691 return 0;
692 }
693
MaybeSendBufferedData()694 OgHttp2Session::SendResult OgHttp2Session::MaybeSendBufferedData() {
695 int64_t result = std::numeric_limits<int64_t>::max();
696 while (result > 0 && !buffered_data_.empty()) {
697 result = visitor_.OnReadyToSend(buffered_data_);
698 if (result > 0) {
699 buffered_data_.erase(0, result);
700 }
701 }
702 if (result < 0) {
703 LatchErrorAndNotify(Http2ErrorCode::INTERNAL_ERROR,
704 ConnectionError::kSendError);
705 return SendResult::SEND_ERROR;
706 }
707 return buffered_data_.empty() ? SendResult::SEND_OK
708 : SendResult::SEND_BLOCKED;
709 }
710
SendQueuedFrames()711 OgHttp2Session::SendResult OgHttp2Session::SendQueuedFrames() {
712 // Flush any serialized prefix.
713 if (const SendResult result = MaybeSendBufferedData();
714 result != SendResult::SEND_OK) {
715 return result;
716 }
717 // Serialize and send frames in the queue.
718 while (!frames_.empty()) {
719 const auto& frame_ptr = frames_.front();
720 FrameAttributeCollector c;
721 frame_ptr->Visit(&c);
722
723 // DATA frames should never be queued.
724 QUICHE_DCHECK_NE(c.frame_type(), 0);
725
726 const bool stream_reset =
727 c.stream_id() != 0 && streams_reset_.count(c.stream_id()) > 0;
728 if (stream_reset &&
729 c.frame_type() != static_cast<uint8_t>(FrameType::RST_STREAM)) {
730 // The stream has been reset, so any other remaining frames can be
731 // skipped.
732 // TODO(birenroy): inform the visitor of frames that are skipped.
733 DecrementQueuedFrameCount(c.stream_id(), c.frame_type());
734 frames_.pop_front();
735 continue;
736 } else if (!IsServerSession() && received_goaway_ &&
737 c.stream_id() >
738 static_cast<uint32_t>(received_goaway_stream_id_)) {
739 // This frame will be ignored by the server, so don't send it. The stream
740 // associated with this frame should have been closed in OnGoAway().
741 frames_.pop_front();
742 continue;
743 }
744 // Frames can't accurately report their own length; the actual serialized
745 // length must be used instead.
746 spdy::SpdySerializedFrame frame = framer_.SerializeFrame(*frame_ptr);
747 const size_t frame_payload_length = frame.size() - spdy::kFrameHeaderSize;
748 frame_ptr->Visit(&send_logger_);
749 visitor_.OnBeforeFrameSent(c.frame_type(), c.stream_id(),
750 frame_payload_length, c.flags());
751 const int64_t result = visitor_.OnReadyToSend(absl::string_view(frame));
752 if (result < 0) {
753 LatchErrorAndNotify(Http2ErrorCode::INTERNAL_ERROR,
754 ConnectionError::kSendError);
755 return SendResult::SEND_ERROR;
756 } else if (result == 0) {
757 // Write blocked.
758 return SendResult::SEND_BLOCKED;
759 } else {
760 frames_.pop_front();
761
762 const bool ok =
763 AfterFrameSent(c.frame_type(), c.stream_id(), frame_payload_length,
764 c.flags(), c.error_code());
765 if (!ok) {
766 LatchErrorAndNotify(Http2ErrorCode::INTERNAL_ERROR,
767 ConnectionError::kSendError);
768 return SendResult::SEND_ERROR;
769 }
770 if (static_cast<size_t>(result) < frame.size()) {
771 // The frame was partially written, so the rest must be buffered.
772 buffered_data_.append(frame.data() + result, frame.size() - result);
773 return SendResult::SEND_BLOCKED;
774 }
775 }
776 }
777 return SendResult::SEND_OK;
778 }
779
AfterFrameSent(uint8_t frame_type_int,uint32_t stream_id,size_t payload_length,uint8_t flags,uint32_t error_code)780 bool OgHttp2Session::AfterFrameSent(uint8_t frame_type_int, uint32_t stream_id,
781 size_t payload_length, uint8_t flags,
782 uint32_t error_code) {
783 const FrameType frame_type = static_cast<FrameType>(frame_type_int);
784 int result = visitor_.OnFrameSent(frame_type_int, stream_id, payload_length,
785 flags, error_code);
786 if (result < 0) {
787 return false;
788 }
789 if (stream_id == 0) {
790 if (frame_type == FrameType::SETTINGS) {
791 const bool is_settings_ack = (flags & ACK_FLAG);
792 if (is_settings_ack && encoder_header_table_capacity_when_acking_) {
793 framer_.UpdateHeaderEncoderTableSize(
794 *encoder_header_table_capacity_when_acking_);
795 encoder_header_table_capacity_when_acking_ = std::nullopt;
796 } else if (!is_settings_ack) {
797 sent_non_ack_settings_ = true;
798 }
799 }
800 return true;
801 }
802
803 const bool contains_fin =
804 (frame_type == FrameType::DATA || frame_type == FrameType::HEADERS) &&
805 (flags & END_STREAM_FLAG) == END_STREAM_FLAG;
806 auto it = stream_map_.find(stream_id);
807 const bool still_open_remote =
808 it != stream_map_.end() && !it->second.half_closed_remote;
809 if (contains_fin && still_open_remote &&
810 options_.rst_stream_no_error_when_incomplete && IsServerSession()) {
811 // Since the peer has not yet ended the stream, this endpoint should
812 // send a RST_STREAM NO_ERROR. See RFC 7540 Section 8.1.
813 frames_.push_front(std::make_unique<spdy::SpdyRstStreamIR>(
814 stream_id, spdy::SpdyErrorCode::ERROR_CODE_NO_ERROR));
815 auto queued_result = queued_frames_.insert({stream_id, 1});
816 if (!queued_result.second) {
817 ++(queued_result.first->second);
818 }
819 it->second.half_closed_remote = true;
820 }
821
822 DecrementQueuedFrameCount(stream_id, frame_type_int);
823 return true;
824 }
825
WriteForStream(Http2StreamId stream_id)826 OgHttp2Session::SendResult OgHttp2Session::WriteForStream(
827 Http2StreamId stream_id) {
828 auto it = stream_map_.find(stream_id);
829 if (it == stream_map_.end()) {
830 QUICHE_LOG(ERROR) << "Can't find stream " << stream_id
831 << " which is ready to write!";
832 return SendResult::SEND_OK;
833 }
834 StreamState& state = it->second;
835 auto reset_it = streams_reset_.find(stream_id);
836 if (reset_it != streams_reset_.end()) {
837 // The stream has been reset; there's no point in sending DATA or trailing
838 // HEADERS.
839 AbandonData(state);
840 state.trailers = nullptr;
841 return SendResult::SEND_OK;
842 }
843
844 SendResult connection_can_write = SendResult::SEND_OK;
845 if (!IsReadyToWriteData(state)) {
846 // No data to send, but there might be trailers.
847 if (state.trailers != nullptr) {
848 // Trailers will include END_STREAM, so the data source can be discarded.
849 // Since data_deferred is true, there is no data waiting to be flushed for
850 // this stream.
851 AbandonData(state);
852 auto block_ptr = std::move(state.trailers);
853 if (state.half_closed_local) {
854 QUICHE_LOG(ERROR) << "Sent fin; can't send trailers.";
855
856 // TODO(birenroy,diannahu): Consider queuing a RST_STREAM INTERNAL_ERROR
857 // instead.
858 CloseStream(stream_id, Http2ErrorCode::INTERNAL_ERROR);
859 } else {
860 SendTrailers(stream_id, std::move(*block_ptr));
861 }
862 }
863 return SendResult::SEND_OK;
864 }
865 int32_t available_window =
866 std::min({connection_send_window_, state.send_window,
867 static_cast<int32_t>(max_frame_payload_)});
868 while (connection_can_write == SendResult::SEND_OK && available_window > 0 &&
869 IsReadyToWriteData(state)) {
870 DataFrameInfo info = GetDataFrameInfo(stream_id, available_window, state);
871 QUICHE_VLOG(2) << "WriteForStream | length: " << info.payload_length
872 << " end_data: " << info.end_data
873 << " trailers: " << state.trailers.get();
874 if (info.payload_length == 0 && !info.end_data &&
875 state.trailers == nullptr) {
876 // An unproductive call to SelectPayloadLength() results in this stream
877 // entering the "deferred" state only if no trailers are available to
878 // send.
879 state.data_deferred = true;
880 break;
881 } else if (info.payload_length == DataFrameSource::kError) {
882 // TODO(birenroy,diannahu): Consider queuing a RST_STREAM INTERNAL_ERROR
883 // instead.
884 CloseStream(stream_id, Http2ErrorCode::INTERNAL_ERROR);
885 // No more work on the stream; it has been closed.
886 break;
887 }
888 if (info.payload_length > 0 || info.send_fin) {
889 spdy::SpdyDataIR data(stream_id);
890 data.set_fin(info.send_fin);
891 data.SetDataShallow(info.payload_length);
892 spdy::SpdySerializedFrame header =
893 spdy::SpdyFramer::SerializeDataFrameHeaderWithPaddingLengthField(
894 data);
895 QUICHE_DCHECK(buffered_data_.empty() && frames_.empty());
896 data.Visit(&send_logger_);
897 const bool success = SendDataFrame(stream_id, absl::string_view(header),
898 info.payload_length, state);
899 if (!success) {
900 connection_can_write = SendResult::SEND_BLOCKED;
901 break;
902 }
903 connection_send_window_ -= info.payload_length;
904 state.send_window -= info.payload_length;
905 available_window = std::min({connection_send_window_, state.send_window,
906 static_cast<int32_t>(max_frame_payload_)});
907 if (info.send_fin) {
908 state.half_closed_local = true;
909 MaybeFinWithRstStream(it);
910 }
911 const bool ok =
912 AfterFrameSent(/* DATA */ 0, stream_id, info.payload_length,
913 info.send_fin ? END_STREAM_FLAG : 0x0, 0);
914 if (!ok) {
915 LatchErrorAndNotify(Http2ErrorCode::INTERNAL_ERROR,
916 ConnectionError::kSendError);
917 return SendResult::SEND_ERROR;
918 }
919 if (!stream_map_.contains(stream_id)) {
920 // Note: the stream may have been closed if `fin` is true.
921 break;
922 }
923 }
924 if (info.end_data ||
925 (info.payload_length == 0 && state.trailers != nullptr)) {
926 // If SelectPayloadLength() returned {0, false}, and there are trailers to
927 // send, it's okay to send the trailers.
928 if (state.trailers != nullptr) {
929 auto block_ptr = std::move(state.trailers);
930 if (info.send_fin) {
931 QUICHE_LOG(ERROR) << "Sent fin; can't send trailers.";
932
933 // TODO(birenroy,diannahu): Consider queuing a RST_STREAM
934 // INTERNAL_ERROR instead.
935 CloseStream(stream_id, Http2ErrorCode::INTERNAL_ERROR);
936 // No more work on this stream; it has been closed.
937 break;
938 } else {
939 SendTrailers(stream_id, std::move(*block_ptr));
940 }
941 }
942 AbandonData(state);
943 }
944 }
945 // If the stream still exists and has data to send, it should be marked as
946 // ready in the write scheduler.
947 if (stream_map_.contains(stream_id) && !state.data_deferred &&
948 state.send_window > 0 && HasMoreData(state)) {
949 write_scheduler_.MarkStreamReady(stream_id, false);
950 }
951 // Streams can continue writing as long as the connection is not write-blocked
952 // and there is additional flow control quota available.
953 if (connection_can_write != SendResult::SEND_OK) {
954 return connection_can_write;
955 }
956 return connection_send_window_ <= 0 ? SendResult::SEND_BLOCKED
957 : SendResult::SEND_OK;
958 }
959
SerializeMetadata(Http2StreamId stream_id,std::unique_ptr<MetadataSource> source)960 void OgHttp2Session::SerializeMetadata(Http2StreamId stream_id,
961 std::unique_ptr<MetadataSource> source) {
962 const uint32_t max_payload_size =
963 std::min(kMaxAllowedMetadataFrameSize, max_frame_payload_);
964 auto payload_buffer = std::make_unique<uint8_t[]>(max_payload_size);
965
966 while (true) {
967 auto [written, end_metadata] =
968 source->Pack(payload_buffer.get(), max_payload_size);
969 if (written < 0) {
970 // Unable to pack any metadata.
971 return;
972 }
973 QUICHE_DCHECK_LE(static_cast<size_t>(written), max_payload_size);
974 auto payload = absl::string_view(
975 reinterpret_cast<const char*>(payload_buffer.get()), written);
976 EnqueueFrame(std::make_unique<spdy::SpdyUnknownIR>(
977 stream_id, kMetadataFrameType, end_metadata ? kMetadataEndFlag : 0u,
978 std::string(payload)));
979 if (end_metadata) {
980 return;
981 }
982 }
983 }
984
SubmitRequest(absl::Span<const Header> headers,std::unique_ptr<DataFrameSource> data_source,void * user_data)985 int32_t OgHttp2Session::SubmitRequest(
986 absl::Span<const Header> headers,
987 std::unique_ptr<DataFrameSource> data_source, void* user_data) {
988 return SubmitRequestInternal(headers, std::move(data_source), user_data);
989 }
990
SubmitResponse(Http2StreamId stream_id,absl::Span<const Header> headers,std::unique_ptr<DataFrameSource> data_source)991 int OgHttp2Session::SubmitResponse(
992 Http2StreamId stream_id, absl::Span<const Header> headers,
993 std::unique_ptr<DataFrameSource> data_source) {
994 return SubmitResponseInternal(stream_id, headers, std::move(data_source));
995 }
996
SubmitTrailer(Http2StreamId stream_id,absl::Span<const Header> trailers)997 int OgHttp2Session::SubmitTrailer(Http2StreamId stream_id,
998 absl::Span<const Header> trailers) {
999 // TODO(birenroy): Reject trailers when acting as a client?
1000 auto iter = stream_map_.find(stream_id);
1001 if (iter == stream_map_.end()) {
1002 QUICHE_LOG(ERROR) << "Unable to find stream " << stream_id;
1003 return -501; // NGHTTP2_ERR_INVALID_ARGUMENT
1004 }
1005 StreamState& state = iter->second;
1006 if (state.half_closed_local) {
1007 QUICHE_LOG(ERROR) << "Stream " << stream_id << " is half closed (local)";
1008 return -514; // NGHTTP2_ERR_INVALID_STREAM_STATE
1009 }
1010 if (state.trailers != nullptr) {
1011 QUICHE_LOG(ERROR) << "Stream " << stream_id
1012 << " already has trailers queued";
1013 return -514; // NGHTTP2_ERR_INVALID_STREAM_STATE
1014 }
1015 if (!HasMoreData(state)) {
1016 // Enqueue trailers immediately.
1017 SendTrailers(stream_id, ToHeaderBlock(trailers));
1018 } else {
1019 QUICHE_LOG_IF(ERROR, state.outbound_body->send_fin())
1020 << "DataFrameSource will send fin, preventing trailers!";
1021 // Save trailers so they can be written once data is done.
1022 state.trailers =
1023 std::make_unique<spdy::Http2HeaderBlock>(ToHeaderBlock(trailers));
1024 trailers_ready_.insert(stream_id);
1025 }
1026 return 0;
1027 }
1028
SubmitMetadata(Http2StreamId stream_id,std::unique_ptr<MetadataSource> source)1029 void OgHttp2Session::SubmitMetadata(Http2StreamId stream_id,
1030 std::unique_ptr<MetadataSource> source) {
1031 SerializeMetadata(stream_id, std::move(source));
1032 }
1033
SubmitSettings(absl::Span<const Http2Setting> settings)1034 void OgHttp2Session::SubmitSettings(absl::Span<const Http2Setting> settings) {
1035 auto frame = PrepareSettingsFrame(settings);
1036 EnqueueFrame(std::move(frame));
1037 }
1038
OnError(SpdyFramerError error,std::string detailed_error)1039 void OgHttp2Session::OnError(SpdyFramerError error,
1040 std::string detailed_error) {
1041 QUICHE_VLOG(1) << "Error: "
1042 << http2::Http2DecoderAdapter::SpdyFramerErrorToString(error)
1043 << " details: " << detailed_error;
1044 // TODO(diannahu): Consider propagating `detailed_error`.
1045 LatchErrorAndNotify(GetHttp2ErrorCode(error), ConnectionError::kParseError);
1046 }
1047
OnCommonHeader(spdy::SpdyStreamId stream_id,size_t length,uint8_t type,uint8_t flags)1048 void OgHttp2Session::OnCommonHeader(spdy::SpdyStreamId stream_id, size_t length,
1049 uint8_t type, uint8_t flags) {
1050 current_frame_type_ = type;
1051 highest_received_stream_id_ = std::max(static_cast<Http2StreamId>(stream_id),
1052 highest_received_stream_id_);
1053 if (streams_reset_.contains(stream_id)) {
1054 return;
1055 }
1056 const bool result = visitor_.OnFrameHeader(stream_id, length, type, flags);
1057 if (!result) {
1058 fatal_visitor_callback_failure_ = true;
1059 decoder_.StopProcessing();
1060 }
1061 }
1062
OnDataFrameHeader(spdy::SpdyStreamId stream_id,size_t length,bool)1063 void OgHttp2Session::OnDataFrameHeader(spdy::SpdyStreamId stream_id,
1064 size_t length, bool /*fin*/) {
1065 auto iter = stream_map_.find(stream_id);
1066 if (iter == stream_map_.end() || streams_reset_.contains(stream_id)) {
1067 // The stream does not exist; it could be an error or a benign close, e.g.,
1068 // getting data for a stream this connection recently closed.
1069 if (static_cast<Http2StreamId>(stream_id) > highest_processed_stream_id_) {
1070 // Receiving DATA before HEADERS is a connection error.
1071 LatchErrorAndNotify(Http2ErrorCode::PROTOCOL_ERROR,
1072 ConnectionError::kWrongFrameSequence);
1073 }
1074 return;
1075 }
1076
1077 if (static_cast<int64_t>(length) >
1078 connection_window_manager_.CurrentWindowSize()) {
1079 // Peer exceeded the connection flow control limit.
1080 LatchErrorAndNotify(
1081 Http2ErrorCode::FLOW_CONTROL_ERROR,
1082 Http2VisitorInterface::ConnectionError::kFlowControlError);
1083 return;
1084 }
1085
1086 if (static_cast<int64_t>(length) >
1087 iter->second.window_manager.CurrentWindowSize()) {
1088 // Peer exceeded the stream flow control limit.
1089 EnqueueFrame(std::make_unique<spdy::SpdyRstStreamIR>(
1090 stream_id, spdy::ERROR_CODE_FLOW_CONTROL_ERROR));
1091 return;
1092 }
1093
1094 const bool result = visitor_.OnBeginDataForStream(stream_id, length);
1095 if (!result) {
1096 fatal_visitor_callback_failure_ = true;
1097 decoder_.StopProcessing();
1098 }
1099
1100 if (!iter->second.can_receive_body && length > 0) {
1101 EnqueueFrame(std::make_unique<spdy::SpdyRstStreamIR>(
1102 stream_id, spdy::ERROR_CODE_PROTOCOL_ERROR));
1103 return;
1104 }
1105
1106 // Validate against the content-length if it exists.
1107 if (iter->second.remaining_content_length.has_value()) {
1108 if (length > *iter->second.remaining_content_length) {
1109 HandleContentLengthError(stream_id);
1110 iter->second.remaining_content_length.reset();
1111 } else {
1112 *iter->second.remaining_content_length -= length;
1113 }
1114 }
1115 }
1116
OnStreamFrameData(spdy::SpdyStreamId stream_id,const char * data,size_t len)1117 void OgHttp2Session::OnStreamFrameData(spdy::SpdyStreamId stream_id,
1118 const char* data, size_t len) {
1119 // Count the data against flow control, even if the stream is unknown.
1120 MarkDataBuffered(stream_id, len);
1121
1122 if (!stream_map_.contains(stream_id) || streams_reset_.contains(stream_id)) {
1123 // If the stream was unknown due to a protocol error, the visitor was
1124 // informed in OnDataFrameHeader().
1125 return;
1126 }
1127
1128 const bool result =
1129 visitor_.OnDataForStream(stream_id, absl::string_view(data, len));
1130 if (!result) {
1131 fatal_visitor_callback_failure_ = true;
1132 decoder_.StopProcessing();
1133 }
1134 }
1135
OnStreamEnd(spdy::SpdyStreamId stream_id)1136 void OgHttp2Session::OnStreamEnd(spdy::SpdyStreamId stream_id) {
1137 auto iter = stream_map_.find(stream_id);
1138 if (iter != stream_map_.end()) {
1139 iter->second.half_closed_remote = true;
1140 if (streams_reset_.contains(stream_id)) {
1141 return;
1142 }
1143
1144 // Validate against the content-length if it exists.
1145 if (iter->second.remaining_content_length.has_value() &&
1146 *iter->second.remaining_content_length != 0) {
1147 HandleContentLengthError(stream_id);
1148 return;
1149 }
1150
1151 const bool result = visitor_.OnEndStream(stream_id);
1152 if (!result) {
1153 fatal_visitor_callback_failure_ = true;
1154 decoder_.StopProcessing();
1155 }
1156 }
1157
1158 auto queued_frames_iter = queued_frames_.find(stream_id);
1159 const bool no_queued_frames = queued_frames_iter == queued_frames_.end() ||
1160 queued_frames_iter->second == 0;
1161 if (iter != stream_map_.end() && iter->second.half_closed_local &&
1162 !IsServerSession() && no_queued_frames) {
1163 // From the client's perspective, the stream can be closed if it's already
1164 // half_closed_local.
1165 CloseStream(stream_id, Http2ErrorCode::HTTP2_NO_ERROR);
1166 }
1167 }
1168
OnStreamPadLength(spdy::SpdyStreamId stream_id,size_t value)1169 void OgHttp2Session::OnStreamPadLength(spdy::SpdyStreamId stream_id,
1170 size_t value) {
1171 const size_t padding_length = 1 + value;
1172 const bool result = visitor_.OnDataPaddingLength(stream_id, padding_length);
1173 if (!result) {
1174 fatal_visitor_callback_failure_ = true;
1175 decoder_.StopProcessing();
1176 }
1177 connection_window_manager_.MarkWindowConsumed(padding_length);
1178 if (auto it = stream_map_.find(stream_id); it != stream_map_.end()) {
1179 it->second.window_manager.MarkWindowConsumed(padding_length);
1180 }
1181 }
1182
OnStreamPadding(spdy::SpdyStreamId,size_t)1183 void OgHttp2Session::OnStreamPadding(spdy::SpdyStreamId /*stream_id*/, size_t
1184 /*len*/) {
1185 // Flow control was accounted for in OnStreamPadLength().
1186 // TODO(181586191): Pass padding to the visitor?
1187 }
1188
OnHeaderFrameStart(spdy::SpdyStreamId stream_id)1189 spdy::SpdyHeadersHandlerInterface* OgHttp2Session::OnHeaderFrameStart(
1190 spdy::SpdyStreamId stream_id) {
1191 auto it = stream_map_.find(stream_id);
1192 if (it != stream_map_.end() && !streams_reset_.contains(stream_id)) {
1193 headers_handler_.set_stream_id(stream_id);
1194 headers_handler_.set_header_type(
1195 NextHeaderType(it->second.received_header_type));
1196 return &headers_handler_;
1197 } else {
1198 return &noop_headers_handler_;
1199 }
1200 }
1201
OnHeaderFrameEnd(spdy::SpdyStreamId stream_id)1202 void OgHttp2Session::OnHeaderFrameEnd(spdy::SpdyStreamId stream_id) {
1203 auto it = stream_map_.find(stream_id);
1204 if (it != stream_map_.end()) {
1205 if (headers_handler_.header_type() == HeaderType::RESPONSE &&
1206 !headers_handler_.status_header().empty() &&
1207 headers_handler_.status_header()[0] == '1') {
1208 // If response headers carried a 1xx response code, final response headers
1209 // should still be forthcoming.
1210 headers_handler_.set_header_type(HeaderType::RESPONSE_100);
1211 }
1212 it->second.received_header_type = headers_handler_.header_type();
1213
1214 // Track the content-length if the headers indicate that a body can follow.
1215 it->second.can_receive_body =
1216 headers_handler_.CanReceiveBody() && !it->second.sent_head_method;
1217 if (it->second.can_receive_body) {
1218 it->second.remaining_content_length = headers_handler_.content_length();
1219 }
1220
1221 headers_handler_.set_stream_id(0);
1222 }
1223 }
1224
OnRstStream(spdy::SpdyStreamId stream_id,spdy::SpdyErrorCode error_code)1225 void OgHttp2Session::OnRstStream(spdy::SpdyStreamId stream_id,
1226 spdy::SpdyErrorCode error_code) {
1227 auto iter = stream_map_.find(stream_id);
1228 if (iter != stream_map_.end()) {
1229 iter->second.half_closed_remote = true;
1230 AbandonData(iter->second);
1231 } else if (static_cast<Http2StreamId>(stream_id) >
1232 highest_processed_stream_id_) {
1233 // Receiving RST_STREAM before HEADERS is a connection error.
1234 LatchErrorAndNotify(Http2ErrorCode::PROTOCOL_ERROR,
1235 ConnectionError::kWrongFrameSequence);
1236 return;
1237 }
1238 if (streams_reset_.contains(stream_id)) {
1239 return;
1240 }
1241 visitor_.OnRstStream(stream_id, TranslateErrorCode(error_code));
1242 // TODO(birenroy): Consider whether there are outbound frames queued for the
1243 // stream.
1244 CloseStream(stream_id, TranslateErrorCode(error_code));
1245 }
1246
OnSettings()1247 void OgHttp2Session::OnSettings() {
1248 visitor_.OnSettingsStart();
1249 auto settings = std::make_unique<SpdySettingsIR>();
1250 settings->set_is_ack(true);
1251 EnqueueFrame(std::move(settings));
1252 }
1253
OnSetting(spdy::SpdySettingsId id,uint32_t value)1254 void OgHttp2Session::OnSetting(spdy::SpdySettingsId id, uint32_t value) {
1255 switch (id) {
1256 case HEADER_TABLE_SIZE:
1257 value = std::min(value, HpackCapacityBound(options_));
1258 if (value < framer_.GetHpackEncoder()->CurrentHeaderTableSizeSetting()) {
1259 // Safe to apply a smaller table capacity immediately.
1260 QUICHE_VLOG(2) << TracePerspectiveAsString(options_.perspective)
1261 << " applying encoder table capacity " << value;
1262 framer_.GetHpackEncoder()->ApplyHeaderTableSizeSetting(value);
1263 } else {
1264 QUICHE_VLOG(2)
1265 << TracePerspectiveAsString(options_.perspective)
1266 << " NOT applying encoder table capacity until writing ack: "
1267 << value;
1268 encoder_header_table_capacity_when_acking_ = value;
1269 }
1270 break;
1271 case ENABLE_PUSH:
1272 if (value > 1u) {
1273 visitor_.OnInvalidFrame(
1274 0, Http2VisitorInterface::InvalidFrameError::kProtocol);
1275 // The specification says this is a connection-level protocol error.
1276 LatchErrorAndNotify(
1277 Http2ErrorCode::PROTOCOL_ERROR,
1278 Http2VisitorInterface::ConnectionError::kInvalidSetting);
1279 return;
1280 }
1281 // Aside from validation, this setting is ignored.
1282 break;
1283 case MAX_CONCURRENT_STREAMS:
1284 max_outbound_concurrent_streams_ = value;
1285 if (!IsServerSession()) {
1286 // We may now be able to start pending streams.
1287 StartPendingStreams();
1288 }
1289 break;
1290 case INITIAL_WINDOW_SIZE:
1291 if (value > spdy::kSpdyMaximumWindowSize) {
1292 visitor_.OnInvalidFrame(
1293 0, Http2VisitorInterface::InvalidFrameError::kFlowControl);
1294 // The specification says this is a connection-level flow control error.
1295 LatchErrorAndNotify(
1296 Http2ErrorCode::FLOW_CONTROL_ERROR,
1297 Http2VisitorInterface::ConnectionError::kFlowControlError);
1298 return;
1299 } else {
1300 UpdateStreamSendWindowSizes(value);
1301 }
1302 break;
1303 case MAX_FRAME_SIZE:
1304 if (value < kDefaultFramePayloadSizeLimit ||
1305 value > kMaximumFramePayloadSizeLimit) {
1306 visitor_.OnInvalidFrame(
1307 0, Http2VisitorInterface::InvalidFrameError::kProtocol);
1308 // The specification says this is a connection-level protocol error.
1309 LatchErrorAndNotify(
1310 Http2ErrorCode::PROTOCOL_ERROR,
1311 Http2VisitorInterface::ConnectionError::kInvalidSetting);
1312 return;
1313 }
1314 max_frame_payload_ = value;
1315 break;
1316 case ENABLE_CONNECT_PROTOCOL:
1317 if (value > 1u || (value == 0 && peer_enables_connect_protocol_)) {
1318 visitor_.OnInvalidFrame(
1319 0, Http2VisitorInterface::InvalidFrameError::kProtocol);
1320 LatchErrorAndNotify(
1321 Http2ErrorCode::PROTOCOL_ERROR,
1322 Http2VisitorInterface::ConnectionError::kInvalidSetting);
1323 return;
1324 }
1325 peer_enables_connect_protocol_ = (value == 1u);
1326 break;
1327 default:
1328 // TODO(bnc): See if C++17 inline constants are allowed in QUICHE.
1329 if (id == kMetadataExtensionId) {
1330 peer_supports_metadata_ = (value != 0);
1331 } else {
1332 QUICHE_VLOG(1) << "Unimplemented SETTING id: " << id;
1333 }
1334 }
1335 visitor_.OnSetting({id, value});
1336 }
1337
OnSettingsEnd()1338 void OgHttp2Session::OnSettingsEnd() { visitor_.OnSettingsEnd(); }
1339
OnSettingsAck()1340 void OgHttp2Session::OnSettingsAck() {
1341 if (!settings_ack_callbacks_.empty()) {
1342 SettingsAckCallback callback = std::move(settings_ack_callbacks_.front());
1343 settings_ack_callbacks_.pop_front();
1344 std::move(callback)();
1345 }
1346
1347 visitor_.OnSettingsAck();
1348 }
1349
OnPing(spdy::SpdyPingId unique_id,bool is_ack)1350 void OgHttp2Session::OnPing(spdy::SpdyPingId unique_id, bool is_ack) {
1351 visitor_.OnPing(unique_id, is_ack);
1352 if (options_.auto_ping_ack && !is_ack) {
1353 auto ping = std::make_unique<spdy::SpdyPingIR>(unique_id);
1354 ping->set_is_ack(true);
1355 EnqueueFrame(std::move(ping));
1356 }
1357 }
1358
OnGoAway(spdy::SpdyStreamId last_accepted_stream_id,spdy::SpdyErrorCode error_code)1359 void OgHttp2Session::OnGoAway(spdy::SpdyStreamId last_accepted_stream_id,
1360 spdy::SpdyErrorCode error_code) {
1361 if (received_goaway_ &&
1362 last_accepted_stream_id >
1363 static_cast<spdy::SpdyStreamId>(received_goaway_stream_id_)) {
1364 // This GOAWAY has a higher `last_accepted_stream_id` than a previous
1365 // GOAWAY, a connection-level spec violation.
1366 const bool ok = visitor_.OnInvalidFrame(
1367 kConnectionStreamId,
1368 Http2VisitorInterface::InvalidFrameError::kProtocol);
1369 if (!ok) {
1370 fatal_visitor_callback_failure_ = true;
1371 }
1372 LatchErrorAndNotify(Http2ErrorCode::PROTOCOL_ERROR,
1373 ConnectionError::kInvalidGoAwayLastStreamId);
1374 return;
1375 }
1376
1377 received_goaway_ = true;
1378 received_goaway_stream_id_ = last_accepted_stream_id;
1379 const bool result = visitor_.OnGoAway(last_accepted_stream_id,
1380 TranslateErrorCode(error_code), "");
1381 if (!result) {
1382 fatal_visitor_callback_failure_ = true;
1383 decoder_.StopProcessing();
1384 }
1385
1386 // Close the streams above `last_accepted_stream_id`. Only applies if the
1387 // session receives a GOAWAY as a client, as we do not support server push.
1388 if (last_accepted_stream_id == spdy::kMaxStreamId || IsServerSession()) {
1389 return;
1390 }
1391 std::vector<Http2StreamId> streams_to_close;
1392 for (const auto& [stream_id, stream_state] : stream_map_) {
1393 if (static_cast<spdy::SpdyStreamId>(stream_id) > last_accepted_stream_id) {
1394 streams_to_close.push_back(stream_id);
1395 }
1396 }
1397 for (Http2StreamId stream_id : streams_to_close) {
1398 CloseStream(stream_id, Http2ErrorCode::REFUSED_STREAM);
1399 }
1400 }
1401
OnGoAwayFrameData(const char *,size_t)1402 bool OgHttp2Session::OnGoAwayFrameData(const char* /*goaway_data*/, size_t
1403 /*len*/) {
1404 // Opaque data is currently ignored.
1405 return true;
1406 }
1407
OnHeaders(spdy::SpdyStreamId stream_id,size_t,bool,int,spdy::SpdyStreamId,bool,bool fin,bool)1408 void OgHttp2Session::OnHeaders(spdy::SpdyStreamId stream_id,
1409 size_t /*payload_length*/, bool /*has_priority*/,
1410 int /*weight*/,
1411 spdy::SpdyStreamId /*parent_stream_id*/,
1412 bool /*exclusive*/, bool fin, bool /*end*/) {
1413 if (stream_id % 2 == 0) {
1414 // Server push is disabled; receiving push HEADERS is a connection error.
1415 LatchErrorAndNotify(Http2ErrorCode::PROTOCOL_ERROR,
1416 ConnectionError::kInvalidNewStreamId);
1417 return;
1418 }
1419 headers_handler_.set_frame_contains_fin(fin);
1420 if (IsServerSession()) {
1421 const auto new_stream_id = static_cast<Http2StreamId>(stream_id);
1422 if (stream_map_.find(new_stream_id) != stream_map_.end() && fin) {
1423 // Not a new stream, must be trailers.
1424 return;
1425 }
1426 if (new_stream_id <= highest_processed_stream_id_) {
1427 // A new stream ID lower than the watermark is a connection error.
1428 LatchErrorAndNotify(Http2ErrorCode::PROTOCOL_ERROR,
1429 ConnectionError::kInvalidNewStreamId);
1430 return;
1431 }
1432
1433 if (stream_map_.size() >= max_inbound_concurrent_streams_) {
1434 // The new stream would exceed our advertised and acknowledged
1435 // MAX_CONCURRENT_STREAMS. For parity with nghttp2, treat this error as a
1436 // connection-level PROTOCOL_ERROR.
1437 bool ok = visitor_.OnInvalidFrame(
1438 stream_id, Http2VisitorInterface::InvalidFrameError::kProtocol);
1439 if (!ok) {
1440 fatal_visitor_callback_failure_ = true;
1441 }
1442 LatchErrorAndNotify(Http2ErrorCode::PROTOCOL_ERROR,
1443 ConnectionError::kExceededMaxConcurrentStreams);
1444 return;
1445 }
1446 if (stream_map_.size() >= pending_max_inbound_concurrent_streams_) {
1447 // The new stream would exceed our advertised but unacked
1448 // MAX_CONCURRENT_STREAMS. Refuse the stream for parity with nghttp2.
1449 EnqueueFrame(std::make_unique<spdy::SpdyRstStreamIR>(
1450 stream_id, spdy::ERROR_CODE_REFUSED_STREAM));
1451 const bool ok = visitor_.OnInvalidFrame(
1452 stream_id, Http2VisitorInterface::InvalidFrameError::kRefusedStream);
1453 if (!ok) {
1454 fatal_visitor_callback_failure_ = true;
1455 LatchErrorAndNotify(Http2ErrorCode::REFUSED_STREAM,
1456 ConnectionError::kExceededMaxConcurrentStreams);
1457 }
1458 return;
1459 }
1460
1461 CreateStream(stream_id);
1462 }
1463 }
1464
OnWindowUpdate(spdy::SpdyStreamId stream_id,int delta_window_size)1465 void OgHttp2Session::OnWindowUpdate(spdy::SpdyStreamId stream_id,
1466 int delta_window_size) {
1467 constexpr int kMaxWindowValue = 2147483647; // (1 << 31) - 1
1468 if (stream_id == 0) {
1469 if (delta_window_size == 0) {
1470 // A PROTOCOL_ERROR, according to RFC 9113 Section 6.9.
1471 LatchErrorAndNotify(Http2ErrorCode::PROTOCOL_ERROR,
1472 ConnectionError::kFlowControlError);
1473 return;
1474 }
1475 if (connection_send_window_ > 0 &&
1476 delta_window_size > (kMaxWindowValue - connection_send_window_)) {
1477 // Window overflow is a FLOW_CONTROL_ERROR.
1478 LatchErrorAndNotify(Http2ErrorCode::FLOW_CONTROL_ERROR,
1479 ConnectionError::kFlowControlError);
1480 return;
1481 }
1482 connection_send_window_ += delta_window_size;
1483 } else {
1484 if (delta_window_size == 0) {
1485 // A PROTOCOL_ERROR, according to RFC 9113 Section 6.9.
1486 EnqueueFrame(std::make_unique<spdy::SpdyRstStreamIR>(
1487 stream_id, spdy::ERROR_CODE_PROTOCOL_ERROR));
1488 return;
1489 }
1490 auto it = stream_map_.find(stream_id);
1491 if (it == stream_map_.end()) {
1492 QUICHE_VLOG(1) << "Stream " << stream_id << " not found!";
1493 if (static_cast<Http2StreamId>(stream_id) >
1494 highest_processed_stream_id_) {
1495 // Receiving WINDOW_UPDATE before HEADERS is a connection error.
1496 LatchErrorAndNotify(Http2ErrorCode::PROTOCOL_ERROR,
1497 ConnectionError::kWrongFrameSequence);
1498 }
1499 // Do not inform the visitor of a WINDOW_UPDATE for a non-existent stream.
1500 return;
1501 } else {
1502 if (streams_reset_.contains(stream_id)) {
1503 return;
1504 }
1505 if (it->second.send_window > 0 &&
1506 delta_window_size > (kMaxWindowValue - it->second.send_window)) {
1507 // Window overflow is a FLOW_CONTROL_ERROR.
1508 EnqueueFrame(std::make_unique<spdy::SpdyRstStreamIR>(
1509 stream_id, spdy::ERROR_CODE_FLOW_CONTROL_ERROR));
1510 return;
1511 }
1512 const bool was_blocked = (it->second.send_window <= 0);
1513 it->second.send_window += delta_window_size;
1514 if (was_blocked && it->second.send_window > 0) {
1515 // The stream was blocked on flow control.
1516 QUICHE_VLOG(1) << "Marking stream " << stream_id << " ready to write.";
1517 write_scheduler_.MarkStreamReady(stream_id, false);
1518 }
1519 }
1520 }
1521 visitor_.OnWindowUpdate(stream_id, delta_window_size);
1522 }
1523
OnPushPromise(spdy::SpdyStreamId,spdy::SpdyStreamId,bool)1524 void OgHttp2Session::OnPushPromise(spdy::SpdyStreamId /*stream_id*/,
1525 spdy::SpdyStreamId /*promised_stream_id*/,
1526 bool /*end*/) {
1527 // Server push is disabled; PUSH_PROMISE is an invalid frame.
1528 LatchErrorAndNotify(Http2ErrorCode::PROTOCOL_ERROR,
1529 ConnectionError::kInvalidPushPromise);
1530 }
1531
OnContinuation(spdy::SpdyStreamId,size_t,bool)1532 void OgHttp2Session::OnContinuation(spdy::SpdyStreamId /*stream_id*/,
1533 size_t /*payload_length*/, bool /*end*/) {}
1534
OnAltSvc(spdy::SpdyStreamId,absl::string_view,const spdy::SpdyAltSvcWireFormat::AlternativeServiceVector &)1535 void OgHttp2Session::OnAltSvc(spdy::SpdyStreamId /*stream_id*/,
1536 absl::string_view /*origin*/,
1537 const spdy::SpdyAltSvcWireFormat::
1538 AlternativeServiceVector& /*altsvc_vector*/) {
1539 }
1540
OnPriority(spdy::SpdyStreamId,spdy::SpdyStreamId,int,bool)1541 void OgHttp2Session::OnPriority(spdy::SpdyStreamId /*stream_id*/,
1542 spdy::SpdyStreamId /*parent_stream_id*/,
1543 int /*weight*/, bool /*exclusive*/) {}
1544
OnPriorityUpdate(spdy::SpdyStreamId,absl::string_view)1545 void OgHttp2Session::OnPriorityUpdate(
1546 spdy::SpdyStreamId /*prioritized_stream_id*/,
1547 absl::string_view /*priority_field_value*/) {}
1548
OnUnknownFrame(spdy::SpdyStreamId,uint8_t)1549 bool OgHttp2Session::OnUnknownFrame(spdy::SpdyStreamId /*stream_id*/,
1550 uint8_t /*frame_type*/) {
1551 return true;
1552 }
1553
OnUnknownFrameStart(spdy::SpdyStreamId stream_id,size_t length,uint8_t type,uint8_t flags)1554 void OgHttp2Session::OnUnknownFrameStart(spdy::SpdyStreamId stream_id,
1555 size_t length, uint8_t type,
1556 uint8_t flags) {
1557 process_metadata_ = false;
1558 if (streams_reset_.contains(stream_id)) {
1559 return;
1560 }
1561 if (type == kMetadataFrameType) {
1562 QUICHE_DCHECK_EQ(metadata_length_, 0u);
1563 visitor_.OnBeginMetadataForStream(stream_id, length);
1564 metadata_length_ = length;
1565 process_metadata_ = true;
1566 end_metadata_ = flags & kMetadataEndFlag;
1567
1568 // Empty metadata payloads will not trigger OnUnknownFramePayload(), so
1569 // handle that possibility here.
1570 MaybeHandleMetadataEndForStream(stream_id);
1571 } else {
1572 QUICHE_DLOG(INFO) << "Received unexpected frame type "
1573 << static_cast<int>(type);
1574 }
1575 }
1576
OnUnknownFramePayload(spdy::SpdyStreamId stream_id,absl::string_view payload)1577 void OgHttp2Session::OnUnknownFramePayload(spdy::SpdyStreamId stream_id,
1578 absl::string_view payload) {
1579 if (!process_metadata_) {
1580 return;
1581 }
1582 if (streams_reset_.contains(stream_id)) {
1583 return;
1584 }
1585 if (metadata_length_ > 0) {
1586 QUICHE_DCHECK_LE(payload.size(), metadata_length_);
1587 const bool payload_success =
1588 visitor_.OnMetadataForStream(stream_id, payload);
1589 if (payload_success) {
1590 metadata_length_ -= payload.size();
1591 MaybeHandleMetadataEndForStream(stream_id);
1592 } else {
1593 fatal_visitor_callback_failure_ = true;
1594 decoder_.StopProcessing();
1595 }
1596 } else {
1597 QUICHE_DLOG(INFO) << "Unexpected metadata payload for stream " << stream_id;
1598 }
1599 }
1600
OnHeaderStatus(Http2StreamId stream_id,Http2VisitorInterface::OnHeaderResult result)1601 void OgHttp2Session::OnHeaderStatus(
1602 Http2StreamId stream_id, Http2VisitorInterface::OnHeaderResult result) {
1603 QUICHE_DCHECK_NE(result, Http2VisitorInterface::HEADER_OK);
1604 QUICHE_VLOG(1) << "OnHeaderStatus(stream_id=" << stream_id
1605 << ", result=" << result << ")";
1606 const bool should_reset_stream =
1607 result == Http2VisitorInterface::HEADER_RST_STREAM ||
1608 result == Http2VisitorInterface::HEADER_FIELD_INVALID ||
1609 result == Http2VisitorInterface::HEADER_HTTP_MESSAGING;
1610 if (should_reset_stream) {
1611 const Http2ErrorCode error_code =
1612 (result == Http2VisitorInterface::HEADER_RST_STREAM)
1613 ? Http2ErrorCode::INTERNAL_ERROR
1614 : Http2ErrorCode::PROTOCOL_ERROR;
1615 const spdy::SpdyErrorCode spdy_error_code = TranslateErrorCode(error_code);
1616 const Http2VisitorInterface::InvalidFrameError frame_error =
1617 (result == Http2VisitorInterface::HEADER_RST_STREAM ||
1618 result == Http2VisitorInterface::HEADER_FIELD_INVALID)
1619 ? Http2VisitorInterface::InvalidFrameError::kHttpHeader
1620 : Http2VisitorInterface::InvalidFrameError::kHttpMessaging;
1621 auto it = streams_reset_.find(stream_id);
1622 if (it == streams_reset_.end()) {
1623 EnqueueFrame(
1624 std::make_unique<spdy::SpdyRstStreamIR>(stream_id, spdy_error_code));
1625
1626 if (result == Http2VisitorInterface::HEADER_FIELD_INVALID ||
1627 result == Http2VisitorInterface::HEADER_HTTP_MESSAGING) {
1628 const bool ok = visitor_.OnInvalidFrame(stream_id, frame_error);
1629 if (!ok) {
1630 fatal_visitor_callback_failure_ = true;
1631 LatchErrorAndNotify(error_code, ConnectionError::kHeaderError);
1632 }
1633 }
1634 }
1635 } else if (result == Http2VisitorInterface::HEADER_CONNECTION_ERROR) {
1636 fatal_visitor_callback_failure_ = true;
1637 LatchErrorAndNotify(Http2ErrorCode::INTERNAL_ERROR,
1638 ConnectionError::kHeaderError);
1639 } else if (result == Http2VisitorInterface::HEADER_COMPRESSION_ERROR) {
1640 LatchErrorAndNotify(Http2ErrorCode::COMPRESSION_ERROR,
1641 ConnectionError::kHeaderError);
1642 }
1643 }
1644
MaybeSetupPreface(bool sending_outbound_settings)1645 void OgHttp2Session::MaybeSetupPreface(bool sending_outbound_settings) {
1646 if (!queued_preface_) {
1647 queued_preface_ = true;
1648 if (!IsServerSession()) {
1649 buffered_data_.assign(spdy::kHttp2ConnectionHeaderPrefix,
1650 spdy::kHttp2ConnectionHeaderPrefixSize);
1651 }
1652 if (!sending_outbound_settings) {
1653 QUICHE_DCHECK(frames_.empty());
1654 // First frame must be a non-ack SETTINGS.
1655 EnqueueFrame(PrepareSettingsFrame(GetInitialSettings()));
1656 }
1657 }
1658 }
1659
GetInitialSettings() const1660 std::vector<Http2Setting> OgHttp2Session::GetInitialSettings() const {
1661 std::vector<Http2Setting> settings;
1662 if (!IsServerSession()) {
1663 // Disable server push. Note that server push from clients is already
1664 // disabled, so the server does not need to send this disabling setting.
1665 // TODO(diannahu): Consider applying server push disabling on SETTINGS ack.
1666 settings.push_back({Http2KnownSettingsId::ENABLE_PUSH, 0});
1667 }
1668 if (options_.max_header_list_bytes) {
1669 settings.push_back({Http2KnownSettingsId::MAX_HEADER_LIST_SIZE,
1670 *options_.max_header_list_bytes});
1671 }
1672 if (options_.allow_extended_connect && IsServerSession()) {
1673 settings.push_back({Http2KnownSettingsId::ENABLE_CONNECT_PROTOCOL, 1u});
1674 }
1675 return settings;
1676 }
1677
PrepareSettingsFrame(absl::Span<const Http2Setting> settings)1678 std::unique_ptr<SpdySettingsIR> OgHttp2Session::PrepareSettingsFrame(
1679 absl::Span<const Http2Setting> settings) {
1680 auto settings_ir = std::make_unique<SpdySettingsIR>();
1681 for (const Http2Setting& setting : settings) {
1682 settings_ir->AddSetting(setting.id, setting.value);
1683 }
1684 return settings_ir;
1685 }
1686
HandleOutboundSettings(const spdy::SpdySettingsIR & settings_frame)1687 void OgHttp2Session::HandleOutboundSettings(
1688 const spdy::SpdySettingsIR& settings_frame) {
1689 for (const auto& [id, value] : settings_frame.values()) {
1690 switch (static_cast<Http2KnownSettingsId>(id)) {
1691 case MAX_CONCURRENT_STREAMS:
1692 pending_max_inbound_concurrent_streams_ = value;
1693 break;
1694 case ENABLE_CONNECT_PROTOCOL:
1695 if (value == 1u && IsServerSession()) {
1696 // Allow extended CONNECT semantics even before SETTINGS are acked, to
1697 // make things easier for clients.
1698 headers_handler_.SetAllowExtendedConnect();
1699 }
1700 break;
1701 case HEADER_TABLE_SIZE:
1702 case ENABLE_PUSH:
1703 case INITIAL_WINDOW_SIZE:
1704 case MAX_FRAME_SIZE:
1705 case MAX_HEADER_LIST_SIZE:
1706 QUICHE_VLOG(2)
1707 << "Not adjusting internal state for outbound setting with id "
1708 << id;
1709 break;
1710 }
1711 }
1712
1713 // Copy the (small) map of settings we are about to send so that we can set
1714 // values in the SETTINGS ack callback.
1715 settings_ack_callbacks_.push_back(
1716 [this, settings_map = settings_frame.values()]() {
1717 for (const auto& [id, value] : settings_map) {
1718 switch (static_cast<Http2KnownSettingsId>(id)) {
1719 case MAX_CONCURRENT_STREAMS:
1720 max_inbound_concurrent_streams_ = value;
1721 break;
1722 case HEADER_TABLE_SIZE:
1723 decoder_.GetHpackDecoder().ApplyHeaderTableSizeSetting(value);
1724 break;
1725 case INITIAL_WINDOW_SIZE:
1726 UpdateStreamReceiveWindowSizes(value);
1727 initial_stream_receive_window_ = value;
1728 break;
1729 case MAX_FRAME_SIZE:
1730 decoder_.SetMaxFrameSize(value);
1731 break;
1732 case ENABLE_PUSH:
1733 case MAX_HEADER_LIST_SIZE:
1734 case ENABLE_CONNECT_PROTOCOL:
1735 QUICHE_VLOG(2)
1736 << "No action required in ack for outbound setting with id "
1737 << id;
1738 break;
1739 }
1740 }
1741 });
1742 }
1743
SendWindowUpdate(Http2StreamId stream_id,size_t update_delta)1744 void OgHttp2Session::SendWindowUpdate(Http2StreamId stream_id,
1745 size_t update_delta) {
1746 EnqueueFrame(
1747 std::make_unique<spdy::SpdyWindowUpdateIR>(stream_id, update_delta));
1748 }
1749
SendHeaders(Http2StreamId stream_id,spdy::Http2HeaderBlock headers,bool end_stream)1750 void OgHttp2Session::SendHeaders(Http2StreamId stream_id,
1751 spdy::Http2HeaderBlock headers,
1752 bool end_stream) {
1753 auto frame =
1754 std::make_unique<spdy::SpdyHeadersIR>(stream_id, std::move(headers));
1755 frame->set_fin(end_stream);
1756 EnqueueFrame(std::move(frame));
1757 }
1758
SendTrailers(Http2StreamId stream_id,spdy::Http2HeaderBlock trailers)1759 void OgHttp2Session::SendTrailers(Http2StreamId stream_id,
1760 spdy::Http2HeaderBlock trailers) {
1761 auto frame =
1762 std::make_unique<spdy::SpdyHeadersIR>(stream_id, std::move(trailers));
1763 frame->set_fin(true);
1764 EnqueueFrame(std::move(frame));
1765 trailers_ready_.erase(stream_id);
1766 }
1767
MaybeFinWithRstStream(StreamStateMap::iterator iter)1768 void OgHttp2Session::MaybeFinWithRstStream(StreamStateMap::iterator iter) {
1769 QUICHE_DCHECK(iter != stream_map_.end() && iter->second.half_closed_local);
1770
1771 if (options_.rst_stream_no_error_when_incomplete && IsServerSession() &&
1772 !iter->second.half_closed_remote) {
1773 // Since the peer has not yet ended the stream, this endpoint should
1774 // send a RST_STREAM NO_ERROR. See RFC 7540 Section 8.1.
1775 EnqueueFrame(std::make_unique<spdy::SpdyRstStreamIR>(
1776 iter->first, spdy::SpdyErrorCode::ERROR_CODE_NO_ERROR));
1777 iter->second.half_closed_remote = true;
1778 }
1779 }
1780
MarkDataBuffered(Http2StreamId stream_id,size_t bytes)1781 void OgHttp2Session::MarkDataBuffered(Http2StreamId stream_id, size_t bytes) {
1782 connection_window_manager_.MarkDataBuffered(bytes);
1783 if (auto it = stream_map_.find(stream_id); it != stream_map_.end()) {
1784 it->second.window_manager.MarkDataBuffered(bytes);
1785 }
1786 }
1787
CreateStream(Http2StreamId stream_id)1788 OgHttp2Session::StreamStateMap::iterator OgHttp2Session::CreateStream(
1789 Http2StreamId stream_id) {
1790 WindowManager::WindowUpdateListener listener =
1791 [this, stream_id](size_t window_update_delta) {
1792 SendWindowUpdate(stream_id, window_update_delta);
1793 };
1794 auto [iter, inserted] = stream_map_.try_emplace(
1795 stream_id,
1796 StreamState(initial_stream_receive_window_, initial_stream_send_window_,
1797 std::move(listener), options_.should_window_update_fn));
1798 if (inserted) {
1799 // Add the stream to the write scheduler.
1800 const spdy::SpdyPriority priority = 3;
1801 write_scheduler_.RegisterStream(stream_id, priority);
1802
1803 highest_processed_stream_id_ =
1804 std::max(highest_processed_stream_id_, stream_id);
1805 }
1806 return iter;
1807 }
1808
StartRequest(Http2StreamId stream_id,spdy::Http2HeaderBlock headers,std::unique_ptr<DataFrameSource> data_source,void * user_data)1809 void OgHttp2Session::StartRequest(Http2StreamId stream_id,
1810 spdy::Http2HeaderBlock headers,
1811 std::unique_ptr<DataFrameSource> data_source,
1812 void* user_data) {
1813 if (received_goaway_) {
1814 // Do not start new streams after receiving a GOAWAY.
1815 goaway_rejected_streams_.insert(stream_id);
1816 return;
1817 }
1818
1819 auto iter = CreateStream(stream_id);
1820 const bool end_stream = data_source == nullptr;
1821 if (!end_stream) {
1822 iter->second.outbound_body = std::move(data_source);
1823 write_scheduler_.MarkStreamReady(stream_id, false);
1824 }
1825 iter->second.user_data = user_data;
1826 for (const auto& [name, value] : headers) {
1827 if (name == kHttp2MethodPseudoHeader && value == kHeadValue) {
1828 iter->second.sent_head_method = true;
1829 }
1830 }
1831 SendHeaders(stream_id, std::move(headers), end_stream);
1832 }
1833
StartPendingStreams()1834 void OgHttp2Session::StartPendingStreams() {
1835 while (!pending_streams_.empty() && CanCreateStream()) {
1836 auto& [stream_id, pending_stream] = pending_streams_.front();
1837 StartRequest(stream_id, std::move(pending_stream.headers),
1838 std::move(pending_stream.data_source),
1839 pending_stream.user_data);
1840 pending_streams_.pop_front();
1841 }
1842 }
1843
CloseStream(Http2StreamId stream_id,Http2ErrorCode error_code)1844 void OgHttp2Session::CloseStream(Http2StreamId stream_id,
1845 Http2ErrorCode error_code) {
1846 const bool result = visitor_.OnCloseStream(stream_id, error_code);
1847 if (!result) {
1848 latched_error_ = true;
1849 decoder_.StopProcessing();
1850 }
1851 stream_map_.erase(stream_id);
1852 trailers_ready_.erase(stream_id);
1853 streams_reset_.erase(stream_id);
1854 auto queued_it = queued_frames_.find(stream_id);
1855 if (queued_it != queued_frames_.end()) {
1856 // Remove any queued frames for this stream.
1857 int frames_remaining = queued_it->second;
1858 queued_frames_.erase(queued_it);
1859 for (auto it = frames_.begin();
1860 frames_remaining > 0 && it != frames_.end();) {
1861 if (static_cast<Http2StreamId>((*it)->stream_id()) == stream_id) {
1862 it = frames_.erase(it);
1863 --frames_remaining;
1864 } else {
1865 ++it;
1866 }
1867 }
1868 }
1869 if (write_scheduler_.StreamRegistered(stream_id)) {
1870 write_scheduler_.UnregisterStream(stream_id);
1871 }
1872
1873 StartPendingStreams();
1874 }
1875
CanCreateStream() const1876 bool OgHttp2Session::CanCreateStream() const {
1877 return stream_map_.size() < max_outbound_concurrent_streams_;
1878 }
1879
NextHeaderType(std::optional<HeaderType> current_type)1880 HeaderType OgHttp2Session::NextHeaderType(
1881 std::optional<HeaderType> current_type) {
1882 if (IsServerSession()) {
1883 if (!current_type) {
1884 return HeaderType::REQUEST;
1885 } else {
1886 return HeaderType::REQUEST_TRAILER;
1887 }
1888 } else if (!current_type || *current_type == HeaderType::RESPONSE_100) {
1889 return HeaderType::RESPONSE;
1890 } else {
1891 return HeaderType::RESPONSE_TRAILER;
1892 }
1893 }
1894
LatchErrorAndNotify(Http2ErrorCode error_code,ConnectionError error)1895 void OgHttp2Session::LatchErrorAndNotify(Http2ErrorCode error_code,
1896 ConnectionError error) {
1897 if (latched_error_) {
1898 // Do not kick a connection when it is down.
1899 return;
1900 }
1901
1902 latched_error_ = true;
1903 visitor_.OnConnectionError(error);
1904 decoder_.StopProcessing();
1905 EnqueueFrame(std::make_unique<spdy::SpdyGoAwayIR>(
1906 highest_processed_stream_id_, TranslateErrorCode(error_code),
1907 ConnectionErrorToString(error)));
1908 }
1909
CloseStreamIfReady(uint8_t frame_type,uint32_t stream_id)1910 void OgHttp2Session::CloseStreamIfReady(uint8_t frame_type,
1911 uint32_t stream_id) {
1912 auto iter = stream_map_.find(stream_id);
1913 if (iter == stream_map_.end()) {
1914 return;
1915 }
1916 const StreamState& state = iter->second;
1917 if (static_cast<FrameType>(frame_type) == FrameType::RST_STREAM ||
1918 (state.half_closed_local && state.half_closed_remote)) {
1919 CloseStream(stream_id, Http2ErrorCode::HTTP2_NO_ERROR);
1920 }
1921 }
1922
CloseGoAwayRejectedStreams()1923 void OgHttp2Session::CloseGoAwayRejectedStreams() {
1924 for (Http2StreamId stream_id : goaway_rejected_streams_) {
1925 const bool result =
1926 visitor_.OnCloseStream(stream_id, Http2ErrorCode::REFUSED_STREAM);
1927 if (!result) {
1928 latched_error_ = true;
1929 decoder_.StopProcessing();
1930 }
1931 }
1932 goaway_rejected_streams_.clear();
1933 }
1934
PrepareForImmediateGoAway()1935 void OgHttp2Session::PrepareForImmediateGoAway() {
1936 queued_immediate_goaway_ = true;
1937
1938 // Keep the initial SETTINGS frame if the session has SETTINGS at the front of
1939 // the queue but has not sent SETTINGS yet. The session should send initial
1940 // SETTINGS before GOAWAY.
1941 std::unique_ptr<spdy::SpdyFrameIR> initial_settings;
1942 if (!sent_non_ack_settings_ && !frames_.empty() &&
1943 IsNonAckSettings(*frames_.front())) {
1944 initial_settings = std::move(frames_.front());
1945 frames_.pop_front();
1946 }
1947
1948 // Remove all pending frames except for RST_STREAMs. It is important to send
1949 // RST_STREAMs so the peer knows of errors below the GOAWAY last stream ID.
1950 // TODO(diannahu): Consider informing the visitor of dropped frames. This may
1951 // mean keeping the frames and invoking a frame-not-sent callback, similar to
1952 // nghttp2. Could add a closure to each frame in the frames queue.
1953 frames_.remove_if([](const auto& frame) {
1954 return frame->frame_type() != spdy::SpdyFrameType::RST_STREAM;
1955 });
1956
1957 if (initial_settings != nullptr) {
1958 frames_.push_front(std::move(initial_settings));
1959 }
1960 }
1961
MaybeHandleMetadataEndForStream(Http2StreamId stream_id)1962 void OgHttp2Session::MaybeHandleMetadataEndForStream(Http2StreamId stream_id) {
1963 if (metadata_length_ == 0 && end_metadata_) {
1964 const bool completion_success = visitor_.OnMetadataEndForStream(stream_id);
1965 if (!completion_success) {
1966 fatal_visitor_callback_failure_ = true;
1967 decoder_.StopProcessing();
1968 }
1969 process_metadata_ = false;
1970 end_metadata_ = false;
1971 }
1972 }
1973
DecrementQueuedFrameCount(uint32_t stream_id,uint8_t frame_type)1974 void OgHttp2Session::DecrementQueuedFrameCount(uint32_t stream_id,
1975 uint8_t frame_type) {
1976 auto iter = queued_frames_.find(stream_id);
1977 if (iter == queued_frames_.end()) {
1978 QUICHE_LOG(ERROR) << "Unable to find a queued frame count for stream "
1979 << stream_id;
1980 return;
1981 }
1982 if (static_cast<FrameType>(frame_type) != FrameType::DATA) {
1983 --iter->second;
1984 }
1985 if (iter->second == 0) {
1986 // TODO(birenroy): Consider passing through `error_code` here.
1987 CloseStreamIfReady(frame_type, stream_id);
1988 }
1989 }
1990
HandleContentLengthError(Http2StreamId stream_id)1991 void OgHttp2Session::HandleContentLengthError(Http2StreamId stream_id) {
1992 if (current_frame_type_ == static_cast<uint8_t>(FrameType::HEADERS)) {
1993 // For consistency, either OnInvalidFrame should always be invoked,
1994 // regardless of frame type, or perhaps we should introduce an OnStreamError
1995 // callback.
1996 visitor_.OnInvalidFrame(
1997 stream_id, Http2VisitorInterface::InvalidFrameError::kHttpMessaging);
1998 }
1999 EnqueueFrame(std::make_unique<spdy::SpdyRstStreamIR>(
2000 stream_id, spdy::ERROR_CODE_PROTOCOL_ERROR));
2001 }
2002
UpdateReceiveWindow(Http2StreamId stream_id,int32_t delta)2003 void OgHttp2Session::UpdateReceiveWindow(Http2StreamId stream_id,
2004 int32_t delta) {
2005 if (stream_id == 0) {
2006 connection_window_manager_.IncreaseWindow(delta);
2007 // TODO(b/181586191): Provide an explicit way to set the desired window
2008 // limit, remove the upsize-on-window-update behavior.
2009 const int64_t current_window =
2010 connection_window_manager_.CurrentWindowSize();
2011 if (current_window > connection_window_manager_.WindowSizeLimit()) {
2012 connection_window_manager_.SetWindowSizeLimit(current_window);
2013 }
2014 } else {
2015 auto iter = stream_map_.find(stream_id);
2016 if (iter != stream_map_.end()) {
2017 WindowManager& manager = iter->second.window_manager;
2018 manager.IncreaseWindow(delta);
2019 // TODO(b/181586191): Provide an explicit way to set the desired window
2020 // limit, remove the upsize-on-window-update behavior.
2021 const int64_t current_window = manager.CurrentWindowSize();
2022 if (current_window > manager.WindowSizeLimit()) {
2023 manager.SetWindowSizeLimit(current_window);
2024 }
2025 }
2026 }
2027 }
2028
UpdateStreamSendWindowSizes(uint32_t new_value)2029 void OgHttp2Session::UpdateStreamSendWindowSizes(uint32_t new_value) {
2030 const int32_t delta =
2031 static_cast<int32_t>(new_value) - initial_stream_send_window_;
2032 initial_stream_send_window_ = new_value;
2033 for (auto& [stream_id, stream_state] : stream_map_) {
2034 const int64_t current_window_size = stream_state.send_window;
2035 const int64_t new_window_size = current_window_size + delta;
2036 if (new_window_size > spdy::kSpdyMaximumWindowSize) {
2037 EnqueueFrame(std::make_unique<spdy::SpdyRstStreamIR>(
2038 stream_id, spdy::ERROR_CODE_FLOW_CONTROL_ERROR));
2039 } else {
2040 stream_state.send_window += delta;
2041 }
2042 if (current_window_size <= 0 && new_window_size > 0) {
2043 write_scheduler_.MarkStreamReady(stream_id, false);
2044 }
2045 }
2046 }
2047
UpdateStreamReceiveWindowSizes(uint32_t new_value)2048 void OgHttp2Session::UpdateStreamReceiveWindowSizes(uint32_t new_value) {
2049 for (auto& [stream_id, stream_state] : stream_map_) {
2050 stream_state.window_manager.OnWindowSizeLimitChange(new_value);
2051 }
2052 }
2053
HasMoreData(const StreamState & stream_state) const2054 bool OgHttp2Session::HasMoreData(const StreamState& stream_state) const {
2055 return stream_state.outbound_body != nullptr;
2056 }
2057
IsReadyToWriteData(const StreamState & stream_state) const2058 bool OgHttp2Session::IsReadyToWriteData(const StreamState& stream_state) const {
2059 return stream_state.outbound_body != nullptr && !stream_state.data_deferred;
2060 }
2061
AbandonData(StreamState & stream_state)2062 void OgHttp2Session::AbandonData(StreamState& stream_state) {
2063 stream_state.outbound_body = nullptr;
2064 }
2065
GetDataFrameInfo(Http2StreamId,size_t flow_control_available,StreamState & stream_state)2066 OgHttp2Session::DataFrameInfo OgHttp2Session::GetDataFrameInfo(
2067 Http2StreamId /*stream_id*/, size_t flow_control_available,
2068 StreamState& stream_state) {
2069 DataFrameInfo info;
2070 std::tie(info.payload_length, info.end_data) =
2071 stream_state.outbound_body->SelectPayloadLength(flow_control_available);
2072 info.send_fin =
2073 info.end_data ? stream_state.outbound_body->send_fin() : false;
2074 return info;
2075 }
2076
SendDataFrame(Http2StreamId,absl::string_view frame_header,size_t payload_length,StreamState & stream_state)2077 bool OgHttp2Session::SendDataFrame(Http2StreamId /*stream_id*/,
2078 absl::string_view frame_header,
2079 size_t payload_length,
2080 StreamState& stream_state) {
2081 return stream_state.outbound_body->Send(frame_header, payload_length);
2082 }
2083
2084 } // namespace adapter
2085 } // namespace http2
2086