频道栏目
首页 > 资讯 > Java > 正文

java并发包详解(jdk7)

16-10-19        来源:[db:作者]  
收藏   我要投稿

在此对java并发包做一个大致总结,如有错误,请指正。
juc包的总体结构大致如下
这里写图片描述
外层框架主要有Lock(ReentrantLock、ReadWriteLock等)、同步器(semaphores等)、阻塞队列(BlockingQueue等)、Executor(线程池)、并发容器(ConcurrentHashMap等)、还有Fork/Join框架;
内层有AQS(AbstractQueuedSynchronizer类,锁功能都由他实现)、非阻塞数据结构、原子变量类(AtomicInteger等无锁线程安全类)三种。
底层就实现是volatile和CAS。整个并发包其实都是由这两种思想构成的。

volatile

理解volatile特性的一个好方法是把对volatile变量的单个读/写,看成是使用同一个锁对这
些单个读/写操作做了同步。一个volatile变量的单个读/写操作,与一个普通变量的读/写操作都
是使用同一个锁来同步,它们之间的执行效果相同。
volatile具有的特性。
可见性。对一个volatile变量的读,总是能看到(任意线程)对这个volatile变量最后的写
入。
原子性:对任意单个volatile变量的读/写具有原子性,但类似于volatile++这种复合操作不
具有原子性。

volatile读的内存语义如下。
当读一个volatile变量时,JMM会把该线程对应的本地内存置为无效。线程接下来将从主
内存中读取共享变量。
volatile写的内存语义如下。
当写一个volatile变量时,JMM会把该线程对应的本地内存中的共享变量值刷新到主内
存。

为了实现volatile的内存语义,编译器在生成字节码时,会在指令序列中插入内存屏障来
禁止特定类型的处理器重排序。对于编译器来说,发现一个最优布置来最小化插入屏障的总
数几乎不可能。为此,JMM采取保守策略。下面是基于保守策略的JMM内存屏障插入策略。
·在每个volatile写操作的前面插入一个StoreStore屏障。
·在每个volatile写操作的后面插入一个StoreLoad屏障。
·在每个volatile读操作的后面插入一个LoadLoad屏障。
·在每个volatile读操作的后面插入一个LoadStore屏障。

上面内存屏障的简单意思就是:StoreStore屏障,禁止上面的写操作和下面的volatile写重排序;StoreLoad屏障,禁止上面的写操作和下面的volatile读重排序;LoadLoad屏障,禁止上面的读/写操作和下面的volatile读操作重排序;LoadStore屏障,禁止上面的读操作和下面的volatile写操作重排序。

由于Java的CAS同时具有volatile读和volatile写的内存语义,因此Java线程之间的通信现
在有了下面4种方式。
1)A线程写volatile变量,随后B线程读这个volatile变量。
2)A线程写volatile变量,随后B线程用CAS更新这个volatile变量。
3)A线程用CAS更新一个volatile变量,随后B线程用CAS更新这个volatile变量。
4)A线程用CAS更新一个volatile变量,随后B线程读这个volatile变量。

总结一下适合使用volatile变量的使用条件(必须满足所有条件):
1、对变量的写操作不依赖变量的当前值,或者你能确保只有单线程更新变量的值。(简单来说就是单线程写,多线程读的场景)
2、该变量不会与其他状态变量一起纳入不变性条件中。
3、在访问变量时不需要加锁。(如果要加锁的话用普通变量就行了,没必要用volatile了)

锁的实现

锁的释放和获取的内存语义
当线程释放锁时,JMM会把该线程对应的本地内存中的共享变量刷新到主内存中。
当线程获取锁时,JMM会把该线程对应的本地内存置为无效。从而使得被监视器保护的
临界区代码必须从主内存中读取共享变量。
总结一下就是:
线程A释放一个锁,实质上是线程A向接下来将要获取这个锁的某个线程发出了(线程A
对共享变量所做修改的)消息。
·线程B获取一个锁,实质上是线程B接收了之前某个线程发出的(在释放这个锁之前对共
享变量所做修改的)消息。
·线程A释放锁,随后线程B获取这个锁,这个过程实质上是线程A通过主内存向线程B发
送消息。
这里借助ReentrantLock的源代码,来分析锁内存语义的具体实现机制。
在ReentrantLock中,调用lock()方法获取锁;调用unlock()方法释放锁。

public void lock() {
        sync.lock();
    }
public void unlock() {
        sync.release(1);
    }

它的锁由静态内部类Sync实现,分为公平锁和非公平锁。

abstract static class Sync extends AbstractQueuedSynchronizer 
static final class NonfairSync extends Sync  //非公平锁
static final class FairSync extends Sync    //公平锁

锁的实现都依赖于Java同步器框架AbstractQueuedSynchronizer(AQS),AQS使用一个整型的volatile变量(命名为state)来维护同步状态,它是ReentrantLock内存语义实现的关键。

/**
     * The synchronization state.
     */
    private volatile int state;

拿非公平锁的实现举例

final void lock() {
//AQS内方法,采用CAS实现监视器锁,如果state为0,则获得锁,并设为1
            if (compareAndSetState(0, 1))
                //获取锁之后设置拥有锁的线程为当前线程
                setExclusiveOwnerThread(Thread.currentThread());
            else
                //如果锁已被获取,则加入等待队列
                acquire(1);
        }

protected final boolean compareAndSetState(int expect, int update) {
        // See below for intrinsics setup to support this
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }

public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

非公平锁在释放锁的最后写volatile变量state,在获取锁时首先读这个volatile变量。根据
volatile的happens-before规则,释放锁的线程在写volatile变量之前可见的共享变量,在获取锁
的线程读取同一个volatile变量后将立即变得对获取锁的线程可见。
compareAndSetState(0, 1)采用的CAS操作。JDK文档对该方法的说明如下:如果当前状态值等于预期值,则以原子方式将同步状态设置为给定的更新值。
可能会疑惑为什么这个方法可以完成原子操作,原因是此操作具有volatile读和写的内存语义。主要是由sun.misc.Unsafe类实现的,它是native方法,这里不做深入了。

AQS

分析一下同步器(AQS)的原理
AQS框架采用了模板模式,例如在独占模式的获取和释放,再拿ReentrantLock的非公平锁NonfairSync举例

/** 独占获取 */
final void lock() {
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }
public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
    //AQS中并未实现tryAcquire()方法,需在子类实现
    protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
    }
    //NonfairSync中实现
    protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }
/** 独占释放 */
public void unlock() {
        sync.release(1);
    }
    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
    //AQS中未实现
    protected boolean tryRelease(int arg) {
        throw new UnsupportedOperationException();
    }
    //Sync类中实现
    protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }

从源码来看,同步状态的维护、获取、释放动作是由子类实现的,而后续动作入线程的阻塞、唤醒机制等则由AQS框架实现。
AQS中使用LockSupport.park() 和 LockSupport.unpark() 的本地方法实现,实现线程的阻塞和唤醒。

private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);  //唤醒线程
    }

AQS内部维护了一个队列,AQS将阻塞线程封装成内部类Node的对象,并维护到这个队列中。

private transient volatile Node head;   //头结点
    private transient volatile Node tail;   //尾节点

这个队列是非阻塞的 FIFO队列,即插入移除节点的时候是非阻塞的,所以AQS内部采用CAS的方式保证节点插入和移除的原子性。

/**
     * CAS head field. Used only by enq.
     */
    private final boolean compareAndSetHead(Node update) {
        return unsafe.compareAndSwapObject(this, headOffset, null, update);
    }

    /**
     * CAS tail field. Used only by enq.
     */
    private final boolean compareAndSetTail(Node expect, Node update) {
        return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
    }

Node类源码如下

static final class Node {
        /** 标记是共享模式*/    
        static final Node SHARED = new Node();
        /** 标记是独占模式*/    
        static final Node EXCLUSIVE = null;
        /** 代表线程已经被取消*/    
        static final int CANCELLED =  1;
        /** 代表后续节点需要唤醒 */   
        static final int SIGNAL    = -1;
        /** 代表线程在等待某一条件/  
        static final int CONDITION = -2;

        static final int PROPAGATE = -3;
        volatile int waitStatus;
        volatile Node prev;
        volatile Node next;
        volatile Thread thread;
        /***连接到等待condition的下一个节点  */ 
        Node nextWaiter;
        final boolean isShared() {
            return nextWaiter == SHARED;
        }
        final Node predecessor() throws NullPointerException {
            Node p = prev;
            if (p == null)
                throw new NullPointerException();
            else
                return p;
        }

        Node() {    // Used to establish initial head or SHARED marker
        }

        Node(Thread thread, Node mode) {     // Used by addWaiter
            this.nextWaiter = mode;
            this.thread = thread;
        }

        Node(Thread thread, int waitStatus) { // Used by Condition
            this.waitStatus = waitStatus;
            this.thread = thread;
        }
    }

独占式同步状态获取流程,也就是acquire(int arg)方法调用流程,如图
这里写图片描述
由图所知,前驱节点为头节点且能够获取同步状态的判断条件和线程进入等待状态是获
取同步状态的自旋过程。当同步状态获取成功之后,当前线程从acquire(int arg)方法返回,如果
对于锁这种并发组件而言,代表着当前线程获取了锁。
当前线程获取同步状态并执行了相应逻辑之后,就需要释放同步状态,使得后续节点能
够继续获取同步状态。通过调用同步器的release(int arg)方法可以释放同步状态,该方法在释
放了同步状态之后,会唤醒其后继节点(进而使后继节点重新尝试获取同步状态)。

public final boolean release(int arg) {
//尝试释放锁,将拥有锁的线程设为null,把state设为0
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

    private void unparkSuccessor(Node node) {
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }

分析了独占式同步状态获取和释放过程后,适当做个总结:在获取同步状态时,同步器维护一个同步队列,获取状态失败的线程都会被加入到队列中并在队列中进行自旋;移出队列(或停止自旋)的条件是前驱节点为头节点且成功获取了同步状态。在释放同步状态时,同步器调用tryRelease(int arg)方法释放同步状态,然后唤醒头节点的后继节点。

原子变量类

原子变量类指的是java.util.concurrent.atomic包下的类。
整个包下面的类实现原理都接近,就是利用volatile和CAS来实现。
拿AtomicInteger举例:

private static final Unsafe unsafe = Unsafe.getUnsafe();
    private static final long valueOffset;

    static {
      try {
        valueOffset = unsafe.objectFieldOffset
            (AtomicInteger.class.getDeclaredField("value"));
      } catch (Exception ex) { throw new Error(ex); }
    }

    private volatile int value;

AtomicInteger类拥有一个unsafe对象,这是实现CAS的关键,private volatile int value就是它当前的值,用volatile来保证内存可见性。

public final boolean compareAndSet(int expect, int update) {
        return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
    }

这个方法完成CAS,如果value==expect,则把值设为update。这是核心方法,其他方法实现基本都用到它
例如

//获取旧值设置新值
public final int getAndSet(int newValue) {
        for (;;) {
            int current = get();
            if (compareAndSet(current, newValue))
                return current;
        }
    }
    //获取旧值并加1
    public final int getAndIncrement() {
        for (;;) {
            int current = get();
            int next = current + 1;
            if (compareAndSet(current, next))
                return current;
        }
    }

等方法。

相关TAG标签
上一篇:08,网络
下一篇:Kotlin:类与集成
相关文章
图文推荐

关于我们 | 联系我们 | 广告服务 | 投资合作 | 版权申明 | 在线帮助 | 网站地图 | 作品发布 | Vip技术培训 | 举报中心

版权所有: 红黑联盟--致力于做实用的IT技术学习网站