This is an automated email from the ASF dual-hosted git repository. eolivelli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 3e44d1e6e2b [improve] PIP-241: add TopicEventListener / topic events for the BrokerService (#19153) 3e44d1e6e2b is described below commit 3e44d1e6e2ba4599d547c83cf7cb25350f0cc560 Author: Andrey Yegorov <8622884+dl...@users.noreply.github.com> AuthorDate: Thu Feb 2 10:04:52 2023 -0800 [improve] PIP-241: add TopicEventListener / topic events for the BrokerService (#19153) --- .../pulsar/broker/service/BrokerService.java | 73 ++++- .../broker/service/TopicEventsDispatcher.java | 137 +++++++++ .../pulsar/broker/service/TopicEventsListener.java | 62 ++++ .../pulsar/broker/TopicEventsListenerTest.java | 311 +++++++++++++++++++++ .../pulsar/broker/service/BrokerTestBase.java | 2 +- 5 files changed, 579 insertions(+), 6 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index f7020963fb7..27a1518cb81 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -110,6 +110,8 @@ import org.apache.pulsar.broker.service.BrokerServiceException.NamingException; import org.apache.pulsar.broker.service.BrokerServiceException.NotAllowedException; import org.apache.pulsar.broker.service.BrokerServiceException.PersistenceException; import org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException; +import org.apache.pulsar.broker.service.TopicEventsListener.EventStage; +import org.apache.pulsar.broker.service.TopicEventsListener.TopicEvent; import org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic; import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter; import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers; @@ -281,6 +283,8 @@ public class BrokerService implements Closeable { private Set<BrokerEntryMetadataInterceptor> brokerEntryMetadataInterceptors; private Set<ManagedLedgerPayloadProcessor> brokerEntryPayloadProcessors; + private final TopicEventsDispatcher topicEventsDispatcher = new TopicEventsDispatcher(); + public BrokerService(PulsarService pulsar, EventLoopGroup eventLoopGroup) throws Exception { this.pulsar = pulsar; this.preciseTopicPublishRateLimitingEnable = @@ -398,6 +402,16 @@ public class BrokerService implements Closeable { this.bundlesQuotas = new BundlesQuotas(pulsar.getLocalMetadataStore()); } + public void addTopicEventListener(TopicEventsListener... listeners) { + topicEventsDispatcher.addTopicEventListener(listeners); + getTopics().keys().forEach(topic -> + TopicEventsDispatcher.notify(listeners, topic, TopicEvent.LOAD, EventStage.SUCCESS, null)); + } + + public void removeTopicEventListener(TopicEventsListener... listeners) { + topicEventsDispatcher.removeTopicEventListener(listeners); + } + // This call is used for starting additional protocol handlers public void startProtocolHandlers( Map<String, Map<InetSocketAddress, ChannelInitializer<SocketChannel>>> protocolHandlers) { @@ -1024,21 +1038,41 @@ public class BrokerService implements Closeable { return loadOrCreatePersistentTopic(tpName, createIfMissing, properties); }); } else { - return topics.computeIfAbsent(topicName.toString(), (name) -> { + return topics.computeIfAbsent(topicName.toString(), (name) -> { + topicEventsDispatcher.notify(topicName.toString(), TopicEvent.LOAD, EventStage.BEFORE); if (topicName.isPartitioned()) { final TopicName partitionedTopicName = TopicName.get(topicName.getPartitionedTopicName()); return this.fetchPartitionedTopicMetadataAsync(partitionedTopicName).thenCompose((metadata) -> { if (topicName.getPartitionIndex() < metadata.partitions) { - return createNonPersistentTopic(name); + topicEventsDispatcher + .notify(topicName.toString(), TopicEvent.CREATE, EventStage.BEFORE); + + CompletableFuture<Optional<Topic>> res = createNonPersistentTopic(name); + + CompletableFuture<Optional<Topic>> eventFuture = topicEventsDispatcher + .notifyOnCompletion(res, topicName.toString(), TopicEvent.CREATE); + topicEventsDispatcher + .notifyOnCompletion(eventFuture, topicName.toString(), TopicEvent.LOAD); + return res; } + topicEventsDispatcher.notify(topicName.toString(), TopicEvent.LOAD, EventStage.FAILURE); return CompletableFuture.completedFuture(Optional.empty()); }); } else if (createIfMissing) { - return createNonPersistentTopic(name); + topicEventsDispatcher.notify(topicName.toString(), TopicEvent.CREATE, EventStage.BEFORE); + + CompletableFuture<Optional<Topic>> res = createNonPersistentTopic(name); + + CompletableFuture<Optional<Topic>> eventFuture = topicEventsDispatcher + .notifyOnCompletion(res, topicName.toString(), TopicEvent.CREATE); + topicEventsDispatcher + .notifyOnCompletion(eventFuture, topicName.toString(), TopicEvent.LOAD); + return res; } else { + topicEventsDispatcher.notify(topicName.toString(), TopicEvent.LOAD, EventStage.FAILURE); return CompletableFuture.completedFuture(Optional.empty()); } - }); + }); } } catch (IllegalArgumentException e) { log.warn("[{}] Illegalargument exception when loading topic", topicName, e); @@ -1056,6 +1090,13 @@ public class BrokerService implements Closeable { } public CompletableFuture<Void> deleteTopic(String topic, boolean forceDelete) { + topicEventsDispatcher.notify(topic, TopicEvent.DELETE, EventStage.BEFORE); + CompletableFuture<Void> result = deleteTopicInternal(topic, forceDelete); + topicEventsDispatcher.notifyOnCompletion(result, topic, TopicEvent.DELETE); + return result; + } + + private CompletableFuture<Void> deleteTopicInternal(String topic, boolean forceDelete) { TopicName topicName = TopicName.get(topic); Optional<Topic> optTopic = getTopicReference(topic); @@ -1402,7 +1443,7 @@ public class BrokerService implements Closeable { log.debug("Broker is unable to load persistent topic {}", topic); } topicFuture.completeExceptionally(new NotAllowedException( - "Broker is not unable to load persistent topic")); + "Broker is unable to load persistent topic")); return topicFuture; } @@ -1542,6 +1583,24 @@ public class BrokerService implements Closeable { managedLedgerConfig.setShadowSourceName(TopicName.get(shadowSource).getPersistenceNamingEncoding()); } + topicEventsDispatcher.notify(topic, TopicEvent.LOAD, EventStage.BEFORE); + // load can fail with topicFuture completed non-exceptionally + // work around this + final CompletableFuture<Void> loadFuture = new CompletableFuture<>(); + topicFuture.whenComplete((res, ex) -> { + if (ex == null) { + loadFuture.complete(null); + } else { + loadFuture.completeExceptionally(ex); + } + }); + + if (createIfMissing) { + topicEventsDispatcher.notify(topic, TopicEvent.CREATE, EventStage.BEFORE); + topicEventsDispatcher.notifyOnCompletion(topicFuture, topic, TopicEvent.CREATE); + } + topicEventsDispatcher.notifyOnCompletion(loadFuture, topic, TopicEvent.LOAD); + // Once we have the configuration, we can proceed with the async open operation managedLedgerFactory.asyncOpen(topicName.getPersistenceNamingEncoding(), managedLedgerConfig, new OpenLedgerCallback() { @@ -1603,6 +1662,7 @@ public class BrokerService implements Closeable { public void openLedgerFailed(ManagedLedgerException exception, Object ctx) { if (!createIfMissing && exception instanceof ManagedLedgerNotFoundException) { // We were just trying to load a topic and the topic doesn't exist + loadFuture.completeExceptionally(exception); topicFuture.complete(Optional.empty()); } else { log.warn("Failed to create topic {}", topic, exception); @@ -2135,6 +2195,8 @@ public class BrokerService implements Closeable { String bundleName = namespaceBundle.toString(); String namespaceName = TopicName.get(topic).getNamespaceObject().toString(); + topicEventsDispatcher.notify(topic, TopicEvent.UNLOAD, EventStage.BEFORE); + synchronized (multiLayerTopicsMap) { ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, Topic>> namespaceMap = multiLayerTopicsMap .get(namespaceName); @@ -2169,6 +2231,7 @@ public class BrokerService implements Closeable { if (compactor != null) { compactor.getStats().removeTopic(topic); } + topicEventsDispatcher.notify(topic, TopicEvent.UNLOAD, EventStage.SUCCESS); } public int getNumberOfNamespaceBundles() { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicEventsDispatcher.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicEventsDispatcher.java new file mode 100644 index 00000000000..a706e00db90 --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicEventsDispatcher.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service; + +import java.util.Arrays; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CopyOnWriteArrayList; +import lombok.extern.slf4j.Slf4j; + +/** + * Utility class to dispatch topic events. + */ +@Slf4j +public class TopicEventsDispatcher { + private final List<TopicEventsListener> topicEventListeners = new CopyOnWriteArrayList<>(); + + /** + * Adds listeners, ignores null listeners. + * @param listeners + */ + public void addTopicEventListener(TopicEventsListener... listeners) { + Objects.requireNonNull(listeners); + Arrays.stream(listeners) + .filter(x -> x != null) + .forEach(topicEventListeners::add); + } + + /** + * Removes listeners. + * @param listeners + */ + public void removeTopicEventListener(TopicEventsListener... listeners) { + Objects.requireNonNull(listeners); + Arrays.stream(listeners) + .filter(x -> x != null) + .forEach(topicEventListeners::remove); + } + + /** + * Dispatches notification to all currently added listeners. + * @param topic + * @param event + * @param stage + */ + public void notify(String topic, + TopicEventsListener.TopicEvent event, + TopicEventsListener.EventStage stage) { + notify(topic, event, stage, null); + } + + /** + * Dispatches notification to all currently added listeners. + * @param topic + * @param event + * @param stage + * @param t + */ + public void notify(String topic, + TopicEventsListener.TopicEvent event, + TopicEventsListener.EventStage stage, + Throwable t) { + topicEventListeners + .forEach(listener -> notify(listener, topic, event, stage, t)); + } + + /** + * Dispatches SUCCESS/FAILURE notification to all currently added listeners on completion of the future. + * @param future + * @param topic + * @param event + * @param <T> + * @return future of a new completion stage + */ + public <T> CompletableFuture<T> notifyOnCompletion(CompletableFuture<T> future, + String topic, + TopicEventsListener.TopicEvent event) { + return future.whenComplete((r, ex) -> notify(topic, + event, + ex == null ? TopicEventsListener.EventStage.SUCCESS : TopicEventsListener.EventStage.FAILURE, + ex)); + } + + /** + * Dispatches notification to specified listeners. + * @param listeners + * @param topic + * @param event + * @param stage + * @param t + */ + public static void notify(TopicEventsListener[] listeners, + String topic, + TopicEventsListener.TopicEvent event, + TopicEventsListener.EventStage stage, + Throwable t) { + Objects.requireNonNull(listeners); + for (TopicEventsListener listener: listeners) { + notify(listener, topic, event, stage, t); + } + } + + private static void notify(TopicEventsListener listener, + String topic, + TopicEventsListener.TopicEvent event, + TopicEventsListener.EventStage stage, + Throwable t) { + if (listener == null) { + return; + } + + try { + listener.handleEvent(topic, event, stage, t); + } catch (Throwable ex) { + log.error("TopicEventsListener {} exception while handling {}_{} for topic {}", + listener, event, stage, topic, ex); + } + } + +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicEventsListener.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicEventsListener.java new file mode 100644 index 00000000000..8068067206c --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicEventsListener.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service; + +import org.apache.pulsar.common.classification.InterfaceAudience; +import org.apache.pulsar.common.classification.InterfaceStability; + +/** + * Listener for the Topic events. + */ +@InterfaceStability.Evolving +@InterfaceAudience.LimitedPrivate +public interface TopicEventsListener { + + /** + * Types of events currently supported. + * create/load/unload/delete + */ + enum TopicEvent { + // create events included into load events + CREATE, + LOAD, + UNLOAD, + DELETE, + } + + /** + * Stages of events currently supported. + * before starting the event/successful completion/failed completion + */ + enum EventStage { + BEFORE, + SUCCESS, + FAILURE + } + + /** + * Handle topic event. + * Choice of the thread / maintenance of the thread pool is up to the event handlers. + * @param topicName - name of the topic + * @param event - TopicEvent + * @param stage - EventStage + * @param t - exception in case of FAILURE, if present/known + */ + void handleEvent(String topicName, TopicEvent event, EventStage stage, Throwable t); +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/TopicEventsListenerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/TopicEventsListenerTest.java new file mode 100644 index 00000000000..e6459bbf74c --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/TopicEventsListenerTest.java @@ -0,0 +1,311 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker; + +import com.google.common.collect.Sets; + +import java.util.Queue; +import java.util.UUID; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.TimeUnit; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.ArrayUtils; +import org.apache.pulsar.broker.service.BrokerTestBase; +import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode; +import org.apache.pulsar.common.policies.data.InactiveTopicPolicies; +import org.apache.pulsar.common.policies.data.RetentionPolicies; +import org.awaitility.Awaitility; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; + +@Slf4j +public class TopicEventsListenerTest extends BrokerTestBase { + + final static Queue<String> events = new ConcurrentLinkedQueue<>(); + volatile String topicNameToWatch; + String namespace; + + @DataProvider(name = "topicType") + public static Object[][] topicType() { + return new Object[][] { + {"persistent", "partitioned", true}, + {"persistent", "non-partitioned", true}, + {"non-persistent", "partitioned", true}, + {"non-persistent", "non-partitioned", true}, + {"persistent", "partitioned", false}, + {"persistent", "non-partitioned", false}, + {"non-persistent", "partitioned", false}, + {"non-persistent", "non-partitioned", false} + }; + } + + @DataProvider(name = "topicTypeNoDelete") + public static Object[][] topicTypeNoDelete() { + return new Object[][] { + {"persistent", "partitioned"}, + {"persistent", "non-partitioned"}, + {"non-persistent", "partitioned"}, + {"non-persistent", "non-partitioned"} + }; + } + + @BeforeClass + @Override + protected void setup() throws Exception { + super.baseSetup(); + pulsar.getConfiguration().setForceDeleteNamespaceAllowed(true); + + pulsar.getBrokerService().addTopicEventListener((topic, event, stage, t) -> { + log.info("got event {}__{} for topic {}", event, stage, topic); + if (topic.equals(topicNameToWatch)) { + if (log.isDebugEnabled()) { + log.debug("got event {}__{} for topic {} with detailed stack", + event, stage, topic, new Exception("tracing event source")); + } + events.add(event.toString() + "__" + stage.toString()); + } + }); + } + + @AfterClass(alwaysRun = true) + @Override + protected void cleanup() throws Exception { + super.internalCleanup(); + } + + @BeforeMethod + protected void setupTest() throws Exception { + namespace = "prop/" + UUID.randomUUID(); + admin.namespaces().createNamespace(namespace, Sets.newHashSet("test")); + assertTrue(admin.namespaces().getNamespaces("prop").contains(namespace)); + admin.namespaces().setRetention(namespace, new RetentionPolicies(3, 10)); + try (PulsarAdmin admin2 = createPulsarAdmin()) { + Awaitility.await().untilAsserted(() -> + assertEquals(admin2.namespaces().getRetention(namespace), new RetentionPolicies(3, 10))); + } + + events.clear(); + } + + @AfterMethod(alwaysRun = true) + protected void cleanupTest() throws Exception { + deleteNamespaceWithRetry(namespace, true); + } + + @Test(dataProvider = "topicType") + public void testEvents(String topicTypePersistence, String topicTypePartitioned, + boolean forceDelete) throws Exception { + String topicName = topicTypePersistence + "://" + namespace + "/" + "topic-" + UUID.randomUUID(); + + createTopicAndVerifyEvents(topicTypePartitioned, topicName); + + events.clear(); + if (topicTypePartitioned.equals("partitioned")) { + admin.topics().deletePartitionedTopic(topicName, forceDelete); + } else { + admin.topics().delete(topicName, forceDelete); + } + + Awaitility.waitAtMost(10, TimeUnit.SECONDS).untilAsserted(() -> + Assert.assertEquals(events.toArray(), new String[]{ + "DELETE__BEFORE", + "UNLOAD__BEFORE", + "UNLOAD__SUCCESS", + "DELETE__SUCCESS" + }) + ); + } + + @Test(dataProvider = "topicType") + public void testEventsWithUnload(String topicTypePersistence, String topicTypePartitioned, + boolean forceDelete) throws Exception { + String topicName = topicTypePersistence + "://" + namespace + "/" + "topic-" + UUID.randomUUID(); + + createTopicAndVerifyEvents(topicTypePartitioned, topicName); + + events.clear(); + admin.topics().unload(topicName); + + Awaitility.waitAtMost(10, TimeUnit.SECONDS).untilAsserted(() -> + Assert.assertEquals(events.toArray(), new String[]{ + "UNLOAD__BEFORE", + "UNLOAD__SUCCESS" + }) + ); + + events.clear(); + if (topicTypePartitioned.equals("partitioned")) { + admin.topics().deletePartitionedTopic(topicName, forceDelete); + } else { + admin.topics().delete(topicName, forceDelete); + } + + Awaitility.waitAtMost(10, TimeUnit.SECONDS).untilAsserted(() -> + Assert.assertEquals(events.toArray(), new String[]{ + "DELETE__BEFORE", + "DELETE__SUCCESS" + }) + ); + } + + @Test(dataProvider = "topicType") + public void testEventsActiveSub(String topicTypePersistence, String topicTypePartitioned, + boolean forceDelete) throws Exception { + String topicName = topicTypePersistence + "://" + namespace + "/" + "topic-" + UUID.randomUUID(); + + createTopicAndVerifyEvents(topicTypePartitioned, topicName); + + Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName("sub").subscribe(); + Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create(); + for (int i = 0; i < 10; i++) { + producer.send("hello".getBytes()); + } + consumer.receive(); + + events.clear(); + try { + if (topicTypePartitioned.equals("partitioned")) { + admin.topics().deletePartitionedTopic(topicName, forceDelete); + } else { + admin.topics().delete(topicName, forceDelete); + } + } catch (PulsarAdminException e) { + if (forceDelete) { + throw e; + } + assertTrue(e.getMessage().contains("Topic has active producers/subscriptions") + || e.getMessage().contains("connected producers/consumers")); + } + + final String[] expectedEvents; + + if (forceDelete) { + expectedEvents = new String[]{ + "DELETE__BEFORE", + "UNLOAD__BEFORE", + "UNLOAD__SUCCESS", + "DELETE__SUCCESS", + }; + } else { + expectedEvents = new String[]{ + "DELETE__BEFORE", + "DELETE__FAILURE" + }; + } + + Awaitility.waitAtMost(10, TimeUnit.SECONDS).untilAsserted(() -> { + // only care about first 4 events max, the rest will be from client recreating deleted topic + String[] eventsToArray = (events.size() <= 4) + ? events.toArray(new String[0]) + : ArrayUtils.subarray(events.toArray(new String[0]), 0, 4); + Assert.assertEquals(eventsToArray, expectedEvents); + }); + + consumer.close(); + producer.close(); + } + + @Test(dataProvider = "topicTypeNoDelete") + public void testTopicAutoGC(String topicTypePersistence, String topicTypePartitioned) throws Exception { + String topicName = topicTypePersistence + "://" + namespace + "/" + "topic-" + UUID.randomUUID(); + + createTopicAndVerifyEvents(topicTypePartitioned, topicName); + + admin.namespaces().setInactiveTopicPolicies(namespace, + new InactiveTopicPolicies(InactiveTopicDeleteMode.delete_when_no_subscriptions, 1, true)); + + // Remove retention + admin.namespaces().setRetention(namespace, new RetentionPolicies()); + try (PulsarAdmin admin2 = createPulsarAdmin()) { + Awaitility.await().untilAsserted(() -> + assertEquals(admin2.namespaces().getRetention(namespace), new RetentionPolicies())); + } + + events.clear(); + + runGC(); + + Awaitility.waitAtMost(10, TimeUnit.SECONDS).untilAsserted(() -> + Assert.assertEquals(events.toArray(), new String[]{ + "UNLOAD__BEFORE", + "UNLOAD__SUCCESS", + }) + ); + } + + private void createTopicAndVerifyEvents(String topicTypePartitioned, String topicName) throws Exception { + final String[] expectedEvents; + if (topicTypePartitioned.equals("partitioned")) { + topicNameToWatch = topicName + "-partition-1"; + admin.topics().createPartitionedTopic(topicName, 2); + triggerPartitionsCreation(topicName); + + expectedEvents = new String[]{ + "LOAD__BEFORE", + "CREATE__BEFORE", + "CREATE__SUCCESS", + "LOAD__SUCCESS" + }; + + } else { + topicNameToWatch = topicName; + admin.topics().createNonPartitionedTopic(topicName); + + expectedEvents = new String[]{ + "LOAD__BEFORE", + "LOAD__FAILURE", + "LOAD__BEFORE", + "CREATE__BEFORE", + "CREATE__SUCCESS", + "LOAD__SUCCESS" + }; + + } + + Awaitility.waitAtMost(10, TimeUnit.SECONDS).untilAsserted(() -> + Assert.assertEquals(events.toArray(), expectedEvents)); + } + + private PulsarAdmin createPulsarAdmin() throws PulsarClientException { + return PulsarAdmin.builder() + .serviceHttpUrl(brokerUrl != null ? brokerUrl.toString() : brokerUrlTls.toString()) + .build(); + } + + private void triggerPartitionsCreation(String topicName) throws Exception { + Producer<byte[]> producer = pulsarClient.newProducer() + .topic(topicName) + .create(); + producer.close(); + } + +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerTestBase.java index 5fd4edd2a30..63f778a44f1 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerTestBase.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerTestBase.java @@ -79,7 +79,7 @@ public abstract class BrokerTestBase extends MockedPulsarServiceBaseTest { } } - void runGC() { + protected void runGC() { try { pulsar.getBrokerService().forEachTopic(topic -> { if (topic instanceof AbstractTopic) {