/*
 *  Copyright 2004 The WebRTC Project Authors. All rights reserved.
 *
 *  Use of this source code is governed by a BSD-style license
 *  that can be found in the LICENSE file in the root of the source
 *  tree. An additional intellectual property rights grant can be found
 *  in the file PATENTS.  All contributing project authors may
 *  be found in the AUTHORS file in the root of the source tree.
 */
10*d9f75844SAndroid Build Coastguard Worker
11*d9f75844SAndroid Build Coastguard Worker #include "rtc_base/thread.h"
12*d9f75844SAndroid Build Coastguard Worker
13*d9f75844SAndroid Build Coastguard Worker #include "absl/strings/string_view.h"
14*d9f75844SAndroid Build Coastguard Worker #include "api/units/time_delta.h"
15*d9f75844SAndroid Build Coastguard Worker #include "rtc_base/socket_server.h"
16*d9f75844SAndroid Build Coastguard Worker
17*d9f75844SAndroid Build Coastguard Worker #if defined(WEBRTC_WIN)
18*d9f75844SAndroid Build Coastguard Worker #include <comdef.h>
19*d9f75844SAndroid Build Coastguard Worker #elif defined(WEBRTC_POSIX)
20*d9f75844SAndroid Build Coastguard Worker #include <time.h>
21*d9f75844SAndroid Build Coastguard Worker #else
22*d9f75844SAndroid Build Coastguard Worker #error "Either WEBRTC_WIN or WEBRTC_POSIX needs to be defined."
23*d9f75844SAndroid Build Coastguard Worker #endif
24*d9f75844SAndroid Build Coastguard Worker
25*d9f75844SAndroid Build Coastguard Worker #if defined(WEBRTC_WIN)
26*d9f75844SAndroid Build Coastguard Worker // Disable warning that we don't care about:
27*d9f75844SAndroid Build Coastguard Worker // warning C4722: destructor never returns, potential memory leak
28*d9f75844SAndroid Build Coastguard Worker #pragma warning(disable : 4722)
29*d9f75844SAndroid Build Coastguard Worker #endif
30*d9f75844SAndroid Build Coastguard Worker
31*d9f75844SAndroid Build Coastguard Worker #include <stdio.h>
32*d9f75844SAndroid Build Coastguard Worker
33*d9f75844SAndroid Build Coastguard Worker #include <utility>
34*d9f75844SAndroid Build Coastguard Worker
35*d9f75844SAndroid Build Coastguard Worker #include "absl/algorithm/container.h"
36*d9f75844SAndroid Build Coastguard Worker #include "absl/cleanup/cleanup.h"
37*d9f75844SAndroid Build Coastguard Worker #include "api/sequence_checker.h"
38*d9f75844SAndroid Build Coastguard Worker #include "rtc_base/checks.h"
39*d9f75844SAndroid Build Coastguard Worker #include "rtc_base/deprecated/recursive_critical_section.h"
40*d9f75844SAndroid Build Coastguard Worker #include "rtc_base/event.h"
41*d9f75844SAndroid Build Coastguard Worker #include "rtc_base/internal/default_socket_server.h"
42*d9f75844SAndroid Build Coastguard Worker #include "rtc_base/logging.h"
43*d9f75844SAndroid Build Coastguard Worker #include "rtc_base/null_socket_server.h"
44*d9f75844SAndroid Build Coastguard Worker #include "rtc_base/synchronization/mutex.h"
45*d9f75844SAndroid Build Coastguard Worker #include "rtc_base/time_utils.h"
46*d9f75844SAndroid Build Coastguard Worker #include "rtc_base/trace_event.h"
47*d9f75844SAndroid Build Coastguard Worker
48*d9f75844SAndroid Build Coastguard Worker #if defined(WEBRTC_MAC)
49*d9f75844SAndroid Build Coastguard Worker #include "rtc_base/system/cocoa_threading.h"
50*d9f75844SAndroid Build Coastguard Worker
/*
 * These are forward-declarations for methods that are part of the
 * ObjC runtime. They are declared in the private header objc-internal.h.
 * These calls are what clang inserts when using @autoreleasepool in ObjC,
 * but here they are used directly in order to keep this file C++.
 * https://clang.llvm.org/docs/AutomaticReferenceCounting.html#runtime-support
 */
58*d9f75844SAndroid Build Coastguard Worker extern "C" {
59*d9f75844SAndroid Build Coastguard Worker void* objc_autoreleasePoolPush(void);
60*d9f75844SAndroid Build Coastguard Worker void objc_autoreleasePoolPop(void* pool);
61*d9f75844SAndroid Build Coastguard Worker }
62*d9f75844SAndroid Build Coastguard Worker
63*d9f75844SAndroid Build Coastguard Worker namespace {
64*d9f75844SAndroid Build Coastguard Worker class ScopedAutoReleasePool {
65*d9f75844SAndroid Build Coastguard Worker public:
ScopedAutoReleasePool()66*d9f75844SAndroid Build Coastguard Worker ScopedAutoReleasePool() : pool_(objc_autoreleasePoolPush()) {}
~ScopedAutoReleasePool()67*d9f75844SAndroid Build Coastguard Worker ~ScopedAutoReleasePool() { objc_autoreleasePoolPop(pool_); }
68*d9f75844SAndroid Build Coastguard Worker
69*d9f75844SAndroid Build Coastguard Worker private:
70*d9f75844SAndroid Build Coastguard Worker void* const pool_;
71*d9f75844SAndroid Build Coastguard Worker };
72*d9f75844SAndroid Build Coastguard Worker } // namespace
73*d9f75844SAndroid Build Coastguard Worker #endif
74*d9f75844SAndroid Build Coastguard Worker
75*d9f75844SAndroid Build Coastguard Worker namespace rtc {
76*d9f75844SAndroid Build Coastguard Worker namespace {
77*d9f75844SAndroid Build Coastguard Worker
78*d9f75844SAndroid Build Coastguard Worker using ::webrtc::MutexLock;
79*d9f75844SAndroid Build Coastguard Worker using ::webrtc::TimeDelta;
80*d9f75844SAndroid Build Coastguard Worker
81*d9f75844SAndroid Build Coastguard Worker class RTC_SCOPED_LOCKABLE MarkProcessingCritScope {
82*d9f75844SAndroid Build Coastguard Worker public:
MarkProcessingCritScope(const RecursiveCriticalSection * cs,size_t * processing)83*d9f75844SAndroid Build Coastguard Worker MarkProcessingCritScope(const RecursiveCriticalSection* cs,
84*d9f75844SAndroid Build Coastguard Worker size_t* processing) RTC_EXCLUSIVE_LOCK_FUNCTION(cs)
85*d9f75844SAndroid Build Coastguard Worker : cs_(cs), processing_(processing) {
86*d9f75844SAndroid Build Coastguard Worker cs_->Enter();
87*d9f75844SAndroid Build Coastguard Worker *processing_ += 1;
88*d9f75844SAndroid Build Coastguard Worker }
89*d9f75844SAndroid Build Coastguard Worker
RTC_UNLOCK_FUNCTION()90*d9f75844SAndroid Build Coastguard Worker ~MarkProcessingCritScope() RTC_UNLOCK_FUNCTION() {
91*d9f75844SAndroid Build Coastguard Worker *processing_ -= 1;
92*d9f75844SAndroid Build Coastguard Worker cs_->Leave();
93*d9f75844SAndroid Build Coastguard Worker }
94*d9f75844SAndroid Build Coastguard Worker
95*d9f75844SAndroid Build Coastguard Worker MarkProcessingCritScope(const MarkProcessingCritScope&) = delete;
96*d9f75844SAndroid Build Coastguard Worker MarkProcessingCritScope& operator=(const MarkProcessingCritScope&) = delete;
97*d9f75844SAndroid Build Coastguard Worker
98*d9f75844SAndroid Build Coastguard Worker private:
99*d9f75844SAndroid Build Coastguard Worker const RecursiveCriticalSection* const cs_;
100*d9f75844SAndroid Build Coastguard Worker size_t* processing_;
101*d9f75844SAndroid Build Coastguard Worker };
102*d9f75844SAndroid Build Coastguard Worker
103*d9f75844SAndroid Build Coastguard Worker } // namespace
104*d9f75844SAndroid Build Coastguard Worker
Instance()105*d9f75844SAndroid Build Coastguard Worker ThreadManager* ThreadManager::Instance() {
106*d9f75844SAndroid Build Coastguard Worker static ThreadManager* const thread_manager = new ThreadManager();
107*d9f75844SAndroid Build Coastguard Worker return thread_manager;
108*d9f75844SAndroid Build Coastguard Worker }
109*d9f75844SAndroid Build Coastguard Worker
~ThreadManager()110*d9f75844SAndroid Build Coastguard Worker ThreadManager::~ThreadManager() {
111*d9f75844SAndroid Build Coastguard Worker // By above RTC_DEFINE_STATIC_LOCAL.
112*d9f75844SAndroid Build Coastguard Worker RTC_DCHECK_NOTREACHED() << "ThreadManager should never be destructed.";
113*d9f75844SAndroid Build Coastguard Worker }
114*d9f75844SAndroid Build Coastguard Worker
115*d9f75844SAndroid Build Coastguard Worker // static
Add(Thread * message_queue)116*d9f75844SAndroid Build Coastguard Worker void ThreadManager::Add(Thread* message_queue) {
117*d9f75844SAndroid Build Coastguard Worker return Instance()->AddInternal(message_queue);
118*d9f75844SAndroid Build Coastguard Worker }
AddInternal(Thread * message_queue)119*d9f75844SAndroid Build Coastguard Worker void ThreadManager::AddInternal(Thread* message_queue) {
120*d9f75844SAndroid Build Coastguard Worker CritScope cs(&crit_);
121*d9f75844SAndroid Build Coastguard Worker // Prevent changes while the list of message queues is processed.
122*d9f75844SAndroid Build Coastguard Worker RTC_DCHECK_EQ(processing_, 0);
123*d9f75844SAndroid Build Coastguard Worker message_queues_.push_back(message_queue);
124*d9f75844SAndroid Build Coastguard Worker }
125*d9f75844SAndroid Build Coastguard Worker
126*d9f75844SAndroid Build Coastguard Worker // static
Remove(Thread * message_queue)127*d9f75844SAndroid Build Coastguard Worker void ThreadManager::Remove(Thread* message_queue) {
128*d9f75844SAndroid Build Coastguard Worker return Instance()->RemoveInternal(message_queue);
129*d9f75844SAndroid Build Coastguard Worker }
RemoveInternal(Thread * message_queue)130*d9f75844SAndroid Build Coastguard Worker void ThreadManager::RemoveInternal(Thread* message_queue) {
131*d9f75844SAndroid Build Coastguard Worker {
132*d9f75844SAndroid Build Coastguard Worker CritScope cs(&crit_);
133*d9f75844SAndroid Build Coastguard Worker // Prevent changes while the list of message queues is processed.
134*d9f75844SAndroid Build Coastguard Worker RTC_DCHECK_EQ(processing_, 0);
135*d9f75844SAndroid Build Coastguard Worker std::vector<Thread*>::iterator iter;
136*d9f75844SAndroid Build Coastguard Worker iter = absl::c_find(message_queues_, message_queue);
137*d9f75844SAndroid Build Coastguard Worker if (iter != message_queues_.end()) {
138*d9f75844SAndroid Build Coastguard Worker message_queues_.erase(iter);
139*d9f75844SAndroid Build Coastguard Worker }
140*d9f75844SAndroid Build Coastguard Worker #if RTC_DCHECK_IS_ON
141*d9f75844SAndroid Build Coastguard Worker RemoveFromSendGraph(message_queue);
142*d9f75844SAndroid Build Coastguard Worker #endif
143*d9f75844SAndroid Build Coastguard Worker }
144*d9f75844SAndroid Build Coastguard Worker }
145*d9f75844SAndroid Build Coastguard Worker
146*d9f75844SAndroid Build Coastguard Worker #if RTC_DCHECK_IS_ON
RemoveFromSendGraph(Thread * thread)147*d9f75844SAndroid Build Coastguard Worker void ThreadManager::RemoveFromSendGraph(Thread* thread) {
148*d9f75844SAndroid Build Coastguard Worker for (auto it = send_graph_.begin(); it != send_graph_.end();) {
149*d9f75844SAndroid Build Coastguard Worker if (it->first == thread) {
150*d9f75844SAndroid Build Coastguard Worker it = send_graph_.erase(it);
151*d9f75844SAndroid Build Coastguard Worker } else {
152*d9f75844SAndroid Build Coastguard Worker it->second.erase(thread);
153*d9f75844SAndroid Build Coastguard Worker ++it;
154*d9f75844SAndroid Build Coastguard Worker }
155*d9f75844SAndroid Build Coastguard Worker }
156*d9f75844SAndroid Build Coastguard Worker }
157*d9f75844SAndroid Build Coastguard Worker
RegisterSendAndCheckForCycles(Thread * source,Thread * target)158*d9f75844SAndroid Build Coastguard Worker void ThreadManager::RegisterSendAndCheckForCycles(Thread* source,
159*d9f75844SAndroid Build Coastguard Worker Thread* target) {
160*d9f75844SAndroid Build Coastguard Worker RTC_DCHECK(source);
161*d9f75844SAndroid Build Coastguard Worker RTC_DCHECK(target);
162*d9f75844SAndroid Build Coastguard Worker
163*d9f75844SAndroid Build Coastguard Worker CritScope cs(&crit_);
164*d9f75844SAndroid Build Coastguard Worker std::deque<Thread*> all_targets({target});
165*d9f75844SAndroid Build Coastguard Worker // We check the pre-existing who-sends-to-who graph for any path from target
166*d9f75844SAndroid Build Coastguard Worker // to source. This loop is guaranteed to terminate because per the send graph
167*d9f75844SAndroid Build Coastguard Worker // invariant, there are no cycles in the graph.
168*d9f75844SAndroid Build Coastguard Worker for (size_t i = 0; i < all_targets.size(); i++) {
169*d9f75844SAndroid Build Coastguard Worker const auto& targets = send_graph_[all_targets[i]];
170*d9f75844SAndroid Build Coastguard Worker all_targets.insert(all_targets.end(), targets.begin(), targets.end());
171*d9f75844SAndroid Build Coastguard Worker }
172*d9f75844SAndroid Build Coastguard Worker RTC_CHECK_EQ(absl::c_count(all_targets, source), 0)
173*d9f75844SAndroid Build Coastguard Worker << " send loop between " << source->name() << " and " << target->name();
174*d9f75844SAndroid Build Coastguard Worker
175*d9f75844SAndroid Build Coastguard Worker // We may now insert source -> target without creating a cycle, since there
176*d9f75844SAndroid Build Coastguard Worker // was no path from target to source per the prior CHECK.
177*d9f75844SAndroid Build Coastguard Worker send_graph_[source].insert(target);
178*d9f75844SAndroid Build Coastguard Worker }
179*d9f75844SAndroid Build Coastguard Worker #endif
180*d9f75844SAndroid Build Coastguard Worker
181*d9f75844SAndroid Build Coastguard Worker // static
ProcessAllMessageQueuesForTesting()182*d9f75844SAndroid Build Coastguard Worker void ThreadManager::ProcessAllMessageQueuesForTesting() {
183*d9f75844SAndroid Build Coastguard Worker return Instance()->ProcessAllMessageQueuesInternal();
184*d9f75844SAndroid Build Coastguard Worker }
185*d9f75844SAndroid Build Coastguard Worker
ProcessAllMessageQueuesInternal()186*d9f75844SAndroid Build Coastguard Worker void ThreadManager::ProcessAllMessageQueuesInternal() {
187*d9f75844SAndroid Build Coastguard Worker // This works by posting a delayed message at the current time and waiting
188*d9f75844SAndroid Build Coastguard Worker // for it to be dispatched on all queues, which will ensure that all messages
189*d9f75844SAndroid Build Coastguard Worker // that came before it were also dispatched.
190*d9f75844SAndroid Build Coastguard Worker std::atomic<int> queues_not_done(0);
191*d9f75844SAndroid Build Coastguard Worker
192*d9f75844SAndroid Build Coastguard Worker {
193*d9f75844SAndroid Build Coastguard Worker MarkProcessingCritScope cs(&crit_, &processing_);
194*d9f75844SAndroid Build Coastguard Worker for (Thread* queue : message_queues_) {
195*d9f75844SAndroid Build Coastguard Worker if (!queue->IsProcessingMessagesForTesting()) {
196*d9f75844SAndroid Build Coastguard Worker // If the queue is not processing messages, it can
197*d9f75844SAndroid Build Coastguard Worker // be ignored. If we tried to post a message to it, it would be dropped
198*d9f75844SAndroid Build Coastguard Worker // or ignored.
199*d9f75844SAndroid Build Coastguard Worker continue;
200*d9f75844SAndroid Build Coastguard Worker }
201*d9f75844SAndroid Build Coastguard Worker queues_not_done.fetch_add(1);
202*d9f75844SAndroid Build Coastguard Worker // Whether the task is processed, or the thread is simply cleared,
203*d9f75844SAndroid Build Coastguard Worker // queues_not_done gets decremented.
204*d9f75844SAndroid Build Coastguard Worker absl::Cleanup sub = [&queues_not_done] { queues_not_done.fetch_sub(1); };
205*d9f75844SAndroid Build Coastguard Worker // Post delayed task instead of regular task to wait for all delayed tasks
206*d9f75844SAndroid Build Coastguard Worker // that are ready for processing.
207*d9f75844SAndroid Build Coastguard Worker queue->PostDelayedTask([sub = std::move(sub)] {}, TimeDelta::Zero());
208*d9f75844SAndroid Build Coastguard Worker }
209*d9f75844SAndroid Build Coastguard Worker }
210*d9f75844SAndroid Build Coastguard Worker
211*d9f75844SAndroid Build Coastguard Worker rtc::Thread* current = rtc::Thread::Current();
212*d9f75844SAndroid Build Coastguard Worker // Note: One of the message queues may have been on this thread, which is
213*d9f75844SAndroid Build Coastguard Worker // why we can't synchronously wait for queues_not_done to go to 0; we need
214*d9f75844SAndroid Build Coastguard Worker // to process messages as well.
215*d9f75844SAndroid Build Coastguard Worker while (queues_not_done.load() > 0) {
216*d9f75844SAndroid Build Coastguard Worker if (current) {
217*d9f75844SAndroid Build Coastguard Worker current->ProcessMessages(0);
218*d9f75844SAndroid Build Coastguard Worker }
219*d9f75844SAndroid Build Coastguard Worker }
220*d9f75844SAndroid Build Coastguard Worker }
221*d9f75844SAndroid Build Coastguard Worker
222*d9f75844SAndroid Build Coastguard Worker // static
Current()223*d9f75844SAndroid Build Coastguard Worker Thread* Thread::Current() {
224*d9f75844SAndroid Build Coastguard Worker ThreadManager* manager = ThreadManager::Instance();
225*d9f75844SAndroid Build Coastguard Worker Thread* thread = manager->CurrentThread();
226*d9f75844SAndroid Build Coastguard Worker
227*d9f75844SAndroid Build Coastguard Worker return thread;
228*d9f75844SAndroid Build Coastguard Worker }
229*d9f75844SAndroid Build Coastguard Worker
230*d9f75844SAndroid Build Coastguard Worker #if defined(WEBRTC_POSIX)
ThreadManager()231*d9f75844SAndroid Build Coastguard Worker ThreadManager::ThreadManager() {
232*d9f75844SAndroid Build Coastguard Worker #if defined(WEBRTC_MAC)
233*d9f75844SAndroid Build Coastguard Worker InitCocoaMultiThreading();
234*d9f75844SAndroid Build Coastguard Worker #endif
235*d9f75844SAndroid Build Coastguard Worker pthread_key_create(&key_, nullptr);
236*d9f75844SAndroid Build Coastguard Worker }
237*d9f75844SAndroid Build Coastguard Worker
CurrentThread()238*d9f75844SAndroid Build Coastguard Worker Thread* ThreadManager::CurrentThread() {
239*d9f75844SAndroid Build Coastguard Worker return static_cast<Thread*>(pthread_getspecific(key_));
240*d9f75844SAndroid Build Coastguard Worker }
241*d9f75844SAndroid Build Coastguard Worker
SetCurrentThreadInternal(Thread * thread)242*d9f75844SAndroid Build Coastguard Worker void ThreadManager::SetCurrentThreadInternal(Thread* thread) {
243*d9f75844SAndroid Build Coastguard Worker pthread_setspecific(key_, thread);
244*d9f75844SAndroid Build Coastguard Worker }
245*d9f75844SAndroid Build Coastguard Worker #endif
246*d9f75844SAndroid Build Coastguard Worker
247*d9f75844SAndroid Build Coastguard Worker #if defined(WEBRTC_WIN)
ThreadManager()248*d9f75844SAndroid Build Coastguard Worker ThreadManager::ThreadManager() : key_(TlsAlloc()) {}
249*d9f75844SAndroid Build Coastguard Worker
CurrentThread()250*d9f75844SAndroid Build Coastguard Worker Thread* ThreadManager::CurrentThread() {
251*d9f75844SAndroid Build Coastguard Worker return static_cast<Thread*>(TlsGetValue(key_));
252*d9f75844SAndroid Build Coastguard Worker }
253*d9f75844SAndroid Build Coastguard Worker
SetCurrentThreadInternal(Thread * thread)254*d9f75844SAndroid Build Coastguard Worker void ThreadManager::SetCurrentThreadInternal(Thread* thread) {
255*d9f75844SAndroid Build Coastguard Worker TlsSetValue(key_, thread);
256*d9f75844SAndroid Build Coastguard Worker }
257*d9f75844SAndroid Build Coastguard Worker #endif
258*d9f75844SAndroid Build Coastguard Worker
SetCurrentThread(Thread * thread)259*d9f75844SAndroid Build Coastguard Worker void ThreadManager::SetCurrentThread(Thread* thread) {
260*d9f75844SAndroid Build Coastguard Worker #if RTC_DLOG_IS_ON
261*d9f75844SAndroid Build Coastguard Worker if (CurrentThread() && thread) {
262*d9f75844SAndroid Build Coastguard Worker RTC_DLOG(LS_ERROR) << "SetCurrentThread: Overwriting an existing value?";
263*d9f75844SAndroid Build Coastguard Worker }
264*d9f75844SAndroid Build Coastguard Worker #endif // RTC_DLOG_IS_ON
265*d9f75844SAndroid Build Coastguard Worker
266*d9f75844SAndroid Build Coastguard Worker if (thread) {
267*d9f75844SAndroid Build Coastguard Worker thread->EnsureIsCurrentTaskQueue();
268*d9f75844SAndroid Build Coastguard Worker } else {
269*d9f75844SAndroid Build Coastguard Worker Thread* current = CurrentThread();
270*d9f75844SAndroid Build Coastguard Worker if (current) {
271*d9f75844SAndroid Build Coastguard Worker // The current thread is being cleared, e.g. as a result of
272*d9f75844SAndroid Build Coastguard Worker // UnwrapCurrent() being called or when a thread is being stopped
273*d9f75844SAndroid Build Coastguard Worker // (see PreRun()). This signals that the Thread instance is being detached
274*d9f75844SAndroid Build Coastguard Worker // from the thread, which also means that TaskQueue::Current() must not
275*d9f75844SAndroid Build Coastguard Worker // return a pointer to the Thread instance.
276*d9f75844SAndroid Build Coastguard Worker current->ClearCurrentTaskQueue();
277*d9f75844SAndroid Build Coastguard Worker }
278*d9f75844SAndroid Build Coastguard Worker }
279*d9f75844SAndroid Build Coastguard Worker
280*d9f75844SAndroid Build Coastguard Worker SetCurrentThreadInternal(thread);
281*d9f75844SAndroid Build Coastguard Worker }
282*d9f75844SAndroid Build Coastguard Worker
ChangeCurrentThreadForTest(rtc::Thread * thread)283*d9f75844SAndroid Build Coastguard Worker void rtc::ThreadManager::ChangeCurrentThreadForTest(rtc::Thread* thread) {
284*d9f75844SAndroid Build Coastguard Worker SetCurrentThreadInternal(thread);
285*d9f75844SAndroid Build Coastguard Worker }
286*d9f75844SAndroid Build Coastguard Worker
WrapCurrentThread()287*d9f75844SAndroid Build Coastguard Worker Thread* ThreadManager::WrapCurrentThread() {
288*d9f75844SAndroid Build Coastguard Worker Thread* result = CurrentThread();
289*d9f75844SAndroid Build Coastguard Worker if (nullptr == result) {
290*d9f75844SAndroid Build Coastguard Worker result = new Thread(CreateDefaultSocketServer());
291*d9f75844SAndroid Build Coastguard Worker result->WrapCurrentWithThreadManager(this, true);
292*d9f75844SAndroid Build Coastguard Worker }
293*d9f75844SAndroid Build Coastguard Worker return result;
294*d9f75844SAndroid Build Coastguard Worker }
295*d9f75844SAndroid Build Coastguard Worker
UnwrapCurrentThread()296*d9f75844SAndroid Build Coastguard Worker void ThreadManager::UnwrapCurrentThread() {
297*d9f75844SAndroid Build Coastguard Worker Thread* t = CurrentThread();
298*d9f75844SAndroid Build Coastguard Worker if (t && !(t->IsOwned())) {
299*d9f75844SAndroid Build Coastguard Worker t->UnwrapCurrent();
300*d9f75844SAndroid Build Coastguard Worker delete t;
301*d9f75844SAndroid Build Coastguard Worker }
302*d9f75844SAndroid Build Coastguard Worker }
303*d9f75844SAndroid Build Coastguard Worker
ScopedDisallowBlockingCalls()304*d9f75844SAndroid Build Coastguard Worker Thread::ScopedDisallowBlockingCalls::ScopedDisallowBlockingCalls()
305*d9f75844SAndroid Build Coastguard Worker : thread_(Thread::Current()),
306*d9f75844SAndroid Build Coastguard Worker previous_state_(thread_->SetAllowBlockingCalls(false)) {}
307*d9f75844SAndroid Build Coastguard Worker
~ScopedDisallowBlockingCalls()308*d9f75844SAndroid Build Coastguard Worker Thread::ScopedDisallowBlockingCalls::~ScopedDisallowBlockingCalls() {
309*d9f75844SAndroid Build Coastguard Worker RTC_DCHECK(thread_->IsCurrent());
310*d9f75844SAndroid Build Coastguard Worker thread_->SetAllowBlockingCalls(previous_state_);
311*d9f75844SAndroid Build Coastguard Worker }
312*d9f75844SAndroid Build Coastguard Worker
313*d9f75844SAndroid Build Coastguard Worker #if RTC_DCHECK_IS_ON
ScopedCountBlockingCalls(std::function<void (uint32_t,uint32_t)> callback)314*d9f75844SAndroid Build Coastguard Worker Thread::ScopedCountBlockingCalls::ScopedCountBlockingCalls(
315*d9f75844SAndroid Build Coastguard Worker std::function<void(uint32_t, uint32_t)> callback)
316*d9f75844SAndroid Build Coastguard Worker : thread_(Thread::Current()),
317*d9f75844SAndroid Build Coastguard Worker base_blocking_call_count_(thread_->GetBlockingCallCount()),
318*d9f75844SAndroid Build Coastguard Worker base_could_be_blocking_call_count_(
319*d9f75844SAndroid Build Coastguard Worker thread_->GetCouldBeBlockingCallCount()),
320*d9f75844SAndroid Build Coastguard Worker result_callback_(std::move(callback)) {}
321*d9f75844SAndroid Build Coastguard Worker
~ScopedCountBlockingCalls()322*d9f75844SAndroid Build Coastguard Worker Thread::ScopedCountBlockingCalls::~ScopedCountBlockingCalls() {
323*d9f75844SAndroid Build Coastguard Worker if (GetTotalBlockedCallCount() >= min_blocking_calls_for_callback_) {
324*d9f75844SAndroid Build Coastguard Worker result_callback_(GetBlockingCallCount(), GetCouldBeBlockingCallCount());
325*d9f75844SAndroid Build Coastguard Worker }
326*d9f75844SAndroid Build Coastguard Worker }
327*d9f75844SAndroid Build Coastguard Worker
GetBlockingCallCount() const328*d9f75844SAndroid Build Coastguard Worker uint32_t Thread::ScopedCountBlockingCalls::GetBlockingCallCount() const {
329*d9f75844SAndroid Build Coastguard Worker return thread_->GetBlockingCallCount() - base_blocking_call_count_;
330*d9f75844SAndroid Build Coastguard Worker }
331*d9f75844SAndroid Build Coastguard Worker
GetCouldBeBlockingCallCount() const332*d9f75844SAndroid Build Coastguard Worker uint32_t Thread::ScopedCountBlockingCalls::GetCouldBeBlockingCallCount() const {
333*d9f75844SAndroid Build Coastguard Worker return thread_->GetCouldBeBlockingCallCount() -
334*d9f75844SAndroid Build Coastguard Worker base_could_be_blocking_call_count_;
335*d9f75844SAndroid Build Coastguard Worker }
336*d9f75844SAndroid Build Coastguard Worker
GetTotalBlockedCallCount() const337*d9f75844SAndroid Build Coastguard Worker uint32_t Thread::ScopedCountBlockingCalls::GetTotalBlockedCallCount() const {
338*d9f75844SAndroid Build Coastguard Worker return GetBlockingCallCount() + GetCouldBeBlockingCallCount();
339*d9f75844SAndroid Build Coastguard Worker }
340*d9f75844SAndroid Build Coastguard Worker #endif
341*d9f75844SAndroid Build Coastguard Worker
Thread(SocketServer * ss)342*d9f75844SAndroid Build Coastguard Worker Thread::Thread(SocketServer* ss) : Thread(ss, /*do_init=*/true) {}
343*d9f75844SAndroid Build Coastguard Worker
Thread(std::unique_ptr<SocketServer> ss)344*d9f75844SAndroid Build Coastguard Worker Thread::Thread(std::unique_ptr<SocketServer> ss)
345*d9f75844SAndroid Build Coastguard Worker : Thread(std::move(ss), /*do_init=*/true) {}
346*d9f75844SAndroid Build Coastguard Worker
Thread(SocketServer * ss,bool do_init)347*d9f75844SAndroid Build Coastguard Worker Thread::Thread(SocketServer* ss, bool do_init)
348*d9f75844SAndroid Build Coastguard Worker : delayed_next_num_(0),
349*d9f75844SAndroid Build Coastguard Worker fInitialized_(false),
350*d9f75844SAndroid Build Coastguard Worker fDestroyed_(false),
351*d9f75844SAndroid Build Coastguard Worker stop_(0),
352*d9f75844SAndroid Build Coastguard Worker ss_(ss) {
353*d9f75844SAndroid Build Coastguard Worker RTC_DCHECK(ss);
354*d9f75844SAndroid Build Coastguard Worker ss_->SetMessageQueue(this);
355*d9f75844SAndroid Build Coastguard Worker SetName("Thread", this); // default name
356*d9f75844SAndroid Build Coastguard Worker if (do_init) {
357*d9f75844SAndroid Build Coastguard Worker DoInit();
358*d9f75844SAndroid Build Coastguard Worker }
359*d9f75844SAndroid Build Coastguard Worker }
360*d9f75844SAndroid Build Coastguard Worker
Thread(std::unique_ptr<SocketServer> ss,bool do_init)361*d9f75844SAndroid Build Coastguard Worker Thread::Thread(std::unique_ptr<SocketServer> ss, bool do_init)
362*d9f75844SAndroid Build Coastguard Worker : Thread(ss.get(), do_init) {
363*d9f75844SAndroid Build Coastguard Worker own_ss_ = std::move(ss);
364*d9f75844SAndroid Build Coastguard Worker }
365*d9f75844SAndroid Build Coastguard Worker
~Thread()366*d9f75844SAndroid Build Coastguard Worker Thread::~Thread() {
367*d9f75844SAndroid Build Coastguard Worker Stop();
368*d9f75844SAndroid Build Coastguard Worker DoDestroy();
369*d9f75844SAndroid Build Coastguard Worker }
370*d9f75844SAndroid Build Coastguard Worker
DoInit()371*d9f75844SAndroid Build Coastguard Worker void Thread::DoInit() {
372*d9f75844SAndroid Build Coastguard Worker if (fInitialized_) {
373*d9f75844SAndroid Build Coastguard Worker return;
374*d9f75844SAndroid Build Coastguard Worker }
375*d9f75844SAndroid Build Coastguard Worker
376*d9f75844SAndroid Build Coastguard Worker fInitialized_ = true;
377*d9f75844SAndroid Build Coastguard Worker ThreadManager::Add(this);
378*d9f75844SAndroid Build Coastguard Worker }
379*d9f75844SAndroid Build Coastguard Worker
DoDestroy()380*d9f75844SAndroid Build Coastguard Worker void Thread::DoDestroy() {
381*d9f75844SAndroid Build Coastguard Worker if (fDestroyed_) {
382*d9f75844SAndroid Build Coastguard Worker return;
383*d9f75844SAndroid Build Coastguard Worker }
384*d9f75844SAndroid Build Coastguard Worker
385*d9f75844SAndroid Build Coastguard Worker fDestroyed_ = true;
386*d9f75844SAndroid Build Coastguard Worker // The signal is done from here to ensure
387*d9f75844SAndroid Build Coastguard Worker // that it always gets called when the queue
388*d9f75844SAndroid Build Coastguard Worker // is going away.
389*d9f75844SAndroid Build Coastguard Worker if (ss_) {
390*d9f75844SAndroid Build Coastguard Worker ss_->SetMessageQueue(nullptr);
391*d9f75844SAndroid Build Coastguard Worker }
392*d9f75844SAndroid Build Coastguard Worker ThreadManager::Remove(this);
393*d9f75844SAndroid Build Coastguard Worker // Clear.
394*d9f75844SAndroid Build Coastguard Worker messages_ = {};
395*d9f75844SAndroid Build Coastguard Worker delayed_messages_ = {};
396*d9f75844SAndroid Build Coastguard Worker }
397*d9f75844SAndroid Build Coastguard Worker
socketserver()398*d9f75844SAndroid Build Coastguard Worker SocketServer* Thread::socketserver() {
399*d9f75844SAndroid Build Coastguard Worker return ss_;
400*d9f75844SAndroid Build Coastguard Worker }
401*d9f75844SAndroid Build Coastguard Worker
WakeUpSocketServer()402*d9f75844SAndroid Build Coastguard Worker void Thread::WakeUpSocketServer() {
403*d9f75844SAndroid Build Coastguard Worker ss_->WakeUp();
404*d9f75844SAndroid Build Coastguard Worker }
405*d9f75844SAndroid Build Coastguard Worker
Quit()406*d9f75844SAndroid Build Coastguard Worker void Thread::Quit() {
407*d9f75844SAndroid Build Coastguard Worker stop_.store(1, std::memory_order_release);
408*d9f75844SAndroid Build Coastguard Worker WakeUpSocketServer();
409*d9f75844SAndroid Build Coastguard Worker }
410*d9f75844SAndroid Build Coastguard Worker
IsQuitting()411*d9f75844SAndroid Build Coastguard Worker bool Thread::IsQuitting() {
412*d9f75844SAndroid Build Coastguard Worker return stop_.load(std::memory_order_acquire) != 0;
413*d9f75844SAndroid Build Coastguard Worker }
414*d9f75844SAndroid Build Coastguard Worker
Restart()415*d9f75844SAndroid Build Coastguard Worker void Thread::Restart() {
416*d9f75844SAndroid Build Coastguard Worker stop_.store(0, std::memory_order_release);
417*d9f75844SAndroid Build Coastguard Worker }
418*d9f75844SAndroid Build Coastguard Worker
Get(int cmsWait)419*d9f75844SAndroid Build Coastguard Worker absl::AnyInvocable<void() &&> Thread::Get(int cmsWait) {
420*d9f75844SAndroid Build Coastguard Worker // Get w/wait + timer scan / dispatch + socket / event multiplexer dispatch
421*d9f75844SAndroid Build Coastguard Worker
422*d9f75844SAndroid Build Coastguard Worker int64_t cmsTotal = cmsWait;
423*d9f75844SAndroid Build Coastguard Worker int64_t cmsElapsed = 0;
424*d9f75844SAndroid Build Coastguard Worker int64_t msStart = TimeMillis();
425*d9f75844SAndroid Build Coastguard Worker int64_t msCurrent = msStart;
426*d9f75844SAndroid Build Coastguard Worker while (true) {
427*d9f75844SAndroid Build Coastguard Worker // Check for posted events
428*d9f75844SAndroid Build Coastguard Worker int64_t cmsDelayNext = kForever;
429*d9f75844SAndroid Build Coastguard Worker {
430*d9f75844SAndroid Build Coastguard Worker // All queue operations need to be locked, but nothing else in this loop
431*d9f75844SAndroid Build Coastguard Worker // can happen while holding the `mutex_`.
432*d9f75844SAndroid Build Coastguard Worker MutexLock lock(&mutex_);
433*d9f75844SAndroid Build Coastguard Worker // Check for delayed messages that have been triggered and calculate the
434*d9f75844SAndroid Build Coastguard Worker // next trigger time.
435*d9f75844SAndroid Build Coastguard Worker while (!delayed_messages_.empty()) {
436*d9f75844SAndroid Build Coastguard Worker if (msCurrent < delayed_messages_.top().run_time_ms) {
437*d9f75844SAndroid Build Coastguard Worker cmsDelayNext =
438*d9f75844SAndroid Build Coastguard Worker TimeDiff(delayed_messages_.top().run_time_ms, msCurrent);
439*d9f75844SAndroid Build Coastguard Worker break;
440*d9f75844SAndroid Build Coastguard Worker }
441*d9f75844SAndroid Build Coastguard Worker messages_.push(std::move(delayed_messages_.top().functor));
442*d9f75844SAndroid Build Coastguard Worker delayed_messages_.pop();
443*d9f75844SAndroid Build Coastguard Worker }
444*d9f75844SAndroid Build Coastguard Worker // Pull a message off the message queue, if available.
445*d9f75844SAndroid Build Coastguard Worker if (!messages_.empty()) {
446*d9f75844SAndroid Build Coastguard Worker absl::AnyInvocable<void()&&> task = std::move(messages_.front());
447*d9f75844SAndroid Build Coastguard Worker messages_.pop();
448*d9f75844SAndroid Build Coastguard Worker return task;
449*d9f75844SAndroid Build Coastguard Worker }
450*d9f75844SAndroid Build Coastguard Worker }
451*d9f75844SAndroid Build Coastguard Worker
452*d9f75844SAndroid Build Coastguard Worker if (IsQuitting())
453*d9f75844SAndroid Build Coastguard Worker break;
454*d9f75844SAndroid Build Coastguard Worker
455*d9f75844SAndroid Build Coastguard Worker // Which is shorter, the delay wait or the asked wait?
456*d9f75844SAndroid Build Coastguard Worker
457*d9f75844SAndroid Build Coastguard Worker int64_t cmsNext;
458*d9f75844SAndroid Build Coastguard Worker if (cmsWait == kForever) {
459*d9f75844SAndroid Build Coastguard Worker cmsNext = cmsDelayNext;
460*d9f75844SAndroid Build Coastguard Worker } else {
461*d9f75844SAndroid Build Coastguard Worker cmsNext = std::max<int64_t>(0, cmsTotal - cmsElapsed);
462*d9f75844SAndroid Build Coastguard Worker if ((cmsDelayNext != kForever) && (cmsDelayNext < cmsNext))
463*d9f75844SAndroid Build Coastguard Worker cmsNext = cmsDelayNext;
464*d9f75844SAndroid Build Coastguard Worker }
465*d9f75844SAndroid Build Coastguard Worker
466*d9f75844SAndroid Build Coastguard Worker {
467*d9f75844SAndroid Build Coastguard Worker // Wait and multiplex in the meantime
468*d9f75844SAndroid Build Coastguard Worker if (!ss_->Wait(cmsNext == kForever ? SocketServer::kForever
469*d9f75844SAndroid Build Coastguard Worker : webrtc::TimeDelta::Millis(cmsNext),
470*d9f75844SAndroid Build Coastguard Worker /*process_io=*/true))
471*d9f75844SAndroid Build Coastguard Worker return nullptr;
472*d9f75844SAndroid Build Coastguard Worker }
473*d9f75844SAndroid Build Coastguard Worker
474*d9f75844SAndroid Build Coastguard Worker // If the specified timeout expired, return
475*d9f75844SAndroid Build Coastguard Worker
476*d9f75844SAndroid Build Coastguard Worker msCurrent = TimeMillis();
477*d9f75844SAndroid Build Coastguard Worker cmsElapsed = TimeDiff(msCurrent, msStart);
478*d9f75844SAndroid Build Coastguard Worker if (cmsWait != kForever) {
479*d9f75844SAndroid Build Coastguard Worker if (cmsElapsed >= cmsWait)
480*d9f75844SAndroid Build Coastguard Worker return nullptr;
481*d9f75844SAndroid Build Coastguard Worker }
482*d9f75844SAndroid Build Coastguard Worker }
483*d9f75844SAndroid Build Coastguard Worker return nullptr;
484*d9f75844SAndroid Build Coastguard Worker }
485*d9f75844SAndroid Build Coastguard Worker
PostTask(absl::AnyInvocable<void ()&&> task)486*d9f75844SAndroid Build Coastguard Worker void Thread::PostTask(absl::AnyInvocable<void() &&> task) {
487*d9f75844SAndroid Build Coastguard Worker if (IsQuitting()) {
488*d9f75844SAndroid Build Coastguard Worker return;
489*d9f75844SAndroid Build Coastguard Worker }
490*d9f75844SAndroid Build Coastguard Worker
491*d9f75844SAndroid Build Coastguard Worker // Keep thread safe
492*d9f75844SAndroid Build Coastguard Worker // Add the message to the end of the queue
493*d9f75844SAndroid Build Coastguard Worker // Signal for the multiplexer to return
494*d9f75844SAndroid Build Coastguard Worker
495*d9f75844SAndroid Build Coastguard Worker {
496*d9f75844SAndroid Build Coastguard Worker MutexLock lock(&mutex_);
497*d9f75844SAndroid Build Coastguard Worker messages_.push(std::move(task));
498*d9f75844SAndroid Build Coastguard Worker }
499*d9f75844SAndroid Build Coastguard Worker WakeUpSocketServer();
500*d9f75844SAndroid Build Coastguard Worker }
501*d9f75844SAndroid Build Coastguard Worker
PostDelayedHighPrecisionTask(absl::AnyInvocable<void ()&&> task,webrtc::TimeDelta delay)502*d9f75844SAndroid Build Coastguard Worker void Thread::PostDelayedHighPrecisionTask(absl::AnyInvocable<void() &&> task,
503*d9f75844SAndroid Build Coastguard Worker webrtc::TimeDelta delay) {
504*d9f75844SAndroid Build Coastguard Worker if (IsQuitting()) {
505*d9f75844SAndroid Build Coastguard Worker return;
506*d9f75844SAndroid Build Coastguard Worker }
507*d9f75844SAndroid Build Coastguard Worker
508*d9f75844SAndroid Build Coastguard Worker // Keep thread safe
509*d9f75844SAndroid Build Coastguard Worker // Add to the priority queue. Gets sorted soonest first.
510*d9f75844SAndroid Build Coastguard Worker // Signal for the multiplexer to return.
511*d9f75844SAndroid Build Coastguard Worker
512*d9f75844SAndroid Build Coastguard Worker int64_t delay_ms = delay.RoundUpTo(webrtc::TimeDelta::Millis(1)).ms<int>();
513*d9f75844SAndroid Build Coastguard Worker int64_t run_time_ms = TimeAfter(delay_ms);
514*d9f75844SAndroid Build Coastguard Worker {
515*d9f75844SAndroid Build Coastguard Worker MutexLock lock(&mutex_);
516*d9f75844SAndroid Build Coastguard Worker delayed_messages_.push({.delay_ms = delay_ms,
517*d9f75844SAndroid Build Coastguard Worker .run_time_ms = run_time_ms,
518*d9f75844SAndroid Build Coastguard Worker .message_number = delayed_next_num_,
519*d9f75844SAndroid Build Coastguard Worker .functor = std::move(task)});
520*d9f75844SAndroid Build Coastguard Worker // If this message queue processes 1 message every millisecond for 50 days,
521*d9f75844SAndroid Build Coastguard Worker // we will wrap this number. Even then, only messages with identical times
522*d9f75844SAndroid Build Coastguard Worker // will be misordered, and then only briefly. This is probably ok.
523*d9f75844SAndroid Build Coastguard Worker ++delayed_next_num_;
524*d9f75844SAndroid Build Coastguard Worker RTC_DCHECK_NE(0, delayed_next_num_);
525*d9f75844SAndroid Build Coastguard Worker }
526*d9f75844SAndroid Build Coastguard Worker WakeUpSocketServer();
527*d9f75844SAndroid Build Coastguard Worker }
528*d9f75844SAndroid Build Coastguard Worker
GetDelay()529*d9f75844SAndroid Build Coastguard Worker int Thread::GetDelay() {
530*d9f75844SAndroid Build Coastguard Worker MutexLock lock(&mutex_);
531*d9f75844SAndroid Build Coastguard Worker
532*d9f75844SAndroid Build Coastguard Worker if (!messages_.empty())
533*d9f75844SAndroid Build Coastguard Worker return 0;
534*d9f75844SAndroid Build Coastguard Worker
535*d9f75844SAndroid Build Coastguard Worker if (!delayed_messages_.empty()) {
536*d9f75844SAndroid Build Coastguard Worker int delay = TimeUntil(delayed_messages_.top().run_time_ms);
537*d9f75844SAndroid Build Coastguard Worker if (delay < 0)
538*d9f75844SAndroid Build Coastguard Worker delay = 0;
539*d9f75844SAndroid Build Coastguard Worker return delay;
540*d9f75844SAndroid Build Coastguard Worker }
541*d9f75844SAndroid Build Coastguard Worker
542*d9f75844SAndroid Build Coastguard Worker return kForever;
543*d9f75844SAndroid Build Coastguard Worker }
544*d9f75844SAndroid Build Coastguard Worker
Dispatch(absl::AnyInvocable<void ()&&> task)545*d9f75844SAndroid Build Coastguard Worker void Thread::Dispatch(absl::AnyInvocable<void() &&> task) {
546*d9f75844SAndroid Build Coastguard Worker TRACE_EVENT0("webrtc", "Thread::Dispatch");
547*d9f75844SAndroid Build Coastguard Worker RTC_DCHECK_RUN_ON(this);
548*d9f75844SAndroid Build Coastguard Worker int64_t start_time = TimeMillis();
549*d9f75844SAndroid Build Coastguard Worker std::move(task)();
550*d9f75844SAndroid Build Coastguard Worker int64_t end_time = TimeMillis();
551*d9f75844SAndroid Build Coastguard Worker int64_t diff = TimeDiff(end_time, start_time);
552*d9f75844SAndroid Build Coastguard Worker if (diff >= dispatch_warning_ms_) {
553*d9f75844SAndroid Build Coastguard Worker RTC_LOG(LS_INFO) << "Message to " << name() << " took " << diff
554*d9f75844SAndroid Build Coastguard Worker << "ms to dispatch.";
555*d9f75844SAndroid Build Coastguard Worker // To avoid log spew, move the warning limit to only give warning
556*d9f75844SAndroid Build Coastguard Worker // for delays that are larger than the one observed.
557*d9f75844SAndroid Build Coastguard Worker dispatch_warning_ms_ = diff + 1;
558*d9f75844SAndroid Build Coastguard Worker }
559*d9f75844SAndroid Build Coastguard Worker }
560*d9f75844SAndroid Build Coastguard Worker
IsCurrent() const561*d9f75844SAndroid Build Coastguard Worker bool Thread::IsCurrent() const {
562*d9f75844SAndroid Build Coastguard Worker return ThreadManager::Instance()->CurrentThread() == this;
563*d9f75844SAndroid Build Coastguard Worker }
564*d9f75844SAndroid Build Coastguard Worker
CreateWithSocketServer()565*d9f75844SAndroid Build Coastguard Worker std::unique_ptr<Thread> Thread::CreateWithSocketServer() {
566*d9f75844SAndroid Build Coastguard Worker return std::unique_ptr<Thread>(new Thread(CreateDefaultSocketServer()));
567*d9f75844SAndroid Build Coastguard Worker }
568*d9f75844SAndroid Build Coastguard Worker
Create()569*d9f75844SAndroid Build Coastguard Worker std::unique_ptr<Thread> Thread::Create() {
570*d9f75844SAndroid Build Coastguard Worker return std::unique_ptr<Thread>(
571*d9f75844SAndroid Build Coastguard Worker new Thread(std::unique_ptr<SocketServer>(new NullSocketServer())));
572*d9f75844SAndroid Build Coastguard Worker }
573*d9f75844SAndroid Build Coastguard Worker
SleepMs(int milliseconds)574*d9f75844SAndroid Build Coastguard Worker bool Thread::SleepMs(int milliseconds) {
575*d9f75844SAndroid Build Coastguard Worker AssertBlockingIsAllowedOnCurrentThread();
576*d9f75844SAndroid Build Coastguard Worker
577*d9f75844SAndroid Build Coastguard Worker #if defined(WEBRTC_WIN)
578*d9f75844SAndroid Build Coastguard Worker ::Sleep(milliseconds);
579*d9f75844SAndroid Build Coastguard Worker return true;
580*d9f75844SAndroid Build Coastguard Worker #else
581*d9f75844SAndroid Build Coastguard Worker // POSIX has both a usleep() and a nanosleep(), but the former is deprecated,
582*d9f75844SAndroid Build Coastguard Worker // so we use nanosleep() even though it has greater precision than necessary.
583*d9f75844SAndroid Build Coastguard Worker struct timespec ts;
584*d9f75844SAndroid Build Coastguard Worker ts.tv_sec = milliseconds / 1000;
585*d9f75844SAndroid Build Coastguard Worker ts.tv_nsec = (milliseconds % 1000) * 1000000;
586*d9f75844SAndroid Build Coastguard Worker int ret = nanosleep(&ts, nullptr);
587*d9f75844SAndroid Build Coastguard Worker if (ret != 0) {
588*d9f75844SAndroid Build Coastguard Worker RTC_LOG_ERR(LS_WARNING) << "nanosleep() returning early";
589*d9f75844SAndroid Build Coastguard Worker return false;
590*d9f75844SAndroid Build Coastguard Worker }
591*d9f75844SAndroid Build Coastguard Worker return true;
592*d9f75844SAndroid Build Coastguard Worker #endif
593*d9f75844SAndroid Build Coastguard Worker }
594*d9f75844SAndroid Build Coastguard Worker
SetName(absl::string_view name,const void * obj)595*d9f75844SAndroid Build Coastguard Worker bool Thread::SetName(absl::string_view name, const void* obj) {
596*d9f75844SAndroid Build Coastguard Worker RTC_DCHECK(!IsRunning());
597*d9f75844SAndroid Build Coastguard Worker
598*d9f75844SAndroid Build Coastguard Worker name_ = std::string(name);
599*d9f75844SAndroid Build Coastguard Worker if (obj) {
600*d9f75844SAndroid Build Coastguard Worker // The %p specifier typically produce at most 16 hex digits, possibly with a
601*d9f75844SAndroid Build Coastguard Worker // 0x prefix. But format is implementation defined, so add some margin.
602*d9f75844SAndroid Build Coastguard Worker char buf[30];
603*d9f75844SAndroid Build Coastguard Worker snprintf(buf, sizeof(buf), " 0x%p", obj);
604*d9f75844SAndroid Build Coastguard Worker name_ += buf;
605*d9f75844SAndroid Build Coastguard Worker }
606*d9f75844SAndroid Build Coastguard Worker return true;
607*d9f75844SAndroid Build Coastguard Worker }
608*d9f75844SAndroid Build Coastguard Worker
SetDispatchWarningMs(int deadline)609*d9f75844SAndroid Build Coastguard Worker void Thread::SetDispatchWarningMs(int deadline) {
610*d9f75844SAndroid Build Coastguard Worker if (!IsCurrent()) {
611*d9f75844SAndroid Build Coastguard Worker PostTask([this, deadline]() { SetDispatchWarningMs(deadline); });
612*d9f75844SAndroid Build Coastguard Worker return;
613*d9f75844SAndroid Build Coastguard Worker }
614*d9f75844SAndroid Build Coastguard Worker RTC_DCHECK_RUN_ON(this);
615*d9f75844SAndroid Build Coastguard Worker dispatch_warning_ms_ = deadline;
616*d9f75844SAndroid Build Coastguard Worker }
617*d9f75844SAndroid Build Coastguard Worker
Start()618*d9f75844SAndroid Build Coastguard Worker bool Thread::Start() {
619*d9f75844SAndroid Build Coastguard Worker RTC_DCHECK(!IsRunning());
620*d9f75844SAndroid Build Coastguard Worker
621*d9f75844SAndroid Build Coastguard Worker if (IsRunning())
622*d9f75844SAndroid Build Coastguard Worker return false;
623*d9f75844SAndroid Build Coastguard Worker
624*d9f75844SAndroid Build Coastguard Worker Restart(); // reset IsQuitting() if the thread is being restarted
625*d9f75844SAndroid Build Coastguard Worker
626*d9f75844SAndroid Build Coastguard Worker // Make sure that ThreadManager is created on the main thread before
627*d9f75844SAndroid Build Coastguard Worker // we start a new thread.
628*d9f75844SAndroid Build Coastguard Worker ThreadManager::Instance();
629*d9f75844SAndroid Build Coastguard Worker
630*d9f75844SAndroid Build Coastguard Worker owned_ = true;
631*d9f75844SAndroid Build Coastguard Worker
632*d9f75844SAndroid Build Coastguard Worker #if defined(WEBRTC_WIN)
633*d9f75844SAndroid Build Coastguard Worker thread_ = CreateThread(nullptr, 0, PreRun, this, 0, &thread_id_);
634*d9f75844SAndroid Build Coastguard Worker if (!thread_) {
635*d9f75844SAndroid Build Coastguard Worker return false;
636*d9f75844SAndroid Build Coastguard Worker }
637*d9f75844SAndroid Build Coastguard Worker #elif defined(WEBRTC_POSIX)
638*d9f75844SAndroid Build Coastguard Worker pthread_attr_t attr;
639*d9f75844SAndroid Build Coastguard Worker pthread_attr_init(&attr);
640*d9f75844SAndroid Build Coastguard Worker
641*d9f75844SAndroid Build Coastguard Worker int error_code = pthread_create(&thread_, &attr, PreRun, this);
642*d9f75844SAndroid Build Coastguard Worker if (0 != error_code) {
643*d9f75844SAndroid Build Coastguard Worker RTC_LOG(LS_ERROR) << "Unable to create pthread, error " << error_code;
644*d9f75844SAndroid Build Coastguard Worker thread_ = 0;
645*d9f75844SAndroid Build Coastguard Worker return false;
646*d9f75844SAndroid Build Coastguard Worker }
647*d9f75844SAndroid Build Coastguard Worker RTC_DCHECK(thread_);
648*d9f75844SAndroid Build Coastguard Worker #endif
649*d9f75844SAndroid Build Coastguard Worker return true;
650*d9f75844SAndroid Build Coastguard Worker }
651*d9f75844SAndroid Build Coastguard Worker
WrapCurrent()652*d9f75844SAndroid Build Coastguard Worker bool Thread::WrapCurrent() {
653*d9f75844SAndroid Build Coastguard Worker return WrapCurrentWithThreadManager(ThreadManager::Instance(), true);
654*d9f75844SAndroid Build Coastguard Worker }
655*d9f75844SAndroid Build Coastguard Worker
UnwrapCurrent()656*d9f75844SAndroid Build Coastguard Worker void Thread::UnwrapCurrent() {
657*d9f75844SAndroid Build Coastguard Worker // Clears the platform-specific thread-specific storage.
658*d9f75844SAndroid Build Coastguard Worker ThreadManager::Instance()->SetCurrentThread(nullptr);
659*d9f75844SAndroid Build Coastguard Worker #if defined(WEBRTC_WIN)
660*d9f75844SAndroid Build Coastguard Worker if (thread_ != nullptr) {
661*d9f75844SAndroid Build Coastguard Worker if (!CloseHandle(thread_)) {
662*d9f75844SAndroid Build Coastguard Worker RTC_LOG_GLE(LS_ERROR)
663*d9f75844SAndroid Build Coastguard Worker << "When unwrapping thread, failed to close handle.";
664*d9f75844SAndroid Build Coastguard Worker }
665*d9f75844SAndroid Build Coastguard Worker thread_ = nullptr;
666*d9f75844SAndroid Build Coastguard Worker thread_id_ = 0;
667*d9f75844SAndroid Build Coastguard Worker }
668*d9f75844SAndroid Build Coastguard Worker #elif defined(WEBRTC_POSIX)
669*d9f75844SAndroid Build Coastguard Worker thread_ = 0;
670*d9f75844SAndroid Build Coastguard Worker #endif
671*d9f75844SAndroid Build Coastguard Worker }
672*d9f75844SAndroid Build Coastguard Worker
SafeWrapCurrent()673*d9f75844SAndroid Build Coastguard Worker void Thread::SafeWrapCurrent() {
674*d9f75844SAndroid Build Coastguard Worker WrapCurrentWithThreadManager(ThreadManager::Instance(), false);
675*d9f75844SAndroid Build Coastguard Worker }
676*d9f75844SAndroid Build Coastguard Worker
Join()677*d9f75844SAndroid Build Coastguard Worker void Thread::Join() {
678*d9f75844SAndroid Build Coastguard Worker if (!IsRunning())
679*d9f75844SAndroid Build Coastguard Worker return;
680*d9f75844SAndroid Build Coastguard Worker
681*d9f75844SAndroid Build Coastguard Worker RTC_DCHECK(!IsCurrent());
682*d9f75844SAndroid Build Coastguard Worker if (Current() && !Current()->blocking_calls_allowed_) {
683*d9f75844SAndroid Build Coastguard Worker RTC_LOG(LS_WARNING) << "Waiting for the thread to join, "
684*d9f75844SAndroid Build Coastguard Worker "but blocking calls have been disallowed";
685*d9f75844SAndroid Build Coastguard Worker }
686*d9f75844SAndroid Build Coastguard Worker
687*d9f75844SAndroid Build Coastguard Worker #if defined(WEBRTC_WIN)
688*d9f75844SAndroid Build Coastguard Worker RTC_DCHECK(thread_ != nullptr);
689*d9f75844SAndroid Build Coastguard Worker WaitForSingleObject(thread_, INFINITE);
690*d9f75844SAndroid Build Coastguard Worker CloseHandle(thread_);
691*d9f75844SAndroid Build Coastguard Worker thread_ = nullptr;
692*d9f75844SAndroid Build Coastguard Worker thread_id_ = 0;
693*d9f75844SAndroid Build Coastguard Worker #elif defined(WEBRTC_POSIX)
694*d9f75844SAndroid Build Coastguard Worker pthread_join(thread_, nullptr);
695*d9f75844SAndroid Build Coastguard Worker thread_ = 0;
696*d9f75844SAndroid Build Coastguard Worker #endif
697*d9f75844SAndroid Build Coastguard Worker }
698*d9f75844SAndroid Build Coastguard Worker
SetAllowBlockingCalls(bool allow)699*d9f75844SAndroid Build Coastguard Worker bool Thread::SetAllowBlockingCalls(bool allow) {
700*d9f75844SAndroid Build Coastguard Worker RTC_DCHECK(IsCurrent());
701*d9f75844SAndroid Build Coastguard Worker bool previous = blocking_calls_allowed_;
702*d9f75844SAndroid Build Coastguard Worker blocking_calls_allowed_ = allow;
703*d9f75844SAndroid Build Coastguard Worker return previous;
704*d9f75844SAndroid Build Coastguard Worker }
705*d9f75844SAndroid Build Coastguard Worker
706*d9f75844SAndroid Build Coastguard Worker // static
AssertBlockingIsAllowedOnCurrentThread()707*d9f75844SAndroid Build Coastguard Worker void Thread::AssertBlockingIsAllowedOnCurrentThread() {
708*d9f75844SAndroid Build Coastguard Worker #if !defined(NDEBUG)
709*d9f75844SAndroid Build Coastguard Worker Thread* current = Thread::Current();
710*d9f75844SAndroid Build Coastguard Worker RTC_DCHECK(!current || current->blocking_calls_allowed_);
711*d9f75844SAndroid Build Coastguard Worker #endif
712*d9f75844SAndroid Build Coastguard Worker }
713*d9f75844SAndroid Build Coastguard Worker
714*d9f75844SAndroid Build Coastguard Worker // static
715*d9f75844SAndroid Build Coastguard Worker #if defined(WEBRTC_WIN)
PreRun(LPVOID pv)716*d9f75844SAndroid Build Coastguard Worker DWORD WINAPI Thread::PreRun(LPVOID pv) {
717*d9f75844SAndroid Build Coastguard Worker #else
718*d9f75844SAndroid Build Coastguard Worker void* Thread::PreRun(void* pv) {
719*d9f75844SAndroid Build Coastguard Worker #endif
720*d9f75844SAndroid Build Coastguard Worker Thread* thread = static_cast<Thread*>(pv);
721*d9f75844SAndroid Build Coastguard Worker ThreadManager::Instance()->SetCurrentThread(thread);
722*d9f75844SAndroid Build Coastguard Worker rtc::SetCurrentThreadName(thread->name_.c_str());
723*d9f75844SAndroid Build Coastguard Worker #if defined(WEBRTC_MAC)
724*d9f75844SAndroid Build Coastguard Worker ScopedAutoReleasePool pool;
725*d9f75844SAndroid Build Coastguard Worker #endif
726*d9f75844SAndroid Build Coastguard Worker thread->Run();
727*d9f75844SAndroid Build Coastguard Worker
728*d9f75844SAndroid Build Coastguard Worker ThreadManager::Instance()->SetCurrentThread(nullptr);
729*d9f75844SAndroid Build Coastguard Worker #ifdef WEBRTC_WIN
730*d9f75844SAndroid Build Coastguard Worker return 0;
731*d9f75844SAndroid Build Coastguard Worker #else
732*d9f75844SAndroid Build Coastguard Worker return nullptr;
733*d9f75844SAndroid Build Coastguard Worker #endif
734*d9f75844SAndroid Build Coastguard Worker } // namespace rtc
735*d9f75844SAndroid Build Coastguard Worker
736*d9f75844SAndroid Build Coastguard Worker void Thread::Run() {
737*d9f75844SAndroid Build Coastguard Worker ProcessMessages(kForever);
738*d9f75844SAndroid Build Coastguard Worker }
739*d9f75844SAndroid Build Coastguard Worker
740*d9f75844SAndroid Build Coastguard Worker bool Thread::IsOwned() {
741*d9f75844SAndroid Build Coastguard Worker RTC_DCHECK(IsRunning());
742*d9f75844SAndroid Build Coastguard Worker return owned_;
743*d9f75844SAndroid Build Coastguard Worker }
744*d9f75844SAndroid Build Coastguard Worker
745*d9f75844SAndroid Build Coastguard Worker void Thread::Stop() {
746*d9f75844SAndroid Build Coastguard Worker Thread::Quit();
747*d9f75844SAndroid Build Coastguard Worker Join();
748*d9f75844SAndroid Build Coastguard Worker }
749*d9f75844SAndroid Build Coastguard Worker
750*d9f75844SAndroid Build Coastguard Worker void Thread::BlockingCall(rtc::FunctionView<void()> functor) {
751*d9f75844SAndroid Build Coastguard Worker TRACE_EVENT0("webrtc", "Thread::BlockingCall");
752*d9f75844SAndroid Build Coastguard Worker
753*d9f75844SAndroid Build Coastguard Worker RTC_DCHECK(!IsQuitting());
754*d9f75844SAndroid Build Coastguard Worker if (IsQuitting())
755*d9f75844SAndroid Build Coastguard Worker return;
756*d9f75844SAndroid Build Coastguard Worker
757*d9f75844SAndroid Build Coastguard Worker if (IsCurrent()) {
758*d9f75844SAndroid Build Coastguard Worker #if RTC_DCHECK_IS_ON
759*d9f75844SAndroid Build Coastguard Worker RTC_DCHECK(this->IsInvokeToThreadAllowed(this));
760*d9f75844SAndroid Build Coastguard Worker RTC_DCHECK_RUN_ON(this);
761*d9f75844SAndroid Build Coastguard Worker could_be_blocking_call_count_++;
762*d9f75844SAndroid Build Coastguard Worker #endif
763*d9f75844SAndroid Build Coastguard Worker functor();
764*d9f75844SAndroid Build Coastguard Worker return;
765*d9f75844SAndroid Build Coastguard Worker }
766*d9f75844SAndroid Build Coastguard Worker
767*d9f75844SAndroid Build Coastguard Worker AssertBlockingIsAllowedOnCurrentThread();
768*d9f75844SAndroid Build Coastguard Worker
769*d9f75844SAndroid Build Coastguard Worker Thread* current_thread = Thread::Current();
770*d9f75844SAndroid Build Coastguard Worker
771*d9f75844SAndroid Build Coastguard Worker #if RTC_DCHECK_IS_ON
772*d9f75844SAndroid Build Coastguard Worker if (current_thread) {
773*d9f75844SAndroid Build Coastguard Worker RTC_DCHECK_RUN_ON(current_thread);
774*d9f75844SAndroid Build Coastguard Worker current_thread->blocking_call_count_++;
775*d9f75844SAndroid Build Coastguard Worker RTC_DCHECK(current_thread->IsInvokeToThreadAllowed(this));
776*d9f75844SAndroid Build Coastguard Worker ThreadManager::Instance()->RegisterSendAndCheckForCycles(current_thread,
777*d9f75844SAndroid Build Coastguard Worker this);
778*d9f75844SAndroid Build Coastguard Worker }
779*d9f75844SAndroid Build Coastguard Worker #endif
780*d9f75844SAndroid Build Coastguard Worker
781*d9f75844SAndroid Build Coastguard Worker // Perhaps down the line we can get rid of this workaround and always require
782*d9f75844SAndroid Build Coastguard Worker // current_thread to be valid when BlockingCall() is called.
783*d9f75844SAndroid Build Coastguard Worker std::unique_ptr<rtc::Event> done_event;
784*d9f75844SAndroid Build Coastguard Worker if (!current_thread)
785*d9f75844SAndroid Build Coastguard Worker done_event.reset(new rtc::Event());
786*d9f75844SAndroid Build Coastguard Worker
787*d9f75844SAndroid Build Coastguard Worker bool ready = false;
788*d9f75844SAndroid Build Coastguard Worker absl::Cleanup cleanup = [this, &ready, current_thread,
789*d9f75844SAndroid Build Coastguard Worker done = done_event.get()] {
790*d9f75844SAndroid Build Coastguard Worker if (current_thread) {
791*d9f75844SAndroid Build Coastguard Worker {
792*d9f75844SAndroid Build Coastguard Worker MutexLock lock(&mutex_);
793*d9f75844SAndroid Build Coastguard Worker ready = true;
794*d9f75844SAndroid Build Coastguard Worker }
795*d9f75844SAndroid Build Coastguard Worker current_thread->socketserver()->WakeUp();
796*d9f75844SAndroid Build Coastguard Worker } else {
797*d9f75844SAndroid Build Coastguard Worker done->Set();
798*d9f75844SAndroid Build Coastguard Worker }
799*d9f75844SAndroid Build Coastguard Worker };
800*d9f75844SAndroid Build Coastguard Worker PostTask([functor, cleanup = std::move(cleanup)] { functor(); });
801*d9f75844SAndroid Build Coastguard Worker if (current_thread) {
802*d9f75844SAndroid Build Coastguard Worker bool waited = false;
803*d9f75844SAndroid Build Coastguard Worker mutex_.Lock();
804*d9f75844SAndroid Build Coastguard Worker while (!ready) {
805*d9f75844SAndroid Build Coastguard Worker mutex_.Unlock();
806*d9f75844SAndroid Build Coastguard Worker current_thread->socketserver()->Wait(SocketServer::kForever, false);
807*d9f75844SAndroid Build Coastguard Worker waited = true;
808*d9f75844SAndroid Build Coastguard Worker mutex_.Lock();
809*d9f75844SAndroid Build Coastguard Worker }
810*d9f75844SAndroid Build Coastguard Worker mutex_.Unlock();
811*d9f75844SAndroid Build Coastguard Worker
812*d9f75844SAndroid Build Coastguard Worker // Our Wait loop above may have consumed some WakeUp events for this
813*d9f75844SAndroid Build Coastguard Worker // Thread, that weren't relevant to this Send. Losing these WakeUps can
814*d9f75844SAndroid Build Coastguard Worker // cause problems for some SocketServers.
815*d9f75844SAndroid Build Coastguard Worker //
816*d9f75844SAndroid Build Coastguard Worker // Concrete example:
817*d9f75844SAndroid Build Coastguard Worker // Win32SocketServer on thread A calls Send on thread B. While processing
818*d9f75844SAndroid Build Coastguard Worker // the message, thread B Posts a message to A. We consume the wakeup for
819*d9f75844SAndroid Build Coastguard Worker // that Post while waiting for the Send to complete, which means that when
820*d9f75844SAndroid Build Coastguard Worker // we exit this loop, we need to issue another WakeUp, or else the Posted
821*d9f75844SAndroid Build Coastguard Worker // message won't be processed in a timely manner.
822*d9f75844SAndroid Build Coastguard Worker
823*d9f75844SAndroid Build Coastguard Worker if (waited) {
824*d9f75844SAndroid Build Coastguard Worker current_thread->socketserver()->WakeUp();
825*d9f75844SAndroid Build Coastguard Worker }
826*d9f75844SAndroid Build Coastguard Worker } else {
827*d9f75844SAndroid Build Coastguard Worker done_event->Wait(rtc::Event::kForever);
828*d9f75844SAndroid Build Coastguard Worker }
829*d9f75844SAndroid Build Coastguard Worker }
830*d9f75844SAndroid Build Coastguard Worker
831*d9f75844SAndroid Build Coastguard Worker // Called by the ThreadManager when being set as the current thread.
832*d9f75844SAndroid Build Coastguard Worker void Thread::EnsureIsCurrentTaskQueue() {
833*d9f75844SAndroid Build Coastguard Worker task_queue_registration_ =
834*d9f75844SAndroid Build Coastguard Worker std::make_unique<TaskQueueBase::CurrentTaskQueueSetter>(this);
835*d9f75844SAndroid Build Coastguard Worker }
836*d9f75844SAndroid Build Coastguard Worker
837*d9f75844SAndroid Build Coastguard Worker // Called by the ThreadManager when being set as the current thread.
838*d9f75844SAndroid Build Coastguard Worker void Thread::ClearCurrentTaskQueue() {
839*d9f75844SAndroid Build Coastguard Worker task_queue_registration_.reset();
840*d9f75844SAndroid Build Coastguard Worker }
841*d9f75844SAndroid Build Coastguard Worker
842*d9f75844SAndroid Build Coastguard Worker void Thread::AllowInvokesToThread(Thread* thread) {
843*d9f75844SAndroid Build Coastguard Worker #if (!defined(NDEBUG) || RTC_DCHECK_IS_ON)
844*d9f75844SAndroid Build Coastguard Worker if (!IsCurrent()) {
845*d9f75844SAndroid Build Coastguard Worker PostTask([thread, this]() { AllowInvokesToThread(thread); });
846*d9f75844SAndroid Build Coastguard Worker return;
847*d9f75844SAndroid Build Coastguard Worker }
848*d9f75844SAndroid Build Coastguard Worker RTC_DCHECK_RUN_ON(this);
849*d9f75844SAndroid Build Coastguard Worker allowed_threads_.push_back(thread);
850*d9f75844SAndroid Build Coastguard Worker invoke_policy_enabled_ = true;
851*d9f75844SAndroid Build Coastguard Worker #endif
852*d9f75844SAndroid Build Coastguard Worker }
853*d9f75844SAndroid Build Coastguard Worker
854*d9f75844SAndroid Build Coastguard Worker void Thread::DisallowAllInvokes() {
855*d9f75844SAndroid Build Coastguard Worker #if (!defined(NDEBUG) || RTC_DCHECK_IS_ON)
856*d9f75844SAndroid Build Coastguard Worker if (!IsCurrent()) {
857*d9f75844SAndroid Build Coastguard Worker PostTask([this]() { DisallowAllInvokes(); });
858*d9f75844SAndroid Build Coastguard Worker return;
859*d9f75844SAndroid Build Coastguard Worker }
860*d9f75844SAndroid Build Coastguard Worker RTC_DCHECK_RUN_ON(this);
861*d9f75844SAndroid Build Coastguard Worker allowed_threads_.clear();
862*d9f75844SAndroid Build Coastguard Worker invoke_policy_enabled_ = true;
863*d9f75844SAndroid Build Coastguard Worker #endif
864*d9f75844SAndroid Build Coastguard Worker }
865*d9f75844SAndroid Build Coastguard Worker
#if RTC_DCHECK_IS_ON
// Returns the counter that BlockingCall() increments on the calling thread
// each time it makes a blocking call to another thread. Must be called on
// this thread.
uint32_t Thread::GetBlockingCallCount() const {
  RTC_DCHECK_RUN_ON(this);
  return blocking_call_count_;
}

// Returns the counter that BlockingCall() increments when it is invoked on
// the target thread itself and runs the functor directly. Must be called on
// this thread.
uint32_t Thread::GetCouldBeBlockingCallCount() const {
  RTC_DCHECK_RUN_ON(this);
  return could_be_blocking_call_count_;
}
#endif
876*d9f75844SAndroid Build Coastguard Worker
877*d9f75844SAndroid Build Coastguard Worker // Returns true if no policies added or if there is at least one policy
878*d9f75844SAndroid Build Coastguard Worker // that permits invocation to `target` thread.
879*d9f75844SAndroid Build Coastguard Worker bool Thread::IsInvokeToThreadAllowed(rtc::Thread* target) {
880*d9f75844SAndroid Build Coastguard Worker #if (!defined(NDEBUG) || RTC_DCHECK_IS_ON)
881*d9f75844SAndroid Build Coastguard Worker RTC_DCHECK_RUN_ON(this);
882*d9f75844SAndroid Build Coastguard Worker if (!invoke_policy_enabled_) {
883*d9f75844SAndroid Build Coastguard Worker return true;
884*d9f75844SAndroid Build Coastguard Worker }
885*d9f75844SAndroid Build Coastguard Worker for (const auto* thread : allowed_threads_) {
886*d9f75844SAndroid Build Coastguard Worker if (thread == target) {
887*d9f75844SAndroid Build Coastguard Worker return true;
888*d9f75844SAndroid Build Coastguard Worker }
889*d9f75844SAndroid Build Coastguard Worker }
890*d9f75844SAndroid Build Coastguard Worker return false;
891*d9f75844SAndroid Build Coastguard Worker #else
892*d9f75844SAndroid Build Coastguard Worker return true;
893*d9f75844SAndroid Build Coastguard Worker #endif
894*d9f75844SAndroid Build Coastguard Worker }
895*d9f75844SAndroid Build Coastguard Worker
896*d9f75844SAndroid Build Coastguard Worker void Thread::Delete() {
897*d9f75844SAndroid Build Coastguard Worker Stop();
898*d9f75844SAndroid Build Coastguard Worker delete this;
899*d9f75844SAndroid Build Coastguard Worker }
900*d9f75844SAndroid Build Coastguard Worker
901*d9f75844SAndroid Build Coastguard Worker void Thread::PostDelayedTask(absl::AnyInvocable<void() &&> task,
902*d9f75844SAndroid Build Coastguard Worker webrtc::TimeDelta delay) {
903*d9f75844SAndroid Build Coastguard Worker // This implementation does not support low precision yet.
904*d9f75844SAndroid Build Coastguard Worker PostDelayedHighPrecisionTask(std::move(task), delay);
905*d9f75844SAndroid Build Coastguard Worker }
906*d9f75844SAndroid Build Coastguard Worker
907*d9f75844SAndroid Build Coastguard Worker bool Thread::IsProcessingMessagesForTesting() {
908*d9f75844SAndroid Build Coastguard Worker return (owned_ || IsCurrent()) && !IsQuitting();
909*d9f75844SAndroid Build Coastguard Worker }
910*d9f75844SAndroid Build Coastguard Worker
911*d9f75844SAndroid Build Coastguard Worker bool Thread::ProcessMessages(int cmsLoop) {
912*d9f75844SAndroid Build Coastguard Worker // Using ProcessMessages with a custom clock for testing and a time greater
913*d9f75844SAndroid Build Coastguard Worker // than 0 doesn't work, since it's not guaranteed to advance the custom
914*d9f75844SAndroid Build Coastguard Worker // clock's time, and may get stuck in an infinite loop.
915*d9f75844SAndroid Build Coastguard Worker RTC_DCHECK(GetClockForTesting() == nullptr || cmsLoop == 0 ||
916*d9f75844SAndroid Build Coastguard Worker cmsLoop == kForever);
917*d9f75844SAndroid Build Coastguard Worker int64_t msEnd = (kForever == cmsLoop) ? 0 : TimeAfter(cmsLoop);
918*d9f75844SAndroid Build Coastguard Worker int cmsNext = cmsLoop;
919*d9f75844SAndroid Build Coastguard Worker
920*d9f75844SAndroid Build Coastguard Worker while (true) {
921*d9f75844SAndroid Build Coastguard Worker #if defined(WEBRTC_MAC)
922*d9f75844SAndroid Build Coastguard Worker ScopedAutoReleasePool pool;
923*d9f75844SAndroid Build Coastguard Worker #endif
924*d9f75844SAndroid Build Coastguard Worker absl::AnyInvocable<void()&&> task = Get(cmsNext);
925*d9f75844SAndroid Build Coastguard Worker if (!task)
926*d9f75844SAndroid Build Coastguard Worker return !IsQuitting();
927*d9f75844SAndroid Build Coastguard Worker Dispatch(std::move(task));
928*d9f75844SAndroid Build Coastguard Worker
929*d9f75844SAndroid Build Coastguard Worker if (cmsLoop != kForever) {
930*d9f75844SAndroid Build Coastguard Worker cmsNext = static_cast<int>(TimeUntil(msEnd));
931*d9f75844SAndroid Build Coastguard Worker if (cmsNext < 0)
932*d9f75844SAndroid Build Coastguard Worker return true;
933*d9f75844SAndroid Build Coastguard Worker }
934*d9f75844SAndroid Build Coastguard Worker }
935*d9f75844SAndroid Build Coastguard Worker }
936*d9f75844SAndroid Build Coastguard Worker
937*d9f75844SAndroid Build Coastguard Worker bool Thread::WrapCurrentWithThreadManager(ThreadManager* thread_manager,
938*d9f75844SAndroid Build Coastguard Worker bool need_synchronize_access) {
939*d9f75844SAndroid Build Coastguard Worker RTC_DCHECK(!IsRunning());
940*d9f75844SAndroid Build Coastguard Worker
941*d9f75844SAndroid Build Coastguard Worker #if defined(WEBRTC_WIN)
942*d9f75844SAndroid Build Coastguard Worker if (need_synchronize_access) {
943*d9f75844SAndroid Build Coastguard Worker // We explicitly ask for no rights other than synchronization.
944*d9f75844SAndroid Build Coastguard Worker // This gives us the best chance of succeeding.
945*d9f75844SAndroid Build Coastguard Worker thread_ = OpenThread(SYNCHRONIZE, FALSE, GetCurrentThreadId());
946*d9f75844SAndroid Build Coastguard Worker if (!thread_) {
947*d9f75844SAndroid Build Coastguard Worker RTC_LOG_GLE(LS_ERROR) << "Unable to get handle to thread.";
948*d9f75844SAndroid Build Coastguard Worker return false;
949*d9f75844SAndroid Build Coastguard Worker }
950*d9f75844SAndroid Build Coastguard Worker thread_id_ = GetCurrentThreadId();
951*d9f75844SAndroid Build Coastguard Worker }
952*d9f75844SAndroid Build Coastguard Worker #elif defined(WEBRTC_POSIX)
953*d9f75844SAndroid Build Coastguard Worker thread_ = pthread_self();
954*d9f75844SAndroid Build Coastguard Worker #endif
955*d9f75844SAndroid Build Coastguard Worker owned_ = false;
956*d9f75844SAndroid Build Coastguard Worker thread_manager->SetCurrentThread(this);
957*d9f75844SAndroid Build Coastguard Worker return true;
958*d9f75844SAndroid Build Coastguard Worker }
959*d9f75844SAndroid Build Coastguard Worker
960*d9f75844SAndroid Build Coastguard Worker bool Thread::IsRunning() {
961*d9f75844SAndroid Build Coastguard Worker #if defined(WEBRTC_WIN)
962*d9f75844SAndroid Build Coastguard Worker return thread_ != nullptr;
963*d9f75844SAndroid Build Coastguard Worker #elif defined(WEBRTC_POSIX)
964*d9f75844SAndroid Build Coastguard Worker return thread_ != 0;
965*d9f75844SAndroid Build Coastguard Worker #endif
966*d9f75844SAndroid Build Coastguard Worker }
967*d9f75844SAndroid Build Coastguard Worker
968*d9f75844SAndroid Build Coastguard Worker AutoThread::AutoThread()
969*d9f75844SAndroid Build Coastguard Worker : Thread(CreateDefaultSocketServer(), /*do_init=*/false) {
970*d9f75844SAndroid Build Coastguard Worker if (!ThreadManager::Instance()->CurrentThread()) {
971*d9f75844SAndroid Build Coastguard Worker // DoInit registers with ThreadManager. Do that only if we intend to
972*d9f75844SAndroid Build Coastguard Worker // be rtc::Thread::Current(), otherwise ProcessAllMessageQueuesInternal will
973*d9f75844SAndroid Build Coastguard Worker // post a message to a queue that no running thread is serving.
974*d9f75844SAndroid Build Coastguard Worker DoInit();
975*d9f75844SAndroid Build Coastguard Worker ThreadManager::Instance()->SetCurrentThread(this);
976*d9f75844SAndroid Build Coastguard Worker }
977*d9f75844SAndroid Build Coastguard Worker }
978*d9f75844SAndroid Build Coastguard Worker
979*d9f75844SAndroid Build Coastguard Worker AutoThread::~AutoThread() {
980*d9f75844SAndroid Build Coastguard Worker Stop();
981*d9f75844SAndroid Build Coastguard Worker DoDestroy();
982*d9f75844SAndroid Build Coastguard Worker if (ThreadManager::Instance()->CurrentThread() == this) {
983*d9f75844SAndroid Build Coastguard Worker ThreadManager::Instance()->SetCurrentThread(nullptr);
984*d9f75844SAndroid Build Coastguard Worker }
985*d9f75844SAndroid Build Coastguard Worker }
986*d9f75844SAndroid Build Coastguard Worker
987*d9f75844SAndroid Build Coastguard Worker AutoSocketServerThread::AutoSocketServerThread(SocketServer* ss)
988*d9f75844SAndroid Build Coastguard Worker : Thread(ss, /*do_init=*/false) {
989*d9f75844SAndroid Build Coastguard Worker DoInit();
990*d9f75844SAndroid Build Coastguard Worker old_thread_ = ThreadManager::Instance()->CurrentThread();
991*d9f75844SAndroid Build Coastguard Worker // Temporarily set the current thread to nullptr so that we can keep checks
992*d9f75844SAndroid Build Coastguard Worker // around that catch unintentional pointer overwrites.
993*d9f75844SAndroid Build Coastguard Worker rtc::ThreadManager::Instance()->SetCurrentThread(nullptr);
994*d9f75844SAndroid Build Coastguard Worker rtc::ThreadManager::Instance()->SetCurrentThread(this);
995*d9f75844SAndroid Build Coastguard Worker if (old_thread_) {
996*d9f75844SAndroid Build Coastguard Worker ThreadManager::Remove(old_thread_);
997*d9f75844SAndroid Build Coastguard Worker }
998*d9f75844SAndroid Build Coastguard Worker }
999*d9f75844SAndroid Build Coastguard Worker
1000*d9f75844SAndroid Build Coastguard Worker AutoSocketServerThread::~AutoSocketServerThread() {
1001*d9f75844SAndroid Build Coastguard Worker RTC_DCHECK(ThreadManager::Instance()->CurrentThread() == this);
1002*d9f75844SAndroid Build Coastguard Worker // Stop and destroy the thread before clearing it as the current thread.
1003*d9f75844SAndroid Build Coastguard Worker // Sometimes there are messages left in the Thread that will be
1004*d9f75844SAndroid Build Coastguard Worker // destroyed by DoDestroy, and sometimes the destructors of the message and/or
1005*d9f75844SAndroid Build Coastguard Worker // its contents rely on this thread still being set as the current thread.
1006*d9f75844SAndroid Build Coastguard Worker Stop();
1007*d9f75844SAndroid Build Coastguard Worker DoDestroy();
1008*d9f75844SAndroid Build Coastguard Worker rtc::ThreadManager::Instance()->SetCurrentThread(nullptr);
1009*d9f75844SAndroid Build Coastguard Worker rtc::ThreadManager::Instance()->SetCurrentThread(old_thread_);
1010*d9f75844SAndroid Build Coastguard Worker if (old_thread_) {
1011*d9f75844SAndroid Build Coastguard Worker ThreadManager::Add(old_thread_);
1012*d9f75844SAndroid Build Coastguard Worker }
1013*d9f75844SAndroid Build Coastguard Worker }
1014*d9f75844SAndroid Build Coastguard Worker
1015*d9f75844SAndroid Build Coastguard Worker } // namespace rtc
1016