1 // Copyright (C) 2021 The Android Open Source Project
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 #include <errno.h>
16 #include <gtest/gtest.h>
17 #include <netdb.h>
18 #include <netinet/in.h>
19 #include <signal.h>
20 #include <string.h>
21 #include <sys/socket.h>
22 #include <sys/types.h>
23 #include <unistd.h>
24
25 #include <condition_variable>
26 #include <cstdint>
27 #include <cstring>
28 #include <functional>
29 #include <memory>
30 #include <mutex>
31 #include <random>
32 #include <vector>
33
34 #include "model/setup/async_manager.h"
35 #include "net/posix/posix_async_socket_connector.h"
36 #include "net/posix/posix_async_socket_server.h"
37
38 namespace android {
39 namespace net {
40
41 using clock = std::chrono::system_clock;
42
// Scoped SIGPIPE observer for tests.
//
// While an instance is alive, SIGPIPE is routed to a handler that only
// records the delivered signal number. The disposition that was installed
// before construction is put back by the destructor.
class SigPipeSignalHandler {
 public:
  // Resets the recorded signal to -1 and installs the recording handler,
  // remembering the previous SIGPIPE disposition.
  SigPipeSignalHandler() {
    sSignal = -1;
    struct sigaction act = {};
    act.sa_handler = myHandler;
    ::sigaction(SIGPIPE, &act, &mOldAction);
  }

  // Restores the SIGPIPE disposition saved by the constructor.
  ~SigPipeSignalHandler() { ::sigaction(SIGPIPE, &mOldAction, nullptr); }

  // A copy would restore the saved disposition a second time when it is
  // destroyed, so copying is disabled.
  SigPipeSignalHandler(const SigPipeSignalHandler&) = delete;
  SigPipeSignalHandler& operator=(const SigPipeSignalHandler&) = delete;

  // Returns the signal number most recently recorded by the handler, or -1
  // when none has been recorded since the last instance was constructed.
  int signaled() const { return sSignal; }

 private:
  // Zero-initialised so the destructor never passes indeterminate data to
  // sigaction() should the call in the constructor fail.
  struct sigaction mOldAction = {};

  // Written from the signal handler, so it has to be an object that may be
  // accessed asynchronously: volatile sig_atomic_t.
  static volatile sig_atomic_t sSignal;

  static void myHandler(int sig) { sSignal = sig; }
};

// static
volatile sig_atomic_t SigPipeSignalHandler::sSignal = 0;
66
67 using SocketCon = std::shared_ptr<AsyncDataChannel>;
68
69 class PosixSocketTest : public testing::Test {
70 public:
PosixSocketTest()71 PosixSocketTest() : pasc_(&async_manager_), pass_(0, &async_manager_) {}
72
~PosixSocketTest()73 ~PosixSocketTest() { pass_.Close(); }
74
connectPair(std::chrono::milliseconds timeout=500ms)75 std::tuple<SocketCon, SocketCon> connectPair(std::chrono::milliseconds timeout = 500ms) {
76 std::mutex m;
77 std::condition_variable cv;
78
79 std::shared_ptr<AsyncDataChannel> sock1;
80 std::shared_ptr<AsyncDataChannel> sock2;
81
82 pass_.SetOnConnectCallback(
83 [&](std::shared_ptr<AsyncDataChannel> sock, AsyncDataChannelServer*) {
84 std::unique_lock<std::mutex> guard(m);
85 sock1 = std::move(sock);
86 cv.notify_all();
87 });
88 EXPECT_TRUE(pass_.StartListening());
89
90 sock2 = pasc_.ConnectToRemoteServer("localhost", pass_.port(), 1000ms);
91 EXPECT_TRUE(sock2.get() != nullptr);
92 EXPECT_TRUE(sock2->Connected());
93
94 std::unique_lock<std::mutex> lk(m);
95 EXPECT_TRUE(cv.wait_for(lk, timeout, [&] { return sock1.get() != nullptr; }));
96 EXPECT_TRUE(sock1);
97 EXPECT_TRUE(sock1->Connected());
98
99 return {sock1, sock2};
100 }
101
102 protected:
103 AsyncManager async_manager_;
104 PosixAsyncSocketConnector pasc_;
105 PosixAsyncSocketServer pass_;
106 };
107
// A freshly connected pair reports Connected() on both ends, and neither end
// reports Connected() once both have been closed.
TEST_F(PosixSocketTest, canConnect) {
  auto [accepted, client] = connectPair();
  ASSERT_TRUE(accepted->Connected());
  ASSERT_TRUE(client->Connected());

  accepted->Close();
  client->Close();

  ASSERT_FALSE(accepted->Connected());
  ASSERT_FALSE(client->Connected());
}
119
// Sending on a connection whose other end has been closed must fail with an
// error code rather than raise SIGPIPE.
TEST_F(PosixSocketTest, socketSendDoesNotGenerateSigPipe) {
  SigPipeSignalHandler sigpipe_watch;
  ASSERT_EQ(-1, sigpipe_watch.signaled());
  auto [accepted, client] = connectPair();

  // Both ends are connected now. Close the accepted end right away, then
  // push data through the other end.
  accepted->Close();
  ASSERT_FALSE(accepted->Connected());

  // Because the kernel buffers TCP packets, the first sends may still
  // succeed; keep sending until one fails or the budget runs out.
  errno = 0;
  const int kMaxSendCount = 1000;
  int sends_ok = 0;
  for (; sends_ok < kMaxSendCount; ++sends_ok) {
    const int ret = client->Send((uint8_t*)"xxxx", 4);
    if (ret >= 0) {
      continue;
    }
#ifdef __APPLE__
    // OS X sometimes reports EPROTOTYPE rather than EPIPE in this
    // situation.
    ASSERT_TRUE(errno == EPIPE || errno == EPROTOTYPE) << strerror(errno);
#else
    ASSERT_EQ(EPIPE, errno) << strerror(errno);
#endif
    break;
  }

  // A send must have failed before the budget was used up (on MacOS this
  // usually takes fewer than 30 sends).
  ASSERT_LT(sends_ok, kMaxSendCount);

  // The handler never saw a signal.
  ASSERT_EQ(-1, sigpipe_watch.signaled());
}
158
// Sends a short payload from one end of a connected pair and polls the other
// end with Recv() until the whole payload has arrived or 250ms have passed.
TEST_F(PosixSocketTest, can_send_data_around_poll) {
  auto [sock1, sock2] = connectPair();
  std::string word = "Hello World";
  // Receive buffer: exactly as long as the payload, but with different
  // content. Sizing it from `word` keeps the two in step.
  std::string input(word.size(), ' ');

  ASSERT_EQ(word.size(), input.size());
  ASSERT_NE(word, input);

  ssize_t snd = sock1->Send((uint8_t*)word.data(), word.size());
  ASSERT_EQ((ssize_t)word.size(), snd);

  uint8_t* buffer = (uint8_t*)input.data();
  int buflen = input.size();

  // Poll for at most 250ms. Each successful Recv() advances the write
  // position and shrinks the space that is still to be filled.
  clock::time_point until = clock::now() + 250ms;
  do {
    int received = sock2->Recv(buffer, buflen);
    if (received > 0) {
      buflen -= received;
      buffer += received;
    }
  } while (buflen > 0 && clock::now() < until);

  ASSERT_EQ(word, input);
}
185
// Data sent on one end of a connected pair must trigger the read callback
// registered on the other end, after which Recv() yields at least one byte.
TEST_F(PosixSocketTest, data_results_in_read_event) {
  auto [sock1, sock2] = connectPair();
  std::mutex m;
  std::condition_variable cv;
  std::string word = "Hello World";
  // Receive buffer large enough for the whole payload, so that Recv() can
  // return anything from a single byte up to the complete word.
  std::string input(word.size(), ' ');

  bool received = false;

  // Register a callback that only gets called once.
  sock2->WatchForNonBlockingRead([&](auto sock) {
    std::unique_lock<std::mutex> guard(m);
    received = true;
    // Unregister, to prevent further invocations.
    sock->StopWatching();
    cv.notify_all();
  });

  ssize_t snd = sock1->Send((uint8_t*)word.data(), word.size());
  ASSERT_EQ((ssize_t)word.size(), snd);

  {
    std::unique_lock<std::mutex> lk(m);

    // The callback is expected to run within 250ms.
    ASSERT_TRUE(cv.wait_for(lk, 250ms, [&] { return received; }));

    uint8_t* buffer = (uint8_t*)input.data();
    int buflen = input.size();

    // At least 1 byte is coming in. (It may be only part of the payload
    // rather than all of it; that is up to the IP stack.)
    ASSERT_GT(sock2->Recv(buffer, buflen), 0);
  }
}
222
// Connecting to the port of a server that has been closed must not produce a
// connected channel.
TEST_F(PosixSocketTest, connectFails) {
  // Remember the port, then close the server so nothing is listening on it.
  const int closed_port = pass_.port();
  pass_.Close();
  ASSERT_FALSE(pass_.Connected());

  // Allow at most 250ms for the attempt.
  auto channel = pasc_.ConnectToRemoteServer("localhost", closed_port, 250ms);
  ASSERT_FALSE(channel->Connected());
}
234
// The server accepts several connections in a row as long as the connect
// callback calls StartListening() again after each accepted connection.
TEST_F(PosixSocketTest, canConnectMultiple) {
  const int server_port = pass_.port();
  const int kConnectionCount = 10;
  std::mutex mtx;
  std::condition_variable accepted_cv;
  std::vector<std::shared_ptr<AsyncDataChannel>> accepted;
  bool got_connection = false;

  // Each accepted channel is stored, the flag is raised, and listening is
  // re-armed before the waiting test body is woken up.
  pass_.SetOnConnectCallback(
      [&](std::shared_ptr<AsyncDataChannel> const& sock, AsyncDataChannelServer*) {
        std::lock_guard<std::mutex> guard(mtx);
        accepted.push_back(sock);
        got_connection = true;
        ASSERT_TRUE(pass_.StartListening());
        accepted_cv.notify_all();
      });
  ASSERT_TRUE(pass_.StartListening());

  for (int attempt = 0; attempt != kConnectionCount; ++attempt) {
    got_connection = false;
    auto client = pasc_.ConnectToRemoteServer("localhost", server_port, 250ms);
    ASSERT_TRUE(client->Connected());

    // Wait up to 250ms for the server side to report this connection.
    std::unique_lock<std::mutex> lk(mtx);
    ASSERT_TRUE(accepted_cv.wait_for(lk, 250ms, [&] { return got_connection; }));
    got_connection = false;
  }

  ASSERT_EQ(kConnectionCount, (int)accepted.size());
}
264
// When the connect callback does not call StartListening() again, only the
// first connection is handed to the callback; a second connect attempt is
// never reported.
TEST_F(PosixSocketTest, noConnectWhenNotCallingStart) {
  const int server_port = pass_.port();
  std::mutex mtx;
  std::condition_variable accepted_cv;
  std::vector<std::shared_ptr<AsyncDataChannel>> accepted;
  bool got_connection = false;

  // Note: unlike canConnectMultiple, this callback does not re-arm the
  // server.
  pass_.SetOnConnectCallback([&](std::shared_ptr<AsyncDataChannel> sock, AsyncDataChannelServer*) {
    std::lock_guard<std::mutex> guard(mtx);
    accepted.push_back(sock);
    got_connection = true;
    accepted_cv.notify_all();
  });
  ASSERT_TRUE(pass_.StartListening());

  // First connection: has to be reported within 250ms.
  {
    got_connection = false;
    auto first = pasc_.ConnectToRemoteServer("localhost", server_port, 250ms);
    ASSERT_TRUE(first->Connected());
    std::unique_lock<std::mutex> lk(mtx);
    ASSERT_TRUE(accepted_cv.wait_for(lk, 250ms, [&] { return got_connection; }));
  }

  // StartListening() was not called again after the first connection, so no
  // further sockets should be accepted.
  {
    got_connection = false;
    auto second = pasc_.ConnectToRemoteServer("localhost", server_port, 250ms);

    // The connection is only partially established, so this end cannot tell
    // yet that it will not be served.
    ASSERT_TRUE(second->Connected());
    std::unique_lock<std::mutex> lk(mtx);

    // The wait has to time out: the accepting callback is never invoked.
    ASSERT_FALSE(accepted_cv.wait_for(lk, 250ms, [&] { return got_connection; }));
  }

  ASSERT_EQ(1, (int)accepted.size());
}
305 } // namespace net
306 } // namespace android
307