xref: /aosp_15_r20/external/grpc-grpc/src/core/lib/promise/party.cc (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1 // Copyright 2023 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include <grpc/support/port_platform.h>
16 
17 #include "src/core/lib/promise/party.h"
18 
19 #include <atomic>
20 
21 #include "absl/base/thread_annotations.h"
22 #include "absl/strings/str_format.h"
23 
24 #include <grpc/support/log.h>
25 
26 #include "src/core/lib/gprpp/sync.h"
27 #include "src/core/lib/iomgr/exec_ctx.h"
28 #include "src/core/lib/promise/activity.h"
29 #include "src/core/lib/promise/trace.h"
30 
31 #ifdef GRPC_MAXIMIZE_THREADYNESS
32 #include "src/core/lib/gprpp/thd.h"       // IWYU pragma: keep
33 #include "src/core/lib/iomgr/exec_ctx.h"  // IWYU pragma: keep
34 #endif
35 
// Trace flag registered under the name "party_state". Party::AddParticipants
// consults it before logging each participant it installs.
grpc_core::DebugOnlyTraceFlag grpc_trace_party_state(false, "party_state");
37 
38 namespace grpc_core {
39 
namespace {
// Per-thread pointer to the slot in which Party::RunLocked parks the next
// party to run. It is non-null only while a party's run body is executing on
// this thread: RunLocked points it at a local `Party*` before calling
// RunParty() and resets it to nullptr afterwards.
// TODO(ctiller): Once all activities are parties we can remove this.
thread_local Party** g_current_party_run_next = nullptr;
}  // namespace
44 
45 ///////////////////////////////////////////////////////////////////////////////
46 // PartySyncUsingAtomics
47 
RefIfNonZero()48 GRPC_MUST_USE_RESULT bool PartySyncUsingAtomics::RefIfNonZero() {
49   auto count = state_.load(std::memory_order_relaxed);
50   do {
51     // If zero, we are done (without an increment). If not, we must do a CAS
52     // to maintain the contract: do not increment the counter if it is already
53     // zero
54     if (count == 0) {
55       return false;
56     }
57   } while (!state_.compare_exchange_weak(count, count + kOneRef,
58                                          std::memory_order_acq_rel,
59                                          std::memory_order_relaxed));
60   LogStateChange("RefIfNonZero", count, count + kOneRef);
61   return true;
62 }
63 
UnreffedLast()64 bool PartySyncUsingAtomics::UnreffedLast() {
65   uint64_t prev_state =
66       state_.fetch_or(kDestroying | kLocked, std::memory_order_acq_rel);
67   LogStateChange("UnreffedLast", prev_state,
68                  prev_state | kDestroying | kLocked);
69   return (prev_state & kLocked) == 0;
70 }
71 
ScheduleWakeup(WakeupMask mask)72 bool PartySyncUsingAtomics::ScheduleWakeup(WakeupMask mask) {
73   // Or in the wakeup bit for the participant, AND the locked bit.
74   uint64_t prev_state = state_.fetch_or((mask & kWakeupMask) | kLocked,
75                                         std::memory_order_acq_rel);
76   LogStateChange("ScheduleWakeup", prev_state,
77                  prev_state | (mask & kWakeupMask) | kLocked);
78   // If the lock was not held now we hold it, so we need to run.
79   return ((prev_state & kLocked) == 0);
80 }
81 
82 ///////////////////////////////////////////////////////////////////////////////
83 // PartySyncUsingMutex
84 
ScheduleWakeup(WakeupMask mask)85 bool PartySyncUsingMutex::ScheduleWakeup(WakeupMask mask) {
86   MutexLock lock(&mu_);
87   wakeups_ |= mask;
88   return !std::exchange(locked_, true);
89 }
90 
91 ///////////////////////////////////////////////////////////////////////////////
92 // Party::Handle
93 
94 // Weak handle to a Party.
95 // Handle can persist while Party goes away.
96 class Party::Handle final : public Wakeable {
97  public:
Handle(Party * party)98   explicit Handle(Party* party) : party_(party) {}
99 
100   // Ref the Handle (not the activity).
Ref()101   void Ref() { refs_.fetch_add(1, std::memory_order_relaxed); }
102 
103   // Activity is going away... drop its reference and sever the connection back.
DropActivity()104   void DropActivity() ABSL_LOCKS_EXCLUDED(mu_) {
105     mu_.Lock();
106     GPR_ASSERT(party_ != nullptr);
107     party_ = nullptr;
108     mu_.Unlock();
109     Unref();
110   }
111 
WakeupGeneric(WakeupMask wakeup_mask,void (Party::* wakeup_method)(WakeupMask))112   void WakeupGeneric(WakeupMask wakeup_mask,
113                      void (Party::*wakeup_method)(WakeupMask))
114       ABSL_LOCKS_EXCLUDED(mu_) {
115     mu_.Lock();
116     // Note that activity refcount can drop to zero, but we could win the lock
117     // against DropActivity, so we need to only increase activities refcount if
118     // it is non-zero.
119     Party* party = party_;
120     if (party != nullptr && party->RefIfNonZero()) {
121       mu_.Unlock();
122       // Activity still exists and we have a reference: wake it up, which will
123       // drop the ref.
124       (party->*wakeup_method)(wakeup_mask);
125     } else {
126       // Could not get the activity - it's either gone or going. No need to wake
127       // it up!
128       mu_.Unlock();
129     }
130     // Drop the ref to the handle (we have one ref = one wakeup semantics).
131     Unref();
132   }
133 
134   // Activity needs to wake up (if it still exists!) - wake it up, and drop the
135   // ref that was kept for this handle.
Wakeup(WakeupMask wakeup_mask)136   void Wakeup(WakeupMask wakeup_mask) override ABSL_LOCKS_EXCLUDED(mu_) {
137     WakeupGeneric(wakeup_mask, &Party::Wakeup);
138   }
139 
WakeupAsync(WakeupMask wakeup_mask)140   void WakeupAsync(WakeupMask wakeup_mask) override ABSL_LOCKS_EXCLUDED(mu_) {
141     WakeupGeneric(wakeup_mask, &Party::WakeupAsync);
142   }
143 
Drop(WakeupMask)144   void Drop(WakeupMask) override { Unref(); }
145 
ActivityDebugTag(WakeupMask) const146   std::string ActivityDebugTag(WakeupMask) const override {
147     MutexLock lock(&mu_);
148     return party_ == nullptr ? "<unknown>" : party_->DebugTag();
149   }
150 
151  private:
152   // Unref the Handle (not the activity).
Unref()153   void Unref() {
154     if (1 == refs_.fetch_sub(1, std::memory_order_acq_rel)) {
155       delete this;
156     }
157   }
158 
159   // Two initial refs: one for the waiter that caused instantiation, one for the
160   // party.
161   std::atomic<size_t> refs_{2};
162   mutable Mutex mu_;
163   Party* party_ ABSL_GUARDED_BY(mu_);
164 };
165 
MakeNonOwningWakeable(Party * party)166 Wakeable* Party::Participant::MakeNonOwningWakeable(Party* party) {
167   if (handle_ == nullptr) {
168     handle_ = new Handle(party);
169     return handle_;
170   }
171   handle_->Ref();
172   return handle_;
173 }
174 
~Participant()175 Party::Participant::~Participant() {
176   if (handle_ != nullptr) {
177     handle_->DropActivity();
178   }
179 }
180 
~Party()181 Party::~Party() {}
182 
CancelRemainingParticipants()183 void Party::CancelRemainingParticipants() {
184   ScopedActivity activity(this);
185   for (size_t i = 0; i < party_detail::kMaxParticipants; i++) {
186     if (auto* p =
187             participants_[i].exchange(nullptr, std::memory_order_acquire)) {
188       p->Destroy();
189     }
190   }
191 }
192 
ActivityDebugTag(WakeupMask wakeup_mask) const193 std::string Party::ActivityDebugTag(WakeupMask wakeup_mask) const {
194   return absl::StrFormat("%s [parts:%x]", DebugTag(), wakeup_mask);
195 }
196 
MakeOwningWaker()197 Waker Party::MakeOwningWaker() {
198   GPR_DEBUG_ASSERT(currently_polling_ != kNotPolling);
199   IncrementRefCount();
200   return Waker(this, 1u << currently_polling_);
201 }
202 
MakeNonOwningWaker()203 Waker Party::MakeNonOwningWaker() {
204   GPR_DEBUG_ASSERT(currently_polling_ != kNotPolling);
205   return Waker(participants_[currently_polling_]
206                    .load(std::memory_order_relaxed)
207                    ->MakeNonOwningWakeable(this),
208                1u << currently_polling_);
209 }
210 
ForceImmediateRepoll(WakeupMask mask)211 void Party::ForceImmediateRepoll(WakeupMask mask) {
212   GPR_DEBUG_ASSERT(is_current());
213   sync_.ForceImmediateRepoll(mask);
214 }
215 
RunLocked()216 void Party::RunLocked() {
217   // If there is a party running, then we don't run it immediately
218   // but instead add it to the end of the list of parties to run.
219   // This enables a fairly straightforward batching of work from a
220   // call to a transport (or back again).
221   if (g_current_party_run_next != nullptr) {
222     if (*g_current_party_run_next == nullptr) {
223       *g_current_party_run_next = this;
224     } else {
225       // But if there's already a party queued, we're better off asking event
226       // engine to run it so we can spread load.
227       event_engine()->Run([this]() {
228         ApplicationCallbackExecCtx app_exec_ctx;
229         ExecCtx exec_ctx;
230         RunLocked();
231       });
232     }
233     return;
234   }
235   auto body = [this]() {
236     GPR_DEBUG_ASSERT(g_current_party_run_next == nullptr);
237     Party* run_next = nullptr;
238     g_current_party_run_next = &run_next;
239     const bool done = RunParty();
240     GPR_DEBUG_ASSERT(g_current_party_run_next == &run_next);
241     g_current_party_run_next = nullptr;
242     if (done) {
243       ScopedActivity activity(this);
244       PartyOver();
245     }
246     if (run_next != nullptr) {
247       run_next->RunLocked();
248     }
249   };
250 #ifdef GRPC_MAXIMIZE_THREADYNESS
251   Thread thd(
252       "RunParty",
253       [body]() {
254         ApplicationCallbackExecCtx app_exec_ctx;
255         ExecCtx exec_ctx;
256         body();
257       },
258       nullptr, Thread::Options().set_joinable(false));
259   thd.Start();
260 #else
261   body();
262 #endif
263 }
264 
RunParty()265 bool Party::RunParty() {
266   ScopedActivity activity(this);
267   return sync_.RunParty([this](int i) { return RunOneParticipant(i); });
268 }
269 
RunOneParticipant(int i)270 bool Party::RunOneParticipant(int i) {
271   // If the participant is null, skip.
272   // This allows participants to complete whilst wakers still exist
273   // somewhere.
274   auto* participant = participants_[i].load(std::memory_order_acquire);
275   if (participant == nullptr) {
276     if (grpc_trace_promise_primitives.enabled()) {
277       gpr_log(GPR_DEBUG, "%s[party] wakeup %d already complete",
278               DebugTag().c_str(), i);
279     }
280     return false;
281   }
282   absl::string_view name;
283   if (grpc_trace_promise_primitives.enabled()) {
284     name = participant->name();
285     gpr_log(GPR_DEBUG, "%s[%s] begin job %d", DebugTag().c_str(),
286             std::string(name).c_str(), i);
287   }
288   // Poll the participant.
289   currently_polling_ = i;
290   bool done = participant->PollParticipantPromise();
291   currently_polling_ = kNotPolling;
292   if (done) {
293     if (!name.empty()) {
294       gpr_log(GPR_DEBUG, "%s[%s] end poll and finish job %d",
295               DebugTag().c_str(), std::string(name).c_str(), i);
296     }
297     participants_[i].store(nullptr, std::memory_order_relaxed);
298   } else if (!name.empty()) {
299     gpr_log(GPR_DEBUG, "%s[%s] end poll", DebugTag().c_str(),
300             std::string(name).c_str());
301   }
302   return done;
303 }
304 
AddParticipants(Participant ** participants,size_t count)305 void Party::AddParticipants(Participant** participants, size_t count) {
306   bool run_party = sync_.AddParticipantsAndRef(count, [this, participants,
307                                                        count](size_t* slots) {
308     for (size_t i = 0; i < count; i++) {
309       if (grpc_trace_party_state.enabled()) {
310         gpr_log(GPR_DEBUG,
311                 "Party %p                 AddParticipant: %s @ %" PRIdPTR,
312                 &sync_, std::string(participants[i]->name()).c_str(), slots[i]);
313       }
314       participants_[slots[i]].store(participants[i], std::memory_order_release);
315     }
316   });
317   if (run_party) RunLocked();
318   Unref();
319 }
320 
Wakeup(WakeupMask wakeup_mask)321 void Party::Wakeup(WakeupMask wakeup_mask) {
322   if (sync_.ScheduleWakeup(wakeup_mask)) RunLocked();
323   Unref();
324 }
325 
WakeupAsync(WakeupMask wakeup_mask)326 void Party::WakeupAsync(WakeupMask wakeup_mask) {
327   if (sync_.ScheduleWakeup(wakeup_mask)) {
328     event_engine()->Run([this]() {
329       ApplicationCallbackExecCtx app_exec_ctx;
330       ExecCtx exec_ctx;
331       RunLocked();
332       Unref();
333     });
334   } else {
335     Unref();
336   }
337 }
338 
Drop(WakeupMask)339 void Party::Drop(WakeupMask) { Unref(); }
340 
PartyIsOver()341 void Party::PartyIsOver() {
342   ScopedActivity activity(this);
343   PartyOver();
344 }
345 
346 }  // namespace grpc_core
347