1*1a96fba6SXin Li // Copyright 2015 The Chromium OS Authors. All rights reserved.
2*1a96fba6SXin Li // Use of this source code is governed by a BSD-style license that can be
3*1a96fba6SXin Li // found in the LICENSE file.
4*1a96fba6SXin Li
5*1a96fba6SXin Li #include <brillo/streams/fake_stream.h>
6*1a96fba6SXin Li
7*1a96fba6SXin Li #include <algorithm>
8*1a96fba6SXin Li #include <utility>
9*1a96fba6SXin Li
10*1a96fba6SXin Li #include <base/bind.h>
11*1a96fba6SXin Li #include <brillo/message_loops/message_loop.h>
12*1a96fba6SXin Li #include <brillo/streams/stream_utils.h>
13*1a96fba6SXin Li
14*1a96fba6SXin Li namespace brillo {
15*1a96fba6SXin Li
16*1a96fba6SXin Li namespace {
17*1a96fba6SXin Li
18*1a96fba6SXin Li // Gets a delta between the two times, makes sure that the delta is positive.
CalculateDelay(const base::Time & now,const base::Time & delay_until)19*1a96fba6SXin Li base::TimeDelta CalculateDelay(const base::Time& now,
20*1a96fba6SXin Li const base::Time& delay_until) {
21*1a96fba6SXin Li const base::TimeDelta zero_delay;
22*1a96fba6SXin Li if (delay_until.is_null() || now >= delay_until) {
23*1a96fba6SXin Li return zero_delay;
24*1a96fba6SXin Li }
25*1a96fba6SXin Li
26*1a96fba6SXin Li base::TimeDelta delay = delay_until - now;
27*1a96fba6SXin Li if (delay < zero_delay)
28*1a96fba6SXin Li delay = zero_delay;
29*1a96fba6SXin Li return delay;
30*1a96fba6SXin Li }
31*1a96fba6SXin Li
32*1a96fba6SXin Li // Given the current clock time, and expected delays for read and write
33*1a96fba6SXin Li // operations calculates the smaller wait delay of the two and sets the
34*1a96fba6SXin Li // resulting operation to |*mode| and the delay to wait for into |*delay|.
GetMinDelayAndMode(const base::Time & now,bool read,const base::Time & delay_read_until,bool write,const base::Time & delay_write_until,Stream::AccessMode * mode,base::TimeDelta * delay)35*1a96fba6SXin Li void GetMinDelayAndMode(const base::Time& now,
36*1a96fba6SXin Li bool read, const base::Time& delay_read_until,
37*1a96fba6SXin Li bool write, const base::Time& delay_write_until,
38*1a96fba6SXin Li Stream::AccessMode* mode, base::TimeDelta* delay) {
39*1a96fba6SXin Li base::TimeDelta read_delay = base::TimeDelta::Max();
40*1a96fba6SXin Li base::TimeDelta write_delay = base::TimeDelta::Max();
41*1a96fba6SXin Li
42*1a96fba6SXin Li if (read)
43*1a96fba6SXin Li read_delay = CalculateDelay(now, delay_read_until);
44*1a96fba6SXin Li if (write)
45*1a96fba6SXin Li write_delay = CalculateDelay(now, delay_write_until);
46*1a96fba6SXin Li
47*1a96fba6SXin Li if (read_delay > write_delay) {
48*1a96fba6SXin Li read = false;
49*1a96fba6SXin Li } else if (read_delay < write_delay) {
50*1a96fba6SXin Li write = false;
51*1a96fba6SXin Li }
52*1a96fba6SXin Li *mode = stream_utils::MakeAccessMode(read, write);
53*1a96fba6SXin Li *delay = std::min(read_delay, write_delay);
54*1a96fba6SXin Li }
55*1a96fba6SXin Li
56*1a96fba6SXin Li } // anonymous namespace
57*1a96fba6SXin Li
FakeStream(Stream::AccessMode mode,base::Clock * clock)58*1a96fba6SXin Li FakeStream::FakeStream(Stream::AccessMode mode,
59*1a96fba6SXin Li base::Clock* clock)
60*1a96fba6SXin Li : mode_{mode}, clock_{clock} {}
61*1a96fba6SXin Li
AddReadPacketData(base::TimeDelta delay,const void * data,size_t size)62*1a96fba6SXin Li void FakeStream::AddReadPacketData(base::TimeDelta delay,
63*1a96fba6SXin Li const void* data,
64*1a96fba6SXin Li size_t size) {
65*1a96fba6SXin Li auto* byte_ptr = static_cast<const uint8_t*>(data);
66*1a96fba6SXin Li AddReadPacketData(delay, brillo::Blob{byte_ptr, byte_ptr + size});
67*1a96fba6SXin Li }
68*1a96fba6SXin Li
AddReadPacketData(base::TimeDelta delay,brillo::Blob data)69*1a96fba6SXin Li void FakeStream::AddReadPacketData(base::TimeDelta delay, brillo::Blob data) {
70*1a96fba6SXin Li InputDataPacket packet;
71*1a96fba6SXin Li packet.data = std::move(data);
72*1a96fba6SXin Li packet.delay_before = delay;
73*1a96fba6SXin Li incoming_queue_.push(std::move(packet));
74*1a96fba6SXin Li }
75*1a96fba6SXin Li
AddReadPacketString(base::TimeDelta delay,const std::string & data)76*1a96fba6SXin Li void FakeStream::AddReadPacketString(base::TimeDelta delay,
77*1a96fba6SXin Li const std::string& data) {
78*1a96fba6SXin Li AddReadPacketData(delay, brillo::Blob{data.begin(), data.end()});
79*1a96fba6SXin Li }
80*1a96fba6SXin Li
QueueReadError(base::TimeDelta delay)81*1a96fba6SXin Li void FakeStream::QueueReadError(base::TimeDelta delay) {
82*1a96fba6SXin Li QueueReadErrorWithMessage(delay, std::string{});
83*1a96fba6SXin Li }
84*1a96fba6SXin Li
QueueReadErrorWithMessage(base::TimeDelta delay,const std::string & message)85*1a96fba6SXin Li void FakeStream::QueueReadErrorWithMessage(base::TimeDelta delay,
86*1a96fba6SXin Li const std::string& message) {
87*1a96fba6SXin Li InputDataPacket packet;
88*1a96fba6SXin Li packet.data.assign(message.begin(), message.end());
89*1a96fba6SXin Li packet.delay_before = delay;
90*1a96fba6SXin Li packet.read_error = true;
91*1a96fba6SXin Li incoming_queue_.push(std::move(packet));
92*1a96fba6SXin Li }
93*1a96fba6SXin Li
ClearReadQueue()94*1a96fba6SXin Li void FakeStream::ClearReadQueue() {
95*1a96fba6SXin Li std::queue<InputDataPacket>().swap(incoming_queue_);
96*1a96fba6SXin Li delay_input_until_ = base::Time{};
97*1a96fba6SXin Li input_buffer_.clear();
98*1a96fba6SXin Li input_ptr_ = 0;
99*1a96fba6SXin Li report_read_error_ = 0;
100*1a96fba6SXin Li }
101*1a96fba6SXin Li
ExpectWritePacketSize(base::TimeDelta delay,size_t data_size)102*1a96fba6SXin Li void FakeStream::ExpectWritePacketSize(base::TimeDelta delay,
103*1a96fba6SXin Li size_t data_size) {
104*1a96fba6SXin Li OutputDataPacket packet;
105*1a96fba6SXin Li packet.expected_size = data_size;
106*1a96fba6SXin Li packet.delay_before = delay;
107*1a96fba6SXin Li outgoing_queue_.push(std::move(packet));
108*1a96fba6SXin Li }
109*1a96fba6SXin Li
ExpectWritePacketData(base::TimeDelta delay,const void * data,size_t size)110*1a96fba6SXin Li void FakeStream::ExpectWritePacketData(base::TimeDelta delay,
111*1a96fba6SXin Li const void* data,
112*1a96fba6SXin Li size_t size) {
113*1a96fba6SXin Li auto* byte_ptr = static_cast<const uint8_t*>(data);
114*1a96fba6SXin Li ExpectWritePacketData(delay, brillo::Blob{byte_ptr, byte_ptr + size});
115*1a96fba6SXin Li }
116*1a96fba6SXin Li
ExpectWritePacketData(base::TimeDelta delay,brillo::Blob data)117*1a96fba6SXin Li void FakeStream::ExpectWritePacketData(base::TimeDelta delay,
118*1a96fba6SXin Li brillo::Blob data) {
119*1a96fba6SXin Li OutputDataPacket packet;
120*1a96fba6SXin Li packet.expected_size = data.size();
121*1a96fba6SXin Li packet.data = std::move(data);
122*1a96fba6SXin Li packet.delay_before = delay;
123*1a96fba6SXin Li outgoing_queue_.push(std::move(packet));
124*1a96fba6SXin Li }
125*1a96fba6SXin Li
ExpectWritePacketString(base::TimeDelta delay,const std::string & data)126*1a96fba6SXin Li void FakeStream::ExpectWritePacketString(base::TimeDelta delay,
127*1a96fba6SXin Li const std::string& data) {
128*1a96fba6SXin Li ExpectWritePacketData(delay, brillo::Blob{data.begin(), data.end()});
129*1a96fba6SXin Li }
130*1a96fba6SXin Li
QueueWriteError(base::TimeDelta delay)131*1a96fba6SXin Li void FakeStream::QueueWriteError(base::TimeDelta delay) {
132*1a96fba6SXin Li QueueWriteErrorWithMessage(delay, std::string{});
133*1a96fba6SXin Li }
134*1a96fba6SXin Li
QueueWriteErrorWithMessage(base::TimeDelta delay,const std::string & message)135*1a96fba6SXin Li void FakeStream::QueueWriteErrorWithMessage(base::TimeDelta delay,
136*1a96fba6SXin Li const std::string& message) {
137*1a96fba6SXin Li OutputDataPacket packet;
138*1a96fba6SXin Li packet.expected_size = 0;
139*1a96fba6SXin Li packet.data.assign(message.begin(), message.end());
140*1a96fba6SXin Li packet.delay_before = delay;
141*1a96fba6SXin Li packet.write_error = true;
142*1a96fba6SXin Li outgoing_queue_.push(std::move(packet));
143*1a96fba6SXin Li }
144*1a96fba6SXin Li
ClearWriteQueue()145*1a96fba6SXin Li void FakeStream::ClearWriteQueue() {
146*1a96fba6SXin Li std::queue<OutputDataPacket>().swap(outgoing_queue_);
147*1a96fba6SXin Li delay_output_until_ = base::Time{};
148*1a96fba6SXin Li output_buffer_.clear();
149*1a96fba6SXin Li expected_output_data_.clear();
150*1a96fba6SXin Li max_output_buffer_size_ = 0;
151*1a96fba6SXin Li all_output_data_.clear();
152*1a96fba6SXin Li report_write_error_ = 0;
153*1a96fba6SXin Li }
154*1a96fba6SXin Li
GetFlushedOutputData() const155*1a96fba6SXin Li const brillo::Blob& FakeStream::GetFlushedOutputData() const {
156*1a96fba6SXin Li return all_output_data_;
157*1a96fba6SXin Li }
158*1a96fba6SXin Li
GetFlushedOutputDataAsString() const159*1a96fba6SXin Li std::string FakeStream::GetFlushedOutputDataAsString() const {
160*1a96fba6SXin Li return std::string{all_output_data_.begin(), all_output_data_.end()};
161*1a96fba6SXin Li }
162*1a96fba6SXin Li
CanRead() const163*1a96fba6SXin Li bool FakeStream::CanRead() const {
164*1a96fba6SXin Li return stream_utils::IsReadAccessMode(mode_);
165*1a96fba6SXin Li }
166*1a96fba6SXin Li
CanWrite() const167*1a96fba6SXin Li bool FakeStream::CanWrite() const {
168*1a96fba6SXin Li return stream_utils::IsWriteAccessMode(mode_);
169*1a96fba6SXin Li }
170*1a96fba6SXin Li
SetSizeBlocking(uint64_t,ErrorPtr * error)171*1a96fba6SXin Li bool FakeStream::SetSizeBlocking(uint64_t /* size */, ErrorPtr* error) {
172*1a96fba6SXin Li return stream_utils::ErrorOperationNotSupported(FROM_HERE, error);
173*1a96fba6SXin Li }
174*1a96fba6SXin Li
Seek(int64_t,Whence,uint64_t *,ErrorPtr * error)175*1a96fba6SXin Li bool FakeStream::Seek(int64_t /* offset */,
176*1a96fba6SXin Li Whence /* whence */,
177*1a96fba6SXin Li uint64_t* /* new_position */,
178*1a96fba6SXin Li ErrorPtr* error) {
179*1a96fba6SXin Li return stream_utils::ErrorOperationNotSupported(FROM_HERE, error);
180*1a96fba6SXin Li }
181*1a96fba6SXin Li
IsReadBufferEmpty() const182*1a96fba6SXin Li bool FakeStream::IsReadBufferEmpty() const {
183*1a96fba6SXin Li return input_ptr_ >= input_buffer_.size();
184*1a96fba6SXin Li }
185*1a96fba6SXin Li
PopReadPacket()186*1a96fba6SXin Li bool FakeStream::PopReadPacket() {
187*1a96fba6SXin Li if (incoming_queue_.empty())
188*1a96fba6SXin Li return false;
189*1a96fba6SXin Li InputDataPacket& packet = incoming_queue_.front();
190*1a96fba6SXin Li input_ptr_ = 0;
191*1a96fba6SXin Li input_buffer_ = std::move(packet.data);
192*1a96fba6SXin Li delay_input_until_ = clock_->Now() + packet.delay_before;
193*1a96fba6SXin Li incoming_queue_.pop();
194*1a96fba6SXin Li report_read_error_ = packet.read_error;
195*1a96fba6SXin Li return true;
196*1a96fba6SXin Li }
197*1a96fba6SXin Li
ReadNonBlocking(void * buffer,size_t size_to_read,size_t * size_read,bool * end_of_stream,ErrorPtr * error)198*1a96fba6SXin Li bool FakeStream::ReadNonBlocking(void* buffer,
199*1a96fba6SXin Li size_t size_to_read,
200*1a96fba6SXin Li size_t* size_read,
201*1a96fba6SXin Li bool* end_of_stream,
202*1a96fba6SXin Li ErrorPtr* error) {
203*1a96fba6SXin Li if (!CanRead())
204*1a96fba6SXin Li return stream_utils::ErrorOperationNotSupported(FROM_HERE, error);
205*1a96fba6SXin Li
206*1a96fba6SXin Li if (!IsOpen())
207*1a96fba6SXin Li return stream_utils::ErrorStreamClosed(FROM_HERE, error);
208*1a96fba6SXin Li
209*1a96fba6SXin Li for (;;) {
210*1a96fba6SXin Li if (!delay_input_until_.is_null() && clock_->Now() < delay_input_until_) {
211*1a96fba6SXin Li *size_read = 0;
212*1a96fba6SXin Li if (end_of_stream)
213*1a96fba6SXin Li *end_of_stream = false;
214*1a96fba6SXin Li break;
215*1a96fba6SXin Li }
216*1a96fba6SXin Li
217*1a96fba6SXin Li if (report_read_error_) {
218*1a96fba6SXin Li report_read_error_ = false;
219*1a96fba6SXin Li std::string message{input_buffer_.begin(), input_buffer_.end()};
220*1a96fba6SXin Li if (message.empty())
221*1a96fba6SXin Li message = "Simulating read error for tests";
222*1a96fba6SXin Li input_buffer_.clear();
223*1a96fba6SXin Li Error::AddTo(error, FROM_HERE, "fake_stream", "read_error", message);
224*1a96fba6SXin Li return false;
225*1a96fba6SXin Li }
226*1a96fba6SXin Li
227*1a96fba6SXin Li if (!IsReadBufferEmpty()) {
228*1a96fba6SXin Li size_to_read = std::min(size_to_read, input_buffer_.size() - input_ptr_);
229*1a96fba6SXin Li std::memcpy(buffer, input_buffer_.data() + input_ptr_, size_to_read);
230*1a96fba6SXin Li input_ptr_ += size_to_read;
231*1a96fba6SXin Li *size_read = size_to_read;
232*1a96fba6SXin Li if (end_of_stream)
233*1a96fba6SXin Li *end_of_stream = false;
234*1a96fba6SXin Li break;
235*1a96fba6SXin Li }
236*1a96fba6SXin Li
237*1a96fba6SXin Li if (!PopReadPacket()) {
238*1a96fba6SXin Li *size_read = 0;
239*1a96fba6SXin Li if (end_of_stream)
240*1a96fba6SXin Li *end_of_stream = true;
241*1a96fba6SXin Li break;
242*1a96fba6SXin Li }
243*1a96fba6SXin Li }
244*1a96fba6SXin Li return true;
245*1a96fba6SXin Li }
246*1a96fba6SXin Li
IsWriteBufferFull() const247*1a96fba6SXin Li bool FakeStream::IsWriteBufferFull() const {
248*1a96fba6SXin Li return output_buffer_.size() >= max_output_buffer_size_;
249*1a96fba6SXin Li }
250*1a96fba6SXin Li
PopWritePacket()251*1a96fba6SXin Li bool FakeStream::PopWritePacket() {
252*1a96fba6SXin Li if (outgoing_queue_.empty())
253*1a96fba6SXin Li return false;
254*1a96fba6SXin Li OutputDataPacket& packet = outgoing_queue_.front();
255*1a96fba6SXin Li expected_output_data_ = std::move(packet.data);
256*1a96fba6SXin Li delay_output_until_ = clock_->Now() + packet.delay_before;
257*1a96fba6SXin Li max_output_buffer_size_ = packet.expected_size;
258*1a96fba6SXin Li report_write_error_ = packet.write_error;
259*1a96fba6SXin Li outgoing_queue_.pop();
260*1a96fba6SXin Li return true;
261*1a96fba6SXin Li }
262*1a96fba6SXin Li
WriteNonBlocking(const void * buffer,size_t size_to_write,size_t * size_written,ErrorPtr * error)263*1a96fba6SXin Li bool FakeStream::WriteNonBlocking(const void* buffer,
264*1a96fba6SXin Li size_t size_to_write,
265*1a96fba6SXin Li size_t* size_written,
266*1a96fba6SXin Li ErrorPtr* error) {
267*1a96fba6SXin Li if (!CanWrite())
268*1a96fba6SXin Li return stream_utils::ErrorOperationNotSupported(FROM_HERE, error);
269*1a96fba6SXin Li
270*1a96fba6SXin Li if (!IsOpen())
271*1a96fba6SXin Li return stream_utils::ErrorStreamClosed(FROM_HERE, error);
272*1a96fba6SXin Li
273*1a96fba6SXin Li for (;;) {
274*1a96fba6SXin Li if (!delay_output_until_.is_null() && clock_->Now() < delay_output_until_) {
275*1a96fba6SXin Li *size_written = 0;
276*1a96fba6SXin Li return true;
277*1a96fba6SXin Li }
278*1a96fba6SXin Li
279*1a96fba6SXin Li if (report_write_error_) {
280*1a96fba6SXin Li report_write_error_ = false;
281*1a96fba6SXin Li std::string message{expected_output_data_.begin(),
282*1a96fba6SXin Li expected_output_data_.end()};
283*1a96fba6SXin Li if (message.empty())
284*1a96fba6SXin Li message = "Simulating write error for tests";
285*1a96fba6SXin Li output_buffer_.clear();
286*1a96fba6SXin Li max_output_buffer_size_ = 0;
287*1a96fba6SXin Li expected_output_data_.clear();
288*1a96fba6SXin Li Error::AddTo(error, FROM_HERE, "fake_stream", "write_error", message);
289*1a96fba6SXin Li return false;
290*1a96fba6SXin Li }
291*1a96fba6SXin Li
292*1a96fba6SXin Li if (!IsWriteBufferFull()) {
293*1a96fba6SXin Li bool success = true;
294*1a96fba6SXin Li size_to_write = std::min(size_to_write,
295*1a96fba6SXin Li max_output_buffer_size_ - output_buffer_.size());
296*1a96fba6SXin Li auto byte_ptr = static_cast<const uint8_t*>(buffer);
297*1a96fba6SXin Li output_buffer_.insert(output_buffer_.end(),
298*1a96fba6SXin Li byte_ptr, byte_ptr + size_to_write);
299*1a96fba6SXin Li if (output_buffer_.size() == max_output_buffer_size_) {
300*1a96fba6SXin Li if (!expected_output_data_.empty() &&
301*1a96fba6SXin Li expected_output_data_ != output_buffer_) {
302*1a96fba6SXin Li // We expected different data to be written, report an error.
303*1a96fba6SXin Li Error::AddTo(error, FROM_HERE, "fake_stream", "data_mismatch",
304*1a96fba6SXin Li "Unexpected data written");
305*1a96fba6SXin Li success = false;
306*1a96fba6SXin Li }
307*1a96fba6SXin Li
308*1a96fba6SXin Li all_output_data_.insert(all_output_data_.end(),
309*1a96fba6SXin Li output_buffer_.begin(), output_buffer_.end());
310*1a96fba6SXin Li
311*1a96fba6SXin Li output_buffer_.clear();
312*1a96fba6SXin Li max_output_buffer_size_ = 0;
313*1a96fba6SXin Li expected_output_data_.clear();
314*1a96fba6SXin Li }
315*1a96fba6SXin Li *size_written = size_to_write;
316*1a96fba6SXin Li return success;
317*1a96fba6SXin Li }
318*1a96fba6SXin Li
319*1a96fba6SXin Li if (!PopWritePacket()) {
320*1a96fba6SXin Li // No more data expected.
321*1a96fba6SXin Li Error::AddTo(error, FROM_HERE, "fake_stream", "full",
322*1a96fba6SXin Li "No more output data expected");
323*1a96fba6SXin Li return false;
324*1a96fba6SXin Li }
325*1a96fba6SXin Li }
326*1a96fba6SXin Li }
327*1a96fba6SXin Li
FlushBlocking(ErrorPtr * error)328*1a96fba6SXin Li bool FakeStream::FlushBlocking(ErrorPtr* error) {
329*1a96fba6SXin Li if (!CanWrite())
330*1a96fba6SXin Li return stream_utils::ErrorOperationNotSupported(FROM_HERE, error);
331*1a96fba6SXin Li
332*1a96fba6SXin Li if (!IsOpen())
333*1a96fba6SXin Li return stream_utils::ErrorStreamClosed(FROM_HERE, error);
334*1a96fba6SXin Li
335*1a96fba6SXin Li bool success = true;
336*1a96fba6SXin Li if (!output_buffer_.empty()) {
337*1a96fba6SXin Li if (!expected_output_data_.empty() &&
338*1a96fba6SXin Li expected_output_data_ != output_buffer_) {
339*1a96fba6SXin Li // We expected different data to be written, report an error.
340*1a96fba6SXin Li Error::AddTo(error, FROM_HERE, "fake_stream", "data_mismatch",
341*1a96fba6SXin Li "Unexpected data written");
342*1a96fba6SXin Li success = false;
343*1a96fba6SXin Li }
344*1a96fba6SXin Li all_output_data_.insert(all_output_data_.end(),
345*1a96fba6SXin Li output_buffer_.begin(), output_buffer_.end());
346*1a96fba6SXin Li
347*1a96fba6SXin Li output_buffer_.clear();
348*1a96fba6SXin Li max_output_buffer_size_ = 0;
349*1a96fba6SXin Li expected_output_data_.clear();
350*1a96fba6SXin Li }
351*1a96fba6SXin Li return success;
352*1a96fba6SXin Li }
353*1a96fba6SXin Li
CloseBlocking(ErrorPtr *)354*1a96fba6SXin Li bool FakeStream::CloseBlocking(ErrorPtr* /* error */) {
355*1a96fba6SXin Li is_open_ = false;
356*1a96fba6SXin Li return true;
357*1a96fba6SXin Li }
358*1a96fba6SXin Li
WaitForData(AccessMode mode,const base::Callback<void (AccessMode)> & callback,ErrorPtr * error)359*1a96fba6SXin Li bool FakeStream::WaitForData(AccessMode mode,
360*1a96fba6SXin Li const base::Callback<void(AccessMode)>& callback,
361*1a96fba6SXin Li ErrorPtr* error) {
362*1a96fba6SXin Li bool read_requested = stream_utils::IsReadAccessMode(mode);
363*1a96fba6SXin Li bool write_requested = stream_utils::IsWriteAccessMode(mode);
364*1a96fba6SXin Li
365*1a96fba6SXin Li if ((read_requested && !CanRead()) || (write_requested && !CanWrite()))
366*1a96fba6SXin Li return stream_utils::ErrorOperationNotSupported(FROM_HERE, error);
367*1a96fba6SXin Li
368*1a96fba6SXin Li if (read_requested && IsReadBufferEmpty())
369*1a96fba6SXin Li PopReadPacket();
370*1a96fba6SXin Li if (write_requested && IsWriteBufferFull())
371*1a96fba6SXin Li PopWritePacket();
372*1a96fba6SXin Li
373*1a96fba6SXin Li base::TimeDelta delay;
374*1a96fba6SXin Li GetMinDelayAndMode(clock_->Now(), read_requested, delay_input_until_,
375*1a96fba6SXin Li write_requested, delay_output_until_, &mode, &delay);
376*1a96fba6SXin Li MessageLoop::current()->PostDelayedTask(
377*1a96fba6SXin Li FROM_HERE, base::Bind(callback, mode), delay);
378*1a96fba6SXin Li return true;
379*1a96fba6SXin Li }
380*1a96fba6SXin Li
WaitForDataBlocking(AccessMode in_mode,base::TimeDelta timeout,AccessMode * out_mode,ErrorPtr * error)381*1a96fba6SXin Li bool FakeStream::WaitForDataBlocking(AccessMode in_mode,
382*1a96fba6SXin Li base::TimeDelta timeout,
383*1a96fba6SXin Li AccessMode* out_mode,
384*1a96fba6SXin Li ErrorPtr* error) {
385*1a96fba6SXin Li bool read_requested = stream_utils::IsReadAccessMode(in_mode);
386*1a96fba6SXin Li bool write_requested = stream_utils::IsWriteAccessMode(in_mode);
387*1a96fba6SXin Li
388*1a96fba6SXin Li if ((read_requested && !CanRead()) || (write_requested && !CanWrite()))
389*1a96fba6SXin Li return stream_utils::ErrorOperationNotSupported(FROM_HERE, error);
390*1a96fba6SXin Li
391*1a96fba6SXin Li base::TimeDelta delay;
392*1a96fba6SXin Li GetMinDelayAndMode(clock_->Now(), read_requested, delay_input_until_,
393*1a96fba6SXin Li write_requested, delay_output_until_, out_mode, &delay);
394*1a96fba6SXin Li
395*1a96fba6SXin Li if (timeout < delay)
396*1a96fba6SXin Li return stream_utils::ErrorOperationTimeout(FROM_HERE, error);
397*1a96fba6SXin Li
398*1a96fba6SXin Li LOG(INFO) << "TEST: Would have blocked for " << delay.InMilliseconds()
399*1a96fba6SXin Li << " ms.";
400*1a96fba6SXin Li
401*1a96fba6SXin Li return true;
402*1a96fba6SXin Li }
403*1a96fba6SXin Li
404*1a96fba6SXin Li } // namespace brillo
405