Hello!

I often use batch mode to validate that my pipeline produces the
expected results over some fixed input data. That usually works very well
and definitely helps find bugs in my user code.

I have one job that reads many TBs of data from S3 and then writes reduced
outputs back to S3.

This job (p=800, on EMR/YARN, m5.xlarge with EBS) used to run with the
hash-based blocking shuffle, but as I scaled up the input data I started
to see "too many open files" exceptions, so I switched to sort-merge.

Sort-merge has various parameters that need to be adjusted, and the
excellent blog post about it [1] and the Flink memory documentation
certainly helped me understand this to some degree.

My issue is that I can't find memory settings that don't result in
crashes.

With default settings, except for enabling sort-merge and setting
taskmanager.network.blocking-shuffle.compression.enabled: true, the job
often crashes with a TimeoutException in
SortMergeResultPartitionReadScheduler.allocateBuffers (line 168). The
exception recommends increasing
taskmanager.memory.framework.off-heap.batch-shuffle.size.
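For reference, the baseline configuration looks roughly like this (a
sketch, assuming sort-merge is enabled via
taskmanager.network.sort-shuffle.min-parallelism as described in the blog
post; everything else is at defaults):

```yaml
# flink-conf.yaml (baseline sketch, not a recommendation)
# enable the sort-merge blocking shuffle for all parallelisms
taskmanager.network.sort-shuffle.min-parallelism: 1
# compress the shuffle data spilled to disk
taskmanager.network.blocking-shuffle.compression.enabled: true
```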

With batch-shuffle.size increased to 128m and framework off-heap.size to
256m I still get the same exception.
Increasing these further usually results in YARN killing containers with
exit code 137.

Increasing taskmanager.network.sort-shuffle.min-buffers, e.g. to 2048,
has the same result: the container is killed with 137, even after adding
more network memory.

To work around this I also tried increasing the jvm-overhead, but that
didn't help either.
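To summarize, the tuning attempts above correspond roughly to the
following settings (the jvm-overhead value is just an example of the kind
of increase I tried, not an exact value):

```yaml
# flink-conf.yaml (sketch of the settings I experimented with)
# larger read-buffer pool for the sort-merge shuffle
taskmanager.memory.framework.off-heap.batch-shuffle.size: 128m
# framework off-heap must be large enough to cover the batch-shuffle pool
taskmanager.memory.framework.off-heap.size: 256m
# more write buffers per result partition
taskmanager.network.sort-shuffle.min-buffers: 2048
# extra headroom for native memory (example value)
taskmanager.memory.jvm-overhead.max: 2g
```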

My operators don't need a lot of memory (no off-heap memory) and are
probably not the cause; e.g. one place where this often fails is a reduce
that only keeps one object at a time.

The crashes usually happen after the job is almost finished.

What is a good approach to debug this?

Thanks,
Jörn

[1] https://flink.apache.org/2021/10/26/sort-shuffle-part1.html
