xref: /aosp_15_r20/external/perfetto/src/base/threading/spawn_unittest.cc (revision 6dbdd20afdafa5e3ca9b8809fa73465d530080dc)
1 /*
2  * Copyright (C) 2023 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "perfetto/ext/base/threading/spawn.h"
18 
19 #include <memory>
20 #include <optional>
21 
22 #include "perfetto/base/compiler.h"
23 #include "perfetto/ext/base/event_fd.h"
24 #include "perfetto/ext/base/thread_task_runner.h"
25 #include "perfetto/ext/base/threading/future.h"
26 #include "perfetto/ext/base/threading/poll.h"
27 #include "perfetto/ext/base/threading/stream.h"
28 #include "perfetto/ext/base/threading/util.h"
29 #include "perfetto/ext/base/unix_task_runner.h"
30 #include "src/base/test/test_task_runner.h"
31 #include "test/gtest_and_gmock.h"
32 
33 namespace perfetto {
34 namespace base {
35 namespace {
36 
37 using ::testing::_;
38 using ::testing::Return;
39 
// gMock test double for FuturePollable<T>.
//
// Poll() is a mocked override, so each test scripts the sequence of poll
// results (pending / value) with EXPECT_CALL before handing the object to a
// base::Future<T>.
template <typename T>
class MockFuturePollable : public FuturePollable<T> {
 public:
  // Mocked FuturePollable<T>::Poll(PollContext*).
  MOCK_METHOD(FuturePollResult<T>, Poll, (PollContext*), (override));
};
45 
// gMock test double for StreamPollable<T>.
//
// PollNext() is a mocked override, so each test scripts the sequence of poll
// results (pending / value / done) with EXPECT_CALL before handing the object
// to a base::Stream<T>.
template <typename T>
class MockStreamPollable : public StreamPollable<T> {
 public:
  // Mocked StreamPollable<T>::PollNext(PollContext*).
  MOCK_METHOD(StreamPollResult<T>, PollNext, (PollContext*), (override));
};
51 
52 class SpawnUnittest : public testing::Test {
53  protected:
Drop(base::SpawnHandle)54   void Drop(base::SpawnHandle) {}
Drop(base::Stream<int>)55   void Drop(base::Stream<int>) {}
56 
57   base::TestTaskRunner task_runner_;
58 
59   base::FlatSet<base::PlatformHandle> interested_;
60   base::FlatSet<base::PlatformHandle> ready_;
61   PollContext ctx_{&interested_, &ready_};
62 
63   base::EventFd fd_;
64   std::unique_ptr<MockFuturePollable<int>> future_pollable_ =
65       std::make_unique<MockFuturePollable<int>>();
66   std::unique_ptr<MockStreamPollable<int>> stream_pollable_ =
67       std::make_unique<MockStreamPollable<int>>();
68 };
69 
// Spawns a future on the task runner and checks that the caller-side future
// stays pending until |fd_| is signalled, after which it yields the value the
// mock returns.
TEST_F(SpawnUnittest, SpawnFuture) {
  // Scripted "not ready yet" poll: reset |fd_|, ask to be woken on it and
  // report pending.
  auto park_on_fd = [this](PollContext* poll_ctx) {
    fd_.Clear();
    poll_ctx->RegisterInterested(fd_.fd());
    return PendingPollResult();
  };
  EXPECT_CALL(*future_pollable_, Poll(_))
      .WillOnce(park_on_fd)
      .WillOnce(Return(FuturePollResult<int>(1024)));

  auto [spawn_handle, result] = SpawnResultFuture<int>(
      &task_runner_,
      [this] { return base::Future<int>(std::move(future_pollable_)); });
  base::ignore_result(spawn_handle);

  // |fd_| has not been signalled: the result must remain pending however
  // many times the runner is drained.
  task_runner_.RunUntilIdle();
  ASSERT_TRUE(result.Poll(&ctx_).IsPending());

  task_runner_.RunUntilIdle();
  ASSERT_TRUE(result.Poll(&ctx_).IsPending());

  // Signal the fd and drain the runner; the value should now be available.
  fd_.Notify();
  task_runner_.RunUntilIdle();

  ASSERT_EQ(result.Poll(&ctx_).item(), 1024);
}
95 
// Spawns a stream that alternates between "pending on |fd_|" and producing a
// value, and checks that the caller-side stream mirrors that sequence through
// to completion.
TEST_F(SpawnUnittest, SpawnStream) {
  // Scripted "not ready yet" poll: reset |fd_|, ask to be woken on it and
  // report pending. Used before each of the two values.
  auto park_on_fd = [this](PollContext* poll_ctx) {
    fd_.Clear();
    poll_ctx->RegisterInterested(fd_.fd());
    return PendingPollResult();
  };
  EXPECT_CALL(*stream_pollable_, PollNext(_))
      .WillOnce(park_on_fd)
      .WillOnce(Return(StreamPollResult<int>(1024)))
      .WillOnce(park_on_fd)
      .WillOnce(Return(StreamPollResult<int>(2048)))
      .WillOnce(Return(DonePollResult()));

  auto [spawn_handle, results] = SpawnResultStream<int>(
      &task_runner_,
      [this] { return base::Stream<int>(std::move(stream_pollable_)); });
  base::ignore_result(spawn_handle);

  // First element: pending until the fd is signalled.
  task_runner_.RunUntilIdle();
  ASSERT_TRUE(results.PollNext(&ctx_).IsPending());

  fd_.Notify();
  task_runner_.RunUntilIdle();

  ASSERT_EQ(results.PollNext(&ctx_).item(), 1024);

  // Second element: same pending-then-signalled pattern.
  task_runner_.RunUntilIdle();
  ASSERT_TRUE(results.PollNext(&ctx_).IsPending());

  fd_.Notify();
  task_runner_.RunUntilIdle();

  // The last value is followed immediately by end-of-stream.
  ASSERT_EQ(results.PollNext(&ctx_).item(), 2048);
  ASSERT_TRUE(results.PollNext(&ctx_).IsDone());
}
134 
// Spawns a stream, reads the first batch of elements, then drops the
// caller-side stream before the handle while the spawned side still has
// elements to produce.
TEST_F(SpawnUnittest, SpawnStreamDropStream) {
  // Scripted "not ready yet" poll: reset |fd_|, ask to be woken on it and
  // report pending.
  auto park_on_fd = [this](PollContext* poll_ctx) {
    fd_.Clear();
    poll_ctx->RegisterInterested(fd_.fd());
    return PendingPollResult();
  };
  EXPECT_CALL(*stream_pollable_, PollNext(_))
      .WillOnce(park_on_fd)
      .WillOnce(Return(StreamPollResult<int>(1)))
      .WillOnce(Return(StreamPollResult<int>(2)))
      .WillOnce(Return(StreamPollResult<int>(4)))
      .WillOnce(Return(StreamPollResult<int>(8)))
      .WillOnce(Return(StreamPollResult<int>(16)))
      .WillOnce(Return(StreamPollResult<int>(32)))
      .WillOnce(Return(StreamPollResult<int>(64)))
      .WillOnce(Return(StreamPollResult<int>(128)))
      .WillOnce(Return(StreamPollResult<int>(256)))
      .WillOnce(Return(StreamPollResult<int>(512)))
      .WillOnce(Return(DonePollResult()));

  auto [spawn_handle, results] = SpawnResultStream<int>(
      &task_runner_,
      [this] { return base::Stream<int>(std::move(stream_pollable_)); });
  base::ignore_result(spawn_handle);

  task_runner_.RunUntilIdle();
  ASSERT_TRUE(results.PollNext(&ctx_).IsPending());

  fd_.Notify();
  task_runner_.RunUntilIdle();

  // Exactly four elements are readable before the stream reports pending
  // again: this corresponds to the internal channel buffer size being 4.
  const int kFirstBatch[] = {1, 2, 4, 8};
  for (int expected : kFirstBatch) {
    ASSERT_EQ(results.PollNext(&ctx_).item(), expected);
  }
  ASSERT_TRUE(results.PollNext(&ctx_).IsPending());

  // Let the spawned side refill the buffer with more elements.
  task_runner_.RunUntilIdle();

  // Destroy the caller-side stream.
  Drop(std::move(results));

  // This should complete the stream.
  task_runner_.RunUntilIdle();

  // Destroy the handle too, then drain whatever work that leaves behind.
  Drop(std::move(spawn_handle));
  task_runner_.RunUntilIdle();
}
187 
// Spawns a stream, reads the first batch of elements, then drops the handle
// while keeping the caller-side stream: the already-buffered elements must
// still be readable and the stream must then report done.
TEST_F(SpawnUnittest, SpawnStreamDropHandle) {
  // Scripted "not ready yet" poll: reset |fd_|, ask to be woken on it and
  // report pending.
  auto park_on_fd = [this](PollContext* poll_ctx) {
    fd_.Clear();
    poll_ctx->RegisterInterested(fd_.fd());
    return PendingPollResult();
  };
  EXPECT_CALL(*stream_pollable_, PollNext(_))
      .WillOnce(park_on_fd)
      .WillOnce(Return(StreamPollResult<int>(1)))
      .WillOnce(Return(StreamPollResult<int>(2)))
      .WillOnce(Return(StreamPollResult<int>(4)))
      .WillOnce(Return(StreamPollResult<int>(8)))
      .WillOnce(Return(StreamPollResult<int>(16)))
      .WillOnce(Return(StreamPollResult<int>(32)))
      .WillOnce(Return(StreamPollResult<int>(64)))
      .WillOnce(Return(StreamPollResult<int>(128)))
      .WillOnce(Return(DonePollResult()));

  // This test drives the spawn through its own task runner rather than the
  // fixture's |task_runner_|. It is declared before the handle/stream so it
  // is destroyed after them.
  base::TestTaskRunner local_runner;
  auto [spawn_handle, results] = SpawnResultStream<int>(
      &local_runner,
      [this] { return base::Stream<int>(std::move(stream_pollable_)); });
  base::ignore_result(spawn_handle);

  local_runner.RunUntilIdle();
  ASSERT_TRUE(results.PollNext(&ctx_).IsPending());

  fd_.Notify();
  local_runner.RunUntilIdle();

  // Exactly four elements are readable before the stream reports pending
  // again: this corresponds to the internal channel buffer size being 4.
  const int kFirstBatch[] = {1, 2, 4, 8};
  for (int expected : kFirstBatch) {
    ASSERT_EQ(results.PollNext(&ctx_).item(), expected);
  }
  ASSERT_TRUE(results.PollNext(&ctx_).IsPending());

  // Let the spawned side refill the buffer with more elements.
  local_runner.RunUntilIdle();

  // Destroy the handle; note the runner is not drained again afterwards.
  Drop(std::move(spawn_handle));

  // The four elements buffered before the drop are still delivered, after
  // which the stream completes.
  const int kSecondBatch[] = {16, 32, 64, 128};
  for (int expected : kSecondBatch) {
    ASSERT_EQ(results.PollNext(&ctx_).item(), expected);
  }
  ASSERT_TRUE(results.PollNext(&ctx_).IsDone());
}
240 
241 }  // namespace
242 }  // namespace base
243 }  // namespace perfetto
244