xref: /aosp_15_r20/external/grpc-grpc/test/core/tsi/alts/handshaker/alts_concurrent_connectivity_test.cc (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1 //
2 //
3 // Copyright 2018 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #include <fcntl.h>
20 #include <netinet/in.h>
21 #include <stdlib.h>
22 #include <string.h>
23 #include <sys/socket.h>
24 #include <sys/stat.h>
25 #include <sys/types.h>
26 #include <unistd.h>
27 
28 #include <functional>
29 #include <set>
30 #include <thread>
31 
32 #include <gmock/gmock.h>
33 
34 #include "absl/memory/memory.h"
35 #include "absl/strings/str_cat.h"
36 
37 #include <grpc/grpc.h>
38 #include <grpc/grpc_security.h>
39 #include <grpc/slice.h>
40 #include <grpc/support/alloc.h>
41 #include <grpc/support/log.h>
42 #include <grpc/support/port_platform.h>
43 #include <grpc/support/string_util.h>
44 #include <grpc/support/time.h>
45 #include <grpcpp/impl/service_type.h>
46 #include <grpcpp/server_builder.h>
47 
48 #include "src/core/lib/gpr/useful.h"
49 #include "src/core/lib/gprpp/crash.h"
50 #include "src/core/lib/gprpp/host_port.h"
51 #include "src/core/lib/gprpp/thd.h"
52 #include "src/core/lib/iomgr/error.h"
53 #include "src/core/lib/security/credentials/alts/alts_credentials.h"
54 #include "src/core/lib/security/credentials/credentials.h"
55 #include "src/core/lib/security/security_connector/alts/alts_security_connector.h"
56 #include "src/core/lib/slice/slice_string_helpers.h"
57 #include "test/core/end2end/cq_verifier.h"
58 #include "test/core/tsi/alts/fake_handshaker/fake_handshaker_server.h"
59 #include "test/core/util/fake_udp_and_tcp_server.h"
60 #include "test/core/util/port.h"
61 #include "test/core/util/test_config.h"
62 
63 namespace {
64 
drain_cq(grpc_completion_queue * cq)65 void drain_cq(grpc_completion_queue* cq) {
66   grpc_event ev;
67   do {
68     ev = grpc_completion_queue_next(
69         cq, grpc_timeout_milliseconds_to_deadline(5000), nullptr);
70   } while (ev.type != GRPC_QUEUE_SHUTDOWN);
71 }
72 
create_secure_channel_for_test(const char * server_addr,const char * fake_handshake_server_addr,int reconnect_backoff_ms)73 grpc_channel* create_secure_channel_for_test(
74     const char* server_addr, const char* fake_handshake_server_addr,
75     int reconnect_backoff_ms) {
76   grpc_alts_credentials_options* alts_options =
77       grpc_alts_credentials_client_options_create();
78   grpc_channel_credentials* channel_creds =
79       grpc_alts_credentials_create_customized(alts_options,
80                                               fake_handshake_server_addr,
81                                               true /* enable_untrusted_alts */);
82   grpc_alts_credentials_options_destroy(alts_options);
83   // The main goal of these tests are to stress concurrent ALTS handshakes,
84   // so we prevent subchnannel sharing.
85   std::vector<grpc_arg> new_args;
86   new_args.push_back(grpc_channel_arg_integer_create(
87       const_cast<char*>(GRPC_ARG_USE_LOCAL_SUBCHANNEL_POOL), true));
88   if (reconnect_backoff_ms != 0) {
89     new_args.push_back(grpc_channel_arg_integer_create(
90         const_cast<char*>("grpc.testing.fixed_reconnect_backoff_ms"),
91         reconnect_backoff_ms));
92   }
93   grpc_channel_args* channel_args =
94       grpc_channel_args_copy_and_add(nullptr, new_args.data(), new_args.size());
95   grpc_channel* channel =
96       grpc_channel_create(server_addr, channel_creds, channel_args);
97   grpc_channel_args_destroy(channel_args);
98   grpc_channel_credentials_release(channel_creds);
99   return channel;
100 }
101 
102 class FakeHandshakeServer {
103  public:
FakeHandshakeServer()104   FakeHandshakeServer() {
105     int port = grpc_pick_unused_port_or_die();
106     address_ = grpc_core::JoinHostPort("localhost", port);
107     service_ = grpc::gcp::CreateFakeHandshakerService("peer_identity");
108     grpc::ServerBuilder builder;
109     builder.AddListeningPort(address_, grpc::InsecureServerCredentials());
110     builder.RegisterService(service_.get());
111     // TODO(apolcyn): when removing the global concurrent handshake limiting
112     // queue, set MAX_CONCURRENT_STREAMS on this server.
113     server_ = builder.BuildAndStart();
114     gpr_log(GPR_INFO, "Fake handshaker server listening on %s",
115             address_.c_str());
116   }
117 
~FakeHandshakeServer()118   ~FakeHandshakeServer() {
119     server_->Shutdown(grpc_timeout_milliseconds_to_deadline(0));
120   }
121 
address()122   const char* address() { return address_.c_str(); }
123 
124  private:
125   std::string address_;
126   std::unique_ptr<grpc::Service> service_;
127   std::unique_ptr<grpc::Server> server_;
128 };
129 
130 class TestServer {
131  public:
TestServer()132   TestServer() {
133     grpc_alts_credentials_options* alts_options =
134         grpc_alts_credentials_server_options_create();
135     grpc_server_credentials* server_creds =
136         grpc_alts_server_credentials_create_customized(
137             alts_options, fake_handshake_server_.address(),
138             true /* enable_untrusted_alts */);
139     grpc_alts_credentials_options_destroy(alts_options);
140     server_ = grpc_server_create(nullptr, nullptr);
141     server_cq_ = grpc_completion_queue_create_for_next(nullptr);
142     grpc_server_register_completion_queue(server_, server_cq_, nullptr);
143     int port = grpc_pick_unused_port_or_die();
144     server_addr_ = grpc_core::JoinHostPort("localhost", port);
145     GPR_ASSERT(grpc_server_add_http2_port(server_, server_addr_.c_str(),
146                                           server_creds));
147     grpc_server_credentials_release(server_creds);
148     grpc_server_start(server_);
149     gpr_log(GPR_DEBUG, "Start TestServer %p. listen on %s", this,
150             server_addr_.c_str());
151     server_thd_ = std::make_unique<std::thread>(PollUntilShutdown, this);
152   }
153 
~TestServer()154   ~TestServer() {
155     gpr_log(GPR_DEBUG, "Begin dtor of TestServer %p", this);
156     grpc_server_shutdown_and_notify(server_, server_cq_, this);
157     server_thd_->join();
158     grpc_server_destroy(server_);
159     grpc_completion_queue_shutdown(server_cq_);
160     drain_cq(server_cq_);
161     grpc_completion_queue_destroy(server_cq_);
162   }
163 
address()164   const char* address() { return server_addr_.c_str(); }
165 
PollUntilShutdown(const TestServer * self)166   static void PollUntilShutdown(const TestServer* self) {
167     grpc_event ev = grpc_completion_queue_next(
168         self->server_cq_, gpr_inf_future(GPR_CLOCK_REALTIME), nullptr);
169     GPR_ASSERT(ev.type == GRPC_OP_COMPLETE);
170     GPR_ASSERT(ev.tag == self);
171     gpr_log(GPR_DEBUG, "TestServer %p stop polling", self);
172   }
173 
174  private:
175   grpc_server* server_;
176   grpc_completion_queue* server_cq_;
177   std::unique_ptr<std::thread> server_thd_;
178   std::string server_addr_;
179   // Give this test server its own ALTS handshake server
180   // so that we avoid competing for ALTS handshake server resources (e.g.
181   // available HTTP2 streams on a globally shared handshaker subchannel)
182   // with clients that are trying to do mutual ALTS handshakes
183   // with this server (which could "deadlock" mutual handshakes).
184   // TODO(apolcyn): remove this workaround from this test and have
185   // clients/servers share a single fake handshake server if
186   // the underlying issue needs to be fixed.
187   FakeHandshakeServer fake_handshake_server_;
188 };
189 
190 class ConnectLoopRunner {
191  public:
ConnectLoopRunner(const char * server_address,const char * fake_handshake_server_addr,int per_connect_deadline_seconds,size_t loops,grpc_connectivity_state expected_connectivity_states,int reconnect_backoff_ms)192   explicit ConnectLoopRunner(
193       const char* server_address, const char* fake_handshake_server_addr,
194       int per_connect_deadline_seconds, size_t loops,
195       grpc_connectivity_state expected_connectivity_states,
196       int reconnect_backoff_ms)
197       : server_address_(grpc_core::UniquePtr<char>(gpr_strdup(server_address))),
198         fake_handshake_server_addr_(
199             grpc_core::UniquePtr<char>(gpr_strdup(fake_handshake_server_addr))),
200         per_connect_deadline_seconds_(per_connect_deadline_seconds),
201         loops_(loops),
202         expected_connectivity_states_(expected_connectivity_states),
203         reconnect_backoff_ms_(reconnect_backoff_ms) {
204     thd_ = std::make_unique<std::thread>(ConnectLoop, this);
205   }
206 
~ConnectLoopRunner()207   ~ConnectLoopRunner() { thd_->join(); }
208 
ConnectLoop(const ConnectLoopRunner * self)209   static void ConnectLoop(const ConnectLoopRunner* self) {
210     for (size_t i = 0; i < self->loops_; i++) {
211       gpr_log(GPR_DEBUG, "runner:%p connect_loop begin loop %ld", self, i);
212       grpc_completion_queue* cq =
213           grpc_completion_queue_create_for_next(nullptr);
214       grpc_channel* channel = create_secure_channel_for_test(
215           self->server_address_.get(), self->fake_handshake_server_addr_.get(),
216           self->reconnect_backoff_ms_);
217       // Connect, forcing an ALTS handshake
218       grpc_connectivity_state state =
219           grpc_channel_check_connectivity_state(channel, 1);
220       ASSERT_EQ(state, GRPC_CHANNEL_IDLE);
221       while (state != self->expected_connectivity_states_) {
222         if (self->expected_connectivity_states_ ==
223             GRPC_CHANNEL_TRANSIENT_FAILURE) {
224           ASSERT_NE(state, GRPC_CHANNEL_READY);  // sanity check
225         } else {
226           ASSERT_EQ(self->expected_connectivity_states_, GRPC_CHANNEL_READY);
227         }
228         grpc_channel_watch_connectivity_state(
229             channel, state, gpr_inf_future(GPR_CLOCK_REALTIME), cq, nullptr);
230         grpc_event ev =
231             grpc_completion_queue_next(cq,
232                                        grpc_timeout_seconds_to_deadline(
233                                            self->per_connect_deadline_seconds_),
234                                        nullptr);
235         ASSERT_EQ(ev.type, GRPC_OP_COMPLETE)
236             << "connect_loop runner:" << std::hex << self
237             << " got ev.type:" << ev.type << " i:" << i;
238         ASSERT_TRUE(ev.success);
239         grpc_connectivity_state prev_state = state;
240         state = grpc_channel_check_connectivity_state(channel, 1);
241         if (self->expected_connectivity_states_ ==
242                 GRPC_CHANNEL_TRANSIENT_FAILURE &&
243             prev_state == GRPC_CHANNEL_CONNECTING &&
244             state == GRPC_CHANNEL_CONNECTING) {
245           // Detect a race in state checking: if the watch_connectivity_state
246           // completed from prior state "connecting", this could be because the
247           // channel momentarily entered state "transient failure", which is
248           // what we want. However, if the channel immediately re-enters
249           // "connecting" state, then the new state check might still result in
250           // "connecting". A continuous repeat of this can cause this loop to
251           // never terminate in time. So take this scenario to indicate that the
252           // channel momentarily entered transient failure.
253           break;
254         }
255       }
256       grpc_channel_destroy(channel);
257       grpc_completion_queue_shutdown(cq);
258       drain_cq(cq);
259       grpc_completion_queue_destroy(cq);
260       gpr_log(GPR_DEBUG, "runner:%p connect_loop finished loop %ld", self, i);
261     }
262   }
263 
264  private:
265   grpc_core::UniquePtr<char> server_address_;
266   grpc_core::UniquePtr<char> fake_handshake_server_addr_;
267   int per_connect_deadline_seconds_;
268   size_t loops_;
269   grpc_connectivity_state expected_connectivity_states_;
270   std::unique_ptr<std::thread> thd_;
271   int reconnect_backoff_ms_;
272 };
273 
274 // Perform a few ALTS handshakes sequentially (using the fake, in-process ALTS
275 // handshake server).
TEST(AltsConcurrentConnectivityTest, TestBasicClientServerHandshakes) {
  // Handshaker for the client side; TestServer owns a separate one.
  FakeHandshakeServer client_handshaker;
  TestServer backend;
  {
    // The runner's destructor joins its thread, so leaving this scope waits
    // for all sequential connects to reach READY.
    ConnectLoopRunner sequential_runner(
        backend.address(), client_handshaker.address(),
        /*per_connect_deadline_seconds=*/10, /*loops=*/10,
        /*expected_connectivity_states=*/GRPC_CHANNEL_READY,
        /*reconnect_backoff_ms (unset)=*/0);
  }
}
287 
288 // Run a bunch of concurrent ALTS handshakes on concurrent channels
289 // (using the fake, in-process handshake server).
TEST(AltsConcurrentConnectivityTest, TestConcurrentClientServerHandshakes) {
  // Handshaker for the client side; it must outlive the server and runners.
  FakeHandshakeServer client_handshaker;
  {
    TestServer backend;
    constexpr size_t kNumConcurrentConnects = 50;
    std::vector<std::unique_ptr<ConnectLoopRunner>> runners;
    gpr_log(GPR_DEBUG,
            "start performing concurrent expected-to-succeed connects");
    while (runners.size() < kNumConcurrentConnects) {
      runners.push_back(std::make_unique<ConnectLoopRunner>(
          backend.address(), client_handshaker.address(),
          /*per_connect_deadline_seconds=*/15, /*loops=*/5,
          /*expected_connectivity_states=*/GRPC_CHANNEL_READY,
          /*reconnect_backoff_ms (unset)=*/0));
    }
    // Destroying the runners joins their threads, i.e. waits for every
    // connect loop to finish.
    runners.clear();
    gpr_log(GPR_DEBUG,
            "done performing concurrent expected-to-succeed connects");
  }
}
311 
312 // This test is intended to make sure that ALTS handshakes correctly
313 // fail fast when the security handshaker gets an error while reading
314 // from the remote peer, after having earlier sent the first bytes of the
315 // ALTS handshake to the peer, i.e. after getting into the middle of a
316 // handshake.
TEST(AltsConcurrentConnectivityTest,
     TestHandshakeFailsFastWhenPeerEndpointClosesConnectionAfterAccepting) {
  // No limit is enforced on the number of concurrent RPCs at the fake
  // handshake server in this test, because handshake RPCs get cancelled here.
  // Nothing explicitly synchronizes an ALTS handshake client's RECV_STATUS op
  // completing after call cancellation with the corresponding fake handshake
  // server's sync method handler returning. Any such limit would therefore be
  // inherently racy.
  FakeHandshakeServer handshaker;
  // Emulates a secure (ALTS based) gRPC backend, so it waits for the client
  // to send the first bytes. It is configured with
  // CloseSocketUponReceivingBytesFromPeer.
  grpc_core::testing::FakeUdpAndTcpServer backend(
      grpc_core::testing::FakeUdpAndTcpServer::AcceptMode::
          kWaitForClientToSendFirstBytes,
      grpc_core::testing::FakeUdpAndTcpServer::
          CloseSocketUponReceivingBytesFromPeer);
  {
    constexpr size_t kNumConcurrentConnects = 100;
    std::vector<std::unique_ptr<ConnectLoopRunner>> runners;
    gpr_log(GPR_DEBUG, "start performing concurrent expected-to-fail connects");
    while (runners.size() < kNumConcurrentConnects) {
      runners.push_back(std::make_unique<ConnectLoopRunner>(
          backend.address(), handshaker.address(),
          /*per_connect_deadline_seconds=*/10, /*loops=*/3,
          /*expected_connectivity_states=*/GRPC_CHANNEL_TRANSIENT_FAILURE,
          /*reconnect_backoff_ms (unset)=*/0));
    }
    // Destroying the runners joins their threads.
    runners.clear();
    gpr_log(GPR_DEBUG, "done performing concurrent expected-to-fail connects");
  }
}
349 
350 // This test is intended to make sure that ALTS handshakes correctly
351 // fail fast when the ALTS handshake server fails incoming handshakes fast.
TEST(AltsConcurrentConnectivityTest,
     TestHandshakeFailsFastWhenHandshakeServerClosesConnectionAfterAccepting) {
  // Emulates a broken ALTS handshaker. A handshaker is an insecure server, so
  // it sends settings to the client eagerly.
  grpc_core::testing::FakeUdpAndTcpServer broken_handshaker(
      grpc_core::testing::FakeUdpAndTcpServer::AcceptMode::kEagerlySendSettings,
      grpc_core::testing::FakeUdpAndTcpServer::
          CloseSocketUponReceivingBytesFromPeer);
  // Emulates a secure (ALTS based) backend, so it waits for the client to
  // send the first bytes.
  grpc_core::testing::FakeUdpAndTcpServer backend(
      grpc_core::testing::FakeUdpAndTcpServer::AcceptMode::
          kWaitForClientToSendFirstBytes,
      grpc_core::testing::FakeUdpAndTcpServer::CloseSocketUponCloseFromPeer);
  {
    constexpr size_t kNumConcurrentConnects = 100;
    std::vector<std::unique_ptr<ConnectLoopRunner>> runners;
    gpr_log(GPR_DEBUG, "start performing concurrent expected-to-fail connects");
    while (runners.size() < kNumConcurrentConnects) {
      runners.push_back(std::make_unique<ConnectLoopRunner>(
          backend.address(), broken_handshaker.address(),
          /*per_connect_deadline_seconds=*/20, /*loops=*/2,
          /*expected_connectivity_states=*/GRPC_CHANNEL_TRANSIENT_FAILURE,
          /*reconnect_backoff_ms (unset)=*/0));
    }
    // Destroying the runners joins their threads.
    runners.clear();
    gpr_log(GPR_DEBUG, "done performing concurrent expected-to-fail connects");
  }
}
381 
382 // This test is intended to make sure that ALTS handshakes correctly
383 // fail fast when the ALTS handshake server is non-responsive, in which case
384 // the overall connection deadline kicks in.
TEST(AltsConcurrentConnectivityTest,
     TestHandshakeFailsFastWhenHandshakeServerHangsAfterAccepting) {
  // Emulates an insecure handshaker that sends settings first and then stays
  // unresponsive for the rest of the connection.
  grpc_core::testing::FakeUdpAndTcpServer hanging_handshaker(
      grpc_core::testing::FakeUdpAndTcpServer::AcceptMode::kEagerlySendSettings,
      grpc_core::testing::FakeUdpAndTcpServer::CloseSocketUponCloseFromPeer);
  // Emulates an ALTS based backend, so it waits for the client to send the
  // first bytes.
  grpc_core::testing::FakeUdpAndTcpServer backend(
      grpc_core::testing::FakeUdpAndTcpServer::AcceptMode::
          kWaitForClientToSendFirstBytes,
      grpc_core::testing::FakeUdpAndTcpServer::CloseSocketUponCloseFromPeer);
  {
    constexpr size_t kNumConcurrentConnects = 100;
    std::vector<std::unique_ptr<ConnectLoopRunner>> runners;
    gpr_log(GPR_DEBUG, "start performing concurrent expected-to-fail connects");
    while (runners.size() < kNumConcurrentConnects) {
      runners.push_back(std::make_unique<ConnectLoopRunner>(
          backend.address(), hanging_handshaker.address(),
          /*per_connect_deadline_seconds=*/10, /*loops=*/2,
          /*expected_connectivity_states=*/GRPC_CHANNEL_TRANSIENT_FAILURE,
          /*reconnect_backoff_ms=*/100));
    }
    // Destroying the runners joins their threads.
    runners.clear();
    gpr_log(GPR_DEBUG, "done performing concurrent expected-to-fail connects");
  }
}
413 
414 }  // namespace
415 
main(int argc,char ** argv)416 int main(int argc, char** argv) {
417   ::testing::InitGoogleTest(&argc, argv);
418   grpc::testing::TestEnvironment env(&argc, argv);
419   grpc_init();
420   auto result = RUN_ALL_TESTS();
421   grpc_shutdown();
422   return result;
423 }
424