1 // Copyright 2024 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 // https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14
15 #pragma once
16
17 #include <lib/async/cpp/wait.h>
18 #include <lib/async/default.h>
19 #include <lib/fit/function.h>
20 #include <zircon/status.h>
21
22 #include <deque>
23 #include <utility>
24
25 #include "lib/zx/socket.h"
26 #include "pw_bluetooth_sapphire/internal/host/common/assert.h"
27 #include "pw_bluetooth_sapphire/internal/host/common/byte_buffer.h"
28 #include "pw_bluetooth_sapphire/internal/host/common/log.h"
29 #include "pw_bluetooth_sapphire/internal/host/common/macros.h"
30 #include "pw_bluetooth_sapphire/internal/host/common/trace.h"
31 #include "pw_bluetooth_sapphire/internal/host/common/weak_self.h"
32
33 namespace bt::socket {
34
35 // SocketChannelRelay relays data between a zx::socket and a Channel. This class
36 // should not be used directly. Instead, see SocketFactory.
// |ChannelT| is the channel type being relayed. As used by this class it must
// provide nested |WeakPtr| and |UniqueId| types, plus id(), unique_id(),
// max_tx_sdu_size(), Activate(), Deactivate() and Send().
template <typename ChannelT>
class SocketChannelRelay final {
 public:
  // Invoked after the relay has deactivated itself in response to a terminal
  // event (see the constructor comment). The callee is expected to destroy the
  // relay.
  using DeactivationCallback = fit::function<void()>;

  // The kernel allows up to ~256 KB in a socket buffer, which is enough for
  // about 1 second of data at 2 Mbps. Until we have a use case that requires
  // more than 1 second of buffering, we allow only a small amount of buffering
  // within SocketChannelRelay itself.
  static constexpr size_t kDefaultSocketWriteQueueLimitFrames = 2;

  // Creates a SocketChannelRelay which executes on the dispatcher returned by
  // async_get_default_dispatcher() at construction time (there is no explicit
  // dispatcher parameter). That dispatcher is referred to as |dispatcher|
  // below. Note that |dispatcher| must be single-threaded.
  //
  // |socket| must be a valid handle and |channel| must be alive; both are
  // checked at construction. |socket_write_queue_max_frames| is the maximum
  // number of channel frames buffered while waiting for the zx::socket to
  // become writable; once the limit is reached the oldest frame is dropped.
  //
  // The relay works with SocketFactory to manage the relay's lifetime. On any
  // of the "terminal events" (see below), the relay will invoke the
  // DeactivationCallback. On invocation of the DeactivationCallback, the
  // SocketFactory should destroy the relay. The destruction should be done
  // synchronously, as a) destruction must happen on |dispatcher|'s thread, and
  // b) the |dispatcher| may be shutting down.
  //
  // The terminal events are:
  // * the zx::socket is closed
  // * the Channel is closed
  // * the dispatcher begins shutting down
  //
  // Note that requiring |dispatcher| to be single-threaded shouldn't cause
  // increased latency vs. multi-threading, since a) all I/O is non-blocking (so
  // we never leave the thread idle), and b) to provide in-order delivery,
  // moving the data between the zx::socket and the ChannelT needs to be
  // serialized even in the multi-threaded case.
  SocketChannelRelay(zx::socket socket,
                     typename ChannelT::WeakPtr channel,
                     DeactivationCallback deactivation_cb,
                     size_t socket_write_queue_max_frames =
                         kDefaultSocketWriteQueueLimitFrames);
  ~SocketChannelRelay();

  // Enables read and close callbacks for the zx::socket and the
  // ChannelT. (Write callbacks aren't necessary until we have data
  // buffered.) Returns true on success.
  //
  // Activate() is guaranteed _not_ to invoke |deactivation_cb|, even in the
  // event of failure. Instead, in the failure case, the caller should dispose
  // of |this| directly.
  [[nodiscard]] bool Activate();

 private:
  // Lifecycle of the relay. The constructor starts in kActivating, a
  // successful Activate() moves to kActivated, and Deactivate() passes through
  // kDeactivating before settling in kDeactivated.
  enum class RelayState {
    kActivating,
    kActivated,
    kDeactivating,
    kDeactivated,
  };

  // Deactivates and unbinds all callbacks from the zx::socket and the
  // ChannelT. Drops any data still queued for transmission to the
  // zx::socket. Ensures that the zx::socket is closed, and the ChannelT
  // is deactivated. It is an error to call this when |state_ == kDeactivated|.
  //
  // Note that Deactivate() _may_ be called from the dtor. As such, this method
  // avoids doing any "real work" (such as calling ServiceSocketWriteQueue()),
  // and constrains itself to just tearing things down.
  void Deactivate();

  // Deactivates |this|, and invokes deactivation_cb_.
  // It is an error to call this when |state_ == kDeactivated|.
  void DeactivateAndRequestDestruction();

  // Callbacks for zx::socket events.
  void OnSocketReadable(zx_status_t status);
  void OnSocketWritable(zx_status_t status);
  void OnSocketClosed(zx_status_t status);

  // Callbacks for ChannelT events.
  void OnChannelDataReceived(ByteBufferPtr rx_data);
  void OnChannelClosed();

  // Copies any data currently available on |socket_| to |channel_|. Does not
  // block for data on |socket_|, and does not retry failed writes to
  // |channel_|. Returns true if we should attempt to read from this socket
  // again, and false otherwise.
  [[nodiscard]] bool CopyFromSocketToChannel();

  // Copies any data pending in |socket_write_queue_| to |socket_|.
  // The queue must not be empty when this is called.
  void ServiceSocketWriteQueue();

  // Binds an async::Wait to a |handler|, but does not enable the wait.
  // The handler will be wrapped in code that verifies that |this| has not begun
  // destruction.
  void BindWait(zx_signals_t trigger,
                const char* wait_name,
                async::Wait* wait,
                fit::function<void(zx_status_t)> handler);

  // Begins waiting on |wait|. Returns true on success.
  // Note that it is safe to BeginWait() even after a socket operation has
  // returned ZX_ERR_PEER_CLOSED. This is because "if the handle is closed, the
  // operation will ... be terminated". (See zx_object_wait_async().)
  bool BeginWait(const char* wait_name, async::Wait* wait);

  // Clears |wait|'s handler, and cancels |wait|.
  void UnbindAndCancelWait(async::Wait* wait);

  // Get a trace_flow_id given a counter. Used to make sure trace flows are
  // unique while they are active. Composed of channel UniqueId and the given
  // counter, with the unique_id taking the top 32 bits.
  trace_flow_id_t GetTraceId(uint32_t id);

  RelayState state_;  // Initial state is kActivating.

  // The socket end owned by the relay; reset by Deactivate().
  zx::socket socket_;
  // The channel being relayed. Held weakly; the relay does not own it.
  const typename ChannelT::WeakPtr channel_;
  // Dispatcher all waits are started on; captured once in the constructor.
  async_dispatcher_t* const dispatcher_;
  // Moved out of |this| and invoked by DeactivateAndRequestDestruction().
  DeactivationCallback deactivation_cb_;
  // Maximum number of entries allowed in |socket_write_queue_|.
  const size_t socket_write_queue_max_frames_;

  // Waits on |socket_| for ZX_SOCKET_READABLE, ZX_SOCKET_WRITE_THRESHOLD and
  // ZX_SOCKET_PEER_CLOSED respectively (bound in the constructor).
  async::Wait sock_read_waiter_;
  async::Wait sock_write_waiter_;
  async::Wait sock_close_waiter_;

  // Count of packets received from the peer on the channel.
  uint32_t channel_rx_packet_count_ = 0u;
  // Count of packets sent to the peer on the channel.
  uint32_t channel_tx_packet_count_ = 0u;
  // Count of packets sent to the socket (should match
  // |channel_rx_packet_count_| normally)
  uint32_t socket_packet_sent_count_ = 0u;
  // Count of packets received from the socket (should match
  // |channel_tx_packet_count_| normally)
  uint32_t socket_packet_recv_count_ = 0u;

  // We use a std::deque here to minimize the number of dynamic memory
  // allocations (cf. std::list, which would require allocation on each
  // SDU). This comes, however, at the cost of higher memory usage when the
  // number of SDUs is small. (libc++ uses a minimum of 4KB per deque.)
  //
  // TODO(fxbug.dev/42150194): We should set an upper bound on the size of this
  // queue.
  // NOTE(review): OnChannelDataReceived() already caps the queue at
  // |socket_write_queue_max_frames_| entries, so the TODO above looks stale --
  // confirm and remove.
  std::deque<ByteBufferPtr> socket_write_queue_;

  // Read buffer. This must be larger than the max_tx_sdu_size so that we can
  // detect truncated datagrams.
  DynamicByteBuffer read_buf_;

  // Source of the weak pointers handed to wait handlers and channel callbacks,
  // so they can detect that the relay has been destroyed.
  WeakSelf<SocketChannelRelay> weak_self_;  // Keep last.

  // NOTE(review): the macro name suggests moves stay enabled, yet the
  // constructor hands |this| to the bound wait handlers -- confirm that a
  // relay is never actually moved.
  BT_DISALLOW_COPY_AND_ASSIGN_ALLOW_MOVE(SocketChannelRelay);
};
187
188 template <typename ChannelT>
SocketChannelRelay(zx::socket socket,typename ChannelT::WeakPtr channel,DeactivationCallback deactivation_cb,size_t socket_write_queue_max_frames)189 SocketChannelRelay<ChannelT>::SocketChannelRelay(
190 zx::socket socket,
191 typename ChannelT::WeakPtr channel,
192 DeactivationCallback deactivation_cb,
193 size_t socket_write_queue_max_frames)
194 : state_(RelayState::kActivating),
195 socket_(std::move(socket)),
196 channel_(std::move(channel)),
197 dispatcher_(async_get_default_dispatcher()),
198 deactivation_cb_(std::move(deactivation_cb)),
199 socket_write_queue_max_frames_(socket_write_queue_max_frames),
200 // Subtle: we make the read buffer larger than the TX MTU, so that we can
201 // detect truncated datagrams.
202 read_buf_(channel_->max_tx_sdu_size() + 1),
203 weak_self_(this) {
204 PW_CHECK(dispatcher_);
205 PW_CHECK(socket_);
206 PW_CHECK(channel_.is_alive());
207
208 // Note: binding |this| is safe, as BindWait() wraps the bound method inside
209 // of a lambda which verifies that |this| hasn't been destroyed.
210 BindWait(ZX_SOCKET_READABLE,
211 "socket read waiter",
212 &sock_read_waiter_,
213 fit::bind_member<&SocketChannelRelay::OnSocketReadable>(this));
214 BindWait(ZX_SOCKET_WRITE_THRESHOLD,
215 "socket write waiter",
216 &sock_write_waiter_,
217 fit::bind_member<&SocketChannelRelay::OnSocketWritable>(this));
218 BindWait(ZX_SOCKET_PEER_CLOSED,
219 "socket close waiter",
220 &sock_close_waiter_,
221 fit::bind_member<&SocketChannelRelay::OnSocketClosed>(this));
222 }
223
224 template <typename ChannelT>
~SocketChannelRelay()225 SocketChannelRelay<ChannelT>::~SocketChannelRelay() {
226 if (state_ != RelayState::kDeactivated) {
227 bt_log(TRACE,
228 "l2cap",
229 "Deactivating relay for channel %u in dtor",
230 channel_->id());
231 Deactivate();
232 }
233 }
234
235 template <typename ChannelT>
Activate()236 bool SocketChannelRelay<ChannelT>::Activate() {
237 PW_CHECK(state_ == RelayState::kActivating);
238
239 // Note: we assume that BeginWait() does not synchronously dispatch any
240 // events. The wait handler will assert otherwise.
241 if (!BeginWait("socket close waiter", &sock_close_waiter_)) {
242 // Perhaps |dispatcher| is already stopped.
243 return false;
244 }
245
246 if (!BeginWait("socket read waiter", &sock_read_waiter_)) {
247 // Perhaps |dispatcher| is already stopped.
248 return false;
249 }
250
251 const auto self = weak_self_.GetWeakPtr();
252 const auto channel_id = channel_->id();
253 const bool activate_success = channel_->Activate(
254 [self, channel_id](ByteBufferPtr rx_data) {
255 // Note: this lambda _may_ be invoked immediately for buffered packets.
256 if (self.is_alive()) {
257 self->OnChannelDataReceived(std::move(rx_data));
258 } else {
259 bt_log(TRACE,
260 "l2cap",
261 "Ignoring data received on destroyed relay (channel_id=%#.4x)",
262 channel_id);
263 }
264 },
265 [self, channel_id] {
266 if (self.is_alive()) {
267 self->OnChannelClosed();
268 } else {
269 bt_log(
270 TRACE,
271 "l2cap",
272 "Ignoring channel closure on destroyed relay (channel_id=%#.4x)",
273 channel_id);
274 }
275 });
276 if (!activate_success) {
277 return false;
278 }
279
280 state_ = RelayState::kActivated;
281 return true;
282 }
283
284 template <typename ChannelT>
Deactivate()285 void SocketChannelRelay<ChannelT>::Deactivate() {
286 PW_CHECK(state_ != RelayState::kDeactivated);
287
288 state_ = RelayState::kDeactivating;
289 if (!socket_write_queue_.empty()) {
290 bt_log(DEBUG,
291 "l2cap",
292 "Dropping %zu packets from channel %u due to channel closure",
293 socket_write_queue_.size(),
294 channel_->id());
295 socket_write_queue_.clear();
296 }
297 channel_->Deactivate();
298
299 // We assume that UnbindAndCancelWait() will not trigger a re-entrant call
300 // into Deactivate(). And the RelayIsDestroyedWhenDispatcherIsShutDown test
301 // verifies that to be the case. (If we had re-entrant calls, a
302 // PW_CHECK() in the lambda bound by BindWait() would cause an abort.)
303 UnbindAndCancelWait(&sock_read_waiter_);
304 UnbindAndCancelWait(&sock_write_waiter_);
305 UnbindAndCancelWait(&sock_close_waiter_);
306 socket_.reset();
307
308 // Any further callbacks are bugs. Update state_, to help us detect
309 // those bugs.
310 state_ = RelayState::kDeactivated;
311 }
312
313 template <typename ChannelT>
DeactivateAndRequestDestruction()314 void SocketChannelRelay<ChannelT>::DeactivateAndRequestDestruction() {
315 Deactivate();
316 if (deactivation_cb_) {
317 // NOTE: deactivation_cb_ is expected to destroy |this|. Since |this|
318 // owns deactivation_cb_, we move() deactivation_cb_ outside of |this|
319 // before invoking the callback.
320 auto moved_deactivation_cb = std::move(deactivation_cb_);
321 moved_deactivation_cb();
322 }
323 }
324
325 template <typename ChannelT>
OnSocketReadable(zx_status_t status)326 void SocketChannelRelay<ChannelT>::OnSocketReadable(zx_status_t status) {
327 PW_CHECK(state_ == RelayState::kActivated);
328 if (!CopyFromSocketToChannel() ||
329 !BeginWait("socket read waiter", &sock_read_waiter_)) {
330 DeactivateAndRequestDestruction();
331 }
332 }
333
334 template <typename ChannelT>
OnSocketWritable(zx_status_t status)335 void SocketChannelRelay<ChannelT>::OnSocketWritable(zx_status_t status) {
336 PW_CHECK(state_ == RelayState::kActivated);
337 if (socket_write_queue_.empty()) {
338 // The write queue may be emptied before this signal handler is called if
339 // the first packet in the write queue gets dropped and the subsequent
340 // smaller packets all fit into the socket. If canceling the wait fails,
341 // this handler may be called.
342 bt_log(
343 WARN,
344 "l2cap",
345 "socket_write_queue_ is empty in SocketChannelRelay::OnSocketWritable");
346 return;
347 }
348 ServiceSocketWriteQueue();
349 }
350
// Wait handler bound (in the constructor) to ZX_SOCKET_PEER_CLOSED on
// |socket_|. Socket closure is a terminal event: the relay deactivates itself
// and asks its owner to destroy it. |status| is not inspected here.
template <typename ChannelT>
void SocketChannelRelay<ChannelT>::OnSocketClosed(zx_status_t status) {
  // Only expected while the relay is fully active.
  PW_CHECK(state_ == RelayState::kActivated);
  DeactivateAndRequestDestruction();
}
356
// Channel RX callback: queues |rx_data| for delivery to the socket and then
// tries to flush the queue immediately. Frames arriving while the relay is
// deactivating, and empty frames, are logged and ignored. When the queue is
// already at |socket_write_queue_max_frames_|, the oldest queued frame is
// dropped to make room.
template <typename ChannelT>
void SocketChannelRelay<ChannelT>::OnChannelDataReceived(
    ByteBufferPtr rx_data) {
  // Note: kActivating is deliberately permitted, as ChannelImpl::Activate()
  // will synchronously deliver any queued frames.
  PW_CHECK(state_ != RelayState::kDeactivated);
  TRACE_DURATION("bluetooth",
                 "SocketChannelRelay::OnChannelDataReceived",
                 "channel id",
                 channel_->id());

  if (state_ == RelayState::kDeactivating) {
    bt_log(DEBUG,
           "l2cap",
           "Ignoring %s on socket for channel %u while deactivating",
           __func__,
           channel_->id());
    return;
  }

  PW_CHECK(rx_data);
  if (rx_data->size() == 0) {
    bt_log(DEBUG,
           "l2cap",
           "Ignoring empty %s on socket on channel %u",
           __func__,
           channel_->id());
    return;
  }

  PW_CHECK(socket_write_queue_.size() <= socket_write_queue_max_frames_);
  // On a full queue, we drop the oldest element, on the theory that newer data
  // is more useful. This should be true, e.g., for real-time applications such
  // as voice calls. In the future, we may want to make the drop-head vs.
  // drop-tail choice configurable.
  //
  // NOTE(review): if |socket_write_queue_max_frames_| were 0, this condition
  // would hold for an empty queue and pop_front() would run on an empty deque
  // -- confirm that callers never pass a limit of 0.
  if (socket_write_queue_.size() == socket_write_queue_max_frames_) {
    // TODO(fxbug.dev/42082614): Add a metric for number of dropped frames.
    socket_write_queue_.pop_front();
    // Cancel the threshold wait, as the packet it corresponds to has been
    // dropped. ServiceSocketWriteQueue() will start a new wait if necessary.
    zx_status_t cancel_status = sock_write_waiter_.Cancel();
    if (cancel_status != ZX_OK) {
      // A failed cancel is only logged; OnSocketWritable() tolerates being
      // called with an empty queue.
      bt_log(WARN,
             "l2cap",
             "failed to cancel sock_write_waiter_ with status: %s",
             zx_status_get_string(cancel_status));
    }
  }

  // The incremented RX count doubles as the per-packet trace flow counter.
  channel_rx_packet_count_++;
  TRACE_FLOW_BEGIN("bluetooth",
                   "SocketChannelRelay::OnChannelDataReceived queued",
                   GetTraceId(channel_rx_packet_count_));
  socket_write_queue_.push_back(std::move(rx_data));
  // Must stay the last statement: ServiceSocketWriteQueue() can end up calling
  // DeactivateAndRequestDestruction().
  ServiceSocketWriteQueue();
}
413
414 template <typename ChannelT>
OnChannelClosed()415 void SocketChannelRelay<ChannelT>::OnChannelClosed() {
416 PW_CHECK(state_ != RelayState::kActivating);
417 PW_CHECK(state_ != RelayState::kDeactivated);
418
419 if (state_ == RelayState::kDeactivating) {
420 bt_log(DEBUG,
421 "l2cap",
422 "Ignorning %s on socket for channel %u while deactivating",
423 __func__,
424 channel_->id());
425 return;
426 }
427
428 PW_CHECK(state_ == RelayState::kActivated);
429 if (!socket_write_queue_.empty()) {
430 ServiceSocketWriteQueue();
431 }
432 DeactivateAndRequestDestruction();
433 }
434
435 template <typename ChannelT>
CopyFromSocketToChannel()436 bool SocketChannelRelay<ChannelT>::CopyFromSocketToChannel() {
437 if (channel_->max_tx_sdu_size() > read_buf_.size()) {
438 read_buf_ = DynamicByteBuffer(channel_->max_tx_sdu_size() + 1);
439 }
440 // TODO(fxbug.dev/42153078): Consider yielding occasionally. As-is, we run the
441 // risk of starving other SocketChannelRelays on the same |dispatcher| (and
442 // anyone else on |dispatcher|), if a misbehaving process spams its
443 // zx::socket. And even if starvation isn't an issue, latency/jitter might be.
444 zx_status_t read_res;
445 do {
446 size_t n_bytes_read = 0;
447 read_res = socket_.read(
448 0, read_buf_.mutable_data(), read_buf_.size(), &n_bytes_read);
449 PW_CHECK(read_res == ZX_OK || read_res == ZX_ERR_SHOULD_WAIT ||
450 read_res == ZX_ERR_PEER_CLOSED,
451 "%s",
452 zx_status_get_string(read_res));
453 PW_CHECK(n_bytes_read <= read_buf_.size(),
454 "(n_bytes_read=%zu, read_buf_size=%zu)",
455 n_bytes_read,
456 read_buf_.size());
457 if (read_res == ZX_ERR_SHOULD_WAIT) {
458 return true;
459 }
460
461 if (read_res == ZX_ERR_PEER_CLOSED) {
462 bt_log(WARN,
463 "l2cap",
464 "Failed to read from socket for channel %u: %s",
465 channel_->id(),
466 zx_status_get_string(read_res));
467 return false;
468 }
469
470 PW_CHECK(n_bytes_read > 0);
471 socket_packet_recv_count_++;
472 if (n_bytes_read > channel_->max_tx_sdu_size()) {
473 bt_log(WARN,
474 "l2cap",
475 "Dropping %zu bytes for channel %u as max TX SDU is %u ",
476 n_bytes_read,
477 channel_->id(),
478 channel_->max_tx_sdu_size());
479 return false;
480 }
481
482 // TODO(fxbug.dev/42152967): For low latency and low jitter, IWBN to avoid
483 // allocating dynamic memory on every read.
484 bool write_success = channel_->Send(
485 std::make_unique<DynamicByteBuffer>(read_buf_.view(0, n_bytes_read)));
486 if (!write_success) {
487 bt_log(TRACE,
488 "l2cap",
489 "Failed to write %zu bytes to channel %u",
490 n_bytes_read,
491 channel_->id());
492 }
493 channel_tx_packet_count_++;
494 } while (read_res == ZX_OK);
495
496 return true;
497 }
498
// Writes queued channel frames to |socket_|, oldest first, until the queue is
// empty or a write fails. If frames remain because the socket is full
// (ZX_ERR_SHOULD_WAIT), sets the socket's TX threshold to the size of the next
// frame and starts the write wait so that OnSocketWritable() resumes the work.
//
// The queue must be non-empty on entry. May call
// DeactivateAndRequestDestruction() (when the write wait cannot be started),
// so callers must not assume |this| is still usable afterwards.
template <typename ChannelT>
void SocketChannelRelay<ChannelT>::ServiceSocketWriteQueue() {
  // TODO(fxbug.dev/42150083): Similarly to CopyFromSocketToChannel(), we may
  // want to consider yielding occasionally. The data-rate from the Channel into
  // the socket write queue should be bounded by PHY layer data rates, which are
  // much lower than the CPU's data processing throughput, so starvation
  // shouldn't be an issue. However, latency might be.
  zx_status_t write_res;
  TRACE_DURATION("bluetooth",
                 "SocketChannelRelay::ServiceSocketWriteQueue",
                 "channel_id",
                 channel_->id());
  do {
    PW_CHECK(!socket_write_queue_.empty());
    PW_CHECK(socket_write_queue_.front());
    TRACE_DURATION("bluetooth",
                   "SocketChannelRelay::ServiceSocketWriteQueue write",
                   "channel_id",
                   channel_->id());

    const ByteBuffer& rx_data = *socket_write_queue_.front();
    PW_CHECK(rx_data.size(), "Zero-length message on write queue");

    // NOTE(review): the counter is bumped before the write is attempted, so a
    // frame that hits ZX_ERR_SHOULD_WAIT and is retried later is counted (and
    // traced) more than once, and the id passed to TRACE_FLOW_END may then
    // stop matching the one used by TRACE_FLOW_BEGIN in
    // OnChannelDataReceived() -- confirm whether this is intended.
    socket_packet_sent_count_++;
    TRACE_FLOW_END("bluetooth",
                   "SocketChannelRelay::OnChannelDataReceived queued",
                   GetTraceId(socket_packet_sent_count_));

    // We probably need to make this unique across all profile sockets.
    TRACE_FLOW_BEGIN("bluetooth", "ProfilePacket", socket_packet_sent_count_);

    size_t n_bytes_written = 0;
    write_res =
        socket_.write(0, rx_data.data(), rx_data.size(), &n_bytes_written);
    PW_CHECK(write_res == ZX_OK || write_res == ZX_ERR_SHOULD_WAIT ||
                 write_res == ZX_ERR_PEER_CLOSED,
             "%s",
             zx_status_get_string(write_res));
    if (write_res != ZX_OK) {
      // Failed writes must not have consumed any bytes; the frame stays at the
      // head of the queue.
      PW_CHECK(n_bytes_written == 0);
      bt_log(TRACE,
             "l2cap",
             "Failed to write %zu bytes to socket for channel %u: %s",
             rx_data.size(),
             channel_->id(),
             zx_status_get_string(write_res));
      break;
    }
    // Successful writes are expected to take the whole frame.
    PW_CHECK(n_bytes_written == rx_data.size(),
             "(n_bytes_written=%zu, rx_data.size()=%zu)",
             n_bytes_written,
             rx_data.size());
    socket_write_queue_.pop_front();
  } while (write_res == ZX_OK && !socket_write_queue_.empty());

  if (!socket_write_queue_.empty() && write_res == ZX_ERR_SHOULD_WAIT) {
    // Since we have data to write, we want to be woken when the socket has free
    // space in its buffer. And, to avoid spinning, we want to be woken only
    // when the free space is large enough for our first pending buffer.
    //
    // Note: it is safe to leave TX_THRESHOLD set, even when our queue is empty,
    // because we will only be woken if we also have an active Wait for
    // ZX_SOCKET_WRITE_THRESHOLD, and Waits are one-shot.
    const size_t rx_data_len = socket_write_queue_.front()->size();
    const auto prop_set_res = socket_.set_property(
        ZX_PROP_SOCKET_TX_THRESHOLD, &rx_data_len, sizeof(rx_data_len));
    switch (prop_set_res) {
      case ZX_OK:
        if (!BeginWait("socket write waiter", &sock_write_waiter_)) {
          // Nothing may touch |this| after this call.
          DeactivateAndRequestDestruction();
        }
        break;
      case ZX_ERR_PEER_CLOSED:
        // Peer closed the socket after the while loop above. Nothing to do
        // here, as closure event will be handled by OnSocketClosed().
        break;
      default:
        BT_PANIC("Unexpected zx_object_set_property() result: %s",
                 zx_status_get_string(prop_set_res));
        break;
    }
  }
}
582
// Configures |wait| to watch |socket_| for |trigger| and installs a wrapper
// handler around |handler|. The wait is not started here; see BeginWait().
//
// The wrapper:
//  * checks that the relay is still alive and that the dispatcher, wait and
//    status are the expected ones;
//  * treats ZX_ERR_CANCELED as dispatcher shutdown and deactivates the relay;
//  * ignores events that arrive while the relay is deactivating;
//  * otherwise forwards |status| to |handler|.
//
// |wait_name| is only used for log and assertion messages; the pointer is
// captured as-is, so it must outlive the wait (string literals are passed by
// the constructor).
template <typename ChannelT>
void SocketChannelRelay<ChannelT>::BindWait(
    zx_signals_t trigger,
    const char* wait_name,
    async::Wait* wait,
    fit::function<void(zx_status_t)> handler) {
  wait->set_object(socket_.get());
  wait->set_trigger(trigger);
  wait->set_handler(
      // The channel id is captured by value so that it can be logged without
      // going through the relay.
      [self = weak_self_.GetWeakPtr(),
       channel_id = channel_->id(),
       wait_name,
       expected_wait = wait,
       handler = std::move(handler)](async_dispatcher_t* actual_dispatcher,
                                     async::WaitBase* actual_wait,
                                     zx_status_t status,
                                     const zx_packet_signal_t* signal) {
        PW_CHECK(self.is_alive(), "(%s, channel_id=%u)", wait_name, channel_id);
        PW_CHECK(actual_dispatcher == self->dispatcher_,
                 "(%s, channel_id=%u)",
                 wait_name,
                 channel_id);
        PW_CHECK(actual_wait == expected_wait,
                 "(%s, channel_id=%u)",
                 wait_name,
                 channel_id);
        PW_CHECK(status == ZX_OK || status == ZX_ERR_CANCELED,
                 "(%s, channel_id=%u)",
                 wait_name,
                 channel_id);

        if (status == ZX_ERR_CANCELED) {  // Dispatcher is shutting down.
          bt_log(DEBUG,
                 "l2cap",
                 "%s canceled on socket for channel %u",
                 wait_name,
                 channel_id);
          // Nothing may touch the relay after this call.
          self->DeactivateAndRequestDestruction();
          return;
        }

        PW_CHECK(signal, "(%s, channel_id=%u)", wait_name, channel_id);
        PW_CHECK(signal->trigger == expected_wait->trigger(),
                 "(%s, channel_id=%u)",
                 wait_name,
                 channel_id);
        // Events are only expected once Activate() has completed and before
        // Deactivate() has finished.
        PW_CHECK(self->state_ != RelayState::kActivating,
                 "(%s, channel_id=%u)",
                 wait_name,
                 channel_id);
        PW_CHECK(self->state_ != RelayState::kDeactivated,
                 "(%s, channel_id=%u)",
                 wait_name,
                 channel_id);

        if (self->state_ == RelayState::kDeactivating) {
          bt_log(DEBUG,
                 "l2cap",
                 "Ignorning %s on socket for channel %u while deactivating",
                 wait_name,
                 channel_id);
          return;
        }
        // Must stay last: the handler may deactivate the relay, which clears
        // this wait's handler.
        handler(status);
      });
}
649
650 template <typename ChannelT>
BeginWait(const char * wait_name,async::Wait * wait)651 bool SocketChannelRelay<ChannelT>::BeginWait(const char* wait_name,
652 async::Wait* wait) {
653 PW_CHECK(state_ != RelayState::kDeactivating);
654 PW_CHECK(state_ != RelayState::kDeactivated);
655
656 if (wait->is_pending()) {
657 return true;
658 }
659
660 zx_status_t wait_res = wait->Begin(dispatcher_);
661 PW_CHECK(wait_res == ZX_OK || wait_res == ZX_ERR_BAD_STATE);
662
663 if (wait_res != ZX_OK) {
664 bt_log(ERROR,
665 "l2cap",
666 "Failed to enable waiting on %s: %s",
667 wait_name,
668 zx_status_get_string(wait_res));
669 return false;
670 }
671
672 return true;
673 }
674
675 template <typename ChannelT>
UnbindAndCancelWait(async::Wait * wait)676 void SocketChannelRelay<ChannelT>::UnbindAndCancelWait(async::Wait* wait) {
677 PW_CHECK(state_ != RelayState::kActivating);
678 PW_CHECK(state_ != RelayState::kDeactivated);
679 zx_status_t cancel_res;
680 wait->set_handler(nullptr);
681 cancel_res = wait->Cancel();
682 PW_CHECK(cancel_res == ZX_OK || cancel_res == ZX_ERR_NOT_FOUND,
683 "Cancel failed: %s",
684 zx_status_get_string(cancel_res));
685 }
686
687 template <typename ChannelT>
GetTraceId(uint32_t id)688 trace_flow_id_t SocketChannelRelay<ChannelT>::GetTraceId(uint32_t id) {
689 static_assert(sizeof(trace_flow_id_t) >=
690 (sizeof(typename ChannelT::UniqueId) + sizeof(uint32_t)),
691 "UniqueId needs to be small enough to make unique trace IDs");
692 return (static_cast<trace_flow_id_t>(channel_->unique_id())
693 << (sizeof(uint32_t) * CHAR_BIT)) |
694 id;
695 }
696
697 } // namespace bt::socket
698