1*d9f75844SAndroid Build Coastguard Worker /*
2*d9f75844SAndroid Build Coastguard Worker * Copyright 2016 The WebRTC Project Authors. All rights reserved.
3*d9f75844SAndroid Build Coastguard Worker *
4*d9f75844SAndroid Build Coastguard Worker * Use of this source code is governed by a BSD-style license
5*d9f75844SAndroid Build Coastguard Worker * that can be found in the LICENSE file in the root of the source
6*d9f75844SAndroid Build Coastguard Worker * tree. An additional intellectual property rights grant can be found
7*d9f75844SAndroid Build Coastguard Worker * in the file PATENTS. All contributing project authors may
8*d9f75844SAndroid Build Coastguard Worker * be found in the AUTHORS file in the root of the source tree.
9*d9f75844SAndroid Build Coastguard Worker */
10*d9f75844SAndroid Build Coastguard Worker
11*d9f75844SAndroid Build Coastguard Worker #include "rtc_base/task_queue_libevent.h"
12*d9f75844SAndroid Build Coastguard Worker
13*d9f75844SAndroid Build Coastguard Worker #include <errno.h>
14*d9f75844SAndroid Build Coastguard Worker #include <fcntl.h>
15*d9f75844SAndroid Build Coastguard Worker #include <pthread.h>
16*d9f75844SAndroid Build Coastguard Worker #include <signal.h>
17*d9f75844SAndroid Build Coastguard Worker #include <stdint.h>
18*d9f75844SAndroid Build Coastguard Worker #include <time.h>
19*d9f75844SAndroid Build Coastguard Worker #include <unistd.h>
20*d9f75844SAndroid Build Coastguard Worker
21*d9f75844SAndroid Build Coastguard Worker #include <list>
22*d9f75844SAndroid Build Coastguard Worker #include <memory>
23*d9f75844SAndroid Build Coastguard Worker #include <type_traits>
24*d9f75844SAndroid Build Coastguard Worker #include <utility>
25*d9f75844SAndroid Build Coastguard Worker
26*d9f75844SAndroid Build Coastguard Worker #include "absl/container/inlined_vector.h"
27*d9f75844SAndroid Build Coastguard Worker #include "absl/functional/any_invocable.h"
28*d9f75844SAndroid Build Coastguard Worker #include "absl/strings/string_view.h"
29*d9f75844SAndroid Build Coastguard Worker #include "api/task_queue/task_queue_base.h"
30*d9f75844SAndroid Build Coastguard Worker #include "api/units/time_delta.h"
31*d9f75844SAndroid Build Coastguard Worker #include "rtc_base/checks.h"
32*d9f75844SAndroid Build Coastguard Worker #include "rtc_base/logging.h"
33*d9f75844SAndroid Build Coastguard Worker #include "rtc_base/numerics/safe_conversions.h"
34*d9f75844SAndroid Build Coastguard Worker #include "rtc_base/platform_thread.h"
35*d9f75844SAndroid Build Coastguard Worker #include "rtc_base/platform_thread_types.h"
36*d9f75844SAndroid Build Coastguard Worker #include "rtc_base/synchronization/mutex.h"
37*d9f75844SAndroid Build Coastguard Worker #include "rtc_base/thread_annotations.h"
38*d9f75844SAndroid Build Coastguard Worker #include "rtc_base/time_utils.h"
39*d9f75844SAndroid Build Coastguard Worker #include "third_party/libevent/event.h"
40*d9f75844SAndroid Build Coastguard Worker
41*d9f75844SAndroid Build Coastguard Worker namespace webrtc {
42*d9f75844SAndroid Build Coastguard Worker namespace {
43*d9f75844SAndroid Build Coastguard Worker constexpr char kQuit = 1;
44*d9f75844SAndroid Build Coastguard Worker constexpr char kRunTasks = 2;
45*d9f75844SAndroid Build Coastguard Worker
46*d9f75844SAndroid Build Coastguard Worker using Priority = TaskQueueFactory::Priority;
47*d9f75844SAndroid Build Coastguard Worker
// Blocks delivery of SIGPIPE to the calling thread.
//
// SIGPIPE can be raised when write() is called on a pipe that is being closed,
// or when a pipe is closed while it is being written to. Both can happen
// here, and the default action for SIGPIPE terminates the process, so the
// signal is masked and execution continues as normal.
//
// The previous signal mask is deliberately not restored: restoring it can
// itself cause SIGPIPE to be signaled (e.g. on MacOS).
//
// An alternative would be to ignore the signal for the whole process:
//   signal(SIGPIPE, SIG_IGN);
void IgnoreSigPipeSignalOnCurrentThread() {
  sigset_t blocked_signals;
  sigemptyset(&blocked_signals);
  sigaddset(&blocked_signals, SIGPIPE);
  pthread_sigmask(SIG_BLOCK, &blocked_signals, /*oldset=*/nullptr);
}
67*d9f75844SAndroid Build Coastguard Worker
SetNonBlocking(int fd)68*d9f75844SAndroid Build Coastguard Worker bool SetNonBlocking(int fd) {
69*d9f75844SAndroid Build Coastguard Worker const int flags = fcntl(fd, F_GETFL);
70*d9f75844SAndroid Build Coastguard Worker RTC_CHECK(flags != -1);
71*d9f75844SAndroid Build Coastguard Worker return (flags & O_NONBLOCK) || fcntl(fd, F_SETFL, flags | O_NONBLOCK) != -1;
72*d9f75844SAndroid Build Coastguard Worker }
73*d9f75844SAndroid Build Coastguard Worker
74*d9f75844SAndroid Build Coastguard Worker // TODO(tommi): This is a hack to support two versions of libevent that we're
75*d9f75844SAndroid Build Coastguard Worker // compatible with. The method we really want to call is event_assign(),
76*d9f75844SAndroid Build Coastguard Worker // since event_set() has been marked as deprecated (and doesn't accept
77*d9f75844SAndroid Build Coastguard Worker // passing event_base__ as a parameter). However, the version of libevent
78*d9f75844SAndroid Build Coastguard Worker // that we have in Chromium, doesn't have event_assign(), so we need to call
79*d9f75844SAndroid Build Coastguard Worker // event_set() there.
EventAssign(struct event * ev,struct event_base * base,int fd,short events,void (* callback)(int,short,void *),void * arg)80*d9f75844SAndroid Build Coastguard Worker void EventAssign(struct event* ev,
81*d9f75844SAndroid Build Coastguard Worker struct event_base* base,
82*d9f75844SAndroid Build Coastguard Worker int fd,
83*d9f75844SAndroid Build Coastguard Worker short events,
84*d9f75844SAndroid Build Coastguard Worker void (*callback)(int, short, void*),
85*d9f75844SAndroid Build Coastguard Worker void* arg) {
86*d9f75844SAndroid Build Coastguard Worker #if defined(_EVENT2_EVENT_H_)
87*d9f75844SAndroid Build Coastguard Worker RTC_CHECK_EQ(0, event_assign(ev, base, fd, events, callback, arg));
88*d9f75844SAndroid Build Coastguard Worker #else
89*d9f75844SAndroid Build Coastguard Worker event_set(ev, fd, events, callback, arg);
90*d9f75844SAndroid Build Coastguard Worker RTC_CHECK_EQ(0, event_base_set(base, ev));
91*d9f75844SAndroid Build Coastguard Worker #endif
92*d9f75844SAndroid Build Coastguard Worker }
93*d9f75844SAndroid Build Coastguard Worker
TaskQueuePriorityToThreadPriority(Priority priority)94*d9f75844SAndroid Build Coastguard Worker rtc::ThreadPriority TaskQueuePriorityToThreadPriority(Priority priority) {
95*d9f75844SAndroid Build Coastguard Worker switch (priority) {
96*d9f75844SAndroid Build Coastguard Worker case Priority::HIGH:
97*d9f75844SAndroid Build Coastguard Worker return rtc::ThreadPriority::kRealtime;
98*d9f75844SAndroid Build Coastguard Worker case Priority::LOW:
99*d9f75844SAndroid Build Coastguard Worker return rtc::ThreadPriority::kLow;
100*d9f75844SAndroid Build Coastguard Worker case Priority::NORMAL:
101*d9f75844SAndroid Build Coastguard Worker return rtc::ThreadPriority::kNormal;
102*d9f75844SAndroid Build Coastguard Worker }
103*d9f75844SAndroid Build Coastguard Worker }
104*d9f75844SAndroid Build Coastguard Worker
105*d9f75844SAndroid Build Coastguard Worker class TaskQueueLibevent final : public TaskQueueBase {
106*d9f75844SAndroid Build Coastguard Worker public:
107*d9f75844SAndroid Build Coastguard Worker TaskQueueLibevent(absl::string_view queue_name, rtc::ThreadPriority priority);
108*d9f75844SAndroid Build Coastguard Worker
109*d9f75844SAndroid Build Coastguard Worker void Delete() override;
110*d9f75844SAndroid Build Coastguard Worker void PostTask(absl::AnyInvocable<void() &&> task) override;
111*d9f75844SAndroid Build Coastguard Worker void PostDelayedTask(absl::AnyInvocable<void() &&> task,
112*d9f75844SAndroid Build Coastguard Worker TimeDelta delay) override;
113*d9f75844SAndroid Build Coastguard Worker void PostDelayedHighPrecisionTask(absl::AnyInvocable<void() &&> task,
114*d9f75844SAndroid Build Coastguard Worker TimeDelta delay) override;
115*d9f75844SAndroid Build Coastguard Worker
116*d9f75844SAndroid Build Coastguard Worker private:
117*d9f75844SAndroid Build Coastguard Worker struct TimerEvent;
118*d9f75844SAndroid Build Coastguard Worker
119*d9f75844SAndroid Build Coastguard Worker void PostDelayedTaskOnTaskQueue(absl::AnyInvocable<void() &&> task,
120*d9f75844SAndroid Build Coastguard Worker TimeDelta delay);
121*d9f75844SAndroid Build Coastguard Worker
122*d9f75844SAndroid Build Coastguard Worker ~TaskQueueLibevent() override = default;
123*d9f75844SAndroid Build Coastguard Worker
124*d9f75844SAndroid Build Coastguard Worker static void OnWakeup(int socket, short flags, void* context); // NOLINT
125*d9f75844SAndroid Build Coastguard Worker static void RunTimer(int fd, short flags, void* context); // NOLINT
126*d9f75844SAndroid Build Coastguard Worker
127*d9f75844SAndroid Build Coastguard Worker bool is_active_ = true;
128*d9f75844SAndroid Build Coastguard Worker int wakeup_pipe_in_ = -1;
129*d9f75844SAndroid Build Coastguard Worker int wakeup_pipe_out_ = -1;
130*d9f75844SAndroid Build Coastguard Worker event_base* event_base_;
131*d9f75844SAndroid Build Coastguard Worker event wakeup_event_;
132*d9f75844SAndroid Build Coastguard Worker rtc::PlatformThread thread_;
133*d9f75844SAndroid Build Coastguard Worker Mutex pending_lock_;
134*d9f75844SAndroid Build Coastguard Worker absl::InlinedVector<absl::AnyInvocable<void() &&>, 4> pending_
135*d9f75844SAndroid Build Coastguard Worker RTC_GUARDED_BY(pending_lock_);
136*d9f75844SAndroid Build Coastguard Worker // Holds a list of events pending timers for cleanup when the loop exits.
137*d9f75844SAndroid Build Coastguard Worker std::list<TimerEvent*> pending_timers_;
138*d9f75844SAndroid Build Coastguard Worker };
139*d9f75844SAndroid Build Coastguard Worker
140*d9f75844SAndroid Build Coastguard Worker struct TaskQueueLibevent::TimerEvent {
TimerEventwebrtc::__anon8d624fd20111::TaskQueueLibevent::TimerEvent141*d9f75844SAndroid Build Coastguard Worker TimerEvent(TaskQueueLibevent* task_queue, absl::AnyInvocable<void() &&> task)
142*d9f75844SAndroid Build Coastguard Worker : task_queue(task_queue), task(std::move(task)) {}
~TimerEventwebrtc::__anon8d624fd20111::TaskQueueLibevent::TimerEvent143*d9f75844SAndroid Build Coastguard Worker ~TimerEvent() { event_del(&ev); }
144*d9f75844SAndroid Build Coastguard Worker
145*d9f75844SAndroid Build Coastguard Worker event ev;
146*d9f75844SAndroid Build Coastguard Worker TaskQueueLibevent* task_queue;
147*d9f75844SAndroid Build Coastguard Worker absl::AnyInvocable<void() &&> task;
148*d9f75844SAndroid Build Coastguard Worker };
149*d9f75844SAndroid Build Coastguard Worker
TaskQueueLibevent(absl::string_view queue_name,rtc::ThreadPriority priority)150*d9f75844SAndroid Build Coastguard Worker TaskQueueLibevent::TaskQueueLibevent(absl::string_view queue_name,
151*d9f75844SAndroid Build Coastguard Worker rtc::ThreadPriority priority)
152*d9f75844SAndroid Build Coastguard Worker : event_base_(event_base_new()) {
153*d9f75844SAndroid Build Coastguard Worker int fds[2];
154*d9f75844SAndroid Build Coastguard Worker RTC_CHECK(pipe(fds) == 0);
155*d9f75844SAndroid Build Coastguard Worker SetNonBlocking(fds[0]);
156*d9f75844SAndroid Build Coastguard Worker SetNonBlocking(fds[1]);
157*d9f75844SAndroid Build Coastguard Worker wakeup_pipe_out_ = fds[0];
158*d9f75844SAndroid Build Coastguard Worker wakeup_pipe_in_ = fds[1];
159*d9f75844SAndroid Build Coastguard Worker
160*d9f75844SAndroid Build Coastguard Worker EventAssign(&wakeup_event_, event_base_, wakeup_pipe_out_,
161*d9f75844SAndroid Build Coastguard Worker EV_READ | EV_PERSIST, OnWakeup, this);
162*d9f75844SAndroid Build Coastguard Worker event_add(&wakeup_event_, 0);
163*d9f75844SAndroid Build Coastguard Worker thread_ = rtc::PlatformThread::SpawnJoinable(
164*d9f75844SAndroid Build Coastguard Worker [this] {
165*d9f75844SAndroid Build Coastguard Worker {
166*d9f75844SAndroid Build Coastguard Worker CurrentTaskQueueSetter set_current(this);
167*d9f75844SAndroid Build Coastguard Worker while (is_active_)
168*d9f75844SAndroid Build Coastguard Worker event_base_loop(event_base_, 0);
169*d9f75844SAndroid Build Coastguard Worker }
170*d9f75844SAndroid Build Coastguard Worker
171*d9f75844SAndroid Build Coastguard Worker for (TimerEvent* timer : pending_timers_)
172*d9f75844SAndroid Build Coastguard Worker delete timer;
173*d9f75844SAndroid Build Coastguard Worker },
174*d9f75844SAndroid Build Coastguard Worker queue_name, rtc::ThreadAttributes().SetPriority(priority));
175*d9f75844SAndroid Build Coastguard Worker }
176*d9f75844SAndroid Build Coastguard Worker
Delete()177*d9f75844SAndroid Build Coastguard Worker void TaskQueueLibevent::Delete() {
178*d9f75844SAndroid Build Coastguard Worker RTC_DCHECK(!IsCurrent());
179*d9f75844SAndroid Build Coastguard Worker struct timespec ts;
180*d9f75844SAndroid Build Coastguard Worker char message = kQuit;
181*d9f75844SAndroid Build Coastguard Worker while (write(wakeup_pipe_in_, &message, sizeof(message)) != sizeof(message)) {
182*d9f75844SAndroid Build Coastguard Worker // The queue is full, so we have no choice but to wait and retry.
183*d9f75844SAndroid Build Coastguard Worker RTC_CHECK_EQ(EAGAIN, errno);
184*d9f75844SAndroid Build Coastguard Worker ts.tv_sec = 0;
185*d9f75844SAndroid Build Coastguard Worker ts.tv_nsec = 1000000;
186*d9f75844SAndroid Build Coastguard Worker nanosleep(&ts, nullptr);
187*d9f75844SAndroid Build Coastguard Worker }
188*d9f75844SAndroid Build Coastguard Worker
189*d9f75844SAndroid Build Coastguard Worker thread_.Finalize();
190*d9f75844SAndroid Build Coastguard Worker
191*d9f75844SAndroid Build Coastguard Worker event_del(&wakeup_event_);
192*d9f75844SAndroid Build Coastguard Worker
193*d9f75844SAndroid Build Coastguard Worker IgnoreSigPipeSignalOnCurrentThread();
194*d9f75844SAndroid Build Coastguard Worker
195*d9f75844SAndroid Build Coastguard Worker close(wakeup_pipe_in_);
196*d9f75844SAndroid Build Coastguard Worker close(wakeup_pipe_out_);
197*d9f75844SAndroid Build Coastguard Worker wakeup_pipe_in_ = -1;
198*d9f75844SAndroid Build Coastguard Worker wakeup_pipe_out_ = -1;
199*d9f75844SAndroid Build Coastguard Worker
200*d9f75844SAndroid Build Coastguard Worker event_base_free(event_base_);
201*d9f75844SAndroid Build Coastguard Worker delete this;
202*d9f75844SAndroid Build Coastguard Worker }
203*d9f75844SAndroid Build Coastguard Worker
PostTask(absl::AnyInvocable<void ()&&> task)204*d9f75844SAndroid Build Coastguard Worker void TaskQueueLibevent::PostTask(absl::AnyInvocable<void() &&> task) {
205*d9f75844SAndroid Build Coastguard Worker {
206*d9f75844SAndroid Build Coastguard Worker MutexLock lock(&pending_lock_);
207*d9f75844SAndroid Build Coastguard Worker bool had_pending_tasks = !pending_.empty();
208*d9f75844SAndroid Build Coastguard Worker pending_.push_back(std::move(task));
209*d9f75844SAndroid Build Coastguard Worker
210*d9f75844SAndroid Build Coastguard Worker // Only write to the pipe if there were no pending tasks before this one
211*d9f75844SAndroid Build Coastguard Worker // since the thread could be sleeping. If there were already pending tasks
212*d9f75844SAndroid Build Coastguard Worker // then we know there's either a pending write in the pipe or the thread has
213*d9f75844SAndroid Build Coastguard Worker // not yet processed the pending tasks. In either case, the thread will
214*d9f75844SAndroid Build Coastguard Worker // eventually wake up and process all pending tasks including this one.
215*d9f75844SAndroid Build Coastguard Worker if (had_pending_tasks) {
216*d9f75844SAndroid Build Coastguard Worker return;
217*d9f75844SAndroid Build Coastguard Worker }
218*d9f75844SAndroid Build Coastguard Worker }
219*d9f75844SAndroid Build Coastguard Worker
220*d9f75844SAndroid Build Coastguard Worker // Note: This behvior outlined above ensures we never fill up the pipe write
221*d9f75844SAndroid Build Coastguard Worker // buffer since there will only ever be 1 byte pending.
222*d9f75844SAndroid Build Coastguard Worker char message = kRunTasks;
223*d9f75844SAndroid Build Coastguard Worker RTC_CHECK_EQ(write(wakeup_pipe_in_, &message, sizeof(message)),
224*d9f75844SAndroid Build Coastguard Worker sizeof(message));
225*d9f75844SAndroid Build Coastguard Worker }
226*d9f75844SAndroid Build Coastguard Worker
PostDelayedTaskOnTaskQueue(absl::AnyInvocable<void ()&&> task,TimeDelta delay)227*d9f75844SAndroid Build Coastguard Worker void TaskQueueLibevent::PostDelayedTaskOnTaskQueue(
228*d9f75844SAndroid Build Coastguard Worker absl::AnyInvocable<void() &&> task,
229*d9f75844SAndroid Build Coastguard Worker TimeDelta delay) {
230*d9f75844SAndroid Build Coastguard Worker // libevent api is not thread safe by default, thus event_add need to be
231*d9f75844SAndroid Build Coastguard Worker // called on the `thread_`.
232*d9f75844SAndroid Build Coastguard Worker RTC_DCHECK(IsCurrent());
233*d9f75844SAndroid Build Coastguard Worker
234*d9f75844SAndroid Build Coastguard Worker TimerEvent* timer = new TimerEvent(this, std::move(task));
235*d9f75844SAndroid Build Coastguard Worker EventAssign(&timer->ev, event_base_, -1, 0, &TaskQueueLibevent::RunTimer,
236*d9f75844SAndroid Build Coastguard Worker timer);
237*d9f75844SAndroid Build Coastguard Worker pending_timers_.push_back(timer);
238*d9f75844SAndroid Build Coastguard Worker timeval tv = {.tv_sec = rtc::dchecked_cast<int>(delay.us() / 1'000'000),
239*d9f75844SAndroid Build Coastguard Worker .tv_usec = rtc::dchecked_cast<int>(delay.us() % 1'000'000)};
240*d9f75844SAndroid Build Coastguard Worker event_add(&timer->ev, &tv);
241*d9f75844SAndroid Build Coastguard Worker }
242*d9f75844SAndroid Build Coastguard Worker
PostDelayedTask(absl::AnyInvocable<void ()&&> task,TimeDelta delay)243*d9f75844SAndroid Build Coastguard Worker void TaskQueueLibevent::PostDelayedTask(absl::AnyInvocable<void() &&> task,
244*d9f75844SAndroid Build Coastguard Worker TimeDelta delay) {
245*d9f75844SAndroid Build Coastguard Worker if (IsCurrent()) {
246*d9f75844SAndroid Build Coastguard Worker PostDelayedTaskOnTaskQueue(std::move(task), delay);
247*d9f75844SAndroid Build Coastguard Worker } else {
248*d9f75844SAndroid Build Coastguard Worker int64_t posted_us = rtc::TimeMicros();
249*d9f75844SAndroid Build Coastguard Worker PostTask([posted_us, delay, task = std::move(task), this]() mutable {
250*d9f75844SAndroid Build Coastguard Worker // Compensate for the time that has passed since the posting.
251*d9f75844SAndroid Build Coastguard Worker TimeDelta post_time = TimeDelta::Micros(rtc::TimeMicros() - posted_us);
252*d9f75844SAndroid Build Coastguard Worker PostDelayedTaskOnTaskQueue(
253*d9f75844SAndroid Build Coastguard Worker std::move(task), std::max(delay - post_time, TimeDelta::Zero()));
254*d9f75844SAndroid Build Coastguard Worker });
255*d9f75844SAndroid Build Coastguard Worker }
256*d9f75844SAndroid Build Coastguard Worker }
257*d9f75844SAndroid Build Coastguard Worker
PostDelayedHighPrecisionTask(absl::AnyInvocable<void ()&&> task,TimeDelta delay)258*d9f75844SAndroid Build Coastguard Worker void TaskQueueLibevent::PostDelayedHighPrecisionTask(
259*d9f75844SAndroid Build Coastguard Worker absl::AnyInvocable<void() &&> task,
260*d9f75844SAndroid Build Coastguard Worker TimeDelta delay) {
261*d9f75844SAndroid Build Coastguard Worker PostDelayedTask(std::move(task), delay);
262*d9f75844SAndroid Build Coastguard Worker }
263*d9f75844SAndroid Build Coastguard Worker
264*d9f75844SAndroid Build Coastguard Worker // static
OnWakeup(int socket,short flags,void * context)265*d9f75844SAndroid Build Coastguard Worker void TaskQueueLibevent::OnWakeup(int socket,
266*d9f75844SAndroid Build Coastguard Worker short flags, // NOLINT
267*d9f75844SAndroid Build Coastguard Worker void* context) {
268*d9f75844SAndroid Build Coastguard Worker TaskQueueLibevent* me = static_cast<TaskQueueLibevent*>(context);
269*d9f75844SAndroid Build Coastguard Worker RTC_DCHECK(me->wakeup_pipe_out_ == socket);
270*d9f75844SAndroid Build Coastguard Worker char buf;
271*d9f75844SAndroid Build Coastguard Worker RTC_CHECK(sizeof(buf) == read(socket, &buf, sizeof(buf)));
272*d9f75844SAndroid Build Coastguard Worker switch (buf) {
273*d9f75844SAndroid Build Coastguard Worker case kQuit:
274*d9f75844SAndroid Build Coastguard Worker me->is_active_ = false;
275*d9f75844SAndroid Build Coastguard Worker event_base_loopbreak(me->event_base_);
276*d9f75844SAndroid Build Coastguard Worker break;
277*d9f75844SAndroid Build Coastguard Worker case kRunTasks: {
278*d9f75844SAndroid Build Coastguard Worker absl::InlinedVector<absl::AnyInvocable<void() &&>, 4> tasks;
279*d9f75844SAndroid Build Coastguard Worker {
280*d9f75844SAndroid Build Coastguard Worker MutexLock lock(&me->pending_lock_);
281*d9f75844SAndroid Build Coastguard Worker tasks.swap(me->pending_);
282*d9f75844SAndroid Build Coastguard Worker }
283*d9f75844SAndroid Build Coastguard Worker RTC_DCHECK(!tasks.empty());
284*d9f75844SAndroid Build Coastguard Worker for (auto& task : tasks) {
285*d9f75844SAndroid Build Coastguard Worker std::move(task)();
286*d9f75844SAndroid Build Coastguard Worker // Prefer to delete the `task` before running the next one.
287*d9f75844SAndroid Build Coastguard Worker task = nullptr;
288*d9f75844SAndroid Build Coastguard Worker }
289*d9f75844SAndroid Build Coastguard Worker break;
290*d9f75844SAndroid Build Coastguard Worker }
291*d9f75844SAndroid Build Coastguard Worker default:
292*d9f75844SAndroid Build Coastguard Worker RTC_DCHECK_NOTREACHED();
293*d9f75844SAndroid Build Coastguard Worker break;
294*d9f75844SAndroid Build Coastguard Worker }
295*d9f75844SAndroid Build Coastguard Worker }
296*d9f75844SAndroid Build Coastguard Worker
297*d9f75844SAndroid Build Coastguard Worker // static
RunTimer(int fd,short flags,void * context)298*d9f75844SAndroid Build Coastguard Worker void TaskQueueLibevent::RunTimer(int fd,
299*d9f75844SAndroid Build Coastguard Worker short flags, // NOLINT
300*d9f75844SAndroid Build Coastguard Worker void* context) {
301*d9f75844SAndroid Build Coastguard Worker TimerEvent* timer = static_cast<TimerEvent*>(context);
302*d9f75844SAndroid Build Coastguard Worker std::move(timer->task)();
303*d9f75844SAndroid Build Coastguard Worker timer->task_queue->pending_timers_.remove(timer);
304*d9f75844SAndroid Build Coastguard Worker delete timer;
305*d9f75844SAndroid Build Coastguard Worker }
306*d9f75844SAndroid Build Coastguard Worker
307*d9f75844SAndroid Build Coastguard Worker class TaskQueueLibeventFactory final : public TaskQueueFactory {
308*d9f75844SAndroid Build Coastguard Worker public:
CreateTaskQueue(absl::string_view name,Priority priority) const309*d9f75844SAndroid Build Coastguard Worker std::unique_ptr<TaskQueueBase, TaskQueueDeleter> CreateTaskQueue(
310*d9f75844SAndroid Build Coastguard Worker absl::string_view name,
311*d9f75844SAndroid Build Coastguard Worker Priority priority) const override {
312*d9f75844SAndroid Build Coastguard Worker return std::unique_ptr<TaskQueueBase, TaskQueueDeleter>(
313*d9f75844SAndroid Build Coastguard Worker new TaskQueueLibevent(name,
314*d9f75844SAndroid Build Coastguard Worker TaskQueuePriorityToThreadPriority(priority)));
315*d9f75844SAndroid Build Coastguard Worker }
316*d9f75844SAndroid Build Coastguard Worker };
317*d9f75844SAndroid Build Coastguard Worker
318*d9f75844SAndroid Build Coastguard Worker } // namespace
319*d9f75844SAndroid Build Coastguard Worker
CreateTaskQueueLibeventFactory()320*d9f75844SAndroid Build Coastguard Worker std::unique_ptr<TaskQueueFactory> CreateTaskQueueLibeventFactory() {
321*d9f75844SAndroid Build Coastguard Worker return std::make_unique<TaskQueueLibeventFactory>();
322*d9f75844SAndroid Build Coastguard Worker }
323*d9f75844SAndroid Build Coastguard Worker
324*d9f75844SAndroid Build Coastguard Worker } // namespace webrtc
325