This is an automated email from the ASF dual-hosted git repository. yubiao pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit bafae5d3f48c6c858e48f3cfc7e0c7f212ece2f4 Author: ran <r...@streamnative.io> AuthorDate: Thu Apr 20 16:18:06 2023 +0800 [fix][broker] Fix entry filter feature for the non-persistent topic (#20141) (cherry picked from commit 575cf2331dd3ac0048923487c6c7904eda4301e6) --- .../service/nonpersistent/NonPersistentTopic.java | 5 +-- .../broker/service/plugin/FilterEntryTest.java | 15 +++++--- .../pulsar/broker/stats/SubscriptionStatsTest.java | 42 +++++++++++----------- 3 files changed, 36 insertions(+), 26 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java index 317b8df6b9a..33258b06726 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java @@ -26,7 +26,7 @@ import com.carrotsearch.hppc.ObjectObjectHashMap; import io.netty.buffer.ByteBuf; import io.netty.util.concurrent.FastThreadLocal; import java.util.ArrayList; -import java.util.Collections; +import java.util.Arrays; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -199,7 +199,8 @@ public class NonPersistentTopic extends AbstractTopic implements Topic, TopicPol // entry internally retains data so, duplicateBuffer should be release here duplicateBuffer.release(); if (subscription.getDispatcher() != null) { - subscription.getDispatcher().sendMessages(Collections.singletonList(entry)); + // Dispatcher needs to call the set method to support entry filter feature. + subscription.getDispatcher().sendMessages(Arrays.asList(entry)); } else { // it happens when subscription is created but dispatcher is not created as consumer is not added // yet diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/FilterEntryTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/FilterEntryTest.java index 4b9d91fbde2..b868858646c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/FilterEntryTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/FilterEntryTest.java @@ -51,6 +51,7 @@ import org.apache.pulsar.broker.service.AbstractTopic; import org.apache.pulsar.broker.service.BrokerTestBase; import org.apache.pulsar.broker.service.Dispatcher; import org.apache.pulsar.broker.service.EntryFilterSupport; +import org.apache.pulsar.broker.service.Subscription; import org.apache.pulsar.broker.service.persistent.PersistentSubscription; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.broker.testcontext.PulsarTestContext; @@ -286,10 +287,16 @@ public class FilterEntryTest extends BrokerTestBase { } + @DataProvider(name = "topicProvider") + public Object[][] topicProvider() { + return new Object[][]{ + {"persistent://prop/ns-abc/topic" + UUID.randomUUID()}, + {"non-persistent://prop/ns-abc/topic" + UUID.randomUUID()}, + }; + } - @Test - public void testFilteredMsgCount() throws Throwable { - String topic = "persistent://prop/ns-abc/topic" + UUID.randomUUID(); + @Test(dataProvider = "topicProvider") + public void testFilteredMsgCount(String topic) throws Throwable { String subName = "sub"; try (Producer<String> producer = pulsarClient.newProducer(Schema.STRING) @@ -298,7 +305,7 @@ public class FilterEntryTest extends BrokerTestBase { .subscriptionName(subName).subscribe()) { // mock entry filters - PersistentSubscription subscription = (PersistentSubscription) pulsar.getBrokerService() + Subscription subscription = pulsar.getBrokerService() .getTopicReference(topic).get().getSubscription(subName); Dispatcher dispatcher = subscription.getDispatcher(); Field field = EntryFilterSupport.class.getDeclaredField("entryFilters"); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/SubscriptionStatsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/SubscriptionStatsTest.java index bf9c1d540bf..d5e0066a86f 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/SubscriptionStatsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/SubscriptionStatsTest.java @@ -211,21 +211,22 @@ public class SubscriptionStatsTest extends ProducerConsumerBase { hasFilterField.set(dispatcher, true); } - for (int i = 0; i < 100; i++) { - producer.newMessage().property("ACCEPT", " ").value(UUID.randomUUID().toString()).send(); - } - for (int i = 0; i < 100; i++) { + int rejectedCount = 100; + int acceptCount = 100; + int scheduleCount = 100; + for (int i = 0; i < rejectedCount; i++) { producer.newMessage().property("REJECT", " ").value(UUID.randomUUID().toString()).send(); } - for (int i = 0; i < 100; i++) { + for (int i = 0; i < acceptCount; i++) { + producer.newMessage().property("ACCEPT", " ").value(UUID.randomUUID().toString()).send(); + } + for (int i = 0; i < scheduleCount; i++) { producer.newMessage().property("RESCHEDULE", " ").value(UUID.randomUUID().toString()).send(); } - for (;;) { - Message<String> message = consumer.receive(10, TimeUnit.SECONDS); - if (message == null) { - break; - } + for (int i = 0; i < acceptCount; i++) { + Message<String> message = consumer.receive(1, TimeUnit.SECONDS); + Assert.assertNotNull(message); consumer.acknowledge(message); } @@ -263,12 +264,12 @@ public class SubscriptionStatsTest extends ProducerConsumerBase { .mapToDouble(m-> m.value).sum(); if (setFilter) { - Assert.assertEquals(filterAccepted, 100); - if (isPersistent) { - Assert.assertEquals(filterRejected, 100); - // Only works on the test, if there are some markers, the filterProcessCount will be not equal with rejectedCount + rescheduledCount + acceptCount - Assert.assertEquals(throughFilter, filterAccepted + filterRejected + filterRescheduled, 0.01 * throughFilter); - } + Assert.assertEquals(filterAccepted, acceptCount); + Assert.assertEquals(filterRejected, rejectedCount); + // Only works on the test, if there are some markers, + // the filterProcessCount will be not equal with rejectedCount + rescheduledCount + acceptCount + Assert.assertEquals(throughFilter, + filterAccepted + filterRejected + filterRescheduled, 0.01 * throughFilter); } else { Assert.assertEquals(throughFilter, 0D); Assert.assertEquals(filterAccepted, 0D); @@ -282,19 +283,20 @@ public class SubscriptionStatsTest extends ProducerConsumerBase { Assert.assertEquals(rescheduledMetrics.size(), 0); } - testSubscriptionStatsAdminApi(topic, subName, setFilter); + testSubscriptionStatsAdminApi(topic, subName, setFilter, acceptCount, rejectedCount); } - private void testSubscriptionStatsAdminApi(String topic, String subName, boolean setFilter) throws Exception { + private void testSubscriptionStatsAdminApi(String topic, String subName, boolean setFilter, + int acceptCount, int rejectedCount) throws Exception { boolean persistent = TopicName.get(topic).isPersistent(); TopicStats topicStats = admin.topics().getStats(topic); SubscriptionStats stats = topicStats.getSubscriptions().get(subName); Assert.assertNotNull(stats); if (setFilter) { - Assert.assertEquals(stats.getFilterAcceptedMsgCount(), 100); + Assert.assertEquals(stats.getFilterAcceptedMsgCount(), acceptCount); if (persistent) { - Assert.assertEquals(stats.getFilterRejectedMsgCount(), 100); + Assert.assertEquals(stats.getFilterRejectedMsgCount(), rejectedCount); // Only works on the test, if there are some markers, the filterProcessCount will be not equal with rejectedCount + rescheduledCount + acceptCount Assert.assertEquals(stats.getFilterProcessedMsgCount(), stats.getFilterAcceptedMsgCount() + stats.getFilterRejectedMsgCount()