Thank you Kurt!

2017-04-27 17:40 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it>:

> Great!! Thanks a lot Kurt
>
> On Thu, Apr 27, 2017 at 5:31 PM, Kurt Young <ykt...@gmail.com> wrote:
>
>> Hi, I have found the bug:
>> https://issues.apache.org/jira/browse/FLINK-6398, will open a PR soon.
>>
>> Best,
>> Kurt
>>
>> On Thu, Apr 27, 2017 at 8:23 PM, Flavio Pompermaier <pomperma...@okkam.it> wrote:
>>
>>> Thanks a lot Kurt!
>>>
>>> On Thu, Apr 27, 2017 at 2:12 PM, Kurt Young <ykt...@gmail.com> wrote:
>>>
>>>> Thanks for the test case, I will take a look at it.
>>>>
>>>> On Thu, Apr 27, 2017 at 03:55, Flavio Pompermaier <pomperma...@okkam.it> wrote:
>>>>
>>>>> I've created a repository with a unit test to reproduce the error at
>>>>> https://github.com/fpompermaier/flink-batch-bug/blob/master/src/test/java/it/okkam/flink/aci/TestDataInputDeserializer.java
>>>>> (this error is probably also related to FLINK-4719).
>>>>>
>>>>> The exception is thrown only when there are null strings and multiple
>>>>> slots per TM. I don't know whether UnilateralSortMerger is involved or
>>>>> not (but I think so).
>>>>> A quick fix for this problem would be much appreciated because it's
>>>>> blocking a production deployment.
>>>>>
>>>>> Thanks in advance to all,
>>>>> Flavio
>>>>>
>>>>> On Wed, Apr 26, 2017 at 4:42 PM, Flavio Pompermaier <
>>>>> pomperma...@okkam.it> wrote:
>>>>>
>>>>>> After digging into the code and tests, I think the problem is almost
>>>>>> certainly in the UnilateralSortMerger: there is probably a missing
>>>>>> synchronization on some shared object somewhere. Right now I'm trying to
>>>>>> understand whether this section of code creates shared objects (like
>>>>>> queues) that are accessed incorrectly when there's spilling to disk:
>>>>>>
>>>>>> // start the thread that reads the input channels
>>>>>> this.readThread = getReadingThread(exceptionHandler, input,
>>>>>>     circularQueues, largeRecordHandler, parentTask, serializer,
>>>>>>     ((long) (startSpillingFraction * sortMemory)));
>>>>>>
>>>>>> // start the thread that sorts the buffers
>>>>>> this.sortThread = getSortingThread(exceptionHandler, circularQueues,
>>>>>>     parentTask);
>>>>>>
>>>>>> // start the thread that handles spilling to secondary storage
>>>>>> this.spillThread = getSpillingThread(exceptionHandler, circularQueues,
>>>>>>     parentTask, memoryManager, ioManager, serializerFactory, comparator,
>>>>>>     this.sortReadMemory, this.writeMemory, maxNumFileHandles);
>>>>>> ....
>>>>>> startThreads();
>>>>>>
>>>>>>
>>>>>> The problem is that the unit tests of GroupReduceDriver use Record,
>>>>>> and testing Rows is not very straightforward. I'm still trying to
>>>>>> reproduce the problem in a local env.
>>>>>>
>>>>>> On Fri, Apr 21, 2017 at 9:53 PM, Flavio Pompermaier <
>>>>>> pomperma...@okkam.it> wrote:
>>>>>>
>>>>>>> Thanks for the explanation. Is there a way to force this behaviour
>>>>>>> in a local environment (to try to debug the problem)?
>>>>>>>
>>>>>>> On 21 Apr 2017 21:49, "Fabian Hueske" <fhue...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi Flavio,
>>>>>>>>
>>>>>>>> these files are used for spilling data to disk. In your case, they
>>>>>>>> hold sorted runs of records.
>>>>>>>> Later, these sorted runs (up to a fanout threshold at a time) are read
>>>>>>>> and merged to get a completely sorted record stream.
>>>>>>>>
>>>>>>>> 2017-04-21 14:09 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it>:
>>>>>>>>
>>>>>>>>> The error appears as soon as some taskmanager generates some
>>>>>>>>> inputchannel file.
>>>>>>>>> What are those files used for?
>>>>>>>>>
>>>>>>>>> On Fri, Apr 21, 2017 at 11:53 AM, Flavio Pompermaier <
>>>>>>>>> pomperma...@okkam.it> wrote:
>>>>>>>>>
>>>>>>>>>> In another run of the job I had another Exception. Could it be
>>>>>>>>>> helpful?
>>>>>>>>>>
>>>>>>>>>> Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: Serializer consumed more bytes than the record had. This indicates broken serialization. If you are using custom serialization types (Value or Writable), check their serialization methods. If you are using a Kryo-serialized type, check the corresponding Kryo serializer.
>>>>>>>>>> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:465)
>>>>>>>>>> at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)
>>>>>>>>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
>>>>>>>>>> at java.lang.Thread.run(Thread.java:745)
>>>>>>>>>> Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: Serializer consumed more bytes than the record had. This indicates broken serialization. If you are using custom serialization types (Value or Writable), check their serialization methods. If you are using a Kryo-serialized type, check the corresponding Kryo serializer.
>>>>>>>>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
>>>>>>>>>> at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1094)
>>>>>>>>>> at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:99)
>>>>>>>>>> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:460)
>>>>>>>>>> ... 3 more
>>>>>>>>>> Caused by: java.io.IOException: Thread 'SortMerger Reading Thread' terminated due to an exception: Serializer consumed more bytes than the record had. This indicates broken serialization. If you are using custom serialization types (Value or Writable), check their serialization methods. If you are using a Kryo-serialized type, check the corresponding Kryo serializer.
>>>>>>>>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:799)
>>>>>>>>>> Caused by: java.io.IOException: Serializer consumed more bytes than the record had. This indicates broken serialization. If you are using custom serialization types (Value or Writable), check their serialization methods. If you are using a Kryo-serialized type, check the corresponding Kryo serializer.
>>>>>>>>>> at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:123)
>>>>>>>>>> at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:72)
>>>>>>>>>> at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:42)
>>>>>>>>>> at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
>>>>>>>>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:1035)
>>>>>>>>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
>>>>>>>>>> Caused by: java.lang.ArrayIndexOutOfBoundsException: 32768
>>>>>>>>>> at org.apache.flink.core.memory.HeapMemorySegment.get(HeapMemorySegment.java:104)
>>>>>>>>>> at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer$NonSpanningWrapper.readByte(SpillingAdaptiveSpanningRecordDeserializer.java:226)
>>>>>>>>>> at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer$NonSpanningWrapper.readUnsignedByte(SpillingAdaptiveSpanningRecordDeserializer.java:231)
>>>>>>>>>> at org.apache.flink.types.StringValue.readString(StringValue.java:770)
>>>>>>>>>> at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:69)
>>>>>>>>>> at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:74)
>>>>>>>>>> at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:28)
>>>>>>>>>> at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:193)
>>>>>>>>>> at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:36)
>>>>>>>>>> at org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
>>>>>>>>>> at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:109)
>>>>>>>>>> ... 5 more
>>>>>>>>>>
>>>>>>>>>> On Fri, Apr 21, 2017 at 11:43 AM, Stefano Bortoli <
>>>>>>>>>> stefano.bort...@huawei.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> In fact, the old problem was that the KryoSerializer was not
>>>>>>>>>>> re-initialized after the exception that triggers spilling to disk.
>>>>>>>>>>> This left a dirty serialization buffer that would eventually break
>>>>>>>>>>> the program. Till worked on it by debugging the source code that
>>>>>>>>>>> generated the error. Perhaps someone could try the same this time,
>>>>>>>>>>> if Flavio can make the problem reproducible in a shareable
>>>>>>>>>>> program + data.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Stefano
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> *From:* Stephan Ewen [mailto:se...@apache.org]
>>>>>>>>>>> *Sent:* Friday, April 21, 2017 10:04 AM
>>>>>>>>>>> *To:* user <user@flink.apache.org>
>>>>>>>>>>> *Subject:* Re: UnilateralSortMerger error (again)
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> In the past, these errors were most often caused by bugs in the
>>>>>>>>>>> serializers, not in the sorter.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> What types are you using at that point? The Stack Trace reveals
>>>>>>>>>>> ROW and StringValue, any other involved types?
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Fri, Apr 21, 2017 at 9:36 AM, Flavio Pompermaier <
>>>>>>>>>>> pomperma...@okkam.it> wrote:
>>>>>>>>>>>
>>>>>>>>>>> As suggested by Fabian I set taskmanager.memory.size = 1024 (to
>>>>>>>>>>> force spilling to disk) and the job failed almost immediately..
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Fri, Apr 21, 2017 at 12:33 AM, Flavio Pompermaier <
>>>>>>>>>>> pomperma...@okkam.it> wrote:
>>>>>>>>>>>
>>>>>>>>>>> I debugged the process a bit by repeating the job on sub-slices of
>>>>>>>>>>> the entire data (using the id value to filter data with Parquet
>>>>>>>>>>> push-down filters), and all slices completed successfully :(
>>>>>>>>>>>
>>>>>>>>>>> So I tried to increase the parallelism (from 1 slot per TM to 4)
>>>>>>>>>>> to see whether this was somehow a stress factor, but it didn't cause
>>>>>>>>>>> any error.
>>>>>>>>>>>
>>>>>>>>>>> Then I almost doubled the number of rows to process and finally
>>>>>>>>>>> the error showed up again.
>>>>>>>>>>>
>>>>>>>>>>> It seems somehow related to spilling to disk but I can't really
>>>>>>>>>>> understand what's going on :(
>>>>>>>>>>>
>>>>>>>>>>> This is a summary of my debug attempts:
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> 4 Task managers with 6 GB and 1 slot each, parallelism = 4
>>>>>>>>>>>
>>>>>>>>>>> id < 10.000.000.000 => 1.857.365 rows => OK
>>>>>>>>>>> id >= 10.000.000.000 && id < 10.010.000.000 => 20.057.714 rows => OK
>>>>>>>>>>> id >= 10.010.000.000 && id < 99.945.000.000 => 20.926.903 rows => OK
>>>>>>>>>>> id >= 99.945.000.000 && id < 99.960.000.000 => 23.888.750 rows => OK
>>>>>>>>>>> id >= 99.960.000.000 => 32.936.422 rows => OK
>>>>>>>>>>>
>>>>>>>>>>> 4 Task managers with 8 GB and 4 slots each, parallelism = 16
>>>>>>>>>>>
>>>>>>>>>>> id >= 99.960.000.000 => 32.936.422 rows => OK
>>>>>>>>>>> id >= 99.945.000.000 => 56.825.172 rows => ERROR
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Any help is appreciated..
>>>>>>>>>>>
>>>>>>>>>>> Best,
>>>>>>>>>>>
>>>>>>>>>>> Flavio
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Wed, Apr 19, 2017 at 8:34 PM, Flavio Pompermaier <
>>>>>>>>>>> pomperma...@okkam.it> wrote:
>>>>>>>>>>>
>>>>>>>>>>> I could, but only if there's a good probability that it fixes the
>>>>>>>>>>> problem... how confident are you about it?
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Wed, Apr 19, 2017 at 8:27 PM, Ted Yu <yuzhih...@gmail.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>> Looking at the git log of DataInputDeserializer.java, there have
>>>>>>>>>>> been some recent changes.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> If you have time, maybe try with the 1.2.1 RC and see if the error
>>>>>>>>>>> is reproducible?
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Cheers
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Wed, Apr 19, 2017 at 11:22 AM, Flavio Pompermaier <
>>>>>>>>>>> pomperma...@okkam.it> wrote:
>>>>>>>>>>>
>>>>>>>>>>> Hi to all,
>>>>>>>>>>>
>>>>>>>>>>> I think I'm hitting the weird exception with the
>>>>>>>>>>> SpillingAdaptiveSpanningRecordDeserializer again...
>>>>>>>>>>>
>>>>>>>>>>> I'm using Flink 1.2.0 and the error seems to arise when Flink
>>>>>>>>>>> spills to disk, but the exception thrown is not very helpful. Any
>>>>>>>>>>> idea?
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: null
>>>>>>>>>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
>>>>>>>>>>> at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1094)
>>>>>>>>>>> at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:99)
>>>>>>>>>>> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:460)
>>>>>>>>>>> ... 3 more
>>>>>>>>>>> Caused by: java.io.IOException: Thread 'SortMerger Reading Thread' terminated due to an exception: null
>>>>>>>>>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:799)
>>>>>>>>>>> Caused by: java.io.EOFException
>>>>>>>>>>> at org.apache.flink.runtime.util.DataInputDeserializer.readUnsignedByte(DataInputDeserializer.java:306)
>>>>>>>>>>> at org.apache.flink.types.StringValue.readString(StringValue.java:747)
>>>>>>>>>>> at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:69)
>>>>>>>>>>> at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:74)
>>>>>>>>>>> at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:28)
>>>>>>>>>>> at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:193)
>>>>>>>>>>> at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:36)
>>>>>>>>>>> at org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
>>>>>>>>>>> at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:144)
>>>>>>>>>>> at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:72)
>>>>>>>>>>> at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:42)
>>>>>>>>>>> at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
>>>>>>>>>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:1035)
>>>>>>>>>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Best,
>>>>>>>>>>>
>>>>>>>>>>> Flavio
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>
>>>>>>
>>>>> --
>>>> Best,
>>>> Kurt
>>>>
>>>
>>>
>>
>
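
For readers following the thread: Fabian's description of the spill files (sorted runs written to disk, later read back and merged into one sorted stream) can be illustrated with a small k-way merge. This is only a sketch under simplifying assumptions, not Flink's UnilateralSortMerger code: the runs are in-memory lists of integers rather than files, all runs are merged in a single pass (so the fanout threshold is ignored), and the names SortedRunMerger, Cursor and merge are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.PriorityQueue;

public class SortedRunMerger {

    /** Points at the current element of one sorted run (hypothetical helper). */
    private static final class Cursor implements Comparable<Cursor> {
        final int value;     // element currently at the head of the run
        final int runIndex;  // which run the element came from
        final int position;  // index of the element inside that run

        Cursor(int value, int runIndex, int position) {
            this.value = value;
            this.runIndex = runIndex;
            this.position = position;
        }

        @Override
        public int compareTo(Cursor other) {
            return Integer.compare(value, other.value);
        }
    }

    /** Merges already-sorted runs into one fully sorted list. */
    public static List<Integer> merge(List<List<Integer>> runs) {
        PriorityQueue<Cursor> heap = new PriorityQueue<>();

        // Seed the heap with the first element of every non-empty run.
        for (int i = 0; i < runs.size(); i++) {
            if (!runs.get(i).isEmpty()) {
                heap.add(new Cursor(runs.get(i).get(0), i, 0));
            }
        }

        List<Integer> merged = new ArrayList<>();
        while (!heap.isEmpty()) {
            // The heap head is the smallest element across all runs.
            Cursor smallest = heap.poll();
            merged.add(smallest.value);

            // Advance the run that the element was taken from.
            List<Integer> run = runs.get(smallest.runIndex);
            int next = smallest.position + 1;
            if (next < run.size()) {
                heap.add(new Cursor(run.get(next), smallest.runIndex, next));
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        List<List<Integer>> runs = Arrays.asList(
                Arrays.asList(1, 4, 9),
                Arrays.asList(2, 3, 10),
                Arrays.asList(5, 6));
        System.out.println(merge(runs)); // prints [1, 2, 3, 4, 5, 6, 9, 10]
    }
}
```

With a fanout threshold, a real sorter would merge only a limited number of runs at a time and write intermediate runs back to disk before the final merge; the sketch leaves that step out.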
