[CallbackList] Use CallbackList in AsyncPacketSocket for close events.

This removes use of the SignalClose sigslot. This CL includes thread
checks for the callback list and updates some call sites to unsubscribe
from events before deletion or detaching from a socket instance.

Bug: webrtc:11943
Change-Id: Ib66d39aa5cc795b750c9e3eaa85ed6af8b55b2b5
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/258561
Reviewed-by: Niels Moller <nisse@webrtc.org>
Commit-Queue: Tomas Gunnarsson <tommi@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#36540}
This commit is contained in:
Tomas Gunnarsson 2022-04-13 09:03:52 +00:00 committed by WebRTC LUCI CQ
parent a19f0c7409
commit f15189dbce
10 changed files with 73 additions and 14 deletions

View file

@ -672,8 +672,8 @@ class PortTest : public ::testing::Test, public sigslot::has_slots<> {
// Ensure redundant SignalClose events on TcpConnection won't break tcp // Ensure redundant SignalClose events on TcpConnection won't break tcp
// reconnection. Chromium will fire SignalClose for all outstanding IPC // reconnection. Chromium will fire SignalClose for all outstanding IPC
// packets during reconnection. // packets during reconnection.
tcp_conn1->socket()->SignalClose(tcp_conn1->socket(), 0); tcp_conn1->socket()->NotifyClosedForTest(0);
tcp_conn2->socket()->SignalClose(tcp_conn2->socket(), 0); tcp_conn2->socket()->NotifyClosedForTest(0);
// Speed up destroying ch2's connection such that the test is ready to // Speed up destroying ch2's connection such that the test is ready to
// accept a new connection from ch1 before ch1's connection destroys itself. // accept a new connection from ch1 before ch1's connection destroys itself.
@ -1625,7 +1625,7 @@ TEST_F(PortTest, TestDisableInterfaceOfTcpPort) {
lconn->Ping(0); lconn->Ping(0);
// Now disconnect the client socket... // Now disconnect the client socket...
socket->SignalClose(socket, 1); socket->NotifyClosedForTest(1);
// And prevent new sockets from being created. // And prevent new sockets from being created.
socket_factory.set_next_client_tcp_socket(nullptr); socket_factory.set_next_client_tcp_socket(nullptr);

View file

@ -519,6 +519,7 @@ void TCPConnection::OnClose(rtc::AsyncPacketSocket* socket, int error) {
// initial connect() (i.e. `pretending_to_be_writable_` is false) . We have // initial connect() (i.e. `pretending_to_be_writable_` is false) . We have
// to manually destroy here as this connection, as never connected, will not // to manually destroy here as this connection, as never connected, will not
// be scheduled for ping to trigger destroy. // be scheduled for ping to trigger destroy.
socket_->UnsubscribeClose(this);
Destroy(); Destroy();
} }
} }
@ -557,6 +558,11 @@ void TCPConnection::CreateOutgoingTcpSocket() {
int opts = (remote_candidate().protocol() == SSLTCP_PROTOCOL_NAME) int opts = (remote_candidate().protocol() == SSLTCP_PROTOCOL_NAME)
? rtc::PacketSocketFactory::OPT_TLS_FAKE ? rtc::PacketSocketFactory::OPT_TLS_FAKE
: 0; : 0;
if (socket_) {
socket_->UnsubscribeClose(this);
}
rtc::PacketSocketTcpOptions tcp_opts; rtc::PacketSocketTcpOptions tcp_opts;
tcp_opts.opts = opts; tcp_opts.opts = opts;
socket_.reset(port()->socket_factory()->CreateClientTcpSocket( socket_.reset(port()->socket_factory()->CreateClientTcpSocket(
@ -590,7 +596,8 @@ void TCPConnection::ConnectSocketSignals(rtc::AsyncPacketSocket* socket) {
} }
socket->SignalReadPacket.connect(this, &TCPConnection::OnReadPacket); socket->SignalReadPacket.connect(this, &TCPConnection::OnReadPacket);
socket->SignalReadyToSend.connect(this, &TCPConnection::OnReadyToSend); socket->SignalReadyToSend.connect(this, &TCPConnection::OnReadyToSend);
socket->SignalClose.connect(this, &TCPConnection::OnClose); socket->SubscribeClose(
this, [this](rtc::AsyncPacketSocket* s, int err) { OnClose(s, err); });
} }
} // namespace cricket } // namespace cricket

View file

@ -306,6 +306,10 @@ TurnPort::~TurnPort() {
while (!entries_.empty()) { while (!entries_.empty()) {
DestroyEntry(entries_.front()); DestroyEntry(entries_.front());
} }
if (socket_)
socket_->UnsubscribeClose(this);
if (!SharedSocket()) { if (!SharedSocket()) {
delete socket_; delete socket_;
} }
@ -451,7 +455,9 @@ bool TurnPort::CreateTurnClientSocket() {
if (server_address_.proto == PROTO_TCP || if (server_address_.proto == PROTO_TCP ||
server_address_.proto == PROTO_TLS) { server_address_.proto == PROTO_TLS) {
socket_->SignalConnect.connect(this, &TurnPort::OnSocketConnect); socket_->SignalConnect.connect(this, &TurnPort::OnSocketConnect);
socket_->SignalClose.connect(this, &TurnPort::OnSocketClose); socket_->SubscribeClose(this, [this](rtc::AsyncPacketSocket* s, int err) {
OnSocketClose(s, err);
});
} else { } else {
state_ = STATE_CONNECTED; state_ = STATE_CONNECTED;
} }
@ -542,6 +548,9 @@ void TurnPort::OnAllocateMismatch() {
<< ": Allocating a new socket after " << ": Allocating a new socket after "
"STUN_ERROR_ALLOCATION_MISMATCH, retry: " "STUN_ERROR_ALLOCATION_MISMATCH, retry: "
<< allocate_mismatch_retries_ + 1; << allocate_mismatch_retries_ + 1;
socket_->UnsubscribeClose(this);
if (SharedSocket()) { if (SharedSocket()) {
ResetSharedSocket(); ResetSharedSocket();
} else { } else {

View file

@ -1305,7 +1305,7 @@ TEST_F(TurnPortTest, TestSocketCloseWillDestroyConnection) {
Port::ORIGIN_MESSAGE); Port::ORIGIN_MESSAGE);
EXPECT_NE(nullptr, conn); EXPECT_NE(nullptr, conn);
EXPECT_TRUE(!turn_port_->connections().empty()); EXPECT_TRUE(!turn_port_->connections().empty());
turn_port_->socket()->SignalClose(turn_port_->socket(), 1); turn_port_->socket()->NotifyClosedForTest(1);
EXPECT_TRUE_SIMULATED_WAIT(turn_port_->connections().empty(), EXPECT_TRUE_SIMULATED_WAIT(turn_port_->connections().empty(),
kConnectionDestructionDelay, fake_clock_); kConnectionDestructionDelay, fake_clock_);
} }

View file

@ -195,7 +195,10 @@ void TurnServer::AcceptConnection(rtc::Socket* server_socket) {
cricket::AsyncStunTCPSocket* tcp_socket = cricket::AsyncStunTCPSocket* tcp_socket =
new cricket::AsyncStunTCPSocket(accepted_socket); new cricket::AsyncStunTCPSocket(accepted_socket);
tcp_socket->SignalClose.connect(this, &TurnServer::OnInternalSocketClose); tcp_socket->SubscribeClose(this,
[this](rtc::AsyncPacketSocket* s, int err) {
OnInternalSocketClose(s, err);
});
// Finally add the socket so it can start communicating with the client. // Finally add the socket so it can start communicating with the client.
AddInternalSocket(tcp_socket, info.proto); AddInternalSocket(tcp_socket, info.proto);
} }
@ -564,6 +567,7 @@ void TurnServer::DestroyInternalSocket(rtc::AsyncPacketSocket* socket) {
InternalSocketMap::iterator iter = server_sockets_.find(socket); InternalSocketMap::iterator iter = server_sockets_.find(socket);
if (iter != server_sockets_.end()) { if (iter != server_sockets_.end()) {
rtc::AsyncPacketSocket* socket = iter->first; rtc::AsyncPacketSocket* socket = iter->first;
socket->UnsubscribeClose(this);
socket->SignalReadPacket.disconnect(this); socket->SignalReadPacket.disconnect(this);
server_sockets_.erase(iter); server_sockets_.erase(iter);
std::unique_ptr<rtc::AsyncPacketSocket> socket_to_delete = std::unique_ptr<rtc::AsyncPacketSocket> socket_to_delete =

View file

@ -935,6 +935,7 @@ rtc_library("rtc_base") {
deps = [ deps = [
":async_resolver_interface", ":async_resolver_interface",
":async_socket", ":async_socket",
":callback_list",
":checks", ":checks",
":ip_address", ":ip_address",
":logging", ":logging",

View file

@ -24,10 +24,24 @@ PacketOptions::PacketOptions(DiffServCodePoint dscp) : dscp(dscp) {}
PacketOptions::PacketOptions(const PacketOptions& other) = default; PacketOptions::PacketOptions(const PacketOptions& other) = default;
PacketOptions::~PacketOptions() = default; PacketOptions::~PacketOptions() = default;
AsyncPacketSocket::AsyncPacketSocket() = default; AsyncPacketSocket::AsyncPacketSocket() {
network_checker_.Detach();
}
AsyncPacketSocket::~AsyncPacketSocket() = default; AsyncPacketSocket::~AsyncPacketSocket() = default;
void AsyncPacketSocket::SubscribeClose(
const void* removal_tag,
std::function<void(AsyncPacketSocket*, int)> callback) {
RTC_DCHECK_RUN_ON(&network_checker_);
on_close_.AddReceiver(removal_tag, std::move(callback));
}
void AsyncPacketSocket::UnsubscribeClose(const void* removal_tag) {
RTC_DCHECK_RUN_ON(&network_checker_);
on_close_.RemoveReceivers(removal_tag);
}
void CopySocketInformationToPacketInfo(size_t packet_size_bytes, void CopySocketInformationToPacketInfo(size_t packet_size_bytes,
const AsyncPacketSocket& socket_from, const AsyncPacketSocket& socket_from,
bool is_connectionless, bool is_connectionless,

View file

@ -13,9 +13,12 @@
#include <vector> #include <vector>
#include "api/sequence_checker.h"
#include "rtc_base/callback_list.h"
#include "rtc_base/dscp.h" #include "rtc_base/dscp.h"
#include "rtc_base/network/sent_packet.h" #include "rtc_base/network/sent_packet.h"
#include "rtc_base/socket.h" #include "rtc_base/socket.h"
#include "rtc_base/system/no_unique_address.h"
#include "rtc_base/system/rtc_export.h" #include "rtc_base/system/rtc_export.h"
#include "rtc_base/third_party/sigslot/sigslot.h" #include "rtc_base/third_party/sigslot/sigslot.h"
#include "rtc_base/time_utils.h" #include "rtc_base/time_utils.h"
@ -100,6 +103,11 @@ class RTC_EXPORT AsyncPacketSocket : public sigslot::has_slots<> {
virtual int GetError() const = 0; virtual int GetError() const = 0;
virtual void SetError(int error) = 0; virtual void SetError(int error) = 0;
// Register a callback to be called when the socket is closed.
void SubscribeClose(const void* removal_tag,
std::function<void(AsyncPacketSocket*, int)> callback);
void UnsubscribeClose(const void* removal_tag);
// Emitted each time a packet is read. Used only for UDP and // Emitted each time a packet is read. Used only for UDP and
// connected TCP sockets. // connected TCP sockets.
sigslot::signal5<AsyncPacketSocket*, sigslot::signal5<AsyncPacketSocket*,
@ -126,9 +134,25 @@ class RTC_EXPORT AsyncPacketSocket : public sigslot::has_slots<> {
// CONNECTING to CONNECTED. // CONNECTING to CONNECTED.
sigslot::signal1<AsyncPacketSocket*> SignalConnect; sigslot::signal1<AsyncPacketSocket*> SignalConnect;
// Emitted for client TCP sockets when state is changed from void NotifyClosedForTest(int err) { NotifyClosed(err); }
// CONNECTED to CLOSED.
sigslot::signal2<AsyncPacketSocket*, int> SignalClose; protected:
// TODO(bugs.webrtc.org/11943): Remove after updating downstream code.
void SignalClose(AsyncPacketSocket* s, int err) {
RTC_DCHECK_EQ(s, this);
NotifyClosed(err);
}
void NotifyClosed(int err) {
RTC_DCHECK_RUN_ON(&network_checker_);
on_close_.Send(this, err);
}
RTC_NO_UNIQUE_ADDRESS webrtc::SequenceChecker network_checker_;
private:
webrtc::CallbackList<AsyncPacketSocket*, int> on_close_
RTC_GUARDED_BY(&network_checker_);
}; };
// Listen socket, producing an AsyncPacketSocket when a peer connects. // Listen socket, producing an AsyncPacketSocket when a peer connects.

View file

@ -68,7 +68,6 @@ AsyncTCPSocketBase::AsyncTCPSocketBase(Socket* socket,
max_outsize_(max_packet_size) { max_outsize_(max_packet_size) {
inbuf_.EnsureCapacity(kMinimumRecvSize); inbuf_.EnsureCapacity(kMinimumRecvSize);
RTC_DCHECK(socket_.get() != nullptr);
socket_->SignalConnectEvent.connect(this, socket_->SignalConnectEvent.connect(this,
&AsyncTCPSocketBase::OnConnectEvent); &AsyncTCPSocketBase::OnConnectEvent);
socket_->SignalReadEvent.connect(this, &AsyncTCPSocketBase::OnReadEvent); socket_->SignalReadEvent.connect(this, &AsyncTCPSocketBase::OnReadEvent);
@ -237,7 +236,7 @@ void AsyncTCPSocketBase::OnWriteEvent(Socket* socket) {
} }
void AsyncTCPSocketBase::OnCloseEvent(Socket* socket, int error) { void AsyncTCPSocketBase::OnCloseEvent(Socket* socket, int error) {
SignalClose(this, error); NotifyClosed(error);
} }
// AsyncTCPSocket // AsyncTCPSocket

View file

@ -45,7 +45,8 @@ class TestEchoServer : public sigslot::has_slots<> {
if (raw_socket) { if (raw_socket) {
AsyncTCPSocket* packet_socket = new AsyncTCPSocket(raw_socket); AsyncTCPSocket* packet_socket = new AsyncTCPSocket(raw_socket);
packet_socket->SignalReadPacket.connect(this, &TestEchoServer::OnPacket); packet_socket->SignalReadPacket.connect(this, &TestEchoServer::OnPacket);
packet_socket->SignalClose.connect(this, &TestEchoServer::OnClose); packet_socket->SubscribeClose(
this, [this](AsyncPacketSocket* s, int err) { OnClose(s, err); });
client_sockets_.push_back(packet_socket); client_sockets_.push_back(packet_socket);
} }
} }