欢迎访问昆山宝鼎软件有限公司网站! 设为首页 | 网站地图 | XML | RSS订阅 | 宝鼎邮箱 | 后台管理


新闻资讯

MENU

软件开发知识

invariant); } else { // must capture correct index before u

点击: 次  来源:昆山软开发 时间:2018-10-12

原文出处: 等你回去来

谈到阻塞,相信各人都不会生疏了。阻塞的应用场景真的多得不要不要的,好比 出产-消费模式,限流统计等等。什么 ArrayBlockingQueue、 LinkedBlockingQueue、DelayQueue 等等,都是阻塞行列的实现啊,多简朴!

阻塞,一般有两个特性很亮眼:1. 不耗 CPU 期待;2. 线程安详;

额,要这么说也 OK 的。究竟,我们碰着的问题,到这里就够办理了。可是有没有想过,这容器的阻塞又是如何实现的呢?

好吧,掀开源码,也很简朴了:(好比 ArrayBlockingQueue 的 take、put….)

// ArrayBlockingQueue

/**
 * Inserts the specified element at the tail of this queue, waiting
 * for space to become available if the queue is full.
 *
 * @throws InterruptedException {@inheritDoc}
 * @throws NullPointerException {@inheritDoc}
 */
public void put(E e) throws InterruptedException {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == items.length)
            // 阻塞的点
            notFull.await();
        enqueue(e);
    } finally {
        lock.unlock();
    }
}

/**
 * Inserts the specified element at the tail of this queue, waiting
 * up to the specified wait time for space to become available if
 * the queue is full.
 *
 * @throws InterruptedException {@inheritDoc}
 * @throws NullPointerException {@inheritDoc}
 */
public boolean offer(E e, long timeout, TimeUnit unit)
    throws InterruptedException {

    checkNotNull(e);
    long nanos = unit.toNanos(timeout);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == items.length) {
            if (nanos <= 0)
                return false;
            // 阻塞的点
            nanos = notFull.awaitNanos(nanos);
        }
        enqueue(e);
        return true;
    } finally {
        lock.unlock();
    }
}

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == 0)
            // 阻塞的点
            notEmpty.await();
        return dequeue();
    } finally {
        lock.unlock();
    }
}

看来,最终都是依赖了 AbstractQueuedSynchronizer 类(著名的AQS)的 await 要领,看起来像那么回事。那么这个同步器的阻塞又是如何实现的呢?

Java的代码老是好跟踪的:

// AbstractQueuedSynchronizer.await()

/**
 * Implements interruptible condition wait.
 * <ol>
 * <li> If current thread is interrupted, throw InterruptedException.
 * <li> Save lock state returned by {@link #getState}.
 * <li> Invoke {@link #release} with saved state as argument,
 *      throwing IllegalMonitorStateException if it fails.
 * <li> Block until signalled or interrupted.
 * <li> Reacquire by invoking specialized version of
 *      {@link #acquire} with saved state as argument.
 * <li> If interrupted while blocked in step 4, throw InterruptedException.
 * </ol>
 */
public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    Node node = addConditionWaiter();
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {
        // 此处举办真正的阻塞
        LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

如上,可以看到,真正的阻塞事情又转交给了另一个东西类: LockSupport 的 park 要领了,这回跟锁扯上了干系,昆山软件开发,看起来已经越来越靠近事实了:

// LockSupport.park()

/**
 * Disables the current thread for thread scheduling purposes unless the
 * permit is available.
 *
 * <p>If the permit is available then it is consumed and the call returns
 * immediately; otherwise
 * the current thread becomes disabled for thread scheduling
 * purposes and lies dormant until one of three things happens:
 *
 * <ul>
 * <li>Some other thread invokes {@link #unpark unpark} with the
 * current thread as the target; or
 *
 * <li>Some other thread {@linkplain Thread#interrupt interrupts}
 * the current thread; or
 *
 * <li>The call spuriously (that is, for no reason) returns.
 * </ul>
 *
 * <p>This method does <em>not</em> report which of these caused the
 * method to return. Callers should re-check the conditions which caused
 * the thread to park in the first place. Callers may also determine,
 * for example, the interrupt status of the thread upon return.
 *
 * @param blocker the synchronization object responsible for this
 *        thread parking
 * @since 1.6
 */
public static void park(Object blocker) {
    Thread t = Thread.currentThread();
    setBlocker(t, blocker);
    UNSAFE.park(false, 0L);
    setBlocker(t, null);
}

看得出来,这里的实现就较量简捷了,先获取当前线程,配置阻塞工具,阻塞,然后清除阻塞。

好吧,到底什么是真正的阻塞,我们照旧不得而知!