Change CallbackDeferrer to use a variant and callback pointer instead of std::function

This should substantially reduce the overhead due to deferred callbacks in profiles.

Bug: webrtc:15723
Change-Id: I4c52beb91eb08c9b0ac2d1ce9a4e11839aa35e38
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/331020
Reviewed-by: Victor Boivie <boivie@webrtc.org>
Commit-Queue: Daniel Collins <dpcollins@google.com>
Cr-Commit-Position: refs/heads/main@{#41363}
This commit is contained in:
Daniel Collins 2023-12-11 14:19:57 -05:00 committed by WebRTC LUCI CQ
parent 871af9225f
commit f418f48702
3 changed files with 74 additions and 55 deletions

View file

@ -178,6 +178,7 @@ rtc_library("dcsctp_socket") {
"//third_party/abseil-cpp/absl/memory", "//third_party/abseil-cpp/absl/memory",
"//third_party/abseil-cpp/absl/strings", "//third_party/abseil-cpp/absl/strings",
"//third_party/abseil-cpp/absl/types:optional", "//third_party/abseil-cpp/absl/types:optional",
"//third_party/abseil-cpp/absl/types:variant",
] ]
} }

View file

@ -12,31 +12,6 @@
#include "api/make_ref_counted.h" #include "api/make_ref_counted.h"
namespace dcsctp { namespace dcsctp {
namespace {
// A wrapper around the move-only DcSctpMessage, to let it be captured in a
// lambda.
class MessageDeliverer {
public:
explicit MessageDeliverer(DcSctpMessage&& message)
: state_(rtc::make_ref_counted<State>(std::move(message))) {}
void Deliver(DcSctpSocketCallbacks& c) {
// Really ensure that it's only called once.
RTC_DCHECK(!state_->has_delivered);
state_->has_delivered = true;
c.OnMessageReceived(std::move(state_->message));
}
private:
struct State : public webrtc::RefCountInterface {
explicit State(DcSctpMessage&& m)
: has_delivered(false), message(std::move(m)) {}
bool has_delivered;
DcSctpMessage message;
};
rtc::scoped_refptr<State> state_;
};
} // namespace
void CallbackDeferrer::Prepare() { void CallbackDeferrer::Prepare() {
RTC_DCHECK(!prepared_); RTC_DCHECK(!prepared_);
@ -48,12 +23,12 @@ void CallbackDeferrer::TriggerDeferred() {
// callback, and that might result in adding new callbacks to this instance, // callback, and that might result in adding new callbacks to this instance,
// and the vector can't be modified while iterated on. // and the vector can't be modified while iterated on.
RTC_DCHECK(prepared_); RTC_DCHECK(prepared_);
std::vector<std::function<void(DcSctpSocketCallbacks & cb)>> deferred; std::vector<std::pair<Callback, CallbackData>> deferred;
deferred.swap(deferred_); deferred.swap(deferred_);
prepared_ = false; prepared_ = false;
for (auto& cb : deferred) { for (auto& [cb, data] : deferred) {
cb(underlying_); cb(std::move(data), underlying_);
} }
} }
@ -84,40 +59,57 @@ uint32_t CallbackDeferrer::GetRandomInt(uint32_t low, uint32_t high) {
void CallbackDeferrer::OnMessageReceived(DcSctpMessage message) { void CallbackDeferrer::OnMessageReceived(DcSctpMessage message) {
RTC_DCHECK(prepared_); RTC_DCHECK(prepared_);
deferred_.emplace_back( deferred_.emplace_back(
[deliverer = MessageDeliverer(std::move(message))]( +[](CallbackData data, DcSctpSocketCallbacks& cb) {
DcSctpSocketCallbacks& cb) mutable { deliverer.Deliver(cb); }); return cb.OnMessageReceived(absl::get<DcSctpMessage>(std::move(data)));
},
std::move(message));
} }
void CallbackDeferrer::OnError(ErrorKind error, absl::string_view message) { void CallbackDeferrer::OnError(ErrorKind error, absl::string_view message) {
RTC_DCHECK(prepared_); RTC_DCHECK(prepared_);
deferred_.emplace_back( deferred_.emplace_back(
[error, message = std::string(message)](DcSctpSocketCallbacks& cb) { +[](CallbackData data, DcSctpSocketCallbacks& cb) {
cb.OnError(error, message); Error error = absl::get<Error>(std::move(data));
}); return cb.OnError(error.error, error.message);
},
Error{error, std::string(message)});
} }
void CallbackDeferrer::OnAborted(ErrorKind error, absl::string_view message) { void CallbackDeferrer::OnAborted(ErrorKind error, absl::string_view message) {
RTC_DCHECK(prepared_); RTC_DCHECK(prepared_);
deferred_.emplace_back( deferred_.emplace_back(
[error, message = std::string(message)](DcSctpSocketCallbacks& cb) { +[](CallbackData data, DcSctpSocketCallbacks& cb) {
cb.OnAborted(error, message); Error error = absl::get<Error>(std::move(data));
}); return cb.OnAborted(error.error, error.message);
},
Error{error, std::string(message)});
} }
void CallbackDeferrer::OnConnected() { void CallbackDeferrer::OnConnected() {
RTC_DCHECK(prepared_); RTC_DCHECK(prepared_);
deferred_.emplace_back([](DcSctpSocketCallbacks& cb) { cb.OnConnected(); }); deferred_.emplace_back(
+[](CallbackData data, DcSctpSocketCallbacks& cb) {
return cb.OnConnected();
},
absl::monostate{});
} }
void CallbackDeferrer::OnClosed() { void CallbackDeferrer::OnClosed() {
RTC_DCHECK(prepared_); RTC_DCHECK(prepared_);
deferred_.emplace_back([](DcSctpSocketCallbacks& cb) { cb.OnClosed(); }); deferred_.emplace_back(
+[](CallbackData data, DcSctpSocketCallbacks& cb) {
return cb.OnClosed();
},
absl::monostate{});
} }
void CallbackDeferrer::OnConnectionRestarted() { void CallbackDeferrer::OnConnectionRestarted() {
RTC_DCHECK(prepared_); RTC_DCHECK(prepared_);
deferred_.emplace_back( deferred_.emplace_back(
[](DcSctpSocketCallbacks& cb) { cb.OnConnectionRestarted(); }); +[](CallbackData data, DcSctpSocketCallbacks& cb) {
return cb.OnConnectionRestarted();
},
absl::monostate{});
} }
void CallbackDeferrer::OnStreamsResetFailed( void CallbackDeferrer::OnStreamsResetFailed(
@ -125,42 +117,53 @@ void CallbackDeferrer::OnStreamsResetFailed(
absl::string_view reason) { absl::string_view reason) {
RTC_DCHECK(prepared_); RTC_DCHECK(prepared_);
deferred_.emplace_back( deferred_.emplace_back(
[streams = std::vector<StreamID>(outgoing_streams.begin(), +[](CallbackData data, DcSctpSocketCallbacks& cb) {
outgoing_streams.end()), StreamReset stream_reset = absl::get<StreamReset>(std::move(data));
reason = std::string(reason)](DcSctpSocketCallbacks& cb) { return cb.OnStreamsResetFailed(stream_reset.streams,
cb.OnStreamsResetFailed(streams, reason); stream_reset.message);
}); },
StreamReset{{outgoing_streams.begin(), outgoing_streams.end()},
std::string(reason)});
} }
void CallbackDeferrer::OnStreamsResetPerformed( void CallbackDeferrer::OnStreamsResetPerformed(
rtc::ArrayView<const StreamID> outgoing_streams) { rtc::ArrayView<const StreamID> outgoing_streams) {
RTC_DCHECK(prepared_); RTC_DCHECK(prepared_);
deferred_.emplace_back( deferred_.emplace_back(
[streams = std::vector<StreamID>(outgoing_streams.begin(), +[](CallbackData data, DcSctpSocketCallbacks& cb) {
outgoing_streams.end())]( StreamReset stream_reset = absl::get<StreamReset>(std::move(data));
DcSctpSocketCallbacks& cb) { cb.OnStreamsResetPerformed(streams); }); return cb.OnStreamsResetPerformed(stream_reset.streams);
},
StreamReset{{outgoing_streams.begin(), outgoing_streams.end()}});
} }
void CallbackDeferrer::OnIncomingStreamsReset( void CallbackDeferrer::OnIncomingStreamsReset(
rtc::ArrayView<const StreamID> incoming_streams) { rtc::ArrayView<const StreamID> incoming_streams) {
RTC_DCHECK(prepared_); RTC_DCHECK(prepared_);
deferred_.emplace_back( deferred_.emplace_back(
[streams = std::vector<StreamID>(incoming_streams.begin(), +[](CallbackData data, DcSctpSocketCallbacks& cb) {
incoming_streams.end())]( StreamReset stream_reset = absl::get<StreamReset>(std::move(data));
DcSctpSocketCallbacks& cb) { cb.OnIncomingStreamsReset(streams); }); return cb.OnIncomingStreamsReset(stream_reset.streams);
},
StreamReset{{incoming_streams.begin(), incoming_streams.end()}});
} }
void CallbackDeferrer::OnBufferedAmountLow(StreamID stream_id) { void CallbackDeferrer::OnBufferedAmountLow(StreamID stream_id) {
RTC_DCHECK(prepared_); RTC_DCHECK(prepared_);
deferred_.emplace_back([stream_id](DcSctpSocketCallbacks& cb) { deferred_.emplace_back(
cb.OnBufferedAmountLow(stream_id); +[](CallbackData data, DcSctpSocketCallbacks& cb) {
}); return cb.OnBufferedAmountLow(absl::get<StreamID>(std::move(data)));
},
stream_id);
} }
void CallbackDeferrer::OnTotalBufferedAmountLow() { void CallbackDeferrer::OnTotalBufferedAmountLow() {
RTC_DCHECK(prepared_); RTC_DCHECK(prepared_);
deferred_.emplace_back( deferred_.emplace_back(
[](DcSctpSocketCallbacks& cb) { cb.OnTotalBufferedAmountLow(); }); +[](CallbackData data, DcSctpSocketCallbacks& cb) {
return cb.OnTotalBufferedAmountLow();
},
absl::monostate{});
} }
void CallbackDeferrer::OnLifecycleMessageExpired(LifecycleId lifecycle_id, void CallbackDeferrer::OnLifecycleMessageExpired(LifecycleId lifecycle_id,

View file

@ -18,6 +18,7 @@
#include <vector> #include <vector>
#include "absl/strings/string_view.h" #include "absl/strings/string_view.h"
#include "absl/types/variant.h"
#include "api/array_view.h" #include "api/array_view.h"
#include "api/ref_counted_base.h" #include "api/ref_counted_base.h"
#include "api/scoped_refptr.h" #include "api/scoped_refptr.h"
@ -89,12 +90,26 @@ class CallbackDeferrer : public DcSctpSocketCallbacks {
void OnLifecycleEnd(LifecycleId lifecycle_id) override; void OnLifecycleEnd(LifecycleId lifecycle_id) override;
private: private:
struct Error {
ErrorKind error;
std::string message;
};
struct StreamReset {
std::vector<StreamID> streams;
std::string message;
};
// Use a pre-sized variant for storage to avoid double heap allocation. This
// variant can hold all cases of stored data.
using CallbackData = absl::
variant<absl::monostate, DcSctpMessage, Error, StreamReset, StreamID>;
using Callback = void (*)(CallbackData, DcSctpSocketCallbacks&);
void Prepare(); void Prepare();
void TriggerDeferred(); void TriggerDeferred();
DcSctpSocketCallbacks& underlying_; DcSctpSocketCallbacks& underlying_;
bool prepared_ = false; bool prepared_ = false;
std::vector<std::function<void(DcSctpSocketCallbacks& cb)>> deferred_; std::vector<std::pair<Callback, CallbackData>> deferred_;
}; };
} // namespace dcsctp } // namespace dcsctp