6

C#中使用CAS实现无锁算法 - 黑洞视界

 1 year ago
source link: https://www.cnblogs.com/eventhorizon/p/17338890.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

CAS 的基本概念

CAS(Compare-and-Swap)是一种多线程并发编程中常用的原子操作,用于实现多线程间的同步和互斥访问。 它操作通常包含三个参数:一个内存地址(通常是一个共享变量的地址)、期望的旧值和新值。

CompareAndSwap(内存地址,期望的旧值,新值)

CAS 操作会比较内存地址处的值与期望的旧值是否相等,如果相等,则将新值写入该内存地址; 如果不相等,则不进行任何操作。这个比较和交换的操作是一个原子操作,不会被其他线程中断。

CAS 通常是通过硬件层面的CPU指令实现的,其原子性是由硬件保证的。具体的实现方式根据环境会有所不同。

CAS 操作通常会有一个返回值,用于表示操作是否成功。返回结果可能是true或false,也可能是内存地址处的旧值。

相比于传统的锁机制,CAS 有一些优势:

  • 原子性:CAS 操作是原子的,不需要额外的锁来保证多线程环境下的数据一致性,避免了锁带来的性能开销和竞争条件。

  • 无阻塞:CAS 操作是无阻塞的,不会因为资源被锁定而导致线程的阻塞和上下文切换,提高了系统的并发性和可伸缩性。

  • 适用性:CAS 操作可以应用于广泛的数据结构和算法,如自旋锁、计数器、队列等,使得它在实际应用中具有较大的灵活性和适用性。

C# 中如何使用 CAS

在 C# 中,我们可以使用 Interlocked 类来实现 CAS 操作。

Interlocked 类提供了一组 CompareExchange 的重载方法,用于实现不同类型的数据的 CAS 操作。

public static int CompareExchange(ref int location1, int value, int comparand);
public static long CompareExchange(ref long location1, long value, long comparand);
// ... 省略其他重载方法
public static object CompareExchange(ref object location1, object value, object comparand);
public static T CompareExchange<T>(ref T location1, T value, T comparand) where T : class;

CompareExchange 方法将 location1 内存地址处的值与 comparand 比较,如果相等,则将 value 写入 location1 内存地址处,否则不进行任何操作。
该方法返回 location1 内存地址处的值。

通过判断方法返回值与 comparand 是否相等,我们就可以知道 CompareExchange 方法是否执行成功。

在使用 CAS 实现无锁算法时,通常我们不光是为了比较和更新一个数据,还需要在更新成功后进行下一步的操作。结合 while(true) 循环,我们可以不断地尝试更新数据,直到更新成功为止。
伪代码如下:

while (true)
{
    // 读取数据
    oldValue = ...;
    // 计算新值
    newValue = ...;
    // CAS 更新数据
    result = CompareExchange(ref location, newValue, oldValue);
    // 判断 CAS 是否成功
    if (result == oldValue)
    {
        // CAS 成功,执行后续操作
        break;
    }
}

在复杂的无锁算法中,因为每一步操作都是独立的,连续的操作并非原子,所以我们不光要借助 CAS,每一步操作前都应判断是否有其他线程已经修改了数据。

示例1:计数器#

下面是一个简单的计数器类,它使用 CAS 实现了一个线程安全的自增操作。

public class Counter
{
    private int _value;

    public int Increment()
    {
        while (true)
        {
            int oldValue = _value;
            int newValue = oldValue + 1;
            int result = Interlocked.CompareExchange(ref _value, newValue, oldValue);
            if (result == oldValue)
            {
                return newValue;
            }
        }
    }
}

CLR 底层源码中,我们也会经常看到这样的代码,比如 ThreadPool 增加线程时的计数器。
https://github.com/dotnet/runtime/blob/release/6.0/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPoolWorkQueue.cs#L446

internal void EnsureThreadRequested()
{
    //
    // If we have not yet requested #procs threads, then request a new thread.
    //
    // CoreCLR: Note that there is a separate count in the VM which has already been incremented
    // by the VM by the time we reach this point.
    //
    int count = _separated.numOutstandingThreadRequests;
    while (count < Environment.ProcessorCount)
    {
        int prev = Interlocked.CompareExchange(ref _separated.numOutstandingThreadRequests, count + 1, count);
        if (prev == count)
        {
            ThreadPool.RequestWorkerThread();
            break;
        }
        count = prev;
    }
}

示例2:队列#

下面是一个简单的队列类,它使用 CAS 实现了一个线程安全的入队和出队操作。相较于上面的计数器,这里的操作更加复杂,我们每一步都需要考虑是否有其他线程已经修改了数据。

这样的算法有点像薛定谔的猫,你不知道它是死是活,只有当你试图去观察它的时候,它才可能会变成死或者活。

public class ConcurrentQueue<T>
{
    // _head 和 _tail 是两个伪节点,_head._next 指向队列的第一个节点,_tail 指向队列的最后一个节点。
    // _head 和 _tail 会被多个线程修改和访问,所以要用 volatile 修饰。
    private volatile Node _head;
    private volatile Node _tail;
    
    public ConcurrentQueue()
    {
        _head = new Node(default);
        // _tail 指向 _head 时,队列为空。
        _tail = _head;
    }

    public void Enqueue(T item)
    {
        var node = new Node(item);
        while (true)
        {
            Node tail = _tail;
            Node next = tail._next;
            // 判断给 next 赋值的这段时间,是否有其他线程修改过 _tail
            if (tail == _tail)
            {
                // 如果 next 为 null,则说明从给 tail 赋值到给 next 赋值这段时间,没有其他线程修改过 tail._next,
                if (next == null)
                {
                    // 如果 tail._next 为 null,则说明从给 tail 赋值到这里,没有其他线程修改过 tail._next,
                    // tail 依旧是队列的最后一个节点,我们就可以直接将 node 赋值给 tail._next。                                
                    if (Interlocked.CompareExchange(ref tail._next, node, null) == null)
                    {
                        // 如果_tail == tail,则说明从上一步 CAS 操作到这里,没有其他线程修改过 _tail,也就是没有其他线程执行过 Enqueue 操作。
                        // 那么当前线程 Enqueue 的 node 就是队列的最后一个节点,我们就可以直接将 node 赋值给 _tail。
                        Interlocked.CompareExchange(ref _tail, node, tail);
                        break;
                    }
                }
                // 如果 next 不为 null,则说明从给 tail 赋值到给 next 赋值这段时间,有其他线程修改过 tail._next,
                else
                {
                    // 如果没有其他线程修改过 _tail,那么 next 就是队列的最后一个节点,我们就可以直接将 next 赋值给 _tail。
                    Interlocked.CompareExchange(ref _tail, next, tail);
                }
            }
        }
    }

    public bool TryDequeue(out T item)
    {
        while (true)
        {
            Node head = _head;
            Node tail = _tail;
            Node next = head._next;
            // 判断 _head 是否被修改过
            // 如果没有被修改过,说明从给 head 赋值到给 next 赋值这段时间,没有其他线程执行过 Dequeue 操作。          
            if (head == _head)
            {
                // 如果 head == tail,说明队列为空
                if (head == tail)
                {
                    // 虽然上面已经判断过队列是否为空,但是在这里再判断一次
                    // 是为了防止在给 tail 赋值到给 next 赋值这段时间,有其他线程执行过 Enqueue 操作。
                    if (next == null)
                    {
                        item = default;
                        return false;
                    }

                    // 如果 next 不为 null,则说明从给 tail 赋值到给 next 赋值这段时间,有其他线程修改过 tail._next,也就是有其他线程执行过 Enqueue 操作。
                    // 那么 next 就可能是队列的最后一个节点,我们尝试将 next 赋值给 _tail。
                    Interlocked.CompareExchange(ref _tail, next, tail);
                }
                // 如果 head != tail,说明队列不为空
                else
                {
                    item = next._item;
                    if (Interlocked.CompareExchange(ref _head, next, head) == head)
                    {
                        // 如果 _head 没有被修改过
                        // 说明从给 head 赋值到这里,没有其他线程执行过 Dequeue 操作,上面的 item 就是队列的第一个节点的值。
                        // 我们就可以直接返回。
                        break;
                    }
                    // 如果 _head 被修改过
                    // 说明从给 head 赋值到这里,有其他线程执行过 Dequeue 操作,上面的 item 就不是队列的第一个节点的值。
                    // 我们就需要重新执行 Dequeue 操作。
                }
            }
        }

        return true;
    }

    private class Node
    {
        public readonly T _item;
        public Node _next;

        public Node(T item)
        {
            _item = item;
        }
    }
}

我们可以通过以下代码来进行测试

using System.Collections.Concurrent;

var queue = new ConcurrentQueue<int>();
var results = new ConcurrentBag<int>();
int dequeueRetryCount = 0;

var enqueueTask = Task.Run(() =>
{
    // 确保 Enqueue 前 dequeueTask 已经开始运行
    Thread.Sleep(10);
    Console.WriteLine("Enqueue start");
    Parallel.For(0, 100000, i => queue.Enqueue(i));
    Console.WriteLine("Enqueue done");
});

var dequeueTask = Task.Run(() =>
{
    Thread.Sleep(10);
    Console.WriteLine("Dequeue start");
    Parallel.For(0, 100000, i =>
    {
        while (true)
        {
            if (queue.TryDequeue(out int result))
            {
                results.Add(result);
                break;
            }

            Interlocked.Increment(ref dequeueRetryCount);
        }
    });
    Console.WriteLine("Dequeue done");
});

await Task.WhenAll(enqueueTask, dequeueTask);
Console.WriteLine(
    $"Enqueue and dequeue done, total data count: {results.Count}, dequeue retry count: {dequeueRetryCount}");

var hashSet = results.ToHashSet();
for (int i = 0; i < 100000; i++)
{
    if (!hashSet.Contains(i))
    {
        Console.WriteLine("Error, missing " + i);
        break;
    }
}

Console.WriteLine("Done");

输出结果:

Dequeue start
Enqueue start
Enqueue done
Dequeue done
Enqueue and dequeue done, total data count: 100000, dequeue retry count: 10586
Done

上述的 retry count 为 797,说明在 100000 次的 Dequeue 操作中,有 10586 次的 Dequeue 操作需要重试,那是因为在 Dequeue 操作中,可能暂时没有数据可供 Dequeue,需要等待其他线程执行 Enqueue 操作。

当然这个 retry count 是不稳定的,因为在多线程环境下,每次执行的结果都可能不一样。

CAS 操作是一种乐观锁,它假设没有其他线程修改过数据,如果没有修改过,那么就直接修改数据,如果修改过,那么就重新获取数据,再次尝试修改。

在借助 CAS 实现较为复杂的数据结构时,我们不光要依靠 CAS 操作,还需要注意每次操作的数据是否被其他线程修改过,考虑各个可能的分支,以及在不同的分支中,如何处理数据。

欢迎关注个人技术公众号

1201123-20230302194546214-138980196.png

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK