xref: /aosp_15_r20/external/pigweed/pw_channel/public/pw_channel/stream_channel.h (revision 61c4878ac05f98d0ceed94b57d316916de578985)
1 // Copyright 2024 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 //     https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14 #pragma once
15 
16 #include "pw_async2/dispatcher_base.h"
17 #include "pw_channel/channel.h"
18 #include "pw_multibuf/allocator.h"
19 #include "pw_multibuf/multibuf.h"
20 #include "pw_status/status.h"
21 #include "pw_stream/stream.h"
22 #include "pw_sync/interrupt_spin_lock.h"
23 #include "pw_sync/thread_notification.h"
24 #include "pw_thread/thread.h"
25 
26 namespace pw::channel {
27 namespace internal {
28 
29 /// State for the stream-reading thread.
30 class StreamChannelReadState {
31  public:
32   StreamChannelReadState() = default;
33   StreamChannelReadState(const StreamChannelReadState&) = delete;
34   StreamChannelReadState& operator=(const StreamChannelReadState&) = delete;
35   StreamChannelReadState(StreamChannelReadState&&) = delete;
36   StreamChannelReadState& operator=(StreamChannelReadState&&) = delete;
37 
38   /// Whether or not the `ReadLoop` already has a buffer available into which
39   /// data can be read.
40   bool HasBufferToFill();
41 
42   /// Provide a buffer for `ReadLoop` to read data into.
43   void ProvideBufferToFill(multibuf::MultiBuf&& buf);
44 
45   /// Receives any available data processed by `ReadLoop`.
46   ///
47   /// If no data is available, schedules a wakeup of the task in `cx` when
48   /// new data arrives.
49   async2::Poll<Result<multibuf::MultiBuf>> PendFilledBuffer(
50       async2::Context& cx);
51 
52   /// A loop which reads data from `reader` into buffers provided by
53   /// `ProvideBufferToFill` and then makes them available via
54   /// `PendFilledBuffer`.
55   ///
56   /// This is blocking and is intended to be run on an independent thread.
57   void ReadLoop(stream::Reader& reader);
58 
59  private:
60   multibuf::OwnedChunk WaitForBufferToFillAndTakeFrontChunk();
61   void ProvideFilledBuffer(multibuf::MultiBuf&& filled_buffer);
62   void SetReadError(Status status);
63 
64   sync::ThreadNotification buffer_to_fill_available_;
65   async2::Waker on_buffer_filled_;
66   sync::InterruptSpinLock buffer_lock_;
67   multibuf::MultiBuf buffer_to_fill_ PW_GUARDED_BY(buffer_lock_);
68   multibuf::MultiBuf filled_buffer_ PW_GUARDED_BY(buffer_lock_);
69   Status status_ PW_GUARDED_BY(buffer_lock_);
70 };
71 
72 /// State for the stream-writing thread.
73 class StreamChannelWriteState {
74  public:
75   StreamChannelWriteState() = default;
76   StreamChannelWriteState(const StreamChannelWriteState&) = delete;
77   StreamChannelWriteState& operator=(const StreamChannelWriteState&) = delete;
78   StreamChannelWriteState(StreamChannelWriteState&&) = delete;
79   StreamChannelWriteState& operator=(StreamChannelWriteState&&) = delete;
80 
81   /// Queues `buf` to be sent into `writer` via the `WriteLoop`.
82   ///
83   /// Returns a status indicating whether the `WriteLoop` has encountered
84   /// errors writing into `writer`.
85   Status SendData(multibuf::MultiBuf&& buf);
86 
87   /// A loop which writes the data sent via `SendData` into `writer`.
88   ///
89   /// This is blocking and is intended to be run on an independent thread.
90   void WriteLoop(stream::Writer& writer);
91 
92  private:
93   sync::ThreadNotification data_available_;
94   sync::InterruptSpinLock buffer_lock_;
95   multibuf::MultiBuf buffer_to_write_ PW_GUARDED_BY(buffer_lock_);
96   Status status_;
97 };
98 
99 }  // namespace internal
100 
101 /// @defgroup pw_channel_stream_channel
102 /// @{
103 
104 /// A channel which delegates to an underlying reader and writer stream.
105 ///
106 /// NOTE: this channel as well as its `reader` and `writer` must all continue to
107 /// exist for the duration of the program, as they are referenced by other
108 /// threads.
109 ///
110 /// This unfortunate requirement is due to the fact that `Stream::Read` and
111 /// `Stream::Write` are blocking. The stream reading and writing threaads
112 /// may be blocked on `Read` or `Write` calls, and therefore cannot cleanly
113 /// be shutdown.
114 class StreamChannel final
115     : public channel::Implement<channel::ByteReaderWriter> {
116  public:
117   StreamChannel(stream::Reader& reader,
118                 const thread::Options& read_thread_options,
119                 multibuf::MultiBufAllocator& read_allocator,
120                 stream::Writer& writer,
121                 const thread::Options& write_thread_option,
122                 multibuf::MultiBufAllocator& write_allocator);
123 
124   // Deprecated: prefer the two-allocator constructor in order to prevent reads
125   // and writes blocking on waiting for buffer space from the other.
StreamChannel(multibuf::MultiBufAllocator & allocator,stream::Reader & reader,const thread::Options & read_thread_options,stream::Writer & writer,const thread::Options & write_thread_options)126   StreamChannel(multibuf::MultiBufAllocator& allocator,
127                 stream::Reader& reader,
128                 const thread::Options& read_thread_options,
129                 stream::Writer& writer,
130                 const thread::Options& write_thread_options)
131       : StreamChannel(reader,
132                       read_thread_options,
133                       allocator,
134                       writer,
135                       write_thread_options,
136                       allocator) {}
137 
138   // StreamChannel is referenced from other threads and is therefore not movable
139   // or copyable.
140   StreamChannel(const StreamChannel&) = delete;
141   StreamChannel& operator=(const StreamChannel&) = delete;
142   StreamChannel(StreamChannel&&) = delete;
143   StreamChannel& operator=(StreamChannel&&) = delete;
144 
145  private:
146   // StreamChannel must live forever, as its state is referenced by other
147   // threads.
148   ~StreamChannel() final = default;
149 
150   Status ProvideBufferIfAvailable(async2::Context& cx);
151 
152   async2::Poll<Result<multibuf::MultiBuf>> DoPendRead(
153       async2::Context& cx) override;
154 
155   async2::Poll<Status> DoPendReadyToWrite(async2::Context& cx) override;
156 
DoPendAllocateWriteBuffer(async2::Context & cx,size_t min_bytes)157   async2::Poll<std::optional<multibuf::MultiBuf>> DoPendAllocateWriteBuffer(
158       async2::Context& cx, size_t min_bytes) override {
159     write_allocation_future_.SetDesiredSize(min_bytes);
160     return write_allocation_future_.Pend(cx);
161   }
162 
163   Status DoStageWrite(multibuf::MultiBuf&& data) override;
164 
DoPendWrite(async2::Context &)165   async2::Poll<Status> DoPendWrite(async2::Context&) override {
166     return OkStatus();
167   }
168 
DoPendClose(async2::Context &)169   async2::Poll<Status> DoPendClose(async2::Context&) override {
170     return async2::Ready(OkStatus());
171   }
172 
173   stream::Reader& reader_;
174   stream::Writer& writer_;
175   internal::StreamChannelReadState read_state_;
176   internal::StreamChannelWriteState write_state_;
177   multibuf::MultiBufAllocationFuture read_allocation_future_;
178   multibuf::MultiBufAllocationFuture write_allocation_future_;
179 };
180 
181 /// @}
182 
183 }  // namespace pw::channel
184