1 // Copyright 2023 The Pigweed Authors 2 // 3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not 4 // use this file except in compliance with the License. You may obtain a copy of 5 // the License at 6 // 7 // https://www.apache.org/licenses/LICENSE-2.0 8 // 9 // Unless required by applicable law or agreed to in writing, software 10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 12 // License for the specific language governing permissions and limitations under 13 // the License. 14 #pragma once 15 16 #include <array> 17 #include <atomic> 18 #include <cstddef> 19 20 #include "pw_metric/metric.h" 21 #include "pw_rpc_transport/egress_ingress.h" 22 #include "pw_status/status.h" 23 #include "pw_stream/stream.h" 24 #include "pw_thread/thread.h" 25 26 namespace pw::rpc { 27 28 template <size_t kReadSize> 29 class StreamRpcDispatcher : public pw::thread::ThreadCore { 30 public: StreamRpcDispatcher(pw::stream::Reader & reader,pw::rpc::RpcIngressHandler & ingress_handler)31 StreamRpcDispatcher(pw::stream::Reader& reader, 32 pw::rpc::RpcIngressHandler& ingress_handler) 33 : reader_(reader), ingress_handler_(ingress_handler) {} ~StreamRpcDispatcher()34 ~StreamRpcDispatcher() override { Stop(); } 35 metrics()36 const metric::Group& metrics() const { return metrics_; } 37 num_read_errors()38 uint32_t num_read_errors() const { return read_errors_.value(); } num_egress_errors()39 uint32_t num_egress_errors() const { return egress_errors_.value(); } 40 41 // Once stopped, will no longer process data. Stop()42 void Stop() { 43 if (stopped_) { 44 return; 45 } 46 stopped_ = true; 47 } 48 49 protected: 50 // From pw::thread::ThreadCore. Run()51 void Run() final { 52 while (!stopped_) { 53 auto read = reader_.Read(read_buffer_); 54 if (!read.ok()) { 55 read_errors_.Increment(); 56 continue; 57 } 58 59 if (const auto status = ingress_handler_.ProcessIncomingData(*read); 60 !status.ok()) { 61 egress_errors_.Increment(); 62 continue; 63 } 64 } 65 } 66 67 private: 68 std::array<std::byte, kReadSize> read_buffer_ = {}; 69 pw::stream::Reader& reader_; 70 pw::rpc::RpcIngressHandler& ingress_handler_; 71 std::atomic<bool> stopped_ = false; 72 PW_METRIC_GROUP(metrics_, "pw_rpc_stream_rpc_dispatcher"); 73 PW_METRIC(metrics_, read_errors_, "read_errors", 0u); 74 PW_METRIC(metrics_, egress_errors_, "egress_errors", 0u); 75 }; 76 77 } // namespace pw::rpc 78