This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new d15cd7b  handle subscription-already-exist exception on 
partitioned-topic for create-sub admin-api (#2269)
d15cd7b is described below

commit d15cd7bc850cb85f8e3038dbb62c98429e542787
Author: Rajan Dhabalia <rdhaba...@apache.org>
AuthorDate: Tue Jul 31 11:33:02 2018 -0700

    handle subscription-already-exist exception on partitioned-topic for 
create-sub admin-api (#2269)
---
 .../broker/admin/impl/PersistentTopicsBase.java    | 35 +++++++++++++++++-----
 .../broker/admin/CreateSubscriptionTest.java       | 26 ++++++++++++++++
 2 files changed, 54 insertions(+), 7 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index cf8355f..f85c617 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -38,9 +38,11 @@ import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 
 import javax.ws.rs.WebApplicationException;
@@ -829,15 +831,34 @@ public class PersistentTopicsBase extends AdminResource {
         try {
             if (partitionMetadata.partitions > 0) {
                 // Create the subscription on each partition
-                List<CompletableFuture<Void>> futures = Lists.newArrayList();
                 PulsarAdmin admin = pulsar().getAdminClient();
 
+                CountDownLatch latch = new 
CountDownLatch(partitionMetadata.partitions);
+                AtomicReference<Throwable> exception = new AtomicReference<>();
+                AtomicInteger failureCount = new AtomicInteger(0);
+
                 for (int i = 0; i < partitionMetadata.partitions; i++) {
-                    
futures.add(admin.topics().createSubscriptionAsync(topicName.getPartition(i).toString(),
-                            subscriptionName, messageId));
+                    admin.persistentTopics()
+                            
.createSubscriptionAsync(topicName.getPartition(i).toString(), 
subscriptionName, messageId)
+                            .handle((result, ex) -> {
+                                if (ex != null) {
+                                    int c = failureCount.incrementAndGet();
+                                    // fail the operation on unknown exception 
or if all the partitioned failed due to
+                                    // subscription-already-exist
+                                    if (c == partitionMetadata.partitions
+                                            || !(ex instanceof 
PulsarAdminException.ConflictException)) {
+                                        exception.set(ex);
+                                    }
+                                }
+                                latch.countDown();
+                                return null;
+                            });
                 }
 
-                FutureUtil.waitForAll(futures).join();
+                latch.await();
+                if (exception.get() != null) {
+                    throw exception.get();
+                }
             } else {
                 validateAdminOperationOnTopic(authoritative);
 
@@ -850,10 +871,10 @@ public class PersistentTopicsBase extends AdminResource {
                 PersistentSubscription subscription = (PersistentSubscription) 
topic
                         .createSubscription(subscriptionName, 
InitialPosition.Latest).get();
                 
subscription.resetCursor(PositionImpl.get(messageId.getLedgerId(), 
messageId.getEntryId())).get();
-                log.info("[{}][{}] Successfully created subscription {} at 
message id {}", clientAppId(),
-                        topicName, subscriptionName, messageId);
+                log.info("[{}][{}] Successfully created subscription {} at 
message id {}", clientAppId(), topicName,
+                        subscriptionName, messageId);
             }
-        } catch (Exception e) {
+        } catch (Throwable e) {
             Throwable t = e.getCause();
             log.warn("[{}] [{}] Failed to create subscription {} at message id 
{}", clientAppId(),
                     topicName, subscriptionName, messageId, e);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/CreateSubscriptionTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/CreateSubscriptionTest.java
index 0cf2c5c..f5d8bd0 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/CreateSubscriptionTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/CreateSubscriptionTest.java
@@ -103,4 +103,30 @@ public class CreateSubscriptionTest extends 
ProducerConsumerBase {
                     Lists.newArrayList("sub-1"));
         }
     }
+    
+    @Test
+    public void createSubscriptionOnPartitionedTopicWithPartialFailure() 
throws Exception {
+        String topic = "persistent://my-property/my-ns/my-partitioned-topic";
+        admin.topics().createPartitionedTopic(topic, 10);
+        
+        // create subscription for one partition
+        final String partitionedTopic0 = topic+"-partition-0";
+        admin.topics().createSubscription(partitionedTopic0, "sub-1", 
MessageId.latest);
+
+        admin.topics().createSubscription(topic, "sub-1", MessageId.latest);
+
+        // Create should fail if the subscription already exists
+        try {
+            admin.topics().createSubscription(topic, "sub-1", 
MessageId.latest);
+            fail("Should have failed");
+        } catch (Exception e) {
+            // Expected
+        }
+
+        for (int i = 0; i < 10; i++) {
+            assertEquals(
+                    
admin.topics().getSubscriptions(TopicName.get(topic).getPartition(i).toString()),
+                    Lists.newArrayList("sub-1"));
+        }
+    }
 }

Reply via email to