Introduce Connection::RegisterReceivedPacketCallback

RegisterReceivedPacketCallback is used instead of
sigslot::SignalReadPacket. The callback use a new data class ReceivedPacket that combine meta
data and packet payload from a received packet.

This is the first step in an attempt to cleanup the data types used in
the packet receive pipeline.
Eventually, the ReceivedPacket class can contain more meta data such as
ECN information.

Bug: webrtc:11943,webrtc:15368
Change-Id: I984c561b9262fe4aa00176529bd8d901adf66640
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/325060
Reviewed-by: Jonas Oreland <jonaso@webrtc.org>
Commit-Queue: Per Kjellander <perkj@webrtc.org>
Reviewed-by: Harald Alvestrand <hta@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#41021}
This commit is contained in:
Per K 2023-10-26 13:54:41 +02:00 committed by WebRTC LUCI CQ
parent 971f8de35a
commit 25db2c65b6
9 changed files with 193 additions and 55 deletions

View file

@ -161,6 +161,7 @@ rtc_library("rtc_p2p") {
"../api/task_queue:pending_task_safety_flag", "../api/task_queue:pending_task_safety_flag",
"../rtc_base:safe_minmax", "../rtc_base:safe_minmax",
"../rtc_base:weak_ptr", "../rtc_base:weak_ptr",
"../rtc_base/network:received_packet",
"../rtc_base/network:sent_packet", "../rtc_base/network:sent_packet",
"../rtc_base/synchronization:mutex", "../rtc_base/synchronization:mutex",
"../rtc_base/system:rtc_export", "../rtc_base/system:rtc_export",

View file

@ -13,6 +13,7 @@
#include <math.h> #include <math.h>
#include <algorithm> #include <algorithm>
#include <cstdint>
#include <memory> #include <memory>
#include <utility> #include <utility>
#include <vector> #include <vector>
@ -21,6 +22,9 @@
#include "absl/strings/escaping.h" #include "absl/strings/escaping.h"
#include "absl/strings/match.h" #include "absl/strings/match.h"
#include "absl/strings/string_view.h" #include "absl/strings/string_view.h"
#include "absl/types/optional.h"
#include "api/array_view.h"
#include "api/units/timestamp.h"
#include "p2p/base/port_allocator.h" #include "p2p/base/port_allocator.h"
#include "rtc_base/checks.h" #include "rtc_base/checks.h"
#include "rtc_base/crc32.h" #include "rtc_base/crc32.h"
@ -246,6 +250,7 @@ Connection::Connection(rtc::WeakPtr<Port> port,
Connection::~Connection() { Connection::~Connection() {
RTC_DCHECK_RUN_ON(network_thread_); RTC_DCHECK_RUN_ON(network_thread_);
RTC_DCHECK(!port_); RTC_DCHECK(!port_);
RTC_DCHECK(!received_packet_callback_);
} }
webrtc::TaskQueueBase* Connection::network_thread() const { webrtc::TaskQueueBase* Connection::network_thread() const {
@ -445,6 +450,19 @@ void Connection::OnSendStunPacket(const void* data,
} }
} }
void Connection::RegisterReceivedPacketCallback(
absl::AnyInvocable<void(Connection*, const rtc::ReceivedPacket&)>
received_packet_callback) {
RTC_DCHECK_RUN_ON(network_thread_);
RTC_CHECK(!received_packet_callback_);
received_packet_callback_ = std::move(received_packet_callback);
}
void Connection::DeregisterReceivedPacketCallback() {
RTC_DCHECK_RUN_ON(network_thread_);
received_packet_callback_ = nullptr;
}
void Connection::OnReadPacket(const char* data, void Connection::OnReadPacket(const char* data,
size_t size, size_t size,
int64_t packet_time_us) { int64_t packet_time_us) {
@ -459,8 +477,22 @@ void Connection::OnReadPacket(const char* data,
UpdateReceiving(last_data_received_); UpdateReceiving(last_data_received_);
recv_rate_tracker_.AddSamples(size); recv_rate_tracker_.AddSamples(size);
stats_.packets_received++; stats_.packets_received++;
SignalReadPacket(this, data, size, packet_time_us); if (received_packet_callback_) {
RTC_DCHECK(packet_time_us == -1 || packet_time_us >= 0);
RTC_DCHECK(SignalReadPacket.is_empty());
received_packet_callback_(
this, rtc::ReceivedPacket(
rtc::reinterpret_array_view<const uint8_t>(
rtc::MakeArrayView(data, size)),
(packet_time_us >= 0)
? absl::optional<webrtc::Timestamp>(
webrtc::Timestamp::Micros(packet_time_us))
: absl::nullopt));
} else {
// TODO(webrtc:11943): Remove SignalReadPacket once upstream projects have
// switched to use RegisterReceivedPacket.
SignalReadPacket(this, data, size, packet_time_us);
}
// If timed out sending writability checks, start up again // If timed out sending writability checks, start up again
if (!pruned_ && (write_state_ == STATE_WRITE_TIMEOUT)) { if (!pruned_ && (write_state_ == STATE_WRITE_TIMEOUT)) {
RTC_LOG(LS_WARNING) RTC_LOG(LS_WARNING)

View file

@ -28,6 +28,7 @@
#include "p2p/base/transport_description.h" #include "p2p/base/transport_description.h"
#include "rtc_base/async_packet_socket.h" #include "rtc_base/async_packet_socket.h"
#include "rtc_base/network.h" #include "rtc_base/network.h"
#include "rtc_base/network/received_packet.h"
#include "rtc_base/numerics/event_based_exponential_moving_average.h" #include "rtc_base/numerics/event_based_exponential_moving_average.h"
#include "rtc_base/rate_tracker.h" #include "rtc_base/rate_tracker.h"
#include "rtc_base/system/rtc_export.h" #include "rtc_base/system/rtc_export.h"
@ -146,8 +147,16 @@ class RTC_EXPORT Connection : public CandidatePairInterface {
// Error if Send() returns < 0 // Error if Send() returns < 0
virtual int GetError() = 0; virtual int GetError() = 0;
// TODO(webrtc:11943): Remove SignalReadPacket once upstream projects have
// switched to use RegisterReceivedPacket.
sigslot::signal4<Connection*, const char*, size_t, int64_t> SignalReadPacket; sigslot::signal4<Connection*, const char*, size_t, int64_t> SignalReadPacket;
// Register as a recipient of received packets. There can only be one.
void RegisterReceivedPacketCallback(
absl::AnyInvocable<void(Connection*, const rtc::ReceivedPacket&)>
received_packet_callback);
void DeregisterReceivedPacketCallback();
sigslot::signal1<Connection*> SignalReadyToSend; sigslot::signal1<Connection*> SignalReadyToSend;
// Called when a packet is received on this connection. // Called when a packet is received on this connection.
@ -501,6 +510,8 @@ class RTC_EXPORT Connection : public CandidatePairInterface {
absl::optional< absl::optional<
std::function<void(webrtc::RTCErrorOr<const StunUInt64Attribute*>)>> std::function<void(webrtc::RTCErrorOr<const StunUInt64Attribute*>)>>
goog_delta_ack_consumer_; goog_delta_ack_consumer_;
absl::AnyInvocable<void(Connection*, const rtc::ReceivedPacket&)>
received_packet_callback_;
}; };
// ProxyConnection defers all the interesting work to the port. // ProxyConnection defers all the interesting work to the port.

View file

@ -276,8 +276,10 @@ void P2PTransportChannel::AddConnection(Connection* connection) {
connection->set_unwritable_timeout(config_.ice_unwritable_timeout); connection->set_unwritable_timeout(config_.ice_unwritable_timeout);
connection->set_unwritable_min_checks(config_.ice_unwritable_min_checks); connection->set_unwritable_min_checks(config_.ice_unwritable_min_checks);
connection->set_inactive_timeout(config_.ice_inactive_timeout); connection->set_inactive_timeout(config_.ice_inactive_timeout);
connection->SignalReadPacket.connect(this, connection->RegisterReceivedPacketCallback(
&P2PTransportChannel::OnReadPacket); [&](Connection* connection, const rtc::ReceivedPacket& packet) {
OnReadPacket(connection, packet);
});
connection->SignalReadyToSend.connect(this, connection->SignalReadyToSend.connect(this,
&P2PTransportChannel::OnReadyToSend); &P2PTransportChannel::OnReadyToSend);
connection->SignalStateChange.connect( connection->SignalStateChange.connect(
@ -2151,6 +2153,7 @@ void P2PTransportChannel::RemoveConnection(Connection* connection) {
RTC_DCHECK_RUN_ON(network_thread_); RTC_DCHECK_RUN_ON(network_thread_);
auto it = absl::c_find(connections_, connection); auto it = absl::c_find(connections_, connection);
RTC_DCHECK(it != connections_.end()); RTC_DCHECK(it != connections_.end());
connection->DeregisterReceivedPacketCallback();
connections_.erase(it); connections_.erase(it);
connection->ClearStunDictConsumer(); connection->ClearStunDictConsumer();
ice_controller_->OnConnectionDestroyed(connection); ice_controller_->OnConnectionDestroyed(connection);
@ -2221,39 +2224,30 @@ bool P2PTransportChannel::PrunePort(PortInterface* port) {
// We data is available, let listeners know // We data is available, let listeners know
void P2PTransportChannel::OnReadPacket(Connection* connection, void P2PTransportChannel::OnReadPacket(Connection* connection,
const char* data, const rtc::ReceivedPacket& packet) {
size_t len,
int64_t packet_time_us) {
RTC_DCHECK_RUN_ON(network_thread_); RTC_DCHECK_RUN_ON(network_thread_);
if (connection != selected_connection_ && !FindConnection(connection)) {
if (connection == selected_connection_) { // Do not deliver, if packet doesn't belong to the correct transport
// Let the client know of an incoming packet // channel.
packets_received_++; RTC_DCHECK_NOTREACHED();
bytes_received_ += len;
RTC_DCHECK(connection->last_data_received() >= last_data_received_ms_);
last_data_received_ms_ =
std::max(last_data_received_ms_, connection->last_data_received());
SignalReadPacket(this, data, len, packet_time_us, 0);
return; return;
} }
// Do not deliver, if packet doesn't belong to the correct transport // Let the client know of an incoming packet
// channel. packets_received_++;
if (!FindConnection(connection)) bytes_received_ += packet.payload().size();
return; RTC_DCHECK(connection->last_data_received() >= last_data_received_ms_);
last_data_received_ms_ =
std::max(last_data_received_ms_, connection->last_data_received());
packets_received_++; SignalReadPacket(
bytes_received_ += len; this, reinterpret_cast<const char*>(packet.payload().data()),
RTC_DCHECK(connection->last_data_received() >= last_data_received_ms_); packet.payload().size(),
last_data_received_ms_ = packet.arrival_time() ? packet.arrival_time()->us() : -1, 0);
std::max(last_data_received_ms_, connection->last_data_received());
// Let the client know of an incoming packet
SignalReadPacket(this, data, len, packet_time_us, 0);
// May need to switch the sending connection based on the receiving media // May need to switch the sending connection based on the receiving media
// path if this is the controlled side. // path if this is the controlled side.
if (ice_role_ == ICEROLE_CONTROLLED) { if (ice_role_ == ICEROLE_CONTROLLED && connection != selected_connection_) {
ice_controller_->OnImmediateSwitchRequest(IceSwitchReason::DATA_RECEIVED, ice_controller_->OnImmediateSwitchRequest(IceSwitchReason::DATA_RECEIVED,
connection); connection);
} }

View file

@ -342,10 +342,7 @@ class RTC_EXPORT P2PTransportChannel : public IceTransportInternal,
void OnRoleConflict(PortInterface* port); void OnRoleConflict(PortInterface* port);
void OnConnectionStateChange(Connection* connection); void OnConnectionStateChange(Connection* connection);
void OnReadPacket(Connection* connection, void OnReadPacket(Connection* connection, const rtc::ReceivedPacket& packet);
const char* data,
size_t len,
int64_t packet_time_us);
void OnSentPacket(const rtc::SentPacket& sent_packet); void OnSentPacket(const rtc::SentPacket& sent_packet);
void OnReadyToSend(Connection* connection); void OnReadyToSend(Connection* connection);
void OnConnectionDestroyed(Connection* connection); void OnConnectionDestroyed(Connection* connection);

View file

@ -216,19 +216,7 @@ class TurnPortTest : public ::testing::Test,
bool /*port_muxed*/) { bool /*port_muxed*/) {
turn_unknown_address_ = true; turn_unknown_address_ = true;
} }
void OnTurnReadPacket(Connection* conn,
const char* data,
size_t size,
int64_t packet_time_us) {
turn_packets_.push_back(rtc::Buffer(data, size));
}
void OnUdpPortComplete(Port* port) { udp_ready_ = true; } void OnUdpPortComplete(Port* port) { udp_ready_ = true; }
void OnUdpReadPacket(Connection* conn,
const char* data,
size_t size,
int64_t packet_time_us) {
udp_packets_.push_back(rtc::Buffer(data, size));
}
void OnSocketReadPacket(rtc::AsyncPacketSocket* socket, void OnSocketReadPacket(rtc::AsyncPacketSocket* socket,
const char* data, const char* data,
size_t size, size_t size,
@ -248,6 +236,10 @@ class TurnPortTest : public ::testing::Test,
} }
void OnTurnPortClosed() override { turn_port_closed_ = true; } void OnTurnPortClosed() override { turn_port_closed_ = true; }
void OnConnectionSignalDestroyed(Connection* connection) {
connection->DeregisterReceivedPacketCallback();
}
rtc::Socket* CreateServerSocket(const SocketAddress addr) { rtc::Socket* CreateServerSocket(const SocketAddress addr) {
rtc::Socket* socket = ss_->CreateSocket(AF_INET, SOCK_STREAM); rtc::Socket* socket = ss_->CreateSocket(AF_INET, SOCK_STREAM);
EXPECT_GE(socket->Bind(addr), 0); EXPECT_GE(socket->Bind(addr), 0);
@ -727,10 +719,20 @@ class TurnPortTest : public ::testing::Test,
Port::ORIGIN_MESSAGE); Port::ORIGIN_MESSAGE);
ASSERT_TRUE(conn1 != NULL); ASSERT_TRUE(conn1 != NULL);
ASSERT_TRUE(conn2 != NULL); ASSERT_TRUE(conn2 != NULL);
conn1->SignalReadPacket.connect(static_cast<TurnPortTest*>(this), conn1->RegisterReceivedPacketCallback(
&TurnPortTest::OnTurnReadPacket); [&](Connection* connection, const rtc::ReceivedPacket& packet) {
conn2->SignalReadPacket.connect(static_cast<TurnPortTest*>(this), turn_packets_.push_back(
&TurnPortTest::OnUdpReadPacket); rtc::Buffer(packet.payload().data(), packet.payload().size()));
});
conn1->SignalDestroyed.connect(this,
&TurnPortTest::OnConnectionSignalDestroyed);
conn2->RegisterReceivedPacketCallback(
[&](Connection* connection, const rtc::ReceivedPacket& packet) {
udp_packets_.push_back(
rtc::Buffer(packet.payload().data(), packet.payload().size()));
});
conn2->SignalDestroyed.connect(this,
&TurnPortTest::OnConnectionSignalDestroyed);
conn1->Ping(0); conn1->Ping(0);
EXPECT_EQ_SIMULATED_WAIT(Connection::STATE_WRITABLE, conn1->write_state(), EXPECT_EQ_SIMULATED_WAIT(Connection::STATE_WRITABLE, conn1->write_state(),
kSimulatedRtt * 2, fake_clock_); kSimulatedRtt * 2, fake_clock_);
@ -780,10 +782,21 @@ class TurnPortTest : public ::testing::Test,
Port::ORIGIN_MESSAGE); Port::ORIGIN_MESSAGE);
ASSERT_TRUE(conn1 != NULL); ASSERT_TRUE(conn1 != NULL);
ASSERT_TRUE(conn2 != NULL); ASSERT_TRUE(conn2 != NULL);
conn1->SignalReadPacket.connect(static_cast<TurnPortTest*>(this), conn1->RegisterReceivedPacketCallback(
&TurnPortTest::OnTurnReadPacket); [&](Connection* connection, const rtc::ReceivedPacket& packet) {
conn2->SignalReadPacket.connect(static_cast<TurnPortTest*>(this), turn_packets_.push_back(
&TurnPortTest::OnUdpReadPacket); rtc::Buffer(packet.payload().data(), packet.payload().size()));
});
conn1->SignalDestroyed.connect(this,
&TurnPortTest::OnConnectionSignalDestroyed);
conn2->RegisterReceivedPacketCallback(
[&](Connection* connection, const rtc::ReceivedPacket& packet) {
udp_packets_.push_back(
rtc::Buffer(packet.payload().data(), packet.payload().size()));
});
conn2->SignalDestroyed.connect(this,
&TurnPortTest::OnConnectionSignalDestroyed);
conn1->Ping(0); conn1->Ping(0);
EXPECT_EQ_SIMULATED_WAIT(Connection::STATE_WRITABLE, conn1->write_state(), EXPECT_EQ_SIMULATED_WAIT(Connection::STATE_WRITABLE, conn1->write_state(),
kSimulatedRtt * 2, fake_clock_); kSimulatedRtt * 2, fake_clock_);
@ -1507,10 +1520,15 @@ TEST_F(TurnPortTest, TestChannelBindGetErrorResponse) {
kSimulatedRtt, fake_clock_); kSimulatedRtt, fake_clock_);
// Verify that packets are allowed to be sent after a bind request error. // Verify that packets are allowed to be sent after a bind request error.
// They'll just use a send indication instead. // They'll just use a send indication instead.
conn2->SignalReadPacket.connect(static_cast<TurnPortTest*>(this),
&TurnPortTest::OnUdpReadPacket); conn2->RegisterReceivedPacketCallback(
[&](Connection* connection, const rtc::ReceivedPacket& packet) {
udp_packets_.push_back(
rtc::Buffer(packet.payload().data(), packet.payload().size()));
});
conn1->Send(data.data(), data.length(), options); conn1->Send(data.data(), data.length(), options);
EXPECT_TRUE_SIMULATED_WAIT(!udp_packets_.empty(), kSimulatedRtt, fake_clock_); EXPECT_TRUE_SIMULATED_WAIT(!udp_packets_.empty(), kSimulatedRtt, fake_clock_);
conn2->DeregisterReceivedPacketCallback();
} }
// Do a TURN allocation, establish a UDP connection, and send some data. // Do a TURN allocation, establish a UDP connection, and send some data.

View file

@ -16,3 +16,18 @@ rtc_library("sent_packet") {
deps = [ "../system:rtc_export" ] deps = [ "../system:rtc_export" ]
absl_deps = [ "//third_party/abseil-cpp/absl/types:optional" ] absl_deps = [ "//third_party/abseil-cpp/absl/types:optional" ]
} }
rtc_library("received_packet") {
sources = [
"received_packet.cc",
"received_packet.h",
]
deps = [
"../../api:array_view",
"../../api/units:timestamp",
]
absl_deps = [
"//third_party/abseil-cpp/absl/functional:any_invocable",
"//third_party/abseil-cpp/absl/types:optional",
]
}

View file

@ -0,0 +1,23 @@
/*
* Copyright 2023 The WebRTC Project Authors. All rights reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#include "rtc_base/network/received_packet.h"
#include <utility>
#include "absl/types/optional.h"
namespace rtc {
ReceivedPacket::ReceivedPacket(rtc::ArrayView<const uint8_t> payload,
absl::optional<webrtc::Timestamp> arrival_time)
: payload_(payload), arrival_time_(std::move(arrival_time)) {}
} // namespace rtc

View file

@ -0,0 +1,47 @@
/*
* Copyright 2023 The WebRTC Project Authors. All rights reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#ifndef RTC_BASE_NETWORK_RECEIVED_PACKET_H_
#define RTC_BASE_NETWORK_RECEIVED_PACKET_H_
#include <cstdint>
#include "absl/types/optional.h"
#include "api/array_view.h"
#include "api/units/timestamp.h"
namespace rtc {
// ReceivedPacket repressent a received IP packet.
// It contains a payload and metadata.
// ReceivedPacket itself does not put constraints on what payload contains. For
// example it may contains STUN, SCTP, SRTP, RTP, RTCP.... etc.
class ReceivedPacket {
public:
// Caller must keep memory pointed to by payload valid for the lifetime of
// this ReceivedPacket.
ReceivedPacket(
rtc::ArrayView<const uint8_t> payload,
absl::optional<webrtc::Timestamp> arrival_time = absl::nullopt);
rtc::ArrayView<const uint8_t> payload() const { return payload_; }
// Timestamp when this packet was received. Not available on all socket
// implementations.
absl::optional<webrtc::Timestamp> arrival_time() const {
return arrival_time_;
}
private:
rtc::ArrayView<const uint8_t> payload_;
absl::optional<webrtc::Timestamp> arrival_time_;
};
} // namespace rtc
#endif // RTC_BASE_NETWORK_RECEIVED_PACKET_H_