mirror of
https://github.com/mollyim/webrtc.git
synced 2025-05-12 13:20:44 +01:00

Bug: webrtc:14366 Change-Id: I949c1d26f030696b18153afef977633c9a5bd4cf Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/272003 Reviewed-by: Danil Chapovalov <danilchap@webrtc.org> Reviewed-by: Tomas Gunnarsson <tommi@webrtc.org> Commit-Queue: Markus Handell <handellm@webrtc.org> Cr-Commit-Position: refs/heads/main@{#37835}
502 lines
19 KiB
C++
502 lines
19 KiB
C++
/*
|
|
* Copyright 2019 The WebRTC Project Authors. All rights reserved.
|
|
*
|
|
* Use of this source code is governed by a BSD-style license
|
|
* that can be found in the LICENSE file in the root of the source
|
|
* tree. An additional intellectual property rights grant can be found
|
|
* in the file PATENTS. All contributing project authors may
|
|
* be found in the AUTHORS file in the root of the source tree.
|
|
*/
|
|
|
|
#include "rtc_base/operations_chain.h"
|
|
|
|
#include <atomic>
|
|
#include <functional>
|
|
#include <memory>
|
|
#include <utility>
|
|
#include <vector>
|
|
|
|
#include "rtc_base/event.h"
|
|
#include "rtc_base/gunit.h"
|
|
#include "rtc_base/thread.h"
|
|
#include "test/gmock.h"
|
|
#include "test/gtest.h"
|
|
|
|
namespace rtc {
|
|
|
|
using ::testing::ElementsAre;
|
|
|
|
namespace {

// Upper bound handed to EXPECT_TRUE_WAIT() by the tests in this file.
// NOTE(review): presumably milliseconds - confirm against rtc_base/gunit.h.
constexpr int kDefaultTimeout{3000};

}  // namespace
|
|
|
|
class OperationTracker {
|
|
public:
|
|
OperationTracker() : background_thread_(Thread::Create()) {
|
|
background_thread_->Start();
|
|
}
|
|
// The caller is responsible for ensuring that no operations are pending.
|
|
~OperationTracker() {}
|
|
|
|
// Creates a binding for the synchronous operation (see
|
|
// StartSynchronousOperation() below).
|
|
std::function<void(std::function<void()>)> BindSynchronousOperation(
|
|
Event* operation_complete_event) {
|
|
return [this, operation_complete_event](std::function<void()> callback) {
|
|
StartSynchronousOperation(operation_complete_event, std::move(callback));
|
|
};
|
|
}
|
|
|
|
// Creates a binding for the asynchronous operation (see
|
|
// StartAsynchronousOperation() below).
|
|
std::function<void(std::function<void()>)> BindAsynchronousOperation(
|
|
Event* unblock_operation_event,
|
|
Event* operation_complete_event) {
|
|
return [this, unblock_operation_event,
|
|
operation_complete_event](std::function<void()> callback) {
|
|
StartAsynchronousOperation(unblock_operation_event,
|
|
operation_complete_event, std::move(callback));
|
|
};
|
|
}
|
|
|
|
// When an operation is completed, its associated Event* is added to this
|
|
// list, in chronological order. This allows you to verify the order that
|
|
// operations are executed.
|
|
const std::vector<Event*>& completed_operation_events() const {
|
|
return completed_operation_events_;
|
|
}
|
|
|
|
private:
|
|
// This operation is completed synchronously; the callback is invoked before
|
|
// the function returns.
|
|
void StartSynchronousOperation(Event* operation_complete_event,
|
|
std::function<void()> callback) {
|
|
completed_operation_events_.push_back(operation_complete_event);
|
|
operation_complete_event->Set();
|
|
callback();
|
|
}
|
|
|
|
// This operation is completed asynchronously; it pings `background_thread_`,
|
|
// blocking that thread until `unblock_operation_event` is signaled and then
|
|
// completes upon posting back to the thread that the operation started on.
|
|
// Note that this requires the starting thread to be executing tasks (handle
|
|
// messages), i.e. must not be blocked.
|
|
void StartAsynchronousOperation(Event* unblock_operation_event,
|
|
Event* operation_complete_event,
|
|
std::function<void()> callback) {
|
|
Thread* current_thread = Thread::Current();
|
|
background_thread_->PostTask([this, current_thread, unblock_operation_event,
|
|
operation_complete_event, callback]() {
|
|
unblock_operation_event->Wait(Event::kForever);
|
|
current_thread->PostTask([this, operation_complete_event, callback]() {
|
|
completed_operation_events_.push_back(operation_complete_event);
|
|
operation_complete_event->Set();
|
|
callback();
|
|
});
|
|
});
|
|
}
|
|
|
|
std::unique_ptr<Thread> background_thread_;
|
|
std::vector<Event*> completed_operation_events_;
|
|
};
|
|
|
|
// The OperationTrackerProxy ensures all operations are chained on a separate
|
|
// thread. This allows tests to block while chained operations are posting
|
|
// between threads.
|
|
class OperationTrackerProxy {
|
|
public:
|
|
OperationTrackerProxy()
|
|
: operations_chain_thread_(Thread::Create()),
|
|
operation_tracker_(nullptr),
|
|
operations_chain_(nullptr) {
|
|
operations_chain_thread_->Start();
|
|
}
|
|
|
|
std::unique_ptr<Event> Initialize() {
|
|
std::unique_ptr<Event> event = std::make_unique<Event>();
|
|
operations_chain_thread_->PostTask([this, event_ptr = event.get()]() {
|
|
operation_tracker_ = std::make_unique<OperationTracker>();
|
|
operations_chain_ = OperationsChain::Create();
|
|
event_ptr->Set();
|
|
});
|
|
return event;
|
|
}
|
|
|
|
void SetOnChainEmptyCallback(std::function<void()> on_chain_empty_callback) {
|
|
Event event;
|
|
operations_chain_thread_->PostTask(
|
|
[this, &event,
|
|
on_chain_empty_callback = std::move(on_chain_empty_callback)]() {
|
|
operations_chain_->SetOnChainEmptyCallback(
|
|
std::move(on_chain_empty_callback));
|
|
event.Set();
|
|
});
|
|
event.Wait(Event::kForever);
|
|
}
|
|
|
|
bool IsEmpty() {
|
|
Event event;
|
|
bool is_empty = false;
|
|
operations_chain_thread_->PostTask([this, &event, &is_empty]() {
|
|
is_empty = operations_chain_->IsEmpty();
|
|
event.Set();
|
|
});
|
|
event.Wait(Event::kForever);
|
|
return is_empty;
|
|
}
|
|
|
|
std::unique_ptr<Event> ReleaseOperationChain() {
|
|
std::unique_ptr<Event> event = std::make_unique<Event>();
|
|
operations_chain_thread_->PostTask([this, event_ptr = event.get()]() {
|
|
operations_chain_ = nullptr;
|
|
event_ptr->Set();
|
|
});
|
|
return event;
|
|
}
|
|
|
|
// Chains a synchronous operation on the operation chain's thread.
|
|
std::unique_ptr<Event> PostSynchronousOperation() {
|
|
std::unique_ptr<Event> operation_complete_event = std::make_unique<Event>();
|
|
operations_chain_thread_->PostTask(
|
|
[this,
|
|
operation_complete_event_ptr = operation_complete_event.get()]() {
|
|
operations_chain_->ChainOperation(
|
|
operation_tracker_->BindSynchronousOperation(
|
|
operation_complete_event_ptr));
|
|
});
|
|
return operation_complete_event;
|
|
}
|
|
|
|
// Chains an asynchronous operation on the operation chain's thread. This
|
|
// involves the operation chain thread and an additional background thread.
|
|
std::unique_ptr<Event> PostAsynchronousOperation(
|
|
Event* unblock_operation_event) {
|
|
std::unique_ptr<Event> operation_complete_event = std::make_unique<Event>();
|
|
operations_chain_thread_->PostTask(
|
|
[this, unblock_operation_event,
|
|
operation_complete_event_ptr = operation_complete_event.get()]() {
|
|
operations_chain_->ChainOperation(
|
|
operation_tracker_->BindAsynchronousOperation(
|
|
unblock_operation_event, operation_complete_event_ptr));
|
|
});
|
|
return operation_complete_event;
|
|
}
|
|
|
|
// The order of completed events. Touches the `operation_tracker_` on the
|
|
// calling thread, this is only thread safe if all chained operations have
|
|
// completed.
|
|
const std::vector<Event*>& completed_operation_events() const {
|
|
return operation_tracker_->completed_operation_events();
|
|
}
|
|
|
|
private:
|
|
std::unique_ptr<Thread> operations_chain_thread_;
|
|
std::unique_ptr<OperationTracker> operation_tracker_;
|
|
scoped_refptr<OperationsChain> operations_chain_;
|
|
};
|
|
|
|
// On destruction, sets a boolean flag to true.
|
|
// Move-only test helper that writes `true` through a caller-supplied flag
// pointer when it is destroyed. An object that has been moved from holds a
// null pointer and stays silent on destruction, so exactly one object in a
// chain of moves raises the flag.
class SignalOnDestruction final {
 public:
  // `destructor_called` is the flag to raise on destruction; it is DCHECKed to
  // be non-null and must outlive this object.
  explicit SignalOnDestruction(bool* destructor_called)
      : destructor_called_(destructor_called) {
    RTC_DCHECK(destructor_called_);
  }
  ~SignalOnDestruction() {
    // Moved objects will have `destructor_called_` set to null. Destroying a
    // moved SignalOnDestruction should not signal.
    if (destructor_called_) {
      *destructor_called_ = true;
    }
  }

  SignalOnDestruction(const SignalOnDestruction&) = delete;
  SignalOnDestruction& operator=(const SignalOnDestruction&) = delete;

  // Move operators.
  // Takes over `other`'s flag pointer and leaves `other` silent.
  SignalOnDestruction(SignalOnDestruction&& other) noexcept
      : SignalOnDestruction(other.destructor_called_) {
    other.destructor_called_ = nullptr;
  }
  // Takes over `other`'s flag pointer and leaves `other` silent. The flag
  // previously held by this object is dropped without being raised.
  SignalOnDestruction& operator=(SignalOnDestruction&& other) noexcept {
    // std::exchange reads the source before nulling it, so a self-move keeps
    // the pointer instead of silently disarming the object.
    destructor_called_ = std::exchange(other.destructor_called_, nullptr);
    return *this;
  }

 private:
  bool* destructor_called_;
};
|
|
|
|
// A single synchronous operation chained on an idle chain runs to completion.
TEST(OperationsChainTest, SynchronousOperation) {
  OperationTrackerProxy proxy;
  proxy.Initialize()->Wait(Event::kForever);

  // Block until the chain thread has executed the operation.
  proxy.PostSynchronousOperation()->Wait(Event::kForever);
}
|
|
|
|
// An asynchronous operation stays pending until it is unblocked, then
// completes.
TEST(OperationsChainTest, AsynchronousOperation) {
  OperationTrackerProxy proxy;
  proxy.Initialize()->Wait(Event::kForever);

  Event unblock;
  std::unique_ptr<Event> completed = proxy.PostAsynchronousOperation(&unblock);
  // Nothing has unblocked the operation yet, so a non-blocking poll of the
  // completion event must come back unsignaled.
  EXPECT_FALSE(completed->Wait(webrtc::TimeDelta::Zero()));
  // Let the operation proceed and block until it has finished.
  unblock.Set();
  completed->Wait(Event::kForever);
}
|
|
|
|
TEST(OperationsChainTest,
     SynchronousOperationsAreExecutedImmediatelyWhenChainIsEmpty) {
  // Synchronicity can only be verified without the OperationTrackerProxy,
  // since the proxy would let messages be processed in parallel. Everything in
  // this test happens on the calling thread.
  scoped_refptr<OperationsChain> chain = OperationsChain::Create();
  OperationTracker tracker;

  Event first_done;
  chain->ChainOperation(tracker.BindSynchronousOperation(&first_done));
  // The operation must have run inside ChainOperation(). Waiting longer could
  // not help anyway: this is the only thread, and we would be blocking it.
  EXPECT_TRUE(first_done.Wait(webrtc::TimeDelta::Zero()));

  // The chain is empty again, so the next operation is expected to run
  // immediately as well.
  Event second_done;
  chain->ChainOperation(tracker.BindSynchronousOperation(&second_done));
  EXPECT_TRUE(second_done.Wait(webrtc::TimeDelta::Zero()));
}
|
|
|
|
// A synchronous operation queued behind a pending asynchronous one must not
// finish before it.
TEST(OperationsChainTest, AsynchronousOperationBlocksSynchronousOperation) {
  OperationTrackerProxy proxy;
  proxy.Initialize()->Wait(Event::kForever);

  Event unblock_async;
  std::unique_ptr<Event> async_done =
      proxy.PostAsynchronousOperation(&unblock_async);

  std::unique_ptr<Event> sync_done = proxy.PostSynchronousOperation();

  unblock_async.Set();

  sync_done->Wait(Event::kForever);
  // The synchronous operation had to wait for the asynchronous one, so by the
  // time it has completed the asynchronous completion event is already set.
  EXPECT_TRUE(async_done->Wait(webrtc::TimeDelta::Zero()));
}
|
|
|
|
// Operations complete in the order they were chained, regardless of the order
// in which the asynchronous ones are unblocked.
TEST(OperationsChainTest, OperationsAreExecutedInOrder) {
  OperationTrackerProxy proxy;
  proxy.Initialize()->Wait(Event::kForever);

  // Operations 0, 1, 4 and 6 are asynchronous; 2, 3 and 5 are synchronous.
  Event unblock0;
  std::unique_ptr<Event> done0 = proxy.PostAsynchronousOperation(&unblock0);

  Event unblock1;
  std::unique_ptr<Event> done1 = proxy.PostAsynchronousOperation(&unblock1);

  std::unique_ptr<Event> done2 = proxy.PostSynchronousOperation();

  std::unique_ptr<Event> done3 = proxy.PostSynchronousOperation();

  Event unblock4;
  std::unique_ptr<Event> done4 = proxy.PostAsynchronousOperation(&unblock4);

  std::unique_ptr<Event> done5 = proxy.PostSynchronousOperation();

  Event unblock6;
  std::unique_ptr<Event> done6 = proxy.PostAsynchronousOperation(&unblock6);

  // Release the asynchronous operations last-to-first. The synchronous ones
  // (5, 3 and 2) have nothing to release.
  for (Event* unblock : {&unblock6, &unblock4, &unblock1, &unblock0}) {
    unblock->Set();
  }
  // Wait for every operation. Every operation finishes eventually, so the
  // order of these waits is irrelevant.
  for (Event* done : {done0.get(), done1.get(), done2.get(), done3.get(),
                      done4.get(), done5.get(), done6.get()}) {
    done->Wait(Event::kForever);
  }

  EXPECT_THAT(proxy.completed_operation_events(),
              ElementsAre(done0.get(), done1.get(), done2.get(), done3.get(),
                          done4.get(), done5.get(), done6.get()));
}
|
|
|
|
// IsEmpty() is true exactly when no chained operation is pending.
TEST(OperationsChainTest, IsEmpty) {
  OperationTrackerProxy proxy;
  proxy.Initialize()->Wait(Event::kForever);

  // Nothing has been chained yet.
  EXPECT_TRUE(proxy.IsEmpty());

  // One pending operation makes the chain non-empty...
  Event unblock_single;
  std::unique_ptr<Event> single_done =
      proxy.PostAsynchronousOperation(&unblock_single);
  EXPECT_FALSE(proxy.IsEmpty());
  // ...and finishing it leaves the chain empty again.
  unblock_single.Set();
  single_done->Wait(Event::kForever);
  EXPECT_TRUE(proxy.IsEmpty());

  // Now queue two operations back to back.
  Event unblock_first;
  std::unique_ptr<Event> first_done =
      proxy.PostAsynchronousOperation(&unblock_first);
  Event unblock_second;
  std::unique_ptr<Event> second_done =
      proxy.PostAsynchronousOperation(&unblock_second);
  // With operations pending the chain is, once more, not empty.
  EXPECT_FALSE(proxy.IsEmpty());
  // Finishing only the first one still leaves the second one pending.
  unblock_first.Set();
  first_done->Wait(Event::kForever);
  EXPECT_FALSE(proxy.IsEmpty());
  // Finishing the last event empties the chain.
  unblock_second.Set();
  second_done->Wait(Event::kForever);
  EXPECT_TRUE(proxy.IsEmpty());
}
|
|
|
|
// The on-chain-empty callback fires once each time the chain drains, and not
// while operations are still pending.
TEST(OperationsChainTest, OnChainEmptyCallback) {
  rtc::AutoThread main_thread;
  OperationTrackerProxy proxy;
  proxy.Initialize()->Wait(Event::kForever);

  std::atomic<size_t> empty_callback_count(0u);
  proxy.SetOnChainEmptyCallback(
      [&empty_callback_count]() { ++empty_callback_count; });

  // Start with a single operation.
  Event unblock_single;
  std::unique_ptr<Event> single_done =
      proxy.PostAsynchronousOperation(&unblock_single);
  // While it is pending the callback must not have fired.
  EXPECT_EQ(0u, empty_callback_count);
  // Finishing it drains the chain, which triggers the callback.
  unblock_single.Set();
  single_done->Wait(Event::kForever);
  EXPECT_TRUE_WAIT(1u == empty_callback_count, kDefaultTimeout);

  // Now queue two operations back to back.
  Event unblock_first;
  std::unique_ptr<Event> first_done =
      proxy.PostAsynchronousOperation(&unblock_first);
  Event unblock_second;
  std::unique_ptr<Event> second_done =
      proxy.PostAsynchronousOperation(&unblock_second);
  // As before, pending operations alone do not trigger the callback.
  EXPECT_TRUE_WAIT(1u == empty_callback_count, kDefaultTimeout);
  // After the first one finishes the chain still holds the second, so the
  // count has to stay where it is.
  unblock_first.Set();
  first_done->Wait(Event::kForever);
  EXPECT_TRUE_WAIT(1u == empty_callback_count, kDefaultTimeout);
  // Finishing the last event drains the chain and triggers the callback.
  unblock_second.Set();
  second_done->Wait(Event::kForever);
  EXPECT_TRUE_WAIT(2u == empty_callback_count, kDefaultTimeout);
}
|
|
|
|
TEST(OperationsChainTest,
     SafeToReleaseReferenceToOperationChainWhileOperationIsPending) {
  OperationTrackerProxy proxy;
  proxy.Initialize()->Wait(Event::kForever);

  Event unblock;
  std::unique_ptr<Event> completed = proxy.PostAsynchronousOperation(&unblock);

  // The OperationsChain is kept alive by its pending operations, so the test
  // may drop its own reference before the async operation is unblocked.
  proxy.ReleaseOperationChain()->Wait(Event::kForever);

  unblock.Set();
  completed->Wait(Event::kForever);
}
|
|
|
|
// The functor passed to ChainOperation() must outlive its own invocation, even
// though it reports completion part-way through.
TEST(OperationsChainTest, FunctorIsNotDestroyedWhileExecuting) {
  scoped_refptr<OperationsChain> chain = OperationsChain::Create();

  bool destructor_called = false;
  SignalOnDestruction guard(&destructor_called);

  chain->ChainOperation(
      [guard = std::move(guard),
       &destructor_called](std::function<void()> done) {
        EXPECT_FALSE(destructor_called);
        // Reporting completion pops the Operation object off the
        // OperationsChain's internal queue.
        done();
        // The internal Operation object is gone by now, yet everything this
        // lambda captured has to remain valid: the functor must not be
        // deleted while it is still running.
        EXPECT_FALSE(destructor_called);
      });
  // The lambda ran synchronously and has returned, so its captures should
  // have been destroyed by now.
  EXPECT_TRUE(destructor_called);
}
|
|
|
|
#if RTC_DCHECK_IS_ON && GTEST_HAS_DEATH_TEST && !defined(WEBRTC_ANDROID)
|
|
|
|
// An operation that returns without ever invoking its completion callback is
// expected to crash the process.
TEST(OperationsChainDeathTest, OperationNotInvokingCallbackShouldCrash) {
  scoped_refptr<OperationsChain> chain = OperationsChain::Create();
  EXPECT_DEATH(chain->ChainOperation([](std::function<void()>) {}), "");
}
|
|
|
|
// An operation that reports completion more than once is expected to crash the
// process.
TEST(OperationsChainDeathTest,
     OperationInvokingCallbackMultipleTimesShouldCrash) {
  scoped_refptr<OperationsChain> chain = OperationsChain::Create();
  EXPECT_DEATH(chain->ChainOperation([](std::function<void()> done) {
    // Report completion twice.
    done();
    done();
  }),
               "");
}
|
|
|
|
#endif // RTC_DCHECK_IS_ON && GTEST_HAS_DEATH_TEST && !defined(WEBRTC_ANDROID)
|
|
|
|
} // namespace rtc
|