This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new bed032e714a [improve] [broker] Add additionalSystemCursorNames ignore list for TTL check (#22614) bed032e714a is described below commit bed032e714aff9f5d2594bdc80a3e7888e53b1bf Author: Hang Chen <chenh...@apache.org> AuthorDate: Thu May 9 20:45:56 2024 +0800 [improve] [broker] Add additionalSystemCursorNames ignore list for TTL check (#22614) --- conf/broker.conf | 4 + conf/standalone.conf | 4 + .../apache/pulsar/broker/ServiceConfiguration.java | 7 ++ .../broker/service/persistent/PersistentTopic.java | 7 +- .../pulsar/broker/service/MessageTTLTest.java | 96 ++++++++++++++++++++++ 5 files changed, 117 insertions(+), 1 deletion(-) diff --git a/conf/broker.conf b/conf/broker.conf index 1b51ff47551..1ef68a0395c 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -180,6 +180,10 @@ backlogQuotaDefaultRetentionPolicy=producer_request_hold # Default ttl for namespaces if ttl is not already configured at namespace policies. (disable default-ttl with value 0) ttlDurationDefaultInSeconds=0 +# Additional system subscriptions that will be ignored by ttl check. The cursor names are comma separated. +# Default is empty. +# additionalSystemCursorNames= + # Enable topic auto creation if new producer or consumer connected (disable auto creation with value false) allowAutoTopicCreation=true diff --git a/conf/standalone.conf b/conf/standalone.conf index 51035235d4d..a8615b70293 100644 --- a/conf/standalone.conf +++ b/conf/standalone.conf @@ -121,6 +121,10 @@ backlogQuotaDefaultLimitSecond=-1 # Default ttl for namespaces if ttl is not already configured at namespace policies. (disable default-ttl with value 0) ttlDurationDefaultInSeconds=0 +# Additional system subscriptions that will be ignored by ttl check. The cursor names are comma separated. +# Default is empty. +# additionalSystemCursorNames= + # Enable the deletion of inactive topics. This parameter need to cooperate with the allowAutoTopicCreation parameter. # If brokerDeleteInactiveTopicsEnabled is set to true, we should ensure that allowAutoTopicCreation is also set to true. brokerDeleteInactiveTopicsEnabled=true diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index a9d170ea5de..9efe1856509 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -652,6 +652,13 @@ public class ServiceConfiguration implements PulsarConfiguration { ) private int ttlDurationDefaultInSeconds = 0; + @FieldContext( + category = CATEGORY_POLICIES, + doc = "Additional system subscriptions that will be ignored by ttl check. " + + "The cursor names are comma separated. Default is empty." + ) + private Set<String> additionalSystemCursorNames = new TreeSet<>(); + @FieldContext( category = CATEGORY_POLICIES, dynamic = true, diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 7228bdeb2d3..28bc27f7961 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -40,6 +40,7 @@ import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.Set; +import java.util.TreeSet; import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; @@ -279,6 +280,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal private final ExecutorService orderedExecutor; private volatile CloseFutures closeFutures; + private Set<String> additionalSystemCursorNames = new TreeSet<>(); @Getter private final PersistentTopicMetrics persistentTopicMetrics = new PersistentTopicMetrics(); @@ -414,6 +416,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal } else { shadowSourceTopic = null; } + additionalSystemCursorNames = brokerService.pulsar().getConfiguration().getAdditionalSystemCursorNames(); } @Override @@ -1934,7 +1937,9 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal int messageTtlInSeconds = topicPolicies.getMessageTTLInSeconds().get(); if (messageTtlInSeconds != 0) { subscriptions.forEach((__, sub) -> { - if (!isCompactionSubscription(sub.getName())) { + if (!isCompactionSubscription(sub.getName()) + && (additionalSystemCursorNames.isEmpty() + || !additionalSystemCursorNames.contains(sub.getName()))) { sub.expireMessages(messageTtlInSeconds); } }); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageTTLTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageTTLTest.java index 68a9a769ac1..2f5ad215a1b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageTTLTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageTTLTest.java @@ -23,15 +23,23 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertTrue; + import java.util.ArrayList; +import java.util.HashSet; import java.util.List; +import java.util.Optional; +import java.util.Set; +import java.util.TreeSet; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import lombok.Cleanup; import org.apache.bookkeeper.mledger.impl.PositionImpl; +import org.apache.pulsar.broker.service.persistent.PersistentSubscription; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.impl.MessageIdImpl; import org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats.CursorStats; import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats; @@ -138,4 +146,92 @@ public class MessageTTLTest extends BrokerTestBase { topicRefMock.onUpdate(topicPolicies); verify(topicRefMock, times(2)).checkMessageExpiry(); } + + @Test + public void testTtlFilteredByIgnoreSubscriptions() throws Exception { + String topicName = "persistent://prop/ns-abc/testTTLFilteredByIgnoreSubscriptions"; + String subName = "__SUB_FILTER"; + cleanup(); + Set<String> ignoredSubscriptions = new HashSet<>(); + ignoredSubscriptions.add(subName); + int defaultTtl = 5; + conf.setAdditionalSystemCursorNames(ignoredSubscriptions); + conf.setTtlDurationDefaultInSeconds(defaultTtl); + super.baseSetup(); + + pulsarClient.newConsumer(Schema.STRING).topic(topicName).subscriptionName(subName) + .subscribe().close(); + + @Cleanup + org.apache.pulsar.client.api.Producer<String> producer = pulsarClient.newProducer(Schema.STRING) + .enableBatching(false).topic(topicName).create(); + + final int messages = 10; + + for (int i = 0; i < messages; i++) { + String message = "my-message-" + i; + producer.send(message); + } + producer.close(); + + Optional<Topic> topic = pulsar.getBrokerService().getTopicReference(topicName); + assertTrue(topic.isPresent()); + PersistentSubscription subscription = (PersistentSubscription) topic.get().getSubscription(subName); + + Thread.sleep((defaultTtl - 1) * 1000); + topic.get().checkMessageExpiry(); + // Wait the message expire task done and make sure the message does not expire early. + Thread.sleep(1000); + assertEquals(subscription.getNumberOfEntriesInBacklog(false), 10); + Thread.sleep(2000); + topic.get().checkMessageExpiry(); + // Wait the message expire task done. + retryStrategically((test) -> subscription.getNumberOfEntriesInBacklog(false) == 0, 5, 200); + // The message should not expire because the subscription is ignored. + assertEquals(subscription.getNumberOfEntriesInBacklog(false), 10); + + conf.setAdditionalSystemCursorNames(new TreeSet<>()); + } + + @Test + public void testTtlWithoutIgnoreSubscriptions() throws Exception { + String topicName = "persistent://prop/ns-abc/testTTLWithoutIgnoreSubscriptions"; + String subName = "__SUB_FILTER"; + cleanup(); + int defaultTtl = 5; + conf.setTtlDurationDefaultInSeconds(defaultTtl); + conf.setBrokerDeleteInactiveTopicsEnabled(false); + super.baseSetup(); + + pulsarClient.newConsumer(Schema.STRING).topic(topicName).subscriptionName(subName) + .subscribe().close(); + + @Cleanup + org.apache.pulsar.client.api.Producer<String> producer = pulsarClient.newProducer(Schema.STRING) + .enableBatching(false).topic(topicName).create(); + + final int messages = 10; + + for (int i = 0; i < messages; i++) { + String message = "my-message-" + i; + producer.send(message); + } + producer.close(); + + Optional<Topic> topic = pulsar.getBrokerService().getTopicReference(topicName); + assertTrue(topic.isPresent()); + PersistentSubscription subscription = (PersistentSubscription) topic.get().getSubscription(subName); + + Thread.sleep((defaultTtl - 1) * 1000); + topic.get().checkMessageExpiry(); + // Wait the message expire task done and make sure the message does not expire early. + Thread.sleep(1000); + assertEquals(subscription.getNumberOfEntriesInBacklog(false), 10); + Thread.sleep(2000); + topic.get().checkMessageExpiry(); + // Wait the message expire task done and make sure the message expired. + retryStrategically((test) -> subscription.getNumberOfEntriesInBacklog(false) == 0, 5, 200); + assertEquals(subscription.getNumberOfEntriesInBacklog(false), 0); + } + }