1 //
2 //
3 // Copyright 2020 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18
19 #include <memory>
20
21 #include "gtest/gtest.h"
22
23 #include <grpc/status.h>
24 #include <grpc/support/log.h>
25
26 #include "src/core/lib/gprpp/time.h"
27 #include "test/core/end2end/cq_verifier.h"
28 #include "test/core/end2end/end2end_tests.h"
29
30 namespace grpc_core {
31 namespace {
32
33 // Client requests status along with the initial metadata. Server streams
34 // messages and ends with a non-OK status. Client reads after server is done
35 // writing, and expects to get the status after the messages.
ServerStreaming(CoreEnd2endTest & test,int num_messages)36 void ServerStreaming(CoreEnd2endTest& test, int num_messages) {
37 auto c = test.NewClientCall("/foo").Timeout(Duration::Minutes(1)).Create();
38 CoreEnd2endTest::IncomingMetadata server_initial_metadata;
39 CoreEnd2endTest::IncomingStatusOnClient server_status;
40 c.NewBatch(1)
41 .SendInitialMetadata({})
42 .RecvInitialMetadata(server_initial_metadata)
43 // Client requests status early but should not receive status till all the
44 // messages are received.
45 .RecvStatusOnClient(server_status);
46 // Client sends close early
47 c.NewBatch(3).SendCloseFromClient();
48 test.Expect(3, true);
49 test.Step();
50 auto s = test.RequestCall(100);
51 test.Expect(100, true);
52 test.Step();
53 s.NewBatch(101).SendInitialMetadata({});
54 test.Expect(101, true);
55 test.Step();
56 // Server writes bunch of messages
57 for (int i = 0; i < num_messages; i++) {
58 s.NewBatch(103).SendMessage("hello world");
59 test.Expect(103, true);
60 test.Step();
61 }
62 // Server sends status
63 CoreEnd2endTest::IncomingCloseOnServer client_close;
64 s.NewBatch(104)
65 .SendStatusFromServer(GRPC_STATUS_UNIMPLEMENTED, "xyz", {})
66 .RecvCloseOnServer(client_close);
67 bool seen_status = false;
68 test.Expect(1, CoreEnd2endTest::Maybe{&seen_status});
69 test.Expect(104, true);
70 test.Step();
71
72 gpr_log(GPR_DEBUG, "SEEN_STATUS:%d", seen_status);
73
74 // Client keeps reading messages till it gets the status
75 int num_messages_received = 0;
76 while (true) {
77 CoreEnd2endTest::IncomingMessage server_message;
78 c.NewBatch(102).RecvMessage(server_message);
79 test.Expect(1, CqVerifier::Maybe{&seen_status});
80 test.Expect(102, true);
81 test.Step();
82 if (server_message.is_end_of_stream()) {
83 // The transport has received the trailing metadata.
84 break;
85 }
86 EXPECT_EQ(server_message.payload(), "hello world");
87 num_messages_received++;
88 }
89 GPR_ASSERT(num_messages_received == num_messages);
90 if (!seen_status) {
91 test.Expect(1, true);
92 test.Step();
93 }
94 EXPECT_EQ(server_status.status(), GRPC_STATUS_UNIMPLEMENTED);
95 EXPECT_EQ(server_status.message(), "xyz");
96 }
97
// Runs the server-streaming scenario with exactly one server message.
CORE_END2END_TEST(Http2Test, ServerStreaming) {
  constexpr int kMessageCount = 1;
  ServerStreaming(*this, kMessageCount);
}
99
// Runs the server-streaming scenario with no server messages at all: the
// server goes straight from initial metadata to the final status.
CORE_END2END_TEST(Http2Test, ServerStreamingEmptyStream) {
  constexpr int kMessageCount = 0;
  ServerStreaming(*this, kMessageCount);
}
103
// Runs the server-streaming scenario with ten server messages.
CORE_END2END_TEST(Http2Test, ServerStreaming10Messages) {
  constexpr int kMessageCount = 10;
  ServerStreaming(*this, kMessageCount);
}
107
108 } // namespace
109 } // namespace grpc_core
110