xref: /aosp_15_r20/external/perfetto/src/base/threading/stream_unittest.cc (revision 6dbdd20afdafa5e3ca9b8809fa73465d530080dc)
1 /*
2  * Copyright (C) 2023 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "perfetto/ext/base/threading/stream.h"
18 
19 #include <vector>
20 
21 #include "perfetto/base/platform_handle.h"
22 #include "perfetto/base/status.h"
23 #include "perfetto/ext/base/event_fd.h"
24 #include "perfetto/ext/base/threading/future_combinators.h"
25 #include "perfetto/ext/base/threading/poll.h"
26 #include "test/gtest_and_gmock.h"
27 
28 namespace perfetto {
29 namespace base {
30 namespace {
31 
32 using testing::_;
33 using testing::ElementsAre;
34 using testing::Return;
35 using testing::UnorderedElementsAre;
36 
37 template <typename T>
38 class MockPollable : public FuturePollable<T> {
39  public:
40   MOCK_METHOD(FuturePollResult<T>, Poll, (PollContext*), (override));
41 };
42 
43 template <typename T>
44 class MockStreamPollable : public StreamPollable<T> {
45  public:
46   MOCK_METHOD(StreamPollResult<T>, PollNext, (PollContext*), (override));
47 };
48 
49 class StreamUnittest : public ::testing::Test {
50  protected:
51   base::FlatSet<base::PlatformHandle> interested_;
52   base::FlatSet<base::PlatformHandle> ready_;
53   PollContext ctx_{&interested_, &ready_};
54 };
55 
TEST_F(StreamUnittest,PollableImmediateResult)56 TEST_F(StreamUnittest, PollableImmediateResult) {
57   std::unique_ptr<MockStreamPollable<int>> int_pollable(
58       new MockStreamPollable<int>());
59   EXPECT_CALL(*int_pollable, PollNext(_))
60       .WillOnce(Return(StreamPollResult<int>(0)));
61 
62   base::Stream<int> stream(std::move(int_pollable));
63   auto res = stream.PollNext(&ctx_);
64   ASSERT_FALSE(res.IsPending());
65   ASSERT_EQ(res.item(), 0);
66 }
67 
TEST_F(StreamUnittest,PollablePendingThenResult)68 TEST_F(StreamUnittest, PollablePendingThenResult) {
69   std::unique_ptr<MockStreamPollable<int>> int_pollable(
70       new MockStreamPollable<int>());
71   EXPECT_CALL(*int_pollable, PollNext(_))
72       .WillOnce(Return(PendingPollResult()))
73       .WillOnce(Return(StreamPollResult<int>(1)))
74       .WillOnce(Return(DonePollResult()));
75 
76   base::Stream<int> stream(std::move(int_pollable));
77   ASSERT_TRUE(stream.PollNext(&ctx_).IsPending());
78   ASSERT_EQ(stream.PollNext(&ctx_).item(), 1);
79   ASSERT_TRUE(stream.PollNext(&ctx_).IsDone());
80 }
81 
TEST_F(StreamUnittest,Map)82 TEST_F(StreamUnittest, Map) {
83   std::unique_ptr<MockStreamPollable<int>> int_pollable(
84       new MockStreamPollable<int>());
85   EXPECT_CALL(*int_pollable, PollNext(_))
86       .WillOnce(Return(PendingPollResult()))
87       .WillOnce(Return(StreamPollResult<int>(1)))
88       .WillOnce(Return(PendingPollResult()))
89       .WillOnce(Return(StreamPollResult<int>(2)))
90       .WillOnce(Return(DonePollResult()));
91 
92   auto stream = base::Stream<int>(std::move(int_pollable))
93                     .MapFuture([](int res) -> base::Future<std::string> {
94                       return std::to_string(res);
95                     });
96   ASSERT_TRUE(stream.PollNext(&ctx_).IsPending());
97   ASSERT_EQ(stream.PollNext(&ctx_).item(), "1");
98   ASSERT_TRUE(stream.PollNext(&ctx_).IsPending());
99   ASSERT_EQ(stream.PollNext(&ctx_).item(), "2");
100   ASSERT_TRUE(stream.PollNext(&ctx_).IsDone());
101 }
102 
TEST_F(StreamUnittest,Concat)103 TEST_F(StreamUnittest, Concat) {
104   std::unique_ptr<MockStreamPollable<int>> int_pollable(
105       new MockStreamPollable<int>());
106   EXPECT_CALL(*int_pollable, PollNext(_))
107       .WillOnce(Return(PendingPollResult()))
108       .WillOnce(Return(StreamPollResult<int>(1)))
109       .WillOnce(Return(StreamPollResult<int>(2)))
110       .WillOnce(Return(DonePollResult()));
111 
112   std::unique_ptr<MockStreamPollable<int>> concat_pollable(
113       new MockStreamPollable<int>());
114   EXPECT_CALL(*concat_pollable, PollNext(_))
115       .WillOnce(Return(PendingPollResult()))
116       .WillOnce(Return(StreamPollResult<int>(3)))
117       .WillOnce(Return(PendingPollResult()))
118       .WillOnce(Return(StreamPollResult<int>(4)))
119       .WillOnce(Return(DonePollResult()));
120 
121   auto stream = base::Stream<int>(std::move(int_pollable))
122                     .Concat(base::Stream<int>(std::move(concat_pollable)));
123   ASSERT_TRUE(stream.PollNext(&ctx_).IsPending());
124   ASSERT_EQ(stream.PollNext(&ctx_).item(), 1);
125   ASSERT_EQ(stream.PollNext(&ctx_).item(), 2);
126   ASSERT_TRUE(stream.PollNext(&ctx_).IsPending());
127   ASSERT_EQ(stream.PollNext(&ctx_).item(), 3);
128   ASSERT_TRUE(stream.PollNext(&ctx_).IsPending());
129   ASSERT_EQ(stream.PollNext(&ctx_).item(), 4);
130   ASSERT_TRUE(stream.PollNext(&ctx_).IsDone());
131 }
132 
TEST_F(StreamUnittest,AllOkCollectorEarly)133 TEST_F(StreamUnittest, AllOkCollectorEarly) {
134   std::unique_ptr<MockStreamPollable<base::Status>> pollable(
135       new MockStreamPollable<base::Status>());
136   EXPECT_CALL(*pollable, PollNext(_))
137       .WillOnce(Return(PendingPollResult()))
138       .WillOnce(Return(StreamPollResult<base::Status>(base::OkStatus())))
139       .WillOnce(Return(PendingPollResult()))
140       .WillOnce(Return(StreamPollResult<base::Status>(base::ErrStatus("Bad"))));
141 
142   auto future = base::Stream<base::Status>(std::move(pollable))
143                     .Collect(base::AllOkCollector());
144   ASSERT_TRUE(future.Poll(&ctx_).IsPending());
145   ASSERT_TRUE(future.Poll(&ctx_).IsPending());
146   ASSERT_FALSE(future.Poll(&ctx_).item().ok());
147 }
148 
TEST_F(StreamUnittest,AllOkCollectorComplete)149 TEST_F(StreamUnittest, AllOkCollectorComplete) {
150   std::unique_ptr<MockStreamPollable<base::Status>> pollable(
151       new MockStreamPollable<base::Status>());
152   EXPECT_CALL(*pollable, PollNext(_))
153       .WillOnce(Return(PendingPollResult()))
154       .WillOnce(Return(StreamPollResult<base::Status>(base::OkStatus())))
155       .WillOnce(Return(PendingPollResult()))
156       .WillOnce(Return(StreamPollResult<base::Status>(base::OkStatus())))
157       .WillOnce(Return(StreamPollResult<base::Status>(base::OkStatus())))
158       .WillOnce(Return(DonePollResult()));
159 
160   auto future = base::Stream<base::Status>(std::move(pollable))
161                     .Collect(base::AllOkCollector());
162   ASSERT_TRUE(future.Poll(&ctx_).IsPending());
163   ASSERT_TRUE(future.Poll(&ctx_).IsPending());
164   ASSERT_TRUE(future.Poll(&ctx_).item().ok());
165 }
166 
TEST_F(StreamUnittest,ToFutureCheckedCollector)167 TEST_F(StreamUnittest, ToFutureCheckedCollector) {
168   std::unique_ptr<MockStreamPollable<base::Status>> pollable(
169       new MockStreamPollable<base::Status>());
170   EXPECT_CALL(*pollable, PollNext(_))
171       .WillOnce(Return(PendingPollResult()))
172       .WillOnce(Return(StreamPollResult<base::Status>(base::OkStatus())))
173       .WillOnce(Return(DonePollResult()));
174 
175   auto future = base::Stream<base::Status>(std::move(pollable))
176                     .Collect(base::ToFutureCheckedCollector<base::Status>());
177   ASSERT_TRUE(future.Poll(&ctx_).IsPending());
178   ASSERT_TRUE(future.Poll(&ctx_).item().ok());
179 }
180 
TEST_F(StreamUnittest,StatusOrCollectorEarly)181 TEST_F(StreamUnittest, StatusOrCollectorEarly) {
182   std::unique_ptr<MockStreamPollable<base::StatusOr<int>>> pollable(
183       new MockStreamPollable<base::StatusOr<int>>());
184   EXPECT_CALL(*pollable, PollNext(_))
185       .WillOnce(Return(PendingPollResult()))
186       .WillOnce(Return(StreamPollResult<base::StatusOr<int>>(1024)))
187       .WillOnce(Return(PendingPollResult()))
188       .WillOnce(Return(
189           StreamPollResult<base::StatusOr<int>>(base::ErrStatus("Bad"))));
190 
191   auto future = base::Stream<base::StatusOr<int>>(std::move(pollable))
192                     .Collect(base::StatusOrVectorCollector<int>());
193   ASSERT_TRUE(future.Poll(&ctx_).IsPending());
194   ASSERT_TRUE(future.Poll(&ctx_).IsPending());
195   ASSERT_FALSE(future.Poll(&ctx_).item().ok());
196 }
197 
TEST_F(StreamUnittest,StatusOrCollectorComplete)198 TEST_F(StreamUnittest, StatusOrCollectorComplete) {
199   std::unique_ptr<MockStreamPollable<base::StatusOr<int>>> pollable(
200       new MockStreamPollable<base::StatusOr<int>>());
201   EXPECT_CALL(*pollable, PollNext(_))
202       .WillOnce(Return(PendingPollResult()))
203       .WillOnce(Return(StreamPollResult<base::StatusOr<int>>(1024)))
204       .WillOnce(Return(PendingPollResult()))
205       .WillOnce(Return(StreamPollResult<base::StatusOr<int>>(2048)))
206       .WillOnce(Return(DonePollResult()));
207 
208   auto future = base::Stream<base::StatusOr<int>>(std::move(pollable))
209                     .Collect(base::StatusOrVectorCollector<int>());
210   ASSERT_TRUE(future.Poll(&ctx_).IsPending());
211   ASSERT_TRUE(future.Poll(&ctx_).IsPending());
212   ASSERT_THAT(future.Poll(&ctx_).item().value(), ElementsAre(1024, 2048));
213 }
214 
TEST_F(StreamUnittest,StreamFrom)215 TEST_F(StreamUnittest, StreamFrom) {
216   auto stream = base::StreamFrom(std::vector<int>({1, 2, 4}));
217 
218   ASSERT_EQ(stream.PollNext(&ctx_).item(), 1);
219   ASSERT_EQ(stream.PollNext(&ctx_).item(), 2);
220   ASSERT_EQ(stream.PollNext(&ctx_).item(), 4);
221   ASSERT_TRUE(stream.PollNext(&ctx_).IsDone());
222 }
223 
TEST_F(StreamUnittest,EmptyStream)224 TEST_F(StreamUnittest, EmptyStream) {
225   auto stream = base::EmptyStream<int>();
226   ASSERT_TRUE(stream.PollNext(&ctx_).IsDone());
227 }
228 
TEST_F(StreamUnittest,StreamOf)229 TEST_F(StreamUnittest, StreamOf) {
230   auto stream = base::StreamOf(1, 2);
231 
232   ASSERT_EQ(stream.PollNext(&ctx_).item(), 1);
233   ASSERT_EQ(stream.PollNext(&ctx_).item(), 2);
234   ASSERT_TRUE(stream.PollNext(&ctx_).IsDone());
235 }
236 
TEST_F(StreamUnittest,StreamFromFuture)237 TEST_F(StreamUnittest, StreamFromFuture) {
238   std::unique_ptr<MockPollable<int>> int_pollable(new MockPollable<int>());
239   EXPECT_CALL(*int_pollable, Poll(_))
240       .WillOnce(Return(PendingPollResult()))
241       .WillOnce(Return(FuturePollResult<int>(1)));
242 
243   auto stream =
244       base::StreamFromFuture(base::Future<int>(std::move(int_pollable)));
245 
246   ASSERT_TRUE(stream.PollNext(&ctx_).IsPending());
247   ASSERT_EQ(stream.PollNext(&ctx_).item(), 1);
248   ASSERT_TRUE(stream.PollNext(&ctx_).IsDone());
249 }
250 
TEST_F(StreamUnittest,OnDestroyStream)251 TEST_F(StreamUnittest, OnDestroyStream) {
252   bool destroyed = false;
253   {
254     auto stream =
255         base::OnDestroyStream<int>([&destroyed]() { destroyed = true; });
256     ASSERT_FALSE(destroyed);
257     ASSERT_TRUE(stream.PollNext(&ctx_).IsDone());
258     ASSERT_FALSE(destroyed);
259   }
260   ASSERT_TRUE(destroyed);
261 }
262 
TEST_F(StreamUnittest,FlattenStreams)263 TEST_F(StreamUnittest, FlattenStreams) {
264   EventFd event_fd1, event_fd2, event_fd3, event_fd4;
265   const PlatformHandle fd1 = event_fd1.fd(), fd2 = event_fd2.fd(),
266                        fd3 = event_fd3.fd(), fd4 = event_fd4.fd();
267   std::unique_ptr<MockStreamPollable<int>> a(new MockStreamPollable<int>());
268   EXPECT_CALL(*a, PollNext(_))
269       .WillOnce([fd1](PollContext* ctx) {
270         ctx->RegisterInterested(fd1);
271         return PendingPollResult();
272       })
273       .WillOnce(Return(StreamPollResult<int>(1)))
274       .WillOnce(Return(DonePollResult()));
275 
276   std::unique_ptr<MockStreamPollable<int>> b(new MockStreamPollable<int>());
277   EXPECT_CALL(*b, PollNext(_))
278       .WillOnce([fd2](PollContext* ctx) {
279         ctx->RegisterInterested(fd2);
280         return PendingPollResult();
281       })
282       .WillOnce([fd2](PollContext* ctx) {
283         ctx->RegisterInterested(fd2);
284         return PendingPollResult();
285       })
286       .WillOnce(Return(StreamPollResult<int>(2)))
287       .WillOnce(Return(DonePollResult()));
288 
289   std::unique_ptr<MockStreamPollable<int>> c(new MockStreamPollable<int>());
290   EXPECT_CALL(*c, PollNext(_))
291       .WillOnce(Return(StreamPollResult<int>(3)))
292       .WillOnce([fd3, fd4](PollContext* ctx) {
293         ctx->RegisterInterested(fd3);
294         ctx->RegisterInterested(fd4);
295         return PendingPollResult();
296       })
297       .WillOnce(Return(DonePollResult()));
298 
299   std::vector<Stream<int>> streams;
300   streams.emplace_back(std::move(a));
301   streams.emplace_back(std::move(b));
302   streams.emplace_back(std::move(c));
303 
304   auto stream = base::FlattenStreams(std::move(streams));
305   ASSERT_EQ(stream.PollNext(&ctx_).item(), 3);
306   ASSERT_THAT(interested_, ElementsAre());
307 
308   ASSERT_TRUE(stream.PollNext(&ctx_).IsPending());
309   ASSERT_THAT(interested_, UnorderedElementsAre(fd1, fd2, fd3, fd4));
310 
311   interested_.clear();
312   ASSERT_TRUE(stream.PollNext(&ctx_).IsPending());
313   ASSERT_THAT(interested_, UnorderedElementsAre(fd1, fd2, fd3, fd4));
314 
315   interested_.clear();
316   ready_ = {fd1};
317   ASSERT_EQ(stream.PollNext(&ctx_).item(), 1);
318   ASSERT_TRUE(stream.PollNext(&ctx_).IsPending());
319   ASSERT_THAT(interested_, UnorderedElementsAre(fd2, fd3, fd4));
320 
321   interested_.clear();
322   ready_ = {};
323   ASSERT_TRUE(stream.PollNext(&ctx_).IsPending());
324   ASSERT_THAT(interested_, ElementsAre(fd2, fd3, fd4));
325 
326   interested_.clear();
327   ready_ = {fd1, fd2, fd3};
328   ASSERT_TRUE(stream.PollNext(&ctx_).IsPending());
329   ASSERT_EQ(stream.PollNext(&ctx_).item(), 2);
330   ASSERT_TRUE(stream.PollNext(&ctx_).IsDone());
331 }
332 
333 }  // namespace
334 }  // namespace base
335 }  // namespace perfetto
336