WebRTC源码分析-线程基础3
原文出处:WebRTC源码分析-线程基础之消息循环,消息投递
前言
如之前的总述文章所述,rtc::Thread类封装了WebRTC中线程的一般功能,比如设置线程名称,启动线程执行用户代码,线程的join,sleep,run,stop等方法;同时也提供了线程内部的消息循环,以及线程之间以同步、异步方式投递消息,同步方式在目标线程执行方法并返回结果等线程之间交互的方式;另外,每个线程均持有SocketServer类成员对象,该类实现了IO多路复用功能。
本文将针对rtc::Thread类所提供消息循环,消息投递的功能进行介绍。由于Thread类是通过继承MessageQueue才具有此类功能,因此,在介绍Thread相关API实现之前应先介绍MessageQueue相关的知识:消息队里管理(MessageQueueManager),消息队列(MessageQueue),消息(Message,DelayedMessage),消息数据(MessageData,TypedMessageData,ScopedMessag eData,DisposeData)的相关知识。
- WebRTC源码分析-线程基础之MessageQueueManager
- WebRTC源码分析-线程基础之Message && MessageData && MesaageHandler
- WebRTC源码分析-线程基础之MessageQueue
Thread类在rtc_base/thread.h中声明,定义在rtc_base/thread.c中(只保留了消息循环以及消息投递相关的API):
class RTC_LOCKABLE Thread : public MessageQueue {
public:
virtual void Run();
virtual void Send(const Location& posted_from,
MessageHandler* phandler,
uint32_t id = 0,
MessageData* pdata = nullptr);
template <class ReturnT, class FunctorT>
ReturnT Invoke(const Location& posted_from, FunctorT&& functor) {
FunctorMessageHandler<ReturnT, FunctorT> handler(
std::forward<FunctorT>(functor));
InvokeInternal(posted_from, &handler);
return handler.MoveResult();
}
bool IsProcessingMessagesForTesting() override;
void Clear(MessageHandler* phandler,
uint32_t id = MQID_ANY,
MessageList* removed = nullptr) override;
void ReceiveSends() override;
bool ProcessMessages(int cms);
protected:
friend class ScopedDisallowBlockingCalls;
private:
void ReceiveSendsFromThread(const Thread* source);
bool PopSendMessageFromThread(const Thread* source, _SendMessage* msg);
void InvokeInternal(const Location& posted_from, MessageHandler* handler);
std::list<_SendMessage> sendlist_;
bool blocking_calls_allowed_ = true;
friend class ThreadManager;
RTC_DISALLOW_COPY_AND_ASSIGN(Thread);
};
消息循环的建立
由上一篇文章WebRTC源码分析-线程基础之线程基本功能的线程启动分析知道,用户在没有传入自己的Runnable对象时,新的线程上会执行Thread.Run()方法,该方法源码如下,内部会调用ProcessMessages(kForever)去运行消息循环。而用户如果实现了自己的Runnable对象时,也想要运行消息循环,那咋办嘛?下面的注释就告知你了,在Runnable.Run()中适时的调用ProcessMessages()方法就行。
// By default, Thread::Run() calls ProcessMessages(kForever). To do other
// work, override Run(). To receive and dispatch messages, call
// ProcessMessages occasionally.
void Thread::Run() {
ProcessMessages(kForever);
}
下面看看这个ProcessMessages()怎么建立消息循环的。分两种情形:
1)默认地,ProcessMessages(kForever),告知无限期进行处理。此时的情形就是函数内部的while循环不停的调用Get()去获取消息,然后处理消息Dispatch()。循环能够退出的条件就是Get方法返回false。由WebRTC源码分析-线程基础之MessageQueue分析Get()方法分析可知,无限期处理的情况下,只有循环停止工作或者IO处理出错才会导致Get()返回false。
2)如果ProcessMessages(int cmsLoop),有限期进行处理。那么退出循环的方式有两个,一个是使用时间已经到了,返回true;另外一个是Get()方法返回false,有限期处理的情况下,Get()返回false的条件有三:循环停止工作;IO处理出错;已经耗完所有处理时间也还未找到一个MSG。
bool Thread::ProcessMessages(int cmsLoop) {
// Using ProcessMessages with a custom clock for testing and a time greater
// than 0 doesn't work, since it's not guaranteed to advance the custom
// clock's time, and may get stuck in an infinite loop.
RTC_DCHECK(GetClockForTesting() == nullptr || cmsLoop == 0 ||
cmsLoop == kForever);
// 计算终止处理消息的时间
int64_t msEnd = (kForever == cmsLoop) ? 0 : TimeAfter(cmsLoop);
// 下次可以进行消息获取的时间长度
int cmsNext = cmsLoop;
while (true) {
#if defined(WEBRTC_MAC)
ScopedAutoReleasePool pool;
#endif
// 获取消息
Message msg;
if (!Get(&msg, cmsNext))
return !IsQuitting();
// 处理消息
Dispatch(&msg);
// 若不是无限期,计算下次可以进行消息获取的时间。
if (cmsLoop != kForever) {
cmsNext = static_cast<int>(TimeUntil(msEnd));
// 若使用时间已经到了,那么退出循环
if (cmsNext < 0)
return true;
}
}
}
Post消息
向一个线程Post消息,只是简单地向消息循环的队列中插入一条待处理的消息,然后Post方法就会返回,不会引发当前线程的阻塞。Thread方法并没有重写MQ的 Post方法,因此,关于Post方法的细节分析见 WebRTC源码分析-线程基础之MessageQueue
Send消息
向一个线程Send消息,会阻塞当前线程的运行,直到该消息被目标线程消费完后才会解除阻塞,从Send方法返回。算法流程如源码及其注释如下(分9个步骤):
void Thread::Send(const Location& posted_from,
MessageHandler* phandler,
uint32_t id,
MessageData* pdata) {
// 目标线程的消息循环是否还在处理消息? // 步骤1
if (IsQuitting())
return;
// 创建需要处理的消息 // 步骤2
Message msg;
msg.posted_from = posted_from;
msg.phandler = phandler;
msg.message_id = id;
msg.pdata = pdata;
// 若目标线程就是自己,那么直接在此处处理完消息就ok // 步骤3
if (IsCurrent()) {
phandler->OnMessage(&msg);
return;
}
// 断言当前线程是否具有阻塞权限,无阻塞权限, // 步骤4
// 那么向别的线程Send消息就是个非法操作
AssertBlockingIsAllowedOnCurrentThread();
// 确保当前线程有一个Thread对象与之绑定 // 步骤5
AutoThread thread;
Thread* current_thread = Thread::Current();
RTC_DCHECK(current_thread != nullptr); // AutoThread ensures this
// 创建一个SendMessage对象,放置到目标线程对象的sendlist_ // 步骤6
// ready表征该消息是否已经处理完。
bool ready = false;
{
CritScope cs(&crit_);
_SendMessage smsg;
smsg.thread = current_thread;
smsg.msg = msg;
smsg.ready = &ready;
sendlist_.push_back(smsg);
}
// 将目标线程从IO处理中唤醒,赶紧处理消息啦~ // 步骤7
// 目标线程将在其消息循环中,调用ReceiveSends()处理Send消息~~
WakeUpSocketServer();
// 同步等待消息被处理 // 步骤8
bool waited = false;
crit_.Enter();
while (!ready) {
crit_.Leave();
// 对方也可能向我Send了消息,可不能都互相阻塞住了
// 处理对方可能Send给我的消息。
current_thread->ReceiveSendsFromThread(this);
// 处理完对方的Send消息后,阻塞等待对方处理完我Send的消息,然后来唤醒我吧
// 但这儿会有个意外,这就是waited存在的意义了
current_thread->socketserver()->Wait(kForever, false);
waited = true;
crit_.Enter();
}
crit_.Leave();
// 如果出现过waited,那么再唤醒一次当前线程去处理Post消息。 // 步骤9
if (waited) {
current_thread->socketserver()->WakeUp();
}
}
要理解上述算法,需要搞清楚Send方法的代码是在当前线程执行的,而调用的是目标线程对象Thread的Send方法,即Send方法中的this,是目标线程线程对象Thread。捋清楚这点非常重要!!!这儿我一步步分析上述算法过程:
- 判断目标线程的消息循环是否仍在工作:IsQuitting()是目标线程对象Thread的方法,但是是在当前线程中执行的!若消息循环停止工作,那么会拒绝处理消息,Send会直接返回,但是调用方是无法获知的。一般建议是在向线程发送消息之前调用IsProcessingMessagesForTesting()判断下该消息循环是否还在正常运行。
- 创建需要消费的消息对象:此处没有什么可以多说的
- 判断目标线程是否就是当前线程: 通过Thread.IsCurrent()可以判别这点,如果目标线程就是当前线程,那就是自己给自己Send消息了,直接在此处消费消息并返回。
- 断言当前线程是否允许阻塞: 注意,这儿不是断言目标线程。因为,向另外一个线程Send消息时,当前线程需要阻塞地等待目标线程处理完消息后才返回。如果,当前线程没有阻塞权限的话,那就是非法操作了。
- 确保当前线程有一个关联的Thread对象: 为什么?因为后续的阻塞唤醒操作都要通过Thread对象的方法来实现,如果当前线程没有关联Thread对象,那么这些操作就无法完成。怎么做?通过创建一个局部对象AutoThread thread来实现。源码如下,注意两点:
1)只有当当前线程无Thread关联时,才会将AutoThread作为当前线程的关联Thread;
2)由于AutoThread thread是局部对象,当Send函数结束时该对象生命周期走到尾声,可以利用其析构函数中需要恢复当前对象无Thread对象绑定的状态(当然,前提是之前就无Thread对象关联)。
AutoThread::AutoThread()
: Thread(SocketServer::CreateDefault(), /*do_init=*/false) {
DoInit();
if (!ThreadManager::Instance()->CurrentThread()) {
ThreadManager::Instance()->SetCurrentThread(this);
}
}
AutoThread::~AutoThread() {
Stop();
DoDestroy();
if (ThreadManager::Instance()->CurrentThread() == this) {
ThreadManager::Instance()->SetCurrentThread(nullptr);
}
}
- 创建_SendMessage实例,并入队:
_SendMessage结构体的声明如下,该对象被创建后,会进入目标线程的sendlist_。其中Thread* thread存储的是主动投放消息的当前线程。当目标线程在消费msg之后,会将ready标志置为true,并且通过thread->WakeUp()解除当前线程的阻塞,从而判断ready后获知Send消息已经被消费了。
struct _SendMessage {
_SendMessage() {}
Thread* thread; // 当前线程对象
Message msg; // 消息
bool* ready; // 消息是否处理完毕的标志
};
- 唤醒目标线程处理消息: 当
_SendMessage消息已经进入目标线程的sendlist_队列了,当然是要唤醒目标线程去处理啦,MQ的WakeUpSocketServer()就干这个事。好像很简单?这里代码没有体现的一点是:目标线程是如何处理这个_SendMessage消息的,WebRTC源码分析-线程基础之MessageQueue 提到过 MQ的Get()方法最开始就是优先地,阻塞地调用它的ReceiveSends()处理Send消息,可惜的是MQ的该方法是个vitural方法,并且啥也没干,刚好Thread对象就重写了ReceiveSends()方法,这正是处理Send消息最佳之处。由于ReceiveSends()是在目标线程干的事,为了不打乱节奏,此处不展开描述ReceiveSends()。 - 同步等待Send消息被处理:虽然代码没几行,但是整个过程中最难理解的地方,慢慢展开来说,把这块儿的代码再次贴出来。
bool waited = false;
crit_.Enter();
while (!ready) {
crit_.Leave();
current_thread->ReceiveSendsFromThread(this);
current_thread->socketserver()->Wait(kForever, false);
waited = true;
crit_.Enter();
}
crit_.Leave();
1)ready这个参数会被当前线程访问,也会被目标线程访问,必然需要加锁,并且要达到两个线程的互斥,还必须要使用同一个锁。而这个锁就是目标线程的线程对象Thread的CriticalSection crit_成员。因此,while循环开头读取ready时进行了加锁解锁操作。
2)按理说,接下来当前线程阻塞等待目标线程完成操作之后通知我解除阻塞就行了。但是,考虑到一点,如果,两个线程同时互相Send消息,那岂不是二者都卡在等待对方通知这个地方,死锁了?为了避免这个情况,在阻塞等待前,先处理所有别人Send给我的消息吧。调用current_thread->ReceiveSendsFromThread(this)进行处理,注意current_thread是当前线程对象,因而是处理的当前线程所接收到的Send消息,而this是目标线程对象。 需要注意一点PopSendMessageFromThread()方法只会把source线程发送的消息从队列中取出,此时source为this,即目标线程对象。所以,此处ReceiveSendsFromThread()只会处理目标线程Send给当前线程的消息,此时,ready置为true,并调用目标线程的WakeUp()方法,唤醒了目标线程,使得目标线程不会阻塞在Send方法中,从而使得目标线程有机会去运行其消息循环,从而消费当前线程Send给它的消息。很绕很绕,我已经尽力了。
void Thread::ReceiveSendsFromThread(const Thread* source) {
_SendMessage smsg;
crit_.Enter();
while (PopSendMessageFromThread(source, &smsg)) {
crit_.Leave();
Dispatch(&smsg.msg);
crit_.Enter();
*smsg.ready = true;
smsg.thread->socketserver()->WakeUp();
}
crit_.Leave();
}
bool Thread::PopSendMessageFromThread(const Thread* source, _SendMessage* msg) {
for (std::list<_SendMessage>::iterator it = sendlist_.begin();
it != sendlist_.end(); ++it) {
if (it->thread == source || source == nullptr) {
*msg = *it;
sendlist_.erase(it);
return true;
}
}
return false;
}
3)处理完目标线程可能Send给我的消息之后,我终于可以安心地阻塞在IO上等待了,current_thread->socketserver()->Wait(kForever, false)。这时,我们看看目标线程如何操作。之前在步骤7中,已经阐述过,目标线程被唤醒后在消息循环中优先阻塞地调用ReceiveSends()方法来处理Send消息,而且ReceiveSends()是被Thread重写过的方法。代码如下:咦,这不是上面调用的ReceiveSendsFromThread()方法嘛?不过传入的指针为空,此时PopSendMessageFromThread()会将目标线程上收到的所有Send消息都拿出来消费完,设置标志位,然后调用消息发送者线程的WakeUp()来唤醒消息发送者。到此处,终于把这个Send逻辑搞清楚了。完了?还没完呢,唤醒该线程的一定就是目标线程处理完Send消息之后嘛?还有这个waited干嘛用的啊?
void Thread::ReceiveSends() {
ReceiveSendsFromThread(nullptr);
}
4)当前线程沉睡在current_thread->socketserver()->Wait(kForever, false)上时,唤醒该线程的一定就是目标线程处理完Send消息之后嘛?那可不一定,可能是别的线程Wake了它,也可能是目标线程Post了一条消息给当前线程(此时会唤醒当前线程,详见Post消息)。那么意味着当前线程Send消息可能还未处理完,自己就被醒了,那么又得进行While中的ready变量的访问了,又得加锁,解锁,把这个过程再走一遍。
5)这个waited变量干嘛用?就代码而言,我们发现waited变量进入了一次while循环就会变成true,表示我等待过至少一次。没有等待过的原因就是目标线程干活很麻利,在第一次判断ready值的时候就已经把活干完了,Send消息已被消费。具体记录这个等待过一次是做什么用呢?看步骤9
- 再唤醒一次 如果进入过一次循环等待,那么waited变量为true,需要再次唤醒当前线程一次
current_thread->socketserver()->WakeUp(),让当前线程能处理消息。其实原因在源码的注释上写得很明白了,我就不翻译了。完毕。
// Our Wait loop above may have consumed some WakeUp events for this
// MessageQueue, that weren't relevant to this Send. Losing these WakeUps can
// cause problems for some SocketServers.
//
// Concrete example:
// Win32SocketServer on thread A calls Send on thread B. While processing the
// message, thread B Posts a message to A. We consume the wakeup for that
// Post while waiting for the Send to complete, which means that when we exit
// this loop, we need to issue another WakeUp, or else the Posted message
// won't be processed in a timely manner.
if (waited) {
current_thread->socketserver()->WakeUp();
}
Invoke跨线程同步执行方法
Invoke方法提供了一个方便的方式:阻塞当前线程,在另外一个线程上同步执行方法,并且返回执行结果。
本质上就是将需要执行的方法封装到消息处理器FunctorMessageHandler中,然后向目标线程Send这个携带特殊消息处理器FunctorMessageHandler的消息,该消息被消费后,阻塞结束,FunctorMessageHandler对象携带了方法执行的结果,当前线程可以从中获取到执行结果。其实,这里的重点有二:
- FunctorMessageHandler类的封装,见WebRTC源码分析-线程基础之Message && MessageData && MesaageHandler
- Invoke的本质是调用Send消息的方式来执行方法。
template <class ReturnT, class FunctorT>
ReturnT Invoke(const Location& posted_from, FunctorT&& functor) {
FunctorMessageHandler<ReturnT, FunctorT> handler(
std::forward<FunctorT>(functor));
InvokeInternal(posted_from, &handler);
return handler.MoveResult();
}
void Thread::InvokeInternal(const Location& posted_from,
MessageHandler* handler) {
TRACE_EVENT2("webrtc", "Thread::Invoke", "src_file_and_line",
posted_from.file_and_line(), "src_func",
posted_from.function_name());
Send(posted_from, handler);
}
总结
本文比较详细的介绍了Thread的消息循环,线程间Post消息,Send消息,跨线程执行方法等功能。由于Thread是通过继承MessageQueue才具有这些功能,因此,需要结合另外3篇文章来一起看。
- WebRTC源码分析-线程基础之MessageQueueManager
- WebRTC源码分析-线程基础之Message && MessageData && MesaageHandler
- WebRTC源码分析-线程基础之MessageQueue
本文对Post消息(非阻塞)只是一笔带过,因为在Thread并没有改写Post方法,而直接是MessageQueue的Post。
本文对Send消息(阻塞)重点拆解了一番,自己倒是理解了,但不知道是否描述的够清晰,是否有错误。博客的作用嘛,就是为了能够理清楚自己的思路,顺便也为他人做些贡献。如果有不对的地方,看到此处的同路人可以指出
本文还对跨线程执行方法提供了一种便捷的方式,本质就是封装一个特殊的消息处理器FunctorMessageHandler到消息中,并使用Send方法使得目标线程消费消息时执行该方法。
原文出处:WebRTC源码分析-线程基础之跨线程同步MethodCall
前言
MethodCall类位于WebRTC的api/proxy.h中,如其名而知该类的作用是进行某个方法调用。实际上,是用来实现某个指定方法在指定线程上同步执行并返回结果的效果。其实该文件中并没有真正的MethodCall类,而是MethodCall0,MethodCall1,…,MethodCall5,还有ConstMethodCall0,ConstMethodCall1。这些数字代表什么意思?意思是这个指定方法的入参个数,Const是指入参不能被修改。
本文只以MethodCall0为例进行说明,其余的不过是参数个数不同而已,没有实质区别。当然辅助MethodCall0实现同步的还有另外两个类:ReturnType以及SynchronousMethodCall。
MethodCall0
MethodCall0源码如下所示,该类是一个模板类。
template <typename C, typename R>
class MethodCall0 : public rtc::Message, public rtc::MessageHandler {
public:
typedef R (C::*Method)();
MethodCall0(C* c, Method m) : c_(c), m_(m) {}
R Marshal(const rtc::Location& posted_from, rtc::Thread* t) {
internal::SynchronousMethodCall(this).Invoke(posted_from, t);
return r_.moved_result();
}
private:
void OnMessage(rtc::Message*) { r_.Invoke(c_, m_); }
C* c_;
Method m_;
ReturnType<R> r_;
};
先看一下实际使用中,如何使用MethodCall0。如下源码所示,目标是要实现PeerConnectionFactory类对象pc_factory的指定方法Initialize()在signaling_thread中被同步被执行,当前线程阻塞的获取该方法的执行结果赋值给result。
MethodCall0<PeerConnectionFactory, bool> call(
pc_factory.get(), &PeerConnectionFactory::Initialize);
bool result = call.Marshal(RTC_FROM_HERE, pc_factory->signaling_thread());
对照MethodCall0源码以及实例,可知由模板生成的实际MethodCall0类为以下代码所示。
template <typename PeerConnectionFactory, typename bool>
class MethodCall0 : public rtc::Message, public rtc::MessageHandler {
public:
typedef bool (PeerConnectionFactory::*Initialize)();
MethodCall0(PeerConnectionFactory* c, Initialize m) : c_(c), m_(m) {}
bool Marshal(const rtc::Location& posted_from, rtc::Thread* t) {
internal::SynchronousMethodCall(this).Invoke(posted_from, t);
return r_.moved_result();
}
private:
void OnMessage(rtc::Message*) { r_.Invoke(c_, m_); }
PeerConnectionFactory* c_;
Initialize m_;
ReturnType<bool> r_;
};
那么实际代码就是先创建一个MethodCall0对象call,并调用MethodCall0的Marshal()方法获取并返回结果。那么我们来一步步拆解Marshal()方法:
- 创建SynchronousMethodCall,并将MethodCall0自身作为入参。
- 调用SynchronousMethodCall的Invoke方法,Location对象以及指定线程
signaling_thread作为入参。 - 调用MethodCall0对象的私有成员
ReturnType<**bool**> r_的moved_result()获取返回值。
由于第1,2步涉及到一个新的类SynchronousMethodCall,下面看下该类的源码:
SynchronousMethodCall
该类的声明如下:
class SynchronousMethodCall : public rtc::MessageData,
public rtc::MessageHandler {
public:
explicit SynchronousMethodCall(rtc::MessageHandler* proxy);
~SynchronousMethodCall() override;
void Invoke(const rtc::Location& posted_from, rtc::Thread* t);
private:
void OnMessage(rtc::Message*) override;
rtc::Event e_;
rtc::MessageHandler* proxy_;
};
该类的实现如下:
SynchronousMethodCall::SynchronousMethodCall(rtc::MessageHandler* proxy)
: proxy_(proxy) {}
SynchronousMethodCall::~SynchronousMethodCall() = default;
void SynchronousMethodCall::Invoke(const rtc::Location& posted_from,
rtc::Thread* t) {
if (t->IsCurrent()) {
proxy_->OnMessage(nullptr);
} else {
t->Post(posted_from, this, 0);
e_.Wait(rtc::Event::kForever);
}
}
void SynchronousMethodCall::OnMessage(rtc::Message*) {
proxy_->OnMessage(nullptr);
e_.Set();
}
第一步在SynchronousMethodCall构造时,MethodCall0对象作为入参传入,因此SynchronousMethodCall.proxy_就是MethodCall0对象。
第二步调用SynchronousMethodCall.Invoke()方法,传入了指定线程signaling_thread。有两种情况:
- 若执行Invoke()的当前线程就是指定线程
signaling_thread,那么直接在当前线程调用MethodCall0.OnMessage()即可,该方法使得指定方法被执行,同时也确保在指定线程上执行的,因为当前线程就是指定线程嘛。且结果被保存在MethodCall0.r_成员中,MethodCall0的OnMessage()的分析且往后放,因为r_为ReturnType<bool>的实例,涉及到第三个类ReturnType。 - 若Invoke()的当前线程不是指定线程
signaling_thread,那么就需要调用指定线程的Post方法,将SynchronousMethodCall作为一个消息投放到指定线程,Post方法不仅投放了消息,并且会唤醒指定线程去处理消息。后续在当前线程上调用Event.Wait()方法使得当前线程阻塞在Event对象上。等待SynchronousMethodCall对象在指定线程线程上被消费,即执行SynchronousMethodCall.OnMessage方法,该方法中执行MethodCall0.OnMessage(),又回到了上面的那种情况,即在指定线程中执行了MethodCall0.OnMessage(),使得指定方法在指定线程上执行。然后Event.Set()将当前线程唤醒,结束阻塞。
饶了很远,其实SynchronousMethodCall.Invoke()的作用就是保证了MethodCall0.OnMessage方法在指定的线程上运行,此时的一切又绕回到MethodCall0类了。看看MethodCall0.OnMessage,为了不走回头路,源码粘贴如下
void OnMessage(rtc::Message*) { r_.Invoke(c_, m_); }
哦,原来就是执行了ReturnType<bool>.Invoke(PeerConnectionFactory,Initialize)方法。那么来看看第三个类ReturnType吧,又是一个模板类~
template <typename R>
class ReturnType {
public:
template <typename C, typename M>
void Invoke(C* c, M m) {
r_ = (c->*m)();
}
template <typename C, typename M, typename T1>
void Invoke(C* c, M m, T1 a1) {
r_ = (c->*m)(std::move(a1));
}
template <typename C, typename M, typename T1, typename T2>
void Invoke(C* c, M m, T1 a1, T2 a2) {
r_ = (c->*m)(std::move(a1), std::move(a2));
}
template <typename C, typename M, typename T1, typename T2, typename T3>
void Invoke(C* c, M m, T1 a1, T2 a2, T3 a3) {
r_ = (c->*m)(std::move(a1), std::move(a2), std::move(a3));
}
template <typename C,
typename M,
typename T1,
typename T2,
typename T3,
typename T4>
void Invoke(C* c, M m, T1 a1, T2 a2, T3 a3, T4 a4) {
r_ = (c->*m)(std::move(a1), std::move(a2), std::move(a3), std::move(a4));
}
template <typename C,
typename M,
typename T1,
typename T2,
typename T3,
typename T4,
typename T5>
void Invoke(C* c, M m, T1 a1, T2 a2, T3 a3, T4 a4, T5 a5) {
r_ = (c->*m)(std::move(a1), std::move(a2), std::move(a3), std::move(a4),
std::move(a5));
}
R moved_result() { return std::move(r_); }
private:
R r_;
};
代码稍微有点长~因为他大爷的目标方法可能的入参有0 1 2 3 4 5个。。所以Invoke方法也有0 1 2 3 4 5共6个。Initialize方法因为没有入参,所以简单,就第一个Invoke了,该方法就是调用了PeerConnectionFactory.Initialize(),并将结果存入成员bool r_中。有木有,一切皆以明了!!
好了,终于可以回到最开头的MethodCall0.Marshal()的第三步了,也很简单,就是使用std::move移动语义将ReturnType持有的结果 返回出来。至此,一切明了,我已打出了gg。
return r_.moved_result();
结束了吗?
没,其实在看SynchronousMethodCall部分实现的时候,其作用就是为了确保不论是在哪个线程中调用,MethodCall0.OnMessage方法都能在目标线程中执行。为了实现这点还使用了Thread.Post方法+Event.Wait()。为什么不使用Thread.Send方法或者Thread.Invoke方法直接实现呢?根本就不需要这么绕,甚至SynchronousMethodCall对象都没有存在的必要,如此推敲出来MethodCall0也没有必要存在。。。回到之前那个示例,即WebRTC源码分析-呼叫建立过程之二(创建PeerConnectionFactory)中提过,如果目标线程是没有运行消息循环的线程,那么Thread.Send和Thread.Invoke是无法正常工作的,这正是MethodCall0存在的必要性。然而,分析完MethodCall0发现其依赖了Thread.Post方法来实现其功能,该方法也是在消息循环没有运行时无法正常工作。Oh~God,大型车祸翻车现场。在哪儿翻车的?到此还未理清楚。
原文出处:WebRTC源码分析-线程安全之Proxy,防止线程乱入
0 前言
在之前的文章WebRTC源码分析-呼叫建立过程之二(创建PeerConnectionFactory)分析PeerConnectionFactory对象的创建过程中,最后一步惊觉最终返给用户层的并非是PeerConnectionFactory对象,而是其代理对象PeerConnectionFactoryProxy。如果不深入看源码,应用层是无感知的,还以为是在直接操作PeerConnectionFactory实体对象呢,毕竟应用层的指针是PeerConnectionFactoryInterface,一般而言,第一观感肯定指向的是对应的Implement对象。
为什么如此设计呢?我们知道WebRTC是个多线程的程序,最直观的感受就是肯定会有三个基础线程signaling_thread(信令),worker_thread(工作者),network_thread(网络)。并且另外一个非常重要的点在于WebRTC内部很多对象的方法必须在指定的线程中执行,比如之前分析的PeerConnectionFactory所有成员方法必须在signaling_thread(信令)中执行,并且每个方法的开始位置都有一个断言,形如以下代码所示。如果方法没有在对应线程执行,程序走到此处必然嗝屁。
rtc::scoped_refptr<AudioSourceInterface>
PeerConnectionFactory::CreateAudioSource(const cricket::AudioOptions& options) {
RTC_DCHECK(signaling_thread_->IsCurrent());
rtc::scoped_refptr<LocalAudioSource> source(
LocalAudioSource::Create(&options));
return source;
}
站在用户的角度来讲,如果直接面向的是如此接口的话,我们势必需要了解每个接口的具体实现,以便知道使用这些方法时让其在哪个线程上去执行,否则走到哪哪都是崩溃。面对如此设计的库,即使程序不崩溃,使用者肯定也是很崩溃的,何况WebRTC的api层对外暴露的都是Interface,应用层哪知道实际持有的到底是哪个Implement类对象呢?更何况应用层还不一定知道所有线程的存在呢,毕竟正如example/peerconnection_client示例工程那样,应用层根本就不会去创建上面三个基础线程,都是WebRTC内部创建的。
所以,为了使得用户使用WebRTC时可以无忧无虑,随处(不用考虑在哪个线程中)任意调用,Proxy代理层就合理的,巧妙的诞生了,据说Chrome也使用了相同的思想,相同的技术。Proxy代理层使得用户想调用某个对方的某个方法时(实际上操作的是对应的Proxy对象),将使得方法调用被代理到期待的对象的期待的方法上,并且是在恰当的线程上同步执行。
以下,我们来揭秘下Proxy代理层是如何实现的,是如何解决上述问题的
1 望而生畏的宏定义
WebRTC源码分析-呼叫建立过程之二(创建PeerConnectionFactory)文章中,提到PeerConnectionFactoryProxy是通过宏定义产生的,我们正好以PeerConnectionFactory代理的产生过程为入口点,一步步分析。
首先,在PeerConnectionFactory代理产生在api/peer_connection_factory_proxy.h文件中,该文件是专门为生成PeerConnectionFactory代理类而存在的。看上去很复杂,宏种类很多,简化来看,其实这里面的宏定义就4种而已:
BEGIN_SIGNALING_PROXY_MAP:用来产生代理类的名称,形如"class PeerConnectionFactoryProxy {";PROXY_SIGNALING_THREAD_DESTRUCTOR:用来产生析新类的构函数;PROXY_METHOD1~PROXY_METHOD4:用来产生新类的方法,数字代表入参个数;END_PROXY_MAP():用来产生类声明的结尾,形如"};"
namespace webrtc {
BEGIN_SIGNALING_PROXY_MAP(PeerConnectionFactory)
PROXY_SIGNALING_THREAD_DESTRUCTOR()
PROXY_METHOD1(void, SetOptions, const Options&)
PROXY_METHOD4(rtc::scoped_refptr<PeerConnectionInterface>,
CreatePeerConnection,
const PeerConnectionInterface::RTCConfiguration&,
std::unique_ptr<cricket::PortAllocator>,
std::unique_ptr<rtc::RTCCertificateGeneratorInterface>,
PeerConnectionObserver*);
PROXY_METHOD2(rtc::scoped_refptr<PeerConnectionInterface>,
CreatePeerConnection,
const PeerConnectionInterface::RTCConfiguration&,
PeerConnectionDependencies);
PROXY_CONSTMETHOD1(webrtc::RtpCapabilities,
GetRtpSenderCapabilities,
cricket::MediaType);
PROXY_CONSTMETHOD1(webrtc::RtpCapabilities,
GetRtpReceiverCapabilities,
cricket::MediaType);
PROXY_METHOD1(rtc::scoped_refptr<MediaStreamInterface>,
CreateLocalMediaStream,
const std::string&)
PROXY_METHOD1(rtc::scoped_refptr<AudioSourceInterface>,
CreateAudioSource,
const cricket::AudioOptions&)
PROXY_METHOD2(rtc::scoped_refptr<VideoTrackInterface>,
CreateVideoTrack,
const std::string&,
VideoTrackSourceInterface*)
PROXY_METHOD2(rtc::scoped_refptr<AudioTrackInterface>,
CreateAudioTrack,
const std::string&,
AudioSourceInterface*)
PROXY_METHOD2(bool, StartAecDump, rtc::PlatformFile, int64_t)
PROXY_METHOD0(void, StopAecDump)
END_PROXY_MAP()
} // namespace webrtc
上面简要的分析,实际上并不完全正确,但可以帮助理解。要了解详情,势必要将这些宏定义进一步的展开才能一探究竟。那我们就进一步的挖掘吧,这些宏定义都位于api/proxy.h的文件中。
2 api/proxy.h
以下是对上述四个宏比较繁琐的拆分分析,比较枯燥,就像拿起了一个Puzzle,慢慢来拼起来,最终一窥全貌
2.1 BEGIN_SIGNALING_PROXY_MAP
#define BEGIN_SIGNALING_PROXY_MAP(c) \
PROXY_MAP_BOILERPLATE(c) \
SIGNALING_PROXY_MAP_BOILERPLATE(c) \
REFCOUNTED_PROXY_MAP_BOILERPLATE(c) \
public: \
static rtc::scoped_refptr<c##ProxyWithInternal> Create( \
rtc::Thread* signaling_thread, INTERNAL_CLASS* c) { \
return new rtc::RefCountedObject<c##ProxyWithInternal>(signaling_thread, \
c); \
}
该宏可以解释为另外3个宏+一个Create方法,如果将PeerConnectionFactory代入进去的话将是
public:
static rtc::scoped_refptr<PeerConnectionFactoryProxyWithInternal> Create(
rtc::Thread* signaling_thread, INTERNAL_CLASS* c) {
return new rtc::RefCountedObject<PeerConnectionFactoryProxyWithInternal>(signaling_thread, c);
}
有没有觉得很神奇,代理类名称好像是PeerConnectionFactoryProxyWithInternal。我们继续
2.1.1 PROXY_MAP_BOILERPLATE
#define PROXY_MAP_BOILERPLATE(c) \
template <class INTERNAL_CLASS> \
class c##ProxyWithInternal; \
typedef c##ProxyWithInternal<c##Interface> c##Proxy; \
template <class INTERNAL_CLASS> \
class c##ProxyWithInternal : public c##Interface { \
protected: \
typedef c##Interface C; \
\
public: \
const INTERNAL_CLASS* internal() const { return c_; } \
INTERNAL_CLASS* internal() { return c_; }
代入PeerConnectionFactory,变为:
template <class INTERNAL_CLASS>
class PeerConnectionFactoryProxyWithInternal;
typedef PeerConnectionFactoryProxyWithInternal<PeerConnectionFactoryInterface> PeerConnectionFactoryProxy;
template <class INTERNAL_CLASS>
class PeerConnectionFactoryProxyWithInternal : public PeerConnectionFactoryInterface {
protected:
typedef PeerConnectionFactoryInterface C;
public:
const INTERNAL_CLASS* internal() const { return c_; }
INTERNAL_CLASS* internal() { return c_; }
2.1.2 SIGNALING_PROXY_MAP_BOILERPLATE
#define SIGNALING_PROXY_MAP_BOILERPLATE(c) \
protected: \
c##ProxyWithInternal(rtc::Thread* signaling_thread, INTERNAL_CLASS* c) \
: signaling_thread_(signaling_thread), c_(c) {} \
\
private: \
mutable rtc::Thread* signaling_thread_;
代入PeerConnectionFactory,变为:
protected:
PeerConnectionFactoryProxyWithInternal(rtc::Thread* signaling_thread, INTERNAL_CLASS* c)
: signaling_thread_(signaling_thread), c_(c) {}
private:
mutable rtc::Thread* signaling_thread_;
2.1.3 REFCOUNTED_PROXY_MAP_BOILERPLATE
#define REFCOUNTED_PROXY_MAP_BOILERPLATE(c) \
protected: \
~c##ProxyWithInternal() { \
MethodCall0<c##ProxyWithInternal, void> call( \
this, &c##ProxyWithInternal::DestroyInternal); \
call.Marshal(RTC_FROM_HERE, destructor_thread()); \
} \
\
private: \
void DestroyInternal() { c_ = nullptr; } \
rtc::scoped_refptr<INTERNAL_CLASS> c_;
代入PeerConnectionFactory,变为:
protected:
~PeerConnectionFactoryProxyWithInternal() {
MethodCall0<PeerConnectionFactoryProxyWithInternal, void> call(
this, &PeerConnectionFactoryProxyWithInternal::DestroyInternal);
call.Marshal(RTC_FROM_HERE, destructor_thread());
}
private:
void DestroyInternal() { c_ = nullptr; }
rtc::scoped_refptr<INTERNAL_CLASS> c_;
2.2 PROXY_SIGNALING_THREAD_DESTRUCTOR
#define PROXY_SIGNALING_THREAD_DESTRUCTOR() \
private: \
rtc::Thread* destructor_thread() const { return signaling_thread_; } \
\
public: // NOLINTNEXTLINE
2.3 PROXY_METHOD
关于PROXY_METHOD宏系列,打算只举一个示例,其他的都基本相同,没有必要挨个分析。
#define PROXY_METHOD1(r, method, t1) \
r method(t1 a1) override { \
MethodCall1<C, r, t1> call(c_, &C::method, std::move(a1)); \
return call.Marshal(RTC_FROM_HERE, signaling_thread_); \
}
以PeerConnectionFactory的const Options& options() const { return options_;}方法为例,宏定义
PROXY_METHOD1(void, SetOptions, const Options&)
展开为:
void SetOptions(const Options& a1) override {
MethodCall1<PeerConnectionFactoryInterface, void , a1> call(c_, &PeerConnectionFactoryInterface::SetOptions, std::move(a1));
return call.Marshal(RTC_FROM_HERE, signaling_thread_);
}
2.4 END_PROXY_MAP()
#define END_PROXY_MAP() \
};
3 拼图
3.1 模板类PeerConnectionFactoryProxyWithInternal
上述拆分已完毕,接下来我们将上述的拼图的各个部分组合到一起看看将产生一个什么样的类:
template <class INTERNAL_CLASS> class PeerConnectionFactoryProxyWithInternal;
typedef PeerConnectionFactoryProxyWithInternal<PeerConnectionFactoryInterface> PeerConnectionFactoryProxy;
template <class INTERNAL_CLASS>
class PeerConnectionFactoryProxyWithInternal : public PeerConnectionFactoryInterface {
protected:
typedef PeerConnectionFactoryInterface C;
public:
const INTERNAL_CLASS* internal() const { return c_; }
INTERNAL_CLASS* internal() { return c_; }
protected:
PeerConnectionFactoryProxyWithInternal(rtc::Thread* signaling_thread, INTERNAL_CLASS* c)
: signaling_thread_(signaling_thread), c_(c) {}
private:
mutable rtc::Thread* signaling_thread_;
protected:
~PeerConnectionFactoryProxyWithInternal() {
MethodCall0<PeerConnectionFactoryProxyWithInternal, void> call(
this, &PeerConnectionFactoryProxyWithInternal::DestroyInternal);
call.Marshal(RTC_FROM_HERE, destructor_thread());
}
private:
void DestroyInternal() { c_ = nullptr; }
rtc::scoped_refptr<INTERNAL_CLASS> c_;
public:
static rtc::scoped_refptr<PeerConnectionFactoryProxyWithInternal> Create(
rtc::Thread* signaling_thread, INTERNAL_CLASS* c) {
return new rtc::RefCountedObject<PeerConnectionFactoryProxyWithInternal>(signaling_thread, c);
}
private:
rtc::Thread* destructor_thread() const { return signaling_thread_; }
public: // NOLINTNEXTLINE
void SetOptions(const Options& a1) override {
MethodCall1<PeerConnectionFactoryInterface, void , const Options&> call(c_, &PeerConnectionFactoryInterface::SetOptions, std::move(a1));
return call.Marshal(RTC_FROM_HERE, signaling_thread_);
}
};
由上述产生的代码(仍然感觉有点凌乱)可以看出定义了一个template <class INTERNAL_CLASS>PeerConnectionFactoryProxyWithInternal模板类,该类继承于PeerConnectionFactoryInterface;
3.2 模板类实例化 PeerConnectionFactoryProxy
同时,定义了PeerConnectionFactoryProxy为PeerConnectionFactoryProxyWithInternal<PeerConnectionFactoryInterface>; 是该模板的实例化,我们直接看这个模板实例化的类,并稍微整理下,将更加清晰,最终我们将看到一个这样的类(当然,由于需要代理的公有方法太多,因此只以SetOptions方法为示例,省略其他的方法):
class PeerConnectionFactoryProxy : public PeerConnectionFactoryInterface {
public:
static rtc::scoped_refptr<PeerConnectionFactoryProxy> Create(
rtc::Thread* signaling_thread, PeerConnectionFactoryInterface* c) {
return new rtc::RefCountedObject<PeerConnectionFactoryProxy>(signaling_thread, c);
}
void SetOptions(const Options& a1) override {
MethodCall1<PeerConnectionFactoryInterface, void , const Options&> call(c_, &PeerConnectionFactoryInterface::SetOptions, std::move(a1));
return call.Marshal(RTC_FROM_HERE, signaling_thread_);
}
protected:
PeerConnectionFactoryProxy(rtc::Thread* signaling_thread, PeerConnectionFactoryInterface* c)
: signaling_thread_(signaling_thread), c_(c) {}
~PeerConnectionFactoryProxy() {
MethodCall0<PeerConnectionFactoryProxy, void> call(
this, &PeerConnectionFactoryProxy::DestroyInternal);
call.Marshal(RTC_FROM_HERE, destructor_thread());
}
private:
void DestroyInternal() { c_ = nullptr; }
rtc::Thread* destructor_thread() const { return signaling_thread_; }
private:
rtc::scoped_refptr<PeerConnectionFactoryInterface> c_;
mutable rtc::Thread* signaling_thread_;
};
这下就比较容易理解了:
- PeerConnectionFactoryProxy也继承于PeerConnectionFactoryInterface,这样可以制造一种假象,在应用层我们持有的是PeerConnectionFactory实体对象,其实不然。
- Create方法创建的是PeerConnectionFactoryProxy对象,该对象内部持有PeerConnectionFactoryInterface类别的成员(指向实体对象PeerConnectionFactory),以及目标线程对象
signaling_thread_。 - 执行PeerConnectionFactoryProxy的其他公有方法,比如SetOptions(),内部将会执行PeerConnectionFactory的SetOptions()方法,并且该方法是在目标线程
signaling_thread_运行的,具体的原理见WebRTC源码分析-线程基础之跨线程同步MethodCall。
如此,基本的原理就阐述完毕了。
4 更进一步
我们看到PeerConnectionFactory的所有方法都是需要在信令线程中执行,因此相对来说还比较简单。但是我们会遇到这种情况:一个类的某些方法需要运行在信令线程上,令一些方法需要运行在工作线程上。这时该如何是好?
同样,在api/proxy类中,另外两个宏配合在一起提供了这个功能:
BEGIN_PROXY_MAP确保了可以同时传入信令线程和工作线程
#define BEGIN_PROXY_MAP(c) \
PROXY_MAP_BOILERPLATE(c) \
WORKER_PROXY_MAP_BOILERPLATE(c) \
REFCOUNTED_PROXY_MAP_BOILERPLATE(c) \
public: \
static rtc::scoped_refptr<c##ProxyWithInternal> Create( \
rtc::Thread* signaling_thread, rtc::Thread* worker_thread, \
INTERNAL_CLASS* c) { \
return new rtc::RefCountedObject<c##ProxyWithInternal>(signaling_thread, \
worker_thread, c); \
}
PROXY_WORKER_METHOD0确保了方法在工作者线程上执行
#define PROXY_WORKER_METHOD0(r, method) \
r method() override { \
MethodCall0<C, r> call(c_, &C::method); \
return call.Marshal(RTC_FROM_HERE, worker_thread_); \
}
结束了否?未完呢~~还有另外一个宏,虽然说不太重要,但是揭露了WebRTC中一个普遍的事实:owned这个词在WebRTC中反复多次出现某些变量名中,意义在于告知:该对象是由我持有的,它的生命周期由我控制,因此,对象的销毁我来做吧~
这个宏如下所示:
#define OWNED_PROXY_MAP_BOILERPLATE(c) \
public: \
~c##ProxyWithInternal() { \
MethodCall0<c##ProxyWithInternal, void> call( \
this, &c##ProxyWithInternal::DestroyInternal); \
call.Marshal(RTC_FROM_HERE, destructor_thread()); \
} \
\
private: \
void DestroyInternal() { delete c_; } \
INTERNAL_CLASS* c_;
与之对应的是:
#define REFCOUNTED_PROXY_MAP_BOILERPLATE(c) \
protected: \
~c##ProxyWithInternal() { \
MethodCall0<c##ProxyWithInternal, void> call( \
this, &c##ProxyWithInternal::DestroyInternal); \
call.Marshal(RTC_FROM_HERE, destructor_thread()); \
} \
\
private: \
void DestroyInternal() { c_ = nullptr; } \
rtc::scoped_refptr<INTERNAL_CLASS> c_;
精彩之处在于内部成员c_的类别,和销毁方式:
- owned方式,成员为
INTERNAL_CLASS* c_,销毁的方式是delete c_;直接删除对象。 - refcounted方式,成员为
scoped_refptr<INTERNAL_CLASS> c_,销毁方式是c_ = nullptr;解除引用。
总结
- 本文阐述了WebRTC中,为了防止线程乱入所做的工作:通过Proxy代理对象,将所有操作代理到对应的线程上执行对应的方法;
- 详细的拆解了PeerConnectionFactory的公有方法均需要在信令线程上执行时,是如何被代理的;
- 进一步地,如果有每个类的方法一部分需要在信令线程上执行,一部分需要在工作者线程上执行,这种情况如何被代理;
- 未决问题:是否存在某些类的某些方法需要在网络线程上执行? 此时,WebRTC是如何做的呢?直接复用信令线程的实现嘛?