xref: /aosp_15_r20/external/openscreen/cast/standalone_sender/looping_file_cast_agent.cc (revision 3f982cf4871df8771c9d4abe6e9a6f8d829b2736)
1 // Copyright 2020 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "cast/standalone_sender/looping_file_cast_agent.h"
6 
7 #include <string>
8 #include <utility>
9 #include <vector>
10 
11 #include "cast/common/channel/message_util.h"
12 #include "cast/standalone_sender/looping_file_sender.h"
13 #include "cast/streaming/capture_recommendations.h"
14 #include "cast/streaming/constants.h"
15 #include "cast/streaming/offer_messages.h"
16 #include "json/value.h"
17 #include "platform/api/tls_connection_factory.h"
18 #include "util/json/json_helpers.h"
19 #include "util/stringprintf.h"
20 #include "util/trace_logging.h"
21 
22 namespace openscreen {
23 namespace cast {
24 namespace {
25 
26 using DeviceMediaPolicy = SenderSocketFactory::DeviceMediaPolicy;
27 
28 }  // namespace
29 
LoopingFileCastAgent(TaskRunner * task_runner,ShutdownCallback shutdown_callback)30 LoopingFileCastAgent::LoopingFileCastAgent(TaskRunner* task_runner,
31                                            ShutdownCallback shutdown_callback)
32     : task_runner_(task_runner),
33       shutdown_callback_(std::move(shutdown_callback)),
34       connection_handler_(&router_, this),
35       socket_factory_(this, task_runner_),
36       connection_factory_(
37           TlsConnectionFactory::CreateFactory(&socket_factory_, task_runner_)),
38       message_port_(&router_) {
39   router_.AddHandlerForLocalId(kPlatformSenderId, this);
40   socket_factory_.set_factory(connection_factory_.get());
41 }
42 
~LoopingFileCastAgent()43 LoopingFileCastAgent::~LoopingFileCastAgent() {
44   Shutdown();
45 }
46 
Connect(ConnectionSettings settings)47 void LoopingFileCastAgent::Connect(ConnectionSettings settings) {
48   TRACE_DEFAULT_SCOPED(TraceCategory::kStandaloneSender);
49 
50   OSP_DCHECK(!connection_settings_);
51   connection_settings_ = std::move(settings);
52   const auto policy = connection_settings_->should_include_video
53                           ? DeviceMediaPolicy::kIncludesVideo
54                           : DeviceMediaPolicy::kAudioOnly;
55 
56   task_runner_->PostTask([this, policy] {
57     wake_lock_ = ScopedWakeLock::Create(task_runner_);
58     socket_factory_.Connect(connection_settings_->receiver_endpoint, policy,
59                             &router_);
60   });
61 }
62 
OnConnected(SenderSocketFactory * factory,const IPEndpoint & endpoint,std::unique_ptr<CastSocket> socket)63 void LoopingFileCastAgent::OnConnected(SenderSocketFactory* factory,
64                                        const IPEndpoint& endpoint,
65                                        std::unique_ptr<CastSocket> socket) {
66   TRACE_DEFAULT_SCOPED(TraceCategory::kStandaloneSender);
67 
68   if (message_port_.GetSocketId() != ToCastSocketId(nullptr)) {
69     OSP_LOG_WARN << "Already connected, dropping peer at: " << endpoint;
70     return;
71   }
72   message_port_.SetSocket(socket->GetWeakPtr());
73   router_.TakeSocket(this, std::move(socket));
74 
75   OSP_LOG_INFO << "Launching Mirroring App on the Cast Receiver...";
76   static constexpr char kLaunchMessageTemplate[] =
77       R"({"type":"LAUNCH", "requestId":%d, "appId":"%s"})";
78   router_.Send(VirtualConnection{kPlatformSenderId, kPlatformReceiverId,
79                                  message_port_.GetSocketId()},
80                MakeSimpleUTF8Message(
81                    kReceiverNamespace,
82                    StringPrintf(kLaunchMessageTemplate, next_request_id_++,
83                                 GetMirroringAppId())));
84 }
85 
OnError(SenderSocketFactory * factory,const IPEndpoint & endpoint,Error error)86 void LoopingFileCastAgent::OnError(SenderSocketFactory* factory,
87                                    const IPEndpoint& endpoint,
88                                    Error error) {
89   OSP_LOG_ERROR << "Cast agent received socket factory error: " << error;
90   Shutdown();
91 }
92 
OnClose(CastSocket * cast_socket)93 void LoopingFileCastAgent::OnClose(CastSocket* cast_socket) {
94   OSP_VLOG << "Cast agent socket closed.";
95   Shutdown();
96 }
97 
OnError(CastSocket * socket,Error error)98 void LoopingFileCastAgent::OnError(CastSocket* socket, Error error) {
99   OSP_LOG_ERROR << "Cast agent received socket error: " << error;
100   Shutdown();
101 }
102 
IsConnectionAllowed(const VirtualConnection & virtual_conn) const103 bool LoopingFileCastAgent::IsConnectionAllowed(
104     const VirtualConnection& virtual_conn) const {
105   return true;
106 }
107 
OnMessage(VirtualConnectionRouter * router,CastSocket * socket,::cast::channel::CastMessage message)108 void LoopingFileCastAgent::OnMessage(VirtualConnectionRouter* router,
109                                      CastSocket* socket,
110                                      ::cast::channel::CastMessage message) {
111   if (message_port_.GetSocketId() == ToCastSocketId(socket) &&
112       !message_port_.client_sender_id().empty() &&
113       message_port_.client_sender_id() == message.destination_id()) {
114     OSP_DCHECK(message_port_.client_sender_id() != kPlatformSenderId);
115     message_port_.OnMessage(router, socket, std::move(message));
116     return;
117   }
118 
119   if (message.destination_id() != kPlatformSenderId &&
120       message.destination_id() != kBroadcastId) {
121     return;  // Message not for us.
122   }
123 
124   if (message.namespace_() == kReceiverNamespace &&
125       message_port_.GetSocketId() == ToCastSocketId(socket)) {
126     const ErrorOr<Json::Value> payload = json::Parse(message.payload_utf8());
127     if (payload.is_error()) {
128       OSP_LOG_ERROR << "Failed to parse message: " << payload.error();
129     }
130 
131     if (HasType(payload.value(), CastMessageType::kReceiverStatus)) {
132       HandleReceiverStatus(payload.value());
133     } else if (HasType(payload.value(), CastMessageType::kLaunchError)) {
134       std::string reason;
135       if (!json::TryParseString(payload.value()[kMessageKeyReason], &reason)) {
136         reason = "UNKNOWN";
137       }
138       OSP_LOG_ERROR
139           << "Failed to launch the Cast Mirroring App on the Receiver! Reason: "
140           << reason;
141       Shutdown();
142     } else if (HasType(payload.value(), CastMessageType::kInvalidRequest)) {
143       std::string reason;
144       if (!json::TryParseString(payload.value()[kMessageKeyReason], &reason)) {
145         reason = "UNKNOWN";
146       }
147       OSP_LOG_ERROR << "Cast Receiver thinks our request is invalid: "
148                     << reason;
149     }
150   }
151 }
152 
GetMirroringAppId() const153 const char* LoopingFileCastAgent::GetMirroringAppId() const {
154   if (connection_settings_ && !connection_settings_->should_include_video) {
155     return kMirroringAudioOnlyAppId;
156   }
157   return kMirroringAppId;
158 }
159 
HandleReceiverStatus(const Json::Value & status)160 void LoopingFileCastAgent::HandleReceiverStatus(const Json::Value& status) {
161   const Json::Value& details =
162       (status[kMessageKeyStatus].isObject() &&
163        status[kMessageKeyStatus][kMessageKeyApplications].isArray())
164           ? status[kMessageKeyStatus][kMessageKeyApplications][0]
165           : Json::Value();
166 
167   std::string running_app_id;
168   if (!json::TryParseString(details[kMessageKeyAppId], &running_app_id) ||
169       running_app_id != GetMirroringAppId()) {
170     // The mirroring app is not running. If it was just stopped, Shutdown() will
171     // tear everything down. If it has been stopped already, Shutdown() is a
172     // no-op.
173     Shutdown();
174     return;
175   }
176 
177   std::string session_id;
178   if (!json::TryParseString(details[kMessageKeySessionId], &session_id) ||
179       session_id.empty()) {
180     OSP_LOG_ERROR
181         << "Cannot continue: Cast Receiver did not provide a session ID for "
182            "the Mirroring App running on it.";
183     Shutdown();
184     return;
185   }
186   if (app_session_id_ != session_id) {
187     if (app_session_id_.empty()) {
188       app_session_id_ = session_id;
189     } else {
190       OSP_LOG_ERROR << "Cannot continue: Different Mirroring App session is "
191                        "now running on the Cast Receiver.";
192       Shutdown();
193       return;
194     }
195   }
196 
197   if (remote_connection_) {
198     // The mirroring app is running and this LoopingFileCastAgent is already
199     // streaming to it (or is awaiting message routing to be established). There
200     // are no additional actions to be taken in response to this extra
201     // RECEIVER_STATUS message.
202     return;
203   }
204 
205   std::string message_destination_id;
206   if (!json::TryParseString(details[kMessageKeyTransportId],
207                             &message_destination_id) ||
208       message_destination_id.empty()) {
209     OSP_LOG_ERROR
210         << "Cannot continue: Cast Receiver did not provide a transport ID for "
211            "routing messages to the Mirroring App running on it.";
212     Shutdown();
213     return;
214   }
215 
216   remote_connection_.emplace(
217       VirtualConnection{MakeUniqueSessionId("streaming_sender"),
218                         message_destination_id, message_port_.GetSocketId()});
219   OSP_LOG_INFO << "Starting-up message routing to the Cast Receiver's "
220                   "Mirroring App (sessionId="
221                << app_session_id_ << ")...";
222   connection_handler_.OpenRemoteConnection(
223       *remote_connection_,
224       [this](bool success) { OnRemoteMessagingOpened(success); });
225 }
226 
OnRemoteMessagingOpened(bool success)227 void LoopingFileCastAgent::OnRemoteMessagingOpened(bool success) {
228   if (!remote_connection_) {
229     return;  // Shutdown() was called in the meantime.
230   }
231 
232   if (success) {
233     OSP_LOG_INFO << "Starting streaming session...";
234     CreateAndStartSession();
235   } else {
236     OSP_LOG_INFO << "Failed to establish messaging to the Cast Receiver's "
237                     "Mirroring App. Perhaps another Cast Sender is using it?";
238     Shutdown();
239   }
240 }
241 
CreateAndStartSession()242 void LoopingFileCastAgent::CreateAndStartSession() {
243   TRACE_DEFAULT_SCOPED(TraceCategory::kStandaloneSender);
244 
245   OSP_DCHECK(remote_connection_.has_value());
246   environment_ =
247       std::make_unique<Environment>(&Clock::now, task_runner_, IPEndpoint{});
248 
249   SenderSession::Configuration config{
250       connection_settings_->receiver_endpoint.address,
251       this,
252       environment_.get(),
253       &message_port_,
254       remote_connection_->local_id,
255       remote_connection_->peer_id,
256       connection_settings_->use_android_rtp_hack};
257   current_session_ = std::make_unique<SenderSession>(std::move(config));
258   OSP_DCHECK(!message_port_.client_sender_id().empty());
259 
260   AudioCaptureConfig audio_config;
261   // Opus does best at 192kbps, so we cap that here.
262   audio_config.bit_rate = 192 * 1000;
263   VideoCaptureConfig video_config = {
264       .codec = connection_settings_->codec,
265       // The video config is allowed to use whatever is left over after audio.
266       .max_bit_rate =
267           connection_settings_->max_bitrate - audio_config.bit_rate};
268   // Use default display resolution of 1080P.
269   video_config.resolutions.emplace_back(Resolution{1920, 1080});
270 
271   OSP_VLOG << "Starting session negotiation.";
272   Error negotiation_error;
273   if (connection_settings_->use_remoting) {
274     remoting_sender_ = std::make_unique<RemotingSender>(
275         current_session_->rpc_messenger(), AudioCodec::kOpus,
276         connection_settings_->codec, this);
277 
278     negotiation_error =
279         current_session_->NegotiateRemoting(audio_config, video_config);
280   } else {
281     negotiation_error =
282         current_session_->Negotiate({audio_config}, {video_config});
283   }
284   if (!negotiation_error.ok()) {
285     OSP_LOG_ERROR << "Failed to negotiate a session: " << negotiation_error;
286   }
287 }
288 
OnNegotiated(const SenderSession * session,SenderSession::ConfiguredSenders senders,capture_recommendations::Recommendations capture_recommendations)289 void LoopingFileCastAgent::OnNegotiated(
290     const SenderSession* session,
291     SenderSession::ConfiguredSenders senders,
292     capture_recommendations::Recommendations capture_recommendations) {
293   if (senders.audio_sender == nullptr || senders.video_sender == nullptr) {
294     OSP_LOG_ERROR << "Missing both audio and video, so exiting...";
295     return;
296   }
297 
298   file_sender_ = std::make_unique<LoopingFileSender>(
299       environment_.get(), connection_settings_.value(), session,
300       std::move(senders), [this]() { shutdown_callback_(); });
301 }
302 
OnRemotingNegotiated(const SenderSession * session,SenderSession::RemotingNegotiation negotiation)303 void LoopingFileCastAgent::OnRemotingNegotiated(
304     const SenderSession* session,
305     SenderSession::RemotingNegotiation negotiation) {
306   if (negotiation.senders.audio_sender == nullptr &&
307       negotiation.senders.video_sender == nullptr) {
308     OSP_LOG_ERROR << "Missing both audio and video, so exiting...";
309     return;
310   }
311 
312   current_negotiation_ =
313       std::make_unique<SenderSession::RemotingNegotiation>(negotiation);
314   if (is_ready_for_remoting_) {
315     StartRemotingSenders();
316   }
317 }
318 
OnError(const SenderSession * session,Error error)319 void LoopingFileCastAgent::OnError(const SenderSession* session, Error error) {
320   OSP_LOG_ERROR << "SenderSession fatal error: " << error;
321   Shutdown();
322 }
323 
OnReady()324 void LoopingFileCastAgent::OnReady() {
325   is_ready_for_remoting_ = true;
326   if (current_negotiation_) {
327     StartRemotingSenders();
328   }
329 }
330 
OnPlaybackRateChange(double rate)331 void LoopingFileCastAgent::OnPlaybackRateChange(double rate) {
332   file_sender_->SetPlaybackRate(rate);
333 }
334 
StartRemotingSenders()335 void LoopingFileCastAgent::StartRemotingSenders() {
336   OSP_DCHECK(current_negotiation_);
337   file_sender_ = std::make_unique<LoopingFileSender>(
338       environment_.get(), connection_settings_.value(), current_session_.get(),
339       std::move(current_negotiation_->senders),
340       [this]() { shutdown_callback_(); });
341   current_negotiation_.reset();
342   is_ready_for_remoting_ = false;
343 }
344 
Shutdown()345 void LoopingFileCastAgent::Shutdown() {
346   TRACE_DEFAULT_SCOPED(TraceCategory::kStandaloneSender);
347 
348   file_sender_.reset();
349   if (current_session_) {
350     OSP_LOG_INFO << "Stopping mirroring session...";
351     current_session_.reset();
352   }
353   OSP_DCHECK(message_port_.client_sender_id().empty());
354   environment_.reset();
355 
356   if (remote_connection_) {
357     const VirtualConnection connection = *remote_connection_;
358     // Reset |remote_connection_| because ConnectionNamespaceHandler may
359     // call-back into OnRemoteMessagingOpened().
360     remote_connection_.reset();
361     connection_handler_.CloseRemoteConnection(connection);
362   }
363 
364   if (!app_session_id_.empty()) {
365     OSP_LOG_INFO << "Stopping the Cast Receiver's Mirroring App...";
366     static constexpr char kStopMessageTemplate[] =
367         R"({"type":"STOP", "requestId":%d, "sessionId":"%s"})";
368     std::string stop_json = StringPrintf(
369         kStopMessageTemplate, next_request_id_++, app_session_id_.c_str());
370     router_.Send(
371         VirtualConnection{kPlatformSenderId, kPlatformReceiverId,
372                           message_port_.GetSocketId()},
373         MakeSimpleUTF8Message(kReceiverNamespace, std::move(stop_json)));
374     app_session_id_.clear();
375   }
376 
377   if (message_port_.GetSocketId() != ToCastSocketId(nullptr)) {
378     router_.CloseSocket(message_port_.GetSocketId());
379     message_port_.SetSocket({});
380   }
381 
382   wake_lock_.reset();
383 
384   if (shutdown_callback_) {
385     const ShutdownCallback callback = std::move(shutdown_callback_);
386     callback();
387   }
388 }
389 
390 }  // namespace cast
391 }  // namespace openscreen
392