1 /*
2 * Copyright (C) 2023 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
#include "perfetto/ext/base/threading/spawn.h"

#include <memory>
#include <optional>

#include "perfetto/base/task_runner.h"
#include "perfetto/ext/base/thread_checker.h"
#include "perfetto/ext/base/threading/future.h"
#include "perfetto/ext/base/threading/poll.h"
#include "perfetto/ext/base/threading/stream.h"
26
27 namespace perfetto {
28 namespace base {
29
30 // Represents a future which is being polled to completion. Owned by
31 // SpawnHandle.
32 class PolledFuture {
33 public:
PolledFuture(TaskRunner * task_runner,Future<FVoid> future)34 explicit PolledFuture(TaskRunner* task_runner, Future<FVoid> future)
35 : task_runner_(task_runner), future_(std::move(future)) {
36 PERFETTO_DCHECK(task_runner_->RunsTasksOnCurrentThread());
37 PollUntilFinish();
38 }
39
~PolledFuture()40 ~PolledFuture() {
41 PERFETTO_DCHECK_THREAD(thread_checker);
42 ClearFutureAndWatches(interested_);
43 }
44
45 private:
46 PolledFuture(PolledFuture&&) = delete;
47 PolledFuture& operator=(PolledFuture&&) = delete;
48
PollUntilFinish()49 void PollUntilFinish() {
50 PERFETTO_DCHECK(task_runner_->RunsTasksOnCurrentThread());
51
52 auto pre_poll_interested = std::move(interested_);
53 interested_.clear();
54
55 FuturePollResult<FVoid> res = future_->Poll(&context_);
56 if (!res.IsPending()) {
57 ClearFutureAndWatches(pre_poll_interested);
58 return;
59 }
60
61 for (PlatformHandle fd : SetDifference(pre_poll_interested, interested_)) {
62 task_runner_->RemoveFileDescriptorWatch(fd);
63 }
64
65 auto weak_this = weak_ptr_factory_.GetWeakPtr();
66 for (PlatformHandle fd : SetDifference(interested_, pre_poll_interested)) {
67 task_runner_->AddFileDescriptorWatch(fd, [weak_this, fd]() {
68 if (!weak_this) {
69 return;
70 }
71 weak_this->ready_ = {fd};
72 weak_this->PollUntilFinish();
73 });
74 }
75 }
76
ClearFutureAndWatches(const FlatSet<PlatformHandle> & interested)77 void ClearFutureAndWatches(const FlatSet<PlatformHandle>& interested) {
78 future_ = std::nullopt;
79 for (PlatformHandle fd : interested) {
80 task_runner_->RemoveFileDescriptorWatch(fd);
81 }
82 interested_.clear();
83 ready_.clear();
84 }
85
SetDifference(const FlatSet<PlatformHandle> & f,const FlatSet<PlatformHandle> & s)86 static std::vector<PlatformHandle> SetDifference(
87 const FlatSet<PlatformHandle>& f,
88 const FlatSet<PlatformHandle>& s) {
89 std::vector<PlatformHandle> out(f.size());
90 auto it = std::set_difference(f.begin(), f.end(), s.begin(), s.end(),
91 out.begin());
92 out.resize(static_cast<size_t>(std::distance(out.begin(), it)));
93 return out;
94 }
95
96 TaskRunner* const task_runner_ = nullptr;
97
98 std::optional<Future<FVoid>> future_;
99 FlatSet<PlatformHandle> interested_;
100 FlatSet<PlatformHandle> ready_;
101 PollContext context_{&interested_, &ready_};
102
PERFETTO_THREAD_CHECKER(thread_checker)103 PERFETTO_THREAD_CHECKER(thread_checker)
104
105 // Keep this last.
106 WeakPtrFactory<PolledFuture> weak_ptr_factory_{this};
107 };
108
SpawnHandle(TaskRunner * task_runner,std::function<Future<FVoid> ()> fn)109 SpawnHandle::SpawnHandle(TaskRunner* task_runner,
110 std::function<Future<FVoid>()> fn)
111 : task_runner_(task_runner),
112 polled_future_(std::make_shared<std::unique_ptr<PolledFuture>>()) {
113 task_runner->PostTask(
114 [t = task_runner, fn = std::move(fn), p = polled_future_]() mutable {
115 p->reset(new PolledFuture(t, fn()));
116 });
117 }
118
~SpawnHandle()119 SpawnHandle::~SpawnHandle() {
120 task_runner_->PostTask(
121 [f = std::move(polled_future_)]() mutable { f.reset(); });
122 }
123
124 } // namespace base
125 } // namespace perfetto
126