Hi Schaffert,

Ah, I think the problem comes from confusing `--inputs` with
`--customSerdeInputs`. The two flags are different: `--inputs` accepts the
list of topics that use DefaultSerDe, while `--customSerdeInputs` accepts
the list of topics that use a customized SerDe.

The total inputs of a function are the combination of `--inputs` and
`--customSerdeInputs`. So if a topic uses a custom SerDe, specify it only
in `--customSerdeInputs`; don't also list it in `--inputs`. Your command
lists `test-topic` in both, which is presumably why the default serializer
check rejects `DataWrapper`. If you remove `--inputs test-topic`, your
function should work.
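
For reference, here is a sketch of your create command with
`--inputs test-topic` dropped. All flags and values are copied from your
mail. The single quotes around the JSON are my own addition, on the
assumption that you run this from a POSIX shell, which would otherwise
strip the double quotes before pulsar-admin sees them. I haven't run this
against your setup.

```shell
# Same command as in your mail, minus `--inputs test-topic`.
# test-topic is now declared only once, with its custom SerDe.
# The JSON is single-quoted (my addition) so the shell passes the
# double quotes through unchanged.
./bin/pulsar-admin functions create \
  --jar /hdd/lowell/workspace3/pulsarsubdemo/pulsar-function/target/pulsar-function-1.0-SNAPSHOT-jar-with-dependencies.jar \
  --className com.demo.pulsar.Function1a \
  --tenant public \
  --namespace default \
  --name function1a \
  --customSerdeInputs '{"test-topic":"com.demo.pulsar.datatypes.DataWrapperSerde"}' \
  --outputSerdeClassName com.demo.pulsar.datatypes.DataWrapperSerde \
  --logTopic function-log-topic
```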

- Sijie

On Tue, Jul 3, 2018 at 11:56 AM Schaffert, Lowell <lowell.schaff...@lmco.com>
wrote:

> Basically using a byte[] was my backup to deal with the problem:
>
> [apache-pulsar-2.0.1-incubating]$ ./bin/pulsar-admin functions create \
> > --jar
> /hdd/lowell/workspace3/pulsarsubdemo/pulsar-function/target/pulsar-function-1.0-SNAPSHOT-jar-with-dependencies.jar
> \
> > --className com.demo.pulsar.Function1a \
> > --tenant public \
> > --namespace default \
> > --name function1a \
> > --inputs test-topic \
> > --customSerdeInputs
> {"test-topic":"com.demo.pulsar.datatypes.DataWrapperSerde"} \
> > --outputSerdeClassName com.demo.pulsar.datatypes.DataWrapperSerde \
> > --logTopic function-log-topic
> java.lang.RuntimeException: Default Serializer does not support type class
> com.demo.pulsar.datatypes.DataWrapper
>     at
> org.apache.pulsar.admin.cli.CmdFunctions$FunctionDetailsCommand.lambda$doJavaSubmitChecks$2(CmdFunctions.java:423)
>     at java.util.Arrays$ArrayList.forEach(Arrays.java:3880)
>     at
> org.apache.pulsar.admin.cli.CmdFunctions$FunctionDetailsCommand.doJavaSubmitChecks(CmdFunctions.java:421)
>     at
> org.apache.pulsar.admin.cli.CmdFunctions$FunctionDetailsCommand.processArguments(CmdFunctions.java:284)
>     at
> org.apache.pulsar.admin.cli.CmdFunctions$BaseCommand.run(CmdFunctions.java:104)
>     at org.apache.pulsar.admin.cli.CmdBase.run(CmdBase.java:61)
>     at
> org.apache.pulsar.admin.cli.PulsarAdminTool.run(PulsarAdminTool.java:167)
>     at
> org.apache.pulsar.admin.cli.PulsarAdminTool.main(PulsarAdminTool.java:217)
>
> public class Function1a implements Function<DataWrapper, Void> {
>   // where to publish to
>   private static final String TOPIC_NAME = "fun-str-topic";
>   @Override
>   public Void process(DataWrapper dataWrapper, Context context)  {
>     Logger LOGGER = context.getLogger();
>
>     LOGGER.info("Function1a received message");
>     String message = dataWrapper.getPayload().getMessage();
>     context.publish(TOPIC_NAME, "Function1a-" + message);
>     return null;
>   }
> }
>
> The log entry:
>
> 09:50:02,590 INFO [public/default/Function1] [instance-0]
> JavaInstanceRunnable - Starting Java Instance Function1
> 09:50:02,601 INFO [public/default/Function1] [instance-0]
> JavaInstanceRunnable - Loading JAR files for function
> InstanceConfig(instanceId=0,
> functionId=46b384ce-bbbb-4afe-88d3-a7a31f625677,
> functionVersion=4ed44d36-dd75-4d74-bca1-054f90e332a1,
> functionDetails=tenant: "public"
> namespace: "default"
> name: "Function1"
> className: "com.demo.pulsar.Function1"
> logTopic: "log-topic"
> autoAck: true
> source {
>   topicsToSerDeClassName {
>     key: "test-topic"
>     value: ""
>   }
> }
> sink {
>   topic: "test-topic-Function1-output"
> }
> , maxBufferedTuples=1024) from jarFile
> /hdd/lowell/workspace3/pulsarsubdemo/pulsar-function/target/pulsar-function-1.0-SNAPSHOT-jar-with-dependencies.jar
> 09:50:02,614 INFO [public/default/Function1] [instance-0]
> JavaInstanceRunnable - Initialize function class loader for function
> Function1 at function cache manager
> 09:50:02,626 INFO [public/default/Function1] [instance-0]
> MessageProcessorBase - Starting producer for output topic
> test-topic-Function1-output
> 09:50:03,017 INFO [public/default/Function1] [instance-0]
> MessageProcessorBase - Error occurred executing open for source:
> java.lang.ClassNotFoundException: [B
>     at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
> ~[?:1.8.0_171]
>     at java.lang.ClassLoader.loadClass(ClassLoader.java:424) ~[?:1.8.0_171]
>     at java.lang.ClassLoader.loadClass(ClassLoader.java:357) ~[?:1.8.0_171]
>     at
> org.apache.pulsar.functions.source.PulsarSource.setupSerde(PulsarSource.java:131)
> ~[java-instance.jar:2.0.1-incubating]
>     at
> org.apache.pulsar.functions.source.PulsarSource.open(PulsarSource.java:58)
> ~[java-instance.jar:2.0.1-incubating]
>     at
> org.apache.pulsar.functions.instance.processors.MessageProcessorBase.setupInput(MessageProcessorBase.java:103)
> [java-instance.jar:2.0.1-incubating]
>     at
> org.apache.pulsar.functions.instance.processors.AtLeastOnceProcessor.setupInput(AtLeastOnceProcessor.java:36)
> [java-instance.jar:2.0.1-incubating]
>     at
> org.apache.pulsar.functions.instance.JavaInstanceRunnable.setupJavaInstance(JavaInstanceRunnable.java:158)
> [java-instance.jar:2.0.1-incubating]
>     at
> org.apache.pulsar.functions.instance.JavaInstanceRunnable.run(JavaInstanceRunnable.java:171)
> [java-instance.jar:2.0.1-incubating]
>     at java.lang.Thread.run(Thread.java:748) [?:1.8.0_171]
> 09:50:03,054 ERROR [public/default/Function1] [instance-0]
> JavaInstanceRunnable - Uncaught exception in Java Instance
> java.lang.NullPointerException: null
>     at
> org.apache.pulsar.functions.source.PulsarSource.read(PulsarSource.java:71)
> ~[java-instance.jar:2.0.1-incubating]
>     at
> org.apache.pulsar.functions.instance.processors.MessageProcessorBase.recieveMessage(MessageProcessorBase.java:112)
> ~[java-instance.jar:2.0.1-incubating]
>     at
> org.apache.pulsar.functions.instance.processors.AtLeastOnceProcessor.recieveMessage(AtLeastOnceProcessor.java:36)
> ~[java-instance.jar:2.0.1-incubating]
>     at
> org.apache.pulsar.functions.instance.JavaInstanceRunnable.run(JavaInstanceRunnable.java:174)
> [java-instance.jar:2.0.1-incubating]
>     at java.lang.Thread.run(Thread.java:748) [?:1.8.0_171]
> 09:50:03,055 WARN [public/default/Function1] [instance-0]
> MessageProcessorBase - Failed to close source
> org.apache.pulsar.functions.source.PulsarSource@13e23117
> java.lang.NullPointerException: null
>     at
> org.apache.pulsar.functions.source.PulsarSource.close(PulsarSource.java:126)
> ~[java-instance.jar:2.0.1-incubating]
>     at
> org.apache.pulsar.functions.instance.processors.MessageProcessorBase.close(MessageProcessorBase.java:152)
> [java-instance.jar:2.0.1-incubating]
>     at
> org.apache.pulsar.functions.instance.processors.AtLeastOnceProcessor.close(AtLeastOnceProcessor.java:68)
> [java-instance.jar:2.0.1-incubating]
>     at
> org.apache.pulsar.functions.instance.JavaInstanceRunnable.close(JavaInstanceRunnable.java:346)
> [java-instance.jar:2.0.1-incubating]
>     at
> org.apache.pulsar.functions.instance.JavaInstanceRunnable.run(JavaInstanceRunnable.java:230)
> [java-instance.jar:2.0.1-incubating]
>     at java.lang.Thread.run(Thread.java:748) [?:1.8.0_171]
>
>
> Lowell Schaffert
>
>
>
> ________________________________
> From: Sijie Guo <guosi...@gmail.com>
> Sent: Tuesday, July 3, 2018 12:18:00 PM
> To: users@pulsar.incubator.apache.org
> Subject: EXTERNAL: Re: Pulsar functions
>
> Can you check the logs in the broker? There should be logs under
> `logs/functions/<tenant>/<namespace>/<function>`.
>
> I think the problem comes from using an array as the generic type. The
> 2.0 release fails to resolve array types. A fix that supports arrays as
> the generic type is coming in the 2.1 release:
> https://github.com/apache/incubator-pulsar/pull/1837
> The 2.1 release should be out in one to two weeks or so.
>
> - Sijie
>
> On Tue, Jul 3, 2018 at 10:12 AM Schaffert, Lowell
> <lowell.schaff...@lmco.com> wrote:
>
> I have created two functions. Can anyone explain why function 2 will not
> run?
>
> public class Function1 implements Function<String, Void> {
>   // where to publish to
>   private static final String TOPIC_NAME = "fun-str-topic";
>   @Override
>   public Void process(String input, Context context) {
>     Logger LOGGER = context.getLogger();
>
>     LOGGER.info("Function1 received message {}", input);
>     context.publish(TOPIC_NAME, "Function1-" + input);
>     return null;
>   }
> }
>
> public class Function2 implements Function<byte[], Void> {
>   // where to publish to
>   private static final String TOPIC_NAME = "fun-str-topic";
>   @Override
>   public Void process(byte[] bytes, Context context) {
>     Logger LOGGER = context.getLogger();
>     LOGGER.error("Function2 received message");
>     return null;
>   }
> }
>
> [apache-pulsar-2.0.1-incubating]$ ./bin/pulsar-admin functions create \
> > --jar
> /hdd/lowell/workspace3/pulsarsubdemo/pulsar-function/target/pulsar-function-1.0-SNAPSHOT-jar-with-dependencies.jar
> \
> > --className com.demo.pulsar.Function1 \
> > --tenant public \
> > --namespace default \
> > --name function1 \
> > --inputs fun-test-topic \
> > --logTopic function-log-topic
> "Created successfully"
>
> [apache-pulsar-2.0.1-incubating]$ ./bin/pulsar-admin functions create \
> > --jar
> /hdd/lowell/workspace3/pulsarsubdemo/pulsar-function/target/pulsar-function-1.0-SNAPSHOT-jar-with-dependencies.jar
> \
> > --className com.demo.pulsar.Function2 \
> > --tenant public \
> > --namespace default \
> > --name function2 \
> > --inputs bin-fun-test-topic \
> > --logTopic function-log-topic
> "Created successfully"
>
> [apache-pulsar-2.0.1-incubating]$ ./bin/pulsar-admin functions getstatus \
> > --name function1 \
> > --namespace default \
> > --tenant public
> {
>   "functionStatusList": [
>     {
>       "running": true,
>       "instanceId": "0"
>     }
>   ]
> }
>
> [apache-pulsar-2.0.1-incubating]$ ./bin/pulsar-admin functions getstatus
> --name function2 --namespace default --tenant public
> {
>   "functionStatusList": [
>     {
>       "failureException": "UNAVAILABLE",
>       "instanceId": "0"
>     }
>   ]
> }
>
>
> Thanks,
>
> Lowell
>
>
>
