This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new ee5afa5  Add ServiceUrlProvider and add method forceCloseConnection in 
PulsarC… (#2543)
ee5afa5 is described below

commit ee5afa5007e260891140956d244a865d73368321
Author: penghui <codelipeng...@gmail.com>
AuthorDate: Wed Sep 19 15:44:15 2018 +0800

    Add ServiceUrlProvider and add method forceCloseConnection in PulsarC… 
(#2543)
    
    Support building a Pulsar client with the serviceUrlProvider method.
    
    ### Motivation
    
    With serviceUrlProvider we can store the Pulsar service URL in ZooKeeper or
any other config service.
    We can also watch for service URL change events and then control the Pulsar
client, for example by changing the client's serviceUrl, force-closing its
connections, or reconnecting with the new service URL.
    
    ### Modifications
    
    Add ServiceUrlProvider interface.
    Add forceCloseConnection method in PulsarClient.
---
 .../pulsar/client/api/ServiceUrlProviderTest.java  | 156 +++++++++++++++++++++
 .../apache/pulsar/client/api/ClientBuilder.java    |   7 +
 .../org/apache/pulsar/client/api/PulsarClient.java |  22 +++
 .../pulsar/client/api/ServiceUrlProvider.java      |  42 ++++++
 .../pulsar/client/impl/ClientBuilderImpl.java      |  20 ++-
 .../apache/pulsar/client/impl/HandlerState.java    |   4 +
 .../apache/pulsar/client/impl/LookupService.java   |   2 +-
 .../client/impl/PartitionedProducerImpl.java       |   4 +
 .../pulsar/client/impl/PulsarClientImpl.java       |  29 +++-
 .../client/impl/conf/ClientConfigurationData.java  |   2 +
 10 files changed, 283 insertions(+), 5 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ServiceUrlProviderTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ServiceUrlProviderTest.java
new file mode 100644
index 0000000..ab86f12
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ServiceUrlProviderTest.java
@@ -0,0 +1,156 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import org.apache.bookkeeper.test.PortManager;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.client.impl.ConsumerImpl;
+import org.apache.pulsar.client.impl.ProducerImpl;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.util.concurrent.TimeUnit;
+
+public class ServiceUrlProviderTest extends ProducerConsumerBase {
+
+    @BeforeClass
+    @Override
+    protected void setup() throws Exception {
+        super.internalSetup();
+        super.producerBaseSetup();
+
+    }
+
+    @AfterClass
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    @Test
+    public void testCreateClientWithServiceUrlProvider() throws Exception {
+
+        PulsarClient client = PulsarClient.builder()
+                .serviceUrlProvider(new 
TestServiceUrlProvider(pulsar.getBrokerServiceUrl()))
+                .statsInterval(1, TimeUnit.SECONDS)
+                .build();
+        Assert.assertTrue(((PulsarClientImpl) 
client).getConfiguration().getServiceUrlProvider() instanceof 
TestServiceUrlProvider);
+        Producer<String> producer = client.newProducer(Schema.STRING)
+                .topic("persistent://my-property/my-ns/my-topic")
+                .create();
+        Consumer<String> consumer = client.newConsumer(Schema.STRING)
+                .topic("persistent://my-property/my-ns/my-topic")
+                .subscriptionName("my-subscribe")
+                .subscribe();
+        for (int i = 0; i < 100; i++) {
+            producer.send("Hello Pulsar[" + i + "]");
+        }
+        client.forceCloseConnection();
+        for (int i = 100; i < 200; i++) {
+            producer.send("Hello Pulsar[" + i + "]");
+        }
+        int received = 0;
+        do {
+            Message<String> message = consumer.receive();
+            System.out.println(message.getValue());
+            received++;
+        } while (received < 200);
+        Assert.assertEquals(200, received);
+        producer.close();
+        consumer.close();
+        client.close();
+    }
+
+    @Test
+    public void testCreateClientWithAutoChangedServiceUrlProvider() throws 
Exception {
+
+        AutoChangedServiceUrlProvider serviceUrlProvider = new 
AutoChangedServiceUrlProvider(pulsar.getBrokerServiceUrl());
+
+        PulsarClient client = PulsarClient.builder()
+                .serviceUrlProvider(serviceUrlProvider)
+                .statsInterval(1, TimeUnit.SECONDS)
+                .build();
+        Assert.assertTrue(((PulsarClientImpl) 
client).getConfiguration().getServiceUrlProvider() instanceof 
AutoChangedServiceUrlProvider);
+
+        ProducerImpl<String> producer = (ProducerImpl<String>) 
client.newProducer(Schema.STRING)
+                .topic("persistent://my-property/my-ns/my-topic")
+                .create();
+        ConsumerImpl<String> consumer = (ConsumerImpl<String>) 
client.newConsumer(Schema.STRING)
+                .topic("persistent://my-property/my-ns/my-topic")
+                .subscriptionName("my-subscribe")
+                .subscribe();
+
+        PulsarService pulsarService1 = pulsar;
+        conf.setBrokerServicePort(PortManager.nextFreePort());
+        conf.setWebServicePort(PortManager.nextFreePort());
+        startBroker();
+        PulsarService pulsarService2 = pulsar;
+        System.out.println("Pulsar1=" + pulsarService1.getBrokerServiceUrl() + 
", Pulsar2=" + pulsarService2.getBrokerServiceUrl());
+        Assert.assertNotEquals(pulsarService1.getBrokerServiceUrl(), 
pulsarService2.getBrokerServiceUrl());
+        Assert.assertEquals("pulsar://" + 
producer.getClient().getLookup().getServiceUrl(), 
pulsarService1.getBrokerServiceUrl());
+        Assert.assertEquals("pulsar://" + 
consumer.getClient().getLookup().getServiceUrl(), 
pulsarService1.getBrokerServiceUrl());
+        
serviceUrlProvider.onServiceUrlChanged(pulsarService2.getBrokerServiceUrl());
+        Assert.assertEquals("pulsar://" + 
producer.getClient().getLookup().getServiceUrl(), 
pulsarService2.getBrokerServiceUrl());
+        Assert.assertEquals("pulsar://" + 
consumer.getClient().getLookup().getServiceUrl(), 
pulsarService2.getBrokerServiceUrl());
+        producer.close();
+        consumer.close();
+        client.close();
+    }
+
+    class TestServiceUrlProvider implements ServiceUrlProvider {
+
+        private PulsarClient pulsarClient;
+
+        private String serviceUrl;
+
+        public TestServiceUrlProvider(String serviceUrl) {
+            this.serviceUrl = serviceUrl;
+        }
+
+        @Override
+        public String getServiceUrl() {
+            return serviceUrl;
+        }
+
+        @Override
+        public void setClient(PulsarClient client) {
+            this.pulsarClient = client;
+        }
+
+        public PulsarClient getPulsarClient() {
+            return pulsarClient;
+        }
+    }
+
+    class AutoChangedServiceUrlProvider extends TestServiceUrlProvider {
+
+        public AutoChangedServiceUrlProvider(String serviceUrl) {
+            super(serviceUrl);
+        }
+
+        public void onServiceUrlChanged(String newServiceUrl) throws 
PulsarClientException {
+            this.getPulsarClient().getConf().setServiceUrl(newServiceUrl);
+            this.getPulsarClient().reloadLookUp();
+            this.getPulsarClient().forceCloseConnection();
+        }
+    }
+}
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java
index 8aff26f..2831c09 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java
@@ -81,6 +81,13 @@ public interface ClientBuilder extends Cloneable {
     ClientBuilder serviceUrl(String serviceUrl);
 
     /**
+     * Configure the service URL provider for the Pulsar service.
+     * @param serviceUrlProvider the provider from which the service URL is obtained
+     * @return the client builder instance
+     */
+    ClientBuilder serviceUrlProvider(ServiceUrlProvider serviceUrlProvider);
+
+    /**
      * Set the authentication provider to use in the Pulsar client instance.
      * <p>
      * Example:
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/api/PulsarClient.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/api/PulsarClient.java
index 5827573..cba451b 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/PulsarClient.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/PulsarClient.java
@@ -368,4 +368,26 @@ public interface PulsarClient extends Closeable {
      *             if the forceful shutdown fails
      */
     void shutdown() throws PulsarClientException;
+
+    /**
+     * Force close the connections of the pulsar client.
+     *
+     * Closes all producer connections and all consumer connections.
+     *
+     */
+    void forceCloseConnection();
+
+    /**
+     * Reload lookup service in pulsar client.
+     *
+     * @throws PulsarClientException
+     */
+    void reloadLookUp() throws PulsarClientException;
+
+    /**
+     * Get client config data.
+     *
+     * @return the client configuration data
+     */
+    ClientConfigurationData getConf();
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ServiceUrlProvider.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ServiceUrlProvider.java
new file mode 100644
index 0000000..9b6e963
--- /dev/null
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ServiceUrlProvider.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+/**
+ * The provider to provide the service url.
+ * It is used by {@link ClientBuilder#serviceUrlProvider(ServiceUrlProvider)}.
+ */
+public interface ServiceUrlProvider {
+
+    /**
+     * Get pulsar service url from ServiceUrlProvider.
+     *
+     * @return pulsar service url.
+     */
+    String getServiceUrl();
+
+    /**
+     * Set the pulsar client on the provider so that the provider can control the pulsar
client,
+     * for example via {@link PulsarClient#forceCloseConnection()} or {@link
PulsarClient#close()}.
+     *
+     * @param client created pulsar client.
+     */
+    void setClient(PulsarClient client);
+
+}
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
index e2d5c4c..6a59520 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
@@ -21,12 +21,14 @@ package org.apache.pulsar.client.impl;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.client.api.Authentication;
 import org.apache.pulsar.client.api.AuthenticationFactory;
 import org.apache.pulsar.client.api.ClientBuilder;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import 
org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException;
+import org.apache.pulsar.client.api.ServiceUrlProvider;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
 import org.apache.pulsar.client.impl.conf.ConfigurationDataUtils;
 
@@ -43,11 +45,17 @@ public class ClientBuilderImpl implements ClientBuilder {
 
     @Override
     public PulsarClient build() throws PulsarClientException {
+        if (conf.getServiceUrlProvider() != null && 
StringUtils.isNotBlank(conf.getServiceUrlProvider().getServiceUrl())) {
+            conf.setServiceUrl(conf.getServiceUrlProvider().getServiceUrl());
+        }
         if (conf.getServiceUrl() == null) {
-            throw new IllegalArgumentException("service URL needs to be 
specified on the ClientBuilder object");
+            throw new IllegalArgumentException("service URL or service URL 
provider needs to be specified on the ClientBuilder object");
         }
-
-        return new PulsarClientImpl(conf);
+        PulsarClient client = new PulsarClientImpl(conf);
+        if (conf.getServiceUrlProvider() != null) {
+            conf.getServiceUrlProvider().setClient(client);
+        }
+        return client;
     }
 
     @Override
@@ -72,6 +80,12 @@ public class ClientBuilderImpl implements ClientBuilder {
     }
 
     @Override
+    public ClientBuilder serviceUrlProvider(ServiceUrlProvider 
serviceUrlProvider) {
+        conf.setServiceUrlProvider(serviceUrlProvider);
+        return this;
+    }
+
+    @Override
     public ClientBuilder authentication(Authentication authentication) {
         conf.setAuthentication(authentication);
         return this;
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HandlerState.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HandlerState.java
index 6189583..b48e702 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HandlerState.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HandlerState.java
@@ -66,4 +66,8 @@ abstract class HandlerState {
     protected State getAndUpdateState(final UnaryOperator<State> updater) {
         return STATE_UPDATER.getAndUpdate(this, updater);
     }
+
+    public PulsarClientImpl getClient() {
+        return client;
+    }
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/LookupService.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/LookupService.java
index 769422b..568c703 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/LookupService.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/LookupService.java
@@ -40,7 +40,7 @@ import org.apache.pulsar.common.schema.SchemaInfo;
  * </ul>
  *
  */
-interface LookupService extends AutoCloseable {
+public interface LookupService extends AutoCloseable {
 
     /**
      * Calls broker lookup-api to get broker {@link InetSocketAddress} which 
serves namespace bundle that contains given
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
index 12ecf2b..8c4387e 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
@@ -237,6 +237,10 @@ public class PartitionedProducerImpl<T> extends 
ProducerBase<T> {
 
     private static final Logger log = 
LoggerFactory.getLogger(PartitionedProducerImpl.class);
 
+    protected List<ProducerImpl<T>> getProducers() {
+        return producers.stream().collect(Collectors.toList());
+    }
+
     @Override
     String getHandlerName() {
         return "partition-producer";
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index b87b9bd..5dd0e3d 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -34,6 +34,7 @@ import java.util.IdentityHashMap;
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ThreadFactory;
@@ -82,7 +83,7 @@ public class PulsarClientImpl implements PulsarClient {
     private static final Logger log = 
LoggerFactory.getLogger(PulsarClientImpl.class);
 
     private final ClientConfigurationData conf;
-    private final LookupService lookup;
+    private LookupService lookup;
     private final ConnectionPool cnxPool;
     private final Timer timer;
     private final ExecutorProvider externalExecutorProvider;
@@ -706,6 +707,19 @@ public class PulsarClientImpl implements PulsarClient {
         }
     }
 
+    @Override
+    public void forceCloseConnection() {
+        for (ConcurrentMap<Integer, CompletableFuture<ClientCnx>> cnxMap : 
cnxPool.pool.values()) {
+            for (CompletableFuture<ClientCnx> clientCnxCompletableFuture : 
cnxMap.values()) {
+                try {
+                    clientCnxCompletableFuture.get().close();
+                } catch (Exception e) {
+                    log.error("Force close connection exception ", e);
+                }
+            }
+        }
+    }
+
     protected CompletableFuture<ClientCnx> getConnection(final String topic) {
         TopicName topicName = TopicName.get(topic);
         return lookup.getBroker(topicName)
@@ -745,6 +759,19 @@ public class PulsarClientImpl implements PulsarClient {
         return lookup;
     }
 
+    public void reloadLookUp() throws PulsarClientException {
+        if (conf.getServiceUrl().startsWith("http")) {
+            lookup = new HttpLookupService(conf, eventLoopGroup);
+        } else {
+            lookup = new BinaryProtoLookupService(this, conf.getServiceUrl(), 
conf.isUseTls(), externalExecutorProvider.getExecutor());
+        }
+    }
+
+    @Override
+    public ClientConfigurationData getConf() {
+        return conf;
+    }
+
     public CompletableFuture<Integer> getNumberOfPartitions(String topic) {
         return getPartitionedTopicMetadata(topic).thenApply(metadata -> 
metadata.partitions);
     }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
index b51d61f..1ff24c9 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.client.impl.conf;
 import java.io.Serializable;
 
 import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.ServiceUrlProvider;
 import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
 
 import com.fasterxml.jackson.annotation.JsonIgnore;
@@ -35,6 +36,7 @@ public class ClientConfigurationData implements Serializable, 
Cloneable {
     private static final long serialVersionUID = 1L;
 
     private String serviceUrl;
+    private ServiceUrlProvider serviceUrlProvider;
 
     @JsonIgnore
     private Authentication authentication = new AuthenticationDisabled();

Reply via email to