in rtc::Thread introduce Invoke without rtc::Location parameter

To reduce usage of rtc::MessageHandler, hide rtc::Thread::Send into private section with intention to deprecate it in favor of the new Invoke function.

Bug: webrtc:9702, webrtc:11318
Change-Id: Ib4c26f9abc361e05a45b2a91929af58ab160b3f0
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/274166
Reviewed-by: Tomas Gunnarsson <tommi@webrtc.org>
Commit-Queue: Danil Chapovalov <danilchap@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#38036}
This commit is contained in:
Danil Chapovalov 2022-09-08 13:13:53 +02:00 committed by WebRTC LUCI CQ
parent e0dd6cf363
commit 7c323ad47c
5 changed files with 62 additions and 155 deletions

View file

@ -941,10 +941,8 @@ void Thread::Send(const Location& posted_from,
}
}
void Thread::InvokeInternal(const Location& posted_from,
rtc::FunctionView<void()> functor) {
TRACE_EVENT2("webrtc", "Thread::Invoke", "src_file", posted_from.file_name(),
"src_func", posted_from.function_name());
void Thread::BlockingCall(rtc::FunctionView<void()> functor) {
TRACE_EVENT0("webrtc", "Thread::BlockingCall");
class FunctorMessageHandler : public MessageHandler {
public:
@ -956,7 +954,7 @@ void Thread::InvokeInternal(const Location& posted_from,
rtc::FunctionView<void()> functor_;
} handler(functor);
Send(posted_from, &handler);
Send(/*posted_from=*/{}, &handler, /*id=*/0, /*pdata=*/nullptr);
}
// Called by the ThreadManager when being set as the current thread.

View file

@ -20,6 +20,7 @@
#include <set>
#include <string>
#include <type_traits>
#include <utility>
#include <vector>
#include "absl/strings/string_view.h"
@ -47,8 +48,8 @@
#endif
#if RTC_DCHECK_IS_ON
// Counts how many blocking Thread::Invoke or Thread::Send calls are made from
// within a scope and logs the number of blocking calls at the end of the scope.
// Counts how many `Thread::BlockingCall` are made from within a scope and logs
// the number of blocking calls at the end of the scope.
#define RTC_LOG_THREAD_BLOCK_COUNT() \
rtc::Thread::ScopedCountBlockingCalls blocked_call_count_printer( \
[func = __func__](uint32_t actual_block, uint32_t could_block) { \
@ -202,8 +203,8 @@ class RTC_LOCKABLE RTC_EXPORT Thread : public webrtc::TaskQueueBase {
static std::unique_ptr<Thread> Create();
static Thread* Current();
// Used to catch performance regressions. Use this to disallow blocking calls
// (Invoke) for a given scope. If a synchronous call is made while this is in
// Used to catch performance regressions. Use this to disallow BlockingCall
// for a given scope. If a synchronous call is made while this is in
// effect, an assert will be triggered.
// Note that this is a single threaded class.
class ScopedDisallowBlockingCalls {
@ -310,7 +311,7 @@ class RTC_LOCKABLE RTC_EXPORT Thread : public webrtc::TaskQueueBase {
bool SetName(absl::string_view name, const void* obj);
// Sets the expected processing time in ms. The thread will write
// log messages when Invoke() takes more time than this.
// log messages when Dispatch() takes more time than this.
// Default is 50 ms.
void SetDispatchWarningMs(int deadline);
@ -328,41 +329,34 @@ class RTC_LOCKABLE RTC_EXPORT Thread : public webrtc::TaskQueueBase {
// ProcessMessages occasionally.
virtual void Run();
virtual void Send(const Location& posted_from,
MessageHandler* phandler,
uint32_t id = 0,
MessageData* pdata = nullptr);
// Convenience method to invoke a functor on another thread. Caller must
// provide the `ReturnT` template argument, which cannot (easily) be deduced.
// Uses Send() internally, which blocks the current thread until execution
// is complete.
// Ex: bool result = thread.Invoke<bool>(RTC_FROM_HERE,
// &MyFunctionReturningBool);
// Convenience method to invoke a functor on another thread.
// Blocks the current thread until execution is complete.
// Ex: thread.BlockingCall([&] { result = MyFunctionReturningBool(); });
// NOTE: This function can only be called when synchronous calls are allowed.
// See ScopedDisallowBlockingCalls for details.
// NOTE: Blocking invokes are DISCOURAGED, consider if what you're doing can
// NOTE: Blocking calls are DISCOURAGED, consider if what you're doing can
// be achieved with PostTask() and callbacks instead.
template <
class ReturnT,
typename = typename std::enable_if<!std::is_void<ReturnT>::value>::type>
ReturnT Invoke(const Location& posted_from, FunctionView<ReturnT()> functor) {
virtual void BlockingCall(FunctionView<void()> functor);
template <typename Functor,
typename ReturnT = std::invoke_result_t<Functor>,
typename = typename std::enable_if_t<!std::is_void_v<ReturnT>>>
ReturnT BlockingCall(Functor&& functor) {
ReturnT result;
InvokeInternal(posted_from, [functor, &result] { result = functor(); });
BlockingCall([&] { result = std::forward<Functor>(functor)(); });
return result;
}
template <
class ReturnT,
typename = typename std::enable_if<std::is_void<ReturnT>::value>::type>
void Invoke(const Location& posted_from, FunctionView<void()> functor) {
InvokeInternal(posted_from, functor);
// Deprecated, use `BlockingCall` instead.
template <typename ReturnT>
ReturnT Invoke(const Location& posted_from, FunctionView<ReturnT()> functor) {
return BlockingCall(functor);
}
// Allows invoke to specified `thread`. Thread never will be dereferenced and
// will be used only for reference-based comparison, so instance can be safely
// deleted. If NDEBUG is defined and RTC_DCHECK_IS_ON is undefined do
// nothing.
// Allows BlockingCall to specified `thread`. Thread never will be
// dereferenced and will be used only for reference-based comparison, so
// instance can be safely deleted. If NDEBUG is defined and RTC_DCHECK_IS_ON
// is undefined do nothing.
void AllowInvokesToThread(Thread* thread);
// If NDEBUG is defined and RTC_DCHECK_IS_ON is undefined do nothing.
@ -503,12 +497,13 @@ class RTC_LOCKABLE RTC_EXPORT Thread : public webrtc::TaskQueueBase {
private:
static const int kSlowDispatchLoggingThreshold = 50; // 50 ms
// TODO(bugs.webrtc.org/9702): Delete when chromium stops overriding it.
// chromium's ThreadWrapper overrides it just to check it is never called.
virtual bool Peek(Message* pmsg, int cms_wait) {
RTC_DCHECK_NOTREACHED();
return false;
}
// TODO(bugs.webrtc.org/9702): Delete and move Send's implementation into
// `BlockingCall` when derived classes override `BlockingCall` instead.
virtual void Send(const Location& posted_from,
MessageHandler* phandler,
uint32_t id,
MessageData* pdata);
// Get() will process I/O until:
// 1) A message is available (returns true)
// 2) cmsWait seconds have elapsed (returns false)
@ -539,9 +534,6 @@ class RTC_LOCKABLE RTC_EXPORT Thread : public webrtc::TaskQueueBase {
// Return true if the thread is currently running.
bool IsRunning();
void InvokeInternal(const Location& posted_from,
rtc::FunctionView<void()> functor);
// Called by the ThreadManager when being set as the current thread.
void EnsureIsCurrentTaskQueue();

View file

@ -143,77 +143,6 @@ class SignalWhenDestroyedThread : public Thread {
Event* event_;
};
// A bool wrapped in a mutex, to avoid data races. Using a volatile
// bool should be sufficient for correct code ("eventual consistency"
// between caches is sufficient), but we can't tell the compiler about
// that, and then tsan complains about a data race.
// See also discussion at
// http://stackoverflow.com/questions/7223164/is-mutex-needed-to-synchronize-a-simple-flag-between-pthreads
// Using std::atomic<bool> or std::atomic_flag in C++11 is probably
// the right thing to do, but those features are not yet allowed. Or
// rtc::AtomicInt, if/when that is added. Since the use isn't
// performance critical, use a plain critical section for the time
// being.
class AtomicBool {
public:
explicit AtomicBool(bool value = false) : flag_(value) {}
AtomicBool& operator=(bool value) {
webrtc::MutexLock scoped_lock(&mutex_);
flag_ = value;
return *this;
}
bool get() const {
webrtc::MutexLock scoped_lock(&mutex_);
return flag_;
}
private:
mutable webrtc::Mutex mutex_;
bool flag_;
};
// Function objects to test Thread::Invoke.
struct FunctorA {
int operator()() { return 42; }
};
class FunctorB {
public:
explicit FunctorB(AtomicBool* flag) : flag_(flag) {}
void operator()() {
if (flag_)
*flag_ = true;
}
private:
AtomicBool* flag_;
};
struct FunctorC {
int operator()() {
Thread::Current()->ProcessMessages(50);
return 24;
}
};
struct FunctorD {
public:
explicit FunctorD(AtomicBool* flag) : flag_(flag) {}
FunctorD(FunctorD&&) = default;
FunctorD(const FunctorD&) = delete;
FunctorD& operator=(const FunctorD&) = delete;
FunctorD& operator=(FunctorD&&) = default;
void operator()() {
if (flag_)
*flag_ = true;
}
private:
AtomicBool* flag_;
};
// See: https://code.google.com/p/webrtc/issues/detail?id=2409
TEST(ThreadTest, DISABLED_Main) {
rtc::AutoThread main_thread;
@ -276,9 +205,9 @@ TEST(ThreadTest, CountBlockingCalls) {
// Test invoking on the current thread. This should not count as an 'actual'
// invoke, but should still count as an invoke that could block since we
// that the call to Invoke serves a purpose in some configurations (and should
// not be used a general way to call methods on the same thread).
current.Invoke<void>(RTC_FROM_HERE, []() {});
// that the call to `BlockingCall` serves a purpose in some configurations
// (and should not be used a general way to call methods on the same thread).
current.BlockingCall([]() {});
EXPECT_EQ(0u, blocked_calls.GetBlockingCallCount());
EXPECT_EQ(1u, blocked_calls.GetCouldBeBlockingCallCount());
EXPECT_EQ(1u, blocked_calls.GetTotalBlockedCallCount());
@ -286,7 +215,7 @@ TEST(ThreadTest, CountBlockingCalls) {
// Create a new thread to invoke on.
auto thread = Thread::CreateWithSocketServer();
thread->Start();
EXPECT_EQ(42, thread->Invoke<int>(RTC_FROM_HERE, []() { return 42; }));
EXPECT_EQ(42, thread->BlockingCall([]() { return 42; }));
EXPECT_EQ(1u, blocked_calls.GetBlockingCallCount());
EXPECT_EQ(1u, blocked_calls.GetCouldBeBlockingCallCount());
EXPECT_EQ(2u, blocked_calls.GetTotalBlockedCallCount());
@ -307,7 +236,7 @@ TEST(ThreadTest, CountBlockingCallsOneCallback) {
[&](uint32_t actual_block, uint32_t could_block) {
was_called_back = true;
});
current.Invoke<void>(RTC_FROM_HERE, []() {});
current.BlockingCall([]() {});
}
EXPECT_TRUE(was_called_back);
}
@ -323,7 +252,7 @@ TEST(ThreadTest, CountBlockingCallsSkipCallback) {
// Changed `blocked_calls` to not issue the callback if there are 1 or
// fewer blocking calls (i.e. we set the minimum required number to 2).
blocked_calls.set_minimum_call_count_for_callback(2);
current.Invoke<void>(RTC_FROM_HERE, []() {});
current.BlockingCall([]() {});
}
// We should not have gotten a call back.
EXPECT_FALSE(was_called_back);
@ -421,23 +350,23 @@ TEST(ThreadTest, InvokesAllowedByDefault) {
main_thread.ProcessMessages(100);
}
TEST(ThreadTest, Invoke) {
TEST(ThreadTest, BlockingCall) {
// Create and start the thread.
auto thread = Thread::CreateWithSocketServer();
thread->Start();
// Try calling functors.
EXPECT_EQ(42, thread->Invoke<int>(RTC_FROM_HERE, FunctorA()));
AtomicBool called;
FunctorB f2(&called);
thread->Invoke<void>(RTC_FROM_HERE, f2);
EXPECT_TRUE(called.get());
EXPECT_EQ(42, thread->BlockingCall([] { return 42; }));
bool called = false;
thread->BlockingCall([&] { called = true; });
EXPECT_TRUE(called);
// Try calling bare functions.
struct LocalFuncs {
static int Func1() { return 999; }
static void Func2() {}
};
EXPECT_EQ(999, thread->Invoke<int>(RTC_FROM_HERE, &LocalFuncs::Func1));
thread->Invoke<void>(RTC_FROM_HERE, &LocalFuncs::Func2);
EXPECT_EQ(999, thread->BlockingCall(&LocalFuncs::Func1));
thread->BlockingCall(&LocalFuncs::Func2);
}
// Verifies that two threads calling Invoke on each other at the same time does
@ -449,8 +378,8 @@ TEST(ThreadTest, TwoThreadsInvokeDeathTest) {
Thread* main_thread = Thread::Current();
auto other_thread = Thread::CreateWithSocketServer();
other_thread->Start();
other_thread->Invoke<void>(RTC_FROM_HERE, [main_thread] {
RTC_EXPECT_DEATH(main_thread->Invoke<void>(RTC_FROM_HERE, [] {}), "loop");
other_thread->BlockingCall([main_thread] {
RTC_EXPECT_DEATH(main_thread->BlockingCall([] {}), "loop");
});
}
@ -464,10 +393,9 @@ TEST(ThreadTest, ThreeThreadsInvokeDeathTest) {
auto third = Thread::Create();
third->Start();
second->Invoke<void>(RTC_FROM_HERE, [&] {
third->Invoke<void>(RTC_FROM_HERE, [&] {
RTC_EXPECT_DEATH(first->Invoke<void>(RTC_FROM_HERE, [] {}), "loop");
});
second->BlockingCall([&] {
third->BlockingCall(
[&] { RTC_EXPECT_DEATH(first->BlockingCall([] {}), "loop"); });
});
}
@ -476,7 +404,7 @@ TEST(ThreadTest, ThreeThreadsInvokeDeathTest) {
// Verifies that if thread A invokes a call on thread B and thread C is trying
// to invoke A at the same time, thread A does not handle C's invoke while
// invoking B.
TEST(ThreadTest, ThreeThreadsInvoke) {
TEST(ThreadTest, ThreeThreadsBlockingCall) {
AutoThread thread;
Thread* thread_a = Thread::Current();
auto thread_b = Thread::CreateWithSocketServer();
@ -506,7 +434,7 @@ TEST(ThreadTest, ThreeThreadsInvoke) {
struct LocalFuncs {
static void Set(LockedBool* out) { out->Set(true); }
static void InvokeSet(Thread* thread, LockedBool* out) {
thread->Invoke<void>(RTC_FROM_HERE, [out] { Set(out); });
thread->BlockingCall([out] { Set(out); });
}
// Set `out` true and call InvokeSet on `thread`.
@ -538,8 +466,7 @@ TEST(ThreadTest, ThreeThreadsInvoke) {
// Thread B returns when C receives the call and C should be blocked until A
// starts to process messages.
Thread* thread_c_ptr = thread_c.get();
thread_b->Invoke<void>(RTC_FROM_HERE, [thread_c_ptr, thread_a,
&thread_a_called] {
thread_b->BlockingCall([thread_c_ptr, thread_a, &thread_a_called] {
LocalFuncs::AsyncInvokeSetAndWait(thread_c_ptr, thread_a, &thread_a_called);
});
EXPECT_FALSE(thread_a_called.Get());

View file

@ -61,25 +61,18 @@ void SimulatedThread::RunReady(Timestamp at_time) {
}
}
void SimulatedThread::Send(const rtc::Location& posted_from,
rtc::MessageHandler* phandler,
uint32_t id,
rtc::MessageData* pdata) {
void SimulatedThread::BlockingCall(rtc::FunctionView<void()> functor) {
if (IsQuitting())
return;
rtc::Message msg;
msg.posted_from = posted_from;
msg.phandler = phandler;
msg.message_id = id;
msg.pdata = pdata;
if (IsCurrent()) {
msg.phandler->OnMessage(&msg);
functor();
} else {
TaskQueueBase* yielding_from = TaskQueueBase::Current();
handler_->StartYield(yielding_from);
RunReady(Timestamp::MinusInfinity());
CurrentThreadSetter set_current(this);
msg.phandler->OnMessage(&msg);
functor();
handler_->StopYield(yielding_from);
}
}

View file

@ -36,10 +36,7 @@ class SimulatedThread : public rtc::Thread,
TaskQueueBase* GetAsTaskQueue() override { return this; }
// Thread interface
void Send(const rtc::Location& posted_from,
rtc::MessageHandler* phandler,
uint32_t id,
rtc::MessageData* pdata) override;
void BlockingCall(rtc::FunctionView<void()> functor) override;
void Post(const rtc::Location& posted_from,
rtc::MessageHandler* phandler,
uint32_t id,