This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new bed032e714a [improve] [broker] Add additionalSystemCursorNames ignore 
list for TTL check (#22614)
bed032e714a is described below

commit bed032e714aff9f5d2594bdc80a3e7888e53b1bf
Author: Hang Chen <chenh...@apache.org>
AuthorDate: Thu May 9 20:45:56 2024 +0800

    [improve] [broker] Add additionalSystemCursorNames ignore list for TTL 
check (#22614)
---
 conf/broker.conf                                   |  4 +
 conf/standalone.conf                               |  4 +
 .../apache/pulsar/broker/ServiceConfiguration.java |  7 ++
 .../broker/service/persistent/PersistentTopic.java |  7 +-
 .../pulsar/broker/service/MessageTTLTest.java      | 96 ++++++++++++++++++++++
 5 files changed, 117 insertions(+), 1 deletion(-)

diff --git a/conf/broker.conf b/conf/broker.conf
index 1b51ff47551..1ef68a0395c 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -180,6 +180,10 @@ backlogQuotaDefaultRetentionPolicy=producer_request_hold
 # Default ttl for namespaces if ttl is not already configured at namespace 
policies. (disable default-ttl with value 0)
 ttlDurationDefaultInSeconds=0
 
+# Additional system subscriptions that will be ignored by ttl check. The 
cursor names are comma separated.
+# Default is empty.
+# additionalSystemCursorNames=
+
 # Enable topic auto creation if new producer or consumer connected (disable 
auto creation with value false)
 allowAutoTopicCreation=true
 
diff --git a/conf/standalone.conf b/conf/standalone.conf
index 51035235d4d..a8615b70293 100644
--- a/conf/standalone.conf
+++ b/conf/standalone.conf
@@ -121,6 +121,10 @@ backlogQuotaDefaultLimitSecond=-1
 # Default ttl for namespaces if ttl is not already configured at namespace 
policies. (disable default-ttl with value 0)
 ttlDurationDefaultInSeconds=0
 
+# Additional system subscriptions that will be ignored by ttl check. The 
cursor names are comma-separated.
+# Default is empty.
+# additionalSystemCursorNames=
+
 # Enable the deletion of inactive topics. This parameter needs to cooperate 
with the allowAutoTopicCreation parameter.
 # If brokerDeleteInactiveTopicsEnabled is set to true, we should ensure that 
allowAutoTopicCreation is also set to true.
 brokerDeleteInactiveTopicsEnabled=true
diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index a9d170ea5de..9efe1856509 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -652,6 +652,13 @@ public class ServiceConfiguration implements 
PulsarConfiguration {
         )
     private int ttlDurationDefaultInSeconds = 0;
 
+    @FieldContext(
+        category = CATEGORY_POLICIES,
+        doc = "Additional system subscriptions that will be ignored by ttl 
check. "
+                + "The cursor names are comma separated. Default is empty."
+    )
+    private Set<String> additionalSystemCursorNames = new TreeSet<>();
+
     @FieldContext(
         category = CATEGORY_POLICIES,
         dynamic = true,
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 7228bdeb2d3..28bc27f7961 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -40,6 +40,7 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
+import java.util.TreeSet;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
@@ -279,6 +280,7 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
     private final ExecutorService orderedExecutor;
 
     private volatile CloseFutures closeFutures;
+    private Set<String> additionalSystemCursorNames = new TreeSet<>();
 
     @Getter
     private final PersistentTopicMetrics persistentTopicMetrics = new 
PersistentTopicMetrics();
@@ -414,6 +416,7 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
         } else {
             shadowSourceTopic = null;
         }
+        additionalSystemCursorNames = 
brokerService.pulsar().getConfiguration().getAdditionalSystemCursorNames();
     }
 
     @Override
@@ -1934,7 +1937,9 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
         int messageTtlInSeconds = topicPolicies.getMessageTTLInSeconds().get();
         if (messageTtlInSeconds != 0) {
             subscriptions.forEach((__, sub) -> {
-                if (!isCompactionSubscription(sub.getName())) {
+                if (!isCompactionSubscription(sub.getName())
+                        && (additionalSystemCursorNames.isEmpty()
+                            || 
!additionalSystemCursorNames.contains(sub.getName()))) {
                    sub.expireMessages(messageTtlInSeconds);
                 }
             });
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageTTLTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageTTLTest.java
index 68a9a769ac1..2f5ad215a1b 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageTTLTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageTTLTest.java
@@ -23,15 +23,23 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.TreeSet;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import lombok.Cleanup;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.impl.MessageIdImpl;
 import 
org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats.CursorStats;
 import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
@@ -138,4 +146,92 @@ public class MessageTTLTest extends BrokerTestBase {
         topicRefMock.onUpdate(topicPolicies);
         verify(topicRefMock, times(2)).checkMessageExpiry();
     }
+
+    @Test
+    public void testTtlFilteredByIgnoreSubscriptions() throws Exception {
+        String topicName = 
"persistent://prop/ns-abc/testTTLFilteredByIgnoreSubscriptions";
+        String subName = "__SUB_FILTER";
+        cleanup();
+        Set<String> ignoredSubscriptions = new HashSet<>();
+        ignoredSubscriptions.add(subName);
+        int defaultTtl = 5;
+        conf.setAdditionalSystemCursorNames(ignoredSubscriptions);
+        conf.setTtlDurationDefaultInSeconds(defaultTtl);
+        super.baseSetup();
+
+        
pulsarClient.newConsumer(Schema.STRING).topic(topicName).subscriptionName(subName)
+                .subscribe().close();
+
+        @Cleanup
+        org.apache.pulsar.client.api.Producer<String> producer = 
pulsarClient.newProducer(Schema.STRING)
+                .enableBatching(false).topic(topicName).create();
+
+        final int messages = 10;
+
+        for (int i = 0; i < messages; i++) {
+            String message = "my-message-" + i;
+            producer.send(message);
+        }
+        producer.close();
+
+        Optional<Topic> topic = 
pulsar.getBrokerService().getTopicReference(topicName);
+        assertTrue(topic.isPresent());
+        PersistentSubscription subscription = (PersistentSubscription) 
topic.get().getSubscription(subName);
+
+        Thread.sleep((defaultTtl - 1) * 1000);
+        topic.get().checkMessageExpiry();
+        // Wait for the message expiry task to finish and make sure the messages do 
not expire early.
+        Thread.sleep(1000);
+        assertEquals(subscription.getNumberOfEntriesInBacklog(false), 10);
+        Thread.sleep(2000);
+        topic.get().checkMessageExpiry();
+        // Wait for the message expiry task to finish.
+        retryStrategically((test) -> 
subscription.getNumberOfEntriesInBacklog(false) == 0, 5, 200);
+        // The message should not expire because the subscription is ignored.
+        assertEquals(subscription.getNumberOfEntriesInBacklog(false), 10);
+
+        conf.setAdditionalSystemCursorNames(new TreeSet<>());
+    }
+
+    @Test
+    public void testTtlWithoutIgnoreSubscriptions() throws Exception {
+        String topicName = 
"persistent://prop/ns-abc/testTTLWithoutIgnoreSubscriptions";
+        String subName = "__SUB_FILTER";
+        cleanup();
+        int defaultTtl = 5;
+        conf.setTtlDurationDefaultInSeconds(defaultTtl);
+        conf.setBrokerDeleteInactiveTopicsEnabled(false);
+        super.baseSetup();
+
+        
pulsarClient.newConsumer(Schema.STRING).topic(topicName).subscriptionName(subName)
+                .subscribe().close();
+
+        @Cleanup
+        org.apache.pulsar.client.api.Producer<String> producer = 
pulsarClient.newProducer(Schema.STRING)
+                .enableBatching(false).topic(topicName).create();
+
+        final int messages = 10;
+
+        for (int i = 0; i < messages; i++) {
+            String message = "my-message-" + i;
+            producer.send(message);
+        }
+        producer.close();
+
+        Optional<Topic> topic = 
pulsar.getBrokerService().getTopicReference(topicName);
+        assertTrue(topic.isPresent());
+        PersistentSubscription subscription = (PersistentSubscription) 
topic.get().getSubscription(subName);
+
+        Thread.sleep((defaultTtl - 1) * 1000);
+        topic.get().checkMessageExpiry();
+        // Wait for the message expiry task to finish and make sure the messages do 
not expire early.
+        Thread.sleep(1000);
+        assertEquals(subscription.getNumberOfEntriesInBacklog(false), 10);
+        Thread.sleep(2000);
+        topic.get().checkMessageExpiry();
+        // Wait for the message expiry task to finish and make sure the messages expired.
+        retryStrategically((test) -> 
subscription.getNumberOfEntriesInBacklog(false) == 0, 5, 200);
+        assertEquals(subscription.getNumberOfEntriesInBacklog(false), 0);
+    }
+
 }

Reply via email to