xref: /aosp_15_r20/external/webrtc/rtc_base/thread.cc (revision d9f758449e529ab9291ac668be2861e7a55c2422)
1*d9f75844SAndroid Build Coastguard Worker /*
2*d9f75844SAndroid Build Coastguard Worker  *  Copyright 2004 The WebRTC Project Authors. All rights reserved.
3*d9f75844SAndroid Build Coastguard Worker  *
4*d9f75844SAndroid Build Coastguard Worker  *  Use of this source code is governed by a BSD-style license
5*d9f75844SAndroid Build Coastguard Worker  *  that can be found in the LICENSE file in the root of the source
6*d9f75844SAndroid Build Coastguard Worker  *  tree. An additional intellectual property rights grant can be found
7*d9f75844SAndroid Build Coastguard Worker  *  in the file PATENTS.  All contributing project authors may
8*d9f75844SAndroid Build Coastguard Worker  *  be found in the AUTHORS file in the root of the source tree.
9*d9f75844SAndroid Build Coastguard Worker  */
10*d9f75844SAndroid Build Coastguard Worker 
11*d9f75844SAndroid Build Coastguard Worker #include "rtc_base/thread.h"
12*d9f75844SAndroid Build Coastguard Worker 
13*d9f75844SAndroid Build Coastguard Worker #include "absl/strings/string_view.h"
14*d9f75844SAndroid Build Coastguard Worker #include "api/units/time_delta.h"
15*d9f75844SAndroid Build Coastguard Worker #include "rtc_base/socket_server.h"
16*d9f75844SAndroid Build Coastguard Worker 
17*d9f75844SAndroid Build Coastguard Worker #if defined(WEBRTC_WIN)
18*d9f75844SAndroid Build Coastguard Worker #include <comdef.h>
19*d9f75844SAndroid Build Coastguard Worker #elif defined(WEBRTC_POSIX)
20*d9f75844SAndroid Build Coastguard Worker #include <time.h>
21*d9f75844SAndroid Build Coastguard Worker #else
22*d9f75844SAndroid Build Coastguard Worker #error "Either WEBRTC_WIN or WEBRTC_POSIX needs to be defined."
23*d9f75844SAndroid Build Coastguard Worker #endif
24*d9f75844SAndroid Build Coastguard Worker 
25*d9f75844SAndroid Build Coastguard Worker #if defined(WEBRTC_WIN)
26*d9f75844SAndroid Build Coastguard Worker // Disable warning that we don't care about:
27*d9f75844SAndroid Build Coastguard Worker // warning C4722: destructor never returns, potential memory leak
28*d9f75844SAndroid Build Coastguard Worker #pragma warning(disable : 4722)
29*d9f75844SAndroid Build Coastguard Worker #endif
30*d9f75844SAndroid Build Coastguard Worker 
31*d9f75844SAndroid Build Coastguard Worker #include <stdio.h>
32*d9f75844SAndroid Build Coastguard Worker 
33*d9f75844SAndroid Build Coastguard Worker #include <utility>
34*d9f75844SAndroid Build Coastguard Worker 
35*d9f75844SAndroid Build Coastguard Worker #include "absl/algorithm/container.h"
36*d9f75844SAndroid Build Coastguard Worker #include "absl/cleanup/cleanup.h"
37*d9f75844SAndroid Build Coastguard Worker #include "api/sequence_checker.h"
38*d9f75844SAndroid Build Coastguard Worker #include "rtc_base/checks.h"
39*d9f75844SAndroid Build Coastguard Worker #include "rtc_base/deprecated/recursive_critical_section.h"
40*d9f75844SAndroid Build Coastguard Worker #include "rtc_base/event.h"
41*d9f75844SAndroid Build Coastguard Worker #include "rtc_base/internal/default_socket_server.h"
42*d9f75844SAndroid Build Coastguard Worker #include "rtc_base/logging.h"
43*d9f75844SAndroid Build Coastguard Worker #include "rtc_base/null_socket_server.h"
44*d9f75844SAndroid Build Coastguard Worker #include "rtc_base/synchronization/mutex.h"
45*d9f75844SAndroid Build Coastguard Worker #include "rtc_base/time_utils.h"
46*d9f75844SAndroid Build Coastguard Worker #include "rtc_base/trace_event.h"
47*d9f75844SAndroid Build Coastguard Worker 
48*d9f75844SAndroid Build Coastguard Worker #if defined(WEBRTC_MAC)
49*d9f75844SAndroid Build Coastguard Worker #include "rtc_base/system/cocoa_threading.h"
50*d9f75844SAndroid Build Coastguard Worker 
51*d9f75844SAndroid Build Coastguard Worker /*
52*d9f75844SAndroid Build Coastguard Worker  * These are forward-declarations for methods that are part of the
53*d9f75844SAndroid Build Coastguard Worker  * ObjC runtime. They are declared in the private header objc-internal.h.
54*d9f75844SAndroid Build Coastguard Worker  * These calls are what clang inserts when using @autoreleasepool in ObjC,
55*d9f75844SAndroid Build Coastguard Worker  * but here they are used directly in order to keep this file C++.
56*d9f75844SAndroid Build Coastguard Worker  * https://clang.llvm.org/docs/AutomaticReferenceCounting.html#runtime-support
57*d9f75844SAndroid Build Coastguard Worker  */
58*d9f75844SAndroid Build Coastguard Worker extern "C" {
59*d9f75844SAndroid Build Coastguard Worker void* objc_autoreleasePoolPush(void);
60*d9f75844SAndroid Build Coastguard Worker void objc_autoreleasePoolPop(void* pool);
61*d9f75844SAndroid Build Coastguard Worker }
62*d9f75844SAndroid Build Coastguard Worker 
63*d9f75844SAndroid Build Coastguard Worker namespace {
64*d9f75844SAndroid Build Coastguard Worker class ScopedAutoReleasePool {
65*d9f75844SAndroid Build Coastguard Worker  public:
ScopedAutoReleasePool()66*d9f75844SAndroid Build Coastguard Worker   ScopedAutoReleasePool() : pool_(objc_autoreleasePoolPush()) {}
~ScopedAutoReleasePool()67*d9f75844SAndroid Build Coastguard Worker   ~ScopedAutoReleasePool() { objc_autoreleasePoolPop(pool_); }
68*d9f75844SAndroid Build Coastguard Worker 
69*d9f75844SAndroid Build Coastguard Worker  private:
70*d9f75844SAndroid Build Coastguard Worker   void* const pool_;
71*d9f75844SAndroid Build Coastguard Worker };
72*d9f75844SAndroid Build Coastguard Worker }  // namespace
73*d9f75844SAndroid Build Coastguard Worker #endif
74*d9f75844SAndroid Build Coastguard Worker 
75*d9f75844SAndroid Build Coastguard Worker namespace rtc {
76*d9f75844SAndroid Build Coastguard Worker namespace {
77*d9f75844SAndroid Build Coastguard Worker 
78*d9f75844SAndroid Build Coastguard Worker using ::webrtc::MutexLock;
79*d9f75844SAndroid Build Coastguard Worker using ::webrtc::TimeDelta;
80*d9f75844SAndroid Build Coastguard Worker 
81*d9f75844SAndroid Build Coastguard Worker class RTC_SCOPED_LOCKABLE MarkProcessingCritScope {
82*d9f75844SAndroid Build Coastguard Worker  public:
MarkProcessingCritScope(const RecursiveCriticalSection * cs,size_t * processing)83*d9f75844SAndroid Build Coastguard Worker   MarkProcessingCritScope(const RecursiveCriticalSection* cs,
84*d9f75844SAndroid Build Coastguard Worker                           size_t* processing) RTC_EXCLUSIVE_LOCK_FUNCTION(cs)
85*d9f75844SAndroid Build Coastguard Worker       : cs_(cs), processing_(processing) {
86*d9f75844SAndroid Build Coastguard Worker     cs_->Enter();
87*d9f75844SAndroid Build Coastguard Worker     *processing_ += 1;
88*d9f75844SAndroid Build Coastguard Worker   }
89*d9f75844SAndroid Build Coastguard Worker 
RTC_UNLOCK_FUNCTION()90*d9f75844SAndroid Build Coastguard Worker   ~MarkProcessingCritScope() RTC_UNLOCK_FUNCTION() {
91*d9f75844SAndroid Build Coastguard Worker     *processing_ -= 1;
92*d9f75844SAndroid Build Coastguard Worker     cs_->Leave();
93*d9f75844SAndroid Build Coastguard Worker   }
94*d9f75844SAndroid Build Coastguard Worker 
95*d9f75844SAndroid Build Coastguard Worker   MarkProcessingCritScope(const MarkProcessingCritScope&) = delete;
96*d9f75844SAndroid Build Coastguard Worker   MarkProcessingCritScope& operator=(const MarkProcessingCritScope&) = delete;
97*d9f75844SAndroid Build Coastguard Worker 
98*d9f75844SAndroid Build Coastguard Worker  private:
99*d9f75844SAndroid Build Coastguard Worker   const RecursiveCriticalSection* const cs_;
100*d9f75844SAndroid Build Coastguard Worker   size_t* processing_;
101*d9f75844SAndroid Build Coastguard Worker };
102*d9f75844SAndroid Build Coastguard Worker 
103*d9f75844SAndroid Build Coastguard Worker }  // namespace
104*d9f75844SAndroid Build Coastguard Worker 
Instance()105*d9f75844SAndroid Build Coastguard Worker ThreadManager* ThreadManager::Instance() {
106*d9f75844SAndroid Build Coastguard Worker   static ThreadManager* const thread_manager = new ThreadManager();
107*d9f75844SAndroid Build Coastguard Worker   return thread_manager;
108*d9f75844SAndroid Build Coastguard Worker }
109*d9f75844SAndroid Build Coastguard Worker 
~ThreadManager()110*d9f75844SAndroid Build Coastguard Worker ThreadManager::~ThreadManager() {
111*d9f75844SAndroid Build Coastguard Worker   // By above RTC_DEFINE_STATIC_LOCAL.
112*d9f75844SAndroid Build Coastguard Worker   RTC_DCHECK_NOTREACHED() << "ThreadManager should never be destructed.";
113*d9f75844SAndroid Build Coastguard Worker }
114*d9f75844SAndroid Build Coastguard Worker 
115*d9f75844SAndroid Build Coastguard Worker // static
Add(Thread * message_queue)116*d9f75844SAndroid Build Coastguard Worker void ThreadManager::Add(Thread* message_queue) {
117*d9f75844SAndroid Build Coastguard Worker   return Instance()->AddInternal(message_queue);
118*d9f75844SAndroid Build Coastguard Worker }
AddInternal(Thread * message_queue)119*d9f75844SAndroid Build Coastguard Worker void ThreadManager::AddInternal(Thread* message_queue) {
120*d9f75844SAndroid Build Coastguard Worker   CritScope cs(&crit_);
121*d9f75844SAndroid Build Coastguard Worker   // Prevent changes while the list of message queues is processed.
122*d9f75844SAndroid Build Coastguard Worker   RTC_DCHECK_EQ(processing_, 0);
123*d9f75844SAndroid Build Coastguard Worker   message_queues_.push_back(message_queue);
124*d9f75844SAndroid Build Coastguard Worker }
125*d9f75844SAndroid Build Coastguard Worker 
126*d9f75844SAndroid Build Coastguard Worker // static
Remove(Thread * message_queue)127*d9f75844SAndroid Build Coastguard Worker void ThreadManager::Remove(Thread* message_queue) {
128*d9f75844SAndroid Build Coastguard Worker   return Instance()->RemoveInternal(message_queue);
129*d9f75844SAndroid Build Coastguard Worker }
RemoveInternal(Thread * message_queue)130*d9f75844SAndroid Build Coastguard Worker void ThreadManager::RemoveInternal(Thread* message_queue) {
131*d9f75844SAndroid Build Coastguard Worker   {
132*d9f75844SAndroid Build Coastguard Worker     CritScope cs(&crit_);
133*d9f75844SAndroid Build Coastguard Worker     // Prevent changes while the list of message queues is processed.
134*d9f75844SAndroid Build Coastguard Worker     RTC_DCHECK_EQ(processing_, 0);
135*d9f75844SAndroid Build Coastguard Worker     std::vector<Thread*>::iterator iter;
136*d9f75844SAndroid Build Coastguard Worker     iter = absl::c_find(message_queues_, message_queue);
137*d9f75844SAndroid Build Coastguard Worker     if (iter != message_queues_.end()) {
138*d9f75844SAndroid Build Coastguard Worker       message_queues_.erase(iter);
139*d9f75844SAndroid Build Coastguard Worker     }
140*d9f75844SAndroid Build Coastguard Worker #if RTC_DCHECK_IS_ON
141*d9f75844SAndroid Build Coastguard Worker     RemoveFromSendGraph(message_queue);
142*d9f75844SAndroid Build Coastguard Worker #endif
143*d9f75844SAndroid Build Coastguard Worker   }
144*d9f75844SAndroid Build Coastguard Worker }
145*d9f75844SAndroid Build Coastguard Worker 
146*d9f75844SAndroid Build Coastguard Worker #if RTC_DCHECK_IS_ON
RemoveFromSendGraph(Thread * thread)147*d9f75844SAndroid Build Coastguard Worker void ThreadManager::RemoveFromSendGraph(Thread* thread) {
148*d9f75844SAndroid Build Coastguard Worker   for (auto it = send_graph_.begin(); it != send_graph_.end();) {
149*d9f75844SAndroid Build Coastguard Worker     if (it->first == thread) {
150*d9f75844SAndroid Build Coastguard Worker       it = send_graph_.erase(it);
151*d9f75844SAndroid Build Coastguard Worker     } else {
152*d9f75844SAndroid Build Coastguard Worker       it->second.erase(thread);
153*d9f75844SAndroid Build Coastguard Worker       ++it;
154*d9f75844SAndroid Build Coastguard Worker     }
155*d9f75844SAndroid Build Coastguard Worker   }
156*d9f75844SAndroid Build Coastguard Worker }
157*d9f75844SAndroid Build Coastguard Worker 
RegisterSendAndCheckForCycles(Thread * source,Thread * target)158*d9f75844SAndroid Build Coastguard Worker void ThreadManager::RegisterSendAndCheckForCycles(Thread* source,
159*d9f75844SAndroid Build Coastguard Worker                                                   Thread* target) {
160*d9f75844SAndroid Build Coastguard Worker   RTC_DCHECK(source);
161*d9f75844SAndroid Build Coastguard Worker   RTC_DCHECK(target);
162*d9f75844SAndroid Build Coastguard Worker 
163*d9f75844SAndroid Build Coastguard Worker   CritScope cs(&crit_);
164*d9f75844SAndroid Build Coastguard Worker   std::deque<Thread*> all_targets({target});
165*d9f75844SAndroid Build Coastguard Worker   // We check the pre-existing who-sends-to-who graph for any path from target
166*d9f75844SAndroid Build Coastguard Worker   // to source. This loop is guaranteed to terminate because per the send graph
167*d9f75844SAndroid Build Coastguard Worker   // invariant, there are no cycles in the graph.
168*d9f75844SAndroid Build Coastguard Worker   for (size_t i = 0; i < all_targets.size(); i++) {
169*d9f75844SAndroid Build Coastguard Worker     const auto& targets = send_graph_[all_targets[i]];
170*d9f75844SAndroid Build Coastguard Worker     all_targets.insert(all_targets.end(), targets.begin(), targets.end());
171*d9f75844SAndroid Build Coastguard Worker   }
172*d9f75844SAndroid Build Coastguard Worker   RTC_CHECK_EQ(absl::c_count(all_targets, source), 0)
173*d9f75844SAndroid Build Coastguard Worker       << " send loop between " << source->name() << " and " << target->name();
174*d9f75844SAndroid Build Coastguard Worker 
175*d9f75844SAndroid Build Coastguard Worker   // We may now insert source -> target without creating a cycle, since there
176*d9f75844SAndroid Build Coastguard Worker   // was no path from target to source per the prior CHECK.
177*d9f75844SAndroid Build Coastguard Worker   send_graph_[source].insert(target);
178*d9f75844SAndroid Build Coastguard Worker }
179*d9f75844SAndroid Build Coastguard Worker #endif
180*d9f75844SAndroid Build Coastguard Worker 
181*d9f75844SAndroid Build Coastguard Worker // static
ProcessAllMessageQueuesForTesting()182*d9f75844SAndroid Build Coastguard Worker void ThreadManager::ProcessAllMessageQueuesForTesting() {
183*d9f75844SAndroid Build Coastguard Worker   return Instance()->ProcessAllMessageQueuesInternal();
184*d9f75844SAndroid Build Coastguard Worker }
185*d9f75844SAndroid Build Coastguard Worker 
ProcessAllMessageQueuesInternal()186*d9f75844SAndroid Build Coastguard Worker void ThreadManager::ProcessAllMessageQueuesInternal() {
187*d9f75844SAndroid Build Coastguard Worker   // This works by posting a delayed message at the current time and waiting
188*d9f75844SAndroid Build Coastguard Worker   // for it to be dispatched on all queues, which will ensure that all messages
189*d9f75844SAndroid Build Coastguard Worker   // that came before it were also dispatched.
190*d9f75844SAndroid Build Coastguard Worker   std::atomic<int> queues_not_done(0);
191*d9f75844SAndroid Build Coastguard Worker 
192*d9f75844SAndroid Build Coastguard Worker   {
193*d9f75844SAndroid Build Coastguard Worker     MarkProcessingCritScope cs(&crit_, &processing_);
194*d9f75844SAndroid Build Coastguard Worker     for (Thread* queue : message_queues_) {
195*d9f75844SAndroid Build Coastguard Worker       if (!queue->IsProcessingMessagesForTesting()) {
196*d9f75844SAndroid Build Coastguard Worker         // If the queue is not processing messages, it can
197*d9f75844SAndroid Build Coastguard Worker         // be ignored. If we tried to post a message to it, it would be dropped
198*d9f75844SAndroid Build Coastguard Worker         // or ignored.
199*d9f75844SAndroid Build Coastguard Worker         continue;
200*d9f75844SAndroid Build Coastguard Worker       }
201*d9f75844SAndroid Build Coastguard Worker       queues_not_done.fetch_add(1);
202*d9f75844SAndroid Build Coastguard Worker       // Whether the task is processed, or the thread is simply cleared,
203*d9f75844SAndroid Build Coastguard Worker       // queues_not_done gets decremented.
204*d9f75844SAndroid Build Coastguard Worker       absl::Cleanup sub = [&queues_not_done] { queues_not_done.fetch_sub(1); };
205*d9f75844SAndroid Build Coastguard Worker       // Post delayed task instead of regular task to wait for all delayed tasks
206*d9f75844SAndroid Build Coastguard Worker       // that are ready for processing.
207*d9f75844SAndroid Build Coastguard Worker       queue->PostDelayedTask([sub = std::move(sub)] {}, TimeDelta::Zero());
208*d9f75844SAndroid Build Coastguard Worker     }
209*d9f75844SAndroid Build Coastguard Worker   }
210*d9f75844SAndroid Build Coastguard Worker 
211*d9f75844SAndroid Build Coastguard Worker   rtc::Thread* current = rtc::Thread::Current();
212*d9f75844SAndroid Build Coastguard Worker   // Note: One of the message queues may have been on this thread, which is
213*d9f75844SAndroid Build Coastguard Worker   // why we can't synchronously wait for queues_not_done to go to 0; we need
214*d9f75844SAndroid Build Coastguard Worker   // to process messages as well.
215*d9f75844SAndroid Build Coastguard Worker   while (queues_not_done.load() > 0) {
216*d9f75844SAndroid Build Coastguard Worker     if (current) {
217*d9f75844SAndroid Build Coastguard Worker       current->ProcessMessages(0);
218*d9f75844SAndroid Build Coastguard Worker     }
219*d9f75844SAndroid Build Coastguard Worker   }
220*d9f75844SAndroid Build Coastguard Worker }
221*d9f75844SAndroid Build Coastguard Worker 
222*d9f75844SAndroid Build Coastguard Worker // static
Current()223*d9f75844SAndroid Build Coastguard Worker Thread* Thread::Current() {
224*d9f75844SAndroid Build Coastguard Worker   ThreadManager* manager = ThreadManager::Instance();
225*d9f75844SAndroid Build Coastguard Worker   Thread* thread = manager->CurrentThread();
226*d9f75844SAndroid Build Coastguard Worker 
227*d9f75844SAndroid Build Coastguard Worker   return thread;
228*d9f75844SAndroid Build Coastguard Worker }
229*d9f75844SAndroid Build Coastguard Worker 
230*d9f75844SAndroid Build Coastguard Worker #if defined(WEBRTC_POSIX)
ThreadManager()231*d9f75844SAndroid Build Coastguard Worker ThreadManager::ThreadManager() {
232*d9f75844SAndroid Build Coastguard Worker #if defined(WEBRTC_MAC)
233*d9f75844SAndroid Build Coastguard Worker   InitCocoaMultiThreading();
234*d9f75844SAndroid Build Coastguard Worker #endif
235*d9f75844SAndroid Build Coastguard Worker   pthread_key_create(&key_, nullptr);
236*d9f75844SAndroid Build Coastguard Worker }
237*d9f75844SAndroid Build Coastguard Worker 
CurrentThread()238*d9f75844SAndroid Build Coastguard Worker Thread* ThreadManager::CurrentThread() {
239*d9f75844SAndroid Build Coastguard Worker   return static_cast<Thread*>(pthread_getspecific(key_));
240*d9f75844SAndroid Build Coastguard Worker }
241*d9f75844SAndroid Build Coastguard Worker 
SetCurrentThreadInternal(Thread * thread)242*d9f75844SAndroid Build Coastguard Worker void ThreadManager::SetCurrentThreadInternal(Thread* thread) {
243*d9f75844SAndroid Build Coastguard Worker   pthread_setspecific(key_, thread);
244*d9f75844SAndroid Build Coastguard Worker }
245*d9f75844SAndroid Build Coastguard Worker #endif
246*d9f75844SAndroid Build Coastguard Worker 
247*d9f75844SAndroid Build Coastguard Worker #if defined(WEBRTC_WIN)
ThreadManager()248*d9f75844SAndroid Build Coastguard Worker ThreadManager::ThreadManager() : key_(TlsAlloc()) {}
249*d9f75844SAndroid Build Coastguard Worker 
CurrentThread()250*d9f75844SAndroid Build Coastguard Worker Thread* ThreadManager::CurrentThread() {
251*d9f75844SAndroid Build Coastguard Worker   return static_cast<Thread*>(TlsGetValue(key_));
252*d9f75844SAndroid Build Coastguard Worker }
253*d9f75844SAndroid Build Coastguard Worker 
SetCurrentThreadInternal(Thread * thread)254*d9f75844SAndroid Build Coastguard Worker void ThreadManager::SetCurrentThreadInternal(Thread* thread) {
255*d9f75844SAndroid Build Coastguard Worker   TlsSetValue(key_, thread);
256*d9f75844SAndroid Build Coastguard Worker }
257*d9f75844SAndroid Build Coastguard Worker #endif
258*d9f75844SAndroid Build Coastguard Worker 
SetCurrentThread(Thread * thread)259*d9f75844SAndroid Build Coastguard Worker void ThreadManager::SetCurrentThread(Thread* thread) {
260*d9f75844SAndroid Build Coastguard Worker #if RTC_DLOG_IS_ON
261*d9f75844SAndroid Build Coastguard Worker   if (CurrentThread() && thread) {
262*d9f75844SAndroid Build Coastguard Worker     RTC_DLOG(LS_ERROR) << "SetCurrentThread: Overwriting an existing value?";
263*d9f75844SAndroid Build Coastguard Worker   }
264*d9f75844SAndroid Build Coastguard Worker #endif  // RTC_DLOG_IS_ON
265*d9f75844SAndroid Build Coastguard Worker 
266*d9f75844SAndroid Build Coastguard Worker   if (thread) {
267*d9f75844SAndroid Build Coastguard Worker     thread->EnsureIsCurrentTaskQueue();
268*d9f75844SAndroid Build Coastguard Worker   } else {
269*d9f75844SAndroid Build Coastguard Worker     Thread* current = CurrentThread();
270*d9f75844SAndroid Build Coastguard Worker     if (current) {
271*d9f75844SAndroid Build Coastguard Worker       // The current thread is being cleared, e.g. as a result of
272*d9f75844SAndroid Build Coastguard Worker       // UnwrapCurrent() being called or when a thread is being stopped
273*d9f75844SAndroid Build Coastguard Worker       // (see PreRun()). This signals that the Thread instance is being detached
274*d9f75844SAndroid Build Coastguard Worker       // from the thread, which also means that TaskQueue::Current() must not
275*d9f75844SAndroid Build Coastguard Worker       // return a pointer to the Thread instance.
276*d9f75844SAndroid Build Coastguard Worker       current->ClearCurrentTaskQueue();
277*d9f75844SAndroid Build Coastguard Worker     }
278*d9f75844SAndroid Build Coastguard Worker   }
279*d9f75844SAndroid Build Coastguard Worker 
280*d9f75844SAndroid Build Coastguard Worker   SetCurrentThreadInternal(thread);
281*d9f75844SAndroid Build Coastguard Worker }
282*d9f75844SAndroid Build Coastguard Worker 
ChangeCurrentThreadForTest(rtc::Thread * thread)283*d9f75844SAndroid Build Coastguard Worker void rtc::ThreadManager::ChangeCurrentThreadForTest(rtc::Thread* thread) {
284*d9f75844SAndroid Build Coastguard Worker   SetCurrentThreadInternal(thread);
285*d9f75844SAndroid Build Coastguard Worker }
286*d9f75844SAndroid Build Coastguard Worker 
WrapCurrentThread()287*d9f75844SAndroid Build Coastguard Worker Thread* ThreadManager::WrapCurrentThread() {
288*d9f75844SAndroid Build Coastguard Worker   Thread* result = CurrentThread();
289*d9f75844SAndroid Build Coastguard Worker   if (nullptr == result) {
290*d9f75844SAndroid Build Coastguard Worker     result = new Thread(CreateDefaultSocketServer());
291*d9f75844SAndroid Build Coastguard Worker     result->WrapCurrentWithThreadManager(this, true);
292*d9f75844SAndroid Build Coastguard Worker   }
293*d9f75844SAndroid Build Coastguard Worker   return result;
294*d9f75844SAndroid Build Coastguard Worker }
295*d9f75844SAndroid Build Coastguard Worker 
UnwrapCurrentThread()296*d9f75844SAndroid Build Coastguard Worker void ThreadManager::UnwrapCurrentThread() {
297*d9f75844SAndroid Build Coastguard Worker   Thread* t = CurrentThread();
298*d9f75844SAndroid Build Coastguard Worker   if (t && !(t->IsOwned())) {
299*d9f75844SAndroid Build Coastguard Worker     t->UnwrapCurrent();
300*d9f75844SAndroid Build Coastguard Worker     delete t;
301*d9f75844SAndroid Build Coastguard Worker   }
302*d9f75844SAndroid Build Coastguard Worker }
303*d9f75844SAndroid Build Coastguard Worker 
ScopedDisallowBlockingCalls()304*d9f75844SAndroid Build Coastguard Worker Thread::ScopedDisallowBlockingCalls::ScopedDisallowBlockingCalls()
305*d9f75844SAndroid Build Coastguard Worker     : thread_(Thread::Current()),
306*d9f75844SAndroid Build Coastguard Worker       previous_state_(thread_->SetAllowBlockingCalls(false)) {}
307*d9f75844SAndroid Build Coastguard Worker 
~ScopedDisallowBlockingCalls()308*d9f75844SAndroid Build Coastguard Worker Thread::ScopedDisallowBlockingCalls::~ScopedDisallowBlockingCalls() {
309*d9f75844SAndroid Build Coastguard Worker   RTC_DCHECK(thread_->IsCurrent());
310*d9f75844SAndroid Build Coastguard Worker   thread_->SetAllowBlockingCalls(previous_state_);
311*d9f75844SAndroid Build Coastguard Worker }
312*d9f75844SAndroid Build Coastguard Worker 
313*d9f75844SAndroid Build Coastguard Worker #if RTC_DCHECK_IS_ON
ScopedCountBlockingCalls(std::function<void (uint32_t,uint32_t)> callback)314*d9f75844SAndroid Build Coastguard Worker Thread::ScopedCountBlockingCalls::ScopedCountBlockingCalls(
315*d9f75844SAndroid Build Coastguard Worker     std::function<void(uint32_t, uint32_t)> callback)
316*d9f75844SAndroid Build Coastguard Worker     : thread_(Thread::Current()),
317*d9f75844SAndroid Build Coastguard Worker       base_blocking_call_count_(thread_->GetBlockingCallCount()),
318*d9f75844SAndroid Build Coastguard Worker       base_could_be_blocking_call_count_(
319*d9f75844SAndroid Build Coastguard Worker           thread_->GetCouldBeBlockingCallCount()),
320*d9f75844SAndroid Build Coastguard Worker       result_callback_(std::move(callback)) {}
321*d9f75844SAndroid Build Coastguard Worker 
~ScopedCountBlockingCalls()322*d9f75844SAndroid Build Coastguard Worker Thread::ScopedCountBlockingCalls::~ScopedCountBlockingCalls() {
323*d9f75844SAndroid Build Coastguard Worker   if (GetTotalBlockedCallCount() >= min_blocking_calls_for_callback_) {
324*d9f75844SAndroid Build Coastguard Worker     result_callback_(GetBlockingCallCount(), GetCouldBeBlockingCallCount());
325*d9f75844SAndroid Build Coastguard Worker   }
326*d9f75844SAndroid Build Coastguard Worker }
327*d9f75844SAndroid Build Coastguard Worker 
GetBlockingCallCount() const328*d9f75844SAndroid Build Coastguard Worker uint32_t Thread::ScopedCountBlockingCalls::GetBlockingCallCount() const {
329*d9f75844SAndroid Build Coastguard Worker   return thread_->GetBlockingCallCount() - base_blocking_call_count_;
330*d9f75844SAndroid Build Coastguard Worker }
331*d9f75844SAndroid Build Coastguard Worker 
GetCouldBeBlockingCallCount() const332*d9f75844SAndroid Build Coastguard Worker uint32_t Thread::ScopedCountBlockingCalls::GetCouldBeBlockingCallCount() const {
333*d9f75844SAndroid Build Coastguard Worker   return thread_->GetCouldBeBlockingCallCount() -
334*d9f75844SAndroid Build Coastguard Worker          base_could_be_blocking_call_count_;
335*d9f75844SAndroid Build Coastguard Worker }
336*d9f75844SAndroid Build Coastguard Worker 
GetTotalBlockedCallCount() const337*d9f75844SAndroid Build Coastguard Worker uint32_t Thread::ScopedCountBlockingCalls::GetTotalBlockedCallCount() const {
338*d9f75844SAndroid Build Coastguard Worker   return GetBlockingCallCount() + GetCouldBeBlockingCallCount();
339*d9f75844SAndroid Build Coastguard Worker }
340*d9f75844SAndroid Build Coastguard Worker #endif
341*d9f75844SAndroid Build Coastguard Worker 
Thread(SocketServer * ss)342*d9f75844SAndroid Build Coastguard Worker Thread::Thread(SocketServer* ss) : Thread(ss, /*do_init=*/true) {}
343*d9f75844SAndroid Build Coastguard Worker 
Thread(std::unique_ptr<SocketServer> ss)344*d9f75844SAndroid Build Coastguard Worker Thread::Thread(std::unique_ptr<SocketServer> ss)
345*d9f75844SAndroid Build Coastguard Worker     : Thread(std::move(ss), /*do_init=*/true) {}
346*d9f75844SAndroid Build Coastguard Worker 
Thread(SocketServer * ss,bool do_init)347*d9f75844SAndroid Build Coastguard Worker Thread::Thread(SocketServer* ss, bool do_init)
348*d9f75844SAndroid Build Coastguard Worker     : delayed_next_num_(0),
349*d9f75844SAndroid Build Coastguard Worker       fInitialized_(false),
350*d9f75844SAndroid Build Coastguard Worker       fDestroyed_(false),
351*d9f75844SAndroid Build Coastguard Worker       stop_(0),
352*d9f75844SAndroid Build Coastguard Worker       ss_(ss) {
353*d9f75844SAndroid Build Coastguard Worker   RTC_DCHECK(ss);
354*d9f75844SAndroid Build Coastguard Worker   ss_->SetMessageQueue(this);
355*d9f75844SAndroid Build Coastguard Worker   SetName("Thread", this);  // default name
356*d9f75844SAndroid Build Coastguard Worker   if (do_init) {
357*d9f75844SAndroid Build Coastguard Worker     DoInit();
358*d9f75844SAndroid Build Coastguard Worker   }
359*d9f75844SAndroid Build Coastguard Worker }
360*d9f75844SAndroid Build Coastguard Worker 
Thread(std::unique_ptr<SocketServer> ss,bool do_init)361*d9f75844SAndroid Build Coastguard Worker Thread::Thread(std::unique_ptr<SocketServer> ss, bool do_init)
362*d9f75844SAndroid Build Coastguard Worker     : Thread(ss.get(), do_init) {
363*d9f75844SAndroid Build Coastguard Worker   own_ss_ = std::move(ss);
364*d9f75844SAndroid Build Coastguard Worker }
365*d9f75844SAndroid Build Coastguard Worker 
~Thread()366*d9f75844SAndroid Build Coastguard Worker Thread::~Thread() {
367*d9f75844SAndroid Build Coastguard Worker   Stop();
368*d9f75844SAndroid Build Coastguard Worker   DoDestroy();
369*d9f75844SAndroid Build Coastguard Worker }
370*d9f75844SAndroid Build Coastguard Worker 
DoInit()371*d9f75844SAndroid Build Coastguard Worker void Thread::DoInit() {
372*d9f75844SAndroid Build Coastguard Worker   if (fInitialized_) {
373*d9f75844SAndroid Build Coastguard Worker     return;
374*d9f75844SAndroid Build Coastguard Worker   }
375*d9f75844SAndroid Build Coastguard Worker 
376*d9f75844SAndroid Build Coastguard Worker   fInitialized_ = true;
377*d9f75844SAndroid Build Coastguard Worker   ThreadManager::Add(this);
378*d9f75844SAndroid Build Coastguard Worker }
379*d9f75844SAndroid Build Coastguard Worker 
DoDestroy()380*d9f75844SAndroid Build Coastguard Worker void Thread::DoDestroy() {
381*d9f75844SAndroid Build Coastguard Worker   if (fDestroyed_) {
382*d9f75844SAndroid Build Coastguard Worker     return;
383*d9f75844SAndroid Build Coastguard Worker   }
384*d9f75844SAndroid Build Coastguard Worker 
385*d9f75844SAndroid Build Coastguard Worker   fDestroyed_ = true;
386*d9f75844SAndroid Build Coastguard Worker   // The signal is done from here to ensure
387*d9f75844SAndroid Build Coastguard Worker   // that it always gets called when the queue
388*d9f75844SAndroid Build Coastguard Worker   // is going away.
389*d9f75844SAndroid Build Coastguard Worker   if (ss_) {
390*d9f75844SAndroid Build Coastguard Worker     ss_->SetMessageQueue(nullptr);
391*d9f75844SAndroid Build Coastguard Worker   }
392*d9f75844SAndroid Build Coastguard Worker   ThreadManager::Remove(this);
393*d9f75844SAndroid Build Coastguard Worker   // Clear.
394*d9f75844SAndroid Build Coastguard Worker   messages_ = {};
395*d9f75844SAndroid Build Coastguard Worker   delayed_messages_ = {};
396*d9f75844SAndroid Build Coastguard Worker }
397*d9f75844SAndroid Build Coastguard Worker 
socketserver()398*d9f75844SAndroid Build Coastguard Worker SocketServer* Thread::socketserver() {
399*d9f75844SAndroid Build Coastguard Worker   return ss_;
400*d9f75844SAndroid Build Coastguard Worker }
401*d9f75844SAndroid Build Coastguard Worker 
WakeUpSocketServer()402*d9f75844SAndroid Build Coastguard Worker void Thread::WakeUpSocketServer() {
403*d9f75844SAndroid Build Coastguard Worker   ss_->WakeUp();
404*d9f75844SAndroid Build Coastguard Worker }
405*d9f75844SAndroid Build Coastguard Worker 
Quit()406*d9f75844SAndroid Build Coastguard Worker void Thread::Quit() {
407*d9f75844SAndroid Build Coastguard Worker   stop_.store(1, std::memory_order_release);
408*d9f75844SAndroid Build Coastguard Worker   WakeUpSocketServer();
409*d9f75844SAndroid Build Coastguard Worker }
410*d9f75844SAndroid Build Coastguard Worker 
IsQuitting()411*d9f75844SAndroid Build Coastguard Worker bool Thread::IsQuitting() {
412*d9f75844SAndroid Build Coastguard Worker   return stop_.load(std::memory_order_acquire) != 0;
413*d9f75844SAndroid Build Coastguard Worker }
414*d9f75844SAndroid Build Coastguard Worker 
Restart()415*d9f75844SAndroid Build Coastguard Worker void Thread::Restart() {
416*d9f75844SAndroid Build Coastguard Worker   stop_.store(0, std::memory_order_release);
417*d9f75844SAndroid Build Coastguard Worker }
418*d9f75844SAndroid Build Coastguard Worker 
Get(int cmsWait)419*d9f75844SAndroid Build Coastguard Worker absl::AnyInvocable<void() &&> Thread::Get(int cmsWait) {
420*d9f75844SAndroid Build Coastguard Worker   // Get w/wait + timer scan / dispatch + socket / event multiplexer dispatch
421*d9f75844SAndroid Build Coastguard Worker 
422*d9f75844SAndroid Build Coastguard Worker   int64_t cmsTotal = cmsWait;
423*d9f75844SAndroid Build Coastguard Worker   int64_t cmsElapsed = 0;
424*d9f75844SAndroid Build Coastguard Worker   int64_t msStart = TimeMillis();
425*d9f75844SAndroid Build Coastguard Worker   int64_t msCurrent = msStart;
426*d9f75844SAndroid Build Coastguard Worker   while (true) {
427*d9f75844SAndroid Build Coastguard Worker     // Check for posted events
428*d9f75844SAndroid Build Coastguard Worker     int64_t cmsDelayNext = kForever;
429*d9f75844SAndroid Build Coastguard Worker     {
430*d9f75844SAndroid Build Coastguard Worker       // All queue operations need to be locked, but nothing else in this loop
431*d9f75844SAndroid Build Coastguard Worker       // can happen while holding the `mutex_`.
432*d9f75844SAndroid Build Coastguard Worker       MutexLock lock(&mutex_);
433*d9f75844SAndroid Build Coastguard Worker       // Check for delayed messages that have been triggered and calculate the
434*d9f75844SAndroid Build Coastguard Worker       // next trigger time.
435*d9f75844SAndroid Build Coastguard Worker       while (!delayed_messages_.empty()) {
436*d9f75844SAndroid Build Coastguard Worker         if (msCurrent < delayed_messages_.top().run_time_ms) {
437*d9f75844SAndroid Build Coastguard Worker           cmsDelayNext =
438*d9f75844SAndroid Build Coastguard Worker               TimeDiff(delayed_messages_.top().run_time_ms, msCurrent);
439*d9f75844SAndroid Build Coastguard Worker           break;
440*d9f75844SAndroid Build Coastguard Worker         }
441*d9f75844SAndroid Build Coastguard Worker         messages_.push(std::move(delayed_messages_.top().functor));
442*d9f75844SAndroid Build Coastguard Worker         delayed_messages_.pop();
443*d9f75844SAndroid Build Coastguard Worker       }
444*d9f75844SAndroid Build Coastguard Worker       // Pull a message off the message queue, if available.
445*d9f75844SAndroid Build Coastguard Worker       if (!messages_.empty()) {
446*d9f75844SAndroid Build Coastguard Worker         absl::AnyInvocable<void()&&> task = std::move(messages_.front());
447*d9f75844SAndroid Build Coastguard Worker         messages_.pop();
448*d9f75844SAndroid Build Coastguard Worker         return task;
449*d9f75844SAndroid Build Coastguard Worker       }
450*d9f75844SAndroid Build Coastguard Worker     }
451*d9f75844SAndroid Build Coastguard Worker 
452*d9f75844SAndroid Build Coastguard Worker     if (IsQuitting())
453*d9f75844SAndroid Build Coastguard Worker       break;
454*d9f75844SAndroid Build Coastguard Worker 
455*d9f75844SAndroid Build Coastguard Worker     // Which is shorter, the delay wait or the asked wait?
456*d9f75844SAndroid Build Coastguard Worker 
457*d9f75844SAndroid Build Coastguard Worker     int64_t cmsNext;
458*d9f75844SAndroid Build Coastguard Worker     if (cmsWait == kForever) {
459*d9f75844SAndroid Build Coastguard Worker       cmsNext = cmsDelayNext;
460*d9f75844SAndroid Build Coastguard Worker     } else {
461*d9f75844SAndroid Build Coastguard Worker       cmsNext = std::max<int64_t>(0, cmsTotal - cmsElapsed);
462*d9f75844SAndroid Build Coastguard Worker       if ((cmsDelayNext != kForever) && (cmsDelayNext < cmsNext))
463*d9f75844SAndroid Build Coastguard Worker         cmsNext = cmsDelayNext;
464*d9f75844SAndroid Build Coastguard Worker     }
465*d9f75844SAndroid Build Coastguard Worker 
466*d9f75844SAndroid Build Coastguard Worker     {
467*d9f75844SAndroid Build Coastguard Worker       // Wait and multiplex in the meantime
468*d9f75844SAndroid Build Coastguard Worker       if (!ss_->Wait(cmsNext == kForever ? SocketServer::kForever
469*d9f75844SAndroid Build Coastguard Worker                                          : webrtc::TimeDelta::Millis(cmsNext),
470*d9f75844SAndroid Build Coastguard Worker                      /*process_io=*/true))
471*d9f75844SAndroid Build Coastguard Worker         return nullptr;
472*d9f75844SAndroid Build Coastguard Worker     }
473*d9f75844SAndroid Build Coastguard Worker 
474*d9f75844SAndroid Build Coastguard Worker     // If the specified timeout expired, return
475*d9f75844SAndroid Build Coastguard Worker 
476*d9f75844SAndroid Build Coastguard Worker     msCurrent = TimeMillis();
477*d9f75844SAndroid Build Coastguard Worker     cmsElapsed = TimeDiff(msCurrent, msStart);
478*d9f75844SAndroid Build Coastguard Worker     if (cmsWait != kForever) {
479*d9f75844SAndroid Build Coastguard Worker       if (cmsElapsed >= cmsWait)
480*d9f75844SAndroid Build Coastguard Worker         return nullptr;
481*d9f75844SAndroid Build Coastguard Worker     }
482*d9f75844SAndroid Build Coastguard Worker   }
483*d9f75844SAndroid Build Coastguard Worker   return nullptr;
484*d9f75844SAndroid Build Coastguard Worker }
485*d9f75844SAndroid Build Coastguard Worker 
PostTask(absl::AnyInvocable<void ()&&> task)486*d9f75844SAndroid Build Coastguard Worker void Thread::PostTask(absl::AnyInvocable<void() &&> task) {
487*d9f75844SAndroid Build Coastguard Worker   if (IsQuitting()) {
488*d9f75844SAndroid Build Coastguard Worker     return;
489*d9f75844SAndroid Build Coastguard Worker   }
490*d9f75844SAndroid Build Coastguard Worker 
491*d9f75844SAndroid Build Coastguard Worker   // Keep thread safe
492*d9f75844SAndroid Build Coastguard Worker   // Add the message to the end of the queue
493*d9f75844SAndroid Build Coastguard Worker   // Signal for the multiplexer to return
494*d9f75844SAndroid Build Coastguard Worker 
495*d9f75844SAndroid Build Coastguard Worker   {
496*d9f75844SAndroid Build Coastguard Worker     MutexLock lock(&mutex_);
497*d9f75844SAndroid Build Coastguard Worker     messages_.push(std::move(task));
498*d9f75844SAndroid Build Coastguard Worker   }
499*d9f75844SAndroid Build Coastguard Worker   WakeUpSocketServer();
500*d9f75844SAndroid Build Coastguard Worker }
501*d9f75844SAndroid Build Coastguard Worker 
PostDelayedHighPrecisionTask(absl::AnyInvocable<void ()&&> task,webrtc::TimeDelta delay)502*d9f75844SAndroid Build Coastguard Worker void Thread::PostDelayedHighPrecisionTask(absl::AnyInvocable<void() &&> task,
503*d9f75844SAndroid Build Coastguard Worker                                           webrtc::TimeDelta delay) {
504*d9f75844SAndroid Build Coastguard Worker   if (IsQuitting()) {
505*d9f75844SAndroid Build Coastguard Worker     return;
506*d9f75844SAndroid Build Coastguard Worker   }
507*d9f75844SAndroid Build Coastguard Worker 
508*d9f75844SAndroid Build Coastguard Worker   // Keep thread safe
509*d9f75844SAndroid Build Coastguard Worker   // Add to the priority queue. Gets sorted soonest first.
510*d9f75844SAndroid Build Coastguard Worker   // Signal for the multiplexer to return.
511*d9f75844SAndroid Build Coastguard Worker 
512*d9f75844SAndroid Build Coastguard Worker   int64_t delay_ms = delay.RoundUpTo(webrtc::TimeDelta::Millis(1)).ms<int>();
513*d9f75844SAndroid Build Coastguard Worker   int64_t run_time_ms = TimeAfter(delay_ms);
514*d9f75844SAndroid Build Coastguard Worker   {
515*d9f75844SAndroid Build Coastguard Worker     MutexLock lock(&mutex_);
516*d9f75844SAndroid Build Coastguard Worker     delayed_messages_.push({.delay_ms = delay_ms,
517*d9f75844SAndroid Build Coastguard Worker                             .run_time_ms = run_time_ms,
518*d9f75844SAndroid Build Coastguard Worker                             .message_number = delayed_next_num_,
519*d9f75844SAndroid Build Coastguard Worker                             .functor = std::move(task)});
520*d9f75844SAndroid Build Coastguard Worker     // If this message queue processes 1 message every millisecond for 50 days,
521*d9f75844SAndroid Build Coastguard Worker     // we will wrap this number.  Even then, only messages with identical times
522*d9f75844SAndroid Build Coastguard Worker     // will be misordered, and then only briefly.  This is probably ok.
523*d9f75844SAndroid Build Coastguard Worker     ++delayed_next_num_;
524*d9f75844SAndroid Build Coastguard Worker     RTC_DCHECK_NE(0, delayed_next_num_);
525*d9f75844SAndroid Build Coastguard Worker   }
526*d9f75844SAndroid Build Coastguard Worker   WakeUpSocketServer();
527*d9f75844SAndroid Build Coastguard Worker }
528*d9f75844SAndroid Build Coastguard Worker 
GetDelay()529*d9f75844SAndroid Build Coastguard Worker int Thread::GetDelay() {
530*d9f75844SAndroid Build Coastguard Worker   MutexLock lock(&mutex_);
531*d9f75844SAndroid Build Coastguard Worker 
532*d9f75844SAndroid Build Coastguard Worker   if (!messages_.empty())
533*d9f75844SAndroid Build Coastguard Worker     return 0;
534*d9f75844SAndroid Build Coastguard Worker 
535*d9f75844SAndroid Build Coastguard Worker   if (!delayed_messages_.empty()) {
536*d9f75844SAndroid Build Coastguard Worker     int delay = TimeUntil(delayed_messages_.top().run_time_ms);
537*d9f75844SAndroid Build Coastguard Worker     if (delay < 0)
538*d9f75844SAndroid Build Coastguard Worker       delay = 0;
539*d9f75844SAndroid Build Coastguard Worker     return delay;
540*d9f75844SAndroid Build Coastguard Worker   }
541*d9f75844SAndroid Build Coastguard Worker 
542*d9f75844SAndroid Build Coastguard Worker   return kForever;
543*d9f75844SAndroid Build Coastguard Worker }
544*d9f75844SAndroid Build Coastguard Worker 
Dispatch(absl::AnyInvocable<void ()&&> task)545*d9f75844SAndroid Build Coastguard Worker void Thread::Dispatch(absl::AnyInvocable<void() &&> task) {
546*d9f75844SAndroid Build Coastguard Worker   TRACE_EVENT0("webrtc", "Thread::Dispatch");
547*d9f75844SAndroid Build Coastguard Worker   RTC_DCHECK_RUN_ON(this);
548*d9f75844SAndroid Build Coastguard Worker   int64_t start_time = TimeMillis();
549*d9f75844SAndroid Build Coastguard Worker   std::move(task)();
550*d9f75844SAndroid Build Coastguard Worker   int64_t end_time = TimeMillis();
551*d9f75844SAndroid Build Coastguard Worker   int64_t diff = TimeDiff(end_time, start_time);
552*d9f75844SAndroid Build Coastguard Worker   if (diff >= dispatch_warning_ms_) {
553*d9f75844SAndroid Build Coastguard Worker     RTC_LOG(LS_INFO) << "Message to " << name() << " took " << diff
554*d9f75844SAndroid Build Coastguard Worker                      << "ms to dispatch.";
555*d9f75844SAndroid Build Coastguard Worker     // To avoid log spew, move the warning limit to only give warning
556*d9f75844SAndroid Build Coastguard Worker     // for delays that are larger than the one observed.
557*d9f75844SAndroid Build Coastguard Worker     dispatch_warning_ms_ = diff + 1;
558*d9f75844SAndroid Build Coastguard Worker   }
559*d9f75844SAndroid Build Coastguard Worker }
560*d9f75844SAndroid Build Coastguard Worker 
IsCurrent() const561*d9f75844SAndroid Build Coastguard Worker bool Thread::IsCurrent() const {
562*d9f75844SAndroid Build Coastguard Worker   return ThreadManager::Instance()->CurrentThread() == this;
563*d9f75844SAndroid Build Coastguard Worker }
564*d9f75844SAndroid Build Coastguard Worker 
CreateWithSocketServer()565*d9f75844SAndroid Build Coastguard Worker std::unique_ptr<Thread> Thread::CreateWithSocketServer() {
566*d9f75844SAndroid Build Coastguard Worker   return std::unique_ptr<Thread>(new Thread(CreateDefaultSocketServer()));
567*d9f75844SAndroid Build Coastguard Worker }
568*d9f75844SAndroid Build Coastguard Worker 
Create()569*d9f75844SAndroid Build Coastguard Worker std::unique_ptr<Thread> Thread::Create() {
570*d9f75844SAndroid Build Coastguard Worker   return std::unique_ptr<Thread>(
571*d9f75844SAndroid Build Coastguard Worker       new Thread(std::unique_ptr<SocketServer>(new NullSocketServer())));
572*d9f75844SAndroid Build Coastguard Worker }
573*d9f75844SAndroid Build Coastguard Worker 
SleepMs(int milliseconds)574*d9f75844SAndroid Build Coastguard Worker bool Thread::SleepMs(int milliseconds) {
575*d9f75844SAndroid Build Coastguard Worker   AssertBlockingIsAllowedOnCurrentThread();
576*d9f75844SAndroid Build Coastguard Worker 
577*d9f75844SAndroid Build Coastguard Worker #if defined(WEBRTC_WIN)
578*d9f75844SAndroid Build Coastguard Worker   ::Sleep(milliseconds);
579*d9f75844SAndroid Build Coastguard Worker   return true;
580*d9f75844SAndroid Build Coastguard Worker #else
581*d9f75844SAndroid Build Coastguard Worker   // POSIX has both a usleep() and a nanosleep(), but the former is deprecated,
582*d9f75844SAndroid Build Coastguard Worker   // so we use nanosleep() even though it has greater precision than necessary.
583*d9f75844SAndroid Build Coastguard Worker   struct timespec ts;
584*d9f75844SAndroid Build Coastguard Worker   ts.tv_sec = milliseconds / 1000;
585*d9f75844SAndroid Build Coastguard Worker   ts.tv_nsec = (milliseconds % 1000) * 1000000;
586*d9f75844SAndroid Build Coastguard Worker   int ret = nanosleep(&ts, nullptr);
587*d9f75844SAndroid Build Coastguard Worker   if (ret != 0) {
588*d9f75844SAndroid Build Coastguard Worker     RTC_LOG_ERR(LS_WARNING) << "nanosleep() returning early";
589*d9f75844SAndroid Build Coastguard Worker     return false;
590*d9f75844SAndroid Build Coastguard Worker   }
591*d9f75844SAndroid Build Coastguard Worker   return true;
592*d9f75844SAndroid Build Coastguard Worker #endif
593*d9f75844SAndroid Build Coastguard Worker }
594*d9f75844SAndroid Build Coastguard Worker 
SetName(absl::string_view name,const void * obj)595*d9f75844SAndroid Build Coastguard Worker bool Thread::SetName(absl::string_view name, const void* obj) {
596*d9f75844SAndroid Build Coastguard Worker   RTC_DCHECK(!IsRunning());
597*d9f75844SAndroid Build Coastguard Worker 
598*d9f75844SAndroid Build Coastguard Worker   name_ = std::string(name);
599*d9f75844SAndroid Build Coastguard Worker   if (obj) {
600*d9f75844SAndroid Build Coastguard Worker     // The %p specifier typically produce at most 16 hex digits, possibly with a
601*d9f75844SAndroid Build Coastguard Worker     // 0x prefix. But format is implementation defined, so add some margin.
602*d9f75844SAndroid Build Coastguard Worker     char buf[30];
603*d9f75844SAndroid Build Coastguard Worker     snprintf(buf, sizeof(buf), " 0x%p", obj);
604*d9f75844SAndroid Build Coastguard Worker     name_ += buf;
605*d9f75844SAndroid Build Coastguard Worker   }
606*d9f75844SAndroid Build Coastguard Worker   return true;
607*d9f75844SAndroid Build Coastguard Worker }
608*d9f75844SAndroid Build Coastguard Worker 
SetDispatchWarningMs(int deadline)609*d9f75844SAndroid Build Coastguard Worker void Thread::SetDispatchWarningMs(int deadline) {
610*d9f75844SAndroid Build Coastguard Worker   if (!IsCurrent()) {
611*d9f75844SAndroid Build Coastguard Worker     PostTask([this, deadline]() { SetDispatchWarningMs(deadline); });
612*d9f75844SAndroid Build Coastguard Worker     return;
613*d9f75844SAndroid Build Coastguard Worker   }
614*d9f75844SAndroid Build Coastguard Worker   RTC_DCHECK_RUN_ON(this);
615*d9f75844SAndroid Build Coastguard Worker   dispatch_warning_ms_ = deadline;
616*d9f75844SAndroid Build Coastguard Worker }
617*d9f75844SAndroid Build Coastguard Worker 
Start()618*d9f75844SAndroid Build Coastguard Worker bool Thread::Start() {
619*d9f75844SAndroid Build Coastguard Worker   RTC_DCHECK(!IsRunning());
620*d9f75844SAndroid Build Coastguard Worker 
621*d9f75844SAndroid Build Coastguard Worker   if (IsRunning())
622*d9f75844SAndroid Build Coastguard Worker     return false;
623*d9f75844SAndroid Build Coastguard Worker 
624*d9f75844SAndroid Build Coastguard Worker   Restart();  // reset IsQuitting() if the thread is being restarted
625*d9f75844SAndroid Build Coastguard Worker 
626*d9f75844SAndroid Build Coastguard Worker   // Make sure that ThreadManager is created on the main thread before
627*d9f75844SAndroid Build Coastguard Worker   // we start a new thread.
628*d9f75844SAndroid Build Coastguard Worker   ThreadManager::Instance();
629*d9f75844SAndroid Build Coastguard Worker 
630*d9f75844SAndroid Build Coastguard Worker   owned_ = true;
631*d9f75844SAndroid Build Coastguard Worker 
632*d9f75844SAndroid Build Coastguard Worker #if defined(WEBRTC_WIN)
633*d9f75844SAndroid Build Coastguard Worker   thread_ = CreateThread(nullptr, 0, PreRun, this, 0, &thread_id_);
634*d9f75844SAndroid Build Coastguard Worker   if (!thread_) {
635*d9f75844SAndroid Build Coastguard Worker     return false;
636*d9f75844SAndroid Build Coastguard Worker   }
637*d9f75844SAndroid Build Coastguard Worker #elif defined(WEBRTC_POSIX)
638*d9f75844SAndroid Build Coastguard Worker   pthread_attr_t attr;
639*d9f75844SAndroid Build Coastguard Worker   pthread_attr_init(&attr);
640*d9f75844SAndroid Build Coastguard Worker 
641*d9f75844SAndroid Build Coastguard Worker   int error_code = pthread_create(&thread_, &attr, PreRun, this);
642*d9f75844SAndroid Build Coastguard Worker   if (0 != error_code) {
643*d9f75844SAndroid Build Coastguard Worker     RTC_LOG(LS_ERROR) << "Unable to create pthread, error " << error_code;
644*d9f75844SAndroid Build Coastguard Worker     thread_ = 0;
645*d9f75844SAndroid Build Coastguard Worker     return false;
646*d9f75844SAndroid Build Coastguard Worker   }
647*d9f75844SAndroid Build Coastguard Worker   RTC_DCHECK(thread_);
648*d9f75844SAndroid Build Coastguard Worker #endif
649*d9f75844SAndroid Build Coastguard Worker   return true;
650*d9f75844SAndroid Build Coastguard Worker }
651*d9f75844SAndroid Build Coastguard Worker 
WrapCurrent()652*d9f75844SAndroid Build Coastguard Worker bool Thread::WrapCurrent() {
653*d9f75844SAndroid Build Coastguard Worker   return WrapCurrentWithThreadManager(ThreadManager::Instance(), true);
654*d9f75844SAndroid Build Coastguard Worker }
655*d9f75844SAndroid Build Coastguard Worker 
UnwrapCurrent()656*d9f75844SAndroid Build Coastguard Worker void Thread::UnwrapCurrent() {
657*d9f75844SAndroid Build Coastguard Worker   // Clears the platform-specific thread-specific storage.
658*d9f75844SAndroid Build Coastguard Worker   ThreadManager::Instance()->SetCurrentThread(nullptr);
659*d9f75844SAndroid Build Coastguard Worker #if defined(WEBRTC_WIN)
660*d9f75844SAndroid Build Coastguard Worker   if (thread_ != nullptr) {
661*d9f75844SAndroid Build Coastguard Worker     if (!CloseHandle(thread_)) {
662*d9f75844SAndroid Build Coastguard Worker       RTC_LOG_GLE(LS_ERROR)
663*d9f75844SAndroid Build Coastguard Worker           << "When unwrapping thread, failed to close handle.";
664*d9f75844SAndroid Build Coastguard Worker     }
665*d9f75844SAndroid Build Coastguard Worker     thread_ = nullptr;
666*d9f75844SAndroid Build Coastguard Worker     thread_id_ = 0;
667*d9f75844SAndroid Build Coastguard Worker   }
668*d9f75844SAndroid Build Coastguard Worker #elif defined(WEBRTC_POSIX)
669*d9f75844SAndroid Build Coastguard Worker   thread_ = 0;
670*d9f75844SAndroid Build Coastguard Worker #endif
671*d9f75844SAndroid Build Coastguard Worker }
672*d9f75844SAndroid Build Coastguard Worker 
SafeWrapCurrent()673*d9f75844SAndroid Build Coastguard Worker void Thread::SafeWrapCurrent() {
674*d9f75844SAndroid Build Coastguard Worker   WrapCurrentWithThreadManager(ThreadManager::Instance(), false);
675*d9f75844SAndroid Build Coastguard Worker }
676*d9f75844SAndroid Build Coastguard Worker 
Join()677*d9f75844SAndroid Build Coastguard Worker void Thread::Join() {
678*d9f75844SAndroid Build Coastguard Worker   if (!IsRunning())
679*d9f75844SAndroid Build Coastguard Worker     return;
680*d9f75844SAndroid Build Coastguard Worker 
681*d9f75844SAndroid Build Coastguard Worker   RTC_DCHECK(!IsCurrent());
682*d9f75844SAndroid Build Coastguard Worker   if (Current() && !Current()->blocking_calls_allowed_) {
683*d9f75844SAndroid Build Coastguard Worker     RTC_LOG(LS_WARNING) << "Waiting for the thread to join, "
684*d9f75844SAndroid Build Coastguard Worker                            "but blocking calls have been disallowed";
685*d9f75844SAndroid Build Coastguard Worker   }
686*d9f75844SAndroid Build Coastguard Worker 
687*d9f75844SAndroid Build Coastguard Worker #if defined(WEBRTC_WIN)
688*d9f75844SAndroid Build Coastguard Worker   RTC_DCHECK(thread_ != nullptr);
689*d9f75844SAndroid Build Coastguard Worker   WaitForSingleObject(thread_, INFINITE);
690*d9f75844SAndroid Build Coastguard Worker   CloseHandle(thread_);
691*d9f75844SAndroid Build Coastguard Worker   thread_ = nullptr;
692*d9f75844SAndroid Build Coastguard Worker   thread_id_ = 0;
693*d9f75844SAndroid Build Coastguard Worker #elif defined(WEBRTC_POSIX)
694*d9f75844SAndroid Build Coastguard Worker   pthread_join(thread_, nullptr);
695*d9f75844SAndroid Build Coastguard Worker   thread_ = 0;
696*d9f75844SAndroid Build Coastguard Worker #endif
697*d9f75844SAndroid Build Coastguard Worker }
698*d9f75844SAndroid Build Coastguard Worker 
SetAllowBlockingCalls(bool allow)699*d9f75844SAndroid Build Coastguard Worker bool Thread::SetAllowBlockingCalls(bool allow) {
700*d9f75844SAndroid Build Coastguard Worker   RTC_DCHECK(IsCurrent());
701*d9f75844SAndroid Build Coastguard Worker   bool previous = blocking_calls_allowed_;
702*d9f75844SAndroid Build Coastguard Worker   blocking_calls_allowed_ = allow;
703*d9f75844SAndroid Build Coastguard Worker   return previous;
704*d9f75844SAndroid Build Coastguard Worker }
705*d9f75844SAndroid Build Coastguard Worker 
706*d9f75844SAndroid Build Coastguard Worker // static
AssertBlockingIsAllowedOnCurrentThread()707*d9f75844SAndroid Build Coastguard Worker void Thread::AssertBlockingIsAllowedOnCurrentThread() {
708*d9f75844SAndroid Build Coastguard Worker #if !defined(NDEBUG)
709*d9f75844SAndroid Build Coastguard Worker   Thread* current = Thread::Current();
710*d9f75844SAndroid Build Coastguard Worker   RTC_DCHECK(!current || current->blocking_calls_allowed_);
711*d9f75844SAndroid Build Coastguard Worker #endif
712*d9f75844SAndroid Build Coastguard Worker }
713*d9f75844SAndroid Build Coastguard Worker 
714*d9f75844SAndroid Build Coastguard Worker // static
715*d9f75844SAndroid Build Coastguard Worker #if defined(WEBRTC_WIN)
PreRun(LPVOID pv)716*d9f75844SAndroid Build Coastguard Worker DWORD WINAPI Thread::PreRun(LPVOID pv) {
717*d9f75844SAndroid Build Coastguard Worker #else
718*d9f75844SAndroid Build Coastguard Worker void* Thread::PreRun(void* pv) {
719*d9f75844SAndroid Build Coastguard Worker #endif
720*d9f75844SAndroid Build Coastguard Worker   Thread* thread = static_cast<Thread*>(pv);
721*d9f75844SAndroid Build Coastguard Worker   ThreadManager::Instance()->SetCurrentThread(thread);
722*d9f75844SAndroid Build Coastguard Worker   rtc::SetCurrentThreadName(thread->name_.c_str());
723*d9f75844SAndroid Build Coastguard Worker #if defined(WEBRTC_MAC)
724*d9f75844SAndroid Build Coastguard Worker   ScopedAutoReleasePool pool;
725*d9f75844SAndroid Build Coastguard Worker #endif
726*d9f75844SAndroid Build Coastguard Worker   thread->Run();
727*d9f75844SAndroid Build Coastguard Worker 
728*d9f75844SAndroid Build Coastguard Worker   ThreadManager::Instance()->SetCurrentThread(nullptr);
729*d9f75844SAndroid Build Coastguard Worker #ifdef WEBRTC_WIN
730*d9f75844SAndroid Build Coastguard Worker   return 0;
731*d9f75844SAndroid Build Coastguard Worker #else
732*d9f75844SAndroid Build Coastguard Worker   return nullptr;
733*d9f75844SAndroid Build Coastguard Worker #endif
734*d9f75844SAndroid Build Coastguard Worker }  // namespace rtc
735*d9f75844SAndroid Build Coastguard Worker 
736*d9f75844SAndroid Build Coastguard Worker void Thread::Run() {
737*d9f75844SAndroid Build Coastguard Worker   ProcessMessages(kForever);
738*d9f75844SAndroid Build Coastguard Worker }
739*d9f75844SAndroid Build Coastguard Worker 
740*d9f75844SAndroid Build Coastguard Worker bool Thread::IsOwned() {
741*d9f75844SAndroid Build Coastguard Worker   RTC_DCHECK(IsRunning());
742*d9f75844SAndroid Build Coastguard Worker   return owned_;
743*d9f75844SAndroid Build Coastguard Worker }
744*d9f75844SAndroid Build Coastguard Worker 
745*d9f75844SAndroid Build Coastguard Worker void Thread::Stop() {
746*d9f75844SAndroid Build Coastguard Worker   Thread::Quit();
747*d9f75844SAndroid Build Coastguard Worker   Join();
748*d9f75844SAndroid Build Coastguard Worker }
749*d9f75844SAndroid Build Coastguard Worker 
750*d9f75844SAndroid Build Coastguard Worker void Thread::BlockingCall(rtc::FunctionView<void()> functor) {
751*d9f75844SAndroid Build Coastguard Worker   TRACE_EVENT0("webrtc", "Thread::BlockingCall");
752*d9f75844SAndroid Build Coastguard Worker 
753*d9f75844SAndroid Build Coastguard Worker   RTC_DCHECK(!IsQuitting());
754*d9f75844SAndroid Build Coastguard Worker   if (IsQuitting())
755*d9f75844SAndroid Build Coastguard Worker     return;
756*d9f75844SAndroid Build Coastguard Worker 
757*d9f75844SAndroid Build Coastguard Worker   if (IsCurrent()) {
758*d9f75844SAndroid Build Coastguard Worker #if RTC_DCHECK_IS_ON
759*d9f75844SAndroid Build Coastguard Worker     RTC_DCHECK(this->IsInvokeToThreadAllowed(this));
760*d9f75844SAndroid Build Coastguard Worker     RTC_DCHECK_RUN_ON(this);
761*d9f75844SAndroid Build Coastguard Worker     could_be_blocking_call_count_++;
762*d9f75844SAndroid Build Coastguard Worker #endif
763*d9f75844SAndroid Build Coastguard Worker     functor();
764*d9f75844SAndroid Build Coastguard Worker     return;
765*d9f75844SAndroid Build Coastguard Worker   }
766*d9f75844SAndroid Build Coastguard Worker 
767*d9f75844SAndroid Build Coastguard Worker   AssertBlockingIsAllowedOnCurrentThread();
768*d9f75844SAndroid Build Coastguard Worker 
769*d9f75844SAndroid Build Coastguard Worker   Thread* current_thread = Thread::Current();
770*d9f75844SAndroid Build Coastguard Worker 
771*d9f75844SAndroid Build Coastguard Worker #if RTC_DCHECK_IS_ON
772*d9f75844SAndroid Build Coastguard Worker   if (current_thread) {
773*d9f75844SAndroid Build Coastguard Worker     RTC_DCHECK_RUN_ON(current_thread);
774*d9f75844SAndroid Build Coastguard Worker     current_thread->blocking_call_count_++;
775*d9f75844SAndroid Build Coastguard Worker     RTC_DCHECK(current_thread->IsInvokeToThreadAllowed(this));
776*d9f75844SAndroid Build Coastguard Worker     ThreadManager::Instance()->RegisterSendAndCheckForCycles(current_thread,
777*d9f75844SAndroid Build Coastguard Worker                                                              this);
778*d9f75844SAndroid Build Coastguard Worker   }
779*d9f75844SAndroid Build Coastguard Worker #endif
780*d9f75844SAndroid Build Coastguard Worker 
781*d9f75844SAndroid Build Coastguard Worker   // Perhaps down the line we can get rid of this workaround and always require
782*d9f75844SAndroid Build Coastguard Worker   // current_thread to be valid when BlockingCall() is called.
783*d9f75844SAndroid Build Coastguard Worker   std::unique_ptr<rtc::Event> done_event;
784*d9f75844SAndroid Build Coastguard Worker   if (!current_thread)
785*d9f75844SAndroid Build Coastguard Worker     done_event.reset(new rtc::Event());
786*d9f75844SAndroid Build Coastguard Worker 
787*d9f75844SAndroid Build Coastguard Worker   bool ready = false;
788*d9f75844SAndroid Build Coastguard Worker   absl::Cleanup cleanup = [this, &ready, current_thread,
789*d9f75844SAndroid Build Coastguard Worker                            done = done_event.get()] {
790*d9f75844SAndroid Build Coastguard Worker     if (current_thread) {
791*d9f75844SAndroid Build Coastguard Worker       {
792*d9f75844SAndroid Build Coastguard Worker         MutexLock lock(&mutex_);
793*d9f75844SAndroid Build Coastguard Worker         ready = true;
794*d9f75844SAndroid Build Coastguard Worker       }
795*d9f75844SAndroid Build Coastguard Worker       current_thread->socketserver()->WakeUp();
796*d9f75844SAndroid Build Coastguard Worker     } else {
797*d9f75844SAndroid Build Coastguard Worker       done->Set();
798*d9f75844SAndroid Build Coastguard Worker     }
799*d9f75844SAndroid Build Coastguard Worker   };
800*d9f75844SAndroid Build Coastguard Worker   PostTask([functor, cleanup = std::move(cleanup)] { functor(); });
801*d9f75844SAndroid Build Coastguard Worker   if (current_thread) {
802*d9f75844SAndroid Build Coastguard Worker     bool waited = false;
803*d9f75844SAndroid Build Coastguard Worker     mutex_.Lock();
804*d9f75844SAndroid Build Coastguard Worker     while (!ready) {
805*d9f75844SAndroid Build Coastguard Worker       mutex_.Unlock();
806*d9f75844SAndroid Build Coastguard Worker       current_thread->socketserver()->Wait(SocketServer::kForever, false);
807*d9f75844SAndroid Build Coastguard Worker       waited = true;
808*d9f75844SAndroid Build Coastguard Worker       mutex_.Lock();
809*d9f75844SAndroid Build Coastguard Worker     }
810*d9f75844SAndroid Build Coastguard Worker     mutex_.Unlock();
811*d9f75844SAndroid Build Coastguard Worker 
812*d9f75844SAndroid Build Coastguard Worker     // Our Wait loop above may have consumed some WakeUp events for this
813*d9f75844SAndroid Build Coastguard Worker     // Thread, that weren't relevant to this Send.  Losing these WakeUps can
814*d9f75844SAndroid Build Coastguard Worker     // cause problems for some SocketServers.
815*d9f75844SAndroid Build Coastguard Worker     //
816*d9f75844SAndroid Build Coastguard Worker     // Concrete example:
817*d9f75844SAndroid Build Coastguard Worker     // Win32SocketServer on thread A calls Send on thread B.  While processing
818*d9f75844SAndroid Build Coastguard Worker     // the message, thread B Posts a message to A.  We consume the wakeup for
819*d9f75844SAndroid Build Coastguard Worker     // that Post while waiting for the Send to complete, which means that when
820*d9f75844SAndroid Build Coastguard Worker     // we exit this loop, we need to issue another WakeUp, or else the Posted
821*d9f75844SAndroid Build Coastguard Worker     // message won't be processed in a timely manner.
822*d9f75844SAndroid Build Coastguard Worker 
823*d9f75844SAndroid Build Coastguard Worker     if (waited) {
824*d9f75844SAndroid Build Coastguard Worker       current_thread->socketserver()->WakeUp();
825*d9f75844SAndroid Build Coastguard Worker     }
826*d9f75844SAndroid Build Coastguard Worker   } else {
827*d9f75844SAndroid Build Coastguard Worker     done_event->Wait(rtc::Event::kForever);
828*d9f75844SAndroid Build Coastguard Worker   }
829*d9f75844SAndroid Build Coastguard Worker }
830*d9f75844SAndroid Build Coastguard Worker 
831*d9f75844SAndroid Build Coastguard Worker // Called by the ThreadManager when being set as the current thread.
832*d9f75844SAndroid Build Coastguard Worker void Thread::EnsureIsCurrentTaskQueue() {
833*d9f75844SAndroid Build Coastguard Worker   task_queue_registration_ =
834*d9f75844SAndroid Build Coastguard Worker       std::make_unique<TaskQueueBase::CurrentTaskQueueSetter>(this);
835*d9f75844SAndroid Build Coastguard Worker }
836*d9f75844SAndroid Build Coastguard Worker 
837*d9f75844SAndroid Build Coastguard Worker // Called by the ThreadManager when being set as the current thread.
838*d9f75844SAndroid Build Coastguard Worker void Thread::ClearCurrentTaskQueue() {
839*d9f75844SAndroid Build Coastguard Worker   task_queue_registration_.reset();
840*d9f75844SAndroid Build Coastguard Worker }
841*d9f75844SAndroid Build Coastguard Worker 
842*d9f75844SAndroid Build Coastguard Worker void Thread::AllowInvokesToThread(Thread* thread) {
843*d9f75844SAndroid Build Coastguard Worker #if (!defined(NDEBUG) || RTC_DCHECK_IS_ON)
844*d9f75844SAndroid Build Coastguard Worker   if (!IsCurrent()) {
845*d9f75844SAndroid Build Coastguard Worker     PostTask([thread, this]() { AllowInvokesToThread(thread); });
846*d9f75844SAndroid Build Coastguard Worker     return;
847*d9f75844SAndroid Build Coastguard Worker   }
848*d9f75844SAndroid Build Coastguard Worker   RTC_DCHECK_RUN_ON(this);
849*d9f75844SAndroid Build Coastguard Worker   allowed_threads_.push_back(thread);
850*d9f75844SAndroid Build Coastguard Worker   invoke_policy_enabled_ = true;
851*d9f75844SAndroid Build Coastguard Worker #endif
852*d9f75844SAndroid Build Coastguard Worker }
853*d9f75844SAndroid Build Coastguard Worker 
854*d9f75844SAndroid Build Coastguard Worker void Thread::DisallowAllInvokes() {
855*d9f75844SAndroid Build Coastguard Worker #if (!defined(NDEBUG) || RTC_DCHECK_IS_ON)
856*d9f75844SAndroid Build Coastguard Worker   if (!IsCurrent()) {
857*d9f75844SAndroid Build Coastguard Worker     PostTask([this]() { DisallowAllInvokes(); });
858*d9f75844SAndroid Build Coastguard Worker     return;
859*d9f75844SAndroid Build Coastguard Worker   }
860*d9f75844SAndroid Build Coastguard Worker   RTC_DCHECK_RUN_ON(this);
861*d9f75844SAndroid Build Coastguard Worker   allowed_threads_.clear();
862*d9f75844SAndroid Build Coastguard Worker   invoke_policy_enabled_ = true;
863*d9f75844SAndroid Build Coastguard Worker #endif
864*d9f75844SAndroid Build Coastguard Worker }
865*d9f75844SAndroid Build Coastguard Worker 
866*d9f75844SAndroid Build Coastguard Worker #if RTC_DCHECK_IS_ON
867*d9f75844SAndroid Build Coastguard Worker uint32_t Thread::GetBlockingCallCount() const {
868*d9f75844SAndroid Build Coastguard Worker   RTC_DCHECK_RUN_ON(this);
869*d9f75844SAndroid Build Coastguard Worker   return blocking_call_count_;
870*d9f75844SAndroid Build Coastguard Worker }
871*d9f75844SAndroid Build Coastguard Worker uint32_t Thread::GetCouldBeBlockingCallCount() const {
872*d9f75844SAndroid Build Coastguard Worker   RTC_DCHECK_RUN_ON(this);
873*d9f75844SAndroid Build Coastguard Worker   return could_be_blocking_call_count_;
874*d9f75844SAndroid Build Coastguard Worker }
875*d9f75844SAndroid Build Coastguard Worker #endif
876*d9f75844SAndroid Build Coastguard Worker 
877*d9f75844SAndroid Build Coastguard Worker // Returns true if no policies added or if there is at least one policy
878*d9f75844SAndroid Build Coastguard Worker // that permits invocation to `target` thread.
879*d9f75844SAndroid Build Coastguard Worker bool Thread::IsInvokeToThreadAllowed(rtc::Thread* target) {
880*d9f75844SAndroid Build Coastguard Worker #if (!defined(NDEBUG) || RTC_DCHECK_IS_ON)
881*d9f75844SAndroid Build Coastguard Worker   RTC_DCHECK_RUN_ON(this);
882*d9f75844SAndroid Build Coastguard Worker   if (!invoke_policy_enabled_) {
883*d9f75844SAndroid Build Coastguard Worker     return true;
884*d9f75844SAndroid Build Coastguard Worker   }
885*d9f75844SAndroid Build Coastguard Worker   for (const auto* thread : allowed_threads_) {
886*d9f75844SAndroid Build Coastguard Worker     if (thread == target) {
887*d9f75844SAndroid Build Coastguard Worker       return true;
888*d9f75844SAndroid Build Coastguard Worker     }
889*d9f75844SAndroid Build Coastguard Worker   }
890*d9f75844SAndroid Build Coastguard Worker   return false;
891*d9f75844SAndroid Build Coastguard Worker #else
892*d9f75844SAndroid Build Coastguard Worker   return true;
893*d9f75844SAndroid Build Coastguard Worker #endif
894*d9f75844SAndroid Build Coastguard Worker }
895*d9f75844SAndroid Build Coastguard Worker 
896*d9f75844SAndroid Build Coastguard Worker void Thread::Delete() {
897*d9f75844SAndroid Build Coastguard Worker   Stop();
898*d9f75844SAndroid Build Coastguard Worker   delete this;
899*d9f75844SAndroid Build Coastguard Worker }
900*d9f75844SAndroid Build Coastguard Worker 
901*d9f75844SAndroid Build Coastguard Worker void Thread::PostDelayedTask(absl::AnyInvocable<void() &&> task,
902*d9f75844SAndroid Build Coastguard Worker                              webrtc::TimeDelta delay) {
903*d9f75844SAndroid Build Coastguard Worker   // This implementation does not support low precision yet.
904*d9f75844SAndroid Build Coastguard Worker   PostDelayedHighPrecisionTask(std::move(task), delay);
905*d9f75844SAndroid Build Coastguard Worker }
906*d9f75844SAndroid Build Coastguard Worker 
907*d9f75844SAndroid Build Coastguard Worker bool Thread::IsProcessingMessagesForTesting() {
908*d9f75844SAndroid Build Coastguard Worker   return (owned_ || IsCurrent()) && !IsQuitting();
909*d9f75844SAndroid Build Coastguard Worker }
910*d9f75844SAndroid Build Coastguard Worker 
911*d9f75844SAndroid Build Coastguard Worker bool Thread::ProcessMessages(int cmsLoop) {
912*d9f75844SAndroid Build Coastguard Worker   // Using ProcessMessages with a custom clock for testing and a time greater
913*d9f75844SAndroid Build Coastguard Worker   // than 0 doesn't work, since it's not guaranteed to advance the custom
914*d9f75844SAndroid Build Coastguard Worker   // clock's time, and may get stuck in an infinite loop.
915*d9f75844SAndroid Build Coastguard Worker   RTC_DCHECK(GetClockForTesting() == nullptr || cmsLoop == 0 ||
916*d9f75844SAndroid Build Coastguard Worker              cmsLoop == kForever);
917*d9f75844SAndroid Build Coastguard Worker   int64_t msEnd = (kForever == cmsLoop) ? 0 : TimeAfter(cmsLoop);
918*d9f75844SAndroid Build Coastguard Worker   int cmsNext = cmsLoop;
919*d9f75844SAndroid Build Coastguard Worker 
920*d9f75844SAndroid Build Coastguard Worker   while (true) {
921*d9f75844SAndroid Build Coastguard Worker #if defined(WEBRTC_MAC)
922*d9f75844SAndroid Build Coastguard Worker     ScopedAutoReleasePool pool;
923*d9f75844SAndroid Build Coastguard Worker #endif
924*d9f75844SAndroid Build Coastguard Worker     absl::AnyInvocable<void()&&> task = Get(cmsNext);
925*d9f75844SAndroid Build Coastguard Worker     if (!task)
926*d9f75844SAndroid Build Coastguard Worker       return !IsQuitting();
927*d9f75844SAndroid Build Coastguard Worker     Dispatch(std::move(task));
928*d9f75844SAndroid Build Coastguard Worker 
929*d9f75844SAndroid Build Coastguard Worker     if (cmsLoop != kForever) {
930*d9f75844SAndroid Build Coastguard Worker       cmsNext = static_cast<int>(TimeUntil(msEnd));
931*d9f75844SAndroid Build Coastguard Worker       if (cmsNext < 0)
932*d9f75844SAndroid Build Coastguard Worker         return true;
933*d9f75844SAndroid Build Coastguard Worker     }
934*d9f75844SAndroid Build Coastguard Worker   }
935*d9f75844SAndroid Build Coastguard Worker }
936*d9f75844SAndroid Build Coastguard Worker 
937*d9f75844SAndroid Build Coastguard Worker bool Thread::WrapCurrentWithThreadManager(ThreadManager* thread_manager,
938*d9f75844SAndroid Build Coastguard Worker                                           bool need_synchronize_access) {
939*d9f75844SAndroid Build Coastguard Worker   RTC_DCHECK(!IsRunning());
940*d9f75844SAndroid Build Coastguard Worker 
941*d9f75844SAndroid Build Coastguard Worker #if defined(WEBRTC_WIN)
942*d9f75844SAndroid Build Coastguard Worker   if (need_synchronize_access) {
943*d9f75844SAndroid Build Coastguard Worker     // We explicitly ask for no rights other than synchronization.
944*d9f75844SAndroid Build Coastguard Worker     // This gives us the best chance of succeeding.
945*d9f75844SAndroid Build Coastguard Worker     thread_ = OpenThread(SYNCHRONIZE, FALSE, GetCurrentThreadId());
946*d9f75844SAndroid Build Coastguard Worker     if (!thread_) {
947*d9f75844SAndroid Build Coastguard Worker       RTC_LOG_GLE(LS_ERROR) << "Unable to get handle to thread.";
948*d9f75844SAndroid Build Coastguard Worker       return false;
949*d9f75844SAndroid Build Coastguard Worker     }
950*d9f75844SAndroid Build Coastguard Worker     thread_id_ = GetCurrentThreadId();
951*d9f75844SAndroid Build Coastguard Worker   }
952*d9f75844SAndroid Build Coastguard Worker #elif defined(WEBRTC_POSIX)
953*d9f75844SAndroid Build Coastguard Worker   thread_ = pthread_self();
954*d9f75844SAndroid Build Coastguard Worker #endif
955*d9f75844SAndroid Build Coastguard Worker   owned_ = false;
956*d9f75844SAndroid Build Coastguard Worker   thread_manager->SetCurrentThread(this);
957*d9f75844SAndroid Build Coastguard Worker   return true;
958*d9f75844SAndroid Build Coastguard Worker }
959*d9f75844SAndroid Build Coastguard Worker 
960*d9f75844SAndroid Build Coastguard Worker bool Thread::IsRunning() {
961*d9f75844SAndroid Build Coastguard Worker #if defined(WEBRTC_WIN)
962*d9f75844SAndroid Build Coastguard Worker   return thread_ != nullptr;
963*d9f75844SAndroid Build Coastguard Worker #elif defined(WEBRTC_POSIX)
964*d9f75844SAndroid Build Coastguard Worker   return thread_ != 0;
965*d9f75844SAndroid Build Coastguard Worker #endif
966*d9f75844SAndroid Build Coastguard Worker }
967*d9f75844SAndroid Build Coastguard Worker 
968*d9f75844SAndroid Build Coastguard Worker AutoThread::AutoThread()
969*d9f75844SAndroid Build Coastguard Worker     : Thread(CreateDefaultSocketServer(), /*do_init=*/false) {
970*d9f75844SAndroid Build Coastguard Worker   if (!ThreadManager::Instance()->CurrentThread()) {
971*d9f75844SAndroid Build Coastguard Worker     // DoInit registers with ThreadManager. Do that only if we intend to
972*d9f75844SAndroid Build Coastguard Worker     // be rtc::Thread::Current(), otherwise ProcessAllMessageQueuesInternal will
973*d9f75844SAndroid Build Coastguard Worker     // post a message to a queue that no running thread is serving.
974*d9f75844SAndroid Build Coastguard Worker     DoInit();
975*d9f75844SAndroid Build Coastguard Worker     ThreadManager::Instance()->SetCurrentThread(this);
976*d9f75844SAndroid Build Coastguard Worker   }
977*d9f75844SAndroid Build Coastguard Worker }
978*d9f75844SAndroid Build Coastguard Worker 
979*d9f75844SAndroid Build Coastguard Worker AutoThread::~AutoThread() {
980*d9f75844SAndroid Build Coastguard Worker   Stop();
981*d9f75844SAndroid Build Coastguard Worker   DoDestroy();
982*d9f75844SAndroid Build Coastguard Worker   if (ThreadManager::Instance()->CurrentThread() == this) {
983*d9f75844SAndroid Build Coastguard Worker     ThreadManager::Instance()->SetCurrentThread(nullptr);
984*d9f75844SAndroid Build Coastguard Worker   }
985*d9f75844SAndroid Build Coastguard Worker }
986*d9f75844SAndroid Build Coastguard Worker 
987*d9f75844SAndroid Build Coastguard Worker AutoSocketServerThread::AutoSocketServerThread(SocketServer* ss)
988*d9f75844SAndroid Build Coastguard Worker     : Thread(ss, /*do_init=*/false) {
989*d9f75844SAndroid Build Coastguard Worker   DoInit();
990*d9f75844SAndroid Build Coastguard Worker   old_thread_ = ThreadManager::Instance()->CurrentThread();
991*d9f75844SAndroid Build Coastguard Worker   // Temporarily set the current thread to nullptr so that we can keep checks
992*d9f75844SAndroid Build Coastguard Worker   // around that catch unintentional pointer overwrites.
993*d9f75844SAndroid Build Coastguard Worker   rtc::ThreadManager::Instance()->SetCurrentThread(nullptr);
994*d9f75844SAndroid Build Coastguard Worker   rtc::ThreadManager::Instance()->SetCurrentThread(this);
995*d9f75844SAndroid Build Coastguard Worker   if (old_thread_) {
996*d9f75844SAndroid Build Coastguard Worker     ThreadManager::Remove(old_thread_);
997*d9f75844SAndroid Build Coastguard Worker   }
998*d9f75844SAndroid Build Coastguard Worker }
999*d9f75844SAndroid Build Coastguard Worker 
1000*d9f75844SAndroid Build Coastguard Worker AutoSocketServerThread::~AutoSocketServerThread() {
1001*d9f75844SAndroid Build Coastguard Worker   RTC_DCHECK(ThreadManager::Instance()->CurrentThread() == this);
1002*d9f75844SAndroid Build Coastguard Worker   // Stop and destroy the thread before clearing it as the current thread.
1003*d9f75844SAndroid Build Coastguard Worker   // Sometimes there are messages left in the Thread that will be
1004*d9f75844SAndroid Build Coastguard Worker   // destroyed by DoDestroy, and sometimes the destructors of the message and/or
1005*d9f75844SAndroid Build Coastguard Worker   // its contents rely on this thread still being set as the current thread.
1006*d9f75844SAndroid Build Coastguard Worker   Stop();
1007*d9f75844SAndroid Build Coastguard Worker   DoDestroy();
1008*d9f75844SAndroid Build Coastguard Worker   rtc::ThreadManager::Instance()->SetCurrentThread(nullptr);
1009*d9f75844SAndroid Build Coastguard Worker   rtc::ThreadManager::Instance()->SetCurrentThread(old_thread_);
1010*d9f75844SAndroid Build Coastguard Worker   if (old_thread_) {
1011*d9f75844SAndroid Build Coastguard Worker     ThreadManager::Add(old_thread_);
1012*d9f75844SAndroid Build Coastguard Worker   }
1013*d9f75844SAndroid Build Coastguard Worker }
1014*d9f75844SAndroid Build Coastguard Worker 
1015*d9f75844SAndroid Build Coastguard Worker }  // namespace rtc
1016