Java并发编程系列(五)ReentrantLock源码解析:ReentrantLock有三个内部类Sync、FairSync、NonfairSync,主要类图关系用UML表示为。
首先看看成员变量
private final Sync sync;
该变量表示锁的类型,是公平锁还是非公平锁,具体要看构造函数。
再看看构造函数:
public ReentrantLock() { sync = new NonfairSync(); } public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); }
默认情况下使用的是非公平锁,如果传入true则使用公平锁。
再看看我们的lock方法:
public void lock() { sync.lock(); }
这里用的是Sync类的lock(),看看Sync类的lock()是怎么实现的:
abstract void lock();
恩,这里是一个抽象方法,在子类里实现,以非公平锁为例,看看其实现:
final void lock() { if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); }
在前面 Java并发编程系列(四)—-CAS与原子类 中熟悉的compareAndXXX,我们看看是啥玩意:
protected final boolean compareAndSetState(int expect, int update) { // See below for intrinsics setup to support this return unsafe.compareAndSwapInt(this, stateOffset, expect, update); }
该方法在AbstractQueuedSynchronizer中,是一个CAS操作,我们看看CAS操作的是哪个变量,表示的是什么意思,在静态代码块中看到:
static { try { //关键看这里,表示的是state变量 stateOffset = unsafe.objectFieldOffset (AbstractQueuedSynchronizer.class.getDeclaredField("state")); headOffset = unsafe.objectFieldOffset (AbstractQueuedSynchronizer.class.getDeclaredField("head")); tailOffset = unsafe.objectFieldOffset (AbstractQueuedSynchronizer.class.getDeclaredField("tail")); waitStatusOffset = unsafe.objectFieldOffset (Node.class.getDeclaredField("waitStatus")); nextOffset = unsafe.objectFieldOffset (Node.class.getDeclaredField("next")); } catch (Exception ex) { throw new Error(ex); } // Reduce the risk of rare disastrous classloading in first call to // LockSupport.park: https://bugs.openjdk.java.net/browse/JDK-8074773 Class ensureLoaded = LockSupport.class; }
可以看到stateOffset是state的偏移量,我们看看state是表示什么:
/** * The synchronization state. */ private volatile int state;
从源码的注释可以知道表示同步的状态,结合lock方法可知,意思是当前值如果为0(意味着没有线程获取到锁),则compareAndSetState执行成功,然后执行setExclusiveOwnerThread(Thread.currentThread()),看看是个啥玩意:
protected final void setExclusiveOwnerThread(Thread thread) { exclusiveOwnerThread = thread; }
这个方法是在AbstractOwnableSynchronizer抽象类里,将当前线程引用赋给AbstractOwnableSynchronizer的成员变量exclusiveOwnerThread;
如果compareAndSetState执行失败,证明有其他线程获取到了锁,则执行acquire(1):
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
看看tryAcquire,在AbstractQueuedSynchronizer中会抛出一个异常
protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); }
而NonfairSync重写了该方法:
protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); //这里获取当前state的值,表示当前同步的锁状态 int c = getState(); //如果是此时没有线程持有,则用compareAndSetState持有 if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } //前面如果lock时获取到了锁,会把线程给AbstractOwnableSynchronizer, //这里取出来对比,如果发现是同一个线程,则表示是重入的,重入锁就是这么来的 else if (current == getExclusiveOwnerThread()) { //这里把state的值+1,表示多了多重入了一次。 int nextc = c + acquires; //这里有个值,如果重入次数大于int的最大值,会发生溢出,直接抛出Error if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } //如果没有获取成功,则返回fale,表示尝试获取锁失败 return false; }
tyrAcquire获取还是失败的话,那么就调用addWaiter,参数为Node.EXCLUSIVE,Node.EXCLUSIVE是AbstractQueuedSynchronizer内部类Node的一个成员变量,值为null,表示为互斥锁。AbstractQueuedSynchronizer是同步队列,看看addWaiter是怎么添加节点的:
private Node addWaiter(Node mode) { //新建节点,将当前线程放在节点中 Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure //获取尾节点 Node pred = tail; //如果尾节点不为空,证明队列非空 if (pred != null) { //将新节点前驱指向尾节点 node.prev = pred; //CAS操作将新节点设置为尾节点 if (compareAndSetTail(pred, node)) { //如果设置成功,将旧的尾节点后驱指向新的尾节点,返回尾节点。 pred.next = node; return node; } } //队列为空或者前面的CAS操作失败,则入队 enq(node); //将新建节点(也就是最新的尾节点返回 return node; } private Node enq(final Node node) { //循环操作直至成功 for (;;) { Node t = tail; //如果尾节点为空,证明队列没有初始化,要设置一个空的头节点 if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; //这里注意,虽然返回的是t,也就是最新尾节点的前驱节点 //但是在addWaiter返回给acquireQueued的仍然是尾节点 return t; } } } }
这些主要是创建队列和将线程放在一个节点里入队的过程,具体步骤看注释。
再看看acquireQueued,也就是当把该节点放入队列中,返回该节点作为参数传入acquireQueued进行锁的抢占
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { //这里获取addWaiter中返回的尾节点的前驱节点 final Node p = node.predecessor(); //如果前驱节点为头节点,调用tryAcquire获取锁成功,则表明前面的线程已经释放了锁 if (p == head && tryAcquire(arg)) { //如果获取成功,那么将当前节点设为头节点 setHead(node); //旧的头结点后继节点设置为空,使垃圾回收线程清除旧的头节点 p.next = null; // help GC failed = false; return interrupted; } //如果获取锁失败,检查是否应该将线程挂起 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { //如果获取失败 if (failed) cancelAcquire(node); } } private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { //获取前面一个的状态,如果为SIGNAL则返回true int ws = pred.waitStatus; if (ws == Node.SIGNAL) /* * This node has already set status asking a release * to signal it, so it can safely park. */ return true; //如果为取消状态,清除所有前面取消的队列 if (ws > 0) { /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */ do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; //如果前一个节点状态是其他状态,则尝试设置成SIGNAL状态,并返回不需要挂起,从而进行第二次抢占。 } else { /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; } //这个是当shouldParkAfterFailedAcquire返回true时进行调用,挂起当前线程,并且返回当前线程的中断状态并清零 private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); } public static void park(Object blocker) { Thread t = Thread.currentThread(); setBlocker(t, blocker); unsafe.park(false, 0L); setBlocker(t, null); }
这里主要是抢占锁,如果抢占失败,判断是否要将线程挂起,不用则重新下一轮抢占,要挂起则将线程挂起。注意到如果抢占到了锁,但是parkAndCheckInterrupt返回了true,那么证明线程是中断状态,由于Thread.interrupted()后会将线程状态清零,当acquireQueued将interrupted = true返回,这时要再执行selfInterrupt进行当前线程的中断。
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } static void selfInterrupt() { Thread.currentThread().interrupt(); }
final void lock() { //和非公平锁相比直接acquire acquire(1); } public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { //与非公平锁相比多了个hasQueuedPredecessors,判断队列是否有前驱 if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } public final boolean hasQueuedPredecessors() { // The correctness of this depends on head being initialized // before tail and on head.next being accurate if the current // thread is first in queue. Node t = tail; // Read fields in reverse initialization order Node h = head; Node s; return h != t && ((s = h.next) == null || s.thread != Thread.currentThread()); }
可以看到,公平锁和非公平锁之间主要是初次竞争的时候有不同。加入队列后抢占的流程是一致的。
ReentrantLock获取锁的操作大概如下:尝试获取锁,成功就直接将当前线程放到AbstractOwnableSynchronizer中,执行获取到锁的后操作,否则进行下一步。 第一次获取失败,则尝试第二次获取,看看是不是同一个线程可重入的。 如果仍然获取失败,则将当前线程放在节点中,加入队列。
将该节点加入到锁的抢占当中。 抢占过程中失败要检查是否将线程挂起,以及检查线程的状态。 如果线程状态为中断状态,那么在获取到锁后要中断当前线程。