Thanks for the test case; I will take a look at it.

On Thu, Apr 27, 2017 at 3:55 AM, Flavio Pompermaier <pomperma...@okkam.it> wrote:

> I've created a repository with a unit test to reproduce the error at
> https://github.com/fpompermaier/flink-batch-bug/blob/master/src/test/java/it/okkam/flink/aci/TestDataInputDeserializer.java
> (this error is probably also related to FLINK-4719).
>
> The exception is thrown only when there are null strings and multiple
> slots per TM. I don't know whether UnilateralSortMerger is involved or
> not (but I think so).
> A quick fix for this problem would be much appreciated because it's
> blocking a production deployment.
>
> Thanks in advance to all,
> Flavio
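
A minimal sketch of the kind of round-trip check a unit test can run for rows containing null strings. It assumes a simple length-prefixed encoding in which a marker of 0 means null. The encoding, the class name `NullableStringRoundTrip`, and its methods are hypothetical; they do not reproduce Flink's StringSerializer or the linked test. The invariant checked is that the reader consumes exactly the bytes the writer produced.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Objects;

public class NullableStringRoundTrip {

    /** Writes a marker (0 for null, otherwise byte length + 1) followed by the UTF-8 bytes. */
    public static void writeNullable(DataOutputStream out, String s) throws IOException {
        if (s == null) {
            out.writeInt(0);
            return;
        }
        byte[] bytes = s.getBytes(StandardCharsets.UTF_8);
        out.writeInt(bytes.length + 1);
        out.write(bytes);
    }

    /** Reads one value written by writeNullable; a marker of 0 yields null. */
    public static String readNullable(DataInputStream in) throws IOException {
        int marker = in.readInt();
        if (marker == 0) {
            return null;
        }
        byte[] bytes = new byte[marker - 1];
        in.readFully(bytes);
        return new String(bytes, StandardCharsets.UTF_8);
    }

    /**
     * Serializes all fields into one buffer, reads them back, and returns the
     * number of bytes left unread. A correct reader/writer pair returns 0.
     */
    public static int roundTrip(String[] fields) {
        try {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(buffer);
            for (String field : fields) {
                writeNullable(out, field);
            }
            out.flush();

            DataInputStream in =
                    new DataInputStream(new ByteArrayInputStream(buffer.toByteArray()));
            for (String expected : fields) {
                String actual = readNullable(in);
                if (!Objects.equals(expected, actual)) {
                    throw new IllegalStateException(
                            "expected " + expected + " but read " + actual);
                }
            }
            return in.available();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        String[] row = {"a", null, "", null, "xyz"};
        System.out.println("leftover bytes: " + roundTrip(row));
    }
}
```

If the reader and writer disagree on how a null is encoded, the reader either runs past the end of the buffer or leaves bytes behind, which is the general class of failure the exceptions in this thread point to.
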
>
> On Wed, Apr 26, 2017 at 4:42 PM, Flavio Pompermaier <pomperma...@okkam.it>
> wrote:
>
>> After digging into the code and test, I think the problem is almost
>> certainly in the UnilateralSortMerger: there must be a missing
>> synchronization on some shared object somewhere... Right now I'm trying to
>> understand whether this section of code creates shared objects (like queues)
>> that are accessed unsafely when there's spilling to disk:
>>
>> // start the thread that reads the input channels
>> this.readThread = getReadingThread(exceptionHandler, input,
>>     circularQueues, largeRecordHandler, parentTask, serializer,
>>     ((long) (startSpillingFraction * sortMemory)));
>>
>> // start the thread that sorts the buffers
>> this.sortThread = getSortingThread(exceptionHandler, circularQueues,
>>     parentTask);
>>
>> // start the thread that handles spilling to secondary storage
>> this.spillThread = getSpillingThread(exceptionHandler, circularQueues,
>>     parentTask, memoryManager, ioManager, serializerFactory, comparator,
>>     this.sortReadMemory, this.writeMemory, maxNumFileHandles);
>> ....
>> startThreads();
>>
>>
>> The problem is that the unit tests of GroupReduceDriver use Record, and
>> testing Rows is not very straightforward. I'm still trying to reproduce
>> the problem in a local env.
>>
>> On Fri, Apr 21, 2017 at 9:53 PM, Flavio Pompermaier <pomperma...@okkam.it> wrote:
>>
>>> Thanks for the explanation. Is there a way to force this behaviour in a
>>> local environment (to try to debug the problem)?
>>>
>>> On 21 Apr 2017 21:49, "Fabian Hueske" <fhue...@gmail.com> wrote:
>>>
>>>> Hi Flavio,
>>>>
>>>> these files are used for spilling data to disk. In your case, they hold
>>>> sorted runs of records.
>>>> Later, all of these sorted runs (up to a fanout threshold) are read and
>>>> merged to get a completely sorted record stream.
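
A minimal sketch of the merge step described above, assuming in-memory integer lists stand in for the spilled run files. The class name `SortedRunMerge`, its method names, and the `maxFanIn` parameter are hypothetical and are not Flink's API; the real UnilateralSortMerger works on memory segments and file channels and differs in detail.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

public class SortedRunMerge {

    /** One cursor per run: the current head element plus the rest of the run. */
    private static final class Cursor implements Comparable<Cursor> {
        int head;
        final Iterator<Integer> rest;

        Cursor(Iterator<Integer> it) {
            this.rest = it;
            this.head = it.next();
        }

        @Override
        public int compareTo(Cursor other) {
            return Integer.compare(head, other.head);
        }
    }

    /** Merges already-sorted runs into one sorted list using a min-heap of cursors. */
    public static List<Integer> mergeRuns(List<List<Integer>> runs) {
        PriorityQueue<Cursor> heap = new PriorityQueue<>();
        for (List<Integer> run : runs) {
            if (!run.isEmpty()) {
                heap.add(new Cursor(run.iterator()));
            }
        }
        List<Integer> out = new ArrayList<>();
        while (!heap.isEmpty()) {
            Cursor smallest = heap.poll();
            out.add(smallest.head);
            if (smallest.rest.hasNext()) {
                smallest.head = smallest.rest.next();
                heap.add(smallest);
            }
        }
        return out;
    }

    /** Merges at most maxFanIn runs per pass, repeating until one run remains. */
    public static List<Integer> mergeWithFanIn(List<List<Integer>> runs, int maxFanIn) {
        if (maxFanIn < 2) {
            throw new IllegalArgumentException("maxFanIn must be at least 2");
        }
        List<List<Integer>> current = new ArrayList<>(runs);
        while (current.size() > 1) {
            List<List<Integer>> next = new ArrayList<>();
            for (int i = 0; i < current.size(); i += maxFanIn) {
                int end = Math.min(i + maxFanIn, current.size());
                next.add(mergeRuns(current.subList(i, end)));
            }
            current = next;
        }
        return current.isEmpty() ? new ArrayList<>() : current.get(0);
    }

    public static void main(String[] args) {
        List<List<Integer>> runs = Arrays.asList(
                Arrays.asList(1, 4, 9),
                Arrays.asList(2, 3, 10),
                Arrays.asList(0, 5),
                Arrays.asList(7));
        System.out.println(mergeWithFanIn(runs, 2));
    }
}
```

With four runs and a fan-in of 2, the sketch needs two passes: the first produces two intermediate runs, the second merges those into the final sorted stream.
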
>>>>
>>>> 2017-04-21 14:09 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it>:
>>>>
>>>>> The error appears as soon as some taskmanager generates some
>>>>> inputchannel file.
>>>>> What are those files used for?
>>>>>
>>>>> On Fri, Apr 21, 2017 at 11:53 AM, Flavio Pompermaier <
>>>>> pomperma...@okkam.it> wrote:
>>>>>
>>>>>> In another run of the job I had another Exception. Could it be
>>>>>> helpful?
>>>>>>
>>>>>> Error obtaining the sorted input: Thread 'SortMerger Reading Thread'
>>>>>> terminated due to an exception: Serializer consumed more bytes than the
>>>>>> record had. This indicates broken serialization. If you are using custom
>>>>>> serialization types (Value or Writable), check their serialization
>>>>>> methods. If you are using a Kryo-serialized type, check the corresponding
>>>>>> Kryo serializer.
>>>>>>     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:465)
>>>>>>     at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)
>>>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
>>>>>>     at java.lang.Thread.run(Thread.java:745)
>>>>>> Caused by: java.lang.RuntimeException: Error obtaining the sorted
>>>>>> input: Thread 'SortMerger Reading Thread' terminated due to an exception:
>>>>>> Serializer consumed more bytes than the record had. This indicates broken
>>>>>> serialization. If you are using custom serialization types (Value or
>>>>>> Writable), check their serialization methods. If you are using a
>>>>>> Kryo-serialized type, check the corresponding Kryo serializer.
>>>>>>     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
>>>>>>     at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1094)
>>>>>>     at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:99)
>>>>>>     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:460)
>>>>>>     ... 3 more
>>>>>> Caused by: java.io.IOException: Thread 'SortMerger Reading Thread'
>>>>>> terminated due to an exception: Serializer consumed more bytes than the
>>>>>> record had. This indicates broken serialization. If you are using custom
>>>>>> serialization types (Value or Writable), check their serialization
>>>>>> methods. If you are using a Kryo-serialized type, check the corresponding
>>>>>> Kryo serializer.
>>>>>>     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:799)
>>>>>> Caused by: java.io.IOException: Serializer consumed more bytes than
>>>>>> the record had. This indicates broken serialization. If you are using
>>>>>> custom serialization types (Value or Writable), check their serialization
>>>>>> methods. If you are using a Kryo-serialized type, check the corresponding
>>>>>> Kryo serializer.
>>>>>>     at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:123)
>>>>>>     at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:72)
>>>>>>     at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:42)
>>>>>>     at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
>>>>>>     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:1035)
>>>>>>     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
>>>>>> Caused by: java.lang.ArrayIndexOutOfBoundsException: 32768
>>>>>>     at org.apache.flink.core.memory.HeapMemorySegment.get(HeapMemorySegment.java:104)
>>>>>>     at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer$NonSpanningWrapper.readByte(SpillingAdaptiveSpanningRecordDeserializer.java:226)
>>>>>>     at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer$NonSpanningWrapper.readUnsignedByte(SpillingAdaptiveSpanningRecordDeserializer.java:231)
>>>>>>     at org.apache.flink.types.StringValue.readString(StringValue.java:770)
>>>>>>     at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:69)
>>>>>>     at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:74)
>>>>>>     at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:28)
>>>>>>     at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:193)
>>>>>>     at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:36)
>>>>>>     at org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
>>>>>>     at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:109)
>>>>>>     ... 5 more
>>>>>>
>>>>>> On Fri, Apr 21, 2017 at 11:43 AM, Stefano Bortoli <
>>>>>> stefano.bort...@huawei.com> wrote:
>>>>>>
>>>>>>> In fact, the old problem was a missed KryoSerializer initialization
>>>>>>> on the exception that triggers spilling to disk. This led to a dirty
>>>>>>> serialization buffer that would eventually break the program. Till
>>>>>>> worked on it by debugging the source code that generated the error.
>>>>>>> Perhaps someone could try the same this time too, if Flavio can make
>>>>>>> the problem reproducible in a shareable program plus data.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> Stefano
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> *From:* Stephan Ewen [mailto:se...@apache.org]
>>>>>>> *Sent:* Friday, April 21, 2017 10:04 AM
>>>>>>> *To:* user <user@flink.apache.org>
>>>>>>> *Subject:* Re: UnilateralSortMerger error (again)
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> In the past, these errors were most often caused by bugs in the
>>>>>>> serializers, not in the sorter.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> What types are you using at that point? The Stack Trace reveals ROW
>>>>>>> and StringValue, any other involved types?
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Fri, Apr 21, 2017 at 9:36 AM, Flavio Pompermaier <
>>>>>>> pomperma...@okkam.it> wrote:
>>>>>>>
>>>>>>> As suggested by Fabian I set taskmanager.memory.size = 1024 (to
>>>>>>> force spilling to disk) and the job failed almost immediately..
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Fri, Apr 21, 2017 at 12:33 AM, Flavio Pompermaier <
>>>>>>> pomperma...@okkam.it> wrote:
>>>>>>>
>>>>>>> I debugged the process a bit by repeating the job on sub-slices of the
>>>>>>> entire data (using the id value to filter data with parquet push down
>>>>>>> filters) and all slices completed successfully :(
>>>>>>>
>>>>>>> So I tried to increase the parallelism (from 1 slot per TM to 4) to
>>>>>>> see if this was somehow a stress factor, but it didn't cause any error.
>>>>>>>
>>>>>>> Then I almost doubled the number of rows to process and finally the
>>>>>>> error showed up again.
>>>>>>>
>>>>>>> It seems somehow related to spilling to disk but I can't really
>>>>>>> understand what's going on :(
>>>>>>>
>>>>>>> This is a summary of my debug attempts:
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> 4 Task managers with 6 GB and 1 slot each, parallelism = 4
>>>>>>>
>>>>>>> id < 10.000.000.000 => 1.857.365 rows => OK
>>>>>>> id >= 10.000.000.000 && id < 10.010.000.000 => 20.057.714 rows => OK
>>>>>>> id >= 10.010.000.000 && id < 99.945.000.000 => 20.926.903 rows => OK
>>>>>>> id >= 99.945.000.000 && id < 99.960.000.000 => 23.888.750 rows => OK
>>>>>>> id >= 99.960.000.000 => 32.936.422 rows => OK
>>>>>>>
>>>>>>> 4 Task managers with 8 GB and 4 slots each, parallelism = 16
>>>>>>>
>>>>>>> id >= 99.960.000.000 => 32.936.422 rows => OK
>>>>>>> id >= 99.945.000.000 => 56.825.172 rows => ERROR
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> Any help is appreciated..
>>>>>>>
>>>>>>> Best,
>>>>>>>
>>>>>>> Flavio
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Wed, Apr 19, 2017 at 8:34 PM, Flavio Pompermaier <
>>>>>>> pomperma...@okkam.it> wrote:
>>>>>>>
>>>>>>> I could, but only if there's a good probability that it fixes the
>>>>>>> problem... how confident are you about it?
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Wed, Apr 19, 2017 at 8:27 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>>>>>>>
>>>>>>> Looking at the git log of DataInputDeserializer.java, there have been
>>>>>>> some recent changes.
>>>>>>>
>>>>>>> If you have time, maybe try with 1.2.1 RC and see if the error is
>>>>>>> reproducible?
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> Cheers
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Wed, Apr 19, 2017 at 11:22 AM, Flavio Pompermaier <
>>>>>>> pomperma...@okkam.it> wrote:
>>>>>>>
>>>>>>> Hi to all,
>>>>>>>
>>>>>>> I think I've hit the weird Exception with the
>>>>>>> SpillingAdaptiveSpanningRecordDeserializer again...
>>>>>>>
>>>>>>> I'm using Flink 1.2.0 and the error seems to arise when Flink spills
>>>>>>> to disk, but the Exception thrown is not very helpful. Any idea?
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> Caused by: java.lang.RuntimeException: Error obtaining the sorted
>>>>>>> input: Thread 'SortMerger Reading Thread' terminated due to an
>>>>>>> exception: null
>>>>>>>     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
>>>>>>>     at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1094)
>>>>>>>     at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:99)
>>>>>>>     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:460)
>>>>>>>     ... 3 more
>>>>>>> Caused by: java.io.IOException: Thread 'SortMerger Reading Thread'
>>>>>>> terminated due to an exception: null
>>>>>>>     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:799)
>>>>>>> Caused by: java.io.EOFException
>>>>>>>     at org.apache.flink.runtime.util.DataInputDeserializer.readUnsignedByte(DataInputDeserializer.java:306)
>>>>>>>     at org.apache.flink.types.StringValue.readString(StringValue.java:747)
>>>>>>>     at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:69)
>>>>>>>     at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:74)
>>>>>>>     at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:28)
>>>>>>>     at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:193)
>>>>>>>     at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:36)
>>>>>>>     at org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
>>>>>>>     at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:144)
>>>>>>>     at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:72)
>>>>>>>     at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:42)
>>>>>>>     at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
>>>>>>>     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:1035)
>>>>>>>     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
>>>>>>>
>>>>>>>
>>>>>>> Best,
>>>>>>>
>>>>>>> Flavio
>>>>>>>
> --
Best,
Kurt
