That makes sense, I will give it a try. Lowell
________________________________ From: Sijie Guo <guosi...@gmail.com> Sent: Tuesday, July 3, 2018 2:33 PM To: Schaffert, Lowell (US); users@pulsar.incubator.apache.org Subject: Re: EXTERNAL: Re: Pulsar functions Hi Schaffert, Ah, I think the problem comes from the confusion between `--inputs` and `--customSerdeInputs`. These two flags are different, `--input` accepts the list of topics which use DefaultSerDe, `--customSerdeInputs` accepts the list of topics with customized SerDe. The total inputs of a function is the combination of `--input` and `--customSerdeInputs`. so if you are using custom serde, you just need `--customSerdeInputs`, you shouldn't specify the topic using custom serde in `--inputs`. so if you remove `--inputs test-topic`, your function should work. - Sijie On Tue, Jul 3, 2018 at 11:56 AM Schaffert, Lowell <lowell.schaff...@lmco.com<mailto:lowell.schaff...@lmco.com>> wrote: Basically using a byte[] was my backup to deal with the problem: [apache-pulsar-2.0.1-incubating]$ ./bin/pulsar-admin functions create \ > --jar > /hdd/lowell/workspace3/pulsarsubdemo/pulsar-function/target/pulsar-function-1.0-SNAPSHOT-jar-with-dependencies.jar > \ > --className com.demo.pulsar.Function1a \ > --tenant public \ > --namespace default \ > --name function1a \ > --inputs test-topic \ > --customSerdeInputs > {"test-topic":"com.demo.pulsar.datatypes.DataWrapperSerde"} \ > --outputSerdeClassName com.demo.pulsar.datatypes.DataWrapperSerde \ > --logTopic function-log-topic java.lang.RuntimeException: Default Serializer does not support type class com.demo.pulsar.datatypes.DataWrapper at org.apache.pulsar.admin.cli.CmdFunctions$FunctionDetailsCommand.lambda$doJavaSubmitChecks$2(CmdFunctions.java:423) at java.util.Arrays$ArrayList.forEach(Arrays.java:3880) at org.apache.pulsar.admin.cli.CmdFunctions$FunctionDetailsCommand.doJavaSubmitChecks(CmdFunctions.java:421) at org.apache.pulsar.admin.cli.CmdFunctions$FunctionDetailsCommand.processArguments(CmdFunctions.java:284) at org.apache.pulsar.admin.cli.CmdFunctions$BaseCommand.run(CmdFunctions.java:104) at org.apache.pulsar.admin.cli.CmdBase.run(CmdBase.java:61) at org.apache.pulsar.admin.cli.PulsarAdminTool.run(PulsarAdminTool.java:167) at org.apache.pulsar.admin.cli.PulsarAdminTool.main(PulsarAdminTool.java:217) public class Function1a implements Function<DataWrapper, Void> { // where to publish to private static final String TOPIC_NAME = "fun-str-topic"; @Override public Void process(DataWrapper dataWrapper, Context context) { Logger LOGGER = context.getLogger(); LOGGER.info("Function1a received message"); String message = dataWrapper.getPayload().getMessage(); context.publish(TOPIC_NAME, "Function1a-" + message); return null; } } The log entry: 09:50:02,590 INFO [public/default/Function1] [instance-0] JavaInstanceRunnable - Starting Java Instance Function1 09:50:02,601 INFO [public/default/Function1] [instance-0] JavaInstanceRunnable - Loading JAR files for function InstanceConfig(instanceId=0, functionId=46b384ce-bbbb-4afe-88d3-a7a31f625677, functionVersion=4ed44d36-dd75-4d74-bca1-054f90e332a1, functionDetails=tenant: "public" namespace: "default" name: "Function1" className: "com.demo.pulsar.Function1" logTopic: "log-topic" autoAck: true source { topicsToSerDeClassName { key: "test-topic" value: "" } } sink { topic: "test-topic-Function1-output" } , maxBufferedTuples=1024) from jarFile /hdd/lowell/workspace3/pulsarsubdemo/pulsar-function/target/pulsar-function-1.0-SNAPSHOT-jar-with-dependencies.jar 09:50:02,614 INFO [public/default/Function1] [instance-0] JavaInstanceRunnable - Initialize function class loader for function Function1 at function cache manager 09:50:02,626 INFO [public/default/Function1] [instance-0] MessageProcessorBase - Starting producer for output topic test-topic-Function1-output 09:50:03,017 INFO [public/default/Function1] [instance-0] MessageProcessorBase - Error occurred executing open for source: java.lang.ClassNotFoundException: [B at java.net.URLClassLoader.findClass(URLClassLoader.java:381) ~[?:1.8.0_171] at java.lang.ClassLoader.loadClass(ClassLoader.java:424) ~[?:1.8.0_171] at java.lang.ClassLoader.loadClass(ClassLoader.java:357) ~[?:1.8.0_171] at org.apache.pulsar.functions.source.PulsarSource.setupSerde(PulsarSource.java:131) ~[java-instance.jar:2.0.1-incubating] at org.apache.pulsar.functions.source.PulsarSource.open(PulsarSource.java:58) ~[java-instance.jar:2.0.1-incubating] at org.apache.pulsar.functions.instance.processors.MessageProcessorBase.setupInput(MessageProcessorBase.java:103) [java-instance.jar:2.0.1-incubating] at org.apache.pulsar.functions.instance.processors.AtLeastOnceProcessor.setupInput(AtLeastOnceProcessor.java:36) [java-instance.jar:2.0.1-incubating] at org.apache.pulsar.functions.instance.JavaInstanceRunnable.setupJavaInstance(JavaInstanceRunnable.java:158) [java-instance.jar:2.0.1-incubating] at org.apache.pulsar.functions.instance.JavaInstanceRunnable.run(JavaInstanceRunnable.java:171) [java-instance.jar:2.0.1-incubating] at java.lang.Thread.run(Thread.java:748) [?:1.8.0_171] 09:50:03,054 ERROR [public/default/Function1] [instance-0] JavaInstanceRunnable - Uncaught exception in Java Instance java.lang.NullPointerException: null at org.apache.pulsar.functions.source.PulsarSource.read(PulsarSource.java:71) ~[java-instance.jar:2.0.1-incubating] at org.apache.pulsar.functions.instance.processors.MessageProcessorBase.recieveMessage(MessageProcessorBase.java:112) ~[java-instance.jar:2.0.1-incubating] at org.apache.pulsar.functions.instance.processors.AtLeastOnceProcessor.recieveMessage(AtLeastOnceProcessor.java:36) ~[java-instance.jar:2.0.1-incubating] at org.apache.pulsar.functions.instance.JavaInstanceRunnable.run(JavaInstanceRunnable.java:174) [java-instance.jar:2.0.1-incubating] at java.lang.Thread.run(Thread.java:748) [?:1.8.0_171] 09:50:03,055 WARN [public/default/Function1] [instance-0] MessageProcessorBase - Failed to close source org.apache.pulsar.functions.source.PulsarSource@13e23117 java.lang.NullPointerException: null at org.apache.pulsar.functions.source.PulsarSource.close(PulsarSource.java:126) ~[java-instance.jar:2.0.1-incubating] at org.apache.pulsar.functions.instance.processors.MessageProcessorBase.close(MessageProcessorBase.java:152) [java-instance.jar:2.0.1-incubating] at org.apache.pulsar.functions.instance.processors.AtLeastOnceProcessor.close(AtLeastOnceProcessor.java:68) [java-instance.jar:2.0.1-incubating] at org.apache.pulsar.functions.instance.JavaInstanceRunnable.close(JavaInstanceRunnable.java:346) [java-instance.jar:2.0.1-incubating] at org.apache.pulsar.functions.instance.JavaInstanceRunnable.run(JavaInstanceRunnable.java:230) [java-instance.jar:2.0.1-incubating] at java.lang.Thread.run(Thread.java:748) [?:1.8.0_171] Lowell Schaffert ________________________________ From: Sijie Guo <guosi...@gmail.com<mailto:guosi...@gmail.com>> Sent: Tuesday, July 3, 2018 12:18:00 PM To: users@pulsar.incubator.apache.org<mailto:users@pulsar.incubator.apache.org> Subject: EXTERNAL: Re: Pulsar functions Can you check the logs in the broker? There should be logs under `logs/functions/<tenant>/<namespace>/<function>`. I think the problem comes from using an array as the generic type. It will fail on resolving array type when you are using 2.0 release. There is a fix coming in 2.1 release to support array as the generic type. https://github.com/apache/incubator-pulsar/pull/1837 The 2.1 release would come out in 1~2 weeks or so. - Sijie On Tue, Jul 3, 2018 at 10:12 AM Schaffert, Lowell <lowell.schaff...@lmco.com<mailto:lowell.schaff...@lmco.com><mailto:lowell.schaff...@lmco.com<mailto:lowell.schaff...@lmco.com>>> wrote: I have created two functions, can anyone explain why function 2 will not run? public class Function1 implements Function<String, Void> { // where to publish to private static final String TOPIC_NAME = "fun-str-topic"; @Override public Void process(String input, Context context) { Logger LOGGER = context.getLogger(); LOGGER.info("Function1 received message {}", input); context.publish(TOPIC_NAME, "Function1-" + input); return null; } } public class Function2 implements Function<byte[], Void> { // where to publish to private static final String TOPIC_NAME = "fun-str-topic"; @Override public Void process(byte[] bytes, Context context) { Logger LOGGER = context.getLogger(); LOGGER.error("Function2 received message"); return null; } } [apache-pulsar-2.0.1-incubating]$ ./bin/pulsar-admin functions create \ > --jar > /hdd/lowell/workspace3/pulsarsubdemo/pulsar-function/target/pulsar-function-1.0-SNAPSHOT-jar-with-dependencies.jar > \ > --className com.demo.pulsar.Function1 \ > --tenant public \ > --namespace default \ > --name function1 \ > --inputs fun-test-topic \ > --logTopic function-log-topic "Created successfully" [apache-pulsar-2.0.1-incubating]$ ./bin/pulsar-admin functions create \ > --jar > /hdd/lowell/workspace3/pulsarsubdemo/pulsar-function/target/pulsar-function-1.0-SNAPSHOT-jar-with-dependencies.jar > \ > --className com.demo.pulsar.Function2 \ > --tenant public \ > --namespace default \ > --name function2 \ > --inputs bin-fun-test-topic \ > --logTopic function-log-topic "Created successfully" [apache-pulsar-2.0.1-incubating]$ ./bin/pulsar-admin functions getstatus \ > --name function1 \ > --namespace default \ > --tenant public { "functionStatusList": [ { "running": true, "instanceId": "0" } ] } [apache-pulsar-2.0.1-incubating]$ ./bin/pulsar-admin functions getstatus --name function2 --namespace default --tenant public { "functionStatusList": [ { "failureException": "UNAVAILABLE", "instanceId": "0" } ] } Thanks, Lowell