This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 2028aa2  Closing client when Proxyconnection is disconnected (#1821)
2028aa2 is described below

commit 2028aa2b476fc3922b1ab29b1ebc8456ab0f5bb1
Author: Jai Asher <j...@ccs.neu.edu>
AuthorDate: Fri Jun 22 09:22:16 2018 -0700

    Closing client when Proxyconnection is disconnected (#1821)
    
    * Closing client when Proxyconnection is disconnected
    
    * Addressed massakam's PR comments
    
    * Fixed test failures with authorization disabled
---
 .../apache/pulsar/client/impl/ConnectionPool.java  |  4 +-
 .../pulsar/proxy/server/ProxyConnection.java       | 10 +++-
 .../pulsar/proxy/server/ProxyConnectionPool.java   | 59 ++++++++++++++++++++++
 3 files changed, 69 insertions(+), 4 deletions(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
index 9b08169..afba7d2 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
@@ -58,13 +58,13 @@ import io.netty.resolver.dns.DnsNameResolverBuilder;
 import io.netty.util.concurrent.Future;
 
 public class ConnectionPool implements Closeable {
-    private final ConcurrentHashMap<InetSocketAddress, ConcurrentMap<Integer, 
CompletableFuture<ClientCnx>>> pool;
+    protected final ConcurrentHashMap<InetSocketAddress, 
ConcurrentMap<Integer, CompletableFuture<ClientCnx>>> pool;
 
     private final Bootstrap bootstrap;
     private final EventLoopGroup eventLoopGroup;
     private final int maxConnectionsPerHosts;
 
-    private final DnsNameResolver dnsResolver;
+    protected final DnsNameResolver dnsResolver;
 
     private static final int MaxMessageSize = 5 * 1024 * 1024;
     public static final String TLS_HANDLER = "tls";
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
index 8e9effd..08b5b2b 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
@@ -140,6 +140,10 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
             directProxyHandler.outboundChannel.close();
         }
 
+        if (client != null) {
+            client.close();
+        }
+        
         LOG.info("[{}] Connection closed", remoteAddress);
     }
 
@@ -274,7 +278,9 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
             this.clientAuthentication = clientConf.getAuthentication();
 
             if (!service.getConfiguration().isAuthenticationEnabled()) {
-                this.client = new PulsarClientImpl(clientConf, 
service.getWorkerGroup());
+                this.client = new PulsarClientImpl(clientConf, 
service.getWorkerGroup(),
+                        new ProxyConnectionPool(clientConf, 
service.getWorkerGroup(), () -> new ClientCnx(clientConf,
+                                service.getWorkerGroup())));
                 return true;
             }
             
@@ -311,7 +317,7 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
     private PulsarClientImpl createClient(final ClientConfigurationData 
clientConf, final String clientAuthData,
             final String clientAuthMethod) throws PulsarClientException {
         return new PulsarClientImpl(clientConf, service.getWorkerGroup(),
-                new ConnectionPool(clientConf, service.getWorkerGroup(), () -> 
new ProxyClientCnx(clientConf,
+                new ProxyConnectionPool(clientConf, service.getWorkerGroup(), 
() -> new ProxyClientCnx(clientConf,
                         service.getWorkerGroup(), clientAuthRole, 
clientAuthData, clientAuthMethod)));
     }
 
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnectionPool.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnectionPool.java
new file mode 100644
index 0000000..73fd9ab
--- /dev/null
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnectionPool.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.proxy.server;
+
+import java.io.IOException;
+import java.util.concurrent.ExecutionException;
+import java.util.function.Supplier;
+
+import org.apache.pulsar.client.impl.ClientCnx;
+import org.apache.pulsar.client.impl.ConnectionPool;
+import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.netty.channel.EventLoopGroup;
+
+public class ProxyConnectionPool extends ConnectionPool {
+    public ProxyConnectionPool(ClientConfigurationData clientConfig, 
EventLoopGroup eventLoopGroup,
+            Supplier<ClientCnx> clientCnxSupplier) {
+        super(clientConfig, eventLoopGroup, clientCnxSupplier);
+    }
+
+    @Override
+    public void close() throws IOException {
+        log.info("Closing ProxyConnectionPool.");
+        pool.forEach((address, clientCnxPool) -> {
+            if (clientCnxPool != null) {
+                clientCnxPool.forEach((identifier, clientCnx) -> {
+                    if (clientCnx != null && clientCnx.isDone()) {
+                        try {
+                            clientCnx.get().close();
+                        } catch (InterruptedException | ExecutionException e) {
+                            log.error("Unable to close get client connection 
future.", e);
+                        }
+                    }
+                });
+            }
+        });
+        dnsResolver.close();
+    }
+
+    private static final Logger log = 
LoggerFactory.getLogger(ProxyConnectionPool.class);
+}
\ No newline at end of file

Reply via email to