This is an automated email from the ASF dual-hosted git repository. jerrypeng pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 97b56cf Functions schema integration (#1845) 97b56cf is described below commit 97b56cf04dfca81c937a4b640bc8cfc5470d66cb Author: Dave Rusek <dave.ru...@gmail.com> AuthorDate: Wed Jun 20 09:34:51 2018 -0700 Functions schema integration (#1845) * wip * Removed internal shading * wip * Fixed handling of IllegalArgumentException in ZK client wrapper * wip * Extend SerDe<T> with Schema<T> and implement PulsarSink * Add Typed Consumers to functions * Munge functions and schemas together * Remove consumer/producer changes * formatting * Review changes * Remove context cache and add setConsumer to context * Addressed Jerry's comments * Use pulsar-client-original and fix the tests --- .../client/impl/MultiTopicsConsumerImpl.java | 8 +- pulsar-functions/api-java/pom.xml | 6 ++ .../org/apache/pulsar/functions/api/SerDe.java | 28 ++++- .../pulsar/functions/instance/ContextImpl.java | 9 ++ .../pulsar/functions/instance/JavaInstance.java | 32 ++++-- .../producers/AbstractOneOuputTopicProducers.java | 28 ++--- .../MultiConsumersOneOuputTopicProducers.java | 21 ++-- .../functions/instance/producers/Producers.java | 4 +- .../apache/pulsar/functions/sink/PulsarSink.java | 36 +++---- .../pulsar/functions/source/PulsarSource.java | 115 ++++++++++++--------- .../MultiConsumersOneOutputTopicProducersTest.java | 11 +- .../pulsar/functions/source/PulsarSourceTest.java | 7 +- 12 files changed, 195 insertions(+), 110 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java index 4be8f58..b392629 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java @@ -21,6 +21,7 @@ package org.apache.pulsar.client.impl; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkState; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashSet; @@ -117,8 +118,11 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> { this.namespaceName = conf.getTopicNames().stream().findFirst() .flatMap(s -> Optional.of(TopicName.get(s).getNamespaceObject())).get(); - List<CompletableFuture<Void>> futures = conf.getTopicNames().stream().map(t -> subscribeAsync(t)) + List<CompletableFuture<Void>> futures = + conf.getTopicNames().stream() + .map(this::subscribeAsync) .collect(Collectors.toList()); + FutureUtil.waitForAll(futures) .thenAccept(finalFuture -> { try { @@ -127,7 +131,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> { } setState(State.Ready); // We have successfully created N consumers, so we can start receiving messages now - startReceivingMessages(consumers.values().stream().collect(Collectors.toList())); + startReceivingMessages(new ArrayList<>(consumers.values())); subscribeFuture().complete(MultiTopicsConsumerImpl.this); log.info("[{}] [{}] Created topics consumer with {} sub-consumers", topic, subscription, allTopicPartitionsNumber.get()); diff --git a/pulsar-functions/api-java/pom.xml b/pulsar-functions/api-java/pom.xml index 43abbe3..d7babec 100644 --- a/pulsar-functions/api-java/pom.xml +++ b/pulsar-functions/api-java/pom.xml @@ -38,6 +38,12 @@ </dependency> <dependency> + <groupId>org.apache.pulsar</groupId> + <artifactId>pulsar-client-original</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> <groupId>net.jodah</groupId> <artifactId>typetools</artifactId> <scope>test</scope> diff --git a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/SerDe.java b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/SerDe.java index f9efa3d..2caee16 100644 --- a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/SerDe.java +++ b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/SerDe.java @@ -18,10 +18,36 @@ */ package org.apache.pulsar.functions.api; +import java.util.Collections; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.common.schema.SchemaInfo; +import org.apache.pulsar.common.schema.SchemaType; + /** * An interface for serializer/deserializer. */ -public interface SerDe<T> { +public interface SerDe<T> extends Schema<T> { T deserialize(byte[] input); + byte[] serialize(T input); + + @Override + default SchemaInfo getSchemaInfo() { + SchemaInfo info = new SchemaInfo(); + info.setName(""); + info.setType(SchemaType.NONE); + info.setSchema(new byte[0]); + info.setProperties(Collections.emptyMap()); + return info; + } + + @Override + default byte[] encode(T message) { + return serialize(message); + } + + @Override + default T decode(byte[] bytes) { + return deserialize(bytes); + } } \ No newline at end of file diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java index 9eddc69..b606402 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java @@ -121,6 +121,15 @@ class ContextImpl implements Context { } } + public ContextImpl(InstanceConfig config, Logger logger, PulsarClient client, + ClassLoader classLoader) { + this(config, logger, client, classLoader, null); + } + + public void setInputConsumer(Consumer inputConsumer) { + this.inputConsumer = inputConsumer; + } + public void setCurrentMessageContext(MessageId messageId, String topicName) { this.messageId = messageId; this.currentTopicName = topicName; diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java index 5ab8d85..ec85261 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java @@ -18,16 +18,19 @@ */ package org.apache.pulsar.functions.instance; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import javax.swing.text.html.Option; import lombok.AccessLevel; import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.functions.api.Function; -import org.apache.pulsar.functions.proto.InstanceCommunication; -import org.apache.pulsar.io.core.Source; - +import org.apache.pulsar.functions.proto.InstanceCommunication.MetricsData; import org.apache.pulsar.functions.source.PulsarSource; +import org.apache.pulsar.io.core.Source; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,11 +41,12 @@ import org.slf4j.LoggerFactory; */ @Slf4j public class JavaInstance implements AutoCloseable { + private ContextImpl context; @Getter(AccessLevel.PACKAGE) - private final ContextImpl context; private Function function; private java.util.function.Function javaUtilFunction; + private Optional<PulsarSource> optionalPulsarSource = Optional.empty(); public JavaInstance(InstanceConfig config, Object userClassObject, ClassLoader clsLoader, @@ -52,8 +56,8 @@ public class JavaInstance implements AutoCloseable { Logger instanceLog = LoggerFactory.getLogger("function-" + config.getFunctionDetails().getName()); if (source instanceof PulsarSource) { - this.context = new ContextImpl(config, instanceLog, pulsarClient, clsLoader, - ((PulsarSource) source).getInputConsumer()); + this.context = new ContextImpl(config, instanceLog, pulsarClient, clsLoader); + optionalPulsarSource = Optional.of((PulsarSource) source); } else { this.context = null; } @@ -64,13 +68,17 @@ public class JavaInstance implements AutoCloseable { } else { this.javaUtilFunction = (java.util.function.Function) userClassObject; } + } public JavaExecutionResult handleMessage(MessageId messageId, String topicName, Object input) { - if (context != null) { - context.setCurrentMessageContext(messageId, topicName); - } + optionalPulsarSource.ifPresent((pulsarSource) -> { + this.context.setInputConsumer(pulsarSource.getConsumerForTopic(topicName)); + this.context.setCurrentMessageContext(messageId, topicName); + }); + JavaExecutionResult executionResult = new JavaExecutionResult(); + try { Object output; if (function != null) { @@ -85,11 +93,15 @@ public class JavaInstance implements AutoCloseable { return executionResult; } + public ContextImpl getContext() { + return this.context; + } + @Override public void close() { } - public InstanceCommunication.MetricsData getAndResetMetrics() { + public MetricsData getAndResetMetrics() { return context.getAndResetMetrics(); } } diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/AbstractOneOuputTopicProducers.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/AbstractOneOuputTopicProducers.java index 7a561a1..73297c7 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/AbstractOneOuputTopicProducers.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/AbstractOneOuputTopicProducers.java @@ -27,23 +27,27 @@ import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.ProducerBuilder; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.functions.instance.FunctionResultRouter; -public abstract class AbstractOneOuputTopicProducers implements Producers { +public abstract class AbstractOneOuputTopicProducers<T> implements Producers<T> { protected final PulsarClient client; protected final String outputTopic; + protected final Schema<T> schema; AbstractOneOuputTopicProducers(PulsarClient client, - String outputTopic) + String outputTopic, + Schema<T> schema) throws PulsarClientException { this.client = client; this.outputTopic = outputTopic; + this.schema = schema; } - static ProducerBuilder<byte[]> newProducerBuilder(PulsarClient client) { + static <U> ProducerBuilder<U> newProducerBuilder(PulsarClient client, Schema<U> schema) { // use function result router to deal with different processing guarantees. - return client.newProducer() // + return client.newProducer(schema) // .blockIfQueueFull(true) // .enableBatching(true) // .batchingMaxPublishDelay(1, TimeUnit.MILLISECONDS) // @@ -53,23 +57,23 @@ public abstract class AbstractOneOuputTopicProducers implements Producers { .messageRouter(FunctionResultRouter.of()); } - protected Producer<byte[]> createProducer(String topic) + protected Producer<T> createProducer(String topic, Schema<T> schema) throws PulsarClientException { - return createProducer(client, topic); + return createProducer(client, topic, schema); } - public static Producer<byte[]> createProducer(PulsarClient client, String topic) + public static <T> Producer<T> createProducer(PulsarClient client, String topic, Schema<T> schema) throws PulsarClientException { - return newProducerBuilder(client).topic(topic).create(); + return newProducerBuilder(client, schema).topic(topic).create(); } - protected Producer<byte[]> createProducer(String topic, String producerName) + protected Producer<T> createProducer(String topic, String producerName, Schema<T> schema) throws PulsarClientException { - return createProducer(client, topic, producerName); + return createProducer(client, topic, producerName, schema); } - public static Producer<byte[]> createProducer(PulsarClient client, String topic, String producerName) + public static <T> Producer<T> createProducer(PulsarClient client, String topic, String producerName, Schema<T> schema) throws PulsarClientException { - return newProducerBuilder(client).topic(topic).producerName(producerName).create(); + return newProducerBuilder(client, schema).topic(topic).producerName(producerName).create(); } } diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneOuputTopicProducers.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneOuputTopicProducers.java index 12a639e..48a8b26 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneOuputTopicProducers.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneOuputTopicProducers.java @@ -19,7 +19,6 @@ package org.apache.pulsar.functions.instance.producers; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; @@ -31,19 +30,21 @@ import org.apache.bookkeeper.common.concurrent.FutureUtils; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.Schema; @Slf4j -public class MultiConsumersOneOuputTopicProducers extends AbstractOneOuputTopicProducers { +public class MultiConsumersOneOuputTopicProducers<T> extends AbstractOneOuputTopicProducers<T> { @Getter(AccessLevel.PACKAGE) // PartitionId -> producer - private final Map<String, Producer<byte[]>> producers; + private final Map<String, Producer<T>> producers; public MultiConsumersOneOuputTopicProducers(PulsarClient client, - String outputTopic) + String outputTopic, + Schema<T> schema) throws PulsarClientException { - super(client, outputTopic); + super(client, outputTopic, schema); this.producers = new ConcurrentHashMap<>(); } @@ -57,10 +58,10 @@ public class MultiConsumersOneOuputTopicProducers extends AbstractOneOuputTopicP } @Override - public synchronized Producer<byte[]> getProducer(String srcPartitionId) throws PulsarClientException { - Producer<byte[]> producer = producers.get(srcPartitionId); + public synchronized Producer<T> getProducer(String srcPartitionId) throws PulsarClientException { + Producer<T> producer = producers.get(srcPartitionId); if (null == producer) { - producer = createProducer(outputTopic, srcPartitionId); + producer = createProducer(outputTopic, srcPartitionId, schema); producers.put(srcPartitionId, producer); } return producer; @@ -68,7 +69,7 @@ public class MultiConsumersOneOuputTopicProducers extends AbstractOneOuputTopicP @Override public synchronized void closeProducer(String srcPartitionId) { - Producer<byte[]> producer = producers.get(srcPartitionId); + Producer<T> producer = producers.get(srcPartitionId); if (null != producer) { producer.closeAsync(); producers.remove(srcPartitionId); @@ -78,7 +79,7 @@ public class MultiConsumersOneOuputTopicProducers extends AbstractOneOuputTopicP @Override public synchronized void close() { List<CompletableFuture<Void>> closeFutures = new ArrayList<>(producers.size()); - for (Producer<byte[]> producer: producers.values()) { + for (Producer<T> producer: producers.values()) { closeFutures.add(producer.closeAsync()); } try { diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/Producers.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/Producers.java index 4d026ee..7892876 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/Producers.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/Producers.java @@ -24,7 +24,7 @@ import org.apache.pulsar.client.api.PulsarClientException; /** * An interface for managing publishers within a java instance. */ -public interface Producers extends AutoCloseable { +public interface Producers<T> extends AutoCloseable { /** * Initialize all the producers. @@ -40,7 +40,7 @@ public interface Producers extends AutoCloseable { * src partition Id * @return the producer instance to produce messages */ - Producer<byte[]> getProducer(String srcPartitionId) throws PulsarClientException; + Producer<T> getProducer(String srcPartitionId) throws PulsarClientException; /** * Close a producer specified by <tt>srcPartitionId</tt>. diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java index 0356ab1..60a1589 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java @@ -54,28 +54,28 @@ public class PulsarSink<T> implements Sink<T> { private PulsarSinkProcessor pulsarSinkProcessor; - private interface PulsarSinkProcessor { + private interface PulsarSinkProcessor<T> { void initializeOutputProducer(String outputTopic) throws Exception; - void sendOutputMessage(MessageBuilder outputMsgBuilder, + void sendOutputMessage(MessageBuilder<T> outputMsgBuilder, RecordContext recordContext) throws Exception; void close() throws Exception; } - private class PulsarSinkAtMostOnceProcessor implements PulsarSinkProcessor { - private Producer<byte[]> producer; + private class PulsarSinkAtMostOnceProcessor implements PulsarSinkProcessor<T> { + private Producer<T> producer; @Override public void initializeOutputProducer(String outputTopic) throws Exception { this.producer = AbstractOneOuputTopicProducers.createProducer( - client, pulsarSinkConfig.getTopic()); + client, pulsarSinkConfig.getTopic(), outputSerDe); } @Override - public void sendOutputMessage(MessageBuilder outputMsgBuilder, + public void sendOutputMessage(MessageBuilder<T> outputMsgBuilder, RecordContext recordContext) throws Exception { - Message<byte[]> outputMsg = outputMsgBuilder.build(); + Message<T> outputMsg = outputMsgBuilder.build(); this.producer.sendAsync(outputMsg); } @@ -91,19 +91,19 @@ public class PulsarSink<T> implements Sink<T> { } } - private class PulsarSinkAtLeastOnceProcessor implements PulsarSinkProcessor { - private Producer<byte[]> producer; + private class PulsarSinkAtLeastOnceProcessor implements PulsarSinkProcessor<T> { + private Producer<T> producer; @Override public void initializeOutputProducer(String outputTopic) throws Exception { this.producer = AbstractOneOuputTopicProducers.createProducer( - client, pulsarSinkConfig.getTopic()); + client, pulsarSinkConfig.getTopic(), outputSerDe); } @Override - public void sendOutputMessage(MessageBuilder outputMsgBuilder, + public void sendOutputMessage(MessageBuilder<T> outputMsgBuilder, RecordContext recordContext) throws Exception { - Message<byte[]> outputMsg = outputMsgBuilder.build(); + Message<T> outputMsg = outputMsgBuilder.build(); this.producer.sendAsync(outputMsg).thenAccept(messageId -> recordContext.ack()); } @@ -119,19 +119,19 @@ public class PulsarSink<T> implements Sink<T> { } } - private class PulsarSinkEffectivelyOnceProcessor implements PulsarSinkProcessor, ConsumerEventListener { + private class PulsarSinkEffectivelyOnceProcessor implements PulsarSinkProcessor<T>, ConsumerEventListener { @Getter(AccessLevel.PACKAGE) - protected Producers outputProducer; + protected Producers<T> outputProducer; @Override public void initializeOutputProducer(String outputTopic) throws Exception { - outputProducer = new MultiConsumersOneOuputTopicProducers(client, outputTopic); + outputProducer = new MultiConsumersOneOuputTopicProducers(client, outputTopic, outputSerDe); outputProducer.initialize(); } @Override - public void sendOutputMessage(MessageBuilder outputMsgBuilder, RecordContext recordContext) + public void sendOutputMessage(MessageBuilder<T> outputMsgBuilder, RecordContext recordContext) throws Exception { // assign sequence id to output message for idempotent producing @@ -139,9 +139,9 @@ public class PulsarSink<T> implements Sink<T> { .setSequenceId(recordContext.getRecordSequence()); // currently on PulsarRecord - Producer producer = outputProducer.getProducer(recordContext.getPartitionId()); + Producer<T> producer = outputProducer.getProducer(recordContext.getPartitionId()); - org.apache.pulsar.client.api.Message outputMsg = outputMsgBuilder.build(); + org.apache.pulsar.client.api.Message<T> outputMsg = outputMsgBuilder.build(); producer.sendAsync(outputMsg) .thenAccept(messageId -> recordContext.ack()) .join(); diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java index 74f2366..c1dddb0 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java @@ -19,40 +19,41 @@ package org.apache.pulsar.functions.source; import com.google.common.annotations.VisibleForTesting; -import lombok.Getter; +import com.google.common.collect.Maps; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; import lombok.extern.slf4j.Slf4j; import net.jodah.typetools.TypeResolver; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageListener; import static org.apache.commons.lang3.StringUtils.isNotBlank; import org.apache.pulsar.client.api.ConsumerBuilder; import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.impl.MessageIdImpl; import org.apache.pulsar.client.impl.TopicMessageIdImpl; import org.apache.pulsar.client.impl.TopicMessageImpl; +import org.apache.pulsar.io.core.PushSource; import org.apache.pulsar.functions.api.SerDe; import org.apache.pulsar.functions.api.utils.DefaultSerDe; import org.apache.pulsar.functions.instance.InstanceUtils; import org.apache.pulsar.functions.utils.FunctionConfig; import org.apache.pulsar.functions.utils.Utils; -import org.apache.pulsar.io.core.Record; -import org.apache.pulsar.io.core.Source; import org.jboss.util.Classes; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.TimeUnit; - @Slf4j -public class PulsarSource<T> implements Source<T> { +public class PulsarSource<T> extends PushSource<T> implements MessageListener<T> { private PulsarClient pulsarClient; private PulsarSourceConfig pulsarSourceConfig; - private Map<String, SerDe> topicToSerDeMap = new HashMap<>(); private boolean isTopicsPattern; + private Map<String, SerDe<T>> topicToSerDeMap = new HashMap<>(); - @Getter - private org.apache.pulsar.client.api.Consumer inputConsumer; + private Map<String, org.apache.pulsar.client.api.Consumer<T>> inputConsumers; public PulsarSource(PulsarClient pulsarClient, PulsarSourceConfig pulsarConfig) { this.pulsarClient = pulsarClient; @@ -64,28 +65,31 @@ public class PulsarSource<T> implements Source<T> { // Setup Serialization/Deserialization setupSerDe(); - // Setup pulsar consumer - ConsumerBuilder<byte[]> consumerBuilder = this.pulsarClient.newConsumer() + inputConsumers = Maps.newHashMap(); + for (Map.Entry<String, SerDe<T>> entry : topicToSerDeMap.entrySet()) { + ConsumerBuilder<T> consumerBuilder = this.pulsarClient.newConsumer(entry.getValue()) .subscriptionName(this.pulsarSourceConfig.getSubscriptionName()) - .subscriptionType(this.pulsarSourceConfig.getSubscriptionType()); + .subscriptionType(this.pulsarSourceConfig.getSubscriptionType()) + .messageListener(this); - if(isNotBlank(this.pulsarSourceConfig.getTopicsPattern())) { - consumerBuilder.topicsPattern(this.pulsarSourceConfig.getTopicsPattern()); - isTopicsPattern = true; - }else { - consumerBuilder.topics(new ArrayList<>(this.pulsarSourceConfig.getTopicSerdeClassNameMap().keySet())); - } - - if (pulsarSourceConfig.getTimeoutMs() != null) { - consumerBuilder.ackTimeout(pulsarSourceConfig.getTimeoutMs(), TimeUnit.MILLISECONDS); + if (pulsarSourceConfig.getTimeoutMs() != null) { + consumerBuilder.ackTimeout(pulsarSourceConfig.getTimeoutMs(), TimeUnit.MILLISECONDS); + } + + if(isNotBlank(this.pulsarSourceConfig.getTopicsPattern())) { + consumerBuilder.topicsPattern(this.pulsarSourceConfig.getTopicsPattern()); + isTopicsPattern = true; + }else { + consumerBuilder.topics(new ArrayList<>(this.pulsarSourceConfig.getTopicSerdeClassNameMap().keySet())); + } + + inputConsumers.put(entry.getKey(),consumerBuilder.subscribe()); } - this.inputConsumer = consumerBuilder.subscribe(); + } @Override - public Record<T> read() throws Exception { - org.apache.pulsar.client.api.Message<T> message = this.inputConsumer.receive(); - + public void received(Consumer<T> consumer, Message<T> message) { String topicName; String partitionId; @@ -127,31 +131,44 @@ public class PulsarSource<T> implements Source<T> { } PulsarRecord<T> pulsarMessage = (PulsarRecord<T>) PulsarRecord.builder() - .value(input) - .messageId(message.getMessageId()) - .partitionId(String.format("%s-%s", topicName, partitionId)) - .recordSequence(Utils.getSequenceId(message.getMessageId())) - .topicName(topicName) - .ackFunction(() -> { - if (pulsarSourceConfig.getProcessingGuarantees() == FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE) { - inputConsumer.acknowledgeCumulativeAsync(message); - } else { - inputConsumer.acknowledgeAsync(message); - } - }).failFunction(() -> { - if (pulsarSourceConfig.getProcessingGuarantees() == FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE) { - throw new RuntimeException("Failed to process message: " + message.getMessageId()); - } - }) - .build(); - return pulsarMessage; + .value(input) + .messageId(message.getMessageId()) + .partitionId(String.format("%s-%s", topicName, partitionId)) + .recordSequence(Utils.getSequenceId(message.getMessageId())) + .topicName(topicName) + .ackFunction(() -> { + if (pulsarSourceConfig.getProcessingGuarantees() == FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE) { + consumer.acknowledgeCumulativeAsync(message); + } else { + consumer.acknowledgeAsync(message); + } + }).failFunction(() -> { + if (pulsarSourceConfig.getProcessingGuarantees() == FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE) { + throw new RuntimeException("Failed to process message: " + message.getMessageId()); + } + }) + .build(); + + consume(pulsarMessage); + } + + public Consumer<T> getConsumerForTopic(String topic) { + return inputConsumers.get(topic); + } + + @Override + public void reachedEndOfTopic(Consumer<T> consumer) { + //No-op } @Override public void close() throws Exception { - if (this.inputConsumer != null) { - this.inputConsumer.close(); - } + inputConsumers.forEach((ignored, consumer) -> { + try { + consumer.close(); + } catch (PulsarClientException e) { + } + }); } @VisibleForTesting diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneOutputTopicProducersTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneOutputTopicProducersTest.java index b2f2c4f..81cbbda 100644 --- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneOutputTopicProducersTest.java +++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneOutputTopicProducersTest.java @@ -19,6 +19,8 @@ package org.apache.pulsar.functions.instance.producers; import static org.apache.pulsar.functions.instance.producers.MultiConsumersOneOuputTopicProducers.makeProducerName; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyObject; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -43,6 +45,7 @@ import org.apache.pulsar.client.api.ProducerBuilder; import org.apache.pulsar.client.api.ProducerCryptoFailureAction; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.Schema; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; @@ -191,10 +194,10 @@ public class MultiConsumersOneOutputTopicProducersTest { public void setup() throws Exception { this.mockClient = mock(PulsarClient.class); - when(mockClient.newProducer()) + when(mockClient.newProducer(any(Schema.class))) .thenReturn(new MockProducerBuilder()); - producers = new MultiConsumersOneOuputTopicProducers(mockClient, TEST_OUTPUT_TOPIC); + producers = new MultiConsumersOneOuputTopicProducers(mockClient, TEST_OUTPUT_TOPIC, Schema.BYTES); producers.initialize(); } @@ -219,13 +222,13 @@ public class MultiConsumersOneOutputTopicProducersTest { assertSame(mockProducers.get(producerName), producer); verify(mockClient, times(1)) - .newProducer(); + .newProducer(Schema.BYTES); assertTrue(producers.getProducers().containsKey(producerName)); // second get will not create a new producer assertSame(mockProducers.get(producerName), producer); verify(mockClient, times(1)) - .newProducer(); + .newProducer(Schema.BYTES); assertTrue(producers.getProducers().containsKey(producerName)); // close diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java index 3c5e61b..8ca94bb 100644 --- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java +++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java @@ -34,9 +34,11 @@ import java.io.IOException; import java.util.HashMap; import java.util.Map; +import static java.util.Collections.emptyMap; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyList; import static org.mockito.Matchers.anyLong; +import static org.mockito.Matchers.anyObject; import static org.mockito.Matchers.anyString; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; @@ -78,9 +80,10 @@ public class PulsarSourceTest { doReturn(consumerBuilder).when(consumerBuilder).subscriptionName(anyString()); doReturn(consumerBuilder).when(consumerBuilder).subscriptionType(any()); doReturn(consumerBuilder).when(consumerBuilder).ackTimeout(anyLong(), any()); + doReturn(consumerBuilder).when(consumerBuilder).messageListener(anyObject()); Consumer consumer = mock(Consumer.class); doReturn(consumer).when(consumerBuilder).subscribe(); - doReturn(consumerBuilder).when(pulsarClient).newConsumer(); + doReturn(consumerBuilder).when(pulsarClient).newConsumer(anyObject()); return pulsarClient; } @@ -168,7 +171,7 @@ public class PulsarSourceTest { pulsarConfig.setTopicSerdeClassNameMap(topicSerdeClassNameMap); PulsarSource pulsarSource = new PulsarSource(getPulsarClient(), pulsarConfig); - pulsarSource.open(new HashMap<>()); + pulsarSource.open(emptyMap()); } /**