xref: /aosp_15_r20/external/webrtc/media/sctp/dcsctp_transport.cc (revision d9f758449e529ab9291ac668be2861e7a55c2422)
1 /*
2  *  Copyright 2021 The WebRTC project authors. All Rights Reserved.
3  *
4  *  Use of this source code is governed by a BSD-style license
5  *  that can be found in the LICENSE file in the root of the source
6  *  tree. An additional intellectual property rights grant can be found
7  *  in the file PATENTS.  All contributing project authors may
8  *  be found in the AUTHORS file in the root of the source tree.
9  */
10 
11 #include "media/sctp/dcsctp_transport.h"
12 
13 #include <atomic>
14 #include <cstdint>
15 #include <limits>
16 #include <utility>
17 #include <vector>
18 
19 #include "absl/strings/string_view.h"
20 #include "absl/types/optional.h"
21 #include "api/array_view.h"
22 #include "media/base/media_channel.h"
23 #include "net/dcsctp/public/dcsctp_socket_factory.h"
24 #include "net/dcsctp/public/packet_observer.h"
25 #include "net/dcsctp/public/text_pcap_packet_observer.h"
26 #include "net/dcsctp/public/types.h"
27 #include "p2p/base/packet_transport_internal.h"
28 #include "rtc_base/checks.h"
29 #include "rtc_base/logging.h"
30 #include "rtc_base/socket.h"
31 #include "rtc_base/strings/string_builder.h"
32 #include "rtc_base/thread.h"
33 #include "rtc_base/trace_event.h"
34 #include "system_wrappers/include/clock.h"
35 
36 namespace webrtc {
37 
38 namespace {
39 using ::dcsctp::SendPacketStatus;
40 
41 // When there is packet loss for a long time, the SCTP retry timers will use
42 // exponential backoff, which can grow to very long durations and when the
43 // connection recovers, it may take a long time to reach the new backoff
44 // duration. By limiting it to a reasonable limit, the time to recover reduces.
45 constexpr dcsctp::DurationMs kMaxTimerBackoffDuration =
46     dcsctp::DurationMs(3000);
47 
48 enum class WebrtcPPID : dcsctp::PPID::UnderlyingType {
49   // https://www.rfc-editor.org/rfc/rfc8832.html#section-8.1
50   kDCEP = 50,
51   // https://www.rfc-editor.org/rfc/rfc8831.html#section-8
52   kString = 51,
53   kBinaryPartial = 52,  // Deprecated
54   kBinary = 53,
55   kStringPartial = 54,  // Deprecated
56   kStringEmpty = 56,
57   kBinaryEmpty = 57,
58 };
59 
ToPPID(DataMessageType message_type,size_t size)60 WebrtcPPID ToPPID(DataMessageType message_type, size_t size) {
61   switch (message_type) {
62     case webrtc::DataMessageType::kControl:
63       return WebrtcPPID::kDCEP;
64     case webrtc::DataMessageType::kText:
65       return size > 0 ? WebrtcPPID::kString : WebrtcPPID::kStringEmpty;
66     case webrtc::DataMessageType::kBinary:
67       return size > 0 ? WebrtcPPID::kBinary : WebrtcPPID::kBinaryEmpty;
68   }
69 }
70 
ToDataMessageType(dcsctp::PPID ppid)71 absl::optional<DataMessageType> ToDataMessageType(dcsctp::PPID ppid) {
72   switch (static_cast<WebrtcPPID>(ppid.value())) {
73     case WebrtcPPID::kDCEP:
74       return webrtc::DataMessageType::kControl;
75     case WebrtcPPID::kString:
76     case WebrtcPPID::kStringPartial:
77     case WebrtcPPID::kStringEmpty:
78       return webrtc::DataMessageType::kText;
79     case WebrtcPPID::kBinary:
80     case WebrtcPPID::kBinaryPartial:
81     case WebrtcPPID::kBinaryEmpty:
82       return webrtc::DataMessageType::kBinary;
83   }
84   return absl::nullopt;
85 }
86 
ToErrorCauseCode(dcsctp::ErrorKind error)87 absl::optional<cricket::SctpErrorCauseCode> ToErrorCauseCode(
88     dcsctp::ErrorKind error) {
89   switch (error) {
90     case dcsctp::ErrorKind::kParseFailed:
91       return cricket::SctpErrorCauseCode::kUnrecognizedParameters;
92     case dcsctp::ErrorKind::kPeerReported:
93       return cricket::SctpErrorCauseCode::kUserInitiatedAbort;
94     case dcsctp::ErrorKind::kWrongSequence:
95     case dcsctp::ErrorKind::kProtocolViolation:
96       return cricket::SctpErrorCauseCode::kProtocolViolation;
97     case dcsctp::ErrorKind::kResourceExhaustion:
98       return cricket::SctpErrorCauseCode::kOutOfResource;
99     case dcsctp::ErrorKind::kTooManyRetries:
100     case dcsctp::ErrorKind::kUnsupportedOperation:
101     case dcsctp::ErrorKind::kNoError:
102     case dcsctp::ErrorKind::kNotConnected:
103       // No SCTP error cause code matches those
104       break;
105   }
106   return absl::nullopt;
107 }
108 
IsEmptyPPID(dcsctp::PPID ppid)109 bool IsEmptyPPID(dcsctp::PPID ppid) {
110   WebrtcPPID webrtc_ppid = static_cast<WebrtcPPID>(ppid.value());
111   return webrtc_ppid == WebrtcPPID::kStringEmpty ||
112          webrtc_ppid == WebrtcPPID::kBinaryEmpty;
113 }
114 }  // namespace
115 
DcSctpTransport(rtc::Thread * network_thread,rtc::PacketTransportInternal * transport,Clock * clock)116 DcSctpTransport::DcSctpTransport(rtc::Thread* network_thread,
117                                  rtc::PacketTransportInternal* transport,
118                                  Clock* clock)
119     : DcSctpTransport(network_thread,
120                       transport,
121                       clock,
122                       std::make_unique<dcsctp::DcSctpSocketFactory>()) {}
123 
DcSctpTransport(rtc::Thread * network_thread,rtc::PacketTransportInternal * transport,Clock * clock,std::unique_ptr<dcsctp::DcSctpSocketFactory> socket_factory)124 DcSctpTransport::DcSctpTransport(
125     rtc::Thread* network_thread,
126     rtc::PacketTransportInternal* transport,
127     Clock* clock,
128     std::unique_ptr<dcsctp::DcSctpSocketFactory> socket_factory)
129     : network_thread_(network_thread),
130       transport_(transport),
131       clock_(clock),
132       random_(clock_->TimeInMicroseconds()),
133       socket_factory_(std::move(socket_factory)),
134       task_queue_timeout_factory_(
135           *network_thread,
136           [this]() { return TimeMillis(); },
__anon22529c360302(dcsctp::TimeoutID timeout_id) 137           [this](dcsctp::TimeoutID timeout_id) {
138             socket_->HandleTimeout(timeout_id);
139           }) {
140   RTC_DCHECK_RUN_ON(network_thread_);
141   static std::atomic<int> instance_count = 0;
142   rtc::StringBuilder sb;
143   sb << debug_name_ << instance_count++;
144   debug_name_ = sb.Release();
145   ConnectTransportSignals();
146 }
147 
~DcSctpTransport()148 DcSctpTransport::~DcSctpTransport() {
149   if (socket_) {
150     socket_->Close();
151   }
152 }
153 
SetOnConnectedCallback(std::function<void ()> callback)154 void DcSctpTransport::SetOnConnectedCallback(std::function<void()> callback) {
155   RTC_DCHECK_RUN_ON(network_thread_);
156   on_connected_callback_ = std::move(callback);
157 }
158 
SetDataChannelSink(DataChannelSink * sink)159 void DcSctpTransport::SetDataChannelSink(DataChannelSink* sink) {
160   RTC_DCHECK_RUN_ON(network_thread_);
161   data_channel_sink_ = sink;
162   if (data_channel_sink_ && ready_to_send_data_) {
163     data_channel_sink_->OnReadyToSend();
164   }
165 }
166 
SetDtlsTransport(rtc::PacketTransportInternal * transport)167 void DcSctpTransport::SetDtlsTransport(
168     rtc::PacketTransportInternal* transport) {
169   RTC_DCHECK_RUN_ON(network_thread_);
170   DisconnectTransportSignals();
171   transport_ = transport;
172   ConnectTransportSignals();
173   MaybeConnectSocket();
174 }
175 
Start(int local_sctp_port,int remote_sctp_port,int max_message_size)176 bool DcSctpTransport::Start(int local_sctp_port,
177                             int remote_sctp_port,
178                             int max_message_size) {
179   RTC_DCHECK_RUN_ON(network_thread_);
180   RTC_DCHECK(max_message_size > 0);
181   RTC_DLOG(LS_INFO) << debug_name_ << "->Start(local=" << local_sctp_port
182                     << ", remote=" << remote_sctp_port
183                     << ", max_message_size=" << max_message_size << ")";
184 
185   if (!socket_) {
186     dcsctp::DcSctpOptions options;
187     options.local_port = local_sctp_port;
188     options.remote_port = remote_sctp_port;
189     options.max_message_size = max_message_size;
190     options.max_timer_backoff_duration = kMaxTimerBackoffDuration;
191     // Don't close the connection automatically on too many retransmissions.
192     options.max_retransmissions = absl::nullopt;
193     options.max_init_retransmits = absl::nullopt;
194 
195     std::unique_ptr<dcsctp::PacketObserver> packet_observer;
196     if (RTC_LOG_CHECK_LEVEL(LS_VERBOSE)) {
197       packet_observer =
198           std::make_unique<dcsctp::TextPcapPacketObserver>(debug_name_);
199     }
200 
201     socket_ = socket_factory_->Create(debug_name_, *this,
202                                       std::move(packet_observer), options);
203   } else {
204     if (local_sctp_port != socket_->options().local_port ||
205         remote_sctp_port != socket_->options().remote_port) {
206       RTC_LOG(LS_ERROR)
207           << debug_name_ << "->Start(local=" << local_sctp_port
208           << ", remote=" << remote_sctp_port
209           << "): Can't change ports on already started transport.";
210       return false;
211     }
212     socket_->SetMaxMessageSize(max_message_size);
213   }
214 
215   MaybeConnectSocket();
216 
217   return true;
218 }
219 
OpenStream(int sid)220 bool DcSctpTransport::OpenStream(int sid) {
221   RTC_DCHECK_RUN_ON(network_thread_);
222   RTC_DLOG(LS_INFO) << debug_name_ << "->OpenStream(" << sid << ").";
223 
224   StreamState stream_state;
225   stream_states_.insert_or_assign(dcsctp::StreamID(static_cast<uint16_t>(sid)),
226                                   stream_state);
227   return true;
228 }
229 
ResetStream(int sid)230 bool DcSctpTransport::ResetStream(int sid) {
231   RTC_DCHECK_RUN_ON(network_thread_);
232   RTC_DLOG(LS_INFO) << debug_name_ << "->ResetStream(" << sid << ").";
233   if (!socket_) {
234     RTC_LOG(LS_ERROR) << debug_name_ << "->ResetStream(sid=" << sid
235                       << "): Transport is not started.";
236     return false;
237   }
238 
239   dcsctp::StreamID streams[1] = {dcsctp::StreamID(static_cast<uint16_t>(sid))};
240 
241   auto it = stream_states_.find(streams[0]);
242   if (it == stream_states_.end()) {
243     RTC_LOG(LS_ERROR) << debug_name_ << "->ResetStream(sid=" << sid
244                       << "): Stream is not open.";
245     return false;
246   }
247 
248   StreamState& stream_state = it->second;
249   if (stream_state.closure_initiated || stream_state.incoming_reset_done ||
250       stream_state.outgoing_reset_done) {
251     // The closing procedure was already initiated by the remote, don't do
252     // anything.
253     return false;
254   }
255   stream_state.closure_initiated = true;
256   socket_->ResetStreams(streams);
257   return true;
258 }
259 
SendData(int sid,const SendDataParams & params,const rtc::CopyOnWriteBuffer & payload,cricket::SendDataResult * result)260 bool DcSctpTransport::SendData(int sid,
261                                const SendDataParams& params,
262                                const rtc::CopyOnWriteBuffer& payload,
263                                cricket::SendDataResult* result) {
264   RTC_DCHECK_RUN_ON(network_thread_);
265   RTC_DLOG(LS_VERBOSE) << debug_name_ << "->SendData(sid=" << sid
266                        << ", type=" << static_cast<int>(params.type)
267                        << ", length=" << payload.size() << ").";
268 
269   if (!socket_) {
270     RTC_LOG(LS_ERROR) << debug_name_
271                       << "->SendData(...): Transport is not started.";
272     *result = cricket::SDR_ERROR;
273     return false;
274   }
275 
276   // It is possible for a message to be sent from the signaling thread at the
277   // same time a data-channel is closing, but before the signaling thread is
278   // aware of it. So we need to keep track of currently active data channels and
279   // skip sending messages for the ones that are not open or closing.
280   // The sending errors are not impacting the data channel API contract as
281   // it is allowed to discard queued messages when the channel is closing.
282   auto stream_state =
283       stream_states_.find(dcsctp::StreamID(static_cast<uint16_t>(sid)));
284   if (stream_state == stream_states_.end()) {
285     RTC_LOG(LS_VERBOSE) << "Skipping message on non-open stream with sid: "
286                         << sid;
287     *result = cricket::SDR_ERROR;
288     return false;
289   }
290 
291   if (stream_state->second.closure_initiated ||
292       stream_state->second.incoming_reset_done ||
293       stream_state->second.outgoing_reset_done) {
294     RTC_LOG(LS_VERBOSE) << "Skipping message on closing stream with sid: "
295                         << sid;
296     *result = cricket::SDR_ERROR;
297     return false;
298   }
299 
300   auto max_message_size = socket_->options().max_message_size;
301   if (max_message_size > 0 && payload.size() > max_message_size) {
302     RTC_LOG(LS_WARNING) << debug_name_
303                         << "->SendData(...): "
304                            "Trying to send packet bigger "
305                            "than the max message size: "
306                         << payload.size() << " vs max of " << max_message_size;
307     *result = cricket::SDR_ERROR;
308     return false;
309   }
310 
311   std::vector<uint8_t> message_payload(payload.cdata(),
312                                        payload.cdata() + payload.size());
313   if (message_payload.empty()) {
314     // https://www.rfc-editor.org/rfc/rfc8831.html#section-6.6
315     // SCTP does not support the sending of empty user messages. Therefore, if
316     // an empty message has to be sent, the appropriate PPID (WebRTC String
317     // Empty or WebRTC Binary Empty) is used, and the SCTP user message of one
318     // zero byte is sent.
319     message_payload.push_back('\0');
320   }
321 
322   dcsctp::DcSctpMessage message(
323       dcsctp::StreamID(static_cast<uint16_t>(sid)),
324       dcsctp::PPID(static_cast<uint16_t>(ToPPID(params.type, payload.size()))),
325       std::move(message_payload));
326 
327   dcsctp::SendOptions send_options;
328   send_options.unordered = dcsctp::IsUnordered(!params.ordered);
329   if (params.max_rtx_ms.has_value()) {
330     RTC_DCHECK(*params.max_rtx_ms >= 0 &&
331                *params.max_rtx_ms <= std::numeric_limits<uint16_t>::max());
332     send_options.lifetime = dcsctp::DurationMs(*params.max_rtx_ms);
333   }
334   if (params.max_rtx_count.has_value()) {
335     RTC_DCHECK(*params.max_rtx_count >= 0 &&
336                *params.max_rtx_count <= std::numeric_limits<uint16_t>::max());
337     send_options.max_retransmissions = *params.max_rtx_count;
338   }
339 
340   auto error = socket_->Send(std::move(message), send_options);
341   switch (error) {
342     case dcsctp::SendStatus::kSuccess:
343       *result = cricket::SDR_SUCCESS;
344       break;
345     case dcsctp::SendStatus::kErrorResourceExhaustion:
346       *result = cricket::SDR_BLOCK;
347       ready_to_send_data_ = false;
348       break;
349     default:
350       RTC_LOG(LS_ERROR) << debug_name_
351                         << "->SendData(...): send() failed with error "
352                         << dcsctp::ToString(error) << ".";
353       *result = cricket::SDR_ERROR;
354       break;
355   }
356 
357   return *result == cricket::SDR_SUCCESS;
358 }
359 
ReadyToSendData()360 bool DcSctpTransport::ReadyToSendData() {
361   return ready_to_send_data_;
362 }
363 
max_message_size() const364 int DcSctpTransport::max_message_size() const {
365   if (!socket_) {
366     RTC_LOG(LS_ERROR) << debug_name_
367                       << "->max_message_size(...): Transport is not started.";
368     return 0;
369   }
370   return socket_->options().max_message_size;
371 }
372 
max_outbound_streams() const373 absl::optional<int> DcSctpTransport::max_outbound_streams() const {
374   if (!socket_)
375     return absl::nullopt;
376   return socket_->options().announced_maximum_outgoing_streams;
377 }
378 
max_inbound_streams() const379 absl::optional<int> DcSctpTransport::max_inbound_streams() const {
380   if (!socket_)
381     return absl::nullopt;
382   return socket_->options().announced_maximum_incoming_streams;
383 }
384 
set_debug_name_for_testing(const char * debug_name)385 void DcSctpTransport::set_debug_name_for_testing(const char* debug_name) {
386   debug_name_ = debug_name;
387 }
388 
SendPacketWithStatus(rtc::ArrayView<const uint8_t> data)389 SendPacketStatus DcSctpTransport::SendPacketWithStatus(
390     rtc::ArrayView<const uint8_t> data) {
391   RTC_DCHECK_RUN_ON(network_thread_);
392   RTC_DCHECK(socket_);
393 
394   if (data.size() > (socket_->options().mtu)) {
395     RTC_LOG(LS_ERROR) << debug_name_
396                       << "->SendPacket(...): "
397                          "SCTP seems to have made a packet that is bigger "
398                          "than its official MTU: "
399                       << data.size() << " vs max of " << socket_->options().mtu;
400     return SendPacketStatus::kError;
401   }
402   TRACE_EVENT0("webrtc", "DcSctpTransport::SendPacket");
403 
404   if (!transport_ || !transport_->writable())
405     return SendPacketStatus::kError;
406 
407   RTC_DLOG(LS_VERBOSE) << debug_name_ << "->SendPacket(length=" << data.size()
408                        << ")";
409 
410   auto result =
411       transport_->SendPacket(reinterpret_cast<const char*>(data.data()),
412                              data.size(), rtc::PacketOptions(), 0);
413 
414   if (result < 0) {
415     RTC_LOG(LS_WARNING) << debug_name_ << "->SendPacket(length=" << data.size()
416                         << ") failed with error: " << transport_->GetError()
417                         << ".";
418 
419     if (rtc::IsBlockingError(transport_->GetError())) {
420       return SendPacketStatus::kTemporaryFailure;
421     }
422     return SendPacketStatus::kError;
423   }
424   return SendPacketStatus::kSuccess;
425 }
426 
CreateTimeout(webrtc::TaskQueueBase::DelayPrecision precision)427 std::unique_ptr<dcsctp::Timeout> DcSctpTransport::CreateTimeout(
428     webrtc::TaskQueueBase::DelayPrecision precision) {
429   return task_queue_timeout_factory_.CreateTimeout(precision);
430 }
431 
TimeMillis()432 dcsctp::TimeMs DcSctpTransport::TimeMillis() {
433   return dcsctp::TimeMs(clock_->TimeInMilliseconds());
434 }
435 
GetRandomInt(uint32_t low,uint32_t high)436 uint32_t DcSctpTransport::GetRandomInt(uint32_t low, uint32_t high) {
437   return random_.Rand(low, high);
438 }
439 
OnTotalBufferedAmountLow()440 void DcSctpTransport::OnTotalBufferedAmountLow() {
441   RTC_DCHECK_RUN_ON(network_thread_);
442   if (!ready_to_send_data_) {
443     ready_to_send_data_ = true;
444     if (data_channel_sink_) {
445       data_channel_sink_->OnReadyToSend();
446     }
447   }
448 }
449 
OnMessageReceived(dcsctp::DcSctpMessage message)450 void DcSctpTransport::OnMessageReceived(dcsctp::DcSctpMessage message) {
451   RTC_DCHECK_RUN_ON(network_thread_);
452   RTC_DLOG(LS_VERBOSE) << debug_name_ << "->OnMessageReceived(sid="
453                        << message.stream_id().value()
454                        << ", ppid=" << message.ppid().value()
455                        << ", length=" << message.payload().size() << ").";
456   cricket::ReceiveDataParams receive_data_params;
457   receive_data_params.sid = message.stream_id().value();
458   auto type = ToDataMessageType(message.ppid());
459   if (!type.has_value()) {
460     RTC_LOG(LS_VERBOSE) << debug_name_
461                         << "->OnMessageReceived(): Received an unknown PPID "
462                         << message.ppid().value()
463                         << " on an SCTP packet. Dropping.";
464   }
465   receive_data_params.type = *type;
466   // No seq_num available from dcSCTP
467   receive_data_params.seq_num = 0;
468   receive_buffer_.Clear();
469   if (!IsEmptyPPID(message.ppid()))
470     receive_buffer_.AppendData(message.payload().data(),
471                                message.payload().size());
472 
473   if (data_channel_sink_) {
474     data_channel_sink_->OnDataReceived(
475         receive_data_params.sid, receive_data_params.type, receive_buffer_);
476   }
477 }
478 
OnError(dcsctp::ErrorKind error,absl::string_view message)479 void DcSctpTransport::OnError(dcsctp::ErrorKind error,
480                               absl::string_view message) {
481   if (error == dcsctp::ErrorKind::kResourceExhaustion) {
482     // Indicates that a message failed to be enqueued, because the send buffer
483     // is full, which is a very common (and wanted) state for high throughput
484     // sending/benchmarks.
485     RTC_LOG(LS_VERBOSE) << debug_name_
486                         << "->OnError(error=" << dcsctp::ToString(error)
487                         << ", message=" << message << ").";
488   } else {
489     RTC_LOG(LS_ERROR) << debug_name_
490                       << "->OnError(error=" << dcsctp::ToString(error)
491                       << ", message=" << message << ").";
492   }
493 }
494 
OnAborted(dcsctp::ErrorKind error,absl::string_view message)495 void DcSctpTransport::OnAborted(dcsctp::ErrorKind error,
496                                 absl::string_view message) {
497   RTC_DCHECK_RUN_ON(network_thread_);
498   RTC_LOG(LS_ERROR) << debug_name_
499                     << "->OnAborted(error=" << dcsctp::ToString(error)
500                     << ", message=" << message << ").";
501   ready_to_send_data_ = false;
502   RTCError rtc_error(RTCErrorType::OPERATION_ERROR_WITH_DATA,
503                      std::string(message));
504   rtc_error.set_error_detail(RTCErrorDetailType::SCTP_FAILURE);
505   auto code = ToErrorCauseCode(error);
506   if (code.has_value()) {
507     rtc_error.set_sctp_cause_code(static_cast<uint16_t>(*code));
508   }
509   if (data_channel_sink_) {
510     data_channel_sink_->OnTransportClosed(rtc_error);
511   }
512 }
513 
OnConnected()514 void DcSctpTransport::OnConnected() {
515   RTC_DCHECK_RUN_ON(network_thread_);
516   RTC_DLOG(LS_INFO) << debug_name_ << "->OnConnected().";
517   ready_to_send_data_ = true;
518   if (data_channel_sink_) {
519     data_channel_sink_->OnReadyToSend();
520   }
521   if (on_connected_callback_) {
522     on_connected_callback_();
523   }
524 }
525 
OnClosed()526 void DcSctpTransport::OnClosed() {
527   RTC_DLOG(LS_INFO) << debug_name_ << "->OnClosed().";
528   ready_to_send_data_ = false;
529 }
530 
OnConnectionRestarted()531 void DcSctpTransport::OnConnectionRestarted() {
532   RTC_DLOG(LS_INFO) << debug_name_ << "->OnConnectionRestarted().";
533 }
534 
OnStreamsResetFailed(rtc::ArrayView<const dcsctp::StreamID> outgoing_streams,absl::string_view reason)535 void DcSctpTransport::OnStreamsResetFailed(
536     rtc::ArrayView<const dcsctp::StreamID> outgoing_streams,
537     absl::string_view reason) {
538   // TODO(orphis): Need a test to check for correct behavior
539   for (auto& stream_id : outgoing_streams) {
540     RTC_LOG(LS_WARNING)
541         << debug_name_
542         << "->OnStreamsResetFailed(...): Outgoing stream reset failed"
543         << ", sid=" << stream_id.value() << ", reason: " << reason << ".";
544   }
545 }
546 
OnStreamsResetPerformed(rtc::ArrayView<const dcsctp::StreamID> outgoing_streams)547 void DcSctpTransport::OnStreamsResetPerformed(
548     rtc::ArrayView<const dcsctp::StreamID> outgoing_streams) {
549   RTC_DCHECK_RUN_ON(network_thread_);
550   for (auto& stream_id : outgoing_streams) {
551     RTC_LOG(LS_INFO) << debug_name_
552                      << "->OnStreamsResetPerformed(...): Outgoing stream reset"
553                      << ", sid=" << stream_id.value();
554 
555     auto it = stream_states_.find(stream_id);
556     if (it == stream_states_.end()) {
557       // Ignoring an outgoing stream reset for a closed stream
558       return;
559     }
560 
561     StreamState& stream_state = it->second;
562     stream_state.outgoing_reset_done = true;
563 
564     if (stream_state.incoming_reset_done) {
565       //  When the close was not initiated locally, we can signal the end of the
566       //  data channel close procedure when the remote ACKs the reset.
567       if (data_channel_sink_) {
568         data_channel_sink_->OnChannelClosed(stream_id.value());
569       }
570       stream_states_.erase(stream_id);
571     }
572   }
573 }
574 
OnIncomingStreamsReset(rtc::ArrayView<const dcsctp::StreamID> incoming_streams)575 void DcSctpTransport::OnIncomingStreamsReset(
576     rtc::ArrayView<const dcsctp::StreamID> incoming_streams) {
577   RTC_DCHECK_RUN_ON(network_thread_);
578   for (auto& stream_id : incoming_streams) {
579     RTC_LOG(LS_INFO) << debug_name_
580                      << "->OnIncomingStreamsReset(...): Incoming stream reset"
581                      << ", sid=" << stream_id.value();
582 
583     auto it = stream_states_.find(stream_id);
584     if (it == stream_states_.end())
585       return;
586 
587     StreamState& stream_state = it->second;
588     stream_state.incoming_reset_done = true;
589 
590     if (!stream_state.closure_initiated) {
591       // When receiving an incoming stream reset event for a non local close
592       // procedure, the transport needs to reset the stream in the other
593       // direction too.
594       dcsctp::StreamID streams[1] = {stream_id};
595       socket_->ResetStreams(streams);
596       if (data_channel_sink_) {
597         data_channel_sink_->OnChannelClosing(stream_id.value());
598       }
599     }
600 
601     if (stream_state.outgoing_reset_done) {
602       // The close procedure that was initiated locally is complete when we
603       // receive and incoming reset event.
604       if (data_channel_sink_) {
605         data_channel_sink_->OnChannelClosed(stream_id.value());
606       }
607       stream_states_.erase(stream_id);
608     }
609   }
610 }
611 
ConnectTransportSignals()612 void DcSctpTransport::ConnectTransportSignals() {
613   RTC_DCHECK_RUN_ON(network_thread_);
614   if (!transport_) {
615     return;
616   }
617   transport_->SignalWritableState.connect(
618       this, &DcSctpTransport::OnTransportWritableState);
619   transport_->SignalReadPacket.connect(this,
620                                        &DcSctpTransport::OnTransportReadPacket);
621   transport_->SignalClosed.connect(this, &DcSctpTransport::OnTransportClosed);
622 }
623 
DisconnectTransportSignals()624 void DcSctpTransport::DisconnectTransportSignals() {
625   RTC_DCHECK_RUN_ON(network_thread_);
626   if (!transport_) {
627     return;
628   }
629   transport_->SignalWritableState.disconnect(this);
630   transport_->SignalReadPacket.disconnect(this);
631   transport_->SignalClosed.disconnect(this);
632 }
633 
OnTransportWritableState(rtc::PacketTransportInternal * transport)634 void DcSctpTransport::OnTransportWritableState(
635     rtc::PacketTransportInternal* transport) {
636   RTC_DCHECK_RUN_ON(network_thread_);
637   RTC_DCHECK_EQ(transport_, transport);
638   RTC_DLOG(LS_VERBOSE) << debug_name_
639                        << "->OnTransportWritableState(), writable="
640                        << transport->writable();
641   MaybeConnectSocket();
642 }
643 
OnTransportReadPacket(rtc::PacketTransportInternal * transport,const char * data,size_t length,const int64_t &,int flags)644 void DcSctpTransport::OnTransportReadPacket(
645     rtc::PacketTransportInternal* transport,
646     const char* data,
647     size_t length,
648     const int64_t& /* packet_time_us */,
649     int flags) {
650   RTC_DCHECK_RUN_ON(network_thread_);
651   if (flags) {
652     // We are only interested in SCTP packets.
653     return;
654   }
655 
656   RTC_DLOG(LS_VERBOSE) << debug_name_
657                        << "->OnTransportReadPacket(), length=" << length;
658   if (socket_) {
659     socket_->ReceivePacket(rtc::ArrayView<const uint8_t>(
660         reinterpret_cast<const uint8_t*>(data), length));
661   }
662 }
663 
OnTransportClosed(rtc::PacketTransportInternal * transport)664 void DcSctpTransport::OnTransportClosed(
665     rtc::PacketTransportInternal* transport) {
666   RTC_DCHECK_RUN_ON(network_thread_);
667   RTC_DLOG(LS_VERBOSE) << debug_name_ << "->OnTransportClosed().";
668   if (data_channel_sink_) {
669     data_channel_sink_->OnTransportClosed({});
670   }
671 }
672 
MaybeConnectSocket()673 void DcSctpTransport::MaybeConnectSocket() {
674   if (transport_ && transport_->writable() && socket_ &&
675       socket_->state() == dcsctp::SocketState::kClosed) {
676     socket_->Connect();
677   }
678 }
679 }  // namespace webrtc
680