This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 4db7f38  Fix NPE when splitting and unloading bundle (#2348)
4db7f38 is described below

commit 4db7f38cfaf96c5a2f524a780b854b8df1878b57
Author: massakam <massa...@yahoo-corp.jp>
AuthorDate: Fri Aug 10 13:38:52 2018 +0900

    Fix NPE when splitting and unloading bundle (#2348)
---
 .../pulsar/broker/namespace/NamespaceService.java  |  6 ++--
 .../broker/namespace/NamespaceServiceTest.java     | 42 ++++++++++++++++++++++
 2 files changed, 45 insertions(+), 3 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index bf44519..d9a1ccf 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -593,7 +593,7 @@ public class NamespaceService {
                                        boolean unload,
                                        AtomicInteger counter,
                                        CompletableFuture<Void> unloadFuture) {
-        CompletableFuture<NamespaceBundles> updateFuture = new CompletableFuture<>();
+        CompletableFuture<List<NamespaceBundle>> updateFuture = new CompletableFuture<>();
 
         final Pair<NamespaceBundles, List<NamespaceBundle>> splittedBundles = bundleFactory.splitBundles(bundle,
             2 /* by default split into 2 */);
@@ -622,7 +622,7 @@ public class NamespaceService {
                             // namespace bundle
                             bundleFactory.invalidateBundleCache(bundle.getNamespaceObject());
 
-                            updateFuture.complete(splittedBundles.getLeft());
+                            updateFuture.complete(splittedBundles.getRight());
                         } else if (rc == Code.BADVERSION.intValue()) {
                             KeeperException keeperException = KeeperException.create(KeeperException.Code.get(rc));
                             String msg = format("failed to update namespace policies [%s], NamespaceBundle: %s " +
@@ -680,7 +680,7 @@ public class NamespaceService {
 
                 if (unload) {
                     // unload new split bundles
-                    r.getBundles().forEach(splitBundle -> {
+                    r.forEach(splitBundle -> {
                         try {
                             unloadNamespaceBundle(splitBundle);
                         } catch (Exception e) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
index a2ef397..2c51e16 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
@@ -429,6 +429,48 @@ public class NamespaceServiceTest extends BrokerTestBase {
 
     }
 
+    @Test
+    public void testRemoveOwnershipAndSplitBundle() throws Exception {
+        OwnershipCache ownershipCache = spy(pulsar.getNamespaceService().getOwnershipCache());
+        doNothing().when(ownershipCache).disableOwnership(any(NamespaceBundle.class));
+
+        Field ownership = NamespaceService.class.getDeclaredField("ownershipCache");
+        ownership.setAccessible(true);
+        ownership.set(pulsar.getNamespaceService(), ownershipCache);
+
+        NamespaceService namespaceService = pulsar.getNamespaceService();
+        NamespaceName nsname = NamespaceName.get("pulsar/global/ns1");
+        TopicName topicName = TopicName.get("persistent://pulsar/global/ns1/topic-1");
+        NamespaceBundles bundles = namespaceService.getNamespaceBundleFactory().getBundles(nsname);
+        NamespaceBundle originalBundle = bundles.findBundle(topicName);
+
+        CompletableFuture<Void> result1 = namespaceService.splitAndOwnBundle(originalBundle, false);
+        try {
+            result1.get();
+        } catch (Exception e) {
+            fail("split bundle faild", e);
+        }
+
+        NamespaceBundles updatedNsBundles = namespaceService.getNamespaceBundleFactory().getBundles(nsname);
+        assertNotNull(updatedNsBundles);
+        NamespaceBundle splittedBundle = updatedNsBundles.findBundle(topicName);
+
+        updatedNsBundles.getBundles().stream().filter(bundle -> !bundle.equals(splittedBundle)).forEach(bundle -> {
+            try {
+                ownershipCache.removeOwnership(bundle).get();
+            } catch (Exception e) {
+                fail("faild to remove ownership", e);
+            }
+        });
+
+        CompletableFuture<Void> result2 = namespaceService.splitAndOwnBundle(splittedBundle, true);
+        try {
+            result2.get();
+        } catch (Exception e) {
+            // make sure: NPE does not occur
+            fail("split bundle faild", e);
+        }
+    }
 
     @SuppressWarnings("unchecked")
     private Pair<NamespaceBundles, List<NamespaceBundle>> splitBundles(NamespaceBundleFactory utilityFactory,

Reply via email to