xref: /aosp_15_r20/external/cronet/base/message_loop/message_pump_kqueue.cc (revision 6777b5387eb2ff775bb5750e3f5d96f37fb7352b)
1 // Copyright 2019 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "base/message_loop/message_pump_kqueue.h"
6 
7 #include <sys/errno.h>
8 
9 #include <atomic>
10 
11 #include "base/apple/mach_logging.h"
12 #include "base/apple/scoped_nsautorelease_pool.h"
13 #include "base/auto_reset.h"
14 #include "base/feature_list.h"
15 #include "base/logging.h"
16 #include "base/mac/mac_util.h"
17 #include "base/notreached.h"
18 #include "base/posix/eintr_wrapper.h"
19 #include "base/task/task_features.h"
20 #include "base/time/time_override.h"
21 
22 namespace base {
23 
24 namespace {
25 
26 // Under this feature a simplified version of the Run() function is used. It
27 // improves legibility and avoids some calls to kevent64(). Remove once
28 // crbug.com/1200141 is resolved.
29 BASE_FEATURE(kUseSimplifiedMessagePumpKqueueLoop,
30              "UseSimplifiedMessagePumpKqueueLoop",
31              base::FEATURE_DISABLED_BY_DEFAULT);
32 
33 // Caches the state of the "UseSimplifiedMessagePumpKqueueLoop".
34 std::atomic_bool g_use_simplified_version = false;
35 
36 // Caches the state of the "TimerSlackMac" feature for efficiency.
37 std::atomic_bool g_timer_slack = false;
38 
39 #if DCHECK_IS_ON()
40 // Prior to macOS 10.14, kqueue timers may spuriously wake up, because earlier
41 // wake ups race with timer resets in the kernel. As of macOS 10.14, updating a
42 // timer from the thread that reads the kqueue does not cause spurious wakeups.
43 // Note that updating a kqueue timer from one thread while another thread is
44 // waiting in a kevent64 invocation is still (inherently) racy.
KqueueTimersSpuriouslyWakeUp()45 bool KqueueTimersSpuriouslyWakeUp() {
46 #if BUILDFLAG(IS_MAC)
47   return false;
48 #else
49   // This still happens on iOS15.
50   return true;
51 #endif
52 }
53 #endif
54 
ChangeOneEvent(const ScopedFD & kqueue,kevent64_s * event)55 int ChangeOneEvent(const ScopedFD& kqueue, kevent64_s* event) {
56   return HANDLE_EINTR(kevent64(kqueue.get(), event, 1, nullptr, 0, 0, nullptr));
57 }
58 
59 }  // namespace
60 
FdWatchController(const Location & from_here)61 MessagePumpKqueue::FdWatchController::FdWatchController(
62     const Location& from_here)
63     : FdWatchControllerInterface(from_here) {}
64 
~FdWatchController()65 MessagePumpKqueue::FdWatchController::~FdWatchController() {
66   StopWatchingFileDescriptor();
67 }
68 
StopWatchingFileDescriptor()69 bool MessagePumpKqueue::FdWatchController::StopWatchingFileDescriptor() {
70   if (!pump_)
71     return true;
72   return pump_->StopWatchingFileDescriptor(this);
73 }
74 
Init(WeakPtr<MessagePumpKqueue> pump,int fd,int mode,FdWatcher * watcher)75 void MessagePumpKqueue::FdWatchController::Init(WeakPtr<MessagePumpKqueue> pump,
76                                                 int fd,
77                                                 int mode,
78                                                 FdWatcher* watcher) {
79   DCHECK_NE(fd, -1);
80   DCHECK(!watcher_);
81   DCHECK(watcher);
82   DCHECK(pump);
83   fd_ = fd;
84   mode_ = mode;
85   watcher_ = watcher;
86   pump_ = pump;
87 }
88 
Reset()89 void MessagePumpKqueue::FdWatchController::Reset() {
90   fd_ = -1;
91   mode_ = 0;
92   watcher_ = nullptr;
93   pump_ = nullptr;
94 }
95 
MachPortWatchController(const Location & from_here)96 MessagePumpKqueue::MachPortWatchController::MachPortWatchController(
97     const Location& from_here)
98     : from_here_(from_here) {}
99 
~MachPortWatchController()100 MessagePumpKqueue::MachPortWatchController::~MachPortWatchController() {
101   StopWatchingMachPort();
102 }
103 
StopWatchingMachPort()104 bool MessagePumpKqueue::MachPortWatchController::StopWatchingMachPort() {
105   if (!pump_)
106     return true;
107   return pump_->StopWatchingMachPort(this);
108 }
109 
Init(WeakPtr<MessagePumpKqueue> pump,mach_port_t port,MachPortWatcher * watcher)110 void MessagePumpKqueue::MachPortWatchController::Init(
111     WeakPtr<MessagePumpKqueue> pump,
112     mach_port_t port,
113     MachPortWatcher* watcher) {
114   DCHECK(!watcher_);
115   DCHECK(watcher);
116   DCHECK(pump);
117   port_ = port;
118   watcher_ = watcher;
119   pump_ = pump;
120 }
121 
Reset()122 void MessagePumpKqueue::MachPortWatchController::Reset() {
123   port_ = MACH_PORT_NULL;
124   watcher_ = nullptr;
125   pump_ = nullptr;
126 }
127 
MessagePumpKqueue()128 MessagePumpKqueue::MessagePumpKqueue()
129     : kqueue_(kqueue()), weak_factory_(this) {
130   PCHECK(kqueue_.is_valid()) << "kqueue";
131 
132   // Create a Mach port that will be used to wake up the pump by sending
133   // a message in response to ScheduleWork(). This is significantly faster than
134   // using an EVFILT_USER event, especially when triggered across threads.
135   kern_return_t kr = mach_port_allocate(
136       mach_task_self(), MACH_PORT_RIGHT_RECEIVE,
137       base::apple::ScopedMachReceiveRight::Receiver(wakeup_).get());
138   MACH_CHECK(kr == KERN_SUCCESS, kr) << "mach_port_allocate";
139 
140   // Configure the event to directly receive the Mach message as part of the
141   // kevent64() call.
142   kevent64_s event{};
143   event.ident = wakeup_.get();
144   event.filter = EVFILT_MACHPORT;
145   event.flags = EV_ADD;
146   event.fflags = MACH_RCV_MSG;
147   event.ext[0] = reinterpret_cast<uint64_t>(&wakeup_buffer_);
148   event.ext[1] = sizeof(wakeup_buffer_);
149 
150   int rv = ChangeOneEvent(kqueue_, &event);
151   PCHECK(rv == 0) << "kevent64";
152 }
153 
~MessagePumpKqueue()154 MessagePumpKqueue::~MessagePumpKqueue() {}
155 
InitializeFeatures()156 void MessagePumpKqueue::InitializeFeatures() {
157   g_use_simplified_version.store(
158       base::FeatureList::IsEnabled(kUseSimplifiedMessagePumpKqueueLoop),
159       std::memory_order_relaxed);
160   g_timer_slack.store(FeatureList::IsEnabled(kTimerSlackMac),
161                       std::memory_order_relaxed);
162 }
163 
Run(Delegate * delegate)164 void MessagePumpKqueue::Run(Delegate* delegate) {
165   AutoReset<bool> reset_keep_running(&keep_running_, true);
166 
167   if (g_use_simplified_version.load(std::memory_order_relaxed)) {
168     RunSimplified(delegate);
169   } else {
170     while (keep_running_) {
171       apple::ScopedNSAutoreleasePool pool;
172 
173       bool do_more_work = DoInternalWork(delegate, nullptr);
174       if (!keep_running_)
175         break;
176 
177       Delegate::NextWorkInfo next_work_info = delegate->DoWork();
178       do_more_work |= next_work_info.is_immediate();
179       if (!keep_running_)
180         break;
181 
182       if (do_more_work)
183         continue;
184 
185       do_more_work |= delegate->DoIdleWork();
186       if (!keep_running_)
187         break;
188 
189       if (do_more_work)
190         continue;
191 
192       DoInternalWork(delegate, &next_work_info);
193     }
194   }
195 }
196 
RunSimplified(Delegate * delegate)197 void MessagePumpKqueue::RunSimplified(Delegate* delegate) {
198   // Look for native work once before the loop starts. Without this call the
199   // loop would break without checking native work even once in cases where
200   // QuitWhenIdle was used. This is sometimes the case in tests.
201   DoInternalWork(delegate, nullptr);
202 
203   while (keep_running_) {
204     apple::ScopedNSAutoreleasePool pool;
205 
206     Delegate::NextWorkInfo next_work_info = delegate->DoWork();
207     if (!keep_running_)
208       break;
209 
210     if (!next_work_info.is_immediate()) {
211       delegate->DoIdleWork();
212     }
213     if (!keep_running_)
214       break;
215 
216     DoInternalWork(delegate, &next_work_info);
217   }
218 }
219 
Quit()220 void MessagePumpKqueue::Quit() {
221   keep_running_ = false;
222   ScheduleWork();
223 }
224 
ScheduleWork()225 void MessagePumpKqueue::ScheduleWork() {
226   mach_msg_empty_send_t message{};
227   message.header.msgh_size = sizeof(message);
228   message.header.msgh_bits =
229       MACH_MSGH_BITS_REMOTE(MACH_MSG_TYPE_MAKE_SEND_ONCE);
230   message.header.msgh_remote_port = wakeup_.get();
231   kern_return_t kr = mach_msg_send(&message.header);
232   if (kr != KERN_SUCCESS) {
233     // If ScheduleWork() is being called by other threads faster than the pump
234     // can dispatch work, the kernel message queue for the wakeup port can fill
235     // up (this happens under base_perftests, for example). The kernel does
236     // return a SEND_ONCE right in the case of failure, which must be destroyed
237     // to avoid leaking.
238     MACH_DLOG_IF(ERROR, (kr & ~MACH_MSG_IPC_SPACE) != MACH_SEND_NO_BUFFER, kr)
239         << "mach_msg_send";
240     mach_msg_destroy(&message.header);
241   }
242 }
243 
ScheduleDelayedWork(const Delegate::NextWorkInfo & next_work_info)244 void MessagePumpKqueue::ScheduleDelayedWork(
245     const Delegate::NextWorkInfo& next_work_info) {
246   // Nothing to do. This MessagePump uses DoWork().
247 }
248 
WatchMachReceivePort(mach_port_t port,MachPortWatchController * controller,MachPortWatcher * delegate)249 bool MessagePumpKqueue::WatchMachReceivePort(
250     mach_port_t port,
251     MachPortWatchController* controller,
252     MachPortWatcher* delegate) {
253   DCHECK(port != MACH_PORT_NULL);
254   DCHECK(controller);
255   DCHECK(delegate);
256 
257   if (controller->port() != MACH_PORT_NULL) {
258     DLOG(ERROR)
259         << "Cannot use the same MachPortWatchController while it is active";
260     return false;
261   }
262 
263   kevent64_s event{};
264   event.ident = port;
265   event.filter = EVFILT_MACHPORT;
266   event.flags = EV_ADD;
267   int rv = ChangeOneEvent(kqueue_, &event);
268   if (rv < 0) {
269     DPLOG(ERROR) << "kevent64";
270     return false;
271   }
272   ++event_count_;
273 
274   controller->Init(weak_factory_.GetWeakPtr(), port, delegate);
275   port_controllers_.AddWithID(controller, port);
276 
277   return true;
278 }
279 
AdjustDelayedRunTime(TimeTicks earliest_time,TimeTicks run_time,TimeTicks latest_time)280 TimeTicks MessagePumpKqueue::AdjustDelayedRunTime(TimeTicks earliest_time,
281                                                   TimeTicks run_time,
282                                                   TimeTicks latest_time) {
283   if (g_timer_slack.load(std::memory_order_relaxed)) {
284     return earliest_time;
285   }
286   return MessagePump::AdjustDelayedRunTime(earliest_time, run_time,
287                                            latest_time);
288 }
289 
WatchFileDescriptor(int fd,bool persistent,int mode,FdWatchController * controller,FdWatcher * delegate)290 bool MessagePumpKqueue::WatchFileDescriptor(int fd,
291                                             bool persistent,
292                                             int mode,
293                                             FdWatchController* controller,
294                                             FdWatcher* delegate) {
295   DCHECK_GE(fd, 0);
296   DCHECK(controller);
297   DCHECK(delegate);
298   DCHECK_NE(mode & Mode::WATCH_READ_WRITE, 0);
299 
300   if (controller->fd() != -1 && controller->fd() != fd) {
301     DLOG(ERROR) << "Cannot use the same FdWatchController on two different FDs";
302     return false;
303   }
304   StopWatchingFileDescriptor(controller);
305 
306   std::vector<kevent64_s> events;
307 
308   kevent64_s base_event{};
309   base_event.ident = static_cast<uint64_t>(fd);
310   base_event.flags = EV_ADD | (!persistent ? EV_ONESHOT : 0);
311 
312   if (mode & Mode::WATCH_READ) {
313     base_event.filter = EVFILT_READ;
314     base_event.udata = fd_controllers_.Add(controller);
315     events.push_back(base_event);
316   }
317   if (mode & Mode::WATCH_WRITE) {
318     base_event.filter = EVFILT_WRITE;
319     base_event.udata = fd_controllers_.Add(controller);
320     events.push_back(base_event);
321   }
322 
323   int rv = HANDLE_EINTR(kevent64(kqueue_.get(), events.data(),
324                                  checked_cast<int>(events.size()), nullptr, 0,
325                                  0, nullptr));
326   if (rv < 0) {
327     DPLOG(ERROR) << "WatchFileDescriptor kevent64";
328     return false;
329   }
330 
331   event_count_ += events.size();
332   controller->Init(weak_factory_.GetWeakPtr(), fd, mode, delegate);
333 
334   return true;
335 }
336 
SetWakeupTimerEvent(const base::TimeTicks & wakeup_time,base::TimeDelta leeway,kevent64_s * timer_event)337 void MessagePumpKqueue::SetWakeupTimerEvent(const base::TimeTicks& wakeup_time,
338                                             base::TimeDelta leeway,
339                                             kevent64_s* timer_event) {
340   // The ident of the wakeup timer. There's only the one timer as the pair
341   // (ident, filter) is the identity of the event.
342   constexpr uint64_t kWakeupTimerIdent = 0x0;
343   timer_event->ident = kWakeupTimerIdent;
344   timer_event->filter = EVFILT_TIMER;
345   if (wakeup_time == base::TimeTicks::Max()) {
346     timer_event->flags = EV_DELETE;
347   } else {
348     timer_event->filter = EVFILT_TIMER;
349     // This updates the timer if it already exists in |kqueue_|.
350     timer_event->flags = EV_ADD | EV_ONESHOT;
351 
352     // Specify the sleep in microseconds to avoid undersleeping due to
353     // numeric problems. The sleep is computed from TimeTicks::Now rather than
354     // NextWorkInfo::recent_now because recent_now is strictly earlier than
355     // current wall-clock. Using an earlier wall clock time  to compute the
356     // delta to the next wakeup wall-clock time would guarantee oversleep.
357     // If wakeup_time is in the past, the delta below will be negative and the
358     // timer is set immediately.
359     timer_event->fflags = NOTE_USECONDS;
360     timer_event->data = (wakeup_time - base::TimeTicks::Now()).InMicroseconds();
361 
362     if (!leeway.is_zero() && g_timer_slack.load(std::memory_order_relaxed)) {
363       // Specify slack based on |leeway|.
364       // See "man kqueue" in recent macOSen for documentation.
365       timer_event->fflags |= NOTE_LEEWAY;
366       timer_event->ext[1] = static_cast<uint64_t>(leeway.InMicroseconds());
367     }
368   }
369 }
370 
StopWatchingMachPort(MachPortWatchController * controller)371 bool MessagePumpKqueue::StopWatchingMachPort(
372     MachPortWatchController* controller) {
373   mach_port_t port = controller->port();
374   controller->Reset();
375   port_controllers_.Remove(port);
376 
377   kevent64_s event{};
378   event.ident = port;
379   event.filter = EVFILT_MACHPORT;
380   event.flags = EV_DELETE;
381   --event_count_;
382   int rv = ChangeOneEvent(kqueue_, &event);
383   if (rv < 0) {
384     DPLOG(ERROR) << "kevent64";
385     return false;
386   }
387 
388   return true;
389 }
390 
StopWatchingFileDescriptor(FdWatchController * controller)391 bool MessagePumpKqueue::StopWatchingFileDescriptor(
392     FdWatchController* controller) {
393   int fd = controller->fd();
394   int mode = controller->mode();
395   controller->Reset();
396 
397   if (fd < 0)
398     return true;
399 
400   std::vector<kevent64_s> events;
401 
402   kevent64_s base_event{};
403   base_event.ident = static_cast<uint64_t>(fd);
404   base_event.flags = EV_DELETE;
405 
406   if (mode & Mode::WATCH_READ) {
407     base_event.filter = EVFILT_READ;
408     events.push_back(base_event);
409   }
410   if (mode & Mode::WATCH_WRITE) {
411     base_event.filter = EVFILT_WRITE;
412     events.push_back(base_event);
413   }
414 
415   int rv = HANDLE_EINTR(kevent64(kqueue_.get(), events.data(),
416                                  checked_cast<int>(events.size()), nullptr, 0,
417                                  0, nullptr));
418   DPLOG_IF(ERROR, rv < 0) << "StopWatchingFileDescriptor kevent64";
419 
420   // The keys for the IDMap aren't recorded anywhere (they're attached to the
421   // kevent object in the kernel), so locate the entries by controller pointer.
422   for (IDMap<FdWatchController*, uint64_t>::iterator it(&fd_controllers_);
423        !it.IsAtEnd(); it.Advance()) {
424     if (it.GetCurrentValue() == controller) {
425       fd_controllers_.Remove(it.GetCurrentKey());
426     }
427   }
428 
429   event_count_ -= events.size();
430 
431   return rv >= 0;
432 }
433 
DoInternalWork(Delegate * delegate,Delegate::NextWorkInfo * next_work_info)434 bool MessagePumpKqueue::DoInternalWork(Delegate* delegate,
435                                        Delegate::NextWorkInfo* next_work_info) {
436   if (events_.size() < event_count_) {
437     events_.resize(event_count_);
438   }
439 
440   bool immediate = next_work_info == nullptr;
441   unsigned int flags = immediate ? KEVENT_FLAG_IMMEDIATE : 0;
442 
443   if (!immediate) {
444     MaybeUpdateWakeupTimer(next_work_info->delayed_run_time,
445                            next_work_info->leeway);
446     DCHECK_EQ(scheduled_wakeup_time_, next_work_info->delayed_run_time);
447     delegate->BeforeWait();
448   }
449 
450   int rv =
451       HANDLE_EINTR(kevent64(kqueue_.get(), nullptr, 0, events_.data(),
452                             checked_cast<int>(events_.size()), flags, nullptr));
453   if (rv == 0) {
454     // No events to dispatch so no need to call ProcessEvents().
455     return false;
456   }
457 
458   PCHECK(rv > 0) << "kevent64";
459   return ProcessEvents(delegate, static_cast<size_t>(rv));
460 }
461 
ProcessEvents(Delegate * delegate,size_t count)462 bool MessagePumpKqueue::ProcessEvents(Delegate* delegate, size_t count) {
463   bool did_work = false;
464 
465   delegate->BeginNativeWorkBeforeDoWork();
466   for (size_t i = 0; i < count; ++i) {
467     auto* event = &events_[i];
468     if (event->filter == EVFILT_READ || event->filter == EVFILT_WRITE) {
469       did_work = true;
470 
471       FdWatchController* controller = fd_controllers_.Lookup(event->udata);
472       if (!controller) {
473         // The controller was removed by some other work callout before
474         // this event could be processed.
475         continue;
476       }
477       FdWatcher* fd_watcher = controller->watcher();
478 
479       if (event->flags & EV_ONESHOT) {
480         // If this was a one-shot event, the Controller needs to stop tracking
481         // the descriptor, so it is not double-removed when it is told to stop
482         // watching.
483         controller->Reset();
484         fd_controllers_.Remove(event->udata);
485         --event_count_;
486       }
487 
488       auto scoped_do_work_item = delegate->BeginWorkItem();
489       // WatchFileDescriptor() originally upcasts event->ident from an int.
490       if (event->filter == EVFILT_READ) {
491         fd_watcher->OnFileCanReadWithoutBlocking(
492             static_cast<int>(event->ident));
493       } else if (event->filter == EVFILT_WRITE) {
494         fd_watcher->OnFileCanWriteWithoutBlocking(
495             static_cast<int>(event->ident));
496       }
497     } else if (event->filter == EVFILT_MACHPORT) {
498       // WatchMachReceivePort() originally sets event->ident from a mach_port_t.
499       mach_port_t port = static_cast<mach_port_t>(event->ident);
500       if (port == wakeup_.get()) {
501         // The wakeup event has been received, do not treat this as "doing
502         // work", this just wakes up the pump.
503         continue;
504       }
505 
506       did_work = true;
507 
508       MachPortWatchController* controller = port_controllers_.Lookup(port);
509       // The controller could have been removed by some other work callout
510       // before this event could be processed.
511       if (controller) {
512         auto scoped_do_work_item = delegate->BeginWorkItem();
513         controller->watcher()->OnMachMessageReceived(port);
514       }
515     } else if (event->filter == EVFILT_TIMER) {
516       // The wakeup timer fired.
517 #if DCHECK_IS_ON()
518       // On macOS 10.13 and earlier, kqueue timers may spuriously wake up.
519       // When this happens, the timer will be re-scheduled the next time
520       // DoInternalWork is entered, which means this doesn't lead to a
521       // spinning wait.
522       // When clock overrides are active, TimeTicks::Now may be decoupled from
523       // wall-clock time, and can therefore not be used to validate whether the
524       // expected wall-clock time has passed.
525       if (!KqueueTimersSpuriouslyWakeUp() &&
526           !subtle::ScopedTimeClockOverrides::overrides_active()) {
527         // Given the caveats above, assert that the timer didn't fire early.
528         DCHECK_LE(scheduled_wakeup_time_, base::TimeTicks::Now());
529       }
530 #endif
531       DCHECK_NE(scheduled_wakeup_time_, base::TimeTicks::Max());
532       scheduled_wakeup_time_ = base::TimeTicks::Max();
533       --event_count_;
534     } else {
535       NOTREACHED() << "Unexpected event for filter " << event->filter;
536     }
537   }
538 
539   return did_work;
540 }
541 
MaybeUpdateWakeupTimer(const base::TimeTicks & wakeup_time,base::TimeDelta leeway)542 void MessagePumpKqueue::MaybeUpdateWakeupTimer(
543     const base::TimeTicks& wakeup_time,
544     base::TimeDelta leeway) {
545   if (wakeup_time == scheduled_wakeup_time_) {
546     // No change in the timer setting necessary.
547     return;
548   }
549 
550   if (wakeup_time == base::TimeTicks::Max()) {
551     // If the timer was already reset, don't re-reset it on a suspend toggle.
552     if (scheduled_wakeup_time_ != base::TimeTicks::Max()) {
553       // Clear the timer.
554       kevent64_s timer{};
555       SetWakeupTimerEvent(wakeup_time, leeway, &timer);
556       int rv = ChangeOneEvent(kqueue_, &timer);
557       PCHECK(rv == 0) << "kevent64, delete timer";
558       --event_count_;
559     }
560   } else {
561     // Set/reset the timer.
562     kevent64_s timer{};
563     SetWakeupTimerEvent(wakeup_time, leeway, &timer);
564     int rv = ChangeOneEvent(kqueue_, &timer);
565     PCHECK(rv == 0) << "kevent64, set timer";
566 
567     // Bump the event count if we just added the timer.
568     if (scheduled_wakeup_time_ == base::TimeTicks::Max())
569       ++event_count_;
570   }
571 
572   scheduled_wakeup_time_ = wakeup_time;
573 }
574 
575 }  // namespace base
576