Hi,
I'm afraid you found a bug. I created a Jira issue:
https://issues.apache.org/jira/browse/FLINK-3760. I already have a fix and
hope to get it into the 1.0.2 release, which we are just about to publish.
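
For anyone who hits the same "Read size does not match expected size."
exception, here is a minimal sketch of how a length check like this can
fail. It assumes the cause is a single InputStream.read() call returning
fewer bytes than requested for a large payload. That is an assumption made
for illustration and is not confirmed in this thread; the Jira issue has the
actual fix. ShortReadSketch, ChunkedInputStream, and the 4096-byte chunk
size are hypothetical names and values, not Flink code.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;

final class ShortReadSketch {

    /** Hypothetical stream that hands out at most `chunk` bytes per bulk read. */
    static final class ChunkedInputStream extends FilterInputStream {
        private final int chunk;

        ChunkedInputStream(InputStream in, int chunk) {
            super(in);
            this.chunk = chunk;
        }

        @Override
        public int read(byte[] b, int off, int len) throws IOException {
            // Legal per the InputStream contract: return fewer bytes than asked for.
            return super.read(b, off, Math.min(len, chunk));
        }
    }

    /** Writes a 4-byte length prefix followed by the payload. */
    static byte[] writeLengthPrefixed(byte[] payload) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(payload.length);
            out.write(payload);
        }
        return bytes.toByteArray();
    }

    /** Fragile: one read() call; returns how many bytes that call delivered. */
    static int readOnce(byte[] serialized, int chunk) throws IOException {
        DataInputStream in = new DataInputStream(
                new ChunkedInputStream(new ByteArrayInputStream(serialized), chunk));
        int size = in.readInt();
        byte[] buffer = new byte[size];
        return in.read(buffer); // may be smaller than size
    }

    /** Robust: readFully() loops until the buffer is full or throws EOFException. */
    static byte[] readAll(byte[] serialized, int chunk) throws IOException {
        DataInputStream in = new DataInputStream(
                new ChunkedInputStream(new ByteArrayInputStream(serialized), chunk));
        int size = in.readInt();
        byte[] buffer = new byte[size];
        in.readFully(buffer);
        return buffer;
    }

    public static void main(String[] args) throws IOException {
        byte[] payload = new byte[1 << 20]; // arbitrary large payload for the demo
        byte[] serialized = writeLengthPrefixed(payload);
        System.out.println("expected size:   " + payload.length);
        System.out.println("single read():   " + readOnce(serialized, 4096));
        System.out.println("readFully() got: " + readAll(serialized, 4096).length);
    }
}
```

In this sketch a small payload fits in one chunk, so the single read()
happens to succeed, and only a large one exposes the mismatch. That would be
consistent with a large fold initial value such as HyperLogLog(20) triggering
the error, but again, this is only a guess at the mechanism.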

Cheers,
Aljoscha

On Wed, 13 Apr 2016 at 07:25 Hironori Ogibayashi <ogibaya...@gmail.com>
wrote:

> Hello,
>
> I am trying to use HyperLogLog from stream-lib
> (https://github.com/addthis/stream-lib)
> in my Flink streaming job, but when I submit the job, I get the
> following error. My Flink version is 1.0.1.
>
> ---
> org.apache.flink.client.program.ProgramInvocationException: The
> program execution failed: Job execution failed.
> (...)
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job
> execution failed.
> (...)
> Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException:
> Cannot instantiate user function.
>         at
> org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:209)
>         at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:186)
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>         at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.RuntimeException: Read size does not match expected
> size.
>         at
> org.apache.flink.api.common.state.StateDescriptor.readObject(StateDescriptor.java:291)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>         at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:497)
>         at
> java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
>         at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1896)
>         at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
>         at
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>         at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
>         at
> java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:501)
>         at
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.readObject(WindowOperator.java:182)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>         at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:497)
>         at
> java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
>         at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1896)
>         at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
>         at
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>         at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
>         at
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:290)
>         at
> org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:248)
>         at
> org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:194)
>         ... 3 more
> ----
>
> It looks like a serialization/deserialization problem. Does the length
> written by StateDescriptor.writeObject() differ from the length actually
> read in readObject()? I have no idea why this happens.
>
> The code is this:
>
> ---
>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>
>     val stream = env.fromElements((1,"a"),(1,"b"),(1,"c"),(1,"d"))
>     stream
>       .keyBy(0)
>       .countWindow(3)
>       .fold(new HyperLogLog(20)){(r,i) => r.offer(i._2); r}
>       .map{x => x.cardinality()}
>
>     stream.print
>
>     env.execute("HLLTest")
> ---
>
> Any help would be appreciated.
>
> Regards,
> Hironori
>
