This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 69fb3c2ca3f [fix][client] Fix race condition that leads to caching 
failed CompletableFutures in ConnectionPool (#19661)
69fb3c2ca3f is described below

commit 69fb3c2ca3faa32ff12fd1270730b3517ea69220
Author: Enrico Olivelli <eolive...@apache.org>
AuthorDate: Tue Feb 28 12:17:33 2023 +0100

    [fix][client] Fix race condition that leads to caching failed 
CompletableFutures in ConnectionPool (#19661)
---
 .../java/org/apache/pulsar/client/impl/ConnectionPool.java  | 13 +++++++++++++
 1 file changed, 13 insertions(+)

diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
index 3a9a2b9b7ab..1420d81c688 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
@@ -216,6 +216,15 @@ public class ConnectionPool implements AutoCloseable {
                 pool.computeIfAbsent(logicalAddress, a -> new 
ConcurrentHashMap<>());
         CompletableFuture<ClientCnx> completableFuture = innerPool
                 .computeIfAbsent(randomKey, k -> 
createConnection(logicalAddress, physicalAddress, randomKey));
+        if (completableFuture.isCompletedExceptionally()) {
+            // We cannot cache a failed connection, so remove it from the pool.
+            // There is a race condition in which
+            // cleanupConnection is called before this result is cached,
+            // and so that clean-up fails.
+            cleanupConnection(logicalAddress, randomKey, completableFuture);
+            return completableFuture;
+        }
+
         return completableFuture.thenCompose(clientCnx -> {
             // If connection already release, create a new one.
             if (clientCnx.getIdleState().isReleased()) {
@@ -274,6 +283,10 @@ public class ConnectionPool implements AutoCloseable {
             }).exceptionally(exception -> {
                 log.warn("[{}] Connection handshake failed: {}", 
cnx.channel(), exception.getMessage());
                 cnxFuture.completeExceptionally(exception);
+                // This cleanupConnection call may run before the
+                // CompletableFuture is cached in the "pool" map, so
+                // cleaning up here is not enough; we also need to clean it
+                // in the "pool" map when the CompletableFuture is cached.
                 cleanupConnection(logicalAddress, connectionKey, cnxFuture);
                 cnx.ctx().close();
                 return null;

Reply via email to