VideoRtpReceiver & AudioRtpReceiver threading fixes.

For implementations where the signaling and worker threads are not
the same thread, this significantly cuts down on Thread::Invoke()s that
would block the signaling thread while waiting for the worker thread.

For Audio and Video Rtp receivers, the following methods now do not
block the signaling thread:
* GetParameters
* SetJitterBufferMinimumDelay
* GetSources
* SetFrameDecryptor / GetFrameDecryptor
* SetDepacketizerToDecoderFrameTransformer

Importantly this change also makes the track() accessor accessible
directly from the application thread (bypassing the proxy) since
for receiver objects, the track object is const.

Other changes:

* Remove RefCountedObject inheritance, use make_ref_counted instead.
* Every member variable in the rtp receiver classes is now RTC_GUARDED
* Stop() now fully clears up worker thread state, and Stop() is
  consistently called before destruction. This means that there's one
  thread hop instead of at least 4 before (sometimes more), per receiver.
* OnChanged triggered volume for audio tracks is done asynchronously.
* Deleted most of the JitterBufferDelay implementation. Turns out that
  it was largely unnecessary overhead and complexity.

It seems that these two classes are copy/pasted to a large extent
so further refactoring would be good in the future, as to not have to
fix each issue twice.

Bug: chromium:1184611
Change-Id: I1ba5c3abbd1b0571f7d12850d64004fd2d83e5e2
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/218605
Commit-Queue: Tommi <tommi@webrtc.org>
Reviewed-by: Markus Handell <handellm@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#34022}
This commit is contained in:
Tommi 2021-05-17 14:50:10 +02:00 committed by WebRTC LUCI CQ
parent b27a9f9481
commit 4ccdf932e1
21 changed files with 585 additions and 583 deletions

View file

@ -100,11 +100,13 @@ class RTC_EXPORT RtpReceiverInterface : public rtc::RefCountInterface {
// before it is sent across the network. This will decrypt the entire frame
// using the user provided decryption mechanism regardless of whether SRTP is
// enabled or not.
// TODO(bugs.webrtc.org/12772): Remove.
virtual void SetFrameDecryptor(
rtc::scoped_refptr<FrameDecryptorInterface> frame_decryptor);
// Returns a pointer to the frame decryptor set previously by the
// user. This can be used to update the state of the object.
// TODO(bugs.webrtc.org/12772): Remove.
virtual rtc::scoped_refptr<FrameDecryptorInterface> GetFrameDecryptor() const;
// Sets a frame transformer between the depacketizer and the decoder to enable
@ -120,25 +122,29 @@ class RTC_EXPORT RtpReceiverInterface : public rtc::RefCountInterface {
// Define proxy for RtpReceiverInterface.
// TODO(deadbeef): Move this to .cc file and out of api/. What threads methods
// are called on is an implementation detail.
BEGIN_PRIMARY_PROXY_MAP(RtpReceiver)
BEGIN_PROXY_MAP(RtpReceiver)
PROXY_PRIMARY_THREAD_DESTRUCTOR()
PROXY_CONSTMETHOD0(rtc::scoped_refptr<MediaStreamTrackInterface>, track)
BYPASS_PROXY_CONSTMETHOD0(rtc::scoped_refptr<MediaStreamTrackInterface>, track)
PROXY_CONSTMETHOD0(rtc::scoped_refptr<DtlsTransportInterface>, dtls_transport)
PROXY_CONSTMETHOD0(std::vector<std::string>, stream_ids)
PROXY_CONSTMETHOD0(std::vector<rtc::scoped_refptr<MediaStreamInterface>>,
streams)
BYPASS_PROXY_CONSTMETHOD0(cricket::MediaType, media_type)
BYPASS_PROXY_CONSTMETHOD0(std::string, id)
PROXY_CONSTMETHOD0(RtpParameters, GetParameters)
PROXY_SECONDARY_CONSTMETHOD0(RtpParameters, GetParameters)
PROXY_METHOD1(void, SetObserver, RtpReceiverObserverInterface*)
PROXY_METHOD1(void, SetJitterBufferMinimumDelay, absl::optional<double>)
PROXY_CONSTMETHOD0(std::vector<RtpSource>, GetSources)
PROXY_METHOD1(void,
PROXY_SECONDARY_METHOD1(void,
SetJitterBufferMinimumDelay,
absl::optional<double>)
PROXY_SECONDARY_CONSTMETHOD0(std::vector<RtpSource>, GetSources)
// TODO(bugs.webrtc.org/12772): Remove.
PROXY_SECONDARY_METHOD1(void,
SetFrameDecryptor,
rtc::scoped_refptr<FrameDecryptorInterface>)
PROXY_CONSTMETHOD0(rtc::scoped_refptr<FrameDecryptorInterface>,
// TODO(bugs.webrtc.org/12772): Remove.
PROXY_SECONDARY_CONSTMETHOD0(rtc::scoped_refptr<FrameDecryptorInterface>,
GetFrameDecryptor)
PROXY_METHOD1(void,
PROXY_SECONDARY_METHOD1(void,
SetDepacketizerToDecoderFrameTransformer,
rtc::scoped_refptr<FrameTransformerInterface>)
END_PROXY_MAP()

View file

@ -216,8 +216,6 @@ rtc_library("peerconnection") {
":connection_context",
":dtmf_sender",
":jitter_buffer_delay",
":jitter_buffer_delay_interface",
":jitter_buffer_delay_proxy",
":media_protocol_names",
":media_stream",
":peer_connection_message_handler",
@ -502,8 +500,6 @@ rtc_library("audio_rtp_receiver") {
deps = [
":audio_track",
":jitter_buffer_delay",
":jitter_buffer_delay_interface",
":jitter_buffer_delay_proxy",
":media_stream",
":remote_audio_source",
":rtp_receiver",
@ -520,6 +516,9 @@ rtc_library("audio_rtp_receiver") {
"../rtc_base:checks",
"../rtc_base:refcount",
"../rtc_base:threading",
"../rtc_base/system:no_unique_address",
"../rtc_base/task_utils:pending_task_safety_flag",
"../rtc_base/task_utils:to_queued_task",
]
absl_deps = [
"//third_party/abseil-cpp/absl/algorithm:container",
@ -535,8 +534,6 @@ rtc_library("video_rtp_receiver") {
]
deps = [
":jitter_buffer_delay",
":jitter_buffer_delay_interface",
":jitter_buffer_delay_proxy",
":media_stream",
":rtp_receiver",
":video_rtp_track_source",
@ -556,6 +553,7 @@ rtc_library("video_rtp_receiver") {
"../rtc_base:checks",
"../rtc_base:rtc_base_approved",
"../rtc_base:threading",
"../rtc_base/system:no_unique_address",
]
absl_deps = [
"//third_party/abseil-cpp/absl/algorithm:container",
@ -615,19 +613,6 @@ rtc_library("video_track") {
]
}
rtc_source_set("jitter_buffer_delay_interface") {
sources = [ "jitter_buffer_delay_interface.h" ]
deps = [
"../media:rtc_media_base",
"../rtc_base:refcount",
]
absl_deps = [
"//third_party/abseil-cpp/absl/algorithm:container",
"//third_party/abseil-cpp/absl/strings",
"//third_party/abseil-cpp/absl/types:optional",
]
}
rtc_source_set("sdp_state_provider") {
sources = [ "sdp_state_provider.h" ]
deps = [
@ -636,35 +621,19 @@ rtc_source_set("sdp_state_provider") {
]
}
rtc_source_set("jitter_buffer_delay_proxy") {
sources = [ "jitter_buffer_delay_proxy.h" ]
deps = [
":jitter_buffer_delay_interface",
"../api:libjingle_peerconnection_api",
"../media:rtc_media_base",
]
}
rtc_library("jitter_buffer_delay") {
sources = [
"jitter_buffer_delay.cc",
"jitter_buffer_delay.h",
]
deps = [
":jitter_buffer_delay_interface",
"../api:sequence_checker",
"../media:rtc_media_base",
"../rtc_base",
"../rtc_base:checks",
"../rtc_base:refcount",
"../rtc_base:safe_conversions",
"../rtc_base:safe_minmax",
"../rtc_base:threading",
]
absl_deps = [
"//third_party/abseil-cpp/absl/algorithm:container",
"//third_party/abseil-cpp/absl/strings",
"//third_party/abseil-cpp/absl/types:optional",
"../rtc_base/system:no_unique_address",
]
absl_deps = [ "//third_party/abseil-cpp/absl/types:optional" ]
}
rtc_library("remote_audio_source") {
@ -1024,7 +993,6 @@ if (rtc_include_tests && !build_with_chromium) {
":dtmf_sender",
":integration_test_helpers",
":jitter_buffer_delay",
":jitter_buffer_delay_interface",
":media_stream",
":peerconnection",
":remote_audio_source",
@ -1092,6 +1060,7 @@ if (rtc_include_tests && !build_with_chromium) {
"../test:field_trial",
"../test:fileutils",
"../test:rtp_test_utils",
"../test:test_common",
"../test/pc/sctp:fake_sctp_transport",
"./scenario_tests:pc_scenario_tests",
"//third_party/abseil-cpp/absl/algorithm:container",
@ -1181,7 +1150,6 @@ if (rtc_include_tests && !build_with_chromium) {
":audio_track",
":dtmf_sender",
":jitter_buffer_delay",
":jitter_buffer_delay_interface",
":media_stream",
":pc_test_utils",
":peerconnection",
@ -1291,7 +1259,6 @@ if (rtc_include_tests && !build_with_chromium) {
"test/frame_generator_capturer_video_track_source.h",
"test/mock_channel_interface.h",
"test/mock_data_channel.h",
"test/mock_delayable.h",
"test/mock_peer_connection_observers.h",
"test/mock_rtp_receiver_internal.h",
"test/mock_rtp_sender_internal.h",
@ -1303,7 +1270,6 @@ if (rtc_include_tests && !build_with_chromium) {
deps = [
":jitter_buffer_delay",
":jitter_buffer_delay_interface",
":libjingle_peerconnection",
":peerconnection",
":rtc_pc_base",

View file

@ -18,11 +18,10 @@
#include "api/media_stream_track_proxy.h"
#include "api/sequence_checker.h"
#include "pc/audio_track.h"
#include "pc/jitter_buffer_delay.h"
#include "pc/jitter_buffer_delay_proxy.h"
#include "rtc_base/checks.h"
#include "rtc_base/location.h"
#include "rtc_base/logging.h"
#include "rtc_base/task_utils/to_queued_task.h"
namespace webrtc {
@ -52,10 +51,7 @@ AudioRtpReceiver::AudioRtpReceiver(
AudioTrack::Create(receiver_id, source_))),
cached_track_enabled_(track_->enabled()),
attachment_id_(GenerateUniqueId()),
delay_(JitterBufferDelayProxy::Create(
rtc::Thread::Current(),
worker_thread_,
rtc::make_ref_counted<JitterBufferDelay>(worker_thread))) {
worker_thread_safety_(PendingTaskSafetyFlag::CreateDetachedInactive()) {
RTC_DCHECK(worker_thread_);
RTC_DCHECK(track_->GetSource()->remote());
track_->RegisterObserver(this);
@ -64,140 +60,188 @@ AudioRtpReceiver::AudioRtpReceiver(
}
AudioRtpReceiver::~AudioRtpReceiver() {
RTC_DCHECK_RUN_ON(&signaling_thread_checker_);
RTC_DCHECK(stopped_);
RTC_DCHECK(!media_channel_);
track_->GetSource()->UnregisterAudioObserver(this);
track_->UnregisterObserver(this);
Stop();
}
void AudioRtpReceiver::OnChanged() {
RTC_DCHECK_RUN_ON(&signaling_thread_checker_);
if (cached_track_enabled_ != track_->enabled()) {
cached_track_enabled_ = track_->enabled();
Reconfigure();
worker_thread_->PostTask(ToQueuedTask(
worker_thread_safety_,
[this, enabled = cached_track_enabled_, volume = cached_volume_]() {
RTC_DCHECK_RUN_ON(worker_thread_);
Reconfigure(enabled, volume);
}));
}
}
bool AudioRtpReceiver::SetOutputVolume(double volume) {
// RTC_RUN_ON(worker_thread_)
void AudioRtpReceiver::SetOutputVolume_w(double volume) {
RTC_DCHECK_GE(volume, 0.0);
RTC_DCHECK_LE(volume, 10.0);
RTC_DCHECK(media_channel_);
RTC_DCHECK(!stopped_);
return worker_thread_->Invoke<bool>(RTC_FROM_HERE, [&] {
return ssrc_ ? media_channel_->SetOutputVolume(*ssrc_, volume)
ssrc_ ? media_channel_->SetOutputVolume(*ssrc_, volume)
: media_channel_->SetDefaultOutputVolume(volume);
});
}
void AudioRtpReceiver::OnSetVolume(double volume) {
RTC_DCHECK_RUN_ON(&signaling_thread_checker_);
RTC_DCHECK_GE(volume, 0);
RTC_DCHECK_LE(volume, 10);
cached_volume_ = volume;
if (!media_channel_ || stopped_) {
RTC_LOG(LS_ERROR)
<< "AudioRtpReceiver::OnSetVolume: No audio channel exists.";
if (stopped_)
return;
}
cached_volume_ = volume;
// When the track is disabled, the volume of the source, which is the
// corresponding WebRtc Voice Engine channel will be 0. So we do not allow
// setting the volume to the source when the track is disabled.
if (!stopped_ && track_->enabled()) {
if (!SetOutputVolume(cached_volume_)) {
RTC_NOTREACHED();
}
if (track_->enabled()) {
worker_thread_->PostTask(
ToQueuedTask(worker_thread_safety_, [this, volume = cached_volume_]() {
RTC_DCHECK_RUN_ON(worker_thread_);
SetOutputVolume_w(volume);
}));
}
}
rtc::scoped_refptr<DtlsTransportInterface> AudioRtpReceiver::dtls_transport()
const {
RTC_DCHECK_RUN_ON(&signaling_thread_checker_);
return dtls_transport_;
}
std::vector<std::string> AudioRtpReceiver::stream_ids() const {
RTC_DCHECK_RUN_ON(&signaling_thread_checker_);
std::vector<std::string> stream_ids(streams_.size());
for (size_t i = 0; i < streams_.size(); ++i)
stream_ids[i] = streams_[i]->id();
return stream_ids;
}
std::vector<rtc::scoped_refptr<MediaStreamInterface>>
AudioRtpReceiver::streams() const {
RTC_DCHECK_RUN_ON(&signaling_thread_checker_);
return streams_;
}
RtpParameters AudioRtpReceiver::GetParameters() const {
if (!media_channel_ || stopped_) {
RTC_DCHECK_RUN_ON(worker_thread_);
if (!media_channel_)
return RtpParameters();
}
return worker_thread_->Invoke<RtpParameters>(RTC_FROM_HERE, [&] {
return ssrc_ ? media_channel_->GetRtpReceiveParameters(*ssrc_)
: media_channel_->GetDefaultRtpReceiveParameters();
});
}
void AudioRtpReceiver::SetFrameDecryptor(
rtc::scoped_refptr<FrameDecryptorInterface> frame_decryptor) {
RTC_DCHECK_RUN_ON(worker_thread_);
frame_decryptor_ = std::move(frame_decryptor);
// Special Case: Set the frame decryptor to any value on any existing channel.
if (media_channel_ && ssrc_.has_value() && !stopped_) {
worker_thread_->Invoke<void>(RTC_FROM_HERE, [&] {
if (media_channel_ && ssrc_) {
media_channel_->SetFrameDecryptor(*ssrc_, frame_decryptor_);
});
}
}
rtc::scoped_refptr<FrameDecryptorInterface>
AudioRtpReceiver::GetFrameDecryptor() const {
RTC_DCHECK_RUN_ON(worker_thread_);
return frame_decryptor_;
}
void AudioRtpReceiver::Stop() {
RTC_DCHECK_RUN_ON(&signaling_thread_checker_);
// TODO(deadbeef): Need to do more here to fully stop receiving packets.
if (stopped_) {
return;
}
if (!stopped_) {
source_->SetState(MediaSourceInterface::kEnded);
if (media_channel_) {
// Allow that SetOutputVolume fail. This is the normal case when the
// underlying media channel has already been deleted.
SetOutputVolume(0.0);
}
stopped_ = true;
}
worker_thread_->Invoke<void>(RTC_FROM_HERE, [&]() {
RTC_DCHECK_RUN_ON(worker_thread_);
if (media_channel_)
SetOutputVolume_w(0.0);
SetMediaChannel_w(nullptr);
});
}
void AudioRtpReceiver::StopAndEndTrack() {
RTC_DCHECK_RUN_ON(&signaling_thread_checker_);
Stop();
track_->internal()->set_ended();
}
void AudioRtpReceiver::RestartMediaChannel(absl::optional<uint32_t> ssrc) {
RTC_DCHECK(media_channel_);
if (!stopped_ && ssrc_ == ssrc) {
return;
RTC_DCHECK_RUN_ON(&signaling_thread_checker_);
bool ok = worker_thread_->Invoke<bool>(
RTC_FROM_HERE, [&, enabled = cached_track_enabled_,
volume = cached_volume_, was_stopped = stopped_]() {
RTC_DCHECK_RUN_ON(worker_thread_);
if (!media_channel_) {
RTC_DCHECK(was_stopped);
return false; // Can't restart.
}
if (!stopped_) {
source_->Stop(media_channel_, ssrc_);
delay_->OnStop();
if (!was_stopped && ssrc_ == ssrc) {
// Already running with that ssrc.
RTC_DCHECK(worker_thread_safety_->alive());
return true;
}
ssrc_ = ssrc;
if (!was_stopped) {
source_->Stop(media_channel_, ssrc_);
}
ssrc_ = std::move(ssrc);
source_->Start(media_channel_, ssrc_);
if (ssrc_) {
media_channel_->SetBaseMinimumPlayoutDelayMs(*ssrc_, delay_.GetMs());
}
Reconfigure(enabled, volume);
return true;
});
if (!ok)
return;
stopped_ = false;
source_->Start(media_channel_, ssrc);
delay_->OnStart(media_channel_, ssrc.value_or(0));
Reconfigure();
}
void AudioRtpReceiver::SetupMediaChannel(uint32_t ssrc) {
if (!media_channel_) {
RTC_LOG(LS_ERROR)
<< "AudioRtpReceiver::SetupMediaChannel: No audio channel exists.";
return;
}
RTC_DCHECK_RUN_ON(&signaling_thread_checker_);
RestartMediaChannel(ssrc);
}
void AudioRtpReceiver::SetupUnsignaledMediaChannel() {
if (!media_channel_) {
RTC_LOG(LS_ERROR) << "AudioRtpReceiver::SetupUnsignaledMediaChannel: No "
"audio channel exists.";
}
RTC_DCHECK_RUN_ON(&signaling_thread_checker_);
RestartMediaChannel(absl::nullopt);
}
uint32_t AudioRtpReceiver::ssrc() const {
RTC_DCHECK_RUN_ON(worker_thread_);
return ssrc_.value_or(0);
}
void AudioRtpReceiver::set_stream_ids(std::vector<std::string> stream_ids) {
RTC_DCHECK_RUN_ON(&signaling_thread_checker_);
SetStreams(CreateStreamsFromIds(std::move(stream_ids)));
}
void AudioRtpReceiver::set_transport(
rtc::scoped_refptr<DtlsTransportInterface> dtls_transport) {
RTC_DCHECK_RUN_ON(&signaling_thread_checker_);
dtls_transport_ = std::move(dtls_transport);
}
void AudioRtpReceiver::SetStreams(
const std::vector<rtc::scoped_refptr<MediaStreamInterface>>& streams) {
RTC_DCHECK_RUN_ON(&signaling_thread_checker_);
// Remove remote track from any streams that are going away.
for (const auto& existing_stream : streams_) {
bool removed = true;
@ -230,51 +274,42 @@ void AudioRtpReceiver::SetStreams(
}
std::vector<RtpSource> AudioRtpReceiver::GetSources() const {
if (!media_channel_ || !ssrc_ || stopped_) {
RTC_DCHECK_RUN_ON(worker_thread_);
if (!media_channel_ || !ssrc_) {
return {};
}
return worker_thread_->Invoke<std::vector<RtpSource>>(
RTC_FROM_HERE, [&] { return media_channel_->GetSources(*ssrc_); });
return media_channel_->GetSources(*ssrc_);
}
void AudioRtpReceiver::SetDepacketizerToDecoderFrameTransformer(
rtc::scoped_refptr<webrtc::FrameTransformerInterface> frame_transformer) {
worker_thread_->Invoke<void>(
RTC_FROM_HERE, [this, frame_transformer = std::move(frame_transformer)] {
RTC_DCHECK_RUN_ON(worker_thread_);
frame_transformer_ = frame_transformer;
if (media_channel_ && ssrc_.has_value() && !stopped_) {
media_channel_->SetDepacketizerToDecoderFrameTransformer(
*ssrc_, frame_transformer);
if (media_channel_) {
media_channel_->SetDepacketizerToDecoderFrameTransformer(ssrc_.value_or(0),
frame_transformer);
}
});
frame_transformer_ = std::move(frame_transformer);
}
void AudioRtpReceiver::Reconfigure() {
if (!media_channel_ || stopped_) {
RTC_LOG(LS_ERROR)
<< "AudioRtpReceiver::Reconfigure: No audio channel exists.";
return;
}
if (!SetOutputVolume(track_->enabled() ? cached_volume_ : 0)) {
RTC_NOTREACHED();
}
// Reattach the frame decryptor if we were reconfigured.
MaybeAttachFrameDecryptorToMediaChannel(
ssrc_, worker_thread_, frame_decryptor_, media_channel_, stopped_);
// RTC_RUN_ON(worker_thread_)
void AudioRtpReceiver::Reconfigure(bool track_enabled, double volume) {
RTC_DCHECK(media_channel_);
if (media_channel_ && ssrc_.has_value() && !stopped_) {
worker_thread_->Invoke<void>(RTC_FROM_HERE, [this] {
RTC_DCHECK_RUN_ON(worker_thread_);
if (!frame_transformer_)
return;
SetOutputVolume_w(track_enabled ? volume : 0);
if (ssrc_ && frame_decryptor_) {
// Reattach the frame decryptor if we were reconfigured.
media_channel_->SetFrameDecryptor(*ssrc_, frame_decryptor_);
}
if (frame_transformer_) {
media_channel_->SetDepacketizerToDecoderFrameTransformer(
*ssrc_, frame_transformer_);
});
ssrc_.value_or(0), frame_transformer_);
}
}
void AudioRtpReceiver::SetObserver(RtpReceiverObserverInterface* observer) {
RTC_DCHECK_RUN_ON(&signaling_thread_checker_);
observer_ = observer;
// Deliver any notifications the observer may have missed by being set late.
if (received_first_packet_ && observer_) {
@ -284,16 +319,35 @@ void AudioRtpReceiver::SetObserver(RtpReceiverObserverInterface* observer) {
void AudioRtpReceiver::SetJitterBufferMinimumDelay(
absl::optional<double> delay_seconds) {
delay_->Set(delay_seconds);
RTC_DCHECK_RUN_ON(worker_thread_);
delay_.Set(delay_seconds);
if (media_channel_ && ssrc_)
media_channel_->SetBaseMinimumPlayoutDelayMs(*ssrc_, delay_.GetMs());
}
void AudioRtpReceiver::SetMediaChannel(cricket::MediaChannel* media_channel) {
RTC_DCHECK_RUN_ON(&signaling_thread_checker_);
RTC_DCHECK(media_channel == nullptr ||
media_channel->media_type() == media_type());
if (stopped_ && !media_channel)
return;
worker_thread_->Invoke<void>(RTC_FROM_HERE, [&] {
RTC_DCHECK_RUN_ON(worker_thread_);
SetMediaChannel_w(media_channel);
});
}
// RTC_RUN_ON(worker_thread_)
void AudioRtpReceiver::SetMediaChannel_w(cricket::MediaChannel* media_channel) {
media_channel ? worker_thread_safety_->SetAlive()
: worker_thread_safety_->SetNotAlive();
media_channel_ = static_cast<cricket::VoiceMediaChannel*>(media_channel);
}
void AudioRtpReceiver::NotifyFirstPacketReceived() {
RTC_DCHECK_RUN_ON(&signaling_thread_checker_);
if (observer_) {
observer_->OnFirstPacketReceived(media_type());
}

View file

@ -12,6 +12,7 @@
#define PC_AUDIO_RTP_RECEIVER_H_
#include <stdint.h>
#include <string>
#include <vector>
@ -25,13 +26,16 @@
#include "api/rtp_parameters.h"
#include "api/rtp_receiver_interface.h"
#include "api/scoped_refptr.h"
#include "api/sequence_checker.h"
#include "api/transport/rtp/rtp_source.h"
#include "media/base/media_channel.h"
#include "pc/audio_track.h"
#include "pc/jitter_buffer_delay_interface.h"
#include "pc/jitter_buffer_delay.h"
#include "pc/remote_audio_source.h"
#include "pc/rtp_receiver.h"
#include "rtc_base/ref_counted_object.h"
#include "rtc_base/system/no_unique_address.h"
#include "rtc_base/task_utils/pending_task_safety_flag.h"
#include "rtc_base/thread.h"
#include "rtc_base/thread_annotations.h"
@ -39,7 +43,7 @@ namespace webrtc {
class AudioRtpReceiver : public ObserverInterface,
public AudioSourceInterface::AudioObserver,
public rtc::RefCountedObject<RtpReceiverInternal> {
public RtpReceiverInternal {
public:
AudioRtpReceiver(rtc::Thread* worker_thread,
std::string receiver_id,
@ -59,22 +63,16 @@ class AudioRtpReceiver : public ObserverInterface,
// AudioSourceInterface::AudioObserver implementation
void OnSetVolume(double volume) override;
rtc::scoped_refptr<AudioTrackInterface> audio_track() const {
return track_.get();
}
rtc::scoped_refptr<AudioTrackInterface> audio_track() const { return track_; }
// RtpReceiverInterface implementation
rtc::scoped_refptr<MediaStreamTrackInterface> track() const override {
return track_.get();
}
rtc::scoped_refptr<DtlsTransportInterface> dtls_transport() const override {
return dtls_transport_;
return track_;
}
rtc::scoped_refptr<DtlsTransportInterface> dtls_transport() const override;
std::vector<std::string> stream_ids() const override;
std::vector<rtc::scoped_refptr<MediaStreamInterface>> streams()
const override {
return streams_;
}
const override;
cricket::MediaType media_type() const override {
return cricket::MEDIA_TYPE_AUDIO;
@ -95,13 +93,11 @@ class AudioRtpReceiver : public ObserverInterface,
void StopAndEndTrack() override;
void SetupMediaChannel(uint32_t ssrc) override;
void SetupUnsignaledMediaChannel() override;
uint32_t ssrc() const override { return ssrc_.value_or(0); }
uint32_t ssrc() const override;
void NotifyFirstPacketReceived() override;
void set_stream_ids(std::vector<std::string> stream_ids) override;
void set_transport(
rtc::scoped_refptr<DtlsTransportInterface> dtls_transport) override {
dtls_transport_ = dtls_transport;
}
rtc::scoped_refptr<DtlsTransportInterface> dtls_transport) override;
void SetStreams(const std::vector<rtc::scoped_refptr<MediaStreamInterface>>&
streams) override;
void SetObserver(RtpReceiverObserverInterface* observer) override;
@ -119,29 +115,40 @@ class AudioRtpReceiver : public ObserverInterface,
private:
void RestartMediaChannel(absl::optional<uint32_t> ssrc);
void Reconfigure();
bool SetOutputVolume(double volume);
void Reconfigure(bool track_enabled, double volume)
RTC_RUN_ON(worker_thread_);
void SetOutputVolume_w(double volume) RTC_RUN_ON(worker_thread_);
void SetMediaChannel_w(cricket::MediaChannel* media_channel)
RTC_RUN_ON(worker_thread_);
RTC_NO_UNIQUE_ADDRESS SequenceChecker signaling_thread_checker_;
rtc::Thread* const worker_thread_;
const std::string id_;
const rtc::scoped_refptr<RemoteAudioSource> source_;
const rtc::scoped_refptr<AudioTrackProxyWithInternal<AudioTrack>> track_;
cricket::VoiceMediaChannel* media_channel_ = nullptr;
absl::optional<uint32_t> ssrc_;
std::vector<rtc::scoped_refptr<MediaStreamInterface>> streams_;
bool cached_track_enabled_;
double cached_volume_ = 1;
bool stopped_ = true;
RtpReceiverObserverInterface* observer_ = nullptr;
bool received_first_packet_ = false;
int attachment_id_ = 0;
rtc::scoped_refptr<FrameDecryptorInterface> frame_decryptor_;
rtc::scoped_refptr<DtlsTransportInterface> dtls_transport_;
// Allows to thread safely change playout delay. Handles caching cases if
cricket::VoiceMediaChannel* media_channel_ RTC_GUARDED_BY(worker_thread_) =
nullptr;
absl::optional<uint32_t> ssrc_ RTC_GUARDED_BY(worker_thread_);
std::vector<rtc::scoped_refptr<MediaStreamInterface>> streams_
RTC_GUARDED_BY(&signaling_thread_checker_);
bool cached_track_enabled_ RTC_GUARDED_BY(&signaling_thread_checker_);
double cached_volume_ RTC_GUARDED_BY(&signaling_thread_checker_) = 1.0;
bool stopped_ RTC_GUARDED_BY(&signaling_thread_checker_) = true;
RtpReceiverObserverInterface* observer_
RTC_GUARDED_BY(&signaling_thread_checker_) = nullptr;
bool received_first_packet_ RTC_GUARDED_BY(&signaling_thread_checker_) =
false;
const int attachment_id_;
rtc::scoped_refptr<FrameDecryptorInterface> frame_decryptor_
RTC_GUARDED_BY(worker_thread_);
rtc::scoped_refptr<DtlsTransportInterface> dtls_transport_
RTC_GUARDED_BY(&signaling_thread_checker_);
// Stores and updates the playout delay. Handles caching cases if
// |SetJitterBufferMinimumDelay| is called before start.
rtc::scoped_refptr<JitterBufferDelayInterface> delay_;
JitterBufferDelay delay_ RTC_GUARDED_BY(worker_thread_);
rtc::scoped_refptr<FrameTransformerInterface> frame_transformer_
RTC_GUARDED_BY(worker_thread_);
const rtc::scoped_refptr<PendingTaskSafetyFlag> worker_thread_safety_;
};
} // namespace webrtc

View file

@ -14,7 +14,6 @@
#include "rtc_base/checks.h"
#include "rtc_base/numerics/safe_conversions.h"
#include "rtc_base/numerics/safe_minmax.h"
#include "rtc_base/thread.h"
namespace {
constexpr int kDefaultDelay = 0;
@ -23,43 +22,21 @@ constexpr int kMaximumDelayMs = 10000;
namespace webrtc {
JitterBufferDelay::JitterBufferDelay(rtc::Thread* worker_thread)
: signaling_thread_(rtc::Thread::Current()), worker_thread_(worker_thread) {
RTC_DCHECK(worker_thread_);
}
void JitterBufferDelay::OnStart(cricket::Delayable* media_channel,
uint32_t ssrc) {
RTC_DCHECK_RUN_ON(signaling_thread_);
media_channel_ = media_channel;
ssrc_ = ssrc;
// Trying to apply cached delay for the audio stream.
if (cached_delay_seconds_) {
Set(cached_delay_seconds_.value());
}
}
void JitterBufferDelay::OnStop() {
RTC_DCHECK_RUN_ON(signaling_thread_);
// Assume that audio stream is no longer present.
media_channel_ = nullptr;
ssrc_ = absl::nullopt;
JitterBufferDelay::JitterBufferDelay() {
worker_thread_checker_.Detach();
}
void JitterBufferDelay::Set(absl::optional<double> delay_seconds) {
RTC_DCHECK_RUN_ON(worker_thread_);
// TODO(kuddai) propagate absl::optional deeper down as default preference.
int delay_ms =
rtc::saturated_cast<int>(delay_seconds.value_or(kDefaultDelay) * 1000);
delay_ms = rtc::SafeClamp(delay_ms, 0, kMaximumDelayMs);
RTC_DCHECK_RUN_ON(&worker_thread_checker_);
cached_delay_seconds_ = delay_seconds;
if (media_channel_ && ssrc_) {
media_channel_->SetBaseMinimumPlayoutDelayMs(ssrc_.value(), delay_ms);
}
}
int JitterBufferDelay::GetMs() const {
RTC_DCHECK_RUN_ON(&worker_thread_checker_);
return rtc::SafeClamp(
rtc::saturated_cast<int>(cached_delay_seconds_.value_or(kDefaultDelay) *
1000),
0, kMaximumDelayMs);
}
} // namespace webrtc

View file

@ -14,36 +14,25 @@
#include <stdint.h>
#include "absl/types/optional.h"
#include "media/base/delayable.h"
#include "pc/jitter_buffer_delay_interface.h"
#include "rtc_base/thread.h"
#include "api/sequence_checker.h"
#include "rtc_base/system/no_unique_address.h"
namespace webrtc {
// JitterBufferDelay converts delay from seconds to milliseconds for the
// underlying media channel. It also handles cases when user sets delay before
// the start of media_channel by caching its request. Note, this class is not
// thread safe. Its thread safe version is defined in
// pc/jitter_buffer_delay_proxy.h
class JitterBufferDelay : public JitterBufferDelayInterface {
// the start of media_channel by caching its request.
class JitterBufferDelay {
public:
// Must be called on signaling thread.
explicit JitterBufferDelay(rtc::Thread* worker_thread);
JitterBufferDelay();
void OnStart(cricket::Delayable* media_channel, uint32_t ssrc) override;
void OnStop() override;
void Set(absl::optional<double> delay_seconds) override;
void Set(absl::optional<double> delay_seconds);
int GetMs() const;
private:
// Throughout webrtc source, sometimes it is also called as |main_thread_|.
rtc::Thread* const signaling_thread_;
rtc::Thread* const worker_thread_;
// Media channel and ssrc together uniqely identify audio stream.
cricket::Delayable* media_channel_ = nullptr;
absl::optional<uint32_t> ssrc_;
absl::optional<double> cached_delay_seconds_;
RTC_NO_UNIQUE_ADDRESS SequenceChecker worker_thread_checker_;
absl::optional<double> cached_delay_seconds_
RTC_GUARDED_BY(&worker_thread_checker_);
};
} // namespace webrtc

View file

@ -1,39 +0,0 @@
/*
* Copyright 2019 The WebRTC project authors. All Rights Reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#ifndef PC_JITTER_BUFFER_DELAY_INTERFACE_H_
#define PC_JITTER_BUFFER_DELAY_INTERFACE_H_
#include <stdint.h>
#include "absl/types/optional.h"
#include "media/base/delayable.h"
#include "rtc_base/ref_count.h"
namespace webrtc {
// JitterBufferDelay delivers user's queries to the underlying media channel. It
// can describe either video or audio delay for receiving stream. "Interface"
// suffix in the interface name is required to be compatible with api/proxy.cc
class JitterBufferDelayInterface : public rtc::RefCountInterface {
public:
// OnStart allows to uniqely identify to which receiving stream playout
// delay must correpond through |media_channel| and |ssrc| pair.
virtual void OnStart(cricket::Delayable* media_channel, uint32_t ssrc) = 0;
// Indicates that underlying receiving stream is stopped.
virtual void OnStop() = 0;
virtual void Set(absl::optional<double> delay_seconds) = 0;
};
} // namespace webrtc
#endif // PC_JITTER_BUFFER_DELAY_INTERFACE_H_

View file

@ -1,31 +0,0 @@
/*
* Copyright 2019 The WebRTC project authors. All Rights Reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#ifndef PC_JITTER_BUFFER_DELAY_PROXY_H_
#define PC_JITTER_BUFFER_DELAY_PROXY_H_
#include <stdint.h>
#include "api/proxy.h"
#include "media/base/delayable.h"
#include "pc/jitter_buffer_delay_interface.h"
namespace webrtc {
BEGIN_PROXY_MAP(JitterBufferDelay)
PROXY_PRIMARY_THREAD_DESTRUCTOR()
PROXY_METHOD2(void, OnStart, cricket::Delayable*, uint32_t)
PROXY_METHOD0(void, OnStop)
PROXY_SECONDARY_METHOD1(void, Set, absl::optional<double>)
END_PROXY_MAP()
} // namespace webrtc
#endif // PC_JITTER_BUFFER_DELAY_PROXY_H_

View file

@ -13,79 +13,47 @@
#include <stdint.h>
#include "absl/types/optional.h"
#include "api/scoped_refptr.h"
#include "pc/test/mock_delayable.h"
#include "rtc_base/ref_counted_object.h"
#include "rtc_base/thread.h"
#include "test/gmock.h"
#include "test/gtest.h"
using ::testing::Return;
namespace {
constexpr int kSsrc = 1234;
} // namespace
namespace webrtc {
class JitterBufferDelayTest : public ::testing::Test {
public:
JitterBufferDelayTest()
: delay_(
rtc::make_ref_counted<JitterBufferDelay>(rtc::Thread::Current())) {}
JitterBufferDelayTest() {}
protected:
rtc::scoped_refptr<JitterBufferDelayInterface> delay_;
MockDelayable delayable_;
JitterBufferDelay delay_;
};
TEST_F(JitterBufferDelayTest, Set) {
delay_->OnStart(&delayable_, kSsrc);
EXPECT_CALL(delayable_, SetBaseMinimumPlayoutDelayMs(kSsrc, 3000))
.WillOnce(Return(true));
// Delay in seconds.
delay_->Set(3.0);
delay_.Set(3.0);
EXPECT_EQ(delay_.GetMs(), 3000);
}
TEST_F(JitterBufferDelayTest, Caching) {
// Check that value is cached before start.
delay_->Set(4.0);
// Check that cached value applied on the start.
EXPECT_CALL(delayable_, SetBaseMinimumPlayoutDelayMs(kSsrc, 4000))
.WillOnce(Return(true));
delay_->OnStart(&delayable_, kSsrc);
TEST_F(JitterBufferDelayTest, DefaultValue) {
EXPECT_EQ(delay_.GetMs(), 0); // Default value is 0ms.
}
TEST_F(JitterBufferDelayTest, Clamping) {
delay_->OnStart(&delayable_, kSsrc);
// In current Jitter Buffer implementation (Audio or Video) maximum supported
// value is 10000 milliseconds.
EXPECT_CALL(delayable_, SetBaseMinimumPlayoutDelayMs(kSsrc, 10000))
.WillOnce(Return(true));
delay_->Set(10.5);
delay_.Set(10.5);
EXPECT_EQ(delay_.GetMs(), 10000);
// Test int overflow.
EXPECT_CALL(delayable_, SetBaseMinimumPlayoutDelayMs(kSsrc, 10000))
.WillOnce(Return(true));
delay_->Set(21474836470.0);
delay_.Set(21474836470.0);
EXPECT_EQ(delay_.GetMs(), 10000);
EXPECT_CALL(delayable_, SetBaseMinimumPlayoutDelayMs(kSsrc, 0))
.WillOnce(Return(true));
delay_->Set(-21474836470.0);
delay_.Set(-21474836470.0);
EXPECT_EQ(delay_.GetMs(), 0);
// Boundary value in seconds to milliseconds conversion.
EXPECT_CALL(delayable_, SetBaseMinimumPlayoutDelayMs(kSsrc, 0))
.WillOnce(Return(true));
delay_->Set(0.0009);
delay_.Set(0.0009);
EXPECT_EQ(delay_.GetMs(), 0);
EXPECT_CALL(delayable_, SetBaseMinimumPlayoutDelayMs(kSsrc, 0))
.WillOnce(Return(true));
delay_->Set(-2.0);
delay_.Set(-2.0);
EXPECT_EQ(delay_.GetMs(), 0);
}
} // namespace webrtc

View file

@ -61,7 +61,7 @@ RemoteAudioSource::RemoteAudioSource(
}
RemoteAudioSource::~RemoteAudioSource() {
RTC_DCHECK(main_thread_->IsCurrent());
RTC_DCHECK_RUN_ON(main_thread_);
RTC_DCHECK(audio_observers_.empty());
if (!sinks_.empty()) {
RTC_LOG(LS_WARNING)
@ -71,32 +71,28 @@ RemoteAudioSource::~RemoteAudioSource() {
void RemoteAudioSource::Start(cricket::VoiceMediaChannel* media_channel,
absl::optional<uint32_t> ssrc) {
RTC_DCHECK_RUN_ON(main_thread_);
RTC_DCHECK(media_channel);
RTC_DCHECK_RUN_ON(worker_thread_);
// Register for callbacks immediately before AddSink so that we always get
// notified when a channel goes out of scope (signaled when "AudioDataProxy"
// is destroyed).
worker_thread_->Invoke<void>(RTC_FROM_HERE, [&] {
ssrc ? media_channel->SetRawAudioSink(
*ssrc, std::make_unique<AudioDataProxy>(this))
RTC_DCHECK(media_channel);
ssrc ? media_channel->SetRawAudioSink(*ssrc,
std::make_unique<AudioDataProxy>(this))
: media_channel->SetDefaultRawAudioSink(
std::make_unique<AudioDataProxy>(this));
});
}
void RemoteAudioSource::Stop(cricket::VoiceMediaChannel* media_channel,
absl::optional<uint32_t> ssrc) {
RTC_DCHECK_RUN_ON(main_thread_);
RTC_DCHECK_RUN_ON(worker_thread_);
RTC_DCHECK(media_channel);
worker_thread_->Invoke<void>(RTC_FROM_HERE, [&] {
ssrc ? media_channel->SetRawAudioSink(*ssrc, nullptr)
: media_channel->SetDefaultRawAudioSink(nullptr);
});
}
void RemoteAudioSource::SetState(SourceState new_state) {
RTC_DCHECK_RUN_ON(main_thread_);
if (state_ != new_state) {
state_ = new_state;
FireOnChanged();
@ -104,12 +100,12 @@ void RemoteAudioSource::SetState(SourceState new_state) {
}
MediaSourceInterface::SourceState RemoteAudioSource::state() const {
RTC_DCHECK(main_thread_->IsCurrent());
RTC_DCHECK_RUN_ON(main_thread_);
return state_;
}
bool RemoteAudioSource::remote() const {
RTC_DCHECK(main_thread_->IsCurrent());
RTC_DCHECK_RUN_ON(main_thread_);
return true;
}
@ -135,7 +131,7 @@ void RemoteAudioSource::UnregisterAudioObserver(AudioObserver* observer) {
}
void RemoteAudioSource::AddSink(AudioTrackSinkInterface* sink) {
RTC_DCHECK(main_thread_->IsCurrent());
RTC_DCHECK_RUN_ON(main_thread_);
RTC_DCHECK(sink);
if (state_ != MediaSourceInterface::kLive) {
@ -149,7 +145,7 @@ void RemoteAudioSource::AddSink(AudioTrackSinkInterface* sink) {
}
void RemoteAudioSource::RemoveSink(AudioTrackSinkInterface* sink) {
RTC_DCHECK(main_thread_->IsCurrent());
RTC_DCHECK_RUN_ON(main_thread_);
RTC_DCHECK(sink);
MutexLock lock(&sink_lock_);
@ -184,7 +180,7 @@ void RemoteAudioSource::OnAudioChannelGone() {
}
void RemoteAudioSource::OnMessage(rtc::Message* msg) {
RTC_DCHECK(main_thread_->IsCurrent());
RTC_DCHECK_RUN_ON(main_thread_);
sinks_.clear();
SetState(MediaSourceInterface::kEnded);
// Will possibly delete this RemoteAudioSource since it is reference counted

View file

@ -39,20 +39,4 @@ RtpReceiverInternal::CreateStreamsFromIds(std::vector<std::string> stream_ids) {
return streams;
}
// Attempt to attach the frame decryptor to the current media channel on the
// correct worker thread only if both the media channel exists and a ssrc has
// been allocated to the stream.
void RtpReceiverInternal::MaybeAttachFrameDecryptorToMediaChannel(
const absl::optional<uint32_t>& ssrc,
rtc::Thread* worker_thread,
rtc::scoped_refptr<webrtc::FrameDecryptorInterface> frame_decryptor,
cricket::MediaChannel* media_channel,
bool stopped) {
if (media_channel && frame_decryptor && ssrc.has_value() && !stopped) {
worker_thread->Invoke<void>(RTC_FROM_HERE, [&] {
media_channel->SetFrameDecryptor(*ssrc, frame_decryptor);
});
}
}
} // namespace webrtc

View file

@ -92,13 +92,6 @@ class RtpReceiverInternal : public RtpReceiverInterface {
static std::vector<rtc::scoped_refptr<MediaStreamInterface>>
CreateStreamsFromIds(std::vector<std::string> stream_ids);
static void MaybeAttachFrameDecryptorToMediaChannel(
const absl::optional<uint32_t>& ssrc,
rtc::Thread* worker_thread,
rtc::scoped_refptr<webrtc::FrameDecryptorInterface> frame_decryptor,
cricket::MediaChannel* media_channel,
bool stopped);
};
} // namespace webrtc

View file

@ -63,6 +63,7 @@
#include "rtc_base/thread.h"
#include "test/gmock.h"
#include "test/gtest.h"
#include "test/run_loop.h"
using ::testing::_;
using ::testing::ContainerEq;
@ -299,8 +300,8 @@ class RtpSenderReceiverTest
void CreateAudioRtpReceiver(
std::vector<rtc::scoped_refptr<MediaStreamInterface>> streams = {}) {
audio_rtp_receiver_ =
new AudioRtpReceiver(rtc::Thread::Current(), kAudioTrackId, streams,
audio_rtp_receiver_ = rtc::make_ref_counted<AudioRtpReceiver>(
rtc::Thread::Current(), kAudioTrackId, streams,
/*is_unified_plan=*/true);
audio_rtp_receiver_->SetMediaChannel(voice_media_channel_);
audio_rtp_receiver_->SetupMediaChannel(kAudioSsrc);
@ -310,8 +311,8 @@ class RtpSenderReceiverTest
void CreateVideoRtpReceiver(
std::vector<rtc::scoped_refptr<MediaStreamInterface>> streams = {}) {
video_rtp_receiver_ =
new VideoRtpReceiver(rtc::Thread::Current(), kVideoTrackId, streams);
video_rtp_receiver_ = rtc::make_ref_counted<VideoRtpReceiver>(
rtc::Thread::Current(), kVideoTrackId, streams);
video_rtp_receiver_->SetMediaChannel(video_media_channel_);
video_rtp_receiver_->SetupMediaChannel(kVideoSsrc);
video_track_ = video_rtp_receiver_->video_track();
@ -330,19 +331,25 @@ class RtpSenderReceiverTest
video_media_channel_->AddRecvStream(stream_params);
uint32_t primary_ssrc = stream_params.first_ssrc();
video_rtp_receiver_ =
new VideoRtpReceiver(rtc::Thread::Current(), kVideoTrackId, streams);
video_rtp_receiver_ = rtc::make_ref_counted<VideoRtpReceiver>(
rtc::Thread::Current(), kVideoTrackId, streams);
video_rtp_receiver_->SetMediaChannel(video_media_channel_);
video_rtp_receiver_->SetupMediaChannel(primary_ssrc);
video_track_ = video_rtp_receiver_->video_track();
}
void DestroyAudioRtpReceiver() {
if (!audio_rtp_receiver_)
return;
audio_rtp_receiver_->Stop();
audio_rtp_receiver_ = nullptr;
VerifyVoiceChannelNoOutput();
}
void DestroyVideoRtpReceiver() {
if (!video_rtp_receiver_)
return;
video_rtp_receiver_->Stop();
video_rtp_receiver_ = nullptr;
VerifyVideoChannelNoOutput();
}
@ -498,6 +505,7 @@ class RtpSenderReceiverTest
}
protected:
test::RunLoop run_loop_;
rtc::Thread* const network_thread_;
rtc::Thread* const worker_thread_;
webrtc::RtcEventLogNull event_log_;
@ -599,11 +607,15 @@ TEST_F(RtpSenderReceiverTest, RemoteAudioTrackDisable) {
EXPECT_TRUE(voice_media_channel_->GetOutputVolume(kAudioSsrc, &volume));
EXPECT_EQ(1, volume);
// Handling of enable/disable is applied asynchronously.
audio_track_->set_enabled(false);
run_loop_.Flush();
EXPECT_TRUE(voice_media_channel_->GetOutputVolume(kAudioSsrc, &volume));
EXPECT_EQ(0, volume);
audio_track_->set_enabled(true);
run_loop_.Flush();
EXPECT_TRUE(voice_media_channel_->GetOutputVolume(kAudioSsrc, &volume));
EXPECT_EQ(1, volume);
@ -636,6 +648,7 @@ TEST_F(RtpSenderReceiverTest, RemoteVideoTrackState) {
EXPECT_EQ(webrtc::MediaStreamTrackInterface::kEnded, video_track_->state());
EXPECT_EQ(webrtc::MediaSourceInterface::kEnded,
video_track_->GetSource()->state());
DestroyVideoRtpReceiver();
}
// Currently no action is taken when a remote video track is disabled or
@ -657,22 +670,27 @@ TEST_F(RtpSenderReceiverTest, RemoteAudioTrackSetVolume) {
double volume;
audio_track_->GetSource()->SetVolume(0.5);
run_loop_.Flush();
EXPECT_TRUE(voice_media_channel_->GetOutputVolume(kAudioSsrc, &volume));
EXPECT_EQ(0.5, volume);
// Disable the audio track, this should prevent setting the volume.
audio_track_->set_enabled(false);
RTC_DCHECK_EQ(worker_thread_, run_loop_.task_queue());
run_loop_.Flush();
audio_track_->GetSource()->SetVolume(0.8);
EXPECT_TRUE(voice_media_channel_->GetOutputVolume(kAudioSsrc, &volume));
EXPECT_EQ(0, volume);
// When the track is enabled, the previously set volume should take effect.
audio_track_->set_enabled(true);
run_loop_.Flush();
EXPECT_TRUE(voice_media_channel_->GetOutputVolume(kAudioSsrc, &volume));
EXPECT_EQ(0.8, volume);
// Try changing volume one more time.
audio_track_->GetSource()->SetVolume(0.9);
run_loop_.Flush();
EXPECT_TRUE(voice_media_channel_->GetOutputVolume(kAudioSsrc, &volume));
EXPECT_EQ(0.9, volume);
@ -683,12 +701,14 @@ TEST_F(RtpSenderReceiverTest, AudioRtpReceiverDelay) {
CreateAudioRtpReceiver();
VerifyRtpReceiverDelayBehaviour(voice_media_channel_,
audio_rtp_receiver_.get(), kAudioSsrc);
DestroyAudioRtpReceiver();
}
TEST_F(RtpSenderReceiverTest, VideoRtpReceiverDelay) {
CreateVideoRtpReceiver();
VerifyRtpReceiverDelayBehaviour(video_media_channel_,
video_rtp_receiver_.get(), kVideoSsrc);
DestroyVideoRtpReceiver();
}
// Test that the media channel isn't enabled for sending if the audio sender
@ -1582,6 +1602,7 @@ TEST_F(RtpSenderReceiverTest, AudioReceiverCanSetFrameDecryptor) {
audio_rtp_receiver_->SetFrameDecryptor(fake_frame_decryptor);
EXPECT_EQ(fake_frame_decryptor.get(),
audio_rtp_receiver_->GetFrameDecryptor().get());
DestroyAudioRtpReceiver();
}
// Validate that the default FrameEncryptor setting is nullptr.
@ -1593,6 +1614,7 @@ TEST_F(RtpSenderReceiverTest, AudioReceiverCannotSetFrameDecryptorAfterStop) {
audio_rtp_receiver_->Stop();
audio_rtp_receiver_->SetFrameDecryptor(fake_frame_decryptor);
// TODO(webrtc:9926) - Validate media channel not set once fakes updated.
DestroyAudioRtpReceiver();
}
// Validate that the default FrameEncryptor setting is nullptr.
@ -1627,6 +1649,7 @@ TEST_F(RtpSenderReceiverTest, VideoReceiverCanSetFrameDecryptor) {
video_rtp_receiver_->SetFrameDecryptor(fake_frame_decryptor);
EXPECT_EQ(fake_frame_decryptor.get(),
video_rtp_receiver_->GetFrameDecryptor().get());
DestroyVideoRtpReceiver();
}
// Validate that the default FrameEncryptor setting is nullptr.
@ -1638,6 +1661,7 @@ TEST_F(RtpSenderReceiverTest, VideoReceiverCannotSetFrameDecryptorAfterStop) {
video_rtp_receiver_->Stop();
video_rtp_receiver_->SetFrameDecryptor(fake_frame_decryptor);
// TODO(webrtc:9926) - Validate media channel not set once fakes updated.
DestroyVideoRtpReceiver();
}
// Checks that calling the internal methods for get/set parameters do not

View file

@ -211,14 +211,10 @@ void RtpTransceiver::SetChannel(cricket::ChannelInterface* channel) {
for (const auto& receiver : receivers_) {
if (!channel_) {
// TODO(tommi): This can internally block and hop to the worker thread.
// It's likely that SetMediaChannel also does that, so perhaps we should
// require SetMediaChannel(nullptr) to also Stop() and skip this call.
receiver->internal()->Stop();
} else {
receiver->internal()->SetMediaChannel(channel_->media_channel());
}
receiver->internal()->SetMediaChannel(channel_ ? channel_->media_channel()
: nullptr);
}
}
@ -268,12 +264,8 @@ bool RtpTransceiver::RemoveReceiver(RtpReceiverInterface* receiver) {
if (it == receivers_.end()) {
return false;
}
// `Stop()` will clear the internally cached pointer to the media channel.
(*it)->internal()->Stop();
// After the receiver has been removed, there's no guarantee that the
// contained media channel isn't deleted shortly after this. To make sure that
// the receiver doesn't spontaneously try to use it's (potentially stale)
// media channel reference, we clear it out.
(*it)->internal()->SetMediaChannel(nullptr);
receivers_.erase(it);
return true;
}

View file

@ -93,6 +93,7 @@ class RtpTransceiverUnifiedPlanTest : public ::testing::Test {
rtc::Thread::Current(),
sender_),
RtpReceiverProxyWithInternal<RtpReceiverInternal>::Create(
rtc::Thread::Current(),
rtc::Thread::Current(),
receiver_),
channel_manager_.get(),
@ -162,6 +163,7 @@ class RtpTransceiverTestForHeaderExtensions : public ::testing::Test {
rtc::Thread::Current(),
sender_),
RtpReceiverProxyWithInternal<RtpReceiverInternal>::Create(
rtc::Thread::Current(),
rtc::Thread::Current(),
receiver_),
channel_manager_.get(),

View file

@ -11,6 +11,7 @@
#include "pc/rtp_transmission_manager.h"
#include <algorithm>
#include <utility>
#include "absl/types/optional.h"
#include "api/peer_connection_interface.h"
@ -240,14 +241,16 @@ RtpTransmissionManager::CreateReceiver(cricket::MediaType media_type,
receiver;
if (media_type == cricket::MEDIA_TYPE_AUDIO) {
receiver = RtpReceiverProxyWithInternal<RtpReceiverInternal>::Create(
signaling_thread(),
new AudioRtpReceiver(worker_thread(), receiver_id,
std::vector<std::string>({}), IsUnifiedPlan()));
signaling_thread(), worker_thread(),
rtc::make_ref_counted<AudioRtpReceiver>(worker_thread(), receiver_id,
std::vector<std::string>({}),
IsUnifiedPlan()));
NoteUsageEvent(UsageEvent::AUDIO_ADDED);
} else {
RTC_DCHECK_EQ(media_type, cricket::MEDIA_TYPE_VIDEO);
receiver = RtpReceiverProxyWithInternal<RtpReceiverInternal>::Create(
signaling_thread(), new VideoRtpReceiver(worker_thread(), receiver_id,
signaling_thread(), worker_thread(),
rtc::make_ref_counted<VideoRtpReceiver>(worker_thread(), receiver_id,
std::vector<std::string>({})));
NoteUsageEvent(UsageEvent::VIDEO_ADDED);
}
@ -453,7 +456,7 @@ void RtpTransmissionManager::CreateAudioReceiver(
streams.push_back(rtc::scoped_refptr<MediaStreamInterface>(stream));
// TODO(https://crbug.com/webrtc/9480): When we remove remote_streams(), use
// the constructor taking stream IDs instead.
auto* audio_receiver = new AudioRtpReceiver(
auto audio_receiver = rtc::make_ref_counted<AudioRtpReceiver>(
worker_thread(), remote_sender_info.sender_id, streams, IsUnifiedPlan());
audio_receiver->SetMediaChannel(voice_media_channel());
if (remote_sender_info.sender_id == kDefaultAudioSenderId) {
@ -462,7 +465,7 @@ void RtpTransmissionManager::CreateAudioReceiver(
audio_receiver->SetupMediaChannel(remote_sender_info.first_ssrc);
}
auto receiver = RtpReceiverProxyWithInternal<RtpReceiverInternal>::Create(
signaling_thread(), audio_receiver);
signaling_thread(), worker_thread(), std::move(audio_receiver));
GetAudioTransceiver()->internal()->AddReceiver(receiver);
Observer()->OnAddTrack(receiver, streams);
NoteUsageEvent(UsageEvent::AUDIO_ADDED);
@ -476,7 +479,7 @@ void RtpTransmissionManager::CreateVideoReceiver(
streams.push_back(rtc::scoped_refptr<MediaStreamInterface>(stream));
// TODO(https://crbug.com/webrtc/9480): When we remove remote_streams(), use
// the constructor taking stream IDs instead.
auto* video_receiver = new VideoRtpReceiver(
auto video_receiver = rtc::make_ref_counted<VideoRtpReceiver>(
worker_thread(), remote_sender_info.sender_id, streams);
video_receiver->SetMediaChannel(video_media_channel());
if (remote_sender_info.sender_id == kDefaultVideoSenderId) {
@ -485,7 +488,7 @@ void RtpTransmissionManager::CreateVideoReceiver(
video_receiver->SetupMediaChannel(remote_sender_info.first_ssrc);
}
auto receiver = RtpReceiverProxyWithInternal<RtpReceiverInternal>::Create(
signaling_thread(), video_receiver);
signaling_thread(), worker_thread(), std::move(video_receiver));
GetVideoTransceiver()->internal()->AddReceiver(receiver);
Observer()->OnAddTrack(receiver, streams);
NoteUsageEvent(UsageEvent::VIDEO_ADDED);

View file

@ -1163,9 +1163,10 @@ void StatsCollector::ExtractMediaInfo(
std::vector<std::unique_ptr<MediaChannelStatsGatherer>> gatherers;
auto transceivers = pc_->GetTransceiversInternal();
{
rtc::Thread::ScopedDisallowBlockingCalls no_blocking_calls;
for (const auto& transceiver : pc_->GetTransceiversInternal()) {
for (const auto& transceiver : transceivers) {
cricket::ChannelInterface* channel = transceiver->internal()->channel();
if (!channel) {
continue;
@ -1176,20 +1177,37 @@ void StatsCollector::ExtractMediaInfo(
gatherer->transport_name = transport_names_by_mid.at(gatherer->mid);
for (const auto& sender : transceiver->internal()->senders()) {
std::string track_id = (sender->track() ? sender->track()->id() : "");
auto track = sender->track();
std::string track_id = (track ? track->id() : "");
gatherer->sender_track_id_by_ssrc.insert(
std::make_pair(sender->ssrc(), track_id));
}
for (const auto& receiver : transceiver->internal()->receivers()) {
gatherer->receiver_track_id_by_ssrc.insert(std::make_pair(
receiver->internal()->ssrc(), receiver->track()->id()));
}
// Populating `receiver_track_id_by_ssrc` will be done on the worker
// thread as the `ssrc` property of the receiver needs to be accessed
// there.
gatherers.push_back(std::move(gatherer));
}
}
pc_->worker_thread()->Invoke<void>(RTC_FROM_HERE, [&] {
rtc::Thread::ScopedDisallowBlockingCalls no_blocking_calls;
// Populate `receiver_track_id_by_ssrc` for the gatherers.
int i = 0;
for (const auto& transceiver : transceivers) {
cricket::ChannelInterface* channel = transceiver->internal()->channel();
if (!channel)
continue;
MediaChannelStatsGatherer* gatherer = gatherers[i++].get();
RTC_DCHECK_EQ(gatherer->mid, channel->content_name());
for (const auto& receiver : transceiver->internal()->receivers()) {
gatherer->receiver_track_id_by_ssrc.insert(std::make_pair(
receiver->internal()->ssrc(), receiver->track()->id()));
}
}
for (auto it = gatherers.begin(); it != gatherers.end();
/* incremented manually */) {
MediaChannelStatsGatherer* gatherer = it->get();

View file

@ -182,7 +182,7 @@ class FakePeerConnectionForStats : public FakePeerConnectionBase {
// TODO(steveanton): Switch tests to use RtpTransceivers directly.
auto receiver_proxy =
RtpReceiverProxyWithInternal<RtpReceiverInternal>::Create(
signaling_thread_, receiver);
signaling_thread_, worker_thread_, receiver);
GetOrCreateFirstTransceiverOfType(receiver->media_type())
->internal()
->AddReceiver(receiver_proxy);

View file

@ -17,8 +17,6 @@
#include "api/video/recordable_encoded_frame.h"
#include "api/video_track_source_proxy.h"
#include "pc/jitter_buffer_delay.h"
#include "pc/jitter_buffer_delay_proxy.h"
#include "pc/video_track.h"
#include "rtc_base/checks.h"
#include "rtc_base/location.h"
@ -39,7 +37,7 @@ VideoRtpReceiver::VideoRtpReceiver(
const std::vector<rtc::scoped_refptr<MediaStreamInterface>>& streams)
: worker_thread_(worker_thread),
id_(receiver_id),
source_(new RefCountedObject<VideoRtpTrackSource>(this)),
source_(rtc::make_ref_counted<VideoRtpTrackSource>(&source_callback_)),
track_(VideoTrackProxyWithInternal<VideoTrack>::Create(
rtc::Thread::Current(),
worker_thread,
@ -49,111 +47,130 @@ VideoRtpReceiver::VideoRtpReceiver(
worker_thread,
source_),
worker_thread))),
attachment_id_(GenerateUniqueId()),
delay_(JitterBufferDelayProxy::Create(
rtc::Thread::Current(),
worker_thread,
new rtc::RefCountedObject<JitterBufferDelay>(worker_thread))) {
attachment_id_(GenerateUniqueId()) {
RTC_DCHECK(worker_thread_);
SetStreams(streams);
source_->SetState(MediaSourceInterface::kLive);
}
VideoRtpReceiver::~VideoRtpReceiver() {
// Since cricket::VideoRenderer is not reference counted,
// we need to remove it from the channel before we are deleted.
Stop();
// Make sure we can't be called by the |source_| anymore.
worker_thread_->Invoke<void>(RTC_FROM_HERE,
[this] { source_->ClearCallback(); });
RTC_DCHECK_RUN_ON(&signaling_thread_checker_);
RTC_DCHECK(stopped_);
RTC_DCHECK(!media_channel_);
}
std::vector<std::string> VideoRtpReceiver::stream_ids() const {
RTC_DCHECK_RUN_ON(&signaling_thread_checker_);
std::vector<std::string> stream_ids(streams_.size());
for (size_t i = 0; i < streams_.size(); ++i)
stream_ids[i] = streams_[i]->id();
return stream_ids;
}
rtc::scoped_refptr<DtlsTransportInterface> VideoRtpReceiver::dtls_transport()
const {
RTC_DCHECK_RUN_ON(&signaling_thread_checker_);
return dtls_transport_;
}
std::vector<rtc::scoped_refptr<MediaStreamInterface>>
VideoRtpReceiver::streams() const {
RTC_DCHECK_RUN_ON(&signaling_thread_checker_);
return streams_;
}
RtpParameters VideoRtpReceiver::GetParameters() const {
if (!media_channel_ || stopped_) {
RTC_DCHECK_RUN_ON(worker_thread_);
if (!media_channel_)
return RtpParameters();
}
return worker_thread_->Invoke<RtpParameters>(RTC_FROM_HERE, [&] {
return ssrc_ ? media_channel_->GetRtpReceiveParameters(*ssrc_)
: media_channel_->GetDefaultRtpReceiveParameters();
});
}
void VideoRtpReceiver::SetFrameDecryptor(
rtc::scoped_refptr<FrameDecryptorInterface> frame_decryptor) {
RTC_DCHECK_RUN_ON(worker_thread_);
frame_decryptor_ = std::move(frame_decryptor);
// Special Case: Set the frame decryptor to any value on any existing channel.
if (media_channel_ && ssrc_.has_value() && !stopped_) {
worker_thread_->Invoke<void>(RTC_FROM_HERE, [&] {
if (media_channel_ && ssrc_) {
media_channel_->SetFrameDecryptor(*ssrc_, frame_decryptor_);
});
}
}
rtc::scoped_refptr<FrameDecryptorInterface>
VideoRtpReceiver::GetFrameDecryptor() const {
RTC_DCHECK_RUN_ON(worker_thread_);
return frame_decryptor_;
}
void VideoRtpReceiver::SetDepacketizerToDecoderFrameTransformer(
rtc::scoped_refptr<FrameTransformerInterface> frame_transformer) {
worker_thread_->Invoke<void>(RTC_FROM_HERE, [&] {
RTC_DCHECK_RUN_ON(worker_thread_);
frame_transformer_ = std::move(frame_transformer);
if (media_channel_ && !stopped_) {
if (media_channel_) {
media_channel_->SetDepacketizerToDecoderFrameTransformer(
ssrc_.value_or(0), frame_transformer_);
}
});
}
void VideoRtpReceiver::Stop() {
RTC_DCHECK_RUN_ON(&signaling_thread_checker_);
// TODO(deadbeef): Need to do more here to fully stop receiving packets.
if (stopped_) {
return;
}
if (!stopped_) {
source_->SetState(MediaSourceInterface::kEnded);
if (!media_channel_) {
RTC_LOG(LS_WARNING) << "VideoRtpReceiver::Stop: No video channel exists.";
} else {
// Allow that SetSink fails. This is the normal case when the underlying
// media channel has already been deleted.
stopped_ = true;
}
worker_thread_->Invoke<void>(RTC_FROM_HERE, [&] {
RTC_DCHECK_RUN_ON(worker_thread_);
if (media_channel_) {
SetSink(nullptr);
});
SetMediaChannel_w(nullptr);
}
delay_->OnStop();
stopped_ = true;
source_->ClearCallback();
});
}
void VideoRtpReceiver::StopAndEndTrack() {
RTC_DCHECK_RUN_ON(&signaling_thread_checker_);
Stop();
track_->internal()->set_ended();
}
void VideoRtpReceiver::RestartMediaChannel(absl::optional<uint32_t> ssrc) {
RTC_DCHECK(media_channel_);
if (!stopped_ && ssrc_ == ssrc) {
return;
}
worker_thread_->Invoke<void>(RTC_FROM_HERE, [&] {
RTC_DCHECK_RUN_ON(&signaling_thread_checker_);
// `stopped_` will be `true` on construction. RestartMediaChannel
// can in this case function like "ensure started" and flip `stopped_`
// to false.
// TODO(tommi): Can we restart the media channel without blocking?
bool ok = worker_thread_->Invoke<bool>(RTC_FROM_HERE, [&, was_stopped =
stopped_] {
RTC_DCHECK_RUN_ON(worker_thread_);
if (!stopped_) {
if (!media_channel_) {
// Ignore further negotiations if we've already been stopped and don't
// have an associated media channel.
RTC_DCHECK(was_stopped);
return false; // Can't restart.
}
if (!was_stopped && ssrc_ == ssrc) {
// Already running with that ssrc.
return true;
}
// Disconnect from the previous ssrc.
if (!was_stopped) {
SetSink(nullptr);
}
bool encoded_sink_enabled = saved_encoded_sink_enabled_;
SetEncodedSinkEnabled(false);
stopped_ = false;
ssrc_ = ssrc;
// Set up the new ssrc.
ssrc_ = std::move(ssrc);
SetSink(source_->sink());
if (encoded_sink_enabled) {
SetEncodedSinkEnabled(true);
@ -163,47 +180,62 @@ void VideoRtpReceiver::RestartMediaChannel(absl::optional<uint32_t> ssrc) {
media_channel_->SetDepacketizerToDecoderFrameTransformer(
ssrc_.value_or(0), frame_transformer_);
}
if (media_channel_ && ssrc_) {
if (frame_decryptor_) {
media_channel_->SetFrameDecryptor(*ssrc_, frame_decryptor_);
}
media_channel_->SetBaseMinimumPlayoutDelayMs(*ssrc_, delay_.GetMs());
}
return true;
});
// Attach any existing frame decryptor to the media channel.
MaybeAttachFrameDecryptorToMediaChannel(
ssrc, worker_thread_, frame_decryptor_, media_channel_, stopped_);
// TODO(bugs.webrtc.org/8694): Stop using 0 to mean unsignalled SSRC
// value.
delay_->OnStart(media_channel_, ssrc.value_or(0));
if (!ok)
return;
stopped_ = false;
}
// RTC_RUN_ON(worker_thread_)
void VideoRtpReceiver::SetSink(rtc::VideoSinkInterface<VideoFrame>* sink) {
RTC_DCHECK(media_channel_);
if (ssrc_) {
media_channel_->SetSink(*ssrc_, sink);
return;
}
} else {
media_channel_->SetDefaultSink(sink);
}
}
void VideoRtpReceiver::SetupMediaChannel(uint32_t ssrc) {
if (!media_channel_) {
RTC_LOG(LS_ERROR)
<< "VideoRtpReceiver::SetupMediaChannel: No video channel exists.";
}
RTC_DCHECK_RUN_ON(&signaling_thread_checker_);
RestartMediaChannel(ssrc);
}
void VideoRtpReceiver::SetupUnsignaledMediaChannel() {
if (!media_channel_) {
RTC_LOG(LS_ERROR) << "VideoRtpReceiver::SetupUnsignaledMediaChannel: No "
"video channel exists.";
}
RTC_DCHECK_RUN_ON(&signaling_thread_checker_);
RestartMediaChannel(absl::nullopt);
}
uint32_t VideoRtpReceiver::ssrc() const {
RTC_DCHECK_RUN_ON(worker_thread_);
return ssrc_.value_or(0);
}
void VideoRtpReceiver::set_stream_ids(std::vector<std::string> stream_ids) {
RTC_DCHECK_RUN_ON(&signaling_thread_checker_);
SetStreams(CreateStreamsFromIds(std::move(stream_ids)));
}
void VideoRtpReceiver::set_transport(
rtc::scoped_refptr<DtlsTransportInterface> dtls_transport) {
RTC_DCHECK_RUN_ON(&signaling_thread_checker_);
dtls_transport_ = std::move(dtls_transport);
}
void VideoRtpReceiver::SetStreams(
const std::vector<rtc::scoped_refptr<MediaStreamInterface>>& streams) {
RTC_DCHECK_RUN_ON(&signaling_thread_checker_);
// Remove remote track from any streams that are going away.
for (const auto& existing_stream : streams_) {
bool removed = true;
@ -236,6 +268,7 @@ void VideoRtpReceiver::SetStreams(
}
void VideoRtpReceiver::SetObserver(RtpReceiverObserverInterface* observer) {
RTC_DCHECK_RUN_ON(&signaling_thread_checker_);
observer_ = observer;
// Deliver any notifications the observer may have missed by being set late.
if (received_first_packet_ && observer_) {
@ -245,14 +278,31 @@ void VideoRtpReceiver::SetObserver(RtpReceiverObserverInterface* observer) {
void VideoRtpReceiver::SetJitterBufferMinimumDelay(
absl::optional<double> delay_seconds) {
delay_->Set(delay_seconds);
RTC_DCHECK_RUN_ON(worker_thread_);
delay_.Set(delay_seconds);
if (media_channel_ && ssrc_)
media_channel_->SetBaseMinimumPlayoutDelayMs(*ssrc_, delay_.GetMs());
}
void VideoRtpReceiver::SetMediaChannel(cricket::MediaChannel* media_channel) {
RTC_DCHECK_RUN_ON(&signaling_thread_checker_);
RTC_DCHECK(media_channel == nullptr ||
media_channel->media_type() == media_type());
if (stopped_ && !media_channel)
return;
worker_thread_->Invoke<void>(RTC_FROM_HERE, [&] {
RTC_DCHECK_RUN_ON(worker_thread_);
SetMediaChannel_w(media_channel);
});
}
// RTC_RUN_ON(worker_thread_)
void VideoRtpReceiver::SetMediaChannel_w(cricket::MediaChannel* media_channel) {
if (media_channel == media_channel_)
return;
bool encoded_sink_enabled = saved_encoded_sink_enabled_;
if (encoded_sink_enabled && media_channel_) {
// Turn off the old sink, if any.
@ -275,10 +325,10 @@ void VideoRtpReceiver::SetMediaChannel(cricket::MediaChannel* media_channel) {
ssrc_.value_or(0), frame_transformer_);
}
}
});
}
void VideoRtpReceiver::NotifyFirstPacketReceived() {
RTC_DCHECK_RUN_ON(&signaling_thread_checker_);
if (observer_) {
observer_->OnFirstPacketReceived(media_type());
}
@ -286,11 +336,10 @@ void VideoRtpReceiver::NotifyFirstPacketReceived() {
}
std::vector<RtpSource> VideoRtpReceiver::GetSources() const {
if (!media_channel_ || !ssrc_ || stopped_) {
return {};
}
return worker_thread_->Invoke<std::vector<RtpSource>>(
RTC_FROM_HERE, [&] { return media_channel_->GetSources(*ssrc_); });
RTC_DCHECK_RUN_ON(worker_thread_);
if (!ssrc_ || !media_channel_)
return std::vector<RtpSource>();
return media_channel_->GetSources(*ssrc_);
}
void VideoRtpReceiver::OnGenerateKeyFrame() {
@ -316,20 +365,21 @@ void VideoRtpReceiver::OnEncodedSinkEnabled(bool enable) {
saved_encoded_sink_enabled_ = enable;
}
// RTC_RUN_ON(worker_thread_)
void VideoRtpReceiver::SetEncodedSinkEnabled(bool enable) {
if (media_channel_) {
if (enable) {
if (!media_channel_)
return;
// TODO(bugs.webrtc.org/8694): Stop using 0 to mean unsignalled SSRC
auto source = source_;
const auto ssrc = ssrc_.value_or(0);
if (enable) {
media_channel_->SetRecordableEncodedFrameCallback(
ssrc_.value_or(0),
[source = std::move(source)](const RecordableEncodedFrame& frame) {
ssrc, [source = source_](const RecordableEncodedFrame& frame) {
source->BroadcastRecordableEncodedFrame(frame);
});
} else {
// TODO(bugs.webrtc.org/8694): Stop using 0 to mean unsignalled SSRC
media_channel_->ClearRecordableEncodedFrameCallback(ssrc_.value_or(0));
}
media_channel_->ClearRecordableEncodedFrameCallback(ssrc);
}
}

View file

@ -32,18 +32,18 @@
#include "api/video/video_sink_interface.h"
#include "api/video/video_source_interface.h"
#include "media/base/media_channel.h"
#include "pc/jitter_buffer_delay_interface.h"
#include "pc/jitter_buffer_delay.h"
#include "pc/rtp_receiver.h"
#include "pc/video_rtp_track_source.h"
#include "pc/video_track.h"
#include "rtc_base/ref_counted_object.h"
#include "rtc_base/system/no_unique_address.h"
#include "rtc_base/thread.h"
#include "rtc_base/thread_annotations.h"
namespace webrtc {
class VideoRtpReceiver : public rtc::RefCountedObject<RtpReceiverInternal>,
public VideoRtpTrackSource::Callback {
class VideoRtpReceiver : public RtpReceiverInternal {
public:
// An SSRC of 0 will create a receiver that will match the first SSRC it
// sees. Must be called on signaling thread.
@ -59,23 +59,16 @@ class VideoRtpReceiver : public rtc::RefCountedObject<RtpReceiverInternal>,
virtual ~VideoRtpReceiver();
rtc::scoped_refptr<VideoTrackInterface> video_track() const {
return track_.get();
}
rtc::scoped_refptr<VideoTrackInterface> video_track() const { return track_; }
// RtpReceiverInterface implementation
rtc::scoped_refptr<MediaStreamTrackInterface> track() const override {
return track_.get();
}
rtc::scoped_refptr<DtlsTransportInterface> dtls_transport() const override {
return dtls_transport_;
return track_;
}
rtc::scoped_refptr<DtlsTransportInterface> dtls_transport() const override;
std::vector<std::string> stream_ids() const override;
std::vector<rtc::scoped_refptr<MediaStreamInterface>> streams()
const override {
return streams_;
}
const override;
cricket::MediaType media_type() const override {
return cricket::MEDIA_TYPE_VIDEO;
}
@ -98,13 +91,11 @@ class VideoRtpReceiver : public rtc::RefCountedObject<RtpReceiverInternal>,
void StopAndEndTrack() override;
void SetupMediaChannel(uint32_t ssrc) override;
void SetupUnsignaledMediaChannel() override;
uint32_t ssrc() const override { return ssrc_.value_or(0); }
uint32_t ssrc() const override;
void NotifyFirstPacketReceived() override;
void set_stream_ids(std::vector<std::string> stream_ids) override;
void set_transport(
rtc::scoped_refptr<DtlsTransportInterface> dtls_transport) override {
dtls_transport_ = dtls_transport;
}
rtc::scoped_refptr<DtlsTransportInterface> dtls_transport) override;
void SetStreams(const std::vector<rtc::scoped_refptr<MediaStreamInterface>>&
streams) override;
@ -123,33 +114,68 @@ class VideoRtpReceiver : public rtc::RefCountedObject<RtpReceiverInternal>,
void RestartMediaChannel(absl::optional<uint32_t> ssrc);
void SetSink(rtc::VideoSinkInterface<VideoFrame>* sink)
RTC_RUN_ON(worker_thread_);
void SetMediaChannel_w(cricket::MediaChannel* media_channel)
RTC_RUN_ON(worker_thread_);
// VideoRtpTrackSource::Callback
void OnGenerateKeyFrame() override;
void OnEncodedSinkEnabled(bool enable) override;
void OnGenerateKeyFrame();
void OnEncodedSinkEnabled(bool enable);
void SetEncodedSinkEnabled(bool enable) RTC_RUN_ON(worker_thread_);
class SourceCallback : public VideoRtpTrackSource::Callback {
public:
explicit SourceCallback(VideoRtpReceiver* receiver) : receiver_(receiver) {}
~SourceCallback() override = default;
private:
void OnGenerateKeyFrame() override { receiver_->OnGenerateKeyFrame(); }
void OnEncodedSinkEnabled(bool enable) override {
receiver_->OnEncodedSinkEnabled(enable);
}
VideoRtpReceiver* const receiver_;
} source_callback_{this};
RTC_NO_UNIQUE_ADDRESS SequenceChecker signaling_thread_checker_;
rtc::Thread* const worker_thread_;
const std::string id_;
cricket::VideoMediaChannel* media_channel_ = nullptr;
absl::optional<uint32_t> ssrc_;
// See documentation for `stopped_` below for when a valid media channel
// has been assigned and when this pointer will be null.
cricket::VideoMediaChannel* media_channel_ RTC_GUARDED_BY(worker_thread_) =
nullptr;
absl::optional<uint32_t> ssrc_ RTC_GUARDED_BY(worker_thread_);
// |source_| is held here to be able to change the state of the source when
// the VideoRtpReceiver is stopped.
rtc::scoped_refptr<VideoRtpTrackSource> source_;
rtc::scoped_refptr<VideoTrackProxyWithInternal<VideoTrack>> track_;
std::vector<rtc::scoped_refptr<MediaStreamInterface>> streams_;
bool stopped_ = true;
RtpReceiverObserverInterface* observer_ = nullptr;
bool received_first_packet_ = false;
int attachment_id_ = 0;
rtc::scoped_refptr<FrameDecryptorInterface> frame_decryptor_;
rtc::scoped_refptr<DtlsTransportInterface> dtls_transport_;
const rtc::scoped_refptr<VideoRtpTrackSource> source_;
const rtc::scoped_refptr<VideoTrackProxyWithInternal<VideoTrack>> track_;
std::vector<rtc::scoped_refptr<MediaStreamInterface>> streams_
RTC_GUARDED_BY(&signaling_thread_checker_);
// `stopped` is state that's used on the signaling thread to indicate whether
// a valid `media_channel_` has been assigned and configured. When an instance
// of VideoRtpReceiver is initially created, `stopped_` is true and will
// remain true until either `SetupMediaChannel` or
// `SetupUnsignaledMediaChannel` is called after assigning a media channel.
// After that, `stopped_` will remain false until `Stop()` is called.
// Note, for checking the state of the class on the worker thread,
// check `media_channel_` instead, as that's the main worker thread state.
bool stopped_ RTC_GUARDED_BY(&signaling_thread_checker_) = true;
RtpReceiverObserverInterface* observer_
RTC_GUARDED_BY(&signaling_thread_checker_) = nullptr;
bool received_first_packet_ RTC_GUARDED_BY(&signaling_thread_checker_) =
false;
const int attachment_id_;
rtc::scoped_refptr<FrameDecryptorInterface> frame_decryptor_
RTC_GUARDED_BY(worker_thread_);
rtc::scoped_refptr<DtlsTransportInterface> dtls_transport_
RTC_GUARDED_BY(&signaling_thread_checker_);
rtc::scoped_refptr<FrameTransformerInterface> frame_transformer_
RTC_GUARDED_BY(worker_thread_);
// Allows to thread safely change jitter buffer delay. Handles caching cases
// Stores the minimum jitter buffer delay. Handles caching cases
// if |SetJitterBufferMinimumDelay| is called before start.
rtc::scoped_refptr<JitterBufferDelayInterface> delay_;
JitterBufferDelay delay_ RTC_GUARDED_BY(worker_thread_);
// Records if we should generate a keyframe when |media_channel_| gets set up
// or switched.
bool saved_generate_keyframe_ RTC_GUARDED_BY(worker_thread_) = false;

View file

@ -17,8 +17,10 @@
#include "test/gmock.h"
using ::testing::_;
using ::testing::AnyNumber;
using ::testing::InSequence;
using ::testing::Mock;
using ::testing::NiceMock;
using ::testing::SaveArg;
using ::testing::StrictMock;
@ -53,19 +55,26 @@ class VideoRtpReceiverTest : public testing::Test {
VideoRtpReceiverTest()
: worker_thread_(rtc::Thread::Create()),
channel_(nullptr, cricket::VideoOptions()),
receiver_(new VideoRtpReceiver(worker_thread_.get(),
"receiver",
{"stream"})) {
receiver_(rtc::make_ref_counted<VideoRtpReceiver>(
worker_thread_.get(),
std::string("receiver"),
std::vector<std::string>({"stream"}))) {
worker_thread_->Start();
receiver_->SetMediaChannel(&channel_);
}
~VideoRtpReceiverTest() override {
// Clear expectations that tests may have set up before calling Stop().
Mock::VerifyAndClearExpectations(&channel_);
receiver_->Stop();
}
webrtc::VideoTrackSourceInterface* Source() {
return receiver_->streams()[0]->FindVideoTrack("receiver")->GetSource();
}
std::unique_ptr<rtc::Thread> worker_thread_;
MockVideoMediaChannel channel_;
NiceMock<MockVideoMediaChannel> channel_;
rtc::scoped_refptr<VideoRtpReceiver> receiver_;
};
@ -98,6 +107,10 @@ TEST_F(VideoRtpReceiverTest,
// Switching to a new channel should now not cause calls to GenerateKeyFrame.
StrictMock<MockVideoMediaChannel> channel4(nullptr, cricket::VideoOptions());
receiver_->SetMediaChannel(&channel4);
// We must call Stop() here since the mock media channels live on the stack
// and `receiver_` still has a pointer to those objects.
receiver_->Stop();
}
TEST_F(VideoRtpReceiverTest, EnablesEncodedOutput) {
@ -131,6 +144,10 @@ TEST_F(VideoRtpReceiverTest, DisablesEnablesEncodedOutputOnChannelSwitch) {
Source()->RemoveEncodedSink(&sink);
StrictMock<MockVideoMediaChannel> channel3(nullptr, cricket::VideoOptions());
receiver_->SetMediaChannel(&channel3);
// We must call Stop() here since the mock media channels live on the stack
// and `receiver_` still has a pointer to those objects.
receiver_->Stop();
}
TEST_F(VideoRtpReceiverTest, BroadcastsEncodedFramesWhenEnabled) {