以全局的固定顺序获取多个锁来避免死锁

类别:Java 点击:0 评论:0 推荐:

当两个或多个线程互相等待时被阻塞,就会发生死锁。例如,第一个线程被第二个线程阻塞,它在等待第二个线程持有的一个资源。而第二个线程在获得第一个线程持有的某个资源之前不会释放这个资源。由于第一个线程在获得第二个线程持有的那个资源之前不会释放它自己所持有的资源,而第二个线程在获得第一个线程持有的一个资源之前也不会释放它所持有的资源,于是这两个线程就被死锁。

在编写多线程代码时,死锁是最难处理的问题之一。因为死锁可能在最意想不到的地方发生,所以查找和修正它既费时又费力。例如,试考虑下面这段锁定了多个对象的代码。

public int sumArrays(int[] a1, int[] a2) { int value = 0; int size = a1.length; if (size == a2.length) { synchronized(a1) { //1 synchronized(a2) { //2 for (int i=0; i<size; i++) value += a1[i] + a2[i]; } } } return value; }

这段代码在求和操作中访问两个数组对象之前正确地锁定了这两个数组对象。它形式简短,编写也适合所要执行的任务;但不幸的是,它有一个潜在的问题。这个问题就是它埋下了死锁的种子,除非您在不同的线程中对相同的对象调用该方法时格外小心。要查看潜在的死锁,请考虑如下的事件序列:

创建两个数组对象,ArrayA 和 ArrayB。

线程 1 用下面的调用来调用 sumArrays 方法:
sumArrays(ArrayA, ArrayB);

线程 2 用下面的调用来调用 sumArrays 方法:
sumArrays(ArrayB, ArrayA);

线程 1 开始执行 sumArrays 方法并在 //1 处获得对参数 a1 的锁,对于这个调用而言,它就是对 ArrayA 对象的锁。

然后在 //2 处,在线程 1 获得对 ArrayB 的锁之前被抢先。

线程 2 开始执行 sumArrays 方法并在 //1 处获得对参数 a1 的锁,对于这个调用而言,它就是对 ArrayB 对象的锁。

然后线程 2 在 //2 处试图获取对参数 a2 的锁,它是对 ArrayA 对象的锁。因为这个锁当前由线程 1 持有,所以线程 2 被阻塞。

线程 1 开始执行并在 //2 处试图获取对参数 a2 的锁,它是对 ArrayB 对象的锁。因为这个锁当前由线程 2 持有,所以线程 1 被阻塞。

现在两个线程都被死锁。

避免这种问题的一种方法是让代码按固定的全局顺序获取锁。在本例中,如果线程 1 和线程 2 按相同的顺序对参数调用 sumArrays 方法,就不会发生死锁。但是,这一技术要求,多线程代码的程序员在调用那些锁定作为参数传入的对象的方法时需要格外小心。在您遇到这种死锁并不得不进行调试之前,使用这一技术的应用程序似乎不切实际。

另外,您也可以将锁定顺序嵌入对象的内部。这允许代码查询它准备为其获得锁的对象,以确定正确的锁定顺序。只要即将锁定的所有对象都支持锁定顺序表示法,并且获取锁的代码遵循这一策略,就可避免这种潜在死锁的情况。

在对象中嵌入锁定顺序的缺点是,这种实现将使内存需求和运行时成本增加。另外,在上例中应用这一技术需要在数组中有一个包装对象,用来存放锁定顺序信息。例如,试考虑下面的代码,它由前面的示例修改而来,其中实现了锁定顺序技术:

class ArrayWithLockOrder { private static long num_locks = 0; private long lock_order; private int[] arr; public ArrayWithLockOrder(int[] a) { arr = a; synchronized(ArrayWithLockOrder.class) { num_locks++; // 锁数加 1。 lock_order = num_locks; // 为此对象实例设置唯一的 lock_order。 } } public long lockOrder() { return lock_order; } public int[] array() { return arr; } } class SomeClass implements Runnable { public int sumArrays(ArrayWithLockOrder a1, ArrayWithLockOrder a2) { int value = 0; ArrayWithLockOrder first = a1; // 保留数组引用的一个 ArrayWithLockOrder last = a2; // 本地副本。 int size = a1.array().length; if (size == a2.array().length) { if (a1.lockOrder() > a2.lockOrder()) // 确定并设置对象的锁定 { // 顺序。 first = a2; last = a1; } synchronized(first) { // 按正确的顺序锁定对象。 synchronized(last) { int[] arr1 == a1.array(); int[] arr2 == a2.array(); for (int i=0; i<size; i++) value += arr1[i] + arr2[i]; } } } return value; } public void run() { //... } }

在第一个示例中,ArrayWithLockOrder 类是作为数组的一个包装提供的。每创建该类的一个新对象,该类就将 static num_locks 变量加 1。一个单独的 lock_order 实例变量被设置为 num_locks static 变量的当前值。这可以保证,对于该类的每个对象,lock_order 变量都有一个独特的值。lock_order 实例变量充当此对象相对于该类的其他对象的锁定顺序指示器。

请注意,static num_locks 变量是在 synchronized 语句中进行操作的。这是必须的,因为对象的每个实例共享该对象的 static 变量。因此,当两个线程同时创建 ArrayWithLockOrder 类的一个对象时,如果操作 static num_locks 变量的代码未作同步处理,该变量就可能被破坏。对此代码作同步处理可以保证,对于 ArrayWithLockOrder 类的每个对象,lock_order 变量都有一个独特的值。

此外还更新了 sumArrays 方法,以使它包括确定正确锁定顺序的代码。在请求锁之前,将查询每个对象以获得它的锁定顺序。编号较小的首先被锁定。此代码可以保证,不管各对象是以什么顺序传给此方法,它们总是被以相同的顺序锁定。

static num_locks 域和 lock_order 域都是作为 long 类型实现的。long 数据类型是作为 64 位有符号二进制补码整数实现的。这意味着在创建 9,223,372,036,854,775,807 个对象之后,num_locks 和 lock_order 的值将重新开始。您未必会达到这个极限,但在适当的条件下这是可能发生的。

实现嵌入的锁定顺序需要投入更多的工作,使用更多的内存,并会延长执行时间。但是,如果您的代码中可能存在这些类型的死锁,您也许会发现值得这样做。如果您无法承受额外的内存和执行开销,或者不能接受 num_locks 或 lock_order 域重新开始的可能性,则您在建立锁定对象的预定义顺序时应该仔细斟酌。

本文地址:http://com.8s8s.com/it/it18887.htm