Cleanup of rtc::Thread.

* Updates variable names to be more descriptive.
* Removes unused sensitive delay timing functionality.
* Removes deprecated PostAt() overload.

Bug: webrtc:9883
Change-Id: I68e8072fab345c5b169cbe5602a0a252eb71b5ec
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/165393
Commit-Queue: Sebastian Jansson <srte@webrtc.org>
Reviewed-by: Steve Anton <steveanton@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#30323}
This commit is contained in:
Sebastian Jansson 2020-01-17 14:46:08 +01:00 committed by Commit Bot
parent cea929923b
commit 61380c09e2
3 changed files with 71 additions and 102 deletions

View file

@ -67,7 +67,6 @@ class ScopedAutoReleasePool {
namespace rtc {
namespace {
const int kMaxMsgLatency = 150; // 150 ms
const int kSlowDispatchLoggingThreshold = 50; // 50 ms
class MessageHandlerWithTask final : public MessageHandler {
@ -305,7 +304,7 @@ Thread::Thread(std::unique_ptr<SocketServer> ss)
Thread::Thread(SocketServer* ss, bool do_init)
: fPeekKeep_(false),
dmsgq_next_num_(0),
delayed_next_num_(0),
fInitialized_(false),
fDestroyed_(false),
stop_(0),
@ -406,7 +405,7 @@ bool Thread::Get(Message* pmsg, int cmsWait, bool process_io) {
int64_t msCurrent = msStart;
while (true) {
// Check for sent messages
ReceiveSends();
ReceiveSendsFromThread(nullptr);
// Check for posted events
int64_t cmsDelayNext = kForever;
@ -421,33 +420,25 @@ bool Thread::Get(Message* pmsg, int cmsWait, bool process_io) {
// triggered and calculate the next trigger time.
if (first_pass) {
first_pass = false;
while (!dmsgq_.empty()) {
if (msCurrent < dmsgq_.top().msTrigger_) {
cmsDelayNext = TimeDiff(dmsgq_.top().msTrigger_, msCurrent);
while (!delayed_messages_.empty()) {
if (msCurrent < delayed_messages_.top().run_time_ms_) {
cmsDelayNext =
TimeDiff(delayed_messages_.top().run_time_ms_, msCurrent);
break;
}
msgq_.push_back(dmsgq_.top().msg_);
dmsgq_.pop();
messages_.push_back(delayed_messages_.top().msg_);
delayed_messages_.pop();
}
}
// Pull a message off the message queue, if available.
if (msgq_.empty()) {
if (messages_.empty()) {
break;
} else {
*pmsg = msgq_.front();
msgq_.pop_front();
*pmsg = messages_.front();
messages_.pop_front();
}
} // crit_ is released here.
// Log a warning for time-sensitive messages that we're late to deliver.
if (pmsg->ts_sensitive) {
int64_t delay = TimeDiff(msCurrent, pmsg->ts_sensitive);
if (delay > 0) {
RTC_LOG_F(LS_WARNING)
<< "id: " << pmsg->message_id
<< " delay: " << (delay + kMaxMsgLatency) << "ms";
}
}
// If this was a dispose message, delete it and skip it.
if (MQID_DISPOSE == pmsg->message_id) {
RTC_DCHECK(nullptr == pmsg->phandler);
@ -495,6 +486,7 @@ void Thread::Post(const Location& posted_from,
uint32_t id,
MessageData* pdata,
bool time_sensitive) {
RTC_DCHECK(!time_sensitive);
if (IsQuitting()) {
delete pdata;
return;
@ -511,45 +503,32 @@ void Thread::Post(const Location& posted_from,
msg.phandler = phandler;
msg.message_id = id;
msg.pdata = pdata;
if (time_sensitive) {
msg.ts_sensitive = TimeMillis() + kMaxMsgLatency;
}
msgq_.push_back(msg);
messages_.push_back(msg);
}
WakeUpSocketServer();
}
void Thread::PostDelayed(const Location& posted_from,
int cmsDelay,
int delay_ms,
MessageHandler* phandler,
uint32_t id,
MessageData* pdata) {
return DoDelayPost(posted_from, cmsDelay, TimeAfter(cmsDelay), phandler, id,
return DoDelayPost(posted_from, delay_ms, TimeAfter(delay_ms), phandler, id,
pdata);
}
void Thread::PostAt(const Location& posted_from,
uint32_t tstamp,
int64_t run_at_ms,
MessageHandler* phandler,
uint32_t id,
MessageData* pdata) {
// This should work even if it is used (unexpectedly).
int64_t delay = static_cast<uint32_t>(TimeMillis()) - tstamp;
return DoDelayPost(posted_from, delay, tstamp, phandler, id, pdata);
}
void Thread::PostAt(const Location& posted_from,
int64_t tstamp,
MessageHandler* phandler,
uint32_t id,
MessageData* pdata) {
return DoDelayPost(posted_from, TimeUntil(tstamp), tstamp, phandler, id,
return DoDelayPost(posted_from, TimeUntil(run_at_ms), run_at_ms, phandler, id,
pdata);
}
void Thread::DoDelayPost(const Location& posted_from,
int64_t cmsDelay,
int64_t tstamp,
int64_t delay_ms,
int64_t run_at_ms,
MessageHandler* phandler,
uint32_t id,
MessageData* pdata) {
@ -569,13 +548,13 @@ void Thread::DoDelayPost(const Location& posted_from,
msg.phandler = phandler;
msg.message_id = id;
msg.pdata = pdata;
DelayedMessage dmsg(cmsDelay, tstamp, dmsgq_next_num_, msg);
dmsgq_.push(dmsg);
DelayedMessage delayed(delay_ms, run_at_ms, delayed_next_num_, msg);
delayed_messages_.push(delayed);
// If this message queue processes 1 message every millisecond for 50 days,
// we will wrap this number. Even then, only messages with identical times
// will be misordered, and then only briefly. This is probably ok.
++dmsgq_next_num_;
RTC_DCHECK_NE(0, dmsgq_next_num_);
++delayed_next_num_;
RTC_DCHECK_NE(0, delayed_next_num_);
}
WakeUpSocketServer();
}
@ -583,11 +562,11 @@ void Thread::DoDelayPost(const Location& posted_from,
int Thread::GetDelay() {
CritScope cs(&crit_);
if (!msgq_.empty())
if (!messages_.empty())
return 0;
if (!dmsgq_.empty()) {
int delay = TimeUntil(dmsgq_.top().msTrigger_);
if (!delayed_messages_.empty()) {
int delay = TimeUntil(delayed_messages_.top().run_time_ms_);
if (delay < 0)
delay = 0;
return delay;
@ -612,14 +591,14 @@ void Thread::ClearInternal(MessageHandler* phandler,
// Remove from ordered message queue
for (MessageList::iterator it = msgq_.begin(); it != msgq_.end();) {
for (auto it = messages_.begin(); it != messages_.end();) {
if (it->Match(phandler, id)) {
if (removed) {
removed->push_back(*it);
} else {
delete it->pdata;
}
it = msgq_.erase(it);
it = messages_.erase(it);
} else {
++it;
}
@ -627,9 +606,8 @@ void Thread::ClearInternal(MessageHandler* phandler,
// Remove from priority queue. Not directly iterable, so use this approach
PriorityQueue::container_type::iterator new_end = dmsgq_.container().begin();
for (PriorityQueue::container_type::iterator it = new_end;
it != dmsgq_.container().end(); ++it) {
auto new_end = delayed_messages_.container().begin();
for (auto it = new_end; it != delayed_messages_.container().end(); ++it) {
if (it->msg_.Match(phandler, id)) {
if (removed) {
removed->push_back(it->msg_);
@ -640,8 +618,9 @@ void Thread::ClearInternal(MessageHandler* phandler,
*new_end++ = *it;
}
}
dmsgq_.container().erase(new_end, dmsgq_.container().end());
dmsgq_.reheap();
delayed_messages_.container().erase(new_end,
delayed_messages_.container().end());
delayed_messages_.reheap();
}
void Thread::Dispatch(Message* pmsg) {
@ -909,10 +888,6 @@ void Thread::Send(const Location& posted_from,
}
}
void Thread::ReceiveSends() {
ReceiveSendsFromThread(nullptr);
}
void Thread::ReceiveSendsFromThread(const Thread* source) {
// Receive a sent message. Cleanup scenarios:
// - thread sending exits: We don't allow this, since thread can exit
@ -935,8 +910,7 @@ void Thread::ReceiveSendsFromThread(const Thread* source) {
}
bool Thread::PopSendMessageFromThread(const Thread* source, _SendMessage* msg) {
for (std::list<_SendMessage>::iterator it = sendlist_.begin();
it != sendlist_.end(); ++it) {
for (auto it = sendlist_.begin(); it != sendlist_.end(); ++it) {
if (it->thread == source || source == nullptr) {
*msg = *it;
sendlist_.erase(it);
@ -1011,9 +985,7 @@ void Thread::Clear(MessageHandler* phandler,
// Remove messages on sendlist_ with phandler
// Object target cleared: remove from send list, wakeup/set ready
// if sender not null.
std::list<_SendMessage>::iterator iter = sendlist_.begin();
while (iter != sendlist_.end()) {
for (auto iter = sendlist_.begin(); iter != sendlist_.end();) {
_SendMessage smsg = *iter;
if (smsg.msg.Match(phandler, id)) {
if (removed) {

View file

@ -228,24 +228,19 @@ class RTC_LOCKABLE RTC_EXPORT Thread : public webrtc::TaskQueueBase {
int cmsWait = kForever,
bool process_io = true);
virtual bool Peek(Message* pmsg, int cmsWait = 0);
// |time_sensitive| is deprecated and should always be false.
virtual void Post(const Location& posted_from,
MessageHandler* phandler,
uint32_t id = 0,
MessageData* pdata = nullptr,
bool time_sensitive = false);
virtual void PostDelayed(const Location& posted_from,
int cmsDelay,
int delay_ms,
MessageHandler* phandler,
uint32_t id = 0,
MessageData* pdata = nullptr);
virtual void PostAt(const Location& posted_from,
int64_t tstamp,
MessageHandler* phandler,
uint32_t id = 0,
MessageData* pdata = nullptr);
// TODO(honghaiz): Remove this when all the dependencies are removed.
virtual void PostAt(const Location& posted_from,
uint32_t tstamp,
int64_t run_at_ms,
MessageHandler* phandler,
uint32_t id = 0,
MessageData* pdata = nullptr);
@ -253,15 +248,14 @@ class RTC_LOCKABLE RTC_EXPORT Thread : public webrtc::TaskQueueBase {
uint32_t id = MQID_ANY,
MessageList* removed = nullptr);
virtual void Dispatch(Message* pmsg);
virtual void ReceiveSends();
// Amount of time until the next message can be retrieved
virtual int GetDelay();
bool empty() const { return size() == 0u; }
size_t size() const {
CritScope cs(&crit_); // msgq_.size() is not thread safe.
return msgq_.size() + dmsgq_.size() + (fPeekKeep_ ? 1u : 0u);
CritScope cs(&crit_);
return messages_.size() + delayed_messages_.size() + (fPeekKeep_ ? 1u : 0u);
}
// Internally posts a message which causes the doomed object to be deleted
@ -431,6 +425,33 @@ class RTC_LOCKABLE RTC_EXPORT Thread : public webrtc::TaskQueueBase {
#endif
protected:
// DelayedMessage goes into a priority queue, sorted by trigger time. Messages
// with the same trigger time are processed in num_ (FIFO) order.
class DelayedMessage {
public:
DelayedMessage(int64_t delay,
int64_t run_time_ms,
uint32_t num,
const Message& msg)
: delay_ms_(delay),
run_time_ms_(run_time_ms),
message_number_(num),
msg_(msg) {}
bool operator<(const DelayedMessage& dmsg) const {
return (dmsg.run_time_ms_ < run_time_ms_) ||
((dmsg.run_time_ms_ == run_time_ms_) &&
(dmsg.message_number_ < message_number_));
}
int64_t delay_ms_; // for debugging
int64_t run_time_ms_;
// Monotonicaly incrementing number used for ordering of messages
// targeted to execute at the same time.
uint32_t message_number_;
Message msg_;
};
class PriorityQueue : public std::priority_queue<DelayedMessage> {
public:
container_type& container() { return c; }
@ -520,9 +541,9 @@ class RTC_LOCKABLE RTC_EXPORT Thread : public webrtc::TaskQueueBase {
bool fPeekKeep_;
Message msgPeek_;
MessageList msgq_ RTC_GUARDED_BY(crit_);
PriorityQueue dmsgq_ RTC_GUARDED_BY(crit_);
uint32_t dmsgq_next_num_ RTC_GUARDED_BY(crit_);
MessageList messages_ RTC_GUARDED_BY(crit_);
PriorityQueue delayed_messages_ RTC_GUARDED_BY(crit_);
uint32_t delayed_next_num_ RTC_GUARDED_BY(crit_);
CriticalSection crit_;
bool fInitialized_;
bool fDestroyed_;

View file

@ -101,8 +101,7 @@ const uint32_t MQID_DISPOSE = static_cast<uint32_t>(-2);
// No destructor
struct Message {
Message()
: phandler(nullptr), message_id(0), pdata(nullptr), ts_sensitive(0) {}
Message() : phandler(nullptr), message_id(0), pdata(nullptr) {}
inline bool Match(MessageHandler* handler, uint32_t id) const {
return (handler == nullptr || handler == phandler) &&
(id == MQID_ANY || id == message_id);
@ -111,31 +110,8 @@ struct Message {
MessageHandler* phandler;
uint32_t message_id;
MessageData* pdata;
int64_t ts_sensitive;
};
typedef std::list<Message> MessageList;
// DelayedMessage goes into a priority queue, sorted by trigger time. Messages
// with the same trigger time are processed in num_ (FIFO) order.
class DelayedMessage {
public:
DelayedMessage(int64_t delay,
int64_t trigger,
uint32_t num,
const Message& msg)
: cmsDelay_(delay), msTrigger_(trigger), num_(num), msg_(msg) {}
bool operator<(const DelayedMessage& dmsg) const {
return (dmsg.msTrigger_ < msTrigger_) ||
((dmsg.msTrigger_ == msTrigger_) && (dmsg.num_ < num_));
}
int64_t cmsDelay_; // for debugging
int64_t msTrigger_;
uint32_t num_;
Message msg_;
};
} // namespace rtc
#endif // RTC_BASE_THREAD_MESSAGE_H_