xref: /aosp_15_r20/external/cronet/base/message_loop/message_pump_epoll.cc (revision 6777b5387eb2ff775bb5750e3f5d96f37fb7352b)
1 // Copyright 2022 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "base/message_loop/message_pump_epoll.h"
6 
7 #include <sys/epoll.h>
8 #include <sys/eventfd.h>
9 
10 #include <cstddef>
11 #include <cstdint>
12 #include <optional>
13 #include <utility>
14 
15 #include "base/auto_reset.h"
16 #include "base/check_op.h"
17 #include "base/memory/raw_ptr.h"
18 #include "base/memory/ref_counted.h"
19 #include "base/posix/eintr_wrapper.h"
20 #include "base/ranges/algorithm.h"
21 #include "base/threading/thread_checker.h"
22 #include "base/trace_event/base_tracing.h"
23 
24 namespace base {
25 
MessagePumpEpoll()26 MessagePumpEpoll::MessagePumpEpoll() {
27   epoll_.reset(epoll_create1(/*flags=*/0));
28   PCHECK(epoll_.is_valid());
29 
30   wake_event_.reset(eventfd(0, EFD_NONBLOCK));
31   PCHECK(wake_event_.is_valid());
32 
33   epoll_event wake{.events = EPOLLIN, .data = {.ptr = &wake_event_}};
34   int rv = epoll_ctl(epoll_.get(), EPOLL_CTL_ADD, wake_event_.get(), &wake);
35   PCHECK(rv == 0);
36 }
37 
38 MessagePumpEpoll::~MessagePumpEpoll() = default;
39 
WatchFileDescriptor(int fd,bool persistent,int mode,FdWatchController * controller,FdWatcher * watcher)40 bool MessagePumpEpoll::WatchFileDescriptor(int fd,
41                                            bool persistent,
42                                            int mode,
43                                            FdWatchController* controller,
44                                            FdWatcher* watcher) {
45   DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
46   TRACE_EVENT("base", "MessagePumpEpoll::WatchFileDescriptor", "fd", fd,
47               "persistent", persistent, "watch_read", mode & WATCH_READ,
48               "watch_write", mode & WATCH_WRITE);
49 
50   const InterestParams params{
51       .fd = fd,
52       .read = (mode == WATCH_READ || mode == WATCH_READ_WRITE),
53       .write = (mode == WATCH_WRITE || mode == WATCH_READ_WRITE),
54       .one_shot = !persistent,
55   };
56 
57   auto [it, is_new_fd_entry] = entries_.emplace(fd, fd);
58   EpollEventEntry& entry = it->second;
59   scoped_refptr<Interest> existing_interest = controller->epoll_interest();
60   if (existing_interest && existing_interest->params().IsEqual(params)) {
61     // WatchFileDescriptor() has already been called for this controller at
62     // least once before, and as in the most common cases, it is now being
63     // called again with the same parameters.
64     //
65     // We don't need to allocate and register a new Interest in this case, but
66     // we can instead reactivate the existing (presumably deactivated,
67     // non-persistent) Interest.
68     existing_interest->set_active(true);
69   } else {
70     entry.interests.push_back(controller->AssignEpollInterest(params));
71     if (existing_interest) {
72       UnregisterInterest(existing_interest);
73     }
74   }
75 
76   if (is_new_fd_entry) {
77     AddEpollEvent(entry);
78   } else {
79     UpdateEpollEvent(entry);
80   }
81 
82   controller->set_epoll_pump(weak_ptr_factory_.GetWeakPtr());
83   controller->set_watcher(watcher);
84   return true;
85 }
86 
Run(Delegate * delegate)87 void MessagePumpEpoll::Run(Delegate* delegate) {
88   DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
89   RunState run_state(delegate);
90   AutoReset<raw_ptr<RunState>> auto_reset_run_state(&run_state_, &run_state);
91   for (;;) {
92     // Do some work and see if the next task is ready right away.
93     Delegate::NextWorkInfo next_work_info = delegate->DoWork();
94     const bool immediate_work_available = next_work_info.is_immediate();
95     if (run_state.should_quit) {
96       break;
97     }
98 
99     // Reset the native work flag before processing IO events.
100     native_work_started_ = false;
101 
102     // Process any immediately ready IO event, but don't wait for more yet.
103     WaitForEpollEvents(TimeDelta());
104 
105     bool attempt_more_work = immediate_work_available || processed_io_events_;
106     processed_io_events_ = false;
107 
108     if (run_state.should_quit) {
109       break;
110     }
111     if (attempt_more_work) {
112       continue;
113     }
114 
115     attempt_more_work = delegate->DoIdleWork();
116     if (run_state.should_quit) {
117       break;
118     }
119     if (attempt_more_work) {
120       continue;
121     }
122 
123     TimeDelta timeout = TimeDelta::Max();
124     DCHECK(!next_work_info.delayed_run_time.is_null());
125     if (!next_work_info.delayed_run_time.is_max()) {
126       timeout = next_work_info.remaining_delay();
127     }
128     delegate->BeforeWait();
129     WaitForEpollEvents(timeout);
130     if (run_state.should_quit) {
131       break;
132     }
133   }
134 }
135 
Quit()136 void MessagePumpEpoll::Quit() {
137   DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
138   DCHECK(run_state_) << "Quit() called outside of Run()";
139   run_state_->should_quit = true;
140 }
141 
ScheduleWork()142 void MessagePumpEpoll::ScheduleWork() {
143   const uint64_t value = 1;
144   ssize_t n = HANDLE_EINTR(write(wake_event_.get(), &value, sizeof(value)));
145 
146   // EAGAIN here implies that the write() would overflow of the event counter,
147   // which is a condition we can safely ignore. It implies that the event
148   // counter is non-zero and therefore readable, which is enough to ensure that
149   // any pending wait eventually wakes up.
150   DPCHECK(n == sizeof(value) || errno == EAGAIN);
151 }
152 
ScheduleDelayedWork(const Delegate::NextWorkInfo & next_work_info)153 void MessagePumpEpoll::ScheduleDelayedWork(
154     const Delegate::NextWorkInfo& next_work_info) {
155   // Nothing to do. This can only be called from the same thread as Run(), so
156   // the pump must be in between waits. The scheduled work therefore will be
157   // seen in time for the next wait.
158 }
159 
AddEpollEvent(EpollEventEntry & entry)160 void MessagePumpEpoll::AddEpollEvent(EpollEventEntry& entry) {
161   DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
162   DCHECK(!entry.stopped);
163   const uint32_t events = entry.ComputeActiveEvents();
164   epoll_event event{.events = events, .data = {.ptr = &entry}};
165   int rv = epoll_ctl(epoll_.get(), EPOLL_CTL_ADD, entry.fd, &event);
166   DPCHECK(rv == 0);
167   entry.registered_events = events;
168 }
169 
UpdateEpollEvent(EpollEventEntry & entry)170 void MessagePumpEpoll::UpdateEpollEvent(EpollEventEntry& entry) {
171   DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
172   if (!entry.stopped) {
173     const uint32_t events = entry.ComputeActiveEvents();
174     if (events == entry.registered_events && !(events & EPOLLONESHOT)) {
175       // Persistent events don't need to be modified if no bits are changing.
176       return;
177     }
178     epoll_event event{.events = events, .data = {.ptr = &entry}};
179     int rv = epoll_ctl(epoll_.get(), EPOLL_CTL_MOD, entry.fd, &event);
180     DPCHECK(rv == 0);
181     entry.registered_events = events;
182   }
183 }
184 
StopEpollEvent(EpollEventEntry & entry)185 void MessagePumpEpoll::StopEpollEvent(EpollEventEntry& entry) {
186   DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
187   if (!entry.stopped) {
188     int rv = epoll_ctl(epoll_.get(), EPOLL_CTL_DEL, entry.fd, nullptr);
189     DPCHECK(rv == 0);
190     entry.stopped = true;
191   }
192 }
193 
UnregisterInterest(const scoped_refptr<Interest> & interest)194 void MessagePumpEpoll::UnregisterInterest(
195     const scoped_refptr<Interest>& interest) {
196   DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
197 
198   const int fd = interest->params().fd;
199   auto entry_it = entries_.find(fd);
200   CHECK(entry_it != entries_.end(), base::NotFatalUntil::M125);
201 
202   EpollEventEntry& entry = entry_it->second;
203   auto& interests = entry.interests;
204   auto* it = ranges::find(interests, interest);
205   CHECK(it != interests.end(), base::NotFatalUntil::M125);
206   interests.erase(it);
207 
208   if (interests.empty()) {
209     StopEpollEvent(entry);
210     entries_.erase(entry_it);
211   } else {
212     UpdateEpollEvent(entry);
213   }
214 }
215 
WaitForEpollEvents(TimeDelta timeout)216 bool MessagePumpEpoll::WaitForEpollEvents(TimeDelta timeout) {
217   DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
218 
219   // `timeout` has microsecond resolution, but timeouts accepted by epoll_wait()
220   // are integral milliseconds. Round up to the next millisecond.
221   // TODO(https://crbug.com/1382894): Consider higher-resolution timeouts.
222   const int epoll_timeout =
223       timeout.is_max() ? -1
224                        : saturated_cast<int>(timeout.InMillisecondsRoundedUp());
225   epoll_event events[16];
226   const int epoll_result =
227       epoll_wait(epoll_.get(), events, std::size(events), epoll_timeout);
228   if (epoll_result < 0) {
229     DPCHECK(errno == EINTR);
230     return false;
231   }
232 
233   if (epoll_result == 0) {
234     return false;
235   }
236 
237   const base::span<epoll_event> ready_events(events,
238                                              static_cast<size_t>(epoll_result));
239   for (auto& e : ready_events) {
240     if (e.data.ptr == &wake_event_) {
241       // Wake-up events are always safe to handle immediately. Unlike other
242       // events used by MessagePumpEpoll they also don't point to an
243       // EpollEventEntry, so we handle them separately here.
244       HandleWakeUp();
245       e.data.ptr = nullptr;
246       continue;
247     }
248 
249     // To guard against one of the ready events unregistering and thus
250     // invalidating one of the others here, first link each entry to the
251     // corresponding epoll_event returned by epoll_wait(). We do this before
252     // dispatching any events, and the second pass below will only dispatch an
253     // event if its epoll_event data is still valid.
254     auto& entry = EpollEventEntry::FromEpollEvent(e);
255     DCHECK(!entry.active_event);
256     EpollEventEntry::FromEpollEvent(e).active_event = &e;
257   }
258 
259   for (auto& e : ready_events) {
260     if (e.data.ptr) {
261       auto& entry = EpollEventEntry::FromEpollEvent(e);
262       entry.active_event = nullptr;
263       OnEpollEvent(entry, e.events);
264     }
265   }
266 
267   return true;
268 }
269 
OnEpollEvent(EpollEventEntry & entry,uint32_t events)270 void MessagePumpEpoll::OnEpollEvent(EpollEventEntry& entry, uint32_t events) {
271   DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
272   DCHECK(!entry.stopped);
273 
274   const bool readable = (events & EPOLLIN) != 0;
275   const bool writable = (events & EPOLLOUT) != 0;
276 
277   // Under different circumstances, peer closure may raise both/either EPOLLHUP
278   // and/or EPOLLERR. Treat them as equivalent. Notify the watchers to
279   // gracefully stop watching if disconnected.
280   const bool disconnected = (events & (EPOLLHUP | EPOLLERR)) != 0;
281   DCHECK(readable || writable || disconnected);
282 
283   // Copy the set of Interests, since interests may be added to or removed from
284   // `entry` during the loop below. This copy is inexpensive in practice
285   // because the size of this vector is expected to be very small (<= 2).
286   auto interests = entry.interests;
287 
288   // Any of these interests' event handlers may destroy any of the others'
289   // controllers. Start all of them watching for destruction before we actually
290   // dispatch any events.
291   for (const auto& interest : interests) {
292     interest->WatchForControllerDestruction();
293   }
294 
295   bool event_handled = false;
296   for (const auto& interest : interests) {
297     if (!interest->active()) {
298       continue;
299     }
300 
301     const bool can_read = (readable || disconnected) && interest->params().read;
302     const bool can_write =
303         (writable || disconnected) && interest->params().write;
304     if (!can_read && !can_write) {
305       // If this Interest is active but not watching for whichever event was
306       // raised here, there's nothing to do. This can occur if a descriptor has
307       // multiple active interests, since only one interest needs to be
308       // satisfied in order for us to process an epoll event.
309       continue;
310     }
311 
312     if (interest->params().one_shot) {
313       // This is a one-shot event watch which is about to be triggered. We
314       // deactivate the interest and update epoll immediately. The event handler
315       // may reactivate it.
316       interest->set_active(false);
317       UpdateEpollEvent(entry);
318     }
319 
320     if (!interest->was_controller_destroyed()) {
321       HandleEvent(entry.fd, can_read, can_write, interest->controller());
322       event_handled = true;
323     }
324   }
325 
326   // Stop `EpollEventEntry` for disconnected file descriptor without active
327   // interests.
328   if (disconnected && !event_handled) {
329     StopEpollEvent(entry);
330   }
331 
332   for (const auto& interest : interests) {
333     interest->StopWatchingForControllerDestruction();
334   }
335 }
336 
HandleEvent(int fd,bool can_read,bool can_write,FdWatchController * controller)337 void MessagePumpEpoll::HandleEvent(int fd,
338                                    bool can_read,
339                                    bool can_write,
340                                    FdWatchController* controller) {
341   DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
342   BeginNativeWorkBatch();
343   processed_io_events_ = true;
344   // Make the MessagePumpDelegate aware of this other form of "DoWork". Skip if
345   // HandleNotification() is called outside of Run() (e.g. in unit tests).
346   Delegate::ScopedDoWorkItem scoped_do_work_item;
347   if (run_state_) {
348     scoped_do_work_item = run_state_->delegate->BeginWorkItem();
349   }
350 
351   // Trace events must begin after the above BeginWorkItem() so that the
352   // ensuing "ThreadController active" outscopes all the events under it.
353   TRACE_EVENT("toplevel", "EpollEvent", "controller_created_from",
354               controller->created_from_location(), "fd", fd, "can_read",
355               can_read, "can_write", can_write, "context",
356               static_cast<void*>(controller));
357   TRACE_HEAP_PROFILER_API_SCOPED_TASK_EXECUTION heap_profiler_scope(
358       controller->created_from_location().file_name());
359   if (can_read && can_write) {
360     bool controller_was_destroyed = false;
361     bool* previous_was_destroyed_flag =
362         std::exchange(controller->was_destroyed_, &controller_was_destroyed);
363 
364     controller->OnFdWritable();
365     if (!controller_was_destroyed) {
366       controller->OnFdReadable();
367     }
368     if (!controller_was_destroyed) {
369       controller->was_destroyed_ = previous_was_destroyed_flag;
370     } else if (previous_was_destroyed_flag) {
371       *previous_was_destroyed_flag = true;
372     }
373   } else if (can_write) {
374     controller->OnFdWritable();
375   } else if (can_read) {
376     controller->OnFdReadable();
377   }
378 }
379 
HandleWakeUp()380 void MessagePumpEpoll::HandleWakeUp() {
381   DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
382   BeginNativeWorkBatch();
383   processed_io_events_ = true;
384   uint64_t value;
385   ssize_t n = HANDLE_EINTR(read(wake_event_.get(), &value, sizeof(value)));
386   DPCHECK(n == sizeof(value));
387 }
388 
BeginNativeWorkBatch()389 void MessagePumpEpoll::BeginNativeWorkBatch() {
390   DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
391   // Call `BeginNativeWorkBeforeDoWork()` if native work hasn't started.
392   if (!native_work_started_) {
393     if (run_state_) {
394       run_state_->delegate->BeginNativeWorkBeforeDoWork();
395     }
396     native_work_started_ = true;
397   }
398 }
399 
EpollEventEntry(int fd)400 MessagePumpEpoll::EpollEventEntry::EpollEventEntry(int fd) : fd(fd) {}
401 
~EpollEventEntry()402 MessagePumpEpoll::EpollEventEntry::~EpollEventEntry() {
403   if (active_event) {
404     DCHECK_EQ(this, active_event->data.ptr);
405     active_event->data.ptr = nullptr;
406   }
407 }
408 
ComputeActiveEvents()409 uint32_t MessagePumpEpoll::EpollEventEntry::ComputeActiveEvents() {
410   uint32_t events = 0;
411   bool one_shot = true;
412   for (const auto& interest : interests) {
413     if (!interest->active()) {
414       continue;
415     }
416     const InterestParams& params = interest->params();
417     events |= (params.read ? EPOLLIN : 0) | (params.write ? EPOLLOUT : 0);
418     one_shot &= params.one_shot;
419   }
420   if (events != 0 && one_shot) {
421     return events | EPOLLONESHOT;
422   }
423   return events;
424 }
425 
426 }  // namespace base
427