Hello! I often use batch mode to validate that my pipeline produces the expected results over some fixed input data. That usually works very well and definitely helps to find bugs in my user code.
I have one job that reads many TBs of data from S3 and writes reduced outputs back to S3. This job (p=800, on EMR/YARN, m5.xlarge with EBS) used to run with the hash-based blocking shuffle, but as I scaled up the input data I started to see "too many open files" exceptions, so I switched to sort-merge. For sort-merge there are various parameters that need tuning, and the excellent blog post about it [1] and the Flink memory documentation certainly helped me understand this to some degree. My issue is that I haven't managed to find memory settings that don't result in crashes.

With default settings, except for enabling sort-merge and setting taskmanager.network.blocking-shuffle.compression.enabled: true, the job often crashes with a TimeoutException in SortMergeResultPartitionReadScheduler.allocateBuffers (line 168). The exception recommends increasing taskmanager.memory.framework.off-heap.batch-shuffle.size. With batch-shuffle.size increased to 128m and framework.off-heap.size to 256m I still get the same exception. Increasing these further usually results in YARN killing containers with exit code 137. Increasing sort-shuffle.min-buffers, e.g. to 2048, has the same result: the container is killed with 137, even after adding more memory to the network. To work around this I also tried increasing the JVM overhead, but that didn't help either.

My operators don't need a lot of memory (no off-heap memory) and are probably not the cause; for example, one place where this often fails is a reduce that only keeps one object at a time. The crashes usually happen when the job is almost finished.

What is a good approach to debug this?

Thanks,
Jörn

[1] https://flink.apache.org/2021/10/26/sort-shuffle-part1.html
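P.S. For completeness, here is a sketch of the shuffle-related settings from my latest failing run. The values are the last ones I tried as described above, not a recommendation, and taskmanager.network.sort-shuffle.min-parallelism is the key I used to enable sort-merge (any value at or below the job parallelism of 800 turns it on for all blocking edges):

```yaml
# Enable the sort-merge blocking shuffle for all blocking edges.
taskmanager.network.sort-shuffle.min-parallelism: 1

# Compress shuffle data written to and read from disk.
taskmanager.network.blocking-shuffle.compression.enabled: true

# Off-heap memory for the batch shuffle read scheduler; the
# TimeoutException in SortMergeResultPartitionReadScheduler.allocateBuffers
# points at this option. Must fit within framework.off-heap.size.
taskmanager.memory.framework.off-heap.batch-shuffle.size: 128m
taskmanager.memory.framework.off-heap.size: 256m

# Minimum number of network buffers per sort-merge result partition.
taskmanager.network.sort-shuffle.min-buffers: 2048
```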