xref: /aosp_15_r20/external/cronet/net/third_party/quiche/src/quiche/http2/adapter/oghttp2_session.cc (revision 6777b5387eb2ff775bb5750e3f5d96f37fb7352b)
1 #include "quiche/http2/adapter/oghttp2_session.h"
2 
3 #include <cstdint>
4 #include <memory>
5 #include <optional>
6 #include <utility>
7 #include <vector>
8 
9 #include "absl/cleanup/cleanup.h"
10 #include "absl/memory/memory.h"
11 #include "absl/strings/escaping.h"
12 #include "quiche/http2/adapter/header_validator.h"
13 #include "quiche/http2/adapter/http2_protocol.h"
14 #include "quiche/http2/adapter/http2_util.h"
15 #include "quiche/http2/adapter/http2_visitor_interface.h"
16 #include "quiche/http2/adapter/noop_header_validator.h"
17 #include "quiche/http2/adapter/oghttp2_util.h"
18 #include "quiche/common/quiche_callbacks.h"
19 #include "quiche/spdy/core/spdy_protocol.h"
20 
21 namespace http2 {
22 namespace adapter {
23 
24 namespace {
25 
26 using ConnectionError = Http2VisitorInterface::ConnectionError;
27 using SpdyFramerError = Http2DecoderAdapter::SpdyFramerError;
28 
29 using ::spdy::SpdySettingsIR;
30 
// Compile-time limits used by the session.
constexpr uint32_t kMaxAllowedMetadataFrameSize = 65536u;
// Capacity reported for the HPACK encoder table when no encoder is available.
constexpr uint32_t kDefaultHpackTableCapacity = 4096u;
// Upper bound on the HPACK encoder table capacity when the session Options do
// not specify one.
constexpr uint32_t kMaximumHpackTableCapacity = 65536u;

// Value returned from Send() after a fatal send error.
// Corresponds to NGHTTP2_ERR_CALLBACK_FAILURE.
constexpr int kSendError = -902;
37 
38 constexpr absl::string_view kHeadValue = "HEAD";
39 
40 // TODO(birenroy): Consider incorporating spdy::FlagsSerializionVisitor here.
41 class FrameAttributeCollector : public spdy::SpdyFrameVisitor {
42  public:
43   FrameAttributeCollector() = default;
VisitData(const spdy::SpdyDataIR & data)44   void VisitData(const spdy::SpdyDataIR& data) override {
45     frame_type_ = static_cast<uint8_t>(data.frame_type());
46     stream_id_ = data.stream_id();
47     flags_ =
48         (data.fin() ? END_STREAM_FLAG : 0) | (data.padded() ? PADDED_FLAG : 0);
49   }
VisitHeaders(const spdy::SpdyHeadersIR & headers)50   void VisitHeaders(const spdy::SpdyHeadersIR& headers) override {
51     frame_type_ = static_cast<uint8_t>(headers.frame_type());
52     stream_id_ = headers.stream_id();
53     flags_ = END_HEADERS_FLAG | (headers.fin() ? END_STREAM_FLAG : 0) |
54              (headers.padded() ? PADDED_FLAG : 0) |
55              (headers.has_priority() ? PRIORITY_FLAG : 0);
56   }
VisitPriority(const spdy::SpdyPriorityIR & priority)57   void VisitPriority(const spdy::SpdyPriorityIR& priority) override {
58     frame_type_ = static_cast<uint8_t>(priority.frame_type());
59     frame_type_ = 2;
60     stream_id_ = priority.stream_id();
61   }
VisitRstStream(const spdy::SpdyRstStreamIR & rst_stream)62   void VisitRstStream(const spdy::SpdyRstStreamIR& rst_stream) override {
63     frame_type_ = static_cast<uint8_t>(rst_stream.frame_type());
64     frame_type_ = 3;
65     stream_id_ = rst_stream.stream_id();
66     error_code_ = rst_stream.error_code();
67   }
VisitSettings(const spdy::SpdySettingsIR & settings)68   void VisitSettings(const spdy::SpdySettingsIR& settings) override {
69     frame_type_ = static_cast<uint8_t>(settings.frame_type());
70     frame_type_ = 4;
71     flags_ = (settings.is_ack() ? ACK_FLAG : 0);
72   }
VisitPushPromise(const spdy::SpdyPushPromiseIR & push_promise)73   void VisitPushPromise(const spdy::SpdyPushPromiseIR& push_promise) override {
74     frame_type_ = static_cast<uint8_t>(push_promise.frame_type());
75     frame_type_ = 5;
76     stream_id_ = push_promise.stream_id();
77     flags_ = (push_promise.padded() ? PADDED_FLAG : 0);
78   }
VisitPing(const spdy::SpdyPingIR & ping)79   void VisitPing(const spdy::SpdyPingIR& ping) override {
80     frame_type_ = static_cast<uint8_t>(ping.frame_type());
81     frame_type_ = 6;
82     flags_ = (ping.is_ack() ? ACK_FLAG : 0);
83   }
VisitGoAway(const spdy::SpdyGoAwayIR & goaway)84   void VisitGoAway(const spdy::SpdyGoAwayIR& goaway) override {
85     frame_type_ = static_cast<uint8_t>(goaway.frame_type());
86     frame_type_ = 7;
87     error_code_ = goaway.error_code();
88   }
VisitWindowUpdate(const spdy::SpdyWindowUpdateIR & window_update)89   void VisitWindowUpdate(
90       const spdy::SpdyWindowUpdateIR& window_update) override {
91     frame_type_ = static_cast<uint8_t>(window_update.frame_type());
92     frame_type_ = 8;
93     stream_id_ = window_update.stream_id();
94   }
VisitContinuation(const spdy::SpdyContinuationIR & continuation)95   void VisitContinuation(
96       const spdy::SpdyContinuationIR& continuation) override {
97     frame_type_ = static_cast<uint8_t>(continuation.frame_type());
98     stream_id_ = continuation.stream_id();
99     flags_ = continuation.end_headers() ? END_HEADERS_FLAG : 0;
100   }
VisitUnknown(const spdy::SpdyUnknownIR & unknown)101   void VisitUnknown(const spdy::SpdyUnknownIR& unknown) override {
102     frame_type_ = static_cast<uint8_t>(unknown.frame_type());
103     stream_id_ = unknown.stream_id();
104     flags_ = unknown.flags();
105   }
VisitAltSvc(const spdy::SpdyAltSvcIR &)106   void VisitAltSvc(const spdy::SpdyAltSvcIR& /*altsvc*/) override {}
VisitPriorityUpdate(const spdy::SpdyPriorityUpdateIR &)107   void VisitPriorityUpdate(
108       const spdy::SpdyPriorityUpdateIR& /*priority_update*/) override {}
VisitAcceptCh(const spdy::SpdyAcceptChIR &)109   void VisitAcceptCh(const spdy::SpdyAcceptChIR& /*accept_ch*/) override {}
110 
stream_id()111   uint32_t stream_id() { return stream_id_; }
error_code()112   uint32_t error_code() { return error_code_; }
frame_type()113   uint8_t frame_type() { return frame_type_; }
flags()114   uint8_t flags() { return flags_; }
115 
116  private:
117   uint32_t stream_id_ = 0;
118   uint32_t error_code_ = 0;
119   uint8_t frame_type_ = 0;
120   uint8_t flags_ = 0;
121 };
122 
TracePerspectiveAsString(Perspective p)123 absl::string_view TracePerspectiveAsString(Perspective p) {
124   switch (p) {
125     case Perspective::kClient:
126       return "OGHTTP2_CLIENT";
127     case Perspective::kServer:
128       return "OGHTTP2_SERVER";
129   }
130   return "OGHTTP2_SERVER";
131 }
132 
GetHttp2ErrorCode(SpdyFramerError error)133 Http2ErrorCode GetHttp2ErrorCode(SpdyFramerError error) {
134   switch (error) {
135     case SpdyFramerError::SPDY_NO_ERROR:
136       return Http2ErrorCode::HTTP2_NO_ERROR;
137     case SpdyFramerError::SPDY_INVALID_STREAM_ID:
138     case SpdyFramerError::SPDY_INVALID_CONTROL_FRAME:
139     case SpdyFramerError::SPDY_INVALID_PADDING:
140     case SpdyFramerError::SPDY_INVALID_DATA_FRAME_FLAGS:
141     case SpdyFramerError::SPDY_UNEXPECTED_FRAME:
142       return Http2ErrorCode::PROTOCOL_ERROR;
143     case SpdyFramerError::SPDY_CONTROL_PAYLOAD_TOO_LARGE:
144     case SpdyFramerError::SPDY_INVALID_CONTROL_FRAME_SIZE:
145     case SpdyFramerError::SPDY_OVERSIZED_PAYLOAD:
146       return Http2ErrorCode::FRAME_SIZE_ERROR;
147     case SpdyFramerError::SPDY_DECOMPRESS_FAILURE:
148     case SpdyFramerError::SPDY_HPACK_INDEX_VARINT_ERROR:
149     case SpdyFramerError::SPDY_HPACK_NAME_LENGTH_VARINT_ERROR:
150     case SpdyFramerError::SPDY_HPACK_VALUE_LENGTH_VARINT_ERROR:
151     case SpdyFramerError::SPDY_HPACK_NAME_TOO_LONG:
152     case SpdyFramerError::SPDY_HPACK_VALUE_TOO_LONG:
153     case SpdyFramerError::SPDY_HPACK_NAME_HUFFMAN_ERROR:
154     case SpdyFramerError::SPDY_HPACK_VALUE_HUFFMAN_ERROR:
155     case SpdyFramerError::SPDY_HPACK_MISSING_DYNAMIC_TABLE_SIZE_UPDATE:
156     case SpdyFramerError::SPDY_HPACK_INVALID_INDEX:
157     case SpdyFramerError::SPDY_HPACK_INVALID_NAME_INDEX:
158     case SpdyFramerError::SPDY_HPACK_DYNAMIC_TABLE_SIZE_UPDATE_NOT_ALLOWED:
159     case SpdyFramerError::
160         SPDY_HPACK_INITIAL_DYNAMIC_TABLE_SIZE_UPDATE_IS_ABOVE_LOW_WATER_MARK:
161     case SpdyFramerError::
162         SPDY_HPACK_DYNAMIC_TABLE_SIZE_UPDATE_IS_ABOVE_ACKNOWLEDGED_SETTING:
163     case SpdyFramerError::SPDY_HPACK_TRUNCATED_BLOCK:
164     case SpdyFramerError::SPDY_HPACK_FRAGMENT_TOO_LONG:
165     case SpdyFramerError::SPDY_HPACK_COMPRESSED_HEADER_SIZE_EXCEEDS_LIMIT:
166       return Http2ErrorCode::COMPRESSION_ERROR;
167     case SpdyFramerError::SPDY_INTERNAL_FRAMER_ERROR:
168     case SpdyFramerError::SPDY_STOP_PROCESSING:
169     case SpdyFramerError::LAST_ERROR:
170       return Http2ErrorCode::INTERNAL_ERROR;
171   }
172   return Http2ErrorCode::INTERNAL_ERROR;
173 }
174 
IsResponse(HeaderType type)175 bool IsResponse(HeaderType type) {
176   return type == HeaderType::RESPONSE_100 || type == HeaderType::RESPONSE;
177 }
178 
StatusIs1xx(absl::string_view status)179 bool StatusIs1xx(absl::string_view status) {
180   return status.size() == 3 && status[0] == '1';
181 }
182 
183 // Returns the upper bound on HPACK encoder table capacity. If not specified in
184 // the Options, a reasonable default upper bound is used.
HpackCapacityBound(const OgHttp2Session::Options & o)185 uint32_t HpackCapacityBound(const OgHttp2Session::Options& o) {
186   return o.max_hpack_encoding_table_capacity.value_or(
187       kMaximumHpackTableCapacity);
188 }
189 
IsNonAckSettings(const spdy::SpdyFrameIR & frame)190 bool IsNonAckSettings(const spdy::SpdyFrameIR& frame) {
191   return frame.frame_type() == spdy::SpdyFrameType::SETTINGS &&
192          !reinterpret_cast<const SpdySettingsIR&>(frame).is_ack();
193 }
194 
195 }  // namespace
196 
PassthroughHeadersHandler(OgHttp2Session & session,Http2VisitorInterface & visitor)197 OgHttp2Session::PassthroughHeadersHandler::PassthroughHeadersHandler(
198     OgHttp2Session& session, Http2VisitorInterface& visitor)
199     : session_(session), visitor_(visitor) {
200   if (session_.options_.validate_http_headers) {
201     QUICHE_VLOG(2) << "instantiating regular header validator";
202     validator_ = std::make_unique<HeaderValidator>();
203     if (session_.options_.validate_path) {
204       validator_->SetValidatePath();
205     }
206     if (session_.options_.allow_fragment_in_path) {
207       validator_->SetAllowFragmentInPath();
208     }
209     if (session_.options_.allow_different_host_and_authority) {
210       validator_->SetAllowDifferentHostAndAuthority();
211     }
212   } else {
213     QUICHE_VLOG(2) << "instantiating noop header validator";
214     validator_ = std::make_unique<NoopHeaderValidator>();
215   }
216 }
217 
OnHeaderBlockStart()218 void OgHttp2Session::PassthroughHeadersHandler::OnHeaderBlockStart() {
219   Reset();
220   const bool status = visitor_.OnBeginHeadersForStream(stream_id_);
221   if (!status) {
222     QUICHE_VLOG(1)
223         << "Visitor rejected header block, returning HEADER_CONNECTION_ERROR";
224     SetResult(Http2VisitorInterface::HEADER_CONNECTION_ERROR);
225   }
226   validator_->StartHeaderBlock();
227 }
228 
InterpretHeaderStatus(HeaderValidator::HeaderStatus status)229 Http2VisitorInterface::OnHeaderResult InterpretHeaderStatus(
230     HeaderValidator::HeaderStatus status) {
231   switch (status) {
232     case HeaderValidator::HEADER_OK:
233     case HeaderValidator::HEADER_SKIP:
234       return Http2VisitorInterface::HEADER_OK;
235     case HeaderValidator::HEADER_FIELD_INVALID:
236       return Http2VisitorInterface::HEADER_FIELD_INVALID;
237     case HeaderValidator::HEADER_FIELD_TOO_LONG:
238       return Http2VisitorInterface::HEADER_RST_STREAM;
239   }
240   return Http2VisitorInterface::HEADER_CONNECTION_ERROR;
241 }
242 
OnHeader(absl::string_view key,absl::string_view value)243 void OgHttp2Session::PassthroughHeadersHandler::OnHeader(
244     absl::string_view key, absl::string_view value) {
245   if (error_encountered_) {
246     QUICHE_VLOG(2) << "Early return; status not HEADER_OK";
247     return;
248   }
249   const HeaderValidator::HeaderStatus validation_result =
250       validator_->ValidateSingleHeader(key, value);
251   if (validation_result == HeaderValidator::HEADER_SKIP) {
252     return;
253   }
254   if (validation_result != HeaderValidator::HEADER_OK) {
255     QUICHE_VLOG(2) << "Header validation failed with result "
256                    << static_cast<int>(validation_result);
257     SetResult(InterpretHeaderStatus(validation_result));
258     return;
259   }
260   const Http2VisitorInterface::OnHeaderResult result =
261       visitor_.OnHeaderForStream(stream_id_, key, value);
262   SetResult(result);
263 }
264 
OnHeaderBlockEnd(size_t,size_t)265 void OgHttp2Session::PassthroughHeadersHandler::OnHeaderBlockEnd(
266     size_t /* uncompressed_header_bytes */,
267     size_t /* compressed_header_bytes */) {
268   if (error_encountered_) {
269     // The error has already been handled.
270     return;
271   }
272   if (!validator_->FinishHeaderBlock(type_)) {
273     QUICHE_VLOG(1) << "FinishHeaderBlock returned false; returning "
274                    << "HEADER_HTTP_MESSAGING";
275     SetResult(Http2VisitorInterface::HEADER_HTTP_MESSAGING);
276     return;
277   }
278   if (frame_contains_fin_ && IsResponse(type_) &&
279       StatusIs1xx(status_header())) {
280     QUICHE_VLOG(1) << "Unexpected end of stream without final headers";
281     SetResult(Http2VisitorInterface::HEADER_HTTP_MESSAGING);
282     return;
283   }
284   const bool result = visitor_.OnEndHeadersForStream(stream_id_);
285   if (!result) {
286     session_.fatal_visitor_callback_failure_ = true;
287     session_.decoder_.StopProcessing();
288   }
289 }
290 
291 // TODO(diannahu): Add checks for request methods.
CanReceiveBody() const292 bool OgHttp2Session::PassthroughHeadersHandler::CanReceiveBody() const {
293   switch (header_type()) {
294     case HeaderType::REQUEST_TRAILER:
295     case HeaderType::RESPONSE_TRAILER:
296     case HeaderType::RESPONSE_100:
297       return false;
298     case HeaderType::RESPONSE:
299       // 304 responses should not have a body:
300       // https://httpwg.org/specs/rfc7230.html#rfc.section.3.3.2
301       // Neither should 204 responses:
302       // https://httpwg.org/specs/rfc7231.html#rfc.section.6.3.5
303       return status_header() != "304" && status_header() != "204";
304     case HeaderType::REQUEST:
305       return true;
306   }
307   return true;
308 }
309 
SetResult(Http2VisitorInterface::OnHeaderResult result)310 void OgHttp2Session::PassthroughHeadersHandler::SetResult(
311     Http2VisitorInterface::OnHeaderResult result) {
312   if (result != Http2VisitorInterface::HEADER_OK) {
313     error_encountered_ = true;
314     session_.OnHeaderStatus(stream_id_, result);
315   }
316 }
317 
318 // A visitor that extracts an int64_t from each type of a ProcessBytesResult.
319 struct OgHttp2Session::ProcessBytesResultVisitor {
operator ()http2::adapter::OgHttp2Session::ProcessBytesResultVisitor320   int64_t operator()(const int64_t bytes) const { return bytes; }
321 
operator ()http2::adapter::OgHttp2Session::ProcessBytesResultVisitor322   int64_t operator()(const ProcessBytesError error) const {
323     switch (error) {
324       case ProcessBytesError::kUnspecified:
325         return -1;
326       case ProcessBytesError::kInvalidConnectionPreface:
327         return -903;  // NGHTTP2_ERR_BAD_CLIENT_MAGIC
328       case ProcessBytesError::kVisitorCallbackFailed:
329         return -902;  // NGHTTP2_ERR_CALLBACK_FAILURE
330     }
331     return -1;
332   }
333 };
334 
OgHttp2Session(Http2VisitorInterface & visitor,Options options)335 OgHttp2Session::OgHttp2Session(Http2VisitorInterface& visitor, Options options)
336     : visitor_(visitor),
337       options_(options),
338       event_forwarder_([this]() { return !latched_error_; }, *this),
339       receive_logger_(
340           &event_forwarder_, TracePerspectiveAsString(options.perspective),
__anond62579530302() 341           [logging_enabled = GetQuicheFlag(quiche_oghttp2_debug_trace)]() {
342             return logging_enabled;
343           },
344           this),
345       send_logger_(
346           TracePerspectiveAsString(options.perspective),
__anond62579530402() 347           [logging_enabled = GetQuicheFlag(quiche_oghttp2_debug_trace)]() {
348             return logging_enabled;
349           },
350           this),
351       headers_handler_(*this, visitor),
352       noop_headers_handler_(/*listener=*/nullptr),
353       connection_window_manager_(
354           kInitialFlowControlWindowSize,
__anond62579530502(size_t window_update_delta) 355           [this](size_t window_update_delta) {
356             SendWindowUpdate(kConnectionStreamId, window_update_delta);
357           },
358           options.should_window_update_fn,
359           /*update_window_on_notify=*/false),
360       max_outbound_concurrent_streams_(
361           options.remote_max_concurrent_streams.value_or(100u)) {
362   decoder_.set_visitor(&receive_logger_);
363   if (options_.max_header_list_bytes) {
364     // Limit buffering of encoded HPACK data to 2x the decoded limit.
365     decoder_.GetHpackDecoder().set_max_decode_buffer_size_bytes(
366         2 * *options_.max_header_list_bytes);
367     // Limit the total bytes accepted for HPACK decoding to 4x the limit.
368     decoder_.GetHpackDecoder().set_max_header_block_bytes(
369         4 * *options_.max_header_list_bytes);
370   }
371   if (IsServerSession()) {
372     remaining_preface_ = {spdy::kHttp2ConnectionHeaderPrefix,
373                           spdy::kHttp2ConnectionHeaderPrefixSize};
374   }
375   if (options_.max_header_field_size.has_value()) {
376     headers_handler_.SetMaxFieldSize(*options_.max_header_field_size);
377   }
378   headers_handler_.SetAllowObsText(options_.allow_obs_text);
379   if (!options_.crumble_cookies) {
380     // As seen in https://github.com/envoyproxy/envoy/issues/32611, some HTTP/2
381     // endpoints don't properly handle multiple `Cookie` header fields.
382     framer_.GetHpackEncoder()->DisableCookieCrumbling();
383   }
384 }
385 
~OgHttp2Session()386 OgHttp2Session::~OgHttp2Session() {}
387 
SetStreamUserData(Http2StreamId stream_id,void * user_data)388 void OgHttp2Session::SetStreamUserData(Http2StreamId stream_id,
389                                        void* user_data) {
390   auto it = stream_map_.find(stream_id);
391   if (it != stream_map_.end()) {
392     it->second.user_data = user_data;
393   }
394 }
395 
GetStreamUserData(Http2StreamId stream_id)396 void* OgHttp2Session::GetStreamUserData(Http2StreamId stream_id) {
397   auto it = stream_map_.find(stream_id);
398   if (it != stream_map_.end()) {
399     return it->second.user_data;
400   }
401   auto p = pending_streams_.find(stream_id);
402   if (p != pending_streams_.end()) {
403     return p->second.user_data;
404   }
405   return nullptr;
406 }
407 
ResumeStream(Http2StreamId stream_id)408 bool OgHttp2Session::ResumeStream(Http2StreamId stream_id) {
409   auto it = stream_map_.find(stream_id);
410   if (it == stream_map_.end() || !HasMoreData(it->second) ||
411       !write_scheduler_.StreamRegistered(stream_id)) {
412     return false;
413   }
414   it->second.data_deferred = false;
415   write_scheduler_.MarkStreamReady(stream_id, /*add_to_front=*/false);
416   return true;
417 }
418 
GetStreamSendWindowSize(Http2StreamId stream_id) const419 int OgHttp2Session::GetStreamSendWindowSize(Http2StreamId stream_id) const {
420   auto it = stream_map_.find(stream_id);
421   if (it != stream_map_.end()) {
422     return it->second.send_window;
423   }
424   return -1;
425 }
426 
GetStreamReceiveWindowLimit(Http2StreamId stream_id) const427 int OgHttp2Session::GetStreamReceiveWindowLimit(Http2StreamId stream_id) const {
428   auto it = stream_map_.find(stream_id);
429   if (it != stream_map_.end()) {
430     return it->second.window_manager.WindowSizeLimit();
431   }
432   return -1;
433 }
434 
GetStreamReceiveWindowSize(Http2StreamId stream_id) const435 int OgHttp2Session::GetStreamReceiveWindowSize(Http2StreamId stream_id) const {
436   auto it = stream_map_.find(stream_id);
437   if (it != stream_map_.end()) {
438     return it->second.window_manager.CurrentWindowSize();
439   }
440   return -1;
441 }
442 
GetReceiveWindowSize() const443 int OgHttp2Session::GetReceiveWindowSize() const {
444   return connection_window_manager_.CurrentWindowSize();
445 }
446 
GetHpackEncoderDynamicTableSize() const447 int OgHttp2Session::GetHpackEncoderDynamicTableSize() const {
448   const spdy::HpackEncoder* encoder = framer_.GetHpackEncoder();
449   return encoder == nullptr ? 0 : encoder->GetDynamicTableSize();
450 }
451 
GetHpackEncoderDynamicTableCapacity() const452 int OgHttp2Session::GetHpackEncoderDynamicTableCapacity() const {
453   const spdy::HpackEncoder* encoder = framer_.GetHpackEncoder();
454   return encoder == nullptr ? kDefaultHpackTableCapacity
455                             : encoder->CurrentHeaderTableSizeSetting();
456 }
457 
GetHpackDecoderDynamicTableSize() const458 int OgHttp2Session::GetHpackDecoderDynamicTableSize() const {
459   return decoder_.GetHpackDecoder().GetDynamicTableSize();
460 }
461 
GetHpackDecoderSizeLimit() const462 int OgHttp2Session::GetHpackDecoderSizeLimit() const {
463   return decoder_.GetHpackDecoder().GetCurrentHeaderTableSizeSetting();
464 }
465 
ProcessBytes(absl::string_view bytes)466 int64_t OgHttp2Session::ProcessBytes(absl::string_view bytes) {
467   QUICHE_VLOG(2) << TracePerspectiveAsString(options_.perspective)
468                  << " processing [" << absl::CEscape(bytes) << "]";
469   return absl::visit(ProcessBytesResultVisitor(), ProcessBytesImpl(bytes));
470 }
471 
472 absl::variant<int64_t, OgHttp2Session::ProcessBytesError>
ProcessBytesImpl(absl::string_view bytes)473 OgHttp2Session::ProcessBytesImpl(absl::string_view bytes) {
474   if (processing_bytes_) {
475     QUICHE_VLOG(1) << "Returning early; already processing bytes.";
476     return 0;
477   }
478   processing_bytes_ = true;
479   auto cleanup = absl::MakeCleanup([this]() { processing_bytes_ = false; });
480 
481   if (options_.blackhole_data_on_connection_error && latched_error_) {
482     return static_cast<int64_t>(bytes.size());
483   }
484 
485   int64_t preface_consumed = 0;
486   if (!remaining_preface_.empty()) {
487     QUICHE_VLOG(2) << "Preface bytes remaining: " << remaining_preface_.size();
488     // decoder_ does not understand the client connection preface.
489     size_t min_size = std::min(remaining_preface_.size(), bytes.size());
490     if (!absl::StartsWith(remaining_preface_, bytes.substr(0, min_size))) {
491       // Preface doesn't match!
492       QUICHE_DLOG(INFO) << "Preface doesn't match! Expected: ["
493                         << absl::CEscape(remaining_preface_) << "], actual: ["
494                         << absl::CEscape(bytes) << "]";
495       LatchErrorAndNotify(Http2ErrorCode::PROTOCOL_ERROR,
496                           ConnectionError::kInvalidConnectionPreface);
497       return ProcessBytesError::kInvalidConnectionPreface;
498     }
499     remaining_preface_.remove_prefix(min_size);
500     bytes.remove_prefix(min_size);
501     if (!remaining_preface_.empty()) {
502       QUICHE_VLOG(2) << "Preface bytes remaining: "
503                      << remaining_preface_.size();
504       return static_cast<int64_t>(min_size);
505     }
506     preface_consumed = min_size;
507   }
508   int64_t result = decoder_.ProcessInput(bytes.data(), bytes.size());
509   QUICHE_VLOG(2) << "ProcessBytes result: " << result;
510   if (fatal_visitor_callback_failure_) {
511     QUICHE_DCHECK(latched_error_);
512     QUICHE_VLOG(2) << "Visitor callback failed while processing bytes.";
513     return ProcessBytesError::kVisitorCallbackFailed;
514   }
515   if (latched_error_ || result < 0) {
516     QUICHE_VLOG(2) << "ProcessBytes encountered an error.";
517     if (options_.blackhole_data_on_connection_error) {
518       return static_cast<int64_t>(bytes.size() + preface_consumed);
519     } else {
520       return ProcessBytesError::kUnspecified;
521     }
522   }
523   return result + preface_consumed;
524 }
525 
Consume(Http2StreamId stream_id,size_t num_bytes)526 int OgHttp2Session::Consume(Http2StreamId stream_id, size_t num_bytes) {
527   auto it = stream_map_.find(stream_id);
528   if (it == stream_map_.end()) {
529     QUICHE_LOG(ERROR) << "Stream " << stream_id << " not found when consuming "
530                       << num_bytes << " bytes";
531   } else {
532     it->second.window_manager.MarkDataFlushed(num_bytes);
533   }
534   connection_window_manager_.MarkDataFlushed(num_bytes);
535   return 0;  // Remove?
536 }
537 
StartGracefulShutdown()538 void OgHttp2Session::StartGracefulShutdown() {
539   if (IsServerSession()) {
540     if (!queued_goaway_) {
541       EnqueueFrame(std::make_unique<spdy::SpdyGoAwayIR>(
542           std::numeric_limits<int32_t>::max(), spdy::ERROR_CODE_NO_ERROR,
543           "graceful_shutdown"));
544     }
545   } else {
546     QUICHE_LOG(ERROR) << "Graceful shutdown not needed for clients.";
547   }
548 }
549 
EnqueueFrame(std::unique_ptr<spdy::SpdyFrameIR> frame)550 void OgHttp2Session::EnqueueFrame(std::unique_ptr<spdy::SpdyFrameIR> frame) {
551   if (queued_immediate_goaway_) {
552     // Do not allow additional frames to be enqueued after the GOAWAY.
553     return;
554   }
555 
556   const bool is_non_ack_settings = IsNonAckSettings(*frame);
557   MaybeSetupPreface(is_non_ack_settings);
558 
559   if (frame->frame_type() == spdy::SpdyFrameType::GOAWAY) {
560     queued_goaway_ = true;
561     if (latched_error_) {
562       PrepareForImmediateGoAway();
563     }
564   } else if (frame->fin() ||
565              frame->frame_type() == spdy::SpdyFrameType::RST_STREAM) {
566     auto iter = stream_map_.find(frame->stream_id());
567     if (iter != stream_map_.end()) {
568       iter->second.half_closed_local = true;
569     }
570     if (frame->frame_type() == spdy::SpdyFrameType::RST_STREAM) {
571       // TODO(diannahu): Condition on existence in the stream map?
572       streams_reset_.insert(frame->stream_id());
573     }
574   } else if (frame->frame_type() == spdy::SpdyFrameType::WINDOW_UPDATE) {
575     UpdateReceiveWindow(
576         frame->stream_id(),
577         reinterpret_cast<spdy::SpdyWindowUpdateIR&>(*frame).delta());
578   } else if (is_non_ack_settings) {
579     HandleOutboundSettings(
580         *reinterpret_cast<spdy::SpdySettingsIR*>(frame.get()));
581   }
582   if (frame->stream_id() != 0) {
583     auto result = queued_frames_.insert({frame->stream_id(), 1});
584     if (!result.second) {
585       ++(result.first->second);
586     }
587   }
588   frames_.push_back(std::move(frame));
589 }
590 
Send()591 int OgHttp2Session::Send() {
592   if (sending_) {
593     QUICHE_VLOG(1) << TracePerspectiveAsString(options_.perspective)
594                    << " returning early; already sending.";
595     return 0;
596   }
597   sending_ = true;
598   auto cleanup = absl::MakeCleanup([this]() { sending_ = false; });
599 
600   if (fatal_send_error_) {
601     return kSendError;
602   }
603 
604   MaybeSetupPreface(/*sending_outbound_settings=*/false);
605 
606   SendResult continue_writing = SendQueuedFrames();
607   if (queued_immediate_goaway_) {
608     // If an immediate GOAWAY was queued, then the above flush either sent the
609     // GOAWAY or buffered it to be sent on the next successful flush. In either
610     // case, return early here to avoid sending other frames.
611     return InterpretSendResult(continue_writing);
612   }
613   // Notify on new/pending streams closed due to GOAWAY receipt.
614   CloseGoAwayRejectedStreams();
615   // Wake streams for writes.
616   while (continue_writing == SendResult::SEND_OK && HasReadyStream()) {
617     const Http2StreamId stream_id = GetNextReadyStream();
618     // TODO(birenroy): Add a return value to indicate write blockage, so streams
619     // aren't woken unnecessarily.
620     QUICHE_VLOG(1) << "Waking stream " << stream_id << " for writes.";
621     continue_writing = WriteForStream(stream_id);
622   }
623   if (continue_writing == SendResult::SEND_OK) {
624     continue_writing = SendQueuedFrames();
625   }
626   return InterpretSendResult(continue_writing);
627 }
628 
InterpretSendResult(SendResult result)629 int OgHttp2Session::InterpretSendResult(SendResult result) {
630   if (result == SendResult::SEND_ERROR) {
631     fatal_send_error_ = true;
632     return kSendError;
633   } else {
634     return 0;
635   }
636 }
637 
HasReadyStream() const638 bool OgHttp2Session::HasReadyStream() const {
639   return !trailers_ready_.empty() ||
640          (write_scheduler_.HasReadyStreams() && connection_send_window_ > 0);
641 }
642 
GetNextReadyStream()643 Http2StreamId OgHttp2Session::GetNextReadyStream() {
644   QUICHE_DCHECK(HasReadyStream());
645   if (!trailers_ready_.empty()) {
646     const Http2StreamId stream_id = *trailers_ready_.begin();
647     // WriteForStream() will re-mark the stream as ready, if necessary.
648     write_scheduler_.MarkStreamNotReady(stream_id);
649     trailers_ready_.erase(trailers_ready_.begin());
650     return stream_id;
651   }
652   return write_scheduler_.PopNextReadyStream();
653 }
654 
SubmitRequestInternal(absl::Span<const Header> headers,std::unique_ptr<DataFrameSource> data_source,void * user_data)655 int32_t OgHttp2Session::SubmitRequestInternal(
656     absl::Span<const Header> headers,
657     std::unique_ptr<DataFrameSource> data_source, void* user_data) {
658   // TODO(birenroy): return an error for the incorrect perspective
659   const Http2StreamId stream_id = next_stream_id_;
660   next_stream_id_ += 2;
661   if (!pending_streams_.empty() || !CanCreateStream()) {
662     // TODO(diannahu): There should probably be a limit to the number of allowed
663     // pending streams.
664     pending_streams_.insert(
665         {stream_id, PendingStreamState{ToHeaderBlock(headers),
666                                        std::move(data_source), user_data}});
667     StartPendingStreams();
668   } else {
669     StartRequest(stream_id, ToHeaderBlock(headers), std::move(data_source),
670                  user_data);
671   }
672   return stream_id;
673 }
674 
SubmitResponseInternal(Http2StreamId stream_id,absl::Span<const Header> headers,std::unique_ptr<DataFrameSource> data_source)675 int OgHttp2Session::SubmitResponseInternal(
676     Http2StreamId stream_id, absl::Span<const Header> headers,
677     std::unique_ptr<DataFrameSource> data_source) {
678   // TODO(birenroy): return an error for the incorrect perspective
679   auto iter = stream_map_.find(stream_id);
680   if (iter == stream_map_.end()) {
681     QUICHE_LOG(ERROR) << "Unable to find stream " << stream_id;
682     return -501;  // NGHTTP2_ERR_INVALID_ARGUMENT
683   }
684   const bool end_stream = data_source == nullptr;
685   if (!end_stream) {
686     // Add data source to stream state
687     iter->second.outbound_body = std::move(data_source);
688     write_scheduler_.MarkStreamReady(stream_id, false);
689   }
690   SendHeaders(stream_id, ToHeaderBlock(headers), end_stream);
691   return 0;
692 }
693 
MaybeSendBufferedData()694 OgHttp2Session::SendResult OgHttp2Session::MaybeSendBufferedData() {
695   int64_t result = std::numeric_limits<int64_t>::max();
696   while (result > 0 && !buffered_data_.empty()) {
697     result = visitor_.OnReadyToSend(buffered_data_);
698     if (result > 0) {
699       buffered_data_.erase(0, result);
700     }
701   }
702   if (result < 0) {
703     LatchErrorAndNotify(Http2ErrorCode::INTERNAL_ERROR,
704                         ConnectionError::kSendError);
705     return SendResult::SEND_ERROR;
706   }
707   return buffered_data_.empty() ? SendResult::SEND_OK
708                                 : SendResult::SEND_BLOCKED;
709 }
710 
SendQueuedFrames()711 OgHttp2Session::SendResult OgHttp2Session::SendQueuedFrames() {
712   // Flush any serialized prefix.
713   if (const SendResult result = MaybeSendBufferedData();
714       result != SendResult::SEND_OK) {
715     return result;
716   }
717   // Serialize and send frames in the queue.
718   while (!frames_.empty()) {
719     const auto& frame_ptr = frames_.front();
720     FrameAttributeCollector c;
721     frame_ptr->Visit(&c);
722 
723     // DATA frames should never be queued.
724     QUICHE_DCHECK_NE(c.frame_type(), 0);
725 
726     const bool stream_reset =
727         c.stream_id() != 0 && streams_reset_.count(c.stream_id()) > 0;
728     if (stream_reset &&
729         c.frame_type() != static_cast<uint8_t>(FrameType::RST_STREAM)) {
730       // The stream has been reset, so any other remaining frames can be
731       // skipped.
732       // TODO(birenroy): inform the visitor of frames that are skipped.
733       DecrementQueuedFrameCount(c.stream_id(), c.frame_type());
734       frames_.pop_front();
735       continue;
736     } else if (!IsServerSession() && received_goaway_ &&
737                c.stream_id() >
738                    static_cast<uint32_t>(received_goaway_stream_id_)) {
739       // This frame will be ignored by the server, so don't send it. The stream
740       // associated with this frame should have been closed in OnGoAway().
741       frames_.pop_front();
742       continue;
743     }
744     // Frames can't accurately report their own length; the actual serialized
745     // length must be used instead.
746     spdy::SpdySerializedFrame frame = framer_.SerializeFrame(*frame_ptr);
747     const size_t frame_payload_length = frame.size() - spdy::kFrameHeaderSize;
748     frame_ptr->Visit(&send_logger_);
749     visitor_.OnBeforeFrameSent(c.frame_type(), c.stream_id(),
750                                frame_payload_length, c.flags());
751     const int64_t result = visitor_.OnReadyToSend(absl::string_view(frame));
752     if (result < 0) {
753       LatchErrorAndNotify(Http2ErrorCode::INTERNAL_ERROR,
754                           ConnectionError::kSendError);
755       return SendResult::SEND_ERROR;
756     } else if (result == 0) {
757       // Write blocked.
758       return SendResult::SEND_BLOCKED;
759     } else {
760       frames_.pop_front();
761 
762       const bool ok =
763           AfterFrameSent(c.frame_type(), c.stream_id(), frame_payload_length,
764                          c.flags(), c.error_code());
765       if (!ok) {
766         LatchErrorAndNotify(Http2ErrorCode::INTERNAL_ERROR,
767                             ConnectionError::kSendError);
768         return SendResult::SEND_ERROR;
769       }
770       if (static_cast<size_t>(result) < frame.size()) {
771         // The frame was partially written, so the rest must be buffered.
772         buffered_data_.append(frame.data() + result, frame.size() - result);
773         return SendResult::SEND_BLOCKED;
774       }
775     }
776   }
777   return SendResult::SEND_OK;
778 }
779 
AfterFrameSent(uint8_t frame_type_int,uint32_t stream_id,size_t payload_length,uint8_t flags,uint32_t error_code)780 bool OgHttp2Session::AfterFrameSent(uint8_t frame_type_int, uint32_t stream_id,
781                                     size_t payload_length, uint8_t flags,
782                                     uint32_t error_code) {
783   const FrameType frame_type = static_cast<FrameType>(frame_type_int);
784   int result = visitor_.OnFrameSent(frame_type_int, stream_id, payload_length,
785                                     flags, error_code);
786   if (result < 0) {
787     return false;
788   }
789   if (stream_id == 0) {
790     if (frame_type == FrameType::SETTINGS) {
791       const bool is_settings_ack = (flags & ACK_FLAG);
792       if (is_settings_ack && encoder_header_table_capacity_when_acking_) {
793         framer_.UpdateHeaderEncoderTableSize(
794             *encoder_header_table_capacity_when_acking_);
795         encoder_header_table_capacity_when_acking_ = std::nullopt;
796       } else if (!is_settings_ack) {
797         sent_non_ack_settings_ = true;
798       }
799     }
800     return true;
801   }
802 
803   const bool contains_fin =
804       (frame_type == FrameType::DATA || frame_type == FrameType::HEADERS) &&
805       (flags & END_STREAM_FLAG) == END_STREAM_FLAG;
806   auto it = stream_map_.find(stream_id);
807   const bool still_open_remote =
808       it != stream_map_.end() && !it->second.half_closed_remote;
809   if (contains_fin && still_open_remote &&
810       options_.rst_stream_no_error_when_incomplete && IsServerSession()) {
811     // Since the peer has not yet ended the stream, this endpoint should
812     // send a RST_STREAM NO_ERROR. See RFC 7540 Section 8.1.
813     frames_.push_front(std::make_unique<spdy::SpdyRstStreamIR>(
814         stream_id, spdy::SpdyErrorCode::ERROR_CODE_NO_ERROR));
815     auto queued_result = queued_frames_.insert({stream_id, 1});
816     if (!queued_result.second) {
817       ++(queued_result.first->second);
818     }
819     it->second.half_closed_remote = true;
820   }
821 
822   DecrementQueuedFrameCount(stream_id, frame_type_int);
823   return true;
824 }
825 
WriteForStream(Http2StreamId stream_id)826 OgHttp2Session::SendResult OgHttp2Session::WriteForStream(
827     Http2StreamId stream_id) {
828   auto it = stream_map_.find(stream_id);
829   if (it == stream_map_.end()) {
830     QUICHE_LOG(ERROR) << "Can't find stream " << stream_id
831                       << " which is ready to write!";
832     return SendResult::SEND_OK;
833   }
834   StreamState& state = it->second;
835   auto reset_it = streams_reset_.find(stream_id);
836   if (reset_it != streams_reset_.end()) {
837     // The stream has been reset; there's no point in sending DATA or trailing
838     // HEADERS.
839     AbandonData(state);
840     state.trailers = nullptr;
841     return SendResult::SEND_OK;
842   }
843 
844   SendResult connection_can_write = SendResult::SEND_OK;
845   if (!IsReadyToWriteData(state)) {
846     // No data to send, but there might be trailers.
847     if (state.trailers != nullptr) {
848       // Trailers will include END_STREAM, so the data source can be discarded.
849       // Since data_deferred is true, there is no data waiting to be flushed for
850       // this stream.
851       AbandonData(state);
852       auto block_ptr = std::move(state.trailers);
853       if (state.half_closed_local) {
854         QUICHE_LOG(ERROR) << "Sent fin; can't send trailers.";
855 
856         // TODO(birenroy,diannahu): Consider queuing a RST_STREAM INTERNAL_ERROR
857         // instead.
858         CloseStream(stream_id, Http2ErrorCode::INTERNAL_ERROR);
859       } else {
860         SendTrailers(stream_id, std::move(*block_ptr));
861       }
862     }
863     return SendResult::SEND_OK;
864   }
865   int32_t available_window =
866       std::min({connection_send_window_, state.send_window,
867                 static_cast<int32_t>(max_frame_payload_)});
868   while (connection_can_write == SendResult::SEND_OK && available_window > 0 &&
869          IsReadyToWriteData(state)) {
870     DataFrameInfo info = GetDataFrameInfo(stream_id, available_window, state);
871     QUICHE_VLOG(2) << "WriteForStream | length: " << info.payload_length
872                    << " end_data: " << info.end_data
873                    << " trailers: " << state.trailers.get();
874     if (info.payload_length == 0 && !info.end_data &&
875         state.trailers == nullptr) {
876       // An unproductive call to SelectPayloadLength() results in this stream
877       // entering the "deferred" state only if no trailers are available to
878       // send.
879       state.data_deferred = true;
880       break;
881     } else if (info.payload_length == DataFrameSource::kError) {
882       // TODO(birenroy,diannahu): Consider queuing a RST_STREAM INTERNAL_ERROR
883       // instead.
884       CloseStream(stream_id, Http2ErrorCode::INTERNAL_ERROR);
885       // No more work on the stream; it has been closed.
886       break;
887     }
888     if (info.payload_length > 0 || info.send_fin) {
889       spdy::SpdyDataIR data(stream_id);
890       data.set_fin(info.send_fin);
891       data.SetDataShallow(info.payload_length);
892       spdy::SpdySerializedFrame header =
893           spdy::SpdyFramer::SerializeDataFrameHeaderWithPaddingLengthField(
894               data);
895       QUICHE_DCHECK(buffered_data_.empty() && frames_.empty());
896       data.Visit(&send_logger_);
897       const bool success = SendDataFrame(stream_id, absl::string_view(header),
898                                          info.payload_length, state);
899       if (!success) {
900         connection_can_write = SendResult::SEND_BLOCKED;
901         break;
902       }
903       connection_send_window_ -= info.payload_length;
904       state.send_window -= info.payload_length;
905       available_window = std::min({connection_send_window_, state.send_window,
906                                    static_cast<int32_t>(max_frame_payload_)});
907       if (info.send_fin) {
908         state.half_closed_local = true;
909         MaybeFinWithRstStream(it);
910       }
911       const bool ok =
912           AfterFrameSent(/* DATA */ 0, stream_id, info.payload_length,
913                          info.send_fin ? END_STREAM_FLAG : 0x0, 0);
914       if (!ok) {
915         LatchErrorAndNotify(Http2ErrorCode::INTERNAL_ERROR,
916                             ConnectionError::kSendError);
917         return SendResult::SEND_ERROR;
918       }
919       if (!stream_map_.contains(stream_id)) {
920         // Note: the stream may have been closed if `fin` is true.
921         break;
922       }
923     }
924     if (info.end_data ||
925         (info.payload_length == 0 && state.trailers != nullptr)) {
926       // If SelectPayloadLength() returned {0, false}, and there are trailers to
927       // send, it's okay to send the trailers.
928       if (state.trailers != nullptr) {
929         auto block_ptr = std::move(state.trailers);
930         if (info.send_fin) {
931           QUICHE_LOG(ERROR) << "Sent fin; can't send trailers.";
932 
933           // TODO(birenroy,diannahu): Consider queuing a RST_STREAM
934           // INTERNAL_ERROR instead.
935           CloseStream(stream_id, Http2ErrorCode::INTERNAL_ERROR);
936           // No more work on this stream; it has been closed.
937           break;
938         } else {
939           SendTrailers(stream_id, std::move(*block_ptr));
940         }
941       }
942       AbandonData(state);
943     }
944   }
945   // If the stream still exists and has data to send, it should be marked as
946   // ready in the write scheduler.
947   if (stream_map_.contains(stream_id) && !state.data_deferred &&
948       state.send_window > 0 && HasMoreData(state)) {
949     write_scheduler_.MarkStreamReady(stream_id, false);
950   }
951   // Streams can continue writing as long as the connection is not write-blocked
952   // and there is additional flow control quota available.
953   if (connection_can_write != SendResult::SEND_OK) {
954     return connection_can_write;
955   }
956   return connection_send_window_ <= 0 ? SendResult::SEND_BLOCKED
957                                       : SendResult::SEND_OK;
958 }
959 
SerializeMetadata(Http2StreamId stream_id,std::unique_ptr<MetadataSource> source)960 void OgHttp2Session::SerializeMetadata(Http2StreamId stream_id,
961                                        std::unique_ptr<MetadataSource> source) {
962   const uint32_t max_payload_size =
963       std::min(kMaxAllowedMetadataFrameSize, max_frame_payload_);
964   auto payload_buffer = std::make_unique<uint8_t[]>(max_payload_size);
965 
966   while (true) {
967     auto [written, end_metadata] =
968         source->Pack(payload_buffer.get(), max_payload_size);
969     if (written < 0) {
970       // Unable to pack any metadata.
971       return;
972     }
973     QUICHE_DCHECK_LE(static_cast<size_t>(written), max_payload_size);
974     auto payload = absl::string_view(
975         reinterpret_cast<const char*>(payload_buffer.get()), written);
976     EnqueueFrame(std::make_unique<spdy::SpdyUnknownIR>(
977         stream_id, kMetadataFrameType, end_metadata ? kMetadataEndFlag : 0u,
978         std::string(payload)));
979     if (end_metadata) {
980       return;
981     }
982   }
983 }
984 
SubmitRequest(absl::Span<const Header> headers,std::unique_ptr<DataFrameSource> data_source,void * user_data)985 int32_t OgHttp2Session::SubmitRequest(
986     absl::Span<const Header> headers,
987     std::unique_ptr<DataFrameSource> data_source, void* user_data) {
988   return SubmitRequestInternal(headers, std::move(data_source), user_data);
989 }
990 
SubmitResponse(Http2StreamId stream_id,absl::Span<const Header> headers,std::unique_ptr<DataFrameSource> data_source)991 int OgHttp2Session::SubmitResponse(
992     Http2StreamId stream_id, absl::Span<const Header> headers,
993     std::unique_ptr<DataFrameSource> data_source) {
994   return SubmitResponseInternal(stream_id, headers, std::move(data_source));
995 }
996 
SubmitTrailer(Http2StreamId stream_id,absl::Span<const Header> trailers)997 int OgHttp2Session::SubmitTrailer(Http2StreamId stream_id,
998                                   absl::Span<const Header> trailers) {
999   // TODO(birenroy): Reject trailers when acting as a client?
1000   auto iter = stream_map_.find(stream_id);
1001   if (iter == stream_map_.end()) {
1002     QUICHE_LOG(ERROR) << "Unable to find stream " << stream_id;
1003     return -501;  // NGHTTP2_ERR_INVALID_ARGUMENT
1004   }
1005   StreamState& state = iter->second;
1006   if (state.half_closed_local) {
1007     QUICHE_LOG(ERROR) << "Stream " << stream_id << " is half closed (local)";
1008     return -514;  // NGHTTP2_ERR_INVALID_STREAM_STATE
1009   }
1010   if (state.trailers != nullptr) {
1011     QUICHE_LOG(ERROR) << "Stream " << stream_id
1012                       << " already has trailers queued";
1013     return -514;  // NGHTTP2_ERR_INVALID_STREAM_STATE
1014   }
1015   if (!HasMoreData(state)) {
1016     // Enqueue trailers immediately.
1017     SendTrailers(stream_id, ToHeaderBlock(trailers));
1018   } else {
1019     QUICHE_LOG_IF(ERROR, state.outbound_body->send_fin())
1020         << "DataFrameSource will send fin, preventing trailers!";
1021     // Save trailers so they can be written once data is done.
1022     state.trailers =
1023         std::make_unique<spdy::Http2HeaderBlock>(ToHeaderBlock(trailers));
1024     trailers_ready_.insert(stream_id);
1025   }
1026   return 0;
1027 }
1028 
SubmitMetadata(Http2StreamId stream_id,std::unique_ptr<MetadataSource> source)1029 void OgHttp2Session::SubmitMetadata(Http2StreamId stream_id,
1030                                     std::unique_ptr<MetadataSource> source) {
1031   SerializeMetadata(stream_id, std::move(source));
1032 }
1033 
SubmitSettings(absl::Span<const Http2Setting> settings)1034 void OgHttp2Session::SubmitSettings(absl::Span<const Http2Setting> settings) {
1035   auto frame = PrepareSettingsFrame(settings);
1036   EnqueueFrame(std::move(frame));
1037 }
1038 
OnError(SpdyFramerError error,std::string detailed_error)1039 void OgHttp2Session::OnError(SpdyFramerError error,
1040                              std::string detailed_error) {
1041   QUICHE_VLOG(1) << "Error: "
1042                  << http2::Http2DecoderAdapter::SpdyFramerErrorToString(error)
1043                  << " details: " << detailed_error;
1044   // TODO(diannahu): Consider propagating `detailed_error`.
1045   LatchErrorAndNotify(GetHttp2ErrorCode(error), ConnectionError::kParseError);
1046 }
1047 
OnCommonHeader(spdy::SpdyStreamId stream_id,size_t length,uint8_t type,uint8_t flags)1048 void OgHttp2Session::OnCommonHeader(spdy::SpdyStreamId stream_id, size_t length,
1049                                     uint8_t type, uint8_t flags) {
1050   current_frame_type_ = type;
1051   highest_received_stream_id_ = std::max(static_cast<Http2StreamId>(stream_id),
1052                                          highest_received_stream_id_);
1053   if (streams_reset_.contains(stream_id)) {
1054     return;
1055   }
1056   const bool result = visitor_.OnFrameHeader(stream_id, length, type, flags);
1057   if (!result) {
1058     fatal_visitor_callback_failure_ = true;
1059     decoder_.StopProcessing();
1060   }
1061 }
1062 
OnDataFrameHeader(spdy::SpdyStreamId stream_id,size_t length,bool)1063 void OgHttp2Session::OnDataFrameHeader(spdy::SpdyStreamId stream_id,
1064                                        size_t length, bool /*fin*/) {
1065   auto iter = stream_map_.find(stream_id);
1066   if (iter == stream_map_.end() || streams_reset_.contains(stream_id)) {
1067     // The stream does not exist; it could be an error or a benign close, e.g.,
1068     // getting data for a stream this connection recently closed.
1069     if (static_cast<Http2StreamId>(stream_id) > highest_processed_stream_id_) {
1070       // Receiving DATA before HEADERS is a connection error.
1071       LatchErrorAndNotify(Http2ErrorCode::PROTOCOL_ERROR,
1072                           ConnectionError::kWrongFrameSequence);
1073     }
1074     return;
1075   }
1076 
1077   if (static_cast<int64_t>(length) >
1078       connection_window_manager_.CurrentWindowSize()) {
1079     // Peer exceeded the connection flow control limit.
1080     LatchErrorAndNotify(
1081         Http2ErrorCode::FLOW_CONTROL_ERROR,
1082         Http2VisitorInterface::ConnectionError::kFlowControlError);
1083     return;
1084   }
1085 
1086   if (static_cast<int64_t>(length) >
1087       iter->second.window_manager.CurrentWindowSize()) {
1088     // Peer exceeded the stream flow control limit.
1089     EnqueueFrame(std::make_unique<spdy::SpdyRstStreamIR>(
1090         stream_id, spdy::ERROR_CODE_FLOW_CONTROL_ERROR));
1091     return;
1092   }
1093 
1094   const bool result = visitor_.OnBeginDataForStream(stream_id, length);
1095   if (!result) {
1096     fatal_visitor_callback_failure_ = true;
1097     decoder_.StopProcessing();
1098   }
1099 
1100   if (!iter->second.can_receive_body && length > 0) {
1101     EnqueueFrame(std::make_unique<spdy::SpdyRstStreamIR>(
1102         stream_id, spdy::ERROR_CODE_PROTOCOL_ERROR));
1103     return;
1104   }
1105 
1106   // Validate against the content-length if it exists.
1107   if (iter->second.remaining_content_length.has_value()) {
1108     if (length > *iter->second.remaining_content_length) {
1109       HandleContentLengthError(stream_id);
1110       iter->second.remaining_content_length.reset();
1111     } else {
1112       *iter->second.remaining_content_length -= length;
1113     }
1114   }
1115 }
1116 
OnStreamFrameData(spdy::SpdyStreamId stream_id,const char * data,size_t len)1117 void OgHttp2Session::OnStreamFrameData(spdy::SpdyStreamId stream_id,
1118                                        const char* data, size_t len) {
1119   // Count the data against flow control, even if the stream is unknown.
1120   MarkDataBuffered(stream_id, len);
1121 
1122   if (!stream_map_.contains(stream_id) || streams_reset_.contains(stream_id)) {
1123     // If the stream was unknown due to a protocol error, the visitor was
1124     // informed in OnDataFrameHeader().
1125     return;
1126   }
1127 
1128   const bool result =
1129       visitor_.OnDataForStream(stream_id, absl::string_view(data, len));
1130   if (!result) {
1131     fatal_visitor_callback_failure_ = true;
1132     decoder_.StopProcessing();
1133   }
1134 }
1135 
OnStreamEnd(spdy::SpdyStreamId stream_id)1136 void OgHttp2Session::OnStreamEnd(spdy::SpdyStreamId stream_id) {
1137   auto iter = stream_map_.find(stream_id);
1138   if (iter != stream_map_.end()) {
1139     iter->second.half_closed_remote = true;
1140     if (streams_reset_.contains(stream_id)) {
1141       return;
1142     }
1143 
1144     // Validate against the content-length if it exists.
1145     if (iter->second.remaining_content_length.has_value() &&
1146         *iter->second.remaining_content_length != 0) {
1147       HandleContentLengthError(stream_id);
1148       return;
1149     }
1150 
1151     const bool result = visitor_.OnEndStream(stream_id);
1152     if (!result) {
1153       fatal_visitor_callback_failure_ = true;
1154       decoder_.StopProcessing();
1155     }
1156   }
1157 
1158   auto queued_frames_iter = queued_frames_.find(stream_id);
1159   const bool no_queued_frames = queued_frames_iter == queued_frames_.end() ||
1160                                 queued_frames_iter->second == 0;
1161   if (iter != stream_map_.end() && iter->second.half_closed_local &&
1162       !IsServerSession() && no_queued_frames) {
1163     // From the client's perspective, the stream can be closed if it's already
1164     // half_closed_local.
1165     CloseStream(stream_id, Http2ErrorCode::HTTP2_NO_ERROR);
1166   }
1167 }
1168 
OnStreamPadLength(spdy::SpdyStreamId stream_id,size_t value)1169 void OgHttp2Session::OnStreamPadLength(spdy::SpdyStreamId stream_id,
1170                                        size_t value) {
1171   const size_t padding_length = 1 + value;
1172   const bool result = visitor_.OnDataPaddingLength(stream_id, padding_length);
1173   if (!result) {
1174     fatal_visitor_callback_failure_ = true;
1175     decoder_.StopProcessing();
1176   }
1177   connection_window_manager_.MarkWindowConsumed(padding_length);
1178   if (auto it = stream_map_.find(stream_id); it != stream_map_.end()) {
1179     it->second.window_manager.MarkWindowConsumed(padding_length);
1180   }
1181 }
1182 
OnStreamPadding(spdy::SpdyStreamId,size_t)1183 void OgHttp2Session::OnStreamPadding(spdy::SpdyStreamId /*stream_id*/, size_t
1184                                      /*len*/) {
1185   // Flow control was accounted for in OnStreamPadLength().
1186   // TODO(181586191): Pass padding to the visitor?
1187 }
1188 
OnHeaderFrameStart(spdy::SpdyStreamId stream_id)1189 spdy::SpdyHeadersHandlerInterface* OgHttp2Session::OnHeaderFrameStart(
1190     spdy::SpdyStreamId stream_id) {
1191   auto it = stream_map_.find(stream_id);
1192   if (it != stream_map_.end() && !streams_reset_.contains(stream_id)) {
1193     headers_handler_.set_stream_id(stream_id);
1194     headers_handler_.set_header_type(
1195         NextHeaderType(it->second.received_header_type));
1196     return &headers_handler_;
1197   } else {
1198     return &noop_headers_handler_;
1199   }
1200 }
1201 
OnHeaderFrameEnd(spdy::SpdyStreamId stream_id)1202 void OgHttp2Session::OnHeaderFrameEnd(spdy::SpdyStreamId stream_id) {
1203   auto it = stream_map_.find(stream_id);
1204   if (it != stream_map_.end()) {
1205     if (headers_handler_.header_type() == HeaderType::RESPONSE &&
1206         !headers_handler_.status_header().empty() &&
1207         headers_handler_.status_header()[0] == '1') {
1208       // If response headers carried a 1xx response code, final response headers
1209       // should still be forthcoming.
1210       headers_handler_.set_header_type(HeaderType::RESPONSE_100);
1211     }
1212     it->second.received_header_type = headers_handler_.header_type();
1213 
1214     // Track the content-length if the headers indicate that a body can follow.
1215     it->second.can_receive_body =
1216         headers_handler_.CanReceiveBody() && !it->second.sent_head_method;
1217     if (it->second.can_receive_body) {
1218       it->second.remaining_content_length = headers_handler_.content_length();
1219     }
1220 
1221     headers_handler_.set_stream_id(0);
1222   }
1223 }
1224 
OnRstStream(spdy::SpdyStreamId stream_id,spdy::SpdyErrorCode error_code)1225 void OgHttp2Session::OnRstStream(spdy::SpdyStreamId stream_id,
1226                                  spdy::SpdyErrorCode error_code) {
1227   auto iter = stream_map_.find(stream_id);
1228   if (iter != stream_map_.end()) {
1229     iter->second.half_closed_remote = true;
1230     AbandonData(iter->second);
1231   } else if (static_cast<Http2StreamId>(stream_id) >
1232              highest_processed_stream_id_) {
1233     // Receiving RST_STREAM before HEADERS is a connection error.
1234     LatchErrorAndNotify(Http2ErrorCode::PROTOCOL_ERROR,
1235                         ConnectionError::kWrongFrameSequence);
1236     return;
1237   }
1238   if (streams_reset_.contains(stream_id)) {
1239     return;
1240   }
1241   visitor_.OnRstStream(stream_id, TranslateErrorCode(error_code));
1242   // TODO(birenroy): Consider whether there are outbound frames queued for the
1243   // stream.
1244   CloseStream(stream_id, TranslateErrorCode(error_code));
1245 }
1246 
OnSettings()1247 void OgHttp2Session::OnSettings() {
1248   visitor_.OnSettingsStart();
1249   auto settings = std::make_unique<SpdySettingsIR>();
1250   settings->set_is_ack(true);
1251   EnqueueFrame(std::move(settings));
1252 }
1253 
OnSetting(spdy::SpdySettingsId id,uint32_t value)1254 void OgHttp2Session::OnSetting(spdy::SpdySettingsId id, uint32_t value) {
1255   switch (id) {
1256     case HEADER_TABLE_SIZE:
1257       value = std::min(value, HpackCapacityBound(options_));
1258       if (value < framer_.GetHpackEncoder()->CurrentHeaderTableSizeSetting()) {
1259         // Safe to apply a smaller table capacity immediately.
1260         QUICHE_VLOG(2) << TracePerspectiveAsString(options_.perspective)
1261                        << " applying encoder table capacity " << value;
1262         framer_.GetHpackEncoder()->ApplyHeaderTableSizeSetting(value);
1263       } else {
1264         QUICHE_VLOG(2)
1265             << TracePerspectiveAsString(options_.perspective)
1266             << " NOT applying encoder table capacity until writing ack: "
1267             << value;
1268         encoder_header_table_capacity_when_acking_ = value;
1269       }
1270       break;
1271     case ENABLE_PUSH:
1272       if (value > 1u) {
1273         visitor_.OnInvalidFrame(
1274             0, Http2VisitorInterface::InvalidFrameError::kProtocol);
1275         // The specification says this is a connection-level protocol error.
1276         LatchErrorAndNotify(
1277             Http2ErrorCode::PROTOCOL_ERROR,
1278             Http2VisitorInterface::ConnectionError::kInvalidSetting);
1279         return;
1280       }
1281       // Aside from validation, this setting is ignored.
1282       break;
1283     case MAX_CONCURRENT_STREAMS:
1284       max_outbound_concurrent_streams_ = value;
1285       if (!IsServerSession()) {
1286         // We may now be able to start pending streams.
1287         StartPendingStreams();
1288       }
1289       break;
1290     case INITIAL_WINDOW_SIZE:
1291       if (value > spdy::kSpdyMaximumWindowSize) {
1292         visitor_.OnInvalidFrame(
1293             0, Http2VisitorInterface::InvalidFrameError::kFlowControl);
1294         // The specification says this is a connection-level flow control error.
1295         LatchErrorAndNotify(
1296             Http2ErrorCode::FLOW_CONTROL_ERROR,
1297             Http2VisitorInterface::ConnectionError::kFlowControlError);
1298         return;
1299       } else {
1300         UpdateStreamSendWindowSizes(value);
1301       }
1302       break;
1303     case MAX_FRAME_SIZE:
1304       if (value < kDefaultFramePayloadSizeLimit ||
1305           value > kMaximumFramePayloadSizeLimit) {
1306         visitor_.OnInvalidFrame(
1307             0, Http2VisitorInterface::InvalidFrameError::kProtocol);
1308         // The specification says this is a connection-level protocol error.
1309         LatchErrorAndNotify(
1310             Http2ErrorCode::PROTOCOL_ERROR,
1311             Http2VisitorInterface::ConnectionError::kInvalidSetting);
1312         return;
1313       }
1314       max_frame_payload_ = value;
1315       break;
1316     case ENABLE_CONNECT_PROTOCOL:
1317       if (value > 1u || (value == 0 && peer_enables_connect_protocol_)) {
1318         visitor_.OnInvalidFrame(
1319             0, Http2VisitorInterface::InvalidFrameError::kProtocol);
1320         LatchErrorAndNotify(
1321             Http2ErrorCode::PROTOCOL_ERROR,
1322             Http2VisitorInterface::ConnectionError::kInvalidSetting);
1323         return;
1324       }
1325       peer_enables_connect_protocol_ = (value == 1u);
1326       break;
1327     default:
1328       // TODO(bnc): See if C++17 inline constants are allowed in QUICHE.
1329       if (id == kMetadataExtensionId) {
1330         peer_supports_metadata_ = (value != 0);
1331       } else {
1332         QUICHE_VLOG(1) << "Unimplemented SETTING id: " << id;
1333       }
1334   }
1335   visitor_.OnSetting({id, value});
1336 }
1337 
OnSettingsEnd()1338 void OgHttp2Session::OnSettingsEnd() { visitor_.OnSettingsEnd(); }
1339 
OnSettingsAck()1340 void OgHttp2Session::OnSettingsAck() {
1341   if (!settings_ack_callbacks_.empty()) {
1342     SettingsAckCallback callback = std::move(settings_ack_callbacks_.front());
1343     settings_ack_callbacks_.pop_front();
1344     std::move(callback)();
1345   }
1346 
1347   visitor_.OnSettingsAck();
1348 }
1349 
OnPing(spdy::SpdyPingId unique_id,bool is_ack)1350 void OgHttp2Session::OnPing(spdy::SpdyPingId unique_id, bool is_ack) {
1351   visitor_.OnPing(unique_id, is_ack);
1352   if (options_.auto_ping_ack && !is_ack) {
1353     auto ping = std::make_unique<spdy::SpdyPingIR>(unique_id);
1354     ping->set_is_ack(true);
1355     EnqueueFrame(std::move(ping));
1356   }
1357 }
1358 
OnGoAway(spdy::SpdyStreamId last_accepted_stream_id,spdy::SpdyErrorCode error_code)1359 void OgHttp2Session::OnGoAway(spdy::SpdyStreamId last_accepted_stream_id,
1360                               spdy::SpdyErrorCode error_code) {
1361   if (received_goaway_ &&
1362       last_accepted_stream_id >
1363           static_cast<spdy::SpdyStreamId>(received_goaway_stream_id_)) {
1364     // This GOAWAY has a higher `last_accepted_stream_id` than a previous
1365     // GOAWAY, a connection-level spec violation.
1366     const bool ok = visitor_.OnInvalidFrame(
1367         kConnectionStreamId,
1368         Http2VisitorInterface::InvalidFrameError::kProtocol);
1369     if (!ok) {
1370       fatal_visitor_callback_failure_ = true;
1371     }
1372     LatchErrorAndNotify(Http2ErrorCode::PROTOCOL_ERROR,
1373                         ConnectionError::kInvalidGoAwayLastStreamId);
1374     return;
1375   }
1376 
1377   received_goaway_ = true;
1378   received_goaway_stream_id_ = last_accepted_stream_id;
1379   const bool result = visitor_.OnGoAway(last_accepted_stream_id,
1380                                         TranslateErrorCode(error_code), "");
1381   if (!result) {
1382     fatal_visitor_callback_failure_ = true;
1383     decoder_.StopProcessing();
1384   }
1385 
1386   // Close the streams above `last_accepted_stream_id`. Only applies if the
1387   // session receives a GOAWAY as a client, as we do not support server push.
1388   if (last_accepted_stream_id == spdy::kMaxStreamId || IsServerSession()) {
1389     return;
1390   }
1391   std::vector<Http2StreamId> streams_to_close;
1392   for (const auto& [stream_id, stream_state] : stream_map_) {
1393     if (static_cast<spdy::SpdyStreamId>(stream_id) > last_accepted_stream_id) {
1394       streams_to_close.push_back(stream_id);
1395     }
1396   }
1397   for (Http2StreamId stream_id : streams_to_close) {
1398     CloseStream(stream_id, Http2ErrorCode::REFUSED_STREAM);
1399   }
1400 }
1401 
OnGoAwayFrameData(const char *,size_t)1402 bool OgHttp2Session::OnGoAwayFrameData(const char* /*goaway_data*/, size_t
1403                                        /*len*/) {
1404   // Opaque data is currently ignored.
1405   return true;
1406 }
1407 
OnHeaders(spdy::SpdyStreamId stream_id,size_t,bool,int,spdy::SpdyStreamId,bool,bool fin,bool)1408 void OgHttp2Session::OnHeaders(spdy::SpdyStreamId stream_id,
1409                                size_t /*payload_length*/, bool /*has_priority*/,
1410                                int /*weight*/,
1411                                spdy::SpdyStreamId /*parent_stream_id*/,
1412                                bool /*exclusive*/, bool fin, bool /*end*/) {
1413   if (stream_id % 2 == 0) {
1414     // Server push is disabled; receiving push HEADERS is a connection error.
1415     LatchErrorAndNotify(Http2ErrorCode::PROTOCOL_ERROR,
1416                         ConnectionError::kInvalidNewStreamId);
1417     return;
1418   }
1419   headers_handler_.set_frame_contains_fin(fin);
1420   if (IsServerSession()) {
1421     const auto new_stream_id = static_cast<Http2StreamId>(stream_id);
1422     if (stream_map_.find(new_stream_id) != stream_map_.end() && fin) {
1423       // Not a new stream, must be trailers.
1424       return;
1425     }
1426     if (new_stream_id <= highest_processed_stream_id_) {
1427       // A new stream ID lower than the watermark is a connection error.
1428       LatchErrorAndNotify(Http2ErrorCode::PROTOCOL_ERROR,
1429                           ConnectionError::kInvalidNewStreamId);
1430       return;
1431     }
1432 
1433     if (stream_map_.size() >= max_inbound_concurrent_streams_) {
1434       // The new stream would exceed our advertised and acknowledged
1435       // MAX_CONCURRENT_STREAMS. For parity with nghttp2, treat this error as a
1436       // connection-level PROTOCOL_ERROR.
1437       bool ok = visitor_.OnInvalidFrame(
1438           stream_id, Http2VisitorInterface::InvalidFrameError::kProtocol);
1439       if (!ok) {
1440         fatal_visitor_callback_failure_ = true;
1441       }
1442       LatchErrorAndNotify(Http2ErrorCode::PROTOCOL_ERROR,
1443                           ConnectionError::kExceededMaxConcurrentStreams);
1444       return;
1445     }
1446     if (stream_map_.size() >= pending_max_inbound_concurrent_streams_) {
1447       // The new stream would exceed our advertised but unacked
1448       // MAX_CONCURRENT_STREAMS. Refuse the stream for parity with nghttp2.
1449       EnqueueFrame(std::make_unique<spdy::SpdyRstStreamIR>(
1450           stream_id, spdy::ERROR_CODE_REFUSED_STREAM));
1451       const bool ok = visitor_.OnInvalidFrame(
1452           stream_id, Http2VisitorInterface::InvalidFrameError::kRefusedStream);
1453       if (!ok) {
1454         fatal_visitor_callback_failure_ = true;
1455         LatchErrorAndNotify(Http2ErrorCode::REFUSED_STREAM,
1456                             ConnectionError::kExceededMaxConcurrentStreams);
1457       }
1458       return;
1459     }
1460 
1461     CreateStream(stream_id);
1462   }
1463 }
1464 
OnWindowUpdate(spdy::SpdyStreamId stream_id,int delta_window_size)1465 void OgHttp2Session::OnWindowUpdate(spdy::SpdyStreamId stream_id,
1466                                     int delta_window_size) {
1467   constexpr int kMaxWindowValue = 2147483647;  // (1 << 31) - 1
1468   if (stream_id == 0) {
1469     if (delta_window_size == 0) {
1470       // A PROTOCOL_ERROR, according to RFC 9113 Section 6.9.
1471       LatchErrorAndNotify(Http2ErrorCode::PROTOCOL_ERROR,
1472                           ConnectionError::kFlowControlError);
1473       return;
1474     }
1475     if (connection_send_window_ > 0 &&
1476         delta_window_size > (kMaxWindowValue - connection_send_window_)) {
1477       // Window overflow is a FLOW_CONTROL_ERROR.
1478       LatchErrorAndNotify(Http2ErrorCode::FLOW_CONTROL_ERROR,
1479                           ConnectionError::kFlowControlError);
1480       return;
1481     }
1482     connection_send_window_ += delta_window_size;
1483   } else {
1484     if (delta_window_size == 0) {
1485       // A PROTOCOL_ERROR, according to RFC 9113 Section 6.9.
1486       EnqueueFrame(std::make_unique<spdy::SpdyRstStreamIR>(
1487           stream_id, spdy::ERROR_CODE_PROTOCOL_ERROR));
1488       return;
1489     }
1490     auto it = stream_map_.find(stream_id);
1491     if (it == stream_map_.end()) {
1492       QUICHE_VLOG(1) << "Stream " << stream_id << " not found!";
1493       if (static_cast<Http2StreamId>(stream_id) >
1494           highest_processed_stream_id_) {
1495         // Receiving WINDOW_UPDATE before HEADERS is a connection error.
1496         LatchErrorAndNotify(Http2ErrorCode::PROTOCOL_ERROR,
1497                             ConnectionError::kWrongFrameSequence);
1498       }
1499       // Do not inform the visitor of a WINDOW_UPDATE for a non-existent stream.
1500       return;
1501     } else {
1502       if (streams_reset_.contains(stream_id)) {
1503         return;
1504       }
1505       if (it->second.send_window > 0 &&
1506           delta_window_size > (kMaxWindowValue - it->second.send_window)) {
1507         // Window overflow is a FLOW_CONTROL_ERROR.
1508         EnqueueFrame(std::make_unique<spdy::SpdyRstStreamIR>(
1509             stream_id, spdy::ERROR_CODE_FLOW_CONTROL_ERROR));
1510         return;
1511       }
1512       const bool was_blocked = (it->second.send_window <= 0);
1513       it->second.send_window += delta_window_size;
1514       if (was_blocked && it->second.send_window > 0) {
1515         // The stream was blocked on flow control.
1516         QUICHE_VLOG(1) << "Marking stream " << stream_id << " ready to write.";
1517         write_scheduler_.MarkStreamReady(stream_id, false);
1518       }
1519     }
1520   }
1521   visitor_.OnWindowUpdate(stream_id, delta_window_size);
1522 }
1523 
OnPushPromise(spdy::SpdyStreamId,spdy::SpdyStreamId,bool)1524 void OgHttp2Session::OnPushPromise(spdy::SpdyStreamId /*stream_id*/,
1525                                    spdy::SpdyStreamId /*promised_stream_id*/,
1526                                    bool /*end*/) {
1527   // Server push is disabled; PUSH_PROMISE is an invalid frame.
1528   LatchErrorAndNotify(Http2ErrorCode::PROTOCOL_ERROR,
1529                       ConnectionError::kInvalidPushPromise);
1530 }
1531 
OnContinuation(spdy::SpdyStreamId,size_t,bool)1532 void OgHttp2Session::OnContinuation(spdy::SpdyStreamId /*stream_id*/,
1533                                     size_t /*payload_length*/, bool /*end*/) {}
1534 
OnAltSvc(spdy::SpdyStreamId,absl::string_view,const spdy::SpdyAltSvcWireFormat::AlternativeServiceVector &)1535 void OgHttp2Session::OnAltSvc(spdy::SpdyStreamId /*stream_id*/,
1536                               absl::string_view /*origin*/,
1537                               const spdy::SpdyAltSvcWireFormat::
1538                                   AlternativeServiceVector& /*altsvc_vector*/) {
1539 }
1540 
OnPriority(spdy::SpdyStreamId,spdy::SpdyStreamId,int,bool)1541 void OgHttp2Session::OnPriority(spdy::SpdyStreamId /*stream_id*/,
1542                                 spdy::SpdyStreamId /*parent_stream_id*/,
1543                                 int /*weight*/, bool /*exclusive*/) {}
1544 
OnPriorityUpdate(spdy::SpdyStreamId,absl::string_view)1545 void OgHttp2Session::OnPriorityUpdate(
1546     spdy::SpdyStreamId /*prioritized_stream_id*/,
1547     absl::string_view /*priority_field_value*/) {}
1548 
OnUnknownFrame(spdy::SpdyStreamId,uint8_t)1549 bool OgHttp2Session::OnUnknownFrame(spdy::SpdyStreamId /*stream_id*/,
1550                                     uint8_t /*frame_type*/) {
1551   return true;
1552 }
1553 
OnUnknownFrameStart(spdy::SpdyStreamId stream_id,size_t length,uint8_t type,uint8_t flags)1554 void OgHttp2Session::OnUnknownFrameStart(spdy::SpdyStreamId stream_id,
1555                                          size_t length, uint8_t type,
1556                                          uint8_t flags) {
1557   process_metadata_ = false;
1558   if (streams_reset_.contains(stream_id)) {
1559     return;
1560   }
1561   if (type == kMetadataFrameType) {
1562     QUICHE_DCHECK_EQ(metadata_length_, 0u);
1563     visitor_.OnBeginMetadataForStream(stream_id, length);
1564     metadata_length_ = length;
1565     process_metadata_ = true;
1566     end_metadata_ = flags & kMetadataEndFlag;
1567 
1568     // Empty metadata payloads will not trigger OnUnknownFramePayload(), so
1569     // handle that possibility here.
1570     MaybeHandleMetadataEndForStream(stream_id);
1571   } else {
1572     QUICHE_DLOG(INFO) << "Received unexpected frame type "
1573                       << static_cast<int>(type);
1574   }
1575 }
1576 
OnUnknownFramePayload(spdy::SpdyStreamId stream_id,absl::string_view payload)1577 void OgHttp2Session::OnUnknownFramePayload(spdy::SpdyStreamId stream_id,
1578                                            absl::string_view payload) {
1579   if (!process_metadata_) {
1580     return;
1581   }
1582   if (streams_reset_.contains(stream_id)) {
1583     return;
1584   }
1585   if (metadata_length_ > 0) {
1586     QUICHE_DCHECK_LE(payload.size(), metadata_length_);
1587     const bool payload_success =
1588         visitor_.OnMetadataForStream(stream_id, payload);
1589     if (payload_success) {
1590       metadata_length_ -= payload.size();
1591       MaybeHandleMetadataEndForStream(stream_id);
1592     } else {
1593       fatal_visitor_callback_failure_ = true;
1594       decoder_.StopProcessing();
1595     }
1596   } else {
1597     QUICHE_DLOG(INFO) << "Unexpected metadata payload for stream " << stream_id;
1598   }
1599 }
1600 
OnHeaderStatus(Http2StreamId stream_id,Http2VisitorInterface::OnHeaderResult result)1601 void OgHttp2Session::OnHeaderStatus(
1602     Http2StreamId stream_id, Http2VisitorInterface::OnHeaderResult result) {
1603   QUICHE_DCHECK_NE(result, Http2VisitorInterface::HEADER_OK);
1604   QUICHE_VLOG(1) << "OnHeaderStatus(stream_id=" << stream_id
1605                  << ", result=" << result << ")";
1606   const bool should_reset_stream =
1607       result == Http2VisitorInterface::HEADER_RST_STREAM ||
1608       result == Http2VisitorInterface::HEADER_FIELD_INVALID ||
1609       result == Http2VisitorInterface::HEADER_HTTP_MESSAGING;
1610   if (should_reset_stream) {
1611     const Http2ErrorCode error_code =
1612         (result == Http2VisitorInterface::HEADER_RST_STREAM)
1613             ? Http2ErrorCode::INTERNAL_ERROR
1614             : Http2ErrorCode::PROTOCOL_ERROR;
1615     const spdy::SpdyErrorCode spdy_error_code = TranslateErrorCode(error_code);
1616     const Http2VisitorInterface::InvalidFrameError frame_error =
1617         (result == Http2VisitorInterface::HEADER_RST_STREAM ||
1618          result == Http2VisitorInterface::HEADER_FIELD_INVALID)
1619             ? Http2VisitorInterface::InvalidFrameError::kHttpHeader
1620             : Http2VisitorInterface::InvalidFrameError::kHttpMessaging;
1621     auto it = streams_reset_.find(stream_id);
1622     if (it == streams_reset_.end()) {
1623       EnqueueFrame(
1624           std::make_unique<spdy::SpdyRstStreamIR>(stream_id, spdy_error_code));
1625 
1626       if (result == Http2VisitorInterface::HEADER_FIELD_INVALID ||
1627           result == Http2VisitorInterface::HEADER_HTTP_MESSAGING) {
1628         const bool ok = visitor_.OnInvalidFrame(stream_id, frame_error);
1629         if (!ok) {
1630           fatal_visitor_callback_failure_ = true;
1631           LatchErrorAndNotify(error_code, ConnectionError::kHeaderError);
1632         }
1633       }
1634     }
1635   } else if (result == Http2VisitorInterface::HEADER_CONNECTION_ERROR) {
1636     fatal_visitor_callback_failure_ = true;
1637     LatchErrorAndNotify(Http2ErrorCode::INTERNAL_ERROR,
1638                         ConnectionError::kHeaderError);
1639   } else if (result == Http2VisitorInterface::HEADER_COMPRESSION_ERROR) {
1640     LatchErrorAndNotify(Http2ErrorCode::COMPRESSION_ERROR,
1641                         ConnectionError::kHeaderError);
1642   }
1643 }
1644 
MaybeSetupPreface(bool sending_outbound_settings)1645 void OgHttp2Session::MaybeSetupPreface(bool sending_outbound_settings) {
1646   if (!queued_preface_) {
1647     queued_preface_ = true;
1648     if (!IsServerSession()) {
1649       buffered_data_.assign(spdy::kHttp2ConnectionHeaderPrefix,
1650                             spdy::kHttp2ConnectionHeaderPrefixSize);
1651     }
1652     if (!sending_outbound_settings) {
1653       QUICHE_DCHECK(frames_.empty());
1654       // First frame must be a non-ack SETTINGS.
1655       EnqueueFrame(PrepareSettingsFrame(GetInitialSettings()));
1656     }
1657   }
1658 }
1659 
GetInitialSettings() const1660 std::vector<Http2Setting> OgHttp2Session::GetInitialSettings() const {
1661   std::vector<Http2Setting> settings;
1662   if (!IsServerSession()) {
1663     // Disable server push. Note that server push from clients is already
1664     // disabled, so the server does not need to send this disabling setting.
1665     // TODO(diannahu): Consider applying server push disabling on SETTINGS ack.
1666     settings.push_back({Http2KnownSettingsId::ENABLE_PUSH, 0});
1667   }
1668   if (options_.max_header_list_bytes) {
1669     settings.push_back({Http2KnownSettingsId::MAX_HEADER_LIST_SIZE,
1670                         *options_.max_header_list_bytes});
1671   }
1672   if (options_.allow_extended_connect && IsServerSession()) {
1673     settings.push_back({Http2KnownSettingsId::ENABLE_CONNECT_PROTOCOL, 1u});
1674   }
1675   return settings;
1676 }
1677 
PrepareSettingsFrame(absl::Span<const Http2Setting> settings)1678 std::unique_ptr<SpdySettingsIR> OgHttp2Session::PrepareSettingsFrame(
1679     absl::Span<const Http2Setting> settings) {
1680   auto settings_ir = std::make_unique<SpdySettingsIR>();
1681   for (const Http2Setting& setting : settings) {
1682     settings_ir->AddSetting(setting.id, setting.value);
1683   }
1684   return settings_ir;
1685 }
1686 
HandleOutboundSettings(const spdy::SpdySettingsIR & settings_frame)1687 void OgHttp2Session::HandleOutboundSettings(
1688     const spdy::SpdySettingsIR& settings_frame) {
1689   for (const auto& [id, value] : settings_frame.values()) {
1690     switch (static_cast<Http2KnownSettingsId>(id)) {
1691       case MAX_CONCURRENT_STREAMS:
1692         pending_max_inbound_concurrent_streams_ = value;
1693         break;
1694       case ENABLE_CONNECT_PROTOCOL:
1695         if (value == 1u && IsServerSession()) {
1696           // Allow extended CONNECT semantics even before SETTINGS are acked, to
1697           // make things easier for clients.
1698           headers_handler_.SetAllowExtendedConnect();
1699         }
1700         break;
1701       case HEADER_TABLE_SIZE:
1702       case ENABLE_PUSH:
1703       case INITIAL_WINDOW_SIZE:
1704       case MAX_FRAME_SIZE:
1705       case MAX_HEADER_LIST_SIZE:
1706         QUICHE_VLOG(2)
1707             << "Not adjusting internal state for outbound setting with id "
1708             << id;
1709         break;
1710     }
1711   }
1712 
1713   // Copy the (small) map of settings we are about to send so that we can set
1714   // values in the SETTINGS ack callback.
1715   settings_ack_callbacks_.push_back(
1716       [this, settings_map = settings_frame.values()]() {
1717         for (const auto& [id, value] : settings_map) {
1718           switch (static_cast<Http2KnownSettingsId>(id)) {
1719             case MAX_CONCURRENT_STREAMS:
1720               max_inbound_concurrent_streams_ = value;
1721               break;
1722             case HEADER_TABLE_SIZE:
1723               decoder_.GetHpackDecoder().ApplyHeaderTableSizeSetting(value);
1724               break;
1725             case INITIAL_WINDOW_SIZE:
1726               UpdateStreamReceiveWindowSizes(value);
1727               initial_stream_receive_window_ = value;
1728               break;
1729             case MAX_FRAME_SIZE:
1730               decoder_.SetMaxFrameSize(value);
1731               break;
1732             case ENABLE_PUSH:
1733             case MAX_HEADER_LIST_SIZE:
1734             case ENABLE_CONNECT_PROTOCOL:
1735               QUICHE_VLOG(2)
1736                   << "No action required in ack for outbound setting with id "
1737                   << id;
1738               break;
1739           }
1740         }
1741       });
1742 }
1743 
SendWindowUpdate(Http2StreamId stream_id,size_t update_delta)1744 void OgHttp2Session::SendWindowUpdate(Http2StreamId stream_id,
1745                                       size_t update_delta) {
1746   EnqueueFrame(
1747       std::make_unique<spdy::SpdyWindowUpdateIR>(stream_id, update_delta));
1748 }
1749 
SendHeaders(Http2StreamId stream_id,spdy::Http2HeaderBlock headers,bool end_stream)1750 void OgHttp2Session::SendHeaders(Http2StreamId stream_id,
1751                                  spdy::Http2HeaderBlock headers,
1752                                  bool end_stream) {
1753   auto frame =
1754       std::make_unique<spdy::SpdyHeadersIR>(stream_id, std::move(headers));
1755   frame->set_fin(end_stream);
1756   EnqueueFrame(std::move(frame));
1757 }
1758 
SendTrailers(Http2StreamId stream_id,spdy::Http2HeaderBlock trailers)1759 void OgHttp2Session::SendTrailers(Http2StreamId stream_id,
1760                                   spdy::Http2HeaderBlock trailers) {
1761   auto frame =
1762       std::make_unique<spdy::SpdyHeadersIR>(stream_id, std::move(trailers));
1763   frame->set_fin(true);
1764   EnqueueFrame(std::move(frame));
1765   trailers_ready_.erase(stream_id);
1766 }
1767 
MaybeFinWithRstStream(StreamStateMap::iterator iter)1768 void OgHttp2Session::MaybeFinWithRstStream(StreamStateMap::iterator iter) {
1769   QUICHE_DCHECK(iter != stream_map_.end() && iter->second.half_closed_local);
1770 
1771   if (options_.rst_stream_no_error_when_incomplete && IsServerSession() &&
1772       !iter->second.half_closed_remote) {
1773     // Since the peer has not yet ended the stream, this endpoint should
1774     // send a RST_STREAM NO_ERROR. See RFC 7540 Section 8.1.
1775     EnqueueFrame(std::make_unique<spdy::SpdyRstStreamIR>(
1776         iter->first, spdy::SpdyErrorCode::ERROR_CODE_NO_ERROR));
1777     iter->second.half_closed_remote = true;
1778   }
1779 }
1780 
MarkDataBuffered(Http2StreamId stream_id,size_t bytes)1781 void OgHttp2Session::MarkDataBuffered(Http2StreamId stream_id, size_t bytes) {
1782   connection_window_manager_.MarkDataBuffered(bytes);
1783   if (auto it = stream_map_.find(stream_id); it != stream_map_.end()) {
1784     it->second.window_manager.MarkDataBuffered(bytes);
1785   }
1786 }
1787 
CreateStream(Http2StreamId stream_id)1788 OgHttp2Session::StreamStateMap::iterator OgHttp2Session::CreateStream(
1789     Http2StreamId stream_id) {
1790   WindowManager::WindowUpdateListener listener =
1791       [this, stream_id](size_t window_update_delta) {
1792         SendWindowUpdate(stream_id, window_update_delta);
1793       };
1794   auto [iter, inserted] = stream_map_.try_emplace(
1795       stream_id,
1796       StreamState(initial_stream_receive_window_, initial_stream_send_window_,
1797                   std::move(listener), options_.should_window_update_fn));
1798   if (inserted) {
1799     // Add the stream to the write scheduler.
1800     const spdy::SpdyPriority priority = 3;
1801     write_scheduler_.RegisterStream(stream_id, priority);
1802 
1803     highest_processed_stream_id_ =
1804         std::max(highest_processed_stream_id_, stream_id);
1805   }
1806   return iter;
1807 }
1808 
StartRequest(Http2StreamId stream_id,spdy::Http2HeaderBlock headers,std::unique_ptr<DataFrameSource> data_source,void * user_data)1809 void OgHttp2Session::StartRequest(Http2StreamId stream_id,
1810                                   spdy::Http2HeaderBlock headers,
1811                                   std::unique_ptr<DataFrameSource> data_source,
1812                                   void* user_data) {
1813   if (received_goaway_) {
1814     // Do not start new streams after receiving a GOAWAY.
1815     goaway_rejected_streams_.insert(stream_id);
1816     return;
1817   }
1818 
1819   auto iter = CreateStream(stream_id);
1820   const bool end_stream = data_source == nullptr;
1821   if (!end_stream) {
1822     iter->second.outbound_body = std::move(data_source);
1823     write_scheduler_.MarkStreamReady(stream_id, false);
1824   }
1825   iter->second.user_data = user_data;
1826   for (const auto& [name, value] : headers) {
1827     if (name == kHttp2MethodPseudoHeader && value == kHeadValue) {
1828       iter->second.sent_head_method = true;
1829     }
1830   }
1831   SendHeaders(stream_id, std::move(headers), end_stream);
1832 }
1833 
StartPendingStreams()1834 void OgHttp2Session::StartPendingStreams() {
1835   while (!pending_streams_.empty() && CanCreateStream()) {
1836     auto& [stream_id, pending_stream] = pending_streams_.front();
1837     StartRequest(stream_id, std::move(pending_stream.headers),
1838                  std::move(pending_stream.data_source),
1839                  pending_stream.user_data);
1840     pending_streams_.pop_front();
1841   }
1842 }
1843 
CloseStream(Http2StreamId stream_id,Http2ErrorCode error_code)1844 void OgHttp2Session::CloseStream(Http2StreamId stream_id,
1845                                  Http2ErrorCode error_code) {
1846   const bool result = visitor_.OnCloseStream(stream_id, error_code);
1847   if (!result) {
1848     latched_error_ = true;
1849     decoder_.StopProcessing();
1850   }
1851   stream_map_.erase(stream_id);
1852   trailers_ready_.erase(stream_id);
1853   streams_reset_.erase(stream_id);
1854   auto queued_it = queued_frames_.find(stream_id);
1855   if (queued_it != queued_frames_.end()) {
1856     // Remove any queued frames for this stream.
1857     int frames_remaining = queued_it->second;
1858     queued_frames_.erase(queued_it);
1859     for (auto it = frames_.begin();
1860          frames_remaining > 0 && it != frames_.end();) {
1861       if (static_cast<Http2StreamId>((*it)->stream_id()) == stream_id) {
1862         it = frames_.erase(it);
1863         --frames_remaining;
1864       } else {
1865         ++it;
1866       }
1867     }
1868   }
1869   if (write_scheduler_.StreamRegistered(stream_id)) {
1870     write_scheduler_.UnregisterStream(stream_id);
1871   }
1872 
1873   StartPendingStreams();
1874 }
1875 
CanCreateStream() const1876 bool OgHttp2Session::CanCreateStream() const {
1877   return stream_map_.size() < max_outbound_concurrent_streams_;
1878 }
1879 
NextHeaderType(std::optional<HeaderType> current_type)1880 HeaderType OgHttp2Session::NextHeaderType(
1881     std::optional<HeaderType> current_type) {
1882   if (IsServerSession()) {
1883     if (!current_type) {
1884       return HeaderType::REQUEST;
1885     } else {
1886       return HeaderType::REQUEST_TRAILER;
1887     }
1888   } else if (!current_type || *current_type == HeaderType::RESPONSE_100) {
1889     return HeaderType::RESPONSE;
1890   } else {
1891     return HeaderType::RESPONSE_TRAILER;
1892   }
1893 }
1894 
LatchErrorAndNotify(Http2ErrorCode error_code,ConnectionError error)1895 void OgHttp2Session::LatchErrorAndNotify(Http2ErrorCode error_code,
1896                                          ConnectionError error) {
1897   if (latched_error_) {
1898     // Do not kick a connection when it is down.
1899     return;
1900   }
1901 
1902   latched_error_ = true;
1903   visitor_.OnConnectionError(error);
1904   decoder_.StopProcessing();
1905   EnqueueFrame(std::make_unique<spdy::SpdyGoAwayIR>(
1906       highest_processed_stream_id_, TranslateErrorCode(error_code),
1907       ConnectionErrorToString(error)));
1908 }
1909 
CloseStreamIfReady(uint8_t frame_type,uint32_t stream_id)1910 void OgHttp2Session::CloseStreamIfReady(uint8_t frame_type,
1911                                         uint32_t stream_id) {
1912   auto iter = stream_map_.find(stream_id);
1913   if (iter == stream_map_.end()) {
1914     return;
1915   }
1916   const StreamState& state = iter->second;
1917   if (static_cast<FrameType>(frame_type) == FrameType::RST_STREAM ||
1918       (state.half_closed_local && state.half_closed_remote)) {
1919     CloseStream(stream_id, Http2ErrorCode::HTTP2_NO_ERROR);
1920   }
1921 }
1922 
CloseGoAwayRejectedStreams()1923 void OgHttp2Session::CloseGoAwayRejectedStreams() {
1924   for (Http2StreamId stream_id : goaway_rejected_streams_) {
1925     const bool result =
1926         visitor_.OnCloseStream(stream_id, Http2ErrorCode::REFUSED_STREAM);
1927     if (!result) {
1928       latched_error_ = true;
1929       decoder_.StopProcessing();
1930     }
1931   }
1932   goaway_rejected_streams_.clear();
1933 }
1934 
PrepareForImmediateGoAway()1935 void OgHttp2Session::PrepareForImmediateGoAway() {
1936   queued_immediate_goaway_ = true;
1937 
1938   // Keep the initial SETTINGS frame if the session has SETTINGS at the front of
1939   // the queue but has not sent SETTINGS yet. The session should send initial
1940   // SETTINGS before GOAWAY.
1941   std::unique_ptr<spdy::SpdyFrameIR> initial_settings;
1942   if (!sent_non_ack_settings_ && !frames_.empty() &&
1943       IsNonAckSettings(*frames_.front())) {
1944     initial_settings = std::move(frames_.front());
1945     frames_.pop_front();
1946   }
1947 
1948   // Remove all pending frames except for RST_STREAMs. It is important to send
1949   // RST_STREAMs so the peer knows of errors below the GOAWAY last stream ID.
1950   // TODO(diannahu): Consider informing the visitor of dropped frames. This may
1951   // mean keeping the frames and invoking a frame-not-sent callback, similar to
1952   // nghttp2. Could add a closure to each frame in the frames queue.
1953   frames_.remove_if([](const auto& frame) {
1954     return frame->frame_type() != spdy::SpdyFrameType::RST_STREAM;
1955   });
1956 
1957   if (initial_settings != nullptr) {
1958     frames_.push_front(std::move(initial_settings));
1959   }
1960 }
1961 
MaybeHandleMetadataEndForStream(Http2StreamId stream_id)1962 void OgHttp2Session::MaybeHandleMetadataEndForStream(Http2StreamId stream_id) {
1963   if (metadata_length_ == 0 && end_metadata_) {
1964     const bool completion_success = visitor_.OnMetadataEndForStream(stream_id);
1965     if (!completion_success) {
1966       fatal_visitor_callback_failure_ = true;
1967       decoder_.StopProcessing();
1968     }
1969     process_metadata_ = false;
1970     end_metadata_ = false;
1971   }
1972 }
1973 
DecrementQueuedFrameCount(uint32_t stream_id,uint8_t frame_type)1974 void OgHttp2Session::DecrementQueuedFrameCount(uint32_t stream_id,
1975                                                uint8_t frame_type) {
1976   auto iter = queued_frames_.find(stream_id);
1977   if (iter == queued_frames_.end()) {
1978     QUICHE_LOG(ERROR) << "Unable to find a queued frame count for stream "
1979                       << stream_id;
1980     return;
1981   }
1982   if (static_cast<FrameType>(frame_type) != FrameType::DATA) {
1983     --iter->second;
1984   }
1985   if (iter->second == 0) {
1986     // TODO(birenroy): Consider passing through `error_code` here.
1987     CloseStreamIfReady(frame_type, stream_id);
1988   }
1989 }
1990 
HandleContentLengthError(Http2StreamId stream_id)1991 void OgHttp2Session::HandleContentLengthError(Http2StreamId stream_id) {
1992   if (current_frame_type_ == static_cast<uint8_t>(FrameType::HEADERS)) {
1993     // For consistency, either OnInvalidFrame should always be invoked,
1994     // regardless of frame type, or perhaps we should introduce an OnStreamError
1995     // callback.
1996     visitor_.OnInvalidFrame(
1997         stream_id, Http2VisitorInterface::InvalidFrameError::kHttpMessaging);
1998   }
1999   EnqueueFrame(std::make_unique<spdy::SpdyRstStreamIR>(
2000       stream_id, spdy::ERROR_CODE_PROTOCOL_ERROR));
2001 }
2002 
UpdateReceiveWindow(Http2StreamId stream_id,int32_t delta)2003 void OgHttp2Session::UpdateReceiveWindow(Http2StreamId stream_id,
2004                                          int32_t delta) {
2005   if (stream_id == 0) {
2006     connection_window_manager_.IncreaseWindow(delta);
2007     // TODO(b/181586191): Provide an explicit way to set the desired window
2008     // limit, remove the upsize-on-window-update behavior.
2009     const int64_t current_window =
2010         connection_window_manager_.CurrentWindowSize();
2011     if (current_window > connection_window_manager_.WindowSizeLimit()) {
2012       connection_window_manager_.SetWindowSizeLimit(current_window);
2013     }
2014   } else {
2015     auto iter = stream_map_.find(stream_id);
2016     if (iter != stream_map_.end()) {
2017       WindowManager& manager = iter->second.window_manager;
2018       manager.IncreaseWindow(delta);
2019       // TODO(b/181586191): Provide an explicit way to set the desired window
2020       // limit, remove the upsize-on-window-update behavior.
2021       const int64_t current_window = manager.CurrentWindowSize();
2022       if (current_window > manager.WindowSizeLimit()) {
2023         manager.SetWindowSizeLimit(current_window);
2024       }
2025     }
2026   }
2027 }
2028 
UpdateStreamSendWindowSizes(uint32_t new_value)2029 void OgHttp2Session::UpdateStreamSendWindowSizes(uint32_t new_value) {
2030   const int32_t delta =
2031       static_cast<int32_t>(new_value) - initial_stream_send_window_;
2032   initial_stream_send_window_ = new_value;
2033   for (auto& [stream_id, stream_state] : stream_map_) {
2034     const int64_t current_window_size = stream_state.send_window;
2035     const int64_t new_window_size = current_window_size + delta;
2036     if (new_window_size > spdy::kSpdyMaximumWindowSize) {
2037       EnqueueFrame(std::make_unique<spdy::SpdyRstStreamIR>(
2038           stream_id, spdy::ERROR_CODE_FLOW_CONTROL_ERROR));
2039     } else {
2040       stream_state.send_window += delta;
2041     }
2042     if (current_window_size <= 0 && new_window_size > 0) {
2043       write_scheduler_.MarkStreamReady(stream_id, false);
2044     }
2045   }
2046 }
2047 
UpdateStreamReceiveWindowSizes(uint32_t new_value)2048 void OgHttp2Session::UpdateStreamReceiveWindowSizes(uint32_t new_value) {
2049   for (auto& [stream_id, stream_state] : stream_map_) {
2050     stream_state.window_manager.OnWindowSizeLimitChange(new_value);
2051   }
2052 }
2053 
HasMoreData(const StreamState & stream_state) const2054 bool OgHttp2Session::HasMoreData(const StreamState& stream_state) const {
2055   return stream_state.outbound_body != nullptr;
2056 }
2057 
IsReadyToWriteData(const StreamState & stream_state) const2058 bool OgHttp2Session::IsReadyToWriteData(const StreamState& stream_state) const {
2059   return stream_state.outbound_body != nullptr && !stream_state.data_deferred;
2060 }
2061 
AbandonData(StreamState & stream_state)2062 void OgHttp2Session::AbandonData(StreamState& stream_state) {
2063   stream_state.outbound_body = nullptr;
2064 }
2065 
GetDataFrameInfo(Http2StreamId,size_t flow_control_available,StreamState & stream_state)2066 OgHttp2Session::DataFrameInfo OgHttp2Session::GetDataFrameInfo(
2067     Http2StreamId /*stream_id*/, size_t flow_control_available,
2068     StreamState& stream_state) {
2069   DataFrameInfo info;
2070   std::tie(info.payload_length, info.end_data) =
2071       stream_state.outbound_body->SelectPayloadLength(flow_control_available);
2072   info.send_fin =
2073       info.end_data ? stream_state.outbound_body->send_fin() : false;
2074   return info;
2075 }
2076 
SendDataFrame(Http2StreamId,absl::string_view frame_header,size_t payload_length,StreamState & stream_state)2077 bool OgHttp2Session::SendDataFrame(Http2StreamId /*stream_id*/,
2078                                    absl::string_view frame_header,
2079                                    size_t payload_length,
2080                                    StreamState& stream_state) {
2081   return stream_state.outbound_body->Send(frame_header, payload_length);
2082 }
2083 
2084 }  // namespace adapter
2085 }  // namespace http2
2086