Hi Schaffert, Ah, I think the problem comes from the confusion between `--inputs` and `--customSerdeInputs`. These two flags are different, `--input` accepts the list of topics which use DefaultSerDe, `--customSerdeInputs` accepts the list of topics with customized SerDe.
The total inputs of a function is the combination of `--input` and `--customSerdeInputs`. so if you are using custom serde, you just need `--customSerdeInputs`, you shouldn't specify the topic using custom serde in `--inputs`. so if you remove `--inputs test-topic`, your function should work. - Sijie On Tue, Jul 3, 2018 at 11:56 AM Schaffert, Lowell <lowell.schaff...@lmco.com> wrote: > Basically using a byte[] was my backup to deal with the problem: > > [apache-pulsar-2.0.1-incubating]$ ./bin/pulsar-admin functions create \ > > --jar > /hdd/lowell/workspace3/pulsarsubdemo/pulsar-function/target/pulsar-function-1.0-SNAPSHOT-jar-with-dependencies.jar > \ > > --className com.demo.pulsar.Function1a \ > > --tenant public \ > > --namespace default \ > > --name function1a \ > > --inputs test-topic \ > > --customSerdeInputs > {"test-topic":"com.demo.pulsar.datatypes.DataWrapperSerde"} \ > > --outputSerdeClassName com.demo.pulsar.datatypes.DataWrapperSerde \ > > --logTopic function-log-topic > java.lang.RuntimeException: Default Serializer does not support type class > com.demo.pulsar.datatypes.DataWrapper > at > org.apache.pulsar.admin.cli.CmdFunctions$FunctionDetailsCommand.lambda$doJavaSubmitChecks$2(CmdFunctions.java:423) > at java.util.Arrays$ArrayList.forEach(Arrays.java:3880) > at > org.apache.pulsar.admin.cli.CmdFunctions$FunctionDetailsCommand.doJavaSubmitChecks(CmdFunctions.java:421) > at > org.apache.pulsar.admin.cli.CmdFunctions$FunctionDetailsCommand.processArguments(CmdFunctions.java:284) > at > org.apache.pulsar.admin.cli.CmdFunctions$BaseCommand.run(CmdFunctions.java:104) > at org.apache.pulsar.admin.cli.CmdBase.run(CmdBase.java:61) > at > org.apache.pulsar.admin.cli.PulsarAdminTool.run(PulsarAdminTool.java:167) > at > org.apache.pulsar.admin.cli.PulsarAdminTool.main(PulsarAdminTool.java:217) > > public class Function1a implements Function<DataWrapper, Void> { > // where to publish to > private static final String TOPIC_NAME = "fun-str-topic"; > @Override > public Void process(DataWrapper dataWrapper, Context context) { > Logger LOGGER = context.getLogger(); > > LOGGER.info("Function1a received message"); > String message = dataWrapper.getPayload().getMessage(); > context.publish(TOPIC_NAME, "Function1a-" + message); > return null; > } > } > > The log entry: > > 09:50:02,590 INFO [public/default/Function1] [instance-0] > JavaInstanceRunnable - Starting Java Instance Function1 > 09:50:02,601 INFO [public/default/Function1] [instance-0] > JavaInstanceRunnable - Loading JAR files for function > InstanceConfig(instanceId=0, > functionId=46b384ce-bbbb-4afe-88d3-a7a31f625677, > functionVersion=4ed44d36-dd75-4d74-bca1-054f90e332a1, > functionDetails=tenant: "public" > namespace: "default" > name: "Function1" > className: "com.demo.pulsar.Function1" > logTopic: "log-topic" > autoAck: true > source { > topicsToSerDeClassName { > key: "test-topic" > value: "" > } > } > sink { > topic: "test-topic-Function1-output" > } > , maxBufferedTuples=1024) from jarFile > /hdd/lowell/workspace3/pulsarsubdemo/pulsar-function/target/pulsar-function-1.0-SNAPSHOT-jar-with-dependencies.jar > 09:50:02,614 INFO [public/default/Function1] [instance-0] > JavaInstanceRunnable - Initialize function class loader for function > Function1 at function cache manager > 09:50:02,626 INFO [public/default/Function1] [instance-0] > MessageProcessorBase - Starting producer for output topic > test-topic-Function1-output > 09:50:03,017 INFO [public/default/Function1] [instance-0] > MessageProcessorBase - Error occurred executing open for source: > java.lang.ClassNotFoundException: [B > at java.net.URLClassLoader.findClass(URLClassLoader.java:381) > ~[?:1.8.0_171] > at java.lang.ClassLoader.loadClass(ClassLoader.java:424) ~[?:1.8.0_171] > at java.lang.ClassLoader.loadClass(ClassLoader.java:357) ~[?:1.8.0_171] > at > org.apache.pulsar.functions.source.PulsarSource.setupSerde(PulsarSource.java:131) > ~[java-instance.jar:2.0.1-incubating] > at > org.apache.pulsar.functions.source.PulsarSource.open(PulsarSource.java:58) > ~[java-instance.jar:2.0.1-incubating] > at > org.apache.pulsar.functions.instance.processors.MessageProcessorBase.setupInput(MessageProcessorBase.java:103) > [java-instance.jar:2.0.1-incubating] > at > org.apache.pulsar.functions.instance.processors.AtLeastOnceProcessor.setupInput(AtLeastOnceProcessor.java:36) > [java-instance.jar:2.0.1-incubating] > at > org.apache.pulsar.functions.instance.JavaInstanceRunnable.setupJavaInstance(JavaInstanceRunnable.java:158) > [java-instance.jar:2.0.1-incubating] > at > org.apache.pulsar.functions.instance.JavaInstanceRunnable.run(JavaInstanceRunnable.java:171) > [java-instance.jar:2.0.1-incubating] > at java.lang.Thread.run(Thread.java:748) [?:1.8.0_171] > 09:50:03,054 ERROR [public/default/Function1] [instance-0] > JavaInstanceRunnable - Uncaught exception in Java Instance > java.lang.NullPointerException: null > at > org.apache.pulsar.functions.source.PulsarSource.read(PulsarSource.java:71) > ~[java-instance.jar:2.0.1-incubating] > at > org.apache.pulsar.functions.instance.processors.MessageProcessorBase.recieveMessage(MessageProcessorBase.java:112) > ~[java-instance.jar:2.0.1-incubating] > at > org.apache.pulsar.functions.instance.processors.AtLeastOnceProcessor.recieveMessage(AtLeastOnceProcessor.java:36) > ~[java-instance.jar:2.0.1-incubating] > at > org.apache.pulsar.functions.instance.JavaInstanceRunnable.run(JavaInstanceRunnable.java:174) > [java-instance.jar:2.0.1-incubating] > at java.lang.Thread.run(Thread.java:748) [?:1.8.0_171] > 09:50:03,055 WARN [public/default/Function1] [instance-0] > MessageProcessorBase - Failed to close source > org.apache.pulsar.functions.source.PulsarSource@13e23117 > java.lang.NullPointerException: null > at > org.apache.pulsar.functions.source.PulsarSource.close(PulsarSource.java:126) > ~[java-instance.jar:2.0.1-incubating] > at > org.apache.pulsar.functions.instance.processors.MessageProcessorBase.close(MessageProcessorBase.java:152) > [java-instance.jar:2.0.1-incubating] > at > org.apache.pulsar.functions.instance.processors.AtLeastOnceProcessor.close(AtLeastOnceProcessor.java:68) > [java-instance.jar:2.0.1-incubating] > at > org.apache.pulsar.functions.instance.JavaInstanceRunnable.close(JavaInstanceRunnable.java:346) > [java-instance.jar:2.0.1-incubating] > at > org.apache.pulsar.functions.instance.JavaInstanceRunnable.run(JavaInstanceRunnable.java:230) > [java-instance.jar:2.0.1-incubating] > at java.lang.Thread.run(Thread.java:748) [?:1.8.0_171] > > > Lowell Schaffert > > > > ________________________________ > From: Sijie Guo <guosi...@gmail.com> > Sent: Tuesday, July 3, 2018 12:18:00 PM > To: users@pulsar.incubator.apache.org > Subject: EXTERNAL: Re: Pulsar functions > > Can you check the logs in the broker? There should be logs under > `logs/functions/<tenant>/<namespace>/<function>`. > > I think the problem comes from using an array as the generic type. It will > fail on resolving array type when you are using 2.0 release. > There is a fix coming in 2.1 release to support array as the generic type. > https://github.com/apache/incubator-pulsar/pull/1837 > The 2.1 release would come out in 1~2 weeks or so. > > - Sijie > > On Tue, Jul 3, 2018 at 10:12 AM Schaffert, Lowell < > lowell.schaff...@lmco.com<mailto:lowell.schaff...@lmco.com>> wrote: > I have created two functions, can anyone explain why function 2 will not > run? > > public class Function1 implements Function<String, Void> { > // where to publish to > private static final String TOPIC_NAME = "fun-str-topic"; > @Override > public Void process(String input, Context context) { > Logger LOGGER = context.getLogger(); > > LOGGER.info("Function1 received message {}", input); > context.publish(TOPIC_NAME, "Function1-" + input); > return null; > } > } > > public class Function2 implements Function<byte[], Void> { > // where to publish to > private static final String TOPIC_NAME = "fun-str-topic"; > @Override > public Void process(byte[] bytes, Context context) { > Logger LOGGER = context.getLogger(); > LOGGER.error("Function2 received message"); > return null; > } > } > > [apache-pulsar-2.0.1-incubating]$ ./bin/pulsar-admin functions create \ > > --jar > /hdd/lowell/workspace3/pulsarsubdemo/pulsar-function/target/pulsar-function-1.0-SNAPSHOT-jar-with-dependencies.jar > \ > > --className com.demo.pulsar.Function1 \ > > --tenant public \ > > --namespace default \ > > --name function1 \ > > --inputs fun-test-topic \ > > --logTopic function-log-topic > "Created successfully" > > [apache-pulsar-2.0.1-incubating]$ ./bin/pulsar-admin functions create \ > > --jar > /hdd/lowell/workspace3/pulsarsubdemo/pulsar-function/target/pulsar-function-1.0-SNAPSHOT-jar-with-dependencies.jar > \ > > --className com.demo.pulsar.Function2 \ > > --tenant public \ > > --namespace default \ > > --name function2 \ > > --inputs bin-fun-test-topic \ > > --logTopic function-log-topic > "Created successfully" > > [apache-pulsar-2.0.1-incubating]$ ./bin/pulsar-admin functions getstatus \ > > --name function1 \ > > --namespace default \ > > --tenant public > { > "functionStatusList": [ > { > "running": true, > "instanceId": "0" > } > ] > } > > [apache-pulsar-2.0.1-incubating]$ ./bin/pulsar-admin functions getstatus > --name function2 --namespace default --tenant public > { > "functionStatusList": [ > { > "failureException": "UNAVAILABLE", > "instanceId": "0" > } > ] > } > > > Thanks, > > Lowell > > >