This is an automated email from the ASF dual-hosted git repository. eolivelli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
     new 69fb3c2ca3f [fix][client] Fix race condition that leads to caching failed CompletableFutures in ConnectionPool (#19661)
69fb3c2ca3f is described below

commit 69fb3c2ca3faa32ff12fd1270730b3517ea69220
Author: Enrico Olivelli <eolive...@apache.org>
AuthorDate: Tue Feb 28 12:17:33 2023 +0100

    [fix][client] Fix race condition that leads to caching failed CompletableFutures in ConnectionPool (#19661)
---
 .../java/org/apache/pulsar/client/impl/ConnectionPool.java | 13 +++++++++++++
 1 file changed, 13 insertions(+)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
index 3a9a2b9b7ab..1420d81c688 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
@@ -216,6 +216,15 @@ public class ConnectionPool implements AutoCloseable {
                 pool.computeIfAbsent(logicalAddress, a -> new ConcurrentHashMap<>());
         CompletableFuture<ClientCnx> completableFuture = innerPool
                 .computeIfAbsent(randomKey, k -> createConnection(logicalAddress, physicalAddress, randomKey));
+        if (completableFuture.isCompletedExceptionally()) {
+            // we cannot cache a failed connection, so we remove it from the pool
+            // there is a race condition in which
+            // cleanupConnection is called before caching this result
+            // and so the clean up fails
+            cleanupConnection(logicalAddress, randomKey, completableFuture);
+            return completableFuture;
+        }
+
         return completableFuture.thenCompose(clientCnx -> {
             // If connection already release, create a new one.
             if (clientCnx.getIdleState().isReleased()) {
@@ -274,6 +283,10 @@ public class ConnectionPool implements AutoCloseable {
         }).exceptionally(exception -> {
             log.warn("[{}] Connection handshake failed: {}", cnx.channel(), exception.getMessage());
             cnxFuture.completeExceptionally(exception);
+            // this cleanupConnection may happen before that the
+            // CompletableFuture is cached into the "pool" map,
+            // it is not enough to clean it here, we need to clean it
+            // in the "pool" map when the CompletableFuture is cached
             cleanupConnection(logicalAddress, connectionKey, cnxFuture);
             cnx.ctx().close();
             return null;