xref: /aosp_15_r20/external/openscreen/cast/streaming/session_messenger.cc (revision 3f982cf4871df8771c9d4abe6e9a6f8d829b2736)
1*3f982cf4SFabien Sanglard // Copyright 2020 The Chromium Authors. All rights reserved.
2*3f982cf4SFabien Sanglard // Use of this source code is governed by a BSD-style license that can be
3*3f982cf4SFabien Sanglard // found in the LICENSE file.
4*3f982cf4SFabien Sanglard 
5*3f982cf4SFabien Sanglard #include "cast/streaming/session_messenger.h"
6*3f982cf4SFabien Sanglard 
7*3f982cf4SFabien Sanglard #include "absl/strings/ascii.h"
8*3f982cf4SFabien Sanglard #include "cast/common/public/message_port.h"
9*3f982cf4SFabien Sanglard #include "cast/streaming/message_fields.h"
10*3f982cf4SFabien Sanglard #include "util/json/json_helpers.h"
11*3f982cf4SFabien Sanglard #include "util/json/json_serialization.h"
12*3f982cf4SFabien Sanglard #include "util/osp_logging.h"
13*3f982cf4SFabien Sanglard 
14*3f982cf4SFabien Sanglard namespace openscreen {
15*3f982cf4SFabien Sanglard namespace cast {
16*3f982cf4SFabien Sanglard 
17*3f982cf4SFabien Sanglard namespace {
18*3f982cf4SFabien Sanglard 
ReplyIfTimedOut(int sequence_number,ReceiverMessage::Type reply_type,std::vector<std::pair<int,SenderSessionMessenger::ReplyCallback>> * replies)19*3f982cf4SFabien Sanglard void ReplyIfTimedOut(
20*3f982cf4SFabien Sanglard     int sequence_number,
21*3f982cf4SFabien Sanglard     ReceiverMessage::Type reply_type,
22*3f982cf4SFabien Sanglard     std::vector<std::pair<int, SenderSessionMessenger::ReplyCallback>>*
23*3f982cf4SFabien Sanglard         replies) {
24*3f982cf4SFabien Sanglard   for (auto it = replies->begin(); it != replies->end(); ++it) {
25*3f982cf4SFabien Sanglard     if (it->first == sequence_number) {
26*3f982cf4SFabien Sanglard       OSP_VLOG
27*3f982cf4SFabien Sanglard           << "Replying with empty message due to timeout for sequence number: "
28*3f982cf4SFabien Sanglard           << sequence_number;
29*3f982cf4SFabien Sanglard       it->second(ReceiverMessage{reply_type, sequence_number});
30*3f982cf4SFabien Sanglard       replies->erase(it);
31*3f982cf4SFabien Sanglard       break;
32*3f982cf4SFabien Sanglard     }
33*3f982cf4SFabien Sanglard   }
34*3f982cf4SFabien Sanglard }
35*3f982cf4SFabien Sanglard 
36*3f982cf4SFabien Sanglard }  // namespace
37*3f982cf4SFabien Sanglard 
SessionMessenger(MessagePort * message_port,std::string source_id,ErrorCallback cb)38*3f982cf4SFabien Sanglard SessionMessenger::SessionMessenger(MessagePort* message_port,
39*3f982cf4SFabien Sanglard                                    std::string source_id,
40*3f982cf4SFabien Sanglard                                    ErrorCallback cb)
41*3f982cf4SFabien Sanglard     : message_port_(message_port), error_callback_(std::move(cb)) {
42*3f982cf4SFabien Sanglard   OSP_DCHECK(message_port_);
43*3f982cf4SFabien Sanglard   OSP_DCHECK(!source_id.empty());
44*3f982cf4SFabien Sanglard   message_port_->SetClient(this, source_id);
45*3f982cf4SFabien Sanglard }
46*3f982cf4SFabien Sanglard 
~SessionMessenger()47*3f982cf4SFabien Sanglard SessionMessenger::~SessionMessenger() {
48*3f982cf4SFabien Sanglard   message_port_->ResetClient();
49*3f982cf4SFabien Sanglard }
50*3f982cf4SFabien Sanglard 
SendMessage(const std::string & destination_id,const std::string & namespace_,const Json::Value & message_root)51*3f982cf4SFabien Sanglard Error SessionMessenger::SendMessage(const std::string& destination_id,
52*3f982cf4SFabien Sanglard                                     const std::string& namespace_,
53*3f982cf4SFabien Sanglard                                     const Json::Value& message_root) {
54*3f982cf4SFabien Sanglard   OSP_DCHECK(namespace_ == kCastRemotingNamespace ||
55*3f982cf4SFabien Sanglard              namespace_ == kCastWebrtcNamespace);
56*3f982cf4SFabien Sanglard   auto body_or_error = json::Stringify(message_root);
57*3f982cf4SFabien Sanglard   if (body_or_error.is_error()) {
58*3f982cf4SFabien Sanglard     return std::move(body_or_error.error());
59*3f982cf4SFabien Sanglard   }
60*3f982cf4SFabien Sanglard   OSP_VLOG << "Sending message: DESTINATION[" << destination_id
61*3f982cf4SFabien Sanglard            << "], NAMESPACE[" << namespace_ << "], BODY:\n"
62*3f982cf4SFabien Sanglard            << body_or_error.value();
63*3f982cf4SFabien Sanglard   message_port_->PostMessage(destination_id, namespace_, body_or_error.value());
64*3f982cf4SFabien Sanglard   return Error::None();
65*3f982cf4SFabien Sanglard }
66*3f982cf4SFabien Sanglard 
ReportError(Error error)67*3f982cf4SFabien Sanglard void SessionMessenger::ReportError(Error error) {
68*3f982cf4SFabien Sanglard   error_callback_(std::move(error));
69*3f982cf4SFabien Sanglard }
70*3f982cf4SFabien Sanglard 
SenderSessionMessenger(MessagePort * message_port,std::string source_id,std::string receiver_id,ErrorCallback cb,TaskRunner * task_runner)71*3f982cf4SFabien Sanglard SenderSessionMessenger::SenderSessionMessenger(MessagePort* message_port,
72*3f982cf4SFabien Sanglard                                                std::string source_id,
73*3f982cf4SFabien Sanglard                                                std::string receiver_id,
74*3f982cf4SFabien Sanglard                                                ErrorCallback cb,
75*3f982cf4SFabien Sanglard                                                TaskRunner* task_runner)
76*3f982cf4SFabien Sanglard     : SessionMessenger(message_port, std::move(source_id), std::move(cb)),
77*3f982cf4SFabien Sanglard       task_runner_(task_runner),
78*3f982cf4SFabien Sanglard       receiver_id_(std::move(receiver_id)) {}
79*3f982cf4SFabien Sanglard 
SetHandler(ReceiverMessage::Type type,ReplyCallback cb)80*3f982cf4SFabien Sanglard void SenderSessionMessenger::SetHandler(ReceiverMessage::Type type,
81*3f982cf4SFabien Sanglard                                         ReplyCallback cb) {
82*3f982cf4SFabien Sanglard   // Currently the only handler allowed is for RPC messages.
83*3f982cf4SFabien Sanglard   OSP_DCHECK(type == ReceiverMessage::Type::kRpc);
84*3f982cf4SFabien Sanglard   rpc_callback_ = std::move(cb);
85*3f982cf4SFabien Sanglard }
86*3f982cf4SFabien Sanglard 
SendOutboundMessage(SenderMessage message)87*3f982cf4SFabien Sanglard Error SenderSessionMessenger::SendOutboundMessage(SenderMessage message) {
88*3f982cf4SFabien Sanglard   const auto namespace_ = (message.type == SenderMessage::Type::kRpc)
89*3f982cf4SFabien Sanglard                               ? kCastRemotingNamespace
90*3f982cf4SFabien Sanglard                               : kCastWebrtcNamespace;
91*3f982cf4SFabien Sanglard 
92*3f982cf4SFabien Sanglard   ErrorOr<Json::Value> jsonified = message.ToJson();
93*3f982cf4SFabien Sanglard   OSP_CHECK(jsonified.is_value()) << "Tried to send an invalid message";
94*3f982cf4SFabien Sanglard   return SessionMessenger::SendMessage(receiver_id_, namespace_,
95*3f982cf4SFabien Sanglard                                        jsonified.value());
96*3f982cf4SFabien Sanglard }
97*3f982cf4SFabien Sanglard 
SendRequest(SenderMessage message,ReceiverMessage::Type reply_type,ReplyCallback cb)98*3f982cf4SFabien Sanglard Error SenderSessionMessenger::SendRequest(SenderMessage message,
99*3f982cf4SFabien Sanglard                                           ReceiverMessage::Type reply_type,
100*3f982cf4SFabien Sanglard                                           ReplyCallback cb) {
101*3f982cf4SFabien Sanglard   static constexpr std::chrono::milliseconds kReplyTimeout{4000};
102*3f982cf4SFabien Sanglard   // RPC messages are not meant to be request/reply.
103*3f982cf4SFabien Sanglard   OSP_DCHECK(reply_type != ReceiverMessage::Type::kRpc);
104*3f982cf4SFabien Sanglard 
105*3f982cf4SFabien Sanglard   const Error error = SendOutboundMessage(message);
106*3f982cf4SFabien Sanglard   if (!error.ok()) {
107*3f982cf4SFabien Sanglard     return error;
108*3f982cf4SFabien Sanglard   }
109*3f982cf4SFabien Sanglard 
110*3f982cf4SFabien Sanglard   OSP_DCHECK(awaiting_replies_.find(message.sequence_number) ==
111*3f982cf4SFabien Sanglard              awaiting_replies_.end());
112*3f982cf4SFabien Sanglard   awaiting_replies_.emplace_back(message.sequence_number, std::move(cb));
113*3f982cf4SFabien Sanglard   task_runner_->PostTaskWithDelay(
114*3f982cf4SFabien Sanglard       [self = weak_factory_.GetWeakPtr(), reply_type,
115*3f982cf4SFabien Sanglard        seq_num = message.sequence_number] {
116*3f982cf4SFabien Sanglard         if (self) {
117*3f982cf4SFabien Sanglard           ReplyIfTimedOut(seq_num, reply_type, &self->awaiting_replies_);
118*3f982cf4SFabien Sanglard         }
119*3f982cf4SFabien Sanglard       },
120*3f982cf4SFabien Sanglard       kReplyTimeout);
121*3f982cf4SFabien Sanglard 
122*3f982cf4SFabien Sanglard   return Error::None();
123*3f982cf4SFabien Sanglard }
124*3f982cf4SFabien Sanglard 
OnMessage(const std::string & source_id,const std::string & message_namespace,const std::string & message)125*3f982cf4SFabien Sanglard void SenderSessionMessenger::OnMessage(const std::string& source_id,
126*3f982cf4SFabien Sanglard                                        const std::string& message_namespace,
127*3f982cf4SFabien Sanglard                                        const std::string& message) {
128*3f982cf4SFabien Sanglard   if (source_id != receiver_id_) {
129*3f982cf4SFabien Sanglard     OSP_DLOG_WARN << "Received message from unknown/incorrect Cast Receiver, "
130*3f982cf4SFabien Sanglard                      "expected id \""
131*3f982cf4SFabien Sanglard                   << receiver_id_ << "\", got \"" << source_id << "\"";
132*3f982cf4SFabien Sanglard     return;
133*3f982cf4SFabien Sanglard   }
134*3f982cf4SFabien Sanglard 
135*3f982cf4SFabien Sanglard   if (message_namespace != kCastWebrtcNamespace &&
136*3f982cf4SFabien Sanglard       message_namespace != kCastRemotingNamespace) {
137*3f982cf4SFabien Sanglard     OSP_DLOG_WARN << "Received message from unknown namespace: "
138*3f982cf4SFabien Sanglard                   << message_namespace;
139*3f982cf4SFabien Sanglard     return;
140*3f982cf4SFabien Sanglard   }
141*3f982cf4SFabien Sanglard 
142*3f982cf4SFabien Sanglard   ErrorOr<Json::Value> message_body = json::Parse(message);
143*3f982cf4SFabien Sanglard   if (!message_body) {
144*3f982cf4SFabien Sanglard     ReportError(message_body.error());
145*3f982cf4SFabien Sanglard     OSP_DLOG_WARN << "Received an invalid message: " << message;
146*3f982cf4SFabien Sanglard     return;
147*3f982cf4SFabien Sanglard   }
148*3f982cf4SFabien Sanglard 
149*3f982cf4SFabien Sanglard   // If the message is valid JSON and we don't understand it, there are two
150*3f982cf4SFabien Sanglard   // options: (1) it's an unknown type, or (2) the receiver filled out the
151*3f982cf4SFabien Sanglard   // message incorrectly. In the first case we can drop it, it's likely just
152*3f982cf4SFabien Sanglard   // unsupported. In the second case we might need it, so worth warning the
153*3f982cf4SFabien Sanglard   // client.
154*3f982cf4SFabien Sanglard   ErrorOr<ReceiverMessage> receiver_message =
155*3f982cf4SFabien Sanglard       ReceiverMessage::Parse(message_body.value());
156*3f982cf4SFabien Sanglard   if (receiver_message.is_error()) {
157*3f982cf4SFabien Sanglard     ReportError(receiver_message.error());
158*3f982cf4SFabien Sanglard     OSP_DLOG_WARN << "Received an invalid receiver message: "
159*3f982cf4SFabien Sanglard                   << receiver_message.error();
160*3f982cf4SFabien Sanglard   }
161*3f982cf4SFabien Sanglard 
162*3f982cf4SFabien Sanglard   if (receiver_message.value().type == ReceiverMessage::Type::kRpc) {
163*3f982cf4SFabien Sanglard     if (rpc_callback_) {
164*3f982cf4SFabien Sanglard       rpc_callback_(receiver_message.value({}));
165*3f982cf4SFabien Sanglard     } else {
166*3f982cf4SFabien Sanglard       OSP_DLOG_INFO << "Received RTP message but no callback, dropping";
167*3f982cf4SFabien Sanglard     }
168*3f982cf4SFabien Sanglard   } else {
169*3f982cf4SFabien Sanglard     int sequence_number;
170*3f982cf4SFabien Sanglard     if (!json::TryParseInt(message_body.value()[kSequenceNumber],
171*3f982cf4SFabien Sanglard                            &sequence_number)) {
172*3f982cf4SFabien Sanglard       OSP_DLOG_WARN << "Received a message without a sequence number";
173*3f982cf4SFabien Sanglard       return;
174*3f982cf4SFabien Sanglard     }
175*3f982cf4SFabien Sanglard 
176*3f982cf4SFabien Sanglard     auto it = awaiting_replies_.find(sequence_number);
177*3f982cf4SFabien Sanglard     if (it == awaiting_replies_.end()) {
178*3f982cf4SFabien Sanglard       OSP_DLOG_WARN << "Received a reply I wasn't waiting for: "
179*3f982cf4SFabien Sanglard                     << sequence_number;
180*3f982cf4SFabien Sanglard       return;
181*3f982cf4SFabien Sanglard     }
182*3f982cf4SFabien Sanglard 
183*3f982cf4SFabien Sanglard     it->second(std::move(receiver_message.value({})));
184*3f982cf4SFabien Sanglard 
185*3f982cf4SFabien Sanglard     // Calling the function callback may result in the checksum of the pointed
186*3f982cf4SFabien Sanglard     // to object to change, so calling erase() on the iterator after executing
187*3f982cf4SFabien Sanglard     // second() may result in a segfault.
188*3f982cf4SFabien Sanglard     awaiting_replies_.erase_key(sequence_number);
189*3f982cf4SFabien Sanglard   }
190*3f982cf4SFabien Sanglard }
191*3f982cf4SFabien Sanglard 
OnError(Error error)192*3f982cf4SFabien Sanglard void SenderSessionMessenger::OnError(Error error) {
193*3f982cf4SFabien Sanglard   OSP_DLOG_WARN << "Received an error in the session messenger: " << error;
194*3f982cf4SFabien Sanglard }
195*3f982cf4SFabien Sanglard 
ReceiverSessionMessenger(MessagePort * message_port,std::string source_id,ErrorCallback cb)196*3f982cf4SFabien Sanglard ReceiverSessionMessenger::ReceiverSessionMessenger(MessagePort* message_port,
197*3f982cf4SFabien Sanglard                                                    std::string source_id,
198*3f982cf4SFabien Sanglard                                                    ErrorCallback cb)
199*3f982cf4SFabien Sanglard     : SessionMessenger(message_port, std::move(source_id), std::move(cb)) {}
200*3f982cf4SFabien Sanglard 
SetHandler(SenderMessage::Type type,RequestCallback cb)201*3f982cf4SFabien Sanglard void ReceiverSessionMessenger::SetHandler(SenderMessage::Type type,
202*3f982cf4SFabien Sanglard                                           RequestCallback cb) {
203*3f982cf4SFabien Sanglard   OSP_DCHECK(callbacks_.find(type) == callbacks_.end());
204*3f982cf4SFabien Sanglard   callbacks_.emplace_back(type, std::move(cb));
205*3f982cf4SFabien Sanglard }
206*3f982cf4SFabien Sanglard 
SendMessage(ReceiverMessage message)207*3f982cf4SFabien Sanglard Error ReceiverSessionMessenger::SendMessage(ReceiverMessage message) {
208*3f982cf4SFabien Sanglard   if (sender_session_id_.empty()) {
209*3f982cf4SFabien Sanglard     return Error(Error::Code::kInitializationFailure,
210*3f982cf4SFabien Sanglard                  "Tried to send a message without receiving one first");
211*3f982cf4SFabien Sanglard   }
212*3f982cf4SFabien Sanglard 
213*3f982cf4SFabien Sanglard   const auto namespace_ = (message.type == ReceiverMessage::Type::kRpc)
214*3f982cf4SFabien Sanglard                               ? kCastRemotingNamespace
215*3f982cf4SFabien Sanglard                               : kCastWebrtcNamespace;
216*3f982cf4SFabien Sanglard 
217*3f982cf4SFabien Sanglard   ErrorOr<Json::Value> message_json = message.ToJson();
218*3f982cf4SFabien Sanglard   OSP_CHECK(message_json.is_value()) << "Tried to send an invalid message";
219*3f982cf4SFabien Sanglard   return SessionMessenger::SendMessage(sender_session_id_, namespace_,
220*3f982cf4SFabien Sanglard                                        message_json.value());
221*3f982cf4SFabien Sanglard }
222*3f982cf4SFabien Sanglard 
OnMessage(const std::string & source_id,const std::string & message_namespace,const std::string & message)223*3f982cf4SFabien Sanglard void ReceiverSessionMessenger::OnMessage(const std::string& source_id,
224*3f982cf4SFabien Sanglard                                          const std::string& message_namespace,
225*3f982cf4SFabien Sanglard                                          const std::string& message) {
226*3f982cf4SFabien Sanglard   // We assume we are connected to the first sender_id we receive.
227*3f982cf4SFabien Sanglard   if (sender_session_id_.empty()) {
228*3f982cf4SFabien Sanglard     sender_session_id_ = source_id;
229*3f982cf4SFabien Sanglard   } else if (source_id != sender_session_id_) {
230*3f982cf4SFabien Sanglard     OSP_DLOG_WARN << "Received message from unknown/incorrect sender, expected "
231*3f982cf4SFabien Sanglard                      "id \""
232*3f982cf4SFabien Sanglard                   << sender_session_id_ << "\", got \"" << source_id << "\"";
233*3f982cf4SFabien Sanglard     return;
234*3f982cf4SFabien Sanglard   }
235*3f982cf4SFabien Sanglard 
236*3f982cf4SFabien Sanglard   if (message_namespace != kCastWebrtcNamespace &&
237*3f982cf4SFabien Sanglard       message_namespace != kCastRemotingNamespace) {
238*3f982cf4SFabien Sanglard     OSP_DLOG_WARN << "Received message from unknown namespace: "
239*3f982cf4SFabien Sanglard                   << message_namespace;
240*3f982cf4SFabien Sanglard     return;
241*3f982cf4SFabien Sanglard   }
242*3f982cf4SFabien Sanglard 
243*3f982cf4SFabien Sanglard   // If the message is bad JSON, the sender is in a funky state so we
244*3f982cf4SFabien Sanglard   // report an error.
245*3f982cf4SFabien Sanglard   ErrorOr<Json::Value> message_body = json::Parse(message);
246*3f982cf4SFabien Sanglard   if (message_body.is_error()) {
247*3f982cf4SFabien Sanglard     ReportError(message_body.error());
248*3f982cf4SFabien Sanglard     return;
249*3f982cf4SFabien Sanglard   }
250*3f982cf4SFabien Sanglard 
251*3f982cf4SFabien Sanglard   // If the message is valid JSON and we don't understand it, there are two
252*3f982cf4SFabien Sanglard   // options: (1) it's an unknown type, or (2) the sender filled out the message
253*3f982cf4SFabien Sanglard   // incorrectly. In the first case we can drop it, it's likely just
254*3f982cf4SFabien Sanglard   // unsupported. In the second case we might need it, so worth warning the
255*3f982cf4SFabien Sanglard   // client.
256*3f982cf4SFabien Sanglard   ErrorOr<SenderMessage> sender_message =
257*3f982cf4SFabien Sanglard       SenderMessage::Parse(message_body.value());
258*3f982cf4SFabien Sanglard   if (sender_message.is_error()) {
259*3f982cf4SFabien Sanglard     ReportError(sender_message.error());
260*3f982cf4SFabien Sanglard     OSP_DLOG_WARN << "Received an invalid sender message: "
261*3f982cf4SFabien Sanglard                   << sender_message.error();
262*3f982cf4SFabien Sanglard     return;
263*3f982cf4SFabien Sanglard   }
264*3f982cf4SFabien Sanglard 
265*3f982cf4SFabien Sanglard   auto it = callbacks_.find(sender_message.value().type);
266*3f982cf4SFabien Sanglard   if (it == callbacks_.end()) {
267*3f982cf4SFabien Sanglard     OSP_DLOG_INFO << "Received message without a callback, dropping";
268*3f982cf4SFabien Sanglard   } else {
269*3f982cf4SFabien Sanglard     it->second(sender_message.value());
270*3f982cf4SFabien Sanglard   }
271*3f982cf4SFabien Sanglard }
272*3f982cf4SFabien Sanglard 
OnError(Error error)273*3f982cf4SFabien Sanglard void ReceiverSessionMessenger::OnError(Error error) {
274*3f982cf4SFabien Sanglard   OSP_DLOG_WARN << "Received an error in the session messenger: " << error;
275*3f982cf4SFabien Sanglard }
276*3f982cf4SFabien Sanglard 
277*3f982cf4SFabien Sanglard }  // namespace cast
278*3f982cf4SFabien Sanglard }  // namespace openscreen
279