I don't know whether this is the same issue, but after switching from my POJOs to BSONObject I hit what looks like a race condition in Kryo serialization. I could complete the process using byte[], but at this point I actually need the POJO. I strongly suspect it is related to the reuse of the Kryo instance, which is not thread-safe.
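To make the pooling idea from my earlier message (quoted below) concrete, here is a minimal sketch of what I mean. It uses only the JDK and is not Flink's or Kryo's actual code: `InstancePool`, `run`, and `idleCount` are hypothetical names of mine, and I am assuming that in the real case the factory would build a Kryo instance (e.g. `Kryo::new`) and apply the same type registrations to every instance it creates.

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Function;
import java.util.function.Supplier;

/**
 * Hypothetical helper: gives each caller exclusive use of one
 * non-thread-safe instance (such as a Kryo object) at a time.
 */
final class InstancePool<T> {
    // Instances that are currently not borrowed by any thread.
    private final ConcurrentLinkedQueue<T> idle = new ConcurrentLinkedQueue<>();
    // Creates a fully configured instance; assumed to register the same
    // types in the same order every time it is called.
    private final Supplier<T> factory;

    InstancePool(Supplier<T> factory) {
        this.factory = factory;
    }

    /** Borrows an instance, runs the action, and always hands the instance back. */
    <R> R run(Function<T, R> action) {
        T instance = idle.poll();       // reuse an idle instance if there is one
        if (instance == null) {
            instance = factory.get();   // otherwise create a fresh one
        }
        try {
            return action.apply(instance);
        } finally {
            idle.offer(instance);       // make it available to the next caller
        }
    }

    /** Number of instances currently waiting in the pool. */
    int idleCount() {
        return idle.size();
    }
}
```

With something like this, a serializer would call `pool.run(kryo -> ...)` around each read or write instead of holding one shared instance, so two threads never touch the same object concurrently. Whether that is actually the cause of the exceptions below is still only my guess.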
------------------------------------------------------------------------------------------------------
2015-10-02 11:55:26 INFO JobClient:161 - 10/02/2015 11:55:26 Cross(Cross at main(FlinkMongoHadoop2LinkPOI2CDA.java:138))(4/4) switched to FAILED
java.lang.IndexOutOfBoundsException: Index: 112, Size: 0
    at java.util.ArrayList.rangeCheck(ArrayList.java:635)
    at java.util.ArrayList.get(ArrayList.java:411)
    at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
    at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:759)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:210)
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:127)
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
    at org.apache.flink.runtime.operators.resettable.AbstractBlockResettableIterator.getNextRecord(AbstractBlockResettableIterator.java:180)
    at org.apache.flink.runtime.operators.resettable.BlockResettableMutableObjectIterator.next(BlockResettableMutableObjectIterator.java:111)
    at org.apache.flink.runtime.operators.CrossDriver.runBlockedOuterSecond(CrossDriver.java:309)
    at org.apache.flink.runtime.operators.CrossDriver.run(CrossDriver.java:162)
    at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:489)
    at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:354)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:581)
    at java.lang.Thread.run(Thread.java:745)

2015-10-02 9:46 GMT+02:00 Stefano Bortoli <s.bort...@gmail.com>:

> here it is: https://issues.apache.org/jira/browse/FLINK-2800
>
> saluti,
> Stefano
>
> 2015-10-01 18:50 GMT+02:00 Stephan Ewen <se...@apache.org>:
>
>> This looks to me like a bug where type registrations are not properly
>> forwarded to all Serializers.
>>
>> Can you open a JIRA ticket for this?
>>
>> On Thu, Oct 1, 2015 at 6:46 PM, Stefano Bortoli <s.bort...@gmail.com>
>> wrote:
>>
>>> Hi guys,
>>>
>>> I hit a Kryo exception while running a process 'crossing' POJOs
>>> datasets. I am using the 0.10-milestone-1.
>>> Checking the serializer:
>>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:210)
>>>
>>> I have noticed that the Kryo instance is reused along serialization
>>> calls (e.g. line 187). However, Kryo is not threadsafe, and therefore I
>>> think it may cause the problem due to possible race condition. We had these
>>> types of issues solved with a KryoFactory implementing a pool. Perhaps it
>>> should just a matter of calling the
>>>
>>> what should I do? Open a ticket?
>>>
>>> Thanks a lot guys for the great job!
>>>
>>> saluti,
>>> Stefano
>>>
>>> -----------------------------------------
>>> com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 114
>>>     at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:119)
>>>     at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
>>>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
>>>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:210)
>>>     at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:127)
>>>     at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
>>>     at org.apache.flink.runtime.operators.resettable.AbstractBlockResettableIterator.getNextRecord(AbstractBlockResettableIterator.java:180)
>>>     at org.apache.flink.runtime.operators.resettable.BlockResettableMutableObjectIterator.next(BlockResettableMutableObjectIterator.java:111)
>>>     at org.apache.flink.runtime.operators.CrossDriver.runBlockedOuterSecond(CrossDriver.java:309)
>>>     at org.apache.flink.runtime.operators.CrossDriver.run(CrossDriver.java:162)
>>>     at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:489)
>>>     at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:354)
>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:581)
>>>     at java.lang.Thread.run(Thread.java:745)
>>>
>>
>