Reland "Add rtc::Thread invoke policy."

This is a reland of 26d4f9cd39

Original change's description:
> Add rtc::Thread invoke policy.
> 
> Policy will allow explicitly specify thread between which invokes are
> allowed, or explicitly forbid any invokes.
> 
> Change-Id: I360e7cba3ce1c21abd5047c6f175d8c4e0e99c6f
> Bug: webrtc:11728
> Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/177526
> Reviewed-by: Tommi <tommi@webrtc.org>
> Commit-Queue: Artem Titov <titovartem@webrtc.org>
> Cr-Commit-Position: refs/heads/master@{#31604}

Bug: webrtc:11728
Change-Id: Id700b870d8c8dd6fa97380422e568dfb69de131f
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/178564
Commit-Queue: Artem Titov <titovartem@webrtc.org>
Reviewed-by: Tommi <tommi@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#31621}
This commit is contained in:
Artem Titov 2020-07-03 12:09:26 +02:00 committed by Commit Bot
parent c07e904a25
commit dfc5f0d19d
4 changed files with 124 additions and 0 deletions

View file

@ -10,6 +10,7 @@
#include "pc/peer_connection_factory.h"
#include <cstdio>
#include <memory>
#include <utility>
#include <vector>
@ -107,6 +108,10 @@ PeerConnectionFactory::PeerConnectionFactory(
wraps_current_thread_ = true;
}
}
signaling_thread_->AllowInvokesToThread(worker_thread_);
signaling_thread_->AllowInvokesToThread(network_thread_);
worker_thread_->AllowInvokesToThread(network_thread_);
network_thread_->DisallowAllInvokes();
}
PeerConnectionFactory::~PeerConnectionFactory() {

View file

@ -34,6 +34,7 @@
#include "rtc_base/critical_section.h"
#include "rtc_base/logging.h"
#include "rtc_base/null_socket_server.h"
#include "rtc_base/synchronization/sequence_checker.h"
#include "rtc_base/task_utils/to_queued_task.h"
#include "rtc_base/time_utils.h"
#include "rtc_base/trace_event.h"
@ -892,6 +893,7 @@ void Thread::Send(const Location& posted_from,
AutoThread thread;
Thread* current_thread = Thread::Current();
RTC_DCHECK(current_thread != nullptr); // AutoThread ensures this
RTC_DCHECK(current_thread->IsInvokeToThreadAllowed(this));
#if RTC_DCHECK_IS_ON
ThreadManager::Instance()->RegisterSendAndCheckForCycles(current_thread,
this);
@ -974,6 +976,50 @@ void Thread::QueuedTaskHandler::OnMessage(Message* msg) {
task.release();
}
void Thread::AllowInvokesToThread(Thread* thread) {
#if (!defined(NDEBUG) || defined(DCHECK_ALWAYS_ON))
if (!IsCurrent()) {
PostTask(webrtc::ToQueuedTask(
[thread, this]() { AllowInvokesToThread(thread); }));
return;
}
RTC_DCHECK_RUN_ON(this);
allowed_threads_.push_back(thread);
invoke_policy_enabled_ = true;
#endif
}
void Thread::DisallowAllInvokes() {
#if (!defined(NDEBUG) || defined(DCHECK_ALWAYS_ON))
if (!IsCurrent()) {
PostTask(webrtc::ToQueuedTask([this]() { DisallowAllInvokes(); }));
return;
}
RTC_DCHECK_RUN_ON(this);
allowed_threads_.clear();
invoke_policy_enabled_ = true;
#endif
}
// Returns true if no policies added or if there is at least one policy
// that permits invocation to |target| thread.
bool Thread::IsInvokeToThreadAllowed(rtc::Thread* target) {
#if (!defined(NDEBUG) || defined(DCHECK_ALWAYS_ON))
RTC_DCHECK_RUN_ON(this);
if (!invoke_policy_enabled_) {
return true;
}
for (const auto* thread : allowed_threads_) {
if (thread == target) {
return true;
}
}
return false;
#else
return true;
#endif
}
void Thread::PostTask(std::unique_ptr<webrtc::QueuedTask> task) {
// Though Post takes MessageData by raw pointer (last parameter), it still
// takes it with ownership.

View file

@ -338,6 +338,18 @@ class RTC_LOCKABLE RTC_EXPORT Thread : public webrtc::TaskQueueBase {
InvokeInternal(posted_from, functor);
}
// Allows invoke to specified |thread|. Thread never will be dereferenced and
// will be used only for reference-based comparison, so instance can be safely
// deleted. If NDEBUG is defined and DCHECK_ALWAYS_ON is undefined do nothing.
void AllowInvokesToThread(Thread* thread);
// If NDEBUG is defined and DCHECK_ALWAYS_ON is undefined do nothing.
void DisallowAllInvokes();
// Returns true if |target| was allowed by AllowInvokesToThread() or if no
// calls were made to AllowInvokesToThread and DisallowAllInvokes. Otherwise
// returns false.
// If NDEBUG is defined and DCHECK_ALWAYS_ON is undefined always returns true.
bool IsInvokeToThreadAllowed(rtc::Thread* target);
// Posts a task to invoke the functor on |this| thread asynchronously, i.e.
// without blocking the thread that invoked PostTask(). Ownership of |functor|
// is passed and (usually, see below) destroyed on |this| thread after it is
@ -566,6 +578,10 @@ class RTC_LOCKABLE RTC_EXPORT Thread : public webrtc::TaskQueueBase {
MessageList messages_ RTC_GUARDED_BY(crit_);
PriorityQueue delayed_messages_ RTC_GUARDED_BY(crit_);
uint32_t delayed_next_num_ RTC_GUARDED_BY(crit_);
#if (!defined(NDEBUG) || defined(DCHECK_ALWAYS_ON))
std::vector<Thread*> allowed_threads_ RTC_GUARDED_BY(this);
bool invoke_policy_enabled_ RTC_GUARDED_BY(this) = false;
#endif
CriticalSection crit_;
bool fInitialized_;
bool fDestroyed_;

View file

@ -288,6 +288,63 @@ TEST(ThreadTest, Wrap) {
ThreadManager::Instance()->SetCurrentThread(current_thread);
}
#if (!defined(NDEBUG) || defined(DCHECK_ALWAYS_ON))
TEST(ThreadTest, InvokeToThreadAllowedReturnsTrueWithoutPolicies) {
// Create and start the thread.
auto thread1 = Thread::CreateWithSocketServer();
auto thread2 = Thread::CreateWithSocketServer();
thread1->PostTask(ToQueuedTask(
[&]() { EXPECT_TRUE(thread1->IsInvokeToThreadAllowed(thread2.get())); }));
Thread* th_main = Thread::Current();
th_main->ProcessMessages(100);
}
TEST(ThreadTest, InvokeAllowedWhenThreadsAdded) {
// Create and start the thread.
auto thread1 = Thread::CreateWithSocketServer();
auto thread2 = Thread::CreateWithSocketServer();
auto thread3 = Thread::CreateWithSocketServer();
auto thread4 = Thread::CreateWithSocketServer();
thread1->AllowInvokesToThread(thread2.get());
thread1->AllowInvokesToThread(thread3.get());
thread1->PostTask(ToQueuedTask([&]() {
EXPECT_TRUE(thread1->IsInvokeToThreadAllowed(thread2.get()));
EXPECT_TRUE(thread1->IsInvokeToThreadAllowed(thread3.get()));
EXPECT_FALSE(thread1->IsInvokeToThreadAllowed(thread4.get()));
}));
Thread* th_main = Thread::Current();
th_main->ProcessMessages(100);
}
TEST(ThreadTest, InvokesDisallowedWhenDisallowAllInvokes) {
// Create and start the thread.
auto thread1 = Thread::CreateWithSocketServer();
auto thread2 = Thread::CreateWithSocketServer();
thread1->DisallowAllInvokes();
thread1->PostTask(ToQueuedTask([&]() {
EXPECT_FALSE(thread1->IsInvokeToThreadAllowed(thread2.get()));
}));
Thread* th_main = Thread::Current();
th_main->ProcessMessages(100);
}
#endif // (!defined(NDEBUG) || defined(DCHECK_ALWAYS_ON))
TEST(ThreadTest, InvokesAllowedByDefault) {
// Create and start the thread.
auto thread1 = Thread::CreateWithSocketServer();
auto thread2 = Thread::CreateWithSocketServer();
thread1->PostTask(ToQueuedTask(
[&]() { EXPECT_TRUE(thread1->IsInvokeToThreadAllowed(thread2.get())); }));
Thread* th_main = Thread::Current();
th_main->ProcessMessages(100);
}
TEST(ThreadTest, Invoke) {
// Create and start the thread.
auto thread = Thread::CreateWithSocketServer();