Introduce Socket::ReceiveBuffer and RecvFrom(ReceiveBuffer& buffer)

Intention is to gradually stop using raw pointers and make it easier to introduce
new meta data types.
A  default implementation is added that  use existing int RecvFrom(void*
pv,..)
In this cl, async_udp_socket.cc use the new method. There should be no
behaviour change.

Bug: webrtc:15368
Change-Id: I8f9773a65d24ab5bbac3534dcc37ee1ed874a2c7
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/332200
Reviewed-by: Harald Alvestrand <hta@webrtc.org>
Commit-Queue: Per Kjellander <perkj@webrtc.org>
Reviewed-by: Jonas Oreland <jonaso@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#41497}
This commit is contained in:
Per K 2023-12-22 13:24:40 +01:00 committed by WebRTC LUCI CQ
parent 98b0da181b
commit f81af2f8fd
5 changed files with 73 additions and 22 deletions

View file

@ -1105,13 +1105,16 @@ rtc_library("socket") {
"socket.h",
]
deps = [
":buffer",
":macromagic",
":socket_address",
"../api/units:timestamp",
"third_party/sigslot",
]
if (is_win) {
deps += [ ":win32" ]
}
absl_deps = [ "//third_party/abseil-cpp/absl/types:optional" ]
}
rtc_source_set("network_constants") {
@ -1342,7 +1345,9 @@ rtc_library("async_udp_socket") {
":socket_factory",
":timeutils",
"../api:sequence_checker",
"../api/units:time_delta",
"../system_wrappers:field_trial",
"network:received_packet",
"network:sent_packet",
"system:no_unique_address",
]

View file

@ -10,9 +10,11 @@
#include "rtc_base/async_udp_socket.h"
#include "absl/types/optional.h"
#include "api/units/time_delta.h"
#include "rtc_base/checks.h"
#include "rtc_base/logging.h"
#include "rtc_base/network/received_packet.h"
#include "rtc_base/network/sent_packet.h"
#include "rtc_base/time_utils.h"
#include "system_wrappers/include/field_trial.h"
@ -109,10 +111,8 @@ void AsyncUDPSocket::OnReadEvent(Socket* socket) {
RTC_DCHECK(socket_.get() == socket);
RTC_DCHECK_RUN_ON(&sequence_checker_);
SocketAddress remote_addr;
int64_t timestamp = -1;
int len = socket_->RecvFrom(buf_, BUF_SIZE, &remote_addr, &timestamp);
Socket::ReceiveBuffer receive_buffer(buffer_);
int len = socket_->RecvFrom(receive_buffer);
if (len < 0) {
// An error here typically means we got an ICMP error in response to our
// send datagram, indicating the remote address was unreachable.
@ -123,21 +123,31 @@ void AsyncUDPSocket::OnReadEvent(Socket* socket) {
<< "] receive failed with error " << socket_->GetError();
return;
}
if (timestamp == -1) {
// Timestamp from socket is not available.
timestamp = TimeMicros();
} else {
if (!socket_time_offset_) {
socket_time_offset_ =
!IsScmTimeStampExperimentDisabled() ? TimeMicros() - timestamp : 0;
}
timestamp += *socket_time_offset_;
if (len == 0) {
// Spurios wakeup.
return;
}
// TODO: Make sure that we got all of the packet.
// If we did not, then we should resize our buffer to be large enough.
NotifyPacketReceived(
rtc::ReceivedPacket::CreateFromLegacy(buf_, len, timestamp, remote_addr));
if (!receive_buffer.arrival_time) {
// Timestamp from socket is not available.
receive_buffer.arrival_time = webrtc::Timestamp::Micros(rtc::TimeMicros());
} else {
if (!socket_time_offset_) {
// Estimate timestamp offset from first packet arrival time unless
// disabled
bool estimate_time_offset = !IsScmTimeStampExperimentDisabled();
if (estimate_time_offset) {
socket_time_offset_ = webrtc::Timestamp::Micros(rtc::TimeMicros()) -
*receive_buffer.arrival_time;
} else {
socket_time_offset_ = webrtc::TimeDelta::Micros(0);
}
}
*receive_buffer.arrival_time += *socket_time_offset_;
}
NotifyPacketReceived(ReceivedPacket(receive_buffer.payload,
receive_buffer.source_address,
receive_buffer.arrival_time));
}
void AsyncUDPSocket::OnWriteEvent(Socket* socket) {

View file

@ -18,6 +18,7 @@
#include "absl/types/optional.h"
#include "api/sequence_checker.h"
#include "api/units/time_delta.h"
#include "rtc_base/async_packet_socket.h"
#include "rtc_base/socket.h"
#include "rtc_base/socket_address.h"
@ -68,9 +69,9 @@ class AsyncUDPSocket : public AsyncPacketSocket {
RTC_NO_UNIQUE_ADDRESS webrtc::SequenceChecker sequence_checker_;
std::unique_ptr<Socket> socket_;
static constexpr int BUF_SIZE = 64 * 1024;
char buf_[BUF_SIZE] RTC_GUARDED_BY(sequence_checker_);
absl::optional<int64_t> socket_time_offset_ RTC_GUARDED_BY(sequence_checker_);
rtc::Buffer buffer_ RTC_GUARDED_BY(sequence_checker_);
absl::optional<webrtc::TimeDelta> socket_time_offset_
RTC_GUARDED_BY(sequence_checker_);
};
} // namespace rtc

View file

@ -10,4 +10,24 @@
#include "rtc_base/socket.h"
namespace rtc {} // namespace rtc
#include <cstdint>
#include "rtc_base/buffer.h"
namespace rtc {
int Socket::RecvFrom(ReceiveBuffer& buffer) {
static constexpr int BUF_SIZE = 64 * 1024;
int64_t timestamp = -1;
buffer.payload.EnsureCapacity(BUF_SIZE);
int len = RecvFrom(buffer.payload.data(), buffer.payload.capacity(),
&buffer.source_address, &timestamp);
buffer.payload.SetSize(len > 0 ? len : 0);
if (len > 0 && timestamp != -1) {
buffer.arrival_time = webrtc::Timestamp::Micros(timestamp);
}
return len;
}
} // namespace rtc

View file

@ -13,6 +13,8 @@
#include <errno.h>
#include "absl/types/optional.h"
#if defined(WEBRTC_POSIX)
#include <arpa/inet.h>
#include <netinet/in.h>
@ -25,6 +27,8 @@
#include "rtc_base/win32.h"
#endif
#include "api/units/timestamp.h"
#include "rtc_base/buffer.h"
#include "rtc_base/socket_address.h"
#include "rtc_base/third_party/sigslot/sigslot.h"
@ -80,6 +84,13 @@ inline bool IsBlockingError(int e) {
// methods match those of normal UNIX sockets very closely.
class Socket {
public:
struct ReceiveBuffer {
ReceiveBuffer(rtc::Buffer& payload) : payload(payload) {}
absl::optional<webrtc::Timestamp> arrival_time;
SocketAddress source_address;
rtc::Buffer& payload;
};
virtual ~Socket() {}
Socket(const Socket&) = delete;
@ -103,6 +114,10 @@ class Socket {
size_t cb,
SocketAddress* paddr,
int64_t* timestamp) = 0;
// Intended to replace RecvFrom(void* ...).
// Default implementation calls RecvFrom(void* ...) with 64Kbyte buffer.
// Returns number of bytes received or a negative value on error.
virtual int RecvFrom(ReceiveBuffer& buffer);
virtual int Listen(int backlog) = 0;
virtual Socket* Accept(SocketAddress* paddr) = 0;
virtual int Close() = 0;