This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push: new c3c8013 Refactor functions to use Sink interface (#1708) c3c8013 is described below commit c3c8013d701d986970ec9baef142c338233f2f89 Author: Boyang Jerry Peng <jerry.boyang.p...@gmail.com> AuthorDate: Wed May 2 14:43:10 2018 -0700 Refactor functions to use Sink interface (#1708) * use pulsar sink * removing exception from interface * addressing various comments * changing MultiConsumersOneOuputTopicProducers to use String for partition id --- .../java/org/apache/pulsar/connect/core/Sink.java | 2 +- .../functions/instance/JavaInstanceRunnable.java | 212 ++++++++++++------ .../instance/processors/AtLeastOnceProcessor.java | 77 ------- .../instance/processors/AtMostOnceProcessor.java | 79 ------- .../processors/EffectivelyOnceProcessor.java | 120 ---------- .../instance/processors/MessageProcessor.java | 107 --------- .../instance/processors/MessageProcessorBase.java | 155 ------------- .../MultiConsumersOneOuputTopicProducers.java | 20 +- .../functions/instance/producers/Producers.java | 4 +- .../pulsar/functions/sink/DefaultRuntimeSink.java | 1 - .../apache/pulsar/functions/sink/PulsarSink.java | 247 ++++++++++++++++++++- .../PulsarSinkConfig.java} | 19 +- .../apache/pulsar/functions/sink/RuntimeSink.java | 5 +- .../pulsar/functions/source/PulsarSource.java | 30 +-- .../{PulsarConfig.java => PulsarSourceConfig.java} | 9 +- .../src/main/python/python_instance_main.py | 16 +- .../instance/JavaInstanceRunnableTest.java | 81 ------- .../MultiConsumersOneOutputTopicProducersTest.java | 2 +- .../functions/sink/DefaultRuntimeSinkTest.java | 6 +- .../PulsarSinkTest.java} | 136 +++++++++--- .../pulsar/functions/source/PulsarSourceTest.java | 90 +++++++- .../pulsar/functions/runtime/JavaInstanceMain.java | 40 ++-- .../pulsar/functions/runtime/ProcessRuntime.java | 35 ++- .../functions/runtime/ProcessRuntimeTest.java | 20 +- 24 files changed, 683 insertions(+), 830 deletions(-) diff --git a/pulsar-connect/core/src/main/java/org/apache/pulsar/connect/core/Sink.java b/pulsar-connect/core/src/main/java/org/apache/pulsar/connect/core/Sink.java index ca569e7..cd2d63d 100644 --- a/pulsar-connect/core/src/main/java/org/apache/pulsar/connect/core/Sink.java +++ b/pulsar-connect/core/src/main/java/org/apache/pulsar/connect/core/Sink.java @@ -46,4 +46,4 @@ public interface Sink<T> extends AutoCloseable { * @return Completable future fo async publish request */ CompletableFuture<Void> write(T value); -} \ No newline at end of file +} diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java index ca6414d..2e8037b 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java @@ -22,15 +22,13 @@ package org.apache.pulsar.functions.instance; import static org.apache.bookkeeper.common.concurrent.FutureUtils.result; import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.DEFAULT_STREAM_CONF; +import com.google.gson.Gson; import io.netty.buffer.ByteBuf; import java.util.Arrays; -import java.util.Base64; import java.util.Collections; -import java.util.HashMap; import java.util.Map; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.atomic.AtomicBoolean; import lombok.AccessLevel; import lombok.Getter; @@ -49,18 +47,23 @@ import org.apache.logging.log4j.ThreadContext; import org.apache.logging.log4j.core.LoggerContext; import org.apache.logging.log4j.core.config.Configuration; import org.apache.logging.log4j.core.config.LoggerConfig; -import org.apache.pulsar.client.api.MessageBuilder; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.impl.PulsarClientImpl; import org.apache.pulsar.connect.core.Record; +import org.apache.pulsar.connect.core.Source; import org.apache.pulsar.functions.api.Function; -import org.apache.pulsar.functions.api.utils.DefaultSerDe; -import org.apache.pulsar.functions.instance.processors.MessageProcessor; import org.apache.pulsar.functions.proto.InstanceCommunication; +import org.apache.pulsar.functions.proto.Function.SourceSpec; +import org.apache.pulsar.functions.proto.Function.SinkSpec; +import org.apache.pulsar.functions.sink.PulsarSink; +import org.apache.pulsar.functions.sink.PulsarSinkConfig; +import org.apache.pulsar.functions.sink.RuntimeSink; import org.apache.pulsar.functions.source.PulsarRecord; +import org.apache.pulsar.functions.source.PulsarSource; +import org.apache.pulsar.functions.source.PulsarSourceConfig; +import org.apache.pulsar.functions.utils.FunctionConfig; import org.apache.pulsar.functions.utils.functioncache.FunctionCacheManager; -import org.apache.pulsar.functions.api.SerDe; import org.apache.pulsar.functions.instance.state.StateContextImpl; import org.apache.pulsar.functions.utils.FunctionDetailsUtils; import org.apache.pulsar.functions.utils.Reflections; @@ -93,18 +96,14 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable { @Getter private Exception deathException; - @Getter(AccessLevel.PACKAGE) - private SerDe outputSerDe; - - @Getter(AccessLevel.PACKAGE) - // processor - private final MessageProcessor processor; - // function stats private final FunctionStats stats; private Record currentRecord; + private Source source; + private RuntimeSink sink; + public JavaInstanceRunnable(InstanceConfig instanceConfig, FunctionCacheManager fnCache, String jarFile, @@ -116,9 +115,6 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable { this.client = (PulsarClientImpl) pulsarClient; this.stateStorageServiceUrl = stateStorageServiceUrl; this.stats = new FunctionStats(); - this.processor = MessageProcessor.create( - client, - instanceConfig.getFunctionDetails()); } /** @@ -151,19 +147,16 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable { typeArgs = TypeResolver.resolveRawArguments(java.util.function.Function.class, function.getClass()); } - // setup serde - setupSerDe(typeArgs, clsLoader); - // start the state table setupStateTable(); // start the output producer - processor.setupOutput(outputSerDe); + setupOutput(typeArgs[1]); // start the input consumer - processor.setupInput(typeArgs[0]); + setupInput(typeArgs[0]); // start any log topic handler setupLogHandler(); - return new JavaInstance(instanceConfig, object, clsLoader, client, processor.getSource()); + return new JavaInstance(instanceConfig, object, clsLoader, client, this.source); } /** @@ -175,9 +168,14 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable { javaInstance = setupJavaInstance(); while (true) { - currentRecord = processor.recieveMessage(); + currentRecord = readInput(); - processor.postReceiveMessage(currentRecord); + if (instanceConfig.getFunctionDetails().getProcessingGuarantees() == org.apache.pulsar.functions + .proto.Function.ProcessingGuarantees.ATMOST_ONCE) { + if (instanceConfig.getFunctionDetails().getAutoAck()) { + currentRecord.ack(); + } + } // state object is per function, because we need to have the ability to know what updates // are made in this function and ensure we only acknowledge after the state is persisted. @@ -310,44 +308,47 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable { throw result.getSystemException(); } else { stats.incrementSuccessfullyProcessed(endTime - startTime); - if (result.getResult() != null && instanceConfig.getFunctionDetails().getSink().getTopic() != null) { - byte[] output; - try { - output = outputSerDe.serialize(result.getResult()); - } catch (Exception ex) { - stats.incrementSerializationExceptions(); - throw ex; - } - if (output != null) { - sendOutputMessage(srcRecord, output); - } else { - processor.sendOutputMessage(srcRecord, null); - } + if (result.getResult() != null) { + sendOutputMessage(srcRecord, result.getResult()); } else { // the function doesn't produce any result or the user doesn't want the result. - processor.sendOutputMessage(srcRecord, null); + srcRecord.ack(); } } } - private void sendOutputMessage(Record srcRecord, - byte[] output) throws Exception { - - MessageBuilder msgBuilder = MessageBuilder.create(); - if (srcRecord instanceof PulsarRecord) { - PulsarRecord pulsarMessage = (PulsarRecord) srcRecord; - msgBuilder - .setContent(output) - .setProperty("__pfn_input_topic__", pulsarMessage.getTopicName()) - .setProperty("__pfn_input_msg_id__", new String(Base64.getEncoder().encode(pulsarMessage.getMessageId().toByteArray()))); + private void sendOutputMessage(Record srcRecord, Object output) { + try { + this.sink.write(srcRecord, output); + } catch (Exception e) { + log.info("Encountered exception in sink write: ", e); + throw new RuntimeException(e); } + } - processor.sendOutputMessage(srcRecord, msgBuilder); + private Record readInput() { + try { + return this.source.read(); + } catch (Exception e) { + log.info("Encountered exception in source write: ", e); + throw new RuntimeException(e); + } } @Override public void close() { - processor.close(); + try { + source.close(); + } catch (Exception e) { + log.error("Failed to close source {}", instanceConfig.getFunctionDetails().getSource().getClassName(), e); + } + + try { + sink.close(); + } catch (Exception e) { + log.error("Failed to close sink {}", instanceConfig.getFunctionDetails().getSource().getClassName(), e); + } + if (null != javaInstance) { javaInstance.close(); } @@ -415,27 +416,6 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable { bldr.putMetrics(metricName, digest); } - private void setupSerDe(Class<?>[] typeArgs, ClassLoader clsLoader) { - if (!Void.class.equals(typeArgs[1])) { // return type is not `Void.class` - if (instanceConfig.getFunctionDetails().getSink().getSerDeClassName() == null - || instanceConfig.getFunctionDetails().getSink().getSerDeClassName().isEmpty() - || instanceConfig.getFunctionDetails().getSink().getSerDeClassName().equals(DefaultSerDe.class.getName())) { - outputSerDe = InstanceUtils.initializeDefaultSerDe(typeArgs[1]); - } else { - this.outputSerDe = InstanceUtils.initializeSerDe(instanceConfig.getFunctionDetails().getSink().getSerDeClassName(), clsLoader, typeArgs[1]); - } - Class<?>[] outputSerdeTypeArgs = TypeResolver.resolveRawArguments(SerDe.class, outputSerDe.getClass()); - if (outputSerDe.getClass().getName().equals(DefaultSerDe.class.getName())) { - if (!DefaultSerDe.IsSupportedType(typeArgs[1])) { - throw new RuntimeException("Default Serde does not support type " + typeArgs[1]); - } - } else if (!outputSerdeTypeArgs[0].isAssignableFrom(typeArgs[1])) { - throw new RuntimeException("Inconsistent types found between function output type and output serde type: " - + " function type = " + typeArgs[1] + "should be assignable from " + outputSerdeTypeArgs[0]); - } - } - } - private void setupLogHandler() { if (instanceConfig.getFunctionDetails().getLogTopic() != null && !instanceConfig.getFunctionDetails().getLogTopic().isEmpty()) { @@ -465,4 +445,92 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable { } config.getRootLogger().removeAppender(logAppender.getName()); } + + public void setupInput(Class<?> inputType) throws Exception { + + SourceSpec sourceSpec = this.instanceConfig.getFunctionDetails().getSource(); + Object object; + if (sourceSpec.getClassName().equals(PulsarSource.class.getName())) { + + PulsarSourceConfig pulsarSourceConfig = new PulsarSourceConfig(); + pulsarSourceConfig.setTopicSerdeClassNameMap(sourceSpec.getTopicsToSerDeClassNameMap()); + pulsarSourceConfig.setSubscriptionName( + FunctionDetailsUtils.getFullyQualifiedName(this.instanceConfig.getFunctionDetails())); + pulsarSourceConfig.setProcessingGuarantees( + FunctionConfig.ProcessingGuarantees.valueOf( + this.instanceConfig.getFunctionDetails().getProcessingGuarantees().name())); + pulsarSourceConfig.setSubscriptionType( + FunctionConfig.SubscriptionType.valueOf(sourceSpec.getSubscriptionType().name())); + pulsarSourceConfig.setTypeClassName(inputType.getName()); + + Object[] params = {this.client, pulsarSourceConfig}; + Class[] paramTypes = {PulsarClient.class, PulsarSourceConfig.class}; + + object = Reflections.createInstance( + sourceSpec.getClassName(), + PulsarSource.class.getClassLoader(), params, paramTypes); + + } else { + object = Reflections.createInstance( + sourceSpec.getClassName(), + Thread.currentThread().getContextClassLoader()); + } + + Class<?>[] typeArgs; + if (object instanceof Source) { + typeArgs = TypeResolver.resolveRawArguments(Source.class, object.getClass()); + assert typeArgs.length > 0; + } else { + throw new RuntimeException("Source does not implement correct interface"); + } + this.source = (Source) object; + + try { + this.source.open(new Gson().fromJson(sourceSpec.getConfigs(), Map.class)); + } catch (Exception e) { + log.info("Error occurred executing open for source: {}", + sourceSpec.getClassName(), e); + } + } + + public void setupOutput(Class<?> outputType) throws Exception { + + SinkSpec sinkSpec = this.instanceConfig.getFunctionDetails().getSink(); + Object object; + if (sinkSpec.getClassName().equals(PulsarSink.class.getName())) { + PulsarSinkConfig pulsarSinkConfig = new PulsarSinkConfig(); + pulsarSinkConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.valueOf( + this.instanceConfig.getFunctionDetails().getProcessingGuarantees().name())); + pulsarSinkConfig.setTopic(sinkSpec.getTopic()); + pulsarSinkConfig.setSerDeClassName(sinkSpec.getSerDeClassName()); + pulsarSinkConfig.setTypeClassName(outputType.getName()); + + Object[] params = {this.client, pulsarSinkConfig}; + Class[] paramTypes = {PulsarClient.class, PulsarSinkConfig.class}; + + object = Reflections.createInstance( + sinkSpec.getClassName(), + PulsarSink.class.getClassLoader(), params, paramTypes); + } else { + object = Reflections.createInstance( + sinkSpec.getClassName(), + Thread.currentThread().getContextClassLoader()); + } + + Class<?>[] typeArgs; + if (object instanceof RuntimeSink) { + typeArgs = TypeResolver.resolveRawArguments(RuntimeSink.class, object.getClass()); + assert typeArgs.length > 0; + } else { + throw new RuntimeException("Sink does not implement correct interface"); + } + this.sink = (RuntimeSink) object; + + try { + this.sink.open(new Gson().fromJson(sinkSpec.getConfigs(), Map.class)); + } catch (Exception e) { + log.info("Error occurred executing open for sink: {}", + sinkSpec.getClassName(), e); + } + } } \ No newline at end of file diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/AtLeastOnceProcessor.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/AtLeastOnceProcessor.java deleted file mode 100644 index 8e149b0..0000000 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/AtLeastOnceProcessor.java +++ /dev/null @@ -1,77 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.functions.instance.processors; - -import lombok.Getter; -import lombok.extern.slf4j.Slf4j; -import org.apache.pulsar.client.api.Message; -import org.apache.pulsar.client.api.MessageBuilder; -import org.apache.pulsar.client.api.Producer; -import org.apache.pulsar.client.api.PulsarClient; -import org.apache.pulsar.client.api.PulsarClientException; -import org.apache.pulsar.client.api.SubscriptionType; -import org.apache.pulsar.connect.core.Record; -import org.apache.pulsar.functions.instance.producers.AbstractOneOuputTopicProducers; -import org.apache.pulsar.functions.proto.Function.FunctionDetails; - -/** - * A message processor that process messages at-most-once. - */ -@Slf4j -public class AtLeastOnceProcessor extends MessageProcessorBase { - - @Getter - private Producer<byte[]> producer; - - AtLeastOnceProcessor(PulsarClient client, - FunctionDetails functionDetails) { - super(client, functionDetails); - } - - @Override - protected void initializeOutputProducer(String outputTopic) throws Exception { - producer = AbstractOneOuputTopicProducers.createProducer(client, outputTopic); - } - - @Override - public void sendOutputMessage(Record srcRecord, MessageBuilder outputMsgBuilder) { - if (null == outputMsgBuilder || null == producer) { - srcRecord.ack(); - return; - } - - Message<byte[]> outputMsg = outputMsgBuilder.build(); - producer.sendAsync(outputMsg) - .thenAccept(msgId -> { - srcRecord.ack(); - }); - } - - @Override - public void close() { - super.close(); - if (null != producer) { - try { - producer.close(); - } catch (PulsarClientException e) { - log.warn("Fail to close producer for processor {}", functionDetails.getSink().getTopic(), e); - } - } - } -} diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/AtMostOnceProcessor.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/AtMostOnceProcessor.java deleted file mode 100644 index 930161e..0000000 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/AtMostOnceProcessor.java +++ /dev/null @@ -1,79 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.functions.instance.processors; - -import lombok.extern.slf4j.Slf4j; -import org.apache.pulsar.client.api.Message; -import org.apache.pulsar.client.api.MessageBuilder; -import org.apache.pulsar.client.api.Producer; -import org.apache.pulsar.client.api.PulsarClient; -import org.apache.pulsar.client.api.PulsarClientException; -import org.apache.pulsar.client.api.SubscriptionType; -import org.apache.pulsar.connect.core.Record; -import org.apache.pulsar.functions.instance.producers.AbstractOneOuputTopicProducers; -import org.apache.pulsar.functions.proto.Function.FunctionDetails; - -/** - * A message processor that process messages at-most-once. - */ -@Slf4j -class AtMostOnceProcessor extends MessageProcessorBase { - - private Producer<byte[]> producer; - - AtMostOnceProcessor(PulsarClient client, - FunctionDetails functionDetails) { - super(client, functionDetails); - } - - @Override - public void postReceiveMessage(Record record) { - super.postReceiveMessage(record); - if (functionDetails.getAutoAck()) { - record.ack(); - } - } - - @Override - protected void initializeOutputProducer(String outputTopic) throws Exception { - producer = AbstractOneOuputTopicProducers.createProducer(client, outputTopic); - } - - @Override - public void sendOutputMessage(Record srcRecord, MessageBuilder outputMsgBuilder) { - if (null == outputMsgBuilder) { - return; - } - - Message<byte[]> outputMsg = outputMsgBuilder.build(); - producer.sendAsync(outputMsg); - } - - @Override - public void close() { - super.close(); - if (null != producer) { - try { - producer.close(); - } catch (PulsarClientException e) { - log.warn("Fail to close producer for processor {}", functionDetails.getSink().getTopic(), e); - } - } - } -} diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/EffectivelyOnceProcessor.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/EffectivelyOnceProcessor.java deleted file mode 100644 index 06a463b..0000000 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/EffectivelyOnceProcessor.java +++ /dev/null @@ -1,120 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.functions.instance.processors; - -import lombok.AccessLevel; -import lombok.Getter; -import lombok.extern.slf4j.Slf4j; - -import org.apache.pulsar.client.api.Consumer; -import org.apache.pulsar.client.api.ConsumerEventListener; -import org.apache.pulsar.client.api.MessageBuilder; -import org.apache.pulsar.client.api.Producer; -import org.apache.pulsar.client.api.PulsarClient; -import org.apache.pulsar.client.api.PulsarClientException; -import org.apache.pulsar.client.api.SubscriptionType; -import org.apache.pulsar.connect.core.Record; -import org.apache.pulsar.functions.source.PulsarRecord; -import org.apache.pulsar.functions.instance.producers.MultiConsumersOneOuputTopicProducers; -import org.apache.pulsar.functions.instance.producers.Producers; -import org.apache.pulsar.functions.proto.Function.FunctionDetails; - -/** - * A message processor that process messages effectively-once. - */ -@Slf4j -class EffectivelyOnceProcessor extends MessageProcessorBase implements ConsumerEventListener { - - @Getter(AccessLevel.PACKAGE) - protected Producers outputProducer; - - EffectivelyOnceProcessor(PulsarClient client, - FunctionDetails functionDetails) { - super(client, functionDetails); - } - - @Override - public void becameActive(Consumer<?> consumer, int partitionId) { - // if the instance becomes active for a given topic partition, - // open a producer for the results computed from this topic partition. - if (null != outputProducer) { - try { - this.outputProducer.getProducer(consumer.getTopic(), partitionId); - } catch (PulsarClientException e) { - // this can be ignored, because producer can be lazily created when accessing it. - log.warn("Fail to create a producer for results computed from messages of topic: {}, partition: {}", - consumer.getTopic(), partitionId); - } - } - } - - @Override - public void becameInactive(Consumer<?> consumer, int partitionId) { - if (null != outputProducer) { - // if I lost the ownership of a partition, close its corresponding topic partition. - // this is to allow the new active consumer be able to produce to the result topic. - this.outputProducer.closeProducer(consumer.getTopic(), partitionId); - } - } - - @Override - protected void initializeOutputProducer(String outputTopic) throws Exception { - outputProducer = new MultiConsumersOneOuputTopicProducers(client, outputTopic); - outputProducer.initialize(); - } - - // - // Methods to process messages - // - - @Override - public void sendOutputMessage(Record srcRecord, - MessageBuilder outputMsgBuilder) throws Exception { - if (null == outputMsgBuilder) { - srcRecord.ack(); - return; - } - - // assign sequence id to output message for idempotent producing - outputMsgBuilder = outputMsgBuilder - .setSequenceId(srcRecord.getRecordSequence()); - - // currently on PulsarRecord - if (srcRecord instanceof PulsarRecord) { - PulsarRecord pulsarMessage = (PulsarRecord) srcRecord; - Producer producer = outputProducer.getProducer(pulsarMessage.getTopicName(), - Integer.parseInt(srcRecord.getPartitionId())); - - org.apache.pulsar.client.api.Message outputMsg = outputMsgBuilder.build(); - producer.sendAsync(outputMsg) - .thenAccept(messageId -> srcRecord.ack()) - .join(); - } - } - - @Override - public void close() { - super.close(); - // kill the result producer - if (null != outputProducer) { - outputProducer.close(); - outputProducer = null; - } - } -} diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/MessageProcessor.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/MessageProcessor.java deleted file mode 100644 index 0dcf12c..0000000 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/MessageProcessor.java +++ /dev/null @@ -1,107 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.functions.instance.processors; - -import java.util.Map; - -import org.apache.bookkeeper.common.annotation.InterfaceStability.Evolving; -import org.apache.pulsar.client.api.MessageBuilder; -import org.apache.pulsar.client.api.PulsarClient; -import org.apache.pulsar.client.api.PulsarClientException; -import org.apache.pulsar.connect.core.Record; -import org.apache.pulsar.connect.core.Source; -import org.apache.pulsar.functions.api.SerDe; -import org.apache.pulsar.functions.proto.Function.FunctionDetails; -import org.apache.pulsar.functions.proto.Function.ProcessingGuarantees; - -/** - * A processor that processes messages, used by {@link org.apache.pulsar.functions.instance.JavaInstanceRunnable}. - */ -@Evolving -public interface MessageProcessor extends AutoCloseable { - - static MessageProcessor create(PulsarClient client, - FunctionDetails functionDetails) { - ProcessingGuarantees processingGuarantees = functionDetails.getProcessingGuarantees(); - - if (processingGuarantees == ProcessingGuarantees.EFFECTIVELY_ONCE) { - return new EffectivelyOnceProcessor( - client, - functionDetails); - } else if (processingGuarantees == ProcessingGuarantees.ATMOST_ONCE) { - return new AtMostOnceProcessor( - client, - functionDetails); - } else { - return new AtLeastOnceProcessor( - client, - functionDetails); - } - } - - void postReceiveMessage(Record record); - - /** - * Setup the source. Implementation is responsible for initializing the source - * and for calling open method for source - * @param inputType the input type of the function - * @throws Exception - */ - void setupInput(Class<?> inputType) - throws Exception; - - /** - * Return the source. - * - * @return the source. - */ - Source getSource(); - - /** - * Setup the output with a provided <i>outputSerDe</i>. The implementation of this processor is responsible for - * setting up the output - * - * @param outputSerDe output serde. - * @throws Exception - */ - void setupOutput(SerDe outputSerDe) throws Exception; - - /** - * Send the output message to the output topic. The output message is computed from <i>inputMsg</i>. - * - * <p>If the <i>outputMsgBuilder</i> is null, the implementation doesn't have to send any messages to the output. - * The implementation can decide to acknowledge the input message based on its process guarantees. - * - * @param srcRecord record from source - * @param outputMsgBuilder output message builder. it can be null. - */ - void sendOutputMessage(Record srcRecord, - MessageBuilder outputMsgBuilder) throws PulsarClientException, Exception; - - /** - * Get the next message to process - * @return the next input message - * @throws Exception - */ - Record recieveMessage() throws Exception; - - @Override - void close(); - -} diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/MessageProcessorBase.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/MessageProcessorBase.java deleted file mode 100644 index 33b699a..0000000 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/MessageProcessorBase.java +++ /dev/null @@ -1,155 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.functions.instance.processors; - -import java.util.Map; - -import com.google.gson.Gson; -import lombok.Getter; -import lombok.extern.slf4j.Slf4j; - -import net.jodah.typetools.TypeResolver; -import org.apache.pulsar.client.api.PulsarClient; -import org.apache.pulsar.client.api.SubscriptionType; -import org.apache.pulsar.connect.core.Record; -import org.apache.pulsar.connect.core.Source; -import org.apache.pulsar.functions.api.SerDe; -import org.apache.pulsar.functions.source.PulsarConfig; -import org.apache.pulsar.functions.source.PulsarSource; -import org.apache.pulsar.functions.proto.Function.FunctionDetails; -import org.apache.pulsar.functions.utils.FunctionConfig; -import org.apache.pulsar.functions.utils.FunctionDetailsUtils; -import org.apache.pulsar.functions.utils.Reflections; - -/** - * The base implementation of {@link MessageProcessor}. - */ -@Slf4j -abstract class MessageProcessorBase implements MessageProcessor { - - protected final PulsarClient client; - protected final FunctionDetails functionDetails; - - @Getter - protected Source source; - - - protected MessageProcessorBase(PulsarClient client, - FunctionDetails functionDetails) { - this.client = client; - this.functionDetails = functionDetails; - } - - // - // Input - // - - @Override - public void setupInput(Class<?> inputType) throws Exception { - - org.apache.pulsar.functions.proto.Function.SourceSpec sourceSpec = this.functionDetails.getSource(); - Object object; - if (sourceSpec.getClassName().equals(PulsarSource.class.getName())) { - - PulsarConfig pulsarConfig = new PulsarConfig(); - pulsarConfig.setTopicSerdeClassNameMap(this.functionDetails.getSource().getTopicsToSerDeClassNameMap()); - pulsarConfig.setSubscriptionName(FunctionDetailsUtils.getFullyQualifiedName(this.functionDetails)); - pulsarConfig.setProcessingGuarantees( - FunctionConfig.ProcessingGuarantees.valueOf(this.functionDetails.getProcessingGuarantees().name())); - pulsarConfig.setSubscriptionType( - FunctionConfig.SubscriptionType.valueOf(this.functionDetails.getSource().getSubscriptionType().name())); - pulsarConfig.setTypeClassName(inputType.getName()); - - Object[] params = {this.client, pulsarConfig}; - Class[] paramTypes = {PulsarClient.class, PulsarConfig.class}; - - object = Reflections.createInstance( - sourceSpec.getClassName(), - PulsarSource.class.getClassLoader(), params, paramTypes); - - } else { - object = Reflections.createInstance( - sourceSpec.getClassName(), - Thread.currentThread().getContextClassLoader()); - } - - Class<?>[] typeArgs; - if (object instanceof Source) { - typeArgs = TypeResolver.resolveRawArguments(Source.class, object.getClass()); - assert typeArgs.length > 0; - } else { - throw new RuntimeException("Source does not implement correct interface"); - } - this.source = (Source) object; - - try { - this.source.open(new Gson().fromJson(sourceSpec.getConfigs(), Map.class)); - } catch (Exception e) { - log.info("Error occurred executing open for source: {}", - this.functionDetails.getSource().getClassName(), e); - } - - } - - public Record recieveMessage() throws Exception { - return this.source.read(); - } - - /** - * Method called when a message is received from input after being put into the process queue. - * - * <p>The processor implementation can make a decision to process the message based on its processing guarantees. - * for example, an at-most-once processor can ack the message immediately. - * - * @param record input message. - */ - @Override - public void postReceiveMessage(Record record) {} - - // - // Output - // - - @Override - public void setupOutput(SerDe outputSerDe) throws Exception { - String outputTopic = functionDetails.getSink().getTopic(); - if (outputTopic != null - && !outputTopic.isEmpty() - && outputSerDe != null) { - log.info("Starting producer for output topic {}", outputTopic); - initializeOutputProducer(outputTopic); - } - } - - protected abstract void initializeOutputProducer(String outputTopic) throws Exception; - - // - // Process - // - - @Override - public void close() { - - try { - this.source.close(); - } catch (Exception e) { - log.warn("Failed to close source {}", this.source, e); - } - } -} diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneOuputTopicProducers.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneOuputTopicProducers.java index 3668311..0359f7d 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneOuputTopicProducers.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneOuputTopicProducers.java @@ -18,9 +18,8 @@ */ package org.apache.pulsar.functions.instance.producers; -import io.netty.util.collection.IntObjectHashMap; -import io.netty.util.collection.IntObjectMap; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; @@ -37,7 +36,8 @@ import org.apache.pulsar.client.api.PulsarClientException; public class MultiConsumersOneOuputTopicProducers extends AbstractOneOuputTopicProducers { @Getter(AccessLevel.PACKAGE) - private final Map<String, IntObjectMap<Producer<byte[]>>> producers; + private final Map<String, Map<String, Producer<byte[]>>> producers; + public MultiConsumersOneOuputTopicProducers(PulsarClient client, String outputTopic) @@ -51,15 +51,15 @@ public class MultiConsumersOneOuputTopicProducers extends AbstractOneOuputTopicP // no-op } - static String makeProducerName(String srcTopicName, int srcTopicPartition) { + static String makeProducerName(String srcTopicName, String srcTopicPartition) { return String.format("%s-%s", srcTopicName, srcTopicPartition); } @Override - public synchronized Producer<byte[]> getProducer(String srcTopicName, int srcTopicPartition) throws PulsarClientException { - IntObjectMap<Producer<byte[]>> producerMap = producers.get(srcTopicName); + public synchronized Producer<byte[]> getProducer(String srcTopicName, String srcTopicPartition) throws PulsarClientException { + Map<String, Producer<byte[]>> producerMap = producers.get(srcTopicName); if (null == producerMap) { - producerMap = new IntObjectHashMap<>(); + producerMap = new HashMap<>(); producers.put(srcTopicName, producerMap); } @@ -72,8 +72,8 @@ public class MultiConsumersOneOuputTopicProducers extends AbstractOneOuputTopicP } @Override - public synchronized void closeProducer(String srcTopicName, int srcTopicPartition) { - IntObjectMap<Producer<byte[]>> producerMap = producers.get(srcTopicName); + public synchronized void closeProducer(String srcTopicName, String srcTopicPartition) { + Map<String, Producer<byte[]>> producerMap = producers.get(srcTopicName); if (null != producerMap) { Producer<byte[]> producer = producerMap.remove(srcTopicPartition); @@ -89,7 +89,7 @@ public class MultiConsumersOneOuputTopicProducers extends AbstractOneOuputTopicP @Override public synchronized void close() { List<CompletableFuture<Void>> closeFutures = new ArrayList<>(producers.size()); - for (IntObjectMap<Producer<byte[]>> producerMap: producers.values()) { + for (Map<String, Producer<byte[]>> producerMap: producers.values()) { for (Producer<byte[]> producer : producerMap.values()) { closeFutures.add(producer.closeAsync()); } diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/Producers.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/Producers.java index 29cd96a..b9d6a08 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/Producers.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/Producers.java @@ -42,7 +42,7 @@ public interface Producers extends AutoCloseable { * src topic partition * @return the producer instance to produce messages */ - Producer<byte[]> getProducer(String srcTopicName, int srcTopicPartition) throws PulsarClientException; + Producer<byte[]> getProducer(String srcTopicName, String srcTopicPartition) throws PulsarClientException; /** * Close a producer specified by <tt>srcTopicName</tt> and <tt>srcTopicPartition</tt> @@ -51,7 +51,7 @@ public interface Producers extends AutoCloseable { * @param srcTopicPartition src topic partition */ void closeProducer(String srcTopicName, - int srcTopicPartition); + String srcTopicPartition); @Override void close(); diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/DefaultRuntimeSink.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/DefaultRuntimeSink.java index 8e0d37a..54e34c3 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/DefaultRuntimeSink.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/DefaultRuntimeSink.java @@ -20,7 +20,6 @@ package org.apache.pulsar.functions.sink; import java.util.Map; import java.util.concurrent.CompletableFuture; -import org.apache.pulsar.connect.core.RecordContext; import org.apache.pulsar.connect.core.Sink; /** diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java index 7c89d92..79c0b35 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java @@ -18,5 +18,250 @@ */ package org.apache.pulsar.functions.sink; -public class PulsarSink { +import com.google.common.annotations.VisibleForTesting; +import lombok.AccessLevel; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; +import net.jodah.typetools.TypeResolver; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.ConsumerEventListener; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageBuilder; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.connect.core.RecordContext; +import org.apache.pulsar.functions.api.SerDe; +import org.apache.pulsar.functions.api.utils.DefaultSerDe; +import org.apache.pulsar.functions.instance.InstanceUtils; +import org.apache.pulsar.functions.instance.producers.AbstractOneOuputTopicProducers; +import org.apache.pulsar.functions.instance.producers.MultiConsumersOneOuputTopicProducers; +import org.apache.pulsar.functions.instance.producers.Producers; +import org.apache.pulsar.functions.source.PulsarRecord; +import org.apache.pulsar.functions.utils.FunctionConfig; + +import java.util.Base64; +import java.util.Map; +import java.util.concurrent.CompletableFuture; + +@Slf4j +public class PulsarSink<T> implements RuntimeSink<T> { + + private PulsarClient client; + private PulsarSinkConfig pulsarSinkConfig; + private SerDe<T> outputSerDe; + + private PulsarSinkProcessor pulsarSinkProcessor; + + private interface PulsarSinkProcessor { + void initializeOutputProducer(String outputTopic) throws Exception; + + void sendOutputMessage(MessageBuilder outputMsgBuilder, + PulsarRecord pulsarRecord) throws Exception; + + void close() throws Exception; + } + + private class PulsarSinkAtMostOnceProcessor implements PulsarSinkProcessor { + private Producer<byte[]> producer; + + @Override + public void initializeOutputProducer(String outputTopic) throws Exception { + this.producer = AbstractOneOuputTopicProducers.createProducer( + client, pulsarSinkConfig.getTopic()); + } + + @Override + public void sendOutputMessage(MessageBuilder outputMsgBuilder, + PulsarRecord pulsarRecord) throws Exception { + Message<byte[]> outputMsg = outputMsgBuilder.build(); + this.producer.sendAsync(outputMsg); + } + + @Override + public void close() throws Exception { + if (null != producer) { + try { + producer.close(); + } catch (PulsarClientException e) { + log.warn("Fail to close producer for processor {}", pulsarSinkConfig.getTopic(), e); + } + } + } + } + + private class PulsarSinkAtLeastOnceProcessor implements PulsarSinkProcessor { + private Producer<byte[]> producer; + + @Override + public void initializeOutputProducer(String outputTopic) throws Exception { + this.producer = AbstractOneOuputTopicProducers.createProducer( + client, pulsarSinkConfig.getTopic()); + } + + @Override + public void sendOutputMessage(MessageBuilder outputMsgBuilder, + PulsarRecord pulsarRecord) throws Exception { + Message<byte[]> outputMsg = outputMsgBuilder.build(); + this.producer.sendAsync(outputMsg).thenAccept(messageId -> pulsarRecord.ack()); + } + + @Override + public void close() throws Exception { + if (null != producer) { + try { + producer.close(); + } catch (PulsarClientException e) { + log.warn("Fail to close producer for processor {}", pulsarSinkConfig.getTopic(), e); + } + } + } + } + + private class PulsarSinkEffectivelyOnceProcessor implements PulsarSinkProcessor, ConsumerEventListener { + + @Getter(AccessLevel.PACKAGE) + protected Producers outputProducer; + + @Override + public void initializeOutputProducer(String outputTopic) throws Exception { + outputProducer = new MultiConsumersOneOuputTopicProducers(client, outputTopic); + outputProducer.initialize(); + } + + @Override + public void sendOutputMessage(MessageBuilder outputMsgBuilder, PulsarRecord pulsarRecord) + throws Exception { + + // assign sequence id to output message for idempotent producing + outputMsgBuilder = outputMsgBuilder + .setSequenceId(pulsarRecord.getRecordSequence()); + + // currently on PulsarRecord + Producer producer = outputProducer.getProducer(pulsarRecord.getTopicName(), + pulsarRecord.getPartitionId()); + + org.apache.pulsar.client.api.Message outputMsg = outputMsgBuilder.build(); + producer.sendAsync(outputMsg) + .thenAccept(messageId -> pulsarRecord.ack()) + .join(); + } + + @Override + public void close() throws Exception { + // kill the result producer + if (null != outputProducer) { + outputProducer.close(); + outputProducer = null; + } + } + + @Override + public void becameActive(Consumer<?> consumer, int partitionId) { + // if the instance becomes active for a given topic partition, + // open a producer for the results computed from this topic partition. + if (null != outputProducer) { + try { + this.outputProducer.getProducer(consumer.getTopic(), Integer.toString(partitionId)); + } catch (PulsarClientException e) { + // this can be ignored, because producer can be lazily created when accessing it. + log.warn("Fail to create a producer for results computed from messages of topic: {}, partition: {}", + consumer.getTopic(), partitionId); + } + } + } + + @Override + public void becameInactive(Consumer<?> consumer, int partitionId) { + if (null != outputProducer) { + // if I lost the ownership of a partition, close its corresponding topic partition. + // this is to allow the new active consumer be able to produce to the result topic. + this.outputProducer.closeProducer(consumer.getTopic(), Integer.toString(partitionId)); + } + } + } + + public PulsarSink(PulsarClient client, PulsarSinkConfig pulsarSinkConfig) { + this.client = client; + this.pulsarSinkConfig = pulsarSinkConfig; + } + + @Override + public void open(Map<String, Object> config) throws Exception { + + // Setup Serialization/Deserialization + setupSerDe(); + + FunctionConfig.ProcessingGuarantees processingGuarantees = this.pulsarSinkConfig.getProcessingGuarantees(); + switch (processingGuarantees) { + case ATMOST_ONCE: + this.pulsarSinkProcessor = new PulsarSinkAtMostOnceProcessor(); + break; + case ATLEAST_ONCE: + this.pulsarSinkProcessor = new PulsarSinkAtLeastOnceProcessor(); + break; + case EFFECTIVELY_ONCE: + this.pulsarSinkProcessor = new PulsarSinkEffectivelyOnceProcessor(); + break; + } + this.pulsarSinkProcessor.initializeOutputProducer(this.pulsarSinkConfig.getTopic()); + } + + @Override + public CompletableFuture<Void> write(T value) { + return null; + } + + @Override + public void write(RecordContext recordContext, T value) throws Exception { + + PulsarRecord pulsarRecord = (PulsarRecord) recordContext; + + byte[] output; + try { + output = this.outputSerDe.serialize(value); + } catch (Exception e) { + //TODO Add serialization exception stats + throw new RuntimeException("Error occured when attempting to serialize output:", e); + } + MessageBuilder msgBuilder = MessageBuilder.create(); + msgBuilder + .setContent(output) + .setProperty("__pfn_input_topic__", pulsarRecord.getTopicName()) + .setProperty("__pfn_input_msg_id__", new String( + Base64.getEncoder().encode(pulsarRecord.getMessageId().toByteArray()))); + this.pulsarSinkProcessor.sendOutputMessage(msgBuilder, pulsarRecord); + } + + @Override + public void close() throws Exception { + this.pulsarSinkProcessor.close(); + + } + + @VisibleForTesting + void setupSerDe() throws ClassNotFoundException { + Class<?> typeArg = Thread.currentThread().getContextClassLoader().loadClass( + this.pulsarSinkConfig.getTypeClassName()); + + if (!Void.class.equals(typeArg)) { // return type is not `Void.class` + if (this.pulsarSinkConfig.getSerDeClassName() == null + || this.pulsarSinkConfig.getSerDeClassName().isEmpty() + || this.pulsarSinkConfig.getSerDeClassName().equals(DefaultSerDe.class.getName())) { + this.outputSerDe = InstanceUtils.initializeDefaultSerDe(typeArg); + } else { + this.outputSerDe = InstanceUtils.initializeSerDe(this.pulsarSinkConfig.getSerDeClassName(), + Thread.currentThread().getContextClassLoader(), typeArg); + } + Class<?>[] outputSerdeTypeArgs = TypeResolver.resolveRawArguments(SerDe.class, outputSerDe.getClass()); + if (outputSerDe.getClass().getName().equals(DefaultSerDe.class.getName())) { + if (!DefaultSerDe.IsSupportedType(typeArg)) { + throw new RuntimeException("Default Serde does not support type " + typeArg); + } + } else if (!outputSerdeTypeArgs[0].isAssignableFrom(typeArg)) { + throw new RuntimeException("Inconsistent types found between function output type and output serde type: " + + " function type = " + typeArg + "should be assignable from " + outputSerdeTypeArgs[0]); + } + } + } } diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarConfig.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSinkConfig.java similarity index 67% copy from pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarConfig.java copy to pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSinkConfig.java index 2a5dc44..1def3f1 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarConfig.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSinkConfig.java @@ -16,33 +16,22 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pulsar.functions.source; +package org.apache.pulsar.functions.sink; -import com.fasterxml.jackson.databind.ObjectMapper; -import lombok.Builder; -import lombok.Data; import lombok.Getter; -import lombok.NoArgsConstructor; import lombok.Setter; import lombok.ToString; import org.apache.pulsar.functions.utils.FunctionConfig; -import java.io.IOException; import java.util.Map; @Getter @Setter @ToString -public class PulsarConfig { - +public class PulsarSinkConfig { private FunctionConfig.ProcessingGuarantees processingGuarantees; private FunctionConfig.SubscriptionType subscriptionType; - private String subscriptionName; - private Map<String, String> topicSerdeClassNameMap; + private String topic; + private String serDeClassName; private String typeClassName; - - public static PulsarConfig load(Map<String, Object> map) throws IOException { - ObjectMapper mapper = new ObjectMapper(); - return mapper.readValue(new ObjectMapper().writeValueAsString(map), PulsarConfig.class); - } } diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/RuntimeSink.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/RuntimeSink.java index 63a48ec..fe47705 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/RuntimeSink.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/RuntimeSink.java @@ -29,7 +29,7 @@ import org.apache.pulsar.connect.core.Sink; * <p>There is a default implementation provided for wrapping up the user provided {@link Sink}. Pulsar sink * should be implemented using this interface to ensure supporting effective-once. */ -public interface RuntimeSink<T> extends Sink<T> { +public interface RuntimeSink<T> extends Sink<T>{ /** * Write the <tt>value</tt>value. @@ -40,7 +40,7 @@ public interface RuntimeSink<T> extends Sink<T> { * @param inputRecordContext input record context * @param value output value computed from the runtime. */ - default void write(RecordContext inputRecordContext, T value) { + default void write(RecordContext inputRecordContext, T value) throws Exception { write(value) .thenAccept(ignored -> inputRecordContext.ack()) .exceptionally(cause -> { @@ -48,5 +48,4 @@ public interface RuntimeSink<T> extends Sink<T> { return null; }); } - } diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java index 9048544..dd0fb38 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.functions.source; +import com.google.common.annotations.VisibleForTesting; import lombok.Getter; import lombok.extern.slf4j.Slf4j; import net.jodah.typetools.TypeResolver; @@ -41,27 +42,27 @@ import java.util.concurrent.TimeUnit; public class PulsarSource<T> implements Source<T> { private PulsarClient pulsarClient; - private PulsarConfig pulsarConfig; + private PulsarSourceConfig pulsarSourceConfig; private Map<String, SerDe> topicToSerDeMap = new HashMap<>(); @Getter private org.apache.pulsar.client.api.Consumer inputConsumer; - public PulsarSource(PulsarClient pulsarClient, PulsarConfig pulsarConfig) { + public PulsarSource(PulsarClient pulsarClient, PulsarSourceConfig pulsarConfig) { this.pulsarClient = pulsarClient; - this.pulsarConfig = pulsarConfig; + this.pulsarSourceConfig = pulsarConfig; } @Override public void open(Map<String, Object> config) throws Exception { // Setup Serialization/Deserialization - setupSerde(); + setupSerDe(); // Setup pulsar consumer this.inputConsumer = this.pulsarClient.newConsumer() - .topics(new ArrayList<>(this.pulsarConfig.getTopicSerdeClassNameMap().keySet())) - .subscriptionName(this.pulsarConfig.getSubscriptionName()) - .subscriptionType(this.pulsarConfig.getSubscriptionType().get()) + .topics(new ArrayList<>(this.pulsarSourceConfig.getTopicSerdeClassNameMap().keySet())) + .subscriptionName(this.pulsarSourceConfig.getSubscriptionName()) + .subscriptionType(this.pulsarSourceConfig.getSubscriptionType().get()) .ackTimeout(1, TimeUnit.MINUTES) .subscribe(); } @@ -81,7 +82,7 @@ public class PulsarSource<T> implements Source<T> { MessageIdImpl messageId = (MessageIdImpl) topicMessageId.getInnerMessageId(); partitionId = Long.toString(messageId.getPartitionIndex()); } else { - topicName = this.pulsarConfig.getTopicSerdeClassNameMap().keySet().iterator().next(); + topicName = this.pulsarSourceConfig.getTopicSerdeClassNameMap().keySet().iterator().next(); partitionId = Long.toString(((MessageIdImpl) message.getMessageId()).getPartitionIndex()); } @@ -107,13 +108,13 @@ public class PulsarSource<T> implements Source<T> { .sequenceId(message.getSequenceId()) .topicName(topicName) .ackFunction(() -> { - if (pulsarConfig.getProcessingGuarantees() == FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE) { + if (pulsarSourceConfig.getProcessingGuarantees() == FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE) { inputConsumer.acknowledgeCumulativeAsync(message); } else { inputConsumer.acknowledgeAsync(message); } }).failFunction(() -> { - if (pulsarConfig.getProcessingGuarantees() == FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE) { + if (pulsarSourceConfig.getProcessingGuarantees() == FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE) { throw new RuntimeException("Failed to process message: " + message.getMessageId()); } }) @@ -126,17 +127,18 @@ public class PulsarSource<T> implements Source<T> { this.inputConsumer.close(); } - private void setupSerde() throws ClassNotFoundException { + @VisibleForTesting + void setupSerDe() throws ClassNotFoundException { - Class<?> typeArg = Thread.currentThread().getContextClassLoader().loadClass(this.pulsarConfig.getTypeClassName()); + Class<?> typeArg = Thread.currentThread().getContextClassLoader().loadClass(this.pulsarSourceConfig.getTypeClassName()); if (Void.class.equals(typeArg)) { throw new RuntimeException("Input type of Pulsar Function cannot be Void"); } - for (Map.Entry<String, String> entry : this.pulsarConfig.getTopicSerdeClassNameMap().entrySet()) { + for (Map.Entry<String, String> entry : this.pulsarSourceConfig.getTopicSerdeClassNameMap().entrySet()) { String topic = entry.getKey(); String serDeClassname = entry.getValue(); - if (serDeClassname.isEmpty()) { + if (serDeClassname == null || serDeClassname.isEmpty()) { serDeClassname = DefaultSerDe.class.getName(); } SerDe serDe = InstanceUtils.initializeSerDe(serDeClassname, diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarConfig.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSourceConfig.java similarity index 87% rename from pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarConfig.java rename to pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSourceConfig.java index 2a5dc44..4d5e540 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarConfig.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSourceConfig.java @@ -19,10 +19,7 @@ package org.apache.pulsar.functions.source; import com.fasterxml.jackson.databind.ObjectMapper; -import lombok.Builder; -import lombok.Data; import lombok.Getter; -import lombok.NoArgsConstructor; import lombok.Setter; import lombok.ToString; import org.apache.pulsar.functions.utils.FunctionConfig; @@ -33,7 +30,7 @@ import java.util.Map; @Getter @Setter @ToString -public class PulsarConfig { +public class PulsarSourceConfig { private FunctionConfig.ProcessingGuarantees processingGuarantees; private FunctionConfig.SubscriptionType subscriptionType; @@ -41,8 +38,8 @@ public class PulsarConfig { private Map<String, String> topicSerdeClassNameMap; private String typeClassName; - public static PulsarConfig load(Map<String, Object> map) throws IOException { + public static PulsarSourceConfig load(Map<String, Object> map) throws IOException { ObjectMapper mapper = new ObjectMapper(); - return mapper.readValue(new ObjectMapper().writeValueAsString(map), PulsarConfig.class); + return mapper.readValue(new ObjectMapper().writeValueAsString(map), PulsarSourceConfig.class); } } diff --git a/pulsar-functions/instance/src/main/python/python_instance_main.py b/pulsar-functions/instance/src/main/python/python_instance_main.py index f2763a9..15d2643 100644 --- a/pulsar-functions/instance/src/main/python/python_instance_main.py +++ b/pulsar-functions/instance/src/main/python/python_instance_main.py @@ -58,14 +58,10 @@ def main(): parser.add_argument('--name', required=True, help='Function Name') parser.add_argument('--tenant', required=True, help='Tenant Name') parser.add_argument('--namespace', required=True, help='Namespace name') - parser.add_argument('--source_topics_serde_classname', required=True, help='A mapping of Input topics to SerDe') - parser.add_argument('--output_topic', required=False, help='Output Topic') - parser.add_argument('--output_serde_classname', required=False, help='Output Serde Classnames') parser.add_argument('--instance_id', required=True, help='Instance Id') parser.add_argument('--function_id', required=True, help='Function Id') parser.add_argument('--function_version', required=True, help='Function Version') parser.add_argument('--processing_guarantees', required=True, help='Processing Guarantees') - parser.add_argument('--source_subscription_type', required=True, help='Subscription Type') parser.add_argument('--pulsar_serviceurl', required=True, help='Pulsar Service Url') parser.add_argument('--port', required=True, help='Instance Port', type=int) parser.add_argument('--max_buffered_tuples', required=True, help='Maximum number of Buffered tuples') @@ -74,6 +70,10 @@ def main(): parser.add_argument('--logging_file', required=True, help='Log file name') parser.add_argument('--auto_ack', required=True, help='Enable Autoacking?') parser.add_argument('--log_topic', required=False, help='Topic to send Log Messages') + parser.add_argument('--source_subscription_type', required=True, help='Subscription Type') + parser.add_argument('--source_topics_serde_classname', required=True, help='A mapping of Input topics to SerDe') + parser.add_argument('--sink_topic', required=False, help='Sink Topic') + parser.add_argument('--sink_serde_classname', required=False, help='Sink SerDe classname') args = parser.parse_args() log_file = os.path.join(args.logging_directory, @@ -104,10 +104,10 @@ def main(): function_details.source.MergeFrom(sourceSpec) sinkSpec = Function_pb2.SinkSpec() - if args.output_topic != None and len(args.output_topic) != 0: - sinkSpec.topic = args.output_topic - if args.output_serde_classname != None and len(args.output_serde_classname) != 0: - sinkSpec.serDeClassName = args.output_serde_classname + if args.sink_topic != None and len(args.sink_topic) != 0: + sinkSpec.topic = args.sink_topic + if args.sink_serde_classname != None and len(args.sink_serde_classname) != 0: + sinkSpec.serDeClassName = args.sink_serde_classname function_details.sink.MergeFrom(sinkSpec) function_details.processingGuarantees = Function_pb2.ProcessingGuarantees.Value(args.processing_guarantees) diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java index 289dbba..12d4f19 100644 --- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java +++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java @@ -20,20 +20,14 @@ package org.apache.pulsar.functions.instance; import lombok.Getter; import lombok.Setter; -import net.jodah.typetools.TypeResolver; import org.apache.pulsar.functions.api.Context; import org.apache.pulsar.functions.api.Function; import org.apache.pulsar.functions.api.SerDe; -import org.apache.pulsar.functions.api.utils.DefaultSerDe; import org.apache.pulsar.functions.proto.Function.FunctionDetails; import org.apache.pulsar.functions.proto.Function.SinkSpec; -import org.testng.annotations.Test; -import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; -import static org.testng.AssertJUnit.*; - public class JavaInstanceRunnableTest { static class IntegerSerDe implements SerDe<Integer> { @@ -111,79 +105,4 @@ public class JavaInstanceRunnableTest { return null; } } - - /** - * Verify that JavaInstance does support functions that output Void type - */ - @Test - public void testVoidOutputClasses() { - try { - JavaInstanceRunnable runnable = createRunnable(false, DefaultSerDe.class.getName()); - Method method = makeAccessible(runnable); - ClassLoader clsLoader = Thread.currentThread().getContextClassLoader(); - VoidOutputHandler pulsarFunction = new VoidOutputHandler(); - Class<?>[] typeArgs = TypeResolver.resolveRawArguments(Function.class, pulsarFunction.getClass()); - method.invoke(runnable, typeArgs, clsLoader); - } catch (Exception ex) { - assertTrue(false); - } - } - - /** - * Verify that Default Serializer works fine. - */ - @Test - public void testDefaultSerDe() { - try { - JavaInstanceRunnable runnable = createRunnable(false, null); - Method method = makeAccessible(runnable); - ClassLoader clsLoader = Thread.currentThread().getContextClassLoader(); - Function function = (Function<String, String>) (input, context) -> input + "-lambda"; - Class<?>[] typeArgs = TypeResolver.resolveRawArguments(Function.class, function.getClass()); - method.invoke(runnable, typeArgs, clsLoader); - } catch (Exception ex) { - ex.printStackTrace(); - assertEquals(ex, null); - assertTrue(false); - } - } - - /** - * Verify that Explicit setting of Default Serializer works fine. - */ - @Test - public void testExplicitDefaultSerDe() { - try { - JavaInstanceRunnable runnable = createRunnable(false, DefaultSerDe.class.getName()); - Method method = makeAccessible(runnable); - ClassLoader clsLoader = Thread.currentThread().getContextClassLoader(); - Function function = (Function<String, String>) (input, context) -> input + "-lambda"; - Class<?>[] typeArgs = TypeResolver.resolveRawArguments(Function.class, function.getClass()); - method.invoke(runnable, typeArgs, clsLoader); - } catch (Exception ex) { - assertTrue(false); - } - } - - /** - * Verify that function output type should be consistent with output serde type. - */ - @Test - public void testInconsistentOutputType() { - try { - JavaInstanceRunnable runnable = createRunnable(false, IntegerSerDe.class.getName()); - Method method = makeAccessible(runnable); - ClassLoader clsLoader = Thread.currentThread().getContextClassLoader(); - Function function = (Function<String, String>) (input, context) -> input + "-lambda"; - Class<?>[] typeArgs = TypeResolver.resolveRawArguments(Function.class, function.getClass()); - method.invoke(runnable, typeArgs, clsLoader); - fail("Should fail constructing java instance if function type is inconsistent with serde type"); - } catch (InvocationTargetException ex) { - assertTrue(ex.getCause().getMessage().startsWith("Inconsistent types found between function output type and output serde type:")); - } catch (Exception ex) { - assertTrue(false); - } - } - - } diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneOutputTopicProducersTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneOutputTopicProducersTest.java index e6072c8..a22b366 100644 --- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneOutputTopicProducersTest.java +++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneOutputTopicProducersTest.java @@ -213,7 +213,7 @@ public class MultiConsumersOneOutputTopicProducersTest { @Test public void testGetCloseProducer() throws Exception { String srcTopic = "test-src-topic"; - int ptnIdx = 1234; + String ptnIdx = "1234"; Producer<byte[]> producer = producers.getProducer(srcTopic, ptnIdx); String producerName = makeProducerName(srcTopic, ptnIdx); diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/DefaultRuntimeSinkTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/DefaultRuntimeSinkTest.java index 7c58c30..2ba4e3f 100644 --- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/DefaultRuntimeSinkTest.java +++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/DefaultRuntimeSinkTest.java @@ -63,13 +63,13 @@ public class DefaultRuntimeSinkTest { } @Test - public void testWrite() { + public void testWrite() throws Exception { this.runtimeSink.write("test-record"); verify(mockSink, times(1)).write(eq("test-record")); } @Test - public void testWriteAck() { + public void testWriteAck() throws Exception { RecordContext context = mock(RecordContext.class); CompletableFuture<Void> writeFuture = new CompletableFuture<>(); @@ -82,7 +82,7 @@ public class DefaultRuntimeSinkTest { } @Test - public void testWriteFail() { + public void testWriteFail() throws Exception { RecordContext context = mock(RecordContext.class); CompletableFuture<Void> writeFuture = new CompletableFuture<>(); diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java similarity index 51% copy from pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java copy to pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java index 558517a..0f826ac 100644 --- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java +++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java @@ -16,21 +16,21 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pulsar.functions.source; +package org.apache.pulsar.functions.sink; +import lombok.Getter; +import lombok.Setter; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.ConsumerBuilder; import org.apache.pulsar.client.api.PulsarClient; -import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.PulsarClientException;; import org.apache.pulsar.functions.api.SerDe; import org.apache.pulsar.functions.api.utils.DefaultSerDe; import org.apache.pulsar.functions.utils.FunctionConfig; import org.testng.annotations.Test; import java.io.IOException; -import java.util.HashMap; -import java.util.Map; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyList; @@ -39,18 +39,14 @@ import static org.mockito.Matchers.anyString; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.testng.AssertJUnit.assertEquals; -import static org.testng.AssertJUnit.assertFalse; import static org.testng.AssertJUnit.assertTrue; import static org.testng.AssertJUnit.fail; @Slf4j -public class PulsarSourceTest { +public class PulsarSinkTest { - private static final String SUBSCRIPTION_NAME = "test/test-namespace/example"; - private static Map<String, String> topicSerdeClassNameMap = new HashMap<>(); - static { - topicSerdeClassNameMap.put("persistent://sample/standalone/ns1/test_result", DefaultSerDe.class.getName()); - } + private static final String TOPIC = "persistent://sample/standalone/ns1/test_result"; + private static final String serDeClassName = DefaultSerDe.class.getName(); public static class TestSerDe implements SerDe<String> { @@ -82,55 +78,125 @@ public class PulsarSourceTest { return pulsarClient; } - private static PulsarConfig getPulsarConfigs() { - PulsarConfig pulsarConfig = new PulsarConfig(); + private static PulsarSinkConfig getPulsarConfigs() { + PulsarSinkConfig pulsarConfig = new PulsarSinkConfig(); pulsarConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE); pulsarConfig.setSubscriptionType(FunctionConfig.SubscriptionType.FAILOVER); - pulsarConfig.setTopicSerdeClassNameMap(topicSerdeClassNameMap); + pulsarConfig.setTopic(TOPIC); + pulsarConfig.setSerDeClassName(serDeClassName); pulsarConfig.setTypeClassName(String.class.getName()); return pulsarConfig; } + @Getter + @Setter + public static class ComplexUserDefinedType { + private String name; + private Integer age; + } + + public static class ComplexSerDe implements SerDe<ComplexUserDefinedType> { + @Override + public ComplexUserDefinedType deserialize(byte[] input) { + return null; + } + + @Override + public byte[] serialize(ComplexUserDefinedType input) { + return new byte[0]; + } + } + + /** + * Verify that JavaInstance does support functions that output Void type + */ @Test - public void testVoidInputClasses() throws IOException { - PulsarConfig pulsarConfig = getPulsarConfigs(); + public void testVoidOutputClasses() throws Exception { + PulsarSinkConfig pulsarConfig = getPulsarConfigs(); // set type to void pulsarConfig.setTypeClassName(Void.class.getName()); - PulsarSource pulsarSource = new PulsarSource(getPulsarClient(), pulsarConfig); + PulsarSink pulsarSink = new PulsarSink(getPulsarClient(), pulsarConfig); try { - pulsarSource.open(new HashMap<>()); - assertFalse(true); - } catch (RuntimeException ex) { - log.error("RuntimeException: {}", ex, ex); - assertEquals(ex.getMessage(), "Input type of Pulsar Function cannot be Void"); + pulsarSink.setupSerDe(); } catch (Exception ex) { - log.error("Exception: {}", ex, ex); - assertFalse(true); + ex.printStackTrace(); + assertEquals(ex, null); + assertTrue(false); } } - /** - * Verify that function input type should be consistent with input serde type. - */ @Test - public void testInconsistentInputType() throws IOException { - PulsarConfig pulsarConfig = getPulsarConfigs(); + public void testInconsistentOutputType() throws IOException { + PulsarSinkConfig pulsarConfig = getPulsarConfigs(); // set type to be inconsistent to that of SerDe pulsarConfig.setTypeClassName(Integer.class.getName()); - Map<String, String> topicSerdeClassNameMap = new HashMap<>(); - topicSerdeClassNameMap.put("persistent://sample/standalone/ns1/test_result", TestSerDe.class.getName()); - pulsarConfig.setTopicSerdeClassNameMap(topicSerdeClassNameMap); - PulsarSource pulsarSource = new PulsarSource(getPulsarClient(), pulsarConfig); + pulsarConfig.setSerDeClassName(TestSerDe.class.getName()); + PulsarSink pulsarSink = new PulsarSink(getPulsarClient(), pulsarConfig); try { - pulsarSource.open(new HashMap<>()); + pulsarSink.setupSerDe(); fail("Should fail constructing java instance if function type is inconsistent with serde type"); } catch (RuntimeException ex) { log.error("RuntimeException: {}", ex, ex); - assertTrue(ex.getMessage().startsWith("Inconsistent types found between function input type and input serde type:")); + assertTrue(ex.getMessage().startsWith("Inconsistent types found between function output type and output serde type:")); } catch (Exception ex) { log.error("Exception: {}", ex, ex); assertTrue(false); } } + + /** + * Verify that Default Serializer works fine. + */ + @Test + public void testDefaultSerDe() throws PulsarClientException { + + PulsarSinkConfig pulsarConfig = getPulsarConfigs(); + // set type to void + pulsarConfig.setTypeClassName(String.class.getName()); + pulsarConfig.setSerDeClassName(null); + PulsarSink pulsarSink = new PulsarSink(getPulsarClient(), pulsarConfig); + + try { + pulsarSink.setupSerDe(); + } catch (Exception ex) { + ex.printStackTrace(); + fail(); + } + } + + /** + * Verify that Explicit setting of Default Serializer works fine. + */ + @Test + public void testExplicitDefaultSerDe() throws PulsarClientException { + PulsarSinkConfig pulsarConfig = getPulsarConfigs(); + // set type to void + pulsarConfig.setTypeClassName(String.class.getName()); + pulsarConfig.setSerDeClassName(DefaultSerDe.class.getName()); + PulsarSink pulsarSink = new PulsarSink(getPulsarClient(), pulsarConfig); + + try { + pulsarSink.setupSerDe(); + } catch (Exception ex) { + ex.printStackTrace(); + fail(); + } + } + + @Test + public void testComplexOuputType() throws PulsarClientException { + PulsarSinkConfig pulsarConfig = getPulsarConfigs(); + // set type to void + pulsarConfig.setTypeClassName(ComplexUserDefinedType.class.getName()); + pulsarConfig.setSerDeClassName(ComplexSerDe.class.getName()); + PulsarSink pulsarSink = new PulsarSink(getPulsarClient(), pulsarConfig); + + try { + pulsarSink.setupSerDe(); + } catch (Exception ex) { + ex.printStackTrace(); + fail(); + } + } } diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java index 558517a..77d397c 100644 --- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java +++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java @@ -18,6 +18,8 @@ */ package org.apache.pulsar.functions.source; +import lombok.Getter; +import lombok.Setter; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.ConsumerBuilder; @@ -82,8 +84,8 @@ public class PulsarSourceTest { return pulsarClient; } - private static PulsarConfig getPulsarConfigs() { - PulsarConfig pulsarConfig = new PulsarConfig(); + private static PulsarSourceConfig getPulsarConfigs() { + PulsarSourceConfig pulsarConfig = new PulsarSourceConfig(); pulsarConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE); pulsarConfig.setSubscriptionType(FunctionConfig.SubscriptionType.FAILOVER); pulsarConfig.setTopicSerdeClassNameMap(topicSerdeClassNameMap); @@ -91,9 +93,29 @@ public class PulsarSourceTest { return pulsarConfig; } + @Getter + @Setter + public static class ComplexUserDefinedType { + private String name; + private Integer age; + } + + public static class ComplexSerDe implements SerDe<ComplexUserDefinedType> { + @Override + public ComplexUserDefinedType deserialize(byte[] input) { + return null; + } + + @Override + public byte[] serialize(ComplexUserDefinedType input) { + return new byte[0]; + } + } + + @Test public void testVoidInputClasses() throws IOException { - PulsarConfig pulsarConfig = getPulsarConfigs(); + PulsarSourceConfig pulsarConfig = getPulsarConfigs(); // set type to void pulsarConfig.setTypeClassName(Void.class.getName()); PulsarSource pulsarSource = new PulsarSource(getPulsarClient(), pulsarConfig); @@ -115,7 +137,7 @@ public class PulsarSourceTest { */ @Test public void testInconsistentInputType() throws IOException { - PulsarConfig pulsarConfig = getPulsarConfigs(); + PulsarSourceConfig pulsarConfig = getPulsarConfigs(); // set type to be inconsistent to that of SerDe pulsarConfig.setTypeClassName(Integer.class.getName()); Map<String, String> topicSerdeClassNameMap = new HashMap<>(); @@ -133,4 +155,64 @@ public class PulsarSourceTest { assertTrue(false); } } + + /** + * Verify that Default Serializer works fine. + */ + @Test + public void testDefaultSerDe() throws PulsarClientException { + + PulsarSourceConfig pulsarConfig = getPulsarConfigs(); + // set type to void + pulsarConfig.setTypeClassName(String.class.getName()); + topicSerdeClassNameMap.put("persistent://sample/standalone/ns1/test_result", null); + pulsarConfig.setTopicSerdeClassNameMap(topicSerdeClassNameMap); + PulsarSource pulsarSource = new PulsarSource(getPulsarClient(), pulsarConfig); + + try { + pulsarSource.open(new HashMap<>()); + } catch (Exception ex) { + ex.printStackTrace(); + assertEquals(ex, null); + assertTrue(false); + } + } + + /** + * Verify that Explicit setting of Default Serializer works fine. + */ + @Test + public void testExplicitDefaultSerDe() throws PulsarClientException { + PulsarSourceConfig pulsarConfig = getPulsarConfigs(); + // set type to void + pulsarConfig.setTypeClassName(String.class.getName()); + topicSerdeClassNameMap.put("persistent://sample/standalone/ns1/test_result", DefaultSerDe.class.getName()); + pulsarConfig.setTopicSerdeClassNameMap(topicSerdeClassNameMap); + PulsarSource pulsarSource = new PulsarSource(getPulsarClient(), pulsarConfig); + + try { + pulsarSource.open(new HashMap<>()); + } catch (Exception ex) { + ex.printStackTrace(); + assertEquals(ex, null); + assertTrue(false); + } + } + + @Test + public void testComplexOuputType() throws PulsarClientException { + PulsarSourceConfig pulsarConfig = getPulsarConfigs(); + // set type to void + pulsarConfig.setTypeClassName(ComplexUserDefinedType.class.getName()); + topicSerdeClassNameMap.put("persistent://sample/standalone/ns1/test_result",ComplexSerDe.class.getName()); + pulsarConfig.setTopicSerdeClassNameMap(topicSerdeClassNameMap); + PulsarSource pulsarSource = new PulsarSource(getPulsarClient(), pulsarConfig); + + try { + pulsarSource.setupSerDe(); + } catch (Exception ex) { + ex.printStackTrace(); + fail(); + } + } } diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java index e0330bb..5bb5cac 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java @@ -60,13 +60,6 @@ public class JavaInstanceMain { protected String tenant; @Parameter(names = "--namespace", description = "Namespace Name\n", required = true) protected String namespace; - - @Parameter(names = "--output_topic", description = "Output Topic Name\n") - protected String outputTopicName; - - @Parameter(names = "--output_serde_classname", description = "Output SerDe\n") - protected String outputSerdeClassName; - @Parameter(names = "--log_topic", description = "Log Topic") protected String logTopic; @@ -112,6 +105,17 @@ public class JavaInstanceMain { @Parameter(names = "--source_topics_serde_classname", description = "A map of topics to SerDe for the source", required = true) protected String sourceTopicsSerdeClassName; + @Parameter(names = "--sink_configs", description = "The sink configs\n") + protected String sinkConfigs; + + @Parameter(names = "--sink_classname", description = "The sink classname\n", required = true) + protected String sinkClassname; + + @Parameter(names = "--sink_topic", description = "The sink Topic Name\n", required = true) + protected String sinkTopic; + + @Parameter(names = "--sink_serde_classname", description = "Sink SerDe\n") + protected String sinkSerdeClassName; private Server server; @@ -130,15 +134,6 @@ public class JavaInstanceMain { functionDetailsBuilder.setName(functionName); functionDetailsBuilder.setClassName(className); - SinkSpec.Builder sinkSpecBuilder = SinkSpec.newBuilder(); - if (outputSerdeClassName != null) { - sinkSpecBuilder.setSerDeClassName(outputSerdeClassName); - } - if (outputTopicName != null) { - sinkSpecBuilder.setTopic(outputTopicName); - } - functionDetailsBuilder.setSink(sinkSpecBuilder); - if (logTopic != null) { functionDetailsBuilder.setLogTopic(logTopic); } @@ -154,6 +149,7 @@ public class JavaInstanceMain { functionDetailsBuilder.putAllUserConfig(userConfigMap); } + // Setup source SourceSpec.Builder sourceDetailsBuilder = SourceSpec.newBuilder(); sourceDetailsBuilder.setClassName(sourceClassname); if (sourceConfigs != null && !sourceConfigs.isEmpty()) {; @@ -165,6 +161,18 @@ public class JavaInstanceMain { functionDetailsBuilder.setSource(sourceDetailsBuilder); + // Setup sink + SinkSpec.Builder sinkSpecBuilder = SinkSpec.newBuilder(); + sinkSpecBuilder.setClassName(sinkClassname); + if (sinkConfigs != null) { + sinkSpecBuilder.setConfigs(sinkConfigs); + } + if (sinkSerdeClassName != null) { + sinkSpecBuilder.setSerDeClassName(sinkSerdeClassName); + } + sinkSpecBuilder.setTopic(sinkTopic); + functionDetailsBuilder.setSink(sinkSpecBuilder); + FunctionDetails functionDetails = functionDetailsBuilder.build(); instanceConfig.setFunctionDetails(functionDetails); diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java index 3aae26f..72a5801 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java @@ -118,16 +118,7 @@ class ProcessRuntime implements Runtime { } else { args.add("false"); } - if (instanceConfig.getFunctionDetails().getSink().getTopic() != null - && !instanceConfig.getFunctionDetails().getSink().getTopic().isEmpty()) { - args.add("--output_topic"); - args.add(instanceConfig.getFunctionDetails().getSink().getTopic()); - } - if (instanceConfig.getFunctionDetails().getSink().getSerDeClassName() != null - && !instanceConfig.getFunctionDetails().getSink().getSerDeClassName().isEmpty()) { - args.add("--output_serde_classname"); - args.add(instanceConfig.getFunctionDetails().getSink().getSerDeClassName()); - } + args.add("--processing_guarantees"); args.add(String.valueOf(instanceConfig.getFunctionDetails().getProcessingGuarantees())); args.add("--pulsar_serviceurl"); @@ -143,6 +134,7 @@ class ProcessRuntime implements Runtime { args.add("--port"); args.add(String.valueOf(instancePort)); + // source related configs if (instanceConfig.getFunctionDetails().getRuntime() == Function.FunctionDetails.Runtime.JAVA) { if (!instanceConfig.getFunctionDetails().getSource().getClassName().isEmpty()) { args.add("--source_classname"); @@ -159,6 +151,29 @@ class ProcessRuntime implements Runtime { args.add("--source_topics_serde_classname"); args.add(new Gson().toJson(instanceConfig.getFunctionDetails().getSource().getTopicsToSerDeClassNameMap())); + + // sink related configs + if (instanceConfig.getFunctionDetails().getRuntime() == Function.FunctionDetails.Runtime.JAVA) { + if (!instanceConfig.getFunctionDetails().getSink().getClassName().isEmpty()) { + args.add("--sink_classname"); + args.add(instanceConfig.getFunctionDetails().getSink().getClassName()); + } + String sinkConfigs = instanceConfig.getFunctionDetails().getSink().getConfigs(); + if (sinkConfigs != null && !sinkConfigs.isEmpty()) { + args.add("--sink_configs"); + args.add(sinkConfigs); + } + } + if (instanceConfig.getFunctionDetails().getSink().getTopic() != null + && !instanceConfig.getFunctionDetails().getSink().getTopic().isEmpty()) { + args.add("--sink_topic"); + args.add(instanceConfig.getFunctionDetails().getSink().getTopic()); + } + if (instanceConfig.getFunctionDetails().getSink().getSerDeClassName() != null + && !instanceConfig.getFunctionDetails().getSink().getSerDeClassName().isEmpty()) { + args.add("--sink_serde_classname"); + args.add(instanceConfig.getFunctionDetails().getSink().getSerDeClassName()); + } return args; } diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java index d0c6b3c..b87e636 100644 --- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java +++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java @@ -84,6 +84,7 @@ public class ProcessRuntimeTest { functionDetailsBuilder.setSink(Function.SinkSpec.newBuilder() .setTopic(TEST_NAME + "-output") .setSerDeClassName("org.apache.pulsar.functions.runtime.serde.Utf8Serializer") + .setClassName("org.pulsar.pulsar.TestSink") .build()); functionDetailsBuilder.setLogTopic(TEST_NAME + "-log"); functionDetailsBuilder.setSource(Function.SourceSpec.newBuilder() @@ -111,7 +112,7 @@ public class ProcessRuntimeTest { ProcessRuntime container = factory.createContainer(config, userJarFile); List<String> args = container.getProcessArgs(); - assertEquals(args.size(), 45); + assertEquals(args.size(), 47); String expectedArgs = "java -cp " + javaInstanceJarFile + " -Dlog4j.configurationFile=java_instance_log4j2.yml " + "-Dpulsar.log.dir=" + logDirectory + "/functions" + " -Dpulsar.log.file=" + config.getFunctionDetails().getName() + " org.apache.pulsar.functions.runtime.JavaInstanceMain" @@ -123,14 +124,15 @@ public class ProcessRuntimeTest { + " --function_classname " + config.getFunctionDetails().getClassName() + " --log_topic " + config.getFunctionDetails().getLogTopic() + " --auto_ack false" - + " --output_topic " + config.getFunctionDetails().getSink().getTopic() - + " --output_serde_classname " + config.getFunctionDetails().getSink().getSerDeClassName() + " --processing_guarantees ATLEAST_ONCE" + " --pulsar_serviceurl " + pulsarServiceUrl - + " --max_buffered_tuples 1024 --port " + args.get(38) + + " --max_buffered_tuples 1024 --port " + args.get(34) + " --source_classname " + config.getFunctionDetails().getSource().getClassName() + " --source_subscription_type " + config.getFunctionDetails().getSource().getSubscriptionType().name() - + " --source_topics_serde_classname " + new Gson().toJson(topicsToSerDeClassName); + + " --source_topics_serde_classname " + new Gson().toJson(topicsToSerDeClassName) + + " --sink_classname " + config.getFunctionDetails().getSink().getClassName() + + " --sink_topic " + config.getFunctionDetails().getSink().getTopic() + + " --sink_serde_classname " + config.getFunctionDetails().getSink().getSerDeClassName(); assertEquals(expectedArgs, String.join(" ", args)); } @@ -151,13 +153,13 @@ public class ProcessRuntimeTest { + " --function_classname " + config.getFunctionDetails().getClassName() + " --log_topic " + config.getFunctionDetails().getLogTopic() + " --auto_ack false" - + " --output_topic " + config.getFunctionDetails().getSink().getTopic() - + " --output_serde_classname " + config.getFunctionDetails().getSink().getSerDeClassName() + " --processing_guarantees ATLEAST_ONCE" + " --pulsar_serviceurl " + pulsarServiceUrl - + " --max_buffered_tuples 1024 --port " + args.get(37) + + " --max_buffered_tuples 1024 --port " + args.get(33) + " --source_subscription_type " + config.getFunctionDetails().getSource().getSubscriptionType().name() - + " --source_topics_serde_classname " + new Gson().toJson(topicsToSerDeClassName); + + " --source_topics_serde_classname " + new Gson().toJson(topicsToSerDeClassName) + + " --sink_topic " + config.getFunctionDetails().getSink().getTopic() + + " --sink_serde_classname " + config.getFunctionDetails().getSink().getSerDeClassName(); assertEquals(expectedArgs, String.join(" ", args)); } -- To stop receiving notification emails like this one, please contact si...@apache.org.