[jira] [Commented] (FLINK-2800) kryo serialization problem
[ https://issues.apache.org/jira/browse/FLINK-2800?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14980331#comment-14980331 ]

ASF GitHub Bot commented on FLINK-2800:
---------------------------------------

Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/1308

> kryo serialization problem
> --------------------------
>
>                 Key: FLINK-2800
>                 URL: https://issues.apache.org/jira/browse/FLINK-2800
>             Project: Flink
>          Issue Type: Bug
>          Components: Type Serialization System
>    Affects Versions: 0.10
>         Environment: linux ubuntu 12.04 LTS, Java 7
>            Reporter: Stefano Bortoli
>            Assignee: Till Rohrmann
>
> Performing a cross of two datasets of POJOs, I got the exception below.
> The first time I ran the process, there was no problem. When I ran it the
> second time, I got the exception. My guess is that it could be a race
> condition related to the reuse of the Kryo serializer object. However, it
> could also be "a bug where type registrations are not properly forwarded to
> all Serializers", as suggested by Stephan.
>
> 2015-10-01 18:18:21 INFO JobClient:161 - 10/01/2015 18:18:21 Cross(Cross at main(FlinkMongoHadoop2LinkPOI2CDA.java:160))(3/4) switched to FAILED
> com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 114
>     at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:119)
>     at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:210)
>     at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:127)
>     at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
>     at org.apache.flink.runtime.operators.resettable.AbstractBlockResettableIterator.getNextRecord(AbstractBlockResettableIterator.java:180)
>     at org.apache.flink.runtime.operators.resettable.BlockResettableMutableObjectIterator.next(BlockResettableMutableObjectIterator.java:111)
>     at org.apache.flink.runtime.operators.CrossDriver.runBlockedOuterSecond(CrossDriver.java:309)
>     at org.apache.flink.runtime.operators.CrossDriver.run(CrossDriver.java:162)
>     at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:489)
>     at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:354)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:581)
>     at java.lang.Thread.run(Thread.java:745)

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[jira] [Commented] (FLINK-2800) kryo serialization problem
[ https://issues.apache.org/jira/browse/FLINK-2800?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14980091#comment-14980091 ]

ASF GitHub Bot commented on FLINK-2800:
---------------------------------------

Github user fhueske commented on the pull request:

    https://github.com/apache/flink/pull/1308#issuecomment-152122533

I will merge this PR.
[jira] [Commented] (FLINK-2800) kryo serialization problem
[ https://issues.apache.org/jira/browse/FLINK-2800?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1497#comment-1497 ]

ASF GitHub Bot commented on FLINK-2800:
---------------------------------------

GitHub user tillrohrmann opened a pull request:

    https://github.com/apache/flink/pull/1308

    [FLINK-2800] [kryo] Fix Kryo serialization to clear buffered data

The Kryo serializer uses Kryo's Output class to buffer individual write operations before they are written to the underlying output stream. Flink's KryoSerializer flushes this Output when its serialize call finishes. However, if flushing the Output throws an exception, the buffered data stays in the buffer. Flink uses EOFExceptions to signal, for example, that an underlying buffer is full and has to be spilled, so the record that triggered the spilling can end up written twice when it is rewritten: Kryo's Output buffer still contains the serialized data of the failed attempt, which is then also flushed to the emptied output stream. This duplication of records can lead to corrupted data, which eventually lets the Flink program crash.

The problem is solved by clearing Kryo's Output when the flush operation was not successful.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tillrohrmann/flink fixKryoSerialization

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/1308.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #1308

commit 5618cf7ff65c972dc33c77ba953966224e8c2a1e
Author: Till Rohrmann
Date:   2015-10-28T17:40:41Z

    [FLINK-2800] [kryo] Fix Kryo serialization to clear buffered data
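The failure mode described in the pull request can be sketched with a toy model. The following is a minimal simulation in plain Java, not Kryo's or Flink's actual code: `BoundedSink`, `StagingBuffer`, and `run` are hypothetical names, the sink capacity of 4 bytes is arbitrary, and the staging buffer only stands in for a buffering output in front of a stream that reports "full" with an `EOFException`.

```java
import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.nio.charset.StandardCharsets;

/** Toy model of the duplicated-record bug; hypothetical classes, not Kryo or Flink API. */
final class FlushRetryDemo {

    /** Fixed-capacity sink that signals "full" with an EOFException, like a buffer that must be spilled. */
    static final class BoundedSink {
        private final int capacity;
        private final ByteArrayOutputStream bytes = new ByteArrayOutputStream();

        BoundedSink(int capacity) { this.capacity = capacity; }

        void write(byte[] data, int len) throws EOFException {
            if (bytes.size() + len > capacity) {
                throw new EOFException("sink full");
            }
            bytes.write(data, 0, len);
        }

        void spill() { bytes.reset(); }

        String contents() { return new String(bytes.toByteArray(), StandardCharsets.US_ASCII); }
    }

    /** Staging buffer in front of the sink (stand-in for a buffering output). */
    static final class StagingBuffer {
        private final byte[] buf = new byte[64];
        private int position;

        void put(String s) {
            byte[] b = s.getBytes(StandardCharsets.US_ASCII);
            System.arraycopy(b, 0, buf, position, b.length);
            position += b.length;
        }

        void flush(BoundedSink sink) throws EOFException {
            sink.write(buf, position); // if this throws, position is NOT reset
            position = 0;
        }

        void clear() { position = 0; }
    }

    /** Buffers one record and flushes it; optionally clears the buffer when the flush fails. */
    static void serialize(StagingBuffer out, BoundedSink sink, String record, boolean clearOnFailure)
            throws EOFException {
        out.put(record);
        try {
            out.flush(sink);
        } catch (EOFException e) {
            if (clearOnFailure) {
                out.clear(); // drop the bytes of the failed attempt
            }
            throw e;
        }
    }

    /** Writes "aaa", then "bb" (which overflows), spills, retries "bb"; returns the sink contents. */
    static String run(boolean clearOnFailure) {
        BoundedSink sink = new BoundedSink(4);
        StagingBuffer out = new StagingBuffer();
        try {
            serialize(out, sink, "aaa", clearOnFailure);
            try {
                serialize(out, sink, "bb", clearOnFailure);
            } catch (EOFException full) {
                sink.spill();                               // empty the sink
                serialize(out, sink, "bb", clearOnFailure); // rewrite the record
            }
        } catch (EOFException e) {
            throw new IllegalStateException(e);
        }
        return sink.contents();
    }

    public static void main(String[] args) {
        System.out.println("without clear: " + run(false)); // record appears twice: bbbb
        System.out.println("with clear:    " + run(true));  // record appears once: bb
    }
}
```

Without the `clear()` call, the retried record is appended to the leftover bytes of the failed attempt, so the emptied sink receives it twice. A reader of such a stream would then be misaligned, which is one plausible way to end up with errors like the unregistered class ID reported in this issue.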
[jira] [Commented] (FLINK-2800) kryo serialization problem
[ https://issues.apache.org/jira/browse/FLINK-2800?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14979183#comment-14979183 ]

ASF GitHub Bot commented on FLINK-2800:
---------------------------------------

Github user fhueske commented on the pull request:

    https://github.com/apache/flink/pull/1308#issuecomment-151983062

Very important fix! +1 to merge
[jira] [Commented] (FLINK-2800) kryo serialization problem
[ https://issues.apache.org/jira/browse/FLINK-2800?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14959143#comment-14959143 ]

Stefano Bortoli commented on FLINK-2800:
----------------------------------------

Ok, I have run a couple more tests, and the process completed without the KryoPool. However, when I reverted to the global variables for Input and Output, I could reproduce the error.

{quote}
10/15/2015 18:05:59 Job execution switched to status FAILED.
2015-10-15 18:05:59 INFO JobManager:137 - Status of job 1b05e39a7ea019d0a57702eb2a06d64a (Flink Java Job at Thu Oct 15 18:03:33 CEST 2015) changed to FAILED.
com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 115
    at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:137)
    at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:667)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:778)
    at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:153)
    at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:39)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:787)
    at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:161)
    at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:39)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:787)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:251)
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:127)
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:1)
    at org.apache.flink.runtime.operators.resettable.AbstractBlockResettableIterator.getNextRecord(AbstractBlockResettableIterator.java:180)
    at org.apache.flink.runtime.operators.resettable.BlockResettableMutableObjectIterator.next(BlockResettableMutableObjectIterator.java:111)
    at org.apache.flink.runtime.operators.CrossDriver.runBlockedOuterSecond(CrossDriver.java:309)
    at org.apache.flink.runtime.operators.CrossDriver.run(CrossDriver.java:162)
    at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:489)
    at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:354)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
    at java.lang.Thread.run(Thread.java:745)
2015-10-15 18:05:59 INFO JobClient:200 - Job execution failed
{quote}

This version of the method does not work:

{code}
@SuppressWarnings("unchecked")
@Override
public T deserialize(DataInputView source) throws IOException {
    if (source != previousIn) {
        previousIn = source;
        DataInputViewStream inputStream = new DataInputViewStream(source);
        input = new NoFetchingInput(inputStream);
    }
    // DataInputViewStream inputStream = new DataInputViewStream(source);
    // Input input = new NoFetchingInput(inputStream);
    checkKryoPoolInitialization();
    // Kryo kryo = kryoPool.borrow();
    try {
        return (T) kryo.readClassAndObject(input);
    } catch (KryoException ke) {
        Throwable cause = ke.getCause();
        if (cause instanceof EOFException) {
            throw (EOFException) cause;
        } else {
            throw ke;
        }
    } finally {
        // kryoPool.release(kryo);
    }
}
{code}

whereas this one worked:

{code}
@SuppressWarnings("unchecked")
@Override
public T deserialize(DataInputView source) throws IOException {
    if (source != previousIn) {
        previousIn = source;
        // DataInputViewStream inputStream = new DataInputViewStream(source);
        // input = new NoFetchingInput(inputStream);
    }
    DataInputViewStream inputStream = new DataInputViewStream(source);
    Input input = new NoFetchingInput(inputStream);
    checkKryoPoolInitialization();
    // Kryo kryo = kryoPool.borrow();
    try {
        return (T) kryo.readClassAndObject(input);
    } catch (KryoException ke) {
        Throwable cause = ke.getCause();
        if (cause instanceof EOFException) {
            throw (EOFException) cause;
        } else {
            throw ke;
        }
    } finally {
[jira] [Commented] (FLINK-2800) kryo serialization problem
[ https://issues.apache.org/jira/browse/FLINK-2800?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14958995#comment-14958995 ]

Stefano Bortoli commented on FLINK-2800:
----------------------------------------

Hi guys. I have played around with the KryoSerialization class. I upgraded to Kryo 3.0.2, which offers a very convenient KryoPool with a customizable KryoFactory, and then I moved into the methods the instantiation of all the fields that were shared by the serialization and deserialization methods. I have also registered the classes (sometimes Kryo does not serialize the classes in the same order if you don't register them, which can cause problems).

I still have to evaluate whether all of these changes are necessary, but using the KryoPool, registering the classes, and moving the Input and Output fields into the methods resolved the exceptions above. I will keep on investigating.

Anyway, this shows that it was actually a race condition, perhaps because the KryoSerializer is not cloned as expected along the execution chain.
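The remark about class registration order can be illustrated with a toy registry. This is a minimal sketch in plain Java and only an assumption about the general mechanism (integer IDs handed out in registration order); it is not Kryo's resolver, and `Registry`, `roundTrip`, and the class names `Poi` and `Cda` are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy class-ID registry; hypothetical code, not Kryo's DefaultClassResolver. */
final class ClassIdRegistryDemo {

    /** Assigns integer IDs to class names in the order they are registered. */
    static final class Registry {
        private final List<String> namesById = new ArrayList<>();

        Registry(String... classNames) {
            namesById.addAll(Arrays.asList(classNames));
        }

        int idOf(String className) {
            int id = namesById.indexOf(className);
            if (id < 0) {
                throw new IllegalArgumentException("Class is not registered: " + className);
            }
            return id;
        }

        String nameOf(int id) {
            if (id < 0 || id >= namesById.size()) {
                throw new IllegalStateException("Encountered unregistered class ID: " + id);
            }
            return namesById.get(id);
        }
    }

    /** The writer emits only the ID; the reader resolves it with its own registry. */
    static String roundTrip(Registry writer, Registry reader, String className) {
        return reader.nameOf(writer.idOf(className));
    }

    public static void main(String[] args) {
        Registry writer = new Registry("Poi", "Cda");

        // Same registration order on both sides: the class resolves correctly.
        System.out.println(roundTrip(writer, new Registry("Poi", "Cda"), "Cda"));

        // Different order: the ID silently resolves to the wrong class.
        System.out.println(roundTrip(writer, new Registry("Cda", "Poi"), "Cda"));

        // Reader knows fewer classes: the ID cannot be resolved at all.
        try {
            roundTrip(writer, new Registry("Poi"), "Cda");
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

In this model, the writer and reader agree only if both register the same classes in the same order, which is why explicit and consistent registration is a common precaution with ID-based serializers.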
[jira] [Commented] (FLINK-2800) kryo serialization problem
[ https://issues.apache.org/jira/browse/FLINK-2800?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14945146#comment-14945146 ]

Stefano Bortoli commented on FLINK-2800:
----------------------------------------

Ok, I have some hints. Executing the cross alone did not cause any problem, but adding the following join triggers the race condition:

{code}
DataSet<...> input1 = env.createFromElements();
DataSet<...> input2 = env.createFromElements();
DataSet<...> out = input1.cross(input2).combineGroup();
DataSet<...> union = out.join(input1).where(0).equalTo(0).projectFirst(0, 1).projectSecond(1);
{code}

This causes the race condition with the Kryo serializer, as shown by the exception below.

{quote}
2015-10-06 16:54:34 ERROR RegularPactTask:1179 - Error in task code: Cross(Cross at main(FlinkMongoHadoop2Cross.java:147)) (1/4)
java.lang.IndexOutOfBoundsException: Index: 98, Size: 73
    at java.util.ArrayList.rangeCheck(ArrayList.java:635)
    at java.util.ArrayList.get(ArrayList.java:411)
    at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
    at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:759)
    at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:135)
    at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:211)
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:127)
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
    at org.apache.flink.runtime.operators.resettable.AbstractBlockResettableIterator.getNextRecord(AbstractBlockResettableIterator.java:180)
    at org.apache.flink.runtime.operators.resettable.BlockResettableMutableObjectIterator.next(BlockResettableMutableObjectIterator.java:111)
    at org.apache.flink.runtime.operators.CrossDriver.runBlockedOuterSecond(CrossDriver.java:309)
    at org.apache.flink.runtime.operators.CrossDriver.run(CrossDriver.java:162)
    at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:489)
    at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:354)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:579)
    at java.lang.Thread.run(Thread.java:745)
{quote}
[jira] [Commented] (FLINK-2800) kryo serialization problem
[ https://issues.apache.org/jira/browse/FLINK-2800?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14944631#comment-14944631 ]

Till Rohrmann commented on FLINK-2800:
--------------------------------------

Hi [~stefano.bortoli], do you have a small code example to reproduce the problem? Or does it happen with any cross operation?
[jira] [Commented] (FLINK-2800) kryo serialization problem
[ https://issues.apache.org/jira/browse/FLINK-2800?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14944646#comment-14944646 ] Stefano Bortoli commented on FLINK-2800: I had the problem using both my POJO and BSONObject in the Tuple2 crossing data read from mongodb. However, the code is quite simple. {code} InputFormat
[jira] [Commented] (FLINK-2800) kryo serialization problem
[ https://issues.apache.org/jira/browse/FLINK-2800?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14944659#comment-14944659 ]

Till Rohrmann commented on FLINK-2800:
--------------------------------------

If I'm not mistaken, this code example is not complete. It would be great if you could fill in the gaps like {{POI2CDACombineGroupFunction}}, {{AddProxy2POIReduceGroupFunction}} and {{GetEntitonForClass}}, or if you could distill the whole example down to something like

{code}
DataSet<...> input1 = env.createFromElements();
DataSet<...> input2 = env.createFromElements();
input1.cross(input2).print();
{code}

if this reproduces the problem for you. Thanks for your help.
[jira] [Commented] (FLINK-2800) kryo serialization problem
[ https://issues.apache.org/jira/browse/FLINK-2800?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14944959#comment-14944959 ]

Stefano Bortoli commented on FLINK-2800:
----------------------------------------

I don't think the functions are relevant. "GetEntitonForClass" implements a filter. The combineGroup function "POI2CDACombineGroupFunction" takes the results of the cross and applies a join logic to the tuples. "AddProxy2POIReduceGroupFunction" takes the POJO and appends the results of the join to the target element.

The error happens in the serialization of the POJO during the cross operation. The process ends without any problem if I use byte[] in place of BSONObject and deserialize the object with a new Kryo() instance inside the combineGroup function.
[jira] [Commented] (FLINK-2800) kryo serialization problem
[ https://issues.apache.org/jira/browse/FLINK-2800?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14945078#comment-14945078 ]

Stefano Bortoli commented on FLINK-2800:

Nope, it occurs when I use the BSONObject, or my POJO directly, rather than the byte[] as part of the crossed tuples. If I pass the byte[] through and deserialize the object in the method, everything works. I am trying to implement what Till suggested, but the cross exceeds the Java heap memory, and I keep getting exceptions.

{quote}
java.lang.OutOfMemoryError: Java heap space
  at java.util.Arrays.copyOf(Arrays.java:2271)
  at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:118)
  at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
  at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
  at java.io.DataOutputStream.write(DataOutputStream.java:107)
  at org.apache.flink.core.memory.OutputViewDataOutputStreamWrapper.write(OutputViewDataOutputStreamWrapper.java:70)
  at org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream.write(DataOutputViewStream.java:39)
  at com.esotericsoftware.kryo.io.Output.flush(Output.java:163)
  at com.esotericsoftware.kryo.io.Output.require(Output.java:142)
  at com.esotericsoftware.kryo.io.Output.writeBytes(Output.java:228)
  at com.esotericsoftware.kryo.io.Output.writeBytes(Output.java:214)
  at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.write(DefaultArraySerializers.java:36)
  at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.write(DefaultArraySerializers.java:25)
  at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599)
  at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:95)
  at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:21)
  at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599)
  at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:186)
  at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:116)
  at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:30)
  at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:116)
  at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:30)
  at org.apache.flink.api.common.accumulators.SerializedListAccumulator.add(SerializedListAccumulator.java:59)
  at org.apache.flink.api.java.Utils$CollectHelper.flatMap(Utils.java:127)
  at org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:79)
  at org.apache.flink.runtime.operators.CrossDriver.runBlockedOuterFirst(CrossDriver.java:247)
  at org.apache.flink.runtime.operators.CrossDriver.run(CrossDriver.java:160)
  at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:489)
  at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:354)
  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:579)
  at java.lang.Thread.run(Thread.java:745)
{quote}
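The workaround Stefano describes, carrying the payload through the cross as byte[] and decoding it inside the user function with a decoder created on the spot, can be sketched as follows. This is an illustration under stated assumptions only: Java's built-in serialization stands in for Kryo so that the example needs no dependencies, and `Poi`, `toBytes`, and `fromBytes` are hypothetical names that do not come from the job in question.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

public class ByteArrayPayloadSketch {

    /** Hypothetical POJO standing in for the real record type. */
    static final class Poi implements Serializable {
        private static final long serialVersionUID = 1L;
        final String name;

        Poi(String name) {
            this.name = name;
        }
    }

    /** Encode the object before it enters the pipeline, so only a byte[] is shipped. */
    static byte[] toBytes(Poi poi) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(poi);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        // The stream is closed (and therefore flushed) before the bytes are taken.
        return buffer.toByteArray();
    }

    /** Decode inside the user function, with a decoder created for this call only. */
    static Poi fromBytes(byte[] bytes) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return (Poi) in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException("Payload class is not on the classpath", e);
        }
    }

    public static void main(String[] args) {
        byte[] payload = toBytes(new Poi("museum"));
        Poi decoded = fromBytes(payload);
        System.out.println(decoded.name);
    }
}
```

Because every call builds its own decoder, no decoder state is shared between calls or threads. That the workaround succeeds is consistent with, but does not prove, a shared-state problem in the original setup.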
[jira] [Commented] (FLINK-2800) kryo serialization problem
[ https://issues.apache.org/jira/browse/FLINK-2800?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14944985#comment-14944985 ]

Till Rohrmann commented on FLINK-2800:

I agree with you. But it speeds up issue tracking tremendously if you have a small example which you can copy-paste to start debugging, and which you know produced the error on the reporter's machine.
[jira] [Commented] (FLINK-2800) kryo serialization problem
[ https://issues.apache.org/jira/browse/FLINK-2800?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14945067#comment-14945067 ]

Fabian Hueske commented on FLINK-2800:

Each {{KryoSerializer}} has its own {{Kryo}} instance, so {{Kryo}} instances are not shared unless the {{KryoSerializer}} is shared. Serializers are deeply copied before sharing them. I had a look into {{CrossDriver.runBlockedOuterSecond()}} and didn't spot any problem with sharing of serializers.

If I understood you correctly, the issue also occurs when you do not register any types with Kryo, right?
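The ownership rule Fabian describes (one {{Kryo}} instance per serializer, and a deep copy of the serializer before it is handed to another consumer) can be illustrated with a small plain-Java sketch. `BufferingSerializer`, its `duplicate()` method, and `sharesStateWith` are hypothetical stand-ins, not Flink classes. The only point is that a copy must not share mutable state with its original.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class SerializerDuplicationSketch {

    /** Hypothetical stateful serializer: the scratch buffer makes it unsafe to share. */
    static final class BufferingSerializer {
        private byte[] scratch = new byte[16];

        byte[] serialize(String value) {
            byte[] utf8 = value.getBytes(StandardCharsets.UTF_8);
            if (utf8.length > scratch.length) {
                scratch = new byte[utf8.length];
            }
            System.arraycopy(utf8, 0, scratch, 0, utf8.length);
            return Arrays.copyOf(scratch, utf8.length);
        }

        /** Deep copy: the duplicate gets its own scratch buffer. */
        BufferingSerializer duplicate() {
            return new BufferingSerializer();
        }

        /** True if both serializers currently write into the same buffer object. */
        boolean sharesStateWith(BufferingSerializer other) {
            return this.scratch == other.scratch;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        BufferingSerializer original = new BufferingSerializer();
        Thread[] workers = new Thread[4];
        for (int i = 0; i < workers.length; i++) {
            BufferingSerializer own = original.duplicate(); // copy before sharing
            String payload = "record-" + i;
            workers[i] = new Thread(() -> {
                String roundTrip = new String(own.serialize(payload), StandardCharsets.UTF_8);
                if (!roundTrip.equals(payload)) {
                    throw new IllegalStateException("corrupted: " + roundTrip);
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            worker.join();
        }
    }
}
```

If the workers used `original` directly instead of their own duplicate, concurrent calls could overwrite each other's scratch buffer. That is the kind of corruption a shared, non-thread-safe serializer would be expected to produce.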
[jira] [Commented] (FLINK-2800) kryo serialization problem
[ https://issues.apache.org/jira/browse/FLINK-2800?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14945008#comment-14945008 ]

Stefano Bortoli commented on FLINK-2800:

I have added a couple of them, but it did not change anything, so I removed them. I think it is a race condition, which is in general hard to reproduce. We had a similar problem when increasing load on a server using non-blocking calls. Kryo is not thread-safe, and we solved it using a pool. The Kryo serializer of Flink uses the Twitter library, but in the end what is used is simply the constructor, without the pool. Perhaps using the pool would solve these problems.
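The pooling approach Stefano mentions can be sketched in plain Java as below. This is not the pool API of Kryo or of the Twitter library; `InstancePool`, `StatefulCodec`, and `withInstance` are hypothetical names chosen for the illustration. The sketch shows only the general pattern: a caller borrows an instance, uses it exclusively, and returns it, so a non-thread-safe object is never used by two threads at once.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.function.Function;
import java.util.function.Supplier;

public class InstancePoolSketch {

    /** Hypothetical stand-in for a non-thread-safe serializer. */
    static final class StatefulCodec {
        private final StringBuilder scratch = new StringBuilder(); // mutable, unsafe to share

        String encode(String value) {
            scratch.setLength(0);
            scratch.append('<').append(value).append('>');
            return scratch.toString();
        }
    }

    /** Fixed-size pool: an instance is held by at most one caller at a time. */
    static final class InstancePool<T> {
        private final BlockingQueue<T> idle;

        InstancePool(int size, Supplier<T> factory) {
            idle = new ArrayBlockingQueue<>(size);
            for (int i = 0; i < size; i++) {
                idle.add(factory.get());
            }
        }

        <R> R withInstance(Function<T, R> work) {
            T instance;
            try {
                instance = idle.take(); // blocks while every instance is in use
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting for an instance", e);
            }
            try {
                return work.apply(instance);
            } finally {
                idle.add(instance); // always hand the instance back
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        InstancePool<StatefulCodec> pool = new InstancePool<>(2, StatefulCodec::new);
        Thread[] callers = new Thread[8];
        for (int i = 0; i < callers.length; i++) {
            String value = "v" + i;
            callers[i] = new Thread(() -> {
                String encoded = pool.withInstance(codec -> codec.encode(value));
                if (!encoded.equals("<" + value + ">")) {
                    throw new IllegalStateException("corrupted: " + encoded);
                }
            });
            callers[i].start();
        }
        for (Thread caller : callers) {
            caller.join();
        }
    }
}
```

A pool only helps when the same instance would otherwise be reachable from several threads. If every consumer already owns a private instance, pooling changes nothing about correctness.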
[jira] [Commented] (FLINK-2800) kryo serialization problem
[ https://issues.apache.org/jira/browse/FLINK-2800?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14944991#comment-14944991 ]

Fabian Hueske commented on FLINK-2800:

I tried to reproduce the issue by crossing two small data sets with generic types (serialized using Kryo) but that worked without problems.

Do you register any Kryo types in the ExecutionConfig?
[jira] [Commented] (FLINK-2800) kryo serialization problem
[ https://issues.apache.org/jira/browse/FLINK-2800?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14945094#comment-14945094 ]

Stefano Bortoli commented on FLINK-2800:

Within the combine function I use a Kryo serializer anyway, since part of the BSON object is a byte[] containing a serialized object. Do you think that could cause a problem?
[jira] [Commented] (FLINK-2800) kryo serialization problem
[ https://issues.apache.org/jira/browse/FLINK-2800?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14940984#comment-14940984 ]

Stefano Bortoli commented on FLINK-2800:

I don't know whether it is the same issue, but after switching from my POJOs to BSONObject I got what looks like a race condition with Kryo serialization:

2015-10-02 11:55:26 INFO JobClient:161 - 10/02/2015 11:55:26 Cross(Cross at main(FlinkMongoHadoop2LinkPOI2CDA.java:138))(4/4) switched to FAILED
java.lang.IndexOutOfBoundsException: Index: 112, Size: 0
  at java.util.ArrayList.rangeCheck(ArrayList.java:635)
  at java.util.ArrayList.get(ArrayList.java:411)
  at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
  at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
  at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:759)
  at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:210)
  at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:127)
  at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
  at org.apache.flink.runtime.operators.resettable.AbstractBlockResettableIterator.getNextRecord(AbstractBlockResettableIterator.java:180)
  at org.apache.flink.runtime.operators.resettable.BlockResettableMutableObjectIterator.next(BlockResettableMutableObjectIterator.java:111)
  at org.apache.flink.runtime.operators.CrossDriver.runBlockedOuterSecond(CrossDriver.java:309)
  at org.apache.flink.runtime.operators.CrossDriver.run(CrossDriver.java:162)
  at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:489)
  at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:354)
  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:581)
  at java.lang.Thread.run(Thread.java:745)