一、媒介
前面先容了利用CAS实现的非阻塞行列ConcurrentLinkedQueue,下面就来先容下利用独有锁实现的阻塞行列LinkedBlockingQueue的实现
二、 LinkedBlockingQueue类图布局

如图LinkedBlockingQueue中也有两个Node别离用来存放首尾节点,而且内里有个初始值为0的原子变量count用来记录行列元素个数,别的内里有两个ReentrantLock的独有锁,别离用来节制元素入队和出队加锁,个中takeLock用来节制同时只有一个线程可以从行列获取元素,其他线程必需期待,putLock节制同时只能有一个线程可以获取锁去添加元素,其他线程必需期待。别的notEmpty和notFull用来实现入队和出队的同步。 别的由于进出队是两个非公正独有锁,所以可以同时又一个线程入队和一个线程出队,其实这个是个出产者-消费者模子。
/** Lock held by take, poll, etc */ private final ReentrantLock takeLock = new ReentrantLock(); /** Wait queue for waiting takes */ private final Condition notEmpty = takeLock.newCondition(); /** Lock held by put, offer, etc */ private final ReentrantLock putLock = new ReentrantLock(); /** Wait queue for waiting puts */ private final Condition notFull = putLock.newCondition();
/* Current number of elements /
private final AtomicInteger count = new AtomicInteger(0);
public static final int MAX_VALUE = 0x7fffffff;
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
//初始化首尾节点
last = head = new Node<E>(null);
}
如图默认行列容量为0x7fffffff;用户也可以本身指定容量。
三、必备基本
3.1 ReentrantLock
可以参考 https://www.atatech.org/articles/80539?flag_data_from=active
3.2 条件变量(Condition)
条件变量这里利用的是takeLock.newCondition()获取也就是说挪用ReentrantLock的要领获取的,那么可预见Condition利用了ReentrantLock的state。上面的参考没有提到所以这里串串讲下

如图ConditionObject中两个node别离用来存放条件行列的首尾节点,条件行列就是挪用条件变量的await要领被阻塞后的节点构成的单向链表。别的ConditionObject还要依赖AQS的state,ConditionObject是AQS类的一个内部类。
public final long awaitNanos(long nanosTimeout)
throws InterruptedException {
//假如间断符号被配置了,则抛异常
if (Thread.interrupted())
throw new InterruptedException();
//添加当前线程节点到条件行列,
Node node = addConditionWaiter();
//当前线程释放独有锁
int savedState = fullyRelease(node);
long lastTime = System.nanoTime();
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
if (nanosTimeout <= 0L) {
transferAfterCancelledWait(node);
break;
}
//挂起当前线程直到超时
LockSupport.parkNanos(this, nanosTimeout);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
long now = System.nanoTime();
nanosTimeout -= now - lastTime;
lastTime = now;
}
//unpark后,当前线程从头获取锁,有大概获取不到被放到AQS的行列
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
return nanosTimeout - (System.nanoTime() - lastTime);
}
final int fullyRelease(Node node) {
boolean failed = true;
try {
int savedState = getState();
//释放锁,假如失败则抛异常
if (release(savedState)) {
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED;
}
}
首先假如当前线程间断符号被配置了,直接抛出异常。添加当前线程节点(状态为:-2)到条件行列。
然后实验释放当前线程拥有的锁并生存当前计数,可知假如当前线程挪用awaitNano前没有利用当前条件变量地址的Reetenlock变量挪用lock可能lockInterruptibly获取到锁,会抛出IllegalMonitorStateException异常。
然后挪用park挂起当前线程直到超时可能其他线程挪用了当前线程的unpark要领,可能挪用了当前线程的interupt要领(这时候会抛异常)。