Hi Stefano, we'll definitely look into it once Flink Forward is over and we've finished the current release work. Thanks for reporting the issue.
Cheers, Till On Tue, Oct 6, 2015 at 9:21 AM, Stefano Bortoli <bort...@okkam.it> wrote: > Hi guys, I could manage to complete the process crossing byte arrays I > deserialize within the group function. However, I think this workaround is > feasible just with relatively simple processes. Any idea/plan about to fix > the serialization problem? > > saluti, > Stefano > > Stefano Bortoli, PhD > > *ENS Technical Director *_______________________________________________ > *OKKAM**Srl **- www.okkam.it <http://www.okkam.it/>* > > *Email:* bort...@okkam.it > > *Phone nr: +39 0461 1823913 <%2B39%200461%201823913> * > > *Headquarters:* Trento (Italy), Via Trener 8 > *Registered office:* Trento (Italy), via Segantini 23 > > Confidentially notice. This e-mail transmission may contain legally > privileged and/or confidential information. Please do not read it if you > are not the intended recipient(S). Any use, distribution, reproduction or > disclosure by any other person is strictly prohibited. If you have received > this e-mail in error, please notify the sender and destroy the original > transmission and its attachments without reading or saving it in any manner. > > 2015-10-02 12:05 GMT+02:00 Stefano Bortoli <s.bort...@gmail.com>: > >> I don't know whether it is the same issue, but after switching from my >> POJOs to BSONObject I have got a race condition issue with kryo >> serialization. >> I could complete the process using the byte[], but at this point I >> actually need the POJO. I truly believe it is related to the reuse of the >> Kryo instance, which is not thread safe. >> >> >> ------------------------------------------------------------------------------------------------------ >> 2015-10-02 11:55:26 INFO JobClient:161 - 10/02/2015 11:55:26 >> Cross(Cross at main(FlinkMongoHadoop2LinkPOI2CDA.java:138))(4/4) switched >> to FAILED >> java.lang.IndexOutOfBoundsException: Index: 112, Size: 0 >> at java.util.ArrayList.rangeCheck(ArrayList.java:635) >> at java.util.ArrayList.get(ArrayList.java:411) >> at >> com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42) >> at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805) >> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:759) >> at >> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:210) >> at >> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:127) >> at >> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30) >> at >> org.apache.flink.runtime.operators.resettable.AbstractBlockResettableIterator.getNextRecord(AbstractBlockResettableIterator.java:180) >> at >> org.apache.flink.runtime.operators.resettable.BlockResettableMutableObjectIterator.next(BlockResettableMutableObjectIterator.java:111) >> at >> org.apache.flink.runtime.operators.CrossDriver.runBlockedOuterSecond(CrossDriver.java:309) >> at >> org.apache.flink.runtime.operators.CrossDriver.run(CrossDriver.java:162) >> at >> org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:489) >> at >> org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:354) >> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:581) >> at java.lang.Thread.run(Thread.java:745) >> >> 2015-10-02 9:46 GMT+02:00 Stefano Bortoli <s.bort...@gmail.com>: >> >>> here it is: https://issues.apache.org/jira/browse/FLINK-2800 >>> >>> saluti, >>> Stefano >>> >>> 2015-10-01 18:50 GMT+02:00 Stephan Ewen <se...@apache.org>: >>> >>>> This looks to me like a bug where type registrations are not properly >>>> forwarded to all Serializers. >>>> >>>> Can you open a JIRA ticket for this? >>>> >>>> On Thu, Oct 1, 2015 at 6:46 PM, Stefano Bortoli <s.bort...@gmail.com> >>>> wrote: >>>> >>>>> Hi guys, >>>>> >>>>> I hit a Kryo exception while running a process 'crossing' POJOs >>>>> datasets. I am using the 0.10-milestone-1. >>>>> Checking the serializer: >>>>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:210) >>>>> >>>>> I have noticed that the Kryo instance is reused along serialization >>>>> calls (e.g. line 187). However, Kryo is not threadsafe, and therefore I >>>>> think it may cause the problem due to possible race condition. We had >>>>> these >>>>> types of issues solved with a KryoFactory implementing a pool. Perhaps it >>>>> should just a matter of calling the >>>>> >>>>> what should I do? Open a ticket? >>>>> >>>>> Thanks a lot guys for the great job! >>>>> >>>>> saluti, >>>>> Stefano >>>>> >>>>> ----------------------------------------- >>>>> com.esotericsoftware.kryo.KryoException: Encountered unregistered >>>>> class ID: 114 >>>>> at >>>>> com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:119) >>>>> at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641) >>>>> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752) >>>>> at >>>>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:210) >>>>> at >>>>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:127) >>>>> at >>>>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30) >>>>> at >>>>> org.apache.flink.runtime.operators.resettable.AbstractBlockResettableIterator.getNextRecord(AbstractBlockResettableIterator.java:180) >>>>> at >>>>> org.apache.flink.runtime.operators.resettable.BlockResettableMutableObjectIterator.next(BlockResettableMutableObjectIterator.java:111) >>>>> at >>>>> org.apache.flink.runtime.operators.CrossDriver.runBlockedOuterSecond(CrossDriver.java:309) >>>>> at >>>>> org.apache.flink.runtime.operators.CrossDriver.run(CrossDriver.java:162) >>>>> at >>>>> org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:489) >>>>> at >>>>> org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:354) >>>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:581) >>>>> at java.lang.Thread.run(Thread.java:745) >>>>> >>>> >>>> >>> >> >