欢迎访问昆山宝鼎软件有限公司网站! 设为首页 | 网站地图 | XML | RSS订阅 | 宝鼎邮箱 | 后台管理


新闻资讯

MENU

软件开发知识

invokeAll()在所有任务都完成( 劳务派遣信息管理系统 包括成功/被中断/超时)后才会返回

点击: 次  来源:昆山软开发 时间:2017-12-03

原文出处: 猴子007

ExecutorService中界说了两个批量执行任务的要领,invokeAll()和invokeAny(),在批量执行或多选一的业务场景中很是利便。invokeAll()在所有任务都完成(包罗乐成/被间断/超时)后才会返回,invokeAny()在任意一个任务乐成(或ExecutorService被间断/超时)后就会返回。

AbstractExecutorService实现了这两个要领,本文将先后阐明invokeAll()和invokeAny()两个要领的源码实现。

invokeAll()

invokeAll()在所有任务都完成(包罗乐成/被间断/超时)后才会返回。有不限时和限时版本,从更简朴的不限时版入手。

不限时版

public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
    throws InterruptedException {
    if (tasks == null)
        throw new NullPointerException();
    ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
    boolean done = false;
    try {
        for (Callable<T> t : tasks) {
            RunnableFuture<T> f = newTaskFor(t);
            futures.add(f);
            execute(f);
        }
        for (int i = 0, size = futures.size(); i < size; i++) {
            Future<T> f = futures.get(i);
            if (!f.isDone()) {
                try {
                    f.get(); // 无所谓先执行哪个任务的get()要领
                } catch (CancellationException ignore) {
                } catch (ExecutionException ignore) {
                }
            }
        }
        done = true;
        return futures;
    } finally {
        if (!done)
            for (int i = 0, size = futures.size(); i < size; i++)
                futures.get(i).cancel(true);
    }
}

8-12行,先将所有任务都提交到线程池(虽然,任何ExecutorService均可)中。

严格来说,不是“提交”,而是“执行”。执行大概是同步或异步的,取决于线程池的计策。不外由于我们仅接头异步环境(同步同理),用“提交”一词更容易领略。下同。
13-22行,for轮回的目标是阻塞挪用invokeAll的线程,直到所有任务都执行完毕。虽然我们也可以利用其他方法实现阻塞,不外这种方法是最简朴的:

  • 15行假如f.isDone()返回true,则当前任务已竣事,继承查抄下一个任务;不然,挪用f.get()让线程阻塞,直到当前任务竣事。
  • 17行无所谓先执行哪一个FutureTask实例的get()要领。由于所有任务并发执行,总体阻塞时间取决于于是耗时最长的任务,从而实现了invodeAll的阻塞挪用。
  • 18-20行没有捕捉InterruptedException。假如有任务被间断,主线程将抛出InterruptedException,以响应间断。
  • 最后,为防备在全部任务竣事之前过早退出,23行、25-29行相共同,假如done不为true(未执行到40行就退出了)则打消全部任务。

    限时版

    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                         long timeout, TimeUnit unit)
        throws InterruptedException {
        if (tasks == null)
            throw new NullPointerException();
        long nanos = unit.toNanos(timeout);
        ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
        boolean done = false;
        try {
            for (Callable<T> t : tasks)
                futures.add(newTaskFor(t));
            final long deadline = System.nanoTime() + nanos;
            final int size = futures.size();
            // Interleave time checks and calls to execute in case
            // executor doesn't have any/much parallelism.
            for (int i = 0; i < size; i++) {
                execute((Runnable)futures.get(i));
                nanos = deadline - System.nanoTime();
                if (nanos <= 0L) // 实时查抄是否超时
                    return futures;
            }
            for (int i = 0; i < size; i++) {
                Future<T> f = futures.get(i);
                if (!f.isDone()) {
                    if (nanos <= 0L) // 实时查抄是否超时
                        return futures;
                    try {
                        f.get(nanos, TimeUnit.NANOSECONDS);
                    } catch (CancellationException ignore) {
                    } catch (ExecutionException ignore) {
                    } catch (TimeoutException toe) {
                        return futures;
                    }
                    nanos = deadline - System.nanoTime();
                }
            }
            done = true;
            return futures;
        } finally {
            if (!done)
                for (int i = 0, size = futures.size(); i < size; i++)
                    futures.get(i).cancel(true);
        }
    }

    10-11行,先将所有任务封装为FutureTask,添加到futures列表中。

    18-23行,每提交一个任务,就立即判定是否超时。这样的话,假如在任务全部提交到线程池中之前,昆山软件开发,就已经到达了超时时间,昆山软件开发,则可以或许尽快查抄出超时,竣事提交并退出。