mirror of
https://github.com/mollyim/webrtc.git
synced 2025-05-13 05:40:42 +01:00
Revert "Take out listen support from AsyncPacketSocket"
This reverts commit b141c162ee
.
Reason for revert: Breaking WebRTC rolls. See https://ci.chromium.org/ui/b/8832847811929676465 for an example failed build.
Original change's description:
> Take out listen support from AsyncPacketSocket
>
> Moved to new interface class AsyncListenSocket.
>
> Bug: webrtc:13065
> Change-Id: Ib96ce154ba19979360ecd8144981d947ff5b8b18
> Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/232607
> Reviewed-by: Mirko Bonadei <mbonadei@webrtc.org>
> Reviewed-by: Harald Alvestrand <hta@webrtc.org>
> Commit-Queue: Niels Moller <nisse@webrtc.org>
> Cr-Commit-Position: refs/heads/main@{#35234}
# Not skipping CQ checks because original CL landed > 1 day ago.
Bug: webrtc:13065
Change-Id: Id5d5b35cb21704ca4e3006caf1636906df062609
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/235824
Reviewed-by: Evan Shrubsole <eshr@webrtc.org>
Reviewed-by: Mirko Bonadei <mbonadei@webrtc.org>
Reviewed-by: Tommi <tommi@webrtc.org>
Commit-Queue: Evan Shrubsole <eshr@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#35249}
This commit is contained in:
parent
b4d4ae2c23
commit
1f30c2ba9b
11 changed files with 121 additions and 178 deletions
|
@ -49,7 +49,7 @@ AsyncStunTCPSocket* AsyncStunTCPSocket::Create(
|
||||||
}
|
}
|
||||||
|
|
||||||
AsyncStunTCPSocket::AsyncStunTCPSocket(rtc::Socket* socket)
|
AsyncStunTCPSocket::AsyncStunTCPSocket(rtc::Socket* socket)
|
||||||
: rtc::AsyncTCPSocketBase(socket, kBufSize) {}
|
: rtc::AsyncTCPSocketBase(socket, /*listen=*/false, kBufSize) {}
|
||||||
|
|
||||||
int AsyncStunTCPSocket::Send(const void* pv,
|
int AsyncStunTCPSocket::Send(const void* pv,
|
||||||
size_t cb,
|
size_t cb,
|
||||||
|
@ -125,6 +125,10 @@ void AsyncStunTCPSocket::ProcessInput(char* data, size_t* len) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void AsyncStunTCPSocket::HandleIncomingConnection(rtc::Socket* socket) {
|
||||||
|
SignalNewConnection(this, new AsyncStunTCPSocket(socket));
|
||||||
|
}
|
||||||
|
|
||||||
size_t AsyncStunTCPSocket::GetExpectedLength(const void* data,
|
size_t AsyncStunTCPSocket::GetExpectedLength(const void* data,
|
||||||
size_t len,
|
size_t len,
|
||||||
int* pad_bytes) {
|
int* pad_bytes) {
|
||||||
|
|
|
@ -36,6 +36,7 @@ class AsyncStunTCPSocket : public rtc::AsyncTCPSocketBase {
|
||||||
size_t cb,
|
size_t cb,
|
||||||
const rtc::PacketOptions& options) override;
|
const rtc::PacketOptions& options) override;
|
||||||
void ProcessInput(char* data, size_t* len) override;
|
void ProcessInput(char* data, size_t* len) override;
|
||||||
|
void HandleIncomingConnection(rtc::Socket* socket) override;
|
||||||
|
|
||||||
private:
|
private:
|
||||||
// This method returns the message hdr + length written in the header.
|
// This method returns the message hdr + length written in the header.
|
||||||
|
|
|
@ -16,7 +16,6 @@
|
||||||
#include <list>
|
#include <list>
|
||||||
#include <memory>
|
#include <memory>
|
||||||
#include <string>
|
#include <string>
|
||||||
#include <utility>
|
|
||||||
|
|
||||||
#include "absl/memory/memory.h"
|
#include "absl/memory/memory.h"
|
||||||
#include "rtc_base/network/sent_packet.h"
|
#include "rtc_base/network/sent_packet.h"
|
||||||
|
@ -60,10 +59,10 @@ static unsigned char kTurnChannelDataMessageWithOddLength[] = {
|
||||||
static const rtc::SocketAddress kClientAddr("11.11.11.11", 0);
|
static const rtc::SocketAddress kClientAddr("11.11.11.11", 0);
|
||||||
static const rtc::SocketAddress kServerAddr("22.22.22.22", 0);
|
static const rtc::SocketAddress kServerAddr("22.22.22.22", 0);
|
||||||
|
|
||||||
class AsyncStunServerTCPSocket : public rtc::AsyncTcpListenSocket {
|
class AsyncStunServerTCPSocket : public rtc::AsyncTCPSocket {
|
||||||
public:
|
public:
|
||||||
explicit AsyncStunServerTCPSocket(std::unique_ptr<rtc::Socket> socket)
|
explicit AsyncStunServerTCPSocket(rtc::Socket* socket)
|
||||||
: AsyncTcpListenSocket(std::move(socket)) {}
|
: AsyncTCPSocket(socket, true) {}
|
||||||
void HandleIncomingConnection(rtc::Socket* socket) override {
|
void HandleIncomingConnection(rtc::Socket* socket) override {
|
||||||
SignalNewConnection(this, new AsyncStunTCPSocket(socket));
|
SignalNewConnection(this, new AsyncStunTCPSocket(socket));
|
||||||
}
|
}
|
||||||
|
@ -78,11 +77,9 @@ class AsyncStunTCPSocketTest : public ::testing::Test,
|
||||||
virtual void SetUp() { CreateSockets(); }
|
virtual void SetUp() { CreateSockets(); }
|
||||||
|
|
||||||
void CreateSockets() {
|
void CreateSockets() {
|
||||||
std::unique_ptr<rtc::Socket> server =
|
rtc::Socket* server = vss_->CreateSocket(kServerAddr.family(), SOCK_STREAM);
|
||||||
absl::WrapUnique(vss_->CreateSocket(kServerAddr.family(), SOCK_STREAM));
|
|
||||||
server->Bind(kServerAddr);
|
server->Bind(kServerAddr);
|
||||||
listen_socket_ =
|
listen_socket_ = std::make_unique<AsyncStunServerTCPSocket>(server);
|
||||||
std::make_unique<AsyncStunServerTCPSocket>(std::move(server));
|
|
||||||
listen_socket_->SignalNewConnection.connect(
|
listen_socket_->SignalNewConnection.connect(
|
||||||
this, &AsyncStunTCPSocketTest::OnNewConnection);
|
this, &AsyncStunTCPSocketTest::OnNewConnection);
|
||||||
|
|
||||||
|
|
|
@ -14,7 +14,6 @@
|
||||||
|
|
||||||
#include <string>
|
#include <string>
|
||||||
|
|
||||||
#include "absl/memory/memory.h"
|
|
||||||
#include "api/async_dns_resolver.h"
|
#include "api/async_dns_resolver.h"
|
||||||
#include "api/wrapping_async_dns_resolver.h"
|
#include "api/wrapping_async_dns_resolver.h"
|
||||||
#include "p2p/base/async_stun_tcp_socket.h"
|
#include "p2p/base/async_stun_tcp_socket.h"
|
||||||
|
@ -90,7 +89,7 @@ AsyncListenSocket* BasicPacketSocketFactory::CreateServerTcpSocket(
|
||||||
|
|
||||||
RTC_CHECK(!(opts & PacketSocketFactory::OPT_STUN));
|
RTC_CHECK(!(opts & PacketSocketFactory::OPT_STUN));
|
||||||
|
|
||||||
return new AsyncTcpListenSocket(absl::WrapUnique(socket));
|
return new AsyncTCPSocket(socket, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
AsyncPacketSocket* BasicPacketSocketFactory::CreateClientTcpSocket(
|
AsyncPacketSocket* BasicPacketSocketFactory::CreateClientTcpSocket(
|
||||||
|
@ -184,7 +183,7 @@ AsyncPacketSocket* BasicPacketSocketFactory::CreateClientTcpSocket(
|
||||||
if (tcp_options.opts & PacketSocketFactory::OPT_STUN) {
|
if (tcp_options.opts & PacketSocketFactory::OPT_STUN) {
|
||||||
tcp_socket = new cricket::AsyncStunTCPSocket(socket);
|
tcp_socket = new cricket::AsyncStunTCPSocket(socket);
|
||||||
} else {
|
} else {
|
||||||
tcp_socket = new AsyncTCPSocket(socket);
|
tcp_socket = new AsyncTCPSocket(socket, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
return tcp_socket;
|
return tcp_socket;
|
||||||
|
|
|
@ -1060,24 +1060,6 @@ class FakeAsyncPacketSocket : public AsyncPacketSocket {
|
||||||
State state_;
|
State state_;
|
||||||
};
|
};
|
||||||
|
|
||||||
class FakeAsyncListenSocket : public AsyncListenSocket {
|
|
||||||
public:
|
|
||||||
// Returns current local address. Address may be set to NULL if the
|
|
||||||
// socket is not bound yet (GetState() returns STATE_BINDING).
|
|
||||||
virtual SocketAddress GetLocalAddress() const { return local_address_; }
|
|
||||||
void Bind(const SocketAddress& address) {
|
|
||||||
local_address_ = address;
|
|
||||||
state_ = State::kBound;
|
|
||||||
}
|
|
||||||
virtual int GetOption(Socket::Option opt, int* value) { return 0; }
|
|
||||||
virtual int SetOption(Socket::Option opt, int value) { return 0; }
|
|
||||||
virtual State GetState() const { return state_; }
|
|
||||||
|
|
||||||
private:
|
|
||||||
SocketAddress local_address_;
|
|
||||||
State state_ = State::kClosed;
|
|
||||||
};
|
|
||||||
|
|
||||||
// Local -> XXXX
|
// Local -> XXXX
|
||||||
TEST_F(PortTest, TestLocalToLocal) {
|
TEST_F(PortTest, TestLocalToLocal) {
|
||||||
TestLocalToLocal();
|
TestLocalToLocal();
|
||||||
|
@ -1526,8 +1508,8 @@ TEST_F(PortTest, TestDelayedBindingUdp) {
|
||||||
}
|
}
|
||||||
|
|
||||||
TEST_F(PortTest, TestDisableInterfaceOfTcpPort) {
|
TEST_F(PortTest, TestDisableInterfaceOfTcpPort) {
|
||||||
FakeAsyncListenSocket* lsocket = new FakeAsyncListenSocket();
|
FakeAsyncPacketSocket* lsocket = new FakeAsyncPacketSocket();
|
||||||
FakeAsyncListenSocket* rsocket = new FakeAsyncListenSocket();
|
FakeAsyncPacketSocket* rsocket = new FakeAsyncPacketSocket();
|
||||||
FakePacketSocketFactory socket_factory;
|
FakePacketSocketFactory socket_factory;
|
||||||
|
|
||||||
socket_factory.set_next_server_tcp_socket(lsocket);
|
socket_factory.set_next_server_tcp_socket(lsocket);
|
||||||
|
@ -1536,8 +1518,10 @@ TEST_F(PortTest, TestDisableInterfaceOfTcpPort) {
|
||||||
socket_factory.set_next_server_tcp_socket(rsocket);
|
socket_factory.set_next_server_tcp_socket(rsocket);
|
||||||
auto rport = CreateTcpPort(kLocalAddr2, &socket_factory);
|
auto rport = CreateTcpPort(kLocalAddr2, &socket_factory);
|
||||||
|
|
||||||
lsocket->Bind(kLocalAddr1);
|
lsocket->set_state(AsyncPacketSocket::STATE_BOUND);
|
||||||
rsocket->Bind(kLocalAddr2);
|
lsocket->local_address_ = kLocalAddr1;
|
||||||
|
rsocket->set_state(AsyncPacketSocket::STATE_BOUND);
|
||||||
|
rsocket->local_address_ = kLocalAddr2;
|
||||||
|
|
||||||
lport->SetIceRole(cricket::ICEROLE_CONTROLLING);
|
lport->SetIceRole(cricket::ICEROLE_CONTROLLING);
|
||||||
lport->SetIceTiebreaker(kTiebreaker1);
|
lport->SetIceTiebreaker(kTiebreaker1);
|
||||||
|
@ -1576,17 +1560,17 @@ void PortTest::TestCrossFamilyPorts(int type) {
|
||||||
SocketAddress("192.168.1.3", 0), SocketAddress("192.168.1.4", 0),
|
SocketAddress("192.168.1.3", 0), SocketAddress("192.168.1.4", 0),
|
||||||
SocketAddress("2001:db8::1", 0), SocketAddress("2001:db8::2", 0)};
|
SocketAddress("2001:db8::1", 0), SocketAddress("2001:db8::2", 0)};
|
||||||
for (int i = 0; i < 4; i++) {
|
for (int i = 0; i < 4; i++) {
|
||||||
|
FakeAsyncPacketSocket* socket = new FakeAsyncPacketSocket();
|
||||||
if (type == SOCK_DGRAM) {
|
if (type == SOCK_DGRAM) {
|
||||||
FakeAsyncPacketSocket* socket = new FakeAsyncPacketSocket();
|
|
||||||
factory.set_next_udp_socket(socket);
|
factory.set_next_udp_socket(socket);
|
||||||
ports[i] = CreateUdpPort(addresses[i], &factory);
|
ports[i] = CreateUdpPort(addresses[i], &factory);
|
||||||
socket->set_state(AsyncPacketSocket::STATE_BINDING);
|
socket->set_state(AsyncPacketSocket::STATE_BINDING);
|
||||||
socket->SignalAddressReady(socket, addresses[i]);
|
socket->SignalAddressReady(socket, addresses[i]);
|
||||||
} else if (type == SOCK_STREAM) {
|
} else if (type == SOCK_STREAM) {
|
||||||
FakeAsyncListenSocket* socket = new FakeAsyncListenSocket();
|
|
||||||
factory.set_next_server_tcp_socket(socket);
|
factory.set_next_server_tcp_socket(socket);
|
||||||
ports[i] = CreateTcpPort(addresses[i], &factory);
|
ports[i] = CreateTcpPort(addresses[i], &factory);
|
||||||
socket->Bind(addresses[i]);
|
socket->set_state(AsyncPacketSocket::STATE_BOUND);
|
||||||
|
socket->local_address_ = addresses[i];
|
||||||
}
|
}
|
||||||
ports[i]->PrepareAddress();
|
ports[i]->PrepareAddress();
|
||||||
}
|
}
|
||||||
|
|
|
@ -169,11 +169,13 @@ void TCPPort::PrepareAddress() {
|
||||||
// Socket may be in the CLOSED state if Listen()
|
// Socket may be in the CLOSED state if Listen()
|
||||||
// failed, we still want to add the socket address.
|
// failed, we still want to add the socket address.
|
||||||
RTC_LOG(LS_VERBOSE) << "Preparing TCP address, current state: "
|
RTC_LOG(LS_VERBOSE) << "Preparing TCP address, current state: "
|
||||||
<< static_cast<int>(listen_socket_->GetState());
|
<< listen_socket_->GetState();
|
||||||
AddAddress(listen_socket_->GetLocalAddress(),
|
if (listen_socket_->GetState() == rtc::AsyncPacketSocket::STATE_BOUND ||
|
||||||
listen_socket_->GetLocalAddress(), rtc::SocketAddress(),
|
listen_socket_->GetState() == rtc::AsyncPacketSocket::STATE_CLOSED)
|
||||||
TCP_PROTOCOL_NAME, "", TCPTYPE_PASSIVE_STR, LOCAL_PORT_TYPE,
|
AddAddress(listen_socket_->GetLocalAddress(),
|
||||||
ICE_TYPE_PREFERENCE_HOST_TCP, 0, "", true);
|
listen_socket_->GetLocalAddress(), rtc::SocketAddress(),
|
||||||
|
TCP_PROTOCOL_NAME, "", TCPTYPE_PASSIVE_STR, LOCAL_PORT_TYPE,
|
||||||
|
ICE_TYPE_PREFERENCE_HOST_TCP, 0, "", true);
|
||||||
} else {
|
} else {
|
||||||
RTC_LOG(LS_INFO) << ToString()
|
RTC_LOG(LS_INFO) << ToString()
|
||||||
<< ": Not listening due to firewall restrictions.";
|
<< ": Not listening due to firewall restrictions.";
|
||||||
|
|
|
@ -128,31 +128,17 @@ class RTC_EXPORT AsyncPacketSocket : public sigslot::has_slots<> {
|
||||||
// CONNECTED to CLOSED.
|
// CONNECTED to CLOSED.
|
||||||
sigslot::signal2<AsyncPacketSocket*, int> SignalClose;
|
sigslot::signal2<AsyncPacketSocket*, int> SignalClose;
|
||||||
|
|
||||||
|
// Used only for listening TCP sockets.
|
||||||
|
sigslot::signal2<AsyncPacketSocket*, AsyncPacketSocket*> SignalNewConnection;
|
||||||
|
|
||||||
private:
|
private:
|
||||||
RTC_DISALLOW_COPY_AND_ASSIGN(AsyncPacketSocket);
|
RTC_DISALLOW_COPY_AND_ASSIGN(AsyncPacketSocket);
|
||||||
};
|
};
|
||||||
|
|
||||||
// Listen socket, producing an AsyncPacketSocket when a peer connects.
|
// TODO(bugs.webrtc.org/13065): Intended to be broken out into a separate class,
|
||||||
class RTC_EXPORT AsyncListenSocket : public sigslot::has_slots<> {
|
// after downstream has adapted the new name. The main feature to move from
|
||||||
public:
|
// AsyncPacketSocket to AsyncListenSocket is the SignalNewConnection.
|
||||||
enum class State {
|
using AsyncListenSocket = AsyncPacketSocket;
|
||||||
kClosed,
|
|
||||||
kBound,
|
|
||||||
};
|
|
||||||
|
|
||||||
// Returns current state of the socket.
|
|
||||||
virtual State GetState() const = 0;
|
|
||||||
|
|
||||||
// Returns current local address. Address may be set to null if the
|
|
||||||
// socket is not bound yet (GetState() returns kBinding).
|
|
||||||
virtual SocketAddress GetLocalAddress() const = 0;
|
|
||||||
|
|
||||||
// Get/set options.
|
|
||||||
virtual int GetOption(Socket::Option opt, int* value) = 0;
|
|
||||||
virtual int SetOption(Socket::Option opt, int value) = 0;
|
|
||||||
|
|
||||||
sigslot::signal2<AsyncListenSocket*, AsyncPacketSocket*> SignalNewConnection;
|
|
||||||
};
|
|
||||||
|
|
||||||
void CopySocketInformationToPacketInfo(size_t packet_size_bytes,
|
void CopySocketInformationToPacketInfo(size_t packet_size_bytes,
|
||||||
const AsyncPacketSocket& socket_from,
|
const AsyncPacketSocket& socket_from,
|
||||||
|
|
|
@ -62,11 +62,16 @@ Socket* AsyncTCPSocketBase::ConnectSocket(
|
||||||
}
|
}
|
||||||
|
|
||||||
AsyncTCPSocketBase::AsyncTCPSocketBase(Socket* socket,
|
AsyncTCPSocketBase::AsyncTCPSocketBase(Socket* socket,
|
||||||
|
bool listen,
|
||||||
size_t max_packet_size)
|
size_t max_packet_size)
|
||||||
: socket_(socket),
|
: socket_(socket),
|
||||||
|
listen_(listen),
|
||||||
max_insize_(max_packet_size),
|
max_insize_(max_packet_size),
|
||||||
max_outsize_(max_packet_size) {
|
max_outsize_(max_packet_size) {
|
||||||
inbuf_.EnsureCapacity(kMinimumRecvSize);
|
if (!listen_) {
|
||||||
|
// Listening sockets don't send/receive data, so they don't need buffers.
|
||||||
|
inbuf_.EnsureCapacity(kMinimumRecvSize);
|
||||||
|
}
|
||||||
|
|
||||||
RTC_DCHECK(socket_.get() != nullptr);
|
RTC_DCHECK(socket_.get() != nullptr);
|
||||||
socket_->SignalConnectEvent.connect(this,
|
socket_->SignalConnectEvent.connect(this,
|
||||||
|
@ -74,6 +79,12 @@ AsyncTCPSocketBase::AsyncTCPSocketBase(Socket* socket,
|
||||||
socket_->SignalReadEvent.connect(this, &AsyncTCPSocketBase::OnReadEvent);
|
socket_->SignalReadEvent.connect(this, &AsyncTCPSocketBase::OnReadEvent);
|
||||||
socket_->SignalWriteEvent.connect(this, &AsyncTCPSocketBase::OnWriteEvent);
|
socket_->SignalWriteEvent.connect(this, &AsyncTCPSocketBase::OnWriteEvent);
|
||||||
socket_->SignalCloseEvent.connect(this, &AsyncTCPSocketBase::OnCloseEvent);
|
socket_->SignalCloseEvent.connect(this, &AsyncTCPSocketBase::OnCloseEvent);
|
||||||
|
|
||||||
|
if (listen_) {
|
||||||
|
if (socket_->Listen(kListenBacklog) < 0) {
|
||||||
|
RTC_LOG(LS_ERROR) << "Listen() failed with error " << socket_->GetError();
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
AsyncTCPSocketBase::~AsyncTCPSocketBase() {}
|
AsyncTCPSocketBase::~AsyncTCPSocketBase() {}
|
||||||
|
@ -95,7 +106,11 @@ AsyncTCPSocket::State AsyncTCPSocketBase::GetState() const {
|
||||||
case Socket::CS_CLOSED:
|
case Socket::CS_CLOSED:
|
||||||
return STATE_CLOSED;
|
return STATE_CLOSED;
|
||||||
case Socket::CS_CONNECTING:
|
case Socket::CS_CONNECTING:
|
||||||
return STATE_CONNECTING;
|
if (listen_) {
|
||||||
|
return STATE_BOUND;
|
||||||
|
} else {
|
||||||
|
return STATE_CONNECTING;
|
||||||
|
}
|
||||||
case Socket::CS_CONNECTED:
|
case Socket::CS_CONNECTED:
|
||||||
return STATE_CONNECTED;
|
return STATE_CONNECTED;
|
||||||
default:
|
default:
|
||||||
|
@ -134,6 +149,7 @@ int AsyncTCPSocketBase::SendTo(const void* pv,
|
||||||
}
|
}
|
||||||
|
|
||||||
int AsyncTCPSocketBase::FlushOutBuffer() {
|
int AsyncTCPSocketBase::FlushOutBuffer() {
|
||||||
|
RTC_DCHECK(!listen_);
|
||||||
RTC_DCHECK_GT(outbuf_.size(), 0);
|
RTC_DCHECK_GT(outbuf_.size(), 0);
|
||||||
rtc::ArrayView<uint8_t> view = outbuf_;
|
rtc::ArrayView<uint8_t> view = outbuf_;
|
||||||
int res;
|
int res;
|
||||||
|
@ -173,6 +189,7 @@ int AsyncTCPSocketBase::FlushOutBuffer() {
|
||||||
|
|
||||||
void AsyncTCPSocketBase::AppendToOutBuffer(const void* pv, size_t cb) {
|
void AsyncTCPSocketBase::AppendToOutBuffer(const void* pv, size_t cb) {
|
||||||
RTC_DCHECK(outbuf_.size() + cb <= max_outsize_);
|
RTC_DCHECK(outbuf_.size() + cb <= max_outsize_);
|
||||||
|
RTC_DCHECK(!listen_);
|
||||||
outbuf_.AppendData(static_cast<const uint8_t*>(pv), cb);
|
outbuf_.AppendData(static_cast<const uint8_t*>(pv), cb);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -183,44 +200,62 @@ void AsyncTCPSocketBase::OnConnectEvent(Socket* socket) {
|
||||||
void AsyncTCPSocketBase::OnReadEvent(Socket* socket) {
|
void AsyncTCPSocketBase::OnReadEvent(Socket* socket) {
|
||||||
RTC_DCHECK(socket_.get() == socket);
|
RTC_DCHECK(socket_.get() == socket);
|
||||||
|
|
||||||
size_t total_recv = 0;
|
if (listen_) {
|
||||||
while (true) {
|
rtc::SocketAddress address;
|
||||||
size_t free_size = inbuf_.capacity() - inbuf_.size();
|
rtc::Socket* new_socket = socket->Accept(&address);
|
||||||
if (free_size < kMinimumRecvSize && inbuf_.capacity() < max_insize_) {
|
if (!new_socket) {
|
||||||
inbuf_.EnsureCapacity(std::min(max_insize_, inbuf_.capacity() * 2));
|
// TODO(stefan): Do something better like forwarding the error
|
||||||
free_size = inbuf_.capacity() - inbuf_.size();
|
// to the user.
|
||||||
|
RTC_LOG(LS_ERROR) << "TCP accept failed with error "
|
||||||
|
<< socket_->GetError();
|
||||||
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
int len = socket_->Recv(inbuf_.data() + inbuf_.size(), free_size, nullptr);
|
HandleIncomingConnection(new_socket);
|
||||||
if (len < 0) {
|
|
||||||
// TODO(stefan): Do something better like forwarding the error to the
|
|
||||||
// user.
|
|
||||||
if (!socket_->IsBlocking()) {
|
|
||||||
RTC_LOG(LS_ERROR) << "Recv() returned error: " << socket_->GetError();
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
total_recv += len;
|
// Prime a read event in case data is waiting.
|
||||||
inbuf_.SetSize(inbuf_.size() + len);
|
new_socket->SignalReadEvent(new_socket);
|
||||||
if (!len || static_cast<size_t>(len) < free_size) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!total_recv) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
size_t size = inbuf_.size();
|
|
||||||
ProcessInput(inbuf_.data<char>(), &size);
|
|
||||||
|
|
||||||
if (size > inbuf_.size()) {
|
|
||||||
RTC_LOG(LS_ERROR) << "input buffer overflow";
|
|
||||||
RTC_NOTREACHED();
|
|
||||||
inbuf_.Clear();
|
|
||||||
} else {
|
} else {
|
||||||
inbuf_.SetSize(size);
|
size_t total_recv = 0;
|
||||||
|
while (true) {
|
||||||
|
size_t free_size = inbuf_.capacity() - inbuf_.size();
|
||||||
|
if (free_size < kMinimumRecvSize && inbuf_.capacity() < max_insize_) {
|
||||||
|
inbuf_.EnsureCapacity(std::min(max_insize_, inbuf_.capacity() * 2));
|
||||||
|
free_size = inbuf_.capacity() - inbuf_.size();
|
||||||
|
}
|
||||||
|
|
||||||
|
int len =
|
||||||
|
socket_->Recv(inbuf_.data() + inbuf_.size(), free_size, nullptr);
|
||||||
|
if (len < 0) {
|
||||||
|
// TODO(stefan): Do something better like forwarding the error to the
|
||||||
|
// user.
|
||||||
|
if (!socket_->IsBlocking()) {
|
||||||
|
RTC_LOG(LS_ERROR) << "Recv() returned error: " << socket_->GetError();
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
total_recv += len;
|
||||||
|
inbuf_.SetSize(inbuf_.size() + len);
|
||||||
|
if (!len || static_cast<size_t>(len) < free_size) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!total_recv) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
size_t size = inbuf_.size();
|
||||||
|
ProcessInput(inbuf_.data<char>(), &size);
|
||||||
|
|
||||||
|
if (size > inbuf_.size()) {
|
||||||
|
RTC_LOG(LS_ERROR) << "input buffer overflow";
|
||||||
|
RTC_NOTREACHED();
|
||||||
|
inbuf_.Clear();
|
||||||
|
} else {
|
||||||
|
inbuf_.SetSize(size);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -248,11 +283,12 @@ AsyncTCPSocket* AsyncTCPSocket::Create(Socket* socket,
|
||||||
const SocketAddress& bind_address,
|
const SocketAddress& bind_address,
|
||||||
const SocketAddress& remote_address) {
|
const SocketAddress& remote_address) {
|
||||||
return new AsyncTCPSocket(
|
return new AsyncTCPSocket(
|
||||||
AsyncTCPSocketBase::ConnectSocket(socket, bind_address, remote_address));
|
AsyncTCPSocketBase::ConnectSocket(socket, bind_address, remote_address),
|
||||||
|
false);
|
||||||
}
|
}
|
||||||
|
|
||||||
AsyncTCPSocket::AsyncTCPSocket(Socket* socket)
|
AsyncTCPSocket::AsyncTCPSocket(Socket* socket, bool listen)
|
||||||
: AsyncTCPSocketBase(socket, kBufSize) {}
|
: AsyncTCPSocketBase(socket, listen, kBufSize) {}
|
||||||
|
|
||||||
int AsyncTCPSocket::Send(const void* pv,
|
int AsyncTCPSocket::Send(const void* pv,
|
||||||
size_t cb,
|
size_t cb,
|
||||||
|
@ -307,59 +343,8 @@ void AsyncTCPSocket::ProcessInput(char* data, size_t* len) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
AsyncTcpListenSocket::AsyncTcpListenSocket(std::unique_ptr<Socket> socket)
|
void AsyncTCPSocket::HandleIncomingConnection(Socket* socket) {
|
||||||
: socket_(std::move(socket)) {
|
SignalNewConnection(this, new AsyncTCPSocket(socket, false));
|
||||||
RTC_DCHECK(socket_.get() != nullptr);
|
|
||||||
socket_->SignalReadEvent.connect(this, &AsyncTcpListenSocket::OnReadEvent);
|
|
||||||
if (socket_->Listen(kListenBacklog) < 0) {
|
|
||||||
RTC_LOG(LS_ERROR) << "Listen() failed with error " << socket_->GetError();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
AsyncTcpListenSocket::State AsyncTcpListenSocket::GetState() const {
|
|
||||||
switch (socket_->GetState()) {
|
|
||||||
case Socket::CS_CLOSED:
|
|
||||||
return State::kClosed;
|
|
||||||
case Socket::CS_CONNECTING:
|
|
||||||
return State::kBound;
|
|
||||||
default:
|
|
||||||
RTC_NOTREACHED();
|
|
||||||
return State::kClosed;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
SocketAddress AsyncTcpListenSocket::GetLocalAddress() const {
|
|
||||||
return socket_->GetLocalAddress();
|
|
||||||
}
|
|
||||||
|
|
||||||
int AsyncTcpListenSocket::GetOption(Socket::Option opt, int* value) {
|
|
||||||
return socket_->GetOption(opt, value);
|
|
||||||
}
|
|
||||||
|
|
||||||
int AsyncTcpListenSocket::SetOption(Socket::Option opt, int value) {
|
|
||||||
return socket_->SetOption(opt, value);
|
|
||||||
}
|
|
||||||
|
|
||||||
void AsyncTcpListenSocket::OnReadEvent(Socket* socket) {
|
|
||||||
RTC_DCHECK(socket_.get() == socket);
|
|
||||||
|
|
||||||
rtc::SocketAddress address;
|
|
||||||
rtc::Socket* new_socket = socket->Accept(&address);
|
|
||||||
if (!new_socket) {
|
|
||||||
// TODO(stefan): Do something better like forwarding the error
|
|
||||||
// to the user.
|
|
||||||
RTC_LOG(LS_ERROR) << "TCP accept failed with error " << socket_->GetError();
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
HandleIncomingConnection(new_socket);
|
|
||||||
|
|
||||||
// Prime a read event in case data is waiting.
|
|
||||||
new_socket->SignalReadEvent(new_socket);
|
|
||||||
}
|
|
||||||
|
|
||||||
void AsyncTcpListenSocket::HandleIncomingConnection(Socket* socket) {
|
|
||||||
SignalNewConnection(this, new AsyncTCPSocket(socket));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
} // namespace rtc
|
} // namespace rtc
|
||||||
|
|
|
@ -28,7 +28,7 @@ namespace rtc {
|
||||||
// buffer them in user space.
|
// buffer them in user space.
|
||||||
class AsyncTCPSocketBase : public AsyncPacketSocket {
|
class AsyncTCPSocketBase : public AsyncPacketSocket {
|
||||||
public:
|
public:
|
||||||
AsyncTCPSocketBase(Socket* socket, size_t max_packet_size);
|
AsyncTCPSocketBase(Socket* socket, bool listen, size_t max_packet_size);
|
||||||
~AsyncTCPSocketBase() override;
|
~AsyncTCPSocketBase() override;
|
||||||
|
|
||||||
// Pure virtual methods to send and recv data.
|
// Pure virtual methods to send and recv data.
|
||||||
|
@ -36,6 +36,8 @@ class AsyncTCPSocketBase : public AsyncPacketSocket {
|
||||||
size_t cb,
|
size_t cb,
|
||||||
const rtc::PacketOptions& options) override = 0;
|
const rtc::PacketOptions& options) override = 0;
|
||||||
virtual void ProcessInput(char* data, size_t* len) = 0;
|
virtual void ProcessInput(char* data, size_t* len) = 0;
|
||||||
|
// Signals incoming connection.
|
||||||
|
virtual void HandleIncomingConnection(Socket* socket) = 0;
|
||||||
|
|
||||||
SocketAddress GetLocalAddress() const override;
|
SocketAddress GetLocalAddress() const override;
|
||||||
SocketAddress GetRemoteAddress() const override;
|
SocketAddress GetRemoteAddress() const override;
|
||||||
|
@ -74,6 +76,7 @@ class AsyncTCPSocketBase : public AsyncPacketSocket {
|
||||||
void OnCloseEvent(Socket* socket, int error);
|
void OnCloseEvent(Socket* socket, int error);
|
||||||
|
|
||||||
std::unique_ptr<Socket> socket_;
|
std::unique_ptr<Socket> socket_;
|
||||||
|
bool listen_;
|
||||||
Buffer inbuf_;
|
Buffer inbuf_;
|
||||||
Buffer outbuf_;
|
Buffer outbuf_;
|
||||||
size_t max_insize_;
|
size_t max_insize_;
|
||||||
|
@ -90,37 +93,19 @@ class AsyncTCPSocket : public AsyncTCPSocketBase {
|
||||||
static AsyncTCPSocket* Create(Socket* socket,
|
static AsyncTCPSocket* Create(Socket* socket,
|
||||||
const SocketAddress& bind_address,
|
const SocketAddress& bind_address,
|
||||||
const SocketAddress& remote_address);
|
const SocketAddress& remote_address);
|
||||||
explicit AsyncTCPSocket(Socket* socket);
|
AsyncTCPSocket(Socket* socket, bool listen);
|
||||||
~AsyncTCPSocket() override {}
|
~AsyncTCPSocket() override {}
|
||||||
|
|
||||||
int Send(const void* pv,
|
int Send(const void* pv,
|
||||||
size_t cb,
|
size_t cb,
|
||||||
const rtc::PacketOptions& options) override;
|
const rtc::PacketOptions& options) override;
|
||||||
void ProcessInput(char* data, size_t* len) override;
|
void ProcessInput(char* data, size_t* len) override;
|
||||||
|
void HandleIncomingConnection(Socket* socket) override;
|
||||||
|
|
||||||
private:
|
private:
|
||||||
RTC_DISALLOW_COPY_AND_ASSIGN(AsyncTCPSocket);
|
RTC_DISALLOW_COPY_AND_ASSIGN(AsyncTCPSocket);
|
||||||
};
|
};
|
||||||
|
|
||||||
class AsyncTcpListenSocket : public AsyncListenSocket {
|
|
||||||
public:
|
|
||||||
explicit AsyncTcpListenSocket(std::unique_ptr<Socket> socket);
|
|
||||||
|
|
||||||
State GetState() const override;
|
|
||||||
SocketAddress GetLocalAddress() const override;
|
|
||||||
|
|
||||||
int GetOption(Socket::Option opt, int* value) override;
|
|
||||||
int SetOption(Socket::Option opt, int value) override;
|
|
||||||
|
|
||||||
virtual void HandleIncomingConnection(rtc::Socket* socket);
|
|
||||||
|
|
||||||
private:
|
|
||||||
// Called by the underlying socket
|
|
||||||
void OnReadEvent(Socket* socket);
|
|
||||||
|
|
||||||
std::unique_ptr<Socket> socket_;
|
|
||||||
};
|
|
||||||
|
|
||||||
} // namespace rtc
|
} // namespace rtc
|
||||||
|
|
||||||
#endif // RTC_BASE_ASYNC_TCP_SOCKET_H_
|
#endif // RTC_BASE_ASYNC_TCP_SOCKET_H_
|
||||||
|
|
|
@ -56,7 +56,7 @@ TestClient* CreateTestClient(SocketFactory* factory,
|
||||||
}
|
}
|
||||||
|
|
||||||
TestClient* CreateTCPTestClient(Socket* socket) {
|
TestClient* CreateTCPTestClient(Socket* socket) {
|
||||||
return new TestClient(std::make_unique<AsyncTCPSocket>(socket));
|
return new TestClient(std::make_unique<AsyncTCPSocket>(socket, false));
|
||||||
}
|
}
|
||||||
|
|
||||||
// Tests that when sending from internal_addr to external_addrs through the
|
// Tests that when sending from internal_addr to external_addrs through the
|
||||||
|
|
|
@ -41,7 +41,7 @@ class TestEchoServer : public sigslot::has_slots<> {
|
||||||
void OnAccept(Socket* socket) {
|
void OnAccept(Socket* socket) {
|
||||||
Socket* raw_socket = socket->Accept(nullptr);
|
Socket* raw_socket = socket->Accept(nullptr);
|
||||||
if (raw_socket) {
|
if (raw_socket) {
|
||||||
AsyncTCPSocket* packet_socket = new AsyncTCPSocket(raw_socket);
|
AsyncTCPSocket* packet_socket = new AsyncTCPSocket(raw_socket, false);
|
||||||
packet_socket->SignalReadPacket.connect(this, &TestEchoServer::OnPacket);
|
packet_socket->SignalReadPacket.connect(this, &TestEchoServer::OnPacket);
|
||||||
packet_socket->SignalClose.connect(this, &TestEchoServer::OnClose);
|
packet_socket->SignalClose.connect(this, &TestEchoServer::OnClose);
|
||||||
client_sockets_.push_back(packet_socket);
|
client_sockets_.push_back(packet_socket);
|
||||||
|
|
Loading…
Reference in a new issue