1 // Copyright 2015 The Chromium OS Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include <brillo/streams/input_stream_set.h>
6
7 #include <utility>
8
9 #include <base/bind.h>
10 #include <brillo/message_loops/message_loop.h>
11 #include <brillo/streams/stream_errors.h>
12 #include <brillo/streams/stream_utils.h>
13
14 namespace brillo {
15
InputStreamSet(std::vector<Stream * > source_streams,std::vector<StreamPtr> owned_source_streams,uint64_t initial_stream_size)16 InputStreamSet::InputStreamSet(
17 std::vector<Stream*> source_streams,
18 std::vector<StreamPtr> owned_source_streams,
19 uint64_t initial_stream_size)
20 : source_streams_{std::move(source_streams)},
21 owned_source_streams_{std::move(owned_source_streams)},
22 initial_stream_size_{initial_stream_size} {}
23
Create(std::vector<Stream * > source_streams,std::vector<StreamPtr> owned_source_streams,ErrorPtr * error)24 StreamPtr InputStreamSet::Create(std::vector<Stream*> source_streams,
25 std::vector<StreamPtr> owned_source_streams,
26 ErrorPtr* error) {
27 StreamPtr stream;
28
29 if (source_streams.empty()) {
30 Error::AddTo(error, FROM_HERE, errors::stream::kDomain,
31 errors::stream::kInvalidParameter,
32 "Source stream list is empty");
33 return stream;
34 }
35
36 // Make sure we have only readable streams.
37 for (Stream* src_stream : source_streams) {
38 if (!src_stream->CanRead()) {
39 Error::AddTo(error, FROM_HERE, errors::stream::kDomain,
40 errors::stream::kInvalidParameter,
41 "The stream list must contain only readable streams");
42 return stream;
43 }
44 }
45
46 // We are using remaining size here because the multiplexed stream is not
47 // seekable and the bytes already read are essentially "lost" as far as this
48 // stream is concerned.
49 uint64_t initial_stream_size = 0;
50 for (const Stream* stream : source_streams)
51 initial_stream_size += stream->GetRemainingSize();
52
53 stream.reset(new InputStreamSet{std::move(source_streams),
54 std::move(owned_source_streams),
55 initial_stream_size});
56 return stream;
57 }
58
Create(std::vector<Stream * > source_streams,ErrorPtr * error)59 StreamPtr InputStreamSet::Create(std::vector<Stream*> source_streams,
60 ErrorPtr* error) {
61 return Create(std::move(source_streams), {}, error);
62 }
63
Create(std::vector<StreamPtr> owned_source_streams,ErrorPtr * error)64 StreamPtr InputStreamSet::Create(std::vector<StreamPtr> owned_source_streams,
65 ErrorPtr* error) {
66 std::vector<Stream*> source_streams;
67 source_streams.reserve(owned_source_streams.size());
68 for (const StreamPtr& stream : owned_source_streams)
69 source_streams.push_back(stream.get());
70 return Create(std::move(source_streams), std::move(owned_source_streams),
71 error);
72 }
73
IsOpen() const74 bool InputStreamSet::IsOpen() const {
75 return !closed_;
76 }
77
CanGetSize() const78 bool InputStreamSet::CanGetSize() const {
79 bool can_get_size = IsOpen();
80 for (const Stream* stream : source_streams_) {
81 if (!stream->CanGetSize()) {
82 can_get_size = false;
83 break;
84 }
85 }
86 return can_get_size;
87 }
88
GetSize() const89 uint64_t InputStreamSet::GetSize() const {
90 return initial_stream_size_;
91 }
92
SetSizeBlocking(uint64_t,ErrorPtr * error)93 bool InputStreamSet::SetSizeBlocking(uint64_t /* size */, ErrorPtr* error) {
94 return stream_utils::ErrorOperationNotSupported(FROM_HERE, error);
95 }
96
GetRemainingSize() const97 uint64_t InputStreamSet::GetRemainingSize() const {
98 uint64_t size = 0;
99 for (const Stream* stream : source_streams_)
100 size += stream->GetRemainingSize();
101 return size;
102 }
103
Seek(int64_t,Whence,uint64_t *,ErrorPtr * error)104 bool InputStreamSet::Seek(int64_t /* offset */,
105 Whence /* whence */,
106 uint64_t* /* new_position */,
107 ErrorPtr* error) {
108 return stream_utils::ErrorOperationNotSupported(FROM_HERE, error);
109 }
110
ReadNonBlocking(void * buffer,size_t size_to_read,size_t * size_read,bool * end_of_stream,ErrorPtr * error)111 bool InputStreamSet::ReadNonBlocking(void* buffer,
112 size_t size_to_read,
113 size_t* size_read,
114 bool* end_of_stream,
115 ErrorPtr* error) {
116 if (!IsOpen())
117 return stream_utils::ErrorStreamClosed(FROM_HERE, error);
118
119 while (!source_streams_.empty()) {
120 Stream* stream = source_streams_.front();
121 bool eos = false;
122 if (!stream->ReadNonBlocking(buffer, size_to_read, size_read, &eos, error))
123 return false;
124
125 if (*size_read > 0 || !eos) {
126 if (end_of_stream)
127 *end_of_stream = false;
128 return true;
129 }
130
131 source_streams_.erase(source_streams_.begin());
132 }
133 *size_read = 0;
134 if (end_of_stream)
135 *end_of_stream = true;
136 return true;
137 }
138
WriteNonBlocking(const void *,size_t,size_t *,ErrorPtr * error)139 bool InputStreamSet::WriteNonBlocking(const void* /* buffer */,
140 size_t /* size_to_write */,
141 size_t* /* size_written */,
142 ErrorPtr* error) {
143 return stream_utils::ErrorOperationNotSupported(FROM_HERE, error);
144 }
145
CloseBlocking(ErrorPtr * error)146 bool InputStreamSet::CloseBlocking(ErrorPtr* error) {
147 bool success = true;
148 // We want to close only the owned streams.
149 for (StreamPtr& stream_ptr : owned_source_streams_) {
150 if (!stream_ptr->CloseBlocking(error))
151 success = false; // Keep going for other streams...
152 }
153 owned_source_streams_.clear();
154 source_streams_.clear();
155 initial_stream_size_ = 0;
156 closed_ = true;
157 return success;
158 }
159
WaitForData(AccessMode mode,const base::Callback<void (AccessMode)> & callback,ErrorPtr * error)160 bool InputStreamSet::WaitForData(
161 AccessMode mode,
162 const base::Callback<void(AccessMode)>& callback,
163 ErrorPtr* error) {
164 if (!IsOpen())
165 return stream_utils::ErrorStreamClosed(FROM_HERE, error);
166
167 if (stream_utils::IsWriteAccessMode(mode))
168 return stream_utils::ErrorOperationNotSupported(FROM_HERE, error);
169
170 if (!source_streams_.empty()) {
171 Stream* stream = source_streams_.front();
172 return stream->WaitForData(mode, callback, error);
173 }
174
175 MessageLoop::current()->PostTask(FROM_HERE, base::BindOnce(callback, mode));
176 return true;
177 }
178
WaitForDataBlocking(AccessMode in_mode,base::TimeDelta timeout,AccessMode * out_mode,ErrorPtr * error)179 bool InputStreamSet::WaitForDataBlocking(AccessMode in_mode,
180 base::TimeDelta timeout,
181 AccessMode* out_mode,
182 ErrorPtr* error) {
183 if (!IsOpen())
184 return stream_utils::ErrorStreamClosed(FROM_HERE, error);
185
186 if (stream_utils::IsWriteAccessMode(in_mode))
187 return stream_utils::ErrorOperationNotSupported(FROM_HERE, error);
188
189 if (!source_streams_.empty()) {
190 Stream* stream = source_streams_.front();
191 return stream->WaitForDataBlocking(in_mode, timeout, out_mode, error);
192 }
193
194 if (out_mode)
195 *out_mode = in_mode;
196 return true;
197 }
198
CancelPendingAsyncOperations()199 void InputStreamSet::CancelPendingAsyncOperations() {
200 if (IsOpen() && !source_streams_.empty()) {
201 Stream* stream = source_streams_.front();
202 stream->CancelPendingAsyncOperations();
203 }
204 Stream::CancelPendingAsyncOperations();
205 }
206
207 } // namespace brillo
208