Hello,
We're using Flink version 1.4.2.
Our Flink job runs well most of the time, but occasionally we see
exceptions in the Kryo serializer.
The exceptions seem to occur at random: sometimes we go 5 days without any,
and sometimes they show up within hours.
I have captured the stack traces from the last 3 occurrences. They are not
identical, but they have one thing in common: our code in onTimer() is
triggered as part of watermark handling. It then reads a value from the
copy-on-write state table, and the exception eventually occurs inside Kryo.
Has anyone seen something like this before?
Thanks,
[ Exception #1 ]
java.lang.RuntimeException: Exception occurred while processing valve output watermark:
at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor$ForwardingValveOutputHandler2.handleWatermark(StreamTwoInputProcessor.java:400)
at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:252)
at org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask.run(TwoInputStreamTask.java:91)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
at java.lang.Thread.run(Thread.java:748)
Caused by: com.esotericsoftware.kryo.KryoException: java.lang.IndexOutOfBoundsException: Index: 3, Size: 0
Serialization trace:
timestamp (com.mycompany.datascience.datatypes.SensorStateEvent)
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:189)
at org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$copy$1.apply(TraversableSerializer.scala:69)
at org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$copy$1.apply(TraversableSerializer.scala:69)
at scala.collection.immutable.List.foreach(List.scala:392)
at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
at scala.collection.mutable.ListBuffer.foreach(ListBuffer.scala:45)
at org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:69)
at org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:33)
at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:282)
at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:306)
at org.apache.flink.runtime.state.heap.HeapValueState.value(HeapValueState.java:55)
at com.mycompany.datascience.scala.pushnotifier.operators.UserStateSensorStateJoin.onTimer(UserStateSensorStateJoin.scala:145)
at org.apache.flink.streaming.api.operators.co.KeyedCoProcessOperator.onEventTime(KeyedCoProcessOperator.java:95)
at org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:288)
at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:108)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:885)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark2(AbstractStreamOperator.java:912)
at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor$ForwardingValveOutputHandler2.handleWatermark(StreamTwoInputProcessor.java:397)
... 7 more
Caused by: java.lang.IndexOutOfBoundsException: Index: 3, Size: 0
at java.util.ArrayList.rangeCheck(ArrayList.java:653)
at java.util.ArrayList.set(ArrayList.java:444)
at com.esotericsoftware.kryo.util.MapReferenceResolver.setReadObject(MapReferenceResolver.java:38)
at com.esotericsoftware.kryo.Kryo.reference(Kryo.java:823)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:524)
at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:730)
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:113)
... 27 more
[ Exception #2 ]
java.lang.RuntimeException: Exception occurred while processing valve output watermark:
at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor$ForwardingValveOutputHandler2.handleWatermark(StreamTwoInputProcessor.java:400)
at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:252)
at org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask.run(TwoInputStreamTask.java:91)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
at java.lang.Thread.run(Thread.java:748)
Caused by: com.esotericsoftware.kryo.KryoException: Unable to find class: SerialNumber6H0G025
Serialization trace:
sensorState (com.mycompany.datascience.datatypes.SensorStateEvent)
at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)
at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:99)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:189)
at org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$copy$1.apply(TraversableSerializer.scala:69)
at org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$copy$1.apply(TraversableSerializer.scala:69)
at scala.collection.immutable.List.foreach(List.scala:392)
at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
at scala.collection.mutable.ListBuffer.foreach(ListBuffer.scala:45)
at org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:69)
at org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:33)
at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:282)
at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:306)
at org.apache.flink.runtime.state.heap.HeapValueState.value(HeapValueState.java:55)
at com.mycompany.datascience.scala.pushnotifier.operators.UserStateSensorStateJoin.onTimer(UserStateSensorStateJoin.scala:145)
at org.apache.flink.streaming.api.operators.co.KeyedCoProcessOperator.onEventTime(KeyedCoProcessOperator.java:95)
at org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:288)
at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:108)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:885)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark2(AbstractStreamOperator.java:912)
at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor$ForwardingValveOutputHandler2.handleWatermark(StreamTwoInputProcessor.java:397)
... 7 more
Caused by: java.lang.ClassNotFoundException: SerialNumber6H0G025
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader.loadClass(FlinkUserCodeClassLoaders.java:128)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:136)
... 30 more
[ Exception #3 ]
java.lang.RuntimeException: Exception occurred while processing valve output watermark:
at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor$ForwardingValveOutputHandler2.handleWatermark(StreamTwoInputProcessor.java:400)
at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:252)
at org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask.run(TwoInputStreamTask.java:91)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ArrayIndexOutOfBoundsException: -1
at com.esotericsoftware.kryo.util.IntArray.pop(IntArray.java:157)
at com.esotericsoftware.kryo.Kryo.reference(Kryo.java:822)
at com.esotericsoftware.kryo.serializers.FieldSerializer.copy(FieldSerializer.java:625)
at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:862)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:175)
at org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$copy$1.apply(TraversableSerializer.scala:69)
at org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$copy$1.apply(TraversableSerializer.scala:69)
at scala.collection.immutable.List.foreach(List.scala:392)
at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
at scala.collection.mutable.ListBuffer.foreach(ListBuffer.scala:45)
at org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:69)
at org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:33)
at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:282)
at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:306)
at org.apache.flink.runtime.state.heap.HeapValueState.value(HeapValueState.java:55)
at com.mycompany.datascience.scala.pushnotifier.operators.UserStateSensorStateJoin.onTimer(UserStateSensorStateJoin.scala:145)
at org.apache.flink.streaming.api.operators.co.KeyedCoProcessOperator.onEventTime(KeyedCoProcessOperator.java:95)
at org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:288)
at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:108)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:885)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark2(AbstractStreamOperator.java:912)
at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor$ForwardingValveOutputHandler2.handleWatermark(StreamTwoInputProcessor.java:397)
... 7 more