1*3f982cf4SFabien Sanglard // Copyright 2020 The Chromium Authors. All rights reserved.
2*3f982cf4SFabien Sanglard // Use of this source code is governed by a BSD-style license that can be
3*3f982cf4SFabien Sanglard // found in the LICENSE file.
4*3f982cf4SFabien Sanglard
5*3f982cf4SFabien Sanglard #include "cast/streaming/rpc_messenger.h"
6*3f982cf4SFabien Sanglard
7*3f982cf4SFabien Sanglard #include <memory>
8*3f982cf4SFabien Sanglard #include <string>
9*3f982cf4SFabien Sanglard #include <utility>
10*3f982cf4SFabien Sanglard
11*3f982cf4SFabien Sanglard #include "util/osp_logging.h"
12*3f982cf4SFabien Sanglard
13*3f982cf4SFabien Sanglard namespace openscreen {
14*3f982cf4SFabien Sanglard namespace cast {
15*3f982cf4SFabien Sanglard
16*3f982cf4SFabien Sanglard namespace {
17*3f982cf4SFabien Sanglard
operator <<(std::ostream & out,const RpcMessage & message)18*3f982cf4SFabien Sanglard std::ostream& operator<<(std::ostream& out, const RpcMessage& message) {
19*3f982cf4SFabien Sanglard out << "handle=" << message.handle() << ", proc=" << message.proc();
20*3f982cf4SFabien Sanglard switch (message.rpc_oneof_case()) {
21*3f982cf4SFabien Sanglard case RpcMessage::kIntegerValue:
22*3f982cf4SFabien Sanglard return out << ", integer_value=" << message.integer_value();
23*3f982cf4SFabien Sanglard case RpcMessage::kInteger64Value:
24*3f982cf4SFabien Sanglard return out << ", integer64_value=" << message.integer64_value();
25*3f982cf4SFabien Sanglard case RpcMessage::kDoubleValue:
26*3f982cf4SFabien Sanglard return out << ", double_value=" << message.double_value();
27*3f982cf4SFabien Sanglard case RpcMessage::kBooleanValue:
28*3f982cf4SFabien Sanglard return out << ", boolean_value=" << message.boolean_value();
29*3f982cf4SFabien Sanglard case RpcMessage::kStringValue:
30*3f982cf4SFabien Sanglard return out << ", string_value=" << message.string_value();
31*3f982cf4SFabien Sanglard default:
32*3f982cf4SFabien Sanglard return out << ", rpc_oneof=" << message.rpc_oneof_case();
33*3f982cf4SFabien Sanglard }
34*3f982cf4SFabien Sanglard
35*3f982cf4SFabien Sanglard OSP_NOTREACHED();
36*3f982cf4SFabien Sanglard }
37*3f982cf4SFabien Sanglard
38*3f982cf4SFabien Sanglard } // namespace
39*3f982cf4SFabien Sanglard
40*3f982cf4SFabien Sanglard constexpr RpcMessenger::Handle RpcMessenger::kInvalidHandle;
41*3f982cf4SFabien Sanglard constexpr RpcMessenger::Handle RpcMessenger::kAcquireRendererHandle;
42*3f982cf4SFabien Sanglard constexpr RpcMessenger::Handle RpcMessenger::kAcquireDemuxerHandle;
43*3f982cf4SFabien Sanglard constexpr RpcMessenger::Handle RpcMessenger::kFirstHandle;
44*3f982cf4SFabien Sanglard
RpcMessenger(SendMessageCallback send_message_cb)45*3f982cf4SFabien Sanglard RpcMessenger::RpcMessenger(SendMessageCallback send_message_cb)
46*3f982cf4SFabien Sanglard : next_handle_(kFirstHandle),
47*3f982cf4SFabien Sanglard send_message_cb_(std::move(send_message_cb)) {}
48*3f982cf4SFabien Sanglard
~RpcMessenger()49*3f982cf4SFabien Sanglard RpcMessenger::~RpcMessenger() {
50*3f982cf4SFabien Sanglard receive_callbacks_.clear();
51*3f982cf4SFabien Sanglard }
52*3f982cf4SFabien Sanglard
GetUniqueHandle()53*3f982cf4SFabien Sanglard RpcMessenger::Handle RpcMessenger::GetUniqueHandle() {
54*3f982cf4SFabien Sanglard return next_handle_++;
55*3f982cf4SFabien Sanglard }
56*3f982cf4SFabien Sanglard
RegisterMessageReceiverCallback(RpcMessenger::Handle handle,ReceiveMessageCallback callback)57*3f982cf4SFabien Sanglard void RpcMessenger::RegisterMessageReceiverCallback(
58*3f982cf4SFabien Sanglard RpcMessenger::Handle handle,
59*3f982cf4SFabien Sanglard ReceiveMessageCallback callback) {
60*3f982cf4SFabien Sanglard OSP_DCHECK(receive_callbacks_.find(handle) == receive_callbacks_.end())
61*3f982cf4SFabien Sanglard << "must deregister before re-registering";
62*3f982cf4SFabien Sanglard receive_callbacks_.emplace_back(handle, std::move(callback));
63*3f982cf4SFabien Sanglard }
64*3f982cf4SFabien Sanglard
UnregisterMessageReceiverCallback(RpcMessenger::Handle handle)65*3f982cf4SFabien Sanglard void RpcMessenger::UnregisterMessageReceiverCallback(RpcMessenger::Handle handle) {
66*3f982cf4SFabien Sanglard receive_callbacks_.erase_key(handle);
67*3f982cf4SFabien Sanglard }
68*3f982cf4SFabien Sanglard
ProcessMessageFromRemote(const uint8_t * message,std::size_t message_len)69*3f982cf4SFabien Sanglard void RpcMessenger::ProcessMessageFromRemote(const uint8_t* message,
70*3f982cf4SFabien Sanglard std::size_t message_len) {
71*3f982cf4SFabien Sanglard auto rpc = std::make_unique<RpcMessage>();
72*3f982cf4SFabien Sanglard if (!rpc->ParseFromArray(message, message_len)) {
73*3f982cf4SFabien Sanglard OSP_DLOG_WARN << "Failed to parse RPC message from remote: \"" << message
74*3f982cf4SFabien Sanglard << "\"";
75*3f982cf4SFabien Sanglard return;
76*3f982cf4SFabien Sanglard }
77*3f982cf4SFabien Sanglard ProcessMessageFromRemote(std::move(rpc));
78*3f982cf4SFabien Sanglard }
79*3f982cf4SFabien Sanglard
ProcessMessageFromRemote(std::unique_ptr<RpcMessage> message)80*3f982cf4SFabien Sanglard void RpcMessenger::ProcessMessageFromRemote(std::unique_ptr<RpcMessage> message) {
81*3f982cf4SFabien Sanglard const auto entry = receive_callbacks_.find(message->handle());
82*3f982cf4SFabien Sanglard if (entry == receive_callbacks_.end()) {
83*3f982cf4SFabien Sanglard OSP_VLOG << "Dropping message due to unregistered handle: "
84*3f982cf4SFabien Sanglard << message->handle();
85*3f982cf4SFabien Sanglard return;
86*3f982cf4SFabien Sanglard }
87*3f982cf4SFabien Sanglard entry->second(std::move(message));
88*3f982cf4SFabien Sanglard }
89*3f982cf4SFabien Sanglard
SendMessageToRemote(const RpcMessage & rpc)90*3f982cf4SFabien Sanglard void RpcMessenger::SendMessageToRemote(const RpcMessage& rpc) {
91*3f982cf4SFabien Sanglard OSP_VLOG << "Sending RPC message: " << rpc;
92*3f982cf4SFabien Sanglard std::vector<uint8_t> message(rpc.ByteSizeLong());
93*3f982cf4SFabien Sanglard rpc.SerializeToArray(message.data(), message.size());
94*3f982cf4SFabien Sanglard send_message_cb_(std::move(message));
95*3f982cf4SFabien Sanglard }
96*3f982cf4SFabien Sanglard
IsRegisteredForTesting(RpcMessenger::Handle handle)97*3f982cf4SFabien Sanglard bool RpcMessenger::IsRegisteredForTesting(RpcMessenger::Handle handle) {
98*3f982cf4SFabien Sanglard return receive_callbacks_.find(handle) != receive_callbacks_.end();
99*3f982cf4SFabien Sanglard }
100*3f982cf4SFabien Sanglard
GetWeakPtr()101*3f982cf4SFabien Sanglard WeakPtr<RpcMessenger> RpcMessenger::GetWeakPtr() {
102*3f982cf4SFabien Sanglard return weak_factory_.GetWeakPtr();
103*3f982cf4SFabien Sanglard }
104*3f982cf4SFabien Sanglard
105*3f982cf4SFabien Sanglard } // namespace cast
106*3f982cf4SFabien Sanglard } // namespace openscreen
107