xref: /aosp_15_r20/external/grpc-grpc/test/core/transport/chttp2/streams_not_seen_test.cc (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1 //
2 //
3 // Copyright 2022 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #include <limits.h>
20 #include <stdint.h>
21 #include <stdlib.h>
22 #include <string.h>
23 
24 #include <atomic>
25 #include <memory>
26 #include <new>
27 #include <string>
28 #include <thread>
29 #include <vector>
30 
31 #include "absl/base/thread_annotations.h"
32 #include "absl/status/status.h"
33 #include "absl/strings/match.h"
34 #include "absl/strings/str_cat.h"
35 #include "absl/strings/string_view.h"
36 #include "absl/time/time.h"
37 #include "absl/types/optional.h"
38 #include "gtest/gtest.h"
39 
40 #include <grpc/grpc.h>
41 #include <grpc/grpc_security.h>
42 #include <grpc/impl/channel_arg_names.h>
43 #include <grpc/impl/propagation_bits.h>
44 #include <grpc/slice.h>
45 #include <grpc/slice_buffer.h>
46 #include <grpc/status.h>
47 #include <grpc/support/alloc.h>
48 #include <grpc/support/log.h>
49 #include <grpc/support/port_platform.h>
50 
51 #include "src/core/ext/transport/chttp2/transport/chttp2_transport.h"
52 #include "src/core/ext/transport/chttp2/transport/frame_goaway.h"
53 #include "src/core/lib/channel/channel_args.h"
54 #include "src/core/lib/channel/channel_fwd.h"
55 #include "src/core/lib/channel/channel_stack.h"
56 #include "src/core/lib/config/core_configuration.h"
57 #include "src/core/lib/gpr/useful.h"
58 #include "src/core/lib/gprpp/debug_location.h"
59 #include "src/core/lib/gprpp/host_port.h"
60 #include "src/core/lib/gprpp/notification.h"
61 #include "src/core/lib/gprpp/sync.h"
62 #include "src/core/lib/gprpp/time.h"
63 #include "src/core/lib/iomgr/closure.h"
64 #include "src/core/lib/iomgr/endpoint.h"
65 #include "src/core/lib/iomgr/error.h"
66 #include "src/core/lib/iomgr/exec_ctx.h"
67 #include "src/core/lib/iomgr/iomgr_fwd.h"
68 #include "src/core/lib/iomgr/tcp_server.h"
69 #include "src/core/lib/slice/slice.h"
70 #include "src/core/lib/slice/slice_internal.h"
71 #include "src/core/lib/surface/channel_stack_type.h"
72 #include "src/core/lib/transport/metadata_batch.h"
73 #include "src/core/lib/transport/transport.h"
74 #include "test/core/end2end/cq_verifier.h"
75 #include "test/core/util/port.h"
76 #include "test/core/util/test_config.h"
77 #include "test/core/util/test_tcp_server.h"
78 
79 namespace grpc_core {
80 namespace {
81 
Tag(intptr_t t)82 void* Tag(intptr_t t) { return reinterpret_cast<void*>(t); }
83 
84 // A filter that records state about trailing metadata.
85 class TrailingMetadataRecordingFilter {
86  public:
87   static grpc_channel_filter kFilterVtable;
88 
trailing_metadata_available()89   static bool trailing_metadata_available() {
90     return trailing_metadata_available_;
91   }
92 
reset_trailing_metadata_available()93   static void reset_trailing_metadata_available() {
94     trailing_metadata_available_ = false;
95   }
96 
97   static absl::optional<GrpcStreamNetworkState::ValueType>
stream_network_state()98   stream_network_state() {
99     return stream_network_state_;
100   }
101 
reset_stream_network_state()102   static void reset_stream_network_state() {
103     stream_network_state_ = absl::nullopt;
104   }
105 
reset_state()106   static void reset_state() {
107     reset_trailing_metadata_available();
108     reset_stream_network_state();
109   }
110 
111  private:
112   class CallData {
113    public:
Init(grpc_call_element * elem,const grpc_call_element_args * args)114     static grpc_error_handle Init(grpc_call_element* elem,
115                                   const grpc_call_element_args* args) {
116       new (elem->call_data) CallData(args);
117       return absl::OkStatus();
118     }
119 
Destroy(grpc_call_element * elem,const grpc_call_final_info *,grpc_closure *)120     static void Destroy(grpc_call_element* elem,
121                         const grpc_call_final_info* /*final_info*/,
122                         grpc_closure* /*ignored*/) {
123       auto* calld = static_cast<CallData*>(elem->call_data);
124       calld->~CallData();
125     }
126 
StartTransportStreamOpBatch(grpc_call_element * elem,grpc_transport_stream_op_batch * batch)127     static void StartTransportStreamOpBatch(
128         grpc_call_element* elem, grpc_transport_stream_op_batch* batch) {
129       auto* calld = static_cast<CallData*>(elem->call_data);
130       if (batch->recv_initial_metadata) {
131         calld->trailing_metadata_available_ =
132             batch->payload->recv_initial_metadata.trailing_metadata_available;
133         calld->original_recv_initial_metadata_ready_ =
134             batch->payload->recv_initial_metadata.recv_initial_metadata_ready;
135         batch->payload->recv_initial_metadata.recv_initial_metadata_ready =
136             &calld->recv_initial_metadata_ready_;
137       }
138       if (batch->recv_trailing_metadata) {
139         calld->recv_trailing_metadata_ =
140             batch->payload->recv_trailing_metadata.recv_trailing_metadata;
141         calld->original_recv_trailing_metadata_ready_ =
142             batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready;
143         batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
144             &calld->recv_trailing_metadata_ready_;
145       }
146       grpc_call_next_op(elem, batch);
147     }
148 
149    private:
CallData(const grpc_call_element_args *)150     explicit CallData(const grpc_call_element_args* /*args*/) {
151       GRPC_CLOSURE_INIT(&recv_initial_metadata_ready_, RecvInitialMetadataReady,
152                         this, nullptr);
153       GRPC_CLOSURE_INIT(&recv_trailing_metadata_ready_,
154                         RecvTrailingMetadataReady, this, nullptr);
155     }
156 
RecvInitialMetadataReady(void * arg,grpc_error_handle error)157     static void RecvInitialMetadataReady(void* arg, grpc_error_handle error) {
158       auto* calld = static_cast<CallData*>(arg);
159       TrailingMetadataRecordingFilter::trailing_metadata_available_ =
160           *calld->trailing_metadata_available_;
161       Closure::Run(DEBUG_LOCATION, calld->original_recv_initial_metadata_ready_,
162                    error);
163     }
164 
RecvTrailingMetadataReady(void * arg,grpc_error_handle error)165     static void RecvTrailingMetadataReady(void* arg, grpc_error_handle error) {
166       auto* calld = static_cast<CallData*>(arg);
167       stream_network_state_ =
168           calld->recv_trailing_metadata_->get(GrpcStreamNetworkState());
169       Closure::Run(DEBUG_LOCATION,
170                    calld->original_recv_trailing_metadata_ready_, error);
171     }
172 
173     bool* trailing_metadata_available_ = nullptr;
174     grpc_closure recv_initial_metadata_ready_;
175     grpc_closure* original_recv_initial_metadata_ready_ = nullptr;
176     grpc_metadata_batch* recv_trailing_metadata_ = nullptr;
177     grpc_closure recv_trailing_metadata_ready_;
178     grpc_closure* original_recv_trailing_metadata_ready_ = nullptr;
179   };
180 
Init(grpc_channel_element * elem,grpc_channel_element_args *)181   static grpc_error_handle Init(grpc_channel_element* elem,
182                                 grpc_channel_element_args* /*args*/) {
183     new (elem->channel_data) TrailingMetadataRecordingFilter();
184     return absl::OkStatus();
185   }
186 
Destroy(grpc_channel_element * elem)187   static void Destroy(grpc_channel_element* elem) {
188     auto* chand =
189         static_cast<TrailingMetadataRecordingFilter*>(elem->channel_data);
190     chand->~TrailingMetadataRecordingFilter();
191   }
192 
193   static bool trailing_metadata_available_;
194   static absl::optional<GrpcStreamNetworkState::ValueType>
195       stream_network_state_;
196 };
197 
198 grpc_channel_filter TrailingMetadataRecordingFilter::kFilterVtable = {
199     CallData::StartTransportStreamOpBatch,
200     nullptr,
201     nullptr,
202     grpc_channel_next_op,
203     sizeof(CallData),
204     CallData::Init,
205     grpc_call_stack_ignore_set_pollset_or_pollset_set,
206     CallData::Destroy,
207     sizeof(TrailingMetadataRecordingFilter),
208     Init,
209     grpc_channel_stack_no_post_init,
210     Destroy,
211     grpc_channel_next_get_info,
212     // Want to add the filter as close to the end as possible, to
213     // make sure that all of the filters work well together.
214     // However, we can't add it at the very end, because the
215     // connected channel filter must be the last one.
216     // Channel init code falls back to lexical ordering of filters if there are
217     // otherwise no dependencies, so we leverage that.
218     "zzzzzz_trailing-metadata-recording-filter",
219 };
220 bool TrailingMetadataRecordingFilter::trailing_metadata_available_;
221 absl::optional<GrpcStreamNetworkState::ValueType>
222     TrailingMetadataRecordingFilter::stream_network_state_;
223 
224 class StreamsNotSeenTest : public ::testing::Test {
225  protected:
StreamsNotSeenTest(bool server_allows_streams=true)226   explicit StreamsNotSeenTest(bool server_allows_streams = true)
227       : server_allows_streams_(server_allows_streams) {
228     // Reset the filter state
229     TrailingMetadataRecordingFilter::reset_state();
230     grpc_slice_buffer_init(&read_buffer_);
231     GRPC_CLOSURE_INIT(&on_read_done_, OnReadDone, this, nullptr);
232     // Start the test tcp server
233     port_ = grpc_pick_unused_port_or_die();
234     test_tcp_server_init(&server_, OnConnect, this);
235     test_tcp_server_start(&server_, port_);
236     // Start polling on the test tcp server
237     server_poll_thread_ = std::make_unique<std::thread>([this]() {
238       while (!shutdown_) {
239         test_tcp_server_poll(&server_, 10);
240       }
241     });
242     // Create the channel
243     cq_ = grpc_completion_queue_create_for_next(nullptr);
244     cqv_ = std::make_unique<CqVerifier>(cq_);
245     grpc_arg client_args[] = {
246         grpc_channel_arg_integer_create(
247             const_cast<char*>(GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA), 0),
248         grpc_channel_arg_integer_create(
249             const_cast<char*>(GRPC_ARG_HTTP2_BDP_PROBE), 0),
250         grpc_channel_arg_integer_create(
251             const_cast<char*>(GRPC_ARG_ENABLE_RETRIES), 0)};
252     grpc_channel_args client_channel_args = {GPR_ARRAY_SIZE(client_args),
253                                              client_args};
254     grpc_channel_credentials* creds = grpc_insecure_credentials_create();
255     channel_ = grpc_channel_create(JoinHostPort("127.0.0.1", port_).c_str(),
256                                    creds, &client_channel_args);
257     grpc_channel_credentials_release(creds);
258     // Wait for the channel to connect
259     grpc_connectivity_state state = grpc_channel_check_connectivity_state(
260         channel_, /*try_to_connect=*/true);
261     while (state != GRPC_CHANNEL_READY) {
262       grpc_channel_watch_connectivity_state(
263           channel_, state, grpc_timeout_seconds_to_deadline(1), cq_, Tag(1));
264       cqv_->Expect(Tag(1), true);
265       cqv_->Verify(Duration::Seconds(5));
266       state = grpc_channel_check_connectivity_state(channel_, false);
267     }
268     ExecCtx::Get()->Flush();
269     GPR_ASSERT(
270         connect_notification_.WaitForNotificationWithTimeout(absl::Seconds(1)));
271   }
272 
~StreamsNotSeenTest()273   ~StreamsNotSeenTest() override {
274     cqv_.reset();
275     grpc_completion_queue_shutdown(cq_);
276     grpc_event ev;
277     do {
278       ev = grpc_completion_queue_next(cq_, grpc_timeout_seconds_to_deadline(1),
279                                       nullptr);
280     } while (ev.type != GRPC_QUEUE_SHUTDOWN);
281     grpc_completion_queue_destroy(cq_);
282     grpc_channel_destroy(channel_);
283     grpc_endpoint_shutdown(tcp_, GRPC_ERROR_CREATE("Test Shutdown"));
284     ExecCtx::Get()->Flush();
285     GPR_ASSERT(read_end_notification_.WaitForNotificationWithTimeout(
286         absl::Seconds(5)));
287     grpc_endpoint_destroy(tcp_);
288     shutdown_ = true;
289     server_poll_thread_->join();
290     test_tcp_server_destroy(&server_);
291     ExecCtx::Get()->Flush();
292   }
293 
OnConnect(void * arg,grpc_endpoint * tcp,grpc_pollset *,grpc_tcp_server_acceptor * acceptor)294   static void OnConnect(void* arg, grpc_endpoint* tcp,
295                         grpc_pollset* /* accepting_pollset */,
296                         grpc_tcp_server_acceptor* acceptor) {
297     gpr_free(acceptor);
298     StreamsNotSeenTest* self = static_cast<StreamsNotSeenTest*>(arg);
299     self->tcp_ = tcp;
300     grpc_endpoint_add_to_pollset(tcp, self->server_.pollset[0]);
301     grpc_endpoint_read(tcp, &self->read_buffer_, &self->on_read_done_, false,
302                        /*min_progress_size=*/1);
303     std::thread([self]() {
304       ExecCtx exec_ctx;
305       // Send settings frame from server
306       if (self->server_allows_streams_) {
307         constexpr char kHttp2SettingsFrame[] =
308             "\x00\x00\x00\x04\x00\x00\x00\x00\x00";
309         self->Write(absl::string_view(kHttp2SettingsFrame,
310                                       sizeof(kHttp2SettingsFrame) - 1));
311       } else {
312         // Create a settings frame with a max concurrent stream setting of 0
313         constexpr char kHttp2SettingsFrame[] =
314             "\x00\x00\x06\x04\x00\x00\x00\x00\x00\x00\x03\x00\x00\x00\x00";
315         self->Write(absl::string_view(kHttp2SettingsFrame,
316                                       sizeof(kHttp2SettingsFrame) - 1));
317       }
318       self->connect_notification_.Notify();
319     }).detach();
320   }
321 
322   // This is a blocking call. It waits for the write callback to be invoked
323   // before returning. (In other words, do not call this from a thread that
324   // should not be blocked, for example, a polling thread.)
Write(absl::string_view bytes)325   void Write(absl::string_view bytes) {
326     grpc_slice slice =
327         StaticSlice::FromStaticBuffer(bytes.data(), bytes.size()).TakeCSlice();
328     grpc_slice_buffer buffer;
329     grpc_slice_buffer_init(&buffer);
330     grpc_slice_buffer_add(&buffer, slice);
331     WriteBuffer(&buffer);
332     grpc_slice_buffer_destroy(&buffer);
333   }
334 
SendPing()335   void SendPing() {
336     // Send and recv ping ack
337     const char ping_bytes[] =
338         "\x00\x00\x08\x06\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00";
339     const char ping_ack_bytes[] =
340         "\x00\x00\x08\x06\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00";
341     Write(absl::string_view(ping_bytes, sizeof(ping_bytes) - 1));
342     WaitForReadBytes(
343         absl::string_view(ping_ack_bytes, sizeof(ping_ack_bytes) - 1));
344   }
345 
SendGoaway(uint32_t last_stream_id)346   void SendGoaway(uint32_t last_stream_id) {
347     grpc_slice_buffer buffer;
348     grpc_slice_buffer_init(&buffer);
349     grpc_chttp2_goaway_append(last_stream_id, 0, grpc_empty_slice(), &buffer);
350     WriteBuffer(&buffer);
351     grpc_slice_buffer_destroy(&buffer);
352   }
353 
WriteBuffer(grpc_slice_buffer * buffer)354   void WriteBuffer(grpc_slice_buffer* buffer) {
355     Notification on_write_done_notification_;
356     GRPC_CLOSURE_INIT(&on_write_done_, OnWriteDone,
357                       &on_write_done_notification_, nullptr);
358     grpc_endpoint_write(tcp_, buffer, &on_write_done_, nullptr,
359                         /*max_frame_size=*/INT_MAX);
360     ExecCtx::Get()->Flush();
361     GPR_ASSERT(on_write_done_notification_.WaitForNotificationWithTimeout(
362         absl::Seconds(5)));
363   }
364 
OnWriteDone(void * arg,grpc_error_handle error)365   static void OnWriteDone(void* arg, grpc_error_handle error) {
366     GPR_ASSERT(error.ok());
367     Notification* on_write_done_notification_ = static_cast<Notification*>(arg);
368     on_write_done_notification_->Notify();
369   }
370 
OnReadDone(void * arg,grpc_error_handle error)371   static void OnReadDone(void* arg, grpc_error_handle error) {
372     StreamsNotSeenTest* self = static_cast<StreamsNotSeenTest*>(arg);
373     if (error.ok()) {
374       {
375         MutexLock lock(&self->mu_);
376         for (size_t i = 0; i < self->read_buffer_.count; ++i) {
377           absl::StrAppend(&self->read_bytes_,
378                           StringViewFromSlice(self->read_buffer_.slices[i]));
379         }
380         self->read_cv_.SignalAll();
381       }
382       grpc_slice_buffer_reset_and_unref(&self->read_buffer_);
383       grpc_endpoint_read(self->tcp_, &self->read_buffer_, &self->on_read_done_,
384                          false, /*min_progress_size=*/1);
385     } else {
386       grpc_slice_buffer_destroy(&self->read_buffer_);
387       self->read_end_notification_.Notify();
388     }
389   }
390 
391   // Waits for \a bytes to show up in read_bytes_
WaitForReadBytes(absl::string_view bytes)392   void WaitForReadBytes(absl::string_view bytes) {
393     std::atomic<bool> done{false};
394     std::thread cq_driver([&]() {
395       while (!done) {
396         grpc_event ev = grpc_completion_queue_next(
397             cq_, grpc_timeout_milliseconds_to_deadline(10), nullptr);
398         GPR_ASSERT(ev.type == GRPC_QUEUE_TIMEOUT);
399       }
400     });
401     {
402       MutexLock lock(&mu_);
403       while (!absl::StrContains(read_bytes_, bytes)) {
404         read_cv_.WaitWithTimeout(&mu_, absl::Seconds(5));
405       }
406     }
407     done = true;
408     cq_driver.join();
409   }
410 
411   // Flag to check whether the server's MAX_CONCURRENT_STREAM setting is
412   // non-zero or not.
413   bool server_allows_streams_;
414   int port_;
415   test_tcp_server server_;
416   std::unique_ptr<std::thread> server_poll_thread_;
417   grpc_endpoint* tcp_ = nullptr;
418   Notification connect_notification_;
419   grpc_slice_buffer read_buffer_;
420   grpc_closure on_write_done_;
421   grpc_closure on_read_done_;
422   Notification read_end_notification_;
423   std::string read_bytes_ ABSL_GUARDED_BY(mu_);
424   grpc_channel* channel_ = nullptr;
425   grpc_completion_queue* cq_ = nullptr;
426   std::unique_ptr<CqVerifier> cqv_;
427   Mutex mu_;
428   CondVar read_cv_;
429   std::atomic<bool> shutdown_{false};
430 };
431 
432 // Client's HTTP2 transport starts a new stream, sends the request on the wire,
433 // but receives a GOAWAY with a stream ID of 0, meaning that the request was
434 // unseen by the server.The test verifies that the HTTP2 transport adds
435 // GrpcNetworkStreamState with a value of kNotSeenByServer to the trailing
436 // metadata.
TEST_F(StreamsNotSeenTest,StartStreamBeforeGoaway)437 TEST_F(StreamsNotSeenTest, StartStreamBeforeGoaway) {
438   grpc_call* c =
439       grpc_channel_create_call(channel_, nullptr, GRPC_PROPAGATE_DEFAULTS, cq_,
440                                grpc_slice_from_static_string("/foo"), nullptr,
441                                grpc_timeout_seconds_to_deadline(1), nullptr);
442   GPR_ASSERT(c);
443   grpc_metadata_array initial_metadata_recv;
444   grpc_metadata_array trailing_metadata_recv;
445   grpc_metadata_array_init(&initial_metadata_recv);
446   grpc_metadata_array_init(&trailing_metadata_recv);
447   grpc_op* op;
448   grpc_op ops[6];
449   grpc_status_code status;
450   const char* error_string;
451   grpc_call_error error;
452   grpc_slice details;
453   // Send the request
454   memset(ops, 0, sizeof(ops));
455   op = ops;
456   op->op = GRPC_OP_SEND_INITIAL_METADATA;
457   op->data.send_initial_metadata.count = 0;
458   op->flags = 0;
459   op->reserved = nullptr;
460   op++;
461   op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
462   op->flags = 0;
463   op->reserved = nullptr;
464   op++;
465   error = grpc_call_start_batch(c, ops, static_cast<size_t>(op - ops), Tag(101),
466                                 nullptr);
467   cqv_->Expect(Tag(101), true);
468   cqv_->Verify();
469   // Send a goaway from server signalling that the request was unseen by the
470   // server.
471   SendGoaway(0);
472   memset(ops, 0, sizeof(ops));
473   op = ops;
474   op->op = GRPC_OP_RECV_INITIAL_METADATA;
475   op->data.recv_initial_metadata.recv_initial_metadata = &initial_metadata_recv;
476   op->flags = 0;
477   op->reserved = nullptr;
478   op++;
479   op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
480   op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv;
481   op->data.recv_status_on_client.status = &status;
482   op->data.recv_status_on_client.status_details = &details;
483   op->data.recv_status_on_client.error_string = &error_string;
484   op->flags = 0;
485   op->reserved = nullptr;
486   op++;
487   error = grpc_call_start_batch(c, ops, static_cast<size_t>(op - ops), Tag(102),
488                                 nullptr);
489   GPR_ASSERT(GRPC_CALL_OK == error);
490   cqv_->Expect(Tag(102), true);
491   cqv_->Verify();
492   // Verify status and metadata
493   EXPECT_EQ(status, GRPC_STATUS_UNAVAILABLE);
494   ASSERT_TRUE(TrailingMetadataRecordingFilter::trailing_metadata_available());
495   ASSERT_TRUE(
496       TrailingMetadataRecordingFilter::stream_network_state().has_value());
497   EXPECT_EQ(TrailingMetadataRecordingFilter::stream_network_state().value(),
498             GrpcStreamNetworkState::kNotSeenByServer);
499   grpc_slice_unref(details);
500   gpr_free(const_cast<char*>(error_string));
501   grpc_metadata_array_destroy(&initial_metadata_recv);
502   grpc_metadata_array_destroy(&trailing_metadata_recv);
503   grpc_call_unref(c);
504   ExecCtx::Get()->Flush();
505 }
506 
507 // Client's HTTP2 transport starts a new stream, sends the request on the wire,
508 // notices that the transport is destroyed. The test verifies that the HTTP2
509 // transport does not add GrpcNetworkStreamState metadata since we don't know
510 // whether the server saw the request or not.
TEST_F(StreamsNotSeenTest,TransportDestroyed)511 TEST_F(StreamsNotSeenTest, TransportDestroyed) {
512   grpc_call* c =
513       grpc_channel_create_call(channel_, nullptr, GRPC_PROPAGATE_DEFAULTS, cq_,
514                                grpc_slice_from_static_string("/foo"), nullptr,
515                                grpc_timeout_seconds_to_deadline(1), nullptr);
516   GPR_ASSERT(c);
517   grpc_metadata_array initial_metadata_recv;
518   grpc_metadata_array trailing_metadata_recv;
519   grpc_metadata_array_init(&initial_metadata_recv);
520   grpc_metadata_array_init(&trailing_metadata_recv);
521   grpc_op* op;
522   grpc_op ops[6];
523   grpc_status_code status;
524   const char* error_string;
525   grpc_call_error error;
526   grpc_slice details;
527   // Send the request
528   memset(ops, 0, sizeof(ops));
529   op = ops;
530   op->op = GRPC_OP_SEND_INITIAL_METADATA;
531   op->data.send_initial_metadata.count = 0;
532   op->flags = 0;
533   op->reserved = nullptr;
534   op++;
535   op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
536   op->flags = 0;
537   op->reserved = nullptr;
538   op++;
539   error = grpc_call_start_batch(c, ops, static_cast<size_t>(op - ops), Tag(101),
540                                 nullptr);
541   cqv_->Expect(Tag(101), true);
542   cqv_->Verify();
543   // Shutdown the server endpoint
544   grpc_endpoint_shutdown(tcp_, GRPC_ERROR_CREATE("Server shutdown"));
545   memset(ops, 0, sizeof(ops));
546   op = ops;
547   op->op = GRPC_OP_RECV_INITIAL_METADATA;
548   op->data.recv_initial_metadata.recv_initial_metadata = &initial_metadata_recv;
549   op->flags = 0;
550   op->reserved = nullptr;
551   op++;
552   op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
553   op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv;
554   op->data.recv_status_on_client.status = &status;
555   op->data.recv_status_on_client.status_details = &details;
556   op->data.recv_status_on_client.error_string = &error_string;
557   op->flags = 0;
558   op->reserved = nullptr;
559   op++;
560   error = grpc_call_start_batch(c, ops, static_cast<size_t>(op - ops), Tag(102),
561                                 nullptr);
562   GPR_ASSERT(GRPC_CALL_OK == error);
563   cqv_->Expect(Tag(102), true);
564   cqv_->Verify();
565   // Verify status and metadata
566   EXPECT_EQ(status, GRPC_STATUS_UNAVAILABLE);
567   EXPECT_FALSE(
568       TrailingMetadataRecordingFilter::stream_network_state().has_value());
569   grpc_slice_unref(details);
570   gpr_free(const_cast<char*>(error_string));
571   grpc_metadata_array_destroy(&initial_metadata_recv);
572   grpc_metadata_array_destroy(&trailing_metadata_recv);
573   grpc_call_unref(c);
574   ExecCtx::Get()->Flush();
575 }
576 
577 // Client's HTTP2 transport tries to send an RPC after having received a GOAWAY
578 // frame. The test verifies that the HTTP2 transport adds GrpcNetworkStreamState
579 // with a value of kNotSentOnWire to the trailing metadata.
TEST_F(StreamsNotSeenTest,StartStreamAfterGoaway)580 TEST_F(StreamsNotSeenTest, StartStreamAfterGoaway) {
581   // Send Goaway from the server
582   SendGoaway(0);
583   // Send a ping to make sure that the goaway was received.
584   SendPing();
585   // Try sending an RPC
586   grpc_call* c =
587       grpc_channel_create_call(channel_, nullptr, GRPC_PROPAGATE_DEFAULTS, cq_,
588                                grpc_slice_from_static_string("/foo"), nullptr,
589                                grpc_timeout_seconds_to_deadline(1), nullptr);
590   GPR_ASSERT(c);
591   grpc_metadata_array initial_metadata_recv;
592   grpc_metadata_array trailing_metadata_recv;
593   grpc_metadata_array_init(&initial_metadata_recv);
594   grpc_metadata_array_init(&trailing_metadata_recv);
595   grpc_op* op;
596   grpc_op ops[6];
597   grpc_status_code status;
598   const char* error_string;
599   grpc_call_error error;
600   grpc_slice details;
601   memset(ops, 0, sizeof(ops));
602   op = ops;
603   op->op = GRPC_OP_SEND_INITIAL_METADATA;
604   op->data.send_initial_metadata.count = 0;
605   op->flags = 0;
606   op->reserved = nullptr;
607   op++;
608   op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
609   op->flags = 0;
610   op->reserved = nullptr;
611   op++;
612   op->op = GRPC_OP_RECV_INITIAL_METADATA;
613   op->data.recv_initial_metadata.recv_initial_metadata = &initial_metadata_recv;
614   op->flags = 0;
615   op->reserved = nullptr;
616   op++;
617   op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
618   op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv;
619   op->data.recv_status_on_client.status = &status;
620   op->data.recv_status_on_client.status_details = &details;
621   op->data.recv_status_on_client.error_string = &error_string;
622   op->flags = 0;
623   op->reserved = nullptr;
624   op++;
625   error = grpc_call_start_batch(c, ops, static_cast<size_t>(op - ops), Tag(101),
626                                 nullptr);
627   GPR_ASSERT(GRPC_CALL_OK == error);
628   cqv_->Expect(Tag(101), true);
629   cqv_->Verify();
630   // Verify status and metadata
631   EXPECT_EQ(status, GRPC_STATUS_UNAVAILABLE);
632   ASSERT_TRUE(TrailingMetadataRecordingFilter::trailing_metadata_available());
633   ASSERT_TRUE(
634       TrailingMetadataRecordingFilter::stream_network_state().has_value());
635   EXPECT_EQ(TrailingMetadataRecordingFilter::stream_network_state().value(),
636             GrpcStreamNetworkState::kNotSentOnWire);
637   grpc_slice_unref(details);
638   gpr_free(const_cast<char*>(error_string));
639   grpc_metadata_array_destroy(&initial_metadata_recv);
640   grpc_metadata_array_destroy(&trailing_metadata_recv);
641   grpc_call_unref(c);
642   ExecCtx::Get()->Flush();
643 }
644 
645 // These tests have the server sending a SETTINGS_FRAME with a max concurrent
646 // streams settings of 0 which denies the client the chance to start a stream.
647 // Note that in the future, these tests might become outdated if the
648 // client_channel learns about the max concurrent streams setting.
649 class ZeroConcurrencyTest : public StreamsNotSeenTest {
650  protected:
ZeroConcurrencyTest()651   ZeroConcurrencyTest() : StreamsNotSeenTest(/*server_allows_streams=*/false) {}
652 };
653 
654 // Client's HTTP2 transport receives a RPC request, but it cannot start the RPC
655 // because of the max concurrent streams setting. A goaway frame is then
656 // received which should result in the RPC getting cancelled with
657 // kNotSentOnWire.
TEST_F(ZeroConcurrencyTest,StartStreamBeforeGoaway)658 TEST_F(ZeroConcurrencyTest, StartStreamBeforeGoaway) {
659   grpc_call* c =
660       grpc_channel_create_call(channel_, nullptr, GRPC_PROPAGATE_DEFAULTS, cq_,
661                                grpc_slice_from_static_string("/foo"), nullptr,
662                                grpc_timeout_seconds_to_deadline(5), nullptr);
663   GPR_ASSERT(c);
664   grpc_metadata_array initial_metadata_recv;
665   grpc_metadata_array trailing_metadata_recv;
666   grpc_metadata_array_init(&initial_metadata_recv);
667   grpc_metadata_array_init(&trailing_metadata_recv);
668   grpc_op* op;
669   grpc_op ops[6];
670   grpc_status_code status;
671   const char* error_string;
672   grpc_call_error error;
673   grpc_slice details;
674   // Send the request
675   memset(ops, 0, sizeof(ops));
676   op = ops;
677   op->op = GRPC_OP_SEND_INITIAL_METADATA;
678   op->data.send_initial_metadata.count = 0;
679   op->flags = 0;
680   op->reserved = nullptr;
681   op++;
682   op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
683   op->flags = 0;
684   op->reserved = nullptr;
685   op++;
686   op->op = GRPC_OP_RECV_INITIAL_METADATA;
687   op->data.recv_initial_metadata.recv_initial_metadata = &initial_metadata_recv;
688   op->flags = 0;
689   op->reserved = nullptr;
690   op++;
691   op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
692   op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv;
693   op->data.recv_status_on_client.status = &status;
694   op->data.recv_status_on_client.status_details = &details;
695   op->data.recv_status_on_client.error_string = &error_string;
696   op->flags = 0;
697   op->reserved = nullptr;
698   op++;
699   error = grpc_call_start_batch(c, ops, static_cast<size_t>(op - ops), Tag(101),
700                                 nullptr);
701   // This test assumes that nothing would pause the RPC before its received by
702   // the transport. If that no longer holds true, we might need to drive the cq
703   // for some time to make sure that the RPC reaches the HTTP2 layer.
704   SendGoaway(0);
705   GPR_ASSERT(GRPC_CALL_OK == error);
706   cqv_->Expect(Tag(101), true);
707   cqv_->Verify();
708   // Verify status and metadata
709   EXPECT_EQ(status, GRPC_STATUS_UNAVAILABLE);
710   ASSERT_TRUE(TrailingMetadataRecordingFilter::trailing_metadata_available());
711   ASSERT_TRUE(
712       TrailingMetadataRecordingFilter::stream_network_state().has_value());
713   EXPECT_EQ(TrailingMetadataRecordingFilter::stream_network_state().value(),
714             GrpcStreamNetworkState::kNotSentOnWire);
715   grpc_slice_unref(details);
716   gpr_free(const_cast<char*>(error_string));
717   grpc_metadata_array_destroy(&initial_metadata_recv);
718   grpc_metadata_array_destroy(&trailing_metadata_recv);
719   grpc_call_unref(c);
720   ExecCtx::Get()->Flush();
721 }
722 
723 // Client's HTTP2 transport receives a RPC request, but it cannot start the RPC
724 // because of the max concurrent streams setting. Server then shuts its endpoint
725 // which should result in the RPC getting cancelled with kNotSentOnWire.
TEST_F(ZeroConcurrencyTest,TransportDestroyed)726 TEST_F(ZeroConcurrencyTest, TransportDestroyed) {
727   grpc_call* c =
728       grpc_channel_create_call(channel_, nullptr, GRPC_PROPAGATE_DEFAULTS, cq_,
729                                grpc_slice_from_static_string("/foo"), nullptr,
730                                grpc_timeout_seconds_to_deadline(5), nullptr);
731   GPR_ASSERT(c);
732   grpc_metadata_array initial_metadata_recv;
733   grpc_metadata_array trailing_metadata_recv;
734   grpc_metadata_array_init(&initial_metadata_recv);
735   grpc_metadata_array_init(&trailing_metadata_recv);
736   grpc_op* op;
737   grpc_op ops[6];
738   grpc_status_code status;
739   const char* error_string;
740   grpc_call_error error;
741   grpc_slice details;
742   // Send the request
743   memset(ops, 0, sizeof(ops));
744   op = ops;
745   op->op = GRPC_OP_SEND_INITIAL_METADATA;
746   op->data.send_initial_metadata.count = 0;
747   op->flags = 0;
748   op->reserved = nullptr;
749   op++;
750   op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
751   op->flags = 0;
752   op->reserved = nullptr;
753   op++;
754   op->op = GRPC_OP_RECV_INITIAL_METADATA;
755   op->data.recv_initial_metadata.recv_initial_metadata = &initial_metadata_recv;
756   op->flags = 0;
757   op->reserved = nullptr;
758   op++;
759   op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
760   op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv;
761   op->data.recv_status_on_client.status = &status;
762   op->data.recv_status_on_client.status_details = &details;
763   op->data.recv_status_on_client.error_string = &error_string;
764   op->flags = 0;
765   op->reserved = nullptr;
766   op++;
767   error = grpc_call_start_batch(c, ops, static_cast<size_t>(op - ops), Tag(101),
768                                 nullptr);
769   grpc_endpoint_shutdown(tcp_, GRPC_ERROR_CREATE("Server shutdown"));
770   GPR_ASSERT(GRPC_CALL_OK == error);
771   cqv_->Expect(Tag(101), true);
772   cqv_->Verify();
773   // Verify status and metadata
774   EXPECT_EQ(status, GRPC_STATUS_UNAVAILABLE);
775   ASSERT_TRUE(TrailingMetadataRecordingFilter::trailing_metadata_available());
776   ASSERT_TRUE(
777       TrailingMetadataRecordingFilter::stream_network_state().has_value());
778   EXPECT_EQ(TrailingMetadataRecordingFilter::stream_network_state().value(),
779             GrpcStreamNetworkState::kNotSentOnWire);
780   grpc_slice_unref(details);
781   gpr_free(const_cast<char*>(error_string));
782   grpc_metadata_array_destroy(&initial_metadata_recv);
783   grpc_metadata_array_destroy(&trailing_metadata_recv);
784   grpc_call_unref(c);
785   ExecCtx::Get()->Flush();
786 }
787 
788 }  // namespace
789 }  // namespace grpc_core
790 
main(int argc,char ** argv)791 int main(int argc, char** argv) {
792   ::testing::InitGoogleTest(&argc, argv);
793   grpc::testing::TestEnvironment env(&argc, argv);
794   int result;
795   grpc_core::CoreConfiguration::RunWithSpecialConfiguration(
796       [](grpc_core::CoreConfiguration::Builder* builder) {
797         grpc_core::BuildCoreConfiguration(builder);
798         builder->channel_init()->RegisterFilter(
799             GRPC_CLIENT_SUBCHANNEL,
800             &grpc_core::TrailingMetadataRecordingFilter::kFilterVtable);
801       },
802       [&] {
803         grpc_core::
804             TestOnlyGlobalHttp2TransportDisableTransientFailureStateNotification(
805                 true);
806         grpc_init();
807         {
808           grpc_core::ExecCtx exec_ctx;
809           result = RUN_ALL_TESTS();
810         }
811         grpc_shutdown();
812       });
813   return result;
814 }
815