1 //
2 //
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #include <grpc/support/port_platform.h>
20 
21 #include <inttypes.h>
22 #include <stddef.h>
23 
24 #include <algorithm>
25 #include <memory>
26 #include <string>
27 
28 #include "absl/status/status.h"
29 #include "absl/strings/string_view.h"
30 #include "absl/types/optional.h"
31 
32 #include <grpc/event_engine/event_engine.h>
33 #include <grpc/slice.h>
34 #include <grpc/slice_buffer.h>
35 #include <grpc/support/log.h>
36 
37 #include "src/core/ext/transport/chttp2/transport/http_trace.h"
38 
39 // IWYU pragma: no_include "src/core/lib/gprpp/orphanable.h"
40 
41 #include "src/core/ext/transport/chttp2/transport/chttp2_transport.h"
42 #include "src/core/ext/transport/chttp2/transport/context_list_entry.h"
43 #include "src/core/ext/transport/chttp2/transport/flow_control.h"
44 #include "src/core/ext/transport/chttp2/transport/frame.h"
45 #include "src/core/ext/transport/chttp2/transport/frame_data.h"
46 #include "src/core/ext/transport/chttp2/transport/frame_ping.h"
47 #include "src/core/ext/transport/chttp2/transport/frame_rst_stream.h"
48 #include "src/core/ext/transport/chttp2/transport/frame_settings.h"
49 #include "src/core/ext/transport/chttp2/transport/frame_window_update.h"
50 #include "src/core/ext/transport/chttp2/transport/hpack_encoder.h"
51 #include "src/core/ext/transport/chttp2/transport/http2_settings.h"
52 #include "src/core/ext/transport/chttp2/transport/internal.h"
53 #include "src/core/ext/transport/chttp2/transport/stream_map.h"
54 #include "src/core/lib/channel/channelz.h"
55 #include "src/core/lib/debug/stats.h"
56 #include "src/core/lib/debug/stats_data.h"
57 #include "src/core/lib/debug/trace.h"
58 #include "src/core/lib/gprpp/debug_location.h"
59 #include "src/core/lib/gprpp/ref_counted.h"
60 #include "src/core/lib/gprpp/ref_counted_ptr.h"
61 #include "src/core/lib/gprpp/time.h"
62 #include "src/core/lib/iomgr/closure.h"
63 #include "src/core/lib/iomgr/endpoint.h"
64 #include "src/core/lib/iomgr/error.h"
65 #include "src/core/lib/iomgr/exec_ctx.h"
66 #include "src/core/lib/slice/slice.h"
67 #include "src/core/lib/transport/bdp_estimator.h"
68 #include "src/core/lib/transport/http2_errors.h"
69 #include "src/core/lib/transport/metadata_batch.h"
70 #include "src/core/lib/transport/transport.h"
71 
add_to_write_list(grpc_chttp2_write_cb ** list,grpc_chttp2_write_cb * cb)72 static void add_to_write_list(grpc_chttp2_write_cb** list,
73                               grpc_chttp2_write_cb* cb) {
74   cb->next = *list;
75   *list = cb;
76 }
77 
finish_write_cb(grpc_chttp2_transport * t,grpc_chttp2_stream * s,grpc_chttp2_write_cb * cb,grpc_error_handle error)78 static void finish_write_cb(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
79                             grpc_chttp2_write_cb* cb, grpc_error_handle error) {
80   grpc_chttp2_complete_closure_step(t, s, &cb->closure, error,
81                                     "finish_write_cb");
82   cb->next = t->write_cb_pool;
83   t->write_cb_pool = cb;
84 }
85 
maybe_initiate_ping(grpc_chttp2_transport * t)86 static void maybe_initiate_ping(grpc_chttp2_transport* t) {
87   grpc_chttp2_ping_queue* pq = &t->ping_queue;
88   if (grpc_closure_list_empty(pq->lists[GRPC_CHTTP2_PCL_NEXT])) {
89     // no ping needed: wait
90     return;
91   }
92   if (!grpc_closure_list_empty(pq->lists[GRPC_CHTTP2_PCL_INFLIGHT])) {
93     // ping already in-flight: wait
94     if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace) ||
95         GRPC_TRACE_FLAG_ENABLED(grpc_bdp_estimator_trace) ||
96         GRPC_TRACE_FLAG_ENABLED(grpc_keepalive_trace)) {
97       gpr_log(GPR_INFO, "%s: Ping delayed [%s]: already pinging",
98               t->is_client ? "CLIENT" : "SERVER",
99               std::string(t->peer_string.as_string_view()).c_str());
100     }
101     return;
102   }
103   if (t->is_client && t->ping_state.pings_before_data_required == 0 &&
104       t->ping_policy.max_pings_without_data != 0) {
105     // need to receive something of substance before sending a ping again
106     if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace) ||
107         GRPC_TRACE_FLAG_ENABLED(grpc_bdp_estimator_trace) ||
108         GRPC_TRACE_FLAG_ENABLED(grpc_keepalive_trace)) {
109       gpr_log(GPR_INFO,
110               "CLIENT: Ping delayed [%s]: too many recent pings: %d/%d",
111               std::string(t->peer_string.as_string_view()).c_str(),
112               t->ping_state.pings_before_data_required,
113               t->ping_policy.max_pings_without_data);
114     }
115     return;
116   }
117   // InvalidateNow to avoid getting stuck re-initializing the ping timer
118   // in a loop while draining the currently-held combiner. Also see
119   // https://github.com/grpc/grpc/issues/26079.
120   grpc_core::ExecCtx::Get()->InvalidateNow();
121   grpc_core::Timestamp now = grpc_core::Timestamp::Now();
122 
123   grpc_core::Duration next_allowed_ping_interval = grpc_core::Duration::Zero();
124   if (t->is_client) {
125     next_allowed_ping_interval =
126         (t->keepalive_permit_without_calls == 0 &&
127          grpc_chttp2_stream_map_size(&t->stream_map) == 0)
128             ? grpc_core::Duration::Hours(2)
129             : grpc_core::Duration::Seconds(
130                   1);  // A second is added to deal with
131                        // network delays and timing imprecision
132   } else if (t->sent_goaway_state != GRPC_CHTTP2_GRACEFUL_GOAWAY) {
133     // The gRPC keepalive spec doesn't call for any throttling on the server
134     // side, but we are adding some throttling for protection anyway, unless
135     // we are doing a graceful GOAWAY in which case we don't want to wait.
136     next_allowed_ping_interval =
137         t->keepalive_time == grpc_core::Duration::Infinity()
138             ? grpc_core::Duration::Seconds(20)
139             : t->keepalive_time / 2;
140   }
141   grpc_core::Timestamp next_allowed_ping =
142       t->ping_state.last_ping_sent_time + next_allowed_ping_interval;
143 
144   if (next_allowed_ping > now) {
145     // not enough elapsed time between successive pings
146     if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace) ||
147         GRPC_TRACE_FLAG_ENABLED(grpc_bdp_estimator_trace) ||
148         GRPC_TRACE_FLAG_ENABLED(grpc_keepalive_trace)) {
149       gpr_log(
150           GPR_INFO,
151           "%s: Ping delayed [%s]: not enough time elapsed since last "
152           "ping. "
153           " Last ping %" PRId64 ": Next ping %" PRId64 ": Now %" PRId64,
154           t->is_client ? "CLIENT" : "SERVER",
155           std::string(t->peer_string.as_string_view()).c_str(),
156           t->ping_state.last_ping_sent_time.milliseconds_after_process_epoch(),
157           next_allowed_ping.milliseconds_after_process_epoch(),
158           now.milliseconds_after_process_epoch());
159     }
160     if (!t->ping_state.delayed_ping_timer_handle.has_value()) {
161       GRPC_CHTTP2_REF_TRANSPORT(t, "retry_initiate_ping_locked");
162       t->ping_state.delayed_ping_timer_handle =
163           t->event_engine->RunAfter(next_allowed_ping - now, [t] {
164             grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
165             grpc_core::ExecCtx exec_ctx;
166             grpc_chttp2_retry_initiate_ping(t);
167           });
168     }
169     return;
170   }
171   t->ping_state.last_ping_sent_time = now;
172 
173   pq->inflight_id = t->ping_ctr;
174   t->ping_ctr++;
175   grpc_core::ExecCtx::RunList(DEBUG_LOCATION,
176                               &pq->lists[GRPC_CHTTP2_PCL_INITIATE]);
177   grpc_closure_list_move(&pq->lists[GRPC_CHTTP2_PCL_NEXT],
178                          &pq->lists[GRPC_CHTTP2_PCL_INFLIGHT]);
179   grpc_slice_buffer_add(&t->outbuf,
180                         grpc_chttp2_ping_create(false, pq->inflight_id));
181   grpc_core::global_stats().IncrementHttp2PingsSent();
182   if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace) ||
183       GRPC_TRACE_FLAG_ENABLED(grpc_bdp_estimator_trace) ||
184       GRPC_TRACE_FLAG_ENABLED(grpc_keepalive_trace)) {
185     gpr_log(GPR_INFO, "%s: Ping sent [%s]: %d/%d",
186             t->is_client ? "CLIENT" : "SERVER",
187             std::string(t->peer_string.as_string_view()).c_str(),
188             t->ping_state.pings_before_data_required,
189             t->ping_policy.max_pings_without_data);
190   }
191   t->ping_state.pings_before_data_required -=
192       (t->ping_state.pings_before_data_required != 0);
193 }
194 
update_list(grpc_chttp2_transport * t,grpc_chttp2_stream * s,int64_t send_bytes,grpc_chttp2_write_cb ** list,int64_t * ctr,grpc_error_handle error)195 static bool update_list(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
196                         int64_t send_bytes, grpc_chttp2_write_cb** list,
197                         int64_t* ctr, grpc_error_handle error) {
198   bool sched_any = false;
199   grpc_chttp2_write_cb* cb = *list;
200   *list = nullptr;
201   *ctr += send_bytes;
202   while (cb) {
203     grpc_chttp2_write_cb* next = cb->next;
204     if (cb->call_at_byte <= *ctr) {
205       sched_any = true;
206       finish_write_cb(t, s, cb, error);
207     } else {
208       add_to_write_list(list, cb);
209     }
210     cb = next;
211   }
212   return sched_any;
213 }
214 
report_stall(grpc_chttp2_transport * t,grpc_chttp2_stream * s,const char * staller)215 static void report_stall(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
216                          const char* staller) {
217   if (GRPC_TRACE_FLAG_ENABLED(grpc_flowctl_trace)) {
218     gpr_log(
219         GPR_DEBUG,
220         "%s:%p stream %d moved to stalled list by %s. This is FULLY expected "
221         "to happen in a healthy program that is not seeing flow control stalls."
222         " However, if you know that there are unwanted stalls, here is some "
223         "helpful data: [fc:pending=%" PRIdPTR ":flowed=%" PRId64
224         ":peer_initwin=%d:t_win=%" PRId64 ":s_win=%d:s_delta=%" PRId64 "]",
225         std::string(t->peer_string.as_string_view()).c_str(), t, s->id, staller,
226         s->flow_controlled_buffer.length, s->flow_controlled_bytes_flowed,
227         t->settings[GRPC_ACKED_SETTINGS]
228                    [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE],
229         t->flow_control.remote_window(),
230         static_cast<uint32_t>(std::max(
231             int64_t{0},
232             s->flow_control.remote_window_delta() +
233                 static_cast<int64_t>(
234                     t->settings[GRPC_PEER_SETTINGS]
235                                [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]))),
236         s->flow_control.remote_window_delta());
237   }
238 }
239 
240 // How many bytes would we like to put on the wire during a single syscall
target_write_size(grpc_chttp2_transport *)241 static uint32_t target_write_size(grpc_chttp2_transport* /*t*/) {
242   return 1024 * 1024;
243 }
244 
245 namespace {
246 
247 class CountDefaultMetadataEncoder {
248  public:
count() const249   size_t count() const { return count_; }
250 
Encode(const grpc_core::Slice &,const grpc_core::Slice &)251   void Encode(const grpc_core::Slice&, const grpc_core::Slice&) {}
252 
253   template <typename Which>
Encode(Which,const typename Which::ValueType &)254   void Encode(Which, const typename Which::ValueType&) {
255     count_++;
256   }
257 
258  private:
259   size_t count_ = 0;
260 };
261 
262 }  // namespace
263 
264 // Returns true if initial_metadata contains only default headers.
is_default_initial_metadata(grpc_metadata_batch * initial_metadata)265 static bool is_default_initial_metadata(grpc_metadata_batch* initial_metadata) {
266   CountDefaultMetadataEncoder enc;
267   initial_metadata->Encode(&enc);
268   return enc.count() == initial_metadata->count();
269 }
270 
271 namespace {
272 
273 class WriteContext {
274  public:
WriteContext(grpc_chttp2_transport * t)275   explicit WriteContext(grpc_chttp2_transport* t) : t_(t) {
276     grpc_core::global_stats().IncrementHttp2WritesBegun();
277   }
278 
FlushSettings()279   void FlushSettings() {
280     if (t_->dirtied_local_settings && !t_->sent_local_settings) {
281       grpc_slice_buffer_add(
282           &t_->outbuf, grpc_chttp2_settings_create(
283                            t_->settings[GRPC_SENT_SETTINGS],
284                            t_->settings[GRPC_LOCAL_SETTINGS],
285                            t_->force_send_settings, GRPC_CHTTP2_NUM_SETTINGS));
286       t_->force_send_settings = false;
287       t_->dirtied_local_settings = false;
288       t_->sent_local_settings = true;
289       grpc_core::global_stats().IncrementHttp2SettingsWrites();
290     }
291   }
292 
FlushQueuedBuffers()293   void FlushQueuedBuffers() {
294     // simple writes are queued to qbuf, and flushed here
295     grpc_slice_buffer_move_into(&t_->qbuf, &t_->outbuf);
296     t_->num_pending_induced_frames = 0;
297     GPR_ASSERT(t_->qbuf.count == 0);
298   }
299 
FlushWindowUpdates()300   void FlushWindowUpdates() {
301     uint32_t transport_announce =
302         t_->flow_control.MaybeSendUpdate(t_->outbuf.count > 0);
303     if (transport_announce) {
304       grpc_transport_one_way_stats throwaway_stats;
305       grpc_slice_buffer_add(
306           &t_->outbuf, grpc_chttp2_window_update_create(0, transport_announce,
307                                                         &throwaway_stats));
308       grpc_chttp2_reset_ping_clock(t_);
309     }
310   }
311 
FlushPingAcks()312   void FlushPingAcks() {
313     for (size_t i = 0; i < t_->ping_ack_count; i++) {
314       grpc_slice_buffer_add(&t_->outbuf,
315                             grpc_chttp2_ping_create(true, t_->ping_acks[i]));
316     }
317     t_->ping_ack_count = 0;
318   }
319 
EnactHpackSettings()320   void EnactHpackSettings() {
321     t_->hpack_compressor.SetMaxTableSize(
322         t_->settings[GRPC_PEER_SETTINGS]
323                     [GRPC_CHTTP2_SETTINGS_HEADER_TABLE_SIZE]);
324   }
325 
UpdateStreamsNoLongerStalled()326   void UpdateStreamsNoLongerStalled() {
327     grpc_chttp2_stream* s;
328     while (grpc_chttp2_list_pop_stalled_by_transport(t_, &s)) {
329       if (t_->closed_with_error.ok() &&
330           grpc_chttp2_list_add_writable_stream(t_, s)) {
331         if (!s->refcount->refs.RefIfNonZero()) {
332           grpc_chttp2_list_remove_writable_stream(t_, s);
333         }
334       }
335     }
336   }
337 
NextStream()338   grpc_chttp2_stream* NextStream() {
339     if (t_->outbuf.length > target_write_size(t_)) {
340       result_.partial = true;
341       return nullptr;
342     }
343 
344     grpc_chttp2_stream* s;
345     if (!grpc_chttp2_list_pop_writable_stream(t_, &s)) {
346       return nullptr;
347     }
348 
349     return s;
350   }
351 
IncInitialMetadataWrites()352   void IncInitialMetadataWrites() { ++initial_metadata_writes_; }
IncWindowUpdateWrites()353   void IncWindowUpdateWrites() { ++flow_control_writes_; }
IncMessageWrites()354   void IncMessageWrites() { ++message_writes_; }
IncTrailingMetadataWrites()355   void IncTrailingMetadataWrites() { ++trailing_metadata_writes_; }
356 
NoteScheduledResults()357   void NoteScheduledResults() { result_.early_results_scheduled = true; }
358 
transport() const359   grpc_chttp2_transport* transport() const { return t_; }
360 
Result()361   grpc_chttp2_begin_write_result Result() {
362     result_.writing = t_->outbuf.count > 0;
363     return result_;
364   }
365 
366  private:
367   grpc_chttp2_transport* const t_;
368 
369   // stats histogram counters: we increment these throughout this function,
370   // and at the end publish to the central stats histograms
371   int flow_control_writes_ = 0;
372   int initial_metadata_writes_ = 0;
373   int trailing_metadata_writes_ = 0;
374   int message_writes_ = 0;
375   grpc_chttp2_begin_write_result result_ = {false, false, false};
376 };
377 
378 class DataSendContext {
379  public:
DataSendContext(WriteContext * write_context,grpc_chttp2_transport * t,grpc_chttp2_stream * s)380   DataSendContext(WriteContext* write_context, grpc_chttp2_transport* t,
381                   grpc_chttp2_stream* s)
382       : write_context_(write_context),
383         t_(t),
384         s_(s),
385         sending_bytes_before_(s_->sending_bytes) {}
386 
stream_remote_window() const387   uint32_t stream_remote_window() const {
388     return static_cast<uint32_t>(std::max(
389         int64_t{0},
390         s_->flow_control.remote_window_delta() +
391             static_cast<int64_t>(
392                 t_->settings[GRPC_PEER_SETTINGS]
393                             [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE])));
394   }
395 
max_outgoing() const396   uint32_t max_outgoing() const {
397     return static_cast<uint32_t>(std::min(
398         t_->settings[GRPC_PEER_SETTINGS][GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE],
399         static_cast<uint32_t>(
400             std::min(static_cast<int64_t>(stream_remote_window()),
401                      t_->flow_control.remote_window()))));
402   }
403 
AnyOutgoing() const404   bool AnyOutgoing() const { return max_outgoing() > 0; }
405 
FlushBytes()406   void FlushBytes() {
407     uint32_t send_bytes =
408         static_cast<uint32_t>(std::min(static_cast<size_t>(max_outgoing()),
409                                        s_->flow_controlled_buffer.length));
410     is_last_frame_ = send_bytes == s_->flow_controlled_buffer.length &&
411                      s_->send_trailing_metadata != nullptr &&
412                      s_->send_trailing_metadata->empty();
413     grpc_chttp2_encode_data(s_->id, &s_->flow_controlled_buffer, send_bytes,
414                             is_last_frame_, &s_->stats.outgoing, &t_->outbuf);
415     sfc_upd_.SentData(send_bytes);
416     s_->sending_bytes += send_bytes;
417   }
418 
is_last_frame() const419   bool is_last_frame() const { return is_last_frame_; }
420 
CallCallbacks()421   void CallCallbacks() {
422     if (update_list(
423             t_, s_,
424             static_cast<int64_t>(s_->sending_bytes - sending_bytes_before_),
425             &s_->on_flow_controlled_cbs, &s_->flow_controlled_bytes_flowed,
426             absl::OkStatus())) {
427       write_context_->NoteScheduledResults();
428     }
429   }
430 
431  private:
432   WriteContext* write_context_;
433   grpc_chttp2_transport* t_;
434   grpc_chttp2_stream* s_;
435   grpc_core::chttp2::StreamFlowControl::OutgoingUpdateContext sfc_upd_{
436       &s_->flow_control};
437   const size_t sending_bytes_before_;
438   bool is_last_frame_ = false;
439 };
440 
441 class StreamWriteContext {
442  public:
StreamWriteContext(WriteContext * write_context,grpc_chttp2_stream * s)443   StreamWriteContext(WriteContext* write_context, grpc_chttp2_stream* s)
444       : write_context_(write_context), t_(write_context->transport()), s_(s) {
445     GRPC_CHTTP2_IF_TRACING(
446         gpr_log(GPR_INFO, "W:%p %s[%d] im-(sent,send)=(%d,%d)", t_,
447                 t_->is_client ? "CLIENT" : "SERVER", s->id,
448                 s->sent_initial_metadata, s->send_initial_metadata != nullptr));
449   }
450 
FlushInitialMetadata()451   void FlushInitialMetadata() {
452     // send initial metadata if it's available
453     if (s_->sent_initial_metadata) return;
454     if (s_->send_initial_metadata == nullptr) return;
455 
456     // We skip this on the server side if there is no custom initial
457     // metadata, there are no messages to send, and we are also sending
458     // trailing metadata.  This results in a Trailers-Only response,
459     // which is required for retries, as per:
460     // https://github.com/grpc/proposal/blob/master/A6-client-retries.md#when-retries-are-valid
461     if (!t_->is_client && s_->flow_controlled_buffer.length == 0 &&
462         s_->send_trailing_metadata != nullptr &&
463         is_default_initial_metadata(s_->send_initial_metadata)) {
464       ConvertInitialMetadataToTrailingMetadata();
465     } else {
466       t_->hpack_compressor.EncodeHeaders(
467           grpc_core::HPackCompressor::EncodeHeaderOptions{
468               s_->id,  // stream_id
469               false,   // is_eof
470               t_->settings
471                       [GRPC_PEER_SETTINGS]
472                       [GRPC_CHTTP2_SETTINGS_GRPC_ALLOW_TRUE_BINARY_METADATA] !=
473                   0,  // use_true_binary_metadata
474               t_->settings
475                   [GRPC_PEER_SETTINGS]
476                   [GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE],  // max_frame_size
477               &s_->stats.outgoing                         // stats
478           },
479           *s_->send_initial_metadata, &t_->outbuf);
480       grpc_chttp2_reset_ping_clock(t_);
481       write_context_->IncInitialMetadataWrites();
482     }
483 
484     s_->send_initial_metadata = nullptr;
485     s_->sent_initial_metadata = true;
486     write_context_->NoteScheduledResults();
487     grpc_chttp2_complete_closure_step(
488         t_, s_, &s_->send_initial_metadata_finished, absl::OkStatus(),
489         "send_initial_metadata_finished");
490   }
491 
FlushWindowUpdates()492   void FlushWindowUpdates() {
493     if (s_->read_closed) return;
494 
495     // send any window updates
496     const uint32_t stream_announce = s_->flow_control.MaybeSendUpdate();
497     if (stream_announce == 0) return;
498 
499     grpc_slice_buffer_add(
500         &t_->outbuf, grpc_chttp2_window_update_create(s_->id, stream_announce,
501                                                       &s_->stats.outgoing));
502     grpc_chttp2_reset_ping_clock(t_);
503     write_context_->IncWindowUpdateWrites();
504   }
505 
FlushData()506   void FlushData() {
507     if (!s_->sent_initial_metadata) return;
508 
509     if (s_->flow_controlled_buffer.length == 0) {
510       return;  // early out: nothing to do
511     }
512 
513     DataSendContext data_send_context(write_context_, t_, s_);
514 
515     if (!data_send_context.AnyOutgoing()) {
516       if (t_->flow_control.remote_window() <= 0) {
517         grpc_core::global_stats().IncrementHttp2TransportStalls();
518         report_stall(t_, s_, "transport");
519         grpc_chttp2_list_add_stalled_by_transport(t_, s_);
520       } else if (data_send_context.stream_remote_window() <= 0) {
521         grpc_core::global_stats().IncrementHttp2StreamStalls();
522         report_stall(t_, s_, "stream");
523         grpc_chttp2_list_add_stalled_by_stream(t_, s_);
524       }
525       return;  // early out: nothing to do
526     }
527 
528     while (s_->flow_controlled_buffer.length > 0 &&
529            data_send_context.max_outgoing() > 0) {
530       data_send_context.FlushBytes();
531     }
532     grpc_chttp2_reset_ping_clock(t_);
533     if (data_send_context.is_last_frame()) {
534       SentLastFrame();
535     }
536     data_send_context.CallCallbacks();
537     stream_became_writable_ = true;
538     if (s_->flow_controlled_buffer.length > 0) {
539       GRPC_CHTTP2_STREAM_REF(s_, "chttp2_writing:fork");
540       grpc_chttp2_list_add_writable_stream(t_, s_);
541     }
542     write_context_->IncMessageWrites();
543   }
544 
FlushTrailingMetadata()545   void FlushTrailingMetadata() {
546     if (!s_->sent_initial_metadata) return;
547 
548     if (s_->send_trailing_metadata == nullptr) return;
549     if (s_->flow_controlled_buffer.length != 0) return;
550 
551     GRPC_CHTTP2_IF_TRACING(gpr_log(GPR_INFO, "sending trailing_metadata"));
552     if (s_->send_trailing_metadata->empty()) {
553       grpc_chttp2_encode_data(s_->id, &s_->flow_controlled_buffer, 0, true,
554                               &s_->stats.outgoing, &t_->outbuf);
555     } else {
556       if (send_status_.has_value()) {
557         s_->send_trailing_metadata->Set(grpc_core::HttpStatusMetadata(),
558                                         *send_status_);
559       }
560       if (send_content_type_.has_value()) {
561         s_->send_trailing_metadata->Set(grpc_core::ContentTypeMetadata(),
562                                         *send_content_type_);
563       }
564       t_->hpack_compressor.EncodeHeaders(
565           grpc_core::HPackCompressor::EncodeHeaderOptions{
566               s_->id, true,
567               t_->settings
568                       [GRPC_PEER_SETTINGS]
569                       [GRPC_CHTTP2_SETTINGS_GRPC_ALLOW_TRUE_BINARY_METADATA] !=
570                   0,
571               t_->settings[GRPC_PEER_SETTINGS]
572                           [GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE],
573               &s_->stats.outgoing},
574           *s_->send_trailing_metadata, &t_->outbuf);
575     }
576     write_context_->IncTrailingMetadataWrites();
577     grpc_chttp2_reset_ping_clock(t_);
578     SentLastFrame();
579 
580     write_context_->NoteScheduledResults();
581     grpc_chttp2_complete_closure_step(
582         t_, s_, &s_->send_trailing_metadata_finished, absl::OkStatus(),
583         "send_trailing_metadata_finished");
584   }
585 
stream_became_writable()586   bool stream_became_writable() { return stream_became_writable_; }
587 
588  private:
ConvertInitialMetadataToTrailingMetadata()589   void ConvertInitialMetadataToTrailingMetadata() {
590     GRPC_CHTTP2_IF_TRACING(
591         gpr_log(GPR_INFO, "not sending initial_metadata (Trailers-Only)"));
592     // When sending Trailers-Only, we need to move the :status and
593     // content-type headers to the trailers.
594     send_status_ =
595         s_->send_initial_metadata->get(grpc_core::HttpStatusMetadata());
596     send_content_type_ =
597         s_->send_initial_metadata->get(grpc_core::ContentTypeMetadata());
598   }
599 
SentLastFrame()600   void SentLastFrame() {
601     s_->send_trailing_metadata = nullptr;
602     if (s_->sent_trailing_metadata_op) {
603       *s_->sent_trailing_metadata_op = true;
604       s_->sent_trailing_metadata_op = nullptr;
605     }
606     s_->sent_trailing_metadata = true;
607     s_->eos_sent = true;
608 
609     if (!t_->is_client && !s_->read_closed) {
610       grpc_slice_buffer_add(
611           &t_->outbuf, grpc_chttp2_rst_stream_create(
612                            s_->id, GRPC_HTTP2_NO_ERROR, &s_->stats.outgoing));
613     }
614     grpc_chttp2_mark_stream_closed(t_, s_, !t_->is_client, true,
615                                    absl::OkStatus());
616   }
617 
618   WriteContext* const write_context_;
619   grpc_chttp2_transport* const t_;
620   grpc_chttp2_stream* const s_;
621   bool stream_became_writable_ = false;
622   absl::optional<uint32_t> send_status_;
623   absl::optional<grpc_core::ContentTypeMetadata::ValueType> send_content_type_ =
624       {};
625 };
626 }  // namespace
627 
grpc_chttp2_begin_write(grpc_chttp2_transport * t)628 grpc_chttp2_begin_write_result grpc_chttp2_begin_write(
629     grpc_chttp2_transport* t) {
630   int64_t outbuf_relative_start_pos = 0;
631   WriteContext ctx(t);
632   ctx.FlushSettings();
633   ctx.FlushPingAcks();
634   ctx.FlushQueuedBuffers();
635   ctx.EnactHpackSettings();
636 
637   if (t->flow_control.remote_window() > 0) {
638     ctx.UpdateStreamsNoLongerStalled();
639   }
640 
641   // for each grpc_chttp2_stream that's become writable, frame it's data
642   // (according to available window sizes) and add to the output buffer
643   while (grpc_chttp2_stream* s = ctx.NextStream()) {
644     StreamWriteContext stream_ctx(&ctx, s);
645     size_t orig_len = t->outbuf.length;
646     int64_t num_stream_bytes = 0;
647     stream_ctx.FlushInitialMetadata();
648     stream_ctx.FlushWindowUpdates();
649     stream_ctx.FlushData();
650     stream_ctx.FlushTrailingMetadata();
651     if (t->outbuf.length > orig_len) {
652       // Add this stream to the list of the contexts to be traced at TCP
653       num_stream_bytes = t->outbuf.length - orig_len;
654       s->byte_counter += static_cast<size_t>(num_stream_bytes);
655       if (s->traced && grpc_endpoint_can_track_err(t->ep)) {
656         grpc_core::CopyContextFn copy_context_fn =
657             grpc_core::GrpcHttp2GetCopyContextFn();
658         if (copy_context_fn != nullptr &&
659             grpc_core::GrpcHttp2GetWriteTimestampsCallback() != nullptr) {
660           t->cl->emplace_back(copy_context_fn(s->context),
661                               outbuf_relative_start_pos, num_stream_bytes,
662                               s->byte_counter);
663         }
664       }
665       outbuf_relative_start_pos += num_stream_bytes;
666     }
667     if (stream_ctx.stream_became_writable()) {
668       if (!grpc_chttp2_list_add_writing_stream(t, s)) {
669         // already in writing list: drop ref
670         GRPC_CHTTP2_STREAM_UNREF(s, "chttp2_writing:already_writing");
671       } else {
672         // ref will be dropped at end of write
673       }
674     } else {
675       GRPC_CHTTP2_STREAM_UNREF(s, "chttp2_writing:no_write");
676     }
677   }
678 
679   ctx.FlushWindowUpdates();
680 
681   maybe_initiate_ping(t);
682 
683   return ctx.Result();
684 }
685 
grpc_chttp2_end_write(grpc_chttp2_transport * t,grpc_error_handle error)686 void grpc_chttp2_end_write(grpc_chttp2_transport* t, grpc_error_handle error) {
687   grpc_chttp2_stream* s;
688 
689   if (t->channelz_socket != nullptr) {
690     t->channelz_socket->RecordMessagesSent(t->num_messages_in_next_write);
691   }
692   t->num_messages_in_next_write = 0;
693 
694   while (grpc_chttp2_list_pop_writing_stream(t, &s)) {
695     if (s->sending_bytes != 0) {
696       update_list(t, s, static_cast<int64_t>(s->sending_bytes),
697                   &s->on_write_finished_cbs, &s->flow_controlled_bytes_written,
698                   error);
699       s->sending_bytes = 0;
700     }
701     GRPC_CHTTP2_STREAM_UNREF(s, "chttp2_writing:end");
702   }
703   grpc_slice_buffer_reset_and_unref(&t->outbuf);
704 }
705