mirror of
https://github.com/mollyim/webrtc.git
synced 2025-05-13 05:40:42 +01:00
network_tester: Remove usage of rtc::Thread::socketserver() and cleanup
Instead of creating a TaskQueue from packet_sender, create a rtc::Thread in test_controller so that test_controller instantiates a SocketServer, eliminating the use of rtc::Thread::socketserver(). Also did various cleanups, such as adding threading annotations, and ensuring that all network operations are done in dedicated threads. Bug: webrtc:13145 Test: Unittest, and manually verified using Android clients and Linux servers Change-Id: I05ebe5e29bd80f14a193c9ee8b0bf63a1b6b94d7 Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/263321 Commit-Queue: Daniel.l Lee <daniel.l@hpcnt.com> Reviewed-by: Mirko Bonadei <mbonadei@webrtc.org> Cr-Commit-Position: refs/heads/main@{#37411}
This commit is contained in:
parent
c3f511301b
commit
c931f70896
9 changed files with 130 additions and 64 deletions
|
@ -47,6 +47,7 @@ if (rtc_enable_protobuf) {
|
|||
"../../rtc_base:checks",
|
||||
"../../rtc_base:ignore_wundef",
|
||||
"../../rtc_base:ip_address",
|
||||
"../../rtc_base:logging",
|
||||
"../../rtc_base:macromagic",
|
||||
"../../rtc_base:protobuf_utils",
|
||||
"../../rtc_base:rtc_task_queue",
|
||||
|
@ -81,6 +82,7 @@ if (rtc_enable_protobuf) {
|
|||
deps = [
|
||||
":network_tester",
|
||||
"../../rtc_base:gunit_helpers",
|
||||
"../../rtc_base:threading",
|
||||
"../../test:fileutils",
|
||||
"../../test:test_support",
|
||||
"//testing/gtest",
|
||||
|
@ -98,7 +100,11 @@ if (rtc_enable_protobuf) {
|
|||
rtc_executable("network_tester_server") {
|
||||
sources = [ "server.cc" ]
|
||||
|
||||
deps = [ ":network_tester" ]
|
||||
deps = [
|
||||
":network_tester",
|
||||
"../../rtc_base:null_socket_server",
|
||||
"../../rtc_base:threading",
|
||||
]
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -158,9 +164,13 @@ if (is_android) {
|
|||
}
|
||||
|
||||
rtc_shared_library("network_tester_so") {
|
||||
sources = [ "jni.cpp" ]
|
||||
sources = [ "jni.cc" ]
|
||||
|
||||
deps = [ ":network_tester" ]
|
||||
deps = [
|
||||
":network_tester",
|
||||
"../../rtc_base:logging",
|
||||
"../../rtc_base:threading",
|
||||
]
|
||||
|
||||
suppressed_configs += [ "//build/config/android:hide_all_but_jni_onload" ]
|
||||
configs += [ "//build/config/android:hide_all_but_jni" ]
|
||||
|
|
|
@ -21,7 +21,8 @@ ConfigReader::ConfigReader(const std::string& config_file_path)
|
|||
: proto_config_index_(0) {
|
||||
std::ifstream config_stream(config_file_path,
|
||||
std::ios_base::in | std::ios_base::binary);
|
||||
RTC_DCHECK(config_stream.is_open());
|
||||
RTC_DCHECK(config_stream.is_open())
|
||||
<< "Config " << config_file_path << " open failed";
|
||||
RTC_DCHECK(config_stream.good());
|
||||
std::string config_data((std::istreambuf_iterator<char>(config_stream)),
|
||||
(std::istreambuf_iterator<char>()));
|
||||
|
|
|
@ -13,12 +13,15 @@
|
|||
#define JNIEXPORT __attribute__((visibility("default")))
|
||||
#include <string>
|
||||
|
||||
#include "rtc_base/logging.h"
|
||||
#include "rtc_base/thread.h"
|
||||
#include "rtc_tools/network_tester/test_controller.h"
|
||||
|
||||
extern "C" JNIEXPORT jlong JNICALL
|
||||
Java_com_google_media_networktester_NetworkTester_CreateTestController(
|
||||
JNIEnv* jni,
|
||||
jclass) {
|
||||
rtc::ThreadManager::Instance()->WrapCurrentThread();
|
||||
return reinterpret_cast<intptr_t>(new webrtc::TestController(
|
||||
0, 0, "/mnt/sdcard/network_tester_client_config.dat",
|
||||
"/mnt/sdcard/network_tester_client_packet_log.dat"));
|
||||
|
@ -47,7 +50,8 @@ Java_com_google_media_networktester_NetworkTester_TestControllerRun(
|
|||
JNIEnv* jni,
|
||||
jclass,
|
||||
jlong native_pointer) {
|
||||
reinterpret_cast<webrtc::TestController*>(native_pointer)->Run();
|
||||
// 100 ms arbitrary chosen, but it works well.
|
||||
rtc::Thread::Current()->ProcessMessages(/*cms=*/100);
|
||||
}
|
||||
|
||||
extern "C" JNIEXPORT void JNICALL
|
||||
|
@ -60,4 +64,5 @@ Java_com_google_media_networktester_NetworkTester_DestroyTestController(
|
|||
if (test_controller) {
|
||||
delete test_controller;
|
||||
}
|
||||
rtc::ThreadManager::Instance()->UnwrapCurrentThread();
|
||||
}
|
|
@ -20,6 +20,7 @@
|
|||
namespace webrtc {
|
||||
|
||||
TEST(NetworkTesterTest, ServerClient) {
|
||||
rtc::AutoThread main_thread;
|
||||
TestController client(
|
||||
0, 0, webrtc::test::ResourcePath("network_tester/client_config", "dat"),
|
||||
webrtc::test::OutputPath() + "client_packet_log.dat");
|
||||
|
|
|
@ -15,8 +15,6 @@
|
|||
#include <string>
|
||||
#include <utility>
|
||||
|
||||
#include "absl/types/optional.h"
|
||||
#include "api/task_queue/default_task_queue_factory.h"
|
||||
#include "api/task_queue/queued_task.h"
|
||||
#include "api/task_queue/task_queue_base.h"
|
||||
#include "rtc_base/time_utils.h"
|
||||
|
@ -29,12 +27,16 @@ namespace {
|
|||
|
||||
class SendPacketTask : public QueuedTask {
|
||||
public:
|
||||
explicit SendPacketTask(PacketSender* packet_sender)
|
||||
: target_time_ms_(rtc::TimeMillis()), packet_sender_(packet_sender) {}
|
||||
explicit SendPacketTask(
|
||||
PacketSender* packet_sender,
|
||||
rtc::scoped_refptr<webrtc::PendingTaskSafetyFlag> task_safety_flag)
|
||||
: target_time_ms_(rtc::TimeMillis()),
|
||||
packet_sender_(packet_sender),
|
||||
task_safety_flag_(task_safety_flag) {}
|
||||
|
||||
private:
|
||||
bool Run() override {
|
||||
if (packet_sender_->IsSending()) {
|
||||
if (task_safety_flag_->alive() && packet_sender_->IsSending()) {
|
||||
packet_sender_->SendPacket();
|
||||
target_time_ms_ += packet_sender_->GetSendIntervalMs();
|
||||
int64_t delay_ms = std::max(static_cast<int64_t>(0),
|
||||
|
@ -48,17 +50,24 @@ class SendPacketTask : public QueuedTask {
|
|||
}
|
||||
int64_t target_time_ms_;
|
||||
PacketSender* const packet_sender_;
|
||||
rtc::scoped_refptr<webrtc::PendingTaskSafetyFlag> task_safety_flag_;
|
||||
};
|
||||
|
||||
class UpdateTestSettingTask : public QueuedTask {
|
||||
public:
|
||||
UpdateTestSettingTask(PacketSender* packet_sender,
|
||||
std::unique_ptr<ConfigReader> config_reader)
|
||||
UpdateTestSettingTask(
|
||||
PacketSender* packet_sender,
|
||||
std::unique_ptr<ConfigReader> config_reader,
|
||||
rtc::scoped_refptr<webrtc::PendingTaskSafetyFlag> task_safety_flag)
|
||||
: packet_sender_(packet_sender),
|
||||
config_reader_(std::move(config_reader)) {}
|
||||
config_reader_(std::move(config_reader)),
|
||||
task_safety_flag_(task_safety_flag) {}
|
||||
|
||||
private:
|
||||
bool Run() override {
|
||||
if (!task_safety_flag_->alive()) {
|
||||
return true;
|
||||
}
|
||||
auto config = config_reader_->GetNextConfig();
|
||||
if (config) {
|
||||
packet_sender_->UpdateTestSetting((*config).packet_size,
|
||||
|
@ -73,34 +82,38 @@ class UpdateTestSettingTask : public QueuedTask {
|
|||
}
|
||||
PacketSender* const packet_sender_;
|
||||
const std::unique_ptr<ConfigReader> config_reader_;
|
||||
rtc::scoped_refptr<webrtc::PendingTaskSafetyFlag> task_safety_flag_;
|
||||
};
|
||||
|
||||
} // namespace
|
||||
|
||||
PacketSender::PacketSender(TestController* test_controller,
|
||||
const std::string& config_file_path)
|
||||
PacketSender::PacketSender(
|
||||
TestController* test_controller,
|
||||
webrtc::TaskQueueBase* worker_queue,
|
||||
rtc::scoped_refptr<webrtc::PendingTaskSafetyFlag> task_safety_flag,
|
||||
const std::string& config_file_path)
|
||||
: packet_size_(0),
|
||||
send_interval_ms_(0),
|
||||
sequence_number_(0),
|
||||
sending_(false),
|
||||
config_file_path_(config_file_path),
|
||||
test_controller_(test_controller),
|
||||
task_queue_factory_(CreateDefaultTaskQueueFactory()),
|
||||
worker_queue_(task_queue_factory_->CreateTaskQueue(
|
||||
"Packet Sender",
|
||||
TaskQueueFactory::Priority::HIGH)) {}
|
||||
worker_queue_(worker_queue),
|
||||
task_safety_flag_(task_safety_flag) {}
|
||||
|
||||
PacketSender::~PacketSender() = default;
|
||||
|
||||
void PacketSender::StartSending() {
|
||||
worker_queue_checker_.Detach();
|
||||
worker_queue_.PostTask([this]() {
|
||||
worker_queue_->PostTask(ToQueuedTask(task_safety_flag_, [this]() {
|
||||
RTC_DCHECK_RUN_ON(&worker_queue_checker_);
|
||||
sending_ = true;
|
||||
});
|
||||
worker_queue_.PostTask(std::make_unique<UpdateTestSettingTask>(
|
||||
this, std::make_unique<ConfigReader>(config_file_path_)));
|
||||
worker_queue_.PostTask(std::make_unique<SendPacketTask>(this));
|
||||
}));
|
||||
worker_queue_->PostTask(std::make_unique<UpdateTestSettingTask>(
|
||||
this, std::make_unique<ConfigReader>(config_file_path_),
|
||||
task_safety_flag_));
|
||||
worker_queue_->PostTask(
|
||||
std::make_unique<SendPacketTask>(this, task_safety_flag_));
|
||||
}
|
||||
|
||||
void PacketSender::StopSending() {
|
||||
|
|
|
@ -35,8 +35,11 @@ class TestController;
|
|||
|
||||
class PacketSender {
|
||||
public:
|
||||
PacketSender(TestController* test_controller,
|
||||
const std::string& config_file_path);
|
||||
PacketSender(
|
||||
TestController* test_controller,
|
||||
webrtc::TaskQueueBase* worker_queue,
|
||||
rtc::scoped_refptr<webrtc::PendingTaskSafetyFlag> task_safety_flag,
|
||||
const std::string& config_file_path);
|
||||
~PacketSender();
|
||||
|
||||
PacketSender(const PacketSender&) = delete;
|
||||
|
@ -59,8 +62,8 @@ class PacketSender {
|
|||
bool sending_ RTC_GUARDED_BY(worker_queue_checker_);
|
||||
const std::string config_file_path_;
|
||||
TestController* const test_controller_;
|
||||
std::unique_ptr<TaskQueueFactory> task_queue_factory_;
|
||||
rtc::TaskQueue worker_queue_;
|
||||
webrtc::TaskQueueBase* worker_queue_;
|
||||
rtc::scoped_refptr<webrtc::PendingTaskSafetyFlag> task_safety_flag_;
|
||||
};
|
||||
|
||||
} // namespace webrtc
|
||||
|
|
|
@ -8,13 +8,16 @@
|
|||
* be found in the AUTHORS file in the root of the source tree.
|
||||
*/
|
||||
|
||||
#include "rtc_base/null_socket_server.h"
|
||||
#include "rtc_tools/network_tester/test_controller.h"
|
||||
|
||||
int main(int /*argn*/, char* /*argv*/[]) {
|
||||
rtc::Thread main_thread(std::make_unique<rtc::NullSocketServer>());
|
||||
webrtc::TestController server(9090, 9090, "server_config.dat",
|
||||
"server_packet_log.dat");
|
||||
while (!server.IsTestDone()) {
|
||||
server.Run();
|
||||
// 100 ms is arbitrary chosen.
|
||||
main_thread.ProcessMessages(/*cms=*/100);
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
|
|
@ -14,7 +14,9 @@
|
|||
|
||||
#include "absl/types/optional.h"
|
||||
#include "rtc_base/checks.h"
|
||||
#include "rtc_base/internal/default_socket_server.h"
|
||||
#include "rtc_base/ip_address.h"
|
||||
#include "rtc_base/logging.h"
|
||||
#include "rtc_base/thread.h"
|
||||
|
||||
namespace webrtc {
|
||||
|
@ -23,20 +25,32 @@ TestController::TestController(int min_port,
|
|||
int max_port,
|
||||
const std::string& config_file_path,
|
||||
const std::string& log_file_path)
|
||||
// TODO(bugs.webrtc.org/13145): Add a SocketFactory argument.
|
||||
: socket_factory_(
|
||||
rtc::ThreadManager::Instance()->WrapCurrentThread()->socketserver()),
|
||||
: socket_server_(rtc::CreateDefaultSocketServer()),
|
||||
packet_sender_thread_(
|
||||
std::make_unique<rtc::Thread>(socket_server_.get())),
|
||||
socket_factory_(socket_server_.get()),
|
||||
config_file_path_(config_file_path),
|
||||
packet_logger_(log_file_path),
|
||||
local_test_done_(false),
|
||||
remote_test_done_(false) {
|
||||
remote_test_done_(false),
|
||||
task_safety_flag_(PendingTaskSafetyFlag::CreateDetached()) {
|
||||
RTC_DCHECK_RUN_ON(&test_controller_thread_checker_);
|
||||
packet_sender_checker_.Detach();
|
||||
send_data_.fill(42);
|
||||
udp_socket_ =
|
||||
std::unique_ptr<rtc::AsyncPacketSocket>(socket_factory_.CreateUdpSocket(
|
||||
rtc::SocketAddress(rtc::GetAnyIP(AF_INET), 0), min_port, max_port));
|
||||
udp_socket_->SignalReadPacket.connect(this, &TestController::OnReadPacket);
|
||||
packet_sender_thread_->SetName("PacketSender", nullptr);
|
||||
packet_sender_thread_->Start();
|
||||
packet_sender_thread_->Invoke<void>(RTC_FROM_HERE, [&] {
|
||||
RTC_DCHECK_RUN_ON(packet_sender_thread_.get());
|
||||
udp_socket_ =
|
||||
std::unique_ptr<rtc::AsyncPacketSocket>(socket_factory_.CreateUdpSocket(
|
||||
rtc::SocketAddress(rtc::GetAnyIP(AF_INET), 0), min_port, max_port));
|
||||
udp_socket_->SignalReadPacket.connect(this, &TestController::OnReadPacket);
|
||||
});
|
||||
}
|
||||
|
||||
TestController::~TestController() {
|
||||
RTC_DCHECK_RUN_ON(&test_controller_thread_checker_);
|
||||
packet_sender_thread_->Invoke<void>(
|
||||
RTC_FROM_HERE, [this]() { task_safety_flag_->SetNotAlive(); });
|
||||
}
|
||||
|
||||
void TestController::SendConnectTo(const std::string& hostname, int port) {
|
||||
|
@ -45,18 +59,22 @@ void TestController::SendConnectTo(const std::string& hostname, int port) {
|
|||
NetworkTesterPacket packet;
|
||||
packet.set_type(NetworkTesterPacket::HAND_SHAKING);
|
||||
SendData(packet, absl::nullopt);
|
||||
MutexLock scoped_lock(&local_test_done_lock_);
|
||||
MutexLock scoped_lock(&test_done_lock_);
|
||||
local_test_done_ = false;
|
||||
remote_test_done_ = false;
|
||||
}
|
||||
|
||||
void TestController::Run() {
|
||||
RTC_DCHECK_RUN_ON(&test_controller_thread_checker_);
|
||||
rtc::Thread::Current()->ProcessMessages(0);
|
||||
}
|
||||
|
||||
void TestController::SendData(const NetworkTesterPacket& packet,
|
||||
absl::optional<size_t> data_size) {
|
||||
if (!packet_sender_thread_->IsCurrent()) {
|
||||
packet_sender_thread_->PostTask(ToQueuedTask(
|
||||
task_safety_flag_,
|
||||
[this, packet, data_size]() { this->SendData(packet, data_size); }));
|
||||
return;
|
||||
}
|
||||
RTC_DCHECK_RUN_ON(packet_sender_thread_.get());
|
||||
RTC_LOG(LS_VERBOSE) << "SendData";
|
||||
|
||||
// Can be call from packet_sender or from test_controller thread.
|
||||
size_t packet_size = packet.ByteSizeLong();
|
||||
send_data_[0] = packet_size;
|
||||
|
@ -69,17 +87,17 @@ void TestController::SendData(const NetworkTesterPacket& packet,
|
|||
}
|
||||
|
||||
void TestController::OnTestDone() {
|
||||
RTC_DCHECK_RUN_ON(&packet_sender_checker_);
|
||||
RTC_DCHECK_RUN_ON(packet_sender_thread_.get());
|
||||
NetworkTesterPacket packet;
|
||||
packet.set_type(NetworkTesterPacket::TEST_DONE);
|
||||
SendData(packet, absl::nullopt);
|
||||
MutexLock scoped_lock(&local_test_done_lock_);
|
||||
MutexLock scoped_lock(&test_done_lock_);
|
||||
local_test_done_ = true;
|
||||
}
|
||||
|
||||
bool TestController::IsTestDone() {
|
||||
RTC_DCHECK_RUN_ON(&test_controller_thread_checker_);
|
||||
MutexLock scoped_lock(&local_test_done_lock_);
|
||||
MutexLock scoped_lock(&test_done_lock_);
|
||||
return local_test_done_ && remote_test_done_;
|
||||
}
|
||||
|
||||
|
@ -88,7 +106,8 @@ void TestController::OnReadPacket(rtc::AsyncPacketSocket* socket,
|
|||
size_t len,
|
||||
const rtc::SocketAddress& remote_addr,
|
||||
const int64_t& packet_time_us) {
|
||||
RTC_DCHECK_RUN_ON(&test_controller_thread_checker_);
|
||||
RTC_DCHECK_RUN_ON(packet_sender_thread_.get());
|
||||
RTC_LOG(LS_VERBOSE) << "OnReadPacket";
|
||||
size_t packet_size = data[0];
|
||||
std::string receive_data(&data[1], packet_size);
|
||||
NetworkTesterPacket packet;
|
||||
|
@ -100,17 +119,21 @@ void TestController::OnReadPacket(rtc::AsyncPacketSocket* socket,
|
|||
packet.set_type(NetworkTesterPacket::TEST_START);
|
||||
remote_address_ = remote_addr;
|
||||
SendData(packet, absl::nullopt);
|
||||
packet_sender_.reset(new PacketSender(this, config_file_path_));
|
||||
packet_sender_.reset(new PacketSender(this, packet_sender_thread_.get(),
|
||||
task_safety_flag_,
|
||||
config_file_path_));
|
||||
packet_sender_->StartSending();
|
||||
MutexLock scoped_lock(&local_test_done_lock_);
|
||||
MutexLock scoped_lock(&test_done_lock_);
|
||||
local_test_done_ = false;
|
||||
remote_test_done_ = false;
|
||||
break;
|
||||
}
|
||||
case NetworkTesterPacket::TEST_START: {
|
||||
packet_sender_.reset(new PacketSender(this, config_file_path_));
|
||||
packet_sender_.reset(new PacketSender(this, packet_sender_thread_.get(),
|
||||
task_safety_flag_,
|
||||
config_file_path_));
|
||||
packet_sender_->StartSending();
|
||||
MutexLock scoped_lock(&local_test_done_lock_);
|
||||
MutexLock scoped_lock(&test_done_lock_);
|
||||
local_test_done_ = false;
|
||||
remote_test_done_ = false;
|
||||
break;
|
||||
|
@ -122,6 +145,7 @@ void TestController::OnReadPacket(rtc::AsyncPacketSocket* socket,
|
|||
break;
|
||||
}
|
||||
case NetworkTesterPacket::TEST_DONE: {
|
||||
MutexLock scoped_lock(&test_done_lock_);
|
||||
remote_test_done_ = true;
|
||||
break;
|
||||
}
|
||||
|
|
|
@ -25,6 +25,7 @@
|
|||
#include "rtc_base/ignore_wundef.h"
|
||||
#include "rtc_base/socket_address.h"
|
||||
#include "rtc_base/synchronization/mutex.h"
|
||||
#include "rtc_base/system/no_unique_address.h"
|
||||
#include "rtc_base/third_party/sigslot/sigslot.h"
|
||||
#include "rtc_base/thread_annotations.h"
|
||||
#include "rtc_tools/network_tester/packet_logger.h"
|
||||
|
@ -49,12 +50,11 @@ class TestController : public sigslot::has_slots<> {
|
|||
int max_port,
|
||||
const std::string& config_file_path,
|
||||
const std::string& log_file_path);
|
||||
~TestController() override;
|
||||
|
||||
TestController(const TestController&) = delete;
|
||||
TestController& operator=(const TestController&) = delete;
|
||||
|
||||
void Run();
|
||||
|
||||
void SendConnectTo(const std::string& hostname, int port);
|
||||
|
||||
void SendData(const NetworkTesterPacket& packet,
|
||||
|
@ -70,18 +70,24 @@ class TestController : public sigslot::has_slots<> {
|
|||
size_t len,
|
||||
const rtc::SocketAddress& remote_addr,
|
||||
const int64_t& packet_time_us);
|
||||
SequenceChecker test_controller_thread_checker_;
|
||||
SequenceChecker packet_sender_checker_;
|
||||
rtc::BasicPacketSocketFactory socket_factory_;
|
||||
RTC_NO_UNIQUE_ADDRESS SequenceChecker test_controller_thread_checker_;
|
||||
std::unique_ptr<rtc::SocketServer> socket_server_;
|
||||
std::unique_ptr<rtc::Thread> packet_sender_thread_;
|
||||
rtc::BasicPacketSocketFactory socket_factory_
|
||||
RTC_GUARDED_BY(packet_sender_thread_);
|
||||
const std::string config_file_path_;
|
||||
PacketLogger packet_logger_;
|
||||
Mutex local_test_done_lock_;
|
||||
bool local_test_done_ RTC_GUARDED_BY(local_test_done_lock_);
|
||||
bool remote_test_done_;
|
||||
std::array<char, kEthernetMtu> send_data_;
|
||||
std::unique_ptr<rtc::AsyncPacketSocket> udp_socket_;
|
||||
PacketLogger packet_logger_ RTC_GUARDED_BY(packet_sender_thread_);
|
||||
Mutex test_done_lock_ RTC_GUARDED_BY(test_controller_thread_checker_);
|
||||
bool local_test_done_ RTC_GUARDED_BY(test_done_lock_);
|
||||
bool remote_test_done_ RTC_GUARDED_BY(test_done_lock_);
|
||||
std::array<char, kEthernetMtu> send_data_
|
||||
RTC_GUARDED_BY(packet_sender_thread_);
|
||||
std::unique_ptr<rtc::AsyncPacketSocket> udp_socket_
|
||||
RTC_GUARDED_BY(packet_sender_thread_);
|
||||
rtc::SocketAddress remote_address_;
|
||||
std::unique_ptr<PacketSender> packet_sender_;
|
||||
std::unique_ptr<PacketSender> packet_sender_
|
||||
RTC_GUARDED_BY(packet_sender_thread_);
|
||||
rtc::scoped_refptr<webrtc::PendingTaskSafetyFlag> task_safety_flag_;
|
||||
};
|
||||
|
||||
} // namespace webrtc
|
||||
|
|
Loading…
Reference in a new issue