This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 3e44d1e6e2b [improve] PIP-241: add TopicEventListener / topic events 
for the BrokerService (#19153)
3e44d1e6e2b is described below

commit 3e44d1e6e2ba4599d547c83cf7cb25350f0cc560
Author: Andrey Yegorov <8622884+dl...@users.noreply.github.com>
AuthorDate: Thu Feb 2 10:04:52 2023 -0800

    [improve] PIP-241: add TopicEventListener / topic events for the 
BrokerService (#19153)
---
 .../pulsar/broker/service/BrokerService.java       |  73 ++++-
 .../broker/service/TopicEventsDispatcher.java      | 137 +++++++++
 .../pulsar/broker/service/TopicEventsListener.java |  62 ++++
 .../pulsar/broker/TopicEventsListenerTest.java     | 311 +++++++++++++++++++++
 .../pulsar/broker/service/BrokerTestBase.java      |   2 +-
 5 files changed, 579 insertions(+), 6 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index f7020963fb7..27a1518cb81 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -110,6 +110,8 @@ import 
org.apache.pulsar.broker.service.BrokerServiceException.NamingException;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.NotAllowedException;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.PersistenceException;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException;
+import org.apache.pulsar.broker.service.TopicEventsListener.EventStage;
+import org.apache.pulsar.broker.service.TopicEventsListener.TopicEvent;
 import org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic;
 import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter;
 import 
org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
@@ -281,6 +283,8 @@ public class BrokerService implements Closeable {
     private Set<BrokerEntryMetadataInterceptor> 
brokerEntryMetadataInterceptors;
     private Set<ManagedLedgerPayloadProcessor> brokerEntryPayloadProcessors;
 
+    private final TopicEventsDispatcher topicEventsDispatcher = new 
TopicEventsDispatcher();
+
     public BrokerService(PulsarService pulsar, EventLoopGroup eventLoopGroup) 
throws Exception {
         this.pulsar = pulsar;
         this.preciseTopicPublishRateLimitingEnable =
@@ -398,6 +402,16 @@ public class BrokerService implements Closeable {
         this.bundlesQuotas = new BundlesQuotas(pulsar.getLocalMetadataStore());
     }
 
+    public void addTopicEventListener(TopicEventsListener... listeners) {
+        topicEventsDispatcher.addTopicEventListener(listeners);
+        getTopics().keys().forEach(topic ->
+                TopicEventsDispatcher.notify(listeners, topic, 
TopicEvent.LOAD, EventStage.SUCCESS, null));
+    }
+
+    public void removeTopicEventListener(TopicEventsListener... listeners) {
+        topicEventsDispatcher.removeTopicEventListener(listeners);
+    }
+
     // This call is used for starting additional protocol handlers
     public void startProtocolHandlers(
         Map<String, Map<InetSocketAddress, ChannelInitializer<SocketChannel>>> 
protocolHandlers) {
@@ -1024,21 +1038,41 @@ public class BrokerService implements Closeable {
                     return loadOrCreatePersistentTopic(tpName, 
createIfMissing, properties);
                 });
             } else {
-            return topics.computeIfAbsent(topicName.toString(), (name) -> {
+                return topics.computeIfAbsent(topicName.toString(), (name) -> {
+                    topicEventsDispatcher.notify(topicName.toString(), 
TopicEvent.LOAD, EventStage.BEFORE);
                     if (topicName.isPartitioned()) {
                         final TopicName partitionedTopicName = 
TopicName.get(topicName.getPartitionedTopicName());
                         return 
this.fetchPartitionedTopicMetadataAsync(partitionedTopicName).thenCompose((metadata)
 -> {
                             if (topicName.getPartitionIndex() < 
metadata.partitions) {
-                                return createNonPersistentTopic(name);
+                                topicEventsDispatcher
+                                        .notify(topicName.toString(), 
TopicEvent.CREATE, EventStage.BEFORE);
+
+                                CompletableFuture<Optional<Topic>> res = 
createNonPersistentTopic(name);
+
+                                CompletableFuture<Optional<Topic>> eventFuture 
= topicEventsDispatcher
+                                        .notifyOnCompletion(res, 
topicName.toString(), TopicEvent.CREATE);
+                                topicEventsDispatcher
+                                        .notifyOnCompletion(eventFuture, 
topicName.toString(), TopicEvent.LOAD);
+                                return res;
                             }
+                            topicEventsDispatcher.notify(topicName.toString(), 
TopicEvent.LOAD, EventStage.FAILURE);
                             return 
CompletableFuture.completedFuture(Optional.empty());
                         });
                     } else if (createIfMissing) {
-                        return createNonPersistentTopic(name);
+                        topicEventsDispatcher.notify(topicName.toString(), 
TopicEvent.CREATE, EventStage.BEFORE);
+
+                        CompletableFuture<Optional<Topic>> res = 
createNonPersistentTopic(name);
+
+                        CompletableFuture<Optional<Topic>> eventFuture = 
topicEventsDispatcher
+                                .notifyOnCompletion(res, topicName.toString(), 
TopicEvent.CREATE);
+                        topicEventsDispatcher
+                                .notifyOnCompletion(eventFuture, 
topicName.toString(), TopicEvent.LOAD);
+                        return res;
                     } else {
+                        topicEventsDispatcher.notify(topicName.toString(), 
TopicEvent.LOAD, EventStage.FAILURE);
                         return 
CompletableFuture.completedFuture(Optional.empty());
                     }
-                    });
+                });
             }
         } catch (IllegalArgumentException e) {
             log.warn("[{}] Illegalargument exception when loading topic", 
topicName, e);
@@ -1056,6 +1090,13 @@ public class BrokerService implements Closeable {
     }
 
     public CompletableFuture<Void> deleteTopic(String topic, boolean 
forceDelete) {
+        topicEventsDispatcher.notify(topic, TopicEvent.DELETE, 
EventStage.BEFORE);
+        CompletableFuture<Void> result =  deleteTopicInternal(topic, 
forceDelete);
+        topicEventsDispatcher.notifyOnCompletion(result, topic, 
TopicEvent.DELETE);
+        return result;
+    }
+
+    private CompletableFuture<Void> deleteTopicInternal(String topic, boolean 
forceDelete) {
         TopicName topicName = TopicName.get(topic);
         Optional<Topic> optTopic = getTopicReference(topic);
 
@@ -1402,7 +1443,7 @@ public class BrokerService implements Closeable {
                 log.debug("Broker is unable to load persistent topic {}", 
topic);
             }
             topicFuture.completeExceptionally(new NotAllowedException(
-                    "Broker is not unable to load persistent topic"));
+                    "Broker is unable to load persistent topic"));
             return topicFuture;
         }
 
@@ -1542,6 +1583,24 @@ public class BrokerService implements Closeable {
                 
managedLedgerConfig.setShadowSourceName(TopicName.get(shadowSource).getPersistenceNamingEncoding());
             }
 
+            topicEventsDispatcher.notify(topic, TopicEvent.LOAD, 
EventStage.BEFORE);
+            // load can fail with topicFuture completed non-exceptionally
+            // work around this
+            final CompletableFuture<Void> loadFuture = new 
CompletableFuture<>();
+            topicFuture.whenComplete((res, ex) -> {
+                if (ex == null) {
+                    loadFuture.complete(null);
+                } else {
+                    loadFuture.completeExceptionally(ex);
+                }
+            });
+
+            if (createIfMissing) {
+                topicEventsDispatcher.notify(topic, TopicEvent.CREATE, 
EventStage.BEFORE);
+                topicEventsDispatcher.notifyOnCompletion(topicFuture, topic, 
TopicEvent.CREATE);
+            }
+            topicEventsDispatcher.notifyOnCompletion(loadFuture, topic, 
TopicEvent.LOAD);
+
             // Once we have the configuration, we can proceed with the async 
open operation
             
managedLedgerFactory.asyncOpen(topicName.getPersistenceNamingEncoding(), 
managedLedgerConfig,
                     new OpenLedgerCallback() {
@@ -1603,6 +1662,7 @@ public class BrokerService implements Closeable {
                         public void openLedgerFailed(ManagedLedgerException 
exception, Object ctx) {
                             if (!createIfMissing && exception instanceof 
ManagedLedgerNotFoundException) {
                                 // We were just trying to load a topic and the 
topic doesn't exist
+                                loadFuture.completeExceptionally(exception);
                                 topicFuture.complete(Optional.empty());
                             } else {
                                 log.warn("Failed to create topic {}", topic, 
exception);
@@ -2135,6 +2195,8 @@ public class BrokerService implements Closeable {
         String bundleName = namespaceBundle.toString();
         String namespaceName = 
TopicName.get(topic).getNamespaceObject().toString();
 
+        topicEventsDispatcher.notify(topic, TopicEvent.UNLOAD, 
EventStage.BEFORE);
+
         synchronized (multiLayerTopicsMap) {
             ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, 
Topic>> namespaceMap = multiLayerTopicsMap
                     .get(namespaceName);
@@ -2169,6 +2231,7 @@ public class BrokerService implements Closeable {
         if (compactor != null) {
             compactor.getStats().removeTopic(topic);
         }
+        topicEventsDispatcher.notify(topic, TopicEvent.UNLOAD, 
EventStage.SUCCESS);
     }
 
     public int getNumberOfNamespaceBundles() {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicEventsDispatcher.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicEventsDispatcher.java
new file mode 100644
index 00000000000..a706e00db90
--- /dev/null
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicEventsDispatcher.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CopyOnWriteArrayList;
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * Utility class to dispatch topic events.
+ */
+@Slf4j
+public class TopicEventsDispatcher {
+    private final List<TopicEventsListener> topicEventListeners = new 
CopyOnWriteArrayList<>();
+
+    /**
+     * Adds listeners, ignores null listeners.
+     * @param listeners
+     */
+    public void addTopicEventListener(TopicEventsListener... listeners) {
+        Objects.requireNonNull(listeners);
+        Arrays.stream(listeners)
+                .filter(x -> x != null)
+                .forEach(topicEventListeners::add);
+    }
+
+    /**
+     * Removes listeners.
+     * @param listeners
+     */
+    public void removeTopicEventListener(TopicEventsListener... listeners) {
+        Objects.requireNonNull(listeners);
+        Arrays.stream(listeners)
+                .filter(x -> x != null)
+                .forEach(topicEventListeners::remove);
+    }
+
+    /**
+     * Dispatches notification to all currently added listeners.
+     * @param topic
+     * @param event
+     * @param stage
+     */
+    public void notify(String topic,
+                       TopicEventsListener.TopicEvent event,
+                       TopicEventsListener.EventStage stage) {
+        notify(topic, event, stage, null);
+    }
+
+    /**
+     * Dispatches notification to all currently added listeners.
+     * @param topic
+     * @param event
+     * @param stage
+     * @param t
+     */
+    public void notify(String topic,
+                       TopicEventsListener.TopicEvent event,
+                       TopicEventsListener.EventStage stage,
+                       Throwable t) {
+        topicEventListeners
+                .forEach(listener -> notify(listener, topic, event, stage, t));
+    }
+
+    /**
+     * Dispatches SUCCESS/FAILURE notification to all currently added 
listeners on completion of the future.
+     * @param future
+     * @param topic
+     * @param event
+     * @param <T>
+     * @return future of a new completion stage
+     */
+    public <T> CompletableFuture<T> notifyOnCompletion(CompletableFuture<T> 
future,
+                                                       String topic,
+                                                       
TopicEventsListener.TopicEvent event) {
+        return future.whenComplete((r, ex) -> notify(topic,
+                event,
+                ex == null ? TopicEventsListener.EventStage.SUCCESS : 
TopicEventsListener.EventStage.FAILURE,
+                ex));
+    }
+
+    /**
+     * Dispatches notification to specified listeners.
+     * @param listeners
+     * @param topic
+     * @param event
+     * @param stage
+     * @param t
+     */
+    public static void notify(TopicEventsListener[] listeners,
+                              String topic,
+                              TopicEventsListener.TopicEvent event,
+                              TopicEventsListener.EventStage stage,
+                              Throwable t) {
+        Objects.requireNonNull(listeners);
+        for (TopicEventsListener listener: listeners) {
+            notify(listener, topic, event, stage, t);
+        }
+    }
+
+    private static void notify(TopicEventsListener listener,
+                               String topic,
+                               TopicEventsListener.TopicEvent event,
+                               TopicEventsListener.EventStage stage,
+                               Throwable t) {
+        if (listener == null) {
+            return;
+        }
+
+        try {
+            listener.handleEvent(topic, event, stage, t);
+        } catch (Throwable ex) {
+            log.error("TopicEventsListener {} exception while handling {}_{} 
for topic {}",
+                    listener, event, stage, topic, ex);
+        }
+    }
+
+}
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicEventsListener.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicEventsListener.java
new file mode 100644
index 00000000000..8068067206c
--- /dev/null
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicEventsListener.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import org.apache.pulsar.common.classification.InterfaceAudience;
+import org.apache.pulsar.common.classification.InterfaceStability;
+
+/**
+ * Listener for the Topic events.
+ */
+@InterfaceStability.Evolving
+@InterfaceAudience.LimitedPrivate
+public interface TopicEventsListener {
+
+    /**
+     * Types of events currently supported.
+     *  create/load/unload/delete
+     */
+    enum TopicEvent {
+        // create events included into load events
+        CREATE,
+        LOAD,
+        UNLOAD,
+        DELETE,
+    }
+
+    /**
+     * Stages of events currently supported.
+     *  before starting the event/successful completion/failed completion
+     */
+    enum EventStage {
+        BEFORE,
+        SUCCESS,
+        FAILURE
+    }
+
+    /**
+     * Handle topic event.
+     * Choice of the thread / maintenance of the thread pool is up to the 
event handlers.
+     * @param topicName - name of the topic
+     * @param event - TopicEvent
+     * @param stage - EventStage
+     * @param t - exception in case of FAILURE, if present/known
+     */
+    void handleEvent(String topicName, TopicEvent event, EventStage stage, 
Throwable t);
+}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/TopicEventsListenerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/TopicEventsListenerTest.java
new file mode 100644
index 00000000000..e6459bbf74c
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/TopicEventsListenerTest.java
@@ -0,0 +1,311 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker;
+
+import com.google.common.collect.Sets;
+
+import java.util.Queue;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.TimeUnit;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.pulsar.broker.service.BrokerTestBase;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
+import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
+import org.apache.pulsar.common.policies.data.RetentionPolicies;
+import org.awaitility.Awaitility;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
+@Slf4j
+public class TopicEventsListenerTest extends BrokerTestBase {
+
+    final static Queue<String> events = new ConcurrentLinkedQueue<>();
+    volatile String topicNameToWatch;
+    String namespace;
+
+    @DataProvider(name = "topicType")
+    public static Object[][] topicType() {
+        return new Object[][] {
+                {"persistent", "partitioned", true},
+                {"persistent", "non-partitioned", true},
+                {"non-persistent", "partitioned", true},
+                {"non-persistent", "non-partitioned", true},
+                {"persistent", "partitioned", false},
+                {"persistent", "non-partitioned", false},
+                {"non-persistent", "partitioned", false},
+                {"non-persistent", "non-partitioned", false}
+        };
+    }
+
+    @DataProvider(name = "topicTypeNoDelete")
+    public static Object[][] topicTypeNoDelete() {
+        return new Object[][] {
+                {"persistent", "partitioned"},
+                {"persistent", "non-partitioned"},
+                {"non-persistent", "partitioned"},
+                {"non-persistent", "non-partitioned"}
+        };
+    }
+
+    @BeforeClass
+    @Override
+    protected void setup() throws Exception {
+        super.baseSetup();
+        pulsar.getConfiguration().setForceDeleteNamespaceAllowed(true);
+
+        pulsar.getBrokerService().addTopicEventListener((topic, event, stage, 
t) -> {
+            log.info("got event {}__{} for topic {}", event, stage, topic);
+            if (topic.equals(topicNameToWatch)) {
+                if (log.isDebugEnabled()) {
+                    log.debug("got event {}__{} for topic {} with detailed 
stack",
+                            event, stage, topic, new Exception("tracing event 
source"));
+                }
+                events.add(event.toString() + "__" + stage.toString());
+            }
+        });
+    }
+
+    @AfterClass(alwaysRun = true)
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    @BeforeMethod
+    protected void setupTest() throws Exception {
+        namespace = "prop/" + UUID.randomUUID();
+        admin.namespaces().createNamespace(namespace, Sets.newHashSet("test"));
+        
assertTrue(admin.namespaces().getNamespaces("prop").contains(namespace));
+        admin.namespaces().setRetention(namespace, new RetentionPolicies(3, 
10));
+        try (PulsarAdmin admin2 = createPulsarAdmin()) {
+            Awaitility.await().untilAsserted(() ->
+                    assertEquals(admin2.namespaces().getRetention(namespace), 
new RetentionPolicies(3, 10)));
+        }
+
+        events.clear();
+    }
+
+    @AfterMethod(alwaysRun = true)
+    protected void cleanupTest() throws Exception {
+        deleteNamespaceWithRetry(namespace, true);
+    }
+
+    @Test(dataProvider = "topicType")
+    public void testEvents(String topicTypePersistence, String 
topicTypePartitioned,
+                           boolean forceDelete) throws Exception {
+        String topicName = topicTypePersistence + "://" + namespace + "/" + 
"topic-" + UUID.randomUUID();
+
+        createTopicAndVerifyEvents(topicTypePartitioned, topicName);
+
+        events.clear();
+        if (topicTypePartitioned.equals("partitioned")) {
+            admin.topics().deletePartitionedTopic(topicName, forceDelete);
+        } else {
+            admin.topics().delete(topicName, forceDelete);
+        }
+
+        Awaitility.waitAtMost(10, TimeUnit.SECONDS).untilAsserted(() ->
+                Assert.assertEquals(events.toArray(), new String[]{
+                        "DELETE__BEFORE",
+                        "UNLOAD__BEFORE",
+                        "UNLOAD__SUCCESS",
+                        "DELETE__SUCCESS"
+                })
+        );
+    }
+
+    @Test(dataProvider = "topicType")
+    public void testEventsWithUnload(String topicTypePersistence, String 
topicTypePartitioned,
+                                     boolean forceDelete) throws Exception {
+        String topicName = topicTypePersistence + "://" + namespace + "/" + 
"topic-" + UUID.randomUUID();
+
+        createTopicAndVerifyEvents(topicTypePartitioned, topicName);
+
+        events.clear();
+        admin.topics().unload(topicName);
+
+        Awaitility.waitAtMost(10, TimeUnit.SECONDS).untilAsserted(() ->
+                Assert.assertEquals(events.toArray(), new String[]{
+                        "UNLOAD__BEFORE",
+                        "UNLOAD__SUCCESS"
+                })
+        );
+
+        events.clear();
+        if (topicTypePartitioned.equals("partitioned")) {
+            admin.topics().deletePartitionedTopic(topicName, forceDelete);
+        } else {
+            admin.topics().delete(topicName, forceDelete);
+        }
+
+        Awaitility.waitAtMost(10, TimeUnit.SECONDS).untilAsserted(() ->
+                Assert.assertEquals(events.toArray(), new String[]{
+                        "DELETE__BEFORE",
+                        "DELETE__SUCCESS"
+                })
+        );
+    }
+
+    @Test(dataProvider = "topicType")
+    public void testEventsActiveSub(String topicTypePersistence, String 
topicTypePartitioned,
+                                    boolean forceDelete) throws Exception {
+        String topicName = topicTypePersistence + "://" + namespace + "/" + 
"topic-" + UUID.randomUUID();
+
+        createTopicAndVerifyEvents(topicTypePartitioned, topicName);
+
+        Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(topicName).subscriptionName("sub").subscribe();
+        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName).create();
+        for (int i = 0; i < 10; i++) {
+            producer.send("hello".getBytes());
+        }
+        consumer.receive();
+
+        events.clear();
+        try {
+            if (topicTypePartitioned.equals("partitioned")) {
+                admin.topics().deletePartitionedTopic(topicName, forceDelete);
+            } else {
+                admin.topics().delete(topicName, forceDelete);
+            }
+        } catch (PulsarAdminException e) {
+            if (forceDelete) {
+                throw e;
+            }
+            assertTrue(e.getMessage().contains("Topic has active 
producers/subscriptions")
+                    || e.getMessage().contains("connected 
producers/consumers"));
+        }
+
+        final String[] expectedEvents;
+
+        if (forceDelete) {
+            expectedEvents = new String[]{
+                    "DELETE__BEFORE",
+                    "UNLOAD__BEFORE",
+                    "UNLOAD__SUCCESS",
+                    "DELETE__SUCCESS",
+            };
+        } else {
+            expectedEvents = new String[]{
+                    "DELETE__BEFORE",
+                    "DELETE__FAILURE"
+            };
+        }
+
+        Awaitility.waitAtMost(10, TimeUnit.SECONDS).untilAsserted(() -> {
+            // only care about first 4 events max, the rest will be from 
client recreating deleted topic
+            String[] eventsToArray = (events.size() <= 4)
+                    ? events.toArray(new String[0])
+                    : ArrayUtils.subarray(events.toArray(new String[0]), 0, 4);
+            Assert.assertEquals(eventsToArray, expectedEvents);
+        });
+
+        consumer.close();
+        producer.close();
+    }
+
+    @Test(dataProvider = "topicTypeNoDelete")
+    public void testTopicAutoGC(String topicTypePersistence, String 
topicTypePartitioned) throws Exception {
+        String topicName = topicTypePersistence + "://" + namespace + "/" + 
"topic-" + UUID.randomUUID();
+
+        createTopicAndVerifyEvents(topicTypePartitioned, topicName);
+
+        admin.namespaces().setInactiveTopicPolicies(namespace,
+                new 
InactiveTopicPolicies(InactiveTopicDeleteMode.delete_when_no_subscriptions, 1, 
true));
+
+        // Remove retention
+        admin.namespaces().setRetention(namespace, new RetentionPolicies());
+        try (PulsarAdmin admin2 = createPulsarAdmin()) {
+            Awaitility.await().untilAsserted(() ->
+                    assertEquals(admin2.namespaces().getRetention(namespace), 
new RetentionPolicies()));
+        }
+
+        events.clear();
+
+        runGC();
+
+        Awaitility.waitAtMost(10, TimeUnit.SECONDS).untilAsserted(() ->
+                Assert.assertEquals(events.toArray(), new String[]{
+                        "UNLOAD__BEFORE",
+                        "UNLOAD__SUCCESS",
+                })
+        );
+    }
+
+    private void createTopicAndVerifyEvents(String topicTypePartitioned, 
String topicName) throws Exception {
+        final String[] expectedEvents;
+        if (topicTypePartitioned.equals("partitioned")) {
+            topicNameToWatch = topicName + "-partition-1";
+            admin.topics().createPartitionedTopic(topicName, 2);
+            triggerPartitionsCreation(topicName);
+
+            expectedEvents = new String[]{
+                    "LOAD__BEFORE",
+                    "CREATE__BEFORE",
+                    "CREATE__SUCCESS",
+                    "LOAD__SUCCESS"
+            };
+
+        } else {
+            topicNameToWatch = topicName;
+            admin.topics().createNonPartitionedTopic(topicName);
+
+            expectedEvents = new String[]{
+                    "LOAD__BEFORE",
+                    "LOAD__FAILURE",
+                    "LOAD__BEFORE",
+                    "CREATE__BEFORE",
+                    "CREATE__SUCCESS",
+                    "LOAD__SUCCESS"
+            };
+
+        }
+
+        Awaitility.waitAtMost(10, TimeUnit.SECONDS).untilAsserted(() ->
+                Assert.assertEquals(events.toArray(), expectedEvents));
+    }
+
+    private PulsarAdmin createPulsarAdmin() throws PulsarClientException {
+        return PulsarAdmin.builder()
+                .serviceHttpUrl(brokerUrl != null ? brokerUrl.toString() : 
brokerUrlTls.toString())
+                .build();
+    }
+
+    private void triggerPartitionsCreation(String topicName) throws Exception {
+        Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(topicName)
+                .create();
+        producer.close();
+    }
+
+}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerTestBase.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerTestBase.java
index 5fd4edd2a30..63f778a44f1 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerTestBase.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerTestBase.java
@@ -79,7 +79,7 @@ public abstract class BrokerTestBase extends 
MockedPulsarServiceBaseTest {
         }
     }
 
-    void runGC() {
+    protected void runGC() {
         try {
             pulsar.getBrokerService().forEachTopic(topic -> {
                 if (topic instanceof AbstractTopic) {

Reply via email to