This is an automated email from the ASF dual-hosted git repository. cbornet pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.0 by this push: new e8261fcef8d Revert "[fix][broker] Fix entry filter feature for the non-persistent topic (#20141)" e8261fcef8d is described below commit e8261fcef8d76061d378d8a9f3b05a5d115013fe Author: Christophe Bornet <cbor...@hotmail.com> AuthorDate: Mon Apr 24 12:10:50 2023 +0200 Revert "[fix][broker] Fix entry filter feature for the non-persistent topic (#20141)" This reverts commit e27abe9e128fb71b65ffe06417574c9a7f3facbd. --- .../service/nonpersistent/NonPersistentTopic.java | 5 ++- .../broker/service/plugin/FilterEntryTest.java | 15 +++----- .../pulsar/broker/stats/SubscriptionStatsTest.java | 42 +++++++++++----------- 3 files changed, 26 insertions(+), 36 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java index 33258b06726..317b8df6b9a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java @@ -26,7 +26,7 @@ import com.carrotsearch.hppc.ObjectObjectHashMap; import io.netty.buffer.ByteBuf; import io.netty.util.concurrent.FastThreadLocal; import java.util.ArrayList; -import java.util.Arrays; +import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -199,8 +199,7 @@ public class NonPersistentTopic extends AbstractTopic implements Topic, TopicPol // entry internally retains data so, duplicateBuffer should be release here duplicateBuffer.release(); if (subscription.getDispatcher() != null) { - // Dispatcher needs to call the set method to support entry filter feature. - subscription.getDispatcher().sendMessages(Arrays.asList(entry)); + subscription.getDispatcher().sendMessages(Collections.singletonList(entry)); } else { // it happens when subscription is created but dispatcher is not created as consumer is not added // yet diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/FilterEntryTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/FilterEntryTest.java index b868858646c..4b9d91fbde2 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/FilterEntryTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/FilterEntryTest.java @@ -51,7 +51,6 @@ import org.apache.pulsar.broker.service.AbstractTopic; import org.apache.pulsar.broker.service.BrokerTestBase; import org.apache.pulsar.broker.service.Dispatcher; import org.apache.pulsar.broker.service.EntryFilterSupport; -import org.apache.pulsar.broker.service.Subscription; import org.apache.pulsar.broker.service.persistent.PersistentSubscription; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.broker.testcontext.PulsarTestContext; @@ -287,16 +286,10 @@ public class FilterEntryTest extends BrokerTestBase { } - @DataProvider(name = "topicProvider") - public Object[][] topicProvider() { - return new Object[][]{ - {"persistent://prop/ns-abc/topic" + UUID.randomUUID()}, - {"non-persistent://prop/ns-abc/topic" + UUID.randomUUID()}, - }; - } - @Test(dataProvider = "topicProvider") - public void testFilteredMsgCount(String topic) throws Throwable { + @Test + public void testFilteredMsgCount() throws Throwable { + String topic = "persistent://prop/ns-abc/topic" + UUID.randomUUID(); String subName = "sub"; try (Producer<String> producer = pulsarClient.newProducer(Schema.STRING) @@ -305,7 +298,7 @@ public class FilterEntryTest extends BrokerTestBase { .subscriptionName(subName).subscribe()) { // mock entry filters - Subscription subscription = pulsar.getBrokerService() + PersistentSubscription subscription = (PersistentSubscription) pulsar.getBrokerService() .getTopicReference(topic).get().getSubscription(subName); Dispatcher dispatcher = subscription.getDispatcher(); Field field = EntryFilterSupport.class.getDeclaredField("entryFilters"); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/SubscriptionStatsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/SubscriptionStatsTest.java index d5e0066a86f..bf9c1d540bf 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/SubscriptionStatsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/SubscriptionStatsTest.java @@ -211,22 +211,21 @@ public class SubscriptionStatsTest extends ProducerConsumerBase { hasFilterField.set(dispatcher, true); } - int rejectedCount = 100; - int acceptCount = 100; - int scheduleCount = 100; - for (int i = 0; i < rejectedCount; i++) { - producer.newMessage().property("REJECT", " ").value(UUID.randomUUID().toString()).send(); - } - for (int i = 0; i < acceptCount; i++) { + for (int i = 0; i < 100; i++) { producer.newMessage().property("ACCEPT", " ").value(UUID.randomUUID().toString()).send(); } - for (int i = 0; i < scheduleCount; i++) { + for (int i = 0; i < 100; i++) { + producer.newMessage().property("REJECT", " ").value(UUID.randomUUID().toString()).send(); + } + for (int i = 0; i < 100; i++) { producer.newMessage().property("RESCHEDULE", " ").value(UUID.randomUUID().toString()).send(); } - for (int i = 0; i < acceptCount; i++) { - Message<String> message = consumer.receive(1, TimeUnit.SECONDS); - Assert.assertNotNull(message); + for (;;) { + Message<String> message = consumer.receive(10, TimeUnit.SECONDS); + if (message == null) { + break; + } consumer.acknowledge(message); } @@ -264,12 +263,12 @@ public class SubscriptionStatsTest extends ProducerConsumerBase { .mapToDouble(m-> m.value).sum(); if (setFilter) { - Assert.assertEquals(filterAccepted, acceptCount); - Assert.assertEquals(filterRejected, rejectedCount); - // Only works on the test, if there are some markers, - // the filterProcessCount will be not equal with rejectedCount + rescheduledCount + acceptCount - Assert.assertEquals(throughFilter, - filterAccepted + filterRejected + filterRescheduled, 0.01 * throughFilter); + Assert.assertEquals(filterAccepted, 100); + if (isPersistent) { + Assert.assertEquals(filterRejected, 100); + // Only works on the test, if there are some markers, the filterProcessCount will be not equal with rejectedCount + rescheduledCount + acceptCount + Assert.assertEquals(throughFilter, filterAccepted + filterRejected + filterRescheduled, 0.01 * throughFilter); + } } else { Assert.assertEquals(throughFilter, 0D); Assert.assertEquals(filterAccepted, 0D); @@ -283,20 +282,19 @@ public class SubscriptionStatsTest extends ProducerConsumerBase { Assert.assertEquals(rescheduledMetrics.size(), 0); } - testSubscriptionStatsAdminApi(topic, subName, setFilter, acceptCount, rejectedCount); + testSubscriptionStatsAdminApi(topic, subName, setFilter); } - private void testSubscriptionStatsAdminApi(String topic, String subName, boolean setFilter, - int acceptCount, int rejectedCount) throws Exception { + private void testSubscriptionStatsAdminApi(String topic, String subName, boolean setFilter) throws Exception { boolean persistent = TopicName.get(topic).isPersistent(); TopicStats topicStats = admin.topics().getStats(topic); SubscriptionStats stats = topicStats.getSubscriptions().get(subName); Assert.assertNotNull(stats); if (setFilter) { - Assert.assertEquals(stats.getFilterAcceptedMsgCount(), acceptCount); + Assert.assertEquals(stats.getFilterAcceptedMsgCount(), 100); if (persistent) { - Assert.assertEquals(stats.getFilterRejectedMsgCount(), rejectedCount); + Assert.assertEquals(stats.getFilterRejectedMsgCount(), 100); // Only works on the test, if there are some markers, the filterProcessCount will be not equal with rejectedCount + rescheduledCount + acceptCount Assert.assertEquals(stats.getFilterProcessedMsgCount(), stats.getFilterAcceptedMsgCount() + stats.getFilterRejectedMsgCount()