/* * Copyright (c) 2021 The WebRTC project authors. All Rights Reserved. * * Use of this source code is governed by a BSD-style license * that can be found in the LICENSE file in the root of the source * tree. An additional intellectual property rights grant can be found * in the file PATENTS. All contributing project authors may * be found in the AUTHORS file in the root of the source tree. * * Data Channel Benchmarking tool. * * Create a server using: ./data_channel_benchmark --server --port 12345 * Start the flow of data from the server to a client using: * ./data_channel_benchmark --port 12345 --transfer_size 100 --packet_size 8196 * The throughput is reported on the server console. * * The negotiation does not require a 3rd party server and is done over a gRPC * transport. No TURN server is configured, so both peers need to be reachable * using STUN only. */ #include #include #include "absl/cleanup/cleanup.h" #include "absl/flags/flag.h" #include "absl/flags/parse.h" #include "rtc_base/event.h" #include "rtc_base/ssl_adapter.h" #include "rtc_base/thread.h" #include "rtc_tools/data_channel_benchmark/grpc_signaling.h" #include "rtc_tools/data_channel_benchmark/peer_connection_client.h" #include "system_wrappers/include/field_trial.h" ABSL_FLAG(int, verbose, 0, "verbosity level (0-5)"); ABSL_FLAG(bool, server, false, "Server mode"); ABSL_FLAG(bool, oneshot, true, "Terminate after serving a client"); ABSL_FLAG(std::string, address, "localhost", "Connect to server address"); ABSL_FLAG(uint16_t, port, 0, "Connect to port (0 for random)"); ABSL_FLAG(uint64_t, transfer_size, 2, "Transfer size (MiB)"); ABSL_FLAG(uint64_t, packet_size, 256 * 1024, "Packet size"); ABSL_FLAG(std::string, force_fieldtrials, "", "Field trials control experimental feature code which can be forced. " "E.g. running with --force_fieldtrials=WebRTC-FooFeature/Enable/" " will assign the group Enable to field trial WebRTC-FooFeature."); struct SetupMessage { size_t packet_size; size_t transfer_size; std::string ToString() { char buffer[64]; rtc::SimpleStringBuilder sb(buffer); sb << packet_size << "," << transfer_size; return sb.str(); } static SetupMessage FromString(absl::string_view sv) { SetupMessage result; auto parameters = rtc::split(sv, ','); std::from_chars(parameters[0].data(), parameters[0].data() + parameters[0].size(), result.packet_size, 10); std::from_chars(parameters[1].data(), parameters[1].data() + parameters[1].size(), result.transfer_size, 10); return result; } }; class DataChannelObserverImpl : public webrtc::DataChannelObserver { public: explicit DataChannelObserverImpl(webrtc::DataChannelInterface* dc) : dc_(dc), bytes_received_(0) {} void OnStateChange() override { RTC_LOG(LS_INFO) << "State changed to " << dc_->state(); switch (dc_->state()) { case webrtc::DataChannelInterface::DataState::kOpen: open_event_.Set(); break; case webrtc::DataChannelInterface::DataState::kClosed: closed_event_.Set(); break; default: break; } } void OnMessage(const webrtc::DataBuffer& buffer) override { bytes_received_ += buffer.data.size(); if (bytes_received_threshold_ && bytes_received_ >= bytes_received_threshold_) { bytes_received_event_.Set(); } if (setup_message_.empty() && !buffer.binary) { setup_message_.assign(buffer.data.cdata(), buffer.data.size()); setup_message_event_.Set(); } } void OnBufferedAmountChange(uint64_t sent_data_size) override { if (dc_->buffered_amount() < webrtc::DataChannelInterface::MaxSendQueueSize() / 2) low_buffered_threshold_event_.Set(); else low_buffered_threshold_event_.Reset(); } bool WaitForOpenState(int duration_ms) { return dc_->state() == webrtc::DataChannelInterface::DataState::kOpen || open_event_.Wait(duration_ms); } bool WaitForClosedState(int duration_ms) { return dc_->state() == webrtc::DataChannelInterface::DataState::kClosed || closed_event_.Wait(duration_ms); } // Set how many received bytes are required until // WaitForBytesReceivedThreshold return true. void SetBytesReceivedThreshold(uint64_t bytes_received_threshold) { bytes_received_threshold_ = bytes_received_threshold; if (bytes_received_ >= bytes_received_threshold_) bytes_received_event_.Set(); } // Wait until the received byte count reaches the desired value. bool WaitForBytesReceivedThreshold(int duration_ms) { return (bytes_received_threshold_ && bytes_received_ >= bytes_received_threshold_) || bytes_received_event_.Wait(duration_ms); } bool WaitForLowbufferedThreshold(int duration_ms) { return low_buffered_threshold_event_.Wait(duration_ms); } std::string SetupMessage() { return setup_message_; } bool WaitForSetupMessage(int duration_ms) { return setup_message_event_.Wait(duration_ms); } private: webrtc::DataChannelInterface* dc_; rtc::Event open_event_; rtc::Event closed_event_; rtc::Event bytes_received_event_; absl::optional bytes_received_threshold_; uint64_t bytes_received_; rtc::Event low_buffered_threshold_event_; std::string setup_message_; rtc::Event setup_message_event_; }; int RunServer() { bool oneshot = absl::GetFlag(FLAGS_oneshot); uint16_t port = absl::GetFlag(FLAGS_port); auto signaling_thread = rtc::Thread::Create(); signaling_thread->Start(); { auto factory = webrtc::PeerConnectionClient::CreateDefaultFactory( signaling_thread.get()); auto grpc_server = webrtc::GrpcSignalingServerInterface::Create( [factory = rtc::scoped_refptr( factory)](webrtc::SignalingInterface* signaling) { webrtc::PeerConnectionClient client(factory.get(), signaling); client.StartPeerConnection(); auto peer_connection = client.peerConnection(); // Set up the data channel auto dc_or_error = peer_connection->CreateDataChannelOrError("benchmark", nullptr); auto data_channel = dc_or_error.MoveValue(); auto data_channel_observer = std::make_unique(data_channel.get()); data_channel->RegisterObserver(data_channel_observer.get()); absl::Cleanup unregister_observer( [data_channel] { data_channel->UnregisterObserver(); }); // Wait for a first message from the remote peer. // It configures how much data should be sent and how big the packets // should be. // First message is "packet_size,transfer_size". data_channel_observer->WaitForSetupMessage(rtc::Event::kForever); auto parameters = SetupMessage::FromString(data_channel_observer->SetupMessage()); // Wait for the sender and receiver peers to stabilize (send all ACKs) // This makes it easier to isolate the sending part when profiling. absl::SleepFor(absl::Seconds(1)); std::string data(parameters.packet_size, '0'); size_t remaining_data = parameters.transfer_size; auto begin_time = webrtc::Clock::GetRealTimeClock()->CurrentTime(); while (remaining_data) { if (remaining_data < data.size()) data.resize(remaining_data); rtc::CopyOnWriteBuffer buffer(data); webrtc::DataBuffer data_buffer(buffer, true); if (!data_channel->Send(data_buffer)) { // If the send() call failed, the buffers are full. // We wait until there's more room. data_channel_observer->WaitForLowbufferedThreshold( rtc::Event::kForever); continue; } remaining_data -= buffer.size(); fprintf(stderr, "Progress: %zu / %zu (%zu%%)\n", (parameters.transfer_size - remaining_data), parameters.transfer_size, (100 - remaining_data * 100 / parameters.transfer_size)); } // Receiver signals the data channel close event when it has received // all the data it requested. data_channel_observer->WaitForClosedState(rtc::Event::kForever); auto end_time = webrtc::Clock::GetRealTimeClock()->CurrentTime(); auto duration_ms = (end_time - begin_time).ms(); double throughput = (parameters.transfer_size / 1024. / 1024.) / (duration_ms / 1000.); printf("Elapsed time: %zums %gMiB/s\n", duration_ms, throughput); }, port, oneshot); grpc_server->Start(); printf("Server listening on port %d\n", grpc_server->SelectedPort()); grpc_server->Wait(); } signaling_thread->Quit(); return 0; } int RunClient() { uint16_t port = absl::GetFlag(FLAGS_port); std::string server_address = absl::GetFlag(FLAGS_address); size_t transfer_size = absl::GetFlag(FLAGS_transfer_size) * 1024 * 1024; size_t packet_size = absl::GetFlag(FLAGS_packet_size); auto signaling_thread = rtc::Thread::Create(); signaling_thread->Start(); { auto factory = webrtc::PeerConnectionClient::CreateDefaultFactory( signaling_thread.get()); auto grpc_client = webrtc::GrpcSignalingClientInterface::Create( server_address + ":" + std::to_string(port)); webrtc::PeerConnectionClient client(factory.get(), grpc_client->signaling_client()); // Set up the callback to receive the data channel from the sender. rtc::scoped_refptr data_channel; rtc::Event got_data_channel; client.SetOnDataChannel( [&data_channel, &got_data_channel]( rtc::scoped_refptr channel) { data_channel = channel; got_data_channel.Set(); }); // Connect to the server. if (!grpc_client->Start()) { fprintf(stderr, "Failed to connect to server\n"); return 1; } // Wait for the data channel to be received got_data_channel.Wait(rtc::Event::kForever); // DataChannel needs an observer to start draining the read queue DataChannelObserverImpl observer(data_channel.get()); observer.SetBytesReceivedThreshold(transfer_size); data_channel->RegisterObserver(&observer); absl::Cleanup unregister_observer( [data_channel] { data_channel->UnregisterObserver(); }); // Send a configuration string to the server to tell it to send // 'packet_size' bytes packets and send a total of 'transfer_size' MB. observer.WaitForOpenState(rtc::Event::kForever); SetupMessage setup_message = { .packet_size = packet_size, .transfer_size = transfer_size, }; if (!data_channel->Send(webrtc::DataBuffer(setup_message.ToString()))) { fprintf(stderr, "Failed to send parameter string\n"); return 1; } // Wait until we have received all the data observer.WaitForBytesReceivedThreshold(rtc::Event::kForever); // Close the data channel, signaling to the server we have received // all the requested data. data_channel->Close(); } signaling_thread->Quit(); return 0; } int main(int argc, char** argv) { rtc::InitializeSSL(); absl::ParseCommandLine(argc, argv); // Make sure that higher severity number means more logs by reversing the // rtc::LoggingSeverity values. auto logging_severity = std::max(0, rtc::LS_NONE - absl::GetFlag(FLAGS_verbose)); rtc::LogMessage::LogToDebug( static_cast(logging_severity)); bool is_server = absl::GetFlag(FLAGS_server); std::string field_trials = absl::GetFlag(FLAGS_force_fieldtrials); webrtc::field_trial::InitFieldTrialsFromString(field_trials.c_str()); return is_server ? RunServer() : RunClient(); }