1 // Copyright 2024 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 // https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14
15 #include "pw_channel/forwarding_channel.h"
16
17 namespace pw::channel::internal {
18
19 async2::Poll<Result<multibuf::MultiBuf>>
DoPendRead(async2::Context & cx)20 ForwardingChannel<DataType::kDatagram>::DoPendRead(async2::Context& cx)
21 PW_NO_LOCK_SAFETY_ANALYSIS {
22 std::lock_guard lock(pair_.mutex_);
23
24 // Close this channel if the sibling is closed, but return any remaining data.
25 if (!sibling_.is_write_open()) {
26 set_read_closed();
27 if (!read_queue_.has_value()) {
28 return Status::FailedPrecondition();
29 }
30 } else if (!read_queue_.has_value()) {
31 PW_ASYNC_STORE_WAKER(
32 cx,
33 waker_,
34 "ForwardingChannel is waiting for incoming data from its peer");
35 return async2::Pending();
36 }
37
38 auto read_data = std::move(*read_queue_);
39 read_queue_.reset();
40 std::move(sibling_.waker_).Wake();
41 return read_data;
42 }
43
DoPendReadyToWrite(async2::Context & cx)44 async2::Poll<Status> ForwardingChannel<DataType::kDatagram>::DoPendReadyToWrite(
45 async2::Context& cx) PW_NO_LOCK_SAFETY_ANALYSIS {
46 std::lock_guard lock(pair_.mutex_);
47 if (sibling_.read_queue_.has_value()) {
48 PW_ASYNC_STORE_WAKER(
49 cx,
50 waker_,
51 "ForwardingChannel is waiting for its peer to read the data "
52 "it enqueued");
53 return async2::Pending();
54 }
55 return async2::Ready(OkStatus());
56 }
57
DoStageWrite(multibuf::MultiBuf && data)58 Status ForwardingChannel<DataType::kDatagram>::DoStageWrite(
59 multibuf::MultiBuf&& data) PW_NO_LOCK_SAFETY_ANALYSIS {
60 std::lock_guard lock(pair_.mutex_);
61 PW_DASSERT(!sibling_.read_queue_.has_value());
62 sibling_.read_queue_ = std::move(data);
63
64 std::move(sibling_.waker_).Wake();
65 return OkStatus();
66 }
67
DoPendClose(async2::Context &)68 async2::Poll<Status> ForwardingChannel<DataType::kDatagram>::DoPendClose(
69 async2::Context&) PW_NO_LOCK_SAFETY_ANALYSIS {
70 std::lock_guard lock(pair_.mutex_);
71 sibling_.set_write_closed(); // No more writes from the other end
72 read_queue_.reset();
73 std::move(sibling_.waker_).Wake();
74 return OkStatus();
75 }
76
77 async2::Poll<Result<multibuf::MultiBuf>>
DoPendRead(async2::Context & cx)78 ForwardingChannel<DataType::kByte>::DoPendRead(async2::Context& cx) {
79 std::lock_guard lock(pair_.mutex_);
80
81 // Close this channel if the sibling is closed, but return any remaining data.
82 if (!sibling_.is_write_open()) {
83 set_read_closed();
84 if (read_queue_.empty()) {
85 return Status::FailedPrecondition();
86 }
87 } else if (read_queue_.empty()) {
88 PW_ASYNC_STORE_WAKER(
89 cx,
90 read_waker_,
91 "ForwardingChannel is waiting for incoming data from its peer");
92 return async2::Pending();
93 }
94
95 auto read_data = std::move(read_queue_);
96 read_queue_ = {};
97 return read_data;
98 }
99
DoStageWrite(multibuf::MultiBuf && data)100 Status ForwardingChannel<DataType::kByte>::DoStageWrite(
101 multibuf::MultiBuf&& data) PW_NO_LOCK_SAFETY_ANALYSIS {
102 std::lock_guard lock(pair_.mutex_);
103 if (data.empty()) {
104 return OkStatus(); // no data, nothing to do
105 }
106
107 sibling_.read_queue_.PushSuffix(std::move(data));
108 std::move(sibling_.read_waker_).Wake();
109 return OkStatus();
110 }
111
DoPendClose(async2::Context &)112 async2::Poll<Status> ForwardingChannel<DataType::kByte>::DoPendClose(
113 async2::Context&) PW_NO_LOCK_SAFETY_ANALYSIS {
114 std::lock_guard lock(pair_.mutex_);
115 sibling_.set_write_closed(); // No more writes from the other end
116 read_queue_.Release();
117 std::move(sibling_.read_waker_).Wake();
118 return OkStatus();
119 }
120
121 } // namespace pw::channel::internal
122