jai1 closed pull request #1715: Broker should not start replicator for root 
partitioned-topic
URL: https://github.com/apache/incubator-pulsar/pull/1715
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java
index 1913dd5ff3..d5480ccc8e 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java
@@ -18,17 +18,22 @@
  */
 package org.apache.pulsar.broker.service;
 
+import static org.apache.pulsar.broker.web.PulsarWebResource.path;
+
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 
 import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.Position;
+import org.apache.pulsar.broker.admin.AdminResource;
+import org.apache.pulsar.broker.service.BrokerServiceException.NamingException;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.TopicBusyException;
 import org.apache.pulsar.client.api.ProducerConfiguration;
 import org.apache.pulsar.client.impl.Backoff;
 import org.apache.pulsar.client.impl.ProducerImpl;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.common.naming.DestinationName;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -57,8 +62,9 @@
         Stopped, Starting, Started, Stopping
     }
 
-    public AbstractReplicator(String topicName, String replicatorPrefix, 
String localCluster,
-            String remoteCluster, BrokerService brokerService) {
+    public AbstractReplicator(String topicName, String replicatorPrefix, 
String localCluster, String remoteCluster,
+            BrokerService brokerService) throws NamingException {
+        validatePartitionedTopic(topicName, brokerService);
         this.brokerService = brokerService;
         this.topicName = topicName;
         this.replicatorPrefix = replicatorPrefix;
@@ -67,7 +73,6 @@ public AbstractReplicator(String topicName, String 
replicatorPrefix, String loca
         this.client = (PulsarClientImpl) 
brokerService.getReplicationClient(remoteCluster);
         this.producer = null;
         this.producerQueueSize = 
brokerService.pulsar().getConfiguration().getReplicationProducerQueueSize();
-
         this.producerConfiguration = new ProducerConfiguration();
         this.producerConfiguration.setSendTimeout(0, TimeUnit.SECONDS);
         this.producerConfiguration.setMaxPendingMessages(producerQueueSize);
@@ -214,5 +219,42 @@ public static String getReplicatorName(String 
replicatorPrefix, String cluster)
         return (replicatorPrefix + "." + cluster).intern();
     }
 
+    /**
+     * Replication can't be started on root-partitioned-topic to avoid 
producer startup conflict.
+     * 
+     * <pre>
+     * eg:
+     * if topic : persistent://prop/cluster/ns/my-topic is a partitioned topic 
with 2 partitions then
+     * broker explicitly creates replicator producer for: 
"my-topic-partition-1" and "my-topic-partition-2".
+     * 
+     * However, if broker tries to start producer with root topic "my-topic" 
then client-lib internally creates individual 
+     * producers for "my-topic-partition-1" and "my-topic-partition-2" which 
creates conflict with existing 
+     * replicator producers.
+     * </pre>
+     * 
+     * Therefore, replicator can't be started on root-partition topic which 
can internally create multiple partitioned
+     * producers.
+     * 
+     * @param topicName
+     * @param brokerService
+     */
+    private void validatePartitionedTopic(String topicName, BrokerService 
brokerService) throws NamingException {
+        DestinationName destination = DestinationName.get(topicName);
+        String partitionedTopicPath = 
path(AdminResource.PARTITIONED_TOPIC_PATH_ZNODE,
+                destination.getNamespace().toString(), 
destination.getDomain().toString(),
+                destination.getEncodedLocalName());
+        boolean isPartitionedTopic = false;
+        try {
+            isPartitionedTopic = 
brokerService.pulsar().getConfigurationCache().policiesCache()
+                    .get(partitionedTopicPath).isPresent();
+        } catch (Exception e) {
+            log.warn("Failed to verify partitioned topic {}-{}", topicName, 
e.getMessage());
+        }
+        if (isPartitionedTopic) {
+            throw new NamingException(
+                    topicName + " is a partitioned-topic and replication can't 
be started for partitioned-producer ");
+        }
+    }
+
     private static final Logger log = 
LoggerFactory.getLogger(AbstractReplicator.class);
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 2f3b27a75b..1b5cadee84 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -65,6 +65,7 @@
 import org.apache.pulsar.broker.authentication.AuthenticationService;
 import org.apache.pulsar.broker.authorization.AuthorizationService;
 import org.apache.pulsar.broker.loadbalance.LoadManager;
+import org.apache.pulsar.broker.service.BrokerServiceException.NamingException;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.NotAllowedException;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.PersistenceException;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException;
@@ -583,29 +584,37 @@ private void createPersistentTopic(final String topic, 
CompletableFuture<Topic>
                     new OpenLedgerCallback() {
                         @Override
                         public void openLedgerComplete(ManagedLedger ledger, 
Object ctx) {
-                            PersistentTopic persistentTopic = new 
PersistentTopic(topic, ledger, BrokerService.this);
-
-                            CompletableFuture<Void> replicationFuture = 
persistentTopic.checkReplication();
-                            replicationFuture.thenCompose(v -> {
-                                // Also check dedup status
-                                return 
persistentTopic.checkDeduplicationStatus();
-                            }).thenRun(() -> {
-                                log.info("Created topic {} - dedup is {}", 
topic,
-                                        
persistentTopic.isDeduplicationEnabled() ? "enabled" : "disabled");
-                                long topicLoadLatencyMs = 
TimeUnit.NANOSECONDS.toMillis(System.nanoTime())
-                                        - topicCreateTimeMs;
-                                pulsarStats.recordTopicLoadTimeValue(topic, 
topicLoadLatencyMs);
-                                addTopicToStatsMaps(destinationName, 
persistentTopic);
-                                topicFuture.complete(persistentTopic);
-                            }).exceptionally((ex) -> {
-                                log.warn("Replication or dedup check failed. 
Removing topic from topics list {}, {}", topic, ex);
-                                
persistentTopic.stopReplProducers().whenComplete((v, exception) -> {
-                                    topics.remove(topic, topicFuture);
-                                    topicFuture.completeExceptionally(ex);
+                            try {
+                                PersistentTopic persistentTopic = new 
PersistentTopic(topic, ledger,
+                                        BrokerService.this);
+                                CompletableFuture<Void> replicationFuture = 
persistentTopic.checkReplication();
+                                replicationFuture.thenCompose(v -> {
+                                    // Also check dedup status
+                                    return 
persistentTopic.checkDeduplicationStatus();
+                                }).thenRun(() -> {
+                                    log.info("Created topic {} - dedup is {}", 
topic,
+                                            
persistentTopic.isDeduplicationEnabled() ? "enabled" : "disabled");
+                                    long topicLoadLatencyMs = 
TimeUnit.NANOSECONDS.toMillis(System.nanoTime())
+                                            - topicCreateTimeMs;
+                                    
pulsarStats.recordTopicLoadTimeValue(topic, topicLoadLatencyMs);
+                                    addTopicToStatsMaps(destinationName, 
persistentTopic);
+                                    topicFuture.complete(persistentTopic);
+                                }).exceptionally((ex) -> {
+                                    log.warn(
+                                            "Replication or dedup check 
failed. Removing topic from topics list {}, {}",
+                                            topic, ex);
+                                    
persistentTopic.stopReplProducers().whenComplete((v, exception) -> {
+                                        topics.remove(topic, topicFuture);
+                                        topicFuture.completeExceptionally(ex);
+                                    });
+
+                                    return null;
                                 });
-
-                                return null;
-                            });
+                            } catch (NamingException e) {
+                                log.warn("Failed to create topic {}-{}", 
topic, e.getMessage());
+                                pulsar.getExecutor().submit(() -> 
topics.remove(topic, topicFuture));
+                                topicFuture.completeExceptionally(e);
+                            }
                         }
 
                         @Override
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java
index d914aa7ece..44bb27d4e6 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java
@@ -26,6 +26,7 @@
 import org.apache.bookkeeper.mledger.util.Rate;
 import org.apache.pulsar.broker.service.AbstractReplicator;
 import org.apache.pulsar.broker.service.BrokerService;
+import org.apache.pulsar.broker.service.BrokerServiceException.NamingException;
 import org.apache.pulsar.broker.service.Replicator;
 import org.apache.pulsar.broker.service.persistent.PersistentReplicator;
 import org.apache.pulsar.client.api.MessageId;
@@ -49,7 +50,7 @@
     private final NonPersistentReplicatorStats stats = new 
NonPersistentReplicatorStats();
 
     public NonPersistentReplicator(NonPersistentTopic topic, String 
localCluster, String remoteCluster,
-            BrokerService brokerService) {
+            BrokerService brokerService) throws NamingException {
         super(topic.getName(), topic.replicatorPrefix, localCluster, 
remoteCluster, brokerService);
 
         producerConfiguration.setBlockIfQueueFull(false);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index 44a9e14434..511e7f8535 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -270,17 +270,6 @@ private boolean hasLocalProducers() {
         return foundLocal.get();
     }
 
-    private boolean hasRemoteProducers() {
-        AtomicBoolean foundRemote = new AtomicBoolean(false);
-        producers.forEach(producer -> {
-            if (producer.isRemote()) {
-                foundRemote.set(true);
-            }
-        });
-
-        return foundRemote.get();
-    }
-
     @Override
     public void removeProducer(Producer producer) {
         checkArgument(producer.getTopic() == this);
@@ -520,7 +509,12 @@ void removeSubscription(String subscriptionName) {
             }
 
             if (!replicators.containsKey(cluster)) {
-                startReplicator(cluster);
+                if (!startReplicator(cluster)) {
+                    // it happens when global topic is a partitioned topic and 
replicator can't start on original
+                    // non partitioned-topic (topic without partition prefix)
+                    return FutureUtil
+                            .failedFuture(new NamingException(topic + " failed 
to start replicator for " + cluster));
+                }
             }
         }
 
@@ -535,13 +529,30 @@ void removeSubscription(String subscriptionName) {
         return FutureUtil.waitForAll(futures);
     }
 
-    void startReplicator(String remoteCluster) {
+    boolean startReplicator(String remoteCluster) {
         log.info("[{}] Starting replicator to remote: {}", topic, 
remoteCluster);
         String localCluster = 
brokerService.pulsar().getConfiguration().getClusterName();
-        replicators.computeIfAbsent(remoteCluster,
-                r -> new NonPersistentReplicator(NonPersistentTopic.this, 
localCluster, remoteCluster, brokerService));
+        return addReplicationCluster(remoteCluster,NonPersistentTopic.this, 
localCluster);
     }
 
+    protected boolean addReplicationCluster(String remoteCluster, 
NonPersistentTopic nonPersistentTopic, String localCluster) {
+        AtomicBoolean isReplicatorStarted = new AtomicBoolean(true);
+        replicators.computeIfAbsent(remoteCluster, r -> {
+            try {
+                return new NonPersistentReplicator(NonPersistentTopic.this, 
localCluster, remoteCluster, brokerService);
+            } catch (NamingException e) {
+                isReplicatorStarted.set(false);
+                log.error("[{}] Replicator startup failed due to 
partitioned-topic {}", topic, remoteCluster);
+            }
+            return null;
+        });
+        // clean up replicator if startup is failed
+        if (!isReplicatorStarted.get()) {
+            replicators.remove(remoteCluster);
+        }
+        return isReplicatorStarted.get();
+    }
+    
     CompletableFuture<Void> removeReplicator(String remoteCluster) {
         log.info("[{}] Removing replicator to {}", topic, remoteCluster);
         final CompletableFuture<Void> future = new CompletableFuture<>();
@@ -913,6 +924,8 @@ public void markBatchMessagePublished() {
         this.hasBatchMessagePublished = true;
     }
 
+    
+    
     private static final Logger log = 
LoggerFactory.getLogger(NonPersistentTopic.class);
 
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
index 62118190ff..52ed4b4532 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
@@ -38,6 +38,7 @@
 import org.apache.bookkeeper.mledger.util.Rate;
 import org.apache.pulsar.broker.service.AbstractReplicator;
 import org.apache.pulsar.broker.service.BrokerService;
+import org.apache.pulsar.broker.service.BrokerServiceException.NamingException;
 import org.apache.pulsar.broker.service.Replicator;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.impl.Backoff;
@@ -89,7 +90,7 @@
     private final ReplicatorStats stats = new ReplicatorStats();
 
     public PersistentReplicator(PersistentTopic topic, ManagedCursor cursor, 
String localCluster, String remoteCluster,
-            BrokerService brokerService) {
+            BrokerService brokerService) throws NamingException {
         super(topic.getName(), topic.replicatorPrefix, localCluster, 
remoteCluster, brokerService);
         this.topic = topic;
         this.cursor = cursor;
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 20ae52726e..9477486c5e 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -193,7 +193,7 @@ public void reset() {
         }
     }
 
-    public PersistentTopic(String topic, ManagedLedger ledger, BrokerService 
brokerService) {
+    public PersistentTopic(String topic, ManagedLedger ledger, BrokerService 
brokerService) throws NamingException {
         this.topic = topic;
         this.ledger = ledger;
         this.brokerService = brokerService;
@@ -212,8 +212,11 @@ public PersistentTopic(String topic, ManagedLedger ledger, 
BrokerService brokerS
             if (cursor.getName().startsWith(replicatorPrefix)) {
                 String localCluster = 
brokerService.pulsar().getConfiguration().getClusterName();
                 String remoteCluster = 
PersistentReplicator.getRemoteCluster(cursor.getName());
-                replicators.put(remoteCluster,
-                        new PersistentReplicator(this, cursor, localCluster, 
remoteCluster, brokerService));
+                boolean isReplicatorStarted = 
addReplicationCluster(remoteCluster, this, cursor, localCluster);
+                if (!isReplicatorStarted) {
+                    throw new NamingException(
+                            PersistentTopic.this.getName() + " Failed to start 
replicator " + remoteCluster);
+                }
             } else if (cursor.getName().equals(DEDUPLICATION_CURSOR_NAME)) {
                 // This is not a regular subscription, we are going to ignore 
it for now and let the message dedup logic
                 // to take care of it
@@ -882,9 +885,13 @@ public void checkMessageDeduplicationInfo() {
             @Override
             public void openCursorComplete(ManagedCursor cursor, Object ctx) {
                 String localCluster = 
brokerService.pulsar().getConfiguration().getClusterName();
-                replicators.computeIfAbsent(remoteCluster, r -> new 
PersistentReplicator(PersistentTopic.this, cursor, localCluster,
-                        remoteCluster, brokerService));
-                future.complete(null);
+                boolean isReplicatorStarted = 
addReplicationCluster(remoteCluster, PersistentTopic.this, cursor, 
localCluster);
+                if (isReplicatorStarted) {
+                    future.complete(null);    
+                } else {
+                    future.completeExceptionally(new NamingException(
+                            PersistentTopic.this.getName() + " Failed to start 
replicator " + remoteCluster));
+                }
             }
 
             @Override
@@ -897,6 +904,26 @@ public void openCursorFailed(ManagedLedgerException 
exception, Object ctx) {
         return future;
     }
 
+    protected boolean addReplicationCluster(String remoteCluster, 
PersistentTopic persistentTopic, ManagedCursor cursor,
+            String localCluster) {
+        AtomicBoolean isReplicatorStarted = new AtomicBoolean(true);
+        replicators.computeIfAbsent(remoteCluster, r -> {
+            try {
+                return new PersistentReplicator(PersistentTopic.this, cursor, 
localCluster, remoteCluster,
+                        brokerService);
+            } catch (NamingException e) {
+                isReplicatorStarted.set(false);
+                log.error("[{}] Replicator startup failed due to 
partitioned-topic {}", topic, remoteCluster);
+            }
+            return null;
+        });
+        // clean up replicator if startup is failed
+        if (!isReplicatorStarted.get()) {
+            replicators.remove(remoteCluster);
+        }
+        return isReplicatorStarted.get();
+    }
+
     CompletableFuture<Void> removeReplicator(String remoteCluster) {
         log.info("[{}] Removing replicator to {}", topic, remoteCluster);
         final CompletableFuture<Void> future = new CompletableFuture<>();
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
index cbef1d0548..922f6ee67e 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
@@ -48,6 +48,7 @@
 import org.apache.pulsar.broker.namespace.NamespaceService;
 import org.apache.pulsar.broker.namespace.OwnedBundle;
 import org.apache.pulsar.broker.namespace.OwnershipCache;
+import org.apache.pulsar.broker.service.BrokerServiceException.NamingException;
 import org.apache.pulsar.broker.service.persistent.PersistentReplicator;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.checksum.utils.Crc32cChecksum;
@@ -74,6 +75,8 @@
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 import org.testng.collections.Lists;
 
@@ -84,6 +87,14 @@
  */
 public class ReplicatorTest extends ReplicatorTestBase {
 
+    protected String methodName;
+
+    @BeforeMethod
+    public void beforeMethod(Method m) throws Exception {
+        methodName = m.getName();
+    }
+
+    
     @Override
     @BeforeClass(timeOut = 30000)
     void setup() throws Exception {
@@ -102,6 +113,11 @@ void shutdown() throws Exception {
         });
     }
 
+    @DataProvider(name = "partitionedTopic")
+    public Object[][] partitionedTopicProvider() {
+        return new Object[][] { { Boolean.TRUE }, { Boolean.FALSE } };
+    }
+    
     @Test(enabled = true, timeOut = 30000)
     public void testConfigChange() throws Exception {
         log.info("--- Starting ReplicatorTest::testConfigChange ---");
@@ -856,6 +872,62 @@ public void verifyChecksumAfterReplication() throws 
Exception {
         reader2.closeAsync().get();
     }
 
+    /**
+     * It verifies that broker should not start replicator for 
partitioned-topic (topic without -partition postfix)
+     * 
+     * @param isPartitionedTopic
+     * @throws Exception
+     */
+    @Test(dataProvider = "partitionedTopic")
+    public void testReplicatorOnPartitionedTopic(boolean isPartitionedTopic) 
throws Exception {
+
+        log.info("--- Starting ReplicatorTest::{} --- ", methodName);
+
+        final String namespace = "pulsar/global/partitionedNs-" + 
isPartitionedTopic;
+        final String persistentTopicName = "persistent://" + namespace + 
"/partTopic-" + isPartitionedTopic;
+        final String nonPersistentTopicName = "non-persistent://" + namespace 
+ "/partTopic-" + isPartitionedTopic;
+        BrokerService brokerService = pulsar1.getBrokerService();
+
+        admin1.namespaces().createNamespace(namespace);
+        admin1.namespaces().setNamespaceReplicationClusters(namespace, 
Lists.newArrayList("r1", "r2", "r3"));
+
+        if (isPartitionedTopic) {
+            
admin1.persistentTopics().createPartitionedTopic(persistentTopicName, 5);
+            
admin1.nonPersistentTopics().createPartitionedTopic(nonPersistentTopicName, 5);
+        }
+
+        // load namespace with dummy topic on ns
+        PulsarClient client = PulsarClient.create(url1.toString());
+        client.createProducer("persistent://" + namespace + "/dummyTopic");
+
+        // persistent topic test
+        try {
+            brokerService.getTopic(persistentTopicName).get();
+            if (isPartitionedTopic) {
+                fail("Topic creation fails with partitioned topic as 
replicator init fails");
+            }
+        } catch (Exception e) {
+            if (!isPartitionedTopic) {
+                fail("Topic creation should not fail without any partitioned 
topic");
+            }
+            assertTrue(e.getCause() instanceof NamingException);
+        }
+
+        // non-persistent topic test
+        try {
+            brokerService.getTopic(nonPersistentTopicName).get();
+            if (isPartitionedTopic) {
+                fail("Topic creation fails with partitioned topic as 
replicator init fails");
+            }
+        } catch (Exception e) {
+            if (!isPartitionedTopic) {
+                fail("Topic creation should not fail without any partitioned 
topic");
+            }
+            assertTrue(e.getCause() instanceof NamingException);
+        }
+
+    }
+    
     private static final Logger log = 
LoggerFactory.getLogger(ReplicatorTest.class);
 
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java
index 344ad23a0f..4320428b3b 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java
@@ -132,6 +132,7 @@ void setup() throws Exception {
         config1.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
         config1.setTlsTrustCertsFilePath(TLS_SERVER_CERT_FILE_PATH);
         
config1.setBacklogQuotaCheckIntervalInSeconds(TIME_TO_CHECK_BACKLOG_QUOTA);
+        config1.setDefaultNumberOfNamespaceBundles(1);
         pulsar1 = new PulsarService(config1);
         pulsar1.start();
         ns1 = pulsar1.getBrokerService();
@@ -165,6 +166,7 @@ void setup() throws Exception {
         config2.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
         config2.setTlsTrustCertsFilePath(TLS_SERVER_CERT_FILE_PATH);
         
config2.setBacklogQuotaCheckIntervalInSeconds(TIME_TO_CHECK_BACKLOG_QUOTA);
+        config2.setDefaultNumberOfNamespaceBundles(1);
         pulsar2 = new PulsarService(config2);
         pulsar2.start();
         ns2 = pulsar2.getBrokerService();
@@ -197,6 +199,7 @@ void setup() throws Exception {
         config3.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
         config3.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
         config3.setTlsTrustCertsFilePath(TLS_SERVER_CERT_FILE_PATH);
+        config3.setDefaultNumberOfNamespaceBundles(1);
         pulsar3 = new PulsarService(config3);
         pulsar3.start();
         ns3 = pulsar3.getBrokerService();


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to