并发调用控制耗时
并发调用100个服务接口,控制总体超时时间 60 秒,并打印耗时:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50
| import java.util.concurrent.*;
public class MultiThreadedServiceCall { public static void main(String[] args) throws Exception { ExecutorService executorService = Executors.newFixedThreadPool(100); CompletionService<String> completionService = new ExecutorCompletionService<>(executorService); for (int i = 0; i < 100; i++) { completionService.submit(new Callable<String>() { @Override public String call() throws Exception { Thread.sleep((long) (Math.random() * 900 + 100)); return "Service call success! ThreadName:" + Thread.currentThread().getName(); } }); } long start = System.currentTimeMillis(); long timeoutInMilliseconds = 60000; int completedTasks = 0; boolean allTasksCompletedSuccessfully = true; while (completedTasks < 100 && allTasksCompletedSuccessfully) { Future<String> future = completionService.poll(timeoutInMilliseconds - (System.currentTimeMillis() - start), TimeUnit.MILLISECONDS); if (future != null) { try { String result = future.get(); System.out.println("拿到线程中服务调用结果:result = " + result); completedTasks++; } catch (InterruptedException | ExecutionException e) { allTasksCompletedSuccessfully = false; e.printStackTrace(); } } else { allTasksCompletedSuccessfully = false; System.out.println("在完成所有服务调用之前总超时 " + timeoutInMilliseconds + " 毫秒!"); } } long end = System.currentTimeMillis() - start; System.out.println("总耗时: " + (end / 1000) + "s"); executorService.shutdown(); } }
|
具体实现过程如下:
- 创建一个具有 100 个线程的线程池。
- 创建一个已完成服务的 CompletionService,它会在每个服务执行完毕时给出 Future 对象。
- 使用 for 循环将 100 个服务提交到 CompletionService 中以并行执行。
- 记录开始时间并设置整体超时时间为 60 秒(即 60000 毫秒)。
- 在 while 循环中检查完成的任务数是否小于 100,同时检查所有任务是否都已成功完成。
- 使用
completionService.poll()
方法从 CompletionService 中取出已经完成的 Future 对象。这个方法会等待 timeoutInMilliseconds - (当前时间 - 开始时间) 的时间来等待结果
。如果超时,则返回 null。
- 如果返回了 Future 对象,则表示该服务已经执行完毕。使用 future.get() 方法获取服务结果,并进行相应操作。累加已完成任务的数量。
- 如果返回 null,则表示任一服务超时,跳出 while 循环并输出信息。
- 关闭线程池。
并发调用容错重试
系统需要调用 100 个外部系统接口获取返回值(假定接口只返回 true 或 false,且相互间没有任何关联关系),并合并 100 个接口的返回值计算得到风控处理结果。
要求:
- 结果合并计算需要等到所有 100 个接口值都获取以后才能进行;
- 整个处理过程需要尽可能快,保证整体时间在 200ms 以内,超出这个时间则终止所有处理并整体返回 false;
- 需要考虑外部系统单个接口调用失败情况,具备一定重试容错能力;
- 手动定义处理线程池,包括参数配置以及这样做的原因。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117
| import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.concurrent.*;
public class ConcurrentCallProcessor {
private static final int TIMEOUT = 200; private static final int RETRY_COUNT = 2; private final ExecutorService executorService;
public ConcurrentCallProcessor() { int corePoolSize = Runtime.getRuntime().availableProcessors() * 2; int maximumPoolSize = corePoolSize * 4; long keepAliveTime = 60L; BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(maximumPoolSize); executorService = new ThreadPoolExecutor( corePoolSize, maximumPoolSize, keepAliveTime, TimeUnit.SECONDS, workQueue, Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy() ); }
public boolean process() { List<Future<Boolean>> futures = new ArrayList<>(100); List<Boolean> results = new ArrayList<>(Collections.nCopies(100, false));
for (int i = 0; i < 100; i++) { final int index = i; Future<Boolean> future = executorService.submit(() -> { boolean result = false; for (int j = 1; j <= RETRY_COUNT; j++) { result = callExternalSystem(); if (result) { break; } if (j == 2) { System.out.println("index=" + index + " 重试" + RETRY_COUNT + "次还不成功!"); } } results.set(index, result); return result; }); futures.add(future); } System.out.println("results.size() = " + results.size());
try { boolean allDone = true; for (Future<Boolean> future : futures) { future.get(TIMEOUT, TimeUnit.MILLISECONDS); if (!future.isDone()) { allDone = false; } } if (allDone) { boolean finalResult = true; for (boolean result : results) { finalResult &= result; } return finalResult; } else { for (Future<Boolean> future : futures) { if (!future.isDone()) { future.cancel(true); } } return false; } } catch (Exception ex) { System.err.println("errMsg = " + ex.getMessage()); return false; } finally { if (executorService != null) { executorService.shutdown(); } } }
private boolean callExternalSystem() { return Math.random() > 0.1; }
public static void main(String[] args) { ConcurrentCallProcessor processor = new ConcurrentCallProcessor();
long startTime = System.currentTimeMillis(); boolean result = processor.process(); long endTime = System.currentTimeMillis();
System.out.println("result: " + result); System.out.println("总消耗时间: " + (endTime - startTime) + "ms"); } }
|
该类包括一个 process
方法和一个 shutdown
方法。在 process
方法中,首先创建一个长度为 100 的 Future 数组和一个长度为 100 的结果数组,分别存储每个接口调用的 Future 和结果。然后使用线程池异步调用外部系统接口,并将结果保存到对应位置上。最后等待所有接口都返回结果,合并计算得到最终结果。
如果所有接口都返回结果,则将 allDone
标记为 true
,遍历结果数组,使用按位与(&)计算得到最终结果。否则,将 allDone
标记为 false
,终止所有处理并返回 false。
如果出现异常情况,如超时或线程中断等,也直接返回 false。
为了提高性能,我们使用了线程池来异步执行外部系统接口的调用,并设置了超时时间来避免等待时间过长。线程池的参数根据 CPU 核心数动态配置,以提高线程利用率和系统性能。
在 finally 的 shutdown
方法中,我们关闭线程池,释放资源。
需要注意:
- 在使用完毕后一定要记得关闭线程池,否则会导致内存泄露等问题。
- 在实际应用场景中,根据具体需求进行调整和优化。