1.CompletionService介绍
将生产新的异步任务与使用已完成任务的结果分离开来的服务
。
生产者 submit 执行的任务。使用者take 已完成的任务,并按照完成这些任务的完成顺序处理它们的结果。例如,CompletionService 可以用来管理异步 IO ,执行读操作的任务作为程序或系统的一部分提交,然后,当完成读操作时,会在程序的不同部分执行其他操作,执行操作的顺序可能与所请求的顺序不同。
通常,CompletionService 依赖于一个单独的Executor 来实际执行任务,在这种情况下,CompletionService只管理一个内部完成队列。
ExecutorCompletionService 类提供了此方法的一个实现。
实现接口:
任务代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| public class CallbleTask implements Callable<Integer> {
private int sleepSeconds;
private int returnValue;
public CallbleTask(int sleepSeconds, int returnValue) { this.sleepSeconds = sleepSeconds; this.returnValue = returnValue; }
@Override public Integer call() throws Exception { System.out.println("begin to execute.");
TimeUnit.SECONDS.sleep(sleepSeconds);
System.out.println("end to execute.");
return returnValue; } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
| public class Main {
public static void main(String[] args) { int taskSize = 5;
ExecutorService executor = Executors.newFixedThreadPool(taskSize);
CompletionService<Integer> completionService = new ExecutorCompletionService<Integer>(executor); int sleep = 5; for (int i = 1; i <= taskSize; i++) { int value = i; completionService.submit(new CallbleTask(sleep, value)); }
for (int i = 0; i < taskSize; i++) { try { System.out.println(completionService.take().get());
} catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } }
System.out.println("执行完毕...."); executor.shutdown(); }
}
|
通过这个程序可以看出来,简化了使用线程池提交一个Callable任务之后通过获取一个Future来轮询get结果,代码没有使用CompletionService简单!
2. 实现原理
1 2 3 4 5 6 7
| public interface CompletionService<V> { Future<V> submit(Callable<V> task); Future<V> submit(Runnable task, V result); Future<V> take() throws InterruptedException; Future<V> poll(); Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException; }
|
1 2 3 4 5 6
| public Future<V> submit(Runnable task, V result) { if (task == null) throw new NullPointerException(); RunnableFuture<V> f = newTaskFor(task, result); executor.execute(new QueueingFuture(f)); return f; }
|
接下来看看java.util.concurrent.ExecutorCompletionService.newTaskFor(Runnable, V)源码:
1 2 3 4 5 6
| private RunnableFuture<V> newTaskFor(Runnable task, V result) { if (aes == null) return new FutureTask<V>(task, result); else return aes.newTaskFor(task, result); }
|
1 2 3
| protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { return new FutureTask<T>(runnable, value); }
|
java.util.concurrent.CompletionService.submit(Callable<Integer>)实现就是返回FutureTask的 private Object outcome结果。