1 // Copyright 2023 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 #include <grpc/support/port_platform.h>
16
17 #include "src/core/lib/promise/party.h"
18
19 #include <atomic>
20 #include <initializer_list>
21
22 #include "absl/base/thread_annotations.h"
23 #include "absl/strings/str_format.h"
24
25 #include <grpc/support/log.h>
26
27 #include "src/core/lib/debug/trace.h"
28 #include "src/core/lib/gprpp/sync.h"
29 #include "src/core/lib/iomgr/exec_ctx.h"
30 #include "src/core/lib/promise/activity.h"
31 #include "src/core/lib/promise/trace.h"
32
33 #ifdef GRPC_MAXIMIZE_THREADYNESS
34 #include "src/core/lib/gprpp/thd.h" // IWYU pragma: keep
35 #include "src/core/lib/iomgr/exec_ctx.h" // IWYU pragma: keep
36 #endif
37
38 namespace grpc_core {
39
40 ///////////////////////////////////////////////////////////////////////////////
41 // PartySyncUsingAtomics
42
RefIfNonZero()43 GRPC_MUST_USE_RESULT bool PartySyncUsingAtomics::RefIfNonZero() {
44 auto count = state_.load(std::memory_order_relaxed);
45 do {
46 // If zero, we are done (without an increment). If not, we must do a CAS
47 // to maintain the contract: do not increment the counter if it is already
48 // zero
49 if (count == 0) {
50 return false;
51 }
52 } while (!state_.compare_exchange_weak(count, count + kOneRef,
53 std::memory_order_acq_rel,
54 std::memory_order_relaxed));
55 return true;
56 }
57
UnreffedLast()58 bool PartySyncUsingAtomics::UnreffedLast() {
59 uint64_t prev_state =
60 state_.fetch_or(kDestroying | kLocked, std::memory_order_acq_rel);
61 return (prev_state & kLocked) == 0;
62 }
63
ScheduleWakeup(WakeupMask mask)64 bool PartySyncUsingAtomics::ScheduleWakeup(WakeupMask mask) {
65 // Or in the wakeup bit for the participant, AND the locked bit.
66 uint64_t prev_state = state_.fetch_or((mask & kWakeupMask) | kLocked,
67 std::memory_order_acq_rel);
68 // If the lock was not held now we hold it, so we need to run.
69 return ((prev_state & kLocked) == 0);
70 }
71
72 ///////////////////////////////////////////////////////////////////////////////
73 // PartySyncUsingMutex
74
ScheduleWakeup(WakeupMask mask)75 bool PartySyncUsingMutex::ScheduleWakeup(WakeupMask mask) {
76 MutexLock lock(&mu_);
77 wakeups_ |= mask;
78 return !std::exchange(locked_, true);
79 }
80
81 ///////////////////////////////////////////////////////////////////////////////
82 // Party::Handle
83
84 // Weak handle to a Party.
85 // Handle can persist while Party goes away.
// Weak handle to a Party.
// Handle can persist while Party goes away.
class Party::Handle final : public Wakeable {
 public:
  explicit Handle(Party* party) : party_(party) {}

  // Ref the Handle (not the activity).
  void Ref() { refs_.fetch_add(1, std::memory_order_relaxed); }

  // Activity is going away... drop its reference and sever the connection back.
  void DropActivity() ABSL_LOCKS_EXCLUDED(mu_) {
    mu_.Lock();
    GPR_ASSERT(party_ != nullptr);
    party_ = nullptr;
    mu_.Unlock();
    Unref();
  }

  // Shared implementation of Wakeup()/WakeupAsync(): resolve the weak pointer
  // back to the Party (if it still exists), take a strong ref on it, then call
  // wakeup_method - which consumes that ref.
  void WakeupGeneric(WakeupMask wakeup_mask,
                     void (Party::*wakeup_method)(WakeupMask))
      ABSL_LOCKS_EXCLUDED(mu_) {
    mu_.Lock();
    // Note that activity refcount can drop to zero, but we could win the lock
    // against DropActivity, so we need to only increase activities refcount if
    // it is non-zero.
    Party* party = party_;
    if (party != nullptr && party->RefIfNonZero()) {
      // Unlock before waking: the wakeup may run arbitrary party code and must
      // not be performed while holding mu_.
      mu_.Unlock();
      // Activity still exists and we have a reference: wake it up, which will
      // drop the ref.
      (party->*wakeup_method)(wakeup_mask);
    } else {
      // Could not get the activity - it's either gone or going. No need to wake
      // it up!
      mu_.Unlock();
    }
    // Drop the ref to the handle (we have one ref = one wakeup semantics).
    Unref();
  }

  // Activity needs to wake up (if it still exists!) - wake it up, and drop the
  // ref that was kept for this handle.
  void Wakeup(WakeupMask wakeup_mask) override ABSL_LOCKS_EXCLUDED(mu_) {
    WakeupGeneric(wakeup_mask, &Party::Wakeup);
  }

  // As Wakeup, but via Party::WakeupAsync (which defers execution rather than
  // potentially running the party on this thread).
  void WakeupAsync(WakeupMask wakeup_mask) override ABSL_LOCKS_EXCLUDED(mu_) {
    WakeupGeneric(wakeup_mask, &Party::WakeupAsync);
  }

  // Drop a pending wakeup without delivering it: just release this waker's
  // ref on the handle.
  void Drop(WakeupMask) override { Unref(); }

  // Debug tag of the underlying party, or "<unknown>" once it has gone away.
  std::string ActivityDebugTag(WakeupMask) const override {
    MutexLock lock(&mu_);
    return party_ == nullptr ? "<unknown>" : party_->DebugTag();
  }

 private:
  // Unref the Handle (not the activity).
  void Unref() {
    // acq_rel so the thread performing the final unref observes all prior
    // writes to the handle made by other unref-ing threads before deleting.
    if (1 == refs_.fetch_sub(1, std::memory_order_acq_rel)) {
      delete this;
    }
  }

  // Two initial refs: one for the waiter that caused instantiation, one for the
  // party.
  std::atomic<size_t> refs_{2};
  // Guards party_ against the race between wakeups and DropActivity.
  mutable Mutex mu_;
  // Weak back-pointer to the party; nulled by DropActivity.
  Party* party_ ABSL_GUARDED_BY(mu_);
};
155
MakeNonOwningWakeable(Party * party)156 Wakeable* Party::Participant::MakeNonOwningWakeable(Party* party) {
157 if (handle_ == nullptr) {
158 handle_ = new Handle(party);
159 return handle_;
160 }
161 handle_->Ref();
162 return handle_;
163 }
164
~Participant()165 Party::Participant::~Participant() {
166 if (handle_ != nullptr) {
167 handle_->DropActivity();
168 }
169 }
170
~Party()171 Party::~Party() {}
172
CancelRemainingParticipants()173 void Party::CancelRemainingParticipants() {
174 ScopedActivity activity(this);
175 promise_detail::Context<Arena> arena_ctx(arena_);
176 for (size_t i = 0; i < party_detail::kMaxParticipants; i++) {
177 if (auto* p =
178 participants_[i].exchange(nullptr, std::memory_order_acquire)) {
179 p->Destroy();
180 }
181 }
182 }
183
ActivityDebugTag(WakeupMask wakeup_mask) const184 std::string Party::ActivityDebugTag(WakeupMask wakeup_mask) const {
185 return absl::StrFormat("%s [parts:%x]", DebugTag(), wakeup_mask);
186 }
187
MakeOwningWaker()188 Waker Party::MakeOwningWaker() {
189 GPR_DEBUG_ASSERT(currently_polling_ != kNotPolling);
190 IncrementRefCount();
191 return Waker(this, 1u << currently_polling_);
192 }
193
MakeNonOwningWaker()194 Waker Party::MakeNonOwningWaker() {
195 GPR_DEBUG_ASSERT(currently_polling_ != kNotPolling);
196 return Waker(participants_[currently_polling_]
197 .load(std::memory_order_relaxed)
198 ->MakeNonOwningWakeable(this),
199 1u << currently_polling_);
200 }
201
// Mark the masked participants for re-polling in the current run.
// Only legal from within this party's own poll loop.
void Party::ForceImmediateRepoll(WakeupMask mask) {
  GPR_DEBUG_ASSERT(is_current());
  sync_.ForceImmediateRepoll(mask);
}
206
// Run the party's poll loop, assuming the caller already owns the lock.
// If the run reports that the party is complete, tear the party down.
void Party::RunLocked() {
  auto body = [this]() {
    if (RunParty()) {
      // RunParty returned true => party is done; enter the activity context
      // so teardown code sees a current activity, then destroy the party.
      ScopedActivity activity(this);
      PartyOver();
    }
  };
#ifdef GRPC_MAXIMIZE_THREADYNESS
  // Stress mode: execute the body on a fresh, non-joinable thread with its
  // own exec ctx to maximize thread interleavings during testing.
  Thread thd(
      "RunParty",
      [body]() {
        ApplicationCallbackExecCtx app_exec_ctx;
        ExecCtx exec_ctx;
        body();
      },
      nullptr, Thread::Options().set_joinable(false));
  thd.Start();
#else
  body();
#endif
}
228
// Poll woken participants until there is no more work queued.
// Returns true when the entire party has completed (per sync_.RunParty).
bool Party::RunParty() {
  // Enter this party's activity & arena contexts for the duration of polling.
  ScopedActivity activity(this);
  promise_detail::Context<Arena> arena_ctx(arena_);
  // The callback is invoked once per woken participant index; it returns true
  // iff that participant finished.
  return sync_.RunParty([this](int i) {
    // If the participant is null, skip.
    // This allows participants to complete whilst wakers still exist
    // somewhere.
    auto* participant = participants_[i].load(std::memory_order_acquire);
    if (participant == nullptr) {
      if (grpc_trace_promise_primitives.enabled()) {
        gpr_log(GPR_DEBUG, "%s[party] wakeup %d already complete",
                DebugTag().c_str(), i);
      }
      return false;
    }
    // name is only populated when tracing is enabled; the logging below uses
    // !name.empty() as the "tracing active" signal.
    absl::string_view name;
    if (grpc_trace_promise_primitives.enabled()) {
      name = participant->name();
      gpr_log(GPR_DEBUG, "%s[%s] begin job %d", DebugTag().c_str(),
              std::string(name).c_str(), i);
    }
    // Poll the participant.
    // currently_polling_ is set so wakers created during the poll can target
    // this participant's bit.
    currently_polling_ = i;
    bool done = participant->Poll();
    currently_polling_ = kNotPolling;
    if (done) {
      if (!name.empty()) {
        gpr_log(GPR_DEBUG, "%s[%s] end poll and finish job %d",
                DebugTag().c_str(), std::string(name).c_str(), i);
      }
      // Participant finished: clear its slot so the index can be reused.
      participants_[i].store(nullptr, std::memory_order_relaxed);
    } else if (!name.empty()) {
      gpr_log(GPR_DEBUG, "%s[%s] end poll", DebugTag().c_str(),
              std::string(name).c_str());
    }
    return done;
  });
}
267
AddParticipants(Participant ** participants,size_t count)268 void Party::AddParticipants(Participant** participants, size_t count) {
269 bool run_party = sync_.AddParticipantsAndRef(count, [this, participants,
270 count](size_t* slots) {
271 for (size_t i = 0; i < count; i++) {
272 participants_[slots[i]].store(participants[i], std::memory_order_release);
273 }
274 });
275 if (run_party) RunLocked();
276 Unref();
277 }
278
Wakeup(WakeupMask wakeup_mask)279 void Party::Wakeup(WakeupMask wakeup_mask) {
280 if (sync_.ScheduleWakeup(wakeup_mask)) RunLocked();
281 Unref();
282 }
283
WakeupAsync(WakeupMask wakeup_mask)284 void Party::WakeupAsync(WakeupMask wakeup_mask) {
285 if (sync_.ScheduleWakeup(wakeup_mask)) {
286 event_engine()->Run([this]() {
287 ApplicationCallbackExecCtx app_exec_ctx;
288 ExecCtx exec_ctx;
289 RunLocked();
290 Unref();
291 });
292 } else {
293 Unref();
294 }
295 }
296
Drop(WakeupMask)297 void Party::Drop(WakeupMask) { Unref(); }
298
// Enter the activity context (so teardown code sees a current activity)
// and then destroy the party.
void Party::PartyIsOver() {
  ScopedActivity activity(this);
  PartyOver();
}
303
304 } // namespace grpc_core
305