Hi community,

For some high-parallelism Flink jobs, JobManager (JM) file cleanup becomes a 
bottleneck when the HDFS NameNode is under pressure. I have some questions and 
hope to get your feedback. Thanks.



Motivation:
Flink job:

Parallelism = 4000

Checkpoint interval = 10s

More than 5 operators have state, so 4,000 * 5 = 20,000 state files are 
generated every 10 seconds.


A large number of state files are written to the chk-xxx directory, and only 
the JM cleans up these files. When the HDFS NameNode is under pressure, the JM 
cleans up files slowly, the cleanup rate cannot keep up with the rate at which 
files are generated, and a large number of files accumulate on HDFS.


Increasing the value of `state.backend.fs.memory-threshold` can alleviate the 
problem, but it cannot fully solve it. So I hope to clean up at the directory 
level as much as possible, rather than at the file level.
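For reference, the threshold is configured in flink-conf.yaml. Small state below the threshold is inlined into the _metadata file instead of being written as a separate file, which reduces the file count. The value below is purely illustrative, not a recommendation:

```yaml
# Inline state handles smaller than this size into the checkpoint
# _metadata file, so no separate file is created for them.
# Illustrative value only; check your Flink version's docs for the
# default and the allowed maximum.
state.backend.fs.memory-threshold: 100kb
```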



Idea:


When the CompletedCheckpoint#doDiscard method cleans up the state, there are 
three steps:
1. Delete the _metadata file
2. Delete all state data files
3. Delete the chk-xxx directory with recursive=false
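To make the cost concrete, here is a simplified local sketch of the current per-file discard pattern using java.nio.file rather than Flink's FileSystem API; the directory layout and file count are illustrative. Note that every delete is a separate call, which on HDFS means a separate NameNode RPC:

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;

public class PerFileDiscard {
    public static void main(String[] args) throws IOException {
        // Build a fake chk-42 directory: one _metadata file plus 5 state files.
        Path chkDir = Files.createTempDirectory("chk-42");
        Files.createFile(chkDir.resolve("_metadata"));
        for (int i = 0; i < 5; i++) {
            Files.createFile(chkDir.resolve("state-" + i));
        }

        int deleteCalls = 0;

        // Step 1: delete the _metadata file (one call).
        Files.delete(chkDir.resolve("_metadata"));
        deleteCalls++;

        // Step 2: delete every state data file individually (N calls).
        try (DirectoryStream<Path> files = Files.newDirectoryStream(chkDir)) {
            for (Path f : files) {
                Files.delete(f);
                deleteCalls++;
            }
        }

        // Step 3: delete the now-empty directory, non-recursively (one call).
        Files.delete(chkDir);
        deleteCalls++;

        // 1 metadata + 5 state files + 1 directory = 7 delete calls.
        System.out.println("delete calls: " + deleteCalls);
    }
}
```

With real parallelism-4000 jobs the N in step 2 is in the tens of thousands per checkpoint, so the per-file pattern dominates the NameNode load.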


Question: The _metadata file and many OperatorState files live in the chk-xxx 
directory. Can the following changes be made:
1. Do not delete the _metadata file individually
2. Delete all state data files, except those located inside the chk-xxx 
directory
3. Delete the chk-xxx directory with recursive=true
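The proposed scheme can be sketched the same way. The deleteRecursively helper below is a hypothetical local stand-in for FileSystem#delete(path, recursive=true), which on HDFS is a single NameNode RPC regardless of how many files the directory contains; here it is emulated with a reverse-ordered walk:

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.stream.Stream;

public class RecursiveDiscard {
    // Hypothetical stand-in for FileSystem#delete(dir, true): from the
    // caller's point of view this is ONE logical delete call.
    static void deleteRecursively(Path dir) throws IOException {
        try (Stream<Path> walk = Files.walk(dir)) {
            walk.sorted(Comparator.reverseOrder()).forEach(p -> {
                try {
                    Files.delete(p);
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
        }
    }

    public static void main(String[] args) throws IOException {
        // Same illustrative layout: _metadata plus 5 state files in chk-42.
        Path chkDir = Files.createTempDirectory("chk-42");
        Files.createFile(chkDir.resolve("_metadata"));
        for (int i = 0; i < 5; i++) {
            Files.createFile(chkDir.resolve("state-" + i));
        }

        // A state file living OUTSIDE chk-42 still needs its own delete call.
        Path outside = Files.createTempFile("external-state", ".bin");
        Files.delete(outside);
        int jmCalls = 1;

        // One recursive delete replaces the 7 individual calls for
        // everything under chk-42 (_metadata, state files, directory).
        deleteRecursively(chkDir);
        jmCalls++;

        System.out.println("JM-issued delete calls: " + jmCalls);
        System.out.println("chk dir exists: " + Files.exists(chkDir));
    }
}
```

The caveat is step 2 of the proposal: files referenced by the checkpoint but stored outside chk-xxx (for example, shared state) must still be deleted individually, since the recursive directory delete does not cover them.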


The above changes would reduce HDFS calls in most scenarios and reduce 
NameNode pressure. I have completed the development and observed significant 
benefits.


commit link: 
https://github.com/1996fanrui/flink/commit/728cffd6e556515e69bc14e646b03c5edcd84934


Are there any risks I may have missed? I hope to get expert guidance, thanks.





Best


fanrui
