20

Java并发---concurrent包

 3 years ago
source link: http://www.cnblogs.com/houqx/p/13529943.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

一、包的结构层次

uUJR7vj.jpg!mobile

其中包含了两个子包atomic和locks,另外字concurrent下的阻塞队列以及executor,这些就是concurrent包中的精华。而这些类的实现主要是依赖于volatile和CAS,从整体上看concurrent包的整体实现图如下:

a6F7nu.jpg!mobile

二、Lock和synchronized的比较

锁是用来控制多个线程访问共享资源的方式,一般来说,一个锁能够防止多个线程同时访问共享资源。在Lock接口出现之前,Java主要是靠synchronized关键字实现锁功能的,而Java1.5之后,并发包增加了Lock接口,它提供了与synchronized一样的锁功能。虽然它失去了想synchronized关键字隐式加锁解锁的便捷性,单却拥有了获取锁和释放锁的可操作性,可中断的获取锁以及超时获取锁等多种synchronized关键字所不具备的同步特性

通常Lock使用的形式如下:

Lock lock=new ReentrantLock();
lock.lock();
try {
            
} finally {
    lock.unlock();
}

需要注意的是:synchronized同步块执行完成活着遇到异常会自动释放锁,而lock必须调用unlock()释放锁,因此必须在finally中释放锁。

三、Lock接口API的介绍

看看Lock定义了哪些方法:

void lock();获取锁

void lockInterruptibly() throws InterruptedException;获取所的过程能够响应中断

boolean tryLock();非阻塞式响应中断能立即返回,获取锁返回true,反之返回false;

boolean tryLock(long time, TimeUnit unit) throws InterruptedException;超时获取锁,在超时内或者中断的情况下能够获取锁

Condition newCondition();获取与lock绑定的等待通知组件,当前线程必须获得了锁才能进行等待,进行等待时会先释放锁,当再次获取锁时才能从等待中返回;

void unlock();释放锁

那么在locks包下有哪些类实现了改接口?

ReentrantLock

public class ReentrantLock implements Lock, java.io.Serializable 

很显然ReentrantLock实现了lock接口,当你查看源码是会发现ReentrantLock并没有多少代码,另外一个很明显的特点是:基本上所有的方法的实现实际上都是调用了其静态内部类Sync中的方法,而Sync类继承了AbstractQueuedSynchronizer(AQS)。可以看出ReentrantLock关键核心在于对队列同步器AbstractQueuedSynchronizer的理解。

四、了解队列同步器AQS(AbstractQueuedSynchronizer)

关于AQS在源码中有十分具体的解释

BjQ7v2R.jpg!mobile

同步器是用来构建锁和其他同步组件的基础框架,它的实现主要依赖一个int成员变量来表示同步状态以及通过一个FIFO队列构成等待队列。它的子类必须重写AQS的几个protected修饰的用来改变同步状态的方法,其他方法主要是实现了排队和阻塞机制。状态的更新使用getState,setState以及compareAndSetState这三个方法。

子类被推荐定义为自定义同步组件的静态内部类,同步器自身没有实现任何同步接口,它仅仅是定义了若干同步状态的获取和释放方法来供自定义同步组件的使用,同步器既支持独占式获取同步状态,也支持共享式获取同步状态,这样就可以方便的实现不同类型的同步组件。

同步器是实现锁(也可以是任意同步组件)的关键,在锁的实现中聚合同步器,利用同步器实现锁的语义。可以这样理解二者的关系:锁是面向使用者,它定义了使用者与锁交互的接口,隐藏了实现细节;同步器是面向锁的实现者,它简化了锁的实现方式,屏蔽了同步状态的管理,线程的排队,等待和唤醒等底层操作。锁和同步器很好的隔离了使用者和实现者所需要关注的领域。

五、AQS的设计模式

AQS的设计模式是使用模板方法设计模式,它将一些方法开放给子类进行重写,而同步器给同步组件所提供模板方法又会重新被子类重写的方法。

举个例子,AQS中需要重写的方法tryAcquire

protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
    }

ReentrantLock中NonfairSync(继承AQS)会重写该方法为:

protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }

而AQS中的模板方法acquire():

 public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

会调用tryAcquire方法,而此时当继承AQS的NonfairSync调用模板方法acquire时就会调用已经被NonfairSync重写的tryAcquire方法。这就是使用AQS的方式。

同步组件(这里不仅仅指锁,还包括CountDownLatch等)的实现依赖于同步器AQS,在同步组件实现中,使用AQS的方式被推荐定义继承AQS的静态内部类;

AQS采用模板方法进行设计,AQS的protected修饰的方法需要由继承AQS的子类进行重写实现,当调用AQS的子类的方法是就会调用被重写的方法;

AQS负责同步状态的管理,线程的排队,等待和唤醒这些底层操作,而Lock等同步组件主要专注于实现同步语义;

在重写AQS的方式时,使用AQS提供的getState(),setState()以及compareAndSetState()方法进行修改同步状态.

AQS可重写的方法如下图:

6z6vqe.jpg!mobile

在实现同步组件是AQS提供的模板方法如下图

QnEVFje.jpg!mobile

AQS提供的模板方法可以分为3类:

独占式获取与释放同步状态

共享式获取与释放同步状态

查询同步队列中等待线程情况

同步组件通过AQS提供的模板方法实现自己的同步语义。

六、AQS的使用example

class Mutex implements Lock, java.io.Serializable {

    // Our internal helper class
    private static class Sync extends AbstractQueuedSynchronizer {
        // Reports whether in locked state
        protected boolean isHeldExclusively() {
            return getState() == 1;
        }

        // Acquires the lock if state is zero
        public boolean tryAcquire(int acquires) {
            assert acquires == 1; // Otherwise unused
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        // Releases the lock by setting state to zero
        protected boolean tryRelease(int releases) {
            assert releases == 1; // Otherwise unused
            if (getState() == 0)
                throw new IllegalMonitorStateException();
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        // Provides a Condition
        Condition newCondition() {
            return new ConditionObject();
        }

        // Deserializes properly
        private void readObject(ObjectInputStream s) throws IOException, ClassNotFoundException {
            s.defaultReadObject();
            setState(0); // reset to unlocked state
        }
    }

    // The sync object does all the hard work. We just forward to it.
    private final Sync sync = new Sync();

    public void lock() {
        sync.acquire(1);
    }

    public boolean tryLock() {
        return sync.tryAcquire(1);
    }

    public void unlock() {
        sync.release(1);
    }

    public Condition newCondition() {
        return sync.newCondition();
    }

    public boolean isLocked() {
        return sync.isHeldExclusively();
    }

    public boolean hasQueuedThreads() {
        return sync.hasQueuedThreads();
    }

    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
    }

    public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireNanos(1, unit.toNanos(timeout));
    }
}
package passtra;

public class MutextDemo {
    private static Mutex mutex = new Mutex();

    public static void main(String[] args) {
        for (int i = 0; i < 10; i++) {
            Thread thread = new Thread(() -> {
                mutex.lock();
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    mutex.unlock();
                }
            });
            thread.start();
        }
    }
}

这个例子来源于AQS源码的example,执行情况:

Ujuaquu.png!mobile

上面的例子实现了独占锁的语义,在同一个时刻只允许一个线程占有锁。 MutextDemo新建10个线程,分别睡眠3s。从执行情况也可以看出当前Thread-6正在执行占有锁而其他线程7/8处于WAIT状态。按照推荐的方式,Mutext定义了一个集成AQS的静态内部类Sync,并且重写了AQS的tryAcquire等等方法,而对state的更新也是利用了getState,setState,compareAndSetStaste这三个方法。在实现lock接口中方法也是调用AQS提供的模板方法(因为Sync继承了AQS)。

从这个例子就可以很清楚的看出来,在同步组件的实现上主要利用了AQS,而AQS“屏蔽”了同步状态的修改,线程排队等底层实现,通过AQS的模板方法可以很方便的给同步组件的实现警醒调用。而针对用户来说,只需要调用同步组件提供的方法来实现并发编程即可,同时在新建一个同步组件时需要把握的两个关键点是:

实现同步组件时推荐定义继承AQS的静态内部类,并重写需要的proyected修饰的方法;

同步组件语义的实现依赖于AQS的模板方法,而AQS模板方法有依赖于AQS的子类重写的方法。

通俗的说,因为AQS整体设计思路采用模板方法设计模式,同步组件以及AQS的功能实际上分别切换成各自的两部分:

同步组件实现者的角度:

通过可重新写的方法:

独占式:

tryAcquire()(独占式获取同步状态);

tryRelease()(独占式释放同步状态);

共享式:

tryAcquireShared()(共享式获取同步状态)

tryReleaseShared()(共享式释放同步状态)

告诉AQS怎么判断当前同步状态是否成功获取或者是否成功释放。

同步组件专注于对当前同步状态的逻辑判断,从而实现自己的同步语义。这句话比较抽象,举个例子,上面的Mutex例子中通过tryAcquire方法实现自己的同步语义,在该方法中如果当前同步状态为0(即该同步组件没有被任何线程获取),当前线程可以获取同时将状态更改为1返回true,否则,该组件只能在同一时刻被线程占用,mutex专注于获取释放的逻辑来实现自己想要表达的同步语义。

AQS角度

而对AQS来说,只需要同步组件返回的true和false即可,因为AQS会对true和fals会有不同的操作,true会认为当前线程获取同步组件成功直接返回,而false的话AQS会将当前献策还给你插入同步队列等一系列的方法。

总的来说,同步组件通过重写AQS的方法实现自己想要表达的同步语义,而AQS只需要同步组件表达true和false即可,AQS会针对true和false不同的情况做不同的处理。


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK