[jira] [Commented] (FLINK-2800) kryo serialization problem

2015-10-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2800?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14980331#comment-14980331
 ] 

ASF GitHub Bot commented on FLINK-2800:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/1308


> kryo serialization problem
> --
>
> Key: FLINK-2800
> URL: https://issues.apache.org/jira/browse/FLINK-2800
> Project: Flink
> Issue Type: Bug
> Components: Type Serialization System
> Affects Versions: 0.10
> Environment: linux ubuntu 12.04 LTS, Java 7
> Reporter: Stefano Bortoli
> Assignee: Till Rohrmann
>
> Performing a cross of two datasets of POJOs, I got the exception below.
> The first time I ran the process, there was no problem. When I ran it the
> second time, I got the exception. My guess is that it could be a race
> condition related to the reuse of the Kryo serializer object. However, it
> could also be "a bug where type registrations are not properly forwarded to
> all Serializers", as suggested by Stephan.
>
> 2015-10-01 18:18:21 INFO  JobClient:161 - 10/01/2015 18:18:21 Cross(Cross at main(FlinkMongoHadoop2LinkPOI2CDA.java:160))(3/4) switched to FAILED
> com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 114
>   at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:119)
>   at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
>   at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
>   at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:210)
>   at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:127)
>   at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
>   at org.apache.flink.runtime.operators.resettable.AbstractBlockResettableIterator.getNextRecord(AbstractBlockResettableIterator.java:180)
>   at org.apache.flink.runtime.operators.resettable.BlockResettableMutableObjectIterator.next(BlockResettableMutableObjectIterator.java:111)
>   at org.apache.flink.runtime.operators.CrossDriver.runBlockedOuterSecond(CrossDriver.java:309)
>   at org.apache.flink.runtime.operators.CrossDriver.run(CrossDriver.java:162)
>   at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:489)
>   at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:354)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:581)
>   at java.lang.Thread.run(Thread.java:745)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2800) kryo serialization problem

2015-10-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2800?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14980091#comment-14980091
 ] 

ASF GitHub Bot commented on FLINK-2800:
---

Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/1308#issuecomment-152122533
  
I will merge this PR.




[jira] [Commented] (FLINK-2800) kryo serialization problem

2015-10-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2800?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1497#comment-1497
 ] 

ASF GitHub Bot commented on FLINK-2800:
---

GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/1308

[FLINK-2800] [kryo] Fix Kryo serialization to clear buffered data

The Kryo serializer uses Kryo's Output class to buffer individual write
operations before they are written to the underlying output stream. This
Output is flushed by Flink's KryoSerializer upon finishing its serialize
call. However, if an exception occurs while flushing the Output, the
buffered data is kept in the buffer. Since Flink uses EOFExceptions to
signal, for example, that an underlying buffer is full and has to be
spilled, it can happen that the record triggering the spilling is written
twice when it is rewritten. The reason is that Kryo's Output buffer still
contains the serialization data of the failed attempt, which is also
flushed to the emptied output stream.

This duplication of records can lead to corrupted data, which eventually
lets the Flink program crash. The problem is solved by clearing Kryo's
Output when the flush operation was not successful.
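
The failure mode described in this pull request can be reproduced in miniature without Kryo or Flink. The following is only a sketch under stated assumptions: `BoundedSink`, `BufferedOutput`, `serialize` and `run` are hypothetical stand-ins written for this illustration (they are not Flink's KryoSerializer nor Kryo's Output class), and the capacity of 4 bytes is arbitrary. It shows how a record left in the buffer after a failed flush is written twice on retry, and how clearing the buffer on failure avoids that.

```java
import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

final class FlushClearSketch {

    /** Hypothetical sink with a fixed capacity; reports "full" via EOFException. */
    static final class BoundedSink extends OutputStream {
        private final int capacity;
        private final ByteArrayOutputStream data = new ByteArrayOutputStream();

        BoundedSink(int capacity) {
            this.capacity = capacity;
        }

        @Override
        public void write(int b) throws IOException {
            if (data.size() + 1 > capacity) {
                throw new EOFException("sink full");
            }
            data.write(b);
        }

        @Override
        public void write(byte[] b, int off, int len) throws IOException {
            if (data.size() + len > capacity) {
                throw new EOFException("sink full");
            }
            data.write(b, off, len);
        }

        /** Empties the sink and returns what it held (the "spill"). */
        String spill() {
            String content = new String(data.toByteArray(), StandardCharsets.US_ASCII);
            data.reset();
            return content;
        }
    }

    /** Hypothetical stand-in for a buffering output: writes are kept until flush(). */
    static final class BufferedOutput {
        private final OutputStream out;
        private final ByteArrayOutputStream buffer = new ByteArrayOutputStream();

        BufferedOutput(OutputStream out) {
            this.out = out;
        }

        void write(String record) {
            byte[] bytes = record.getBytes(StandardCharsets.US_ASCII);
            buffer.write(bytes, 0, bytes.length);
        }

        void flush() throws IOException {
            out.write(buffer.toByteArray());
            buffer.reset(); // only reached if the write above succeeded
        }

        void clear() {
            buffer.reset();
        }
    }

    static void serialize(String record, BufferedOutput output, boolean clearOnFailure)
            throws IOException {
        output.write(record);
        try {
            output.flush();
        } catch (IOException e) {
            if (clearOnFailure) {
                output.clear(); // drop the bytes of the failed attempt
            }
            throw e;
        }
    }

    /** Writes three records, spilling and retrying whenever the sink is full. */
    static String run(boolean clearOnFailure) {
        BoundedSink sink = new BoundedSink(4);
        BufferedOutput output = new BufferedOutput(sink);
        StringBuilder spilled = new StringBuilder();
        try {
            for (String record : new String[] {"aa", "bb", "cc"}) {
                try {
                    serialize(record, output, clearOnFailure);
                } catch (EOFException full) {
                    spilled.append(sink.spill());
                    serialize(record, output, clearOnFailure); // rewrite the record
                }
            }
        } catch (IOException unexpected) {
            throw new IllegalStateException(unexpected);
        }
        spilled.append(sink.spill());
        return spilled.toString();
    }

    public static void main(String[] args) {
        System.out.println(run(false)); // prints aabbcccc ("cc" is duplicated)
        System.out.println(run(true));  // prints aabbcc
    }
}
```

In a real binary stream the duplicated bytes shift every following record, so a reader may end up interpreting payload bytes as a class ID, which is one plausible way to arrive at an "unregistered class ID" error.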

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink fixKryoSerialization

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1308.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1308


commit 5618cf7ff65c972dc33c77ba953966224e8c2a1e
Author: Till Rohrmann 
Date:   2015-10-28T17:40:41Z

[FLINK-2800] [kryo] Fix Kryo serialization to clear buffered data






[jira] [Commented] (FLINK-2800) kryo serialization problem

2015-10-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2800?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14979183#comment-14979183
 ] 

ASF GitHub Bot commented on FLINK-2800:
---

Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/1308#issuecomment-151983062
  
Very important fix!
+1 to merge




[jira] [Commented] (FLINK-2800) kryo serialization problem

2015-10-15 Thread Stefano Bortoli (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2800?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14959143#comment-14959143
 ] 

Stefano Bortoli commented on FLINK-2800:


OK, I have run a couple more tests, and the process completed without the
KryoPool. However, when I reverted to the shared fields for Input and
Output, I could reproduce the error.

{quote}
10/15/2015 18:05:59 Job execution switched to status FAILED.
2015-10-15 18:05:59 INFO  JobManager:137 - Status of job 1b05e39a7ea019d0a57702eb2a06d64a (Flink Java Job at Thu Oct 15 18:03:33 CEST 2015) changed to FAILED.
com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 115
    at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:137)
    at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:667)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:778)
    at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:153)
    at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:39)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:787)
    at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:161)
    at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:39)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:787)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:251)
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:127)
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:1)
    at org.apache.flink.runtime.operators.resettable.AbstractBlockResettableIterator.getNextRecord(AbstractBlockResettableIterator.java:180)
    at org.apache.flink.runtime.operators.resettable.BlockResettableMutableObjectIterator.next(BlockResettableMutableObjectIterator.java:111)
    at org.apache.flink.runtime.operators.CrossDriver.runBlockedOuterSecond(CrossDriver.java:309)
    at org.apache.flink.runtime.operators.CrossDriver.run(CrossDriver.java:162)
    at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:489)
    at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:354)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
    at java.lang.Thread.run(Thread.java:745)
2015-10-15 18:05:59 INFO  JobClient:200 - Job execution failed
{quote}

This version of the method does not work:
{code}
@SuppressWarnings("unchecked")
@Override
public T deserialize(DataInputView source) throws IOException {
    if (source != previousIn) {
        previousIn = source;
        DataInputViewStream inputStream = new DataInputViewStream(source);
        input = new NoFetchingInput(inputStream);
    }
    // DataInputViewStream inputStream = new DataInputViewStream(source);
    // Input input = new NoFetchingInput(inputStream);
    checkKryoPoolInitialization();
    // Kryo kryo = kryoPool.borrow();
    try {
        return (T) kryo.readClassAndObject(input);
    } catch (KryoException ke) {
        Throwable cause = ke.getCause();

        if (cause instanceof EOFException) {
            throw (EOFException) cause;
        } else {
            throw ke;
        }
    } finally {
        // kryoPool.release(kryo);
    }
}
{code}

whereas this one worked:
{code}
@SuppressWarnings("unchecked")
@Override
public T deserialize(DataInputView source) throws IOException {
    if (source != previousIn) {
        previousIn = source;
        // DataInputViewStream inputStream = new DataInputViewStream(source);
        // input = new NoFetchingInput(inputStream);
    }
    DataInputViewStream inputStream = new DataInputViewStream(source);
    Input input = new NoFetchingInput(inputStream);
    checkKryoPoolInitialization();
    // Kryo kryo = kryoPool.borrow();
    try {
        return (T) kryo.readClassAndObject(input);
    } catch (KryoException ke) {
        Throwable cause = ke.getCause();

        if (cause instanceof EOFException) {
            throw (EOFException) cause;
        } else {
            throw ke;
        }
    } finally {

[jira] [Commented] (FLINK-2800) kryo serialization problem

2015-10-15 Thread Stefano Bortoli (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2800?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14958995#comment-14958995
 ] 

Stefano Bortoli commented on FLINK-2800:


Hi guys. I have played around with the KryoSerializer class. I upgraded to
Kryo 3.0.2, which offers a very convenient KryoPool with a customizable
KryoFactory, and then moved the instantiation of all the fields that were
shared by the serialization and deserialization methods into the methods
themselves. I have also registered the classes (if you don't register them,
Kryo sometimes does not serialize the classes in the same order, which can
cause problems). I still have to evaluate whether all of these changes are
necessary, but together, using the KryoPool, registering the classes, and
moving the Input and Output fields into the methods resolved the exceptions
above. I will keep investigating.

Anyway, this suggests that it was actually a race condition, perhaps because
the KryoSerializer is not cloned as expected along the execution chain.
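
To make the remark about class registration more concrete, here is a toy model of ID-based class resolution. It is a simplified sketch, not Kryo's implementation: `Registry`, `register`, `resolve` and `roundTrip` are hypothetical names invented for this example, and real Kryo reserves some IDs for built-in types. The sketch only illustrates why writer and reader have to agree on the registration order, and what kind of lookup produces an "unregistered class ID" error.

```java
import java.util.ArrayList;
import java.util.List;

final class ClassIdRegistrySketch {

    /** Toy registry: a class gets the next free integer ID when it is registered. */
    static final class Registry {
        private final List<Class<?>> byId = new ArrayList<>();

        int register(Class<?> type) {
            int existing = byId.indexOf(type);
            if (existing >= 0) {
                return existing; // already registered, keep its ID
            }
            byId.add(type);
            return byId.size() - 1;
        }

        Class<?> resolve(int id) {
            if (id < 0 || id >= byId.size()) {
                throw new IllegalStateException("Encountered unregistered class ID: " + id);
            }
            return byId.get(id);
        }
    }

    /** Writes the ID of 'written' with one registry and resolves it with another. */
    static String roundTrip(Class<?>[] writerOrder, Class<?>[] readerOrder, Class<?> written) {
        Registry writer = new Registry();
        for (Class<?> c : writerOrder) {
            writer.register(c);
        }
        Registry reader = new Registry();
        for (Class<?> c : readerOrder) {
            reader.register(c);
        }
        int idOnTheWire = writer.register(written);
        return reader.resolve(idOnTheWire).getSimpleName();
    }

    public static void main(String[] args) {
        Class<?>[] ab = {String.class, Integer.class};
        Class<?>[] ba = {Integer.class, String.class};

        // Same order on both sides: the ID maps back to the right class.
        System.out.println(roundTrip(ab, ab, Integer.class)); // prints Integer

        // Different order: the ID silently resolves to the wrong class.
        System.out.println(roundTrip(ab, ba, Integer.class)); // prints String

        // Reader knows fewer classes: the lookup fails.
        try {
            roundTrip(ab, new Class<?>[] {String.class}, Integer.class);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage()); // prints Encountered unregistered class ID: 1
        }
    }
}
```

Note that the same error message can also appear when the registrations match but the byte stream itself is misaligned, because arbitrary bytes are then read as an ID.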



[jira] [Commented] (FLINK-2800) kryo serialization problem

2015-10-06 Thread Stefano Bortoli (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2800?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14945146#comment-14945146
 ] 

Stefano Bortoli commented on FLINK-2800:


OK, I have some hints. Executing the cross alone did not cause any problem,
but adding the following join causes the race condition.

{code}
DataSet> input1 = env.createFromElements();
DataSet> input2 = env.createFromElements();
DataSet> out = input1.cross(input2).combineGroup();
DataSet> union = out.join(input1).where(0).equalTo(0).projectFirst(0, 1).projectSecond(1);
{code}

This job triggers the race condition in the Kryo serializer, as shown by the
exception below.

{quote}
2015-10-06 16:54:34 ERROR RegularPactTask:1179 - Error in task code:  Cross(Cross at main(FlinkMongoHadoop2Cross.java:147)) (1/4)
java.lang.IndexOutOfBoundsException: Index: 98, Size: 73
    at java.util.ArrayList.rangeCheck(ArrayList.java:635)
    at java.util.ArrayList.get(ArrayList.java:411)
    at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
    at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:759)
    at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:135)
    at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:211)
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:127)
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
    at org.apache.flink.runtime.operators.resettable.AbstractBlockResettableIterator.getNextRecord(AbstractBlockResettableIterator.java:180)
    at org.apache.flink.runtime.operators.resettable.BlockResettableMutableObjectIterator.next(BlockResettableMutableObjectIterator.java:111)
    at org.apache.flink.runtime.operators.CrossDriver.runBlockedOuterSecond(CrossDriver.java:309)
    at org.apache.flink.runtime.operators.CrossDriver.run(CrossDriver.java:162)
    at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:489)
    at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:354)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:579)
    at java.lang.Thread.run(Thread.java:745)
{quote}


[jira] [Commented] (FLINK-2800) kryo serialization problem

2015-10-06 Thread Till Rohrmann (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2800?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14944631#comment-14944631
 ] 

Till Rohrmann commented on FLINK-2800:
--

Hi [~stefano.bortoli], do you have a small code example to reproduce the
problem? Or does it happen with any cross operation?



[jira] [Commented] (FLINK-2800) kryo serialization problem

2015-10-06 Thread Stefano Bortoli (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2800?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14944646#comment-14944646
 ] 

Stefano Bortoli commented on FLINK-2800:


I had the problem both with my POJO and with BSONObject in the Tuple2 when
crossing data read from MongoDB. The code itself is quite simple.

{code}
InputFormat mapreduceInputFormat = new MyMongoInputFormat();
HadoopInputFormat hdIf = new HadoopInputFormat(
        mapreduceInputFormat, Object.class, BSONObject.class, job);

// specify connection parameters
hdIf.getConfiguration().set(MongoUtil.MONGO_INPUT_URI_PROPERTY, collectionsUri);

DataSet> input = env.createInput(hdIf);
DataSet> pois = input.flatMap(
        new GetEntitonForClass(CulturalSitePointOfInterest.class.getSimpleName()));
DataSet> cdas = input.flatMap(
        new GetEntitonForClass(CulturalDigitalArtefact.class.getSimpleName()));

DataSet> out = pois.cross(cdas)
        .combineGroup(new POI2CDACombineGroupFunction())
        .distinct()
        .setParallelism(parallelism);

DataSet> union = out.join(pois).where(0).equalTo(0).projectFirst(0, 1).projectSecond(1);

DataSet> writable = union.groupBy(0)
        .reduceGroup(new AddProxy2POIReduceGroupFunction())
        .setParallelism(parallelism);

MongoConfigUtil.setOutputURI(job.getConfiguration(), collectionsUri);
// emit result (this works only locally)
writable.output(new MongoHadoop2OutputFormat(new MongoOutputFormat(), job));
// execute program
env.execute("Mongodb POI to CDA linker");
{code}



[jira] [Commented] (FLINK-2800) kryo serialization problem

2015-10-06 Thread Till Rohrmann (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2800?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14944659#comment-14944659
 ] 

Till Rohrmann commented on FLINK-2800:
--

If I'm not mistaken, this code example is not complete. It would be great if 
you could fill in the gaps such as {{POI2CDACombineGroupFunction}}, 
{{AddProxy2POIReduceGroupFunction}} and {{GetEntitonForClass}}, or if you could 
distill the whole example down to something like

{code}
DataSet> input1 = env.createFromElements()
DataSet> input2 = env.createFromElements()

input1.cross(input2).print()
{code}

if this reproduces the problem for you. Thanks for your help.



[jira] [Commented] (FLINK-2800) kryo serialization problem

2015-10-06 Thread Stefano Bortoli (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2800?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14944959#comment-14944959
 ] 

Stefano Bortoli commented on FLINK-2800:


I don't think the functions are relevant. "GetEntitonForClass" implements a 
filter. The combineGroup function "POI2CDACombineGroupFunction" takes the 
results of the cross and applies a join logic to the tuples. 
"AddProxy2POIReduceGroupFunction" takes the POJO and appends the results of the 
join to the target element. The error happens during the serialization of the 
POJO in the cross operation. The process finishes without any problem if I use 
byte[] in place of BSONObject and deserialize the object with a new Kryo() 
instance inside the combineGroup function.
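
The workaround described above can be sketched roughly as follows. This is only an illustration under assumptions: {{Poi}} and {{OpaqueBytes}} are hypothetical names, and plain java.io serialization stands in for Kryo so that the snippet has no dependencies. The idea it shows is that the object travels through the cross as an opaque byte[] and is decoded inside the user function with a decoder created for that call.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

/** Hypothetical POJO standing in for the reporter's domain object. */
final class Poi implements Serializable {
    private static final long serialVersionUID = 1L;
    final String name;

    Poi(String name) {
        this.name = name;
    }
}

/** Assumption: java.io serialization is used here only as a stand-in for Kryo. */
final class OpaqueBytes {
    /** Encodes the object once, before it is placed into the tuple. */
    static byte[] encode(Serializable value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** Decodes with a fresh stream per call, so no decoder state is shared between calls. */
    static Object decode(byte[] payload) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(payload))) {
            return in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}

final class OpaqueBytesDemo {
    public static void main(String[] args) {
        byte[] payload = OpaqueBytes.encode(new Poi("museum"));
        // Inside the user function, the bytes are turned back into the object.
        Poi restored = (Poi) OpaqueBytes.decode(payload);
        System.out.println(restored.name);
    }
}
```

The framework then only ever serializes a byte[], which avoids the generic-type path for the payload. It does not explain why the generic-type path fails.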



[jira] [Commented] (FLINK-2800) kryo serialization problem

2015-10-06 Thread Stefano Bortoli (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2800?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14945078#comment-14945078
 ] 

Stefano Bortoli commented on FLINK-2800:


Nope, it happens when I use the BSONObject or my POJO directly, rather than the 
byte[], as part of the crossed tuples. If I pass the byte[] through and 
deserialize the object in the method, everything works. I am trying to 
implement what Till suggested, but the cross exceeds the Java heap memory, and 
I keep getting exceptions.

{quote}
java.lang.OutOfMemoryError: Java heap space
at java.util.Arrays.copyOf(Arrays.java:2271)
at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:118)
at 
java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
at java.io.DataOutputStream.write(DataOutputStream.java:107)
at 
org.apache.flink.core.memory.OutputViewDataOutputStreamWrapper.write(OutputViewDataOutputStreamWrapper.java:70)
at 
org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream.write(DataOutputViewStream.java:39)
at com.esotericsoftware.kryo.io.Output.flush(Output.java:163)
at com.esotericsoftware.kryo.io.Output.require(Output.java:142)
at com.esotericsoftware.kryo.io.Output.writeBytes(Output.java:228)
at com.esotericsoftware.kryo.io.Output.writeBytes(Output.java:214)
at 
com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.write(DefaultArraySerializers.java:36)
at 
com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.write(DefaultArraySerializers.java:25)
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599)
at 
com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:95)
at 
com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:21)
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599)
at 
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:186)
at 
org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:116)
at 
org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:30)
at 
org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:116)
at 
org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:30)
at 
org.apache.flink.api.common.accumulators.SerializedListAccumulator.add(SerializedListAccumulator.java:59)
at org.apache.flink.api.java.Utils$CollectHelper.flatMap(Utils.java:127)
at 
org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:79)
at 
org.apache.flink.runtime.operators.CrossDriver.runBlockedOuterFirst(CrossDriver.java:247)
at 
org.apache.flink.runtime.operators.CrossDriver.run(CrossDriver.java:160)
at 
org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:489)
at 
org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:354)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:579)
at java.lang.Thread.run(Thread.java:745)
{quote}


[jira] [Commented] (FLINK-2800) kryo serialization problem

2015-10-06 Thread Till Rohrmann (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2800?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14944985#comment-14944985
 ] 

Till Rohrmann commented on FLINK-2800:
--

I agree with you. But it speeds up issue tracking tremendously if you have a 
small example that you can copy-paste to start debugging and that is known to 
produce the error on the reporter's machine.



[jira] [Commented] (FLINK-2800) kryo serialization problem

2015-10-06 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2800?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14945067#comment-14945067
 ] 

Fabian Hueske commented on FLINK-2800:
--

Each {{KryoSerializer}} has its own {{Kryo}} instance, so {{Kryo}} instances 
are not shared unless the {{KryoSerializer}} is shared. 
Serializers are deeply copied before sharing them. I had a look into 
{{CrossDriver.runBlockedOuterSecond()}} and didn't spot any problem with 
sharing of serializers.

If I understood you correctly, the issue also occurs when you do not register 
any types with Kryo, right?



[jira] [Commented] (FLINK-2800) kryo serialization problem

2015-10-06 Thread Stefano Bortoli (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2800?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14945008#comment-14945008
 ] 

Stefano Bortoli commented on FLINK-2800:


I added a couple of them, but it did not change anything, so I removed them. I 
think it is a race condition, which is generally hard to reproduce. We had a 
similar problem when increasing the load on a server using non-blocking calls. 
Kryo is not thread-safe, and we solved it by using a pool. The Kryo serializer 
of Flink uses the Twitter library, but in the end it simply calls the 
constructor, without a pool. Perhaps using a pool would solve these problems.
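
A minimal sketch of the pooling idea, assuming nothing about Flink's or Kryo's internals: {{StatefulCodec}} is a hypothetical stand-in for a non-thread-safe serializer, and {{InstancePool}} is an illustrative name. Each caller borrows an instance for exclusive use and hands it back afterwards, so no two threads touch the same instance at the same time.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Function;
import java.util.function.Supplier;

/** Hypothetical stand-in for a stateful, non-thread-safe serializer. */
final class StatefulCodec {
    // Mutable scratch state: concurrent use of one instance would corrupt it.
    private final StringBuilder scratch = new StringBuilder();

    String encode(String value) {
        scratch.setLength(0);
        scratch.append(value.length()).append(':').append(value);
        return scratch.toString();
    }
}

/** Illustrative pool: an idle instance is reused, otherwise a new one is created. */
final class InstancePool<T> {
    private final Queue<T> idle = new ConcurrentLinkedQueue<>();
    private final Supplier<T> factory;

    InstancePool(Supplier<T> factory) {
        this.factory = factory;
    }

    /** Runs the given work with an instance that no other caller holds. */
    <R> R withInstance(Function<T, R> work) {
        T instance = idle.poll();
        if (instance == null) {
            instance = factory.get();
        }
        try {
            return work.apply(instance);
        } finally {
            idle.offer(instance); // always hand the instance back
        }
    }

    int idleCount() {
        return idle.size();
    }
}

final class InstancePoolDemo {
    public static void main(String[] args) {
        InstancePool<StatefulCodec> pool = new InstancePool<>(StatefulCodec::new);
        System.out.println(pool.withInstance(codec -> codec.encode("flink")));
    }
}
```

A pool only helps if instances are in fact shared between threads, which has not been confirmed for this issue.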



[jira] [Commented] (FLINK-2800) kryo serialization problem

2015-10-06 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2800?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14944991#comment-14944991
 ] 

Fabian Hueske commented on FLINK-2800:
--

I tried to reproduce the issue by crossing two small data sets with generic 
types (serialized using Kryo) but that worked without problems.
Do you register any Kryo types in the ExecutionConfig?
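
For illustration, here is how a registration mismatch alone could lead to an "unregistered class ID" style error. This is a toy model, not Kryo's implementation: {{ToyRegistry}} is hypothetical and simply assigns IDs in registration order. Whether this is what happens in this issue is not established.

```java
import java.util.ArrayList;
import java.util.Date;
import java.util.List;

/** Toy class-ID table; a hypothetical stand-in, not Kryo's API. */
final class ToyRegistry {
    private final List<Class<?>> byId = new ArrayList<>();

    /** Registers a class and returns its ID (IDs follow registration order). */
    int register(Class<?> type) {
        int existing = byId.indexOf(type);
        if (existing >= 0) {
            return existing;
        }
        byId.add(type);
        return byId.size() - 1;
    }

    /** Resolves an ID read from serialized data back to a class. */
    Class<?> resolve(int id) {
        if (id < 0 || id >= byId.size()) {
            throw new IllegalStateException("Encountered unregistered class ID: " + id);
        }
        return byId.get(id);
    }
}

final class RegistrationMismatchDemo {
    public static void main(String[] args) {
        ToyRegistry writer = new ToyRegistry();
        ToyRegistry reader = new ToyRegistry();

        writer.register(String.class);
        int id = writer.register(Date.class); // registered on the writing side only
        reader.register(String.class);

        try {
            reader.resolve(id);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage()); // prints "Encountered unregistered class ID: 1"
        }
    }
}
```

In this model the error disappears once both sides register the same classes in the same order.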




[jira] [Commented] (FLINK-2800) kryo serialization problem

2015-10-06 Thread Stefano Bortoli (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2800?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14945094#comment-14945094
 ] 

Stefano Bortoli commented on FLINK-2800:


Within the combine function I use a Kryo serializer anyway, because part of the 
BSON object is a byte[] containing a serialized object. Do you think that could 
cause a problem?



[jira] [Commented] (FLINK-2800) kryo serialization problem

2015-10-02 Thread Stefano Bortoli (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2800?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14940984#comment-14940984
 ] 

Stefano Bortoli commented on FLINK-2800:


I don't know whether it is the same issue, but after switching from my POJOs to 
BSONObject I got a race condition with Kryo serialization:

2015-10-02 11:55:26 INFO  JobClient:161 - 10/02/2015 11:55:26 Cross(Cross at 
main(FlinkMongoHadoop2LinkPOI2CDA.java:138))(4/4) switched to FAILED
java.lang.IndexOutOfBoundsException: Index: 112, Size: 0
at java.util.ArrayList.rangeCheck(ArrayList.java:635)
at java.util.ArrayList.get(ArrayList.java:411)
at 
com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:759)
at 
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:210)
at 
org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:127)
at 
org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
at 
org.apache.flink.runtime.operators.resettable.AbstractBlockResettableIterator.getNextRecord(AbstractBlockResettableIterator.java:180)
at 
org.apache.flink.runtime.operators.resettable.BlockResettableMutableObjectIterator.next(BlockResettableMutableObjectIterator.java:111)
at 
org.apache.flink.runtime.operators.CrossDriver.runBlockedOuterSecond(CrossDriver.java:309)
at org.apache.flink.runtime.operators.CrossDriver.run(CrossDriver.java:162)
at 
org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:489)
at 
org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:354)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:581)
at java.lang.Thread.run(Thread.java:745)
