Hi,
this looks like the bug "when duplicating a KryoSerializer does not duplicate 
registered default serializers", and this has been fixed on the branch master, 
1.5.0, and 1.4.x. But, unfortunately not included in 1.4.2(because this bug was 
discovered after 1.4.2 release). @Stefan plz correct me if I'm wrong.




Best, Sihua


On 05/25/2018 05:55,Ya-Te Wong<[email protected]> wrote:
Hello,

We're using Flink version 1.4.2.
Our Flink job runs pretty well most of the time. But sometimes we see 
exceptions in the Kryo serializer.
The timing on when the exceptions would occur seems pretty random.
Sometimes we don't see any exceptions for 5 days. Sometimes we get exceptions 
within hours.
I have captured the stack traces of the last 3 times that the exceptions 
occurred. They are not exactly the same. The commonality is that our code in 
onTimer() is triggered as part of watermark handling. Our code then tried to 
get from the copy-on-write state table and eventually exception occurred in 
Kryo.

Has anyone seen something like that before?

Thanks,



[ Exception #1 ]

java.lang.RuntimeException: Exception occurred while processing valve output 
watermark:
at 
org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor$ForwardingValveOutputHandler2.handleWatermark(StreamTwoInputProcessor.java:400)
at 
org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
at 
org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
at 
org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:252)
at 
org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask.run(TwoInputStreamTask.java:91)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
at java.lang.Thread.run(Thread.java:748)
Caused by: com.esotericsoftware.kryo.KryoException: 
java.lang.IndexOutOfBoundsException: Index: 3, Size: 0
Serialization trace:
timestamp (com.mycompany.datascience.datatypes.SensorStateEvent)
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
at 
com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657)
at 
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:189)
at 
org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$copy$1.apply(TraversableSerializer.scala:69)
at 
org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$copy$1.apply(TraversableSerializer.scala:69)
at scala.collection.immutable.List.foreach(List.scala:392)
at 
scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
at scala.collection.mutable.ListBuffer.foreach(ListBuffer.scala:45)
at 
org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:69)
at 
org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:33)
at 
org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:282)
at 
org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:306)
at 
org.apache.flink.runtime.state.heap.HeapValueState.value(HeapValueState.java:55)
at 
com.mycompany.datascience.scala.pushnotifier.operators.UserStateSensorStateJoin.onTimer(UserStateSensorStateJoin.scala:145)
at 
org.apache.flink.streaming.api.operators.co.KeyedCoProcessOperator.onEventTime(KeyedCoProcessOperator.java:95)
at 
org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:288)
at 
org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:108)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:885)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark2(AbstractStreamOperator.java:912)
at 
org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor$ForwardingValveOutputHandler2.handleWatermark(StreamTwoInputProcessor.java:397)
... 7 more
Caused by: java.lang.IndexOutOfBoundsException: Index: 3, Size: 0
at java.util.ArrayList.rangeCheck(ArrayList.java:653)
at java.util.ArrayList.set(ArrayList.java:444)
at 
com.esotericsoftware.kryo.util.MapReferenceResolver.setReadObject(MapReferenceResolver.java:38)
at com.esotericsoftware.kryo.Kryo.reference(Kryo.java:823)
at 
com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:524)
at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:730)
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:113)
... 27 more



[ Exception #2 ]

java.lang.RuntimeException: Exception occurred while processing valve output 
watermark:
at 
org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor$ForwardingValveOutputHandler2.handleWatermark(StreamTwoInputProcessor.java:400)
at 
org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
at 
org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
at 
org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:252)
at 
org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask.run(TwoInputStreamTask.java:91)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
at java.lang.Thread.run(Thread.java:748)
Caused by: com.esotericsoftware.kryo.KryoException: Unable to find class: 
SerialNumber6H0G025
Serialization trace:
sensorState (com.mycompany.datascience.datatypes.SensorStateEvent)
at 
com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)
at 
com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:99)
at 
com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657)
at 
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:189)
at 
org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$copy$1.apply(TraversableSerializer.scala:69)
at 
org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$copy$1.apply(TraversableSerializer.scala:69)
at scala.collection.immutable.List.foreach(List.scala:392)
at 
scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
at scala.collection.mutable.ListBuffer.foreach(ListBuffer.scala:45)
at 
org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:69)
at 
org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:33)
at 
org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:282)
at 
org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:306)
at 
org.apache.flink.runtime.state.heap.HeapValueState.value(HeapValueState.java:55)
at 
com.mycompany.datascience.scala.pushnotifier.operators.UserStateSensorStateJoin.onTimer(UserStateSensorStateJoin.scala:145)
at 
org.apache.flink.streaming.api.operators.co.KeyedCoProcessOperator.onEventTime(KeyedCoProcessOperator.java:95)
at 
org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:288)
at 
org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:108)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:885)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark2(AbstractStreamOperator.java:912)
at 
org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor$ForwardingValveOutputHandler2.handleWatermark(StreamTwoInputProcessor.java:397)
... 7 more
Caused by: java.lang.ClassNotFoundException: SerialNumber6H0G025
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at 
org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader.loadClass(FlinkUserCodeClassLoaders.java:128)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at 
com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:136)
... 30 more



[ Exception #3 ]

java.lang.RuntimeException: Exception occurred while processing valve output 
watermark:
at 
org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor$ForwardingValveOutputHandler2.handleWatermark(StreamTwoInputProcessor.java:400)
at 
org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
at 
org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
at 
org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:252)
at 
org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask.run(TwoInputStreamTask.java:91)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ArrayIndexOutOfBoundsException: -1
at com.esotericsoftware.kryo.util.IntArray.pop(IntArray.java:157)
at com.esotericsoftware.kryo.Kryo.reference(Kryo.java:822)
at 
com.esotericsoftware.kryo.serializers.FieldSerializer.copy(FieldSerializer.java:625)
at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:862)
at 
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:175)
at 
org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$copy$1.apply(TraversableSerializer.scala:69)
at 
org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$copy$1.apply(TraversableSerializer.scala:69)
at scala.collection.immutable.List.foreach(List.scala:392)
at 
scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
at scala.collection.mutable.ListBuffer.foreach(ListBuffer.scala:45)
at 
org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:69)
at 
org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:33)
at 
org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:282)
at 
org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:306)
at 
org.apache.flink.runtime.state.heap.HeapValueState.value(HeapValueState.java:55)
at 
com.mycompany.datascience.scala.pushnotifier.operators.UserStateSensorStateJoin.onTimer(UserStateSensorStateJoin.scala:145)
at 
org.apache.flink.streaming.api.operators.co.KeyedCoProcessOperator.onEventTime(KeyedCoProcessOperator.java:95)
at 
org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:288)
at 
org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:108)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:885)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark2(AbstractStreamOperator.java:912)
at 
org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor$ForwardingValveOutputHandler2.handleWatermark(StreamTwoInputProcessor.java:397)
... 7 more





Reply via email to