Hi, I'm afraid you found a bug. I've created a Jira issue: https://issues.apache.org/jira/browse/FLINK-3760. I already have a fix and hope to get it into 1.0.2, which we are just about to release.
Cheers,
Aljoscha

On Wed, 13 Apr 2016 at 07:25 Hironori Ogibayashi <ogibaya...@gmail.com> wrote:
> Hello,
>
> I am trying to use HyperLogLog from stream-lib
> (https://github.com/addthis/stream-lib) in my Flink streaming job, but
> when I submit the job, I get the following error. My Flink version is 1.0.1.
>
> ---
> org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Job execution failed.
> (...)
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
> (...)
> Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot instantiate user function.
>     at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:209)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:186)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>     at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.RuntimeException: Read size does not match expected size.
>     at org.apache.flink.api.common.state.StateDescriptor.readObject(StateDescriptor.java:291)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:497)
>     at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1896)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
>     at java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:501)
>     at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.readObject(WindowOperator.java:182)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:497)
>     at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1896)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
>     at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:290)
>     at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:248)
>     at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:194)
>     ... 3 more
> ---
>
> It looks like a serialization/deserialization problem. Could the length
> written by StateDescriptor.writeObject() differ from the length actually
> read in readObject()? I have no idea why this happens.
>
> The code is this:
>
> ---
> import org.apache.flink.streaming.api.scala._
> import com.clearspring.analytics.stream.cardinality.HyperLogLog
>
> val env = StreamExecutionEnvironment.getExecutionEnvironment
>
> val stream = env.fromElements((1,"a"),(1,"b"),(1,"c"),(1,"d"))
> val cardinalities = stream
>   .keyBy(0)
>   .countWindow(3)
>   .fold(new HyperLogLog(20)){(r,i) => r.offer(i._2); r}
>   .map{x => x.cardinality()}
>
> // print the per-window cardinality estimates, not the raw input
> cardinalities.print()
>
> env.execute("HLLTest")
> ---
>
> Any help would be appreciated.
>
> Regards,
> Hironori
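For context on the "Read size does not match expected size." message: it is raised by a length-prefixed read in StateDescriptor.readObject(), where the fold's initial value (here a HyperLogLog(20)) is stored. One way such a check can fail even when writer and reader agree on the length is a short read: java.io.InputStream.read(byte[]) is allowed to return fewer bytes than requested (a block-oriented stream such as ObjectInputStream may do this), whereas DataInputStream.readFully() keeps reading until the buffer is full. The sketch below illustrates that difference under stated assumptions: ChunkedInputStream, ShortReadDemo and the 1024-byte chunk size are hypothetical, it is not Flink's code, and whether this is exactly what FLINK-3760 fixes is an assumption.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream, FilterInputStream, InputStream}

// Hypothetical stream that hands out at most `chunk` bytes per read call,
// standing in for a stream that legitimately returns partial reads.
class ChunkedInputStream(underlying: InputStream, chunk: Int) extends FilterInputStream(underlying) {
  override def read(b: Array[Byte], off: Int, len: Int): Int =
    super.read(b, off, math.min(len, chunk))
}

object ShortReadDemo {
  // Writes a length-prefixed payload: [size: Int][payload bytes].
  def encode(payload: Array[Byte]): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new DataOutputStream(bytes)
    out.writeInt(payload.length)
    out.write(payload)
    out.flush()
    bytes.toByteArray
  }

  // Fragile: assumes a single read() call fills the whole buffer.
  def decodeWithRead(in: DataInputStream): Array[Byte] = {
    val size = in.readInt()
    val buffer = new Array[Byte](size)
    val bytesRead = in.read(buffer)
    if (bytesRead != size)
      throw new RuntimeException("Read size does not match expected size.")
    buffer
  }

  // Robust: readFully loops until `size` bytes have arrived (or throws EOFException).
  def decodeWithReadFully(in: DataInputStream): Array[Byte] = {
    val size = in.readInt()
    val buffer = new Array[Byte](size)
    in.readFully(buffer)
    buffer
  }

  // Wraps the encoded bytes so that each read call returns at most 1024 bytes.
  def open(encoded: Array[Byte]): DataInputStream =
    new DataInputStream(new ChunkedInputStream(new ByteArrayInputStream(encoded), 1024))

  def main(args: Array[String]): Unit = {
    val payload = Array.tabulate[Byte](4096)(i => (i % 127).toByte)
    val encoded = encode(payload)

    val fragile = scala.util.Try(decodeWithRead(open(encoded)))
    println(s"read():      ${fragile.failed.map(_.getMessage).getOrElse("ok")}")

    val robust = decodeWithReadFully(open(encoded))
    println(s"readFully(): ${java.util.Arrays.equals(robust, payload)}")
  }
}
```

With a 4096-byte payload and 1024-byte chunks, the read() variant fails with the same message seen in the stack trace, while the readFully() variant returns the complete payload; payloads smaller than one chunk decode correctly either way, which is why such a bug may only surface with a large initial value.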