Migrate cricket::Port asynchronous calls to TaskQueueBase interface

Bug: webrtc:9702
Change-Id: I13e05ced190ca64a217961d74ee92dd9c15ed8ce
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/271641
Commit-Queue: Danil Chapovalov <danilchap@webrtc.org>
Reviewed-by: Tomas Gunnarsson <tommi@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#37849}
This commit is contained in:
Danil Chapovalov 2022-08-19 18:16:59 +02:00 committed by WebRTC LUCI CQ
parent 372ecc30fa
commit 34f6d1c06d
6 changed files with 75 additions and 85 deletions

View file

@ -117,7 +117,6 @@ rtc_library("rtc_p2p") {
"../rtc_base:checks", "../rtc_base:checks",
"../rtc_base:event_tracer", "../rtc_base:event_tracer",
"../rtc_base:ip_address", "../rtc_base:ip_address",
"../rtc_base:location",
"../rtc_base:logging", "../rtc_base:logging",
"../rtc_base:macromagic", "../rtc_base:macromagic",
"../rtc_base:net_helpers", "../rtc_base:net_helpers",

View file

@ -37,8 +37,14 @@
#include "rtc_base/third_party/base64/base64.h" #include "rtc_base/third_party/base64/base64.h"
#include "rtc_base/trace_event.h" #include "rtc_base/trace_event.h"
namespace cricket {
namespace { namespace {
using ::webrtc::RTCError;
using ::webrtc::RTCErrorType;
using ::webrtc::TaskQueueBase;
using ::webrtc::TimeDelta;
rtc::PacketInfoProtocolType ConvertProtocolTypeToPacketInfoProtocolType( rtc::PacketInfoProtocolType ConvertProtocolTypeToPacketInfoProtocolType(
cricket::ProtocolType type) { cricket::ProtocolType type) {
switch (type) { switch (type) {
@ -61,11 +67,6 @@ const int kPortTimeoutDelay = cricket::STUN_TOTAL_TIMEOUT + 5000;
} // namespace } // namespace
namespace cricket {
using webrtc::RTCError;
using webrtc::RTCErrorType;
// TODO(ronghuawu): Use "local", "srflx", "prflx" and "relay". But this requires // TODO(ronghuawu): Use "local", "srflx", "prflx" and "relay". But this requires
// the signaling part be updated correspondingly as well. // the signaling part be updated correspondingly as well.
const char LOCAL_PORT_TYPE[] = "local"; const char LOCAL_PORT_TYPE[] = "local";
@ -105,7 +106,7 @@ std::string Port::ComputeFoundation(absl::string_view type,
return rtc::ToString(rtc::ComputeCrc32(sb.Release())); return rtc::ToString(rtc::ComputeCrc32(sb.Release()));
} }
Port::Port(rtc::Thread* thread, Port::Port(TaskQueueBase* thread,
absl::string_view type, absl::string_view type,
rtc::PacketSocketFactory* factory, rtc::PacketSocketFactory* factory,
const rtc::Network* network, const rtc::Network* network,
@ -134,7 +135,7 @@ Port::Port(rtc::Thread* thread,
Construct(); Construct();
} }
Port::Port(rtc::Thread* thread, Port::Port(TaskQueueBase* thread,
absl::string_view type, absl::string_view type,
rtc::PacketSocketFactory* factory, rtc::PacketSocketFactory* factory,
const rtc::Network* network, const rtc::Network* network,
@ -177,8 +178,7 @@ void Port::Construct() {
network_->SignalTypeChanged.connect(this, &Port::OnNetworkTypeChanged); network_->SignalTypeChanged.connect(this, &Port::OnNetworkTypeChanged);
network_cost_ = network_->GetCost(field_trials()); network_cost_ = network_->GetCost(field_trials());
thread_->PostDelayed(RTC_FROM_HERE, timeout_delay_, this, PostDestroyIfDead(/*delayed=*/true);
MSG_DESTROY_IF_DEAD);
RTC_LOG(LS_INFO) << ToString() << ": Port created with network cost " RTC_LOG(LS_INFO) << ToString() << ": Port created with network cost "
<< network_cost_; << network_cost_;
} }
@ -822,19 +822,33 @@ void Port::KeepAliveUntilPruned() {
void Port::Prune() { void Port::Prune() {
state_ = State::PRUNED; state_ = State::PRUNED;
thread_->Post(RTC_FROM_HERE, this, MSG_DESTROY_IF_DEAD); PostDestroyIfDead(/*delayed=*/false);
} }
// Call to stop any currently pending operations from running. // Call to stop any currently pending operations from running.
void Port::CancelPendingTasks() { void Port::CancelPendingTasks() {
TRACE_EVENT0("webrtc", "Port::CancelPendingTasks"); TRACE_EVENT0("webrtc", "Port::CancelPendingTasks");
RTC_DCHECK_RUN_ON(thread_); RTC_DCHECK_RUN_ON(thread_);
thread_->Clear(this); weak_factory_.InvalidateWeakPtrs();
} }
void Port::OnMessage(rtc::Message* pmsg) { void Port::PostDestroyIfDead(bool delayed) {
rtc::WeakPtr<Port> weak_ptr = NewWeakPtr();
auto task = [weak_ptr = std::move(weak_ptr)] {
if (weak_ptr) {
weak_ptr->DestroyIfDead();
}
};
if (delayed) {
thread_->PostDelayedTask(std::move(task),
TimeDelta::Millis(timeout_delay_));
} else {
thread_->PostTask(std::move(task));
}
}
void Port::DestroyIfDead() {
RTC_DCHECK_RUN_ON(thread_); RTC_DCHECK_RUN_ON(thread_);
RTC_DCHECK(pmsg->message_id == MSG_DESTROY_IF_DEAD);
bool dead = bool dead =
(state_ == State::INIT || state_ == State::PRUNED) && (state_ == State::INIT || state_ == State::PRUNED) &&
connections_.empty() && connections_.empty() &&
@ -908,8 +922,7 @@ bool Port::OnConnectionDestroyed(Connection* conn) {
// not cause the Port to be destroyed. // not cause the Port to be destroyed.
if (connections_.empty()) { if (connections_.empty()) {
last_time_all_connections_removed_ = rtc::TimeMillis(); last_time_all_connections_removed_ = rtc::TimeMillis();
thread_->PostDelayed(RTC_FROM_HERE, timeout_delay_, this, PostDestroyIfDead(/*delayed=*/true);
MSG_DESTROY_IF_DEAD);
} }
return true; return true;

View file

@ -24,6 +24,7 @@
#include "api/field_trials_view.h" #include "api/field_trials_view.h"
#include "api/packet_socket_factory.h" #include "api/packet_socket_factory.h"
#include "api/rtc_error.h" #include "api/rtc_error.h"
#include "api/task_queue/task_queue_base.h"
#include "api/transport/field_trial_based_config.h" #include "api/transport/field_trial_based_config.h"
#include "api/transport/stun.h" #include "api/transport/stun.h"
#include "logging/rtc_event_log/events/rtc_event_ice_candidate_pair.h" #include "logging/rtc_event_log/events/rtc_event_ice_candidate_pair.h"
@ -46,7 +47,6 @@
#include "rtc_base/socket_address.h" #include "rtc_base/socket_address.h"
#include "rtc_base/system/rtc_export.h" #include "rtc_base/system/rtc_export.h"
#include "rtc_base/third_party/sigslot/sigslot.h" #include "rtc_base/third_party/sigslot/sigslot.h"
#include "rtc_base/thread.h"
#include "rtc_base/weak_ptr.h" #include "rtc_base/weak_ptr.h"
namespace cricket { namespace cricket {
@ -176,9 +176,7 @@ typedef std::set<rtc::SocketAddress> ServerAddresses;
// Represents a local communication mechanism that can be used to create // Represents a local communication mechanism that can be used to create
// connections to similar mechanisms of the other client. Subclasses of this // connections to similar mechanisms of the other client. Subclasses of this
// one add support for specific mechanisms like local UDP ports. // one add support for specific mechanisms like local UDP ports.
class Port : public PortInterface, class Port : public PortInterface, public sigslot::has_slots<> {
public rtc::MessageHandler,
public sigslot::has_slots<> {
public: public:
// INIT: The state when a port is just created. // INIT: The state when a port is just created.
// KEEP_ALIVE_UNTIL_PRUNED: A port should not be destroyed even if no // KEEP_ALIVE_UNTIL_PRUNED: A port should not be destroyed even if no
@ -186,14 +184,14 @@ class Port : public PortInterface,
// PRUNED: It will be destroyed if no connection is using it for a period of // PRUNED: It will be destroyed if no connection is using it for a period of
// 30 seconds. // 30 seconds.
enum class State { INIT, KEEP_ALIVE_UNTIL_PRUNED, PRUNED }; enum class State { INIT, KEEP_ALIVE_UNTIL_PRUNED, PRUNED };
Port(rtc::Thread* thread, Port(webrtc::TaskQueueBase* thread,
absl::string_view type, absl::string_view type,
rtc::PacketSocketFactory* factory, rtc::PacketSocketFactory* factory,
const rtc::Network* network, const rtc::Network* network,
absl::string_view username_fragment, absl::string_view username_fragment,
absl::string_view password, absl::string_view password,
const webrtc::FieldTrialsView* field_trials = nullptr); const webrtc::FieldTrialsView* field_trials = nullptr);
Port(rtc::Thread* thread, Port(webrtc::TaskQueueBase* thread,
absl::string_view type, absl::string_view type,
rtc::PacketSocketFactory* factory, rtc::PacketSocketFactory* factory,
const rtc::Network* network, const rtc::Network* network,
@ -232,7 +230,7 @@ class Port : public PortInterface,
void CancelPendingTasks(); void CancelPendingTasks();
// The thread on which this port performs its I/O. // The thread on which this port performs its I/O.
rtc::Thread* thread() { return thread_; } webrtc::TaskQueueBase* thread() { return thread_; }
// The factory used to create the sockets of this port. // The factory used to create the sockets of this port.
rtc::PacketSocketFactory* socket_factory() const { return factory_; } rtc::PacketSocketFactory* socket_factory() const { return factory_; }
@ -346,8 +344,6 @@ class Port : public PortInterface,
// Called if the port has no connections and is no longer useful. // Called if the port has no connections and is no longer useful.
void Destroy(); void Destroy();
void OnMessage(rtc::Message* pmsg) override;
// Debugging description of this port // Debugging description of this port
std::string ToString() const override; std::string ToString() const override;
uint16_t min_port() { return min_port_; } uint16_t min_port() { return min_port_; }
@ -396,8 +392,6 @@ class Port : public PortInterface,
const rtc::SocketAddress& base_address); const rtc::SocketAddress& base_address);
protected: protected:
enum { MSG_DESTROY_IF_DEAD = 0, MSG_FIRST_AVAILABLE };
virtual void UpdateNetworkCost(); virtual void UpdateNetworkCost();
void set_type(absl::string_view type) { type_ = std::string(type); } void set_type(absl::string_view type) { type_ = std::string(type); }
@ -470,6 +464,9 @@ class Port : public PortInterface,
private: private:
void Construct(); void Construct();
void PostDestroyIfDead(bool delayed);
void DestroyIfDead();
// Called internally when deleting a connection object. // Called internally when deleting a connection object.
// Returns true if the connection object was removed from the `connections_` // Returns true if the connection object was removed from the `connections_`
// list and the state updated accordingly. If the connection was not found // list and the state updated accordingly. If the connection was not found
@ -485,7 +482,7 @@ class Port : public PortInterface,
void OnNetworkTypeChanged(const rtc::Network* network); void OnNetworkTypeChanged(const rtc::Network* network);
rtc::Thread* const thread_; webrtc::TaskQueueBase* const thread_;
rtc::PacketSocketFactory* const factory_; rtc::PacketSocketFactory* const factory_;
std::string type_; std::string type_;
bool send_retransmit_count_attribute_; bool send_retransmit_count_attribute_;

View file

@ -79,7 +79,6 @@
#include "p2p/base/p2p_constants.h" #include "p2p/base/p2p_constants.h"
#include "rtc_base/checks.h" #include "rtc_base/checks.h"
#include "rtc_base/ip_address.h" #include "rtc_base/ip_address.h"
#include "rtc_base/location.h"
#include "rtc_base/logging.h" #include "rtc_base/logging.h"
#include "rtc_base/net_helper.h" #include "rtc_base/net_helper.h"
#include "rtc_base/rate_tracker.h" #include "rtc_base/rate_tracker.h"

View file

@ -31,7 +31,9 @@
#include "rtc_base/strings/string_builder.h" #include "rtc_base/strings/string_builder.h"
namespace cricket { namespace cricket {
using ::webrtc::SafeTask; using ::webrtc::SafeTask;
using ::webrtc::TaskQueueBase;
using ::webrtc::TimeDelta; using ::webrtc::TimeDelta;
// TODO(juberti): Move to stun.h when relay messages have been renamed. // TODO(juberti): Move to stun.h when relay messages have been renamed.
@ -210,7 +212,7 @@ class TurnEntry : public sigslot::has_slots<> {
std::string remote_ufrag_; std::string remote_ufrag_;
}; };
TurnPort::TurnPort(rtc::Thread* thread, TurnPort::TurnPort(TaskQueueBase* thread,
rtc::PacketSocketFactory* factory, rtc::PacketSocketFactory* factory,
const rtc::Network* network, const rtc::Network* network,
rtc::AsyncPacketSocket* socket, rtc::AsyncPacketSocket* socket,
@ -250,7 +252,7 @@ TurnPort::TurnPort(rtc::Thread* thread,
allocate_mismatch_retries_(0), allocate_mismatch_retries_(0),
turn_customizer_(customizer) {} turn_customizer_(customizer) {}
TurnPort::TurnPort(rtc::Thread* thread, TurnPort::TurnPort(TaskQueueBase* thread,
rtc::PacketSocketFactory* factory, rtc::PacketSocketFactory* factory,
const rtc::Network* network, const rtc::Network* network,
uint16_t min_port, uint16_t min_port,
@ -894,7 +896,8 @@ void TurnPort::OnAllocateError(int error_code, absl::string_view reason) {
// We will send SignalPortError asynchronously as this can be sent during // We will send SignalPortError asynchronously as this can be sent during
// port initialization. This way it will not be blocking other port // port initialization. This way it will not be blocking other port
// creation. // creation.
thread()->Post(RTC_FROM_HERE, this, MSG_ALLOCATE_ERROR); thread()->PostTask(
SafeTask(task_safety_.flag(), [this] { SignalPortError(this); }));
std::string address = GetLocalAddress().HostAsSensitiveURIString(); std::string address = GetLocalAddress().HostAsSensitiveURIString();
int port = GetLocalAddress().port(); int port = GetLocalAddress().port();
if (server_address_.proto == PROTO_TCP && if (server_address_.proto == PROTO_TCP &&
@ -911,7 +914,8 @@ void TurnPort::OnRefreshError() {
// Need to clear the requests asynchronously because otherwise, the refresh // Need to clear the requests asynchronously because otherwise, the refresh
// request may be deleted twice: once at the end of the message processing // request may be deleted twice: once at the end of the message processing
// and the other in HandleRefreshError(). // and the other in HandleRefreshError().
thread()->Post(RTC_FROM_HERE, this, MSG_REFRESH_ERROR); thread()->PostTask(
SafeTask(task_safety_.flag(), [this] { HandleRefreshError(); }));
} }
void TurnPort::HandleRefreshError() { void TurnPort::HandleRefreshError() {
@ -967,39 +971,21 @@ bool TurnPort::AllowedTurnPort(int port,
return false; return false;
} }
void TurnPort::OnMessage(rtc::Message* message) { void TurnPort::TryAlternateServer() {
switch (message->message_id) { if (server_address().proto == PROTO_UDP) {
case MSG_ALLOCATE_ERROR: // Send another allocate request to alternate server, with the received
SignalPortError(this); // realm and nonce values.
break; SendRequest(new TurnAllocateRequest(this), 0);
case MSG_ALLOCATE_MISMATCH: } else {
OnAllocateMismatch(); // Since it's TCP, we have to delete the connected socket and reconnect
break; // with the alternate server. PrepareAddress will send stun binding once
case MSG_REFRESH_ERROR: // the new socket is connected.
HandleRefreshError(); RTC_DCHECK(server_address().proto == PROTO_TCP ||
break; server_address().proto == PROTO_TLS);
case MSG_TRY_ALTERNATE_SERVER: RTC_DCHECK(!SharedSocket());
if (server_address().proto == PROTO_UDP) { delete socket_;
// Send another allocate request to alternate server, with the received socket_ = nullptr;
// realm and nonce values. PrepareAddress();
SendRequest(new TurnAllocateRequest(this), 0);
} else {
// Since it's TCP, we have to delete the connected socket and reconnect
// with the alternate server. PrepareAddress will send stun binding once
// the new socket is connected.
RTC_DCHECK(server_address().proto == PROTO_TCP ||
server_address().proto == PROTO_TLS);
RTC_DCHECK(!SharedSocket());
delete socket_;
socket_ = NULL;
PrepareAddress();
}
break;
case MSG_ALLOCATION_RELEASED:
Close();
break;
default:
Port::OnMessage(message);
} }
} }
@ -1449,12 +1435,13 @@ void TurnAllocateRequest::OnErrorResponse(StunMessage* response) {
case STUN_ERROR_TRY_ALTERNATE: case STUN_ERROR_TRY_ALTERNATE:
OnTryAlternate(response, error_code); OnTryAlternate(response, error_code);
break; break;
case STUN_ERROR_ALLOCATION_MISMATCH: case STUN_ERROR_ALLOCATION_MISMATCH: {
// We must handle this error async because trying to delete the socket in // We must handle this error async because trying to delete the socket in
// OnErrorResponse will cause a deadlock on the socket. // OnErrorResponse will cause a deadlock on the socket.
port_->thread()->Post(RTC_FROM_HERE, port_, TurnPort* port = port_;
TurnPort::MSG_ALLOCATE_MISMATCH); port->thread()->PostTask(SafeTask(
break; port->task_safety_.flag(), [port] { port->OnAllocateMismatch(); }));
} break;
default: default:
RTC_LOG(LS_WARNING) << port_->ToString() RTC_LOG(LS_WARNING) << port_->ToString()
<< ": Received TURN allocate error response, id=" << ": Received TURN allocate error response, id="
@ -1551,8 +1538,9 @@ void TurnAllocateRequest::OnTryAlternate(StunMessage* response, int code) {
// For TCP, we can't close the original Tcp socket during handling a 300 as // For TCP, we can't close the original Tcp socket during handling a 300 as
// we're still inside that socket's event handler. Doing so will cause // we're still inside that socket's event handler. Doing so will cause
// deadlock. // deadlock.
port_->thread()->Post(RTC_FROM_HERE, port_, TurnPort* port = port_;
TurnPort::MSG_TRY_ALTERNATE_SERVER); port->thread()->PostTask(SafeTask(port->task_safety_.flag(),
[port] { port->TryAlternateServer(); }));
} }
TurnRefreshRequest::TurnRefreshRequest(TurnPort* port, int lifetime /*= -1*/) TurnRefreshRequest::TurnRefreshRequest(TurnPort* port, int lifetime /*= -1*/)
@ -1602,8 +1590,9 @@ void TurnRefreshRequest::OnResponse(StunMessage* response) {
} else { } else {
// If we scheduled a refresh with lifetime 0, we're releasing this // If we scheduled a refresh with lifetime 0, we're releasing this
// allocation; see TurnPort::Release. // allocation; see TurnPort::Release.
port_->thread()->Post(RTC_FROM_HERE, port_, TurnPort* port = port_;
TurnPort::MSG_ALLOCATION_RELEASED); port->thread()->PostTask(
SafeTask(port->task_safety_.flag(), [port] { port->Close(); }));
} }
port_->SignalTurnRefreshResult(port_, TURN_SUCCESS_RESULT_CODE); port_->SignalTurnRefreshResult(port_, TURN_SUCCESS_RESULT_CODE);

View file

@ -24,6 +24,7 @@
#include "absl/strings/string_view.h" #include "absl/strings/string_view.h"
#include "api/async_dns_resolver.h" #include "api/async_dns_resolver.h"
#include "api/task_queue/pending_task_safety_flag.h" #include "api/task_queue/pending_task_safety_flag.h"
#include "api/task_queue/task_queue_base.h"
#include "p2p/base/port.h" #include "p2p/base/port.h"
#include "p2p/client/basic_port_allocator.h" #include "p2p/client/basic_port_allocator.h"
#include "rtc_base/async_packet_socket.h" #include "rtc_base/async_packet_socket.h"
@ -204,7 +205,7 @@ class TurnPort : public Port {
void CloseForTest() { Close(); } void CloseForTest() { Close(); }
protected: protected:
TurnPort(rtc::Thread* thread, TurnPort(webrtc::TaskQueueBase* thread,
rtc::PacketSocketFactory* factory, rtc::PacketSocketFactory* factory,
const rtc::Network* network, const rtc::Network* network,
rtc::AsyncPacketSocket* socket, rtc::AsyncPacketSocket* socket,
@ -219,7 +220,7 @@ class TurnPort : public Port {
rtc::SSLCertificateVerifier* tls_cert_verifier = nullptr, rtc::SSLCertificateVerifier* tls_cert_verifier = nullptr,
const webrtc::FieldTrialsView* field_trials = nullptr); const webrtc::FieldTrialsView* field_trials = nullptr);
TurnPort(rtc::Thread* thread, TurnPort(webrtc::TaskQueueBase* thread,
rtc::PacketSocketFactory* factory, rtc::PacketSocketFactory* factory,
const rtc::Network* network, const rtc::Network* network,
uint16_t min_port, uint16_t min_port,
@ -249,21 +250,13 @@ class TurnPort : public Port {
void Close(); void Close();
private: private:
enum {
MSG_ALLOCATE_ERROR = MSG_FIRST_AVAILABLE,
MSG_ALLOCATE_MISMATCH,
MSG_TRY_ALTERNATE_SERVER,
MSG_REFRESH_ERROR,
MSG_ALLOCATION_RELEASED
};
typedef std::list<TurnEntry*> EntryList; typedef std::list<TurnEntry*> EntryList;
typedef std::map<rtc::Socket::Option, int> SocketOptionsMap; typedef std::map<rtc::Socket::Option, int> SocketOptionsMap;
typedef std::set<rtc::SocketAddress> AttemptedServerSet; typedef std::set<rtc::SocketAddress> AttemptedServerSet;
static bool AllowedTurnPort(int port, static bool AllowedTurnPort(int port,
const webrtc::FieldTrialsView* field_trials); const webrtc::FieldTrialsView* field_trials);
void OnMessage(rtc::Message* pmsg) override; void TryAlternateServer();
bool CreateTurnClientSocket(); bool CreateTurnClientSocket();