协程系列
1.干货写在前面
协程是一种用户态的轻量级线程。本篇主要研究协程的C/C++的实现。
首先我们可以看看有哪些语言已经具备协程语义:
- 比较重量级的有C#、erlang、golang*
- 轻量级有python、lua、javascript、ruby
- 还有函数式的scala、scheme等。
c/c++不直接支持协程语义,但有不少开源的协程库,如:
Protothreads:一个“蝇量级” C 语言协程库
libco:来自腾讯的开源协程库libco介绍,官网
coroutine:云风的一个C语言同步协程库,详细信息
目前看到大概有四种实现协程的方式:
- 第一种:利用glibc 的 ucontext组件(云风的库)
- 第二种:使用汇编代码来切换上下文(实现c协程)
- 第三种:利用C语言语法switch-case的奇淫技巧来实现(Protothreads)
- 第四种:利用了 C 语言的 setjmp 和 longjmp( 一种协程的 C/C++ 实现,要求函数里面使用 static local 的变量来保存协程内部的数据)
本篇主要使用ucontext来实现简单的协程库。
2.ucontext初接触
利用ucontext提供的四个函数getcontext(),setcontext(),makecontext(),swapcontext()可以在一个进程中实现用户级的线程切换。
本节我们先来看ucontext实现的一个简单的例子:
#include <stdio.h>
#include <ucontext.h>
#include <unistd.h>
int main(int argc, const char *argv[]){
ucontext_t context;
getcontext(&context);
puts("Hello world");
sleep(1);
setcontext(&context);
return 0;
}
注:示例代码来自维基百科.
保存上述代码到example.c,执行编译命令:
gcc example.c -o example
想想程序运行的结果会是什么样?
cxy@ubuntu:~$ ./example
Hello world
Hello world
Hello world
Hello world
Hello world
Hello world
Hello world
^C
cxy@ubuntu:~$
上面是程序执行的部分输出,不知道是否和你想得一样呢?我们可以看到,程序在输出第一个“Hello world"后并没有退出程序,而是持续不断的输出”Hello world“。其实是程序通过getcontext先保存了一个上下文,然后输出"Hello world",在通过setcontext恢复到getcontext的地方,重新执行代码,所以导致程序不断的输出”Hello world“,在我这个菜鸟的眼里,这简直就是一个神奇的跳转。
那么问题来了,ucontext到底是什么?
3.ucontext组件到底是什么
在类System V环境中,在头文件< ucontext.h > 中定义了两个结构类型,mcontext_t和ucontext_t和四个函数getcontext(),setcontext(),makecontext(),swapcontext().利用它们可以在一个进程中实现用户级的线程切换。
mcontext_t类型与机器相关,并且不透明.ucontext_t结构体则至少拥有以下几个域:
typedef struct ucontext {
struct ucontext *uc_link;
sigset_t uc_sigmask;
stack_t uc_stack;
mcontext_t uc_mcontext;
...
} ucontext_t;
当当前上下文(如使用makecontext创建的上下文)运行终止时系统会恢复uc_link指向的上下文;uc_sigmask为该上下文中的阻塞信号集
合;uc_stack为该上下文中使用的栈;uc_mcontext保存的上下文的特定机器表示,包括调用线程的特定寄存器等。
下面详细介绍四个函数:
int getcontext(ucontext_t *ucp);
初始化ucp结构体,将当前的上下文保存到ucp中
int setcontext(const ucontext_t *ucp);
设置当前的上下文为ucp,setcontext的上下文ucp应该通过getcontext或者makecontext取得,如果调用成功则不返回。如果上下文是通过调用getcontext()取得,程序会继续执行这个调用。如果上下文是通过调用makecontext取得,程序会调用makecontext函数的第二个参数指向的函数,如果func函数返回,则恢复makecontext第一个参数指向的上下文第一个参数指向的上下文context_t中指向的uc_link.如果uc_link为NULL,则线程退出。
void makecontext(ucontext_t *ucp, void (*func)(), int argc, ...);
makecontext修改通过getcontext取得的上下文ucp(这意味着调用makecontext前必须先调用getcontext)。然后给该上下文指定一个栈空间ucp->stack,设置后继的上下文ucp->uc_link.
当上下文通过setcontext或者swapcontext激活后,执行func函数,argc为func的参数个数,后面是func的参数序列。当func执行返回后,继承的上下文被激活,如果继承上下文为NULL时,线程退出。
int swapcontext(ucontext_t *oucp, ucontext_t *ucp);
保存当前上下文到oucp结构体中,然后激活upc上下文。
如果执行成功,getcontext返回0,setcontext和swapcontext不返回;如果执行失败,getcontext,setcontext,swapcontext返回-1,并设置对于的errno.
简单说来, getcontext获取当前上下文,setcontext设置当前上下文,swapcontext切换上下文,makecontext创建一个新的上下文。
4.小试牛刀-使用ucontext组件实现线程切换
虽然我们称协程是一个用户态的轻量级线程,但实际上多个协程同属一个线程。任意一个时刻,同一个线程不可能同时运行两个协程。如果我们将协程的调度简化为:主函数调用协程1,运行协程1直到协程1返回主函数,主函数在调用协程2,运行协程2直到协程2返回主函数。示意步骤如下:
执行主函数
切换:主函数 --> 协程1
执行协程1
切换:协程1 --> 主函数
执行主函数
切换:主函数 --> 协程2
执行协程2
切换协程2 --> 主函数
执行主函数
...
这种设计的关键在于实现主函数到一个协程的切换,然后从协程返回主函数。这样无论是一个协程还是多个协程都能够完成与主函数的切换,从而实现协程的调度。
实现用户线程的过程是:
- 我们首先调用getcontext获得当前上下文
- 修改当前上下文ucontext_t来指定新的上下文,如指定栈空间极其大小,设置用户线程执行完后返回的后继上下文(即主函数的上下文)等
- 调用makecontext创建上下文,并指定用户线程中要执行的函数
- 切换到用户线程上下文去执行用户线程(如果设置的后继上下文为主函数,则用户线程执行完后会自动返回主函数)。
下面代码context_test函数完成了上面的要求。
#include <ucontext.h>
#include <stdio.h>
void func1(void * arg)
{
puts("1");
puts("11");
puts("111");
puts("1111");
}
void context_test()
{
char stack[1024*128];
ucontext_t child,main;
getcontext(&child); //获取当前上下文
child.uc_stack.ss_sp = stack;//指定栈空间
child.uc_stack.ss_size = sizeof(stack);//指定栈空间大小
child.uc_stack.ss_flags = 0;
child.uc_link = &main;//设置后继上下文
makecontext(&child,(void (*)(void))func1,0);//修改上下文指向func1函数
swapcontext(&main,&child);//切换到child上下文,保存当前上下文到main
puts("main");//如果设置了后继上下文,func1函数指向完后会返回此处
}
int main()
{
context_test();
return 0;
}
在context_test中,创建了一个用户线程child,其运行的函数为func1.指定后继上下文为main func1返回后激活后继上下文,继续执行主函数。
保存上面代码到example-switch.cpp.运行编译命令:
g++ example-switch.cpp -o example-switch
执行程序结果如下
cxy@ubuntu:~$ ./example-switch
1
11
111
1111
main
cxy@ubuntu:~$
你也可以通过修改后继上下文的设置,来观察程序的行为。如修改代码
child.uc_link = &main;
为
child.uc_link = NULL;
再重新编译执行,其执行结果为:
cxy@ubuntu:~$ ./example-switch
1
11
111
1111
cxy@ubuntu:~$
可以发现程序没有打印"main",执行为func1后直接退出,而没有返回主函数。可见,如果要实现主函数到线程的切换并返回,指定后继上下文是非常重要的。
5.使用ucontext实现自己的线程库
掌握了上一节从主函数到协程的切换的关键,我们就可以开始考虑实现自己的协程了。
定义一个协程的结构体如下:
typedef void (*Fun)(void *arg);
typedef struct uthread_t
{
ucontext_t ctx;
Fun func;
void *arg;
enum ThreadState state;
char stack[DEFAULT_STACK_SZIE];
}uthread_t;
ctx保存协程的上下文,stack为协程的栈,栈大小默认为DEFAULT_STACK_SZIE=128Kb.你可以根据自己的需求更改栈的大小。func为协程执行的用户函数,arg为func的参数,state表示协程的运行状态,包括FREE,RUNNABLE,RUNING,SUSPEND,分别表示空闲,就绪,正在执行和挂起四种状态。
在定义一个调度器的结构体
typedef std::vector<uthread_t> Thread_vector;
typedef struct schedule_t
{
ucontext_t main;
int running_thread;
Thread_vector threads;
schedule_t():running_thread(-1){}
}schedule_t;
调度器包括主函数的上下文main,包含当前调度器拥有的所有协程的vector类型的threads,以及指向当前正在执行的协程的编号running_thread.如果当前没有正在执行的协程时,running_thread=-1.
接下来,在定义几个使用函数uthread_create,uthread_yield,uthread_resume函数已经辅助函数schedule_finished.就可以了。
int uthread_create(schedule_t &schedule,Fun func, void *arg);
创建一个协程,该协程的会加入到schedule的协程序列中,func为其执行的函数,arg为func的执行函数。返回创建的线程在schedule中的编号。
void uthread_yield(schedule_t &schedule);
挂起调度器schedule中当前正在执行的协程,切换到主函数。
void uthread_resume(schedule_t &schedule, int id);
恢复运行调度器schedule中编号为id的协程
int schedule_finished(const schedule_t &schedule);
判断schedule中所有的协程是否都执行完毕,是返回1,否则返回0.注意:如果有协程处于挂起状态时算作未全部执行完毕,返回0.
代码就不全贴出来了,我们来看看两个关键的函数的具体实现。首先是uthread_resume函数:
void uthread_resume(schedule_t &schedule , int id)
{
if(id < 0 || id >= schedule.threads.size()){
return;
}
uthread_t *t = &(schedule.threads[id]);
switch(t->state){
case RUNNABLE:
getcontext(&(t->ctx));
t->ctx.uc_stack.ss_sp = t->stack;
t->ctx.uc_stack.ss_size = DEFAULT_STACK_SZIE;
t->ctx.uc_stack.ss_flags = 0;
t->ctx.uc_link = &(schedule.main);
t->state = RUNNING;
schedule.running_thread = id;
makecontext(&(t->ctx),(void (*)(void))(uthread_body),1,&schedule);
/* !! note : Here does not need to break */
case SUSPEND:
swapcontext(&(schedule.main),&(t->ctx));
break;
default: ;
}
}
如果指定的协程是首次运行,处于RUNNABLE状态,则创建一个上下文,然后切换到该上下文。如果指定的协程已经运行过,处于SUSPEND状态,则直接切换到该上下文即可。代码中需要注意RUNNBALE状态的地方不需要break.
void uthread_yield(schedule_t &schedule)
{
if(schedule.running_thread != -1 ){
uthread_t *t = &(schedule.threads[schedule.running_thread]);
t->state = SUSPEND;
schedule.running_thread = -1;
swapcontext(&(t->ctx),&(schedule.main));
}
}
uthread_yield挂起当前正在运行的协程。首先是将running_thread置为-1,将正在运行的协程的状态置为SUSPEND,最后切换到主函数上下文。
更具体的代码我已经放到github上,点击这里。
6.最后一步-使用我们自己的协程库
保存下面代码到example-uthread.cpp.
#include "uthread.h"
#include <stdio.h>
void func2(void * arg)
{
puts("22");
puts("22");
uthread_yield(*(schedule_t *)arg);
puts("22");
puts("22");
}
void func3(void *arg)
{
puts("3333");
puts("3333");
uthread_yield(*(schedule_t *)arg);
puts("3333");
puts("3333");
}
void schedule_test()
{
schedule_t s;
int id1 = uthread_create(s,func3,&s);
int id2 = uthread_create(s,func2,&s);
while(!schedule_finished(s)){
uthread_resume(s,id2);
uthread_resume(s,id1);
}
puts("main over");
}
int main()
{
schedule_test();
return 0;
}
执行编译命令并运行:
g++ example-uthread.cpp -o example-uthread
./example-uthread
运行结果如下:
cxy@ubuntu:~/mythread$./example-uthread
22
22
3333
3333
22
22
3333
3333
main over
cxy@ubuntu:~/mythread$
可以看到,程序协程func2,然后切换到主函数,在执行协程func3,再切换到主函数,又切换到func2,在切换到主函数,再切换到func3,最后切换到主函数结束。
总结一下,我们利用getcontext和makecontext创建上下文,设置后继的上下文到主函数,设置每个协程的栈空间。在利用swapcontext在主函数和协程之间进行切换。
到此,使用ucontext做一个自己的协程库就到此结束了。相信你也可以自己完成自己的协程库了。
最后,代码我已经放到github上,点击这里。
原文出处:协程及libco介绍
libco 是腾讯开源的一个协程库,主要应用于微信后台RPC框架,下面我们从为什么使用协程、如何实现协程、libco使用等方面了解协程和libco。
why协程
为什么使用协程,我们先从server框架的实现说起,对于client-server的架构,server最简单的实现:
while(1) {accept();recv();do();send();}
串行地接收连接、读取请求、处理、应答,该实现弊端显而易见,server同一时间只能为一个客户端服务。
为充分利用好多核cpu进行任务处理,我们有了多进程/多线程的server框架,这也是server最常用的实现方式:
accept进程 - n个epoll进程 - n个worker进程
- accpet进程处理到来的连接,并将fd交给各个epoll进程
- epoll进程对各fd设置监控事件,当事件触发时通过共享内存等方式,将请求传给各个worker进程
- worker进程负责具体的业务逻辑处理并回包应答
以上框架以事件监听、进程池的方式,解决了多任务处理问题,但我们还可以对其作进一步的优化。
进程/线程是Linux内核最小的调度单位,一个进程在进行io操作时 (常见于分布式系统中RPC远程调用),其所在的cpu也处于iowait状态。直到后端svr返回,或者该进程的时间片用完、进程被切换到就绪态。是否可以把原本用于iowait的cpu时间片利用起来,发生io操作时让cpu处理新的请求,以提高单核cpu的使用率?
协程在用户态下完成切换,由程序员完成调度,结合对socket类/io操作类函数挂钩子、添加事件监听,为以上问题提供了解决方法。
用户态下上下文切换
Linux提供了接口用于用户态下保存进程上下文信息,这也是实现协程的基础:
- getcontext(ucontext_t *ucp): 获取当前进程/线程上下文信息,存储到ucp中
- makecontext(ucontext_t ucp, void (func)(), int argc, ...): 将func关联到上下文ucp
- setcontext(const ucontext_t *ucp): 将上下文设置为ucp
- swapcontext(ucontext_t oucp, ucontext_t ucp): 进行上下文切换,将当前上下文保存到oucp中,切换到ucp
以上函数与保存上下文的 ucontext_t 结构都在 ucontext.h 中定义,ucontext_t 结构中,我们主要关心两个字段:
- struct ucontext *uc_link: 协程后继上下文
- stack_t uc_stack: 保存协程数据的栈空间
stack_t 结构用于保存协程数据,该空间需要事先分配,我们主要关注该结构中的以下两个字段:
- void __user *ss_sp: 栈头指针
- size_t ss_size: 栈大小
获取进程上下文并切换的方法,总结有以下几步:
- 调用 getcontext(),获取当前上下文
- 预分配栈空间,设置 xxx.uc_stack.ss_sp 和 xxx.uc_stack.ss_size 的值
- 设置后继上下文环境,即设置 xxx.uc_link 的值
- 调用 makecontext(),变更上下文环境
- 调用 swapcontext(),完成跳转
Socket族函数/io异步处理
当进程使用socket族函数 (connect/send/recv等)、io函数 (read/write等),我们使用协程切换任务前,需对相应的fd设置监听事件,以便io完成后原有逻辑继续执行。
对io函数,我们可以事先设置钩子,在真正调用接口前,对相应fd设置事件监听。同样,Linux为我们设置钩子提供了接口,以read()函数为例:
- 编写名字为 read() 的函数,该函数先对fd调用epoll函数设置事件监听
- read() 中使用dlsym(),调用真正的 read()
- 将编写好的文件打包,编译成库文件:gcc -shared -Idl -fPIC prog2.c -o libprog2.so
- 执行程序时引用以上库文件:LD_PRELOAD=/home/qspace/lib/libprog2.so ./prog
当在prog程序中调用 read() 时,使用的就是我们实现的 read() 函数。
对于glibc函数设置钩子的方法,可参考:Let's Hook a Librarg Function
libco
有了以上准备工作,我们可以构建这样的server框架:
accept进程 - epoll进程(n个epoll协程) - n个worker进程(每个worker进程n个worker协程)
该框架下,接收请求、业务逻辑处理、应答都可以看做单独的任务,相应的epoll、worker协程事先分配,服务流程如下:
- mainloop主循环,负责 i/监听请求事件,有请求则拉起一个worker协程处理;ii/如果timeout时间内没有请求,则处理就绪协程(即io操作已返回)
- worker协程,如果遇到io操作则挂起,对fd加监听事件,让出cpu
libco 提供了以下接口:
- co_create: 创建协程,可在程序启动时创建各任务协程
- co_yield: 协程主动让出cpu,调io操作函数后调用
- co_resume: io操作完成后(触发相应监听事件)调用,使协程继续往下执行
socket族函数(socket/connect/sendto/recv/recvfrom等)、io函数(read/write)在libco的co_hook_sys_call.cpp中已经重写,以read为例:
ssize_t read( int fd, void *buf, size_t nbyte )
{
struct pollfd pf = { 0 };
pf.fd = fd;
pf.events = ( POLLIN | POLLERR | POLLHUP );
int pollret = poll( &pf,1,timeout ); /*对相应fd设置监听事件*/
ssize_t readret = g_sys_read_func( fd,(char*)buf ,nbyte ); /*真正调用read()*/
return readret;
}
小结
由最简单的单任务处理,到多进程/多线程(并行),再到协程(异步),server在不断地往极致方向优化,以更好地利用硬件性能的提升(多核cpu的出现、单核cpu性能不断提升)。
对程序员而言,可时常检视自己的程序,是否做好并行与异步,在硬件性能提升时,程序服务能力可不可以有相应比例的提升。
原文出处:一种协程的C/C++实现
介绍
在前几天接触到了协程的概念,觉得很有趣。因为我可以使用一个线程来实现一个类似多线程的程序,如果使用协程来替代线程,就可以省去很多原子操作和内存栅栏的麻烦,大大减少与线程同步相关的系统调用。因为我只有一个线程,而且协程之间的切换是可以由函数自己决定的。
我有见过几种协程的实现,因为没有 C/C++ 的原生支持,所以多数的库使用了汇编代码,还有些库利用了 C 语言的 setjmp 和 longjmp 但是要求函数里面使用static local 的变量来保存协程内部的数据。我讨厌写汇编和使用 static local变量,所以想出了一种稍微优雅一点又有点奇技淫巧的实现方法。 这篇文章将向你展示这种方法基本原理和实现。
基本原理
用 C/C++ 实现的最大困难就是创建,保存和恢复程序的上下文。因为这涉及到了程序栈的管理,以及 CPU 寄存器的访问,但是这两项内容在C/C++标准里面都没有严格的定义,所以我们是不可能有一个完全跨平台的C/C++实现的。但是利用操作系统提供的API,我们仍然可以避免使用汇编代码,接下来会向你展示使用 POSIX 的 pthread实现的一种简单的协程框架。什么!??Pthread?那你的程序岂不是多线程了?那还叫协程吗!没错,确实是多线程的,不过仅仅是在协程被创建之前的短暂瞬间。
要创建子程序的上下文,我们可以调用 pthread_create 函数来创建一个真正的线程,这样操作系统就会帮我们创建上下文(这里包括初始化 CPU寄存器和程序栈)。然后在线程启动时,使用 C 语言的 setjmp 把这些寄存器备份到外部的 buffer里面。创建完后,这个线程便失去了它的存在价值,所以可以果断干掉它了。不过还需要注意一点,就是在创建线程之前,需要调用pthread_attr_setstack函数来显式地声明使用的程序栈,这样线程退出的时候,系统就不会自动销毁这个程序栈。至于上下文的恢复,显然就是使用 longjmp 函数了。
创建上下文
下面是 RoutineInfo 的定义。为了简单起见,所有错误处理代码都被省略了,原版本的代码在 coroutine.cpp 文件中,省略版的代码在
coroutine_demonstration.cpp 文件中。
typedef void * (*RoutineHandler)(void*);
struct RoutineInfo{
void * param;
RoutineHandler handler;
void * ret;
bool stopped;
jmp_buf buf;
void *stackbase;
size_t stacksize;
pthread_attr_t attr;
// size: the stack size
RoutineInfo(size_t size){
param = NULL;
handler = NULL;
ret = NULL;
stopped = false;
stackbase = malloc(size);
stacksize = size;
pthread_attr_init(&attr);
if(stacksize)
pthread_attr_setstack(&attr,stackbase,stacksize);
}
~RoutineInfo(){
pthread_attr_destroy(&attr);
free(stackbase);
}
};
然后,我们需要一下全局的列表来保存这些 RoutineInfo 对象。
std::list<RoutineInfo*> InitRoutines(){
std::list<RoutineInfo*> list;
RoutineInfo *main = new RoutineInfo(0);
list.push_back(main);
return list;
}
std::list<RoutineInfo*> routines = InitRoutines();
接下来是协程的创建,注意当协程的时候,程序栈有可能已经被损坏了,所以需要一个 stackBack 作为程序栈的备份,用来做后面的恢复。
void *stackBackup = NULL;
void *CoroutineStart(void *pRoutineInfo);
int CreateCoroutine(RoutineHandler handler,void* param ){
RoutineInfo* info = new RoutineInfo(PTHREAD_STACK_MIN+ 0x4000);
info->param = param;
info->handler = handler;
pthread_t thread;
int ret = pthread_create( &thread, &(info->attr), CoroutineStart, info);
void* status;
pthread_join(thread,&status);
memcpy(info->stackbase,stackBackup,info->stacksize); // restore the stack
routines.push_back(info); // add the routine to the end of the list
return 0;
}
然后是 CoroutinneStart 函数。当线程进入这个函数的时候,使用 setjmp 保存上下文,然后备份它自己的程序栈,然后直接退出线程。
void Switch();
void *CoroutineStart(void *pRoutineInfo){
RoutineInfo& info = *(RoutineInfo*)pRoutineInfo;
if( !setjmp(info.buf)){
// back up the stack, and then exit
stackBackup = realloc(stackBackup,info.stacksize);
memcpy(stackBackup,info.stackbase, info.stacksize);
pthread_exit(NULL);
return (void*)0;
}
info.ret = info.handler(info.param);
info.stopped = true;
Switch(); // never return
return (void*)0; // suppress compiler warning
}
上下文切换
一个协程主动调用 Switch() 函数,才切换到另一个协程。
std::list<RoutineInfo*> stoppedRoutines = std::list<RoutineInfo*>();
void Switch(){
RoutineInfo* current = routines.front();
routines.pop_front();
if(current->stopped){
// The stack is stored in the RoutineInfo object,
// delete the object later, now know
stoppedRoutines.push_back(current);
longjmp( (*routines.begin())->buf ,1);
}
routines.push_back(current); // adjust the routines to the end of list
if( !setjmp(current->buf) ){
longjmp( (*routines.begin())->buf ,1);
}
if(stoppedRoutines.size()){
delete stoppedRoutines.front();
stoppedRoutines.pop_front();
}
}
演示
用户的代码很简单,就像使用一个线程库一样,一个协程主动调用 Switch() 函数主动让出 CPU 时间给另一个协程。
#include <iostream>
using namespace std;
#include <sys/wait.h>
void* foo(void*){
for(int i=0; i<2; ++i){
cout<<"foo: "<<i<<endl;
sleep(1);
Switch();
}
}
int main(){
CreateCoroutine(foo,NULL);
for(int i=0; i<6; ++i){
cout<<"main: "<<i<<endl;
sleep(1);
Switch();
}
}
记得在链接的时候加上 -lpthread 链接选项。程序的执行结果如下所示:
[roxma@VM_6_207_centos coroutine]$ g++ coroutime_demonstration.cpp -lpthread -o a.out
[roxma@VM_6_207_centos coroutine]$ ls
a.out coroutime.cpp coroutime_demonstration.cpp README.md
[roxma@VM_6_207_centos coroutine]$ ./a.out
main: 0
foo: 0
main: 1
foo: 1
main: 2
main: 3
main: 4
main: 5
原文及代码下载
https://github.com/roxma/cpp_learn/tree/master/cpp/linux_programming/coroutine