This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 950ff441da2 [improve][broker] PIP-192: Write the child ownership to `ServiceUnitStateChannel` instead of ZK when handling bundle split (#18858)
950ff441da2 is described below

commit 950ff441da28e144bdfb71c317a9bc339d4f05b7
Author: Kai Wang <kw...@apache.org>
AuthorDate: Mon Feb 13 19:30:36 2023 +0800

    [improve][broker] PIP-192: Write the child ownership to `ServiceUnitStateChannel` instead of ZK when handling bundle split (#18858)
---
 .../pulsar/broker/admin/impl/NamespacesBase.java   |  16 +--
 .../channel/ServiceUnitStateChannelImpl.java       | 133 +++++++++++++++++----
 .../pulsar/broker/namespace/NamespaceService.java  |  74 +++++++++---
 .../channel/ServiceUnitStateChannelTest.java       |  53 +++++++-
 4 files changed, 221 insertions(+), 55 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index d5cf6a3e74d..5446060ac65 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -1028,7 +1028,9 @@ public abstract class NamespacesBase extends 
AdminResource {
                                     
validateNamespaceBundleOwnershipAsync(namespaceName, policies.bundles, 
bundleRange,
                                         authoritative, false))
                             .thenCompose(nsBundle -> 
pulsar().getNamespaceService().splitAndOwnBundle(nsBundle, unload,
-                                    
getNamespaceBundleSplitAlgorithmByName(splitAlgorithmName), splitBoundaries));
+                                    pulsar().getNamespaceService()
+                                            
.getNamespaceBundleSplitAlgorithmByName(splitAlgorithmName),
+                                    splitBoundaries));
                 });
     }
 
@@ -1109,18 +1111,6 @@ public abstract class NamespacesBase extends 
AdminResource {
                 .getBundleWithHighestThroughputAsync(namespaceName);
     }
 
-    private NamespaceBundleSplitAlgorithm 
getNamespaceBundleSplitAlgorithmByName(String algorithmName) {
-        NamespaceBundleSplitAlgorithm algorithm = 
NamespaceBundleSplitAlgorithm.of(algorithmName);
-        if (algorithm == null) {
-            algorithm = NamespaceBundleSplitAlgorithm.of(
-                    
pulsar().getConfig().getDefaultNamespaceBundleSplitAlgorithm());
-        }
-        if (algorithm == null) {
-            algorithm = 
NamespaceBundleSplitAlgorithm.RANGE_EQUALLY_DIVIDE_ALGO;
-        }
-        return algorithm;
-    }
-
     protected void internalSetPublishRate(PublishRate maxPublishMessageRate) {
         validateSuperUserAccess();
         log.info("[{}] Set namespace publish-rate {}/{}", clientAppId(), 
namespaceName, maxPublishMessageRate);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
index d5bcd3e1436..d10138bda68 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
@@ -18,6 +18,8 @@
  */
 package org.apache.pulsar.broker.loadbalance.extensions.channel;
 
+import static java.lang.String.format;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static 
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.Assigned;
 import static 
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.Free;
 import static 
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.Owned;
@@ -35,7 +37,9 @@ import static 
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUni
 import static 
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl.MetadataState.Unstable;
 import static org.apache.pulsar.metadata.api.extended.SessionEvent.SessionLost;
 import static 
org.apache.pulsar.metadata.api.extended.SessionEvent.SessionReestablished;
+import com.google.common.annotations.VisibleForTesting;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -48,6 +52,7 @@ import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import lombok.AllArgsConstructor;
 import lombok.Getter;
@@ -60,18 +65,23 @@ import 
org.apache.pulsar.broker.loadbalance.LeaderElectionService;
 import org.apache.pulsar.broker.loadbalance.extensions.models.Split;
 import org.apache.pulsar.broker.loadbalance.extensions.models.Unload;
 import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared;
+import org.apache.pulsar.broker.namespace.NamespaceService;
+import org.apache.pulsar.broker.service.BrokerServiceException;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.TableView;
 import org.apache.pulsar.common.naming.NamespaceBundle;
-import org.apache.pulsar.common.naming.NamespaceBundleSplitAlgorithm;
+import org.apache.pulsar.common.naming.NamespaceBundleFactory;
+import org.apache.pulsar.common.naming.NamespaceBundles;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicDomain;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.stats.Metrics;
+import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
+import org.apache.pulsar.metadata.api.MetadataStoreException;
 import org.apache.pulsar.metadata.api.NotificationType;
 import org.apache.pulsar.metadata.api.coordination.LeaderElectionState;
 import org.apache.pulsar.metadata.api.extended.SessionEvent;
@@ -523,8 +533,7 @@ public class ServiceUnitStateChannelImpl implements 
ServiceUnitStateChannel {
 
     private void handleSplitEvent(String serviceUnit, ServiceUnitStateData 
data) {
         if (isTargetBroker(data.broker())) {
-            splitServiceUnit(serviceUnit)
-                    .thenCompose(__ -> tombstoneAsync(serviceUnit))
+            splitServiceUnit(serviceUnit, data)
                     .whenComplete((__, e) -> log(e, serviceUnit, data, null));
         }
     }
@@ -625,25 +634,107 @@ public class ServiceUnitStateChannelImpl implements 
ServiceUnitStateChannel {
                 });
     }
 
-    private CompletableFuture<Void> splitServiceUnit(String serviceUnit) {
-        // TODO: after the split we need to write the child ownerships to BSC 
instead of ZK.
+    private CompletableFuture<Void> splitServiceUnit(String serviceUnit, 
ServiceUnitStateData data) {
+        // Write the child ownerships to BSC.
         long startTime = System.nanoTime();
-        return pulsar.getNamespaceService()
-                .splitAndOwnBundle(getNamespaceBundle(serviceUnit),
-                        false,
-                        
NamespaceBundleSplitAlgorithm.of(pulsar.getConfig().getDefaultNamespaceBundleSplitAlgorithm()),
-                        null)
-                .whenComplete((__, ex) -> {
-                    double splitBundleTime = TimeUnit.NANOSECONDS
-                            .toMillis((System.nanoTime() - startTime));
-                    if (ex == null) {
-                        log.info("Successfully split {} namespace-bundle in {} 
ms",
-                                serviceUnit, splitBundleTime);
-                    } else {
-                        log.error("Failed to split {} namespace-bundle in {} 
ms",
-                                serviceUnit, splitBundleTime, ex);
-                    }
-                });
+        NamespaceService namespaceService = pulsar.getNamespaceService();
+        NamespaceBundleFactory bundleFactory = 
namespaceService.getNamespaceBundleFactory();
+        NamespaceBundle bundle = getNamespaceBundle(serviceUnit);
+        CompletableFuture<Void> completionFuture = new CompletableFuture<>();
+        final AtomicInteger counter = new AtomicInteger(0);
+        this.splitServiceUnitOnceAndRetry(namespaceService, bundleFactory, 
bundle, serviceUnit, data,
+                counter, startTime, completionFuture);
+        return completionFuture;
+    }
+
+    @VisibleForTesting
+    protected void splitServiceUnitOnceAndRetry(NamespaceService 
namespaceService,
+                                                NamespaceBundleFactory 
bundleFactory,
+                                                NamespaceBundle bundle,
+                                                String serviceUnit,
+                                                ServiceUnitStateData data,
+                                                AtomicInteger counter,
+                                                long startTime,
+                                                CompletableFuture<Void> 
completionFuture) {
+        CompletableFuture<List<NamespaceBundle>> updateFuture = new 
CompletableFuture<>();
+
+        pulsar.getNamespaceService().getSplitBoundary(bundle, 
null).thenAccept(splitBundlesPair -> {
+            // Split and updateNamespaceBundles. Update may fail because of 
concurrent write to Zookeeper.
+            if (splitBundlesPair == null) {
+                String msg = format("Bundle %s not found under namespace", 
serviceUnit);
+                updateFuture.completeExceptionally(new 
BrokerServiceException.ServiceUnitNotReadyException(msg));
+                return;
+            }
+            ServiceUnitStateData next = new ServiceUnitStateData(Owned, 
data.broker());
+            NamespaceBundles targetNsBundle = splitBundlesPair.getLeft();
+            List<NamespaceBundle> splitBundles = 
Collections.unmodifiableList(splitBundlesPair.getRight());
+            List<NamespaceBundle> successPublishedBundles =
+                    Collections.synchronizedList(new 
ArrayList<>(splitBundles.size()));
+            List<CompletableFuture<Void>> futures = new 
ArrayList<>(splitBundles.size());
+            for (NamespaceBundle sBundle : splitBundles) {
+                futures.add(pubAsync(sBundle.toString(), next).thenAccept(__ 
-> successPublishedBundles.add(sBundle)));
+            }
+            NamespaceName nsname = bundle.getNamespaceObject();
+            FutureUtil.waitForAll(futures)
+                    .thenCompose(__ -> 
namespaceService.updateNamespaceBundles(nsname, targetNsBundle))
+                    .thenCompose(__ -> 
namespaceService.updateNamespaceBundlesForPolicies(nsname, targetNsBundle))
+                    .thenRun(() -> {
+                        
bundleFactory.invalidateBundleCache(bundle.getNamespaceObject());
+                        updateFuture.complete(splitBundles);
+                    }).exceptionally(e -> {
+                        // Clean the new bundle when has exception.
+                        List<CompletableFuture<Void>> futureList = new 
ArrayList<>();
+                        for (NamespaceBundle sBundle : 
successPublishedBundles) {
+                            
futureList.add(tombstoneAsync(sBundle.toString()).thenAccept(__ -> {}));
+                        }
+                        FutureUtil.waitForAll(futureList)
+                                .whenComplete((__, ex) -> {
+                                    if (ex != null) {
+                                        log.warn("Clean new bundles failed,", 
ex);
+                                    }
+                                    updateFuture.completeExceptionally(e);
+                                });
+                        return null;
+                    });
+        }).exceptionally(e -> {
+            updateFuture.completeExceptionally(e);
+            return null;
+        });
+
+        updateFuture.thenAccept(r -> {
+            // Free the old bundle
+            tombstoneAsync(serviceUnit).thenRun(() -> {
+                // Update bundled_topic cache for load-report-generation
+                pulsar.getBrokerService().refreshTopicToStatsMaps(bundle);
+                // TODO: Update the load data immediately if needed.
+                completionFuture.complete(null);
+                double splitBundleTime = 
TimeUnit.NANOSECONDS.toMillis((System.nanoTime() - startTime));
+                log.info("Successfully split {} parent namespace-bundle to {} 
in {} ms", serviceUnit, r,
+                        splitBundleTime);
+            }).exceptionally(e -> {
+                double splitBundleTime = 
TimeUnit.NANOSECONDS.toMillis((System.nanoTime() - startTime));
+                String msg = format("Failed to free bundle %s in %s ms, under 
namespace [%s] with error %s",
+                        bundle.getNamespaceObject().toString(), 
splitBundleTime, bundle, e.getMessage());
+                completionFuture.completeExceptionally(new 
BrokerServiceException.ServiceUnitNotReadyException(msg));
+                return null;
+            });
+        }).exceptionally(ex -> {
+            // Retry several times on BadVersion
+            Throwable throwable = FutureUtil.unwrapCompletionException(ex);
+            if ((throwable instanceof 
MetadataStoreException.BadVersionException)
+                    && (counter.incrementAndGet() < 
NamespaceService.BUNDLE_SPLIT_RETRY_LIMIT)) {
+                pulsar.getExecutor().schedule(() -> 
splitServiceUnitOnceAndRetry(namespaceService, bundleFactory,
+                                bundle, serviceUnit, data, counter, startTime, 
completionFuture), 100, MILLISECONDS);
+            } else if (throwable instanceof IllegalArgumentException) {
+                completionFuture.completeExceptionally(throwable);
+            } else {
+                // Retry enough, or meet other exception
+                String msg = format("Bundle: %s not success update nsBundles, 
counter %d, reason %s",
+                        bundle.toString(), counter.get(), 
throwable.getMessage());
+                completionFuture.completeExceptionally(new 
BrokerServiceException.ServiceUnitNotReadyException(msg));
+            }
+            return null;
+        });
     }
 
     public void handleMetadataSessionEvent(SessionEvent e) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index abbabcd3b00..245c3f896af 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -124,7 +124,7 @@ public class NamespaceService implements AutoCloseable {
     private final NamespaceBundleFactory bundleFactory;
     private final String host;
 
-    private static final int BUNDLE_SPLIT_RETRY_LIMIT = 7;
+    public static final int BUNDLE_SPLIT_RETRY_LIMIT = 7;
     public static final String SLA_NAMESPACE_PROPERTY = "sla-monitor";
     public static final Pattern HEARTBEAT_NAMESPACE_PATTERN = 
Pattern.compile("pulsar/[^/]+/([^:]+:\\d+)");
     public static final Pattern HEARTBEAT_NAMESPACE_PATTERN_V2 = 
Pattern.compile("pulsar/([^:]+:\\d+)");
@@ -828,18 +828,7 @@ public class NamespaceService implements AutoCloseable {
                                        CompletableFuture<Void> 
completionFuture,
                                        NamespaceBundleSplitAlgorithm 
splitAlgorithm,
                                        List<Long> boundaries) {
-        BundleSplitOption bundleSplitOption;
-        if (config.getDefaultNamespaceBundleSplitAlgorithm()
-                  
.equals(NamespaceBundleSplitAlgorithm.FLOW_OR_QPS_EQUALLY_DIVIDE)) {
-            Map<String, TopicStatsImpl> topicStatsMap =  
pulsar.getBrokerService().getTopicStats(bundle);
-            bundleSplitOption = new 
FlowOrQpsEquallyDivideBundleSplitOption(this, bundle, boundaries,
-                    topicStatsMap,
-                    config.getLoadBalancerNamespaceBundleMaxMsgRate(),
-                    config.getLoadBalancerNamespaceBundleMaxBandwidthMbytes(),
-                    config.getFlowOrQpsDifferenceThresholdPercentage());
-        } else {
-            bundleSplitOption = new BundleSplitOption(this, bundle, 
boundaries);
-        }
+        BundleSplitOption bundleSplitOption = getBundleSplitOption(bundle, 
boundaries, config);
 
         
splitAlgorithm.getSplitBoundary(bundleSplitOption).whenComplete((splitBoundaries,
 ex) -> {
             CompletableFuture<List<NamespaceBundle>> updateFuture = new 
CompletableFuture<>();
@@ -957,6 +946,61 @@ public class NamespaceService implements AutoCloseable {
         });
     }
 
+    /**
+     * Get the split boundary's.
+     *
+     * @param bundle The bundle to split.
+     * @param boundaries The specified positions,
+     *                   use for {@link 
org.apache.pulsar.common.naming.SpecifiedPositionsBundleSplitAlgorithm}.
+     * @return A pair, left is target namespace bundle, right is split bundles.
+     */
+    public CompletableFuture<Pair<NamespaceBundles, List<NamespaceBundle>>> 
getSplitBoundary(
+            NamespaceBundle bundle, List<Long> boundaries) {
+        BundleSplitOption bundleSplitOption = getBundleSplitOption(bundle, 
boundaries, config);
+        NamespaceBundleSplitAlgorithm nsBundleSplitAlgorithm =
+                
getNamespaceBundleSplitAlgorithmByName(config.getDefaultNamespaceBundleSplitAlgorithm());
+        CompletableFuture<List<Long>> splitBoundary =
+                nsBundleSplitAlgorithm.getSplitBoundary(bundleSplitOption);
+        return splitBoundary.thenCompose(splitBoundaries -> {
+                    if (splitBoundaries == null || splitBoundaries.size() == 
0) {
+                        LOG.info("[{}] No valid boundary found in {} to split 
bundle {}",
+                                bundle.getNamespaceObject().toString(), 
boundaries, bundle.getBundleRange());
+                        return CompletableFuture.completedFuture(null);
+                    }
+                    return 
pulsar.getNamespaceService().getNamespaceBundleFactory()
+                            .splitBundles(bundle, splitBoundaries.size() + 1, 
splitBoundaries);
+                });
+    }
+
+    private BundleSplitOption getBundleSplitOption(NamespaceBundle bundle,
+                                                   List<Long> boundaries,
+                                                   ServiceConfiguration 
config) {
+        BundleSplitOption bundleSplitOption;
+        if (config.getDefaultNamespaceBundleSplitAlgorithm()
+                
.equals(NamespaceBundleSplitAlgorithm.FLOW_OR_QPS_EQUALLY_DIVIDE)) {
+            Map<String, TopicStatsImpl> topicStatsMap =  
pulsar.getBrokerService().getTopicStats(bundle);
+            bundleSplitOption = new 
FlowOrQpsEquallyDivideBundleSplitOption(this, bundle, boundaries,
+                    topicStatsMap,
+                    config.getLoadBalancerNamespaceBundleMaxMsgRate(),
+                    config.getLoadBalancerNamespaceBundleMaxBandwidthMbytes(),
+                    config.getFlowOrQpsDifferenceThresholdPercentage());
+        } else {
+            bundleSplitOption = new BundleSplitOption(this, bundle, 
boundaries);
+        }
+        return bundleSplitOption;
+    }
+
+    public NamespaceBundleSplitAlgorithm 
getNamespaceBundleSplitAlgorithmByName(String algorithmName) {
+        NamespaceBundleSplitAlgorithm algorithm = 
NamespaceBundleSplitAlgorithm.of(algorithmName);
+        if (algorithm == null) {
+            algorithm = 
NamespaceBundleSplitAlgorithm.of(pulsar.getConfig().getDefaultNamespaceBundleSplitAlgorithm());
+        }
+        if (algorithm == null) {
+            algorithm = 
NamespaceBundleSplitAlgorithm.RANGE_EQUALLY_DIVIDE_ALGO;
+        }
+        return algorithm;
+    }
+
     /**
      * Update new bundle-range to admin/policies/namespace.
      * Update may fail because of concurrent write to Zookeeper.
@@ -965,7 +1009,7 @@ public class NamespaceService implements AutoCloseable {
      * @param nsBundles
      * @throws Exception
      */
-    private CompletableFuture<Void> 
updateNamespaceBundlesForPolicies(NamespaceName nsname,
+    public CompletableFuture<Void> 
updateNamespaceBundlesForPolicies(NamespaceName nsname,
                                                                       
NamespaceBundles nsBundles) {
         Objects.requireNonNull(nsname);
         Objects.requireNonNull(nsBundles);
@@ -994,7 +1038,7 @@ public class NamespaceService implements AutoCloseable {
      * @param nsBundles
      * @throws Exception
      */
-    private CompletableFuture<Void> updateNamespaceBundles(NamespaceName 
nsname, NamespaceBundles nsBundles) {
+    public CompletableFuture<Void> updateNamespaceBundles(NamespaceName 
nsname, NamespaceBundles nsBundles) {
         Objects.requireNonNull(nsname);
         Objects.requireNonNull(nsBundles);
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
index 660999365c4..327afa3cb88 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
@@ -38,9 +38,12 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertThrows;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.reset;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
@@ -61,6 +64,7 @@ import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang3.reflect.FieldUtils;
 import org.apache.pulsar.broker.PulsarServerException;
@@ -69,12 +73,14 @@ import 
org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.broker.loadbalance.LeaderElectionService;
 import org.apache.pulsar.broker.loadbalance.extensions.models.Split;
 import org.apache.pulsar.broker.loadbalance.extensions.models.Unload;
+import org.apache.pulsar.broker.namespace.NamespaceService;
 import org.apache.pulsar.broker.testcontext.PulsarTestContext;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.TypedMessageBuilder;
 import org.apache.pulsar.client.impl.TableViewImpl;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
+import org.apache.pulsar.metadata.api.MetadataStoreException;
 import org.apache.pulsar.metadata.api.NotificationType;
 import org.apache.pulsar.metadata.api.extended.SessionEvent;
 import org.awaitility.Awaitility;
@@ -113,9 +119,9 @@ public class ServiceUnitStateChannelTest extends 
MockedPulsarServiceBaseTest {
         pulsar1 = pulsar;
         additionalPulsarTestContext = 
createAdditionalPulsarTestContext(getDefaultConf());
         pulsar2 = additionalPulsarTestContext.getPulsarService();
-        channel1 = new ServiceUnitStateChannelImpl(pulsar1);
+        channel1 = spy(new ServiceUnitStateChannelImpl(pulsar1));
         channel1.start();
-        channel2 = new ServiceUnitStateChannelImpl(pulsar2);
+        channel2 = spy(new ServiceUnitStateChannelImpl(pulsar2));
         channel2.start();
         lookupServiceAddress1 = (String)
                 FieldUtils.readDeclaredField(channel1, "lookupServiceAddress", 
true);
@@ -480,7 +486,7 @@ public class ServiceUnitStateChannelTest extends 
MockedPulsarServiceBaseTest {
     }
 
     @Test(priority = 6)
-    public void splitTest() throws Exception {
+    public void splitAndRetryTest() throws Exception {
         channel1.publishAssignEventAsync(bundle, lookupServiceAddress1);
         waitUntilNewOwner(channel1, bundle, lookupServiceAddress1);
         waitUntilNewOwner(channel2, bundle, lookupServiceAddress1);
@@ -490,17 +496,52 @@ public class ServiceUnitStateChannelTest extends 
MockedPulsarServiceBaseTest {
         assertEquals(ownerAddr2, Optional.of(lookupServiceAddress1));
         assertTrue(ownerAddr1.isPresent());
 
+        NamespaceService namespaceService = spy(pulsar1.getNamespaceService());
+        CompletableFuture<Void> future = new CompletableFuture<>();
+        int badVersionExceptionCount = 3;
+        AtomicInteger count = new AtomicInteger(badVersionExceptionCount);
+        future.completeExceptionally(new 
MetadataStoreException.BadVersionException("BadVersion"));
+        doAnswer(invocationOnMock -> {
+            if (count.decrementAndGet() > 0) {
+                return future;
+            }
+            // Call the real method
+            reset(namespaceService);
+            return future;
+        }).when(namespaceService).updateNamespaceBundles(any(), any());
+        doReturn(namespaceService).when(pulsar1).getNamespaceService();
+
         Split split = new Split(bundle, ownerAddr1.get(), new HashMap<>());
         channel1.publishSplitEventAsync(split);
 
         waitUntilNewOwner(channel1, bundle, null);
         waitUntilNewOwner(channel2, bundle, null);
 
-        // TODO: assert child bundle ownerships in the channels.
-        validateHandlerCounters(channel1, 1, 0, 1, 0, 0, 0, 1, 0, 1, 0);
-        validateHandlerCounters(channel2, 1, 0, 1, 0, 0, 0, 1, 0, 1, 0);
+        validateHandlerCounters(channel1, 1, 0, 9, 0, 0, 0, 1, 0, 7, 0);
+        validateHandlerCounters(channel2, 1, 0, 9, 0, 0, 0, 1, 0, 7, 0);
         validateEventCounters(channel1, 1, 0, 1, 0, 0, 0);
         validateEventCounters(channel2, 0, 0, 0, 0, 0, 0);
+        // Verify the retry count
+        verify(((ServiceUnitStateChannelImpl) channel1), 
times(badVersionExceptionCount + 1))
+                .splitServiceUnitOnceAndRetry(any(), any(), any(), any(), 
any(), any(), anyLong(), any());
+
+        // Assert child bundle ownerships in the channels.
+        String childBundle1 = "public/default/0x7fffffff_0xffffffff";
+        String childBundle2 = "public/default/0x00000000_0x7fffffff";
+
+        waitUntilNewOwner(channel1, childBundle1, lookupServiceAddress1);
+        waitUntilNewOwner(channel1, childBundle2, lookupServiceAddress1);
+        waitUntilNewOwner(channel2, childBundle1, lookupServiceAddress1);
+        waitUntilNewOwner(channel2, childBundle2, lookupServiceAddress1);
+        assertEquals(Optional.of(lookupServiceAddress1), 
channel1.getOwnerAsync(childBundle1).get());
+        assertEquals(Optional.of(lookupServiceAddress1), 
channel1.getOwnerAsync(childBundle2).get());
+        assertEquals(Optional.of(lookupServiceAddress1), 
channel2.getOwnerAsync(childBundle1).get());
+        assertEquals(Optional.of(lookupServiceAddress1), 
channel2.getOwnerAsync(childBundle2).get());
+
+        cleanTableView(channel1, childBundle1);
+        cleanTableView(channel2, childBundle1);
+        cleanTableView(channel1, childBundle2);
+        cleanTableView(channel2, childBundle2);
     }
 
     @Test(priority = 7)

Reply via email to