This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/branch-2.1 by this push: new 4db7f38 Fix NPE when splitting and unloading bundle (#2348) 4db7f38 is described below commit 4db7f38cfaf96c5a2f524a780b854b8df1878b57 Author: massakam <massa...@yahoo-corp.jp> AuthorDate: Fri Aug 10 13:38:52 2018 +0900 Fix NPE when splitting and unloading bundle (#2348) --- .../pulsar/broker/namespace/NamespaceService.java | 6 ++-- .../broker/namespace/NamespaceServiceTest.java | 42 ++++++++++++++++++++++ 2 files changed, 45 insertions(+), 3 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java index bf44519..d9a1ccf 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java @@ -593,7 +593,7 @@ public class NamespaceService { boolean unload, AtomicInteger counter, CompletableFuture<Void> unloadFuture) { - CompletableFuture<NamespaceBundles> updateFuture = new CompletableFuture<>(); + CompletableFuture<List<NamespaceBundle>> updateFuture = new CompletableFuture<>(); final Pair<NamespaceBundles, List<NamespaceBundle>> splittedBundles = bundleFactory.splitBundles(bundle, 2 /* by default split into 2 */); @@ -622,7 +622,7 @@ public class NamespaceService { // namespace bundle bundleFactory.invalidateBundleCache(bundle.getNamespaceObject()); - updateFuture.complete(splittedBundles.getLeft()); + updateFuture.complete(splittedBundles.getRight()); } else if (rc == Code.BADVERSION.intValue()) { KeeperException keeperException = KeeperException.create(KeeperException.Code.get(rc)); String msg = format("failed to update namespace policies [%s], NamespaceBundle: %s " + @@ -680,7 +680,7 @@ public class NamespaceService { if (unload) { // unload new split bundles - r.getBundles().forEach(splitBundle -> { + r.forEach(splitBundle -> { try { unloadNamespaceBundle(splitBundle); } catch (Exception e) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java index a2ef397..2c51e16 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java @@ -429,6 +429,48 @@ public class NamespaceServiceTest extends BrokerTestBase { } + @Test + public void testRemoveOwnershipAndSplitBundle() throws Exception { + OwnershipCache ownershipCache = spy(pulsar.getNamespaceService().getOwnershipCache()); + doNothing().when(ownershipCache).disableOwnership(any(NamespaceBundle.class)); + + Field ownership = NamespaceService.class.getDeclaredField("ownershipCache"); + ownership.setAccessible(true); + ownership.set(pulsar.getNamespaceService(), ownershipCache); + + NamespaceService namespaceService = pulsar.getNamespaceService(); + NamespaceName nsname = NamespaceName.get("pulsar/global/ns1"); + TopicName topicName = TopicName.get("persistent://pulsar/global/ns1/topic-1"); + NamespaceBundles bundles = namespaceService.getNamespaceBundleFactory().getBundles(nsname); + NamespaceBundle originalBundle = bundles.findBundle(topicName); + + CompletableFuture<Void> result1 = namespaceService.splitAndOwnBundle(originalBundle, false); + try { + result1.get(); + } catch (Exception e) { + fail("split bundle faild", e); + } + + NamespaceBundles updatedNsBundles = namespaceService.getNamespaceBundleFactory().getBundles(nsname); + assertNotNull(updatedNsBundles); + NamespaceBundle splittedBundle = updatedNsBundles.findBundle(topicName); + + updatedNsBundles.getBundles().stream().filter(bundle -> !bundle.equals(splittedBundle)).forEach(bundle -> { + try { + ownershipCache.removeOwnership(bundle).get(); + } catch (Exception e) { + fail("faild to remove ownership", e); + } + }); + + CompletableFuture<Void> result2 = namespaceService.splitAndOwnBundle(splittedBundle, true); + try { + result2.get(); + } catch (Exception e) { + // make sure: NPE does not occur + fail("split bundle faild", e); + } + } @SuppressWarnings("unchecked") private Pair<NamespaceBundles, List<NamespaceBundle>> splitBundles(NamespaceBundleFactory utilityFactory,