This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new c3c8013  Refactor functions to use Sink interface (#1708)
c3c8013 is described below

commit c3c8013d701d986970ec9baef142c338233f2f89
Author: Boyang Jerry Peng <jerry.boyang.p...@gmail.com>
AuthorDate: Wed May 2 14:43:10 2018 -0700

    Refactor functions to use Sink interface (#1708)
    
    * use pulsar sink
    
    * removing exception from interface
    
    * addressing various comments
    
    * changing MultiConsumersOneOuputTopicProducers to use String for partition id
---
 .../java/org/apache/pulsar/connect/core/Sink.java  |   2 +-
 .../functions/instance/JavaInstanceRunnable.java   | 212 ++++++++++++------
 .../instance/processors/AtLeastOnceProcessor.java  |  77 -------
 .../instance/processors/AtMostOnceProcessor.java   |  79 -------
 .../processors/EffectivelyOnceProcessor.java       | 120 ----------
 .../instance/processors/MessageProcessor.java      | 107 ---------
 .../instance/processors/MessageProcessorBase.java  | 155 -------------
 .../MultiConsumersOneOuputTopicProducers.java      |  20 +-
 .../functions/instance/producers/Producers.java    |   4 +-
 .../pulsar/functions/sink/DefaultRuntimeSink.java  |   1 -
 .../apache/pulsar/functions/sink/PulsarSink.java   | 247 ++++++++++++++++++++-
 .../PulsarSinkConfig.java}                         |  19 +-
 .../apache/pulsar/functions/sink/RuntimeSink.java  |   5 +-
 .../pulsar/functions/source/PulsarSource.java      |  30 +--
 .../{PulsarConfig.java => PulsarSourceConfig.java} |   9 +-
 .../src/main/python/python_instance_main.py        |  16 +-
 .../instance/JavaInstanceRunnableTest.java         |  81 -------
 .../MultiConsumersOneOutputTopicProducersTest.java |   2 +-
 .../functions/sink/DefaultRuntimeSinkTest.java     |   6 +-
 .../PulsarSinkTest.java}                           | 136 +++++++++---
 .../pulsar/functions/source/PulsarSourceTest.java  |  90 +++++++-
 .../pulsar/functions/runtime/JavaInstanceMain.java |  40 ++--
 .../pulsar/functions/runtime/ProcessRuntime.java   |  35 ++-
 .../functions/runtime/ProcessRuntimeTest.java      |  20 +-
 24 files changed, 683 insertions(+), 830 deletions(-)

diff --git 
a/pulsar-connect/core/src/main/java/org/apache/pulsar/connect/core/Sink.java 
b/pulsar-connect/core/src/main/java/org/apache/pulsar/connect/core/Sink.java
index ca569e7..cd2d63d 100644
--- a/pulsar-connect/core/src/main/java/org/apache/pulsar/connect/core/Sink.java
+++ b/pulsar-connect/core/src/main/java/org/apache/pulsar/connect/core/Sink.java
@@ -46,4 +46,4 @@ public interface Sink<T> extends AutoCloseable {
      * @return Completable future fo async publish request
      */
     CompletableFuture<Void> write(T value);
-}
\ No newline at end of file
+}
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index ca6414d..2e8037b 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -22,15 +22,13 @@ package org.apache.pulsar.functions.instance;
 import static org.apache.bookkeeper.common.concurrent.FutureUtils.result;
 import static 
org.apache.bookkeeper.stream.protocol.ProtocolConstants.DEFAULT_STREAM_CONF;
 
+import com.google.gson.Gson;
 import io.netty.buffer.ByteBuf;
 
 import java.util.Arrays;
-import java.util.Base64;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.atomic.AtomicBoolean;
 
 import lombok.AccessLevel;
 import lombok.Getter;
@@ -49,18 +47,23 @@ import org.apache.logging.log4j.ThreadContext;
 import org.apache.logging.log4j.core.LoggerContext;
 import org.apache.logging.log4j.core.config.Configuration;
 import org.apache.logging.log4j.core.config.LoggerConfig;
-import org.apache.pulsar.client.api.MessageBuilder;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.connect.core.Record;
+import org.apache.pulsar.connect.core.Source;
 import org.apache.pulsar.functions.api.Function;
-import org.apache.pulsar.functions.api.utils.DefaultSerDe;
-import org.apache.pulsar.functions.instance.processors.MessageProcessor;
 import org.apache.pulsar.functions.proto.InstanceCommunication;
+import org.apache.pulsar.functions.proto.Function.SourceSpec;
+import org.apache.pulsar.functions.proto.Function.SinkSpec;
+import org.apache.pulsar.functions.sink.PulsarSink;
+import org.apache.pulsar.functions.sink.PulsarSinkConfig;
+import org.apache.pulsar.functions.sink.RuntimeSink;
 import org.apache.pulsar.functions.source.PulsarRecord;
+import org.apache.pulsar.functions.source.PulsarSource;
+import org.apache.pulsar.functions.source.PulsarSourceConfig;
+import org.apache.pulsar.functions.utils.FunctionConfig;
 import org.apache.pulsar.functions.utils.functioncache.FunctionCacheManager;
-import org.apache.pulsar.functions.api.SerDe;
 import org.apache.pulsar.functions.instance.state.StateContextImpl;
 import org.apache.pulsar.functions.utils.FunctionDetailsUtils;
 import org.apache.pulsar.functions.utils.Reflections;
@@ -93,18 +96,14 @@ public class JavaInstanceRunnable implements AutoCloseable, 
Runnable {
     @Getter
     private Exception deathException;
 
-    @Getter(AccessLevel.PACKAGE)
-    private SerDe outputSerDe;
-
-    @Getter(AccessLevel.PACKAGE)
-    // processor
-    private final MessageProcessor processor;
-
     // function stats
     private final FunctionStats stats;
 
     private Record currentRecord;
 
+    private Source source;
+    private RuntimeSink sink;
+
     public JavaInstanceRunnable(InstanceConfig instanceConfig,
                                 FunctionCacheManager fnCache,
                                 String jarFile,
@@ -116,9 +115,6 @@ public class JavaInstanceRunnable implements AutoCloseable, 
Runnable {
         this.client = (PulsarClientImpl) pulsarClient;
         this.stateStorageServiceUrl = stateStorageServiceUrl;
         this.stats = new FunctionStats();
-        this.processor = MessageProcessor.create(
-                client,
-                instanceConfig.getFunctionDetails());
     }
 
     /**
@@ -151,19 +147,16 @@ public class JavaInstanceRunnable implements 
AutoCloseable, Runnable {
             typeArgs = 
TypeResolver.resolveRawArguments(java.util.function.Function.class, 
function.getClass());
         }
 
-        // setup serde
-        setupSerDe(typeArgs, clsLoader);
-
         // start the state table
         setupStateTable();
         // start the output producer
-        processor.setupOutput(outputSerDe);
+        setupOutput(typeArgs[1]);
         // start the input consumer
-        processor.setupInput(typeArgs[0]);
+        setupInput(typeArgs[0]);
         // start any log topic handler
         setupLogHandler();
 
-        return new JavaInstance(instanceConfig, object, clsLoader, client, 
processor.getSource());
+        return new JavaInstance(instanceConfig, object, clsLoader, client, 
this.source);
     }
 
     /**
@@ -175,9 +168,14 @@ public class JavaInstanceRunnable implements 
AutoCloseable, Runnable {
             javaInstance = setupJavaInstance();
             while (true) {
 
-                currentRecord = processor.recieveMessage();
+                currentRecord = readInput();
 
-                processor.postReceiveMessage(currentRecord);
+                if 
(instanceConfig.getFunctionDetails().getProcessingGuarantees() == 
org.apache.pulsar.functions
+                        .proto.Function.ProcessingGuarantees.ATMOST_ONCE) {
+                    if (instanceConfig.getFunctionDetails().getAutoAck()) {
+                        currentRecord.ack();
+                    }
+                }
 
                 // state object is per function, because we need to have the 
ability to know what updates
                 // are made in this function and ensure we only acknowledge 
after the state is persisted.
@@ -310,44 +308,47 @@ public class JavaInstanceRunnable implements 
AutoCloseable, Runnable {
             throw result.getSystemException();
         } else {
             stats.incrementSuccessfullyProcessed(endTime - startTime);
-            if (result.getResult() != null && 
instanceConfig.getFunctionDetails().getSink().getTopic() != null) {
-                byte[] output;
-                try {
-                    output = outputSerDe.serialize(result.getResult());
-                } catch (Exception ex) {
-                    stats.incrementSerializationExceptions();
-                    throw ex;
-                }
-                if (output != null) {
-                    sendOutputMessage(srcRecord, output);
-                } else {
-                    processor.sendOutputMessage(srcRecord, null);
-                }
+            if (result.getResult() != null) {
+                sendOutputMessage(srcRecord, result.getResult());
             } else {
                 // the function doesn't produce any result or the user doesn't 
want the result.
-                processor.sendOutputMessage(srcRecord, null);
+                srcRecord.ack();
             }
         }
     }
 
-    private void sendOutputMessage(Record srcRecord,
-                                   byte[] output) throws Exception {
-
-        MessageBuilder msgBuilder = MessageBuilder.create();
-        if (srcRecord instanceof PulsarRecord) {
-            PulsarRecord pulsarMessage = (PulsarRecord) srcRecord;
-            msgBuilder
-                    .setContent(output)
-                    .setProperty("__pfn_input_topic__", 
pulsarMessage.getTopicName())
-                    .setProperty("__pfn_input_msg_id__", new 
String(Base64.getEncoder().encode(pulsarMessage.getMessageId().toByteArray())));
+    private void sendOutputMessage(Record srcRecord, Object output) {
+        try {
+            this.sink.write(srcRecord, output);
+        } catch (Exception e) {
+            log.info("Encountered exception in sink write: ", e);
+            throw new RuntimeException(e);
         }
+    }
 
-        processor.sendOutputMessage(srcRecord, msgBuilder);
+    private Record readInput() {
+        try {
+            return this.source.read();
+        } catch (Exception e) {
+            log.info("Encountered exception in source write: ", e);
+            throw new RuntimeException(e);
+        }
     }
 
     @Override
     public void close() {
-        processor.close();
+        try {
+            source.close();
+        } catch (Exception e) {
+            log.error("Failed to close source {}", 
instanceConfig.getFunctionDetails().getSource().getClassName(), e);
+        }
+
+        try {
+            sink.close();
+        } catch (Exception e) {
+            log.error("Failed to close sink {}", 
instanceConfig.getFunctionDetails().getSource().getClassName(), e);
+        }
+
         if (null != javaInstance) {
             javaInstance.close();
         }
@@ -415,27 +416,6 @@ public class JavaInstanceRunnable implements 
AutoCloseable, Runnable {
         bldr.putMetrics(metricName, digest);
     }
 
-    private void setupSerDe(Class<?>[] typeArgs, ClassLoader clsLoader) {
-        if (!Void.class.equals(typeArgs[1])) { // return type is not 
`Void.class`
-            if 
(instanceConfig.getFunctionDetails().getSink().getSerDeClassName() == null
-                    || 
instanceConfig.getFunctionDetails().getSink().getSerDeClassName().isEmpty()
-                    || 
instanceConfig.getFunctionDetails().getSink().getSerDeClassName().equals(DefaultSerDe.class.getName()))
 {
-                outputSerDe = 
InstanceUtils.initializeDefaultSerDe(typeArgs[1]);
-            } else {
-                this.outputSerDe = 
InstanceUtils.initializeSerDe(instanceConfig.getFunctionDetails().getSink().getSerDeClassName(),
 clsLoader, typeArgs[1]);
-            }
-            Class<?>[] outputSerdeTypeArgs = 
TypeResolver.resolveRawArguments(SerDe.class, outputSerDe.getClass());
-            if 
(outputSerDe.getClass().getName().equals(DefaultSerDe.class.getName())) {
-                if (!DefaultSerDe.IsSupportedType(typeArgs[1])) {
-                    throw new RuntimeException("Default Serde does not support 
type " + typeArgs[1]);
-                }
-            } else if (!outputSerdeTypeArgs[0].isAssignableFrom(typeArgs[1])) {
-                throw new RuntimeException("Inconsistent types found between 
function output type and output serde type: "
-                        + " function type = " + typeArgs[1] + "should be 
assignable from " + outputSerdeTypeArgs[0]);
-            }
-        }
-    }
-
     private void setupLogHandler() {
         if (instanceConfig.getFunctionDetails().getLogTopic() != null &&
                 !instanceConfig.getFunctionDetails().getLogTopic().isEmpty()) {
@@ -465,4 +445,92 @@ public class JavaInstanceRunnable implements 
AutoCloseable, Runnable {
         }
         config.getRootLogger().removeAppender(logAppender.getName());
     }
+
+    public void setupInput(Class<?> inputType) throws Exception {
+
+        SourceSpec sourceSpec = 
this.instanceConfig.getFunctionDetails().getSource();
+        Object object;
+        if (sourceSpec.getClassName().equals(PulsarSource.class.getName())) {
+
+            PulsarSourceConfig pulsarSourceConfig = new PulsarSourceConfig();
+            
pulsarSourceConfig.setTopicSerdeClassNameMap(sourceSpec.getTopicsToSerDeClassNameMap());
+            pulsarSourceConfig.setSubscriptionName(
+                    
FunctionDetailsUtils.getFullyQualifiedName(this.instanceConfig.getFunctionDetails()));
+            pulsarSourceConfig.setProcessingGuarantees(
+                    FunctionConfig.ProcessingGuarantees.valueOf(
+                            
this.instanceConfig.getFunctionDetails().getProcessingGuarantees().name()));
+            pulsarSourceConfig.setSubscriptionType(
+                    
FunctionConfig.SubscriptionType.valueOf(sourceSpec.getSubscriptionType().name()));
+            pulsarSourceConfig.setTypeClassName(inputType.getName());
+
+            Object[] params = {this.client, pulsarSourceConfig};
+            Class[] paramTypes = {PulsarClient.class, 
PulsarSourceConfig.class};
+
+            object = Reflections.createInstance(
+                    sourceSpec.getClassName(),
+                    PulsarSource.class.getClassLoader(), params, paramTypes);
+
+        } else {
+            object = Reflections.createInstance(
+                    sourceSpec.getClassName(),
+                    Thread.currentThread().getContextClassLoader());
+        }
+
+        Class<?>[] typeArgs;
+        if (object instanceof Source) {
+            typeArgs = TypeResolver.resolveRawArguments(Source.class, 
object.getClass());
+            assert typeArgs.length > 0;
+        } else {
+            throw new RuntimeException("Source does not implement correct 
interface");
+        }
+        this.source = (Source) object;
+
+        try {
+            this.source.open(new Gson().fromJson(sourceSpec.getConfigs(), 
Map.class));
+        } catch (Exception e) {
+            log.info("Error occurred executing open for source: {}",
+                    sourceSpec.getClassName(), e);
+        }
+    }
+
+    public void setupOutput(Class<?> outputType) throws Exception {
+
+        SinkSpec sinkSpec = this.instanceConfig.getFunctionDetails().getSink();
+        Object object;
+        if (sinkSpec.getClassName().equals(PulsarSink.class.getName())) {
+            PulsarSinkConfig pulsarSinkConfig = new PulsarSinkConfig();
+            
pulsarSinkConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.valueOf(
+                    
this.instanceConfig.getFunctionDetails().getProcessingGuarantees().name()));
+            pulsarSinkConfig.setTopic(sinkSpec.getTopic());
+            pulsarSinkConfig.setSerDeClassName(sinkSpec.getSerDeClassName());
+            pulsarSinkConfig.setTypeClassName(outputType.getName());
+
+            Object[] params = {this.client, pulsarSinkConfig};
+            Class[] paramTypes = {PulsarClient.class, PulsarSinkConfig.class};
+
+            object = Reflections.createInstance(
+                    sinkSpec.getClassName(),
+                    PulsarSink.class.getClassLoader(), params, paramTypes);
+        } else {
+            object = Reflections.createInstance(
+                    sinkSpec.getClassName(),
+                    Thread.currentThread().getContextClassLoader());
+        }
+
+        Class<?>[] typeArgs;
+        if (object instanceof RuntimeSink) {
+            typeArgs = TypeResolver.resolveRawArguments(RuntimeSink.class, 
object.getClass());
+            assert typeArgs.length > 0;
+        } else {
+            throw new RuntimeException("Sink does not implement correct 
interface");
+        }
+        this.sink = (RuntimeSink) object;
+
+        try {
+            this.sink.open(new Gson().fromJson(sinkSpec.getConfigs(), 
Map.class));
+        } catch (Exception e) {
+            log.info("Error occurred executing open for sink: {}",
+                    sinkSpec.getClassName(), e);
+        }
+    }
 }
\ No newline at end of file
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/AtLeastOnceProcessor.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/AtLeastOnceProcessor.java
deleted file mode 100644
index 8e149b0..0000000
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/AtLeastOnceProcessor.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.functions.instance.processors;
-
-import lombok.Getter;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.MessageBuilder;
-import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.client.api.SubscriptionType;
-import org.apache.pulsar.connect.core.Record;
-import 
org.apache.pulsar.functions.instance.producers.AbstractOneOuputTopicProducers;
-import org.apache.pulsar.functions.proto.Function.FunctionDetails;
-
-/**
- * A message processor that process messages at-most-once.
- */
-@Slf4j
-public class AtLeastOnceProcessor extends MessageProcessorBase {
-
-    @Getter
-    private Producer<byte[]> producer;
-
-    AtLeastOnceProcessor(PulsarClient client,
-                         FunctionDetails functionDetails) {
-        super(client, functionDetails);
-    }
-
-    @Override
-    protected void initializeOutputProducer(String outputTopic) throws 
Exception {
-        producer = AbstractOneOuputTopicProducers.createProducer(client, 
outputTopic);
-    }
-
-    @Override
-    public void sendOutputMessage(Record srcRecord, MessageBuilder 
outputMsgBuilder) {
-        if (null == outputMsgBuilder || null == producer) {
-            srcRecord.ack();
-            return;
-        }
-
-        Message<byte[]> outputMsg = outputMsgBuilder.build();
-        producer.sendAsync(outputMsg)
-            .thenAccept(msgId -> {
-                srcRecord.ack();
-            });
-    }
-
-    @Override
-    public void close() {
-        super.close();
-        if (null != producer) {
-            try {
-                producer.close();
-            } catch (PulsarClientException e) {
-                log.warn("Fail to close producer for processor {}", 
functionDetails.getSink().getTopic(), e);
-            }
-        }
-    }
-}
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/AtMostOnceProcessor.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/AtMostOnceProcessor.java
deleted file mode 100644
index 930161e..0000000
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/AtMostOnceProcessor.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.functions.instance.processors;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.MessageBuilder;
-import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.client.api.SubscriptionType;
-import org.apache.pulsar.connect.core.Record;
-import 
org.apache.pulsar.functions.instance.producers.AbstractOneOuputTopicProducers;
-import org.apache.pulsar.functions.proto.Function.FunctionDetails;
-
-/**
- * A message processor that process messages at-most-once.
- */
-@Slf4j
-class AtMostOnceProcessor extends MessageProcessorBase {
-
-    private Producer<byte[]> producer;
-
-    AtMostOnceProcessor(PulsarClient client,
-                        FunctionDetails functionDetails) {
-        super(client, functionDetails);
-    }
-
-    @Override
-    public void postReceiveMessage(Record record) {
-        super.postReceiveMessage(record);
-        if (functionDetails.getAutoAck()) {
-            record.ack();
-        }
-    }
-
-    @Override
-    protected void initializeOutputProducer(String outputTopic) throws 
Exception {
-        producer = AbstractOneOuputTopicProducers.createProducer(client, 
outputTopic);
-    }
-
-    @Override
-    public void sendOutputMessage(Record srcRecord, MessageBuilder 
outputMsgBuilder) {
-        if (null == outputMsgBuilder) {
-            return;
-        }
-
-        Message<byte[]> outputMsg = outputMsgBuilder.build();
-        producer.sendAsync(outputMsg);
-    }
-
-    @Override
-    public void close() {
-        super.close();
-        if (null != producer) {
-            try {
-                producer.close();
-            } catch (PulsarClientException e) {
-                log.warn("Fail to close producer for processor {}", 
functionDetails.getSink().getTopic(), e);
-            }
-        }
-    }
-}
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/EffectivelyOnceProcessor.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/EffectivelyOnceProcessor.java
deleted file mode 100644
index 06a463b..0000000
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/EffectivelyOnceProcessor.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.functions.instance.processors;
-
-import lombok.AccessLevel;
-import lombok.Getter;
-import lombok.extern.slf4j.Slf4j;
-
-import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.ConsumerEventListener;
-import org.apache.pulsar.client.api.MessageBuilder;
-import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.client.api.SubscriptionType;
-import org.apache.pulsar.connect.core.Record;
-import org.apache.pulsar.functions.source.PulsarRecord;
-import 
org.apache.pulsar.functions.instance.producers.MultiConsumersOneOuputTopicProducers;
-import org.apache.pulsar.functions.instance.producers.Producers;
-import org.apache.pulsar.functions.proto.Function.FunctionDetails;
-
-/**
- * A message processor that process messages effectively-once.
- */
-@Slf4j
-class EffectivelyOnceProcessor extends MessageProcessorBase implements 
ConsumerEventListener {
-
-    @Getter(AccessLevel.PACKAGE)
-    protected Producers outputProducer;
-
-    EffectivelyOnceProcessor(PulsarClient client,
-                             FunctionDetails functionDetails) {
-        super(client, functionDetails);
-    }
-
-    @Override
-    public void becameActive(Consumer<?> consumer, int partitionId) {
-        // if the instance becomes active for a given topic partition,
-        // open a producer for the results computed from this topic partition.
-        if (null != outputProducer) {
-            try {
-                this.outputProducer.getProducer(consumer.getTopic(), 
partitionId);
-            } catch (PulsarClientException e) {
-                // this can be ignored, because producer can be lazily created 
when accessing it.
-                log.warn("Fail to create a producer for results computed from 
messages of topic: {}, partition: {}",
-                    consumer.getTopic(), partitionId);
-            }
-        }
-    }
-
-    @Override
-    public void becameInactive(Consumer<?> consumer, int partitionId) {
-        if (null != outputProducer) {
-            // if I lost the ownership of a partition, close its corresponding 
topic partition.
-            // this is to allow the new active consumer be able to produce to 
the result topic.
-            this.outputProducer.closeProducer(consumer.getTopic(), 
partitionId);
-        }
-    }
-
-    @Override
-    protected void initializeOutputProducer(String outputTopic) throws 
Exception {
-        outputProducer = new MultiConsumersOneOuputTopicProducers(client, 
outputTopic);
-        outputProducer.initialize();
-    }
-
-    //
-    // Methods to process messages
-    //
-
-    @Override
-    public void sendOutputMessage(Record srcRecord,
-                                  MessageBuilder outputMsgBuilder) throws 
Exception {
-        if (null == outputMsgBuilder) {
-            srcRecord.ack();
-            return;
-        }
-
-        // assign sequence id to output message for idempotent producing
-        outputMsgBuilder = outputMsgBuilder
-            .setSequenceId(srcRecord.getRecordSequence());
-
-        // currently on PulsarRecord
-        if (srcRecord instanceof PulsarRecord) {
-            PulsarRecord pulsarMessage = (PulsarRecord) srcRecord;
-            Producer producer = 
outputProducer.getProducer(pulsarMessage.getTopicName(),
-                    Integer.parseInt(srcRecord.getPartitionId()));
-
-            org.apache.pulsar.client.api.Message outputMsg = 
outputMsgBuilder.build();
-            producer.sendAsync(outputMsg)
-                    .thenAccept(messageId -> srcRecord.ack())
-                    .join();
-        }
-    }
-
-    @Override
-    public void close() {
-        super.close();
-        // kill the result producer
-        if (null != outputProducer) {
-            outputProducer.close();
-            outputProducer = null;
-        }
-    }
-}
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/MessageProcessor.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/MessageProcessor.java
deleted file mode 100644
index 0dcf12c..0000000
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/MessageProcessor.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.functions.instance.processors;
-
-import java.util.Map;
-
-import org.apache.bookkeeper.common.annotation.InterfaceStability.Evolving;
-import org.apache.pulsar.client.api.MessageBuilder;
-import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.connect.core.Record;
-import org.apache.pulsar.connect.core.Source;
-import org.apache.pulsar.functions.api.SerDe;
-import org.apache.pulsar.functions.proto.Function.FunctionDetails;
-import org.apache.pulsar.functions.proto.Function.ProcessingGuarantees;
-
-/**
- * A processor that processes messages, used by {@link 
org.apache.pulsar.functions.instance.JavaInstanceRunnable}.
- */
-@Evolving
-public interface MessageProcessor extends AutoCloseable {
-
-    static MessageProcessor create(PulsarClient client,
-                                   FunctionDetails functionDetails) {
-        ProcessingGuarantees processingGuarantees = 
functionDetails.getProcessingGuarantees();
-
-        if (processingGuarantees == ProcessingGuarantees.EFFECTIVELY_ONCE) {
-            return new EffectivelyOnceProcessor(
-                client,
-                functionDetails);
-        } else if (processingGuarantees == ProcessingGuarantees.ATMOST_ONCE) {
-            return new AtMostOnceProcessor(
-                client,
-                functionDetails);
-        } else {
-            return new AtLeastOnceProcessor(
-                client,
-                functionDetails);
-        }
-    }
-
-    void postReceiveMessage(Record record);
-
-    /**
-     * Setup the source. Implementation is responsible for initializing the 
source
-     * and for calling open method for source
-     * @param inputType the input type of the function
-     * @throws Exception
-     */
-    void setupInput(Class<?> inputType)
-        throws Exception;
-
-    /**
-     * Return the source.
-     *
-     * @return the source.
-     */
-    Source getSource();
-
-    /**
-     * Setup the output with a provided <i>outputSerDe</i>. The implementation 
of this processor is responsible for
-     * setting up the output
-     *
-     * @param outputSerDe output serde.
-     * @throws Exception
-     */
-    void setupOutput(SerDe outputSerDe) throws Exception;
-
-    /**
-     * Send the output message to the output topic. The output message is 
computed from <i>inputMsg</i>.
-     *
-     * <p>If the <i>outputMsgBuilder</i> is null, the implementation doesn't 
have to send any messages to the output.
-     * The implementation can decide to acknowledge the input message based on 
its process guarantees.
-     *
-     * @param srcRecord record from source
-     * @param outputMsgBuilder output message builder. it can be null.
-     */
-    void sendOutputMessage(Record srcRecord,
-                           MessageBuilder outputMsgBuilder) throws 
PulsarClientException, Exception;
-
-    /**
-     * Get the next message to process
-     * @return the next input message
-     * @throws Exception
-     */
-    Record recieveMessage() throws Exception;
-
-    @Override
-    void close();
-
-}
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/MessageProcessorBase.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/MessageProcessorBase.java
deleted file mode 100644
index 33b699a..0000000
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/MessageProcessorBase.java
+++ /dev/null
@@ -1,155 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.functions.instance.processors;
-
-import java.util.Map;
-
-import com.google.gson.Gson;
-import lombok.Getter;
-import lombok.extern.slf4j.Slf4j;
-
-import net.jodah.typetools.TypeResolver;
-import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.SubscriptionType;
-import org.apache.pulsar.connect.core.Record;
-import org.apache.pulsar.connect.core.Source;
-import org.apache.pulsar.functions.api.SerDe;
-import org.apache.pulsar.functions.source.PulsarConfig;
-import org.apache.pulsar.functions.source.PulsarSource;
-import org.apache.pulsar.functions.proto.Function.FunctionDetails;
-import org.apache.pulsar.functions.utils.FunctionConfig;
-import org.apache.pulsar.functions.utils.FunctionDetailsUtils;
-import org.apache.pulsar.functions.utils.Reflections;
-
-/**
- * The base implementation of {@link MessageProcessor}.
- */
-@Slf4j
-abstract class MessageProcessorBase implements MessageProcessor {
-
-    protected final PulsarClient client;
-    protected final FunctionDetails functionDetails;
-
-    @Getter
-    protected Source source;
-
-
-    protected MessageProcessorBase(PulsarClient client,
-                                   FunctionDetails functionDetails) {
-        this.client = client;
-        this.functionDetails = functionDetails;
-    }
-
-    //
-    // Input
-    //
-
-    @Override
-    public void setupInput(Class<?> inputType) throws Exception {
-
-        org.apache.pulsar.functions.proto.Function.SourceSpec sourceSpec = 
this.functionDetails.getSource();
-        Object object;
-        if (sourceSpec.getClassName().equals(PulsarSource.class.getName())) {
-
-            PulsarConfig pulsarConfig = new PulsarConfig();
-            
pulsarConfig.setTopicSerdeClassNameMap(this.functionDetails.getSource().getTopicsToSerDeClassNameMap());
-            
pulsarConfig.setSubscriptionName(FunctionDetailsUtils.getFullyQualifiedName(this.functionDetails));
-            pulsarConfig.setProcessingGuarantees(
-                    
FunctionConfig.ProcessingGuarantees.valueOf(this.functionDetails.getProcessingGuarantees().name()));
-            pulsarConfig.setSubscriptionType(
-                    
FunctionConfig.SubscriptionType.valueOf(this.functionDetails.getSource().getSubscriptionType().name()));
-            pulsarConfig.setTypeClassName(inputType.getName());
-
-            Object[] params = {this.client, pulsarConfig};
-            Class[] paramTypes = {PulsarClient.class, PulsarConfig.class};
-
-            object = Reflections.createInstance(
-                    sourceSpec.getClassName(),
-                    PulsarSource.class.getClassLoader(), params, paramTypes);
-
-        } else {
-            object = Reflections.createInstance(
-                    sourceSpec.getClassName(),
-                    Thread.currentThread().getContextClassLoader());
-        }
-
-        Class<?>[] typeArgs;
-        if (object instanceof Source) {
-            typeArgs = TypeResolver.resolveRawArguments(Source.class, 
object.getClass());
-            assert typeArgs.length > 0;
-        } else {
-            throw new RuntimeException("Source does not implement correct 
interface");
-        }
-        this.source = (Source) object;
-
-        try {
-            this.source.open(new Gson().fromJson(sourceSpec.getConfigs(), 
Map.class));
-        } catch (Exception e) {
-            log.info("Error occurred executing open for source: {}",
-                    this.functionDetails.getSource().getClassName(), e);
-        }
-
-    }
-
-    public Record recieveMessage() throws Exception {
-        return this.source.read();
-    }
-
-    /**
-     * Method called when a message is received from input after being put 
into the process queue.
-     *
-     * <p>The processor implementation can make a decision to process the 
message based on its processing guarantees.
-     * for example, an at-most-once processor can ack the message immediately.
-     *
-     * @param record input message.
-     */
-    @Override
-    public void postReceiveMessage(Record record) {}
-
-    //
-    // Output
-    //
-
-    @Override
-    public void setupOutput(SerDe outputSerDe) throws Exception {
-        String outputTopic = functionDetails.getSink().getTopic();
-        if (outputTopic != null
-                && !outputTopic.isEmpty()
-                && outputSerDe != null) {
-            log.info("Starting producer for output topic {}", outputTopic);
-            initializeOutputProducer(outputTopic);
-        }
-    }
-
-    protected abstract void initializeOutputProducer(String outputTopic) 
throws Exception;
-
-    //
-    // Process
-    //
-
-    @Override
-    public void close() {
-
-        try {
-            this.source.close();
-        } catch (Exception e) {
-            log.warn("Failed to close source {}", this.source, e);
-        }
-    }
-}
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneOuputTopicProducers.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneOuputTopicProducers.java
index 3668311..0359f7d 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneOuputTopicProducers.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneOuputTopicProducers.java
@@ -18,9 +18,8 @@
  */
 package org.apache.pulsar.functions.instance.producers;
 
-import io.netty.util.collection.IntObjectHashMap;
-import io.netty.util.collection.IntObjectMap;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
@@ -37,7 +36,8 @@ import org.apache.pulsar.client.api.PulsarClientException;
 public class MultiConsumersOneOuputTopicProducers extends 
AbstractOneOuputTopicProducers {
 
     @Getter(AccessLevel.PACKAGE)
-    private final Map<String, IntObjectMap<Producer<byte[]>>> producers;
+    private final Map<String, Map<String, Producer<byte[]>>> producers;
+
 
     public MultiConsumersOneOuputTopicProducers(PulsarClient client,
                                                 String outputTopic)
@@ -51,15 +51,15 @@ public class MultiConsumersOneOuputTopicProducers extends 
AbstractOneOuputTopicP
         // no-op
     }
 
-    static String makeProducerName(String srcTopicName, int srcTopicPartition) 
{
+    static String makeProducerName(String srcTopicName, String 
srcTopicPartition) {
         return String.format("%s-%s", srcTopicName, srcTopicPartition);
     }
 
     @Override
-    public synchronized Producer<byte[]> getProducer(String srcTopicName, int 
srcTopicPartition) throws PulsarClientException {
-        IntObjectMap<Producer<byte[]>> producerMap = 
producers.get(srcTopicName);
+    public synchronized Producer<byte[]> getProducer(String srcTopicName, 
String srcTopicPartition) throws PulsarClientException {
+        Map<String, Producer<byte[]>> producerMap = 
producers.get(srcTopicName);
         if (null == producerMap) {
-            producerMap = new IntObjectHashMap<>();
+            producerMap = new HashMap<>();
             producers.put(srcTopicName, producerMap);
         }
 
@@ -72,8 +72,8 @@ public class MultiConsumersOneOuputTopicProducers extends 
AbstractOneOuputTopicP
     }
 
     @Override
-    public synchronized void closeProducer(String srcTopicName, int 
srcTopicPartition) {
-        IntObjectMap<Producer<byte[]>> producerMap = 
producers.get(srcTopicName);
+    public synchronized void closeProducer(String srcTopicName, String 
srcTopicPartition) {
+        Map<String, Producer<byte[]>> producerMap = 
producers.get(srcTopicName);
 
         if (null != producerMap) {
             Producer<byte[]> producer = producerMap.remove(srcTopicPartition);
@@ -89,7 +89,7 @@ public class MultiConsumersOneOuputTopicProducers extends 
AbstractOneOuputTopicP
     @Override
     public synchronized void close() {
         List<CompletableFuture<Void>> closeFutures = new 
ArrayList<>(producers.size());
-        for (IntObjectMap<Producer<byte[]>> producerMap: producers.values()) {
+        for (Map<String, Producer<byte[]>> producerMap: producers.values()) {
             for (Producer<byte[]> producer : producerMap.values()) {
                 closeFutures.add(producer.closeAsync());
             }
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/Producers.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/Producers.java
index 29cd96a..b9d6a08 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/Producers.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/Producers.java
@@ -42,7 +42,7 @@ public interface Producers extends AutoCloseable {
      *          src topic partition
      * @return the producer instance to produce messages
      */
-    Producer<byte[]> getProducer(String srcTopicName, int srcTopicPartition) 
throws PulsarClientException;
+    Producer<byte[]> getProducer(String srcTopicName, String 
srcTopicPartition) throws PulsarClientException;
 
     /**
      * Close a producer specified by <tt>srcTopicName</tt> and 
<tt>srcTopicPartition</tt>
@@ -51,7 +51,7 @@ public interface Producers extends AutoCloseable {
      * @param srcTopicPartition src topic partition
      */
     void closeProducer(String srcTopicName,
-                       int srcTopicPartition);
+                       String srcTopicPartition);
 
     @Override
     void close();
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/DefaultRuntimeSink.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/DefaultRuntimeSink.java
index 8e0d37a..54e34c3 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/DefaultRuntimeSink.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/DefaultRuntimeSink.java
@@ -20,7 +20,6 @@ package org.apache.pulsar.functions.sink;
 
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
-import org.apache.pulsar.connect.core.RecordContext;
 import org.apache.pulsar.connect.core.Sink;
 
 /**
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
index 7c89d92..79c0b35 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
@@ -18,5 +18,250 @@
  */
 package org.apache.pulsar.functions.sink;
 
-public class PulsarSink {
+import com.google.common.annotations.VisibleForTesting;
+import lombok.AccessLevel;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import net.jodah.typetools.TypeResolver;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerEventListener;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageBuilder;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.connect.core.RecordContext;
+import org.apache.pulsar.functions.api.SerDe;
+import org.apache.pulsar.functions.api.utils.DefaultSerDe;
+import org.apache.pulsar.functions.instance.InstanceUtils;
+import 
org.apache.pulsar.functions.instance.producers.AbstractOneOuputTopicProducers;
+import 
org.apache.pulsar.functions.instance.producers.MultiConsumersOneOuputTopicProducers;
+import org.apache.pulsar.functions.instance.producers.Producers;
+import org.apache.pulsar.functions.source.PulsarRecord;
+import org.apache.pulsar.functions.utils.FunctionConfig;
+
+import java.util.Base64;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * A {@link RuntimeSink} that publishes function results to the Pulsar topic named in the
+ * supplied {@link PulsarSinkConfig}. The delivery strategy (at-most-once, at-least-once or
+ * effectively-once) is selected in {@link #open(Map)} from the configured processing guarantee.
+ *
+ * @param <T> type of the values written to the sink
+ */
+@Slf4j
+public class PulsarSink<T> implements RuntimeSink<T> {
+
+    private PulsarClient client;
+    private PulsarSinkConfig pulsarSinkConfig;
+    private SerDe<T> outputSerDe;
+
+    private PulsarSinkProcessor pulsarSinkProcessor;
+
+    /**
+     * Strategy for producing output messages under one processing guarantee.
+     */
+    private interface PulsarSinkProcessor {
+        /**
+         * Creates the producer(s) used to publish to <i>outputTopic</i>.
+         *
+         * @param outputTopic topic the results are published to
+         * @throws Exception if the producer(s) cannot be set up
+         */
+        void initializeOutputProducer(String outputTopic) throws Exception;
+
+        /**
+         * Builds and sends the output message computed from <i>pulsarRecord</i>.
+         *
+         * @param outputMsgBuilder builder holding the serialized output and its properties
+         * @param pulsarRecord input record the output was computed from
+         * @throws Exception if the message cannot be sent
+         */
+        void sendOutputMessage(MessageBuilder outputMsgBuilder,
+                               PulsarRecord pulsarRecord) throws Exception;
+
+        /**
+         * Releases the producer(s) held by this processor.
+         *
+         * @throws Exception if closing fails
+         */
+        void close() throws Exception;
+    }
+
+    /**
+     * Sends each output message asynchronously; the input record is not acknowledged here.
+     */
+    private class PulsarSinkAtMostOnceProcessor implements PulsarSinkProcessor {
+        private Producer<byte[]> producer;
+
+        @Override
+        public void initializeOutputProducer(String outputTopic) throws Exception {
+            // use the topic handed in by the caller, like the effectively-once processor does
+            this.producer = AbstractOneOuputTopicProducers.createProducer(client, outputTopic);
+        }
+
+        @Override
+        public void sendOutputMessage(MessageBuilder outputMsgBuilder,
+                                      PulsarRecord pulsarRecord) throws Exception {
+            Message<byte[]> outputMsg = outputMsgBuilder.build();
+            // fire and forget: the send result is intentionally not awaited
+            this.producer.sendAsync(outputMsg);
+        }
+
+        @Override
+        public void close() throws Exception {
+            if (null != producer) {
+                try {
+                    producer.close();
+                } catch (PulsarClientException e) {
+                    log.warn("Fail to close producer for processor {}", pulsarSinkConfig.getTopic(), e);
+                }
+            }
+        }
+    }
+
+    /**
+     * Sends each output message asynchronously and acknowledges the input record once the
+     * send has completed successfully.
+     */
+    private class PulsarSinkAtLeastOnceProcessor implements PulsarSinkProcessor {
+        private Producer<byte[]> producer;
+
+        @Override
+        public void initializeOutputProducer(String outputTopic) throws Exception {
+            // use the topic handed in by the caller, like the effectively-once processor does
+            this.producer = AbstractOneOuputTopicProducers.createProducer(client, outputTopic);
+        }
+
+        @Override
+        public void sendOutputMessage(MessageBuilder outputMsgBuilder,
+                                      PulsarRecord pulsarRecord) throws Exception {
+            Message<byte[]> outputMsg = outputMsgBuilder.build();
+            this.producer.sendAsync(outputMsg)
+                    .thenAccept(messageId -> pulsarRecord.ack())
+                    .exceptionally(cause -> {
+                        // the input record stays unacknowledged; surface the failure instead of dropping it
+                        log.warn("Failed to send output or ack input message {} for topic {}",
+                                pulsarRecord.getMessageId(), pulsarSinkConfig.getTopic(), cause);
+                        return null;
+                    });
+        }
+
+        @Override
+        public void close() throws Exception {
+            if (null != producer) {
+                try {
+                    producer.close();
+                } catch (PulsarClientException e) {
+                    log.warn("Fail to close producer for processor {}", pulsarSinkConfig.getTopic(), e);
+                }
+            }
+        }
+    }
+
+    /**
+     * Sends output through one producer per source topic/partition, tagging every message with
+     * the input record's sequence id, and blocks until the message is sent and the input acked.
+     */
+    private class PulsarSinkEffectivelyOnceProcessor implements PulsarSinkProcessor, ConsumerEventListener {
+
+        @Getter(AccessLevel.PACKAGE)
+        protected Producers outputProducer;
+
+        @Override
+        public void initializeOutputProducer(String outputTopic) throws Exception {
+            outputProducer = new MultiConsumersOneOuputTopicProducers(client, outputTopic);
+            outputProducer.initialize();
+        }
+
+        @Override
+        public void sendOutputMessage(MessageBuilder outputMsgBuilder, PulsarRecord pulsarRecord)
+                throws Exception {
+
+            // assign sequence id to output message for idempotent producing
+            outputMsgBuilder = outputMsgBuilder
+                    .setSequenceId(pulsarRecord.getRecordSequence());
+
+            // one producer per (source topic, source partition) pair
+            Producer<byte[]> producer = outputProducer.getProducer(pulsarRecord.getTopicName(),
+                    pulsarRecord.getPartitionId());
+
+            Message<byte[]> outputMsg = outputMsgBuilder.build();
+            // join() makes the call synchronous: return only after the send and the ack are done
+            producer.sendAsync(outputMsg)
+                    .thenAccept(messageId -> pulsarRecord.ack())
+                    .join();
+        }
+
+        @Override
+        public void close() throws Exception {
+            // kill the result producer
+            if (null != outputProducer) {
+                outputProducer.close();
+                outputProducer = null;
+            }
+        }
+
+        @Override
+        public void becameActive(Consumer<?> consumer, int partitionId) {
+            // if the instance becomes active for a given topic partition,
+            // open a producer for the results computed from this topic partition.
+            if (null != outputProducer) {
+                try {
+                    this.outputProducer.getProducer(consumer.getTopic(), Integer.toString(partitionId));
+                } catch (PulsarClientException e) {
+                    // this can be ignored, because producer can be lazily created when accessing it.
+                    log.warn("Fail to create a producer for results computed from messages of topic: {}, partition: {}",
+                            consumer.getTopic(), partitionId, e);
+                }
+            }
+        }
+
+        @Override
+        public void becameInactive(Consumer<?> consumer, int partitionId) {
+            if (null != outputProducer) {
+                // if I lost the ownership of a partition, close the producer for that partition.
+                // this is to allow the new active consumer be able to produce to the result topic.
+                this.outputProducer.closeProducer(consumer.getTopic(), Integer.toString(partitionId));
+            }
+        }
+    }
+
+    /**
+     * @param client Pulsar client used to create the output producer(s)
+     * @param pulsarSinkConfig output topic, SerDe, type and processing-guarantee settings
+     */
+    public PulsarSink(PulsarClient client, PulsarSinkConfig pulsarSinkConfig) {
+        this.client = client;
+        this.pulsarSinkConfig = pulsarSinkConfig;
+    }
+
+    /**
+     * Resolves the output SerDe, selects the processor for the configured processing guarantee
+     * and creates its producer(s) for the configured topic.
+     *
+     * @param config not read by this sink; all settings come from the {@link PulsarSinkConfig}
+     * @throws Exception if the SerDe cannot be set up or the producer cannot be created
+     */
+    @Override
+    public void open(Map<String, Object> config) throws Exception {
+
+        // Setup Serialization/Deserialization
+        setupSerDe();
+
+        FunctionConfig.ProcessingGuarantees processingGuarantees = this.pulsarSinkConfig.getProcessingGuarantees();
+        if (processingGuarantees == FunctionConfig.ProcessingGuarantees.ATMOST_ONCE) {
+            this.pulsarSinkProcessor = new PulsarSinkAtMostOnceProcessor();
+        } else if (processingGuarantees == FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE) {
+            this.pulsarSinkProcessor = new PulsarSinkEffectivelyOnceProcessor();
+        } else {
+            // ATLEAST_ONCE, and also the fallback for an unset or unrecognized guarantee so that
+            // the processor is never left null.
+            this.pulsarSinkProcessor = new PulsarSinkAtLeastOnceProcessor();
+        }
+        this.pulsarSinkProcessor.initializeOutputProducer(this.pulsarSinkConfig.getTopic());
+    }
+
+    /**
+     * Not supported: every message produced by this sink is stamped with properties of the input
+     * record, so a value cannot be written without its {@link RecordContext}.
+     *
+     * @param value ignored
+     * @return a future completed exceptionally with {@link UnsupportedOperationException}
+     */
+    @Override
+    public CompletableFuture<Void> write(T value) {
+        // never hand back null: callers chaining on the future would fail with an NPE
+        CompletableFuture<Void> unsupported = new CompletableFuture<>();
+        unsupported.completeExceptionally(new UnsupportedOperationException(
+                "PulsarSink requires a RecordContext; use write(RecordContext, T) instead"));
+        return unsupported;
+    }
+
+    /**
+     * Serializes <i>value</i>, tags the message with the input topic and the Base64-encoded input
+     * message id, and hands it to the processor chosen in {@link #open(Map)}.
+     *
+     * @param recordContext context of the input record; must be a {@link PulsarRecord}
+     * @param value output value to publish
+     * @throws RuntimeException if the value cannot be serialized
+     * @throws Exception if the processor fails to send the message
+     */
+    @Override
+    public void write(RecordContext recordContext, T value) throws Exception {
+
+        PulsarRecord pulsarRecord = (PulsarRecord) recordContext;
+
+        byte[] output;
+        try {
+            output = this.outputSerDe.serialize(value);
+        } catch (Exception e) {
+            //TODO Add serialization exception stats
+            throw new RuntimeException("Error occured when attempting to serialize output:", e);
+        }
+        MessageBuilder msgBuilder = MessageBuilder.create();
+        msgBuilder
+                .setContent(output)
+                .setProperty("__pfn_input_topic__", pulsarRecord.getTopicName())
+                // encodeToString avoids decoding the Base64 bytes with the platform default charset
+                .setProperty("__pfn_input_msg_id__",
+                        Base64.getEncoder().encodeToString(pulsarRecord.getMessageId().toByteArray()));
+        this.pulsarSinkProcessor.sendOutputMessage(msgBuilder, pulsarRecord);
+    }
+
+    /**
+     * Closes the active processor, if one was created by {@link #open(Map)}.
+     *
+     * @throws Exception if the processor fails to close
+     */
+    @Override
+    public void close() throws Exception {
+        // open() may not have run, or may have failed before a processor was created
+        if (null != this.pulsarSinkProcessor) {
+            this.pulsarSinkProcessor.close();
+        }
+    }
+
+    /**
+     * Loads the configured output type and, unless it is {@code Void}, creates the output SerDe
+     * (the default one when no SerDe class is configured) and checks that it can handle the type.
+     *
+     * @throws ClassNotFoundException if the configured type class cannot be loaded
+     * @throws RuntimeException if the SerDe does not support the output type
+     */
+    @VisibleForTesting
+    void setupSerDe() throws ClassNotFoundException {
+        Class<?> typeArg = Thread.currentThread().getContextClassLoader().loadClass(
+                this.pulsarSinkConfig.getTypeClassName());
+
+        if (!Void.class.equals(typeArg)) { // return type is not `Void.class`
+            if (this.pulsarSinkConfig.getSerDeClassName() == null
+                    || this.pulsarSinkConfig.getSerDeClassName().isEmpty()
+                    || this.pulsarSinkConfig.getSerDeClassName().equals(DefaultSerDe.class.getName())) {
+                this.outputSerDe = InstanceUtils.initializeDefaultSerDe(typeArg);
+            } else {
+                this.outputSerDe = InstanceUtils.initializeSerDe(this.pulsarSinkConfig.getSerDeClassName(),
+                        Thread.currentThread().getContextClassLoader(), typeArg);
+            }
+            Class<?>[] outputSerdeTypeArgs = TypeResolver.resolveRawArguments(SerDe.class, outputSerDe.getClass());
+            if (outputSerDe.getClass().getName().equals(DefaultSerDe.class.getName())) {
+                if (!DefaultSerDe.IsSupportedType(typeArg)) {
+                    throw new RuntimeException("Default Serde does not support type " + typeArg);
+                }
+            } else if (!outputSerdeTypeArgs[0].isAssignableFrom(typeArg)) {
+                // the check requires the serde type to accept the function type
+                throw new RuntimeException("Inconsistent types found between function output type and output serde type: "
+                        + " function type = " + typeArg + " should be assignable to " + outputSerdeTypeArgs[0]);
+            }
+        }
+    }
 }
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarConfig.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSinkConfig.java
similarity index 67%
copy from 
pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarConfig.java
copy to 
pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSinkConfig.java
index 2a5dc44..1def3f1 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarConfig.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSinkConfig.java
@@ -16,33 +16,22 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.functions.source;
+package org.apache.pulsar.functions.sink;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
-import lombok.Builder;
-import lombok.Data;
 import lombok.Getter;
-import lombok.NoArgsConstructor;
 import lombok.Setter;
 import lombok.ToString;
 import org.apache.pulsar.functions.utils.FunctionConfig;
 
-import java.io.IOException;
 import java.util.Map;
 
 @Getter
 @Setter
 @ToString
-public class PulsarConfig {
-
+public class PulsarSinkConfig {
     private FunctionConfig.ProcessingGuarantees processingGuarantees;
     private FunctionConfig.SubscriptionType subscriptionType;
-    private String subscriptionName;
-    private Map<String, String> topicSerdeClassNameMap;
+    private String topic;
+    private String serDeClassName;
     private String typeClassName;
-
-    public static PulsarConfig load(Map<String, Object> map) throws 
IOException {
-        ObjectMapper mapper = new ObjectMapper();
-        return mapper.readValue(new ObjectMapper().writeValueAsString(map), 
PulsarConfig.class);
-    }
 }
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/RuntimeSink.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/RuntimeSink.java
index 63a48ec..fe47705 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/RuntimeSink.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/RuntimeSink.java
@@ -29,7 +29,7 @@ import org.apache.pulsar.connect.core.Sink;
  * <p>There is a default implementation provided for wrapping up the user 
provided {@link Sink}. Pulsar sink
  * should be implemented using this interface to ensure supporting 
effective-once.
  */
-public interface RuntimeSink<T> extends Sink<T> {
+public interface RuntimeSink<T> extends Sink<T>{
 
     /**
      * Write the <tt>value</tt>value.
@@ -40,7 +40,7 @@ public interface RuntimeSink<T> extends Sink<T> {
      * @param inputRecordContext input record context
      * @param value output value computed from the runtime.
      */
-    default void write(RecordContext inputRecordContext, T value) {
+    default void write(RecordContext inputRecordContext, T value) throws 
Exception {
         write(value)
             .thenAccept(ignored -> inputRecordContext.ack())
             .exceptionally(cause -> {
@@ -48,5 +48,4 @@ public interface RuntimeSink<T> extends Sink<T> {
                 return null;
             });
     }
-
 }
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
index 9048544..dd0fb38 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.functions.source;
 
+import com.google.common.annotations.VisibleForTesting;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import net.jodah.typetools.TypeResolver;
@@ -41,27 +42,27 @@ import java.util.concurrent.TimeUnit;
 public class PulsarSource<T> implements Source<T> {
 
     private PulsarClient pulsarClient;
-    private PulsarConfig pulsarConfig;
+    private PulsarSourceConfig pulsarSourceConfig;
     private Map<String, SerDe> topicToSerDeMap = new HashMap<>();
 
     @Getter
     private org.apache.pulsar.client.api.Consumer inputConsumer;
 
-    public PulsarSource(PulsarClient pulsarClient, PulsarConfig pulsarConfig) {
+    public PulsarSource(PulsarClient pulsarClient, PulsarSourceConfig 
pulsarConfig) {
         this.pulsarClient = pulsarClient;
-        this.pulsarConfig = pulsarConfig;
+        this.pulsarSourceConfig = pulsarConfig;
     }
 
     @Override
     public void open(Map<String, Object> config) throws Exception {
         // Setup Serialization/Deserialization
-        setupSerde();
+        setupSerDe();
 
         // Setup pulsar consumer
         this.inputConsumer = this.pulsarClient.newConsumer()
-                .topics(new 
ArrayList<>(this.pulsarConfig.getTopicSerdeClassNameMap().keySet()))
-                .subscriptionName(this.pulsarConfig.getSubscriptionName())
-                
.subscriptionType(this.pulsarConfig.getSubscriptionType().get())
+                .topics(new 
ArrayList<>(this.pulsarSourceConfig.getTopicSerdeClassNameMap().keySet()))
+                
.subscriptionName(this.pulsarSourceConfig.getSubscriptionName())
+                
.subscriptionType(this.pulsarSourceConfig.getSubscriptionType().get())
                 .ackTimeout(1, TimeUnit.MINUTES)
                 .subscribe();
     }
@@ -81,7 +82,7 @@ public class PulsarSource<T> implements Source<T> {
             MessageIdImpl messageId = (MessageIdImpl) 
topicMessageId.getInnerMessageId();
             partitionId = Long.toString(messageId.getPartitionIndex());
         } else {
-            topicName = 
this.pulsarConfig.getTopicSerdeClassNameMap().keySet().iterator().next();
+            topicName = 
this.pulsarSourceConfig.getTopicSerdeClassNameMap().keySet().iterator().next();
             partitionId = Long.toString(((MessageIdImpl) 
message.getMessageId()).getPartitionIndex());
         }
 
@@ -107,13 +108,13 @@ public class PulsarSource<T> implements Source<T> {
                 .sequenceId(message.getSequenceId())
                 .topicName(topicName)
                 .ackFunction(() -> {
-                    if (pulsarConfig.getProcessingGuarantees() == 
FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE) {
+                    if (pulsarSourceConfig.getProcessingGuarantees() == 
FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE) {
                         inputConsumer.acknowledgeCumulativeAsync(message);
                     } else {
                         inputConsumer.acknowledgeAsync(message);
                     }
                 }).failFunction(() -> {
-                    if (pulsarConfig.getProcessingGuarantees() == 
FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE) {
+                    if (pulsarSourceConfig.getProcessingGuarantees() == 
FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE) {
                         throw new RuntimeException("Failed to process message: 
" + message.getMessageId());
                     }
                 })
@@ -126,17 +127,18 @@ public class PulsarSource<T> implements Source<T> {
         this.inputConsumer.close();
     }
 
-    private void setupSerde() throws ClassNotFoundException {
+    @VisibleForTesting
+    void setupSerDe() throws ClassNotFoundException {
 
-        Class<?> typeArg = 
Thread.currentThread().getContextClassLoader().loadClass(this.pulsarConfig.getTypeClassName());
+        Class<?> typeArg = 
Thread.currentThread().getContextClassLoader().loadClass(this.pulsarSourceConfig.getTypeClassName());
         if (Void.class.equals(typeArg)) {
             throw new RuntimeException("Input type of Pulsar Function cannot 
be Void");
         }
 
-        for (Map.Entry<String, String> entry : 
this.pulsarConfig.getTopicSerdeClassNameMap().entrySet()) {
+        for (Map.Entry<String, String> entry : 
this.pulsarSourceConfig.getTopicSerdeClassNameMap().entrySet()) {
             String topic = entry.getKey();
             String serDeClassname = entry.getValue();
-            if (serDeClassname.isEmpty()) {
+            if (serDeClassname == null || serDeClassname.isEmpty()) {
                 serDeClassname = DefaultSerDe.class.getName();
             }
             SerDe serDe = InstanceUtils.initializeSerDe(serDeClassname,
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarConfig.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSourceConfig.java
similarity index 87%
rename from 
pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarConfig.java
rename to 
pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSourceConfig.java
index 2a5dc44..4d5e540 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarConfig.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSourceConfig.java
@@ -19,10 +19,7 @@
 package org.apache.pulsar.functions.source;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
-import lombok.Builder;
-import lombok.Data;
 import lombok.Getter;
-import lombok.NoArgsConstructor;
 import lombok.Setter;
 import lombok.ToString;
 import org.apache.pulsar.functions.utils.FunctionConfig;
@@ -33,7 +30,7 @@ import java.util.Map;
 @Getter
 @Setter
 @ToString
-public class PulsarConfig {
+public class PulsarSourceConfig {
 
     private FunctionConfig.ProcessingGuarantees processingGuarantees;
     private FunctionConfig.SubscriptionType subscriptionType;
@@ -41,8 +38,8 @@ public class PulsarConfig {
     private Map<String, String> topicSerdeClassNameMap;
     private String typeClassName;
 
-    public static PulsarConfig load(Map<String, Object> map) throws 
IOException {
+    public static PulsarSourceConfig load(Map<String, Object> map) throws 
IOException {
         ObjectMapper mapper = new ObjectMapper();
-        return mapper.readValue(new ObjectMapper().writeValueAsString(map), 
PulsarConfig.class);
+        return mapper.readValue(new ObjectMapper().writeValueAsString(map), 
PulsarSourceConfig.class);
     }
 }
diff --git a/pulsar-functions/instance/src/main/python/python_instance_main.py 
b/pulsar-functions/instance/src/main/python/python_instance_main.py
index f2763a9..15d2643 100644
--- a/pulsar-functions/instance/src/main/python/python_instance_main.py
+++ b/pulsar-functions/instance/src/main/python/python_instance_main.py
@@ -58,14 +58,10 @@ def main():
   parser.add_argument('--name', required=True, help='Function Name')
   parser.add_argument('--tenant', required=True, help='Tenant Name')
   parser.add_argument('--namespace', required=True, help='Namespace name')
-  parser.add_argument('--source_topics_serde_classname', required=True, 
help='A mapping of Input topics to SerDe')
-  parser.add_argument('--output_topic', required=False, help='Output Topic')
-  parser.add_argument('--output_serde_classname', required=False, help='Output 
Serde Classnames')
   parser.add_argument('--instance_id', required=True, help='Instance Id')
   parser.add_argument('--function_id', required=True, help='Function Id')
   parser.add_argument('--function_version', required=True, help='Function 
Version')
   parser.add_argument('--processing_guarantees', required=True, 
help='Processing Guarantees')
-  parser.add_argument('--source_subscription_type', required=True, 
help='Subscription Type')
   parser.add_argument('--pulsar_serviceurl', required=True, help='Pulsar 
Service Url')
   parser.add_argument('--port', required=True, help='Instance Port', type=int)
   parser.add_argument('--max_buffered_tuples', required=True, help='Maximum 
number of Buffered tuples')
@@ -74,6 +70,10 @@ def main():
   parser.add_argument('--logging_file', required=True, help='Log file name')
   parser.add_argument('--auto_ack', required=True, help='Enable Autoacking?')
   parser.add_argument('--log_topic', required=False, help='Topic to send Log 
Messages')
+  parser.add_argument('--source_subscription_type', required=True, 
help='Subscription Type')
+  parser.add_argument('--source_topics_serde_classname', required=True, 
help='A mapping of Input topics to SerDe')
+  parser.add_argument('--sink_topic', required=False, help='Sink Topic')
+  parser.add_argument('--sink_serde_classname', required=False, help='Sink 
SerDe classname')
 
   args = parser.parse_args()
   log_file = os.path.join(args.logging_directory,
@@ -104,10 +104,10 @@ def main():
   function_details.source.MergeFrom(sourceSpec)
 
   sinkSpec = Function_pb2.SinkSpec()
-  if args.output_topic != None and len(args.output_topic) != 0:
-    sinkSpec.topic = args.output_topic
-  if args.output_serde_classname != None and len(args.output_serde_classname) 
!= 0:
-    sinkSpec.serDeClassName = args.output_serde_classname
+  if args.sink_topic != None and len(args.sink_topic) != 0:
+    sinkSpec.topic = args.sink_topic
+  if args.sink_serde_classname != None and len(args.sink_serde_classname) != 0:
+    sinkSpec.serDeClassName = args.sink_serde_classname
   function_details.sink.MergeFrom(sinkSpec)
 
   function_details.processingGuarantees = 
Function_pb2.ProcessingGuarantees.Value(args.processing_guarantees)
diff --git 
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java
 
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java
index 289dbba..12d4f19 100644
--- 
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java
+++ 
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java
@@ -20,20 +20,14 @@ package org.apache.pulsar.functions.instance;
 
 import lombok.Getter;
 import lombok.Setter;
-import net.jodah.typetools.TypeResolver;
 import org.apache.pulsar.functions.api.Context;
 import org.apache.pulsar.functions.api.Function;
 import org.apache.pulsar.functions.api.SerDe;
-import org.apache.pulsar.functions.api.utils.DefaultSerDe;
 import org.apache.pulsar.functions.proto.Function.FunctionDetails;
 import org.apache.pulsar.functions.proto.Function.SinkSpec;
-import org.testng.annotations.Test;
 
-import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 
-import static org.testng.AssertJUnit.*;
-
 public class JavaInstanceRunnableTest {
 
     static class IntegerSerDe implements SerDe<Integer> {
@@ -111,79 +105,4 @@ public class JavaInstanceRunnableTest {
             return null;
         }
     }
-
-    /**
-     * Verify that JavaInstance does support functions that output Void type
-     */
-    @Test
-    public void testVoidOutputClasses() {
-        try {
-            JavaInstanceRunnable runnable = createRunnable(false, 
DefaultSerDe.class.getName());
-            Method method = makeAccessible(runnable);
-            ClassLoader clsLoader = 
Thread.currentThread().getContextClassLoader();
-            VoidOutputHandler pulsarFunction = new VoidOutputHandler();
-            Class<?>[] typeArgs = 
TypeResolver.resolveRawArguments(Function.class, pulsarFunction.getClass());
-            method.invoke(runnable, typeArgs, clsLoader);
-        } catch (Exception ex) {
-            assertTrue(false);
-        }
-    }
-
-    /**
-     * Verify that Default Serializer works fine.
-     */
-    @Test
-    public void testDefaultSerDe() {
-        try {
-            JavaInstanceRunnable runnable = createRunnable(false, null);
-            Method method = makeAccessible(runnable);
-            ClassLoader clsLoader = 
Thread.currentThread().getContextClassLoader();
-            Function function = (Function<String, String>) (input, context) -> 
input + "-lambda";
-            Class<?>[] typeArgs = 
TypeResolver.resolveRawArguments(Function.class, function.getClass());
-            method.invoke(runnable, typeArgs, clsLoader);
-        } catch (Exception ex) {
-            ex.printStackTrace();
-            assertEquals(ex, null);
-            assertTrue(false);
-        }
-    }
-
-    /**
-     * Verify that Explicit setting of Default Serializer works fine.
-     */
-    @Test
-    public void testExplicitDefaultSerDe() {
-        try {
-            JavaInstanceRunnable runnable = createRunnable(false, 
DefaultSerDe.class.getName());
-            Method method = makeAccessible(runnable);
-            ClassLoader clsLoader = 
Thread.currentThread().getContextClassLoader();
-            Function function = (Function<String, String>) (input, context) -> 
input + "-lambda";
-            Class<?>[] typeArgs = 
TypeResolver.resolveRawArguments(Function.class, function.getClass());
-            method.invoke(runnable, typeArgs, clsLoader);
-        } catch (Exception ex) {
-            assertTrue(false);
-        }
-    }
-
-    /**
-     * Verify that function output type should be consistent with output serde 
type.
-     */
-    @Test
-    public void testInconsistentOutputType() {
-        try {
-            JavaInstanceRunnable runnable = createRunnable(false, 
IntegerSerDe.class.getName());
-            Method method = makeAccessible(runnable);
-            ClassLoader clsLoader = 
Thread.currentThread().getContextClassLoader();
-            Function function = (Function<String, String>) (input, context) -> 
input + "-lambda";
-            Class<?>[] typeArgs = 
TypeResolver.resolveRawArguments(Function.class, function.getClass());
-            method.invoke(runnable, typeArgs, clsLoader);
-            fail("Should fail constructing java instance if function type is 
inconsistent with serde type");
-        } catch (InvocationTargetException ex) {
-            assertTrue(ex.getCause().getMessage().startsWith("Inconsistent 
types found between function output type and output serde type:"));
-        } catch (Exception ex) {
-            assertTrue(false);
-        }
-    }
-
-
 }
diff --git 
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneOutputTopicProducersTest.java
 
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneOutputTopicProducersTest.java
index e6072c8..a22b366 100644
--- 
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneOutputTopicProducersTest.java
+++ 
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneOutputTopicProducersTest.java
@@ -213,7 +213,7 @@ public class MultiConsumersOneOutputTopicProducersTest {
     @Test
     public void testGetCloseProducer() throws Exception {
         String srcTopic = "test-src-topic";
-        int ptnIdx = 1234;
+        String ptnIdx = "1234";
         Producer<byte[]> producer = producers.getProducer(srcTopic, ptnIdx);
 
         String producerName = makeProducerName(srcTopic, ptnIdx);
diff --git 
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/DefaultRuntimeSinkTest.java
 
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/DefaultRuntimeSinkTest.java
index 7c58c30..2ba4e3f 100644
--- 
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/DefaultRuntimeSinkTest.java
+++ 
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/DefaultRuntimeSinkTest.java
@@ -63,13 +63,13 @@ public class DefaultRuntimeSinkTest {
     }
 
     @Test
-    public void testWrite() {
+    public void testWrite() throws Exception {
         this.runtimeSink.write("test-record");
         verify(mockSink, times(1)).write(eq("test-record"));
     }
 
     @Test
-    public void testWriteAck() {
+    public void testWriteAck() throws Exception {
         RecordContext context = mock(RecordContext.class);
 
         CompletableFuture<Void> writeFuture = new CompletableFuture<>();
@@ -82,7 +82,7 @@ public class DefaultRuntimeSinkTest {
     }
 
     @Test
-    public void testWriteFail() {
+    public void testWriteFail() throws Exception {
         RecordContext context = mock(RecordContext.class);
 
         CompletableFuture<Void> writeFuture = new CompletableFuture<>();
diff --git 
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java
 
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java
similarity index 51%
copy from 
pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java
copy to 
pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java
index 558517a..0f826ac 100644
--- 
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java
+++ 
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java
@@ -16,21 +16,21 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.functions.source;
+package org.apache.pulsar.functions.sink;
 
+import lombok.Getter;
+import lombok.Setter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.ConsumerBuilder;
 import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.PulsarClientException;;
 import org.apache.pulsar.functions.api.SerDe;
 import org.apache.pulsar.functions.api.utils.DefaultSerDe;
 import org.apache.pulsar.functions.utils.FunctionConfig;
 import org.testng.annotations.Test;
 
 import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
 
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyList;
@@ -39,18 +39,14 @@ import static org.mockito.Matchers.anyString;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.testng.AssertJUnit.assertEquals;
-import static org.testng.AssertJUnit.assertFalse;
 import static org.testng.AssertJUnit.assertTrue;
 import static org.testng.AssertJUnit.fail;
 
 @Slf4j
-public class PulsarSourceTest {
+public class PulsarSinkTest {
 
-    private static final String SUBSCRIPTION_NAME = 
"test/test-namespace/example";
-    private static Map<String, String> topicSerdeClassNameMap = new 
HashMap<>();
-    static {
-        
topicSerdeClassNameMap.put("persistent://sample/standalone/ns1/test_result", 
DefaultSerDe.class.getName());
-    }
+    private static final String TOPIC = 
"persistent://sample/standalone/ns1/test_result";
+    private static final String serDeClassName = DefaultSerDe.class.getName();
 
     public static class TestSerDe implements SerDe<String> {
 
@@ -82,55 +78,125 @@ public class PulsarSourceTest {
         return pulsarClient;
     }
 
-    private static PulsarConfig getPulsarConfigs() {
-        PulsarConfig pulsarConfig = new PulsarConfig();
+    private static PulsarSinkConfig getPulsarConfigs() {
+        PulsarSinkConfig pulsarConfig = new PulsarSinkConfig();
         
pulsarConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);
         
pulsarConfig.setSubscriptionType(FunctionConfig.SubscriptionType.FAILOVER);
-        pulsarConfig.setTopicSerdeClassNameMap(topicSerdeClassNameMap);
+        pulsarConfig.setTopic(TOPIC);
+        pulsarConfig.setSerDeClassName(serDeClassName);
         pulsarConfig.setTypeClassName(String.class.getName());
         return pulsarConfig;
     }
 
+    @Getter
+    @Setter
+    public static class ComplexUserDefinedType {
+        private String name;
+        private Integer age;
+    }
+
+    public static class ComplexSerDe implements SerDe<ComplexUserDefinedType> {
+        @Override
+        public ComplexUserDefinedType deserialize(byte[] input) {
+            return null;
+        }
+
+        @Override
+        public byte[] serialize(ComplexUserDefinedType input) {
+            return new byte[0];
+        }
+    }
+
+    /**
+     * Verify that JavaInstance does support functions that output Void type
+     */
     @Test
-    public void testVoidInputClasses() throws IOException {
-        PulsarConfig pulsarConfig = getPulsarConfigs();
+    public void testVoidOutputClasses() throws Exception {
+        PulsarSinkConfig pulsarConfig = getPulsarConfigs();
         // set type to void
         pulsarConfig.setTypeClassName(Void.class.getName());
-        PulsarSource pulsarSource = new PulsarSource(getPulsarClient(), 
pulsarConfig);
+        PulsarSink pulsarSink = new PulsarSink(getPulsarClient(), 
pulsarConfig);
 
         try {
-            pulsarSource.open(new HashMap<>());
-            assertFalse(true);
-        } catch (RuntimeException ex) {
-            log.error("RuntimeException: {}", ex, ex);
-            assertEquals(ex.getMessage(), "Input type of Pulsar Function 
cannot be Void");
+            pulsarSink.setupSerDe();
         } catch (Exception ex) {
-            log.error("Exception: {}", ex, ex);
-            assertFalse(true);
+            ex.printStackTrace();
+            assertEquals(ex, null);
+            assertTrue(false);
         }
     }
 
-    /**
-     * Verify that function input type should be consistent with input serde 
type.
-     */
     @Test
-    public void testInconsistentInputType() throws IOException {
-        PulsarConfig pulsarConfig = getPulsarConfigs();
+    public void testInconsistentOutputType() throws IOException {
+        PulsarSinkConfig pulsarConfig = getPulsarConfigs();
         // set type to be inconsistent to that of SerDe
         pulsarConfig.setTypeClassName(Integer.class.getName());
-        Map<String, String> topicSerdeClassNameMap = new HashMap<>();
-        
topicSerdeClassNameMap.put("persistent://sample/standalone/ns1/test_result", 
TestSerDe.class.getName());
-        pulsarConfig.setTopicSerdeClassNameMap(topicSerdeClassNameMap);
-        PulsarSource pulsarSource = new PulsarSource(getPulsarClient(), 
pulsarConfig);
+        pulsarConfig.setSerDeClassName(TestSerDe.class.getName());
+        PulsarSink pulsarSink = new PulsarSink(getPulsarClient(), 
pulsarConfig);
         try {
-            pulsarSource.open(new HashMap<>());
+            pulsarSink.setupSerDe();
             fail("Should fail constructing java instance if function type is 
inconsistent with serde type");
         } catch (RuntimeException ex) {
             log.error("RuntimeException: {}", ex, ex);
-            assertTrue(ex.getMessage().startsWith("Inconsistent types found 
between function input type and input serde type:"));
+            assertTrue(ex.getMessage().startsWith("Inconsistent types found 
between function output type and output serde type:"));
         } catch (Exception ex) {
             log.error("Exception: {}", ex, ex);
             assertTrue(false);
         }
     }
+
+    /**
+     * Verify that Default Serializer works fine.
+     */
+    @Test
+    public void testDefaultSerDe() throws PulsarClientException {
+
+        PulsarSinkConfig pulsarConfig = getPulsarConfigs();
+        // set type to String and leave the SerDe class name unset (null)
+        pulsarConfig.setTypeClassName(String.class.getName());
+        pulsarConfig.setSerDeClassName(null);
+        PulsarSink pulsarSink = new PulsarSink(getPulsarClient(), 
pulsarConfig);
+
+        try {
+            pulsarSink.setupSerDe();
+        } catch (Exception ex) {
+            ex.printStackTrace();
+            fail();
+        }
+    }
+
+    /**
+     * Verify that Explicit setting of Default Serializer works fine.
+     */
+    @Test
+    public void testExplicitDefaultSerDe() throws PulsarClientException {
+        PulsarSinkConfig pulsarConfig = getPulsarConfigs();
+        // set type to String and explicitly select DefaultSerDe
+        pulsarConfig.setTypeClassName(String.class.getName());
+        pulsarConfig.setSerDeClassName(DefaultSerDe.class.getName());
+        PulsarSink pulsarSink = new PulsarSink(getPulsarClient(), 
pulsarConfig);
+
+        try {
+            pulsarSink.setupSerDe();
+        } catch (Exception ex) {
+            ex.printStackTrace();
+            fail();
+        }
+    }
+
+    @Test
+    public void testComplexOuputType() throws PulsarClientException {
+        PulsarSinkConfig pulsarConfig = getPulsarConfigs();
+        // set type to a user-defined class and pair it with its own SerDe
+        pulsarConfig.setTypeClassName(ComplexUserDefinedType.class.getName());
+        pulsarConfig.setSerDeClassName(ComplexSerDe.class.getName());
+        PulsarSink pulsarSink = new PulsarSink(getPulsarClient(), 
pulsarConfig);
+
+        try {
+            pulsarSink.setupSerDe();
+        } catch (Exception ex) {
+            ex.printStackTrace();
+            fail();
+        }
+    }
 }
diff --git 
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java
 
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java
index 558517a..77d397c 100644
--- 
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java
+++ 
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java
@@ -18,6 +18,8 @@
  */
 package org.apache.pulsar.functions.source;
 
+import lombok.Getter;
+import lombok.Setter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.ConsumerBuilder;
@@ -82,8 +84,8 @@ public class PulsarSourceTest {
         return pulsarClient;
     }
 
-    private static PulsarConfig getPulsarConfigs() {
-        PulsarConfig pulsarConfig = new PulsarConfig();
+    private static PulsarSourceConfig getPulsarConfigs() {
+        PulsarSourceConfig pulsarConfig = new PulsarSourceConfig();
         
pulsarConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);
         
pulsarConfig.setSubscriptionType(FunctionConfig.SubscriptionType.FAILOVER);
         pulsarConfig.setTopicSerdeClassNameMap(topicSerdeClassNameMap);
@@ -91,9 +93,29 @@ public class PulsarSourceTest {
         return pulsarConfig;
     }
 
+    @Getter
+    @Setter
+    public static class ComplexUserDefinedType {
+        private String name;
+        private Integer age;
+    }
+
+    public static class ComplexSerDe implements SerDe<ComplexUserDefinedType> {
+        @Override
+        public ComplexUserDefinedType deserialize(byte[] input) {
+            return null;
+        }
+
+        @Override
+        public byte[] serialize(ComplexUserDefinedType input) {
+            return new byte[0];
+        }
+    }
+
+
     @Test
     public void testVoidInputClasses() throws IOException {
-        PulsarConfig pulsarConfig = getPulsarConfigs();
+        PulsarSourceConfig pulsarConfig = getPulsarConfigs();
         // set type to void
         pulsarConfig.setTypeClassName(Void.class.getName());
         PulsarSource pulsarSource = new PulsarSource(getPulsarClient(), 
pulsarConfig);
@@ -115,7 +137,7 @@ public class PulsarSourceTest {
      */
     @Test
     public void testInconsistentInputType() throws IOException {
-        PulsarConfig pulsarConfig = getPulsarConfigs();
+        PulsarSourceConfig pulsarConfig = getPulsarConfigs();
         // set type to be inconsistent to that of SerDe
         pulsarConfig.setTypeClassName(Integer.class.getName());
         Map<String, String> topicSerdeClassNameMap = new HashMap<>();
@@ -133,4 +155,64 @@ public class PulsarSourceTest {
             assertTrue(false);
         }
     }
+
+    /**
+     * Verify that Default Serializer works fine.
+     */
+    @Test
+    public void testDefaultSerDe() throws PulsarClientException {
+
+        PulsarSourceConfig pulsarConfig = getPulsarConfigs();
+        // set type to String and map the topic to a null SerDe class name
+        pulsarConfig.setTypeClassName(String.class.getName());
+        
topicSerdeClassNameMap.put("persistent://sample/standalone/ns1/test_result", 
null);
+        pulsarConfig.setTopicSerdeClassNameMap(topicSerdeClassNameMap);
+        PulsarSource pulsarSource = new PulsarSource(getPulsarClient(), 
pulsarConfig);
+
+        try {
+            pulsarSource.open(new HashMap<>());
+        } catch (Exception ex) {
+            ex.printStackTrace();
+            assertEquals(ex, null);
+            assertTrue(false);
+        }
+    }
+
+    /**
+     * Verify that Explicit setting of Default Serializer works fine.
+     */
+    @Test
+    public void testExplicitDefaultSerDe() throws PulsarClientException {
+        PulsarSourceConfig pulsarConfig = getPulsarConfigs();
+        // set type to String and explicitly map the topic to DefaultSerDe
+        pulsarConfig.setTypeClassName(String.class.getName());
+        
topicSerdeClassNameMap.put("persistent://sample/standalone/ns1/test_result", 
DefaultSerDe.class.getName());
+        pulsarConfig.setTopicSerdeClassNameMap(topicSerdeClassNameMap);
+        PulsarSource pulsarSource = new PulsarSource(getPulsarClient(), 
pulsarConfig);
+
+        try {
+            pulsarSource.open(new HashMap<>());
+        } catch (Exception ex) {
+            ex.printStackTrace();
+            assertEquals(ex, null);
+            assertTrue(false);
+        }
+    }
+
+    @Test
+    public void testComplexOuputType() throws PulsarClientException {
+        PulsarSourceConfig pulsarConfig = getPulsarConfigs();
+        // set type to a user-defined class and map the topic to its own SerDe
+        pulsarConfig.setTypeClassName(ComplexUserDefinedType.class.getName());
+        
topicSerdeClassNameMap.put("persistent://sample/standalone/ns1/test_result",ComplexSerDe.class.getName());
+        pulsarConfig.setTopicSerdeClassNameMap(topicSerdeClassNameMap);
+        PulsarSource pulsarSource = new PulsarSource(getPulsarClient(), 
pulsarConfig);
+
+        try {
+            pulsarSource.setupSerDe();
+        } catch (Exception ex) {
+            ex.printStackTrace();
+            fail();
+        }
+    }
 }
diff --git 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
index e0330bb..5bb5cac 100644
--- 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
+++ 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
@@ -60,13 +60,6 @@ public class JavaInstanceMain {
     protected String tenant;
     @Parameter(names = "--namespace", description = "Namespace Name\n", 
required = true)
     protected String namespace;
-
-    @Parameter(names = "--output_topic", description = "Output Topic Name\n")
-    protected String outputTopicName;
-
-    @Parameter(names = "--output_serde_classname", description = "Output 
SerDe\n")
-    protected String outputSerdeClassName;
-
     @Parameter(names = "--log_topic", description = "Log Topic")
     protected String logTopic;
 
@@ -112,6 +105,17 @@ public class JavaInstanceMain {
     @Parameter(names = "--source_topics_serde_classname", description = "A map 
of topics to SerDe for the source", required = true)
     protected String sourceTopicsSerdeClassName;
 
+    @Parameter(names = "--sink_configs", description = "The sink configs\n")
+    protected String sinkConfigs;
+
+    @Parameter(names = "--sink_classname", description = "The sink 
classname\n", required = true)
+    protected String sinkClassname;
+
+    @Parameter(names = "--sink_topic", description = "The sink Topic Name\n", 
required = true)
+    protected String sinkTopic;
+
+    @Parameter(names = "--sink_serde_classname", description = "Sink SerDe\n")
+    protected String sinkSerdeClassName;
 
     private Server server;
 
@@ -130,15 +134,6 @@ public class JavaInstanceMain {
         functionDetailsBuilder.setName(functionName);
         functionDetailsBuilder.setClassName(className);
 
-        SinkSpec.Builder sinkSpecBuilder = SinkSpec.newBuilder();
-        if (outputSerdeClassName != null) {
-            sinkSpecBuilder.setSerDeClassName(outputSerdeClassName);
-        }
-        if (outputTopicName != null) {
-            sinkSpecBuilder.setTopic(outputTopicName);
-        }
-        functionDetailsBuilder.setSink(sinkSpecBuilder);
-
         if (logTopic != null) {
             functionDetailsBuilder.setLogTopic(logTopic);
         }
@@ -154,6 +149,7 @@ public class JavaInstanceMain {
             functionDetailsBuilder.putAllUserConfig(userConfigMap);
         }
 
+        // Setup source
         SourceSpec.Builder sourceDetailsBuilder = SourceSpec.newBuilder();
         sourceDetailsBuilder.setClassName(sourceClassname);
         if (sourceConfigs != null && !sourceConfigs.isEmpty()) {;
@@ -165,6 +161,18 @@ public class JavaInstanceMain {
 
         functionDetailsBuilder.setSource(sourceDetailsBuilder);
 
+        // Setup sink
+        SinkSpec.Builder sinkSpecBuilder = SinkSpec.newBuilder();
+        sinkSpecBuilder.setClassName(sinkClassname);
+        if (sinkConfigs != null) {
+            sinkSpecBuilder.setConfigs(sinkConfigs);
+        }
+        if (sinkSerdeClassName != null) {
+            sinkSpecBuilder.setSerDeClassName(sinkSerdeClassName);
+        }
+        sinkSpecBuilder.setTopic(sinkTopic);
+        functionDetailsBuilder.setSink(sinkSpecBuilder);
+
         FunctionDetails functionDetails = functionDetailsBuilder.build();
         instanceConfig.setFunctionDetails(functionDetails);
 
diff --git 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
index 3aae26f..72a5801 100644
--- 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
+++ 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
@@ -118,16 +118,7 @@ class ProcessRuntime implements Runtime {
         } else {
             args.add("false");
         }
-        if (instanceConfig.getFunctionDetails().getSink().getTopic() != null
-                && !instanceConfig.getFunctionDetails().getSink().getTopic().isEmpty()) {
-            args.add("--output_topic");
-            args.add(instanceConfig.getFunctionDetails().getSink().getTopic());
-        }
-        if (instanceConfig.getFunctionDetails().getSink().getSerDeClassName() != null
-                && !instanceConfig.getFunctionDetails().getSink().getSerDeClassName().isEmpty()) {
-            args.add("--output_serde_classname");
-            args.add(instanceConfig.getFunctionDetails().getSink().getSerDeClassName());
-        }
+
         args.add("--processing_guarantees");
         args.add(String.valueOf(instanceConfig.getFunctionDetails().getProcessingGuarantees()));
         args.add("--pulsar_serviceurl");
@@ -143,6 +134,7 @@ class ProcessRuntime implements Runtime {
         args.add("--port");
         args.add(String.valueOf(instancePort));
 
+        // source related configs
         if (instanceConfig.getFunctionDetails().getRuntime() == Function.FunctionDetails.Runtime.JAVA) {
             if (!instanceConfig.getFunctionDetails().getSource().getClassName().isEmpty()) {
                 args.add("--source_classname");
@@ -159,6 +151,29 @@ class ProcessRuntime implements Runtime {
 
         args.add("--source_topics_serde_classname");
         args.add(new Gson().toJson(instanceConfig.getFunctionDetails().getSource().getTopicsToSerDeClassNameMap()));
+
+        // sink related configs
+        if (instanceConfig.getFunctionDetails().getRuntime() == Function.FunctionDetails.Runtime.JAVA) {
+            if (!instanceConfig.getFunctionDetails().getSink().getClassName().isEmpty()) {
+                args.add("--sink_classname");
+                args.add(instanceConfig.getFunctionDetails().getSink().getClassName());
+            }
+            String sinkConfigs = instanceConfig.getFunctionDetails().getSink().getConfigs();
+            if (sinkConfigs != null && !sinkConfigs.isEmpty()) {
+                args.add("--sink_configs");
+                args.add(sinkConfigs);
+            }
+        }
+        if (instanceConfig.getFunctionDetails().getSink().getTopic() != null
+                && !instanceConfig.getFunctionDetails().getSink().getTopic().isEmpty()) {
+            args.add("--sink_topic");
+            args.add(instanceConfig.getFunctionDetails().getSink().getTopic());
+        }
+        if (instanceConfig.getFunctionDetails().getSink().getSerDeClassName() != null
+                && !instanceConfig.getFunctionDetails().getSink().getSerDeClassName().isEmpty()) {
+            args.add("--sink_serde_classname");
+            args.add(instanceConfig.getFunctionDetails().getSink().getSerDeClassName());
+        }
         return args;
     }
 
diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java
index d0c6b3c..b87e636 100644
--- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java
+++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java
@@ -84,6 +84,7 @@ public class ProcessRuntimeTest {
         functionDetailsBuilder.setSink(Function.SinkSpec.newBuilder()
                 .setTopic(TEST_NAME + "-output")
                 .setSerDeClassName("org.apache.pulsar.functions.runtime.serde.Utf8Serializer")
+                .setClassName("org.pulsar.pulsar.TestSink")
                 .build());
         functionDetailsBuilder.setLogTopic(TEST_NAME + "-log");
         functionDetailsBuilder.setSource(Function.SourceSpec.newBuilder()
@@ -111,7 +112,7 @@ public class ProcessRuntimeTest {
 
         ProcessRuntime container = factory.createContainer(config, userJarFile);
         List<String> args = container.getProcessArgs();
-        assertEquals(args.size(), 45);
+        assertEquals(args.size(), 47);
         String expectedArgs = "java -cp " + javaInstanceJarFile + " -Dlog4j.configurationFile=java_instance_log4j2.yml "
                 + "-Dpulsar.log.dir=" + logDirectory + "/functions" + " -Dpulsar.log.file=" + config.getFunctionDetails().getName()
                 + " org.apache.pulsar.functions.runtime.JavaInstanceMain"
@@ -123,14 +124,15 @@ public class ProcessRuntimeTest {
                 + " --function_classname " + config.getFunctionDetails().getClassName()
                 + " --log_topic " + config.getFunctionDetails().getLogTopic()
                 + " --auto_ack false"
-                + " --output_topic " + config.getFunctionDetails().getSink().getTopic()
-                + " --output_serde_classname " + config.getFunctionDetails().getSink().getSerDeClassName()
                 + " --processing_guarantees ATLEAST_ONCE"
                 + " --pulsar_serviceurl " + pulsarServiceUrl
-                + " --max_buffered_tuples 1024 --port " + args.get(38)
+                + " --max_buffered_tuples 1024 --port " + args.get(34)
                 + " --source_classname " + config.getFunctionDetails().getSource().getClassName()
                 + " --source_subscription_type " + config.getFunctionDetails().getSource().getSubscriptionType().name()
-                + " --source_topics_serde_classname " + new Gson().toJson(topicsToSerDeClassName);
+                + " --source_topics_serde_classname " + new Gson().toJson(topicsToSerDeClassName)
+                + " --sink_classname " + config.getFunctionDetails().getSink().getClassName()
+                + " --sink_topic " + config.getFunctionDetails().getSink().getTopic()
+                + " --sink_serde_classname " + config.getFunctionDetails().getSink().getSerDeClassName();
         assertEquals(expectedArgs, String.join(" ", args));
     }
 
@@ -151,13 +153,13 @@ public class ProcessRuntimeTest {
                 + " --function_classname " + config.getFunctionDetails().getClassName()
                 + " --log_topic " + config.getFunctionDetails().getLogTopic()
                 + " --auto_ack false"
-                + " --output_topic " + config.getFunctionDetails().getSink().getTopic()
-                + " --output_serde_classname " + config.getFunctionDetails().getSink().getSerDeClassName()
                 + " --processing_guarantees ATLEAST_ONCE"
                 + " --pulsar_serviceurl " + pulsarServiceUrl
-                + " --max_buffered_tuples 1024 --port " + args.get(37)
+                + " --max_buffered_tuples 1024 --port " + args.get(33)
                 + " --source_subscription_type " + config.getFunctionDetails().getSource().getSubscriptionType().name()
-                + " --source_topics_serde_classname " + new Gson().toJson(topicsToSerDeClassName);
+                + " --source_topics_serde_classname " + new Gson().toJson(topicsToSerDeClassName)
+                + " --sink_topic " + config.getFunctionDetails().getSink().getTopic()
+                + " --sink_serde_classname " + config.getFunctionDetails().getSink().getSerDeClassName();
         assertEquals(expectedArgs, String.join(" ", args));
     }
 

-- 
To stop receiving notification emails like this one, please contact
si...@apache.org.

Reply via email to