*@Fabian do you register any types / serializers via
ExecutionConfig.registerKryoType(...) /
ExecutionConfig.registerTypeWithKryoSerializer(...)?*

Nope, not at all. Our Flink job code doesn't contain the word "Kryo" anywhere.

Thanks for looking into it!

--


*Fabian Wollert*
*Zalando SE*

E-Mail: fab...@zalando.de

On Thu, Jul 4, 2019 at 11:51 AM Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:

> I quickly checked the implementation of duplicate() for both the
> KryoSerializer and StreamElementSerializer (which are the only serializers
> involved here).
> They seem to be correct; especially for the KryoSerializer, since
> FLINK-8836 [1] we now always perform a deep copy of the KryoSerializer when
> duplicating it, and therefore Kryo instances should not be shared at all
> across duplicates.
> This seems to rule out any duplication issues with the serializers.
>
> As a maybe relevant question, @Fabian do you register any types /
> serializers via ExecutionConfig.registerKryoType(...) /
> ExecutionConfig.registerTypeWithKryoSerializer(...)?
>
> Best,
> Gordon
>
> [1] https://issues.apache.org/jira/browse/FLINK-8836
>
> On Thu, Jul 4, 2019 at 5:29 PM Fabian Wollert <fab...@zalando.de> wrote:
>
>> No, not yet. We lack the knowledge to really understand this. The only
>> thing we found out is that it most probably happens in the Elasticsearch
>> sink, because:
>> - some error messages have the sink in their stack trace.
>> - when we bump the ES node specs on AWS, the error happens less often
>> (we haven't bumped them to very large instances yet, nor reached a state
>> where the errors go away completely; also, this would not be the ideal fix).
>>
>> So my current assumption is that backpressure is not being applied
>> correctly somewhere. But this is very vague, so any other hints or support
>> on this are highly appreciated.
>>
>>
>> On Thu, Jul 4, 2019 at 11:26 AM Flavio Pompermaier <pomperma...@okkam.it> wrote:
>>
>>> Any news on this? Have you found the cause of the error?
>>>
>>> On Fri, Jun 28, 2019 at 10:10 AM Flavio Pompermaier <
>>> pomperma...@okkam.it> wrote:
>>>
>>>> Indeed, looking at StreamElementSerializer, the duplicate() method could
>>>> be buggy:
>>>>
>>>> @Override
>>>> public StreamElementSerializer<T> duplicate() {
>>>>     TypeSerializer<T> copy = typeSerializer.duplicate();
>>>>     return (copy == typeSerializer) ? this : new StreamElementSerializer<T>(copy);
>>>> }
>>>>
>>>> Is it safe to return this when copy == typeSerializer?
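A simplified model may help answer this question. The sketch below uses hypothetical classes (SimpleSerializer, StatelessStringSerializer, BufferingStringSerializer, WrappingSerializer), not Flink's real TypeSerializer hierarchy, and assumes that the wrapper's only mutable state is the inner serializer, as appears to be the case for StreamElementSerializer. Under that assumption, returning `this` looks safe: an inner duplicate() that returns itself is declaring that it can be shared, so the wrapper can be shared too. Note that, per Gordon's reply above, the KryoSerializer deep-copies on every duplicate(), so with Kryo the `this` branch should not be taken at all.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

// Hypothetical, simplified model of the duplicate() contract; these are NOT Flink classes.
interface SimpleSerializer<T> {
    // Returns an instance that may be used by another thread.
    // A serializer without mutable state may return itself.
    SimpleSerializer<T> duplicate();

    byte[] serialize(T value);
}

// No mutable fields, so one instance can safely be shared.
final class StatelessStringSerializer implements SimpleSerializer<String> {
    @Override
    public SimpleSerializer<String> duplicate() {
        return this;
    }

    @Override
    public byte[] serialize(String value) {
        return value.getBytes(StandardCharsets.UTF_8);
    }
}

// Reuses an internal buffer (roughly like a Kryo instance reusing its state),
// so every thread needs its own copy.
final class BufferingStringSerializer implements SimpleSerializer<String> {
    private final ByteArrayOutputStream buffer = new ByteArrayOutputStream();

    @Override
    public SimpleSerializer<String> duplicate() {
        return new BufferingStringSerializer(); // fresh buffer, nothing shared
    }

    @Override
    public byte[] serialize(String value) {
        buffer.reset();
        byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
        buffer.write(bytes, 0, bytes.length);
        return buffer.toByteArray();
    }
}

// Mirrors the StreamElementSerializer pattern: the wrapper's only state is `inner`.
final class WrappingSerializer<T> implements SimpleSerializer<T> {
    final SimpleSerializer<T> inner;

    WrappingSerializer(SimpleSerializer<T> inner) {
        this.inner = inner;
    }

    @Override
    public WrappingSerializer<T> duplicate() {
        SimpleSerializer<T> copy = inner.duplicate();
        // Same shortcut as in the snippet above: if the inner serializer can be shared,
        // the wrapper (which holds no other mutable state) can be shared as well.
        return (copy == inner) ? this : new WrappingSerializer<>(copy);
    }

    @Override
    public byte[] serialize(T value) {
        return inner.serialize(value);
    }
}

class DuplicateDemo {
    public static void main(String[] args) {
        WrappingSerializer<String> stateless = new WrappingSerializer<>(new StatelessStringSerializer());
        WrappingSerializer<String> stateful = new WrappingSerializer<>(new BufferingStringSerializer());

        System.out.println(stateless.duplicate() == stateless);           // true: shared
        System.out.println(stateful.duplicate() == stateful);             // false: new wrapper
        System.out.println(stateful.duplicate().inner == stateful.inner); // false: new buffer
    }
}
```

In other words, the shortcut only becomes a problem if the wrapper itself gains mutable fields, or if an inner serializer returns itself from duplicate() while still holding shared mutable state.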
>>>>
>>>> On Fri, Jun 28, 2019 at 9:51 AM Flavio Pompermaier <
>>>> pomperma...@okkam.it> wrote:
>>>>
>>>>> Hi Fabian,
>>>>> we had similar errors with Flink 1.3 [1][2], and the error was caused
>>>>> by a serializer sharing the same object across multiple threads.
>>>>> The error was not deterministic and changed from time to time.
>>>>> So maybe it could be something similar (IMHO).
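To illustrate how sharing one serializer instance across threads can produce non-deterministic errors like the `IndexOutOfBoundsException: Index: 97, Size: 29` thrown from `MapReferenceResolver.getReadObject` further down in this thread, here is a toy model. `ToyReferenceTable` is a hypothetical, heavily simplified stand-in for a reference resolver (objects read so far are kept in a list, and a back-reference is just an index into it); it is not Kryo's code. The interleaving is replayed step by step so it is reproducible, whereas a real race would only show up occasionally, under load.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, heavily simplified stand-in for a reference resolver:
// objects read so far are kept in a list, and a back-reference is an index into it.
final class ToyReferenceTable {
    private final List<Object> readObjects = new ArrayList<>();

    int add(Object o) {
        readObjects.add(o);
        return readObjects.size() - 1;
    }

    // Throws IndexOutOfBoundsException if the index is stale.
    Object resolve(int id) {
        return readObjects.get(id);
    }

    // Called at the end of each top-level record.
    void reset() {
        readObjects.clear();
    }
}

final class SharedTableDemo {
    // Replays one interleaving of two readers, A and B, step by step.
    static String interleave(ToyReferenceTable forA, ToyReferenceTable forB) {
        // Reader A is in the middle of a record and has registered three objects.
        forA.add("a0");
        forA.add("a1");
        int id = forA.add("a2");
        // Reader B finishes its own record and resets its table.
        forB.add("b0");
        forB.reset();
        // Reader A now follows a back-reference to an object it read earlier.
        try {
            return "resolved " + forA.resolve(id);
        } catch (IndexOutOfBoundsException e) {
            return "IndexOutOfBoundsException";
        }
    }

    public static void main(String[] args) {
        ToyReferenceTable shared = new ToyReferenceTable();
        System.out.println(interleave(shared, shared)); // IndexOutOfBoundsException
        System.out.println(interleave(new ToyReferenceTable(), new ToyReferenceTable())); // resolved a2
    }
}
```

With one table per reader the back-reference resolves correctly; with a shared table, B's reset() silently invalidates A's indices, which is the kind of failure that would vary from run to run in a real race.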
>>>>>
>>>>> [1] http://codeha.us/apache-flink-users/msg02010.html
>>>>> [2] http://mail-archives.apache.org/mod_mbox/flink-user/201606.mbox/%3ccaeluf_aic_izyw5f27knter_y6h4+nzg2cpniozqdgm+wk7...@mail.gmail.com%3e
>>>>>
>>>>> Best,
>>>>> Flavio
>>>>>
>>>>> On Fri, Jun 28, 2019 at 8:52 AM Fabian Wollert <fab...@zalando.de>
>>>>> wrote:
>>>>>
>>>>>> Additionally, we get these all the time as well:
>>>>>>
>>>>>> com.esotericsoftware.kryo.KryoException: java.lang.ArrayIndexOutOfBoundsException
>>>>>> Serialization trace:
>>>>>> _children (com.fasterxml.jackson.databind.node.ObjectNode)
>>>>>>  at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
>>>>>>  at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>>>>>>  at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>>>>>>  at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:315)
>>>>>>  at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:209)
>>>>>>  at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:50)
>>>>>>  at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
>>>>>>  at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:141)
>>>>>>  at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:172)
>>>>>>  at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
>>>>>>  at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
>>>>>>  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
>>>>>>  at java.lang.Thread.run(Thread.java:748)
>>>>>> Caused by: java.lang.ArrayIndexOutOfBoundsException
>>>>>>
>>>>>>
>>>>>> or
>>>>>>
>>>>>>
>>>>>> com.esotericsoftware.kryo.KryoException: java.lang.IndexOutOfBoundsException: Index: 97, Size: 29
>>>>>> Serialization trace:
>>>>>> _children (com.fasterxml.jackson.databind.node.ObjectNode)
>>>>>>  at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
>>>>>>  at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>>>>>>  at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>>>>>>  at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:315)
>>>>>>  at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:209)
>>>>>>  at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:50)
>>>>>>  at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
>>>>>>  at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:141)
>>>>>>  at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:172)
>>>>>>  at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
>>>>>>  at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
>>>>>>  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
>>>>>>  at java.lang.Thread.run(Thread.java:748)
>>>>>> Caused by: java.lang.IndexOutOfBoundsException: Index: 97, Size: 29
>>>>>>  at java.util.ArrayList.rangeCheck(ArrayList.java:657)
>>>>>>  at java.util.ArrayList.get(ArrayList.java:433)
>>>>>>  at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
>>>>>>  at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
>>>>>>  at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:728)
>>>>>>  at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:131)
>>>>>>  at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
>>>>>>  at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
>>>>>>  at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
>>>>>>  ... 12 more
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Thu, Jun 27, 2019 at 6:29 PM Fabian Wollert <fab...@zalando.de> wrote:
>>>>>>
>>>>>>> Hi, we have some Flink jobs (Flink version 1.7.1) consuming from a
>>>>>>> custom source and ingesting into an Elasticsearch cluster (v5.6).
>>>>>>> Recently, we have been seeing more and more exceptions like this:
>>>>>>>
>>>>>>> com.esotericsoftware.kryo.KryoException: Unable to find class: com. ^
>>>>>>>         at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)
>>>>>>>         at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
>>>>>>>         at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
>>>>>>>         at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
>>>>>>>         at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:315)
>>>>>>>         at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:209)
>>>>>>>         at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:50)
>>>>>>>         at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
>>>>>>>         at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:141)
>>>>>>>         at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:172)
>>>>>>>         at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
>>>>>>>         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
>>>>>>>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
>>>>>>>         at java.lang.Thread.run(Thread.java:748)
>>>>>>> Caused by: java.lang.ClassNotFoundException: com. ^
>>>>>>>         at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>>>>>>>         at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>>>>>>>         at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader.loadClass(FlinkUserCodeClassLoaders.java:129)
>>>>>>>         at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>>>>>>>         at java.lang.Class.forName0(Native Method)
>>>>>>>         at java.lang.Class.forName(Class.java:348)
>>>>>>>         at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:136)
>>>>>>>         ... 13 more
>>>>>>>
>>>>>>> or
>>>>>>>
>>>>>>> com.esotericsoftware.kryo.KryoException: Unable to find class: com.fasterxml.jackson.databind.node.DoubleNod
>>>>>>> com.fasterxml.jackson.databind.node.ObjectNode
>>>>>>> Serialization trace:
>>>>>>> _children (com.fasterxml.jackson.databind.node.ObjectNode)
>>>>>>>         at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)
>>>>>>>         at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
>>>>>>>         at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
>>>>>>>         at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
>>>>>>>         at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143)
>>>>>>>         at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
>>>>>>>         at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
>>>>>>>         at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
>>>>>>>         at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>>>>>>>         at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>>>>>>>         at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:315)
>>>>>>>         at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:209)
>>>>>>>         at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:50)
>>>>>>>         at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
>>>>>>>         at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:141)
>>>>>>>         at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:172)
>>>>>>>         at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
>>>>>>>         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
>>>>>>>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
>>>>>>>         at java.lang.Thread.run(Thread.java:748)
>>>>>>> Caused by: java.lang.ClassNotFoundException: com.fasterxml.jackson.databind.node.DoubleNod
>>>>>>> com.fasterxml.jackson.databind.node.ObjectNode
>>>>>>>         at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>>>>>>>         at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>>>>>>>         at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader.loadClass(FlinkUserCodeClassLoaders.java:129)
>>>>>>>         at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>>>>>>>         at java.lang.Class.forName0(Native Method)
>>>>>>>         at java.lang.Class.forName(Class.java:348)
>>>>>>>         at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:136)
>>>>>>>         ... 19 more
>>>>>>>
>>>>>>> I guess the serialization between steps in the TaskManager fails
>>>>>>> somewhere. Unfortunately, it happens very unpredictably. My question is:
>>>>>>> has anyone seen this before? If so, how did you approach debugging it?
>>>>>>> Right now we see this problem mostly with high-volume event processing,
>>>>>>> i.e. it only appears under high load. I already tried to investigate
>>>>>>> with TRACE log level, but that keeps the instance busy writing tons of
>>>>>>> logs, which slows down processing so that the exception is eventually
>>>>>>> no longer triggered. I'm wondering whether there is another way to
>>>>>>> investigate this.
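One option that avoids TRACE logging is to check directly whether a serializer instance is ever used by two threads at once, which costs only an atomic compare-and-set per call. The sketch below is a hypothetical helper, not a Flink or Kryo API; wiring it into a job (for example, calling enter()/exit() around serialize/deserialize in a custom serializer wrapper) is not shown and would need to be adapted to your setup. It assumes calls are not nested on the same thread.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical debugging aid (not a Flink or Kryo API): records whether two
// threads were ever inside the guarded section at the same time.
final class ExclusiveAccessGuard {
    private final AtomicReference<Thread> owner = new AtomicReference<>();
    private final AtomicBoolean violation = new AtomicBoolean(false);

    void enter() {
        Thread current = Thread.currentThread();
        // If another thread already owns the section, this is concurrent use.
        if (!owner.compareAndSet(null, current) && owner.get() != current) {
            violation.set(true); // a real setup might log both thread names here, once
        }
    }

    void exit() {
        owner.compareAndSet(Thread.currentThread(), null); // only the owner clears it
    }

    boolean violated() {
        return violation.get();
    }
}

final class GuardDemo {
    // Forces an overlap: the main thread enters while a second thread is still inside.
    static boolean runOverlapping() {
        ExclusiveAccessGuard guard = new ExclusiveAccessGuard();
        CountDownLatch firstInside = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        Thread first = new Thread(() -> {
            guard.enter();
            firstInside.countDown();
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            guard.exit();
        });
        first.start();
        try {
            firstInside.await(); // the other thread is now "inside the serializer"
            guard.enter();
            guard.exit();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            release.countDown(); // always let the other thread finish
        }
        try {
            first.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return guard.violated();
    }

    // Same thread, one call after another: no concurrent use.
    static boolean runSequential() {
        ExclusiveAccessGuard guard = new ExclusiveAccessGuard();
        guard.enter();
        guard.exit();
        guard.enter();
        guard.exit();
        return guard.violated();
    }

    public static void main(String[] args) {
        System.out.println("overlapping: " + runOverlapping()); // true
        System.out.println("sequential:  " + runSequential());  // false
    }
}
```

Because the check only flips a flag, reporting it once (rather than logging every record) should keep the overhead small enough not to hide a timing-dependent failure, though that remains an assumption until tried under the real load.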
>>>>>>>
>>>>>>> Thanks in advance for any hints on how to debug this.
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>>
>>>
