After "some" digging into this problem I'm quite convinced that the problem
is caused by a missing reset of the buffer during the Kryo deserialization,
likewise to what has been fixed by FLINK-2800 (
https://github.com/apache/flink/pull/1308/files).
That fix added an output.clear() in theKryoException handling in
KryoSerializer.serialize() but, for the deserialization part there's no
such a call for the Input/NoFetchingInput object (there's a reset() method
but I don't know whether it is the right one to call..).
Do you think that's reasonable?
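
To make my suspicion concrete, here is a minimal, self-contained
experiment with plain Kryo (no Flink classes, Kryo 2.x API) that mimics
what I think happens inside KryoSerializer when a record is cut short:
a read fails halfway through, and the same Kryo/Input pair is then
reused for the next record. The commented-out reset calls in the catch
block are exactly the part I'm unsure about:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.KryoException;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;

public class KryoDirtyBufferExperiment {

    public static void main(String[] args) {
        Kryo kryo = new Kryo();

        // Serialize one record to obtain a valid byte stream.
        List<String> record = new ArrayList<>(Arrays.asList("a", "b", "c"));
        Output output = new Output(1024);
        kryo.writeClassAndObject(output, record);
        byte[] valid = output.toBytes();

        // Truncate the bytes to force a failure in the middle of a read,
        // like a record that was cut short on the wire.
        byte[] truncated = Arrays.copyOf(valid, valid.length / 2);

        Input input = new Input(truncated);
        try {
            kryo.readClassAndObject(input);
        } catch (KryoException e) {
            System.out.println("first read failed as expected: " + e.getMessage());
            // The open question: which (if any) of these is needed before
            // the Kryo/Input pair can safely be reused?
            // kryo.reset();         // clears the reference-resolver state
            // input.setPosition(0); // rewinds the Input buffer
        }

        // Reuse the SAME Kryo/Input pair for the next (complete) record,
        // as KryoSerializer does while the incoming DataInputView stays
        // the same.
        input.setBuffer(valid);
        Object roundTripped = kryo.readClassAndObject(input);
        System.out.println("second read: " + roundTripped);
    }
}

If stale reference-resolver state can indeed survive an aborted read,
that would fit the ArrayIndexOutOfBoundsException coming out of
MapReferenceResolver.getReadObject in the trace quoted below.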
Could someone help me write a test that checks whether Flink handles this
situation correctly?
I saw, for example, that KryoGenericTypeSerializerTest has a test for the
EOFException triggered during deserialization, but it doesn't check what
happens on a subsequent call to the serializer after such an exception
occurs (and thus whether the buffers are correctly cleared or not).
If nobody objects, I'll start my test from there for the moment; a first
sketch of what I have in mind follows.
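
Something along these lines (an untested sketch: the test class name is
mine, and I'm writing the serializer and memory-wrapper classes from the
1.x API as I remember them, so treat the exact names as assumptions):

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.util.ArrayList;
import java.util.Arrays;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.junit.Test;

public class KryoSerializerRecoveryTest {

    @Test
    public void serializerIsUsableAfterFailedDeserialize() throws Exception {
        KryoSerializer<ArrayList> serializer =
                new KryoSerializer<>(ArrayList.class, new ExecutionConfig());

        ArrayList<String> original = new ArrayList<>(Arrays.asList("a", "b", "c"));

        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        serializer.serialize(original, new DataOutputViewStreamWrapper(bos));
        byte[] bytes = bos.toByteArray();

        // First read: truncated input, must fail. This part is already
        // covered by the existing EOFException test.
        byte[] truncated = Arrays.copyOf(bytes, bytes.length / 2);
        try {
            serializer.deserialize(new DataInputViewStreamWrapper(
                    new ByteArrayInputStream(truncated)));
            fail("expected an exception on truncated input");
        } catch (Exception expected) {
            // EOFException or KryoException, depending on where it breaks
        }

        // Second read: complete input, SAME serializer instance. This is
        // the missing part: does the serializer recover, or does stale
        // Input/buffer state corrupt this read as well?
        ArrayList deserialized = serializer.deserialize(
                new DataInputViewStreamWrapper(new ByteArrayInputStream(bytes)));
        assertEquals(original, deserialized);
    }
}

If that goes red for the right reason, I'd move it next to the existing
EOFException test in KryoGenericTypeSerializerTest.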

Best,
Flavio

On Mon, Jun 6, 2016 at 4:08 PM, Ufuk Celebi <u...@apache.org> wrote:

> Unless someone really invests time into debugging this, I fear that the
> different misspellings are not really helpful, Flavio.
>
> On Mon, Jun 6, 2016 at 10:31 AM, Flavio Pompermaier
> <pomperma...@okkam.it> wrote:
> > This time I had the following exception (obviously
> > it.okkam.flinj.model.pojo.TipoSoggetto should be
> > it.okkam.flink.model.pojo.TipoSoggetto).
> >
> > java.lang.RuntimeException: Cannot instantiate class.
> >       at
> >
> org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:407)
> >       at
> >
> org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
> >       at
> >
> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:124)
> >       at
> >
> org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:65)
> >       at
> >
> org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
> >       at
> >
> org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
> >       at
> >
> org.apache.flink.runtime.operators.FlatMapDriver.run(FlatMapDriver.java:101)
> >       at
> org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:480)
> >       at
> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
> >       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
> >       at java.lang.Thread.run(Thread.java:745)
> > Caused by: java.lang.ClassNotFoundException:
> > it.okkam.flinj.model.pojo.TipoSoggetto
> >       at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
> >       at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> >       at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> >       at java.lang.Class.forName0(Native Method)
> >       at java.lang.Class.forName(Class.java:348)
> >       at
> >
> org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:405)
> >       ... 10 more
> >
> >
> >
> > On Wed, Jun 1, 2016 at 5:44 PM, Flavio Pompermaier <pomperma...@okkam.it>
> > wrote:
> >>
> >> Last week I was able to run the job several times without any
> >> error, then I just recompiled it and the error reappeared :(
> >> This time I had:
> >>
> >> java.lang.Exception: The data preparation for task 'CHAIN CoGroup
> (CoGroup
> >> at main(DataInference.java:372)) -> Map (Map at
> >> writeEntitonPojos(ParquetThriftEntitons.java:170))' , caused an error:
> Error
> >> obtaining the sorted input: Thread 'SortMerger Reading Thread'
> terminated
> >> due to an exception: Serializer consumed more bytes than the record had.
> >> This indicates broken serialization. If you are using custom
> serialization
> >> types (Value or Writable), check their serialization methods. If you are
> >> using a Kryo-serialized type, check the corresponding Kryo serializer.
> >>      at
> org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:456)
> >>      at
> >> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
> >>      at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
> >>      at java.lang.Thread.run(Thread.java:745)
> >> Caused by: java.lang.RuntimeException: Error obtaining the sorted input:
> >> Thread 'SortMerger Reading Thread' terminated due to an exception:
> >> Serializer consumed more bytes than the record had. This indicates
> broken
> >> serialization. If you are using custom serialization types (Value or
> >> Writable), check their serialization methods. If you are using a
> >> Kryo-serialized type, check the corresponding Kryo serializer.
> >>      at
> >>
> org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
> >>      at
> >>
> org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1079)
> >>      at
> >>
> org.apache.flink.runtime.operators.CoGroupDriver.prepare(CoGroupDriver.java:98)
> >>      at
> org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:450)
> >>      ... 3 more
> >> Caused by: java.io.IOException: Thread 'SortMerger Reading Thread'
> >> terminated due to an exception: Serializer consumed more bytes than the
> >> record had. This indicates broken serialization. If you are using custom
> >> serialization types (Value or Writable), check their serialization
> methods.
> >> If you are using a Kryo-serialized type, check the corresponding Kryo
> >> serializer.
> >>      at
> >>
> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)
> >> Caused by: java.io.IOException: Serializer consumed more bytes than the
> >> record had. This indicates broken serialization. If you are using custom
> >> serialization types (Value or Writable), check their serialization
> methods.
> >> If you are using a Kryo-serialized type, check the corresponding Kryo
> >> serializer.
> >>      at
> >>
> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:142)
> >>      at
> >>
> org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:65)
> >>      at
> >>
> org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
> >>      at
> >>
> org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
> >>      at
> >>
> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:1035)
> >>      at
> >>
> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
> >> Caused by: java.lang.ArrayIndexOutOfBoundsException: -2
> >>      at java.util.ArrayList.elementData(ArrayList.java:418)
> >>      at java.util.ArrayList.get(ArrayList.java:431)
> >>      at
> >>
> com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
> >>      at
> com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
> >>      at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:759)
> >>      at
> >>
> com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:116)
> >>      at
> >>
> com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
> >>      at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
> >>      at
> >>
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:228)
> >>      at
> >>
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:242)
> >>      at
> >>
> org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:501)
> >>      at
> >>
> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:144)
> >>      at
> >>
> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
> >>      at
> >>
> org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
> >>      at
> >>
> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:124)
> >>      ... 5 more
> >>
> >>
> >> I can't really find a way to understand what is causing the error :(
> >>
> >>
> >> On Mon, May 30, 2016 at 12:25 PM, Robert Metzger <rmetz...@apache.org>
> >> wrote:
> >>>
> >>> Hi Flavio,
> >>>
> >>> can you privately share the source code of your Flink job with me?
> >>>
> >>> I'm wondering whether the issue might be caused by a version mixup
> >>> between different versions on the cluster (different JVM versions? or
> >>> different files in the lib/ folder?). How are you deploying the Flink
> >>> job?
> >>>
> >>> Regards,
> >>> Robert
> >>>
> >>>
> >>> On Mon, May 30, 2016 at 11:33 AM, Flavio Pompermaier
> >>> <pomperma...@okkam.it> wrote:
> >>>>
> >>>> I tried to reproduce the error on a subset of the data; actually,
> >>>> reducing the available memory and increasing GC pressure a lot (by
> >>>> creating a lot of useless objects in one of the first UDFs) caused
> >>>> this error:
> >>>>
> >>>> Caused by: java.io.IOException: Thread 'SortMerger spilling thread'
> >>>> terminated due to an exception: / by zero
> >>>> at
> >>>>
> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)
> >>>> Caused by: java.lang.ArithmeticException: / by zero
> >>>> at
> >>>>
> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.getSegmentsForReaders(UnilateralSortMerger.java:1651)
> >>>> at
> >>>>
> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.mergeChannelList(UnilateralSortMerger.java:1565)
> >>>> at
> >>>>
> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1417)
> >>>> at
> >>>>
> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
> >>>>
> >>>> I hope this could help to restrict the debugging area :)
> >>>>
> >>>> Best,
> >>>> Flavio
> >>>>
> >>>> On Fri, May 27, 2016 at 8:21 PM, Stephan Ewen <se...@apache.org>
> wrote:
> >>>>>
> >>>>> Hi!
> >>>>>
> >>>>> That is a pretty thing indeed :-) Will try to look into this in a few
> >>>>> days...
> >>>>>
> >>>>> Stephan
> >>>>>
> >>>>>
> >>>>> On Fri, May 27, 2016 at 12:10 PM, Flavio Pompermaier
> >>>>> <pomperma...@okkam.it> wrote:
> >>>>>>
> >>>>>> Running the job with the log level set to DEBUG made the job run
> >>>>>> successfully... Is this meaningful..? Maybe slowing down the
> >>>>>> threads a little bit could help serialization?
> >>>>>>
> >>>>>>
> >>>>>> On Thu, May 26, 2016 at 12:34 PM, Flavio Pompermaier
> >>>>>> <pomperma...@okkam.it> wrote:
> >>>>>>>
> >>>>>>> Still not able to reproduce the error locally, only remotely :)
> >>>>>>> Any suggestions about how to try to reproduce it locally on a
> subset
> >>>>>>> of the data?
> >>>>>>> This time I had:
> >>>>>>>
> >>>>>>> com.esotericsoftware.kryo.KryoException: Unable to find class: ^Z^A
> >>>>>>>         at
> >>>>>>>
> com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)
> >>>>>>>         at
> >>>>>>>
> com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
> >>>>>>>         at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
> >>>>>>>         at
> >>>>>>> com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
> >>>>>>>         at
> >>>>>>>
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:228)
> >>>>>>>         at
> >>>>>>>
> org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:431)
> >>>>>>>         at
> >>>>>>>
> org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
> >>>>>>>         at
> >>>>>>>
> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:124)
> >>>>>>>         at
> >>>>>>>
> org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:65)
> >>>>>>>         at
> >>>>>>>
> org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
> >>>>>>>         at
> >>>>>>>
> org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
> >>>>>>>         at
> >>>>>>> org.apache.flink.runtime.operators.MapDriver.run(MapDriver.java:96)
> >>>>>>>         at
> >>>>>>>
> org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:480)
> >>>>>>>         at
> >>>>>>>
> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
> >>>>>>>         at
> >>>>>>> org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
> >>>>>>>         at java.lang.Thread.run(Thread.java:745)
> >>>>>>>
> >>>>>>> Best,
> >>>>>>> Flavio
> >>>>>>>
> >>>>>>>
> >>>>>>> On Tue, May 24, 2016 at 5:47 PM, Flavio Pompermaier
> >>>>>>> <pomperma...@okkam.it> wrote:
> >>>>>>>>
> >>>>>>>> Do you have any suggestions about how to reproduce the error on a
> >>>>>>>> subset of the data?
> >>>>>>>> I'm trying to change the following, but I can't find a
> >>>>>>>> configuration that causes the error :(
> >>>>>>>>
> >>>>>>>> private static ExecutionEnvironment getLocalExecutionEnv() {
> >>>>>>>>     org.apache.flink.configuration.Configuration c =
> >>>>>>>>         new org.apache.flink.configuration.Configuration();
> >>>>>>>>     c.setString(ConfigConstants.TASK_MANAGER_TMP_DIR_KEY, "/tmp");
> >>>>>>>>     c.setString(ConfigConstants.BLOB_STORAGE_DIRECTORY_KEY, "/tmp");
> >>>>>>>>     c.setFloat(ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY, 0.9f);
> >>>>>>>>     c.setLong(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 4);
> >>>>>>>>     c.setLong(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 4);
> >>>>>>>>     c.setString(ConfigConstants.AKKA_ASK_TIMEOUT, "10000 s");
> >>>>>>>>     c.setLong(ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY,
> >>>>>>>>         2048 * 12);
> >>>>>>>>     ExecutionEnvironment env =
> >>>>>>>>         ExecutionEnvironment.createLocalEnvironment(c);
> >>>>>>>>     env.setParallelism(16);
> >>>>>>>>     env.registerTypeWithKryoSerializer(DateTime.class,
> >>>>>>>>         JodaDateTimeSerializer.class);
> >>>>>>>>     return env;
> >>>>>>>> }
> >>>>>>>>
> >>>>>>>> Best,
> >>>>>>>> Flavio
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> On Tue, May 24, 2016 at 11:13 AM, Till Rohrmann
> >>>>>>>> <trohrm...@apache.org> wrote:
> >>>>>>>>>
> >>>>>>>>> The error looks really strange. Flavio, could you compile a test
> >>>>>>>>> program with example data and configuration to reproduce the
> >>>>>>>>> problem? Given that, we could try to debug it.
> >>>>>>>>>
> >>>>>>>>> Cheers,
> >>>>>>>>> Till
> >>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>
> >>>>
> >>>
> >>
> >
>
