sijie closed pull request #2137: Added more Java and Python examples and made the examples match across languages URL: https://github.com/apache/incubator-pulsar/pull/2137
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/UserConfigFunction.java b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/ConfigBasedAppendFunction.java similarity index 68% rename from pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/UserConfigFunction.java rename to pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/ConfigBasedAppendFunction.java index fb3ceb01c7..1edd943c1f 100644 --- a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/UserConfigFunction.java +++ b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/ConfigBasedAppendFunction.java @@ -20,24 +20,22 @@ import org.apache.pulsar.functions.api.Context; import org.apache.pulsar.functions.api.Function; -import org.slf4j.Logger; import java.util.Optional; -public class UserConfigFunction implements Function<String, Void> { +/** + * Function that appends something to incoming input based on config supplied + */ +public class ConfigBasedAppendFunction implements Function<String, String> { @Override - public Void process(String input, Context context) { + public String process(String input, Context context) { String key = "config-key"; - Optional<Object> maybeValue = context.getUserConfigValue(key); - Logger LOG = context.getLogger(); + Optional<Object> appendValue = context.getUserConfigValue(key); - if (maybeValue.isPresent()) { - String value = (String) maybeValue.get(); - LOG.info("The config value is {}", value); + if (appendValue.isPresent()) { + return input + (String) appendValue.get(); } else { - LOG.error("No value present for the key {}", key); + return input + "!"; } - - return null; } } diff --git a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/CustomObjectFunction.java b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/CustomObjectFunction.java new file mode 100644 index 0000000000..d2db0a69b8 --- /dev/null +++ b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/CustomObjectFunction.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.functions.api.examples; + +import org.apache.pulsar.functions.api.Context; +import org.apache.pulsar.functions.api.Function; +import org.apache.pulsar.functions.api.examples.serde.CustomObject; + +/** + * Function that deals with custom objects + */ +public class CustomObjectFunction implements Function<CustomObject, CustomObject> { + + @Override + public CustomObject process(CustomObject input, Context context) { + return new CustomObject(input.getValue() + 100); + } +} diff --git a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/ExclamationFunction.java b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/ExclamationFunction.java index 097c12b5b7..7d4d2ec79a 100644 --- a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/ExclamationFunction.java +++ b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/ExclamationFunction.java @@ -21,6 +21,10 @@ import org.apache.pulsar.functions.api.Context; import org.apache.pulsar.functions.api.Function; +/** + * The classic Exclamation Function that appends an exclamation at the end + * of the input + */ public class ExclamationFunction implements Function<String, String> { @Override public String process(String input, Context context) { diff --git a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/LoggingFunction.java b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/LoggingFunction.java index e8e12aea73..4b73678db3 100644 --- a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/LoggingFunction.java +++ b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/LoggingFunction.java @@ -18,28 +18,20 @@ */ package org.apache.pulsar.functions.api.examples; -import java.util.concurrent.atomic.AtomicInteger; - import org.apache.pulsar.functions.api.Context; import org.apache.pulsar.functions.api.Function; -import org.slf4j.Logger; /** - * A function with logging example. + * A function that demonstrates how to redirect logging to a topic. + * In this particular example, for every input string, the function + * does some logging. If --logTopic topic is specified, these log statements + * end up in that specified pulsar topic */ public class LoggingFunction implements Function<String, String> { - private final AtomicInteger counter = new AtomicInteger(0); - @Override public String process(String input, Context context) { - Logger LOG = context.getLogger(); - - int counterLocal = counter.incrementAndGet(); - if ((counterLocal & Integer.MAX_VALUE) % 100000 == 0) { - LOG.info("Handled {} messages", counterLocal); - } - + context.getLogger().info(input + "-log"); return String.format("%s!", input); } diff --git a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/PublishFunction.java b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/PublishFunction.java index e97b692f9d..02d461636c 100644 --- a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/PublishFunction.java +++ b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/PublishFunction.java @@ -22,12 +22,16 @@ import org.apache.pulsar.functions.api.Function; import org.apache.pulsar.functions.api.utils.DefaultSerDe; +/** + * Example function that uses the built in publish function in the context + * to publish to a desired topic based on config + */ public class PublishFunction implements Function<String, Void> { @Override public Void process(String input, Context context) { - String publishTopic = (String) context.getUserConfigValueOrDefault("publish-topic", "persistent://sample/standalone/ns1/publish"); + String publishTopic = (String) context.getUserConfigValueOrDefault("publish-topic", "publishtopic"); String output = String.format("%s!", input); - context.publish(publishTopic, output, DefaultSerDe.class.getName()); + context.publish(publishTopic, output); return null; } } diff --git a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/UserMetricFunction.java b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/UserMetricFunction.java index e23d37f9af..c81cadbfa2 100644 --- a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/UserMetricFunction.java +++ b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/UserMetricFunction.java @@ -21,10 +21,14 @@ import org.apache.pulsar.functions.api.Context; import org.apache.pulsar.functions.api.Function; +/** + * Example function that wants to keep track of the rate of letters + * seen in input. + */ public class UserMetricFunction implements Function<String, Void> { @Override public Void process(String input, Context context) { - context.recordMetric("MyMetricName", 1); + context.recordMetric("LetterCount", input.length()); return null; } } diff --git a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/VoidFunction.java b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/VoidFunction.java index abcc663320..4f7f5af98d 100644 --- a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/VoidFunction.java +++ b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/VoidFunction.java @@ -21,6 +21,9 @@ import org.apache.pulsar.functions.api.Context; import org.apache.pulsar.functions.api.Function; +/** + * Example function that does not return any value + */ public class VoidFunction implements Function<String, Void> { @Override public Void process(String input, Context context) { diff --git a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/WindowFunction.java b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/WindowFunction.java index 189b2ca103..ae01cecad4 100644 --- a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/WindowFunction.java +++ b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/WindowFunction.java @@ -21,20 +21,15 @@ import lombok.extern.slf4j.Slf4j; import java.util.Collection; -import java.util.function.BinaryOperator; import java.util.function.Function; +/** + * Example Function that acts on a window of tuples at a time rather than per tuple basis. + */ @Slf4j public class WindowFunction implements Function <Collection<Integer>, Integer> { @Override public Integer apply(Collection<Integer> integers) { - - int sum = integers.stream().reduce(new BinaryOperator<Integer>() { - @Override - public Integer apply(Integer integer, Integer integer2) { - return integer + integer2; - } - }).get(); - return sum; + return integers.stream().reduce(0, (x, y) -> x + y); } } diff --git a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/CounterFunction.java b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/WordCountFunction.java similarity index 78% rename from pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/CounterFunction.java rename to pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/WordCountFunction.java index 9bc25db923..d9e6747ab9 100644 --- a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/CounterFunction.java +++ b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/WordCountFunction.java @@ -23,11 +23,16 @@ import java.util.Arrays; -public class CounterFunction implements Function<String, Void> { +/** + * The classic word count example done using pulsar functions + * Each input message is a sentence that split into words and each word counted. + * The built in counter state is used to keep track of the word count in a + * persistent and consistent manner. + */ +public class WordCountFunction implements Function<String, Void> { @Override public Void process(String input, Context context) throws Exception { Arrays.asList(input.split("\\.")).forEach(word -> context.incrCounter(word, 1)); - return null; } } diff --git a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/serde/CustomObject.java b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/serde/CustomObject.java new file mode 100644 index 0000000000..777bbdcb4b --- /dev/null +++ b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/serde/CustomObject.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.functions.api.examples.serde; + +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.Setter; + +/** + * This class simulates a user defined POJO + */ +@Getter +@Setter +@AllArgsConstructor +public class CustomObject { + private long value; +} diff --git a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/serde/CustomObjectSerde.java b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/serde/CustomObjectSerde.java new file mode 100644 index 0000000000..fd5bcdccf6 --- /dev/null +++ b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/serde/CustomObjectSerde.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.functions.api.examples.serde; + +import org.apache.pulsar.functions.api.SerDe; + +import java.nio.ByteBuffer; + +/** + * This class takes care of serializing/deserializing CustomObject + */ +public class CustomObjectSerde implements SerDe<CustomObject> { + @Override + public CustomObject deserialize(byte[] bytes) { + ByteBuffer buffer = ByteBuffer.wrap(bytes); + return new CustomObject(buffer.getLong()); + } + + @Override + public byte[] serialize(CustomObject object) { + ByteBuffer buffer = ByteBuffer.allocate(8); + buffer.putLong(object.getValue()); + return buffer.array(); + } +} diff --git a/pulsar-functions/java-examples/src/main/resources/example-stateful-function-config.yaml b/pulsar-functions/java-examples/src/main/resources/example-stateful-function-config.yaml index 4c758c9ddd..605b6b0dcb 100644 --- a/pulsar-functions/java-examples/src/main/resources/example-stateful-function-config.yaml +++ b/pulsar-functions/java-examples/src/main/resources/example-stateful-function-config.yaml @@ -20,7 +20,7 @@ tenant: "test" namespace: "test-namespace" name: "stateful-example" -className: "org.apache.pulsar.functions.api.examples.CounterFunction" +className: "org.apache.pulsar.functions.api.examples.WordCountFunction" inputs: ["test_stateful_src"] userConfig: "PublishTopic": "test_stateful_result" diff --git a/pulsar-functions/python-examples/config_based_append_function.py b/pulsar-functions/python-examples/config_based_append_function.py new file mode 100755 index 0000000000..34597368de --- /dev/null +++ b/pulsar-functions/python-examples/config_based_append_function.py @@ -0,0 +1,34 @@ +#!/usr/bin/env python +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + + +from pulsar import Function + +# Function that appends something to incoming input based on config supplied +class ConfigBasedAppendFunction(Function): + def __init__(self): + pass + + def process(self, input, context): + key = "config-key" + append_value = "!" + if key in context.get_user_config_map: + append_value = context.get_user_config_value(append_value) + return input + append_value diff --git a/pulsar-functions/python-examples/custom_object_function.py b/pulsar-functions/python-examples/custom_object_function.py new file mode 100755 index 0000000000..53ba597445 --- /dev/null +++ b/pulsar-functions/python-examples/custom_object_function.py @@ -0,0 +1,52 @@ +#!/usr/bin/env python +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + + +from pulsar import Function, SerDe + +class MyObject(object): + def __init__(self): + self.a = 0 + self.b = 0 + +class CustomSerDe(SerDe): + def __init__(self): + pass + + def serialize(self, object): + return "%d,%d" % (object.a, object.b) + + def deserialize(self, input_bytes): + split = str(input_bytes).split(',') + retval = MyObject() + retval.a = int(split[0]) + retval.b = int(split[1]) + return retval + +# Function that deals with custom objects +class CustomObjectFunction(Function): + def __init__(self): + pass + + def process(self, input, context): + retval = MyObject() + retval.a = input.a + 11 + retval.b = input.b + 24 + return retval \ No newline at end of file diff --git a/pulsar-functions/python-examples/logfunction.py b/pulsar-functions/python-examples/exclamation_function.py similarity index 88% rename from pulsar-functions/python-examples/logfunction.py rename to pulsar-functions/python-examples/exclamation_function.py index 504cdc5215..5df5de55b5 100755 --- a/pulsar-functions/python-examples/logfunction.py +++ b/pulsar-functions/python-examples/exclamation_function.py @@ -21,10 +21,11 @@ from pulsar import Function -class LogFunction(Function): +# The classic ExclamationFunction that appends an exclamation at the end +# of the input +class ExclamationFunction(Function): def __init__(self): pass def process(self, input, context): - context.get_logger().info(input) return input + '!' diff --git a/pulsar-functions/python-examples/logging_function.py b/pulsar-functions/python-examples/logging_function.py new file mode 100755 index 0000000000..54f247389e --- /dev/null +++ b/pulsar-functions/python-examples/logging_function.py @@ -0,0 +1,34 @@ +#!/usr/bin/env python +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + + +from pulsar import Function + +# A function that demonstrates how to redirect logging to a topic. +# In this particular example, for every input string, the function +# does some logging. If --logTopic topic is specified, these log +# statements end up in that specified pulsar topic +class LoggingFunction(Function): + def __init__(self): + pass + + def process(self, input, context): + context.get_logger().info(input + '-log') + return input + '!' diff --git a/pulsar-functions/python-examples/pure_python_function_exclamation.py b/pulsar-functions/python-examples/native_exclamation_function.py similarity index 100% rename from pulsar-functions/python-examples/pure_python_function_exclamation.py rename to pulsar-functions/python-examples/native_exclamation_function.py diff --git a/pulsar-functions/python-examples/publish_function.py b/pulsar-functions/python-examples/publish_function.py new file mode 100755 index 0000000000..c0c3c91f6a --- /dev/null +++ b/pulsar-functions/python-examples/publish_function.py @@ -0,0 +1,35 @@ +#!/usr/bin/env python +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + + +from pulsar import Function + +# Example function that uses the built in publish function in the context +# to publish to a desired topic based on config +class PublishFunction(Function): + def __init__(self): + pass + + def process(self, input, context): + publish_topic = "publishtopic" + if "publish-topic" in context.get_user_config_map: + publish_topic = context.get_user_config_value("publish-topic") + context.publish(publish_topic, input + '!') + return diff --git a/pulsar-functions/python-examples/exclamation.py b/pulsar-functions/python-examples/void_function.py similarity index 90% rename from pulsar-functions/python-examples/exclamation.py rename to pulsar-functions/python-examples/void_function.py index d8a14f76f5..afa44df820 100755 --- a/pulsar-functions/python-examples/exclamation.py +++ b/pulsar-functions/python-examples/void_function.py @@ -21,9 +21,10 @@ from pulsar import Function -class Exclamation(Function): +# Example function that does not return any value +class VoidFunction(Function): def __init__(self): pass def process(self, input, context): - return input + '!' + return diff --git a/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java index af767d9b22..180e7f9375 100644 --- a/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java +++ b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java @@ -36,9 +36,9 @@ "org.apache.pulsar.functions.api.examples.ExclamationFunction"; public static final String EXCLAMATION_PYTHON_CLASS = - "exclamation.Exclamation"; + "exclamation.ExclamationFunction"; - public static final String EXCLAMATION_PYTHON_FILE = "exclamation.py"; + public static final String EXCLAMATION_PYTHON_FILE = "exclamation_function.py"; protected static String getExclamationClass(Runtime runtime) { if (Runtime.JAVA == runtime) { ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services