1 //
2 //
3 // Copyright 2018 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18
19 #include "test/cpp/end2end/interceptors_util.h"
20
21 #include "absl/memory/memory.h"
22
23 #include "test/core/util/test_config.h"
24
25 namespace grpc {
26 namespace testing {
27
28 std::atomic<int> PhonyInterceptor::num_times_run_;
29 std::atomic<int> PhonyInterceptor::num_times_run_reverse_;
30 std::atomic<int> PhonyInterceptor::num_times_cancel_;
31
MakeCall(const std::shared_ptr<Channel> & channel,const StubOptions & options)32 void MakeCall(const std::shared_ptr<Channel>& channel,
33 const StubOptions& options) {
34 auto stub = grpc::testing::EchoTestService::NewStub(channel, options);
35 ClientContext ctx;
36 EchoRequest req;
37 req.mutable_param()->set_echo_metadata(true);
38 ctx.AddMetadata("testkey", "testvalue");
39 req.set_message("Hello");
40 EchoResponse resp;
41 Status s = stub->Echo(&ctx, req, &resp);
42 EXPECT_EQ(s.ok(), true);
43 EXPECT_EQ(resp.message(), "Hello");
44 }
45
MakeClientStreamingCall(const std::shared_ptr<Channel> & channel)46 void MakeClientStreamingCall(const std::shared_ptr<Channel>& channel) {
47 auto stub = grpc::testing::EchoTestService::NewStub(channel);
48 ClientContext ctx;
49 EchoRequest req;
50 req.mutable_param()->set_echo_metadata(true);
51 ctx.AddMetadata("testkey", "testvalue");
52 req.set_message("Hello");
53 EchoResponse resp;
54 string expected_resp;
55 auto writer = stub->RequestStream(&ctx, &resp);
56 for (int i = 0; i < kNumStreamingMessages; i++) {
57 writer->Write(req);
58 expected_resp += "Hello";
59 }
60 writer->WritesDone();
61 Status s = writer->Finish();
62 EXPECT_EQ(s.ok(), true);
63 EXPECT_EQ(resp.message(), expected_resp);
64 }
65
MakeServerStreamingCall(const std::shared_ptr<Channel> & channel)66 void MakeServerStreamingCall(const std::shared_ptr<Channel>& channel) {
67 auto stub = grpc::testing::EchoTestService::NewStub(channel);
68 ClientContext ctx;
69 EchoRequest req;
70 req.mutable_param()->set_echo_metadata(true);
71 ctx.AddMetadata("testkey", "testvalue");
72 req.set_message("Hello");
73 EchoResponse resp;
74 auto reader = stub->ResponseStream(&ctx, req);
75 int count = 0;
76 while (reader->Read(&resp)) {
77 EXPECT_EQ(resp.message(), "Hello");
78 count++;
79 }
80 ASSERT_EQ(count, kNumStreamingMessages);
81 Status s = reader->Finish();
82 EXPECT_EQ(s.ok(), true);
83 }
84
MakeBidiStreamingCall(const std::shared_ptr<Channel> & channel)85 void MakeBidiStreamingCall(const std::shared_ptr<Channel>& channel) {
86 auto stub = grpc::testing::EchoTestService::NewStub(channel);
87 ClientContext ctx;
88 EchoRequest req;
89 EchoResponse resp;
90 ctx.AddMetadata("testkey", "testvalue");
91 req.mutable_param()->set_echo_metadata(true);
92 auto stream = stub->BidiStream(&ctx);
93 for (auto i = 0; i < kNumStreamingMessages; i++) {
94 req.set_message("Hello" + std::to_string(i));
95 stream->Write(req);
96 stream->Read(&resp);
97 EXPECT_EQ(req.message(), resp.message());
98 }
99 ASSERT_TRUE(stream->WritesDone());
100 Status s = stream->Finish();
101 EXPECT_EQ(s.ok(), true);
102 }
103
MakeAsyncCQCall(const std::shared_ptr<Channel> & channel)104 void MakeAsyncCQCall(const std::shared_ptr<Channel>& channel) {
105 auto stub = grpc::testing::EchoTestService::NewStub(channel);
106 CompletionQueue cq;
107 EchoRequest send_request;
108 EchoResponse recv_response;
109 Status recv_status;
110 ClientContext cli_ctx;
111
112 send_request.set_message("Hello");
113 cli_ctx.AddMetadata("testkey", "testvalue");
114 std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
115 stub->AsyncEcho(&cli_ctx, send_request, &cq));
116 response_reader->Finish(&recv_response, &recv_status, tag(1));
117 Verifier().Expect(1, true).Verify(&cq);
118 EXPECT_EQ(send_request.message(), recv_response.message());
119 EXPECT_TRUE(recv_status.ok());
120 }
121
// Placeholder for a completion-queue based client-streaming call helper.
// Currently a no-op: the channel argument is ignored and no RPC is made.
void MakeAsyncCQClientStreamingCall(
    const std::shared_ptr<Channel>& /*channel*/) {
  // TODO(yashykt) : Fill this out
}
126
MakeAsyncCQServerStreamingCall(const std::shared_ptr<Channel> & channel)127 void MakeAsyncCQServerStreamingCall(const std::shared_ptr<Channel>& channel) {
128 auto stub = grpc::testing::EchoTestService::NewStub(channel);
129 CompletionQueue cq;
130 EchoRequest send_request;
131 EchoResponse recv_response;
132 Status recv_status;
133 ClientContext cli_ctx;
134
135 cli_ctx.AddMetadata("testkey", "testvalue");
136 send_request.set_message("Hello");
137 std::unique_ptr<ClientAsyncReader<EchoResponse>> cli_stream(
138 stub->AsyncResponseStream(&cli_ctx, send_request, &cq, tag(1)));
139 Verifier().Expect(1, true).Verify(&cq);
140 // Read the expected number of messages
141 for (int i = 0; i < kNumStreamingMessages; i++) {
142 cli_stream->Read(&recv_response, tag(2));
143 Verifier().Expect(2, true).Verify(&cq);
144 ASSERT_EQ(recv_response.message(), send_request.message());
145 }
146 // The next read should fail
147 cli_stream->Read(&recv_response, tag(3));
148 Verifier().Expect(3, false).Verify(&cq);
149 // Get the status
150 cli_stream->Finish(&recv_status, tag(4));
151 Verifier().Expect(4, true).Verify(&cq);
152 EXPECT_TRUE(recv_status.ok());
153 }
154
// Placeholder for a completion-queue based bidi-streaming call helper.
// Currently a no-op: the channel argument is ignored and no RPC is made.
void MakeAsyncCQBidiStreamingCall(
    const std::shared_ptr<Channel>& /*channel*/) {
  // TODO(yashykt) : Fill this out
}
158
MakeCallbackCall(const std::shared_ptr<Channel> & channel)159 void MakeCallbackCall(const std::shared_ptr<Channel>& channel) {
160 auto stub = grpc::testing::EchoTestService::NewStub(channel);
161 ClientContext ctx;
162 ctx.set_deadline(grpc_timeout_milliseconds_to_deadline(20000));
163 EchoRequest req;
164 std::mutex mu;
165 std::condition_variable cv;
166 bool done = false;
167 req.mutable_param()->set_echo_metadata(true);
168 ctx.AddMetadata("testkey", "testvalue");
169 req.set_message("Hello");
170 EchoResponse resp;
171 stub->async()->Echo(&ctx, &req, &resp, [&resp, &mu, &done, &cv](Status s) {
172 EXPECT_EQ(s.ok(), true);
173 EXPECT_EQ(resp.message(), "Hello");
174 std::lock_guard<std::mutex> l(mu);
175 done = true;
176 cv.notify_one();
177 });
178 std::unique_lock<std::mutex> l(mu);
179 while (!done) {
180 cv.wait(l);
181 }
182 }
183
CheckMetadata(const std::multimap<grpc::string_ref,grpc::string_ref> & map,const string & key,const string & value)184 bool CheckMetadata(const std::multimap<grpc::string_ref, grpc::string_ref>& map,
185 const string& key, const string& value) {
186 for (const auto& pair : map) {
187 if (pair.first.starts_with(key) && pair.second.starts_with(value)) {
188 return true;
189 }
190 }
191 return false;
192 }
193
// Returns true if |map| contains at least one entry whose key is exactly
// |key| and whose value is exactly |value|; returns false otherwise
// (including when |map| is empty).
//
// Only the entries stored under |key| are inspected: equal_range() locates
// them through the multimap's ordering instead of scanning every entry.
bool CheckMetadata(const std::multimap<std::string, std::string>& map,
                   const std::string& key, const std::string& value) {
  const auto matches = map.equal_range(key);
  for (auto it = matches.first; it != matches.second; ++it) {
    if (it->second == value) {
      return true;
    }
  }
  return false;
}
203
204 std::vector<std::unique_ptr<experimental::ClientInterceptorFactoryInterface>>
CreatePhonyClientInterceptors()205 CreatePhonyClientInterceptors() {
206 std::vector<std::unique_ptr<experimental::ClientInterceptorFactoryInterface>>
207 creators;
208 // Add 20 phony interceptors before hijacking interceptor
209 creators.reserve(20);
210 for (auto i = 0; i < 20; i++) {
211 creators.push_back(std::make_unique<PhonyInterceptorFactory>());
212 }
213 return creators;
214 }
215
216 } // namespace testing
217 } // namespace grpc
218