1 //
2 //
3 // Copyright 2022 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18
19 #include <limits.h>
20 #include <stdint.h>
21 #include <stdlib.h>
22 #include <string.h>
23
24 #include <atomic>
25 #include <memory>
26 #include <new>
27 #include <string>
28 #include <thread>
29 #include <vector>
30
31 #include "absl/base/thread_annotations.h"
32 #include "absl/status/status.h"
33 #include "absl/strings/match.h"
34 #include "absl/strings/str_cat.h"
35 #include "absl/strings/string_view.h"
36 #include "absl/time/time.h"
37 #include "absl/types/optional.h"
38 #include "gtest/gtest.h"
39
40 #include <grpc/grpc.h>
41 #include <grpc/grpc_security.h>
42 #include <grpc/impl/channel_arg_names.h>
43 #include <grpc/impl/propagation_bits.h>
44 #include <grpc/slice.h>
45 #include <grpc/slice_buffer.h>
46 #include <grpc/status.h>
47 #include <grpc/support/alloc.h>
48 #include <grpc/support/log.h>
49 #include <grpc/support/port_platform.h>
50
51 #include "src/core/ext/transport/chttp2/transport/chttp2_transport.h"
52 #include "src/core/ext/transport/chttp2/transport/frame_goaway.h"
53 #include "src/core/lib/channel/channel_args.h"
54 #include "src/core/lib/channel/channel_fwd.h"
55 #include "src/core/lib/channel/channel_stack.h"
56 #include "src/core/lib/config/core_configuration.h"
57 #include "src/core/lib/gpr/useful.h"
58 #include "src/core/lib/gprpp/debug_location.h"
59 #include "src/core/lib/gprpp/host_port.h"
60 #include "src/core/lib/gprpp/notification.h"
61 #include "src/core/lib/gprpp/sync.h"
62 #include "src/core/lib/gprpp/time.h"
63 #include "src/core/lib/iomgr/closure.h"
64 #include "src/core/lib/iomgr/endpoint.h"
65 #include "src/core/lib/iomgr/error.h"
66 #include "src/core/lib/iomgr/exec_ctx.h"
67 #include "src/core/lib/iomgr/iomgr_fwd.h"
68 #include "src/core/lib/iomgr/tcp_server.h"
69 #include "src/core/lib/slice/slice.h"
70 #include "src/core/lib/slice/slice_internal.h"
71 #include "src/core/lib/surface/channel_stack_type.h"
72 #include "src/core/lib/transport/metadata_batch.h"
73 #include "src/core/lib/transport/transport.h"
74 #include "test/core/end2end/cq_verifier.h"
75 #include "test/core/util/port.h"
76 #include "test/core/util/test_config.h"
77 #include "test/core/util/test_tcp_server.h"
78
79 namespace grpc_core {
80 namespace {
81
// Wraps an integer in an opaque pointer so it can be used as a tag value
// (the tests in this file pass it to completion-queue operations).
void* Tag(intptr_t t) {
  void* const as_pointer = reinterpret_cast<void*>(t);
  return as_pointer;
}
83
84 // A filter that records state about trailing metadata.
85 class TrailingMetadataRecordingFilter {
86 public:
87 static grpc_channel_filter kFilterVtable;
88
trailing_metadata_available()89 static bool trailing_metadata_available() {
90 return trailing_metadata_available_;
91 }
92
reset_trailing_metadata_available()93 static void reset_trailing_metadata_available() {
94 trailing_metadata_available_ = false;
95 }
96
97 static absl::optional<GrpcStreamNetworkState::ValueType>
stream_network_state()98 stream_network_state() {
99 return stream_network_state_;
100 }
101
reset_stream_network_state()102 static void reset_stream_network_state() {
103 stream_network_state_ = absl::nullopt;
104 }
105
reset_state()106 static void reset_state() {
107 reset_trailing_metadata_available();
108 reset_stream_network_state();
109 }
110
111 private:
112 class CallData {
113 public:
Init(grpc_call_element * elem,const grpc_call_element_args * args)114 static grpc_error_handle Init(grpc_call_element* elem,
115 const grpc_call_element_args* args) {
116 new (elem->call_data) CallData(args);
117 return absl::OkStatus();
118 }
119
Destroy(grpc_call_element * elem,const grpc_call_final_info *,grpc_closure *)120 static void Destroy(grpc_call_element* elem,
121 const grpc_call_final_info* /*final_info*/,
122 grpc_closure* /*ignored*/) {
123 auto* calld = static_cast<CallData*>(elem->call_data);
124 calld->~CallData();
125 }
126
StartTransportStreamOpBatch(grpc_call_element * elem,grpc_transport_stream_op_batch * batch)127 static void StartTransportStreamOpBatch(
128 grpc_call_element* elem, grpc_transport_stream_op_batch* batch) {
129 auto* calld = static_cast<CallData*>(elem->call_data);
130 if (batch->recv_initial_metadata) {
131 calld->trailing_metadata_available_ =
132 batch->payload->recv_initial_metadata.trailing_metadata_available;
133 calld->original_recv_initial_metadata_ready_ =
134 batch->payload->recv_initial_metadata.recv_initial_metadata_ready;
135 batch->payload->recv_initial_metadata.recv_initial_metadata_ready =
136 &calld->recv_initial_metadata_ready_;
137 }
138 if (batch->recv_trailing_metadata) {
139 calld->recv_trailing_metadata_ =
140 batch->payload->recv_trailing_metadata.recv_trailing_metadata;
141 calld->original_recv_trailing_metadata_ready_ =
142 batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready;
143 batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
144 &calld->recv_trailing_metadata_ready_;
145 }
146 grpc_call_next_op(elem, batch);
147 }
148
149 private:
CallData(const grpc_call_element_args *)150 explicit CallData(const grpc_call_element_args* /*args*/) {
151 GRPC_CLOSURE_INIT(&recv_initial_metadata_ready_, RecvInitialMetadataReady,
152 this, nullptr);
153 GRPC_CLOSURE_INIT(&recv_trailing_metadata_ready_,
154 RecvTrailingMetadataReady, this, nullptr);
155 }
156
RecvInitialMetadataReady(void * arg,grpc_error_handle error)157 static void RecvInitialMetadataReady(void* arg, grpc_error_handle error) {
158 auto* calld = static_cast<CallData*>(arg);
159 TrailingMetadataRecordingFilter::trailing_metadata_available_ =
160 *calld->trailing_metadata_available_;
161 Closure::Run(DEBUG_LOCATION, calld->original_recv_initial_metadata_ready_,
162 error);
163 }
164
RecvTrailingMetadataReady(void * arg,grpc_error_handle error)165 static void RecvTrailingMetadataReady(void* arg, grpc_error_handle error) {
166 auto* calld = static_cast<CallData*>(arg);
167 stream_network_state_ =
168 calld->recv_trailing_metadata_->get(GrpcStreamNetworkState());
169 Closure::Run(DEBUG_LOCATION,
170 calld->original_recv_trailing_metadata_ready_, error);
171 }
172
173 bool* trailing_metadata_available_ = nullptr;
174 grpc_closure recv_initial_metadata_ready_;
175 grpc_closure* original_recv_initial_metadata_ready_ = nullptr;
176 grpc_metadata_batch* recv_trailing_metadata_ = nullptr;
177 grpc_closure recv_trailing_metadata_ready_;
178 grpc_closure* original_recv_trailing_metadata_ready_ = nullptr;
179 };
180
Init(grpc_channel_element * elem,grpc_channel_element_args *)181 static grpc_error_handle Init(grpc_channel_element* elem,
182 grpc_channel_element_args* /*args*/) {
183 new (elem->channel_data) TrailingMetadataRecordingFilter();
184 return absl::OkStatus();
185 }
186
Destroy(grpc_channel_element * elem)187 static void Destroy(grpc_channel_element* elem) {
188 auto* chand =
189 static_cast<TrailingMetadataRecordingFilter*>(elem->channel_data);
190 chand->~TrailingMetadataRecordingFilter();
191 }
192
193 static bool trailing_metadata_available_;
194 static absl::optional<GrpcStreamNetworkState::ValueType>
195 stream_network_state_;
196 };
197
198 grpc_channel_filter TrailingMetadataRecordingFilter::kFilterVtable = {
199 CallData::StartTransportStreamOpBatch,
200 nullptr,
201 nullptr,
202 grpc_channel_next_op,
203 sizeof(CallData),
204 CallData::Init,
205 grpc_call_stack_ignore_set_pollset_or_pollset_set,
206 CallData::Destroy,
207 sizeof(TrailingMetadataRecordingFilter),
208 Init,
209 grpc_channel_stack_no_post_init,
210 Destroy,
211 grpc_channel_next_get_info,
212 // Want to add the filter as close to the end as possible, to
213 // make sure that all of the filters work well together.
214 // However, we can't add it at the very end, because the
215 // connected channel filter must be the last one.
216 // Channel init code falls back to lexical ordering of filters if there are
217 // otherwise no dependencies, so we leverage that.
218 "zzzzzz_trailing-metadata-recording-filter",
219 };
220 bool TrailingMetadataRecordingFilter::trailing_metadata_available_;
221 absl::optional<GrpcStreamNetworkState::ValueType>
222 TrailingMetadataRecordingFilter::stream_network_state_;
223
224 class StreamsNotSeenTest : public ::testing::Test {
225 protected:
StreamsNotSeenTest(bool server_allows_streams=true)226 explicit StreamsNotSeenTest(bool server_allows_streams = true)
227 : server_allows_streams_(server_allows_streams) {
228 // Reset the filter state
229 TrailingMetadataRecordingFilter::reset_state();
230 grpc_slice_buffer_init(&read_buffer_);
231 GRPC_CLOSURE_INIT(&on_read_done_, OnReadDone, this, nullptr);
232 // Start the test tcp server
233 port_ = grpc_pick_unused_port_or_die();
234 test_tcp_server_init(&server_, OnConnect, this);
235 test_tcp_server_start(&server_, port_);
236 // Start polling on the test tcp server
237 server_poll_thread_ = std::make_unique<std::thread>([this]() {
238 while (!shutdown_) {
239 test_tcp_server_poll(&server_, 10);
240 }
241 });
242 // Create the channel
243 cq_ = grpc_completion_queue_create_for_next(nullptr);
244 cqv_ = std::make_unique<CqVerifier>(cq_);
245 grpc_arg client_args[] = {
246 grpc_channel_arg_integer_create(
247 const_cast<char*>(GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA), 0),
248 grpc_channel_arg_integer_create(
249 const_cast<char*>(GRPC_ARG_HTTP2_BDP_PROBE), 0),
250 grpc_channel_arg_integer_create(
251 const_cast<char*>(GRPC_ARG_ENABLE_RETRIES), 0)};
252 grpc_channel_args client_channel_args = {GPR_ARRAY_SIZE(client_args),
253 client_args};
254 grpc_channel_credentials* creds = grpc_insecure_credentials_create();
255 channel_ = grpc_channel_create(JoinHostPort("127.0.0.1", port_).c_str(),
256 creds, &client_channel_args);
257 grpc_channel_credentials_release(creds);
258 // Wait for the channel to connect
259 grpc_connectivity_state state = grpc_channel_check_connectivity_state(
260 channel_, /*try_to_connect=*/true);
261 while (state != GRPC_CHANNEL_READY) {
262 grpc_channel_watch_connectivity_state(
263 channel_, state, grpc_timeout_seconds_to_deadline(1), cq_, Tag(1));
264 cqv_->Expect(Tag(1), true);
265 cqv_->Verify(Duration::Seconds(5));
266 state = grpc_channel_check_connectivity_state(channel_, false);
267 }
268 ExecCtx::Get()->Flush();
269 GPR_ASSERT(
270 connect_notification_.WaitForNotificationWithTimeout(absl::Seconds(1)));
271 }
272
~StreamsNotSeenTest()273 ~StreamsNotSeenTest() override {
274 cqv_.reset();
275 grpc_completion_queue_shutdown(cq_);
276 grpc_event ev;
277 do {
278 ev = grpc_completion_queue_next(cq_, grpc_timeout_seconds_to_deadline(1),
279 nullptr);
280 } while (ev.type != GRPC_QUEUE_SHUTDOWN);
281 grpc_completion_queue_destroy(cq_);
282 grpc_channel_destroy(channel_);
283 grpc_endpoint_shutdown(tcp_, GRPC_ERROR_CREATE("Test Shutdown"));
284 ExecCtx::Get()->Flush();
285 GPR_ASSERT(read_end_notification_.WaitForNotificationWithTimeout(
286 absl::Seconds(5)));
287 grpc_endpoint_destroy(tcp_);
288 shutdown_ = true;
289 server_poll_thread_->join();
290 test_tcp_server_destroy(&server_);
291 ExecCtx::Get()->Flush();
292 }
293
OnConnect(void * arg,grpc_endpoint * tcp,grpc_pollset *,grpc_tcp_server_acceptor * acceptor)294 static void OnConnect(void* arg, grpc_endpoint* tcp,
295 grpc_pollset* /* accepting_pollset */,
296 grpc_tcp_server_acceptor* acceptor) {
297 gpr_free(acceptor);
298 StreamsNotSeenTest* self = static_cast<StreamsNotSeenTest*>(arg);
299 self->tcp_ = tcp;
300 grpc_endpoint_add_to_pollset(tcp, self->server_.pollset[0]);
301 grpc_endpoint_read(tcp, &self->read_buffer_, &self->on_read_done_, false,
302 /*min_progress_size=*/1);
303 std::thread([self]() {
304 ExecCtx exec_ctx;
305 // Send settings frame from server
306 if (self->server_allows_streams_) {
307 constexpr char kHttp2SettingsFrame[] =
308 "\x00\x00\x00\x04\x00\x00\x00\x00\x00";
309 self->Write(absl::string_view(kHttp2SettingsFrame,
310 sizeof(kHttp2SettingsFrame) - 1));
311 } else {
312 // Create a settings frame with a max concurrent stream setting of 0
313 constexpr char kHttp2SettingsFrame[] =
314 "\x00\x00\x06\x04\x00\x00\x00\x00\x00\x00\x03\x00\x00\x00\x00";
315 self->Write(absl::string_view(kHttp2SettingsFrame,
316 sizeof(kHttp2SettingsFrame) - 1));
317 }
318 self->connect_notification_.Notify();
319 }).detach();
320 }
321
322 // This is a blocking call. It waits for the write callback to be invoked
323 // before returning. (In other words, do not call this from a thread that
324 // should not be blocked, for example, a polling thread.)
Write(absl::string_view bytes)325 void Write(absl::string_view bytes) {
326 grpc_slice slice =
327 StaticSlice::FromStaticBuffer(bytes.data(), bytes.size()).TakeCSlice();
328 grpc_slice_buffer buffer;
329 grpc_slice_buffer_init(&buffer);
330 grpc_slice_buffer_add(&buffer, slice);
331 WriteBuffer(&buffer);
332 grpc_slice_buffer_destroy(&buffer);
333 }
334
SendPing()335 void SendPing() {
336 // Send and recv ping ack
337 const char ping_bytes[] =
338 "\x00\x00\x08\x06\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00";
339 const char ping_ack_bytes[] =
340 "\x00\x00\x08\x06\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00";
341 Write(absl::string_view(ping_bytes, sizeof(ping_bytes) - 1));
342 WaitForReadBytes(
343 absl::string_view(ping_ack_bytes, sizeof(ping_ack_bytes) - 1));
344 }
345
SendGoaway(uint32_t last_stream_id)346 void SendGoaway(uint32_t last_stream_id) {
347 grpc_slice_buffer buffer;
348 grpc_slice_buffer_init(&buffer);
349 grpc_chttp2_goaway_append(last_stream_id, 0, grpc_empty_slice(), &buffer);
350 WriteBuffer(&buffer);
351 grpc_slice_buffer_destroy(&buffer);
352 }
353
WriteBuffer(grpc_slice_buffer * buffer)354 void WriteBuffer(grpc_slice_buffer* buffer) {
355 Notification on_write_done_notification_;
356 GRPC_CLOSURE_INIT(&on_write_done_, OnWriteDone,
357 &on_write_done_notification_, nullptr);
358 grpc_endpoint_write(tcp_, buffer, &on_write_done_, nullptr,
359 /*max_frame_size=*/INT_MAX);
360 ExecCtx::Get()->Flush();
361 GPR_ASSERT(on_write_done_notification_.WaitForNotificationWithTimeout(
362 absl::Seconds(5)));
363 }
364
OnWriteDone(void * arg,grpc_error_handle error)365 static void OnWriteDone(void* arg, grpc_error_handle error) {
366 GPR_ASSERT(error.ok());
367 Notification* on_write_done_notification_ = static_cast<Notification*>(arg);
368 on_write_done_notification_->Notify();
369 }
370
OnReadDone(void * arg,grpc_error_handle error)371 static void OnReadDone(void* arg, grpc_error_handle error) {
372 StreamsNotSeenTest* self = static_cast<StreamsNotSeenTest*>(arg);
373 if (error.ok()) {
374 {
375 MutexLock lock(&self->mu_);
376 for (size_t i = 0; i < self->read_buffer_.count; ++i) {
377 absl::StrAppend(&self->read_bytes_,
378 StringViewFromSlice(self->read_buffer_.slices[i]));
379 }
380 self->read_cv_.SignalAll();
381 }
382 grpc_slice_buffer_reset_and_unref(&self->read_buffer_);
383 grpc_endpoint_read(self->tcp_, &self->read_buffer_, &self->on_read_done_,
384 false, /*min_progress_size=*/1);
385 } else {
386 grpc_slice_buffer_destroy(&self->read_buffer_);
387 self->read_end_notification_.Notify();
388 }
389 }
390
391 // Waits for \a bytes to show up in read_bytes_
WaitForReadBytes(absl::string_view bytes)392 void WaitForReadBytes(absl::string_view bytes) {
393 std::atomic<bool> done{false};
394 std::thread cq_driver([&]() {
395 while (!done) {
396 grpc_event ev = grpc_completion_queue_next(
397 cq_, grpc_timeout_milliseconds_to_deadline(10), nullptr);
398 GPR_ASSERT(ev.type == GRPC_QUEUE_TIMEOUT);
399 }
400 });
401 {
402 MutexLock lock(&mu_);
403 while (!absl::StrContains(read_bytes_, bytes)) {
404 read_cv_.WaitWithTimeout(&mu_, absl::Seconds(5));
405 }
406 }
407 done = true;
408 cq_driver.join();
409 }
410
411 // Flag to check whether the server's MAX_CONCURRENT_STREAM setting is
412 // non-zero or not.
413 bool server_allows_streams_;
414 int port_;
415 test_tcp_server server_;
416 std::unique_ptr<std::thread> server_poll_thread_;
417 grpc_endpoint* tcp_ = nullptr;
418 Notification connect_notification_;
419 grpc_slice_buffer read_buffer_;
420 grpc_closure on_write_done_;
421 grpc_closure on_read_done_;
422 Notification read_end_notification_;
423 std::string read_bytes_ ABSL_GUARDED_BY(mu_);
424 grpc_channel* channel_ = nullptr;
425 grpc_completion_queue* cq_ = nullptr;
426 std::unique_ptr<CqVerifier> cqv_;
427 Mutex mu_;
428 CondVar read_cv_;
429 std::atomic<bool> shutdown_{false};
430 };
431
432 // Client's HTTP2 transport starts a new stream, sends the request on the wire,
433 // but receives a GOAWAY with a stream ID of 0, meaning that the request was
434 // unseen by the server.The test verifies that the HTTP2 transport adds
435 // GrpcNetworkStreamState with a value of kNotSeenByServer to the trailing
436 // metadata.
TEST_F(StreamsNotSeenTest,StartStreamBeforeGoaway)437 TEST_F(StreamsNotSeenTest, StartStreamBeforeGoaway) {
438 grpc_call* c =
439 grpc_channel_create_call(channel_, nullptr, GRPC_PROPAGATE_DEFAULTS, cq_,
440 grpc_slice_from_static_string("/foo"), nullptr,
441 grpc_timeout_seconds_to_deadline(1), nullptr);
442 GPR_ASSERT(c);
443 grpc_metadata_array initial_metadata_recv;
444 grpc_metadata_array trailing_metadata_recv;
445 grpc_metadata_array_init(&initial_metadata_recv);
446 grpc_metadata_array_init(&trailing_metadata_recv);
447 grpc_op* op;
448 grpc_op ops[6];
449 grpc_status_code status;
450 const char* error_string;
451 grpc_call_error error;
452 grpc_slice details;
453 // Send the request
454 memset(ops, 0, sizeof(ops));
455 op = ops;
456 op->op = GRPC_OP_SEND_INITIAL_METADATA;
457 op->data.send_initial_metadata.count = 0;
458 op->flags = 0;
459 op->reserved = nullptr;
460 op++;
461 op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
462 op->flags = 0;
463 op->reserved = nullptr;
464 op++;
465 error = grpc_call_start_batch(c, ops, static_cast<size_t>(op - ops), Tag(101),
466 nullptr);
467 cqv_->Expect(Tag(101), true);
468 cqv_->Verify();
469 // Send a goaway from server signalling that the request was unseen by the
470 // server.
471 SendGoaway(0);
472 memset(ops, 0, sizeof(ops));
473 op = ops;
474 op->op = GRPC_OP_RECV_INITIAL_METADATA;
475 op->data.recv_initial_metadata.recv_initial_metadata = &initial_metadata_recv;
476 op->flags = 0;
477 op->reserved = nullptr;
478 op++;
479 op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
480 op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv;
481 op->data.recv_status_on_client.status = &status;
482 op->data.recv_status_on_client.status_details = &details;
483 op->data.recv_status_on_client.error_string = &error_string;
484 op->flags = 0;
485 op->reserved = nullptr;
486 op++;
487 error = grpc_call_start_batch(c, ops, static_cast<size_t>(op - ops), Tag(102),
488 nullptr);
489 GPR_ASSERT(GRPC_CALL_OK == error);
490 cqv_->Expect(Tag(102), true);
491 cqv_->Verify();
492 // Verify status and metadata
493 EXPECT_EQ(status, GRPC_STATUS_UNAVAILABLE);
494 ASSERT_TRUE(TrailingMetadataRecordingFilter::trailing_metadata_available());
495 ASSERT_TRUE(
496 TrailingMetadataRecordingFilter::stream_network_state().has_value());
497 EXPECT_EQ(TrailingMetadataRecordingFilter::stream_network_state().value(),
498 GrpcStreamNetworkState::kNotSeenByServer);
499 grpc_slice_unref(details);
500 gpr_free(const_cast<char*>(error_string));
501 grpc_metadata_array_destroy(&initial_metadata_recv);
502 grpc_metadata_array_destroy(&trailing_metadata_recv);
503 grpc_call_unref(c);
504 ExecCtx::Get()->Flush();
505 }
506
507 // Client's HTTP2 transport starts a new stream, sends the request on the wire,
508 // notices that the transport is destroyed. The test verifies that the HTTP2
509 // transport does not add GrpcNetworkStreamState metadata since we don't know
510 // whether the server saw the request or not.
TEST_F(StreamsNotSeenTest,TransportDestroyed)511 TEST_F(StreamsNotSeenTest, TransportDestroyed) {
512 grpc_call* c =
513 grpc_channel_create_call(channel_, nullptr, GRPC_PROPAGATE_DEFAULTS, cq_,
514 grpc_slice_from_static_string("/foo"), nullptr,
515 grpc_timeout_seconds_to_deadline(1), nullptr);
516 GPR_ASSERT(c);
517 grpc_metadata_array initial_metadata_recv;
518 grpc_metadata_array trailing_metadata_recv;
519 grpc_metadata_array_init(&initial_metadata_recv);
520 grpc_metadata_array_init(&trailing_metadata_recv);
521 grpc_op* op;
522 grpc_op ops[6];
523 grpc_status_code status;
524 const char* error_string;
525 grpc_call_error error;
526 grpc_slice details;
527 // Send the request
528 memset(ops, 0, sizeof(ops));
529 op = ops;
530 op->op = GRPC_OP_SEND_INITIAL_METADATA;
531 op->data.send_initial_metadata.count = 0;
532 op->flags = 0;
533 op->reserved = nullptr;
534 op++;
535 op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
536 op->flags = 0;
537 op->reserved = nullptr;
538 op++;
539 error = grpc_call_start_batch(c, ops, static_cast<size_t>(op - ops), Tag(101),
540 nullptr);
541 cqv_->Expect(Tag(101), true);
542 cqv_->Verify();
543 // Shutdown the server endpoint
544 grpc_endpoint_shutdown(tcp_, GRPC_ERROR_CREATE("Server shutdown"));
545 memset(ops, 0, sizeof(ops));
546 op = ops;
547 op->op = GRPC_OP_RECV_INITIAL_METADATA;
548 op->data.recv_initial_metadata.recv_initial_metadata = &initial_metadata_recv;
549 op->flags = 0;
550 op->reserved = nullptr;
551 op++;
552 op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
553 op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv;
554 op->data.recv_status_on_client.status = &status;
555 op->data.recv_status_on_client.status_details = &details;
556 op->data.recv_status_on_client.error_string = &error_string;
557 op->flags = 0;
558 op->reserved = nullptr;
559 op++;
560 error = grpc_call_start_batch(c, ops, static_cast<size_t>(op - ops), Tag(102),
561 nullptr);
562 GPR_ASSERT(GRPC_CALL_OK == error);
563 cqv_->Expect(Tag(102), true);
564 cqv_->Verify();
565 // Verify status and metadata
566 EXPECT_EQ(status, GRPC_STATUS_UNAVAILABLE);
567 EXPECT_FALSE(
568 TrailingMetadataRecordingFilter::stream_network_state().has_value());
569 grpc_slice_unref(details);
570 gpr_free(const_cast<char*>(error_string));
571 grpc_metadata_array_destroy(&initial_metadata_recv);
572 grpc_metadata_array_destroy(&trailing_metadata_recv);
573 grpc_call_unref(c);
574 ExecCtx::Get()->Flush();
575 }
576
577 // Client's HTTP2 transport tries to send an RPC after having received a GOAWAY
578 // frame. The test verifies that the HTTP2 transport adds GrpcNetworkStreamState
579 // with a value of kNotSentOnWire to the trailing metadata.
TEST_F(StreamsNotSeenTest,StartStreamAfterGoaway)580 TEST_F(StreamsNotSeenTest, StartStreamAfterGoaway) {
581 // Send Goaway from the server
582 SendGoaway(0);
583 // Send a ping to make sure that the goaway was received.
584 SendPing();
585 // Try sending an RPC
586 grpc_call* c =
587 grpc_channel_create_call(channel_, nullptr, GRPC_PROPAGATE_DEFAULTS, cq_,
588 grpc_slice_from_static_string("/foo"), nullptr,
589 grpc_timeout_seconds_to_deadline(1), nullptr);
590 GPR_ASSERT(c);
591 grpc_metadata_array initial_metadata_recv;
592 grpc_metadata_array trailing_metadata_recv;
593 grpc_metadata_array_init(&initial_metadata_recv);
594 grpc_metadata_array_init(&trailing_metadata_recv);
595 grpc_op* op;
596 grpc_op ops[6];
597 grpc_status_code status;
598 const char* error_string;
599 grpc_call_error error;
600 grpc_slice details;
601 memset(ops, 0, sizeof(ops));
602 op = ops;
603 op->op = GRPC_OP_SEND_INITIAL_METADATA;
604 op->data.send_initial_metadata.count = 0;
605 op->flags = 0;
606 op->reserved = nullptr;
607 op++;
608 op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
609 op->flags = 0;
610 op->reserved = nullptr;
611 op++;
612 op->op = GRPC_OP_RECV_INITIAL_METADATA;
613 op->data.recv_initial_metadata.recv_initial_metadata = &initial_metadata_recv;
614 op->flags = 0;
615 op->reserved = nullptr;
616 op++;
617 op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
618 op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv;
619 op->data.recv_status_on_client.status = &status;
620 op->data.recv_status_on_client.status_details = &details;
621 op->data.recv_status_on_client.error_string = &error_string;
622 op->flags = 0;
623 op->reserved = nullptr;
624 op++;
625 error = grpc_call_start_batch(c, ops, static_cast<size_t>(op - ops), Tag(101),
626 nullptr);
627 GPR_ASSERT(GRPC_CALL_OK == error);
628 cqv_->Expect(Tag(101), true);
629 cqv_->Verify();
630 // Verify status and metadata
631 EXPECT_EQ(status, GRPC_STATUS_UNAVAILABLE);
632 ASSERT_TRUE(TrailingMetadataRecordingFilter::trailing_metadata_available());
633 ASSERT_TRUE(
634 TrailingMetadataRecordingFilter::stream_network_state().has_value());
635 EXPECT_EQ(TrailingMetadataRecordingFilter::stream_network_state().value(),
636 GrpcStreamNetworkState::kNotSentOnWire);
637 grpc_slice_unref(details);
638 gpr_free(const_cast<char*>(error_string));
639 grpc_metadata_array_destroy(&initial_metadata_recv);
640 grpc_metadata_array_destroy(&trailing_metadata_recv);
641 grpc_call_unref(c);
642 ExecCtx::Get()->Flush();
643 }
644
645 // These tests have the server sending a SETTINGS_FRAME with a max concurrent
646 // streams settings of 0 which denies the client the chance to start a stream.
647 // Note that in the future, these tests might become outdated if the
648 // client_channel learns about the max concurrent streams setting.
649 class ZeroConcurrencyTest : public StreamsNotSeenTest {
650 protected:
ZeroConcurrencyTest()651 ZeroConcurrencyTest() : StreamsNotSeenTest(/*server_allows_streams=*/false) {}
652 };
653
654 // Client's HTTP2 transport receives a RPC request, but it cannot start the RPC
655 // because of the max concurrent streams setting. A goaway frame is then
656 // received which should result in the RPC getting cancelled with
657 // kNotSentOnWire.
TEST_F(ZeroConcurrencyTest,StartStreamBeforeGoaway)658 TEST_F(ZeroConcurrencyTest, StartStreamBeforeGoaway) {
659 grpc_call* c =
660 grpc_channel_create_call(channel_, nullptr, GRPC_PROPAGATE_DEFAULTS, cq_,
661 grpc_slice_from_static_string("/foo"), nullptr,
662 grpc_timeout_seconds_to_deadline(5), nullptr);
663 GPR_ASSERT(c);
664 grpc_metadata_array initial_metadata_recv;
665 grpc_metadata_array trailing_metadata_recv;
666 grpc_metadata_array_init(&initial_metadata_recv);
667 grpc_metadata_array_init(&trailing_metadata_recv);
668 grpc_op* op;
669 grpc_op ops[6];
670 grpc_status_code status;
671 const char* error_string;
672 grpc_call_error error;
673 grpc_slice details;
674 // Send the request
675 memset(ops, 0, sizeof(ops));
676 op = ops;
677 op->op = GRPC_OP_SEND_INITIAL_METADATA;
678 op->data.send_initial_metadata.count = 0;
679 op->flags = 0;
680 op->reserved = nullptr;
681 op++;
682 op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
683 op->flags = 0;
684 op->reserved = nullptr;
685 op++;
686 op->op = GRPC_OP_RECV_INITIAL_METADATA;
687 op->data.recv_initial_metadata.recv_initial_metadata = &initial_metadata_recv;
688 op->flags = 0;
689 op->reserved = nullptr;
690 op++;
691 op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
692 op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv;
693 op->data.recv_status_on_client.status = &status;
694 op->data.recv_status_on_client.status_details = &details;
695 op->data.recv_status_on_client.error_string = &error_string;
696 op->flags = 0;
697 op->reserved = nullptr;
698 op++;
699 error = grpc_call_start_batch(c, ops, static_cast<size_t>(op - ops), Tag(101),
700 nullptr);
701 // This test assumes that nothing would pause the RPC before its received by
702 // the transport. If that no longer holds true, we might need to drive the cq
703 // for some time to make sure that the RPC reaches the HTTP2 layer.
704 SendGoaway(0);
705 GPR_ASSERT(GRPC_CALL_OK == error);
706 cqv_->Expect(Tag(101), true);
707 cqv_->Verify();
708 // Verify status and metadata
709 EXPECT_EQ(status, GRPC_STATUS_UNAVAILABLE);
710 ASSERT_TRUE(TrailingMetadataRecordingFilter::trailing_metadata_available());
711 ASSERT_TRUE(
712 TrailingMetadataRecordingFilter::stream_network_state().has_value());
713 EXPECT_EQ(TrailingMetadataRecordingFilter::stream_network_state().value(),
714 GrpcStreamNetworkState::kNotSentOnWire);
715 grpc_slice_unref(details);
716 gpr_free(const_cast<char*>(error_string));
717 grpc_metadata_array_destroy(&initial_metadata_recv);
718 grpc_metadata_array_destroy(&trailing_metadata_recv);
719 grpc_call_unref(c);
720 ExecCtx::Get()->Flush();
721 }
722
723 // Client's HTTP2 transport receives a RPC request, but it cannot start the RPC
724 // because of the max concurrent streams setting. Server then shuts its endpoint
725 // which should result in the RPC getting cancelled with kNotSentOnWire.
TEST_F(ZeroConcurrencyTest,TransportDestroyed)726 TEST_F(ZeroConcurrencyTest, TransportDestroyed) {
727 grpc_call* c =
728 grpc_channel_create_call(channel_, nullptr, GRPC_PROPAGATE_DEFAULTS, cq_,
729 grpc_slice_from_static_string("/foo"), nullptr,
730 grpc_timeout_seconds_to_deadline(5), nullptr);
731 GPR_ASSERT(c);
732 grpc_metadata_array initial_metadata_recv;
733 grpc_metadata_array trailing_metadata_recv;
734 grpc_metadata_array_init(&initial_metadata_recv);
735 grpc_metadata_array_init(&trailing_metadata_recv);
736 grpc_op* op;
737 grpc_op ops[6];
738 grpc_status_code status;
739 const char* error_string;
740 grpc_call_error error;
741 grpc_slice details;
742 // Send the request
743 memset(ops, 0, sizeof(ops));
744 op = ops;
745 op->op = GRPC_OP_SEND_INITIAL_METADATA;
746 op->data.send_initial_metadata.count = 0;
747 op->flags = 0;
748 op->reserved = nullptr;
749 op++;
750 op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
751 op->flags = 0;
752 op->reserved = nullptr;
753 op++;
754 op->op = GRPC_OP_RECV_INITIAL_METADATA;
755 op->data.recv_initial_metadata.recv_initial_metadata = &initial_metadata_recv;
756 op->flags = 0;
757 op->reserved = nullptr;
758 op++;
759 op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
760 op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv;
761 op->data.recv_status_on_client.status = &status;
762 op->data.recv_status_on_client.status_details = &details;
763 op->data.recv_status_on_client.error_string = &error_string;
764 op->flags = 0;
765 op->reserved = nullptr;
766 op++;
767 error = grpc_call_start_batch(c, ops, static_cast<size_t>(op - ops), Tag(101),
768 nullptr);
769 grpc_endpoint_shutdown(tcp_, GRPC_ERROR_CREATE("Server shutdown"));
770 GPR_ASSERT(GRPC_CALL_OK == error);
771 cqv_->Expect(Tag(101), true);
772 cqv_->Verify();
773 // Verify status and metadata
774 EXPECT_EQ(status, GRPC_STATUS_UNAVAILABLE);
775 ASSERT_TRUE(TrailingMetadataRecordingFilter::trailing_metadata_available());
776 ASSERT_TRUE(
777 TrailingMetadataRecordingFilter::stream_network_state().has_value());
778 EXPECT_EQ(TrailingMetadataRecordingFilter::stream_network_state().value(),
779 GrpcStreamNetworkState::kNotSentOnWire);
780 grpc_slice_unref(details);
781 gpr_free(const_cast<char*>(error_string));
782 grpc_metadata_array_destroy(&initial_metadata_recv);
783 grpc_metadata_array_destroy(&trailing_metadata_recv);
784 grpc_call_unref(c);
785 ExecCtx::Get()->Flush();
786 }
787
788 } // namespace
789 } // namespace grpc_core
790
main(int argc,char ** argv)791 int main(int argc, char** argv) {
792 ::testing::InitGoogleTest(&argc, argv);
793 grpc::testing::TestEnvironment env(&argc, argv);
794 int result;
795 grpc_core::CoreConfiguration::RunWithSpecialConfiguration(
796 [](grpc_core::CoreConfiguration::Builder* builder) {
797 grpc_core::BuildCoreConfiguration(builder);
798 builder->channel_init()->RegisterFilter(
799 GRPC_CLIENT_SUBCHANNEL,
800 &grpc_core::TrailingMetadataRecordingFilter::kFilterVtable);
801 },
802 [&] {
803 grpc_core::
804 TestOnlyGlobalHttp2TransportDisableTransientFailureStateNotification(
805 true);
806 grpc_init();
807 {
808 grpc_core::ExecCtx exec_ctx;
809 result = RUN_ALL_TESTS();
810 }
811 grpc_shutdown();
812 });
813 return result;
814 }
815