tanvn opened a new pull request, #36424:
URL: https://github.com/apache/spark/pull/36424

   ### What changes were proposed in this pull request?
   Make `cleanAppData` atomic to prevent a race condition between updating and cleaning app data.
   When the race condition happens, the `ApplicationInfoWrapper` entry for an application can be deleted right after it has been updated by `mergeApplicationListing`.
   So there are cases when the HS Web UI displays `Application not found` for applications whose logs do exist.
   
   #### Background
   Currently, the HS runs `checkForLogs` every 10 seconds by default to build the application list based on the current contents of the log directory.
   - 
https://github.com/apache/spark/blob/v3.2.1/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala#L296-L299
   - 
https://github.com/apache/spark/blob/v3.2.1/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala#L472
   
   In each run, this method scans the specified logDir and parses the log files to update its KVStore:
   - detect newly added or updated files to process: https://github.com/apache/spark/blob/v3.2.1/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala#L574-L578
   - detect stale data to remove: https://github.com/apache/spark/blob/v3.2.1/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala#L586-L600
   These 2 operations are executed in different threads because `submitLogProcessTask` uses `replayExecutor` to submit tasks.
   
https://github.com/apache/spark/blob/v3.2.1/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala#L1389-L1401
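   The point of the two links above is that the two phases of a `checkForLogs` run are handed to a shared executor and may interleave. A minimal Scala sketch of that submission pattern (the object name, counter, and task bodies are illustrative, not the real `FsHistoryProvider` code):

   ```scala
   import java.util.concurrent.{Executors, TimeUnit}
   import java.util.concurrent.atomic.AtomicInteger

   object ReplaySketch {
     // Shared counter standing in for the KVStore state both tasks touch.
     private val finished = new AtomicInteger(0)

     // Submits the two phases of a scan to a pool, the way
     // submitLogProcessTask hands work to replayExecutor. The pool gives
     // no ordering guarantee between the two tasks.
     def runBothTasks(): Int = {
       val replayExecutor = Executors.newFixedThreadPool(2)
       replayExecutor.submit(new Runnable {
         override def run(): Unit = {
           // Thread 1: parse newly added/updated logs.
           finished.incrementAndGet()
         }
       })
       replayExecutor.submit(new Runnable {
         override def run(): Unit = {
           // Thread 2: delete stale log data.
           finished.incrementAndGet()
         }
       })
       replayExecutor.shutdown()
       replayExecutor.awaitTermination(5, TimeUnit.SECONDS)
       finished.get()
     }
   }
   ```

   Because neither task waits for the other, any read-then-write sequence inside one task can be interleaved with the other task's writes.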
   
   ### When does the bug happen?
   The `Application not found` error happens in the following scenario:
   In the first run of `checkForLogs`, it detects a newly added log `viewfs://iu/log/spark3/AAA_1.inprogress` (the log of an in-progress application named AAA), so it adds 2 entries to the KVStore:
   - one entry whose key is the logPath (`viewfs://iu/log/spark3/AAA_1.inprogress`) and whose value is an instance of LogInfo representing the log
     -  
https://github.com/apache/spark/blob/v3.2.1/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala#L495-L505
     - 
https://github.com/apache/spark/blob/v3.2.1/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala#L545-L552
   - one entry whose key is the applicationId (`AAA`) and whose value is an instance of ApplicationInfoWrapper holding the information of the application.
     - 
https://github.com/apache/spark/blob/v3.2.1/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala#L825
     - 
https://github.com/apache/spark/blob/v3.2.1/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala#L1172
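   The two entries above can be pictured with a much-simplified sketch (the case classes and maps below are hypothetical stand-ins; the real LogInfo and ApplicationInfoWrapper carry many more fields):

   ```scala
   import scala.collection.mutable

   // Hypothetical, much-simplified stand-ins for LogInfo and
   // ApplicationInfoWrapper.
   case class LogInfoSketch(logPath: String, appId: Option[String])
   case class AppInfoSketch(appId: String, isCompleted: Boolean)

   object ListingSketch {
     // The KVStore behaves like two maps: one keyed by log path,
     // one keyed by application id.
     val logs = mutable.Map.empty[String, LogInfoSketch]
     val apps = mutable.Map.empty[String, AppInfoSketch]

     // First run of checkForLogs: the in-progress log produces two entries.
     def firstScan(): Unit = {
       val path = "viewfs://iu/log/spark3/AAA_1.inprogress"
       logs(path) = LogInfoSketch(path, Some("AAA"))
       apps("AAA") = AppInfoSketch("AAA", isCompleted = false)
     }
   }
   ```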
   
   In the next run of `checkForLogs`, the AAA application has finished: the log `viewfs://iu/log/spark3/AAA_1.inprogress` has been deleted and a new log `viewfs://iu/log/spark3/AAA_1` has been created. So `checkForLogs` will do the following 2 things in 2 different threads:
   - Thread 1: parse the new log `viewfs://iu/log/spark3/AAA_1` and update data in its KVStore
     - add a new entry with key `viewfs://iu/log/spark3/AAA_1` and value an instance of LogInfo representing the log
     - update the entry with key=applicationId (`AAA`) with a new instance of ApplicationInfoWrapper (for example, the isCompleted flag now changes from false to true)
   - Thread 2: data related to `viewfs://iu/log/spark3/AAA_1.inprogress` is now considered stale and must be deleted.
     - clean App data for `viewfs://iu/log/spark3/AAA_1.inprogress` 
https://github.com/apache/spark/blob/v3.2.1/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala#L586-L600
     - Inside `cleanAppData`, it first loads the latest `ApplicationInfoWrapper` from the KVStore: https://github.com/apache/spark/blob/v3.2.1/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala#L632
  Most of the time, when this line is executed, Thread 1 has already finished updating the entry with key=applicationId (`AAA`), so the condition https://github.com/apache/spark/blob/v3.2.1/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala#L637 evaluates to false and `isStale` will be false. However, in some rare cases, when Thread 1 has not finished the update yet, the old `ApplicationInfoWrapper` is loaded, `isStale` will be true, and this leads to deleting the `ApplicationInfoWrapper` entry from the KVStore: https://github.com/apache/spark/blob/v3.2.1/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala#L656-L662
  The worst case is that the entry is deleted right after Thread 1 has finished updating it with the new `ApplicationInfoWrapper`. The entry for applicationId=`AAA` is then removed for good, so when users access the Web UI for this application, `Application not found` is shown even though the log for the app does exist.
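   The unlucky interleaving can be replayed deterministically in a single thread. This sketch uses the same hypothetical simplified store as above and reduces the staleness check to a log-path comparison, which is only an approximation of the real condition:

   ```scala
   import scala.collection.mutable

   object RaceSketch {
     case class AppInfo(appId: String, logPath: String)

     val store = mutable.Map.empty[String, AppInfo]

     // Replays the race step by step, in the losing order.
     def demo(): Option[AppInfo] = {
       // State after the first scan: AAA points at the .inprogress log.
       store("AAA") = AppInfo("AAA", "viewfs://iu/log/spark3/AAA_1.inprogress")

       // Thread 2 loads the app before Thread 1's update lands ...
       val loaded = store("AAA")

       // ... Thread 1 updates the entry to point at the final log ...
       store("AAA") = AppInfo("AAA", "viewfs://iu/log/spark3/AAA_1")

       // ... but Thread 2 judges staleness from its stale snapshot,
       // so isStale is true and it deletes the freshly updated entry.
       val isStale = loaded.logPath.endsWith(".inprogress")
       if (isStale) store.remove("AAA")

       store.get("AAA") // None: the listing entry is gone
     }
   }
   ```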
   
   So here we make the `cleanAppData` method atomic, just like the `addListing` method https://github.com/apache/spark/blob/v3.2.1/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala#L1172 so that:
   - If Thread 1 gets the lock on `listing` before Thread 2, `isStale` will be false and the app will not be removed from the KVStore.
   - If Thread 2 gets the lock on `listing` before Thread 1, `isStale` will be true and the app will be removed from the KVStore, but it will be added again afterwards by Thread 1.
   In both cases, the entry for the application is not deleted unexpectedly from the KVStore.
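   The shape of the fix, on the same simplified store as the sketches above (method names and fields are illustrative, not the actual patch):

   ```scala
   import scala.collection.mutable

   object FixSketch {
     case class AppInfo(appId: String, logPath: String)

     // Lock object guarding the listing, the role `listing` plays
     // for addListing in FsHistoryProvider.
     private val listing = new Object
     val store = mutable.Map.empty[String, AppInfo]

     // Thread 1's update path: runs under the listing lock.
     def updateListing(app: AppInfo): Unit = listing.synchronized {
       store(app.appId) = app
     }

     // Thread 2's cleanup path: the whole load-check-delete sequence
     // now runs under the same lock, so it cannot interleave with
     // updateListing between the load and the delete.
     def cleanAppData(appId: String, staleLogPath: String): Unit =
       listing.synchronized {
         store.get(appId).foreach { app =>
           val isStale = app.logPath == staleLogPath
           if (isStale) store.remove(appId)
         }
       }
   }
   ```

   With both paths serialized on the same lock, a cleanup that runs after the update sees the new log path and leaves the entry alone.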
   
   
   ### Why are the changes needed?
   Fix the bug where the HS Web UI displays `Application not found` for applications whose logs do exist.
   
   
   ### Does this PR introduce _any_ user-facing change?
   No
   
   
   ### How was this patch tested?
   Deployed in our Spark HS: the `java.util.NoSuchElementException` exception and the `Application not found` error do not happen anymore.
   

