11-并发调用服务接口

并发调用控制耗时

并发调用100个服务接口,控制总体超时时间 60 秒,并打印耗时:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
import java.util.concurrent.*;

public class MultiThreadedServiceCall {

public static void main(String[] args) throws Exception {
//固定数量线程池:仅用于验证,实际场景推荐用自定义参数的线程池
ExecutorService executorService = Executors.newFixedThreadPool(100);
CompletionService<String> completionService = new ExecutorCompletionService<>(executorService);

//提交 100 个任务以同时执行
for (int i = 0; i < 100; i++) {
completionService.submit(new Callable<String>() {
@Override
public String call() throws Exception {
//模拟在 100 到 900 毫秒之间随机花费时间的服务调用
Thread.sleep((long) (Math.random() * 900 + 100));
return "Service call success! ThreadName:" + Thread.currentThread().getName();
}
});
}

long start = System.currentTimeMillis();
//将所有服务调用的总超时设置为 60 秒
long timeoutInMilliseconds = 60000;
int completedTasks = 0;
boolean allTasksCompletedSuccessfully = true;
while (completedTasks < 100 && allTasksCompletedSuccessfully) {
Future<String> future = completionService.poll(timeoutInMilliseconds - (System.currentTimeMillis() - start), TimeUnit.MILLISECONDS);

if (future != null) {
try {
String result = future.get();
//这里的 result 用于其他逻辑
System.out.println("拿到线程中服务调用结果:result = " + result);
completedTasks++;
} catch (InterruptedException | ExecutionException e) {
allTasksCompletedSuccessfully = false;
e.printStackTrace();
}
} else {
allTasksCompletedSuccessfully = false;
System.out.println("在完成所有服务调用之前总超时 " + timeoutInMilliseconds + " 毫秒!");
}
}
long end = System.currentTimeMillis() - start;
System.out.println("总耗时: " + (end / 1000) + "s");
executorService.shutdown();
}
}

具体实现过程如下:

  1. 创建一个具有 100 个线程的线程池。
  2. 创建一个已完成服务的 CompletionService,它会在每个服务执行完毕时给出 Future 对象。
  3. 使用 for 循环将 100 个服务提交到 CompletionService 中以并行执行。
  4. 记录开始时间并设置整体超时时间为 60 秒(即 60000 毫秒)。
  5. 在 while 循环中检查完成的任务数是否小于 100,同时检查所有任务是否都已成功完成。
  6. 使用 completionService.poll() 方法从 CompletionService 中取出已经完成的 Future 对象。这个方法会等待 timeoutInMilliseconds - (当前时间 - 开始时间) 的时间来等待结果。如果超时,则返回 null。
  7. 如果返回了 Future 对象,则表示该服务已经执行完毕。使用 future.get() 方法获取服务结果,并进行相应操作。累加已完成任务的数量。
  8. 如果返回 null,则表示任一服务超时,跳出 while 循环并输出信息。
  9. 关闭线程池。

并发调用容错重试

系统需要调用 100 个外部系统接口获取返回值(假定接口只返回 true 或 false,且相互间没有任何关联关系),并合并 100 个接口的返回值计算得到风控处理结果。

要求:

  1. 结果合并计算需要等到所有 100 个接口值都获取以后才能进行;
  2. 整个处理过程需要尽可能快,保证整体时间在 200ms 以内,超出这个时间则终止所有处理并整体返回 false;
  3. 需要考虑外部系统单个接口调用失败情况,具备一定重试容错能力;
  4. 手动定义处理线程池,包括参数配置以及这样做的原因。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.*;

public class ConcurrentCallProcessor {

//最大超时时间,单位:毫秒
private static final int TIMEOUT = 200;
//重试次数
private static final int RETRY_COUNT = 2;
//线程池
private final ExecutorService executorService;

public ConcurrentCallProcessor() {
int corePoolSize = Runtime.getRuntime().availableProcessors() * 2; //16
int maximumPoolSize = corePoolSize * 4; //64
long keepAliveTime = 60L;
BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(maximumPoolSize);
executorService = new ThreadPoolExecutor(
corePoolSize, //核心线程数 × 2: CPU密集型
maximumPoolSize, //最大线程数=核心线程数的2倍:队列满就用最大线程执行
keepAliveTime, //线程空闲等待时间 60s
TimeUnit.SECONDS, //单位:秒
workQueue, //等待队列:64容量,100>64所以会以最大线程数执行
Executors.defaultThreadFactory(), //默认线程工厂
new ThreadPoolExecutor.AbortPolicy() //核心线程满,等待队列满,最大线程满,则不执行抛出异常
);
}

public boolean process() {
//100个异步调用任务
List<Future<Boolean>> futures = new ArrayList<>(100);
//100个任务的返回值
//数组:√ 因为数组创建的时候会直接开辟内存
//Boolean[] results = new Boolean[100];
//List: √ 但必须要进行初始化长度和值,否则不能直接 set(index, xxx) 因为在 add() 的时候才会开辟内存进而写入值
List<Boolean> results = new ArrayList<>(Collections.nCopies(100, false));

for (int i = 0; i < 100; i++) {
final int index = i;
Future<Boolean> future = executorService.submit(() -> {
boolean result = false;
//重试指定次数,如果重试还不成功,则记录日志
for (int j = 1; j <= RETRY_COUNT; j++) {
result = callExternalSystem();
if (result) {
break;
}
if (j == 2) {
System.out.println("index=" + index + " 重试" + RETRY_COUNT + "次还不成功!");
}
}
//results[index] = result; //对应数组
results.set(index, result); //对应初始化过容量和值的List
//results.add(result); //×(错误用法) List<> results 在未初始化时,并发会丢失数据,因为内存开辟速度没有线程执行速度快
return result;
});
futures.add(future);
}
System.out.println("results.size() = " + results.size());
//results.forEach(System.out::println);

try {
boolean allDone = true;
for (Future<Boolean> future : futures) {
//等待结果返回,最大超时时间为 TIMEOUT
future.get(TIMEOUT, TimeUnit.MILLISECONDS);
//如果有接口调用超时,则设置 allDone 为 false
if (!future.isDone()) {
allDone = false;
}
}
//如果所有接口都已经返回结果,则合并计算最终结果;否则终止所有处理并返回 false
if (allDone) {
boolean finalResult = true;
for (boolean result : results) {
finalResult &= result;
}
return finalResult;
} else {
for (Future<Boolean> future : futures) {
if (!future.isDone()) {
future.cancel(true);
}
}
return false;
}
} catch (Exception ex) {
//处理异常情况,如超时、线程中断等
System.err.println("errMsg = " + ex.getMessage());
return false;
} finally {
if (executorService != null) {
executorService.shutdown();
}
}
}

private boolean callExternalSystem() {
//调用外部系统接口获取结果,返回 true 或 false
return Math.random() > 0.1;
}

//测试验证
public static void main(String[] args) {
ConcurrentCallProcessor processor = new ConcurrentCallProcessor();

long startTime = System.currentTimeMillis();
boolean result = processor.process();
long endTime = System.currentTimeMillis();

System.out.println("result: " + result);
System.out.println("总消耗时间: " + (endTime - startTime) + "ms");
}
}

该类包括一个 process 方法和一个 shutdown 方法。在 process 方法中,首先创建一个长度为 100 的 Future 数组和一个长度为 100 的结果数组,分别存储每个接口调用的 Future 和结果。然后使用线程池异步调用外部系统接口,并将结果保存到对应位置上。最后等待所有接口都返回结果,合并计算得到最终结果。

如果所有接口都返回结果,则将 allDone 标记为 true,遍历结果数组,使用按位与(&)计算得到最终结果。否则,将 allDone 标记为 false,终止所有处理并返回 false。

如果出现异常情况,如超时或线程中断等,也直接返回 false。

为了提高性能,我们使用了线程池来异步执行外部系统接口的调用,并设置了超时时间来避免等待时间过长。线程池的参数根据 CPU 核心数动态配置,以提高线程利用率和系统性能。

在 finally 的 shutdown 方法中,我们关闭线程池,释放资源。

需要注意:

  1. 在使用完毕后一定要记得关闭线程池,否则会导致内存泄露等问题。
  2. 在实际应用场景中,根据具体需求进行调整和优化。

11-并发调用服务接口
https://janycode.github.io/2017/06/28/03_数据结构/04_算法/11-并发调用服务接口/
作者
Jerry(姜源)
发布于
2017年6月28日
许可协议