1 // Copyright (c) 2019 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "quiche/quic/core/batch_writer/quic_batch_writer_base.h"
6
7 #include <cstdint>
8
9 #include "quiche/quic/platform/api/quic_export.h"
10 #include "quiche/quic/platform/api/quic_flags.h"
11 #include "quiche/quic/platform/api/quic_server_stats.h"
12
13 namespace quic {
14
QuicBatchWriterBase(std::unique_ptr<QuicBatchWriterBuffer> batch_buffer)15 QuicBatchWriterBase::QuicBatchWriterBase(
16 std::unique_ptr<QuicBatchWriterBuffer> batch_buffer)
17 : write_blocked_(false), batch_buffer_(std::move(batch_buffer)) {}
18
WritePacket(const char * buffer,size_t buf_len,const QuicIpAddress & self_address,const QuicSocketAddress & peer_address,PerPacketOptions * options,const QuicPacketWriterParams & params)19 WriteResult QuicBatchWriterBase::WritePacket(
20 const char* buffer, size_t buf_len, const QuicIpAddress& self_address,
21 const QuicSocketAddress& peer_address, PerPacketOptions* options,
22 const QuicPacketWriterParams& params) {
23 const WriteResult result = InternalWritePacket(buffer, buf_len, self_address,
24 peer_address, options, params);
25
26 if (IsWriteBlockedStatus(result.status)) {
27 write_blocked_ = true;
28 }
29
30 return result;
31 }
32
InternalWritePacket(const char * buffer,size_t buf_len,const QuicIpAddress & self_address,const QuicSocketAddress & peer_address,PerPacketOptions * options,const QuicPacketWriterParams & params)33 WriteResult QuicBatchWriterBase::InternalWritePacket(
34 const char* buffer, size_t buf_len, const QuicIpAddress& self_address,
35 const QuicSocketAddress& peer_address, PerPacketOptions* options,
36 const QuicPacketWriterParams& params) {
37 if (buf_len > kMaxOutgoingPacketSize) {
38 return WriteResult(WRITE_STATUS_MSG_TOO_BIG, EMSGSIZE);
39 }
40
41 ReleaseTime release_time{0, QuicTime::Delta::Zero()};
42 if (SupportsReleaseTime()) {
43 release_time = GetReleaseTime(params);
44 if (release_time.release_time_offset >= QuicTime::Delta::Zero()) {
45 QUIC_SERVER_HISTOGRAM_TIMES(
46 "batch_writer_positive_release_time_offset",
47 release_time.release_time_offset.ToMicroseconds(), 1, 100000, 50,
48 "Duration from ideal release time to actual "
49 "release time, in microseconds.");
50 } else {
51 QUIC_SERVER_HISTOGRAM_TIMES(
52 "batch_writer_negative_release_time_offset",
53 -release_time.release_time_offset.ToMicroseconds(), 1, 100000, 50,
54 "Duration from actual release time to ideal "
55 "release time, in microseconds.");
56 }
57 }
58
59 const CanBatchResult can_batch_result =
60 CanBatch(buffer, buf_len, self_address, peer_address, options, params,
61 release_time.actual_release_time);
62
63 bool buffered = false;
64 bool flush = can_batch_result.must_flush;
65 uint32_t packet_batch_id = 0;
66
67 if (can_batch_result.can_batch) {
68 QuicBatchWriterBuffer::PushResult push_result =
69 batch_buffer_->PushBufferedWrite(buffer, buf_len, self_address,
70 peer_address, options, params,
71 release_time.actual_release_time);
72 if (push_result.succeeded) {
73 buffered = true;
74 // If there's no space left after the packet is buffered, force a flush.
75 flush = flush || (batch_buffer_->GetNextWriteLocation() == nullptr);
76 packet_batch_id = push_result.batch_id;
77 } else {
78 // If there's no space without this packet, force a flush.
79 flush = true;
80 }
81 }
82
83 if (!flush) {
84 WriteResult result(WRITE_STATUS_OK, 0);
85 result.send_time_offset = release_time.release_time_offset;
86 return result;
87 }
88
89 size_t num_buffered_packets = buffered_writes().size();
90 const FlushImplResult flush_result = CheckedFlush();
91 WriteResult result = flush_result.write_result;
92 QUIC_DVLOG(1) << "Internally flushed " << flush_result.num_packets_sent
93 << " out of " << num_buffered_packets
94 << " packets. WriteResult=" << result;
95
96 if (result.status != WRITE_STATUS_OK) {
97 if (IsWriteBlockedStatus(result.status)) {
98 return WriteResult(buffered ? WRITE_STATUS_BLOCKED_DATA_BUFFERED
99 : WRITE_STATUS_BLOCKED,
100 result.error_code)
101 .set_batch_id(packet_batch_id);
102 }
103
104 // Drop all packets, including the one being written.
105 size_t dropped_packets =
106 buffered ? buffered_writes().size() : buffered_writes().size() + 1;
107
108 batch_buffer().Clear();
109 result.dropped_packets =
110 dropped_packets > std::numeric_limits<uint16_t>::max()
111 ? std::numeric_limits<uint16_t>::max()
112 : static_cast<uint16_t>(dropped_packets);
113 return result;
114 }
115
116 if (!buffered) {
117 QuicBatchWriterBuffer::PushResult push_result =
118 batch_buffer_->PushBufferedWrite(buffer, buf_len, self_address,
119 peer_address, options, params,
120 release_time.actual_release_time);
121 buffered = push_result.succeeded;
122 packet_batch_id = push_result.batch_id;
123
124 // Since buffered_writes has been emptied, this write must have been
125 // buffered successfully.
126 QUIC_BUG_IF(quic_bug_10826_1, !buffered)
127 << "Failed to push to an empty batch buffer."
128 << " self_addr:" << self_address.ToString()
129 << ", peer_addr:" << peer_address.ToString() << ", buf_len:" << buf_len;
130 }
131
132 result.send_time_offset = release_time.release_time_offset;
133 result.batch_id = packet_batch_id;
134 return result;
135 }
136
CheckedFlush()137 QuicBatchWriterBase::FlushImplResult QuicBatchWriterBase::CheckedFlush() {
138 if (buffered_writes().empty()) {
139 return FlushImplResult{WriteResult(WRITE_STATUS_OK, 0),
140 /*num_packets_sent=*/0, /*bytes_written=*/0};
141 }
142
143 const FlushImplResult flush_result = FlushImpl();
144
145 // Either flush_result.write_result.status is not WRITE_STATUS_OK, or it is
146 // WRITE_STATUS_OK and batch_buffer is empty.
147 QUICHE_DCHECK(flush_result.write_result.status != WRITE_STATUS_OK ||
148 buffered_writes().empty());
149
150 // Flush should never return WRITE_STATUS_BLOCKED_DATA_BUFFERED.
151 QUICHE_DCHECK(flush_result.write_result.status !=
152 WRITE_STATUS_BLOCKED_DATA_BUFFERED);
153
154 return flush_result;
155 }
156
Flush()157 WriteResult QuicBatchWriterBase::Flush() {
158 size_t num_buffered_packets = buffered_writes().size();
159 FlushImplResult flush_result = CheckedFlush();
160 QUIC_DVLOG(1) << "Externally flushed " << flush_result.num_packets_sent
161 << " out of " << num_buffered_packets
162 << " packets. WriteResult=" << flush_result.write_result;
163
164 if (IsWriteError(flush_result.write_result.status)) {
165 if (buffered_writes().size() > std::numeric_limits<uint16_t>::max()) {
166 flush_result.write_result.dropped_packets =
167 std::numeric_limits<uint16_t>::max();
168 } else {
169 flush_result.write_result.dropped_packets =
170 static_cast<uint16_t>(buffered_writes().size());
171 }
172 // Treat all errors as non-retryable fatal errors. Drop all buffered packets
173 // to avoid sending them and getting the same error again.
174 batch_buffer().Clear();
175 }
176
177 if (flush_result.write_result.status == WRITE_STATUS_BLOCKED) {
178 write_blocked_ = true;
179 }
180 return flush_result.write_result;
181 }
182
183 } // namespace quic
184