1 // Copyright 2012 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "base/message_loop/message_pump_libevent.h"
6
7 #include <errno.h>
8 #include <unistd.h>
9
10 #include <memory>
11 #include <utility>
12
13 #include "base/auto_reset.h"
14 #include "base/compiler_specific.h"
15 #include "base/files/file_util.h"
16 #include "base/logging.h"
17 #include "base/memory/raw_ptr.h"
18 #include "base/notreached.h"
19 #include "base/posix/eintr_wrapper.h"
20 #include "base/time/time.h"
21 #include "base/trace_event/base_tracing.h"
22 #include "build/build_config.h"
23 #include "third_party/libevent/event.h"
24
25 #if BUILDFLAG(ENABLE_MESSAGE_PUMP_EPOLL)
26 #include "base/message_loop/message_pump_epoll.h"
27 #endif
28
29 // Lifecycle of struct event
30 // Libevent uses two main data structures:
31 // struct event_base (of which there is one per message pump), and
32 // struct event (of which there is roughly one per socket).
33 // The socket's struct event is created in
34 // MessagePumpLibevent::WatchFileDescriptor(),
35 // is owned by the FdWatchController, and is destroyed in
36 // StopWatchingFileDescriptor().
37 // It is moved into and out of lists in struct event_base by
38 // the libevent functions event_add() and event_del().
39
40 namespace base {
41
42 #if BUILDFLAG(ENABLE_MESSAGE_PUMP_EPOLL)
43 namespace {
44 bool g_use_epoll = true;
45 } // namespace
46
47 BASE_FEATURE(kMessagePumpEpoll, "MessagePumpEpoll", FEATURE_ENABLED_BY_DEFAULT);
48 #endif // BUILDFLAG(ENABLE_MESSAGE_PUMP_EPOLL)
49
FdWatchController(const Location & from_here)50 MessagePumpLibevent::FdWatchController::FdWatchController(
51 const Location& from_here)
52 : FdWatchControllerInterface(from_here) {}
53
~FdWatchController()54 MessagePumpLibevent::FdWatchController::~FdWatchController() {
55 CHECK(StopWatchingFileDescriptor());
56 if (was_destroyed_) {
57 DCHECK(!*was_destroyed_);
58 *was_destroyed_ = true;
59 }
60 }
61
StopWatchingFileDescriptor()62 bool MessagePumpLibevent::FdWatchController::StopWatchingFileDescriptor() {
63 watcher_ = nullptr;
64
65 std::unique_ptr<event> e = ReleaseEvent();
66 if (e) {
67 // event_del() is a no-op if the event isn't active.
68 int rv = event_del(e.get());
69 libevent_pump_ = nullptr;
70 return (rv == 0);
71 }
72
73 #if BUILDFLAG(ENABLE_MESSAGE_PUMP_EPOLL)
74 if (epoll_interest_ && epoll_pump_) {
75 epoll_pump_->UnregisterInterest(epoll_interest_);
76 epoll_interest_.reset();
77 epoll_pump_.reset();
78 }
79 #endif
80
81 return true;
82 }
83
Init(std::unique_ptr<event> e)84 void MessagePumpLibevent::FdWatchController::Init(std::unique_ptr<event> e) {
85 DCHECK(e);
86 DCHECK(!event_);
87
88 event_ = std::move(e);
89 }
90
ReleaseEvent()91 std::unique_ptr<event> MessagePumpLibevent::FdWatchController::ReleaseEvent() {
92 return std::move(event_);
93 }
94
OnFileCanReadWithoutBlocking(int fd,MessagePumpLibevent * pump)95 void MessagePumpLibevent::FdWatchController::OnFileCanReadWithoutBlocking(
96 int fd,
97 MessagePumpLibevent* pump) {
98 // Since OnFileCanWriteWithoutBlocking() gets called first, it can stop
99 // watching the file descriptor.
100 if (!watcher_)
101 return;
102 watcher_->OnFileCanReadWithoutBlocking(fd);
103 }
104
OnFileCanWriteWithoutBlocking(int fd,MessagePumpLibevent * pump)105 void MessagePumpLibevent::FdWatchController::OnFileCanWriteWithoutBlocking(
106 int fd,
107 MessagePumpLibevent* pump) {
108 DCHECK(watcher_);
109 watcher_->OnFileCanWriteWithoutBlocking(fd);
110 }
111
112 const scoped_refptr<MessagePumpLibevent::EpollInterest>&
AssignEpollInterest(const EpollInterestParams & params)113 MessagePumpLibevent::FdWatchController::AssignEpollInterest(
114 const EpollInterestParams& params) {
115 epoll_interest_ = MakeRefCounted<EpollInterest>(this, params);
116 return epoll_interest_;
117 }
118
OnFdReadable()119 void MessagePumpLibevent::FdWatchController::OnFdReadable() {
120 if (!watcher_) {
121 // When a watcher is watching both read and write and both are possible, the
122 // pump will call OnFdWritable() first, followed by OnFdReadable(). But
123 // OnFdWritable() may stop or destroy the watch. If the watch is destroyed,
124 // the pump will not call OnFdReadable() at all, but if it's merely stopped,
125 // OnFdReadable() will be called while `watcher_` is null. In this case we
126 // don't actually want to call the client.
127 return;
128 }
129 watcher_->OnFileCanReadWithoutBlocking(epoll_interest_->params().fd);
130 }
131
OnFdWritable()132 void MessagePumpLibevent::FdWatchController::OnFdWritable() {
133 DCHECK(watcher_);
134 watcher_->OnFileCanWriteWithoutBlocking(epoll_interest_->params().fd);
135 }
136
MessagePumpLibevent()137 MessagePumpLibevent::MessagePumpLibevent() {
138 #if BUILDFLAG(ENABLE_MESSAGE_PUMP_EPOLL)
139 if (g_use_epoll) {
140 epoll_pump_ = std::make_unique<MessagePumpEpoll>();
141 return;
142 }
143 #endif
144
145 if (!Init())
146 NOTREACHED();
147 DCHECK_NE(wakeup_pipe_in_, -1);
148 DCHECK_NE(wakeup_pipe_out_, -1);
149 DCHECK(wakeup_event_);
150 }
151
~MessagePumpLibevent()152 MessagePumpLibevent::~MessagePumpLibevent() {
153 #if BUILDFLAG(ENABLE_MESSAGE_PUMP_EPOLL)
154 const bool using_libevent = !epoll_pump_;
155 #else
156 const bool using_libevent = true;
157 #endif
158
159 DCHECK(event_base_);
160 if (using_libevent) {
161 DCHECK(wakeup_event_);
162 event_del(wakeup_event_.get());
163 wakeup_event_.reset();
164 if (wakeup_pipe_in_ >= 0) {
165 if (IGNORE_EINTR(close(wakeup_pipe_in_)) < 0)
166 DPLOG(ERROR) << "close";
167 }
168 if (wakeup_pipe_out_ >= 0) {
169 if (IGNORE_EINTR(close(wakeup_pipe_out_)) < 0)
170 DPLOG(ERROR) << "close";
171 }
172 }
173 event_base_.reset();
174 }
175
176 // Must be called early in process startup, but after FeatureList
177 // initialization. This allows MessagePumpLibevent to query and cache the
178 // enabled state of any relevant features.
179 // static
InitializeFeatures()180 void MessagePumpLibevent::InitializeFeatures() {
181 #if BUILDFLAG(ENABLE_MESSAGE_PUMP_EPOLL)
182 g_use_epoll = FeatureList::IsEnabled(kMessagePumpEpoll);
183 #endif
184 }
185
WatchFileDescriptor(int fd,bool persistent,int mode,FdWatchController * controller,FdWatcher * delegate)186 bool MessagePumpLibevent::WatchFileDescriptor(int fd,
187 bool persistent,
188 int mode,
189 FdWatchController* controller,
190 FdWatcher* delegate) {
191 #if BUILDFLAG(ENABLE_MESSAGE_PUMP_EPOLL)
192 if (epoll_pump_) {
193 return epoll_pump_->WatchFileDescriptor(fd, persistent, mode, controller,
194 delegate);
195 }
196 #endif
197
198 TRACE_EVENT("base", "MessagePumpLibevent::WatchFileDescriptor", "fd", fd,
199 "persistent", persistent, "watch_read", mode & WATCH_READ,
200 "watch_write", mode & WATCH_WRITE);
201 DCHECK_GE(fd, 0);
202 DCHECK(controller);
203 DCHECK(delegate);
204 DCHECK(mode == WATCH_READ || mode == WATCH_WRITE || mode == WATCH_READ_WRITE);
205 // WatchFileDescriptor should be called on the pump thread. It is not
206 // threadsafe, and your watcher may never be registered.
207 DCHECK(watch_file_descriptor_caller_checker_.CalledOnValidThread());
208
209 short event_mask = persistent ? EV_PERSIST : 0;
210 if (mode & WATCH_READ) {
211 event_mask |= EV_READ;
212 }
213 if (mode & WATCH_WRITE) {
214 event_mask |= EV_WRITE;
215 }
216
217 std::unique_ptr<event> evt(controller->ReleaseEvent());
218 if (!evt) {
219 // Ownership is transferred to the controller.
220 evt = std::make_unique<event>();
221 } else {
222 // Make sure we don't pick up any funky internal libevent masks.
223 int old_interest_mask = evt->ev_events & (EV_READ | EV_WRITE | EV_PERSIST);
224
225 // Combine old/new event masks.
226 event_mask |= old_interest_mask;
227
228 // Must disarm the event before we can reuse it.
229 event_del(evt.get());
230
231 // It's illegal to use this function to listen on 2 separate fds with the
232 // same |controller|.
233 if (EVENT_FD(evt.get()) != fd) {
234 NOTREACHED() << "FDs don't match" << EVENT_FD(evt.get()) << "!=" << fd;
235 return false;
236 }
237 }
238
239 // Set current interest mask and message pump for this event.
240 event_set(evt.get(), fd, event_mask, OnLibeventNotification, controller);
241
242 // Tell libevent which message pump this socket will belong to when we add it.
243 if (event_base_set(event_base_.get(), evt.get())) {
244 DPLOG(ERROR) << "event_base_set(fd=" << EVENT_FD(evt.get()) << ")";
245 return false;
246 }
247
248 // Add this socket to the list of monitored sockets.
249 if (event_add(evt.get(), nullptr)) {
250 DPLOG(ERROR) << "event_add failed(fd=" << EVENT_FD(evt.get()) << ")";
251 return false;
252 }
253
254 controller->Init(std::move(evt));
255 controller->set_watcher(delegate);
256 controller->set_libevent_pump(this);
257 return true;
258 }
259
260 // Tell libevent to break out of inner loop.
timer_callback(int fd,short events,void * context)261 static void timer_callback(int fd, short events, void* context) {
262 event_base_loopbreak((struct event_base*)context);
263 }
264
265 // Reentrant!
Run(Delegate * delegate)266 void MessagePumpLibevent::Run(Delegate* delegate) {
267 #if BUILDFLAG(ENABLE_MESSAGE_PUMP_EPOLL)
268 if (epoll_pump_) {
269 epoll_pump_->Run(delegate);
270 return;
271 }
272 #endif
273
274 RunState run_state(delegate);
275 AutoReset<raw_ptr<RunState>> auto_reset_run_state(&run_state_, &run_state);
276
277 // event_base_loopexit() + EVLOOP_ONCE is leaky, see http://crbug.com/25641.
278 // Instead, make our own timer and reuse it on each call to event_base_loop().
279 std::unique_ptr<event> timer_event(new event);
280
281 for (;;) {
282 // Do some work and see if the next task is ready right away.
283 Delegate::NextWorkInfo next_work_info = delegate->DoWork();
284 bool immediate_work_available = next_work_info.is_immediate();
285
286 if (run_state.should_quit)
287 break;
288
289 // Process native events if any are ready. Do not block waiting for more. Do
290 // not instantiate a ScopedDoWorkItem for this call as:
291 // - This most often ends up calling OnLibeventNotification() below which
292 // already instantiates a ScopedDoWorkItem (and doing so twice would
293 // incorrectly appear as nested work).
294 // - "ThreadController active" is already up per the above DoWork() so this
295 // would only be about detecting #work-in-work-implies-nested
296 // (ref. thread_controller.h).
297 // - This can result in the same work as the
298 // event_base_loop(event_base_, EVLOOP_ONCE) call at the end of this
299 // method and that call definitely can't be in a ScopedDoWorkItem as
300 // it includes sleep.
301 // - The only downside is that, if a native work item other than
302 // OnLibeventNotification() did enter a nested loop from here, it
303 // wouldn't be labeled as such in tracing by "ThreadController active".
304 // Contact gab@/scheduler-dev@ if a problematic trace emerges.
305 event_base_loop(event_base_.get(), EVLOOP_NONBLOCK);
306
307 bool attempt_more_work = immediate_work_available || processed_io_events_;
308 processed_io_events_ = false;
309
310 if (run_state.should_quit)
311 break;
312
313 if (attempt_more_work)
314 continue;
315
316 attempt_more_work = delegate->DoIdleWork();
317
318 if (run_state.should_quit)
319 break;
320
321 if (attempt_more_work)
322 continue;
323
324 bool did_set_timer = false;
325
326 // If there is delayed work.
327 DCHECK(!next_work_info.delayed_run_time.is_null());
328 if (!next_work_info.delayed_run_time.is_max()) {
329 const TimeDelta delay = next_work_info.remaining_delay();
330
331 // Setup a timer to break out of the event loop at the right time.
332 struct timeval poll_tv;
333 poll_tv.tv_sec = static_cast<time_t>(delay.InSeconds());
334 poll_tv.tv_usec = delay.InMicroseconds() % Time::kMicrosecondsPerSecond;
335 event_set(timer_event.get(), -1, 0, timer_callback, event_base_.get());
336 event_base_set(event_base_.get(), timer_event.get());
337 event_add(timer_event.get(), &poll_tv);
338
339 did_set_timer = true;
340 }
341
342 // Block waiting for events and process all available upon waking up. This
343 // is conditionally interrupted to look for more work if we are aware of a
344 // delayed task that will need servicing.
345 delegate->BeforeWait();
346 event_base_loop(event_base_.get(), EVLOOP_ONCE);
347
348 // We previously setup a timer to break out the event loop to look for more
349 // work. Now that we're here delete the event.
350 if (did_set_timer) {
351 event_del(timer_event.get());
352 }
353
354 if (run_state.should_quit)
355 break;
356 }
357 }
358
Quit()359 void MessagePumpLibevent::Quit() {
360 #if BUILDFLAG(ENABLE_MESSAGE_PUMP_EPOLL)
361 if (epoll_pump_) {
362 epoll_pump_->Quit();
363 return;
364 }
365 #endif
366
367 DCHECK(run_state_) << "Quit was called outside of Run!";
368 // Tell both libevent and Run that they should break out of their loops.
369 run_state_->should_quit = true;
370 ScheduleWork();
371 }
372
ScheduleWork()373 void MessagePumpLibevent::ScheduleWork() {
374 #if BUILDFLAG(ENABLE_MESSAGE_PUMP_EPOLL)
375 if (epoll_pump_) {
376 epoll_pump_->ScheduleWork();
377 return;
378 }
379 #endif
380
381 // Tell libevent (in a threadsafe way) that it should break out of its loop.
382 char buf = 0;
383 long nwrite = HANDLE_EINTR(write(wakeup_pipe_in_, &buf, 1));
384 DPCHECK(nwrite == 1 || errno == EAGAIN) << "nwrite:" << nwrite;
385 }
386
ScheduleDelayedWork(const Delegate::NextWorkInfo & next_work_info)387 void MessagePumpLibevent::ScheduleDelayedWork(
388 const Delegate::NextWorkInfo& next_work_info) {
389 // When using libevent we know that we can't be blocked on Run()'s
390 // `timer_event` right now since this method can only be called on the same
391 // thread as Run(). When using epoll, the pump clearly must be in between
392 // waits if we're here. In either case, any scheduled work will be seen prior
393 // to the next libevent loop or epoll wait, so there's nothing to do here.
394 }
395
Init()396 bool MessagePumpLibevent::Init() {
397 int fds[2];
398 if (!CreateLocalNonBlockingPipe(fds)) {
399 DPLOG(ERROR) << "pipe creation failed";
400 return false;
401 }
402 wakeup_pipe_out_ = fds[0];
403 wakeup_pipe_in_ = fds[1];
404
405 wakeup_event_ = std::make_unique<event>();
406 event_set(wakeup_event_.get(), wakeup_pipe_out_, EV_READ | EV_PERSIST,
407 OnWakeup, this);
408 event_base_set(event_base_.get(), wakeup_event_.get());
409
410 if (event_add(wakeup_event_.get(), nullptr))
411 return false;
412 return true;
413 }
414
415 // static
OnLibeventNotification(int fd,short flags,void * context)416 void MessagePumpLibevent::OnLibeventNotification(int fd,
417 short flags,
418 void* context) {
419 FdWatchController* controller = static_cast<FdWatchController*>(context);
420 DCHECK(controller);
421
422 MessagePumpLibevent* pump = controller->libevent_pump();
423 pump->processed_io_events_ = true;
424
425 // Make the MessagePumpDelegate aware of this other form of "DoWork". Skip if
426 // OnLibeventNotification is called outside of Run() (e.g. in unit tests).
427 Delegate::ScopedDoWorkItem scoped_do_work_item;
428 if (pump->run_state_)
429 scoped_do_work_item = pump->run_state_->delegate->BeginWorkItem();
430
431 // Trace events must begin after the above BeginWorkItem() so that the
432 // ensuing "ThreadController active" outscopes all the events under it.
433 TRACE_EVENT("toplevel", "OnLibevent", "controller_created_from",
434 controller->created_from_location(), "fd", fd, "flags", flags,
435 "context", context);
436 TRACE_HEAP_PROFILER_API_SCOPED_TASK_EXECUTION heap_profiler_scope(
437 controller->created_from_location().file_name());
438
439 if ((flags & (EV_READ | EV_WRITE)) == (EV_READ | EV_WRITE)) {
440 // Both callbacks will be called. It is necessary to check that |controller|
441 // is not destroyed.
442 bool controller_was_destroyed = false;
443 controller->was_destroyed_ = &controller_was_destroyed;
444 controller->OnFileCanWriteWithoutBlocking(fd, pump);
445 if (!controller_was_destroyed)
446 controller->OnFileCanReadWithoutBlocking(fd, pump);
447 if (!controller_was_destroyed)
448 controller->was_destroyed_ = nullptr;
449 } else if (flags & EV_WRITE) {
450 controller->OnFileCanWriteWithoutBlocking(fd, pump);
451 } else if (flags & EV_READ) {
452 controller->OnFileCanReadWithoutBlocking(fd, pump);
453 }
454 }
455
456 // Called if a byte is received on the wakeup pipe.
457 // static
OnWakeup(int socket,short flags,void * context)458 void MessagePumpLibevent::OnWakeup(int socket, short flags, void* context) {
459 TRACE_EVENT(TRACE_DISABLED_BY_DEFAULT("base"),
460 "MessagePumpLibevent::OnWakeup", "socket", socket, "flags", flags,
461 "context", context);
462 MessagePumpLibevent* that = static_cast<MessagePumpLibevent*>(context);
463 DCHECK(that->wakeup_pipe_out_ == socket);
464
465 // Remove and discard the wakeup byte.
466 char buf;
467 long nread = HANDLE_EINTR(read(socket, &buf, 1));
468 DCHECK_EQ(nread, 1);
469 that->processed_io_events_ = true;
470 // Tell libevent to break out of inner loop.
471 event_base_loopbreak(that->event_base_.get());
472 }
473
EpollInterest(FdWatchController * controller,const EpollInterestParams & params)474 MessagePumpLibevent::EpollInterest::EpollInterest(
475 FdWatchController* controller,
476 const EpollInterestParams& params)
477 : controller_(controller), params_(params) {}
478
479 MessagePumpLibevent::EpollInterest::~EpollInterest() = default;
480
481 } // namespace base
482