//
// Copyright 2019 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

#include <grpc/support/port_platform.h>

#include "src/core/lib/gprpp/work_serializer.h"

#include <stdint.h>

#include <atomic>
#include <functional>
#include <memory>
#include <utility>

#include <grpc/support/log.h>

#include "src/core/lib/debug/trace.h"
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/mpscq.h"
#include "src/core/lib/gprpp/orphanable.h"

namespace grpc_core {

DebugOnlyTraceFlag grpc_work_serializer_trace(false, "work_serializer");

//
// WorkSerializer::WorkSerializerImpl
//

class WorkSerializer::WorkSerializerImpl : public Orphanable {
 public:
  void Run(std::function<void()> callback, const DebugLocation& location);
  void Schedule(std::function<void()> callback, const DebugLocation& location);
  void DrainQueue();
  void Orphan() override;

 private:
  struct CallbackWrapper {
    CallbackWrapper(std::function<void()> cb, const DebugLocation& loc)
        : callback(std::move(cb)), location(loc) {}

    MultiProducerSingleConsumerQueue::Node mpscq_node;
    const std::function<void()> callback;
    const DebugLocation location;
  };

  // Callers of DrainQueueOwned() should first grab the lock on the work
  // serializer with
  //
  //   prev_ref_pair =
  //     refs_.fetch_add(MakeRefPair(1, 1), std::memory_order_acq_rel);
  //
  // and only invoke DrainQueueOwned() if there was previously no owner. Note
  // that the queue size is also incremented as part of the fetch_add so that
  // callers can instead add a callback to the queue if another thread already
  // holds the lock on the work serializer.
  void DrainQueueOwned();
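  //
  // Illustrative sketch of the ownership protocol described above. This
  // mirrors what Run() and DrainQueue() do below; it is not an additional
  // code path in this file:
  //
  //   uint64_t prev =
  //       refs_.fetch_add(MakeRefPair(1, 1), std::memory_order_acq_rel);
  //   if (GetOwners(prev) == 0) {
  //     DrainQueueOwned();  // We became the owner; drain until empty.
  //   } else {
  //     // Another thread owns the serializer: undo the owner increment and
  //     // leave the callback on the queue for the current owner to run.
  //     refs_.fetch_sub(MakeRefPair(1, 0), std::memory_order_acq_rel);
  //     queue_.Push(&cb_wrapper->mpscq_node);  // cb_wrapper wraps the callback.
  //   }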

  // The upper 16 bits of refs_ indicate ownership of the WorkSerializer; the
  // lower 48 bits are the queue size (i.e., refs).
  static uint64_t MakeRefPair(uint16_t owners, uint64_t size) {
    GPR_ASSERT(size >> 48 == 0);
    return (static_cast<uint64_t>(owners) << 48) + static_cast<int64_t>(size);
  }
  static uint32_t GetOwners(uint64_t ref_pair) {
    return static_cast<uint32_t>(ref_pair >> 48);
  }
  static uint64_t GetSize(uint64_t ref_pair) {
    return static_cast<uint64_t>(ref_pair & 0xffffffffffffu);
  }
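  // For example (derived from the helpers above, not an additional
  // invariant): MakeRefPair(/*owners=*/1, /*size=*/3) == 0x0001000000000003,
  // for which GetOwners() returns 1 and GetSize() returns 3.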

  // An initial size of 1 keeps track of whether the work serializer has been
  // orphaned.
  std::atomic<uint64_t> refs_{MakeRefPair(0, 1)};
  MultiProducerSingleConsumerQueue queue_;
};
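
// Illustrative usage sketch of the public API defined at the bottom of this
// file (a sketch only; the shared_ptr ownership shown here is an assumption
// about the caller, not something this file requires):
//
//   auto ws = std::make_shared<WorkSerializer>();
//   ws->Run([&]() { /* runs inline if ws was idle, otherwise queued */ },
//           DEBUG_LOCATION);
//   ws->Schedule([&]() { /* always queued */ }, DEBUG_LOCATION);
//   ws->DrainQueue();  // Drains the queue, typically on the calling thread.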

void WorkSerializer::WorkSerializerImpl::Run(std::function<void()> callback,
                                             const DebugLocation& location) {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_work_serializer_trace)) {
    gpr_log(GPR_INFO, "WorkSerializer::Run() %p Scheduling callback [%s:%d]",
            this, location.file(), location.line());
  }
  // Increment queue size for the new callback and owner count to attempt to
  // take ownership of the WorkSerializer.
  const uint64_t prev_ref_pair =
      refs_.fetch_add(MakeRefPair(1, 1), std::memory_order_acq_rel);
  // The work serializer should not have been orphaned.
  GPR_DEBUG_ASSERT(GetSize(prev_ref_pair) > 0);
  if (GetOwners(prev_ref_pair) == 0) {
    // We took ownership of the WorkSerializer. Invoke callback and drain queue.
    if (GRPC_TRACE_FLAG_ENABLED(grpc_work_serializer_trace)) {
      gpr_log(GPR_INFO, "  Executing immediately");
    }
    callback();
    DrainQueueOwned();
  } else {
    // Another thread is holding the WorkSerializer, so decrement the ownership
    // count we just added and queue the callback.
    refs_.fetch_sub(MakeRefPair(1, 0), std::memory_order_acq_rel);
    CallbackWrapper* cb_wrapper =
        new CallbackWrapper(std::move(callback), location);
    if (GRPC_TRACE_FLAG_ENABLED(grpc_work_serializer_trace)) {
      gpr_log(GPR_INFO, "  Scheduling on queue : item %p", cb_wrapper);
    }
    queue_.Push(&cb_wrapper->mpscq_node);
  }
}

void WorkSerializer::WorkSerializerImpl::Schedule(
    std::function<void()> callback, const DebugLocation& location) {
  CallbackWrapper* cb_wrapper =
      new CallbackWrapper(std::move(callback), location);
  if (GRPC_TRACE_FLAG_ENABLED(grpc_work_serializer_trace)) {
    gpr_log(GPR_INFO,
            "WorkSerializer::Schedule() %p Scheduling callback %p [%s:%d]",
            this, cb_wrapper, location.file(), location.line());
  }
  refs_.fetch_add(MakeRefPair(0, 1), std::memory_order_acq_rel);
  queue_.Push(&cb_wrapper->mpscq_node);
}

void WorkSerializer::WorkSerializerImpl::Orphan() {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_work_serializer_trace)) {
    gpr_log(GPR_INFO, "WorkSerializer::Orphan() %p", this);
  }
  const uint64_t prev_ref_pair =
      refs_.fetch_sub(MakeRefPair(0, 1), std::memory_order_acq_rel);
  if (GetOwners(prev_ref_pair) == 0 && GetSize(prev_ref_pair) == 1) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_work_serializer_trace)) {
      gpr_log(GPR_INFO, "  Destroying");
    }
    delete this;
  }
}

// The thread that calls this loans itself to the work serializer so as to
// execute all the scheduled callbacks.
void WorkSerializer::WorkSerializerImpl::DrainQueue() {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_work_serializer_trace)) {
    gpr_log(GPR_INFO, "WorkSerializer::DrainQueue() %p", this);
  }
  // Attempt to take ownership of the WorkSerializer. Also increment the queue
  // size as required by `DrainQueueOwned()`.
  const uint64_t prev_ref_pair =
      refs_.fetch_add(MakeRefPair(1, 1), std::memory_order_acq_rel);
  if (GetOwners(prev_ref_pair) == 0) {
    // We took ownership of the WorkSerializer. Drain the queue.
    DrainQueueOwned();
  } else {
    // Another thread is holding the WorkSerializer, so decrement the ownership
    // count we just added and queue a no-op callback. (The queue size was
    // already incremented above, so the current owner expects a matching
    // queue entry.)
    refs_.fetch_sub(MakeRefPair(1, 0), std::memory_order_acq_rel);
    CallbackWrapper* cb_wrapper = new CallbackWrapper([]() {}, DEBUG_LOCATION);
    queue_.Push(&cb_wrapper->mpscq_node);
  }
}

void WorkSerializer::WorkSerializerImpl::DrainQueueOwned() {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_work_serializer_trace)) {
    gpr_log(GPR_INFO, "WorkSerializer::DrainQueueOwned() %p", this);
  }
  while (true) {
    auto prev_ref_pair = refs_.fetch_sub(MakeRefPair(0, 1));
    // It is possible that while draining the queue, the last callback ended
    // up orphaning the work serializer. In that case, delete the object.
    if (GetSize(prev_ref_pair) == 1) {
      if (GRPC_TRACE_FLAG_ENABLED(grpc_work_serializer_trace)) {
        gpr_log(GPR_INFO, "  Queue Drained. Destroying");
      }
      delete this;
      return;
    }
    if (GetSize(prev_ref_pair) == 2) {
      // Queue drained. Give up ownership but only if queue remains empty.
      uint64_t expected = MakeRefPair(1, 1);
      if (refs_.compare_exchange_strong(expected, MakeRefPair(0, 1),
                                        std::memory_order_acq_rel)) {
        // Queue is drained.
        return;
      }
      if (GetSize(expected) == 0) {
        // The WorkSerializer got orphaned while this was running.
        if (GRPC_TRACE_FLAG_ENABLED(grpc_work_serializer_trace)) {
          gpr_log(GPR_INFO, "  Queue Drained. Destroying");
        }
        delete this;
        return;
      }
    }
    // There is at least one callback on the queue. Pop the callback from the
    // queue and execute it.
    CallbackWrapper* cb_wrapper = nullptr;
    bool empty_unused;
    while ((cb_wrapper = reinterpret_cast<CallbackWrapper*>(
                queue_.PopAndCheckEnd(&empty_unused))) == nullptr) {
      // This can happen due to a race condition within the mpscq
      // implementation or because of a race with Run()/Schedule().
      if (GRPC_TRACE_FLAG_ENABLED(grpc_work_serializer_trace)) {
        gpr_log(GPR_INFO, "  Queue returned nullptr, trying again");
      }
    }
    if (GRPC_TRACE_FLAG_ENABLED(grpc_work_serializer_trace)) {
      gpr_log(GPR_INFO, "  Running item %p : callback scheduled at [%s:%d]",
              cb_wrapper, cb_wrapper->location.file(),
              cb_wrapper->location.line());
    }
    cb_wrapper->callback();
    delete cb_wrapper;
  }
}

//
// WorkSerializer
//

WorkSerializer::WorkSerializer()
    : impl_(MakeOrphanable<WorkSerializerImpl>()) {}

WorkSerializer::~WorkSerializer() {}

void WorkSerializer::Run(std::function<void()> callback,
                         const DebugLocation& location) {
  impl_->Run(std::move(callback), location);
}

void WorkSerializer::Schedule(std::function<void()> callback,
                              const DebugLocation& location) {
  impl_->Schedule(std::move(callback), location);
}

void WorkSerializer::DrainQueue() { impl_->DrainQueue(); }

}  // namespace grpc_core