1 /*
2 * Copyright (C) 2023 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include "perfetto/ext/base/threading/stream.h"
18
19 #include <vector>
20
21 #include "perfetto/base/platform_handle.h"
22 #include "perfetto/base/status.h"
23 #include "perfetto/ext/base/event_fd.h"
24 #include "perfetto/ext/base/threading/future_combinators.h"
25 #include "perfetto/ext/base/threading/poll.h"
26 #include "test/gtest_and_gmock.h"
27
28 namespace perfetto {
29 namespace base {
30 namespace {
31
32 using testing::_;
33 using testing::ElementsAre;
34 using testing::Return;
35 using testing::UnorderedElementsAre;
36
37 template <typename T>
38 class MockPollable : public FuturePollable<T> {
39 public:
40 MOCK_METHOD(FuturePollResult<T>, Poll, (PollContext*), (override));
41 };
42
43 template <typename T>
44 class MockStreamPollable : public StreamPollable<T> {
45 public:
46 MOCK_METHOD(StreamPollResult<T>, PollNext, (PollContext*), (override));
47 };
48
49 class StreamUnittest : public ::testing::Test {
50 protected:
51 base::FlatSet<base::PlatformHandle> interested_;
52 base::FlatSet<base::PlatformHandle> ready_;
53 PollContext ctx_{&interested_, &ready_};
54 };
55
TEST_F(StreamUnittest,PollableImmediateResult)56 TEST_F(StreamUnittest, PollableImmediateResult) {
57 std::unique_ptr<MockStreamPollable<int>> int_pollable(
58 new MockStreamPollable<int>());
59 EXPECT_CALL(*int_pollable, PollNext(_))
60 .WillOnce(Return(StreamPollResult<int>(0)));
61
62 base::Stream<int> stream(std::move(int_pollable));
63 auto res = stream.PollNext(&ctx_);
64 ASSERT_FALSE(res.IsPending());
65 ASSERT_EQ(res.item(), 0);
66 }
67
TEST_F(StreamUnittest,PollablePendingThenResult)68 TEST_F(StreamUnittest, PollablePendingThenResult) {
69 std::unique_ptr<MockStreamPollable<int>> int_pollable(
70 new MockStreamPollable<int>());
71 EXPECT_CALL(*int_pollable, PollNext(_))
72 .WillOnce(Return(PendingPollResult()))
73 .WillOnce(Return(StreamPollResult<int>(1)))
74 .WillOnce(Return(DonePollResult()));
75
76 base::Stream<int> stream(std::move(int_pollable));
77 ASSERT_TRUE(stream.PollNext(&ctx_).IsPending());
78 ASSERT_EQ(stream.PollNext(&ctx_).item(), 1);
79 ASSERT_TRUE(stream.PollNext(&ctx_).IsDone());
80 }
81
TEST_F(StreamUnittest,Map)82 TEST_F(StreamUnittest, Map) {
83 std::unique_ptr<MockStreamPollable<int>> int_pollable(
84 new MockStreamPollable<int>());
85 EXPECT_CALL(*int_pollable, PollNext(_))
86 .WillOnce(Return(PendingPollResult()))
87 .WillOnce(Return(StreamPollResult<int>(1)))
88 .WillOnce(Return(PendingPollResult()))
89 .WillOnce(Return(StreamPollResult<int>(2)))
90 .WillOnce(Return(DonePollResult()));
91
92 auto stream = base::Stream<int>(std::move(int_pollable))
93 .MapFuture([](int res) -> base::Future<std::string> {
94 return std::to_string(res);
95 });
96 ASSERT_TRUE(stream.PollNext(&ctx_).IsPending());
97 ASSERT_EQ(stream.PollNext(&ctx_).item(), "1");
98 ASSERT_TRUE(stream.PollNext(&ctx_).IsPending());
99 ASSERT_EQ(stream.PollNext(&ctx_).item(), "2");
100 ASSERT_TRUE(stream.PollNext(&ctx_).IsDone());
101 }
102
TEST_F(StreamUnittest,Concat)103 TEST_F(StreamUnittest, Concat) {
104 std::unique_ptr<MockStreamPollable<int>> int_pollable(
105 new MockStreamPollable<int>());
106 EXPECT_CALL(*int_pollable, PollNext(_))
107 .WillOnce(Return(PendingPollResult()))
108 .WillOnce(Return(StreamPollResult<int>(1)))
109 .WillOnce(Return(StreamPollResult<int>(2)))
110 .WillOnce(Return(DonePollResult()));
111
112 std::unique_ptr<MockStreamPollable<int>> concat_pollable(
113 new MockStreamPollable<int>());
114 EXPECT_CALL(*concat_pollable, PollNext(_))
115 .WillOnce(Return(PendingPollResult()))
116 .WillOnce(Return(StreamPollResult<int>(3)))
117 .WillOnce(Return(PendingPollResult()))
118 .WillOnce(Return(StreamPollResult<int>(4)))
119 .WillOnce(Return(DonePollResult()));
120
121 auto stream = base::Stream<int>(std::move(int_pollable))
122 .Concat(base::Stream<int>(std::move(concat_pollable)));
123 ASSERT_TRUE(stream.PollNext(&ctx_).IsPending());
124 ASSERT_EQ(stream.PollNext(&ctx_).item(), 1);
125 ASSERT_EQ(stream.PollNext(&ctx_).item(), 2);
126 ASSERT_TRUE(stream.PollNext(&ctx_).IsPending());
127 ASSERT_EQ(stream.PollNext(&ctx_).item(), 3);
128 ASSERT_TRUE(stream.PollNext(&ctx_).IsPending());
129 ASSERT_EQ(stream.PollNext(&ctx_).item(), 4);
130 ASSERT_TRUE(stream.PollNext(&ctx_).IsDone());
131 }
132
TEST_F(StreamUnittest,AllOkCollectorEarly)133 TEST_F(StreamUnittest, AllOkCollectorEarly) {
134 std::unique_ptr<MockStreamPollable<base::Status>> pollable(
135 new MockStreamPollable<base::Status>());
136 EXPECT_CALL(*pollable, PollNext(_))
137 .WillOnce(Return(PendingPollResult()))
138 .WillOnce(Return(StreamPollResult<base::Status>(base::OkStatus())))
139 .WillOnce(Return(PendingPollResult()))
140 .WillOnce(Return(StreamPollResult<base::Status>(base::ErrStatus("Bad"))));
141
142 auto future = base::Stream<base::Status>(std::move(pollable))
143 .Collect(base::AllOkCollector());
144 ASSERT_TRUE(future.Poll(&ctx_).IsPending());
145 ASSERT_TRUE(future.Poll(&ctx_).IsPending());
146 ASSERT_FALSE(future.Poll(&ctx_).item().ok());
147 }
148
TEST_F(StreamUnittest,AllOkCollectorComplete)149 TEST_F(StreamUnittest, AllOkCollectorComplete) {
150 std::unique_ptr<MockStreamPollable<base::Status>> pollable(
151 new MockStreamPollable<base::Status>());
152 EXPECT_CALL(*pollable, PollNext(_))
153 .WillOnce(Return(PendingPollResult()))
154 .WillOnce(Return(StreamPollResult<base::Status>(base::OkStatus())))
155 .WillOnce(Return(PendingPollResult()))
156 .WillOnce(Return(StreamPollResult<base::Status>(base::OkStatus())))
157 .WillOnce(Return(StreamPollResult<base::Status>(base::OkStatus())))
158 .WillOnce(Return(DonePollResult()));
159
160 auto future = base::Stream<base::Status>(std::move(pollable))
161 .Collect(base::AllOkCollector());
162 ASSERT_TRUE(future.Poll(&ctx_).IsPending());
163 ASSERT_TRUE(future.Poll(&ctx_).IsPending());
164 ASSERT_TRUE(future.Poll(&ctx_).item().ok());
165 }
166
TEST_F(StreamUnittest,ToFutureCheckedCollector)167 TEST_F(StreamUnittest, ToFutureCheckedCollector) {
168 std::unique_ptr<MockStreamPollable<base::Status>> pollable(
169 new MockStreamPollable<base::Status>());
170 EXPECT_CALL(*pollable, PollNext(_))
171 .WillOnce(Return(PendingPollResult()))
172 .WillOnce(Return(StreamPollResult<base::Status>(base::OkStatus())))
173 .WillOnce(Return(DonePollResult()));
174
175 auto future = base::Stream<base::Status>(std::move(pollable))
176 .Collect(base::ToFutureCheckedCollector<base::Status>());
177 ASSERT_TRUE(future.Poll(&ctx_).IsPending());
178 ASSERT_TRUE(future.Poll(&ctx_).item().ok());
179 }
180
TEST_F(StreamUnittest,StatusOrCollectorEarly)181 TEST_F(StreamUnittest, StatusOrCollectorEarly) {
182 std::unique_ptr<MockStreamPollable<base::StatusOr<int>>> pollable(
183 new MockStreamPollable<base::StatusOr<int>>());
184 EXPECT_CALL(*pollable, PollNext(_))
185 .WillOnce(Return(PendingPollResult()))
186 .WillOnce(Return(StreamPollResult<base::StatusOr<int>>(1024)))
187 .WillOnce(Return(PendingPollResult()))
188 .WillOnce(Return(
189 StreamPollResult<base::StatusOr<int>>(base::ErrStatus("Bad"))));
190
191 auto future = base::Stream<base::StatusOr<int>>(std::move(pollable))
192 .Collect(base::StatusOrVectorCollector<int>());
193 ASSERT_TRUE(future.Poll(&ctx_).IsPending());
194 ASSERT_TRUE(future.Poll(&ctx_).IsPending());
195 ASSERT_FALSE(future.Poll(&ctx_).item().ok());
196 }
197
TEST_F(StreamUnittest,StatusOrCollectorComplete)198 TEST_F(StreamUnittest, StatusOrCollectorComplete) {
199 std::unique_ptr<MockStreamPollable<base::StatusOr<int>>> pollable(
200 new MockStreamPollable<base::StatusOr<int>>());
201 EXPECT_CALL(*pollable, PollNext(_))
202 .WillOnce(Return(PendingPollResult()))
203 .WillOnce(Return(StreamPollResult<base::StatusOr<int>>(1024)))
204 .WillOnce(Return(PendingPollResult()))
205 .WillOnce(Return(StreamPollResult<base::StatusOr<int>>(2048)))
206 .WillOnce(Return(DonePollResult()));
207
208 auto future = base::Stream<base::StatusOr<int>>(std::move(pollable))
209 .Collect(base::StatusOrVectorCollector<int>());
210 ASSERT_TRUE(future.Poll(&ctx_).IsPending());
211 ASSERT_TRUE(future.Poll(&ctx_).IsPending());
212 ASSERT_THAT(future.Poll(&ctx_).item().value(), ElementsAre(1024, 2048));
213 }
214
TEST_F(StreamUnittest,StreamFrom)215 TEST_F(StreamUnittest, StreamFrom) {
216 auto stream = base::StreamFrom(std::vector<int>({1, 2, 4}));
217
218 ASSERT_EQ(stream.PollNext(&ctx_).item(), 1);
219 ASSERT_EQ(stream.PollNext(&ctx_).item(), 2);
220 ASSERT_EQ(stream.PollNext(&ctx_).item(), 4);
221 ASSERT_TRUE(stream.PollNext(&ctx_).IsDone());
222 }
223
TEST_F(StreamUnittest,EmptyStream)224 TEST_F(StreamUnittest, EmptyStream) {
225 auto stream = base::EmptyStream<int>();
226 ASSERT_TRUE(stream.PollNext(&ctx_).IsDone());
227 }
228
TEST_F(StreamUnittest,StreamOf)229 TEST_F(StreamUnittest, StreamOf) {
230 auto stream = base::StreamOf(1, 2);
231
232 ASSERT_EQ(stream.PollNext(&ctx_).item(), 1);
233 ASSERT_EQ(stream.PollNext(&ctx_).item(), 2);
234 ASSERT_TRUE(stream.PollNext(&ctx_).IsDone());
235 }
236
TEST_F(StreamUnittest,StreamFromFuture)237 TEST_F(StreamUnittest, StreamFromFuture) {
238 std::unique_ptr<MockPollable<int>> int_pollable(new MockPollable<int>());
239 EXPECT_CALL(*int_pollable, Poll(_))
240 .WillOnce(Return(PendingPollResult()))
241 .WillOnce(Return(FuturePollResult<int>(1)));
242
243 auto stream =
244 base::StreamFromFuture(base::Future<int>(std::move(int_pollable)));
245
246 ASSERT_TRUE(stream.PollNext(&ctx_).IsPending());
247 ASSERT_EQ(stream.PollNext(&ctx_).item(), 1);
248 ASSERT_TRUE(stream.PollNext(&ctx_).IsDone());
249 }
250
TEST_F(StreamUnittest,OnDestroyStream)251 TEST_F(StreamUnittest, OnDestroyStream) {
252 bool destroyed = false;
253 {
254 auto stream =
255 base::OnDestroyStream<int>([&destroyed]() { destroyed = true; });
256 ASSERT_FALSE(destroyed);
257 ASSERT_TRUE(stream.PollNext(&ctx_).IsDone());
258 ASSERT_FALSE(destroyed);
259 }
260 ASSERT_TRUE(destroyed);
261 }
262
TEST_F(StreamUnittest,FlattenStreams)263 TEST_F(StreamUnittest, FlattenStreams) {
264 EventFd event_fd1, event_fd2, event_fd3, event_fd4;
265 const PlatformHandle fd1 = event_fd1.fd(), fd2 = event_fd2.fd(),
266 fd3 = event_fd3.fd(), fd4 = event_fd4.fd();
267 std::unique_ptr<MockStreamPollable<int>> a(new MockStreamPollable<int>());
268 EXPECT_CALL(*a, PollNext(_))
269 .WillOnce([fd1](PollContext* ctx) {
270 ctx->RegisterInterested(fd1);
271 return PendingPollResult();
272 })
273 .WillOnce(Return(StreamPollResult<int>(1)))
274 .WillOnce(Return(DonePollResult()));
275
276 std::unique_ptr<MockStreamPollable<int>> b(new MockStreamPollable<int>());
277 EXPECT_CALL(*b, PollNext(_))
278 .WillOnce([fd2](PollContext* ctx) {
279 ctx->RegisterInterested(fd2);
280 return PendingPollResult();
281 })
282 .WillOnce([fd2](PollContext* ctx) {
283 ctx->RegisterInterested(fd2);
284 return PendingPollResult();
285 })
286 .WillOnce(Return(StreamPollResult<int>(2)))
287 .WillOnce(Return(DonePollResult()));
288
289 std::unique_ptr<MockStreamPollable<int>> c(new MockStreamPollable<int>());
290 EXPECT_CALL(*c, PollNext(_))
291 .WillOnce(Return(StreamPollResult<int>(3)))
292 .WillOnce([fd3, fd4](PollContext* ctx) {
293 ctx->RegisterInterested(fd3);
294 ctx->RegisterInterested(fd4);
295 return PendingPollResult();
296 })
297 .WillOnce(Return(DonePollResult()));
298
299 std::vector<Stream<int>> streams;
300 streams.emplace_back(std::move(a));
301 streams.emplace_back(std::move(b));
302 streams.emplace_back(std::move(c));
303
304 auto stream = base::FlattenStreams(std::move(streams));
305 ASSERT_EQ(stream.PollNext(&ctx_).item(), 3);
306 ASSERT_THAT(interested_, ElementsAre());
307
308 ASSERT_TRUE(stream.PollNext(&ctx_).IsPending());
309 ASSERT_THAT(interested_, UnorderedElementsAre(fd1, fd2, fd3, fd4));
310
311 interested_.clear();
312 ASSERT_TRUE(stream.PollNext(&ctx_).IsPending());
313 ASSERT_THAT(interested_, UnorderedElementsAre(fd1, fd2, fd3, fd4));
314
315 interested_.clear();
316 ready_ = {fd1};
317 ASSERT_EQ(stream.PollNext(&ctx_).item(), 1);
318 ASSERT_TRUE(stream.PollNext(&ctx_).IsPending());
319 ASSERT_THAT(interested_, UnorderedElementsAre(fd2, fd3, fd4));
320
321 interested_.clear();
322 ready_ = {};
323 ASSERT_TRUE(stream.PollNext(&ctx_).IsPending());
324 ASSERT_THAT(interested_, ElementsAre(fd2, fd3, fd4));
325
326 interested_.clear();
327 ready_ = {fd1, fd2, fd3};
328 ASSERT_TRUE(stream.PollNext(&ctx_).IsPending());
329 ASSERT_EQ(stream.PollNext(&ctx_).item(), 2);
330 ASSERT_TRUE(stream.PollNext(&ctx_).IsDone());
331 }
332
333 } // namespace
334 } // namespace base
335 } // namespace perfetto
336