1// 2// detail/impl/strand_executor_service.ipp 3// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 4// 5// Copyright (c) 2003-2021 Christopher M. Kohlhoff (chris at kohlhoff dot com) 6// 7// Distributed under the Boost Software License, Version 1.0. (See accompanying 8// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) 9// 10 11#ifndef BOOST_ASIO_DETAIL_IMPL_STRAND_EXECUTOR_SERVICE_IPP 12#define BOOST_ASIO_DETAIL_IMPL_STRAND_EXECUTOR_SERVICE_IPP 13 14#if defined(_MSC_VER) && (_MSC_VER >= 1200) 15# pragma once 16#endif // defined(_MSC_VER) && (_MSC_VER >= 1200) 17 18#include <boost/asio/detail/config.hpp> 19#include <boost/asio/detail/strand_executor_service.hpp> 20 21#include <boost/asio/detail/push_options.hpp> 22 23namespace boost { 24namespace asio { 25namespace detail { 26 27strand_executor_service::strand_executor_service(execution_context& ctx) 28 : execution_context_service_base<strand_executor_service>(ctx), 29 mutex_(), 30 salt_(0), 31 impl_list_(0) 32{ 33} 34 35void strand_executor_service::shutdown() 36{ 37 op_queue<scheduler_operation> ops; 38 39 boost::asio::detail::mutex::scoped_lock lock(mutex_); 40 41 strand_impl* impl = impl_list_; 42 while (impl) 43 { 44 impl->mutex_->lock(); 45 impl->shutdown_ = true; 46 ops.push(impl->waiting_queue_); 47 ops.push(impl->ready_queue_); 48 impl->mutex_->unlock(); 49 impl = impl->next_; 50 } 51} 52 53strand_executor_service::implementation_type 54strand_executor_service::create_implementation() 55{ 56 implementation_type new_impl(new strand_impl); 57 new_impl->locked_ = false; 58 new_impl->shutdown_ = false; 59 60 boost::asio::detail::mutex::scoped_lock lock(mutex_); 61 62 // Select a mutex from the pool of shared mutexes. 63 std::size_t salt = salt_++; 64 std::size_t mutex_index = reinterpret_cast<std::size_t>(new_impl.get()); 65 mutex_index += (reinterpret_cast<std::size_t>(new_impl.get()) >> 3); 66 mutex_index ^= salt + 0x9e3779b9 + (mutex_index << 6) + (mutex_index >> 2); 67 mutex_index = mutex_index % num_mutexes; 68 if (!mutexes_[mutex_index].get()) 69 mutexes_[mutex_index].reset(new mutex); 70 new_impl->mutex_ = mutexes_[mutex_index].get(); 71 72 // Insert implementation into linked list of all implementations. 73 new_impl->next_ = impl_list_; 74 new_impl->prev_ = 0; 75 if (impl_list_) 76 impl_list_->prev_ = new_impl.get(); 77 impl_list_ = new_impl.get(); 78 new_impl->service_ = this; 79 80 return new_impl; 81} 82 83strand_executor_service::strand_impl::~strand_impl() 84{ 85 boost::asio::detail::mutex::scoped_lock lock(service_->mutex_); 86 87 // Remove implementation from linked list of all implementations. 88 if (service_->impl_list_ == this) 89 service_->impl_list_ = next_; 90 if (prev_) 91 prev_->next_ = next_; 92 if (next_) 93 next_->prev_= prev_; 94} 95 96bool strand_executor_service::enqueue(const implementation_type& impl, 97 scheduler_operation* op) 98{ 99 impl->mutex_->lock(); 100 if (impl->shutdown_) 101 { 102 impl->mutex_->unlock(); 103 op->destroy(); 104 return false; 105 } 106 else if (impl->locked_) 107 { 108 // Some other function already holds the strand lock. Enqueue for later. 109 impl->waiting_queue_.push(op); 110 impl->mutex_->unlock(); 111 return false; 112 } 113 else 114 { 115 // The function is acquiring the strand lock and so is responsible for 116 // scheduling the strand. 117 impl->locked_ = true; 118 impl->mutex_->unlock(); 119 impl->ready_queue_.push(op); 120 return true; 121 } 122} 123 124bool strand_executor_service::running_in_this_thread( 125 const implementation_type& impl) 126{ 127 return !!call_stack<strand_impl>::contains(impl.get()); 128} 129 130bool strand_executor_service::push_waiting_to_ready(implementation_type& impl) 131{ 132 impl->mutex_->lock(); 133 impl->ready_queue_.push(impl->waiting_queue_); 134 bool more_handlers = impl->locked_ = !impl->ready_queue_.empty(); 135 impl->mutex_->unlock(); 136 return more_handlers; 137} 138 139void strand_executor_service::run_ready_handlers(implementation_type& impl) 140{ 141 // Indicate that this strand is executing on the current thread. 142 call_stack<strand_impl>::context ctx(impl.get()); 143 144 // Run all ready handlers. No lock is required since the ready queue is 145 // accessed only within the strand. 146 boost::system::error_code ec; 147 while (scheduler_operation* o = impl->ready_queue_.front()) 148 { 149 impl->ready_queue_.pop(); 150 o->complete(impl.get(), ec, 0); 151 } 152} 153 154} // namespace detail 155} // namespace asio 156} // namespace boost 157 158#include <boost/asio/detail/pop_options.hpp> 159 160#endif // BOOST_ASIO_DETAIL_IMPL_STRAND_EXECUTOR_SERVICE_IPP 161