mirror of
https://github.com/mollyim/webrtc.git
synced 2025-05-16 07:10:38 +01:00
dcsctp: Refactor CallbackDeferrer
Moving the implementation to a source file. Bug: webrtc:13217 Change-Id: I2929f4af96a9d01d3adfa49b36b021e4b229a025 Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/233241 Reviewed-by: Florent Castelli <orphis@webrtc.org> Commit-Queue: Victor Boivie <boivie@webrtc.org> Cr-Commit-Position: refs/heads/main@{#35116}
This commit is contained in:
parent
43651f502c
commit
0081f1c331
3 changed files with 160 additions and 123 deletions
|
@ -160,6 +160,7 @@ rtc_library("dcsctp_socket") {
|
||||||
"../tx:send_queue",
|
"../tx:send_queue",
|
||||||
]
|
]
|
||||||
sources = [
|
sources = [
|
||||||
|
"callback_deferrer.cc",
|
||||||
"callback_deferrer.h",
|
"callback_deferrer.h",
|
||||||
"dcsctp_socket.cc",
|
"dcsctp_socket.cc",
|
||||||
"dcsctp_socket.h",
|
"dcsctp_socket.h",
|
||||||
|
|
142
net/dcsctp/socket/callback_deferrer.cc
Normal file
142
net/dcsctp/socket/callback_deferrer.cc
Normal file
|
@ -0,0 +1,142 @@
|
||||||
|
/*
|
||||||
|
* Copyright (c) 2021 The WebRTC project authors. All Rights Reserved.
|
||||||
|
*
|
||||||
|
* Use of this source code is governed by a BSD-style license
|
||||||
|
* that can be found in the LICENSE file in the root of the source
|
||||||
|
* tree. An additional intellectual property rights grant can be found
|
||||||
|
* in the file PATENTS. All contributing project authors may
|
||||||
|
* be found in the AUTHORS file in the root of the source tree.
|
||||||
|
*/
|
||||||
|
#include "net/dcsctp/socket/callback_deferrer.h"
|
||||||
|
|
||||||
|
namespace dcsctp {
|
||||||
|
namespace {
|
||||||
|
// A wrapper around the move-only DcSctpMessage, to let it be captured in a
|
||||||
|
// lambda.
|
||||||
|
class MessageDeliverer {
|
||||||
|
public:
|
||||||
|
explicit MessageDeliverer(DcSctpMessage&& message)
|
||||||
|
: state_(rtc::make_ref_counted<State>(std::move(message))) {}
|
||||||
|
|
||||||
|
void Deliver(DcSctpSocketCallbacks& c) {
|
||||||
|
// Really ensure that it's only called once.
|
||||||
|
RTC_DCHECK(!state_->has_delivered);
|
||||||
|
state_->has_delivered = true;
|
||||||
|
c.OnMessageReceived(std::move(state_->message));
|
||||||
|
}
|
||||||
|
|
||||||
|
private:
|
||||||
|
struct State : public rtc::RefCountInterface {
|
||||||
|
explicit State(DcSctpMessage&& m)
|
||||||
|
: has_delivered(false), message(std::move(m)) {}
|
||||||
|
bool has_delivered;
|
||||||
|
DcSctpMessage message;
|
||||||
|
};
|
||||||
|
rtc::scoped_refptr<State> state_;
|
||||||
|
};
|
||||||
|
} // namespace
|
||||||
|
|
||||||
|
void CallbackDeferrer::TriggerDeferred() {
|
||||||
|
// Need to swap here. The client may call into the library from within a
|
||||||
|
// callback, and that might result in adding new callbacks to this instance,
|
||||||
|
// and the vector can't be modified while iterated on.
|
||||||
|
std::vector<std::function<void(DcSctpSocketCallbacks & cb)>> deferred;
|
||||||
|
deferred.swap(deferred_);
|
||||||
|
|
||||||
|
for (auto& cb : deferred) {
|
||||||
|
cb(underlying_);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
SendPacketStatus CallbackDeferrer::SendPacketWithStatus(
|
||||||
|
rtc::ArrayView<const uint8_t> data) {
|
||||||
|
// Will not be deferred - call directly.
|
||||||
|
return underlying_.SendPacketWithStatus(data);
|
||||||
|
}
|
||||||
|
|
||||||
|
std::unique_ptr<Timeout> CallbackDeferrer::CreateTimeout() {
|
||||||
|
// Will not be deferred - call directly.
|
||||||
|
return underlying_.CreateTimeout();
|
||||||
|
}
|
||||||
|
|
||||||
|
TimeMs CallbackDeferrer::TimeMillis() {
|
||||||
|
// Will not be deferred - call directly.
|
||||||
|
return underlying_.TimeMillis();
|
||||||
|
}
|
||||||
|
|
||||||
|
uint32_t CallbackDeferrer::GetRandomInt(uint32_t low, uint32_t high) {
|
||||||
|
// Will not be deferred - call directly.
|
||||||
|
return underlying_.GetRandomInt(low, high);
|
||||||
|
}
|
||||||
|
|
||||||
|
void CallbackDeferrer::OnMessageReceived(DcSctpMessage message) {
|
||||||
|
deferred_.emplace_back(
|
||||||
|
[deliverer = MessageDeliverer(std::move(message))](
|
||||||
|
DcSctpSocketCallbacks& cb) mutable { deliverer.Deliver(cb); });
|
||||||
|
}
|
||||||
|
|
||||||
|
void CallbackDeferrer::OnError(ErrorKind error, absl::string_view message) {
|
||||||
|
deferred_.emplace_back(
|
||||||
|
[error, message = std::string(message)](DcSctpSocketCallbacks& cb) {
|
||||||
|
cb.OnError(error, message);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
void CallbackDeferrer::OnAborted(ErrorKind error, absl::string_view message) {
|
||||||
|
deferred_.emplace_back(
|
||||||
|
[error, message = std::string(message)](DcSctpSocketCallbacks& cb) {
|
||||||
|
cb.OnAborted(error, message);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
void CallbackDeferrer::OnConnected() {
|
||||||
|
deferred_.emplace_back([](DcSctpSocketCallbacks& cb) { cb.OnConnected(); });
|
||||||
|
}
|
||||||
|
|
||||||
|
void CallbackDeferrer::OnClosed() {
|
||||||
|
deferred_.emplace_back([](DcSctpSocketCallbacks& cb) { cb.OnClosed(); });
|
||||||
|
}
|
||||||
|
|
||||||
|
void CallbackDeferrer::OnConnectionRestarted() {
|
||||||
|
deferred_.emplace_back(
|
||||||
|
[](DcSctpSocketCallbacks& cb) { cb.OnConnectionRestarted(); });
|
||||||
|
}
|
||||||
|
|
||||||
|
void CallbackDeferrer::OnStreamsResetFailed(
|
||||||
|
rtc::ArrayView<const StreamID> outgoing_streams,
|
||||||
|
absl::string_view reason) {
|
||||||
|
deferred_.emplace_back(
|
||||||
|
[streams = std::vector<StreamID>(outgoing_streams.begin(),
|
||||||
|
outgoing_streams.end()),
|
||||||
|
reason = std::string(reason)](DcSctpSocketCallbacks& cb) {
|
||||||
|
cb.OnStreamsResetFailed(streams, reason);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
void CallbackDeferrer::OnStreamsResetPerformed(
|
||||||
|
rtc::ArrayView<const StreamID> outgoing_streams) {
|
||||||
|
deferred_.emplace_back(
|
||||||
|
[streams = std::vector<StreamID>(outgoing_streams.begin(),
|
||||||
|
outgoing_streams.end())](
|
||||||
|
DcSctpSocketCallbacks& cb) { cb.OnStreamsResetPerformed(streams); });
|
||||||
|
}
|
||||||
|
|
||||||
|
void CallbackDeferrer::OnIncomingStreamsReset(
|
||||||
|
rtc::ArrayView<const StreamID> incoming_streams) {
|
||||||
|
deferred_.emplace_back(
|
||||||
|
[streams = std::vector<StreamID>(incoming_streams.begin(),
|
||||||
|
incoming_streams.end())](
|
||||||
|
DcSctpSocketCallbacks& cb) { cb.OnIncomingStreamsReset(streams); });
|
||||||
|
}
|
||||||
|
|
||||||
|
void CallbackDeferrer::OnBufferedAmountLow(StreamID stream_id) {
|
||||||
|
deferred_.emplace_back([stream_id](DcSctpSocketCallbacks& cb) {
|
||||||
|
cb.OnBufferedAmountLow(stream_id);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
void CallbackDeferrer::OnTotalBufferedAmountLow() {
|
||||||
|
deferred_.emplace_back(
|
||||||
|
[](DcSctpSocketCallbacks& cb) { cb.OnTotalBufferedAmountLow(); });
|
||||||
|
}
|
||||||
|
} // namespace dcsctp
|
|
@ -47,136 +47,30 @@ class CallbackDeferrer : public DcSctpSocketCallbacks {
|
||||||
explicit CallbackDeferrer(DcSctpSocketCallbacks& underlying)
|
explicit CallbackDeferrer(DcSctpSocketCallbacks& underlying)
|
||||||
: underlying_(underlying) {}
|
: underlying_(underlying) {}
|
||||||
|
|
||||||
void TriggerDeferred() {
|
void TriggerDeferred();
|
||||||
// Need to swap here. The client may call into the library from within a
|
|
||||||
// callback, and that might result in adding new callbacks to this instance,
|
|
||||||
// and the vector can't be modified while iterated on.
|
|
||||||
std::vector<std::function<void(DcSctpSocketCallbacks & cb)>> deferred;
|
|
||||||
deferred.swap(deferred_);
|
|
||||||
|
|
||||||
for (auto& cb : deferred) {
|
|
||||||
cb(underlying_);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
// Implementation of DcSctpSocketCallbacks
|
||||||
SendPacketStatus SendPacketWithStatus(
|
SendPacketStatus SendPacketWithStatus(
|
||||||
rtc::ArrayView<const uint8_t> data) override {
|
rtc::ArrayView<const uint8_t> data) override;
|
||||||
// Will not be deferred - call directly.
|
std::unique_ptr<Timeout> CreateTimeout() override;
|
||||||
return underlying_.SendPacketWithStatus(data);
|
TimeMs TimeMillis() override;
|
||||||
}
|
uint32_t GetRandomInt(uint32_t low, uint32_t high) override;
|
||||||
|
void OnMessageReceived(DcSctpMessage message) override;
|
||||||
std::unique_ptr<Timeout> CreateTimeout() override {
|
void OnError(ErrorKind error, absl::string_view message) override;
|
||||||
// Will not be deferred - call directly.
|
void OnAborted(ErrorKind error, absl::string_view message) override;
|
||||||
return underlying_.CreateTimeout();
|
void OnConnected() override;
|
||||||
}
|
void OnClosed() override;
|
||||||
|
void OnConnectionRestarted() override;
|
||||||
TimeMs TimeMillis() override {
|
|
||||||
// Will not be deferred - call directly.
|
|
||||||
return underlying_.TimeMillis();
|
|
||||||
}
|
|
||||||
|
|
||||||
uint32_t GetRandomInt(uint32_t low, uint32_t high) override {
|
|
||||||
// Will not be deferred - call directly.
|
|
||||||
return underlying_.GetRandomInt(low, high);
|
|
||||||
}
|
|
||||||
|
|
||||||
void OnMessageReceived(DcSctpMessage message) override {
|
|
||||||
deferred_.emplace_back(
|
|
||||||
[deliverer = MessageDeliverer(std::move(message))](
|
|
||||||
DcSctpSocketCallbacks& cb) mutable { deliverer.Deliver(cb); });
|
|
||||||
}
|
|
||||||
|
|
||||||
void OnError(ErrorKind error, absl::string_view message) override {
|
|
||||||
deferred_.emplace_back(
|
|
||||||
[error, message = std::string(message)](DcSctpSocketCallbacks& cb) {
|
|
||||||
cb.OnError(error, message);
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
void OnAborted(ErrorKind error, absl::string_view message) override {
|
|
||||||
deferred_.emplace_back(
|
|
||||||
[error, message = std::string(message)](DcSctpSocketCallbacks& cb) {
|
|
||||||
cb.OnAborted(error, message);
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
void OnConnected() override {
|
|
||||||
deferred_.emplace_back([](DcSctpSocketCallbacks& cb) { cb.OnConnected(); });
|
|
||||||
}
|
|
||||||
|
|
||||||
void OnClosed() override {
|
|
||||||
deferred_.emplace_back([](DcSctpSocketCallbacks& cb) { cb.OnClosed(); });
|
|
||||||
}
|
|
||||||
|
|
||||||
void OnConnectionRestarted() override {
|
|
||||||
deferred_.emplace_back(
|
|
||||||
[](DcSctpSocketCallbacks& cb) { cb.OnConnectionRestarted(); });
|
|
||||||
}
|
|
||||||
|
|
||||||
void OnStreamsResetFailed(rtc::ArrayView<const StreamID> outgoing_streams,
|
void OnStreamsResetFailed(rtc::ArrayView<const StreamID> outgoing_streams,
|
||||||
absl::string_view reason) override {
|
absl::string_view reason) override;
|
||||||
deferred_.emplace_back(
|
|
||||||
[streams = std::vector<StreamID>(outgoing_streams.begin(),
|
|
||||||
outgoing_streams.end()),
|
|
||||||
reason = std::string(reason)](DcSctpSocketCallbacks& cb) {
|
|
||||||
cb.OnStreamsResetFailed(streams, reason);
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
void OnStreamsResetPerformed(
|
void OnStreamsResetPerformed(
|
||||||
rtc::ArrayView<const StreamID> outgoing_streams) override {
|
rtc::ArrayView<const StreamID> outgoing_streams) override;
|
||||||
deferred_.emplace_back(
|
|
||||||
[streams = std::vector<StreamID>(outgoing_streams.begin(),
|
|
||||||
outgoing_streams.end())](
|
|
||||||
DcSctpSocketCallbacks& cb) {
|
|
||||||
cb.OnStreamsResetPerformed(streams);
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
void OnIncomingStreamsReset(
|
void OnIncomingStreamsReset(
|
||||||
rtc::ArrayView<const StreamID> incoming_streams) override {
|
rtc::ArrayView<const StreamID> incoming_streams) override;
|
||||||
deferred_.emplace_back(
|
void OnBufferedAmountLow(StreamID stream_id) override;
|
||||||
[streams = std::vector<StreamID>(incoming_streams.begin(),
|
void OnTotalBufferedAmountLow() override;
|
||||||
incoming_streams.end())](
|
|
||||||
DcSctpSocketCallbacks& cb) { cb.OnIncomingStreamsReset(streams); });
|
|
||||||
}
|
|
||||||
|
|
||||||
void OnBufferedAmountLow(StreamID stream_id) override {
|
|
||||||
deferred_.emplace_back([stream_id](DcSctpSocketCallbacks& cb) {
|
|
||||||
cb.OnBufferedAmountLow(stream_id);
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
void OnTotalBufferedAmountLow() override {
|
|
||||||
deferred_.emplace_back(
|
|
||||||
[](DcSctpSocketCallbacks& cb) { cb.OnTotalBufferedAmountLow(); });
|
|
||||||
}
|
|
||||||
|
|
||||||
private:
|
private:
|
||||||
// A wrapper around the move-only DcSctpMessage, to let it be captured in a
|
|
||||||
// lambda.
|
|
||||||
class MessageDeliverer {
|
|
||||||
public:
|
|
||||||
explicit MessageDeliverer(DcSctpMessage&& message)
|
|
||||||
: state_(rtc::make_ref_counted<State>(std::move(message))) {}
|
|
||||||
|
|
||||||
void Deliver(DcSctpSocketCallbacks& c) {
|
|
||||||
// Really ensure that it's only called once.
|
|
||||||
RTC_DCHECK(!state_->has_delivered);
|
|
||||||
state_->has_delivered = true;
|
|
||||||
c.OnMessageReceived(std::move(state_->message));
|
|
||||||
}
|
|
||||||
|
|
||||||
private:
|
|
||||||
struct State : public rtc::RefCountInterface {
|
|
||||||
explicit State(DcSctpMessage&& m)
|
|
||||||
: has_delivered(false), message(std::move(m)) {}
|
|
||||||
bool has_delivered;
|
|
||||||
DcSctpMessage message;
|
|
||||||
};
|
|
||||||
rtc::scoped_refptr<State> state_;
|
|
||||||
};
|
|
||||||
|
|
||||||
DcSctpSocketCallbacks& underlying_;
|
DcSctpSocketCallbacks& underlying_;
|
||||||
std::vector<std::function<void(DcSctpSocketCallbacks& cb)>> deferred_;
|
std::vector<std::function<void(DcSctpSocketCallbacks& cb)>> deferred_;
|
||||||
};
|
};
|
||||||
|
|
Loading…
Reference in a new issue