xref: /aosp_15_r20/external/libbrillo/brillo/streams/stream.cc (revision 1a96fba65179ea7d3f56207137718607415c5953)
1*1a96fba6SXin Li // Copyright 2015 The Chromium OS Authors. All rights reserved.
2*1a96fba6SXin Li // Use of this source code is governed by a BSD-style license that can be
3*1a96fba6SXin Li // found in the LICENSE file.
4*1a96fba6SXin Li 
5*1a96fba6SXin Li #include <brillo/streams/stream.h>
6*1a96fba6SXin Li 
7*1a96fba6SXin Li #include <algorithm>
8*1a96fba6SXin Li 
9*1a96fba6SXin Li #include <base/bind.h>
10*1a96fba6SXin Li #include <brillo/message_loops/message_loop.h>
11*1a96fba6SXin Li #include <brillo/pointer_utils.h>
12*1a96fba6SXin Li #include <brillo/streams/stream_errors.h>
13*1a96fba6SXin Li #include <brillo/streams/stream_utils.h>
14*1a96fba6SXin Li 
15*1a96fba6SXin Li namespace brillo {
16*1a96fba6SXin Li 
TruncateBlocking(ErrorPtr * error)17*1a96fba6SXin Li bool Stream::TruncateBlocking(ErrorPtr* error) {
18*1a96fba6SXin Li   return SetSizeBlocking(GetPosition(), error);
19*1a96fba6SXin Li }
20*1a96fba6SXin Li 
SetPosition(uint64_t position,ErrorPtr * error)21*1a96fba6SXin Li bool Stream::SetPosition(uint64_t position, ErrorPtr* error) {
22*1a96fba6SXin Li   if (!stream_utils::CheckInt64Overflow(FROM_HERE, position, 0, error))
23*1a96fba6SXin Li     return false;
24*1a96fba6SXin Li   return Seek(position, Whence::FROM_BEGIN, nullptr, error);
25*1a96fba6SXin Li }
26*1a96fba6SXin Li 
ReadAsync(void * buffer,size_t size_to_read,const base::Callback<void (size_t)> & success_callback,const ErrorCallback & error_callback,ErrorPtr * error)27*1a96fba6SXin Li bool Stream::ReadAsync(void* buffer,
28*1a96fba6SXin Li                        size_t size_to_read,
29*1a96fba6SXin Li                        const base::Callback<void(size_t)>& success_callback,
30*1a96fba6SXin Li                        const ErrorCallback& error_callback,
31*1a96fba6SXin Li                        ErrorPtr* error) {
32*1a96fba6SXin Li   if (is_async_read_pending_) {
33*1a96fba6SXin Li     Error::AddTo(error, FROM_HERE, errors::stream::kDomain,
34*1a96fba6SXin Li                  errors::stream::kOperationNotSupported,
35*1a96fba6SXin Li                  "Another asynchronous operation is still pending");
36*1a96fba6SXin Li     return false;
37*1a96fba6SXin Li   }
38*1a96fba6SXin Li 
39*1a96fba6SXin Li   auto callback = base::Bind(&Stream::IgnoreEOSCallback, success_callback);
40*1a96fba6SXin Li   // If we can read some data right away non-blocking we should still run the
41*1a96fba6SXin Li   // callback from the main loop, so we pass true here for force_async_callback.
42*1a96fba6SXin Li   return ReadAsyncImpl(buffer, size_to_read, callback, error_callback, error,
43*1a96fba6SXin Li                        true);
44*1a96fba6SXin Li }
45*1a96fba6SXin Li 
ReadAllAsync(void * buffer,size_t size_to_read,const base::Closure & success_callback,const ErrorCallback & error_callback,ErrorPtr * error)46*1a96fba6SXin Li bool Stream::ReadAllAsync(void* buffer,
47*1a96fba6SXin Li                           size_t size_to_read,
48*1a96fba6SXin Li                           const base::Closure& success_callback,
49*1a96fba6SXin Li                           const ErrorCallback& error_callback,
50*1a96fba6SXin Li                           ErrorPtr* error) {
51*1a96fba6SXin Li   if (is_async_read_pending_) {
52*1a96fba6SXin Li     Error::AddTo(error, FROM_HERE, errors::stream::kDomain,
53*1a96fba6SXin Li                  errors::stream::kOperationNotSupported,
54*1a96fba6SXin Li                  "Another asynchronous operation is still pending");
55*1a96fba6SXin Li     return false;
56*1a96fba6SXin Li   }
57*1a96fba6SXin Li 
58*1a96fba6SXin Li   auto callback = base::Bind(&Stream::ReadAllAsyncCallback,
59*1a96fba6SXin Li                              weak_ptr_factory_.GetWeakPtr(), buffer,
60*1a96fba6SXin Li                              size_to_read, success_callback, error_callback);
61*1a96fba6SXin Li   return ReadAsyncImpl(buffer, size_to_read, callback, error_callback, error,
62*1a96fba6SXin Li                        true);
63*1a96fba6SXin Li }
64*1a96fba6SXin Li 
ReadBlocking(void * buffer,size_t size_to_read,size_t * size_read,ErrorPtr * error)65*1a96fba6SXin Li bool Stream::ReadBlocking(void* buffer,
66*1a96fba6SXin Li                           size_t size_to_read,
67*1a96fba6SXin Li                           size_t* size_read,
68*1a96fba6SXin Li                           ErrorPtr* error) {
69*1a96fba6SXin Li   for (;;) {
70*1a96fba6SXin Li     bool eos = false;
71*1a96fba6SXin Li     if (!ReadNonBlocking(buffer, size_to_read, size_read, &eos, error))
72*1a96fba6SXin Li       return false;
73*1a96fba6SXin Li 
74*1a96fba6SXin Li     if (*size_read > 0 || eos)
75*1a96fba6SXin Li       break;
76*1a96fba6SXin Li 
77*1a96fba6SXin Li     if (!WaitForDataBlocking(AccessMode::READ, base::TimeDelta::Max(), nullptr,
78*1a96fba6SXin Li                              error)) {
79*1a96fba6SXin Li       return false;
80*1a96fba6SXin Li     }
81*1a96fba6SXin Li   }
82*1a96fba6SXin Li   return true;
83*1a96fba6SXin Li }
84*1a96fba6SXin Li 
ReadAllBlocking(void * buffer,size_t size_to_read,ErrorPtr * error)85*1a96fba6SXin Li bool Stream::ReadAllBlocking(void* buffer,
86*1a96fba6SXin Li                              size_t size_to_read,
87*1a96fba6SXin Li                              ErrorPtr* error) {
88*1a96fba6SXin Li   while (size_to_read > 0) {
89*1a96fba6SXin Li     size_t size_read = 0;
90*1a96fba6SXin Li     if (!ReadBlocking(buffer, size_to_read, &size_read, error))
91*1a96fba6SXin Li       return false;
92*1a96fba6SXin Li 
93*1a96fba6SXin Li     if (size_read == 0)
94*1a96fba6SXin Li       return stream_utils::ErrorReadPastEndOfStream(FROM_HERE, error);
95*1a96fba6SXin Li 
96*1a96fba6SXin Li     size_to_read -= size_read;
97*1a96fba6SXin Li     buffer = AdvancePointer(buffer, size_read);
98*1a96fba6SXin Li   }
99*1a96fba6SXin Li   return true;
100*1a96fba6SXin Li }
101*1a96fba6SXin Li 
WriteAsync(const void * buffer,size_t size_to_write,const base::Callback<void (size_t)> & success_callback,const ErrorCallback & error_callback,ErrorPtr * error)102*1a96fba6SXin Li bool Stream::WriteAsync(const void* buffer,
103*1a96fba6SXin Li                         size_t size_to_write,
104*1a96fba6SXin Li                         const base::Callback<void(size_t)>& success_callback,
105*1a96fba6SXin Li                         const ErrorCallback& error_callback,
106*1a96fba6SXin Li                         ErrorPtr* error) {
107*1a96fba6SXin Li   if (is_async_write_pending_) {
108*1a96fba6SXin Li     Error::AddTo(error, FROM_HERE, errors::stream::kDomain,
109*1a96fba6SXin Li                  errors::stream::kOperationNotSupported,
110*1a96fba6SXin Li                  "Another asynchronous operation is still pending");
111*1a96fba6SXin Li     return false;
112*1a96fba6SXin Li   }
113*1a96fba6SXin Li   // If we can read some data right away non-blocking we should still run the
114*1a96fba6SXin Li   // callback from the main loop, so we pass true here for force_async_callback.
115*1a96fba6SXin Li   return WriteAsyncImpl(buffer, size_to_write, success_callback, error_callback,
116*1a96fba6SXin Li                         error, true);
117*1a96fba6SXin Li }
118*1a96fba6SXin Li 
WriteAllAsync(const void * buffer,size_t size_to_write,const base::Closure & success_callback,const ErrorCallback & error_callback,ErrorPtr * error)119*1a96fba6SXin Li bool Stream::WriteAllAsync(const void* buffer,
120*1a96fba6SXin Li                            size_t size_to_write,
121*1a96fba6SXin Li                            const base::Closure& success_callback,
122*1a96fba6SXin Li                            const ErrorCallback& error_callback,
123*1a96fba6SXin Li                            ErrorPtr* error) {
124*1a96fba6SXin Li   if (is_async_write_pending_) {
125*1a96fba6SXin Li     Error::AddTo(error, FROM_HERE, errors::stream::kDomain,
126*1a96fba6SXin Li                  errors::stream::kOperationNotSupported,
127*1a96fba6SXin Li                  "Another asynchronous operation is still pending");
128*1a96fba6SXin Li     return false;
129*1a96fba6SXin Li   }
130*1a96fba6SXin Li 
131*1a96fba6SXin Li   auto callback = base::Bind(&Stream::WriteAllAsyncCallback,
132*1a96fba6SXin Li                              weak_ptr_factory_.GetWeakPtr(), buffer,
133*1a96fba6SXin Li                              size_to_write, success_callback, error_callback);
134*1a96fba6SXin Li   return WriteAsyncImpl(buffer, size_to_write, callback, error_callback, error,
135*1a96fba6SXin Li                         true);
136*1a96fba6SXin Li }
137*1a96fba6SXin Li 
WriteBlocking(const void * buffer,size_t size_to_write,size_t * size_written,ErrorPtr * error)138*1a96fba6SXin Li bool Stream::WriteBlocking(const void* buffer,
139*1a96fba6SXin Li                            size_t size_to_write,
140*1a96fba6SXin Li                            size_t* size_written,
141*1a96fba6SXin Li                            ErrorPtr* error) {
142*1a96fba6SXin Li   for (;;) {
143*1a96fba6SXin Li     if (!WriteNonBlocking(buffer, size_to_write, size_written, error))
144*1a96fba6SXin Li       return false;
145*1a96fba6SXin Li 
146*1a96fba6SXin Li     if (*size_written > 0 || size_to_write == 0)
147*1a96fba6SXin Li       break;
148*1a96fba6SXin Li 
149*1a96fba6SXin Li     if (!WaitForDataBlocking(AccessMode::WRITE, base::TimeDelta::Max(), nullptr,
150*1a96fba6SXin Li                              error)) {
151*1a96fba6SXin Li       return false;
152*1a96fba6SXin Li     }
153*1a96fba6SXin Li   }
154*1a96fba6SXin Li   return true;
155*1a96fba6SXin Li }
156*1a96fba6SXin Li 
WriteAllBlocking(const void * buffer,size_t size_to_write,ErrorPtr * error)157*1a96fba6SXin Li bool Stream::WriteAllBlocking(const void* buffer,
158*1a96fba6SXin Li                               size_t size_to_write,
159*1a96fba6SXin Li                               ErrorPtr* error) {
160*1a96fba6SXin Li   while (size_to_write > 0) {
161*1a96fba6SXin Li     size_t size_written = 0;
162*1a96fba6SXin Li     if (!WriteBlocking(buffer, size_to_write, &size_written, error))
163*1a96fba6SXin Li       return false;
164*1a96fba6SXin Li 
165*1a96fba6SXin Li     if (size_written == 0) {
166*1a96fba6SXin Li       Error::AddTo(error, FROM_HERE, errors::stream::kDomain,
167*1a96fba6SXin Li                    errors::stream::kPartialData,
168*1a96fba6SXin Li                    "Failed to write all the data");
169*1a96fba6SXin Li       return false;
170*1a96fba6SXin Li     }
171*1a96fba6SXin Li     size_to_write -= size_written;
172*1a96fba6SXin Li     buffer = AdvancePointer(buffer, size_written);
173*1a96fba6SXin Li   }
174*1a96fba6SXin Li   return true;
175*1a96fba6SXin Li }
176*1a96fba6SXin Li 
FlushAsync(const base::Closure & success_callback,const ErrorCallback & error_callback,ErrorPtr *)177*1a96fba6SXin Li bool Stream::FlushAsync(const base::Closure& success_callback,
178*1a96fba6SXin Li                         const ErrorCallback& error_callback,
179*1a96fba6SXin Li                         ErrorPtr* /* error */) {
180*1a96fba6SXin Li   auto callback = base::Bind(&Stream::FlushAsyncCallback,
181*1a96fba6SXin Li                              weak_ptr_factory_.GetWeakPtr(),
182*1a96fba6SXin Li                              success_callback, error_callback);
183*1a96fba6SXin Li   MessageLoop::current()->PostTask(FROM_HERE, callback);
184*1a96fba6SXin Li   return true;
185*1a96fba6SXin Li }
186*1a96fba6SXin Li 
IgnoreEOSCallback(const base::Callback<void (size_t)> & success_callback,size_t bytes,bool)187*1a96fba6SXin Li void Stream::IgnoreEOSCallback(
188*1a96fba6SXin Li     const base::Callback<void(size_t)>& success_callback,
189*1a96fba6SXin Li     size_t bytes,
190*1a96fba6SXin Li     bool /* eos */) {
191*1a96fba6SXin Li   success_callback.Run(bytes);
192*1a96fba6SXin Li }
193*1a96fba6SXin Li 
ReadAsyncImpl(void * buffer,size_t size_to_read,const base::Callback<void (size_t,bool)> & success_callback,const ErrorCallback & error_callback,ErrorPtr * error,bool force_async_callback)194*1a96fba6SXin Li bool Stream::ReadAsyncImpl(
195*1a96fba6SXin Li     void* buffer,
196*1a96fba6SXin Li     size_t size_to_read,
197*1a96fba6SXin Li     const base::Callback<void(size_t, bool)>& success_callback,
198*1a96fba6SXin Li     const ErrorCallback& error_callback,
199*1a96fba6SXin Li     ErrorPtr* error,
200*1a96fba6SXin Li     bool force_async_callback) {
201*1a96fba6SXin Li   CHECK(!is_async_read_pending_);
202*1a96fba6SXin Li   // We set this value to true early in the function so calling others will
203*1a96fba6SXin Li   // prevent us from calling WaitForData() to make calls to
204*1a96fba6SXin Li   // ReadAsync() fail while we run WaitForData().
205*1a96fba6SXin Li   is_async_read_pending_ = true;
206*1a96fba6SXin Li 
207*1a96fba6SXin Li   size_t read = 0;
208*1a96fba6SXin Li   bool eos = false;
209*1a96fba6SXin Li   if (!ReadNonBlocking(buffer, size_to_read, &read, &eos, error))
210*1a96fba6SXin Li     return false;
211*1a96fba6SXin Li 
212*1a96fba6SXin Li   if (read > 0 || eos) {
213*1a96fba6SXin Li     if (force_async_callback) {
214*1a96fba6SXin Li       MessageLoop::current()->PostTask(
215*1a96fba6SXin Li           FROM_HERE,
216*1a96fba6SXin Li           base::BindOnce(&Stream::OnReadAsyncDone,
217*1a96fba6SXin Li                          weak_ptr_factory_.GetWeakPtr(),
218*1a96fba6SXin Li                          success_callback, read, eos));
219*1a96fba6SXin Li     } else {
220*1a96fba6SXin Li       is_async_read_pending_ = false;
221*1a96fba6SXin Li       success_callback.Run(read, eos);
222*1a96fba6SXin Li     }
223*1a96fba6SXin Li     return true;
224*1a96fba6SXin Li   }
225*1a96fba6SXin Li 
226*1a96fba6SXin Li   is_async_read_pending_ = WaitForData(
227*1a96fba6SXin Li       AccessMode::READ,
228*1a96fba6SXin Li       base::Bind(&Stream::OnReadAvailable, weak_ptr_factory_.GetWeakPtr(),
229*1a96fba6SXin Li                  buffer, size_to_read, success_callback, error_callback),
230*1a96fba6SXin Li       error);
231*1a96fba6SXin Li   return is_async_read_pending_;
232*1a96fba6SXin Li }
233*1a96fba6SXin Li 
OnReadAsyncDone(const base::Callback<void (size_t,bool)> & success_callback,size_t bytes_read,bool eos)234*1a96fba6SXin Li void Stream::OnReadAsyncDone(
235*1a96fba6SXin Li     const base::Callback<void(size_t, bool)>& success_callback,
236*1a96fba6SXin Li     size_t bytes_read,
237*1a96fba6SXin Li     bool eos) {
238*1a96fba6SXin Li   is_async_read_pending_ = false;
239*1a96fba6SXin Li   success_callback.Run(bytes_read, eos);
240*1a96fba6SXin Li }
241*1a96fba6SXin Li 
OnReadAvailable(void * buffer,size_t size_to_read,const base::Callback<void (size_t,bool)> & success_callback,const ErrorCallback & error_callback,AccessMode mode)242*1a96fba6SXin Li void Stream::OnReadAvailable(
243*1a96fba6SXin Li     void* buffer,
244*1a96fba6SXin Li     size_t size_to_read,
245*1a96fba6SXin Li     const base::Callback<void(size_t, bool)>& success_callback,
246*1a96fba6SXin Li     const ErrorCallback& error_callback,
247*1a96fba6SXin Li     AccessMode mode) {
248*1a96fba6SXin Li   CHECK(stream_utils::IsReadAccessMode(mode));
249*1a96fba6SXin Li   CHECK(is_async_read_pending_);
250*1a96fba6SXin Li   is_async_read_pending_ = false;
251*1a96fba6SXin Li   ErrorPtr error;
252*1a96fba6SXin Li   // Just reschedule the read operation but don't need to run the callback from
253*1a96fba6SXin Li   // the main loop since we are already running on a callback.
254*1a96fba6SXin Li   if (!ReadAsyncImpl(buffer, size_to_read, success_callback, error_callback,
255*1a96fba6SXin Li                      &error, false)) {
256*1a96fba6SXin Li     error_callback.Run(error.get());
257*1a96fba6SXin Li   }
258*1a96fba6SXin Li }
259*1a96fba6SXin Li 
WriteAsyncImpl(const void * buffer,size_t size_to_write,const base::Callback<void (size_t)> & success_callback,const ErrorCallback & error_callback,ErrorPtr * error,bool force_async_callback)260*1a96fba6SXin Li bool Stream::WriteAsyncImpl(
261*1a96fba6SXin Li     const void* buffer,
262*1a96fba6SXin Li     size_t size_to_write,
263*1a96fba6SXin Li     const base::Callback<void(size_t)>& success_callback,
264*1a96fba6SXin Li     const ErrorCallback& error_callback,
265*1a96fba6SXin Li     ErrorPtr* error,
266*1a96fba6SXin Li     bool force_async_callback) {
267*1a96fba6SXin Li   CHECK(!is_async_write_pending_);
268*1a96fba6SXin Li   // We set this value to true early in the function so calling others will
269*1a96fba6SXin Li   // prevent us from calling WaitForData() to make calls to
270*1a96fba6SXin Li   // ReadAsync() fail while we run WaitForData().
271*1a96fba6SXin Li   is_async_write_pending_ = true;
272*1a96fba6SXin Li 
273*1a96fba6SXin Li   size_t written = 0;
274*1a96fba6SXin Li   if (!WriteNonBlocking(buffer, size_to_write, &written, error))
275*1a96fba6SXin Li     return false;
276*1a96fba6SXin Li 
277*1a96fba6SXin Li   if (written > 0) {
278*1a96fba6SXin Li     if (force_async_callback) {
279*1a96fba6SXin Li       MessageLoop::current()->PostTask(
280*1a96fba6SXin Li           FROM_HERE,
281*1a96fba6SXin Li           base::BindOnce(&Stream::OnWriteAsyncDone,
282*1a96fba6SXin Li                          weak_ptr_factory_.GetWeakPtr(),
283*1a96fba6SXin Li                          success_callback, written));
284*1a96fba6SXin Li     } else {
285*1a96fba6SXin Li       is_async_write_pending_ = false;
286*1a96fba6SXin Li       success_callback.Run(written);
287*1a96fba6SXin Li     }
288*1a96fba6SXin Li     return true;
289*1a96fba6SXin Li   }
290*1a96fba6SXin Li   is_async_write_pending_ = WaitForData(
291*1a96fba6SXin Li       AccessMode::WRITE,
292*1a96fba6SXin Li       base::Bind(&Stream::OnWriteAvailable, weak_ptr_factory_.GetWeakPtr(),
293*1a96fba6SXin Li                  buffer, size_to_write, success_callback, error_callback),
294*1a96fba6SXin Li       error);
295*1a96fba6SXin Li   return is_async_write_pending_;
296*1a96fba6SXin Li }
297*1a96fba6SXin Li 
OnWriteAsyncDone(const base::Callback<void (size_t)> & success_callback,size_t size_written)298*1a96fba6SXin Li void Stream::OnWriteAsyncDone(
299*1a96fba6SXin Li     const base::Callback<void(size_t)>& success_callback,
300*1a96fba6SXin Li     size_t size_written) {
301*1a96fba6SXin Li   is_async_write_pending_ = false;
302*1a96fba6SXin Li   success_callback.Run(size_written);
303*1a96fba6SXin Li }
304*1a96fba6SXin Li 
OnWriteAvailable(const void * buffer,size_t size,const base::Callback<void (size_t)> & success_callback,const ErrorCallback & error_callback,AccessMode mode)305*1a96fba6SXin Li void Stream::OnWriteAvailable(
306*1a96fba6SXin Li     const void* buffer,
307*1a96fba6SXin Li     size_t size,
308*1a96fba6SXin Li     const base::Callback<void(size_t)>& success_callback,
309*1a96fba6SXin Li     const ErrorCallback& error_callback,
310*1a96fba6SXin Li     AccessMode mode) {
311*1a96fba6SXin Li   CHECK(stream_utils::IsWriteAccessMode(mode));
312*1a96fba6SXin Li   CHECK(is_async_write_pending_);
313*1a96fba6SXin Li   is_async_write_pending_ = false;
314*1a96fba6SXin Li   ErrorPtr error;
315*1a96fba6SXin Li   // Just reschedule the read operation but don't need to run the callback from
316*1a96fba6SXin Li   // the main loop since we are already running on a callback.
317*1a96fba6SXin Li   if (!WriteAsyncImpl(buffer, size, success_callback, error_callback, &error,
318*1a96fba6SXin Li                       false)) {
319*1a96fba6SXin Li     error_callback.Run(error.get());
320*1a96fba6SXin Li   }
321*1a96fba6SXin Li }
322*1a96fba6SXin Li 
ReadAllAsyncCallback(void * buffer,size_t size_to_read,const base::Closure & success_callback,const ErrorCallback & error_callback,size_t size_read,bool eos)323*1a96fba6SXin Li void Stream::ReadAllAsyncCallback(void* buffer,
324*1a96fba6SXin Li                                   size_t size_to_read,
325*1a96fba6SXin Li                                   const base::Closure& success_callback,
326*1a96fba6SXin Li                                   const ErrorCallback& error_callback,
327*1a96fba6SXin Li                                   size_t size_read,
328*1a96fba6SXin Li                                   bool eos) {
329*1a96fba6SXin Li   ErrorPtr error;
330*1a96fba6SXin Li   size_to_read -= size_read;
331*1a96fba6SXin Li   if (size_to_read != 0 && eos) {
332*1a96fba6SXin Li     stream_utils::ErrorReadPastEndOfStream(FROM_HERE, &error);
333*1a96fba6SXin Li     error_callback.Run(error.get());
334*1a96fba6SXin Li     return;
335*1a96fba6SXin Li   }
336*1a96fba6SXin Li 
337*1a96fba6SXin Li   if (size_to_read) {
338*1a96fba6SXin Li     buffer = AdvancePointer(buffer, size_read);
339*1a96fba6SXin Li     auto callback = base::Bind(&Stream::ReadAllAsyncCallback,
340*1a96fba6SXin Li                                weak_ptr_factory_.GetWeakPtr(), buffer,
341*1a96fba6SXin Li                                size_to_read, success_callback, error_callback);
342*1a96fba6SXin Li     if (!ReadAsyncImpl(buffer, size_to_read, callback, error_callback, &error,
343*1a96fba6SXin Li                        false)) {
344*1a96fba6SXin Li       error_callback.Run(error.get());
345*1a96fba6SXin Li     }
346*1a96fba6SXin Li   } else {
347*1a96fba6SXin Li     success_callback.Run();
348*1a96fba6SXin Li   }
349*1a96fba6SXin Li }
350*1a96fba6SXin Li 
WriteAllAsyncCallback(const void * buffer,size_t size_to_write,const base::Closure & success_callback,const ErrorCallback & error_callback,size_t size_written)351*1a96fba6SXin Li void Stream::WriteAllAsyncCallback(const void* buffer,
352*1a96fba6SXin Li                                    size_t size_to_write,
353*1a96fba6SXin Li                                    const base::Closure& success_callback,
354*1a96fba6SXin Li                                    const ErrorCallback& error_callback,
355*1a96fba6SXin Li                                    size_t size_written) {
356*1a96fba6SXin Li   ErrorPtr error;
357*1a96fba6SXin Li   if (size_to_write != 0 && size_written == 0) {
358*1a96fba6SXin Li     Error::AddTo(&error, FROM_HERE, errors::stream::kDomain,
359*1a96fba6SXin Li                  errors::stream::kPartialData, "Failed to write all the data");
360*1a96fba6SXin Li     error_callback.Run(error.get());
361*1a96fba6SXin Li     return;
362*1a96fba6SXin Li   }
363*1a96fba6SXin Li   size_to_write -= size_written;
364*1a96fba6SXin Li   if (size_to_write) {
365*1a96fba6SXin Li     buffer = AdvancePointer(buffer, size_written);
366*1a96fba6SXin Li     auto callback = base::Bind(&Stream::WriteAllAsyncCallback,
367*1a96fba6SXin Li                                weak_ptr_factory_.GetWeakPtr(), buffer,
368*1a96fba6SXin Li                                size_to_write, success_callback, error_callback);
369*1a96fba6SXin Li     if (!WriteAsyncImpl(buffer, size_to_write, callback, error_callback, &error,
370*1a96fba6SXin Li                         false)) {
371*1a96fba6SXin Li       error_callback.Run(error.get());
372*1a96fba6SXin Li     }
373*1a96fba6SXin Li   } else {
374*1a96fba6SXin Li     success_callback.Run();
375*1a96fba6SXin Li   }
376*1a96fba6SXin Li }
377*1a96fba6SXin Li 
FlushAsyncCallback(const base::Closure & success_callback,const ErrorCallback & error_callback)378*1a96fba6SXin Li void Stream::FlushAsyncCallback(const base::Closure& success_callback,
379*1a96fba6SXin Li                                 const ErrorCallback& error_callback) {
380*1a96fba6SXin Li   ErrorPtr error;
381*1a96fba6SXin Li   if (FlushBlocking(&error)) {
382*1a96fba6SXin Li     success_callback.Run();
383*1a96fba6SXin Li   } else {
384*1a96fba6SXin Li     error_callback.Run(error.get());
385*1a96fba6SXin Li   }
386*1a96fba6SXin Li }
387*1a96fba6SXin Li 
CancelPendingAsyncOperations()388*1a96fba6SXin Li void Stream::CancelPendingAsyncOperations() {
389*1a96fba6SXin Li   weak_ptr_factory_.InvalidateWeakPtrs();
390*1a96fba6SXin Li   is_async_read_pending_ = false;
391*1a96fba6SXin Li   is_async_write_pending_ = false;
392*1a96fba6SXin Li }
393*1a96fba6SXin Li 
394*1a96fba6SXin Li }  // namespace brillo
395