mgodave closed pull request #1355: Remove as many Thread.sleep calls from REST 
endpoint
URL: https://github.com/apache/incubator-pulsar/pull/1355
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index 1e7764083..9c65fc883 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -20,30 +20,29 @@
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static 
org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
-import static 
org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES_ROOT;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Lists;
 import java.net.MalformedURLException;
 import java.net.URI;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
-
 import javax.servlet.ServletContext;
 import javax.ws.rs.WebApplicationException;
 import javax.ws.rs.core.Response;
 import javax.ws.rs.core.Response.Status;
 import javax.ws.rs.core.UriBuilder;
-
 import org.apache.bookkeeper.util.ZkUtils;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.cache.LocalZooKeeperCacheService;
 import org.apache.pulsar.broker.web.PulsarWebResource;
 import org.apache.pulsar.broker.web.RestException;
-import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.naming.NamespaceBundle;
 import org.apache.pulsar.common.naming.NamespaceBundleFactory;
 import org.apache.pulsar.common.naming.NamespaceBundles;
 import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
 import org.apache.pulsar.common.policies.data.BundlesData;
 import org.apache.pulsar.common.policies.data.ClusterData;
@@ -66,9 +65,6 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.collect.Lists;
-
 public abstract class AdminResource extends PulsarWebResource {
     private static final Logger log = 
LoggerFactory.getLogger(AdminResource.class);
     private static final String POLICIES_READONLY_FLAG_PATH = 
"/admin/flags/policies-readonly";
@@ -102,6 +98,21 @@ protected void zkCreateOptimistic(String path, byte[] 
content) throws Exception
         ZkUtils.createFullPathOptimistic(globalZk(), path, content, 
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
     }
 
+    protected CompletableFuture<Void> zkAsyncCreateOptimistic(String path, 
byte[] content) {
+        CompletableFuture<Void> future = new CompletableFuture<>();
+        ZkUtils.asyncCreateFullPathOptimistic(globalZk(), path, content, 
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT,
+            (rc, path1, ctx, name) -> {
+                KeeperException.Code code = KeeperException.Code.get(rc);
+                if (code != KeeperException.Code.OK) {
+                    KeeperException e = KeeperException.create(code);
+                    future.completeExceptionally(e);
+                } else {
+                    future.complete(null);
+                }
+            }, null);
+        return future;
+    }
+
     /**
      * Get the domain of the topic (whether it's persistent or non-persistent)
      */
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 9386df5ae..5dc142595 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -329,7 +329,7 @@ protected void internalRevokePermissionsOnTopic(String 
role) {
         }
     }
 
-    protected void internalCreatePartitionedTopic(int numPartitions, boolean 
authoritative) {
+    protected CompletableFuture<Void> internalCreatePartitionedTopic(int 
numPartitions, boolean authoritative) {
         validateAdminAccessOnProperty(topicName.getProperty());
         if (numPartitions <= 1) {
             throw new RestException(Status.NOT_ACCEPTABLE, "Number of 
partitions should be more than 1");
@@ -338,16 +338,18 @@ protected void internalCreatePartitionedTopic(int 
numPartitions, boolean authori
             String path = path(PARTITIONED_TOPIC_PATH_ZNODE, 
namespaceName.toString(), domain(),
                     topicName.getEncodedLocalName());
             byte[] data = jsonMapper().writeValueAsBytes(new 
PartitionedTopicMetadata(numPartitions));
-            zkCreateOptimistic(path, data);
-            // we wait for the data to be synced in all quorums and the 
observers
-            Thread.sleep(PARTITIONED_TOPIC_WAIT_SYNC_TIME_MS);
-            log.info("[{}] Successfully created partitioned topic {}", 
clientAppId(), topicName);
-        } catch (KeeperException.NodeExistsException e) {
-            log.warn("[{}] Failed to create already existing partitioned topic 
{}", clientAppId(), topicName);
-            throw new RestException(Status.CONFLICT, "Partitioned topic 
already exist");
+            return zkAsyncCreateOptimistic(path, data).handle((ignore, e) -> {
+                if (null == e) {
+                    log.info("[{}] Successfully created partitioned topic {}", 
clientAppId(), topicName);
+                    return null;
+                } else {
+                    log.warn("[{}] Failed to create already existing 
partitioned topic {}", clientAppId(), topicName);
+                    throw new RestException(Status.CONFLICT, "Partitioned 
topic already exist");
+                }
+            });
         } catch (Exception e) {
             log.error("[{}] Failed to create partitioned topic {}", 
clientAppId(), topicName, e);
-            throw new RestException(e);
+            return FutureUtil.failedFuture(new RestException(e));
         }
     }
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java
index 0f9ffe144..60565fff7 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java
@@ -28,7 +28,9 @@
 import javax.ws.rs.PathParam;
 import javax.ws.rs.Produces;
 import javax.ws.rs.QueryParam;
+import javax.ws.rs.container.AsyncResponse;
 import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
 import javax.ws.rs.core.Response.Status;
 
 import com.google.common.collect.Lists;
@@ -116,26 +118,27 @@ public PersistentTopicInternalStats 
getInternalStats(@PathParam("property") Stri
             @ApiResponse(code = 409, message = "Partitioned topic already 
exist") })
     public void createPartitionedTopic(@PathParam("property") String property, 
@PathParam("cluster") String cluster,
             @PathParam("namespace") String namespace, @PathParam("topic") 
@Encoded String encodedTopic,
-            int numPartitions, @QueryParam("authoritative") 
@DefaultValue("false") boolean authoritative) {
+            int numPartitions, @QueryParam("authoritative") 
@DefaultValue("false") boolean authoritative, AsyncResponse response) {
         validateTopicName(property, cluster, namespace, encodedTopic);
         validateAdminAccessOnProperty(topicName.getProperty());
         if (numPartitions <= 1) {
-            throw new RestException(Status.NOT_ACCEPTABLE, "Number of 
partitions should be more than 1");
+            response.resume(new RestException(Status.NOT_ACCEPTABLE, "Number 
of partitions should be more than 1"));
         }
         try {
             String path = path(PARTITIONED_TOPIC_PATH_ZNODE, 
namespaceName.toString(), domain(),
                     topicName.getEncodedLocalName());
             byte[] data = jsonMapper().writeValueAsBytes(new 
PartitionedTopicMetadata(numPartitions));
-            zkCreateOptimistic(path, data);
-            // we wait for the data to be synced in all quorums and the 
observers
-            Thread.sleep(PARTITIONED_TOPIC_WAIT_SYNC_TIME_MS);
-            log.info("[{}] Successfully created partitioned topic {}", 
clientAppId(), topicName);
-        } catch (KeeperException.NodeExistsException e) {
-            log.warn("[{}] Failed to create already existing partitioned topic 
{}", clientAppId(), topicName);
-            throw new RestException(Status.CONFLICT, "Partitioned topic 
already exist");
+            zkAsyncCreateOptimistic(path, data).thenAccept(ignore -> {
+                log.info("[{}] Successfully created partitioned topic {}", 
clientAppId(), topicName);
+                response.resume(Response.ok());
+            }).exceptionally(e -> {
+                log.warn("[{}] Failed to create already existing partitioned 
topic {}", clientAppId(), topicName);
+                response.resume(new RestException(Status.CONFLICT, 
"Partitioned topic already exist"));
+                return null;
+            });
         } catch (Exception e) {
             log.error("[{}] Failed to create partitioned topic {}", 
clientAppId(), topicName, e);
-            throw new RestException(e);
+            response.resume(new RestException(e));
         }
     }
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
index 8e105424c..5f2249f4a 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
@@ -130,9 +130,16 @@ public void 
revokePermissionsOnTopic(@PathParam("property") String property,
             @ApiResponse(code = 409, message = "Partitioned topic already 
exist") })
     public void createPartitionedTopic(@PathParam("property") String property, 
@PathParam("cluster") String cluster,
             @PathParam("namespace") String namespace, @PathParam("topic") 
@Encoded String encodedTopic,
-            int numPartitions, @QueryParam("authoritative") 
@DefaultValue("false") boolean authoritative) {
+            int numPartitions, @QueryParam("authoritative") 
@DefaultValue("false") boolean authoritative, AsyncResponse response) {
         validateTopicName(property, cluster, namespace, encodedTopic);
-        internalCreatePartitionedTopic(numPartitions, authoritative);
+        internalCreatePartitionedTopic(numPartitions, 
authoritative).handle((ignore, e) -> {
+            if (e != null) {
+                response.resume(e);
+            } else {
+                response.resume(Response.noContent());
+            }
+            return null;
+        });
     }
 
     /**
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
index 493030b7f..084ed1263 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
@@ -29,7 +29,9 @@
 import javax.ws.rs.PathParam;
 import javax.ws.rs.Produces;
 import javax.ws.rs.QueryParam;
+import javax.ws.rs.container.AsyncResponse;
 import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
 import javax.ws.rs.core.Response.Status;
 
 import org.apache.pulsar.broker.service.Topic;
@@ -39,7 +41,6 @@
 import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
 import org.apache.pulsar.common.policies.data.NonPersistentTopicStats;
 import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
-import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -103,26 +104,27 @@ public PersistentTopicInternalStats 
getInternalStats(@PathParam("property") Stri
             @ApiResponse(code = 409, message = "Partitioned topic already 
exist") })
     public void createPartitionedTopic(@PathParam("property") String property, 
@PathParam("namespace") String namespace,
             @PathParam("topic") @Encoded String encodedTopic, int 
numPartitions,
-            @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative) {
+            @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative, AsyncResponse response) {
         validateTopicName(property, namespace, encodedTopic);
         validateAdminAccessOnProperty(topicName.getProperty());
         if (numPartitions <= 1) {
-            throw new RestException(Status.NOT_ACCEPTABLE, "Number of 
partitions should be more than 1");
+            response.resume(new RestException(Status.NOT_ACCEPTABLE, "Number 
of partitions should be more than 1"));
         }
         try {
             String path = path(PARTITIONED_TOPIC_PATH_ZNODE, 
namespaceName.toString(), domain(),
                     topicName.getEncodedLocalName());
             byte[] data = jsonMapper().writeValueAsBytes(new 
PartitionedTopicMetadata(numPartitions));
-            zkCreateOptimistic(path, data);
-            // we wait for the data to be synced in all quorums and the 
observers
-            Thread.sleep(PARTITIONED_TOPIC_WAIT_SYNC_TIME_MS);
-            log.info("[{}] Successfully created partitioned topic {}", 
clientAppId(), topicName);
-        } catch (KeeperException.NodeExistsException e) {
-            log.warn("[{}] Failed to create already existing partitioned topic 
{}", clientAppId(), topicName);
-            throw new RestException(Status.CONFLICT, "Partitioned topic 
already exist");
+            zkAsyncCreateOptimistic(path, data).thenAccept(ignore -> {
+                response.resume(Response.ok());
+                log.info("[{}] Successfully created partitioned topic {}", 
clientAppId(), topicName);
+            }).exceptionally(e -> {
+                log.warn("[{}] Failed to create already existing partitioned 
topic {}", clientAppId(), topicName);
+                response.resume(new RestException(Status.CONFLICT, 
"Partitioned topic already exist"));
+                return null;
+            });
         } catch (Exception e) {
             log.error("[{}] Failed to create partitioned topic {}", 
clientAppId(), topicName, e);
-            throw new RestException(e);
+            response.resume(new RestException(e));
         }
     }
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index 0f89bfd82..055347e1f 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -126,9 +126,16 @@ public void 
revokePermissionsOnTopic(@PathParam("property") String property,
             @ApiResponse(code = 409, message = "Partitioned topic already 
exist") })
     public void createPartitionedTopic(@PathParam("property") String property, 
@PathParam("namespace") String namespace,
             @PathParam("topic") @Encoded String encodedTopic, int 
numPartitions,
-            @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative) {
+            @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative, AsyncResponse response) {
         validateTopicName(property, namespace, encodedTopic);
-        internalCreatePartitionedTopic(numPartitions, authoritative);
+        internalCreatePartitionedTopic(numPartitions, 
authoritative).handle((ignore, e) -> {
+            if (e != null) {
+                response.resume(e);
+            } else {
+                response.resume(Response.ok());
+            }
+            return null;
+        });
     }
 
     /**


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to