// Copyright 2023 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <grpc/support/port_platform.h>

#include "src/core/lib/promise/party.h"

#include <atomic>

#include "absl/base/thread_annotations.h"
#include "absl/strings/str_format.h"

#include <grpc/support/log.h>

#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/promise/activity.h"
#include "src/core/lib/promise/trace.h"

#ifdef GRPC_MAXIMIZE_THREADYNESS
#include "src/core/lib/gprpp/thd.h"       // IWYU pragma: keep
#include "src/core/lib/iomgr/exec_ctx.h"  // IWYU pragma: keep
#endif

grpc_core::DebugOnlyTraceFlag grpc_trace_party_state(false, "party_state");

namespace grpc_core {

namespace {
// TODO(ctiller): Once all activities are parties we can remove this.
thread_local Party** g_current_party_run_next = nullptr;
}  // namespace

///////////////////////////////////////////////////////////////////////////////
// PartySyncUsingAtomics

GRPC_MUST_USE_RESULT bool PartySyncUsingAtomics::RefIfNonZero() {
  auto count = state_.load(std::memory_order_relaxed);
  do {
    // If zero, we are done (without an increment). If not, we must do a CAS
    // to maintain the contract: do not increment the counter if it is
    // already zero.
    if (count == 0) {
      return false;
    }
  } while (!state_.compare_exchange_weak(count, count + kOneRef,
                                         std::memory_order_acq_rel,
                                         std::memory_order_relaxed));
  LogStateChange("RefIfNonZero", count, count + kOneRef);
  return true;
}
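
// RefIfNonZero() is how a non-owning Party::Handle (below) revives a party:
// it only takes a ref while the refcount is still non-zero, so a party that
// has already begun destruction is never resurrected by a stale waker.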

bool PartySyncUsingAtomics::UnreffedLast() {
  uint64_t prev_state =
      state_.fetch_or(kDestroying | kLocked, std::memory_order_acq_rel);
  LogStateChange("UnreffedLast", prev_state,
                 prev_state | kDestroying | kLocked);
  return (prev_state & kLocked) == 0;
}
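
// Note: a true return from UnreffedLast() means this thread just acquired
// the previously unheld lock, making the caller responsible for running the
// destruction path; otherwise, presumably, the thread already holding the
// lock observes kDestroying and finishes the job.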

bool PartySyncUsingAtomics::ScheduleWakeup(WakeupMask mask) {
  // OR in the wakeup bit for the participant, along with the locked bit.
  uint64_t prev_state = state_.fetch_or((mask & kWakeupMask) | kLocked,
                                        std::memory_order_acq_rel);
  LogStateChange("ScheduleWakeup", prev_state,
                 prev_state | (mask & kWakeupMask) | kLocked);
  // If the lock was not previously held, we now hold it, so we need to run.
  return ((prev_state & kLocked) == 0);
}
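
// A true return transfers lock ownership to the caller: see Party::Wakeup()
// below, which responds by calling RunLocked() to drain the scheduled
// wakeups.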

///////////////////////////////////////////////////////////////////////////////
// PartySyncUsingMutex

bool PartySyncUsingMutex::ScheduleWakeup(WakeupMask mask) {
  MutexLock lock(&mu_);
  wakeups_ |= mask;
  return !std::exchange(locked_, true);
}
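
// Mutex-based analogue of the atomic version above: record the wakeup bits
// and report whether we transitioned the party from unlocked to locked.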

///////////////////////////////////////////////////////////////////////////////
// Party::Handle

// Weak handle to a Party.
// Handle can persist while Party goes away.
class Party::Handle final : public Wakeable {
 public:
  explicit Handle(Party* party) : party_(party) {}

  // Ref the Handle (not the activity).
  void Ref() { refs_.fetch_add(1, std::memory_order_relaxed); }

  // Activity is going away... drop its reference and sever the connection
  // back.
  void DropActivity() ABSL_LOCKS_EXCLUDED(mu_) {
    mu_.Lock();
    GPR_ASSERT(party_ != nullptr);
    party_ = nullptr;
    mu_.Unlock();
    Unref();
  }

  void WakeupGeneric(WakeupMask wakeup_mask,
                     void (Party::*wakeup_method)(WakeupMask))
      ABSL_LOCKS_EXCLUDED(mu_) {
    mu_.Lock();
    // Note that the activity's refcount can drop to zero, but we could win
    // the lock against DropActivity, so we must only increase the activity's
    // refcount if it is non-zero.
    Party* party = party_;
    if (party != nullptr && party->RefIfNonZero()) {
      mu_.Unlock();
      // Activity still exists and we have a reference: wake it up, which
      // will drop the ref.
      (party->*wakeup_method)(wakeup_mask);
    } else {
      // Could not get the activity - it's either gone or going. No need to
      // wake it up!
      mu_.Unlock();
    }
    // Drop the ref to the handle (we have one ref = one wakeup semantics).
    Unref();
  }

  // Activity needs to wake up (if it still exists!) - wake it up, and drop
  // the ref that was kept for this handle.
  void Wakeup(WakeupMask wakeup_mask) override ABSL_LOCKS_EXCLUDED(mu_) {
    WakeupGeneric(wakeup_mask, &Party::Wakeup);
  }

  void WakeupAsync(WakeupMask wakeup_mask) override ABSL_LOCKS_EXCLUDED(mu_) {
    WakeupGeneric(wakeup_mask, &Party::WakeupAsync);
  }

  void Drop(WakeupMask) override { Unref(); }

  std::string ActivityDebugTag(WakeupMask) const override {
    MutexLock lock(&mu_);
    return party_ == nullptr ? "<unknown>" : party_->DebugTag();
  }

 private:
  // Unref the Handle (not the activity).
  void Unref() {
    if (1 == refs_.fetch_sub(1, std::memory_order_acq_rel)) {
      delete this;
    }
  }

  // Two initial refs: one for the waiter that caused instantiation, one for
  // the party.
  std::atomic<size_t> refs_{2};
  mutable Mutex mu_;
  Party* party_ ABSL_GUARDED_BY(mu_);
};
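
// Lifecycle sketch: a participant lazily creates at most one Handle (refs_
// starts at 2: one ref for the party's participant, one for the first
// waiter). Each additional waiter takes another ref via Ref(); each Wakeup(),
// WakeupAsync(), or Drop() consumes one. DropActivity() severs party_ and
// releases the participant's ref, after which any remaining wakeups become
// harmless no-ops.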

Wakeable* Party::Participant::MakeNonOwningWakeable(Party* party) {
  if (handle_ == nullptr) {
    handle_ = new Handle(party);
    return handle_;
  }
  handle_->Ref();
  return handle_;
}
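
// Each pointer handed out above corresponds to exactly one future Wakeup()
// or Drop() call on the handle - the "one ref = one wakeup" semantics noted
// in Party::Handle::WakeupGeneric().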

Party::Participant::~Participant() {
  if (handle_ != nullptr) {
    handle_->DropActivity();
  }
}

Party::~Party() {}

void Party::CancelRemainingParticipants() {
  ScopedActivity activity(this);
  for (size_t i = 0; i < party_detail::kMaxParticipants; i++) {
    if (auto* p =
            participants_[i].exchange(nullptr, std::memory_order_acquire)) {
      p->Destroy();
    }
  }
}
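
// Destroying a participant here tears down whatever promise it still holds,
// so work that never completed is cancelled rather than leaked.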

std::string Party::ActivityDebugTag(WakeupMask wakeup_mask) const {
  return absl::StrFormat("%s [parts:%x]", DebugTag(), wakeup_mask);
}

Waker Party::MakeOwningWaker() {
  GPR_DEBUG_ASSERT(currently_polling_ != kNotPolling);
  IncrementRefCount();
  return Waker(this, 1u << currently_polling_);
}
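
// The owning waker holds a party ref; Party::Wakeup()/Party::Drop() below
// release it, so the party cannot be freed while such a waker is pending.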

Waker Party::MakeNonOwningWaker() {
  GPR_DEBUG_ASSERT(currently_polling_ != kNotPolling);
  return Waker(participants_[currently_polling_]
                   .load(std::memory_order_relaxed)
                   ->MakeNonOwningWakeable(this),
               1u << currently_polling_);
}
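
// The non-owning waker routes through Party::Handle instead of holding a
// party ref: the party may be destroyed while the waker is outstanding, and
// the handle's RefIfNonZero() check turns a late wakeup into a no-op.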

void Party::ForceImmediateRepoll(WakeupMask mask) {
  GPR_DEBUG_ASSERT(is_current());
  sync_.ForceImmediateRepoll(mask);
}

void Party::RunLocked() {
  // If there is a party running, then we don't run it immediately
  // but instead add it to the end of the list of parties to run.
  // This enables a fairly straightforward batching of work from a
  // call to a transport (or back again).
  if (g_current_party_run_next != nullptr) {
    if (*g_current_party_run_next == nullptr) {
      *g_current_party_run_next = this;
    } else {
      // But if there's already a party queued, we're better off asking event
      // engine to run it so we can spread load.
      event_engine()->Run([this]() {
        ApplicationCallbackExecCtx app_exec_ctx;
        ExecCtx exec_ctx;
        RunLocked();
      });
    }
    return;
  }
  auto body = [this]() {
    GPR_DEBUG_ASSERT(g_current_party_run_next == nullptr);
    Party* run_next = nullptr;
    g_current_party_run_next = &run_next;
    const bool done = RunParty();
    GPR_DEBUG_ASSERT(g_current_party_run_next == &run_next);
    g_current_party_run_next = nullptr;
    if (done) {
      ScopedActivity activity(this);
      PartyOver();
    }
    if (run_next != nullptr) {
      run_next->RunLocked();
    }
  };
#ifdef GRPC_MAXIMIZE_THREADYNESS
  Thread thd(
      "RunParty",
      [body]() {
        ApplicationCallbackExecCtx app_exec_ctx;
        ExecCtx exec_ctx;
        body();
      },
      nullptr, Thread::Options().set_joinable(false));
  thd.Start();
#else
  body();
#endif
}
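
// Batching example: if party A is mid-RunParty() on this thread and wakes
// party B, B lands in A's run_next slot and runs on this thread as soon as A
// finishes. If A then wakes a third party C while B is still queued, the slot
// is taken, so C is handed to the EventEngine to spread load across threads.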

bool Party::RunParty() {
  ScopedActivity activity(this);
  return sync_.RunParty([this](int i) { return RunOneParticipant(i); });
}

bool Party::RunOneParticipant(int i) {
  // If the participant is null, skip.
  // This allows participants to complete whilst wakers still exist
  // somewhere.
  auto* participant = participants_[i].load(std::memory_order_acquire);
  if (participant == nullptr) {
    if (grpc_trace_promise_primitives.enabled()) {
      gpr_log(GPR_DEBUG, "%s[party] wakeup %d already complete",
              DebugTag().c_str(), i);
    }
    return false;
  }
  absl::string_view name;
  if (grpc_trace_promise_primitives.enabled()) {
    name = participant->name();
    gpr_log(GPR_DEBUG, "%s[%s] begin job %d", DebugTag().c_str(),
            std::string(name).c_str(), i);
  }
  // Poll the participant.
  currently_polling_ = i;
  bool done = participant->PollParticipantPromise();
  currently_polling_ = kNotPolling;
  if (done) {
    if (!name.empty()) {
      gpr_log(GPR_DEBUG, "%s[%s] end poll and finish job %d",
              DebugTag().c_str(), std::string(name).c_str(), i);
    }
    participants_[i].store(nullptr, std::memory_order_relaxed);
  } else if (!name.empty()) {
    gpr_log(GPR_DEBUG, "%s[%s] end poll", DebugTag().c_str(),
            std::string(name).c_str());
  }
  return done;
}
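
// Once a participant completes, its slot is nulled, so later wakeups aimed
// at that index hit the early-out at the top of RunOneParticipant() rather
// than polling a dead promise.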

void Party::AddParticipants(Participant** participants, size_t count) {
  bool run_party = sync_.AddParticipantsAndRef(count, [this, participants,
                                                       count](size_t* slots) {
    for (size_t i = 0; i < count; i++) {
      if (grpc_trace_party_state.enabled()) {
        gpr_log(GPR_DEBUG, "Party %p AddParticipant: %s @ %" PRIdPTR, &sync_,
                std::string(participants[i]->name()).c_str(), slots[i]);
      }
      participants_[slots[i]].store(participants[i],
                                    std::memory_order_release);
    }
  });
  if (run_party) RunLocked();
  Unref();
}
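
// AddParticipantsAndRef takes a temporary party ref (as its name suggests)
// so the party stays alive across the possible RunLocked() call; the
// matching Unref() is the last statement above.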

void Party::Wakeup(WakeupMask wakeup_mask) {
  if (sync_.ScheduleWakeup(wakeup_mask)) RunLocked();
  Unref();
}
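
// Wakeup() consumes the caller's ref (one ref = one wakeup): either we won
// the lock and ran the party ourselves, or the current lock holder will see
// the scheduled wakeup bits and poll on our behalf.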

void Party::WakeupAsync(WakeupMask wakeup_mask) {
  if (sync_.ScheduleWakeup(wakeup_mask)) {
    event_engine()->Run([this]() {
      ApplicationCallbackExecCtx app_exec_ctx;
      ExecCtx exec_ctx;
      RunLocked();
      Unref();
    });
  } else {
    Unref();
  }
}
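
// The async variant defers the actual poll to the EventEngine instead of
// running it inline, presumably for callers that must not re-enter party
// code on the current stack; the ref is held until the deferred RunLocked()
// completes.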

void Party::Drop(WakeupMask) { Unref(); }

void Party::PartyIsOver() {
  ScopedActivity activity(this);
  PartyOver();
}

}  // namespace grpc_core