xref: /aosp_15_r20/external/pigweed/pw_rpc_transport/public/pw_rpc_transport/stream_rpc_dispatcher.h (revision 61c4878ac05f98d0ceed94b57d316916de578985)
1 // Copyright 2023 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 //     https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14 #pragma once
15 
16 #include <array>
17 #include <atomic>
18 #include <cstddef>
19 
20 #include "pw_metric/metric.h"
21 #include "pw_rpc_transport/egress_ingress.h"
22 #include "pw_status/status.h"
23 #include "pw_stream/stream.h"
24 #include "pw_thread/thread.h"
25 
26 namespace pw::rpc {
27 
28 template <size_t kReadSize>
29 class StreamRpcDispatcher : public pw::thread::ThreadCore {
30  public:
StreamRpcDispatcher(pw::stream::Reader & reader,pw::rpc::RpcIngressHandler & ingress_handler)31   StreamRpcDispatcher(pw::stream::Reader& reader,
32                       pw::rpc::RpcIngressHandler& ingress_handler)
33       : reader_(reader), ingress_handler_(ingress_handler) {}
~StreamRpcDispatcher()34   ~StreamRpcDispatcher() override { Stop(); }
35 
metrics()36   const metric::Group& metrics() const { return metrics_; }
37 
num_read_errors()38   uint32_t num_read_errors() const { return read_errors_.value(); }
num_egress_errors()39   uint32_t num_egress_errors() const { return egress_errors_.value(); }
40 
41   // Once stopped, will no longer process data.
Stop()42   void Stop() {
43     if (stopped_) {
44       return;
45     }
46     stopped_ = true;
47   }
48 
49  protected:
50   // From pw::thread::ThreadCore.
Run()51   void Run() final {
52     while (!stopped_) {
53       auto read = reader_.Read(read_buffer_);
54       if (!read.ok()) {
55         read_errors_.Increment();
56         continue;
57       }
58 
59       if (const auto status = ingress_handler_.ProcessIncomingData(*read);
60           !status.ok()) {
61         egress_errors_.Increment();
62         continue;
63       }
64     }
65   }
66 
67  private:
68   std::array<std::byte, kReadSize> read_buffer_ = {};
69   pw::stream::Reader& reader_;
70   pw::rpc::RpcIngressHandler& ingress_handler_;
71   std::atomic<bool> stopped_ = false;
72   PW_METRIC_GROUP(metrics_, "pw_rpc_stream_rpc_dispatcher");
73   PW_METRIC(metrics_, read_errors_, "read_errors", 0u);
74   PW_METRIC(metrics_, egress_errors_, "egress_errors", 0u);
75 };
76 
77 }  // namespace pw::rpc
78