This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 22e45e7  Refactor JavaInstanceRunnable (#1533)
22e45e7 is described below

commit 22e45e70c593118af849a17cad2c2de433344625
Author: Sijie Guo <guosi...@gmail.com>
AuthorDate: Tue Apr 10 20:14:51 2018 -0700

    Refactor JavaInstanceRunnable (#1533)
    
    *Motivation*
    
    EffectivelyOnce processing made `JavaInstanceRunnable` very complicated. 
There are tons of braching logic. It makes code hard to maintain.
    
    *Solution*
    
    Abstract the processing guarantee related logic into a `MessageProcessor` 
interface. Implement `at-most-once`, `at-least-once` and `effectively-once` 
processors.
    
    *Result*
    
    After this change, `JavaInstanceRunnable` is much cleaner. `At-Least-Once` 
and `At-Most-Once` logic are much simpler and easier to debug.
---
 .../pulsar/functions/instance/InputMessage.java    |  56 ++++
 .../functions/instance/JavaInstanceRunnable.java   | 342 +++------------------
 .../instance/processors/AtLeastOnceProcessor.java  |  78 +++++
 .../instance/processors/AtMostOnceProcessor.java   |  82 +++++
 .../processors/EffectivelyOnceProcessor.java       | 273 ++++++++++++++++
 .../instance/processors/MessageProcessor.java      | 145 +++++++++
 .../instance/processors/MessageProcessorBase.java  | 167 ++++++++++
 .../producers/AbstractOneOuputTopicProducers.java  |  14 +-
 .../producers/SimpleOneOuputTopicProducers.java    |  63 ----
 .../instance/JavaInstanceRunnableProcessTest.java  |  16 +-
 .../SimpleOneOutputTopicProducersTest.java         |  94 ------
 11 files changed, 856 insertions(+), 474 deletions(-)

diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InputMessage.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InputMessage.java
new file mode 100644
index 0000000..bd7e788
--- /dev/null
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InputMessage.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.instance;
+
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.functions.api.SerDe;
+
+@Getter
+@Setter
+@ToString
+public class InputMessage {
+
+    private Message actualMessage;
+    String topicName;
+    SerDe inputSerDe;
+    Consumer consumer;
+
+    public int getTopicPartition() {
+        MessageIdImpl msgId = (MessageIdImpl) actualMessage.getMessageId();
+        return msgId.getPartitionIndex();
+    }
+
+    public void ack() {
+        if (null != consumer) {
+            consumer.acknowledgeAsync(actualMessage);
+        }
+    }
+
+    public void ackCumulative() {
+        if (null != consumer) {
+            consumer.acknowledgeCumulativeAsync(actualMessage);
+        }
+    }
+
+}
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index 510608f..008d5a9 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -22,17 +22,17 @@ package org.apache.pulsar.functions.instance;
 import static org.apache.bookkeeper.common.concurrent.FutureUtils.result;
 import static 
org.apache.bookkeeper.stream.protocol.ProtocolConstants.DEFAULT_STREAM_CONF;
 
-import com.google.common.collect.Maps;
 import io.netty.buffer.ByteBuf;
 
-import java.util.*;
+import java.util.Arrays;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.concurrent.LinkedBlockingDeque;
-import java.util.concurrent.TimeUnit;
 
 import lombok.AccessLevel;
 import lombok.Getter;
-import lombok.Setter;
-import lombok.ToString;
 import lombok.extern.slf4j.Slf4j;
 import net.jodah.typetools.TypeResolver;
 import org.apache.bookkeeper.api.StorageClient;
@@ -48,46 +48,35 @@ import org.apache.logging.log4j.ThreadContext;
 import org.apache.logging.log4j.core.LoggerContext;
 import org.apache.logging.log4j.core.config.Configuration;
 import org.apache.logging.log4j.core.config.LoggerConfig;
-import org.apache.pulsar.client.api.*;
-import 
org.apache.pulsar.client.api.PulsarClientException.ProducerBusyException;
-import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.client.api.MessageBuilder;
+import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.functions.api.Function;
 import org.apache.pulsar.functions.api.utils.DefaultSerDe;
-import org.apache.pulsar.functions.proto.Function.FunctionConfig;
-import 
org.apache.pulsar.functions.proto.Function.FunctionConfig.ProcessingGuarantees;
+import org.apache.pulsar.functions.instance.processors.MessageProcessor;
 import org.apache.pulsar.functions.proto.InstanceCommunication;
 import org.apache.pulsar.functions.utils.functioncache.FunctionCacheManager;
 import org.apache.pulsar.functions.api.SerDe;
-import 
org.apache.pulsar.functions.instance.producers.MultiConsumersOneOuputTopicProducers;
-import org.apache.pulsar.functions.instance.producers.Producers;
-import 
org.apache.pulsar.functions.instance.producers.SimpleOneOuputTopicProducers;
 import org.apache.pulsar.functions.instance.state.StateContextImpl;
 import org.apache.pulsar.functions.utils.FunctionConfigUtils;
 import org.apache.pulsar.functions.utils.Reflections;
-import org.apache.pulsar.functions.utils.Utils;
 
 /**
  * A function container implemented using java thread.
  */
 @Slf4j
-public class JavaInstanceRunnable implements AutoCloseable, Runnable, 
ConsumerEventListener {
+public class JavaInstanceRunnable implements AutoCloseable, Runnable {
 
     // The class loader that used for loading functions
     private ClassLoader fnClassLoader;
     private final InstanceConfig instanceConfig;
-    private final FunctionConfig.ProcessingGuarantees processingGuarantees;
     private final FunctionCacheManager fnCache;
     private final LinkedBlockingDeque<InputMessage> queue;
     private final String jarFile;
 
     // input topic consumer & output topic producer
     private final PulsarClientImpl client;
-    @Getter(AccessLevel.PACKAGE)
-    private Producers outputProducer;
-    @Getter(AccessLevel.PACKAGE)
-    private final Map<String, Consumer> inputConsumers;
-    private LinkedList<String> inputTopicsToResubscribe = null;
+
     private LogAppender logAppender;
 
     // provide tables for storing states
@@ -106,20 +95,9 @@ public class JavaInstanceRunnable implements AutoCloseable, 
Runnable, ConsumerEv
     private Map<String, SerDe> inputSerDe;
     private SerDe outputSerDe;
 
-    @Getter
-    @Setter
-    @ToString
-    private class InputMessage {
-        private Message actualMessage;
-        String topicName;
-        SerDe inputSerDe;
-        Consumer consumer;
-
-        public int getTopicPartition() {
-            MessageIdImpl msgId = (MessageIdImpl) actualMessage.getMessageId();
-            return msgId.getPartitionIndex();
-        }
-    }
+    @Getter(AccessLevel.PACKAGE)
+    // processor
+    private final MessageProcessor processor;
 
     // function stats
     private final FunctionStats stats;
@@ -130,27 +108,16 @@ public class JavaInstanceRunnable implements 
AutoCloseable, Runnable, ConsumerEv
                                 PulsarClient pulsarClient,
                                 String stateStorageServiceUrl) {
         this.instanceConfig = instanceConfig;
-        this.processingGuarantees = 
instanceConfig.getFunctionConfig().getProcessingGuarantees();
         this.fnCache = fnCache;
         this.queue = new 
LinkedBlockingDeque<>(instanceConfig.getMaxBufferedTuples());
         this.jarFile = jarFile;
         this.client = (PulsarClientImpl) pulsarClient;
         this.stateStorageServiceUrl = stateStorageServiceUrl;
         this.stats = new FunctionStats();
-        this.inputConsumers = Maps.newConcurrentMap();
-    }
-
-    private SubscriptionType getSubscriptionType() {
-        if (processingGuarantees == ProcessingGuarantees.EFFECTIVELY_ONCE) {
-            return SubscriptionType.Failover;
-        } else {
-            if (instanceConfig.getFunctionConfig().getSubscriptionType() == 
null
-                || instanceConfig.getFunctionConfig().getSubscriptionType() == 
FunctionConfig.SubscriptionType.SHARED) {
-                return SubscriptionType.Shared;
-            } else {
-                return SubscriptionType.Failover;
-            }
-        }
+        this.processor = MessageProcessor.create(
+            client,
+            instanceConfig.getFunctionConfig(),
+            queue);
     }
 
     /**
@@ -189,13 +156,13 @@ public class JavaInstanceRunnable implements 
AutoCloseable, Runnable, ConsumerEv
         // start the state table
         setupStateTable();
         // start the output producer
-        startOutputProducer();
+        processor.setupOutput(outputSerDe);
         // start the input consumer
-        startInputConsumer();
+        processor.setupInput(inputSerDe);
         // start any log topic handler
         setupLogHandler();
 
-        return new JavaInstance(instanceConfig, object, clsLoader, client, 
inputConsumers);
+        return new JavaInstance(instanceConfig, object, clsLoader, client, 
processor.getInputConsumers());
     }
 
     /**
@@ -206,9 +173,7 @@ public class JavaInstanceRunnable implements AutoCloseable, 
Runnable, ConsumerEv
         try {
             javaInstance = setupJavaInstance();
             while (running) {
-                // some src topics might be put into resubscribe list because 
of processing failure
-                // so this is the chance to resubscribe to those topics.
-                resubscribeTopicsIfNeeded();
+                processor.prepareDequeueMessageFromProcessQueue();
 
                 JavaExecutionResult result;
                 InputMessage msg;
@@ -221,31 +186,9 @@ public class JavaInstanceRunnable implements 
AutoCloseable, Runnable, ConsumerEv
                     break;
                 }
 
-                if (ProcessingGuarantees.EFFECTIVELY_ONCE == 
processingGuarantees) {
-                    // if the messages are received from old consumers, we 
discard it since new consumer was
-                    // re-created for the correctness of effectively-once
-                    if (msg.getConsumer() != 
inputConsumers.get(msg.getTopicName())) {
-                        continue;
-                    }
-                }
-
-                if (null != outputProducer) {
-                    // before processing the message, we have a producer 
connection setup for producing results.
-                    Producer producer = null;
-                    while (null == producer) {
-                        try {
-                            producer = 
outputProducer.getProducer(msg.getTopicName(), msg.getTopicPartition());
-                        } catch (PulsarClientException e) {
-                            // `ProducerBusy` is thrown when an producer with 
same name is still connected.
-                            // This can happen when a active consumer is 
changed for a given input topic partition
-                            // so we need to wait until the old active 
consumer release the produce connection.
-                            if (!(e instanceof ProducerBusyException)) {
-                                log.error("Failed to get a producer for 
producing results computed from input topic {}",
-                                    msg.getTopicName());
-                            }
-                            TimeUnit.MILLISECONDS.sleep(500);
-                        }
-                    }
+                if (!processor.prepareProcessMessage(msg)) {
+                    // the message can't be processed by the processor at this 
moment, retry it.
+                    continue;
                 }
 
                 // state object is per function, because we need to have the 
ability to know what updates
@@ -318,30 +261,6 @@ public class JavaInstanceRunnable implements 
AutoCloseable, Runnable, ConsumerEv
         Thread.currentThread().setContextClassLoader(fnClassLoader);
     }
 
-    @Override
-    public void becameActive(Consumer consumer, int partitionId) {
-        // if the instance becomes active for a given topic partition,
-        // open a producer for the results computed from this topic partition.
-        if (null != outputProducer) {
-            try {
-                this.outputProducer.getProducer(consumer.getTopic(), 
partitionId);
-            } catch (PulsarClientException e) {
-                // this can be ignored, because producer can be lazily created 
when accessing it.
-                log.warn("Fail to create a producer for results computed from 
messages of topic: {}, partition: {}",
-                    consumer.getTopic(), partitionId);
-            }
-        }
-    }
-
-    @Override
-    public void becameInactive(Consumer consumer, int partitionId) {
-        if (null != outputProducer) {
-            // if I lost the ownership of a partition, close its corresponding 
topic partition.
-            // this is to allow the new active consumer be able to produce to 
the result topic.
-            this.outputProducer.closeProducer(consumer.getTopic(), 
partitionId);
-        }
-    }
-
     private void setupStateTable() throws Exception {
         if (null == stateStorageServiceUrl) {
             return;
@@ -384,226 +303,48 @@ public class JavaInstanceRunnable implements 
AutoCloseable, Runnable, ConsumerEv
         this.stateTable = result(storageClient.openTable(tableName));
     }
 
-    private void startOutputProducer() throws Exception {
-        if (instanceConfig.getFunctionConfig().getOutput() != null
-                && !instanceConfig.getFunctionConfig().getOutput().isEmpty()
-                && this.outputSerDe != null) {
-            log.info("Starting producer for output topic " + 
instanceConfig.getFunctionConfig().getOutput());
-
-            if (processingGuarantees == ProcessingGuarantees.EFFECTIVELY_ONCE) 
{
-                this.outputProducer = new MultiConsumersOneOuputTopicProducers(
-                    client, instanceConfig.getFunctionConfig().getOutput());
-            } else {
-                this.outputProducer = new SimpleOneOuputTopicProducers(
-                    client, instanceConfig.getFunctionConfig().getOutput());
-            }
-            this.outputProducer.initialize();
-        }
-    }
-
-    private void startInputConsumer() throws Exception {
-        log.info("Consumer map {}", instanceConfig.getFunctionConfig());
-        for (Map.Entry<String, String> entry : 
instanceConfig.getFunctionConfig().getCustomSerdeInputsMap().entrySet()) {
-            ConsumerConfiguration conf = 
createConsumerConfiguration(entry.getKey());
-            this.inputConsumers.put(entry.getKey(), 
client.subscribe(entry.getKey(),
-                    
FunctionConfigUtils.getFullyQualifiedName(instanceConfig.getFunctionConfig()), 
conf));
-        }
-        for (String topicName : 
instanceConfig.getFunctionConfig().getInputsList()) {
-            ConsumerConfiguration conf = 
createConsumerConfiguration(topicName);
-            this.inputConsumers.put(topicName, client.subscribe(topicName,
-                    
FunctionConfigUtils.getFullyQualifiedName(instanceConfig.getFunctionConfig()), 
conf));
-        }
-    }
-
-    private ConsumerConfiguration createConsumerConfiguration(String 
topicName) {
-        log.info("Starting Consumer for topic " + topicName);
-        ConsumerConfiguration conf = new ConsumerConfiguration();
-        conf.setSubscriptionType(getSubscriptionType());
-
-        SerDe inputSerde = inputSerDe.get(topicName);
-        conf.setMessageListener((consumer, msg) -> {
-            try {
-                InputMessage message = new InputMessage();
-                message.setConsumer(consumer);
-                message.setInputSerDe(inputSerde);
-                message.setActualMessage(msg);
-                message.setTopicName(topicName);
-                queue.put(message);
-                if (processingGuarantees == 
FunctionConfig.ProcessingGuarantees.ATMOST_ONCE) {
-                    if (instanceConfig.getFunctionConfig().getAutoAck()) {
-                        consumer.acknowledgeAsync(msg);
-                    }
-                }
-            } catch (InterruptedException e) {
-                log.error("Function container {} is interrupted on enqueuing 
messages",
-                        Thread.currentThread().getId(), e);
-            }
-        });
-
-        // for failover subscription, register a consumer event listener to 
react to active consumer changes.
-        if (getSubscriptionType() == SubscriptionType.Failover) {
-            conf.setConsumerEventListener(this);
-        }
-
-        return conf;
-    }
-
     private void processResult(InputMessage msg,
                                JavaExecutionResult result,
                                long startTime, long endTime) {
          if (result.getUserException() != null) {
             log.info("Encountered user exception when processing message {}", 
msg, result.getUserException());
             stats.incrementUserExceptions(result.getUserException());
-            handleProcessException(msg.getTopicName());
+            processor.handleProcessException(msg, result.getUserException());
         } else if (result.getSystemException() != null) {
             log.info("Encountered system exception when processing message 
{}", msg, result.getSystemException());
             stats.incrementSystemExceptions(result.getSystemException());
-            handleProcessException(msg.getTopicName());
+            processor.handleProcessException(msg, result.getSystemException());
         } else {
             stats.incrementSuccessfullyProcessed(endTime - startTime);
-            if (result.getResult() != null && outputProducer != null) {
+            if (result.getResult() != null && 
instanceConfig.getFunctionConfig().getOutput() != null) {
                 byte[] output;
                 try {
                     output = outputSerDe.serialize(result.getResult());
                 } catch (Exception ex) {
                     stats.incrementSerializationExceptions();
-                    handleProcessException(msg.getTopicName());
+                    processor.handleProcessException(msg, ex);
                     return;
                 }
                 if (output != null) {
-                    try {
-                        sendOutputMessage(msg, result, output);
-                    } catch (Throwable t) {
-                        log.error("Encountered error when sending result {} 
computed from src message {}",
-                            result, msg, t);
-                        handleProcessException(msg.getTopicName());
-                    }
-                } else if (processingGuarantees != 
ProcessingGuarantees.ATMOST_ONCE) {
-                    ackMessage(msg);
+                    sendOutputMessage(msg, output);
+                } else {
+                    processor.sendOutputMessage(msg, null);
                 }
-            } else if (processingGuarantees != 
ProcessingGuarantees.ATMOST_ONCE) {
-                ackMessage(msg);
-            }
-        }
-    }
-
-    private void ackMessage(InputMessage msg) {
-        if (instanceConfig.getFunctionConfig().getAutoAck()) {
-            if (ProcessingGuarantees.EFFECTIVELY_ONCE == processingGuarantees) 
{
-                
msg.getConsumer().acknowledgeCumulativeAsync(msg.getActualMessage());
             } else {
-                msg.getConsumer().acknowledgeAsync(msg.getActualMessage());
+                // the function doesn't produce any result or the user doesn't 
want the result.
+                processor.sendOutputMessage(msg, null);
             }
         }
     }
 
     private void sendOutputMessage(InputMessage srcMsg,
-                                   JavaExecutionResult result,
                                    byte[] output) {
         MessageBuilder msgBuilder = MessageBuilder.create()
             .setContent(output)
             .setProperty("__pfn_input_topic__", srcMsg.getTopicName())
             .setProperty("__pfn_input_msg_id__", new 
String(Base64.getEncoder().encode(srcMsg.getActualMessage().getMessageId().toByteArray())));
-        if (processingGuarantees == ProcessingGuarantees.EFFECTIVELY_ONCE) {
-            msgBuilder = msgBuilder
-                
.setSequenceId(Utils.getSequenceId(srcMsg.getActualMessage().getMessageId()));
-        }
-        Producer producer;
-        try {
-            producer = outputProducer.getProducer(srcMsg.getTopicName(), 
srcMsg.getTopicPartition());
-        } catch (PulsarClientException e) {
-            log.error("Failed to get a producer for producing results computed 
from input topic {}",
-                srcMsg.getTopicName());
-
-            // if we fail to get a producer, put this message back to queue 
and reprocess it.
-            queue.offerFirst(srcMsg);
-            return;
-        }
-
-        Message destMsg = msgBuilder.build();
-        producer.sendAsync(destMsg)
-            .thenAccept(messageId -> {
-                if (processingGuarantees != ProcessingGuarantees.ATMOST_ONCE) {
-                    ackMessage(srcMsg);
-                }
-            })
-            .exceptionally(cause -> {
-                log.error("Failed to send the process result {} of message {} 
to output topic {}",
-                    result, srcMsg, 
instanceConfig.getFunctionConfig().getOutput(), cause);
-                handleProcessException(srcMsg.getTopicName());
-                return null;
-            });
-    }
 
-    private void handleProcessException(String srcTopic) {
-        // if the src message is coming from a shared subscription,
-        // we don't need any special logic on handling failures, just don't 
ack.
-        // the message will be redelivered to other consumer.
-        //
-        // BUT, if the src message is coming from a failover subscription,
-        // we need to stop processing messages and recreate consumer to 
reprocess
-        // the message. otherwise we might break the correctness of 
effectively-once
-        if (getSubscriptionType() != SubscriptionType.Shared) {
-            // in this case (effectively-once), we need to close the consumer
-            // release the partition and open the consumer again. so we 
guarantee
-            // that we always process messages in order
-            //
-            // but this is in pulsar's callback, so add this to a retry list. 
so we can
-            // retry on java instance's main thread.
-            addTopicToResubscribeList(srcTopic);
-        }
-    }
-
-    private synchronized void addTopicToResubscribeList(String topicName) {
-        if (null == inputTopicsToResubscribe) {
-            inputTopicsToResubscribe = new LinkedList<>();
-        }
-        inputTopicsToResubscribe.add(topicName);
-    }
-
-    private void resubscribeTopicsIfNeeded() {
-        List<String> topicsToResubscribe;
-        synchronized (this) {
-            topicsToResubscribe = inputTopicsToResubscribe;
-            inputTopicsToResubscribe = null;
-        }
-        if (null != topicsToResubscribe) {
-            for (String topic : topicsToResubscribe) {
-                resubscribe(topic);
-            }
-        }
-    }
-
-    private void resubscribe(String srcTopic) {
-        // if we can not produce a message to output topic, then close the 
consumer of the src topic
-        // and retry to instantiate a consumer again.
-        Consumer consumer = inputConsumers.remove(srcTopic);
-        if (consumer != null) {
-            // TODO (sijie): currently we have to close the entire consumer 
for a given topic. However
-            //               ideally we should do this in a finer granularity 
- we can close consumer
-            //               on a given partition, without impact other 
partitions.
-            try {
-                consumer.close();
-            } catch (PulsarClientException e) {
-                log.error("Failed to close consumer for input topic {} when 
handling produce exceptions",
-                    srcTopic, e);
-            }
-        }
-        // subscribe to the src topic again
-        ConsumerConfiguration conf = createConsumerConfiguration(srcTopic);
-        try {
-            inputConsumers.put(
-                srcTopic,
-                client.subscribe(
-                    srcTopic,
-                    
FunctionConfigUtils.getFullyQualifiedName(instanceConfig.getFunctionConfig()),
-                    conf
-                ));
-        } catch (PulsarClientException e) {
-            log.error("Failed to resubscribe to input topic {}. Added it to 
retry list and retry it later",
-                srcTopic, e);
-            addTopicToResubscribeList(srcTopic);
-        }
+        processor.sendOutputMessage(srcMsg, msgBuilder);
     }
 
     @Override
@@ -612,21 +353,8 @@ public class JavaInstanceRunnable implements 
AutoCloseable, Runnable, ConsumerEv
             return;
         }
         running = false;
-        // stop the consumer first, so no more messages are coming in
-        inputConsumers.forEach((k, v) -> {
-            try {
-                v.close();
-            } catch (PulsarClientException e) {
-                log.warn("Failed to close consumer to input topic {}", k, e);
-            }
-        });
-        inputConsumers.clear();
 
-        // kill the result producer
-        if (null != outputProducer) {
-            outputProducer.close();
-            outputProducer = null;
-        }
+        processor.close();
 
         // kill the state table
         if (null != stateTable) {
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/AtLeastOnceProcessor.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/AtLeastOnceProcessor.java
new file mode 100644
index 0000000..740fd5c
--- /dev/null
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/AtLeastOnceProcessor.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.instance.processors;
+
+import java.util.concurrent.LinkedBlockingDeque;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageBuilder;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.functions.instance.InputMessage;
+import 
org.apache.pulsar.functions.instance.producers.AbstractOneOuputTopicProducers;
+import org.apache.pulsar.functions.proto.Function.FunctionConfig;
+
+/**
+ * A message processor that process messages at-most-once.
+ */
+@Slf4j
+public class AtLeastOnceProcessor extends MessageProcessorBase {
+
+    @Getter
+    private Producer producer;
+
+    AtLeastOnceProcessor(PulsarClient client,
+                         FunctionConfig functionConfig,
+                         SubscriptionType subType,
+                         LinkedBlockingDeque<InputMessage> processQueue) {
+        super(client, functionConfig, subType, processQueue);
+    }
+
+    @Override
+    protected void initializeOutputProducer(String outputTopic) throws 
Exception {
+        producer = AbstractOneOuputTopicProducers.createProducer(client, 
outputTopic);
+    }
+
+    @Override
+    public void sendOutputMessage(InputMessage inputMsg, MessageBuilder 
outputMsgBuilder) {
+        if (null == outputMsgBuilder) {
+            inputMsg.ack();
+            return;
+        }
+
+        Message outputMsg = outputMsgBuilder.build();
+        producer.sendAsync(outputMsg)
+            .thenAccept(msgId -> inputMsg.ack());
+    }
+
+    @Override
+    public void close() {
+        super.close();
+        if (null != producer) {
+            try {
+                producer.close();
+            } catch (PulsarClientException e) {
+                log.warn("Fail to close producer for processor {}", 
functionConfig.getOutput(), e);
+            }
+        }
+    }
+}
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/AtMostOnceProcessor.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/AtMostOnceProcessor.java
new file mode 100644
index 0000000..1713b73
--- /dev/null
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/AtMostOnceProcessor.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.instance.processors;
+
+import java.util.concurrent.LinkedBlockingDeque;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageBuilder;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.functions.instance.InputMessage;
+import 
org.apache.pulsar.functions.instance.producers.AbstractOneOuputTopicProducers;
+import org.apache.pulsar.functions.proto.Function.FunctionConfig;
+
+/**
+ * A message processor that process messages at-most-once.
+ */
+@Slf4j
+class AtMostOnceProcessor extends MessageProcessorBase {
+
+    private Producer producer;
+
+    AtMostOnceProcessor(PulsarClient client,
+                        FunctionConfig functionConfig,
+                        SubscriptionType subType,
+                        LinkedBlockingDeque<InputMessage> processQueue) {
+        super(client, functionConfig, subType, processQueue);
+    }
+
+    @Override
+    protected void postReceiveMessage(InputMessage message) {
+        super.postReceiveMessage(message);
+        if (functionConfig.getAutoAck()) {
+            message.ack();
+        }
+    }
+
+    @Override
+    protected void initializeOutputProducer(String outputTopic) throws 
Exception {
+        producer = AbstractOneOuputTopicProducers.createProducer(client, 
outputTopic);
+    }
+
+    @Override
+    public void sendOutputMessage(InputMessage inputMsg, MessageBuilder 
outputMsgBuilder) {
+        if (null == outputMsgBuilder) {
+            return;
+        }
+
+        Message outputMsg = outputMsgBuilder.build();
+        producer.sendAsync(outputMsg);
+    }
+
+    @Override
+    public void close() {
+        super.close();
+        if (null != producer) {
+            try {
+                producer.close();
+            } catch (PulsarClientException e) {
+                log.warn("Fail to close producer for processor {}", 
functionConfig.getOutput(), e);
+            }
+        }
+    }
+}
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/EffectivelyOnceProcessor.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/EffectivelyOnceProcessor.java
new file mode 100644
index 0000000..f039cdf
--- /dev/null
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/EffectivelyOnceProcessor.java
@@ -0,0 +1,273 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.instance.processors;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
+import lombok.AccessLevel;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerConfiguration;
+import org.apache.pulsar.client.api.ConsumerEventListener;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageBuilder;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import 
org.apache.pulsar.client.api.PulsarClientException.ProducerBusyException;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.functions.instance.InputMessage;
+import 
org.apache.pulsar.functions.instance.producers.MultiConsumersOneOuputTopicProducers;
+import org.apache.pulsar.functions.instance.producers.Producers;
+import org.apache.pulsar.functions.proto.Function.FunctionConfig;
+import org.apache.pulsar.functions.utils.FunctionConfigUtils;
+import org.apache.pulsar.functions.utils.Utils;
+
+/**
+ * A message processor that process messages effectively-once.
+ */
+@Slf4j
+class EffectivelyOnceProcessor extends MessageProcessorBase implements 
ConsumerEventListener {
+
+    private LinkedList<String> inputTopicsToResubscribe = null;
+
+    @Getter(AccessLevel.PACKAGE)
+    protected Producers outputProducer;
+
+    EffectivelyOnceProcessor(PulsarClient client,
+                             FunctionConfig functionConfig,
+                             LinkedBlockingDeque<InputMessage> processQueue) {
+        super(client, functionConfig, SubscriptionType.Failover, processQueue);
+    }
+
+    /**
+     * An effectively-once processor can only use `Failover` subscription.
+     */
+    @Override
+    protected SubscriptionType getSubscriptionType() {
+        return SubscriptionType.Failover;
+    }
+
+    @Override
+    protected ConsumerConfiguration createConsumerConfiguration(String 
topicName) {
+        ConsumerConfiguration conf = 
super.createConsumerConfiguration(topicName);
+        // for effectively-once processor, register a consumer event listener 
to react to active consumer changes.
+        conf.setConsumerEventListener(this);
+        return conf;
+    }
+
+    @Override
+    public void becameActive(Consumer<?> consumer, int partitionId) {
+        // if the instance becomes active for a given topic partition,
+        // open a producer for the results computed from this topic partition.
+        if (null != outputProducer) {
+            try {
+                this.outputProducer.getProducer(consumer.getTopic(), 
partitionId);
+            } catch (PulsarClientException e) {
+                // this can be ignored, because producer can be lazily created 
when accessing it.
+                log.warn("Fail to create a producer for results computed from 
messages of topic: {}, partition: {}",
+                    consumer.getTopic(), partitionId);
+            }
+        }
+    }
+
+    @Override
+    public void becameInactive(Consumer<?> consumer, int partitionId) {
+        if (null != outputProducer) {
+            // if I lost the ownership of a partition, close its corresponding 
topic partition.
+            // this is to allow the new active consumer be able to produce to 
the result topic.
+            this.outputProducer.closeProducer(consumer.getTopic(), 
partitionId);
+        }
+    }
+
+    @Override
+    protected void initializeOutputProducer(String outputTopic) throws 
Exception {
+        outputProducer = new MultiConsumersOneOuputTopicProducers(client, 
outputTopic);
+        outputProducer.initialize();
+    }
+
+    //
+    // Methods to process messages
+    //
+
+    @Override
+    public void prepareDequeueMessageFromProcessQueue() {
+        super.prepareDequeueMessageFromProcessQueue();
+        // some src topics might be put into resubscribe list because of 
processing failure
+        // so this is the chance to resubscribe to those topics.
+        resubscribeTopicsIfNeeded();
+    }
+
+    @Override
+    public boolean prepareProcessMessage(InputMessage msg) throws 
InterruptedException {
+        boolean prepared = super.prepareProcessMessage(msg);
+        if (prepared) {
+            // if the messages are received from old consumers, we discard it 
since new consumer was
+            // re-created for the correctness of effectively-once
+            if (msg.getConsumer() != inputConsumers.get(msg.getTopicName())) {
+                return false;
+            }
+
+            if (null != outputProducer) {
+                // before processing the message, we have a producer 
connection setup for producing results.
+                Producer producer = null;
+                while (null == producer) {
+                    try {
+                        producer = 
outputProducer.getProducer(msg.getTopicName(), msg.getTopicPartition());
+                    } catch (PulsarClientException e) {
+                        // `ProducerBusy` is thrown when an producer with same 
name is still connected.
+                        // This can happen when a active consumer is changed 
for a given input topic partition
+                        // so we need to wait until the old active consumer 
release the produce connection.
+                        if (!(e instanceof ProducerBusyException)) {
+                            log.error("Failed to get a producer for producing 
results computed from input topic {}",
+                                msg.getTopicName());
+                        }
+                        TimeUnit.MILLISECONDS.sleep(500);
+                    }
+                }
+            }
+            return true;
+        } else {
+            return false;
+        }
+    }
+
+    @Override
+    public void sendOutputMessage(InputMessage inputMsg,
+                                  MessageBuilder outputMsgBuilder) {
+        if (null == outputMsgBuilder) {
+            inputMsg.ackCumulative();
+            return;
+        }
+
+        // assign sequence id to output message for idempotent producing
+        outputMsgBuilder = outputMsgBuilder
+            
.setSequenceId(Utils.getSequenceId(inputMsg.getActualMessage().getMessageId()));
+
+        Producer producer;
+        try {
+            producer = outputProducer.getProducer(inputMsg.getTopicName(), 
inputMsg.getTopicPartition());
+        } catch (PulsarClientException e) {
+            log.error("Failed to get a producer for producing results computed 
from input topic {}",
+                inputMsg.getTopicName());
+
+            // if we fail to get a producer, put this message back to queue 
and reprocess it.
+            processQueue.offerFirst(inputMsg);
+            return;
+        }
+
+        Message outputMsg = outputMsgBuilder.build();
+        producer.sendAsync(outputMsg)
+            .thenAccept(messageId -> inputMsg.ackCumulative())
+            .exceptionally(cause -> {
+                log.error("Failed to send the process result {} of message {} 
to output topic {}",
+                    outputMsg, inputMsg, functionConfig.getOutput(), cause);
+                handleProcessException(inputMsg.getTopicName());
+                return null;
+            });
+    }
+
+    @Override
+    public void handleProcessException(InputMessage msg, Exception cause) {
+        handleProcessException(msg.getTopicName());
+    }
+
+    private void handleProcessException(String srcTopic) {
+        // if the src message is coming from a shared subscription,
+        // we don't need any special logic on handling failures, just don't 
ack.
+        // the message will be redelivered to other consumer.
+        //
+        // BUT, if the src message is coming from a failover subscription,
+        // we need to stop processing messages and recreate consumer to 
reprocess
+        // the message. otherwise we might break the correctness of 
effectively-once
+        //
+        // in this case (effectively-once), we need to close the consumer
+        // release the partition and open the consumer again. so we guarantee
+        // that we always process messages in order
+        //
+        // but this is in pulsar's callback, so add this to a retry list. so 
we can
+        // retry on java instance's main thread.
+        addTopicToResubscribeList(srcTopic);
+    }
+
+    private synchronized void addTopicToResubscribeList(String topicName) {
+        if (null == inputTopicsToResubscribe) {
+            inputTopicsToResubscribe = new LinkedList<>();
+        }
+        inputTopicsToResubscribe.add(topicName);
+    }
+
+    private void resubscribeTopicsIfNeeded() {
+        List<String> topicsToResubscribe;
+        synchronized (this) {
+            topicsToResubscribe = inputTopicsToResubscribe;
+            inputTopicsToResubscribe = null;
+        }
+        if (null != topicsToResubscribe) {
+            for (String topic : topicsToResubscribe) {
+                resubscribe(topic);
+            }
+        }
+    }
+
+    private void resubscribe(String srcTopic) {
+        // if we can not produce a message to output topic, then close the 
consumer of the src topic
+        // and retry to instantiate a consumer again.
+        Consumer consumer = inputConsumers.remove(srcTopic);
+        if (consumer != null) {
+            // TODO (sijie): currently we have to close the entire consumer 
for a given topic. However
+            //               ideally we should do this in a finer granularity 
- we can close consumer
+            //               on a given partition, without impact other 
partitions.
+            try {
+                consumer.close();
+            } catch (PulsarClientException e) {
+                log.error("Failed to close consumer for input topic {} when 
handling produce exceptions",
+                    srcTopic, e);
+            }
+        }
+        // subscribe to the src topic again
+        ConsumerConfiguration conf = createConsumerConfiguration(srcTopic);
+        try {
+            inputConsumers.put(
+                srcTopic,
+                client.subscribe(
+                    srcTopic,
+                    FunctionConfigUtils.getFullyQualifiedName(functionConfig),
+                    conf
+                ));
+        } catch (PulsarClientException e) {
+            log.error("Failed to resubscribe to input topic {}. Added it to 
retry list and retry it later",
+                srcTopic, e);
+            addTopicToResubscribeList(srcTopic);
+        }
+    }
+
+    @Override
+    public void close() {
+        super.close();
+        // kill the result producer
+        if (null != outputProducer) {
+            outputProducer.close();
+            outputProducer = null;
+        }
+    }
+}
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/MessageProcessor.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/MessageProcessor.java
new file mode 100644
index 0000000..b475e9f
--- /dev/null
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/MessageProcessor.java
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.instance.processors;
+
+import java.util.Map;
+import java.util.concurrent.LinkedBlockingDeque;
+import org.apache.bookkeeper.common.annotation.InterfaceStability.Evolving;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.MessageBuilder;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.functions.api.SerDe;
+import org.apache.pulsar.functions.instance.InputMessage;
+import org.apache.pulsar.functions.proto.Function.FunctionConfig;
+import 
org.apache.pulsar.functions.proto.Function.FunctionConfig.ProcessingGuarantees;
+
+/**
+ * A processor that processes messages, used by {@link 
org.apache.pulsar.functions.instance.JavaInstanceRunnable}.
+ */
+@Evolving
+public interface MessageProcessor extends AutoCloseable {
+
+    static MessageProcessor create(PulsarClient client,
+                                   FunctionConfig functionConfig,
+                                   LinkedBlockingDeque<InputMessage> 
processQeueue) {
+        FunctionConfig.SubscriptionType fnSubType = 
functionConfig.getSubscriptionType();
+        ProcessingGuarantees processingGuarantees = 
functionConfig.getProcessingGuarantees();
+        SubscriptionType subType;
+        if (null == fnSubType || FunctionConfig.SubscriptionType.SHARED == 
fnSubType) {
+            subType = SubscriptionType.Shared;
+        } else {
+            subType = SubscriptionType.Failover;
+        }
+
+        if (processingGuarantees == ProcessingGuarantees.EFFECTIVELY_ONCE) {
+            return new EffectivelyOnceProcessor(
+                client,
+                functionConfig,
+                processQeueue);
+        } else if (processingGuarantees == ProcessingGuarantees.ATMOST_ONCE) {
+            return new AtMostOnceProcessor(
+                client,
+                functionConfig,
+                subType,
+                processQeueue);
+        } else {
+            return new AtLeastOnceProcessor(
+                client,
+                functionConfig,
+                subType,
+                processQeueue);
+        }
+    }
+
+    /**
+     * Setup the input with a provided <i>processQueue</i>. The implementation 
of this processor is responsible for
+     * setting up the input and passing the received messages from input to 
the provided <i>processQueue</i>.
+     *
+     * @param inputSerDe SerDe to deserialize messages from input.
+     */
+    void setupInput(Map<String, SerDe> inputSerDe)
+        throws Exception;
+
+    /**
+     * Return the map of input consumers.
+     *
+     * @return the map of input consumers.
+     */
+    Map<String, Consumer> getInputConsumers();
+
+    /**
+     * Setup the output with a provided <i>outputSerDe</i>. The implementation 
of this processor is responsible for
+     * setting up the output
+     *
+     * @param outputSerDe output serde.
+     * @throws Exception
+     */
+    void setupOutput(SerDe outputSerDe) throws Exception;
+
+
+    //
+    // Methods that called on processing messages
+    //
+
+    /**
+     * Method that is called before taking a message from process queue to 
process.
+     *
+     * <p>The implementation can do actions like failure recovery before 
actually taking messages out of process queue to
+     * process.
+     */
+    void prepareDequeueMessageFromProcessQueue();
+
+    /**
+     * Method that is called before processing the input message <i>msg</i>.
+     *
+     * <p>The implementation can do actions like ensuring producer is ready 
for producing results.
+     *
+     * @param msg input message.
+     * @return true if the msg can be process, otherwise false. If a processor 
can't process this message at this moment,
+     *          this message will be put back to the process queue and process 
later.
+     * @throws InterruptedException when the processor is interrupted on 
preparing processing message.
+     */
+    boolean prepareProcessMessage(InputMessage msg) throws 
InterruptedException;
+
+    /**
+     * Send the output message to the output topic. The output message is 
computed from <i>inputMsg</i>.
+     *
+     * <p>If the <i>outputMsgBuilder</i> is null, the implementation doesn't 
have to send any messages to the output.
+     * The implementation can decide to acknowledge the input message based on 
its process guarantees.
+     *
+     * @param inputMsg input message
+     * @param outputMsgBuilder output message builder. it can be null.
+     */
+    void sendOutputMessage(InputMessage inputMsg,
+                           MessageBuilder outputMsgBuilder);
+
+    /**
+     * Handle the process exception when processing input message 
<i>inputMsg</i>.
+     *
+     * @param inputMsg input message
+     * @param exception exception thrown when processing input message.
+     */
+    void handleProcessException(InputMessage inputMsg, Exception exception);
+
+
+    @Override
+    void close();
+
+}
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/MessageProcessorBase.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/MessageProcessorBase.java
new file mode 100644
index 0000000..ddb5f79
--- /dev/null
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/MessageProcessorBase.java
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.instance.processors;
+
+import com.google.common.collect.Maps;
+import java.util.Map;
+import java.util.concurrent.LinkedBlockingDeque;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerConfiguration;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.functions.api.SerDe;
+import org.apache.pulsar.functions.instance.InputMessage;
+import org.apache.pulsar.functions.proto.Function.FunctionConfig;
+import org.apache.pulsar.functions.utils.FunctionConfigUtils;
+
+/**
+ * The base implementation of {@link MessageProcessor}.
+ */
+@Slf4j
+abstract class MessageProcessorBase implements MessageProcessor {
+
+    protected final PulsarClient client;
+    protected final FunctionConfig functionConfig;
+    protected final SubscriptionType subType;
+    protected final LinkedBlockingDeque<InputMessage> processQueue;
+
+    @Getter
+    protected final Map<String, Consumer> inputConsumers;
+    protected Map<String, SerDe> inputSerDe;
+
+    protected SerDe outputSerDe;
+
+    protected MessageProcessorBase(PulsarClient client,
+                                   FunctionConfig functionConfig,
+                                   SubscriptionType subType,
+                                   LinkedBlockingDeque<InputMessage> 
processQueue) {
+        this.client = client;
+        this.functionConfig = functionConfig;
+        this.subType = subType;
+        this.processQueue = processQueue;
+        this.inputConsumers = Maps.newConcurrentMap();
+    }
+
+    //
+    // Input
+    //
+
+    @Override
+    public void setupInput(Map<String, SerDe> inputSerDe) throws Exception {
+        log.info("Setting up input with input serdes: {}", inputSerDe);
+        this.inputSerDe = inputSerDe;
+        for (Map.Entry<String, String> entry : 
functionConfig.getCustomSerdeInputsMap().entrySet()) {
+            ConsumerConfiguration conf = 
createConsumerConfiguration(entry.getKey());
+            this.inputConsumers.put(entry.getKey(), 
client.subscribe(entry.getKey(),
+                    FunctionConfigUtils.getFullyQualifiedName(functionConfig), 
conf));
+        }
+        for (String topicName : functionConfig.getInputsList()) {
+            ConsumerConfiguration conf = 
createConsumerConfiguration(topicName);
+            this.inputConsumers.put(topicName, client.subscribe(topicName,
+                    FunctionConfigUtils.getFullyQualifiedName(functionConfig), 
conf));
+        }
+    }
+
+    protected SubscriptionType getSubscriptionType() {
+        return subType;
+    }
+
+    protected ConsumerConfiguration createConsumerConfiguration(String 
topicName) {
+        log.info("Starting Consumer for topic " + topicName);
+        ConsumerConfiguration conf = new ConsumerConfiguration();
+        conf.setSubscriptionType(getSubscriptionType());
+
+        SerDe inputSerde = inputSerDe.get(topicName);
+        conf.setMessageListener((consumer, msg) -> {
+            try {
+                InputMessage message = new InputMessage();
+                message.setConsumer(consumer);
+                message.setInputSerDe(inputSerde);
+                message.setActualMessage(msg);
+                message.setTopicName(topicName);
+                processQueue.put(message);
+                postReceiveMessage(message);
+            } catch (InterruptedException e) {
+                log.error("Function container {} is interrupted on enqueuing 
messages",
+                        Thread.currentThread().getId(), e);
+            }
+        });
+        return conf;
+    }
+
+    /**
+     * Method called when a message is received from input after being put 
into the process queue.
+     *
+     * <p>The processor implementation can make a decision to process the 
message based on its processing guarantees.
+     * for example, an at-most-once processor can ack the message immediately.
+     *
+     * @param message input message.
+     */
+    protected void postReceiveMessage(InputMessage message) {}
+
+    //
+    // Output
+    //
+
+    @Override
+    public void setupOutput(SerDe outputSerDe) throws Exception {
+        this.outputSerDe = outputSerDe;
+
+        String outputTopic = functionConfig.getOutput();
+        if (outputTopic != null
+                && !functionConfig.getOutput().isEmpty()
+                && outputSerDe != null) {
+            log.info("Starting producer for output topic {}", outputTopic);
+            initializeOutputProducer(outputTopic);
+        }
+    }
+
+    protected abstract void initializeOutputProducer(String outputTopic) 
throws Exception;
+
+    //
+    // Process
+    //
+
+    @Override
+    public void prepareDequeueMessageFromProcessQueue() {}
+
+    @Override
+    public boolean prepareProcessMessage(InputMessage msg) throws 
InterruptedException {
+        return true;
+    }
+
+    @Override
+    public void handleProcessException(InputMessage msg, Exception cause) {}
+
+    @Override
+    public void close() {
+        // stop the consumer first, so no more messages are coming in
+        inputConsumers.forEach((k, v) -> {
+            try {
+                v.close();
+            } catch (PulsarClientException e) {
+                log.warn("Failed to close consumer to input topic {}", k, e);
+            }
+        });
+        inputConsumers.clear();
+    }
+}
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/AbstractOneOuputTopicProducers.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/AbstractOneOuputTopicProducers.java
index 723529d..bba9a7f 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/AbstractOneOuputTopicProducers.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/AbstractOneOuputTopicProducers.java
@@ -28,7 +28,7 @@ import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.functions.instance.FunctionResultRouter;
 
-abstract class AbstractOneOuputTopicProducers implements Producers {
+public abstract class AbstractOneOuputTopicProducers implements Producers {
 
     protected final PulsarClient client;
     protected final String outputTopic;
@@ -42,7 +42,7 @@ abstract class AbstractOneOuputTopicProducers implements 
Producers {
         this.conf = newProducerConfiguration();
     }
 
-    protected ProducerConfiguration newProducerConfiguration() {
+    static ProducerConfiguration newProducerConfiguration() {
         ProducerConfiguration conf = new ProducerConfiguration();
         conf.setBlockIfQueueFull(true);
         conf.setBatchingEnabled(true);
@@ -58,11 +58,21 @@ abstract class AbstractOneOuputTopicProducers implements 
Producers {
 
     protected Producer createProducer(String topic)
             throws PulsarClientException {
+        return createProducer(client, topic);
+    }
+
+    public static Producer createProducer(PulsarClient client, String topic)
+            throws PulsarClientException {
         return client.createProducer(topic, newProducerConfiguration());
     }
 
     protected Producer createProducer(String topic, String producerName)
             throws PulsarClientException {
+        return createProducer(client, topic, producerName);
+    }
+
+    public static Producer createProducer(PulsarClient client, String topic, 
String producerName)
+        throws PulsarClientException {
         ProducerConfiguration newConf = newProducerConfiguration();
         newConf.setProducerName(producerName);
 
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/SimpleOneOuputTopicProducers.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/SimpleOneOuputTopicProducers.java
deleted file mode 100644
index 2fee37a..0000000
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/SimpleOneOuputTopicProducers.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.functions.instance.producers;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import lombok.Getter;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.PulsarClientException;
-
-@Slf4j
-public class SimpleOneOuputTopicProducers extends 
AbstractOneOuputTopicProducers {
-
-    @Getter
-    private Producer producer;
-
-    public SimpleOneOuputTopicProducers(PulsarClient client, String 
outputTopic) throws PulsarClientException {
-        super(client, outputTopic);
-    }
-
-    @Override
-    public void initialize() throws PulsarClientException {
-        producer = createProducer(outputTopic);
-    }
-
-    @Override
-    public Producer getProducer(String srcTopic, int srcTopicPartition) {
-        checkNotNull(producer, "Producer is not initialized yet");
-        return producer;
-    }
-
-    @Override
-    public void closeProducer(String srcTopicName, int srcTopicPartition) {
-        // no-op
-    }
-
-    @Override
-    public void close() {
-        try {
-            producer.close();
-        } catch (PulsarClientException e) {
-            log.warn("Fail to close producer of topic {}", outputTopic, e);
-        }
-    }
-}
diff --git 
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableProcessTest.java
 
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableProcessTest.java
index 20644d2..8e099a6 100644
--- 
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableProcessTest.java
+++ 
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableProcessTest.java
@@ -76,12 +76,12 @@ import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.functions.api.Context;
 import org.apache.pulsar.functions.api.Function;
 import org.apache.pulsar.functions.api.utils.DefaultSerDe;
+import org.apache.pulsar.functions.instance.processors.AtLeastOnceProcessor;
+import org.apache.pulsar.functions.instance.processors.MessageProcessor;
 import org.apache.pulsar.functions.proto.Function.FunctionConfig;
 import 
org.apache.pulsar.functions.proto.Function.FunctionConfig.ProcessingGuarantees;
 import org.apache.pulsar.functions.utils.Reflections;
 import org.apache.pulsar.functions.utils.functioncache.FunctionCacheManager;
-import org.apache.pulsar.functions.instance.producers.Producers;
-import 
org.apache.pulsar.functions.instance.producers.SimpleOneOuputTopicProducers;
 import org.apache.pulsar.functions.utils.FunctionConfigUtils;
 import org.apache.pulsar.functions.utils.Utils;
 import org.powermock.api.mockito.PowerMockito;
@@ -437,15 +437,15 @@ public class JavaInstanceRunnableProcessTest {
         }
 
         // 3. verify producers and consumers are setup
-        Producers producers = runnable.getOutputProducer();
-        assertTrue(producers instanceof SimpleOneOuputTopicProducers);
+        MessageProcessor processor = runnable.getProcessor();
+        assertTrue(processor instanceof AtLeastOnceProcessor);
         assertSame(mockProducers.get(Pair.of(
             fnConfig.getOutput(),
             null
-        )).getProducer(), ((SimpleOneOuputTopicProducers) 
producers).getProducer());
+        )).getProducer(), ((AtLeastOnceProcessor) processor).getProducer());
 
-        assertEquals(mockConsumers.size(), 
runnable.getInputConsumers().size());
-        for (Map.Entry<String, Consumer> consumerEntry : 
runnable.getInputConsumers().entrySet()) {
+        assertEquals(mockConsumers.size(), 
processor.getInputConsumers().size());
+        for (Map.Entry<String, Consumer> consumerEntry : 
processor.getInputConsumers().entrySet()) {
             String topic = consumerEntry.getKey();
 
             Consumer mockConsumer = mockConsumers.get(Pair.of(
@@ -464,7 +464,7 @@ public class JavaInstanceRunnableProcessTest {
         for (ConsumerInstance consumer : mockConsumers.values()) {
             verify(consumer.getConsumer(), times(1)).close();
         }
-        assertTrue(runnable.getInputConsumers().isEmpty());
+        assertTrue(processor.getInputConsumers().isEmpty());
 
         for (ProducerInstance producer : mockProducers.values()) {
             verify(producer.getProducer(), times(1)).close();
diff --git 
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/producers/SimpleOneOutputTopicProducersTest.java
 
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/producers/SimpleOneOutputTopicProducersTest.java
deleted file mode 100644
index 67337a1..0000000
--- 
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/producers/SimpleOneOutputTopicProducersTest.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.functions.instance.producers;
-
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-import static org.testng.Assert.assertSame;
-
-import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.ProducerConfiguration;
-import org.apache.pulsar.client.api.PulsarClient;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.Test;
-
-/**
- * Unit test of {@link SimpleOneOuputTopicProducers}.
- */
-public class SimpleOneOutputTopicProducersTest {
-
-    private static final String TEST_OUTPUT_TOPIC = "test-output-topic";
-
-    private PulsarClient mockClient;
-    private Producer mockProducer;
-    private SimpleOneOuputTopicProducers producers;
-
-    @BeforeMethod
-    public void setup() throws Exception {
-        this.mockClient = mock(PulsarClient.class);
-        this.mockProducer = mock(Producer.class);
-
-        when(mockClient.createProducer(anyString(), 
any(ProducerConfiguration.class)))
-            .thenReturn(mockProducer);
-
-        this.producers = new SimpleOneOuputTopicProducers(mockClient, 
TEST_OUTPUT_TOPIC);
-    }
-
-    @Test
-    public void testInitializeClose() throws Exception {
-        this.producers.initialize();
-
-        verify(mockClient, times(1))
-            .createProducer(eq(TEST_OUTPUT_TOPIC), 
any(ProducerConfiguration.class));
-
-        this.producers.close();
-
-        verify(mockProducer, times(1)).close();
-    }
-
-    @Test(expectedExceptions = NullPointerException.class)
-    public void testGetProducerWithoutInitialization() throws Exception {
-        this.producers.getProducer("test-src-topic", 0);
-    }
-
-    @Test
-    public void testGetAndCloseProducer() throws Exception {
-        this.producers.initialize();
-
-        verify(mockClient, times(1))
-            .createProducer(eq(TEST_OUTPUT_TOPIC), 
any(ProducerConfiguration.class));
-
-        assertSame(mockProducer, this.producers.getProducer("test-src-topic", 
0));
-
-        verify(mockClient, times(1))
-            .createProducer(eq(TEST_OUTPUT_TOPIC), 
any(ProducerConfiguration.class));
-
-        producers.closeProducer("test-src-topic", 0);
-
-        assertSame(mockProducer, producers.getProducer());
-
-        verify(mockProducer, times(0)).close();
-    }
-
-}

-- 
To stop receiving notification emails like this one, please contact
mme...@apache.org.

Reply via email to