协程
跨语言理解协程的含义
2024.06.26
趣闻
2023 年 10 月,Rust 团队不惜 修改 3800 行代码 ,只为将 generator 替换成 coroutine,有趣的是,粗鲁的文本替换过程中产生了一些小错误。希望本文能解释清楚协程相关的概念,避免读者重蹈覆辙。
协程的定义
20 世纪 60 年代初,Melvin Conway 博士开创性的提出了协程(coroutine)这一概念,即协作式程序(cooperative routine)。随着技术的发展,出现了很多在设计思想上或实现形式上与协程类似的概念,不管它们是否自称为协程。如今,协程被定义为:一种可以暂停和恢复的计算机程序组件,它使得函数具备协作式多任务的能力。这个定义可能有些抽象,没关系,让我们从一个简单的例子开始,你只需要记住,协程的核心是“协作”二字。另外,协程一般指的是参与协作的对象,也就是可以暂停和恢复的,比线程更小粒度的可调度单元,在代码里主要体现为函数或者对象。
初识协程
下面是一个 JavaScript 生成器的例子。test 函数首先给调用方传递一个 hello,然后在 yield 处暂停执行,等待接收对方传递过来的 world,然后恢复执行,将其输出。双方在一种默契的配合中交替执行,这就是所谓的协作,这也是协程的核心特征。
function* test() {
var value = yield 'hello';
console.log(value); // output: world
}
var co = test();
result = co.next();
console.log(result.value) // output: hello
co.next('world');
顺便来看一个 Python 生成器的版本,几乎一模一样。
def test():
value = yield 'hello'
print(value) # output: world
yield
co = test()
result = next(co)
print(result) # output: hello
co.send('world')
接下来看一个 Lua 的版本,唯一的差别是需要将 test 函数转换为协程对象。与前面的生成器不同,这个协程属于有栈协程,后面会详细解释这一概念。
function test()
local value = coroutine.yield('hello')
print(value) -- output: world
end
local co = coroutine.create(test)
local status, result = coroutine.resume(co)
print(result) -- output: hello
coroutine.resume(co, 'world')
再来看一个 PHP 版本,与 Lua 版本相比,基本上就是换了一个叫纤程(fiber)的名字而已。
<?php
function test() {
$value = Fiber::suspend('hello');
echo "$value\n"; // output: world
}
$co = new Fiber('test');
$result = $co->start();
echo "$result\n"; // output: hello
$co->resume('world');
普通函数之间的关系就像是老板与员工之间的关系,老板交代员工一个任务,然后等待员工给他汇报结果即可。而在协程中,函数直间的关系就像两个同事合作完成某项任务。
纵观协程
生成器(generator)和异步(asynchronous)函数是最常见的两种协程。除此之外,还有很多手段可以暂停和恢复函数的执行,如纤程、继续(continuation)、非局部跳转等,它们都可以实现协程(纤程本身也可以视为协程)。生成器和异步函数分别是无栈(stackless)协程和有栈(stackful)协程的典型代表。有栈指的是有独立的调用栈,特征是恢复方的调用栈不会出现在它的调用栈中,因为它自己是一个独立的任务(类似于一个线程)。
这里收集了一些编程语言对协程的支持。除 Python、Kotlin 和 Rust 将异步函数称为协程外,大部分编程语言并未引入协程这一概念。毕竟缺乏这一概念并不会影响编程语言的完备性,我们之所以能将它们通通归为协程,不过是解题思路的殊途同归罢了。
生成器(迭代器) | 异步函数 | 协程技术 | |
---|---|---|---|
C | setjmp; ucontext | ||
C++ | C++20 coroutine | ||
C# | yield return | async/await | |
Go | |||
Java | |||
JavaScript | yield | async/await | |
Kotlin | iterator/yield() | suspend/.await() | |
Lua | coroutine | ||
PHP | yield | Fiber | |
Python | yield/.send | async/await | |
Ruby | .yield | Fiber; callcc | |
Rust | yield | async/.await |
无栈协程的实现(C 语言)
这个例子展示了无栈协程的核心逻辑——状态机。coroutine_t 中的 next 保存了函数暂停的位置,var 则保存了函数中使用的局部变量。使用 switch 语句跳转到需要恢复执行的位置,看似平平无奇,它的可怕之处在于能嵌入循环语句中,这个方法被称为达夫设备(Duff's device)。co_yield 中使用 `__COUNTER__` 宏自动生成标签, PuTTY 的作者 使用的是 `__LINE__`,但这会和 VS 的“编辑并继续”功能产生冲突。
#include <stdio.h>
#include <stdlib.h>
typedef struct {
int next;
void *var;
} coroutine_t;
#define co_new() { 0 }
#define co_var(name, members) \
struct { members; } *name = NULL; \
if (!co->var) \
co->var = malloc(sizeof(*name)); \
v = co->var;
#define co_begin(co) switch (co->next) { case 0:
#define co_end(co, ret) } return ret;
#define co_yield(co, val) \
co->next = __COUNTER__ + 1; return val; case __COUNTER__:
static inline co_free(coroutine_t *co)
{
if (co->var) {
free(co->var);
co->var = NULL;
}
}
void output(int value)
{
printf("%d\n", value);
}
int test(coroutine_t *co, int arg)
{
co_var(v, int i);
co_begin(co)
for (v->i = 1; v->i <= 3; v->i += 2) {
co_yield(co, v->i);
output(arg); // output: 2, 4
}
co_yield(co, 5);
co_end(co, 0)
}
int main()
{
coroutine_t co = co_new();
output(test(&co, 0)); // output: 1
output(test(&co, 2)); // output: 3
output(test(&co, 4)); // output: 5
co_free(&co);
}
有栈协程的实现(C 语言)
这个例子展示了有栈协程的核心技术——用户态的线程上下文切换。程序主要由调度器 scheduler_t 和任务 task_t 这两部分组成,执行权在它们之间不停的切换。使用了少量内联汇编来实现对寄存器的访问,只支持 Windows 的 32 位平台。完全理解这段代码可能需要一些 x86 汇编和调用栈的基础知识。生产环境中一般直接使用 Window Fiber API 实现,这里主要是为了展示原理。
#include <stdio.h>
#include <stdlib.h>
#include <stdbool.h>
#include <string.h>
#include <setjmp.h>
#include <assert.h>
#include <intrin.h>
#pragma warning(disable: 4731) // modified ebp
#define TASK_STACK_SIZE 1024
typedef struct {
jmp_buf context;
char *stack_frame;
} scheduler_t;
typedef struct task_s {
void (*entry)(struct task_s*, void*);
void *user_arg;
scheduler_t *scheduler;
bool started;
bool finished;
void *context[5];
char stack[TASK_STACK_SIZE];
size_t stack_size;
void *stack_frame;
} task_t;
void task_yield_impl(task_t *self)
{
self->context[0] = _ReturnAddress();
longjmp(self->scheduler->context, 1);
}
void task_yield(task_t *self)
{
// save registers
void **ctx = self->context;
__asm {
mov eax, ctx
mov [eax + 4], edi
mov [eax + 8], esi
mov [eax + 12], ebp
mov [eax + 16], esp
}
// save stack
char dummy = 0;
char *this_frame = &dummy;
self->stack_frame = this_frame;
char *base_frame = self->scheduler->stack_frame;
self->stack_size = base_frame - this_frame;
assert(self->stack_size <= sizeof(self->stack));
memcpy(self->stack, this_frame, self->stack_size);
task_yield_impl(self);
__asm {
mov eax, ctx
mov esp, [eax + 16]
}
}
void task_resume(task_t *self)
{
// this frame will be overwrote, so use registers
void **ctx = self->context;
__asm mov esi, ctx
char dummy[1024]; // avoid overwriting return address
memcpy(dummy, "", 1); // avoid being optimized away
// restore stack
memcpy(self->stack_frame, self->stack, self->stack_size);
// restore registers
__asm {
mov eax, esi
mov edi, [eax + 4]
mov esi, [eax + 8]
mov ebp, [eax + 12]
mov esp, [eax + 16]
jmp dword ptr [eax]
}
}
void scheduler_switch(scheduler_t *self, task_t *task)
{
char dummy = 0;
if (!self->stack_frame)
self->stack_frame = &dummy;
if (setjmp(self->context))
return;
if (task->started)
task_resume(task);
else {
task->started = true;
task->entry(task, task->user_arg);
task->finished = true;
}
}
void scheduler_run(scheduler_t *self, task_t *tasks, int number)
{
bool poll = true;
while (poll) {
poll = false;
for (int i = 0; i < number; ++i) {
task_t *task = tasks + i;
if (!task->scheduler)
task->scheduler = self;
if (task->finished)
continue;
if (!poll)
poll = true;
scheduler_switch(self, task);
}
}
}
void test(task_t *task, void *arg)
{
for (int i = 0; i < 3; ++i) {
printf("%s: %d\n", (char*)arg, i);
task_yield(task);
}
}
int main()
{
task_t tasks[] = {
{ test, "foo" },
{ test, "bar" },
};
scheduler_t scheduler = { 0 };
scheduler_run(&scheduler, tasks, _countof(tasks));
}
对称协程
对称(symmetric)协程是指协程之间身份对等,没有调用或恢复关系,即让出执行权到中立的调度器(scheduler),而不是调用者(caller)或恢复者(resumer)。对称协程通常只有一种让出方式,而非对称(asymmetric)协程有两种让出方式,比如 yield/resume 组合。从技术上讲,二者可以互相实现:对称协程可以事先保存调用方信息再切换回去,非对称协程则可以引入一个分发(dispatcher)协程。生成器属于非对称协程,异步函数属于对称协程。另外,非对称协程也叫半协程(semi-coroutine)或半对称(semi-symmetric)协程。
异步函数的传染性
由于协程可以实现单个线程多任务并发执行,并且在形式上将异步编程模型同步化了,避免了可怕的回调地狱,所以很多语言陆续加入了这一特性。然而这也导致同时存在两种类型的函数,它们各自为政,不能混用,比如不能在普通函数里面使用 await,比如不能在异步函数里调用阻塞线程的函数,否则会影响当前线程里的所有任务(饥饿问题)。这导致一个函数的类型会感染整个调用链,这也被称作 函数着色问题 。问题的本质在于这两种函数分别对应于两种不同的运行容器,即线程与协程,除非某种编程语言只有一种运行容器,比如 Go 语言里的 goroutine。顺便一提,在支持非协作抢占式(non-cooperative preemption)调度后(Go >= 1.14)的 goroutine 不再算作协程了。
单电子假说
科学家们发现,在微观时间里,同一类粒子之间无法区分,它们被称为全同粒子。约翰·惠勒提出:会不会世界上只有一个电子,它们在时间线里穿梭,营造成有很多电子的假象。类似的,协程也是执行流程在不同函数之间穿梭,营造成多个函数同时在执行的错觉。