1 // Copyright 2024 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 //     https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14 
15 #pragma once
16 
17 #include <lib/async/cpp/wait.h>
18 #include <lib/async/default.h>
19 #include <lib/fit/function.h>
20 #include <zircon/status.h>
21 
22 #include <deque>
23 #include <utility>
24 
25 #include "lib/zx/socket.h"
26 #include "pw_bluetooth_sapphire/internal/host/common/assert.h"
27 #include "pw_bluetooth_sapphire/internal/host/common/byte_buffer.h"
28 #include "pw_bluetooth_sapphire/internal/host/common/log.h"
29 #include "pw_bluetooth_sapphire/internal/host/common/macros.h"
30 #include "pw_bluetooth_sapphire/internal/host/common/trace.h"
31 #include "pw_bluetooth_sapphire/internal/host/common/weak_self.h"
32 
33 namespace bt::socket {
34 
35 // SocketChannelRelay relays data between a zx::socket and a Channel. This class
36 // should not be used directly. Instead, see SocketFactory.
37 template <typename ChannelT>
38 class SocketChannelRelay final {
39  public:
40   using DeactivationCallback = fit::function<void()>;
41 
42   // The kernel allows up to ~256 KB in a socket buffer, which is enough for
43   // about 1 second of data at 2 Mbps. Until we have a use case that requires
44   // more than 1 second of buffering, we allow only a small amount of buffering
45   // within SocketChannelRelay itself.
46   static constexpr size_t kDefaultSocketWriteQueueLimitFrames = 2;
47 
48   // Creates a SocketChannelRelay which executes on |dispatcher|. Note that
49   // |dispatcher| must be single-threaded.
50   //
51   // The relay works with SocketFactory to manage the relay's lifetime. On any
52   // of the "terminal events" (see below), the relay will invoke the
53   // DeactivationCallback. On invocation of the DeactivationCallback, the
54   // SocketFactory should destroy the relay. The destruction should be done
55   // synchronously, as a) destruction must happen on |dispatcher|'s thread, and
56   // b) the |dispatcher| may be shutting down.
57   //
58   // The terminal events are:
59   // * the zx::socket is closed
60   // * the Channel is closed
61   // * the dispatcher begins shutting down
62   //
63   // Note that requiring |dispatcher| to be single-threaded shouldn't cause
64   // increased latency vs. multi-threading, since a) all I/O is non-blocking (so
65   // we never leave the thread idle), and b) to provide in-order delivery,
66   // moving the data between the zx::socket and the ChannelT needs to be
67   // serialized even in the multi-threaded case.
68   SocketChannelRelay(zx::socket socket,
69                      typename ChannelT::WeakPtr channel,
70                      DeactivationCallback deactivation_cb,
71                      size_t socket_write_queue_max_frames =
72                          kDefaultSocketWriteQueueLimitFrames);
73   ~SocketChannelRelay();
74 
75   // Enables read and close callbacks for the zx::socket and the
76   // ChannelT. (Write callbacks aren't necessary until we have data
77   // buffered.) Returns true on success.
78   //
79   // Activate() is guaranteed _not_ to invoke |deactivation_cb|, even in the
80   // event of failure. Instead, in the failure case, the caller should dispose
81   // of |this| directly.
82   [[nodiscard]] bool Activate();
83 
84  private:
85   enum class RelayState {
86     kActivating,
87     kActivated,
88     kDeactivating,
89     kDeactivated,
90   };
91 
92   // Deactivates and unbinds all callbacks from the zx::socket and the
93   // ChannelT. Drops any data still queued for transmission to the
94   // zx::socket. Ensures that the zx::socket is closed, and the ChannelT
95   // is deactivated. It is an error to call this when |state_ == kDeactivated|.
96   // ChannelT.
97   //
98   // Note that Deactivate() _may_ be called from the dtor. As such, this method
99   // avoids doing any "real work" (such as calling ServiceSocketWriteQueue()),
100   // and constrains itself to just tearing things down.
101   void Deactivate();
102 
103   // Deactivates |this|, and invokes deactivation_cb_.
104   // It is an error to call this when |state_ == kDeactivated|.
105   void DeactivateAndRequestDestruction();
106 
107   // Callbacks for zx::socket events.
108   void OnSocketReadable(zx_status_t status);
109   void OnSocketWritable(zx_status_t status);
110   void OnSocketClosed(zx_status_t status);
111 
112   // Callbacks for ChannelT events.
113   void OnChannelDataReceived(ByteBufferPtr rx_data);
114   void OnChannelClosed();
115 
116   // Copies any data currently available on |socket_| to |channel_|. Does not
117   // block for data on |socket_|, and does not retry failed writes to
118   // |channel_|. Returns true if we should attempt to read from this socket
119   // again, and false otherwise.
120   [[nodiscard]] bool CopyFromSocketToChannel();
121 
122   // Copies any data pending in |socket_write_queue_| to |socket_|.
123   void ServiceSocketWriteQueue();
124 
125   // Binds an async::Wait to a |handler|, but does not enable the wait.
126   // The handler will be wrapped in code that verifies that |this| has not begun
127   // destruction.
128   void BindWait(zx_signals_t trigger,
129                 const char* wait_name,
130                 async::Wait* wait,
131                 fit::function<void(zx_status_t)> handler);
132 
133   // Begins waiting on |wait|. Returns true on success.
134   // Note that it is safe to BeginWait() even after a socket operation has
135   // returned ZX_ERR_PEER_CLOSED. This is because "if the handle is closed, the
136   // operation will ... be terminated". (See zx_object_wait_async().)
137   bool BeginWait(const char* wait_name, async::Wait* wait);
138 
139   // Clears |wait|'s handler, and cancels |wait|.
140   void UnbindAndCancelWait(async::Wait* wait);
141 
142   // Get a trace_flow_id given a counter. Used to make sure trace flows are
143   // unique while they are active. Composed of channel UniqueId and the given
144   // counter, with the unique_id taking the top 32 bits.
145   trace_flow_id_t GetTraceId(uint32_t id);
146 
147   RelayState state_;  // Initial state is kActivating.
148 
149   zx::socket socket_;
150   const typename ChannelT::WeakPtr channel_;
151   async_dispatcher_t* const dispatcher_;
152   DeactivationCallback deactivation_cb_;
153   const size_t socket_write_queue_max_frames_;
154 
155   async::Wait sock_read_waiter_;
156   async::Wait sock_write_waiter_;
157   async::Wait sock_close_waiter_;
158 
159   // Count of packets received from the peer on the channel.
160   uint32_t channel_rx_packet_count_ = 0u;
161   // Count of packets sent to the peer on the channel.
162   uint32_t channel_tx_packet_count_ = 0u;
163   // Count of packets sent to the socket (should match
164   // |channel_rx_packet_count_| normally)
165   uint32_t socket_packet_sent_count_ = 0u;
166   // Count of packets received from the socket (should match
167   // |channel_Tx_packet_count_| normally)
168   uint32_t socket_packet_recv_count_ = 0u;
169 
170   // We use a std::deque here to minimize the number dynamic memory
171   // allocations (cf. std::list, which would require allocation on each
172   // SDU). This comes, however, at the cost of higher memory usage when the
173   // number of SDUs is small. (libc++ uses a minimum of 4KB per deque.)
174   //
175   // TODO(fxbug.dev/42150194): We should set an upper bound on the size of this
176   // queue.
177   std::deque<ByteBufferPtr> socket_write_queue_;
178 
179   // Read buffer. This must be larger than the max_tx_sdu_size so that we can
180   // detect truncated datagrams.
181   DynamicByteBuffer read_buf_;
182 
183   WeakSelf<SocketChannelRelay> weak_self_;  // Keep last.
184 
185   BT_DISALLOW_COPY_AND_ASSIGN_ALLOW_MOVE(SocketChannelRelay);
186 };
187 
188 template <typename ChannelT>
SocketChannelRelay(zx::socket socket,typename ChannelT::WeakPtr channel,DeactivationCallback deactivation_cb,size_t socket_write_queue_max_frames)189 SocketChannelRelay<ChannelT>::SocketChannelRelay(
190     zx::socket socket,
191     typename ChannelT::WeakPtr channel,
192     DeactivationCallback deactivation_cb,
193     size_t socket_write_queue_max_frames)
194     : state_(RelayState::kActivating),
195       socket_(std::move(socket)),
196       channel_(std::move(channel)),
197       dispatcher_(async_get_default_dispatcher()),
198       deactivation_cb_(std::move(deactivation_cb)),
199       socket_write_queue_max_frames_(socket_write_queue_max_frames),
200       // Subtle: we make the read buffer larger than the TX MTU, so that we can
201       // detect truncated datagrams.
202       read_buf_(channel_->max_tx_sdu_size() + 1),
203       weak_self_(this) {
204   PW_CHECK(dispatcher_);
205   PW_CHECK(socket_);
206   PW_CHECK(channel_.is_alive());
207 
208   // Note: binding |this| is safe, as BindWait() wraps the bound method inside
209   // of a lambda which verifies that |this| hasn't been destroyed.
210   BindWait(ZX_SOCKET_READABLE,
211            "socket read waiter",
212            &sock_read_waiter_,
213            fit::bind_member<&SocketChannelRelay::OnSocketReadable>(this));
214   BindWait(ZX_SOCKET_WRITE_THRESHOLD,
215            "socket write waiter",
216            &sock_write_waiter_,
217            fit::bind_member<&SocketChannelRelay::OnSocketWritable>(this));
218   BindWait(ZX_SOCKET_PEER_CLOSED,
219            "socket close waiter",
220            &sock_close_waiter_,
221            fit::bind_member<&SocketChannelRelay::OnSocketClosed>(this));
222 }
223 
224 template <typename ChannelT>
~SocketChannelRelay()225 SocketChannelRelay<ChannelT>::~SocketChannelRelay() {
226   if (state_ != RelayState::kDeactivated) {
227     bt_log(TRACE,
228            "l2cap",
229            "Deactivating relay for channel %u in dtor",
230            channel_->id());
231     Deactivate();
232   }
233 }
234 
235 template <typename ChannelT>
Activate()236 bool SocketChannelRelay<ChannelT>::Activate() {
237   PW_CHECK(state_ == RelayState::kActivating);
238 
239   // Note: we assume that BeginWait() does not synchronously dispatch any
240   // events. The wait handler will assert otherwise.
241   if (!BeginWait("socket close waiter", &sock_close_waiter_)) {
242     // Perhaps |dispatcher| is already stopped.
243     return false;
244   }
245 
246   if (!BeginWait("socket read waiter", &sock_read_waiter_)) {
247     // Perhaps |dispatcher| is already stopped.
248     return false;
249   }
250 
251   const auto self = weak_self_.GetWeakPtr();
252   const auto channel_id = channel_->id();
253   const bool activate_success = channel_->Activate(
254       [self, channel_id](ByteBufferPtr rx_data) {
255         // Note: this lambda _may_ be invoked immediately for buffered packets.
256         if (self.is_alive()) {
257           self->OnChannelDataReceived(std::move(rx_data));
258         } else {
259           bt_log(TRACE,
260                  "l2cap",
261                  "Ignoring data received on destroyed relay (channel_id=%#.4x)",
262                  channel_id);
263         }
264       },
265       [self, channel_id] {
266         if (self.is_alive()) {
267           self->OnChannelClosed();
268         } else {
269           bt_log(
270               TRACE,
271               "l2cap",
272               "Ignoring channel closure on destroyed relay (channel_id=%#.4x)",
273               channel_id);
274         }
275       });
276   if (!activate_success) {
277     return false;
278   }
279 
280   state_ = RelayState::kActivated;
281   return true;
282 }
283 
284 template <typename ChannelT>
Deactivate()285 void SocketChannelRelay<ChannelT>::Deactivate() {
286   PW_CHECK(state_ != RelayState::kDeactivated);
287 
288   state_ = RelayState::kDeactivating;
289   if (!socket_write_queue_.empty()) {
290     bt_log(DEBUG,
291            "l2cap",
292            "Dropping %zu packets from channel %u due to channel closure",
293            socket_write_queue_.size(),
294            channel_->id());
295     socket_write_queue_.clear();
296   }
297   channel_->Deactivate();
298 
299   // We assume that UnbindAndCancelWait() will not trigger a re-entrant call
300   // into Deactivate(). And the RelayIsDestroyedWhenDispatcherIsShutDown test
301   // verifies that to be the case. (If we had re-entrant calls, a
302   // PW_CHECK() in the lambda bound by BindWait() would cause an abort.)
303   UnbindAndCancelWait(&sock_read_waiter_);
304   UnbindAndCancelWait(&sock_write_waiter_);
305   UnbindAndCancelWait(&sock_close_waiter_);
306   socket_.reset();
307 
308   // Any further callbacks are bugs. Update state_, to help us detect
309   // those bugs.
310   state_ = RelayState::kDeactivated;
311 }
312 
313 template <typename ChannelT>
DeactivateAndRequestDestruction()314 void SocketChannelRelay<ChannelT>::DeactivateAndRequestDestruction() {
315   Deactivate();
316   if (deactivation_cb_) {
317     // NOTE: deactivation_cb_ is expected to destroy |this|. Since |this|
318     // owns deactivation_cb_, we move() deactivation_cb_ outside of |this|
319     // before invoking the callback.
320     auto moved_deactivation_cb = std::move(deactivation_cb_);
321     moved_deactivation_cb();
322   }
323 }
324 
325 template <typename ChannelT>
OnSocketReadable(zx_status_t status)326 void SocketChannelRelay<ChannelT>::OnSocketReadable(zx_status_t status) {
327   PW_CHECK(state_ == RelayState::kActivated);
328   if (!CopyFromSocketToChannel() ||
329       !BeginWait("socket read waiter", &sock_read_waiter_)) {
330     DeactivateAndRequestDestruction();
331   }
332 }
333 
334 template <typename ChannelT>
OnSocketWritable(zx_status_t status)335 void SocketChannelRelay<ChannelT>::OnSocketWritable(zx_status_t status) {
336   PW_CHECK(state_ == RelayState::kActivated);
337   if (socket_write_queue_.empty()) {
338     // The write queue may be emptied before this signal handler is called if
339     // the first packet in the write queue gets dropped and the subsequent
340     // smaller packets all fit into the socket. If canceling the wait fails,
341     // this handler may be called.
342     bt_log(
343         WARN,
344         "l2cap",
345         "socket_write_queue_ is empty in SocketChannelRelay::OnSocketWritable");
346     return;
347   }
348   ServiceSocketWriteQueue();
349 }
350 
351 template <typename ChannelT>
OnSocketClosed(zx_status_t status)352 void SocketChannelRelay<ChannelT>::OnSocketClosed(zx_status_t status) {
353   PW_CHECK(state_ == RelayState::kActivated);
354   DeactivateAndRequestDestruction();
355 }
356 
357 template <typename ChannelT>
OnChannelDataReceived(ByteBufferPtr rx_data)358 void SocketChannelRelay<ChannelT>::OnChannelDataReceived(
359     ByteBufferPtr rx_data) {
360   // Note: kActivating is deliberately permitted, as ChannelImpl::Activate()
361   // will synchronously deliver any queued frames.
362   PW_CHECK(state_ != RelayState::kDeactivated);
363   TRACE_DURATION("bluetooth",
364                  "SocketChannelRelay::OnChannelDataReceived",
365                  "channel id",
366                  channel_->id());
367 
368   if (state_ == RelayState::kDeactivating) {
369     bt_log(DEBUG,
370            "l2cap",
371            "Ignoring %s on socket for channel %u while deactivating",
372            __func__,
373            channel_->id());
374     return;
375   }
376 
377   PW_CHECK(rx_data);
378   if (rx_data->size() == 0) {
379     bt_log(DEBUG,
380            "l2cap",
381            "Ignoring empty %s on socket on channel %u",
382            __func__,
383            channel_->id());
384     return;
385   }
386 
387   PW_CHECK(socket_write_queue_.size() <= socket_write_queue_max_frames_);
388   // On a full queue, we drop the oldest element, on the theory that newer data
389   // is more useful. This should be true, e.g., for real-time applications such
390   // as voice calls. In the future, we may want to make the drop-head vs.
391   // drop-tail choice configurable.
392   if (socket_write_queue_.size() == socket_write_queue_max_frames_) {
393     // TODO(fxbug.dev/42082614): Add a metric for number of dropped frames.
394     socket_write_queue_.pop_front();
395     // Cancel the threshold wait, as the packet it corresponds to has been
396     // dropped. ServiceSocketWriteQueue() will start a new wait if necessary.
397     zx_status_t cancel_status = sock_write_waiter_.Cancel();
398     if (cancel_status != ZX_OK) {
399       bt_log(WARN,
400              "l2cap",
401              "failed to cancel sock_write_waiter_ with status: %s",
402              zx_status_get_string(cancel_status));
403     }
404   }
405 
406   channel_rx_packet_count_++;
407   TRACE_FLOW_BEGIN("bluetooth",
408                    "SocketChannelRelay::OnChannelDataReceived queued",
409                    GetTraceId(channel_rx_packet_count_));
410   socket_write_queue_.push_back(std::move(rx_data));
411   ServiceSocketWriteQueue();
412 }
413 
414 template <typename ChannelT>
OnChannelClosed()415 void SocketChannelRelay<ChannelT>::OnChannelClosed() {
416   PW_CHECK(state_ != RelayState::kActivating);
417   PW_CHECK(state_ != RelayState::kDeactivated);
418 
419   if (state_ == RelayState::kDeactivating) {
420     bt_log(DEBUG,
421            "l2cap",
422            "Ignorning %s on socket for channel %u while deactivating",
423            __func__,
424            channel_->id());
425     return;
426   }
427 
428   PW_CHECK(state_ == RelayState::kActivated);
429   if (!socket_write_queue_.empty()) {
430     ServiceSocketWriteQueue();
431   }
432   DeactivateAndRequestDestruction();
433 }
434 
435 template <typename ChannelT>
CopyFromSocketToChannel()436 bool SocketChannelRelay<ChannelT>::CopyFromSocketToChannel() {
437   if (channel_->max_tx_sdu_size() > read_buf_.size()) {
438     read_buf_ = DynamicByteBuffer(channel_->max_tx_sdu_size() + 1);
439   }
440   // TODO(fxbug.dev/42153078): Consider yielding occasionally. As-is, we run the
441   // risk of starving other SocketChannelRelays on the same |dispatcher| (and
442   // anyone else on |dispatcher|), if a misbehaving process spams its
443   // zx::socket. And even if starvation isn't an issue, latency/jitter might be.
444   zx_status_t read_res;
445   do {
446     size_t n_bytes_read = 0;
447     read_res = socket_.read(
448         0, read_buf_.mutable_data(), read_buf_.size(), &n_bytes_read);
449     PW_CHECK(read_res == ZX_OK || read_res == ZX_ERR_SHOULD_WAIT ||
450                  read_res == ZX_ERR_PEER_CLOSED,
451              "%s",
452              zx_status_get_string(read_res));
453     PW_CHECK(n_bytes_read <= read_buf_.size(),
454              "(n_bytes_read=%zu, read_buf_size=%zu)",
455              n_bytes_read,
456              read_buf_.size());
457     if (read_res == ZX_ERR_SHOULD_WAIT) {
458       return true;
459     }
460 
461     if (read_res == ZX_ERR_PEER_CLOSED) {
462       bt_log(WARN,
463              "l2cap",
464              "Failed to read from socket for channel %u: %s",
465              channel_->id(),
466              zx_status_get_string(read_res));
467       return false;
468     }
469 
470     PW_CHECK(n_bytes_read > 0);
471     socket_packet_recv_count_++;
472     if (n_bytes_read > channel_->max_tx_sdu_size()) {
473       bt_log(WARN,
474              "l2cap",
475              "Dropping %zu bytes for channel %u as max TX SDU is %u ",
476              n_bytes_read,
477              channel_->id(),
478              channel_->max_tx_sdu_size());
479       return false;
480     }
481 
482     // TODO(fxbug.dev/42152967): For low latency and low jitter, IWBN to avoid
483     // allocating dynamic memory on every read.
484     bool write_success = channel_->Send(
485         std::make_unique<DynamicByteBuffer>(read_buf_.view(0, n_bytes_read)));
486     if (!write_success) {
487       bt_log(TRACE,
488              "l2cap",
489              "Failed to write %zu bytes to channel %u",
490              n_bytes_read,
491              channel_->id());
492     }
493     channel_tx_packet_count_++;
494   } while (read_res == ZX_OK);
495 
496   return true;
497 }
498 
499 template <typename ChannelT>
ServiceSocketWriteQueue()500 void SocketChannelRelay<ChannelT>::ServiceSocketWriteQueue() {
501   // TODO(fxbug.dev/42150083): Similarly to CopyFromSocketToChannel(), we may
502   // want to consider yielding occasionally. The data-rate from the Channel into
503   // the socket write queue should be bounded by PHY layer data rates, which are
504   // much lower than the CPU's data processing throughput, so starvation
505   // shouldn't be an issue. However, latency might be.
506   zx_status_t write_res;
507   TRACE_DURATION("bluetooth",
508                  "SocketChannelRelay::ServiceSocketWriteQueue",
509                  "channel_id",
510                  channel_->id());
511   do {
512     PW_CHECK(!socket_write_queue_.empty());
513     PW_CHECK(socket_write_queue_.front());
514     TRACE_DURATION("bluetooth",
515                    "SocketChannelRelay::ServiceSocketWriteQueue write",
516                    "channel_id",
517                    channel_->id());
518 
519     const ByteBuffer& rx_data = *socket_write_queue_.front();
520     PW_CHECK(rx_data.size(), "Zero-length message on write queue");
521 
522     socket_packet_sent_count_++;
523     TRACE_FLOW_END("bluetooth",
524                    "SocketChannelRelay::OnChannelDataReceived queued",
525                    GetTraceId(socket_packet_sent_count_));
526 
527     // We probably need to make this unique across all profile sockets.
528     TRACE_FLOW_BEGIN("bluetooth", "ProfilePacket", socket_packet_sent_count_);
529 
530     size_t n_bytes_written = 0;
531     write_res =
532         socket_.write(0, rx_data.data(), rx_data.size(), &n_bytes_written);
533     PW_CHECK(write_res == ZX_OK || write_res == ZX_ERR_SHOULD_WAIT ||
534                  write_res == ZX_ERR_PEER_CLOSED,
535              "%s",
536              zx_status_get_string(write_res));
537     if (write_res != ZX_OK) {
538       PW_CHECK(n_bytes_written == 0);
539       bt_log(TRACE,
540              "l2cap",
541              "Failed to write %zu bytes to socket for channel %u: %s",
542              rx_data.size(),
543              channel_->id(),
544              zx_status_get_string(write_res));
545       break;
546     }
547     PW_CHECK(n_bytes_written == rx_data.size(),
548              "(n_bytes_written=%zu, rx_data.size()=%zu)",
549              n_bytes_written,
550              rx_data.size());
551     socket_write_queue_.pop_front();
552   } while (write_res == ZX_OK && !socket_write_queue_.empty());
553 
554   if (!socket_write_queue_.empty() && write_res == ZX_ERR_SHOULD_WAIT) {
555     // Since we hava data to write, we want to be woken when the socket has free
556     // space in its buffer. And, to avoid spinning, we want to be woken only
557     // when the free space is large enough for our first pending buffer.
558     //
559     // Note: it is safe to leave TX_THRESHOLD set, even when our queue is empty,
560     // because we will only be woken if we also have an active Wait for
561     // ZX_SOCKET_WRITE_THRESHOLD, and Waits are one-shot.
562     const size_t rx_data_len = socket_write_queue_.front()->size();
563     const auto prop_set_res = socket_.set_property(
564         ZX_PROP_SOCKET_TX_THRESHOLD, &rx_data_len, sizeof(rx_data_len));
565     switch (prop_set_res) {
566       case ZX_OK:
567         if (!BeginWait("socket write waiter", &sock_write_waiter_)) {
568           DeactivateAndRequestDestruction();
569         }
570         break;
571       case ZX_ERR_PEER_CLOSED:
572         // Peer closed the socket after the while loop above. Nothing to do
573         // here, as closure event will be handled by OnSocketClosed().
574         break;
575       default:
576         BT_PANIC("Unexpected zx_object_set_property() result: %s",
577                  zx_status_get_string(prop_set_res));
578         break;
579     }
580   }
581 }
582 
583 template <typename ChannelT>
BindWait(zx_signals_t trigger,const char * wait_name,async::Wait * wait,fit::function<void (zx_status_t)> handler)584 void SocketChannelRelay<ChannelT>::BindWait(
585     zx_signals_t trigger,
586     const char* wait_name,
587     async::Wait* wait,
588     fit::function<void(zx_status_t)> handler) {
589   wait->set_object(socket_.get());
590   wait->set_trigger(trigger);
591   wait->set_handler(
592       [self = weak_self_.GetWeakPtr(),
593        channel_id = channel_->id(),
594        wait_name,
595        expected_wait = wait,
596        handler = std::move(handler)](async_dispatcher_t* actual_dispatcher,
597                                      async::WaitBase* actual_wait,
598                                      zx_status_t status,
599                                      const zx_packet_signal_t* signal) {
600         PW_CHECK(self.is_alive(), "(%s, channel_id=%u)", wait_name, channel_id);
601         PW_CHECK(actual_dispatcher == self->dispatcher_,
602                  "(%s, channel_id=%u)",
603                  wait_name,
604                  channel_id);
605         PW_CHECK(actual_wait == expected_wait,
606                  "(%s, channel_id=%u)",
607                  wait_name,
608                  channel_id);
609         PW_CHECK(status == ZX_OK || status == ZX_ERR_CANCELED,
610                  "(%s, channel_id=%u)",
611                  wait_name,
612                  channel_id);
613 
614         if (status == ZX_ERR_CANCELED) {  // Dispatcher is shutting down.
615           bt_log(DEBUG,
616                  "l2cap",
617                  "%s canceled on socket for channel %u",
618                  wait_name,
619                  channel_id);
620           self->DeactivateAndRequestDestruction();
621           return;
622         }
623 
624         PW_CHECK(signal, "(%s, channel_id=%u)", wait_name, channel_id);
625         PW_CHECK(signal->trigger == expected_wait->trigger(),
626                  "(%s, channel_id=%u)",
627                  wait_name,
628                  channel_id);
629         PW_CHECK(self->state_ != RelayState::kActivating,
630                  "(%s, channel_id=%u)",
631                  wait_name,
632                  channel_id);
633         PW_CHECK(self->state_ != RelayState::kDeactivated,
634                  "(%s, channel_id=%u)",
635                  wait_name,
636                  channel_id);
637 
638         if (self->state_ == RelayState::kDeactivating) {
639           bt_log(DEBUG,
640                  "l2cap",
641                  "Ignorning %s on socket for channel %u while deactivating",
642                  wait_name,
643                  channel_id);
644           return;
645         }
646         handler(status);
647       });
648 }
649 
650 template <typename ChannelT>
BeginWait(const char * wait_name,async::Wait * wait)651 bool SocketChannelRelay<ChannelT>::BeginWait(const char* wait_name,
652                                              async::Wait* wait) {
653   PW_CHECK(state_ != RelayState::kDeactivating);
654   PW_CHECK(state_ != RelayState::kDeactivated);
655 
656   if (wait->is_pending()) {
657     return true;
658   }
659 
660   zx_status_t wait_res = wait->Begin(dispatcher_);
661   PW_CHECK(wait_res == ZX_OK || wait_res == ZX_ERR_BAD_STATE);
662 
663   if (wait_res != ZX_OK) {
664     bt_log(ERROR,
665            "l2cap",
666            "Failed to enable waiting on %s: %s",
667            wait_name,
668            zx_status_get_string(wait_res));
669     return false;
670   }
671 
672   return true;
673 }
674 
675 template <typename ChannelT>
UnbindAndCancelWait(async::Wait * wait)676 void SocketChannelRelay<ChannelT>::UnbindAndCancelWait(async::Wait* wait) {
677   PW_CHECK(state_ != RelayState::kActivating);
678   PW_CHECK(state_ != RelayState::kDeactivated);
679   zx_status_t cancel_res;
680   wait->set_handler(nullptr);
681   cancel_res = wait->Cancel();
682   PW_CHECK(cancel_res == ZX_OK || cancel_res == ZX_ERR_NOT_FOUND,
683            "Cancel failed: %s",
684            zx_status_get_string(cancel_res));
685 }
686 
687 template <typename ChannelT>
GetTraceId(uint32_t id)688 trace_flow_id_t SocketChannelRelay<ChannelT>::GetTraceId(uint32_t id) {
689   static_assert(sizeof(trace_flow_id_t) >=
690                     (sizeof(typename ChannelT::UniqueId) + sizeof(uint32_t)),
691                 "UniqueId needs to be small enough to make unique trace IDs");
692   return (static_cast<trace_flow_id_t>(channel_->unique_id())
693           << (sizeof(uint32_t) * CHAR_BIT)) |
694          id;
695 }
696 
697 }  // namespace bt::socket
698