xref: /aosp_15_r20/external/webrtc/rtc_base/task_queue_win.cc (revision d9f758449e529ab9291ac668be2861e7a55c2422)
1 /*
2  *  Copyright 2016 The WebRTC Project Authors. All rights reserved.
3  *
4  *  Use of this source code is governed by a BSD-style license
5  *  that can be found in the LICENSE file in the root of the source
6  *  tree. An additional intellectual property rights grant can be found
7  *  in the file PATENTS.  All contributing project authors may
8  *  be found in the AUTHORS file in the root of the source tree.
9  */
10 
11 #include "rtc_base/task_queue_win.h"
12 
13 // clang-format off
14 // clang-format would change the include order.
15 
16 // Include winsock2.h before including <windows.h> to maintain consistency with
17 // win32.h. To include win32.h directly, it must be broken out into its own
18 // build target.
19 #include <winsock2.h>
20 #include <windows.h>
21 #include <sal.h>       // Must come after windows headers.
22 #include <mmsystem.h>  // Must come after windows headers.
23 // clang-format on
24 #include <string.h>
25 
26 #include <algorithm>
27 #include <functional>
28 #include <memory>
29 #include <queue>
30 #include <utility>
31 
32 #include "absl/functional/any_invocable.h"
33 #include "absl/strings/string_view.h"
34 #include "absl/types/optional.h"
35 #include "api/task_queue/task_queue_base.h"
36 #include "api/units/time_delta.h"
37 #include "api/units/timestamp.h"
38 #include "rtc_base/arraysize.h"
39 #include "rtc_base/checks.h"
40 #include "rtc_base/event.h"
41 #include "rtc_base/logging.h"
42 #include "rtc_base/numerics/safe_conversions.h"
43 #include "rtc_base/platform_thread.h"
44 #include "rtc_base/synchronization/mutex.h"
45 #include "rtc_base/time_utils.h"
46 
47 namespace webrtc {
48 namespace {
// Thread message used to hand a heap-allocated DelayedTaskInfo to the queue
// thread. Parenthesized so the macro expands to a single value in any
// expression context.
#define WM_QUEUE_DELAYED_TASK (WM_USER + 2)
50 
InitializeQueueThread(ULONG_PTR param)51 void CALLBACK InitializeQueueThread(ULONG_PTR param) {
52   MSG msg;
53   ::PeekMessage(&msg, nullptr, WM_USER, WM_USER, PM_NOREMOVE);
54   rtc::Event* data = reinterpret_cast<rtc::Event*>(param);
55   data->Set();
56 }
57 
TaskQueuePriorityToThreadPriority(TaskQueueFactory::Priority priority)58 rtc::ThreadPriority TaskQueuePriorityToThreadPriority(
59     TaskQueueFactory::Priority priority) {
60   switch (priority) {
61     case TaskQueueFactory::Priority::HIGH:
62       return rtc::ThreadPriority::kRealtime;
63     case TaskQueueFactory::Priority::LOW:
64       return rtc::ThreadPriority::kLow;
65     case TaskQueueFactory::Priority::NORMAL:
66       return rtc::ThreadPriority::kNormal;
67   }
68 }
69 
CurrentTime()70 Timestamp CurrentTime() {
71   static const UINT kPeriod = 1;
72   bool high_res = (timeBeginPeriod(kPeriod) == TIMERR_NOERROR);
73   Timestamp ret = Timestamp::Micros(rtc::TimeMicros());
74   if (high_res)
75     timeEndPeriod(kPeriod);
76   return ret;
77 }
78 
79 class DelayedTaskInfo {
80  public:
81   // Default ctor needed to support priority_queue::pop().
DelayedTaskInfo()82   DelayedTaskInfo() {}
DelayedTaskInfo(TimeDelta delay,absl::AnyInvocable<void ()&&> task)83   DelayedTaskInfo(TimeDelta delay, absl::AnyInvocable<void() &&> task)
84       : due_time_(CurrentTime() + delay), task_(std::move(task)) {}
85   DelayedTaskInfo(DelayedTaskInfo&&) = default;
86 
87   // Implement for priority_queue.
operator >(const DelayedTaskInfo & other) const88   bool operator>(const DelayedTaskInfo& other) const {
89     return due_time_ > other.due_time_;
90   }
91 
92   // Required by priority_queue::pop().
93   DelayedTaskInfo& operator=(DelayedTaskInfo&& other) = default;
94 
95   // See below for why this method is const.
Run() const96   void Run() const {
97     RTC_DCHECK(task_);
98     std::move(task_)();
99   }
100 
due_time() const101   Timestamp due_time() const { return due_time_; }
102 
103  private:
104   Timestamp due_time_ = Timestamp::Zero();
105 
106   // `task` needs to be mutable because std::priority_queue::top() returns
107   // a const reference and a key in an ordered queue must not be changed.
108   // There are two basic workarounds, one using const_cast, which would also
109   // make the key (`due_time`), non-const and the other is to make the non-key
110   // (`task`), mutable.
111   // Because of this, the `task` variable is made private and can only be
112   // mutated by calling the `Run()` method.
113   mutable absl::AnyInvocable<void() &&> task_;
114 };
115 
116 class MultimediaTimer {
117  public:
118   // Note: We create an event that requires manual reset.
MultimediaTimer()119   MultimediaTimer() : event_(::CreateEvent(nullptr, true, false, nullptr)) {}
120 
~MultimediaTimer()121   ~MultimediaTimer() {
122     Cancel();
123     ::CloseHandle(event_);
124   }
125 
126   MultimediaTimer(const MultimediaTimer&) = delete;
127   MultimediaTimer& operator=(const MultimediaTimer&) = delete;
128 
StartOneShotTimer(UINT delay_ms)129   bool StartOneShotTimer(UINT delay_ms) {
130     RTC_DCHECK_EQ(0, timer_id_);
131     RTC_DCHECK(event_ != nullptr);
132     timer_id_ =
133         ::timeSetEvent(delay_ms, 0, reinterpret_cast<LPTIMECALLBACK>(event_), 0,
134                        TIME_ONESHOT | TIME_CALLBACK_EVENT_SET);
135     return timer_id_ != 0;
136   }
137 
Cancel()138   void Cancel() {
139     if (timer_id_) {
140       ::timeKillEvent(timer_id_);
141       timer_id_ = 0;
142     }
143     // Now that timer is killed and not able to set the event, reset the event.
144     // Doing it in opposite order is racy because event may be set between
145     // event was reset and timer is killed leaving MultimediaTimer in surprising
146     // state where both event is set and timer is canceled.
147     ::ResetEvent(event_);
148   }
149 
event_for_wait()150   HANDLE* event_for_wait() { return &event_; }
151 
152  private:
153   HANDLE event_ = nullptr;
154   MMRESULT timer_id_ = 0;
155 };
156 
157 class TaskQueueWin : public TaskQueueBase {
158  public:
159   TaskQueueWin(absl::string_view queue_name, rtc::ThreadPriority priority);
160   ~TaskQueueWin() override = default;
161 
162   void Delete() override;
163   void PostTask(absl::AnyInvocable<void() &&> task) override;
164   void PostDelayedTask(absl::AnyInvocable<void() &&> task,
165                        TimeDelta delay) override;
166   void PostDelayedHighPrecisionTask(absl::AnyInvocable<void() &&> task,
167                                     TimeDelta delay) override;
168   void RunPendingTasks();
169 
170  private:
171   void RunThreadMain();
172   bool ProcessQueuedMessages();
173   void RunDueTasks();
174   void ScheduleNextTimer();
175   void CancelTimers();
176 
177   MultimediaTimer timer_;
178   // Since priority_queue<> by defult orders items in terms of
179   // largest->smallest, using std::less<>, and we want smallest->largest,
180   // we would like to use std::greater<> here.
181   std::priority_queue<DelayedTaskInfo,
182                       std::vector<DelayedTaskInfo>,
183                       std::greater<DelayedTaskInfo>>
184       timer_tasks_;
185   UINT_PTR timer_id_ = 0;
186   rtc::PlatformThread thread_;
187   Mutex pending_lock_;
188   std::queue<absl::AnyInvocable<void() &&>> pending_
189       RTC_GUARDED_BY(pending_lock_);
190   HANDLE in_queue_;
191 };
192 
TaskQueueWin(absl::string_view queue_name,rtc::ThreadPriority priority)193 TaskQueueWin::TaskQueueWin(absl::string_view queue_name,
194                            rtc::ThreadPriority priority)
195     : in_queue_(::CreateEvent(nullptr, true, false, nullptr)) {
196   RTC_DCHECK(in_queue_);
197   thread_ = rtc::PlatformThread::SpawnJoinable(
198       [this] { RunThreadMain(); }, queue_name,
199       rtc::ThreadAttributes().SetPriority(priority));
200 
201   rtc::Event event(false, false);
202   RTC_CHECK(thread_.QueueAPC(&InitializeQueueThread,
203                              reinterpret_cast<ULONG_PTR>(&event)));
204   event.Wait(rtc::Event::kForever);
205 }
206 
Delete()207 void TaskQueueWin::Delete() {
208   RTC_DCHECK(!IsCurrent());
209   RTC_CHECK(thread_.GetHandle() != absl::nullopt);
210   while (
211       !::PostThreadMessage(GetThreadId(*thread_.GetHandle()), WM_QUIT, 0, 0)) {
212     RTC_CHECK_EQ(ERROR_NOT_ENOUGH_QUOTA, ::GetLastError());
213     Sleep(1);
214   }
215   thread_.Finalize();
216   ::CloseHandle(in_queue_);
217   delete this;
218 }
219 
PostTask(absl::AnyInvocable<void ()&&> task)220 void TaskQueueWin::PostTask(absl::AnyInvocable<void() &&> task) {
221   MutexLock lock(&pending_lock_);
222   pending_.push(std::move(task));
223   ::SetEvent(in_queue_);
224 }
225 
PostDelayedTask(absl::AnyInvocable<void ()&&> task,TimeDelta delay)226 void TaskQueueWin::PostDelayedTask(absl::AnyInvocable<void() &&> task,
227                                    TimeDelta delay) {
228   if (delay <= TimeDelta::Zero()) {
229     PostTask(std::move(task));
230     return;
231   }
232 
233   auto* task_info = new DelayedTaskInfo(delay, std::move(task));
234   RTC_CHECK(thread_.GetHandle() != absl::nullopt);
235   if (!::PostThreadMessage(GetThreadId(*thread_.GetHandle()),
236                            WM_QUEUE_DELAYED_TASK, 0,
237                            reinterpret_cast<LPARAM>(task_info))) {
238     delete task_info;
239   }
240 }
241 
PostDelayedHighPrecisionTask(absl::AnyInvocable<void ()&&> task,TimeDelta delay)242 void TaskQueueWin::PostDelayedHighPrecisionTask(
243     absl::AnyInvocable<void() &&> task,
244     TimeDelta delay) {
245   PostDelayedTask(std::move(task), delay);
246 }
247 
RunPendingTasks()248 void TaskQueueWin::RunPendingTasks() {
249   while (true) {
250     absl::AnyInvocable<void() &&> task;
251     {
252       MutexLock lock(&pending_lock_);
253       if (pending_.empty())
254         break;
255       task = std::move(pending_.front());
256       pending_.pop();
257     }
258 
259     std::move(task)();
260   }
261 }
262 
RunThreadMain()263 void TaskQueueWin::RunThreadMain() {
264   CurrentTaskQueueSetter set_current(this);
265   HANDLE handles[2] = {*timer_.event_for_wait(), in_queue_};
266   while (true) {
267     // Make sure we do an alertable wait as that's required to allow APCs to run
268     // (e.g. required for InitializeQueueThread and stopping the thread in
269     // PlatformThread).
270     DWORD result = ::MsgWaitForMultipleObjectsEx(
271         arraysize(handles), handles, INFINITE, QS_ALLEVENTS, MWMO_ALERTABLE);
272     RTC_CHECK_NE(WAIT_FAILED, result);
273     if (result == (WAIT_OBJECT_0 + 2)) {
274       // There are messages in the message queue that need to be handled.
275       if (!ProcessQueuedMessages())
276         break;
277     }
278 
279     if (result == WAIT_OBJECT_0 ||
280         (!timer_tasks_.empty() &&
281          ::WaitForSingleObject(*timer_.event_for_wait(), 0) == WAIT_OBJECT_0)) {
282       // The multimedia timer was signaled.
283       timer_.Cancel();
284       RunDueTasks();
285       ScheduleNextTimer();
286     }
287 
288     if (result == (WAIT_OBJECT_0 + 1)) {
289       ::ResetEvent(in_queue_);
290       RunPendingTasks();
291     }
292   }
293 }
294 
ProcessQueuedMessages()295 bool TaskQueueWin::ProcessQueuedMessages() {
296   MSG msg = {};
297   // To protect against overly busy message queues, we limit the time
298   // we process tasks to a few milliseconds. If we don't do that, there's
299   // a chance that timer tasks won't ever run.
300   static constexpr TimeDelta kMaxTaskProcessingTime = TimeDelta::Millis(500);
301   Timestamp start = CurrentTime();
302   while (::PeekMessage(&msg, nullptr, 0, 0, PM_REMOVE) &&
303          msg.message != WM_QUIT) {
304     if (!msg.hwnd) {
305       switch (msg.message) {
306         case WM_QUEUE_DELAYED_TASK: {
307           std::unique_ptr<DelayedTaskInfo> info(
308               reinterpret_cast<DelayedTaskInfo*>(msg.lParam));
309           bool need_to_schedule_timers =
310               timer_tasks_.empty() ||
311               timer_tasks_.top().due_time() > info->due_time();
312           timer_tasks_.push(std::move(*info));
313           if (need_to_schedule_timers) {
314             CancelTimers();
315             ScheduleNextTimer();
316           }
317           break;
318         }
319         case WM_TIMER: {
320           RTC_DCHECK_EQ(timer_id_, msg.wParam);
321           ::KillTimer(nullptr, msg.wParam);
322           timer_id_ = 0;
323           RunDueTasks();
324           ScheduleNextTimer();
325           break;
326         }
327         default:
328           RTC_DCHECK_NOTREACHED();
329           break;
330       }
331     } else {
332       ::TranslateMessage(&msg);
333       ::DispatchMessage(&msg);
334     }
335 
336     if (CurrentTime() > start + kMaxTaskProcessingTime)
337       break;
338   }
339   return msg.message != WM_QUIT;
340 }
341 
RunDueTasks()342 void TaskQueueWin::RunDueTasks() {
343   RTC_DCHECK(!timer_tasks_.empty());
344   Timestamp now = CurrentTime();
345   do {
346     const auto& top = timer_tasks_.top();
347     if (top.due_time() > now)
348       break;
349     top.Run();
350     timer_tasks_.pop();
351   } while (!timer_tasks_.empty());
352 }
353 
ScheduleNextTimer()354 void TaskQueueWin::ScheduleNextTimer() {
355   RTC_DCHECK_EQ(timer_id_, 0);
356   if (timer_tasks_.empty())
357     return;
358 
359   const auto& next_task = timer_tasks_.top();
360   TimeDelta delay =
361       std::max(TimeDelta::Zero(), next_task.due_time() - CurrentTime());
362   uint32_t milliseconds = delay.RoundUpTo(TimeDelta::Millis(1)).ms<uint32_t>();
363   if (!timer_.StartOneShotTimer(milliseconds))
364     timer_id_ = ::SetTimer(nullptr, 0, milliseconds, nullptr);
365 }
366 
CancelTimers()367 void TaskQueueWin::CancelTimers() {
368   timer_.Cancel();
369   if (timer_id_) {
370     ::KillTimer(nullptr, timer_id_);
371     timer_id_ = 0;
372   }
373 }
374 
375 class TaskQueueWinFactory : public TaskQueueFactory {
376  public:
CreateTaskQueue(absl::string_view name,Priority priority) const377   std::unique_ptr<TaskQueueBase, TaskQueueDeleter> CreateTaskQueue(
378       absl::string_view name,
379       Priority priority) const override {
380     return std::unique_ptr<TaskQueueBase, TaskQueueDeleter>(
381         new TaskQueueWin(name, TaskQueuePriorityToThreadPriority(priority)));
382   }
383 };
384 
385 }  // namespace
386 
CreateTaskQueueWinFactory()387 std::unique_ptr<TaskQueueFactory> CreateTaskQueueWinFactory() {
388   return std::make_unique<TaskQueueWinFactory>();
389 }
390 
391 }  // namespace webrtc
392