/*
 *  Copyright 2004 The WebRTC Project Authors. All rights reserved.
 *
 *  Use of this source code is governed by a BSD-style license
 *  that can be found in the LICENSE file in the root of the source
 *  tree. An additional intellectual property rights grant can be found
 *  in the file PATENTS.  All contributing project authors may
 *  be found in the AUTHORS file in the root of the source tree.
 */

#include "rtc_base/message_queue.h"

#include <functional>

#include "rtc_base/atomic_ops.h"
#include "rtc_base/bind.h"
#include "rtc_base/event.h"
#include "rtc_base/gunit.h"
#include "rtc_base/logging.h"
#include "rtc_base/null_socket_server.h"
#include "rtc_base/ref_count.h"
#include "rtc_base/ref_counted_object.h"
#include "rtc_base/thread.h"
#include "rtc_base/time_utils.h"

namespace rtc {
namespace {

class MessageQueueTest : public ::testing::Test, public MessageQueue {
 public:
  MessageQueueTest() : MessageQueue(SocketServer::CreateDefault(), true) {}
  bool IsLocked_Worker() {
    if (!crit_.TryEnter()) {
      return true;
    }
    crit_.Leave();
    return false;
  }
  bool IsLocked() {
    // We have to do this on a worker thread, or else the TryEnter will
    // succeed, since our critical sections are reentrant.
    std::unique_ptr<Thread> worker(Thread::CreateWithSocketServer());
    worker->Start();
    return worker->Invoke<bool>(
        RTC_FROM_HERE, rtc::Bind(&MessageQueueTest::IsLocked_Worker, this));
  }
};

struct DeletedLockChecker {
  DeletedLockChecker(MessageQueueTest* test, bool* was_locked, bool* deleted)
      : test(test), was_locked(was_locked), deleted(deleted) {}
  ~DeletedLockChecker() {
    *deleted = true;
    *was_locked = test->IsLocked();
  }
  MessageQueueTest* test;
  bool* was_locked;
  bool* deleted;
};

static void DelayedPostsWithIdenticalTimesAreProcessedInFifoOrder(
    MessageQueue* q) {
  EXPECT_TRUE(q != nullptr);
  int64_t now = TimeMillis();
  q->PostAt(RTC_FROM_HERE, now, nullptr, 3);
  q->PostAt(RTC_FROM_HERE, now - 2, nullptr, 0);
  q->PostAt(RTC_FROM_HERE, now - 1, nullptr, 1);
  q->PostAt(RTC_FROM_HERE, now, nullptr, 4);
  q->PostAt(RTC_FROM_HERE, now - 1, nullptr, 2);

  Message msg;
  for (size_t i = 0; i < 5; ++i) {
    memset(&msg, 0, sizeof(msg));
    EXPECT_TRUE(q->Get(&msg, 0));
    EXPECT_EQ(i, msg.message_id);
  }

  EXPECT_FALSE(q->Get(&msg, 0));  // No more messages
}

TEST_F(MessageQueueTest,
       DelayedPostsWithIdenticalTimesAreProcessedInFifoOrder) {
  MessageQueue q(SocketServer::CreateDefault(), true);
  DelayedPostsWithIdenticalTimesAreProcessedInFifoOrder(&q);

  NullSocketServer nullss;
  MessageQueue q_nullss(&nullss, true);
  DelayedPostsWithIdenticalTimesAreProcessedInFifoOrder(&q_nullss);
}

TEST_F(MessageQueueTest, DisposeNotLocked) {
  bool was_locked = true;
  bool deleted = false;
  DeletedLockChecker* d = new DeletedLockChecker(this, &was_locked, &deleted);
  Dispose(d);
  Message msg;
  EXPECT_FALSE(Get(&msg, 0));
  EXPECT_TRUE(deleted);
  EXPECT_FALSE(was_locked);
}

class DeletedMessageHandler : public MessageHandler {
 public:
  explicit DeletedMessageHandler(bool* deleted) : deleted_(deleted) {}
  ~DeletedMessageHandler() override { *deleted_ = true; }
  void OnMessage(Message* msg) override {}

 private:
  bool* deleted_;
};

TEST_F(MessageQueueTest, DiposeHandlerWithPostedMessagePending) {
  bool deleted = false;
  DeletedMessageHandler* handler = new DeletedMessageHandler(&deleted);
  // First, post a dispose.
  Dispose(handler);
  // Now, post a message, which should *not* be returned by Get().
  Post(RTC_FROM_HERE, handler, 1);
  Message msg;
  EXPECT_FALSE(Get(&msg, 0));
  EXPECT_TRUE(deleted);
}

// Ensure that ProcessAllMessageQueues does its essential function; process
// all messages (both delayed and non delayed) up until the current time, on
// all registered message queues.
TEST(MessageQueueManager, ProcessAllMessageQueues) {
  Event entered_process_all_message_queues(true, false);
  auto a = Thread::CreateWithSocketServer();
  auto b = Thread::CreateWithSocketServer();
  a->Start();
  b->Start();

  volatile int messages_processed = 0;
  FunctorMessageHandler<void, std::function<void()>> incrementer(
      [&messages_processed, &entered_process_all_message_queues] {
        // Wait for event as a means to ensure Increment doesn't occur outside
        // of ProcessAllMessageQueues. The event is set by a message posted to
        // the main thread, which is guaranteed to be handled inside
        // ProcessAllMessageQueues.
        entered_process_all_message_queues.Wait(Event::kForever);
        AtomicOps::Increment(&messages_processed);
      });
  FunctorMessageHandler<void, std::function<void()>> event_signaler(
      [&entered_process_all_message_queues] {
        entered_process_all_message_queues.Set();
      });

  // Post messages (both delayed and non delayed) to both threads.
  a->Post(RTC_FROM_HERE, &incrementer);
  b->Post(RTC_FROM_HERE, &incrementer);
  a->PostDelayed(RTC_FROM_HERE, 0, &incrementer);
  b->PostDelayed(RTC_FROM_HERE, 0, &incrementer);
  rtc::Thread::Current()->Post(RTC_FROM_HERE, &event_signaler);

  MessageQueueManager::ProcessAllMessageQueuesForTesting();
  EXPECT_EQ(4, AtomicOps::AcquireLoad(&messages_processed));
}

// Test that ProcessAllMessageQueues doesn't hang if a thread is quitting.
TEST(MessageQueueManager, ProcessAllMessageQueuesWithQuittingThread) {
  auto t = Thread::CreateWithSocketServer();
  t->Start();
  t->Quit();
  MessageQueueManager::ProcessAllMessageQueuesForTesting();
}

// Test that ProcessAllMessageQueues doesn't hang if a queue clears its
// messages.
TEST(MessageQueueManager, ProcessAllMessageQueuesWithClearedQueue) {
  Event entered_process_all_message_queues(true, false);
  auto t = Thread::CreateWithSocketServer();
  t->Start();

  FunctorMessageHandler<void, std::function<void()>> clearer(
      [&entered_process_all_message_queues] {
        // Wait for event as a means to ensure Clear doesn't occur outside of
        // ProcessAllMessageQueues. The event is set by a message posted to the
        // main thread, which is guaranteed to be handled inside
        // ProcessAllMessageQueues.
        entered_process_all_message_queues.Wait(Event::kForever);
        rtc::Thread::Current()->Clear(nullptr);
      });
  FunctorMessageHandler<void, std::function<void()>> event_signaler(
      [&entered_process_all_message_queues] {
        entered_process_all_message_queues.Set();
      });

  // Post messages (both delayed and non delayed) to both threads.
  t->Post(RTC_FROM_HERE, &clearer);
  rtc::Thread::Current()->Post(RTC_FROM_HERE, &event_signaler);
  MessageQueueManager::ProcessAllMessageQueuesForTesting();
}

class RefCountedHandler : public MessageHandler, public rtc::RefCountInterface {
 public:
  void OnMessage(Message* msg) override {}
};

class EmptyHandler : public MessageHandler {
 public:
  void OnMessage(Message* msg) override {}
};

TEST(MessageQueueManager, ClearReentrant) {
  std::unique_ptr<Thread> t(Thread::Create());
  EmptyHandler handler;
  RefCountedHandler* inner_handler(
      new rtc::RefCountedObject<RefCountedHandler>());
  // When the empty handler is destroyed, it will clear messages queued for
  // itself. The message to be cleared itself wraps a MessageHandler object
  // (RefCountedHandler) so this will cause the message queue to be cleared
  // again in a re-entrant fashion, which previously triggered a DCHECK.
  // The inner handler will be removed in a re-entrant fashion from the
  // message queue of the thread while the outer handler is removed, verifying
  // that the iterator is not invalidated in "MessageQueue::Clear".
  t->Post(RTC_FROM_HERE, inner_handler, 0);
  t->Post(RTC_FROM_HERE, &handler, 0,
          new ScopedRefMessageData<RefCountedHandler>(inner_handler));
}

}  // namespace
}  // namespace rtc