1 /* 2 * Copyright 2019 The WebRTC Project Authors. All rights reserved. 3 * 4 * Use of this source code is governed by a BSD-style license 5 * that can be found in the LICENSE file in the root of the source 6 * tree. An additional intellectual property rights grant can be found 7 * in the file PATENTS. All contributing project authors may 8 * be found in the AUTHORS file in the root of the source tree. 9 */ 10 11 #ifndef RTC_BASE_OPERATIONS_CHAIN_H_ 12 #define RTC_BASE_OPERATIONS_CHAIN_H_ 13 14 #include <functional> 15 #include <memory> 16 #include <queue> 17 #include <set> 18 #include <type_traits> 19 #include <utility> 20 21 #include "absl/types/optional.h" 22 #include "api/ref_counted_base.h" 23 #include "api/scoped_refptr.h" 24 #include "api/sequence_checker.h" 25 #include "rtc_base/checks.h" 26 #include "rtc_base/ref_count.h" 27 #include "rtc_base/ref_counted_object.h" 28 #include "rtc_base/system/no_unique_address.h" 29 30 namespace rtc { 31 32 namespace rtc_operations_chain_internal { 33 34 // Abstract base class for operations on the OperationsChain. Run() must be 35 // invoked exactly once during the Operation's lifespan. 36 class Operation { 37 public: ~Operation()38 virtual ~Operation() {} 39 40 virtual void Run() = 0; 41 }; 42 43 // FunctorT is the same as in OperationsChain::ChainOperation(). `callback_` is 44 // passed on to the `functor_` and is used to inform the OperationsChain that 45 // the operation completed. The functor is responsible for invoking the 46 // callback when the operation has completed. 47 template <typename FunctorT> 48 class OperationWithFunctor final : public Operation { 49 public: OperationWithFunctor(FunctorT && functor,std::function<void ()> callback)50 OperationWithFunctor(FunctorT&& functor, std::function<void()> callback) 51 : functor_(std::forward<FunctorT>(functor)), 52 callback_(std::move(callback)) {} 53 ~OperationWithFunctor()54 ~OperationWithFunctor() override { 55 #if RTC_DCHECK_IS_ON 56 RTC_DCHECK(has_run_); 57 #endif // RTC_DCHECK_IS_ON 58 } 59 Run()60 void Run() override { 61 #if RTC_DCHECK_IS_ON 62 RTC_DCHECK(!has_run_); 63 has_run_ = true; 64 #endif // RTC_DCHECK_IS_ON 65 // The functor being executed may invoke the callback synchronously, 66 // marking the operation as complete. As such, `this` OperationWithFunctor 67 // object may get deleted here, including destroying `functor_`. To 68 // protect the functor from self-destruction while running, it is moved to 69 // a local variable. 70 auto functor = std::move(functor_); 71 functor(std::move(callback_)); 72 // `this` may now be deleted; don't touch any member variables. 73 } 74 75 private: 76 typename std::remove_reference<FunctorT>::type functor_; 77 std::function<void()> callback_; 78 #if RTC_DCHECK_IS_ON 79 bool has_run_ = false; 80 #endif // RTC_DCHECK_IS_ON 81 }; 82 83 } // namespace rtc_operations_chain_internal 84 85 // An implementation of an operations chain. An operations chain is used to 86 // ensure that asynchronous tasks are executed in-order with at most one task 87 // running at a time. The notion of an operation chain is defined in 88 // https://w3c.github.io/webrtc-pc/#dfn-operations-chain, though unlike this 89 // implementation, the referenced definition is coupled with a peer connection. 90 // 91 // An operation is an asynchronous task. The operation starts when its functor 92 // is invoked, and completes when the callback that is passed to functor is 93 // invoked by the operation. The operation must start and complete on the same 94 // sequence that the operation was "chained" on. As such, the OperationsChain 95 // operates in a "single-threaded" fashion, but the asynchronous operations may 96 // use any number of threads to achieve "in parallel" behavior. 97 // 98 // When an operation is chained onto the OperationsChain, it is enqueued to be 99 // executed. Operations are executed in FIFO order, where the next operation 100 // does not start until the previous operation has completed. OperationsChain 101 // guarantees that: 102 // - If the operations chain is empty when an operation is chained, the 103 // operation starts immediately, inside ChainOperation(). 104 // - If the operations chain is not empty when an operation is chained, the 105 // operation starts upon the previous operation completing, inside the 106 // callback. 107 // 108 // An operation is contractually obligated to invoke the completion callback 109 // exactly once. Cancelling a chained operation is not supported by the 110 // OperationsChain; an operation that wants to be cancellable is responsible for 111 // aborting its own steps. The callback must still be invoked. 112 // 113 // The OperationsChain is kept-alive through reference counting if there are 114 // operations pending. This, together with the contract, guarantees that all 115 // operations that are chained get executed. 116 class OperationsChain final : public RefCountedNonVirtual<OperationsChain> { 117 public: 118 static scoped_refptr<OperationsChain> Create(); 119 ~OperationsChain(); 120 121 OperationsChain(const OperationsChain&) = delete; 122 OperationsChain& operator=(const OperationsChain&) = delete; 123 124 void SetOnChainEmptyCallback(std::function<void()> on_chain_empty_callback); 125 bool IsEmpty() const; 126 127 // Chains an operation. Chained operations are executed in FIFO order. The 128 // operation starts when `functor` is executed by the OperationsChain and is 129 // contractually obligated to invoke the callback passed to it when the 130 // operation is complete. Operations must start and complete on the same 131 // sequence that this method was invoked on. 132 // 133 // If the OperationsChain is empty, the operation starts immediately. 134 // Otherwise it starts upon the previous operation completing. 135 // 136 // Requirements of FunctorT: 137 // - FunctorT is movable. 138 // - FunctorT implements "T operator()(std::function<void()> callback)" or 139 // "T operator()(std::function<void()> callback) const" for some T (if T is 140 // not void, the return value is discarded in the invoking sequence). The 141 // operator starts the operation; when the operation is complete, "callback" 142 // MUST be invoked, and it MUST be so on the sequence that ChainOperation() 143 // was invoked on. 144 // 145 // Lambda expressions are valid functors. 146 template <typename FunctorT> ChainOperation(FunctorT && functor)147 void ChainOperation(FunctorT&& functor) { 148 RTC_DCHECK_RUN_ON(&sequence_checker_); 149 chained_operations_.push( 150 std::make_unique< 151 rtc_operations_chain_internal::OperationWithFunctor<FunctorT>>( 152 std::forward<FunctorT>(functor), CreateOperationsChainCallback())); 153 // If this is the only operation in the chain we execute it immediately. 154 // Otherwise the callback will get invoked when the pending operation 155 // completes which will trigger the next operation to execute. 156 if (chained_operations_.size() == 1) { 157 chained_operations_.front()->Run(); 158 } 159 } 160 161 private: 162 friend class CallbackHandle; 163 164 // The callback that is passed to an operation's functor (that is used to 165 // inform the OperationsChain that the operation has completed) is of type 166 // std::function<void()>, which is a copyable type. To allow the callback to 167 // be copyable, it is backed up by this reference counted handle. See 168 // CreateOperationsChainCallback(). 169 class CallbackHandle final : public RefCountedNonVirtual<CallbackHandle> { 170 public: 171 explicit CallbackHandle(scoped_refptr<OperationsChain> operations_chain); 172 ~CallbackHandle(); 173 174 CallbackHandle(const CallbackHandle&) = delete; 175 CallbackHandle& operator=(const CallbackHandle&) = delete; 176 177 void OnOperationComplete(); 178 179 private: 180 scoped_refptr<OperationsChain> operations_chain_; 181 #if RTC_DCHECK_IS_ON 182 bool has_run_ = false; 183 #endif // RTC_DCHECK_IS_ON 184 }; 185 186 OperationsChain(); 187 188 std::function<void()> CreateOperationsChainCallback(); 189 void OnOperationComplete(); 190 191 RTC_NO_UNIQUE_ADDRESS webrtc::SequenceChecker sequence_checker_; 192 // FIFO-list of operations that are chained. An operation that is executing 193 // remains on this list until it has completed by invoking the callback passed 194 // to it. 195 std::queue<std::unique_ptr<rtc_operations_chain_internal::Operation>> 196 chained_operations_ RTC_GUARDED_BY(sequence_checker_); 197 absl::optional<std::function<void()>> on_chain_empty_callback_ 198 RTC_GUARDED_BY(sequence_checker_); 199 }; 200 201 } // namespace rtc 202 203 #endif // RTC_BASE_OPERATIONS_CHAIN_H_ 204