1 // Copyright 2024 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 // https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14 #pragma once
15
16 #include "pw_async2/dispatcher.h"
17 #include "pw_async2/dispatcher_base.h"
18 #include "pw_function/function.h"
19 #include "pw_sync/mutex.h"
20
21 namespace pw::async2 {
22
23 // A lock guarding OnceReceiver and OnceSender member variables.
24 //
25 // This is an ``InterruptSpinLock`` in order to allow sending values from an
26 // ISR context.
sender_receiver_lock()27 inline pw::sync::InterruptSpinLock& sender_receiver_lock() {
28 static NoDestructor<pw::sync::InterruptSpinLock> lock;
29 return *lock;
30 }
31
32 template <typename T>
33 class OnceSender;
34
35 /// `OnceReceiver` receives the value sent by the `OnceSender` it is constructed
36 /// with. It must be constructed using `MakeOnceSenderAndReceiver`.
37 /// `OnceReceiver::Pend()` is used to poll for the value sent by `OnceSender`.
38 /// `OnceReceiver` is thread safe and may be used on a different thread than
39 /// `OnceSender`.
40 template <typename T>
41 class OnceReceiver final {
42 public:
43 OnceReceiver() = default;
44
45 /// Create an already completed OnceReceiver by constructing the value with
46 /// `value_args`.
47 template <typename... Args>
OnceReceiver(Args &&...value_args)48 explicit OnceReceiver(Args&&... value_args)
49 : value_(std::forward<Args>(value_args)...) {}
50
OnceReceiver(OnceReceiver && other)51 OnceReceiver(OnceReceiver&& other) {
52 std::lock_guard lock(sender_receiver_lock());
53 sender_ = other.sender_;
54 other.sender_ = nullptr;
55 if (sender_) {
56 sender_->receiver_ = this;
57 }
58 if (other.value_.has_value()) {
59 value_.emplace(std::move(other.value_.value()));
60 }
61 other.value_.reset();
62 waker_ = std::move(other.waker_);
63 }
64
65 OnceReceiver(const OnceReceiver&) = delete;
66 OnceReceiver& operator=(const OnceReceiver&) = delete;
67 OnceReceiver& operator=(OnceReceiver&& other) = delete;
68
69 ~OnceReceiver();
70
71 /// Returns `Ready` with a result containing the value once the value has been
72 /// assigned. If the sender is destroyed before sending a value, a `Cancelled`
73 /// result will be returned.
Pend(Context & cx)74 Poll<Result<T>> Pend(Context& cx) {
75 std::lock_guard lock(sender_receiver_lock());
76 if (value_.has_value()) {
77 return Ready(std::move(*value_));
78 } else if (!sender_) {
79 return Ready(Status::Cancelled());
80 }
81 PW_ASYNC_STORE_WAKER(
82 cx,
83 waker_,
84 "OnceReceiver is waiting for a value to be sent into the "
85 "corresponding OnceSender");
86 return Pending();
87 }
88
89 private:
90 template <typename U>
91 friend std::pair<OnceSender<U>, OnceReceiver<U>> MakeOnceSenderAndReceiver();
92 template <typename U>
93 friend void InitializeOnceSenderAndReceiver(OnceSender<U>& sender,
94 OnceReceiver<U>& receiver);
95 friend class OnceSender<T>;
96
97 OnceSender<T>* sender_ PW_GUARDED_BY(sender_receiver_lock()) = nullptr;
98 std::optional<T> value_ PW_GUARDED_BY(sender_receiver_lock()) = std::nullopt;
99 Waker waker_;
100 };
101
102 /// `OnceSender` sends the value received by the `OnceReceiver` it is
103 /// constructed with. It must be constructed using `MakeOnceSenderAndReceiver`.
104 /// `OnceSender` is thread safe and may be used on a different thread than
105 /// `OnceReceiver`.
106 template <typename T>
107 class OnceSender final {
108 public:
109 OnceSender() = default;
110
~OnceSender()111 ~OnceSender() {
112 std::lock_guard lock(sender_receiver_lock());
113 if (receiver_) {
114 std::move(receiver_->waker_).Wake();
115 receiver_->sender_ = nullptr;
116 }
117 }
118
OnceSender(OnceSender && other)119 OnceSender(OnceSender&& other) {
120 std::lock_guard lock(sender_receiver_lock());
121 receiver_ = other.receiver_;
122 other.receiver_ = nullptr;
123 if (receiver_) {
124 receiver_->sender_ = this;
125 }
126 }
127
128 OnceSender(const OnceSender&) = delete;
129 OnceSender& operator=(const OnceSender&) = delete;
130 OnceSender& operator=(OnceSender&& other) = delete;
131
132 /// Construct the sent value in place and wake the `OnceReceiver`.
133 template <typename... Args>
emplace(Args &&...args)134 void emplace(Args&&... args) {
135 std::lock_guard lock(sender_receiver_lock());
136 if (receiver_) {
137 receiver_->value_.emplace(std::forward<Args>(args)...);
138 std::move(receiver_->waker_).Wake();
139 receiver_->sender_ = nullptr;
140 receiver_ = nullptr;
141 }
142 }
143
144 OnceSender& operator=(const T& value) {
145 emplace(value);
146 return *this;
147 }
148 OnceSender& operator=(T&& value) {
149 emplace(std::move(value));
150 return *this;
151 }
152
153 private:
154 template <typename U>
155 friend std::pair<OnceSender<U>, OnceReceiver<U>> MakeOnceSenderAndReceiver();
156 template <typename U>
157 friend void InitializeOnceSenderAndReceiver(OnceSender<U>& sender,
158 OnceReceiver<U>& receiver);
159 friend class OnceReceiver<T>;
160
OnceSender(OnceReceiver<T> * receiver)161 OnceSender(OnceReceiver<T>* receiver) : receiver_(receiver) {}
162
163 OnceReceiver<T>* receiver_ PW_GUARDED_BY(sender_receiver_lock()) = nullptr;
164 };
165
166 template <typename T>
~OnceReceiver()167 OnceReceiver<T>::~OnceReceiver() {
168 std::lock_guard lock(sender_receiver_lock());
169 if (sender_) {
170 sender_->receiver_ = nullptr;
171 }
172 }
173
174 /// Construct a pair of `OnceSender` and `OnceReceiver`.
175 template <typename T>
MakeOnceSenderAndReceiver()176 std::pair<OnceSender<T>, OnceReceiver<T>> MakeOnceSenderAndReceiver() {
177 std::pair<OnceSender<T>, OnceReceiver<T>> send_recv;
178 InitializeOnceSenderAndReceiver(send_recv.first, send_recv.second);
179 return send_recv;
180 }
181
182 /// Initialize a pair of `OnceSender` and `OnceReceiver`.
183 template <typename T>
InitializeOnceSenderAndReceiver(OnceSender<T> & sender,OnceReceiver<T> & receiver)184 void InitializeOnceSenderAndReceiver(OnceSender<T>& sender,
185 OnceReceiver<T>& receiver)
186 PW_NO_LOCK_SAFETY_ANALYSIS {
187 // Disable lock analysis because these are fresh sender/receiver pairs and
188 // do not require a lock to initialize;
189 receiver.sender_ = &sender;
190 sender.receiver_ = &receiver;
191 }
192
193 template <typename T>
194 class OnceRefSender;
195
196 /// `OnceRefReceiver` is notified when the paired `OnceRefSender` modifies a
197 /// reference. It must be constructed using `MakeOnceRefSenderAndReceiver()`.
198 /// `OnceRefReceiver::Pend()` is used to poll for completion by `OnceRefSender`.
199 /// `OnceRefReceiver` is thread safe and may be used on a different thread than
200 /// `OnceRefSender`. However, the referenced value must not be modified from the
201 /// time of construction until either `OnceRefReceiver::Pend()` returns
202 /// `Ready()` or either of `OnceRefReceiver` or `OnceRefSender` is destroyed.
203 template <typename T>
204 class OnceRefReceiver final {
205 public:
206 OnceRefReceiver() = default;
207
OnceRefReceiver(OnceRefReceiver && other)208 OnceRefReceiver(OnceRefReceiver&& other) {
209 std::lock_guard lock(sender_receiver_lock());
210 sender_ = other.sender_;
211 other.sender_ = nullptr;
212 if (sender_) {
213 sender_->receiver_ = this;
214 }
215 value_ = other.value_;
216 waker_ = std::move(other.waker_);
217 }
218
219 OnceRefReceiver(const OnceRefReceiver&) = delete;
220 OnceRefReceiver& operator=(const OnceRefReceiver&) = delete;
221 OnceRefReceiver& operator=(OnceRefReceiver&& other) = delete;
222
223 ~OnceRefReceiver();
224
225 /// Returns `Ready` with an `ok` status when the modification of the
226 /// reference is complete. If the sender is destroyed before updating the
227 /// reference, a `cancelled` status is returned.
Pend(Context & cx)228 Poll<Status> Pend(Context& cx) {
229 std::lock_guard lock(sender_receiver_lock());
230 if (value_ == nullptr) {
231 return Ready(OkStatus());
232 }
233 if (sender_ == nullptr) {
234 return Ready(Status::Cancelled());
235 }
236 PW_ASYNC_STORE_WAKER(
237 cx,
238 waker_,
239 "OnceRefReceiver is waiting for OnceRefSender to write a value");
240 return Pending();
241 }
242
243 private:
244 template <typename U>
245 friend std::pair<OnceRefSender<U>, OnceRefReceiver<U>>
246 MakeOnceRefSenderAndReceiver(U&);
247 template <typename U>
248 friend void InitializeOnceRefSenderAndReceiver(OnceRefSender<U>& sender,
249 OnceRefReceiver<U>& receiver,
250 U& value);
251 friend class OnceRefSender<T>;
252
OnceRefReceiver(T & value)253 OnceRefReceiver(T& value) : value_(&value) {}
254
255 // Pointer to the value to be modified. Set to `nullptr` once modification
256 // is complete.
257 T* value_ PW_GUARDED_BY(sender_receiver_lock()) = nullptr;
258 // Pointer to the modifier. Set to `nullptr` if the sender disappears.
259 OnceRefSender<T>* sender_ PW_GUARDED_BY(sender_receiver_lock()) = nullptr;
260 Waker waker_;
261 };
262
263 /// `OnceRefSender` mutates the reference received by the `OnceReceiver` it is
264 /// constructed with. It must be constructed using
265 /// `MakeOnceRefSenderAndReceiver`. `OnceRefSender` is thread safe and may be
266 /// used on a different thread than `OnceRefReceiver`.
267 template <typename T>
268 class OnceRefSender final {
269 public:
270 OnceRefSender() = default;
271
~OnceRefSender()272 ~OnceRefSender() {
273 std::lock_guard lock(sender_receiver_lock());
274 if (receiver_) {
275 receiver_->sender_ = nullptr;
276 std::move(receiver_->waker_).Wake();
277 }
278 }
279
OnceRefSender(OnceRefSender && other)280 OnceRefSender(OnceRefSender&& other) {
281 std::lock_guard lock(sender_receiver_lock());
282 receiver_ = other.receiver_;
283 other.receiver_ = nullptr;
284 if (receiver_) {
285 receiver_->sender_ = this;
286 }
287 }
288
289 OnceRefSender(const OnceRefSender&) = delete;
290 OnceRefSender& operator=(const OnceRefSender&) = delete;
291 OnceRefSender& operator=(OnceRefSender&& other) = delete;
292
293 /// Copy assigns the reference and awakens the receiver.
Set(const T & value)294 void Set(const T& value) {
295 std::lock_guard lock(sender_receiver_lock());
296 if (receiver_) {
297 *(receiver_->value_) = value;
298 std::move(receiver_->waker_).Wake();
299 receiver_->sender_ = nullptr;
300 receiver_->value_ = nullptr;
301 receiver_ = nullptr;
302 }
303 }
304
305 /// Move assigns the reference and awakens the receiver.
Set(T && value)306 void Set(T&& value) {
307 std::lock_guard lock(sender_receiver_lock());
308 if (receiver_) {
309 *(receiver_->value_) = std::move(value);
310 std::move(receiver_->waker_).Wake();
311 receiver_->sender_ = nullptr;
312 receiver_->value_ = nullptr;
313 receiver_ = nullptr;
314 }
315 }
316
317 /// Care must be taken not to save the reference passed to `func` or to call
318 /// any other Once*Sender/Once*Receiver APIs from within `func`. This should
319 /// be a simple modification. After all modifications are complete, `Commit`
320 /// should be called.
ModifyUnsafe(pw::Function<void (T &)> func)321 void ModifyUnsafe(pw::Function<void(T&)> func) {
322 std::lock_guard lock(sender_receiver_lock());
323 if (receiver_) {
324 // There is a risk of re-entrancy here if the user isn't careful.
325 func(*(receiver_->value_));
326 }
327 }
328
329 /// When using `ModifyUnsafe()`, call `Commit()` after all modifications have
330 /// been made to awaken the `OnceRefReceiver`.
Commit()331 void Commit() {
332 std::lock_guard lock(sender_receiver_lock());
333 if (receiver_) {
334 std::move(receiver_->waker_).Wake();
335 receiver_->sender_ = nullptr;
336 receiver_->value_ = nullptr;
337 receiver_ = nullptr;
338 }
339 }
340
341 private:
342 template <typename U>
343 friend std::pair<OnceRefSender<U>, OnceRefReceiver<U>>
344 MakeOnceRefSenderAndReceiver(U&);
345 template <typename U>
346 friend void InitializeOnceRefSenderAndReceiver(OnceRefSender<U>& sender,
347 OnceRefReceiver<U>& receiver,
348 U& value);
349 friend class OnceRefReceiver<T>;
350
OnceRefSender(OnceRefReceiver<T> * receiver)351 OnceRefSender(OnceRefReceiver<T>* receiver) : receiver_(receiver) {}
352
353 OnceRefReceiver<T>* receiver_ PW_GUARDED_BY(sender_receiver_lock()) = nullptr;
354 };
355
356 template <typename T>
~OnceRefReceiver()357 OnceRefReceiver<T>::~OnceRefReceiver() {
358 std::lock_guard lock(sender_receiver_lock());
359 if (sender_) {
360 sender_->receiver_ = nullptr;
361 }
362 }
363
364 /// Constructs a joined pair of `OnceRefSender` and `OnceRefReceiver`.
365 /// @param[in] value The reference to be mutated by the sender. It must mot be
366 /// read or modified until either `OnceRefSender` indicates `Ready()` or
367 /// either the `OnceRefSender` or `OnceRefReceiver` is destroyed.
368 template <typename T>
MakeOnceRefSenderAndReceiver(T & value)369 std::pair<OnceRefSender<T>, OnceRefReceiver<T>> MakeOnceRefSenderAndReceiver(
370 T& value) {
371 std::pair<OnceRefSender<T>, OnceRefReceiver<T>> send_recv;
372 InitializeOnceRefSenderAndReceiver(send_recv.first, send_recv.second, value);
373 return send_recv;
374 }
375
376 /// Initialize a pair of `OnceRefSender` and `OnceRefReceiver`.
377 /// @param[in] value The reference to be mutated by the sender. It must mot be
378 /// read or modified until either `OnceRefSender` indicates `Ready()` or
379 /// either the `OnceRefSender` or `OnceRefReceiver` is destroyed.
380 template <typename T>
InitializeOnceRefSenderAndReceiver(OnceRefSender<T> & sender,OnceRefReceiver<T> & receiver,T & value)381 void InitializeOnceRefSenderAndReceiver(OnceRefSender<T>& sender,
382 OnceRefReceiver<T>& receiver,
383 T& value) PW_NO_LOCK_SAFETY_ANALYSIS {
384 // Disable lock analysis because these are fresh sender/receiver pairs and
385 // do not require a lock to initialize;
386 receiver.sender_ = &sender;
387 receiver.value_ = &value;
388 sender.receiver_ = &receiver;
389 }
390
391 } // namespace pw::async2
392