1 // Copyright 2020 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "cast/streaming/rpc_messenger.h"
6
7 #include <memory>
8 #include <string>
9 #include <utility>
10
11 #include "util/osp_logging.h"
12
13 namespace openscreen {
14 namespace cast {
15
16 namespace {
17
operator <<(std::ostream & out,const RpcMessage & message)18 std::ostream& operator<<(std::ostream& out, const RpcMessage& message) {
19 out << "handle=" << message.handle() << ", proc=" << message.proc();
20 switch (message.rpc_oneof_case()) {
21 case RpcMessage::kIntegerValue:
22 return out << ", integer_value=" << message.integer_value();
23 case RpcMessage::kInteger64Value:
24 return out << ", integer64_value=" << message.integer64_value();
25 case RpcMessage::kDoubleValue:
26 return out << ", double_value=" << message.double_value();
27 case RpcMessage::kBooleanValue:
28 return out << ", boolean_value=" << message.boolean_value();
29 case RpcMessage::kStringValue:
30 return out << ", string_value=" << message.string_value();
31 default:
32 return out << ", rpc_oneof=" << message.rpc_oneof_case();
33 }
34
35 OSP_NOTREACHED();
36 }
37
38 } // namespace
39
// Out-of-class definitions for the static constexpr handle constants; their
// values come from the in-class declarations in the RpcMessenger class
// definition. Under pre-C++17 rules a definition like this is what allows the
// constants to be odr-used (e.g. bound to a const reference). From C++17 on,
// static constexpr data members are implicitly inline, which makes these
// redeclarations redundant but still valid.
constexpr RpcMessenger::Handle RpcMessenger::kInvalidHandle;
constexpr RpcMessenger::Handle RpcMessenger::kAcquireRendererHandle;
constexpr RpcMessenger::Handle RpcMessenger::kAcquireDemuxerHandle;
constexpr RpcMessenger::Handle RpcMessenger::kFirstHandle;
44
RpcMessenger(SendMessageCallback send_message_cb)45 RpcMessenger::RpcMessenger(SendMessageCallback send_message_cb)
46 : next_handle_(kFirstHandle),
47 send_message_cb_(std::move(send_message_cb)) {}
48
~RpcMessenger()49 RpcMessenger::~RpcMessenger() {
50 receive_callbacks_.clear();
51 }
52
GetUniqueHandle()53 RpcMessenger::Handle RpcMessenger::GetUniqueHandle() {
54 return next_handle_++;
55 }
56
RegisterMessageReceiverCallback(RpcMessenger::Handle handle,ReceiveMessageCallback callback)57 void RpcMessenger::RegisterMessageReceiverCallback(
58 RpcMessenger::Handle handle,
59 ReceiveMessageCallback callback) {
60 OSP_DCHECK(receive_callbacks_.find(handle) == receive_callbacks_.end())
61 << "must deregister before re-registering";
62 receive_callbacks_.emplace_back(handle, std::move(callback));
63 }
64
UnregisterMessageReceiverCallback(RpcMessenger::Handle handle)65 void RpcMessenger::UnregisterMessageReceiverCallback(RpcMessenger::Handle handle) {
66 receive_callbacks_.erase_key(handle);
67 }
68
ProcessMessageFromRemote(const uint8_t * message,std::size_t message_len)69 void RpcMessenger::ProcessMessageFromRemote(const uint8_t* message,
70 std::size_t message_len) {
71 auto rpc = std::make_unique<RpcMessage>();
72 if (!rpc->ParseFromArray(message, message_len)) {
73 OSP_DLOG_WARN << "Failed to parse RPC message from remote: \"" << message
74 << "\"";
75 return;
76 }
77 ProcessMessageFromRemote(std::move(rpc));
78 }
79
ProcessMessageFromRemote(std::unique_ptr<RpcMessage> message)80 void RpcMessenger::ProcessMessageFromRemote(std::unique_ptr<RpcMessage> message) {
81 const auto entry = receive_callbacks_.find(message->handle());
82 if (entry == receive_callbacks_.end()) {
83 OSP_VLOG << "Dropping message due to unregistered handle: "
84 << message->handle();
85 return;
86 }
87 entry->second(std::move(message));
88 }
89
SendMessageToRemote(const RpcMessage & rpc)90 void RpcMessenger::SendMessageToRemote(const RpcMessage& rpc) {
91 OSP_VLOG << "Sending RPC message: " << rpc;
92 std::vector<uint8_t> message(rpc.ByteSizeLong());
93 rpc.SerializeToArray(message.data(), message.size());
94 send_message_cb_(std::move(message));
95 }
96
IsRegisteredForTesting(RpcMessenger::Handle handle)97 bool RpcMessenger::IsRegisteredForTesting(RpcMessenger::Handle handle) {
98 return receive_callbacks_.find(handle) != receive_callbacks_.end();
99 }
100
GetWeakPtr()101 WeakPtr<RpcMessenger> RpcMessenger::GetWeakPtr() {
102 return weak_factory_.GetWeakPtr();
103 }
104
105 } // namespace cast
106 } // namespace openscreen
107