xref: /aosp_15_r20/external/libbrillo/brillo/streams/input_stream_set.cc (revision 1a96fba65179ea7d3f56207137718607415c5953)
1 // Copyright 2015 The Chromium OS Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include <brillo/streams/input_stream_set.h>
6 
7 #include <utility>
8 
9 #include <base/bind.h>
10 #include <brillo/message_loops/message_loop.h>
11 #include <brillo/streams/stream_errors.h>
12 #include <brillo/streams/stream_utils.h>
13 
14 namespace brillo {
15 
InputStreamSet(std::vector<Stream * > source_streams,std::vector<StreamPtr> owned_source_streams,uint64_t initial_stream_size)16 InputStreamSet::InputStreamSet(
17     std::vector<Stream*> source_streams,
18     std::vector<StreamPtr> owned_source_streams,
19     uint64_t initial_stream_size)
20     : source_streams_{std::move(source_streams)},
21       owned_source_streams_{std::move(owned_source_streams)},
22       initial_stream_size_{initial_stream_size} {}
23 
Create(std::vector<Stream * > source_streams,std::vector<StreamPtr> owned_source_streams,ErrorPtr * error)24 StreamPtr InputStreamSet::Create(std::vector<Stream*> source_streams,
25                                  std::vector<StreamPtr> owned_source_streams,
26                                  ErrorPtr* error) {
27   StreamPtr stream;
28 
29   if (source_streams.empty()) {
30     Error::AddTo(error, FROM_HERE, errors::stream::kDomain,
31                  errors::stream::kInvalidParameter,
32                  "Source stream list is empty");
33     return stream;
34   }
35 
36   // Make sure we have only readable streams.
37   for (Stream* src_stream : source_streams) {
38     if (!src_stream->CanRead()) {
39       Error::AddTo(error, FROM_HERE, errors::stream::kDomain,
40                    errors::stream::kInvalidParameter,
41                    "The stream list must contain only readable streams");
42       return stream;
43     }
44   }
45 
46   // We are using remaining size here because the multiplexed stream is not
47   // seekable and the bytes already read are essentially "lost" as far as this
48   // stream is concerned.
49   uint64_t initial_stream_size = 0;
50   for (const Stream* stream : source_streams)
51     initial_stream_size += stream->GetRemainingSize();
52 
53   stream.reset(new InputStreamSet{std::move(source_streams),
54                                   std::move(owned_source_streams),
55                                   initial_stream_size});
56   return stream;
57 }
58 
Create(std::vector<Stream * > source_streams,ErrorPtr * error)59 StreamPtr InputStreamSet::Create(std::vector<Stream*> source_streams,
60                                  ErrorPtr* error) {
61   return Create(std::move(source_streams), {}, error);
62 }
63 
Create(std::vector<StreamPtr> owned_source_streams,ErrorPtr * error)64 StreamPtr InputStreamSet::Create(std::vector<StreamPtr> owned_source_streams,
65                                  ErrorPtr* error) {
66   std::vector<Stream*> source_streams;
67   source_streams.reserve(owned_source_streams.size());
68   for (const StreamPtr& stream : owned_source_streams)
69     source_streams.push_back(stream.get());
70   return Create(std::move(source_streams), std::move(owned_source_streams),
71                 error);
72 }
73 
IsOpen() const74 bool InputStreamSet::IsOpen() const {
75   return !closed_;
76 }
77 
CanGetSize() const78 bool InputStreamSet::CanGetSize() const {
79   bool can_get_size = IsOpen();
80   for (const Stream* stream : source_streams_) {
81     if (!stream->CanGetSize()) {
82       can_get_size = false;
83       break;
84     }
85   }
86   return can_get_size;
87 }
88 
GetSize() const89 uint64_t InputStreamSet::GetSize() const {
90   return initial_stream_size_;
91 }
92 
SetSizeBlocking(uint64_t,ErrorPtr * error)93 bool InputStreamSet::SetSizeBlocking(uint64_t /* size */, ErrorPtr* error) {
94   return stream_utils::ErrorOperationNotSupported(FROM_HERE, error);
95 }
96 
GetRemainingSize() const97 uint64_t InputStreamSet::GetRemainingSize() const {
98   uint64_t size = 0;
99   for (const Stream* stream : source_streams_)
100     size += stream->GetRemainingSize();
101   return size;
102 }
103 
Seek(int64_t,Whence,uint64_t *,ErrorPtr * error)104 bool InputStreamSet::Seek(int64_t /* offset */,
105                           Whence /* whence */,
106                           uint64_t* /* new_position */,
107                           ErrorPtr* error) {
108   return stream_utils::ErrorOperationNotSupported(FROM_HERE, error);
109 }
110 
ReadNonBlocking(void * buffer,size_t size_to_read,size_t * size_read,bool * end_of_stream,ErrorPtr * error)111 bool InputStreamSet::ReadNonBlocking(void* buffer,
112                                      size_t size_to_read,
113                                      size_t* size_read,
114                                      bool* end_of_stream,
115                                      ErrorPtr* error) {
116   if (!IsOpen())
117     return stream_utils::ErrorStreamClosed(FROM_HERE, error);
118 
119   while (!source_streams_.empty()) {
120     Stream* stream = source_streams_.front();
121     bool eos = false;
122     if (!stream->ReadNonBlocking(buffer, size_to_read, size_read, &eos, error))
123       return false;
124 
125     if (*size_read > 0 || !eos) {
126       if (end_of_stream)
127         *end_of_stream = false;
128       return true;
129     }
130 
131     source_streams_.erase(source_streams_.begin());
132   }
133   *size_read = 0;
134   if (end_of_stream)
135     *end_of_stream = true;
136   return true;
137 }
138 
WriteNonBlocking(const void *,size_t,size_t *,ErrorPtr * error)139 bool InputStreamSet::WriteNonBlocking(const void* /* buffer */,
140                                       size_t /* size_to_write */,
141                                       size_t* /* size_written */,
142                                       ErrorPtr* error) {
143   return stream_utils::ErrorOperationNotSupported(FROM_HERE, error);
144 }
145 
CloseBlocking(ErrorPtr * error)146 bool InputStreamSet::CloseBlocking(ErrorPtr* error) {
147   bool success = true;
148   // We want to close only the owned streams.
149   for (StreamPtr& stream_ptr : owned_source_streams_) {
150     if (!stream_ptr->CloseBlocking(error))
151       success = false;  // Keep going for other streams...
152   }
153   owned_source_streams_.clear();
154   source_streams_.clear();
155   initial_stream_size_ = 0;
156   closed_ = true;
157   return success;
158 }
159 
WaitForData(AccessMode mode,const base::Callback<void (AccessMode)> & callback,ErrorPtr * error)160 bool InputStreamSet::WaitForData(
161     AccessMode mode,
162     const base::Callback<void(AccessMode)>& callback,
163     ErrorPtr* error) {
164   if (!IsOpen())
165     return stream_utils::ErrorStreamClosed(FROM_HERE, error);
166 
167   if (stream_utils::IsWriteAccessMode(mode))
168     return stream_utils::ErrorOperationNotSupported(FROM_HERE, error);
169 
170   if (!source_streams_.empty()) {
171     Stream* stream = source_streams_.front();
172     return stream->WaitForData(mode, callback, error);
173   }
174 
175   MessageLoop::current()->PostTask(FROM_HERE, base::BindOnce(callback, mode));
176   return true;
177 }
178 
WaitForDataBlocking(AccessMode in_mode,base::TimeDelta timeout,AccessMode * out_mode,ErrorPtr * error)179 bool InputStreamSet::WaitForDataBlocking(AccessMode in_mode,
180                                          base::TimeDelta timeout,
181                                          AccessMode* out_mode,
182                                          ErrorPtr* error) {
183   if (!IsOpen())
184     return stream_utils::ErrorStreamClosed(FROM_HERE, error);
185 
186   if (stream_utils::IsWriteAccessMode(in_mode))
187     return stream_utils::ErrorOperationNotSupported(FROM_HERE, error);
188 
189   if (!source_streams_.empty()) {
190     Stream* stream = source_streams_.front();
191     return stream->WaitForDataBlocking(in_mode, timeout, out_mode, error);
192   }
193 
194   if (out_mode)
195     *out_mode = in_mode;
196   return true;
197 }
198 
CancelPendingAsyncOperations()199 void InputStreamSet::CancelPendingAsyncOperations() {
200   if (IsOpen() && !source_streams_.empty()) {
201     Stream* stream = source_streams_.front();
202     stream->CancelPendingAsyncOperations();
203   }
204   Stream::CancelPendingAsyncOperations();
205 }
206 
207 }  // namespace brillo
208