xref: /aosp_15_r20/external/openscreen/osp/public/message_demuxer.h (revision 3f982cf4871df8771c9d4abe6e9a6f8d829b2736)
1*3f982cf4SFabien Sanglard // Copyright 2018 The Chromium Authors. All rights reserved.
2*3f982cf4SFabien Sanglard // Use of this source code is governed by a BSD-style license that can be
3*3f982cf4SFabien Sanglard // found in the LICENSE file.
4*3f982cf4SFabien Sanglard 
5*3f982cf4SFabien Sanglard #ifndef OSP_PUBLIC_MESSAGE_DEMUXER_H_
6*3f982cf4SFabien Sanglard #define OSP_PUBLIC_MESSAGE_DEMUXER_H_
7*3f982cf4SFabien Sanglard 
8*3f982cf4SFabien Sanglard #include <map>
9*3f982cf4SFabien Sanglard #include <memory>
10*3f982cf4SFabien Sanglard #include <vector>
11*3f982cf4SFabien Sanglard 
12*3f982cf4SFabien Sanglard #include "osp/msgs/osp_messages.h"
13*3f982cf4SFabien Sanglard #include "platform/api/time.h"
14*3f982cf4SFabien Sanglard #include "platform/base/error.h"
15*3f982cf4SFabien Sanglard 
16*3f982cf4SFabien Sanglard namespace openscreen {
17*3f982cf4SFabien Sanglard namespace osp {
18*3f982cf4SFabien Sanglard 
19*3f982cf4SFabien Sanglard class QuicStream;
20*3f982cf4SFabien Sanglard 
21*3f982cf4SFabien Sanglard // This class separates QUIC stream data into CBOR messages by reading a type
22*3f982cf4SFabien Sanglard // prefix from the stream and passes those messages to any callback matching the
23*3f982cf4SFabien Sanglard // source endpoint and message type.  If there is no callback for a given
24*3f982cf4SFabien Sanglard // message type, it will also try a default message listener.
25*3f982cf4SFabien Sanglard class MessageDemuxer {
26*3f982cf4SFabien Sanglard  public:
27*3f982cf4SFabien Sanglard   class MessageCallback {
28*3f982cf4SFabien Sanglard    public:
29*3f982cf4SFabien Sanglard     virtual ~MessageCallback() = default;
30*3f982cf4SFabien Sanglard 
31*3f982cf4SFabien Sanglard     // |buffer| contains data for a message of type |message_type|.  However,
32*3f982cf4SFabien Sanglard     // the data may be incomplete, in which case the callback should return an
33*3f982cf4SFabien Sanglard     // error code of Error::Code::kCborIncompleteMessage.  This way,
34*3f982cf4SFabien Sanglard     // the MessageDemuxer knows to neither consume the data nor discard it as
35*3f982cf4SFabien Sanglard     // bad.
36*3f982cf4SFabien Sanglard     virtual ErrorOr<size_t> OnStreamMessage(uint64_t endpoint_id,
37*3f982cf4SFabien Sanglard                                             uint64_t connection_id,
38*3f982cf4SFabien Sanglard                                             msgs::Type message_type,
39*3f982cf4SFabien Sanglard                                             const uint8_t* buffer,
40*3f982cf4SFabien Sanglard                                             size_t buffer_size,
41*3f982cf4SFabien Sanglard                                             Clock::time_point now) = 0;
42*3f982cf4SFabien Sanglard   };
43*3f982cf4SFabien Sanglard 
44*3f982cf4SFabien Sanglard   class MessageWatch {
45*3f982cf4SFabien Sanglard    public:
46*3f982cf4SFabien Sanglard     MessageWatch();
47*3f982cf4SFabien Sanglard     MessageWatch(MessageDemuxer* parent,
48*3f982cf4SFabien Sanglard                  bool is_default,
49*3f982cf4SFabien Sanglard                  uint64_t endpoint_id,
50*3f982cf4SFabien Sanglard                  msgs::Type message_type);
51*3f982cf4SFabien Sanglard     MessageWatch(MessageWatch&&) noexcept;
52*3f982cf4SFabien Sanglard     ~MessageWatch();
53*3f982cf4SFabien Sanglard     MessageWatch& operator=(MessageWatch&&) noexcept;
54*3f982cf4SFabien Sanglard 
55*3f982cf4SFabien Sanglard     explicit operator bool() const { return parent_; }
56*3f982cf4SFabien Sanglard 
57*3f982cf4SFabien Sanglard    private:
58*3f982cf4SFabien Sanglard     MessageDemuxer* parent_ = nullptr;
59*3f982cf4SFabien Sanglard     bool is_default_;
60*3f982cf4SFabien Sanglard     uint64_t endpoint_id_;
61*3f982cf4SFabien Sanglard     msgs::Type message_type_;
62*3f982cf4SFabien Sanglard   };
63*3f982cf4SFabien Sanglard 
64*3f982cf4SFabien Sanglard   static constexpr size_t kDefaultBufferLimit = 1 << 16;
65*3f982cf4SFabien Sanglard 
66*3f982cf4SFabien Sanglard   MessageDemuxer(ClockNowFunctionPtr now_function, size_t buffer_limit);
67*3f982cf4SFabien Sanglard   ~MessageDemuxer();
68*3f982cf4SFabien Sanglard 
69*3f982cf4SFabien Sanglard   // Starts watching for messages of type |message_type| from the endpoint
70*3f982cf4SFabien Sanglard   // identified by |endpoint_id|.  When such a message arrives, or if some are
71*3f982cf4SFabien Sanglard   // already buffered, |callback| will be called with the message data.
72*3f982cf4SFabien Sanglard   MessageWatch WatchMessageType(uint64_t endpoint_id,
73*3f982cf4SFabien Sanglard                                 msgs::Type message_type,
74*3f982cf4SFabien Sanglard                                 MessageCallback* callback);
75*3f982cf4SFabien Sanglard 
76*3f982cf4SFabien Sanglard   // Starts watching for messages of type |message_type| from any endpoint when
77*3f982cf4SFabien Sanglard   // there is not callback set for its specific endpoint ID.
78*3f982cf4SFabien Sanglard   MessageWatch SetDefaultMessageTypeWatch(msgs::Type message_type,
79*3f982cf4SFabien Sanglard                                           MessageCallback* callback);
80*3f982cf4SFabien Sanglard 
81*3f982cf4SFabien Sanglard   // Gives data from |endpoint_id| to the demuxer for processing.
82*3f982cf4SFabien Sanglard   // TODO(btolsch): It'd be nice if errors could propagate out of here to close
83*3f982cf4SFabien Sanglard   // the stream.
84*3f982cf4SFabien Sanglard   void OnStreamData(uint64_t endpoint_id,
85*3f982cf4SFabien Sanglard                     uint64_t connection_id,
86*3f982cf4SFabien Sanglard                     const uint8_t* data,
87*3f982cf4SFabien Sanglard                     size_t data_size);
88*3f982cf4SFabien Sanglard 
89*3f982cf4SFabien Sanglard  private:
90*3f982cf4SFabien Sanglard   struct HandleStreamBufferResult {
91*3f982cf4SFabien Sanglard     bool handled;
92*3f982cf4SFabien Sanglard     size_t consumed;
93*3f982cf4SFabien Sanglard   };
94*3f982cf4SFabien Sanglard 
95*3f982cf4SFabien Sanglard   void StopWatchingMessageType(uint64_t endpoint_id, msgs::Type message_type);
96*3f982cf4SFabien Sanglard   void StopDefaultMessageTypeWatch(msgs::Type message_type);
97*3f982cf4SFabien Sanglard 
98*3f982cf4SFabien Sanglard   HandleStreamBufferResult HandleStreamBufferLoop(
99*3f982cf4SFabien Sanglard       uint64_t endpoint_id,
100*3f982cf4SFabien Sanglard       uint64_t connection_id,
101*3f982cf4SFabien Sanglard       std::map<uint64_t, std::map<msgs::Type, MessageCallback*>>::iterator
102*3f982cf4SFabien Sanglard           endpoint_entry,
103*3f982cf4SFabien Sanglard       std::vector<uint8_t>* buffer);
104*3f982cf4SFabien Sanglard 
105*3f982cf4SFabien Sanglard   HandleStreamBufferResult HandleStreamBuffer(
106*3f982cf4SFabien Sanglard       uint64_t endpoint_id,
107*3f982cf4SFabien Sanglard       uint64_t connection_id,
108*3f982cf4SFabien Sanglard       std::map<msgs::Type, MessageCallback*>* message_callbacks,
109*3f982cf4SFabien Sanglard       std::vector<uint8_t>* buffer);
110*3f982cf4SFabien Sanglard 
111*3f982cf4SFabien Sanglard   const ClockNowFunctionPtr now_function_;
112*3f982cf4SFabien Sanglard   const size_t buffer_limit_;
113*3f982cf4SFabien Sanglard   std::map<uint64_t, std::map<msgs::Type, MessageCallback*>> message_callbacks_;
114*3f982cf4SFabien Sanglard   std::map<msgs::Type, MessageCallback*> default_callbacks_;
115*3f982cf4SFabien Sanglard 
116*3f982cf4SFabien Sanglard   // Map<endpoint_id, Map<connection_id, data_buffer>>
117*3f982cf4SFabien Sanglard   std::map<uint64_t, std::map<uint64_t, std::vector<uint8_t>>> buffers_;
118*3f982cf4SFabien Sanglard };
119*3f982cf4SFabien Sanglard 
// TODO(btolsch): Make sure all uses of MessageWatch are converted to this
// reset function for readability.
// NOTE(review): presumably ends the watch held by |watch| and leaves the
// handle empty; the definition is not in this header -- confirm.
void StopWatching(MessageDemuxer::MessageWatch* watch);
123*3f982cf4SFabien Sanglard 
// Static-only helper for reading a message type out of a byte buffer.
class MessageTypeDecoder {
 public:
  // Returns the msgs::Type decoded from |buffer|, or an error.
  // NOTE(review): presumably reads the type prefix at the start of |buffer|
  // and reports the prefix length through |num_bytes_decoded| -- confirm
  // against the implementation.
  static ErrorOr<msgs::Type> DecodeType(const std::vector<uint8_t>& buffer,
                                        size_t* num_bytes_decoded);

 private:
  // Returns a uint64_t decoded from |buffer|, or an error.
  // NOTE(review): the name suggests a variable-length unsigned integer
  // encoding, with the encoded length written to |num_bytes_decoded| --
  // confirm against the implementation.
  static ErrorOr<uint64_t> DecodeVarUint(const std::vector<uint8_t>& buffer,
                                         size_t* num_bytes_decoded);
};
133*3f982cf4SFabien Sanglard 
134*3f982cf4SFabien Sanglard }  // namespace osp
135*3f982cf4SFabien Sanglard }  // namespace openscreen
136*3f982cf4SFabien Sanglard 
137*3f982cf4SFabien Sanglard #endif  // OSP_PUBLIC_MESSAGE_DEMUXER_H_
138