谈到阻塞,相信各人都不会生疏了。阻塞的应用场景真的多得不要不要的,好比 出产-消费模式,限流统计等等。什么 ArrayBlockingQueue、 LinkedBlockingQueue、DelayQueue 等等,都是阻塞行列的实现啊,多简朴!
阻塞,一般有两个特性很亮眼:1. 不耗 CPU 期待;2. 线程安详;
额,要这么说也 OK 的。究竟,我们碰着的问题,到这里就够办理了。可是有没有想过,这容器的阻塞又是如何实现的呢?
好吧,掀开源码,也很简朴了:(好比 ArrayBlockingQueue 的 take、put….)
// ArrayBlockingQueue
/**
* Inserts the specified element at the tail of this queue, waiting
* for space to become available if the queue is full.
*
* @throws InterruptedException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
// 阻塞的点
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}
/**
* Inserts the specified element at the tail of this queue, waiting
* up to the specified wait time for space to become available if
* the queue is full.
*
* @throws InterruptedException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
checkNotNull(e);
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length) {
if (nanos <= 0)
return false;
// 阻塞的点
nanos = notFull.awaitNanos(nanos);
}
enqueue(e);
return true;
} finally {
lock.unlock();
}
}
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
// 阻塞的点
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}
看来,最终都是依赖了 AbstractQueuedSynchronizer 类(著名的AQS)的 await 要领,看起来像那么回事。那么这个同步器的阻塞又是如何实现的呢?
Java的代码老是好跟踪的:
// AbstractQueuedSynchronizer.await()
/**
* Implements interruptible condition wait.
* <ol>
* <li> If current thread is interrupted, throw InterruptedException.
* <li> Save lock state returned by {@link #getState}.
* <li> Invoke {@link #release} with saved state as argument,
* throwing IllegalMonitorStateException if it fails.
* <li> Block until signalled or interrupted.
* <li> Reacquire by invoking specialized version of
* {@link #acquire} with saved state as argument.
* <li> If interrupted while blocked in step 4, throw InterruptedException.
* </ol>
*/
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
// 此处举办真正的阻塞
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
如上,可以看到,真正的阻塞事情又转交给了另一个东西类: LockSupport 的 park 要领了,这回跟锁扯上了干系,昆山软件开发,看起来已经越来越靠近事实了:
// LockSupport.park()
/**
* Disables the current thread for thread scheduling purposes unless the
* permit is available.
*
* <p>If the permit is available then it is consumed and the call returns
* immediately; otherwise
* the current thread becomes disabled for thread scheduling
* purposes and lies dormant until one of three things happens:
*
* <ul>
* <li>Some other thread invokes {@link #unpark unpark} with the
* current thread as the target; or
*
* <li>Some other thread {@linkplain Thread#interrupt interrupts}
* the current thread; or
*
* <li>The call spuriously (that is, for no reason) returns.
* </ul>
*
* <p>This method does <em>not</em> report which of these caused the
* method to return. Callers should re-check the conditions which caused
* the thread to park in the first place. Callers may also determine,
* for example, the interrupt status of the thread upon return.
*
* @param blocker the synchronization object responsible for this
* thread parking
* @since 1.6
*/
public static void park(Object blocker) {
Thread t = Thread.currentThread();
setBlocker(t, blocker);
UNSAFE.park(false, 0L);
setBlocker(t, null);
}
看得出来,这里的实现就较量简捷了,先获取当前线程,配置阻塞工具,阻塞,然后清除阻塞。
好吧,到底什么是真正的阻塞,我们照旧不得而知!