1 //
2 // Copyright 2019 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16
17 #include <grpc/support/port_platform.h>
18
19 #include "src/core/lib/gprpp/work_serializer.h"
20
21 #include <stdint.h>
22
23 #include <atomic>
24 #include <functional>
25 #include <memory>
26 #include <utility>
27
28 #include <grpc/support/log.h>
29
30 #include "src/core/lib/debug/trace.h"
31 #include "src/core/lib/gprpp/debug_location.h"
32 #include "src/core/lib/gprpp/mpscq.h"
33 #include "src/core/lib/gprpp/orphanable.h"
34
35 namespace grpc_core {
36
// Trace flag consulted via GRPC_TRACE_FLAG_ENABLED() before every gpr_log()
// call in this file. NOTE(review): presumably `false` means "disabled by
// default" and "work_serializer" is the name used to switch it on -- confirm
// against src/core/lib/debug/trace.h.
37 DebugOnlyTraceFlag grpc_work_serializer_trace(false, "work_serializer");
38
39 //
40 // WorkSerializer::WorkSerializerImpl
41 //
42
43 class WorkSerializer::WorkSerializerImpl : public Orphanable {
44 public:
45 void Run(std::function<void()> callback, const DebugLocation& location);
46 void Schedule(std::function<void()> callback, const DebugLocation& location);
47 void DrainQueue();
48 void Orphan() override;
49
50 private:
51 struct CallbackWrapper {
CallbackWrappergrpc_core::WorkSerializer::WorkSerializerImpl::CallbackWrapper52 CallbackWrapper(std::function<void()> cb, const DebugLocation& loc)
53 : callback(std::move(cb)), location(loc) {}
54
55 MultiProducerSingleConsumerQueue::Node mpscq_node;
56 const std::function<void()> callback;
57 const DebugLocation location;
58 };
59
60 // Callers of DrainQueueOwned should make sure to grab the lock on the
61 // workserializer with
62 //
63 // prev_ref_pair =
64 // refs_.fetch_add(MakeRefPair(1, 1), std::memory_order_acq_rel);
65 //
66 // and only invoke DrainQueueOwned() if there was previously no owner. Note
67 // that the queue size is also incremented as part of the fetch_add to allow
68 // the callers to add a callback to the queue if another thread already holds
69 // the lock to the work serializer.
70 void DrainQueueOwned();
71
72 // First 16 bits indicate ownership of the WorkSerializer, next 48 bits are
73 // queue size (i.e., refs).
MakeRefPair(uint16_t owners,uint64_t size)74 static uint64_t MakeRefPair(uint16_t owners, uint64_t size) {
75 GPR_ASSERT(size >> 48 == 0);
76 return (static_cast<uint64_t>(owners) << 48) + static_cast<int64_t>(size);
77 }
GetOwners(uint64_t ref_pair)78 static uint32_t GetOwners(uint64_t ref_pair) {
79 return static_cast<uint32_t>(ref_pair >> 48);
80 }
GetSize(uint64_t ref_pair)81 static uint64_t GetSize(uint64_t ref_pair) {
82 return static_cast<uint64_t>(ref_pair & 0xffffffffffffu);
83 }
84
85 // An initial size of 1 keeps track of whether the work serializer has been
86 // orphaned.
87 std::atomic<uint64_t> refs_{MakeRefPair(0, 1)};
88 MultiProducerSingleConsumerQueue queue_;
89 };
90
Run(std::function<void ()> callback,const DebugLocation & location)91 void WorkSerializer::WorkSerializerImpl::Run(std::function<void()> callback,
92 const DebugLocation& location) {
93 if (GRPC_TRACE_FLAG_ENABLED(grpc_work_serializer_trace)) {
94 gpr_log(GPR_INFO, "WorkSerializer::Run() %p Scheduling callback [%s:%d]",
95 this, location.file(), location.line());
96 }
97 // Increment queue size for the new callback and owner count to attempt to
98 // take ownership of the WorkSerializer.
99 const uint64_t prev_ref_pair =
100 refs_.fetch_add(MakeRefPair(1, 1), std::memory_order_acq_rel);
101 // The work serializer should not have been orphaned.
102 GPR_DEBUG_ASSERT(GetSize(prev_ref_pair) > 0);
103 if (GetOwners(prev_ref_pair) == 0) {
104 // We took ownership of the WorkSerializer. Invoke callback and drain queue.
105 if (GRPC_TRACE_FLAG_ENABLED(grpc_work_serializer_trace)) {
106 gpr_log(GPR_INFO, " Executing immediately");
107 }
108 callback();
109 DrainQueueOwned();
110 } else {
111 // Another thread is holding the WorkSerializer, so decrement the ownership
112 // count we just added and queue the callback.
113 refs_.fetch_sub(MakeRefPair(1, 0), std::memory_order_acq_rel);
114 CallbackWrapper* cb_wrapper =
115 new CallbackWrapper(std::move(callback), location);
116 if (GRPC_TRACE_FLAG_ENABLED(grpc_work_serializer_trace)) {
117 gpr_log(GPR_INFO, " Scheduling on queue : item %p", cb_wrapper);
118 }
119 queue_.Push(&cb_wrapper->mpscq_node);
120 }
121 }
122
Schedule(std::function<void ()> callback,const DebugLocation & location)123 void WorkSerializer::WorkSerializerImpl::Schedule(
124 std::function<void()> callback, const DebugLocation& location) {
125 CallbackWrapper* cb_wrapper =
126 new CallbackWrapper(std::move(callback), location);
127 if (GRPC_TRACE_FLAG_ENABLED(grpc_work_serializer_trace)) {
128 gpr_log(GPR_INFO,
129 "WorkSerializer::Schedule() %p Scheduling callback %p [%s:%d]",
130 this, cb_wrapper, location.file(), location.line());
131 }
132 refs_.fetch_add(MakeRefPair(0, 1), std::memory_order_acq_rel);
133 queue_.Push(&cb_wrapper->mpscq_node);
134 }
135
Orphan()136 void WorkSerializer::WorkSerializerImpl::Orphan() {
137 if (GRPC_TRACE_FLAG_ENABLED(grpc_work_serializer_trace)) {
138 gpr_log(GPR_INFO, "WorkSerializer::Orphan() %p", this);
139 }
140 const uint64_t prev_ref_pair =
141 refs_.fetch_sub(MakeRefPair(0, 1), std::memory_order_acq_rel);
142 if (GetOwners(prev_ref_pair) == 0 && GetSize(prev_ref_pair) == 1) {
143 if (GRPC_TRACE_FLAG_ENABLED(grpc_work_serializer_trace)) {
144 gpr_log(GPR_INFO, " Destroying");
145 }
146 delete this;
147 }
148 }
149
150 // The thread that calls this loans itself to the work serializer so as to
151 // execute all the scheduled callbacks.
DrainQueue()152 void WorkSerializer::WorkSerializerImpl::DrainQueue() {
153 if (GRPC_TRACE_FLAG_ENABLED(grpc_work_serializer_trace)) {
154 gpr_log(GPR_INFO, "WorkSerializer::DrainQueue() %p", this);
155 }
156 // Attempt to take ownership of the WorkSerializer. Also increment the queue
157 // size as required by `DrainQueueOwned()`.
158 const uint64_t prev_ref_pair =
159 refs_.fetch_add(MakeRefPair(1, 1), std::memory_order_acq_rel);
160 if (GetOwners(prev_ref_pair) == 0) {
161 // We took ownership of the WorkSerializer. Drain the queue.
162 DrainQueueOwned();
163 } else {
164 // Another thread is holding the WorkSerializer, so decrement the ownership
165 // count we just added and queue a no-op callback.
166 refs_.fetch_sub(MakeRefPair(1, 0), std::memory_order_acq_rel);
167 CallbackWrapper* cb_wrapper = new CallbackWrapper([]() {}, DEBUG_LOCATION);
168 queue_.Push(&cb_wrapper->mpscq_node);
169 }
170 }
171
DrainQueueOwned()172 void WorkSerializer::WorkSerializerImpl::DrainQueueOwned() {
173 if (GRPC_TRACE_FLAG_ENABLED(grpc_work_serializer_trace)) {
174 gpr_log(GPR_INFO, "WorkSerializer::DrainQueueOwned() %p", this);
175 }
176 while (true) {
177 auto prev_ref_pair = refs_.fetch_sub(MakeRefPair(0, 1));
178 // It is possible that while draining the queue, the last callback ended
179 // up orphaning the work serializer. In that case, delete the object.
180 if (GetSize(prev_ref_pair) == 1) {
181 if (GRPC_TRACE_FLAG_ENABLED(grpc_work_serializer_trace)) {
182 gpr_log(GPR_INFO, " Queue Drained. Destroying");
183 }
184 delete this;
185 return;
186 }
187 if (GetSize(prev_ref_pair) == 2) {
188 // Queue drained. Give up ownership but only if queue remains empty.
189 uint64_t expected = MakeRefPair(1, 1);
190 if (refs_.compare_exchange_strong(expected, MakeRefPair(0, 1),
191 std::memory_order_acq_rel)) {
192 // Queue is drained.
193 return;
194 }
195 if (GetSize(expected) == 0) {
196 // WorkSerializer got orphaned while this was running
197 if (GRPC_TRACE_FLAG_ENABLED(grpc_work_serializer_trace)) {
198 gpr_log(GPR_INFO, " Queue Drained. Destroying");
199 }
200 delete this;
201 return;
202 }
203 }
204 // There is at least one callback on the queue. Pop the callback from the
205 // queue and execute it.
206 CallbackWrapper* cb_wrapper = nullptr;
207 bool empty_unused;
208 while ((cb_wrapper = reinterpret_cast<CallbackWrapper*>(
209 queue_.PopAndCheckEnd(&empty_unused))) == nullptr) {
210 // This can happen due to a race condition within the mpscq
211 // implementation or because of a race with Run()/Schedule().
212 if (GRPC_TRACE_FLAG_ENABLED(grpc_work_serializer_trace)) {
213 gpr_log(GPR_INFO, " Queue returned nullptr, trying again");
214 }
215 }
216 if (GRPC_TRACE_FLAG_ENABLED(grpc_work_serializer_trace)) {
217 gpr_log(GPR_INFO, " Running item %p : callback scheduled at [%s:%d]",
218 cb_wrapper, cb_wrapper->location.file(),
219 cb_wrapper->location.line());
220 }
221 cb_wrapper->callback();
222 delete cb_wrapper;
223 }
224 }
225
226 //
227 // WorkSerializer
228 //
229
WorkSerializer()230 WorkSerializer::WorkSerializer()
231 : impl_(MakeOrphanable<WorkSerializerImpl>()) {}
232
~WorkSerializer()233 WorkSerializer::~WorkSerializer() {}
234
Run(std::function<void ()> callback,const DebugLocation & location)235 void WorkSerializer::Run(std::function<void()> callback,
236 const DebugLocation& location) {
237 impl_->Run(std::move(callback), location);
238 }
239
Schedule(std::function<void ()> callback,const DebugLocation & location)240 void WorkSerializer::Schedule(std::function<void()> callback,
241 const DebugLocation& location) {
242 impl_->Schedule(std::move(callback), location);
243 }
244
DrainQueue()245 void WorkSerializer::DrainQueue() { impl_->DrainQueue(); }
246
247 } // namespace grpc_core
248