merlimat closed pull request #1533: [functions] Refactor JavaInstanceRunnable
URL: https://github.com/apache/incubator-pulsar/pull/1533
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InputMessage.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InputMessage.java
new file mode 100644
index 0000000000..bd7e788836
--- /dev/null
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InputMessage.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.instance;
+
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.functions.api.SerDe;
+
+@Getter
+@Setter
+@ToString
+public class InputMessage {
+
+    private Message actualMessage;
+    String topicName;
+    SerDe inputSerDe;
+    Consumer consumer;
+
+    public int getTopicPartition() {
+        MessageIdImpl msgId = (MessageIdImpl) actualMessage.getMessageId();
+        return msgId.getPartitionIndex();
+    }
+
+    public void ack() {
+        if (null != consumer) {
+            consumer.acknowledgeAsync(actualMessage);
+        }
+    }
+
+    public void ackCumulative() {
+        if (null != consumer) {
+            consumer.acknowledgeCumulativeAsync(actualMessage);
+        }
+    }
+
+}
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index 510608f8e4..008d5a9329 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -22,17 +22,17 @@
 import static org.apache.bookkeeper.common.concurrent.FutureUtils.result;
 import static 
org.apache.bookkeeper.stream.protocol.ProtocolConstants.DEFAULT_STREAM_CONF;
 
-import com.google.common.collect.Maps;
 import io.netty.buffer.ByteBuf;
 
-import java.util.*;
+import java.util.Arrays;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.concurrent.LinkedBlockingDeque;
-import java.util.concurrent.TimeUnit;
 
 import lombok.AccessLevel;
 import lombok.Getter;
-import lombok.Setter;
-import lombok.ToString;
 import lombok.extern.slf4j.Slf4j;
 import net.jodah.typetools.TypeResolver;
 import org.apache.bookkeeper.api.StorageClient;
@@ -48,46 +48,35 @@
 import org.apache.logging.log4j.core.LoggerContext;
 import org.apache.logging.log4j.core.config.Configuration;
 import org.apache.logging.log4j.core.config.LoggerConfig;
-import org.apache.pulsar.client.api.*;
-import 
org.apache.pulsar.client.api.PulsarClientException.ProducerBusyException;
-import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.client.api.MessageBuilder;
+import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.functions.api.Function;
 import org.apache.pulsar.functions.api.utils.DefaultSerDe;
-import org.apache.pulsar.functions.proto.Function.FunctionConfig;
-import 
org.apache.pulsar.functions.proto.Function.FunctionConfig.ProcessingGuarantees;
+import org.apache.pulsar.functions.instance.processors.MessageProcessor;
 import org.apache.pulsar.functions.proto.InstanceCommunication;
 import org.apache.pulsar.functions.utils.functioncache.FunctionCacheManager;
 import org.apache.pulsar.functions.api.SerDe;
-import 
org.apache.pulsar.functions.instance.producers.MultiConsumersOneOuputTopicProducers;
-import org.apache.pulsar.functions.instance.producers.Producers;
-import 
org.apache.pulsar.functions.instance.producers.SimpleOneOuputTopicProducers;
 import org.apache.pulsar.functions.instance.state.StateContextImpl;
 import org.apache.pulsar.functions.utils.FunctionConfigUtils;
 import org.apache.pulsar.functions.utils.Reflections;
-import org.apache.pulsar.functions.utils.Utils;
 
 /**
  * A function container implemented using java thread.
  */
 @Slf4j
-public class JavaInstanceRunnable implements AutoCloseable, Runnable, 
ConsumerEventListener {
+public class JavaInstanceRunnable implements AutoCloseable, Runnable {
 
     // The class loader that used for loading functions
     private ClassLoader fnClassLoader;
     private final InstanceConfig instanceConfig;
-    private final FunctionConfig.ProcessingGuarantees processingGuarantees;
     private final FunctionCacheManager fnCache;
     private final LinkedBlockingDeque<InputMessage> queue;
     private final String jarFile;
 
     // input topic consumer & output topic producer
     private final PulsarClientImpl client;
-    @Getter(AccessLevel.PACKAGE)
-    private Producers outputProducer;
-    @Getter(AccessLevel.PACKAGE)
-    private final Map<String, Consumer> inputConsumers;
-    private LinkedList<String> inputTopicsToResubscribe = null;
+
     private LogAppender logAppender;
 
     // provide tables for storing states
@@ -106,20 +95,9 @@
     private Map<String, SerDe> inputSerDe;
     private SerDe outputSerDe;
 
-    @Getter
-    @Setter
-    @ToString
-    private class InputMessage {
-        private Message actualMessage;
-        String topicName;
-        SerDe inputSerDe;
-        Consumer consumer;
-
-        public int getTopicPartition() {
-            MessageIdImpl msgId = (MessageIdImpl) actualMessage.getMessageId();
-            return msgId.getPartitionIndex();
-        }
-    }
+    @Getter(AccessLevel.PACKAGE)
+    // processor
+    private final MessageProcessor processor;
 
     // function stats
     private final FunctionStats stats;
@@ -130,27 +108,16 @@ public JavaInstanceRunnable(InstanceConfig instanceConfig,
                                 PulsarClient pulsarClient,
                                 String stateStorageServiceUrl) {
         this.instanceConfig = instanceConfig;
-        this.processingGuarantees = 
instanceConfig.getFunctionConfig().getProcessingGuarantees();
         this.fnCache = fnCache;
         this.queue = new 
LinkedBlockingDeque<>(instanceConfig.getMaxBufferedTuples());
         this.jarFile = jarFile;
         this.client = (PulsarClientImpl) pulsarClient;
         this.stateStorageServiceUrl = stateStorageServiceUrl;
         this.stats = new FunctionStats();
-        this.inputConsumers = Maps.newConcurrentMap();
-    }
-
-    private SubscriptionType getSubscriptionType() {
-        if (processingGuarantees == ProcessingGuarantees.EFFECTIVELY_ONCE) {
-            return SubscriptionType.Failover;
-        } else {
-            if (instanceConfig.getFunctionConfig().getSubscriptionType() == 
null
-                || instanceConfig.getFunctionConfig().getSubscriptionType() == 
FunctionConfig.SubscriptionType.SHARED) {
-                return SubscriptionType.Shared;
-            } else {
-                return SubscriptionType.Failover;
-            }
-        }
+        this.processor = MessageProcessor.create(
+            client,
+            instanceConfig.getFunctionConfig(),
+            queue);
     }
 
     /**
@@ -189,13 +156,13 @@ JavaInstance setupJavaInstance() throws Exception {
         // start the state table
         setupStateTable();
         // start the output producer
-        startOutputProducer();
+        processor.setupOutput(outputSerDe);
         // start the input consumer
-        startInputConsumer();
+        processor.setupInput(inputSerDe);
         // start any log topic handler
         setupLogHandler();
 
-        return new JavaInstance(instanceConfig, object, clsLoader, client, 
inputConsumers);
+        return new JavaInstance(instanceConfig, object, clsLoader, client, 
processor.getInputConsumers());
     }
 
     /**
@@ -206,9 +173,7 @@ public void run() {
         try {
             javaInstance = setupJavaInstance();
             while (running) {
-                // some src topics might be put into resubscribe list because 
of processing failure
-                // so this is the chance to resubscribe to those topics.
-                resubscribeTopicsIfNeeded();
+                processor.prepareDequeueMessageFromProcessQueue();
 
                 JavaExecutionResult result;
                 InputMessage msg;
@@ -221,31 +186,9 @@ public void run() {
                     break;
                 }
 
-                if (ProcessingGuarantees.EFFECTIVELY_ONCE == 
processingGuarantees) {
-                    // if the messages are received from old consumers, we 
discard it since new consumer was
-                    // re-created for the correctness of effectively-once
-                    if (msg.getConsumer() != 
inputConsumers.get(msg.getTopicName())) {
-                        continue;
-                    }
-                }
-
-                if (null != outputProducer) {
-                    // before processing the message, we have a producer 
connection setup for producing results.
-                    Producer producer = null;
-                    while (null == producer) {
-                        try {
-                            producer = 
outputProducer.getProducer(msg.getTopicName(), msg.getTopicPartition());
-                        } catch (PulsarClientException e) {
-                            // `ProducerBusy` is thrown when an producer with 
same name is still connected.
-                            // This can happen when a active consumer is 
changed for a given input topic partition
-                            // so we need to wait until the old active 
consumer release the produce connection.
-                            if (!(e instanceof ProducerBusyException)) {
-                                log.error("Failed to get a producer for 
producing results computed from input topic {}",
-                                    msg.getTopicName());
-                            }
-                            TimeUnit.MILLISECONDS.sleep(500);
-                        }
-                    }
+                if (!processor.prepareProcessMessage(msg)) {
+                    // the message can't be processed by the processor at this 
moment, retry it.
+                    continue;
                 }
 
                 // state object is per function, because we need to have the 
ability to know what updates
@@ -318,30 +261,6 @@ private void loadJars() throws Exception {
         Thread.currentThread().setContextClassLoader(fnClassLoader);
     }
 
-    @Override
-    public void becameActive(Consumer consumer, int partitionId) {
-        // if the instance becomes active for a given topic partition,
-        // open a producer for the results computed from this topic partition.
-        if (null != outputProducer) {
-            try {
-                this.outputProducer.getProducer(consumer.getTopic(), 
partitionId);
-            } catch (PulsarClientException e) {
-                // this can be ignored, because producer can be lazily created 
when accessing it.
-                log.warn("Fail to create a producer for results computed from 
messages of topic: {}, partition: {}",
-                    consumer.getTopic(), partitionId);
-            }
-        }
-    }
-
-    @Override
-    public void becameInactive(Consumer consumer, int partitionId) {
-        if (null != outputProducer) {
-            // if I lost the ownership of a partition, close its corresponding 
topic partition.
-            // this is to allow the new active consumer be able to produce to 
the result topic.
-            this.outputProducer.closeProducer(consumer.getTopic(), 
partitionId);
-        }
-    }
-
     private void setupStateTable() throws Exception {
         if (null == stateStorageServiceUrl) {
             return;
@@ -384,226 +303,48 @@ private void setupStateTable() throws Exception {
         this.stateTable = result(storageClient.openTable(tableName));
     }
 
-    private void startOutputProducer() throws Exception {
-        if (instanceConfig.getFunctionConfig().getOutput() != null
-                && !instanceConfig.getFunctionConfig().getOutput().isEmpty()
-                && this.outputSerDe != null) {
-            log.info("Starting producer for output topic " + 
instanceConfig.getFunctionConfig().getOutput());
-
-            if (processingGuarantees == ProcessingGuarantees.EFFECTIVELY_ONCE) 
{
-                this.outputProducer = new MultiConsumersOneOuputTopicProducers(
-                    client, instanceConfig.getFunctionConfig().getOutput());
-            } else {
-                this.outputProducer = new SimpleOneOuputTopicProducers(
-                    client, instanceConfig.getFunctionConfig().getOutput());
-            }
-            this.outputProducer.initialize();
-        }
-    }
-
-    private void startInputConsumer() throws Exception {
-        log.info("Consumer map {}", instanceConfig.getFunctionConfig());
-        for (Map.Entry<String, String> entry : 
instanceConfig.getFunctionConfig().getCustomSerdeInputsMap().entrySet()) {
-            ConsumerConfiguration conf = 
createConsumerConfiguration(entry.getKey());
-            this.inputConsumers.put(entry.getKey(), 
client.subscribe(entry.getKey(),
-                    
FunctionConfigUtils.getFullyQualifiedName(instanceConfig.getFunctionConfig()), 
conf));
-        }
-        for (String topicName : 
instanceConfig.getFunctionConfig().getInputsList()) {
-            ConsumerConfiguration conf = 
createConsumerConfiguration(topicName);
-            this.inputConsumers.put(topicName, client.subscribe(topicName,
-                    
FunctionConfigUtils.getFullyQualifiedName(instanceConfig.getFunctionConfig()), 
conf));
-        }
-    }
-
-    private ConsumerConfiguration createConsumerConfiguration(String 
topicName) {
-        log.info("Starting Consumer for topic " + topicName);
-        ConsumerConfiguration conf = new ConsumerConfiguration();
-        conf.setSubscriptionType(getSubscriptionType());
-
-        SerDe inputSerde = inputSerDe.get(topicName);
-        conf.setMessageListener((consumer, msg) -> {
-            try {
-                InputMessage message = new InputMessage();
-                message.setConsumer(consumer);
-                message.setInputSerDe(inputSerde);
-                message.setActualMessage(msg);
-                message.setTopicName(topicName);
-                queue.put(message);
-                if (processingGuarantees == 
FunctionConfig.ProcessingGuarantees.ATMOST_ONCE) {
-                    if (instanceConfig.getFunctionConfig().getAutoAck()) {
-                        consumer.acknowledgeAsync(msg);
-                    }
-                }
-            } catch (InterruptedException e) {
-                log.error("Function container {} is interrupted on enqueuing 
messages",
-                        Thread.currentThread().getId(), e);
-            }
-        });
-
-        // for failover subscription, register a consumer event listener to 
react to active consumer changes.
-        if (getSubscriptionType() == SubscriptionType.Failover) {
-            conf.setConsumerEventListener(this);
-        }
-
-        return conf;
-    }
-
     private void processResult(InputMessage msg,
                                JavaExecutionResult result,
                                long startTime, long endTime) {
          if (result.getUserException() != null) {
             log.info("Encountered user exception when processing message {}", 
msg, result.getUserException());
             stats.incrementUserExceptions(result.getUserException());
-            handleProcessException(msg.getTopicName());
+            processor.handleProcessException(msg, result.getUserException());
         } else if (result.getSystemException() != null) {
             log.info("Encountered system exception when processing message 
{}", msg, result.getSystemException());
             stats.incrementSystemExceptions(result.getSystemException());
-            handleProcessException(msg.getTopicName());
+            processor.handleProcessException(msg, result.getSystemException());
         } else {
             stats.incrementSuccessfullyProcessed(endTime - startTime);
-            if (result.getResult() != null && outputProducer != null) {
+            if (result.getResult() != null && 
instanceConfig.getFunctionConfig().getOutput() != null) {
                 byte[] output;
                 try {
                     output = outputSerDe.serialize(result.getResult());
                 } catch (Exception ex) {
                     stats.incrementSerializationExceptions();
-                    handleProcessException(msg.getTopicName());
+                    processor.handleProcessException(msg, ex);
                     return;
                 }
                 if (output != null) {
-                    try {
-                        sendOutputMessage(msg, result, output);
-                    } catch (Throwable t) {
-                        log.error("Encountered error when sending result {} 
computed from src message {}",
-                            result, msg, t);
-                        handleProcessException(msg.getTopicName());
-                    }
-                } else if (processingGuarantees != 
ProcessingGuarantees.ATMOST_ONCE) {
-                    ackMessage(msg);
+                    sendOutputMessage(msg, output);
+                } else {
+                    processor.sendOutputMessage(msg, null);
                 }
-            } else if (processingGuarantees != 
ProcessingGuarantees.ATMOST_ONCE) {
-                ackMessage(msg);
-            }
-        }
-    }
-
-    private void ackMessage(InputMessage msg) {
-        if (instanceConfig.getFunctionConfig().getAutoAck()) {
-            if (ProcessingGuarantees.EFFECTIVELY_ONCE == processingGuarantees) 
{
-                
msg.getConsumer().acknowledgeCumulativeAsync(msg.getActualMessage());
             } else {
-                msg.getConsumer().acknowledgeAsync(msg.getActualMessage());
+                // the function doesn't produce any result or the user doesn't 
want the result.
+                processor.sendOutputMessage(msg, null);
             }
         }
     }
 
     private void sendOutputMessage(InputMessage srcMsg,
-                                   JavaExecutionResult result,
                                    byte[] output) {
         MessageBuilder msgBuilder = MessageBuilder.create()
             .setContent(output)
             .setProperty("__pfn_input_topic__", srcMsg.getTopicName())
             .setProperty("__pfn_input_msg_id__", new 
String(Base64.getEncoder().encode(srcMsg.getActualMessage().getMessageId().toByteArray())));
-        if (processingGuarantees == ProcessingGuarantees.EFFECTIVELY_ONCE) {
-            msgBuilder = msgBuilder
-                
.setSequenceId(Utils.getSequenceId(srcMsg.getActualMessage().getMessageId()));
-        }
-        Producer producer;
-        try {
-            producer = outputProducer.getProducer(srcMsg.getTopicName(), 
srcMsg.getTopicPartition());
-        } catch (PulsarClientException e) {
-            log.error("Failed to get a producer for producing results computed 
from input topic {}",
-                srcMsg.getTopicName());
-
-            // if we fail to get a producer, put this message back to queue 
and reprocess it.
-            queue.offerFirst(srcMsg);
-            return;
-        }
-
-        Message destMsg = msgBuilder.build();
-        producer.sendAsync(destMsg)
-            .thenAccept(messageId -> {
-                if (processingGuarantees != ProcessingGuarantees.ATMOST_ONCE) {
-                    ackMessage(srcMsg);
-                }
-            })
-            .exceptionally(cause -> {
-                log.error("Failed to send the process result {} of message {} 
to output topic {}",
-                    result, srcMsg, 
instanceConfig.getFunctionConfig().getOutput(), cause);
-                handleProcessException(srcMsg.getTopicName());
-                return null;
-            });
-    }
 
-    private void handleProcessException(String srcTopic) {
-        // if the src message is coming from a shared subscription,
-        // we don't need any special logic on handling failures, just don't 
ack.
-        // the message will be redelivered to other consumer.
-        //
-        // BUT, if the src message is coming from a failover subscription,
-        // we need to stop processing messages and recreate consumer to 
reprocess
-        // the message. otherwise we might break the correctness of 
effectively-once
-        if (getSubscriptionType() != SubscriptionType.Shared) {
-            // in this case (effectively-once), we need to close the consumer
-            // release the partition and open the consumer again. so we 
guarantee
-            // that we always process messages in order
-            //
-            // but this is in pulsar's callback, so add this to a retry list. 
so we can
-            // retry on java instance's main thread.
-            addTopicToResubscribeList(srcTopic);
-        }
-    }
-
-    private synchronized void addTopicToResubscribeList(String topicName) {
-        if (null == inputTopicsToResubscribe) {
-            inputTopicsToResubscribe = new LinkedList<>();
-        }
-        inputTopicsToResubscribe.add(topicName);
-    }
-
-    private void resubscribeTopicsIfNeeded() {
-        List<String> topicsToResubscribe;
-        synchronized (this) {
-            topicsToResubscribe = inputTopicsToResubscribe;
-            inputTopicsToResubscribe = null;
-        }
-        if (null != topicsToResubscribe) {
-            for (String topic : topicsToResubscribe) {
-                resubscribe(topic);
-            }
-        }
-    }
-
-    private void resubscribe(String srcTopic) {
-        // if we can not produce a message to output topic, then close the 
consumer of the src topic
-        // and retry to instantiate a consumer again.
-        Consumer consumer = inputConsumers.remove(srcTopic);
-        if (consumer != null) {
-            // TODO (sijie): currently we have to close the entire consumer 
for a given topic. However
-            //               ideally we should do this in a finer granularity 
- we can close consumer
-            //               on a given partition, without impact other 
partitions.
-            try {
-                consumer.close();
-            } catch (PulsarClientException e) {
-                log.error("Failed to close consumer for input topic {} when 
handling produce exceptions",
-                    srcTopic, e);
-            }
-        }
-        // subscribe to the src topic again
-        ConsumerConfiguration conf = createConsumerConfiguration(srcTopic);
-        try {
-            inputConsumers.put(
-                srcTopic,
-                client.subscribe(
-                    srcTopic,
-                    
FunctionConfigUtils.getFullyQualifiedName(instanceConfig.getFunctionConfig()),
-                    conf
-                ));
-        } catch (PulsarClientException e) {
-            log.error("Failed to resubscribe to input topic {}. Added it to 
retry list and retry it later",
-                srcTopic, e);
-            addTopicToResubscribeList(srcTopic);
-        }
+        processor.sendOutputMessage(srcMsg, msgBuilder);
     }
 
     @Override
@@ -612,21 +353,8 @@ public void close() {
             return;
         }
         running = false;
-        // stop the consumer first, so no more messages are coming in
-        inputConsumers.forEach((k, v) -> {
-            try {
-                v.close();
-            } catch (PulsarClientException e) {
-                log.warn("Failed to close consumer to input topic {}", k, e);
-            }
-        });
-        inputConsumers.clear();
 
-        // kill the result producer
-        if (null != outputProducer) {
-            outputProducer.close();
-            outputProducer = null;
-        }
+        processor.close();
 
         // kill the state table
         if (null != stateTable) {
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/AtLeastOnceProcessor.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/AtLeastOnceProcessor.java
new file mode 100644
index 0000000000..740fd5c732
--- /dev/null
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/AtLeastOnceProcessor.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.instance.processors;
+
+import java.util.concurrent.LinkedBlockingDeque;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageBuilder;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.functions.instance.InputMessage;
+import 
org.apache.pulsar.functions.instance.producers.AbstractOneOuputTopicProducers;
+import org.apache.pulsar.functions.proto.Function.FunctionConfig;
+
+/**
+ * A message processor that processes messages at-least-once.
+ */
+@Slf4j
+public class AtLeastOnceProcessor extends MessageProcessorBase {
+
+    @Getter
+    private Producer producer;
+
+    AtLeastOnceProcessor(PulsarClient client,
+                         FunctionConfig functionConfig,
+                         SubscriptionType subType,
+                         LinkedBlockingDeque<InputMessage> processQueue) {
+        super(client, functionConfig, subType, processQueue);
+    }
+
+    @Override
+    protected void initializeOutputProducer(String outputTopic) throws 
Exception {
+        producer = AbstractOneOuputTopicProducers.createProducer(client, 
outputTopic);
+    }
+
+    @Override
+    public void sendOutputMessage(InputMessage inputMsg, MessageBuilder 
outputMsgBuilder) {
+        if (null == outputMsgBuilder) {
+            inputMsg.ack();
+            return;
+        }
+
+        Message outputMsg = outputMsgBuilder.build();
+        producer.sendAsync(outputMsg)
+            .thenAccept(msgId -> inputMsg.ack());
+    }
+
+    @Override
+    public void close() {
+        super.close();
+        if (null != producer) {
+            try {
+                producer.close();
+            } catch (PulsarClientException e) {
+                log.warn("Fail to close producer for processor {}", 
functionConfig.getOutput(), e);
+            }
+        }
+    }
+}
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/AtMostOnceProcessor.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/AtMostOnceProcessor.java
new file mode 100644
index 0000000000..1713b73479
--- /dev/null
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/AtMostOnceProcessor.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.instance.processors;
+
+import java.util.concurrent.LinkedBlockingDeque;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageBuilder;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.functions.instance.InputMessage;
+import 
org.apache.pulsar.functions.instance.producers.AbstractOneOuputTopicProducers;
+import org.apache.pulsar.functions.proto.Function.FunctionConfig;
+
+/**
+ * A message processor that processes messages at-most-once.
+ */
+@Slf4j
+class AtMostOnceProcessor extends MessageProcessorBase {
+
+    private Producer producer;
+
+    AtMostOnceProcessor(PulsarClient client,
+                        FunctionConfig functionConfig,
+                        SubscriptionType subType,
+                        LinkedBlockingDeque<InputMessage> processQueue) {
+        super(client, functionConfig, subType, processQueue);
+    }
+
+    @Override
+    protected void postReceiveMessage(InputMessage message) {
+        super.postReceiveMessage(message);
+        if (functionConfig.getAutoAck()) {
+            message.ack();
+        }
+    }
+
+    @Override
+    protected void initializeOutputProducer(String outputTopic) throws 
Exception {
+        producer = AbstractOneOuputTopicProducers.createProducer(client, 
outputTopic);
+    }
+
+    @Override
+    public void sendOutputMessage(InputMessage inputMsg, MessageBuilder 
outputMsgBuilder) {
+        if (null == outputMsgBuilder) {
+            return;
+        }
+
+        Message outputMsg = outputMsgBuilder.build();
+        producer.sendAsync(outputMsg);
+    }
+
+    @Override
+    public void close() {
+        super.close();
+        if (null != producer) {
+            try {
+                producer.close();
+            } catch (PulsarClientException e) {
+                log.warn("Fail to close producer for processor {}", 
functionConfig.getOutput(), e);
+            }
+        }
+    }
+}
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/EffectivelyOnceProcessor.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/EffectivelyOnceProcessor.java
new file mode 100644
index 0000000000..f039cdfa64
--- /dev/null
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/EffectivelyOnceProcessor.java
@@ -0,0 +1,273 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.instance.processors;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
+import lombok.AccessLevel;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerConfiguration;
+import org.apache.pulsar.client.api.ConsumerEventListener;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageBuilder;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import 
org.apache.pulsar.client.api.PulsarClientException.ProducerBusyException;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.functions.instance.InputMessage;
+import 
org.apache.pulsar.functions.instance.producers.MultiConsumersOneOuputTopicProducers;
+import org.apache.pulsar.functions.instance.producers.Producers;
+import org.apache.pulsar.functions.proto.Function.FunctionConfig;
+import org.apache.pulsar.functions.utils.FunctionConfigUtils;
+import org.apache.pulsar.functions.utils.Utils;
+
+/**
+ * A message processor that processes messages effectively-once.
+ *
+ * <p>The processor always consumes with a `Failover` subscription and registers itself as the
+ * {@link ConsumerEventListener} of its consumers. Output messages carry a sequence id derived from the
+ * input message id, and an input message is acknowledged cumulatively once its result has been sent.
+ */
+@Slf4j
+class EffectivelyOnceProcessor extends MessageProcessorBase implements ConsumerEventListener {
+
+    // how long to wait before trying again to get the output producer in `prepareProcessMessage`.
+    private static final long GET_PRODUCER_RETRY_INTERVAL_MS = 500;
+
+    // topics whose consumer has to be closed and re-created. it is null when there is nothing to do.
+    // all accesses are synchronized on `this`.
+    private LinkedList<String> inputTopicsToResubscribe = null;
+
+    @Getter(AccessLevel.PACKAGE)
+    protected Producers outputProducer;
+
+    EffectivelyOnceProcessor(PulsarClient client,
+                             FunctionConfig functionConfig,
+                             LinkedBlockingDeque<InputMessage> processQueue) {
+        super(client, functionConfig, SubscriptionType.Failover, processQueue);
+    }
+
+    /**
+     * An effectively-once processor can only use `Failover` subscription.
+     */
+    @Override
+    protected SubscriptionType getSubscriptionType() {
+        return SubscriptionType.Failover;
+    }
+
+    @Override
+    protected ConsumerConfiguration createConsumerConfiguration(String topicName) {
+        ConsumerConfiguration conf = super.createConsumerConfiguration(topicName);
+        // for effectively-once processor, register a consumer event listener to react to active consumer changes.
+        conf.setConsumerEventListener(this);
+        return conf;
+    }
+
+    @Override
+    public void becameActive(Consumer<?> consumer, int partitionId) {
+        // if the instance becomes active for a given topic partition,
+        // open a producer for the results computed from this topic partition.
+        if (null != outputProducer) {
+            try {
+                this.outputProducer.getProducer(consumer.getTopic(), partitionId);
+            } catch (PulsarClientException e) {
+                // this can be ignored, because producer can be lazily created when accessing it.
+                // the exception is still logged so the reason of the failure is not lost.
+                log.warn("Fail to create a producer for results computed from messages of topic: {}, partition: {}",
+                    consumer.getTopic(), partitionId, e);
+            }
+        }
+    }
+
+    @Override
+    public void becameInactive(Consumer<?> consumer, int partitionId) {
+        if (null != outputProducer) {
+            // if I lost the ownership of a partition, close the producer of the corresponding topic partition.
+            // this is to allow the new active consumer be able to produce to the result topic.
+            this.outputProducer.closeProducer(consumer.getTopic(), partitionId);
+        }
+    }
+
+    @Override
+    protected void initializeOutputProducer(String outputTopic) throws Exception {
+        outputProducer = new MultiConsumersOneOuputTopicProducers(client, outputTopic);
+        outputProducer.initialize();
+    }
+
+    //
+    // Methods to process messages
+    //
+
+    @Override
+    public void prepareDequeueMessageFromProcessQueue() {
+        super.prepareDequeueMessageFromProcessQueue();
+        // some src topics might be put into resubscribe list because of processing failure
+        // so this is the chance to resubscribe to those topics.
+        resubscribeTopicsIfNeeded();
+    }
+
+    /**
+     * Check that the message comes from the current consumer of its topic and make sure the producer for its
+     * results can be obtained, retrying until it is available.
+     *
+     * @param msg input message.
+     * @return false if the base class rejects the message or the message was received by a consumer that has
+     *         been replaced since, otherwise true.
+     * @throws InterruptedException when interrupted while waiting to retry getting the producer.
+     */
+    @Override
+    public boolean prepareProcessMessage(InputMessage msg) throws InterruptedException {
+        if (!super.prepareProcessMessage(msg)) {
+            return false;
+        }
+
+        // if the messages are received from old consumers, we discard it since new consumer was
+        // re-created for the correctness of effectively-once
+        if (msg.getConsumer() != inputConsumers.get(msg.getTopicName())) {
+            return false;
+        }
+
+        if (null != outputProducer) {
+            // before processing the message, we have a producer connection setup for producing results.
+            Producer producer = null;
+            while (null == producer) {
+                try {
+                    producer = outputProducer.getProducer(msg.getTopicName(), msg.getTopicPartition());
+                } catch (PulsarClientException e) {
+                    // `ProducerBusy` is thrown when an producer with same name is still connected.
+                    // This can happen when a active consumer is changed for a given input topic partition
+                    // so we need to wait until the old active consumer release the produce connection.
+                    if (!(e instanceof ProducerBusyException)) {
+                        log.error("Failed to get a producer for producing results computed from input topic {}",
+                            msg.getTopicName(), e);
+                    }
+                    TimeUnit.MILLISECONDS.sleep(GET_PRODUCER_RETRY_INTERVAL_MS);
+                }
+            }
+        }
+        return true;
+    }
+
+    /**
+     * Send the result with a sequence id derived from the input message id, and acknowledge the input
+     * message cumulatively once the send succeeds.
+     *
+     * <p>If there is no result to send, the input message is acknowledged cumulatively right away. If the
+     * producer can not be obtained, the input message is put back to the head of the process queue. If the
+     * send fails, the input topic is scheduled to be resubscribed.
+     *
+     * @param inputMsg input message
+     * @param outputMsgBuilder output message builder. it can be null.
+     */
+    @Override
+    public void sendOutputMessage(InputMessage inputMsg,
+                                  MessageBuilder outputMsgBuilder) {
+        if (null == outputMsgBuilder) {
+            inputMsg.ackCumulative();
+            return;
+        }
+
+        // assign sequence id to output message for idempotent producing
+        outputMsgBuilder = outputMsgBuilder
+            .setSequenceId(Utils.getSequenceId(inputMsg.getActualMessage().getMessageId()));
+
+        // NOTE(review): unlike the other methods, `outputProducer` is not null-checked here - confirm that
+        // callers only pass a non-null builder when the output has been set up.
+        Producer producer;
+        try {
+            producer = outputProducer.getProducer(inputMsg.getTopicName(), inputMsg.getTopicPartition());
+        } catch (PulsarClientException e) {
+            log.error("Failed to get a producer for producing results computed from input topic {}",
+                inputMsg.getTopicName(), e);
+
+            // if we fail to get a producer, put this message back to queue and reprocess it.
+            processQueue.offerFirst(inputMsg);
+            return;
+        }
+
+        Message outputMsg = outputMsgBuilder.build();
+        producer.sendAsync(outputMsg)
+            .thenAccept(messageId -> inputMsg.ackCumulative())
+            .exceptionally(cause -> {
+                log.error("Failed to send the process result {} of message {} to output topic {}",
+                    outputMsg, inputMsg, functionConfig.getOutput(), cause);
+                handleProcessException(inputMsg.getTopicName());
+                return null;
+            });
+    }
+
+    @Override
+    public void handleProcessException(InputMessage msg, Exception cause) {
+        handleProcessException(msg.getTopicName());
+    }
+
+    private void handleProcessException(String srcTopic) {
+        // if the src message is coming from a shared subscription,
+        // we don't need any special logic on handling failures, just don't ack.
+        // the message will be redelivered to other consumer.
+        //
+        // BUT, if the src message is coming from a failover subscription,
+        // we need to stop processing messages and recreate consumer to reprocess
+        // the message. otherwise we might break the correctness of effectively-once
+        //
+        // in this case (effectively-once), we need to close the consumer
+        // release the partition and open the consumer again. so we guarantee
+        // that we always process messages in order
+        //
+        // but this is in pulsar's callback, so add this to a retry list. so we can
+        // retry on java instance's main thread.
+        addTopicToResubscribeList(srcTopic);
+    }
+
+    private synchronized void addTopicToResubscribeList(String topicName) {
+        if (null == inputTopicsToResubscribe) {
+            inputTopicsToResubscribe = new LinkedList<>();
+        }
+        // several failures of the same topic can be reported before the list is drained. one pending entry
+        // is enough, otherwise the consumer would be closed and re-created once per reported failure.
+        if (!inputTopicsToResubscribe.contains(topicName)) {
+            inputTopicsToResubscribe.add(topicName);
+        }
+    }
+
+    private void resubscribeTopicsIfNeeded() {
+        List<String> topicsToResubscribe;
+        // take the pending list and reset it under the lock, then resubscribe outside of the lock.
+        synchronized (this) {
+            topicsToResubscribe = inputTopicsToResubscribe;
+            inputTopicsToResubscribe = null;
+        }
+        if (null != topicsToResubscribe) {
+            for (String topic : topicsToResubscribe) {
+                resubscribe(topic);
+            }
+        }
+    }
+
+    private void resubscribe(String srcTopic) {
+        // if we can not produce a message to output topic, then close the consumer of the src topic
+        // and retry to instantiate a consumer again.
+        Consumer consumer = inputConsumers.remove(srcTopic);
+        if (consumer != null) {
+            // TODO (sijie): currently we have to close the entire consumer for a given topic. However
+            //               ideally we should do this in a finer granularity - we can close consumer
+            //               on a given partition, without impact other partitions.
+            try {
+                consumer.close();
+            } catch (PulsarClientException e) {
+                log.error("Failed to close consumer for input topic {} when handling produce exceptions",
+                    srcTopic, e);
+            }
+        }
+        // subscribe to the src topic again
+        ConsumerConfiguration conf = createConsumerConfiguration(srcTopic);
+        try {
+            inputConsumers.put(
+                srcTopic,
+                client.subscribe(
+                    srcTopic,
+                    FunctionConfigUtils.getFullyQualifiedName(functionConfig),
+                    conf
+                ));
+        } catch (PulsarClientException e) {
+            log.error("Failed to resubscribe to input topic {}. Added it to retry list and retry it later",
+                srcTopic, e);
+            addTopicToResubscribeList(srcTopic);
+        }
+    }
+
+    @Override
+    public void close() {
+        super.close();
+        // kill the result producer
+        if (null != outputProducer) {
+            outputProducer.close();
+            outputProducer = null;
+        }
+    }
+}
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/MessageProcessor.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/MessageProcessor.java
new file mode 100644
index 0000000000..b475e9f0d6
--- /dev/null
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/MessageProcessor.java
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.instance.processors;
+
+import java.util.Map;
+import java.util.concurrent.LinkedBlockingDeque;
+import org.apache.bookkeeper.common.annotation.InterfaceStability.Evolving;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.MessageBuilder;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.functions.api.SerDe;
+import org.apache.pulsar.functions.instance.InputMessage;
+import org.apache.pulsar.functions.proto.Function.FunctionConfig;
+import 
org.apache.pulsar.functions.proto.Function.FunctionConfig.ProcessingGuarantees;
+
+/**
+ * The contract between {@link org.apache.pulsar.functions.instance.JavaInstanceRunnable} and the component
+ * that receives input messages and delivers the results computed from them.
+ */
+@Evolving
+public interface MessageProcessor extends AutoCloseable {
+
+    /**
+     * Build the processor that matches the processing guarantees of the given function.
+     *
+     * @param client pulsar client handed to the created processor.
+     * @param functionConfig function config providing the subscription type and the processing guarantees.
+     * @param processQeueue queue handed to the created processor.
+     * @return an effectively-once, at-most-once or (in any other case) at-least-once processor.
+     */
+    static MessageProcessor create(PulsarClient client,
+                                   FunctionConfig functionConfig,
+                                   LinkedBlockingDeque<InputMessage> processQeueue) {
+        final FunctionConfig.SubscriptionType configuredSubType = functionConfig.getSubscriptionType();
+        final ProcessingGuarantees guarantees = functionConfig.getProcessingGuarantees();
+
+        // an unset or SHARED subscription type maps to `Shared`, anything else maps to `Failover`.
+        final boolean useShared =
+            null == configuredSubType || FunctionConfig.SubscriptionType.SHARED == configuredSubType;
+        final SubscriptionType subType = useShared ? SubscriptionType.Shared : SubscriptionType.Failover;
+
+        if (ProcessingGuarantees.EFFECTIVELY_ONCE == guarantees) {
+            // the effectively-once processor does not take a subscription type.
+            return new EffectivelyOnceProcessor(client, functionConfig, processQeueue);
+        }
+        if (ProcessingGuarantees.ATMOST_ONCE == guarantees) {
+            return new AtMostOnceProcessor(client, functionConfig, subType, processQeueue);
+        }
+        return new AtLeastOnceProcessor(client, functionConfig, subType, processQeueue);
+    }
+
+    /**
+     * Set up the input side of the processor. The implementation is in charge of subscribing to the input
+     * and of handing every received message over to the <i>processQueue</i> it was created with.
+     *
+     * @param inputSerDe SerDe to deserialize messages from input.
+     * @throws Exception when the input can not be set up.
+     */
+    void setupInput(Map<String, SerDe> inputSerDe)
+        throws Exception;
+
+    /**
+     * Get the consumers of the input.
+     *
+     * @return the map of input consumers.
+     */
+    Map<String, Consumer> getInputConsumers();
+
+    /**
+     * Set up the output side of the processor, using the provided <i>outputSerDe</i>.
+     *
+     * @param outputSerDe output serde.
+     * @throws Exception when the output can not be set up.
+     */
+    void setupOutput(SerDe outputSerDe) throws Exception;
+
+
+    //
+    // Hooks invoked while processing messages
+    //
+
+    /**
+     * Hook invoked before a message is taken from the process queue.
+     *
+     * <p>It gives the implementation the opportunity to run actions, for example failure recovery, before
+     * the next message is taken out of the process queue.
+     */
+    void prepareDequeueMessageFromProcessQueue();
+
+    /**
+     * Hook invoked before the input message <i>msg</i> is processed.
+     *
+     * <p>It gives the implementation the opportunity to run actions such as making sure the producer for
+     * the results is ready.
+     *
+     * @param msg input message.
+     * @return true if the msg can be processed, otherwise false. A message that can't be processed at this
+     *          moment will be put back to the process queue and processed later.
+     * @throws InterruptedException when the processor is interrupted while preparing to process the message.
+     */
+    boolean prepareProcessMessage(InputMessage msg) throws InterruptedException;
+
+    /**
+     * Deliver the output message computed from <i>inputMsg</i> to the output topic.
+     *
+     * <p>A null <i>outputMsgBuilder</i> means there is nothing to deliver. Whether the input message is
+     * acknowledged in that case is up to the implementation and its processing guarantees.
+     *
+     * @param inputMsg input message
+     * @param outputMsgBuilder output message builder. it can be null.
+     */
+    void sendOutputMessage(InputMessage inputMsg,
+                           MessageBuilder outputMsgBuilder);
+
+    /**
+     * React to an exception that was thrown while processing <i>inputMsg</i>.
+     *
+     * @param inputMsg input message
+     * @param exception exception thrown when processing input message.
+     */
+    void handleProcessException(InputMessage inputMsg, Exception exception);
+
+
+    /**
+     * Close the processor. Unlike {@link AutoCloseable#close()}, it declares no checked exception.
+     */
+    @Override
+    void close();
+
+}
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/MessageProcessorBase.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/MessageProcessorBase.java
new file mode 100644
index 0000000000..ddb5f79437
--- /dev/null
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/MessageProcessorBase.java
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.instance.processors;
+
+import com.google.common.collect.Maps;
+import java.util.Map;
+import java.util.concurrent.LinkedBlockingDeque;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerConfiguration;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.functions.api.SerDe;
+import org.apache.pulsar.functions.instance.InputMessage;
+import org.apache.pulsar.functions.proto.Function.FunctionConfig;
+import org.apache.pulsar.functions.utils.FunctionConfigUtils;
+
+/**
+ * The base implementation of {@link MessageProcessor}.
+ *
+ * <p>It subscribes to the input topics, puts every received message into the process queue and closes the
+ * consumers on {@link #close()}. Subclasses provide the output producer and the delivery of the results.
+ */
+@Slf4j
+abstract class MessageProcessorBase implements MessageProcessor {
+
+    protected final PulsarClient client;
+    protected final FunctionConfig functionConfig;
+    protected final SubscriptionType subType;
+    protected final LinkedBlockingDeque<InputMessage> processQueue;
+
+    // input topic name -> consumer subscribed to that topic
+    @Getter
+    protected final Map<String, Consumer> inputConsumers;
+    protected Map<String, SerDe> inputSerDe;
+
+    protected SerDe outputSerDe;
+
+    protected MessageProcessorBase(PulsarClient client,
+                                   FunctionConfig functionConfig,
+                                   SubscriptionType subType,
+                                   LinkedBlockingDeque<InputMessage> processQueue) {
+        this.client = client;
+        this.functionConfig = functionConfig;
+        this.subType = subType;
+        this.processQueue = processQueue;
+        this.inputConsumers = Maps.newConcurrentMap();
+    }
+
+    //
+    // Input
+    //
+
+    /**
+     * Subscribe to every topic of the custom serde inputs map and of the inputs list of the function config.
+     *
+     * @param inputSerDe SerDe to deserialize messages from input, keyed by topic name.
+     * @throws Exception when subscribing to one of the topics fails.
+     */
+    @Override
+    public void setupInput(Map<String, SerDe> inputSerDe) throws Exception {
+        log.info("Setting up input with input serdes: {}", inputSerDe);
+        this.inputSerDe = inputSerDe;
+        for (String topicName : functionConfig.getCustomSerdeInputsMap().keySet()) {
+            subscribeToInput(topicName);
+        }
+        for (String topicName : functionConfig.getInputsList()) {
+            subscribeToInput(topicName);
+        }
+    }
+
+    // Subscribe to the given input topic and remember the consumer in `inputConsumers`.
+    private void subscribeToInput(String topicName) throws PulsarClientException {
+        ConsumerConfiguration conf = createConsumerConfiguration(topicName);
+        this.inputConsumers.put(topicName, client.subscribe(topicName,
+                FunctionConfigUtils.getFullyQualifiedName(functionConfig), conf));
+    }
+
+    protected SubscriptionType getSubscriptionType() {
+        return subType;
+    }
+
+    /**
+     * Create the configuration of the consumer of the given topic. Its message listener wraps every
+     * received message into an {@link InputMessage}, puts it into the process queue and then calls
+     * {@link #postReceiveMessage(InputMessage)}.
+     *
+     * @param topicName name of the input topic.
+     * @return the consumer configuration.
+     */
+    protected ConsumerConfiguration createConsumerConfiguration(String topicName) {
+        log.info("Starting Consumer for topic {}", topicName);
+        ConsumerConfiguration conf = new ConsumerConfiguration();
+        conf.setSubscriptionType(getSubscriptionType());
+
+        SerDe inputSerde = inputSerDe.get(topicName);
+        conf.setMessageListener((consumer, msg) -> {
+            try {
+                InputMessage message = new InputMessage();
+                message.setConsumer(consumer);
+                message.setInputSerDe(inputSerde);
+                message.setActualMessage(msg);
+                message.setTopicName(topicName);
+                processQueue.put(message);
+                postReceiveMessage(message);
+            } catch (InterruptedException e) {
+                log.error("Function container {} is interrupted on enqueuing messages",
+                        Thread.currentThread().getId(), e);
+                // catching the exception clears the interrupt flag of the thread. set it again so that
+                // the code calling this listener can still see that the thread was interrupted.
+                Thread.currentThread().interrupt();
+            }
+        });
+        return conf;
+    }
+
+    /**
+     * Method called when a message is received from input after being put into the process queue.
+     *
+     * <p>The processor implementation can make a decision to process the message based on its processing guarantees.
+     * for example, an at-most-once processor can ack the message immediately.
+     *
+     * @param message input message.
+     */
+    protected void postReceiveMessage(InputMessage message) {}
+
+    //
+    // Output
+    //
+
+    /**
+     * Remember the output serde and initialize the output producer. The producer is only initialized when
+     * the function config has a non-empty output topic and <i>outputSerDe</i> is not null.
+     *
+     * @param outputSerDe output serde.
+     * @throws Exception when the output producer can not be initialized.
+     */
+    @Override
+    public void setupOutput(SerDe outputSerDe) throws Exception {
+        this.outputSerDe = outputSerDe;
+
+        String outputTopic = functionConfig.getOutput();
+        if (outputTopic != null
+                && !outputTopic.isEmpty()
+                && outputSerDe != null) {
+            log.info("Starting producer for output topic {}", outputTopic);
+            initializeOutputProducer(outputTopic);
+        }
+    }
+
+    /**
+     * Create whatever the processor needs to produce results to the given output topic.
+     *
+     * @param outputTopic name of the output topic.
+     * @throws Exception when the producer can not be created.
+     */
+    protected abstract void initializeOutputProducer(String outputTopic) throws Exception;
+
+    //
+    // Process
+    //
+
+    @Override
+    public void prepareDequeueMessageFromProcessQueue() {}
+
+    @Override
+    public boolean prepareProcessMessage(InputMessage msg) throws InterruptedException {
+        return true;
+    }
+
+    @Override
+    public void handleProcessException(InputMessage msg, Exception cause) {}
+
+    @Override
+    public void close() {
+        // stop the consumer first, so no more messages are coming in
+        inputConsumers.forEach((k, v) -> {
+            try {
+                v.close();
+            } catch (PulsarClientException e) {
+                log.warn("Failed to close consumer to input topic {}", k, e);
+            }
+        });
+        inputConsumers.clear();
+    }
+}
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/AbstractOneOuputTopicProducers.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/AbstractOneOuputTopicProducers.java
index 723529d51c..bba9a7f861 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/AbstractOneOuputTopicProducers.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/AbstractOneOuputTopicProducers.java
@@ -28,7 +28,7 @@
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.functions.instance.FunctionResultRouter;
 
-abstract class AbstractOneOuputTopicProducers implements Producers {
+public abstract class AbstractOneOuputTopicProducers implements Producers {
 
     protected final PulsarClient client;
     protected final String outputTopic;
@@ -42,7 +42,7 @@
         this.conf = newProducerConfiguration();
     }
 
-    protected ProducerConfiguration newProducerConfiguration() {
+    static ProducerConfiguration newProducerConfiguration() {
         ProducerConfiguration conf = new ProducerConfiguration();
         conf.setBlockIfQueueFull(true);
         conf.setBatchingEnabled(true);
@@ -58,11 +58,21 @@ protected ProducerConfiguration newProducerConfiguration() {
 
     protected Producer createProducer(String topic)
             throws PulsarClientException {
+        return createProducer(client, topic);
+    }
+
+    public static Producer createProducer(PulsarClient client, String topic)
+            throws PulsarClientException {
         return client.createProducer(topic, newProducerConfiguration());
     }
 
     protected Producer createProducer(String topic, String producerName)
             throws PulsarClientException {
+        return createProducer(client, topic, producerName);
+    }
+
+    public static Producer createProducer(PulsarClient client, String topic, 
String producerName)
+        throws PulsarClientException {
         ProducerConfiguration newConf = newProducerConfiguration();
         newConf.setProducerName(producerName);
 
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/SimpleOneOuputTopicProducers.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/SimpleOneOuputTopicProducers.java
deleted file mode 100644
index 2fee37a4f2..0000000000
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/SimpleOneOuputTopicProducers.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.functions.instance.producers;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import lombok.Getter;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.PulsarClientException;
-
-@Slf4j
-public class SimpleOneOuputTopicProducers extends 
AbstractOneOuputTopicProducers {
-
-    @Getter
-    private Producer producer;
-
-    public SimpleOneOuputTopicProducers(PulsarClient client, String 
outputTopic) throws PulsarClientException {
-        super(client, outputTopic);
-    }
-
-    @Override
-    public void initialize() throws PulsarClientException {
-        producer = createProducer(outputTopic);
-    }
-
-    @Override
-    public Producer getProducer(String srcTopic, int srcTopicPartition) {
-        checkNotNull(producer, "Producer is not initialized yet");
-        return producer;
-    }
-
-    @Override
-    public void closeProducer(String srcTopicName, int srcTopicPartition) {
-        // no-op
-    }
-
-    @Override
-    public void close() {
-        try {
-            producer.close();
-        } catch (PulsarClientException e) {
-            log.warn("Fail to close producer of topic {}", outputTopic, e);
-        }
-    }
-}
diff --git 
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableProcessTest.java
 
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableProcessTest.java
index 20644d2f01..8e099a6bce 100644
--- 
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableProcessTest.java
+++ 
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableProcessTest.java
@@ -76,12 +76,12 @@
 import org.apache.pulsar.functions.api.Context;
 import org.apache.pulsar.functions.api.Function;
 import org.apache.pulsar.functions.api.utils.DefaultSerDe;
+import org.apache.pulsar.functions.instance.processors.AtLeastOnceProcessor;
+import org.apache.pulsar.functions.instance.processors.MessageProcessor;
 import org.apache.pulsar.functions.proto.Function.FunctionConfig;
 import 
org.apache.pulsar.functions.proto.Function.FunctionConfig.ProcessingGuarantees;
 import org.apache.pulsar.functions.utils.Reflections;
 import org.apache.pulsar.functions.utils.functioncache.FunctionCacheManager;
-import org.apache.pulsar.functions.instance.producers.Producers;
-import 
org.apache.pulsar.functions.instance.producers.SimpleOneOuputTopicProducers;
 import org.apache.pulsar.functions.utils.FunctionConfigUtils;
 import org.apache.pulsar.functions.utils.Utils;
 import org.powermock.api.mockito.PowerMockito;
@@ -437,15 +437,15 @@ public void testSetupJavaInstance() throws Exception {
         }
 
         // 3. verify producers and consumers are setup
-        Producers producers = runnable.getOutputProducer();
-        assertTrue(producers instanceof SimpleOneOuputTopicProducers);
+        MessageProcessor processor = runnable.getProcessor();
+        assertTrue(processor instanceof AtLeastOnceProcessor);
         assertSame(mockProducers.get(Pair.of(
             fnConfig.getOutput(),
             null
-        )).getProducer(), ((SimpleOneOuputTopicProducers) 
producers).getProducer());
+        )).getProducer(), ((AtLeastOnceProcessor) processor).getProducer());
 
-        assertEquals(mockConsumers.size(), 
runnable.getInputConsumers().size());
-        for (Map.Entry<String, Consumer> consumerEntry : 
runnable.getInputConsumers().entrySet()) {
+        assertEquals(mockConsumers.size(), 
processor.getInputConsumers().size());
+        for (Map.Entry<String, Consumer> consumerEntry : 
processor.getInputConsumers().entrySet()) {
             String topic = consumerEntry.getKey();
 
             Consumer mockConsumer = mockConsumers.get(Pair.of(
@@ -464,7 +464,7 @@ public void testSetupJavaInstance() throws Exception {
         for (ConsumerInstance consumer : mockConsumers.values()) {
             verify(consumer.getConsumer(), times(1)).close();
         }
-        assertTrue(runnable.getInputConsumers().isEmpty());
+        assertTrue(processor.getInputConsumers().isEmpty());
 
         for (ProducerInstance producer : mockProducers.values()) {
             verify(producer.getProducer(), times(1)).close();
diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/producers/SimpleOneOutputTopicProducersTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/producers/SimpleOneOutputTopicProducersTest.java
deleted file mode 100644
index 67337a15f9..0000000000
--- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/producers/SimpleOneOutputTopicProducersTest.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.functions.instance.producers;
-
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-import static org.testng.Assert.assertSame;
-
-import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.ProducerConfiguration;
-import org.apache.pulsar.client.api.PulsarClient;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.Test;
-
-/**
- * Unit test of {@link SimpleOneOuputTopicProducers}.
- */
-public class SimpleOneOutputTopicProducersTest {
-
-    private static final String TEST_OUTPUT_TOPIC = "test-output-topic";
-
-    private PulsarClient mockClient;
-    private Producer mockProducer;
-    private SimpleOneOuputTopicProducers producers;
-
-    @BeforeMethod
-    public void setup() throws Exception {
-        this.mockClient = mock(PulsarClient.class);
-        this.mockProducer = mock(Producer.class);
-
-        when(mockClient.createProducer(anyString(), any(ProducerConfiguration.class)))
-            .thenReturn(mockProducer);
-
-        this.producers = new SimpleOneOuputTopicProducers(mockClient, TEST_OUTPUT_TOPIC);
-    }
-
-    @Test
-    public void testInitializeClose() throws Exception {
-        this.producers.initialize();
-
-        verify(mockClient, times(1))
-            .createProducer(eq(TEST_OUTPUT_TOPIC), any(ProducerConfiguration.class));
-
-        this.producers.close();
-
-        verify(mockProducer, times(1)).close();
-    }
-
-    @Test(expectedExceptions = NullPointerException.class)
-    public void testGetProducerWithoutInitialization() throws Exception {
-        this.producers.getProducer("test-src-topic", 0);
-    }
-
-    @Test
-    public void testGetAndCloseProducer() throws Exception {
-        this.producers.initialize();
-
-        verify(mockClient, times(1))
-            .createProducer(eq(TEST_OUTPUT_TOPIC), any(ProducerConfiguration.class));
-
-        assertSame(mockProducer, this.producers.getProducer("test-src-topic", 0));
-
-        verify(mockClient, times(1))
-            .createProducer(eq(TEST_OUTPUT_TOPIC), any(ProducerConfiguration.class));
-
-        producers.closeProducer("test-src-topic", 0);
-
-        assertSame(mockProducer, producers.getProducer());
-
-        verify(mockProducer, times(0)).close();
-    }
-
-}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to