Makes Thread::Send execute sent messages after pending posted messages.

Bug: webrtc:11255
Change-Id: I4b9036d22c9db3a5ec0e19fc5f2f5ac0d7e2289a
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/168058
Commit-Queue: Sebastian Jansson <srte@webrtc.org>
Reviewed-by: Karl Wiberg <kwiberg@webrtc.org>
Reviewed-by: Ali Tofigh <alito@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#30667}
This commit is contained in:
Sebastian Jansson 2020-03-03 10:48:05 +01:00 committed by Commit Bot
parent 3a087a839a
commit da7267a10f
4 changed files with 96 additions and 107 deletions

View file

@ -762,6 +762,7 @@ rtc_library("rtc_base") {
"network:sent_packet", "network:sent_packet",
"system:file_wrapper", "system:file_wrapper",
"system:rtc_export", "system:rtc_export",
"task_utils:to_queued_task",
"third_party/base64", "third_party/base64",
"third_party/sigslot", "third_party/sigslot",
"//third_party/abseil-cpp/absl/algorithm:container", "//third_party/abseil-cpp/absl/algorithm:container",
@ -1333,6 +1334,7 @@ if (rtc_include_tests) {
"../api/task_queue", "../api/task_queue",
"../api/task_queue:task_queue_test", "../api/task_queue:task_queue_test",
"../test:fileutils", "../test:fileutils",
"../test:rtc_expect_death",
"../test:test_main", "../test:test_main",
"../test:test_support", "../test:test_support",
"memory:fifo_buffer", "memory:fifo_buffer",

View file

@ -34,6 +34,7 @@
#include "rtc_base/critical_section.h" #include "rtc_base/critical_section.h"
#include "rtc_base/logging.h" #include "rtc_base/logging.h"
#include "rtc_base/null_socket_server.h" #include "rtc_base/null_socket_server.h"
#include "rtc_base/task_utils/to_queued_task.h"
#include "rtc_base/time_utils.h" #include "rtc_base/time_utils.h"
#include "rtc_base/trace_event.h" #include "rtc_base/trace_event.h"
@ -142,9 +143,44 @@ void ThreadManager::RemoveInternal(Thread* message_queue) {
if (iter != message_queues_.end()) { if (iter != message_queues_.end()) {
message_queues_.erase(iter); message_queues_.erase(iter);
} }
#if RTC_DCHECK_IS_ON
RemoveFromSendGraph(message_queue);
#endif
} }
} }
#if RTC_DCHECK_IS_ON
void ThreadManager::RemoveFromSendGraph(Thread* thread) {
for (auto it = send_graph_.begin(); it != send_graph_.end();) {
if (it->first == thread) {
it = send_graph_.erase(it);
} else {
it->second.erase(thread);
++it;
}
}
}
void ThreadManager::RegisterSendAndCheckForCycles(Thread* source,
Thread* target) {
CritScope cs(&crit_);
std::deque<Thread*> all_targets({target});
// We check the pre-existing who-sends-to-who graph for any path from target
// to source. This loop is guaranteed to terminate because per the send graph
// invariant, there are no cycles in the graph.
for (auto it = all_targets.begin(); it != all_targets.end(); ++it) {
const auto& targets = send_graph_[*it];
all_targets.insert(all_targets.end(), targets.begin(), targets.end());
}
RTC_CHECK_EQ(absl::c_count(all_targets, source), 0)
<< " send loop between " << source->name() << " and " << target->name();
// We may now insert source -> target without creating a cycle, since there
// was no path from target to source per the prior CHECK.
send_graph_[source].insert(target);
}
#endif
// static // static
void ThreadManager::Clear(MessageHandler* handler) { void ThreadManager::Clear(MessageHandler* handler) {
return Instance()->ClearInternal(handler); return Instance()->ClearInternal(handler);
@ -404,9 +440,6 @@ bool Thread::Get(Message* pmsg, int cmsWait, bool process_io) {
int64_t msStart = TimeMillis(); int64_t msStart = TimeMillis();
int64_t msCurrent = msStart; int64_t msCurrent = msStart;
while (true) { while (true) {
// Check for sent messages
ReceiveSendsFromThread(nullptr);
// Check for posted events // Check for posted events
int64_t cmsDelayNext = kForever; int64_t cmsDelayNext = kForever;
bool first_pass = true; bool first_pass = true;
@ -836,7 +869,7 @@ void Thread::Send(const Location& posted_from,
msg.message_id = id; msg.message_id = id;
msg.pdata = pdata; msg.pdata = pdata;
if (IsCurrent()) { if (IsCurrent()) {
phandler->OnMessage(&msg); msg.phandler->OnMessage(&msg);
return; return;
} }
@ -845,27 +878,23 @@ void Thread::Send(const Location& posted_from,
AutoThread thread; AutoThread thread;
Thread* current_thread = Thread::Current(); Thread* current_thread = Thread::Current();
RTC_DCHECK(current_thread != nullptr); // AutoThread ensures this RTC_DCHECK(current_thread != nullptr); // AutoThread ensures this
#if RTC_DCHECK_IS_ON
ThreadManager::Instance()->RegisterSendAndCheckForCycles(current_thread,
this);
#endif
bool ready = false; bool ready = false;
{ PostTask(
CritScope cs(&crit_); webrtc::ToQueuedTask([msg]() mutable { msg.phandler->OnMessage(&msg); },
_SendMessage smsg; [this, &ready, current_thread] {
smsg.thread = current_thread; CritScope cs(&crit_);
smsg.msg = msg; ready = true;
smsg.ready = &ready; current_thread->socketserver()->WakeUp();
sendlist_.push_back(smsg); }));
}
// Wait for a reply
WakeUpSocketServer();
bool waited = false; bool waited = false;
crit_.Enter(); crit_.Enter();
while (!ready) { while (!ready) {
crit_.Leave(); crit_.Leave();
// We need to limit "ReceiveSends" to |this| thread to avoid an arbitrary
// thread invoking calls on the current thread.
current_thread->ReceiveSendsFromThread(this);
current_thread->socketserver()->Wait(kForever, false); current_thread->socketserver()->Wait(kForever, false);
waited = true; waited = true;
crit_.Enter(); crit_.Enter();
@ -888,38 +917,6 @@ void Thread::Send(const Location& posted_from,
} }
} }
void Thread::ReceiveSendsFromThread(const Thread* source) {
// Receive a sent message. Cleanup scenarios:
// - thread sending exits: We don't allow this, since thread can exit
// only via Join, so Send must complete.
// - thread receiving exits: Wakeup/set ready in Thread::Clear()
// - object target cleared: Wakeup/set ready in Thread::Clear()
_SendMessage smsg;
crit_.Enter();
while (PopSendMessageFromThread(source, &smsg)) {
crit_.Leave();
Dispatch(&smsg.msg);
crit_.Enter();
*smsg.ready = true;
smsg.thread->socketserver()->WakeUp();
}
crit_.Leave();
}
bool Thread::PopSendMessageFromThread(const Thread* source, _SendMessage* msg) {
for (auto it = sendlist_.begin(); it != sendlist_.end(); ++it) {
if (it->thread == source || source == nullptr) {
*msg = *it;
sendlist_.erase(it);
return true;
}
}
return false;
}
void Thread::InvokeInternal(const Location& posted_from, void Thread::InvokeInternal(const Location& posted_from,
rtc::FunctionView<void()> functor) { rtc::FunctionView<void()> functor) {
TRACE_EVENT2("webrtc", "Thread::Invoke", "src_file", posted_from.file_name(), TRACE_EVENT2("webrtc", "Thread::Invoke", "src_file", posted_from.file_name(),
@ -981,26 +978,6 @@ void Thread::Clear(MessageHandler* phandler,
uint32_t id, uint32_t id,
MessageList* removed) { MessageList* removed) {
CritScope cs(&crit_); CritScope cs(&crit_);
// Remove messages on sendlist_ with phandler
// Object target cleared: remove from send list, wakeup/set ready
// if sender not null.
for (auto iter = sendlist_.begin(); iter != sendlist_.end();) {
_SendMessage smsg = *iter;
if (smsg.msg.Match(phandler, id)) {
if (removed) {
removed->push_back(smsg.msg);
} else {
delete smsg.msg.pdata;
}
iter = sendlist_.erase(iter);
*smsg.ready = true;
smsg.thread->socketserver()->WakeUp();
continue;
}
++iter;
}
ClearInternal(phandler, id, removed); ClearInternal(phandler, id, removed);
} }

View file

@ -14,8 +14,10 @@
#include <stdint.h> #include <stdint.h>
#include <list> #include <list>
#include <map>
#include <memory> #include <memory>
#include <queue> #include <queue>
#include <set>
#include <string> #include <string>
#include <type_traits> #include <type_traits>
#include <vector> #include <vector>
@ -112,6 +114,13 @@ class RTC_EXPORT ThreadManager {
bool IsMainThread(); bool IsMainThread();
#if RTC_DCHECK_IS_ON
// Registers that a Send operation is to be performed between |source| and
// |target|, while checking that this does not cause a send cycle that could
// potentially cause a deadlock.
void RegisterSendAndCheckForCycles(Thread* source, Thread* target);
#endif
private: private:
ThreadManager(); ThreadManager();
~ThreadManager(); ~ThreadManager();
@ -121,6 +130,9 @@ class RTC_EXPORT ThreadManager {
void RemoveInternal(Thread* message_queue); void RemoveInternal(Thread* message_queue);
void ClearInternal(MessageHandler* handler); void ClearInternal(MessageHandler* handler);
void ProcessAllMessageQueuesInternal(); void ProcessAllMessageQueuesInternal();
#if RTC_DCHECK_IS_ON
void RemoveFromSendGraph(Thread* thread) RTC_EXCLUSIVE_LOCKS_REQUIRED(crit_);
#endif
// This list contains all live Threads. // This list contains all live Threads.
std::vector<Thread*> message_queues_ RTC_GUARDED_BY(crit_); std::vector<Thread*> message_queues_ RTC_GUARDED_BY(crit_);
@ -130,6 +142,12 @@ class RTC_EXPORT ThreadManager {
// calls. // calls.
CriticalSection crit_; CriticalSection crit_;
size_t processing_ RTC_GUARDED_BY(crit_) = 0; size_t processing_ RTC_GUARDED_BY(crit_) = 0;
#if RTC_DCHECK_IS_ON
// Represents all thread seand actions by storing all send targets per thread.
// This is used by RegisterSendAndCheckForCycles. This graph has no cycles
// since we will trigger a CHECK failure if a cycle is introduced.
std::map<Thread*, std::set<Thread*>> send_graph_ RTC_GUARDED_BY(crit_);
#endif
#if defined(WEBRTC_POSIX) #if defined(WEBRTC_POSIX)
pthread_key_t key_; pthread_key_t key_;
@ -145,13 +163,6 @@ class RTC_EXPORT ThreadManager {
RTC_DISALLOW_COPY_AND_ASSIGN(ThreadManager); RTC_DISALLOW_COPY_AND_ASSIGN(ThreadManager);
}; };
struct _SendMessage {
_SendMessage() {}
Thread* thread;
Message msg;
bool* ready;
};
// WARNING! SUBCLASSES MUST CALL Stop() IN THEIR DESTRUCTORS! See ~Thread(). // WARNING! SUBCLASSES MUST CALL Stop() IN THEIR DESTRUCTORS! See ~Thread().
class RTC_LOCKABLE RTC_EXPORT Thread : public webrtc::TaskQueueBase { class RTC_LOCKABLE RTC_EXPORT Thread : public webrtc::TaskQueueBase {
@ -537,16 +548,6 @@ class RTC_LOCKABLE RTC_EXPORT Thread : public webrtc::TaskQueueBase {
// Return true if the thread is currently running. // Return true if the thread is currently running.
bool IsRunning(); bool IsRunning();
// Processes received "Send" requests. If |source| is not null, only requests
// from |source| are processed, otherwise, all requests are processed.
void ReceiveSendsFromThread(const Thread* source);
// If |source| is not null, pops the first "Send" message from |source| in
// |sendlist_|, otherwise, pops the first "Send" message of |sendlist_|.
// The caller must lock |crit_| before calling.
// Returns true if there is such a message.
bool PopSendMessageFromThread(const Thread* source, _SendMessage* msg);
void InvokeInternal(const Location& posted_from, void InvokeInternal(const Location& posted_from,
rtc::FunctionView<void()> functor); rtc::FunctionView<void()> functor);
@ -570,7 +571,6 @@ class RTC_LOCKABLE RTC_EXPORT Thread : public webrtc::TaskQueueBase {
// Used if SocketServer ownership lies with |this|. // Used if SocketServer ownership lies with |this|.
std::unique_ptr<SocketServer> own_ss_; std::unique_ptr<SocketServer> own_ss_;
std::list<_SendMessage> sendlist_;
std::string name_; std::string name_;
// TODO(tommi): Add thread checks for proper use of control methods. // TODO(tommi): Add thread checks for proper use of control methods.

View file

@ -24,6 +24,7 @@
#include "rtc_base/socket_address.h" #include "rtc_base/socket_address.h"
#include "rtc_base/task_utils/to_queued_task.h" #include "rtc_base/task_utils/to_queued_task.h"
#include "rtc_base/third_party/sigslot/sigslot.h" #include "rtc_base/third_party/sigslot/sigslot.h"
#include "test/testsupport/rtc_expect_death.h"
#if defined(WEBRTC_WIN) #if defined(WEBRTC_WIN)
#include <comdef.h> // NOLINT #include <comdef.h> // NOLINT
@ -307,29 +308,38 @@ TEST(ThreadTest, Invoke) {
} }
// Verifies that two threads calling Invoke on each other at the same time does // Verifies that two threads calling Invoke on each other at the same time does
// not deadlock. // not deadlock but crash.
TEST(ThreadTest, TwoThreadsInvokeNoDeadlock) { #if RTC_DCHECK_IS_ON && GTEST_HAS_DEATH_TEST && !defined(WEBRTC_ANDROID)
TEST(ThreadTest, TwoThreadsInvokeDeathTest) {
::testing::GTEST_FLAG(death_test_style) = "threadsafe";
AutoThread thread; AutoThread thread;
Thread* current_thread = Thread::Current(); Thread* main_thread = Thread::Current();
ASSERT_TRUE(current_thread != nullptr);
auto other_thread = Thread::CreateWithSocketServer(); auto other_thread = Thread::CreateWithSocketServer();
other_thread->Start(); other_thread->Start();
other_thread->Invoke<void>(RTC_FROM_HERE, [main_thread] {
struct LocalFuncs { RTC_EXPECT_DEATH(main_thread->Invoke<void>(RTC_FROM_HERE, [] {}), "loop");
static void Set(bool* out) { *out = true; } });
static void InvokeSet(Thread* thread, bool* out) {
thread->Invoke<void>(RTC_FROM_HERE, Bind(&Set, out));
}
};
bool called = false;
other_thread->Invoke<void>(
RTC_FROM_HERE, Bind(&LocalFuncs::InvokeSet, current_thread, &called));
EXPECT_TRUE(called);
} }
TEST(ThreadTest, ThreeThreadsInvokeDeathTest) {
::testing::GTEST_FLAG(death_test_style) = "threadsafe";
AutoThread thread;
Thread* first = Thread::Current();
auto second = Thread::Create();
second->Start();
auto third = Thread::Create();
third->Start();
second->Invoke<void>(RTC_FROM_HERE, [&] {
third->Invoke<void>(RTC_FROM_HERE, [&] {
RTC_EXPECT_DEATH(first->Invoke<void>(RTC_FROM_HERE, [] {}), "loop");
});
});
}
#endif
// Verifies that if thread A invokes a call on thread B and thread C is trying // Verifies that if thread A invokes a call on thread B and thread C is trying
// to invoke A at the same time, thread A does not handle C's invoke while // to invoke A at the same time, thread A does not handle C's invoke while
// invoking B. // invoking B.