2

两百行Rust代码解析绿色线程原理(四)一个绿色线程的实现

 1 year ago
source link: https://zhuanlan.zhihu.com/p/101061389
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

两百行Rust代码解析绿色线程原理(四)一个绿色线程的实现

原文: Green threads explained in 200 lines of rust language
地址: https://cfsamson.gitbook.io/green-threads-explained-in-200-lines-of-rust/
作者: Carl Fredrik Samson([email protected])
翻译: 耿腾


在我们开始之前,我得提醒你我们编写的代码非常不安全,并且这也不是编写 Rust 代码的 “最佳实践”。我希望在不引入很多不必要的复杂性的前提下使其尽可能安全,所以如果你发现了更安全方法,又不会让我们的代码过于复杂,那么我鼓励亲爱的你为该项目提一个 RP(Pull Request)。

让我们开始吧

我们要做的第一件事就是在 main.rs 中删除我们的示例。我们从头开始并添加以下内容:

#![feature(asm)]
#![feature(naked_functions)]
use std::ptr;

const DEFAULT_STACK_SIZE: usize = 1024 * 1024 * 2;
const MAX_THREADS: usize = 4;
static mut RUNTIME: usize = 0;

我们启用了两个特性(feature),我们之前介绍过的 asm 功能,以及我们需要解释的 naked_functions 功能。

naked_functions

当 Rust 编译一个函数时,它会为每个函数添加一个小的序言和尾声,这会在我们切换上下文时给我们带来一些问题,因为我们最终得到了一个未对齐的栈。这在我们的第一个简单示例中运行良好,但是一旦我们需要再次切换回相同的栈,我们就会遇到麻烦。将某个函数标记为 #[naked] 会删除序言和结尾。此属性主要用于内联汇编。

如果你有兴趣,可以阅读 RFC #1201 中有关 naked_functions 功能的更多信息。

我们的 DEFAULT_STACK_SIZE 设置为 2 MB,这足够我们使用。我们还将 MAX_THREADS 设置为 4,因为我们的示例不需要太大。

最后一个常量 RUNTIME 是一个指向我们运行时的指针(是的,我知道,一个可变的全局变量不是很好,但我们稍后需要它,我们只在运行时初始化时设置这个变量)。

我们开始填上一些代表数据的内容:

pub struct Runtime {
    threads: Vec<Thread>,
    current: usize,
}

#[derive(PartialEq, Eq, Debug)]
enum State {
    Available,
    Running,
    Ready,
}

struct Thread {
    id: usize,
    stack: Vec<u8>,
    ctx: ThreadContext,
    state: State,
}

#[derive(Debug, Default)]
#[repr(C)]
struct ThreadContext {
    rsp: u64,
    r15: u64,
    r14: u64,
    r13: u64,
    r12: u64,
    rbx: u64,
    rbp: u64,
}

Runtime 将是我们的主要入口。我们会创建一个非常小的、简单的运行时来调度和切换我们的线程。运行时包含一个 Thread 的数组和一个 current 字段,以指示我们当前正在运行的线程。

Thread 保存线程的数据。每个线程都有一个 id,这样我们就可以将不同的线程区分开。栈类似于我们在前面章节中的第一个示例中看到的内容。 ctx 字段是一个上下文,表示我们的 CPU 需要在栈上剩余的位置恢复的数据,以及一个 state,它是我们的线程状态。

译者注:这里的 “线程” 以及 Thread 指的就是我们要实现的 “绿色线程(Grean Thread)”,而不是操作系统线程。

State 是一个枚举,代表我们的线程所处的状态:

  • Available 表示线程可用,并且可以根据需要分配任务;
  • Running 意味着线程正在运行;
  • Ready 意味着线程已准备好继续前进和恢复执行。

ThreadContext 保存 CPU 需要在栈上恢复执行的寄存器的数据。

如果你不记得,请返回 绪论及基本概念 一章以了解寄存器。这些是 x86-64 体系结构规范中标记为 “callee saved” 的寄存器。

让我们继续:

impl Thread {
    fn new(id: usize) -> Self {
        Thread {
            id,
            stack: vec![0_u8; DEFAULT_STACK_SIZE],
            ctx: ThreadContext::default(),
            state: State::Available,
        }
    }
}

这部分很简单。新线程在 Available 状态下启动,表示已准备好为其分配任务。

需要注意的一点是我们在这里分配了栈内存。这不是必需的,也不是资源的最佳使用方法,因为我们没有在首次使用时分配,,而为一个只是可能需要的线程分配了内存。不过,这降低了我们代码的复杂性,而我们的代码有比为栈分配内存更重要的关注点。

需要着重注意的是,一旦分配了栈,它就不能移动(move)!不应使用数组的 push() 或任何方法触发内存重分配。如果要把这部分代码写的更好一些,我们可以创建自己的类型,只暴露那些我们认为可以安全使用的方法。 值得一提的是,Vec<T> 有一个名为 into_boxed_slice() 的方法,它返回一个堆分配的切片 Box<[T]>。切片不能增长,所以如果我们改为使用它,我们可以避免重新分配问题。

实现运行时

此段中的所有代码都在 impl Runtime 代码块中,这意味着它们是 Runtime 结构上的方法。

impl Runtime {
    pub fn new() -> Self {
        // 这将是我们的基本线程, 会被初始化为
        // Running 状态
        let base_thread = Thread {
            id: 0,
            stack: vec![0_u8; DEFAULT_STACK_SIZE],
            ctx: ThreadContext::default(),
            state: State::Running,
        };

        let mut threads = vec![base_thread];
        let mut available_threads: Vec<Thread> = (1..MAX_THREADS).map(|i| Thread::new(i)).collect();
        threads.append(&mut available_threads);

        Runtime {
            threads,
            current: 0,
        }
    }

当我们实例化 Runtime 时,我们创建了一个基础线程。此线程将设置为 Running 状态,并确保我们保持运行时运行,直到所有任务完成。

然后我们实例化其余的线程并将当前线程设置为 0,也就是我们的基本线程。

/// 这里有点作弊,但是我们需要一个指向存储的 Runtime 的指针
    /// 这样即便我们没有它的引用,
    /// 也可以对其调用 yield.
    pub fn init(&self) {
        unsafe {
            let r_ptr: *const Runtime = self;
            RUNTIME = r_ptr as usize;
        }
    }

然后我们需要这样(译者注:为那个可变的全局变量赋值),正如我在定义常量时所提到的,我们需要以后能够调用 yield。代码有点难看,但我们知道只要有任何线程可以 yield,我们的运行时就会存活,只要我们不滥用它就可以安全地运行。

pub fn run(&mut self) -> ! {
        while self.t_yield() {}
        std::process::exit(0);
    }

这是我们开始启动运行时的地方。它将不断调用 t_yield() 直到返回 falsefalse 意味着没有工作要做了,我们可以退出进程。

fn t_return(&mut self) {
        if self.current != 0 {
            self.threads[self.current].state = State::Available;
            self.t_yield();
        }
    }

这是我们在线程完成时调用的返回函数。 return 是 Rust 中的另一个保留关键字,因此我们将此命名为 t_return()。请注意我们的线程用户不会调用它,我们设置栈,以便在任务完成时调用它。 如果调用线程是 base_thread(即基础线程),我们什么都不做。我们的运行时将在基础线程上为我们调用 yield。如果从生成的线程中调用它,我们知道它已经完成,因为所有线程都在它们的栈顶部有一个 guard 函数(我们将下面进一步展示),并且这个 t_return 函数唯一被调用的地方就是我们的 guard 函数。 我们将当前线程状态设置为 Available,让运行时知道它已准备好接受新任务,然后立即调用 t_yield,它将调度新的线程运行。

接下来是我们的 yield 函数:

fn t_yield(&mut self) -> bool {
        let mut pos = self.current;
        while self.threads[pos].state != State::Ready {
            pos += 1;
            if pos == self.threads.len() {
                pos = 0;
            }
            if pos == self.current {
                return false;
            }
        }

        if self.threads[self.current].state != State::Available {
            self.threads[self.current].state = State::Ready;
        }

        self.threads[pos].state = State::Running;
        let old_pos = self.current;
        self.current = pos;

        unsafe {
            switch(&mut self.threads[old_pos].ctx, &self.threads[pos].ctx);
        }

        // 防止 Windows 的编译器优化我们的代码。
        self.threads.len() > 0
    }

这是我们运行时的核心。我们必须将此命名为 t_yield,因为 yield 是 Rust 中的保留关键字。 在这里,我们遍历所有线程,看看是否有线程处于 Ready 状态,Ready 表明它已准备好恢复执行。在现实世界的程序中,这可能是一个数据库调用的返回。

如果没有线程是 Ready 的,就直接返回。这是一个非常简单的调度程序,只使用轮询算法,真正的调度程序可能有更复杂的方法来决定下一个要运行的任务。

这是一个为我们的例子量身定制的非常简单的实现。如果我们的线程尚未准备好进行(未处于 Ready 状态)并仍在等待比如来自数据库的响应,会发生什么?
解决这个问题并不困难,当一个线程 Ready时,我们可以轮询它的状态,而不是直接运行我们的代码,。比如,如果它真的准备好运行它可以返回 IsReady,如果仍旧在等待某些操作完成它可以返回 Pending。在后一种情况下,我们可以直接它处于 Ready 状态,以便稍后再次进行轮询。是不是听起来很熟悉?如果您已经阅读了Future在Rust中的工作方式,那么我们将开始就它们如何组合在一起提供一些建议。

如果我们找到一个准备运行的线程,那就将当前线程的状态从 Running 修改为 Ready

然后我们调用 switch 来保存当前上下文(旧上下文)并将新上下文加载到 CPU 中。新上下文要么是新任务,要么是 CPU 在现有任务上恢复工作所需的所有信息。

最终的 self.threads.len() > 0 的部分只是阻止编译器优化代码的一种方法。这种情况发生在 Windows 上,但不发生在 Linux 上,例如在运行性能测试时是一个常见问题。因此,我们可以使用 std::hint::black_box 来防止编译器为了更快地执行代码而走得太远并跳过我们需要的步骤。我选择了另外一种方法,只要是有注释的就应该没问题,无论如何代码永远不会执行到这里。

接下来是我们的 spawn() 函数:

pub fn spawn(&mut self, f: fn()) {
        let available = self
            .threads
            .iter_mut()
            .find(|t| t.state == State::Available)
            .expect("no available thread.");

        let size = available.stack.len();
        let s_ptr = available.stack.as_mut_ptr();

        unsafe {
            ptr::write(s_ptr.offset((size - 24) as isize) as *mut u64, guard as u64);
            ptr::write(s_ptr.offset((size - 32) as isize) as *mut u64, f as u64);
            available.ctx.rsp = s_ptr.offset((size - 32) as isize) as u64;
        }
        available.state = State::Ready;
    }
}

我认为 t_yield 在逻辑上是个很有趣的函数,而这个函数在技术上最有趣。

就像我们在前一章中所讨论的那样,我们要在这初始化栈,并且要确保我们的栈看起来像 psABI 栈布局中指定的栈。

当我们生成一个新线程时,我们首先要检查是否有任何可用线程(线程处于 Available 状态)。如果我们的线程耗尽,我们会在这种情况下使用恐慌(panic)机制,但有几种(更好的)方法可以解决这个问题。我们现在就保持这种简单的实现。

当我们找到一个可用的线程时,我们得到栈长度和指向我们的 u8 字节数组的指针。

接下来,我们必须使用一些不安全的功能。首先,我们确保指向堆栈底部的指针是16字节对齐的。然后,我们写入 guard 函数的地址,当我们提供的任务完成并且函数返回时,将调用该地址。然后我们将 f 的地址写入,这是我们通过参数传递进来想要运行的函数。

还记得我们在 那一章解释的栈是如何工作的吗?我们希望 f 函数首先运行,因此我们将基指针设置为 f 并确保它是16字节对齐的。然后我们将 guard 函数的地址压入。它不是16字节对齐,但是当 f 返回时,CPU 将读取下一个地址作为 f 的返回地址并在那里继续执行。

然后,我们设置 rsp 的值,它是指向我们提供的函数地址的栈指针,所以我们在计划运行时首先开始执行它。

最后,我们将状态设置为 Ready,这意味着我们有工作要做,并且我们已经准备好了。请记住,实际启动此线程取决于我们的“调度程序”。

我们现在已经完成了 Runtime 的实现,如果你掌握了所有这些,你基本上就能理解绿色线程是如何工作的。但是,实现它们仍然需要一些细节。

guardswitch 函数

fn guard() {
    unsafe {
        let rt_ptr = RUNTIME as *mut Runtime;
        (*rt_ptr).t_return();
    };
}

该函数意味着我们传入的函数已经返回,这意味着我们的线程已完成运行其任务,因此我们取消引用我们的运行时并调用t_return()。我们本可以创建一个函数,在线程完成时执行一些额外的工作但是现在我们的 t_return() 函数已经可以满足我们的需要了。它将我们的线程标记为 Available(如果它不是我们的基本线程)和 yield,这样我们就可以在另外一个线程上恢复工作。

pub fn yield_thread() {
    unsafe {
        let rt_ptr = RUNTIME as *mut Runtime;
        (*rt_ptr).t_yield();
    };
}

这只是一个辅助函数,它允许我们从代码中的任意位置调用 yield。不过这是非常不安全的,如果我们调用它并且我们的 Runtime 尚未初始化或运行时被删除,则会导致未定义的行为。然而,让这个更安全并不是我们的优先事项,我们只是为了让我们的例子启动并运行。

我们很快就到达终点了,还有最后一个函数。如果你已完成前面的章节,那么在没有很多评论的情况下应该可以理解这个:

#[naked]
#[inline(never)]
unsafe fn switch(old: *mut ThreadContext, new: *const ThreadContext) {
    asm!("
        mov     %rsp, 0x00($0)
        mov     %r15, 0x08($0)
        mov     %r14, 0x10($0)
        mov     %r13, 0x18($0)
        mov     %r12, 0x20($0)
        mov     %rbx, 0x28($0)
        mov     %rbp, 0x30($0)

        mov     0x00($1), %rsp
        mov     0x08($1), %r15
        mov     0x10($1), %r14
        mov     0x18($1), %r13
        mov     0x20($1), %r12
        mov     0x28($1), %rbx
        mov     0x30($1), %rbp
        ret
        "
    :
    :"r"(old), "r"(new)
    :
    : "volatile", "alignstack"
    );
}

这就是我们的内联汇编。正如第一个例子那样,我们首先读出我们需要的所有寄存器的值,然后将所有寄存器值设置为我们在“新”线程上暂停执行时保存的寄存器值。 这基本上是保存和恢复执行所需要做的全部工作。 这里我们看到使用的 #[naked] 属性。我们不希望Rust为我们的函数生成序言和结语,因为这就是所有的汇编代码,我们希望自己处理所有事情。如果我们不加上这个,我们将无法再次切换回我们的栈。

大多数内联汇编都在 一个能跑通的例子 一章的末尾解释了,如果这看起来很陌生,请阅读那章的相关部分再回来。

这个函数有两处与我们的第一个函数不同:

第一个是属性 #[inline(never)],该属性阻止编译器内联此函数。我花了一些时间来解决这个问题,但是如果我们不包含它,那么在使用 --release 模式编译时代码将会运行失败。

“volatile” 选项是新加的。正如我之前警告的那样,内联汇编可能有些粗糙,但这也表明我们的汇编有副作用。当我们提供参数作为输入时,我们需要确保编译器知道我们正在修改其中一个传入参数而不仅仅是读取它们。

0x00($1) # 0
0x08($1) # 8
0x10($1) # 16
0x18($1) # 24

我简要地提到过这一点,但在这里你能看到它的实际效果。这些是十六进制数字,表示从我们想要读/写的内存指针的偏移量。我在旁边加了十进制的注释,因此你看到我们只以 8 字节步长偏移指针,这与 ThreadContext 结构中的 u64 字段大小相同。

这也是为什么用 #[repr(C)] 注释 ThreadContext 很重要的原因,所以我们知道数据将以这种方式在内存中表示,我们写入正确的字段。 Rust ABI 不保证它们在内存中以相同的顺序表示,但是 C-ABI 可以保证这一点。

main 函数

fn main() {
    let mut runtime = Runtime::new();
    runtime.init();
    runtime.spawn(|| {
        println!("THREAD 1 STARTING");
        let id = 1;
        for i in 0..10 {
            println!("thread: {} counter: {}", id, i);
            yield_thread();
        }
        println!("THREAD 1 FINISHED");
    });
    runtime.spawn(|| {
        println!("THREAD 2 STARTING");
        let id = 2;
        for i in 0..15 {
            println!("thread: {} counter: {}", id, i);
            yield_thread();
        }
        println!("THREAD 2 FINISHED");
    });
    runtime.run();
}

正如你在这里看到的那样,我们初始化我们的运行时并创建两个线程,一个计数到 10,一个计数到 15,每两次计数之间都会执行 yield。当我们使用 cargo run 命令运行工程时,我们应该会得到以下输出:

Finished dev [unoptimized + debuginfo] target(s) in 2.17s
Running `target/debug/green_threads`
THREAD 1 STARTING
thread: 1 counter: 0
THREAD 2 STARTING
thread: 2 counter: 0
thread: 1 counter: 1
thread: 2 counter: 1
thread: 1 counter: 2
thread: 2 counter: 2
thread: 1 counter: 3
thread: 2 counter: 3
thread: 1 counter: 4
thread: 2 counter: 4
thread: 1 counter: 5
thread: 2 counter: 5
thread: 1 counter: 6
thread: 2 counter: 6
thread: 1 counter: 7
thread: 2 counter: 7
thread: 1 counter: 8
thread: 2 counter: 8
thread: 1 counter: 9
thread: 2 counter: 9
THREAD 1 FINISHED.
thread: 2 counter: 10
thread: 2 counter: 11
thread: 2 counter: 12
thread: 2 counter: 13
thread: 2 counter: 14
THREAD 2 FINISHED.

漂亮!!我们的线程交替执行,因为它们在每个计数处让出控制权,直到线程 1 完成并且线程 2 在完成任务之前数到最后一个数字。

你现在已经实现了一个非常简单但可用的绿色线程示例。这是我们探索它的必经之路,但如果你已经走了这么远并通读了所有内容,你值得休息一下。谢谢阅读!

译者注:原书的后一章就是单纯的200行完整代码,我直接就放在本章里了:

#![feature(asm)]
#![feature(naked_functions)]
use std::ptr;

const DEFAULT_STACK_SIZE: usize = 1024 * 1024 * 2;
const MAX_THREADS: usize = 4;
static mut RUNTIME: usize = 0;

pub struct Runtime {
    threads: Vec<Thread>,
    current: usize,
}

#[derive(PartialEq, Eq, Debug)]
enum State {
    Available,
    Running,
    Ready,
}

struct Thread {
    id: usize,
    stack: Vec<u8>,
    ctx: ThreadContext,
    state: State,
}

#[derive(Debug, Default)]
#[repr(C)]
struct ThreadContext {
    rsp: u64,
    r15: u64,
    r14: u64,
    r13: u64,
    r12: u64,
    rbx: u64,
    rbp: u64,
}

impl Thread {
    fn new(id: usize) -> Self {
        Thread {
            id,
            stack: vec![0_u8; DEFAULT_STACK_SIZE],
            ctx: ThreadContext::default(),
            state: State::Available,
        }
    }
}

impl Runtime {
    pub fn new() -> Self {
        let base_thread = Thread {
            id: 0,
            stack: vec![0_u8; DEFAULT_STACK_SIZE],
            ctx: ThreadContext::default(),
            state: State::Running,
        };

        let mut threads = vec![base_thread];
        let mut available_threads: Vec<Thread> = (1..MAX_THREADS).map(|i| Thread::new(i)).collect();
        threads.append(&mut available_threads);

        Runtime {
            threads,
            current: 0,
        }
    }

    pub fn init(&self) {
        unsafe {
            let r_ptr: *const Runtime = self;
            RUNTIME = r_ptr as usize;
        }
    }

    pub fn run(&mut self) -> ! {
        while self.t_yield() {}
        std::process::exit(0);
    }

    fn t_return(&mut self) {
        if self.current != 0 {
            self.threads[self.current].state = State::Available;
            self.t_yield();
        }
    }

    fn t_yield(&mut self) -> bool {
        let mut pos = self.current;
        while self.threads[pos].state != State::Ready {
            pos += 1;
            if pos == self.threads.len() {
                pos = 0;
            }
            if pos == self.current {
                return false;
            }
        }

        if self.threads[self.current].state != State::Available {
            self.threads[self.current].state = State::Ready;
        }

        self.threads[pos].state = State::Running;
        let old_pos = self.current;
        self.current = pos;

        unsafe {
            switch(&mut self.threads[old_pos].ctx, &self.threads[pos].ctx);
        }

        self.threads.len() > 0
    }

    pub fn spawn(&mut self, f: fn()) {
        let available = self
            .threads
            .iter_mut()
            .find(|t| t.state == State::Available)
            .expect("no available thread.");

        let size = available.stack.len();

        unsafe {
            let s_ptr = available.stack.as_mut_ptr().offset(size as isize);
            let s_ptr = (s_ptr as usize & !15) as *mut u8;
            ptr::write(s_ptr.offset(-24) as *mut u64, guard as u64);
            ptr::write(s_ptr.offset(-32) as *mut u64, f as u64);
            available.ctx.rsp = s_ptr.offset(-32) as u64;
        }

        available.state = State::Ready;
    }
}

fn guard() {
    unsafe {
        let rt_ptr = RUNTIME as *mut Runtime;
        (*rt_ptr).t_return();
    };
}

pub fn yield_thread() {
    unsafe {
        let rt_ptr = RUNTIME as *mut Runtime;
        (*rt_ptr).t_yield();
    };
}

#[naked]
#[inline(never)]
unsafe fn switch(old: *mut ThreadContext, new: *const ThreadContext) {
    asm!("
        mov     %rsp, 0x00($0)
        mov     %r15, 0x08($0)
        mov     %r14, 0x10($0)
        mov     %r13, 0x18($0)
        mov     %r12, 0x20($0)
        mov     %rbx, 0x28($0)
        mov     %rbp, 0x30($0)

        mov     0x00($1), %rsp
        mov     0x08($1), %r15
        mov     0x10($1), %r14
        mov     0x18($1), %r13
        mov     0x20($1), %r12
        mov     0x28($1), %rbx
        mov     0x30($1), %rbp
        ret
        "
    :
    :"r"(old), "r"(new)
    :
    : "volatile", "alignstack"
    );
}

fn main() {
    let mut runtime = Runtime::new();
    runtime.init();
    runtime.spawn(|| {
        println!("THREAD 1 STARTING");
        let id = 1;
        for i in 0..10 {
            println!("thread: {} counter: {}", id, i);
            yield_thread();
        }
        println!("THREAD 1 FINISHED");
    });
    runtime.spawn(|| {
        println!("THREAD 2 STARTING");
        let id = 2;
        for i in 0..15 {
            println!("thread: {} counter: {}", id, i);
            yield_thread();
        }
        println!("THREAD 2 FINISHED");
    });
    runtime.run();
}

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK