[DataChannelInterface] Introduce DataChannelInterface::SendAsync()

One problem with the existing Send() method is that it has a return
value that is problematic for a fully async implementation.

A second problem with Send() is that the return value is bool and not
RTCError (webrtc:13289), which is why OnSendComplete() uses RTCError.

Also, start deprecating `bool Send()` in favor of `void SendAsync()` and
adding `network_safety_` flag for posting async operations to the
network thread. This flag also takes over from the
`connected_to_transport_` which can now be removed.

Bug: webrtc:11547, webrtc:13289
Change-Id: I87bbc7e9b964a52684bdfe0e6ebc5230be254e8b
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/299760
Reviewed-by: Danil Chapovalov <danilchap@webrtc.org>
Commit-Queue: Tomas Gunnarsson <tommi@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#39817}
This commit is contained in:
Tommi 2023-04-11 17:32:34 +02:00 committed by WebRTC LUCI CQ
parent dd557fdb1e
commit a50a81a150
7 changed files with 500 additions and 31 deletions

View file

@ -10,6 +10,8 @@
#include "api/data_channel_interface.h" #include "api/data_channel_interface.h"
#include "rtc_base/checks.h"
namespace webrtc { namespace webrtc {
bool DataChannelInterface::ordered() const { bool DataChannelInterface::ordered() const {
@ -44,4 +46,17 @@ uint64_t DataChannelInterface::MaxSendQueueSize() {
return 16 * 1024 * 1024; // 16 MiB return 16 * 1024 * 1024; // 16 MiB
} }
// TODO(tommi): Remove method once downstream implementations have been removed.
bool DataChannelInterface::Send(const DataBuffer& buffer) {
RTC_DCHECK_NOTREACHED();
return false;
}
// TODO(tommi): Remove implementation once method is pure virtual.
void DataChannelInterface::SendAsync(
DataBuffer buffer,
absl::AnyInvocable<void(RTCError) &&> on_complete) {
RTC_DCHECK_NOTREACHED();
}
} // namespace webrtc } // namespace webrtc

View file

@ -19,6 +19,7 @@
#include <string> #include <string>
#include "absl/functional/any_invocable.h"
#include "absl/types/optional.h" #include "absl/types/optional.h"
#include "api/priority.h" #include "api/priority.h"
#include "api/rtc_error.h" #include "api/rtc_error.h"
@ -198,7 +199,20 @@ class RTC_EXPORT DataChannelInterface : public rtc::RefCountInterface {
// Returns false if the data channel is not in open state or if the send // Returns false if the data channel is not in open state or if the send
// buffer is full. // buffer is full.
// TODO(webrtc:13289): Return an RTCError with information about the failure. // TODO(webrtc:13289): Return an RTCError with information about the failure.
virtual bool Send(const DataBuffer& buffer) = 0; // TODO(tommi): Remove this method once downstream implementations don't refer
// to it.
virtual bool Send(const DataBuffer& buffer);
// Queues up an asynchronus send operation to run on a network thread.
// Once the operation has completed the `on_complete` callback is invoked,
// on the thread the send operation was done on. It's important that
// `on_complete` implementations do not block the current thread but rather
// post any expensive operations to other worker threads.
// TODO(tommi): Make pure virtual after updating mock class in Chromium.
// Deprecate `Send` in favor of this variant since the return value of `Send`
// is limiting for a fully async implementation (yet in practice is ignored).
virtual void SendAsync(DataBuffer buffer,
absl::AnyInvocable<void(RTCError) &&> on_complete);
// Amount of bytes that can be queued for sending on the data channel. // Amount of bytes that can be queued for sending on the data channel.
// Those are bytes that have not yet been processed at the SCTP level. // Those are bytes that have not yet been processed at the SCTP level.

View file

@ -51,6 +51,11 @@ class MockDataChannelInterface
MOCK_METHOD(uint64_t, buffered_amount, (), (const, override)); MOCK_METHOD(uint64_t, buffered_amount, (), (const, override));
MOCK_METHOD(void, Close, (), (override)); MOCK_METHOD(void, Close, (), (override));
MOCK_METHOD(bool, Send, (const DataBuffer& buffer), (override)); MOCK_METHOD(bool, Send, (const DataBuffer& buffer), (override));
MOCK_METHOD(void,
SendAsync,
(DataBuffer buffer,
absl::AnyInvocable<void(RTCError) &&> on_complete),
(override));
protected: protected:
MockDataChannelInterface() = default; MockDataChannelInterface() = default;

View file

@ -877,6 +877,7 @@ rtc_library("sctp_data_channel") {
"../api:rtc_error", "../api:rtc_error",
"../api:scoped_refptr", "../api:scoped_refptr",
"../api:sequence_checker", "../api:sequence_checker",
"../api/task_queue:pending_task_safety_flag",
"../api/transport:datagram_transport_interface", "../api/transport:datagram_transport_interface",
"../media:media_channel", "../media:media_channel",
"../media:rtc_data_sctp_transport_internal", "../media:rtc_data_sctp_transport_internal",
@ -2484,6 +2485,7 @@ if (rtc_include_tests && !build_with_chromium) {
"../rtc_base/third_party/sigslot", "../rtc_base/third_party/sigslot",
"../system_wrappers:metrics", "../system_wrappers:metrics",
"../test:field_trial", "../test:field_trial",
"../test:rtc_expect_death",
"../test:run_loop", "../test:run_loop",
"../test:scoped_key_value_config", "../test:scoped_key_value_config",
"../test/pc/sctp:fake_sctp_transport", "../test/pc/sctp:fake_sctp_transport",

View file

@ -32,6 +32,10 @@
#include "test/gtest.h" #include "test/gtest.h"
#include "test/run_loop.h" #include "test/run_loop.h"
#if RTC_DCHECK_IS_ON && GTEST_HAS_DEATH_TEST && !defined(WEBRTC_ANDROID)
#include "test/testsupport/rtc_expect_death.h"
#endif // RTC_DCHECK_IS_ON && GTEST_HAS_DEATH_TEST && !defined(WEBRTC_ANDROID)
namespace webrtc { namespace webrtc {
namespace { namespace {
@ -124,6 +128,20 @@ class SctpDataChannelTest : public ::testing::Test {
channel_->RegisterObserver(observer_.get()); channel_->RegisterObserver(observer_.get());
} }
// Wait for queued up methods to run on the network thread.
void FlushNetworkThread() {
RTC_DCHECK_RUN_ON(run_loop_.task_queue());
network_thread_.BlockingCall([] {});
}
// Used to complete pending methods on the network thread
// that might queue up methods on the signaling (main) thread
// that are run too.
void FlushNetworkThreadAndPendingOperations() {
FlushNetworkThread();
run_loop_.Flush();
}
test::RunLoop run_loop_; test::RunLoop run_loop_;
rtc::Thread network_thread_; rtc::Thread network_thread_;
InternalDataChannelInit init_; InternalDataChannelInit init_;
@ -207,6 +225,48 @@ TEST_F(SctpDataChannelTest, StateTransition) {
// Tests that DataChannel::buffered_amount() is correct after the channel is // Tests that DataChannel::buffered_amount() is correct after the channel is
// blocked. // blocked.
TEST_F(SctpDataChannelTest, BufferedAmountWhenBlocked) { TEST_F(SctpDataChannelTest, BufferedAmountWhenBlocked) {
AddObserver();
SetChannelReady();
DataBuffer buffer("abcd");
size_t successful_sends = 0;
auto send_complete = [&](RTCError err) {
EXPECT_TRUE(err.ok());
++successful_sends;
};
channel_->SendAsync(buffer, send_complete);
FlushNetworkThreadAndPendingOperations();
EXPECT_EQ(channel_->buffered_amount(), 0u);
size_t successful_send_count = 1;
EXPECT_EQ(successful_send_count, successful_sends);
EXPECT_EQ(successful_send_count,
observer_->on_buffered_amount_change_count());
controller_->set_send_blocked(true);
const int number_of_packets = 3;
for (int i = 0; i < number_of_packets; ++i) {
channel_->SendAsync(buffer, send_complete);
++successful_send_count;
}
FlushNetworkThreadAndPendingOperations();
EXPECT_EQ(buffer.data.size() * number_of_packets,
channel_->buffered_amount());
EXPECT_EQ(successful_send_count, successful_sends);
// An event should not have been fired for buffered amount.
EXPECT_EQ(1u, observer_->on_buffered_amount_change_count());
// Now buffered amount events should get fired and the value
// get down to 0u.
controller_->set_send_blocked(false);
run_loop_.Flush();
EXPECT_EQ(channel_->buffered_amount(), 0u);
EXPECT_EQ(successful_send_count, successful_sends);
EXPECT_EQ(successful_send_count,
observer_->on_buffered_amount_change_count());
}
// TODO(tommi): This test uses `Send()`. Remove once fully deprecated.
TEST_F(SctpDataChannelTest, DeprecatedBufferedAmountWhenBlocked) {
AddObserver(); AddObserver();
SetChannelReady(); SetChannelReady();
DataBuffer buffer("abcd"); DataBuffer buffer("abcd");
@ -232,7 +292,7 @@ TEST_F(SctpDataChannelTest, BufferedAmountWhenBlocked) {
controller_->set_send_blocked(false); controller_->set_send_blocked(false);
run_loop_.Flush(); run_loop_.Flush();
successful_send_count += number_of_packets; successful_send_count += number_of_packets;
EXPECT_EQ(0U, channel_->buffered_amount()); EXPECT_EQ(channel_->buffered_amount(), 0u);
EXPECT_EQ(successful_send_count, EXPECT_EQ(successful_send_count,
observer_->on_buffered_amount_change_count()); observer_->on_buffered_amount_change_count());
} }
@ -240,6 +300,28 @@ TEST_F(SctpDataChannelTest, BufferedAmountWhenBlocked) {
// Tests that the queued data are sent when the channel transitions from blocked // Tests that the queued data are sent when the channel transitions from blocked
// to unblocked. // to unblocked.
TEST_F(SctpDataChannelTest, QueuedDataSentWhenUnblocked) { TEST_F(SctpDataChannelTest, QueuedDataSentWhenUnblocked) {
AddObserver();
SetChannelReady();
DataBuffer buffer("abcd");
controller_->set_send_blocked(true);
size_t successful_send = 0u;
auto send_complete = [&](RTCError err) {
EXPECT_TRUE(err.ok());
++successful_send;
};
channel_->SendAsync(buffer, send_complete);
FlushNetworkThreadAndPendingOperations();
EXPECT_EQ(1U, successful_send);
EXPECT_EQ(0U, observer_->on_buffered_amount_change_count());
controller_->set_send_blocked(false);
SetChannelReady();
EXPECT_EQ(channel_->buffered_amount(), 0u);
EXPECT_EQ(observer_->on_buffered_amount_change_count(), 1u);
}
// TODO(tommi): This test uses `Send()`. Remove once fully deprecated.
TEST_F(SctpDataChannelTest, DeprecatedQueuedDataSentWhenUnblocked) {
AddObserver(); AddObserver();
SetChannelReady(); SetChannelReady();
DataBuffer buffer("abcd"); DataBuffer buffer("abcd");
@ -257,6 +339,34 @@ TEST_F(SctpDataChannelTest, QueuedDataSentWhenUnblocked) {
// Tests that no crash when the channel is blocked right away while trying to // Tests that no crash when the channel is blocked right away while trying to
// send queued data. // send queued data.
TEST_F(SctpDataChannelTest, BlockedWhenSendQueuedDataNoCrash) { TEST_F(SctpDataChannelTest, BlockedWhenSendQueuedDataNoCrash) {
AddObserver();
SetChannelReady();
DataBuffer buffer("abcd");
controller_->set_send_blocked(true);
size_t successful_send = 0u;
auto send_complete = [&](RTCError err) {
EXPECT_TRUE(err.ok());
++successful_send;
};
channel_->SendAsync(buffer, send_complete);
FlushNetworkThreadAndPendingOperations();
EXPECT_EQ(1U, successful_send);
EXPECT_EQ(0U, observer_->on_buffered_amount_change_count());
// Set channel ready while it is still blocked.
SetChannelReady();
EXPECT_EQ(buffer.size(), channel_->buffered_amount());
EXPECT_EQ(0U, observer_->on_buffered_amount_change_count());
// Unblock the channel to send queued data again, there should be no crash.
controller_->set_send_blocked(false);
SetChannelReady();
EXPECT_EQ(0U, channel_->buffered_amount());
EXPECT_EQ(1U, observer_->on_buffered_amount_change_count());
}
// TODO(tommi): This test uses `Send()`. Remove once fully deprecated.
TEST_F(SctpDataChannelTest, DeprecatedBlockedWhenSendQueuedDataNoCrash) {
AddObserver(); AddObserver();
SetChannelReady(); SetChannelReady();
DataBuffer buffer("abcd"); DataBuffer buffer("abcd");
@ -294,6 +404,55 @@ TEST_F(SctpDataChannelTest, VerifyMessagesAndBytesSent) {
EXPECT_EQ(0U, channel_->messages_sent()); EXPECT_EQ(0U, channel_->messages_sent());
EXPECT_EQ(0U, channel_->bytes_sent()); EXPECT_EQ(0U, channel_->bytes_sent());
// Send three buffers while not blocked.
controller_->set_send_blocked(false);
for (int i : {0, 1, 2}) {
channel_->SendAsync(buffers[i], nullptr);
}
FlushNetworkThreadAndPendingOperations();
size_t bytes_sent = buffers[0].size() + buffers[1].size() + buffers[2].size();
EXPECT_EQ_WAIT(0U, channel_->buffered_amount(), kDefaultTimeout);
EXPECT_EQ(3U, channel_->messages_sent());
EXPECT_EQ(bytes_sent, channel_->bytes_sent());
// Send three buffers while blocked, queuing the buffers.
controller_->set_send_blocked(true);
for (int i : {3, 4, 5}) {
channel_->SendAsync(buffers[i], nullptr);
}
FlushNetworkThreadAndPendingOperations();
size_t bytes_queued =
buffers[3].size() + buffers[4].size() + buffers[5].size();
EXPECT_EQ(bytes_queued, channel_->buffered_amount());
EXPECT_EQ(3U, channel_->messages_sent());
EXPECT_EQ(bytes_sent, channel_->bytes_sent());
// Unblock and make sure everything was sent.
controller_->set_send_blocked(false);
EXPECT_EQ_WAIT(0U, channel_->buffered_amount(), kDefaultTimeout);
bytes_sent += bytes_queued;
EXPECT_EQ(6U, channel_->messages_sent());
EXPECT_EQ(bytes_sent, channel_->bytes_sent());
}
// TODO(tommi): This test uses `Send()`. Remove once fully deprecated.
TEST_F(SctpDataChannelTest, DeprecatedVerifyMessagesAndBytesSent) {
AddObserver();
SetChannelReady();
std::vector<DataBuffer> buffers({
DataBuffer("message 1"),
DataBuffer("msg 2"),
DataBuffer("message three"),
DataBuffer("quadra message"),
DataBuffer("fifthmsg"),
DataBuffer("message of the beast"),
});
// Default values.
EXPECT_EQ(0U, channel_->messages_sent());
EXPECT_EQ(0U, channel_->bytes_sent());
// Send three buffers while not blocked. // Send three buffers while not blocked.
controller_->set_send_blocked(false); controller_->set_send_blocked(false);
EXPECT_TRUE(channel_->Send(buffers[0])); EXPECT_TRUE(channel_->Send(buffers[0]));
@ -369,6 +528,35 @@ TEST_F(SctpDataChannelTest, SendUnorderedAfterReceivesOpenAck) {
EXPECT_EQ_WAIT(DataChannelInterface::kOpen, proxy->state(), 1000); EXPECT_EQ_WAIT(DataChannelInterface::kOpen, proxy->state(), 1000);
// Sends a message and verifies it's ordered.
DataBuffer buffer("some data");
proxy->SendAsync(buffer, nullptr);
EXPECT_TRUE(controller_->last_send_data_params().ordered);
// Emulates receiving an OPEN_ACK message.
rtc::CopyOnWriteBuffer payload;
WriteDataChannelOpenAckMessage(&payload);
network_thread_.BlockingCall(
[&] { dc->OnDataReceived(DataMessageType::kControl, payload); });
// Sends another message and verifies it's unordered.
proxy->SendAsync(buffer, nullptr);
FlushNetworkThreadAndPendingOperations();
EXPECT_FALSE(controller_->last_send_data_params().ordered);
}
// TODO(tommi): This test uses `Send()`. Remove once fully deprecated.
TEST_F(SctpDataChannelTest, DeprecatedSendUnorderedAfterReceivesOpenAck) {
SetChannelReady();
InternalDataChannelInit init;
init.id = 1;
init.ordered = false;
rtc::scoped_refptr<SctpDataChannel> dc =
controller_->CreateDataChannel("test1", init);
auto proxy = webrtc::SctpDataChannel::CreateProxy(dc, signaling_safety_);
EXPECT_EQ_WAIT(DataChannelInterface::kOpen, proxy->state(), 1000);
// Sends a message and verifies it's ordered. // Sends a message and verifies it's ordered.
DataBuffer buffer("some data"); DataBuffer buffer("some data");
ASSERT_TRUE(proxy->Send(buffer)); ASSERT_TRUE(proxy->Send(buffer));
@ -398,6 +586,29 @@ TEST_F(SctpDataChannelTest, SendUnorderedAfterReceiveData) {
EXPECT_EQ_WAIT(DataChannelInterface::kOpen, proxy->state(), 1000); EXPECT_EQ_WAIT(DataChannelInterface::kOpen, proxy->state(), 1000);
// Emulates receiving a DATA message.
DataBuffer buffer("data");
network_thread_.BlockingCall(
[&] { dc->OnDataReceived(DataMessageType::kText, buffer.data); });
// Sends a message and verifies it's unordered.
proxy->SendAsync(buffer, nullptr);
FlushNetworkThreadAndPendingOperations();
EXPECT_FALSE(controller_->last_send_data_params().ordered);
}
// TODO(tommi): This test uses `Send()`. Remove once fully deprecated.
TEST_F(SctpDataChannelTest, DeprecatedSendUnorderedAfterReceiveData) {
SetChannelReady();
InternalDataChannelInit init;
init.id = 1;
init.ordered = false;
rtc::scoped_refptr<SctpDataChannel> dc =
controller_->CreateDataChannel("test1", init);
auto proxy = webrtc::SctpDataChannel::CreateProxy(dc, signaling_safety_);
EXPECT_EQ_WAIT(DataChannelInterface::kOpen, proxy->state(), 1000);
// Emulates receiving a DATA message. // Emulates receiving a DATA message.
DataBuffer buffer("data"); DataBuffer buffer("data");
network_thread_.BlockingCall( network_thread_.BlockingCall(
@ -426,6 +637,24 @@ TEST_F(SctpDataChannelTest, OpenWaitsForOpenMesssage) {
TEST_F(SctpDataChannelTest, QueuedCloseFlushes) { TEST_F(SctpDataChannelTest, QueuedCloseFlushes) {
DataBuffer buffer("foo"); DataBuffer buffer("foo");
controller_->set_send_blocked(true);
SetChannelReady();
EXPECT_EQ(DataChannelInterface::kConnecting, channel_->state());
controller_->set_send_blocked(false);
EXPECT_EQ_WAIT(DataChannelInterface::kOpen, channel_->state(), 1000);
controller_->set_send_blocked(true);
channel_->SendAsync(buffer, nullptr);
channel_->Close();
controller_->set_send_blocked(false);
EXPECT_EQ_WAIT(DataChannelInterface::kClosed, channel_->state(), 1000);
EXPECT_TRUE(channel_->error().ok());
EXPECT_EQ(DataMessageType::kText, controller_->last_send_data_params().type);
}
// TODO(tommi): This test uses `Send()`. Remove once fully deprecated.
TEST_F(SctpDataChannelTest, DeprecatedQueuedCloseFlushes) {
DataBuffer buffer("foo");
controller_->set_send_blocked(true); controller_->set_send_blocked(true);
SetChannelReady(); SetChannelReady();
EXPECT_EQ(DataChannelInterface::kConnecting, channel_->state()); EXPECT_EQ(DataChannelInterface::kConnecting, channel_->state());
@ -442,6 +671,16 @@ TEST_F(SctpDataChannelTest, QueuedCloseFlushes) {
// Tests that messages are sent with the right id. // Tests that messages are sent with the right id.
TEST_F(SctpDataChannelTest, SendDataId) { TEST_F(SctpDataChannelTest, SendDataId) {
SetChannelSid(inner_channel_, StreamId(1));
SetChannelReady();
DataBuffer buffer("data");
channel_->SendAsync(buffer, nullptr);
FlushNetworkThreadAndPendingOperations();
EXPECT_EQ(1, controller_->last_sid());
}
// TODO(tommi): This test uses `Send()`. Remove once fully deprecated.
TEST_F(SctpDataChannelTest, DeprecatedSendDataId) {
SetChannelSid(inner_channel_, StreamId(1)); SetChannelSid(inner_channel_, StreamId(1));
SetChannelReady(); SetChannelReady();
DataBuffer buffer("data"); DataBuffer buffer("data");
@ -564,6 +803,36 @@ TEST_F(SctpDataChannelTest, OpenAckRoleInitialization) {
// Tests that that Send() returns false if the sending buffer is full // Tests that that Send() returns false if the sending buffer is full
// and the channel stays open. // and the channel stays open.
TEST_F(SctpDataChannelTest, OpenWhenSendBufferFull) { TEST_F(SctpDataChannelTest, OpenWhenSendBufferFull) {
AddObserver();
SetChannelReady();
const size_t packetSize = 1024;
rtc::CopyOnWriteBuffer buffer(packetSize);
memset(buffer.MutableData(), 0, buffer.size());
DataBuffer packet(buffer, true);
controller_->set_send_blocked(true);
size_t successful_send = 0u, failed_send = 0u;
auto send_complete = [&](RTCError err) {
err.ok() ? ++successful_send : ++failed_send;
};
size_t count = DataChannelInterface::MaxSendQueueSize() / packetSize;
for (size_t i = 0; i < count; ++i) {
channel_->SendAsync(packet, send_complete);
}
// The sending buffer should be full, `Send()` returns false.
channel_->SendAsync(packet, std::move(send_complete));
FlushNetworkThreadAndPendingOperations();
EXPECT_TRUE(DataChannelInterface::kOpen == channel_->state());
EXPECT_EQ(successful_send, count);
EXPECT_EQ(failed_send, 1u);
}
// TODO(tommi): This test uses `Send()`. Remove once fully deprecated.
TEST_F(SctpDataChannelTest, DeprecatedOpenWhenSendBufferFull) {
SetChannelReady(); SetChannelReady();
const size_t packetSize = 1024; const size_t packetSize = 1024;
@ -590,6 +859,20 @@ TEST_F(SctpDataChannelTest, ClosedOnTransportError) {
DataBuffer buffer("abcd"); DataBuffer buffer("abcd");
controller_->set_transport_error(); controller_->set_transport_error();
channel_->SendAsync(buffer, nullptr);
EXPECT_EQ(DataChannelInterface::kClosed, channel_->state());
EXPECT_FALSE(channel_->error().ok());
EXPECT_EQ(RTCErrorType::NETWORK_ERROR, channel_->error().type());
EXPECT_EQ(RTCErrorDetailType::NONE, channel_->error().error_detail());
}
// TODO(tommi): This test uses `Send()`. Remove once fully deprecated.
TEST_F(SctpDataChannelTest, DeprecatedClosedOnTransportError) {
SetChannelReady();
DataBuffer buffer("abcd");
controller_->set_transport_error();
EXPECT_TRUE(channel_->Send(buffer)); EXPECT_TRUE(channel_->Send(buffer));
EXPECT_EQ(DataChannelInterface::kClosed, channel_->state()); EXPECT_EQ(DataChannelInterface::kClosed, channel_->state());
@ -622,6 +905,17 @@ TEST_F(SctpDataChannelTest, SendEmptyData) {
SetChannelReady(); SetChannelReady();
EXPECT_EQ(DataChannelInterface::kOpen, channel_->state()); EXPECT_EQ(DataChannelInterface::kOpen, channel_->state());
DataBuffer buffer("");
channel_->SendAsync(buffer, nullptr);
EXPECT_EQ(DataChannelInterface::kOpen, channel_->state());
}
// TODO(tommi): This test uses `Send()`. Remove once fully deprecated.
TEST_F(SctpDataChannelTest, DeprecatedSendEmptyData) {
SetChannelSid(inner_channel_, StreamId(1));
SetChannelReady();
EXPECT_EQ(DataChannelInterface::kOpen, channel_->state());
DataBuffer buffer(""); DataBuffer buffer("");
EXPECT_TRUE(channel_->Send(buffer)); EXPECT_TRUE(channel_->Send(buffer));
EXPECT_EQ(DataChannelInterface::kOpen, channel_->state()); EXPECT_EQ(DataChannelInterface::kOpen, channel_->state());
@ -646,6 +940,34 @@ TEST_F(SctpDataChannelTest, UnusedTransitionsDirectlyToClosed) {
// Test that the data channel goes to the "closed" state (and doesn't crash) // Test that the data channel goes to the "closed" state (and doesn't crash)
// when its transport goes away, even while data is buffered. // when its transport goes away, even while data is buffered.
TEST_F(SctpDataChannelTest, TransportDestroyedWhileDataBuffered) { TEST_F(SctpDataChannelTest, TransportDestroyedWhileDataBuffered) {
AddObserver();
SetChannelReady();
rtc::CopyOnWriteBuffer buffer(1024);
memset(buffer.MutableData(), 0, buffer.size());
DataBuffer packet(buffer, true);
// Send a packet while sending is blocked so it ends up buffered.
controller_->set_send_blocked(true);
channel_->SendAsync(packet, nullptr);
// Tell the data channel that its transport is being destroyed.
// It should then stop using the transport (allowing us to delete it) and
// transition to the "closed" state.
RTCError error(RTCErrorType::OPERATION_ERROR_WITH_DATA, "");
error.set_error_detail(RTCErrorDetailType::SCTP_FAILURE);
network_thread_.BlockingCall(
[&] { inner_channel_->OnTransportChannelClosed(error); });
controller_.reset(nullptr);
EXPECT_EQ_WAIT(DataChannelInterface::kClosed, channel_->state(),
kDefaultTimeout);
EXPECT_FALSE(channel_->error().ok());
EXPECT_EQ(RTCErrorType::OPERATION_ERROR_WITH_DATA, channel_->error().type());
EXPECT_EQ(RTCErrorDetailType::SCTP_FAILURE, channel_->error().error_detail());
}
// TODO(tommi): This test uses `Send()`. Remove once fully deprecated.
TEST_F(SctpDataChannelTest, DeprecatedTransportDestroyedWhileDataBuffered) {
SetChannelReady(); SetChannelReady();
rtc::CopyOnWriteBuffer buffer(1024); rtc::CopyOnWriteBuffer buffer(1024);
@ -762,5 +1084,69 @@ TEST_F(SctpSidAllocatorTest, SctpIdReusedForRemovedDataChannel) {
EXPECT_EQ(even_id.stream_id_int() + 6, allocated_id.stream_id_int()); EXPECT_EQ(even_id.stream_id_int() + 6, allocated_id.stream_id_int());
} }
// Code coverage tests for default implementations in data_channel_interface.*.
namespace {
class NoImplDataChannel : public DataChannelInterface {
public:
NoImplDataChannel() = default;
// Send and SendAsync implementations are public and implementation
// is in data_channel_interface.cc.
private:
// Implementation for pure virtual methods, just for compilation sake.
void RegisterObserver(DataChannelObserver* observer) override {}
void UnregisterObserver() override {}
std::string label() const override { return ""; }
bool reliable() const override { return false; }
int id() const override { return -1; }
DataState state() const override { return DataChannelInterface::kClosed; }
uint32_t messages_sent() const override { return 0u; }
uint64_t bytes_sent() const override { return 0u; }
uint32_t messages_received() const override { return 0u; }
uint64_t bytes_received() const override { return 0u; }
uint64_t buffered_amount() const override { return 0u; }
void Close() override {}
};
class NoImplObserver : public DataChannelObserver {
public:
NoImplObserver() = default;
private:
void OnStateChange() override {}
void OnMessage(const DataBuffer& buffer) override {}
};
} // namespace
TEST(DataChannelInterfaceTest, Coverage) {
auto channel = rtc::make_ref_counted<NoImplDataChannel>();
EXPECT_FALSE(channel->ordered());
EXPECT_EQ(channel->maxRetransmitTime(), 0u);
EXPECT_EQ(channel->maxRetransmits(), 0u);
EXPECT_FALSE(channel->maxRetransmitsOpt());
EXPECT_FALSE(channel->maxPacketLifeTime());
EXPECT_TRUE(channel->protocol().empty());
EXPECT_FALSE(channel->negotiated());
EXPECT_EQ(channel->MaxSendQueueSize(), 16u * 1024u * 1024u);
NoImplObserver observer;
observer.OnBufferedAmountChange(0u);
EXPECT_FALSE(observer.IsOkToCallOnTheNetworkThread());
}
#if RTC_DCHECK_IS_ON && GTEST_HAS_DEATH_TEST && !defined(WEBRTC_ANDROID)
TEST(DataChannelInterfaceDeathTest, SendDefaultImplDchecks) {
auto channel = rtc::make_ref_counted<NoImplDataChannel>();
RTC_EXPECT_DEATH(channel->Send(DataBuffer("Foo")), "Check failed: false");
}
TEST(DataChannelInterfaceDeathTest, SendAsyncDefaultImplDchecks) {
auto channel = rtc::make_ref_counted<NoImplDataChannel>();
RTC_EXPECT_DEATH(channel->SendAsync(DataBuffer("Foo"), nullptr),
"Check failed: false");
}
#endif // RTC_DCHECK_IS_ON && GTEST_HAS_DEATH_TEST && !defined(WEBRTC_ANDROID)
} // namespace } // namespace
} // namespace webrtc } // namespace webrtc

View file

@ -60,6 +60,10 @@ PROXY_SECONDARY_CONSTMETHOD0(uint64_t, bytes_received)
PROXY_SECONDARY_CONSTMETHOD0(uint64_t, buffered_amount) PROXY_SECONDARY_CONSTMETHOD0(uint64_t, buffered_amount)
PROXY_SECONDARY_METHOD0(void, Close) PROXY_SECONDARY_METHOD0(void, Close)
PROXY_SECONDARY_METHOD1(bool, Send, const DataBuffer&) PROXY_SECONDARY_METHOD1(bool, Send, const DataBuffer&)
BYPASS_PROXY_METHOD2(void,
SendAsync,
DataBuffer,
absl::AnyInvocable<void(RTCError) &&>)
END_PROXY_MAP(DataChannel) END_PROXY_MAP(DataChannel)
} // namespace } // namespace
@ -261,6 +265,8 @@ class SctpDataChannel::ObserverAdapter : public DataChannelObserver {
})); }));
} }
bool IsOkToCallOnTheNetworkThread() override { return true; }
rtc::Thread* signaling_thread() const { return signaling_thread_; } rtc::Thread* signaling_thread() const { return signaling_thread_; }
rtc::Thread* network_thread() const { return channel_->network_thread_; } rtc::Thread* network_thread() const { return channel_->network_thread_; }
@ -320,14 +326,16 @@ SctpDataChannel::SctpDataChannel(
negotiated_(config.negotiated), negotiated_(config.negotiated),
ordered_(config.ordered), ordered_(config.ordered),
observer_(nullptr), observer_(nullptr),
controller_(std::move(controller)), controller_(std::move(controller)) {
connected_to_transport_(connected_to_transport) {
RTC_DCHECK_RUN_ON(network_thread_); RTC_DCHECK_RUN_ON(network_thread_);
// Since we constructed on the network thread we can't (yet) check the // Since we constructed on the network thread we can't (yet) check the
// `controller_` pointer since doing so will trigger a thread check. // `controller_` pointer since doing so will trigger a thread check.
RTC_UNUSED(network_thread_); RTC_UNUSED(network_thread_);
RTC_DCHECK(config.IsValid()); RTC_DCHECK(config.IsValid());
if (connected_to_transport)
network_safety_->SetAlive();
switch (config.open_handshake_role) { switch (config.open_handshake_role) {
case InternalDataChannelInit::kNone: // pre-negotiated case InternalDataChannelInit::kNone: // pre-negotiated
handshake_state_ = kHandshakeReady; handshake_state_ = kHandshakeReady;
@ -465,9 +473,6 @@ bool SctpDataChannel::negotiated() const {
int SctpDataChannel::id() const { int SctpDataChannel::id() const {
RTC_DCHECK_RUN_ON(network_thread_); RTC_DCHECK_RUN_ON(network_thread_);
// TODO(tommi): Once an ID has been assigned, it won't change (can be
// considered const). We could do special handling of this and allow bypassing
// the proxy so that we can return a valid id without thread hopping.
return id_n_.stream_id_int(); return id_n_.stream_id_int();
} }
@ -554,22 +559,53 @@ uint64_t SctpDataChannel::bytes_received() const {
bool SctpDataChannel::Send(const DataBuffer& buffer) { bool SctpDataChannel::Send(const DataBuffer& buffer) {
RTC_DCHECK_RUN_ON(network_thread_); RTC_DCHECK_RUN_ON(network_thread_);
RTCError err = SendImpl(buffer);
if (err.type() == RTCErrorType::INVALID_STATE ||
err.type() == RTCErrorType::RESOURCE_EXHAUSTED) {
return false;
}
// Always return true for SCTP DataChannel per the spec.
return true;
}
// RTC_RUN_ON(network_thread_);
RTCError SctpDataChannel::SendImpl(DataBuffer buffer) {
if (state_ != kOpen) { if (state_ != kOpen) {
error_ = RTCError(RTCErrorType::INVALID_STATE); error_ = RTCError(RTCErrorType::INVALID_STATE);
return false; return error_;
} }
// If the queue is non-empty, we're waiting for SignalReadyToSend, // If the queue is non-empty, we're waiting for SignalReadyToSend,
// so just add to the end of the queue and keep waiting. // so just add to the end of the queue and keep waiting.
if (!queued_send_data_.Empty()) { if (!queued_send_data_.Empty()) {
return QueueSendDataMessage(buffer); error_ = QueueSendDataMessage(buffer)
? RTCError::OK()
: RTCError(RTCErrorType::RESOURCE_EXHAUSTED);
return error_;
} }
SendDataMessage(buffer, true); return SendDataMessage(buffer, true);
}
// Always return true for SCTP DataChannel per the spec. void SctpDataChannel::SendAsync(
return true; DataBuffer buffer,
absl::AnyInvocable<void(RTCError) &&> on_complete) {
// Note: at this point, we do not know on which thread we're being called
// since this method bypasses the proxy. On Android the thread might be VM
// owned, on other platforms it might be the signaling thread, or in Chrome
// it can be the JS thread. We also don't know if it's consistently the same
// thread. So we always post to the network thread (even if the current thread
// might be the network thread - in theory a call could even come from within
// the `on_complete` callback).
network_thread_->PostTask(SafeTask(
network_safety_, [this, buffer = std::move(buffer),
on_complete = std::move(on_complete)]() mutable {
RTC_DCHECK_RUN_ON(network_thread_);
RTCError err = SendImpl(std::move(buffer));
if (on_complete)
std::move(on_complete)(err);
}));
} }
void SctpDataChannel::SetSctpSid_n(StreamId sid) { void SctpDataChannel::SetSctpSid_n(StreamId sid) {
@ -608,7 +644,7 @@ void SctpDataChannel::OnClosingProcedureComplete() {
void SctpDataChannel::OnTransportChannelCreated() { void SctpDataChannel::OnTransportChannelCreated() {
RTC_DCHECK_RUN_ON(network_thread_); RTC_DCHECK_RUN_ON(network_thread_);
connected_to_transport_ = true; network_safety_->SetAlive();
} }
void SctpDataChannel::OnTransportChannelClosed(RTCError error) { void SctpDataChannel::OnTransportChannelClosed(RTCError error) {
@ -697,10 +733,8 @@ void SctpDataChannel::OnTransportReady() {
// what triggers the callback to `OnTransportReady()`. // what triggers the callback to `OnTransportReady()`.
// These steps are currently accomplished via two separate PostTask calls to // These steps are currently accomplished via two separate PostTask calls to
// the signaling thread, but could simply be done in single method call on // the signaling thread, but could simply be done in single method call on
// the network thread (which incidentally is the thread that we'll need to // the network thread.
// be on for the below `Send*` calls, which currently do a BlockingCall RTC_DCHECK(connected_to_transport());
// from the signaling thread to the network thread.
RTC_DCHECK(connected_to_transport_);
RTC_DCHECK(id_n_.HasValue()); RTC_DCHECK(id_n_.HasValue());
SendQueuedControlMessages(); SendQueuedControlMessages();
@ -716,7 +750,7 @@ void SctpDataChannel::CloseAbruptlyWithError(RTCError error) {
return; return;
} }
connected_to_transport_ = false; network_safety_->SetNotAlive();
// Closing abruptly means any queued data gets thrown away. // Closing abruptly means any queued data gets thrown away.
queued_send_data_.Clear(); queued_send_data_.Clear();
@ -746,7 +780,7 @@ void SctpDataChannel::UpdateState() {
switch (state_) { switch (state_) {
case kConnecting: { case kConnecting: {
if (connected_to_transport_ && controller_) { if (connected_to_transport() && controller_) {
if (handshake_state_ == kHandshakeShouldSendOpen) { if (handshake_state_ == kHandshakeShouldSendOpen) {
rtc::CopyOnWriteBuffer payload; rtc::CopyOnWriteBuffer payload;
WriteDataChannelOpenMessage(label_, protocol_, priority_, ordered_, WriteDataChannelOpenMessage(label_, protocol_, priority_, ordered_,
@ -774,7 +808,7 @@ void SctpDataChannel::UpdateState() {
break; break;
} }
case kClosing: { case kClosing: {
if (connected_to_transport_ && controller_) { if (connected_to_transport() && controller_) {
// Wait for all queued data to be sent before beginning the closing // Wait for all queued data to be sent before beginning the closing
// procedure. // procedure.
if (queued_send_data_.Empty() && queued_control_data_.Empty()) { if (queued_send_data_.Empty() && queued_control_data_.Empty()) {
@ -840,7 +874,7 @@ void SctpDataChannel::SendQueuedDataMessages() {
while (!queued_send_data_.Empty()) { while (!queued_send_data_.Empty()) {
std::unique_ptr<DataBuffer> buffer = queued_send_data_.PopFront(); std::unique_ptr<DataBuffer> buffer = queued_send_data_.PopFront();
if (!SendDataMessage(*buffer, false)) { if (!SendDataMessage(*buffer, false).ok()) {
// Return the message to the front of the queue if sending is aborted. // Return the message to the front of the queue if sending is aborted.
queued_send_data_.PushFront(std::move(buffer)); queued_send_data_.PushFront(std::move(buffer));
break; break;
@ -849,12 +883,12 @@ void SctpDataChannel::SendQueuedDataMessages() {
} }
// RTC_RUN_ON(network_thread_). // RTC_RUN_ON(network_thread_).
bool SctpDataChannel::SendDataMessage(const DataBuffer& buffer, RTCError SctpDataChannel::SendDataMessage(const DataBuffer& buffer,
bool queue_if_blocked) { bool queue_if_blocked) {
SendDataParams send_params; SendDataParams send_params;
if (!controller_) { if (!controller_) {
error_ = RTCError(RTCErrorType::INVALID_STATE); error_ = RTCError(RTCErrorType::INVALID_STATE);
return false; return error_;
} }
send_params.ordered = ordered_; send_params.ordered = ordered_;
@ -879,12 +913,16 @@ bool SctpDataChannel::SendDataMessage(const DataBuffer& buffer,
if (observer_ && buffer.size() > 0) { if (observer_ && buffer.size() > 0) {
observer_->OnBufferedAmountChange(buffer.size()); observer_->OnBufferedAmountChange(buffer.size());
} }
return true; return error_;
} }
if (error_.type() == RTCErrorType::RESOURCE_EXHAUSTED) { if (error_.type() == RTCErrorType::RESOURCE_EXHAUSTED) {
if (!queue_if_blocked || QueueSendDataMessage(buffer)) { if (!queue_if_blocked)
return false; return error_;
if (QueueSendDataMessage(buffer)) {
error_ = RTCError::OK();
return error_;
} }
} }
// Close the channel if the error is not SDR_BLOCK, or if queuing the // Close the channel if the error is not SDR_BLOCK, or if queuing the
@ -895,7 +933,7 @@ bool SctpDataChannel::SendDataMessage(const DataBuffer& buffer,
CloseAbruptlyWithError( CloseAbruptlyWithError(
RTCError(RTCErrorType::NETWORK_ERROR, "Failure to send data")); RTCError(RTCErrorType::NETWORK_ERROR, "Failure to send data"));
return false; return error_;
} }
// RTC_RUN_ON(network_thread_). // RTC_RUN_ON(network_thread_).
@ -924,7 +962,7 @@ void SctpDataChannel::SendQueuedControlMessages() {
// RTC_RUN_ON(network_thread_). // RTC_RUN_ON(network_thread_).
bool SctpDataChannel::SendControlMessage(const rtc::CopyOnWriteBuffer& buffer) { bool SctpDataChannel::SendControlMessage(const rtc::CopyOnWriteBuffer& buffer) {
RTC_DCHECK(connected_to_transport_); RTC_DCHECK(connected_to_transport());
RTC_DCHECK(id_n_.HasValue()); RTC_DCHECK(id_n_.HasValue());
RTC_DCHECK(controller_); RTC_DCHECK(controller_);

View file

@ -23,6 +23,7 @@
#include "api/rtc_error.h" #include "api/rtc_error.h"
#include "api/scoped_refptr.h" #include "api/scoped_refptr.h"
#include "api/sequence_checker.h" #include "api/sequence_checker.h"
#include "api/task_queue/pending_task_safety_flag.h"
#include "api/transport/data_channel_transport_interface.h" #include "api/transport/data_channel_transport_interface.h"
#include "pc/data_channel_utils.h" #include "pc/data_channel_utils.h"
#include "pc/sctp_utils.h" #include "pc/sctp_utils.h"
@ -171,6 +172,8 @@ class SctpDataChannel : public DataChannelInterface {
uint32_t messages_received() const override; uint32_t messages_received() const override;
uint64_t bytes_received() const override; uint64_t bytes_received() const override;
bool Send(const DataBuffer& buffer) override; bool Send(const DataBuffer& buffer) override;
void SendAsync(DataBuffer buffer,
absl::AnyInvocable<void(RTCError) &&> on_complete) override;
// Close immediately, ignoring any queued data or closing procedure. // Close immediately, ignoring any queued data or closing procedure.
// This is called when the underlying SctpTransport is being destroyed. // This is called when the underlying SctpTransport is being destroyed.
@ -247,13 +250,14 @@ class SctpDataChannel : public DataChannelInterface {
kHandshakeReady kHandshakeReady
}; };
RTCError SendImpl(DataBuffer buffer) RTC_RUN_ON(network_thread_);
void UpdateState() RTC_RUN_ON(network_thread_); void UpdateState() RTC_RUN_ON(network_thread_);
void SetState(DataState state) RTC_RUN_ON(network_thread_); void SetState(DataState state) RTC_RUN_ON(network_thread_);
void DeliverQueuedReceivedData() RTC_RUN_ON(network_thread_); void DeliverQueuedReceivedData() RTC_RUN_ON(network_thread_);
void SendQueuedDataMessages() RTC_RUN_ON(network_thread_); void SendQueuedDataMessages() RTC_RUN_ON(network_thread_);
bool SendDataMessage(const DataBuffer& buffer, bool queue_if_blocked) RTCError SendDataMessage(const DataBuffer& buffer, bool queue_if_blocked)
RTC_RUN_ON(network_thread_); RTC_RUN_ON(network_thread_);
bool QueueSendDataMessage(const DataBuffer& buffer) bool QueueSendDataMessage(const DataBuffer& buffer)
RTC_RUN_ON(network_thread_); RTC_RUN_ON(network_thread_);
@ -262,6 +266,10 @@ class SctpDataChannel : public DataChannelInterface {
bool SendControlMessage(const rtc::CopyOnWriteBuffer& buffer) bool SendControlMessage(const rtc::CopyOnWriteBuffer& buffer)
RTC_RUN_ON(network_thread_); RTC_RUN_ON(network_thread_);
bool connected_to_transport() const RTC_RUN_ON(network_thread_) {
return network_safety_->alive();
}
rtc::Thread* const signaling_thread_; rtc::Thread* const signaling_thread_;
rtc::Thread* const network_thread_; rtc::Thread* const network_thread_;
StreamId id_n_ RTC_GUARDED_BY(network_thread_); StreamId id_n_ RTC_GUARDED_BY(network_thread_);
@ -286,7 +294,6 @@ class SctpDataChannel : public DataChannelInterface {
RTC_GUARDED_BY(network_thread_); RTC_GUARDED_BY(network_thread_);
HandshakeState handshake_state_ RTC_GUARDED_BY(network_thread_) = HandshakeState handshake_state_ RTC_GUARDED_BY(network_thread_) =
kHandshakeInit; kHandshakeInit;
bool connected_to_transport_ RTC_GUARDED_BY(network_thread_) = false;
// Did we already start the graceful SCTP closing procedure? // Did we already start the graceful SCTP closing procedure?
bool started_closing_procedure_ RTC_GUARDED_BY(network_thread_) = false; bool started_closing_procedure_ RTC_GUARDED_BY(network_thread_) = false;
// Control messages that always have to get sent out before any queued // Control messages that always have to get sent out before any queued
@ -294,6 +301,8 @@ class SctpDataChannel : public DataChannelInterface {
PacketQueue queued_control_data_ RTC_GUARDED_BY(network_thread_); PacketQueue queued_control_data_ RTC_GUARDED_BY(network_thread_);
PacketQueue queued_received_data_ RTC_GUARDED_BY(network_thread_); PacketQueue queued_received_data_ RTC_GUARDED_BY(network_thread_);
PacketQueue queued_send_data_ RTC_GUARDED_BY(network_thread_); PacketQueue queued_send_data_ RTC_GUARDED_BY(network_thread_);
rtc::scoped_refptr<PendingTaskSafetyFlag> network_safety_ =
PendingTaskSafetyFlag::CreateDetachedInactive();
}; };
} // namespace webrtc } // namespace webrtc