mirror of
https://github.com/mollyim/webrtc.git
synced 2025-05-13 05:40:42 +01:00
Reapply "Refactor AsyncTcpSocket(s) to use rtc::ReceivedPackets"
This reverts commit 264547d084
.
Refactor AsyncTcpSocket(s) to use rtc::ReceivedPackets
Patchset 1 contains original cl.
Newer patchsets contains fix of the problem from pathset 1.
Bug: webrtc:15368, webrtc:11943
Change-Id: Ib8c4c06daf502a5dec8c31beea78eacac8c3c644
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/328820
Commit-Queue: Per Kjellander <perkj@webrtc.org>
Reviewed-by: Harald Alvestrand <hta@webrtc.org>
Reviewed-by: Jonas Oreland <jonaso@google.com>
Cr-Commit-Position: refs/heads/main@{#41255}
This commit is contained in:
parent
b202bc1db2
commit
357947f2f0
5 changed files with 84 additions and 41 deletions
|
@ -14,9 +14,15 @@
|
||||||
#include <stdint.h>
|
#include <stdint.h>
|
||||||
#include <string.h>
|
#include <string.h>
|
||||||
|
|
||||||
|
#include <cstddef>
|
||||||
|
#include <cstdint>
|
||||||
|
|
||||||
|
#include "api/array_view.h"
|
||||||
#include "api/transport/stun.h"
|
#include "api/transport/stun.h"
|
||||||
|
#include "api/units/timestamp.h"
|
||||||
#include "rtc_base/byte_order.h"
|
#include "rtc_base/byte_order.h"
|
||||||
#include "rtc_base/checks.h"
|
#include "rtc_base/checks.h"
|
||||||
|
#include "rtc_base/network/received_packet.h"
|
||||||
#include "rtc_base/network/sent_packet.h"
|
#include "rtc_base/network/sent_packet.h"
|
||||||
#include "rtc_base/time_utils.h"
|
#include "rtc_base/time_utils.h"
|
||||||
|
|
||||||
|
@ -89,7 +95,7 @@ int AsyncStunTCPSocket::Send(const void* pv,
|
||||||
return static_cast<int>(cb);
|
return static_cast<int>(cb);
|
||||||
}
|
}
|
||||||
|
|
||||||
void AsyncStunTCPSocket::ProcessInput(char* data, size_t* len) {
|
size_t AsyncStunTCPSocket::ProcessInput(rtc::ArrayView<const uint8_t> data) {
|
||||||
rtc::SocketAddress remote_addr(GetRemoteAddress());
|
rtc::SocketAddress remote_addr(GetRemoteAddress());
|
||||||
// STUN packet - First 4 bytes. Total header size is 20 bytes.
|
// STUN packet - First 4 bytes. Total header size is 20 bytes.
|
||||||
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|
||||||
|
@ -101,26 +107,27 @@ void AsyncStunTCPSocket::ProcessInput(char* data, size_t* len) {
|
||||||
// | Channel Number | Length |
|
// | Channel Number | Length |
|
||||||
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|
||||||
|
|
||||||
|
size_t processed_bytes = 0;
|
||||||
while (true) {
|
while (true) {
|
||||||
|
size_t bytes_left = data.size() - processed_bytes;
|
||||||
// We need at least 4 bytes to read the STUN or ChannelData packet length.
|
// We need at least 4 bytes to read the STUN or ChannelData packet length.
|
||||||
if (*len < kPacketLenOffset + kPacketLenSize)
|
if (bytes_left < kPacketLenOffset + kPacketLenSize)
|
||||||
return;
|
return processed_bytes;
|
||||||
|
|
||||||
int pad_bytes;
|
int pad_bytes;
|
||||||
size_t expected_pkt_len = GetExpectedLength(data, *len, &pad_bytes);
|
size_t expected_pkt_len = GetExpectedLength(data.data() + processed_bytes,
|
||||||
|
bytes_left, &pad_bytes);
|
||||||
size_t actual_length = expected_pkt_len + pad_bytes;
|
size_t actual_length = expected_pkt_len + pad_bytes;
|
||||||
|
|
||||||
if (*len < actual_length) {
|
if (bytes_left < actual_length) {
|
||||||
return;
|
return processed_bytes;
|
||||||
}
|
}
|
||||||
|
|
||||||
SignalReadPacket(this, data, expected_pkt_len, remote_addr,
|
rtc::ReceivedPacket received_packet(
|
||||||
rtc::TimeMicros());
|
data.subview(processed_bytes, expected_pkt_len), remote_addr,
|
||||||
|
webrtc::Timestamp::Micros(rtc::TimeMicros()));
|
||||||
*len -= actual_length;
|
NotifyPacketReceived(received_packet);
|
||||||
if (*len > 0) {
|
processed_bytes += actual_length;
|
||||||
memmove(data, data + actual_length, *len);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -37,7 +37,7 @@ class AsyncStunTCPSocket : public rtc::AsyncTCPSocketBase {
|
||||||
int Send(const void* pv,
|
int Send(const void* pv,
|
||||||
size_t cb,
|
size_t cb,
|
||||||
const rtc::PacketOptions& options) override;
|
const rtc::PacketOptions& options) override;
|
||||||
void ProcessInput(char* data, size_t* len) override;
|
size_t ProcessInput(rtc::ArrayView<const uint8_t> data) override;
|
||||||
|
|
||||||
private:
|
private:
|
||||||
// This method returns the message hdr + length written in the header.
|
// This method returns the message hdr + length written in the header.
|
||||||
|
|
|
@ -13,12 +13,17 @@
|
||||||
#include <stdint.h>
|
#include <stdint.h>
|
||||||
#include <string.h>
|
#include <string.h>
|
||||||
|
|
||||||
|
#include <cstdint>
|
||||||
#include <list>
|
#include <list>
|
||||||
#include <memory>
|
#include <memory>
|
||||||
#include <string>
|
#include <string>
|
||||||
#include <utility>
|
#include <utility>
|
||||||
|
|
||||||
#include "absl/memory/memory.h"
|
#include "absl/memory/memory.h"
|
||||||
|
#include "api/array_view.h"
|
||||||
|
#include "rtc_base/buffer.h"
|
||||||
|
#include "rtc_base/byte_buffer.h"
|
||||||
|
#include "rtc_base/network/received_packet.h"
|
||||||
#include "rtc_base/network/sent_packet.h"
|
#include "rtc_base/network/sent_packet.h"
|
||||||
#include "rtc_base/socket.h"
|
#include "rtc_base/socket.h"
|
||||||
#include "rtc_base/third_party/sigslot/sigslot.h"
|
#include "rtc_base/third_party/sigslot/sigslot.h"
|
||||||
|
@ -96,11 +101,10 @@ class AsyncStunTCPSocketTest : public ::testing::Test,
|
||||||
}
|
}
|
||||||
|
|
||||||
void OnReadPacket(rtc::AsyncPacketSocket* socket,
|
void OnReadPacket(rtc::AsyncPacketSocket* socket,
|
||||||
const char* data,
|
const rtc::ReceivedPacket& packet) {
|
||||||
size_t len,
|
recv_packets_.push_back(
|
||||||
const rtc::SocketAddress& remote_addr,
|
std::string(reinterpret_cast<const char*>(packet.payload().data()),
|
||||||
const int64_t& /* packet_time_us */) {
|
packet.payload().size()));
|
||||||
recv_packets_.push_back(std::string(data, len));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void OnSentPacket(rtc::AsyncPacketSocket* socket,
|
void OnSentPacket(rtc::AsyncPacketSocket* socket,
|
||||||
|
@ -111,8 +115,10 @@ class AsyncStunTCPSocketTest : public ::testing::Test,
|
||||||
void OnNewConnection(rtc::AsyncListenSocket* /*server*/,
|
void OnNewConnection(rtc::AsyncListenSocket* /*server*/,
|
||||||
rtc::AsyncPacketSocket* new_socket) {
|
rtc::AsyncPacketSocket* new_socket) {
|
||||||
recv_socket_ = absl::WrapUnique(new_socket);
|
recv_socket_ = absl::WrapUnique(new_socket);
|
||||||
new_socket->SignalReadPacket.connect(this,
|
new_socket->RegisterReceivedPacketCallback(
|
||||||
&AsyncStunTCPSocketTest::OnReadPacket);
|
[&](rtc::AsyncPacketSocket* socket, const rtc::ReceivedPacket& packet) {
|
||||||
|
OnReadPacket(socket, packet);
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
bool Send(const void* data, size_t len) {
|
bool Send(const void* data, size_t len) {
|
||||||
|
@ -164,6 +170,30 @@ TEST_F(AsyncStunTCPSocketTest, TestMultipleStunPackets) {
|
||||||
EXPECT_EQ(4u, recv_packets_.size());
|
EXPECT_EQ(4u, recv_packets_.size());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TEST_F(AsyncStunTCPSocketTest, ProcessInputHandlesMultiplePackets) {
|
||||||
|
send_socket_->RegisterReceivedPacketCallback(
|
||||||
|
[&](rtc::AsyncPacketSocket* socket, const rtc::ReceivedPacket& packet) {
|
||||||
|
recv_packets_.push_back(
|
||||||
|
std::string(reinterpret_cast<const char*>(packet.payload().data()),
|
||||||
|
packet.payload().size()));
|
||||||
|
});
|
||||||
|
rtc::Buffer buffer;
|
||||||
|
buffer.AppendData(kStunMessageWithZeroLength,
|
||||||
|
sizeof(kStunMessageWithZeroLength));
|
||||||
|
// ChannelData message MUST be padded to
|
||||||
|
// a multiple of four bytes.
|
||||||
|
const unsigned char kTurnChannelData[] = {
|
||||||
|
0x40, 0x00, 0x00, 0x04, 0x21, 0x12, 0xA4, 0x42,
|
||||||
|
};
|
||||||
|
buffer.AppendData(kTurnChannelData, sizeof(kTurnChannelData));
|
||||||
|
|
||||||
|
send_socket_->ProcessInput(buffer);
|
||||||
|
EXPECT_EQ(2u, recv_packets_.size());
|
||||||
|
EXPECT_TRUE(CheckData(kStunMessageWithZeroLength,
|
||||||
|
sizeof(kStunMessageWithZeroLength)));
|
||||||
|
EXPECT_TRUE(CheckData(kTurnChannelData, sizeof(kTurnChannelData)));
|
||||||
|
}
|
||||||
|
|
||||||
// Verifying TURN channel data message with zero length.
|
// Verifying TURN channel data message with zero length.
|
||||||
TEST_F(AsyncStunTCPSocketTest, TestTurnChannelDataWithZeroLength) {
|
TEST_F(AsyncStunTCPSocketTest, TestTurnChannelDataWithZeroLength) {
|
||||||
EXPECT_TRUE(Send(kTurnChannelDataMessageWithZeroLength,
|
EXPECT_TRUE(Send(kTurnChannelDataMessageWithZeroLength,
|
||||||
|
|
|
@ -14,6 +14,8 @@
|
||||||
#include <string.h>
|
#include <string.h>
|
||||||
|
|
||||||
#include <algorithm>
|
#include <algorithm>
|
||||||
|
#include <cstddef>
|
||||||
|
#include <cstdint>
|
||||||
#include <memory>
|
#include <memory>
|
||||||
|
|
||||||
#include "api/array_view.h"
|
#include "api/array_view.h"
|
||||||
|
@ -209,15 +211,17 @@ void AsyncTCPSocketBase::OnReadEvent(Socket* socket) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
size_t size = inbuf_.size();
|
size_t processed = ProcessInput(inbuf_);
|
||||||
ProcessInput(inbuf_.data<char>(), &size);
|
size_t bytes_remaining = inbuf_.size() - processed;
|
||||||
|
if (processed > inbuf_.size()) {
|
||||||
if (size > inbuf_.size()) {
|
|
||||||
RTC_LOG(LS_ERROR) << "input buffer overflow";
|
RTC_LOG(LS_ERROR) << "input buffer overflow";
|
||||||
RTC_DCHECK_NOTREACHED();
|
RTC_DCHECK_NOTREACHED();
|
||||||
inbuf_.Clear();
|
inbuf_.Clear();
|
||||||
} else {
|
} else {
|
||||||
inbuf_.SetSize(size);
|
if (bytes_remaining > 0) {
|
||||||
|
memmove(inbuf_.data(), inbuf_.data() + processed, bytes_remaining);
|
||||||
|
}
|
||||||
|
inbuf_.SetSize(bytes_remaining);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -283,24 +287,24 @@ int AsyncTCPSocket::Send(const void* pv,
|
||||||
return static_cast<int>(cb);
|
return static_cast<int>(cb);
|
||||||
}
|
}
|
||||||
|
|
||||||
void AsyncTCPSocket::ProcessInput(char* data, size_t* len) {
|
size_t AsyncTCPSocket::ProcessInput(rtc::ArrayView<const uint8_t> data) {
|
||||||
SocketAddress remote_addr(GetRemoteAddress());
|
SocketAddress remote_addr(GetRemoteAddress());
|
||||||
|
|
||||||
|
size_t processed_bytes = 0;
|
||||||
while (true) {
|
while (true) {
|
||||||
if (*len < kPacketLenSize)
|
size_t bytes_left = data.size() - processed_bytes;
|
||||||
return;
|
if (bytes_left < kPacketLenSize)
|
||||||
|
return processed_bytes;
|
||||||
|
|
||||||
PacketLength pkt_len = rtc::GetBE16(data);
|
PacketLength pkt_len = rtc::GetBE16(data.data() + processed_bytes);
|
||||||
if (*len < kPacketLenSize + pkt_len)
|
if (bytes_left < kPacketLenSize + pkt_len)
|
||||||
return;
|
return processed_bytes;
|
||||||
|
|
||||||
NotifyPacketReceived(rtc::ReceivedPacket::CreateFromLegacy(
|
rtc::ReceivedPacket received_packet(
|
||||||
data + kPacketLenSize, pkt_len, rtc::TimeMicros(), remote_addr));
|
data.subview(processed_bytes + kPacketLenSize, pkt_len), remote_addr,
|
||||||
|
webrtc::Timestamp::Micros(rtc::TimeMicros()));
|
||||||
*len -= kPacketLenSize + pkt_len;
|
NotifyPacketReceived(received_packet);
|
||||||
if (*len > 0) {
|
processed_bytes += kPacketLenSize + pkt_len;
|
||||||
memmove(data, data + kPacketLenSize + pkt_len, *len);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -16,6 +16,7 @@
|
||||||
#include <cstdint>
|
#include <cstdint>
|
||||||
#include <memory>
|
#include <memory>
|
||||||
|
|
||||||
|
#include "api/array_view.h"
|
||||||
#include "rtc_base/async_packet_socket.h"
|
#include "rtc_base/async_packet_socket.h"
|
||||||
#include "rtc_base/buffer.h"
|
#include "rtc_base/buffer.h"
|
||||||
#include "rtc_base/socket.h"
|
#include "rtc_base/socket.h"
|
||||||
|
@ -38,7 +39,8 @@ class AsyncTCPSocketBase : public AsyncPacketSocket {
|
||||||
int Send(const void* pv,
|
int Send(const void* pv,
|
||||||
size_t cb,
|
size_t cb,
|
||||||
const rtc::PacketOptions& options) override = 0;
|
const rtc::PacketOptions& options) override = 0;
|
||||||
virtual void ProcessInput(char* data, size_t* len) = 0;
|
// Must return the number of bytes processed.
|
||||||
|
virtual size_t ProcessInput(rtc::ArrayView<const uint8_t> data) = 0;
|
||||||
|
|
||||||
SocketAddress GetLocalAddress() const override;
|
SocketAddress GetLocalAddress() const override;
|
||||||
SocketAddress GetRemoteAddress() const override;
|
SocketAddress GetRemoteAddress() const override;
|
||||||
|
@ -100,7 +102,7 @@ class AsyncTCPSocket : public AsyncTCPSocketBase {
|
||||||
int Send(const void* pv,
|
int Send(const void* pv,
|
||||||
size_t cb,
|
size_t cb,
|
||||||
const rtc::PacketOptions& options) override;
|
const rtc::PacketOptions& options) override;
|
||||||
void ProcessInput(char* data, size_t* len) override;
|
size_t ProcessInput(rtc::ArrayView<const uint8_t>) override;
|
||||||
};
|
};
|
||||||
|
|
||||||
class AsyncTcpListenSocket : public AsyncListenSocket {
|
class AsyncTcpListenSocket : public AsyncListenSocket {
|
||||||
|
|
Loading…
Reference in a new issue