// Copyright 2015 The Chromium OS Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
4*1a96fba6SXin Li
5*1a96fba6SXin Li #include <brillo/streams/input_stream_set.h>
6*1a96fba6SXin Li
7*1a96fba6SXin Li #include <utility>
8*1a96fba6SXin Li
9*1a96fba6SXin Li #include <base/bind.h>
10*1a96fba6SXin Li #include <brillo/message_loops/message_loop.h>
11*1a96fba6SXin Li #include <brillo/streams/stream_errors.h>
12*1a96fba6SXin Li #include <brillo/streams/stream_utils.h>
13*1a96fba6SXin Li
14*1a96fba6SXin Li namespace brillo {
15*1a96fba6SXin Li
InputStreamSet(std::vector<Stream * > source_streams,std::vector<StreamPtr> owned_source_streams,uint64_t initial_stream_size)16*1a96fba6SXin Li InputStreamSet::InputStreamSet(
17*1a96fba6SXin Li std::vector<Stream*> source_streams,
18*1a96fba6SXin Li std::vector<StreamPtr> owned_source_streams,
19*1a96fba6SXin Li uint64_t initial_stream_size)
20*1a96fba6SXin Li : source_streams_{std::move(source_streams)},
21*1a96fba6SXin Li owned_source_streams_{std::move(owned_source_streams)},
22*1a96fba6SXin Li initial_stream_size_{initial_stream_size} {}
23*1a96fba6SXin Li
Create(std::vector<Stream * > source_streams,std::vector<StreamPtr> owned_source_streams,ErrorPtr * error)24*1a96fba6SXin Li StreamPtr InputStreamSet::Create(std::vector<Stream*> source_streams,
25*1a96fba6SXin Li std::vector<StreamPtr> owned_source_streams,
26*1a96fba6SXin Li ErrorPtr* error) {
27*1a96fba6SXin Li StreamPtr stream;
28*1a96fba6SXin Li
29*1a96fba6SXin Li if (source_streams.empty()) {
30*1a96fba6SXin Li Error::AddTo(error, FROM_HERE, errors::stream::kDomain,
31*1a96fba6SXin Li errors::stream::kInvalidParameter,
32*1a96fba6SXin Li "Source stream list is empty");
33*1a96fba6SXin Li return stream;
34*1a96fba6SXin Li }
35*1a96fba6SXin Li
36*1a96fba6SXin Li // Make sure we have only readable streams.
37*1a96fba6SXin Li for (Stream* src_stream : source_streams) {
38*1a96fba6SXin Li if (!src_stream->CanRead()) {
39*1a96fba6SXin Li Error::AddTo(error, FROM_HERE, errors::stream::kDomain,
40*1a96fba6SXin Li errors::stream::kInvalidParameter,
41*1a96fba6SXin Li "The stream list must contain only readable streams");
42*1a96fba6SXin Li return stream;
43*1a96fba6SXin Li }
44*1a96fba6SXin Li }
45*1a96fba6SXin Li
46*1a96fba6SXin Li // We are using remaining size here because the multiplexed stream is not
47*1a96fba6SXin Li // seekable and the bytes already read are essentially "lost" as far as this
48*1a96fba6SXin Li // stream is concerned.
49*1a96fba6SXin Li uint64_t initial_stream_size = 0;
50*1a96fba6SXin Li for (const Stream* stream : source_streams)
51*1a96fba6SXin Li initial_stream_size += stream->GetRemainingSize();
52*1a96fba6SXin Li
53*1a96fba6SXin Li stream.reset(new InputStreamSet{std::move(source_streams),
54*1a96fba6SXin Li std::move(owned_source_streams),
55*1a96fba6SXin Li initial_stream_size});
56*1a96fba6SXin Li return stream;
57*1a96fba6SXin Li }
58*1a96fba6SXin Li
Create(std::vector<Stream * > source_streams,ErrorPtr * error)59*1a96fba6SXin Li StreamPtr InputStreamSet::Create(std::vector<Stream*> source_streams,
60*1a96fba6SXin Li ErrorPtr* error) {
61*1a96fba6SXin Li return Create(std::move(source_streams), {}, error);
62*1a96fba6SXin Li }
63*1a96fba6SXin Li
Create(std::vector<StreamPtr> owned_source_streams,ErrorPtr * error)64*1a96fba6SXin Li StreamPtr InputStreamSet::Create(std::vector<StreamPtr> owned_source_streams,
65*1a96fba6SXin Li ErrorPtr* error) {
66*1a96fba6SXin Li std::vector<Stream*> source_streams;
67*1a96fba6SXin Li source_streams.reserve(owned_source_streams.size());
68*1a96fba6SXin Li for (const StreamPtr& stream : owned_source_streams)
69*1a96fba6SXin Li source_streams.push_back(stream.get());
70*1a96fba6SXin Li return Create(std::move(source_streams), std::move(owned_source_streams),
71*1a96fba6SXin Li error);
72*1a96fba6SXin Li }
73*1a96fba6SXin Li
IsOpen() const74*1a96fba6SXin Li bool InputStreamSet::IsOpen() const {
75*1a96fba6SXin Li return !closed_;
76*1a96fba6SXin Li }
77*1a96fba6SXin Li
CanGetSize() const78*1a96fba6SXin Li bool InputStreamSet::CanGetSize() const {
79*1a96fba6SXin Li bool can_get_size = IsOpen();
80*1a96fba6SXin Li for (const Stream* stream : source_streams_) {
81*1a96fba6SXin Li if (!stream->CanGetSize()) {
82*1a96fba6SXin Li can_get_size = false;
83*1a96fba6SXin Li break;
84*1a96fba6SXin Li }
85*1a96fba6SXin Li }
86*1a96fba6SXin Li return can_get_size;
87*1a96fba6SXin Li }
88*1a96fba6SXin Li
GetSize() const89*1a96fba6SXin Li uint64_t InputStreamSet::GetSize() const {
90*1a96fba6SXin Li return initial_stream_size_;
91*1a96fba6SXin Li }
92*1a96fba6SXin Li
SetSizeBlocking(uint64_t,ErrorPtr * error)93*1a96fba6SXin Li bool InputStreamSet::SetSizeBlocking(uint64_t /* size */, ErrorPtr* error) {
94*1a96fba6SXin Li return stream_utils::ErrorOperationNotSupported(FROM_HERE, error);
95*1a96fba6SXin Li }
96*1a96fba6SXin Li
GetRemainingSize() const97*1a96fba6SXin Li uint64_t InputStreamSet::GetRemainingSize() const {
98*1a96fba6SXin Li uint64_t size = 0;
99*1a96fba6SXin Li for (const Stream* stream : source_streams_)
100*1a96fba6SXin Li size += stream->GetRemainingSize();
101*1a96fba6SXin Li return size;
102*1a96fba6SXin Li }
103*1a96fba6SXin Li
Seek(int64_t,Whence,uint64_t *,ErrorPtr * error)104*1a96fba6SXin Li bool InputStreamSet::Seek(int64_t /* offset */,
105*1a96fba6SXin Li Whence /* whence */,
106*1a96fba6SXin Li uint64_t* /* new_position */,
107*1a96fba6SXin Li ErrorPtr* error) {
108*1a96fba6SXin Li return stream_utils::ErrorOperationNotSupported(FROM_HERE, error);
109*1a96fba6SXin Li }
110*1a96fba6SXin Li
ReadNonBlocking(void * buffer,size_t size_to_read,size_t * size_read,bool * end_of_stream,ErrorPtr * error)111*1a96fba6SXin Li bool InputStreamSet::ReadNonBlocking(void* buffer,
112*1a96fba6SXin Li size_t size_to_read,
113*1a96fba6SXin Li size_t* size_read,
114*1a96fba6SXin Li bool* end_of_stream,
115*1a96fba6SXin Li ErrorPtr* error) {
116*1a96fba6SXin Li if (!IsOpen())
117*1a96fba6SXin Li return stream_utils::ErrorStreamClosed(FROM_HERE, error);
118*1a96fba6SXin Li
119*1a96fba6SXin Li while (!source_streams_.empty()) {
120*1a96fba6SXin Li Stream* stream = source_streams_.front();
121*1a96fba6SXin Li bool eos = false;
122*1a96fba6SXin Li if (!stream->ReadNonBlocking(buffer, size_to_read, size_read, &eos, error))
123*1a96fba6SXin Li return false;
124*1a96fba6SXin Li
125*1a96fba6SXin Li if (*size_read > 0 || !eos) {
126*1a96fba6SXin Li if (end_of_stream)
127*1a96fba6SXin Li *end_of_stream = false;
128*1a96fba6SXin Li return true;
129*1a96fba6SXin Li }
130*1a96fba6SXin Li
131*1a96fba6SXin Li source_streams_.erase(source_streams_.begin());
132*1a96fba6SXin Li }
133*1a96fba6SXin Li *size_read = 0;
134*1a96fba6SXin Li if (end_of_stream)
135*1a96fba6SXin Li *end_of_stream = true;
136*1a96fba6SXin Li return true;
137*1a96fba6SXin Li }
138*1a96fba6SXin Li
WriteNonBlocking(const void *,size_t,size_t *,ErrorPtr * error)139*1a96fba6SXin Li bool InputStreamSet::WriteNonBlocking(const void* /* buffer */,
140*1a96fba6SXin Li size_t /* size_to_write */,
141*1a96fba6SXin Li size_t* /* size_written */,
142*1a96fba6SXin Li ErrorPtr* error) {
143*1a96fba6SXin Li return stream_utils::ErrorOperationNotSupported(FROM_HERE, error);
144*1a96fba6SXin Li }
145*1a96fba6SXin Li
CloseBlocking(ErrorPtr * error)146*1a96fba6SXin Li bool InputStreamSet::CloseBlocking(ErrorPtr* error) {
147*1a96fba6SXin Li bool success = true;
148*1a96fba6SXin Li // We want to close only the owned streams.
149*1a96fba6SXin Li for (StreamPtr& stream_ptr : owned_source_streams_) {
150*1a96fba6SXin Li if (!stream_ptr->CloseBlocking(error))
151*1a96fba6SXin Li success = false; // Keep going for other streams...
152*1a96fba6SXin Li }
153*1a96fba6SXin Li owned_source_streams_.clear();
154*1a96fba6SXin Li source_streams_.clear();
155*1a96fba6SXin Li initial_stream_size_ = 0;
156*1a96fba6SXin Li closed_ = true;
157*1a96fba6SXin Li return success;
158*1a96fba6SXin Li }
159*1a96fba6SXin Li
WaitForData(AccessMode mode,const base::Callback<void (AccessMode)> & callback,ErrorPtr * error)160*1a96fba6SXin Li bool InputStreamSet::WaitForData(
161*1a96fba6SXin Li AccessMode mode,
162*1a96fba6SXin Li const base::Callback<void(AccessMode)>& callback,
163*1a96fba6SXin Li ErrorPtr* error) {
164*1a96fba6SXin Li if (!IsOpen())
165*1a96fba6SXin Li return stream_utils::ErrorStreamClosed(FROM_HERE, error);
166*1a96fba6SXin Li
167*1a96fba6SXin Li if (stream_utils::IsWriteAccessMode(mode))
168*1a96fba6SXin Li return stream_utils::ErrorOperationNotSupported(FROM_HERE, error);
169*1a96fba6SXin Li
170*1a96fba6SXin Li if (!source_streams_.empty()) {
171*1a96fba6SXin Li Stream* stream = source_streams_.front();
172*1a96fba6SXin Li return stream->WaitForData(mode, callback, error);
173*1a96fba6SXin Li }
174*1a96fba6SXin Li
175*1a96fba6SXin Li MessageLoop::current()->PostTask(FROM_HERE, base::BindOnce(callback, mode));
176*1a96fba6SXin Li return true;
177*1a96fba6SXin Li }
178*1a96fba6SXin Li
WaitForDataBlocking(AccessMode in_mode,base::TimeDelta timeout,AccessMode * out_mode,ErrorPtr * error)179*1a96fba6SXin Li bool InputStreamSet::WaitForDataBlocking(AccessMode in_mode,
180*1a96fba6SXin Li base::TimeDelta timeout,
181*1a96fba6SXin Li AccessMode* out_mode,
182*1a96fba6SXin Li ErrorPtr* error) {
183*1a96fba6SXin Li if (!IsOpen())
184*1a96fba6SXin Li return stream_utils::ErrorStreamClosed(FROM_HERE, error);
185*1a96fba6SXin Li
186*1a96fba6SXin Li if (stream_utils::IsWriteAccessMode(in_mode))
187*1a96fba6SXin Li return stream_utils::ErrorOperationNotSupported(FROM_HERE, error);
188*1a96fba6SXin Li
189*1a96fba6SXin Li if (!source_streams_.empty()) {
190*1a96fba6SXin Li Stream* stream = source_streams_.front();
191*1a96fba6SXin Li return stream->WaitForDataBlocking(in_mode, timeout, out_mode, error);
192*1a96fba6SXin Li }
193*1a96fba6SXin Li
194*1a96fba6SXin Li if (out_mode)
195*1a96fba6SXin Li *out_mode = in_mode;
196*1a96fba6SXin Li return true;
197*1a96fba6SXin Li }
198*1a96fba6SXin Li
CancelPendingAsyncOperations()199*1a96fba6SXin Li void InputStreamSet::CancelPendingAsyncOperations() {
200*1a96fba6SXin Li if (IsOpen() && !source_streams_.empty()) {
201*1a96fba6SXin Li Stream* stream = source_streams_.front();
202*1a96fba6SXin Li stream->CancelPendingAsyncOperations();
203*1a96fba6SXin Li }
204*1a96fba6SXin Li Stream::CancelPendingAsyncOperations();
205*1a96fba6SXin Li }
206*1a96fba6SXin Li
207*1a96fba6SXin Li } // namespace brillo
208