1*1a96fba6SXin Li // Copyright 2015 The Chromium OS Authors. All rights reserved.
2*1a96fba6SXin Li // Use of this source code is governed by a BSD-style license that can be
3*1a96fba6SXin Li // found in the LICENSE file.
4*1a96fba6SXin Li
5*1a96fba6SXin Li #include <brillo/streams/stream.h>
6*1a96fba6SXin Li
7*1a96fba6SXin Li #include <algorithm>
8*1a96fba6SXin Li
9*1a96fba6SXin Li #include <base/bind.h>
10*1a96fba6SXin Li #include <brillo/message_loops/message_loop.h>
11*1a96fba6SXin Li #include <brillo/pointer_utils.h>
12*1a96fba6SXin Li #include <brillo/streams/stream_errors.h>
13*1a96fba6SXin Li #include <brillo/streams/stream_utils.h>
14*1a96fba6SXin Li
15*1a96fba6SXin Li namespace brillo {
16*1a96fba6SXin Li
TruncateBlocking(ErrorPtr * error)17*1a96fba6SXin Li bool Stream::TruncateBlocking(ErrorPtr* error) {
18*1a96fba6SXin Li return SetSizeBlocking(GetPosition(), error);
19*1a96fba6SXin Li }
20*1a96fba6SXin Li
SetPosition(uint64_t position,ErrorPtr * error)21*1a96fba6SXin Li bool Stream::SetPosition(uint64_t position, ErrorPtr* error) {
22*1a96fba6SXin Li if (!stream_utils::CheckInt64Overflow(FROM_HERE, position, 0, error))
23*1a96fba6SXin Li return false;
24*1a96fba6SXin Li return Seek(position, Whence::FROM_BEGIN, nullptr, error);
25*1a96fba6SXin Li }
26*1a96fba6SXin Li
ReadAsync(void * buffer,size_t size_to_read,const base::Callback<void (size_t)> & success_callback,const ErrorCallback & error_callback,ErrorPtr * error)27*1a96fba6SXin Li bool Stream::ReadAsync(void* buffer,
28*1a96fba6SXin Li size_t size_to_read,
29*1a96fba6SXin Li const base::Callback<void(size_t)>& success_callback,
30*1a96fba6SXin Li const ErrorCallback& error_callback,
31*1a96fba6SXin Li ErrorPtr* error) {
32*1a96fba6SXin Li if (is_async_read_pending_) {
33*1a96fba6SXin Li Error::AddTo(error, FROM_HERE, errors::stream::kDomain,
34*1a96fba6SXin Li errors::stream::kOperationNotSupported,
35*1a96fba6SXin Li "Another asynchronous operation is still pending");
36*1a96fba6SXin Li return false;
37*1a96fba6SXin Li }
38*1a96fba6SXin Li
39*1a96fba6SXin Li auto callback = base::Bind(&Stream::IgnoreEOSCallback, success_callback);
40*1a96fba6SXin Li // If we can read some data right away non-blocking we should still run the
41*1a96fba6SXin Li // callback from the main loop, so we pass true here for force_async_callback.
42*1a96fba6SXin Li return ReadAsyncImpl(buffer, size_to_read, callback, error_callback, error,
43*1a96fba6SXin Li true);
44*1a96fba6SXin Li }
45*1a96fba6SXin Li
ReadAllAsync(void * buffer,size_t size_to_read,const base::Closure & success_callback,const ErrorCallback & error_callback,ErrorPtr * error)46*1a96fba6SXin Li bool Stream::ReadAllAsync(void* buffer,
47*1a96fba6SXin Li size_t size_to_read,
48*1a96fba6SXin Li const base::Closure& success_callback,
49*1a96fba6SXin Li const ErrorCallback& error_callback,
50*1a96fba6SXin Li ErrorPtr* error) {
51*1a96fba6SXin Li if (is_async_read_pending_) {
52*1a96fba6SXin Li Error::AddTo(error, FROM_HERE, errors::stream::kDomain,
53*1a96fba6SXin Li errors::stream::kOperationNotSupported,
54*1a96fba6SXin Li "Another asynchronous operation is still pending");
55*1a96fba6SXin Li return false;
56*1a96fba6SXin Li }
57*1a96fba6SXin Li
58*1a96fba6SXin Li auto callback = base::Bind(&Stream::ReadAllAsyncCallback,
59*1a96fba6SXin Li weak_ptr_factory_.GetWeakPtr(), buffer,
60*1a96fba6SXin Li size_to_read, success_callback, error_callback);
61*1a96fba6SXin Li return ReadAsyncImpl(buffer, size_to_read, callback, error_callback, error,
62*1a96fba6SXin Li true);
63*1a96fba6SXin Li }
64*1a96fba6SXin Li
ReadBlocking(void * buffer,size_t size_to_read,size_t * size_read,ErrorPtr * error)65*1a96fba6SXin Li bool Stream::ReadBlocking(void* buffer,
66*1a96fba6SXin Li size_t size_to_read,
67*1a96fba6SXin Li size_t* size_read,
68*1a96fba6SXin Li ErrorPtr* error) {
69*1a96fba6SXin Li for (;;) {
70*1a96fba6SXin Li bool eos = false;
71*1a96fba6SXin Li if (!ReadNonBlocking(buffer, size_to_read, size_read, &eos, error))
72*1a96fba6SXin Li return false;
73*1a96fba6SXin Li
74*1a96fba6SXin Li if (*size_read > 0 || eos)
75*1a96fba6SXin Li break;
76*1a96fba6SXin Li
77*1a96fba6SXin Li if (!WaitForDataBlocking(AccessMode::READ, base::TimeDelta::Max(), nullptr,
78*1a96fba6SXin Li error)) {
79*1a96fba6SXin Li return false;
80*1a96fba6SXin Li }
81*1a96fba6SXin Li }
82*1a96fba6SXin Li return true;
83*1a96fba6SXin Li }
84*1a96fba6SXin Li
ReadAllBlocking(void * buffer,size_t size_to_read,ErrorPtr * error)85*1a96fba6SXin Li bool Stream::ReadAllBlocking(void* buffer,
86*1a96fba6SXin Li size_t size_to_read,
87*1a96fba6SXin Li ErrorPtr* error) {
88*1a96fba6SXin Li while (size_to_read > 0) {
89*1a96fba6SXin Li size_t size_read = 0;
90*1a96fba6SXin Li if (!ReadBlocking(buffer, size_to_read, &size_read, error))
91*1a96fba6SXin Li return false;
92*1a96fba6SXin Li
93*1a96fba6SXin Li if (size_read == 0)
94*1a96fba6SXin Li return stream_utils::ErrorReadPastEndOfStream(FROM_HERE, error);
95*1a96fba6SXin Li
96*1a96fba6SXin Li size_to_read -= size_read;
97*1a96fba6SXin Li buffer = AdvancePointer(buffer, size_read);
98*1a96fba6SXin Li }
99*1a96fba6SXin Li return true;
100*1a96fba6SXin Li }
101*1a96fba6SXin Li
WriteAsync(const void * buffer,size_t size_to_write,const base::Callback<void (size_t)> & success_callback,const ErrorCallback & error_callback,ErrorPtr * error)102*1a96fba6SXin Li bool Stream::WriteAsync(const void* buffer,
103*1a96fba6SXin Li size_t size_to_write,
104*1a96fba6SXin Li const base::Callback<void(size_t)>& success_callback,
105*1a96fba6SXin Li const ErrorCallback& error_callback,
106*1a96fba6SXin Li ErrorPtr* error) {
107*1a96fba6SXin Li if (is_async_write_pending_) {
108*1a96fba6SXin Li Error::AddTo(error, FROM_HERE, errors::stream::kDomain,
109*1a96fba6SXin Li errors::stream::kOperationNotSupported,
110*1a96fba6SXin Li "Another asynchronous operation is still pending");
111*1a96fba6SXin Li return false;
112*1a96fba6SXin Li }
113*1a96fba6SXin Li // If we can read some data right away non-blocking we should still run the
114*1a96fba6SXin Li // callback from the main loop, so we pass true here for force_async_callback.
115*1a96fba6SXin Li return WriteAsyncImpl(buffer, size_to_write, success_callback, error_callback,
116*1a96fba6SXin Li error, true);
117*1a96fba6SXin Li }
118*1a96fba6SXin Li
WriteAllAsync(const void * buffer,size_t size_to_write,const base::Closure & success_callback,const ErrorCallback & error_callback,ErrorPtr * error)119*1a96fba6SXin Li bool Stream::WriteAllAsync(const void* buffer,
120*1a96fba6SXin Li size_t size_to_write,
121*1a96fba6SXin Li const base::Closure& success_callback,
122*1a96fba6SXin Li const ErrorCallback& error_callback,
123*1a96fba6SXin Li ErrorPtr* error) {
124*1a96fba6SXin Li if (is_async_write_pending_) {
125*1a96fba6SXin Li Error::AddTo(error, FROM_HERE, errors::stream::kDomain,
126*1a96fba6SXin Li errors::stream::kOperationNotSupported,
127*1a96fba6SXin Li "Another asynchronous operation is still pending");
128*1a96fba6SXin Li return false;
129*1a96fba6SXin Li }
130*1a96fba6SXin Li
131*1a96fba6SXin Li auto callback = base::Bind(&Stream::WriteAllAsyncCallback,
132*1a96fba6SXin Li weak_ptr_factory_.GetWeakPtr(), buffer,
133*1a96fba6SXin Li size_to_write, success_callback, error_callback);
134*1a96fba6SXin Li return WriteAsyncImpl(buffer, size_to_write, callback, error_callback, error,
135*1a96fba6SXin Li true);
136*1a96fba6SXin Li }
137*1a96fba6SXin Li
WriteBlocking(const void * buffer,size_t size_to_write,size_t * size_written,ErrorPtr * error)138*1a96fba6SXin Li bool Stream::WriteBlocking(const void* buffer,
139*1a96fba6SXin Li size_t size_to_write,
140*1a96fba6SXin Li size_t* size_written,
141*1a96fba6SXin Li ErrorPtr* error) {
142*1a96fba6SXin Li for (;;) {
143*1a96fba6SXin Li if (!WriteNonBlocking(buffer, size_to_write, size_written, error))
144*1a96fba6SXin Li return false;
145*1a96fba6SXin Li
146*1a96fba6SXin Li if (*size_written > 0 || size_to_write == 0)
147*1a96fba6SXin Li break;
148*1a96fba6SXin Li
149*1a96fba6SXin Li if (!WaitForDataBlocking(AccessMode::WRITE, base::TimeDelta::Max(), nullptr,
150*1a96fba6SXin Li error)) {
151*1a96fba6SXin Li return false;
152*1a96fba6SXin Li }
153*1a96fba6SXin Li }
154*1a96fba6SXin Li return true;
155*1a96fba6SXin Li }
156*1a96fba6SXin Li
WriteAllBlocking(const void * buffer,size_t size_to_write,ErrorPtr * error)157*1a96fba6SXin Li bool Stream::WriteAllBlocking(const void* buffer,
158*1a96fba6SXin Li size_t size_to_write,
159*1a96fba6SXin Li ErrorPtr* error) {
160*1a96fba6SXin Li while (size_to_write > 0) {
161*1a96fba6SXin Li size_t size_written = 0;
162*1a96fba6SXin Li if (!WriteBlocking(buffer, size_to_write, &size_written, error))
163*1a96fba6SXin Li return false;
164*1a96fba6SXin Li
165*1a96fba6SXin Li if (size_written == 0) {
166*1a96fba6SXin Li Error::AddTo(error, FROM_HERE, errors::stream::kDomain,
167*1a96fba6SXin Li errors::stream::kPartialData,
168*1a96fba6SXin Li "Failed to write all the data");
169*1a96fba6SXin Li return false;
170*1a96fba6SXin Li }
171*1a96fba6SXin Li size_to_write -= size_written;
172*1a96fba6SXin Li buffer = AdvancePointer(buffer, size_written);
173*1a96fba6SXin Li }
174*1a96fba6SXin Li return true;
175*1a96fba6SXin Li }
176*1a96fba6SXin Li
FlushAsync(const base::Closure & success_callback,const ErrorCallback & error_callback,ErrorPtr *)177*1a96fba6SXin Li bool Stream::FlushAsync(const base::Closure& success_callback,
178*1a96fba6SXin Li const ErrorCallback& error_callback,
179*1a96fba6SXin Li ErrorPtr* /* error */) {
180*1a96fba6SXin Li auto callback = base::Bind(&Stream::FlushAsyncCallback,
181*1a96fba6SXin Li weak_ptr_factory_.GetWeakPtr(),
182*1a96fba6SXin Li success_callback, error_callback);
183*1a96fba6SXin Li MessageLoop::current()->PostTask(FROM_HERE, callback);
184*1a96fba6SXin Li return true;
185*1a96fba6SXin Li }
186*1a96fba6SXin Li
IgnoreEOSCallback(const base::Callback<void (size_t)> & success_callback,size_t bytes,bool)187*1a96fba6SXin Li void Stream::IgnoreEOSCallback(
188*1a96fba6SXin Li const base::Callback<void(size_t)>& success_callback,
189*1a96fba6SXin Li size_t bytes,
190*1a96fba6SXin Li bool /* eos */) {
191*1a96fba6SXin Li success_callback.Run(bytes);
192*1a96fba6SXin Li }
193*1a96fba6SXin Li
ReadAsyncImpl(void * buffer,size_t size_to_read,const base::Callback<void (size_t,bool)> & success_callback,const ErrorCallback & error_callback,ErrorPtr * error,bool force_async_callback)194*1a96fba6SXin Li bool Stream::ReadAsyncImpl(
195*1a96fba6SXin Li void* buffer,
196*1a96fba6SXin Li size_t size_to_read,
197*1a96fba6SXin Li const base::Callback<void(size_t, bool)>& success_callback,
198*1a96fba6SXin Li const ErrorCallback& error_callback,
199*1a96fba6SXin Li ErrorPtr* error,
200*1a96fba6SXin Li bool force_async_callback) {
201*1a96fba6SXin Li CHECK(!is_async_read_pending_);
202*1a96fba6SXin Li // We set this value to true early in the function so calling others will
203*1a96fba6SXin Li // prevent us from calling WaitForData() to make calls to
204*1a96fba6SXin Li // ReadAsync() fail while we run WaitForData().
205*1a96fba6SXin Li is_async_read_pending_ = true;
206*1a96fba6SXin Li
207*1a96fba6SXin Li size_t read = 0;
208*1a96fba6SXin Li bool eos = false;
209*1a96fba6SXin Li if (!ReadNonBlocking(buffer, size_to_read, &read, &eos, error))
210*1a96fba6SXin Li return false;
211*1a96fba6SXin Li
212*1a96fba6SXin Li if (read > 0 || eos) {
213*1a96fba6SXin Li if (force_async_callback) {
214*1a96fba6SXin Li MessageLoop::current()->PostTask(
215*1a96fba6SXin Li FROM_HERE,
216*1a96fba6SXin Li base::BindOnce(&Stream::OnReadAsyncDone,
217*1a96fba6SXin Li weak_ptr_factory_.GetWeakPtr(),
218*1a96fba6SXin Li success_callback, read, eos));
219*1a96fba6SXin Li } else {
220*1a96fba6SXin Li is_async_read_pending_ = false;
221*1a96fba6SXin Li success_callback.Run(read, eos);
222*1a96fba6SXin Li }
223*1a96fba6SXin Li return true;
224*1a96fba6SXin Li }
225*1a96fba6SXin Li
226*1a96fba6SXin Li is_async_read_pending_ = WaitForData(
227*1a96fba6SXin Li AccessMode::READ,
228*1a96fba6SXin Li base::Bind(&Stream::OnReadAvailable, weak_ptr_factory_.GetWeakPtr(),
229*1a96fba6SXin Li buffer, size_to_read, success_callback, error_callback),
230*1a96fba6SXin Li error);
231*1a96fba6SXin Li return is_async_read_pending_;
232*1a96fba6SXin Li }
233*1a96fba6SXin Li
OnReadAsyncDone(const base::Callback<void (size_t,bool)> & success_callback,size_t bytes_read,bool eos)234*1a96fba6SXin Li void Stream::OnReadAsyncDone(
235*1a96fba6SXin Li const base::Callback<void(size_t, bool)>& success_callback,
236*1a96fba6SXin Li size_t bytes_read,
237*1a96fba6SXin Li bool eos) {
238*1a96fba6SXin Li is_async_read_pending_ = false;
239*1a96fba6SXin Li success_callback.Run(bytes_read, eos);
240*1a96fba6SXin Li }
241*1a96fba6SXin Li
OnReadAvailable(void * buffer,size_t size_to_read,const base::Callback<void (size_t,bool)> & success_callback,const ErrorCallback & error_callback,AccessMode mode)242*1a96fba6SXin Li void Stream::OnReadAvailable(
243*1a96fba6SXin Li void* buffer,
244*1a96fba6SXin Li size_t size_to_read,
245*1a96fba6SXin Li const base::Callback<void(size_t, bool)>& success_callback,
246*1a96fba6SXin Li const ErrorCallback& error_callback,
247*1a96fba6SXin Li AccessMode mode) {
248*1a96fba6SXin Li CHECK(stream_utils::IsReadAccessMode(mode));
249*1a96fba6SXin Li CHECK(is_async_read_pending_);
250*1a96fba6SXin Li is_async_read_pending_ = false;
251*1a96fba6SXin Li ErrorPtr error;
252*1a96fba6SXin Li // Just reschedule the read operation but don't need to run the callback from
253*1a96fba6SXin Li // the main loop since we are already running on a callback.
254*1a96fba6SXin Li if (!ReadAsyncImpl(buffer, size_to_read, success_callback, error_callback,
255*1a96fba6SXin Li &error, false)) {
256*1a96fba6SXin Li error_callback.Run(error.get());
257*1a96fba6SXin Li }
258*1a96fba6SXin Li }
259*1a96fba6SXin Li
WriteAsyncImpl(const void * buffer,size_t size_to_write,const base::Callback<void (size_t)> & success_callback,const ErrorCallback & error_callback,ErrorPtr * error,bool force_async_callback)260*1a96fba6SXin Li bool Stream::WriteAsyncImpl(
261*1a96fba6SXin Li const void* buffer,
262*1a96fba6SXin Li size_t size_to_write,
263*1a96fba6SXin Li const base::Callback<void(size_t)>& success_callback,
264*1a96fba6SXin Li const ErrorCallback& error_callback,
265*1a96fba6SXin Li ErrorPtr* error,
266*1a96fba6SXin Li bool force_async_callback) {
267*1a96fba6SXin Li CHECK(!is_async_write_pending_);
268*1a96fba6SXin Li // We set this value to true early in the function so calling others will
269*1a96fba6SXin Li // prevent us from calling WaitForData() to make calls to
270*1a96fba6SXin Li // ReadAsync() fail while we run WaitForData().
271*1a96fba6SXin Li is_async_write_pending_ = true;
272*1a96fba6SXin Li
273*1a96fba6SXin Li size_t written = 0;
274*1a96fba6SXin Li if (!WriteNonBlocking(buffer, size_to_write, &written, error))
275*1a96fba6SXin Li return false;
276*1a96fba6SXin Li
277*1a96fba6SXin Li if (written > 0) {
278*1a96fba6SXin Li if (force_async_callback) {
279*1a96fba6SXin Li MessageLoop::current()->PostTask(
280*1a96fba6SXin Li FROM_HERE,
281*1a96fba6SXin Li base::BindOnce(&Stream::OnWriteAsyncDone,
282*1a96fba6SXin Li weak_ptr_factory_.GetWeakPtr(),
283*1a96fba6SXin Li success_callback, written));
284*1a96fba6SXin Li } else {
285*1a96fba6SXin Li is_async_write_pending_ = false;
286*1a96fba6SXin Li success_callback.Run(written);
287*1a96fba6SXin Li }
288*1a96fba6SXin Li return true;
289*1a96fba6SXin Li }
290*1a96fba6SXin Li is_async_write_pending_ = WaitForData(
291*1a96fba6SXin Li AccessMode::WRITE,
292*1a96fba6SXin Li base::Bind(&Stream::OnWriteAvailable, weak_ptr_factory_.GetWeakPtr(),
293*1a96fba6SXin Li buffer, size_to_write, success_callback, error_callback),
294*1a96fba6SXin Li error);
295*1a96fba6SXin Li return is_async_write_pending_;
296*1a96fba6SXin Li }
297*1a96fba6SXin Li
OnWriteAsyncDone(const base::Callback<void (size_t)> & success_callback,size_t size_written)298*1a96fba6SXin Li void Stream::OnWriteAsyncDone(
299*1a96fba6SXin Li const base::Callback<void(size_t)>& success_callback,
300*1a96fba6SXin Li size_t size_written) {
301*1a96fba6SXin Li is_async_write_pending_ = false;
302*1a96fba6SXin Li success_callback.Run(size_written);
303*1a96fba6SXin Li }
304*1a96fba6SXin Li
OnWriteAvailable(const void * buffer,size_t size,const base::Callback<void (size_t)> & success_callback,const ErrorCallback & error_callback,AccessMode mode)305*1a96fba6SXin Li void Stream::OnWriteAvailable(
306*1a96fba6SXin Li const void* buffer,
307*1a96fba6SXin Li size_t size,
308*1a96fba6SXin Li const base::Callback<void(size_t)>& success_callback,
309*1a96fba6SXin Li const ErrorCallback& error_callback,
310*1a96fba6SXin Li AccessMode mode) {
311*1a96fba6SXin Li CHECK(stream_utils::IsWriteAccessMode(mode));
312*1a96fba6SXin Li CHECK(is_async_write_pending_);
313*1a96fba6SXin Li is_async_write_pending_ = false;
314*1a96fba6SXin Li ErrorPtr error;
315*1a96fba6SXin Li // Just reschedule the read operation but don't need to run the callback from
316*1a96fba6SXin Li // the main loop since we are already running on a callback.
317*1a96fba6SXin Li if (!WriteAsyncImpl(buffer, size, success_callback, error_callback, &error,
318*1a96fba6SXin Li false)) {
319*1a96fba6SXin Li error_callback.Run(error.get());
320*1a96fba6SXin Li }
321*1a96fba6SXin Li }
322*1a96fba6SXin Li
ReadAllAsyncCallback(void * buffer,size_t size_to_read,const base::Closure & success_callback,const ErrorCallback & error_callback,size_t size_read,bool eos)323*1a96fba6SXin Li void Stream::ReadAllAsyncCallback(void* buffer,
324*1a96fba6SXin Li size_t size_to_read,
325*1a96fba6SXin Li const base::Closure& success_callback,
326*1a96fba6SXin Li const ErrorCallback& error_callback,
327*1a96fba6SXin Li size_t size_read,
328*1a96fba6SXin Li bool eos) {
329*1a96fba6SXin Li ErrorPtr error;
330*1a96fba6SXin Li size_to_read -= size_read;
331*1a96fba6SXin Li if (size_to_read != 0 && eos) {
332*1a96fba6SXin Li stream_utils::ErrorReadPastEndOfStream(FROM_HERE, &error);
333*1a96fba6SXin Li error_callback.Run(error.get());
334*1a96fba6SXin Li return;
335*1a96fba6SXin Li }
336*1a96fba6SXin Li
337*1a96fba6SXin Li if (size_to_read) {
338*1a96fba6SXin Li buffer = AdvancePointer(buffer, size_read);
339*1a96fba6SXin Li auto callback = base::Bind(&Stream::ReadAllAsyncCallback,
340*1a96fba6SXin Li weak_ptr_factory_.GetWeakPtr(), buffer,
341*1a96fba6SXin Li size_to_read, success_callback, error_callback);
342*1a96fba6SXin Li if (!ReadAsyncImpl(buffer, size_to_read, callback, error_callback, &error,
343*1a96fba6SXin Li false)) {
344*1a96fba6SXin Li error_callback.Run(error.get());
345*1a96fba6SXin Li }
346*1a96fba6SXin Li } else {
347*1a96fba6SXin Li success_callback.Run();
348*1a96fba6SXin Li }
349*1a96fba6SXin Li }
350*1a96fba6SXin Li
WriteAllAsyncCallback(const void * buffer,size_t size_to_write,const base::Closure & success_callback,const ErrorCallback & error_callback,size_t size_written)351*1a96fba6SXin Li void Stream::WriteAllAsyncCallback(const void* buffer,
352*1a96fba6SXin Li size_t size_to_write,
353*1a96fba6SXin Li const base::Closure& success_callback,
354*1a96fba6SXin Li const ErrorCallback& error_callback,
355*1a96fba6SXin Li size_t size_written) {
356*1a96fba6SXin Li ErrorPtr error;
357*1a96fba6SXin Li if (size_to_write != 0 && size_written == 0) {
358*1a96fba6SXin Li Error::AddTo(&error, FROM_HERE, errors::stream::kDomain,
359*1a96fba6SXin Li errors::stream::kPartialData, "Failed to write all the data");
360*1a96fba6SXin Li error_callback.Run(error.get());
361*1a96fba6SXin Li return;
362*1a96fba6SXin Li }
363*1a96fba6SXin Li size_to_write -= size_written;
364*1a96fba6SXin Li if (size_to_write) {
365*1a96fba6SXin Li buffer = AdvancePointer(buffer, size_written);
366*1a96fba6SXin Li auto callback = base::Bind(&Stream::WriteAllAsyncCallback,
367*1a96fba6SXin Li weak_ptr_factory_.GetWeakPtr(), buffer,
368*1a96fba6SXin Li size_to_write, success_callback, error_callback);
369*1a96fba6SXin Li if (!WriteAsyncImpl(buffer, size_to_write, callback, error_callback, &error,
370*1a96fba6SXin Li false)) {
371*1a96fba6SXin Li error_callback.Run(error.get());
372*1a96fba6SXin Li }
373*1a96fba6SXin Li } else {
374*1a96fba6SXin Li success_callback.Run();
375*1a96fba6SXin Li }
376*1a96fba6SXin Li }
377*1a96fba6SXin Li
FlushAsyncCallback(const base::Closure & success_callback,const ErrorCallback & error_callback)378*1a96fba6SXin Li void Stream::FlushAsyncCallback(const base::Closure& success_callback,
379*1a96fba6SXin Li const ErrorCallback& error_callback) {
380*1a96fba6SXin Li ErrorPtr error;
381*1a96fba6SXin Li if (FlushBlocking(&error)) {
382*1a96fba6SXin Li success_callback.Run();
383*1a96fba6SXin Li } else {
384*1a96fba6SXin Li error_callback.Run(error.get());
385*1a96fba6SXin Li }
386*1a96fba6SXin Li }
387*1a96fba6SXin Li
CancelPendingAsyncOperations()388*1a96fba6SXin Li void Stream::CancelPendingAsyncOperations() {
389*1a96fba6SXin Li weak_ptr_factory_.InvalidateWeakPtrs();
390*1a96fba6SXin Li is_async_read_pending_ = false;
391*1a96fba6SXin Li is_async_write_pending_ = false;
392*1a96fba6SXin Li }
393*1a96fba6SXin Li
394*1a96fba6SXin Li } // namespace brillo
395