原文出处:WebRTC源码分析-线程基础之MessageQueueManager

前言

正如其名,MessageQueueManager类(后续简写为MQM)提供了MessageQueue(简写为MQ)的管理功能。在之前的文章中已经分析过,MQ在构建时会调用MQ.DoInit()方法,该方法将MQ添加到MQM的内部std::Vector成员中。

MQM类的声明和定义分别在rtc_base/message_queue.h以及rtc_base/message_queue.cc中,其定义如下所示

// MessageQueueManager does cleanup of of message queues
class MessageQueueManager {
 public:
  static void Add(MessageQueue* message_queue);
  static void Remove(MessageQueue* message_queue);
  static void Clear(MessageHandler* handler);

  // TODO(nisse): Delete alias, as soon as downstream code is updated.
  static void ProcessAllMessageQueues() { ProcessAllMessageQueuesForTesting(); }

  // For testing purposes, for use with a simulated clock.
  // Ensures that all message queues have processed delayed messages
  // up until the current point in time.
  static void ProcessAllMessageQueuesForTesting();

 private:
  static MessageQueueManager* Instance();
  MessageQueueManager();
  ~MessageQueueManager();

  void AddInternal(MessageQueue* message_queue);
  void RemoveInternal(MessageQueue* message_queue);
  void ClearInternal(MessageHandler* handler);
  void ProcessAllMessageQueuesInternal();

  // This list contains all live MessageQueues.
  std::vector<MessageQueue*> message_queues_ RTC_GUARDED_BY(crit_);

  // Methods that don't modify the list of message queues may be called in a
  // re-entrant fashion. "processing_" keeps track of the depth of re-entrant
  // calls.
  CriticalSection crit_;
  size_t processing_ RTC_GUARDED_BY(crit_);
};

MessageQueueManager的构造

MessageQueueManager的构造方式与ThreadManager一样,都是单例模式,都是非安全的。之前分析过ThreadManager为什么能够安全的构造,MessageQueueManager原理一样,并且MessageQueueManager对象的创建先于第一个MessageQueue对象。

MessageQueueManager* MessageQueueManager::Instance() {
  static MessageQueueManager* const instance = new MessageQueueManager;
  return instance;
}

MessageQueueManager::MessageQueueManager() : processing_(0) {}
MessageQueueManager::~MessageQueueManager() {}

MessageQueue的添加与移除

MessageQueueManager提供了Add与Remove的静态函数来往单例的管理类中添加和删除MQ,具体如下源码所示:

void MessageQueueManager::Add(MessageQueue* message_queue) {
  return Instance()->AddInternal(message_queue);
}
void MessageQueueManager::AddInternal(MessageQueue* message_queue) {
  CritScope cs(&crit_);
  // Prevent changes while the list of message queues is processed.
  RTC_DCHECK_EQ(processing_, 0);
  message_queues_.push_back(message_queue);
}

void MessageQueueManager::Remove(MessageQueue* message_queue) {
  return Instance()->RemoveInternal(message_queue);
}
void MessageQueueManager::RemoveInternal(MessageQueue* message_queue) {
  {
    CritScope cs(&crit_);
    // Prevent changes while the list of message queues is processed.
    RTC_DCHECK_EQ(processing_, 0);
    std::vector<MessageQueue*>::iterator iter;
    iter = std::find(message_queues_.begin(), message_queues_.end(),
                     message_queue);
    if (iter != message_queues_.end()) {
      message_queues_.erase(iter);
    }
  }
}

添加和删除方法对外都是以静态方法提供,通过调用MQM的单实例的对应的私有方法来实现往向量Vector中添加和删除MQ,需要说明的注意点有以下几个:

1)成员crit_是临界区类CriticalSection的对象,该成员保证多线程环境下MQM.message_queues_以及MQM.processing_访问安全,正如上面两个函数所示,函数开头创建CritScope cs(&crit_); 在cs的构造函数中调用crit_->Enter()表示进入临界区,相当于上锁。利用函数结束后cs对象的析构中调用crit_->Leave()表示离开临界区,相当于解锁。

2)存储MQ的向量声明为:std::vector<MessageQueue*> message_queues_ RTC_GUARDED_BY(crit_); 其中RTC_GUARDED_BY宏在clang编译器下展开为attribute(guarded_by(crit_)),指示编译器在编译过程中检查代码中所有访问message_queues_的各个路径上是否都先获取了锁crit_,如果没有就会在编译过程中产生错误或者警告。而对于其他编译器,该宏不起任何作用,意味着不会在编译期进行检查。详见 Thread Safety Analysis

3)成员processing_声明为:size_t processing_ RTC_GUARDED_BY(crit_); Add与Remove函数中执行了RTC_DCHECK_EQ(processing_, 0)断言,必须确保processing_为0。当processing_不为0时,要么在执行MQM的Clear()方法,要么在执行ProcessAllMessageQueues(),这些操作此时,是不允许往MQM添加MQ或者删除MQ这种会改变Vector列表的操作,因为前面的两个函数一般都会要遍历Vector。思考一点,不是已经上锁保证线程安全了嘛,为啥还要保证processing_为0呢?继续往下看吧~~

清理

与Add和Remove方式一摸一样,Clear函数也是以静态方法的形式对外提供。Clear函数的作用是从MQM所管理的所有MQ中删除与入参MessageHandler* handler匹配消息。具体而言就是遍历MQM中的MQ,然后调用MQ本身的Clear()方法,这个方法比较冗长,此处就不展开叙述,将会在介绍MQ的文章中详细描述。

void MessageQueueManager::Clear(MessageHandler* handler) {
  return Instance()->ClearInternal(handler);
}

void MessageQueueManager::ClearInternal(MessageHandler* handler) {
  // Deleted objects may cause re-entrant calls to ClearInternal. This is
  // allowed as the list of message queues does not change while queues are
  // cleared.
  MarkProcessingCritScope cs(&crit_, &processing_);
  for (MessageQueue* queue : message_queues_) {
    queue->Clear(handler);
  }
}

另外很重要的一点,该方法并没有使用前文所述的CritScopecs(&crit_)来实现线程安全,而是使用了一个新的类MarkProcessingCritScope cs(&crit_, &processing_),有什么神奇的地方?且看源码:

class RTC_SCOPED_LOCKABLE MarkProcessingCritScope {
 public:
  MarkProcessingCritScope(const CriticalSection* cs, size_t* processing)
      RTC_EXCLUSIVE_LOCK_FUNCTION(cs)
      : cs_(cs), processing_(processing) {
    cs_->Enter();
    *processing_ += 1;
  }
  ~MarkProcessingCritScope() RTC_UNLOCK_FUNCTION() {
    *processing_ -= 1;
    cs_->Leave();
  }
 private:
  const CriticalSection* const cs_;
  size_t* processing_;
  RTC_DISALLOW_COPY_AND_ASSIGN(MarkProcessingCritScope);
};

首先要知道CriticalSection是可重入的,也即一个线程上调用cs_->Enter()上锁之后,在释放锁之前,同一个线程可以反复调用cs_->Enter()而不会阻塞,因此被称为“可重入锁”。同一个线程上锁一次,processing_就增1,记录上锁次数,只要processing_不为0,表示我正在Clear操作或者后文的Process*方法,这两个方法不会改变MQM中Vector列表,因此,可以在解锁之前,重入进行反复操作,但是不允许Add和Remove操作,因为其会改变Vector,这就是为什么Add和Remove函数中既加锁了,还要断言processing_必须为0,否则代码就是写得有Bug了。

处理所有MQ中的消息

这个方法目前还没有完全的理解,首先说下我自己的分析。

static void ProcessAllMessageQueues() { ProcessAllMessageQueuesForTesting(); }

void MessageQueueManager::ProcessAllMessageQueuesForTesting() {
  return Instance()->ProcessAllMessageQueuesInternal();
}

void MessageQueueManager::ProcessAllMessageQueuesInternal() {
  // This works by posting a delayed message at the current time and waiting
  // for it to be dispatched on all queues, which will ensure that all messages
  // that came before it were also dispatched.
  volatile int queues_not_done = 0;

  // This class is used so that whether the posted message is processed, or the
  // message queue is simply cleared, queues_not_done gets decremented.
  class ScopedIncrement : public MessageData {
   public:
    ScopedIncrement(volatile int* value) : value_(value) {
      AtomicOps::Increment(value_);
    }
    ~ScopedIncrement() override { AtomicOps::Decrement(value_); }

   private:
    volatile int* value_;
  };

  {
    MarkProcessingCritScope cs(&crit_, &processing_);
    for (MessageQueue* queue : message_queues_) {
      if (!queue->IsProcessingMessagesForTesting()) {
        // If the queue is not processing messages, it can
        // be ignored. If we tried to post a message to it, it would be dropped
        // or ignored.
        continue;
      }
      queue->PostDelayed(RTC_FROM_HERE, 0, nullptr, MQID_DISPOSE,
                         new ScopedIncrement(&queues_not_done));
    }
  }
  rtc::Thread* current = rtc::Thread::Current();
  // Note: One of the message queues may have been on this thread, which is
  // why we can't synchronously wait for queues_not_done to go to 0; we need
  // to process messages as well.
  while (AtomicOps::AcquireLoad(&queues_not_done) > 0) {
    if (current) {
      current->ProcessMessages(0);
    }
  }
}

总结


原文出处:WebRTC源码分析-线程基础之Message && MessageData && MessageHandler

前言

本文将介绍消息循环中的消息(Message),消息中持有的数据(MessageData),处理消息的Handler(MessageHandler)的基本内容。

其中Message与MessageData相关的结构体位于rtc_base/message_queue.h中,MessageHandler相关的类位于rtc_base/message_handler.h

消息Message

WebRTC中消息相关的类分为两种,一种是Message,表征的是即时消息,投放到消息循环中期待能被立马消费;另外一种是DelayedMessage,表征的是延迟消息,投放到消息循环中不会立马被消费,而是延迟一段时间才会被消费。

Message 源码如下所示

struct Message {
  Message()
      : phandler(nullptr), message_id(0), pdata(nullptr), ts_sensitive(0) {}
  inline bool Match(MessageHandler* handler, uint32_t id) const {
    return (handler == nullptr || handler == phandler) &&
            (id == MQID_ANY || id == message_id);
  }
  Location posted_from;             // 消息自哪儿产生
  MessageHandler* phandler;    // 消息如何处理
  uint32_t message_id;              // 消息id
  MessageData* pdata;              // 消息中携带的数据
  int64_t ts_sensitive;                 // 消息产生的时间
};

Message结构体成员有如下几种:

Message结构体还提供了两个方法。

DelayedMessage源码如下

// DelayedMessage goes into a priority queue, sorted by trigger time.  Messages
// with the same trigger time are processed in num_ (FIFO) order.
class DelayedMessage {
  public:
  DelayedMessage(int64_t delay,
                  int64_t trigger,
                  uint32_t num,
                  const Message& msg)
      : cmsDelay_(delay), msTrigger_(trigger), num_(num), msg_(msg) {}

  bool operator<(const DelayedMessage& dmsg) const {
    return (dmsg.msTrigger_ < msTrigger_) ||
            ((dmsg.msTrigger_ == msTrigger_) && (dmsg.num_ < num_));
  }

  int64_t cmsDelay_;  // for debugging
  int64_t msTrigger_;
  uint32_t num_;
  Message msg_;
};

DelayedMessage与Message并非是继承is-a关系,而是has-a的关系。提供了除Message msg_成员之外的另几个成员:

由于DelayedMessage在MQ中会放至于专门的优先级队列中,优先级如何确定?DelayedMessage重载了小于"<"运算符以确定排序,先按消息需要被处理的触发时间戳msTrigger_排序;若触发时间相同,则按消息序号排序。当某个延迟消息要投放到延迟队列中时,该优先级队列会根据“<”运算符去比较这个延迟消息与队列中的消息以确定应在队列的哪个位置插入该消息。

MessageList被定义为std::list, MessageQueue中的即时消息就存储于MessageList类别的列表中,按照先入先出的方式挨个进行处理。

typedef std::list<Message> MessageList;

消息MessgeData

MessgeData 消息数据基类,其析构函数被定义为virtual类型,以防内存泄漏

class MessageData {
  public:
  MessageData() {}
  virtual ~MessageData() {}
};

TypedMessageData 使用模板定义的MessageData 的一个子类,便于扩展。

template <class T>
class TypedMessageData : public MessageData {
  public:
  explicit TypedMessageData(const T& data) : data_(data) {}
  const T& data() const { return data_; }
  T& data() { return data_; }
  private:
  T data_;
};

ScopedMessageData 类似于 TypedMessageData,用于指针类型。在析构函数中,自动对该指针调用 delete。

// Like TypedMessageData, but for pointers that require a delete.
template <class T>
class ScopedMessageData : public MessageData {
 public:
  explicit ScopedMessageData(std::unique_ptr<T> data)
      : data_(std::move(data)) {}
  // Deprecated.
  // TODO(deadbeef): Remove this once downstream applications stop using it.
  explicit ScopedMessageData(T* data) : data_(data) {}
  // Deprecated.
  // TODO(deadbeef): Returning a reference to a unique ptr? Why. Get rid of
  // this once downstream applications stop using it, then rename inner_data to
  // just data.
  const std::unique_ptr<T>& data() const { return data_; }
  std::unique_ptr<T>& data() { return data_; }

  const T& inner_data() const { return *data_; }
  T& inner_data() { return *data_; }

 private:
  std::unique_ptr<T> data_;
};

ScopedRefMessageData 类似于ScopedMessageData,用于引用计数的指针类型。

// Like ScopedMessageData, but for reference counted pointers.
template <class T>
class ScopedRefMessageData : public MessageData {
  public:
  explicit ScopedRefMessageData(T* data) : data_(data) {}
  const scoped_refptr<T>& data() const { return data_; }
  scoped_refptr<T>& data() { return data_; }

  private:
  scoped_refptr<T> data_;
};

DisposeData 这个值得注意下,和前面几个MessageData子类不一样,该对象并没有提供其内部数据data_的方法,那么该对象正如其名,持有需要进行销毁的数据。在哪些场景下使用?

template <class T>
class DisposeData : public MessageData {
 public:
  explicit DisposeData(T* data) : data_(data) {}
  virtual ~DisposeData() { delete data_; }

 private:
  T* data_;
};

WrapMessageData && UseMessageData 两个模板方法,分别用来将数据封装成MessageData以及取出对应的数据,封装拆箱操作。

template <class T>
inline MessageData* WrapMessageData(const T& data) {
  return new TypedMessageData<T>(data);
}

template <class T>
inline const T& UseMessageData(MessageData* data) {
  return static_cast<TypedMessageData<T>*>(data)->data();
}

消息处理器MessageHandler

MessageHandler 消息处理器的基类,子类在继承了该类之后要重载 OnMessage 函数,在其中实现消息响应的逻辑。

class MessageHandler {
 public:
  virtual ~MessageHandler();
  virtual void OnMessage(Message* msg) = 0;
 protected:
  MessageHandler() {}
 private:
  RTC_DISALLOW_COPY_AND_ASSIGN(MessageHandler);
};

FunctorMessageHandler MessageHandler的模板子类,用于帮助实现阻塞地跨线程在指定线程上执行某个方法,并可获取执行结果。Thread的Invoke()方法使用该类。该类的构造函数中使用了C++11的右值引用,转发语义std::forward,以及转移语义std::move。详见博客C++11 std::move和std::forward

// Helper class to facilitate executing a functor on a thread.
template <class ReturnT, class FunctorT>
class FunctorMessageHandler : public MessageHandler {
  public:
  explicit FunctorMessageHandler(FunctorT&& functor)
      : functor_(std::forward<FunctorT>(functor)) {}
  virtual void OnMessage(Message* msg) { result_ = functor_(); }
  const ReturnT& result() const { return result_; }
  // Returns moved result. Should not call result() or MoveResult() again
  // after this.
  ReturnT MoveResult() { return std::move(result_); }
  private:
  FunctorT functor_;
  ReturnT result_;
};

无返回值特例FunctorMessageHandler 返回值类型为 void 的函数的FunctorMessageHandler特化版本

// Specialization for ReturnT of void.
template <class FunctorT>
class FunctorMessageHandler<void, FunctorT> : public MessageHandler {
  public:
  explicit FunctorMessageHandler(FunctorT&& functor)
      : functor_(std::forward<FunctorT>(functor)) {}
  virtual void OnMessage(Message* msg) { functor_(); }
  void result() const {}
  void MoveResult() {}
  private:
  FunctorT functor_;
};

总结

至此,基本上将MQ相关的“边角料”介绍完毕了,重点的知识再回顾下:


原文出处:WebRTC源码分析-线程基础之MessageQueue

前言

MessageQueue提供了两方面的功能,消息循环中的消息队列功能以及通过持有SocketServer对象带来的IO多路复用功能。在MessageQueue内部这两部分功能不是完全孤立的,而是相互配合在一起使用。尤其是在MessageQueue的核心方法Get()中体现得淋漓尽致。

MessageQueue的实现位于rtc_base/message_queue.h以及rtc_base/message_queue.cc中,其声明如下:

class MessageQueue {
 public:
  static const int kForever = -1;

  MessageQueue(SocketServer* ss, bool init_queue);
  MessageQueue(std::unique_ptr<SocketServer> ss, bool init_queue);
  virtual ~MessageQueue();

  SocketServer* socketserver();

  virtual void Quit();
  virtual bool IsQuitting();
  virtual void Restart();
  virtual bool IsProcessingMessagesForTesting();

  virtual bool Get(Message* pmsg, int cmsWait = kForever, bool process_io = true);
  virtual bool Peek(Message* pmsg, int cmsWait = 0);
  virtual void Post(const Location& posted_from, MessageHandler* phandler,
                    uint32_t id = 0, MessageData* pdata = nullptr, bool time_sensitive = false);
  virtual void PostDelayed(const Location& posted_from, int cmsDelay,
                           MessageHandler* phandler, uint32_t id = 0, MessageData* pdata = nullptr);
  virtual void PostAt(const Location& posted_from, int64_t tstamp,
                      MessageHandler* phandler, uint32_t id = 0, MessageData* pdata = nullptr);
  // TODO(honghaiz): Remove this when all the dependencies are removed.
  virtual void PostAt(const Location& posted_from, uint32_t tstamp,
                      MessageHandler* phandler, uint32_t id = 0, MessageData* pdata = nullptr);
  virtual void Clear(MessageHandler* phandler, uint32_t id = MQID_ANY,
                              MessageList* removed = nullptr);
  virtual void Dispatch(Message* pmsg);
  virtual void ReceiveSends();

  // Amount of time until the next message can be retrieved
  virtual int GetDelay();

  bool empty() const { return size() == 0u; }
  size_t size() const {
    CritScope cs(&crit_);  // msgq_.size() is not thread safe.
    return msgq_.size() + dmsgq_.size() + (fPeekKeep_ ? 1u : 0u);
  }

  // Internally posts a message which causes the doomed object to be deleted
  template <class T>
  void Dispose(T* doomed) {
    if (doomed) {
      Post(RTC_FROM_HERE, nullptr, MQID_DISPOSE, new DisposeData<T>(doomed));
    }
  }

  sigslot::signal0<> SignalQueueDestroyed;

 protected:
  class PriorityQueue : public std::priority_queue<DelayedMessage> {
   public:
    container_type& container() { return c; }
    void reheap() { make_heap(c.begin(), c.end(), comp); }
  };

  void DoDelayPost(const Location& posted_from, int64_t cmsDelay, int64_t tstamp, 
                   MessageHandler* phandler, uint32_t id, MessageData* pdata);

  void DoInit();

  void ClearInternal(MessageHandler* phandler, uint32_t id,
                     MessageList* removed) RTC_EXCLUSIVE_LOCKS_REQUIRED(&crit_);

  void DoDestroy() RTC_EXCLUSIVE_LOCKS_REQUIRED(&crit_);

  void WakeUpSocketServer();

  bool fPeekKeep_;
  Message msgPeek_;
  MessageList msgq_ RTC_GUARDED_BY(crit_);
  PriorityQueue dmsgq_ RTC_GUARDED_BY(crit_);
  uint32_t dmsgq_next_num_ RTC_GUARDED_BY(crit_);
  CriticalSection crit_;
  bool fInitialized_;
  bool fDestroyed_;

 private:
  volatile int stop_;
  SocketServer* const ss_;
  std::unique_ptr<SocketServer> own_ss_;

  RTC_DISALLOW_IMPLICIT_CONSTRUCTORS(MessageQueue);
};

MQ的基本成员

根据功能划分,可以将MQ的基本成员分为三类

MQ状态指示:

消息循环相关:MQ中有3个地方存储了消息:msgPeek_msgq_dmsgq_

IO多路复用相关:

MQ的构造及析构

构造 做了这么几件事:初始化所有的成员;断言ss不能传空指针,将MQ的指针传递给ss使得二者相互持有相互访问;将MQ加入到MQM的管理指针,fInitialized_标志置为true,指示该MQ已经初始化完成。

MessageQueue::MessageQueue(SocketServer* ss, bool init_queue)
    : fPeekKeep_(false), dmsgq_next_num_(0), fInitialized_(false),
      fDestroyed_(false), stop_(0), ss_(ss) {
  RTC_DCHECK(ss);
  ss_->SetMessageQueue(this);
  if (init_queue) {
    DoInit();
  }
}

MessageQueue::MessageQueue(std::unique_ptr<SocketServer> ss, bool init_queue)
    : MessageQueue(ss.get(), init_queue) {
  own_ss_ = std::move(ss);
}

void MessageQueue::DoInit() {
  if (fInitialized_) {
    return;
  }
  fInitialized_ = true;
  MessageQueueManager::Add(this);
}

析构 基本上是构造的逆操作:设置fDestroyed_为true,表示MQ被销毁;发送信号SignalQueueDestroyed()告知关注了MQ的对象不要再访问该MQ了;从MQM中移除自己;清理MQ中的所有消息;从ss中移除MQ的指针。

MessageQueue::~MessageQueue() {
  DoDestroy();
}

void MessageQueue::DoDestroy() {
  if (fDestroyed_) {
    return;
  }
  fDestroyed_ = true;
  // The signal is done from here to ensure
  // that it always gets called when the queue
  // is going away.
  SignalQueueDestroyed();
  MessageQueueManager::Remove(this);
  ClearInternal(nullptr, MQID_ANY, nullptr);

  if (ss_) {
    ss_->SetMessageQueue(nullptr);
  }
}

MQ的Size

由于MQ有3个地方存储了消息,一个是Peek消息msgPeek_,一个即时消息队列msgq_,一个是延迟消息队列dmsgq_。那么计算MQ中存储的消息个数时,这三个地方都得算上。

bool empty() const { return size() == 0u; }

size_t size() const {
  CritScope cs(&crit_);  // msgq_.size() is not thread safe.
  return msgq_.size() + dmsgq_.size() + (fPeekKeep_ ? 1u : 0u);
}

MQ的运行状态

MQ的运行状态由成员volatile int stop来指示,相关的函数有以下几个,如源码所示。方法都非常简单,无非就是对`stop`变量进行原子性的置1和置0,本身就是线程安全的,所有并没有上锁。另外需要知道的几点如下:

void MessageQueue::Quit() {
  AtomicOps::ReleaseStore(&stop_, 1);
  WakeUpSocketServer();
}

bool MessageQueue::IsQuitting() {
  return AtomicOps::AcquireLoad(&stop_) != 0;
}

bool MessageQueue::IsProcessingMessagesForTesting() {
  return !IsQuitting();
}

void MessageQueue::Restart() {
  AtomicOps::ReleaseStore(&stop_, 0);
}

消息获取

MQ中消息获取相关的函数有两个,Peek()与Get(),其中Get()是核心内容,再看Get()方法之前,先看看Peek()方法干了啥~~

Peek(): 简而言之就是查看之前是否已经Peek过一个MSG到成员msgPeek_中,若已经Peek过一个则直接将该消息返回;若没有,则通过Get()方法从消息队列中取出一个消息,成功则将该消息交给msgPeek_成员,并将fPeekKeep_标志置为true。

bool MessageQueue::Peek(Message* pmsg, int cmsWait) {
  // fPeekKeep_为真,表示已经Peek过一个MSG到msgPeek_
  // 直接将该MSG返回
  if (fPeekKeep_) {
    *pmsg = msgPeek_;
    return true;
  }
  // 若没有之前没有Peek过一个MSG
  if (!Get(pmsg, cmsWait))
    return false;
  //将Get到的消息放在msgPeek_中保存,并设置标志位
  msgPeek_ = *pmsg;
  fPeekKeep_ = true;
  return true;
}

Get():方法的声明如下所示,注释说明了Get()方法的内部算法的流程,Get()方法会阻塞的处理IO,直到有消息可以处理 或者 cmsWait时间已经过去 或者 Stop()方法被调用。

// Get() will process I/O until:
//  1) A message is available (returns true)
//  2) cmsWait seconds have elapsed (returns false)
//  3) Stop() is called (returns false)
virtual bool Get(Message* pmsg,
                  int cmsWait = kForever,
                  bool process_io = true);

源码如下:

bool MessageQueue::Get(Message* pmsg, int cmsWait, bool process_io) {
  // 是否存在一个Peek过的消息没有被处理?
  // 优先处理该消息                                                                     // 步骤1
  if (fPeekKeep_) {                                                                              
    *pmsg = msgPeek_;
    fPeekKeep_ = false;
    return true;
  }

  int64_t cmsTotal = cmsWait;
  int64_t cmsElapsed = 0;
  int64_t msStart = TimeMillis();
  int64_t msCurrent = msStart;
  while (true) {
    // 检查是否有send消息,若存在,先阻塞处理send消息                                     // 步骤2      
    ReceiveSends();                                                                              

    // 检查所有post消息(即时消息+延迟消息)
    int64_t cmsDelayNext = kForever;
    bool first_pass = true;
    while (true) {
       // 上锁进行消息队列的访问
      {
        CritScope cs(&crit_);
        // 内部第一次循环,先检查延迟消息队列                                           // 步骤3
        if (first_pass) {                                                                        
          first_pass = false;
          // 将延迟消息队列dmsgq_中已经超过触发时间的消息全部取出放入到即时消息队列msgq_中
          // 计算当前时间距离下一个最早将要到达触发时间的消息还有多长时间cmsDelayNext。
          while (!dmsgq_.empty()) {
            if (msCurrent < dmsgq_.top().msTrigger_) {
              cmsDelayNext = TimeDiff(dmsgq_.top().msTrigger_, msCurrent);
              break;
            }
            msgq_.push_back(dmsgq_.top().msg_);
            dmsgq_.pop();
          }
        }
        // 从即时消息队列msgq_队首取出第一个消息                                     // 步骤4
        if (msgq_.empty()) {                                                                  
          break;
        } else {
          *pmsg = msgq_.front();
          msgq_.pop_front();
        }
      }  // crit_ is released here.

      // 如果消息对时间敏感,那么如果超过了最大忍耐时间kMaxMsgLatency才被处理
      // 则打印警告日志
      if (pmsg->ts_sensitive) {
        int64_t delay = TimeDiff(msCurrent, pmsg->ts_sensitive);
        if (delay > 0) {
          RTC_LOG_F(LS_WARNING)
              << "id: " << pmsg->message_id
              << "  delay: " << (delay + kMaxMsgLatency) << "ms";
        }
      }
      // 如果取出是需要销毁的消息,则销毁该消息,继续取下一个消息。
      if (MQID_DISPOSE == pmsg->message_id) {
        RTC_DCHECK(nullptr == pmsg->phandler);
        delete pmsg->pdata;
        *pmsg = Message();
        continue;
      }
      return true;
    }

    // 走到这,说明当前没有消息要处理,很可能是处于Quit状态了,先判断一下
    if (IsQuitting())
      break;

    // 计算留给IO处理的时间                                                       // 步骤5
    int64_t cmsNext;                                                                           
    if (cmsWait == kForever) {    //Get无限期,那么距离下个延迟消息的时间就作为本次IO处理时间
      cmsNext = cmsDelayNext;
    } else {      // Get有超时时间,计算本次IO处理时间
      cmsNext = std::max<int64_t>(0, cmsTotal - cmsElapsed);   // 总体来说还剩多少时间
      if ((cmsDelayNext != kForever) && (cmsDelayNext < cmsNext))  // 总体剩余时间和下一个延迟消息触发时间谁先到达?取其短者
        cmsNext = cmsDelayNext;
    }

    {
      // 阻塞处理IO多路复用                                                     // 步骤6
      if (!ss_->Wait(static_cast<int>(cmsNext), process_io))                  
        return false;
    }

    // 计算是否所有时间都已耗尽,是否进入下一个大循环                             // 步骤7
    msCurrent = TimeMillis();                                    
    cmsElapsed = TimeDiff(msCurrent, msStart);
    if (cmsWait != kForever) {
      if (cmsElapsed >= cmsWait)
        return false;
    }
  }
  return false;
}

上述算法过程是整个消息循环的核心内容,如上注释,大概可以分为7个步骤:

  1. 检查Peek消息。先检查之前是否已经Peek过一个消息到msgPeek_还未被处理,若有,当前就处理该消息吧,函数返回~ 若无,继续处理其他消息。
  2. 通过执行 ReceiveSends() 方法来处理所有Send消息。在MQ中该方法为virtual方法,啥也不干,Thread类继承MQ后会实现该方法,在此方法中处理所有Send消息。因此,消息循环中其实优先,阻塞地先处理所有Send消息,实现跨线程的Send消息方法。
  3. 处理Post消息中的延迟消息。从延迟消息队列dmsgq_中取出所有已经到达触发时间点的延迟消息,并塞入即时消息队列msgq_的队尾。同时计算下一个延迟消息还过多久将被触发(如果延迟消息队列中还有未超时的消息),这个时间可能会被作为后续IO多路复用处理的超时时间。这点在redis,nginx上理念一致。
  4. 处理即时消息。取出即时消息队列msgq_的队首消息。若该消息是个要销毁的消息,那么销毁该消息,并取下一个即时消息;若取到一个非要销毁的即时消息,那么就先处理该即时消息吧,函数返回;若本步骤没有取到即时消息,表示当前没有消息要处理,那干点啥好呢~处理网络IO吧
  5. 计算留给网络IO的时间。消息处理才是迫切的,网络IO嘛,看我能给你分配多少时间吧~~ 分两种情形来对待:

1)若外部Get方法无限期,那么下一个延迟消息触发时间到来之前我都可以用来处理IO;若是延迟队列中没有延迟消息呢?也就是消息循环队列中没有任何要处理的消息了,那当然我就可以无限期地,阻塞地将时间都用来处理IO了,直到有消息进入消息队列,将消息循环从IO处理中唤醒为止,继续处理消息。
2)若外部Get方法是有超时时间的,那么我们有必要先计算下已经花费了多长时间,到此刻,我们总共最多还剩多长时间留给IO处理。将总剩余时间跟下一个延迟消息触发时间做个比较,哪个小取哪个作为IO处理的时间;若是延迟队列中没有延迟消息呢?那就将剩下的所有时间都交给IO处理咯,反正也没有消息要处理~

  1. IO多路复用处理。 阻塞地花费上述计算好的时间进行IO处理。过程中要是处理出错,则函数返回;若是处理时间耗完或者时间没有耗完,但是有新消息进入循环了使得阻塞式的IO处理被唤醒,那么进入下个步骤。
  2. 计算剩余时间。既然消息已经被处理完过一次,IO也处理完了,先计算下是不是所有时间都已经耗尽?耗尽时间了,我还没找到可用的即时消息,sorry~函数返回false;没有耗尽的话,那么我们计算下剩余的时间,并将剩余的时间把2-7过程再来一遍吧:处理Send消息,检查延迟消息,检查并返回即时消息,再次计算IO处理时间,IO处理,再次计算剩余时间。什么?为什么没有重复步骤1检查Peek消息?同一个线程中我既然在执行Get,怎么可能Peek嘛,怎么可能又蹦跶出一个Peek消息呢?

消息投递

MQ中消息投递相关的函数有这么几个:Post(),PostDelayed(),两个PostAt(),DoDelayPost()。其中Post()用于投递即时消息;PostDelayed(),两个PostAt()用于投递延迟消息,内部都是调用DoDelayPost()来实现。

Post(): 即时消息的投递,源码如下。主要就是将函数的入参封装成一个即时消息Message对象,然后放置到即时队列msgq_的队尾。需要注意的有四点:

void MessageQueue::Post(const Location& posted_from, MessageHandler* phandler,
                        uint32_t id,  MessageData* pdata,  bool time_sensitive) {
  if (IsQuitting()) {
    delete pdata;
    return;
  }
  {
    CritScope cs(&crit_);
    Message msg;
    msg.posted_from = posted_from;
    msg.phandler = phandler;
    msg.message_id = id;
    msg.pdata = pdata;
    if (time_sensitive) {
      msg.ts_sensitive = TimeMillis() + kMaxMsgLatency;
    }
    msgq_.push_back(msg);
  }
  WakeUpSocketServer();
}

void MessageQueue::WakeUpSocketServer() {
  ss_->WakeUp();
}

PostDelayed(),PostAt(),DoDelayPost(): 延迟消息的投递,源码如下。PostDelayed(),PostAt()均是将各自的入参稍作转换后,再调用DoDelayPost()方法,将入参封装成延迟消息DelayedMesssage,然后加入到延迟消息队列dmsgq_中,并从IO多路复用的阻塞中唤醒来处理消息。与Post()方法中的做法并无二致。需要额外注意的地方有这么几点:

void MessageQueue::PostDelayed(const Location& posted_from, int cmsDelay,
                               MessageHandler* phandler, uint32_t id, MessageData* pdata) {
  return DoDelayPost(posted_from, cmsDelay, TimeAfter(cmsDelay), phandler, id,
                     pdata);
}

void MessageQueue::PostAt(const Location& posted_from, uint32_t tstamp,
                          MessageHandler* phandler, uint32_t id, MessageData* pdata) {
  // This should work even if it is used (unexpectedly).
  int64_t delay = static_cast<uint32_t>(TimeMillis()) - tstamp;
  return DoDelayPost(posted_from, delay, tstamp, phandler, id, pdata);
}

void MessageQueue::PostAt(const Location& posted_from, int64_t tstamp,
                          MessageHandler* phandler, uint32_t id, MessageData* pdata) {
  return DoDelayPost(posted_from, TimeUntil(tstamp), tstamp, phandler, id,
                     pdata);
}

void MessageQueue::DoDelayPost(const Location& posted_from, int64_t cmsDelay, int64_t tstamp,
                               MessageHandler* phandler, uint32_t id, MessageData* pdata) {
  if (IsQuitting()) {
    delete pdata;
    return;
  }

  {
    CritScope cs(&crit_);
    Message msg;
    msg.posted_from = posted_from;
    msg.phandler = phandler;
    msg.message_id = id;
    msg.pdata = pdata;
    DelayedMessage dmsg(cmsDelay, tstamp, dmsgq_next_num_, msg);
    dmsgq_.push(dmsg);
    // If this message queue processes 1 message every millisecond for 50 days,
    // we will wrap this number.  Even then, only messages with identical times
    // will be misordered, and then only briefly.  This is probably ok.
    ++dmsgq_next_num_;
    RTC_DCHECK_NE(0, dmsgq_next_num_);
  }
  WakeUpSocketServer();
}

消息处理

Dispatch():方法内容一目了然,先打印一条trace日志;然后记录消息处理的开始时间start_time;调用消息的MessageHandler的OnMessage方法进行消息处理;记录消息处理的结束时间end_time;计算消息处理花费了多长时间diff,如果消息花费时间过程,超过kSlowDispatchLoggingThreshold(50ms),则打印一条警告日志,告知从哪儿构建的消息花费了多长时间才消费完。

void MessageQueue::Dispatch(Message* pmsg) {
  TRACE_EVENT2("webrtc", "MessageQueue::Dispatch", "src_file_and_line",
                pmsg->posted_from.file_and_line(), "src_func",
                pmsg->posted_from.function_name());
  int64_t start_time = TimeMillis();
  pmsg->phandler->OnMessage(pmsg);
  int64_t end_time = TimeMillis();
  int64_t diff = TimeDiff(end_time, start_time);
  if (diff >= kSlowDispatchLoggingThreshold) {
    RTC_LOG(LS_INFO) << "Message took " << diff
                      << "ms to dispatch. Posted from: "
                      << pmsg->posted_from.ToString();
  }
}

消息清理

Clear(): 清理函数逻辑也相当简单,目标也相当明确,就是要讲满足条件的消息从MQ中删除。需要注意的点有以下几个:

void MessageQueue::Clear(MessageHandler* phandler, uint32_t id, MessageList* removed) {
  CritScope cs(&crit_);
  ClearInternal(phandler, id, removed);
}

void MessageQueue::ClearInternal(MessageHandler* phandler, uint32_t id, MessageList* removed) {
  // Remove messages with phandler
  if (fPeekKeep_ && msgPeek_.Match(phandler, id)) {
    if (removed) {
      removed->push_back(msgPeek_);
    } else {
      delete msgPeek_.pdata;
    }
    fPeekKeep_ = false;
  }

  // Remove from ordered message queue
  for (MessageList::iterator it = msgq_.begin(); it != msgq_.end();) {
    if (it->Match(phandler, id)) {
      if (removed) {
        removed->push_back(*it);
      } else {
        delete it->pdata;
      }
      it = msgq_.erase(it);
    } else {
      ++it;
    }
  }

  // Remove from priority queue. Not directly iterable, so use this approach
  PriorityQueue::container_type::iterator new_end = dmsgq_.container().begin();
  for (PriorityQueue::container_type::iterator it = new_end;
       it != dmsgq_.container().end(); ++it) {
    if (it->msg_.Match(phandler, id)) {
      if (removed) {
        removed->push_back(it->msg_);
      } else {
        delete it->msg_.pdata;
      }
    } else {
      *new_end++ = *it;
    }
  }
  dmsgq_.container().erase(new_end, dmsgq_.container().end());
  dmsgq_.reheap();
}

销毁消息

Dispose(): 之前在WebRTC源码分析-线程基础之Message && MessageData && MessageHandler中对DisposeData消息数据专门进行过阐述。再配合之前Get()方法中对DisposeData消息数据的处理方式,我们很容易理解该函数的作用:如果想要销毁某个对象,而不方便立马销毁,那么就可以将调用消息循环的Dispose()方法让消息循环帮忙进行数据销毁。

// Internally posts a message which causes the doomed object to be deleted
template <class T>
void Dispose(T* doomed) {
  if (doomed) {
    Post(RTC_FROM_HERE, nullptr, MQID_DISPOSE, new DisposeData<T>(doomed));
  }
}