6

Java并发编程实战(4)- 死锁

 3 years ago
source link: http://www.cnblogs.com/wing011203/p/14257763.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

在这篇文章中,我们主要讨论一下死锁及其解决办法。

目录

概述

在上一篇文章中,我们讨论了如何使用一个互斥锁去保护多个资源,以银行账户转账为例,当时给出的解决方法是基于Class对象创建互斥锁。

这样虽然解决了同步的问题,但是能在现实中使用吗?答案是不可以,尤其是在高并发的情况下,原因是我们使用的互斥锁的范围太大,以转账为例,我们的做法会锁定整个账户Class对象,这样会导致转账操作只能串行进行,但是在实际场景中,大量的转账操作业务中的双方是不相同的,直接在Class对象级别上加锁是不能接受的。

那如果在对象实例级别上加锁,使用细粒度锁,会有什么问题? 可能会发生死锁。

我们接下来看一下造成死锁的原因和可能的解决方案。

死锁案例

什么是死锁?

死锁是指一组互相竞争资源的线程因互相等待,导致“永久”阻塞的现象。

一般来说,当我们使用 细粒度锁 时,它在提升性能的同时,也可能会导致死锁。

我们还是以银行转账为例,来看一下死锁是如何发生的。

首先,我们先定义个BankAccount对象,来存储基本信息,代码如下。

public class BankAccount {
	private int id;
	private double balance;
	private String password;
	public int getId() {
		return id;
	}
	public void setId(int id) {
		this.id = id;
	}
	public double getBalance() {
		return balance;
	}
	public void setBalance(double balance) {
		this.balance = balance;
	}
}

接下来,我们使用细粒度锁来尝试完成转账操作,代码如下。

public class BankTransferDemo {
	
	public void transfer(BankAccount sourceAccount, BankAccount targetAccount, double amount) {
		synchronized(sourceAccount) {
			synchronized(targetAccount) {
				if (sourceAccount.getBalance() > amount) {
					System.out.println("Start transfer.");
					System.out.println(String.format("Before transfer, source balance:%s, target balance:%s", sourceAccount.getBalance(), targetAccount.getBalance()));
					sourceAccount.setBalance(sourceAccount.getBalance() - amount);
					targetAccount.setBalance(targetAccount.getBalance() + amount);
					System.out.println(String.format("After transfer, source balance:%s, target balance:%s", sourceAccount.getBalance(), targetAccount.getBalance()));
				}
			}
		}
	}
}

我们用下面的代码来做简单测试。

public static void main(String[] args) throws InterruptedException {
		BankAccount sourceAccount = new BankAccount();
		sourceAccount.setId(1);
		sourceAccount.setBalance(50000);
		
		BankAccount targetAccount = new BankAccount();
		targetAccount.setId(2);
		targetAccount.setBalance(20000);
		
		BankTransferDemo obj = new BankTransferDemo();
		
		Thread t1 = new Thread(() ->{
			for (int i = 0; i < 10000; i++) {
				obj.transfer(sourceAccount, targetAccount, 1);
			}
		});
		
		Thread t2 = new Thread(() ->{
			for (int i = 0; i < 10000; i++) {
				obj.transfer(targetAccount, sourceAccount, 1);
			}
		});
		
		t1.start();
		t2.start();
		
		t1.join();
		t2.join();
		
		System.out.println("Finished.");
	}

测试代码中包含了2个线程,其中t1线程循环从sourceAccount向targetAccount转账,而t2线程会循环从targetAccount向sourceAccount转账。

从运行结果来看,t1线程中的循环在运行600次左右时,t2线程也创建好,开始循环转账了,这时就会发生死锁,导致t1线程和t2线程都无法继续执行。

我们可以用下面的资源分配图来更直观的描述死锁。

rAFjI3m.png!mobile

死锁的原因和预防

并发程序一旦死锁,一般没有特别好的办法,很多时候我们只能重启应用,因此, 解决死锁问题的最好办法是规避死锁。

我们先来看一下死锁发生的条件,一个叫 Coffman 的牛人,于1971年在ACM Computing Surveys发表了一篇名为 System Deadlocks 的文章,他总结了只有以下四个条件全部满足的情况下,才会发生死锁:

  • 互斥,共享资源X和Y只能被一个线程占用。
  • 占有且等待,线程t1已经取得共享资源X,在等待共享资源Y的时候,不释放共享资源X。
  • 不可抢占,其他线程不能强行抢占线程t1占有的资源。
  • 循环等待,线程t1等待线程t2占有的资源,线程t2等待线程t1占有的资源,就是循环等待。

通过上述描述,我们能够推导出, 只要破坏上面其中一个条件,就可以避免死锁的发生。

但是第一个条件互斥,是不可以被破坏的,否则我们就没有用锁的必要了,那么我们来看如何破坏其他三个条件。

破坏占用且等待条件

如果要破坏占用且等待条件,我们可以尝试一次性申请全部资源,这样就不需要等待了。

在实现过程中,我们需要创建一个新的角色,负责同时申请和同时释放全部资源,我们可以将其称为Allocator。

我们来看一下具体的代码实现。

public class Allocator {
	
	private volatile static Allocator instance;
	
	private Allocator() {}
	
	public static Allocator getInstance() {
		if (instance == null) {
			synchronized(Allocator.class) {
				if (instance == null) {
					instance = new Allocator();
				}
			}
		}
		
		return instance;
	}
	
	private Set<Object> lockObjs = new HashSet<Object>();
	
	public synchronized boolean apply(Object... objs) {
		for (Object obj : objs) {
			if (lockObjs.contains(obj)) {
				return false;
			}
		}
		for (Object obj : objs) {
			lockObjs.add(obj);
		}
		
		return true;
	}
	
	public synchronized void free(Object... objs) {
		for (Object obj : objs) {
			if (lockObjs.contains(obj)) {
				lockObjs.remove(obj);
			}
		}
	}
}

Allocator是一个单例模式,它会使用一个Set对象来保存所有需要处理的资源,然后使用apply()和free()来同时锁定或者释放所有资源,它们会接收不固定参数。

我们来看一下新的transfer()方法应该怎么写。

public void transfer(BankAccount sourceAccount, BankAccount targetAccount, double amount) {
		Allocator allocator = Allocator.getInstance();
		while(!allocator.apply(sourceAccount, targetAccount));
		try {
			synchronized(sourceAccount) {
				synchronized(targetAccount) {
					if (sourceAccount.getBalance() > amount) {
						System.out.println("Start transfer.");
						System.out.println(String.format("Before transfer, source balance:%s, target balance:%s", sourceAccount.getBalance(), targetAccount.getBalance()));
						sourceAccount.setBalance(sourceAccount.getBalance() - amount);
						targetAccount.setBalance(targetAccount.getBalance() + amount);
						System.out.println(String.format("After transfer, source balance:%s, target balance:%s", sourceAccount.getBalance(), targetAccount.getBalance()));
					}
				}
			}
		}
		finally {
			allocator.free(sourceAccount, targetAccount);
		}
	}

我们可以看到,transfer()方法中,首先获取Allocator实例,然后调用apply(),传入sourceAccount和targetAccount实例,请注意这里使用了while循环,即直到apply()返回true,才会退出循环,此时,Allocator已经锁定了sourceAccount和targetAccount,接下来,我们使用synchronized关键字来锁定sourceAccount和targetAccount,然后执行转账的业务逻辑。 这里并不是必须要用synchronized,但是这样做可以避免其他操作来影响转账操作 ,例如如果转账的过程中对sourceAccount实例进行取钱操作,如果不用synchronized,就有可能引发并发问题。

下面是测试代码。

public static void main(String[] args) throws InterruptedException {
		BankAccount sourceAccount = new BankAccount();
		sourceAccount.setId(1);
		sourceAccount.setBalance(50000);
		
		BankAccount targetAccount = new BankAccount();
		targetAccount.setId(2);
		targetAccount.setBalance(20000);
		
		BankTransferDemo obj = new BankTransferDemo();
		
		Thread t1 = new Thread(() ->{
			for (int i = 0; i < 10000; i++) {
				obj.transfer(sourceAccount, targetAccount, 1);
			}
		});
		
		Thread t2 = new Thread(() ->{
			for (int i = 0; i < 10000; i++) {
				obj.transfer(targetAccount, sourceAccount, 1);
			}
		});
		
		t1.start();
		t2.start();
		
		t1.join();
		t2.join();
		
		System.out.println("Finished.");
	}

程序是可以正常执行的,结果和我们预期一致。

在这里,我们需要 保证锁对象的不可变性 ,对于BankAccount对象来说,id属性可以看做是其主键,id相同的BankAccount实例,从业务角度来说,指向的都是同一个账户,但是对于锁对象来说,id相同的不同实例,会产生不同的锁,从而引发并发问题。

我们来看下面修改后的测试代码。

public static void main(String[] args) throws InterruptedException {
		
		
		BankTransferDemo obj = new BankTransferDemo();
		
		Thread t1 = new Thread(() ->{
			for (int i = 0; i < 10000; i++) {
				// 这里应该从后端获取账户实例,此处只做演示。
				BankAccount sourceAccount = new BankAccount();
				sourceAccount.setId(1);
				sourceAccount.setBalance(50000);
				
				BankAccount targetAccount = new BankAccount();
				targetAccount.setId(2);
				targetAccount.setBalance(20000);
				obj.transfer(sourceAccount, targetAccount, 1);
			}
		});
		
		Thread t2 = new Thread(() ->{
			for (int i = 0; i < 10000; i++) {
				// 这里应该从后端获取账户实例,此处只做演示。
				BankAccount sourceAccount = new BankAccount();
				sourceAccount.setId(1);
				sourceAccount.setBalance(50000);
				
				BankAccount targetAccount = new BankAccount();
				targetAccount.setId(2);
				targetAccount.setBalance(20000);
				obj.transfer(targetAccount, sourceAccount, 1);
			}
		});
		
		t1.start();
		t2.start();
		
		t1.join();
		t2.join();
		
		System.out.println("Finished.");
	}

上述代码中,每次转账都创建新的BankAccount实例,然后将其传入Allocator,这样做,是不能够正常处理的,因为每次使用的互斥锁都作用在不同的实例上,这一点,需要特别注意。

破坏不可抢占条件

破坏不可抢占条件很简单,解决的关键在于能够主动释放它占有的资源,但是synchronized是不能做到这一点的。

synchronized申请资源的时候,如果申请失败,线程会直接进入阻塞状态,什么都不能做,已经锁定的资源也无法释放。

我们可以使用java.util.concurrent包中的Lock对象来实现这一点,相关代码如下。

private Lock lock = new ReentrantLock();
	
    public void transfer(BankAccount sourceAccount, BankAccount targetAccount, double amount) {
        try {
            lock.lock();
            if (sourceAccount.getBalance() > amount) {
                System.out.println("Start transfer.");
                System.out.println(String.format("Before transfer, source balance:%s, target balance:%s", sourceAccount.getBalance(), targetAccount.getBalance()));
                sourceAccount.setBalance(sourceAccount.getBalance() - amount);
                targetAccount.setBalance(targetAccount.getBalance() + amount);
                System.out.println(String.format("After transfer, source balance:%s, target balance:%s", sourceAccount.getBalance(), targetAccount.getBalance()));
            }
        }
        finally {
            lock.unlock();
        }
    }

破坏循环条件

破坏循环条件,需要对资源进行排序,然后按序申请资源。

我们来看下面的代码。

public void transfer(BankAccount sourceAccount, BankAccount targetAccount, double amount) {
		BankAccount left = sourceAccount;
		BankAccount right = targetAccount;
		if (sourceAccount.getId() > targetAccount.getId()) {
			left = targetAccount;
			right = sourceAccount;
		}
		synchronized(left) {
			synchronized(right) {
				if (sourceAccount.getBalance() > amount) {
					System.out.println("Start transfer.");
					System.out.println(String.format("Before transfer, source balance:%s, target balance:%s", sourceAccount.getBalance(), targetAccount.getBalance()));
					sourceAccount.setBalance(sourceAccount.getBalance() - amount);
					targetAccount.setBalance(targetAccount.getBalance() + amount);
					System.out.println(String.format("After transfer, source balance:%s, target balance:%s", sourceAccount.getBalance(), targetAccount.getBalance()));
				}
			}
		}
	}

在这里,我们假设BankAccount中的 id 是主键,我们按照 id 对sourceAccount和targetAccount进行排序,之后按照 id 从小到大申请资源,这样就不会有死锁发生了。

我们在解决并发问题的时候,可能会有多种方式,我们需要评估一下各个解决方案,从中选择一个成本最低的方案。

对于我们一直谈论的转账示例,破坏循环条件可能是一个比较好的解决方法。

使用等待-通知机制

我们上面在破坏占用且等待条件时,使用了如下的死循环:

while(!allocator.apply(sourceAccount, targetAccount));

在并发量不高的情况下,这样写没有问题,但是在高并发的情况下,这样写可能需要循环太多次才能拿到锁,太消耗CPU了,属于蛮干型。

在这种情况下,一种合理的方案是:如果线程要求的条件不满足,那么线程阻塞自己,进入 等待 状态,当线程要求的条件满足后, 通知 等待的线程重新执行,这里线程阻塞就避免了循环消耗CPU的问题。

这就是我们要讨论的等待-通知机制。

Java中的等待-通知机制

Java中的等待-通知机制流程是怎样的?

线程首先获取互斥锁,当线程要求的条件不满足时,释放互斥锁,进入等待状态;当要求的条件满足时,通知等待的线程,重新获取互斥锁。

Java使用synchronized关键字配合wait()、notify()、notifyAll()三个方法实现等待-通知机制。

在并发程序中,当一个线程进入临界区后,由于某些条件没有满足,需要进入等待状态,Java对象的wait()方法能够实现这一点。当线程要求的条件满足时,Java对象的notify()和notifyAll()方法就可以通知等待的线程,它会告诉线程,你需要的条件 曾经满足过 ,之所以说曾经,是因为notify()只能保证在通知的那一时刻,条件是满足的,而被通知线程的执行时刻和通知时刻一般不会重合,所以在线程开始执行的时候,可能条件又不满足了。

另外需要注意,被通知的线程重新执行时,还需要获取互斥锁,因为之前在调用wait()方法时,互斥锁已经被释放了。

wait()、notify()和notifyAll()三个方法能够被调用的前提是已经获取了响应的互斥锁,所以这三个方法都是在synchronized{}内部被调用的。

下面我们来看一下修改后的Allocator,其中apply()和free()方法的代码如下。

public synchronized void apply(Object... objs) {
		for (Object obj : objs) {
			while (lockObjs.contains(obj)) {
				try {
					this.wait();
				} catch (InterruptedException e) {
					System.out.println(e.getMessage());
				}
			}
		}
		for (Object obj : objs) {
			lockObjs.add(obj);
		}
	}
	
	public synchronized void free(Object... objs) {
		for (Object obj : objs) {
			if (lockObjs.contains(obj)) {
				lockObjs.remove(obj);
			}
		}
		this.notifyAll();
	}

对应的transfer()方法的代码如下。

public void transfer(BankAccount sourceAccount, BankAccount targetAccount, double amount) {
	Allocator allocator = Allocator.getInstance();
	allocator.apply(sourceAccount, targetAccount);
	try {
		synchronized(sourceAccount) {
			synchronized(targetAccount) {
				if (sourceAccount.getBalance() > amount) {
					System.out.println("Start transfer.");
					System.out.println(String.format("Before transfer, source balance:%s, target balance:%s", sourceAccount.getBalance(), targetAccount.getBalance()));
					sourceAccount.setBalance(sourceAccount.getBalance() - amount);
					targetAccount.setBalance(targetAccount.getBalance() + amount);
					System.out.println(String.format("After transfer, source balance:%s, target balance:%s", sourceAccount.getBalance(), targetAccount.getBalance()));
				}
			}
		}
	}
	finally {
		allocator.free(sourceAccount, targetAccount);
	}
}

运行结果和我们期望是一致的。

条件曾经满足

在上述代码中,我们可以发现,apply()方法中的判断条件之前是if,现在改成了while, while (lockObjs.contains(obj)) ,这样做可以解决 条件曾经满足 的问题。

因为当wait()返回时,有可能条件已经发生了变化,曾经条件满足,但是现在已经不满足了,所以要重新检验条件是否满足。

这是一种范式,是一种经典的做法。

notify() vs notifyAll()

notify()和notifyAll()有什么区别?

notify()会随机的通知等待队列中的一个线程, 而notifyAll()会通知等待队列中的所有线程。

我们尽量使用notifyAll()方法,因为notify()可能会导致某些线程永远不会被通知到。

假设我们有一个实例,它有资源 A、B、C、D,我们使用实例对象来创建互斥锁。

  • 线程t1申请到了A、B
  • 线程t2申请到了C、D
  • 线程t3试图申请A、B,失败,进入等待队列
  • 线程t4试图申请C、D,失败,进入等待队列
  • 此时,线程t1执行结束,释放锁
  • 线程t1调用实例的notify()来通知等待队列中的线程,有可能被通知的是线程t4,但线程t4申请的是C、D还被线程t2占用,所以线程t4只能继续等待
  • 此时,线程t2执行结束,释放锁
  • 线程t2调用实例的notify()来通知等待队列中的线程,t3或者t4只能有1个被唤醒并正常执行,另外1个则再也没有机会被唤醒

wait()和sleep()的区别

wait()方法与sleep()方法的不同之处在于,wait()方法会释放对象的“锁标志”。当调用某一对象的wait()方法后,会使当前线程暂停执行,并将当前线程放入对象等待池中,直到调用了notify()方法后,将从对象等待池中移出任意一个线程并放入锁标志等待池中,只有锁标志等待池中的线程可以获取锁标志,它们随时准备争夺锁的拥有权。当调用了某个对象的notifyAll()方法,会将对象等待池中的所有线程都移动到该对象的锁标志等待池。

sleep()方法需要指定等待的时间,它可以让当前正在执行的线程在指定的时间内暂停执行,进入阻塞状态,该方法既可以让其他同优先级或者高优先级的线程得到执行的机会,也可以让低优先级的线程得到执行机会。但是sleep()方法不会释放“锁标志”,也就是说如果有synchronized同步块,其他线程仍然不能访问共享数据。

总结一下,wait()和sleep()区别如下。

  • wait()释放资源,sleep()不释放资源
  • wait()需要被唤醒,sleep()不需要
  • wait()是object顶级父类的方法,sleep()则是Thread的方法

wait()和sleep()都会让渡CPU执行时间,等待再次调度!


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK