mirror of
https://github.com/mollyim/webrtc.git
synced 2025-05-13 05:40:42 +01:00
AsyncTCPSocket: try sending outgoing data until EWOULDBLOCK
The AsyncTCPSocket is an AsyncPacketSocket which means it emulates UDP-like (packet) semantics via a TCP stream. When sending, if the entire packet could not be written then the packet socket should indicate it wrote the whole thing and flush out the remaining later when the socket is available. The WriteEvent signal was already wired up but was not getting fired (at least with the virtual sockets) since it would not call Send() enough on the underlying socket to get an EWOULDBLOCK that would register the async event. This changes AsyncTCPSocket to repeatedly call Send() on the underlying socket until the entire packet has been written or EWOULDBLOCK was returned. Bug: webrtc:6655 Change-Id: I41e81e0c106c9b3e712a8a0f792d28745d93f2d2 Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/168083 Reviewed-by: Qingsi Wang <qingsi@webrtc.org> Commit-Queue: Steve Anton <steveanton@webrtc.org> Cr-Commit-Position: refs/heads/master@{#30449}
This commit is contained in:
parent
2181228624
commit
3fa2b80e14
3 changed files with 38 additions and 36 deletions
|
@ -106,10 +106,10 @@ class AsyncStunTCPSocketTest : public ::testing::Test,
|
||||||
|
|
||||||
bool Send(const void* data, size_t len) {
|
bool Send(const void* data, size_t len) {
|
||||||
rtc::PacketOptions options;
|
rtc::PacketOptions options;
|
||||||
size_t ret =
|
int ret =
|
||||||
send_socket_->Send(reinterpret_cast<const char*>(data), len, options);
|
send_socket_->Send(reinterpret_cast<const char*>(data), len, options);
|
||||||
vss_->ProcessMessagesUntilIdle();
|
vss_->ProcessMessagesUntilIdle();
|
||||||
return (ret == len);
|
return (ret == static_cast<int>(len));
|
||||||
}
|
}
|
||||||
|
|
||||||
bool CheckData(const void* data, int len) {
|
bool CheckData(const void* data, int len) {
|
||||||
|
@ -224,10 +224,6 @@ TEST_F(AsyncStunTCPSocketTest, TestTooSmallMessageBuffer) {
|
||||||
|
|
||||||
// Verifying a legal large turn message.
|
// Verifying a legal large turn message.
|
||||||
TEST_F(AsyncStunTCPSocketTest, TestMaximumSizeTurnPacket) {
|
TEST_F(AsyncStunTCPSocketTest, TestMaximumSizeTurnPacket) {
|
||||||
// We have problem in getting the SignalWriteEvent from the virtual socket
|
|
||||||
// server. So increasing the send buffer to 64k.
|
|
||||||
// TODO(mallinath) - Remove this setting after we fix vss issue.
|
|
||||||
vss_->set_send_buffer_capacity(64 * 1024);
|
|
||||||
unsigned char packet[65539];
|
unsigned char packet[65539];
|
||||||
packet[0] = 0x40;
|
packet[0] = 0x40;
|
||||||
packet[1] = 0x00;
|
packet[1] = 0x00;
|
||||||
|
@ -238,10 +234,6 @@ TEST_F(AsyncStunTCPSocketTest, TestMaximumSizeTurnPacket) {
|
||||||
|
|
||||||
// Verifying a legal large stun message.
|
// Verifying a legal large stun message.
|
||||||
TEST_F(AsyncStunTCPSocketTest, TestMaximumSizeStunPacket) {
|
TEST_F(AsyncStunTCPSocketTest, TestMaximumSizeStunPacket) {
|
||||||
// We have problem in getting the SignalWriteEvent from the virtual socket
|
|
||||||
// server. So increasing the send buffer to 64k.
|
|
||||||
// TODO(mallinath) - Remove this setting after we fix vss issue.
|
|
||||||
vss_->set_send_buffer_capacity(64 * 1024);
|
|
||||||
unsigned char packet[65552];
|
unsigned char packet[65552];
|
||||||
packet[0] = 0x00;
|
packet[0] = 0x00;
|
||||||
packet[1] = 0x01;
|
packet[1] = 0x01;
|
||||||
|
@ -250,8 +242,9 @@ TEST_F(AsyncStunTCPSocketTest, TestMaximumSizeStunPacket) {
|
||||||
EXPECT_TRUE(Send(packet, sizeof(packet)));
|
EXPECT_TRUE(Send(packet, sizeof(packet)));
|
||||||
}
|
}
|
||||||
|
|
||||||
// Investigate why WriteEvent is not signaled from VSS.
|
// Test that a turn message is sent completely even if it exceeds the socket
|
||||||
TEST_F(AsyncStunTCPSocketTest, DISABLED_TestWithSmallSendBuffer) {
|
// send buffer capacity.
|
||||||
|
TEST_F(AsyncStunTCPSocketTest, TestWithSmallSendBuffer) {
|
||||||
vss_->set_send_buffer_capacity(1);
|
vss_->set_send_buffer_capacity(1);
|
||||||
Send(kTurnChannelDataMessageWithOddLength,
|
Send(kTurnChannelDataMessageWithOddLength,
|
||||||
sizeof(kTurnChannelDataMessageWithOddLength));
|
sizeof(kTurnChannelDataMessageWithOddLength));
|
||||||
|
|
|
@ -16,6 +16,7 @@
|
||||||
#include <algorithm>
|
#include <algorithm>
|
||||||
#include <memory>
|
#include <memory>
|
||||||
|
|
||||||
|
#include "api/array_view.h"
|
||||||
#include "rtc_base/byte_order.h"
|
#include "rtc_base/byte_order.h"
|
||||||
#include "rtc_base/checks.h"
|
#include "rtc_base/checks.h"
|
||||||
#include "rtc_base/logging.h"
|
#include "rtc_base/logging.h"
|
||||||
|
@ -147,33 +148,42 @@ int AsyncTCPSocketBase::SendTo(const void* pv,
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
int AsyncTCPSocketBase::SendRaw(const void* pv, size_t cb) {
|
|
||||||
if (outbuf_.size() + cb > max_outsize_) {
|
|
||||||
socket_->SetError(EMSGSIZE);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
RTC_DCHECK(!listen_);
|
|
||||||
outbuf_.AppendData(static_cast<const uint8_t*>(pv), cb);
|
|
||||||
|
|
||||||
return FlushOutBuffer();
|
|
||||||
}
|
|
||||||
|
|
||||||
int AsyncTCPSocketBase::FlushOutBuffer() {
|
int AsyncTCPSocketBase::FlushOutBuffer() {
|
||||||
RTC_DCHECK(!listen_);
|
RTC_DCHECK(!listen_);
|
||||||
int res = socket_->Send(outbuf_.data(), outbuf_.size());
|
RTC_DCHECK_GT(outbuf_.size(), 0);
|
||||||
if (res <= 0) {
|
rtc::ArrayView<uint8_t> view = outbuf_;
|
||||||
return res;
|
int res;
|
||||||
|
while (view.size() > 0) {
|
||||||
|
res = socket_->Send(view.data(), view.size());
|
||||||
|
if (res <= 0) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
if (static_cast<size_t>(res) > view.size()) {
|
||||||
|
RTC_NOTREACHED();
|
||||||
|
res = -1;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
view = view.subview(res);
|
||||||
}
|
}
|
||||||
if (static_cast<size_t>(res) > outbuf_.size()) {
|
if (res > 0) {
|
||||||
RTC_NOTREACHED();
|
// The output buffer may have been written out over multiple partial Send(),
|
||||||
return -1;
|
// so reconstruct the total written length.
|
||||||
|
RTC_DCHECK_EQ(view.size(), 0);
|
||||||
|
res = outbuf_.size();
|
||||||
|
outbuf_.Clear();
|
||||||
|
} else {
|
||||||
|
// There was an error when calling Send(), so there will still be data left
|
||||||
|
// to send at a later point.
|
||||||
|
RTC_DCHECK_GT(view.size(), 0);
|
||||||
|
// In the special case of EWOULDBLOCK, signal that we had a partial write.
|
||||||
|
if (socket_->GetError() == EWOULDBLOCK) {
|
||||||
|
res = outbuf_.size() - view.size();
|
||||||
|
}
|
||||||
|
if (view.size() < outbuf_.size()) {
|
||||||
|
memmove(outbuf_.data(), view.data(), view.size());
|
||||||
|
outbuf_.SetSize(view.size());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
size_t new_size = outbuf_.size() - res;
|
|
||||||
if (new_size > 0) {
|
|
||||||
memmove(outbuf_.data(), outbuf_.data() + res, new_size);
|
|
||||||
}
|
|
||||||
outbuf_.SetSize(new_size);
|
|
||||||
return res;
|
return res;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -61,7 +61,6 @@ class AsyncTCPSocketBase : public AsyncPacketSocket {
|
||||||
static AsyncSocket* ConnectSocket(AsyncSocket* socket,
|
static AsyncSocket* ConnectSocket(AsyncSocket* socket,
|
||||||
const SocketAddress& bind_address,
|
const SocketAddress& bind_address,
|
||||||
const SocketAddress& remote_address);
|
const SocketAddress& remote_address);
|
||||||
virtual int SendRaw(const void* pv, size_t cb);
|
|
||||||
int FlushOutBuffer();
|
int FlushOutBuffer();
|
||||||
// Add data to |outbuf_|.
|
// Add data to |outbuf_|.
|
||||||
void AppendToOutBuffer(const void* pv, size_t cb);
|
void AppendToOutBuffer(const void* pv, size_t cb);
|
||||||
|
|
Loading…
Reference in a new issue