// Copyright 2019 The Chromium Authors // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. #include "base/message_loop/message_pump_kqueue.h" #include #include #include "base/apple/mach_logging.h" #include "base/apple/scoped_nsautorelease_pool.h" #include "base/auto_reset.h" #include "base/feature_list.h" #include "base/logging.h" #include "base/mac/mac_util.h" #include "base/notreached.h" #include "base/posix/eintr_wrapper.h" #include "base/task/task_features.h" #include "base/time/time_override.h" namespace base { namespace { // Under this feature a simplified version of the Run() function is used. It // improves legibility and avoids some calls to kevent64(). Remove once // crbug.com/1200141 is resolved. BASE_FEATURE(kUseSimplifiedMessagePumpKqueueLoop, "UseSimplifiedMessagePumpKqueueLoop", base::FEATURE_DISABLED_BY_DEFAULT); // Caches the state of the "UseSimplifiedMessagePumpKqueueLoop". std::atomic_bool g_use_simplified_version = false; // Caches the state of the "TimerSlackMac" feature for efficiency. std::atomic_bool g_timer_slack = false; #if DCHECK_IS_ON() // Prior to macOS 10.14, kqueue timers may spuriously wake up, because earlier // wake ups race with timer resets in the kernel. As of macOS 10.14, updating a // timer from the thread that reads the kqueue does not cause spurious wakeups. // Note that updating a kqueue timer from one thread while another thread is // waiting in a kevent64 invocation is still (inherently) racy. bool KqueueTimersSpuriouslyWakeUp() { #if BUILDFLAG(IS_MAC) return false; #else // This still happens on iOS15. return true; #endif } #endif int ChangeOneEvent(const ScopedFD& kqueue, kevent64_s* event) { return HANDLE_EINTR(kevent64(kqueue.get(), event, 1, nullptr, 0, 0, nullptr)); } } // namespace MessagePumpKqueue::FdWatchController::FdWatchController( const Location& from_here) : FdWatchControllerInterface(from_here) {} MessagePumpKqueue::FdWatchController::~FdWatchController() { StopWatchingFileDescriptor(); } bool MessagePumpKqueue::FdWatchController::StopWatchingFileDescriptor() { if (!pump_) return true; return pump_->StopWatchingFileDescriptor(this); } void MessagePumpKqueue::FdWatchController::Init(WeakPtr pump, int fd, int mode, FdWatcher* watcher) { DCHECK_NE(fd, -1); DCHECK(!watcher_); DCHECK(watcher); DCHECK(pump); fd_ = fd; mode_ = mode; watcher_ = watcher; pump_ = pump; } void MessagePumpKqueue::FdWatchController::Reset() { fd_ = -1; mode_ = 0; watcher_ = nullptr; pump_ = nullptr; } MessagePumpKqueue::MachPortWatchController::MachPortWatchController( const Location& from_here) : from_here_(from_here) {} MessagePumpKqueue::MachPortWatchController::~MachPortWatchController() { StopWatchingMachPort(); } bool MessagePumpKqueue::MachPortWatchController::StopWatchingMachPort() { if (!pump_) return true; return pump_->StopWatchingMachPort(this); } void MessagePumpKqueue::MachPortWatchController::Init( WeakPtr pump, mach_port_t port, MachPortWatcher* watcher) { DCHECK(!watcher_); DCHECK(watcher); DCHECK(pump); port_ = port; watcher_ = watcher; pump_ = pump; } void MessagePumpKqueue::MachPortWatchController::Reset() { port_ = MACH_PORT_NULL; watcher_ = nullptr; pump_ = nullptr; } MessagePumpKqueue::MessagePumpKqueue() : kqueue_(kqueue()), weak_factory_(this) { PCHECK(kqueue_.is_valid()) << "kqueue"; // Create a Mach port that will be used to wake up the pump by sending // a message in response to ScheduleWork(). This is significantly faster than // using an EVFILT_USER event, especially when triggered across threads. kern_return_t kr = mach_port_allocate( mach_task_self(), MACH_PORT_RIGHT_RECEIVE, base::apple::ScopedMachReceiveRight::Receiver(wakeup_).get()); MACH_CHECK(kr == KERN_SUCCESS, kr) << "mach_port_allocate"; // Configure the event to directly receive the Mach message as part of the // kevent64() call. kevent64_s event{}; event.ident = wakeup_.get(); event.filter = EVFILT_MACHPORT; event.flags = EV_ADD; event.fflags = MACH_RCV_MSG; event.ext[0] = reinterpret_cast(&wakeup_buffer_); event.ext[1] = sizeof(wakeup_buffer_); int rv = ChangeOneEvent(kqueue_, &event); PCHECK(rv == 0) << "kevent64"; } MessagePumpKqueue::~MessagePumpKqueue() {} void MessagePumpKqueue::InitializeFeatures() { g_use_simplified_version.store( base::FeatureList::IsEnabled(kUseSimplifiedMessagePumpKqueueLoop), std::memory_order_relaxed); g_timer_slack.store(FeatureList::IsEnabled(kTimerSlackMac), std::memory_order_relaxed); } void MessagePumpKqueue::Run(Delegate* delegate) { AutoReset reset_keep_running(&keep_running_, true); if (g_use_simplified_version.load(std::memory_order_relaxed)) { RunSimplified(delegate); } else { while (keep_running_) { apple::ScopedNSAutoreleasePool pool; bool do_more_work = DoInternalWork(delegate, nullptr); if (!keep_running_) break; Delegate::NextWorkInfo next_work_info = delegate->DoWork(); do_more_work |= next_work_info.is_immediate(); if (!keep_running_) break; if (do_more_work) continue; do_more_work |= delegate->DoIdleWork(); if (!keep_running_) break; if (do_more_work) continue; DoInternalWork(delegate, &next_work_info); } } } void MessagePumpKqueue::RunSimplified(Delegate* delegate) { // Look for native work once before the loop starts. Without this call the // loop would break without checking native work even once in cases where // QuitWhenIdle was used. This is sometimes the case in tests. DoInternalWork(delegate, nullptr); while (keep_running_) { apple::ScopedNSAutoreleasePool pool; Delegate::NextWorkInfo next_work_info = delegate->DoWork(); if (!keep_running_) break; if (!next_work_info.is_immediate()) { delegate->DoIdleWork(); } if (!keep_running_) break; DoInternalWork(delegate, &next_work_info); } } void MessagePumpKqueue::Quit() { keep_running_ = false; ScheduleWork(); } void MessagePumpKqueue::ScheduleWork() { mach_msg_empty_send_t message{}; message.header.msgh_size = sizeof(message); message.header.msgh_bits = MACH_MSGH_BITS_REMOTE(MACH_MSG_TYPE_MAKE_SEND_ONCE); message.header.msgh_remote_port = wakeup_.get(); kern_return_t kr = mach_msg_send(&message.header); if (kr != KERN_SUCCESS) { // If ScheduleWork() is being called by other threads faster than the pump // can dispatch work, the kernel message queue for the wakeup port can fill // up (this happens under base_perftests, for example). The kernel does // return a SEND_ONCE right in the case of failure, which must be destroyed // to avoid leaking. MACH_DLOG_IF(ERROR, (kr & ~MACH_MSG_IPC_SPACE) != MACH_SEND_NO_BUFFER, kr) << "mach_msg_send"; mach_msg_destroy(&message.header); } } void MessagePumpKqueue::ScheduleDelayedWork( const Delegate::NextWorkInfo& next_work_info) { // Nothing to do. This MessagePump uses DoWork(). } bool MessagePumpKqueue::WatchMachReceivePort( mach_port_t port, MachPortWatchController* controller, MachPortWatcher* delegate) { DCHECK(port != MACH_PORT_NULL); DCHECK(controller); DCHECK(delegate); if (controller->port() != MACH_PORT_NULL) { DLOG(ERROR) << "Cannot use the same MachPortWatchController while it is active"; return false; } kevent64_s event{}; event.ident = port; event.filter = EVFILT_MACHPORT; event.flags = EV_ADD; int rv = ChangeOneEvent(kqueue_, &event); if (rv < 0) { DPLOG(ERROR) << "kevent64"; return false; } ++event_count_; controller->Init(weak_factory_.GetWeakPtr(), port, delegate); port_controllers_.AddWithID(controller, port); return true; } TimeTicks MessagePumpKqueue::AdjustDelayedRunTime(TimeTicks earliest_time, TimeTicks run_time, TimeTicks latest_time) { if (g_timer_slack.load(std::memory_order_relaxed)) { return earliest_time; } return MessagePump::AdjustDelayedRunTime(earliest_time, run_time, latest_time); } bool MessagePumpKqueue::WatchFileDescriptor(int fd, bool persistent, int mode, FdWatchController* controller, FdWatcher* delegate) { DCHECK_GE(fd, 0); DCHECK(controller); DCHECK(delegate); DCHECK_NE(mode & Mode::WATCH_READ_WRITE, 0); if (controller->fd() != -1 && controller->fd() != fd) { DLOG(ERROR) << "Cannot use the same FdWatchController on two different FDs"; return false; } StopWatchingFileDescriptor(controller); std::vector events; kevent64_s base_event{}; base_event.ident = static_cast(fd); base_event.flags = EV_ADD | (!persistent ? EV_ONESHOT : 0); if (mode & Mode::WATCH_READ) { base_event.filter = EVFILT_READ; base_event.udata = fd_controllers_.Add(controller); events.push_back(base_event); } if (mode & Mode::WATCH_WRITE) { base_event.filter = EVFILT_WRITE; base_event.udata = fd_controllers_.Add(controller); events.push_back(base_event); } int rv = HANDLE_EINTR(kevent64(kqueue_.get(), events.data(), checked_cast(events.size()), nullptr, 0, 0, nullptr)); if (rv < 0) { DPLOG(ERROR) << "WatchFileDescriptor kevent64"; return false; } event_count_ += events.size(); controller->Init(weak_factory_.GetWeakPtr(), fd, mode, delegate); return true; } void MessagePumpKqueue::SetWakeupTimerEvent(const base::TimeTicks& wakeup_time, base::TimeDelta leeway, kevent64_s* timer_event) { // The ident of the wakeup timer. There's only the one timer as the pair // (ident, filter) is the identity of the event. constexpr uint64_t kWakeupTimerIdent = 0x0; timer_event->ident = kWakeupTimerIdent; timer_event->filter = EVFILT_TIMER; if (wakeup_time == base::TimeTicks::Max()) { timer_event->flags = EV_DELETE; } else { timer_event->filter = EVFILT_TIMER; // This updates the timer if it already exists in |kqueue_|. timer_event->flags = EV_ADD | EV_ONESHOT; // Specify the sleep in microseconds to avoid undersleeping due to // numeric problems. The sleep is computed from TimeTicks::Now rather than // NextWorkInfo::recent_now because recent_now is strictly earlier than // current wall-clock. Using an earlier wall clock time to compute the // delta to the next wakeup wall-clock time would guarantee oversleep. // If wakeup_time is in the past, the delta below will be negative and the // timer is set immediately. timer_event->fflags = NOTE_USECONDS; timer_event->data = (wakeup_time - base::TimeTicks::Now()).InMicroseconds(); if (!leeway.is_zero() && g_timer_slack.load(std::memory_order_relaxed)) { // Specify slack based on |leeway|. // See "man kqueue" in recent macOSen for documentation. timer_event->fflags |= NOTE_LEEWAY; timer_event->ext[1] = static_cast(leeway.InMicroseconds()); } } } bool MessagePumpKqueue::StopWatchingMachPort( MachPortWatchController* controller) { mach_port_t port = controller->port(); controller->Reset(); port_controllers_.Remove(port); kevent64_s event{}; event.ident = port; event.filter = EVFILT_MACHPORT; event.flags = EV_DELETE; --event_count_; int rv = ChangeOneEvent(kqueue_, &event); if (rv < 0) { DPLOG(ERROR) << "kevent64"; return false; } return true; } bool MessagePumpKqueue::StopWatchingFileDescriptor( FdWatchController* controller) { int fd = controller->fd(); int mode = controller->mode(); controller->Reset(); if (fd < 0) return true; std::vector events; kevent64_s base_event{}; base_event.ident = static_cast(fd); base_event.flags = EV_DELETE; if (mode & Mode::WATCH_READ) { base_event.filter = EVFILT_READ; events.push_back(base_event); } if (mode & Mode::WATCH_WRITE) { base_event.filter = EVFILT_WRITE; events.push_back(base_event); } int rv = HANDLE_EINTR(kevent64(kqueue_.get(), events.data(), checked_cast(events.size()), nullptr, 0, 0, nullptr)); DPLOG_IF(ERROR, rv < 0) << "StopWatchingFileDescriptor kevent64"; // The keys for the IDMap aren't recorded anywhere (they're attached to the // kevent object in the kernel), so locate the entries by controller pointer. for (IDMap::iterator it(&fd_controllers_); !it.IsAtEnd(); it.Advance()) { if (it.GetCurrentValue() == controller) { fd_controllers_.Remove(it.GetCurrentKey()); } } event_count_ -= events.size(); return rv >= 0; } bool MessagePumpKqueue::DoInternalWork(Delegate* delegate, Delegate::NextWorkInfo* next_work_info) { if (events_.size() < event_count_) { events_.resize(event_count_); } bool immediate = next_work_info == nullptr; unsigned int flags = immediate ? KEVENT_FLAG_IMMEDIATE : 0; if (!immediate) { MaybeUpdateWakeupTimer(next_work_info->delayed_run_time, next_work_info->leeway); DCHECK_EQ(scheduled_wakeup_time_, next_work_info->delayed_run_time); delegate->BeforeWait(); } int rv = HANDLE_EINTR(kevent64(kqueue_.get(), nullptr, 0, events_.data(), checked_cast(events_.size()), flags, nullptr)); if (rv == 0) { // No events to dispatch so no need to call ProcessEvents(). return false; } PCHECK(rv > 0) << "kevent64"; return ProcessEvents(delegate, static_cast(rv)); } bool MessagePumpKqueue::ProcessEvents(Delegate* delegate, size_t count) { bool did_work = false; delegate->BeginNativeWorkBeforeDoWork(); for (size_t i = 0; i < count; ++i) { auto* event = &events_[i]; if (event->filter == EVFILT_READ || event->filter == EVFILT_WRITE) { did_work = true; FdWatchController* controller = fd_controllers_.Lookup(event->udata); if (!controller) { // The controller was removed by some other work callout before // this event could be processed. continue; } FdWatcher* fd_watcher = controller->watcher(); if (event->flags & EV_ONESHOT) { // If this was a one-shot event, the Controller needs to stop tracking // the descriptor, so it is not double-removed when it is told to stop // watching. controller->Reset(); fd_controllers_.Remove(event->udata); --event_count_; } auto scoped_do_work_item = delegate->BeginWorkItem(); // WatchFileDescriptor() originally upcasts event->ident from an int. if (event->filter == EVFILT_READ) { fd_watcher->OnFileCanReadWithoutBlocking( static_cast(event->ident)); } else if (event->filter == EVFILT_WRITE) { fd_watcher->OnFileCanWriteWithoutBlocking( static_cast(event->ident)); } } else if (event->filter == EVFILT_MACHPORT) { // WatchMachReceivePort() originally sets event->ident from a mach_port_t. mach_port_t port = static_cast(event->ident); if (port == wakeup_.get()) { // The wakeup event has been received, do not treat this as "doing // work", this just wakes up the pump. continue; } did_work = true; MachPortWatchController* controller = port_controllers_.Lookup(port); // The controller could have been removed by some other work callout // before this event could be processed. if (controller) { auto scoped_do_work_item = delegate->BeginWorkItem(); controller->watcher()->OnMachMessageReceived(port); } } else if (event->filter == EVFILT_TIMER) { // The wakeup timer fired. #if DCHECK_IS_ON() // On macOS 10.13 and earlier, kqueue timers may spuriously wake up. // When this happens, the timer will be re-scheduled the next time // DoInternalWork is entered, which means this doesn't lead to a // spinning wait. // When clock overrides are active, TimeTicks::Now may be decoupled from // wall-clock time, and can therefore not be used to validate whether the // expected wall-clock time has passed. if (!KqueueTimersSpuriouslyWakeUp() && !subtle::ScopedTimeClockOverrides::overrides_active()) { // Given the caveats above, assert that the timer didn't fire early. DCHECK_LE(scheduled_wakeup_time_, base::TimeTicks::Now()); } #endif DCHECK_NE(scheduled_wakeup_time_, base::TimeTicks::Max()); scheduled_wakeup_time_ = base::TimeTicks::Max(); --event_count_; } else { NOTREACHED() << "Unexpected event for filter " << event->filter; } } return did_work; } void MessagePumpKqueue::MaybeUpdateWakeupTimer( const base::TimeTicks& wakeup_time, base::TimeDelta leeway) { if (wakeup_time == scheduled_wakeup_time_) { // No change in the timer setting necessary. return; } if (wakeup_time == base::TimeTicks::Max()) { // If the timer was already reset, don't re-reset it on a suspend toggle. if (scheduled_wakeup_time_ != base::TimeTicks::Max()) { // Clear the timer. kevent64_s timer{}; SetWakeupTimerEvent(wakeup_time, leeway, &timer); int rv = ChangeOneEvent(kqueue_, &timer); PCHECK(rv == 0) << "kevent64, delete timer"; --event_count_; } } else { // Set/reset the timer. kevent64_s timer{}; SetWakeupTimerEvent(wakeup_time, leeway, &timer); int rv = ChangeOneEvent(kqueue_, &timer); PCHECK(rv == 0) << "kevent64, set timer"; // Bump the event count if we just added the timer. if (scheduled_wakeup_time_ == base::TimeTicks::Max()) ++event_count_; } scheduled_wakeup_time_ = wakeup_time; } } // namespace base