xref: /aosp_15_r20/external/cronet/base/message_loop/message_pump_libevent.cc (revision 6777b5387eb2ff775bb5750e3f5d96f37fb7352b)
1 // Copyright 2012 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "base/message_loop/message_pump_libevent.h"
6 
7 #include <errno.h>
8 #include <unistd.h>
9 
10 #include <memory>
11 #include <utility>
12 
13 #include "base/auto_reset.h"
14 #include "base/compiler_specific.h"
15 #include "base/files/file_util.h"
16 #include "base/logging.h"
17 #include "base/memory/raw_ptr.h"
18 #include "base/notreached.h"
19 #include "base/posix/eintr_wrapper.h"
20 #include "base/time/time.h"
21 #include "base/trace_event/base_tracing.h"
22 #include "build/build_config.h"
23 #include "third_party/libevent/event.h"
24 
25 #if BUILDFLAG(ENABLE_MESSAGE_PUMP_EPOLL)
26 #include "base/message_loop/message_pump_epoll.h"
27 #endif
28 
29 // Lifecycle of struct event
30 // Libevent uses two main data structures:
31 // struct event_base (of which there is one per message pump), and
32 // struct event (of which there is roughly one per socket).
33 // The socket's struct event is created in
34 // MessagePumpLibevent::WatchFileDescriptor(),
35 // is owned by the FdWatchController, and is destroyed in
36 // StopWatchingFileDescriptor().
37 // It is moved into and out of lists in struct event_base by
38 // the libevent functions event_add() and event_del().
39 
40 namespace base {
41 
42 #if BUILDFLAG(ENABLE_MESSAGE_PUMP_EPOLL)
43 namespace {
44 bool g_use_epoll = true;
45 }  // namespace
46 
47 BASE_FEATURE(kMessagePumpEpoll, "MessagePumpEpoll", FEATURE_ENABLED_BY_DEFAULT);
48 #endif  // BUILDFLAG(ENABLE_MESSAGE_PUMP_EPOLL)
49 
FdWatchController(const Location & from_here)50 MessagePumpLibevent::FdWatchController::FdWatchController(
51     const Location& from_here)
52     : FdWatchControllerInterface(from_here) {}
53 
~FdWatchController()54 MessagePumpLibevent::FdWatchController::~FdWatchController() {
55   CHECK(StopWatchingFileDescriptor());
56   if (was_destroyed_) {
57     DCHECK(!*was_destroyed_);
58     *was_destroyed_ = true;
59   }
60 }
61 
StopWatchingFileDescriptor()62 bool MessagePumpLibevent::FdWatchController::StopWatchingFileDescriptor() {
63   watcher_ = nullptr;
64 
65   std::unique_ptr<event> e = ReleaseEvent();
66   if (e) {
67     // event_del() is a no-op if the event isn't active.
68     int rv = event_del(e.get());
69     libevent_pump_ = nullptr;
70     return (rv == 0);
71   }
72 
73 #if BUILDFLAG(ENABLE_MESSAGE_PUMP_EPOLL)
74   if (epoll_interest_ && epoll_pump_) {
75     epoll_pump_->UnregisterInterest(epoll_interest_);
76     epoll_interest_.reset();
77     epoll_pump_.reset();
78   }
79 #endif
80 
81   return true;
82 }
83 
Init(std::unique_ptr<event> e)84 void MessagePumpLibevent::FdWatchController::Init(std::unique_ptr<event> e) {
85   DCHECK(e);
86   DCHECK(!event_);
87 
88   event_ = std::move(e);
89 }
90 
ReleaseEvent()91 std::unique_ptr<event> MessagePumpLibevent::FdWatchController::ReleaseEvent() {
92   return std::move(event_);
93 }
94 
OnFileCanReadWithoutBlocking(int fd,MessagePumpLibevent * pump)95 void MessagePumpLibevent::FdWatchController::OnFileCanReadWithoutBlocking(
96     int fd,
97     MessagePumpLibevent* pump) {
98   // Since OnFileCanWriteWithoutBlocking() gets called first, it can stop
99   // watching the file descriptor.
100   if (!watcher_)
101     return;
102   watcher_->OnFileCanReadWithoutBlocking(fd);
103 }
104 
OnFileCanWriteWithoutBlocking(int fd,MessagePumpLibevent * pump)105 void MessagePumpLibevent::FdWatchController::OnFileCanWriteWithoutBlocking(
106     int fd,
107     MessagePumpLibevent* pump) {
108   DCHECK(watcher_);
109   watcher_->OnFileCanWriteWithoutBlocking(fd);
110 }
111 
112 const scoped_refptr<MessagePumpLibevent::EpollInterest>&
AssignEpollInterest(const EpollInterestParams & params)113 MessagePumpLibevent::FdWatchController::AssignEpollInterest(
114     const EpollInterestParams& params) {
115   epoll_interest_ = MakeRefCounted<EpollInterest>(this, params);
116   return epoll_interest_;
117 }
118 
OnFdReadable()119 void MessagePumpLibevent::FdWatchController::OnFdReadable() {
120   if (!watcher_) {
121     // When a watcher is watching both read and write and both are possible, the
122     // pump will call OnFdWritable() first, followed by OnFdReadable(). But
123     // OnFdWritable() may stop or destroy the watch. If the watch is destroyed,
124     // the pump will not call OnFdReadable() at all, but if it's merely stopped,
125     // OnFdReadable() will be called while `watcher_` is  null. In this case we
126     // don't actually want to call the client.
127     return;
128   }
129   watcher_->OnFileCanReadWithoutBlocking(epoll_interest_->params().fd);
130 }
131 
OnFdWritable()132 void MessagePumpLibevent::FdWatchController::OnFdWritable() {
133   DCHECK(watcher_);
134   watcher_->OnFileCanWriteWithoutBlocking(epoll_interest_->params().fd);
135 }
136 
MessagePumpLibevent()137 MessagePumpLibevent::MessagePumpLibevent() {
138 #if BUILDFLAG(ENABLE_MESSAGE_PUMP_EPOLL)
139   if (g_use_epoll) {
140     epoll_pump_ = std::make_unique<MessagePumpEpoll>();
141     return;
142   }
143 #endif
144 
145   if (!Init())
146     NOTREACHED();
147   DCHECK_NE(wakeup_pipe_in_, -1);
148   DCHECK_NE(wakeup_pipe_out_, -1);
149   DCHECK(wakeup_event_);
150 }
151 
~MessagePumpLibevent()152 MessagePumpLibevent::~MessagePumpLibevent() {
153 #if BUILDFLAG(ENABLE_MESSAGE_PUMP_EPOLL)
154   const bool using_libevent = !epoll_pump_;
155 #else
156   const bool using_libevent = true;
157 #endif
158 
159   DCHECK(event_base_);
160   if (using_libevent) {
161     DCHECK(wakeup_event_);
162     event_del(wakeup_event_.get());
163     wakeup_event_.reset();
164     if (wakeup_pipe_in_ >= 0) {
165       if (IGNORE_EINTR(close(wakeup_pipe_in_)) < 0)
166         DPLOG(ERROR) << "close";
167     }
168     if (wakeup_pipe_out_ >= 0) {
169       if (IGNORE_EINTR(close(wakeup_pipe_out_)) < 0)
170         DPLOG(ERROR) << "close";
171     }
172   }
173   event_base_.reset();
174 }
175 
176 // Must be called early in process startup, but after FeatureList
177 // initialization. This allows MessagePumpLibevent to query and cache the
178 // enabled state of any relevant features.
179 // static
InitializeFeatures()180 void MessagePumpLibevent::InitializeFeatures() {
181 #if BUILDFLAG(ENABLE_MESSAGE_PUMP_EPOLL)
182   g_use_epoll = FeatureList::IsEnabled(kMessagePumpEpoll);
183 #endif
184 }
185 
WatchFileDescriptor(int fd,bool persistent,int mode,FdWatchController * controller,FdWatcher * delegate)186 bool MessagePumpLibevent::WatchFileDescriptor(int fd,
187                                               bool persistent,
188                                               int mode,
189                                               FdWatchController* controller,
190                                               FdWatcher* delegate) {
191 #if BUILDFLAG(ENABLE_MESSAGE_PUMP_EPOLL)
192   if (epoll_pump_) {
193     return epoll_pump_->WatchFileDescriptor(fd, persistent, mode, controller,
194                                             delegate);
195   }
196 #endif
197 
198   TRACE_EVENT("base", "MessagePumpLibevent::WatchFileDescriptor", "fd", fd,
199               "persistent", persistent, "watch_read", mode & WATCH_READ,
200               "watch_write", mode & WATCH_WRITE);
201   DCHECK_GE(fd, 0);
202   DCHECK(controller);
203   DCHECK(delegate);
204   DCHECK(mode == WATCH_READ || mode == WATCH_WRITE || mode == WATCH_READ_WRITE);
205   // WatchFileDescriptor should be called on the pump thread. It is not
206   // threadsafe, and your watcher may never be registered.
207   DCHECK(watch_file_descriptor_caller_checker_.CalledOnValidThread());
208 
209   short event_mask = persistent ? EV_PERSIST : 0;
210   if (mode & WATCH_READ) {
211     event_mask |= EV_READ;
212   }
213   if (mode & WATCH_WRITE) {
214     event_mask |= EV_WRITE;
215   }
216 
217   std::unique_ptr<event> evt(controller->ReleaseEvent());
218   if (!evt) {
219     // Ownership is transferred to the controller.
220     evt = std::make_unique<event>();
221   } else {
222     // Make sure we don't pick up any funky internal libevent masks.
223     int old_interest_mask = evt->ev_events & (EV_READ | EV_WRITE | EV_PERSIST);
224 
225     // Combine old/new event masks.
226     event_mask |= old_interest_mask;
227 
228     // Must disarm the event before we can reuse it.
229     event_del(evt.get());
230 
231     // It's illegal to use this function to listen on 2 separate fds with the
232     // same |controller|.
233     if (EVENT_FD(evt.get()) != fd) {
234       NOTREACHED() << "FDs don't match" << EVENT_FD(evt.get()) << "!=" << fd;
235       return false;
236     }
237   }
238 
239   // Set current interest mask and message pump for this event.
240   event_set(evt.get(), fd, event_mask, OnLibeventNotification, controller);
241 
242   // Tell libevent which message pump this socket will belong to when we add it.
243   if (event_base_set(event_base_.get(), evt.get())) {
244     DPLOG(ERROR) << "event_base_set(fd=" << EVENT_FD(evt.get()) << ")";
245     return false;
246   }
247 
248   // Add this socket to the list of monitored sockets.
249   if (event_add(evt.get(), nullptr)) {
250     DPLOG(ERROR) << "event_add failed(fd=" << EVENT_FD(evt.get()) << ")";
251     return false;
252   }
253 
254   controller->Init(std::move(evt));
255   controller->set_watcher(delegate);
256   controller->set_libevent_pump(this);
257   return true;
258 }
259 
260 // Tell libevent to break out of inner loop.
timer_callback(int fd,short events,void * context)261 static void timer_callback(int fd, short events, void* context) {
262   event_base_loopbreak((struct event_base*)context);
263 }
264 
265 // Reentrant!
Run(Delegate * delegate)266 void MessagePumpLibevent::Run(Delegate* delegate) {
267 #if BUILDFLAG(ENABLE_MESSAGE_PUMP_EPOLL)
268   if (epoll_pump_) {
269     epoll_pump_->Run(delegate);
270     return;
271   }
272 #endif
273 
274   RunState run_state(delegate);
275   AutoReset<raw_ptr<RunState>> auto_reset_run_state(&run_state_, &run_state);
276 
277   // event_base_loopexit() + EVLOOP_ONCE is leaky, see http://crbug.com/25641.
278   // Instead, make our own timer and reuse it on each call to event_base_loop().
279   std::unique_ptr<event> timer_event(new event);
280 
281   for (;;) {
282     // Do some work and see if the next task is ready right away.
283     Delegate::NextWorkInfo next_work_info = delegate->DoWork();
284     bool immediate_work_available = next_work_info.is_immediate();
285 
286     if (run_state.should_quit)
287       break;
288 
289     // Process native events if any are ready. Do not block waiting for more. Do
290     // not instantiate a ScopedDoWorkItem for this call as:
291     //  - This most often ends up calling OnLibeventNotification() below which
292     //    already instantiates a ScopedDoWorkItem (and doing so twice would
293     //    incorrectly appear as nested work).
294     //  - "ThreadController active" is already up per the above DoWork() so this
295     //    would only be about detecting #work-in-work-implies-nested
296     //    (ref. thread_controller.h).
297     //  - This can result in the same work as the
298     //    event_base_loop(event_base_, EVLOOP_ONCE) call at the end of this
299     //    method and that call definitely can't be in a ScopedDoWorkItem as
300     //    it includes sleep.
301     //  - The only downside is that, if a native work item other than
302     //    OnLibeventNotification() did enter a nested loop from here, it
303     //    wouldn't be labeled as such in tracing by "ThreadController active".
304     //    Contact gab@/scheduler-dev@ if a problematic trace emerges.
305     event_base_loop(event_base_.get(), EVLOOP_NONBLOCK);
306 
307     bool attempt_more_work = immediate_work_available || processed_io_events_;
308     processed_io_events_ = false;
309 
310     if (run_state.should_quit)
311       break;
312 
313     if (attempt_more_work)
314       continue;
315 
316     attempt_more_work = delegate->DoIdleWork();
317 
318     if (run_state.should_quit)
319       break;
320 
321     if (attempt_more_work)
322       continue;
323 
324     bool did_set_timer = false;
325 
326     // If there is delayed work.
327     DCHECK(!next_work_info.delayed_run_time.is_null());
328     if (!next_work_info.delayed_run_time.is_max()) {
329       const TimeDelta delay = next_work_info.remaining_delay();
330 
331       // Setup a timer to break out of the event loop at the right time.
332       struct timeval poll_tv;
333       poll_tv.tv_sec = static_cast<time_t>(delay.InSeconds());
334       poll_tv.tv_usec = delay.InMicroseconds() % Time::kMicrosecondsPerSecond;
335       event_set(timer_event.get(), -1, 0, timer_callback, event_base_.get());
336       event_base_set(event_base_.get(), timer_event.get());
337       event_add(timer_event.get(), &poll_tv);
338 
339       did_set_timer = true;
340     }
341 
342     // Block waiting for events and process all available upon waking up. This
343     // is conditionally interrupted to look for more work if we are aware of a
344     // delayed task that will need servicing.
345     delegate->BeforeWait();
346     event_base_loop(event_base_.get(), EVLOOP_ONCE);
347 
348     // We previously setup a timer to break out the event loop to look for more
349     // work. Now that we're here delete the event.
350     if (did_set_timer) {
351       event_del(timer_event.get());
352     }
353 
354     if (run_state.should_quit)
355       break;
356   }
357 }
358 
Quit()359 void MessagePumpLibevent::Quit() {
360 #if BUILDFLAG(ENABLE_MESSAGE_PUMP_EPOLL)
361   if (epoll_pump_) {
362     epoll_pump_->Quit();
363     return;
364   }
365 #endif
366 
367   DCHECK(run_state_) << "Quit was called outside of Run!";
368   // Tell both libevent and Run that they should break out of their loops.
369   run_state_->should_quit = true;
370   ScheduleWork();
371 }
372 
ScheduleWork()373 void MessagePumpLibevent::ScheduleWork() {
374 #if BUILDFLAG(ENABLE_MESSAGE_PUMP_EPOLL)
375   if (epoll_pump_) {
376     epoll_pump_->ScheduleWork();
377     return;
378   }
379 #endif
380 
381   // Tell libevent (in a threadsafe way) that it should break out of its loop.
382   char buf = 0;
383   long nwrite = HANDLE_EINTR(write(wakeup_pipe_in_, &buf, 1));
384   DPCHECK(nwrite == 1 || errno == EAGAIN) << "nwrite:" << nwrite;
385 }
386 
ScheduleDelayedWork(const Delegate::NextWorkInfo & next_work_info)387 void MessagePumpLibevent::ScheduleDelayedWork(
388     const Delegate::NextWorkInfo& next_work_info) {
389   // When using libevent we know that we can't be blocked on Run()'s
390   // `timer_event` right now since this method can only be called on the same
391   // thread as Run(). When using epoll, the pump clearly must be in between
392   // waits if we're here. In either case, any scheduled work will be seen prior
393   // to the next libevent loop or epoll wait, so there's nothing to do here.
394 }
395 
Init()396 bool MessagePumpLibevent::Init() {
397   int fds[2];
398   if (!CreateLocalNonBlockingPipe(fds)) {
399     DPLOG(ERROR) << "pipe creation failed";
400     return false;
401   }
402   wakeup_pipe_out_ = fds[0];
403   wakeup_pipe_in_ = fds[1];
404 
405   wakeup_event_ = std::make_unique<event>();
406   event_set(wakeup_event_.get(), wakeup_pipe_out_, EV_READ | EV_PERSIST,
407             OnWakeup, this);
408   event_base_set(event_base_.get(), wakeup_event_.get());
409 
410   if (event_add(wakeup_event_.get(), nullptr))
411     return false;
412   return true;
413 }
414 
415 // static
OnLibeventNotification(int fd,short flags,void * context)416 void MessagePumpLibevent::OnLibeventNotification(int fd,
417                                                  short flags,
418                                                  void* context) {
419   FdWatchController* controller = static_cast<FdWatchController*>(context);
420   DCHECK(controller);
421 
422   MessagePumpLibevent* pump = controller->libevent_pump();
423   pump->processed_io_events_ = true;
424 
425   // Make the MessagePumpDelegate aware of this other form of "DoWork". Skip if
426   // OnLibeventNotification is called outside of Run() (e.g. in unit tests).
427   Delegate::ScopedDoWorkItem scoped_do_work_item;
428   if (pump->run_state_)
429     scoped_do_work_item = pump->run_state_->delegate->BeginWorkItem();
430 
431   // Trace events must begin after the above BeginWorkItem() so that the
432   // ensuing "ThreadController active" outscopes all the events under it.
433   TRACE_EVENT("toplevel", "OnLibevent", "controller_created_from",
434               controller->created_from_location(), "fd", fd, "flags", flags,
435               "context", context);
436   TRACE_HEAP_PROFILER_API_SCOPED_TASK_EXECUTION heap_profiler_scope(
437       controller->created_from_location().file_name());
438 
439   if ((flags & (EV_READ | EV_WRITE)) == (EV_READ | EV_WRITE)) {
440     // Both callbacks will be called. It is necessary to check that |controller|
441     // is not destroyed.
442     bool controller_was_destroyed = false;
443     controller->was_destroyed_ = &controller_was_destroyed;
444     controller->OnFileCanWriteWithoutBlocking(fd, pump);
445     if (!controller_was_destroyed)
446       controller->OnFileCanReadWithoutBlocking(fd, pump);
447     if (!controller_was_destroyed)
448       controller->was_destroyed_ = nullptr;
449   } else if (flags & EV_WRITE) {
450     controller->OnFileCanWriteWithoutBlocking(fd, pump);
451   } else if (flags & EV_READ) {
452     controller->OnFileCanReadWithoutBlocking(fd, pump);
453   }
454 }
455 
456 // Called if a byte is received on the wakeup pipe.
457 // static
OnWakeup(int socket,short flags,void * context)458 void MessagePumpLibevent::OnWakeup(int socket, short flags, void* context) {
459   TRACE_EVENT(TRACE_DISABLED_BY_DEFAULT("base"),
460               "MessagePumpLibevent::OnWakeup", "socket", socket, "flags", flags,
461               "context", context);
462   MessagePumpLibevent* that = static_cast<MessagePumpLibevent*>(context);
463   DCHECK(that->wakeup_pipe_out_ == socket);
464 
465   // Remove and discard the wakeup byte.
466   char buf;
467   long nread = HANDLE_EINTR(read(socket, &buf, 1));
468   DCHECK_EQ(nread, 1);
469   that->processed_io_events_ = true;
470   // Tell libevent to break out of inner loop.
471   event_base_loopbreak(that->event_base_.get());
472 }
473 
EpollInterest(FdWatchController * controller,const EpollInterestParams & params)474 MessagePumpLibevent::EpollInterest::EpollInterest(
475     FdWatchController* controller,
476     const EpollInterestParams& params)
477     : controller_(controller), params_(params) {}
478 
479 MessagePumpLibevent::EpollInterest::~EpollInterest() = default;
480 
481 }  // namespace base
482