xref: /aosp_15_r20/external/openscreen/cast/streaming/sender_packet_router.cc (revision 3f982cf4871df8771c9d4abe6e9a6f8d829b2736)
// Copyright 2020 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
4*3f982cf4SFabien Sanglard 
5*3f982cf4SFabien Sanglard #include "cast/streaming/sender_packet_router.h"
6*3f982cf4SFabien Sanglard 
7*3f982cf4SFabien Sanglard #include <algorithm>
8*3f982cf4SFabien Sanglard #include <utility>
9*3f982cf4SFabien Sanglard 
10*3f982cf4SFabien Sanglard #include "cast/streaming/constants.h"
11*3f982cf4SFabien Sanglard #include "cast/streaming/packet_util.h"
12*3f982cf4SFabien Sanglard #include "util/chrono_helpers.h"
13*3f982cf4SFabien Sanglard #include "util/osp_logging.h"
14*3f982cf4SFabien Sanglard #include "util/saturate_cast.h"
15*3f982cf4SFabien Sanglard #include "util/stringprintf.h"
16*3f982cf4SFabien Sanglard 
17*3f982cf4SFabien Sanglard namespace openscreen {
18*3f982cf4SFabien Sanglard namespace cast {
19*3f982cf4SFabien Sanglard 
SenderPacketRouter(Environment * environment,int max_burst_bitrate)20*3f982cf4SFabien Sanglard SenderPacketRouter::SenderPacketRouter(Environment* environment,
21*3f982cf4SFabien Sanglard                                        int max_burst_bitrate)
22*3f982cf4SFabien Sanglard     : SenderPacketRouter(
23*3f982cf4SFabien Sanglard           environment,
24*3f982cf4SFabien Sanglard           ComputeMaxPacketsPerBurst(max_burst_bitrate,
25*3f982cf4SFabien Sanglard                                     environment->GetMaxPacketSize(),
26*3f982cf4SFabien Sanglard                                     kDefaultBurstInterval),
27*3f982cf4SFabien Sanglard           kDefaultBurstInterval) {}
28*3f982cf4SFabien Sanglard 
SenderPacketRouter(Environment * environment,int max_packets_per_burst,milliseconds burst_interval)29*3f982cf4SFabien Sanglard SenderPacketRouter::SenderPacketRouter(Environment* environment,
30*3f982cf4SFabien Sanglard                                        int max_packets_per_burst,
31*3f982cf4SFabien Sanglard                                        milliseconds burst_interval)
32*3f982cf4SFabien Sanglard     : BandwidthEstimator(max_packets_per_burst,
33*3f982cf4SFabien Sanglard                          burst_interval,
34*3f982cf4SFabien Sanglard                          environment->now()),
35*3f982cf4SFabien Sanglard       environment_(environment),
36*3f982cf4SFabien Sanglard       packet_buffer_size_(environment->GetMaxPacketSize()),
37*3f982cf4SFabien Sanglard       packet_buffer_(new uint8_t[packet_buffer_size_]),
38*3f982cf4SFabien Sanglard       max_packets_per_burst_(max_packets_per_burst),
39*3f982cf4SFabien Sanglard       burst_interval_(burst_interval),
40*3f982cf4SFabien Sanglard       max_burst_bitrate_(ComputeMaxBurstBitrate(packet_buffer_size_,
41*3f982cf4SFabien Sanglard                                                 max_packets_per_burst_,
42*3f982cf4SFabien Sanglard                                                 burst_interval_)),
43*3f982cf4SFabien Sanglard       alarm_(environment_->now_function(), environment_->task_runner()) {
44*3f982cf4SFabien Sanglard   OSP_DCHECK(environment_);
45*3f982cf4SFabien Sanglard   OSP_DCHECK_GT(packet_buffer_size_, kRequiredNetworkPacketSize);
46*3f982cf4SFabien Sanglard }
47*3f982cf4SFabien Sanglard 
~SenderPacketRouter()48*3f982cf4SFabien Sanglard SenderPacketRouter::~SenderPacketRouter() {
49*3f982cf4SFabien Sanglard   OSP_DCHECK(senders_.empty());
50*3f982cf4SFabien Sanglard }
51*3f982cf4SFabien Sanglard 
OnSenderCreated(Ssrc receiver_ssrc,Sender * sender)52*3f982cf4SFabien Sanglard void SenderPacketRouter::OnSenderCreated(Ssrc receiver_ssrc, Sender* sender) {
53*3f982cf4SFabien Sanglard   OSP_DCHECK(FindEntry(receiver_ssrc) == senders_.end());
54*3f982cf4SFabien Sanglard   senders_.push_back(SenderEntry{receiver_ssrc, sender, kNever, kNever});
55*3f982cf4SFabien Sanglard 
56*3f982cf4SFabien Sanglard   if (senders_.size() == 1) {
57*3f982cf4SFabien Sanglard     environment_->ConsumeIncomingPackets(this);
58*3f982cf4SFabien Sanglard   } else {
59*3f982cf4SFabien Sanglard     // Sort the list of Senders so that they are iterated in priority order.
60*3f982cf4SFabien Sanglard     std::sort(senders_.begin(), senders_.end());
61*3f982cf4SFabien Sanglard   }
62*3f982cf4SFabien Sanglard }
63*3f982cf4SFabien Sanglard 
OnSenderDestroyed(Ssrc receiver_ssrc)64*3f982cf4SFabien Sanglard void SenderPacketRouter::OnSenderDestroyed(Ssrc receiver_ssrc) {
65*3f982cf4SFabien Sanglard   const auto it = FindEntry(receiver_ssrc);
66*3f982cf4SFabien Sanglard   OSP_DCHECK(it != senders_.end());
67*3f982cf4SFabien Sanglard   senders_.erase(it);
68*3f982cf4SFabien Sanglard 
69*3f982cf4SFabien Sanglard   // If there are no longer any Senders, suspend receiving RTCP packets.
70*3f982cf4SFabien Sanglard   if (senders_.empty()) {
71*3f982cf4SFabien Sanglard     environment_->DropIncomingPackets();
72*3f982cf4SFabien Sanglard   }
73*3f982cf4SFabien Sanglard }
74*3f982cf4SFabien Sanglard 
RequestRtcpSend(Ssrc receiver_ssrc)75*3f982cf4SFabien Sanglard void SenderPacketRouter::RequestRtcpSend(Ssrc receiver_ssrc) {
76*3f982cf4SFabien Sanglard   const auto it = FindEntry(receiver_ssrc);
77*3f982cf4SFabien Sanglard   OSP_DCHECK(it != senders_.end());
78*3f982cf4SFabien Sanglard   it->next_rtcp_send_time = Alarm::kImmediately;
79*3f982cf4SFabien Sanglard   ScheduleNextBurst();
80*3f982cf4SFabien Sanglard }
81*3f982cf4SFabien Sanglard 
RequestRtpSend(Ssrc receiver_ssrc)82*3f982cf4SFabien Sanglard void SenderPacketRouter::RequestRtpSend(Ssrc receiver_ssrc) {
83*3f982cf4SFabien Sanglard   const auto it = FindEntry(receiver_ssrc);
84*3f982cf4SFabien Sanglard   OSP_DCHECK(it != senders_.end());
85*3f982cf4SFabien Sanglard   it->next_rtp_send_time = Alarm::kImmediately;
86*3f982cf4SFabien Sanglard   ScheduleNextBurst();
87*3f982cf4SFabien Sanglard }
88*3f982cf4SFabien Sanglard 
OnReceivedPacket(const IPEndpoint & source,Clock::time_point arrival_time,std::vector<uint8_t> packet)89*3f982cf4SFabien Sanglard void SenderPacketRouter::OnReceivedPacket(const IPEndpoint& source,
90*3f982cf4SFabien Sanglard                                           Clock::time_point arrival_time,
91*3f982cf4SFabien Sanglard                                           std::vector<uint8_t> packet) {
92*3f982cf4SFabien Sanglard   // If the packet did not come from the expected endpoint, ignore it.
93*3f982cf4SFabien Sanglard   OSP_DCHECK_NE(source.port, uint16_t{0});
94*3f982cf4SFabien Sanglard   if (source != environment_->remote_endpoint()) {
95*3f982cf4SFabien Sanglard     return;
96*3f982cf4SFabien Sanglard   }
97*3f982cf4SFabien Sanglard 
98*3f982cf4SFabien Sanglard   // Determine which Sender to dispatch the packet to. Senders may only receive
99*3f982cf4SFabien Sanglard   // RTCP packets from Receivers. Log a warning containing a pretty-printed dump
100*3f982cf4SFabien Sanglard   // if the packet is not an RTCP packet.
101*3f982cf4SFabien Sanglard   const std::pair<ApparentPacketType, Ssrc> seems_like =
102*3f982cf4SFabien Sanglard       InspectPacketForRouting(packet);
103*3f982cf4SFabien Sanglard   if (seems_like.first != ApparentPacketType::RTCP) {
104*3f982cf4SFabien Sanglard     constexpr int kMaxPartiaHexDumpSize = 96;
105*3f982cf4SFabien Sanglard     const std::size_t encode_size =
106*3f982cf4SFabien Sanglard         std::min(packet.size(), static_cast<size_t>(kMaxPartiaHexDumpSize));
107*3f982cf4SFabien Sanglard     OSP_LOG_WARN << "UNKNOWN packet of " << packet.size()
108*3f982cf4SFabien Sanglard                  << " bytes. Partial hex dump: "
109*3f982cf4SFabien Sanglard                  << HexEncode(packet.data(), encode_size);
110*3f982cf4SFabien Sanglard     return;
111*3f982cf4SFabien Sanglard   }
112*3f982cf4SFabien Sanglard   const auto it = FindEntry(seems_like.second);
113*3f982cf4SFabien Sanglard   if (it != senders_.end()) {
114*3f982cf4SFabien Sanglard     it->sender->OnReceivedRtcpPacket(arrival_time, std::move(packet));
115*3f982cf4SFabien Sanglard   }
116*3f982cf4SFabien Sanglard }
117*3f982cf4SFabien Sanglard 
FindEntry(Ssrc receiver_ssrc)118*3f982cf4SFabien Sanglard SenderPacketRouter::SenderEntries::iterator SenderPacketRouter::FindEntry(
119*3f982cf4SFabien Sanglard     Ssrc receiver_ssrc) {
120*3f982cf4SFabien Sanglard   return std::find_if(senders_.begin(), senders_.end(),
121*3f982cf4SFabien Sanglard                       [receiver_ssrc](const SenderEntry& entry) {
122*3f982cf4SFabien Sanglard                         return entry.receiver_ssrc == receiver_ssrc;
123*3f982cf4SFabien Sanglard                       });
124*3f982cf4SFabien Sanglard }
125*3f982cf4SFabien Sanglard 
ScheduleNextBurst()126*3f982cf4SFabien Sanglard void SenderPacketRouter::ScheduleNextBurst() {
127*3f982cf4SFabien Sanglard   // Determine the next burst time by scanning for the earliest of the
128*3f982cf4SFabien Sanglard   // next-scheduled send times for each Sender.
129*3f982cf4SFabien Sanglard   const Clock::time_point earliest_allowed_burst_time =
130*3f982cf4SFabien Sanglard       last_burst_time_ + burst_interval_;
131*3f982cf4SFabien Sanglard   Clock::time_point next_burst_time = kNever;
132*3f982cf4SFabien Sanglard   for (const SenderEntry& entry : senders_) {
133*3f982cf4SFabien Sanglard     const auto next_send_time =
134*3f982cf4SFabien Sanglard         std::min(entry.next_rtcp_send_time, entry.next_rtp_send_time);
135*3f982cf4SFabien Sanglard     if (next_send_time >= next_burst_time) {
136*3f982cf4SFabien Sanglard       continue;
137*3f982cf4SFabien Sanglard     }
138*3f982cf4SFabien Sanglard     if (next_send_time <= earliest_allowed_burst_time) {
139*3f982cf4SFabien Sanglard       next_burst_time = earliest_allowed_burst_time;
140*3f982cf4SFabien Sanglard       // No need to continue, since |next_burst_time| cannot become any earlier.
141*3f982cf4SFabien Sanglard       break;
142*3f982cf4SFabien Sanglard     }
143*3f982cf4SFabien Sanglard     next_burst_time = next_send_time;
144*3f982cf4SFabien Sanglard   }
145*3f982cf4SFabien Sanglard 
146*3f982cf4SFabien Sanglard   // Schedule the alarm for the next burst time unless none of the Senders has
147*3f982cf4SFabien Sanglard   // anything to send.
148*3f982cf4SFabien Sanglard   if (next_burst_time == kNever) {
149*3f982cf4SFabien Sanglard     alarm_.Cancel();
150*3f982cf4SFabien Sanglard   } else {
151*3f982cf4SFabien Sanglard     alarm_.Schedule([this] { SendBurstOfPackets(); }, next_burst_time);
152*3f982cf4SFabien Sanglard   }
153*3f982cf4SFabien Sanglard }
154*3f982cf4SFabien Sanglard 
SendBurstOfPackets()155*3f982cf4SFabien Sanglard void SenderPacketRouter::SendBurstOfPackets() {
156*3f982cf4SFabien Sanglard   // Treat RTCP packets as "critical priority," and so there is no upper limit
157*3f982cf4SFabien Sanglard   // on the number to send. Practically, this will always be limited by the
158*3f982cf4SFabien Sanglard   // number of Senders; so, this won't be a huge number of packets.
159*3f982cf4SFabien Sanglard   const Clock::time_point burst_time = environment_->now();
160*3f982cf4SFabien Sanglard   const int num_rtcp_packets_sent = SendJustTheRtcpPackets(burst_time);
161*3f982cf4SFabien Sanglard   // Now send all the RTP packets, up to the maximum number allowed in a burst.
162*3f982cf4SFabien Sanglard   // Higher priority Senders' RTP packets are sent first.
163*3f982cf4SFabien Sanglard   const int num_rtp_packets_sent = SendJustTheRtpPackets(
164*3f982cf4SFabien Sanglard       burst_time, max_packets_per_burst_ - num_rtcp_packets_sent);
165*3f982cf4SFabien Sanglard   last_burst_time_ = burst_time;
166*3f982cf4SFabien Sanglard 
167*3f982cf4SFabien Sanglard   BandwidthEstimator::OnBurstComplete(
168*3f982cf4SFabien Sanglard       num_rtcp_packets_sent + num_rtp_packets_sent, burst_time);
169*3f982cf4SFabien Sanglard 
170*3f982cf4SFabien Sanglard   ScheduleNextBurst();
171*3f982cf4SFabien Sanglard }
172*3f982cf4SFabien Sanglard 
SendJustTheRtcpPackets(Clock::time_point send_time)173*3f982cf4SFabien Sanglard int SenderPacketRouter::SendJustTheRtcpPackets(Clock::time_point send_time) {
174*3f982cf4SFabien Sanglard   int num_sent = 0;
175*3f982cf4SFabien Sanglard   for (SenderEntry& entry : senders_) {
176*3f982cf4SFabien Sanglard     if (entry.next_rtcp_send_time > send_time) {
177*3f982cf4SFabien Sanglard       continue;
178*3f982cf4SFabien Sanglard     }
179*3f982cf4SFabien Sanglard 
180*3f982cf4SFabien Sanglard     // Note: Only one RTCP packet is sent from the same Sender in the same
181*3f982cf4SFabien Sanglard     // burst. This is because RTCP packets are supposed to always contain the
182*3f982cf4SFabien Sanglard     // most up-to-date Sender state. Having multiple RTCP packets in the same
183*3f982cf4SFabien Sanglard     // burst would mean that all but the last one are old/irrelevant snapshots
184*3f982cf4SFabien Sanglard     // of Sender state, and this would just thrash/confuse the Receiver.
185*3f982cf4SFabien Sanglard     const absl::Span<uint8_t> packet =
186*3f982cf4SFabien Sanglard         entry.sender->GetRtcpPacketForImmediateSend(
187*3f982cf4SFabien Sanglard             send_time,
188*3f982cf4SFabien Sanglard             absl::Span<uint8_t>(packet_buffer_.get(), packet_buffer_size_));
189*3f982cf4SFabien Sanglard     if (!packet.empty()) {
190*3f982cf4SFabien Sanglard       environment_->SendPacket(packet);
191*3f982cf4SFabien Sanglard       entry.next_rtcp_send_time = send_time + kRtcpReportInterval;
192*3f982cf4SFabien Sanglard       ++num_sent;
193*3f982cf4SFabien Sanglard     }
194*3f982cf4SFabien Sanglard   }
195*3f982cf4SFabien Sanglard 
196*3f982cf4SFabien Sanglard   return num_sent;
197*3f982cf4SFabien Sanglard }
198*3f982cf4SFabien Sanglard 
SendJustTheRtpPackets(Clock::time_point send_time,int num_packets_to_send)199*3f982cf4SFabien Sanglard int SenderPacketRouter::SendJustTheRtpPackets(Clock::time_point send_time,
200*3f982cf4SFabien Sanglard                                               int num_packets_to_send) {
201*3f982cf4SFabien Sanglard   int num_sent = 0;
202*3f982cf4SFabien Sanglard   for (SenderEntry& entry : senders_) {
203*3f982cf4SFabien Sanglard     if (num_sent >= num_packets_to_send) {
204*3f982cf4SFabien Sanglard       break;
205*3f982cf4SFabien Sanglard     }
206*3f982cf4SFabien Sanglard     if (entry.next_rtp_send_time > send_time) {
207*3f982cf4SFabien Sanglard       continue;
208*3f982cf4SFabien Sanglard     }
209*3f982cf4SFabien Sanglard 
210*3f982cf4SFabien Sanglard     for (; num_sent < num_packets_to_send; ++num_sent) {
211*3f982cf4SFabien Sanglard       const absl::Span<uint8_t> packet =
212*3f982cf4SFabien Sanglard           entry.sender->GetRtpPacketForImmediateSend(
213*3f982cf4SFabien Sanglard               send_time,
214*3f982cf4SFabien Sanglard               absl::Span<uint8_t>(packet_buffer_.get(), packet_buffer_size_));
215*3f982cf4SFabien Sanglard       if (packet.empty()) {
216*3f982cf4SFabien Sanglard         break;
217*3f982cf4SFabien Sanglard       }
218*3f982cf4SFabien Sanglard       environment_->SendPacket(packet);
219*3f982cf4SFabien Sanglard     }
220*3f982cf4SFabien Sanglard     entry.next_rtp_send_time = entry.sender->GetRtpResumeTime();
221*3f982cf4SFabien Sanglard   }
222*3f982cf4SFabien Sanglard 
223*3f982cf4SFabien Sanglard   return num_sent;
224*3f982cf4SFabien Sanglard }
225*3f982cf4SFabien Sanglard 
226*3f982cf4SFabien Sanglard namespace {
227*3f982cf4SFabien Sanglard constexpr int kBitsPerByte = 8;
228*3f982cf4SFabien Sanglard constexpr auto kOneSecondInMilliseconds = to_milliseconds(seconds(1));
229*3f982cf4SFabien Sanglard }  // namespace
230*3f982cf4SFabien Sanglard 
231*3f982cf4SFabien Sanglard // static
ComputeMaxPacketsPerBurst(int max_burst_bitrate,int packet_size,milliseconds burst_interval)232*3f982cf4SFabien Sanglard int SenderPacketRouter::ComputeMaxPacketsPerBurst(int max_burst_bitrate,
233*3f982cf4SFabien Sanglard                                                   int packet_size,
234*3f982cf4SFabien Sanglard                                                   milliseconds burst_interval) {
235*3f982cf4SFabien Sanglard   OSP_DCHECK_GT(max_burst_bitrate, 0);
236*3f982cf4SFabien Sanglard   OSP_DCHECK_GT(packet_size, 0);
237*3f982cf4SFabien Sanglard   OSP_DCHECK_GT(burst_interval, milliseconds(0));
238*3f982cf4SFabien Sanglard   OSP_DCHECK_LE(burst_interval, kOneSecondInMilliseconds);
239*3f982cf4SFabien Sanglard 
240*3f982cf4SFabien Sanglard   const int max_packets_per_second =
241*3f982cf4SFabien Sanglard       max_burst_bitrate / kBitsPerByte / packet_size;
242*3f982cf4SFabien Sanglard   const int bursts_per_second = kOneSecondInMilliseconds / burst_interval;
243*3f982cf4SFabien Sanglard   return std::max(max_packets_per_second / bursts_per_second, 1);
244*3f982cf4SFabien Sanglard }
245*3f982cf4SFabien Sanglard 
246*3f982cf4SFabien Sanglard // static
ComputeMaxBurstBitrate(int packet_size,int max_packets_per_burst,milliseconds burst_interval)247*3f982cf4SFabien Sanglard int SenderPacketRouter::ComputeMaxBurstBitrate(int packet_size,
248*3f982cf4SFabien Sanglard                                                int max_packets_per_burst,
249*3f982cf4SFabien Sanglard                                                milliseconds burst_interval) {
250*3f982cf4SFabien Sanglard   OSP_DCHECK_GT(packet_size, 0);
251*3f982cf4SFabien Sanglard   OSP_DCHECK_GT(max_packets_per_burst, 0);
252*3f982cf4SFabien Sanglard   OSP_DCHECK_GT(burst_interval, milliseconds(0));
253*3f982cf4SFabien Sanglard   OSP_DCHECK_LE(burst_interval, kOneSecondInMilliseconds);
254*3f982cf4SFabien Sanglard 
255*3f982cf4SFabien Sanglard   const int64_t max_bits_per_burst =
256*3f982cf4SFabien Sanglard       int64_t{packet_size} * kBitsPerByte * max_packets_per_burst;
257*3f982cf4SFabien Sanglard   const int bursts_per_second = kOneSecondInMilliseconds / burst_interval;
258*3f982cf4SFabien Sanglard   return saturate_cast<int>(max_bits_per_burst * bursts_per_second);
259*3f982cf4SFabien Sanglard }
260*3f982cf4SFabien Sanglard 
261*3f982cf4SFabien Sanglard SenderPacketRouter::Sender::~Sender() = default;
262*3f982cf4SFabien Sanglard 
263*3f982cf4SFabien Sanglard // static
264*3f982cf4SFabien Sanglard constexpr int SenderPacketRouter::kDefaultMaxBurstBitrate;
265*3f982cf4SFabien Sanglard // static
266*3f982cf4SFabien Sanglard constexpr milliseconds SenderPacketRouter::kDefaultBurstInterval;
267*3f982cf4SFabien Sanglard // static
268*3f982cf4SFabien Sanglard constexpr Clock::time_point SenderPacketRouter::kNever;
269*3f982cf4SFabien Sanglard 
270*3f982cf4SFabien Sanglard }  // namespace cast
271*3f982cf4SFabien Sanglard }  // namespace openscreen
272