1 /*
2 * Copyright (C) 2023 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include "perfetto/ext/base/threading/util.h"
18
19 #include <optional>
20
21 #include "perfetto/base/flat_set.h"
22 #include "perfetto/base/platform_handle.h"
23 #include "perfetto/base/time.h"
24 #include "perfetto/ext/base/event_fd.h"
25 #include "perfetto/ext/base/threading/channel.h"
26 #include "perfetto/ext/base/threading/poll.h"
27 #include "perfetto/ext/base/threading/stream.h"
28 #include "perfetto/ext/base/threading/thread_pool.h"
29 #include "perfetto/ext/base/unix_task_runner.h"
30 #include "perfetto/ext/base/waitable_event.h"
31 #include "test/gtest_and_gmock.h"
32
33 namespace perfetto {
34 namespace base {
35 namespace {
36
WaitForFutureReady(base::Future<int> & stream,base::FlatSet<base::PlatformHandle> & interested,PollContext & ctx)37 int WaitForFutureReady(base::Future<int>& stream,
38 base::FlatSet<base::PlatformHandle>& interested,
39 PollContext& ctx) {
40 auto res = stream.Poll(&ctx);
41 for (; res.IsPending(); res = stream.Poll(&ctx)) {
42 PERFETTO_CHECK(interested.size() == 1);
43 base::BlockUntilReadableFd(*interested.begin());
44 interested = {};
45 }
46 return res.item();
47 }
48
WaitForStreamReady(base::Stream<int> & stream,base::FlatSet<base::PlatformHandle> & interested,PollContext & ctx)49 std::optional<int> WaitForStreamReady(
50 base::Stream<int>& stream,
51 base::FlatSet<base::PlatformHandle>& interested,
52 PollContext& ctx) {
53 auto res = stream.PollNext(&ctx);
54 for (; res.IsPending(); res = stream.PollNext(&ctx)) {
55 PERFETTO_CHECK(interested.size() == 1);
56 base::BlockUntilReadableFd(*interested.begin());
57 interested = {};
58 }
59 return res.IsDone() ? std::nullopt : std::make_optional(res.item());
60 }
61
// Checks that BlockUntilReadableFd() returns after the watched EventFd has
// been notified, using a round trip between the main thread and a background
// thread.
TEST(UtilUnittest, BlockUntilReadableFd) {
  base::EventFd main_to_background;
  base::EventFd background_to_main;
  // The background thread waits on the main thread's EventFd and only then
  // notifies back, so the main thread's wait below can only complete after
  // the background wait has returned. Capturing by reference is safe because
  // the thread is joined before the EventFds go out of scope.
  std::thread thread([&main_to_background, &background_to_main] {
    base::BlockUntilReadableFd(main_to_background.fd());
    background_to_main.Notify();
  });
  main_to_background.Notify();
  base::BlockUntilReadableFd(background_to_main.fd());
  thread.join();
}
74
// Exercises the stream returned by ReadChannelStream(): it must report
// pending (registering the channel's read fd) while nothing is buffered, yield
// written values in order, and report done once the channel has been closed
// and drained.
TEST(UtilUnittest, ReadChannelStream) {
  base::Channel<int> channel(1);
  auto stream = base::ReadChannelStream(&channel);

  base::FlatSet<base::PlatformHandle> interested;
  base::FlatSet<base::PlatformHandle> ready;
  PollContext ctx(&interested, &ready);

  // Nothing has been written yet: the poll is pending and the read fd has
  // been added to |interested|.
  auto poll_before_first_write = stream.PollNext(&ctx);
  ASSERT_TRUE(poll_before_first_write.IsPending());
  ASSERT_EQ(interested.count(channel.read_fd()), 1u);
  interested = {};

  // A written value is handed out by the next poll.
  auto first_write = channel.WriteNonBlocking(1);
  ASSERT_TRUE(first_write.success);
  auto first_item_poll = stream.PollNext(&ctx);
  ASSERT_EQ(first_item_poll.item(), 1);

  // The channel has been drained again, so the stream goes back to pending.
  auto poll_before_second_write = stream.PollNext(&ctx);
  ASSERT_TRUE(poll_before_second_write.IsPending());
  ASSERT_EQ(interested.count(channel.read_fd()), 1u);
  interested = {};

  // Write one more value and close the channel before polling.
  auto second_write = channel.WriteNonBlocking(2);
  ASSERT_TRUE(second_write.success);
  channel.Close();

  // The value written before Close() is still delivered; only afterwards does
  // the stream report that it is done.
  auto second_item_poll = stream.PollNext(&ctx);
  ASSERT_EQ(second_item_poll.item(), 2);
  auto final_poll = stream.PollNext(&ctx);
  ASSERT_TRUE(final_poll.IsDone());
}
100
// Exercises the future returned by WriteChannelFuture(): while the channel
// rejects non-blocking writes the future must stay pending (registering the
// channel's write fd), and once an element has been read out the future must
// complete and its value must be readable from the channel.
TEST(UtilUnittest, WriteChannelFuture) {
  base::Channel<int> channel(1);

  base::FlatSet<base::PlatformHandle> interested;
  base::FlatSet<base::PlatformHandle> ready;
  PollContext ctx(&interested, &ready);

  // The first write is accepted and the second one is rejected.
  auto accepted_write = channel.WriteNonBlocking(1);
  ASSERT_TRUE(accepted_write.success);
  auto rejected_write = channel.WriteNonBlocking(2);
  ASSERT_FALSE(rejected_write.success);

  // With the channel in that state the write future cannot make progress and
  // adds the write fd to |interested|.
  auto future = base::WriteChannelFuture(&channel, 3);
  auto blocked_poll = future.Poll(&ctx);
  ASSERT_TRUE(blocked_poll.IsPending());
  ASSERT_EQ(interested.count(channel.write_fd()), 1u);
  interested = {};

  // Drain the channel: only the accepted value (1) comes out; the rejected
  // write (2) was never stored.
  auto first_read = channel.ReadNonBlocking();
  ASSERT_EQ(first_read.item, 1);
  auto empty_read = channel.ReadNonBlocking();
  ASSERT_EQ(empty_read.item, std::nullopt);

  // Now the future is able to finish and its value (3) is in the channel.
  auto completed_poll = future.Poll(&ctx);
  ASSERT_FALSE(completed_poll.IsPending());
  auto final_read = channel.ReadNonBlocking();
  ASSERT_EQ(final_read.item, 3);
}
122
// Exercises RunOnThreadPool(): the values returned by the callback must come
// out of the resulting stream in order, and the stream must end once the
// callback returns std::nullopt.
TEST(UtilUnittest, RunOnThreadPool) {
  base::FlatSet<base::PlatformHandle> interested;
  base::FlatSet<base::PlatformHandle> ready;
  PollContext ctx(&interested, &ready);

  base::ThreadPool pool(1);
  // The callback yields 0, then 1, and std::nullopt from the third call on.
  base::Stream<int> stream = base::RunOnThreadPool<int>(
      &pool, [counter = 0]() mutable -> std::optional<int> {
        if (counter == 2) {
          return std::nullopt;
        }
        return std::make_optional(counter++);
      });

  auto first = WaitForStreamReady(stream, interested, ctx);
  ASSERT_EQ(first, 0);
  auto second = WaitForStreamReady(stream, interested, ctx);
  ASSERT_EQ(second, 1);
  auto end_of_stream = WaitForStreamReady(stream, interested, ctx);
  ASSERT_EQ(end_of_stream, std::nullopt);
}
137
// Exercises RunOnceOnThreadPool(): the value returned by the callback must be
// the value the resulting future resolves to.
TEST(UtilUnittest, RunOnceOnThreadPool) {
  base::FlatSet<base::PlatformHandle> interested;
  base::FlatSet<base::PlatformHandle> ready;
  PollContext ctx(&interested, &ready);

  base::ThreadPool pool(1);
  auto return_one = []() mutable { return 1; };
  base::Future<int> fut = base::RunOnceOnThreadPool<int>(&pool, return_one);

  const int resolved = WaitForFutureReady(fut, interested, ctx);
  ASSERT_EQ(resolved, 1);
}
148
149 } // namespace
150 } // namespace base
151 } // namespace perfetto
152