After a second look at KryoSerializer, I fear that Input and Output are
never closed. Am I right?

On Tue, Jun 7, 2016 at 3:06 PM, Flavio Pompermaier <pomperma...@okkam.it>
wrote:

> Hi Aljoscha,
> of course I can :)
> Thanks for helping me. Do you think calling reset() is the right thing to
> do?
> Actually, I don't know whether this is meaningful or not, but I already
> ran the job successfully once on the cluster (a second attempt is currently
> running) after my accidental modification to the KryoException handling in
> KryoSerializer.deserialize()...
> My intention was to reset the input buffer by calling the clear() method on
> it, so I copied the line from above but forgot to change the variable, so I
> called output.clear() instead of input.reset()...
> For this reason I don't know whether this is meaningful or not...
>
>
> On Tue, Jun 7, 2016 at 2:50 PM, Aljoscha Krettek <aljos...@apache.org>
> wrote:
>
>> That's nice. Can you try it on your cluster with an added "reset" call on
>> the buffer?
>>
>> On Tue, 7 Jun 2016 at 14:35 Flavio Pompermaier <pomperma...@okkam.it>
>> wrote:
>>
>>> After "some" digging into this problem I'm quite convinced that it
>>> is caused by a missing reset of the buffer during Kryo
>>> deserialization,
>>> similar to what was fixed by FLINK-2800 (
>>> https://github.com/apache/flink/pull/1308/files).
>>> That fix added an output.clear() to the KryoException handling in
>>> KryoSerializer.serialize(), but for the deserialization part there's no
>>> such call for the Input/NoFetchingInput object (there's a reset() method,
>>> but I don't know whether it is the right one to call).
>>> Do you think that's reasonable?
>>> Could someone help me write a test to see whether this situation is
>>> correctly handled by Flink?
>>> I saw, for example, that KryoGenericTypeSerializerTest has a test
>>> for the EOFException triggered by deserialization, but it doesn't
>>> test what happens when another call is made to the serializer after such
>>> an exception occurs (and thus whether the buffers are correctly cleared
>>> or not).
>>> I'll try to start my testing from there for the moment, if nobody
>>> has any objections.
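
To make the hypothesis above concrete, here is a minimal, self-contained sketch of the failure mode and of the "call the serializer again after the exception" check. It deliberately does not use Kryo or Flink: BufferedInput, deserialize() and retryAfterFailure() are hypothetical stand-ins invented for illustration, and the record format (one length byte plus UTF-8 bytes) is an assumption. Whether Kryo's Input behaves the same way inside KryoSerializer is exactly what a real test would have to show.

```java
import java.io.ByteArrayInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

class StaleBufferSketch {

    /** Hypothetical reusable buffered input; a stand-in, NOT Kryo's Input class. */
    static final class BufferedInput {
        private final byte[] buffer = new byte[64];
        private int position = 0;
        private int limit = 0;
        private InputStream source;

        void setSource(InputStream source) {
            this.source = source;
        }

        /** Drops all buffered bytes so the next read starts from the source. */
        void reset() {
            position = 0;
            limit = 0;
        }

        /** Returns the next n bytes, refilling from the source as needed. */
        byte[] readBytes(int n) throws IOException {
            if (position + n > buffer.length) {
                throw new IOException("record does not fit into the buffer");
            }
            while (limit - position < n) {
                int read = source.read(buffer, limit, buffer.length - limit);
                if (read < 0) {
                    throw new EOFException("source ended in the middle of a record");
                }
                limit += read;
            }
            byte[] out = Arrays.copyOfRange(buffer, position, position + n);
            position += n;
            return out;
        }
    }

    /** Reads one record: a single length byte followed by that many UTF-8 bytes. */
    static String deserialize(BufferedInput in, boolean resetOnFailure) throws IOException {
        try {
            int length = in.readBytes(1)[0] & 0xFF;
            return new String(in.readBytes(length), StandardCharsets.UTF_8);
        } catch (IOException e) {
            if (resetOnFailure) {
                in.reset(); // discard the half-read record before the input is reused
            }
            throw e;
        }
    }

    /** Fails once on a truncated record, then retries with the complete record. */
    static String retryAfterFailure(boolean resetOnFailure) {
        byte[] record = {5, 'h', 'e', 'l', 'l', 'o'};
        BufferedInput in = new BufferedInput();
        in.setSource(new ByteArrayInputStream(record, 0, 3)); // only part of the record
        try {
            deserialize(in, resetOnFailure);
            return "unexpected success on truncated record";
        } catch (IOException expected) {
            // the first call fails, as in the existing EOFException test
        }
        in.setSource(new ByteArrayInputStream(record)); // the whole record this time
        try {
            return deserialize(in, resetOnFailure);
        } catch (IOException e) {
            return "second call failed: " + e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println("with reset:    " + retryAfterFailure(true));
        System.out.println("without reset: " + retryAfterFailure(false));
    }
}
```

In this model the retry only returns "hello" when the catch block resets the input. Without the reset, leftover bytes from the failed attempt are read as the next record's length. A Flink test along the same lines would trigger the EOFException first and then assert that a second deserialize() call on the same serializer instance still succeeds.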
>>>
>>> Best,
>>> Flavio
>>>
>>>
>>> On Mon, Jun 6, 2016 at 4:08 PM, Ufuk Celebi <u...@apache.org> wrote:
>>>
>>>> Unless someone really invests time into debugging this I fear that the
>>>> different misspellings are not really helpful, Flavio.
>>>>
>>>> On Mon, Jun 6, 2016 at 10:31 AM, Flavio Pompermaier
>>>> <pomperma...@okkam.it> wrote:
>>>> > This time I had the following exception (obviously
>>>> > it.okkam.flinj.model.pojo.TipoSoggetto should be
>>>> > it.okkam.flink.model.pojo.TipoSoggetto).
>>>> >
>>>> > java.lang.RuntimeException: Cannot instantiate class.
>>>> >       at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:407)
>>>> >       at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
>>>> >       at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:124)
>>>> >       at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:65)
>>>> >       at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
>>>> >       at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
>>>> >       at org.apache.flink.runtime.operators.FlatMapDriver.run(FlatMapDriver.java:101)
>>>> >       at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:480)
>>>> >       at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
>>>> >       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>> >       at java.lang.Thread.run(Thread.java:745)
>>>> > Caused by: java.lang.ClassNotFoundException: it.okkam.flinj.model.pojo.TipoSoggetto
>>>> >       at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>>>> >       at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>>>> >       at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>>>> >       at java.lang.Class.forName0(Native Method)
>>>> >       at java.lang.Class.forName(Class.java:348)
>>>> >       at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:405)
>>>> >       ... 10 more
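
As a side note on the misspelled class name in the trace above, the two names can be compared byte by byte to see how large the corruption is. The sketch below is only an illustration: the helper name firstDifference is made up, and the result says nothing by itself about where in the pipeline the byte was altered.

```java
import java.nio.charset.StandardCharsets;

class NameDiffSketch {

    /** Describes the first byte position at which two equally long names differ. */
    static String firstDifference(String expected, String actual) {
        byte[] a = expected.getBytes(StandardCharsets.UTF_8);
        byte[] b = actual.getBytes(StandardCharsets.UTF_8);
        if (a.length != b.length) {
            return "lengths differ: " + a.length + " vs " + b.length;
        }
        for (int i = 0; i < a.length; i++) {
            if (a[i] != b[i]) {
                int xor = (a[i] ^ b[i]) & 0xFF; // bits that changed at this offset
                return String.format("offset %d: 0x%02X vs 0x%02X, xor 0x%02X, %d bit(s) differ",
                        i, a[i] & 0xFF, b[i] & 0xFF, xor, Integer.bitCount(xor));
            }
        }
        return "identical";
    }

    public static void main(String[] args) {
        System.out.println(firstDifference(
                "it.okkam.flink.model.pojo.TipoSoggetto",
                "it.okkam.flinj.model.pojo.TipoSoggetto"));
    }
}
```

For these two names the first difference is 'k' (0x6B) versus 'j' (0x6A), which is a change in a single bit of a single byte.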
>>>> >
>>>> >
>>>> >
>>>> > On Wed, Jun 1, 2016 at 5:44 PM, Flavio Pompermaier <
>>>> pomperma...@okkam.it>
>>>> > wrote:
>>>> >>
>>>> >> Over the last week I was able to run the job several times without any
>>>> >> error. Then I just recompiled it and the error reappeared :(
>>>> >> This time I had:
>>>> >>
>>>> >> java.lang.Exception: The data preparation for task 'CHAIN CoGroup (CoGroup at main(DataInference.java:372)) -> Map (Map at writeEntitonPojos(ParquetThriftEntitons.java:170))' , caused an error: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: Serializer consumed more bytes than the record had. This indicates broken serialization. If you are using custom serialization types (Value or Writable), check their serialization methods. If you are using a Kryo-serialized type, check the corresponding Kryo serializer.
>>>> >>      at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:456)
>>>> >>      at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
>>>> >>      at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>> >>      at java.lang.Thread.run(Thread.java:745)
>>>> >> Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: Serializer consumed more bytes than the record had. This indicates broken serialization. If you are using custom serialization types (Value or Writable), check their serialization methods. If you are using a Kryo-serialized type, check the corresponding Kryo serializer.
>>>> >>      at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
>>>> >>      at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1079)
>>>> >>      at org.apache.flink.runtime.operators.CoGroupDriver.prepare(CoGroupDriver.java:98)
>>>> >>      at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:450)
>>>> >>      ... 3 more
>>>> >> Caused by: java.io.IOException: Thread 'SortMerger Reading Thread' terminated due to an exception: Serializer consumed more bytes than the record had. This indicates broken serialization. If you are using custom serialization types (Value or Writable), check their serialization methods. If you are using a Kryo-serialized type, check the corresponding Kryo serializer.
>>>> >>      at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)
>>>> >> Caused by: java.io.IOException: Serializer consumed more bytes than the record had. This indicates broken serialization. If you are using custom serialization types (Value or Writable), check their serialization methods. If you are using a Kryo-serialized type, check the corresponding Kryo serializer.
>>>> >>      at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:142)
>>>> >>      at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:65)
>>>> >>      at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
>>>> >>      at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
>>>> >>      at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:1035)
>>>> >>      at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
>>>> >> Caused by: java.lang.ArrayIndexOutOfBoundsException: -2
>>>> >>      at java.util.ArrayList.elementData(ArrayList.java:418)
>>>> >>      at java.util.ArrayList.get(ArrayList.java:431)
>>>> >>      at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
>>>> >>      at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
>>>> >>      at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:759)
>>>> >>      at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:116)
>>>> >>      at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
>>>> >>      at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>>>> >>      at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:228)
>>>> >>      at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:242)
>>>> >>      at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:501)
>>>> >>      at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:144)
>>>> >>      at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
>>>> >>      at org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
>>>> >>      at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:124)
>>>> >>      ... 5 more
>>>> >>
>>>> >>
>>>> >> I can't really find a way to understand what is causing the error :(
>>>> >>
>>>> >>
>>>> >> On Mon, May 30, 2016 at 12:25 PM, Robert Metzger <
>>>> rmetz...@apache.org>
>>>> >> wrote:
>>>> >>>
>>>> >>> Hi Flavio,
>>>> >>>
>>>> >>> can you privately share the source code of your Flink job with me?
>>>> >>>
>>>> >>> I'm wondering whether the issue might be caused by a version mixup
>>>> >>> between different versions on the cluster (different JVM versions? or
>>>> >>> different files in the lib/ folder?). How are you deploying the
>>>> >>> Flink job?
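
One way to check for such a mixup is to log, on every node, which JVM is running and which jar a given class was actually loaded from, and then compare the lines across machines. The sketch below uses only standard JDK calls. The class and method names (ClasspathCheckSketch, locationOf, describe) are invented for illustration, and which class to inspect (for example a Kryo or job class) is left to the reader.

```java
import java.security.CodeSource;

class ClasspathCheckSketch {

    /** Returns where a class was loaded from, or a marker if the JVM does not report it. */
    static String locationOf(Class<?> type) {
        CodeSource source = type.getProtectionDomain().getCodeSource();
        return source == null || source.getLocation() == null
                ? "(no code source, e.g. a JDK class)"
                : source.getLocation().toString();
    }

    /** One line that can be logged on every node and compared across the cluster. */
    static String describe(Class<?> type) {
        return "java.version=" + System.getProperty("java.version")
                + " java.vendor=" + System.getProperty("java.vendor")
                + " " + type.getName() + " <- " + locationOf(type);
    }

    public static void main(String[] args) {
        // Replace the argument with the class under suspicion when running inside the job.
        System.out.println(describe(ClasspathCheckSketch.class));
    }
}
```

If two task managers print different JVM versions or different jar locations for the same class, that would support the version-mixup theory. Identical output on all nodes would make it less likely.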
>>>> >>>
>>>> >>> Regards,
>>>> >>> Robert
>>>> >>>
>>>> >>>
>>>> >>> On Mon, May 30, 2016 at 11:33 AM, Flavio Pompermaier
>>>> >>> <pomperma...@okkam.it> wrote:
>>>> >>>>
>>>> >>>> I tried to reproduce the error on a subset of the data, and
>>>> >>>> reducing the available memory while greatly increasing the GC
>>>> >>>> pressure (by creating a lot of useless objects in one of the first
>>>> >>>> UDFs) caused this error:
>>>> >>>>
>>>> >>>> Caused by: java.io.IOException: Thread 'SortMerger spilling thread' terminated due to an exception: / by zero
>>>> >>>>      at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)
>>>> >>>> Caused by: java.lang.ArithmeticException: / by zero
>>>> >>>>      at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.getSegmentsForReaders(UnilateralSortMerger.java:1651)
>>>> >>>>      at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.mergeChannelList(UnilateralSortMerger.java:1565)
>>>> >>>>      at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1417)
>>>> >>>>      at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
>>>> >>>>
>>>> >>>> I hope this could help to restrict the debugging area :)
>>>> >>>>
>>>> >>>> Best,
>>>> >>>> Flavio
>>>> >>>>
>>>> >>>> On Fri, May 27, 2016 at 8:21 PM, Stephan Ewen <se...@apache.org>
>>>> wrote:
>>>> >>>>>
>>>> >>>>> Hi!
>>>> >>>>>
>>>> >>>>> That is a pretty thing indeed :-) Will try to look into this in a
>>>> >>>>> few days...
>>>> >>>>>
>>>> >>>>> Stephan
>>>> >>>>>
>>>> >>>>>
>>>> >>>>> On Fri, May 27, 2016 at 12:10 PM, Flavio Pompermaier
>>>> >>>>> <pomperma...@okkam.it> wrote:
>>>> >>>>>>
>>>> >>>>>> Running the job with log level set to DEBUG made the job run
>>>> >>>>>> successfully... Is this meaningful? Maybe slowing down the
>>>> >>>>>> threads a little bit could help serialization?
>>>> >>>>>>
>>>> >>>>>>
>>>> >>>>>> On Thu, May 26, 2016 at 12:34 PM, Flavio Pompermaier
>>>> >>>>>> <pomperma...@okkam.it> wrote:
>>>> >>>>>>>
>>>> >>>>>>> Still not able to reproduce the error locally, only remotely :)
>>>> >>>>>>> Any suggestions about how to reproduce it locally on a subset
>>>> >>>>>>> of the data?
>>>> >>>>>>> This time I had:
>>>> >>>>>>>
>>>> >>>>>>> com.esotericsoftware.kryo.KryoException: Unable to find class: ^Z^A
>>>> >>>>>>>         at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)
>>>> >>>>>>>         at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
>>>> >>>>>>>         at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
>>>> >>>>>>>         at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
>>>> >>>>>>>         at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:228)
>>>> >>>>>>>         at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:431)
>>>> >>>>>>>         at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
>>>> >>>>>>>         at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:124)
>>>> >>>>>>>         at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:65)
>>>> >>>>>>>         at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
>>>> >>>>>>>         at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
>>>> >>>>>>>         at org.apache.flink.runtime.operators.MapDriver.run(MapDriver.java:96)
>>>> >>>>>>>         at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:480)
>>>> >>>>>>>         at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
>>>> >>>>>>>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>> >>>>>>>         at java.lang.Thread.run(Thread.java:745)
>>>> >>>>>>>
>>>> >>>>>>> Best,
>>>> >>>>>>> Flavio
>>>> >>>>>>>
>>>> >>>>>>>
>>>> >>>>>>> On Tue, May 24, 2016 at 5:47 PM, Flavio Pompermaier
>>>> >>>>>>> <pomperma...@okkam.it> wrote:
>>>> >>>>>>>>
>>>> >>>>>>>> Do you have any suggestions about how to reproduce the error on
>>>> >>>>>>>> a subset of the data?
>>>> >>>>>>>> I'm trying to change the following, but I can't find a
>>>> >>>>>>>> configuration that causes the error :(
>>>> >>>>>>>>
>>>> >>>>>>>> private static ExecutionEnvironment getLocalExecutionEnv() {
>>>> >>>>>>>>     org.apache.flink.configuration.Configuration c = new org.apache.flink.configuration.Configuration();
>>>> >>>>>>>>     c.setString(ConfigConstants.TASK_MANAGER_TMP_DIR_KEY, "/tmp");
>>>> >>>>>>>>     c.setString(ConfigConstants.BLOB_STORAGE_DIRECTORY_KEY, "/tmp");
>>>> >>>>>>>>     c.setFloat(ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY, 0.9f);
>>>> >>>>>>>>     c.setLong(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 4);
>>>> >>>>>>>>     c.setLong(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 4);
>>>> >>>>>>>>     c.setString(ConfigConstants.AKKA_ASK_TIMEOUT, "10000 s");
>>>> >>>>>>>>     c.setLong(ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, 2048 * 12);
>>>> >>>>>>>>     ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(c);
>>>> >>>>>>>>     env.setParallelism(16);
>>>> >>>>>>>>     env.registerTypeWithKryoSerializer(DateTime.class, JodaDateTimeSerializer.class);
>>>> >>>>>>>>     return env;
>>>> >>>>>>>> }
>>>> >>>>>>>>
>>>> >>>>>>>> Best,
>>>> >>>>>>>> Flavio
>>>> >>>>>>>>
>>>> >>>>>>>>
>>>> >>>>>>>> On Tue, May 24, 2016 at 11:13 AM, Till Rohrmann
>>>> >>>>>>>> <trohrm...@apache.org> wrote:
>>>> >>>>>>>>>
>>>> >>>>>>>>> The error looks really strange. Flavio, could you compile a
>>>> >>>>>>>>> test program with example data and configuration to reproduce
>>>> >>>>>>>>> the problem? Given that, we could try to debug the problem.
>>>> >>>>>>>>>
>>>> >>>>>>>>> Cheers,
>>>> >>>>>>>>> Till
>>>> >>>>>>>>
>>>> >>>>>>>>
>>>> >>>>>>>
>>>> >>>>>>
>>>> >>>>>
>>>> >>>>
>>>> >>>
>>>> >>
>>>> >
>>>>
>>>
>>>
>
