欢迎访问昆山宝鼎软件有限公司网站! 设为首页 | 网站地图 | XML | RSS订阅 | 宝鼎邮箱 | 宝鼎售后问题提交 | 后台管理


新闻资讯

MENU

软件开发知识

并刊行列 无界阻塞行 CAD加密 列 LinkedBlockingQueue 道理探究

点击: 次  来源:宝鼎软件 时间:2017-07-28

原文出处: 本日你不格斗来日诰日你就落伍

一、媒介

前面先容了利用CAS实现的非阻塞行列ConcurrentLinkedQueue,下面就来先容下利用独有锁实现的阻塞行列LinkedBlockingQueue的实现

二、 LinkedBlockingQueue类图布局

并刊队列 无界阻塞行 CAD加密 列 LinkedBlockingQueue 原理探究
如图LinkedBlockingQueue中也有两个Node别离用来存放首尾节点,而且内里有个初始值为0的原子变量count用来记录行列元素个数,别的内里有两个ReentrantLock的独有锁,别离用来节制元素入队和出队加锁,个中takeLock用来节制同时只有一个线程可以从行列获取元素,其他线程必需期待,putLock节制同时只能有一个线程可以获取锁去添加元素,其他线程必需期待。别的notEmpty和notFull用来实现入队和出队的同步。 别的由于进出队是两个非公正独有锁,所以可以同时又一个线程入队和一个线程出队,其实这个是个出产者-消费者模子。

/** Lock held by take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();

/** Wait queue for waiting takes */
private final Condition notEmpty = takeLock.newCondition();

/** Lock held by put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();

/** Wait queue for waiting puts */
private final Condition notFull = putLock.newCondition();

/* Current number of elements /
private final AtomicInteger count = new AtomicInteger(0);

public static final int   MAX_VALUE = 0x7fffffff;

public LinkedBlockingQueue() {
    this(Integer.MAX_VALUE);
}

  public LinkedBlockingQueue(int capacity) {
    if (capacity <= 0) throw new IllegalArgumentException();
    this.capacity = capacity;
    //初始化首尾节点
    last = head = new Node<E>(null);
}

如图默认行列容量为0x7fffffff;用户也可以本身指定容量。

三、必备基本

3.1 ReentrantLock

可以参考 https://www.atatech.org/articles/80539?flag_data_from=active

3.2 条件变量(Condition)

条件变量这里利用的是takeLock.newCondition()获取也就是说挪用ReentrantLock的要领获取的,那么可预见Condition利用了ReentrantLock的state。上面的参考没有提到所以这里串串讲下

  • 首先看下类图布局
  • 并刊队列 无界阻塞行 CAD加密 列 LinkedBlockingQueue 原理探究

    如图ConditionObject中两个node别离用来存放条件行列的首尾节点,条件行列就是挪用条件变量的await要领被阻塞后的节点构成的单向链表。别的ConditionObject还要依赖AQS的state,ConditionObject是AQS类的一个内部类。

  • awaitNanos操纵
  • public final long awaitNanos(long nanosTimeout)
            throws InterruptedException {
    
        //假如间断符号被配置了,则抛异常
        if (Thread.interrupted())
            throw new InterruptedException();
    
        //添加当前线程节点到条件行列,
        Node node = addConditionWaiter();
    
        //当前线程释放独有锁
        int savedState = fullyRelease(node);
        long lastTime = System.nanoTime();
        int interruptMode = 0;
    
        while (!isOnSyncQueue(node)) {
            if (nanosTimeout <= 0L) {
                transferAfterCancelledWait(node);
                break;
            }
            //挂起当前线程直到超时
            LockSupport.parkNanos(this, nanosTimeout);
            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                break;
    
            long now = System.nanoTime();
            nanosTimeout -= now - lastTime;
            lastTime = now;
        }
    
        //unpark后,当前线程从头获取锁,有大概获取不到被放到AQS的行列
        if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
            interruptMode = REINTERRUPT;
        if (node.nextWaiter != null)
            unlinkCancelledWaiters();
        if (interruptMode != 0)
            reportInterruptAfterWait(interruptMode);
        return nanosTimeout - (System.nanoTime() - lastTime);
    }
    
    
        final int fullyRelease(Node node) {
            boolean failed = true;
            try {
                int savedState = getState();
    
                //释放锁,假如失败则抛异常
                if (release(savedState)) {
                    failed = false;
                    return savedState;
                } else {
                    throw new IllegalMonitorStateException();
                }
            } finally {
                if (failed)
                    node.waitStatus = Node.CANCELLED;
            }
        }

    首先假如当前线程间断符号被配置了,直接抛出异常。添加当前线程节点(状态为:-2)到条件行列。

    然后实验释放当前线程拥有的锁并生存当前计数,可知假如当前线程挪用awaitNano前没有利用当前条件变量地址的Reetenlock变量挪用lock可能lockInterruptibly获取到锁,会抛出IllegalMonitorStateException异常。

    然后挪用park挂起当前线程直到超时可能其他线程挪用了当前线程的unpark要领,可能挪用了当前线程的interupt要领(这时候会抛异常)。