Remove usrsctp, dcSCTP is now the unique SCTP implementation

Bug: chromium:1243702
Change-Id: Id11299d26f0f8713a57781b57277837aace531f2
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/251821
Reviewed-by: Mirko Bonadei <mbonadei@webrtc.org>
Reviewed-by: Victor Boivie <boivie@webrtc.org>
Reviewed-by: Harald Alvestrand <hta@webrtc.org>
Commit-Queue: Florent Castelli <orphis@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#36423}
This commit is contained in:
Florent Castelli 2022-03-31 19:15:10 +02:00 committed by WebRTC LUCI CQ
parent c128277f56
commit f2599a7f43
16 changed files with 23 additions and 3699 deletions

2
DEPS
View file

@ -280,8 +280,6 @@ deps = {
'url': 'https://chromium.googlesource.com/chromium/third_party/ub-uiautomator.git@00270549ce3161ae72ceb24712618ea28b4f9434',
'condition': 'checkout_android',
},
'src/third_party/usrsctp/usrsctplib':
'https://chromium.googlesource.com/external/github.com/sctplab/usrsctp@62d7d0c928c9a040dce96aa2f16c00e7e67d59cb',
# Dependency used by libjpeg-turbo.
'src/third_party/yasm/binaries': {
'url': 'https://chromium.googlesource.com/chromium/deps/yasm/binaries.git@52f9b3f4b0aa06da24ef8b123058bb61ee468881',

View file

@ -26,7 +26,6 @@ CPPLINT_EXCEPTIONS = [
'examples/objc',
'media/base/stream_params.h',
'media/base/video_common.h',
'media/sctp/usrsctp_transport.cc',
'modules/audio_coding',
'modules/audio_device',
'modules/audio_processing',

View file

@ -416,35 +416,6 @@ if (rtc_build_dcsctp) {
}
}
if (rtc_build_usrsctp) {
rtc_library("rtc_data_usrsctp_transport") {
defines = [
# "SCTP_DEBUG" # Uncomment for SCTP debugging.
]
sources = [
"sctp/usrsctp_transport.cc",
"sctp/usrsctp_transport.h",
]
deps = [
":rtc_data_sctp_transport_internal",
"../media:rtc_media_base",
"../p2p:rtc_p2p",
"../rtc_base",
"../rtc_base:rtc_base_approved",
"../rtc_base:threading",
"../rtc_base/synchronization:mutex",
"../rtc_base/task_utils:pending_task_safety_flag",
"../rtc_base/task_utils:to_queued_task",
"../rtc_base/third_party/sigslot:sigslot",
"//third_party/usrsctp",
]
absl_deps = [
"//third_party/abseil-cpp/absl/algorithm:container",
"//third_party/abseil-cpp/absl/types:optional",
]
}
}
rtc_library("rtc_data_sctp_transport_factory") {
defines = []
sources = [
@ -453,16 +424,13 @@ rtc_library("rtc_data_sctp_transport_factory") {
]
deps = [
":rtc_data_sctp_transport_internal",
"../api:field_trials_view",
"../api/transport:sctp_transport_factory_interface",
"../rtc_base:threading",
"../rtc_base/experiments:field_trial_parser",
"../rtc_base/system:unused",
]
if (rtc_enable_sctp) {
assert(rtc_build_dcsctp || rtc_build_usrsctp,
"An SCTP backend is required to enable SCTP")
assert(rtc_build_dcsctp, "An SCTP backend is required to enable SCTP")
}
if (rtc_build_dcsctp) {
@ -473,11 +441,6 @@ rtc_library("rtc_data_sctp_transport_factory") {
"../system_wrappers:field_trial",
]
}
if (rtc_build_usrsctp) {
defines += [ "WEBRTC_HAVE_USRSCTP" ]
deps += [ ":rtc_data_usrsctp_transport" ]
}
}
rtc_source_set("rtc_media") {
@ -684,21 +647,6 @@ if (rtc_include_tests) {
sources += [ "engine/webrtc_voice_engine_unittest.cc" ]
}
if (rtc_build_usrsctp) {
sources += [
"sctp/usrsctp_transport_reliability_unittest.cc",
"sctp/usrsctp_transport_unittest.cc",
]
deps += [
":rtc_data_sctp_transport_internal",
":rtc_data_usrsctp_transport",
"../rtc_base:rtc_event",
"../rtc_base/task_utils:pending_task_safety_flag",
"../rtc_base/task_utils:to_queued_task",
"//third_party/usrsctp",
]
}
if (rtc_opus_support_120ms_ptime) {
defines += [ "WEBRTC_OPUS_SUPPORT_120MS_PTIME=1" ]
} else {

View file

@ -15,7 +15,6 @@ include_rules = [
"+p2p",
"+sound",
"+system_wrappers",
"+usrsctplib",
"+third_party/libyuv",
]

View file

@ -10,29 +10,18 @@
#include "media/sctp/sctp_transport_factory.h"
#include "api/field_trials_view.h"
#include "rtc_base/system/unused.h"
#ifdef WEBRTC_HAVE_DCSCTP
#include "media/sctp/dcsctp_transport.h" // nogncheck
#include "system_wrappers/include/clock.h" // nogncheck
#endif
#ifdef WEBRTC_HAVE_USRSCTP
#include "media/sctp/usrsctp_transport.h" // nogncheck
#include "media/sctp/dcsctp_transport.h" // nogncheck
#include "system_wrappers/include/clock.h" // nogncheck
#endif
namespace cricket {
SctpTransportFactory::SctpTransportFactory(
rtc::Thread* network_thread,
const webrtc::FieldTrialsView& field_trials)
: network_thread_(network_thread), use_usrsctp_("Disabled", false) {
SctpTransportFactory::SctpTransportFactory(rtc::Thread* network_thread)
: network_thread_(network_thread) {
RTC_UNUSED(network_thread_);
#ifdef WEBRTC_HAVE_DCSCTP
webrtc::ParseFieldTrial({&use_usrsctp_},
field_trials.Lookup("WebRTC-DataChannel-Dcsctp"));
#endif
}
std::unique_ptr<SctpTransportInternal>
@ -40,16 +29,8 @@ SctpTransportFactory::CreateSctpTransport(
rtc::PacketTransportInternal* transport) {
std::unique_ptr<SctpTransportInternal> result;
#ifdef WEBRTC_HAVE_DCSCTP
if (!use_usrsctp_.Get()) {
result = std::unique_ptr<SctpTransportInternal>(new webrtc::DcSctpTransport(
network_thread_, transport, webrtc::Clock::GetRealTimeClock()));
}
#endif
#ifdef WEBRTC_HAVE_USRSCTP
if (!result) {
result = std::unique_ptr<SctpTransportInternal>(
new UsrsctpTransport(network_thread_, transport));
}
result = std::unique_ptr<SctpTransportInternal>(new webrtc::DcSctpTransport(
network_thread_, transport, webrtc::Clock::GetRealTimeClock()));
#endif
return result;
}

View file

@ -13,25 +13,21 @@
#include <memory>
#include "api/field_trials_view.h"
#include "api/transport/sctp_transport_factory_interface.h"
#include "media/sctp/sctp_transport_internal.h"
#include "rtc_base/experiments/field_trial_parser.h"
#include "rtc_base/thread.h"
namespace cricket {
class SctpTransportFactory : public webrtc::SctpTransportFactoryInterface {
public:
explicit SctpTransportFactory(rtc::Thread* network_thread,
const webrtc::FieldTrialsView& field_trials);
explicit SctpTransportFactory(rtc::Thread* network_thread);
std::unique_ptr<SctpTransportInternal> CreateSctpTransport(
rtc::PacketTransportInternal* transport) override;
private:
rtc::Thread* network_thread_;
webrtc::FieldTrialFlag use_usrsctp_;
};
} // namespace cricket

File diff suppressed because it is too large Load diff

View file

@ -1,296 +0,0 @@
/*
* Copyright (c) 2012 The WebRTC project authors. All Rights Reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#ifndef MEDIA_SCTP_USRSCTP_TRANSPORT_H_
#define MEDIA_SCTP_USRSCTP_TRANSPORT_H_
#include <errno.h>
#include <cstdint>
#include <map>
#include <memory>
#include <set>
#include <string>
#include <vector>
#include "absl/types/optional.h"
#include "rtc_base/buffer.h"
#include "rtc_base/copy_on_write_buffer.h"
#include "rtc_base/task_utils/pending_task_safety_flag.h"
#include "rtc_base/third_party/sigslot/sigslot.h"
#include "rtc_base/thread.h"
// For SendDataParams/ReceiveDataParams.
#include "media/base/media_channel.h"
#include "media/sctp/sctp_transport_internal.h"
// Defined by "usrsctplib/usrsctp.h"
struct sockaddr_conn;
struct sctp_assoc_change;
struct sctp_rcvinfo;
struct sctp_stream_reset_event;
struct sctp_sendv_spa;
// Defined by <sys/socket.h>
struct socket;
namespace cricket {
// Holds data to be passed on to a transport.
struct SctpInboundPacket;
// From transport calls, data flows like this:
// [network thread (although it can in princple be another thread)]
// 1. SctpTransport::SendData(data)
// 2. usrsctp_sendv(data)
// [network thread returns; sctp thread then calls the following]
// 3. OnSctpOutboundPacket(wrapped_data)
// [sctp thread returns having async invoked on the network thread]
// 4. SctpTransport::OnPacketFromSctpToNetwork(wrapped_data)
// 5. DtlsTransport::SendPacket(wrapped_data)
// 6. ... across network ... a packet is sent back ...
// 7. SctpTransport::OnPacketReceived(wrapped_data)
// 8. usrsctp_conninput(wrapped_data)
// [network thread returns; sctp thread then calls the following]
// 9. OnSctpInboundData(data)
// 10. SctpTransport::OnDataFromSctpToTransport(data)
// [sctp thread returns having async invoked on the network thread]
// 11. SctpTransport::OnDataFromSctpToTransport(data)
// 12. SctpTransport::SignalDataReceived(data)
// [from the same thread, methods registered/connected to
// SctpTransport are called with the received data]
class UsrsctpTransport : public SctpTransportInternal,
public sigslot::has_slots<> {
public:
// `network_thread` is where packets will be processed and callbacks from
// this transport will be posted, and is the only thread on which public
// methods can be called.
// `transport` is not required (can be null).
UsrsctpTransport(rtc::Thread* network_thread,
rtc::PacketTransportInternal* transport);
~UsrsctpTransport() override;
UsrsctpTransport(const UsrsctpTransport&) = delete;
UsrsctpTransport& operator=(const UsrsctpTransport&) = delete;
// SctpTransportInternal overrides (see sctptransportinternal.h for comments).
void SetDtlsTransport(rtc::PacketTransportInternal* transport) override;
bool Start(int local_port, int remote_port, int max_message_size) override;
bool OpenStream(int sid) override;
bool ResetStream(int sid) override;
bool SendData(int sid,
const webrtc::SendDataParams& params,
const rtc::CopyOnWriteBuffer& payload,
SendDataResult* result = nullptr) override;
bool ReadyToSendData() override;
int max_message_size() const override { return max_message_size_; }
absl::optional<int> max_outbound_streams() const override {
return max_outbound_streams_;
}
absl::optional<int> max_inbound_streams() const override {
return max_inbound_streams_;
}
void set_debug_name_for_testing(const char* debug_name) override {
debug_name_ = debug_name;
}
void InjectDataOrNotificationFromSctpForTesting(const void* data,
size_t length,
struct sctp_rcvinfo rcv,
int flags);
// Exposed to allow Post call from c-callbacks.
// TODO(deadbeef): Remove this or at least make it return a const pointer.
rtc::Thread* network_thread() const { return network_thread_; }
private:
// A message to be sent by the sctp library. This class is used to track the
// progress of writing a single message to the sctp library in the presence of
// partial writes. In this case, the Advance() function is provided in order
// to advance over what has already been accepted by the sctp library and
// avoid copying the remaining partial message buffer.
class OutgoingMessage {
public:
OutgoingMessage(const rtc::CopyOnWriteBuffer& buffer,
int sid,
const webrtc::SendDataParams& send_params)
: buffer_(buffer), sid_(sid), send_params_(send_params) {}
// Advances the buffer by the incremented amount. Must not advance further
// than the current data size.
void Advance(size_t increment) {
RTC_DCHECK_LE(increment + offset_, buffer_.size());
offset_ += increment;
}
size_t size() const { return buffer_.size() - offset_; }
const void* data() const { return buffer_.data() + offset_; }
int sid() const { return sid_; }
webrtc::SendDataParams send_params() const { return send_params_; }
private:
const rtc::CopyOnWriteBuffer buffer_;
int sid_;
const webrtc::SendDataParams send_params_;
size_t offset_ = 0;
};
void ConnectTransportSignals();
void DisconnectTransportSignals();
// Creates the socket and connects.
bool Connect();
// Returns false when opening the socket failed.
bool OpenSctpSocket();
// Helpet method to set socket options.
bool ConfigureSctpSocket();
// Sets |sock_ |to nullptr.
void CloseSctpSocket();
// Sends a SCTP_RESET_STREAM for all streams in closing_ssids_.
bool SendQueuedStreamResets();
// Sets the "ready to send" flag and fires signal if needed.
void SetReadyToSendData();
// Sends the outgoing buffered message that was only partially accepted by the
// sctp lib because it did not have enough space. Returns true if the entire
// buffered message was accepted by the sctp lib.
bool SendBufferedMessage();
// Tries to send the `payload` on the usrsctp lib. The message will be
// advanced by the amount that was sent.
SendDataResult SendMessageInternal(OutgoingMessage* message);
// Callbacks from DTLS transport.
void OnWritableState(rtc::PacketTransportInternal* transport);
virtual void OnPacketRead(rtc::PacketTransportInternal* transport,
const char* data,
size_t len,
const int64_t& packet_time_us,
int flags);
void OnClosed(rtc::PacketTransportInternal* transport);
// Methods related to usrsctp callbacks.
void OnSendThresholdCallback();
sockaddr_conn GetSctpSockAddr(int port);
// Called using `invoker_` to send packet on the network.
void OnPacketFromSctpToNetwork(const rtc::CopyOnWriteBuffer& buffer);
// Called on the network thread.
// Flags are standard socket API flags (RFC 6458).
void OnDataOrNotificationFromSctp(const void* data,
size_t length,
struct sctp_rcvinfo rcv,
int flags);
// Called using `invoker_` to decide what to do with the data.
void OnDataFromSctpToTransport(const ReceiveDataParams& params,
const rtc::CopyOnWriteBuffer& buffer);
// Called using `invoker_` to decide what to do with the notification.
void OnNotificationFromSctp(const rtc::CopyOnWriteBuffer& buffer);
void OnNotificationAssocChange(const sctp_assoc_change& change);
void OnStreamResetEvent(const struct sctp_stream_reset_event* evt);
// Responsible for marshalling incoming data to the transports listeners, and
// outgoing data to the network interface.
rtc::Thread* network_thread_;
// Helps pass inbound/outbound packets asynchronously to the network thread.
webrtc::ScopedTaskSafety task_safety_;
// Underlying DTLS transport.
rtc::PacketTransportInternal* transport_ = nullptr;
// Track the data received from usrsctp between callbacks until the EOR bit
// arrives.
rtc::CopyOnWriteBuffer partial_incoming_message_;
ReceiveDataParams partial_params_;
int partial_flags_;
// A message that was attempted to be sent, but was only partially accepted by
// usrsctp lib with usrsctp_sendv() because it cannot buffer the full message.
// This occurs because we explicitly set the EOR bit when sending, so
// usrsctp_sendv() is not atomic.
absl::optional<OutgoingMessage> partial_outgoing_message_;
bool was_ever_writable_ = false;
int local_port_ = kSctpDefaultPort;
int remote_port_ = kSctpDefaultPort;
int max_message_size_ = kSctpSendBufferSize;
struct socket* sock_ = nullptr; // The socket created by usrsctp_socket(...).
// Has Start been called? Don't create SCTP socket until it has.
bool started_ = false;
// Are we ready to queue data (SCTP socket created, and not blocked due to
// congestion control)? Different than `transport_`'s "ready to send".
bool ready_to_send_data_ = false;
// Used to keep track of the status of each stream (or rather, each pair of
// incoming/outgoing streams with matching IDs). It's specifically used to
// keep track of the status of resets, but more information could be put here
// later.
//
// See datachannel.h for a summary of the closing procedure.
struct StreamStatus {
// Closure initiated by application via ResetStream? Note that
// this may be true while outgoing_reset_initiated is false if the outgoing
// reset needed to be queued.
bool closure_initiated = false;
// Whether we've initiated the outgoing stream reset via
// SCTP_RESET_STREAMS.
bool outgoing_reset_initiated = false;
// Whether usrsctp has indicated that the incoming/outgoing streams have
// been reset. It's expected that the peer will reset its outgoing stream
// (our incoming stream) after receiving the reset for our outgoing stream,
// though older versions of chromium won't do this. See crbug.com/559394
// for context.
bool outgoing_reset_complete = false;
bool incoming_reset_complete = false;
// Some helper methods to improve code readability.
bool is_open() const {
return !closure_initiated && !incoming_reset_complete &&
!outgoing_reset_complete;
}
// We need to send an outgoing reset if the application has closed the data
// channel, or if we received a reset of the incoming stream from the
// remote endpoint, indicating the data channel was closed remotely.
bool need_outgoing_reset() const {
return (incoming_reset_complete || closure_initiated) &&
!outgoing_reset_initiated;
}
bool reset_complete() const {
return outgoing_reset_complete && incoming_reset_complete;
}
};
// Entries should only be removed from this map if `reset_complete` is
// true.
std::map<uint32_t, StreamStatus> stream_status_by_sid_;
// A static human-readable name for debugging messages.
const char* debug_name_ = "UsrsctpTransport";
// Hides usrsctp interactions from this header file.
class UsrSctpWrapper;
// Number of channels negotiated. Not set before negotiation completes.
absl::optional<int> max_outbound_streams_;
absl::optional<int> max_inbound_streams_;
// Used for associating this transport with the underlying sctp socket in
// various callbacks.
uintptr_t id_ = 0;
friend class UsrsctpTransportMap;
};
class UsrsctpTransportMap;
} // namespace cricket
#endif // MEDIA_SCTP_USRSCTP_TRANSPORT_H_

View file

@ -1,809 +0,0 @@
/*
* Copyright (c) 2019 The WebRTC project authors. All Rights Reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#include <memory>
#include <queue>
#include <string>
#include "media/sctp/sctp_transport_internal.h"
#include "media/sctp/usrsctp_transport.h"
#include "rtc_base/copy_on_write_buffer.h"
#include "rtc_base/event.h"
#include "rtc_base/gunit.h"
#include "rtc_base/logging.h"
#include "rtc_base/random.h"
#include "rtc_base/synchronization/mutex.h"
#include "rtc_base/task_utils/pending_task_safety_flag.h"
#include "rtc_base/task_utils/to_queued_task.h"
#include "rtc_base/thread.h"
#include "test/gtest.h"
namespace {
static constexpr int kDefaultTimeout = 10000; // 10 seconds.
static constexpr int kTransport1Port = 15001;
static constexpr int kTransport2Port = 25002;
static constexpr int kLogPerMessagesCount = 100;
/**
* An simple packet transport implementation which can be
* configured to simulate uniform random packet loss and
* configurable random packet delay and reordering.
*/
class SimulatedPacketTransport final : public rtc::PacketTransportInternal {
public:
SimulatedPacketTransport(std::string name,
rtc::Thread* transport_thread,
uint8_t packet_loss_percents,
uint16_t avg_send_delay_millis)
: transport_name_(name),
transport_thread_(transport_thread),
packet_loss_percents_(packet_loss_percents),
avg_send_delay_millis_(avg_send_delay_millis),
random_(42) {
RTC_DCHECK(transport_thread_);
RTC_DCHECK_LE(packet_loss_percents_, 100);
RTC_DCHECK_RUN_ON(transport_thread_);
}
~SimulatedPacketTransport() override {
RTC_DCHECK_RUN_ON(transport_thread_);
destination_ = nullptr;
SignalWritableState(this);
}
SimulatedPacketTransport(const SimulatedPacketTransport&) = delete;
SimulatedPacketTransport& operator=(const SimulatedPacketTransport&) = delete;
const std::string& transport_name() const override { return transport_name_; }
bool writable() const override { return destination_ != nullptr; }
bool receiving() const override { return true; }
int SendPacket(const char* data,
size_t len,
const rtc::PacketOptions& options,
int flags = 0) {
RTC_DCHECK_RUN_ON(transport_thread_);
auto destination = destination_.load();
if (destination == nullptr) {
return -1;
}
if (random_.Rand(100) < packet_loss_percents_) {
// silent packet loss
return 0;
}
rtc::CopyOnWriteBuffer buffer(data, len);
auto send_task = ToQueuedTask(
destination->task_safety_.flag(),
[destination, flags, buffer = std::move(buffer)] {
destination->SignalReadPacket(
destination, reinterpret_cast<const char*>(buffer.data()),
buffer.size(), rtc::Time(), flags);
});
// Introduce random send delay in range [0 .. 2 * avg_send_delay_millis_]
// millis, which will also work as random packet reordering mechanism.
uint16_t actual_send_delay = avg_send_delay_millis_;
int16_t reorder_delay =
avg_send_delay_millis_ *
std::min(1.0, std::max(-1.0, random_.Gaussian(0, 0.5)));
actual_send_delay += reorder_delay;
if (actual_send_delay > 0) {
destination->transport_thread_->PostDelayedTask(std::move(send_task),
actual_send_delay);
} else {
destination->transport_thread_->PostTask(std::move(send_task));
}
return 0;
}
int SetOption(rtc::Socket::Option opt, int value) override { return 0; }
bool GetOption(rtc::Socket::Option opt, int* value) override { return false; }
int GetError() override { return 0; }
absl::optional<rtc::NetworkRoute> network_route() const override {
return absl::nullopt;
}
void SetDestination(SimulatedPacketTransport* destination) {
RTC_DCHECK_RUN_ON(transport_thread_);
if (destination == this) {
return;
}
destination_ = destination;
SignalWritableState(this);
}
private:
const std::string transport_name_;
rtc::Thread* const transport_thread_;
const uint8_t packet_loss_percents_;
const uint16_t avg_send_delay_millis_;
std::atomic<SimulatedPacketTransport*> destination_ ATOMIC_VAR_INIT(nullptr);
webrtc::Random random_;
webrtc::ScopedTaskSafety task_safety_;
};
/**
* A helper class to send specified number of messages over UsrsctpTransport
* with SCTP reliability settings provided by user. The reliability settings are
* specified by passing a template instance of SendDataParams. The sid will be
* assigned by sender itself and will be assigned from range
* [cricket::kMinSctpSid; cricket::kMaxSctpSid]. The wide range of sids are used
* to possibly trigger more execution paths inside usrsctp.
*/
class SctpDataSender final {
public:
SctpDataSender(rtc::Thread* thread,
cricket::UsrsctpTransport* transport,
uint64_t target_messages_count,
webrtc::SendDataParams send_params,
uint32_t sender_id)
: thread_(thread),
transport_(transport),
target_messages_count_(target_messages_count),
send_params_(send_params),
sender_id_(sender_id) {
RTC_DCHECK(thread_);
RTC_DCHECK(transport_);
}
SctpDataSender(const SctpDataSender&) = delete;
SctpDataSender& operator=(const SctpDataSender&) = delete;
void Start() {
thread_->PostTask(ToQueuedTask(task_safety_.flag(), [this] {
if (started_) {
RTC_LOG(LS_INFO) << sender_id_ << " sender is already started";
return;
}
started_ = true;
SendNextMessage();
}));
}
uint64_t BytesSentCount() const { return num_bytes_sent_; }
uint64_t MessagesSentCount() const { return num_messages_sent_; }
absl::optional<std::string> GetLastError() {
absl::optional<std::string> result = absl::nullopt;
thread_->Invoke<void>(RTC_FROM_HERE,
[this, &result] { result = last_error_; });
return result;
}
bool WaitForCompletion(int give_up_after_ms) {
return sent_target_messages_count_.Wait(give_up_after_ms, kDefaultTimeout);
}
private:
void SendNextMessage() {
RTC_DCHECK_RUN_ON(thread_);
if (!started_ || num_messages_sent_ >= target_messages_count_) {
sent_target_messages_count_.Set();
return;
}
if (num_messages_sent_ % kLogPerMessagesCount == 0) {
RTC_LOG(LS_INFO) << sender_id_ << " sender will try send message "
<< (num_messages_sent_ + 1) << " out of "
<< target_messages_count_;
}
webrtc::SendDataParams params(send_params_);
int sid =
cricket::kMinSctpSid + (num_messages_sent_ % cricket::kMaxSctpStreams);
cricket::SendDataResult result;
transport_->SendData(sid, params, payload_, &result);
switch (result) {
case cricket::SDR_BLOCK:
// retry after timeout
thread_->PostDelayedTask(
ToQueuedTask(task_safety_.flag(), [this] { SendNextMessage(); }),
500);
break;
case cricket::SDR_SUCCESS:
// send next
num_bytes_sent_ += payload_.size();
++num_messages_sent_;
thread_->PostTask(
ToQueuedTask(task_safety_.flag(), [this] { SendNextMessage(); }));
break;
case cricket::SDR_ERROR:
// give up
last_error_ = "UsrsctpTransport::SendData error returned";
sent_target_messages_count_.Set();
break;
}
}
rtc::Thread* const thread_;
cricket::UsrsctpTransport* const transport_;
const uint64_t target_messages_count_;
const webrtc::SendDataParams send_params_;
const uint32_t sender_id_;
rtc::CopyOnWriteBuffer payload_{std::string(1400, '.').c_str(), 1400};
std::atomic<bool> started_ ATOMIC_VAR_INIT(false);
std::atomic<uint64_t> num_messages_sent_ ATOMIC_VAR_INIT(0);
rtc::Event sent_target_messages_count_{true, false};
std::atomic<uint64_t> num_bytes_sent_ ATOMIC_VAR_INIT(0);
absl::optional<std::string> last_error_;
webrtc::ScopedTaskSafetyDetached task_safety_;
};
/**
* A helper class which counts number of received messages
* and bytes over UsrsctpTransport. Also allow waiting until
* specified number of messages received.
*/
class SctpDataReceiver final : public sigslot::has_slots<> {
public:
explicit SctpDataReceiver(uint32_t receiver_id,
uint64_t target_messages_count)
: receiver_id_(receiver_id),
target_messages_count_(target_messages_count) {}
SctpDataReceiver(const SctpDataReceiver&) = delete;
SctpDataReceiver& operator=(const SctpDataReceiver&) = delete;
void OnDataReceived(const cricket::ReceiveDataParams& params,
const rtc::CopyOnWriteBuffer& data) {
num_bytes_received_ += data.size();
if (++num_messages_received_ == target_messages_count_) {
received_target_messages_count_.Set();
}
if (num_messages_received_ % kLogPerMessagesCount == 0) {
RTC_LOG(LS_INFO) << receiver_id_ << " receiver got "
<< num_messages_received_ << " messages";
}
}
uint64_t MessagesReceivedCount() const { return num_messages_received_; }
uint64_t BytesReceivedCount() const { return num_bytes_received_; }
bool WaitForMessagesReceived(int timeout_millis) {
return received_target_messages_count_.Wait(timeout_millis);
}
private:
std::atomic<uint64_t> num_messages_received_ ATOMIC_VAR_INIT(0);
std::atomic<uint64_t> num_bytes_received_ ATOMIC_VAR_INIT(0);
rtc::Event received_target_messages_count_{true, false};
const uint32_t receiver_id_;
const uint64_t target_messages_count_;
};
/**
* Simple class to manage set of threads.
*/
class ThreadPool final {
public:
explicit ThreadPool(size_t threads_count) : random_(42) {
RTC_DCHECK(threads_count > 0);
threads_.reserve(threads_count);
for (size_t i = 0; i < threads_count; i++) {
auto thread = rtc::Thread::Create();
thread->SetName("Thread #" + rtc::ToString(i + 1) + " from Pool", this);
thread->Start();
threads_.emplace_back(std::move(thread));
}
}
ThreadPool(const ThreadPool&) = delete;
ThreadPool& operator=(const ThreadPool&) = delete;
rtc::Thread* GetRandomThread() {
return threads_[random_.Rand(0U, threads_.size() - 1)].get();
}
private:
webrtc::Random random_;
std::vector<std::unique_ptr<rtc::Thread>> threads_;
};
/**
* Represents single ping-pong test over UsrsctpTransport.
* User can specify target number of message for bidirectional
* send, underlying transport packets loss and average packet delay
* and SCTP delivery settings.
*/
class SctpPingPong final {
public:
SctpPingPong(uint32_t id,
uint16_t port1,
uint16_t port2,
rtc::Thread* transport_thread1,
rtc::Thread* transport_thread2,
uint32_t messages_count,
uint8_t packet_loss_percents,
uint16_t avg_send_delay_millis,
webrtc::SendDataParams send_params)
: id_(id),
port1_(port1),
port2_(port2),
transport_thread1_(transport_thread1),
transport_thread2_(transport_thread2),
messages_count_(messages_count),
packet_loss_percents_(packet_loss_percents),
avg_send_delay_millis_(avg_send_delay_millis),
send_params_(send_params) {
RTC_DCHECK(transport_thread1_ != nullptr);
RTC_DCHECK(transport_thread2_ != nullptr);
}
virtual ~SctpPingPong() {
transport_thread1_->Invoke<void>(RTC_FROM_HERE, [this] {
data_sender1_.reset();
sctp_transport1_->SetDtlsTransport(nullptr);
packet_transport1_->SetDestination(nullptr);
});
transport_thread2_->Invoke<void>(RTC_FROM_HERE, [this] {
data_sender2_.reset();
sctp_transport2_->SetDtlsTransport(nullptr);
packet_transport2_->SetDestination(nullptr);
});
transport_thread1_->Invoke<void>(RTC_FROM_HERE, [this] {
sctp_transport1_.reset();
data_receiver1_.reset();
packet_transport1_.reset();
});
transport_thread2_->Invoke<void>(RTC_FROM_HERE, [this] {
sctp_transport2_.reset();
data_receiver2_.reset();
packet_transport2_.reset();
});
}
SctpPingPong(const SctpPingPong&) = delete;
SctpPingPong& operator=(const SctpPingPong&) = delete;
bool Start() {
CreateTwoConnectedSctpTransportsWithAllStreams();
{
webrtc::MutexLock lock(&lock_);
if (!errors_list_.empty()) {
return false;
}
}
data_sender1_.reset(new SctpDataSender(transport_thread1_,
sctp_transport1_.get(),
messages_count_, send_params_, id_));
data_sender2_.reset(new SctpDataSender(transport_thread2_,
sctp_transport2_.get(),
messages_count_, send_params_, id_));
data_sender1_->Start();
data_sender2_->Start();
return true;
}
std::vector<std::string> GetErrorsList() const {
std::vector<std::string> result;
{
webrtc::MutexLock lock(&lock_);
result = errors_list_;
}
return result;
}
void WaitForCompletion(int32_t timeout_millis) {
if (data_sender1_ == nullptr) {
ReportError("SctpPingPong id = " + rtc::ToString(id_) +
", sender 1 is not created");
return;
}
if (data_sender2_ == nullptr) {
ReportError("SctpPingPong id = " + rtc::ToString(id_) +
", sender 2 is not created");
return;
}
if (!data_sender1_->WaitForCompletion(timeout_millis)) {
ReportError("SctpPingPong id = " + rtc::ToString(id_) +
", sender 1 failed to complete within " +
rtc::ToString(timeout_millis) + " millis");
return;
}
auto sender1_error = data_sender1_->GetLastError();
if (sender1_error.has_value()) {
ReportError("SctpPingPong id = " + rtc::ToString(id_) +
", sender 1 error: " + sender1_error.value());
return;
}
if (!data_sender2_->WaitForCompletion(timeout_millis)) {
ReportError("SctpPingPong id = " + rtc::ToString(id_) +
", sender 2 failed to complete within " +
rtc::ToString(timeout_millis) + " millis");
return;
}
auto sender2_error = data_sender2_->GetLastError();
if (sender2_error.has_value()) {
ReportError("SctpPingPong id = " + rtc::ToString(id_) +
", sender 2 error: " + sender1_error.value());
return;
}
if ((data_sender1_->MessagesSentCount() != messages_count_)) {
ReportError("SctpPingPong id = " + rtc::ToString(id_) +
", sender 1 sent only " +
rtc::ToString(data_sender1_->MessagesSentCount()) +
" out of " + rtc::ToString(messages_count_));
return;
}
if ((data_sender2_->MessagesSentCount() != messages_count_)) {
ReportError("SctpPingPong id = " + rtc::ToString(id_) +
", sender 2 sent only " +
rtc::ToString(data_sender2_->MessagesSentCount()) +
" out of " + rtc::ToString(messages_count_));
return;
}
if (!data_receiver1_->WaitForMessagesReceived(timeout_millis)) {
ReportError("SctpPingPong id = " + rtc::ToString(id_) +
", receiver 1 did not complete within " +
rtc::ToString(messages_count_));
return;
}
if (!data_receiver2_->WaitForMessagesReceived(timeout_millis)) {
ReportError("SctpPingPong id = " + rtc::ToString(id_) +
", receiver 2 did not complete within " +
rtc::ToString(messages_count_));
return;
}
if (data_receiver1_->BytesReceivedCount() !=
data_sender2_->BytesSentCount()) {
ReportError(
"SctpPingPong id = " + rtc::ToString(id_) + ", receiver 1 received " +
rtc::ToString(data_receiver1_->BytesReceivedCount()) +
" bytes, but sender 2 send " +
rtc::ToString(rtc::ToString(data_sender2_->BytesSentCount())));
return;
}
if (data_receiver2_->BytesReceivedCount() !=
data_sender1_->BytesSentCount()) {
ReportError(
"SctpPingPong id = " + rtc::ToString(id_) + ", receiver 2 received " +
rtc::ToString(data_receiver2_->BytesReceivedCount()) +
" bytes, but sender 1 send " +
rtc::ToString(rtc::ToString(data_sender1_->BytesSentCount())));
return;
}
RTC_LOG(LS_INFO) << "SctpPingPong id = " << id_ << " is done";
}
private:
void CreateTwoConnectedSctpTransportsWithAllStreams() {
transport_thread1_->Invoke<void>(RTC_FROM_HERE, [this] {
packet_transport1_.reset(new SimulatedPacketTransport(
"SctpPingPong id = " + rtc::ToString(id_) + ", packet transport 1",
transport_thread1_, packet_loss_percents_, avg_send_delay_millis_));
data_receiver1_.reset(new SctpDataReceiver(id_, messages_count_));
sctp_transport1_.reset(new cricket::UsrsctpTransport(
transport_thread1_, packet_transport1_.get()));
sctp_transport1_->set_debug_name_for_testing("sctp transport 1");
sctp_transport1_->SignalDataReceived.connect(
data_receiver1_.get(), &SctpDataReceiver::OnDataReceived);
for (uint32_t i = cricket::kMinSctpSid; i <= cricket::kMaxSctpSid; i++) {
if (!sctp_transport1_->OpenStream(i)) {
ReportError("SctpPingPong id = " + rtc::ToString(id_) +
", sctp transport 1 stream " + rtc::ToString(i) +
" failed to open");
break;
}
}
});
transport_thread2_->Invoke<void>(RTC_FROM_HERE, [this] {
packet_transport2_.reset(new SimulatedPacketTransport(
"SctpPingPong id = " + rtc::ToString(id_) + "packet transport 2",
transport_thread2_, packet_loss_percents_, avg_send_delay_millis_));
data_receiver2_.reset(new SctpDataReceiver(id_, messages_count_));
sctp_transport2_.reset(new cricket::UsrsctpTransport(
transport_thread2_, packet_transport2_.get()));
sctp_transport2_->set_debug_name_for_testing("sctp transport 2");
sctp_transport2_->SignalDataReceived.connect(
data_receiver2_.get(), &SctpDataReceiver::OnDataReceived);
for (uint32_t i = cricket::kMinSctpSid; i <= cricket::kMaxSctpSid; i++) {
if (!sctp_transport2_->OpenStream(i)) {
ReportError("SctpPingPong id = " + rtc::ToString(id_) +
", sctp transport 2 stream " + rtc::ToString(i) +
" failed to open");
break;
}
}
});
transport_thread1_->Invoke<void>(RTC_FROM_HERE, [this] {
packet_transport1_->SetDestination(packet_transport2_.get());
});
transport_thread2_->Invoke<void>(RTC_FROM_HERE, [this] {
packet_transport2_->SetDestination(packet_transport1_.get());
});
transport_thread1_->Invoke<void>(RTC_FROM_HERE, [this] {
if (!sctp_transport1_->Start(port1_, port2_,
cricket::kSctpSendBufferSize)) {
ReportError("SctpPingPong id = " + rtc::ToString(id_) +
", failed to start sctp transport 1");
}
});
transport_thread2_->Invoke<void>(RTC_FROM_HERE, [this] {
if (!sctp_transport2_->Start(port2_, port1_,
cricket::kSctpSendBufferSize)) {
ReportError("SctpPingPong id = " + rtc::ToString(id_) +
", failed to start sctp transport 2");
}
});
}
void ReportError(std::string error) {
webrtc::MutexLock lock(&lock_);
errors_list_.push_back(std::move(error));
}
std::unique_ptr<SimulatedPacketTransport> packet_transport1_;
std::unique_ptr<SimulatedPacketTransport> packet_transport2_;
std::unique_ptr<SctpDataReceiver> data_receiver1_;
std::unique_ptr<SctpDataReceiver> data_receiver2_;
std::unique_ptr<cricket::UsrsctpTransport> sctp_transport1_;
std::unique_ptr<cricket::UsrsctpTransport> sctp_transport2_;
std::unique_ptr<SctpDataSender> data_sender1_;
std::unique_ptr<SctpDataSender> data_sender2_;
mutable webrtc::Mutex lock_;
std::vector<std::string> errors_list_ RTC_GUARDED_BY(lock_);
const uint32_t id_;
const uint16_t port1_;
const uint16_t port2_;
rtc::Thread* const transport_thread1_;
rtc::Thread* const transport_thread2_;
const uint32_t messages_count_;
const uint8_t packet_loss_percents_;
const uint16_t avg_send_delay_millis_;
const webrtc::SendDataParams send_params_;
};
/**
* Helper function to calculate max number of milliseconds
* allowed for test to run based on test configuration.
*/
constexpr int32_t GetExecutionTimeLimitInMillis(uint32_t total_messages,
uint8_t packet_loss_percents) {
return std::min<int64_t>(
std::numeric_limits<int32_t>::max(),
std::max<int64_t>(
1LL * total_messages * 100 *
std::max(1, packet_loss_percents * packet_loss_percents),
kDefaultTimeout));
}
} // namespace
namespace cricket {
/**
* The set of tests intended to check usrsctp reliability on
* stress conditions: multiple sockets, concurrent access,
* lossy network link. It was observed in the past that
* usrsctp might misbehave in concurrent environment
* under load on lossy networks: deadlocks and memory corruption
* issues might happen in non-basic usage scenarios.
* It's recommended to run this test whenever usrsctp version
* used is updated to verify it properly works in stress
* conditions under higher than usual load.
* It is also recommended to enable ASAN when these tests
* are executed, so whenever memory bug is happen inside usrsctp,
* it will be easier to understand what went wrong with ASAN
* provided diagnostics information.
* The tests cases currently disabled by default due to
* long execution time and due to unresolved issue inside
* `usrsctp` library detected by try-bots with ThreadSanitizer.
*/
class UsrSctpReliabilityTest : public ::testing::Test {};
/**
* A simple test which send multiple messages over reliable
* connection, usefull to verify test infrastructure works.
* Execution time is less than 1 second.
*/
TEST_F(UsrSctpReliabilityTest,
DISABLED_AllMessagesAreDeliveredOverReliableConnection) {
auto thread1 = rtc::Thread::Create();
auto thread2 = rtc::Thread::Create();
thread1->Start();
thread2->Start();
constexpr uint8_t packet_loss_percents = 0;
constexpr uint16_t avg_send_delay_millis = 10;
constexpr uint32_t messages_count = 100;
constexpr int32_t wait_timeout =
GetExecutionTimeLimitInMillis(messages_count, packet_loss_percents);
static_assert(wait_timeout > 0,
"Timeout computation must produce positive value");
webrtc::SendDataParams send_params;
send_params.ordered = true;
SctpPingPong test(1, kTransport1Port, kTransport2Port, thread1.get(),
thread2.get(), messages_count, packet_loss_percents,
avg_send_delay_millis, send_params);
EXPECT_TRUE(test.Start()) << rtc::join(test.GetErrorsList(), ';');
test.WaitForCompletion(wait_timeout);
auto errors_list = test.GetErrorsList();
EXPECT_TRUE(errors_list.empty()) << rtc::join(errors_list, ';');
}
/**
* A test to verify that multiple messages can be reliably delivered
* over lossy network when usrsctp configured to guarantee reliably
* and in order delivery.
* The test case is disabled by default because it takes
* long time to run.
* Execution time is about 2.5 minutes.
*/
TEST_F(UsrSctpReliabilityTest,
DISABLED_AllMessagesAreDeliveredOverLossyConnectionReliableAndInOrder) {
auto thread1 = rtc::Thread::Create();
auto thread2 = rtc::Thread::Create();
thread1->Start();
thread2->Start();
constexpr uint8_t packet_loss_percents = 5;
constexpr uint16_t avg_send_delay_millis = 16;
constexpr uint32_t messages_count = 10000;
constexpr int32_t wait_timeout =
GetExecutionTimeLimitInMillis(messages_count, packet_loss_percents);
static_assert(wait_timeout > 0,
"Timeout computation must produce positive value");
webrtc::SendDataParams send_params;
send_params.ordered = true;
SctpPingPong test(1, kTransport1Port, kTransport2Port, thread1.get(),
thread2.get(), messages_count, packet_loss_percents,
avg_send_delay_millis, send_params);
EXPECT_TRUE(test.Start()) << rtc::join(test.GetErrorsList(), ';');
test.WaitForCompletion(wait_timeout);
auto errors_list = test.GetErrorsList();
EXPECT_TRUE(errors_list.empty()) << rtc::join(errors_list, ';');
}
/**
* A test to verify that multiple messages can be reliably delivered
* over lossy network when usrsctp configured to retransmit lost
* packets.
* The test case is disabled by default because it takes
* long time to run.
* Execution time is about 2.5 minutes.
*/
TEST_F(UsrSctpReliabilityTest,
DISABLED_AllMessagesAreDeliveredOverLossyConnectionWithRetries) {
auto thread1 = rtc::Thread::Create();
auto thread2 = rtc::Thread::Create();
thread1->Start();
thread2->Start();
constexpr uint8_t packet_loss_percents = 5;
constexpr uint16_t avg_send_delay_millis = 16;
constexpr uint32_t messages_count = 10000;
constexpr int32_t wait_timeout =
GetExecutionTimeLimitInMillis(messages_count, packet_loss_percents);
static_assert(wait_timeout > 0,
"Timeout computation must produce positive value");
webrtc::SendDataParams send_params;
send_params.ordered = false;
send_params.max_rtx_count = std::numeric_limits<uint16_t>::max();
send_params.max_rtx_ms = std::numeric_limits<uint16_t>::max();
SctpPingPong test(1, kTransport1Port, kTransport2Port, thread1.get(),
thread2.get(), messages_count, packet_loss_percents,
avg_send_delay_millis, send_params);
EXPECT_TRUE(test.Start()) << rtc::join(test.GetErrorsList(), ';');
test.WaitForCompletion(wait_timeout);
auto errors_list = test.GetErrorsList();
EXPECT_TRUE(errors_list.empty()) << rtc::join(errors_list, ';');
}
/**
* This is kind of reliability stress-test of usrsctp to verify
* that all messages are delivered when multiple usrsctp
* sockets used concurrently and underlying transport is lossy.
*
* It was observed in the past that in stress condtions usrsctp
* might encounter deadlock and memory corruption bugs:
* https://github.com/sctplab/usrsctp/issues/325
*
* It is recoomended to run this test whenever usrsctp version
* used by WebRTC is updated.
*
* The test case is disabled by default because it takes
* long time to run.
* Execution time of this test is about 1-2 hours.
*/
TEST_F(UsrSctpReliabilityTest,
DISABLED_AllMessagesAreDeliveredOverLossyConnectionConcurrentTests) {
ThreadPool pool(16);
webrtc::SendDataParams send_params;
send_params.ordered = true;
constexpr uint32_t base_sctp_port = 5000;
// The constants value below were experimentally chosen
// to have reasonable execution time and to reproduce
// particular deadlock issue inside usrsctp:
// https://github.com/sctplab/usrsctp/issues/325
// The constants values may be adjusted next time
// some other issue inside usrsctp need to be debugged.
constexpr uint32_t messages_count = 200;
constexpr uint8_t packet_loss_percents = 5;
constexpr uint16_t avg_send_delay_millis = 0;
constexpr uint32_t parallel_ping_pongs = 16 * 1024;
constexpr uint32_t total_ping_pong_tests = 16 * parallel_ping_pongs;
constexpr int32_t wait_timeout = GetExecutionTimeLimitInMillis(
total_ping_pong_tests * messages_count, packet_loss_percents);
static_assert(wait_timeout > 0,
"Timeout computation must produce positive value");
std::queue<std::unique_ptr<SctpPingPong>> tests;
for (uint32_t i = 0; i < total_ping_pong_tests; i++) {
uint32_t port1 =
base_sctp_port + (2 * i) % (UINT16_MAX - base_sctp_port - 1);
auto test = std::make_unique<SctpPingPong>(
i, port1, port1 + 1, pool.GetRandomThread(), pool.GetRandomThread(),
messages_count, packet_loss_percents, avg_send_delay_millis,
send_params);
EXPECT_TRUE(test->Start()) << rtc::join(test->GetErrorsList(), ';');
tests.emplace(std::move(test));
while (tests.size() >= parallel_ping_pongs) {
auto& oldest_test = tests.front();
oldest_test->WaitForCompletion(wait_timeout);
auto errors_list = oldest_test->GetErrorsList();
EXPECT_TRUE(errors_list.empty()) << rtc::join(errors_list, ';');
tests.pop();
}
}
while (!tests.empty()) {
auto& oldest_test = tests.front();
oldest_test->WaitForCompletion(wait_timeout);
auto errors_list = oldest_test->GetErrorsList();
EXPECT_TRUE(errors_list.empty()) << rtc::join(errors_list, ';');
tests.pop();
}
}
} // namespace cricket

View file

@ -1,883 +0,0 @@
/*
* Copyright (c) 2013 The WebRTC project authors. All Rights Reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#include "media/sctp/usrsctp_transport.h"
#include <stdio.h>
#include <string.h>
#include <usrsctp.h>
#include <memory>
#include <string>
#include <vector>
#include "absl/algorithm/container.h"
#include "media/sctp/sctp_transport_internal.h"
#include "p2p/base/fake_dtls_transport.h"
#include "rtc_base/copy_on_write_buffer.h"
#include "rtc_base/gunit.h"
#include "rtc_base/logging.h"
#include "rtc_base/thread.h"
#include "test/gtest.h"
namespace {
static const int kDefaultTimeout = 10000; // 10 seconds.
// Use ports other than the default 5000 for testing.
static const int kTransport1Port = 5001;
static const int kTransport2Port = 5002;
} // namespace
namespace cricket {
// This is essentially a buffer to hold received data. It stores only the last
// received data. Calling OnDataReceived twice overwrites old data with the
// newer one.
// TODO(ldixon): Implement constraints, and allow new data to be added to old
// instead of replacing it.
class SctpFakeDataReceiver : public sigslot::has_slots<> {
public:
SctpFakeDataReceiver() : received_(false) {}
void Clear() {
received_ = false;
last_data_ = "";
last_params_ = ReceiveDataParams();
num_messages_received_ = 0;
}
void OnDataReceived(const ReceiveDataParams& params,
const rtc::CopyOnWriteBuffer& data) {
num_messages_received_++;
received_ = true;
last_data_ = std::string(data.data<char>(), data.size());
last_params_ = params;
}
bool received() const { return received_; }
std::string last_data() const { return last_data_; }
ReceiveDataParams last_params() const { return last_params_; }
size_t num_messages_received() const { return num_messages_received_; }
private:
bool received_;
std::string last_data_;
size_t num_messages_received_ = 0;
ReceiveDataParams last_params_;
};
class SctpTransportObserver : public sigslot::has_slots<> {
public:
explicit SctpTransportObserver(UsrsctpTransport* transport) {
transport->SignalClosingProcedureComplete.connect(
this, &SctpTransportObserver::OnClosingProcedureComplete);
transport->SignalReadyToSendData.connect(
this, &SctpTransportObserver::OnReadyToSend);
}
int StreamCloseCount(int stream) {
return absl::c_count(closed_streams_, stream);
}
bool WasStreamClosed(int stream) {
return absl::c_linear_search(closed_streams_, stream);
}
bool ReadyToSend() { return ready_to_send_; }
private:
void OnClosingProcedureComplete(int stream) {
closed_streams_.push_back(stream);
}
void OnReadyToSend() { ready_to_send_ = true; }
std::vector<int> closed_streams_;
bool ready_to_send_ = false;
};
// Helper class used to immediately attempt to reopen a stream as soon as it's
// been closed.
class SignalTransportClosedReopener : public sigslot::has_slots<> {
public:
SignalTransportClosedReopener(UsrsctpTransport* transport,
UsrsctpTransport* peer)
: transport_(transport), peer_(peer) {}
int StreamCloseCount(int stream) { return absl::c_count(streams_, stream); }
private:
void OnStreamClosed(int stream) {
transport_->OpenStream(stream);
peer_->OpenStream(stream);
streams_.push_back(stream);
}
UsrsctpTransport* transport_;
UsrsctpTransport* peer_;
std::vector<int> streams_;
};
// SCTP Data Engine testing framework.
class SctpTransportTest : public ::testing::Test, public sigslot::has_slots<> {
protected:
// usrsctp uses the NSS random number generator on non-Android platforms,
// so we need to initialize SSL.
static void SetUpTestSuite() {}
void SetupConnectedTransportsWithTwoStreams() {
SetupConnectedTransportsWithTwoStreams(kTransport1Port, kTransport2Port);
}
void SetupConnectedTransportsWithTwoStreams(int port1, int port2) {
fake_dtls1_.reset(new FakeDtlsTransport("fake dtls 1", 0));
fake_dtls2_.reset(new FakeDtlsTransport("fake dtls 2", 0));
recv1_.reset(new SctpFakeDataReceiver());
recv2_.reset(new SctpFakeDataReceiver());
transport1_.reset(CreateTransport(fake_dtls1_.get(), recv1_.get()));
transport1_->set_debug_name_for_testing("transport1");
transport1_->SignalReadyToSendData.connect(
this, &SctpTransportTest::OnChan1ReadyToSend);
transport2_.reset(CreateTransport(fake_dtls2_.get(), recv2_.get()));
transport2_->set_debug_name_for_testing("transport2");
transport2_->SignalReadyToSendData.connect(
this, &SctpTransportTest::OnChan2ReadyToSend);
// Setup two connected transports ready to send and receive.
bool asymmetric = false;
fake_dtls1_->SetDestination(fake_dtls2_.get(), asymmetric);
RTC_LOG(LS_VERBOSE) << "Transport setup ----------------------------- ";
AddStream(1);
AddStream(2);
RTC_LOG(LS_VERBOSE)
<< "Connect the transports -----------------------------";
// Both transports need to have started (with matching ports) for an
// association to be formed.
transport1_->Start(port1, port2, kSctpSendBufferSize);
transport2_->Start(port2, port1, kSctpSendBufferSize);
}
bool AddStream(int sid) {
bool ret = true;
ret = ret && transport1_->OpenStream(sid);
ret = ret && transport2_->OpenStream(sid);
return ret;
}
UsrsctpTransport* CreateTransport(FakeDtlsTransport* fake_dtls,
SctpFakeDataReceiver* recv) {
UsrsctpTransport* transport =
new UsrsctpTransport(rtc::Thread::Current(), fake_dtls);
// When data is received, pass it to the SctpFakeDataReceiver.
transport->SignalDataReceived.connect(
recv, &SctpFakeDataReceiver::OnDataReceived);
return transport;
}
bool SendData(UsrsctpTransport* chan,
int sid,
const std::string& msg,
SendDataResult* result,
bool ordered = false) {
webrtc::SendDataParams params;
params.ordered = ordered;
return chan->SendData(
sid, params, rtc::CopyOnWriteBuffer(&msg[0], msg.length()), result);
}
bool ReceivedData(const SctpFakeDataReceiver* recv,
int sid,
const std::string& msg) {
return (recv->received() && recv->last_params().sid == sid &&
recv->last_data() == msg);
}
bool ProcessMessagesUntilIdle() {
rtc::Thread* thread = rtc::Thread::Current();
while (!thread->empty()) {
rtc::Message msg;
if (thread->Get(&msg, rtc::Thread::kForever)) {
thread->Dispatch(&msg);
}
}
return !thread->IsQuitting();
}
UsrsctpTransport* transport1() { return transport1_.get(); }
UsrsctpTransport* transport2() { return transport2_.get(); }
SctpFakeDataReceiver* receiver1() { return recv1_.get(); }
SctpFakeDataReceiver* receiver2() { return recv2_.get(); }
FakeDtlsTransport* fake_dtls1() { return fake_dtls1_.get(); }
FakeDtlsTransport* fake_dtls2() { return fake_dtls2_.get(); }
int transport1_ready_to_send_count() {
return transport1_ready_to_send_count_;
}
int transport2_ready_to_send_count() {
return transport2_ready_to_send_count_;
}
private:
std::unique_ptr<FakeDtlsTransport> fake_dtls1_;
std::unique_ptr<FakeDtlsTransport> fake_dtls2_;
std::unique_ptr<SctpFakeDataReceiver> recv1_;
std::unique_ptr<SctpFakeDataReceiver> recv2_;
std::unique_ptr<UsrsctpTransport> transport1_;
std::unique_ptr<UsrsctpTransport> transport2_;
int transport1_ready_to_send_count_ = 0;
int transport2_ready_to_send_count_ = 0;
void OnChan1ReadyToSend() { ++transport1_ready_to_send_count_; }
void OnChan2ReadyToSend() { ++transport2_ready_to_send_count_; }
};
TEST_F(SctpTransportTest, MessageInterleavedWithNotification) {
FakeDtlsTransport fake_dtls1("fake dtls 1", 0);
FakeDtlsTransport fake_dtls2("fake dtls 2", 0);
SctpFakeDataReceiver recv1;
SctpFakeDataReceiver recv2;
std::unique_ptr<UsrsctpTransport> transport1(
CreateTransport(&fake_dtls1, &recv1));
std::unique_ptr<UsrsctpTransport> transport2(
CreateTransport(&fake_dtls2, &recv2));
// Add a stream.
transport1->OpenStream(1);
transport2->OpenStream(1);
// Start SCTP transports.
transport1->Start(kSctpDefaultPort, kSctpDefaultPort, kSctpSendBufferSize);
transport2->Start(kSctpDefaultPort, kSctpDefaultPort, kSctpSendBufferSize);
// Connect the two fake DTLS transports.
fake_dtls1.SetDestination(&fake_dtls2, false);
// Ensure the SCTP association has been established
// Note: I'd rather watch for an assoc established state here but couldn't
// find any exposed...
SendDataResult result;
ASSERT_TRUE(SendData(transport2.get(), 1, "meow", &result));
EXPECT_TRUE_WAIT(ReceivedData(&recv1, 1, "meow"), kDefaultTimeout);
// Detach the DTLS transport to ensure only we will inject packets from here
// on.
transport1->SetDtlsTransport(nullptr);
// Prepare chunk buffer and metadata
auto chunk = rtc::CopyOnWriteBuffer(32);
struct sctp_rcvinfo meta = {0};
meta.rcv_sid = 1;
meta.rcv_ssn = 1337;
meta.rcv_ppid = rtc::HostToNetwork32(51); // text (complete)
// Inject chunk 1/2.
meta.rcv_tsn = 42;
meta.rcv_cumtsn = 42;
chunk.SetData("meow?", 5);
transport1->InjectDataOrNotificationFromSctpForTesting(chunk.data(),
chunk.size(), meta, 0);
// Inject a notification in between chunks.
union sctp_notification notification;
memset(&notification, 0, sizeof(notification));
// Type chosen since it's not handled apart from being logged
notification.sn_header.sn_type = SCTP_PEER_ADDR_CHANGE;
notification.sn_header.sn_flags = 0;
notification.sn_header.sn_length = sizeof(notification);
transport1->InjectDataOrNotificationFromSctpForTesting(
&notification, sizeof(notification), {0}, MSG_NOTIFICATION);
// Inject chunk 2/2
meta.rcv_tsn = 42;
meta.rcv_cumtsn = 43;
chunk.SetData(" rawr!", 6);
transport1->InjectDataOrNotificationFromSctpForTesting(
chunk.data(), chunk.size(), meta, MSG_EOR);
// Expect the message to contain both chunks.
EXPECT_TRUE_WAIT(ReceivedData(&recv1, 1, "meow? rawr!"), kDefaultTimeout);
}
// Test that data can be sent end-to-end when an SCTP transport starts with one
// transport (which is unwritable), and then switches to another transport. A
// common scenario due to how BUNDLE works.
TEST_F(SctpTransportTest, SwitchDtlsTransport) {
FakeDtlsTransport black_hole("black hole", 0);
FakeDtlsTransport fake_dtls1("fake dtls 1", 0);
FakeDtlsTransport fake_dtls2("fake dtls 2", 0);
SctpFakeDataReceiver recv1;
SctpFakeDataReceiver recv2;
// Construct transport1 with the "black hole" transport.
std::unique_ptr<UsrsctpTransport> transport1(
CreateTransport(&black_hole, &recv1));
std::unique_ptr<UsrsctpTransport> transport2(
CreateTransport(&fake_dtls2, &recv2));
// Add a stream.
transport1->OpenStream(1);
transport2->OpenStream(1);
// Tell them both to start (though transport1_ is connected to black_hole).
transport1->Start(kTransport1Port, kTransport2Port, kSctpSendBufferSize);
transport2->Start(kTransport2Port, kTransport1Port, kSctpSendBufferSize);
// Switch transport1_ to the normal fake_dtls1_ transport.
transport1->SetDtlsTransport(&fake_dtls1);
// Connect the two fake DTLS transports.
bool asymmetric = false;
fake_dtls1.SetDestination(&fake_dtls2, asymmetric);
// Make sure we end up able to send data.
SendDataResult result;
ASSERT_TRUE(SendData(transport1.get(), 1, "foo", &result));
ASSERT_TRUE(SendData(transport2.get(), 1, "bar", &result));
EXPECT_TRUE_WAIT(ReceivedData(&recv2, 1, "foo"), kDefaultTimeout);
EXPECT_TRUE_WAIT(ReceivedData(&recv1, 1, "bar"), kDefaultTimeout);
// Setting a null DtlsTransport should work. This could happen when an SCTP
// data section is rejected.
transport1->SetDtlsTransport(nullptr);
}
// Calling Start twice shouldn't do anything bad, if with the same parameters.
TEST_F(SctpTransportTest, DuplicateStartCallsIgnored) {
SetupConnectedTransportsWithTwoStreams();
EXPECT_TRUE(transport1()->Start(kTransport1Port, kTransport2Port,
kSctpSendBufferSize));
// Make sure we can still send/recv data.
SendDataResult result;
ASSERT_TRUE(SendData(transport1(), 1, "foo", &result));
ASSERT_TRUE(SendData(transport2(), 1, "bar", &result));
EXPECT_TRUE_WAIT(ReceivedData(receiver2(), 1, "foo"), kDefaultTimeout);
EXPECT_TRUE_WAIT(ReceivedData(receiver1(), 1, "bar"), kDefaultTimeout);
}
// Calling Start a second time with a different port should fail.
TEST_F(SctpTransportTest, CallingStartWithDifferentPortFails) {
SetupConnectedTransportsWithTwoStreams();
EXPECT_FALSE(transport1()->Start(kTransport1Port, 1234, kSctpSendBufferSize));
EXPECT_FALSE(transport1()->Start(1234, kTransport2Port, kSctpSendBufferSize));
}
// A value of -1 for the local/remote port should be treated as the default
// (5000).
TEST_F(SctpTransportTest, NegativeOnePortTreatedAsDefault) {
FakeDtlsTransport fake_dtls1("fake dtls 1", 0);
FakeDtlsTransport fake_dtls2("fake dtls 2", 0);
SctpFakeDataReceiver recv1;
SctpFakeDataReceiver recv2;
std::unique_ptr<UsrsctpTransport> transport1(
CreateTransport(&fake_dtls1, &recv1));
std::unique_ptr<UsrsctpTransport> transport2(
CreateTransport(&fake_dtls2, &recv2));
// Add a stream.
transport1->OpenStream(1);
transport2->OpenStream(1);
// Tell them both to start, giving one transport the default port and the
// other transport -1.
transport1->Start(kSctpDefaultPort, kSctpDefaultPort, kSctpSendBufferSize);
transport2->Start(-1, -1, kSctpSendBufferSize);
// Connect the two fake DTLS transports.
bool asymmetric = false;
fake_dtls1.SetDestination(&fake_dtls2, asymmetric);
// Make sure we end up able to send data.
SendDataResult result;
ASSERT_TRUE(SendData(transport1.get(), 1, "foo", &result));
ASSERT_TRUE(SendData(transport2.get(), 1, "bar", &result));
EXPECT_TRUE_WAIT(ReceivedData(&recv2, 1, "foo"), kDefaultTimeout);
EXPECT_TRUE_WAIT(ReceivedData(&recv1, 1, "bar"), kDefaultTimeout);
}
TEST_F(SctpTransportTest, OpenStreamWithAlreadyOpenedStreamFails) {
FakeDtlsTransport fake_dtls("fake dtls", 0);
SctpFakeDataReceiver recv;
std::unique_ptr<UsrsctpTransport> transport(
CreateTransport(&fake_dtls, &recv));
EXPECT_TRUE(transport->OpenStream(1));
EXPECT_FALSE(transport->OpenStream(1));
}
TEST_F(SctpTransportTest, ResetStreamWithAlreadyResetStreamFails) {
FakeDtlsTransport fake_dtls("fake dtls", 0);
SctpFakeDataReceiver recv;
std::unique_ptr<UsrsctpTransport> transport(
CreateTransport(&fake_dtls, &recv));
EXPECT_TRUE(transport->OpenStream(1));
EXPECT_TRUE(transport->ResetStream(1));
EXPECT_FALSE(transport->ResetStream(1));
}
// Test that SignalReadyToSendData is fired after Start has been called and the
// DTLS transport is writable.
TEST_F(SctpTransportTest, SignalReadyToSendDataAfterDtlsWritable) {
FakeDtlsTransport fake_dtls("fake dtls", 0);
SctpFakeDataReceiver recv;
std::unique_ptr<UsrsctpTransport> transport(
CreateTransport(&fake_dtls, &recv));
SctpTransportObserver observer(transport.get());
transport->Start(kSctpDefaultPort, kSctpDefaultPort, kSctpSendBufferSize);
fake_dtls.SetWritable(true);
EXPECT_TRUE_WAIT(observer.ReadyToSend(), kDefaultTimeout);
}
// Run the below tests using both ordered and unordered mode.
class SctpTransportTestWithOrdered
: public SctpTransportTest,
public ::testing::WithParamInterface<bool> {};
// Tests that a small message gets buffered and later sent by the
// UsrsctpTransport when the sctp library only accepts the message partially.
TEST_P(SctpTransportTestWithOrdered, SendSmallBufferedOutgoingMessage) {
bool ordered = GetParam();
SetupConnectedTransportsWithTwoStreams();
// Wait for initial SCTP association to be formed.
EXPECT_EQ_WAIT(1, transport1_ready_to_send_count(), kDefaultTimeout);
// Make the fake transport unwritable so that messages pile up for the SCTP
// socket.
fake_dtls1()->SetWritable(false);
SendDataResult result;
// Fill almost all of sctp library's send buffer.
ASSERT_TRUE(SendData(transport1(), /*sid=*/1,
std::string(kSctpSendBufferSize - 1, 'a'), &result,
ordered));
std::string buffered_message("hello hello");
// UsrsctpTransport accepts this message by buffering part of it.
ASSERT_TRUE(
SendData(transport1(), /*sid=*/1, buffered_message, &result, ordered));
ASSERT_TRUE(transport1()->ReadyToSendData());
// Sending anything else should block now.
ASSERT_FALSE(
SendData(transport1(), /*sid=*/1, "hello again", &result, ordered));
ASSERT_EQ(SDR_BLOCK, result);
ASSERT_FALSE(transport1()->ReadyToSendData());
// Make sure the ready-to-send count hasn't changed.
EXPECT_EQ(1, transport1_ready_to_send_count());
// Make the transport writable again and expect a "SignalReadyToSendData" at
// some point after sending the buffered message.
fake_dtls1()->SetWritable(true);
EXPECT_EQ_WAIT(2, transport1_ready_to_send_count(), kDefaultTimeout);
EXPECT_TRUE_WAIT(ReceivedData(receiver2(), 1, buffered_message),
kDefaultTimeout);
EXPECT_EQ(2u, receiver2()->num_messages_received());
}
// Tests that a large message gets buffered and later sent by the
// UsrsctpTransport when the sctp library only accepts the message partially.
TEST_P(SctpTransportTestWithOrdered, SendLargeBufferedOutgoingMessage) {
bool ordered = GetParam();
SetupConnectedTransportsWithTwoStreams();
// Wait for initial SCTP association to be formed.
EXPECT_EQ_WAIT(1, transport1_ready_to_send_count(), kDefaultTimeout);
// Make the fake transport unwritable so that messages pile up for the SCTP
// socket.
fake_dtls1()->SetWritable(false);
SendDataResult result;
// Fill almost all of sctp library's send buffer.
ASSERT_TRUE(SendData(transport1(), /*sid=*/1,
std::string(kSctpSendBufferSize / 2, 'a'), &result,
ordered));
std::string buffered_message(kSctpSendBufferSize, 'b');
// UsrsctpTransport accepts this message by buffering the second half.
ASSERT_TRUE(
SendData(transport1(), /*sid=*/1, buffered_message, &result, ordered));
ASSERT_TRUE(transport1()->ReadyToSendData());
// Sending anything else should block now.
ASSERT_FALSE(
SendData(transport1(), /*sid=*/1, "hello again", &result, ordered));
ASSERT_EQ(SDR_BLOCK, result);
ASSERT_FALSE(transport1()->ReadyToSendData());
// Make sure the ready-to-send count hasn't changed.
EXPECT_EQ(1, transport1_ready_to_send_count());
// Make the transport writable again and expect a "SignalReadyToSendData" at
// some point.
fake_dtls1()->SetWritable(true);
EXPECT_EQ_WAIT(2, transport1_ready_to_send_count(), kDefaultTimeout);
EXPECT_TRUE_WAIT(ReceivedData(receiver2(), 1, buffered_message),
kDefaultTimeout);
EXPECT_EQ(2u, receiver2()->num_messages_received());
}
// Tests that a large message gets buffered and later sent by the
// UsrsctpTransport when the sctp library only accepts the message partially
// during a stream reset.
TEST_P(SctpTransportTestWithOrdered,
SendLargeBufferedOutgoingMessageDuringReset) {
bool ordered = GetParam();
SetupConnectedTransportsWithTwoStreams();
SctpTransportObserver transport2_observer(transport2());
// Wait for initial SCTP association to be formed.
EXPECT_EQ_WAIT(1, transport1_ready_to_send_count(), kDefaultTimeout);
// Make the fake transport unwritable so that messages pile up for the SCTP
// socket.
fake_dtls1()->SetWritable(false);
SendDataResult result;
// Fill almost all of sctp library's send buffer.
ASSERT_TRUE(SendData(transport1(), /*sid=*/1,
std::string(kSctpSendBufferSize / 2, 'a'), &result,
ordered));
std::string buffered_message(kSctpSendBufferSize, 'b');
// UsrsctpTransport accepts this message by buffering the second half.
ASSERT_TRUE(
SendData(transport1(), /*sid=*/1, buffered_message, &result, ordered));
// Queue a stream reset
transport1()->ResetStream(/*sid=*/1);
// Make the transport writable again and expect a "SignalReadyToSendData" at
// some point after sending the buffered message.
fake_dtls1()->SetWritable(true);
EXPECT_EQ_WAIT(2, transport1_ready_to_send_count(), kDefaultTimeout);
// Queued message should be received by the receiver before receiving the
// reset
EXPECT_TRUE_WAIT(ReceivedData(receiver2(), 1, buffered_message),
kDefaultTimeout);
EXPECT_EQ(2u, receiver2()->num_messages_received());
EXPECT_TRUE_WAIT(transport2_observer.WasStreamClosed(1), kDefaultTimeout);
}
TEST_P(SctpTransportTestWithOrdered, SendData) {
bool ordered = GetParam();
SetupConnectedTransportsWithTwoStreams();
SendDataResult result;
RTC_LOG(LS_VERBOSE)
<< "transport1 sending: 'hello?' -----------------------------";
ASSERT_TRUE(SendData(transport1(), 1, "hello?", &result, ordered));
EXPECT_EQ(SDR_SUCCESS, result);
EXPECT_TRUE_WAIT(ReceivedData(receiver2(), 1, "hello?"), kDefaultTimeout);
RTC_LOG(LS_VERBOSE) << "recv2.received=" << receiver2()->received()
<< ", recv2.last_params.sid="
<< receiver2()->last_params().sid
<< ", recv2.last_params.seq_num="
<< receiver2()->last_params().seq_num
<< ", recv2.last_data=" << receiver2()->last_data();
RTC_LOG(LS_VERBOSE)
<< "transport2 sending: 'hi transport1' -----------------------------";
ASSERT_TRUE(SendData(transport2(), 2, "hi transport1", &result, ordered));
EXPECT_EQ(SDR_SUCCESS, result);
EXPECT_TRUE_WAIT(ReceivedData(receiver1(), 2, "hi transport1"),
kDefaultTimeout);
RTC_LOG(LS_VERBOSE) << "recv1.received=" << receiver1()->received()
<< ", recv1.last_params.sid="
<< receiver1()->last_params().sid
<< ", recv1.last_params.seq_num="
<< receiver1()->last_params().seq_num
<< ", recv1.last_data=" << receiver1()->last_data();
}
// Sends a lot of large messages at once and verifies SDR_BLOCK is returned.
TEST_P(SctpTransportTestWithOrdered, SendDataBlocked) {
SetupConnectedTransportsWithTwoStreams();
SendDataResult result;
webrtc::SendDataParams params;
params.ordered = GetParam();
std::vector<char> buffer(1024 * 64, 0);
for (size_t i = 0; i < 100; ++i) {
transport1()->SendData(
1, params, rtc::CopyOnWriteBuffer(&buffer[0], buffer.size()), &result);
if (result == SDR_BLOCK)
break;
}
EXPECT_EQ(SDR_BLOCK, result);
}
// Test that after an SCTP socket's buffer is filled, SignalReadyToSendData
// is fired after it begins to be drained.
TEST_P(SctpTransportTestWithOrdered, SignalReadyToSendDataAfterBlocked) {
SetupConnectedTransportsWithTwoStreams();
// Wait for initial SCTP association to be formed.
EXPECT_EQ_WAIT(1, transport1_ready_to_send_count(), kDefaultTimeout);
// Make the fake transport unwritable so that messages pile up for the SCTP
// socket.
fake_dtls1()->SetWritable(false);
// Send messages until we get EWOULDBLOCK.
static const size_t kMaxMessages = 1024;
webrtc::SendDataParams params;
params.ordered = GetParam();
rtc::CopyOnWriteBuffer buf(1024);
memset(buf.MutableData(), 0, 1024);
SendDataResult result;
size_t message_count = 0;
for (; message_count < kMaxMessages; ++message_count) {
if (!transport1()->SendData(1, params, buf, &result) &&
result == SDR_BLOCK) {
break;
}
}
ASSERT_NE(kMaxMessages, message_count)
<< "Sent max number of messages without getting SDR_BLOCK?";
// Make sure the ready-to-send count hasn't changed.
EXPECT_EQ(1, transport1_ready_to_send_count());
// Make the transport writable again and expect a "SignalReadyToSendData" at
// some point.
fake_dtls1()->SetWritable(true);
EXPECT_EQ_WAIT(2, transport1_ready_to_send_count(), kDefaultTimeout);
EXPECT_EQ_WAIT(message_count, receiver2()->num_messages_received(),
kDefaultTimeout);
}
INSTANTIATE_TEST_SUITE_P(SctpTransportTest,
SctpTransportTestWithOrdered,
::testing::Bool());
// This is a regression test that fails with earlier versions of SCTP in
// unordered mode. See bugs.webrtc.org/10939.
TEST_F(SctpTransportTest, SendsLargeDataBufferedBySctpLib) {
SetupConnectedTransportsWithTwoStreams();
// Wait for initial SCTP association to be formed.
EXPECT_EQ_WAIT(1, transport1_ready_to_send_count(), kDefaultTimeout);
// Make the fake transport unwritable so that messages pile up for the SCTP
// socket.
fake_dtls1()->SetWritable(false);
SendDataResult result;
std::string buffered_message(kSctpSendBufferSize - 1, 'a');
ASSERT_TRUE(SendData(transport1(), 1, buffered_message, &result, false));
fake_dtls1()->SetWritable(true);
EXPECT_EQ_WAIT(1, transport1_ready_to_send_count(), kDefaultTimeout);
EXPECT_TRUE_WAIT(ReceivedData(receiver2(), 1, buffered_message),
kDefaultTimeout);
}
// Trying to send data for a nonexistent stream should fail.
TEST_F(SctpTransportTest, SendDataWithNonexistentStreamFails) {
SetupConnectedTransportsWithTwoStreams();
SendDataResult result;
EXPECT_FALSE(SendData(transport2(), 123, "some data", &result));
EXPECT_EQ(SDR_ERROR, result);
}
TEST_F(SctpTransportTest, SendDataHighPorts) {
SetupConnectedTransportsWithTwoStreams(32768, 32769);
SendDataResult result;
ASSERT_TRUE(SendData(transport1(), 1, "hello?", &result));
EXPECT_EQ(SDR_SUCCESS, result);
EXPECT_TRUE_WAIT(ReceivedData(receiver2(), 1, "hello?"), kDefaultTimeout);
ASSERT_TRUE(SendData(transport2(), 2, "hi transport1", &result));
EXPECT_EQ(SDR_SUCCESS, result);
EXPECT_TRUE_WAIT(ReceivedData(receiver1(), 2, "hi transport1"),
kDefaultTimeout);
}
TEST_F(SctpTransportTest, ClosesRemoteStream) {
SetupConnectedTransportsWithTwoStreams();
SctpTransportObserver transport1_observer(transport1());
SctpTransportObserver transport2_observer(transport2());
SendDataResult result;
ASSERT_TRUE(SendData(transport1(), 1, "hello?", &result));
EXPECT_EQ(SDR_SUCCESS, result);
EXPECT_TRUE_WAIT(ReceivedData(receiver2(), 1, "hello?"), kDefaultTimeout);
ASSERT_TRUE(SendData(transport2(), 2, "hi transport1", &result));
EXPECT_EQ(SDR_SUCCESS, result);
EXPECT_TRUE_WAIT(ReceivedData(receiver1(), 2, "hi transport1"),
kDefaultTimeout);
// Close stream 1 on transport 1. Transport 2 should notify us.
transport1()->ResetStream(1);
EXPECT_TRUE_WAIT(transport2_observer.WasStreamClosed(1), kDefaultTimeout);
}
TEST_F(SctpTransportTest, ClosesRemoteStreamWithNoData) {
SetupConnectedTransportsWithTwoStreams();
SctpTransportObserver transport1_observer(transport1());
SctpTransportObserver transport2_observer(transport2());
// Close stream 1 on transport 1. Transport 2 should notify us.
transport1()->ResetStream(1);
EXPECT_TRUE_WAIT(transport2_observer.WasStreamClosed(1), kDefaultTimeout);
}
TEST_F(SctpTransportTest, ClosesTwoRemoteStreams) {
SetupConnectedTransportsWithTwoStreams();
AddStream(3);
SctpTransportObserver transport1_observer(transport1());
SctpTransportObserver transport2_observer(transport2());
SendDataResult result;
ASSERT_TRUE(SendData(transport1(), 1, "hello?", &result));
EXPECT_EQ(SDR_SUCCESS, result);
EXPECT_TRUE_WAIT(ReceivedData(receiver2(), 1, "hello?"), kDefaultTimeout);
ASSERT_TRUE(SendData(transport2(), 2, "hi transport1", &result));
EXPECT_EQ(SDR_SUCCESS, result);
EXPECT_TRUE_WAIT(ReceivedData(receiver1(), 2, "hi transport1"),
kDefaultTimeout);
// Close two streams on one side.
transport2()->ResetStream(2);
transport2()->ResetStream(3);
EXPECT_TRUE_WAIT(transport2_observer.WasStreamClosed(2), kDefaultTimeout);
EXPECT_TRUE_WAIT(transport2_observer.WasStreamClosed(3), kDefaultTimeout);
}
TEST_F(SctpTransportTest, ClosesStreamsOnBothSides) {
SetupConnectedTransportsWithTwoStreams();
AddStream(3);
AddStream(4);
SctpTransportObserver transport1_observer(transport1());
SctpTransportObserver transport2_observer(transport2());
SendDataResult result;
ASSERT_TRUE(SendData(transport1(), 1, "hello?", &result));
EXPECT_EQ(SDR_SUCCESS, result);
EXPECT_TRUE_WAIT(ReceivedData(receiver2(), 1, "hello?"), kDefaultTimeout);
ASSERT_TRUE(SendData(transport2(), 2, "hi transport1", &result));
EXPECT_EQ(SDR_SUCCESS, result);
EXPECT_TRUE_WAIT(ReceivedData(receiver1(), 2, "hi transport1"),
kDefaultTimeout);
// Close one stream on transport1(), while closing three streams on
// transport2(). They will conflict (only one side can close anything at a
// time, apparently). Test the resolution of the conflict.
transport1()->ResetStream(1);
transport2()->ResetStream(2);
transport2()->ResetStream(3);
transport2()->ResetStream(4);
EXPECT_TRUE_WAIT(transport2_observer.WasStreamClosed(1), kDefaultTimeout);
EXPECT_TRUE_WAIT(transport1_observer.WasStreamClosed(2), kDefaultTimeout);
EXPECT_TRUE_WAIT(transport1_observer.WasStreamClosed(3), kDefaultTimeout);
EXPECT_TRUE_WAIT(transport1_observer.WasStreamClosed(4), kDefaultTimeout);
}
TEST_F(SctpTransportTest, RefusesHighNumberedTransports) {
SetupConnectedTransportsWithTwoStreams();
EXPECT_TRUE(AddStream(kMaxSctpSid));
EXPECT_FALSE(AddStream(kMaxSctpSid + 1));
}
TEST_F(SctpTransportTest, ReusesAStream) {
// Shut down transport 1, then open it up again for reuse.
SetupConnectedTransportsWithTwoStreams();
SendDataResult result;
SctpTransportObserver transport2_observer(transport2());
ASSERT_TRUE(SendData(transport1(), 1, "hello?", &result));
EXPECT_EQ(SDR_SUCCESS, result);
EXPECT_TRUE_WAIT(ReceivedData(receiver2(), 1, "hello?"), kDefaultTimeout);
transport1()->ResetStream(1);
EXPECT_TRUE_WAIT(transport2_observer.WasStreamClosed(1), kDefaultTimeout);
// Transport 1 is gone now.
// Create a new transport 1.
AddStream(1);
ASSERT_TRUE(SendData(transport1(), 1, "hi?", &result));
EXPECT_EQ(SDR_SUCCESS, result);
EXPECT_TRUE_WAIT(ReceivedData(receiver2(), 1, "hi?"), kDefaultTimeout);
transport1()->ResetStream(1);
EXPECT_EQ_WAIT(2, transport2_observer.StreamCloseCount(1), kDefaultTimeout);
}
TEST_F(SctpTransportTest, RejectsTooLargeMessageSize) {
FakeDtlsTransport fake_dtls("fake dtls", 0);
SctpFakeDataReceiver recv;
std::unique_ptr<UsrsctpTransport> transport(
CreateTransport(&fake_dtls, &recv));
EXPECT_FALSE(transport->Start(kSctpDefaultPort, kSctpDefaultPort,
kSctpSendBufferSize + 1));
}
TEST_F(SctpTransportTest, RejectsTooSmallMessageSize) {
FakeDtlsTransport fake_dtls("fake dtls", 0);
SctpFakeDataReceiver recv;
std::unique_ptr<UsrsctpTransport> transport(
CreateTransport(&fake_dtls, &recv));
EXPECT_FALSE(transport->Start(kSctpDefaultPort, kSctpDefaultPort, 0));
}
TEST_F(SctpTransportTest, RejectsSendTooLargeMessages) {
SetupConnectedTransportsWithTwoStreams();
// Use "Start" to reduce the max message size
transport1()->Start(kTransport1Port, kTransport2Port, 10);
EXPECT_EQ(10, transport1()->max_message_size());
const char eleven_characters[] = "12345678901";
SendDataResult result;
EXPECT_FALSE(SendData(transport1(), 1, eleven_characters, &result));
}
// Regression test for: crbug.com/1137936
TEST_F(SctpTransportTest, SctpRestartWithPendingDataDoesNotDeadlock) {
// In order to trigger a restart, we'll connect two transports, then
// disconnect them and connect the first to a third, which will initiate the
// new handshake.
FakeDtlsTransport fake_dtls1("fake dtls 1", 0);
FakeDtlsTransport fake_dtls2("fake dtls 2", 0);
FakeDtlsTransport fake_dtls3("fake dtls 3", 0);
SctpFakeDataReceiver recv1;
SctpFakeDataReceiver recv2;
SctpFakeDataReceiver recv3;
std::unique_ptr<UsrsctpTransport> transport1(
CreateTransport(&fake_dtls1, &recv1));
std::unique_ptr<UsrsctpTransport> transport2(
CreateTransport(&fake_dtls2, &recv2));
std::unique_ptr<UsrsctpTransport> transport3(
CreateTransport(&fake_dtls3, &recv3));
SctpTransportObserver observer(transport1.get());
// Connect the first two transports.
fake_dtls1.SetDestination(&fake_dtls2, /*asymmetric=*/false);
transport1->OpenStream(1);
transport2->OpenStream(1);
transport1->Start(5000, 5000, kSctpSendBufferSize);
transport2->Start(5000, 5000, kSctpSendBufferSize);
// Sanity check that we can send data.
SendDataResult result;
ASSERT_TRUE(SendData(transport1.get(), 1, "foo", &result));
ASSERT_TRUE_WAIT(ReceivedData(&recv2, 1, "foo"), kDefaultTimeout);
// Disconnect the transports and attempt to send a message, which will be
// stored in an output queue; this is necessary to reproduce the bug.
fake_dtls1.SetDestination(nullptr, /*asymmetric=*/false);
EXPECT_TRUE(SendData(transport1.get(), 1, "bar", &result));
// Now connect to the third transport.
fake_dtls1.SetDestination(&fake_dtls3, /*asymmetric=*/false);
transport3->OpenStream(1);
transport3->Start(5000, 5000, kSctpSendBufferSize);
// Send data from the new endpoint to the original endpoint. If data is
// received that means the restart must have been successful.
EXPECT_TRUE(SendData(transport3.get(), 1, "baz", &result));
EXPECT_TRUE_WAIT(ReceivedData(&recv1, 1, "baz"), kDefaultTimeout);
}
} // namespace cricket

View file

@ -79,8 +79,7 @@ std::unique_ptr<SctpTransportFactoryInterface> MaybeCreateSctpFactory(
return factory;
}
#ifdef WEBRTC_HAVE_SCTP
return std::make_unique<cricket::SctpTransportFactory>(network_thread,
field_trials);
return std::make_unique<cricket::SctpTransportFactory>(network_thread);
#else
return nullptr;
#endif

View file

@ -56,13 +56,12 @@ namespace {
#define DISABLED_ON_ANDROID(t) t
#endif
class DataChannelIntegrationTest : public PeerConnectionIntegrationBaseTest,
public ::testing::WithParamInterface<
std::tuple<SdpSemantics, std::string>> {
class DataChannelIntegrationTest
: public PeerConnectionIntegrationBaseTest,
public ::testing::WithParamInterface<SdpSemantics> {
protected:
DataChannelIntegrationTest()
: PeerConnectionIntegrationBaseTest(std::get<0>(GetParam()),
std::get<1>(GetParam())) {}
: PeerConnectionIntegrationBaseTest(GetParam()) {}
};
// Fake clock must be set before threads are started to prevent race on
@ -420,7 +419,7 @@ TEST_P(DataChannelIntegrationTest, SctpDataChannelConfigSentToOtherSide) {
EXPECT_FALSE(callee()->data_channel()->negotiated());
}
// Test usrsctp's ability to process unordered data stream, where data actually
// Test sctp's ability to process unordered data stream, where data actually
// arrives out of order using simulated delays. Previously there have been some
// bugs in this area.
TEST_P(DataChannelIntegrationTest, StressTestUnorderedSctpDataChannel) {
@ -844,17 +843,8 @@ TEST_P(DataChannelIntegrationTest,
EXPECT_GT(202u, callee()->data_observer()->received_message_count());
EXPECT_LE(2u, callee()->data_observer()->received_message_count());
// Then, check that observed behavior (lose some messages) has not changed
if (!trials().IsDisabled("WebRTC-DataChannel-Dcsctp")) {
// DcSctp loses all messages. This is correct.
EXPECT_EQ(2u, callee()->data_observer()->received_message_count());
} else {
// Usrsctp loses some messages, but keeps messages not attempted.
// THIS IS THE WRONG BEHAVIOR. According to discussion in
// https://github.com/sctplab/usrsctp/issues/584, all these packets
// should be discarded.
// TODO(bugs.webrtc.org/12731): Fix this.
EXPECT_EQ(90u, callee()->data_observer()->received_message_count());
}
// DcSctp loses all messages. This is correct.
EXPECT_EQ(2u, callee()->data_observer()->received_message_count());
}
TEST_P(DataChannelIntegrationTest,
@ -918,12 +908,10 @@ TEST_P(DataChannelIntegrationTest,
callee()->data_observer()->received_message_count());
}
INSTANTIATE_TEST_SUITE_P(
DataChannelIntegrationTest,
DataChannelIntegrationTest,
Combine(Values(SdpSemantics::kPlanB, SdpSemantics::kUnifiedPlan),
Values("WebRTC-DataChannel-Dcsctp/Enabled/",
"WebRTC-DataChannel-Dcsctp/Disabled/")));
INSTANTIATE_TEST_SUITE_P(DataChannelIntegrationTest,
DataChannelIntegrationTest,
Values(SdpSemantics::kPlanB,
SdpSemantics::kUnifiedPlan));
TEST_F(DataChannelIntegrationTestUnifiedPlan,
EndToEndCallWithBundledSctpDataChannel) {

View file

@ -28,10 +28,9 @@ closes, but the object itself may survive longer than the PeerConnection.
## cricket::SctpTransportInternal
[`cricket::SctpTransportInternal`](https://source.chromium.org/chromium/chromium/src/+/main:third_party/webrtc/media/sctp/sctp_transport_internal.h?q=cricket::SctpTransportInternal) owns two objects: The SCTP association object (currently
implemented by wrapping the usrsctp library) and the DTLS transport, which is
the object used to send and receive messages as emitted from or consumed by the
usrsctp library.
[`cricket::SctpTransportInternal`](https://source.chromium.org/chromium/chromium/src/+/main:third_party/webrtc/media/sctp/sctp_transport_internal.h?q=cricket::SctpTransportInternal) owns two objects: The SCTP association object
and the DTLS transport, which is the object used to send and receive messages
as emitted from or consumed by the sctp library.
It communicates state changes and events using sigslot.

View file

@ -64,7 +64,6 @@ LIB_TO_LICENSES_DICT = {
'pffft': ['third_party/pffft/LICENSE'],
'protobuf': ['third_party/protobuf/LICENSE'],
'rnnoise': ['third_party/rnnoise/COPYING'],
'usrsctp': ['third_party/usrsctp/LICENSE'],
'webrtc': ['LICENSE'],
'zlib': ['third_party/zlib/LICENSE'],
'base64': ['rtc_base/third_party/base64/LICENSE'],

View file

@ -32,22 +32,6 @@ char kTSanDefaultSuppressions[] =
// https://code.google.com/p/webrtc/issues/detail?id=2080
"race:rtc_base/logging.cc\n"
// third_party/usrsctp
// TODO(jiayl): https://code.google.com/p/webrtc/issues/detail?id=3492
"race:user_sctp_timer_iterate\n"
// https://code.google.com/p/webrtc/issues/detail?id=5151
"race:sctp_close\n"
// lock-order-inversion in usrsctp
// TODO(orphis): https://crbug.com/webrtc/12823
"deadlock:usrsctp_conninput\n"
"deadlock:usrsctp_connect\n"
// data race in usrsctp
// https://crbug.com/webrtc/9850
"race:sctp_process_cookie_existing\n"
// Potential deadlocks detected after roll in r6516.
// https://code.google.com/p/webrtc/issues/detail?id=3509
"deadlock:webrtc::test::UdpSocketManagerPosixImpl::RemoveSocket\n"

View file

@ -306,9 +306,6 @@ declare_args() {
# Enable the dcsctp backend for DataChannels and related unittests
rtc_build_dcsctp = !build_with_mozilla && rtc_enable_sctp
# Enable the usrsctp backend for DataChannels and related unittests
rtc_build_usrsctp = !build_with_mozilla && rtc_enable_sctp
# Enable gRPC used for negotiation in multiprocess tests
rtc_enable_grpc = rtc_enable_protobuf && (is_linux || is_mac)
}