This is an automated email from the ASF dual-hosted git repository.

jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 97b56cf  Functions schema integration (#1845)
97b56cf is described below

commit 97b56cf04dfca81c937a4b640bc8cfc5470d66cb
Author: Dave Rusek <dave.ru...@gmail.com>
AuthorDate: Wed Jun 20 09:34:51 2018 -0700

    Functions schema integration (#1845)
    
    * wip
    
    * Removed internal shading
    
    * wip
    
    * Fixed handling of IllegalArgumentException in ZK client wrapper
    
    * wip
    
    * Extend SerDe<T> with Schema<T> and implement PulsarSink
    
    * Add Typed Consumers to functions
    
    * Munge functions and schemas together
    
    * Remove consumer/producer changes
    
    * formatting
    
    * Review changes
    
    * Remove context cache and add setConsumer to context
    
    * Addressed Jerry's comments
    
    * Use pulsar-client-original and fix the tests
---
 .../client/impl/MultiTopicsConsumerImpl.java       |   8 +-
 pulsar-functions/api-java/pom.xml                  |   6 ++
 .../org/apache/pulsar/functions/api/SerDe.java     |  28 ++++-
 .../pulsar/functions/instance/ContextImpl.java     |   9 ++
 .../pulsar/functions/instance/JavaInstance.java    |  32 ++++--
 .../producers/AbstractOneOuputTopicProducers.java  |  28 ++---
 .../MultiConsumersOneOuputTopicProducers.java      |  21 ++--
 .../functions/instance/producers/Producers.java    |   4 +-
 .../apache/pulsar/functions/sink/PulsarSink.java   |  36 +++----
 .../pulsar/functions/source/PulsarSource.java      | 115 ++++++++++++---------
 .../MultiConsumersOneOutputTopicProducersTest.java |  11 +-
 .../pulsar/functions/source/PulsarSourceTest.java  |   7 +-
 12 files changed, 195 insertions(+), 110 deletions(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index 4be8f58..b392629 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.client.impl;
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkState;
 
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
@@ -117,8 +118,11 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
         this.namespaceName = conf.getTopicNames().stream().findFirst()
                 .flatMap(s -> Optional.of(TopicName.get(s).getNamespaceObject())).get();
 
-        List<CompletableFuture<Void>> futures = conf.getTopicNames().stream().map(t -> subscribeAsync(t))
+        List<CompletableFuture<Void>> futures =
+            conf.getTopicNames().stream()
+                .map(this::subscribeAsync)
                 .collect(Collectors.toList());
+
         FutureUtil.waitForAll(futures)
             .thenAccept(finalFuture -> {
                 try {
@@ -127,7 +131,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
                     }
                     setState(State.Ready);
                     // We have successfully created N consumers, so we can start receiving messages now
-                    startReceivingMessages(consumers.values().stream().collect(Collectors.toList()));
+                    startReceivingMessages(new ArrayList<>(consumers.values()));
                     subscribeFuture().complete(MultiTopicsConsumerImpl.this);
                     log.info("[{}] [{}] Created topics consumer with {} sub-consumers",
                         topic, subscription, allTopicPartitionsNumber.get());
diff --git a/pulsar-functions/api-java/pom.xml b/pulsar-functions/api-java/pom.xml
index 43abbe3..d7babec 100644
--- a/pulsar-functions/api-java/pom.xml
+++ b/pulsar-functions/api-java/pom.xml
@@ -38,6 +38,12 @@
     </dependency>
 
     <dependency>
+      <groupId>org.apache.pulsar</groupId>
+      <artifactId>pulsar-client-original</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
+    <dependency>
       <groupId>net.jodah</groupId>
       <artifactId>typetools</artifactId>
       <scope>test</scope>
diff --git a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/SerDe.java b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/SerDe.java
index f9efa3d..2caee16 100644
--- a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/SerDe.java
+++ b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/SerDe.java
@@ -18,10 +18,36 @@
  */
 package org.apache.pulsar.functions.api;
 
+import java.util.Collections;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.common.schema.SchemaInfo;
+import org.apache.pulsar.common.schema.SchemaType;
+
 /**
  * An interface for serializer/deserializer.
  */
-public interface SerDe<T> {
+public interface SerDe<T> extends Schema<T> {
     T deserialize(byte[] input);
+
     byte[] serialize(T input);
+
+    @Override
+    default SchemaInfo getSchemaInfo() {
+        SchemaInfo info = new SchemaInfo();
+        info.setName("");
+        info.setType(SchemaType.NONE);
+        info.setSchema(new byte[0]);
+        info.setProperties(Collections.emptyMap());
+        return info;
+    }
+
+    @Override
+    default byte[] encode(T message) {
+        return serialize(message);
+    }
+
+    @Override
+    default T decode(byte[] bytes) {
+        return deserialize(bytes);
+    }
 }
\ No newline at end of file
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
index 9eddc69..b606402 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
@@ -121,6 +121,15 @@ class ContextImpl implements Context {
         }
     }
 
+    public ContextImpl(InstanceConfig config, Logger logger, PulsarClient client,
+                       ClassLoader classLoader) {
+        this(config, logger, client, classLoader, null);
+    }
+
+    public void setInputConsumer(Consumer inputConsumer) {
+        this.inputConsumer = inputConsumer;
+    }
+
     public void setCurrentMessageContext(MessageId messageId, String topicName) {
         this.messageId = messageId;
         this.currentTopicName = topicName;
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java
index 5ab8d85..ec85261 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java
@@ -18,16 +18,19 @@
  */
 package org.apache.pulsar.functions.instance;
 
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import javax.swing.text.html.Option;
 import lombok.AccessLevel;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.functions.api.Function;
-import org.apache.pulsar.functions.proto.InstanceCommunication;
-import org.apache.pulsar.io.core.Source;
-
+import org.apache.pulsar.functions.proto.InstanceCommunication.MetricsData;
 import org.apache.pulsar.functions.source.PulsarSource;
+import org.apache.pulsar.io.core.Source;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -38,11 +41,12 @@ import org.slf4j.LoggerFactory;
  */
 @Slf4j
 public class JavaInstance implements AutoCloseable {
+    private ContextImpl context;
 
     @Getter(AccessLevel.PACKAGE)
-    private final ContextImpl context;
     private Function function;
     private java.util.function.Function javaUtilFunction;
+    private Optional<PulsarSource> optionalPulsarSource = Optional.empty();
 
     public JavaInstance(InstanceConfig config, Object userClassObject,
                  ClassLoader clsLoader,
@@ -52,8 +56,8 @@ public class JavaInstance implements AutoCloseable {
         Logger instanceLog = LoggerFactory.getLogger("function-" + config.getFunctionDetails().getName());
 
         if (source instanceof PulsarSource) {
-            this.context = new ContextImpl(config, instanceLog, pulsarClient, clsLoader,
-                    ((PulsarSource) source).getInputConsumer());
+            this.context = new ContextImpl(config, instanceLog, pulsarClient, clsLoader);
+            optionalPulsarSource = Optional.of((PulsarSource) source);
         } else {
             this.context = null;
         }
@@ -64,13 +68,17 @@ public class JavaInstance implements AutoCloseable {
         } else {
             this.javaUtilFunction = (java.util.function.Function) userClassObject;
         }
+
     }
 
     public JavaExecutionResult handleMessage(MessageId messageId, String topicName, Object input) {
-        if (context != null) {
-            context.setCurrentMessageContext(messageId, topicName);
-        }
+        optionalPulsarSource.ifPresent((pulsarSource) -> {
+            this.context.setInputConsumer(pulsarSource.getConsumerForTopic(topicName));
+            this.context.setCurrentMessageContext(messageId, topicName);
+        });
+
         JavaExecutionResult executionResult = new JavaExecutionResult();
+
         try {
             Object output;
             if (function != null) {
@@ -85,11 +93,15 @@ public class JavaInstance implements AutoCloseable {
         return executionResult;
     }
 
+    public ContextImpl getContext() {
+        return this.context;
+    }
+
     @Override
     public void close() {
     }
 
-    public InstanceCommunication.MetricsData getAndResetMetrics() {
+    public MetricsData getAndResetMetrics() {
         return context.getAndResetMetrics();
     }
 }
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/AbstractOneOuputTopicProducers.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/AbstractOneOuputTopicProducers.java
index 7a561a1..73297c7 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/AbstractOneOuputTopicProducers.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/AbstractOneOuputTopicProducers.java
@@ -27,23 +27,27 @@ import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerBuilder;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.functions.instance.FunctionResultRouter;
 
-public abstract class AbstractOneOuputTopicProducers implements Producers {
+public abstract class AbstractOneOuputTopicProducers<T> implements Producers<T> {
 
     protected final PulsarClient client;
     protected final String outputTopic;
+    protected final Schema<T> schema;
 
     AbstractOneOuputTopicProducers(PulsarClient client,
-                                   String outputTopic)
+                                   String outputTopic,
+                                   Schema<T> schema)
             throws PulsarClientException {
         this.client = client;
         this.outputTopic = outputTopic;
+        this.schema = schema;
     }
 
-    static ProducerBuilder<byte[]> newProducerBuilder(PulsarClient client) {
+    static <U> ProducerBuilder<U> newProducerBuilder(PulsarClient client, Schema<U> schema) {
         // use function result router to deal with different processing guarantees.
-        return client.newProducer() //
+        return client.newProducer(schema) //
                 .blockIfQueueFull(true) //
                 .enableBatching(true) //
                 .batchingMaxPublishDelay(1, TimeUnit.MILLISECONDS) //
@@ -53,23 +57,23 @@ public abstract class AbstractOneOuputTopicProducers implements Producers {
                 .messageRouter(FunctionResultRouter.of());
     }
 
-    protected Producer<byte[]> createProducer(String topic)
+    protected Producer<T> createProducer(String topic, Schema<T> schema)
             throws PulsarClientException {
-        return createProducer(client, topic);
+        return createProducer(client, topic, schema);
     }
 
-    public static Producer<byte[]> createProducer(PulsarClient client, String topic)
+    public static <T> Producer<T> createProducer(PulsarClient client, String topic, Schema<T> schema)
             throws PulsarClientException {
-        return newProducerBuilder(client).topic(topic).create();
+        return newProducerBuilder(client, schema).topic(topic).create();
     }
 
-    protected Producer<byte[]> createProducer(String topic, String producerName)
+    protected Producer<T> createProducer(String topic, String producerName, Schema<T> schema)
             throws PulsarClientException {
-        return createProducer(client, topic, producerName);
+        return createProducer(client, topic, producerName, schema);
     }
 
-    public static Producer<byte[]> createProducer(PulsarClient client, String topic, String producerName)
+    public static <T> Producer<T> createProducer(PulsarClient client, String topic, String producerName, Schema<T> schema)
             throws PulsarClientException {
-        return newProducerBuilder(client).topic(topic).producerName(producerName).create();
+        return newProducerBuilder(client, schema).topic(topic).producerName(producerName).create();
     }
 }
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneOuputTopicProducers.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneOuputTopicProducers.java
index 12a639e..48a8b26 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneOuputTopicProducers.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneOuputTopicProducers.java
@@ -19,7 +19,6 @@
 package org.apache.pulsar.functions.instance.producers;
 
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
@@ -31,19 +30,21 @@ import org.apache.bookkeeper.common.concurrent.FutureUtils;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
 
 @Slf4j
-public class MultiConsumersOneOuputTopicProducers extends AbstractOneOuputTopicProducers {
+public class MultiConsumersOneOuputTopicProducers<T> extends AbstractOneOuputTopicProducers<T> {
 
     @Getter(AccessLevel.PACKAGE)
     // PartitionId -> producer
-    private final Map<String, Producer<byte[]>> producers;
+    private final Map<String, Producer<T>> producers;
 
 
     public MultiConsumersOneOuputTopicProducers(PulsarClient client,
-                                                String outputTopic)
+                                                String outputTopic,
+                                                Schema<T> schema)
             throws PulsarClientException {
-        super(client, outputTopic);
+        super(client, outputTopic, schema);
         this.producers = new ConcurrentHashMap<>();
     }
 
@@ -57,10 +58,10 @@ public class MultiConsumersOneOuputTopicProducers extends AbstractOneOuputTopicP
     }
 
     @Override
-    public synchronized Producer<byte[]> getProducer(String srcPartitionId) throws PulsarClientException {
-        Producer<byte[]> producer = producers.get(srcPartitionId);
+    public synchronized Producer<T> getProducer(String srcPartitionId) throws PulsarClientException {
+        Producer<T> producer = producers.get(srcPartitionId);
         if (null == producer) {
-            producer = createProducer(outputTopic, srcPartitionId);
+            producer = createProducer(outputTopic, srcPartitionId, schema);
             producers.put(srcPartitionId, producer);
         }
         return producer;
@@ -68,7 +69,7 @@ public class MultiConsumersOneOuputTopicProducers extends AbstractOneOuputTopicP
 
     @Override
     public synchronized void closeProducer(String srcPartitionId) {
-        Producer<byte[]> producer = producers.get(srcPartitionId);
+        Producer<T> producer = producers.get(srcPartitionId);
         if (null != producer) {
             producer.closeAsync();
             producers.remove(srcPartitionId);
@@ -78,7 +79,7 @@ public class MultiConsumersOneOuputTopicProducers extends AbstractOneOuputTopicP
     @Override
     public synchronized void close() {
         List<CompletableFuture<Void>> closeFutures = new ArrayList<>(producers.size());
-        for (Producer<byte[]> producer: producers.values()) {
+        for (Producer<T> producer: producers.values()) {
             closeFutures.add(producer.closeAsync());
         }
         try {
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/Producers.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/Producers.java
index 4d026ee..7892876 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/Producers.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/Producers.java
@@ -24,7 +24,7 @@ import org.apache.pulsar.client.api.PulsarClientException;
 /**
  * An interface for managing publishers within a java instance.
  */
-public interface Producers extends AutoCloseable {
+public interface Producers<T> extends AutoCloseable {
 
     /**
      * Initialize all the producers.
@@ -40,7 +40,7 @@ public interface Producers extends AutoCloseable {
      *          src partition Id
      * @return the producer instance to produce messages
      */
-    Producer<byte[]> getProducer(String srcPartitionId) throws PulsarClientException;
+    Producer<T> getProducer(String srcPartitionId) throws PulsarClientException;
 
     /**
      * Close a producer specified by <tt>srcPartitionId</tt>.
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
index 0356ab1..60a1589 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
@@ -54,28 +54,28 @@ public class PulsarSink<T> implements Sink<T> {
 
     private PulsarSinkProcessor pulsarSinkProcessor;
 
-    private interface PulsarSinkProcessor {
+    private interface PulsarSinkProcessor<T> {
         void initializeOutputProducer(String outputTopic) throws Exception;
 
-        void sendOutputMessage(MessageBuilder outputMsgBuilder,
+        void sendOutputMessage(MessageBuilder<T> outputMsgBuilder,
                                RecordContext recordContext) throws Exception;
 
         void close() throws Exception;
     }
 
-    private class PulsarSinkAtMostOnceProcessor implements PulsarSinkProcessor {
-        private Producer<byte[]> producer;
+    private class PulsarSinkAtMostOnceProcessor implements PulsarSinkProcessor<T> {
+        private Producer<T> producer;
 
         @Override
         public void initializeOutputProducer(String outputTopic) throws Exception {
             this.producer = AbstractOneOuputTopicProducers.createProducer(
-                    client, pulsarSinkConfig.getTopic());
+                    client, pulsarSinkConfig.getTopic(), outputSerDe);
         }
 
         @Override
-        public void sendOutputMessage(MessageBuilder outputMsgBuilder,
+        public void sendOutputMessage(MessageBuilder<T> outputMsgBuilder,
                                       RecordContext recordContext) throws Exception {
-            Message<byte[]> outputMsg = outputMsgBuilder.build();
+            Message<T> outputMsg = outputMsgBuilder.build();
             this.producer.sendAsync(outputMsg);
         }
 
@@ -91,19 +91,19 @@ public class PulsarSink<T> implements Sink<T> {
         }
     }
 
-    private class PulsarSinkAtLeastOnceProcessor implements PulsarSinkProcessor {
-        private Producer<byte[]> producer;
+    private class PulsarSinkAtLeastOnceProcessor implements PulsarSinkProcessor<T> {
+        private Producer<T> producer;
 
         @Override
         public void initializeOutputProducer(String outputTopic) throws Exception {
             this.producer = AbstractOneOuputTopicProducers.createProducer(
-                    client, pulsarSinkConfig.getTopic());
+                    client, pulsarSinkConfig.getTopic(), outputSerDe);
         }
 
         @Override
-        public void sendOutputMessage(MessageBuilder outputMsgBuilder,
+        public void sendOutputMessage(MessageBuilder<T> outputMsgBuilder,
                                       RecordContext recordContext) throws Exception {
-            Message<byte[]> outputMsg = outputMsgBuilder.build();
+            Message<T> outputMsg = outputMsgBuilder.build();
             this.producer.sendAsync(outputMsg).thenAccept(messageId -> recordContext.ack());
         }
 
@@ -119,19 +119,19 @@ public class PulsarSink<T> implements Sink<T> {
         }
     }
 
-    private class PulsarSinkEffectivelyOnceProcessor implements PulsarSinkProcessor, ConsumerEventListener {
+    private class PulsarSinkEffectivelyOnceProcessor implements PulsarSinkProcessor<T>, ConsumerEventListener {
 
         @Getter(AccessLevel.PACKAGE)
-        protected Producers outputProducer;
+        protected Producers<T> outputProducer;
 
         @Override
         public void initializeOutputProducer(String outputTopic) throws Exception {
-            outputProducer = new MultiConsumersOneOuputTopicProducers(client, outputTopic);
+            outputProducer = new MultiConsumersOneOuputTopicProducers(client, outputTopic, outputSerDe);
             outputProducer.initialize();
         }
 
         @Override
-        public void sendOutputMessage(MessageBuilder outputMsgBuilder, RecordContext recordContext)
+        public void sendOutputMessage(MessageBuilder<T> outputMsgBuilder, RecordContext recordContext)
                 throws Exception {
 
             // assign sequence id to output message for idempotent producing
@@ -139,9 +139,9 @@ public class PulsarSink<T> implements Sink<T> {
                     .setSequenceId(recordContext.getRecordSequence());
 
             // currently on PulsarRecord
-            Producer producer = outputProducer.getProducer(recordContext.getPartitionId());
+            Producer<T> producer = outputProducer.getProducer(recordContext.getPartitionId());
 
-            org.apache.pulsar.client.api.Message outputMsg = outputMsgBuilder.build();
+            org.apache.pulsar.client.api.Message<T> outputMsg = outputMsgBuilder.build();
             producer.sendAsync(outputMsg)
                     .thenAccept(messageId -> recordContext.ack())
                     .join();
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
index 74f2366..c1dddb0 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
@@ -19,40 +19,41 @@
 package org.apache.pulsar.functions.source;
 
 import com.google.common.annotations.VisibleForTesting;
-import lombok.Getter;
+import com.google.common.collect.Maps;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
 import lombok.extern.slf4j.Slf4j;
 import net.jodah.typetools.TypeResolver;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageListener;
 
 import static org.apache.commons.lang3.StringUtils.isNotBlank;
 import org.apache.pulsar.client.api.ConsumerBuilder;
 import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.client.impl.TopicMessageIdImpl;
 import org.apache.pulsar.client.impl.TopicMessageImpl;
+import org.apache.pulsar.io.core.PushSource;
 import org.apache.pulsar.functions.api.SerDe;
 import org.apache.pulsar.functions.api.utils.DefaultSerDe;
 import org.apache.pulsar.functions.instance.InstanceUtils;
 import org.apache.pulsar.functions.utils.FunctionConfig;
 import org.apache.pulsar.functions.utils.Utils;
-import org.apache.pulsar.io.core.Record;
-import org.apache.pulsar.io.core.Source;
 import org.jboss.util.Classes;
 
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
 @Slf4j
-public class PulsarSource<T> implements Source<T> {
+public class PulsarSource<T> extends PushSource<T> implements MessageListener<T> {
 
     private PulsarClient pulsarClient;
     private PulsarSourceConfig pulsarSourceConfig;
-    private Map<String, SerDe> topicToSerDeMap = new HashMap<>();
     private boolean isTopicsPattern;
+    private Map<String, SerDe<T>> topicToSerDeMap = new HashMap<>();
 
-    @Getter
-    private org.apache.pulsar.client.api.Consumer inputConsumer;
+    private Map<String, org.apache.pulsar.client.api.Consumer<T>> inputConsumers;
 
     public PulsarSource(PulsarClient pulsarClient, PulsarSourceConfig pulsarConfig) {
         this.pulsarClient = pulsarClient;
@@ -64,28 +65,31 @@ public class PulsarSource<T> implements Source<T> {
         // Setup Serialization/Deserialization
         setupSerDe();
 
-        // Setup pulsar consumer
-        ConsumerBuilder<byte[]> consumerBuilder = this.pulsarClient.newConsumer()
+        inputConsumers = Maps.newHashMap();
+        for (Map.Entry<String, SerDe<T>> entry : topicToSerDeMap.entrySet()) {
+            ConsumerBuilder<T> consumerBuilder = this.pulsarClient.newConsumer(entry.getValue())
                 .subscriptionName(this.pulsarSourceConfig.getSubscriptionName())
-                .subscriptionType(this.pulsarSourceConfig.getSubscriptionType());
+                .subscriptionType(this.pulsarSourceConfig.getSubscriptionType())
+                .messageListener(this);
 
-        if(isNotBlank(this.pulsarSourceConfig.getTopicsPattern())) {
-            consumerBuilder.topicsPattern(this.pulsarSourceConfig.getTopicsPattern());    
-            isTopicsPattern = true;
-        }else {
-            consumerBuilder.topics(new ArrayList<>(this.pulsarSourceConfig.getTopicSerdeClassNameMap().keySet()));    
-        }
-        
-        if (pulsarSourceConfig.getTimeoutMs() != null) {
-            consumerBuilder.ackTimeout(pulsarSourceConfig.getTimeoutMs(), TimeUnit.MILLISECONDS);
+            if (pulsarSourceConfig.getTimeoutMs() != null) {
+                consumerBuilder.ackTimeout(pulsarSourceConfig.getTimeoutMs(), TimeUnit.MILLISECONDS);
+            }
+
+            if(isNotBlank(this.pulsarSourceConfig.getTopicsPattern())) {
+                consumerBuilder.topicsPattern(this.pulsarSourceConfig.getTopicsPattern());
+                isTopicsPattern = true;
+            }else {
+                consumerBuilder.topics(new ArrayList<>(this.pulsarSourceConfig.getTopicSerdeClassNameMap().keySet()));
+            }
+
+            inputConsumers.put(entry.getKey(),consumerBuilder.subscribe());
         }
-        this.inputConsumer = consumerBuilder.subscribe();
+
     }
 
     @Override
-    public Record<T> read() throws Exception {
-        org.apache.pulsar.client.api.Message<T> message = this.inputConsumer.receive();
-
+    public void received(Consumer<T> consumer, Message<T> message) {
         String topicName;
         String partitionId;
 
@@ -127,31 +131,44 @@ public class PulsarSource<T> implements Source<T> {
         }
 
         PulsarRecord<T> pulsarMessage = (PulsarRecord<T>) PulsarRecord.builder()
-                .value(input)
-                .messageId(message.getMessageId())
-                .partitionId(String.format("%s-%s", topicName, partitionId))
-                .recordSequence(Utils.getSequenceId(message.getMessageId()))
-                .topicName(topicName)
-                .ackFunction(() -> {
-                    if (pulsarSourceConfig.getProcessingGuarantees() == FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE) {
-                        inputConsumer.acknowledgeCumulativeAsync(message);
-                    } else {
-                        inputConsumer.acknowledgeAsync(message);
-                    }
-                }).failFunction(() -> {
-                    if (pulsarSourceConfig.getProcessingGuarantees() == FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE) {
-                        throw new RuntimeException("Failed to process message: " + message.getMessageId());
-                    }
-                })
-                .build();
-        return pulsarMessage;
+            .value(input)
+            .messageId(message.getMessageId())
+            .partitionId(String.format("%s-%s", topicName, partitionId))
+            .recordSequence(Utils.getSequenceId(message.getMessageId()))
+            .topicName(topicName)
+            .ackFunction(() -> {
+                if (pulsarSourceConfig.getProcessingGuarantees() == FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE) {
+                    consumer.acknowledgeCumulativeAsync(message);
+                } else {
+                    consumer.acknowledgeAsync(message);
+                }
+            }).failFunction(() -> {
+                if (pulsarSourceConfig.getProcessingGuarantees() == FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE) {
+                    throw new RuntimeException("Failed to process message: " + message.getMessageId());
+                }
+            })
+            .build();
+
+        consume(pulsarMessage);
+    }
+
+    public Consumer<T> getConsumerForTopic(String topic) {
+        return inputConsumers.get(topic);
+    }
+
+    @Override
+    public void reachedEndOfTopic(Consumer<T> consumer) {
+        //No-op
     }
 
     @Override
     public void close() throws Exception {
-        if (this.inputConsumer != null) {
-            this.inputConsumer.close();
-        }
+        inputConsumers.forEach((ignored, consumer) -> {
+            try {
+                consumer.close();
+            } catch (PulsarClientException e) {
+            }
+        });
     }
 
     @VisibleForTesting
diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneOutputTopicProducersTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneOutputTopicProducersTest.java
index b2f2c4f..81cbbda 100644
--- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneOutputTopicProducersTest.java
+++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneOutputTopicProducersTest.java
@@ -19,6 +19,8 @@
 package org.apache.pulsar.functions.instance.producers;
 
 import static org.apache.pulsar.functions.instance.producers.MultiConsumersOneOuputTopicProducers.makeProducerName;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyObject;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
@@ -43,6 +45,7 @@ import org.apache.pulsar.client.api.ProducerBuilder;
 import org.apache.pulsar.client.api.ProducerCryptoFailureAction;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
@@ -191,10 +194,10 @@ public class MultiConsumersOneOutputTopicProducersTest {
     public void setup() throws Exception {
         this.mockClient = mock(PulsarClient.class);
 
-        when(mockClient.newProducer())
+        when(mockClient.newProducer(any(Schema.class)))
             .thenReturn(new MockProducerBuilder());
 
-        producers = new MultiConsumersOneOuputTopicProducers(mockClient, TEST_OUTPUT_TOPIC);
+        producers = new MultiConsumersOneOuputTopicProducers(mockClient, TEST_OUTPUT_TOPIC, Schema.BYTES);
         producers.initialize();
     }
 
@@ -219,13 +222,13 @@ public class MultiConsumersOneOutputTopicProducersTest {
 
         assertSame(mockProducers.get(producerName), producer);
         verify(mockClient, times(1))
-            .newProducer();
+            .newProducer(Schema.BYTES);
         assertTrue(producers.getProducers().containsKey(producerName));
 
         // second get will not create a new producer
         assertSame(mockProducers.get(producerName), producer);
         verify(mockClient, times(1))
-            .newProducer();
+            .newProducer(Schema.BYTES);
         assertTrue(producers.getProducers().containsKey(producerName));
 
         // close
diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java
index 3c5e61b..8ca94bb 100644
--- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java
+++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java
@@ -34,9 +34,11 @@ import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
 
+import static java.util.Collections.emptyMap;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyList;
 import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.anyObject;
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
@@ -78,9 +80,10 @@ public class PulsarSourceTest {
         doReturn(consumerBuilder).when(consumerBuilder).subscriptionName(anyString());
         doReturn(consumerBuilder).when(consumerBuilder).subscriptionType(any());
         doReturn(consumerBuilder).when(consumerBuilder).ackTimeout(anyLong(), any());
+        doReturn(consumerBuilder).when(consumerBuilder).messageListener(anyObject());
         Consumer consumer = mock(Consumer.class);
         doReturn(consumer).when(consumerBuilder).subscribe();
-        doReturn(consumerBuilder).when(pulsarClient).newConsumer();
+        doReturn(consumerBuilder).when(pulsarClient).newConsumer(anyObject());
         return pulsarClient;
     }
 
@@ -168,7 +171,7 @@ public class PulsarSourceTest {
         pulsarConfig.setTopicSerdeClassNameMap(topicSerdeClassNameMap);
         PulsarSource pulsarSource = new PulsarSource(getPulsarClient(), pulsarConfig);
 
-        pulsarSource.open(new HashMap<>());
+        pulsarSource.open(emptyMap());
     }
 
     /**

Reply via email to