1 /*
2 * Copyright (c) 2018 The WebRTC project authors. All Rights Reserved.
3 *
4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree.
9 */
10
11 #include "call/degraded_call.h"
12
13 #include <memory>
14 #include <utility>
15
16 #include "absl/strings/string_view.h"
17 #include "modules/rtp_rtcp/source/rtp_util.h"
18 #include "rtc_base/event.h"
19
20 namespace webrtc {
21
FakeNetworkPipeOnTaskQueue(TaskQueueBase * task_queue,rtc::scoped_refptr<PendingTaskSafetyFlag> call_alive,Clock * clock,std::unique_ptr<NetworkBehaviorInterface> network_behavior)22 DegradedCall::FakeNetworkPipeOnTaskQueue::FakeNetworkPipeOnTaskQueue(
23 TaskQueueBase* task_queue,
24 rtc::scoped_refptr<PendingTaskSafetyFlag> call_alive,
25 Clock* clock,
26 std::unique_ptr<NetworkBehaviorInterface> network_behavior)
27 : clock_(clock),
28 task_queue_(task_queue),
29 call_alive_(std::move(call_alive)),
30 pipe_(clock, std::move(network_behavior)) {}
31
SendRtp(const uint8_t * packet,size_t length,const PacketOptions & options,Transport * transport)32 void DegradedCall::FakeNetworkPipeOnTaskQueue::SendRtp(
33 const uint8_t* packet,
34 size_t length,
35 const PacketOptions& options,
36 Transport* transport) {
37 pipe_.SendRtp(packet, length, options, transport);
38 Process();
39 }
40
SendRtcp(const uint8_t * packet,size_t length,Transport * transport)41 void DegradedCall::FakeNetworkPipeOnTaskQueue::SendRtcp(const uint8_t* packet,
42 size_t length,
43 Transport* transport) {
44 pipe_.SendRtcp(packet, length, transport);
45 Process();
46 }
47
AddActiveTransport(Transport * transport)48 void DegradedCall::FakeNetworkPipeOnTaskQueue::AddActiveTransport(
49 Transport* transport) {
50 pipe_.AddActiveTransport(transport);
51 }
52
RemoveActiveTransport(Transport * transport)53 void DegradedCall::FakeNetworkPipeOnTaskQueue::RemoveActiveTransport(
54 Transport* transport) {
55 pipe_.RemoveActiveTransport(transport);
56 }
57
Process()58 bool DegradedCall::FakeNetworkPipeOnTaskQueue::Process() {
59 pipe_.Process();
60 auto time_to_next = pipe_.TimeUntilNextProcess();
61 if (!time_to_next) {
62 // Packet was probably sent immediately.
63 return false;
64 }
65
66 task_queue_->PostTask(SafeTask(call_alive_, [this, time_to_next] {
67 RTC_DCHECK_RUN_ON(task_queue_);
68 int64_t next_process_time = *time_to_next + clock_->TimeInMilliseconds();
69 if (!next_process_ms_ || next_process_time < *next_process_ms_) {
70 next_process_ms_ = next_process_time;
71 task_queue_->PostDelayedHighPrecisionTask(
72 SafeTask(call_alive_,
73 [this] {
74 RTC_DCHECK_RUN_ON(task_queue_);
75 if (!Process()) {
76 next_process_ms_.reset();
77 }
78 }),
79 TimeDelta::Millis(*time_to_next));
80 }
81 }));
82
83 return true;
84 }
85
FakeNetworkPipeTransportAdapter(FakeNetworkPipeOnTaskQueue * fake_network,Call * call,Clock * clock,Transport * real_transport)86 DegradedCall::FakeNetworkPipeTransportAdapter::FakeNetworkPipeTransportAdapter(
87 FakeNetworkPipeOnTaskQueue* fake_network,
88 Call* call,
89 Clock* clock,
90 Transport* real_transport)
91 : network_pipe_(fake_network),
92 call_(call),
93 clock_(clock),
94 real_transport_(real_transport) {
95 network_pipe_->AddActiveTransport(real_transport);
96 }
97
98 DegradedCall::FakeNetworkPipeTransportAdapter::
~FakeNetworkPipeTransportAdapter()99 ~FakeNetworkPipeTransportAdapter() {
100 network_pipe_->RemoveActiveTransport(real_transport_);
101 }
102
SendRtp(const uint8_t * packet,size_t length,const PacketOptions & options)103 bool DegradedCall::FakeNetworkPipeTransportAdapter::SendRtp(
104 const uint8_t* packet,
105 size_t length,
106 const PacketOptions& options) {
107 // A call here comes from the RTP stack (probably pacer). We intercept it and
108 // put it in the fake network pipe instead, but report to Call that is has
109 // been sent, so that the bandwidth estimator sees the delay we add.
110 network_pipe_->SendRtp(packet, length, options, real_transport_);
111 if (options.packet_id != -1) {
112 rtc::SentPacket sent_packet;
113 sent_packet.packet_id = options.packet_id;
114 sent_packet.send_time_ms = clock_->TimeInMilliseconds();
115 sent_packet.info.included_in_feedback = options.included_in_feedback;
116 sent_packet.info.included_in_allocation = options.included_in_allocation;
117 sent_packet.info.packet_size_bytes = length;
118 sent_packet.info.packet_type = rtc::PacketType::kData;
119 call_->OnSentPacket(sent_packet);
120 }
121 return true;
122 }
123
SendRtcp(const uint8_t * packet,size_t length)124 bool DegradedCall::FakeNetworkPipeTransportAdapter::SendRtcp(
125 const uint8_t* packet,
126 size_t length) {
127 network_pipe_->SendRtcp(packet, length, real_transport_);
128 return true;
129 }
130
ThreadedPacketReceiver(webrtc::TaskQueueBase * worker_thread,webrtc::TaskQueueBase * network_thread,rtc::scoped_refptr<PendingTaskSafetyFlag> call_alive,webrtc::PacketReceiver * receiver)131 DegradedCall::ThreadedPacketReceiver::ThreadedPacketReceiver(
132 webrtc::TaskQueueBase* worker_thread,
133 webrtc::TaskQueueBase* network_thread,
134 rtc::scoped_refptr<PendingTaskSafetyFlag> call_alive,
135 webrtc::PacketReceiver* receiver)
136 : worker_thread_(worker_thread),
137 network_thread_(network_thread),
138 call_alive_(std::move(call_alive)),
139 receiver_(receiver) {}
140
141 DegradedCall::ThreadedPacketReceiver::~ThreadedPacketReceiver() = default;
142
143 PacketReceiver::DeliveryStatus
DeliverPacket(MediaType media_type,rtc::CopyOnWriteBuffer packet,int64_t packet_time_us)144 DegradedCall::ThreadedPacketReceiver::DeliverPacket(
145 MediaType media_type,
146 rtc::CopyOnWriteBuffer packet,
147 int64_t packet_time_us) {
148 // `Call::DeliverPacket` expects RTCP packets to be delivered from the
149 // network thread and RTP packets to be delivered from the worker thread.
150 // Because `FakeNetworkPipe` queues packets, the thread used when this packet
151 // is delivered to `DegradedCall::DeliverPacket` may differ from the thread
152 // used when this packet is delivered to
153 // `ThreadedPacketReceiver::DeliverPacket`. To solve this problem, always
154 // make sure that packets are sent in the correct thread.
155 if (IsRtcpPacket(packet)) {
156 if (!network_thread_->IsCurrent()) {
157 network_thread_->PostTask(
158 SafeTask(call_alive_, [receiver = receiver_, media_type,
159 packet = std::move(packet), packet_time_us]() {
160 receiver->DeliverPacket(media_type, std::move(packet),
161 packet_time_us);
162 }));
163 return DELIVERY_OK;
164 }
165 } else {
166 if (!worker_thread_->IsCurrent()) {
167 worker_thread_->PostTask([receiver = receiver_, media_type,
168 packet = std::move(packet), packet_time_us]() {
169 receiver->DeliverPacket(media_type, std::move(packet), packet_time_us);
170 });
171 return DELIVERY_OK;
172 }
173 }
174
175 return receiver_->DeliverPacket(media_type, std::move(packet),
176 packet_time_us);
177 }
178
DegradedCall(std::unique_ptr<Call> call,const std::vector<TimeScopedNetworkConfig> & send_configs,const std::vector<TimeScopedNetworkConfig> & receive_configs)179 DegradedCall::DegradedCall(
180 std::unique_ptr<Call> call,
181 const std::vector<TimeScopedNetworkConfig>& send_configs,
182 const std::vector<TimeScopedNetworkConfig>& receive_configs)
183 : clock_(Clock::GetRealTimeClock()),
184 call_(std::move(call)),
185 call_alive_(PendingTaskSafetyFlag::CreateDetached()),
186 send_config_index_(0),
187 send_configs_(send_configs),
188 send_simulated_network_(nullptr),
189 receive_config_index_(0),
190 receive_configs_(receive_configs) {
191 if (!receive_configs_.empty()) {
192 auto network = std::make_unique<SimulatedNetwork>(receive_configs_[0]);
193 receive_simulated_network_ = network.get();
194 receive_pipe_ =
195 std::make_unique<webrtc::FakeNetworkPipe>(clock_, std::move(network));
196 packet_receiver_ = std::make_unique<ThreadedPacketReceiver>(
197 call_->worker_thread(), call_->network_thread(), call_alive_,
198 call_->Receiver());
199 receive_pipe_->SetReceiver(packet_receiver_.get());
200 if (receive_configs_.size() > 1) {
201 call_->network_thread()->PostDelayedTask(
202 SafeTask(call_alive_, [this] { UpdateReceiveNetworkConfig(); }),
203 receive_configs_[0].duration);
204 }
205 }
206 if (!send_configs_.empty()) {
207 auto network = std::make_unique<SimulatedNetwork>(send_configs_[0]);
208 send_simulated_network_ = network.get();
209 send_pipe_ = std::make_unique<FakeNetworkPipeOnTaskQueue>(
210 call_->network_thread(), call_alive_, clock_, std::move(network));
211 if (send_configs_.size() > 1) {
212 call_->network_thread()->PostDelayedTask(
213 SafeTask(call_alive_, [this] { UpdateSendNetworkConfig(); }),
214 send_configs_[0].duration);
215 }
216 }
217 }
218
~DegradedCall()219 DegradedCall::~DegradedCall() {
220 RTC_DCHECK_RUN_ON(call_->worker_thread());
221 // Thread synchronization is required to call `SetNotAlive`.
222 // Otherwise, when the `DegradedCall` object is destroyed but
223 // `SetNotAlive` has not yet been called,
224 // another Closure guarded by `call_alive_` may be called.
225 rtc::Event event;
226 call_->network_thread()->PostTask(
227 [flag = std::move(call_alive_), &event]() mutable {
228 flag->SetNotAlive();
229 event.Set();
230 });
231 event.Wait(rtc::Event::kForever);
232 }
233
// Creates an audio send stream on the wrapped call. When send degradation is
// configured, the stream's transport is replaced by an adapter that routes
// packets through the send pipe; the adapter is kept alive in
// `audio_send_transport_adapters_`, keyed by the created stream.
AudioSendStream* DegradedCall::CreateAudioSendStream(
    const AudioSendStream::Config& config) {
  if (send_configs_.empty()) {
    // No send degradation: plain pass-through.
    return call_->CreateAudioSendStream(config);
  }

  auto adapter = std::make_unique<FakeNetworkPipeTransportAdapter>(
      send_pipe_.get(), call_.get(), clock_, config.send_transport);
  AudioSendStream::Config degraded = config;
  degraded.send_transport = adapter.get();

  AudioSendStream* const stream = call_->CreateAudioSendStream(degraded);
  if (stream != nullptr) {
    audio_send_transport_adapters_[stream] = std::move(adapter);
  }
  return stream;
}
250
DestroyAudioSendStream(AudioSendStream * send_stream)251 void DegradedCall::DestroyAudioSendStream(AudioSendStream* send_stream) {
252 call_->DestroyAudioSendStream(send_stream);
253 audio_send_transport_adapters_.erase(send_stream);
254 }
255
// Pass-through to the wrapped call.
AudioReceiveStreamInterface* DegradedCall::CreateAudioReceiveStream(
    const AudioReceiveStreamInterface::Config& config) {
  AudioReceiveStreamInterface* const stream =
      call_->CreateAudioReceiveStream(config);
  return stream;
}
260
DestroyAudioReceiveStream(AudioReceiveStreamInterface * receive_stream)261 void DegradedCall::DestroyAudioReceiveStream(
262 AudioReceiveStreamInterface* receive_stream) {
263 call_->DestroyAudioReceiveStream(receive_stream);
264 }
265
CreateVideoSendStream(VideoSendStream::Config config,VideoEncoderConfig encoder_config)266 VideoSendStream* DegradedCall::CreateVideoSendStream(
267 VideoSendStream::Config config,
268 VideoEncoderConfig encoder_config) {
269 std::unique_ptr<FakeNetworkPipeTransportAdapter> transport_adapter;
270 if (!send_configs_.empty()) {
271 transport_adapter = std::make_unique<FakeNetworkPipeTransportAdapter>(
272 send_pipe_.get(), call_.get(), clock_, config.send_transport);
273 config.send_transport = transport_adapter.get();
274 }
275 VideoSendStream* send_stream = call_->CreateVideoSendStream(
276 std::move(config), std::move(encoder_config));
277 if (send_stream && transport_adapter) {
278 video_send_transport_adapters_[send_stream] = std::move(transport_adapter);
279 }
280 return send_stream;
281 }
282
CreateVideoSendStream(VideoSendStream::Config config,VideoEncoderConfig encoder_config,std::unique_ptr<FecController> fec_controller)283 VideoSendStream* DegradedCall::CreateVideoSendStream(
284 VideoSendStream::Config config,
285 VideoEncoderConfig encoder_config,
286 std::unique_ptr<FecController> fec_controller) {
287 std::unique_ptr<FakeNetworkPipeTransportAdapter> transport_adapter;
288 if (!send_configs_.empty()) {
289 transport_adapter = std::make_unique<FakeNetworkPipeTransportAdapter>(
290 send_pipe_.get(), call_.get(), clock_, config.send_transport);
291 config.send_transport = transport_adapter.get();
292 }
293 VideoSendStream* send_stream = call_->CreateVideoSendStream(
294 std::move(config), std::move(encoder_config), std::move(fec_controller));
295 if (send_stream && transport_adapter) {
296 video_send_transport_adapters_[send_stream] = std::move(transport_adapter);
297 }
298 return send_stream;
299 }
300
DestroyVideoSendStream(VideoSendStream * send_stream)301 void DegradedCall::DestroyVideoSendStream(VideoSendStream* send_stream) {
302 call_->DestroyVideoSendStream(send_stream);
303 video_send_transport_adapters_.erase(send_stream);
304 }
305
CreateVideoReceiveStream(VideoReceiveStreamInterface::Config configuration)306 VideoReceiveStreamInterface* DegradedCall::CreateVideoReceiveStream(
307 VideoReceiveStreamInterface::Config configuration) {
308 return call_->CreateVideoReceiveStream(std::move(configuration));
309 }
310
DestroyVideoReceiveStream(VideoReceiveStreamInterface * receive_stream)311 void DegradedCall::DestroyVideoReceiveStream(
312 VideoReceiveStreamInterface* receive_stream) {
313 call_->DestroyVideoReceiveStream(receive_stream);
314 }
315
// Pass-through to the wrapped call.
//
// The by-value parameter is deliberately non-const in this definition: a
// top-level `const` is not part of the function type, so this still matches
// the declaration, and without it `std::move` actually moves instead of
// silently falling back to a copy.
FlexfecReceiveStream* DegradedCall::CreateFlexfecReceiveStream(
    FlexfecReceiveStream::Config config) {
  return call_->CreateFlexfecReceiveStream(std::move(config));
}
320
DestroyFlexfecReceiveStream(FlexfecReceiveStream * receive_stream)321 void DegradedCall::DestroyFlexfecReceiveStream(
322 FlexfecReceiveStream* receive_stream) {
323 call_->DestroyFlexfecReceiveStream(receive_stream);
324 }
325
AddAdaptationResource(rtc::scoped_refptr<Resource> resource)326 void DegradedCall::AddAdaptationResource(
327 rtc::scoped_refptr<Resource> resource) {
328 call_->AddAdaptationResource(std::move(resource));
329 }
330
Receiver()331 PacketReceiver* DegradedCall::Receiver() {
332 if (!receive_configs_.empty()) {
333 return this;
334 }
335 return call_->Receiver();
336 }
337
338 RtpTransportControllerSendInterface*
GetTransportControllerSend()339 DegradedCall::GetTransportControllerSend() {
340 return call_->GetTransportControllerSend();
341 }
342
GetStats() const343 Call::Stats DegradedCall::GetStats() const {
344 return call_->GetStats();
345 }
346
trials() const347 const FieldTrialsView& DegradedCall::trials() const {
348 return call_->trials();
349 }
350
network_thread() const351 TaskQueueBase* DegradedCall::network_thread() const {
352 return call_->network_thread();
353 }
354
worker_thread() const355 TaskQueueBase* DegradedCall::worker_thread() const {
356 return call_->worker_thread();
357 }
358
SignalChannelNetworkState(MediaType media,NetworkState state)359 void DegradedCall::SignalChannelNetworkState(MediaType media,
360 NetworkState state) {
361 call_->SignalChannelNetworkState(media, state);
362 }
363
OnAudioTransportOverheadChanged(int transport_overhead_per_packet)364 void DegradedCall::OnAudioTransportOverheadChanged(
365 int transport_overhead_per_packet) {
366 call_->OnAudioTransportOverheadChanged(transport_overhead_per_packet);
367 }
368
OnLocalSsrcUpdated(AudioReceiveStreamInterface & stream,uint32_t local_ssrc)369 void DegradedCall::OnLocalSsrcUpdated(AudioReceiveStreamInterface& stream,
370 uint32_t local_ssrc) {
371 call_->OnLocalSsrcUpdated(stream, local_ssrc);
372 }
373
OnLocalSsrcUpdated(VideoReceiveStreamInterface & stream,uint32_t local_ssrc)374 void DegradedCall::OnLocalSsrcUpdated(VideoReceiveStreamInterface& stream,
375 uint32_t local_ssrc) {
376 call_->OnLocalSsrcUpdated(stream, local_ssrc);
377 }
378
OnLocalSsrcUpdated(FlexfecReceiveStream & stream,uint32_t local_ssrc)379 void DegradedCall::OnLocalSsrcUpdated(FlexfecReceiveStream& stream,
380 uint32_t local_ssrc) {
381 call_->OnLocalSsrcUpdated(stream, local_ssrc);
382 }
383
OnUpdateSyncGroup(AudioReceiveStreamInterface & stream,absl::string_view sync_group)384 void DegradedCall::OnUpdateSyncGroup(AudioReceiveStreamInterface& stream,
385 absl::string_view sync_group) {
386 call_->OnUpdateSyncGroup(stream, sync_group);
387 }
388
OnSentPacket(const rtc::SentPacket & sent_packet)389 void DegradedCall::OnSentPacket(const rtc::SentPacket& sent_packet) {
390 if (!send_configs_.empty()) {
391 // If we have a degraded send-transport, we have already notified call
392 // about the supposed network send time. Discard the actual network send
393 // time in order to properly fool the BWE.
394 return;
395 }
396 call_->OnSentPacket(sent_packet);
397 }
398
DeliverPacket(MediaType media_type,rtc::CopyOnWriteBuffer packet,int64_t packet_time_us)399 PacketReceiver::DeliveryStatus DegradedCall::DeliverPacket(
400 MediaType media_type,
401 rtc::CopyOnWriteBuffer packet,
402 int64_t packet_time_us) {
403 PacketReceiver::DeliveryStatus status = receive_pipe_->DeliverPacket(
404 media_type, std::move(packet), packet_time_us);
405 // This is not optimal, but there are many places where there are thread
406 // checks that fail if we're not using the worker thread call into this
407 // method. If we want to fix this we probably need a task queue to do handover
408 // of all overriden methods, which feels like overkill for the current use
409 // case.
410 // By just having this thread call out via the Process() method we work around
411 // that, with the tradeoff that a non-zero delay may become a little larger
412 // than anticipated at very low packet rates.
413 receive_pipe_->Process();
414 return status;
415 }
416
SetClientBitratePreferences(const webrtc::BitrateSettings & preferences)417 void DegradedCall::SetClientBitratePreferences(
418 const webrtc::BitrateSettings& preferences) {
419 call_->SetClientBitratePreferences(preferences);
420 }
421
UpdateSendNetworkConfig()422 void DegradedCall::UpdateSendNetworkConfig() {
423 send_config_index_ = (send_config_index_ + 1) % send_configs_.size();
424 send_simulated_network_->SetConfig(send_configs_[send_config_index_]);
425 call_->network_thread()->PostDelayedTask(
426 SafeTask(call_alive_, [this] { UpdateSendNetworkConfig(); }),
427 send_configs_[send_config_index_].duration);
428 }
429
UpdateReceiveNetworkConfig()430 void DegradedCall::UpdateReceiveNetworkConfig() {
431 receive_config_index_ = (receive_config_index_ + 1) % receive_configs_.size();
432 receive_simulated_network_->SetConfig(
433 receive_configs_[receive_config_index_]);
434 call_->network_thread()->PostDelayedTask(
435 SafeTask(call_alive_, [this] { UpdateReceiveNetworkConfig(); }),
436 receive_configs_[receive_config_index_].duration);
437 }
438 } // namespace webrtc
439