1 // Copyright 2023 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 // General-purpose abstractions for read/write streams.
6
7 #ifndef QUICHE_COMMON_QUICHE_STREAM_H_
8 #define QUICHE_COMMON_QUICHE_STREAM_H_
9
10 #include <cstddef>
11 #include <string>
12
13 #include "absl/status/status.h"
14 #include "absl/strings/string_view.h"
15 #include "absl/types/span.h"
16 #include "quiche/common/platform/api/quiche_export.h"
17 #include "quiche/common/quiche_callbacks.h"
18
19 namespace quiche {
20
21 // A shared base class for read and write stream to support abrupt termination.
22 class QUICHE_EXPORT TerminableStream {
23 public:
24 virtual ~TerminableStream() = default;
25
26 // Abruptly terminate the stream due to an error. If `error` is not OK, it may
27 // carry the error information that could be potentially communicated to the
28 // peer in case the stream is remote. If the stream is a duplex stream, both
29 // ends of the stream are terminated.
30 virtual void AbruptlyTerminate(absl::Status error) = 0;
31 };
32
33 // A general-purpose visitor API that gets notifications for ReadStream-related
34 // events.
35 class QUICHE_EXPORT ReadStreamVisitor {
36 public:
37 virtual ~ReadStreamVisitor() = default;
38
39 // Called whenever the stream has new data available to read. Unless otherwise
40 // specified, QUICHE stream reads are level-triggered, which means that the
41 // callback will be called repeatedly as long as there is still data in the
42 // buffer.
43 virtual void OnCanRead() = 0;
44 };
45
46 // General purpose abstraction for a stream of data that can be read from the
47 // network. The class is designed around the idea that a network stream stores
48 // all of the received data in a sequence of contiguous buffers. Because of
49 // that, there are two ways to read from a stream:
50 // - Read() will copy data into a user-provided buffer, reassembling it if it
51 // is split across multiple buffers internally.
52 // - PeekNextReadableRegion()/SkipBytes() let the caller access the underlying
53 // buffers directly, potentially avoiding the copying at the cost of the
54 // caller having to deal with discontinuities.
55 class QUICHE_EXPORT ReadStream {
56 public:
57 struct QUICHE_EXPORT ReadResult {
58 // Number of bytes actually read.
59 size_t bytes_read = 0;
60 // Whether the FIN has been received; if true, no further data will arrive
61 // on the stream, and the stream object can be soon potentially garbage
62 // collected.
63 bool fin = false;
64 };
65
66 struct PeekResult {
67 // The next available chunk in the sequencer buffer.
68 absl::string_view peeked_data;
69 // True if all of the data up to the FIN has been read.
70 bool fin_next = false;
71 // True if all of the data up to the FIN has been received (but not
72 // necessarily read).
73 bool all_data_received = false;
74
75 // Indicates that `SkipBytes()` will make progress if called.
has_dataPeekResult76 bool has_data() const { return !peeked_data.empty() || fin_next; }
77 };
78
79 virtual ~ReadStream() = default;
80
81 // Reads at most `buffer.size()` bytes into `buffer`.
82 [[nodiscard]] virtual ReadResult Read(absl::Span<char> buffer) = 0;
83
84 // Reads all available data and appends it to the end of `output`.
85 [[nodiscard]] virtual ReadResult Read(std::string* output) = 0;
86
87 // Indicates the total number of bytes that can be read from the stream.
88 virtual size_t ReadableBytes() const = 0;
89
90 // Returns a contiguous buffer to read (or an empty buffer, if there is no
91 // data to read). See `ProcessAllReadableRegions` below for an example of how
92 // to use this method while handling FIN correctly.
93 virtual PeekResult PeekNextReadableRegion() const = 0;
94
95 // Equivalent to reading `bytes`, but does not perform any copying. `bytes`
96 // must be less than or equal to `ReadableBytes()`. The return value indicates
97 // if the FIN has been reached. `SkipBytes(0)` can be used to consume the FIN
98 // if it's the only thing remaining on the stream.
99 [[nodiscard]] virtual bool SkipBytes(size_t bytes) = 0;
100 };
101
102 // Calls `callback` for every contiguous chunk available inside the stream.
103 // Returns true if the FIN has been reached.
ProcessAllReadableRegions(ReadStream & stream,UnretainedCallback<void (absl::string_view)> callback)104 inline bool ProcessAllReadableRegions(
105 ReadStream& stream, UnretainedCallback<void(absl::string_view)> callback) {
106 for (;;) {
107 ReadStream::PeekResult peek_result = stream.PeekNextReadableRegion();
108 if (!peek_result.has_data()) {
109 return false;
110 }
111 callback(peek_result.peeked_data);
112 bool fin = stream.SkipBytes(peek_result.peeked_data.size());
113 if (fin) {
114 return true;
115 }
116 }
117 }
118
119 // A general-purpose visitor API that gets notifications for WriteStream-related
120 // events.
121 class QUICHE_EXPORT WriteStreamVisitor {
122 public:
~WriteStreamVisitor()123 virtual ~WriteStreamVisitor() {}
124
125 // Called whenever the stream is not write-blocked and can accept new data.
126 virtual void OnCanWrite() = 0;
127 };
128
129 // Options for writing data into a WriteStream.
130 class QUICHE_EXPORT StreamWriteOptions {
131 public:
132 StreamWriteOptions() = default;
133
134 // If send_fin() is set to true, the write operation also sends a FIN on the
135 // stream.
send_fin()136 bool send_fin() const { return send_fin_; }
set_send_fin(bool send_fin)137 void set_send_fin(bool send_fin) { send_fin_ = send_fin; }
138
139 // If buffer_unconditionally() is set to true, the write operation will buffer
140 // data even if the internal buffer limit is exceeded.
buffer_unconditionally()141 bool buffer_unconditionally() const { return buffer_unconditionally_; }
set_buffer_unconditionally(bool value)142 void set_buffer_unconditionally(bool value) {
143 buffer_unconditionally_ = value;
144 }
145
146 private:
147 bool send_fin_ = false;
148 bool buffer_unconditionally_ = false;
149 };
150
151 inline constexpr StreamWriteOptions kDefaultStreamWriteOptions =
152 StreamWriteOptions();
153
154 // WriteStream is an object that can accept a stream of bytes.
155 //
156 // The writes into a WriteStream are all-or-nothing. A WriteStream object has
157 // to either accept all data written into it by returning absl::OkStatus, or ask
158 // the caller to try again once via OnCanWrite() by returning
159 // absl::UnavailableError.
160 class QUICHE_EXPORT WriteStream {
161 public:
~WriteStream()162 virtual ~WriteStream() {}
163
164 // Writes |data| into the stream.
165 virtual absl::Status Writev(absl::Span<const absl::string_view> data,
166 const StreamWriteOptions& options) = 0;
167
168 // Indicates whether it is possible to write into stream right now.
169 virtual bool CanWrite() const = 0;
170
171 // Legacy convenience method for writing a single string_view. New users
172 // should use quiche::WriteIntoStream instead, since this method does not
173 // return useful failure information.
SendFin()174 [[nodiscard]] bool SendFin() {
175 StreamWriteOptions options;
176 options.set_send_fin(true);
177 return Writev(absl::Span<const absl::string_view>(), options).ok();
178 }
179
180 // Legacy convenience method for writing a single string_view. New users
181 // should use quiche::WriteIntoStream instead, since this method does not
182 // return useful failure information.
Write(absl::string_view data)183 [[nodiscard]] bool Write(absl::string_view data) {
184 return Writev(absl::MakeSpan(&data, 1), kDefaultStreamWriteOptions).ok();
185 }
186 };
187
188 // Convenience methods to write a single chunk of data into the stream.
189 inline absl::Status WriteIntoStream(
190 WriteStream& stream, absl::string_view data,
191 const StreamWriteOptions& options = kDefaultStreamWriteOptions) {
192 return stream.Writev(absl::MakeSpan(&data, 1), options);
193 }
194
195 // Convenience methods to send a FIN on the stream.
SendFinOnStream(WriteStream & stream)196 inline absl::Status SendFinOnStream(WriteStream& stream) {
197 StreamWriteOptions options;
198 options.set_send_fin(true);
199 return stream.Writev(absl::Span<const absl::string_view>(), options);
200 }
201
TotalStringViewSpanSize(absl::Span<const absl::string_view> span)202 inline size_t TotalStringViewSpanSize(
203 absl::Span<const absl::string_view> span) {
204 size_t total = 0;
205 for (absl::string_view view : span) {
206 total += view.size();
207 }
208 return total;
209 }
210
211 } // namespace quiche
212
213 #endif // QUICHE_COMMON_QUICHE_STREAM_H_
214