This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 5766844 fix bug in concerning ContextImpl (#2052) 5766844 is described below commit 57668440104fb56ce932287190f5203e2073bd62 Author: Boyang Jerry Peng <jerry.boyang.p...@gmail.com> AuthorDate: Mon Jul 2 00:18:28 2018 -0700 fix bug in concerning ContextImpl (#2052) NPE thrown when submitting a source because context is null --- .../apache/pulsar/functions/instance/ContextImpl.java | 16 ++++++++++++---- .../apache/pulsar/functions/instance/JavaInstance.java | 9 +++++---- 2 files changed, 17 insertions(+), 8 deletions(-) diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java index 5ca07d9..e2887bf 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java @@ -18,11 +18,8 @@ */ package org.apache.pulsar.functions.instance; -import static com.google.common.base.Preconditions.checkState; - import com.google.gson.Gson; import com.google.gson.reflect.TypeToken; -import java.nio.ByteBuffer; import lombok.Getter; import lombok.Setter; import org.apache.commons.lang.StringUtils; @@ -36,15 +33,17 @@ import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl; import org.apache.pulsar.functions.api.Context; import org.apache.pulsar.functions.api.SerDe; import org.apache.pulsar.functions.api.utils.DefaultSerDe; -import org.apache.pulsar.functions.proto.InstanceCommunication.MetricsData; import org.apache.pulsar.functions.instance.state.StateContextImpl; +import org.apache.pulsar.functions.proto.InstanceCommunication.MetricsData; import org.apache.pulsar.functions.utils.Reflections; import org.slf4j.Logger; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.Arrays; import java.util.Collection; import java.util.HashMap; +import java.util.LinkedList; import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; @@ -52,6 +51,8 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; +import static com.google.common.base.Preconditions.checkState; + /** * This class implements the Context interface exposed to the user. */ @@ -141,6 +142,9 @@ class ContextImpl implements Context { @Override public Collection<String> getInputTopics() { + if (inputConsumer == null) { + return new LinkedList<>(); + } if (inputConsumer instanceof MultiTopicsConsumerImpl) { return ((MultiTopicsConsumerImpl) inputConsumer).getTopics(); } else { @@ -302,6 +306,10 @@ class ContextImpl implements Context { //TODO remove topic argument @Override public CompletableFuture<Void> ack(byte[] messageId) { + // if inputConsumer is null, then ack is a no-op + if (inputConsumer == null) { + return CompletableFuture.completedFuture(null); + } MessageId actualMessageId = null; try { actualMessageId = MessageId.fromByteArray(messageId); diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java index 5ab8d85..3bd563e 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java @@ -21,6 +21,7 @@ package org.apache.pulsar.functions.instance; import lombok.AccessLevel; import lombok.Getter; import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.functions.api.Function; @@ -51,13 +52,13 @@ public class JavaInstance implements AutoCloseable { // TODO: cache logger instances by functions? Logger instanceLog = LoggerFactory.getLogger("function-" + config.getFunctionDetails().getName()); + Consumer consumer = null; if (source instanceof PulsarSource) { - this.context = new ContextImpl(config, instanceLog, pulsarClient, clsLoader, - ((PulsarSource) source).getInputConsumer()); - } else { - this.context = null; + consumer = ((PulsarSource) source).getInputConsumer(); } + this.context = new ContextImpl(config, instanceLog, pulsarClient, clsLoader, consumer); + // create the functions if (userClassObject instanceof Function) { this.function = (Function) userClassObject;