tanvn opened a new pull request, #36424: URL: https://github.com/apache/spark/pull/36424
### What changes were proposed in this pull request?

Make `cleanAppData` atomic to prevent a race condition between updating and cleaning app data. When the race condition happens, the `ApplicationInfoWrapper` for an application is deleted right after it has been updated by `mergeApplicationListing`. As a result, there are cases where the HS Web UI displays `Application not found` for applications whose logs do exist.

#### Background

Currently, the HS runs `checkForLogs` every 10 seconds by default to build the application list from the current contents of the log directory.
- https://github.com/apache/spark/blob/v3.2.1/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala#L296-L299
- https://github.com/apache/spark/blob/v3.2.1/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala#L472

In each run, this method scans the specified logDir and parses the log files to update its KVStore:
- detect newly added or updated files to process: https://github.com/apache/spark/blob/v3.2.1/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala#L574-L578
- detect stale data to remove: https://github.com/apache/spark/blob/v3.2.1/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala#L586-L600

These 2 operations are executed in different threads because `submitLogProcessTask` uses `replayExecutor` to submit tasks.
https://github.com/apache/spark/blob/v3.2.1/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala#L1389-L1401

### When does the bug happen?

The `Application not found` error happens in the following scenario:

In the first run of `checkForLogs`, it detects a newly added log `viewfs://iu/log/spark3/AAA_1.inprogress` (the log of an in-progress application named AAA).
So it adds 2 entries to the KVStore:
- one key-value entry whose key is the logPath (`viewfs://iu/log/spark3/AAA_1.inprogress`) and whose value is an instance of LogInfo representing the log
  - https://github.com/apache/spark/blob/v3.2.1/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala#L495-L505
  - https://github.com/apache/spark/blob/v3.2.1/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala#L545-L552
- one key-value entry whose key is the applicationId (`AAA`) and whose value is an instance of ApplicationInfoWrapper holding the information of the application
  - https://github.com/apache/spark/blob/v3.2.1/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala#L825
  - https://github.com/apache/spark/blob/v3.2.1/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala#L1172

In the next run of `checkForLogs`, the AAA application has finished: the log `viewfs://iu/log/spark3/AAA_1.inprogress` has been deleted and a new log `viewfs://iu/log/spark3/AAA_1` has been created. So `checkForLogs` does the following 2 things in 2 different threads:
- Thread 1: parses the new log `viewfs://iu/log/spark3/AAA_1` and updates the data in its KVStore
  - adds a new entry with key `viewfs://iu/log/spark3/AAA_1` and, as its value, an instance of LogInfo representing the log
  - updates the entry with key=applicationId (`AAA`) to a new instance of ApplicationInfoWrapper (for example, the isCompleted flag changes from false to true)
- Thread 2: data related to `viewfs://iu/log/spark3/AAA_1.inprogress` is now considered stale and must be deleted.
  - clean app data for `viewfs://iu/log/spark3/AAA_1.inprogress` https://github.com/apache/spark/blob/v3.2.1/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala#L586-L600
  - Inside `cleanAppData`, it first loads the latest `ApplicationInfoWrapper` from the KVStore: https://github.com/apache/spark/blob/v3.2.1/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala#L632

Most of the time, when this line is executed, Thread 1 has already finished updating the entry with key=applicationId (`AAA`) with the new instance of ApplicationInfoWrapper, so this condition https://github.com/apache/spark/blob/v3.2.1/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala#L637 evaluates to false and `isStale` is false.

However, in some rare cases Thread 1 has not finished the update yet, so the old ApplicationInfoWrapper is loaded, `isStale` is true, and the entry of ApplicationInfoWrapper in the KVStore is deleted: https://github.com/apache/spark/blob/v3.2.1/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala#L656-L662

Worst of all, the deletion happens right after Thread 1 has finished updating the entry with key=applicationId (`AAA`) with the new instance of ApplicationInfoWrapper. So the entry for the ApplicationInfoWrapper of applicationId=`AAA` is removed permanently, and when users access the Web UI for this application, `Application not found` is shown even though the log for the app does exist.
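The interleaving described above is a check-then-act race. As a rough illustration only, the following sketch replays that interleaving step by step on a single thread, so the outcome is deterministic. `AppEntry`, `RaceSketch`, and the plain mutable map are hypothetical stand-ins, not Spark's `ApplicationInfoWrapper` or KVStore API, and the staleness check is reduced to a log-name comparison.

```scala
import scala.collection.mutable

// Hypothetical, simplified stand-in for ApplicationInfoWrapper (one attempt per app).
final case class AppEntry(appId: String, logName: String, completed: Boolean)

object RaceSketch {
  // Hypothetical stand-in for the KVStore `listing`, keyed by applicationId.
  val listing = mutable.Map.empty[String, AppEntry]

  // Replays the problematic interleaving in a fixed order on one thread.
  // Returns true if the application entry survives, false if it is lost.
  def replayBadInterleaving(): Boolean = {
    listing.clear()
    // First checkForLogs run: the in-progress log is registered.
    listing("AAA") = AppEntry("AAA", "AAA_1.inprogress", completed = false)

    // Thread 2 (clean), step 1: load the entry and decide staleness from it.
    val loaded = listing("AAA")
    val isStale = loaded.logName == "AAA_1.inprogress"

    // Thread 1 (update) runs in between and writes the completed application.
    listing("AAA") = AppEntry("AAA", "AAA_1", completed = true)

    // Thread 2, step 2: acts on the outdated decision and deletes the fresh entry.
    if (isStale) listing.remove("AAA")

    listing.contains("AAA")
  }
}
```

In this model `RaceSketch.replayBadInterleaving()` returns `false`: the entry written by the update step is gone, which corresponds to the `Application not found` symptom.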
So here we make the `cleanAppData` method atomic, just like the `addListing` method https://github.com/apache/spark/blob/v3.2.1/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala#L1172 so that
- If Thread 1 gets the lock on `listing` before Thread 2, `isStale` will be false and the app will not be removed from the KVStore.
- If Thread 2 gets the lock on `listing` before Thread 1, `isStale` will be true and the app will be removed from the KVStore, but it will then be added again by Thread 1.

In both cases, the entry for the application will not be deleted unexpectedly from the KVStore.

### Why are the changes needed?

Fix the bug where the HS Web UI displays `Application not found` for applications whose logs do exist.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Deployed in our Spark HS; the `java.util.NoSuchElementException` exception and the `Application not found` error no longer happen.
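The locking approach proposed in this PR can be illustrated with a minimal model, assuming both the update path and the clean path take the same lock. `AppRecord`, `ListingModel`, and `AtomicCleanSketch` are hypothetical names for this sketch and do not mirror the actual `FsHistoryProvider` code. The point is only that the load, the staleness check, and the delete share one critical section.

```scala
import scala.collection.mutable

// Hypothetical, simplified stand-in for ApplicationInfoWrapper.
final case class AppRecord(appId: String, logName: String, completed: Boolean)

// Hypothetical model of the listing store; not Spark's KVStore API.
final class ListingModel {
  private val listing = mutable.Map.empty[String, AppRecord]

  // Update path: the write happens while holding the lock on `listing`.
  def addListing(app: AppRecord): Unit = listing.synchronized {
    listing(app.appId) = app
  }

  // Clean path: load, staleness check, and delete run under the same lock,
  // so an update cannot slip in between the check and the delete.
  def cleanAppData(appId: String, staleLogName: String): Unit = listing.synchronized {
    listing.get(appId).foreach { app =>
      val isStale = app.logName == staleLogName
      if (isStale) listing.remove(appId)
    }
  }

  def contains(appId: String): Boolean = listing.synchronized(listing.contains(appId))
}

object AtomicCleanSketch {
  // Runs the update and the clean concurrently; returns whether the app survives.
  def run(): Boolean = {
    val model = new ListingModel
    model.addListing(AppRecord("AAA", "AAA_1.inprogress", completed = false))

    val updater = new Thread(() => model.addListing(AppRecord("AAA", "AAA_1", completed = true)))
    val cleaner = new Thread(() => model.cleanAppData("AAA", "AAA_1.inprogress"))
    updater.start()
    cleaner.start()
    updater.join()
    cleaner.join()

    model.contains("AAA")
  }
}
```

Whichever thread wins the lock, the entry is present at the end: if the updater goes first, the cleaner sees a non-matching log name and leaves the entry alone; if the cleaner goes first, the updater re-adds the entry afterwards.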
