This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 2028aa2 Closing client when Proxyconnection is disconnected (#1821) 2028aa2 is described below commit 2028aa2b476fc3922b1ab29b1ebc8456ab0f5bb1 Author: Jai Asher <j...@ccs.neu.edu> AuthorDate: Fri Jun 22 09:22:16 2018 -0700 Closing client when Proxyconnection is disconnected (#1821) * Closing client when Proxyconnection is disconnected * Addressed massakam's PR comments * Fixed test failures with authorization disabled --- .../apache/pulsar/client/impl/ConnectionPool.java | 4 +- .../pulsar/proxy/server/ProxyConnection.java | 10 +++- .../pulsar/proxy/server/ProxyConnectionPool.java | 59 ++++++++++++++++++++++ 3 files changed, 69 insertions(+), 4 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java index 9b08169..afba7d2 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java @@ -58,13 +58,13 @@ import io.netty.resolver.dns.DnsNameResolverBuilder; import io.netty.util.concurrent.Future; public class ConnectionPool implements Closeable { - private final ConcurrentHashMap<InetSocketAddress, ConcurrentMap<Integer, CompletableFuture<ClientCnx>>> pool; + protected final ConcurrentHashMap<InetSocketAddress, ConcurrentMap<Integer, CompletableFuture<ClientCnx>>> pool; private final Bootstrap bootstrap; private final EventLoopGroup eventLoopGroup; private final int maxConnectionsPerHosts; - private final DnsNameResolver dnsResolver; + protected final DnsNameResolver dnsResolver; private static final int MaxMessageSize = 5 * 1024 * 1024; public static final String TLS_HANDLER = "tls"; diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java index 8e9effd..08b5b2b 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java @@ -140,6 +140,10 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi directProxyHandler.outboundChannel.close(); } + if (client != null) { + client.close(); + } + LOG.info("[{}] Connection closed", remoteAddress); } @@ -274,7 +278,9 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi this.clientAuthentication = clientConf.getAuthentication(); if (!service.getConfiguration().isAuthenticationEnabled()) { - this.client = new PulsarClientImpl(clientConf, service.getWorkerGroup()); + this.client = new PulsarClientImpl(clientConf, service.getWorkerGroup(), + new ProxyConnectionPool(clientConf, service.getWorkerGroup(), () -> new ClientCnx(clientConf, + service.getWorkerGroup()))); return true; } @@ -311,7 +317,7 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi private PulsarClientImpl createClient(final ClientConfigurationData clientConf, final String clientAuthData, final String clientAuthMethod) throws PulsarClientException { return new PulsarClientImpl(clientConf, service.getWorkerGroup(), - new ConnectionPool(clientConf, service.getWorkerGroup(), () -> new ProxyClientCnx(clientConf, + new ProxyConnectionPool(clientConf, service.getWorkerGroup(), () -> new ProxyClientCnx(clientConf, service.getWorkerGroup(), clientAuthRole, clientAuthData, clientAuthMethod))); } diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnectionPool.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnectionPool.java new file mode 100644 index 0000000..73fd9ab --- /dev/null +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnectionPool.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.proxy.server; + +import java.io.IOException; +import java.util.concurrent.ExecutionException; +import java.util.function.Supplier; + +import org.apache.pulsar.client.impl.ClientCnx; +import org.apache.pulsar.client.impl.ConnectionPool; +import org.apache.pulsar.client.impl.conf.ClientConfigurationData; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.netty.channel.EventLoopGroup; + +public class ProxyConnectionPool extends ConnectionPool { + public ProxyConnectionPool(ClientConfigurationData clientConfig, EventLoopGroup eventLoopGroup, + Supplier<ClientCnx> clientCnxSupplier) { + super(clientConfig, eventLoopGroup, clientCnxSupplier); + } + + @Override + public void close() throws IOException { + log.info("Closing ProxyConnectionPool."); + pool.forEach((address, clientCnxPool) -> { + if (clientCnxPool != null) { + clientCnxPool.forEach((identifier, clientCnx) -> { + if (clientCnx != null && clientCnx.isDone()) { + try { + clientCnx.get().close(); + } catch (InterruptedException | ExecutionException e) { + log.error("Unable to close get client connection future.", e); + } + } + }); + } + }); + dnsResolver.close(); + } + + private static final Logger log = LoggerFactory.getLogger(ProxyConnectionPool.class); +} \ No newline at end of file