This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 5766844  fix bug concerning ContextImpl (#2052)
5766844 is described below

commit 57668440104fb56ce932287190f5203e2073bd62
Author: Boyang Jerry Peng <jerry.boyang.p...@gmail.com>
AuthorDate: Mon Jul 2 00:18:28 2018 -0700

    fix bug concerning ContextImpl (#2052)
    
    An NPE was thrown when submitting a non-Pulsar source because JavaInstance left the context null.
---
 .../apache/pulsar/functions/instance/ContextImpl.java    | 16 ++++++++++++----
 .../apache/pulsar/functions/instance/JavaInstance.java   |  9 +++++----
 2 files changed, 17 insertions(+), 8 deletions(-)

diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
index 5ca07d9..e2887bf 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
@@ -18,11 +18,8 @@
  */
 package org.apache.pulsar.functions.instance;
 
-import static com.google.common.base.Preconditions.checkState;
-
 import com.google.gson.Gson;
 import com.google.gson.reflect.TypeToken;
-import java.nio.ByteBuffer;
 import lombok.Getter;
 import lombok.Setter;
 import org.apache.commons.lang.StringUtils;
@@ -36,15 +33,17 @@ import 
org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
 import org.apache.pulsar.functions.api.Context;
 import org.apache.pulsar.functions.api.SerDe;
 import org.apache.pulsar.functions.api.utils.DefaultSerDe;
-import org.apache.pulsar.functions.proto.InstanceCommunication.MetricsData;
 import org.apache.pulsar.functions.instance.state.StateContextImpl;
+import org.apache.pulsar.functions.proto.InstanceCommunication.MetricsData;
 import org.apache.pulsar.functions.utils.Reflections;
 import org.slf4j.Logger;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.LinkedList;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
@@ -52,6 +51,8 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeUnit;
 
+import static com.google.common.base.Preconditions.checkState;
+
 /**
  * This class implements the Context interface exposed to the user.
  */
@@ -141,6 +142,9 @@ class ContextImpl implements Context {
 
     @Override
     public Collection<String> getInputTopics() {
+        if (inputConsumer == null) {
+            return new LinkedList<>();
+        }
         if (inputConsumer instanceof MultiTopicsConsumerImpl) {
             return ((MultiTopicsConsumerImpl) inputConsumer).getTopics();
         } else {
@@ -302,6 +306,10 @@ class ContextImpl implements Context {
     //TODO remove topic argument
     @Override
     public CompletableFuture<Void> ack(byte[] messageId) {
+        // if inputConsumer is null, then ack is a no-op
+        if (inputConsumer == null) {
+            return CompletableFuture.completedFuture(null);
+        }
         MessageId actualMessageId = null;
         try {
             actualMessageId = MessageId.fromByteArray(messageId);
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java
index 5ab8d85..3bd563e 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.functions.instance;
 import lombok.AccessLevel;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.functions.api.Function;
@@ -51,13 +52,13 @@ public class JavaInstance implements AutoCloseable {
         // TODO: cache logger instances by functions?
         Logger instanceLog = LoggerFactory.getLogger("function-" + 
config.getFunctionDetails().getName());
 
+        Consumer consumer = null;
         if (source instanceof PulsarSource) {
-            this.context = new ContextImpl(config, instanceLog, pulsarClient, 
clsLoader,
-                    ((PulsarSource) source).getInputConsumer());
-        } else {
-            this.context = null;
+            consumer = ((PulsarSource) source).getInputConsumer();
         }
 
+        this.context = new ContextImpl(config, instanceLog, pulsarClient, 
clsLoader, consumer);
+
         // create the functions
         if (userClassObject instanceof Function) {
             this.function = (Function) userClassObject;

Reply via email to