This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 22e45e7 Refactor JavaInstanceRunnable (#1533) 22e45e7 is described below commit 22e45e70c593118af849a17cad2c2de433344625 Author: Sijie Guo <guosi...@gmail.com> AuthorDate: Tue Apr 10 20:14:51 2018 -0700 Refactor JavaInstanceRunnable (#1533) *Motivation* EffectivelyOnce processing made `JavaInstanceRunnable` very complicated. There is a lot of branching logic. It makes the code hard to maintain. *Solution* Abstract the processing guarantee related logic into a `MessageProcessor` interface. Implement `at-most-once`, `at-least-once` and `effectively-once` processors. *Result* After this change, `JavaInstanceRunnable` is much cleaner. `At-Least-Once` and `At-Most-Once` logic are much simpler and easier to debug. --- .../pulsar/functions/instance/InputMessage.java | 56 ++++ .../functions/instance/JavaInstanceRunnable.java | 342 +++------------------ .../instance/processors/AtLeastOnceProcessor.java | 78 +++++ .../instance/processors/AtMostOnceProcessor.java | 82 +++++ .../processors/EffectivelyOnceProcessor.java | 273 ++++++++++++++++ .../instance/processors/MessageProcessor.java | 145 +++++++++ .../instance/processors/MessageProcessorBase.java | 167 ++++++++++ .../producers/AbstractOneOuputTopicProducers.java | 14 +- .../producers/SimpleOneOuputTopicProducers.java | 63 ---- .../instance/JavaInstanceRunnableProcessTest.java | 16 +- .../SimpleOneOutputTopicProducersTest.java | 94 ------ 11 files changed, 856 insertions(+), 474 deletions(-) diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InputMessage.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InputMessage.java new file mode 100644 index 0000000..bd7e788 --- /dev/null +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InputMessage.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor 
license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.functions.instance; + +import lombok.Getter; +import lombok.Setter; +import lombok.ToString; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.impl.MessageIdImpl; +import org.apache.pulsar.functions.api.SerDe; + +@Getter +@Setter +@ToString +public class InputMessage { + + private Message actualMessage; + String topicName; + SerDe inputSerDe; + Consumer consumer; + + public int getTopicPartition() { + MessageIdImpl msgId = (MessageIdImpl) actualMessage.getMessageId(); + return msgId.getPartitionIndex(); + } + + public void ack() { + if (null != consumer) { + consumer.acknowledgeAsync(actualMessage); + } + } + + public void ackCumulative() { + if (null != consumer) { + consumer.acknowledgeCumulativeAsync(actualMessage); + } + } + +} diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java index 510608f..008d5a9 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java +++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java @@ -22,17 +22,17 @@ package org.apache.pulsar.functions.instance; import static org.apache.bookkeeper.common.concurrent.FutureUtils.result; import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.DEFAULT_STREAM_CONF; -import com.google.common.collect.Maps; import io.netty.buffer.ByteBuf; -import java.util.*; +import java.util.Arrays; +import java.util.Base64; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; import java.util.concurrent.LinkedBlockingDeque; -import java.util.concurrent.TimeUnit; import lombok.AccessLevel; import lombok.Getter; -import lombok.Setter; -import lombok.ToString; import lombok.extern.slf4j.Slf4j; import net.jodah.typetools.TypeResolver; import org.apache.bookkeeper.api.StorageClient; @@ -48,46 +48,35 @@ import org.apache.logging.log4j.ThreadContext; import org.apache.logging.log4j.core.LoggerContext; import org.apache.logging.log4j.core.config.Configuration; import org.apache.logging.log4j.core.config.LoggerConfig; -import org.apache.pulsar.client.api.*; -import org.apache.pulsar.client.api.PulsarClientException.ProducerBusyException; -import org.apache.pulsar.client.impl.MessageIdImpl; +import org.apache.pulsar.client.api.MessageBuilder; +import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.impl.PulsarClientImpl; import org.apache.pulsar.functions.api.Function; import org.apache.pulsar.functions.api.utils.DefaultSerDe; -import org.apache.pulsar.functions.proto.Function.FunctionConfig; -import org.apache.pulsar.functions.proto.Function.FunctionConfig.ProcessingGuarantees; +import org.apache.pulsar.functions.instance.processors.MessageProcessor; import org.apache.pulsar.functions.proto.InstanceCommunication; import org.apache.pulsar.functions.utils.functioncache.FunctionCacheManager; import org.apache.pulsar.functions.api.SerDe; -import 
org.apache.pulsar.functions.instance.producers.MultiConsumersOneOuputTopicProducers; -import org.apache.pulsar.functions.instance.producers.Producers; -import org.apache.pulsar.functions.instance.producers.SimpleOneOuputTopicProducers; import org.apache.pulsar.functions.instance.state.StateContextImpl; import org.apache.pulsar.functions.utils.FunctionConfigUtils; import org.apache.pulsar.functions.utils.Reflections; -import org.apache.pulsar.functions.utils.Utils; /** * A function container implemented using java thread. */ @Slf4j -public class JavaInstanceRunnable implements AutoCloseable, Runnable, ConsumerEventListener { +public class JavaInstanceRunnable implements AutoCloseable, Runnable { // The class loader that used for loading functions private ClassLoader fnClassLoader; private final InstanceConfig instanceConfig; - private final FunctionConfig.ProcessingGuarantees processingGuarantees; private final FunctionCacheManager fnCache; private final LinkedBlockingDeque<InputMessage> queue; private final String jarFile; // input topic consumer & output topic producer private final PulsarClientImpl client; - @Getter(AccessLevel.PACKAGE) - private Producers outputProducer; - @Getter(AccessLevel.PACKAGE) - private final Map<String, Consumer> inputConsumers; - private LinkedList<String> inputTopicsToResubscribe = null; + private LogAppender logAppender; // provide tables for storing states @@ -106,20 +95,9 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable, ConsumerEv private Map<String, SerDe> inputSerDe; private SerDe outputSerDe; - @Getter - @Setter - @ToString - private class InputMessage { - private Message actualMessage; - String topicName; - SerDe inputSerDe; - Consumer consumer; - - public int getTopicPartition() { - MessageIdImpl msgId = (MessageIdImpl) actualMessage.getMessageId(); - return msgId.getPartitionIndex(); - } - } + @Getter(AccessLevel.PACKAGE) + // processor + private final MessageProcessor processor; // function stats 
private final FunctionStats stats; @@ -130,27 +108,16 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable, ConsumerEv PulsarClient pulsarClient, String stateStorageServiceUrl) { this.instanceConfig = instanceConfig; - this.processingGuarantees = instanceConfig.getFunctionConfig().getProcessingGuarantees(); this.fnCache = fnCache; this.queue = new LinkedBlockingDeque<>(instanceConfig.getMaxBufferedTuples()); this.jarFile = jarFile; this.client = (PulsarClientImpl) pulsarClient; this.stateStorageServiceUrl = stateStorageServiceUrl; this.stats = new FunctionStats(); - this.inputConsumers = Maps.newConcurrentMap(); - } - - private SubscriptionType getSubscriptionType() { - if (processingGuarantees == ProcessingGuarantees.EFFECTIVELY_ONCE) { - return SubscriptionType.Failover; - } else { - if (instanceConfig.getFunctionConfig().getSubscriptionType() == null - || instanceConfig.getFunctionConfig().getSubscriptionType() == FunctionConfig.SubscriptionType.SHARED) { - return SubscriptionType.Shared; - } else { - return SubscriptionType.Failover; - } - } + this.processor = MessageProcessor.create( + client, + instanceConfig.getFunctionConfig(), + queue); } /** @@ -189,13 +156,13 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable, ConsumerEv // start the state table setupStateTable(); // start the output producer - startOutputProducer(); + processor.setupOutput(outputSerDe); // start the input consumer - startInputConsumer(); + processor.setupInput(inputSerDe); // start any log topic handler setupLogHandler(); - return new JavaInstance(instanceConfig, object, clsLoader, client, inputConsumers); + return new JavaInstance(instanceConfig, object, clsLoader, client, processor.getInputConsumers()); } /** @@ -206,9 +173,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable, ConsumerEv try { javaInstance = setupJavaInstance(); while (running) { - // some src topics might be put into resubscribe list because of processing 
failure - // so this is the chance to resubscribe to those topics. - resubscribeTopicsIfNeeded(); + processor.prepareDequeueMessageFromProcessQueue(); JavaExecutionResult result; InputMessage msg; @@ -221,31 +186,9 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable, ConsumerEv break; } - if (ProcessingGuarantees.EFFECTIVELY_ONCE == processingGuarantees) { - // if the messages are received from old consumers, we discard it since new consumer was - // re-created for the correctness of effectively-once - if (msg.getConsumer() != inputConsumers.get(msg.getTopicName())) { - continue; - } - } - - if (null != outputProducer) { - // before processing the message, we have a producer connection setup for producing results. - Producer producer = null; - while (null == producer) { - try { - producer = outputProducer.getProducer(msg.getTopicName(), msg.getTopicPartition()); - } catch (PulsarClientException e) { - // `ProducerBusy` is thrown when an producer with same name is still connected. - // This can happen when a active consumer is changed for a given input topic partition - // so we need to wait until the old active consumer release the produce connection. - if (!(e instanceof ProducerBusyException)) { - log.error("Failed to get a producer for producing results computed from input topic {}", - msg.getTopicName()); - } - TimeUnit.MILLISECONDS.sleep(500); - } - } + if (!processor.prepareProcessMessage(msg)) { + // the message can't be processed by the processor at this moment, retry it. 
+ continue; } // state object is per function, because we need to have the ability to know what updates @@ -318,30 +261,6 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable, ConsumerEv Thread.currentThread().setContextClassLoader(fnClassLoader); } - @Override - public void becameActive(Consumer consumer, int partitionId) { - // if the instance becomes active for a given topic partition, - // open a producer for the results computed from this topic partition. - if (null != outputProducer) { - try { - this.outputProducer.getProducer(consumer.getTopic(), partitionId); - } catch (PulsarClientException e) { - // this can be ignored, because producer can be lazily created when accessing it. - log.warn("Fail to create a producer for results computed from messages of topic: {}, partition: {}", - consumer.getTopic(), partitionId); - } - } - } - - @Override - public void becameInactive(Consumer consumer, int partitionId) { - if (null != outputProducer) { - // if I lost the ownership of a partition, close its corresponding topic partition. - // this is to allow the new active consumer be able to produce to the result topic. 
- this.outputProducer.closeProducer(consumer.getTopic(), partitionId); - } - } - private void setupStateTable() throws Exception { if (null == stateStorageServiceUrl) { return; @@ -384,226 +303,48 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable, ConsumerEv this.stateTable = result(storageClient.openTable(tableName)); } - private void startOutputProducer() throws Exception { - if (instanceConfig.getFunctionConfig().getOutput() != null - && !instanceConfig.getFunctionConfig().getOutput().isEmpty() - && this.outputSerDe != null) { - log.info("Starting producer for output topic " + instanceConfig.getFunctionConfig().getOutput()); - - if (processingGuarantees == ProcessingGuarantees.EFFECTIVELY_ONCE) { - this.outputProducer = new MultiConsumersOneOuputTopicProducers( - client, instanceConfig.getFunctionConfig().getOutput()); - } else { - this.outputProducer = new SimpleOneOuputTopicProducers( - client, instanceConfig.getFunctionConfig().getOutput()); - } - this.outputProducer.initialize(); - } - } - - private void startInputConsumer() throws Exception { - log.info("Consumer map {}", instanceConfig.getFunctionConfig()); - for (Map.Entry<String, String> entry : instanceConfig.getFunctionConfig().getCustomSerdeInputsMap().entrySet()) { - ConsumerConfiguration conf = createConsumerConfiguration(entry.getKey()); - this.inputConsumers.put(entry.getKey(), client.subscribe(entry.getKey(), - FunctionConfigUtils.getFullyQualifiedName(instanceConfig.getFunctionConfig()), conf)); - } - for (String topicName : instanceConfig.getFunctionConfig().getInputsList()) { - ConsumerConfiguration conf = createConsumerConfiguration(topicName); - this.inputConsumers.put(topicName, client.subscribe(topicName, - FunctionConfigUtils.getFullyQualifiedName(instanceConfig.getFunctionConfig()), conf)); - } - } - - private ConsumerConfiguration createConsumerConfiguration(String topicName) { - log.info("Starting Consumer for topic " + topicName); - ConsumerConfiguration conf = 
new ConsumerConfiguration(); - conf.setSubscriptionType(getSubscriptionType()); - - SerDe inputSerde = inputSerDe.get(topicName); - conf.setMessageListener((consumer, msg) -> { - try { - InputMessage message = new InputMessage(); - message.setConsumer(consumer); - message.setInputSerDe(inputSerde); - message.setActualMessage(msg); - message.setTopicName(topicName); - queue.put(message); - if (processingGuarantees == FunctionConfig.ProcessingGuarantees.ATMOST_ONCE) { - if (instanceConfig.getFunctionConfig().getAutoAck()) { - consumer.acknowledgeAsync(msg); - } - } - } catch (InterruptedException e) { - log.error("Function container {} is interrupted on enqueuing messages", - Thread.currentThread().getId(), e); - } - }); - - // for failover subscription, register a consumer event listener to react to active consumer changes. - if (getSubscriptionType() == SubscriptionType.Failover) { - conf.setConsumerEventListener(this); - } - - return conf; - } - private void processResult(InputMessage msg, JavaExecutionResult result, long startTime, long endTime) { if (result.getUserException() != null) { log.info("Encountered user exception when processing message {}", msg, result.getUserException()); stats.incrementUserExceptions(result.getUserException()); - handleProcessException(msg.getTopicName()); + processor.handleProcessException(msg, result.getUserException()); } else if (result.getSystemException() != null) { log.info("Encountered system exception when processing message {}", msg, result.getSystemException()); stats.incrementSystemExceptions(result.getSystemException()); - handleProcessException(msg.getTopicName()); + processor.handleProcessException(msg, result.getSystemException()); } else { stats.incrementSuccessfullyProcessed(endTime - startTime); - if (result.getResult() != null && outputProducer != null) { + if (result.getResult() != null && instanceConfig.getFunctionConfig().getOutput() != null) { byte[] output; try { output = 
outputSerDe.serialize(result.getResult()); } catch (Exception ex) { stats.incrementSerializationExceptions(); - handleProcessException(msg.getTopicName()); + processor.handleProcessException(msg, ex); return; } if (output != null) { - try { - sendOutputMessage(msg, result, output); - } catch (Throwable t) { - log.error("Encountered error when sending result {} computed from src message {}", - result, msg, t); - handleProcessException(msg.getTopicName()); - } - } else if (processingGuarantees != ProcessingGuarantees.ATMOST_ONCE) { - ackMessage(msg); + sendOutputMessage(msg, output); + } else { + processor.sendOutputMessage(msg, null); } - } else if (processingGuarantees != ProcessingGuarantees.ATMOST_ONCE) { - ackMessage(msg); - } - } - } - - private void ackMessage(InputMessage msg) { - if (instanceConfig.getFunctionConfig().getAutoAck()) { - if (ProcessingGuarantees.EFFECTIVELY_ONCE == processingGuarantees) { - msg.getConsumer().acknowledgeCumulativeAsync(msg.getActualMessage()); } else { - msg.getConsumer().acknowledgeAsync(msg.getActualMessage()); + // the function doesn't produce any result or the user doesn't want the result. 
+ processor.sendOutputMessage(msg, null); } } } private void sendOutputMessage(InputMessage srcMsg, - JavaExecutionResult result, byte[] output) { MessageBuilder msgBuilder = MessageBuilder.create() .setContent(output) .setProperty("__pfn_input_topic__", srcMsg.getTopicName()) .setProperty("__pfn_input_msg_id__", new String(Base64.getEncoder().encode(srcMsg.getActualMessage().getMessageId().toByteArray()))); - if (processingGuarantees == ProcessingGuarantees.EFFECTIVELY_ONCE) { - msgBuilder = msgBuilder - .setSequenceId(Utils.getSequenceId(srcMsg.getActualMessage().getMessageId())); - } - Producer producer; - try { - producer = outputProducer.getProducer(srcMsg.getTopicName(), srcMsg.getTopicPartition()); - } catch (PulsarClientException e) { - log.error("Failed to get a producer for producing results computed from input topic {}", - srcMsg.getTopicName()); - - // if we fail to get a producer, put this message back to queue and reprocess it. - queue.offerFirst(srcMsg); - return; - } - - Message destMsg = msgBuilder.build(); - producer.sendAsync(destMsg) - .thenAccept(messageId -> { - if (processingGuarantees != ProcessingGuarantees.ATMOST_ONCE) { - ackMessage(srcMsg); - } - }) - .exceptionally(cause -> { - log.error("Failed to send the process result {} of message {} to output topic {}", - result, srcMsg, instanceConfig.getFunctionConfig().getOutput(), cause); - handleProcessException(srcMsg.getTopicName()); - return null; - }); - } - private void handleProcessException(String srcTopic) { - // if the src message is coming from a shared subscription, - // we don't need any special logic on handling failures, just don't ack. - // the message will be redelivered to other consumer. - // - // BUT, if the src message is coming from a failover subscription, - // we need to stop processing messages and recreate consumer to reprocess - // the message. 
otherwise we might break the correctness of effectively-once - if (getSubscriptionType() != SubscriptionType.Shared) { - // in this case (effectively-once), we need to close the consumer - // release the partition and open the consumer again. so we guarantee - // that we always process messages in order - // - // but this is in pulsar's callback, so add this to a retry list. so we can - // retry on java instance's main thread. - addTopicToResubscribeList(srcTopic); - } - } - - private synchronized void addTopicToResubscribeList(String topicName) { - if (null == inputTopicsToResubscribe) { - inputTopicsToResubscribe = new LinkedList<>(); - } - inputTopicsToResubscribe.add(topicName); - } - - private void resubscribeTopicsIfNeeded() { - List<String> topicsToResubscribe; - synchronized (this) { - topicsToResubscribe = inputTopicsToResubscribe; - inputTopicsToResubscribe = null; - } - if (null != topicsToResubscribe) { - for (String topic : topicsToResubscribe) { - resubscribe(topic); - } - } - } - - private void resubscribe(String srcTopic) { - // if we can not produce a message to output topic, then close the consumer of the src topic - // and retry to instantiate a consumer again. - Consumer consumer = inputConsumers.remove(srcTopic); - if (consumer != null) { - // TODO (sijie): currently we have to close the entire consumer for a given topic. However - // ideally we should do this in a finer granularity - we can close consumer - // on a given partition, without impact other partitions. 
- try { - consumer.close(); - } catch (PulsarClientException e) { - log.error("Failed to close consumer for input topic {} when handling produce exceptions", - srcTopic, e); - } - } - // subscribe to the src topic again - ConsumerConfiguration conf = createConsumerConfiguration(srcTopic); - try { - inputConsumers.put( - srcTopic, - client.subscribe( - srcTopic, - FunctionConfigUtils.getFullyQualifiedName(instanceConfig.getFunctionConfig()), - conf - )); - } catch (PulsarClientException e) { - log.error("Failed to resubscribe to input topic {}. Added it to retry list and retry it later", - srcTopic, e); - addTopicToResubscribeList(srcTopic); - } + processor.sendOutputMessage(srcMsg, msgBuilder); } @Override @@ -612,21 +353,8 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable, ConsumerEv return; } running = false; - // stop the consumer first, so no more messages are coming in - inputConsumers.forEach((k, v) -> { - try { - v.close(); - } catch (PulsarClientException e) { - log.warn("Failed to close consumer to input topic {}", k, e); - } - }); - inputConsumers.clear(); - // kill the result producer - if (null != outputProducer) { - outputProducer.close(); - outputProducer = null; - } + processor.close(); // kill the state table if (null != stateTable) { diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/AtLeastOnceProcessor.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/AtLeastOnceProcessor.java new file mode 100644 index 0000000..740fd5c --- /dev/null +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/AtLeastOnceProcessor.java @@ -0,0 +1,78 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.functions.instance.processors; + +import java.util.concurrent.LinkedBlockingDeque; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageBuilder; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.functions.instance.InputMessage; +import org.apache.pulsar.functions.instance.producers.AbstractOneOuputTopicProducers; +import org.apache.pulsar.functions.proto.Function.FunctionConfig; + +/** + * A message processor that processes messages at-least-once. 
+ */ +@Slf4j +public class AtLeastOnceProcessor extends MessageProcessorBase { + + @Getter + private Producer producer; + + AtLeastOnceProcessor(PulsarClient client, + FunctionConfig functionConfig, + SubscriptionType subType, + LinkedBlockingDeque<InputMessage> processQueue) { + super(client, functionConfig, subType, processQueue); + } + + @Override + protected void initializeOutputProducer(String outputTopic) throws Exception { + producer = AbstractOneOuputTopicProducers.createProducer(client, outputTopic); + } + + @Override + public void sendOutputMessage(InputMessage inputMsg, MessageBuilder outputMsgBuilder) { + if (null == outputMsgBuilder) { + inputMsg.ack(); + return; + } + + Message outputMsg = outputMsgBuilder.build(); + producer.sendAsync(outputMsg) + .thenAccept(msgId -> inputMsg.ack()); + } + + @Override + public void close() { + super.close(); + if (null != producer) { + try { + producer.close(); + } catch (PulsarClientException e) { + log.warn("Fail to close producer for processor {}", functionConfig.getOutput(), e); + } + } + } +} diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/AtMostOnceProcessor.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/AtMostOnceProcessor.java new file mode 100644 index 0000000..1713b73 --- /dev/null +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/AtMostOnceProcessor.java @@ -0,0 +1,82 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.functions.instance.processors; + +import java.util.concurrent.LinkedBlockingDeque; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageBuilder; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.functions.instance.InputMessage; +import org.apache.pulsar.functions.instance.producers.AbstractOneOuputTopicProducers; +import org.apache.pulsar.functions.proto.Function.FunctionConfig; + +/** + * A message processor that process messages at-most-once. 
+ */ +@Slf4j +class AtMostOnceProcessor extends MessageProcessorBase { + + private Producer producer; + + AtMostOnceProcessor(PulsarClient client, + FunctionConfig functionConfig, + SubscriptionType subType, + LinkedBlockingDeque<InputMessage> processQueue) { + super(client, functionConfig, subType, processQueue); + } + + @Override + protected void postReceiveMessage(InputMessage message) { + super.postReceiveMessage(message); + if (functionConfig.getAutoAck()) { + message.ack(); + } + } + + @Override + protected void initializeOutputProducer(String outputTopic) throws Exception { + producer = AbstractOneOuputTopicProducers.createProducer(client, outputTopic); + } + + @Override + public void sendOutputMessage(InputMessage inputMsg, MessageBuilder outputMsgBuilder) { + if (null == outputMsgBuilder) { + return; + } + + Message outputMsg = outputMsgBuilder.build(); + producer.sendAsync(outputMsg); + } + + @Override + public void close() { + super.close(); + if (null != producer) { + try { + producer.close(); + } catch (PulsarClientException e) { + log.warn("Fail to close producer for processor {}", functionConfig.getOutput(), e); + } + } + } +} diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/EffectivelyOnceProcessor.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/EffectivelyOnceProcessor.java new file mode 100644 index 0000000..f039cdf --- /dev/null +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/EffectivelyOnceProcessor.java @@ -0,0 +1,273 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.functions.instance.processors; + +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.TimeUnit; +import lombok.AccessLevel; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.ConsumerConfiguration; +import org.apache.pulsar.client.api.ConsumerEventListener; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageBuilder; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.PulsarClientException.ProducerBusyException; +import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.functions.instance.InputMessage; +import org.apache.pulsar.functions.instance.producers.MultiConsumersOneOuputTopicProducers; +import org.apache.pulsar.functions.instance.producers.Producers; +import org.apache.pulsar.functions.proto.Function.FunctionConfig; +import org.apache.pulsar.functions.utils.FunctionConfigUtils; +import org.apache.pulsar.functions.utils.Utils; + +/** + * A message processor that process messages effectively-once. 
+ */ +@Slf4j +class EffectivelyOnceProcessor extends MessageProcessorBase implements ConsumerEventListener { + + private LinkedList<String> inputTopicsToResubscribe = null; + + @Getter(AccessLevel.PACKAGE) + protected Producers outputProducer; + + EffectivelyOnceProcessor(PulsarClient client, + FunctionConfig functionConfig, + LinkedBlockingDeque<InputMessage> processQueue) { + super(client, functionConfig, SubscriptionType.Failover, processQueue); + } + + /** + * An effectively-once processor can only use `Failover` subscription. + */ + @Override + protected SubscriptionType getSubscriptionType() { + return SubscriptionType.Failover; + } + + @Override + protected ConsumerConfiguration createConsumerConfiguration(String topicName) { + ConsumerConfiguration conf = super.createConsumerConfiguration(topicName); + // for effectively-once processor, register a consumer event listener to react to active consumer changes. + conf.setConsumerEventListener(this); + return conf; + } + + @Override + public void becameActive(Consumer<?> consumer, int partitionId) { + // if the instance becomes active for a given topic partition, + // open a producer for the results computed from this topic partition. + if (null != outputProducer) { + try { + this.outputProducer.getProducer(consumer.getTopic(), partitionId); + } catch (PulsarClientException e) { + // this can be ignored, because producer can be lazily created when accessing it. + log.warn("Fail to create a producer for results computed from messages of topic: {}, partition: {}", + consumer.getTopic(), partitionId); + } + } + } + + @Override + public void becameInactive(Consumer<?> consumer, int partitionId) { + if (null != outputProducer) { + // if I lost the ownership of a partition, close its corresponding topic partition. + // this is to allow the new active consumer be able to produce to the result topic. 
+ this.outputProducer.closeProducer(consumer.getTopic(), partitionId); + } + } + + @Override + protected void initializeOutputProducer(String outputTopic) throws Exception { + outputProducer = new MultiConsumersOneOuputTopicProducers(client, outputTopic); + outputProducer.initialize(); + } + + // + // Methods to process messages + // + + @Override + public void prepareDequeueMessageFromProcessQueue() { + super.prepareDequeueMessageFromProcessQueue(); + // some src topics might be put into resubscribe list because of processing failure + // so this is the chance to resubscribe to those topics. + resubscribeTopicsIfNeeded(); + } + + @Override + public boolean prepareProcessMessage(InputMessage msg) throws InterruptedException { + boolean prepared = super.prepareProcessMessage(msg); + if (prepared) { + // if the messages are received from old consumers, we discard it since new consumer was + // re-created for the correctness of effectively-once + if (msg.getConsumer() != inputConsumers.get(msg.getTopicName())) { + return false; + } + + if (null != outputProducer) { + // before processing the message, we have a producer connection setup for producing results. + Producer producer = null; + while (null == producer) { + try { + producer = outputProducer.getProducer(msg.getTopicName(), msg.getTopicPartition()); + } catch (PulsarClientException e) { + // `ProducerBusy` is thrown when an producer with same name is still connected. + // This can happen when a active consumer is changed for a given input topic partition + // so we need to wait until the old active consumer release the produce connection. 
+ if (!(e instanceof ProducerBusyException)) { + log.error("Failed to get a producer for producing results computed from input topic {}", + msg.getTopicName()); + } + TimeUnit.MILLISECONDS.sleep(500); + } + } + } + return true; + } else { + return false; + } + } + + @Override + public void sendOutputMessage(InputMessage inputMsg, + MessageBuilder outputMsgBuilder) { + if (null == outputMsgBuilder) { + inputMsg.ackCumulative(); + return; + } + + // assign sequence id to output message for idempotent producing + outputMsgBuilder = outputMsgBuilder + .setSequenceId(Utils.getSequenceId(inputMsg.getActualMessage().getMessageId())); + + Producer producer; + try { + producer = outputProducer.getProducer(inputMsg.getTopicName(), inputMsg.getTopicPartition()); + } catch (PulsarClientException e) { + log.error("Failed to get a producer for producing results computed from input topic {}", + inputMsg.getTopicName()); + + // if we fail to get a producer, put this message back to queue and reprocess it. + processQueue.offerFirst(inputMsg); + return; + } + + Message outputMsg = outputMsgBuilder.build(); + producer.sendAsync(outputMsg) + .thenAccept(messageId -> inputMsg.ackCumulative()) + .exceptionally(cause -> { + log.error("Failed to send the process result {} of message {} to output topic {}", + outputMsg, inputMsg, functionConfig.getOutput(), cause); + handleProcessException(inputMsg.getTopicName()); + return null; + }); + } + + @Override + public void handleProcessException(InputMessage msg, Exception cause) { + handleProcessException(msg.getTopicName()); + } + + private void handleProcessException(String srcTopic) { + // if the src message is coming from a shared subscription, + // we don't need any special logic on handling failures, just don't ack. + // the message will be redelivered to other consumer. + // + // BUT, if the src message is coming from a failover subscription, + // we need to stop processing messages and recreate consumer to reprocess + // the message. 
otherwise we might break the correctness of effectively-once + // + // in this case (effectively-once), we need to close the consumer + // release the partition and open the consumer again. so we guarantee + // that we always process messages in order + // + // but this is in pulsar's callback, so add this to a retry list. so we can + // retry on java instance's main thread. + addTopicToResubscribeList(srcTopic); + } + + private synchronized void addTopicToResubscribeList(String topicName) { + if (null == inputTopicsToResubscribe) { + inputTopicsToResubscribe = new LinkedList<>(); + } + inputTopicsToResubscribe.add(topicName); + } + + private void resubscribeTopicsIfNeeded() { + List<String> topicsToResubscribe; + synchronized (this) { + topicsToResubscribe = inputTopicsToResubscribe; + inputTopicsToResubscribe = null; + } + if (null != topicsToResubscribe) { + for (String topic : topicsToResubscribe) { + resubscribe(topic); + } + } + } + + private void resubscribe(String srcTopic) { + // if we can not produce a message to output topic, then close the consumer of the src topic + // and retry to instantiate a consumer again. + Consumer consumer = inputConsumers.remove(srcTopic); + if (consumer != null) { + // TODO (sijie): currently we have to close the entire consumer for a given topic. However + // ideally we should do this in a finer granularity - we can close consumer + // on a given partition, without impact other partitions. + try { + consumer.close(); + } catch (PulsarClientException e) { + log.error("Failed to close consumer for input topic {} when handling produce exceptions", + srcTopic, e); + } + } + // subscribe to the src topic again + ConsumerConfiguration conf = createConsumerConfiguration(srcTopic); + try { + inputConsumers.put( + srcTopic, + client.subscribe( + srcTopic, + FunctionConfigUtils.getFullyQualifiedName(functionConfig), + conf + )); + } catch (PulsarClientException e) { + log.error("Failed to resubscribe to input topic {}. 
Added it to retry list and retry it later", + srcTopic, e); + addTopicToResubscribeList(srcTopic); + } + } + + @Override + public void close() { + super.close(); + // kill the result producer + if (null != outputProducer) { + outputProducer.close(); + outputProducer = null; + } + } +} diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/MessageProcessor.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/MessageProcessor.java new file mode 100644 index 0000000..b475e9f --- /dev/null +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/MessageProcessor.java @@ -0,0 +1,145 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.functions.instance.processors; + +import java.util.Map; +import java.util.concurrent.LinkedBlockingDeque; +import org.apache.bookkeeper.common.annotation.InterfaceStability.Evolving; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.MessageBuilder; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.functions.api.SerDe; +import org.apache.pulsar.functions.instance.InputMessage; +import org.apache.pulsar.functions.proto.Function.FunctionConfig; +import org.apache.pulsar.functions.proto.Function.FunctionConfig.ProcessingGuarantees; + +/** + * A processor that processes messages, used by {@link org.apache.pulsar.functions.instance.JavaInstanceRunnable}. + */ +@Evolving +public interface MessageProcessor extends AutoCloseable { + + static MessageProcessor create(PulsarClient client, + FunctionConfig functionConfig, + LinkedBlockingDeque<InputMessage> processQeueue) { + FunctionConfig.SubscriptionType fnSubType = functionConfig.getSubscriptionType(); + ProcessingGuarantees processingGuarantees = functionConfig.getProcessingGuarantees(); + SubscriptionType subType; + if (null == fnSubType || FunctionConfig.SubscriptionType.SHARED == fnSubType) { + subType = SubscriptionType.Shared; + } else { + subType = SubscriptionType.Failover; + } + + if (processingGuarantees == ProcessingGuarantees.EFFECTIVELY_ONCE) { + return new EffectivelyOnceProcessor( + client, + functionConfig, + processQeueue); + } else if (processingGuarantees == ProcessingGuarantees.ATMOST_ONCE) { + return new AtMostOnceProcessor( + client, + functionConfig, + subType, + processQeueue); + } else { + return new AtLeastOnceProcessor( + client, + functionConfig, + subType, + processQeueue); + } + } + + /** + * Setup the input with a provided <i>processQueue</i>. 
The implementation of this processor is responsible for + * setting up the input and passing the received messages from input to the provided <i>processQueue</i>. + * + * @param inputSerDe SerDe to deserialize messages from input. + */ + void setupInput(Map<String, SerDe> inputSerDe) + throws Exception; + + /** + * Return the map of input consumers. + * + * @return the map of input consumers. + */ + Map<String, Consumer> getInputConsumers(); + + /** + * Setup the output with a provided <i>outputSerDe</i>. The implementation of this processor is responsible for + * setting up the output + * + * @param outputSerDe output serde. + * @throws Exception + */ + void setupOutput(SerDe outputSerDe) throws Exception; + + + // + // Methods that called on processing messages + // + + /** + * Method that is called before taking a message from process queue to process. + * + * <p>The implementation can do actions like failure recovery before actually taking messages out of process queue to + * process. + */ + void prepareDequeueMessageFromProcessQueue(); + + /** + * Method that is called before processing the input message <i>msg</i>. + * + * <p>The implementation can do actions like ensuring producer is ready for producing results. + * + * @param msg input message. + * @return true if the msg can be process, otherwise false. If a processor can't process this message at this moment, + * this message will be put back to the process queue and process later. + * @throws InterruptedException when the processor is interrupted on preparing processing message. + */ + boolean prepareProcessMessage(InputMessage msg) throws InterruptedException; + + /** + * Send the output message to the output topic. The output message is computed from <i>inputMsg</i>. + * + * <p>If the <i>outputMsgBuilder</i> is null, the implementation doesn't have to send any messages to the output. + * The implementation can decide to acknowledge the input message based on its process guarantees. 
+ * + * @param inputMsg input message + * @param outputMsgBuilder output message builder. it can be null. + */ + void sendOutputMessage(InputMessage inputMsg, + MessageBuilder outputMsgBuilder); + + /** + * Handle the process exception when processing input message <i>inputMsg</i>. + * + * @param inputMsg input message + * @param exception exception thrown when processing input message. + */ + void handleProcessException(InputMessage inputMsg, Exception exception); + + + @Override + void close(); + +} diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/MessageProcessorBase.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/MessageProcessorBase.java new file mode 100644 index 0000000..ddb5f79 --- /dev/null +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/MessageProcessorBase.java @@ -0,0 +1,167 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.functions.instance.processors; + +import com.google.common.collect.Maps; +import java.util.Map; +import java.util.concurrent.LinkedBlockingDeque; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.ConsumerConfiguration; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.functions.api.SerDe; +import org.apache.pulsar.functions.instance.InputMessage; +import org.apache.pulsar.functions.proto.Function.FunctionConfig; +import org.apache.pulsar.functions.utils.FunctionConfigUtils; + +/** + * The base implementation of {@link MessageProcessor}. + */ +@Slf4j +abstract class MessageProcessorBase implements MessageProcessor { + + protected final PulsarClient client; + protected final FunctionConfig functionConfig; + protected final SubscriptionType subType; + protected final LinkedBlockingDeque<InputMessage> processQueue; + + @Getter + protected final Map<String, Consumer> inputConsumers; + protected Map<String, SerDe> inputSerDe; + + protected SerDe outputSerDe; + + protected MessageProcessorBase(PulsarClient client, + FunctionConfig functionConfig, + SubscriptionType subType, + LinkedBlockingDeque<InputMessage> processQueue) { + this.client = client; + this.functionConfig = functionConfig; + this.subType = subType; + this.processQueue = processQueue; + this.inputConsumers = Maps.newConcurrentMap(); + } + + // + // Input + // + + @Override + public void setupInput(Map<String, SerDe> inputSerDe) throws Exception { + log.info("Setting up input with input serdes: {}", inputSerDe); + this.inputSerDe = inputSerDe; + for (Map.Entry<String, String> entry : functionConfig.getCustomSerdeInputsMap().entrySet()) { + ConsumerConfiguration conf = createConsumerConfiguration(entry.getKey()); + 
this.inputConsumers.put(entry.getKey(), client.subscribe(entry.getKey(), + FunctionConfigUtils.getFullyQualifiedName(functionConfig), conf)); + } + for (String topicName : functionConfig.getInputsList()) { + ConsumerConfiguration conf = createConsumerConfiguration(topicName); + this.inputConsumers.put(topicName, client.subscribe(topicName, + FunctionConfigUtils.getFullyQualifiedName(functionConfig), conf)); + } + } + + protected SubscriptionType getSubscriptionType() { + return subType; + } + + protected ConsumerConfiguration createConsumerConfiguration(String topicName) { + log.info("Starting Consumer for topic " + topicName); + ConsumerConfiguration conf = new ConsumerConfiguration(); + conf.setSubscriptionType(getSubscriptionType()); + + SerDe inputSerde = inputSerDe.get(topicName); + conf.setMessageListener((consumer, msg) -> { + try { + InputMessage message = new InputMessage(); + message.setConsumer(consumer); + message.setInputSerDe(inputSerde); + message.setActualMessage(msg); + message.setTopicName(topicName); + processQueue.put(message); + postReceiveMessage(message); + } catch (InterruptedException e) { + log.error("Function container {} is interrupted on enqueuing messages", + Thread.currentThread().getId(), e); + } + }); + return conf; + } + + /** + * Method called when a message is received from input after being put into the process queue. + * + * <p>The processor implementation can make a decision to process the message based on its processing guarantees. + * for example, an at-most-once processor can ack the message immediately. + * + * @param message input message. 
+ */ + protected void postReceiveMessage(InputMessage message) {} + + // + // Output + // + + @Override + public void setupOutput(SerDe outputSerDe) throws Exception { + this.outputSerDe = outputSerDe; + + String outputTopic = functionConfig.getOutput(); + if (outputTopic != null + && !functionConfig.getOutput().isEmpty() + && outputSerDe != null) { + log.info("Starting producer for output topic {}", outputTopic); + initializeOutputProducer(outputTopic); + } + } + + protected abstract void initializeOutputProducer(String outputTopic) throws Exception; + + // + // Process + // + + @Override + public void prepareDequeueMessageFromProcessQueue() {} + + @Override + public boolean prepareProcessMessage(InputMessage msg) throws InterruptedException { + return true; + } + + @Override + public void handleProcessException(InputMessage msg, Exception cause) {} + + @Override + public void close() { + // stop the consumer first, so no more messages are coming in + inputConsumers.forEach((k, v) -> { + try { + v.close(); + } catch (PulsarClientException e) { + log.warn("Failed to close consumer to input topic {}", k, e); + } + }); + inputConsumers.clear(); + } +} diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/AbstractOneOuputTopicProducers.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/AbstractOneOuputTopicProducers.java index 723529d..bba9a7f 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/AbstractOneOuputTopicProducers.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/AbstractOneOuputTopicProducers.java @@ -28,7 +28,7 @@ import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.functions.instance.FunctionResultRouter; -abstract class AbstractOneOuputTopicProducers implements Producers { +public abstract 
class AbstractOneOuputTopicProducers implements Producers { protected final PulsarClient client; protected final String outputTopic; @@ -42,7 +42,7 @@ abstract class AbstractOneOuputTopicProducers implements Producers { this.conf = newProducerConfiguration(); } - protected ProducerConfiguration newProducerConfiguration() { + static ProducerConfiguration newProducerConfiguration() { ProducerConfiguration conf = new ProducerConfiguration(); conf.setBlockIfQueueFull(true); conf.setBatchingEnabled(true); @@ -58,11 +58,21 @@ abstract class AbstractOneOuputTopicProducers implements Producers { protected Producer createProducer(String topic) throws PulsarClientException { + return createProducer(client, topic); + } + + public static Producer createProducer(PulsarClient client, String topic) + throws PulsarClientException { return client.createProducer(topic, newProducerConfiguration()); } protected Producer createProducer(String topic, String producerName) throws PulsarClientException { + return createProducer(client, topic, producerName); + } + + public static Producer createProducer(PulsarClient client, String topic, String producerName) + throws PulsarClientException { ProducerConfiguration newConf = newProducerConfiguration(); newConf.setProducerName(producerName); diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/SimpleOneOuputTopicProducers.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/SimpleOneOuputTopicProducers.java deleted file mode 100644 index 2fee37a..0000000 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/SimpleOneOuputTopicProducers.java +++ /dev/null @@ -1,63 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.functions.instance.producers; - -import static com.google.common.base.Preconditions.checkNotNull; - -import lombok.Getter; -import lombok.extern.slf4j.Slf4j; -import org.apache.pulsar.client.api.Producer; -import org.apache.pulsar.client.api.PulsarClient; -import org.apache.pulsar.client.api.PulsarClientException; - -@Slf4j -public class SimpleOneOuputTopicProducers extends AbstractOneOuputTopicProducers { - - @Getter - private Producer producer; - - public SimpleOneOuputTopicProducers(PulsarClient client, String outputTopic) throws PulsarClientException { - super(client, outputTopic); - } - - @Override - public void initialize() throws PulsarClientException { - producer = createProducer(outputTopic); - } - - @Override - public Producer getProducer(String srcTopic, int srcTopicPartition) { - checkNotNull(producer, "Producer is not initialized yet"); - return producer; - } - - @Override - public void closeProducer(String srcTopicName, int srcTopicPartition) { - // no-op - } - - @Override - public void close() { - try { - producer.close(); - } catch (PulsarClientException e) { - log.warn("Fail to close producer of topic {}", outputTopic, e); - } - } -} diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableProcessTest.java 
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableProcessTest.java index 20644d2..8e099a6 100644 --- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableProcessTest.java +++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableProcessTest.java @@ -76,12 +76,12 @@ import org.apache.pulsar.client.impl.PulsarClientImpl; import org.apache.pulsar.functions.api.Context; import org.apache.pulsar.functions.api.Function; import org.apache.pulsar.functions.api.utils.DefaultSerDe; +import org.apache.pulsar.functions.instance.processors.AtLeastOnceProcessor; +import org.apache.pulsar.functions.instance.processors.MessageProcessor; import org.apache.pulsar.functions.proto.Function.FunctionConfig; import org.apache.pulsar.functions.proto.Function.FunctionConfig.ProcessingGuarantees; import org.apache.pulsar.functions.utils.Reflections; import org.apache.pulsar.functions.utils.functioncache.FunctionCacheManager; -import org.apache.pulsar.functions.instance.producers.Producers; -import org.apache.pulsar.functions.instance.producers.SimpleOneOuputTopicProducers; import org.apache.pulsar.functions.utils.FunctionConfigUtils; import org.apache.pulsar.functions.utils.Utils; import org.powermock.api.mockito.PowerMockito; @@ -437,15 +437,15 @@ public class JavaInstanceRunnableProcessTest { } // 3. 
verify producers and consumers are setup - Producers producers = runnable.getOutputProducer(); - assertTrue(producers instanceof SimpleOneOuputTopicProducers); + MessageProcessor processor = runnable.getProcessor(); + assertTrue(processor instanceof AtLeastOnceProcessor); assertSame(mockProducers.get(Pair.of( fnConfig.getOutput(), null - )).getProducer(), ((SimpleOneOuputTopicProducers) producers).getProducer()); + )).getProducer(), ((AtLeastOnceProcessor) processor).getProducer()); - assertEquals(mockConsumers.size(), runnable.getInputConsumers().size()); - for (Map.Entry<String, Consumer> consumerEntry : runnable.getInputConsumers().entrySet()) { + assertEquals(mockConsumers.size(), processor.getInputConsumers().size()); + for (Map.Entry<String, Consumer> consumerEntry : processor.getInputConsumers().entrySet()) { String topic = consumerEntry.getKey(); Consumer mockConsumer = mockConsumers.get(Pair.of( @@ -464,7 +464,7 @@ public class JavaInstanceRunnableProcessTest { for (ConsumerInstance consumer : mockConsumers.values()) { verify(consumer.getConsumer(), times(1)).close(); } - assertTrue(runnable.getInputConsumers().isEmpty()); + assertTrue(processor.getInputConsumers().isEmpty()); for (ProducerInstance producer : mockProducers.values()) { verify(producer.getProducer(), times(1)).close(); diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/producers/SimpleOneOutputTopicProducersTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/producers/SimpleOneOutputTopicProducersTest.java deleted file mode 100644 index 67337a1..0000000 --- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/producers/SimpleOneOutputTopicProducersTest.java +++ /dev/null @@ -1,94 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.functions.instance.producers; - -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyString; -import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; -import static org.testng.Assert.assertSame; - -import org.apache.pulsar.client.api.Producer; -import org.apache.pulsar.client.api.ProducerConfiguration; -import org.apache.pulsar.client.api.PulsarClient; -import org.testng.annotations.BeforeMethod; -import org.testng.annotations.Test; - -/** - * Unit test of {@link SimpleOneOuputTopicProducers}. 
- */ -public class SimpleOneOutputTopicProducersTest { - - private static final String TEST_OUTPUT_TOPIC = "test-output-topic"; - - private PulsarClient mockClient; - private Producer mockProducer; - private SimpleOneOuputTopicProducers producers; - - @BeforeMethod - public void setup() throws Exception { - this.mockClient = mock(PulsarClient.class); - this.mockProducer = mock(Producer.class); - - when(mockClient.createProducer(anyString(), any(ProducerConfiguration.class))) - .thenReturn(mockProducer); - - this.producers = new SimpleOneOuputTopicProducers(mockClient, TEST_OUTPUT_TOPIC); - } - - @Test - public void testInitializeClose() throws Exception { - this.producers.initialize(); - - verify(mockClient, times(1)) - .createProducer(eq(TEST_OUTPUT_TOPIC), any(ProducerConfiguration.class)); - - this.producers.close(); - - verify(mockProducer, times(1)).close(); - } - - @Test(expectedExceptions = NullPointerException.class) - public void testGetProducerWithoutInitialization() throws Exception { - this.producers.getProducer("test-src-topic", 0); - } - - @Test - public void testGetAndCloseProducer() throws Exception { - this.producers.initialize(); - - verify(mockClient, times(1)) - .createProducer(eq(TEST_OUTPUT_TOPIC), any(ProducerConfiguration.class)); - - assertSame(mockProducer, this.producers.getProducer("test-src-topic", 0)); - - verify(mockClient, times(1)) - .createProducer(eq(TEST_OUTPUT_TOPIC), any(ProducerConfiguration.class)); - - producers.closeProducer("test-src-topic", 0); - - assertSame(mockProducer, producers.getProducer()); - - verify(mockProducer, times(0)).close(); - } - -} -- To stop receiving notification emails like this one, please contact mme...@apache.org.