xref: /aosp_15_r20/external/perfetto/src/base/threading/util_unittest.cc (revision 6dbdd20afdafa5e3ca9b8809fa73465d530080dc)
1 /*
2  * Copyright (C) 2023 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "perfetto/ext/base/threading/util.h"
18 
19 #include <optional>
20 
21 #include "perfetto/base/flat_set.h"
22 #include "perfetto/base/platform_handle.h"
23 #include "perfetto/base/time.h"
24 #include "perfetto/ext/base/event_fd.h"
25 #include "perfetto/ext/base/threading/channel.h"
26 #include "perfetto/ext/base/threading/poll.h"
27 #include "perfetto/ext/base/threading/stream.h"
28 #include "perfetto/ext/base/threading/thread_pool.h"
29 #include "perfetto/ext/base/unix_task_runner.h"
30 #include "perfetto/ext/base/waitable_event.h"
31 #include "test/gtest_and_gmock.h"
32 
33 namespace perfetto {
34 namespace base {
35 namespace {
36 
WaitForFutureReady(base::Future<int> & stream,base::FlatSet<base::PlatformHandle> & interested,PollContext & ctx)37 int WaitForFutureReady(base::Future<int>& stream,
38                        base::FlatSet<base::PlatformHandle>& interested,
39                        PollContext& ctx) {
40   auto res = stream.Poll(&ctx);
41   for (; res.IsPending(); res = stream.Poll(&ctx)) {
42     PERFETTO_CHECK(interested.size() == 1);
43     base::BlockUntilReadableFd(*interested.begin());
44     interested = {};
45   }
46   return res.item();
47 }
48 
WaitForStreamReady(base::Stream<int> & stream,base::FlatSet<base::PlatformHandle> & interested,PollContext & ctx)49 std::optional<int> WaitForStreamReady(
50     base::Stream<int>& stream,
51     base::FlatSet<base::PlatformHandle>& interested,
52     PollContext& ctx) {
53   auto res = stream.PollNext(&ctx);
54   for (; res.IsPending(); res = stream.PollNext(&ctx)) {
55     PERFETTO_CHECK(interested.size() == 1);
56     base::BlockUntilReadableFd(*interested.begin());
57     interested = {};
58   }
59   return res.IsDone() ? std::nullopt : std::make_optional(res.item());
60 }
61 
// Ping-pongs between the main thread and a background thread using two
// EventFds: each side calls BlockUntilReadableFd on the fd that the other side
// notifies. The test passes if both calls return and the thread can be joined.
TEST(UtilUnittest, BlockUntilReadableFd) {
  base::EventFd main_to_background;
  base::EventFd background_to_main;
  std::thread thread([&main_to_background, &background_to_main] {
    // Wait for the main thread's notification, then answer it.
    base::BlockUntilReadableFd(main_to_background.fd());
    background_to_main.Notify();
  });
  main_to_background.Notify();
  base::BlockUntilReadableFd(background_to_main.fd());
  // Both EventFds are captured by reference, so the thread has to be joined
  // before they go out of scope.
  thread.join();
}
74 
// Exercises ReadChannelStream over a Channel constructed with argument 1:
// polling with nothing written is pending and registers the channel's read fd,
// written values come out of the stream in order, and after Close() the stream
// yields the last written value and then reports done.
TEST(UtilUnittest, ReadChannelStream) {
  base::Channel<int> channel(1);
  auto stream = base::ReadChannelStream(&channel);

  base::FlatSet<base::PlatformHandle> interested;
  base::FlatSet<base::PlatformHandle> ready;
  PollContext ctx(&interested, &ready);

  // Nothing has been written yet.
  auto poll_before_write = stream.PollNext(&ctx);
  ASSERT_TRUE(poll_before_write.IsPending());
  ASSERT_EQ(interested.count(channel.read_fd()), 1u);
  interested = {};

  // The first written value is surfaced by the next poll.
  const auto first_write = channel.WriteNonBlocking(1);
  ASSERT_TRUE(first_write.success);
  auto first_item = stream.PollNext(&ctx);
  ASSERT_EQ(first_item.item(), 1);

  // The channel has been drained again, so the stream goes back to pending.
  auto poll_after_drain = stream.PollNext(&ctx);
  ASSERT_TRUE(poll_after_drain.IsPending());
  ASSERT_EQ(interested.count(channel.read_fd()), 1u);
  interested = {};

  const auto second_write = channel.WriteNonBlocking(2);
  ASSERT_TRUE(second_write.success);
  channel.Close();

  // The value written before Close() is still delivered, then the stream ends.
  auto second_item = stream.PollNext(&ctx);
  ASSERT_EQ(second_item.item(), 2);
  auto end_of_stream = stream.PollNext(&ctx);
  ASSERT_TRUE(end_of_stream.IsDone());
}
100 
// Exercises WriteChannelFuture over a Channel constructed with argument 1:
// once a direct write has succeeded and a second one has been rejected, the
// future is pending and registers the channel's write fd; after the channel is
// drained, polling the future again is no longer pending and the future's
// value (3) can be read back from the channel.
TEST(UtilUnittest, WriteChannelFuture) {
  base::Channel<int> channel(1);

  base::FlatSet<base::PlatformHandle> interested;
  base::FlatSet<base::PlatformHandle> ready;
  PollContext ctx(&interested, &ready);

  // The first write is accepted, the second is rejected.
  const auto accepted_write = channel.WriteNonBlocking(1);
  ASSERT_TRUE(accepted_write.success);
  const auto rejected_write = channel.WriteNonBlocking(2);
  ASSERT_FALSE(rejected_write.success);

  auto future = base::WriteChannelFuture(&channel, 3);
  auto poll_before_drain = future.Poll(&ctx);
  ASSERT_TRUE(poll_before_drain.IsPending());
  ASSERT_EQ(interested.count(channel.write_fd()), 1u);
  interested = {};

  // Drain the channel: one item comes out, then nothing.
  const auto first_read = channel.ReadNonBlocking();
  ASSERT_EQ(first_read.item, 1);
  const auto empty_read = channel.ReadNonBlocking();
  ASSERT_EQ(empty_read.item, std::nullopt);

  // Polling again completes the future, and its value lands in the channel.
  auto poll_after_drain = future.Poll(&ctx);
  ASSERT_FALSE(poll_after_drain.IsPending());
  const auto future_read = channel.ReadNonBlocking();
  ASSERT_EQ(future_read.item, 3);
}
122 
// Exercises RunOnThreadPool with a generator that returns 0, then 1, then
// std::nullopt; the resulting stream is expected to yield 0, 1 and then
// report completion (observed through WaitForStreamReady as std::nullopt).
TEST(UtilUnittest, RunOnThreadPool) {
  base::FlatSet<base::PlatformHandle> interested;
  base::FlatSet<base::PlatformHandle> ready;
  PollContext ctx(&interested, &ready);

  base::ThreadPool pool(1);
  base::Stream<int> stream = base::RunOnThreadPool<int>(
      &pool, [next_value = 0]() mutable -> std::optional<int> {
        // Two values are produced (0 and 1); after that the generator returns
        // std::nullopt without advancing the counter.
        if (next_value == 2) {
          return std::nullopt;
        }
        return std::make_optional(next_value++);
      });

  const std::optional<int> first = WaitForStreamReady(stream, interested, ctx);
  ASSERT_EQ(first, 0);
  const std::optional<int> second = WaitForStreamReady(stream, interested, ctx);
  ASSERT_EQ(second, 1);
  const std::optional<int> last = WaitForStreamReady(stream, interested, ctx);
  ASSERT_EQ(last, std::nullopt);
}
137 
// Exercises RunOnceOnThreadPool with a callable that returns 1; the resulting
// future is expected to resolve to that value.
TEST(UtilUnittest, RunOnceOnThreadPool) {
  base::FlatSet<base::PlatformHandle> interested;
  base::FlatSet<base::PlatformHandle> ready;
  PollContext ctx(&interested, &ready);

  base::ThreadPool pool(1);
  base::Future<int> fut =
      base::RunOnceOnThreadPool<int>(&pool, [] { return 1; });

  const int resolved = WaitForFutureReady(fut, interested, ctx);
  ASSERT_EQ(resolved, 1);
}
148 
149 }  // namespace
150 }  // namespace base
151 }  // namespace perfetto
152