3

Rust并发编程番外篇: Mutex内部实现

 5 months ago
source link: https://colobu.com/2023/11/05/inside-rust-mutex/
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

Mutex是最常用的一种同步原语,它提供了互斥锁的功能,多线程可以互斥访问共享数据以及通过锁保护临界区。Rust标准库提供了Mutex的实现,接下来我们看看它是怎么实现的。

Mutex的定义

Mutex包含三个字段。

一个是内部实现的锁(sys::Mutex),根据不同的操作系统,可能选择不同的实现。
一个是poison,用来标记锁是否被破坏,是否中毒了。
最后一个是data,用来存储被保护的数据。

pub struct Mutex<T: ?Sized> {
inner: sys::Mutex,
poison: poison::Flag,
data: UnsafeCell<T>,
impl<T> Mutex<T> {
pub const fn new(t: T) -> Mutex<T> {
Mutex { inner: sys::Mutex::new(), poison: poison::Flag::new(), data: UnsafeCell::new(t) }

另外一个关联的数据结构是MutexGuard, 它是Mutex的一个智能指针,用来管理锁的生命周期。它实现了DerefDrop,所以可以通过*来访问被保护的数据,当MutexGuard离开作用域时,会自动释放锁。

```rust
pub struct MutexGuard<'a, T: ?Sized + 'a> {
lock: &'a Mutex<T>,
poison: poison::Guard,

当请求锁时,调用内部的sys::Mutex上锁,并且返回一个MutexGuard,其实严格的说,返回一个LockResult,它是一个Result,当锁中毒时,返回Err,否则返回OkOk中包含了MutexGuard

```rust
pub fn lock(&self) -> LockResult<MutexGuard<'_, T>> {
unsafe {
self.inner.lock();
MutexGuard::new(self)
pub type LockResult<Guard> = Result<Guard, PoisonError<Guard>>;

poison如果你不太了解,可以看看这篇文章,我们不进一步介绍它了。
MutexGuard功能已经介绍了, 接下来我们看看它的实现:

impl<T: ?Sized> Deref for MutexGuard<'_, T> {
type Target = T;
fn deref(&self) -> &T {
unsafe { &*self.lock.data.get() }
impl<T: ?Sized> DerefMut for MutexGuard<'_, T> {
fn deref_mut(&mut self) -> &mut T {
unsafe { &mut *self.lock.data.get() }

MutexGuard的解引用返回被保护的数据,返回的是一个引用和可变引用。

MutexGuard离开作用域时,会自动释放锁,它的实现如下,可以看到它调用了self.lock.inner.unlock():

impl<T: ?Sized> Drop for MutexGuard<'_, T> {
#[inline]
fn drop(&mut self) {
unsafe {
self.lock.poison.done(&self.poison);
self.lock.inner.unlock();

try_lock调用innertry_lock,每什么好说的:

pub fn try_lock(&self) -> TryLockResult<MutexGuard<'_, T>> {
unsafe {
if self.inner.try_lock() {
Ok(MutexGuard::new(self)?)
} else {
Err(TryLockError::WouldBlock)

目前rust还提供了一个不稳定的方法unlock,主动的立即释放锁,其实就是调用了drop(guard):

pub fn unlock(guard: MutexGuard<'_, T>) {
drop(guard);

那么看起来,锁的主要逻辑是inner实现的,接下来我们看看inner的实现。

inner的类型是sys::Mutex,它位于library/std/src/sys/mod.rs,根据操作系统的不同,有不同的实现,我们主要看linux(unix)的实现。

unix的实现位于/library/std/src/sys/unix/locks, 根据具体的操作系统,也有两种实现方案,我们主要看linux的实现。

if #[cfg(any(
target_os = "linux",
target_os = "android",
all(target_os = "emscripten", target_feature = "atomics"),
target_os = "freebsd",
target_os = "openbsd",
target_os = "dragonfly",
mod futex_mutex;
mod futex_rwlock;
mod futex_condvar;
pub(crate) use futex_mutex::Mutex;
pub(crate) use futex_rwlock::RwLock;
pub(crate) use futex_condvar::Condvar;

它有两种实现futex_mutexpthread_mutex.rs, Linux操作系统下,使用的是futex_mutex

futex_mutex会使用Linux操作系统的futex_waitfutex_wake系统调用。
它只包含一个futex字段,它是一个AtomicU32,用来表示锁的状态,它有三种状态:

  • 0: 未加锁状态·
  • 1: 加锁,且没有其它线程等待
  • 2: 加锁,且有其它线程等待(竞争)
use crate::sync::atomic::{
AtomicU32,
Ordering::{Acquire, Relaxed, Release},
use crate::sys::futex::{futex_wait, futex_wake};
pub struct Mutex {
futex: AtomicU32,

new创建一个未加锁的Muext, 而try_lock会尝试将futex从0改为1,如果成功,表示加锁成功,否则失败。
注意这里使用了 AcquireRelaxed Ordering,交换成功新值内存可见(Acquire),失败无所谓了(Relaxed):

impl Mutex {
#[inline]
pub const fn new() -> Self {
Self { futex: AtomicU32::new(0) }
#[inline]
pub fn try_lock(&self) -> bool {
self.futex.compare_exchange(0, 1, Acquire, Relaxed).is_ok()
接下来是重头戏`lock`:

rust
impl Mutex {
...

#[inline]
pub fn lock(&self) {
    if self.futex.compare_exchange(0, 1, Acquire, Relaxed).is_err() {
        self.lock_contended(); // 如果第一次抢不到锁,进入锁竞争场景的处理
    }
}

#[cold]
fn lock_contended(&self) {
    // 获取锁的状态
    let mut state = self.spin();

    // 如果未加速,去抢!
    if state == 0 {
        match self.futex.compare_exchange(0, 1, Acquire, Relaxed) {
            Ok(_) => return, // 抢成功,返回
            Err(s) => state = s, // 失败,把当前的状态赋值给state
        }
    }

    loop {
        // 运气来了,在多个人抢一个未加锁的锁时,你,命中天子抢到了锁,并且把锁状态从0改为2
        if state != 2 && self.futex.swap(2, Acquire) == 0 {
            // 成功把锁的状态从0改成了2,获取了锁,返回
            return;
        }

        // 等待锁状态变化,如果锁状态不是2,就不等了
        futex_wait(&self.futex, 2, None);

        // 别的线程释放了锁,spin检查锁的状态,再集中精力继续抢
        state = self.spin();
    }
}

fn spin(&self) -> u32 {
    let mut spin = 100;
    loop {
        // 获取当前锁的状态, 可能是0,1,2三种状态
        let state = self.futex.load(Relaxed);

        // 如果是未加锁,或者是有其它线程等待,则返回
        // 如果spin次数用完了,也返回
        if state != 1 || spin == 0 {
            return state;
        }

        crate::hint::spin_loop();
        spin -= 1;
    }
}
...
总得来说,抢锁的线程通过spin,避免上下文切换,能够提高性能。如果spin次数用完了,就进入等待状态,等待其它线程释放锁,然后再抢。
剩下一个方法就是解锁了。解锁比较简单,就是把锁的状态从1或者2改为0,如果原来是2,表示还有其它线程等待,就唤醒一个。

rust
impl Mutex {
...
pub unsafe fn unlock(&self) {
if self.futex.swap(0, Release) == 2 {
// 如果还有等待的线程,就唤醒一个
self.wake();
}
}

#[cold]
fn wake(&self) {
    futex_wake(&self.futex);
}
...

}
`

可以看到,Rust的Mutex的实现还是比较简单的,它的核心是利用操作系统的futex相关方法,加一个AtomicU32标志来实现。


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK