Hi Robert,

Any updates on the below for the community?

Thanks,
M

On Tue, Apr 25, 2017 at 8:50 AM, Robert Schmidtke <ro.schmid...@gmail.com>
wrote:

> Hi Ufuk, thanks for coming back to me on this.
>
> The records are 100 bytes in size, the benchmark being TeraSort, so that
> should not be an issue. I have played around with the input size, and here
> are my observations:
>
> 128 GiB input: no spilling in Flink.
> 256 GiB input: 88 GiB of spilling in Flink (so 88 GiB of reads and 88 GiB
> of writes), and my instrumentation covers all of it.
> 384 GiB input: 391 GiB of spilling in Flink, and I cover all of it.
> 512 GiB input: 522 GiB of spilling in Flink, but I miss 140 GiB of it.
> 640 GiB input: 653 GiB of spilling in Flink, but I miss 281 GiB of it.
> 768 GiB input: 784 GiB of spilling in Flink, but I miss 490 GiB of it.
> 896 GiB input: 914 GiB of spilling in Flink, but I miss 662 GiB of it.
> 1024 GiB input: 1045 GiB of spilling in Flink, but I miss 968 GiB of it.
>
> So regardless of how well my system is configured and whether spilling is
> even necessary, it seems that with larger spilling volumes the way the data
> is spilled changes, and I start missing larger and larger portions of the
> I/O, up to almost 100%.
> Now, since I have written the instrumentation myself, I cannot guarantee
> that it is flawless; I might have missed something.
> I'm currently looking into how the file channels are accessed in parallel
> by multiple threads. My instrumentation covers that case and my tests
> verify it, but maybe there are special access patterns at play here.
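>
> To make that concrete, the accounting conceptually boils down to the
> simplified, hypothetical sketch below (not my actual code): every
> instrumented read/write call, from whichever thread, reports into shared
> counters, so concurrent accesses to the same channel are aggregated
> correctly.
>
> import java.util.concurrent.atomic.LongAdder;
>
> // Hypothetical, simplified accounting class that the instrumented
> // FileChannel call sites report into; LongAdder keeps concurrent
> // updates from many threads cheap and correct.
> public final class ChannelStats {
>
>     private static final LongAdder BYTES_READ = new LongAdder();
>     private static final LongAdder BYTES_WRITTEN = new LongAdder();
>
>     // Called after instrumented read(...) variants, including the
>     // positional read(ByteBuffer, long) used by concurrent readers.
>     public static void recordRead(long bytes) {
>         if (bytes > 0) {
>             BYTES_READ.add(bytes);
>         }
>     }
>
>     // Called after instrumented write(...) variants.
>     public static void recordWrite(long bytes) {
>         if (bytes > 0) {
>             BYTES_WRITTEN.add(bytes);
>         }
>     }
>
>     public static long totalBytesRead() {
>         return BYTES_READ.sum();
>     }
>
>     public static long totalBytesWritten() {
>         return BYTES_WRITTEN.sum();
>     }
> }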
>
> Robert
>
> On Mon, Apr 24, 2017 at 2:25 PM, Ufuk Celebi <u...@apache.org> wrote:
>
>> Hey Robert,
>>
>> For batch, that should cover the relevant spilling code. If the records
>> are >= 5 MB, the SpillingAdaptiveSpanningRecordDeserializer will spill
>> incoming records as well. But that should be covered by the FileChannel
>> instrumentation too, shouldn't it?
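>>
>> One way to double-check that path, assuming it writes through an ordinary
>> FileChannel, is a tiny self-test along the lines of this hypothetical
>> sketch (file name and size made up): push one record-sized buffer above
>> the 5 MB threshold through a channel and compare the byte count the
>> instrumentation reports against the count tracked here.
>>
>> import java.io.RandomAccessFile;
>> import java.nio.ByteBuffer;
>> import java.nio.channels.FileChannel;
>>
>> public class LargeRecordSpillCheck {
>>     public static void main(String[] args) throws Exception {
>>         ByteBuffer record = ByteBuffer.allocate(6 * 1024 * 1024); // > 5 MB
>>         long written = 0;
>>         try (RandomAccessFile raf =
>>                      new RandomAccessFile("spill-check.tmp", "rw");
>>              FileChannel ch = raf.getChannel()) {
>>             while (record.hasRemaining()) {
>>                 written += ch.write(record);
>>             }
>>         }
>>         // Compare 'written' with what the FileChannel instrumentation
>>         // recorded for this file.
>>         System.out.println("wrote " + written + " bytes through the channel");
>>     }
>> }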
>>
>> – Ufuk
>>
>>
>> On Tue, Apr 18, 2017 at 3:57 PM, Robert Schmidtke
>> <ro.schmid...@gmail.com> wrote:
>> > Hi,
>> >
>> > I have already looked at the UnilateralSortMerger, concluding that all
>> > I/O eventually goes via SegmentReadRequest and SegmentWriteRequest
>> > (which in turn use java.nio.channels.FileChannel) in
>> > AsynchronousFileIOChannel. Are there more interaction points between
>> > Flink and the underlying file system that I might want to consider?
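>> >
>> > As a checklist, the hypothetical self-test below (file names made up,
>> > not Flink code) exercises the main JDK paths that move file bytes
>> > besides plain read(ByteBuffer)/write(ByteBuffer): positional reads,
>> > channel-to-channel transfers, and memory-mapped buffers. Any of these
>> > could be a blind spot if only the basic read/write methods are hooked.
>> >
>> > import java.io.File;
>> > import java.io.FileInputStream;
>> > import java.io.FileOutputStream;
>> > import java.nio.ByteBuffer;
>> > import java.nio.MappedByteBuffer;
>> > import java.nio.channels.FileChannel;
>> >
>> > public class IoCoverageCheck {
>> >     public static void main(String[] args) throws Exception {
>> >         File f = new File("coverage-check.bin"); // hypothetical file
>> >         byte[] payload = new byte[8 * 1024 * 1024];
>> >
>> >         // Plain stream I/O.
>> >         try (FileOutputStream out = new FileOutputStream(f)) {
>> >             out.write(payload);
>> >         }
>> >
>> >         // Channel I/O, including the positional variant used by
>> >         // concurrent readers.
>> >         try (FileInputStream in = new FileInputStream(f);
>> >              FileChannel ch = in.getChannel()) {
>> >             ch.read(ByteBuffer.allocate(payload.length));
>> >             ch.read(ByteBuffer.allocate(1024), 4096L);
>> >         }
>> >
>> >         // Channel-to-channel transfer: may bypass explicit
>> >         // read(ByteBuffer)/write(ByteBuffer) calls.
>> >         File copy = new File("coverage-check-copy.bin");
>> >         try (FileChannel src = new FileInputStream(f).getChannel();
>> >              FileChannel dst = new FileOutputStream(copy).getChannel()) {
>> >             src.transferTo(0, src.size(), dst);
>> >         }
>> >
>> >         // Memory-mapped I/O: bytes are moved through the buffer, not
>> >         // through channel read/write calls.
>> >         try (FileInputStream in = new FileInputStream(f);
>> >              FileChannel ch = in.getChannel()) {
>> >             MappedByteBuffer mapped =
>> >                     ch.map(FileChannel.MapMode.READ_ONLY, 0, 1024);
>> >             mapped.get(new byte[1024]);
>> >         }
>> >     }
>> > }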
>> >
>> > Thanks!
>> > Robert
>> >
>> > On Fri, Apr 7, 2017 at 5:02 PM, Kurt Young <ykt...@gmail.com> wrote:
>> >>
>> >> Hi,
>> >>
>> >> You probably want to check out UnilateralSortMerger.java; this is the
>> >> class responsible for Flink's external sort. Here is a short
>> >> description of how it works: there are three threads working together,
>> >> one for reading, one for sorting partial data in memory, and the last
>> >> one responsible for spilling. Flink will first figure out how much
>> >> memory it can use during the in-memory sort and manage it as
>> >> MemorySegments. Once this memory runs out, the sorting thread takes
>> >> over the memory and does the in-memory sorting (for more details about
>> >> in-memory sorting, see NormalizedKeySorter). After this, the spilling
>> >> thread writes the sorted data to disk and makes the memory available
>> >> again for reading. This repeats until all data has been processed.
>> >> Normally, the data will be read twice (once from the source and once
>> >> from disk) and written once, but if too many files are spilled, Flink
>> >> will first merge some of the files to make sure the last merge step
>> >> does not exceed a certain fan-in limit (128 by default). Hope this
>> >> helps.
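>> >>
>> >> If it helps to picture the read/sort/spill/merge cycle, below is a very
>> >> rough, generic external-sort sketch in plain Java. It is not Flink's
>> >> UnilateralSortMerger (no MemorySegments, no separate threads), but it
>> >> shows the same structure: fill a bounded buffer, sort it, spill it as a
>> >> sorted run, and keep the final merge below a fixed fan-in.
>> >>
>> >> import java.io.BufferedReader;
>> >> import java.io.IOException;
>> >> import java.io.Writer;
>> >> import java.nio.file.Files;
>> >> import java.nio.file.Path;
>> >> import java.util.ArrayList;
>> >> import java.util.Collections;
>> >> import java.util.Comparator;
>> >> import java.util.List;
>> >> import java.util.PriorityQueue;
>> >>
>> >> public class ExternalSortSketch {
>> >>
>> >>     private static final int BUFFER_RECORDS = 1_000_000; // memory budget stand-in
>> >>     private static final int MAX_FAN_IN = 128;           // runs merged per step
>> >>
>> >>     public static void sort(BufferedReader input, Writer output) throws IOException {
>> >>         // Phase 1: read, sort in memory, spill sorted runs.
>> >>         List<Path> runs = new ArrayList<>();
>> >>         List<String> buffer = new ArrayList<>(BUFFER_RECORDS);
>> >>         String line;
>> >>         while ((line = input.readLine()) != null) {
>> >>             buffer.add(line);
>> >>             if (buffer.size() == BUFFER_RECORDS) {
>> >>                 runs.add(spill(buffer));
>> >>                 buffer.clear();
>> >>             }
>> >>         }
>> >>         if (!buffer.isEmpty()) {
>> >>             runs.add(spill(buffer));
>> >>         }
>> >>
>> >>         // Phase 2: pre-merge groups of runs so that the final merge
>> >>         // never opens more than MAX_FAN_IN files at once.
>> >>         while (runs.size() > MAX_FAN_IN) {
>> >>             List<Path> group = new ArrayList<>(runs.subList(0, MAX_FAN_IN));
>> >>             List<Path> rest = new ArrayList<>(runs.subList(MAX_FAN_IN, runs.size()));
>> >>             Path merged = Files.createTempFile("merge", ".tmp");
>> >>             try (Writer out = Files.newBufferedWriter(merged)) {
>> >>                 merge(group, out);
>> >>             }
>> >>             rest.add(merged);
>> >>             runs = rest;
>> >>         }
>> >>
>> >>         // Phase 3: final merge straight into the output.
>> >>         merge(runs, output);
>> >>     }
>> >>
>> >>     private static Path spill(List<String> buffer) throws IOException {
>> >>         Collections.sort(buffer);
>> >>         Path run = Files.createTempFile("run", ".tmp");
>> >>         Files.write(run, buffer);
>> >>         return run;
>> >>     }
>> >>
>> >>     // k-way merge of sorted run files via a priority queue of cursors.
>> >>     private static void merge(List<Path> runs, Writer out) throws IOException {
>> >>         PriorityQueue<Cursor> heap =
>> >>                 new PriorityQueue<>(Comparator.comparing((Cursor c) -> c.head));
>> >>         List<BufferedReader> readers = new ArrayList<>();
>> >>         try {
>> >>             for (Path run : runs) {
>> >>                 BufferedReader reader = Files.newBufferedReader(run);
>> >>                 readers.add(reader);
>> >>                 String head = reader.readLine();
>> >>                 if (head != null) {
>> >>                     heap.add(new Cursor(head, reader));
>> >>                 }
>> >>             }
>> >>             while (!heap.isEmpty()) {
>> >>                 Cursor c = heap.poll();
>> >>                 out.write(c.head);
>> >>                 out.write('\n');
>> >>                 String next = c.reader.readLine();
>> >>                 if (next != null) {
>> >>                     heap.add(new Cursor(next, c.reader));
>> >>                 }
>> >>             }
>> >>         } finally {
>> >>             for (BufferedReader reader : readers) {
>> >>                 reader.close();
>> >>             }
>> >>         }
>> >>     }
>> >>
>> >>     private static final class Cursor {
>> >>         final String head;
>> >>         final BufferedReader reader;
>> >>
>> >>         Cursor(String head, BufferedReader reader) {
>> >>             this.head = head;
>> >>             this.reader = reader;
>> >>         }
>> >>     }
>> >> }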
>> >>
>> >> Best,
>> >> Kurt
>> >>
>> >> On Fri, Apr 7, 2017 at 4:20 PM, Robert Schmidtke
>> >> <ro.schmid...@gmail.com> wrote:
>> >>>
>> >>> Hi,
>> >>>
>> >>> I'm currently examining the I/O patterns of Flink, and I'd like to
>> >>> know when and how Flink goes to disk. Let me give an overview of what
>> >>> I have done so far.
>> >>>
>> >>> I am running TeraGen (from the Hadoop examples package) + TeraSort
>> >>> (https://github.com/robert-schmidtke/terasort) on a 16-node cluster,
>> >>> each node with 64 GiB of memory, 2x32 cores, and roughly half a
>> >>> terabyte of disk. I'm using YARN and HDFS. The underlying file system
>> >>> is XFS.
>> >>>
>> >>> Now before running TeraGen and TeraSort, I reset the XFS counters to
>> >>> zero, and after TeraGen + TeraSort are finished, I dump the XFS
>> >>> counters again. Accumulated over the entire cluster I get 3 TiB of
>> >>> writes and 3.2 TiB of reads. What I'd have expected would be 2 TiB of
>> >>> writes (1 for TeraGen, 1 for TeraSort) and 1 TiB of reads (during
>> >>> TeraSort).
>> >>>
>> >>> Unsatisfied by the coarseness of these numbers, I developed an HDFS
>> >>> wrapper that logs file system statistics for each call to hdfs://...,
>> >>> such as start time/end time, no. of bytes read/written etc. I can plot
>> >>> these numbers and see what I expect: during TeraGen I have 1 TiB of
>> >>> writes to hdfs://..., during TeraSort I have 1 TiB of reads from and
>> >>> 1 TiB of writes to hdfs://... So far, so good.
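>> >>>
>> >>> (Conceptually, the wrapper is just a set of counting/timing streams
>> >>> around the HDFS streams, along the lines of the simplified,
>> >>> hypothetical sketch below; the real thing writes the per-call
>> >>> statistics to a log rather than stdout.)
>> >>>
>> >>> import java.io.FilterInputStream;
>> >>> import java.io.IOException;
>> >>> import java.io.InputStream;
>> >>>
>> >>> // Hypothetical, simplified version of the wrapper's read path: it
>> >>> // records when each call starts and ends and how many bytes it moved.
>> >>> public class LoggingInputStream extends FilterInputStream {
>> >>>
>> >>>     private final String path;
>> >>>
>> >>>     public LoggingInputStream(String path, InputStream in) {
>> >>>         super(in);
>> >>>         this.path = path;
>> >>>     }
>> >>>
>> >>>     @Override
>> >>>     public int read(byte[] b, int off, int len) throws IOException {
>> >>>         long start = System.nanoTime();
>> >>>         int n = super.read(b, off, len);
>> >>>         long end = System.nanoTime();
>> >>>         System.out.println(path + ": read " + Math.max(n, 0)
>> >>>                 + " bytes in " + (end - start) + " ns");
>> >>>         return n;
>> >>>     }
>> >>>
>> >>>     @Override
>> >>>     public int read() throws IOException {
>> >>>         int b = super.read();
>> >>>         System.out.println(path + ": read " + (b >= 0 ? 1 : 0) + " byte(s)");
>> >>>         return b;
>> >>>     }
>> >>> }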
>> >>>
>> >>> Now this still did not explain the disk I/O, so I added bytecode
>> >>> instrumentation to a range of Java classes, like FileIn/OutputStream,
>> >>> RandomAccessFile, FileChannel, ZipFile, multiple *Buffer classes for
>> >>> memory-mapped files etc., and collect the same statistics: start/end
>> >>> of a read from/write to disk, no. of bytes involved and such. I can
>> >>> plot these numbers too and see that the HDFS JVMs write 1 TiB of data
>> >>> to disk during TeraGen (expected) and read and write 1 TiB from and to
>> >>> disk during TeraSort (expected).
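>> >>>
>> >>> (For anyone curious about the mechanics: this kind of instrumentation
>> >>> is typically loaded as a java.lang.instrument agent. The skeleton below
>> >>> is a hypothetical, stripped-down illustration, with the actual bytecode
>> >>> rewriting of the read/write methods omitted.)
>> >>>
>> >>> import java.lang.instrument.ClassFileTransformer;
>> >>> import java.lang.instrument.Instrumentation;
>> >>> import java.security.ProtectionDomain;
>> >>> import java.util.Arrays;
>> >>> import java.util.HashSet;
>> >>> import java.util.Set;
>> >>>
>> >>> public class IoAgent {
>> >>>
>> >>>     // Internal (slash-separated) names of I/O classes to rewrite.
>> >>>     private static final Set<String> TARGETS = new HashSet<>(Arrays.asList(
>> >>>             "java/io/FileInputStream",
>> >>>             "java/io/FileOutputStream",
>> >>>             "java/io/RandomAccessFile",
>> >>>             "sun/nio/ch/FileChannelImpl", // concrete FileChannel in OpenJDK
>> >>>             "java/nio/DirectByteBuffer",  // backs memory-mapped buffers
>> >>>             "java/util/zip/ZipFile"));
>> >>>
>> >>>     // Requires Can-Retransform-Classes: true in the agent manifest;
>> >>>     // already-loaded classes would additionally need retransformation.
>> >>>     public static void premain(String agentArgs, Instrumentation inst) {
>> >>>         inst.addTransformer(new ClassFileTransformer() {
>> >>>             @Override
>> >>>             public byte[] transform(ClassLoader loader, String className,
>> >>>                                     Class<?> classBeingRedefined,
>> >>>                                     ProtectionDomain protectionDomain,
>> >>>                                     byte[] classfileBuffer) {
>> >>>                 if (className == null || !TARGETS.contains(className)) {
>> >>>                     return null; // not a class we care about
>> >>>                 }
>> >>>                 // Here the read/write methods would be rewritten (e.g.
>> >>>                 // with ASM) to report byte counts and timestamps.
>> >>>                 return classfileBuffer;
>> >>>             }
>> >>>         }, /* canRetransform */ true);
>> >>>     }
>> >>> }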
>> >>>
>> >>> Sorry for the enormous introduction, but now there's finally the
>> >>> interesting part: Flink's JVMs read from and write to disk 1 TiB of
>> >>> data each during TeraSort. I suspect there is some sort of spilling
>> >>> involved, potentially because I have not done the setup properly. But
>> >>> that is not the crucial point: my statistics give a total of 3 TiB of
>> >>> writes to disk (2 TiB for HDFS, 1 TiB for Flink), which agrees with
>> >>> the XFS counters from above. However, my statistics only give 2 TiB of
>> >>> reads from disk (1 TiB for HDFS, 1 TiB for Flink), so I'm missing an
>> >>> entire TiB of reads from disk somewhere. I have done the same with
>> >>> Hadoop TeraSort, and there I'm not missing any data, meaning my
>> >>> statistics agree with XFS for TeraSort on Hadoop. This is why I
>> >>> suspect there are some cases where Flink goes to disk without me
>> >>> noticing it.
>> >>>
>> >>> Therefore, finally, the question: in which cases does Flink go to
>> >>> disk, and how does it do so (meaning precisely which Java classes are
>> >>> involved, so I can check my bytecode instrumentation)? I guess this
>> >>> would also include any kind of resource distribution via HDFS/YARN
>> >>> (like JAR files and whatnot). Seeing that I'm missing an amount of
>> >>> data equal to the size of my input set, I'd suspect there must be some
>> >>> sort of shuffling/spilling at play here, but I'm not sure. Maybe there
>> >>> is also some sort of remote I/O involved, via sockets or so, that I'm
>> >>> missing.
>> >>>
>> >>> Any hints as to where Flink might incur disk I/O are greatly
>> >>> appreciated! I'm also happy to do the digging myself, once pointed to
>> >>> the proper packages in the Apache Flink source tree (I have done my
>> >>> fair share of inspection already, but could not be sure whether or not
>> >>> I have missed something). Thanks a lot in advance!
>> >>>
>> >>> Robert
>> >>>
>> >>> --
>> >>> My GPG Key ID: 336E2680
>> >>
>> >>
>> >
>> >
>> >
>> > --
>> > My GPG Key ID: 336E2680
>>
>
>
>
> --
> My GPG Key ID: 336E2680
>
