That's nice. Can you try it on your cluster with an added "reset" call on the buffer?
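For concreteness, here is a minimal sketch of what such a change might look like inside KryoSerializer.deserialize(), mirroring the output.clear() fix from FLINK-2800 on the read path. The surrounding names (kryo, input, previousIn, checkKryoInitialized(), NoFetchingInput) follow the KryoSerializer internals visible in the stack traces below; the catch-block handling itself is a hypothesis for discussion, not an actual patch:

    // Hypothetical sketch, not an actual patch: on a KryoException, discard
    // the cached Input so the next call cannot re-read stale buffered bytes
    // (the read-path analogue of output.clear() in serialize(), FLINK-2800).
    @Override
    @SuppressWarnings("unchecked")
    public T deserialize(DataInputView source) throws IOException {
        checkKryoInitialized();
        if (source != previousIn) {
            DataInputViewStream inputStream = new DataInputViewStream(source);
            input = new NoFetchingInput(inputStream);
            previousIn = source;
        }
        try {
            return (T) kryo.readClassAndObject(input);
        } catch (KryoException ke) {
            // Forget the buffered Input; whether calling input.reset() instead
            // would fully restore its state is exactly the open question.
            input = null;
            previousIn = null;
            Throwable cause = ke.getCause();
            if (cause instanceof EOFException) {
                throw (EOFException) cause;
            }
            throw ke;
        }
    }

Dropping the cached Input forces the next deserialize() call to rebuild it at the current position of the stream, rather than continuing from a possibly corrupted buffer.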
On Tue, 7 Jun 2016 at 14:35 Flavio Pompermaier <pomperma...@okkam.it> wrote:

After "some" digging into this problem I'm quite convinced that it is caused by a missing reset of the buffer during Kryo deserialization, similar to what was fixed by FLINK-2800 (https://github.com/apache/flink/pull/1308/files). That fix added an output.clear() in the KryoException handling in KryoSerializer.serialize(), but on the deserialization side there is no such call for the Input/NoFetchingInput object (there is a reset() method, but I don't know whether it is the right one to call..). Do you think that's reasonable?

Could someone help me write a test to check whether this situation is handled correctly by Flink? For example, KryoGenericTypeSerializerTest contains a test for the EOFException triggered during deserialization, but it doesn't test what happens when the serializer is called again after such an exception occurs (and thus doesn't check whether the buffers are correctly cleared). I'll start my testing from there for the moment, if there are no objections.

Best,
Flavio
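One possible shape for such a test, as a self-contained sketch (the class name and POJO are made up; note the caveat in the comments: whether two separate DataInputViews actually exercise the serializer's cached input buffer depends on KryoSerializer's reuse logic, which is exactly the behavior under discussion):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.util.Arrays;

    import org.apache.flink.api.common.ExecutionConfig;
    import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
    import org.apache.flink.core.memory.DataInputViewStreamWrapper;
    import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
    import org.junit.Test;

    import static org.junit.Assert.assertEquals;
    import static org.junit.Assert.fail;

    public class KryoClearedBufferSketchTest {

        // Serialize one record, feed the deserializer a truncated copy (which
        // fails mid-record), then feed it the complete bytes: if the buffers
        // are cleared correctly on failure, the second read must succeed.
        @Test
        public void deserializeAgainAfterFailure() throws Exception {
            KryoSerializer<TestRecord> serializer =
                    new KryoSerializer<>(TestRecord.class, new ExecutionConfig());
            TestRecord record = new TestRecord(42, "flink");

            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            serializer.serialize(record, new DataOutputViewStreamWrapper(bos));
            byte[] bytes = bos.toByteArray();

            // First attempt: truncated stream, must fail (EOF mid-record).
            byte[] truncated = Arrays.copyOf(bytes, bytes.length / 2);
            try {
                serializer.deserialize(
                        new DataInputViewStreamWrapper(new ByteArrayInputStream(truncated)));
                fail("expected an exception on truncated input");
            } catch (Exception expected) {
                // EOFException / KryoException; what matters is the next call.
            }

            // Second attempt: complete stream, must succeed, i.e. no stale
            // bytes may survive from the failed attempt.
            TestRecord copy = serializer.deserialize(
                    new DataInputViewStreamWrapper(new ByteArrayInputStream(bytes)));
            assertEquals(record, copy);
        }

        public static class TestRecord {
            public int id;
            public String name;

            public TestRecord() {}

            TestRecord(int id, String name) {
                this.id = id;
                this.name = name;
            }

            @Override
            public boolean equals(Object o) {
                return o instanceof TestRecord
                        && ((TestRecord) o).id == id
                        && ((TestRecord) o).name.equals(name);
            }

            @Override
            public int hashCode() {
                return id;
            }
        }
    }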
On Mon, Jun 6, 2016 at 4:08 PM, Ufuk Celebi <u...@apache.org> wrote:

Unless someone really invests time into debugging this, I fear that the different misspellings are not really helpful, Flavio.

On Mon, Jun 6, 2016 at 10:31 AM, Flavio Pompermaier <pomperma...@okkam.it> wrote:

This time I had the following exception (obviously it.okkam.flinj.model.pojo.TipoSoggetto should be it.okkam.flink.model.pojo.TipoSoggetto):

java.lang.RuntimeException: Cannot instantiate class.
    at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:407)
    at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
    at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:124)
    at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:65)
    at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
    at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
    at org.apache.flink.runtime.operators.FlatMapDriver.run(FlatMapDriver.java:101)
    at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:480)
    at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassNotFoundException: it.okkam.flinj.model.pojo.TipoSoggetto
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:348)
    at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:405)
    ... 10 more

On Wed, Jun 1, 2016 at 5:44 PM, Flavio Pompermaier <pomperma...@okkam.it> wrote:

The last week I've been able to run the job several times without any error. Then I just recompiled it and the error reappeared :( This time I had:

java.lang.Exception: The data preparation for task 'CHAIN CoGroup (CoGroup at main(DataInference.java:372)) -> Map (Map at writeEntitonPojos(ParquetThriftEntitons.java:170))', caused an error: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: Serializer consumed more bytes than the record had. This indicates broken serialization. If you are using custom serialization types (Value or Writable), check their serialization methods. If you are using a Kryo-serialized type, check the corresponding Kryo serializer.
    at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:456)
    at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: Serializer consumed more bytes than the record had. This indicates broken serialization. If you are using custom serialization types (Value or Writable), check their serialization methods. If you are using a Kryo-serialized type, check the corresponding Kryo serializer.
    at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
    at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1079)
    at org.apache.flink.runtime.operators.CoGroupDriver.prepare(CoGroupDriver.java:98)
    at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:450)
    ... 3 more
Caused by: java.io.IOException: Thread 'SortMerger Reading Thread' terminated due to an exception: Serializer consumed more bytes than the record had. This indicates broken serialization. If you are using custom serialization types (Value or Writable), check their serialization methods. If you are using a Kryo-serialized type, check the corresponding Kryo serializer.
    at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)
Caused by: java.io.IOException: Serializer consumed more bytes than the record had. This indicates broken serialization. If you are using custom serialization types (Value or Writable), check their serialization methods. If you are using a Kryo-serialized type, check the corresponding Kryo serializer.
    at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:142)
    at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:65)
    at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
    at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
    at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:1035)
    at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
Caused by: java.lang.ArrayIndexOutOfBoundsException: -2
    at java.util.ArrayList.elementData(ArrayList.java:418)
    at java.util.ArrayList.get(ArrayList.java:431)
    at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
    at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:759)
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:116)
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:228)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:242)
    at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:501)
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:144)
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
    at org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
    at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:124)
    ... 5 more

I can't really find a way to understand what is causing the error :(

On Mon, May 30, 2016 at 12:25 PM, Robert Metzger <rmetz...@apache.org> wrote:

Hi Flavio,

can you privately share the source code of your Flink job with me?

I'm wondering whether the issue might be caused by a version mixup between different versions on the cluster (different JVM versions? different files in the lib/ folder?). How are you deploying the Flink job?

Regards,
Robert
On Mon, May 30, 2016 at 11:33 AM, Flavio Pompermaier <pomperma...@okkam.it> wrote:

I tried to reproduce the error on a subset of the data: reducing the available memory and increasing GC pressure a lot (by creating a lot of useless objects in one of the first UDFs) caused this error:

Caused by: java.io.IOException: Thread 'SortMerger spilling thread' terminated due to an exception: / by zero
    at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)
Caused by: java.lang.ArithmeticException: / by zero
    at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.getSegmentsForReaders(UnilateralSortMerger.java:1651)
    at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.mergeChannelList(UnilateralSortMerger.java:1565)
    at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1417)
    at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)

I hope this helps to narrow down the debugging area :)

Best,
Flavio

On Fri, May 27, 2016 at 8:21 PM, Stephan Ewen <se...@apache.org> wrote:

Hi!

That is a pretty strange thing indeed :-) Will try to look into this in a few days...

Stephan

On Fri, May 27, 2016 at 12:10 PM, Flavio Pompermaier <pomperma...@okkam.it> wrote:

Running the job with the log level set to DEBUG made it run successfully... Is this meaningful..? Maybe slowing the threads down a little bit helps serialization?

On Thu, May 26, 2016 at 12:34 PM, Flavio Pompermaier <pomperma...@okkam.it> wrote:

Still not able to reproduce the error locally, only remotely :) Any suggestions about how to try to reproduce it locally on a subset of the data?
This time I had:

com.esotericsoftware.kryo.KryoException: Unable to find class: ^Z^A
    at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)
    at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
    at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:228)
    at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:431)
    at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
    at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:124)
    at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:65)
    at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
    at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
    at org.apache.flink.runtime.operators.MapDriver.run(MapDriver.java:96)
    at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:480)
    at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
    at java.lang.Thread.run(Thread.java:745)

Best,
Flavio

On Tue, May 24, 2016 at 5:47 PM, Flavio Pompermaier <pomperma...@okkam.it> wrote:

Do you have any suggestion about how to reproduce the error on a subset of the data? I'm trying to change the following, but I can't find a configuration causing the error :(

    private static ExecutionEnvironment getLocalExecutionEnv() {
        org.apache.flink.configuration.Configuration c =
                new org.apache.flink.configuration.Configuration();
        c.setString(ConfigConstants.TASK_MANAGER_TMP_DIR_KEY, "/tmp");
        c.setString(ConfigConstants.BLOB_STORAGE_DIRECTORY_KEY, "/tmp");
        c.setFloat(ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY, 0.9f);
        c.setLong(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 4);
        c.setLong(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 4);
        c.setString(ConfigConstants.AKKA_ASK_TIMEOUT, "10000 s");
        c.setLong(ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, 2048 * 12);
        ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(c);
        env.setParallelism(16);
        env.registerTypeWithKryoSerializer(DateTime.class, JodaDateTimeSerializer.class);
        return env;
    }

Best,
Flavio
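Given that reducing the available memory triggered errors earlier in this thread, one more knob that might help force the failure on a small subset is to shrink Flink's managed memory so the SortMerger has to spill even for tiny inputs. A hypothetical addition to the helper above (the key exists in ConfigConstants; the value is a guess):

    // Hypothetical tweak: cap managed memory (in MB) so sorting spills to
    // disk even on a small data subset; when set, this takes precedence
    // over TASK_MANAGER_MEMORY_FRACTION_KEY above.
    c.setLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 64);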
On Tue, May 24, 2016 at 11:13 AM, Till Rohrmann <trohrm...@apache.org> wrote:

The error looks really strange. Flavio, could you put together a test program with example data and configuration to reproduce the problem? Given that, we could try to debug it.

Cheers,
Till