/*
 *  Copyright 2021 The WebRTC project authors. All Rights Reserved.
 *
 *  Use of this source code is governed by a BSD-style license
 *  that can be found in the LICENSE file in the root of the source
 *  tree. An additional intellectual property rights grant can be found
 *  in the file PATENTS. All contributing project authors may
 *  be found in the AUTHORS file in the root of the source tree.
 */
10
11 #include "media/sctp/dcsctp_transport.h"
12
13 #include <atomic>
14 #include <cstdint>
15 #include <limits>
16 #include <utility>
17 #include <vector>
18
19 #include "absl/strings/string_view.h"
20 #include "absl/types/optional.h"
21 #include "api/array_view.h"
22 #include "media/base/media_channel.h"
23 #include "net/dcsctp/public/dcsctp_socket_factory.h"
24 #include "net/dcsctp/public/packet_observer.h"
25 #include "net/dcsctp/public/text_pcap_packet_observer.h"
26 #include "net/dcsctp/public/types.h"
27 #include "p2p/base/packet_transport_internal.h"
28 #include "rtc_base/checks.h"
29 #include "rtc_base/logging.h"
30 #include "rtc_base/socket.h"
31 #include "rtc_base/strings/string_builder.h"
32 #include "rtc_base/thread.h"
33 #include "rtc_base/trace_event.h"
34 #include "system_wrappers/include/clock.h"
35
36 namespace webrtc {
37
38 namespace {
39 using ::dcsctp::SendPacketStatus;
40
41 // When there is packet loss for a long time, the SCTP retry timers will use
42 // exponential backoff, which can grow to very long durations and when the
43 // connection recovers, it may take a long time to reach the new backoff
44 // duration. By limiting it to a reasonable limit, the time to recover reduces.
45 constexpr dcsctp::DurationMs kMaxTimerBackoffDuration =
46 dcsctp::DurationMs(3000);
47
48 enum class WebrtcPPID : dcsctp::PPID::UnderlyingType {
49 // https://www.rfc-editor.org/rfc/rfc8832.html#section-8.1
50 kDCEP = 50,
51 // https://www.rfc-editor.org/rfc/rfc8831.html#section-8
52 kString = 51,
53 kBinaryPartial = 52, // Deprecated
54 kBinary = 53,
55 kStringPartial = 54, // Deprecated
56 kStringEmpty = 56,
57 kBinaryEmpty = 57,
58 };
59
ToPPID(DataMessageType message_type,size_t size)60 WebrtcPPID ToPPID(DataMessageType message_type, size_t size) {
61 switch (message_type) {
62 case webrtc::DataMessageType::kControl:
63 return WebrtcPPID::kDCEP;
64 case webrtc::DataMessageType::kText:
65 return size > 0 ? WebrtcPPID::kString : WebrtcPPID::kStringEmpty;
66 case webrtc::DataMessageType::kBinary:
67 return size > 0 ? WebrtcPPID::kBinary : WebrtcPPID::kBinaryEmpty;
68 }
69 }
70
ToDataMessageType(dcsctp::PPID ppid)71 absl::optional<DataMessageType> ToDataMessageType(dcsctp::PPID ppid) {
72 switch (static_cast<WebrtcPPID>(ppid.value())) {
73 case WebrtcPPID::kDCEP:
74 return webrtc::DataMessageType::kControl;
75 case WebrtcPPID::kString:
76 case WebrtcPPID::kStringPartial:
77 case WebrtcPPID::kStringEmpty:
78 return webrtc::DataMessageType::kText;
79 case WebrtcPPID::kBinary:
80 case WebrtcPPID::kBinaryPartial:
81 case WebrtcPPID::kBinaryEmpty:
82 return webrtc::DataMessageType::kBinary;
83 }
84 return absl::nullopt;
85 }
86
ToErrorCauseCode(dcsctp::ErrorKind error)87 absl::optional<cricket::SctpErrorCauseCode> ToErrorCauseCode(
88 dcsctp::ErrorKind error) {
89 switch (error) {
90 case dcsctp::ErrorKind::kParseFailed:
91 return cricket::SctpErrorCauseCode::kUnrecognizedParameters;
92 case dcsctp::ErrorKind::kPeerReported:
93 return cricket::SctpErrorCauseCode::kUserInitiatedAbort;
94 case dcsctp::ErrorKind::kWrongSequence:
95 case dcsctp::ErrorKind::kProtocolViolation:
96 return cricket::SctpErrorCauseCode::kProtocolViolation;
97 case dcsctp::ErrorKind::kResourceExhaustion:
98 return cricket::SctpErrorCauseCode::kOutOfResource;
99 case dcsctp::ErrorKind::kTooManyRetries:
100 case dcsctp::ErrorKind::kUnsupportedOperation:
101 case dcsctp::ErrorKind::kNoError:
102 case dcsctp::ErrorKind::kNotConnected:
103 // No SCTP error cause code matches those
104 break;
105 }
106 return absl::nullopt;
107 }
108
IsEmptyPPID(dcsctp::PPID ppid)109 bool IsEmptyPPID(dcsctp::PPID ppid) {
110 WebrtcPPID webrtc_ppid = static_cast<WebrtcPPID>(ppid.value());
111 return webrtc_ppid == WebrtcPPID::kStringEmpty ||
112 webrtc_ppid == WebrtcPPID::kBinaryEmpty;
113 }
114 } // namespace
115
DcSctpTransport(rtc::Thread * network_thread,rtc::PacketTransportInternal * transport,Clock * clock)116 DcSctpTransport::DcSctpTransport(rtc::Thread* network_thread,
117 rtc::PacketTransportInternal* transport,
118 Clock* clock)
119 : DcSctpTransport(network_thread,
120 transport,
121 clock,
122 std::make_unique<dcsctp::DcSctpSocketFactory>()) {}
123
DcSctpTransport(rtc::Thread * network_thread,rtc::PacketTransportInternal * transport,Clock * clock,std::unique_ptr<dcsctp::DcSctpSocketFactory> socket_factory)124 DcSctpTransport::DcSctpTransport(
125 rtc::Thread* network_thread,
126 rtc::PacketTransportInternal* transport,
127 Clock* clock,
128 std::unique_ptr<dcsctp::DcSctpSocketFactory> socket_factory)
129 : network_thread_(network_thread),
130 transport_(transport),
131 clock_(clock),
132 random_(clock_->TimeInMicroseconds()),
133 socket_factory_(std::move(socket_factory)),
134 task_queue_timeout_factory_(
135 *network_thread,
136 [this]() { return TimeMillis(); },
__anon22529c360302(dcsctp::TimeoutID timeout_id) 137 [this](dcsctp::TimeoutID timeout_id) {
138 socket_->HandleTimeout(timeout_id);
139 }) {
140 RTC_DCHECK_RUN_ON(network_thread_);
141 static std::atomic<int> instance_count = 0;
142 rtc::StringBuilder sb;
143 sb << debug_name_ << instance_count++;
144 debug_name_ = sb.Release();
145 ConnectTransportSignals();
146 }
147
~DcSctpTransport()148 DcSctpTransport::~DcSctpTransport() {
149 if (socket_) {
150 socket_->Close();
151 }
152 }
153
SetOnConnectedCallback(std::function<void ()> callback)154 void DcSctpTransport::SetOnConnectedCallback(std::function<void()> callback) {
155 RTC_DCHECK_RUN_ON(network_thread_);
156 on_connected_callback_ = std::move(callback);
157 }
158
SetDataChannelSink(DataChannelSink * sink)159 void DcSctpTransport::SetDataChannelSink(DataChannelSink* sink) {
160 RTC_DCHECK_RUN_ON(network_thread_);
161 data_channel_sink_ = sink;
162 if (data_channel_sink_ && ready_to_send_data_) {
163 data_channel_sink_->OnReadyToSend();
164 }
165 }
166
SetDtlsTransport(rtc::PacketTransportInternal * transport)167 void DcSctpTransport::SetDtlsTransport(
168 rtc::PacketTransportInternal* transport) {
169 RTC_DCHECK_RUN_ON(network_thread_);
170 DisconnectTransportSignals();
171 transport_ = transport;
172 ConnectTransportSignals();
173 MaybeConnectSocket();
174 }
175
Start(int local_sctp_port,int remote_sctp_port,int max_message_size)176 bool DcSctpTransport::Start(int local_sctp_port,
177 int remote_sctp_port,
178 int max_message_size) {
179 RTC_DCHECK_RUN_ON(network_thread_);
180 RTC_DCHECK(max_message_size > 0);
181 RTC_DLOG(LS_INFO) << debug_name_ << "->Start(local=" << local_sctp_port
182 << ", remote=" << remote_sctp_port
183 << ", max_message_size=" << max_message_size << ")";
184
185 if (!socket_) {
186 dcsctp::DcSctpOptions options;
187 options.local_port = local_sctp_port;
188 options.remote_port = remote_sctp_port;
189 options.max_message_size = max_message_size;
190 options.max_timer_backoff_duration = kMaxTimerBackoffDuration;
191 // Don't close the connection automatically on too many retransmissions.
192 options.max_retransmissions = absl::nullopt;
193 options.max_init_retransmits = absl::nullopt;
194
195 std::unique_ptr<dcsctp::PacketObserver> packet_observer;
196 if (RTC_LOG_CHECK_LEVEL(LS_VERBOSE)) {
197 packet_observer =
198 std::make_unique<dcsctp::TextPcapPacketObserver>(debug_name_);
199 }
200
201 socket_ = socket_factory_->Create(debug_name_, *this,
202 std::move(packet_observer), options);
203 } else {
204 if (local_sctp_port != socket_->options().local_port ||
205 remote_sctp_port != socket_->options().remote_port) {
206 RTC_LOG(LS_ERROR)
207 << debug_name_ << "->Start(local=" << local_sctp_port
208 << ", remote=" << remote_sctp_port
209 << "): Can't change ports on already started transport.";
210 return false;
211 }
212 socket_->SetMaxMessageSize(max_message_size);
213 }
214
215 MaybeConnectSocket();
216
217 return true;
218 }
219
OpenStream(int sid)220 bool DcSctpTransport::OpenStream(int sid) {
221 RTC_DCHECK_RUN_ON(network_thread_);
222 RTC_DLOG(LS_INFO) << debug_name_ << "->OpenStream(" << sid << ").";
223
224 StreamState stream_state;
225 stream_states_.insert_or_assign(dcsctp::StreamID(static_cast<uint16_t>(sid)),
226 stream_state);
227 return true;
228 }
229
ResetStream(int sid)230 bool DcSctpTransport::ResetStream(int sid) {
231 RTC_DCHECK_RUN_ON(network_thread_);
232 RTC_DLOG(LS_INFO) << debug_name_ << "->ResetStream(" << sid << ").";
233 if (!socket_) {
234 RTC_LOG(LS_ERROR) << debug_name_ << "->ResetStream(sid=" << sid
235 << "): Transport is not started.";
236 return false;
237 }
238
239 dcsctp::StreamID streams[1] = {dcsctp::StreamID(static_cast<uint16_t>(sid))};
240
241 auto it = stream_states_.find(streams[0]);
242 if (it == stream_states_.end()) {
243 RTC_LOG(LS_ERROR) << debug_name_ << "->ResetStream(sid=" << sid
244 << "): Stream is not open.";
245 return false;
246 }
247
248 StreamState& stream_state = it->second;
249 if (stream_state.closure_initiated || stream_state.incoming_reset_done ||
250 stream_state.outgoing_reset_done) {
251 // The closing procedure was already initiated by the remote, don't do
252 // anything.
253 return false;
254 }
255 stream_state.closure_initiated = true;
256 socket_->ResetStreams(streams);
257 return true;
258 }
259
SendData(int sid,const SendDataParams & params,const rtc::CopyOnWriteBuffer & payload,cricket::SendDataResult * result)260 bool DcSctpTransport::SendData(int sid,
261 const SendDataParams& params,
262 const rtc::CopyOnWriteBuffer& payload,
263 cricket::SendDataResult* result) {
264 RTC_DCHECK_RUN_ON(network_thread_);
265 RTC_DLOG(LS_VERBOSE) << debug_name_ << "->SendData(sid=" << sid
266 << ", type=" << static_cast<int>(params.type)
267 << ", length=" << payload.size() << ").";
268
269 if (!socket_) {
270 RTC_LOG(LS_ERROR) << debug_name_
271 << "->SendData(...): Transport is not started.";
272 *result = cricket::SDR_ERROR;
273 return false;
274 }
275
276 // It is possible for a message to be sent from the signaling thread at the
277 // same time a data-channel is closing, but before the signaling thread is
278 // aware of it. So we need to keep track of currently active data channels and
279 // skip sending messages for the ones that are not open or closing.
280 // The sending errors are not impacting the data channel API contract as
281 // it is allowed to discard queued messages when the channel is closing.
282 auto stream_state =
283 stream_states_.find(dcsctp::StreamID(static_cast<uint16_t>(sid)));
284 if (stream_state == stream_states_.end()) {
285 RTC_LOG(LS_VERBOSE) << "Skipping message on non-open stream with sid: "
286 << sid;
287 *result = cricket::SDR_ERROR;
288 return false;
289 }
290
291 if (stream_state->second.closure_initiated ||
292 stream_state->second.incoming_reset_done ||
293 stream_state->second.outgoing_reset_done) {
294 RTC_LOG(LS_VERBOSE) << "Skipping message on closing stream with sid: "
295 << sid;
296 *result = cricket::SDR_ERROR;
297 return false;
298 }
299
300 auto max_message_size = socket_->options().max_message_size;
301 if (max_message_size > 0 && payload.size() > max_message_size) {
302 RTC_LOG(LS_WARNING) << debug_name_
303 << "->SendData(...): "
304 "Trying to send packet bigger "
305 "than the max message size: "
306 << payload.size() << " vs max of " << max_message_size;
307 *result = cricket::SDR_ERROR;
308 return false;
309 }
310
311 std::vector<uint8_t> message_payload(payload.cdata(),
312 payload.cdata() + payload.size());
313 if (message_payload.empty()) {
314 // https://www.rfc-editor.org/rfc/rfc8831.html#section-6.6
315 // SCTP does not support the sending of empty user messages. Therefore, if
316 // an empty message has to be sent, the appropriate PPID (WebRTC String
317 // Empty or WebRTC Binary Empty) is used, and the SCTP user message of one
318 // zero byte is sent.
319 message_payload.push_back('\0');
320 }
321
322 dcsctp::DcSctpMessage message(
323 dcsctp::StreamID(static_cast<uint16_t>(sid)),
324 dcsctp::PPID(static_cast<uint16_t>(ToPPID(params.type, payload.size()))),
325 std::move(message_payload));
326
327 dcsctp::SendOptions send_options;
328 send_options.unordered = dcsctp::IsUnordered(!params.ordered);
329 if (params.max_rtx_ms.has_value()) {
330 RTC_DCHECK(*params.max_rtx_ms >= 0 &&
331 *params.max_rtx_ms <= std::numeric_limits<uint16_t>::max());
332 send_options.lifetime = dcsctp::DurationMs(*params.max_rtx_ms);
333 }
334 if (params.max_rtx_count.has_value()) {
335 RTC_DCHECK(*params.max_rtx_count >= 0 &&
336 *params.max_rtx_count <= std::numeric_limits<uint16_t>::max());
337 send_options.max_retransmissions = *params.max_rtx_count;
338 }
339
340 auto error = socket_->Send(std::move(message), send_options);
341 switch (error) {
342 case dcsctp::SendStatus::kSuccess:
343 *result = cricket::SDR_SUCCESS;
344 break;
345 case dcsctp::SendStatus::kErrorResourceExhaustion:
346 *result = cricket::SDR_BLOCK;
347 ready_to_send_data_ = false;
348 break;
349 default:
350 RTC_LOG(LS_ERROR) << debug_name_
351 << "->SendData(...): send() failed with error "
352 << dcsctp::ToString(error) << ".";
353 *result = cricket::SDR_ERROR;
354 break;
355 }
356
357 return *result == cricket::SDR_SUCCESS;
358 }
359
ReadyToSendData()360 bool DcSctpTransport::ReadyToSendData() {
361 return ready_to_send_data_;
362 }
363
max_message_size() const364 int DcSctpTransport::max_message_size() const {
365 if (!socket_) {
366 RTC_LOG(LS_ERROR) << debug_name_
367 << "->max_message_size(...): Transport is not started.";
368 return 0;
369 }
370 return socket_->options().max_message_size;
371 }
372
max_outbound_streams() const373 absl::optional<int> DcSctpTransport::max_outbound_streams() const {
374 if (!socket_)
375 return absl::nullopt;
376 return socket_->options().announced_maximum_outgoing_streams;
377 }
378
max_inbound_streams() const379 absl::optional<int> DcSctpTransport::max_inbound_streams() const {
380 if (!socket_)
381 return absl::nullopt;
382 return socket_->options().announced_maximum_incoming_streams;
383 }
384
set_debug_name_for_testing(const char * debug_name)385 void DcSctpTransport::set_debug_name_for_testing(const char* debug_name) {
386 debug_name_ = debug_name;
387 }
388
SendPacketWithStatus(rtc::ArrayView<const uint8_t> data)389 SendPacketStatus DcSctpTransport::SendPacketWithStatus(
390 rtc::ArrayView<const uint8_t> data) {
391 RTC_DCHECK_RUN_ON(network_thread_);
392 RTC_DCHECK(socket_);
393
394 if (data.size() > (socket_->options().mtu)) {
395 RTC_LOG(LS_ERROR) << debug_name_
396 << "->SendPacket(...): "
397 "SCTP seems to have made a packet that is bigger "
398 "than its official MTU: "
399 << data.size() << " vs max of " << socket_->options().mtu;
400 return SendPacketStatus::kError;
401 }
402 TRACE_EVENT0("webrtc", "DcSctpTransport::SendPacket");
403
404 if (!transport_ || !transport_->writable())
405 return SendPacketStatus::kError;
406
407 RTC_DLOG(LS_VERBOSE) << debug_name_ << "->SendPacket(length=" << data.size()
408 << ")";
409
410 auto result =
411 transport_->SendPacket(reinterpret_cast<const char*>(data.data()),
412 data.size(), rtc::PacketOptions(), 0);
413
414 if (result < 0) {
415 RTC_LOG(LS_WARNING) << debug_name_ << "->SendPacket(length=" << data.size()
416 << ") failed with error: " << transport_->GetError()
417 << ".";
418
419 if (rtc::IsBlockingError(transport_->GetError())) {
420 return SendPacketStatus::kTemporaryFailure;
421 }
422 return SendPacketStatus::kError;
423 }
424 return SendPacketStatus::kSuccess;
425 }
426
CreateTimeout(webrtc::TaskQueueBase::DelayPrecision precision)427 std::unique_ptr<dcsctp::Timeout> DcSctpTransport::CreateTimeout(
428 webrtc::TaskQueueBase::DelayPrecision precision) {
429 return task_queue_timeout_factory_.CreateTimeout(precision);
430 }
431
TimeMillis()432 dcsctp::TimeMs DcSctpTransport::TimeMillis() {
433 return dcsctp::TimeMs(clock_->TimeInMilliseconds());
434 }
435
GetRandomInt(uint32_t low,uint32_t high)436 uint32_t DcSctpTransport::GetRandomInt(uint32_t low, uint32_t high) {
437 return random_.Rand(low, high);
438 }
439
OnTotalBufferedAmountLow()440 void DcSctpTransport::OnTotalBufferedAmountLow() {
441 RTC_DCHECK_RUN_ON(network_thread_);
442 if (!ready_to_send_data_) {
443 ready_to_send_data_ = true;
444 if (data_channel_sink_) {
445 data_channel_sink_->OnReadyToSend();
446 }
447 }
448 }
449
OnMessageReceived(dcsctp::DcSctpMessage message)450 void DcSctpTransport::OnMessageReceived(dcsctp::DcSctpMessage message) {
451 RTC_DCHECK_RUN_ON(network_thread_);
452 RTC_DLOG(LS_VERBOSE) << debug_name_ << "->OnMessageReceived(sid="
453 << message.stream_id().value()
454 << ", ppid=" << message.ppid().value()
455 << ", length=" << message.payload().size() << ").";
456 cricket::ReceiveDataParams receive_data_params;
457 receive_data_params.sid = message.stream_id().value();
458 auto type = ToDataMessageType(message.ppid());
459 if (!type.has_value()) {
460 RTC_LOG(LS_VERBOSE) << debug_name_
461 << "->OnMessageReceived(): Received an unknown PPID "
462 << message.ppid().value()
463 << " on an SCTP packet. Dropping.";
464 }
465 receive_data_params.type = *type;
466 // No seq_num available from dcSCTP
467 receive_data_params.seq_num = 0;
468 receive_buffer_.Clear();
469 if (!IsEmptyPPID(message.ppid()))
470 receive_buffer_.AppendData(message.payload().data(),
471 message.payload().size());
472
473 if (data_channel_sink_) {
474 data_channel_sink_->OnDataReceived(
475 receive_data_params.sid, receive_data_params.type, receive_buffer_);
476 }
477 }
478
OnError(dcsctp::ErrorKind error,absl::string_view message)479 void DcSctpTransport::OnError(dcsctp::ErrorKind error,
480 absl::string_view message) {
481 if (error == dcsctp::ErrorKind::kResourceExhaustion) {
482 // Indicates that a message failed to be enqueued, because the send buffer
483 // is full, which is a very common (and wanted) state for high throughput
484 // sending/benchmarks.
485 RTC_LOG(LS_VERBOSE) << debug_name_
486 << "->OnError(error=" << dcsctp::ToString(error)
487 << ", message=" << message << ").";
488 } else {
489 RTC_LOG(LS_ERROR) << debug_name_
490 << "->OnError(error=" << dcsctp::ToString(error)
491 << ", message=" << message << ").";
492 }
493 }
494
OnAborted(dcsctp::ErrorKind error,absl::string_view message)495 void DcSctpTransport::OnAborted(dcsctp::ErrorKind error,
496 absl::string_view message) {
497 RTC_DCHECK_RUN_ON(network_thread_);
498 RTC_LOG(LS_ERROR) << debug_name_
499 << "->OnAborted(error=" << dcsctp::ToString(error)
500 << ", message=" << message << ").";
501 ready_to_send_data_ = false;
502 RTCError rtc_error(RTCErrorType::OPERATION_ERROR_WITH_DATA,
503 std::string(message));
504 rtc_error.set_error_detail(RTCErrorDetailType::SCTP_FAILURE);
505 auto code = ToErrorCauseCode(error);
506 if (code.has_value()) {
507 rtc_error.set_sctp_cause_code(static_cast<uint16_t>(*code));
508 }
509 if (data_channel_sink_) {
510 data_channel_sink_->OnTransportClosed(rtc_error);
511 }
512 }
513
OnConnected()514 void DcSctpTransport::OnConnected() {
515 RTC_DCHECK_RUN_ON(network_thread_);
516 RTC_DLOG(LS_INFO) << debug_name_ << "->OnConnected().";
517 ready_to_send_data_ = true;
518 if (data_channel_sink_) {
519 data_channel_sink_->OnReadyToSend();
520 }
521 if (on_connected_callback_) {
522 on_connected_callback_();
523 }
524 }
525
OnClosed()526 void DcSctpTransport::OnClosed() {
527 RTC_DLOG(LS_INFO) << debug_name_ << "->OnClosed().";
528 ready_to_send_data_ = false;
529 }
530
OnConnectionRestarted()531 void DcSctpTransport::OnConnectionRestarted() {
532 RTC_DLOG(LS_INFO) << debug_name_ << "->OnConnectionRestarted().";
533 }
534
OnStreamsResetFailed(rtc::ArrayView<const dcsctp::StreamID> outgoing_streams,absl::string_view reason)535 void DcSctpTransport::OnStreamsResetFailed(
536 rtc::ArrayView<const dcsctp::StreamID> outgoing_streams,
537 absl::string_view reason) {
538 // TODO(orphis): Need a test to check for correct behavior
539 for (auto& stream_id : outgoing_streams) {
540 RTC_LOG(LS_WARNING)
541 << debug_name_
542 << "->OnStreamsResetFailed(...): Outgoing stream reset failed"
543 << ", sid=" << stream_id.value() << ", reason: " << reason << ".";
544 }
545 }
546
OnStreamsResetPerformed(rtc::ArrayView<const dcsctp::StreamID> outgoing_streams)547 void DcSctpTransport::OnStreamsResetPerformed(
548 rtc::ArrayView<const dcsctp::StreamID> outgoing_streams) {
549 RTC_DCHECK_RUN_ON(network_thread_);
550 for (auto& stream_id : outgoing_streams) {
551 RTC_LOG(LS_INFO) << debug_name_
552 << "->OnStreamsResetPerformed(...): Outgoing stream reset"
553 << ", sid=" << stream_id.value();
554
555 auto it = stream_states_.find(stream_id);
556 if (it == stream_states_.end()) {
557 // Ignoring an outgoing stream reset for a closed stream
558 return;
559 }
560
561 StreamState& stream_state = it->second;
562 stream_state.outgoing_reset_done = true;
563
564 if (stream_state.incoming_reset_done) {
565 // When the close was not initiated locally, we can signal the end of the
566 // data channel close procedure when the remote ACKs the reset.
567 if (data_channel_sink_) {
568 data_channel_sink_->OnChannelClosed(stream_id.value());
569 }
570 stream_states_.erase(stream_id);
571 }
572 }
573 }
574
OnIncomingStreamsReset(rtc::ArrayView<const dcsctp::StreamID> incoming_streams)575 void DcSctpTransport::OnIncomingStreamsReset(
576 rtc::ArrayView<const dcsctp::StreamID> incoming_streams) {
577 RTC_DCHECK_RUN_ON(network_thread_);
578 for (auto& stream_id : incoming_streams) {
579 RTC_LOG(LS_INFO) << debug_name_
580 << "->OnIncomingStreamsReset(...): Incoming stream reset"
581 << ", sid=" << stream_id.value();
582
583 auto it = stream_states_.find(stream_id);
584 if (it == stream_states_.end())
585 return;
586
587 StreamState& stream_state = it->second;
588 stream_state.incoming_reset_done = true;
589
590 if (!stream_state.closure_initiated) {
591 // When receiving an incoming stream reset event for a non local close
592 // procedure, the transport needs to reset the stream in the other
593 // direction too.
594 dcsctp::StreamID streams[1] = {stream_id};
595 socket_->ResetStreams(streams);
596 if (data_channel_sink_) {
597 data_channel_sink_->OnChannelClosing(stream_id.value());
598 }
599 }
600
601 if (stream_state.outgoing_reset_done) {
602 // The close procedure that was initiated locally is complete when we
603 // receive and incoming reset event.
604 if (data_channel_sink_) {
605 data_channel_sink_->OnChannelClosed(stream_id.value());
606 }
607 stream_states_.erase(stream_id);
608 }
609 }
610 }
611
ConnectTransportSignals()612 void DcSctpTransport::ConnectTransportSignals() {
613 RTC_DCHECK_RUN_ON(network_thread_);
614 if (!transport_) {
615 return;
616 }
617 transport_->SignalWritableState.connect(
618 this, &DcSctpTransport::OnTransportWritableState);
619 transport_->SignalReadPacket.connect(this,
620 &DcSctpTransport::OnTransportReadPacket);
621 transport_->SignalClosed.connect(this, &DcSctpTransport::OnTransportClosed);
622 }
623
DisconnectTransportSignals()624 void DcSctpTransport::DisconnectTransportSignals() {
625 RTC_DCHECK_RUN_ON(network_thread_);
626 if (!transport_) {
627 return;
628 }
629 transport_->SignalWritableState.disconnect(this);
630 transport_->SignalReadPacket.disconnect(this);
631 transport_->SignalClosed.disconnect(this);
632 }
633
OnTransportWritableState(rtc::PacketTransportInternal * transport)634 void DcSctpTransport::OnTransportWritableState(
635 rtc::PacketTransportInternal* transport) {
636 RTC_DCHECK_RUN_ON(network_thread_);
637 RTC_DCHECK_EQ(transport_, transport);
638 RTC_DLOG(LS_VERBOSE) << debug_name_
639 << "->OnTransportWritableState(), writable="
640 << transport->writable();
641 MaybeConnectSocket();
642 }
643
OnTransportReadPacket(rtc::PacketTransportInternal * transport,const char * data,size_t length,const int64_t &,int flags)644 void DcSctpTransport::OnTransportReadPacket(
645 rtc::PacketTransportInternal* transport,
646 const char* data,
647 size_t length,
648 const int64_t& /* packet_time_us */,
649 int flags) {
650 RTC_DCHECK_RUN_ON(network_thread_);
651 if (flags) {
652 // We are only interested in SCTP packets.
653 return;
654 }
655
656 RTC_DLOG(LS_VERBOSE) << debug_name_
657 << "->OnTransportReadPacket(), length=" << length;
658 if (socket_) {
659 socket_->ReceivePacket(rtc::ArrayView<const uint8_t>(
660 reinterpret_cast<const uint8_t*>(data), length));
661 }
662 }
663
OnTransportClosed(rtc::PacketTransportInternal * transport)664 void DcSctpTransport::OnTransportClosed(
665 rtc::PacketTransportInternal* transport) {
666 RTC_DCHECK_RUN_ON(network_thread_);
667 RTC_DLOG(LS_VERBOSE) << debug_name_ << "->OnTransportClosed().";
668 if (data_channel_sink_) {
669 data_channel_sink_->OnTransportClosed({});
670 }
671 }
672
MaybeConnectSocket()673 void DcSctpTransport::MaybeConnectSocket() {
674 if (transport_ && transport_->writable() && socket_ &&
675 socket_->state() == dcsctp::SocketState::kClosed) {
676 socket_->Connect();
677 }
678 }
679 } // namespace webrtc
680