1//
2// detail/impl/strand_executor_service.ipp
3// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
4//
5// Copyright (c) 2003-2021 Christopher M. Kohlhoff (chris at kohlhoff dot com)
6//
7// Distributed under the Boost Software License, Version 1.0. (See accompanying
8// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
9//
10
11#ifndef BOOST_ASIO_DETAIL_IMPL_STRAND_EXECUTOR_SERVICE_IPP
12#define BOOST_ASIO_DETAIL_IMPL_STRAND_EXECUTOR_SERVICE_IPP
13
14#if defined(_MSC_VER) && (_MSC_VER >= 1200)
15# pragma once
16#endif // defined(_MSC_VER) && (_MSC_VER >= 1200)
17
18#include <boost/asio/detail/config.hpp>
19#include <boost/asio/detail/strand_executor_service.hpp>
20
21#include <boost/asio/detail/push_options.hpp>
22
23namespace boost {
24namespace asio {
25namespace detail {
26
27strand_executor_service::strand_executor_service(execution_context& ctx)
28  : execution_context_service_base<strand_executor_service>(ctx),
29    mutex_(),
30    salt_(0),
31    impl_list_(0)
32{
33}
34
35void strand_executor_service::shutdown()
36{
37  op_queue<scheduler_operation> ops;
38
39  boost::asio::detail::mutex::scoped_lock lock(mutex_);
40
41  strand_impl* impl = impl_list_;
42  while (impl)
43  {
44    impl->mutex_->lock();
45    impl->shutdown_ = true;
46    ops.push(impl->waiting_queue_);
47    ops.push(impl->ready_queue_);
48    impl->mutex_->unlock();
49    impl = impl->next_;
50  }
51}
52
53strand_executor_service::implementation_type
54strand_executor_service::create_implementation()
55{
56  implementation_type new_impl(new strand_impl);
57  new_impl->locked_ = false;
58  new_impl->shutdown_ = false;
59
60  boost::asio::detail::mutex::scoped_lock lock(mutex_);
61
62  // Select a mutex from the pool of shared mutexes.
63  std::size_t salt = salt_++;
64  std::size_t mutex_index = reinterpret_cast<std::size_t>(new_impl.get());
65  mutex_index += (reinterpret_cast<std::size_t>(new_impl.get()) >> 3);
66  mutex_index ^= salt + 0x9e3779b9 + (mutex_index << 6) + (mutex_index >> 2);
67  mutex_index = mutex_index % num_mutexes;
68  if (!mutexes_[mutex_index].get())
69    mutexes_[mutex_index].reset(new mutex);
70  new_impl->mutex_ = mutexes_[mutex_index].get();
71
72  // Insert implementation into linked list of all implementations.
73  new_impl->next_ = impl_list_;
74  new_impl->prev_ = 0;
75  if (impl_list_)
76    impl_list_->prev_ = new_impl.get();
77  impl_list_ = new_impl.get();
78  new_impl->service_ = this;
79
80  return new_impl;
81}
82
83strand_executor_service::strand_impl::~strand_impl()
84{
85  boost::asio::detail::mutex::scoped_lock lock(service_->mutex_);
86
87  // Remove implementation from linked list of all implementations.
88  if (service_->impl_list_ == this)
89    service_->impl_list_ = next_;
90  if (prev_)
91    prev_->next_ = next_;
92  if (next_)
93    next_->prev_= prev_;
94}
95
96bool strand_executor_service::enqueue(const implementation_type& impl,
97    scheduler_operation* op)
98{
99  impl->mutex_->lock();
100  if (impl->shutdown_)
101  {
102    impl->mutex_->unlock();
103    op->destroy();
104    return false;
105  }
106  else if (impl->locked_)
107  {
108    // Some other function already holds the strand lock. Enqueue for later.
109    impl->waiting_queue_.push(op);
110    impl->mutex_->unlock();
111    return false;
112  }
113  else
114  {
115    // The function is acquiring the strand lock and so is responsible for
116    // scheduling the strand.
117    impl->locked_ = true;
118    impl->mutex_->unlock();
119    impl->ready_queue_.push(op);
120    return true;
121  }
122}
123
124bool strand_executor_service::running_in_this_thread(
125    const implementation_type& impl)
126{
127  return !!call_stack<strand_impl>::contains(impl.get());
128}
129
130bool strand_executor_service::push_waiting_to_ready(implementation_type& impl)
131{
132  impl->mutex_->lock();
133  impl->ready_queue_.push(impl->waiting_queue_);
134  bool more_handlers = impl->locked_ = !impl->ready_queue_.empty();
135  impl->mutex_->unlock();
136  return more_handlers;
137}
138
139void strand_executor_service::run_ready_handlers(implementation_type& impl)
140{
141  // Indicate that this strand is executing on the current thread.
142  call_stack<strand_impl>::context ctx(impl.get());
143
144  // Run all ready handlers. No lock is required since the ready queue is
145  // accessed only within the strand.
146  boost::system::error_code ec;
147  while (scheduler_operation* o = impl->ready_queue_.front())
148  {
149    impl->ready_queue_.pop();
150    o->complete(impl.get(), ec, 0);
151  }
152}
153
154} // namespace detail
155} // namespace asio
156} // namespace boost
157
158#include <boost/asio/detail/pop_options.hpp>
159
160#endif // BOOST_ASIO_DETAIL_IMPL_STRAND_EXECUTOR_SERVICE_IPP
161