This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new b6f464ea0a1 [fix][broker] Disable system topic message deduplication (#22582)
b6f464ea0a1 is described below

commit b6f464ea0a17786fb857aac5111dcc394cba8f56
Author: Qiang Zhao <mattisonc...@apache.org>
AuthorDate: Wed May 8 10:53:53 2024 +0800

    [fix][broker] Disable system topic message deduplication (#22582)
---
 .../org/apache/pulsar/broker/service/Topic.java    | 10 +++++++
 .../service/persistent/MessageDeduplication.java   |  6 +---
 .../broker/service/persistent/PersistentTopic.java |  9 +++---
 .../broker/service/persistent/SystemTopic.java     | 16 +++++++++++
 .../service/persistent/MessageDuplicationTest.java | 32 ++++++++++++++++++++++
 5 files changed, 64 insertions(+), 9 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
index 1da8cfce4ee..c2eefcd18e3 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
@@ -213,6 +213,16 @@ public interface Topic {
 
     void checkCursorsToCacheEntries();
 
+    /**
+     * Indicates whether server-side deduplication is enabled for this topic.
+     * This is a dynamic configuration; users may update it through namespace or topic policies.
+     *
+     * @return whether server-side deduplication is enabled
+     */
+    default boolean isDeduplicationEnabled() {
+        return false;
+    }
+
     void checkDeduplicationSnapshot();
 
     void checkMessageExpiry();
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
index e508661364d..ab3b799093b 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
@@ -217,7 +217,7 @@ public class MessageDeduplication {
      * returning a future to track the completion of the task
      */
     public CompletableFuture<Void> checkStatus() {
-        boolean shouldBeEnabled = isDeduplicationEnabled();
+        boolean shouldBeEnabled = topic.isDeduplicationEnabled();
         synchronized (this) {
             if (status == Status.Recovering || status == Status.Removing) {
                 // If there's already a transition happening, check later for status
@@ -472,10 +472,6 @@ public class MessageDeduplication {
         }, null);
     }
 
-    private boolean isDeduplicationEnabled() {
-        return 
topic.getHierarchyTopicPolicies().getDeduplicationEnabled().get();
-    }
-
     /**
      * Topic will call this method whenever a producer connects.
      */
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index e99bd1425f4..60eb700fc06 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -2146,10 +2146,6 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
         return future;
     }
 
-    public boolean isDeduplicationEnabled() {
-        return messageDeduplication.isEnabled();
-    }
-
     @Override
     public int getNumberOfConsumers() {
         int count = 0;
@@ -4080,6 +4076,10 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
         return ledger.isMigrated();
     }
 
+    public boolean isDeduplicationEnabled() {
+        return getHierarchyTopicPolicies().getDeduplicationEnabled().get();
+    }
+
     public TransactionInPendingAckStats getTransactionInPendingAckStats(TxnID 
txnID, String subName) {
         return 
this.subscriptions.get(subName).getTransactionInPendingAckStats(txnID);
     }
@@ -4104,4 +4104,5 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
     public Optional<TopicName> getShadowSourceTopic() {
         return Optional.ofNullable(shadowSourceTopic);
     }
+
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SystemTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SystemTopic.java
index 720ae3c5189..f2cec2138a3 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SystemTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SystemTopic.java
@@ -80,6 +80,22 @@ public class SystemTopic extends PersistentTopic {
         return !NamespaceService.isHeartbeatNamespace(TopicName.get(topic));
     }
 
+    @Override
+    public boolean isDeduplicationEnabled() {
+        /*
+            Disable deduplication on system topics to avoid recovering the deduplication WAL
+            (especially from an offloaded topic).
+            System topics are usually a precondition for loading other topics; therefore,
+            we should pay attention to their loading time.
+
+            Note: if loading a system topic times out, dependent topics may fail to run.
+
+            Dependency diagram: normal topic --rely on--> system topic --rely on--> deduplication recover
+                                --may rely on--> (tiered storage)
+         */
+        return false;
+    }
+
     @Override
     public boolean isEncryptionRequired() {
         // System topics are only written by the broker that can't know the encryption context.
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java
index 402b5c4972c..f034717ccf2 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java
@@ -37,6 +37,8 @@ import io.netty.buffer.ByteBuf;
 import io.netty.channel.EventLoopGroup;
 import java.lang.reflect.Field;
 import java.util.Map;
+import java.util.Optional;
+import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.mledger.ManagedCursor;
@@ -50,9 +52,11 @@ import org.apache.pulsar.broker.service.BacklogQuotaManager;
 import org.apache.pulsar.broker.service.BrokerService;
 import org.apache.pulsar.broker.service.BrokerTestBase;
 import org.apache.pulsar.broker.service.Topic;
+import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.common.api.proto.MessageMetadata;
+import org.apache.pulsar.common.naming.SystemTopicNames;
 import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.broker.qos.AsyncTokenBucket;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
@@ -490,4 +494,32 @@ public class MessageDuplicationTest extends BrokerTestBase 
{
         messageDeduplication.purgeInactiveProducers();
         assertTrue(messageDeduplication.getInactiveProducers().isEmpty());
     }
+
+
+    @Test
+    public void testMessageDeduplicationShouldNotWorkForSystemTopic() throws 
PulsarAdminException {
+        final String localName = UUID.randomUUID().toString();
+        final String namespace = "prop/ns-abc";
+        final String prefix = "persistent://%s/".formatted(namespace);
+        final String topic = prefix + localName;
+        admin.topics().createNonPartitionedTopic(topic);
+
+        // broker level policies
+        final String eventSystemTopic = prefix + 
SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME;
+        final Optional<Topic> optionalTopic = 
pulsar.getBrokerService().getTopic(eventSystemTopic, true).join();
+        assertTrue(optionalTopic.isPresent());
+        final Topic ptRef = optionalTopic.get();
+        assertTrue(ptRef.isSystemTopic());
+        assertFalse(ptRef.isDeduplicationEnabled());
+
+        // namespace level policies
+        admin.namespaces().setDeduplicationStatus(namespace, true);
+        assertTrue(ptRef.isSystemTopic());
+        assertFalse(ptRef.isDeduplicationEnabled());
+
+        // topic level policies
+        admin.topicPolicies().setDeduplicationStatus(eventSystemTopic, true);
+        assertTrue(ptRef.isSystemTopic());
+        assertFalse(ptRef.isDeduplicationEnabled());
+    }
 }

Reply via email to