AsyncPacketSocket::RegisterReceivedPacketCallback

This cl introduce RegisterReceivedPacketCallback and
DeregisterReceivedPacketCallback that will be used to replace AsyncPacketSocket::SignalReadPacket

A "proof of concept" cl is here: https://webrtc-review.googlesource.com/c/src/+/327324

Bug: webrtc:15368
Change-Id: I07e4f564dc8420d78e542991689778d8531225df
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/327325
Reviewed-by: Harald Alvestrand <hta@webrtc.org>
Commit-Queue: Per Kjellander <perkj@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#41164}
This commit is contained in:
Per K 2023-11-15 10:43:32 +01:00 committed by WebRTC LUCI CQ
parent 8b54e37cac
commit 4ac371883e
9 changed files with 270 additions and 79 deletions

View file

@ -608,6 +608,7 @@ if (rtc_include_tests && !build_with_chromium) {
"p2p:libstunprober_unittests", "p2p:libstunprober_unittests",
"p2p:rtc_p2p_unittests", "p2p:rtc_p2p_unittests",
"rtc_base:async_dns_resolver_unittests", "rtc_base:async_dns_resolver_unittests",
"rtc_base:async_packet_socket_unittest",
"rtc_base:callback_list_unittests", "rtc_base:callback_list_unittests",
"rtc_base:rtc_base_approved_unittests", "rtc_base:rtc_base_approved_unittests",
"rtc_base:rtc_base_unittests", "rtc_base:rtc_base_unittests",

View file

@ -266,14 +266,14 @@ static void SendPingAndReceiveResponse(Connection* lconn,
lconn->Ping(rtc::TimeMillis()); lconn->Ping(rtc::TimeMillis());
ASSERT_TRUE_WAIT(lport->last_stun_msg(), kDefaultTimeout); ASSERT_TRUE_WAIT(lport->last_stun_msg(), kDefaultTimeout);
ASSERT_GT(lport->last_stun_buf().size(), 0u); ASSERT_GT(lport->last_stun_buf().size(), 0u);
rconn->OnReadPacket( rconn->OnReadPacket(rtc::ReceivedPacket(lport->last_stun_buf(),
rtc::ReceivedPacket(lport->last_stun_buf(), absl::nullopt)); rtc::SocketAddress(), absl::nullopt));
clock->AdvanceTime(webrtc::TimeDelta::Millis(ms)); clock->AdvanceTime(webrtc::TimeDelta::Millis(ms));
ASSERT_TRUE_WAIT(rport->last_stun_msg(), kDefaultTimeout); ASSERT_TRUE_WAIT(rport->last_stun_msg(), kDefaultTimeout);
ASSERT_GT(rport->last_stun_buf().size(), 0u); ASSERT_GT(rport->last_stun_buf().size(), 0u);
lconn->OnReadPacket( lconn->OnReadPacket(rtc::ReceivedPacket(rport->last_stun_buf(),
rtc::ReceivedPacket(rport->last_stun_buf(), absl::nullopt)); rtc::SocketAddress(), absl::nullopt));
} }
class TestChannel : public sigslot::has_slots<> { class TestChannel : public sigslot::has_slots<> {
@ -1494,8 +1494,8 @@ TEST_F(PortTest, TestLoopbackCall) {
ASSERT_TRUE_WAIT(lport->last_stun_msg() != NULL, kDefaultTimeout); ASSERT_TRUE_WAIT(lport->last_stun_msg() != NULL, kDefaultTimeout);
IceMessage* msg = lport->last_stun_msg(); IceMessage* msg = lport->last_stun_msg();
EXPECT_EQ(STUN_BINDING_REQUEST, msg->type()); EXPECT_EQ(STUN_BINDING_REQUEST, msg->type());
conn->OnReadPacket( conn->OnReadPacket(rtc::ReceivedPacket(lport->last_stun_buf(),
rtc::ReceivedPacket(lport->last_stun_buf(), absl::nullopt)); rtc::SocketAddress(), absl::nullopt));
ASSERT_TRUE_WAIT(lport->last_stun_msg() != NULL, kDefaultTimeout); ASSERT_TRUE_WAIT(lport->last_stun_msg() != NULL, kDefaultTimeout);
msg = lport->last_stun_msg(); msg = lport->last_stun_msg();
EXPECT_EQ(STUN_BINDING_RESPONSE, msg->type()); EXPECT_EQ(STUN_BINDING_RESPONSE, msg->type());
@ -1528,10 +1528,8 @@ TEST_F(PortTest, TestLoopbackCall) {
lport->Reset(); lport->Reset();
auto buf = std::make_unique<ByteBufferWriter>(); auto buf = std::make_unique<ByteBufferWriter>();
WriteStunMessage(*modified_req, buf.get()); WriteStunMessage(*modified_req, buf.get());
conn1->OnReadPacket( conn1->OnReadPacket(rtc::ReceivedPacket::CreateFromLegacy(
rtc::ReceivedPacket(rtc::reinterpret_array_view<const uint8_t>( buf->Data(), buf->Length(), /*packet_time_us=*/-1));
rtc::MakeArrayView(buf->Data(), buf->Length())),
absl::nullopt));
ASSERT_TRUE_WAIT(lport->last_stun_msg() != NULL, kDefaultTimeout); ASSERT_TRUE_WAIT(lport->last_stun_msg() != NULL, kDefaultTimeout);
msg = lport->last_stun_msg(); msg = lport->last_stun_msg();
EXPECT_EQ(STUN_BINDING_ERROR_RESPONSE, msg->type()); EXPECT_EQ(STUN_BINDING_ERROR_RESPONSE, msg->type());
@ -1564,8 +1562,8 @@ TEST_F(PortTest, TestIceRoleConflict) {
IceMessage* msg = rport->last_stun_msg(); IceMessage* msg = rport->last_stun_msg();
EXPECT_EQ(STUN_BINDING_REQUEST, msg->type()); EXPECT_EQ(STUN_BINDING_REQUEST, msg->type());
// Send rport binding request to lport. // Send rport binding request to lport.
lconn->OnReadPacket( lconn->OnReadPacket(rtc::ReceivedPacket(rport->last_stun_buf(),
rtc::ReceivedPacket(rport->last_stun_buf(), absl::nullopt)); rtc::SocketAddress(), absl::nullopt));
ASSERT_TRUE_WAIT(lport->last_stun_msg() != NULL, kDefaultTimeout); ASSERT_TRUE_WAIT(lport->last_stun_msg() != NULL, kDefaultTimeout);
EXPECT_EQ(STUN_BINDING_RESPONSE, lport->last_stun_msg()->type()); EXPECT_EQ(STUN_BINDING_RESPONSE, lport->last_stun_msg()->type());
@ -1897,14 +1895,14 @@ TEST_F(PortTest, TestSendStunMessage) {
std::unique_ptr<IceMessage> request = CopyStunMessage(*msg); std::unique_ptr<IceMessage> request = CopyStunMessage(*msg);
// Receive the BINDING-REQUEST and respond with BINDING-RESPONSE. // Receive the BINDING-REQUEST and respond with BINDING-RESPONSE.
rconn->OnReadPacket( rconn->OnReadPacket(rtc::ReceivedPacket(lport->last_stun_buf(),
rtc::ReceivedPacket(lport->last_stun_buf(), absl::nullopt)); rtc::SocketAddress(), absl::nullopt));
msg = rport->last_stun_msg(); msg = rport->last_stun_msg();
ASSERT_TRUE(msg != NULL); ASSERT_TRUE(msg != NULL);
EXPECT_EQ(STUN_BINDING_RESPONSE, msg->type()); EXPECT_EQ(STUN_BINDING_RESPONSE, msg->type());
// Received a BINDING-RESPONSE. // Received a BINDING-RESPONSE.
lconn->OnReadPacket( lconn->OnReadPacket(rtc::ReceivedPacket(rport->last_stun_buf(),
rtc::ReceivedPacket(rport->last_stun_buf(), absl::nullopt)); rtc::SocketAddress(), absl::nullopt));
// Verify the STUN Stats. // Verify the STUN Stats.
EXPECT_EQ(1U, lconn->stats().sent_ping_requests_total); EXPECT_EQ(1U, lconn->stats().sent_ping_requests_total);
@ -1984,12 +1982,12 @@ TEST_F(PortTest, TestSendStunMessage) {
// Respond with a BINDING-RESPONSE. // Respond with a BINDING-RESPONSE.
request = CopyStunMessage(*msg); request = CopyStunMessage(*msg);
lconn->OnReadPacket( lconn->OnReadPacket(rtc::ReceivedPacket(rport->last_stun_buf(),
rtc::ReceivedPacket(rport->last_stun_buf(), absl::nullopt)); rtc::SocketAddress(), absl::nullopt));
msg = lport->last_stun_msg(); msg = lport->last_stun_msg();
// Receive the BINDING-RESPONSE. // Receive the BINDING-RESPONSE.
rconn->OnReadPacket( rconn->OnReadPacket(rtc::ReceivedPacket(lport->last_stun_buf(),
rtc::ReceivedPacket(lport->last_stun_buf(), absl::nullopt)); rtc::SocketAddress(), absl::nullopt));
// Verify the Stun ping stats. // Verify the Stun ping stats.
EXPECT_EQ(3U, rconn->stats().sent_ping_requests_total); EXPECT_EQ(3U, rconn->stats().sent_ping_requests_total);
@ -2040,8 +2038,8 @@ TEST_F(PortTest, TestNomination) {
lconn->Ping(0); lconn->Ping(0);
ASSERT_TRUE_WAIT(lport->last_stun_msg(), kDefaultTimeout); ASSERT_TRUE_WAIT(lport->last_stun_msg(), kDefaultTimeout);
ASSERT_GT(lport->last_stun_buf().size(), 0u); ASSERT_GT(lport->last_stun_buf().size(), 0u);
rconn->OnReadPacket( rconn->OnReadPacket(rtc::ReceivedPacket(lport->last_stun_buf(),
rtc::ReceivedPacket(lport->last_stun_buf(), absl::nullopt)); rtc::SocketAddress(), absl::nullopt));
EXPECT_EQ(nomination, rconn->remote_nomination()); EXPECT_EQ(nomination, rconn->remote_nomination());
EXPECT_FALSE(lconn->nominated()); EXPECT_FALSE(lconn->nominated());
@ -2053,8 +2051,8 @@ TEST_F(PortTest, TestNomination) {
// updating the acknowledged nomination of `lconn`. // updating the acknowledged nomination of `lconn`.
ASSERT_TRUE_WAIT(rport->last_stun_msg(), kDefaultTimeout); ASSERT_TRUE_WAIT(rport->last_stun_msg(), kDefaultTimeout);
ASSERT_GT(rport->last_stun_buf().size(), 0u); ASSERT_GT(rport->last_stun_buf().size(), 0u);
lconn->OnReadPacket( lconn->OnReadPacket(rtc::ReceivedPacket(rport->last_stun_buf(),
rtc::ReceivedPacket(rport->last_stun_buf(), absl::nullopt)); rtc::SocketAddress(), absl::nullopt));
EXPECT_EQ(nomination, lconn->acked_nomination()); EXPECT_EQ(nomination, lconn->acked_nomination());
EXPECT_TRUE(lconn->nominated()); EXPECT_TRUE(lconn->nominated());
@ -2181,8 +2179,8 @@ TEST_F(PortTest, TestNetworkCostChange) {
IceMessage* msg = lport->last_stun_msg(); IceMessage* msg = lport->last_stun_msg();
EXPECT_EQ(STUN_BINDING_REQUEST, msg->type()); EXPECT_EQ(STUN_BINDING_REQUEST, msg->type());
// Pass the binding request to rport. // Pass the binding request to rport.
rconn->OnReadPacket( rconn->OnReadPacket(rtc::ReceivedPacket(lport->last_stun_buf(),
rtc::ReceivedPacket(lport->last_stun_buf(), absl::nullopt)); rtc::SocketAddress(), absl::nullopt));
// Wait until rport sends the response and then check the remote network cost. // Wait until rport sends the response and then check the remote network cost.
ASSERT_TRUE_WAIT(rport->last_stun_msg() != NULL, kDefaultTimeout); ASSERT_TRUE_WAIT(rport->last_stun_msg() != NULL, kDefaultTimeout);
@ -2512,8 +2510,8 @@ TEST_F(PortTest,
// Send request. // Send request.
lconn->Ping(0); lconn->Ping(0);
ASSERT_TRUE_WAIT(lport->last_stun_msg() != NULL, kDefaultTimeout); ASSERT_TRUE_WAIT(lport->last_stun_msg() != NULL, kDefaultTimeout);
rconn->OnReadPacket( rconn->OnReadPacket(rtc::ReceivedPacket(lport->last_stun_buf(),
rtc::ReceivedPacket(lport->last_stun_buf(), absl::nullopt)); rtc::SocketAddress(), absl::nullopt));
// Intercept request and add comprehension required attribute. // Intercept request and add comprehension required attribute.
ASSERT_TRUE_WAIT(rport->last_stun_msg() != NULL, kDefaultTimeout); ASSERT_TRUE_WAIT(rport->last_stun_msg() != NULL, kDefaultTimeout);
@ -2523,10 +2521,8 @@ TEST_F(PortTest,
modified_response->AddFingerprint(); modified_response->AddFingerprint();
ByteBufferWriter buf; ByteBufferWriter buf;
WriteStunMessage(*modified_response, &buf); WriteStunMessage(*modified_response, &buf);
lconn->OnReadPacket( lconn->OnReadPacket(rtc::ReceivedPacket::CreateFromLegacy(
rtc::ReceivedPacket(rtc::reinterpret_array_view<const uint8_t>( buf.Data(), buf.Length(), /*packet_time_us=*/-1));
rtc::MakeArrayView(buf.Data(), buf.Length())),
absl::nullopt));
// Response should have been ignored, leaving us unwritable still. // Response should have been ignored, leaving us unwritable still.
EXPECT_FALSE(lconn->writable()); EXPECT_FALSE(lconn->writable());
} }
@ -2554,10 +2550,8 @@ TEST_F(PortTest,
in_msg->AddFingerprint(); in_msg->AddFingerprint();
ByteBufferWriter buf; ByteBufferWriter buf;
WriteStunMessage(*in_msg, &buf); WriteStunMessage(*in_msg, &buf);
lconn->OnReadPacket( lconn->OnReadPacket(rtc::ReceivedPacket::CreateFromLegacy(
rtc::ReceivedPacket(rtc::reinterpret_array_view<const uint8_t>( buf.Data(), buf.Length(), /*packet_time_us=*/-1));
rtc::MakeArrayView(buf.Data(), buf.Length())),
absl::nullopt));
EXPECT_EQ(0u, lconn->last_ping_received()); EXPECT_EQ(0u, lconn->last_ping_received());
} }
@ -2603,8 +2597,8 @@ TEST_F(PortTest, TestHandleStunBindingIndication) {
IceMessage* msg = rport->last_stun_msg(); IceMessage* msg = rport->last_stun_msg();
EXPECT_EQ(STUN_BINDING_REQUEST, msg->type()); EXPECT_EQ(STUN_BINDING_REQUEST, msg->type());
// Send rport binding request to lport. // Send rport binding request to lport.
lconn->OnReadPacket( lconn->OnReadPacket(rtc::ReceivedPacket(rport->last_stun_buf(),
rtc::ReceivedPacket(rport->last_stun_buf(), absl::nullopt)); rtc::SocketAddress(), absl::nullopt));
ASSERT_TRUE_WAIT(lport->last_stun_msg() != NULL, kDefaultTimeout); ASSERT_TRUE_WAIT(lport->last_stun_msg() != NULL, kDefaultTimeout);
EXPECT_EQ(STUN_BINDING_RESPONSE, lport->last_stun_msg()->type()); EXPECT_EQ(STUN_BINDING_RESPONSE, lport->last_stun_msg()->type());
@ -2613,10 +2607,8 @@ TEST_F(PortTest, TestHandleStunBindingIndication) {
// Adding a delay of 100ms. // Adding a delay of 100ms.
rtc::Thread::Current()->ProcessMessages(100); rtc::Thread::Current()->ProcessMessages(100);
// Pinging lconn using stun indication message. // Pinging lconn using stun indication message.
lconn->OnReadPacket( lconn->OnReadPacket(rtc::ReceivedPacket::CreateFromLegacy(
rtc::ReceivedPacket(rtc::reinterpret_array_view<const uint8_t>( buf->Data(), buf->Length(), /*packet_time_us=*/-1));
rtc::MakeArrayView(buf->Data(), buf->Length())),
absl::nullopt));
int64_t last_ping_received2 = lconn->last_ping_received(); int64_t last_ping_received2 = lconn->last_ping_received();
EXPECT_GT(last_ping_received2, last_ping_received1); EXPECT_GT(last_ping_received2, last_ping_received1);
} }
@ -3113,8 +3105,8 @@ TEST_F(PortTest, TestIceLiteConnectivity) {
con->SendStunBindingResponse(request.get()); con->SendStunBindingResponse(request.get());
// Feeding the respone message from litemode to the full mode connection. // Feeding the respone message from litemode to the full mode connection.
ch1.conn()->OnReadPacket( ch1.conn()->OnReadPacket(rtc::ReceivedPacket(
rtc::ReceivedPacket(ice_lite_port->last_stun_buf(), absl::nullopt)); ice_lite_port->last_stun_buf(), rtc::SocketAddress(), absl::nullopt));
// Verifying full mode connection becomes writable from the response. // Verifying full mode connection becomes writable from the response.
EXPECT_EQ_WAIT(Connection::STATE_WRITABLE, ch1.conn()->write_state(), EXPECT_EQ_WAIT(Connection::STATE_WRITABLE, ch1.conn()->write_state(),
@ -3231,8 +3223,8 @@ TEST_P(GoogPingTest, TestGoogPingAnnounceEnable) {
GetSupportedGoogPingVersion(response) >= kGoogPingVersion); GetSupportedGoogPingVersion(response) >= kGoogPingVersion);
// Feeding the respone message back. // Feeding the respone message back.
ch1.conn()->OnReadPacket( ch1.conn()->OnReadPacket(rtc::ReceivedPacket(
rtc::ReceivedPacket(port2->last_stun_buf(), absl::nullopt)); port2->last_stun_buf(), rtc::SocketAddress(), absl::nullopt));
port1->Reset(); port1->Reset();
port2->Reset(); port2->Reset();
@ -3415,10 +3407,8 @@ TEST_F(PortTest, TestGoogPingUnsupportedVersionInStunBindingResponse) {
modified_response->Write(&buf); modified_response->Write(&buf);
// Feeding the modified respone message back. // Feeding the modified respone message back.
ch1.conn()->OnReadPacket( ch1.conn()->OnReadPacket(rtc::ReceivedPacket::CreateFromLegacy(
rtc::ReceivedPacket(rtc::reinterpret_array_view<const uint8_t>( buf.Data(), buf.Length(), /*packet_time_us=*/-1));
rtc::MakeArrayView(buf.Data(), buf.Length())),
absl::nullopt));
port1->Reset(); port1->Reset();
port2->Reset(); port2->Reset();
@ -3490,8 +3480,8 @@ TEST_F(PortTest, TestChangeInAttributeMakesGoogPingFallsbackToStunBinding) {
ASSERT_TRUE(GetSupportedGoogPingVersion(response) >= kGoogPingVersion); ASSERT_TRUE(GetSupportedGoogPingVersion(response) >= kGoogPingVersion);
// Feeding the respone message back. // Feeding the respone message back.
ch1.conn()->OnReadPacket( ch1.conn()->OnReadPacket(rtc::ReceivedPacket(
rtc::ReceivedPacket(port2->last_stun_buf(), absl::nullopt)); port2->last_stun_buf(), rtc::SocketAddress(), absl::nullopt));
port1->Reset(); port1->Reset();
port2->Reset(); port2->Reset();
@ -3575,8 +3565,8 @@ TEST_F(PortTest, TestErrorResponseMakesGoogPingFallBackToStunBinding) {
ASSERT_TRUE(GetSupportedGoogPingVersion(response) >= kGoogPingVersion); ASSERT_TRUE(GetSupportedGoogPingVersion(response) >= kGoogPingVersion);
// Feeding the respone message back. // Feeding the respone message back.
ch1.conn()->OnReadPacket( ch1.conn()->OnReadPacket(rtc::ReceivedPacket(
rtc::ReceivedPacket(port2->last_stun_buf(), absl::nullopt)); port2->last_stun_buf(), rtc::SocketAddress(), absl::nullopt));
port1->Reset(); port1->Reset();
port2->Reset(); port2->Reset();
@ -3602,10 +3592,8 @@ TEST_F(PortTest, TestErrorResponseMakesGoogPingFallBackToStunBinding) {
rtc::ByteBufferWriter buf; rtc::ByteBufferWriter buf;
error_response.Write(&buf); error_response.Write(&buf);
ch1.conn()->OnReadPacket( ch1.conn()->OnReadPacket(rtc::ReceivedPacket::CreateFromLegacy(
rtc::ReceivedPacket(rtc::reinterpret_array_view<const uint8_t>( buf.Data(), buf.Length(), /*packet_time_us=*/-1));
rtc::MakeArrayView(buf.Data(), buf.Length())),
absl::nullopt));
// And now the third ping...this should be a binding. // And now the third ping...this should be a binding.
port1->Reset(); port1->Reset();
@ -3842,8 +3830,8 @@ class ConnectionTest : public PortTest {
lconn->Ping(rtc::TimeMillis()); lconn->Ping(rtc::TimeMillis());
ASSERT_TRUE_WAIT(lport->last_stun_msg(), kDefaultTimeout); ASSERT_TRUE_WAIT(lport->last_stun_msg(), kDefaultTimeout);
ASSERT_GT(lport->last_stun_buf().size(), 0u); ASSERT_GT(lport->last_stun_buf().size(), 0u);
rconn->OnReadPacket( rconn->OnReadPacket(rtc::ReceivedPacket(
rtc::ReceivedPacket(lport->last_stun_buf(), absl::nullopt)); lport->last_stun_buf(), rtc::SocketAddress(), absl::nullopt));
clock_.AdvanceTime(webrtc::TimeDelta::Millis(ms)); clock_.AdvanceTime(webrtc::TimeDelta::Millis(ms));
ASSERT_TRUE_WAIT(rport->last_stun_msg(), kDefaultTimeout); ASSERT_TRUE_WAIT(rport->last_stun_msg(), kDefaultTimeout);
@ -3857,7 +3845,8 @@ class ConnectionTest : public PortTest {
rtc::BufferT<uint8_t> reply; rtc::BufferT<uint8_t> reply;
SendPingAndCaptureReply(lconn, rconn, ms, &reply); SendPingAndCaptureReply(lconn, rconn, ms, &reply);
lconn->OnReadPacket(rtc::ReceivedPacket(reply, absl::nullopt)); lconn->OnReadPacket(
rtc::ReceivedPacket(reply, rtc::SocketAddress(), absl::nullopt));
} }
void OnConnectionStateChange(Connection* connection) { num_state_changes_++; } void OnConnectionStateChange(Connection* connection) { num_state_changes_++; }
@ -3918,7 +3907,8 @@ TEST_F(ConnectionTest, ConnectionForgetLearnedStateDiscardsPendingPings) {
EXPECT_FALSE(lconn->writable()); EXPECT_FALSE(lconn->writable());
EXPECT_FALSE(lconn->receiving()); EXPECT_FALSE(lconn->receiving());
lconn->OnReadPacket(rtc::ReceivedPacket(reply, absl::nullopt)); lconn->OnReadPacket(
rtc::ReceivedPacket(reply, rtc::SocketAddress(), absl::nullopt));
// That reply was discarded due to the ForgetLearnedState() while it was // That reply was discarded due to the ForgetLearnedState() while it was
// outstanding. // outstanding.
@ -3990,15 +3980,15 @@ TEST_F(ConnectionTest, SendReceiveGoogDelta) {
lconn->Ping(rtc::TimeMillis(), std::move(delta)); lconn->Ping(rtc::TimeMillis(), std::move(delta));
ASSERT_TRUE_WAIT(lport_->last_stun_msg(), kDefaultTimeout); ASSERT_TRUE_WAIT(lport_->last_stun_msg(), kDefaultTimeout);
ASSERT_GT(lport_->last_stun_buf().size(), 0u); ASSERT_GT(lport_->last_stun_buf().size(), 0u);
rconn->OnReadPacket( rconn->OnReadPacket(rtc::ReceivedPacket(lport_->last_stun_buf(),
rtc::ReceivedPacket(lport_->last_stun_buf(), absl::nullopt)); rtc::SocketAddress(), absl::nullopt));
EXPECT_TRUE(received_goog_delta); EXPECT_TRUE(received_goog_delta);
clock_.AdvanceTime(webrtc::TimeDelta::Millis(ms)); clock_.AdvanceTime(webrtc::TimeDelta::Millis(ms));
ASSERT_TRUE_WAIT(rport_->last_stun_msg(), kDefaultTimeout); ASSERT_TRUE_WAIT(rport_->last_stun_msg(), kDefaultTimeout);
ASSERT_GT(rport_->last_stun_buf().size(), 0u); ASSERT_GT(rport_->last_stun_buf().size(), 0u);
lconn->OnReadPacket( lconn->OnReadPacket(rtc::ReceivedPacket(rport_->last_stun_buf(),
rtc::ReceivedPacket(rport_->last_stun_buf(), absl::nullopt)); rtc::SocketAddress(), absl::nullopt));
EXPECT_TRUE(received_goog_delta_ack); EXPECT_TRUE(received_goog_delta_ack);
} }
@ -4028,14 +4018,14 @@ TEST_F(ConnectionTest, SendGoogDeltaNoReply) {
lconn->Ping(rtc::TimeMillis(), std::move(delta)); lconn->Ping(rtc::TimeMillis(), std::move(delta));
ASSERT_TRUE_WAIT(lport_->last_stun_msg(), kDefaultTimeout); ASSERT_TRUE_WAIT(lport_->last_stun_msg(), kDefaultTimeout);
ASSERT_GT(lport_->last_stun_buf().size(), 0u); ASSERT_GT(lport_->last_stun_buf().size(), 0u);
rconn->OnReadPacket( rconn->OnReadPacket(rtc::ReceivedPacket(lport_->last_stun_buf(),
rtc::ReceivedPacket(lport_->last_stun_buf(), absl::nullopt)); rtc::SocketAddress(), absl::nullopt));
clock_.AdvanceTime(webrtc::TimeDelta::Millis(ms)); clock_.AdvanceTime(webrtc::TimeDelta::Millis(ms));
ASSERT_TRUE_WAIT(rport_->last_stun_msg(), kDefaultTimeout); ASSERT_TRUE_WAIT(rport_->last_stun_msg(), kDefaultTimeout);
ASSERT_GT(rport_->last_stun_buf().size(), 0u); ASSERT_GT(rport_->last_stun_buf().size(), 0u);
lconn->OnReadPacket( lconn->OnReadPacket(rtc::ReceivedPacket(rport_->last_stun_buf(),
rtc::ReceivedPacket(rport_->last_stun_buf(), absl::nullopt)); rtc::SocketAddress(), absl::nullopt));
EXPECT_TRUE(received_goog_delta_ack_error); EXPECT_TRUE(received_goog_delta_ack_error);
} }

View file

@ -1369,10 +1369,12 @@ rtc_library("async_packet_socket") {
] ]
deps = [ deps = [
":callback_list", ":callback_list",
":checks",
":dscp", ":dscp",
":socket", ":socket",
":timeutils", ":timeutils",
"../api:sequence_checker", "../api:sequence_checker",
"network:received_packet",
"network:sent_packet", "network:sent_packet",
"system:no_unique_address", "system:no_unique_address",
"system:rtc_export", "system:rtc_export",
@ -1380,6 +1382,21 @@ rtc_library("async_packet_socket") {
] ]
} }
if (rtc_include_tests) {
rtc_library("async_packet_socket_unittest") {
testonly = true
visibility = [ "*" ]
sources = [ "async_packet_socket_unittest.cc" ]
deps = [
":async_packet_socket",
":gunit_helpers",
"../test:test_support",
"network:received_packet",
"third_party/sigslot",
]
}
}
rtc_library("mdns_responder_interface") { rtc_library("mdns_responder_interface") {
sources = [ "mdns_responder_interface.h" ] sources = [ "mdns_responder_interface.h" ]
deps = [ ":ip_address" ] deps = [ ":ip_address" ]

View file

@ -10,6 +10,8 @@
#include "rtc_base/async_packet_socket.h" #include "rtc_base/async_packet_socket.h"
#include "rtc_base/checks.h"
namespace rtc { namespace rtc {
PacketTimeUpdateParams::PacketTimeUpdateParams() = default; PacketTimeUpdateParams::PacketTimeUpdateParams() = default;
@ -38,6 +40,41 @@ void AsyncPacketSocket::UnsubscribeCloseEvent(const void* removal_tag) {
on_close_.RemoveReceivers(removal_tag); on_close_.RemoveReceivers(removal_tag);
} }
void AsyncPacketSocket::RegisterReceivedPacketCallback(
absl::AnyInvocable<void(AsyncPacketSocket*, const rtc::ReceivedPacket&)>
received_packet_callback) {
RTC_DCHECK_RUN_ON(&network_checker_);
RTC_CHECK(!received_packet_callback_);
SignalReadPacket.connect(this, &AsyncPacketSocket::NotifyPacketReceived);
received_packet_callback_ = std::move(received_packet_callback);
}
void AsyncPacketSocket::DeregisterReceivedPacketCallback() {
RTC_DCHECK_RUN_ON(&network_checker_);
SignalReadPacket.disconnect(this);
received_packet_callback_ = nullptr;
}
void AsyncPacketSocket::NotifyPacketReceived(
const rtc::ReceivedPacket& packet) {
RTC_DCHECK_RUN_ON(&network_checker_);
if (received_packet_callback_) {
received_packet_callback_(this, packet);
return;
}
if (SignalReadPacket.is_empty()) {
RTC_DCHECK_NOTREACHED() << " No listener registered";
return;
}
// TODO(bugs.webrtc.org:15368): Remove. This code path is only used if
// SignalReadyPacket is used by clients to get notification of received
// packets but actual socket implementation use NotifyPacketReceived to
// trigger the notification.
SignalReadPacket(this, reinterpret_cast<const char*>(packet.payload().data()),
packet.payload().size(), packet.source_address(),
packet.arrival_time() ? packet.arrival_time()->us() : -1);
}
void CopySocketInformationToPacketInfo(size_t packet_size_bytes, void CopySocketInformationToPacketInfo(size_t packet_size_bytes,
const AsyncPacketSocket& socket_from, const AsyncPacketSocket& socket_from,
bool is_connectionless, bool is_connectionless,

View file

@ -11,11 +11,13 @@
#ifndef RTC_BASE_ASYNC_PACKET_SOCKET_H_ #ifndef RTC_BASE_ASYNC_PACKET_SOCKET_H_
#define RTC_BASE_ASYNC_PACKET_SOCKET_H_ #define RTC_BASE_ASYNC_PACKET_SOCKET_H_
#include <cstdint>
#include <vector> #include <vector>
#include "api/sequence_checker.h" #include "api/sequence_checker.h"
#include "rtc_base/callback_list.h" #include "rtc_base/callback_list.h"
#include "rtc_base/dscp.h" #include "rtc_base/dscp.h"
#include "rtc_base/network/received_packet.h"
#include "rtc_base/network/sent_packet.h" #include "rtc_base/network/sent_packet.h"
#include "rtc_base/socket.h" #include "rtc_base/socket.h"
#include "rtc_base/system/no_unique_address.h" #include "rtc_base/system/no_unique_address.h"
@ -115,8 +117,14 @@ class RTC_EXPORT AsyncPacketSocket : public sigslot::has_slots<> {
std::function<void(AsyncPacketSocket*, int)> callback); std::function<void(AsyncPacketSocket*, int)> callback);
void UnsubscribeCloseEvent(const void* removal_tag); void UnsubscribeCloseEvent(const void* removal_tag);
void RegisterReceivedPacketCallback(
absl::AnyInvocable<void(AsyncPacketSocket*, const rtc::ReceivedPacket&)>
received_packet_callback);
void DeregisterReceivedPacketCallback();
// Emitted each time a packet is read. Used only for UDP and // Emitted each time a packet is read. Used only for UDP and
// connected TCP sockets. // connected TCP sockets.
// TODO(bugs.webrtc.org:15368): Deprecate and remove.
sigslot::signal5<AsyncPacketSocket*, sigslot::signal5<AsyncPacketSocket*,
const char*, const char*,
size_t, size_t,
@ -155,12 +163,26 @@ class RTC_EXPORT AsyncPacketSocket : public sigslot::has_slots<> {
on_close_.Send(this, err); on_close_.Send(this, err);
} }
// TODO(bugs.webrtc.org:15368): Deprecate and remove.
void NotifyPacketReceived(AsyncPacketSocket*,
const char* data,
size_t size,
const SocketAddress& address,
const int64_t& packet_time_us) {
NotifyPacketReceived(
ReceivedPacket::CreateFromLegacy(data, size, packet_time_us, address));
}
void NotifyPacketReceived(const rtc::ReceivedPacket& packet);
RTC_NO_UNIQUE_ADDRESS webrtc::SequenceChecker network_checker_{ RTC_NO_UNIQUE_ADDRESS webrtc::SequenceChecker network_checker_{
webrtc::SequenceChecker::kDetached}; webrtc::SequenceChecker::kDetached};
private: private:
webrtc::CallbackList<AsyncPacketSocket*, int> on_close_ webrtc::CallbackList<AsyncPacketSocket*, int> on_close_
RTC_GUARDED_BY(&network_checker_); RTC_GUARDED_BY(&network_checker_);
absl::AnyInvocable<void(AsyncPacketSocket*, const rtc::ReceivedPacket&)>
received_packet_callback_ RTC_GUARDED_BY(&network_checker_);
}; };
// Listen socket, producing an AsyncPacketSocket when a peer connects. // Listen socket, producing an AsyncPacketSocket when a peer connects.

View file

@ -0,0 +1,110 @@
/*
* Copyright 2023 The WebRTC Project Authors. All rights reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#include "rtc_base/async_packet_socket.h"
#include "rtc_base/third_party/sigslot/sigslot.h"
#include "test/gmock.h"
#include "test/gtest.h"
namespace rtc {
namespace {
using ::testing::MockFunction;
class MockAsyncPacketSocket : public rtc::AsyncPacketSocket {
public:
~MockAsyncPacketSocket() = default;
MOCK_METHOD(SocketAddress, GetLocalAddress, (), (const, override));
MOCK_METHOD(SocketAddress, GetRemoteAddress, (), (const, override));
MOCK_METHOD(int,
Send,
(const void* pv, size_t cb, const rtc::PacketOptions& options),
(override));
MOCK_METHOD(int,
SendTo,
(const void* pv,
size_t cb,
const SocketAddress& addr,
const rtc::PacketOptions& options),
(override));
MOCK_METHOD(int, Close, (), (override));
MOCK_METHOD(State, GetState, (), (const, override));
MOCK_METHOD(int,
GetOption,
(rtc::Socket::Option opt, int* value),
(override));
MOCK_METHOD(int, SetOption, (rtc::Socket::Option opt, int value), (override));
MOCK_METHOD(int, GetError, (), (const, override));
MOCK_METHOD(void, SetError, (int error), (override));
void NotifyPacketReceived() {
char data[1] = {'a'};
AsyncPacketSocket::NotifyPacketReceived(this, data, 1, SocketAddress(), -1);
}
};
TEST(AsyncPacketSocket, RegisteredCallbackReceivePacketsFromNotify) {
MockAsyncPacketSocket mock_socket;
MockFunction<void(AsyncPacketSocket*, const rtc::ReceivedPacket&)>
received_packet;
EXPECT_CALL(received_packet, Call);
mock_socket.RegisterReceivedPacketCallback(received_packet.AsStdFunction());
mock_socket.NotifyPacketReceived();
}
TEST(AsyncPacketSocket, RegisteredCallbackReceivePacketsFromSignalReadPacket) {
MockAsyncPacketSocket mock_socket;
MockFunction<void(AsyncPacketSocket*, const rtc::ReceivedPacket&)>
received_packet;
EXPECT_CALL(received_packet, Call);
mock_socket.RegisterReceivedPacketCallback(received_packet.AsStdFunction());
char data[1] = {'a'};
mock_socket.SignalReadPacket(&mock_socket, data, 1, SocketAddress(), -1);
}
TEST(AsyncPacketSocket, SignalReadPacketTriggeredByNotifyPacketReceived) {
class SigslotPacketReceiver : public sigslot::has_slots<> {
public:
explicit SigslotPacketReceiver(rtc::AsyncPacketSocket& socket) {
socket.SignalReadPacket.connect(this,
&SigslotPacketReceiver::OnPacketReceived);
}
bool packet_received() const { return packet_received_; }
private:
void OnPacketReceived(AsyncPacketSocket*,
const char*,
size_t,
const SocketAddress&,
// TODO(bugs.webrtc.org/9584): Change to passing the
// int64_t timestamp by value.
const int64_t&) {
packet_received_ = true;
}
bool packet_received_ = false;
};
MockAsyncPacketSocket mock_socket;
SigslotPacketReceiver receiver(mock_socket);
ASSERT_FALSE(receiver.packet_received());
mock_socket.NotifyPacketReceived();
EXPECT_TRUE(receiver.packet_received());
}
} // namespace
} // namespace rtc

View file

@ -23,6 +23,7 @@ rtc_library("received_packet") {
"received_packet.h", "received_packet.h",
] ]
deps = [ deps = [
"..:socket_address",
"../../api:array_view", "../../api:array_view",
"../../api/units:timestamp", "../../api/units:timestamp",
] ]

View file

@ -17,16 +17,22 @@
namespace rtc { namespace rtc {
ReceivedPacket::ReceivedPacket(rtc::ArrayView<const uint8_t> payload, ReceivedPacket::ReceivedPacket(rtc::ArrayView<const uint8_t> payload,
const SocketAddress& source_address,
absl::optional<webrtc::Timestamp> arrival_time) absl::optional<webrtc::Timestamp> arrival_time)
: payload_(payload), arrival_time_(std::move(arrival_time)) {} : payload_(payload),
arrival_time_(std::move(arrival_time)),
source_address_(source_address) {}
// static // static
ReceivedPacket ReceivedPacket::CreateFromLegacy(const char* data, ReceivedPacket ReceivedPacket::CreateFromLegacy(
size_t size, const char* data,
int64_t packet_time_us) { size_t size,
int64_t packet_time_us,
const rtc::SocketAddress& source_address) {
RTC_DCHECK(packet_time_us == -1 || packet_time_us >= 0); RTC_DCHECK(packet_time_us == -1 || packet_time_us >= 0);
return ReceivedPacket(rtc::reinterpret_array_view<const uint8_t>( return ReceivedPacket(rtc::reinterpret_array_view<const uint8_t>(
rtc::MakeArrayView(data, size)), rtc::MakeArrayView(data, size)),
source_address,
(packet_time_us >= 0) (packet_time_us >= 0)
? absl::optional<webrtc::Timestamp>( ? absl::optional<webrtc::Timestamp>(
webrtc::Timestamp::Micros(packet_time_us)) webrtc::Timestamp::Micros(packet_time_us))

View file

@ -15,6 +15,7 @@
#include "absl/types/optional.h" #include "absl/types/optional.h"
#include "api/array_view.h" #include "api/array_view.h"
#include "api/units/timestamp.h" #include "api/units/timestamp.h"
#include "rtc_base/socket_address.h"
namespace rtc { namespace rtc {
@ -24,12 +25,15 @@ namespace rtc {
// example it may contains STUN, SCTP, SRTP, RTP, RTCP.... etc. // example it may contains STUN, SCTP, SRTP, RTP, RTCP.... etc.
class ReceivedPacket { class ReceivedPacket {
public: public:
// Caller must keep memory pointed to by payload valid for the lifetime of // Caller must keep memory pointed to by payload and address valid for the
// this ReceivedPacket. // lifetime of this ReceivedPacket.
ReceivedPacket( ReceivedPacket(
rtc::ArrayView<const uint8_t> payload, rtc::ArrayView<const uint8_t> payload,
const SocketAddress& source_address,
absl::optional<webrtc::Timestamp> arrival_time = absl::nullopt); absl::optional<webrtc::Timestamp> arrival_time = absl::nullopt);
// Address/port of the packet sender.
const SocketAddress& source_address() const { return source_address_; }
rtc::ArrayView<const uint8_t> payload() const { return payload_; } rtc::ArrayView<const uint8_t> payload() const { return payload_; }
// Timestamp when this packet was received. Not available on all socket // Timestamp when this packet was received. Not available on all socket
@ -38,13 +42,16 @@ class ReceivedPacket {
return arrival_time_; return arrival_time_;
} }
static ReceivedPacket CreateFromLegacy(const char* data, static ReceivedPacket CreateFromLegacy(
size_t size, const char* data,
int64_t packet_time_us); size_t size,
int64_t packet_time_us,
const rtc::SocketAddress& = rtc::SocketAddress());
private: private:
rtc::ArrayView<const uint8_t> payload_; rtc::ArrayView<const uint8_t> payload_;
absl::optional<webrtc::Timestamp> arrival_time_; absl::optional<webrtc::Timestamp> arrival_time_;
const SocketAddress& source_address_;
}; };
} // namespace rtc } // namespace rtc