1*6dbdd20aSAndroid Build Coastguard Worker /*
2*6dbdd20aSAndroid Build Coastguard Worker * Copyright (C) 2023 The Android Open Source Project
3*6dbdd20aSAndroid Build Coastguard Worker *
4*6dbdd20aSAndroid Build Coastguard Worker * Licensed under the Apache License, Version 2.0 (the "License");
5*6dbdd20aSAndroid Build Coastguard Worker * you may not use this file except in compliance with the License.
6*6dbdd20aSAndroid Build Coastguard Worker * You may obtain a copy of the License at
7*6dbdd20aSAndroid Build Coastguard Worker *
8*6dbdd20aSAndroid Build Coastguard Worker * http://www.apache.org/licenses/LICENSE-2.0
9*6dbdd20aSAndroid Build Coastguard Worker *
10*6dbdd20aSAndroid Build Coastguard Worker * Unless required by applicable law or agreed to in writing, software
11*6dbdd20aSAndroid Build Coastguard Worker * distributed under the License is distributed on an "AS IS" BASIS,
12*6dbdd20aSAndroid Build Coastguard Worker * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13*6dbdd20aSAndroid Build Coastguard Worker * See the License for the specific language governing permissions and
14*6dbdd20aSAndroid Build Coastguard Worker * limitations under the License.
15*6dbdd20aSAndroid Build Coastguard Worker */
16*6dbdd20aSAndroid Build Coastguard Worker
17*6dbdd20aSAndroid Build Coastguard Worker #include "perfetto/ext/base/threading/stream.h"
18*6dbdd20aSAndroid Build Coastguard Worker
19*6dbdd20aSAndroid Build Coastguard Worker #include <vector>
20*6dbdd20aSAndroid Build Coastguard Worker
21*6dbdd20aSAndroid Build Coastguard Worker #include "perfetto/base/platform_handle.h"
22*6dbdd20aSAndroid Build Coastguard Worker #include "perfetto/base/status.h"
23*6dbdd20aSAndroid Build Coastguard Worker #include "perfetto/ext/base/event_fd.h"
24*6dbdd20aSAndroid Build Coastguard Worker #include "perfetto/ext/base/threading/future_combinators.h"
25*6dbdd20aSAndroid Build Coastguard Worker #include "perfetto/ext/base/threading/poll.h"
26*6dbdd20aSAndroid Build Coastguard Worker #include "test/gtest_and_gmock.h"
27*6dbdd20aSAndroid Build Coastguard Worker
28*6dbdd20aSAndroid Build Coastguard Worker namespace perfetto {
29*6dbdd20aSAndroid Build Coastguard Worker namespace base {
30*6dbdd20aSAndroid Build Coastguard Worker namespace {
31*6dbdd20aSAndroid Build Coastguard Worker
32*6dbdd20aSAndroid Build Coastguard Worker using testing::_;
33*6dbdd20aSAndroid Build Coastguard Worker using testing::ElementsAre;
34*6dbdd20aSAndroid Build Coastguard Worker using testing::Return;
35*6dbdd20aSAndroid Build Coastguard Worker using testing::UnorderedElementsAre;
36*6dbdd20aSAndroid Build Coastguard Worker
37*6dbdd20aSAndroid Build Coastguard Worker template <typename T>
38*6dbdd20aSAndroid Build Coastguard Worker class MockPollable : public FuturePollable<T> {
39*6dbdd20aSAndroid Build Coastguard Worker public:
40*6dbdd20aSAndroid Build Coastguard Worker MOCK_METHOD(FuturePollResult<T>, Poll, (PollContext*), (override));
41*6dbdd20aSAndroid Build Coastguard Worker };
42*6dbdd20aSAndroid Build Coastguard Worker
43*6dbdd20aSAndroid Build Coastguard Worker template <typename T>
44*6dbdd20aSAndroid Build Coastguard Worker class MockStreamPollable : public StreamPollable<T> {
45*6dbdd20aSAndroid Build Coastguard Worker public:
46*6dbdd20aSAndroid Build Coastguard Worker MOCK_METHOD(StreamPollResult<T>, PollNext, (PollContext*), (override));
47*6dbdd20aSAndroid Build Coastguard Worker };
48*6dbdd20aSAndroid Build Coastguard Worker
49*6dbdd20aSAndroid Build Coastguard Worker class StreamUnittest : public ::testing::Test {
50*6dbdd20aSAndroid Build Coastguard Worker protected:
51*6dbdd20aSAndroid Build Coastguard Worker base::FlatSet<base::PlatformHandle> interested_;
52*6dbdd20aSAndroid Build Coastguard Worker base::FlatSet<base::PlatformHandle> ready_;
53*6dbdd20aSAndroid Build Coastguard Worker PollContext ctx_{&interested_, &ready_};
54*6dbdd20aSAndroid Build Coastguard Worker };
55*6dbdd20aSAndroid Build Coastguard Worker
TEST_F(StreamUnittest,PollableImmediateResult)56*6dbdd20aSAndroid Build Coastguard Worker TEST_F(StreamUnittest, PollableImmediateResult) {
57*6dbdd20aSAndroid Build Coastguard Worker std::unique_ptr<MockStreamPollable<int>> int_pollable(
58*6dbdd20aSAndroid Build Coastguard Worker new MockStreamPollable<int>());
59*6dbdd20aSAndroid Build Coastguard Worker EXPECT_CALL(*int_pollable, PollNext(_))
60*6dbdd20aSAndroid Build Coastguard Worker .WillOnce(Return(StreamPollResult<int>(0)));
61*6dbdd20aSAndroid Build Coastguard Worker
62*6dbdd20aSAndroid Build Coastguard Worker base::Stream<int> stream(std::move(int_pollable));
63*6dbdd20aSAndroid Build Coastguard Worker auto res = stream.PollNext(&ctx_);
64*6dbdd20aSAndroid Build Coastguard Worker ASSERT_FALSE(res.IsPending());
65*6dbdd20aSAndroid Build Coastguard Worker ASSERT_EQ(res.item(), 0);
66*6dbdd20aSAndroid Build Coastguard Worker }
67*6dbdd20aSAndroid Build Coastguard Worker
TEST_F(StreamUnittest,PollablePendingThenResult)68*6dbdd20aSAndroid Build Coastguard Worker TEST_F(StreamUnittest, PollablePendingThenResult) {
69*6dbdd20aSAndroid Build Coastguard Worker std::unique_ptr<MockStreamPollable<int>> int_pollable(
70*6dbdd20aSAndroid Build Coastguard Worker new MockStreamPollable<int>());
71*6dbdd20aSAndroid Build Coastguard Worker EXPECT_CALL(*int_pollable, PollNext(_))
72*6dbdd20aSAndroid Build Coastguard Worker .WillOnce(Return(PendingPollResult()))
73*6dbdd20aSAndroid Build Coastguard Worker .WillOnce(Return(StreamPollResult<int>(1)))
74*6dbdd20aSAndroid Build Coastguard Worker .WillOnce(Return(DonePollResult()));
75*6dbdd20aSAndroid Build Coastguard Worker
76*6dbdd20aSAndroid Build Coastguard Worker base::Stream<int> stream(std::move(int_pollable));
77*6dbdd20aSAndroid Build Coastguard Worker ASSERT_TRUE(stream.PollNext(&ctx_).IsPending());
78*6dbdd20aSAndroid Build Coastguard Worker ASSERT_EQ(stream.PollNext(&ctx_).item(), 1);
79*6dbdd20aSAndroid Build Coastguard Worker ASSERT_TRUE(stream.PollNext(&ctx_).IsDone());
80*6dbdd20aSAndroid Build Coastguard Worker }
81*6dbdd20aSAndroid Build Coastguard Worker
TEST_F(StreamUnittest,Map)82*6dbdd20aSAndroid Build Coastguard Worker TEST_F(StreamUnittest, Map) {
83*6dbdd20aSAndroid Build Coastguard Worker std::unique_ptr<MockStreamPollable<int>> int_pollable(
84*6dbdd20aSAndroid Build Coastguard Worker new MockStreamPollable<int>());
85*6dbdd20aSAndroid Build Coastguard Worker EXPECT_CALL(*int_pollable, PollNext(_))
86*6dbdd20aSAndroid Build Coastguard Worker .WillOnce(Return(PendingPollResult()))
87*6dbdd20aSAndroid Build Coastguard Worker .WillOnce(Return(StreamPollResult<int>(1)))
88*6dbdd20aSAndroid Build Coastguard Worker .WillOnce(Return(PendingPollResult()))
89*6dbdd20aSAndroid Build Coastguard Worker .WillOnce(Return(StreamPollResult<int>(2)))
90*6dbdd20aSAndroid Build Coastguard Worker .WillOnce(Return(DonePollResult()));
91*6dbdd20aSAndroid Build Coastguard Worker
92*6dbdd20aSAndroid Build Coastguard Worker auto stream = base::Stream<int>(std::move(int_pollable))
93*6dbdd20aSAndroid Build Coastguard Worker .MapFuture([](int res) -> base::Future<std::string> {
94*6dbdd20aSAndroid Build Coastguard Worker return std::to_string(res);
95*6dbdd20aSAndroid Build Coastguard Worker });
96*6dbdd20aSAndroid Build Coastguard Worker ASSERT_TRUE(stream.PollNext(&ctx_).IsPending());
97*6dbdd20aSAndroid Build Coastguard Worker ASSERT_EQ(stream.PollNext(&ctx_).item(), "1");
98*6dbdd20aSAndroid Build Coastguard Worker ASSERT_TRUE(stream.PollNext(&ctx_).IsPending());
99*6dbdd20aSAndroid Build Coastguard Worker ASSERT_EQ(stream.PollNext(&ctx_).item(), "2");
100*6dbdd20aSAndroid Build Coastguard Worker ASSERT_TRUE(stream.PollNext(&ctx_).IsDone());
101*6dbdd20aSAndroid Build Coastguard Worker }
102*6dbdd20aSAndroid Build Coastguard Worker
TEST_F(StreamUnittest,Concat)103*6dbdd20aSAndroid Build Coastguard Worker TEST_F(StreamUnittest, Concat) {
104*6dbdd20aSAndroid Build Coastguard Worker std::unique_ptr<MockStreamPollable<int>> int_pollable(
105*6dbdd20aSAndroid Build Coastguard Worker new MockStreamPollable<int>());
106*6dbdd20aSAndroid Build Coastguard Worker EXPECT_CALL(*int_pollable, PollNext(_))
107*6dbdd20aSAndroid Build Coastguard Worker .WillOnce(Return(PendingPollResult()))
108*6dbdd20aSAndroid Build Coastguard Worker .WillOnce(Return(StreamPollResult<int>(1)))
109*6dbdd20aSAndroid Build Coastguard Worker .WillOnce(Return(StreamPollResult<int>(2)))
110*6dbdd20aSAndroid Build Coastguard Worker .WillOnce(Return(DonePollResult()));
111*6dbdd20aSAndroid Build Coastguard Worker
112*6dbdd20aSAndroid Build Coastguard Worker std::unique_ptr<MockStreamPollable<int>> concat_pollable(
113*6dbdd20aSAndroid Build Coastguard Worker new MockStreamPollable<int>());
114*6dbdd20aSAndroid Build Coastguard Worker EXPECT_CALL(*concat_pollable, PollNext(_))
115*6dbdd20aSAndroid Build Coastguard Worker .WillOnce(Return(PendingPollResult()))
116*6dbdd20aSAndroid Build Coastguard Worker .WillOnce(Return(StreamPollResult<int>(3)))
117*6dbdd20aSAndroid Build Coastguard Worker .WillOnce(Return(PendingPollResult()))
118*6dbdd20aSAndroid Build Coastguard Worker .WillOnce(Return(StreamPollResult<int>(4)))
119*6dbdd20aSAndroid Build Coastguard Worker .WillOnce(Return(DonePollResult()));
120*6dbdd20aSAndroid Build Coastguard Worker
121*6dbdd20aSAndroid Build Coastguard Worker auto stream = base::Stream<int>(std::move(int_pollable))
122*6dbdd20aSAndroid Build Coastguard Worker .Concat(base::Stream<int>(std::move(concat_pollable)));
123*6dbdd20aSAndroid Build Coastguard Worker ASSERT_TRUE(stream.PollNext(&ctx_).IsPending());
124*6dbdd20aSAndroid Build Coastguard Worker ASSERT_EQ(stream.PollNext(&ctx_).item(), 1);
125*6dbdd20aSAndroid Build Coastguard Worker ASSERT_EQ(stream.PollNext(&ctx_).item(), 2);
126*6dbdd20aSAndroid Build Coastguard Worker ASSERT_TRUE(stream.PollNext(&ctx_).IsPending());
127*6dbdd20aSAndroid Build Coastguard Worker ASSERT_EQ(stream.PollNext(&ctx_).item(), 3);
128*6dbdd20aSAndroid Build Coastguard Worker ASSERT_TRUE(stream.PollNext(&ctx_).IsPending());
129*6dbdd20aSAndroid Build Coastguard Worker ASSERT_EQ(stream.PollNext(&ctx_).item(), 4);
130*6dbdd20aSAndroid Build Coastguard Worker ASSERT_TRUE(stream.PollNext(&ctx_).IsDone());
131*6dbdd20aSAndroid Build Coastguard Worker }
132*6dbdd20aSAndroid Build Coastguard Worker
TEST_F(StreamUnittest,AllOkCollectorEarly)133*6dbdd20aSAndroid Build Coastguard Worker TEST_F(StreamUnittest, AllOkCollectorEarly) {
134*6dbdd20aSAndroid Build Coastguard Worker std::unique_ptr<MockStreamPollable<base::Status>> pollable(
135*6dbdd20aSAndroid Build Coastguard Worker new MockStreamPollable<base::Status>());
136*6dbdd20aSAndroid Build Coastguard Worker EXPECT_CALL(*pollable, PollNext(_))
137*6dbdd20aSAndroid Build Coastguard Worker .WillOnce(Return(PendingPollResult()))
138*6dbdd20aSAndroid Build Coastguard Worker .WillOnce(Return(StreamPollResult<base::Status>(base::OkStatus())))
139*6dbdd20aSAndroid Build Coastguard Worker .WillOnce(Return(PendingPollResult()))
140*6dbdd20aSAndroid Build Coastguard Worker .WillOnce(Return(StreamPollResult<base::Status>(base::ErrStatus("Bad"))));
141*6dbdd20aSAndroid Build Coastguard Worker
142*6dbdd20aSAndroid Build Coastguard Worker auto future = base::Stream<base::Status>(std::move(pollable))
143*6dbdd20aSAndroid Build Coastguard Worker .Collect(base::AllOkCollector());
144*6dbdd20aSAndroid Build Coastguard Worker ASSERT_TRUE(future.Poll(&ctx_).IsPending());
145*6dbdd20aSAndroid Build Coastguard Worker ASSERT_TRUE(future.Poll(&ctx_).IsPending());
146*6dbdd20aSAndroid Build Coastguard Worker ASSERT_FALSE(future.Poll(&ctx_).item().ok());
147*6dbdd20aSAndroid Build Coastguard Worker }
148*6dbdd20aSAndroid Build Coastguard Worker
TEST_F(StreamUnittest,AllOkCollectorComplete)149*6dbdd20aSAndroid Build Coastguard Worker TEST_F(StreamUnittest, AllOkCollectorComplete) {
150*6dbdd20aSAndroid Build Coastguard Worker std::unique_ptr<MockStreamPollable<base::Status>> pollable(
151*6dbdd20aSAndroid Build Coastguard Worker new MockStreamPollable<base::Status>());
152*6dbdd20aSAndroid Build Coastguard Worker EXPECT_CALL(*pollable, PollNext(_))
153*6dbdd20aSAndroid Build Coastguard Worker .WillOnce(Return(PendingPollResult()))
154*6dbdd20aSAndroid Build Coastguard Worker .WillOnce(Return(StreamPollResult<base::Status>(base::OkStatus())))
155*6dbdd20aSAndroid Build Coastguard Worker .WillOnce(Return(PendingPollResult()))
156*6dbdd20aSAndroid Build Coastguard Worker .WillOnce(Return(StreamPollResult<base::Status>(base::OkStatus())))
157*6dbdd20aSAndroid Build Coastguard Worker .WillOnce(Return(StreamPollResult<base::Status>(base::OkStatus())))
158*6dbdd20aSAndroid Build Coastguard Worker .WillOnce(Return(DonePollResult()));
159*6dbdd20aSAndroid Build Coastguard Worker
160*6dbdd20aSAndroid Build Coastguard Worker auto future = base::Stream<base::Status>(std::move(pollable))
161*6dbdd20aSAndroid Build Coastguard Worker .Collect(base::AllOkCollector());
162*6dbdd20aSAndroid Build Coastguard Worker ASSERT_TRUE(future.Poll(&ctx_).IsPending());
163*6dbdd20aSAndroid Build Coastguard Worker ASSERT_TRUE(future.Poll(&ctx_).IsPending());
164*6dbdd20aSAndroid Build Coastguard Worker ASSERT_TRUE(future.Poll(&ctx_).item().ok());
165*6dbdd20aSAndroid Build Coastguard Worker }
166*6dbdd20aSAndroid Build Coastguard Worker
TEST_F(StreamUnittest,ToFutureCheckedCollector)167*6dbdd20aSAndroid Build Coastguard Worker TEST_F(StreamUnittest, ToFutureCheckedCollector) {
168*6dbdd20aSAndroid Build Coastguard Worker std::unique_ptr<MockStreamPollable<base::Status>> pollable(
169*6dbdd20aSAndroid Build Coastguard Worker new MockStreamPollable<base::Status>());
170*6dbdd20aSAndroid Build Coastguard Worker EXPECT_CALL(*pollable, PollNext(_))
171*6dbdd20aSAndroid Build Coastguard Worker .WillOnce(Return(PendingPollResult()))
172*6dbdd20aSAndroid Build Coastguard Worker .WillOnce(Return(StreamPollResult<base::Status>(base::OkStatus())))
173*6dbdd20aSAndroid Build Coastguard Worker .WillOnce(Return(DonePollResult()));
174*6dbdd20aSAndroid Build Coastguard Worker
175*6dbdd20aSAndroid Build Coastguard Worker auto future = base::Stream<base::Status>(std::move(pollable))
176*6dbdd20aSAndroid Build Coastguard Worker .Collect(base::ToFutureCheckedCollector<base::Status>());
177*6dbdd20aSAndroid Build Coastguard Worker ASSERT_TRUE(future.Poll(&ctx_).IsPending());
178*6dbdd20aSAndroid Build Coastguard Worker ASSERT_TRUE(future.Poll(&ctx_).item().ok());
179*6dbdd20aSAndroid Build Coastguard Worker }
180*6dbdd20aSAndroid Build Coastguard Worker
TEST_F(StreamUnittest,StatusOrCollectorEarly)181*6dbdd20aSAndroid Build Coastguard Worker TEST_F(StreamUnittest, StatusOrCollectorEarly) {
182*6dbdd20aSAndroid Build Coastguard Worker std::unique_ptr<MockStreamPollable<base::StatusOr<int>>> pollable(
183*6dbdd20aSAndroid Build Coastguard Worker new MockStreamPollable<base::StatusOr<int>>());
184*6dbdd20aSAndroid Build Coastguard Worker EXPECT_CALL(*pollable, PollNext(_))
185*6dbdd20aSAndroid Build Coastguard Worker .WillOnce(Return(PendingPollResult()))
186*6dbdd20aSAndroid Build Coastguard Worker .WillOnce(Return(StreamPollResult<base::StatusOr<int>>(1024)))
187*6dbdd20aSAndroid Build Coastguard Worker .WillOnce(Return(PendingPollResult()))
188*6dbdd20aSAndroid Build Coastguard Worker .WillOnce(Return(
189*6dbdd20aSAndroid Build Coastguard Worker StreamPollResult<base::StatusOr<int>>(base::ErrStatus("Bad"))));
190*6dbdd20aSAndroid Build Coastguard Worker
191*6dbdd20aSAndroid Build Coastguard Worker auto future = base::Stream<base::StatusOr<int>>(std::move(pollable))
192*6dbdd20aSAndroid Build Coastguard Worker .Collect(base::StatusOrVectorCollector<int>());
193*6dbdd20aSAndroid Build Coastguard Worker ASSERT_TRUE(future.Poll(&ctx_).IsPending());
194*6dbdd20aSAndroid Build Coastguard Worker ASSERT_TRUE(future.Poll(&ctx_).IsPending());
195*6dbdd20aSAndroid Build Coastguard Worker ASSERT_FALSE(future.Poll(&ctx_).item().ok());
196*6dbdd20aSAndroid Build Coastguard Worker }
197*6dbdd20aSAndroid Build Coastguard Worker
TEST_F(StreamUnittest,StatusOrCollectorComplete)198*6dbdd20aSAndroid Build Coastguard Worker TEST_F(StreamUnittest, StatusOrCollectorComplete) {
199*6dbdd20aSAndroid Build Coastguard Worker std::unique_ptr<MockStreamPollable<base::StatusOr<int>>> pollable(
200*6dbdd20aSAndroid Build Coastguard Worker new MockStreamPollable<base::StatusOr<int>>());
201*6dbdd20aSAndroid Build Coastguard Worker EXPECT_CALL(*pollable, PollNext(_))
202*6dbdd20aSAndroid Build Coastguard Worker .WillOnce(Return(PendingPollResult()))
203*6dbdd20aSAndroid Build Coastguard Worker .WillOnce(Return(StreamPollResult<base::StatusOr<int>>(1024)))
204*6dbdd20aSAndroid Build Coastguard Worker .WillOnce(Return(PendingPollResult()))
205*6dbdd20aSAndroid Build Coastguard Worker .WillOnce(Return(StreamPollResult<base::StatusOr<int>>(2048)))
206*6dbdd20aSAndroid Build Coastguard Worker .WillOnce(Return(DonePollResult()));
207*6dbdd20aSAndroid Build Coastguard Worker
208*6dbdd20aSAndroid Build Coastguard Worker auto future = base::Stream<base::StatusOr<int>>(std::move(pollable))
209*6dbdd20aSAndroid Build Coastguard Worker .Collect(base::StatusOrVectorCollector<int>());
210*6dbdd20aSAndroid Build Coastguard Worker ASSERT_TRUE(future.Poll(&ctx_).IsPending());
211*6dbdd20aSAndroid Build Coastguard Worker ASSERT_TRUE(future.Poll(&ctx_).IsPending());
212*6dbdd20aSAndroid Build Coastguard Worker ASSERT_THAT(future.Poll(&ctx_).item().value(), ElementsAre(1024, 2048));
213*6dbdd20aSAndroid Build Coastguard Worker }
214*6dbdd20aSAndroid Build Coastguard Worker
TEST_F(StreamUnittest,StreamFrom)215*6dbdd20aSAndroid Build Coastguard Worker TEST_F(StreamUnittest, StreamFrom) {
216*6dbdd20aSAndroid Build Coastguard Worker auto stream = base::StreamFrom(std::vector<int>({1, 2, 4}));
217*6dbdd20aSAndroid Build Coastguard Worker
218*6dbdd20aSAndroid Build Coastguard Worker ASSERT_EQ(stream.PollNext(&ctx_).item(), 1);
219*6dbdd20aSAndroid Build Coastguard Worker ASSERT_EQ(stream.PollNext(&ctx_).item(), 2);
220*6dbdd20aSAndroid Build Coastguard Worker ASSERT_EQ(stream.PollNext(&ctx_).item(), 4);
221*6dbdd20aSAndroid Build Coastguard Worker ASSERT_TRUE(stream.PollNext(&ctx_).IsDone());
222*6dbdd20aSAndroid Build Coastguard Worker }
223*6dbdd20aSAndroid Build Coastguard Worker
TEST_F(StreamUnittest,EmptyStream)224*6dbdd20aSAndroid Build Coastguard Worker TEST_F(StreamUnittest, EmptyStream) {
225*6dbdd20aSAndroid Build Coastguard Worker auto stream = base::EmptyStream<int>();
226*6dbdd20aSAndroid Build Coastguard Worker ASSERT_TRUE(stream.PollNext(&ctx_).IsDone());
227*6dbdd20aSAndroid Build Coastguard Worker }
228*6dbdd20aSAndroid Build Coastguard Worker
TEST_F(StreamUnittest,StreamOf)229*6dbdd20aSAndroid Build Coastguard Worker TEST_F(StreamUnittest, StreamOf) {
230*6dbdd20aSAndroid Build Coastguard Worker auto stream = base::StreamOf(1, 2);
231*6dbdd20aSAndroid Build Coastguard Worker
232*6dbdd20aSAndroid Build Coastguard Worker ASSERT_EQ(stream.PollNext(&ctx_).item(), 1);
233*6dbdd20aSAndroid Build Coastguard Worker ASSERT_EQ(stream.PollNext(&ctx_).item(), 2);
234*6dbdd20aSAndroid Build Coastguard Worker ASSERT_TRUE(stream.PollNext(&ctx_).IsDone());
235*6dbdd20aSAndroid Build Coastguard Worker }
236*6dbdd20aSAndroid Build Coastguard Worker
TEST_F(StreamUnittest,StreamFromFuture)237*6dbdd20aSAndroid Build Coastguard Worker TEST_F(StreamUnittest, StreamFromFuture) {
238*6dbdd20aSAndroid Build Coastguard Worker std::unique_ptr<MockPollable<int>> int_pollable(new MockPollable<int>());
239*6dbdd20aSAndroid Build Coastguard Worker EXPECT_CALL(*int_pollable, Poll(_))
240*6dbdd20aSAndroid Build Coastguard Worker .WillOnce(Return(PendingPollResult()))
241*6dbdd20aSAndroid Build Coastguard Worker .WillOnce(Return(FuturePollResult<int>(1)));
242*6dbdd20aSAndroid Build Coastguard Worker
243*6dbdd20aSAndroid Build Coastguard Worker auto stream =
244*6dbdd20aSAndroid Build Coastguard Worker base::StreamFromFuture(base::Future<int>(std::move(int_pollable)));
245*6dbdd20aSAndroid Build Coastguard Worker
246*6dbdd20aSAndroid Build Coastguard Worker ASSERT_TRUE(stream.PollNext(&ctx_).IsPending());
247*6dbdd20aSAndroid Build Coastguard Worker ASSERT_EQ(stream.PollNext(&ctx_).item(), 1);
248*6dbdd20aSAndroid Build Coastguard Worker ASSERT_TRUE(stream.PollNext(&ctx_).IsDone());
249*6dbdd20aSAndroid Build Coastguard Worker }
250*6dbdd20aSAndroid Build Coastguard Worker
TEST_F(StreamUnittest,OnDestroyStream)251*6dbdd20aSAndroid Build Coastguard Worker TEST_F(StreamUnittest, OnDestroyStream) {
252*6dbdd20aSAndroid Build Coastguard Worker bool destroyed = false;
253*6dbdd20aSAndroid Build Coastguard Worker {
254*6dbdd20aSAndroid Build Coastguard Worker auto stream =
255*6dbdd20aSAndroid Build Coastguard Worker base::OnDestroyStream<int>([&destroyed]() { destroyed = true; });
256*6dbdd20aSAndroid Build Coastguard Worker ASSERT_FALSE(destroyed);
257*6dbdd20aSAndroid Build Coastguard Worker ASSERT_TRUE(stream.PollNext(&ctx_).IsDone());
258*6dbdd20aSAndroid Build Coastguard Worker ASSERT_FALSE(destroyed);
259*6dbdd20aSAndroid Build Coastguard Worker }
260*6dbdd20aSAndroid Build Coastguard Worker ASSERT_TRUE(destroyed);
261*6dbdd20aSAndroid Build Coastguard Worker }
262*6dbdd20aSAndroid Build Coastguard Worker
TEST_F(StreamUnittest,FlattenStreams)263*6dbdd20aSAndroid Build Coastguard Worker TEST_F(StreamUnittest, FlattenStreams) {
264*6dbdd20aSAndroid Build Coastguard Worker EventFd event_fd1, event_fd2, event_fd3, event_fd4;
265*6dbdd20aSAndroid Build Coastguard Worker const PlatformHandle fd1 = event_fd1.fd(), fd2 = event_fd2.fd(),
266*6dbdd20aSAndroid Build Coastguard Worker fd3 = event_fd3.fd(), fd4 = event_fd4.fd();
267*6dbdd20aSAndroid Build Coastguard Worker std::unique_ptr<MockStreamPollable<int>> a(new MockStreamPollable<int>());
268*6dbdd20aSAndroid Build Coastguard Worker EXPECT_CALL(*a, PollNext(_))
269*6dbdd20aSAndroid Build Coastguard Worker .WillOnce([fd1](PollContext* ctx) {
270*6dbdd20aSAndroid Build Coastguard Worker ctx->RegisterInterested(fd1);
271*6dbdd20aSAndroid Build Coastguard Worker return PendingPollResult();
272*6dbdd20aSAndroid Build Coastguard Worker })
273*6dbdd20aSAndroid Build Coastguard Worker .WillOnce(Return(StreamPollResult<int>(1)))
274*6dbdd20aSAndroid Build Coastguard Worker .WillOnce(Return(DonePollResult()));
275*6dbdd20aSAndroid Build Coastguard Worker
276*6dbdd20aSAndroid Build Coastguard Worker std::unique_ptr<MockStreamPollable<int>> b(new MockStreamPollable<int>());
277*6dbdd20aSAndroid Build Coastguard Worker EXPECT_CALL(*b, PollNext(_))
278*6dbdd20aSAndroid Build Coastguard Worker .WillOnce([fd2](PollContext* ctx) {
279*6dbdd20aSAndroid Build Coastguard Worker ctx->RegisterInterested(fd2);
280*6dbdd20aSAndroid Build Coastguard Worker return PendingPollResult();
281*6dbdd20aSAndroid Build Coastguard Worker })
282*6dbdd20aSAndroid Build Coastguard Worker .WillOnce([fd2](PollContext* ctx) {
283*6dbdd20aSAndroid Build Coastguard Worker ctx->RegisterInterested(fd2);
284*6dbdd20aSAndroid Build Coastguard Worker return PendingPollResult();
285*6dbdd20aSAndroid Build Coastguard Worker })
286*6dbdd20aSAndroid Build Coastguard Worker .WillOnce(Return(StreamPollResult<int>(2)))
287*6dbdd20aSAndroid Build Coastguard Worker .WillOnce(Return(DonePollResult()));
288*6dbdd20aSAndroid Build Coastguard Worker
289*6dbdd20aSAndroid Build Coastguard Worker std::unique_ptr<MockStreamPollable<int>> c(new MockStreamPollable<int>());
290*6dbdd20aSAndroid Build Coastguard Worker EXPECT_CALL(*c, PollNext(_))
291*6dbdd20aSAndroid Build Coastguard Worker .WillOnce(Return(StreamPollResult<int>(3)))
292*6dbdd20aSAndroid Build Coastguard Worker .WillOnce([fd3, fd4](PollContext* ctx) {
293*6dbdd20aSAndroid Build Coastguard Worker ctx->RegisterInterested(fd3);
294*6dbdd20aSAndroid Build Coastguard Worker ctx->RegisterInterested(fd4);
295*6dbdd20aSAndroid Build Coastguard Worker return PendingPollResult();
296*6dbdd20aSAndroid Build Coastguard Worker })
297*6dbdd20aSAndroid Build Coastguard Worker .WillOnce(Return(DonePollResult()));
298*6dbdd20aSAndroid Build Coastguard Worker
299*6dbdd20aSAndroid Build Coastguard Worker std::vector<Stream<int>> streams;
300*6dbdd20aSAndroid Build Coastguard Worker streams.emplace_back(std::move(a));
301*6dbdd20aSAndroid Build Coastguard Worker streams.emplace_back(std::move(b));
302*6dbdd20aSAndroid Build Coastguard Worker streams.emplace_back(std::move(c));
303*6dbdd20aSAndroid Build Coastguard Worker
304*6dbdd20aSAndroid Build Coastguard Worker auto stream = base::FlattenStreams(std::move(streams));
305*6dbdd20aSAndroid Build Coastguard Worker ASSERT_EQ(stream.PollNext(&ctx_).item(), 3);
306*6dbdd20aSAndroid Build Coastguard Worker ASSERT_THAT(interested_, ElementsAre());
307*6dbdd20aSAndroid Build Coastguard Worker
308*6dbdd20aSAndroid Build Coastguard Worker ASSERT_TRUE(stream.PollNext(&ctx_).IsPending());
309*6dbdd20aSAndroid Build Coastguard Worker ASSERT_THAT(interested_, UnorderedElementsAre(fd1, fd2, fd3, fd4));
310*6dbdd20aSAndroid Build Coastguard Worker
311*6dbdd20aSAndroid Build Coastguard Worker interested_.clear();
312*6dbdd20aSAndroid Build Coastguard Worker ASSERT_TRUE(stream.PollNext(&ctx_).IsPending());
313*6dbdd20aSAndroid Build Coastguard Worker ASSERT_THAT(interested_, UnorderedElementsAre(fd1, fd2, fd3, fd4));
314*6dbdd20aSAndroid Build Coastguard Worker
315*6dbdd20aSAndroid Build Coastguard Worker interested_.clear();
316*6dbdd20aSAndroid Build Coastguard Worker ready_ = {fd1};
317*6dbdd20aSAndroid Build Coastguard Worker ASSERT_EQ(stream.PollNext(&ctx_).item(), 1);
318*6dbdd20aSAndroid Build Coastguard Worker ASSERT_TRUE(stream.PollNext(&ctx_).IsPending());
319*6dbdd20aSAndroid Build Coastguard Worker ASSERT_THAT(interested_, UnorderedElementsAre(fd2, fd3, fd4));
320*6dbdd20aSAndroid Build Coastguard Worker
321*6dbdd20aSAndroid Build Coastguard Worker interested_.clear();
322*6dbdd20aSAndroid Build Coastguard Worker ready_ = {};
323*6dbdd20aSAndroid Build Coastguard Worker ASSERT_TRUE(stream.PollNext(&ctx_).IsPending());
324*6dbdd20aSAndroid Build Coastguard Worker ASSERT_THAT(interested_, ElementsAre(fd2, fd3, fd4));
325*6dbdd20aSAndroid Build Coastguard Worker
326*6dbdd20aSAndroid Build Coastguard Worker interested_.clear();
327*6dbdd20aSAndroid Build Coastguard Worker ready_ = {fd1, fd2, fd3};
328*6dbdd20aSAndroid Build Coastguard Worker ASSERT_TRUE(stream.PollNext(&ctx_).IsPending());
329*6dbdd20aSAndroid Build Coastguard Worker ASSERT_EQ(stream.PollNext(&ctx_).item(), 2);
330*6dbdd20aSAndroid Build Coastguard Worker ASSERT_TRUE(stream.PollNext(&ctx_).IsDone());
331*6dbdd20aSAndroid Build Coastguard Worker }
332*6dbdd20aSAndroid Build Coastguard Worker
333*6dbdd20aSAndroid Build Coastguard Worker } // namespace
334*6dbdd20aSAndroid Build Coastguard Worker } // namespace base
335*6dbdd20aSAndroid Build Coastguard Worker } // namespace perfetto
336