1*3f982cf4SFabien Sanglard // Copyright 2018 The Chromium Authors. All rights reserved. 2*3f982cf4SFabien Sanglard // Use of this source code is governed by a BSD-style license that can be 3*3f982cf4SFabien Sanglard // found in the LICENSE file. 4*3f982cf4SFabien Sanglard 5*3f982cf4SFabien Sanglard #ifndef OSP_PUBLIC_MESSAGE_DEMUXER_H_ 6*3f982cf4SFabien Sanglard #define OSP_PUBLIC_MESSAGE_DEMUXER_H_ 7*3f982cf4SFabien Sanglard 8*3f982cf4SFabien Sanglard #include <map> 9*3f982cf4SFabien Sanglard #include <memory> 10*3f982cf4SFabien Sanglard #include <vector> 11*3f982cf4SFabien Sanglard 12*3f982cf4SFabien Sanglard #include "osp/msgs/osp_messages.h" 13*3f982cf4SFabien Sanglard #include "platform/api/time.h" 14*3f982cf4SFabien Sanglard #include "platform/base/error.h" 15*3f982cf4SFabien Sanglard 16*3f982cf4SFabien Sanglard namespace openscreen { 17*3f982cf4SFabien Sanglard namespace osp { 18*3f982cf4SFabien Sanglard 19*3f982cf4SFabien Sanglard class QuicStream; 20*3f982cf4SFabien Sanglard 21*3f982cf4SFabien Sanglard // This class separates QUIC stream data into CBOR messages by reading a type 22*3f982cf4SFabien Sanglard // prefix from the stream and passes those messages to any callback matching the 23*3f982cf4SFabien Sanglard // source endpoint and message type. If there is no callback for a given 24*3f982cf4SFabien Sanglard // message type, it will also try a default message listener. 25*3f982cf4SFabien Sanglard class MessageDemuxer { 26*3f982cf4SFabien Sanglard public: 27*3f982cf4SFabien Sanglard class MessageCallback { 28*3f982cf4SFabien Sanglard public: 29*3f982cf4SFabien Sanglard virtual ~MessageCallback() = default; 30*3f982cf4SFabien Sanglard 31*3f982cf4SFabien Sanglard // |buffer| contains data for a message of type |message_type|. However, 32*3f982cf4SFabien Sanglard // the data may be incomplete, in which case the callback should return an 33*3f982cf4SFabien Sanglard // error code of Error::Code::kCborIncompleteMessage. This way, 34*3f982cf4SFabien Sanglard // the MessageDemuxer knows to neither consume the data nor discard it as 35*3f982cf4SFabien Sanglard // bad. 36*3f982cf4SFabien Sanglard virtual ErrorOr<size_t> OnStreamMessage(uint64_t endpoint_id, 37*3f982cf4SFabien Sanglard uint64_t connection_id, 38*3f982cf4SFabien Sanglard msgs::Type message_type, 39*3f982cf4SFabien Sanglard const uint8_t* buffer, 40*3f982cf4SFabien Sanglard size_t buffer_size, 41*3f982cf4SFabien Sanglard Clock::time_point now) = 0; 42*3f982cf4SFabien Sanglard }; 43*3f982cf4SFabien Sanglard 44*3f982cf4SFabien Sanglard class MessageWatch { 45*3f982cf4SFabien Sanglard public: 46*3f982cf4SFabien Sanglard MessageWatch(); 47*3f982cf4SFabien Sanglard MessageWatch(MessageDemuxer* parent, 48*3f982cf4SFabien Sanglard bool is_default, 49*3f982cf4SFabien Sanglard uint64_t endpoint_id, 50*3f982cf4SFabien Sanglard msgs::Type message_type); 51*3f982cf4SFabien Sanglard MessageWatch(MessageWatch&&) noexcept; 52*3f982cf4SFabien Sanglard ~MessageWatch(); 53*3f982cf4SFabien Sanglard MessageWatch& operator=(MessageWatch&&) noexcept; 54*3f982cf4SFabien Sanglard 55*3f982cf4SFabien Sanglard explicit operator bool() const { return parent_; } 56*3f982cf4SFabien Sanglard 57*3f982cf4SFabien Sanglard private: 58*3f982cf4SFabien Sanglard MessageDemuxer* parent_ = nullptr; 59*3f982cf4SFabien Sanglard bool is_default_; 60*3f982cf4SFabien Sanglard uint64_t endpoint_id_; 61*3f982cf4SFabien Sanglard msgs::Type message_type_; 62*3f982cf4SFabien Sanglard }; 63*3f982cf4SFabien Sanglard 64*3f982cf4SFabien Sanglard static constexpr size_t kDefaultBufferLimit = 1 << 16; 65*3f982cf4SFabien Sanglard 66*3f982cf4SFabien Sanglard MessageDemuxer(ClockNowFunctionPtr now_function, size_t buffer_limit); 67*3f982cf4SFabien Sanglard ~MessageDemuxer(); 68*3f982cf4SFabien Sanglard 69*3f982cf4SFabien Sanglard // Starts watching for messages of type |message_type| from the endpoint 70*3f982cf4SFabien Sanglard // identified by |endpoint_id|. When such a message arrives, or if some are 71*3f982cf4SFabien Sanglard // already buffered, |callback| will be called with the message data. 72*3f982cf4SFabien Sanglard MessageWatch WatchMessageType(uint64_t endpoint_id, 73*3f982cf4SFabien Sanglard msgs::Type message_type, 74*3f982cf4SFabien Sanglard MessageCallback* callback); 75*3f982cf4SFabien Sanglard 76*3f982cf4SFabien Sanglard // Starts watching for messages of type |message_type| from any endpoint when 77*3f982cf4SFabien Sanglard // there is not callback set for its specific endpoint ID. 78*3f982cf4SFabien Sanglard MessageWatch SetDefaultMessageTypeWatch(msgs::Type message_type, 79*3f982cf4SFabien Sanglard MessageCallback* callback); 80*3f982cf4SFabien Sanglard 81*3f982cf4SFabien Sanglard // Gives data from |endpoint_id| to the demuxer for processing. 82*3f982cf4SFabien Sanglard // TODO(btolsch): It'd be nice if errors could propagate out of here to close 83*3f982cf4SFabien Sanglard // the stream. 84*3f982cf4SFabien Sanglard void OnStreamData(uint64_t endpoint_id, 85*3f982cf4SFabien Sanglard uint64_t connection_id, 86*3f982cf4SFabien Sanglard const uint8_t* data, 87*3f982cf4SFabien Sanglard size_t data_size); 88*3f982cf4SFabien Sanglard 89*3f982cf4SFabien Sanglard private: 90*3f982cf4SFabien Sanglard struct HandleStreamBufferResult { 91*3f982cf4SFabien Sanglard bool handled; 92*3f982cf4SFabien Sanglard size_t consumed; 93*3f982cf4SFabien Sanglard }; 94*3f982cf4SFabien Sanglard 95*3f982cf4SFabien Sanglard void StopWatchingMessageType(uint64_t endpoint_id, msgs::Type message_type); 96*3f982cf4SFabien Sanglard void StopDefaultMessageTypeWatch(msgs::Type message_type); 97*3f982cf4SFabien Sanglard 98*3f982cf4SFabien Sanglard HandleStreamBufferResult HandleStreamBufferLoop( 99*3f982cf4SFabien Sanglard uint64_t endpoint_id, 100*3f982cf4SFabien Sanglard uint64_t connection_id, 101*3f982cf4SFabien Sanglard std::map<uint64_t, std::map<msgs::Type, MessageCallback*>>::iterator 102*3f982cf4SFabien Sanglard endpoint_entry, 103*3f982cf4SFabien Sanglard std::vector<uint8_t>* buffer); 104*3f982cf4SFabien Sanglard 105*3f982cf4SFabien Sanglard HandleStreamBufferResult HandleStreamBuffer( 106*3f982cf4SFabien Sanglard uint64_t endpoint_id, 107*3f982cf4SFabien Sanglard uint64_t connection_id, 108*3f982cf4SFabien Sanglard std::map<msgs::Type, MessageCallback*>* message_callbacks, 109*3f982cf4SFabien Sanglard std::vector<uint8_t>* buffer); 110*3f982cf4SFabien Sanglard 111*3f982cf4SFabien Sanglard const ClockNowFunctionPtr now_function_; 112*3f982cf4SFabien Sanglard const size_t buffer_limit_; 113*3f982cf4SFabien Sanglard std::map<uint64_t, std::map<msgs::Type, MessageCallback*>> message_callbacks_; 114*3f982cf4SFabien Sanglard std::map<msgs::Type, MessageCallback*> default_callbacks_; 115*3f982cf4SFabien Sanglard 116*3f982cf4SFabien Sanglard // Map<endpoint_id, Map<connection_id, data_buffer>> 117*3f982cf4SFabien Sanglard std::map<uint64_t, std::map<uint64_t, std::vector<uint8_t>>> buffers_; 118*3f982cf4SFabien Sanglard }; 119*3f982cf4SFabien Sanglard 120*3f982cf4SFabien Sanglard // TODO(btolsch): Make sure all uses of MessageWatch are converted to this 121*3f982cf4SFabien Sanglard // resest function for readability. 122*3f982cf4SFabien Sanglard void StopWatching(MessageDemuxer::MessageWatch* watch); 123*3f982cf4SFabien Sanglard 124*3f982cf4SFabien Sanglard class MessageTypeDecoder { 125*3f982cf4SFabien Sanglard public: 126*3f982cf4SFabien Sanglard static ErrorOr<msgs::Type> DecodeType(const std::vector<uint8_t>& buffer, 127*3f982cf4SFabien Sanglard size_t* num_bytes_decoded); 128*3f982cf4SFabien Sanglard 129*3f982cf4SFabien Sanglard private: 130*3f982cf4SFabien Sanglard static ErrorOr<uint64_t> DecodeVarUint(const std::vector<uint8_t>& buffer, 131*3f982cf4SFabien Sanglard size_t* num_bytes_decoded); 132*3f982cf4SFabien Sanglard }; 133*3f982cf4SFabien Sanglard 134*3f982cf4SFabien Sanglard } // namespace osp 135*3f982cf4SFabien Sanglard } // namespace openscreen 136*3f982cf4SFabien Sanglard 137*3f982cf4SFabien Sanglard #endif // OSP_PUBLIC_MESSAGE_DEMUXER_H_ 138