35

深入浅出Semaphore源码解析

 4 years ago
source link: http://www.cnblogs.com/pinxiong/p/13332609.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

Semaphore 通过permits的值来限制线程访问临界资源的总数,属于有限制次数的共享锁,不支持重入。

前提条件

在理解 Semaphore 时需要具备一些基本的知识:

理解AQS的实现原理

之前有写过一篇 《深入浅出AQS源码解析》 关于AQS的文章,对AQS原理不了解的同学可以先看一下

Semaphore源码解析

Semaphore 中有3个内部类,分别是 SyncNonfairSyncFairSyncSyncNonfairSyncFairSync 的抽象类,我们会从解读 Semaphore 实现的功能开始入手逐渐去解析 SyncNonfairSyncFairSync 的源码

public class Semaphore implements java.io.Serializable {

    private final Sync sync;

    /**
     * 初始化permits个资源
     */
    public Semaphore(int permits) {
        sync = new NonfairSync(permits);
    }

    /**
     * 初始化permits个资源,根据fair来决定是使用公平锁还是非公平锁的方式
     */
    public Semaphore(int permits, boolean fair) {
        sync = fair ? new FairSync(permits) : new NonfairSync(permits);
    }

    /**
     * 中断方式获取一个资源
     */
    public void acquire() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    /**
     * 非中断方式获取一个资源
     */
    public void acquireUninterruptibly() {
        sync.acquireShared(1);
    }

    /**
     * 尝试获取一个资源
     */
    public boolean tryAcquire() {
        return sync.nonfairTryAcquireShared(1) >= 0;
    }

    /**
     * 尝试超时获取一个资源
     */
    public boolean tryAcquire(long timeout, TimeUnit unit)
        throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }

    /**
     * 释放一个资源
     */
    public void release() {
        sync.releaseShared(1);
    }

    /**
     * 中断方式获取permits个资源
     */
    public void acquire(int permits) throws InterruptedException {
        if (permits < 0) throw new IllegalArgumentException();
        sync.acquireSharedInterruptibly(permits);
    }

    /**
     * 非中断方式获取permits个资源
     */
    public void acquireUninterruptibly(int permits) {
        if (permits < 0) throw new IllegalArgumentException();
        sync.acquireShared(permits);
    }

    /**
     * 尝试获取permits个资源
     */
    public boolean tryAcquire(int permits) {
        if (permits < 0) throw new IllegalArgumentException();
        return sync.nonfairTryAcquireShared(permits) >= 0;
    }

    /**
     * 尝试超时获取permits个资源
     */
    public boolean tryAcquire(int permits, long timeout, TimeUnit unit)
        throws InterruptedException {
        if (permits < 0) throw new IllegalArgumentException();
        return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));
    }

    /**
     * 释放permits个资源
     */
    public void release(int permits) {
        if (permits < 0) throw new IllegalArgumentException();
        sync.releaseShared(permits);
    }

    /**
     * 获取当前可用资源数量
     */
    public int availablePermits() {
        return sync.getPermits();
    }

    /**
     * 用掉所有的资源,并返回用掉的资源数量
     */
    public int drainPermits() {
        return sync.drainPermits();
    }

    /**
     * 缩减reduction个资源
     */
    protected void reducePermits(int reduction) {
        if (reduction < 0) throw new IllegalArgumentException();
        sync.reducePermits(reduction);
    }
}

虽然 Semaphore 中的方法比较多,但是都比较简单,都是转调用 Sync 中的方法,通过解析 Sync 中的源码来帮助大家理解这些方法是如何实现的

Sync类源码解析

abstract static class Sync extends AbstractQueuedSynchronizer {
    // 获取所有可用的资源数量
    final int getPermits() {
        return getState();
    }
    // 非公平的方式尝试获取acquires个可用的资源
    final int nonfairTryAcquireShared(int acquires) {
        // 无限循环,尝试获取acquires个资源
        // 如果资源数量不够,返回剩余资源数量
        // 如果资源数量足够且获取成功,返回剩余的资源数量
        for (;;) {
            int available = getState();
            int remaining = available - acquires;
            if (remaining < 0 ||
                compareAndSetState(available, remaining))
                return remaining;
        }
    }

    // 尝试获取releases个资源
    protected final boolean tryReleaseShared(int releases) {
        for (;;) {
            int current = getState();
            int next = current + releases;
            // 当releases不允许为负数
            if (next < current) // overflow
                throw new Error("Maximum permit count exceeded");
            // CAS操作尝试修改state的值
            if (compareAndSetState(current, next))
                return true;
        }
    }

    // 缩减releases个资源
    final void reducePermits(int reductions) {
        for (;;) {
            int current = getState();
            int next = current - reductions;
            // 当releases不允许为负数,也就时不能通过该方法增加资源
            if (next > current) // underflow
                throw new Error("Permit count underflow");
            // CAS操作尝试修改state的值
            if (compareAndSetState(current, next))
                return;
        }
    }

    // 清空所有的资源数量
    final int drainPermits() {
        for (;;) {
            int current = getState();
            // CAS操作尝试将资源数量设置为0
            if (current == 0 || compareAndSetState(current, 0))
                return current;
        }
    }
}

FairSync类源码解析

FairSync 中的源码很简单,直接上代码

static final class FairSync extends Sync {

    FairSync(int permits) {
        super(permits);
    }

    protected int tryAcquireShared(int acquires) {
        /**
         * 具体思路如下:
         * 1、如果AQS的同步队列中有等待的线程,直接获取失败,会加入到AQS的同步队列中
         * 2、如果AQS的同步队列为空,尝试修改state的值来获取acquires个资源
         * 3、一直重复步骤1和2,直到有结果返回才退出无限循环
         */
        for (;;) {
            if (hasQueuedPredecessors())
                return -1;
            int available = getState();
            int remaining = available - acquires;
            if (remaining < 0 ||
                compareAndSetState(available, remaining))
                return remaining;
        }
    }
}

NonfairSync类源码解析

NonfairSync 中的源码就更简单,解析如下:

static final class NonfairSync extends Sync {

    NonfairSync(int permits) {
        super(permits);
    }
    
    // 抢占式的获取acquires个资源
    protected int tryAcquireShared(int acquires) {
        return nonfairTryAcquireShared(acquires);
    }
}

总结

  • permits 初始化为 1 时, Semaphore 变成了一个互斥的排他锁
  • permits 初始化为无穷大时, Semaphore 变成了无锁模式
  • state 的值为 0 的时候,无法获取资源,获取资源的线程会进入AQS的同步队列等待有资源释放时被唤醒
  • Semaphore 初始化成 非公平锁 时,可能会出现有的线程饿死的情况,一般对于控制资源的使用而言,建议初始化为 公平锁
  • 可以调用 reducePermits 动态的缩减资源的数量,但是不能增加资源的数量

Recommend

  • 80
    • www.jianshu.com 7 years ago
    • Cache

    Java信号量Semaphore详解 - 简书

    导入 Semaphore实现为一种基于计数的信号量,Semaphore管理着一组虚拟的许可集合,这种许可可以作为某种凭证,来管理资源,在一些资源有限的场景下很有实用性,比如数据库连接,应用可初始化一组数据库连接,然后通过使用Semaphore来管理获取连接的许可,任何线程...

  • 138

    24 Mar 2020 · Software Engineering PHP Versions Used in Commercial Projects in 2017

  • 51

    Middleware is a core part of Laravel applications. How do we go about testing it in isolation with PHPUnit?

  • 45
    • www.linkedkeeper.com 7 years ago
    • Cache

    Semaphore源码解析

  • 69
    • xargin.com 6 years ago
    • Cache

    Go 系列文章 11: semaphore

    后续更新和修正: https://github.com/cch123/golang-notes/blob/master/semaphore.md 数据结构 // Go 语言中暴...

  • 6
    • huyan.couplecoders.tech 3 years ago
    • Cache

    (juc系列)semaphore源码阅读

    (juc系列)semaphore源码阅读 - 呼延十的博客 | HuYan Blog本文源码基于: JDK13 为了巩固AQS. 看一下Semaphore的源码. 大部分都是直接翻译的官方代码注释,嘻嘻 一个计数的信号量. 概念上讲,信号量维护了一个许...

  • 2

    鸿蒙轻内核A核源码分析系列八—信号量Semaphore-51CTO.COM 鸿蒙轻内核A核源码分析系列八—信号量Semaphore 作者:zhushangyuan_ 2022-04-13 11:12:43 本文带领大家一起剖析了鸿蒙轻内核的信号量模...

  • 3
    • www.extlight.com 2 years ago
    • Cache

    Semaphore 源码详解

    在高并发访问的场景下,为了保证项目不被大流量请求的压力影响性能导致项目运行崩溃,常用的解决方案就是限流和服务降级。本篇介绍 Semaphore, 直译就是信号量,是基于 AQS 扩展的一种...

  • 3

    一、背景介绍在生产实践中,常常会遇到这样的场景:需要针对某一类 Http 请求做统一的处理,例如在...

  • 4

    JUC同步锁原理源码解析四----Semaphore Semaphore 1.Semaphore的来源 A counting semaphore. Conceptually, a semaphore maintains a set of...

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK