1 /*
2 * Copyright (C) 2023 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include "perfetto/ext/base/threading/spawn.h"
18
19 #include <memory>
20 #include <optional>
21
22 #include "perfetto/base/compiler.h"
23 #include "perfetto/ext/base/event_fd.h"
24 #include "perfetto/ext/base/thread_task_runner.h"
25 #include "perfetto/ext/base/threading/future.h"
26 #include "perfetto/ext/base/threading/poll.h"
27 #include "perfetto/ext/base/threading/stream.h"
28 #include "perfetto/ext/base/threading/util.h"
29 #include "perfetto/ext/base/unix_task_runner.h"
30 #include "src/base/test/test_task_runner.h"
31 #include "test/gtest_and_gmock.h"
32
33 namespace perfetto {
34 namespace base {
35 namespace {
36
37 using ::testing::_;
38 using ::testing::Return;
39
40 template <typename T>
41 class MockFuturePollable : public FuturePollable<T> {
42 public:
43 MOCK_METHOD(FuturePollResult<T>, Poll, (PollContext*), (override));
44 };
45
46 template <typename T>
47 class MockStreamPollable : public StreamPollable<T> {
48 public:
49 MOCK_METHOD(StreamPollResult<T>, PollNext, (PollContext*), (override));
50 };
51
52 class SpawnUnittest : public testing::Test {
53 protected:
Drop(base::SpawnHandle)54 void Drop(base::SpawnHandle) {}
Drop(base::Stream<int>)55 void Drop(base::Stream<int>) {}
56
57 base::TestTaskRunner task_runner_;
58
59 base::FlatSet<base::PlatformHandle> interested_;
60 base::FlatSet<base::PlatformHandle> ready_;
61 PollContext ctx_{&interested_, &ready_};
62
63 base::EventFd fd_;
64 std::unique_ptr<MockFuturePollable<int>> future_pollable_ =
65 std::make_unique<MockFuturePollable<int>>();
66 std::unique_ptr<MockStreamPollable<int>> stream_pollable_ =
67 std::make_unique<MockStreamPollable<int>>();
68 };
69
// A spawned future that is pending on |fd_| for its first poll and yields 1024
// on its second poll: the result future stays pending until |fd_| is notified
// and the task runner is pumped, then resolves to 1024.
TEST_F(SpawnUnittest, SpawnFuture) {
  // Action for a poll that cannot make progress: reset |fd_| and register
  // interest in it before reporting "pending".
  auto park_on_fd = [this](PollContext* poll_ctx) {
    fd_.Clear();
    poll_ctx->RegisterInterested(fd_.fd());
    return PendingPollResult();
  };
  EXPECT_CALL(*future_pollable_, Poll(_))
      .WillOnce(park_on_fd)
      .WillOnce(Return(FuturePollResult<int>(1024)));

  auto [spawn_handle, result_future] =
      SpawnResultFuture<int>(&task_runner_, [this]() mutable {
        return base::Future<int>(std::move(future_pollable_));
      });
  base::ignore_result(spawn_handle);

  // Pumping the runner without notifying |fd_| must leave the result pending,
  // no matter how many times we do it.
  task_runner_.RunUntilIdle();
  ASSERT_TRUE(result_future.Poll(&ctx_).IsPending());

  task_runner_.RunUntilIdle();
  ASSERT_TRUE(result_future.Poll(&ctx_).IsPending());

  // Once |fd_| is signalled and the runner is pumped, the value is available.
  fd_.Notify();
  task_runner_.RunUntilIdle();

  ASSERT_EQ(result_future.Poll(&ctx_).item(), 1024);
}
95
// A spawned stream scripted as: pending, 1024, pending, 2048, done. The result
// stream must surface the same sequence, with each pending phase ending only
// after |fd_| is notified and the task runner is pumped.
TEST_F(SpawnUnittest, SpawnStream) {
  // Action for a poll that cannot make progress: reset |fd_| and register
  // interest in it before reporting "pending".
  auto park_on_fd = [this](PollContext* poll_ctx) {
    fd_.Clear();
    poll_ctx->RegisterInterested(fd_.fd());
    return PendingPollResult();
  };
  EXPECT_CALL(*stream_pollable_, PollNext(_))
      .WillOnce(park_on_fd)
      .WillOnce(Return(StreamPollResult<int>(1024)))
      .WillOnce(park_on_fd)
      .WillOnce(Return(StreamPollResult<int>(2048)))
      .WillOnce(Return(DonePollResult()));

  auto [spawn_handle, result_stream] =
      SpawnResultStream<int>(&task_runner_, [this]() mutable {
        return base::Stream<int>(std::move(stream_pollable_));
      });
  base::ignore_result(spawn_handle);

  // First element: pending until |fd_| fires.
  task_runner_.RunUntilIdle();
  ASSERT_TRUE(result_stream.PollNext(&ctx_).IsPending());

  fd_.Notify();
  task_runner_.RunUntilIdle();

  ASSERT_EQ(result_stream.PollNext(&ctx_).item(), 1024);

  // Second element: pending again until |fd_| fires.
  task_runner_.RunUntilIdle();
  ASSERT_TRUE(result_stream.PollNext(&ctx_).IsPending());

  fd_.Notify();
  task_runner_.RunUntilIdle();

  // The last value is followed directly by end-of-stream.
  ASSERT_EQ(result_stream.PollNext(&ctx_).item(), 2048);
  ASSERT_TRUE(result_stream.PollNext(&ctx_).IsDone());
}
134
// Spawns a stream scripted to be pending once and then yield ten elements
// (1, 2, 4, ..., 512) before finishing. The consumer reads four of them, then
// drops the result stream, and later the handle, pumping the task runner
// after each drop.
TEST_F(SpawnUnittest, SpawnStreamDropStream) {
  // Action for a poll that cannot make progress: reset |fd_| and register
  // interest in it before reporting "pending".
  auto park_on_fd = [this](PollContext* poll_ctx) {
    fd_.Clear();
    poll_ctx->RegisterInterested(fd_.fd());
    return PendingPollResult();
  };
  EXPECT_CALL(*stream_pollable_, PollNext(_))
      .WillOnce(park_on_fd)
      .WillOnce(Return(StreamPollResult<int>(1)))
      .WillOnce(Return(StreamPollResult<int>(2)))
      .WillOnce(Return(StreamPollResult<int>(4)))
      .WillOnce(Return(StreamPollResult<int>(8)))
      .WillOnce(Return(StreamPollResult<int>(16)))
      .WillOnce(Return(StreamPollResult<int>(32)))
      .WillOnce(Return(StreamPollResult<int>(64)))
      .WillOnce(Return(StreamPollResult<int>(128)))
      .WillOnce(Return(StreamPollResult<int>(256)))
      .WillOnce(Return(StreamPollResult<int>(512)))
      .WillOnce(Return(DonePollResult()));

  auto [spawn_handle, result_stream] =
      SpawnResultStream<int>(&task_runner_, [this]() mutable {
        return base::Stream<int>(std::move(stream_pollable_));
      });
  base::ignore_result(spawn_handle);

  task_runner_.RunUntilIdle();
  ASSERT_TRUE(result_stream.PollNext(&ctx_).IsPending());

  fd_.Notify();
  task_runner_.RunUntilIdle();

  // We should get the first 4 elements and then nothing more: this corresponds
  // to the internal channel buffer size being 4.
  for (const int expected : {1, 2, 4, 8}) {
    ASSERT_EQ(result_stream.PollNext(&ctx_).item(), expected);
  }
  ASSERT_TRUE(result_stream.PollNext(&ctx_).IsPending());

  // Pumping the runner again should buffer a further batch of elements.
  task_runner_.RunUntilIdle();

  // Drop the consumer side of the stream.
  Drop(std::move(result_stream));

  // This should complete the stream.
  task_runner_.RunUntilIdle();

  // Drop the handle too and pump the runner so that any work this triggers is
  // run to completion before the test ends.
  Drop(std::move(spawn_handle));
  task_runner_.RunUntilIdle();
}
187
// Spawns a stream scripted to be pending once and then yield eight elements
// (1, 2, 4, ..., 128) before finishing. The consumer reads four of them, lets
// the runner buffer more, then drops the spawn handle and checks that the
// already-buffered elements are still delivered before the stream reports
// done.
TEST_F(SpawnUnittest, SpawnStreamDropHandle) {
  EXPECT_CALL(*stream_pollable_, PollNext(_))
      .WillOnce([this](PollContext* ctx) {
        // No progress possible yet: reset |fd_| and register interest in it
        // before reporting "pending".
        fd_.Clear();
        ctx->RegisterInterested(fd_.fd());
        return PendingPollResult();
      })
      .WillOnce(Return(StreamPollResult<int>(1)))
      .WillOnce(Return(StreamPollResult<int>(2)))
      .WillOnce(Return(StreamPollResult<int>(4)))
      .WillOnce(Return(StreamPollResult<int>(8)))
      .WillOnce(Return(StreamPollResult<int>(16)))
      .WillOnce(Return(StreamPollResult<int>(32)))
      .WillOnce(Return(StreamPollResult<int>(64)))
      .WillOnce(Return(StreamPollResult<int>(128)))
      .WillOnce(Return(DonePollResult()));

  // Spawn onto the fixture's task runner, like every other test in this file.
  auto [handle, stream] =
      SpawnResultStream<int>(&task_runner_, [this]() mutable {
        return base::Stream<int>(std::move(stream_pollable_));
      });
  base::ignore_result(handle);

  task_runner_.RunUntilIdle();
  ASSERT_TRUE(stream.PollNext(&ctx_).IsPending());

  fd_.Notify();
  task_runner_.RunUntilIdle();

  // We should get the first 4 elements and then nothing more: this corresponds
  // to the internal channel buffer size being 4.
  ASSERT_EQ(stream.PollNext(&ctx_).item(), 1);
  ASSERT_EQ(stream.PollNext(&ctx_).item(), 2);
  ASSERT_EQ(stream.PollNext(&ctx_).item(), 4);
  ASSERT_EQ(stream.PollNext(&ctx_).item(), 8);
  ASSERT_TRUE(stream.PollNext(&ctx_).IsPending());

  // Should fill up a bunch more elements.
  task_runner_.RunUntilIdle();

  // Drop the handle. The task runner is deliberately not pumped afterwards.
  Drop(std::move(handle));

  // We should just get the next four buffered elements and the stream should
  // complete.
  ASSERT_EQ(stream.PollNext(&ctx_).item(), 16);
  ASSERT_EQ(stream.PollNext(&ctx_).item(), 32);
  ASSERT_EQ(stream.PollNext(&ctx_).item(), 64);
  ASSERT_EQ(stream.PollNext(&ctx_).item(), 128);
  ASSERT_TRUE(stream.PollNext(&ctx_).IsDone());
}
240
241 } // namespace
242 } // namespace base
243 } // namespace perfetto
244