1 //
2 //
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #include <grpc/support/port_platform.h>
20 
21 #include <inttypes.h>
22 
23 #include "src/core/lib/iomgr/port.h"
24 
25 #ifdef GRPC_WINSOCK_SOCKET
26 
27 #include <grpc/event_engine/endpoint_config.h>
28 #include <grpc/slice_buffer.h>
29 #include <grpc/support/alloc.h>
30 #include <grpc/support/log.h>
31 #include <grpc/support/log_windows.h>
32 
33 #include "src/core/lib/address_utils/sockaddr_utils.h"
34 #include "src/core/lib/event_engine/shim.h"
35 #include "src/core/lib/gprpp/crash.h"
36 #include "src/core/lib/iomgr/event_engine_shims/tcp_client.h"
37 #include "src/core/lib/iomgr/iocp_windows.h"
38 #include "src/core/lib/iomgr/sockaddr.h"
39 #include "src/core/lib/iomgr/sockaddr_windows.h"
40 #include "src/core/lib/iomgr/socket_windows.h"
41 #include "src/core/lib/iomgr/tcp_client.h"
42 #include "src/core/lib/iomgr/tcp_windows.h"
43 #include "src/core/lib/iomgr/timer.h"
44 #include "src/core/lib/resource_quota/api.h"
45 #include "src/core/lib/slice/slice_internal.h"
46 
47 using ::grpc_event_engine::experimental::EndpointConfig;
48 
49 struct async_connect {
50   grpc_closure* on_done;
51   gpr_mu mu;
52   grpc_winsocket* socket;
53   grpc_timer alarm;
54   grpc_closure on_alarm;
55   std::string addr_name;
56   int refs;
57   grpc_closure on_connect;
58   grpc_endpoint** endpoint;
59 };
60 
async_connect_unlock_and_cleanup(async_connect * ac,grpc_winsocket * socket)61 static void async_connect_unlock_and_cleanup(async_connect* ac,
62                                              grpc_winsocket* socket) {
63   int done = (--ac->refs == 0);
64   gpr_mu_unlock(&ac->mu);
65   if (done) {
66     gpr_mu_destroy(&ac->mu);
67     delete ac;
68   }
69   if (socket != NULL) grpc_winsocket_destroy(socket);
70 }
71 
on_alarm(void * acp,grpc_error_handle)72 static void on_alarm(void* acp, grpc_error_handle /* error */) {
73   async_connect* ac = (async_connect*)acp;
74   gpr_mu_lock(&ac->mu);
75   grpc_winsocket* socket = ac->socket;
76   ac->socket = NULL;
77   if (socket != NULL) {
78     grpc_winsocket_shutdown(socket);
79   }
80   async_connect_unlock_and_cleanup(ac, socket);
81 }
82 
on_connect(void * acp,grpc_error_handle error)83 static void on_connect(void* acp, grpc_error_handle error) {
84   async_connect* ac = (async_connect*)acp;
85   grpc_endpoint** ep = ac->endpoint;
86   GPR_ASSERT(*ep == NULL);
87   grpc_closure* on_done = ac->on_done;
88 
89   gpr_mu_lock(&ac->mu);
90   grpc_winsocket* socket = ac->socket;
91   ac->socket = NULL;
92   gpr_mu_unlock(&ac->mu);
93 
94   grpc_timer_cancel(&ac->alarm);
95 
96   gpr_mu_lock(&ac->mu);
97 
98   if (error.ok()) {
99     if (socket != NULL) {
100       DWORD transfered_bytes = 0;
101       DWORD flags;
102       BOOL wsa_success =
103           WSAGetOverlappedResult(socket->socket, &socket->write_info.overlapped,
104                                  &transfered_bytes, FALSE, &flags);
105       GPR_ASSERT(transfered_bytes == 0);
106       if (!wsa_success) {
107         error = GRPC_WSA_ERROR(WSAGetLastError(), "ConnectEx");
108         closesocket(socket->socket);
109       } else {
110         *ep = grpc_tcp_create(socket, ac->addr_name);
111         socket = nullptr;
112       }
113     } else {
114       error = GRPC_ERROR_CREATE("socket is null");
115     }
116   }
117 
118   async_connect_unlock_and_cleanup(ac, socket);
119   // If the connection was aborted, the callback was already called when
120   // the deadline was met.
121   grpc_core::ExecCtx::Run(DEBUG_LOCATION, on_done, error);
122 }
123 
124 // Tries to issue one async connection, then schedules both an IOCP
125 // notification request for the connection, and one timeout alert.
tcp_connect(grpc_closure * on_done,grpc_endpoint ** endpoint,grpc_pollset_set *,const EndpointConfig & config,const grpc_resolved_address * addr,grpc_core::Timestamp deadline)126 static int64_t tcp_connect(grpc_closure* on_done, grpc_endpoint** endpoint,
127                            grpc_pollset_set* /* interested_parties */,
128                            const EndpointConfig& config,
129                            const grpc_resolved_address* addr,
130                            grpc_core::Timestamp deadline) {
131   if (grpc_event_engine::experimental::UseEventEngineClient()) {
132     return grpc_event_engine::experimental::event_engine_tcp_client_connect(
133         on_done, endpoint, config, addr, deadline);
134   }
135   SOCKET sock = INVALID_SOCKET;
136   BOOL success;
137   int status;
138   grpc_resolved_address addr6_v4mapped;
139   grpc_resolved_address local_address;
140   grpc_winsocket* socket = NULL;
141   LPFN_CONNECTEX ConnectEx;
142   GUID guid = WSAID_CONNECTEX;
143   DWORD ioctl_num_bytes;
144   grpc_winsocket_callback_info* info;
145   grpc_error_handle error;
146   async_connect* ac = NULL;
147   absl::StatusOr<std::string> addr_uri;
148 
149   addr_uri = grpc_sockaddr_to_uri(addr);
150   if (!addr_uri.ok()) {
151     error = GRPC_ERROR_CREATE(addr_uri.status().ToString());
152     goto failure;
153   }
154 
155   *endpoint = NULL;
156 
157   // Use dualstack sockets where available.
158   if (grpc_sockaddr_to_v4mapped(addr, &addr6_v4mapped)) {
159     addr = &addr6_v4mapped;
160   }
161 
162   sock = WSASocket(AF_INET6, SOCK_STREAM, IPPROTO_TCP, NULL, 0,
163                    grpc_get_default_wsa_socket_flags());
164   if (sock == INVALID_SOCKET) {
165     error = GRPC_WSA_ERROR(WSAGetLastError(), "WSASocket");
166     goto failure;
167   }
168 
169   error = grpc_tcp_prepare_socket(sock);
170   if (!error.ok()) {
171     goto failure;
172   }
173 
174   // Grab the function pointer for ConnectEx for that specific socket.
175   // It may change depending on the interface.
176   status =
177       WSAIoctl(sock, SIO_GET_EXTENSION_FUNCTION_POINTER, &guid, sizeof(guid),
178                &ConnectEx, sizeof(ConnectEx), &ioctl_num_bytes, NULL, NULL);
179 
180   if (status != 0) {
181     error = GRPC_WSA_ERROR(WSAGetLastError(),
182                            "WSAIoctl(SIO_GET_EXTENSION_FUNCTION_POINTER)");
183     goto failure;
184   }
185 
186   grpc_sockaddr_make_wildcard6(0, &local_address);
187 
188   status =
189       bind(sock, (grpc_sockaddr*)&local_address.addr, (int)local_address.len);
190   if (status != 0) {
191     error = GRPC_WSA_ERROR(WSAGetLastError(), "bind");
192     goto failure;
193   }
194 
195   socket = grpc_winsocket_create(sock, "client");
196   info = &socket->write_info;
197   success = ConnectEx(sock, (grpc_sockaddr*)&addr->addr, (int)addr->len, NULL,
198                       0, NULL, &info->overlapped);
199 
200   // It wouldn't be unusual to get a success immediately. But we'll still get
201   // an IOCP notification, so let's ignore it.
202   if (!success) {
203     int last_error = WSAGetLastError();
204     if (last_error != ERROR_IO_PENDING) {
205       error = GRPC_WSA_ERROR(last_error, "ConnectEx");
206       goto failure;
207     }
208   }
209 
210   ac = new async_connect();
211   ac->on_done = on_done;
212   ac->socket = socket;
213   gpr_mu_init(&ac->mu);
214   ac->refs = 2;
215   ac->addr_name = addr_uri.value();
216   ac->endpoint = endpoint;
217   GRPC_CLOSURE_INIT(&ac->on_connect, on_connect, ac, grpc_schedule_on_exec_ctx);
218 
219   GRPC_CLOSURE_INIT(&ac->on_alarm, on_alarm, ac, grpc_schedule_on_exec_ctx);
220   gpr_mu_lock(&ac->mu);
221   grpc_timer_init(&ac->alarm, deadline, &ac->on_alarm);
222   grpc_socket_notify_on_write(socket, &ac->on_connect);
223   gpr_mu_unlock(&ac->mu);
224   return 0;
225 
226 failure:
227   GPR_ASSERT(!error.ok());
228   grpc_error_handle final_error = grpc_error_set_str(
229       GRPC_ERROR_CREATE_REFERENCING("Failed to connect", &error, 1),
230       grpc_core::StatusStrProperty::kTargetAddress,
231       addr_uri.ok() ? *addr_uri : addr_uri.status().ToString());
232   if (socket != NULL) {
233     grpc_winsocket_destroy(socket);
234   } else if (sock != INVALID_SOCKET) {
235     closesocket(sock);
236   }
237   grpc_core::ExecCtx::Run(DEBUG_LOCATION, on_done, final_error);
238   return 0;
239 }
240 
tcp_cancel_connect(int64_t connection_handle)241 static bool tcp_cancel_connect(int64_t connection_handle) {
242   if (grpc_event_engine::experimental::UseEventEngineClient()) {
243     return grpc_event_engine::experimental::
244         event_engine_tcp_client_cancel_connect(connection_handle);
245   }
246   return false;
247 }
248 
249 grpc_tcp_client_vtable grpc_windows_tcp_client_vtable = {tcp_connect,
250                                                          tcp_cancel_connect};
251 
252 #endif  // GRPC_WINSOCK_SOCKET
253