From c931f708967382050f111bfdd73ff689f7f3b190 Mon Sep 17 00:00:00 2001 From: Byoungchan Lee Date: Sun, 3 Jul 2022 17:20:17 +0900 Subject: [PATCH] network_tester: Remove usage of rtc::Thread::socketserver() and cleanup Instead of creating a TaskQueue from packet_sender, create a rtc::Thread in test_controller so that test_controller instantiates a SocketServer, eliminating the use of rtc::Thread::socketserver(). Also did various cleanups, such as adding threading annotations, and ensuring that all network operations are done in dedicated threads. Bug: webrtc:13145 Test: Unittest, and manually verified using Android clients and Linux servers Change-Id: I05ebe5e29bd80f14a193c9ee8b0bf63a1b6b94d7 Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/263321 Commit-Queue: Daniel.l Lee Reviewed-by: Mirko Bonadei Cr-Commit-Position: refs/heads/main@{#37411} --- rtc_tools/network_tester/BUILD.gn | 16 ++++- rtc_tools/network_tester/config_reader.cc | 3 +- rtc_tools/network_tester/{jni.cpp => jni.cc} | 7 +- .../network_tester/network_tester_unittest.cc | 1 + rtc_tools/network_tester/packet_sender.cc | 51 +++++++++----- rtc_tools/network_tester/packet_sender.h | 11 +-- rtc_tools/network_tester/server.cc | 5 +- rtc_tools/network_tester/test_controller.cc | 70 +++++++++++++------ rtc_tools/network_tester/test_controller.h | 30 ++++---- 9 files changed, 130 insertions(+), 64 deletions(-) rename rtc_tools/network_tester/{jni.cpp => jni.cc} (87%) diff --git a/rtc_tools/network_tester/BUILD.gn b/rtc_tools/network_tester/BUILD.gn index 4fef8403c2..bb1f5d9f64 100644 --- a/rtc_tools/network_tester/BUILD.gn +++ b/rtc_tools/network_tester/BUILD.gn @@ -47,6 +47,7 @@ if (rtc_enable_protobuf) { "../../rtc_base:checks", "../../rtc_base:ignore_wundef", "../../rtc_base:ip_address", + "../../rtc_base:logging", "../../rtc_base:macromagic", "../../rtc_base:protobuf_utils", "../../rtc_base:rtc_task_queue", @@ -81,6 +82,7 @@ if (rtc_enable_protobuf) { deps = [ ":network_tester", "../../rtc_base:gunit_helpers", + "../../rtc_base:threading", "../../test:fileutils", "../../test:test_support", "//testing/gtest", @@ -98,7 +100,11 @@ if (rtc_enable_protobuf) { rtc_executable("network_tester_server") { sources = [ "server.cc" ] - deps = [ ":network_tester" ] + deps = [ + ":network_tester", + "../../rtc_base:null_socket_server", + "../../rtc_base:threading", + ] } } @@ -158,9 +164,13 @@ if (is_android) { } rtc_shared_library("network_tester_so") { - sources = [ "jni.cpp" ] + sources = [ "jni.cc" ] - deps = [ ":network_tester" ] + deps = [ + ":network_tester", + "../../rtc_base:logging", + "../../rtc_base:threading", + ] suppressed_configs += [ "//build/config/android:hide_all_but_jni_onload" ] configs += [ "//build/config/android:hide_all_but_jni" ] diff --git a/rtc_tools/network_tester/config_reader.cc b/rtc_tools/network_tester/config_reader.cc index ed76a3fcf7..16ae458d50 100644 --- a/rtc_tools/network_tester/config_reader.cc +++ b/rtc_tools/network_tester/config_reader.cc @@ -21,7 +21,8 @@ ConfigReader::ConfigReader(const std::string& config_file_path) : proto_config_index_(0) { std::ifstream config_stream(config_file_path, std::ios_base::in | std::ios_base::binary); - RTC_DCHECK(config_stream.is_open()); + RTC_DCHECK(config_stream.is_open()) + << "Config " << config_file_path << " open failed"; RTC_DCHECK(config_stream.good()); std::string config_data((std::istreambuf_iterator(config_stream)), (std::istreambuf_iterator())); diff --git a/rtc_tools/network_tester/jni.cpp b/rtc_tools/network_tester/jni.cc similarity index 87% rename from rtc_tools/network_tester/jni.cpp rename to rtc_tools/network_tester/jni.cc index 818dd7ce80..f192739ad0 100644 --- a/rtc_tools/network_tester/jni.cpp +++ b/rtc_tools/network_tester/jni.cc @@ -13,12 +13,15 @@ #define JNIEXPORT __attribute__((visibility("default"))) #include +#include "rtc_base/logging.h" +#include "rtc_base/thread.h" #include "rtc_tools/network_tester/test_controller.h" extern "C" JNIEXPORT jlong JNICALL Java_com_google_media_networktester_NetworkTester_CreateTestController( JNIEnv* jni, jclass) { + rtc::ThreadManager::Instance()->WrapCurrentThread(); return reinterpret_cast(new webrtc::TestController( 0, 0, "/mnt/sdcard/network_tester_client_config.dat", "/mnt/sdcard/network_tester_client_packet_log.dat")); @@ -47,7 +50,8 @@ Java_com_google_media_networktester_NetworkTester_TestControllerRun( JNIEnv* jni, jclass, jlong native_pointer) { - reinterpret_cast(native_pointer)->Run(); + // 100 ms arbitrary chosen, but it works well. + rtc::Thread::Current()->ProcessMessages(/*cms=*/100); } extern "C" JNIEXPORT void JNICALL @@ -60,4 +64,5 @@ Java_com_google_media_networktester_NetworkTester_DestroyTestController( if (test_controller) { delete test_controller; } + rtc::ThreadManager::Instance()->UnwrapCurrentThread(); } diff --git a/rtc_tools/network_tester/network_tester_unittest.cc b/rtc_tools/network_tester/network_tester_unittest.cc index 156e8bb23c..60b34e4e9f 100644 --- a/rtc_tools/network_tester/network_tester_unittest.cc +++ b/rtc_tools/network_tester/network_tester_unittest.cc @@ -20,6 +20,7 @@ namespace webrtc { TEST(NetworkTesterTest, ServerClient) { + rtc::AutoThread main_thread; TestController client( 0, 0, webrtc::test::ResourcePath("network_tester/client_config", "dat"), webrtc::test::OutputPath() + "client_packet_log.dat"); diff --git a/rtc_tools/network_tester/packet_sender.cc b/rtc_tools/network_tester/packet_sender.cc index b2c6cd921c..b80bb9872e 100644 --- a/rtc_tools/network_tester/packet_sender.cc +++ b/rtc_tools/network_tester/packet_sender.cc @@ -15,8 +15,6 @@ #include #include -#include "absl/types/optional.h" -#include "api/task_queue/default_task_queue_factory.h" #include "api/task_queue/queued_task.h" #include "api/task_queue/task_queue_base.h" #include "rtc_base/time_utils.h" @@ -29,12 +27,16 @@ namespace { class SendPacketTask : public QueuedTask { public: - explicit SendPacketTask(PacketSender* packet_sender) - : target_time_ms_(rtc::TimeMillis()), packet_sender_(packet_sender) {} + explicit SendPacketTask( + PacketSender* packet_sender, + rtc::scoped_refptr task_safety_flag) + : target_time_ms_(rtc::TimeMillis()), + packet_sender_(packet_sender), + task_safety_flag_(task_safety_flag) {} private: bool Run() override { - if (packet_sender_->IsSending()) { + if (task_safety_flag_->alive() && packet_sender_->IsSending()) { packet_sender_->SendPacket(); target_time_ms_ += packet_sender_->GetSendIntervalMs(); int64_t delay_ms = std::max(static_cast(0), @@ -48,17 +50,24 @@ class SendPacketTask : public QueuedTask { } int64_t target_time_ms_; PacketSender* const packet_sender_; + rtc::scoped_refptr task_safety_flag_; }; class UpdateTestSettingTask : public QueuedTask { public: - UpdateTestSettingTask(PacketSender* packet_sender, - std::unique_ptr config_reader) + UpdateTestSettingTask( + PacketSender* packet_sender, + std::unique_ptr config_reader, + rtc::scoped_refptr task_safety_flag) : packet_sender_(packet_sender), - config_reader_(std::move(config_reader)) {} + config_reader_(std::move(config_reader)), + task_safety_flag_(task_safety_flag) {} private: bool Run() override { + if (!task_safety_flag_->alive()) { + return true; + } auto config = config_reader_->GetNextConfig(); if (config) { packet_sender_->UpdateTestSetting((*config).packet_size, @@ -73,34 +82,38 @@ class UpdateTestSettingTask : public QueuedTask { } PacketSender* const packet_sender_; const std::unique_ptr config_reader_; + rtc::scoped_refptr task_safety_flag_; }; } // namespace -PacketSender::PacketSender(TestController* test_controller, - const std::string& config_file_path) +PacketSender::PacketSender( + TestController* test_controller, + webrtc::TaskQueueBase* worker_queue, + rtc::scoped_refptr task_safety_flag, + const std::string& config_file_path) : packet_size_(0), send_interval_ms_(0), sequence_number_(0), sending_(false), config_file_path_(config_file_path), test_controller_(test_controller), - task_queue_factory_(CreateDefaultTaskQueueFactory()), - worker_queue_(task_queue_factory_->CreateTaskQueue( - "Packet Sender", - TaskQueueFactory::Priority::HIGH)) {} + worker_queue_(worker_queue), + task_safety_flag_(task_safety_flag) {} PacketSender::~PacketSender() = default; void PacketSender::StartSending() { worker_queue_checker_.Detach(); - worker_queue_.PostTask([this]() { + worker_queue_->PostTask(ToQueuedTask(task_safety_flag_, [this]() { RTC_DCHECK_RUN_ON(&worker_queue_checker_); sending_ = true; - }); - worker_queue_.PostTask(std::make_unique( - this, std::make_unique(config_file_path_))); - worker_queue_.PostTask(std::make_unique(this)); + })); + worker_queue_->PostTask(std::make_unique( + this, std::make_unique(config_file_path_), + task_safety_flag_)); + worker_queue_->PostTask( + std::make_unique(this, task_safety_flag_)); } void PacketSender::StopSending() { diff --git a/rtc_tools/network_tester/packet_sender.h b/rtc_tools/network_tester/packet_sender.h index 233ed6a51a..323f75bd0c 100644 --- a/rtc_tools/network_tester/packet_sender.h +++ b/rtc_tools/network_tester/packet_sender.h @@ -35,8 +35,11 @@ class TestController; class PacketSender { public: - PacketSender(TestController* test_controller, - const std::string& config_file_path); + PacketSender( + TestController* test_controller, + webrtc::TaskQueueBase* worker_queue, + rtc::scoped_refptr task_safety_flag, + const std::string& config_file_path); ~PacketSender(); PacketSender(const PacketSender&) = delete; @@ -59,8 +62,8 @@ class PacketSender { bool sending_ RTC_GUARDED_BY(worker_queue_checker_); const std::string config_file_path_; TestController* const test_controller_; - std::unique_ptr task_queue_factory_; - rtc::TaskQueue worker_queue_; + webrtc::TaskQueueBase* worker_queue_; + rtc::scoped_refptr task_safety_flag_; }; } // namespace webrtc diff --git a/rtc_tools/network_tester/server.cc b/rtc_tools/network_tester/server.cc index 4074a483f8..f0f610a925 100644 --- a/rtc_tools/network_tester/server.cc +++ b/rtc_tools/network_tester/server.cc @@ -8,13 +8,16 @@ * be found in the AUTHORS file in the root of the source tree. */ +#include "rtc_base/null_socket_server.h" #include "rtc_tools/network_tester/test_controller.h" int main(int /*argn*/, char* /*argv*/[]) { + rtc::Thread main_thread(std::make_unique()); webrtc::TestController server(9090, 9090, "server_config.dat", "server_packet_log.dat"); while (!server.IsTestDone()) { - server.Run(); + // 100 ms is arbitrary chosen. + main_thread.ProcessMessages(/*cms=*/100); } return 0; } diff --git a/rtc_tools/network_tester/test_controller.cc b/rtc_tools/network_tester/test_controller.cc index d0e2f4cdbe..6b16708080 100644 --- a/rtc_tools/network_tester/test_controller.cc +++ b/rtc_tools/network_tester/test_controller.cc @@ -14,7 +14,9 @@ #include "absl/types/optional.h" #include "rtc_base/checks.h" +#include "rtc_base/internal/default_socket_server.h" #include "rtc_base/ip_address.h" +#include "rtc_base/logging.h" #include "rtc_base/thread.h" namespace webrtc { @@ -23,20 +25,32 @@ TestController::TestController(int min_port, int max_port, const std::string& config_file_path, const std::string& log_file_path) - // TODO(bugs.webrtc.org/13145): Add a SocketFactory argument. - : socket_factory_( - rtc::ThreadManager::Instance()->WrapCurrentThread()->socketserver()), + : socket_server_(rtc::CreateDefaultSocketServer()), + packet_sender_thread_( + std::make_unique(socket_server_.get())), + socket_factory_(socket_server_.get()), config_file_path_(config_file_path), packet_logger_(log_file_path), local_test_done_(false), - remote_test_done_(false) { + remote_test_done_(false), + task_safety_flag_(PendingTaskSafetyFlag::CreateDetached()) { RTC_DCHECK_RUN_ON(&test_controller_thread_checker_); - packet_sender_checker_.Detach(); send_data_.fill(42); - udp_socket_ = - std::unique_ptr(socket_factory_.CreateUdpSocket( - rtc::SocketAddress(rtc::GetAnyIP(AF_INET), 0), min_port, max_port)); - udp_socket_->SignalReadPacket.connect(this, &TestController::OnReadPacket); + packet_sender_thread_->SetName("PacketSender", nullptr); + packet_sender_thread_->Start(); + packet_sender_thread_->Invoke(RTC_FROM_HERE, [&] { + RTC_DCHECK_RUN_ON(packet_sender_thread_.get()); + udp_socket_ = + std::unique_ptr(socket_factory_.CreateUdpSocket( + rtc::SocketAddress(rtc::GetAnyIP(AF_INET), 0), min_port, max_port)); + udp_socket_->SignalReadPacket.connect(this, &TestController::OnReadPacket); + }); +} + +TestController::~TestController() { + RTC_DCHECK_RUN_ON(&test_controller_thread_checker_); + packet_sender_thread_->Invoke( + RTC_FROM_HERE, [this]() { task_safety_flag_->SetNotAlive(); }); } void TestController::SendConnectTo(const std::string& hostname, int port) { @@ -45,18 +59,22 @@ void TestController::SendConnectTo(const std::string& hostname, int port) { NetworkTesterPacket packet; packet.set_type(NetworkTesterPacket::HAND_SHAKING); SendData(packet, absl::nullopt); - MutexLock scoped_lock(&local_test_done_lock_); + MutexLock scoped_lock(&test_done_lock_); local_test_done_ = false; remote_test_done_ = false; } -void TestController::Run() { - RTC_DCHECK_RUN_ON(&test_controller_thread_checker_); - rtc::Thread::Current()->ProcessMessages(0); -} - void TestController::SendData(const NetworkTesterPacket& packet, absl::optional data_size) { + if (!packet_sender_thread_->IsCurrent()) { + packet_sender_thread_->PostTask(ToQueuedTask( + task_safety_flag_, + [this, packet, data_size]() { this->SendData(packet, data_size); })); + return; + } + RTC_DCHECK_RUN_ON(packet_sender_thread_.get()); + RTC_LOG(LS_VERBOSE) << "SendData"; + // Can be call from packet_sender or from test_controller thread. size_t packet_size = packet.ByteSizeLong(); send_data_[0] = packet_size; @@ -69,17 +87,17 @@ void TestController::SendData(const NetworkTesterPacket& packet, } void TestController::OnTestDone() { - RTC_DCHECK_RUN_ON(&packet_sender_checker_); + RTC_DCHECK_RUN_ON(packet_sender_thread_.get()); NetworkTesterPacket packet; packet.set_type(NetworkTesterPacket::TEST_DONE); SendData(packet, absl::nullopt); - MutexLock scoped_lock(&local_test_done_lock_); + MutexLock scoped_lock(&test_done_lock_); local_test_done_ = true; } bool TestController::IsTestDone() { RTC_DCHECK_RUN_ON(&test_controller_thread_checker_); - MutexLock scoped_lock(&local_test_done_lock_); + MutexLock scoped_lock(&test_done_lock_); return local_test_done_ && remote_test_done_; } @@ -88,7 +106,8 @@ void TestController::OnReadPacket(rtc::AsyncPacketSocket* socket, size_t len, const rtc::SocketAddress& remote_addr, const int64_t& packet_time_us) { - RTC_DCHECK_RUN_ON(&test_controller_thread_checker_); + RTC_DCHECK_RUN_ON(packet_sender_thread_.get()); + RTC_LOG(LS_VERBOSE) << "OnReadPacket"; size_t packet_size = data[0]; std::string receive_data(&data[1], packet_size); NetworkTesterPacket packet; @@ -100,17 +119,21 @@ void TestController::OnReadPacket(rtc::AsyncPacketSocket* socket, packet.set_type(NetworkTesterPacket::TEST_START); remote_address_ = remote_addr; SendData(packet, absl::nullopt); - packet_sender_.reset(new PacketSender(this, config_file_path_)); + packet_sender_.reset(new PacketSender(this, packet_sender_thread_.get(), + task_safety_flag_, + config_file_path_)); packet_sender_->StartSending(); - MutexLock scoped_lock(&local_test_done_lock_); + MutexLock scoped_lock(&test_done_lock_); local_test_done_ = false; remote_test_done_ = false; break; } case NetworkTesterPacket::TEST_START: { - packet_sender_.reset(new PacketSender(this, config_file_path_)); + packet_sender_.reset(new PacketSender(this, packet_sender_thread_.get(), + task_safety_flag_, + config_file_path_)); packet_sender_->StartSending(); - MutexLock scoped_lock(&local_test_done_lock_); + MutexLock scoped_lock(&test_done_lock_); local_test_done_ = false; remote_test_done_ = false; break; @@ -122,6 +145,7 @@ void TestController::OnReadPacket(rtc::AsyncPacketSocket* socket, break; } case NetworkTesterPacket::TEST_DONE: { + MutexLock scoped_lock(&test_done_lock_); remote_test_done_ = true; break; } diff --git a/rtc_tools/network_tester/test_controller.h b/rtc_tools/network_tester/test_controller.h index 3933b46b5b..b08fbd5dd8 100644 --- a/rtc_tools/network_tester/test_controller.h +++ b/rtc_tools/network_tester/test_controller.h @@ -25,6 +25,7 @@ #include "rtc_base/ignore_wundef.h" #include "rtc_base/socket_address.h" #include "rtc_base/synchronization/mutex.h" +#include "rtc_base/system/no_unique_address.h" #include "rtc_base/third_party/sigslot/sigslot.h" #include "rtc_base/thread_annotations.h" #include "rtc_tools/network_tester/packet_logger.h" @@ -49,12 +50,11 @@ class TestController : public sigslot::has_slots<> { int max_port, const std::string& config_file_path, const std::string& log_file_path); + ~TestController() override; TestController(const TestController&) = delete; TestController& operator=(const TestController&) = delete; - void Run(); - void SendConnectTo(const std::string& hostname, int port); void SendData(const NetworkTesterPacket& packet, @@ -70,18 +70,24 @@ class TestController : public sigslot::has_slots<> { size_t len, const rtc::SocketAddress& remote_addr, const int64_t& packet_time_us); - SequenceChecker test_controller_thread_checker_; - SequenceChecker packet_sender_checker_; - rtc::BasicPacketSocketFactory socket_factory_; + RTC_NO_UNIQUE_ADDRESS SequenceChecker test_controller_thread_checker_; + std::unique_ptr socket_server_; + std::unique_ptr packet_sender_thread_; + rtc::BasicPacketSocketFactory socket_factory_ + RTC_GUARDED_BY(packet_sender_thread_); const std::string config_file_path_; - PacketLogger packet_logger_; - Mutex local_test_done_lock_; - bool local_test_done_ RTC_GUARDED_BY(local_test_done_lock_); - bool remote_test_done_; - std::array send_data_; - std::unique_ptr udp_socket_; + PacketLogger packet_logger_ RTC_GUARDED_BY(packet_sender_thread_); + Mutex test_done_lock_ RTC_GUARDED_BY(test_controller_thread_checker_); + bool local_test_done_ RTC_GUARDED_BY(test_done_lock_); + bool remote_test_done_ RTC_GUARDED_BY(test_done_lock_); + std::array send_data_ + RTC_GUARDED_BY(packet_sender_thread_); + std::unique_ptr udp_socket_ + RTC_GUARDED_BY(packet_sender_thread_); rtc::SocketAddress remote_address_; - std::unique_ptr packet_sender_; + std::unique_ptr packet_sender_ + RTC_GUARDED_BY(packet_sender_thread_); + rtc::scoped_refptr task_safety_flag_; }; } // namespace webrtc