1 // Copyright 2012 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "net/proxy_resolution/multi_threaded_proxy_resolver.h"
6
7 #include <memory>
8 #include <utility>
9 #include <vector>
10
11 #include "base/containers/circular_deque.h"
12 #include "base/functional/bind.h"
13 #include "base/functional/callback_helpers.h"
14 #include "base/location.h"
15 #include "base/memory/raw_ptr.h"
16 #include "base/strings/string_util.h"
17 #include "base/strings/stringprintf.h"
18 #include "base/task/single_thread_task_runner.h"
19 #include "base/threading/thread.h"
20 #include "base/threading/thread_checker.h"
21 #include "base/threading/thread_restrictions.h"
22 #include "net/base/net_errors.h"
23 #include "net/base/network_anonymization_key.h"
24 #include "net/log/net_log.h"
25 #include "net/log/net_log_event_type.h"
26 #include "net/log/net_log_with_source.h"
27 #include "net/proxy_resolution/proxy_info.h"
28 #include "net/proxy_resolution/proxy_resolver.h"
29
30 namespace net {
31
32 class NetworkAnonymizationKey;
33
34 // http://crbug.com/69710
35 class MultiThreadedProxyResolverScopedAllowJoinOnIO
36 : public base::ScopedAllowBaseSyncPrimitivesOutsideBlockingScope {};
37
38 namespace {
39 class Job;
40
41 // An "executor" is a job-runner for PAC requests. It encapsulates a worker
42 // thread and a synchronous ProxyResolver (which will be operated on said
43 // thread.)
44 class Executor : public base::RefCountedThreadSafe<Executor> {
45 public:
46 class Coordinator {
47 public:
48 virtual void OnExecutorReady(Executor* executor) = 0;
49
50 protected:
51 virtual ~Coordinator() = default;
52 };
53
54 // |coordinator| must remain valid throughout our lifetime. It is used to
55 // signal when the executor is ready to receive work by calling
56 // |coordinator->OnExecutorReady()|.
57 // |thread_number| is an identifier used when naming the worker thread.
58 Executor(Coordinator* coordinator, int thread_number);
59
60 // Submit a job to this executor.
61 void StartJob(scoped_refptr<Job> job);
62
63 // Callback for when a job has completed running on the executor's thread.
64 void OnJobCompleted(Job* job);
65
66 // Cleanup the executor. Cancels all outstanding work, and frees the thread
67 // and resolver.
68 void Destroy();
69
70 // Returns the outstanding job, or NULL.
outstanding_job() const71 Job* outstanding_job() const { return outstanding_job_.get(); }
72
resolver()73 ProxyResolver* resolver() { return resolver_.get(); }
74
thread_number() const75 int thread_number() const { return thread_number_; }
76
set_resolver(std::unique_ptr<ProxyResolver> resolver)77 void set_resolver(std::unique_ptr<ProxyResolver> resolver) {
78 resolver_ = std::move(resolver);
79 }
80
set_coordinator(Coordinator * coordinator)81 void set_coordinator(Coordinator* coordinator) {
82 DCHECK(coordinator);
83 DCHECK(coordinator_);
84 coordinator_ = coordinator;
85 }
86
87 private:
88 friend class base::RefCountedThreadSafe<Executor>;
89 ~Executor();
90
91 raw_ptr<Coordinator> coordinator_;
92 const int thread_number_;
93
94 // The currently active job for this executor (either a CreateProxyResolver or
95 // GetProxyForURL task).
96 scoped_refptr<Job> outstanding_job_;
97
98 // The synchronous resolver implementation.
99 std::unique_ptr<ProxyResolver> resolver_;
100
101 // The thread where |resolver_| is run on.
102 // Note that declaration ordering is important here. |thread_| needs to be
103 // destroyed *before* |resolver_|, in case |resolver_| is currently
104 // executing on |thread_|.
105 std::unique_ptr<base::Thread> thread_;
106 };
107
108 class MultiThreadedProxyResolver : public ProxyResolver,
109 public Executor::Coordinator {
110 public:
111 // Creates an asynchronous ProxyResolver that runs requests on up to
112 // |max_num_threads|.
113 //
114 // For each thread that is created, an accompanying synchronous ProxyResolver
115 // will be provisioned using |resolver_factory|. All methods on these
116 // ProxyResolvers will be called on the one thread.
117 MultiThreadedProxyResolver(
118 std::unique_ptr<ProxyResolverFactory> resolver_factory,
119 size_t max_num_threads,
120 const scoped_refptr<PacFileData>& script_data,
121 scoped_refptr<Executor> executor);
122
123 ~MultiThreadedProxyResolver() override;
124
125 // ProxyResolver implementation:
126 int GetProxyForURL(const GURL& url,
127 const NetworkAnonymizationKey& network_anonymization_key,
128 ProxyInfo* results,
129 CompletionOnceCallback callback,
130 std::unique_ptr<Request>* request,
131 const NetLogWithSource& net_log) override;
132
133 private:
134 class GetProxyForURLJob;
135 class RequestImpl;
136 // FIFO queue of pending jobs waiting to be started.
137 // TODO(eroman): Make this priority queue.
138 using PendingJobsQueue = base::circular_deque<scoped_refptr<Job>>;
139 using ExecutorList = std::vector<scoped_refptr<Executor>>;
140
141 // Returns an idle worker thread which is ready to receive GetProxyForURL()
142 // requests. If all threads are occupied, returns NULL.
143 Executor* FindIdleExecutor();
144
145 // Creates a new worker thread, and appends it to |executors_|.
146 void AddNewExecutor();
147
148 // Starts the next job from |pending_jobs_| if possible.
149 void OnExecutorReady(Executor* executor) override;
150
151 const std::unique_ptr<ProxyResolverFactory> resolver_factory_;
152 const size_t max_num_threads_;
153 PendingJobsQueue pending_jobs_;
154 ExecutorList executors_;
155 scoped_refptr<PacFileData> script_data_;
156
157 THREAD_CHECKER(thread_checker_);
158 };
159
160 // Job ---------------------------------------------
161
162 class Job : public base::RefCountedThreadSafe<Job> {
163 public:
164 Job() = default;
165
set_executor(Executor * executor)166 void set_executor(Executor* executor) {
167 executor_ = executor;
168 }
169
170 // The "executor" is the job runner that is scheduling this job. If
171 // this job has not been submitted to an executor yet, this will be
172 // NULL (and we know it hasn't started yet).
executor()173 Executor* executor() {
174 return executor_;
175 }
176
177 // Mark the job as having been cancelled.
Cancel()178 void Cancel() {
179 was_cancelled_ = true;
180 }
181
182 // Returns true if Cancel() has been called.
was_cancelled() const183 bool was_cancelled() const { return was_cancelled_; }
184
185 // This method is called when the job is inserted into a wait queue
186 // because no executors were ready to accept it.
WaitingForThread()187 virtual void WaitingForThread() {}
188
189 // This method is called just before the job is posted to the work thread.
FinishedWaitingForThread()190 virtual void FinishedWaitingForThread() {}
191
192 // This method is called on the worker thread to do the job's work. On
193 // completion, implementors are expected to call OnJobCompleted() on
194 // |origin_runner|.
195 virtual void Run(
196 scoped_refptr<base::SingleThreadTaskRunner> origin_runner) = 0;
197
198 protected:
OnJobCompleted()199 void OnJobCompleted() {
200 // |executor_| will be NULL if the executor has already been deleted.
201 if (executor_)
202 executor_->OnJobCompleted(this);
203 }
204
205 friend class base::RefCountedThreadSafe<Job>;
206
207 virtual ~Job() = default;
208
209 private:
210 raw_ptr<Executor> executor_ = nullptr;
211 bool was_cancelled_ = false;
212 };
213
214 class MultiThreadedProxyResolver::RequestImpl : public ProxyResolver::Request {
215 public:
RequestImpl(scoped_refptr<Job> job)216 explicit RequestImpl(scoped_refptr<Job> job) : job_(std::move(job)) {}
217
~RequestImpl()218 ~RequestImpl() override { job_->Cancel(); }
219
GetLoadState()220 LoadState GetLoadState() override {
221 return LOAD_STATE_RESOLVING_PROXY_FOR_URL;
222 }
223
224 private:
225 scoped_refptr<Job> job_;
226 };
227
228 // CreateResolverJob -----------------------------------------------------------
229
230 // Runs on the worker thread to call ProxyResolverFactory::CreateProxyResolver.
231 class CreateResolverJob : public Job {
232 public:
CreateResolverJob(const scoped_refptr<PacFileData> & script_data,ProxyResolverFactory * factory)233 CreateResolverJob(const scoped_refptr<PacFileData>& script_data,
234 ProxyResolverFactory* factory)
235 : script_data_(script_data), factory_(factory) {}
236
237 // Runs on the worker thread.
Run(scoped_refptr<base::SingleThreadTaskRunner> origin_runner)238 void Run(scoped_refptr<base::SingleThreadTaskRunner> origin_runner) override {
239 std::unique_ptr<ProxyResolverFactory::Request> request;
240 int rv = factory_->CreateProxyResolver(script_data_, &resolver_,
241 CompletionOnceCallback(), &request);
242
243 DCHECK_NE(rv, ERR_IO_PENDING);
244 origin_runner->PostTask(
245 FROM_HERE,
246 base::BindOnce(&CreateResolverJob::RequestComplete, this, rv));
247 }
248
249 protected:
250 ~CreateResolverJob() override = default;
251
252 private:
253 // Runs the completion callback on the origin thread.
RequestComplete(int result_code)254 void RequestComplete(int result_code) {
255 // The task may have been cancelled after it was started.
256 if (!was_cancelled()) {
257 DCHECK(executor());
258 executor()->set_resolver(std::move(resolver_));
259 }
260 OnJobCompleted();
261 }
262
263 const scoped_refptr<PacFileData> script_data_;
264 raw_ptr<ProxyResolverFactory, AcrossTasksDanglingUntriaged> factory_;
265 std::unique_ptr<ProxyResolver> resolver_;
266 };
267
268 // MultiThreadedProxyResolver::GetProxyForURLJob ------------------------------
269
270 class MultiThreadedProxyResolver::GetProxyForURLJob : public Job {
271 public:
272 // |url| -- the URL of the query.
273 // |results| -- the structure to fill with proxy resolve results.
GetProxyForURLJob(const GURL & url,const NetworkAnonymizationKey & network_anonymization_key,ProxyInfo * results,CompletionOnceCallback callback,const NetLogWithSource & net_log)274 GetProxyForURLJob(const GURL& url,
275 const NetworkAnonymizationKey& network_anonymization_key,
276 ProxyInfo* results,
277 CompletionOnceCallback callback,
278 const NetLogWithSource& net_log)
279 : callback_(std::move(callback)),
280 results_(results),
281 net_log_(net_log),
282 url_(url),
283 network_anonymization_key_(network_anonymization_key) {
284 DCHECK(callback_);
285 }
286
net_log()287 NetLogWithSource* net_log() { return &net_log_; }
288
WaitingForThread()289 void WaitingForThread() override {
290 was_waiting_for_thread_ = true;
291 net_log_.BeginEvent(NetLogEventType::WAITING_FOR_PROXY_RESOLVER_THREAD);
292 }
293
FinishedWaitingForThread()294 void FinishedWaitingForThread() override {
295 DCHECK(executor());
296
297 if (was_waiting_for_thread_) {
298 net_log_.EndEvent(NetLogEventType::WAITING_FOR_PROXY_RESOLVER_THREAD);
299 }
300
301 net_log_.AddEventWithIntParams(
302 NetLogEventType::SUBMITTED_TO_RESOLVER_THREAD, "thread_number",
303 executor()->thread_number());
304 }
305
306 // Runs on the worker thread.
Run(scoped_refptr<base::SingleThreadTaskRunner> origin_runner)307 void Run(scoped_refptr<base::SingleThreadTaskRunner> origin_runner) override {
308 ProxyResolver* resolver = executor()->resolver();
309 DCHECK(resolver);
310 int rv = resolver->GetProxyForURL(url_, network_anonymization_key_,
311 &results_buf_, CompletionOnceCallback(),
312 nullptr, net_log_);
313 DCHECK_NE(rv, ERR_IO_PENDING);
314
315 origin_runner->PostTask(
316 FROM_HERE, base::BindOnce(&GetProxyForURLJob::QueryComplete, this, rv));
317 }
318
319 protected:
320 ~GetProxyForURLJob() override = default;
321
322 private:
323 // Runs the completion callback on the origin thread.
QueryComplete(int result_code)324 void QueryComplete(int result_code) {
325 // The Job may have been cancelled after it was started.
326 if (!was_cancelled()) {
327 if (result_code >= OK) { // Note: unit-tests use values > 0.
328 results_->Use(results_buf_);
329 }
330 std::move(callback_).Run(result_code);
331 }
332 OnJobCompleted();
333 }
334
335 CompletionOnceCallback callback_;
336
337 // Must only be used on the "origin" thread.
338 raw_ptr<ProxyInfo> results_;
339
340 // Can be used on either "origin" or worker thread.
341 NetLogWithSource net_log_;
342
343 const GURL url_;
344 const NetworkAnonymizationKey network_anonymization_key_;
345
346 // Usable from within DoQuery on the worker thread.
347 ProxyInfo results_buf_;
348
349 bool was_waiting_for_thread_ = false;
350 };
351
352 // Executor ----------------------------------------
353
Executor(Executor::Coordinator * coordinator,int thread_number)354 Executor::Executor(Executor::Coordinator* coordinator, int thread_number)
355 : coordinator_(coordinator), thread_number_(thread_number) {
356 DCHECK(coordinator);
357 // Start up the thread.
358 thread_ = std::make_unique<base::Thread>(
359 base::StringPrintf("PAC thread #%d", thread_number));
360 CHECK(thread_->Start());
361 }
362
StartJob(scoped_refptr<Job> job)363 void Executor::StartJob(scoped_refptr<Job> job) {
364 DCHECK(!outstanding_job_.get());
365 outstanding_job_ = job;
366
367 // Run the job. Once it has completed (regardless of whether it was
368 // cancelled), it will invoke OnJobCompleted() on this thread.
369 job->set_executor(this);
370 job->FinishedWaitingForThread();
371 thread_->task_runner()->PostTask(
372 FROM_HERE,
373 base::BindOnce(&Job::Run, job,
374 base::SingleThreadTaskRunner::GetCurrentDefault()));
375 }
376
OnJobCompleted(Job * job)377 void Executor::OnJobCompleted(Job* job) {
378 DCHECK_EQ(job, outstanding_job_.get());
379 outstanding_job_ = nullptr;
380 coordinator_->OnExecutorReady(this);
381 }
382
// Tears the executor down; ~Executor() DCHECKs that this has happened. It can
// only run once, because it clears |coordinator_|, which it DCHECKs on entry.
//
// The order of the steps matters:
//   1. Join the worker thread, so nothing on it can still be using
//      |resolver_| or the outstanding job.
//   2. Cancel and orphan the outstanding job. Its completion task may still be
//      queued on the origin thread. With a null executor,
//      Job::OnJobCompleted() does nothing, and the cancellation flag makes the
//      job skip delivering its result.
//   3. Only then free |resolver_|.
void Executor::Destroy() {
  DCHECK(coordinator_);

  {
    // TODO(http://crbug.com/69710): Use ThreadPool instead of creating a
    // base::Thread.
    // The scoped allowance below permits the blocking join on this thread.
    MultiThreadedProxyResolverScopedAllowJoinOnIO allow_thread_join;

    // Join the worker thread.
    thread_.reset();
  }

  // Cancel any outstanding job.
  if (outstanding_job_.get()) {
    outstanding_job_->Cancel();
    // Orphan the job (since this executor may be deleted soon).
    outstanding_job_->set_executor(nullptr);
  }

  // It is now safe to free the ProxyResolver, since all the tasks that
  // were using it on the resolver thread have completed.
  resolver_.reset();

  // Null some stuff as a precaution.
  coordinator_ = nullptr;
  outstanding_job_ = nullptr;
}
410
~Executor()411 Executor::~Executor() {
412 // The important cleanup happens as part of Destroy(), which should always be
413 // called first.
414 DCHECK(!coordinator_) << "Destroy() was not called";
415 DCHECK(!thread_.get());
416 DCHECK(!resolver_.get());
417 DCHECK(!outstanding_job_.get());
418 }
419
420 // MultiThreadedProxyResolver --------------------------------------------------
421
MultiThreadedProxyResolver(std::unique_ptr<ProxyResolverFactory> resolver_factory,size_t max_num_threads,const scoped_refptr<PacFileData> & script_data,scoped_refptr<Executor> executor)422 MultiThreadedProxyResolver::MultiThreadedProxyResolver(
423 std::unique_ptr<ProxyResolverFactory> resolver_factory,
424 size_t max_num_threads,
425 const scoped_refptr<PacFileData>& script_data,
426 scoped_refptr<Executor> executor)
427 : resolver_factory_(std::move(resolver_factory)),
428 max_num_threads_(max_num_threads),
429 script_data_(script_data) {
430 DCHECK(script_data_);
431 executor->set_coordinator(this);
432 executors_.push_back(executor);
433 }
434
~MultiThreadedProxyResolver()435 MultiThreadedProxyResolver::~MultiThreadedProxyResolver() {
436 DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
437 // We will cancel all outstanding requests.
438 pending_jobs_.clear();
439
440 for (auto& executor : executors_) {
441 executor->Destroy();
442 }
443 }
444
GetProxyForURL(const GURL & url,const NetworkAnonymizationKey & network_anonymization_key,ProxyInfo * results,CompletionOnceCallback callback,std::unique_ptr<Request> * request,const NetLogWithSource & net_log)445 int MultiThreadedProxyResolver::GetProxyForURL(
446 const GURL& url,
447 const NetworkAnonymizationKey& network_anonymization_key,
448 ProxyInfo* results,
449 CompletionOnceCallback callback,
450 std::unique_ptr<Request>* request,
451 const NetLogWithSource& net_log) {
452 DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
453 DCHECK(!callback.is_null());
454
455 auto job = base::MakeRefCounted<GetProxyForURLJob>(
456 url, network_anonymization_key, results, std::move(callback), net_log);
457
458 // Completion will be notified through |callback|, unless the caller cancels
459 // the request using |request|.
460 if (request)
461 *request = std::make_unique<RequestImpl>(job);
462
463 // If there is an executor that is ready to run this request, submit it!
464 Executor* executor = FindIdleExecutor();
465 if (executor) {
466 DCHECK_EQ(0u, pending_jobs_.size());
467 executor->StartJob(job);
468 return ERR_IO_PENDING;
469 }
470
471 // Otherwise queue this request. (We will schedule it to a thread once one
472 // becomes available).
473 job->WaitingForThread();
474 pending_jobs_.push_back(job);
475
476 // If we haven't already reached the thread limit, provision a new thread to
477 // drain the requests more quickly.
478 if (executors_.size() < max_num_threads_)
479 AddNewExecutor();
480
481 return ERR_IO_PENDING;
482 }
483
FindIdleExecutor()484 Executor* MultiThreadedProxyResolver::FindIdleExecutor() {
485 DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
486 for (auto& executor : executors_) {
487 if (!executor->outstanding_job())
488 return executor.get();
489 }
490 return nullptr;
491 }
492
AddNewExecutor()493 void MultiThreadedProxyResolver::AddNewExecutor() {
494 DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
495 DCHECK_LT(executors_.size(), max_num_threads_);
496 // The "thread number" is used to give the thread a unique name.
497 int thread_number = executors_.size();
498
499 auto executor = base::MakeRefCounted<Executor>(this, thread_number);
500 executor->StartJob(base::MakeRefCounted<CreateResolverJob>(
501 script_data_, resolver_factory_.get()));
502 executors_.push_back(std::move(executor));
503 }
504
OnExecutorReady(Executor * executor)505 void MultiThreadedProxyResolver::OnExecutorReady(Executor* executor) {
506 DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
507 while (!pending_jobs_.empty()) {
508 scoped_refptr<Job> job = pending_jobs_.front();
509 pending_jobs_.pop_front();
510 if (!job->was_cancelled()) {
511 executor->StartJob(std::move(job));
512 return;
513 }
514 }
515 }
516
517 } // namespace
518
// Request object produced by MultiThreadedProxyResolverFactory::
// CreateProxyResolver(). It starts the first Executor (thread number 0) and
// runs a CreateResolverJob on it. When that finishes, it either wraps the
// executor in a new MultiThreadedProxyResolver written to |*resolver_out_|,
// or reports ERR_PAC_SCRIPT_FAILED.
class MultiThreadedProxyResolverFactory::Job
    : public ProxyResolverFactory::Request,
      public Executor::Coordinator {
 public:
  // |factory| has this job registered in its |jobs_| (see CreateProxyResolver)
  // until the job completes, is destroyed, or the factory goes away first.
  // |resolver| receives the created resolver on success.
  // |resolver_factory| builds the per-thread synchronous resolvers.
  // |callback| is run with OK or ERR_PAC_SCRIPT_FAILED.
  Job(MultiThreadedProxyResolverFactory* factory,
      const scoped_refptr<PacFileData>& script_data,
      std::unique_ptr<ProxyResolver>* resolver,
      std::unique_ptr<ProxyResolverFactory> resolver_factory,
      size_t max_num_threads,
      CompletionOnceCallback callback)
      : factory_(factory),
        resolver_out_(resolver),
        resolver_factory_(std::move(resolver_factory)),
        max_num_threads_(max_num_threads),
        script_data_(script_data),
        executor_(base::MakeRefCounted<Executor>(this, 0)),
        callback_(std::move(callback)) {
    // Kick off resolver creation on the executor's worker thread; completion
    // arrives later through OnExecutorReady().
    executor_->StartJob(base::MakeRefCounted<CreateResolverJob>(
        script_data_, resolver_factory_.get()));
  }

  // A non-null |factory_| means the job neither completed nor was orphaned by
  // FactoryDestroyed(), so it still owns the executor and is still registered
  // with the factory: clean up both.
  ~Job() override {
    if (factory_) {
      executor_->Destroy();
      factory_->RemoveJob(this);
    }
  }

  // Called from the factory's destructor. Shuts down the executor and drops
  // the pointers to the factory and to the output slot, so the destructor
  // above does nothing further.
  void FactoryDestroyed() {
    executor_->Destroy();
    executor_ = nullptr;
    factory_ = nullptr;
    resolver_out_ = nullptr;
  }

 private:
  // Executor::Coordinator: the CreateResolverJob on |executor_| has finished.
  // On success, the executor (with its resolver) is handed to a new
  // MultiThreadedProxyResolver, which re-targets its coordinator. On failure,
  // the executor is destroyed here. In both cases |factory_| is cleared so
  // ~Job() will not Destroy() the executor again. The callback runs last;
  // nothing in this object is touched after it.
  void OnExecutorReady(Executor* executor) override {
    int error = OK;
    if (executor->resolver()) {
      *resolver_out_ = std::make_unique<MultiThreadedProxyResolver>(
          std::move(resolver_factory_), max_num_threads_,
          std::move(script_data_), executor_);
    } else {
      error = ERR_PAC_SCRIPT_FAILED;
      executor_->Destroy();
    }
    factory_->RemoveJob(this);
    factory_ = nullptr;
    std::move(callback_).Run(error);
  }

  raw_ptr<MultiThreadedProxyResolverFactory> factory_;
  raw_ptr<std::unique_ptr<ProxyResolver>> resolver_out_;
  std::unique_ptr<ProxyResolverFactory> resolver_factory_;
  const size_t max_num_threads_;
  scoped_refptr<PacFileData> script_data_;
  // Declared after |script_data_| and |resolver_factory_|, which the
  // constructor body reads when starting the CreateResolverJob.
  scoped_refptr<Executor> executor_;
  CompletionOnceCallback callback_;
};
578
MultiThreadedProxyResolverFactory(size_t max_num_threads,bool factory_expects_bytes)579 MultiThreadedProxyResolverFactory::MultiThreadedProxyResolverFactory(
580 size_t max_num_threads,
581 bool factory_expects_bytes)
582 : ProxyResolverFactory(factory_expects_bytes),
583 max_num_threads_(max_num_threads) {
584 DCHECK_GE(max_num_threads, 1u);
585 }
586
~MultiThreadedProxyResolverFactory()587 MultiThreadedProxyResolverFactory::~MultiThreadedProxyResolverFactory() {
588 for (Job* job : jobs_) {
589 job->FactoryDestroyed();
590 }
591 }
592
CreateProxyResolver(const scoped_refptr<PacFileData> & pac_script,std::unique_ptr<ProxyResolver> * resolver,CompletionOnceCallback callback,std::unique_ptr<Request> * request)593 int MultiThreadedProxyResolverFactory::CreateProxyResolver(
594 const scoped_refptr<PacFileData>& pac_script,
595 std::unique_ptr<ProxyResolver>* resolver,
596 CompletionOnceCallback callback,
597 std::unique_ptr<Request>* request) {
598 auto job = std::make_unique<Job>(this, pac_script, resolver,
599 CreateProxyResolverFactory(),
600 max_num_threads_, std::move(callback));
601 jobs_.insert(job.get());
602 *request = std::move(job);
603 return ERR_IO_PENDING;
604 }
605
RemoveJob(MultiThreadedProxyResolverFactory::Job * job)606 void MultiThreadedProxyResolverFactory::RemoveJob(
607 MultiThreadedProxyResolverFactory::Job* job) {
608 size_t erased = jobs_.erase(job);
609 DCHECK_EQ(1u, erased);
610 }
611
612 } // namespace net
613