mirror of
https://github.com/mollyim/webrtc.git
synced 2025-05-13 05:40:42 +01:00
Fix ABA problem when iterating epoll events.
Original patch contributed by andrey.semashev@gmail.com. In PhysicalSocketServer::WaitEpoll(), the loop verifies that the signalled dispatcher is in dispatchers_ set. It does so by looking up the dispatcher pointer in the set. This is vulnerable to the ABA problem because one dispatcher may be removed and destroyed and another created and added with the same address before epoll reports an event for the old dispatcher. The same issue exists for other Wait implementations, if a dispatcher is removed and a new one added with the same socket handle is the old. This is avoided by using a 64-bit key for looking up the dispatcher in the set. The key is set from a running counter which gets incremented when a dispatcher is added to the set, so even if the same dispatcher pointer is added, removed and added again, the key value will be different. This changes the storage of dispatchers_ from a set to a flat_hash_map, which uses a bit more memory but has faster lookup (O(1) as opposed to O(log n)). Bug: webrtc:11124 Change-Id: I6d206e1a367b58ba971edca9b48af7664384b797 Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/181027 Commit-Queue: Taylor <deadbeef@webrtc.org> Reviewed-by: Karl Wiberg <kwiberg@webrtc.org> Cr-Commit-Position: refs/heads/master@{#32019}
This commit is contained in:
parent
81de439281
commit
7b69a44c8b
6 changed files with 183 additions and 107 deletions
|
@ -802,6 +802,7 @@ rtc_library("rtc_base") {
|
||||||
]
|
]
|
||||||
absl_deps = [
|
absl_deps = [
|
||||||
"//third_party/abseil-cpp/absl/algorithm:container",
|
"//third_party/abseil-cpp/absl/algorithm:container",
|
||||||
|
"//third_party/abseil-cpp/absl/container:flat_hash_map",
|
||||||
"//third_party/abseil-cpp/absl/memory",
|
"//third_party/abseil-cpp/absl/memory",
|
||||||
"//third_party/abseil-cpp/absl/strings",
|
"//third_party/abseil-cpp/absl/strings",
|
||||||
"//third_party/abseil-cpp/absl/types:optional",
|
"//third_party/abseil-cpp/absl/types:optional",
|
||||||
|
|
|
@ -103,6 +103,20 @@ typedef char* SockOptArg;
|
||||||
#endif
|
#endif
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
namespace {
|
||||||
|
class ScopedSetTrue {
|
||||||
|
public:
|
||||||
|
ScopedSetTrue(bool* value) : value_(value) {
|
||||||
|
RTC_DCHECK(!*value_);
|
||||||
|
*value_ = true;
|
||||||
|
}
|
||||||
|
~ScopedSetTrue() { *value_ = false; }
|
||||||
|
|
||||||
|
private:
|
||||||
|
bool* value_;
|
||||||
|
};
|
||||||
|
} // namespace
|
||||||
|
|
||||||
namespace rtc {
|
namespace rtc {
|
||||||
|
|
||||||
std::unique_ptr<SocketServer> SocketServer::CreateDefault() {
|
std::unique_ptr<SocketServer> SocketServer::CreateDefault() {
|
||||||
|
@ -835,7 +849,7 @@ void SocketDispatcher::OnEvent(uint32_t ff, int err) {
|
||||||
|
|
||||||
#if defined(WEBRTC_USE_EPOLL)
|
#if defined(WEBRTC_USE_EPOLL)
|
||||||
|
|
||||||
static int GetEpollEvents(uint32_t ff) {
|
inline static int GetEpollEvents(uint32_t ff) {
|
||||||
int events = 0;
|
int events = 0;
|
||||||
if (ff & (DE_READ | DE_ACCEPT)) {
|
if (ff & (DE_READ | DE_ACCEPT)) {
|
||||||
events |= EPOLLIN;
|
events |= EPOLLIN;
|
||||||
|
@ -1061,7 +1075,8 @@ PhysicalSocketServer::~PhysicalSocketServer() {
|
||||||
close(epoll_fd_);
|
close(epoll_fd_);
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
RTC_DCHECK(dispatchers_.empty());
|
RTC_DCHECK(dispatcher_by_key_.empty());
|
||||||
|
RTC_DCHECK(key_by_dispatcher_.empty());
|
||||||
}
|
}
|
||||||
|
|
||||||
void PhysicalSocketServer::WakeUp() {
|
void PhysicalSocketServer::WakeUp() {
|
||||||
|
@ -1100,45 +1115,32 @@ AsyncSocket* PhysicalSocketServer::WrapSocket(SOCKET s) {
|
||||||
|
|
||||||
void PhysicalSocketServer::Add(Dispatcher* pdispatcher) {
|
void PhysicalSocketServer::Add(Dispatcher* pdispatcher) {
|
||||||
CritScope cs(&crit_);
|
CritScope cs(&crit_);
|
||||||
if (processing_dispatchers_) {
|
if (key_by_dispatcher_.count(pdispatcher)) {
|
||||||
// A dispatcher is being added while a "Wait" call is processing the
|
RTC_LOG(LS_WARNING)
|
||||||
// list of socket events.
|
<< "PhysicalSocketServer asked to add a duplicate dispatcher.";
|
||||||
// Defer adding to "dispatchers_" set until processing is done to avoid
|
return;
|
||||||
// invalidating the iterator in "Wait".
|
|
||||||
pending_remove_dispatchers_.erase(pdispatcher);
|
|
||||||
pending_add_dispatchers_.insert(pdispatcher);
|
|
||||||
} else {
|
|
||||||
dispatchers_.insert(pdispatcher);
|
|
||||||
}
|
}
|
||||||
|
uint64_t key = next_dispatcher_key_++;
|
||||||
|
dispatcher_by_key_.emplace(key, pdispatcher);
|
||||||
|
key_by_dispatcher_.emplace(pdispatcher, key);
|
||||||
#if defined(WEBRTC_USE_EPOLL)
|
#if defined(WEBRTC_USE_EPOLL)
|
||||||
if (epoll_fd_ != INVALID_SOCKET) {
|
if (epoll_fd_ != INVALID_SOCKET) {
|
||||||
AddEpoll(pdispatcher);
|
AddEpoll(pdispatcher, key);
|
||||||
}
|
}
|
||||||
#endif // WEBRTC_USE_EPOLL
|
#endif // WEBRTC_USE_EPOLL
|
||||||
}
|
}
|
||||||
|
|
||||||
void PhysicalSocketServer::Remove(Dispatcher* pdispatcher) {
|
void PhysicalSocketServer::Remove(Dispatcher* pdispatcher) {
|
||||||
CritScope cs(&crit_);
|
CritScope cs(&crit_);
|
||||||
if (processing_dispatchers_) {
|
if (!key_by_dispatcher_.count(pdispatcher)) {
|
||||||
// A dispatcher is being removed while a "Wait" call is processing the
|
|
||||||
// list of socket events.
|
|
||||||
// Defer removal from "dispatchers_" set until processing is done to avoid
|
|
||||||
// invalidating the iterator in "Wait".
|
|
||||||
if (!pending_add_dispatchers_.erase(pdispatcher) &&
|
|
||||||
dispatchers_.find(pdispatcher) == dispatchers_.end()) {
|
|
||||||
RTC_LOG(LS_WARNING) << "PhysicalSocketServer asked to remove a unknown "
|
|
||||||
"dispatcher, potentially from a duplicate call to "
|
|
||||||
"Add.";
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
pending_remove_dispatchers_.insert(pdispatcher);
|
|
||||||
} else if (!dispatchers_.erase(pdispatcher)) {
|
|
||||||
RTC_LOG(LS_WARNING)
|
RTC_LOG(LS_WARNING)
|
||||||
<< "PhysicalSocketServer asked to remove a unknown "
|
<< "PhysicalSocketServer asked to remove a unknown "
|
||||||
"dispatcher, potentially from a duplicate call to Add.";
|
"dispatcher, potentially from a duplicate call to Add.";
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
uint64_t key = key_by_dispatcher_.at(pdispatcher);
|
||||||
|
key_by_dispatcher_.erase(pdispatcher);
|
||||||
|
dispatcher_by_key_.erase(key);
|
||||||
#if defined(WEBRTC_USE_EPOLL)
|
#if defined(WEBRTC_USE_EPOLL)
|
||||||
if (epoll_fd_ != INVALID_SOCKET) {
|
if (epoll_fd_ != INVALID_SOCKET) {
|
||||||
RemoveEpoll(pdispatcher);
|
RemoveEpoll(pdispatcher);
|
||||||
|
@ -1152,34 +1154,22 @@ void PhysicalSocketServer::Update(Dispatcher* pdispatcher) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Don't update dispatchers that haven't yet been added.
|
||||||
CritScope cs(&crit_);
|
CritScope cs(&crit_);
|
||||||
if (dispatchers_.find(pdispatcher) == dispatchers_.end()) {
|
if (!key_by_dispatcher_.count(pdispatcher)) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
UpdateEpoll(pdispatcher);
|
UpdateEpoll(pdispatcher, key_by_dispatcher_.at(pdispatcher));
|
||||||
#endif
|
#endif
|
||||||
}
|
}
|
||||||
|
|
||||||
void PhysicalSocketServer::AddRemovePendingDispatchers() {
|
|
||||||
if (!pending_add_dispatchers_.empty()) {
|
|
||||||
for (Dispatcher* pdispatcher : pending_add_dispatchers_) {
|
|
||||||
dispatchers_.insert(pdispatcher);
|
|
||||||
}
|
|
||||||
pending_add_dispatchers_.clear();
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!pending_remove_dispatchers_.empty()) {
|
|
||||||
for (Dispatcher* pdispatcher : pending_remove_dispatchers_) {
|
|
||||||
dispatchers_.erase(pdispatcher);
|
|
||||||
}
|
|
||||||
pending_remove_dispatchers_.clear();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#if defined(WEBRTC_POSIX)
|
#if defined(WEBRTC_POSIX)
|
||||||
|
|
||||||
bool PhysicalSocketServer::Wait(int cmsWait, bool process_io) {
|
bool PhysicalSocketServer::Wait(int cmsWait, bool process_io) {
|
||||||
|
// We don't support reentrant waiting.
|
||||||
|
RTC_DCHECK(!waiting_);
|
||||||
|
ScopedSetTrue s(&waiting_);
|
||||||
#if defined(WEBRTC_USE_EPOLL)
|
#if defined(WEBRTC_USE_EPOLL)
|
||||||
// We don't keep a dedicated "epoll" descriptor containing only the non-IO
|
// We don't keep a dedicated "epoll" descriptor containing only the non-IO
|
||||||
// (i.e. signaling) dispatcher, so "poll" will be used instead of the default
|
// (i.e. signaling) dispatcher, so "poll" will be used instead of the default
|
||||||
|
@ -1205,6 +1195,9 @@ static void ProcessEvents(Dispatcher* dispatcher,
|
||||||
&len);
|
&len);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Most often the socket is writable or readable or both, so make a single
|
||||||
|
// virtual call to get requested events
|
||||||
|
const uint32_t requested_events = dispatcher->GetRequestedEvents();
|
||||||
uint32_t ff = 0;
|
uint32_t ff = 0;
|
||||||
|
|
||||||
// Check readable descriptors. If we're waiting on an accept, signal
|
// Check readable descriptors. If we're waiting on an accept, signal
|
||||||
|
@ -1212,7 +1205,7 @@ static void ProcessEvents(Dispatcher* dispatcher,
|
||||||
// readable or really closed.
|
// readable or really closed.
|
||||||
// TODO(pthatcher): Only peek at TCP descriptors.
|
// TODO(pthatcher): Only peek at TCP descriptors.
|
||||||
if (readable) {
|
if (readable) {
|
||||||
if (dispatcher->GetRequestedEvents() & DE_ACCEPT) {
|
if (requested_events & DE_ACCEPT) {
|
||||||
ff |= DE_ACCEPT;
|
ff |= DE_ACCEPT;
|
||||||
} else if (errcode || dispatcher->IsDescriptorClosed()) {
|
} else if (errcode || dispatcher->IsDescriptorClosed()) {
|
||||||
ff |= DE_CLOSE;
|
ff |= DE_CLOSE;
|
||||||
|
@ -1224,7 +1217,7 @@ static void ProcessEvents(Dispatcher* dispatcher,
|
||||||
// Check writable descriptors. If we're waiting on a connect, detect
|
// Check writable descriptors. If we're waiting on a connect, detect
|
||||||
// success versus failure by the reaped error code.
|
// success versus failure by the reaped error code.
|
||||||
if (writable) {
|
if (writable) {
|
||||||
if (dispatcher->GetRequestedEvents() & DE_CONNECT) {
|
if (requested_events & DE_CONNECT) {
|
||||||
if (!errcode) {
|
if (!errcode) {
|
||||||
ff |= DE_CONNECT;
|
ff |= DE_CONNECT;
|
||||||
} else {
|
} else {
|
||||||
|
@ -1258,13 +1251,9 @@ bool PhysicalSocketServer::WaitSelect(int cmsWait, bool process_io) {
|
||||||
stop_us = rtc::TimeMicros() + cmsWait * 1000;
|
stop_us = rtc::TimeMicros() + cmsWait * 1000;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Zero all fd_sets. Don't need to do this inside the loop since
|
|
||||||
// select() zeros the descriptors not signaled
|
|
||||||
|
|
||||||
fd_set fdsRead;
|
fd_set fdsRead;
|
||||||
FD_ZERO(&fdsRead);
|
|
||||||
fd_set fdsWrite;
|
fd_set fdsWrite;
|
||||||
FD_ZERO(&fdsWrite);
|
|
||||||
// Explicitly unpoison these FDs on MemorySanitizer which doesn't handle the
|
// Explicitly unpoison these FDs on MemorySanitizer which doesn't handle the
|
||||||
// inline assembly in FD_ZERO.
|
// inline assembly in FD_ZERO.
|
||||||
// http://crbug.com/344505
|
// http://crbug.com/344505
|
||||||
|
@ -1276,16 +1265,22 @@ bool PhysicalSocketServer::WaitSelect(int cmsWait, bool process_io) {
|
||||||
fWait_ = true;
|
fWait_ = true;
|
||||||
|
|
||||||
while (fWait_) {
|
while (fWait_) {
|
||||||
|
// Zero all fd_sets. Although select() zeros the descriptors not signaled,
|
||||||
|
// we may need to do this for dispatchers that were deleted while
|
||||||
|
// iterating.
|
||||||
|
FD_ZERO(&fdsRead);
|
||||||
|
FD_ZERO(&fdsWrite);
|
||||||
int fdmax = -1;
|
int fdmax = -1;
|
||||||
{
|
{
|
||||||
CritScope cr(&crit_);
|
CritScope cr(&crit_);
|
||||||
// TODO(jbauch): Support re-entrant waiting.
|
current_dispatcher_keys_.clear();
|
||||||
RTC_DCHECK(!processing_dispatchers_);
|
for (auto const& kv : dispatcher_by_key_) {
|
||||||
for (Dispatcher* pdispatcher : dispatchers_) {
|
uint64_t key = kv.first;
|
||||||
|
Dispatcher* pdispatcher = kv.second;
|
||||||
// Query dispatchers for read and write wait state
|
// Query dispatchers for read and write wait state
|
||||||
RTC_DCHECK(pdispatcher);
|
|
||||||
if (!process_io && (pdispatcher != signal_wakeup_))
|
if (!process_io && (pdispatcher != signal_wakeup_))
|
||||||
continue;
|
continue;
|
||||||
|
current_dispatcher_keys_.push_back(key);
|
||||||
int fd = pdispatcher->GetDescriptor();
|
int fd = pdispatcher->GetDescriptor();
|
||||||
// "select"ing a file descriptor that is equal to or larger than
|
// "select"ing a file descriptor that is equal to or larger than
|
||||||
// FD_SETSIZE will result in undefined behavior.
|
// FD_SETSIZE will result in undefined behavior.
|
||||||
|
@ -1323,8 +1318,14 @@ bool PhysicalSocketServer::WaitSelect(int cmsWait, bool process_io) {
|
||||||
} else {
|
} else {
|
||||||
// We have signaled descriptors
|
// We have signaled descriptors
|
||||||
CritScope cr(&crit_);
|
CritScope cr(&crit_);
|
||||||
processing_dispatchers_ = true;
|
// Iterate only on the dispatchers whose sockets were passed into
|
||||||
for (Dispatcher* pdispatcher : dispatchers_) {
|
// WSAEventSelect; this avoids the ABA problem (a socket being
|
||||||
|
// destroyed and a new one created with the same file descriptor).
|
||||||
|
for (uint64_t key : current_dispatcher_keys_) {
|
||||||
|
if (!dispatcher_by_key_.count(key))
|
||||||
|
continue;
|
||||||
|
Dispatcher* pdispatcher = dispatcher_by_key_.at(key);
|
||||||
|
|
||||||
int fd = pdispatcher->GetDescriptor();
|
int fd = pdispatcher->GetDescriptor();
|
||||||
|
|
||||||
bool readable = FD_ISSET(fd, &fdsRead);
|
bool readable = FD_ISSET(fd, &fdsRead);
|
||||||
|
@ -1340,11 +1341,6 @@ bool PhysicalSocketServer::WaitSelect(int cmsWait, bool process_io) {
|
||||||
// The error code can be signaled through reads or writes.
|
// The error code can be signaled through reads or writes.
|
||||||
ProcessEvents(pdispatcher, readable, writable, readable || writable);
|
ProcessEvents(pdispatcher, readable, writable, readable || writable);
|
||||||
}
|
}
|
||||||
|
|
||||||
processing_dispatchers_ = false;
|
|
||||||
// Process deferred dispatchers that have been added/removed while the
|
|
||||||
// events were handled above.
|
|
||||||
AddRemovePendingDispatchers();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Recalc the time remaining to wait. Doing it here means it doesn't get
|
// Recalc the time remaining to wait. Doing it here means it doesn't get
|
||||||
|
@ -1365,7 +1361,7 @@ bool PhysicalSocketServer::WaitSelect(int cmsWait, bool process_io) {
|
||||||
|
|
||||||
#if defined(WEBRTC_USE_EPOLL)
|
#if defined(WEBRTC_USE_EPOLL)
|
||||||
|
|
||||||
void PhysicalSocketServer::AddEpoll(Dispatcher* pdispatcher) {
|
void PhysicalSocketServer::AddEpoll(Dispatcher* pdispatcher, uint64_t key) {
|
||||||
RTC_DCHECK(epoll_fd_ != INVALID_SOCKET);
|
RTC_DCHECK(epoll_fd_ != INVALID_SOCKET);
|
||||||
int fd = pdispatcher->GetDescriptor();
|
int fd = pdispatcher->GetDescriptor();
|
||||||
RTC_DCHECK(fd != INVALID_SOCKET);
|
RTC_DCHECK(fd != INVALID_SOCKET);
|
||||||
|
@ -1375,7 +1371,7 @@ void PhysicalSocketServer::AddEpoll(Dispatcher* pdispatcher) {
|
||||||
|
|
||||||
struct epoll_event event = {0};
|
struct epoll_event event = {0};
|
||||||
event.events = GetEpollEvents(pdispatcher->GetRequestedEvents());
|
event.events = GetEpollEvents(pdispatcher->GetRequestedEvents());
|
||||||
event.data.ptr = pdispatcher;
|
event.data.u64 = key;
|
||||||
int err = epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &event);
|
int err = epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &event);
|
||||||
RTC_DCHECK_EQ(err, 0);
|
RTC_DCHECK_EQ(err, 0);
|
||||||
if (err == -1) {
|
if (err == -1) {
|
||||||
|
@ -1404,7 +1400,7 @@ void PhysicalSocketServer::RemoveEpoll(Dispatcher* pdispatcher) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void PhysicalSocketServer::UpdateEpoll(Dispatcher* pdispatcher) {
|
void PhysicalSocketServer::UpdateEpoll(Dispatcher* pdispatcher, uint64_t key) {
|
||||||
RTC_DCHECK(epoll_fd_ != INVALID_SOCKET);
|
RTC_DCHECK(epoll_fd_ != INVALID_SOCKET);
|
||||||
int fd = pdispatcher->GetDescriptor();
|
int fd = pdispatcher->GetDescriptor();
|
||||||
RTC_DCHECK(fd != INVALID_SOCKET);
|
RTC_DCHECK(fd != INVALID_SOCKET);
|
||||||
|
@ -1414,7 +1410,7 @@ void PhysicalSocketServer::UpdateEpoll(Dispatcher* pdispatcher) {
|
||||||
|
|
||||||
struct epoll_event event = {0};
|
struct epoll_event event = {0};
|
||||||
event.events = GetEpollEvents(pdispatcher->GetRequestedEvents());
|
event.events = GetEpollEvents(pdispatcher->GetRequestedEvents());
|
||||||
event.data.ptr = pdispatcher;
|
event.data.u64 = key;
|
||||||
int err = epoll_ctl(epoll_fd_, EPOLL_CTL_MOD, fd, &event);
|
int err = epoll_ctl(epoll_fd_, EPOLL_CTL_MOD, fd, &event);
|
||||||
RTC_DCHECK_EQ(err, 0);
|
RTC_DCHECK_EQ(err, 0);
|
||||||
if (err == -1) {
|
if (err == -1) {
|
||||||
|
@ -1456,11 +1452,12 @@ bool PhysicalSocketServer::WaitEpoll(int cmsWait) {
|
||||||
CritScope cr(&crit_);
|
CritScope cr(&crit_);
|
||||||
for (int i = 0; i < n; ++i) {
|
for (int i = 0; i < n; ++i) {
|
||||||
const epoll_event& event = epoll_events_[i];
|
const epoll_event& event = epoll_events_[i];
|
||||||
Dispatcher* pdispatcher = static_cast<Dispatcher*>(event.data.ptr);
|
uint64_t key = event.data.u64;
|
||||||
if (dispatchers_.find(pdispatcher) == dispatchers_.end()) {
|
if (!dispatcher_by_key_.count(key)) {
|
||||||
// The dispatcher for this socket no longer exists.
|
// The dispatcher for this socket no longer exists.
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
Dispatcher* pdispatcher = dispatcher_by_key_.at(key);
|
||||||
|
|
||||||
bool readable = (event.events & (EPOLLIN | EPOLLPRI));
|
bool readable = (event.events & (EPOLLIN | EPOLLPRI));
|
||||||
bool writable = (event.events & EPOLLOUT);
|
bool writable = (event.events & EPOLLOUT);
|
||||||
|
@ -1472,7 +1469,7 @@ bool PhysicalSocketServer::WaitEpoll(int cmsWait) {
|
||||||
|
|
||||||
if (cmsWait != kForever) {
|
if (cmsWait != kForever) {
|
||||||
tvWait = TimeDiff(tvStop, TimeMillis());
|
tvWait = TimeDiff(tvStop, TimeMillis());
|
||||||
if (tvWait < 0) {
|
if (tvWait <= 0) {
|
||||||
// Return success on timeout.
|
// Return success on timeout.
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
@ -1555,6 +1552,10 @@ bool PhysicalSocketServer::WaitPoll(int cmsWait, Dispatcher* dispatcher) {
|
||||||
|
|
||||||
#if defined(WEBRTC_WIN)
|
#if defined(WEBRTC_WIN)
|
||||||
bool PhysicalSocketServer::Wait(int cmsWait, bool process_io) {
|
bool PhysicalSocketServer::Wait(int cmsWait, bool process_io) {
|
||||||
|
// We don't support reentrant waiting.
|
||||||
|
RTC_DCHECK(!waiting_);
|
||||||
|
ScopedSetTrue s(&waiting_);
|
||||||
|
|
||||||
int64_t cmsTotal = cmsWait;
|
int64_t cmsTotal = cmsWait;
|
||||||
int64_t cmsElapsed = 0;
|
int64_t cmsElapsed = 0;
|
||||||
int64_t msStart = Time();
|
int64_t msStart = Time();
|
||||||
|
@ -1562,37 +1563,40 @@ bool PhysicalSocketServer::Wait(int cmsWait, bool process_io) {
|
||||||
fWait_ = true;
|
fWait_ = true;
|
||||||
while (fWait_) {
|
while (fWait_) {
|
||||||
std::vector<WSAEVENT> events;
|
std::vector<WSAEVENT> events;
|
||||||
std::vector<Dispatcher*> event_owners;
|
std::vector<uint64_t> event_owners;
|
||||||
|
|
||||||
events.push_back(socket_ev_);
|
events.push_back(socket_ev_);
|
||||||
|
|
||||||
{
|
{
|
||||||
CritScope cr(&crit_);
|
CritScope cr(&crit_);
|
||||||
// TODO(jbauch): Support re-entrant waiting.
|
// Get a snapshot of all current dispatchers; this is used to avoid the
|
||||||
RTC_DCHECK(!processing_dispatchers_);
|
// ABA problem (see later comment) and avoids the dispatcher_by_key_
|
||||||
|
// iterator being invalidated by calling CheckSignalClose, which may
|
||||||
// Calling "CheckSignalClose" might remove a closed dispatcher from the
|
// remove the dispatcher from the list.
|
||||||
// set. This must be deferred to prevent invalidating the iterator.
|
current_dispatcher_keys_.clear();
|
||||||
processing_dispatchers_ = true;
|
for (auto const& kv : dispatcher_by_key_) {
|
||||||
for (Dispatcher* disp : dispatchers_) {
|
current_dispatcher_keys_.push_back(kv.first);
|
||||||
|
}
|
||||||
|
for (uint64_t key : current_dispatcher_keys_) {
|
||||||
|
if (!dispatcher_by_key_.count(key)) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
Dispatcher* disp = dispatcher_by_key_.at(key);
|
||||||
|
if (!disp)
|
||||||
|
continue;
|
||||||
if (!process_io && (disp != signal_wakeup_))
|
if (!process_io && (disp != signal_wakeup_))
|
||||||
continue;
|
continue;
|
||||||
SOCKET s = disp->GetSocket();
|
SOCKET s = disp->GetSocket();
|
||||||
if (disp->CheckSignalClose()) {
|
if (disp->CheckSignalClose()) {
|
||||||
// We just signalled close, don't poll this socket
|
// We just signalled close, don't poll this socket.
|
||||||
} else if (s != INVALID_SOCKET) {
|
} else if (s != INVALID_SOCKET) {
|
||||||
WSAEventSelect(s, events[0],
|
WSAEventSelect(s, events[0],
|
||||||
FlagsToEvents(disp->GetRequestedEvents()));
|
FlagsToEvents(disp->GetRequestedEvents()));
|
||||||
} else {
|
} else {
|
||||||
events.push_back(disp->GetWSAEvent());
|
events.push_back(disp->GetWSAEvent());
|
||||||
event_owners.push_back(disp);
|
event_owners.push_back(key);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
processing_dispatchers_ = false;
|
|
||||||
// Process deferred dispatchers that have been added/removed while the
|
|
||||||
// events were handled above.
|
|
||||||
AddRemovePendingDispatchers();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Which is shorter, the delay wait or the asked wait?
|
// Which is shorter, the delay wait or the asked wait?
|
||||||
|
@ -1624,15 +1628,23 @@ bool PhysicalSocketServer::Wait(int cmsWait, bool process_io) {
|
||||||
int index = dw - WSA_WAIT_EVENT_0;
|
int index = dw - WSA_WAIT_EVENT_0;
|
||||||
if (index > 0) {
|
if (index > 0) {
|
||||||
--index; // The first event is the socket event
|
--index; // The first event is the socket event
|
||||||
Dispatcher* disp = event_owners[index];
|
uint64_t key = event_owners[index];
|
||||||
|
if (!dispatcher_by_key_.count(key)) {
|
||||||
// The dispatcher could have been removed while waiting for events.
|
// The dispatcher could have been removed while waiting for events.
|
||||||
if (dispatchers_.find(disp) != dispatchers_.end()) {
|
continue;
|
||||||
|
}
|
||||||
|
Dispatcher* disp = dispatcher_by_key_.at(key);
|
||||||
disp->OnPreEvent(0);
|
disp->OnPreEvent(0);
|
||||||
disp->OnEvent(0, 0);
|
disp->OnEvent(0, 0);
|
||||||
}
|
|
||||||
} else if (process_io) {
|
} else if (process_io) {
|
||||||
processing_dispatchers_ = true;
|
// Iterate only on the dispatchers whose sockets were passed into
|
||||||
for (Dispatcher* disp : dispatchers_) {
|
// WSAEventSelect; this avoids the ABA problem (a socket being
|
||||||
|
// destroyed and a new one created with the same SOCKET handle).
|
||||||
|
for (uint64_t key : current_dispatcher_keys_) {
|
||||||
|
if (!dispatcher_by_key_.count(key)) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
Dispatcher* disp = dispatcher_by_key_.at(key);
|
||||||
SOCKET s = disp->GetSocket();
|
SOCKET s = disp->GetSocket();
|
||||||
if (s == INVALID_SOCKET)
|
if (s == INVALID_SOCKET)
|
||||||
continue;
|
continue;
|
||||||
|
@ -1698,11 +1710,6 @@ bool PhysicalSocketServer::Wait(int cmsWait, bool process_io) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
processing_dispatchers_ = false;
|
|
||||||
// Process deferred dispatchers that have been added/removed while the
|
|
||||||
// events were handled above.
|
|
||||||
AddRemovePendingDispatchers();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Reset the network event until new activity occurs
|
// Reset the network event until new activity occurs
|
||||||
|
|
|
@ -18,7 +18,7 @@
|
||||||
|
|
||||||
#include <array>
|
#include <array>
|
||||||
#include <memory>
|
#include <memory>
|
||||||
#include <set>
|
#include <unordered_map>
|
||||||
#include <vector>
|
#include <vector>
|
||||||
|
|
||||||
#include "rtc_base/deprecated/recursive_critical_section.h"
|
#include "rtc_base/deprecated/recursive_critical_section.h"
|
||||||
|
@ -85,17 +85,13 @@ class RTC_EXPORT PhysicalSocketServer : public SocketServer {
|
||||||
// The number of events to process with one call to "epoll_wait".
|
// The number of events to process with one call to "epoll_wait".
|
||||||
static constexpr size_t kNumEpollEvents = 128;
|
static constexpr size_t kNumEpollEvents = 128;
|
||||||
|
|
||||||
typedef std::set<Dispatcher*> DispatcherSet;
|
|
||||||
|
|
||||||
void AddRemovePendingDispatchers() RTC_EXCLUSIVE_LOCKS_REQUIRED(crit_);
|
|
||||||
|
|
||||||
#if defined(WEBRTC_POSIX)
|
#if defined(WEBRTC_POSIX)
|
||||||
bool WaitSelect(int cms, bool process_io);
|
bool WaitSelect(int cms, bool process_io);
|
||||||
#endif // WEBRTC_POSIX
|
#endif // WEBRTC_POSIX
|
||||||
#if defined(WEBRTC_USE_EPOLL)
|
#if defined(WEBRTC_USE_EPOLL)
|
||||||
void AddEpoll(Dispatcher* dispatcher);
|
void AddEpoll(Dispatcher* dispatcher, uint64_t key);
|
||||||
void RemoveEpoll(Dispatcher* dispatcher);
|
void RemoveEpoll(Dispatcher* dispatcher);
|
||||||
void UpdateEpoll(Dispatcher* dispatcher);
|
void UpdateEpoll(Dispatcher* dispatcher, uint64_t key);
|
||||||
bool WaitEpoll(int cms);
|
bool WaitEpoll(int cms);
|
||||||
bool WaitPoll(int cms, Dispatcher* dispatcher);
|
bool WaitPoll(int cms, Dispatcher* dispatcher);
|
||||||
|
|
||||||
|
@ -106,16 +102,31 @@ class RTC_EXPORT PhysicalSocketServer : public SocketServer {
|
||||||
std::array<epoll_event, kNumEpollEvents> epoll_events_;
|
std::array<epoll_event, kNumEpollEvents> epoll_events_;
|
||||||
const int epoll_fd_ = INVALID_SOCKET;
|
const int epoll_fd_ = INVALID_SOCKET;
|
||||||
#endif // WEBRTC_USE_EPOLL
|
#endif // WEBRTC_USE_EPOLL
|
||||||
DispatcherSet dispatchers_ RTC_GUARDED_BY(crit_);
|
// uint64_t keys are used to uniquely identify a dispatcher in order to avoid
|
||||||
DispatcherSet pending_add_dispatchers_ RTC_GUARDED_BY(crit_);
|
// the ABA problem during the epoll loop (a dispatcher being destroyed and
|
||||||
DispatcherSet pending_remove_dispatchers_ RTC_GUARDED_BY(crit_);
|
// replaced by one with the same address).
|
||||||
bool processing_dispatchers_ RTC_GUARDED_BY(crit_) = false;
|
uint64_t next_dispatcher_key_ RTC_GUARDED_BY(crit_) = 0;
|
||||||
|
std::unordered_map<uint64_t, Dispatcher*> dispatcher_by_key_
|
||||||
|
RTC_GUARDED_BY(crit_);
|
||||||
|
// Reverse lookup necessary for removals/updates.
|
||||||
|
std::unordered_map<Dispatcher*, uint64_t> key_by_dispatcher_
|
||||||
|
RTC_GUARDED_BY(crit_);
|
||||||
|
// A list of dispatcher keys that we're interested in for the current
|
||||||
|
// select() or WSAWaitForMultipleEvents() loop. Again, used to avoid the ABA
|
||||||
|
// problem (a socket being destroyed and a new one created with the same
|
||||||
|
// handle, erroneously receiving the events from the destroyed socket).
|
||||||
|
//
|
||||||
|
// Kept as a member variable just for efficiency.
|
||||||
|
std::vector<uint64_t> current_dispatcher_keys_;
|
||||||
Signaler* signal_wakeup_; // Assigned in constructor only
|
Signaler* signal_wakeup_; // Assigned in constructor only
|
||||||
RecursiveCriticalSection crit_;
|
RecursiveCriticalSection crit_;
|
||||||
#if defined(WEBRTC_WIN)
|
#if defined(WEBRTC_WIN)
|
||||||
const WSAEVENT socket_ev_;
|
const WSAEVENT socket_ev_;
|
||||||
#endif
|
#endif
|
||||||
bool fWait_;
|
bool fWait_;
|
||||||
|
// Are we currently in a select()/epoll()/WSAWaitForMultipleEvents loop?
|
||||||
|
// Used for a DCHECK, because we don't support reentrant waiting.
|
||||||
|
bool waiting_ = false;
|
||||||
};
|
};
|
||||||
|
|
||||||
class PhysicalSocket : public AsyncSocket, public sigslot::has_slots<> {
|
class PhysicalSocket : public AsyncSocket, public sigslot::has_slots<> {
|
||||||
|
|
|
@ -381,6 +381,15 @@ TEST_F(PhysicalSocketTest, TestCloseInClosedCallbackIPv6) {
|
||||||
SocketTest::TestCloseInClosedCallbackIPv6();
|
SocketTest::TestCloseInClosedCallbackIPv6();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TEST_F(PhysicalSocketTest, TestDeleteInReadCallbackIPv4) {
|
||||||
|
MAYBE_SKIP_IPV4;
|
||||||
|
SocketTest::TestDeleteInReadCallbackIPv4();
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST_F(PhysicalSocketTest, TestDeleteInReadCallbackIPv6) {
|
||||||
|
SocketTest::TestDeleteInReadCallbackIPv6();
|
||||||
|
}
|
||||||
|
|
||||||
TEST_F(PhysicalSocketTest, TestSocketServerWaitIPv4) {
|
TEST_F(PhysicalSocketTest, TestSocketServerWaitIPv4) {
|
||||||
MAYBE_SKIP_IPV4;
|
MAYBE_SKIP_IPV4;
|
||||||
SocketTest::TestSocketServerWaitIPv4();
|
SocketTest::TestSocketServerWaitIPv4();
|
||||||
|
|
|
@ -149,6 +149,15 @@ void SocketTest::TestCloseInClosedCallbackIPv6() {
|
||||||
CloseInClosedCallbackInternal(kIPv6Loopback);
|
CloseInClosedCallbackInternal(kIPv6Loopback);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void SocketTest::TestDeleteInReadCallbackIPv4() {
|
||||||
|
DeleteInReadCallbackInternal(kIPv4Loopback);
|
||||||
|
}
|
||||||
|
|
||||||
|
void SocketTest::TestDeleteInReadCallbackIPv6() {
|
||||||
|
MAYBE_SKIP_IPV6;
|
||||||
|
DeleteInReadCallbackInternal(kIPv6Loopback);
|
||||||
|
}
|
||||||
|
|
||||||
void SocketTest::TestSocketServerWaitIPv4() {
|
void SocketTest::TestSocketServerWaitIPv4() {
|
||||||
SocketServerWaitInternal(kIPv4Loopback);
|
SocketServerWaitInternal(kIPv4Loopback);
|
||||||
}
|
}
|
||||||
|
@ -651,6 +660,42 @@ void SocketTest::CloseInClosedCallbackInternal(const IPAddress& loopback) {
|
||||||
EXPECT_TRUE(Socket::CS_CLOSED == client->GetState());
|
EXPECT_TRUE(Socket::CS_CLOSED == client->GetState());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Helper class specifically for the test below.
|
||||||
|
class SocketDeleter : public sigslot::has_slots<> {
|
||||||
|
public:
|
||||||
|
explicit SocketDeleter(std::unique_ptr<AsyncSocket> socket)
|
||||||
|
: socket_(std::move(socket)) {}
|
||||||
|
|
||||||
|
void Delete(AsyncSocket* other) { socket_.reset(); }
|
||||||
|
|
||||||
|
bool deleted() const { return socket_ == nullptr; }
|
||||||
|
|
||||||
|
private:
|
||||||
|
std::unique_ptr<AsyncSocket> socket_;
|
||||||
|
};
|
||||||
|
|
||||||
|
// Tested deleting a socket within another socket's read callback. A previous
|
||||||
|
// iteration of the select loop failed in this situation, if both sockets
|
||||||
|
// became readable at the same time.
|
||||||
|
void SocketTest::DeleteInReadCallbackInternal(const IPAddress& loopback) {
|
||||||
|
std::unique_ptr<AsyncSocket> socket1(
|
||||||
|
ss_->CreateAsyncSocket(loopback.family(), SOCK_DGRAM));
|
||||||
|
std::unique_ptr<AsyncSocket> socket2(
|
||||||
|
ss_->CreateAsyncSocket(loopback.family(), SOCK_DGRAM));
|
||||||
|
EXPECT_EQ(0, socket1->Bind(SocketAddress(loopback, 0)));
|
||||||
|
EXPECT_EQ(0, socket2->Bind(SocketAddress(loopback, 0)));
|
||||||
|
EXPECT_EQ(3, socket1->SendTo("foo", 3, socket1->GetLocalAddress()));
|
||||||
|
EXPECT_EQ(3, socket2->SendTo("bar", 3, socket1->GetLocalAddress()));
|
||||||
|
// Sleep a while to ensure sends are both completed at the same time.
|
||||||
|
Thread::SleepMs(1000);
|
||||||
|
|
||||||
|
// Configure the helper class to delete socket 2 when socket 1 has a read
|
||||||
|
// event.
|
||||||
|
SocketDeleter deleter(std::move(socket2));
|
||||||
|
socket1->SignalReadEvent.connect(&deleter, &SocketDeleter::Delete);
|
||||||
|
EXPECT_TRUE_WAIT(deleter.deleted(), kTimeout);
|
||||||
|
}
|
||||||
|
|
||||||
class Sleeper : public MessageHandler {
|
class Sleeper : public MessageHandler {
|
||||||
public:
|
public:
|
||||||
void OnMessage(Message* msg) override { Thread::Current()->SleepMs(500); }
|
void OnMessage(Message* msg) override { Thread::Current()->SleepMs(500); }
|
||||||
|
|
|
@ -46,6 +46,8 @@ class SocketTest : public ::testing::Test {
|
||||||
void TestServerCloseIPv6();
|
void TestServerCloseIPv6();
|
||||||
void TestCloseInClosedCallbackIPv4();
|
void TestCloseInClosedCallbackIPv4();
|
||||||
void TestCloseInClosedCallbackIPv6();
|
void TestCloseInClosedCallbackIPv6();
|
||||||
|
void TestDeleteInReadCallbackIPv4();
|
||||||
|
void TestDeleteInReadCallbackIPv6();
|
||||||
void TestSocketServerWaitIPv4();
|
void TestSocketServerWaitIPv4();
|
||||||
void TestSocketServerWaitIPv6();
|
void TestSocketServerWaitIPv6();
|
||||||
void TestTcpIPv4();
|
void TestTcpIPv4();
|
||||||
|
@ -83,6 +85,7 @@ class SocketTest : public ::testing::Test {
|
||||||
void ClientCloseDuringConnectInternal(const IPAddress& loopback);
|
void ClientCloseDuringConnectInternal(const IPAddress& loopback);
|
||||||
void ServerCloseInternal(const IPAddress& loopback);
|
void ServerCloseInternal(const IPAddress& loopback);
|
||||||
void CloseInClosedCallbackInternal(const IPAddress& loopback);
|
void CloseInClosedCallbackInternal(const IPAddress& loopback);
|
||||||
|
void DeleteInReadCallbackInternal(const IPAddress& loopback);
|
||||||
void SocketServerWaitInternal(const IPAddress& loopback);
|
void SocketServerWaitInternal(const IPAddress& loopback);
|
||||||
void SingleFlowControlCallbackInternal(const IPAddress& loopback);
|
void SingleFlowControlCallbackInternal(const IPAddress& loopback);
|
||||||
void UdpInternal(const IPAddress& loopback);
|
void UdpInternal(const IPAddress& loopback);
|
||||||
|
|
Loading…
Reference in a new issue