

Java 多线程(五):锁(三) - Grey Zeng
source link: https://www.cnblogs.com/greyzeng/p/16684446.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

Java 多线程(五):锁(三)
作者:Grey
原文地址:
StampedLock#
StampedLock
其实是对读写锁的一种改进,它支持在读同时进行一个写操作,也就是说,它的性能将会比读写锁更快。
更通俗的讲就是在读锁没有释放的时候是可以获取到一个写锁,获取到写锁之后,读锁阻塞,这一点和读写锁一致,唯一的区别在于读写锁不支持在没有释放读锁的时候获取写锁。
StampedLock
有三种模式:
-
悲观读:允许多个线程获取悲观读锁。
-
写锁:写锁和悲观读是互斥的。
-
乐观读:无锁机制,类似于数据库中的乐观锁,它支持在不释放乐观读的时候是可以获取到一个写锁。
参考: 有没有比读写锁更快的锁?
示例代码:
悲观读 + 写锁:
package git.snippets.juc;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.StampedLock;
import java.util.logging.Logger;
// 悲观读 + 写锁
public class StampedLockPessimistic {
private static final Logger log = Logger.getLogger(StampedLockPessimistic.class.getName());
private static final StampedLock lock = new StampedLock();
//缓存中存储的数据
private static final Map<String, String> mapCache = new HashMap<>();
//模拟数据库存储的数据
private static final Map<String, String> mapDb = new HashMap<>();
static {
mapDb.put("zhangsan", "你好,我是张三");
mapDb.put("sili", "你好,我是李四");
}
private static void getInfo(String name) {
//获取悲观读
long stamp = lock.readLock();
log.info("线程名:" + Thread.currentThread().getName() + " 获取了悲观读锁" + " 用户名:" + name);
try {
if ("zhangsan".equals(name)) {
log.info("线程名:" + Thread.currentThread().getName() + " 休眠中" + " 用户名:" + name);
Thread.sleep(3000);
log.info("线程名:" + Thread.currentThread().getName() + " 休眠结束" + " 用户名:" + name);
}
String info = mapCache.get(name);
if (null != info) {
log.info("在缓存中获取到了数据");
return;
}
} catch (InterruptedException e) {
log.info("线程名:" + Thread.currentThread().getName() + " 释放了悲观读锁");
e.printStackTrace();
} finally {
//释放悲观读
lock.unlock(stamp);
}
//获取写锁
stamp = lock.writeLock();
log.info("线程名:" + Thread.currentThread().getName() + " 获取了写锁" + " 用户名:" + name);
try {
//判断一下缓存中是否被插入了数据
String info = mapCache.get(name);
if (null != info) {
log.info("获取到了写锁,再次确认在缓存中获取到了数据");
return;
}
//这里是往数据库获取数据
String infoByDb = mapDb.get(name);
//将数据插入缓存
mapCache.put(name, infoByDb);
log.info("缓存中没有数据,在数据库获取到了数据");
} finally {
//释放写锁
log.info("线程名:" + Thread.currentThread().getName() + " 释放了写锁" + " 用户名:" + name);
lock.unlock(stamp);
}
}
public static void main(String[] args) {
//线程1
Thread t1 = new Thread(() -> {
getInfo("zhangsan");
});
//线程2
Thread t2 = new Thread(() -> {
getInfo("lisi");
});
//线程启动
t1.start();
t2.start();
//线程同步
try {
t1.join();
t2.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
package git.snippets.juc;
import java.util.concurrent.locks.StampedLock;
import java.util.logging.Logger;
// 乐观写
public class StampedLockOptimistic {
private static final Logger log = Logger.getLogger(StampedLockOptimistic.class.getName());
private static final StampedLock lock = new StampedLock();
private static int num1 = 1;
private static int num2 = 1;
/**
* 修改成员变量的值,+1
*
* @return
*/
private static int sum() {
log.info("求和方法被执行了");
//获取乐观读
long stamp = lock.tryOptimisticRead();
int cnum1 = num1;
int cnum2 = num2;
log.info("获取到的成员变量值,cnum1:" + cnum1 + " cnum2:" + cnum2);
try {
//休眠3秒,目的是为了让其他线程修改掉成员变量的值。
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
//判断在运行期间是否存在写操作 true:不存在 false:存在
if (!lock.validate(stamp)) {
log.info("存在写操作!");
//存在写锁
//升级悲观读锁
stamp = lock.readLock();
try {
log.info("升级悲观读锁");
cnum1 = num1;
cnum2 = num2;
log.info("重新获取了成员变量的值=========== cnum1=" + cnum1 + " cnum2=" + cnum2);
} finally {
//释放悲观读锁
lock.unlock(stamp);
}
}
return cnum1 + cnum2;
}
//使用写锁修改成员变量的值
private static void updateNum() {
long stamp = lock.writeLock();
try {
num1 = 2;
num2 = 2;
} finally {
lock.unlock(stamp);
}
}
public static void main(String[] args) throws InterruptedException {
Thread t1 = new Thread(() -> {
int sum = sum();
log.info("求和结果:" + sum);
});
t1.start();
//休眠1秒,目的为了让线程t1能执行到获取成员变量之后
Thread.sleep(1000);
updateNum();
t1.join();
log.info("执行完毕");
}
}
使用 StampedLock 的注意事项#
-
看名字就能看出来
StampedLock
不支持重入锁。 -
它适用于读多写少的情况,如果不是这种情况,请慎用,性能可能还不如
synchronized
。 -
StampedLock
的悲观读锁、写锁不支持条件变量。 -
千万不能中断阻塞的悲观读锁或写锁,如果调用阻塞线程的
interrupt()
,会导致cpu飙升,如果希望StampedLock
支持中断操作,请使用readLockInterruptibly
(悲观读锁)与writeLockInterruptibly
(写锁)。
CountDownLatch#
类似门闩的概念,可以替代join
,但是比join
灵活,因为一个线程里面可以多次countDown
,但是join
一定要等线程完成才能执行。
其底层原理是:调用await()
方法的线程会利用AQS
排队,一旦数字减为0,则会将AQS
中排队的线程依次唤醒。
代码如下:
package git.snippets.juc;
import java.util.concurrent.CountDownLatch;
/**
* CountDownLatch可以用Join替代
*/
public class CountDownLatchAndJoin {
public static void main(String[] args) {
useCountDownLatch();
useJoin();
}
public static void useCountDownLatch() {
// use countdownlatch
long start = System.currentTimeMillis();
Thread[] threads = new Thread[100000];
CountDownLatch latch = new CountDownLatch(threads.length);
for (int i = 0; i < threads.length; i++) {
threads[i] = new Thread(() -> {
int result = 0;
for (int i1 = 0; i1 < 1000; i1++) {
result += i1;
}
// System.out.println("Current thread " + Thread.currentThread().getName() + " finish cal result " + result);
latch.countDown();
});
}
for (Thread thread : threads) {
thread.start();
}
try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
long end = System.currentTimeMillis();
System.out.println("end latch down, time is " + (end - start));
}
public static void useJoin() {
long start = System.currentTimeMillis();
// use join
Thread[] threads = new Thread[100000];
for (int i = 0; i < threads.length; i++) {
threads[i] = new Thread(() -> {
int result = 0;
for (int i1 = 0; i1 < 1000; i1++) {
result += i1;
}
// System.out.println("Current thread " + Thread.currentThread().getName() + " finish cal result " + result);
});
}
for (Thread thread : threads) {
thread.start();
}
for (Thread thread : threads) {
try {
thread.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
long end = System.currentTimeMillis();
System.out.println("end join, time is " + (end - start));
}
}
CyclicBarrier#
类似栅栏,类比:满了20个乘客就发车 这样的场景。
比如:一个程序可能收集如下来源的数据:
程序可以并发执行,用线程操作1,2,3,然后操作完毕后再合并, 然后执行后续的逻辑操作,就可以使用CyclicBarrier
代码如下:
package git.snippets.juc;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
/**
* CyclicBarrier示例:满员发车
*
* @author <a href="mailto:[email protected]">Grey</a>
* @since 1.8
*/
public class CyclicBarrierTest {
public static void main(String[] args) {
CyclicBarrier barrier = new CyclicBarrier(20, () -> {
System.out.println("满了20,发车");
});
for (int i = 0; i < 100; i++) {
new Thread(() -> {
try {
barrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}).start();
}
}
}
Semaphore#
表示信号量,有如下两个操作:
s.acquire()
信号量减1
s.release()
信号量加1
到 0 以后,就不能执行了,这个可以用于限流。
底层原理是:如果没有线程许可可用,则线程阻塞,并通过 AQS 来排队,可以通过release()
方法来释放许可,当某个线程释放了某个许可后,会从 AQS 中正在排队的第一个线程依次开始唤醒,直到没有空闲许可。
Semaphore 使用示例:有N个线程来访问,我需要限制同时运行的只有信号量大小的线程数。
代码如下:
package git.snippets.juc;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
/**
* Semaphore用于限流
*/
public class SemaphoreUsage {
public static void main(String[] args) {
Semaphore semaphore = new Semaphore(1);
new Thread(() -> {
try {
semaphore.acquire();
TimeUnit.SECONDS.sleep(2);
System.out.println("Thread 1 executed");
} catch (Exception e) {
e.printStackTrace();
} finally {
semaphore.release();
}
}).start();
new Thread(() -> {
try {
semaphore.acquire();
TimeUnit.SECONDS.sleep(2);
System.out.println("Thread 2 executed");
} catch (Exception e) {
e.printStackTrace();
} finally {
semaphore.release();
}
}).start();
}
}
Semaphore
可以有公平和非公平的方式进行配置。
Semaphore
和CountDownLatch
的区别?
Semaphore 是信号量,可以做限流,限制 n 个线程并发,释放一个线程后就又能进来一个新的线程。
CountDownLatch 是闭锁,带有阻塞的功能,必须等到 n 个线程都执行完后,被阻塞的线程才能继续往下执行。
Guava RateLimiter#
采用令牌桶算法,用于限流
示例代码如下
package git.snippets.juc;
import com.google.common.util.concurrent.RateLimiter;
import java.util.List;
import java.util.concurrent.Executor;
/**
* @author <a href="mailto:[email protected]">Grey</a>
* @date 2021/4/21
* @since
*/
public class RateLimiterUsage {
//每秒只发出2个令牌
static final RateLimiter rateLimiter = RateLimiter.create(2.0);
static void submitTasks(List<Runnable> tasks, Executor executor) {
for (Runnable task : tasks) {
rateLimiter.acquire(); // 也许需要等待
executor.execute(task);
}
}
}
注:上述代码需要引入 Guava 包。
Phaser(Since jdk1.7)#
遗传算法,可以用这个结婚的场景模拟: 假设婚礼的宾客有 5 个人,加上新郎和新娘,一共 7 个人。 我们可以把这 7 个人看成 7 个线程,有如下步骤要执行。
-
到达婚礼现场
-
拥抱(只有新郎和新娘线程可以执行)
每个阶段执行完毕后才能执行下一个阶段,其中拥抱阶段只有新郎新娘这两个线程才能执行。
以上需求,我们可以通过 Phaser 来实现,具体代码和注释如下:
package git.snippets.juc;
import java.util.Random;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
public class PhaserUsage {
static final Random R = new Random();
static WeddingPhaser phaser = new WeddingPhaser();
static void millSleep() {
try {
TimeUnit.MILLISECONDS.sleep(R.nextInt(1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
// 宾客的人数
final int guestNum = 5;
// 新郎和新娘
final int mainNum = 2;
phaser.bulkRegister(mainNum + guestNum);
for (int i = 0; i < guestNum; i++) {
new Thread(new Person("宾客" + i)).start();
}
new Thread(new Person("新娘")).start();
new Thread(new Person("新郎")).start();
}
static class WeddingPhaser extends Phaser {
@Override
protected boolean onAdvance(int phase, int registeredParties) {
switch (phase) {
case 0:
System.out.println("所有人到齐");
return false;
case 1:
System.out.println("所有人吃饭");
return false;
case 2:
System.out.println("所有人离开");
return false;
case 3:
System.out.println("新郎新娘拥抱");
return true;
default:
return true;
}
}
}
static class Person implements Runnable {
String name;
Person(String name) {
this.name = name;
}
@Override
public void run() {
// 先到达婚礼现场
arrive();
// 吃饭
eat();
// 离开
leave();
// 拥抱,只保留新郎和新娘两个线程可以执行
hug();
}
private void arrive() {
millSleep();
System.out.println("name:" + name + " 到来");
phaser.arriveAndAwaitAdvance();
}
private void eat() {
millSleep();
System.out.println("name:" + name + " 吃饭");
phaser.arriveAndAwaitAdvance();
}
private void leave() {
millSleep();
System.out.println("name:" + name + " 离开");
phaser.arriveAndAwaitAdvance();
}
private void hug() {
if ("新娘".equals(name) || "新郎".equals(name)) {
millSleep();
System.out.println("新娘新郎拥抱");
phaser.arriveAndAwaitAdvance();
} else {
phaser.arriveAndDeregister();
}
}
}
}
Exchanger#
用于线程之间交换数据,exchange()
方法是阻塞的,所以要两个exchange
行为都执行到才会触发交换。
package git.snippets.juc;
import java.util.concurrent.Exchanger;
import java.util.concurrent.TimeUnit;
/**
* Exchanger用于两个线程之间交换变量
*/
public class ExchangerUsage {
static Exchanger<String> semaphore = new Exchanger<>();
public static void main(String[] args) {
new Thread(() -> {
String s = "T1";
try {
s = semaphore.exchange(s);
TimeUnit.SECONDS.sleep(2);
System.out.println("Thread 1(T1) executed, Result is " + s);
} catch (Exception e) {
e.printStackTrace();
}
}).start();
new Thread(() -> {
String s = "T2";
try {
s = semaphore.exchange(s);
TimeUnit.SECONDS.sleep(2);
System.out.println("Thread 2(T2) executed, Result is " + s);
} catch (Exception e) {
e.printStackTrace();
}
}).start();
}
}
LockSupport#
其他锁的底层用的是AQS
原先让线程等待需要wait/await
,现在仅需要LockSupport.park()
原先叫醒线程需要notify/notifyAll
,现在仅需要LockSupport.unpark()
, LockSupport.unpark()
还可以叫醒指定线程,
示例代码:
package git.snippets.juc;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
/**
* 阻塞指定线程,唤醒指定线程
*/
public class LockSupportUsage {
public static void main(String[] args) {
Thread t = new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
if (i == 5) {
LockSupport.park();
}
if (i == 8) {
LockSupport.park();
}
TimeUnit.SECONDS.sleep(1);
System.out.println(i);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
t.start();
// unpark可以先于park调用
//LockSupport.unpark(t);
try {
TimeUnit.SECONDS.sleep(8);
} catch (InterruptedException e) {
e.printStackTrace();
}
LockSupport.unpark(t);
System.out.println("after 8 seconds");
}
}
实现一个监控元素的容器#
实现一个容器,提供两个方法
// 向容器中增加一个元素
void add(T t);
// 返回容器大小
int size();
有两个线程,线程1添加10个元素到容器中,线程2实现监控元素的个数,当个数到5个时,线程2给出提示并结束
方法 1. 使用wait + notify
实现
方法 2. 使用CountDownLatch
实现
方法 3. 使用LockSupport
实现
代码如下:
package git.snippets.juc;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
// 实现一个容器,提供两个方法,add,size,有两个线程,
// 线程1添加10个元素到容器中,
// 线程2实现监控元素的个数,
// 当个数到5个时,线程2给出提示并结束
public class MonitorContainer {
public static void main(String[] args) {
useLockSupport();
// useCountDownLatch();
// useNotifyAndWait();
}
/**
* 使用LockSupport
*/
private static void useLockSupport() {
System.out.println("use LockSupport...");
Thread adder;
List<Object> list = Collections.synchronizedList(new ArrayList<>());
Thread finalMonitor = new Thread(() -> {
LockSupport.park();
if (match(list)) {
System.out.println("filled 5 elements size is " + list.size());
LockSupport.unpark(null);
}
});
adder = new Thread(() -> {
for (int i = 0; i < 10; i++) {
increment(list);
if (match(list)) {
LockSupport.unpark(finalMonitor);
}
}
});
adder.start();
finalMonitor.start();
}
/**
* 使用CountDownLatch
*/
private static void useCountDownLatch() {
System.out.println("use CountDownLatch...");
List<Object> list = Collections.synchronizedList(new ArrayList<>());
CountDownLatch latch = new CountDownLatch(5);
Thread adder = new Thread(() -> {
for (int i = 0; i < 10; i++) {
increment(list);
if (i <= 4) {
latch.countDown();
}
}
});
Thread monitor = new Thread(() -> {
try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
if (match(list)) {
System.out.println("filled 5 elements");
}
});
adder.start();
monitor.start();
}
/**
* notify + wait 实现
*/
private static void useNotifyAndWait() {
System.out.println("use notify and wait...");
List<Object> list = Collections.synchronizedList(new ArrayList<>());
final Object o = new Object();
Thread adder = new Thread(() -> {
synchronized (o) {
for (int i = 0; i < 10; i++) {
increment(list);
if (match(list)) {
o.notify();
try {
o.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
System.out.println("add finished");
o.notify();
}
});
Thread monitor = new Thread(() -> {
synchronized (o) {
if (match(list)) {
System.out.println("5 elements added " + list.size());
o.notify();
try {
o.wait();
System.out.println("monitor finished");
o.notify();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
});
adder.start();
monitor.start();
}
/**
* 只要是5的倍数,就循环打印
*/
private static void useNotifyAndWaitLoop() {
List<Object> list = Collections.synchronizedList(new ArrayList<>());
final Object o = new Object();
Thread adder = new Thread(() -> {
synchronized (o) {
for (; ; ) {
increment(list);
if (match(list)) {
o.notify();
try {
o.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
});
Thread monitor = new Thread(() -> {
synchronized (o) {
while (true) {
if (match(list)) {
System.out.println("filled 5 elements");
}
o.notify();
try {
o.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
});
adder.start();
monitor.start();
}
private static void increment(List<Object> list) {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e1) {
e1.printStackTrace();
}
list.add(new Object());
System.out.println("list add the ele, size is " + list.size());
}
private static boolean match(List<Object> list) {
return list.size() % 5 == 0;
}
}
生产者消费者问题#
写一个固定容量的同步容器,拥有put
和get
方法,以及getCount
方法,能够支持 2 个生产者线程以及 10 个消费者线程的阻塞调用。
方法 1. 使用wait/notifyAll
方法 2. ReentrantLock
的Condition
,本质就是等待队列
package git.snippets.juc;
import java.util.LinkedList;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
// 写一个固定容量的同步容器,拥有put和get方法,以及getCount方法,能够支持2个生产者线程以及10个消费者线程的阻塞调用。
public class ProducerAndConsumer {
public static void main(String[] args) {
// MyContainerByCondition container = new MyContainerByCondition(100);
MyContainerByNotifyAndWait container = new MyContainerByNotifyAndWait(100);
for (int i = 0; i < 25; i++) {
new Thread(container::get).start();
}
for (int i = 0; i < 20; i++) {
new Thread(() -> container.put(new Object())).start();
}
}
}
// 使用ReentrantLock的Condition
class MyContainerByCondition {
static ReentrantLock lock = new ReentrantLock();
final int MAX;
private final LinkedList<Object> list = new LinkedList<>();
Condition consumer = lock.newCondition();
Condition producer = lock.newCondition();
public MyContainerByCondition(int limit) {
this.MAX = limit;
}
public void put(Object object) {
lock.lock();
try {
while (getCount() == MAX) {
System.out.println("container is full");
try {
producer.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
list.add(object);
consumer.signalAll();
System.out.println("contain add a object, current size " + getCount());
} finally {
lock.unlock();
}
}
public Object get() {
lock.lock();
try {
while (getCount() == 0) {
try {
System.out.println("container is empty");
consumer.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
Object object = list.removeFirst();
producer.signalAll();
System.out.println("contain get a object, current size " + getCount());
return object;
} finally {
lock.unlock();
}
}
public synchronized int getCount() {
return list.size();
}
}
// 使用synchronized的wait和notifyAll
class MyContainerByNotifyAndWait {
LinkedList<Object> list = null;
final int limit;
MyContainerByNotifyAndWait(int limit) {
this.limit = limit;
list = new LinkedList<>();
}
synchronized int getCount() {
return list.size();
}
// index 从0开始计数
synchronized Object get() {
while (list.size() == 0) {
System.out.println("container is empty");
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
Object o = list.removeFirst();
System.out.println("get a data");
this.notifyAll();
return o;
}
synchronized void put(Object data) {
while (list.size() > limit) {
System.out.println("container is full , do not add any more");
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
list.add(data);
System.out.println("add a data");
this.notifyAll();
}
}
说明#
本文涉及到的所有代码和图例
更多内容见:Java 多线程
参考资料#
Recommend
About Joyk
Aggregate valuable and interesting links.
Joyk means Joy of geeK