mirror of
https://github.com/mollyim/webrtc.git
synced 2025-05-13 05:40:42 +01:00
sctp: Add DcsctpTransport based on dcSCTP
Bug: webrtc:12614 Change-Id: Ie710621610fff9f8bb6c7d800419675892d6a70c Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/215680 Commit-Queue: Florent Castelli <orphis@webrtc.org> Reviewed-by: Harald Alvestrand <hta@webrtc.org> Reviewed-by: Victor Boivie <boivie@webrtc.org> Cr-Commit-Position: refs/heads/master@{#33935}
This commit is contained in:
parent
7b1734a96b
commit
a6983c6ea2
9 changed files with 682 additions and 16 deletions
|
@ -387,6 +387,36 @@ rtc_source_set("rtc_data_sctp_transport_internal") {
|
|||
]
|
||||
}
|
||||
|
||||
if (rtc_build_dcsctp) {
|
||||
rtc_library("rtc_data_dcsctp_transport") {
|
||||
sources = [
|
||||
"sctp/dcsctp_transport.cc",
|
||||
"sctp/dcsctp_transport.h",
|
||||
]
|
||||
deps = [
|
||||
":rtc_data_sctp_transport_internal",
|
||||
"../api:array_view",
|
||||
"../media:rtc_media_base",
|
||||
"../net/dcsctp/public:socket",
|
||||
"../net/dcsctp/public:types",
|
||||
"../net/dcsctp/socket:dcsctp_socket",
|
||||
"../net/dcsctp/timer:task_queue_timeout",
|
||||
"../p2p:rtc_p2p",
|
||||
"../rtc_base:checks",
|
||||
"../rtc_base:rtc_base_approved",
|
||||
"../rtc_base:threading",
|
||||
"../rtc_base/task_utils:pending_task_safety_flag",
|
||||
"../rtc_base/task_utils:to_queued_task",
|
||||
"../rtc_base/third_party/sigslot:sigslot",
|
||||
"../system_wrappers",
|
||||
]
|
||||
absl_deps += [
|
||||
"//third_party/abseil-cpp/absl/strings:strings",
|
||||
"//third_party/abseil-cpp/absl/types:optional",
|
||||
]
|
||||
}
|
||||
}
|
||||
|
||||
if (rtc_build_usrsctp) {
|
||||
rtc_library("rtc_data_usrsctp_transport") {
|
||||
defines = [
|
||||
|
@ -426,11 +456,22 @@ rtc_library("rtc_data_sctp_transport_factory") {
|
|||
":rtc_data_sctp_transport_internal",
|
||||
"../api/transport:sctp_transport_factory_interface",
|
||||
"../rtc_base:threading",
|
||||
"../rtc_base/experiments:field_trial_parser",
|
||||
"../rtc_base/system:unused",
|
||||
]
|
||||
|
||||
if (rtc_enable_sctp) {
|
||||
assert(rtc_build_usrsctp, "An SCTP backend is required to enable SCTP")
|
||||
assert(rtc_build_dcsctp || rtc_build_usrsctp,
|
||||
"An SCTP backend is required to enable SCTP")
|
||||
}
|
||||
|
||||
if (rtc_build_dcsctp) {
|
||||
defines += [ "WEBRTC_HAVE_DCSCTP" ]
|
||||
deps += [
|
||||
":rtc_data_dcsctp_transport",
|
||||
"../system_wrappers",
|
||||
"../system_wrappers:field_trial",
|
||||
]
|
||||
}
|
||||
|
||||
if (rtc_build_usrsctp) {
|
||||
|
|
|
@ -11,6 +11,7 @@ include_rules = [
|
|||
"+modules/video_capture",
|
||||
"+modules/video_coding",
|
||||
"+modules/video_coding/utility",
|
||||
"+net/dcsctp",
|
||||
"+p2p",
|
||||
"+sound",
|
||||
"+system_wrappers",
|
||||
|
|
483
media/sctp/dcsctp_transport.cc
Normal file
483
media/sctp/dcsctp_transport.cc
Normal file
|
@ -0,0 +1,483 @@
|
|||
/*
|
||||
* Copyright 2021 The WebRTC project authors. All Rights Reserved.
|
||||
*
|
||||
* Use of this source code is governed by a BSD-style license
|
||||
* that can be found in the LICENSE file in the root of the source
|
||||
* tree. An additional intellectual property rights grant can be found
|
||||
* in the file PATENTS. All contributing project authors may
|
||||
* be found in the AUTHORS file in the root of the source tree.
|
||||
*/
|
||||
|
||||
#include "media/sctp/dcsctp_transport.h"
|
||||
|
||||
#include <cstdint>
|
||||
#include <utility>
|
||||
#include <vector>
|
||||
|
||||
#include "absl/strings/string_view.h"
|
||||
#include "absl/types/optional.h"
|
||||
#include "api/array_view.h"
|
||||
#include "media/base/media_channel.h"
|
||||
#include "net/dcsctp/public/types.h"
|
||||
#include "net/dcsctp/socket/dcsctp_socket.h"
|
||||
#include "p2p/base/packet_transport_internal.h"
|
||||
#include "rtc_base/checks.h"
|
||||
#include "rtc_base/thread.h"
|
||||
#include "rtc_base/trace_event.h"
|
||||
#include "system_wrappers/include/clock.h"
|
||||
|
||||
namespace webrtc {
|
||||
|
||||
namespace {
|
||||
|
||||
enum class WebrtcPPID : dcsctp::PPID::UnderlyingType {
|
||||
kNone = 0, // No protocol is specified.
|
||||
// https://www.rfc-editor.org/rfc/rfc8832.html#section-8.1
|
||||
kDCEP = 50,
|
||||
// https://www.rfc-editor.org/rfc/rfc8831.html#section-8
|
||||
kString = 51,
|
||||
kBinaryPartial = 52, // Deprecated
|
||||
kBinary = 53,
|
||||
kStringPartial = 54, // Deprecated
|
||||
kStringEmpty = 56,
|
||||
kBinaryEmpty = 57,
|
||||
};
|
||||
|
||||
WebrtcPPID ToPPID(cricket::DataMessageType message_type, size_t size) {
|
||||
switch (message_type) {
|
||||
case cricket::DMT_CONTROL:
|
||||
return WebrtcPPID::kDCEP;
|
||||
case cricket::DMT_TEXT:
|
||||
return size > 0 ? WebrtcPPID::kString : WebrtcPPID::kStringEmpty;
|
||||
case cricket::DMT_BINARY:
|
||||
return size > 0 ? WebrtcPPID::kBinary : WebrtcPPID::kBinaryEmpty;
|
||||
default:
|
||||
RTC_NOTREACHED();
|
||||
}
|
||||
return WebrtcPPID::kNone;
|
||||
}
|
||||
|
||||
absl::optional<cricket::DataMessageType> ToDataMessageType(dcsctp::PPID ppid) {
|
||||
switch (static_cast<WebrtcPPID>(ppid.value())) {
|
||||
case WebrtcPPID::kNone:
|
||||
return cricket::DMT_NONE;
|
||||
case WebrtcPPID::kDCEP:
|
||||
return cricket::DMT_CONTROL;
|
||||
case WebrtcPPID::kString:
|
||||
case WebrtcPPID::kStringPartial:
|
||||
case WebrtcPPID::kStringEmpty:
|
||||
return cricket::DMT_TEXT;
|
||||
case WebrtcPPID::kBinary:
|
||||
case WebrtcPPID::kBinaryPartial:
|
||||
case WebrtcPPID::kBinaryEmpty:
|
||||
return cricket::DMT_BINARY;
|
||||
}
|
||||
return absl::nullopt;
|
||||
}
|
||||
|
||||
bool IsEmptyPPID(dcsctp::PPID ppid) {
|
||||
WebrtcPPID webrtc_ppid = static_cast<WebrtcPPID>(ppid.value());
|
||||
return webrtc_ppid == WebrtcPPID::kStringEmpty ||
|
||||
webrtc_ppid == WebrtcPPID::kBinaryEmpty;
|
||||
}
|
||||
|
||||
} // namespace
|
||||
|
||||
DcSctpTransport::DcSctpTransport(rtc::Thread* network_thread,
|
||||
rtc::PacketTransportInternal* transport,
|
||||
Clock* clock)
|
||||
: network_thread_(network_thread),
|
||||
transport_(transport),
|
||||
clock_(clock),
|
||||
random_(clock_->TimeInMicroseconds()),
|
||||
task_queue_timeout_factory_(
|
||||
*network_thread,
|
||||
[this]() { return TimeMillis(); },
|
||||
[this](dcsctp::TimeoutID timeout_id) {
|
||||
socket_->HandleTimeout(timeout_id);
|
||||
}) {
|
||||
RTC_DCHECK_RUN_ON(network_thread_);
|
||||
static int instance_count = 0;
|
||||
rtc::StringBuilder sb;
|
||||
sb << debug_name_ << instance_count++;
|
||||
debug_name_ = sb.Release();
|
||||
ConnectTransportSignals();
|
||||
}
|
||||
|
||||
DcSctpTransport::~DcSctpTransport() {
|
||||
if (socket_) {
|
||||
socket_->Close();
|
||||
}
|
||||
}
|
||||
|
||||
void DcSctpTransport::SetDtlsTransport(
|
||||
rtc::PacketTransportInternal* transport) {
|
||||
RTC_DCHECK_RUN_ON(network_thread_);
|
||||
DisconnectTransportSignals();
|
||||
transport_ = transport;
|
||||
ConnectTransportSignals();
|
||||
MaybeConnectSocket();
|
||||
}
|
||||
|
||||
bool DcSctpTransport::Start(int local_sctp_port,
|
||||
int remote_sctp_port,
|
||||
int max_message_size) {
|
||||
RTC_DCHECK_RUN_ON(network_thread_);
|
||||
RTC_DCHECK(max_message_size > 0);
|
||||
|
||||
RTC_LOG(LS_INFO) << debug_name_ << "->Start(local=" << local_sctp_port
|
||||
<< ", remote=" << remote_sctp_port
|
||||
<< ", max_message_size=" << max_message_size << ")";
|
||||
|
||||
if (!socket_) {
|
||||
dcsctp::DcSctpOptions options;
|
||||
options.local_port = local_sctp_port;
|
||||
options.remote_port = remote_sctp_port;
|
||||
options.max_message_size = max_message_size;
|
||||
|
||||
socket_ = std::make_unique<dcsctp::DcSctpSocket>(debug_name_, *this,
|
||||
nullptr, options);
|
||||
} else {
|
||||
if (local_sctp_port != socket_->options().local_port ||
|
||||
remote_sctp_port != socket_->options().remote_port) {
|
||||
RTC_LOG(LS_ERROR)
|
||||
<< debug_name_ << "->Start(local=" << local_sctp_port
|
||||
<< ", remote=" << remote_sctp_port
|
||||
<< "): Can't change ports on already started transport.";
|
||||
return false;
|
||||
}
|
||||
socket_->SetMaxMessageSize(max_message_size);
|
||||
}
|
||||
|
||||
MaybeConnectSocket();
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
bool DcSctpTransport::OpenStream(int sid) {
|
||||
RTC_LOG(LS_INFO) << debug_name_ << "->OpenStream(" << sid << ").";
|
||||
if (!socket_) {
|
||||
RTC_LOG(LS_ERROR) << debug_name_ << "->OpenStream(sid=" << sid
|
||||
<< "): Transport is not started.";
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
bool DcSctpTransport::ResetStream(int sid) {
|
||||
RTC_LOG(LS_INFO) << debug_name_ << "->ResetStream(" << sid << ").";
|
||||
if (!socket_) {
|
||||
RTC_LOG(LS_ERROR) << debug_name_ << "->OpenStream(sid=" << sid
|
||||
<< "): Transport is not started.";
|
||||
return false;
|
||||
}
|
||||
dcsctp::StreamID streams[1] = {dcsctp::StreamID(static_cast<uint16_t>(sid))};
|
||||
socket_->ResetStreams(streams);
|
||||
return true;
|
||||
}
|
||||
|
||||
bool DcSctpTransport::SendData(const cricket::SendDataParams& params,
|
||||
const rtc::CopyOnWriteBuffer& payload,
|
||||
cricket::SendDataResult* result) {
|
||||
RTC_DCHECK_RUN_ON(network_thread_);
|
||||
|
||||
RTC_LOG(LS_VERBOSE) << debug_name_ << "->SendData(sid=" << params.sid
|
||||
<< ", type=" << params.type
|
||||
<< ", length=" << payload.size() << ").";
|
||||
|
||||
if (!socket_) {
|
||||
RTC_LOG(LS_ERROR) << debug_name_
|
||||
<< "->SendData(...): Transport is not started.";
|
||||
*result = cricket::SDR_ERROR;
|
||||
return false;
|
||||
}
|
||||
|
||||
auto max_message_size = socket_->options().max_message_size;
|
||||
if (max_message_size > 0 && payload.size() > max_message_size) {
|
||||
RTC_LOG(LS_ERROR) << debug_name_
|
||||
<< "->SendData(...): "
|
||||
"Trying to send packet bigger "
|
||||
"than the max message size: "
|
||||
<< payload.size() << " vs max of " << max_message_size;
|
||||
*result = cricket::SDR_ERROR;
|
||||
return false;
|
||||
}
|
||||
|
||||
std::vector<uint8_t> message_payload(payload.cdata(),
|
||||
payload.cdata() + payload.size());
|
||||
if (message_payload.empty()) {
|
||||
// https://www.rfc-editor.org/rfc/rfc8831.html#section-6.6
|
||||
// SCTP does not support the sending of empty user messages. Therefore, if
|
||||
// an empty message has to be sent, the appropriate PPID (WebRTC String
|
||||
// Empty or WebRTC Binary Empty) is used, and the SCTP user message of one
|
||||
// zero byte is sent.
|
||||
message_payload.push_back('\0');
|
||||
}
|
||||
|
||||
dcsctp::DcSctpMessage message(
|
||||
dcsctp::StreamID(static_cast<uint16_t>(params.sid)),
|
||||
dcsctp::PPID(static_cast<uint16_t>(ToPPID(params.type, payload.size()))),
|
||||
std::move(message_payload));
|
||||
|
||||
dcsctp::SendOptions send_options;
|
||||
send_options.unordered = dcsctp::IsUnordered(!params.ordered);
|
||||
if (params.max_rtx_ms > 0)
|
||||
send_options.lifetime = dcsctp::DurationMs(params.max_rtx_ms);
|
||||
if (params.max_rtx_count > 0)
|
||||
send_options.max_retransmissions =
|
||||
static_cast<size_t>(params.max_rtx_count);
|
||||
|
||||
auto error = socket_->Send(std::move(message), send_options);
|
||||
switch (error) {
|
||||
case dcsctp::SendStatus::kSuccess:
|
||||
*result = cricket::SDR_SUCCESS;
|
||||
break;
|
||||
case dcsctp::SendStatus::kErrorResourceExhaustion:
|
||||
*result = cricket::SDR_BLOCK;
|
||||
ready_to_send_data_ = false;
|
||||
break;
|
||||
default:
|
||||
RTC_LOG(LS_ERROR) << debug_name_
|
||||
<< "->SendData(...): send() failed with error "
|
||||
<< dcsctp::ToString(error) << ".";
|
||||
*result = cricket::SDR_ERROR;
|
||||
}
|
||||
|
||||
return *result == cricket::SDR_SUCCESS;
|
||||
}
|
||||
|
||||
bool DcSctpTransport::ReadyToSendData() {
|
||||
return ready_to_send_data_;
|
||||
}
|
||||
|
||||
int DcSctpTransport::max_message_size() const {
|
||||
if (!socket_) {
|
||||
RTC_LOG(LS_ERROR) << debug_name_
|
||||
<< "->max_message_size(...): Transport is not started.";
|
||||
return 0;
|
||||
}
|
||||
return socket_->options().max_message_size;
|
||||
}
|
||||
|
||||
absl::optional<int> DcSctpTransport::max_outbound_streams() const {
|
||||
if (!socket_)
|
||||
return absl::nullopt;
|
||||
return socket_->options().announced_maximum_outgoing_streams;
|
||||
}
|
||||
|
||||
absl::optional<int> DcSctpTransport::max_inbound_streams() const {
|
||||
if (!socket_)
|
||||
return absl::nullopt;
|
||||
return socket_->options().announced_maximum_incoming_streams;
|
||||
}
|
||||
|
||||
void DcSctpTransport::set_debug_name_for_testing(const char* debug_name) {
|
||||
debug_name_ = debug_name;
|
||||
}
|
||||
|
||||
void DcSctpTransport::SendPacket(rtc::ArrayView<const uint8_t> data) {
|
||||
RTC_DCHECK_RUN_ON(network_thread_);
|
||||
RTC_DCHECK(socket_);
|
||||
|
||||
if (data.size() > (socket_->options().mtu)) {
|
||||
RTC_LOG(LS_ERROR) << debug_name_
|
||||
<< "->SendPacket(...): "
|
||||
"SCTP seems to have made a packet that is bigger "
|
||||
"than its official MTU: "
|
||||
<< data.size() << " vs max of " << socket_->options().mtu;
|
||||
return;
|
||||
}
|
||||
TRACE_EVENT0("webrtc", "DcSctpTransport::SendPacket");
|
||||
|
||||
if (!transport_ || !transport_->writable())
|
||||
return;
|
||||
|
||||
RTC_LOG(LS_VERBOSE) << debug_name_ << "->SendPacket(length=" << data.size()
|
||||
<< ")";
|
||||
|
||||
auto result =
|
||||
transport_->SendPacket(reinterpret_cast<const char*>(data.data()),
|
||||
data.size(), rtc::PacketOptions(), 0);
|
||||
|
||||
if (result < 0) {
|
||||
RTC_LOG(LS_ERROR) << debug_name_ << "->SendPacket(length=" << data.size()
|
||||
<< ") failed with error: " << transport_->GetError()
|
||||
<< ".";
|
||||
}
|
||||
}
|
||||
|
||||
std::unique_ptr<dcsctp::Timeout> DcSctpTransport::CreateTimeout() {
|
||||
return task_queue_timeout_factory_.CreateTimeout();
|
||||
}
|
||||
|
||||
dcsctp::TimeMs DcSctpTransport::TimeMillis() {
|
||||
return dcsctp::TimeMs(clock_->TimeInMilliseconds());
|
||||
}
|
||||
|
||||
uint32_t DcSctpTransport::GetRandomInt(uint32_t low, uint32_t high) {
|
||||
return random_.Rand(low, high);
|
||||
}
|
||||
|
||||
void DcSctpTransport::NotifyOutgoingMessageBufferEmpty() {
|
||||
RTC_LOG(LS_VERBOSE) << debug_name_ << "->NotifyOutgoingMessageBufferEmpty()";
|
||||
if (!ready_to_send_data_) {
|
||||
ready_to_send_data_ = true;
|
||||
SignalReadyToSendData();
|
||||
}
|
||||
}
|
||||
|
||||
void DcSctpTransport::OnMessageReceived(dcsctp::DcSctpMessage message) {
|
||||
RTC_DCHECK_RUN_ON(network_thread_);
|
||||
RTC_LOG(LS_INFO) << debug_name_
|
||||
<< "->OnMessageReceived(sid=" << message.stream_id().value()
|
||||
<< ", ppid=" << message.ppid().value()
|
||||
<< ", length=" << message.payload().size() << ").";
|
||||
cricket::ReceiveDataParams receive_data_params;
|
||||
receive_data_params.sid = message.stream_id().value();
|
||||
auto type = ToDataMessageType(message.ppid());
|
||||
if (!type.has_value()) {
|
||||
RTC_LOG(LS_VERBOSE) << debug_name_
|
||||
<< "->OnMessageReceived(): Received an unknown PPID "
|
||||
<< message.ppid().value()
|
||||
<< " on an SCTP packet. Dropping.";
|
||||
}
|
||||
receive_data_params.type = *type;
|
||||
// No seq_num available from dcSCTP
|
||||
receive_data_params.seq_num = 0;
|
||||
receive_buffer_.Clear();
|
||||
if (!IsEmptyPPID(message.ppid()))
|
||||
receive_buffer_.AppendData(message.payload().data(),
|
||||
message.payload().size());
|
||||
|
||||
SignalDataReceived(receive_data_params, receive_buffer_);
|
||||
}
|
||||
|
||||
void DcSctpTransport::OnError(dcsctp::ErrorKind error,
|
||||
absl::string_view message) {
|
||||
RTC_LOG(LS_ERROR) << debug_name_
|
||||
<< "->OnError(error=" << dcsctp::ToString(error)
|
||||
<< ", message=" << message << ").";
|
||||
}
|
||||
|
||||
void DcSctpTransport::OnAborted(dcsctp::ErrorKind error,
|
||||
absl::string_view message) {
|
||||
RTC_LOG(LS_ERROR) << debug_name_
|
||||
<< "->OnAborted(error=" << dcsctp::ToString(error)
|
||||
<< ", message=" << message << ").";
|
||||
ready_to_send_data_ = false;
|
||||
}
|
||||
|
||||
void DcSctpTransport::OnConnected() {
|
||||
RTC_LOG(LS_INFO) << debug_name_ << "->OnConnected().";
|
||||
ready_to_send_data_ = true;
|
||||
SignalReadyToSendData();
|
||||
SignalAssociationChangeCommunicationUp();
|
||||
}
|
||||
|
||||
void DcSctpTransport::OnClosed() {
|
||||
RTC_LOG(LS_INFO) << debug_name_ << "->OnClosed().";
|
||||
ready_to_send_data_ = false;
|
||||
}
|
||||
|
||||
void DcSctpTransport::OnConnectionRestarted() {
|
||||
RTC_LOG(LS_INFO) << debug_name_ << "->OnConnectionRestarted().";
|
||||
}
|
||||
|
||||
void DcSctpTransport::OnStreamsResetFailed(
|
||||
rtc::ArrayView<const dcsctp::StreamID> outgoing_streams,
|
||||
absl::string_view reason) {
|
||||
// TODO(orphis): Need a test to check for correct behavior
|
||||
for (auto& stream_id : outgoing_streams) {
|
||||
RTC_LOG(LS_ERROR)
|
||||
<< debug_name_
|
||||
<< "->OnStreamsResetFailed(...): Outgoing stream reset failed"
|
||||
<< ", sid=" << stream_id.value() << ", reason: " << reason << ".";
|
||||
}
|
||||
}
|
||||
|
||||
void DcSctpTransport::OnStreamsResetPerformed(
|
||||
rtc::ArrayView<const dcsctp::StreamID> outgoing_streams) {
|
||||
for (auto& stream_id : outgoing_streams) {
|
||||
RTC_LOG(LS_INFO) << debug_name_
|
||||
<< "->OnStreamsResetPerformed(...): Outgoing stream reset"
|
||||
<< ", sid=" << stream_id.value();
|
||||
SignalClosingProcedureComplete(stream_id.value());
|
||||
}
|
||||
}
|
||||
|
||||
void DcSctpTransport::OnIncomingStreamsReset(
|
||||
rtc::ArrayView<const dcsctp::StreamID> incoming_streams) {
|
||||
for (auto& stream_id : incoming_streams) {
|
||||
RTC_LOG(LS_INFO) << debug_name_
|
||||
<< "->OnIncomingStreamsReset(...): Incoming stream reset"
|
||||
<< ", sid=" << stream_id.value();
|
||||
SignalClosingProcedureStartedRemotely(stream_id.value());
|
||||
SignalClosingProcedureComplete(stream_id.value());
|
||||
}
|
||||
}
|
||||
|
||||
void DcSctpTransport::ConnectTransportSignals() {
|
||||
RTC_DCHECK_RUN_ON(network_thread_);
|
||||
if (!transport_) {
|
||||
return;
|
||||
}
|
||||
transport_->SignalWritableState.connect(
|
||||
this, &DcSctpTransport::OnTransportWritableState);
|
||||
transport_->SignalReadPacket.connect(this,
|
||||
&DcSctpTransport::OnTransportReadPacket);
|
||||
transport_->SignalClosed.connect(this, &DcSctpTransport::OnTransportClosed);
|
||||
}
|
||||
|
||||
void DcSctpTransport::DisconnectTransportSignals() {
|
||||
RTC_DCHECK_RUN_ON(network_thread_);
|
||||
if (!transport_) {
|
||||
return;
|
||||
}
|
||||
transport_->SignalWritableState.disconnect(this);
|
||||
transport_->SignalReadPacket.disconnect(this);
|
||||
transport_->SignalClosed.disconnect(this);
|
||||
}
|
||||
|
||||
void DcSctpTransport::OnTransportWritableState(
|
||||
rtc::PacketTransportInternal* transport) {
|
||||
RTC_DCHECK_RUN_ON(network_thread_);
|
||||
RTC_DCHECK_EQ(transport_, transport);
|
||||
|
||||
RTC_LOG(LS_INFO) << debug_name_ << "->OnTransportWritableState(), writable="
|
||||
<< transport->writable();
|
||||
|
||||
MaybeConnectSocket();
|
||||
}
|
||||
|
||||
void DcSctpTransport::OnTransportReadPacket(
|
||||
rtc::PacketTransportInternal* transport,
|
||||
const char* data,
|
||||
size_t length,
|
||||
const int64_t& /* packet_time_us */,
|
||||
int flags) {
|
||||
if (flags) {
|
||||
// We are only interested in SCTP packets.
|
||||
return;
|
||||
}
|
||||
|
||||
RTC_LOG(LS_VERBOSE) << debug_name_
|
||||
<< "->OnTransportReadPacket(), length=" << length;
|
||||
if (socket_) {
|
||||
socket_->ReceivePacket(rtc::ArrayView<const uint8_t>(
|
||||
reinterpret_cast<const uint8_t*>(data), length));
|
||||
}
|
||||
}
|
||||
|
||||
void DcSctpTransport::OnTransportClosed(
|
||||
rtc::PacketTransportInternal* transport) {
|
||||
RTC_LOG(LS_VERBOSE) << debug_name_ << "->OnTransportClosed().";
|
||||
SignalClosedAbruptly();
|
||||
}
|
||||
|
||||
void DcSctpTransport::MaybeConnectSocket() {
|
||||
if (transport_ && transport_->writable() && socket_ &&
|
||||
socket_->state() == dcsctp::SocketState::kClosed) {
|
||||
socket_->Connect();
|
||||
}
|
||||
}
|
||||
} // namespace webrtc
|
108
media/sctp/dcsctp_transport.h
Normal file
108
media/sctp/dcsctp_transport.h
Normal file
|
@ -0,0 +1,108 @@
|
|||
/*
|
||||
* Copyright (c) 2021 The WebRTC project authors. All Rights Reserved.
|
||||
*
|
||||
* Use of this source code is governed by a BSD-style license
|
||||
* that can be found in the LICENSE file in the root of the source
|
||||
* tree. An additional intellectual property rights grant can be found
|
||||
* in the file PATENTS. All contributing project authors may
|
||||
* be found in the AUTHORS file in the root of the source tree.
|
||||
*/
|
||||
|
||||
#ifndef MEDIA_SCTP_DCSCTP_TRANSPORT_H_
|
||||
#define MEDIA_SCTP_DCSCTP_TRANSPORT_H_
|
||||
|
||||
#include <memory>
|
||||
#include <string>
|
||||
|
||||
#include "absl/strings/string_view.h"
|
||||
#include "absl/types/optional.h"
|
||||
#include "api/array_view.h"
|
||||
#include "media/sctp/sctp_transport_internal.h"
|
||||
#include "net/dcsctp/public/dcsctp_options.h"
|
||||
#include "net/dcsctp/public/dcsctp_socket.h"
|
||||
#include "net/dcsctp/public/types.h"
|
||||
#include "net/dcsctp/timer/task_queue_timeout.h"
|
||||
#include "p2p/base/packet_transport_internal.h"
|
||||
#include "rtc_base/copy_on_write_buffer.h"
|
||||
#include "rtc_base/random.h"
|
||||
#include "rtc_base/third_party/sigslot/sigslot.h"
|
||||
#include "rtc_base/thread.h"
|
||||
#include "system_wrappers/include/clock.h"
|
||||
|
||||
namespace webrtc {
|
||||
|
||||
class DcSctpTransport : public cricket::SctpTransportInternal,
|
||||
public dcsctp::DcSctpSocketCallbacks,
|
||||
public sigslot::has_slots<> {
|
||||
public:
|
||||
DcSctpTransport(rtc::Thread* network_thread,
|
||||
rtc::PacketTransportInternal* transport,
|
||||
Clock* clock);
|
||||
~DcSctpTransport() override;
|
||||
|
||||
// cricket::SctpTransportInternal
|
||||
void SetDtlsTransport(rtc::PacketTransportInternal* transport) override;
|
||||
bool Start(int local_sctp_port,
|
||||
int remote_sctp_port,
|
||||
int max_message_size) override;
|
||||
bool OpenStream(int sid) override;
|
||||
bool ResetStream(int sid) override;
|
||||
bool SendData(const cricket::SendDataParams& params,
|
||||
const rtc::CopyOnWriteBuffer& payload,
|
||||
cricket::SendDataResult* result = nullptr) override;
|
||||
bool ReadyToSendData() override;
|
||||
int max_message_size() const override;
|
||||
absl::optional<int> max_outbound_streams() const override;
|
||||
absl::optional<int> max_inbound_streams() const override;
|
||||
void set_debug_name_for_testing(const char* debug_name) override;
|
||||
|
||||
private:
|
||||
// dcsctp::DcSctpSocketCallbacks
|
||||
void SendPacket(rtc::ArrayView<const uint8_t> data) override;
|
||||
std::unique_ptr<dcsctp::Timeout> CreateTimeout() override;
|
||||
dcsctp::TimeMs TimeMillis() override;
|
||||
uint32_t GetRandomInt(uint32_t low, uint32_t high) override;
|
||||
void NotifyOutgoingMessageBufferEmpty() override;
|
||||
void OnMessageReceived(dcsctp::DcSctpMessage message) override;
|
||||
void OnError(dcsctp::ErrorKind error, absl::string_view message) override;
|
||||
void OnAborted(dcsctp::ErrorKind error, absl::string_view message) override;
|
||||
void OnConnected() override;
|
||||
void OnClosed() override;
|
||||
void OnConnectionRestarted() override;
|
||||
void OnStreamsResetFailed(
|
||||
rtc::ArrayView<const dcsctp::StreamID> outgoing_streams,
|
||||
absl::string_view reason) override;
|
||||
void OnStreamsResetPerformed(
|
||||
rtc::ArrayView<const dcsctp::StreamID> outgoing_streams) override;
|
||||
void OnIncomingStreamsReset(
|
||||
rtc::ArrayView<const dcsctp::StreamID> incoming_streams) override;
|
||||
|
||||
// Transport callbacks
|
||||
void ConnectTransportSignals();
|
||||
void DisconnectTransportSignals();
|
||||
void OnTransportWritableState(rtc::PacketTransportInternal* transport);
|
||||
void OnTransportReadPacket(rtc::PacketTransportInternal* transport,
|
||||
const char* data,
|
||||
size_t length,
|
||||
const int64_t& /* packet_time_us */,
|
||||
int flags);
|
||||
void OnTransportClosed(rtc::PacketTransportInternal* transport);
|
||||
|
||||
void MaybeConnectSocket();
|
||||
|
||||
rtc::Thread* network_thread_;
|
||||
rtc::PacketTransportInternal* transport_;
|
||||
Clock* clock_;
|
||||
Random random_;
|
||||
|
||||
dcsctp::TaskQueueTimeoutFactory task_queue_timeout_factory_;
|
||||
std::unique_ptr<dcsctp::DcSctpSocketInterface> socket_;
|
||||
std::string debug_name_ = "DcSctpTransport";
|
||||
rtc::CopyOnWriteBuffer receive_buffer_;
|
||||
|
||||
bool ready_to_send_data_ = false;
|
||||
};
|
||||
|
||||
} // namespace webrtc
|
||||
|
||||
#endif // MEDIA_SCTP_DCSCTP_TRANSPORT_H_
|
|
@ -12,6 +12,12 @@
|
|||
|
||||
#include "rtc_base/system/unused.h"
|
||||
|
||||
#ifdef WEBRTC_HAVE_DCSCTP
|
||||
#include "media/sctp/dcsctp_transport.h" // nogncheck
|
||||
#include "system_wrappers/include/clock.h" // nogncheck
|
||||
#include "system_wrappers/include/field_trial.h" // nogncheck
|
||||
#endif
|
||||
|
||||
#ifdef WEBRTC_HAVE_USRSCTP
|
||||
#include "media/sctp/usrsctp_transport.h" // nogncheck
|
||||
#endif
|
||||
|
@ -19,14 +25,24 @@
|
|||
namespace cricket {
|
||||
|
||||
SctpTransportFactory::SctpTransportFactory(rtc::Thread* network_thread)
|
||||
: network_thread_(network_thread) {
|
||||
: network_thread_(network_thread), use_dcsctp_("Enabled", false) {
|
||||
RTC_UNUSED(network_thread_);
|
||||
#ifdef WEBRTC_HAVE_DCSCTP
|
||||
webrtc::ParseFieldTrial({&use_dcsctp_}, webrtc::field_trial::FindFullName(
|
||||
"WebRTC-DataChannel-Dcsctp"));
|
||||
#endif
|
||||
}
|
||||
|
||||
std::unique_ptr<SctpTransportInternal>
|
||||
SctpTransportFactory::CreateSctpTransport(
|
||||
rtc::PacketTransportInternal* transport) {
|
||||
std::unique_ptr<SctpTransportInternal> result;
|
||||
#ifdef WEBRTC_HAVE_DCSCTP
|
||||
if (use_dcsctp_.Get()) {
|
||||
result = std::unique_ptr<SctpTransportInternal>(new webrtc::DcSctpTransport(
|
||||
network_thread_, transport, webrtc::Clock::GetRealTimeClock()));
|
||||
}
|
||||
#endif
|
||||
#ifdef WEBRTC_HAVE_USRSCTP
|
||||
if (!result) {
|
||||
result = std::unique_ptr<SctpTransportInternal>(
|
||||
|
|
|
@ -15,6 +15,7 @@
|
|||
|
||||
#include "api/transport/sctp_transport_factory_interface.h"
|
||||
#include "media/sctp/sctp_transport_internal.h"
|
||||
#include "rtc_base/experiments/field_trial_parser.h"
|
||||
#include "rtc_base/thread.h"
|
||||
|
||||
namespace cricket {
|
||||
|
@ -28,6 +29,7 @@ class SctpTransportFactory : public webrtc::SctpTransportFactoryInterface {
|
|||
|
||||
private:
|
||||
rtc::Thread* network_thread_;
|
||||
webrtc::FieldTrialFlag use_dcsctp_;
|
||||
};
|
||||
|
||||
} // namespace cricket
|
||||
|
|
|
@ -27,17 +27,19 @@
|
|||
#include "rtc_base/gunit.h"
|
||||
#include "rtc_base/ref_counted_object.h"
|
||||
#include "rtc_base/virtual_socket_server.h"
|
||||
#include "test/gtest.h"
|
||||
|
||||
namespace webrtc {
|
||||
|
||||
namespace {
|
||||
|
||||
class DataChannelIntegrationTest
|
||||
: public PeerConnectionIntegrationBaseTest,
|
||||
public ::testing::WithParamInterface<SdpSemantics> {
|
||||
class DataChannelIntegrationTest : public PeerConnectionIntegrationBaseTest,
|
||||
public ::testing::WithParamInterface<
|
||||
std::tuple<SdpSemantics, std::string>> {
|
||||
protected:
|
||||
DataChannelIntegrationTest()
|
||||
: PeerConnectionIntegrationBaseTest(GetParam()) {}
|
||||
: PeerConnectionIntegrationBaseTest(std::get<0>(GetParam()),
|
||||
std::get<1>(GetParam())) {}
|
||||
};
|
||||
|
||||
GTEST_ALLOW_UNINSTANTIATED_PARAMETERIZED_TEST(DataChannelIntegrationTest);
|
||||
|
@ -657,15 +659,19 @@ TEST_P(DataChannelIntegrationTest, QueuedPacketsGetDeliveredInUnReliableMode) {
|
|||
EXPECT_EQ(2u, callee()->data_observer()->received_message_count());
|
||||
}
|
||||
|
||||
INSTANTIATE_TEST_SUITE_P(DataChannelIntegrationTest,
|
||||
DataChannelIntegrationTest,
|
||||
Values(SdpSemantics::kPlanB,
|
||||
SdpSemantics::kUnifiedPlan));
|
||||
INSTANTIATE_TEST_SUITE_P(
|
||||
DataChannelIntegrationTest,
|
||||
DataChannelIntegrationTest,
|
||||
Combine(Values(SdpSemantics::kPlanB, SdpSemantics::kUnifiedPlan),
|
||||
Values("WebRTC-DataChannel-Dcsctp/Enabled/",
|
||||
"WebRTC-DataChannel-Dcsctp/Disabled/")));
|
||||
|
||||
INSTANTIATE_TEST_SUITE_P(DataChannelIntegrationTest,
|
||||
DataChannelIntegrationTestWithFakeClock,
|
||||
Values(SdpSemantics::kPlanB,
|
||||
SdpSemantics::kUnifiedPlan));
|
||||
INSTANTIATE_TEST_SUITE_P(
|
||||
DataChannelIntegrationTest,
|
||||
DataChannelIntegrationTestWithFakeClock,
|
||||
Combine(Values(SdpSemantics::kPlanB, SdpSemantics::kUnifiedPlan),
|
||||
Values("WebRTC-DataChannel-Dcsctp/Enabled/",
|
||||
"WebRTC-DataChannel-Dcsctp/Disabled/")));
|
||||
|
||||
TEST_F(DataChannelIntegrationTestUnifiedPlan,
|
||||
EndToEndCallWithBundledSctpDataChannel) {
|
||||
|
|
|
@ -1330,12 +1330,17 @@ class MockIceTransportFactory : public IceTransportFactory {
|
|||
// of everything else (including "PeerConnectionFactory"s).
|
||||
class PeerConnectionIntegrationBaseTest : public ::testing::Test {
|
||||
public:
|
||||
explicit PeerConnectionIntegrationBaseTest(SdpSemantics sdp_semantics)
|
||||
PeerConnectionIntegrationBaseTest(
|
||||
SdpSemantics sdp_semantics,
|
||||
absl::optional<std::string> field_trials = absl::nullopt)
|
||||
: sdp_semantics_(sdp_semantics),
|
||||
ss_(new rtc::VirtualSocketServer()),
|
||||
fss_(new rtc::FirewallSocketServer(ss_.get())),
|
||||
network_thread_(new rtc::Thread(fss_.get())),
|
||||
worker_thread_(rtc::Thread::Create()) {
|
||||
worker_thread_(rtc::Thread::Create()),
|
||||
field_trials_(field_trials.has_value()
|
||||
? new test::ScopedFieldTrials(*field_trials)
|
||||
: nullptr) {
|
||||
network_thread_->SetName("PCNetworkThread", this);
|
||||
worker_thread_->SetName("PCWorkerThread", this);
|
||||
RTC_CHECK(network_thread_->Start());
|
||||
|
@ -1839,6 +1844,7 @@ class PeerConnectionIntegrationBaseTest : public ::testing::Test {
|
|||
std::vector<std::unique_ptr<cricket::TestTurnCustomizer>> turn_customizers_;
|
||||
std::unique_ptr<PeerConnectionIntegrationWrapper> caller_;
|
||||
std::unique_ptr<PeerConnectionIntegrationWrapper> callee_;
|
||||
std::unique_ptr<test::ScopedFieldTrials> field_trials_;
|
||||
};
|
||||
|
||||
} // namespace webrtc
|
||||
|
|
|
@ -286,6 +286,9 @@ declare_args() {
|
|||
}
|
||||
|
||||
declare_args() {
|
||||
# Enable the dcsctp backend for DataChannels and related unittests
|
||||
rtc_build_dcsctp = !build_with_mozilla && rtc_enable_sctp
|
||||
|
||||
# Enable the usrsctp backend for DataChannels and related unittests
|
||||
rtc_build_usrsctp = !build_with_mozilla && rtc_enable_sctp
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue