1 /*
2 * Copyright (c) 2017 The WebRTC project authors. All Rights Reserved.
3 *
4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree.
9 */
10
11 #include "modules/rtp_rtcp/source/rtcp_transceiver.h"
12
13 #include <memory>
14 #include <utility>
15
16 #include "api/units/time_delta.h"
17 #include "api/units/timestamp.h"
18 #include "modules/rtp_rtcp/source/rtcp_packet/remote_estimate.h"
19 #include "modules/rtp_rtcp/source/rtcp_packet/sender_report.h"
20 #include "modules/rtp_rtcp/source/rtcp_packet/transport_feedback.h"
21 #include "rtc_base/event.h"
22 #include "rtc_base/task_queue_for_test.h"
23 #include "system_wrappers/include/clock.h"
24 #include "test/gmock.h"
25 #include "test/gtest.h"
26 #include "test/mock_transport.h"
27 #include "test/rtcp_packet_parser.h"
28
29 namespace {
30
31 using ::testing::_;
32 using ::testing::AtLeast;
33 using ::testing::Invoke;
34 using ::testing::InvokeWithoutArgs;
35 using ::testing::IsNull;
36 using ::testing::NiceMock;
37 using ::webrtc::MockTransport;
38 using ::webrtc::RtcpTransceiver;
39 using ::webrtc::RtcpTransceiverConfig;
40 using ::webrtc::SimulatedClock;
41 using ::webrtc::TaskQueueForTest;
42 using ::webrtc::Timestamp;
43 using ::webrtc::rtcp::RemoteEstimate;
44 using ::webrtc::rtcp::RtcpPacket;
45 using ::webrtc::rtcp::TransportFeedback;
46 using ::webrtc::test::RtcpPacketParser;
47
48 class MockMediaReceiverRtcpObserver : public webrtc::MediaReceiverRtcpObserver {
49 public:
50 MOCK_METHOD(void,
51 OnSenderReport,
52 (uint32_t, webrtc::NtpTime, uint32_t),
53 (override));
54 };
55
56 constexpr webrtc::TimeDelta kTimeout = webrtc::TimeDelta::Seconds(1);
57
WaitPostedTasks(TaskQueueForTest * queue)58 void WaitPostedTasks(TaskQueueForTest* queue) {
59 rtc::Event done;
60 queue->PostTask([&done] { done.Set(); });
61 ASSERT_TRUE(done.Wait(kTimeout));
62 }
63
TEST(RtcpTransceiverTest,SendsRtcpOnTaskQueueWhenCreatedOffTaskQueue)64 TEST(RtcpTransceiverTest, SendsRtcpOnTaskQueueWhenCreatedOffTaskQueue) {
65 SimulatedClock clock(0);
66 MockTransport outgoing_transport;
67 TaskQueueForTest queue("rtcp");
68 RtcpTransceiverConfig config;
69 config.clock = &clock;
70 config.outgoing_transport = &outgoing_transport;
71 config.task_queue = queue.Get();
72 EXPECT_CALL(outgoing_transport, SendRtcp(_, _))
73 .WillRepeatedly(InvokeWithoutArgs([&] {
74 EXPECT_TRUE(queue.IsCurrent());
75 return true;
76 }));
77
78 RtcpTransceiver rtcp_transceiver(config);
79 rtcp_transceiver.SendCompoundPacket();
80 WaitPostedTasks(&queue);
81 }
82
TEST(RtcpTransceiverTest,SendsRtcpOnTaskQueueWhenCreatedOnTaskQueue)83 TEST(RtcpTransceiverTest, SendsRtcpOnTaskQueueWhenCreatedOnTaskQueue) {
84 SimulatedClock clock(0);
85 MockTransport outgoing_transport;
86 TaskQueueForTest queue("rtcp");
87 RtcpTransceiverConfig config;
88 config.clock = &clock;
89 config.outgoing_transport = &outgoing_transport;
90 config.task_queue = queue.Get();
91 EXPECT_CALL(outgoing_transport, SendRtcp(_, _))
92 .WillRepeatedly(InvokeWithoutArgs([&] {
93 EXPECT_TRUE(queue.IsCurrent());
94 return true;
95 }));
96
97 std::unique_ptr<RtcpTransceiver> rtcp_transceiver;
98 queue.PostTask([&] {
99 rtcp_transceiver = std::make_unique<RtcpTransceiver>(config);
100 rtcp_transceiver->SendCompoundPacket();
101 });
102 WaitPostedTasks(&queue);
103 }
104
TEST(RtcpTransceiverTest,CanBeDestroyedOnTaskQueue)105 TEST(RtcpTransceiverTest, CanBeDestroyedOnTaskQueue) {
106 SimulatedClock clock(0);
107 NiceMock<MockTransport> outgoing_transport;
108 TaskQueueForTest queue("rtcp");
109 RtcpTransceiverConfig config;
110 config.clock = &clock;
111 config.outgoing_transport = &outgoing_transport;
112 config.task_queue = queue.Get();
113 auto rtcp_transceiver = std::make_unique<RtcpTransceiver>(config);
114
115 queue.PostTask([&] {
116 // Insert a packet just before destruction to test for races.
117 rtcp_transceiver->SendCompoundPacket();
118 rtcp_transceiver.reset();
119 });
120 WaitPostedTasks(&queue);
121 }
122
TEST(RtcpTransceiverTest,CanBeDestroyedWithoutBlocking)123 TEST(RtcpTransceiverTest, CanBeDestroyedWithoutBlocking) {
124 SimulatedClock clock(0);
125 TaskQueueForTest queue("rtcp");
126 NiceMock<MockTransport> outgoing_transport;
127 RtcpTransceiverConfig config;
128 config.clock = &clock;
129 config.outgoing_transport = &outgoing_transport;
130 config.task_queue = queue.Get();
131 auto* rtcp_transceiver = new RtcpTransceiver(config);
132 rtcp_transceiver->SendCompoundPacket();
133
134 rtc::Event done;
135 rtc::Event heavy_task;
136 queue.PostTask([&] {
137 EXPECT_TRUE(heavy_task.Wait(kTimeout));
138 done.Set();
139 });
140 delete rtcp_transceiver;
141
142 heavy_task.Set();
143 EXPECT_TRUE(done.Wait(kTimeout));
144 }
145
TEST(RtcpTransceiverTest,MaySendPacketsAfterDestructor)146 TEST(RtcpTransceiverTest, MaySendPacketsAfterDestructor) { // i.e. Be careful!
147 SimulatedClock clock(0);
148 NiceMock<MockTransport> outgoing_transport; // Must outlive queue below.
149 TaskQueueForTest queue("rtcp");
150 RtcpTransceiverConfig config;
151 config.clock = &clock;
152 config.outgoing_transport = &outgoing_transport;
153 config.task_queue = queue.Get();
154 auto* rtcp_transceiver = new RtcpTransceiver(config);
155
156 rtc::Event heavy_task;
157 queue.PostTask([&] { EXPECT_TRUE(heavy_task.Wait(kTimeout)); });
158 rtcp_transceiver->SendCompoundPacket();
159 delete rtcp_transceiver;
160
161 EXPECT_CALL(outgoing_transport, SendRtcp);
162 heavy_task.Set();
163
164 WaitPostedTasks(&queue);
165 }
166
167 // Use rtp timestamp to distinguish different incoming sender reports.
CreateSenderReport(uint32_t ssrc,uint32_t rtp_time)168 rtc::CopyOnWriteBuffer CreateSenderReport(uint32_t ssrc, uint32_t rtp_time) {
169 webrtc::rtcp::SenderReport sr;
170 sr.SetSenderSsrc(ssrc);
171 sr.SetRtpTimestamp(rtp_time);
172 rtc::Buffer buffer = sr.Build();
173 // Switch to an efficient way creating CopyOnWriteBuffer from RtcpPacket when
174 // there is one. Until then do not worry about extra memcpy in test.
175 return rtc::CopyOnWriteBuffer(buffer.data(), buffer.size());
176 }
177
TEST(RtcpTransceiverTest,DoesntPostToRtcpObserverAfterCallToRemove)178 TEST(RtcpTransceiverTest, DoesntPostToRtcpObserverAfterCallToRemove) {
179 const uint32_t kRemoteSsrc = 1234;
180 SimulatedClock clock(0);
181 MockTransport null_transport;
182 TaskQueueForTest queue("rtcp");
183 RtcpTransceiverConfig config;
184 config.clock = &clock;
185 config.outgoing_transport = &null_transport;
186 config.task_queue = queue.Get();
187 RtcpTransceiver rtcp_transceiver(config);
188 rtc::Event observer_deleted;
189
190 auto observer = std::make_unique<MockMediaReceiverRtcpObserver>();
191 EXPECT_CALL(*observer, OnSenderReport(kRemoteSsrc, _, 1));
192 EXPECT_CALL(*observer, OnSenderReport(kRemoteSsrc, _, 2)).Times(0);
193
194 rtcp_transceiver.AddMediaReceiverRtcpObserver(kRemoteSsrc, observer.get());
195 rtcp_transceiver.ReceivePacket(CreateSenderReport(kRemoteSsrc, 1));
196 rtcp_transceiver.RemoveMediaReceiverRtcpObserver(kRemoteSsrc, observer.get(),
197 /*on_removed=*/[&] {
198 observer.reset();
199 observer_deleted.Set();
200 });
201 rtcp_transceiver.ReceivePacket(CreateSenderReport(kRemoteSsrc, 2));
202
203 EXPECT_TRUE(observer_deleted.Wait(kTimeout));
204 WaitPostedTasks(&queue);
205 }
206
TEST(RtcpTransceiverTest,RemoveMediaReceiverRtcpObserverIsNonBlocking)207 TEST(RtcpTransceiverTest, RemoveMediaReceiverRtcpObserverIsNonBlocking) {
208 const uint32_t kRemoteSsrc = 1234;
209 SimulatedClock clock(0);
210 MockTransport null_transport;
211 TaskQueueForTest queue("rtcp");
212 RtcpTransceiverConfig config;
213 config.clock = &clock;
214 config.outgoing_transport = &null_transport;
215 config.task_queue = queue.Get();
216 RtcpTransceiver rtcp_transceiver(config);
217 auto observer = std::make_unique<MockMediaReceiverRtcpObserver>();
218 rtcp_transceiver.AddMediaReceiverRtcpObserver(kRemoteSsrc, observer.get());
219
220 rtc::Event queue_blocker;
221 rtc::Event observer_deleted;
222 queue.PostTask([&] { EXPECT_TRUE(queue_blocker.Wait(kTimeout)); });
223 rtcp_transceiver.RemoveMediaReceiverRtcpObserver(kRemoteSsrc, observer.get(),
224 /*on_removed=*/[&] {
225 observer.reset();
226 observer_deleted.Set();
227 });
228
229 EXPECT_THAT(observer, Not(IsNull()));
230 queue_blocker.Set();
231 EXPECT_TRUE(observer_deleted.Wait(kTimeout));
232 }
233
TEST(RtcpTransceiverTest,CanCallSendCompoundPacketFromAnyThread)234 TEST(RtcpTransceiverTest, CanCallSendCompoundPacketFromAnyThread) {
235 SimulatedClock clock(0);
236 MockTransport outgoing_transport;
237 TaskQueueForTest queue("rtcp");
238 RtcpTransceiverConfig config;
239 config.clock = &clock;
240 config.outgoing_transport = &outgoing_transport;
241 config.task_queue = queue.Get();
242
243 EXPECT_CALL(outgoing_transport, SendRtcp(_, _))
244 // If test is slow, a periodic task may send an extra packet.
245 .Times(AtLeast(3))
246 .WillRepeatedly(InvokeWithoutArgs([&] {
247 EXPECT_TRUE(queue.IsCurrent());
248 return true;
249 }));
250
251 RtcpTransceiver rtcp_transceiver(config);
252
253 // Call from the construction thread.
254 rtcp_transceiver.SendCompoundPacket();
255 // Call from the same queue transceiver use for processing.
256 queue.PostTask([&] { rtcp_transceiver.SendCompoundPacket(); });
257 // Call from unrelated task queue.
258 TaskQueueForTest queue_send("send_packet");
259 queue_send.PostTask([&] { rtcp_transceiver.SendCompoundPacket(); });
260
261 WaitPostedTasks(&queue_send);
262 WaitPostedTasks(&queue);
263 }
264
TEST(RtcpTransceiverTest,DoesntSendPacketsAfterStopCallback)265 TEST(RtcpTransceiverTest, DoesntSendPacketsAfterStopCallback) {
266 SimulatedClock clock(0);
267 NiceMock<MockTransport> outgoing_transport;
268 TaskQueueForTest queue("rtcp");
269 RtcpTransceiverConfig config;
270 config.clock = &clock;
271 config.outgoing_transport = &outgoing_transport;
272 config.task_queue = queue.Get();
273 config.schedule_periodic_compound_packets = true;
274
275 auto rtcp_transceiver = std::make_unique<RtcpTransceiver>(config);
276 rtc::Event done;
277 rtcp_transceiver->SendCompoundPacket();
278 rtcp_transceiver->Stop([&] {
279 EXPECT_CALL(outgoing_transport, SendRtcp).Times(0);
280 done.Set();
281 });
282 rtcp_transceiver = nullptr;
283 EXPECT_TRUE(done.Wait(kTimeout));
284 }
285
TEST(RtcpTransceiverTest,SendsCombinedRtcpPacketOnTaskQueue)286 TEST(RtcpTransceiverTest, SendsCombinedRtcpPacketOnTaskQueue) {
287 static constexpr uint32_t kSenderSsrc = 12345;
288
289 SimulatedClock clock(0);
290 MockTransport outgoing_transport;
291 TaskQueueForTest queue("rtcp");
292 RtcpTransceiverConfig config;
293 config.clock = &clock;
294 config.feedback_ssrc = kSenderSsrc;
295 config.outgoing_transport = &outgoing_transport;
296 config.task_queue = queue.Get();
297 config.schedule_periodic_compound_packets = false;
298 RtcpTransceiver rtcp_transceiver(config);
299
300 EXPECT_CALL(outgoing_transport, SendRtcp)
301 .WillOnce([&](const uint8_t* buffer, size_t size) {
302 EXPECT_TRUE(queue.IsCurrent());
303 RtcpPacketParser rtcp_parser;
304 rtcp_parser.Parse(buffer, size);
305 EXPECT_EQ(rtcp_parser.transport_feedback()->num_packets(), 1);
306 EXPECT_EQ(rtcp_parser.transport_feedback()->sender_ssrc(), kSenderSsrc);
307 EXPECT_EQ(rtcp_parser.app()->num_packets(), 1);
308 EXPECT_EQ(rtcp_parser.app()->sender_ssrc(), kSenderSsrc);
309 return true;
310 });
311
312 // Create minimalistic transport feedback packet.
313 std::vector<std::unique_ptr<RtcpPacket>> packets;
314 auto transport_feedback = std::make_unique<TransportFeedback>();
315 transport_feedback->AddReceivedPacket(321, Timestamp::Millis(10));
316 packets.push_back(std::move(transport_feedback));
317
318 auto remote_estimate = std::make_unique<RemoteEstimate>();
319 packets.push_back(std::move(remote_estimate));
320
321 rtcp_transceiver.SendCombinedRtcpPacket(std::move(packets));
322 WaitPostedTasks(&queue);
323 }
324
TEST(RtcpTransceiverTest,SendFrameIntraRequestDefaultsToNewRequest)325 TEST(RtcpTransceiverTest, SendFrameIntraRequestDefaultsToNewRequest) {
326 static constexpr uint32_t kSenderSsrc = 12345;
327
328 SimulatedClock clock(0);
329 MockTransport outgoing_transport;
330 TaskQueueForTest queue("rtcp");
331 RtcpTransceiverConfig config;
332 config.clock = &clock;
333 config.feedback_ssrc = kSenderSsrc;
334 config.outgoing_transport = &outgoing_transport;
335 config.task_queue = queue.Get();
336 config.schedule_periodic_compound_packets = false;
337 RtcpTransceiver rtcp_transceiver(config);
338
339 uint8_t first_seq_nr;
340 EXPECT_CALL(outgoing_transport, SendRtcp)
341 .WillOnce([&](const uint8_t* buffer, size_t size) {
342 EXPECT_TRUE(queue.IsCurrent());
343 RtcpPacketParser rtcp_parser;
344 rtcp_parser.Parse(buffer, size);
345 EXPECT_EQ(rtcp_parser.fir()->requests()[0].ssrc, kSenderSsrc);
346 first_seq_nr = rtcp_parser.fir()->requests()[0].seq_nr;
347 return true;
348 })
349 .WillOnce([&](const uint8_t* buffer, size_t size) {
350 EXPECT_TRUE(queue.IsCurrent());
351 RtcpPacketParser rtcp_parser;
352 rtcp_parser.Parse(buffer, size);
353 EXPECT_EQ(rtcp_parser.fir()->requests()[0].ssrc, kSenderSsrc);
354 EXPECT_EQ(rtcp_parser.fir()->requests()[0].seq_nr, first_seq_nr + 1);
355 return true;
356 });
357
358 // Send 2 FIR packets because the sequence numbers are incremented after,
359 // sending. One wouldn't be able to differentiate the new_request.
360 rtcp_transceiver.SendFullIntraRequest({kSenderSsrc});
361 rtcp_transceiver.SendFullIntraRequest({kSenderSsrc});
362
363 WaitPostedTasks(&queue);
364 }
365
366 } // namespace
367