xref: /aosp_15_r20/external/libbrillo/brillo/streams/fake_stream.cc (revision 1a96fba65179ea7d3f56207137718607415c5953)
1*1a96fba6SXin Li // Copyright 2015 The Chromium OS Authors. All rights reserved.
2*1a96fba6SXin Li // Use of this source code is governed by a BSD-style license that can be
3*1a96fba6SXin Li // found in the LICENSE file.
4*1a96fba6SXin Li 
5*1a96fba6SXin Li #include <brillo/streams/fake_stream.h>
6*1a96fba6SXin Li 
7*1a96fba6SXin Li #include <algorithm>
8*1a96fba6SXin Li #include <utility>
9*1a96fba6SXin Li 
10*1a96fba6SXin Li #include <base/bind.h>
11*1a96fba6SXin Li #include <brillo/message_loops/message_loop.h>
12*1a96fba6SXin Li #include <brillo/streams/stream_utils.h>
13*1a96fba6SXin Li 
14*1a96fba6SXin Li namespace brillo {
15*1a96fba6SXin Li 
16*1a96fba6SXin Li namespace {
17*1a96fba6SXin Li 
18*1a96fba6SXin Li // Gets a delta between the two times, makes sure that the delta is positive.
CalculateDelay(const base::Time & now,const base::Time & delay_until)19*1a96fba6SXin Li base::TimeDelta CalculateDelay(const base::Time& now,
20*1a96fba6SXin Li                                const base::Time& delay_until) {
21*1a96fba6SXin Li   const base::TimeDelta zero_delay;
22*1a96fba6SXin Li   if (delay_until.is_null() || now >= delay_until) {
23*1a96fba6SXin Li     return zero_delay;
24*1a96fba6SXin Li   }
25*1a96fba6SXin Li 
26*1a96fba6SXin Li   base::TimeDelta delay = delay_until - now;
27*1a96fba6SXin Li   if (delay < zero_delay)
28*1a96fba6SXin Li     delay = zero_delay;
29*1a96fba6SXin Li   return delay;
30*1a96fba6SXin Li }
31*1a96fba6SXin Li 
32*1a96fba6SXin Li // Given the current clock time, and expected delays for read and write
33*1a96fba6SXin Li // operations calculates the smaller wait delay of the two and sets the
34*1a96fba6SXin Li // resulting operation to |*mode| and the delay to wait for into |*delay|.
GetMinDelayAndMode(const base::Time & now,bool read,const base::Time & delay_read_until,bool write,const base::Time & delay_write_until,Stream::AccessMode * mode,base::TimeDelta * delay)35*1a96fba6SXin Li void GetMinDelayAndMode(const base::Time& now,
36*1a96fba6SXin Li                         bool read, const base::Time& delay_read_until,
37*1a96fba6SXin Li                         bool write, const base::Time& delay_write_until,
38*1a96fba6SXin Li                         Stream::AccessMode* mode, base::TimeDelta* delay) {
39*1a96fba6SXin Li   base::TimeDelta read_delay = base::TimeDelta::Max();
40*1a96fba6SXin Li   base::TimeDelta write_delay = base::TimeDelta::Max();
41*1a96fba6SXin Li 
42*1a96fba6SXin Li   if (read)
43*1a96fba6SXin Li     read_delay = CalculateDelay(now, delay_read_until);
44*1a96fba6SXin Li   if (write)
45*1a96fba6SXin Li     write_delay = CalculateDelay(now, delay_write_until);
46*1a96fba6SXin Li 
47*1a96fba6SXin Li   if (read_delay > write_delay) {
48*1a96fba6SXin Li     read = false;
49*1a96fba6SXin Li   } else if (read_delay < write_delay) {
50*1a96fba6SXin Li     write = false;
51*1a96fba6SXin Li   }
52*1a96fba6SXin Li   *mode = stream_utils::MakeAccessMode(read, write);
53*1a96fba6SXin Li   *delay = std::min(read_delay, write_delay);
54*1a96fba6SXin Li }
55*1a96fba6SXin Li 
56*1a96fba6SXin Li }  // anonymous namespace
57*1a96fba6SXin Li 
FakeStream(Stream::AccessMode mode,base::Clock * clock)58*1a96fba6SXin Li FakeStream::FakeStream(Stream::AccessMode mode,
59*1a96fba6SXin Li                        base::Clock* clock)
60*1a96fba6SXin Li     : mode_{mode}, clock_{clock} {}
61*1a96fba6SXin Li 
AddReadPacketData(base::TimeDelta delay,const void * data,size_t size)62*1a96fba6SXin Li void FakeStream::AddReadPacketData(base::TimeDelta delay,
63*1a96fba6SXin Li                                    const void* data,
64*1a96fba6SXin Li                                    size_t size) {
65*1a96fba6SXin Li   auto* byte_ptr = static_cast<const uint8_t*>(data);
66*1a96fba6SXin Li   AddReadPacketData(delay, brillo::Blob{byte_ptr, byte_ptr + size});
67*1a96fba6SXin Li }
68*1a96fba6SXin Li 
AddReadPacketData(base::TimeDelta delay,brillo::Blob data)69*1a96fba6SXin Li void FakeStream::AddReadPacketData(base::TimeDelta delay, brillo::Blob data) {
70*1a96fba6SXin Li   InputDataPacket packet;
71*1a96fba6SXin Li   packet.data = std::move(data);
72*1a96fba6SXin Li   packet.delay_before = delay;
73*1a96fba6SXin Li   incoming_queue_.push(std::move(packet));
74*1a96fba6SXin Li }
75*1a96fba6SXin Li 
AddReadPacketString(base::TimeDelta delay,const std::string & data)76*1a96fba6SXin Li void FakeStream::AddReadPacketString(base::TimeDelta delay,
77*1a96fba6SXin Li                                      const std::string& data) {
78*1a96fba6SXin Li   AddReadPacketData(delay, brillo::Blob{data.begin(), data.end()});
79*1a96fba6SXin Li }
80*1a96fba6SXin Li 
QueueReadError(base::TimeDelta delay)81*1a96fba6SXin Li void FakeStream::QueueReadError(base::TimeDelta delay) {
82*1a96fba6SXin Li   QueueReadErrorWithMessage(delay, std::string{});
83*1a96fba6SXin Li }
84*1a96fba6SXin Li 
QueueReadErrorWithMessage(base::TimeDelta delay,const std::string & message)85*1a96fba6SXin Li void FakeStream::QueueReadErrorWithMessage(base::TimeDelta delay,
86*1a96fba6SXin Li                                            const std::string& message) {
87*1a96fba6SXin Li   InputDataPacket packet;
88*1a96fba6SXin Li   packet.data.assign(message.begin(), message.end());
89*1a96fba6SXin Li   packet.delay_before = delay;
90*1a96fba6SXin Li   packet.read_error = true;
91*1a96fba6SXin Li   incoming_queue_.push(std::move(packet));
92*1a96fba6SXin Li }
93*1a96fba6SXin Li 
ClearReadQueue()94*1a96fba6SXin Li void FakeStream::ClearReadQueue() {
95*1a96fba6SXin Li   std::queue<InputDataPacket>().swap(incoming_queue_);
96*1a96fba6SXin Li   delay_input_until_ = base::Time{};
97*1a96fba6SXin Li   input_buffer_.clear();
98*1a96fba6SXin Li   input_ptr_ = 0;
99*1a96fba6SXin Li   report_read_error_ = 0;
100*1a96fba6SXin Li }
101*1a96fba6SXin Li 
ExpectWritePacketSize(base::TimeDelta delay,size_t data_size)102*1a96fba6SXin Li void FakeStream::ExpectWritePacketSize(base::TimeDelta delay,
103*1a96fba6SXin Li                                        size_t data_size) {
104*1a96fba6SXin Li   OutputDataPacket packet;
105*1a96fba6SXin Li   packet.expected_size = data_size;
106*1a96fba6SXin Li   packet.delay_before = delay;
107*1a96fba6SXin Li   outgoing_queue_.push(std::move(packet));
108*1a96fba6SXin Li }
109*1a96fba6SXin Li 
ExpectWritePacketData(base::TimeDelta delay,const void * data,size_t size)110*1a96fba6SXin Li void FakeStream::ExpectWritePacketData(base::TimeDelta delay,
111*1a96fba6SXin Li                                        const void* data,
112*1a96fba6SXin Li                                        size_t size) {
113*1a96fba6SXin Li   auto* byte_ptr = static_cast<const uint8_t*>(data);
114*1a96fba6SXin Li   ExpectWritePacketData(delay, brillo::Blob{byte_ptr, byte_ptr + size});
115*1a96fba6SXin Li }
116*1a96fba6SXin Li 
ExpectWritePacketData(base::TimeDelta delay,brillo::Blob data)117*1a96fba6SXin Li void FakeStream::ExpectWritePacketData(base::TimeDelta delay,
118*1a96fba6SXin Li                                        brillo::Blob data) {
119*1a96fba6SXin Li   OutputDataPacket packet;
120*1a96fba6SXin Li   packet.expected_size = data.size();
121*1a96fba6SXin Li   packet.data = std::move(data);
122*1a96fba6SXin Li   packet.delay_before = delay;
123*1a96fba6SXin Li   outgoing_queue_.push(std::move(packet));
124*1a96fba6SXin Li }
125*1a96fba6SXin Li 
ExpectWritePacketString(base::TimeDelta delay,const std::string & data)126*1a96fba6SXin Li void FakeStream::ExpectWritePacketString(base::TimeDelta delay,
127*1a96fba6SXin Li                                          const std::string& data) {
128*1a96fba6SXin Li   ExpectWritePacketData(delay, brillo::Blob{data.begin(), data.end()});
129*1a96fba6SXin Li }
130*1a96fba6SXin Li 
QueueWriteError(base::TimeDelta delay)131*1a96fba6SXin Li void FakeStream::QueueWriteError(base::TimeDelta delay) {
132*1a96fba6SXin Li   QueueWriteErrorWithMessage(delay, std::string{});
133*1a96fba6SXin Li }
134*1a96fba6SXin Li 
QueueWriteErrorWithMessage(base::TimeDelta delay,const std::string & message)135*1a96fba6SXin Li void FakeStream::QueueWriteErrorWithMessage(base::TimeDelta delay,
136*1a96fba6SXin Li                                             const std::string& message) {
137*1a96fba6SXin Li   OutputDataPacket packet;
138*1a96fba6SXin Li   packet.expected_size = 0;
139*1a96fba6SXin Li   packet.data.assign(message.begin(), message.end());
140*1a96fba6SXin Li   packet.delay_before = delay;
141*1a96fba6SXin Li   packet.write_error = true;
142*1a96fba6SXin Li   outgoing_queue_.push(std::move(packet));
143*1a96fba6SXin Li }
144*1a96fba6SXin Li 
ClearWriteQueue()145*1a96fba6SXin Li void FakeStream::ClearWriteQueue() {
146*1a96fba6SXin Li   std::queue<OutputDataPacket>().swap(outgoing_queue_);
147*1a96fba6SXin Li   delay_output_until_ = base::Time{};
148*1a96fba6SXin Li   output_buffer_.clear();
149*1a96fba6SXin Li   expected_output_data_.clear();
150*1a96fba6SXin Li   max_output_buffer_size_ = 0;
151*1a96fba6SXin Li   all_output_data_.clear();
152*1a96fba6SXin Li   report_write_error_ = 0;
153*1a96fba6SXin Li }
154*1a96fba6SXin Li 
GetFlushedOutputData() const155*1a96fba6SXin Li const brillo::Blob& FakeStream::GetFlushedOutputData() const {
156*1a96fba6SXin Li   return all_output_data_;
157*1a96fba6SXin Li }
158*1a96fba6SXin Li 
GetFlushedOutputDataAsString() const159*1a96fba6SXin Li std::string FakeStream::GetFlushedOutputDataAsString() const {
160*1a96fba6SXin Li   return std::string{all_output_data_.begin(), all_output_data_.end()};
161*1a96fba6SXin Li }
162*1a96fba6SXin Li 
CanRead() const163*1a96fba6SXin Li bool FakeStream::CanRead() const {
164*1a96fba6SXin Li   return stream_utils::IsReadAccessMode(mode_);
165*1a96fba6SXin Li }
166*1a96fba6SXin Li 
CanWrite() const167*1a96fba6SXin Li bool FakeStream::CanWrite() const {
168*1a96fba6SXin Li   return stream_utils::IsWriteAccessMode(mode_);
169*1a96fba6SXin Li }
170*1a96fba6SXin Li 
SetSizeBlocking(uint64_t,ErrorPtr * error)171*1a96fba6SXin Li bool FakeStream::SetSizeBlocking(uint64_t /* size */, ErrorPtr* error) {
172*1a96fba6SXin Li   return stream_utils::ErrorOperationNotSupported(FROM_HERE, error);
173*1a96fba6SXin Li }
174*1a96fba6SXin Li 
Seek(int64_t,Whence,uint64_t *,ErrorPtr * error)175*1a96fba6SXin Li bool FakeStream::Seek(int64_t /* offset */,
176*1a96fba6SXin Li                       Whence /* whence */,
177*1a96fba6SXin Li                       uint64_t* /* new_position */,
178*1a96fba6SXin Li                       ErrorPtr* error) {
179*1a96fba6SXin Li   return stream_utils::ErrorOperationNotSupported(FROM_HERE, error);
180*1a96fba6SXin Li }
181*1a96fba6SXin Li 
IsReadBufferEmpty() const182*1a96fba6SXin Li bool FakeStream::IsReadBufferEmpty() const {
183*1a96fba6SXin Li   return input_ptr_ >= input_buffer_.size();
184*1a96fba6SXin Li }
185*1a96fba6SXin Li 
PopReadPacket()186*1a96fba6SXin Li bool FakeStream::PopReadPacket() {
187*1a96fba6SXin Li   if (incoming_queue_.empty())
188*1a96fba6SXin Li     return false;
189*1a96fba6SXin Li   InputDataPacket& packet = incoming_queue_.front();
190*1a96fba6SXin Li   input_ptr_ = 0;
191*1a96fba6SXin Li   input_buffer_ = std::move(packet.data);
192*1a96fba6SXin Li   delay_input_until_ = clock_->Now() + packet.delay_before;
193*1a96fba6SXin Li   incoming_queue_.pop();
194*1a96fba6SXin Li   report_read_error_ = packet.read_error;
195*1a96fba6SXin Li   return true;
196*1a96fba6SXin Li }
197*1a96fba6SXin Li 
ReadNonBlocking(void * buffer,size_t size_to_read,size_t * size_read,bool * end_of_stream,ErrorPtr * error)198*1a96fba6SXin Li bool FakeStream::ReadNonBlocking(void* buffer,
199*1a96fba6SXin Li                                  size_t size_to_read,
200*1a96fba6SXin Li                                  size_t* size_read,
201*1a96fba6SXin Li                                  bool* end_of_stream,
202*1a96fba6SXin Li                                  ErrorPtr* error) {
203*1a96fba6SXin Li   if (!CanRead())
204*1a96fba6SXin Li     return stream_utils::ErrorOperationNotSupported(FROM_HERE, error);
205*1a96fba6SXin Li 
206*1a96fba6SXin Li   if (!IsOpen())
207*1a96fba6SXin Li     return stream_utils::ErrorStreamClosed(FROM_HERE, error);
208*1a96fba6SXin Li 
209*1a96fba6SXin Li   for (;;) {
210*1a96fba6SXin Li     if (!delay_input_until_.is_null() && clock_->Now() < delay_input_until_) {
211*1a96fba6SXin Li       *size_read = 0;
212*1a96fba6SXin Li       if (end_of_stream)
213*1a96fba6SXin Li         *end_of_stream = false;
214*1a96fba6SXin Li       break;
215*1a96fba6SXin Li     }
216*1a96fba6SXin Li 
217*1a96fba6SXin Li     if (report_read_error_) {
218*1a96fba6SXin Li       report_read_error_ = false;
219*1a96fba6SXin Li       std::string message{input_buffer_.begin(), input_buffer_.end()};
220*1a96fba6SXin Li       if (message.empty())
221*1a96fba6SXin Li         message = "Simulating read error for tests";
222*1a96fba6SXin Li       input_buffer_.clear();
223*1a96fba6SXin Li       Error::AddTo(error, FROM_HERE, "fake_stream", "read_error", message);
224*1a96fba6SXin Li       return false;
225*1a96fba6SXin Li     }
226*1a96fba6SXin Li 
227*1a96fba6SXin Li     if (!IsReadBufferEmpty()) {
228*1a96fba6SXin Li       size_to_read = std::min(size_to_read, input_buffer_.size() - input_ptr_);
229*1a96fba6SXin Li       std::memcpy(buffer, input_buffer_.data() + input_ptr_, size_to_read);
230*1a96fba6SXin Li       input_ptr_ += size_to_read;
231*1a96fba6SXin Li       *size_read = size_to_read;
232*1a96fba6SXin Li       if (end_of_stream)
233*1a96fba6SXin Li         *end_of_stream = false;
234*1a96fba6SXin Li       break;
235*1a96fba6SXin Li     }
236*1a96fba6SXin Li 
237*1a96fba6SXin Li     if (!PopReadPacket()) {
238*1a96fba6SXin Li       *size_read = 0;
239*1a96fba6SXin Li       if (end_of_stream)
240*1a96fba6SXin Li         *end_of_stream = true;
241*1a96fba6SXin Li       break;
242*1a96fba6SXin Li     }
243*1a96fba6SXin Li   }
244*1a96fba6SXin Li   return true;
245*1a96fba6SXin Li }
246*1a96fba6SXin Li 
IsWriteBufferFull() const247*1a96fba6SXin Li bool FakeStream::IsWriteBufferFull() const {
248*1a96fba6SXin Li   return output_buffer_.size() >= max_output_buffer_size_;
249*1a96fba6SXin Li }
250*1a96fba6SXin Li 
PopWritePacket()251*1a96fba6SXin Li bool FakeStream::PopWritePacket() {
252*1a96fba6SXin Li   if (outgoing_queue_.empty())
253*1a96fba6SXin Li     return false;
254*1a96fba6SXin Li   OutputDataPacket& packet = outgoing_queue_.front();
255*1a96fba6SXin Li   expected_output_data_ = std::move(packet.data);
256*1a96fba6SXin Li   delay_output_until_ = clock_->Now() + packet.delay_before;
257*1a96fba6SXin Li   max_output_buffer_size_ = packet.expected_size;
258*1a96fba6SXin Li   report_write_error_ = packet.write_error;
259*1a96fba6SXin Li   outgoing_queue_.pop();
260*1a96fba6SXin Li   return true;
261*1a96fba6SXin Li }
262*1a96fba6SXin Li 
WriteNonBlocking(const void * buffer,size_t size_to_write,size_t * size_written,ErrorPtr * error)263*1a96fba6SXin Li bool FakeStream::WriteNonBlocking(const void* buffer,
264*1a96fba6SXin Li                                   size_t size_to_write,
265*1a96fba6SXin Li                                   size_t* size_written,
266*1a96fba6SXin Li                                   ErrorPtr* error) {
267*1a96fba6SXin Li   if (!CanWrite())
268*1a96fba6SXin Li     return stream_utils::ErrorOperationNotSupported(FROM_HERE, error);
269*1a96fba6SXin Li 
270*1a96fba6SXin Li   if (!IsOpen())
271*1a96fba6SXin Li     return stream_utils::ErrorStreamClosed(FROM_HERE, error);
272*1a96fba6SXin Li 
273*1a96fba6SXin Li   for (;;) {
274*1a96fba6SXin Li     if (!delay_output_until_.is_null() && clock_->Now() < delay_output_until_) {
275*1a96fba6SXin Li       *size_written = 0;
276*1a96fba6SXin Li       return true;
277*1a96fba6SXin Li     }
278*1a96fba6SXin Li 
279*1a96fba6SXin Li     if (report_write_error_) {
280*1a96fba6SXin Li       report_write_error_ = false;
281*1a96fba6SXin Li       std::string message{expected_output_data_.begin(),
282*1a96fba6SXin Li                           expected_output_data_.end()};
283*1a96fba6SXin Li       if (message.empty())
284*1a96fba6SXin Li         message = "Simulating write error for tests";
285*1a96fba6SXin Li       output_buffer_.clear();
286*1a96fba6SXin Li       max_output_buffer_size_ = 0;
287*1a96fba6SXin Li       expected_output_data_.clear();
288*1a96fba6SXin Li       Error::AddTo(error, FROM_HERE, "fake_stream", "write_error", message);
289*1a96fba6SXin Li       return false;
290*1a96fba6SXin Li     }
291*1a96fba6SXin Li 
292*1a96fba6SXin Li     if (!IsWriteBufferFull()) {
293*1a96fba6SXin Li       bool success = true;
294*1a96fba6SXin Li       size_to_write = std::min(size_to_write,
295*1a96fba6SXin Li                                max_output_buffer_size_ - output_buffer_.size());
296*1a96fba6SXin Li       auto byte_ptr = static_cast<const uint8_t*>(buffer);
297*1a96fba6SXin Li       output_buffer_.insert(output_buffer_.end(),
298*1a96fba6SXin Li                             byte_ptr, byte_ptr + size_to_write);
299*1a96fba6SXin Li       if (output_buffer_.size()  == max_output_buffer_size_) {
300*1a96fba6SXin Li         if (!expected_output_data_.empty() &&
301*1a96fba6SXin Li             expected_output_data_ != output_buffer_) {
302*1a96fba6SXin Li           // We expected different data to be written, report an error.
303*1a96fba6SXin Li           Error::AddTo(error, FROM_HERE, "fake_stream", "data_mismatch",
304*1a96fba6SXin Li                        "Unexpected data written");
305*1a96fba6SXin Li           success = false;
306*1a96fba6SXin Li         }
307*1a96fba6SXin Li 
308*1a96fba6SXin Li         all_output_data_.insert(all_output_data_.end(),
309*1a96fba6SXin Li                                 output_buffer_.begin(), output_buffer_.end());
310*1a96fba6SXin Li 
311*1a96fba6SXin Li         output_buffer_.clear();
312*1a96fba6SXin Li         max_output_buffer_size_ = 0;
313*1a96fba6SXin Li         expected_output_data_.clear();
314*1a96fba6SXin Li       }
315*1a96fba6SXin Li       *size_written = size_to_write;
316*1a96fba6SXin Li       return success;
317*1a96fba6SXin Li     }
318*1a96fba6SXin Li 
319*1a96fba6SXin Li     if (!PopWritePacket()) {
320*1a96fba6SXin Li       // No more data expected.
321*1a96fba6SXin Li       Error::AddTo(error, FROM_HERE, "fake_stream", "full",
322*1a96fba6SXin Li                    "No more output data expected");
323*1a96fba6SXin Li       return false;
324*1a96fba6SXin Li     }
325*1a96fba6SXin Li   }
326*1a96fba6SXin Li }
327*1a96fba6SXin Li 
FlushBlocking(ErrorPtr * error)328*1a96fba6SXin Li bool FakeStream::FlushBlocking(ErrorPtr* error) {
329*1a96fba6SXin Li   if (!CanWrite())
330*1a96fba6SXin Li     return stream_utils::ErrorOperationNotSupported(FROM_HERE, error);
331*1a96fba6SXin Li 
332*1a96fba6SXin Li   if (!IsOpen())
333*1a96fba6SXin Li     return stream_utils::ErrorStreamClosed(FROM_HERE, error);
334*1a96fba6SXin Li 
335*1a96fba6SXin Li   bool success = true;
336*1a96fba6SXin Li   if (!output_buffer_.empty()) {
337*1a96fba6SXin Li     if (!expected_output_data_.empty() &&
338*1a96fba6SXin Li         expected_output_data_ != output_buffer_) {
339*1a96fba6SXin Li       // We expected different data to be written, report an error.
340*1a96fba6SXin Li       Error::AddTo(error, FROM_HERE, "fake_stream", "data_mismatch",
341*1a96fba6SXin Li                    "Unexpected data written");
342*1a96fba6SXin Li       success = false;
343*1a96fba6SXin Li     }
344*1a96fba6SXin Li     all_output_data_.insert(all_output_data_.end(),
345*1a96fba6SXin Li                             output_buffer_.begin(), output_buffer_.end());
346*1a96fba6SXin Li 
347*1a96fba6SXin Li     output_buffer_.clear();
348*1a96fba6SXin Li     max_output_buffer_size_ = 0;
349*1a96fba6SXin Li     expected_output_data_.clear();
350*1a96fba6SXin Li   }
351*1a96fba6SXin Li   return success;
352*1a96fba6SXin Li }
353*1a96fba6SXin Li 
CloseBlocking(ErrorPtr *)354*1a96fba6SXin Li bool FakeStream::CloseBlocking(ErrorPtr* /* error */) {
355*1a96fba6SXin Li   is_open_ = false;
356*1a96fba6SXin Li   return true;
357*1a96fba6SXin Li }
358*1a96fba6SXin Li 
WaitForData(AccessMode mode,const base::Callback<void (AccessMode)> & callback,ErrorPtr * error)359*1a96fba6SXin Li bool FakeStream::WaitForData(AccessMode mode,
360*1a96fba6SXin Li                              const base::Callback<void(AccessMode)>& callback,
361*1a96fba6SXin Li                              ErrorPtr* error) {
362*1a96fba6SXin Li   bool read_requested = stream_utils::IsReadAccessMode(mode);
363*1a96fba6SXin Li   bool write_requested = stream_utils::IsWriteAccessMode(mode);
364*1a96fba6SXin Li 
365*1a96fba6SXin Li   if ((read_requested && !CanRead()) || (write_requested && !CanWrite()))
366*1a96fba6SXin Li     return stream_utils::ErrorOperationNotSupported(FROM_HERE, error);
367*1a96fba6SXin Li 
368*1a96fba6SXin Li   if (read_requested && IsReadBufferEmpty())
369*1a96fba6SXin Li     PopReadPacket();
370*1a96fba6SXin Li   if (write_requested && IsWriteBufferFull())
371*1a96fba6SXin Li     PopWritePacket();
372*1a96fba6SXin Li 
373*1a96fba6SXin Li   base::TimeDelta delay;
374*1a96fba6SXin Li   GetMinDelayAndMode(clock_->Now(), read_requested, delay_input_until_,
375*1a96fba6SXin Li                      write_requested, delay_output_until_, &mode, &delay);
376*1a96fba6SXin Li   MessageLoop::current()->PostDelayedTask(
377*1a96fba6SXin Li       FROM_HERE, base::Bind(callback, mode), delay);
378*1a96fba6SXin Li   return true;
379*1a96fba6SXin Li }
380*1a96fba6SXin Li 
WaitForDataBlocking(AccessMode in_mode,base::TimeDelta timeout,AccessMode * out_mode,ErrorPtr * error)381*1a96fba6SXin Li bool FakeStream::WaitForDataBlocking(AccessMode in_mode,
382*1a96fba6SXin Li                                      base::TimeDelta timeout,
383*1a96fba6SXin Li                                      AccessMode* out_mode,
384*1a96fba6SXin Li                                      ErrorPtr* error) {
385*1a96fba6SXin Li   bool read_requested = stream_utils::IsReadAccessMode(in_mode);
386*1a96fba6SXin Li   bool write_requested = stream_utils::IsWriteAccessMode(in_mode);
387*1a96fba6SXin Li 
388*1a96fba6SXin Li   if ((read_requested && !CanRead()) || (write_requested && !CanWrite()))
389*1a96fba6SXin Li     return stream_utils::ErrorOperationNotSupported(FROM_HERE, error);
390*1a96fba6SXin Li 
391*1a96fba6SXin Li   base::TimeDelta delay;
392*1a96fba6SXin Li   GetMinDelayAndMode(clock_->Now(), read_requested, delay_input_until_,
393*1a96fba6SXin Li                      write_requested, delay_output_until_, out_mode, &delay);
394*1a96fba6SXin Li 
395*1a96fba6SXin Li   if (timeout < delay)
396*1a96fba6SXin Li     return stream_utils::ErrorOperationTimeout(FROM_HERE, error);
397*1a96fba6SXin Li 
398*1a96fba6SXin Li   LOG(INFO) << "TEST: Would have blocked for " << delay.InMilliseconds()
399*1a96fba6SXin Li             << " ms.";
400*1a96fba6SXin Li 
401*1a96fba6SXin Li   return true;
402*1a96fba6SXin Li }
403*1a96fba6SXin Li 
404*1a96fba6SXin Li }  // namespace brillo
405