xref: /aosp_15_r20/external/pigweed/pw_channel/forwarding_channel.cc (revision 61c4878ac05f98d0ceed94b57d316916de578985)
1 // Copyright 2024 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 //     https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14 
15 #include "pw_channel/forwarding_channel.h"
16 
17 namespace pw::channel::internal {
18 
19 async2::Poll<Result<multibuf::MultiBuf>>
DoPendRead(async2::Context & cx)20 ForwardingChannel<DataType::kDatagram>::DoPendRead(async2::Context& cx)
21     PW_NO_LOCK_SAFETY_ANALYSIS {
22   std::lock_guard lock(pair_.mutex_);
23 
24   // Close this channel if the sibling is closed, but return any remaining data.
25   if (!sibling_.is_write_open()) {
26     set_read_closed();
27     if (!read_queue_.has_value()) {
28       return Status::FailedPrecondition();
29     }
30   } else if (!read_queue_.has_value()) {
31     PW_ASYNC_STORE_WAKER(
32         cx,
33         waker_,
34         "ForwardingChannel is waiting for incoming data from its peer");
35     return async2::Pending();
36   }
37 
38   auto read_data = std::move(*read_queue_);
39   read_queue_.reset();
40   std::move(sibling_.waker_).Wake();
41   return read_data;
42 }
43 
DoPendReadyToWrite(async2::Context & cx)44 async2::Poll<Status> ForwardingChannel<DataType::kDatagram>::DoPendReadyToWrite(
45     async2::Context& cx) PW_NO_LOCK_SAFETY_ANALYSIS {
46   std::lock_guard lock(pair_.mutex_);
47   if (sibling_.read_queue_.has_value()) {
48     PW_ASYNC_STORE_WAKER(
49         cx,
50         waker_,
51         "ForwardingChannel is waiting for its peer to read the data "
52         "it enqueued");
53     return async2::Pending();
54   }
55   return async2::Ready(OkStatus());
56 }
57 
DoStageWrite(multibuf::MultiBuf && data)58 Status ForwardingChannel<DataType::kDatagram>::DoStageWrite(
59     multibuf::MultiBuf&& data) PW_NO_LOCK_SAFETY_ANALYSIS {
60   std::lock_guard lock(pair_.mutex_);
61   PW_DASSERT(!sibling_.read_queue_.has_value());
62   sibling_.read_queue_ = std::move(data);
63 
64   std::move(sibling_.waker_).Wake();
65   return OkStatus();
66 }
67 
DoPendClose(async2::Context &)68 async2::Poll<Status> ForwardingChannel<DataType::kDatagram>::DoPendClose(
69     async2::Context&) PW_NO_LOCK_SAFETY_ANALYSIS {
70   std::lock_guard lock(pair_.mutex_);
71   sibling_.set_write_closed();  // No more writes from the other end
72   read_queue_.reset();
73   std::move(sibling_.waker_).Wake();
74   return OkStatus();
75 }
76 
77 async2::Poll<Result<multibuf::MultiBuf>>
DoPendRead(async2::Context & cx)78 ForwardingChannel<DataType::kByte>::DoPendRead(async2::Context& cx) {
79   std::lock_guard lock(pair_.mutex_);
80 
81   // Close this channel if the sibling is closed, but return any remaining data.
82   if (!sibling_.is_write_open()) {
83     set_read_closed();
84     if (read_queue_.empty()) {
85       return Status::FailedPrecondition();
86     }
87   } else if (read_queue_.empty()) {
88     PW_ASYNC_STORE_WAKER(
89         cx,
90         read_waker_,
91         "ForwardingChannel is waiting for incoming data from its peer");
92     return async2::Pending();
93   }
94 
95   auto read_data = std::move(read_queue_);
96   read_queue_ = {};
97   return read_data;
98 }
99 
DoStageWrite(multibuf::MultiBuf && data)100 Status ForwardingChannel<DataType::kByte>::DoStageWrite(
101     multibuf::MultiBuf&& data) PW_NO_LOCK_SAFETY_ANALYSIS {
102   std::lock_guard lock(pair_.mutex_);
103   if (data.empty()) {
104     return OkStatus();  // no data, nothing to do
105   }
106 
107   sibling_.read_queue_.PushSuffix(std::move(data));
108   std::move(sibling_.read_waker_).Wake();
109   return OkStatus();
110 }
111 
DoPendClose(async2::Context &)112 async2::Poll<Status> ForwardingChannel<DataType::kByte>::DoPendClose(
113     async2::Context&) PW_NO_LOCK_SAFETY_ANALYSIS {
114   std::lock_guard lock(pair_.mutex_);
115   sibling_.set_write_closed();  // No more writes from the other end
116   read_queue_.Release();
117   std::move(sibling_.read_waker_).Wake();
118   return OkStatus();
119 }
120 
121 }  // namespace pw::channel::internal
122