频道栏目
首页 > 资讯 > 其他 > 正文

Java并发编程系列(五)ReentrantLock源码解析

17-02-10        来源:[db:作者]  
收藏   我要投稿

Java并发编程系列(五)ReentrantLock源码解析:ReentrantLock有三个内部类Sync、FairSync、NonfairSync,主要类图关系用UML表示为。
这里写图片描述
首先看看成员变量

 private final Sync sync;

该变量表示锁的类型,是公平锁还是非公平锁,具体要看构造函数。

再看看构造函数:

public ReentrantLock() {
    sync = new NonfairSync();
}

public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
}

默认情况下使用的是非公平锁,如果传入true则使用公平锁。

再看看我们的lock方法:

public void lock() {
    sync.lock();
}

这里用的是Sync类的lock(),看看Sync类的lock()是怎么实现的:

abstract void lock();

恩,这里是一个抽象方法,在子类里实现,以非公平锁为例,看看其实现:

 final void lock() {
    if (compareAndSetState(0, 1))
        setExclusiveOwnerThread(Thread.currentThread());
    else
        acquire(1);
}

在前面 Java并发编程系列(四)—-CAS与原子类 中熟悉的compareAndXXX,我们看看是啥玩意:

protected final boolean compareAndSetState(int expect, int update) {
    // See below for intrinsics setup to support this
    return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

该方法在AbstractQueuedSynchronizer中,是一个CAS操作,我们看看CAS操作的是哪个变量,表示的是什么意思,在静态代码块中看到:

static {
    try {
        //关键看这里,表示的是state变量
        stateOffset = unsafe.objectFieldOffset
            (AbstractQueuedSynchronizer.class.getDeclaredField("state"));
        headOffset = unsafe.objectFieldOffset
            (AbstractQueuedSynchronizer.class.getDeclaredField("head"));
        tailOffset = unsafe.objectFieldOffset
            (AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
        waitStatusOffset = unsafe.objectFieldOffset
            (Node.class.getDeclaredField("waitStatus"));
        nextOffset = unsafe.objectFieldOffset
            (Node.class.getDeclaredField("next"));

    } catch (Exception ex) { throw new Error(ex); }

    // Reduce the risk of rare disastrous classloading in first call to
    // LockSupport.park: https://bugs.openjdk.java.net/browse/JDK-8074773
    Class ensureLoaded = LockSupport.class;
}

可以看到stateOffset是state的偏移量,我们看看state是表示什么:

 /**
 * The synchronization state.
 */
private volatile int state;

从源码的注释可以知道表示同步的状态,结合lock方法可知,意思是当前值如果为0(意味着没有线程获取到锁),则compareAndSetState执行成功,然后执行setExclusiveOwnerThread(Thread.currentThread()),看看是个啥玩意:

protected final void setExclusiveOwnerThread(Thread thread) {
    exclusiveOwnerThread = thread;
}

这个方法是在AbstractOwnableSynchronizer抽象类里,将当前线程引用赋给AbstractOwnableSynchronizer的成员变量exclusiveOwnerThread;

如果compareAndSetState执行失败,证明有其他线程获取到了锁,则执行acquire(1):

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

看看tryAcquire,在AbstractQueuedSynchronizer中会抛出一个异常

protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
    }

而NonfairSync重写了该方法:

protected final boolean tryAcquire(int acquires) {
    return nonfairTryAcquire(acquires);
}

final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    //这里获取当前state的值,表示当前同步的锁状态
    int c = getState();
    //如果是此时没有线程持有,则用compareAndSetState持有
    if (c == 0) {
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    //前面如果lock时获取到了锁,会把线程给AbstractOwnableSynchronizer,
    //这里取出来对比,如果发现是同一个线程,则表示是重入的,重入锁就是这么来的
    else if (current == getExclusiveOwnerThread()) {
        //这里把state的值+1,表示多了多重入了一次。
        int nextc = c + acquires;
        //这里有个值,如果重入次数大于int的最大值,会发生溢出,直接抛出Error
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    //如果没有获取成功,则返回fale,表示尝试获取锁失败
    return false;
}

tyrAcquire获取还是失败的话,那么就调用addWaiter,参数为Node.EXCLUSIVE,Node.EXCLUSIVE是AbstractQueuedSynchronizer内部类Node的一个成员变量,值为null,表示为互斥锁。AbstractQueuedSynchronizer是同步队列,看看addWaiter是怎么添加节点的:

private Node addWaiter(Node mode) {
    //新建节点,将当前线程放在节点中
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    //获取尾节点
    Node pred = tail;
    //如果尾节点不为空,证明队列非空
    if (pred != null) {
        //将新节点前驱指向尾节点
        node.prev = pred;
        //CAS操作将新节点设置为尾节点
        if (compareAndSetTail(pred, node)) {
            //如果设置成功,将旧的尾节点后驱指向新的尾节点,返回尾节点。
            pred.next = node;
            return node;
        }
    }
    //队列为空或者前面的CAS操作失败,则入队
    enq(node);
    //将新建节点(也就是最新的尾节点返回
    return node;
}
 private Node enq(final Node node) {
        //循环操作直至成功
        for (;;) {
            Node t = tail;
            //如果尾节点为空,证明队列没有初始化,要设置一个空的头节点
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    //这里注意,虽然返回的是t,也就是最新尾节点的前驱节点
                    //但是在addWaiter返回给acquireQueued的仍然是尾节点
                    return t;
                }
            }
        }
    }

这些主要是创建队列和将线程放在一个节点里入队的过程,具体步骤看注释。

再看看acquireQueued,也就是当把该节点放入队列中,返回该节点作为参数传入acquireQueued进行锁的抢占

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            //这里获取addWaiter中返回的尾节点的前驱节点
            final Node p = node.predecessor();
            //如果前驱节点为头节点,调用tryAcquire获取锁成功,则表明前面的线程已经释放了锁
            if (p == head && tryAcquire(arg)) {
                //如果获取成功,那么将当前节点设为头节点
                setHead(node);
                //旧的头结点后继节点设置为空,使垃圾回收线程清除旧的头节点
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            //如果获取锁失败,检查是否应该将线程挂起
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        //如果获取失败
        if (failed)
            cancelAcquire(node);
    }
}
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    //获取前面一个的状态,如果为SIGNAL则返回true
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        /*
         * This node has already set status asking a release
         * to signal it, so it can safely park.
         */
        return true;
        //如果为取消状态,清除所有前面取消的队列
    if (ws > 0) {
        /*
         * Predecessor was cancelled. Skip over predecessors and
         * indicate retry.
         */
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    //如果前一个节点状态是其他状态,则尝试设置成SIGNAL状态,并返回不需要挂起,从而进行第二次抢占。
    } else {
        /*
         * waitStatus must be 0 or PROPAGATE.  Indicate that we
         * need a signal, but don't park yet.  Caller will need to
         * retry to make sure it cannot acquire before parking.
         */
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}
//这个是当shouldParkAfterFailedAcquire返回true时进行调用,挂起当前线程,并且返回当前线程的中断状态并清零
private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}

 public static void park(Object blocker) {
    Thread t = Thread.currentThread();
    setBlocker(t, blocker);
    unsafe.park(false, 0L);
    setBlocker(t, null);
}

这里主要是抢占锁,如果抢占失败,判断是否要将线程挂起,不用则重新下一轮抢占,要挂起则将线程挂起。注意到如果抢占到了锁,但是parkAndCheckInterrupt返回了true,那么证明线程是中断状态,由于Thread.interrupted()后会将线程状态清零,当acquireQueued将interrupted = true返回,这时要再执行selfInterrupt进行当前线程的中断。

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

static void selfInterrupt() {
        Thread.currentThread().interrupt();
    }

公平锁的实现

final void lock() {
    //和非公平锁相比直接acquire
    acquire(1);
}
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        //与非公平锁相比多了个hasQueuedPredecessors,判断队列是否有前驱
        if (!hasQueuedPredecessors() &&
            compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}
public final boolean hasQueuedPredecessors() {
        // The correctness of this depends on head being initialized
        // before tail and on head.next being accurate if the current
        // thread is first in queue.
        Node t = tail; // Read fields in reverse initialization order
        Node h = head;
        Node s;
        return h != t &&
            ((s = h.next) == null || s.thread != Thread.currentThread());
    }

可以看到,公平锁和非公平锁之间主要是初次竞争的时候有不同。加入队列后抢占的流程是一致的。

小结

ReentrantLock获取锁的操作大概如下:尝试获取锁,成功就直接将当前线程放到AbstractOwnableSynchronizer中,执行获取到锁的后操作,否则进行下一步。 第一次获取失败,则尝试第二次获取,看看是不是同一个线程可重入的。 如果仍然获取失败,则将当前线程放在节点中,加入队列。

将该节点加入到锁的抢占当中。 抢占过程中失败要检查是否将线程挂起,以及检查线程的状态。 如果线程状态为中断状态,那么在获取到锁后要中断当前线程。

相关TAG标签
上一篇:PAT 1017 A除以B
下一篇:回调机制的一些理解
相关文章
图文推荐

关于我们 | 联系我们 | 广告服务 | 投资合作 | 版权申明 | 在线帮助 | 网站地图 | 作品发布 | Vip技术培训 | 举报中心

版权所有: 红黑联盟--致力于做实用的IT技术学习网站