This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 9746ea4  Fix: remove local-cluster from replication list of 
global-namespace should clean topics (#1647)
9746ea4 is described below

commit 9746ea44337044a4c5d4f744d4e84e31ef53d327
Author: Rajan Dhabalia <rdhaba...@apache.org>
AuthorDate: Wed Apr 25 22:57:38 2018 -0700

    Fix: remove local-cluster from replication list of global-namespace should 
clean topics (#1647)
---
 .../broker/service/persistent/PersistentTopic.java | 130 ++++++++++++++-------
 .../broker/service/ReplicatorGlobalNSTest.java     | 110 +++++++++++++++++
 2 files changed, 201 insertions(+), 39 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index c34ec84..f425254 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -661,6 +661,21 @@ public class PersistentTopic implements Topic, 
AddEntryCallback {
         return delete(false);
     }
 
+    private CompletableFuture<Void> delete(boolean failIfHasSubscriptions) {
+        return delete(failIfHasSubscriptions, false);
+    }
+
+    /**
+     * Forcefully closes all producers/consumers/replicators and deletes the 
topic. This function is used when the local
+     * cluster is removed from the global-namespace replication list, because the 
broker doesn't allow lookup if the local cluster
+     * is not part of the replication cluster list.
+     * 
+     * @return future that completes once the topic has been deleted
+     */
+    private CompletableFuture<Void> deleteForcefully() {
+        return delete(false, true);
+    }
+    
     /**
      * Delete the managed ledger associated with this topic
      *
@@ -668,11 +683,14 @@ public class PersistentTopic implements Topic, 
AddEntryCallback {
      *            Flag indicating whether delete should succeed if topic still 
has unconnected subscriptions. Set to
      *            false when called from admin API (it will delete the subs 
too), and set to true when called from GC
      *            thread
-     *
+     * @param closeIfClientsConnected
+     *            Flag indicating whether to explicitly close connected 
producers/consumers/replicators before trying to delete the topic. If
+     *            any client is connected to the topic and this flag is 
disabled, then this operation fails.
+     * 
      * @return Completable future indicating completion of delete operation 
Completed exceptionally with:
      *         IllegalStateException if topic is still active 
ManagedLedgerException if ledger delete operation fails
      */
-    private CompletableFuture<Void> delete(boolean failIfHasSubscriptions) {
+    private CompletableFuture<Void> delete(boolean failIfHasSubscriptions, 
boolean closeIfClientsConnected) {
         CompletableFuture<Void> deleteFuture = new CompletableFuture<>();
 
         lock.writeLock().lock();
@@ -682,48 +700,73 @@ public class PersistentTopic implements Topic, 
AddEntryCallback {
                 deleteFuture.completeExceptionally(new 
TopicFencedException("Topic is already fenced"));
                 return deleteFuture;
             }
-            if (USAGE_COUNT_UPDATER.get(this) == 0) {
-                isFenced = true;
-
+            
+            CompletableFuture<Void> closeClientFuture = new 
CompletableFuture<>();
+            if (closeIfClientsConnected) {
                 List<CompletableFuture<Void>> futures = Lists.newArrayList();
+                replicators.forEach((cluster, replicator) -> 
futures.add(replicator.disconnect()));
+                producers.forEach(producer -> 
futures.add(producer.disconnect()));
+                subscriptions.forEach((s, sub) -> 
futures.add(sub.disconnect()));
+                FutureUtil.waitForAll(futures).thenRun(() -> {
+                    closeClientFuture.complete(null);
+                }).exceptionally(ex -> {
+                    log.error("[{}] Error closing clients", topic, ex);
+                    isFenced = false;
+                    closeClientFuture.completeExceptionally(ex);
+                    return null;
+                });
+            } else {
+                closeClientFuture.complete(null);
+            }
 
-                if (failIfHasSubscriptions) {
-                    if (!subscriptions.isEmpty()) {
-                        isFenced = false;
-                        deleteFuture.completeExceptionally(new 
TopicBusyException("Topic has subscriptions"));
-                        return deleteFuture;
-                    }
-                } else {
-                    subscriptions.forEach((s, sub) -> 
futures.add(sub.delete()));
-                }
+            closeClientFuture.thenAccept(delete -> {
+                if (USAGE_COUNT_UPDATER.get(this) == 0) {
+                    isFenced = true;
 
-                FutureUtil.waitForAll(futures).whenComplete((v, ex) -> {
-                    if (ex != null) {
-                        log.error("[{}] Error deleting topic", topic, ex);
-                        isFenced = false;
-                        deleteFuture.completeExceptionally(ex);
-                    } else {
-                        ledger.asyncDelete(new 
AsyncCallbacks.DeleteLedgerCallback() {
-                            @Override
-                            public void deleteLedgerComplete(Object ctx) {
-                                brokerService.removeTopicFromCache(topic);
-                                log.info("[{}] Topic deleted", topic);
-                                deleteFuture.complete(null);
-                            }
+                    List<CompletableFuture<Void>> futures = 
Lists.newArrayList();
 
-                            @Override
-                            public void 
deleteLedgerFailed(ManagedLedgerException exception, Object ctx) {
-                                isFenced = false;
-                                log.error("[{}] Error deleting topic", topic, 
exception);
-                                deleteFuture.completeExceptionally(new 
PersistenceException(exception));
-                            }
-                        }, null);
+                    if (failIfHasSubscriptions) {
+                        if (!subscriptions.isEmpty()) {
+                            isFenced = false;
+                            deleteFuture.completeExceptionally(new 
TopicBusyException("Topic has subscriptions"));
+                            return;
+                        }
+                    } else {
+                        subscriptions.forEach((s, sub) -> 
futures.add(sub.delete()));
                     }
-                });
-            } else {
+
+                    FutureUtil.waitForAll(futures).whenComplete((v, ex) -> {
+                        if (ex != null) {
+                            log.error("[{}] Error deleting topic", topic, ex);
+                            isFenced = false;
+                            deleteFuture.completeExceptionally(ex);
+                        } else {
+                            ledger.asyncDelete(new 
AsyncCallbacks.DeleteLedgerCallback() {
+                                @Override
+                                public void deleteLedgerComplete(Object ctx) {
+                                    brokerService.removeTopicFromCache(topic);
+                                    log.info("[{}] Topic deleted", topic);
+                                    deleteFuture.complete(null);
+                                }
+
+                                @Override
+                                public void 
deleteLedgerFailed(ManagedLedgerException exception, Object ctx) {
+                                    isFenced = false;
+                                    log.error("[{}] Error deleting topic", 
topic, exception);
+                                    deleteFuture.completeExceptionally(new 
PersistenceException(exception));
+                                }
+                            }, null);
+                        }
+                    });
+                } else {
+                    deleteFuture.completeExceptionally(new TopicBusyException(
+                            "Topic has " + USAGE_COUNT_UPDATER.get(this) + " 
connected producers/consumers"));
+                }
+            }).exceptionally(ex->{
                 deleteFuture.completeExceptionally(
-                        new TopicBusyException("Topic has " + 
USAGE_COUNT_UPDATER.get(this) + " connected producers/consumers"));
-            }
+                        new TopicBusyException("Failed to close clients before 
deleting topic."));
+                return null;
+            });
         } finally {
             lock.writeLock().unlock();
         }
@@ -858,6 +901,14 @@ public class PersistentTopic implements Topic, 
AddEntryCallback {
         }
 
         String localCluster = 
brokerService.pulsar().getConfiguration().getClusterName();
+        
+        // If the local cluster is removed from the global namespace's cluster list, 
then delete the topic forcefully because pulsar
+        // doesn't serve a global topic without the local repl-cluster configured.
+        if (TopicName.get(topic).isGlobal() && 
!configuredClusters.contains(localCluster)) {
+            log.info("Deleting topic [{}] because local cluster is not part of 
global namespace repl list {}",
+                    configuredClusters);
+            return deleteForcefully();
+        }
 
         List<CompletableFuture<Void>> futures = Lists.newArrayList();
 
@@ -882,6 +933,7 @@ public class PersistentTopic implements Topic, 
AddEntryCallback {
                     futures.add(removeReplicator(cluster));
                 }
             }
+            
         });
 
         return FutureUtil.waitForAll(futures);
@@ -962,7 +1014,7 @@ public class PersistentTopic implements Topic, 
AddEntryCallback {
     CompletableFuture<Void> removeReplicator(String remoteCluster) {
         log.info("[{}] Removing replicator to {}", topic, remoteCluster);
         final CompletableFuture<Void> future = new CompletableFuture<>();
-
+        
         String name = PersistentReplicator.getReplicatorName(replicatorPrefix, 
remoteCluster);
 
         replicators.get(remoteCluster).disconnect().thenRun(() -> {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorGlobalNSTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorGlobalNSTest.java
new file mode 100644
index 0000000..adf5ce7
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorGlobalNSTest.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import java.lang.reflect.Method;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.client.api.MessageRoutingMode;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.impl.ConsumerImpl;
+import org.apache.pulsar.client.impl.ProducerImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.Sets;
+
+public class ReplicatorGlobalNSTest extends ReplicatorTestBase {
+
+    protected String methodName;
+
+    @BeforeMethod
+    public void beforeMethod(Method m) throws Exception {
+        methodName = m.getName();
+    }
+
+    @Override
+    @BeforeClass(timeOut = 30000)
+    void setup() throws Exception {
+        super.setup();
+    }
+
+    @Override
+    @AfterClass(timeOut = 30000)
+    void shutdown() throws Exception {
+        super.shutdown();
+    }
+
+    @DataProvider(name = "partitionedTopic")
+    public Object[][] partitionedTopicProvider() {
+        return new Object[][] { { Boolean.TRUE }, { Boolean.FALSE } };
+    }
+
+    /**
+     * If local cluster is removed from the global namespace then all topics 
under that namespace should be deleted from
+     * the cluster.
+     * 
+     * @throws Exception
+     */
+    @Test
+    public void testRemoveLocalClusterOnGlobalNamespace() throws Exception {
+        log.info("--- Starting 
ReplicatorTest::testRemoveLocalClusterOnGlobalNamespace ---");
+
+        final String namespace = "pulsar/global/removeClusterTest";
+        admin1.namespaces().createNamespace(namespace);
+        admin1.namespaces().setNamespaceReplicationClusters(namespace, 
Sets.newHashSet("r1", "r2", "r3"));
+
+        final String topicName = "persistent://" + namespace + "/topic";
+
+        PulsarClient client1 = 
PulsarClient.builder().serviceUrl(url1.toString()).statsInterval(0, 
TimeUnit.SECONDS)
+                .build();
+        PulsarClient client2 = 
PulsarClient.builder().serviceUrl(url2.toString()).statsInterval(0, 
TimeUnit.SECONDS)
+                .build();
+
+        ProducerImpl<byte[]> producer1 = (ProducerImpl<byte[]>) 
client1.newProducer().topic(topicName)
+                
.enableBatching(false).messageRoutingMode(MessageRoutingMode.SinglePartition).create();
+        ConsumerImpl<byte[]> consumer1 = (ConsumerImpl<byte[]>) 
client1.newConsumer().topic(topicName)
+                .subscriptionName("sub1").subscribe();
+        ConsumerImpl<byte[]> consumer2 = (ConsumerImpl<byte[]>) 
client2.newConsumer().topic(topicName)
+                .subscriptionName("sub1").subscribe();
+
+        admin1.namespaces().setNamespaceReplicationClusters(namespace, 
Sets.newHashSet("r2", "r3"));
+
+        MockedPulsarServiceBaseTest
+                .retryStrategically((test) -> 
!pulsar1.getBrokerService().getTopics().containsKey(topicName), 5, 150);
+
+        
Assert.assertFalse(pulsar1.getBrokerService().getTopics().containsKey(topicName));
+        Assert.assertFalse(producer1.isConnected());
+        Assert.assertFalse(consumer1.isConnected());
+        Assert.assertTrue(consumer2.isConnected());
+
+        client1.close();
+        client2.close();
+    }
+
+    private static final Logger log = 
LoggerFactory.getLogger(ReplicatorGlobalNSTest.class);
+
+}

-- 
To stop receiving notification emails like this one, please contact
rdhaba...@apache.org.

Reply via email to