This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/branch-2.1 by this push: new d15cd7b handle subscription-already-exist exception on partitioned-topic for create-sub admin-api (#2269) d15cd7b is described below commit d15cd7bc850cb85f8e3038dbb62c98429e542787 Author: Rajan Dhabalia <rdhaba...@apache.org> AuthorDate: Tue Jul 31 11:33:02 2018 -0700 handle subscription-already-exist exception on partitioned-topic for create-sub admin-api (#2269) --- .../broker/admin/impl/PersistentTopicsBase.java | 35 +++++++++++++++++----- .../broker/admin/CreateSubscriptionTest.java | 26 ++++++++++++++++ 2 files changed, 54 insertions(+), 7 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index cf8355f..f85c617 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -38,9 +38,11 @@ import java.util.Map; import java.util.Set; import java.util.TreeMap; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import javax.ws.rs.WebApplicationException; @@ -829,15 +831,34 @@ public class PersistentTopicsBase extends AdminResource { try { if (partitionMetadata.partitions > 0) { // Create the subscription on each partition - List<CompletableFuture<Void>> futures = Lists.newArrayList(); PulsarAdmin admin = pulsar().getAdminClient(); + CountDownLatch latch = new CountDownLatch(partitionMetadata.partitions); + AtomicReference<Throwable> exception = new AtomicReference<>(); + AtomicInteger failureCount = new AtomicInteger(0); + for (int i = 0; i < partitionMetadata.partitions; i++) { - futures.add(admin.topics().createSubscriptionAsync(topicName.getPartition(i).toString(), - subscriptionName, messageId)); + admin.persistentTopics() + .createSubscriptionAsync(topicName.getPartition(i).toString(), subscriptionName, messageId) + .handle((result, ex) -> { + if (ex != null) { + int c = failureCount.incrementAndGet(); + // fail the operation on unknown exception or if all the partitioned failed due to + // subscription-already-exist + if (c == partitionMetadata.partitions + || !(ex instanceof PulsarAdminException.ConflictException)) { + exception.set(ex); + } + } + latch.countDown(); + return null; + }); } - FutureUtil.waitForAll(futures).join(); + latch.await(); + if (exception.get() != null) { + throw exception.get(); + } } else { validateAdminOperationOnTopic(authoritative); @@ -850,10 +871,10 @@ public class PersistentTopicsBase extends AdminResource { PersistentSubscription subscription = (PersistentSubscription) topic .createSubscription(subscriptionName, InitialPosition.Latest).get(); subscription.resetCursor(PositionImpl.get(messageId.getLedgerId(), messageId.getEntryId())).get(); - log.info("[{}][{}] Successfully created subscription {} at message id {}", clientAppId(), - topicName, subscriptionName, messageId); + log.info("[{}][{}] Successfully created subscription {} at message id {}", clientAppId(), topicName, + subscriptionName, messageId); } - } catch (Exception e) { + } catch (Throwable e) { Throwable t = e.getCause(); log.warn("[{}] [{}] Failed to create subscription {} at message id {}", clientAppId(), topicName, subscriptionName, messageId, e); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/CreateSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/CreateSubscriptionTest.java index 0cf2c5c..f5d8bd0 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/CreateSubscriptionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/CreateSubscriptionTest.java @@ -103,4 +103,30 @@ public class CreateSubscriptionTest extends ProducerConsumerBase { Lists.newArrayList("sub-1")); } } + + @Test + public void createSubscriptionOnPartitionedTopicWithPartialFailure() throws Exception { + String topic = "persistent://my-property/my-ns/my-partitioned-topic"; + admin.topics().createPartitionedTopic(topic, 10); + + // create subscription for one partition + final String partitionedTopic0 = topic+"-partition-0"; + admin.topics().createSubscription(partitionedTopic0, "sub-1", MessageId.latest); + + admin.topics().createSubscription(topic, "sub-1", MessageId.latest); + + // Create should fail if the subscription already exists + try { + admin.topics().createSubscription(topic, "sub-1", MessageId.latest); + fail("Should have failed"); + } catch (Exception e) { + // Expected + } + + for (int i = 0; i < 10; i++) { + assertEquals( + admin.topics().getSubscriptions(TopicName.get(topic).getPartition(i).toString()), + Lists.newArrayList("sub-1")); + } + } }