1 // Copyright 2024 The Pigweed Authors 2 // 3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not 4 // use this file except in compliance with the License. You may obtain a copy of 5 // the License at 6 // 7 // https://www.apache.org/licenses/LICENSE-2.0 8 // 9 // Unless required by applicable law or agreed to in writing, software 10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 12 // License for the specific language governing permissions and limitations under 13 // the License. 14 #pragma once 15 16 #include "pw_async2/dispatcher_base.h" 17 #include "pw_channel/channel.h" 18 #include "pw_multibuf/allocator.h" 19 #include "pw_multibuf/multibuf.h" 20 #include "pw_status/status.h" 21 #include "pw_stream/stream.h" 22 #include "pw_sync/interrupt_spin_lock.h" 23 #include "pw_sync/thread_notification.h" 24 #include "pw_thread/thread.h" 25 26 namespace pw::channel { 27 namespace internal { 28 29 /// State for the stream-reading thread. 30 class StreamChannelReadState { 31 public: 32 StreamChannelReadState() = default; 33 StreamChannelReadState(const StreamChannelReadState&) = delete; 34 StreamChannelReadState& operator=(const StreamChannelReadState&) = delete; 35 StreamChannelReadState(StreamChannelReadState&&) = delete; 36 StreamChannelReadState& operator=(StreamChannelReadState&&) = delete; 37 38 /// Whether or not the `ReadLoop` already has a buffer available into which 39 /// data can be read. 40 bool HasBufferToFill(); 41 42 /// Provide a buffer for `ReadLoop` to read data into. 43 void ProvideBufferToFill(multibuf::MultiBuf&& buf); 44 45 /// Receives any available data processed by `ReadLoop`. 46 /// 47 /// If no data is available, schedules a wakeup of the task in `cx` when 48 /// new data arrives. 49 async2::Poll<Result<multibuf::MultiBuf>> PendFilledBuffer( 50 async2::Context& cx); 51 52 /// A loop which reads data from `reader` into buffers provided by 53 /// `ProvideBufferToFill` and then makes them available via 54 /// `PendFilledBuffer`. 55 /// 56 /// This is blocking and is intended to be run on an independent thread. 57 void ReadLoop(stream::Reader& reader); 58 59 private: 60 multibuf::OwnedChunk WaitForBufferToFillAndTakeFrontChunk(); 61 void ProvideFilledBuffer(multibuf::MultiBuf&& filled_buffer); 62 void SetReadError(Status status); 63 64 sync::ThreadNotification buffer_to_fill_available_; 65 async2::Waker on_buffer_filled_; 66 sync::InterruptSpinLock buffer_lock_; 67 multibuf::MultiBuf buffer_to_fill_ PW_GUARDED_BY(buffer_lock_); 68 multibuf::MultiBuf filled_buffer_ PW_GUARDED_BY(buffer_lock_); 69 Status status_ PW_GUARDED_BY(buffer_lock_); 70 }; 71 72 /// State for the stream-writing thread. 73 class StreamChannelWriteState { 74 public: 75 StreamChannelWriteState() = default; 76 StreamChannelWriteState(const StreamChannelWriteState&) = delete; 77 StreamChannelWriteState& operator=(const StreamChannelWriteState&) = delete; 78 StreamChannelWriteState(StreamChannelWriteState&&) = delete; 79 StreamChannelWriteState& operator=(StreamChannelWriteState&&) = delete; 80 81 /// Queues `buf` to be sent into `writer` via the `WriteLoop`. 82 /// 83 /// Returns a status indicating whether the `WriteLoop` has encountered 84 /// errors writing into `writer`. 85 Status SendData(multibuf::MultiBuf&& buf); 86 87 /// A loop which writes the data sent via `SendData` into `writer`. 88 /// 89 /// This is blocking and is intended to be run on an independent thread. 90 void WriteLoop(stream::Writer& writer); 91 92 private: 93 sync::ThreadNotification data_available_; 94 sync::InterruptSpinLock buffer_lock_; 95 multibuf::MultiBuf buffer_to_write_ PW_GUARDED_BY(buffer_lock_); 96 Status status_; 97 }; 98 99 } // namespace internal 100 101 /// @defgroup pw_channel_stream_channel 102 /// @{ 103 104 /// A channel which delegates to an underlying reader and writer stream. 105 /// 106 /// NOTE: this channel as well as its `reader` and `writer` must all continue to 107 /// exist for the duration of the program, as they are referenced by other 108 /// threads. 109 /// 110 /// This unfortunate requirement is due to the fact that `Stream::Read` and 111 /// `Stream::Write` are blocking. The stream reading and writing threaads 112 /// may be blocked on `Read` or `Write` calls, and therefore cannot cleanly 113 /// be shutdown. 114 class StreamChannel final 115 : public channel::Implement<channel::ByteReaderWriter> { 116 public: 117 StreamChannel(stream::Reader& reader, 118 const thread::Options& read_thread_options, 119 multibuf::MultiBufAllocator& read_allocator, 120 stream::Writer& writer, 121 const thread::Options& write_thread_option, 122 multibuf::MultiBufAllocator& write_allocator); 123 124 // Deprecated: prefer the two-allocator constructor in order to prevent reads 125 // and writes blocking on waiting for buffer space from the other. StreamChannel(multibuf::MultiBufAllocator & allocator,stream::Reader & reader,const thread::Options & read_thread_options,stream::Writer & writer,const thread::Options & write_thread_options)126 StreamChannel(multibuf::MultiBufAllocator& allocator, 127 stream::Reader& reader, 128 const thread::Options& read_thread_options, 129 stream::Writer& writer, 130 const thread::Options& write_thread_options) 131 : StreamChannel(reader, 132 read_thread_options, 133 allocator, 134 writer, 135 write_thread_options, 136 allocator) {} 137 138 // StreamChannel is referenced from other threads and is therefore not movable 139 // or copyable. 140 StreamChannel(const StreamChannel&) = delete; 141 StreamChannel& operator=(const StreamChannel&) = delete; 142 StreamChannel(StreamChannel&&) = delete; 143 StreamChannel& operator=(StreamChannel&&) = delete; 144 145 private: 146 // StreamChannel must live forever, as its state is referenced by other 147 // threads. 148 ~StreamChannel() final = default; 149 150 Status ProvideBufferIfAvailable(async2::Context& cx); 151 152 async2::Poll<Result<multibuf::MultiBuf>> DoPendRead( 153 async2::Context& cx) override; 154 155 async2::Poll<Status> DoPendReadyToWrite(async2::Context& cx) override; 156 DoPendAllocateWriteBuffer(async2::Context & cx,size_t min_bytes)157 async2::Poll<std::optional<multibuf::MultiBuf>> DoPendAllocateWriteBuffer( 158 async2::Context& cx, size_t min_bytes) override { 159 write_allocation_future_.SetDesiredSize(min_bytes); 160 return write_allocation_future_.Pend(cx); 161 } 162 163 Status DoStageWrite(multibuf::MultiBuf&& data) override; 164 DoPendWrite(async2::Context &)165 async2::Poll<Status> DoPendWrite(async2::Context&) override { 166 return OkStatus(); 167 } 168 DoPendClose(async2::Context &)169 async2::Poll<Status> DoPendClose(async2::Context&) override { 170 return async2::Ready(OkStatus()); 171 } 172 173 stream::Reader& reader_; 174 stream::Writer& writer_; 175 internal::StreamChannelReadState read_state_; 176 internal::StreamChannelWriteState write_state_; 177 multibuf::MultiBufAllocationFuture read_allocation_future_; 178 multibuf::MultiBufAllocationFuture write_allocation_future_; 179 }; 180 181 /// @} 182 183 } // namespace pw::channel 184