Great to hear your problem is addressed.

We have some problems packaging the Pulsar connectors for 2.1. Hopefully we
can address them by next week, so 2.1 should be released in 2 weeks or so.

- Sijie

On Thu, Jul 5, 2018 at 1:57 PM Schaffert, Lowell <lowell.schaff...@lmco.com>
wrote:

> That change to the config worked like a charm. I do have a use case for
> using byte[] as the message type in a Function, so I am looking forward to
> the 2.1 release, which will support this.
>
>
>
> Lowell
>
>
>
> *From:* Sijie Guo <guosi...@gmail.com>
> *Sent:* Tuesday, July 3, 2018 2:33 PM
> *To:* Schaffert, Lowell (US) <lowell.schaff...@lmco.com>;
> users@pulsar.incubator.apache.org
> *Subject:* Re: EXTERNAL: Re: Pulsar functions
>
>
>
> Hi Schaffert,
>
>
>
> Ah, I think the problem comes from confusing `--inputs` with
> `--customSerdeInputs`. The two flags are different: `--inputs` accepts the
> list of topics that use DefaultSerDe, while `--customSerdeInputs` accepts
> the list of topics with a customized SerDe.
>
>
>
> The total inputs of a function are the combination of `--inputs` and
> `--customSerdeInputs`. If a topic uses a custom SerDe, you only need to
> list it in `--customSerdeInputs`; you shouldn't also specify that topic
> in `--inputs`. If you remove `--inputs test-topic`, your function should
> work.
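
The rule described above can be illustrated with a small, self-contained Java model. This is only a sketch under stated assumptions: `InputFlagsSketch` and `resolveInputs` are hypothetical names, not Pulsar's actual code, and the boolean parameter stands in for whatever check the CLI performs on the function's input type. It shows why a topic that also appears in `--inputs` trips the default-SerDe check even when a custom SerDe is supplied for it.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Simplified model of the submit-time input check; not Pulsar's actual code. */
public class InputFlagsSketch {

    /**
     * Hypothetical helper: returns topic -> SerDe class name, where an empty
     * string stands for the default SerDe.
     */
    static Map<String, String> resolveInputs(List<String> inputs,
                                             Map<String, String> customSerdeInputs,
                                             boolean defaultSerdeSupportsType) {
        Map<String, String> resolved = new LinkedHashMap<>();
        for (String topic : inputs) {
            // Every topic passed via --inputs is validated against the default SerDe.
            if (!defaultSerdeSupportsType) {
                throw new IllegalArgumentException(
                        "Default Serializer does not support the input type (topic " + topic + ")");
            }
            resolved.put(topic, "");
        }
        // Topics passed via --customSerdeInputs carry their own SerDe class.
        resolved.putAll(customSerdeInputs);
        return resolved;
    }

    public static void main(String[] args) {
        Map<String, String> custom = Collections.singletonMap(
                "test-topic", "com.demo.pulsar.datatypes.DataWrapperSerde");

        // Topic listed only under --customSerdeInputs: accepted.
        System.out.println(resolveInputs(Collections.emptyList(), custom, false));

        // Same topic also listed under --inputs: rejected by the default-SerDe check.
        try {
            resolveInputs(Collections.singletonList("test-topic"), custom, false);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

In this model the custom SerDe entry is never consulted for a topic that fails the `--inputs` check first, which matches the advice to drop `--inputs test-topic`.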
>
>
>
> - Sijie
>
> On Tue, Jul 3, 2018 at 11:56 AM Schaffert, Lowell <
> lowell.schaff...@lmco.com> wrote:
>
> Basically using a byte[] was my backup to deal with the problem:
>
> [apache-pulsar-2.0.1-incubating]$ ./bin/pulsar-admin functions create \
> > --jar /hdd/lowell/workspace3/pulsarsubdemo/pulsar-function/target/pulsar-function-1.0-SNAPSHOT-jar-with-dependencies.jar \
> > --className com.demo.pulsar.Function1a \
> > --tenant public \
> > --namespace default \
> > --name function1a \
> > --inputs test-topic \
> > --customSerdeInputs {"test-topic":"com.demo.pulsar.datatypes.DataWrapperSerde"} \
> > --outputSerdeClassName com.demo.pulsar.datatypes.DataWrapperSerde \
> > --logTopic function-log-topic
> java.lang.RuntimeException: Default Serializer does not support type class com.demo.pulsar.datatypes.DataWrapper
>     at org.apache.pulsar.admin.cli.CmdFunctions$FunctionDetailsCommand.lambda$doJavaSubmitChecks$2(CmdFunctions.java:423)
>     at java.util.Arrays$ArrayList.forEach(Arrays.java:3880)
>     at org.apache.pulsar.admin.cli.CmdFunctions$FunctionDetailsCommand.doJavaSubmitChecks(CmdFunctions.java:421)
>     at org.apache.pulsar.admin.cli.CmdFunctions$FunctionDetailsCommand.processArguments(CmdFunctions.java:284)
>     at org.apache.pulsar.admin.cli.CmdFunctions$BaseCommand.run(CmdFunctions.java:104)
>     at org.apache.pulsar.admin.cli.CmdBase.run(CmdBase.java:61)
>     at org.apache.pulsar.admin.cli.PulsarAdminTool.run(PulsarAdminTool.java:167)
>     at org.apache.pulsar.admin.cli.PulsarAdminTool.main(PulsarAdminTool.java:217)
>
> public class Function1a implements Function<DataWrapper, Void> {
>   // where to publish to
>   private static final String TOPIC_NAME = "fun-str-topic";
>   @Override
>   public Void process(DataWrapper dataWrapper, Context context)  {
>     Logger LOGGER = context.getLogger();
>
>     LOGGER.info("Function1a received message");
>     String message = dataWrapper.getPayload().getMessage();
>     context.publish(TOPIC_NAME, "Function1a-" + message);
>     return null;
>   }
> }
>
> The log entry:
>
> 09:50:02,590 INFO [public/default/Function1] [instance-0] JavaInstanceRunnable - Starting Java Instance Function1
> 09:50:02,601 INFO [public/default/Function1] [instance-0] JavaInstanceRunnable - Loading JAR files for function InstanceConfig(instanceId=0, functionId=46b384ce-bbbb-4afe-88d3-a7a31f625677, functionVersion=4ed44d36-dd75-4d74-bca1-054f90e332a1, functionDetails=tenant: "public"
> namespace: "default"
> name: "Function1"
> className: "com.demo.pulsar.Function1"
> logTopic: "log-topic"
> autoAck: true
> source {
>   topicsToSerDeClassName {
>     key: "test-topic"
>     value: ""
>   }
> }
> sink {
>   topic: "test-topic-Function1-output"
> }
> , maxBufferedTuples=1024) from jarFile /hdd/lowell/workspace3/pulsarsubdemo/pulsar-function/target/pulsar-function-1.0-SNAPSHOT-jar-with-dependencies.jar
> 09:50:02,614 INFO [public/default/Function1] [instance-0] JavaInstanceRunnable - Initialize function class loader for function Function1 at function cache manager
> 09:50:02,626 INFO [public/default/Function1] [instance-0] MessageProcessorBase - Starting producer for output topic test-topic-Function1-output
> 09:50:03,017 INFO [public/default/Function1] [instance-0] MessageProcessorBase - Error occurred executing open for source:
> java.lang.ClassNotFoundException: [B
>     at java.net.URLClassLoader.findClass(URLClassLoader.java:381) ~[?:1.8.0_171]
>     at java.lang.ClassLoader.loadClass(ClassLoader.java:424) ~[?:1.8.0_171]
>     at java.lang.ClassLoader.loadClass(ClassLoader.java:357) ~[?:1.8.0_171]
>     at org.apache.pulsar.functions.source.PulsarSource.setupSerde(PulsarSource.java:131) ~[java-instance.jar:2.0.1-incubating]
>     at org.apache.pulsar.functions.source.PulsarSource.open(PulsarSource.java:58) ~[java-instance.jar:2.0.1-incubating]
>     at org.apache.pulsar.functions.instance.processors.MessageProcessorBase.setupInput(MessageProcessorBase.java:103) [java-instance.jar:2.0.1-incubating]
>     at org.apache.pulsar.functions.instance.processors.AtLeastOnceProcessor.setupInput(AtLeastOnceProcessor.java:36) [java-instance.jar:2.0.1-incubating]
>     at org.apache.pulsar.functions.instance.JavaInstanceRunnable.setupJavaInstance(JavaInstanceRunnable.java:158) [java-instance.jar:2.0.1-incubating]
>     at org.apache.pulsar.functions.instance.JavaInstanceRunnable.run(JavaInstanceRunnable.java:171) [java-instance.jar:2.0.1-incubating]
>     at java.lang.Thread.run(Thread.java:748) [?:1.8.0_171]
> 09:50:03,054 ERROR [public/default/Function1] [instance-0] JavaInstanceRunnable - Uncaught exception in Java Instance
> java.lang.NullPointerException: null
>     at org.apache.pulsar.functions.source.PulsarSource.read(PulsarSource.java:71) ~[java-instance.jar:2.0.1-incubating]
>     at org.apache.pulsar.functions.instance.processors.MessageProcessorBase.recieveMessage(MessageProcessorBase.java:112) ~[java-instance.jar:2.0.1-incubating]
>     at org.apache.pulsar.functions.instance.processors.AtLeastOnceProcessor.recieveMessage(AtLeastOnceProcessor.java:36) ~[java-instance.jar:2.0.1-incubating]
>     at org.apache.pulsar.functions.instance.JavaInstanceRunnable.run(JavaInstanceRunnable.java:174) [java-instance.jar:2.0.1-incubating]
>     at java.lang.Thread.run(Thread.java:748) [?:1.8.0_171]
> 09:50:03,055 WARN [public/default/Function1] [instance-0] MessageProcessorBase - Failed to close source org.apache.pulsar.functions.source.PulsarSource@13e23117
> java.lang.NullPointerException: null
>     at org.apache.pulsar.functions.source.PulsarSource.close(PulsarSource.java:126) ~[java-instance.jar:2.0.1-incubating]
>     at org.apache.pulsar.functions.instance.processors.MessageProcessorBase.close(MessageProcessorBase.java:152) [java-instance.jar:2.0.1-incubating]
>     at org.apache.pulsar.functions.instance.processors.AtLeastOnceProcessor.close(AtLeastOnceProcessor.java:68) [java-instance.jar:2.0.1-incubating]
>     at org.apache.pulsar.functions.instance.JavaInstanceRunnable.close(JavaInstanceRunnable.java:346) [java-instance.jar:2.0.1-incubating]
>     at org.apache.pulsar.functions.instance.JavaInstanceRunnable.run(JavaInstanceRunnable.java:230) [java-instance.jar:2.0.1-incubating]
>     at java.lang.Thread.run(Thread.java:748) [?:1.8.0_171]
>
>
> Lowell Schaffert
>
>
>
> ________________________________
> From: Sijie Guo <guosi...@gmail.com>
> Sent: Tuesday, July 3, 2018 12:18:00 PM
> To: users@pulsar.incubator.apache.org
> Subject: EXTERNAL: Re: Pulsar functions
>
> Can you check the logs in the broker? There should be logs under
> `logs/functions/<tenant>/<namespace>/<function>`.
>
> I think the problem comes from using an array as the generic type. The 2.0
> release fails when it tries to resolve an array type.
> A fix to support arrays as the generic type is coming in the 2.1 release:
> https://github.com/apache/incubator-pulsar/pull/1837
> The 2.1 release should come out in 1-2 weeks or so.
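
For readers wondering what "fail on resolving array type" looks like in plain Java, here is a minimal sketch that reproduces the `ClassNotFoundException: [B` reported elsewhere in this thread without any Pulsar dependency. The class and method names are hypothetical, and this is not necessarily how the linked fix works; it only shows that `ClassLoader.loadClass` does not accept array names such as `[B` (the JVM's name for `byte[]`), whereas `Class.forName` does.

```java
/** Demonstrates resolving the array class name "[B" two ways; illustrative only. */
public class ArrayClassLoadingSketch {

    /** Returns true if the loader can resolve the name through loadClass. */
    static boolean loadableViaLoadClass(ClassLoader loader, String className) {
        try {
            loader.loadClass(className);
            return true;
        } catch (ClassNotFoundException e) {
            // Array classes are created by the JVM, not by class loaders.
            return false;
        }
    }

    /** Resolves the name with Class.forName, which understands array names. */
    static Class<?> resolveWithForName(ClassLoader loader, String className) {
        try {
            return Class.forName(className, false, loader);
        } catch (ClassNotFoundException e) {
            throw new IllegalArgumentException("Cannot resolve " + className, e);
        }
    }

    public static void main(String[] args) {
        ClassLoader loader = ClassLoader.getSystemClassLoader();
        String name = byte[].class.getName(); // "[B"
        System.out.println(name + " via loadClass: " + loadableViaLoadClass(loader, name));
        System.out.println(name + " via forName:   " + resolveWithForName(loader, name));
    }
}
```

The same distinction applies to any `Function<byte[], ...>` whose type argument is looked up by name through a class loader.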
>
> - Sijie
>
> On Tue, Jul 3, 2018 at 10:12 AM Schaffert, Lowell <
> lowell.schaff...@lmco.com> wrote:
> I have created two functions. Can anyone explain why Function2 will not
> run?
>
> public class Function1 implements Function<String, Void> {
>   // where to publish to
>   private static final String TOPIC_NAME = "fun-str-topic";
>   @Override
>   public Void process(String input, Context context) {
>     Logger LOGGER = context.getLogger();
>
>     LOGGER.info("Function1 received message {}", input);
>     context.publish(TOPIC_NAME, "Function1-" + input);
>     return null;
>   }
> }
>
> public class Function2 implements Function<byte[], Void> {
>   // where to publish to
>   private static final String TOPIC_NAME = "fun-str-topic";
>   @Override
>   public Void process(byte[] bytes, Context context) {
>     Logger LOGGER = context.getLogger();
>     LOGGER.error("Function2 received message");
>     return null;
>   }
> }
>
> [apache-pulsar-2.0.1-incubating]$ ./bin/pulsar-admin functions create \
> > --jar /hdd/lowell/workspace3/pulsarsubdemo/pulsar-function/target/pulsar-function-1.0-SNAPSHOT-jar-with-dependencies.jar \
> > --className com.demo.pulsar.Function1 \
> > --tenant public \
> > --namespace default \
> > --name function1 \
> > --inputs fun-test-topic \
> > --logTopic function-log-topic
> "Created successfully"
>
> [apache-pulsar-2.0.1-incubating]$ ./bin/pulsar-admin functions create \
> > --jar /hdd/lowell/workspace3/pulsarsubdemo/pulsar-function/target/pulsar-function-1.0-SNAPSHOT-jar-with-dependencies.jar \
> > --className com.demo.pulsar.Function2 \
> > --tenant public \
> > --namespace default \
> > --name function2 \
> > --inputs bin-fun-test-topic \
> > --logTopic function-log-topic
> "Created successfully"
>
> [apache-pulsar-2.0.1-incubating]$ ./bin/pulsar-admin functions getstatus \
> > --name function1 \
> > --namespace default \
> > --tenant public
> {
>   "functionStatusList": [
>     {
>       "running": true,
>       "instanceId": "0"
>     }
>   ]
> }
>
> [apache-pulsar-2.0.1-incubating]$ ./bin/pulsar-admin functions getstatus --name function2 --namespace default --tenant public
> {
>   "functionStatusList": [
>     {
>       "failureException": "UNAVAILABLE",
>       "instanceId": "0"
>     }
>   ]
> }
>
>
> Thanks,
>
> Lowell
>
>
