mridulm commented on PR #43194:
URL: https://github.com/apache/spark/pull/43194#issuecomment-1742205990

   So looking at it more, the race condition is due to the following:
   
   * We have two jobs for `takeAsync(999)`, but they are executed one after the other.
   * In most cases, `val thirdJobId = eventually(timeout(10.seconds)) { ... }` will detect the start of the first of those jobs and return its id.
   * Subsequently, the last `eventually` runs and immediately matches `sc.statusTracker.getJobIdsForTag("tag2").toSet should be (Set(secondJobId, thirdJobId))`, since the second `takeAsync` job has not started yet.
   
   In some corner cases, however, the second `takeAsync` job has already started before the last `eventually` runs, so three jobs end up tagged with `tag2` instead of two (see the sketch below).
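   
   Roughly, the racy pattern looks like this (a hedged sketch: `sc`, `rdd`, `secondJobId` and the exact `eventually` bodies come from the surrounding test and are assumed here, not quoted verbatim):
   
   ```
   // Hedged sketch of the racy pattern; sc, rdd, secondJobId and the "tag2"
   // job tag are assumed from the surrounding test, not reproduced here.
   import org.scalatest.concurrent.Eventually._
   import org.scalatest.matchers.should.Matchers._
   import org.scalatest.time.SpanSugar._
   
   // takeAsync(999) submits two jobs, one after the other.
   val thirdJobFuture = rdd.takeAsync(999)
   
   // Returns as soon as *any* job id shows up, i.e. after the first of the
   // two takeAsync jobs has started and possibly before the second exists.
   val thirdJobId = eventually(timeout(10.seconds)) {
     thirdJobFuture.jobIds.head
   }
   
   // If the second takeAsync job has already started by the time this runs,
   // "tag2" is attached to three jobs and the two-element Set comparison fails.
   eventually(timeout(10.seconds)) {
     sc.statusTracker.getJobIdsForTag("tag2").toSet should be (
       Set(secondJobId, thirdJobId))
   }
   ```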
   
   To fix this, IMO we should do the following:
   
   ```
   val thirdJobIds = eventually(timeout(10.seconds)) {
     // wait until takeAsync has submitted both of its jobs
     if (thirdJobFuture.jobIds.size != 2) {
       throw new IllegalStateException("Both jobs not yet started")
     }
     thirdJobFuture.jobIds
   }
   
   eventually(timeout(10.seconds)) {
     sc.statusTracker.getJobIdsForTag("tag1").toSet should be (
       Set(firstJobId, secondJobId))
     sc.statusTracker.getJobIdsForTag("tag2").toSet should be (
       Set(secondJobId) ++ thirdJobIds)
   }
   ```
   
   (Paraphrasing; something along those lines.)
   
   

