1 // Copyright 2022 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "base/message_loop/message_pump_epoll.h"
6
7 #include <sys/epoll.h>
8 #include <sys/eventfd.h>
9
10 #include <cstddef>
11 #include <cstdint>
12 #include <optional>
13 #include <utility>
14
15 #include "base/auto_reset.h"
16 #include "base/check_op.h"
17 #include "base/memory/raw_ptr.h"
18 #include "base/memory/ref_counted.h"
19 #include "base/posix/eintr_wrapper.h"
20 #include "base/ranges/algorithm.h"
21 #include "base/threading/thread_checker.h"
22 #include "base/trace_event/base_tracing.h"
23
24 namespace base {
25
MessagePumpEpoll()26 MessagePumpEpoll::MessagePumpEpoll() {
27 epoll_.reset(epoll_create1(/*flags=*/0));
28 PCHECK(epoll_.is_valid());
29
30 wake_event_.reset(eventfd(0, EFD_NONBLOCK));
31 PCHECK(wake_event_.is_valid());
32
33 epoll_event wake{.events = EPOLLIN, .data = {.ptr = &wake_event_}};
34 int rv = epoll_ctl(epoll_.get(), EPOLL_CTL_ADD, wake_event_.get(), &wake);
35 PCHECK(rv == 0);
36 }
37
// Defaulted: no explicit teardown beyond member destruction.
// NOTE(review): this presumes `epoll_` and `wake_event_` own and close their
// descriptors (the constructor uses them as reset()/get()/is_valid() handles)
// — confirm against the header.
MessagePumpEpoll::~MessagePumpEpoll() = default;
39
WatchFileDescriptor(int fd,bool persistent,int mode,FdWatchController * controller,FdWatcher * watcher)40 bool MessagePumpEpoll::WatchFileDescriptor(int fd,
41 bool persistent,
42 int mode,
43 FdWatchController* controller,
44 FdWatcher* watcher) {
45 DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
46 TRACE_EVENT("base", "MessagePumpEpoll::WatchFileDescriptor", "fd", fd,
47 "persistent", persistent, "watch_read", mode & WATCH_READ,
48 "watch_write", mode & WATCH_WRITE);
49
50 const InterestParams params{
51 .fd = fd,
52 .read = (mode == WATCH_READ || mode == WATCH_READ_WRITE),
53 .write = (mode == WATCH_WRITE || mode == WATCH_READ_WRITE),
54 .one_shot = !persistent,
55 };
56
57 auto [it, is_new_fd_entry] = entries_.emplace(fd, fd);
58 EpollEventEntry& entry = it->second;
59 scoped_refptr<Interest> existing_interest = controller->epoll_interest();
60 if (existing_interest && existing_interest->params().IsEqual(params)) {
61 // WatchFileDescriptor() has already been called for this controller at
62 // least once before, and as in the most common cases, it is now being
63 // called again with the same parameters.
64 //
65 // We don't need to allocate and register a new Interest in this case, but
66 // we can instead reactivate the existing (presumably deactivated,
67 // non-persistent) Interest.
68 existing_interest->set_active(true);
69 } else {
70 entry.interests.push_back(controller->AssignEpollInterest(params));
71 if (existing_interest) {
72 UnregisterInterest(existing_interest);
73 }
74 }
75
76 if (is_new_fd_entry) {
77 AddEpollEvent(entry);
78 } else {
79 UpdateEpollEvent(entry);
80 }
81
82 controller->set_epoll_pump(weak_ptr_factory_.GetWeakPtr());
83 controller->set_watcher(watcher);
84 return true;
85 }
86
Run(Delegate * delegate)87 void MessagePumpEpoll::Run(Delegate* delegate) {
88 DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
89 RunState run_state(delegate);
90 AutoReset<raw_ptr<RunState>> auto_reset_run_state(&run_state_, &run_state);
91 for (;;) {
92 // Do some work and see if the next task is ready right away.
93 Delegate::NextWorkInfo next_work_info = delegate->DoWork();
94 const bool immediate_work_available = next_work_info.is_immediate();
95 if (run_state.should_quit) {
96 break;
97 }
98
99 // Reset the native work flag before processing IO events.
100 native_work_started_ = false;
101
102 // Process any immediately ready IO event, but don't wait for more yet.
103 WaitForEpollEvents(TimeDelta());
104
105 bool attempt_more_work = immediate_work_available || processed_io_events_;
106 processed_io_events_ = false;
107
108 if (run_state.should_quit) {
109 break;
110 }
111 if (attempt_more_work) {
112 continue;
113 }
114
115 attempt_more_work = delegate->DoIdleWork();
116 if (run_state.should_quit) {
117 break;
118 }
119 if (attempt_more_work) {
120 continue;
121 }
122
123 TimeDelta timeout = TimeDelta::Max();
124 DCHECK(!next_work_info.delayed_run_time.is_null());
125 if (!next_work_info.delayed_run_time.is_max()) {
126 timeout = next_work_info.remaining_delay();
127 }
128 delegate->BeforeWait();
129 WaitForEpollEvents(timeout);
130 if (run_state.should_quit) {
131 break;
132 }
133 }
134 }
135
Quit()136 void MessagePumpEpoll::Quit() {
137 DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
138 DCHECK(run_state_) << "Quit() called outside of Run()";
139 run_state_->should_quit = true;
140 }
141
ScheduleWork()142 void MessagePumpEpoll::ScheduleWork() {
143 const uint64_t value = 1;
144 ssize_t n = HANDLE_EINTR(write(wake_event_.get(), &value, sizeof(value)));
145
146 // EAGAIN here implies that the write() would overflow of the event counter,
147 // which is a condition we can safely ignore. It implies that the event
148 // counter is non-zero and therefore readable, which is enough to ensure that
149 // any pending wait eventually wakes up.
150 DPCHECK(n == sizeof(value) || errno == EAGAIN);
151 }
152
ScheduleDelayedWork(const Delegate::NextWorkInfo & next_work_info)153 void MessagePumpEpoll::ScheduleDelayedWork(
154 const Delegate::NextWorkInfo& next_work_info) {
155 // Nothing to do. This can only be called from the same thread as Run(), so
156 // the pump must be in between waits. The scheduled work therefore will be
157 // seen in time for the next wait.
158 }
159
AddEpollEvent(EpollEventEntry & entry)160 void MessagePumpEpoll::AddEpollEvent(EpollEventEntry& entry) {
161 DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
162 DCHECK(!entry.stopped);
163 const uint32_t events = entry.ComputeActiveEvents();
164 epoll_event event{.events = events, .data = {.ptr = &entry}};
165 int rv = epoll_ctl(epoll_.get(), EPOLL_CTL_ADD, entry.fd, &event);
166 DPCHECK(rv == 0);
167 entry.registered_events = events;
168 }
169
UpdateEpollEvent(EpollEventEntry & entry)170 void MessagePumpEpoll::UpdateEpollEvent(EpollEventEntry& entry) {
171 DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
172 if (!entry.stopped) {
173 const uint32_t events = entry.ComputeActiveEvents();
174 if (events == entry.registered_events && !(events & EPOLLONESHOT)) {
175 // Persistent events don't need to be modified if no bits are changing.
176 return;
177 }
178 epoll_event event{.events = events, .data = {.ptr = &entry}};
179 int rv = epoll_ctl(epoll_.get(), EPOLL_CTL_MOD, entry.fd, &event);
180 DPCHECK(rv == 0);
181 entry.registered_events = events;
182 }
183 }
184
StopEpollEvent(EpollEventEntry & entry)185 void MessagePumpEpoll::StopEpollEvent(EpollEventEntry& entry) {
186 DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
187 if (!entry.stopped) {
188 int rv = epoll_ctl(epoll_.get(), EPOLL_CTL_DEL, entry.fd, nullptr);
189 DPCHECK(rv == 0);
190 entry.stopped = true;
191 }
192 }
193
UnregisterInterest(const scoped_refptr<Interest> & interest)194 void MessagePumpEpoll::UnregisterInterest(
195 const scoped_refptr<Interest>& interest) {
196 DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
197
198 const int fd = interest->params().fd;
199 auto entry_it = entries_.find(fd);
200 CHECK(entry_it != entries_.end(), base::NotFatalUntil::M125);
201
202 EpollEventEntry& entry = entry_it->second;
203 auto& interests = entry.interests;
204 auto* it = ranges::find(interests, interest);
205 CHECK(it != interests.end(), base::NotFatalUntil::M125);
206 interests.erase(it);
207
208 if (interests.empty()) {
209 StopEpollEvent(entry);
210 entries_.erase(entry_it);
211 } else {
212 UpdateEpollEvent(entry);
213 }
214 }
215
WaitForEpollEvents(TimeDelta timeout)216 bool MessagePumpEpoll::WaitForEpollEvents(TimeDelta timeout) {
217 DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
218
219 // `timeout` has microsecond resolution, but timeouts accepted by epoll_wait()
220 // are integral milliseconds. Round up to the next millisecond.
221 // TODO(https://crbug.com/1382894): Consider higher-resolution timeouts.
222 const int epoll_timeout =
223 timeout.is_max() ? -1
224 : saturated_cast<int>(timeout.InMillisecondsRoundedUp());
225 epoll_event events[16];
226 const int epoll_result =
227 epoll_wait(epoll_.get(), events, std::size(events), epoll_timeout);
228 if (epoll_result < 0) {
229 DPCHECK(errno == EINTR);
230 return false;
231 }
232
233 if (epoll_result == 0) {
234 return false;
235 }
236
237 const base::span<epoll_event> ready_events(events,
238 static_cast<size_t>(epoll_result));
239 for (auto& e : ready_events) {
240 if (e.data.ptr == &wake_event_) {
241 // Wake-up events are always safe to handle immediately. Unlike other
242 // events used by MessagePumpEpoll they also don't point to an
243 // EpollEventEntry, so we handle them separately here.
244 HandleWakeUp();
245 e.data.ptr = nullptr;
246 continue;
247 }
248
249 // To guard against one of the ready events unregistering and thus
250 // invalidating one of the others here, first link each entry to the
251 // corresponding epoll_event returned by epoll_wait(). We do this before
252 // dispatching any events, and the second pass below will only dispatch an
253 // event if its epoll_event data is still valid.
254 auto& entry = EpollEventEntry::FromEpollEvent(e);
255 DCHECK(!entry.active_event);
256 EpollEventEntry::FromEpollEvent(e).active_event = &e;
257 }
258
259 for (auto& e : ready_events) {
260 if (e.data.ptr) {
261 auto& entry = EpollEventEntry::FromEpollEvent(e);
262 entry.active_event = nullptr;
263 OnEpollEvent(entry, e.events);
264 }
265 }
266
267 return true;
268 }
269
// Dispatches one epoll notification (`events`) for `entry` to each of its
// active interests that watches a direction that became ready.
//
// EPOLLHUP/EPOLLERR count as both readable and writable for every interest, so
// watchers get a chance to notice the disconnect. One-shot interests are
// deactivated, and epoll is updated, before their handler runs, so the handler
// may re-arm them. If the fd is disconnected and no handler ran, the entry is
// removed from epoll.
void MessagePumpEpoll::OnEpollEvent(EpollEventEntry& entry, uint32_t events) {
  DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
  DCHECK(!entry.stopped);

  const bool readable = (events & EPOLLIN) != 0;
  const bool writable = (events & EPOLLOUT) != 0;

  // Under different circumstances, peer closure may raise both/either EPOLLHUP
  // and/or EPOLLERR. Treat them as equivalent. Notify the watchers to
  // gracefully stop watching if disconnected.
  const bool disconnected = (events & (EPOLLHUP | EPOLLERR)) != 0;
  DCHECK(readable || writable || disconnected);

  // Copy the set of Interests, since interests may be added to or removed from
  // `entry` during the loop below. This copy is inexpensive in practice
  // because the size of this vector is expected to be very small (<= 2).
  auto interests = entry.interests;

  // Any of these interests' event handlers may destroy any of the others'
  // controllers. Start all of them watching for destruction before we actually
  // dispatch any events.
  for (const auto& interest : interests) {
    interest->WatchForControllerDestruction();
  }

  // NOTE(review): handlers run arbitrary code. If one of them causes
  // UnregisterInterest() to remove the last interest from `entry`, that
  // function erases `entry` from `entries_`. The later uses of `entry` below
  // (UpdateEpollEvent / StopEpollEvent / `entry.fd`) would then touch a
  // destroyed object — confirm this cannot happen, e.g. via Interest state
  // set on unregistration.
  bool event_handled = false;
  for (const auto& interest : interests) {
    if (!interest->active()) {
      continue;
    }

    const bool can_read = (readable || disconnected) && interest->params().read;
    const bool can_write =
        (writable || disconnected) && interest->params().write;
    if (!can_read && !can_write) {
      // If this Interest is active but not watching for whichever event was
      // raised here, there's nothing to do. This can occur if a descriptor has
      // multiple active interests, since only one interest needs to be
      // satisfied in order for us to process an epoll event.
      continue;
    }

    if (interest->params().one_shot) {
      // This is a one-shot event watch which is about to be triggered. We
      // deactivate the interest and update epoll immediately. The event handler
      // may reactivate it.
      interest->set_active(false);
      UpdateEpollEvent(entry);
    }

    // Skip interests whose controller was destroyed by an earlier handler in
    // this same dispatch.
    if (!interest->was_controller_destroyed()) {
      HandleEvent(entry.fd, can_read, can_write, interest->controller());
      event_handled = true;
    }
  }

  // Stop `EpollEventEntry` for disconnected file descriptor without active
  // interests.
  if (disconnected && !event_handled) {
    StopEpollEvent(entry);
  }

  for (const auto& interest : interests) {
    interest->StopWatchingForControllerDestruction();
  }
}
336
// Delivers one readiness notification for `fd` to `controller`.
//
// Sets `processed_io_events_`, which Run() reads to decide whether to loop
// again before idling. When called inside Run(), the dispatch is bracketed by
// a ScopedDoWorkItem obtained from the delegate's BeginWorkItem(). If both
// directions are ready, OnFdWritable() runs first, and OnFdReadable() is
// skipped if the first callback destroyed `controller`.
void MessagePumpEpoll::HandleEvent(int fd,
                                   bool can_read,
                                   bool can_write,
                                   FdWatchController* controller) {
  DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
  BeginNativeWorkBatch();
  processed_io_events_ = true;
  // Make the MessagePumpDelegate aware of this other form of "DoWork". Skip if
  // HandleEvent() is called outside of Run() (e.g. in unit tests).
  Delegate::ScopedDoWorkItem scoped_do_work_item;
  if (run_state_) {
    scoped_do_work_item = run_state_->delegate->BeginWorkItem();
  }

  // Trace events must begin after the above BeginWorkItem() so that the
  // ensuing "ThreadController active" outscopes all the events under it.
  TRACE_EVENT("toplevel", "EpollEvent", "controller_created_from",
              controller->created_from_location(), "fd", fd, "can_read",
              can_read, "can_write", can_write, "context",
              static_cast<void*>(controller));
  TRACE_HEAP_PROFILER_API_SCOPED_TASK_EXECUTION heap_profiler_scope(
      controller->created_from_location().file_name());
  if (can_read && can_write) {
    // Temporarily point the controller's destruction flag at a local so that
    // destruction during OnFdWritable() is detected before `controller` is
    // touched again.
    // NOTE(review): presumes FdWatchController's destructor writes `true`
    // through a non-null `was_destroyed_` — confirm in the header.
    bool controller_was_destroyed = false;
    bool* previous_was_destroyed_flag =
        std::exchange(controller->was_destroyed_, &controller_was_destroyed);

    controller->OnFdWritable();
    if (!controller_was_destroyed) {
      controller->OnFdReadable();
    }
    if (!controller_was_destroyed) {
      // Still alive: restore whatever flag an outer caller had installed.
      controller->was_destroyed_ = previous_was_destroyed_flag;
    } else if (previous_was_destroyed_flag) {
      // Destroyed: `controller` must not be touched, so propagate the
      // destruction to the outer caller's flag directly.
      *previous_was_destroyed_flag = true;
    }
  } else if (can_write) {
    controller->OnFdWritable();
  } else if (can_read) {
    controller->OnFdReadable();
  }
}
379
HandleWakeUp()380 void MessagePumpEpoll::HandleWakeUp() {
381 DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
382 BeginNativeWorkBatch();
383 processed_io_events_ = true;
384 uint64_t value;
385 ssize_t n = HANDLE_EINTR(read(wake_event_.get(), &value, sizeof(value)));
386 DPCHECK(n == sizeof(value));
387 }
388
BeginNativeWorkBatch()389 void MessagePumpEpoll::BeginNativeWorkBatch() {
390 DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
391 // Call `BeginNativeWorkBeforeDoWork()` if native work hasn't started.
392 if (!native_work_started_) {
393 if (run_state_) {
394 run_state_->delegate->BeginNativeWorkBeforeDoWork();
395 }
396 native_work_started_ = true;
397 }
398 }
399
EpollEventEntry(int fd)400 MessagePumpEpoll::EpollEventEntry::EpollEventEntry(int fd) : fd(fd) {}
401
~EpollEventEntry()402 MessagePumpEpoll::EpollEventEntry::~EpollEventEntry() {
403 if (active_event) {
404 DCHECK_EQ(this, active_event->data.ptr);
405 active_event->data.ptr = nullptr;
406 }
407 }
408
ComputeActiveEvents()409 uint32_t MessagePumpEpoll::EpollEventEntry::ComputeActiveEvents() {
410 uint32_t events = 0;
411 bool one_shot = true;
412 for (const auto& interest : interests) {
413 if (!interest->active()) {
414 continue;
415 }
416 const InterestParams& params = interest->params();
417 events |= (params.read ? EPOLLIN : 0) | (params.write ? EPOLLOUT : 0);
418 one_shot &= params.one_shot;
419 }
420 if (events != 0 && one_shot) {
421 return events | EPOLLONESHOT;
422 }
423 return events;
424 }
425
426 } // namespace base
427