1 // Copyright (c) 2019 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "quiche/quic/core/batch_writer/quic_batch_writer_base.h"
6 
7 #include <cstdint>
8 
9 #include "quiche/quic/platform/api/quic_export.h"
10 #include "quiche/quic/platform/api/quic_flags.h"
11 #include "quiche/quic/platform/api/quic_server_stats.h"
12 
13 namespace quic {
14 
QuicBatchWriterBase(std::unique_ptr<QuicBatchWriterBuffer> batch_buffer)15 QuicBatchWriterBase::QuicBatchWriterBase(
16     std::unique_ptr<QuicBatchWriterBuffer> batch_buffer)
17     : write_blocked_(false), batch_buffer_(std::move(batch_buffer)) {}
18 
WritePacket(const char * buffer,size_t buf_len,const QuicIpAddress & self_address,const QuicSocketAddress & peer_address,PerPacketOptions * options,const QuicPacketWriterParams & params)19 WriteResult QuicBatchWriterBase::WritePacket(
20     const char* buffer, size_t buf_len, const QuicIpAddress& self_address,
21     const QuicSocketAddress& peer_address, PerPacketOptions* options,
22     const QuicPacketWriterParams& params) {
23   const WriteResult result = InternalWritePacket(buffer, buf_len, self_address,
24                                                  peer_address, options, params);
25 
26   if (IsWriteBlockedStatus(result.status)) {
27     write_blocked_ = true;
28   }
29 
30   return result;
31 }
32 
InternalWritePacket(const char * buffer,size_t buf_len,const QuicIpAddress & self_address,const QuicSocketAddress & peer_address,PerPacketOptions * options,const QuicPacketWriterParams & params)33 WriteResult QuicBatchWriterBase::InternalWritePacket(
34     const char* buffer, size_t buf_len, const QuicIpAddress& self_address,
35     const QuicSocketAddress& peer_address, PerPacketOptions* options,
36     const QuicPacketWriterParams& params) {
37   if (buf_len > kMaxOutgoingPacketSize) {
38     return WriteResult(WRITE_STATUS_MSG_TOO_BIG, EMSGSIZE);
39   }
40 
41   ReleaseTime release_time{0, QuicTime::Delta::Zero()};
42   if (SupportsReleaseTime()) {
43     release_time = GetReleaseTime(params);
44     if (release_time.release_time_offset >= QuicTime::Delta::Zero()) {
45       QUIC_SERVER_HISTOGRAM_TIMES(
46           "batch_writer_positive_release_time_offset",
47           release_time.release_time_offset.ToMicroseconds(), 1, 100000, 50,
48           "Duration from ideal release time to actual "
49           "release time, in microseconds.");
50     } else {
51       QUIC_SERVER_HISTOGRAM_TIMES(
52           "batch_writer_negative_release_time_offset",
53           -release_time.release_time_offset.ToMicroseconds(), 1, 100000, 50,
54           "Duration from actual release time to ideal "
55           "release time, in microseconds.");
56     }
57   }
58 
59   const CanBatchResult can_batch_result =
60       CanBatch(buffer, buf_len, self_address, peer_address, options, params,
61                release_time.actual_release_time);
62 
63   bool buffered = false;
64   bool flush = can_batch_result.must_flush;
65   uint32_t packet_batch_id = 0;
66 
67   if (can_batch_result.can_batch) {
68     QuicBatchWriterBuffer::PushResult push_result =
69         batch_buffer_->PushBufferedWrite(buffer, buf_len, self_address,
70                                          peer_address, options, params,
71                                          release_time.actual_release_time);
72     if (push_result.succeeded) {
73       buffered = true;
74       // If there's no space left after the packet is buffered, force a flush.
75       flush = flush || (batch_buffer_->GetNextWriteLocation() == nullptr);
76       packet_batch_id = push_result.batch_id;
77     } else {
78       // If there's no space without this packet, force a flush.
79       flush = true;
80     }
81   }
82 
83   if (!flush) {
84     WriteResult result(WRITE_STATUS_OK, 0);
85     result.send_time_offset = release_time.release_time_offset;
86     return result;
87   }
88 
89   size_t num_buffered_packets = buffered_writes().size();
90   const FlushImplResult flush_result = CheckedFlush();
91   WriteResult result = flush_result.write_result;
92   QUIC_DVLOG(1) << "Internally flushed " << flush_result.num_packets_sent
93                 << " out of " << num_buffered_packets
94                 << " packets. WriteResult=" << result;
95 
96   if (result.status != WRITE_STATUS_OK) {
97     if (IsWriteBlockedStatus(result.status)) {
98       return WriteResult(buffered ? WRITE_STATUS_BLOCKED_DATA_BUFFERED
99                                   : WRITE_STATUS_BLOCKED,
100                          result.error_code)
101           .set_batch_id(packet_batch_id);
102     }
103 
104     // Drop all packets, including the one being written.
105     size_t dropped_packets =
106         buffered ? buffered_writes().size() : buffered_writes().size() + 1;
107 
108     batch_buffer().Clear();
109     result.dropped_packets =
110         dropped_packets > std::numeric_limits<uint16_t>::max()
111             ? std::numeric_limits<uint16_t>::max()
112             : static_cast<uint16_t>(dropped_packets);
113     return result;
114   }
115 
116   if (!buffered) {
117     QuicBatchWriterBuffer::PushResult push_result =
118         batch_buffer_->PushBufferedWrite(buffer, buf_len, self_address,
119                                          peer_address, options, params,
120                                          release_time.actual_release_time);
121     buffered = push_result.succeeded;
122     packet_batch_id = push_result.batch_id;
123 
124     // Since buffered_writes has been emptied, this write must have been
125     // buffered successfully.
126     QUIC_BUG_IF(quic_bug_10826_1, !buffered)
127         << "Failed to push to an empty batch buffer."
128         << "  self_addr:" << self_address.ToString()
129         << ", peer_addr:" << peer_address.ToString() << ", buf_len:" << buf_len;
130   }
131 
132   result.send_time_offset = release_time.release_time_offset;
133   result.batch_id = packet_batch_id;
134   return result;
135 }
136 
CheckedFlush()137 QuicBatchWriterBase::FlushImplResult QuicBatchWriterBase::CheckedFlush() {
138   if (buffered_writes().empty()) {
139     return FlushImplResult{WriteResult(WRITE_STATUS_OK, 0),
140                            /*num_packets_sent=*/0, /*bytes_written=*/0};
141   }
142 
143   const FlushImplResult flush_result = FlushImpl();
144 
145   // Either flush_result.write_result.status is not WRITE_STATUS_OK, or it is
146   // WRITE_STATUS_OK and batch_buffer is empty.
147   QUICHE_DCHECK(flush_result.write_result.status != WRITE_STATUS_OK ||
148                 buffered_writes().empty());
149 
150   // Flush should never return WRITE_STATUS_BLOCKED_DATA_BUFFERED.
151   QUICHE_DCHECK(flush_result.write_result.status !=
152                 WRITE_STATUS_BLOCKED_DATA_BUFFERED);
153 
154   return flush_result;
155 }
156 
Flush()157 WriteResult QuicBatchWriterBase::Flush() {
158   size_t num_buffered_packets = buffered_writes().size();
159   FlushImplResult flush_result = CheckedFlush();
160   QUIC_DVLOG(1) << "Externally flushed " << flush_result.num_packets_sent
161                 << " out of " << num_buffered_packets
162                 << " packets. WriteResult=" << flush_result.write_result;
163 
164   if (IsWriteError(flush_result.write_result.status)) {
165     if (buffered_writes().size() > std::numeric_limits<uint16_t>::max()) {
166       flush_result.write_result.dropped_packets =
167           std::numeric_limits<uint16_t>::max();
168     } else {
169       flush_result.write_result.dropped_packets =
170           static_cast<uint16_t>(buffered_writes().size());
171     }
172     // Treat all errors as non-retryable fatal errors. Drop all buffered packets
173     // to avoid sending them and getting the same error again.
174     batch_buffer().Clear();
175   }
176 
177   if (flush_result.write_result.status == WRITE_STATUS_BLOCKED) {
178     write_blocked_ = true;
179   }
180   return flush_result.write_result;
181 }
182 
183 }  // namespace quic
184