That makes sense, I will give it a try.

Lowell






________________________________
From: Sijie Guo <guosi...@gmail.com>
Sent: Tuesday, July 3, 2018 2:33 PM
To: Schaffert, Lowell (US); users@pulsar.incubator.apache.org
Subject: Re: EXTERNAL: Re: Pulsar functions

Hi Schaffert,

Ah, I think the problem comes from confusing `--inputs` with 
`--customSerdeInputs`. The two flags are different: `--inputs` accepts the 
list of topics that use the DefaultSerDe, while `--customSerdeInputs` accepts 
the list of topics with a customized SerDe.

The total inputs of a function are the combination of `--inputs` and 
`--customSerdeInputs`. So if you are using a custom SerDe, you only need 
`--customSerdeInputs`; you shouldn't also list that topic in `--inputs`. 
If you remove `--inputs test-topic`, your function should work.
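
As a rough illustration of the description above, the two flags can be modeled as feeding one topic-to-SerDe map. This is only a sketch, not Pulsar's actual code: the class `FunctionInputs` and the method `combine` are hypothetical names, and the empty string for "default SerDe" is an assumption based on the `value: ""` entry in the function details log quoted further down in this thread.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of how the two flags add up to a function's inputs.
public class FunctionInputs {

    /**
     * Merges the two flag values into one topic -> SerDe class name map.
     * Topics from --inputs get an empty class name (assumed to mean
     * "use the default SerDe"); topics from --customSerdeInputs keep
     * the SerDe class name supplied by the user.
     */
    public static Map<String, String> combine(List<String> inputs,
                                              Map<String, String> customSerdeInputs) {
        Map<String, String> topics = new LinkedHashMap<>();
        for (String topic : inputs) {
            topics.put(topic, "");
        }
        topics.putAll(customSerdeInputs);
        return topics;
    }

    public static void main(String[] args) {
        Map<String, String> custom = new LinkedHashMap<>();
        custom.put("test-topic", "com.demo.pulsar.datatypes.DataWrapperSerde");
        // Only --customSerdeInputs is given, so no topic is tied to the default SerDe.
        System.out.println(combine(Collections.emptyList(), custom));
    }
}
```

The model only covers the merge. In the stack trace below, the CLI rejects the command earlier, while checking each `--inputs` topic against the default serializer, which is why the topic should appear in `--customSerdeInputs` alone.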

- Sijie

On Tue, Jul 3, 2018 at 11:56 AM Schaffert, Lowell 
<lowell.schaff...@lmco.com> wrote:
Basically using a byte[] was my backup to deal with the problem:

[apache-pulsar-2.0.1-incubating]$ ./bin/pulsar-admin functions create \
> --jar /hdd/lowell/workspace3/pulsarsubdemo/pulsar-function/target/pulsar-function-1.0-SNAPSHOT-jar-with-dependencies.jar \
> --className com.demo.pulsar.Function1a \
> --tenant public \
> --namespace default \
> --name function1a \
> --inputs test-topic \
> --customSerdeInputs {"test-topic":"com.demo.pulsar.datatypes.DataWrapperSerde"} \
> --outputSerdeClassName com.demo.pulsar.datatypes.DataWrapperSerde \
> --logTopic function-log-topic
java.lang.RuntimeException: Default Serializer does not support type class com.demo.pulsar.datatypes.DataWrapper
    at org.apache.pulsar.admin.cli.CmdFunctions$FunctionDetailsCommand.lambda$doJavaSubmitChecks$2(CmdFunctions.java:423)
    at java.util.Arrays$ArrayList.forEach(Arrays.java:3880)
    at org.apache.pulsar.admin.cli.CmdFunctions$FunctionDetailsCommand.doJavaSubmitChecks(CmdFunctions.java:421)
    at org.apache.pulsar.admin.cli.CmdFunctions$FunctionDetailsCommand.processArguments(CmdFunctions.java:284)
    at org.apache.pulsar.admin.cli.CmdFunctions$BaseCommand.run(CmdFunctions.java:104)
    at org.apache.pulsar.admin.cli.CmdBase.run(CmdBase.java:61)
    at org.apache.pulsar.admin.cli.PulsarAdminTool.run(PulsarAdminTool.java:167)
    at org.apache.pulsar.admin.cli.PulsarAdminTool.main(PulsarAdminTool.java:217)

public class Function1a implements Function<DataWrapper, Void> {
  // where to publish to
  private static final String TOPIC_NAME = "fun-str-topic";
  @Override
  public Void process(DataWrapper dataWrapper, Context context)  {
    Logger LOGGER = context.getLogger();

    LOGGER.info("Function1a received message");
    String message = dataWrapper.getPayload().getMessage();
    context.publish(TOPIC_NAME, "Function1a-" + message);
    return null;
  }
}

The log entry:

09:50:02,590 INFO [public/default/Function1] [instance-0] JavaInstanceRunnable - Starting Java Instance Function1
09:50:02,601 INFO [public/default/Function1] [instance-0] JavaInstanceRunnable - Loading JAR files for function InstanceConfig(instanceId=0, functionId=46b384ce-bbbb-4afe-88d3-a7a31f625677, functionVersion=4ed44d36-dd75-4d74-bca1-054f90e332a1, functionDetails=tenant: "public"
namespace: "default"
name: "Function1"
className: "com.demo.pulsar.Function1"
logTopic: "log-topic"
autoAck: true
source {
  topicsToSerDeClassName {
    key: "test-topic"
    value: ""
  }
}
sink {
  topic: "test-topic-Function1-output"
}
, maxBufferedTuples=1024) from jarFile /hdd/lowell/workspace3/pulsarsubdemo/pulsar-function/target/pulsar-function-1.0-SNAPSHOT-jar-with-dependencies.jar
09:50:02,614 INFO [public/default/Function1] [instance-0] JavaInstanceRunnable - Initialize function class loader for function Function1 at function cache manager
09:50:02,626 INFO [public/default/Function1] [instance-0] MessageProcessorBase - Starting producer for output topic test-topic-Function1-output
09:50:03,017 INFO [public/default/Function1] [instance-0] MessageProcessorBase - Error occurred executing open for source:
java.lang.ClassNotFoundException: [B
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381) ~[?:1.8.0_171]
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424) ~[?:1.8.0_171]
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357) ~[?:1.8.0_171]
    at org.apache.pulsar.functions.source.PulsarSource.setupSerde(PulsarSource.java:131) ~[java-instance.jar:2.0.1-incubating]
    at org.apache.pulsar.functions.source.PulsarSource.open(PulsarSource.java:58) ~[java-instance.jar:2.0.1-incubating]
    at org.apache.pulsar.functions.instance.processors.MessageProcessorBase.setupInput(MessageProcessorBase.java:103) [java-instance.jar:2.0.1-incubating]
    at org.apache.pulsar.functions.instance.processors.AtLeastOnceProcessor.setupInput(AtLeastOnceProcessor.java:36) [java-instance.jar:2.0.1-incubating]
    at org.apache.pulsar.functions.instance.JavaInstanceRunnable.setupJavaInstance(JavaInstanceRunnable.java:158) [java-instance.jar:2.0.1-incubating]
    at org.apache.pulsar.functions.instance.JavaInstanceRunnable.run(JavaInstanceRunnable.java:171) [java-instance.jar:2.0.1-incubating]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_171]
09:50:03,054 ERROR [public/default/Function1] [instance-0] JavaInstanceRunnable - Uncaught exception in Java Instance
java.lang.NullPointerException: null
    at org.apache.pulsar.functions.source.PulsarSource.read(PulsarSource.java:71) ~[java-instance.jar:2.0.1-incubating]
    at org.apache.pulsar.functions.instance.processors.MessageProcessorBase.recieveMessage(MessageProcessorBase.java:112) ~[java-instance.jar:2.0.1-incubating]
    at org.apache.pulsar.functions.instance.processors.AtLeastOnceProcessor.recieveMessage(AtLeastOnceProcessor.java:36) ~[java-instance.jar:2.0.1-incubating]
    at org.apache.pulsar.functions.instance.JavaInstanceRunnable.run(JavaInstanceRunnable.java:174) [java-instance.jar:2.0.1-incubating]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_171]
09:50:03,055 WARN [public/default/Function1] [instance-0] MessageProcessorBase - Failed to close source org.apache.pulsar.functions.source.PulsarSource@13e23117
java.lang.NullPointerException: null
    at org.apache.pulsar.functions.source.PulsarSource.close(PulsarSource.java:126) ~[java-instance.jar:2.0.1-incubating]
    at org.apache.pulsar.functions.instance.processors.MessageProcessorBase.close(MessageProcessorBase.java:152) [java-instance.jar:2.0.1-incubating]
    at org.apache.pulsar.functions.instance.processors.AtLeastOnceProcessor.close(AtLeastOnceProcessor.java:68) [java-instance.jar:2.0.1-incubating]
    at org.apache.pulsar.functions.instance.JavaInstanceRunnable.close(JavaInstanceRunnable.java:346) [java-instance.jar:2.0.1-incubating]
    at org.apache.pulsar.functions.instance.JavaInstanceRunnable.run(JavaInstanceRunnable.java:230) [java-instance.jar:2.0.1-incubating]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_171]


Lowell Schaffert



________________________________
From: Sijie Guo <guosi...@gmail.com>
Sent: Tuesday, July 3, 2018 12:18:00 PM
To: users@pulsar.incubator.apache.org
Subject: EXTERNAL: Re: Pulsar functions

Can you check the logs in the broker? There should be logs under 
`logs/functions/<tenant>/<namespace>/<function>`.

I think the problem comes from using an array as the generic type. The 2.0 
release fails to resolve array types.
A fix that supports arrays as the generic type is coming in the 2.1 release: 
https://github.com/apache/incubator-pulsar/pull/1837
The 2.1 release should come out in about 1 to 2 weeks.
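
For anyone wondering why an array type is special here: the `ClassNotFoundException: [B` in the log quoted in this thread is what the JVM reports when the binary name of `byte[]` is passed to `ClassLoader.loadClass`, which does not resolve array class names, whereas `Class.forName` does. The sketch below only demonstrates that JDK behavior; the class `ArrayClassLookup` and its method names are made up for illustration and say nothing about how the Pulsar fix itself is implemented.

```java
// Demonstrates the JDK behavior behind "ClassNotFoundException: [B".
public class ArrayClassLookup {

    /** Returns true if ClassLoader.loadClass can resolve the given binary name. */
    public static boolean resolvesWithLoadClass(String name) {
        try {
            ArrayClassLookup.class.getClassLoader().loadClass(name);
            return true;
        } catch (ClassNotFoundException e) {
            // Array names such as "[B" end up here.
            return false;
        }
    }

    /** Returns the class resolved by Class.forName, or null if it cannot be found. */
    public static Class<?> resolveWithForName(String name) {
        try {
            return Class.forName(name);
        } catch (ClassNotFoundException e) {
            return null;
        }
    }

    public static void main(String[] args) {
        String name = byte[].class.getName(); // "[B"
        System.out.println("binary name: " + name);
        System.out.println("loadClass resolves it: " + resolvesWithLoadClass(name));
        System.out.println("forName resolves it: " + (resolveWithForName(name) == byte[].class));
    }
}
```

A regular class name such as `java.lang.String` resolves through both paths, which fits the observation that the `Function<String, Void>` variant starts while the `Function<byte[], Void>` variant does not.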

- Sijie

On Tue, Jul 3, 2018 at 10:12 AM Schaffert, Lowell 
<lowell.schaff...@lmco.com> wrote:
I have created two functions, can anyone explain why function 2 will not run?

public class Function1 implements Function<String, Void> {
  // where to publish to
  private static final String TOPIC_NAME = "fun-str-topic";
  @Override
  public Void process(String input, Context context) {
    Logger LOGGER = context.getLogger();

    LOGGER.info("Function1 received message {}", input);
    context.publish(TOPIC_NAME, "Function1-" + input);
    return null;
  }
}

public class Function2 implements Function<byte[], Void> {
  // where to publish to
  private static final String TOPIC_NAME = "fun-str-topic";
  @Override
  public Void process(byte[] bytes, Context context) {
    Logger LOGGER = context.getLogger();
    LOGGER.error("Function2 received message");
    return null;
  }
}

[apache-pulsar-2.0.1-incubating]$ ./bin/pulsar-admin functions create \
> --jar /hdd/lowell/workspace3/pulsarsubdemo/pulsar-function/target/pulsar-function-1.0-SNAPSHOT-jar-with-dependencies.jar \
> --className com.demo.pulsar.Function1 \
> --tenant public \
> --namespace default \
> --name function1 \
> --inputs fun-test-topic \
> --logTopic function-log-topic
"Created successfully"

[apache-pulsar-2.0.1-incubating]$ ./bin/pulsar-admin functions create \
> --jar /hdd/lowell/workspace3/pulsarsubdemo/pulsar-function/target/pulsar-function-1.0-SNAPSHOT-jar-with-dependencies.jar \
> --className com.demo.pulsar.Function2 \
> --tenant public \
> --namespace default \
> --name function2 \
> --inputs bin-fun-test-topic \
> --logTopic function-log-topic
"Created successfully"

[apache-pulsar-2.0.1-incubating]$ ./bin/pulsar-admin functions getstatus \
> --name function1 \
> --namespace default \
> --tenant public
{
  "functionStatusList": [
    {
      "running": true,
      "instanceId": "0"
    }
  ]
}

[apache-pulsar-2.0.1-incubating]$ ./bin/pulsar-admin functions getstatus --name function2 --namespace default --tenant public
{
  "functionStatusList": [
    {
      "failureException": "UNAVAILABLE",
      "instanceId": "0"
    }
  ]
}


Thanks,

Lowell

