Hello, We're using Flink version 1.4.2. Our Flink job runs pretty well most of the time. But sometimes we see exceptions in the Kryo serializer. The timing on when the exceptions would occur seems pretty random. Sometimes we don't see any exceptions for 5 days. Sometimes we get exceptions within hours. I have captured the stack traces of the last 3 times that the exceptions occurred. They are not exactly the same. The commonality is that our code in onTimer() is triggered as part of watermark handling. Our code then tried to get from the copy-on-write state table and eventually exception occurred in Kryo.
Has anyone seen something like that before? Thanks, [ Exception #1 ] java.lang.RuntimeException: Exception occurred while processing valve output watermark: at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor$ForwardingValveOutputHandler2.handleWatermark(StreamTwoInputProcessor.java:400) at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189) at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111) at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:252) at org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask.run(TwoInputStreamTask.java:91) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718) at java.lang.Thread.run(Thread.java:748) Caused by: com.esotericsoftware.kryo.KryoException: java.lang.IndexOutOfBoundsException: Index: 3, Size: 0 Serialization trace: timestamp (com.mycompany.datascience.datatypes.SensorStateEvent) at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125) at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528) at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657) at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:189) at org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$copy$1.apply(TraversableSerializer.scala:69) at org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$copy$1.apply(TraversableSerializer.scala:69) at scala.collection.immutable.List.foreach(List.scala:392) at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35) at scala.collection.mutable.ListBuffer.foreach(ListBuffer.scala:45) at org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:69) at org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:33) at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:282) at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:306) at org.apache.flink.runtime.state.heap.HeapValueState.value(HeapValueState.java:55) at com.mycompany.datascience.scala.pushnotifier.operators.UserStateSensorStateJoin.onTimer(UserStateSensorStateJoin.scala:145) at org.apache.flink.streaming.api.operators.co.KeyedCoProcessOperator.onEventTime(KeyedCoProcessOperator.java:95) at org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:288) at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:108) at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:885) at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark2(AbstractStreamOperator.java:912) at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor$ForwardingValveOutputHandler2.handleWatermark(StreamTwoInputProcessor.java:397) ... 7 more Caused by: java.lang.IndexOutOfBoundsException: Index: 3, Size: 0 at java.util.ArrayList.rangeCheck(ArrayList.java:653) at java.util.ArrayList.set(ArrayList.java:444) at com.esotericsoftware.kryo.util.MapReferenceResolver.setReadObject(MapReferenceResolver.java:38) at com.esotericsoftware.kryo.Kryo.reference(Kryo.java:823) at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:524) at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:730) at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:113) ... 27 more [ Exception #2 ] java.lang.RuntimeException: Exception occurred while processing valve output watermark: at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor$ForwardingValveOutputHandler2.handleWatermark(StreamTwoInputProcessor.java:400) at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189) at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111) at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:252) at org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask.run(TwoInputStreamTask.java:91) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718) at java.lang.Thread.run(Thread.java:748) Caused by: com.esotericsoftware.kryo.KryoException: Unable to find class: SerialNumber6H0G025 Serialization trace: sensorState (com.mycompany.datascience.datatypes.SensorStateEvent) at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138) at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115) at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641) at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:99) at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528) at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657) at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:189) at org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$copy$1.apply(TraversableSerializer.scala:69) at org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$copy$1.apply(TraversableSerializer.scala:69) at scala.collection.immutable.List.foreach(List.scala:392) at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35) at scala.collection.mutable.ListBuffer.foreach(ListBuffer.scala:45) at org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:69) at org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:33) at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:282) at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:306) at org.apache.flink.runtime.state.heap.HeapValueState.value(HeapValueState.java:55) at com.mycompany.datascience.scala.pushnotifier.operators.UserStateSensorStateJoin.onTimer(UserStateSensorStateJoin.scala:145) at org.apache.flink.streaming.api.operators.co.KeyedCoProcessOperator.onEventTime(KeyedCoProcessOperator.java:95) at org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:288) at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:108) at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:885) at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark2(AbstractStreamOperator.java:912) at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor$ForwardingValveOutputHandler2.handleWatermark(StreamTwoInputProcessor.java:397) ... 7 more Caused by: java.lang.ClassNotFoundException: SerialNumber6H0G025 at java.net.URLClassLoader.findClass(URLClassLoader.java:381) at java.lang.ClassLoader.loadClass(ClassLoader.java:424) at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader.loadClass(FlinkUserCodeClassLoaders.java:128) at java.lang.ClassLoader.loadClass(ClassLoader.java:357) at java.lang.Class.forName0(Native Method) at java.lang.Class.forName(Class.java:348) at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:136) ... 30 more [ Exception #3 ] java.lang.RuntimeException: Exception occurred while processing valve output watermark: at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor$ForwardingValveOutputHandler2.handleWatermark(StreamTwoInputProcessor.java:400) at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189) at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111) at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:252) at org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask.run(TwoInputStreamTask.java:91) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718) at java.lang.Thread.run(Thread.java:748) Caused by: java.lang.ArrayIndexOutOfBoundsException: -1 at com.esotericsoftware.kryo.util.IntArray.pop(IntArray.java:157) at com.esotericsoftware.kryo.Kryo.reference(Kryo.java:822) at com.esotericsoftware.kryo.serializers.FieldSerializer.copy(FieldSerializer.java:625) at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:862) at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:175) at org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$copy$1.apply(TraversableSerializer.scala:69) at org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$copy$1.apply(TraversableSerializer.scala:69) at scala.collection.immutable.List.foreach(List.scala:392) at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35) at scala.collection.mutable.ListBuffer.foreach(ListBuffer.scala:45) at org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:69) at org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:33) at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:282) at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:306) at org.apache.flink.runtime.state.heap.HeapValueState.value(HeapValueState.java:55) at com.mycompany.datascience.scala.pushnotifier.operators.UserStateSensorStateJoin.onTimer(UserStateSensorStateJoin.scala:145) at org.apache.flink.streaming.api.operators.co.KeyedCoProcessOperator.onEventTime(KeyedCoProcessOperator.java:95) at org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:288) at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:108) at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:885) at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark2(AbstractStreamOperator.java:912) at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor$ForwardingValveOutputHandler2.handleWatermark(StreamTwoInputProcessor.java:397) ... 7 more