1 // Copyright 2019 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "base/message_loop/message_pump_kqueue.h"
6
7 #include <sys/errno.h>
8
9 #include <atomic>
10
11 #include "base/apple/mach_logging.h"
12 #include "base/apple/scoped_nsautorelease_pool.h"
13 #include "base/auto_reset.h"
14 #include "base/feature_list.h"
15 #include "base/logging.h"
16 #include "base/mac/mac_util.h"
17 #include "base/notreached.h"
18 #include "base/posix/eintr_wrapper.h"
19 #include "base/task/task_features.h"
20 #include "base/time/time_override.h"
21
22 namespace base {
23
24 namespace {
25
26 // Under this feature a simplified version of the Run() function is used. It
27 // improves legibility and avoids some calls to kevent64(). Remove once
28 // crbug.com/1200141 is resolved.
29 BASE_FEATURE(kUseSimplifiedMessagePumpKqueueLoop,
30 "UseSimplifiedMessagePumpKqueueLoop",
31 base::FEATURE_DISABLED_BY_DEFAULT);
32
33 // Caches the state of the "UseSimplifiedMessagePumpKqueueLoop".
34 std::atomic_bool g_use_simplified_version = false;
35
36 // Caches the state of the "TimerSlackMac" feature for efficiency.
37 std::atomic_bool g_timer_slack = false;
38
39 #if DCHECK_IS_ON()
40 // Prior to macOS 10.14, kqueue timers may spuriously wake up, because earlier
41 // wake ups race with timer resets in the kernel. As of macOS 10.14, updating a
42 // timer from the thread that reads the kqueue does not cause spurious wakeups.
43 // Note that updating a kqueue timer from one thread while another thread is
44 // waiting in a kevent64 invocation is still (inherently) racy.
KqueueTimersSpuriouslyWakeUp()45 bool KqueueTimersSpuriouslyWakeUp() {
46 #if BUILDFLAG(IS_MAC)
47 return false;
48 #else
49 // This still happens on iOS15.
50 return true;
51 #endif
52 }
53 #endif
54
ChangeOneEvent(const ScopedFD & kqueue,kevent64_s * event)55 int ChangeOneEvent(const ScopedFD& kqueue, kevent64_s* event) {
56 return HANDLE_EINTR(kevent64(kqueue.get(), event, 1, nullptr, 0, 0, nullptr));
57 }
58
59 } // namespace
60
FdWatchController(const Location & from_here)61 MessagePumpKqueue::FdWatchController::FdWatchController(
62 const Location& from_here)
63 : FdWatchControllerInterface(from_here) {}
64
~FdWatchController()65 MessagePumpKqueue::FdWatchController::~FdWatchController() {
66 StopWatchingFileDescriptor();
67 }
68
StopWatchingFileDescriptor()69 bool MessagePumpKqueue::FdWatchController::StopWatchingFileDescriptor() {
70 if (!pump_)
71 return true;
72 return pump_->StopWatchingFileDescriptor(this);
73 }
74
Init(WeakPtr<MessagePumpKqueue> pump,int fd,int mode,FdWatcher * watcher)75 void MessagePumpKqueue::FdWatchController::Init(WeakPtr<MessagePumpKqueue> pump,
76 int fd,
77 int mode,
78 FdWatcher* watcher) {
79 DCHECK_NE(fd, -1);
80 DCHECK(!watcher_);
81 DCHECK(watcher);
82 DCHECK(pump);
83 fd_ = fd;
84 mode_ = mode;
85 watcher_ = watcher;
86 pump_ = pump;
87 }
88
Reset()89 void MessagePumpKqueue::FdWatchController::Reset() {
90 fd_ = -1;
91 mode_ = 0;
92 watcher_ = nullptr;
93 pump_ = nullptr;
94 }
95
MachPortWatchController(const Location & from_here)96 MessagePumpKqueue::MachPortWatchController::MachPortWatchController(
97 const Location& from_here)
98 : from_here_(from_here) {}
99
~MachPortWatchController()100 MessagePumpKqueue::MachPortWatchController::~MachPortWatchController() {
101 StopWatchingMachPort();
102 }
103
StopWatchingMachPort()104 bool MessagePumpKqueue::MachPortWatchController::StopWatchingMachPort() {
105 if (!pump_)
106 return true;
107 return pump_->StopWatchingMachPort(this);
108 }
109
Init(WeakPtr<MessagePumpKqueue> pump,mach_port_t port,MachPortWatcher * watcher)110 void MessagePumpKqueue::MachPortWatchController::Init(
111 WeakPtr<MessagePumpKqueue> pump,
112 mach_port_t port,
113 MachPortWatcher* watcher) {
114 DCHECK(!watcher_);
115 DCHECK(watcher);
116 DCHECK(pump);
117 port_ = port;
118 watcher_ = watcher;
119 pump_ = pump;
120 }
121
Reset()122 void MessagePumpKqueue::MachPortWatchController::Reset() {
123 port_ = MACH_PORT_NULL;
124 watcher_ = nullptr;
125 pump_ = nullptr;
126 }
127
MessagePumpKqueue()128 MessagePumpKqueue::MessagePumpKqueue()
129 : kqueue_(kqueue()), weak_factory_(this) {
130 PCHECK(kqueue_.is_valid()) << "kqueue";
131
132 // Create a Mach port that will be used to wake up the pump by sending
133 // a message in response to ScheduleWork(). This is significantly faster than
134 // using an EVFILT_USER event, especially when triggered across threads.
135 kern_return_t kr = mach_port_allocate(
136 mach_task_self(), MACH_PORT_RIGHT_RECEIVE,
137 base::apple::ScopedMachReceiveRight::Receiver(wakeup_).get());
138 MACH_CHECK(kr == KERN_SUCCESS, kr) << "mach_port_allocate";
139
140 // Configure the event to directly receive the Mach message as part of the
141 // kevent64() call.
142 kevent64_s event{};
143 event.ident = wakeup_.get();
144 event.filter = EVFILT_MACHPORT;
145 event.flags = EV_ADD;
146 event.fflags = MACH_RCV_MSG;
147 event.ext[0] = reinterpret_cast<uint64_t>(&wakeup_buffer_);
148 event.ext[1] = sizeof(wakeup_buffer_);
149
150 int rv = ChangeOneEvent(kqueue_, &event);
151 PCHECK(rv == 0) << "kevent64";
152 }
153
~MessagePumpKqueue()154 MessagePumpKqueue::~MessagePumpKqueue() {}
155
InitializeFeatures()156 void MessagePumpKqueue::InitializeFeatures() {
157 g_use_simplified_version.store(
158 base::FeatureList::IsEnabled(kUseSimplifiedMessagePumpKqueueLoop),
159 std::memory_order_relaxed);
160 g_timer_slack.store(FeatureList::IsEnabled(kTimerSlackMac),
161 std::memory_order_relaxed);
162 }
163
Run(Delegate * delegate)164 void MessagePumpKqueue::Run(Delegate* delegate) {
165 AutoReset<bool> reset_keep_running(&keep_running_, true);
166
167 if (g_use_simplified_version.load(std::memory_order_relaxed)) {
168 RunSimplified(delegate);
169 } else {
170 while (keep_running_) {
171 apple::ScopedNSAutoreleasePool pool;
172
173 bool do_more_work = DoInternalWork(delegate, nullptr);
174 if (!keep_running_)
175 break;
176
177 Delegate::NextWorkInfo next_work_info = delegate->DoWork();
178 do_more_work |= next_work_info.is_immediate();
179 if (!keep_running_)
180 break;
181
182 if (do_more_work)
183 continue;
184
185 do_more_work |= delegate->DoIdleWork();
186 if (!keep_running_)
187 break;
188
189 if (do_more_work)
190 continue;
191
192 DoInternalWork(delegate, &next_work_info);
193 }
194 }
195 }
196
RunSimplified(Delegate * delegate)197 void MessagePumpKqueue::RunSimplified(Delegate* delegate) {
198 // Look for native work once before the loop starts. Without this call the
199 // loop would break without checking native work even once in cases where
200 // QuitWhenIdle was used. This is sometimes the case in tests.
201 DoInternalWork(delegate, nullptr);
202
203 while (keep_running_) {
204 apple::ScopedNSAutoreleasePool pool;
205
206 Delegate::NextWorkInfo next_work_info = delegate->DoWork();
207 if (!keep_running_)
208 break;
209
210 if (!next_work_info.is_immediate()) {
211 delegate->DoIdleWork();
212 }
213 if (!keep_running_)
214 break;
215
216 DoInternalWork(delegate, &next_work_info);
217 }
218 }
219
Quit()220 void MessagePumpKqueue::Quit() {
221 keep_running_ = false;
222 ScheduleWork();
223 }
224
ScheduleWork()225 void MessagePumpKqueue::ScheduleWork() {
226 mach_msg_empty_send_t message{};
227 message.header.msgh_size = sizeof(message);
228 message.header.msgh_bits =
229 MACH_MSGH_BITS_REMOTE(MACH_MSG_TYPE_MAKE_SEND_ONCE);
230 message.header.msgh_remote_port = wakeup_.get();
231 kern_return_t kr = mach_msg_send(&message.header);
232 if (kr != KERN_SUCCESS) {
233 // If ScheduleWork() is being called by other threads faster than the pump
234 // can dispatch work, the kernel message queue for the wakeup port can fill
235 // up (this happens under base_perftests, for example). The kernel does
236 // return a SEND_ONCE right in the case of failure, which must be destroyed
237 // to avoid leaking.
238 MACH_DLOG_IF(ERROR, (kr & ~MACH_MSG_IPC_SPACE) != MACH_SEND_NO_BUFFER, kr)
239 << "mach_msg_send";
240 mach_msg_destroy(&message.header);
241 }
242 }
243
ScheduleDelayedWork(const Delegate::NextWorkInfo & next_work_info)244 void MessagePumpKqueue::ScheduleDelayedWork(
245 const Delegate::NextWorkInfo& next_work_info) {
246 // Nothing to do. This MessagePump uses DoWork().
247 }
248
WatchMachReceivePort(mach_port_t port,MachPortWatchController * controller,MachPortWatcher * delegate)249 bool MessagePumpKqueue::WatchMachReceivePort(
250 mach_port_t port,
251 MachPortWatchController* controller,
252 MachPortWatcher* delegate) {
253 DCHECK(port != MACH_PORT_NULL);
254 DCHECK(controller);
255 DCHECK(delegate);
256
257 if (controller->port() != MACH_PORT_NULL) {
258 DLOG(ERROR)
259 << "Cannot use the same MachPortWatchController while it is active";
260 return false;
261 }
262
263 kevent64_s event{};
264 event.ident = port;
265 event.filter = EVFILT_MACHPORT;
266 event.flags = EV_ADD;
267 int rv = ChangeOneEvent(kqueue_, &event);
268 if (rv < 0) {
269 DPLOG(ERROR) << "kevent64";
270 return false;
271 }
272 ++event_count_;
273
274 controller->Init(weak_factory_.GetWeakPtr(), port, delegate);
275 port_controllers_.AddWithID(controller, port);
276
277 return true;
278 }
279
AdjustDelayedRunTime(TimeTicks earliest_time,TimeTicks run_time,TimeTicks latest_time)280 TimeTicks MessagePumpKqueue::AdjustDelayedRunTime(TimeTicks earliest_time,
281 TimeTicks run_time,
282 TimeTicks latest_time) {
283 if (g_timer_slack.load(std::memory_order_relaxed)) {
284 return earliest_time;
285 }
286 return MessagePump::AdjustDelayedRunTime(earliest_time, run_time,
287 latest_time);
288 }
289
WatchFileDescriptor(int fd,bool persistent,int mode,FdWatchController * controller,FdWatcher * delegate)290 bool MessagePumpKqueue::WatchFileDescriptor(int fd,
291 bool persistent,
292 int mode,
293 FdWatchController* controller,
294 FdWatcher* delegate) {
295 DCHECK_GE(fd, 0);
296 DCHECK(controller);
297 DCHECK(delegate);
298 DCHECK_NE(mode & Mode::WATCH_READ_WRITE, 0);
299
300 if (controller->fd() != -1 && controller->fd() != fd) {
301 DLOG(ERROR) << "Cannot use the same FdWatchController on two different FDs";
302 return false;
303 }
304 StopWatchingFileDescriptor(controller);
305
306 std::vector<kevent64_s> events;
307
308 kevent64_s base_event{};
309 base_event.ident = static_cast<uint64_t>(fd);
310 base_event.flags = EV_ADD | (!persistent ? EV_ONESHOT : 0);
311
312 if (mode & Mode::WATCH_READ) {
313 base_event.filter = EVFILT_READ;
314 base_event.udata = fd_controllers_.Add(controller);
315 events.push_back(base_event);
316 }
317 if (mode & Mode::WATCH_WRITE) {
318 base_event.filter = EVFILT_WRITE;
319 base_event.udata = fd_controllers_.Add(controller);
320 events.push_back(base_event);
321 }
322
323 int rv = HANDLE_EINTR(kevent64(kqueue_.get(), events.data(),
324 checked_cast<int>(events.size()), nullptr, 0,
325 0, nullptr));
326 if (rv < 0) {
327 DPLOG(ERROR) << "WatchFileDescriptor kevent64";
328 return false;
329 }
330
331 event_count_ += events.size();
332 controller->Init(weak_factory_.GetWeakPtr(), fd, mode, delegate);
333
334 return true;
335 }
336
SetWakeupTimerEvent(const base::TimeTicks & wakeup_time,base::TimeDelta leeway,kevent64_s * timer_event)337 void MessagePumpKqueue::SetWakeupTimerEvent(const base::TimeTicks& wakeup_time,
338 base::TimeDelta leeway,
339 kevent64_s* timer_event) {
340 // The ident of the wakeup timer. There's only the one timer as the pair
341 // (ident, filter) is the identity of the event.
342 constexpr uint64_t kWakeupTimerIdent = 0x0;
343 timer_event->ident = kWakeupTimerIdent;
344 timer_event->filter = EVFILT_TIMER;
345 if (wakeup_time == base::TimeTicks::Max()) {
346 timer_event->flags = EV_DELETE;
347 } else {
348 timer_event->filter = EVFILT_TIMER;
349 // This updates the timer if it already exists in |kqueue_|.
350 timer_event->flags = EV_ADD | EV_ONESHOT;
351
352 // Specify the sleep in microseconds to avoid undersleeping due to
353 // numeric problems. The sleep is computed from TimeTicks::Now rather than
354 // NextWorkInfo::recent_now because recent_now is strictly earlier than
355 // current wall-clock. Using an earlier wall clock time to compute the
356 // delta to the next wakeup wall-clock time would guarantee oversleep.
357 // If wakeup_time is in the past, the delta below will be negative and the
358 // timer is set immediately.
359 timer_event->fflags = NOTE_USECONDS;
360 timer_event->data = (wakeup_time - base::TimeTicks::Now()).InMicroseconds();
361
362 if (!leeway.is_zero() && g_timer_slack.load(std::memory_order_relaxed)) {
363 // Specify slack based on |leeway|.
364 // See "man kqueue" in recent macOSen for documentation.
365 timer_event->fflags |= NOTE_LEEWAY;
366 timer_event->ext[1] = static_cast<uint64_t>(leeway.InMicroseconds());
367 }
368 }
369 }
370
StopWatchingMachPort(MachPortWatchController * controller)371 bool MessagePumpKqueue::StopWatchingMachPort(
372 MachPortWatchController* controller) {
373 mach_port_t port = controller->port();
374 controller->Reset();
375 port_controllers_.Remove(port);
376
377 kevent64_s event{};
378 event.ident = port;
379 event.filter = EVFILT_MACHPORT;
380 event.flags = EV_DELETE;
381 --event_count_;
382 int rv = ChangeOneEvent(kqueue_, &event);
383 if (rv < 0) {
384 DPLOG(ERROR) << "kevent64";
385 return false;
386 }
387
388 return true;
389 }
390
StopWatchingFileDescriptor(FdWatchController * controller)391 bool MessagePumpKqueue::StopWatchingFileDescriptor(
392 FdWatchController* controller) {
393 int fd = controller->fd();
394 int mode = controller->mode();
395 controller->Reset();
396
397 if (fd < 0)
398 return true;
399
400 std::vector<kevent64_s> events;
401
402 kevent64_s base_event{};
403 base_event.ident = static_cast<uint64_t>(fd);
404 base_event.flags = EV_DELETE;
405
406 if (mode & Mode::WATCH_READ) {
407 base_event.filter = EVFILT_READ;
408 events.push_back(base_event);
409 }
410 if (mode & Mode::WATCH_WRITE) {
411 base_event.filter = EVFILT_WRITE;
412 events.push_back(base_event);
413 }
414
415 int rv = HANDLE_EINTR(kevent64(kqueue_.get(), events.data(),
416 checked_cast<int>(events.size()), nullptr, 0,
417 0, nullptr));
418 DPLOG_IF(ERROR, rv < 0) << "StopWatchingFileDescriptor kevent64";
419
420 // The keys for the IDMap aren't recorded anywhere (they're attached to the
421 // kevent object in the kernel), so locate the entries by controller pointer.
422 for (IDMap<FdWatchController*, uint64_t>::iterator it(&fd_controllers_);
423 !it.IsAtEnd(); it.Advance()) {
424 if (it.GetCurrentValue() == controller) {
425 fd_controllers_.Remove(it.GetCurrentKey());
426 }
427 }
428
429 event_count_ -= events.size();
430
431 return rv >= 0;
432 }
433
DoInternalWork(Delegate * delegate,Delegate::NextWorkInfo * next_work_info)434 bool MessagePumpKqueue::DoInternalWork(Delegate* delegate,
435 Delegate::NextWorkInfo* next_work_info) {
436 if (events_.size() < event_count_) {
437 events_.resize(event_count_);
438 }
439
440 bool immediate = next_work_info == nullptr;
441 unsigned int flags = immediate ? KEVENT_FLAG_IMMEDIATE : 0;
442
443 if (!immediate) {
444 MaybeUpdateWakeupTimer(next_work_info->delayed_run_time,
445 next_work_info->leeway);
446 DCHECK_EQ(scheduled_wakeup_time_, next_work_info->delayed_run_time);
447 delegate->BeforeWait();
448 }
449
450 int rv =
451 HANDLE_EINTR(kevent64(kqueue_.get(), nullptr, 0, events_.data(),
452 checked_cast<int>(events_.size()), flags, nullptr));
453 if (rv == 0) {
454 // No events to dispatch so no need to call ProcessEvents().
455 return false;
456 }
457
458 PCHECK(rv > 0) << "kevent64";
459 return ProcessEvents(delegate, static_cast<size_t>(rv));
460 }
461
ProcessEvents(Delegate * delegate,size_t count)462 bool MessagePumpKqueue::ProcessEvents(Delegate* delegate, size_t count) {
463 bool did_work = false;
464
465 delegate->BeginNativeWorkBeforeDoWork();
466 for (size_t i = 0; i < count; ++i) {
467 auto* event = &events_[i];
468 if (event->filter == EVFILT_READ || event->filter == EVFILT_WRITE) {
469 did_work = true;
470
471 FdWatchController* controller = fd_controllers_.Lookup(event->udata);
472 if (!controller) {
473 // The controller was removed by some other work callout before
474 // this event could be processed.
475 continue;
476 }
477 FdWatcher* fd_watcher = controller->watcher();
478
479 if (event->flags & EV_ONESHOT) {
480 // If this was a one-shot event, the Controller needs to stop tracking
481 // the descriptor, so it is not double-removed when it is told to stop
482 // watching.
483 controller->Reset();
484 fd_controllers_.Remove(event->udata);
485 --event_count_;
486 }
487
488 auto scoped_do_work_item = delegate->BeginWorkItem();
489 // WatchFileDescriptor() originally upcasts event->ident from an int.
490 if (event->filter == EVFILT_READ) {
491 fd_watcher->OnFileCanReadWithoutBlocking(
492 static_cast<int>(event->ident));
493 } else if (event->filter == EVFILT_WRITE) {
494 fd_watcher->OnFileCanWriteWithoutBlocking(
495 static_cast<int>(event->ident));
496 }
497 } else if (event->filter == EVFILT_MACHPORT) {
498 // WatchMachReceivePort() originally sets event->ident from a mach_port_t.
499 mach_port_t port = static_cast<mach_port_t>(event->ident);
500 if (port == wakeup_.get()) {
501 // The wakeup event has been received, do not treat this as "doing
502 // work", this just wakes up the pump.
503 continue;
504 }
505
506 did_work = true;
507
508 MachPortWatchController* controller = port_controllers_.Lookup(port);
509 // The controller could have been removed by some other work callout
510 // before this event could be processed.
511 if (controller) {
512 auto scoped_do_work_item = delegate->BeginWorkItem();
513 controller->watcher()->OnMachMessageReceived(port);
514 }
515 } else if (event->filter == EVFILT_TIMER) {
516 // The wakeup timer fired.
517 #if DCHECK_IS_ON()
518 // On macOS 10.13 and earlier, kqueue timers may spuriously wake up.
519 // When this happens, the timer will be re-scheduled the next time
520 // DoInternalWork is entered, which means this doesn't lead to a
521 // spinning wait.
522 // When clock overrides are active, TimeTicks::Now may be decoupled from
523 // wall-clock time, and can therefore not be used to validate whether the
524 // expected wall-clock time has passed.
525 if (!KqueueTimersSpuriouslyWakeUp() &&
526 !subtle::ScopedTimeClockOverrides::overrides_active()) {
527 // Given the caveats above, assert that the timer didn't fire early.
528 DCHECK_LE(scheduled_wakeup_time_, base::TimeTicks::Now());
529 }
530 #endif
531 DCHECK_NE(scheduled_wakeup_time_, base::TimeTicks::Max());
532 scheduled_wakeup_time_ = base::TimeTicks::Max();
533 --event_count_;
534 } else {
535 NOTREACHED() << "Unexpected event for filter " << event->filter;
536 }
537 }
538
539 return did_work;
540 }
541
MaybeUpdateWakeupTimer(const base::TimeTicks & wakeup_time,base::TimeDelta leeway)542 void MessagePumpKqueue::MaybeUpdateWakeupTimer(
543 const base::TimeTicks& wakeup_time,
544 base::TimeDelta leeway) {
545 if (wakeup_time == scheduled_wakeup_time_) {
546 // No change in the timer setting necessary.
547 return;
548 }
549
550 if (wakeup_time == base::TimeTicks::Max()) {
551 // If the timer was already reset, don't re-reset it on a suspend toggle.
552 if (scheduled_wakeup_time_ != base::TimeTicks::Max()) {
553 // Clear the timer.
554 kevent64_s timer{};
555 SetWakeupTimerEvent(wakeup_time, leeway, &timer);
556 int rv = ChangeOneEvent(kqueue_, &timer);
557 PCHECK(rv == 0) << "kevent64, delete timer";
558 --event_count_;
559 }
560 } else {
561 // Set/reset the timer.
562 kevent64_s timer{};
563 SetWakeupTimerEvent(wakeup_time, leeway, &timer);
564 int rv = ChangeOneEvent(kqueue_, &timer);
565 PCHECK(rv == 0) << "kevent64, set timer";
566
567 // Bump the event count if we just added the timer.
568 if (scheduled_wakeup_time_ == base::TimeTicks::Max())
569 ++event_count_;
570 }
571
572 scheduled_wakeup_time_ = wakeup_time;
573 }
574
575 } // namespace base
576