This is an automated email from the ASF dual-hosted git repository.
xiaoyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-shenyu.git
The following commit(s) were added to refs/heads/master by this push:
new 6a25e04 Fix the ConcurrentModificationException by revert #1832 (#1886)
6a25e04 is described below
commit 6a25e04f2bf62bf5958eb0f773d6df433659e4ff
Author: GuoJiwei <[email protected]>
AuthorDate: Fri Aug 6 17:40:56 2021 +0800
Fix the ConcurrentModificationException by revert #1832 (#1886)
---
.../shenyu/common/healthcheck/HealthCheckTask.java | 25 ++++++++++++++++------
1 file changed, 19 insertions(+), 6 deletions(-)
diff --git a/shenyu-common/src/main/java/org/apache/shenyu/common/healthcheck/HealthCheckTask.java b/shenyu-common/src/main/java/org/apache/shenyu/common/healthcheck/HealthCheckTask.java
index 998d58f..f51dec2 100644
--- a/shenyu-common/src/main/java/org/apache/shenyu/common/healthcheck/HealthCheckTask.java
+++ b/shenyu-common/src/main/java/org/apache/shenyu/common/healthcheck/HealthCheckTask.java
@@ -31,6 +31,7 @@ import org.apache.shenyu.common.utils.UpstreamCheckUtils;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
@@ -56,7 +57,7 @@ public final class HealthCheckTask implements Runnable {
@Getter
private final AtomicBoolean checkStarted = new AtomicBoolean(false);
- private final List<CompletableFuture<Void>> futures = Lists.newArrayList();
+ private final List<CompletableFuture<UpstreamWithSelectorId>> futures = Lists.newArrayList();
private final int checkInterval;
@@ -125,9 +126,9 @@ public final class HealthCheckTask implements Runnable {
synchronized (lock) {
if (tryStartHealthCheck()) {
doHealthCheck();
+ waitFinish();
}
}
- waitFinish();
} catch (Exception e) {
log.error("[Health Check] Meet problem: ", e);
} finally {
@@ -141,8 +142,15 @@ public final class HealthCheckTask implements Runnable {
}
private void check(final Map<String, List<DivideUpstream>> map) {
- map.forEach((k, v) -> v.forEach(i -> futures.add(
- CompletableFuture.supplyAsync(() -> check(k, i), executor).thenAccept(d -> putEntityToMap(d)))));
+ for (Map.Entry<String, List<DivideUpstream>> entry : map.entrySet()) {
+ String key = entry.getKey();
+ List<DivideUpstream> value = entry.getValue();
+ for (DivideUpstream upstream : value) {
+ CompletableFuture<UpstreamWithSelectorId> future = CompletableFuture.supplyAsync(() -> check(key, upstream), executor);
+
+ futures.add(future);
+ }
+ }
}
private UpstreamWithSelectorId check(final String selectorId, final DivideUpstream upstream) {
@@ -183,8 +191,13 @@ public final class HealthCheckTask implements Runnable {
return checkStarted.compareAndSet(false, true);
}
- private void waitFinish() {
- CompletableFuture.allOf(futures.toArray(new CompletableFuture[]{})).thenAccept(__ -> futures.clear()).join();
+ private void waitFinish() throws ExecutionException, InterruptedException {
+ for (CompletableFuture<UpstreamWithSelectorId> future : futures) {
+ UpstreamWithSelectorId entity = future.get();
+ putEntityToMap(entity);
+ }
+
+ futures.clear();
}
private void putEntityToMap(final UpstreamWithSelectorId entity) {