xref: /aosp_15_r20/external/webrtc/rtc_base/task_queue_libevent.cc (revision d9f758449e529ab9291ac668be2861e7a55c2422)
1*d9f75844SAndroid Build Coastguard Worker /*
2*d9f75844SAndroid Build Coastguard Worker  *  Copyright 2016 The WebRTC Project Authors. All rights reserved.
3*d9f75844SAndroid Build Coastguard Worker  *
4*d9f75844SAndroid Build Coastguard Worker  *  Use of this source code is governed by a BSD-style license
5*d9f75844SAndroid Build Coastguard Worker  *  that can be found in the LICENSE file in the root of the source
6*d9f75844SAndroid Build Coastguard Worker  *  tree. An additional intellectual property rights grant can be found
7*d9f75844SAndroid Build Coastguard Worker  *  in the file PATENTS.  All contributing project authors may
8*d9f75844SAndroid Build Coastguard Worker  *  be found in the AUTHORS file in the root of the source tree.
9*d9f75844SAndroid Build Coastguard Worker  */
10*d9f75844SAndroid Build Coastguard Worker 
11*d9f75844SAndroid Build Coastguard Worker #include "rtc_base/task_queue_libevent.h"
12*d9f75844SAndroid Build Coastguard Worker 
13*d9f75844SAndroid Build Coastguard Worker #include <errno.h>
14*d9f75844SAndroid Build Coastguard Worker #include <fcntl.h>
15*d9f75844SAndroid Build Coastguard Worker #include <pthread.h>
16*d9f75844SAndroid Build Coastguard Worker #include <signal.h>
17*d9f75844SAndroid Build Coastguard Worker #include <stdint.h>
18*d9f75844SAndroid Build Coastguard Worker #include <time.h>
19*d9f75844SAndroid Build Coastguard Worker #include <unistd.h>
20*d9f75844SAndroid Build Coastguard Worker 
21*d9f75844SAndroid Build Coastguard Worker #include <list>
22*d9f75844SAndroid Build Coastguard Worker #include <memory>
23*d9f75844SAndroid Build Coastguard Worker #include <type_traits>
24*d9f75844SAndroid Build Coastguard Worker #include <utility>
25*d9f75844SAndroid Build Coastguard Worker 
26*d9f75844SAndroid Build Coastguard Worker #include "absl/container/inlined_vector.h"
27*d9f75844SAndroid Build Coastguard Worker #include "absl/functional/any_invocable.h"
28*d9f75844SAndroid Build Coastguard Worker #include "absl/strings/string_view.h"
29*d9f75844SAndroid Build Coastguard Worker #include "api/task_queue/task_queue_base.h"
30*d9f75844SAndroid Build Coastguard Worker #include "api/units/time_delta.h"
31*d9f75844SAndroid Build Coastguard Worker #include "rtc_base/checks.h"
32*d9f75844SAndroid Build Coastguard Worker #include "rtc_base/logging.h"
33*d9f75844SAndroid Build Coastguard Worker #include "rtc_base/numerics/safe_conversions.h"
34*d9f75844SAndroid Build Coastguard Worker #include "rtc_base/platform_thread.h"
35*d9f75844SAndroid Build Coastguard Worker #include "rtc_base/platform_thread_types.h"
36*d9f75844SAndroid Build Coastguard Worker #include "rtc_base/synchronization/mutex.h"
37*d9f75844SAndroid Build Coastguard Worker #include "rtc_base/thread_annotations.h"
38*d9f75844SAndroid Build Coastguard Worker #include "rtc_base/time_utils.h"
39*d9f75844SAndroid Build Coastguard Worker #include "third_party/libevent/event.h"
40*d9f75844SAndroid Build Coastguard Worker 
41*d9f75844SAndroid Build Coastguard Worker namespace webrtc {
42*d9f75844SAndroid Build Coastguard Worker namespace {
43*d9f75844SAndroid Build Coastguard Worker constexpr char kQuit = 1;
44*d9f75844SAndroid Build Coastguard Worker constexpr char kRunTasks = 2;
45*d9f75844SAndroid Build Coastguard Worker 
46*d9f75844SAndroid Build Coastguard Worker using Priority = TaskQueueFactory::Priority;
47*d9f75844SAndroid Build Coastguard Worker 
48*d9f75844SAndroid Build Coastguard Worker // This ignores the SIGPIPE signal on the calling thread.
49*d9f75844SAndroid Build Coastguard Worker // This signal can be fired when trying to write() to a pipe that's being
50*d9f75844SAndroid Build Coastguard Worker // closed or while closing a pipe that's being written to.
51*d9f75844SAndroid Build Coastguard Worker // We can run into that situation so we ignore this signal and continue as
52*d9f75844SAndroid Build Coastguard Worker // normal.
53*d9f75844SAndroid Build Coastguard Worker // As a side note for this implementation, it would be great if we could safely
54*d9f75844SAndroid Build Coastguard Worker // restore the sigmask, but unfortunately the operation of restoring it, can
55*d9f75844SAndroid Build Coastguard Worker // itself actually cause SIGPIPE to be signaled :-| (e.g. on MacOS)
56*d9f75844SAndroid Build Coastguard Worker // The SIGPIPE signal by default causes the process to be terminated, so we
57*d9f75844SAndroid Build Coastguard Worker // don't want to risk that.
58*d9f75844SAndroid Build Coastguard Worker // An alternative to this approach is to ignore the signal for the whole
59*d9f75844SAndroid Build Coastguard Worker // process:
60*d9f75844SAndroid Build Coastguard Worker //   signal(SIGPIPE, SIG_IGN);
IgnoreSigPipeSignalOnCurrentThread()61*d9f75844SAndroid Build Coastguard Worker void IgnoreSigPipeSignalOnCurrentThread() {
62*d9f75844SAndroid Build Coastguard Worker   sigset_t sigpipe_mask;
63*d9f75844SAndroid Build Coastguard Worker   sigemptyset(&sigpipe_mask);
64*d9f75844SAndroid Build Coastguard Worker   sigaddset(&sigpipe_mask, SIGPIPE);
65*d9f75844SAndroid Build Coastguard Worker   pthread_sigmask(SIG_BLOCK, &sigpipe_mask, nullptr);
66*d9f75844SAndroid Build Coastguard Worker }
67*d9f75844SAndroid Build Coastguard Worker 
SetNonBlocking(int fd)68*d9f75844SAndroid Build Coastguard Worker bool SetNonBlocking(int fd) {
69*d9f75844SAndroid Build Coastguard Worker   const int flags = fcntl(fd, F_GETFL);
70*d9f75844SAndroid Build Coastguard Worker   RTC_CHECK(flags != -1);
71*d9f75844SAndroid Build Coastguard Worker   return (flags & O_NONBLOCK) || fcntl(fd, F_SETFL, flags | O_NONBLOCK) != -1;
72*d9f75844SAndroid Build Coastguard Worker }
73*d9f75844SAndroid Build Coastguard Worker 
74*d9f75844SAndroid Build Coastguard Worker // TODO(tommi): This is a hack to support two versions of libevent that we're
75*d9f75844SAndroid Build Coastguard Worker // compatible with.  The method we really want to call is event_assign(),
76*d9f75844SAndroid Build Coastguard Worker // since event_set() has been marked as deprecated (and doesn't accept
77*d9f75844SAndroid Build Coastguard Worker // passing event_base__ as a parameter).  However, the version of libevent
78*d9f75844SAndroid Build Coastguard Worker // that we have in Chromium, doesn't have event_assign(), so we need to call
79*d9f75844SAndroid Build Coastguard Worker // event_set() there.
EventAssign(struct event * ev,struct event_base * base,int fd,short events,void (* callback)(int,short,void *),void * arg)80*d9f75844SAndroid Build Coastguard Worker void EventAssign(struct event* ev,
81*d9f75844SAndroid Build Coastguard Worker                  struct event_base* base,
82*d9f75844SAndroid Build Coastguard Worker                  int fd,
83*d9f75844SAndroid Build Coastguard Worker                  short events,
84*d9f75844SAndroid Build Coastguard Worker                  void (*callback)(int, short, void*),
85*d9f75844SAndroid Build Coastguard Worker                  void* arg) {
86*d9f75844SAndroid Build Coastguard Worker #if defined(_EVENT2_EVENT_H_)
87*d9f75844SAndroid Build Coastguard Worker   RTC_CHECK_EQ(0, event_assign(ev, base, fd, events, callback, arg));
88*d9f75844SAndroid Build Coastguard Worker #else
89*d9f75844SAndroid Build Coastguard Worker   event_set(ev, fd, events, callback, arg);
90*d9f75844SAndroid Build Coastguard Worker   RTC_CHECK_EQ(0, event_base_set(base, ev));
91*d9f75844SAndroid Build Coastguard Worker #endif
92*d9f75844SAndroid Build Coastguard Worker }
93*d9f75844SAndroid Build Coastguard Worker 
TaskQueuePriorityToThreadPriority(Priority priority)94*d9f75844SAndroid Build Coastguard Worker rtc::ThreadPriority TaskQueuePriorityToThreadPriority(Priority priority) {
95*d9f75844SAndroid Build Coastguard Worker   switch (priority) {
96*d9f75844SAndroid Build Coastguard Worker     case Priority::HIGH:
97*d9f75844SAndroid Build Coastguard Worker       return rtc::ThreadPriority::kRealtime;
98*d9f75844SAndroid Build Coastguard Worker     case Priority::LOW:
99*d9f75844SAndroid Build Coastguard Worker       return rtc::ThreadPriority::kLow;
100*d9f75844SAndroid Build Coastguard Worker     case Priority::NORMAL:
101*d9f75844SAndroid Build Coastguard Worker       return rtc::ThreadPriority::kNormal;
102*d9f75844SAndroid Build Coastguard Worker   }
103*d9f75844SAndroid Build Coastguard Worker }
104*d9f75844SAndroid Build Coastguard Worker 
105*d9f75844SAndroid Build Coastguard Worker class TaskQueueLibevent final : public TaskQueueBase {
106*d9f75844SAndroid Build Coastguard Worker  public:
107*d9f75844SAndroid Build Coastguard Worker   TaskQueueLibevent(absl::string_view queue_name, rtc::ThreadPriority priority);
108*d9f75844SAndroid Build Coastguard Worker 
109*d9f75844SAndroid Build Coastguard Worker   void Delete() override;
110*d9f75844SAndroid Build Coastguard Worker   void PostTask(absl::AnyInvocable<void() &&> task) override;
111*d9f75844SAndroid Build Coastguard Worker   void PostDelayedTask(absl::AnyInvocable<void() &&> task,
112*d9f75844SAndroid Build Coastguard Worker                        TimeDelta delay) override;
113*d9f75844SAndroid Build Coastguard Worker   void PostDelayedHighPrecisionTask(absl::AnyInvocable<void() &&> task,
114*d9f75844SAndroid Build Coastguard Worker                                     TimeDelta delay) override;
115*d9f75844SAndroid Build Coastguard Worker 
116*d9f75844SAndroid Build Coastguard Worker  private:
117*d9f75844SAndroid Build Coastguard Worker   struct TimerEvent;
118*d9f75844SAndroid Build Coastguard Worker 
119*d9f75844SAndroid Build Coastguard Worker   void PostDelayedTaskOnTaskQueue(absl::AnyInvocable<void() &&> task,
120*d9f75844SAndroid Build Coastguard Worker                                   TimeDelta delay);
121*d9f75844SAndroid Build Coastguard Worker 
122*d9f75844SAndroid Build Coastguard Worker   ~TaskQueueLibevent() override = default;
123*d9f75844SAndroid Build Coastguard Worker 
124*d9f75844SAndroid Build Coastguard Worker   static void OnWakeup(int socket, short flags, void* context);  // NOLINT
125*d9f75844SAndroid Build Coastguard Worker   static void RunTimer(int fd, short flags, void* context);      // NOLINT
126*d9f75844SAndroid Build Coastguard Worker 
127*d9f75844SAndroid Build Coastguard Worker   bool is_active_ = true;
128*d9f75844SAndroid Build Coastguard Worker   int wakeup_pipe_in_ = -1;
129*d9f75844SAndroid Build Coastguard Worker   int wakeup_pipe_out_ = -1;
130*d9f75844SAndroid Build Coastguard Worker   event_base* event_base_;
131*d9f75844SAndroid Build Coastguard Worker   event wakeup_event_;
132*d9f75844SAndroid Build Coastguard Worker   rtc::PlatformThread thread_;
133*d9f75844SAndroid Build Coastguard Worker   Mutex pending_lock_;
134*d9f75844SAndroid Build Coastguard Worker   absl::InlinedVector<absl::AnyInvocable<void() &&>, 4> pending_
135*d9f75844SAndroid Build Coastguard Worker       RTC_GUARDED_BY(pending_lock_);
136*d9f75844SAndroid Build Coastguard Worker   // Holds a list of events pending timers for cleanup when the loop exits.
137*d9f75844SAndroid Build Coastguard Worker   std::list<TimerEvent*> pending_timers_;
138*d9f75844SAndroid Build Coastguard Worker };
139*d9f75844SAndroid Build Coastguard Worker 
140*d9f75844SAndroid Build Coastguard Worker struct TaskQueueLibevent::TimerEvent {
TimerEventwebrtc::__anon8d624fd20111::TaskQueueLibevent::TimerEvent141*d9f75844SAndroid Build Coastguard Worker   TimerEvent(TaskQueueLibevent* task_queue, absl::AnyInvocable<void() &&> task)
142*d9f75844SAndroid Build Coastguard Worker       : task_queue(task_queue), task(std::move(task)) {}
~TimerEventwebrtc::__anon8d624fd20111::TaskQueueLibevent::TimerEvent143*d9f75844SAndroid Build Coastguard Worker   ~TimerEvent() { event_del(&ev); }
144*d9f75844SAndroid Build Coastguard Worker 
145*d9f75844SAndroid Build Coastguard Worker   event ev;
146*d9f75844SAndroid Build Coastguard Worker   TaskQueueLibevent* task_queue;
147*d9f75844SAndroid Build Coastguard Worker   absl::AnyInvocable<void() &&> task;
148*d9f75844SAndroid Build Coastguard Worker };
149*d9f75844SAndroid Build Coastguard Worker 
TaskQueueLibevent(absl::string_view queue_name,rtc::ThreadPriority priority)150*d9f75844SAndroid Build Coastguard Worker TaskQueueLibevent::TaskQueueLibevent(absl::string_view queue_name,
151*d9f75844SAndroid Build Coastguard Worker                                      rtc::ThreadPriority priority)
152*d9f75844SAndroid Build Coastguard Worker     : event_base_(event_base_new()) {
153*d9f75844SAndroid Build Coastguard Worker   int fds[2];
154*d9f75844SAndroid Build Coastguard Worker   RTC_CHECK(pipe(fds) == 0);
155*d9f75844SAndroid Build Coastguard Worker   SetNonBlocking(fds[0]);
156*d9f75844SAndroid Build Coastguard Worker   SetNonBlocking(fds[1]);
157*d9f75844SAndroid Build Coastguard Worker   wakeup_pipe_out_ = fds[0];
158*d9f75844SAndroid Build Coastguard Worker   wakeup_pipe_in_ = fds[1];
159*d9f75844SAndroid Build Coastguard Worker 
160*d9f75844SAndroid Build Coastguard Worker   EventAssign(&wakeup_event_, event_base_, wakeup_pipe_out_,
161*d9f75844SAndroid Build Coastguard Worker               EV_READ | EV_PERSIST, OnWakeup, this);
162*d9f75844SAndroid Build Coastguard Worker   event_add(&wakeup_event_, 0);
163*d9f75844SAndroid Build Coastguard Worker   thread_ = rtc::PlatformThread::SpawnJoinable(
164*d9f75844SAndroid Build Coastguard Worker       [this] {
165*d9f75844SAndroid Build Coastguard Worker         {
166*d9f75844SAndroid Build Coastguard Worker           CurrentTaskQueueSetter set_current(this);
167*d9f75844SAndroid Build Coastguard Worker           while (is_active_)
168*d9f75844SAndroid Build Coastguard Worker             event_base_loop(event_base_, 0);
169*d9f75844SAndroid Build Coastguard Worker         }
170*d9f75844SAndroid Build Coastguard Worker 
171*d9f75844SAndroid Build Coastguard Worker         for (TimerEvent* timer : pending_timers_)
172*d9f75844SAndroid Build Coastguard Worker           delete timer;
173*d9f75844SAndroid Build Coastguard Worker       },
174*d9f75844SAndroid Build Coastguard Worker       queue_name, rtc::ThreadAttributes().SetPriority(priority));
175*d9f75844SAndroid Build Coastguard Worker }
176*d9f75844SAndroid Build Coastguard Worker 
Delete()177*d9f75844SAndroid Build Coastguard Worker void TaskQueueLibevent::Delete() {
178*d9f75844SAndroid Build Coastguard Worker   RTC_DCHECK(!IsCurrent());
179*d9f75844SAndroid Build Coastguard Worker   struct timespec ts;
180*d9f75844SAndroid Build Coastguard Worker   char message = kQuit;
181*d9f75844SAndroid Build Coastguard Worker   while (write(wakeup_pipe_in_, &message, sizeof(message)) != sizeof(message)) {
182*d9f75844SAndroid Build Coastguard Worker     // The queue is full, so we have no choice but to wait and retry.
183*d9f75844SAndroid Build Coastguard Worker     RTC_CHECK_EQ(EAGAIN, errno);
184*d9f75844SAndroid Build Coastguard Worker     ts.tv_sec = 0;
185*d9f75844SAndroid Build Coastguard Worker     ts.tv_nsec = 1000000;
186*d9f75844SAndroid Build Coastguard Worker     nanosleep(&ts, nullptr);
187*d9f75844SAndroid Build Coastguard Worker   }
188*d9f75844SAndroid Build Coastguard Worker 
189*d9f75844SAndroid Build Coastguard Worker   thread_.Finalize();
190*d9f75844SAndroid Build Coastguard Worker 
191*d9f75844SAndroid Build Coastguard Worker   event_del(&wakeup_event_);
192*d9f75844SAndroid Build Coastguard Worker 
193*d9f75844SAndroid Build Coastguard Worker   IgnoreSigPipeSignalOnCurrentThread();
194*d9f75844SAndroid Build Coastguard Worker 
195*d9f75844SAndroid Build Coastguard Worker   close(wakeup_pipe_in_);
196*d9f75844SAndroid Build Coastguard Worker   close(wakeup_pipe_out_);
197*d9f75844SAndroid Build Coastguard Worker   wakeup_pipe_in_ = -1;
198*d9f75844SAndroid Build Coastguard Worker   wakeup_pipe_out_ = -1;
199*d9f75844SAndroid Build Coastguard Worker 
200*d9f75844SAndroid Build Coastguard Worker   event_base_free(event_base_);
201*d9f75844SAndroid Build Coastguard Worker   delete this;
202*d9f75844SAndroid Build Coastguard Worker }
203*d9f75844SAndroid Build Coastguard Worker 
PostTask(absl::AnyInvocable<void ()&&> task)204*d9f75844SAndroid Build Coastguard Worker void TaskQueueLibevent::PostTask(absl::AnyInvocable<void() &&> task) {
205*d9f75844SAndroid Build Coastguard Worker   {
206*d9f75844SAndroid Build Coastguard Worker     MutexLock lock(&pending_lock_);
207*d9f75844SAndroid Build Coastguard Worker     bool had_pending_tasks = !pending_.empty();
208*d9f75844SAndroid Build Coastguard Worker     pending_.push_back(std::move(task));
209*d9f75844SAndroid Build Coastguard Worker 
210*d9f75844SAndroid Build Coastguard Worker     // Only write to the pipe if there were no pending tasks before this one
211*d9f75844SAndroid Build Coastguard Worker     // since the thread could be sleeping. If there were already pending tasks
212*d9f75844SAndroid Build Coastguard Worker     // then we know there's either a pending write in the pipe or the thread has
213*d9f75844SAndroid Build Coastguard Worker     // not yet processed the pending tasks. In either case, the thread will
214*d9f75844SAndroid Build Coastguard Worker     // eventually wake up and process all pending tasks including this one.
215*d9f75844SAndroid Build Coastguard Worker     if (had_pending_tasks) {
216*d9f75844SAndroid Build Coastguard Worker       return;
217*d9f75844SAndroid Build Coastguard Worker     }
218*d9f75844SAndroid Build Coastguard Worker   }
219*d9f75844SAndroid Build Coastguard Worker 
220*d9f75844SAndroid Build Coastguard Worker   // Note: This behvior outlined above ensures we never fill up the pipe write
221*d9f75844SAndroid Build Coastguard Worker   // buffer since there will only ever be 1 byte pending.
222*d9f75844SAndroid Build Coastguard Worker   char message = kRunTasks;
223*d9f75844SAndroid Build Coastguard Worker   RTC_CHECK_EQ(write(wakeup_pipe_in_, &message, sizeof(message)),
224*d9f75844SAndroid Build Coastguard Worker                sizeof(message));
225*d9f75844SAndroid Build Coastguard Worker }
226*d9f75844SAndroid Build Coastguard Worker 
PostDelayedTaskOnTaskQueue(absl::AnyInvocable<void ()&&> task,TimeDelta delay)227*d9f75844SAndroid Build Coastguard Worker void TaskQueueLibevent::PostDelayedTaskOnTaskQueue(
228*d9f75844SAndroid Build Coastguard Worker     absl::AnyInvocable<void() &&> task,
229*d9f75844SAndroid Build Coastguard Worker     TimeDelta delay) {
230*d9f75844SAndroid Build Coastguard Worker   // libevent api is not thread safe by default, thus event_add need to be
231*d9f75844SAndroid Build Coastguard Worker   // called on the `thread_`.
232*d9f75844SAndroid Build Coastguard Worker   RTC_DCHECK(IsCurrent());
233*d9f75844SAndroid Build Coastguard Worker 
234*d9f75844SAndroid Build Coastguard Worker   TimerEvent* timer = new TimerEvent(this, std::move(task));
235*d9f75844SAndroid Build Coastguard Worker   EventAssign(&timer->ev, event_base_, -1, 0, &TaskQueueLibevent::RunTimer,
236*d9f75844SAndroid Build Coastguard Worker               timer);
237*d9f75844SAndroid Build Coastguard Worker   pending_timers_.push_back(timer);
238*d9f75844SAndroid Build Coastguard Worker   timeval tv = {.tv_sec = rtc::dchecked_cast<int>(delay.us() / 1'000'000),
239*d9f75844SAndroid Build Coastguard Worker                 .tv_usec = rtc::dchecked_cast<int>(delay.us() % 1'000'000)};
240*d9f75844SAndroid Build Coastguard Worker   event_add(&timer->ev, &tv);
241*d9f75844SAndroid Build Coastguard Worker }
242*d9f75844SAndroid Build Coastguard Worker 
PostDelayedTask(absl::AnyInvocable<void ()&&> task,TimeDelta delay)243*d9f75844SAndroid Build Coastguard Worker void TaskQueueLibevent::PostDelayedTask(absl::AnyInvocable<void() &&> task,
244*d9f75844SAndroid Build Coastguard Worker                                         TimeDelta delay) {
245*d9f75844SAndroid Build Coastguard Worker   if (IsCurrent()) {
246*d9f75844SAndroid Build Coastguard Worker     PostDelayedTaskOnTaskQueue(std::move(task), delay);
247*d9f75844SAndroid Build Coastguard Worker   } else {
248*d9f75844SAndroid Build Coastguard Worker     int64_t posted_us = rtc::TimeMicros();
249*d9f75844SAndroid Build Coastguard Worker     PostTask([posted_us, delay, task = std::move(task), this]() mutable {
250*d9f75844SAndroid Build Coastguard Worker       // Compensate for the time that has passed since the posting.
251*d9f75844SAndroid Build Coastguard Worker       TimeDelta post_time = TimeDelta::Micros(rtc::TimeMicros() - posted_us);
252*d9f75844SAndroid Build Coastguard Worker       PostDelayedTaskOnTaskQueue(
253*d9f75844SAndroid Build Coastguard Worker           std::move(task), std::max(delay - post_time, TimeDelta::Zero()));
254*d9f75844SAndroid Build Coastguard Worker     });
255*d9f75844SAndroid Build Coastguard Worker   }
256*d9f75844SAndroid Build Coastguard Worker }
257*d9f75844SAndroid Build Coastguard Worker 
PostDelayedHighPrecisionTask(absl::AnyInvocable<void ()&&> task,TimeDelta delay)258*d9f75844SAndroid Build Coastguard Worker void TaskQueueLibevent::PostDelayedHighPrecisionTask(
259*d9f75844SAndroid Build Coastguard Worker     absl::AnyInvocable<void() &&> task,
260*d9f75844SAndroid Build Coastguard Worker     TimeDelta delay) {
261*d9f75844SAndroid Build Coastguard Worker   PostDelayedTask(std::move(task), delay);
262*d9f75844SAndroid Build Coastguard Worker }
263*d9f75844SAndroid Build Coastguard Worker 
264*d9f75844SAndroid Build Coastguard Worker // static
OnWakeup(int socket,short flags,void * context)265*d9f75844SAndroid Build Coastguard Worker void TaskQueueLibevent::OnWakeup(int socket,
266*d9f75844SAndroid Build Coastguard Worker                                  short flags,  // NOLINT
267*d9f75844SAndroid Build Coastguard Worker                                  void* context) {
268*d9f75844SAndroid Build Coastguard Worker   TaskQueueLibevent* me = static_cast<TaskQueueLibevent*>(context);
269*d9f75844SAndroid Build Coastguard Worker   RTC_DCHECK(me->wakeup_pipe_out_ == socket);
270*d9f75844SAndroid Build Coastguard Worker   char buf;
271*d9f75844SAndroid Build Coastguard Worker   RTC_CHECK(sizeof(buf) == read(socket, &buf, sizeof(buf)));
272*d9f75844SAndroid Build Coastguard Worker   switch (buf) {
273*d9f75844SAndroid Build Coastguard Worker     case kQuit:
274*d9f75844SAndroid Build Coastguard Worker       me->is_active_ = false;
275*d9f75844SAndroid Build Coastguard Worker       event_base_loopbreak(me->event_base_);
276*d9f75844SAndroid Build Coastguard Worker       break;
277*d9f75844SAndroid Build Coastguard Worker     case kRunTasks: {
278*d9f75844SAndroid Build Coastguard Worker       absl::InlinedVector<absl::AnyInvocable<void() &&>, 4> tasks;
279*d9f75844SAndroid Build Coastguard Worker       {
280*d9f75844SAndroid Build Coastguard Worker         MutexLock lock(&me->pending_lock_);
281*d9f75844SAndroid Build Coastguard Worker         tasks.swap(me->pending_);
282*d9f75844SAndroid Build Coastguard Worker       }
283*d9f75844SAndroid Build Coastguard Worker       RTC_DCHECK(!tasks.empty());
284*d9f75844SAndroid Build Coastguard Worker       for (auto& task : tasks) {
285*d9f75844SAndroid Build Coastguard Worker         std::move(task)();
286*d9f75844SAndroid Build Coastguard Worker         // Prefer to delete the `task` before running the next one.
287*d9f75844SAndroid Build Coastguard Worker         task = nullptr;
288*d9f75844SAndroid Build Coastguard Worker       }
289*d9f75844SAndroid Build Coastguard Worker       break;
290*d9f75844SAndroid Build Coastguard Worker     }
291*d9f75844SAndroid Build Coastguard Worker     default:
292*d9f75844SAndroid Build Coastguard Worker       RTC_DCHECK_NOTREACHED();
293*d9f75844SAndroid Build Coastguard Worker       break;
294*d9f75844SAndroid Build Coastguard Worker   }
295*d9f75844SAndroid Build Coastguard Worker }
296*d9f75844SAndroid Build Coastguard Worker 
297*d9f75844SAndroid Build Coastguard Worker // static
RunTimer(int fd,short flags,void * context)298*d9f75844SAndroid Build Coastguard Worker void TaskQueueLibevent::RunTimer(int fd,
299*d9f75844SAndroid Build Coastguard Worker                                  short flags,  // NOLINT
300*d9f75844SAndroid Build Coastguard Worker                                  void* context) {
301*d9f75844SAndroid Build Coastguard Worker   TimerEvent* timer = static_cast<TimerEvent*>(context);
302*d9f75844SAndroid Build Coastguard Worker   std::move(timer->task)();
303*d9f75844SAndroid Build Coastguard Worker   timer->task_queue->pending_timers_.remove(timer);
304*d9f75844SAndroid Build Coastguard Worker   delete timer;
305*d9f75844SAndroid Build Coastguard Worker }
306*d9f75844SAndroid Build Coastguard Worker 
307*d9f75844SAndroid Build Coastguard Worker class TaskQueueLibeventFactory final : public TaskQueueFactory {
308*d9f75844SAndroid Build Coastguard Worker  public:
CreateTaskQueue(absl::string_view name,Priority priority) const309*d9f75844SAndroid Build Coastguard Worker   std::unique_ptr<TaskQueueBase, TaskQueueDeleter> CreateTaskQueue(
310*d9f75844SAndroid Build Coastguard Worker       absl::string_view name,
311*d9f75844SAndroid Build Coastguard Worker       Priority priority) const override {
312*d9f75844SAndroid Build Coastguard Worker     return std::unique_ptr<TaskQueueBase, TaskQueueDeleter>(
313*d9f75844SAndroid Build Coastguard Worker         new TaskQueueLibevent(name,
314*d9f75844SAndroid Build Coastguard Worker                               TaskQueuePriorityToThreadPriority(priority)));
315*d9f75844SAndroid Build Coastguard Worker   }
316*d9f75844SAndroid Build Coastguard Worker };
317*d9f75844SAndroid Build Coastguard Worker 
318*d9f75844SAndroid Build Coastguard Worker }  // namespace
319*d9f75844SAndroid Build Coastguard Worker 
CreateTaskQueueLibeventFactory()320*d9f75844SAndroid Build Coastguard Worker std::unique_ptr<TaskQueueFactory> CreateTaskQueueLibeventFactory() {
321*d9f75844SAndroid Build Coastguard Worker   return std::make_unique<TaskQueueLibeventFactory>();
322*d9f75844SAndroid Build Coastguard Worker }
323*d9f75844SAndroid Build Coastguard Worker 
324*d9f75844SAndroid Build Coastguard Worker }  // namespace webrtc
325