From a50a81a150524f69329114dd7478da592046a14e Mon Sep 17 00:00:00 2001 From: Tommi Date: Tue, 11 Apr 2023 17:32:34 +0200 Subject: [PATCH] [DataChannelInterface] Introduce DataChannelInterface::SendAsync() One problem with the existing Send() method is that it has a return value that is problematic for a fully async implementation. A second problem with Send() is that the return value is bool and not RTCError (webrtc:13289), which is why OnSendComplete() uses RTCError. Also, start deprecating `bool Send()` in favor of `void SendAsync()` and adding `network_safety_` flag for posting async operations to the network thread. This flag also takes over from the `connected_to_transport_` which can now be removed. Bug: webrtc:11547, webrtc:13289 Change-Id: I87bbc7e9b964a52684bdfe0e6ebc5230be254e8b Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/299760 Reviewed-by: Danil Chapovalov Commit-Queue: Tomas Gunnarsson Cr-Commit-Position: refs/heads/main@{#39817} --- api/data_channel_interface.cc | 15 ++ api/data_channel_interface.h | 16 +- api/test/mock_data_channel.h | 5 + pc/BUILD.gn | 2 + pc/data_channel_unittest.cc | 388 +++++++++++++++++++++++++++++++++- pc/sctp_data_channel.cc | 92 +++++--- pc/sctp_data_channel.h | 13 +- 7 files changed, 500 insertions(+), 31 deletions(-) diff --git a/api/data_channel_interface.cc b/api/data_channel_interface.cc index bddb9d1b0a..970f53b4bd 100644 --- a/api/data_channel_interface.cc +++ b/api/data_channel_interface.cc @@ -10,6 +10,8 @@ #include "api/data_channel_interface.h" +#include "rtc_base/checks.h" + namespace webrtc { bool DataChannelInterface::ordered() const { @@ -44,4 +46,17 @@ uint64_t DataChannelInterface::MaxSendQueueSize() { return 16 * 1024 * 1024; // 16 MiB } +// TODO(tommi): Remove method once downstream implementations have been removed. +bool DataChannelInterface::Send(const DataBuffer& buffer) { + RTC_DCHECK_NOTREACHED(); + return false; +} + +// TODO(tommi): Remove implementation once method is pure virtual. +void DataChannelInterface::SendAsync( + DataBuffer buffer, + absl::AnyInvocable on_complete) { + RTC_DCHECK_NOTREACHED(); +} + } // namespace webrtc diff --git a/api/data_channel_interface.h b/api/data_channel_interface.h index 35ef8e4f0d..bf27c6c4f3 100644 --- a/api/data_channel_interface.h +++ b/api/data_channel_interface.h @@ -19,6 +19,7 @@ #include +#include "absl/functional/any_invocable.h" #include "absl/types/optional.h" #include "api/priority.h" #include "api/rtc_error.h" @@ -198,7 +199,20 @@ class RTC_EXPORT DataChannelInterface : public rtc::RefCountInterface { // Returns false if the data channel is not in open state or if the send // buffer is full. // TODO(webrtc:13289): Return an RTCError with information about the failure. - virtual bool Send(const DataBuffer& buffer) = 0; + // TODO(tommi): Remove this method once downstream implementations don't refer + // to it. + virtual bool Send(const DataBuffer& buffer); + + // Queues up an asynchronus send operation to run on a network thread. + // Once the operation has completed the `on_complete` callback is invoked, + // on the thread the send operation was done on. It's important that + // `on_complete` implementations do not block the current thread but rather + // post any expensive operations to other worker threads. + // TODO(tommi): Make pure virtual after updating mock class in Chromium. + // Deprecate `Send` in favor of this variant since the return value of `Send` + // is limiting for a fully async implementation (yet in practice is ignored). + virtual void SendAsync(DataBuffer buffer, + absl::AnyInvocable on_complete); // Amount of bytes that can be queued for sending on the data channel. // Those are bytes that have not yet been processed at the SCTP level. diff --git a/api/test/mock_data_channel.h b/api/test/mock_data_channel.h index 38730eaa51..5d38ec1375 100644 --- a/api/test/mock_data_channel.h +++ b/api/test/mock_data_channel.h @@ -51,6 +51,11 @@ class MockDataChannelInterface MOCK_METHOD(uint64_t, buffered_amount, (), (const, override)); MOCK_METHOD(void, Close, (), (override)); MOCK_METHOD(bool, Send, (const DataBuffer& buffer), (override)); + MOCK_METHOD(void, + SendAsync, + (DataBuffer buffer, + absl::AnyInvocable on_complete), + (override)); protected: MockDataChannelInterface() = default; diff --git a/pc/BUILD.gn b/pc/BUILD.gn index 740c3e838c..f93b339662 100644 --- a/pc/BUILD.gn +++ b/pc/BUILD.gn @@ -877,6 +877,7 @@ rtc_library("sctp_data_channel") { "../api:rtc_error", "../api:scoped_refptr", "../api:sequence_checker", + "../api/task_queue:pending_task_safety_flag", "../api/transport:datagram_transport_interface", "../media:media_channel", "../media:rtc_data_sctp_transport_internal", @@ -2484,6 +2485,7 @@ if (rtc_include_tests && !build_with_chromium) { "../rtc_base/third_party/sigslot", "../system_wrappers:metrics", "../test:field_trial", + "../test:rtc_expect_death", "../test:run_loop", "../test:scoped_key_value_config", "../test/pc/sctp:fake_sctp_transport", diff --git a/pc/data_channel_unittest.cc b/pc/data_channel_unittest.cc index 7c88d29cc7..9b84a1be61 100644 --- a/pc/data_channel_unittest.cc +++ b/pc/data_channel_unittest.cc @@ -32,6 +32,10 @@ #include "test/gtest.h" #include "test/run_loop.h" +#if RTC_DCHECK_IS_ON && GTEST_HAS_DEATH_TEST && !defined(WEBRTC_ANDROID) +#include "test/testsupport/rtc_expect_death.h" +#endif // RTC_DCHECK_IS_ON && GTEST_HAS_DEATH_TEST && !defined(WEBRTC_ANDROID) + namespace webrtc { namespace { @@ -124,6 +128,20 @@ class SctpDataChannelTest : public ::testing::Test { channel_->RegisterObserver(observer_.get()); } + // Wait for queued up methods to run on the network thread. + void FlushNetworkThread() { + RTC_DCHECK_RUN_ON(run_loop_.task_queue()); + network_thread_.BlockingCall([] {}); + } + + // Used to complete pending methods on the network thread + // that might queue up methods on the signaling (main) thread + // that are run too. + void FlushNetworkThreadAndPendingOperations() { + FlushNetworkThread(); + run_loop_.Flush(); + } + test::RunLoop run_loop_; rtc::Thread network_thread_; InternalDataChannelInit init_; @@ -207,6 +225,48 @@ TEST_F(SctpDataChannelTest, StateTransition) { // Tests that DataChannel::buffered_amount() is correct after the channel is // blocked. TEST_F(SctpDataChannelTest, BufferedAmountWhenBlocked) { + AddObserver(); + SetChannelReady(); + DataBuffer buffer("abcd"); + size_t successful_sends = 0; + auto send_complete = [&](RTCError err) { + EXPECT_TRUE(err.ok()); + ++successful_sends; + }; + channel_->SendAsync(buffer, send_complete); + FlushNetworkThreadAndPendingOperations(); + EXPECT_EQ(channel_->buffered_amount(), 0u); + size_t successful_send_count = 1; + EXPECT_EQ(successful_send_count, successful_sends); + EXPECT_EQ(successful_send_count, + observer_->on_buffered_amount_change_count()); + + controller_->set_send_blocked(true); + const int number_of_packets = 3; + for (int i = 0; i < number_of_packets; ++i) { + channel_->SendAsync(buffer, send_complete); + ++successful_send_count; + } + FlushNetworkThreadAndPendingOperations(); + EXPECT_EQ(buffer.data.size() * number_of_packets, + channel_->buffered_amount()); + EXPECT_EQ(successful_send_count, successful_sends); + + // An event should not have been fired for buffered amount. + EXPECT_EQ(1u, observer_->on_buffered_amount_change_count()); + + // Now buffered amount events should get fired and the value + // get down to 0u. + controller_->set_send_blocked(false); + run_loop_.Flush(); + EXPECT_EQ(channel_->buffered_amount(), 0u); + EXPECT_EQ(successful_send_count, successful_sends); + EXPECT_EQ(successful_send_count, + observer_->on_buffered_amount_change_count()); +} + +// TODO(tommi): This test uses `Send()`. Remove once fully deprecated. +TEST_F(SctpDataChannelTest, DeprecatedBufferedAmountWhenBlocked) { AddObserver(); SetChannelReady(); DataBuffer buffer("abcd"); @@ -232,7 +292,7 @@ TEST_F(SctpDataChannelTest, BufferedAmountWhenBlocked) { controller_->set_send_blocked(false); run_loop_.Flush(); successful_send_count += number_of_packets; - EXPECT_EQ(0U, channel_->buffered_amount()); + EXPECT_EQ(channel_->buffered_amount(), 0u); EXPECT_EQ(successful_send_count, observer_->on_buffered_amount_change_count()); } @@ -240,6 +300,28 @@ TEST_F(SctpDataChannelTest, BufferedAmountWhenBlocked) { // Tests that the queued data are sent when the channel transitions from blocked // to unblocked. TEST_F(SctpDataChannelTest, QueuedDataSentWhenUnblocked) { + AddObserver(); + SetChannelReady(); + DataBuffer buffer("abcd"); + controller_->set_send_blocked(true); + size_t successful_send = 0u; + auto send_complete = [&](RTCError err) { + EXPECT_TRUE(err.ok()); + ++successful_send; + }; + channel_->SendAsync(buffer, send_complete); + FlushNetworkThreadAndPendingOperations(); + EXPECT_EQ(1U, successful_send); + EXPECT_EQ(0U, observer_->on_buffered_amount_change_count()); + + controller_->set_send_blocked(false); + SetChannelReady(); + EXPECT_EQ(channel_->buffered_amount(), 0u); + EXPECT_EQ(observer_->on_buffered_amount_change_count(), 1u); +} + +// TODO(tommi): This test uses `Send()`. Remove once fully deprecated. +TEST_F(SctpDataChannelTest, DeprecatedQueuedDataSentWhenUnblocked) { AddObserver(); SetChannelReady(); DataBuffer buffer("abcd"); @@ -257,6 +339,34 @@ TEST_F(SctpDataChannelTest, QueuedDataSentWhenUnblocked) { // Tests that no crash when the channel is blocked right away while trying to // send queued data. TEST_F(SctpDataChannelTest, BlockedWhenSendQueuedDataNoCrash) { + AddObserver(); + SetChannelReady(); + DataBuffer buffer("abcd"); + controller_->set_send_blocked(true); + size_t successful_send = 0u; + auto send_complete = [&](RTCError err) { + EXPECT_TRUE(err.ok()); + ++successful_send; + }; + channel_->SendAsync(buffer, send_complete); + FlushNetworkThreadAndPendingOperations(); + EXPECT_EQ(1U, successful_send); + EXPECT_EQ(0U, observer_->on_buffered_amount_change_count()); + + // Set channel ready while it is still blocked. + SetChannelReady(); + EXPECT_EQ(buffer.size(), channel_->buffered_amount()); + EXPECT_EQ(0U, observer_->on_buffered_amount_change_count()); + + // Unblock the channel to send queued data again, there should be no crash. + controller_->set_send_blocked(false); + SetChannelReady(); + EXPECT_EQ(0U, channel_->buffered_amount()); + EXPECT_EQ(1U, observer_->on_buffered_amount_change_count()); +} + +// TODO(tommi): This test uses `Send()`. Remove once fully deprecated. +TEST_F(SctpDataChannelTest, DeprecatedBlockedWhenSendQueuedDataNoCrash) { AddObserver(); SetChannelReady(); DataBuffer buffer("abcd"); @@ -294,6 +404,55 @@ TEST_F(SctpDataChannelTest, VerifyMessagesAndBytesSent) { EXPECT_EQ(0U, channel_->messages_sent()); EXPECT_EQ(0U, channel_->bytes_sent()); + // Send three buffers while not blocked. + controller_->set_send_blocked(false); + for (int i : {0, 1, 2}) { + channel_->SendAsync(buffers[i], nullptr); + } + FlushNetworkThreadAndPendingOperations(); + + size_t bytes_sent = buffers[0].size() + buffers[1].size() + buffers[2].size(); + EXPECT_EQ_WAIT(0U, channel_->buffered_amount(), kDefaultTimeout); + EXPECT_EQ(3U, channel_->messages_sent()); + EXPECT_EQ(bytes_sent, channel_->bytes_sent()); + + // Send three buffers while blocked, queuing the buffers. + controller_->set_send_blocked(true); + for (int i : {3, 4, 5}) { + channel_->SendAsync(buffers[i], nullptr); + } + FlushNetworkThreadAndPendingOperations(); + size_t bytes_queued = + buffers[3].size() + buffers[4].size() + buffers[5].size(); + EXPECT_EQ(bytes_queued, channel_->buffered_amount()); + EXPECT_EQ(3U, channel_->messages_sent()); + EXPECT_EQ(bytes_sent, channel_->bytes_sent()); + + // Unblock and make sure everything was sent. + controller_->set_send_blocked(false); + EXPECT_EQ_WAIT(0U, channel_->buffered_amount(), kDefaultTimeout); + bytes_sent += bytes_queued; + EXPECT_EQ(6U, channel_->messages_sent()); + EXPECT_EQ(bytes_sent, channel_->bytes_sent()); +} + +// TODO(tommi): This test uses `Send()`. Remove once fully deprecated. +TEST_F(SctpDataChannelTest, DeprecatedVerifyMessagesAndBytesSent) { + AddObserver(); + SetChannelReady(); + std::vector buffers({ + DataBuffer("message 1"), + DataBuffer("msg 2"), + DataBuffer("message three"), + DataBuffer("quadra message"), + DataBuffer("fifthmsg"), + DataBuffer("message of the beast"), + }); + + // Default values. + EXPECT_EQ(0U, channel_->messages_sent()); + EXPECT_EQ(0U, channel_->bytes_sent()); + // Send three buffers while not blocked. controller_->set_send_blocked(false); EXPECT_TRUE(channel_->Send(buffers[0])); @@ -369,6 +528,35 @@ TEST_F(SctpDataChannelTest, SendUnorderedAfterReceivesOpenAck) { EXPECT_EQ_WAIT(DataChannelInterface::kOpen, proxy->state(), 1000); + // Sends a message and verifies it's ordered. + DataBuffer buffer("some data"); + proxy->SendAsync(buffer, nullptr); + EXPECT_TRUE(controller_->last_send_data_params().ordered); + + // Emulates receiving an OPEN_ACK message. + rtc::CopyOnWriteBuffer payload; + WriteDataChannelOpenAckMessage(&payload); + network_thread_.BlockingCall( + [&] { dc->OnDataReceived(DataMessageType::kControl, payload); }); + + // Sends another message and verifies it's unordered. + proxy->SendAsync(buffer, nullptr); + FlushNetworkThreadAndPendingOperations(); + EXPECT_FALSE(controller_->last_send_data_params().ordered); +} + +// TODO(tommi): This test uses `Send()`. Remove once fully deprecated. +TEST_F(SctpDataChannelTest, DeprecatedSendUnorderedAfterReceivesOpenAck) { + SetChannelReady(); + InternalDataChannelInit init; + init.id = 1; + init.ordered = false; + rtc::scoped_refptr dc = + controller_->CreateDataChannel("test1", init); + auto proxy = webrtc::SctpDataChannel::CreateProxy(dc, signaling_safety_); + + EXPECT_EQ_WAIT(DataChannelInterface::kOpen, proxy->state(), 1000); + // Sends a message and verifies it's ordered. DataBuffer buffer("some data"); ASSERT_TRUE(proxy->Send(buffer)); @@ -398,6 +586,29 @@ TEST_F(SctpDataChannelTest, SendUnorderedAfterReceiveData) { EXPECT_EQ_WAIT(DataChannelInterface::kOpen, proxy->state(), 1000); + // Emulates receiving a DATA message. + DataBuffer buffer("data"); + network_thread_.BlockingCall( + [&] { dc->OnDataReceived(DataMessageType::kText, buffer.data); }); + + // Sends a message and verifies it's unordered. + proxy->SendAsync(buffer, nullptr); + FlushNetworkThreadAndPendingOperations(); + EXPECT_FALSE(controller_->last_send_data_params().ordered); +} + +// TODO(tommi): This test uses `Send()`. Remove once fully deprecated. +TEST_F(SctpDataChannelTest, DeprecatedSendUnorderedAfterReceiveData) { + SetChannelReady(); + InternalDataChannelInit init; + init.id = 1; + init.ordered = false; + rtc::scoped_refptr dc = + controller_->CreateDataChannel("test1", init); + auto proxy = webrtc::SctpDataChannel::CreateProxy(dc, signaling_safety_); + + EXPECT_EQ_WAIT(DataChannelInterface::kOpen, proxy->state(), 1000); + // Emulates receiving a DATA message. DataBuffer buffer("data"); network_thread_.BlockingCall( @@ -426,6 +637,24 @@ TEST_F(SctpDataChannelTest, OpenWaitsForOpenMesssage) { TEST_F(SctpDataChannelTest, QueuedCloseFlushes) { DataBuffer buffer("foo"); + controller_->set_send_blocked(true); + SetChannelReady(); + EXPECT_EQ(DataChannelInterface::kConnecting, channel_->state()); + controller_->set_send_blocked(false); + EXPECT_EQ_WAIT(DataChannelInterface::kOpen, channel_->state(), 1000); + controller_->set_send_blocked(true); + channel_->SendAsync(buffer, nullptr); + channel_->Close(); + controller_->set_send_blocked(false); + EXPECT_EQ_WAIT(DataChannelInterface::kClosed, channel_->state(), 1000); + EXPECT_TRUE(channel_->error().ok()); + EXPECT_EQ(DataMessageType::kText, controller_->last_send_data_params().type); +} + +// TODO(tommi): This test uses `Send()`. Remove once fully deprecated. +TEST_F(SctpDataChannelTest, DeprecatedQueuedCloseFlushes) { + DataBuffer buffer("foo"); + controller_->set_send_blocked(true); SetChannelReady(); EXPECT_EQ(DataChannelInterface::kConnecting, channel_->state()); @@ -442,6 +671,16 @@ TEST_F(SctpDataChannelTest, QueuedCloseFlushes) { // Tests that messages are sent with the right id. TEST_F(SctpDataChannelTest, SendDataId) { + SetChannelSid(inner_channel_, StreamId(1)); + SetChannelReady(); + DataBuffer buffer("data"); + channel_->SendAsync(buffer, nullptr); + FlushNetworkThreadAndPendingOperations(); + EXPECT_EQ(1, controller_->last_sid()); +} + +// TODO(tommi): This test uses `Send()`. Remove once fully deprecated. +TEST_F(SctpDataChannelTest, DeprecatedSendDataId) { SetChannelSid(inner_channel_, StreamId(1)); SetChannelReady(); DataBuffer buffer("data"); @@ -564,6 +803,36 @@ TEST_F(SctpDataChannelTest, OpenAckRoleInitialization) { // Tests that that Send() returns false if the sending buffer is full // and the channel stays open. TEST_F(SctpDataChannelTest, OpenWhenSendBufferFull) { + AddObserver(); + SetChannelReady(); + + const size_t packetSize = 1024; + + rtc::CopyOnWriteBuffer buffer(packetSize); + memset(buffer.MutableData(), 0, buffer.size()); + + DataBuffer packet(buffer, true); + controller_->set_send_blocked(true); + size_t successful_send = 0u, failed_send = 0u; + auto send_complete = [&](RTCError err) { + err.ok() ? ++successful_send : ++failed_send; + }; + + size_t count = DataChannelInterface::MaxSendQueueSize() / packetSize; + for (size_t i = 0; i < count; ++i) { + channel_->SendAsync(packet, send_complete); + } + + // The sending buffer should be full, `Send()` returns false. + channel_->SendAsync(packet, std::move(send_complete)); + FlushNetworkThreadAndPendingOperations(); + EXPECT_TRUE(DataChannelInterface::kOpen == channel_->state()); + EXPECT_EQ(successful_send, count); + EXPECT_EQ(failed_send, 1u); +} + +// TODO(tommi): This test uses `Send()`. Remove once fully deprecated. +TEST_F(SctpDataChannelTest, DeprecatedOpenWhenSendBufferFull) { SetChannelReady(); const size_t packetSize = 1024; @@ -590,6 +859,20 @@ TEST_F(SctpDataChannelTest, ClosedOnTransportError) { DataBuffer buffer("abcd"); controller_->set_transport_error(); + channel_->SendAsync(buffer, nullptr); + + EXPECT_EQ(DataChannelInterface::kClosed, channel_->state()); + EXPECT_FALSE(channel_->error().ok()); + EXPECT_EQ(RTCErrorType::NETWORK_ERROR, channel_->error().type()); + EXPECT_EQ(RTCErrorDetailType::NONE, channel_->error().error_detail()); +} + +// TODO(tommi): This test uses `Send()`. Remove once fully deprecated. +TEST_F(SctpDataChannelTest, DeprecatedClosedOnTransportError) { + SetChannelReady(); + DataBuffer buffer("abcd"); + controller_->set_transport_error(); + EXPECT_TRUE(channel_->Send(buffer)); EXPECT_EQ(DataChannelInterface::kClosed, channel_->state()); @@ -622,6 +905,17 @@ TEST_F(SctpDataChannelTest, SendEmptyData) { SetChannelReady(); EXPECT_EQ(DataChannelInterface::kOpen, channel_->state()); + DataBuffer buffer(""); + channel_->SendAsync(buffer, nullptr); + EXPECT_EQ(DataChannelInterface::kOpen, channel_->state()); +} + +// TODO(tommi): This test uses `Send()`. Remove once fully deprecated. +TEST_F(SctpDataChannelTest, DeprecatedSendEmptyData) { + SetChannelSid(inner_channel_, StreamId(1)); + SetChannelReady(); + EXPECT_EQ(DataChannelInterface::kOpen, channel_->state()); + DataBuffer buffer(""); EXPECT_TRUE(channel_->Send(buffer)); EXPECT_EQ(DataChannelInterface::kOpen, channel_->state()); @@ -646,6 +940,34 @@ TEST_F(SctpDataChannelTest, UnusedTransitionsDirectlyToClosed) { // Test that the data channel goes to the "closed" state (and doesn't crash) // when its transport goes away, even while data is buffered. TEST_F(SctpDataChannelTest, TransportDestroyedWhileDataBuffered) { + AddObserver(); + SetChannelReady(); + + rtc::CopyOnWriteBuffer buffer(1024); + memset(buffer.MutableData(), 0, buffer.size()); + DataBuffer packet(buffer, true); + + // Send a packet while sending is blocked so it ends up buffered. + controller_->set_send_blocked(true); + channel_->SendAsync(packet, nullptr); + + // Tell the data channel that its transport is being destroyed. + // It should then stop using the transport (allowing us to delete it) and + // transition to the "closed" state. + RTCError error(RTCErrorType::OPERATION_ERROR_WITH_DATA, ""); + error.set_error_detail(RTCErrorDetailType::SCTP_FAILURE); + network_thread_.BlockingCall( + [&] { inner_channel_->OnTransportChannelClosed(error); }); + controller_.reset(nullptr); + EXPECT_EQ_WAIT(DataChannelInterface::kClosed, channel_->state(), + kDefaultTimeout); + EXPECT_FALSE(channel_->error().ok()); + EXPECT_EQ(RTCErrorType::OPERATION_ERROR_WITH_DATA, channel_->error().type()); + EXPECT_EQ(RTCErrorDetailType::SCTP_FAILURE, channel_->error().error_detail()); +} + +// TODO(tommi): This test uses `Send()`. Remove once fully deprecated. +TEST_F(SctpDataChannelTest, DeprecatedTransportDestroyedWhileDataBuffered) { SetChannelReady(); rtc::CopyOnWriteBuffer buffer(1024); @@ -762,5 +1084,69 @@ TEST_F(SctpSidAllocatorTest, SctpIdReusedForRemovedDataChannel) { EXPECT_EQ(even_id.stream_id_int() + 6, allocated_id.stream_id_int()); } +// Code coverage tests for default implementations in data_channel_interface.*. +namespace { +class NoImplDataChannel : public DataChannelInterface { + public: + NoImplDataChannel() = default; + // Send and SendAsync implementations are public and implementation + // is in data_channel_interface.cc. + + private: + // Implementation for pure virtual methods, just for compilation sake. + void RegisterObserver(DataChannelObserver* observer) override {} + void UnregisterObserver() override {} + std::string label() const override { return ""; } + bool reliable() const override { return false; } + int id() const override { return -1; } + DataState state() const override { return DataChannelInterface::kClosed; } + uint32_t messages_sent() const override { return 0u; } + uint64_t bytes_sent() const override { return 0u; } + uint32_t messages_received() const override { return 0u; } + uint64_t bytes_received() const override { return 0u; } + uint64_t buffered_amount() const override { return 0u; } + void Close() override {} +}; + +class NoImplObserver : public DataChannelObserver { + public: + NoImplObserver() = default; + + private: + void OnStateChange() override {} + void OnMessage(const DataBuffer& buffer) override {} +}; +} // namespace + +TEST(DataChannelInterfaceTest, Coverage) { + auto channel = rtc::make_ref_counted(); + EXPECT_FALSE(channel->ordered()); + EXPECT_EQ(channel->maxRetransmitTime(), 0u); + EXPECT_EQ(channel->maxRetransmits(), 0u); + EXPECT_FALSE(channel->maxRetransmitsOpt()); + EXPECT_FALSE(channel->maxPacketLifeTime()); + EXPECT_TRUE(channel->protocol().empty()); + EXPECT_FALSE(channel->negotiated()); + EXPECT_EQ(channel->MaxSendQueueSize(), 16u * 1024u * 1024u); + + NoImplObserver observer; + observer.OnBufferedAmountChange(0u); + EXPECT_FALSE(observer.IsOkToCallOnTheNetworkThread()); +} + +#if RTC_DCHECK_IS_ON && GTEST_HAS_DEATH_TEST && !defined(WEBRTC_ANDROID) + +TEST(DataChannelInterfaceDeathTest, SendDefaultImplDchecks) { + auto channel = rtc::make_ref_counted(); + RTC_EXPECT_DEATH(channel->Send(DataBuffer("Foo")), "Check failed: false"); +} + +TEST(DataChannelInterfaceDeathTest, SendAsyncDefaultImplDchecks) { + auto channel = rtc::make_ref_counted(); + RTC_EXPECT_DEATH(channel->SendAsync(DataBuffer("Foo"), nullptr), + "Check failed: false"); +} +#endif // RTC_DCHECK_IS_ON && GTEST_HAS_DEATH_TEST && !defined(WEBRTC_ANDROID) + } // namespace } // namespace webrtc diff --git a/pc/sctp_data_channel.cc b/pc/sctp_data_channel.cc index 7fad029ae1..8a53dee0a3 100644 --- a/pc/sctp_data_channel.cc +++ b/pc/sctp_data_channel.cc @@ -60,6 +60,10 @@ PROXY_SECONDARY_CONSTMETHOD0(uint64_t, bytes_received) PROXY_SECONDARY_CONSTMETHOD0(uint64_t, buffered_amount) PROXY_SECONDARY_METHOD0(void, Close) PROXY_SECONDARY_METHOD1(bool, Send, const DataBuffer&) +BYPASS_PROXY_METHOD2(void, + SendAsync, + DataBuffer, + absl::AnyInvocable) END_PROXY_MAP(DataChannel) } // namespace @@ -261,6 +265,8 @@ class SctpDataChannel::ObserverAdapter : public DataChannelObserver { })); } + bool IsOkToCallOnTheNetworkThread() override { return true; } + rtc::Thread* signaling_thread() const { return signaling_thread_; } rtc::Thread* network_thread() const { return channel_->network_thread_; } @@ -320,14 +326,16 @@ SctpDataChannel::SctpDataChannel( negotiated_(config.negotiated), ordered_(config.ordered), observer_(nullptr), - controller_(std::move(controller)), - connected_to_transport_(connected_to_transport) { + controller_(std::move(controller)) { RTC_DCHECK_RUN_ON(network_thread_); // Since we constructed on the network thread we can't (yet) check the // `controller_` pointer since doing so will trigger a thread check. RTC_UNUSED(network_thread_); RTC_DCHECK(config.IsValid()); + if (connected_to_transport) + network_safety_->SetAlive(); + switch (config.open_handshake_role) { case InternalDataChannelInit::kNone: // pre-negotiated handshake_state_ = kHandshakeReady; @@ -465,9 +473,6 @@ bool SctpDataChannel::negotiated() const { int SctpDataChannel::id() const { RTC_DCHECK_RUN_ON(network_thread_); - // TODO(tommi): Once an ID has been assigned, it won't change (can be - // considered const). We could do special handling of this and allow bypassing - // the proxy so that we can return a valid id without thread hopping. return id_n_.stream_id_int(); } @@ -554,22 +559,53 @@ uint64_t SctpDataChannel::bytes_received() const { bool SctpDataChannel::Send(const DataBuffer& buffer) { RTC_DCHECK_RUN_ON(network_thread_); + RTCError err = SendImpl(buffer); + if (err.type() == RTCErrorType::INVALID_STATE || + err.type() == RTCErrorType::RESOURCE_EXHAUSTED) { + return false; + } + // Always return true for SCTP DataChannel per the spec. + return true; +} + +// RTC_RUN_ON(network_thread_); +RTCError SctpDataChannel::SendImpl(DataBuffer buffer) { if (state_ != kOpen) { error_ = RTCError(RTCErrorType::INVALID_STATE); - return false; + return error_; } // If the queue is non-empty, we're waiting for SignalReadyToSend, // so just add to the end of the queue and keep waiting. if (!queued_send_data_.Empty()) { - return QueueSendDataMessage(buffer); + error_ = QueueSendDataMessage(buffer) + ? RTCError::OK() + : RTCError(RTCErrorType::RESOURCE_EXHAUSTED); + return error_; } - SendDataMessage(buffer, true); + return SendDataMessage(buffer, true); +} - // Always return true for SCTP DataChannel per the spec. - return true; +void SctpDataChannel::SendAsync( + DataBuffer buffer, + absl::AnyInvocable on_complete) { + // Note: at this point, we do not know on which thread we're being called + // since this method bypasses the proxy. On Android the thread might be VM + // owned, on other platforms it might be the signaling thread, or in Chrome + // it can be the JS thread. We also don't know if it's consistently the same + // thread. So we always post to the network thread (even if the current thread + // might be the network thread - in theory a call could even come from within + // the `on_complete` callback). + network_thread_->PostTask(SafeTask( + network_safety_, [this, buffer = std::move(buffer), + on_complete = std::move(on_complete)]() mutable { + RTC_DCHECK_RUN_ON(network_thread_); + RTCError err = SendImpl(std::move(buffer)); + if (on_complete) + std::move(on_complete)(err); + })); } void SctpDataChannel::SetSctpSid_n(StreamId sid) { @@ -608,7 +644,7 @@ void SctpDataChannel::OnClosingProcedureComplete() { void SctpDataChannel::OnTransportChannelCreated() { RTC_DCHECK_RUN_ON(network_thread_); - connected_to_transport_ = true; + network_safety_->SetAlive(); } void SctpDataChannel::OnTransportChannelClosed(RTCError error) { @@ -697,10 +733,8 @@ void SctpDataChannel::OnTransportReady() { // what triggers the callback to `OnTransportReady()`. // These steps are currently accomplished via two separate PostTask calls to // the signaling thread, but could simply be done in single method call on - // the network thread (which incidentally is the thread that we'll need to - // be on for the below `Send*` calls, which currently do a BlockingCall - // from the signaling thread to the network thread. - RTC_DCHECK(connected_to_transport_); + // the network thread. + RTC_DCHECK(connected_to_transport()); RTC_DCHECK(id_n_.HasValue()); SendQueuedControlMessages(); @@ -716,7 +750,7 @@ void SctpDataChannel::CloseAbruptlyWithError(RTCError error) { return; } - connected_to_transport_ = false; + network_safety_->SetNotAlive(); // Closing abruptly means any queued data gets thrown away. queued_send_data_.Clear(); @@ -746,7 +780,7 @@ void SctpDataChannel::UpdateState() { switch (state_) { case kConnecting: { - if (connected_to_transport_ && controller_) { + if (connected_to_transport() && controller_) { if (handshake_state_ == kHandshakeShouldSendOpen) { rtc::CopyOnWriteBuffer payload; WriteDataChannelOpenMessage(label_, protocol_, priority_, ordered_, @@ -774,7 +808,7 @@ void SctpDataChannel::UpdateState() { break; } case kClosing: { - if (connected_to_transport_ && controller_) { + if (connected_to_transport() && controller_) { // Wait for all queued data to be sent before beginning the closing // procedure. if (queued_send_data_.Empty() && queued_control_data_.Empty()) { @@ -840,7 +874,7 @@ void SctpDataChannel::SendQueuedDataMessages() { while (!queued_send_data_.Empty()) { std::unique_ptr buffer = queued_send_data_.PopFront(); - if (!SendDataMessage(*buffer, false)) { + if (!SendDataMessage(*buffer, false).ok()) { // Return the message to the front of the queue if sending is aborted. queued_send_data_.PushFront(std::move(buffer)); break; @@ -849,12 +883,12 @@ void SctpDataChannel::SendQueuedDataMessages() { } // RTC_RUN_ON(network_thread_). -bool SctpDataChannel::SendDataMessage(const DataBuffer& buffer, - bool queue_if_blocked) { +RTCError SctpDataChannel::SendDataMessage(const DataBuffer& buffer, + bool queue_if_blocked) { SendDataParams send_params; if (!controller_) { error_ = RTCError(RTCErrorType::INVALID_STATE); - return false; + return error_; } send_params.ordered = ordered_; @@ -879,12 +913,16 @@ bool SctpDataChannel::SendDataMessage(const DataBuffer& buffer, if (observer_ && buffer.size() > 0) { observer_->OnBufferedAmountChange(buffer.size()); } - return true; + return error_; } if (error_.type() == RTCErrorType::RESOURCE_EXHAUSTED) { - if (!queue_if_blocked || QueueSendDataMessage(buffer)) { - return false; + if (!queue_if_blocked) + return error_; + + if (QueueSendDataMessage(buffer)) { + error_ = RTCError::OK(); + return error_; } } // Close the channel if the error is not SDR_BLOCK, or if queuing the @@ -895,7 +933,7 @@ bool SctpDataChannel::SendDataMessage(const DataBuffer& buffer, CloseAbruptlyWithError( RTCError(RTCErrorType::NETWORK_ERROR, "Failure to send data")); - return false; + return error_; } // RTC_RUN_ON(network_thread_). @@ -924,7 +962,7 @@ void SctpDataChannel::SendQueuedControlMessages() { // RTC_RUN_ON(network_thread_). bool SctpDataChannel::SendControlMessage(const rtc::CopyOnWriteBuffer& buffer) { - RTC_DCHECK(connected_to_transport_); + RTC_DCHECK(connected_to_transport()); RTC_DCHECK(id_n_.HasValue()); RTC_DCHECK(controller_); diff --git a/pc/sctp_data_channel.h b/pc/sctp_data_channel.h index 50d5883db6..38b1ae02a7 100644 --- a/pc/sctp_data_channel.h +++ b/pc/sctp_data_channel.h @@ -23,6 +23,7 @@ #include "api/rtc_error.h" #include "api/scoped_refptr.h" #include "api/sequence_checker.h" +#include "api/task_queue/pending_task_safety_flag.h" #include "api/transport/data_channel_transport_interface.h" #include "pc/data_channel_utils.h" #include "pc/sctp_utils.h" @@ -171,6 +172,8 @@ class SctpDataChannel : public DataChannelInterface { uint32_t messages_received() const override; uint64_t bytes_received() const override; bool Send(const DataBuffer& buffer) override; + void SendAsync(DataBuffer buffer, + absl::AnyInvocable on_complete) override; // Close immediately, ignoring any queued data or closing procedure. // This is called when the underlying SctpTransport is being destroyed. @@ -247,13 +250,14 @@ class SctpDataChannel : public DataChannelInterface { kHandshakeReady }; + RTCError SendImpl(DataBuffer buffer) RTC_RUN_ON(network_thread_); void UpdateState() RTC_RUN_ON(network_thread_); void SetState(DataState state) RTC_RUN_ON(network_thread_); void DeliverQueuedReceivedData() RTC_RUN_ON(network_thread_); void SendQueuedDataMessages() RTC_RUN_ON(network_thread_); - bool SendDataMessage(const DataBuffer& buffer, bool queue_if_blocked) + RTCError SendDataMessage(const DataBuffer& buffer, bool queue_if_blocked) RTC_RUN_ON(network_thread_); bool QueueSendDataMessage(const DataBuffer& buffer) RTC_RUN_ON(network_thread_); @@ -262,6 +266,10 @@ class SctpDataChannel : public DataChannelInterface { bool SendControlMessage(const rtc::CopyOnWriteBuffer& buffer) RTC_RUN_ON(network_thread_); + bool connected_to_transport() const RTC_RUN_ON(network_thread_) { + return network_safety_->alive(); + } + rtc::Thread* const signaling_thread_; rtc::Thread* const network_thread_; StreamId id_n_ RTC_GUARDED_BY(network_thread_); @@ -286,7 +294,6 @@ class SctpDataChannel : public DataChannelInterface { RTC_GUARDED_BY(network_thread_); HandshakeState handshake_state_ RTC_GUARDED_BY(network_thread_) = kHandshakeInit; - bool connected_to_transport_ RTC_GUARDED_BY(network_thread_) = false; // Did we already start the graceful SCTP closing procedure? bool started_closing_procedure_ RTC_GUARDED_BY(network_thread_) = false; // Control messages that always have to get sent out before any queued @@ -294,6 +301,8 @@ class SctpDataChannel : public DataChannelInterface { PacketQueue queued_control_data_ RTC_GUARDED_BY(network_thread_); PacketQueue queued_received_data_ RTC_GUARDED_BY(network_thread_); PacketQueue queued_send_data_ RTC_GUARDED_BY(network_thread_); + rtc::scoped_refptr network_safety_ = + PendingTaskSafetyFlag::CreateDetachedInactive(); }; } // namespace webrtc