1 // Copyright 2023 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include <grpc/support/port_platform.h>
16 
17 #include "src/core/lib/promise/party.h"
18 
19 #include <atomic>
20 #include <initializer_list>
21 
22 #include "absl/base/thread_annotations.h"
23 #include "absl/strings/str_format.h"
24 
25 #include <grpc/support/log.h>
26 
27 #include "src/core/lib/debug/trace.h"
28 #include "src/core/lib/gprpp/sync.h"
29 #include "src/core/lib/iomgr/exec_ctx.h"
30 #include "src/core/lib/promise/activity.h"
31 #include "src/core/lib/promise/trace.h"
32 
33 #ifdef GRPC_MAXIMIZE_THREADYNESS
34 #include "src/core/lib/gprpp/thd.h"       // IWYU pragma: keep
35 #include "src/core/lib/iomgr/exec_ctx.h"  // IWYU pragma: keep
36 #endif
37 
38 namespace grpc_core {
39 
40 ///////////////////////////////////////////////////////////////////////////////
41 // PartySyncUsingAtomics
42 
RefIfNonZero()43 GRPC_MUST_USE_RESULT bool PartySyncUsingAtomics::RefIfNonZero() {
44   auto count = state_.load(std::memory_order_relaxed);
45   do {
46     // If zero, we are done (without an increment). If not, we must do a CAS
47     // to maintain the contract: do not increment the counter if it is already
48     // zero
49     if (count == 0) {
50       return false;
51     }
52   } while (!state_.compare_exchange_weak(count, count + kOneRef,
53                                          std::memory_order_acq_rel,
54                                          std::memory_order_relaxed));
55   return true;
56 }
57 
UnreffedLast()58 bool PartySyncUsingAtomics::UnreffedLast() {
59   uint64_t prev_state =
60       state_.fetch_or(kDestroying | kLocked, std::memory_order_acq_rel);
61   return (prev_state & kLocked) == 0;
62 }
63 
ScheduleWakeup(WakeupMask mask)64 bool PartySyncUsingAtomics::ScheduleWakeup(WakeupMask mask) {
65   // Or in the wakeup bit for the participant, AND the locked bit.
66   uint64_t prev_state = state_.fetch_or((mask & kWakeupMask) | kLocked,
67                                         std::memory_order_acq_rel);
68   // If the lock was not held now we hold it, so we need to run.
69   return ((prev_state & kLocked) == 0);
70 }
71 
72 ///////////////////////////////////////////////////////////////////////////////
73 // PartySyncUsingMutex
74 
ScheduleWakeup(WakeupMask mask)75 bool PartySyncUsingMutex::ScheduleWakeup(WakeupMask mask) {
76   MutexLock lock(&mu_);
77   wakeups_ |= mask;
78   return !std::exchange(locked_, true);
79 }
80 
81 ///////////////////////////////////////////////////////////////////////////////
82 // Party::Handle
83 
84 // Weak handle to a Party.
85 // Handle can persist while Party goes away.
86 class Party::Handle final : public Wakeable {
87  public:
Handle(Party * party)88   explicit Handle(Party* party) : party_(party) {}
89 
90   // Ref the Handle (not the activity).
Ref()91   void Ref() { refs_.fetch_add(1, std::memory_order_relaxed); }
92 
93   // Activity is going away... drop its reference and sever the connection back.
DropActivity()94   void DropActivity() ABSL_LOCKS_EXCLUDED(mu_) {
95     mu_.Lock();
96     GPR_ASSERT(party_ != nullptr);
97     party_ = nullptr;
98     mu_.Unlock();
99     Unref();
100   }
101 
WakeupGeneric(WakeupMask wakeup_mask,void (Party::* wakeup_method)(WakeupMask))102   void WakeupGeneric(WakeupMask wakeup_mask,
103                      void (Party::*wakeup_method)(WakeupMask))
104       ABSL_LOCKS_EXCLUDED(mu_) {
105     mu_.Lock();
106     // Note that activity refcount can drop to zero, but we could win the lock
107     // against DropActivity, so we need to only increase activities refcount if
108     // it is non-zero.
109     Party* party = party_;
110     if (party != nullptr && party->RefIfNonZero()) {
111       mu_.Unlock();
112       // Activity still exists and we have a reference: wake it up, which will
113       // drop the ref.
114       (party->*wakeup_method)(wakeup_mask);
115     } else {
116       // Could not get the activity - it's either gone or going. No need to wake
117       // it up!
118       mu_.Unlock();
119     }
120     // Drop the ref to the handle (we have one ref = one wakeup semantics).
121     Unref();
122   }
123 
124   // Activity needs to wake up (if it still exists!) - wake it up, and drop the
125   // ref that was kept for this handle.
Wakeup(WakeupMask wakeup_mask)126   void Wakeup(WakeupMask wakeup_mask) override ABSL_LOCKS_EXCLUDED(mu_) {
127     WakeupGeneric(wakeup_mask, &Party::Wakeup);
128   }
129 
WakeupAsync(WakeupMask wakeup_mask)130   void WakeupAsync(WakeupMask wakeup_mask) override ABSL_LOCKS_EXCLUDED(mu_) {
131     WakeupGeneric(wakeup_mask, &Party::WakeupAsync);
132   }
133 
Drop(WakeupMask)134   void Drop(WakeupMask) override { Unref(); }
135 
ActivityDebugTag(WakeupMask) const136   std::string ActivityDebugTag(WakeupMask) const override {
137     MutexLock lock(&mu_);
138     return party_ == nullptr ? "<unknown>" : party_->DebugTag();
139   }
140 
141  private:
142   // Unref the Handle (not the activity).
Unref()143   void Unref() {
144     if (1 == refs_.fetch_sub(1, std::memory_order_acq_rel)) {
145       delete this;
146     }
147   }
148 
149   // Two initial refs: one for the waiter that caused instantiation, one for the
150   // party.
151   std::atomic<size_t> refs_{2};
152   mutable Mutex mu_;
153   Party* party_ ABSL_GUARDED_BY(mu_);
154 };
155 
MakeNonOwningWakeable(Party * party)156 Wakeable* Party::Participant::MakeNonOwningWakeable(Party* party) {
157   if (handle_ == nullptr) {
158     handle_ = new Handle(party);
159     return handle_;
160   }
161   handle_->Ref();
162   return handle_;
163 }
164 
~Participant()165 Party::Participant::~Participant() {
166   if (handle_ != nullptr) {
167     handle_->DropActivity();
168   }
169 }
170 
~Party()171 Party::~Party() {}
172 
CancelRemainingParticipants()173 void Party::CancelRemainingParticipants() {
174   ScopedActivity activity(this);
175   promise_detail::Context<Arena> arena_ctx(arena_);
176   for (size_t i = 0; i < party_detail::kMaxParticipants; i++) {
177     if (auto* p =
178             participants_[i].exchange(nullptr, std::memory_order_acquire)) {
179       p->Destroy();
180     }
181   }
182 }
183 
ActivityDebugTag(WakeupMask wakeup_mask) const184 std::string Party::ActivityDebugTag(WakeupMask wakeup_mask) const {
185   return absl::StrFormat("%s [parts:%x]", DebugTag(), wakeup_mask);
186 }
187 
MakeOwningWaker()188 Waker Party::MakeOwningWaker() {
189   GPR_DEBUG_ASSERT(currently_polling_ != kNotPolling);
190   IncrementRefCount();
191   return Waker(this, 1u << currently_polling_);
192 }
193 
MakeNonOwningWaker()194 Waker Party::MakeNonOwningWaker() {
195   GPR_DEBUG_ASSERT(currently_polling_ != kNotPolling);
196   return Waker(participants_[currently_polling_]
197                    .load(std::memory_order_relaxed)
198                    ->MakeNonOwningWakeable(this),
199                1u << currently_polling_);
200 }
201 
ForceImmediateRepoll(WakeupMask mask)202 void Party::ForceImmediateRepoll(WakeupMask mask) {
203   GPR_DEBUG_ASSERT(is_current());
204   sync_.ForceImmediateRepoll(mask);
205 }
206 
RunLocked()207 void Party::RunLocked() {
208   auto body = [this]() {
209     if (RunParty()) {
210       ScopedActivity activity(this);
211       PartyOver();
212     }
213   };
214 #ifdef GRPC_MAXIMIZE_THREADYNESS
215   Thread thd(
216       "RunParty",
217       [body]() {
218         ApplicationCallbackExecCtx app_exec_ctx;
219         ExecCtx exec_ctx;
220         body();
221       },
222       nullptr, Thread::Options().set_joinable(false));
223   thd.Start();
224 #else
225   body();
226 #endif
227 }
228 
RunParty()229 bool Party::RunParty() {
230   ScopedActivity activity(this);
231   promise_detail::Context<Arena> arena_ctx(arena_);
232   return sync_.RunParty([this](int i) {
233     // If the participant is null, skip.
234     // This allows participants to complete whilst wakers still exist
235     // somewhere.
236     auto* participant = participants_[i].load(std::memory_order_acquire);
237     if (participant == nullptr) {
238       if (grpc_trace_promise_primitives.enabled()) {
239         gpr_log(GPR_DEBUG, "%s[party] wakeup %d already complete",
240                 DebugTag().c_str(), i);
241       }
242       return false;
243     }
244     absl::string_view name;
245     if (grpc_trace_promise_primitives.enabled()) {
246       name = participant->name();
247       gpr_log(GPR_DEBUG, "%s[%s] begin job %d", DebugTag().c_str(),
248               std::string(name).c_str(), i);
249     }
250     // Poll the participant.
251     currently_polling_ = i;
252     bool done = participant->Poll();
253     currently_polling_ = kNotPolling;
254     if (done) {
255       if (!name.empty()) {
256         gpr_log(GPR_DEBUG, "%s[%s] end poll and finish job %d",
257                 DebugTag().c_str(), std::string(name).c_str(), i);
258       }
259       participants_[i].store(nullptr, std::memory_order_relaxed);
260     } else if (!name.empty()) {
261       gpr_log(GPR_DEBUG, "%s[%s] end poll", DebugTag().c_str(),
262               std::string(name).c_str());
263     }
264     return done;
265   });
266 }
267 
AddParticipants(Participant ** participants,size_t count)268 void Party::AddParticipants(Participant** participants, size_t count) {
269   bool run_party = sync_.AddParticipantsAndRef(count, [this, participants,
270                                                        count](size_t* slots) {
271     for (size_t i = 0; i < count; i++) {
272       participants_[slots[i]].store(participants[i], std::memory_order_release);
273     }
274   });
275   if (run_party) RunLocked();
276   Unref();
277 }
278 
Wakeup(WakeupMask wakeup_mask)279 void Party::Wakeup(WakeupMask wakeup_mask) {
280   if (sync_.ScheduleWakeup(wakeup_mask)) RunLocked();
281   Unref();
282 }
283 
WakeupAsync(WakeupMask wakeup_mask)284 void Party::WakeupAsync(WakeupMask wakeup_mask) {
285   if (sync_.ScheduleWakeup(wakeup_mask)) {
286     event_engine()->Run([this]() {
287       ApplicationCallbackExecCtx app_exec_ctx;
288       ExecCtx exec_ctx;
289       RunLocked();
290       Unref();
291     });
292   } else {
293     Unref();
294   }
295 }
296 
Drop(WakeupMask)297 void Party::Drop(WakeupMask) { Unref(); }
298 
PartyIsOver()299 void Party::PartyIsOver() {
300   ScopedActivity activity(this);
301   PartyOver();
302 }
303 
304 }  // namespace grpc_core
305