diff --git a/rtc_base/BUILD.gn b/rtc_base/BUILD.gn index d23cf13c47..d8eb6b5013 100644 --- a/rtc_base/BUILD.gn +++ b/rtc_base/BUILD.gn @@ -762,6 +762,7 @@ rtc_library("rtc_base") { "network:sent_packet", "system:file_wrapper", "system:rtc_export", + "task_utils:to_queued_task", "third_party/base64", "third_party/sigslot", "//third_party/abseil-cpp/absl/algorithm:container", @@ -1333,6 +1334,7 @@ if (rtc_include_tests) { "../api/task_queue", "../api/task_queue:task_queue_test", "../test:fileutils", + "../test:rtc_expect_death", "../test:test_main", "../test:test_support", "memory:fifo_buffer", diff --git a/rtc_base/thread.cc b/rtc_base/thread.cc index 00a582cc06..0fb2e813e0 100644 --- a/rtc_base/thread.cc +++ b/rtc_base/thread.cc @@ -34,6 +34,7 @@ #include "rtc_base/critical_section.h" #include "rtc_base/logging.h" #include "rtc_base/null_socket_server.h" +#include "rtc_base/task_utils/to_queued_task.h" #include "rtc_base/time_utils.h" #include "rtc_base/trace_event.h" @@ -142,9 +143,44 @@ void ThreadManager::RemoveInternal(Thread* message_queue) { if (iter != message_queues_.end()) { message_queues_.erase(iter); } +#if RTC_DCHECK_IS_ON + RemoveFromSendGraph(message_queue); +#endif } } +#if RTC_DCHECK_IS_ON +void ThreadManager::RemoveFromSendGraph(Thread* thread) { + for (auto it = send_graph_.begin(); it != send_graph_.end();) { + if (it->first == thread) { + it = send_graph_.erase(it); + } else { + it->second.erase(thread); + ++it; + } + } +} + +void ThreadManager::RegisterSendAndCheckForCycles(Thread* source, + Thread* target) { + CritScope cs(&crit_); + std::deque all_targets({target}); + // We check the pre-existing who-sends-to-who graph for any path from target + // to source. This loop is guaranteed to terminate because per the send graph + // invariant, there are no cycles in the graph. + for (auto it = all_targets.begin(); it != all_targets.end(); ++it) { + const auto& targets = send_graph_[*it]; + all_targets.insert(all_targets.end(), targets.begin(), targets.end()); + } + RTC_CHECK_EQ(absl::c_count(all_targets, source), 0) + << " send loop between " << source->name() << " and " << target->name(); + + // We may now insert source -> target without creating a cycle, since there + // was no path from target to source per the prior CHECK. + send_graph_[source].insert(target); +} +#endif + // static void ThreadManager::Clear(MessageHandler* handler) { return Instance()->ClearInternal(handler); @@ -404,9 +440,6 @@ bool Thread::Get(Message* pmsg, int cmsWait, bool process_io) { int64_t msStart = TimeMillis(); int64_t msCurrent = msStart; while (true) { - // Check for sent messages - ReceiveSendsFromThread(nullptr); - // Check for posted events int64_t cmsDelayNext = kForever; bool first_pass = true; @@ -836,7 +869,7 @@ void Thread::Send(const Location& posted_from, msg.message_id = id; msg.pdata = pdata; if (IsCurrent()) { - phandler->OnMessage(&msg); + msg.phandler->OnMessage(&msg); return; } @@ -845,27 +878,23 @@ void Thread::Send(const Location& posted_from, AutoThread thread; Thread* current_thread = Thread::Current(); RTC_DCHECK(current_thread != nullptr); // AutoThread ensures this - +#if RTC_DCHECK_IS_ON + ThreadManager::Instance()->RegisterSendAndCheckForCycles(current_thread, + this); +#endif bool ready = false; - { - CritScope cs(&crit_); - _SendMessage smsg; - smsg.thread = current_thread; - smsg.msg = msg; - smsg.ready = &ready; - sendlist_.push_back(smsg); - } - - // Wait for a reply - WakeUpSocketServer(); + PostTask( + webrtc::ToQueuedTask([msg]() mutable { msg.phandler->OnMessage(&msg); }, + [this, &ready, current_thread] { + CritScope cs(&crit_); + ready = true; + current_thread->socketserver()->WakeUp(); + })); bool waited = false; crit_.Enter(); while (!ready) { crit_.Leave(); - // We need to limit "ReceiveSends" to |this| thread to avoid an arbitrary - // thread invoking calls on the current thread. - current_thread->ReceiveSendsFromThread(this); current_thread->socketserver()->Wait(kForever, false); waited = true; crit_.Enter(); @@ -888,38 +917,6 @@ void Thread::Send(const Location& posted_from, } } -void Thread::ReceiveSendsFromThread(const Thread* source) { - // Receive a sent message. Cleanup scenarios: - // - thread sending exits: We don't allow this, since thread can exit - // only via Join, so Send must complete. - // - thread receiving exits: Wakeup/set ready in Thread::Clear() - // - object target cleared: Wakeup/set ready in Thread::Clear() - _SendMessage smsg; - - crit_.Enter(); - while (PopSendMessageFromThread(source, &smsg)) { - crit_.Leave(); - - Dispatch(&smsg.msg); - - crit_.Enter(); - *smsg.ready = true; - smsg.thread->socketserver()->WakeUp(); - } - crit_.Leave(); -} - -bool Thread::PopSendMessageFromThread(const Thread* source, _SendMessage* msg) { - for (auto it = sendlist_.begin(); it != sendlist_.end(); ++it) { - if (it->thread == source || source == nullptr) { - *msg = *it; - sendlist_.erase(it); - return true; - } - } - return false; -} - void Thread::InvokeInternal(const Location& posted_from, rtc::FunctionView functor) { TRACE_EVENT2("webrtc", "Thread::Invoke", "src_file", posted_from.file_name(), @@ -981,26 +978,6 @@ void Thread::Clear(MessageHandler* phandler, uint32_t id, MessageList* removed) { CritScope cs(&crit_); - - // Remove messages on sendlist_ with phandler - // Object target cleared: remove from send list, wakeup/set ready - // if sender not null. - for (auto iter = sendlist_.begin(); iter != sendlist_.end();) { - _SendMessage smsg = *iter; - if (smsg.msg.Match(phandler, id)) { - if (removed) { - removed->push_back(smsg.msg); - } else { - delete smsg.msg.pdata; - } - iter = sendlist_.erase(iter); - *smsg.ready = true; - smsg.thread->socketserver()->WakeUp(); - continue; - } - ++iter; - } - ClearInternal(phandler, id, removed); } diff --git a/rtc_base/thread.h b/rtc_base/thread.h index d08c3bd09c..74aab623c8 100644 --- a/rtc_base/thread.h +++ b/rtc_base/thread.h @@ -14,8 +14,10 @@ #include #include +#include #include #include +#include #include #include #include @@ -112,6 +114,13 @@ class RTC_EXPORT ThreadManager { bool IsMainThread(); +#if RTC_DCHECK_IS_ON + // Registers that a Send operation is to be performed between |source| and + // |target|, while checking that this does not cause a send cycle that could + // potentially cause a deadlock. + void RegisterSendAndCheckForCycles(Thread* source, Thread* target); +#endif + private: ThreadManager(); ~ThreadManager(); @@ -121,6 +130,9 @@ class RTC_EXPORT ThreadManager { void RemoveInternal(Thread* message_queue); void ClearInternal(MessageHandler* handler); void ProcessAllMessageQueuesInternal(); +#if RTC_DCHECK_IS_ON + void RemoveFromSendGraph(Thread* thread) RTC_EXCLUSIVE_LOCKS_REQUIRED(crit_); +#endif // This list contains all live Threads. std::vector message_queues_ RTC_GUARDED_BY(crit_); @@ -130,6 +142,12 @@ class RTC_EXPORT ThreadManager { // calls. CriticalSection crit_; size_t processing_ RTC_GUARDED_BY(crit_) = 0; +#if RTC_DCHECK_IS_ON + // Represents all thread seand actions by storing all send targets per thread. + // This is used by RegisterSendAndCheckForCycles. This graph has no cycles + // since we will trigger a CHECK failure if a cycle is introduced. + std::map> send_graph_ RTC_GUARDED_BY(crit_); +#endif #if defined(WEBRTC_POSIX) pthread_key_t key_; @@ -145,13 +163,6 @@ class RTC_EXPORT ThreadManager { RTC_DISALLOW_COPY_AND_ASSIGN(ThreadManager); }; -struct _SendMessage { - _SendMessage() {} - Thread* thread; - Message msg; - bool* ready; -}; - // WARNING! SUBCLASSES MUST CALL Stop() IN THEIR DESTRUCTORS! See ~Thread(). class RTC_LOCKABLE RTC_EXPORT Thread : public webrtc::TaskQueueBase { @@ -537,16 +548,6 @@ class RTC_LOCKABLE RTC_EXPORT Thread : public webrtc::TaskQueueBase { // Return true if the thread is currently running. bool IsRunning(); - // Processes received "Send" requests. If |source| is not null, only requests - // from |source| are processed, otherwise, all requests are processed. - void ReceiveSendsFromThread(const Thread* source); - - // If |source| is not null, pops the first "Send" message from |source| in - // |sendlist_|, otherwise, pops the first "Send" message of |sendlist_|. - // The caller must lock |crit_| before calling. - // Returns true if there is such a message. - bool PopSendMessageFromThread(const Thread* source, _SendMessage* msg); - void InvokeInternal(const Location& posted_from, rtc::FunctionView functor); @@ -570,7 +571,6 @@ class RTC_LOCKABLE RTC_EXPORT Thread : public webrtc::TaskQueueBase { // Used if SocketServer ownership lies with |this|. std::unique_ptr own_ss_; - std::list<_SendMessage> sendlist_; std::string name_; // TODO(tommi): Add thread checks for proper use of control methods. diff --git a/rtc_base/thread_unittest.cc b/rtc_base/thread_unittest.cc index fb54bb557d..91bea4f9b3 100644 --- a/rtc_base/thread_unittest.cc +++ b/rtc_base/thread_unittest.cc @@ -24,6 +24,7 @@ #include "rtc_base/socket_address.h" #include "rtc_base/task_utils/to_queued_task.h" #include "rtc_base/third_party/sigslot/sigslot.h" +#include "test/testsupport/rtc_expect_death.h" #if defined(WEBRTC_WIN) #include // NOLINT @@ -307,29 +308,38 @@ TEST(ThreadTest, Invoke) { } // Verifies that two threads calling Invoke on each other at the same time does -// not deadlock. -TEST(ThreadTest, TwoThreadsInvokeNoDeadlock) { +// not deadlock but crash. +#if RTC_DCHECK_IS_ON && GTEST_HAS_DEATH_TEST && !defined(WEBRTC_ANDROID) +TEST(ThreadTest, TwoThreadsInvokeDeathTest) { + ::testing::GTEST_FLAG(death_test_style) = "threadsafe"; AutoThread thread; - Thread* current_thread = Thread::Current(); - ASSERT_TRUE(current_thread != nullptr); - + Thread* main_thread = Thread::Current(); auto other_thread = Thread::CreateWithSocketServer(); other_thread->Start(); - - struct LocalFuncs { - static void Set(bool* out) { *out = true; } - static void InvokeSet(Thread* thread, bool* out) { - thread->Invoke(RTC_FROM_HERE, Bind(&Set, out)); - } - }; - - bool called = false; - other_thread->Invoke( - RTC_FROM_HERE, Bind(&LocalFuncs::InvokeSet, current_thread, &called)); - - EXPECT_TRUE(called); + other_thread->Invoke(RTC_FROM_HERE, [main_thread] { + RTC_EXPECT_DEATH(main_thread->Invoke(RTC_FROM_HERE, [] {}), "loop"); + }); } +TEST(ThreadTest, ThreeThreadsInvokeDeathTest) { + ::testing::GTEST_FLAG(death_test_style) = "threadsafe"; + AutoThread thread; + Thread* first = Thread::Current(); + + auto second = Thread::Create(); + second->Start(); + auto third = Thread::Create(); + third->Start(); + + second->Invoke(RTC_FROM_HERE, [&] { + third->Invoke(RTC_FROM_HERE, [&] { + RTC_EXPECT_DEATH(first->Invoke(RTC_FROM_HERE, [] {}), "loop"); + }); + }); +} + +#endif + // Verifies that if thread A invokes a call on thread B and thread C is trying // to invoke A at the same time, thread A does not handle C's invoke while // invoking B.