SocketServer: Migrate Wait/kForever to TimeDelta.

Bug: webrtc:13756
Change-Id: Ie36ca38b1ab336742231b101ef7bb5ccf3735659
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/272102
Commit-Queue: Markus Handell <handellm@webrtc.org>
Reviewed-by: Tomas Gunnarsson <tommi@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#37903}
This commit is contained in:
Markus Handell 2022-08-25 11:40:13 +00:00 committed by WebRTC LUCI CQ
parent e7e3d5925a
commit 9a21c49337
20 changed files with 80 additions and 50 deletions

View file

@ -36,7 +36,8 @@ class CustomSocketServer : public rtc::PhysicalSocketServer {
void set_conductor(Conductor* conductor) { conductor_ = conductor; } void set_conductor(Conductor* conductor) { conductor_ = conductor; }
// Override so that we can also pump the GTK message loop. // Override so that we can also pump the GTK message loop.
bool Wait(int cms, bool process_io) override { // This function never waits.
bool Wait(webrtc::TimeDelta max_wait_duration, bool process_io) override {
// Pump GTK events. // Pump GTK events.
// TODO(henrike): We really should move either the socket server or UI to a // TODO(henrike): We really should move either the socket server or UI to a
// different thread. Alternatively we could look at merging the two loops // different thread. Alternatively we could look at merging the two loops
@ -49,7 +50,7 @@ class CustomSocketServer : public rtc::PhysicalSocketServer {
client_ != NULL && !client_->is_connected()) { client_ != NULL && !client_->is_connected()) {
message_queue_->Quit(); message_queue_->Quit();
} }
return rtc::PhysicalSocketServer::Wait(0 /*cms == -1 ? 1 : cms*/, return rtc::PhysicalSocketServer::Wait(webrtc::TimeDelta::Zero(),
process_io); process_io);
} }

View file

@ -912,7 +912,11 @@ rtc_library("null_socket_server") {
rtc_source_set("socket_server") { rtc_source_set("socket_server") {
sources = [ "socket_server.h" ] sources = [ "socket_server.h" ]
deps = [ ":socket_factory" ] deps = [
":rtc_event",
":socket_factory",
"../api/units:time_delta",
]
} }
rtc_library("threading") { rtc_library("threading") {
@ -1504,6 +1508,7 @@ if (rtc_include_tests) {
":testclient", ":testclient",
":threading", ":threading",
":timeutils", ":timeutils",
"../api/units:time_delta",
"../system_wrappers", "../system_wrappers",
"../test:fileutils", "../test:fileutils",
"../test:test_main", "../test:test_main",

View file

@ -210,8 +210,9 @@ void FirewallSocketServer::SetMessageQueue(Thread* queue) {
server_->SetMessageQueue(queue); server_->SetMessageQueue(queue);
} }
bool FirewallSocketServer::Wait(int cms, bool process_io) { bool FirewallSocketServer::Wait(webrtc::TimeDelta max_wait_duration,
return server_->Wait(cms, process_io); bool process_io) {
return server_->Wait(max_wait_duration, process_io);
} }
void FirewallSocketServer::WakeUp() { void FirewallSocketServer::WakeUp() {

View file

@ -79,7 +79,7 @@ class FirewallSocketServer : public SocketServer {
Socket* CreateSocket(int family, int type) override; Socket* CreateSocket(int family, int type) override;
void SetMessageQueue(Thread* queue) override; void SetMessageQueue(Thread* queue) override;
bool Wait(int cms, bool process_io) override; bool Wait(webrtc::TimeDelta max_wait_duration, bool process_io) override;
void WakeUp() override; void WakeUp() override;
Socket* WrapSocket(Socket* sock, int type); Socket* WrapSocket(Socket* sock, int type);

View file

@ -384,8 +384,9 @@ void NATSocketServer::SetMessageQueue(Thread* queue) {
server_->SetMessageQueue(queue); server_->SetMessageQueue(queue);
} }
bool NATSocketServer::Wait(int cms, bool process_io) { bool NATSocketServer::Wait(webrtc::TimeDelta max_wait_duration,
return server_->Wait(cms, process_io); bool process_io) {
return server_->Wait(max_wait_duration, process_io);
} }
void NATSocketServer::WakeUp() { void NATSocketServer::WakeUp() {

View file

@ -152,7 +152,7 @@ class NATSocketServer : public SocketServer, public NATInternalSocketFactory {
Socket* CreateSocket(int family, int type) override; Socket* CreateSocket(int family, int type) override;
void SetMessageQueue(Thread* queue) override; void SetMessageQueue(Thread* queue) override;
bool Wait(int cms, bool process_io) override; bool Wait(webrtc::TimeDelta max_wait_duration, bool process_io) override;
void WakeUp() override; void WakeUp() override;
// NATInternalSocketFactory implementation // NATInternalSocketFactory implementation

View file

@ -20,14 +20,12 @@ namespace rtc {
NullSocketServer::NullSocketServer() = default; NullSocketServer::NullSocketServer() = default;
NullSocketServer::~NullSocketServer() {} NullSocketServer::~NullSocketServer() {}
bool NullSocketServer::Wait(int cms, bool process_io) { bool NullSocketServer::Wait(webrtc::TimeDelta max_wait_duration,
bool process_io) {
// Wait with the given timeout. Do not log a warning if we end up waiting for // Wait with the given timeout. Do not log a warning if we end up waiting for
// a long time; that just means no one has any work for us, which is perfectly // a long time; that just means no one has any work for us, which is perfectly
// legitimate. // legitimate.
event_.Wait(/*give_up_after=*/cms == kForever event_.Wait(max_wait_duration, /*warn_after=*/Event::kForever);
? Event::kForever
: webrtc::TimeDelta::Millis(cms),
/*warn_after=*/Event::kForever);
return true; return true;
} }

View file

@ -23,7 +23,7 @@ class RTC_EXPORT NullSocketServer : public SocketServer {
NullSocketServer(); NullSocketServer();
~NullSocketServer() override; ~NullSocketServer() override;
bool Wait(int cms, bool process_io) override; bool Wait(webrtc::TimeDelta max_wait_duration, bool process_io) override;
void WakeUp() override; void WakeUp() override;
Socket* CreateSocket(int family, int type) override; Socket* CreateSocket(int family, int type) override;

View file

@ -14,6 +14,7 @@
#include <memory> #include <memory>
#include "api/units/time_delta.h"
#include "rtc_base/gunit.h" #include "rtc_base/gunit.h"
#include "rtc_base/location.h" #include "rtc_base/location.h"
#include "rtc_base/message_handler.h" #include "rtc_base/message_handler.h"
@ -44,7 +45,7 @@ TEST_F(NullSocketServerTest, WaitAndSet) {
TEST_F(NullSocketServerTest, TestWait) { TEST_F(NullSocketServerTest, TestWait) {
int64_t start = TimeMillis(); int64_t start = TimeMillis();
ss_.Wait(200, true); ss_.Wait(webrtc::TimeDelta::Millis(200), true);
// The actual wait time is dependent on the resolution of the timer used by // The actual wait time is dependent on the resolution of the timer used by
// the Event class. Allow for the event to signal ~20ms early. // the Event class. Allow for the event to signal ~20ms early.
EXPECT_GE(TimeSince(start), 180); EXPECT_GE(TimeSince(start), 180);

View file

@ -9,6 +9,8 @@
*/ */
#include "rtc_base/physical_socket_server.h" #include "rtc_base/physical_socket_server.h"
#include <cstdint>
#if defined(_MSC_VER) && _MSC_VER < 1300 #if defined(_MSC_VER) && _MSC_VER < 1300
#pragma warning(disable : 4786) #pragma warning(disable : 4786)
#endif #endif
@ -1164,12 +1166,20 @@ void PhysicalSocketServer::Update(Dispatcher* pdispatcher) {
#endif #endif
} }
int PhysicalSocketServer::ToCmsWait(webrtc::TimeDelta max_wait_duration) {
return max_wait_duration == Event::kForever
? kForeverMs
: max_wait_duration.RoundUpTo(webrtc::TimeDelta::Millis(1)).ms();
}
#if defined(WEBRTC_POSIX) #if defined(WEBRTC_POSIX)
bool PhysicalSocketServer::Wait(int cmsWait, bool process_io) { bool PhysicalSocketServer::Wait(webrtc::TimeDelta max_wait_duration,
bool process_io) {
// We don't support reentrant waiting. // We don't support reentrant waiting.
RTC_DCHECK(!waiting_); RTC_DCHECK(!waiting_);
ScopedSetTrue s(&waiting_); ScopedSetTrue s(&waiting_);
const int cmsWait = ToCmsWait(max_wait_duration);
#if defined(WEBRTC_USE_EPOLL) #if defined(WEBRTC_USE_EPOLL)
// We don't keep a dedicated "epoll" descriptor containing only the non-IO // We don't keep a dedicated "epoll" descriptor containing only the non-IO
// (i.e. signaling) dispatcher, so "poll" will be used instead of the default // (i.e. signaling) dispatcher, so "poll" will be used instead of the default
@ -1256,7 +1266,7 @@ bool PhysicalSocketServer::WaitSelect(int cmsWait, bool process_io) {
struct timeval* ptvWait = nullptr; struct timeval* ptvWait = nullptr;
struct timeval tvWait; struct timeval tvWait;
int64_t stop_us; int64_t stop_us;
if (cmsWait != kForever) { if (cmsWait != kForeverMs) {
// Calculate wait timeval // Calculate wait timeval
tvWait.tv_sec = cmsWait / 1000; tvWait.tv_sec = cmsWait / 1000;
tvWait.tv_usec = (cmsWait % 1000) * 1000; tvWait.tv_usec = (cmsWait % 1000) * 1000;
@ -1266,7 +1276,6 @@ bool PhysicalSocketServer::WaitSelect(int cmsWait, bool process_io) {
stop_us = rtc::TimeMicros() + cmsWait * 1000; stop_us = rtc::TimeMicros() + cmsWait * 1000;
} }
fd_set fdsRead; fd_set fdsRead;
fd_set fdsWrite; fd_set fdsWrite;
// Explicitly unpoison these FDs on MemorySanitizer which doesn't handle the // Explicitly unpoison these FDs on MemorySanitizer which doesn't handle the
@ -1454,7 +1463,7 @@ bool PhysicalSocketServer::WaitEpoll(int cmsWait) {
RTC_DCHECK(epoll_fd_ != INVALID_SOCKET); RTC_DCHECK(epoll_fd_ != INVALID_SOCKET);
int64_t tvWait = -1; int64_t tvWait = -1;
int64_t tvStop = -1; int64_t tvStop = -1;
if (cmsWait != kForever) { if (cmsWait != kForeverMs) {
tvWait = cmsWait; tvWait = cmsWait;
tvStop = TimeAfter(cmsWait); tvStop = TimeAfter(cmsWait);
} }
@ -1499,7 +1508,7 @@ bool PhysicalSocketServer::WaitEpoll(int cmsWait) {
} }
} }
if (cmsWait != kForever) { if (cmsWait != kForeverMs) {
tvWait = TimeDiff(tvStop, TimeMillis()); tvWait = TimeDiff(tvStop, TimeMillis());
if (tvWait <= 0) { if (tvWait <= 0) {
// Return success on timeout. // Return success on timeout.
@ -1515,7 +1524,7 @@ bool PhysicalSocketServer::WaitPoll(int cmsWait, Dispatcher* dispatcher) {
RTC_DCHECK(dispatcher); RTC_DCHECK(dispatcher);
int64_t tvWait = -1; int64_t tvWait = -1;
int64_t tvStop = -1; int64_t tvStop = -1;
if (cmsWait != kForever) { if (cmsWait != kForeverMs) {
tvWait = cmsWait; tvWait = cmsWait;
tvStop = TimeAfter(cmsWait); tvStop = TimeAfter(cmsWait);
} }
@ -1566,7 +1575,7 @@ bool PhysicalSocketServer::WaitPoll(int cmsWait, Dispatcher* dispatcher) {
ProcessEvents(dispatcher, readable, writable, error, error); ProcessEvents(dispatcher, readable, writable, error, error);
} }
if (cmsWait != kForever) { if (cmsWait != kForeverMs) {
tvWait = TimeDiff(tvStop, TimeMillis()); tvWait = TimeDiff(tvStop, TimeMillis());
if (tvWait < 0) { if (tvWait < 0) {
// Return success on timeout. // Return success on timeout.
@ -1583,11 +1592,13 @@ bool PhysicalSocketServer::WaitPoll(int cmsWait, Dispatcher* dispatcher) {
#endif // WEBRTC_POSIX #endif // WEBRTC_POSIX
#if defined(WEBRTC_WIN) #if defined(WEBRTC_WIN)
bool PhysicalSocketServer::Wait(int cmsWait, bool process_io) { bool PhysicalSocketServer::Wait(webrtc::TimeDelta max_wait_duration,
bool process_io) {
// We don't support reentrant waiting. // We don't support reentrant waiting.
RTC_DCHECK(!waiting_); RTC_DCHECK(!waiting_);
ScopedSetTrue s(&waiting_); ScopedSetTrue s(&waiting_);
int cmsWait = ToCmsWait(max_wait_duration);
int64_t cmsTotal = cmsWait; int64_t cmsTotal = cmsWait;
int64_t cmsElapsed = 0; int64_t cmsElapsed = 0;
int64_t msStart = Time(); int64_t msStart = Time();
@ -1634,7 +1645,7 @@ bool PhysicalSocketServer::Wait(int cmsWait, bool process_io) {
// Which is shorter, the delay wait or the asked wait? // Which is shorter, the delay wait or the asked wait?
int64_t cmsNext; int64_t cmsNext;
if (cmsWait == kForever) { if (cmsWait == kForeverMs) {
cmsNext = cmsWait; cmsNext = cmsWait;
} else { } else {
cmsNext = std::max<int64_t>(0, cmsTotal - cmsElapsed); cmsNext = std::max<int64_t>(0, cmsTotal - cmsElapsed);
@ -1750,7 +1761,7 @@ bool PhysicalSocketServer::Wait(int cmsWait, bool process_io) {
if (!fWait_) if (!fWait_)
break; break;
cmsElapsed = TimeSince(msStart); cmsElapsed = TimeSince(msStart);
if ((cmsWait != kForever) && (cmsElapsed >= cmsWait)) { if ((cmsWait != kForeverMs) && (cmsElapsed >= cmsWait)) {
break; break;
} }
} }

View file

@ -11,6 +11,7 @@
#ifndef RTC_BASE_PHYSICAL_SOCKET_SERVER_H_ #ifndef RTC_BASE_PHYSICAL_SOCKET_SERVER_H_
#define RTC_BASE_PHYSICAL_SOCKET_SERVER_H_ #define RTC_BASE_PHYSICAL_SOCKET_SERVER_H_
#include "api/units/time_delta.h"
#if defined(WEBRTC_POSIX) && defined(WEBRTC_LINUX) #if defined(WEBRTC_POSIX) && defined(WEBRTC_LINUX)
#include <sys/epoll.h> #include <sys/epoll.h>
#define WEBRTC_USE_EPOLL 1 #define WEBRTC_USE_EPOLL 1
@ -74,7 +75,7 @@ class RTC_EXPORT PhysicalSocketServer : public SocketServer {
virtual Socket* WrapSocket(SOCKET s); virtual Socket* WrapSocket(SOCKET s);
// SocketServer: // SocketServer:
bool Wait(int cms, bool process_io) override; bool Wait(webrtc::TimeDelta max_wait_duration, bool process_io) override;
void WakeUp() override; void WakeUp() override;
void Add(Dispatcher* dispatcher); void Add(Dispatcher* dispatcher);
@ -84,16 +85,19 @@ class RTC_EXPORT PhysicalSocketServer : public SocketServer {
private: private:
// The number of events to process with one call to "epoll_wait". // The number of events to process with one call to "epoll_wait".
static constexpr size_t kNumEpollEvents = 128; static constexpr size_t kNumEpollEvents = 128;
// A local historical definition of "foreverness", in milliseconds.
static constexpr int kForeverMs = -1;
static int ToCmsWait(webrtc::TimeDelta max_wait_duration);
#if defined(WEBRTC_POSIX) #if defined(WEBRTC_POSIX)
bool WaitSelect(int cms, bool process_io); bool WaitSelect(int cmsWait, bool process_io);
#endif // WEBRTC_POSIX #endif // WEBRTC_POSIX
#if defined(WEBRTC_USE_EPOLL) #if defined(WEBRTC_USE_EPOLL)
void AddEpoll(Dispatcher* dispatcher, uint64_t key); void AddEpoll(Dispatcher* dispatcher, uint64_t key);
void RemoveEpoll(Dispatcher* dispatcher); void RemoveEpoll(Dispatcher* dispatcher);
void UpdateEpoll(Dispatcher* dispatcher, uint64_t key); void UpdateEpoll(Dispatcher* dispatcher, uint64_t key);
bool WaitEpoll(int cms); bool WaitEpoll(int cmsWait);
bool WaitPoll(int cms, Dispatcher* dispatcher); bool WaitPoll(int cmsWait, Dispatcher* dispatcher);
// This array is accessed in isolation by a thread calling into Wait(). // This array is accessed in isolation by a thread calling into Wait().
// It's useless to use a SequenceChecker to guard it because a socket // It's useless to use a SequenceChecker to guard it because a socket

View file

@ -13,6 +13,8 @@
#include <memory> #include <memory>
#include "api/units/time_delta.h"
#include "rtc_base/event.h"
#include "rtc_base/socket_factory.h" #include "rtc_base/socket_factory.h"
namespace rtc { namespace rtc {
@ -30,7 +32,7 @@ class NetworkBinderInterface;
// notified of asynchronous I/O from this server's Wait method. // notified of asynchronous I/O from this server's Wait method.
class SocketServer : public SocketFactory { class SocketServer : public SocketFactory {
public: public:
static const int kForever = -1; static constexpr webrtc::TimeDelta kForever = rtc::Event::kForever;
static std::unique_ptr<SocketServer> CreateDefault(); static std::unique_ptr<SocketServer> CreateDefault();
// When the socket server is installed into a Thread, this function is called // When the socket server is installed into a Thread, this function is called
@ -40,10 +42,11 @@ class SocketServer : public SocketFactory {
virtual void SetMessageQueue(Thread* queue) {} virtual void SetMessageQueue(Thread* queue) {}
// Sleeps until: // Sleeps until:
// 1) cms milliseconds have elapsed (unless cms == kForever) // 1) `max_wait_duration` has elapsed (unless `max_wait_duration` ==
// `kForever`)
// 2) WakeUp() is called // 2) WakeUp() is called
// While sleeping, I/O is performed if process_io is true. // While sleeping, I/O is performed if process_io is true.
virtual bool Wait(int cms, bool process_io) = 0; virtual bool Wait(webrtc::TimeDelta max_wait_duration, bool process_io) = 0;
// Causes the current wait (if one is in progress) to wake up. // Causes the current wait (if one is in progress) to wake up.
virtual void WakeUp() = 0; virtual void WakeUp() = 0;

View file

@ -11,6 +11,8 @@
#include "rtc_base/thread.h" #include "rtc_base/thread.h"
#include "absl/strings/string_view.h" #include "absl/strings/string_view.h"
#include "api/units/time_delta.h"
#include "rtc_base/socket_server.h"
#if defined(WEBRTC_WIN) #if defined(WEBRTC_WIN)
#include <comdef.h> #include <comdef.h>
@ -492,7 +494,9 @@ bool Thread::Get(Message* pmsg, int cmsWait, bool process_io) {
{ {
// Wait and multiplex in the meantime // Wait and multiplex in the meantime
if (!ss_->Wait(static_cast<int>(cmsNext), process_io)) if (!ss_->Wait(cmsNext == kForever ? SocketServer::kForever
: webrtc::TimeDelta::Millis(cmsNext),
process_io))
return false; return false;
} }
@ -912,7 +916,7 @@ void Thread::Send(const Location& posted_from,
crit_.Enter(); crit_.Enter();
while (!ready) { while (!ready) {
crit_.Leave(); crit_.Leave();
current_thread->socketserver()->Wait(kForever, false); current_thread->socketserver()->Wait(SocketServer::kForever, false);
waited = true; waited = true;
crit_.Enter(); crit_.Enter();
} }

View file

@ -613,7 +613,8 @@ void VirtualSocketServer::SetMessageQueue(Thread* msg_queue) {
msg_queue_ = msg_queue; msg_queue_ = msg_queue;
} }
bool VirtualSocketServer::Wait(int cmsWait, bool process_io) { bool VirtualSocketServer::Wait(webrtc::TimeDelta max_wait_duration,
bool process_io) {
RTC_DCHECK_RUN_ON(msg_queue_); RTC_DCHECK_RUN_ON(msg_queue_);
if (stop_on_idle_ && Thread::Current()->empty()) { if (stop_on_idle_ && Thread::Current()->empty()) {
return false; return false;
@ -622,8 +623,7 @@ bool VirtualSocketServer::Wait(int cmsWait, bool process_io) {
// any real I/O. Received packets come in the form of queued messages, so // any real I/O. Received packets come in the form of queued messages, so
// Thread will ensure WakeUp is called if another thread sends a // Thread will ensure WakeUp is called if another thread sends a
// packet. // packet.
wakeup_.Wait(cmsWait == kForever ? Event::kForever wakeup_.Wait(max_wait_duration);
: webrtc::TimeDelta::Millis(cmsWait));
return true; return true;
} }

View file

@ -223,7 +223,7 @@ class VirtualSocketServer : public SocketServer {
// SocketServer: // SocketServer:
void SetMessageQueue(Thread* queue) override; void SetMessageQueue(Thread* queue) override;
bool Wait(int cms, bool process_io) override; bool Wait(webrtc::TimeDelta max_wait_duration, bool process_io) override;
void WakeUp() override; void WakeUp() override;
void SetDelayOnAddress(const rtc::SocketAddress& address, int delay_ms) { void SetDelayOnAddress(const rtc::SocketAddress& address, int delay_ms) {

View file

@ -304,12 +304,12 @@ void FakeNetworkSocketServer::SetMessageQueue(rtc::Thread* thread) {
} }
// Always returns true (if return false, it won't be invoked again...) // Always returns true (if return false, it won't be invoked again...)
bool FakeNetworkSocketServer::Wait(int cms, bool process_io) { bool FakeNetworkSocketServer::Wait(webrtc::TimeDelta max_wait_duration,
bool process_io) {
RTC_DCHECK(thread_ == rtc::Thread::Current()); RTC_DCHECK(thread_ == rtc::Thread::Current());
if (cms != 0) { if (!max_wait_duration.IsZero())
wakeup_.Wait(cms == kForever ? rtc::Event::kForever wakeup_.Wait(max_wait_duration);
: TimeDelta::Millis(cms));
}
return true; return true;
} }

View file

@ -40,7 +40,7 @@ class FakeNetworkSocketServer : public rtc::SocketServer {
// Called by the network thread when this server is installed, kicking off the // Called by the network thread when this server is installed, kicking off the
// message handler loop. // message handler loop.
void SetMessageQueue(rtc::Thread* thread) override; void SetMessageQueue(rtc::Thread* thread) override;
bool Wait(int cms, bool process_io) override; bool Wait(webrtc::TimeDelta max_wait_duration, bool process_io) override;
void WakeUp() override; void WakeUp() override;
protected: protected:

View file

@ -51,7 +51,8 @@ void RunLoop::FakeSocketServer::FailNextWait() {
fail_next_wait_ = true; fail_next_wait_ = true;
} }
bool RunLoop::FakeSocketServer::Wait(int cms, bool process_io) { bool RunLoop::FakeSocketServer::Wait(webrtc::TimeDelta max_wait_duration,
bool process_io) {
if (fail_next_wait_) { if (fail_next_wait_) {
fail_next_wait_ = false; fail_next_wait_ = false;
return false; return false;

View file

@ -47,7 +47,7 @@ class RunLoop {
void FailNextWait(); void FailNextWait();
private: private:
bool Wait(int cms, bool process_io) override; bool Wait(webrtc::TimeDelta max_wait_duration, bool process_io) override;
void WakeUp() override; void WakeUp() override;
rtc::Socket* CreateSocket(int family, int type) override; rtc::Socket* CreateSocket(int family, int type) override;

View file

@ -24,8 +24,8 @@ class DummySocketServer : public rtc::SocketServer {
RTC_DCHECK_NOTREACHED(); RTC_DCHECK_NOTREACHED();
return nullptr; return nullptr;
} }
bool Wait(int cms, bool process_io) override { bool Wait(TimeDelta max_wait_duration, bool process_io) override {
RTC_CHECK_EQ(cms, 0); RTC_CHECK(max_wait_duration.IsZero());
return true; return true;
} }
void WakeUp() override {} void WakeUp() override {}