1 // Copyright (C) 2021 The Android Open Source Project
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include <errno.h>
16 #include <gtest/gtest.h>
17 #include <netdb.h>
18 #include <netinet/in.h>
19 #include <signal.h>
20 #include <string.h>
21 #include <sys/socket.h>
22 #include <sys/types.h>
23 #include <unistd.h>
24 
25 #include <condition_variable>
26 #include <cstdint>
27 #include <cstring>
28 #include <functional>
29 #include <memory>
30 #include <mutex>
31 #include <random>
32 #include <vector>
33 
34 #include "model/setup/async_manager.h"
35 #include "net/posix/posix_async_socket_connector.h"
36 #include "net/posix/posix_async_socket_server.h"
37 
38 namespace android {
39 namespace net {
40 
41 using clock = std::chrono::system_clock;
42 
43 class SigPipeSignalHandler {
44 public:
SigPipeSignalHandler()45   SigPipeSignalHandler() {
46     sSignal = -1;
47     struct sigaction act = {};
48     act.sa_handler = myHandler;
49     ::sigaction(SIGPIPE, &act, &mOldAction);
50   }
51 
~SigPipeSignalHandler()52   ~SigPipeSignalHandler() { ::sigaction(SIGPIPE, &mOldAction, nullptr); }
53 
signaled() const54   int signaled() const { return sSignal; }
55 
56 private:
57   struct sigaction mOldAction;
58 
59   static int sSignal;
60 
myHandler(int sig)61   static void myHandler(int sig) { sSignal = sig; }
62 };
63 
64 // static
65 int SigPipeSignalHandler::sSignal = 0;
66 
67 using SocketCon = std::shared_ptr<AsyncDataChannel>;
68 
69 class PosixSocketTest : public testing::Test {
70 public:
PosixSocketTest()71   PosixSocketTest() : pasc_(&async_manager_), pass_(0, &async_manager_) {}
72 
~PosixSocketTest()73   ~PosixSocketTest() { pass_.Close(); }
74 
connectPair(std::chrono::milliseconds timeout=500ms)75   std::tuple<SocketCon, SocketCon> connectPair(std::chrono::milliseconds timeout = 500ms) {
76     std::mutex m;
77     std::condition_variable cv;
78 
79     std::shared_ptr<AsyncDataChannel> sock1;
80     std::shared_ptr<AsyncDataChannel> sock2;
81 
82     pass_.SetOnConnectCallback(
83             [&](std::shared_ptr<AsyncDataChannel> sock, AsyncDataChannelServer*) {
84               std::unique_lock<std::mutex> guard(m);
85               sock1 = std::move(sock);
86               cv.notify_all();
87             });
88     EXPECT_TRUE(pass_.StartListening());
89 
90     sock2 = pasc_.ConnectToRemoteServer("localhost", pass_.port(), 1000ms);
91     EXPECT_TRUE(sock2.get() != nullptr);
92     EXPECT_TRUE(sock2->Connected());
93 
94     std::unique_lock<std::mutex> lk(m);
95     EXPECT_TRUE(cv.wait_for(lk, timeout, [&] { return sock1.get() != nullptr; }));
96     EXPECT_TRUE(sock1);
97     EXPECT_TRUE(sock1->Connected());
98 
99     return {sock1, sock2};
100   }
101 
102 protected:
103   AsyncManager async_manager_;
104   PosixAsyncSocketConnector pasc_;
105   PosixAsyncSocketServer pass_;
106 };
107 
TEST_F(PosixSocketTest,canConnect)108 TEST_F(PosixSocketTest, canConnect) {
109   auto [sock1, sock2] = connectPair();
110   ASSERT_TRUE(sock1->Connected());
111   ASSERT_TRUE(sock2->Connected());
112 
113   sock1->Close();
114   sock2->Close();
115 
116   ASSERT_FALSE(sock1->Connected());
117   ASSERT_FALSE(sock2->Connected());
118 }
119 
TEST_F(PosixSocketTest,socketSendDoesNotGenerateSigPipe)120 TEST_F(PosixSocketTest, socketSendDoesNotGenerateSigPipe) {
121   // Check that writing to a broken pipe does not generate a SIGPIPE
122   // signal.
123   SigPipeSignalHandler handler;
124   ASSERT_EQ(-1, handler.signaled());
125   auto [sock1, sock2] = connectPair();
126 
127   // s1 and s2 are now connected. Close s1 immediately, then try to
128   // send data through s2.
129   sock1->Close();
130   ASSERT_FALSE(sock1->Connected());
131   // The EPIPE might not happen on the first send due to
132   // TCP packet buffering in the kernel. Perform multiple send()
133   // in a loop to work-around this.
134   errno = 0;
135   const int kMaxSendCount = 1000;
136   int n = 0;
137   while (n < kMaxSendCount) {
138     int ret = sock2->Send((uint8_t*)"xxxx", 4);
139     if (ret < 0) {
140 #ifdef __APPLE__
141       // On OS X, errno is sometimes EPROTOTYPE instead of EPIPE
142       // when this happens.
143       ASSERT_TRUE(errno == EPIPE || errno == EPROTOTYPE) << strerror(errno);
144 #else
145       ASSERT_EQ(EPIPE, errno) << strerror(errno);
146 #endif
147       break;
148     }
149     n++;
150   }
151 
152   // On MacOS you usually have n < 30
153   ASSERT_LT(n, kMaxSendCount);
154 
155   // No signals were raised.
156   ASSERT_EQ(-1, handler.signaled());
157 }
158 
TEST_F(PosixSocketTest,can_send_data_around_poll)159 TEST_F(PosixSocketTest, can_send_data_around_poll) {
160   auto [sock1, sock2] = connectPair();
161   std::string word = "Hello World";
162   std::string input = "           ";
163 
164   ASSERT_EQ(word.size(), input.size());
165   ASSERT_NE(word, input);
166 
167   ssize_t snd = sock1->Send((uint8_t*)word.data(), word.size());
168   ASSERT_EQ((ssize_t)word.size(), snd);
169 
170   uint8_t* buffer = (uint8_t*)input.data();
171   int buflen = input.size();
172 
173   // Poll for at most 250ms.
174   clock::time_point until = clock::now() + 250ms;
175   do {
176     int recv = sock2->Recv(buffer, buflen);
177     if (recv > 0) {
178       buflen -= recv;
179       buffer += recv;
180     }
181   } while (buflen > 0 && clock::now() < until);
182 
183   ASSERT_EQ(word, input);
184 }
185 
TEST_F(PosixSocketTest,data_results_in_read_event)186 TEST_F(PosixSocketTest, data_results_in_read_event) {
187   auto [sock1, sock2] = connectPair();
188   std::mutex m;
189   std::condition_variable cv;
190   std::string word = "Hello World";
191   std::string input = "           ";
192 
193   bool received = false;
194 
195   // Register a callback that only gets called once..
196   sock2->WatchForNonBlockingRead([&](auto sock) {
197     std::unique_lock<std::mutex> guard(m);
198     received = true;
199     // Unregister, to prevent surprises..
200     sock->StopWatching();
201     cv.notify_all();
202   });
203 
204   ssize_t snd = sock1->Send((uint8_t*)word.data(), word.size());
205   ASSERT_EQ((ssize_t)word.size(), snd);
206 
207   {
208     std::unique_lock<std::mutex> lk(m);
209 
210     // The callback will be called within 250ms.
211     ASSERT_TRUE(cv.wait_for(lk, 250ms, [&] { return received; }));
212 
213     uint8_t* buffer = (uint8_t*)input.data();
214     int buflen = input.size();
215 
216     // At least 1 byte is coming in. (Note, we might get just a few
217     // bytes. vs the whole thing as you never know what happens in the
218     // ip stack.)
219     ASSERT_GT(sock2->Recv(buffer, buflen), 0);
220   }
221 }
222 
TEST_F(PosixSocketTest,connectFails)223 TEST_F(PosixSocketTest, connectFails) {
224   int port = pass_.port();
225 
226   // Close the port, we should not be able to connect
227   pass_.Close();
228   ASSERT_FALSE(pass_.Connected());
229 
230   // Max 250ms to go to nowhere...
231   auto socket = pasc_.ConnectToRemoteServer("localhost", port, 250ms);
232   ASSERT_FALSE(socket->Connected());
233 }
234 
TEST_F(PosixSocketTest,canConnectMultiple)235 TEST_F(PosixSocketTest, canConnectMultiple) {
236   int port = pass_.port();
237   int CONNECTION_COUNT = 10;
238   std::mutex m;
239   std::condition_variable cv;
240   std::vector<std::shared_ptr<AsyncDataChannel>> connections;
241   bool connected = false;
242 
243   pass_.SetOnConnectCallback(
244           [&](std::shared_ptr<AsyncDataChannel> const& sock, AsyncDataChannelServer*) {
245             std::unique_lock<std::mutex> guard(m);
246             connections.push_back(sock);
247             connected = true;
248             ASSERT_TRUE(pass_.StartListening());
249             cv.notify_all();
250           });
251   ASSERT_TRUE(pass_.StartListening());
252 
253   for (int i = 0; i < CONNECTION_COUNT; i++) {
254     connected = false;
255     auto socket = pasc_.ConnectToRemoteServer("localhost", port, 250ms);
256     ASSERT_TRUE(socket->Connected());
257     std::unique_lock<std::mutex> lk(m);
258     ASSERT_TRUE(cv.wait_for(lk, 250ms, [&] { return connected; }));
259     connected = false;
260   }
261 
262   ASSERT_EQ(CONNECTION_COUNT, (int)connections.size());
263 }
264 
TEST_F(PosixSocketTest,noConnectWhenNotCallingStart)265 TEST_F(PosixSocketTest, noConnectWhenNotCallingStart) {
266   int port = pass_.port();
267   std::mutex m;
268   std::condition_variable cv;
269   std::vector<std::shared_ptr<AsyncDataChannel>> connections;
270   bool connected = false;
271 
272   pass_.SetOnConnectCallback([&](std::shared_ptr<AsyncDataChannel> sock, AsyncDataChannelServer*) {
273     std::unique_lock<std::mutex> guard(m);
274     connections.push_back(sock);
275     connected = true;
276     cv.notify_all();
277   });
278   ASSERT_TRUE(pass_.StartListening());
279 
280   {
281     connected = false;
282     auto socket = pasc_.ConnectToRemoteServer("localhost", port, 250ms);
283     ASSERT_TRUE(socket->Connected());
284     std::unique_lock<std::mutex> lk(m);
285     ASSERT_TRUE(cv.wait_for(lk, 250ms, [&] { return connected; }));
286   }
287 
288   // After the first connection there was no call to startListening, and hence
289   // no new sockets should be accepted.
290   {
291     connected = false;
292     auto socket = pasc_.ConnectToRemoteServer("localhost", port, 250ms);
293 
294     // We should have a partial connection, so we don't know yet that it is not
295     // working..
296     ASSERT_TRUE(socket->Connected());
297     std::unique_lock<std::mutex> lk(m);
298 
299     // Should timeout, as we never invoke the callback that accepts the socket.
300     ASSERT_FALSE(cv.wait_for(lk, 250ms, [&] { return connected; }));
301   }
302 
303   ASSERT_EQ(1, (int)connections.size());
304 }
305 }  // namespace net
306 }  // namespace android
307