After a second look at the KryoSerializer I fear that Input and Output are never closed... am I right?
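To make concrete what I mean, here is a rough sketch against plain Kryo, not a patch to the actual KryoSerializer: the class name, the IOException wrapping, and the input.reset() call in the catch block are my assumptions, mirroring the output.clear() guard that FLINK-2800 added on the serialize side.

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.KryoException;
import com.esotericsoftware.kryo.io.Input;

import java.io.IOException;

// Sketch only: if Kryo fails mid-read, clean up the Input before
// rethrowing, so that a later call does not start from a half-consumed
// buffer (and so the buffer is not silently left in a dirty state).
public class GuardedKryoRead {

    private final Kryo kryo = new Kryo();

    public Object deserialize(Input input) throws IOException {
        try {
            return kryo.readClassAndObject(input);
        } catch (KryoException ke) {
            // The hypothetical fix under discussion: drop whatever is
            // left in the read buffer. Whether reset() is the right
            // method to call on Input/NoFetchingInput is exactly the
            // open question below.
            input.reset();
            throw new IOException("Kryo deserialization failed.", ke);
        }
    }
}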
On Tue, Jun 7, 2016 at 3:06 PM, Flavio Pompermaier <pomperma...@okkam.it> wrote:
> Hi Aljoscha,
> of course I can :)
> Thanks for helping me... do you think calling reset() is the right thing
> to do?
> Actually, I don't know whether this is meaningful or not, but I already
> ran the job successfully once on the cluster (a second attempt is
> currently running) after my accidental modification to the KryoException
> handling in KryoSerializer.deserialize()...
> My intention was to reset the input buffer by calling the clear() method
> on it, so I copied the line from above, but I forgot to change the
> variable and called output.clear() instead of input.reset()...
> For this reason I say that I don't know whether this is meaningful or
> not...
>
> On Tue, Jun 7, 2016 at 2:50 PM, Aljoscha Krettek <aljos...@apache.org> wrote:
>
>> That's nice. Can you try it on your cluster with an added "reset" call
>> on the buffer?
>>
>> On Tue, 7 Jun 2016 at 14:35 Flavio Pompermaier <pomperma...@okkam.it> wrote:
>>
>>> After "some" digging into this problem I'm quite convinced that it is
>>> caused by a missing reset of the buffer during Kryo deserialization,
>>> similar to what was fixed by FLINK-2800
>>> (https://github.com/apache/flink/pull/1308/files).
>>> That fix added an output.clear() call to the KryoException handling in
>>> KryoSerializer.serialize(), but for the deserialization part there is
>>> no such call on the Input/NoFetchingInput object (there's a reset()
>>> method, but I don't know whether it is the right one to call..).
>>> Do you think that's reasonable?
>>> Could someone help me write a test to check whether this situation is
>>> handled correctly by Flink?
>>> I saw, for example, that KryoGenericTypeSerializerTest has a test for
>>> the EOFException triggered during deserialization, but it doesn't test
>>> what happens on a subsequent call to the serializer after such an
>>> exception occurs (and thus doesn't check whether the buffers are
>>> correctly cleared or not).
>>> I'll start my testing from there for the moment, if nobody has
>>> objections..
>>>
>>> Best,
>>> Flavio
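And to make the test idea above concrete, this is roughly the shape I have in mind. It is a sketch using plain Kryo with fresh Input objects rather than Flink's KryoSerializer and its reused buffers, and the record value is made up, so treat it only as a starting point for the real KryoGenericTypeSerializerTest addition:

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.KryoException;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;

import java.io.ByteArrayOutputStream;
import java.util.Arrays;

public class ReuseAfterFailureSketch {

    public static void main(String[] args) {
        Kryo kryo = new Kryo();

        // Serialize a record to a byte array.
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        Output output = new Output(bos);
        kryo.writeClassAndObject(output, "some record");
        output.close();
        byte[] bytes = bos.toByteArray();

        // Step 1: feed a truncated copy; deserialization must fail.
        byte[] truncated = Arrays.copyOf(bytes, bytes.length - 1);
        try {
            kryo.readClassAndObject(new Input(truncated));
            throw new AssertionError("expected a KryoException on truncated input");
        } catch (KryoException expected) {
            // expected: the stream ended in the middle of a record
        }

        // Step 2 (the part the existing test does not cover): the same
        // serializer must still handle a valid stream afterwards, i.e.
        // no state from the failed read may leak into this call.
        Object roundTripped = kryo.readClassAndObject(new Input(bytes));
        if (!"some record".equals(roundTripped)) {
            throw new AssertionError("state leaked from the failed read");
        }
    }
}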
>>>
>>> On Mon, Jun 6, 2016 at 4:08 PM, Ufuk Celebi <u...@apache.org> wrote:
>>>
>>>> Unless someone really invests time into debugging this, I fear that
>>>> the different misspellings are not really helpful, Flavio.
>>>>
>>>> On Mon, Jun 6, 2016 at 10:31 AM, Flavio Pompermaier <pomperma...@okkam.it> wrote:
>>>> > This time I had the following exception (obviously
>>>> > it.okkam.flinj.model.pojo.TipoSoggetto should be
>>>> > it.okkam.flink.model.pojo.TipoSoggetto):
>>>> >
>>>> > java.lang.RuntimeException: Cannot instantiate class.
>>>> >     at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:407)
>>>> >     at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
>>>> >     at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:124)
>>>> >     at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:65)
>>>> >     at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
>>>> >     at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
>>>> >     at org.apache.flink.runtime.operators.FlatMapDriver.run(FlatMapDriver.java:101)
>>>> >     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:480)
>>>> >     at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
>>>> >     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>> >     at java.lang.Thread.run(Thread.java:745)
>>>> > Caused by: java.lang.ClassNotFoundException: it.okkam.flinj.model.pojo.TipoSoggetto
>>>> >     at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>>>> >     at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>>>> >     at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>>>> >     at java.lang.Class.forName0(Native Method)
>>>> >     at java.lang.Class.forName(Class.java:348)
>>>> >     at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:405)
>>>> >     ... 10 more
>>>> >
>>>> > On Wed, Jun 1, 2016 at 5:44 PM, Flavio Pompermaier <pomperma...@okkam.it> wrote:
>>>> >>
>>>> >> Over the last week I've been able to run the job several times
>>>> >> without any error. Then I just recompiled it and the error
>>>> >> reappeared :(
>>>> >> This time I had:
>>>> >>
>>>> >> java.lang.Exception: The data preparation for task 'CHAIN CoGroup (CoGroup at main(DataInference.java:372)) -> Map (Map at writeEntitonPojos(ParquetThriftEntitons.java:170))' , caused an error: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: Serializer consumed more bytes than the record had. This indicates broken serialization. If you are using custom serialization types (Value or Writable), check their serialization methods. If you are using a Kryo-serialized type, check the corresponding Kryo serializer.
>>>> >>     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:456)
>>>> >>     at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
>>>> >>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>> >>     at java.lang.Thread.run(Thread.java:745)
>>>> >> Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: Serializer consumed more bytes than the record had. This indicates broken serialization. If you are using custom serialization types (Value or Writable), check their serialization methods. If you are using a Kryo-serialized type, check the corresponding Kryo serializer.
>>>> >>     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
>>>> >>     at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1079)
>>>> >>     at org.apache.flink.runtime.operators.CoGroupDriver.prepare(CoGroupDriver.java:98)
>>>> >>     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:450)
>>>> >>     ... 3 more
>>>> >> Caused by: java.io.IOException: Thread 'SortMerger Reading Thread' terminated due to an exception: Serializer consumed more bytes than the record had. This indicates broken serialization. If you are using custom serialization types (Value or Writable), check their serialization methods. If you are using a Kryo-serialized type, check the corresponding Kryo serializer.
>>>> >>     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)
>>>> >> Caused by: java.io.IOException: Serializer consumed more bytes than the record had. This indicates broken serialization. If you are using custom serialization types (Value or Writable), check their serialization methods. If you are using a Kryo-serialized type, check the corresponding Kryo serializer.
>>>> >>     at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:142)
>>>> >>     at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:65)
>>>> >>     at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
>>>> >>     at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
>>>> >>     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:1035)
>>>> >>     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
>>>> >> Caused by: java.lang.ArrayIndexOutOfBoundsException: -2
>>>> >>     at java.util.ArrayList.elementData(ArrayList.java:418)
>>>> >>     at java.util.ArrayList.get(ArrayList.java:431)
>>>> >>     at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
>>>> >>     at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
>>>> >>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:759)
>>>> >>     at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:116)
>>>> >>     at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
>>>> >>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>>>> >>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:228)
>>>> >>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:242)
>>>> >>     at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:501)
>>>> >>     at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:144)
>>>> >>     at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
>>>> >>     at org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
>>>> >>     at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:124)
>>>> >>     ... 5 more
>>>> >>
>>>> >> I can't really find a way to understand what is causing the error :(
>>>> >>
>>>> >> On Mon, May 30, 2016 at 12:25 PM, Robert Metzger <rmetz...@apache.org> wrote:
>>>> >>>
>>>> >>> Hi Flavio,
>>>> >>>
>>>> >>> can you privately share the source code of your Flink job with me?
>>>> >>>
>>>> >>> I'm wondering whether the issue might be caused by a version mixup
>>>> >>> between different versions on the cluster (different JVM versions?
>>>> >>> or different files in the lib/ folder?). How are you deploying the
>>>> >>> Flink job?
>>>> >>>
>>>> >>> Regards,
>>>> >>> Robert
>>>> >>>
>>>> >>> On Mon, May 30, 2016 at 11:33 AM, Flavio Pompermaier <pomperma...@okkam.it> wrote:
>>>> >>>>
>>>> >>>> I tried to reproduce the error on a subset of the data, and
>>>> >>>> reducing the available memory while increasing GC pressure a lot
>>>> >>>> (by creating a lot of useless objects in one of the first UDFs)
>>>> >>>> caused this error:
>>>> >>>>
>>>> >>>> Caused by: java.io.IOException: Thread 'SortMerger spilling thread' terminated due to an exception: / by zero
>>>> >>>>     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)
>>>> >>>> Caused by: java.lang.ArithmeticException: / by zero
>>>> >>>>     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.getSegmentsForReaders(UnilateralSortMerger.java:1651)
>>>> >>>>     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.mergeChannelList(UnilateralSortMerger.java:1565)
>>>> >>>>     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1417)
>>>> >>>>     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
>>>> >>>>
>>>> >>>> I hope this could help to restrict the debugging area :)
>>>> >>>>
>>>> >>>> Best,
>>>> >>>> Flavio
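For anyone wanting to try the same trick, this is roughly what I mean by "creating a lot of useless objects" in a UDF. The class name and allocation sizes are made up for illustration, not taken from the real job:

import org.apache.flink.api.common.functions.MapFunction;

// Sketch of a garbage-generating UDF: it allocates short-lived objects
// on every record so the GC runs far more often than in the real job,
// which helped surface the error on a data subset.
public class GcPressureMap<T> implements MapFunction<T, T> {

    @Override
    public T map(T value) {
        for (int i = 0; i < 100; i++) {
            // Useless allocation, touched once so it is not optimized away.
            byte[] junk = new byte[64 * 1024];
            junk[i % junk.length] = 1;
        }
        return value;
    }
}

It can be chained after any existing operator without changing the job's semantics, although a .returns(...) hint may be needed after the map because of the generic signature.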
>>>> >>>>
>>>> >>>> On Fri, May 27, 2016 at 8:21 PM, Stephan Ewen <se...@apache.org> wrote:
>>>> >>>>>
>>>> >>>>> Hi!
>>>> >>>>>
>>>> >>>>> That is a pretty thing indeed :-) Will try to look into this in
>>>> >>>>> a few days...
>>>> >>>>>
>>>> >>>>> Stephan
>>>> >>>>>
>>>> >>>>> On Fri, May 27, 2016 at 12:10 PM, Flavio Pompermaier <pomperma...@okkam.it> wrote:
>>>> >>>>>>
>>>> >>>>>> Running the job with the log level set to DEBUG made it run
>>>> >>>>>> successfully... Is this meaningful? Maybe slowing down the
>>>> >>>>>> threads a little bit could help serialization?
>>>> >>>>>>
>>>> >>>>>> On Thu, May 26, 2016 at 12:34 PM, Flavio Pompermaier <pomperma...@okkam.it> wrote:
>>>> >>>>>>>
>>>> >>>>>>> Still not able to reproduce the error locally, only remotely :)
>>>> >>>>>>> Any suggestions about how to try to reproduce it locally on a
>>>> >>>>>>> subset of the data?
>>>> >>>>>>> This time I had:
>>>> >>>>>>>
>>>> >>>>>>> com.esotericsoftware.kryo.KryoException: Unable to find class: ^Z^A
>>>> >>>>>>>     at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)
>>>> >>>>>>>     at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
>>>> >>>>>>>     at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
>>>> >>>>>>>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
>>>> >>>>>>>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:228)
>>>> >>>>>>>     at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:431)
>>>> >>>>>>>     at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
>>>> >>>>>>>     at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:124)
>>>> >>>>>>>     at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:65)
>>>> >>>>>>>     at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
>>>> >>>>>>>     at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
>>>> >>>>>>>     at org.apache.flink.runtime.operators.MapDriver.run(MapDriver.java:96)
>>>> >>>>>>>     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:480)
>>>> >>>>>>>     at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
>>>> >>>>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>> >>>>>>>     at java.lang.Thread.run(Thread.java:745)
>>>> >>>>>>>
>>>> >>>>>>> Best,
>>>> >>>>>>> Flavio
>>>> >>>>>>>
>>>> >>>>>>> On Tue, May 24, 2016 at 5:47 PM, Flavio Pompermaier <pomperma...@okkam.it> wrote:
>>>> >>>>>>>>
>>>> >>>>>>>> Do you have any suggestion about how to reproduce the error
>>>> >>>>>>>> on a subset of the data?
>>>> >>>>>>>> I'm trying to change the following, but I can't find a
>>>> >>>>>>>> configuration that causes the error :(
>>>> >>>>>>>>
>>>> >>>>>>>> private static ExecutionEnvironment getLocalExecutionEnv() {
>>>> >>>>>>>>     org.apache.flink.configuration.Configuration c =
>>>> >>>>>>>>         new org.apache.flink.configuration.Configuration();
>>>> >>>>>>>>     c.setString(ConfigConstants.TASK_MANAGER_TMP_DIR_KEY, "/tmp");
>>>> >>>>>>>>     c.setString(ConfigConstants.BLOB_STORAGE_DIRECTORY_KEY, "/tmp");
>>>> >>>>>>>>     c.setFloat(ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY, 0.9f);
>>>> >>>>>>>>     c.setLong(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 4);
>>>> >>>>>>>>     c.setLong(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 4);
>>>> >>>>>>>>     c.setString(ConfigConstants.AKKA_ASK_TIMEOUT, "10000 s");
>>>> >>>>>>>>     c.setLong(ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, 2048 * 12);
>>>> >>>>>>>>     ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(c);
>>>> >>>>>>>>     env.setParallelism(16);
>>>> >>>>>>>>     env.registerTypeWithKryoSerializer(DateTime.class, JodaDateTimeSerializer.class);
>>>> >>>>>>>>     return env;
>>>> >>>>>>>> }
>>>> >>>>>>>>
>>>> >>>>>>>> Best,
>>>> >>>>>>>> Flavio
>>>> >>>>>>>>
>>>> >>>>>>>> On Tue, May 24, 2016 at 11:13 AM, Till Rohrmann <trohrm...@apache.org> wrote:
>>>> >>>>>>>>>
>>>> >>>>>>>>> The error looks really strange. Flavio, could you compile a
>>>> >>>>>>>>> test program with example data and configuration to reproduce
>>>> >>>>>>>>> the problem? Given that, we could try to debug it.
>>>> >>>>>>>>>
>>>> >>>>>>>>> Cheers,
>>>> >>>>>>>>> Till