Reland "Take out listen support from AsyncPacketSocket"

This is a reland of b141c162ee

Original change's description:
> Take out listen support from AsyncPacketSocket
>
> Moved to new interface class AsyncListenSocket.
>
> Bug: webrtc:13065
> Change-Id: Ib96ce154ba19979360ecd8144981d947ff5b8b18
> Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/232607
> Reviewed-by: Mirko Bonadei <mbonadei@webrtc.org>
> Reviewed-by: Harald Alvestrand <hta@webrtc.org>
> Commit-Queue: Niels Moller <nisse@webrtc.org>
> Cr-Commit-Position: refs/heads/main@{#35234}

Bug: webrtc:13065
Change-Id: I88bebdd80ebe6bcf6ac635023924d79fbfb76813
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/235960
Reviewed-by: Mirko Bonadei <mbonadei@webrtc.org>
Reviewed-by: Harald Alvestrand <hta@webrtc.org>
Commit-Queue: Niels Moller <nisse@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#35260}
This commit is contained in:
Niels Möller 2021-10-19 10:11:02 +02:00 committed by WebRTC LUCI CQ
parent b62ee8ce94
commit d30ece1804
11 changed files with 178 additions and 121 deletions

View file

@ -49,7 +49,7 @@ AsyncStunTCPSocket* AsyncStunTCPSocket::Create(
}
AsyncStunTCPSocket::AsyncStunTCPSocket(rtc::Socket* socket)
: rtc::AsyncTCPSocketBase(socket, /*listen=*/false, kBufSize) {}
: rtc::AsyncTCPSocketBase(socket, kBufSize) {}
int AsyncStunTCPSocket::Send(const void* pv,
size_t cb,
@ -125,10 +125,6 @@ void AsyncStunTCPSocket::ProcessInput(char* data, size_t* len) {
}
}
void AsyncStunTCPSocket::HandleIncomingConnection(rtc::Socket* socket) {
SignalNewConnection(this, new AsyncStunTCPSocket(socket));
}
size_t AsyncStunTCPSocket::GetExpectedLength(const void* data,
size_t len,
int* pad_bytes) {

View file

@ -36,7 +36,6 @@ class AsyncStunTCPSocket : public rtc::AsyncTCPSocketBase {
size_t cb,
const rtc::PacketOptions& options) override;
void ProcessInput(char* data, size_t* len) override;
void HandleIncomingConnection(rtc::Socket* socket) override;
private:
// This method returns the message hdr + length written in the header.

View file

@ -16,6 +16,7 @@
#include <list>
#include <memory>
#include <string>
#include <utility>
#include "absl/memory/memory.h"
#include "rtc_base/network/sent_packet.h"
@ -59,10 +60,10 @@ static unsigned char kTurnChannelDataMessageWithOddLength[] = {
static const rtc::SocketAddress kClientAddr("11.11.11.11", 0);
static const rtc::SocketAddress kServerAddr("22.22.22.22", 0);
class AsyncStunServerTCPSocket : public rtc::AsyncTCPSocket {
class AsyncStunServerTCPSocket : public rtc::AsyncTcpListenSocket {
public:
explicit AsyncStunServerTCPSocket(rtc::Socket* socket)
: AsyncTCPSocket(socket, true) {}
explicit AsyncStunServerTCPSocket(std::unique_ptr<rtc::Socket> socket)
: AsyncTcpListenSocket(std::move(socket)) {}
void HandleIncomingConnection(rtc::Socket* socket) override {
SignalNewConnection(this, new AsyncStunTCPSocket(socket));
}
@ -77,9 +78,11 @@ class AsyncStunTCPSocketTest : public ::testing::Test,
virtual void SetUp() { CreateSockets(); }
void CreateSockets() {
rtc::Socket* server = vss_->CreateSocket(kServerAddr.family(), SOCK_STREAM);
std::unique_ptr<rtc::Socket> server =
absl::WrapUnique(vss_->CreateSocket(kServerAddr.family(), SOCK_STREAM));
server->Bind(kServerAddr);
listen_socket_ = std::make_unique<AsyncStunServerTCPSocket>(server);
listen_socket_ =
std::make_unique<AsyncStunServerTCPSocket>(std::move(server));
listen_socket_->SignalNewConnection.connect(
this, &AsyncStunTCPSocketTest::OnNewConnection);

View file

@ -14,6 +14,7 @@
#include <string>
#include "absl/memory/memory.h"
#include "api/async_dns_resolver.h"
#include "api/wrapping_async_dns_resolver.h"
#include "p2p/base/async_stun_tcp_socket.h"
@ -89,7 +90,7 @@ AsyncListenSocket* BasicPacketSocketFactory::CreateServerTcpSocket(
RTC_CHECK(!(opts & PacketSocketFactory::OPT_STUN));
return new AsyncTCPSocket(socket, true);
return new AsyncTcpListenSocket(absl::WrapUnique(socket));
}
AsyncPacketSocket* BasicPacketSocketFactory::CreateClientTcpSocket(
@ -183,7 +184,7 @@ AsyncPacketSocket* BasicPacketSocketFactory::CreateClientTcpSocket(
if (tcp_options.opts & PacketSocketFactory::OPT_STUN) {
tcp_socket = new cricket::AsyncStunTCPSocket(socket);
} else {
tcp_socket = new AsyncTCPSocket(socket, false);
tcp_socket = new AsyncTCPSocket(socket);
}
return tcp_socket;

View file

@ -1060,6 +1060,24 @@ class FakeAsyncPacketSocket : public AsyncPacketSocket {
State state_;
};
class FakeAsyncListenSocket : public AsyncListenSocket {
public:
// Returns current local address. Address may be set to NULL if the
// socket is not bound yet (GetState() returns STATE_BINDING).
virtual SocketAddress GetLocalAddress() const { return local_address_; }
void Bind(const SocketAddress& address) {
local_address_ = address;
state_ = State::kBound;
}
virtual int GetOption(Socket::Option opt, int* value) { return 0; }
virtual int SetOption(Socket::Option opt, int value) { return 0; }
virtual State GetState() const { return state_; }
private:
SocketAddress local_address_;
State state_ = State::kClosed;
};
// Local -> XXXX
TEST_F(PortTest, TestLocalToLocal) {
TestLocalToLocal();
@ -1508,8 +1526,8 @@ TEST_F(PortTest, TestDelayedBindingUdp) {
}
TEST_F(PortTest, TestDisableInterfaceOfTcpPort) {
FakeAsyncPacketSocket* lsocket = new FakeAsyncPacketSocket();
FakeAsyncPacketSocket* rsocket = new FakeAsyncPacketSocket();
FakeAsyncListenSocket* lsocket = new FakeAsyncListenSocket();
FakeAsyncListenSocket* rsocket = new FakeAsyncListenSocket();
FakePacketSocketFactory socket_factory;
socket_factory.set_next_server_tcp_socket(lsocket);
@ -1518,10 +1536,8 @@ TEST_F(PortTest, TestDisableInterfaceOfTcpPort) {
socket_factory.set_next_server_tcp_socket(rsocket);
auto rport = CreateTcpPort(kLocalAddr2, &socket_factory);
lsocket->set_state(AsyncPacketSocket::STATE_BOUND);
lsocket->local_address_ = kLocalAddr1;
rsocket->set_state(AsyncPacketSocket::STATE_BOUND);
rsocket->local_address_ = kLocalAddr2;
lsocket->Bind(kLocalAddr1);
rsocket->Bind(kLocalAddr2);
lport->SetIceRole(cricket::ICEROLE_CONTROLLING);
lport->SetIceTiebreaker(kTiebreaker1);
@ -1560,17 +1576,17 @@ void PortTest::TestCrossFamilyPorts(int type) {
SocketAddress("192.168.1.3", 0), SocketAddress("192.168.1.4", 0),
SocketAddress("2001:db8::1", 0), SocketAddress("2001:db8::2", 0)};
for (int i = 0; i < 4; i++) {
FakeAsyncPacketSocket* socket = new FakeAsyncPacketSocket();
if (type == SOCK_DGRAM) {
FakeAsyncPacketSocket* socket = new FakeAsyncPacketSocket();
factory.set_next_udp_socket(socket);
ports[i] = CreateUdpPort(addresses[i], &factory);
socket->set_state(AsyncPacketSocket::STATE_BINDING);
socket->SignalAddressReady(socket, addresses[i]);
} else if (type == SOCK_STREAM) {
FakeAsyncListenSocket* socket = new FakeAsyncListenSocket();
factory.set_next_server_tcp_socket(socket);
ports[i] = CreateTcpPort(addresses[i], &factory);
socket->set_state(AsyncPacketSocket::STATE_BOUND);
socket->local_address_ = addresses[i];
socket->Bind(addresses[i]);
}
ports[i]->PrepareAddress();
}

View file

@ -169,9 +169,7 @@ void TCPPort::PrepareAddress() {
// Socket may be in the CLOSED state if Listen()
// failed, we still want to add the socket address.
RTC_LOG(LS_VERBOSE) << "Preparing TCP address, current state: "
<< listen_socket_->GetState();
if (listen_socket_->GetState() == rtc::AsyncPacketSocket::STATE_BOUND ||
listen_socket_->GetState() == rtc::AsyncPacketSocket::STATE_CLOSED)
<< static_cast<int>(listen_socket_->GetState());
AddAddress(listen_socket_->GetLocalAddress(),
listen_socket_->GetLocalAddress(), rtc::SocketAddress(),
TCP_PROTOCOL_NAME, "", TCPTYPE_PASSIVE_STR, LOCAL_PORT_TYPE,

View file

@ -128,17 +128,31 @@ class RTC_EXPORT AsyncPacketSocket : public sigslot::has_slots<> {
// CONNECTED to CLOSED.
sigslot::signal2<AsyncPacketSocket*, int> SignalClose;
// Used only for listening TCP sockets.
sigslot::signal2<AsyncPacketSocket*, AsyncPacketSocket*> SignalNewConnection;
private:
RTC_DISALLOW_COPY_AND_ASSIGN(AsyncPacketSocket);
};
// TODO(bugs.webrtc.org/13065): Intended to be broken out into a separate class,
// after downstream has adapted the new name. The main feature to move from
// AsyncPacketSocket to AsyncListenSocket is the SignalNewConnection.
using AsyncListenSocket = AsyncPacketSocket;
// Listen socket, producing an AsyncPacketSocket when a peer connects.
class RTC_EXPORT AsyncListenSocket : public sigslot::has_slots<> {
public:
enum class State {
kClosed,
kBound,
};
// Returns current state of the socket.
virtual State GetState() const = 0;
// Returns current local address. Address may be set to null if the
// socket is not bound yet (GetState() returns kBinding).
virtual SocketAddress GetLocalAddress() const = 0;
// Get/set options.
virtual int GetOption(Socket::Option opt, int* value) = 0;
virtual int SetOption(Socket::Option opt, int value) = 0;
sigslot::signal2<AsyncListenSocket*, AsyncPacketSocket*> SignalNewConnection;
};
void CopySocketInformationToPacketInfo(size_t packet_size_bytes,
const AsyncPacketSocket& socket_from,

View file

@ -62,16 +62,11 @@ Socket* AsyncTCPSocketBase::ConnectSocket(
}
AsyncTCPSocketBase::AsyncTCPSocketBase(Socket* socket,
bool listen,
size_t max_packet_size)
: socket_(socket),
listen_(listen),
max_insize_(max_packet_size),
max_outsize_(max_packet_size) {
if (!listen_) {
// Listening sockets don't send/receive data, so they don't need buffers.
inbuf_.EnsureCapacity(kMinimumRecvSize);
}
RTC_DCHECK(socket_.get() != nullptr);
socket_->SignalConnectEvent.connect(this,
@ -79,12 +74,6 @@ AsyncTCPSocketBase::AsyncTCPSocketBase(Socket* socket,
socket_->SignalReadEvent.connect(this, &AsyncTCPSocketBase::OnReadEvent);
socket_->SignalWriteEvent.connect(this, &AsyncTCPSocketBase::OnWriteEvent);
socket_->SignalCloseEvent.connect(this, &AsyncTCPSocketBase::OnCloseEvent);
if (listen_) {
if (socket_->Listen(kListenBacklog) < 0) {
RTC_LOG(LS_ERROR) << "Listen() failed with error " << socket_->GetError();
}
}
}
AsyncTCPSocketBase::~AsyncTCPSocketBase() {}
@ -106,11 +95,7 @@ AsyncTCPSocket::State AsyncTCPSocketBase::GetState() const {
case Socket::CS_CLOSED:
return STATE_CLOSED;
case Socket::CS_CONNECTING:
if (listen_) {
return STATE_BOUND;
} else {
return STATE_CONNECTING;
}
case Socket::CS_CONNECTED:
return STATE_CONNECTED;
default:
@ -149,7 +134,6 @@ int AsyncTCPSocketBase::SendTo(const void* pv,
}
int AsyncTCPSocketBase::FlushOutBuffer() {
RTC_DCHECK(!listen_);
RTC_DCHECK_GT(outbuf_.size(), 0);
rtc::ArrayView<uint8_t> view = outbuf_;
int res;
@ -189,7 +173,6 @@ int AsyncTCPSocketBase::FlushOutBuffer() {
void AsyncTCPSocketBase::AppendToOutBuffer(const void* pv, size_t cb) {
RTC_DCHECK(outbuf_.size() + cb <= max_outsize_);
RTC_DCHECK(!listen_);
outbuf_.AppendData(static_cast<const uint8_t*>(pv), cb);
}
@ -200,22 +183,6 @@ void AsyncTCPSocketBase::OnConnectEvent(Socket* socket) {
void AsyncTCPSocketBase::OnReadEvent(Socket* socket) {
RTC_DCHECK(socket_.get() == socket);
if (listen_) {
rtc::SocketAddress address;
rtc::Socket* new_socket = socket->Accept(&address);
if (!new_socket) {
// TODO(stefan): Do something better like forwarding the error
// to the user.
RTC_LOG(LS_ERROR) << "TCP accept failed with error "
<< socket_->GetError();
return;
}
HandleIncomingConnection(new_socket);
// Prime a read event in case data is waiting.
new_socket->SignalReadEvent(new_socket);
} else {
size_t total_recv = 0;
while (true) {
size_t free_size = inbuf_.capacity() - inbuf_.size();
@ -224,8 +191,7 @@ void AsyncTCPSocketBase::OnReadEvent(Socket* socket) {
free_size = inbuf_.capacity() - inbuf_.size();
}
int len =
socket_->Recv(inbuf_.data() + inbuf_.size(), free_size, nullptr);
int len = socket_->Recv(inbuf_.data() + inbuf_.size(), free_size, nullptr);
if (len < 0) {
// TODO(stefan): Do something better like forwarding the error to the
// user.
@ -256,7 +222,6 @@ void AsyncTCPSocketBase::OnReadEvent(Socket* socket) {
} else {
inbuf_.SetSize(size);
}
}
}
void AsyncTCPSocketBase::OnWriteEvent(Socket* socket) {
@ -283,12 +248,11 @@ AsyncTCPSocket* AsyncTCPSocket::Create(Socket* socket,
const SocketAddress& bind_address,
const SocketAddress& remote_address) {
return new AsyncTCPSocket(
AsyncTCPSocketBase::ConnectSocket(socket, bind_address, remote_address),
false);
AsyncTCPSocketBase::ConnectSocket(socket, bind_address, remote_address));
}
AsyncTCPSocket::AsyncTCPSocket(Socket* socket, bool listen)
: AsyncTCPSocketBase(socket, listen, kBufSize) {}
AsyncTCPSocket::AsyncTCPSocket(Socket* socket)
: AsyncTCPSocketBase(socket, kBufSize) {}
int AsyncTCPSocket::Send(const void* pv,
size_t cb,
@ -343,8 +307,59 @@ void AsyncTCPSocket::ProcessInput(char* data, size_t* len) {
}
}
void AsyncTCPSocket::HandleIncomingConnection(Socket* socket) {
SignalNewConnection(this, new AsyncTCPSocket(socket, false));
AsyncTcpListenSocket::AsyncTcpListenSocket(std::unique_ptr<Socket> socket)
: socket_(std::move(socket)) {
RTC_DCHECK(socket_.get() != nullptr);
socket_->SignalReadEvent.connect(this, &AsyncTcpListenSocket::OnReadEvent);
if (socket_->Listen(kListenBacklog) < 0) {
RTC_LOG(LS_ERROR) << "Listen() failed with error " << socket_->GetError();
}
}
AsyncTcpListenSocket::State AsyncTcpListenSocket::GetState() const {
switch (socket_->GetState()) {
case Socket::CS_CLOSED:
return State::kClosed;
case Socket::CS_CONNECTING:
return State::kBound;
default:
RTC_NOTREACHED();
return State::kClosed;
}
}
SocketAddress AsyncTcpListenSocket::GetLocalAddress() const {
return socket_->GetLocalAddress();
}
int AsyncTcpListenSocket::GetOption(Socket::Option opt, int* value) {
return socket_->GetOption(opt, value);
}
int AsyncTcpListenSocket::SetOption(Socket::Option opt, int value) {
return socket_->SetOption(opt, value);
}
void AsyncTcpListenSocket::OnReadEvent(Socket* socket) {
RTC_DCHECK(socket_.get() == socket);
rtc::SocketAddress address;
rtc::Socket* new_socket = socket->Accept(&address);
if (!new_socket) {
// TODO(stefan): Do something better like forwarding the error
// to the user.
RTC_LOG(LS_ERROR) << "TCP accept failed with error " << socket_->GetError();
return;
}
HandleIncomingConnection(new_socket);
// Prime a read event in case data is waiting.
new_socket->SignalReadEvent(new_socket);
}
void AsyncTcpListenSocket::HandleIncomingConnection(Socket* socket) {
SignalNewConnection(this, new AsyncTCPSocket(socket));
}
} // namespace rtc

View file

@ -28,7 +28,7 @@ namespace rtc {
// buffer them in user space.
class AsyncTCPSocketBase : public AsyncPacketSocket {
public:
AsyncTCPSocketBase(Socket* socket, bool listen, size_t max_packet_size);
AsyncTCPSocketBase(Socket* socket, size_t max_packet_size);
~AsyncTCPSocketBase() override;
// Pure virtual methods to send and recv data.
@ -36,8 +36,6 @@ class AsyncTCPSocketBase : public AsyncPacketSocket {
size_t cb,
const rtc::PacketOptions& options) override = 0;
virtual void ProcessInput(char* data, size_t* len) = 0;
// Signals incoming connection.
virtual void HandleIncomingConnection(Socket* socket) = 0;
SocketAddress GetLocalAddress() const override;
SocketAddress GetRemoteAddress() const override;
@ -76,7 +74,6 @@ class AsyncTCPSocketBase : public AsyncPacketSocket {
void OnCloseEvent(Socket* socket, int error);
std::unique_ptr<Socket> socket_;
bool listen_;
Buffer inbuf_;
Buffer outbuf_;
size_t max_insize_;
@ -93,19 +90,37 @@ class AsyncTCPSocket : public AsyncTCPSocketBase {
static AsyncTCPSocket* Create(Socket* socket,
const SocketAddress& bind_address,
const SocketAddress& remote_address);
AsyncTCPSocket(Socket* socket, bool listen);
explicit AsyncTCPSocket(Socket* socket);
~AsyncTCPSocket() override {}
int Send(const void* pv,
size_t cb,
const rtc::PacketOptions& options) override;
void ProcessInput(char* data, size_t* len) override;
void HandleIncomingConnection(Socket* socket) override;
private:
RTC_DISALLOW_COPY_AND_ASSIGN(AsyncTCPSocket);
};
class AsyncTcpListenSocket : public AsyncListenSocket {
public:
explicit AsyncTcpListenSocket(std::unique_ptr<Socket> socket);
State GetState() const override;
SocketAddress GetLocalAddress() const override;
int GetOption(Socket::Option opt, int* value) override;
int SetOption(Socket::Option opt, int value) override;
virtual void HandleIncomingConnection(rtc::Socket* socket);
private:
// Called by the underlying socket
void OnReadEvent(Socket* socket);
std::unique_ptr<Socket> socket_;
};
} // namespace rtc
#endif // RTC_BASE_ASYNC_TCP_SOCKET_H_

View file

@ -56,7 +56,7 @@ TestClient* CreateTestClient(SocketFactory* factory,
}
TestClient* CreateTCPTestClient(Socket* socket) {
return new TestClient(std::make_unique<AsyncTCPSocket>(socket, false));
return new TestClient(std::make_unique<AsyncTCPSocket>(socket));
}
// Tests that when sending from internal_addr to external_addrs through the

View file

@ -41,7 +41,7 @@ class TestEchoServer : public sigslot::has_slots<> {
void OnAccept(Socket* socket) {
Socket* raw_socket = socket->Accept(nullptr);
if (raw_socket) {
AsyncTCPSocket* packet_socket = new AsyncTCPSocket(raw_socket, false);
AsyncTCPSocket* packet_socket = new AsyncTCPSocket(raw_socket);
packet_socket->SignalReadPacket.connect(this, &TestEchoServer::OnPacket);
packet_socket->SignalClose.connect(this, &TestEchoServer::OnClose);
client_sockets_.push_back(packet_socket);