协程

跨语言理解协程的含义

2024.06.26

趣闻

2023 年 10 月,Rust 团队不惜 修改 3800 行代码 ,只为将 generator 替换成 coroutine,有趣的是,粗鲁的文本替换过程中产生了一些小错误。希望本文能解释清楚协程相关的概念,避免读者重蹈覆辙。

协程的定义

20 世纪 60 年代初,Melvin Conway 博士开创性的提出了协程(coroutine)这一概念,即协作式程序(cooperative routine)。随着技术的发展,出现了很多在设计思想上或实现形式上与协程类似的概念,不管它们是否自称为协程。如今,协程被定义为:一种可以暂停和恢复的计算机程序组件,它使得函数具备协作式多任务的能力。这个定义可能有些抽象,没关系,让我们从一个简单的例子开始,你只需要记住,协程的核心是“协作”二字。另外,协程一般指的是参与协作的对象,也就是可以暂停和恢复的,比线程更小粒度的可调度单元,在代码里主要体现为函数或者对象。

初识协程

下面是一个 JavaScript 生成器的例子。test 函数首先给调用方传递一个 hello,然后在 yield 处暂停执行,等待接收对方传递过来的 world,然后恢复执行,将其输出。双方在一种默契的配合中交替执行,这就是所谓的协作,这也是协程的核心特征。

function* test() {
    var value = yield 'hello';
    console.log(value); // output: world
}

var co = test();
result = co.next();
console.log(result.value) // output: hello
co.next('world');

顺便来看一个 Python 生成器的版本,几乎一模一样。

def test():
    value = yield 'hello'
    print(value)  # output: world
    yield

co = test()
result = next(co)
print(result)  # output: hello
co.send('world')

接下来看一个 Lua 的版本,唯一的差别是需要将 test 函数转换为协程对象。与前面的生成器不同,这个协程属于有栈协程,后面会详细解释这一概念。

function test()
    local value = coroutine.yield('hello')
    print(value) -- output: world
end

local co = coroutine.create(test)
local status, result = coroutine.resume(co)
print(result) -- output: hello
coroutine.resume(co, 'world')

再来看一个 PHP 版本,与 Lua 版本相比,基本上就是换了一个叫纤程(fiber)的名字而已。

<?php
function test() {
    $value = Fiber::suspend('hello');
    echo "$value\n"; // output: world
}

$co = new Fiber('test');
$result = $co->start();
echo "$result\n"; // output: hello
$co->resume('world');

普通函数之间的关系就像是老板与员工之间的关系,老板交代员工一个任务,然后等待员工给他汇报结果即可。而在协程中,函数直间的关系就像两个同事合作完成某项任务。

纵观协程

生成器(generator)和异步(asynchronous)函数是最常见的两种协程。除此之外,还有很多手段可以暂停和恢复函数的执行,如纤程、继续(continuation)、非局部跳转等,它们都可以实现协程(纤程本身也可以视为协程)。生成器和异步函数分别是无栈(stackless)协程和有栈(stackful)协程的典型代表。有栈指的是有独立的调用栈,特征是恢复方的调用栈不会出现在它的调用栈中,因为它自己是一个独立的任务(类似于一个线程)。

这里收集了一些编程语言对协程的支持。除 Python、Kotlin 和 Rust 将异步函数称为协程外,大部分编程语言并未引入协程这一概念。毕竟缺乏这一概念并不会影响编程语言的完备性,我们之所以能将它们通通归为协程,不过是解题思路的殊途同归罢了。

生成器(迭代器)异步函数协程技术
Csetjmp; ucontext
C++C++20 coroutine
C#yield returnasync/await
Go
Java
JavaScriptyieldasync/await
Kotliniterator/yield()suspend/.await()
Luacoroutine
PHPyieldFiber
Pythonyield/.sendasync/await
Ruby.yieldFiber; callcc
Rustyieldasync/.await

无栈协程的实现(C 语言)

这个例子展示了无栈协程的核心逻辑——状态机。coroutine_t 中的 next 保存了函数暂停的位置,var 则保存了函数中使用的局部变量。使用 switch 语句跳转到需要恢复执行的位置,看似平平无奇,它的可怕之处在于能嵌入循环语句中,这个方法被称为达夫设备(Duff's device)。co_yield 中使用 `__COUNTER__` 宏自动生成标签, PuTTY 的作者 使用的是 `__LINE__`,但这会和 VS 的“编辑并继续”功能产生冲突。

#include <stdio.h>
#include <stdlib.h>

typedef struct {
    int next;
    void *var;
} coroutine_t;

#define co_new() { 0 }
#define co_var(name, members)               \
    struct { members; } *name = NULL;       \
    if (!co->var)                           \
        co->var = malloc(sizeof(*name));    \
    v = co->var;

#define co_begin(co) switch (co->next) { case 0:
#define co_end(co, ret) } return ret;
#define co_yield(co, val) \
    co->next = __COUNTER__ + 1; return val; case __COUNTER__:

static inline co_free(coroutine_t *co)
{
    if (co->var) {
        free(co->var);
        co->var = NULL;
    }
}

void output(int value)
{
    printf("%d\n", value);
}

int test(coroutine_t *co, int arg)
{
    co_var(v, int i);
    co_begin(co)
    for (v->i = 1; v->i <= 3; v->i += 2) {
        co_yield(co, v->i);
        output(arg); // output: 2, 4
    }
    co_yield(co, 5);
    co_end(co, 0)
}

int main()
{
    coroutine_t co = co_new();
    output(test(&co, 0)); // output: 1
    output(test(&co, 2)); // output: 3
    output(test(&co, 4)); // output: 5
    co_free(&co);
}

有栈协程的实现(C 语言)

这个例子展示了有栈协程的核心技术——用户态的线程上下文切换。程序主要由调度器 scheduler_t 和任务 task_t 这两部分组成,执行权在它们之间不停的切换。使用了少量内联汇编来实现对寄存器的访问,只支持 Windows 的 32 位平台。完全理解这段代码可能需要一些 x86 汇编和调用栈的基础知识。生产环境中一般直接使用 Window Fiber API 实现,这里主要是为了展示原理。

#include <stdio.h>
#include <stdlib.h>
#include <stdbool.h>
#include <string.h>
#include <setjmp.h>
#include <assert.h>
#include <intrin.h>

#pragma warning(disable: 4731) // modified ebp
#define TASK_STACK_SIZE 1024

typedef struct {
    jmp_buf context;
    char *stack_frame;
} scheduler_t;

typedef struct task_s {
    void (*entry)(struct task_s*, void*);
    void *user_arg;
    scheduler_t *scheduler;
    bool started;
    bool finished;
    void *context[5];
    char stack[TASK_STACK_SIZE];
    size_t stack_size;
    void *stack_frame;
} task_t;

void task_yield_impl(task_t *self)
{
    self->context[0] = _ReturnAddress();
    longjmp(self->scheduler->context, 1);
}

void task_yield(task_t *self)
{
    // save registers
    void **ctx = self->context;
    __asm {
        mov eax, ctx
        mov [eax + 4], edi
        mov [eax + 8], esi
        mov [eax + 12], ebp
        mov [eax + 16], esp
    }

    // save stack
    char dummy = 0;
    char *this_frame = &dummy;
    self->stack_frame = this_frame;

    char *base_frame = self->scheduler->stack_frame;
    self->stack_size = base_frame - this_frame;

    assert(self->stack_size <= sizeof(self->stack));
    memcpy(self->stack, this_frame, self->stack_size);

    task_yield_impl(self);
    __asm {
        mov eax, ctx
        mov esp, [eax + 16]
    }
}

void task_resume(task_t *self)
{
    // this frame will be overwrote, so use registers
    void **ctx = self->context;
    __asm mov esi, ctx

    char dummy[1024]; // avoid overwriting return address
    memcpy(dummy, "", 1); // avoid being optimized away

    // restore stack
    memcpy(self->stack_frame, self->stack, self->stack_size);

    // restore registers
    __asm {
        mov eax, esi
        mov edi, [eax + 4]
        mov esi, [eax + 8]
        mov ebp, [eax + 12]
        mov esp, [eax + 16]
        jmp dword ptr [eax]
    }
}

void scheduler_switch(scheduler_t *self, task_t *task)
{
    char dummy = 0;
    if (!self->stack_frame)
        self->stack_frame = &dummy;

    if (setjmp(self->context))
        return;

    if (task->started)
        task_resume(task);
    else {
        task->started = true;
        task->entry(task, task->user_arg);
        task->finished = true;
    }
}

void scheduler_run(scheduler_t *self, task_t *tasks, int number)
{
    bool poll = true;
    while (poll) {
        poll = false;
        for (int i = 0; i < number; ++i) {
            task_t *task = tasks + i;

            if (!task->scheduler)
                task->scheduler = self;

            if (task->finished)
                continue;

            if (!poll)
                poll = true;

            scheduler_switch(self, task);
        }
    }
}

void test(task_t *task, void *arg)
{
    for (int i = 0; i < 3; ++i) {
        printf("%s: %d\n", (char*)arg, i);
        task_yield(task);
    }
}

int main()
{
    task_t tasks[] = {
        { test, "foo" },
        { test, "bar" },
    };

    scheduler_t scheduler = { 0 };
    scheduler_run(&scheduler, tasks, _countof(tasks));
}

对称协程

对称(symmetric)协程是指协程之间身份对等,没有调用或恢复关系,即让出执行权到中立的调度器(scheduler),而不是调用者(caller)或恢复者(resumer)。对称协程通常只有一种让出方式,而非对称(asymmetric)协程有两种让出方式,比如 yield/resume 组合。从技术上讲,二者可以互相实现:对称协程可以事先保存调用方信息再切换回去,非对称协程则可以引入一个分发(dispatcher)协程。生成器属于非对称协程,异步函数属于对称协程。另外,非对称协程也叫半协程(semi-coroutine)或半对称(semi-symmetric)协程。

异步函数的传染性

由于协程可以实现单个线程多任务并发执行,并且在形式上将异步编程模型同步化了,避免了可怕的回调地狱,所以很多语言陆续加入了这一特性。然而这也导致同时存在两种类型的函数,它们各自为政,不能混用,比如不能在普通函数里面使用 await,比如不能在异步函数里调用阻塞线程的函数,否则会影响当前线程里的所有任务(饥饿问题)。这导致一个函数的类型会感染整个调用链,这也被称作 函数着色问题 。问题的本质在于这两种函数分别对应于两种不同的运行容器,即线程与协程,除非某种编程语言只有一种运行容器,比如 Go 语言里的 goroutine。顺便一提,在支持非协作抢占式(non-cooperative preemption)调度后(Go >= 1.14)的 goroutine 不再算作协程了。

单电子假说

科学家们发现,在微观时间里,同一类粒子之间无法区分,它们被称为全同粒子。约翰·惠勒提出:会不会世界上只有一个电子,它们在时间线里穿梭,营造成有很多电子的假象。类似的,协程也是执行流程在不同函数之间穿梭,营造成多个函数同时在执行的错觉。

(全文结束)