Hi Stefano,

we'll definitely look into it once Flink Forward is over and we've finished
the current release work. Thanks for reporting the issue.

Cheers,
Till

On Tue, Oct 6, 2015 at 9:21 AM, Stefano Bortoli <bort...@okkam.it> wrote:

> Hi guys, I could manage to complete the process crossing byte arrays I
> deserialize within the group function. However, I think this workaround is
> feasible just with relatively simple processes. Any idea/plan about to fix
> the serialization problem?
>
> saluti,
> Stefano
>
> Stefano Bortoli, PhD
>
> *ENS Technical Director *_______________________________________________
> *OKKAM**Srl **- www.okkam.it <http://www.okkam.it/>*
>
> *Email:* bort...@okkam.it
>
> *Phone nr: +39 0461 1823913 <%2B39%200461%201823913> *
>
> *Headquarters:* Trento (Italy), Via Trener 8
> *Registered office:* Trento (Italy), via Segantini 23
>
> Confidentially notice. This e-mail transmission may contain legally
> privileged and/or confidential information. Please do not read it if you
> are not the intended recipient(S). Any use, distribution, reproduction or
> disclosure by any other person is strictly prohibited. If you have received
> this e-mail in error, please notify the sender and destroy the original
> transmission and its attachments without reading or saving it in any manner.
>
> 2015-10-02 12:05 GMT+02:00 Stefano Bortoli <s.bort...@gmail.com>:
>
>> I don't know whether it is the same issue, but after switching from my
>> POJOs to BSONObject I have got a race condition issue with kryo
>> serialization.
>> I could complete the process using the byte[], but at this point I
>> actually need the POJO. I truly believe it is related to the reuse of the
>> Kryo instance, which is not thread safe.
>>
>>
>> ------------------------------------------------------------------------------------------------------
>> 2015-10-02 11:55:26 INFO  JobClient:161 - 10/02/2015 11:55:26
>> Cross(Cross at main(FlinkMongoHadoop2LinkPOI2CDA.java:138))(4/4) switched
>> to FAILED
>> java.lang.IndexOutOfBoundsException: Index: 112, Size: 0
>>     at java.util.ArrayList.rangeCheck(ArrayList.java:635)
>>     at java.util.ArrayList.get(ArrayList.java:411)
>>     at
>> com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
>>     at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
>>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:759)
>>     at
>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:210)
>>     at
>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:127)
>>     at
>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
>>     at
>> org.apache.flink.runtime.operators.resettable.AbstractBlockResettableIterator.getNextRecord(AbstractBlockResettableIterator.java:180)
>>     at
>> org.apache.flink.runtime.operators.resettable.BlockResettableMutableObjectIterator.next(BlockResettableMutableObjectIterator.java:111)
>>     at
>> org.apache.flink.runtime.operators.CrossDriver.runBlockedOuterSecond(CrossDriver.java:309)
>>     at
>> org.apache.flink.runtime.operators.CrossDriver.run(CrossDriver.java:162)
>>     at
>> org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:489)
>>     at
>> org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:354)
>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:581)
>>     at java.lang.Thread.run(Thread.java:745)
>>
>> 2015-10-02 9:46 GMT+02:00 Stefano Bortoli <s.bort...@gmail.com>:
>>
>>> here it is: https://issues.apache.org/jira/browse/FLINK-2800
>>>
>>> saluti,
>>> Stefano
>>>
>>> 2015-10-01 18:50 GMT+02:00 Stephan Ewen <se...@apache.org>:
>>>
>>>> This looks to me like a bug where type registrations are not properly
>>>> forwarded to all Serializers.
>>>>
>>>> Can you open a JIRA ticket for this?
>>>>
>>>> On Thu, Oct 1, 2015 at 6:46 PM, Stefano Bortoli <s.bort...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi guys,
>>>>>
>>>>> I hit a Kryo exception while running a process 'crossing' POJOs
>>>>> datasets. I am using the 0.10-milestone-1.
>>>>> Checking the serializer:
>>>>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:210)
>>>>>
>>>>> I have noticed that the Kryo instance is reused along serialization
>>>>> calls (e.g. line 187).  However, Kryo is not threadsafe, and therefore I
>>>>> think it may cause the problem due to possible race condition. We had 
>>>>> these
>>>>> types of issues solved with a KryoFactory implementing a pool. Perhaps it
>>>>> should just a matter of calling the
>>>>>
>>>>> what should I do? Open a ticket?
>>>>>
>>>>> Thanks a lot guys for the great job!
>>>>>
>>>>> saluti,
>>>>> Stefano
>>>>>
>>>>> -----------------------------------------
>>>>> com.esotericsoftware.kryo.KryoException: Encountered unregistered
>>>>> class ID: 114
>>>>>     at
>>>>> com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:119)
>>>>>     at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
>>>>>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
>>>>>     at
>>>>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:210)
>>>>>     at
>>>>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:127)
>>>>>     at
>>>>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
>>>>>     at
>>>>> org.apache.flink.runtime.operators.resettable.AbstractBlockResettableIterator.getNextRecord(AbstractBlockResettableIterator.java:180)
>>>>>     at
>>>>> org.apache.flink.runtime.operators.resettable.BlockResettableMutableObjectIterator.next(BlockResettableMutableObjectIterator.java:111)
>>>>>     at
>>>>> org.apache.flink.runtime.operators.CrossDriver.runBlockedOuterSecond(CrossDriver.java:309)
>>>>>     at
>>>>> org.apache.flink.runtime.operators.CrossDriver.run(CrossDriver.java:162)
>>>>>     at
>>>>> org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:489)
>>>>>     at
>>>>> org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:354)
>>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:581)
>>>>>     at java.lang.Thread.run(Thread.java:745)
>>>>>
>>>>
>>>>
>>>
>>
>

Reply via email to