That's nice. Can you try it on your cluster with an added "reset" call on
the buffer?
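
To illustrate the hypothesis discussed below (a failed read leaving stale bytes in the input buffer), here is a minimal, self-contained Java sketch. It could also serve as a template for a test that calls the deserializer again after an exception. BufferedRecordInput, its one-byte-length record format, and sampleStream() are hypothetical. They only mimic an input that keeps position/limit state between reads, and they are not Kryo's Input or Flink's NoFetchingInput. The first record fails mid-payload. Without clear(), the leftover byte 'a' (97) is parsed as the next record's length, and the read runs off the end of the stream with an EOFException. With clear(), the next read returns "ok".

```java
import java.io.ByteArrayInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

/**
 * Hypothetical stand-in for a buffered, Kryo-style input. It is NOT Kryo's Input or
 * Flink's NoFetchingInput; it only mimics keeping position/limit state between reads.
 */
final class BufferedRecordInput {
    private final InputStream in;
    private final byte[] buffer = new byte[256];
    private int position; // next buffered byte to consume
    private int limit;    // one past the last valid buffered byte

    BufferedRecordInput(InputStream in) {
        this.in = in;
    }

    /** Ensures n bytes are buffered, fetching only the missing ones from the stream. */
    private void require(int n) throws IOException {
        int remaining = limit - position;
        if (remaining >= n) {
            return;
        }
        System.arraycopy(buffer, position, buffer, 0, remaining); // move leftovers to the front
        position = 0;
        limit = remaining;
        while (limit < n) {
            int read = in.read(buffer, limit, n - limit);
            if (read < 0) {
                throw new EOFException("Needed " + n + " bytes but the stream ended");
            }
            limit += read;
        }
    }

    /** Assumed record format: one length byte, then that many lowercase ASCII letters. */
    String readRecord() throws IOException {
        require(1);
        int len = buffer[position++] & 0xFF;
        require(len);
        for (int i = position; i < position + len; i++) {
            if (buffer[i] < 'a' || buffer[i] > 'z') {
                // Fails mid-record: the payload stays buffered and position is not advanced.
                throw new IOException("Corrupt payload byte: " + buffer[i]);
            }
        }
        String s = new String(buffer, position, len, StandardCharsets.US_ASCII);
        position += len;
        return s;
    }

    /** Drops buffered bytes, similar in spirit to the output.clear() added by FLINK-2800. */
    void clear() {
        position = 0;
        limit = 0;
    }

    /** Two records: a corrupt one ('a', 0x1A, 'c') followed by "ok". */
    static byte[] sampleStream() {
        return new byte[] {3, 'a', 0x1A, 'c', 2, 'o', 'k'};
    }

    public static void main(String[] args) {
        for (boolean reset : new boolean[] {false, true}) {
            BufferedRecordInput input =
                new BufferedRecordInput(new ByteArrayInputStream(sampleStream()));
            try {
                input.readRecord();
            } catch (IOException first) {
                if (reset) {
                    input.clear(); // discard the stale payload before the next record
                }
            }
            try {
                System.out.println("reset=" + reset + " -> " + input.readRecord());
            } catch (IOException second) {
                System.out.println("reset=" + reset + " -> " + second);
            }
        }
    }
}
```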

On Tue, 7 Jun 2016 at 14:35 Flavio Pompermaier <pomperma...@okkam.it> wrote:

> After "some" digging into this problem, I'm fairly convinced that it is
> caused by a missing reset of the buffer during Kryo deserialization,
> similar to what was fixed by FLINK-2800
> (https://github.com/apache/flink/pull/1308/files).
> That fix added an output.clear() in the KryoException handling in
> KryoSerializer.serialize(), but for the deserialization part there's no
> such call for the Input/NoFetchingInput object (there's a reset() method,
> but I don't know whether it is the right one to call...).
> Do you think that's reasonable?
> Could someone help me write a test to check whether this situation is
> correctly handled by Flink?
> For example, I saw that KryoGenericTypeSerializerTest has a test for the
> EOFException triggered by deserialization, but it doesn't test what
> happens when the serializer is called again after such an exception
> occurs (and thus whether the buffers are correctly cleared or not).
> I'll start my testing from there for now, if nobody objects.
>
> Best,
> Flavio
>
>
> On Mon, Jun 6, 2016 at 4:08 PM, Ufuk Celebi <u...@apache.org> wrote:
>
>> Unless someone really invests time into debugging this, I fear that the
>> different misspellings are not really helpful, Flavio.
>>
>> On Mon, Jun 6, 2016 at 10:31 AM, Flavio Pompermaier
>> <pomperma...@okkam.it> wrote:
>> > This time I had the following exception (obviously
>> > it.okkam.flinj.model.pojo.TipoSoggetto should be
>> > it.okkam.flink.model.pojo.TipoSoggetto).
>> >
>> > java.lang.RuntimeException: Cannot instantiate class.
>> >       at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:407)
>> >       at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
>> >       at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:124)
>> >       at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:65)
>> >       at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
>> >       at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
>> >       at org.apache.flink.runtime.operators.FlatMapDriver.run(FlatMapDriver.java:101)
>> >       at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:480)
>> >       at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
>> >       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>> >       at java.lang.Thread.run(Thread.java:745)
>> > Caused by: java.lang.ClassNotFoundException: it.okkam.flinj.model.pojo.TipoSoggetto
>> >       at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>> >       at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>> >       at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>> >       at java.lang.Class.forName0(Native Method)
>> >       at java.lang.Class.forName(Class.java:348)
>> >       at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:405)
>> >       ... 10 more
>> >
>> >
>> >
>> > On Wed, Jun 1, 2016 at 5:44 PM, Flavio Pompermaier <pomperma...@okkam.it>
>> > wrote:
>> >>
>> >> Last week I was able to run the job several times without any
>> >> error. Then I just recompiled it and the error reappeared :(
>> >> This time I had:
>> >>
>> >> java.lang.Exception: The data preparation for task 'CHAIN CoGroup (CoGroup at main(DataInference.java:372)) -> Map (Map at writeEntitonPojos(ParquetThriftEntitons.java:170))' , caused an error: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: Serializer consumed more bytes than the record had. This indicates broken serialization. If you are using custom serialization types (Value or Writable), check their serialization methods. If you are using a Kryo-serialized type, check the corresponding Kryo serializer.
>> >>      at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:456)
>> >>      at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
>> >>      at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>> >>      at java.lang.Thread.run(Thread.java:745)
>> >> Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: Serializer consumed more bytes than the record had. This indicates broken serialization. If you are using custom serialization types (Value or Writable), check their serialization methods. If you are using a Kryo-serialized type, check the corresponding Kryo serializer.
>> >>      at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
>> >>      at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1079)
>> >>      at org.apache.flink.runtime.operators.CoGroupDriver.prepare(CoGroupDriver.java:98)
>> >>      at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:450)
>> >>      ... 3 more
>> >> Caused by: java.io.IOException: Thread 'SortMerger Reading Thread' terminated due to an exception: Serializer consumed more bytes than the record had. This indicates broken serialization. If you are using custom serialization types (Value or Writable), check their serialization methods. If you are using a Kryo-serialized type, check the corresponding Kryo serializer.
>> >>      at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)
>> >> Caused by: java.io.IOException: Serializer consumed more bytes than the record had. This indicates broken serialization. If you are using custom serialization types (Value or Writable), check their serialization methods. If you are using a Kryo-serialized type, check the corresponding Kryo serializer.
>> >>      at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:142)
>> >>      at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:65)
>> >>      at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
>> >>      at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
>> >>      at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:1035)
>> >>      at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
>> >> Caused by: java.lang.ArrayIndexOutOfBoundsException: -2
>> >>      at java.util.ArrayList.elementData(ArrayList.java:418)
>> >>      at java.util.ArrayList.get(ArrayList.java:431)
>> >>      at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
>> >>      at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
>> >>      at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:759)
>> >>      at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:116)
>> >>      at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
>> >>      at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>> >>      at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:228)
>> >>      at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:242)
>> >>      at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:501)
>> >>      at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:144)
>> >>      at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
>> >>      at org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
>> >>      at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:124)
>> >>      ... 5 more
>> >>
>> >>
>> >> I can't really find a way to understand what is causing the error :(
>> >>
>> >>
>> >> On Mon, May 30, 2016 at 12:25 PM, Robert Metzger <rmetz...@apache.org>
>> >> wrote:
>> >>>
>> >>> Hi Flavio,
>> >>>
>> >>> can you privately share the source code of your Flink job with me?
>> >>>
>> >>> I'm wondering whether the issue might be caused by a mix-up between
>> >>> different versions on the cluster (different JVM versions? Or
>> >>> different files in the lib/ folder?). How are you deploying the Flink
>> >>> job?
>> >>>
>> >>> Regards,
>> >>> Robert
>> >>>
>> >>>
>> >>> On Mon, May 30, 2016 at 11:33 AM, Flavio Pompermaier
>> >>> <pomperma...@okkam.it> wrote:
>> >>>>
>> >>>> I tried to reproduce the error on a subset of the data, and reducing
>> >>>> the available memory while greatly increasing GC activity (by creating
>> >>>> a lot of useless objects in one of the first UDFs) actually caused
>> >>>> this error:
>> >>>>
>> >>>> Caused by: java.io.IOException: Thread 'SortMerger spilling thread' terminated due to an exception: / by zero
>> >>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)
>> >>>> Caused by: java.lang.ArithmeticException: / by zero
>> >>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.getSegmentsForReaders(UnilateralSortMerger.java:1651)
>> >>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.mergeChannelList(UnilateralSortMerger.java:1565)
>> >>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1417)
>> >>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
>> >>>>
>> >>>> I hope this helps narrow down the debugging area :)
>> >>>>
>> >>>> Best,
>> >>>> Flavio
>> >>>>
>> >>>> On Fri, May 27, 2016 at 8:21 PM, Stephan Ewen <se...@apache.org> wrote:
>> >>>>>
>> >>>>> Hi!
>> >>>>>
>> >>>>> That is a pretty thing indeed :-) Will try to look into this in a
>> >>>>> few days...
>> >>>>>
>> >>>>> Stephan
>> >>>>>
>> >>>>>
>> >>>>> On Fri, May 27, 2016 at 12:10 PM, Flavio Pompermaier
>> >>>>> <pomperma...@okkam.it> wrote:
>> >>>>>>
>> >>>>>> Running the job with the log level set to DEBUG made it run
>> >>>>>> successfully... Is this meaningful? Maybe slowing down the threads
>> >>>>>> a little bit helps the serialization?
>> >>>>>>
>> >>>>>>
>> >>>>>> On Thu, May 26, 2016 at 12:34 PM, Flavio Pompermaier
>> >>>>>> <pomperma...@okkam.it> wrote:
>> >>>>>>>
>> >>>>>>> I'm still not able to reproduce the error locally, only remotely :)
>> >>>>>>> Any suggestions on how to reproduce it locally on a subset of the
>> >>>>>>> data?
>> >>>>>>> This time I had:
>> >>>>>>>
>> >>>>>>> com.esotericsoftware.kryo.KryoException: Unable to find class: ^Z^A
>> >>>>>>>         at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)
>> >>>>>>>         at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
>> >>>>>>>         at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
>> >>>>>>>         at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
>> >>>>>>>         at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:228)
>> >>>>>>>         at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:431)
>> >>>>>>>         at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
>> >>>>>>>         at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:124)
>> >>>>>>>         at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:65)
>> >>>>>>>         at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
>> >>>>>>>         at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
>> >>>>>>>         at org.apache.flink.runtime.operators.MapDriver.run(MapDriver.java:96)
>> >>>>>>>         at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:480)
>> >>>>>>>         at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
>> >>>>>>>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>> >>>>>>>         at java.lang.Thread.run(Thread.java:745)
>> >>>>>>>
>> >>>>>>> Best,
>> >>>>>>> Flavio
>> >>>>>>>
>> >>>>>>>
>> >>>>>>> On Tue, May 24, 2016 at 5:47 PM, Flavio Pompermaier
>> >>>>>>> <pomperma...@okkam.it> wrote:
>> >>>>>>>>
>> >>>>>>>> Do you have any suggestions on how to reproduce the error on a
>> >>>>>>>> subset of the data?
>> >>>>>>>> I've been trying to change the following, but I can't find a
>> >>>>>>>> configuration that triggers the error :(
>> >>>>>>>>
>> >>>>>>>> import de.javakaffee.kryoserializers.jodatime.JodaDateTimeSerializer;
>> >>>>>>>> import org.apache.flink.api.java.ExecutionEnvironment;
>> >>>>>>>> import org.apache.flink.configuration.ConfigConstants;
>> >>>>>>>> import org.apache.flink.configuration.Configuration;
>> >>>>>>>> import org.joda.time.DateTime;
>> >>>>>>>>
>> >>>>>>>> private static ExecutionEnvironment getLocalExecutionEnv() {
>> >>>>>>>>     Configuration c = new Configuration();
>> >>>>>>>>     c.setString(ConfigConstants.TASK_MANAGER_TMP_DIR_KEY, "/tmp");
>> >>>>>>>>     c.setString(ConfigConstants.BLOB_STORAGE_DIRECTORY_KEY, "/tmp");
>> >>>>>>>>     c.setFloat(ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY, 0.9f);
>> >>>>>>>>     // 4 local TaskManagers x 4 slots each = 16 slots, matching the parallelism below
>> >>>>>>>>     c.setLong(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 4);
>> >>>>>>>>     c.setLong(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 4);
>> >>>>>>>>     c.setString(ConfigConstants.AKKA_ASK_TIMEOUT, "10000 s");
>> >>>>>>>>     c.setLong(ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, 2048 * 12);
>> >>>>>>>>     ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(c);
>> >>>>>>>>     env.setParallelism(16);
>> >>>>>>>>     // Joda DateTime values are handled by a dedicated Kryo serializer
>> >>>>>>>>     env.registerTypeWithKryoSerializer(DateTime.class, JodaDateTimeSerializer.class);
>> >>>>>>>>     return env;
>> >>>>>>>> }
>> >>>>>>>>
>> >>>>>>>> Best,
>> >>>>>>>> Flavio
>> >>>>>>>>
>> >>>>>>>>
>> >>>>>>>> On Tue, May 24, 2016 at 11:13 AM, Till Rohrmann
>> >>>>>>>> <trohrm...@apache.org> wrote:
>> >>>>>>>>>
>> >>>>>>>>> The error looks really strange. Flavio, could you put together a
>> >>>>>>>>> test program with example data and configuration that reproduces
>> >>>>>>>>> the problem? Given that, we could try to debug it.
>> >>>>>>>>>
>> >>>>>>>>> Cheers,
>> >>>>>>>>> Till
>> >>>>>>>>
>> >>>>>>>>
>> >>>>>>>
>> >>>>>>
>> >>>>>
>> >>>>
>> >>>
>> >>
>> >
>>
>
>
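
For reference, the "Serializer consumed more bytes than the record had" message quoted above suggests a check that compares the bytes a serializer consumed against the record's declared length. The Java sketch below shows that general idea under an assumed framing (a 4-byte length followed by the payload). FramedRecordCheck, frame() and readRecord() are hypothetical and are not Flink's SpillingAdaptiveSpanningRecordDeserializer code. Reading a long from a 4-byte record overruns into the next record's header, and the check reports it instead of silently misaligning every following record.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.function.Function;

/**
 * Hypothetical illustration of a "consumed more bytes than the record had" check.
 * Not Flink's SpillingAdaptiveSpanningRecordDeserializer; names and framing are assumptions.
 */
final class FramedRecordCheck {
    static final String BROKEN =
        "Serializer consumed more bytes than the record had. This indicates broken serialization.";

    /** Frames each int as its own record: [4-byte length = 4][4-byte payload]. */
    static ByteBuffer frame(int... values) {
        ByteBuffer buf = ByteBuffer.allocate(values.length * 8);
        for (int v : values) {
            buf.putInt(4);
            buf.putInt(v);
        }
        buf.flip();
        return buf;
    }

    /**
     * Reads one framed record with the given reader. The reader sees the whole buffer
     * (like a serializer reading from a shared segment), so it can overrun the record;
     * the check afterwards detects that instead of letting later records go out of sync.
     */
    static <T> T readRecord(ByteBuffer buf, Function<ByteBuffer, T> reader) throws IOException {
        int len = buf.getInt();
        int start = buf.position();
        T value = reader.apply(buf);
        int consumed = buf.position() - start;
        if (consumed > len) {
            throw new IOException(BROKEN);
        }
        buf.position(start + len); // skip unread payload bytes, if any
        return value;
    }

    public static void main(String[] args) throws IOException {
        ByteBuffer ok = frame(7, 9);
        Integer first = readRecord(ok, b -> b.getInt());
        Integer second = readRecord(ok, b -> b.getInt());
        System.out.println(first + " " + second);

        try {
            // A broken reader: takes 8 bytes from a record that only has 4.
            readRecord(frame(7, 9), b -> b.getLong());
        } catch (IOException e) {
            System.out.println("caught: " + e.getMessage());
        }
    }
}
```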
