1 // Copyright 2022 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "quiche/quic/core/io/event_loop_connecting_client_socket.h"
6
7 #include <limits>
8 #include <string>
9 #include <utility>
10
11 #include "absl/status/status.h"
12 #include "absl/status/statusor.h"
13 #include "absl/strings/string_view.h"
14 #include "absl/types/span.h"
15 #include "absl/types/variant.h"
16 #include "quiche/quic/core/io/quic_event_loop.h"
17 #include "quiche/quic/core/io/socket.h"
18 #include "quiche/quic/platform/api/quic_socket_address.h"
19 #include "quiche/common/platform/api/quiche_logging.h"
20 #include "quiche/common/platform/api/quiche_mem_slice.h"
21
22 namespace quic {
23
EventLoopConnectingClientSocket(socket_api::SocketProtocol protocol,const quic::QuicSocketAddress & peer_address,QuicByteCount receive_buffer_size,QuicByteCount send_buffer_size,QuicEventLoop * event_loop,quiche::QuicheBufferAllocator * buffer_allocator,AsyncVisitor * async_visitor)24 EventLoopConnectingClientSocket::EventLoopConnectingClientSocket(
25 socket_api::SocketProtocol protocol,
26 const quic::QuicSocketAddress& peer_address,
27 QuicByteCount receive_buffer_size, QuicByteCount send_buffer_size,
28 QuicEventLoop* event_loop, quiche::QuicheBufferAllocator* buffer_allocator,
29 AsyncVisitor* async_visitor)
30 : protocol_(protocol),
31 peer_address_(peer_address),
32 receive_buffer_size_(receive_buffer_size),
33 send_buffer_size_(send_buffer_size),
34 event_loop_(event_loop),
35 buffer_allocator_(buffer_allocator),
36 async_visitor_(async_visitor) {
37 QUICHE_DCHECK(event_loop_);
38 QUICHE_DCHECK(buffer_allocator_);
39 }
40
~EventLoopConnectingClientSocket()41 EventLoopConnectingClientSocket::~EventLoopConnectingClientSocket() {
42 // Connected socket must be closed via Disconnect() before destruction. Cannot
43 // safely recover if state indicates caller may be expecting async callbacks.
44 QUICHE_DCHECK(connect_status_ != ConnectStatus::kConnecting);
45 QUICHE_DCHECK(!receive_max_size_.has_value());
46 QUICHE_DCHECK(absl::holds_alternative<absl::monostate>(send_data_));
47 if (descriptor_ != kInvalidSocketFd) {
48 QUICHE_BUG(quic_event_loop_connecting_socket_invalid_destruction)
49 << "Must call Disconnect() on connected socket before destruction.";
50 Close();
51 }
52
53 QUICHE_DCHECK(connect_status_ == ConnectStatus::kNotConnected);
54 QUICHE_DCHECK(send_remaining_.empty());
55 }
56
ConnectBlocking()57 absl::Status EventLoopConnectingClientSocket::ConnectBlocking() {
58 QUICHE_DCHECK_EQ(descriptor_, kInvalidSocketFd);
59 QUICHE_DCHECK(connect_status_ == ConnectStatus::kNotConnected);
60 QUICHE_DCHECK(!receive_max_size_.has_value());
61 QUICHE_DCHECK(absl::holds_alternative<absl::monostate>(send_data_));
62
63 absl::Status status = Open();
64 if (!status.ok()) {
65 return status;
66 }
67
68 status = socket_api::SetSocketBlocking(descriptor_, /*blocking=*/true);
69 if (!status.ok()) {
70 QUICHE_LOG_FIRST_N(WARNING, 100)
71 << "Failed to set socket to address: " << peer_address_.ToString()
72 << " as blocking for connect with error: " << status;
73 Close();
74 return status;
75 }
76
77 status = DoInitialConnect();
78
79 if (absl::IsUnavailable(status)) {
80 QUICHE_LOG_FIRST_N(ERROR, 100)
81 << "Non-blocking connect to should-be blocking socket to address:"
82 << peer_address_.ToString() << ".";
83 Close();
84 connect_status_ = ConnectStatus::kNotConnected;
85 return status;
86 } else if (!status.ok()) {
87 // DoInitialConnect() closes the socket on failures.
88 QUICHE_DCHECK_EQ(descriptor_, kInvalidSocketFd);
89 QUICHE_DCHECK(connect_status_ == ConnectStatus::kNotConnected);
90 return status;
91 }
92
93 status = socket_api::SetSocketBlocking(descriptor_, /*blocking=*/false);
94 if (!status.ok()) {
95 QUICHE_LOG_FIRST_N(WARNING, 100)
96 << "Failed to return socket to address: " << peer_address_.ToString()
97 << " to non-blocking after connect with error: " << status;
98 Close();
99 connect_status_ = ConnectStatus::kNotConnected;
100 }
101
102 QUICHE_DCHECK(connect_status_ != ConnectStatus::kConnecting);
103 return status;
104 }
105
ConnectAsync()106 void EventLoopConnectingClientSocket::ConnectAsync() {
107 QUICHE_DCHECK(async_visitor_);
108 QUICHE_DCHECK_EQ(descriptor_, kInvalidSocketFd);
109 QUICHE_DCHECK(connect_status_ == ConnectStatus::kNotConnected);
110 QUICHE_DCHECK(!receive_max_size_.has_value());
111 QUICHE_DCHECK(absl::holds_alternative<absl::monostate>(send_data_));
112
113 absl::Status status = Open();
114 if (!status.ok()) {
115 async_visitor_->ConnectComplete(status);
116 return;
117 }
118
119 FinishOrRearmAsyncConnect(DoInitialConnect());
120 }
121
Disconnect()122 void EventLoopConnectingClientSocket::Disconnect() {
123 QUICHE_DCHECK_NE(descriptor_, kInvalidSocketFd);
124 QUICHE_DCHECK(connect_status_ != ConnectStatus::kNotConnected);
125
126 Close();
127 QUICHE_DCHECK_EQ(descriptor_, kInvalidSocketFd);
128
129 // Reset all state before invoking any callbacks.
130 bool require_connect_callback = connect_status_ == ConnectStatus::kConnecting;
131 connect_status_ = ConnectStatus::kNotConnected;
132 bool require_receive_callback = receive_max_size_.has_value();
133 receive_max_size_.reset();
134 bool require_send_callback =
135 !absl::holds_alternative<absl::monostate>(send_data_);
136 send_data_ = absl::monostate();
137 send_remaining_ = "";
138
139 if (require_connect_callback) {
140 QUICHE_DCHECK(async_visitor_);
141 async_visitor_->ConnectComplete(absl::CancelledError());
142 }
143 if (require_receive_callback) {
144 QUICHE_DCHECK(async_visitor_);
145 async_visitor_->ReceiveComplete(absl::CancelledError());
146 }
147 if (require_send_callback) {
148 QUICHE_DCHECK(async_visitor_);
149 async_visitor_->SendComplete(absl::CancelledError());
150 }
151 }
152
153 absl::StatusOr<QuicSocketAddress>
GetLocalAddress()154 EventLoopConnectingClientSocket::GetLocalAddress() {
155 QUICHE_DCHECK_NE(descriptor_, kInvalidSocketFd);
156 QUICHE_DCHECK(connect_status_ == ConnectStatus::kConnected);
157
158 return socket_api::GetSocketAddress(descriptor_);
159 }
160
161 absl::StatusOr<quiche::QuicheMemSlice>
ReceiveBlocking(QuicByteCount max_size)162 EventLoopConnectingClientSocket::ReceiveBlocking(QuicByteCount max_size) {
163 QUICHE_DCHECK_GT(max_size, 0u);
164 QUICHE_DCHECK_NE(descriptor_, kInvalidSocketFd);
165 QUICHE_DCHECK(connect_status_ == ConnectStatus::kConnected);
166 QUICHE_DCHECK(!receive_max_size_.has_value());
167
168 absl::Status status =
169 socket_api::SetSocketBlocking(descriptor_, /*blocking=*/true);
170 if (!status.ok()) {
171 QUICHE_LOG_FIRST_N(WARNING, 100)
172 << "Failed to set socket to address: " << peer_address_.ToString()
173 << " as blocking for receive with error: " << status;
174 return status;
175 }
176
177 receive_max_size_ = max_size;
178 absl::StatusOr<quiche::QuicheMemSlice> buffer = ReceiveInternal();
179
180 if (!buffer.ok() && absl::IsUnavailable(buffer.status())) {
181 QUICHE_LOG_FIRST_N(ERROR, 100)
182 << "Non-blocking receive from should-be blocking socket to address:"
183 << peer_address_.ToString() << ".";
184 receive_max_size_.reset();
185 } else {
186 QUICHE_DCHECK(!receive_max_size_.has_value());
187 }
188
189 absl::Status set_non_blocking_status =
190 socket_api::SetSocketBlocking(descriptor_, /*blocking=*/false);
191 if (!set_non_blocking_status.ok()) {
192 QUICHE_LOG_FIRST_N(WARNING, 100)
193 << "Failed to return socket to address: " << peer_address_.ToString()
194 << " to non-blocking after receive with error: "
195 << set_non_blocking_status;
196 return set_non_blocking_status;
197 }
198
199 return buffer;
200 }
201
ReceiveAsync(QuicByteCount max_size)202 void EventLoopConnectingClientSocket::ReceiveAsync(QuicByteCount max_size) {
203 QUICHE_DCHECK(async_visitor_);
204 QUICHE_DCHECK_GT(max_size, 0u);
205 QUICHE_DCHECK_NE(descriptor_, kInvalidSocketFd);
206 QUICHE_DCHECK(connect_status_ == ConnectStatus::kConnected);
207 QUICHE_DCHECK(!receive_max_size_.has_value());
208
209 receive_max_size_ = max_size;
210
211 FinishOrRearmAsyncReceive(ReceiveInternal());
212 }
213
SendBlocking(std::string data)214 absl::Status EventLoopConnectingClientSocket::SendBlocking(std::string data) {
215 QUICHE_DCHECK(!data.empty());
216 QUICHE_DCHECK(absl::holds_alternative<absl::monostate>(send_data_));
217
218 send_data_ = std::move(data);
219 return SendBlockingInternal();
220 }
221
SendBlocking(quiche::QuicheMemSlice data)222 absl::Status EventLoopConnectingClientSocket::SendBlocking(
223 quiche::QuicheMemSlice data) {
224 QUICHE_DCHECK(!data.empty());
225 QUICHE_DCHECK(absl::holds_alternative<absl::monostate>(send_data_));
226
227 send_data_ = std::move(data);
228 return SendBlockingInternal();
229 }
230
SendAsync(std::string data)231 void EventLoopConnectingClientSocket::SendAsync(std::string data) {
232 QUICHE_DCHECK(!data.empty());
233 QUICHE_DCHECK(absl::holds_alternative<absl::monostate>(send_data_));
234
235 send_data_ = std::move(data);
236 send_remaining_ = absl::get<std::string>(send_data_);
237
238 FinishOrRearmAsyncSend(SendInternal());
239 }
240
SendAsync(quiche::QuicheMemSlice data)241 void EventLoopConnectingClientSocket::SendAsync(quiche::QuicheMemSlice data) {
242 QUICHE_DCHECK(!data.empty());
243 QUICHE_DCHECK(absl::holds_alternative<absl::monostate>(send_data_));
244
245 send_data_ = std::move(data);
246 send_remaining_ =
247 absl::get<quiche::QuicheMemSlice>(send_data_).AsStringView();
248
249 FinishOrRearmAsyncSend(SendInternal());
250 }
251
OnSocketEvent(QuicEventLoop * event_loop,SocketFd fd,QuicSocketEventMask events)252 void EventLoopConnectingClientSocket::OnSocketEvent(
253 QuicEventLoop* event_loop, SocketFd fd, QuicSocketEventMask events) {
254 QUICHE_DCHECK_EQ(event_loop, event_loop_);
255 QUICHE_DCHECK_EQ(fd, descriptor_);
256
257 if (connect_status_ == ConnectStatus::kConnecting &&
258 (events & (kSocketEventWritable | kSocketEventError))) {
259 FinishOrRearmAsyncConnect(GetConnectResult());
260 return;
261 }
262
263 if (receive_max_size_.has_value() &&
264 (events & (kSocketEventReadable | kSocketEventError))) {
265 FinishOrRearmAsyncReceive(ReceiveInternal());
266 }
267 if (!send_remaining_.empty() &&
268 (events & (kSocketEventWritable | kSocketEventError))) {
269 FinishOrRearmAsyncSend(SendInternal());
270 }
271 }
272
Open()273 absl::Status EventLoopConnectingClientSocket::Open() {
274 QUICHE_DCHECK_EQ(descriptor_, kInvalidSocketFd);
275 QUICHE_DCHECK(connect_status_ == ConnectStatus::kNotConnected);
276 QUICHE_DCHECK(!receive_max_size_.has_value());
277 QUICHE_DCHECK(absl::holds_alternative<absl::monostate>(send_data_));
278 QUICHE_DCHECK(send_remaining_.empty());
279
280 absl::StatusOr<SocketFd> descriptor =
281 socket_api::CreateSocket(peer_address_.host().address_family(), protocol_,
282 /*blocking=*/false);
283 if (!descriptor.ok()) {
284 QUICHE_DVLOG(1) << "Failed to open socket for connection to address: "
285 << peer_address_.ToString()
286 << " with error: " << descriptor.status();
287 return descriptor.status();
288 }
289 QUICHE_DCHECK_NE(*descriptor, kInvalidSocketFd);
290
291 descriptor_ = *descriptor;
292
293 if (async_visitor_) {
294 bool registered;
295 if (event_loop_->SupportsEdgeTriggered()) {
296 registered = event_loop_->RegisterSocket(
297 descriptor_,
298 kSocketEventReadable | kSocketEventWritable | kSocketEventError,
299 this);
300 } else {
301 // Just register the socket without any armed events for now. Will rearm
302 // with specific events as needed. Registering now before events are
303 // needed makes it easier to ensure the socket is registered only once
304 // and can always be unregistered on socket close.
305 registered = event_loop_->RegisterSocket(descriptor_, /*events=*/0, this);
306 }
307 QUICHE_DCHECK(registered);
308 }
309
310 if (receive_buffer_size_ != 0) {
311 absl::Status status =
312 socket_api::SetReceiveBufferSize(descriptor_, receive_buffer_size_);
313 if (!status.ok()) {
314 QUICHE_LOG_FIRST_N(WARNING, 100)
315 << "Failed to set receive buffer size to: " << receive_buffer_size_
316 << " for socket to address: " << peer_address_.ToString()
317 << " with error: " << status;
318 Close();
319 return status;
320 }
321 }
322
323 if (send_buffer_size_ != 0) {
324 absl::Status status =
325 socket_api::SetSendBufferSize(descriptor_, send_buffer_size_);
326 if (!status.ok()) {
327 QUICHE_LOG_FIRST_N(WARNING, 100)
328 << "Failed to set send buffer size to: " << send_buffer_size_
329 << " for socket to address: " << peer_address_.ToString()
330 << " with error: " << status;
331 Close();
332 return status;
333 }
334 }
335
336 return absl::OkStatus();
337 }
338
Close()339 void EventLoopConnectingClientSocket::Close() {
340 QUICHE_DCHECK_NE(descriptor_, kInvalidSocketFd);
341
342 bool unregistered = event_loop_->UnregisterSocket(descriptor_);
343 QUICHE_DCHECK_EQ(unregistered, !!async_visitor_);
344
345 absl::Status status = socket_api::Close(descriptor_);
346 if (!status.ok()) {
347 QUICHE_LOG_FIRST_N(WARNING, 100)
348 << "Could not close socket to address: " << peer_address_.ToString()
349 << " with error: " << status;
350 }
351
352 descriptor_ = kInvalidSocketFd;
353 }
354
DoInitialConnect()355 absl::Status EventLoopConnectingClientSocket::DoInitialConnect() {
356 QUICHE_DCHECK_NE(descriptor_, kInvalidSocketFd);
357 QUICHE_DCHECK(connect_status_ == ConnectStatus::kNotConnected);
358 QUICHE_DCHECK(!receive_max_size_.has_value());
359 QUICHE_DCHECK(absl::holds_alternative<absl::monostate>(send_data_));
360
361 absl::Status connect_result = socket_api::Connect(descriptor_, peer_address_);
362
363 if (connect_result.ok()) {
364 connect_status_ = ConnectStatus::kConnected;
365 } else if (absl::IsUnavailable(connect_result)) {
366 connect_status_ = ConnectStatus::kConnecting;
367 } else {
368 QUICHE_DVLOG(1) << "Synchronously failed to connect socket to address: "
369 << peer_address_.ToString()
370 << " with error: " << connect_result;
371 Close();
372 connect_status_ = ConnectStatus::kNotConnected;
373 }
374
375 return connect_result;
376 }
377
GetConnectResult()378 absl::Status EventLoopConnectingClientSocket::GetConnectResult() {
379 QUICHE_DCHECK_NE(descriptor_, kInvalidSocketFd);
380 QUICHE_DCHECK(connect_status_ == ConnectStatus::kConnecting);
381 QUICHE_DCHECK(!receive_max_size_.has_value());
382 QUICHE_DCHECK(absl::holds_alternative<absl::monostate>(send_data_));
383
384 absl::Status error = socket_api::GetSocketError(descriptor_);
385
386 if (!error.ok()) {
387 QUICHE_DVLOG(1) << "Asynchronously failed to connect socket to address: "
388 << peer_address_.ToString() << " with error: " << error;
389 Close();
390 connect_status_ = ConnectStatus::kNotConnected;
391 return error;
392 }
393
394 // Peek at one byte to confirm the connection is actually alive. Motivation:
395 // 1) Plausibly could have a lot of cases where the connection operation
396 // itself technically succeeds but the socket then quickly fails. Don't
397 // want to claim connection success here if, by the time this code is
398 // running after event triggers and such, the socket has already failed.
399 // Lot of undefined room around whether or not such errors would be saved
400 // into SO_ERROR and returned by socket_api::GetSocketError().
401 // 2) With the various platforms and event systems involved, less than 100%
402 // trust that it's impossible to end up in this method before the async
403 // connect has completed/errored. Given that Connect() and GetSocketError()
404 // does not difinitevely differentiate between success and
405 // still-in-progress, and given that there's a very simple and performant
406 // way to positively confirm the socket is connected (peek), do that here.
407 // (Could consider making the not-connected case a QUIC_BUG if a way is
408 // found to differentiate it from (1).)
409 absl::StatusOr<bool> peek_data = OneBytePeek();
410 if (peek_data.ok() || absl::IsUnavailable(peek_data.status())) {
411 connect_status_ = ConnectStatus::kConnected;
412 } else {
413 error = peek_data.status();
414 QUICHE_LOG_FIRST_N(WARNING, 100)
415 << "Socket to address: " << peer_address_.ToString()
416 << " signalled writable after connect and no connect error found, "
417 "but socket does not appear connected with error: "
418 << error;
419 Close();
420 connect_status_ = ConnectStatus::kNotConnected;
421 }
422
423 return error;
424 }
425
FinishOrRearmAsyncConnect(absl::Status status)426 void EventLoopConnectingClientSocket::FinishOrRearmAsyncConnect(
427 absl::Status status) {
428 if (absl::IsUnavailable(status)) {
429 if (!event_loop_->SupportsEdgeTriggered()) {
430 bool result = event_loop_->RearmSocket(
431 descriptor_, kSocketEventWritable | kSocketEventError);
432 QUICHE_DCHECK(result);
433 }
434 QUICHE_DCHECK(connect_status_ == ConnectStatus::kConnecting);
435 } else {
436 QUICHE_DCHECK(connect_status_ != ConnectStatus::kConnecting);
437 async_visitor_->ConnectComplete(status);
438 }
439 }
440
441 absl::StatusOr<quiche::QuicheMemSlice>
ReceiveInternal()442 EventLoopConnectingClientSocket::ReceiveInternal() {
443 QUICHE_DCHECK_NE(descriptor_, kInvalidSocketFd);
444 QUICHE_DCHECK(connect_status_ == ConnectStatus::kConnected);
445 QUICHE_CHECK(receive_max_size_.has_value());
446 QUICHE_DCHECK_GE(*receive_max_size_, 1u);
447 QUICHE_DCHECK_LE(*receive_max_size_, std::numeric_limits<size_t>::max());
448
449 // Before allocating a buffer, do a 1-byte peek to determine if needed.
450 if (*receive_max_size_ > 1) {
451 absl::StatusOr<bool> peek_data = OneBytePeek();
452 if (!peek_data.ok()) {
453 if (!absl::IsUnavailable(peek_data.status())) {
454 receive_max_size_.reset();
455 }
456 return peek_data.status();
457 } else if (!*peek_data) {
458 receive_max_size_.reset();
459 return quiche::QuicheMemSlice();
460 }
461 }
462
463 quiche::QuicheBuffer buffer(buffer_allocator_, *receive_max_size_);
464 absl::StatusOr<absl::Span<char>> received = socket_api::Receive(
465 descriptor_, absl::MakeSpan(buffer.data(), buffer.size()));
466
467 if (received.ok()) {
468 QUICHE_DCHECK_LE(received->size(), buffer.size());
469 QUICHE_DCHECK_EQ(received->data(), buffer.data());
470
471 receive_max_size_.reset();
472 return quiche::QuicheMemSlice(
473 quiche::QuicheBuffer(buffer.Release(), received->size()));
474 } else {
475 if (!absl::IsUnavailable(received.status())) {
476 QUICHE_DVLOG(1) << "Failed to receive from socket to address: "
477 << peer_address_.ToString()
478 << " with error: " << received.status();
479 receive_max_size_.reset();
480 }
481 return received.status();
482 }
483 }
484
FinishOrRearmAsyncReceive(absl::StatusOr<quiche::QuicheMemSlice> buffer)485 void EventLoopConnectingClientSocket::FinishOrRearmAsyncReceive(
486 absl::StatusOr<quiche::QuicheMemSlice> buffer) {
487 QUICHE_DCHECK(async_visitor_);
488 QUICHE_DCHECK(connect_status_ == ConnectStatus::kConnected);
489
490 if (!buffer.ok() && absl::IsUnavailable(buffer.status())) {
491 if (!event_loop_->SupportsEdgeTriggered()) {
492 bool result = event_loop_->RearmSocket(
493 descriptor_, kSocketEventReadable | kSocketEventError);
494 QUICHE_DCHECK(result);
495 }
496 QUICHE_DCHECK(receive_max_size_.has_value());
497 } else {
498 QUICHE_DCHECK(!receive_max_size_.has_value());
499 async_visitor_->ReceiveComplete(std::move(buffer));
500 }
501 }
502
OneBytePeek()503 absl::StatusOr<bool> EventLoopConnectingClientSocket::OneBytePeek() {
504 QUICHE_DCHECK_NE(descriptor_, kInvalidSocketFd);
505
506 char peek_buffer;
507 absl::StatusOr<absl::Span<char>> peek_received = socket_api::Receive(
508 descriptor_, absl::MakeSpan(&peek_buffer, /*size=*/1), /*peek=*/true);
509 if (!peek_received.ok()) {
510 return peek_received.status();
511 } else {
512 return !peek_received->empty();
513 }
514 }
515
SendBlockingInternal()516 absl::Status EventLoopConnectingClientSocket::SendBlockingInternal() {
517 QUICHE_DCHECK_NE(descriptor_, kInvalidSocketFd);
518 QUICHE_DCHECK(connect_status_ == ConnectStatus::kConnected);
519 QUICHE_DCHECK(!absl::holds_alternative<absl::monostate>(send_data_));
520 QUICHE_DCHECK(send_remaining_.empty());
521
522 absl::Status status =
523 socket_api::SetSocketBlocking(descriptor_, /*blocking=*/true);
524 if (!status.ok()) {
525 QUICHE_LOG_FIRST_N(WARNING, 100)
526 << "Failed to set socket to address: " << peer_address_.ToString()
527 << " as blocking for send with error: " << status;
528 send_data_ = absl::monostate();
529 return status;
530 }
531
532 if (absl::holds_alternative<std::string>(send_data_)) {
533 send_remaining_ = absl::get<std::string>(send_data_);
534 } else {
535 send_remaining_ =
536 absl::get<quiche::QuicheMemSlice>(send_data_).AsStringView();
537 }
538
539 status = SendInternal();
540 if (absl::IsUnavailable(status)) {
541 QUICHE_LOG_FIRST_N(ERROR, 100)
542 << "Non-blocking send for should-be blocking socket to address:"
543 << peer_address_.ToString();
544 send_data_ = absl::monostate();
545 send_remaining_ = "";
546 } else {
547 QUICHE_DCHECK(absl::holds_alternative<absl::monostate>(send_data_));
548 QUICHE_DCHECK(send_remaining_.empty());
549 }
550
551 absl::Status set_non_blocking_status =
552 socket_api::SetSocketBlocking(descriptor_, /*blocking=*/false);
553 if (!set_non_blocking_status.ok()) {
554 QUICHE_LOG_FIRST_N(WARNING, 100)
555 << "Failed to return socket to address: " << peer_address_.ToString()
556 << " to non-blocking after send with error: "
557 << set_non_blocking_status;
558 return set_non_blocking_status;
559 }
560
561 return status;
562 }
563
SendInternal()564 absl::Status EventLoopConnectingClientSocket::SendInternal() {
565 QUICHE_DCHECK_NE(descriptor_, kInvalidSocketFd);
566 QUICHE_DCHECK(connect_status_ == ConnectStatus::kConnected);
567 QUICHE_DCHECK(!absl::holds_alternative<absl::monostate>(send_data_));
568 QUICHE_DCHECK(!send_remaining_.empty());
569
570 // Repeat send until all data sent, unavailable, or error.
571 while (!send_remaining_.empty()) {
572 absl::StatusOr<absl::string_view> remainder =
573 socket_api::Send(descriptor_, send_remaining_);
574
575 if (remainder.ok()) {
576 QUICHE_DCHECK(remainder->empty() ||
577 (remainder->data() >= send_remaining_.data() &&
578 remainder->data() <
579 send_remaining_.data() + send_remaining_.size()));
580 QUICHE_DCHECK(remainder->empty() ||
581 (remainder->data() + remainder->size() ==
582 send_remaining_.data() + send_remaining_.size()));
583 send_remaining_ = *remainder;
584 } else {
585 if (!absl::IsUnavailable(remainder.status())) {
586 QUICHE_DVLOG(1) << "Failed to send to socket to address: "
587 << peer_address_.ToString()
588 << " with error: " << remainder.status();
589 send_data_ = absl::monostate();
590 send_remaining_ = "";
591 }
592 return remainder.status();
593 }
594 }
595
596 send_data_ = absl::monostate();
597 return absl::OkStatus();
598 }
599
FinishOrRearmAsyncSend(absl::Status status)600 void EventLoopConnectingClientSocket::FinishOrRearmAsyncSend(
601 absl::Status status) {
602 QUICHE_DCHECK(async_visitor_);
603 QUICHE_DCHECK(connect_status_ == ConnectStatus::kConnected);
604
605 if (absl::IsUnavailable(status)) {
606 if (!event_loop_->SupportsEdgeTriggered()) {
607 bool result = event_loop_->RearmSocket(
608 descriptor_, kSocketEventWritable | kSocketEventError);
609 QUICHE_DCHECK(result);
610 }
611 QUICHE_DCHECK(!absl::holds_alternative<absl::monostate>(send_data_));
612 QUICHE_DCHECK(!send_remaining_.empty());
613 } else {
614 QUICHE_DCHECK(absl::holds_alternative<absl::monostate>(send_data_));
615 QUICHE_DCHECK(send_remaining_.empty());
616 async_visitor_->SendComplete(status);
617 }
618 }
619
620 } // namespace quic
621