Huge thank you! From: Ted Yu <yuzhih...@gmail.com> Date: Monday, June 19, 2017 at 9:19 PM To: Philip Doctor <philip.doc...@physiq.com> Cc: "user@flink.apache.org" <user@flink.apache.org> Subject: Re: Possible Data Corruption?
See this thread: http://search-hadoop.com/m/Flink/VkLeQm2nZm1Wa7Ny1?subj=Re+Painful+KryoException+java+lang+IndexOutOfBoundsException+on+Flink+Batch+Api+scala which mentioned FLINK-6398<https://issues.apache.org/jira/browse/FLINK-6398> fixed in 1.2.2 / 1.3 On Mon, Jun 19, 2017 at 5:53 PM, Philip Doctor <philip.doc...@physiq.com<mailto:philip.doc...@physiq.com>> wrote: Dear Flink Users, I have a Flink (v1.2.1) process I left running for the last five days. It aggregates a bit of state and exposes it via Queryable State. It ran correctly for the first 3 days. There were no code changes or data changes, but suddenly Queryable State got weird. The process logs the current value of the queryable state, and from the logs I discerned that the state was correctly being aggregated. However they Queryable State that was returned was unable to be deserialized. Rather than the list of longs I expect, instead I get 2 bytes (0x 57 02). It seemed quite clear that the state in the Task Manager was not the state I was getting out of Queryable State. I next reasoned that my data was being check pointed and possibly I could restore. So I restarted the process to recover from a check point. At this point the process fails with the following error java.lang.IllegalStateException: Could not initialize keyed state backend. at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:293) at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:204) at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:653) at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:640) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:246) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:666) at java.lang.Thread.run(Thread.java:748) Caused by: java.lang.IndexOutOfBoundsException: Index: 28, Size: 0 at java.util.ArrayList.rangeCheck(ArrayList.java:653) at java.util.ArrayList.get(ArrayList.java:429) at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42) at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805) at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:759) at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:231) at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.readStateTableForKeyGroup(HeapKeyedStateBackend.java:370) at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restorePartitionedState(HeapKeyedStateBackend.java:340) at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:243) at org.apache.flink.streaming.runtime.tasks.StreamTask.createKeyedStateBackend(StreamTask.java:788) at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:284) ... 6 more This looks to me like Flink has serialized out state incorrectly. I was running Flink 1.2.1, I upgraded to Flink 1.3 after this happened so I could manually set the Kafka partition offset, I backed it up 5 days to replay all the data and now everything is working fine again. However I’m more than a little worried. Was there a serialization bug fixed in 1.3 ? I don’t believe there’s anything in my code that could be causing such an issue, but is there something in my jobs that could make something like this happen? Is this a known bug? The fact that it not only results in bad data in the query but appears to take down my disaster recovery plan makes me a bit nervous here. Thanks for your time, Phil