I quickly checked the implementation of duplicate() for both the
KryoSerializer and StreamElementSerializer (which are the only serializers
involved here).
Both seem to be correct. For the KryoSerializer in particular, since
FLINK-8836 [1] we always perform a deep copy of the KryoSerializer when
duplicating it, so Kryo instances should not be shared across duplicates
at all.
This seems to rule out any duplication issues with the serializers.
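The sketch below illustrates the duplicate() contract described above. It uses a hypothetical RefTrackingSerializer class, not Flink's KryoSerializer, which models Kryo-style reference tracking as mutable per-instance state. It assumes a deep-copying duplicate() simply gives the copy its own empty table. A real duplicate would also carry over configuration such as registered types, which is omitted here.

```java
import java.util.IdentityHashMap;
import java.util.Map;

/**
 * Hypothetical model of a stateful serializer (NOT Flink's KryoSerializer).
 * Like Kryo's reference tracking, it assigns each distinct object an ID the
 * first time it is written, so its table is mutable per-instance state.
 */
final class RefTrackingSerializer {
    private final Map<Object, Integer> ids = new IdentityHashMap<>();

    /** Returns the reference ID this instance assigned to {@code o}. */
    int write(Object o) {
        Integer id = ids.get(o);
        if (id == null) {
            id = ids.size();
            ids.put(o, id);
        }
        return id;
    }

    int trackedObjects() {
        return ids.size();
    }

    /**
     * Deep-copying duplicate (assumption: configuration is ignored here).
     * The copy starts with its own empty table, so threads holding
     * different duplicates can never corrupt each other's reference IDs.
     */
    RefTrackingSerializer duplicate() {
        return new RefTrackingSerializer();
    }
}
```

With this contract, each thread works on its own duplicate, so reference IDs written by one thread never appear in another thread's table.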

A possibly relevant question, @Fabian: do you register any types /
serializers via ExecutionConfig.registerKryoType(...) /
ExecutionConfig.registerTypeWithKryoSerializer(...)?

Best,
Gordon

[1] https://issues.apache.org/jira/browse/FLINK-8836

On Thu, Jul 4, 2019 at 5:29 PM Fabian Wollert <fab...@zalando.de> wrote:

> No, not yet. We lack the knowledge to understand this. The only thing we
> found out is that it most probably happens in the Elasticsearch sink,
> because:
> - some error messages have the sink in their stack trace.
> - when we bump the specs of the ES nodes on AWS, the error happens less
> often (we haven't bumped them to very large instances yet, nor reached a
> state where the errors go away completely; also, this would not be the
> ideal fix).
>
> So my current assumption is that backpressure is not being applied
> correctly somewhere. But this is very vague; any other hints or support on
> this are highly appreciated.
>
> --
>
>
> *Fabian Wollert, Zalando SE*
>
> E-Mail: fab...@zalando.de
>
>
> On Thu, Jul 4, 2019 at 11:26 AM Flavio Pompermaier <pomperma...@okkam.it>
> wrote:
>
>> Any news on this? Have you found the cause of the error?
>>
>> On Fri, Jun 28, 2019 at 10:10 AM Flavio Pompermaier <pomperma...@okkam.it>
>> wrote:
>>
>>> Indeed, looking at StreamElementSerializer, the duplicate() method could
>>> be buggy:
>>>
>>> @Override
>>> public StreamElementSerializer<T> duplicate() {
>>>       TypeSerializer<T> copy = typeSerializer.duplicate();
>>>       return (copy == typeSerializer) ? this : new StreamElementSerializer<T>(copy);
>>> }
>>>
>>> Is it safe to return this when copy == typeSerializer?
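One way to reason about this question: the identity shortcut is only safe if (a) the wrapper holds no mutable state of its own and (b) the inner serializer returns itself from duplicate() only when it is stateless. The sketch below models this with hypothetical classes (SimpleSerializer, StatelessStringSerializer, StatefulStringSerializer, ElementWrapperSerializer), not Flink's actual TypeSerializer hierarchy. Whether StreamElementSerializer satisfies (a) is an assumption worth checking against the Flink source.

```java
/** Hypothetical minimal contract: stateless serializers may return themselves. */
interface SimpleSerializer<T> {
    SimpleSerializer<T> duplicate();
}

/** Stateless: no mutable fields, so sharing one instance is safe. */
final class StatelessStringSerializer implements SimpleSerializer<String> {
    @Override
    public SimpleSerializer<String> duplicate() {
        return this;
    }
}

/** Stateful: owns a mutable scratch buffer, so every duplicate is a new object. */
final class StatefulStringSerializer implements SimpleSerializer<String> {
    final StringBuilder scratch = new StringBuilder();

    @Override
    public SimpleSerializer<String> duplicate() {
        return new StatefulStringSerializer();
    }
}

/** Wrapper mirroring the quoted pattern; it adds no mutable state itself. */
final class ElementWrapperSerializer<T> implements SimpleSerializer<T> {
    final SimpleSerializer<T> inner;

    ElementWrapperSerializer(SimpleSerializer<T> inner) {
        this.inner = inner;
    }

    @Override
    public ElementWrapperSerializer<T> duplicate() {
        SimpleSerializer<T> copy = inner.duplicate();
        // Same inner instance means the inner serializer is stateless; since the
        // wrapper has no state of its own, returning this shares nothing mutable.
        return (copy == inner) ? this : new ElementWrapperSerializer<T>(copy);
    }
}
```

Under these assumptions, returning this only happens when nothing mutable could be shared. A stateful inner serializer always produces a fresh wrapper around a fresh inner copy.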
>>>
>>> On Fri, Jun 28, 2019 at 9:51 AM Flavio Pompermaier <pomperma...@okkam.it>
>>> wrote:
>>>
>>>> Hi Fabian,
>>>> we had similar errors with Flink 1.3 [1][2], and the error was caused by
>>>> a serializer sharing the same object across multiple threads.
>>>> The error was not deterministic and changed from time to time.
>>>> So maybe it is something similar here (IMHO).
>>>>
>>>> [1] http://codeha.us/apache-flink-users/msg02010.html
>>>> [2] http://mail-archives.apache.org/mod_mbox/flink-user/201606.mbox/%3ccaeluf_aic_izyw5f27knter_y6h4+nzg2cpniozqdgm+wk7...@mail.gmail.com%3e
>>>>
>>>> Best,
>>>> Flavio
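Flavio's hypothesis (one serializer instance used from several threads) can be tested cheaply without TRACE logging. The following is a minimal sketch. It assumes each serialize/deserialize call on a suspect instance can be routed through a hypothetical ThreadOwnershipGuard, which is not a Flink or Kryo API. Wiring it into Flink would need a custom serializer wrapper, not shown here. The first thread to use the instance claims it, and any other thread fails fast with a message naming both threads.

```java
import java.util.concurrent.atomic.AtomicReference;

/**
 * Hypothetical debugging guard (not part of Flink or Kryo): remembers the
 * first thread that uses a serializer instance and throws if a different
 * thread touches the same instance later.
 */
final class ThreadOwnershipGuard {
    private final AtomicReference<Thread> owner = new AtomicReference<>();
    private final String name;

    ThreadOwnershipGuard(String name) {
        this.name = name;
    }

    /** Call at the start of every serialize/deserialize on the guarded instance. */
    void check() {
        Thread current = Thread.currentThread();
        // The first caller claims ownership atomically; later callers must match.
        if (!owner.compareAndSet(null, current) && owner.get() != current) {
            throw new IllegalStateException(
                name + " used by thread " + current.getName()
                    + " but owned by thread " + owner.get().getName());
        }
    }
}
```

The check costs one atomic read per call and logs nothing. It should therefore disturb the high-load timing far less than TRACE logging, though that is an expectation, not a measurement.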
>>>>
>>>> On Fri, Jun 28, 2019 at 8:52 AM Fabian Wollert <fab...@zalando.de>
>>>> wrote:
>>>>
>>>>> Additionally, we constantly get these as well:
>>>>>
>>>>> com.esotericsoftware.kryo.KryoException: java.lang.ArrayIndexOutOfBoundsException
>>>>> Serialization trace:
>>>>> _children (com.fasterxml.jackson.databind.node.ObjectNode)
>>>>>   at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
>>>>>   at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>>>>>   at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>>>>>   at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:315)
>>>>>   at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:209)
>>>>>   at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:50)
>>>>>   at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
>>>>>   at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:141)
>>>>>   at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:172)
>>>>>   at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
>>>>>   at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
>>>>>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
>>>>>   at java.lang.Thread.run(Thread.java:748)
>>>>> Caused by: java.lang.ArrayIndexOutOfBoundsException
>>>>>
>>>>>
>>>>> or
>>>>>
>>>>>
>>>>> com.esotericsoftware.kryo.KryoException: java.lang.IndexOutOfBoundsException: Index: 97, Size: 29
>>>>> Serialization trace:
>>>>> _children (com.fasterxml.jackson.databind.node.ObjectNode)
>>>>>   at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
>>>>>   at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>>>>>   at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>>>>>   at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:315)
>>>>>   at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:209)
>>>>>   at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:50)
>>>>>   at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
>>>>>   at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:141)
>>>>>   at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:172)
>>>>>   at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
>>>>>   at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
>>>>>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
>>>>>   at java.lang.Thread.run(Thread.java:748)
>>>>> Caused by: java.lang.IndexOutOfBoundsException: Index: 97, Size: 29
>>>>>   at java.util.ArrayList.rangeCheck(ArrayList.java:657)
>>>>>   at java.util.ArrayList.get(ArrayList.java:433)
>>>>>   at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
>>>>>   at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
>>>>>   at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:728)
>>>>>   at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:131)
>>>>>   at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
>>>>>   at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
>>>>>   at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
>>>>>   ... 12 more
>>>>>
>>>>>
>>>>>
>>>>> On Thu, Jun 27, 2019 at 6:29 PM Fabian Wollert <fab...@zalando.de>
>>>>> wrote:
>>>>>
>>>>>> Hi, we have some Flink jobs (Flink version 1.7.1) consuming from a
>>>>>> custom source and ingesting into an Elasticsearch cluster (v5.6).
>>>>>> Recently, we have been seeing more and more exceptions like this:
>>>>>>
>>>>>> com.esotericsoftware.kryo.KryoException: Unable to find class: com. ^
>>>>>>  at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)
>>>>>>  at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
>>>>>>  at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
>>>>>>  at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
>>>>>>  at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:315)
>>>>>>  at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:209)
>>>>>>  at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:50)
>>>>>>  at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
>>>>>>  at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:141)
>>>>>>  at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:172)
>>>>>>  at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
>>>>>>  at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
>>>>>>  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
>>>>>>  at java.lang.Thread.run(Thread.java:748)
>>>>>> Caused by: java.lang.ClassNotFoundException: com. ^
>>>>>>  at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>>>>>>  at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>>>>>>  at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader.loadClass(FlinkUserCodeClassLoaders.java:129)
>>>>>>  at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>>>>>>  at java.lang.Class.forName0(Native Method)
>>>>>>  at java.lang.Class.forName(Class.java:348)
>>>>>>  at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:136)
>>>>>>  ... 13 more
>>>>>>
>>>>>> or
>>>>>>
>>>>>> com.esotericsoftware.kryo.KryoException: Unable to find class: 
>>>>>> com.fasterxml.jackson.databind.node.DoubleNod    
>>>>>> com.fasterxml.jackson.databind.node.ObjectNode
>>>>>> Serialization trace:
>>>>>> _children (com.fasterxml.jackson.databind.node.ObjectNode)
>>>>>>  at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)
>>>>>>  at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
>>>>>>  at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
>>>>>>  at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
>>>>>>  at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143)
>>>>>>  at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
>>>>>>  at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
>>>>>>  at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
>>>>>>  at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>>>>>>  at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>>>>>>  at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:315)
>>>>>>  at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:209)
>>>>>>  at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:50)
>>>>>>  at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
>>>>>>  at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:141)
>>>>>>  at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:172)
>>>>>>  at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
>>>>>>  at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
>>>>>>  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
>>>>>>  at java.lang.Thread.run(Thread.java:748)
>>>>>> Caused by: java.lang.ClassNotFoundException: 
>>>>>> com.fasterxml.jackson.databind.node.DoubleNod    
>>>>>> com.fasterxml.jackson.databind.node.ObjectNode
>>>>>>  at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>>>>>>  at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>>>>>>  at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader.loadClass(FlinkUserCodeClassLoaders.java:129)
>>>>>>  at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>>>>>>  at java.lang.Class.forName0(Native Method)
>>>>>>  at java.lang.Class.forName(Class.java:348)
>>>>>>  at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:136)
>>>>>>  ... 19 more
>>>>>>
>>>>>> I guess the serialization between steps in the TaskManager fails
>>>>>> somewhere. Unfortunately, it happens very unpredictably. My question is:
>>>>>> has anyone seen this before? If so, what was your approach to debugging
>>>>>> it? Right now we see this problem mostly during high-volume event
>>>>>> processing; it only appears when a high load is being processed. I
>>>>>> already tried to investigate with the TRACE log level, but that keeps the
>>>>>> instance it runs on busy writing tons of logs, which slows down
>>>>>> processing so that the exception is no longer triggered. I'm wondering
>>>>>> whether there is another way to investigate this.
>>>>>>
>>>>>> Thanks in advance for any hints on how to debug this.
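As an alternative to TRACE logging, one common low-overhead approach is a "flight recorder" that keeps only the last few events in memory and dumps them when the failure actually occurs. The sketch below uses a hypothetical RecentEventsBuffer class, which is not part of Flink or any logging library. It is not thread-safe and assumes one instance per task thread.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical in-memory ring buffer (not a Flink or logging-library API):
 * retains only the most recent {@code capacity} event descriptions and writes
 * nothing anywhere until snapshot() is called. Not thread-safe by design.
 */
final class RecentEventsBuffer {
    private final String[] slots;
    private long written; // total number of events recorded so far

    RecentEventsBuffer(int capacity) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity must be > 0");
        }
        this.slots = new String[capacity];
    }

    /** Constant-time: overwrites the oldest slot once the buffer is full. */
    void record(String event) {
        slots[(int) (written % slots.length)] = event;
        written++;
    }

    /** Returns the retained events, oldest first. */
    List<String> snapshot() {
        int size = (int) Math.min(written, slots.length);
        List<String> out = new ArrayList<>(size);
        for (long i = written - size; i < written; i++) {
            out.add(slots[(int) (i % slots.length)]);
        }
        return out;
    }
}
```

For example, record(...) could be called with a short summary of each element in the suspected operator, and snapshot() logged from whatever error handler observes the failure. Since nothing is written to disk until then, the timing under high load should change much less than with TRACE logging.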
>>>>>>
>>>>>
>>>>
>>>
>>>
>>
