From d07900c8480fd814bf23bde3d452ea29d9c97be9 Mon Sep 17 00:00:00 2001 From: Per K Date: Fri, 17 Nov 2023 10:18:25 +0100 Subject: [PATCH] UDP socket and TCP socket use AsyncPacketSocket::NotifyPacketReceived This is done instead of directly using AsyncPacketSocket::SignalReceived. Bug: webrtc:15368, webrtc:11943 Change-Id: I5671e66b270355188b1252138eced8e6c78ba7ad Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/327521 Commit-Queue: Per Kjellander Reviewed-by: Harald Alvestrand Cr-Commit-Position: refs/heads/main@{#41180} --- p2p/base/stun_server_unittest.cc | 9 ++-- rtc_base/BUILD.gn | 3 ++ rtc_base/async_tcp_socket.cc | 4 +- rtc_base/async_tcp_socket.h | 1 + rtc_base/async_udp_socket.cc | 4 +- rtc_base/nat_unittest.cc | 72 ++++++++++++++++++++------------ rtc_base/socket_unittest.cc | 6 +-- rtc_base/test_client.cc | 55 +++++++++++------------- rtc_base/test_client.h | 25 +++++------ 9 files changed, 94 insertions(+), 85 deletions(-) diff --git a/p2p/base/stun_server_unittest.cc b/p2p/base/stun_server_unittest.cc index 5d3f31fb98..e4ea30cba4 100644 --- a/p2p/base/stun_server_unittest.cc +++ b/p2p/base/stun_server_unittest.cc @@ -33,15 +33,13 @@ const rtc::SocketAddress client_addr("1.2.3.4", 1234); class StunServerTest : public ::testing::Test { public: - StunServerTest() : ss_(new rtc::VirtualSocketServer()), network_(ss_.get()) { + StunServerTest() : ss_(new rtc::VirtualSocketServer()) { + ss_->SetMessageQueue(&main_thread); server_.reset( new StunServer(rtc::AsyncUDPSocket::Create(ss_.get(), server_addr))); client_.reset(new rtc::TestClient( absl::WrapUnique(rtc::AsyncUDPSocket::Create(ss_.get(), client_addr)))); - - network_.Start(); } - ~StunServerTest() override { network_.Stop(); } void Send(const StunMessage& msg) { rtc::ByteBufferWriter buf; @@ -57,7 +55,7 @@ class StunServerTest : public ::testing::Test { std::unique_ptr packet = client_->NextPacket(rtc::TestClient::kTimeoutMs); if (packet) { - rtc::ByteBufferReader buf(packet->buf, packet->size); + rtc::ByteBufferReader buf(packet->buf); msg = new StunMessage(); msg->Read(&buf); } @@ -67,7 +65,6 @@ class StunServerTest : public ::testing::Test { private: rtc::AutoThread main_thread; std::unique_ptr ss_; - rtc::Thread network_; std::unique_ptr server_; std::unique_ptr client_; }; diff --git a/rtc_base/BUILD.gn b/rtc_base/BUILD.gn index 899a689e49..ca1a457dfa 100644 --- a/rtc_base/BUILD.gn +++ b/rtc_base/BUILD.gn @@ -1661,10 +1661,13 @@ rtc_library("testclient") { ] deps = [ ":async_udp_socket", + ":buffer", ":gunit_helpers", ":rtc_base_tests_utils", ":threading", ":timeutils", + "../api/units:timestamp", + "network:received_packet", "synchronization:mutex", ] } diff --git a/rtc_base/async_tcp_socket.cc b/rtc_base/async_tcp_socket.cc index 367c5b04e7..eed4a31c38 100644 --- a/rtc_base/async_tcp_socket.cc +++ b/rtc_base/async_tcp_socket.cc @@ -294,8 +294,8 @@ void AsyncTCPSocket::ProcessInput(char* data, size_t* len) { if (*len < kPacketLenSize + pkt_len) return; - SignalReadPacket(this, data + kPacketLenSize, pkt_len, remote_addr, - TimeMicros()); + NotifyPacketReceived(rtc::ReceivedPacket::CreateFromLegacy( + data + kPacketLenSize, pkt_len, rtc::TimeMicros(), remote_addr)); *len -= kPacketLenSize + pkt_len; if (*len > 0) { diff --git a/rtc_base/async_tcp_socket.h b/rtc_base/async_tcp_socket.h index 541080fba7..90f77d618e 100644 --- a/rtc_base/async_tcp_socket.h +++ b/rtc_base/async_tcp_socket.h @@ -13,6 +13,7 @@ #include +#include #include #include "rtc_base/async_packet_socket.h" diff --git a/rtc_base/async_udp_socket.cc b/rtc_base/async_udp_socket.cc index af7ae56fb6..358420a5de 100644 --- a/rtc_base/async_udp_socket.cc +++ b/rtc_base/async_udp_socket.cc @@ -136,8 +136,8 @@ void AsyncUDPSocket::OnReadEvent(Socket* socket) { // TODO: Make sure that we got all of the packet. // If we did not, then we should resize our buffer to be large enough. - SignalReadPacket(this, buf_, static_cast(len), remote_addr, - timestamp); + NotifyPacketReceived( + rtc::ReceivedPacket::CreateFromLegacy(buf_, len, timestamp, remote_addr)); } void AsyncUDPSocket::OnWriteEvent(Socket* socket) { diff --git a/rtc_base/nat_unittest.cc b/rtc_base/nat_unittest.cc index 19e53543ba..432985d283 100644 --- a/rtc_base/nat_unittest.cc +++ b/rtc_base/nat_unittest.cc @@ -11,14 +11,17 @@ #include #include +#include #include #include #include #include "absl/memory/memory.h" +#include "api/units/time_delta.h" #include "rtc_base/async_packet_socket.h" #include "rtc_base/async_tcp_socket.h" #include "rtc_base/async_udp_socket.h" +#include "rtc_base/event.h" #include "rtc_base/gunit.h" #include "rtc_base/ip_address.h" #include "rtc_base/logging.h" @@ -80,29 +83,36 @@ void TestSend(SocketServer* internal, NATSocketFactory* natsf = new NATSocketFactory( internal, nat->internal_udp_address(), nat->internal_tcp_address()); - TestClient* in = CreateTestClient(natsf, internal_addr); - TestClient* out[4]; - for (int i = 0; i < 4; i++) - out[i] = CreateTestClient(external, external_addrs[i]); - th_int.Start(); th_ext.Start(); + TestClient* in; + th_int.BlockingCall([&] { in = CreateTestClient(natsf, internal_addr); }); + + TestClient* out[4]; + th_ext.BlockingCall([&] { + for (int i = 0; i < 4; i++) + out[i] = CreateTestClient(external, external_addrs[i]); + }); + const char* buf = "filter_test"; size_t len = strlen(buf); - in->SendTo(buf, len, out[0]->address()); + th_int.BlockingCall([&] { in->SendTo(buf, len, out[0]->address()); }); SocketAddress trans_addr; - EXPECT_TRUE(out[0]->CheckNextPacket(buf, len, &trans_addr)); + th_ext.BlockingCall( + [&] { EXPECT_TRUE(out[0]->CheckNextPacket(buf, len, &trans_addr)); }); for (int i = 1; i < 4; i++) { - in->SendTo(buf, len, out[i]->address()); + th_int.BlockingCall([&] { in->SendTo(buf, len, out[i]->address()); }); SocketAddress trans_addr2; - EXPECT_TRUE(out[i]->CheckNextPacket(buf, len, &trans_addr2)); - bool are_same = (trans_addr == trans_addr2); - ASSERT_EQ(are_same, exp_same) << "same translated address"; - ASSERT_NE(AF_UNSPEC, trans_addr.family()); - ASSERT_NE(AF_UNSPEC, trans_addr2.family()); + th_ext.BlockingCall([&] { + EXPECT_TRUE(out[i]->CheckNextPacket(buf, len, &trans_addr2)); + bool are_same = (trans_addr == trans_addr2); + ASSERT_EQ(are_same, exp_same) << "same translated address"; + ASSERT_NE(AF_UNSPEC, trans_addr.family()); + ASSERT_NE(AF_UNSPEC, trans_addr2.family()); + }); } th_int.Stop(); @@ -134,29 +144,39 @@ void TestRecv(SocketServer* internal, NATSocketFactory* natsf = new NATSocketFactory( internal, nat->internal_udp_address(), nat->internal_tcp_address()); - TestClient* in = CreateTestClient(natsf, internal_addr); - TestClient* out[4]; - for (int i = 0; i < 4; i++) - out[i] = CreateTestClient(external, external_addrs[i]); - th_int.Start(); th_ext.Start(); + TestClient* in = nullptr; + th_int.BlockingCall([&] { in = CreateTestClient(natsf, internal_addr); }); + + TestClient* out[4]; + th_ext.BlockingCall([&] { + for (int i = 0; i < 4; i++) + out[i] = CreateTestClient(external, external_addrs[i]); + }); + const char* buf = "filter_test"; size_t len = strlen(buf); - in->SendTo(buf, len, out[0]->address()); + th_int.BlockingCall([&] { in->SendTo(buf, len, out[0]->address()); }); SocketAddress trans_addr; - EXPECT_TRUE(out[0]->CheckNextPacket(buf, len, &trans_addr)); + th_ext.BlockingCall( + [&] { EXPECT_TRUE(out[0]->CheckNextPacket(buf, len, &trans_addr)); }); - out[1]->SendTo(buf, len, trans_addr); - EXPECT_TRUE(CheckReceive(in, !filter_ip, buf, len)); + th_ext.BlockingCall([&] { out[1]->SendTo(buf, len, trans_addr); }); + th_int.BlockingCall( + [&] { EXPECT_TRUE(CheckReceive(in, !filter_ip, buf, len)); }); + th_ext.BlockingCall([&] { out[2]->SendTo(buf, len, trans_addr); }); - out[2]->SendTo(buf, len, trans_addr); - EXPECT_TRUE(CheckReceive(in, !filter_port, buf, len)); + th_int.BlockingCall( + [&] { EXPECT_TRUE(CheckReceive(in, !filter_port, buf, len)); }); - out[3]->SendTo(buf, len, trans_addr); - EXPECT_TRUE(CheckReceive(in, !filter_ip && !filter_port, buf, len)); + th_ext.BlockingCall([&] { out[3]->SendTo(buf, len, trans_addr); }); + + th_int.BlockingCall([&] { + EXPECT_TRUE(CheckReceive(in, !filter_ip && !filter_port, buf, len)); + }); th_int.Stop(); th_ext.Stop(); diff --git a/rtc_base/socket_unittest.cc b/rtc_base/socket_unittest.cc index 0a41a776ac..f5ef2a33fc 100644 --- a/rtc_base/socket_unittest.cc +++ b/rtc_base/socket_unittest.cc @@ -1132,13 +1132,13 @@ void SocketTest::UdpSocketRecvTimestampUseRtcEpoch(const IPAddress& loopback) { client2->SendTo("foo", 3, address); std::unique_ptr packet_1 = client1->NextPacket(10000); ASSERT_TRUE(packet_1 != nullptr); - EXPECT_NEAR(packet_1->packet_time_us, rtc::TimeMicros(), 1000'000); + EXPECT_NEAR(packet_1->packet_time->us(), rtc::TimeMicros(), 1000'000); Thread::SleepMs(100); client2->SendTo("bar", 3, address); std::unique_ptr packet_2 = client1->NextPacket(10000); ASSERT_TRUE(packet_2 != nullptr); - EXPECT_GT(packet_2->packet_time_us, packet_1->packet_time_us); - EXPECT_NEAR(packet_2->packet_time_us, rtc::TimeMicros(), 1000'000); + EXPECT_GT(packet_2->packet_time->us(), packet_1->packet_time->us()); + EXPECT_NEAR(packet_2->packet_time->us(), rtc::TimeMicros(), 1000'000); } } // namespace rtc diff --git a/rtc_base/test_client.cc b/rtc_base/test_client.cc index f23ac2aec0..87c946529e 100644 --- a/rtc_base/test_client.cc +++ b/rtc_base/test_client.cc @@ -15,7 +15,9 @@ #include #include +#include "api/units/timestamp.h" #include "rtc_base/gunit.h" +#include "rtc_base/network/received_packet.h" #include "rtc_base/thread.h" #include "rtc_base/time_utils.h" @@ -30,10 +32,11 @@ TestClient::TestClient(std::unique_ptr socket) TestClient::TestClient(std::unique_ptr socket, ThreadProcessingFakeClock* fake_clock) - : fake_clock_(fake_clock), - socket_(std::move(socket)), - prev_packet_timestamp_(-1) { - socket_->SignalReadPacket.connect(this, &TestClient::OnPacket); + : fake_clock_(fake_clock), socket_(std::move(socket)) { + socket_->RegisterReceivedPacketCallback( + [&](rtc::AsyncPacketSocket* socket, const rtc::ReceivedPacket& packet) { + OnPacket(socket, packet); + }); socket_->SignalReadyToSend.connect(this, &TestClient::OnReadyToSend); } @@ -100,20 +103,22 @@ bool TestClient::CheckNextPacket(const char* buf, bool res = false; std::unique_ptr packet = NextPacket(kTimeoutMs); if (packet) { - res = (packet->size == size && memcmp(packet->buf, buf, size) == 0 && - CheckTimestamp(packet->packet_time_us)); + res = (packet->buf.size() == size && + memcmp(packet->buf.data(), buf, size) == 0 && + CheckTimestamp(packet->packet_time)); if (addr) *addr = packet->addr; } return res; } -bool TestClient::CheckTimestamp(int64_t packet_timestamp) { +bool TestClient::CheckTimestamp( + absl::optional packet_timestamp) { bool res = true; - if (packet_timestamp == -1) { + if (!packet_timestamp) { res = false; } - if (prev_packet_timestamp_ != -1) { + if (prev_packet_timestamp_) { if (packet_timestamp < prev_packet_timestamp_) { res = false; } @@ -145,36 +150,24 @@ int TestClient::SetOption(Socket::Option opt, int value) { } void TestClient::OnPacket(AsyncPacketSocket* socket, - const char* buf, - size_t size, - const SocketAddress& remote_addr, - const int64_t& packet_time_us) { + const rtc::ReceivedPacket& received_packet) { webrtc::MutexLock lock(&mutex_); - packets_.push_back( - std::make_unique(remote_addr, buf, size, packet_time_us)); + packets_.push_back(std::make_unique(received_packet)); } void TestClient::OnReadyToSend(AsyncPacketSocket* socket) { ++ready_to_send_count_; } -TestClient::Packet::Packet(const SocketAddress& a, - const char* b, - size_t s, - int64_t packet_time_us) - : addr(a), buf(0), size(s), packet_time_us(packet_time_us) { - buf = new char[size]; - memcpy(buf, b, size); -} +TestClient::Packet::Packet(const rtc::ReceivedPacket& received_packet) + : addr(received_packet.source_address()), + // Copy received_packet payload to a buffer owned by Packet. + buf(received_packet.payload().data(), received_packet.payload().size()), + packet_time(received_packet.arrival_time()) {} TestClient::Packet::Packet(const Packet& p) - : addr(p.addr), buf(0), size(p.size), packet_time_us(p.packet_time_us) { - buf = new char[size]; - memcpy(buf, p.buf, size); -} - -TestClient::Packet::~Packet() { - delete[] buf; -} + : addr(p.addr), + buf(p.buf.data(), p.buf.size()), + packet_time(p.packet_time) {} } // namespace rtc diff --git a/rtc_base/test_client.h b/rtc_base/test_client.h index dd91d37ab9..6fe6fd5b83 100644 --- a/rtc_base/test_client.h +++ b/rtc_base/test_client.h @@ -14,8 +14,11 @@ #include #include +#include "api/units/timestamp.h" #include "rtc_base/async_udp_socket.h" +#include "rtc_base/buffer.h" #include "rtc_base/fake_clock.h" +#include "rtc_base/network/received_packet.h" #include "rtc_base/synchronization/mutex.h" namespace rtc { @@ -26,17 +29,12 @@ class TestClient : public sigslot::has_slots<> { public: // Records the contents of a packet that was received. struct Packet { - Packet(const SocketAddress& a, - const char* b, - size_t s, - int64_t packet_time_us); + Packet(const rtc::ReceivedPacket& received_packet); Packet(const Packet& p); - virtual ~Packet(); SocketAddress addr; - char* buf; - size_t size; - int64_t packet_time_us; + Buffer buf; + absl::optional packet_time; }; // Default timeout for NextPacket reads. @@ -96,14 +94,11 @@ class TestClient : public sigslot::has_slots<> { static const int kNoPacketTimeoutMs = 1000; // Workaround for the fact that AsyncPacketSocket::GetConnState doesn't exist. Socket::ConnState GetState(); - // Slot for packets read on the socket. + void OnPacket(AsyncPacketSocket* socket, - const char* buf, - size_t len, - const SocketAddress& remote_addr, - const int64_t& packet_time_us); + const rtc::ReceivedPacket& received_packet); void OnReadyToSend(AsyncPacketSocket* socket); - bool CheckTimestamp(int64_t packet_timestamp); + bool CheckTimestamp(absl::optional packet_timestamp); void AdvanceTime(int ms); ThreadProcessingFakeClock* fake_clock_ = nullptr; @@ -111,7 +106,7 @@ class TestClient : public sigslot::has_slots<> { std::unique_ptr socket_; std::vector> packets_; int ready_to_send_count_ = 0; - int64_t prev_packet_timestamp_; + absl::optional prev_packet_timestamp_; }; } // namespace rtc