1 // Copyright 2022 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "quiche/quic/core/io/event_loop_connecting_client_socket.h"
6 
7 #include <limits>
8 #include <string>
9 #include <utility>
10 
11 #include "absl/status/status.h"
12 #include "absl/status/statusor.h"
13 #include "absl/strings/string_view.h"
14 #include "absl/types/span.h"
15 #include "absl/types/variant.h"
16 #include "quiche/quic/core/io/quic_event_loop.h"
17 #include "quiche/quic/core/io/socket.h"
18 #include "quiche/quic/platform/api/quic_socket_address.h"
19 #include "quiche/common/platform/api/quiche_logging.h"
20 #include "quiche/common/platform/api/quiche_mem_slice.h"
21 
22 namespace quic {
23 
EventLoopConnectingClientSocket(socket_api::SocketProtocol protocol,const quic::QuicSocketAddress & peer_address,QuicByteCount receive_buffer_size,QuicByteCount send_buffer_size,QuicEventLoop * event_loop,quiche::QuicheBufferAllocator * buffer_allocator,AsyncVisitor * async_visitor)24 EventLoopConnectingClientSocket::EventLoopConnectingClientSocket(
25     socket_api::SocketProtocol protocol,
26     const quic::QuicSocketAddress& peer_address,
27     QuicByteCount receive_buffer_size, QuicByteCount send_buffer_size,
28     QuicEventLoop* event_loop, quiche::QuicheBufferAllocator* buffer_allocator,
29     AsyncVisitor* async_visitor)
30     : protocol_(protocol),
31       peer_address_(peer_address),
32       receive_buffer_size_(receive_buffer_size),
33       send_buffer_size_(send_buffer_size),
34       event_loop_(event_loop),
35       buffer_allocator_(buffer_allocator),
36       async_visitor_(async_visitor) {
37   QUICHE_DCHECK(event_loop_);
38   QUICHE_DCHECK(buffer_allocator_);
39 }
40 
~EventLoopConnectingClientSocket()41 EventLoopConnectingClientSocket::~EventLoopConnectingClientSocket() {
42   // Connected socket must be closed via Disconnect() before destruction. Cannot
43   // safely recover if state indicates caller may be expecting async callbacks.
44   QUICHE_DCHECK(connect_status_ != ConnectStatus::kConnecting);
45   QUICHE_DCHECK(!receive_max_size_.has_value());
46   QUICHE_DCHECK(absl::holds_alternative<absl::monostate>(send_data_));
47   if (descriptor_ != kInvalidSocketFd) {
48     QUICHE_BUG(quic_event_loop_connecting_socket_invalid_destruction)
49         << "Must call Disconnect() on connected socket before destruction.";
50     Close();
51   }
52 
53   QUICHE_DCHECK(connect_status_ == ConnectStatus::kNotConnected);
54   QUICHE_DCHECK(send_remaining_.empty());
55 }
56 
ConnectBlocking()57 absl::Status EventLoopConnectingClientSocket::ConnectBlocking() {
58   QUICHE_DCHECK_EQ(descriptor_, kInvalidSocketFd);
59   QUICHE_DCHECK(connect_status_ == ConnectStatus::kNotConnected);
60   QUICHE_DCHECK(!receive_max_size_.has_value());
61   QUICHE_DCHECK(absl::holds_alternative<absl::monostate>(send_data_));
62 
63   absl::Status status = Open();
64   if (!status.ok()) {
65     return status;
66   }
67 
68   status = socket_api::SetSocketBlocking(descriptor_, /*blocking=*/true);
69   if (!status.ok()) {
70     QUICHE_LOG_FIRST_N(WARNING, 100)
71         << "Failed to set socket to address: " << peer_address_.ToString()
72         << " as blocking for connect with error: " << status;
73     Close();
74     return status;
75   }
76 
77   status = DoInitialConnect();
78 
79   if (absl::IsUnavailable(status)) {
80     QUICHE_LOG_FIRST_N(ERROR, 100)
81         << "Non-blocking connect to should-be blocking socket to address:"
82         << peer_address_.ToString() << ".";
83     Close();
84     connect_status_ = ConnectStatus::kNotConnected;
85     return status;
86   } else if (!status.ok()) {
87     // DoInitialConnect() closes the socket on failures.
88     QUICHE_DCHECK_EQ(descriptor_, kInvalidSocketFd);
89     QUICHE_DCHECK(connect_status_ == ConnectStatus::kNotConnected);
90     return status;
91   }
92 
93   status = socket_api::SetSocketBlocking(descriptor_, /*blocking=*/false);
94   if (!status.ok()) {
95     QUICHE_LOG_FIRST_N(WARNING, 100)
96         << "Failed to return socket to address: " << peer_address_.ToString()
97         << " to non-blocking after connect with error: " << status;
98     Close();
99     connect_status_ = ConnectStatus::kNotConnected;
100   }
101 
102   QUICHE_DCHECK(connect_status_ != ConnectStatus::kConnecting);
103   return status;
104 }
105 
ConnectAsync()106 void EventLoopConnectingClientSocket::ConnectAsync() {
107   QUICHE_DCHECK(async_visitor_);
108   QUICHE_DCHECK_EQ(descriptor_, kInvalidSocketFd);
109   QUICHE_DCHECK(connect_status_ == ConnectStatus::kNotConnected);
110   QUICHE_DCHECK(!receive_max_size_.has_value());
111   QUICHE_DCHECK(absl::holds_alternative<absl::monostate>(send_data_));
112 
113   absl::Status status = Open();
114   if (!status.ok()) {
115     async_visitor_->ConnectComplete(status);
116     return;
117   }
118 
119   FinishOrRearmAsyncConnect(DoInitialConnect());
120 }
121 
Disconnect()122 void EventLoopConnectingClientSocket::Disconnect() {
123   QUICHE_DCHECK_NE(descriptor_, kInvalidSocketFd);
124   QUICHE_DCHECK(connect_status_ != ConnectStatus::kNotConnected);
125 
126   Close();
127   QUICHE_DCHECK_EQ(descriptor_, kInvalidSocketFd);
128 
129   // Reset all state before invoking any callbacks.
130   bool require_connect_callback = connect_status_ == ConnectStatus::kConnecting;
131   connect_status_ = ConnectStatus::kNotConnected;
132   bool require_receive_callback = receive_max_size_.has_value();
133   receive_max_size_.reset();
134   bool require_send_callback =
135       !absl::holds_alternative<absl::monostate>(send_data_);
136   send_data_ = absl::monostate();
137   send_remaining_ = "";
138 
139   if (require_connect_callback) {
140     QUICHE_DCHECK(async_visitor_);
141     async_visitor_->ConnectComplete(absl::CancelledError());
142   }
143   if (require_receive_callback) {
144     QUICHE_DCHECK(async_visitor_);
145     async_visitor_->ReceiveComplete(absl::CancelledError());
146   }
147   if (require_send_callback) {
148     QUICHE_DCHECK(async_visitor_);
149     async_visitor_->SendComplete(absl::CancelledError());
150   }
151 }
152 
153 absl::StatusOr<QuicSocketAddress>
GetLocalAddress()154 EventLoopConnectingClientSocket::GetLocalAddress() {
155   QUICHE_DCHECK_NE(descriptor_, kInvalidSocketFd);
156   QUICHE_DCHECK(connect_status_ == ConnectStatus::kConnected);
157 
158   return socket_api::GetSocketAddress(descriptor_);
159 }
160 
161 absl::StatusOr<quiche::QuicheMemSlice>
ReceiveBlocking(QuicByteCount max_size)162 EventLoopConnectingClientSocket::ReceiveBlocking(QuicByteCount max_size) {
163   QUICHE_DCHECK_GT(max_size, 0u);
164   QUICHE_DCHECK_NE(descriptor_, kInvalidSocketFd);
165   QUICHE_DCHECK(connect_status_ == ConnectStatus::kConnected);
166   QUICHE_DCHECK(!receive_max_size_.has_value());
167 
168   absl::Status status =
169       socket_api::SetSocketBlocking(descriptor_, /*blocking=*/true);
170   if (!status.ok()) {
171     QUICHE_LOG_FIRST_N(WARNING, 100)
172         << "Failed to set socket to address: " << peer_address_.ToString()
173         << " as blocking for receive with error: " << status;
174     return status;
175   }
176 
177   receive_max_size_ = max_size;
178   absl::StatusOr<quiche::QuicheMemSlice> buffer = ReceiveInternal();
179 
180   if (!buffer.ok() && absl::IsUnavailable(buffer.status())) {
181     QUICHE_LOG_FIRST_N(ERROR, 100)
182         << "Non-blocking receive from should-be blocking socket to address:"
183         << peer_address_.ToString() << ".";
184     receive_max_size_.reset();
185   } else {
186     QUICHE_DCHECK(!receive_max_size_.has_value());
187   }
188 
189   absl::Status set_non_blocking_status =
190       socket_api::SetSocketBlocking(descriptor_, /*blocking=*/false);
191   if (!set_non_blocking_status.ok()) {
192     QUICHE_LOG_FIRST_N(WARNING, 100)
193         << "Failed to return socket to address: " << peer_address_.ToString()
194         << " to non-blocking after receive with error: "
195         << set_non_blocking_status;
196     return set_non_blocking_status;
197   }
198 
199   return buffer;
200 }
201 
ReceiveAsync(QuicByteCount max_size)202 void EventLoopConnectingClientSocket::ReceiveAsync(QuicByteCount max_size) {
203   QUICHE_DCHECK(async_visitor_);
204   QUICHE_DCHECK_GT(max_size, 0u);
205   QUICHE_DCHECK_NE(descriptor_, kInvalidSocketFd);
206   QUICHE_DCHECK(connect_status_ == ConnectStatus::kConnected);
207   QUICHE_DCHECK(!receive_max_size_.has_value());
208 
209   receive_max_size_ = max_size;
210 
211   FinishOrRearmAsyncReceive(ReceiveInternal());
212 }
213 
SendBlocking(std::string data)214 absl::Status EventLoopConnectingClientSocket::SendBlocking(std::string data) {
215   QUICHE_DCHECK(!data.empty());
216   QUICHE_DCHECK(absl::holds_alternative<absl::monostate>(send_data_));
217 
218   send_data_ = std::move(data);
219   return SendBlockingInternal();
220 }
221 
SendBlocking(quiche::QuicheMemSlice data)222 absl::Status EventLoopConnectingClientSocket::SendBlocking(
223     quiche::QuicheMemSlice data) {
224   QUICHE_DCHECK(!data.empty());
225   QUICHE_DCHECK(absl::holds_alternative<absl::monostate>(send_data_));
226 
227   send_data_ = std::move(data);
228   return SendBlockingInternal();
229 }
230 
SendAsync(std::string data)231 void EventLoopConnectingClientSocket::SendAsync(std::string data) {
232   QUICHE_DCHECK(!data.empty());
233   QUICHE_DCHECK(absl::holds_alternative<absl::monostate>(send_data_));
234 
235   send_data_ = std::move(data);
236   send_remaining_ = absl::get<std::string>(send_data_);
237 
238   FinishOrRearmAsyncSend(SendInternal());
239 }
240 
SendAsync(quiche::QuicheMemSlice data)241 void EventLoopConnectingClientSocket::SendAsync(quiche::QuicheMemSlice data) {
242   QUICHE_DCHECK(!data.empty());
243   QUICHE_DCHECK(absl::holds_alternative<absl::monostate>(send_data_));
244 
245   send_data_ = std::move(data);
246   send_remaining_ =
247       absl::get<quiche::QuicheMemSlice>(send_data_).AsStringView();
248 
249   FinishOrRearmAsyncSend(SendInternal());
250 }
251 
OnSocketEvent(QuicEventLoop * event_loop,SocketFd fd,QuicSocketEventMask events)252 void EventLoopConnectingClientSocket::OnSocketEvent(
253     QuicEventLoop* event_loop, SocketFd fd, QuicSocketEventMask events) {
254   QUICHE_DCHECK_EQ(event_loop, event_loop_);
255   QUICHE_DCHECK_EQ(fd, descriptor_);
256 
257   if (connect_status_ == ConnectStatus::kConnecting &&
258       (events & (kSocketEventWritable | kSocketEventError))) {
259     FinishOrRearmAsyncConnect(GetConnectResult());
260     return;
261   }
262 
263   if (receive_max_size_.has_value() &&
264       (events & (kSocketEventReadable | kSocketEventError))) {
265     FinishOrRearmAsyncReceive(ReceiveInternal());
266   }
267   if (!send_remaining_.empty() &&
268       (events & (kSocketEventWritable | kSocketEventError))) {
269     FinishOrRearmAsyncSend(SendInternal());
270   }
271 }
272 
Open()273 absl::Status EventLoopConnectingClientSocket::Open() {
274   QUICHE_DCHECK_EQ(descriptor_, kInvalidSocketFd);
275   QUICHE_DCHECK(connect_status_ == ConnectStatus::kNotConnected);
276   QUICHE_DCHECK(!receive_max_size_.has_value());
277   QUICHE_DCHECK(absl::holds_alternative<absl::monostate>(send_data_));
278   QUICHE_DCHECK(send_remaining_.empty());
279 
280   absl::StatusOr<SocketFd> descriptor =
281       socket_api::CreateSocket(peer_address_.host().address_family(), protocol_,
282                                /*blocking=*/false);
283   if (!descriptor.ok()) {
284     QUICHE_DVLOG(1) << "Failed to open socket for connection to address: "
285                     << peer_address_.ToString()
286                     << " with error: " << descriptor.status();
287     return descriptor.status();
288   }
289   QUICHE_DCHECK_NE(*descriptor, kInvalidSocketFd);
290 
291   descriptor_ = *descriptor;
292 
293   if (async_visitor_) {
294     bool registered;
295     if (event_loop_->SupportsEdgeTriggered()) {
296       registered = event_loop_->RegisterSocket(
297           descriptor_,
298           kSocketEventReadable | kSocketEventWritable | kSocketEventError,
299           this);
300     } else {
301       // Just register the socket without any armed events for now.  Will rearm
302       // with specific events as needed.  Registering now before events are
303       // needed makes it easier to ensure the socket is registered only once
304       // and can always be unregistered on socket close.
305       registered = event_loop_->RegisterSocket(descriptor_, /*events=*/0, this);
306     }
307     QUICHE_DCHECK(registered);
308   }
309 
310   if (receive_buffer_size_ != 0) {
311     absl::Status status =
312         socket_api::SetReceiveBufferSize(descriptor_, receive_buffer_size_);
313     if (!status.ok()) {
314       QUICHE_LOG_FIRST_N(WARNING, 100)
315           << "Failed to set receive buffer size to: " << receive_buffer_size_
316           << " for socket to address: " << peer_address_.ToString()
317           << " with error: " << status;
318       Close();
319       return status;
320     }
321   }
322 
323   if (send_buffer_size_ != 0) {
324     absl::Status status =
325         socket_api::SetSendBufferSize(descriptor_, send_buffer_size_);
326     if (!status.ok()) {
327       QUICHE_LOG_FIRST_N(WARNING, 100)
328           << "Failed to set send buffer size to: " << send_buffer_size_
329           << " for socket to address: " << peer_address_.ToString()
330           << " with error: " << status;
331       Close();
332       return status;
333     }
334   }
335 
336   return absl::OkStatus();
337 }
338 
Close()339 void EventLoopConnectingClientSocket::Close() {
340   QUICHE_DCHECK_NE(descriptor_, kInvalidSocketFd);
341 
342   bool unregistered = event_loop_->UnregisterSocket(descriptor_);
343   QUICHE_DCHECK_EQ(unregistered, !!async_visitor_);
344 
345   absl::Status status = socket_api::Close(descriptor_);
346   if (!status.ok()) {
347     QUICHE_LOG_FIRST_N(WARNING, 100)
348         << "Could not close socket to address: " << peer_address_.ToString()
349         << " with error: " << status;
350   }
351 
352   descriptor_ = kInvalidSocketFd;
353 }
354 
DoInitialConnect()355 absl::Status EventLoopConnectingClientSocket::DoInitialConnect() {
356   QUICHE_DCHECK_NE(descriptor_, kInvalidSocketFd);
357   QUICHE_DCHECK(connect_status_ == ConnectStatus::kNotConnected);
358   QUICHE_DCHECK(!receive_max_size_.has_value());
359   QUICHE_DCHECK(absl::holds_alternative<absl::monostate>(send_data_));
360 
361   absl::Status connect_result = socket_api::Connect(descriptor_, peer_address_);
362 
363   if (connect_result.ok()) {
364     connect_status_ = ConnectStatus::kConnected;
365   } else if (absl::IsUnavailable(connect_result)) {
366     connect_status_ = ConnectStatus::kConnecting;
367   } else {
368     QUICHE_DVLOG(1) << "Synchronously failed to connect socket to address: "
369                     << peer_address_.ToString()
370                     << " with error: " << connect_result;
371     Close();
372     connect_status_ = ConnectStatus::kNotConnected;
373   }
374 
375   return connect_result;
376 }
377 
GetConnectResult()378 absl::Status EventLoopConnectingClientSocket::GetConnectResult() {
379   QUICHE_DCHECK_NE(descriptor_, kInvalidSocketFd);
380   QUICHE_DCHECK(connect_status_ == ConnectStatus::kConnecting);
381   QUICHE_DCHECK(!receive_max_size_.has_value());
382   QUICHE_DCHECK(absl::holds_alternative<absl::monostate>(send_data_));
383 
384   absl::Status error = socket_api::GetSocketError(descriptor_);
385 
386   if (!error.ok()) {
387     QUICHE_DVLOG(1) << "Asynchronously failed to connect socket to address: "
388                     << peer_address_.ToString() << " with error: " << error;
389     Close();
390     connect_status_ = ConnectStatus::kNotConnected;
391     return error;
392   }
393 
394   // Peek at one byte to confirm the connection is actually alive. Motivation:
395   // 1) Plausibly could have a lot of cases where the connection operation
396   //    itself technically succeeds but the socket then quickly fails.  Don't
397   //    want to claim connection success here if, by the time this code is
398   //    running after event triggers and such, the socket has already failed.
399   //    Lot of undefined room around whether or not such errors would be saved
400   //    into SO_ERROR and returned by socket_api::GetSocketError().
401   // 2) With the various platforms and event systems involved, less than 100%
402   //    trust that it's impossible to end up in this method before the async
403   //    connect has completed/errored. Given that Connect() and GetSocketError()
404   //    does not difinitevely differentiate between success and
405   //    still-in-progress, and given that there's a very simple and performant
406   //    way to positively confirm the socket is connected (peek), do that here.
407   //    (Could consider making the not-connected case a QUIC_BUG if a way is
408   //    found to differentiate it from (1).)
409   absl::StatusOr<bool> peek_data = OneBytePeek();
410   if (peek_data.ok() || absl::IsUnavailable(peek_data.status())) {
411     connect_status_ = ConnectStatus::kConnected;
412   } else {
413     error = peek_data.status();
414     QUICHE_LOG_FIRST_N(WARNING, 100)
415         << "Socket to address: " << peer_address_.ToString()
416         << " signalled writable after connect and no connect error found, "
417            "but socket does not appear connected with error: "
418         << error;
419     Close();
420     connect_status_ = ConnectStatus::kNotConnected;
421   }
422 
423   return error;
424 }
425 
FinishOrRearmAsyncConnect(absl::Status status)426 void EventLoopConnectingClientSocket::FinishOrRearmAsyncConnect(
427     absl::Status status) {
428   if (absl::IsUnavailable(status)) {
429     if (!event_loop_->SupportsEdgeTriggered()) {
430       bool result = event_loop_->RearmSocket(
431           descriptor_, kSocketEventWritable | kSocketEventError);
432       QUICHE_DCHECK(result);
433     }
434     QUICHE_DCHECK(connect_status_ == ConnectStatus::kConnecting);
435   } else {
436     QUICHE_DCHECK(connect_status_ != ConnectStatus::kConnecting);
437     async_visitor_->ConnectComplete(status);
438   }
439 }
440 
441 absl::StatusOr<quiche::QuicheMemSlice>
ReceiveInternal()442 EventLoopConnectingClientSocket::ReceiveInternal() {
443   QUICHE_DCHECK_NE(descriptor_, kInvalidSocketFd);
444   QUICHE_DCHECK(connect_status_ == ConnectStatus::kConnected);
445   QUICHE_CHECK(receive_max_size_.has_value());
446   QUICHE_DCHECK_GE(*receive_max_size_, 1u);
447   QUICHE_DCHECK_LE(*receive_max_size_, std::numeric_limits<size_t>::max());
448 
449   // Before allocating a buffer, do a 1-byte peek to determine if needed.
450   if (*receive_max_size_ > 1) {
451     absl::StatusOr<bool> peek_data = OneBytePeek();
452     if (!peek_data.ok()) {
453       if (!absl::IsUnavailable(peek_data.status())) {
454         receive_max_size_.reset();
455       }
456       return peek_data.status();
457     } else if (!*peek_data) {
458       receive_max_size_.reset();
459       return quiche::QuicheMemSlice();
460     }
461   }
462 
463   quiche::QuicheBuffer buffer(buffer_allocator_, *receive_max_size_);
464   absl::StatusOr<absl::Span<char>> received = socket_api::Receive(
465       descriptor_, absl::MakeSpan(buffer.data(), buffer.size()));
466 
467   if (received.ok()) {
468     QUICHE_DCHECK_LE(received->size(), buffer.size());
469     QUICHE_DCHECK_EQ(received->data(), buffer.data());
470 
471     receive_max_size_.reset();
472     return quiche::QuicheMemSlice(
473         quiche::QuicheBuffer(buffer.Release(), received->size()));
474   } else {
475     if (!absl::IsUnavailable(received.status())) {
476       QUICHE_DVLOG(1) << "Failed to receive from socket to address: "
477                       << peer_address_.ToString()
478                       << " with error: " << received.status();
479       receive_max_size_.reset();
480     }
481     return received.status();
482   }
483 }
484 
FinishOrRearmAsyncReceive(absl::StatusOr<quiche::QuicheMemSlice> buffer)485 void EventLoopConnectingClientSocket::FinishOrRearmAsyncReceive(
486     absl::StatusOr<quiche::QuicheMemSlice> buffer) {
487   QUICHE_DCHECK(async_visitor_);
488   QUICHE_DCHECK(connect_status_ == ConnectStatus::kConnected);
489 
490   if (!buffer.ok() && absl::IsUnavailable(buffer.status())) {
491     if (!event_loop_->SupportsEdgeTriggered()) {
492       bool result = event_loop_->RearmSocket(
493           descriptor_, kSocketEventReadable | kSocketEventError);
494       QUICHE_DCHECK(result);
495     }
496     QUICHE_DCHECK(receive_max_size_.has_value());
497   } else {
498     QUICHE_DCHECK(!receive_max_size_.has_value());
499     async_visitor_->ReceiveComplete(std::move(buffer));
500   }
501 }
502 
OneBytePeek()503 absl::StatusOr<bool> EventLoopConnectingClientSocket::OneBytePeek() {
504   QUICHE_DCHECK_NE(descriptor_, kInvalidSocketFd);
505 
506   char peek_buffer;
507   absl::StatusOr<absl::Span<char>> peek_received = socket_api::Receive(
508       descriptor_, absl::MakeSpan(&peek_buffer, /*size=*/1), /*peek=*/true);
509   if (!peek_received.ok()) {
510     return peek_received.status();
511   } else {
512     return !peek_received->empty();
513   }
514 }
515 
SendBlockingInternal()516 absl::Status EventLoopConnectingClientSocket::SendBlockingInternal() {
517   QUICHE_DCHECK_NE(descriptor_, kInvalidSocketFd);
518   QUICHE_DCHECK(connect_status_ == ConnectStatus::kConnected);
519   QUICHE_DCHECK(!absl::holds_alternative<absl::monostate>(send_data_));
520   QUICHE_DCHECK(send_remaining_.empty());
521 
522   absl::Status status =
523       socket_api::SetSocketBlocking(descriptor_, /*blocking=*/true);
524   if (!status.ok()) {
525     QUICHE_LOG_FIRST_N(WARNING, 100)
526         << "Failed to set socket to address: " << peer_address_.ToString()
527         << " as blocking for send with error: " << status;
528     send_data_ = absl::monostate();
529     return status;
530   }
531 
532   if (absl::holds_alternative<std::string>(send_data_)) {
533     send_remaining_ = absl::get<std::string>(send_data_);
534   } else {
535     send_remaining_ =
536         absl::get<quiche::QuicheMemSlice>(send_data_).AsStringView();
537   }
538 
539   status = SendInternal();
540   if (absl::IsUnavailable(status)) {
541     QUICHE_LOG_FIRST_N(ERROR, 100)
542         << "Non-blocking send for should-be blocking socket to address:"
543         << peer_address_.ToString();
544     send_data_ = absl::monostate();
545     send_remaining_ = "";
546   } else {
547     QUICHE_DCHECK(absl::holds_alternative<absl::monostate>(send_data_));
548     QUICHE_DCHECK(send_remaining_.empty());
549   }
550 
551   absl::Status set_non_blocking_status =
552       socket_api::SetSocketBlocking(descriptor_, /*blocking=*/false);
553   if (!set_non_blocking_status.ok()) {
554     QUICHE_LOG_FIRST_N(WARNING, 100)
555         << "Failed to return socket to address: " << peer_address_.ToString()
556         << " to non-blocking after send with error: "
557         << set_non_blocking_status;
558     return set_non_blocking_status;
559   }
560 
561   return status;
562 }
563 
SendInternal()564 absl::Status EventLoopConnectingClientSocket::SendInternal() {
565   QUICHE_DCHECK_NE(descriptor_, kInvalidSocketFd);
566   QUICHE_DCHECK(connect_status_ == ConnectStatus::kConnected);
567   QUICHE_DCHECK(!absl::holds_alternative<absl::monostate>(send_data_));
568   QUICHE_DCHECK(!send_remaining_.empty());
569 
570   // Repeat send until all data sent, unavailable, or error.
571   while (!send_remaining_.empty()) {
572     absl::StatusOr<absl::string_view> remainder =
573         socket_api::Send(descriptor_, send_remaining_);
574 
575     if (remainder.ok()) {
576       QUICHE_DCHECK(remainder->empty() ||
577                     (remainder->data() >= send_remaining_.data() &&
578                      remainder->data() <
579                          send_remaining_.data() + send_remaining_.size()));
580       QUICHE_DCHECK(remainder->empty() ||
581                     (remainder->data() + remainder->size() ==
582                      send_remaining_.data() + send_remaining_.size()));
583       send_remaining_ = *remainder;
584     } else {
585       if (!absl::IsUnavailable(remainder.status())) {
586         QUICHE_DVLOG(1) << "Failed to send to socket to address: "
587                         << peer_address_.ToString()
588                         << " with error: " << remainder.status();
589         send_data_ = absl::monostate();
590         send_remaining_ = "";
591       }
592       return remainder.status();
593     }
594   }
595 
596   send_data_ = absl::monostate();
597   return absl::OkStatus();
598 }
599 
FinishOrRearmAsyncSend(absl::Status status)600 void EventLoopConnectingClientSocket::FinishOrRearmAsyncSend(
601     absl::Status status) {
602   QUICHE_DCHECK(async_visitor_);
603   QUICHE_DCHECK(connect_status_ == ConnectStatus::kConnected);
604 
605   if (absl::IsUnavailable(status)) {
606     if (!event_loop_->SupportsEdgeTriggered()) {
607       bool result = event_loop_->RearmSocket(
608           descriptor_, kSocketEventWritable | kSocketEventError);
609       QUICHE_DCHECK(result);
610     }
611     QUICHE_DCHECK(!absl::holds_alternative<absl::monostate>(send_data_));
612     QUICHE_DCHECK(!send_remaining_.empty());
613   } else {
614     QUICHE_DCHECK(absl::holds_alternative<absl::monostate>(send_data_));
615     QUICHE_DCHECK(send_remaining_.empty());
616     async_visitor_->SendComplete(status);
617   }
618 }
619 
620 }  // namespace quic
621