1 //
2 // Copyright 2018 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16
17 #include "test/core/util/fake_udp_and_tcp_server.h"
18
19 #include <errno.h>
20 #include <string.h>
21
22 #include <set>
23 #include <utility>
24 #include <vector>
25
26 #include "absl/status/statusor.h"
27 #include "absl/strings/str_cat.h"
28
29 #include <grpc/support/log.h>
30 #include <grpc/support/port_platform.h>
31 #include <grpc/support/time.h>
32
33 #include "src/core/lib/address_utils/sockaddr_utils.h"
34 #include "src/core/lib/iomgr/resolved_address.h"
35 #include "src/core/lib/iomgr/sockaddr.h"
36 #include "test/core/util/port.h"
37
38 // IWYU pragma: no_include <arpa/inet.h>
39
40 #ifdef GPR_WINDOWS
41 #include "src/core/lib/iomgr/sockaddr_windows.h"
42 #include "src/core/lib/iomgr/socket_windows.h"
43 #include "src/core/lib/iomgr/tcp_windows.h"
44
45 #define BAD_SOCKET_RETURN_VAL INVALID_SOCKET
46 #define CLOSE_SOCKET closesocket
47 #define ERRNO WSAGetLastError()
48 #else
49 #include <fcntl.h>
50 #include <unistd.h>
51
52 #define BAD_SOCKET_RETURN_VAL (-1)
53 #define CLOSE_SOCKET close
54 #define ERRNO errno
55 #endif
56
57 namespace grpc_core {
58 namespace testing {
59
60 namespace {
61
ErrorIsRetryable(int error)62 bool ErrorIsRetryable(int error) {
63 #ifdef GPR_WINDOWS
64 return error == WSAEWOULDBLOCK || error == WSAEINPROGRESS;
65 #else
66 return error == EWOULDBLOCK || error == EAGAIN;
67 #endif
68 }
69
70 } // namespace
71
FakeUdpAndTcpServer(AcceptMode accept_mode,std::function<FakeUdpAndTcpServer::ProcessReadResult (int,int,int)> process_read_cb)72 FakeUdpAndTcpServer::FakeUdpAndTcpServer(
73 AcceptMode accept_mode,
74 std::function<FakeUdpAndTcpServer::ProcessReadResult(int, int, int)>
75 process_read_cb)
76 : accept_mode_(accept_mode), process_read_cb_(std::move(process_read_cb)) {
77 port_ = grpc_pick_unused_port_or_die();
78 udp_socket_ = socket(AF_INET6, SOCK_DGRAM, 0);
79 if (udp_socket_ == BAD_SOCKET_RETURN_VAL) {
80 gpr_log(GPR_ERROR, "Failed to create UDP ipv6 socket: %d", ERRNO);
81 GPR_ASSERT(0);
82 }
83 accept_socket_ = socket(AF_INET6, SOCK_STREAM, 0);
84 address_ = absl::StrCat("[::1]:", port_);
85 if (accept_socket_ == BAD_SOCKET_RETURN_VAL) {
86 gpr_log(GPR_ERROR, "Failed to create TCP IPv6 socket: %d", ERRNO);
87 GPR_ASSERT(0);
88 }
89 #ifdef GPR_WINDOWS
90 char val = 1;
91 if (setsockopt(accept_socket_, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val)) ==
92 SOCKET_ERROR) {
93 gpr_log(GPR_ERROR,
94 "Failed to set SO_REUSEADDR on TCP ipv6 socket to [::1]:%d, "
95 "errno: %d",
96 port_, ERRNO);
97 GPR_ASSERT(0);
98 }
99 grpc_error_handle set_non_block_error;
100 set_non_block_error = grpc_tcp_set_non_block(udp_socket_);
101 if (!set_non_block_error.ok()) {
102 gpr_log(GPR_ERROR, "Failed to configure non-blocking socket: %s",
103 StatusToString(set_non_block_error).c_str());
104 GPR_ASSERT(0);
105 }
106 set_non_block_error = grpc_tcp_set_non_block(accept_socket_);
107 if (!set_non_block_error.ok()) {
108 gpr_log(GPR_ERROR, "Failed to configure non-blocking socket: %s",
109 StatusToString(set_non_block_error).c_str());
110 GPR_ASSERT(0);
111 }
112 #else
113 int val = 1;
114 if (setsockopt(accept_socket_, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val)) !=
115 0) {
116 gpr_log(GPR_ERROR, "Failed to set SO_REUSEADDR on socket [::1]:%d", port_);
117 GPR_ASSERT(0);
118 }
119 if (fcntl(udp_socket_, F_SETFL, O_NONBLOCK) != 0) {
120 gpr_log(GPR_ERROR, "Failed to set O_NONBLOCK on socket: %d", ERRNO);
121 GPR_ASSERT(0);
122 }
123 if (fcntl(accept_socket_, F_SETFL, O_NONBLOCK) != 0) {
124 gpr_log(GPR_ERROR, "Failed to set O_NONBLOCK on socket: %d", ERRNO);
125 GPR_ASSERT(0);
126 }
127 #endif
128 sockaddr_in6 addr;
129 memset(&addr, 0, sizeof(addr));
130 addr.sin6_family = AF_INET6;
131 addr.sin6_port = htons(port_);
132 (reinterpret_cast<char*>(&addr.sin6_addr))[15] = 1;
133 grpc_resolved_address resolved_addr;
134 memcpy(resolved_addr.addr, &addr, sizeof(addr));
135 resolved_addr.len = sizeof(addr);
136 std::string addr_str = grpc_sockaddr_to_string(&resolved_addr, false).value();
137 gpr_log(GPR_INFO, "Fake UDP and TCP server listening on %s",
138 addr_str.c_str());
139 if (bind(udp_socket_, reinterpret_cast<const sockaddr*>(&addr),
140 sizeof(addr)) != 0) {
141 gpr_log(GPR_ERROR, "Failed to bind UDP socket to [::1]:%d", port_);
142 GPR_ASSERT(0);
143 }
144 if (bind(accept_socket_, reinterpret_cast<const sockaddr*>(&addr),
145 sizeof(addr)) != 0) {
146 gpr_log(GPR_ERROR, "Failed to bind TCP socket to [::1]:%d : %d", port_,
147 ERRNO);
148 GPR_ASSERT(0);
149 }
150 if (listen(accept_socket_, 100)) {
151 gpr_log(GPR_ERROR, "Failed to listen on socket bound to [::1]:%d : %d",
152 port_, ERRNO);
153 GPR_ASSERT(0);
154 }
155 gpr_event_init(&stop_ev_);
156 run_server_loop_thd_ = std::make_unique<std::thread>(
157 std::bind(&FakeUdpAndTcpServer::RunServerLoop, this));
158 }
159
~FakeUdpAndTcpServer()160 FakeUdpAndTcpServer::~FakeUdpAndTcpServer() {
161 gpr_log(GPR_DEBUG,
162 "FakeUdpAndTcpServer stop and "
163 "join server thread");
164 gpr_event_set(&stop_ev_, reinterpret_cast<void*>(1));
165 run_server_loop_thd_->join();
166 gpr_log(GPR_DEBUG,
167 "FakeUdpAndTcpServer join server "
168 "thread complete");
169 CLOSE_SOCKET(accept_socket_);
170 CLOSE_SOCKET(udp_socket_);
171 }
172
173 FakeUdpAndTcpServer::ProcessReadResult
CloseSocketUponReceivingBytesFromPeer(int bytes_received_size,int read_error,int s)174 FakeUdpAndTcpServer::CloseSocketUponReceivingBytesFromPeer(
175 int bytes_received_size, int read_error, int s) {
176 if (bytes_received_size < 0 && !ErrorIsRetryable(read_error)) {
177 gpr_log(GPR_ERROR, "Failed to receive from peer socket: %d. errno: %d", s,
178 read_error);
179 GPR_ASSERT(0);
180 }
181 if (bytes_received_size >= 0) {
182 gpr_log(GPR_DEBUG,
183 "Fake TCP server received %d bytes from peer socket: %d. Close "
184 "the "
185 "connection.",
186 bytes_received_size, s);
187 return FakeUdpAndTcpServer::ProcessReadResult::kCloseSocket;
188 }
189 return FakeUdpAndTcpServer::ProcessReadResult::kContinueReading;
190 }
191
192 FakeUdpAndTcpServer::ProcessReadResult
CloseSocketUponCloseFromPeer(int bytes_received_size,int read_error,int s)193 FakeUdpAndTcpServer::CloseSocketUponCloseFromPeer(int bytes_received_size,
194 int read_error, int s) {
195 if (bytes_received_size < 0 && !ErrorIsRetryable(read_error)) {
196 gpr_log(GPR_ERROR, "Failed to receive from peer socket: %d. errno: %d", s,
197 read_error);
198 GPR_ASSERT(0);
199 }
200 if (bytes_received_size == 0) {
201 // The peer has shut down the connection.
202 gpr_log(GPR_DEBUG,
203 "Fake TCP server received 0 bytes from peer socket: %d. Close "
204 "the "
205 "connection.",
206 s);
207 return FakeUdpAndTcpServer::ProcessReadResult::kCloseSocket;
208 }
209 return FakeUdpAndTcpServer::ProcessReadResult::kContinueReading;
210 }
211
212 FakeUdpAndTcpServer::ProcessReadResult
SendThreeAllZeroBytes(int bytes_received_size,int read_error,int s)213 FakeUdpAndTcpServer::SendThreeAllZeroBytes(int bytes_received_size,
214 int read_error, int s) {
215 if (bytes_received_size < 0 && !ErrorIsRetryable(read_error)) {
216 gpr_log(GPR_ERROR, "Failed to receive from peer socket: %d. errno: %d", s,
217 read_error);
218 GPR_ASSERT(0);
219 }
220 if (bytes_received_size == 0) {
221 // The peer has shut down the connection.
222 gpr_log(GPR_DEBUG, "Fake TCP server received 0 bytes from peer socket: %d.",
223 s);
224 return FakeUdpAndTcpServer::ProcessReadResult::kCloseSocket;
225 }
226 char buf[3] = {0, 0, 0};
227 int bytes_sent = send(s, buf, sizeof(buf), 0);
228 gpr_log(GPR_DEBUG,
229 "Fake TCP server sent %d all-zero bytes on peer socket: %d.",
230 bytes_sent, s);
231 return FakeUdpAndTcpServer::ProcessReadResult::kCloseSocket;
232 }
233
FakeUdpAndTcpServerPeer(int fd)234 FakeUdpAndTcpServer::FakeUdpAndTcpServerPeer::FakeUdpAndTcpServerPeer(int fd)
235 : fd_(fd) {}
236
~FakeUdpAndTcpServerPeer()237 FakeUdpAndTcpServer::FakeUdpAndTcpServerPeer::~FakeUdpAndTcpServerPeer() {
238 CLOSE_SOCKET(fd_);
239 }
240
241 void FakeUdpAndTcpServer::FakeUdpAndTcpServerPeer::
MaybeContinueSendingSettings()242 MaybeContinueSendingSettings() {
243 // https://tools.ietf.org/html/rfc7540#section-4.1
244 const std::vector<char> kEmptyHttp2SettingsFrame = {
245 0x00, 0x00, 0x00, // length
246 0x04, // settings type
247 0x00, // flags
248 0x00, 0x00, 0x00, 0x00 // stream identifier
249 };
250 if (total_bytes_sent_ < static_cast<int>(kEmptyHttp2SettingsFrame.size())) {
251 int bytes_to_send = kEmptyHttp2SettingsFrame.size() - total_bytes_sent_;
252 int bytes_sent =
253 send(fd_, kEmptyHttp2SettingsFrame.data() + total_bytes_sent_,
254 bytes_to_send, 0);
255 if (bytes_sent < 0 && !ErrorIsRetryable(ERRNO)) {
256 gpr_log(GPR_ERROR,
257 "Fake TCP server encountered unexpected error:%d "
258 "sending %d bytes on fd:%d",
259 ERRNO, bytes_to_send, fd_);
260 GPR_ASSERT(0);
261 } else if (bytes_sent > 0) {
262 total_bytes_sent_ += bytes_sent;
263 GPR_ASSERT(total_bytes_sent_ <= int(kEmptyHttp2SettingsFrame.size()));
264 }
265 }
266 }
267
ReadFromUdpSocket()268 void FakeUdpAndTcpServer::ReadFromUdpSocket() {
269 char buf[100];
270 recvfrom(udp_socket_, buf, sizeof(buf), 0, nullptr, nullptr);
271 }
272
RunServerLoop()273 void FakeUdpAndTcpServer::RunServerLoop() {
274 std::set<std::unique_ptr<FakeUdpAndTcpServerPeer>> peers;
275 while (!gpr_event_get(&stop_ev_)) {
276 // handle TCP connections
277 int p = accept(accept_socket_, nullptr, nullptr);
278 if (p != BAD_SOCKET_RETURN_VAL) {
279 gpr_log(GPR_DEBUG, "accepted peer socket: %d", p);
280 #ifdef GPR_WINDOWS
281 grpc_error_handle set_non_block_error;
282 set_non_block_error = grpc_tcp_set_non_block(p);
283 if (!set_non_block_error.ok()) {
284 gpr_log(GPR_ERROR, "Failed to configure non-blocking socket: %s",
285 StatusToString(set_non_block_error).c_str());
286 GPR_ASSERT(0);
287 }
288 #else
289 if (fcntl(p, F_SETFL, O_NONBLOCK) != 0) {
290 gpr_log(GPR_ERROR, "Failed to configure non-blocking socket, errno: %d",
291 ERRNO);
292 GPR_ASSERT(0);
293 }
294 #endif
295 peers.insert(std::make_unique<FakeUdpAndTcpServerPeer>(p));
296 }
297 auto it = peers.begin();
298 while (it != peers.end()) {
299 FakeUdpAndTcpServerPeer* peer = (*it).get();
300 if (accept_mode_ == AcceptMode::kEagerlySendSettings) {
301 peer->MaybeContinueSendingSettings();
302 }
303 char buf[100];
304 int bytes_received_size = recv(peer->fd(), buf, 100, 0);
305 FakeUdpAndTcpServer::ProcessReadResult r =
306 process_read_cb_(bytes_received_size, ERRNO, peer->fd());
307 if (r == FakeUdpAndTcpServer::ProcessReadResult::kCloseSocket) {
308 it = peers.erase(it);
309 } else {
310 GPR_ASSERT(r ==
311 FakeUdpAndTcpServer::ProcessReadResult::kContinueReading);
312 it++;
313 }
314 }
315 // read from the UDP socket
316 ReadFromUdpSocket();
317 gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC),
318 gpr_time_from_millis(10, GPR_TIMESPAN)));
319 }
320 }
321
322 } // namespace testing
323 } // namespace grpc_core
324