012huang opened a new pull request #27060: [SPARK-30246][CORE][SHUFFLE]fix 
spark external shuffle memory leak
URL: https://github.com/apache/spark/pull/27060
 
 
   ### What changes were proposed in this pull request?
   An application that finishes abnormally can sometimes cause a memory leak in 
the external shuffle service. In one of our production cases, the app failed 
with its stages cancelled because the SparkContext had already shut down. 
Strangely, requests to fetch shuffle data were still arriving and caused 
errors on the server side, as shown below:
   ```
   2019-12-08 22:23:33,375 ERROR server.TransportRequestHandler 
(TransportRequestHandler.java:processFetchRequest(132)) - Error opening block 
StreamChunkId{streamId=1902064894814, chunkIndex=0} for request from 
/10.221.115.175:38582
   java.lang.RuntimeException: Executor is not registered 
(appId=application_1574499669561_954327, execId=4514)
   ```
   The client side also shows a corresponding log like this:
   ```
   org.apache.spark.shuffle.FetchFailedException: Failure while fetching 
StreamChunkId{streamId=1902064894814, chunkIndex=0}: 
java.lang.RuntimeException: Executor is not registered 
(appId=application_1574499669561_954327, execId=4514)
   ```
   In some cases, an `OpenBlocks` request is still in flight. In 
`ExternalShuffleBlockHandler#handleMessage`, the handler registers a 
`StreamState` in `OneForOneStreamManager#streams` and then unconditionally 
replies to the client with a success response. The client receives the 
response and sends a `ChunkFetchRequest` to fetch the chunk, but by this time 
the app has received the `APPLICATION_STOP` event and executed 
`ExternalShuffleService#applicationRemoved` to clean up the app's 
`ExecutorShuffleInfo`, which produces the `Executor is not registered` error. 
When the client channel closes, `TransportRequestHandler#channelInactive` is 
called to clean up the `StreamState` associated with that channel, but 
cleaning up the `StreamState` buffers also looks up each `ManagedBuffer` using 
the `appId` and `execId`, which have already been removed from the `executors` 
object, so the cleanup fails and the stream state is left behind. We can also 
find the log `StreamManager connectionTerminated() callback failed` in the 
NM's log file.
   
   So, when an `OpenBlocks` request arrives, we should look up 
`ExternalShuffleBlockResolver#executors`. If the related app has exited, we 
should not register a `StreamState` and should just close the client (or reply 
with a special message and handle it on the client side). And when an app 
receives `APPLICATION_STOP` and calls `applicationRemoved`, we should clean up 
the related `StreamState` before the `ExecutorShuffleInfo` is cleaned. This is 
what the PR changes, and it prevents the shuffle service memory leak.
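
   The two rules above can be illustrated with a small toy model. This is only 
a sketch under stated assumptions, not the code in this PR: the class 
`ShuffleStreamSketch` and its methods are hypothetical stand-ins for 
`ExternalShuffleBlockResolver#executors` and `OneForOneStreamManager#streams`, 
and the methods are `synchronized` only to keep the check-then-register step 
simple in the example.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Toy model of the shuffle service state. Hypothetical names, not Spark classes.
class ShuffleStreamSketch {
    // Stand-in for ExternalShuffleBlockResolver#executors, keyed by "appId/execId".
    private final Set<String> executors = new HashSet<>();
    // Stand-in for OneForOneStreamManager#streams: streamId -> owning appId.
    private final Map<Long, String> streams = new HashMap<>();
    private long nextStreamId = 0;

    synchronized void registerExecutor(String appId, String execId) {
        executors.add(appId + "/" + execId);
    }

    // Rule 1: only register a stream if the executor is still known.
    synchronized long openBlocks(String appId, String execId) {
        if (!executors.contains(appId + "/" + execId)) {
            throw new IllegalStateException(
                "Executor is not registered (appId=" + appId + ", execId=" + execId + ")");
        }
        long streamId = nextStreamId++;
        streams.put(streamId, appId);
        return streamId;
    }

    // Rule 2: drop the app's streams first, then forget its executors.
    synchronized void applicationRemoved(String appId) {
        streams.values().removeIf(appId::equals);
        executors.removeIf(key -> key.startsWith(appId + "/"));
    }

    synchronized int openStreamCount() {
        return streams.size();
    }

    public static void main(String[] args) {
        ShuffleStreamSketch service = new ShuffleStreamSketch();
        service.registerExecutor("app-1", "4514");
        service.openBlocks("app-1", "4514");
        service.applicationRemoved("app-1");
        System.out.println("open streams after removal: " + service.openStreamCount());
    }
}
```

   In this model, an `openBlocks` call that arrives after `applicationRemoved` 
is rejected up front instead of leaving a stream entry that can no longer be 
cleaned up.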
   
   ### Why are the changes needed?
   The external shuffle service memory leak has a great impact on clusters with 
dynamic allocation on and may cause the NM to crash.
   
   ### Does this PR introduce any user-facing change?
   No
   
   ### How was this patch tested?
   With existing UTs.

