This is an automated email from the ASF dual-hosted git repository.

sanjeevrk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 499ffd7  Ensure that the input topic exists before doing trigger (#3130)
499ffd7 is described below

commit 499ffd7405f2e0d22327d6b03e285d0fdbea94bf
Author: Sanjeev Kulkarni <sanjee...@gmail.com>
AuthorDate: Thu Dec 6 07:28:45 2018 -0800

    Ensure that the input topic exists before doing trigger (#3130)
---
 .../org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java  | 6 ++++++
 1 file changed, 6 insertions(+)

diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
index 8f13dc8..3bd5a01 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
@@ -1041,6 +1041,12 @@ public abstract class ComponentImpl {
 
             return Response.status(Status.BAD_REQUEST).build();
         }
+        try {
+            
worker().getBrokerAdmin().topics().getSubscriptions(inputTopicToWrite);
+        } catch (PulsarAdminException e) {
+            log.error("Function in trigger function is not ready @ /{}/{}/{}", 
tenant, namespace, functionName);
+            return Response.status(Status.BAD_REQUEST).build();
+        }
         String outputTopic = 
functionMetaData.getFunctionDetails().getSink().getTopic();
         Reader<byte[]> reader = null;
         Producer<byte[]> producer = null;

Reply via email to