This is an automated email from the ASF dual-hosted git repository.

cbornet pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new e8261fcef8d Revert "[fix][broker] Fix entry filter feature for the non-persistent topic (#20141)"
e8261fcef8d is described below

commit e8261fcef8d76061d378d8a9f3b05a5d115013fe
Author: Christophe Bornet <cbor...@hotmail.com>
AuthorDate: Mon Apr 24 12:10:50 2023 +0200

    Revert "[fix][broker] Fix entry filter feature for the non-persistent topic (#20141)"
    
    This reverts commit e27abe9e128fb71b65ffe06417574c9a7f3facbd.
---
 .../service/nonpersistent/NonPersistentTopic.java  |  5 ++-
 .../broker/service/plugin/FilterEntryTest.java     | 15 +++-----
 .../pulsar/broker/stats/SubscriptionStatsTest.java | 42 +++++++++++-----------
 3 files changed, 26 insertions(+), 36 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index 33258b06726..317b8df6b9a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -26,7 +26,7 @@ import com.carrotsearch.hppc.ObjectObjectHashMap;
 import io.netty.buffer.ByteBuf;
 import io.netty.util.concurrent.FastThreadLocal;
 import java.util.ArrayList;
-import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -199,8 +199,7 @@ public class NonPersistentTopic extends AbstractTopic implements Topic, TopicPol
             // entry internally retains data so, duplicateBuffer should be release here
             duplicateBuffer.release();
             if (subscription.getDispatcher() != null) {
-                // Dispatcher needs to call the set method to support entry filter feature.
-                subscription.getDispatcher().sendMessages(Arrays.asList(entry));
+                subscription.getDispatcher().sendMessages(Collections.singletonList(entry));
             } else {
                 // it happens when subscription is created but dispatcher is not created as consumer is not added
                 // yet
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/FilterEntryTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/FilterEntryTest.java
index b868858646c..4b9d91fbde2 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/FilterEntryTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/FilterEntryTest.java
@@ -51,7 +51,6 @@ import org.apache.pulsar.broker.service.AbstractTopic;
 import org.apache.pulsar.broker.service.BrokerTestBase;
 import org.apache.pulsar.broker.service.Dispatcher;
 import org.apache.pulsar.broker.service.EntryFilterSupport;
-import org.apache.pulsar.broker.service.Subscription;
 import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.broker.testcontext.PulsarTestContext;
@@ -287,16 +286,10 @@ public class FilterEntryTest extends BrokerTestBase {
 
     }
 
-    @DataProvider(name = "topicProvider")
-    public Object[][] topicProvider() {
-        return new Object[][]{
-                {"persistent://prop/ns-abc/topic" + UUID.randomUUID()},
-                {"non-persistent://prop/ns-abc/topic" + UUID.randomUUID()},
-        };
-    }
 
-    @Test(dataProvider = "topicProvider")
-    public void testFilteredMsgCount(String topic) throws Throwable {
+    @Test
+    public void testFilteredMsgCount() throws Throwable {
+        String topic = "persistent://prop/ns-abc/topic" + UUID.randomUUID();
         String subName = "sub";
 
         try (Producer<String> producer = 
pulsarClient.newProducer(Schema.STRING)
@@ -305,7 +298,7 @@ public class FilterEntryTest extends BrokerTestBase {
                      .subscriptionName(subName).subscribe()) {
 
             // mock entry filters
-            Subscription subscription = pulsar.getBrokerService()
+            PersistentSubscription subscription = (PersistentSubscription) 
pulsar.getBrokerService()
                     .getTopicReference(topic).get().getSubscription(subName);
             Dispatcher dispatcher = subscription.getDispatcher();
             Field field = 
EntryFilterSupport.class.getDeclaredField("entryFilters");
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/SubscriptionStatsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/SubscriptionStatsTest.java
index d5e0066a86f..bf9c1d540bf 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/SubscriptionStatsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/SubscriptionStatsTest.java
@@ -211,22 +211,21 @@ public class SubscriptionStatsTest extends 
ProducerConsumerBase {
             hasFilterField.set(dispatcher, true);
         }
 
-        int rejectedCount = 100;
-        int acceptCount = 100;
-        int scheduleCount = 100;
-        for (int i = 0; i < rejectedCount; i++) {
-            producer.newMessage().property("REJECT", " 
").value(UUID.randomUUID().toString()).send();
-        }
-        for (int i = 0; i < acceptCount; i++) {
+        for (int i = 0; i < 100; i++) {
             producer.newMessage().property("ACCEPT", " 
").value(UUID.randomUUID().toString()).send();
         }
-        for (int i = 0; i < scheduleCount; i++) {
+        for (int i = 0; i < 100; i++) {
+            producer.newMessage().property("REJECT", " 
").value(UUID.randomUUID().toString()).send();
+        }
+        for (int i = 0; i < 100; i++) {
             producer.newMessage().property("RESCHEDULE", " 
").value(UUID.randomUUID().toString()).send();
         }
 
-        for (int i = 0; i < acceptCount; i++) {
-            Message<String> message = consumer.receive(1, TimeUnit.SECONDS);
-            Assert.assertNotNull(message);
+        for (;;) {
+            Message<String> message = consumer.receive(10, TimeUnit.SECONDS);
+            if (message == null) {
+                break;
+            }
             consumer.acknowledge(message);
         }
 
@@ -264,12 +263,12 @@ public class SubscriptionStatsTest extends 
ProducerConsumerBase {
                     .mapToDouble(m-> m.value).sum();
 
             if (setFilter) {
-                Assert.assertEquals(filterAccepted, acceptCount);
-                Assert.assertEquals(filterRejected, rejectedCount);
-                // Only works on the test, if there are some markers,
-                // the filterProcessCount will be not equal with rejectedCount 
+ rescheduledCount + acceptCount
-                Assert.assertEquals(throughFilter,
-                        filterAccepted + filterRejected + filterRescheduled, 
0.01 * throughFilter);
+                Assert.assertEquals(filterAccepted, 100);
+                if (isPersistent) {
+                    Assert.assertEquals(filterRejected, 100);
+                    // Only works on the test, if there are some markers, the 
filterProcessCount will be not equal with rejectedCount + rescheduledCount + 
acceptCount
+                    Assert.assertEquals(throughFilter, filterAccepted + 
filterRejected + filterRescheduled, 0.01 * throughFilter);
+                }
             } else {
                 Assert.assertEquals(throughFilter, 0D);
                 Assert.assertEquals(filterAccepted, 0D);
@@ -283,20 +282,19 @@ public class SubscriptionStatsTest extends 
ProducerConsumerBase {
             Assert.assertEquals(rescheduledMetrics.size(), 0);
         }
 
-        testSubscriptionStatsAdminApi(topic, subName, setFilter, acceptCount, 
rejectedCount);
+        testSubscriptionStatsAdminApi(topic, subName, setFilter);
     }
 
-    private void testSubscriptionStatsAdminApi(String topic, String subName, 
boolean setFilter,
-                                               int acceptCount, int 
rejectedCount) throws Exception {
+    private void testSubscriptionStatsAdminApi(String topic, String subName, 
boolean setFilter) throws Exception {
         boolean persistent = TopicName.get(topic).isPersistent();
         TopicStats topicStats = admin.topics().getStats(topic);
         SubscriptionStats stats = topicStats.getSubscriptions().get(subName);
         Assert.assertNotNull(stats);
 
         if (setFilter) {
-            Assert.assertEquals(stats.getFilterAcceptedMsgCount(), 
acceptCount);
+            Assert.assertEquals(stats.getFilterAcceptedMsgCount(), 100);
             if (persistent) {
-                Assert.assertEquals(stats.getFilterRejectedMsgCount(), 
rejectedCount);
+                Assert.assertEquals(stats.getFilterRejectedMsgCount(), 100);
                 // Only works on the test, if there are some markers, the 
filterProcessCount will be not equal with rejectedCount + rescheduledCount + 
acceptCount
                 Assert.assertEquals(stats.getFilterProcessedMsgCount(),
                         stats.getFilterAcceptedMsgCount() + 
stats.getFilterRejectedMsgCount()

Reply via email to