xref: /aosp_15_r20/external/perfetto/src/base/threading/spawn.cc (revision 6dbdd20afdafa5e3ca9b8809fa73465d530080dc)
1 /*
2  * Copyright (C) 2023 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "perfetto/ext/base/threading/spawn.h"
18 
19 #include <optional>
20 
21 #include "perfetto/base/task_runner.h"
22 #include "perfetto/ext/base/thread_checker.h"
23 #include "perfetto/ext/base/threading/future.h"
24 #include "perfetto/ext/base/threading/poll.h"
25 #include "perfetto/ext/base/threading/stream.h"
26 
27 namespace perfetto {
28 namespace base {
29 
30 // Represents a future which is being polled to completion. Owned by
31 // SpawnHandle.
32 class PolledFuture {
33  public:
PolledFuture(TaskRunner * task_runner,Future<FVoid> future)34   explicit PolledFuture(TaskRunner* task_runner, Future<FVoid> future)
35       : task_runner_(task_runner), future_(std::move(future)) {
36     PERFETTO_DCHECK(task_runner_->RunsTasksOnCurrentThread());
37     PollUntilFinish();
38   }
39 
~PolledFuture()40   ~PolledFuture() {
41     PERFETTO_DCHECK_THREAD(thread_checker);
42     ClearFutureAndWatches(interested_);
43   }
44 
45  private:
46   PolledFuture(PolledFuture&&) = delete;
47   PolledFuture& operator=(PolledFuture&&) = delete;
48 
PollUntilFinish()49   void PollUntilFinish() {
50     PERFETTO_DCHECK(task_runner_->RunsTasksOnCurrentThread());
51 
52     auto pre_poll_interested = std::move(interested_);
53     interested_.clear();
54 
55     FuturePollResult<FVoid> res = future_->Poll(&context_);
56     if (!res.IsPending()) {
57       ClearFutureAndWatches(pre_poll_interested);
58       return;
59     }
60 
61     for (PlatformHandle fd : SetDifference(pre_poll_interested, interested_)) {
62       task_runner_->RemoveFileDescriptorWatch(fd);
63     }
64 
65     auto weak_this = weak_ptr_factory_.GetWeakPtr();
66     for (PlatformHandle fd : SetDifference(interested_, pre_poll_interested)) {
67       task_runner_->AddFileDescriptorWatch(fd, [weak_this, fd]() {
68         if (!weak_this) {
69           return;
70         }
71         weak_this->ready_ = {fd};
72         weak_this->PollUntilFinish();
73       });
74     }
75   }
76 
ClearFutureAndWatches(const FlatSet<PlatformHandle> & interested)77   void ClearFutureAndWatches(const FlatSet<PlatformHandle>& interested) {
78     future_ = std::nullopt;
79     for (PlatformHandle fd : interested) {
80       task_runner_->RemoveFileDescriptorWatch(fd);
81     }
82     interested_.clear();
83     ready_.clear();
84   }
85 
SetDifference(const FlatSet<PlatformHandle> & f,const FlatSet<PlatformHandle> & s)86   static std::vector<PlatformHandle> SetDifference(
87       const FlatSet<PlatformHandle>& f,
88       const FlatSet<PlatformHandle>& s) {
89     std::vector<PlatformHandle> out(f.size());
90     auto it = std::set_difference(f.begin(), f.end(), s.begin(), s.end(),
91                                   out.begin());
92     out.resize(static_cast<size_t>(std::distance(out.begin(), it)));
93     return out;
94   }
95 
96   TaskRunner* const task_runner_ = nullptr;
97 
98   std::optional<Future<FVoid>> future_;
99   FlatSet<PlatformHandle> interested_;
100   FlatSet<PlatformHandle> ready_;
101   PollContext context_{&interested_, &ready_};
102 
PERFETTO_THREAD_CHECKER(thread_checker)103   PERFETTO_THREAD_CHECKER(thread_checker)
104 
105   // Keep this last.
106   WeakPtrFactory<PolledFuture> weak_ptr_factory_{this};
107 };
108 
SpawnHandle(TaskRunner * task_runner,std::function<Future<FVoid> ()> fn)109 SpawnHandle::SpawnHandle(TaskRunner* task_runner,
110                          std::function<Future<FVoid>()> fn)
111     : task_runner_(task_runner),
112       polled_future_(std::make_shared<std::unique_ptr<PolledFuture>>()) {
113   task_runner->PostTask(
114       [t = task_runner, fn = std::move(fn), p = polled_future_]() mutable {
115         p->reset(new PolledFuture(t, fn()));
116       });
117 }
118 
~SpawnHandle()119 SpawnHandle::~SpawnHandle() {
120   task_runner_->PostTask(
121       [f = std::move(polled_future_)]() mutable { f.reset(); });
122 }
123 
124 }  // namespace base
125 }  // namespace perfetto
126