mridulm commented on PR #43194:
URL: https://github.com/apache/spark/pull/43194#issuecomment-1742205990
So looking at it more, the race condition is due to the following:
* `takeAsync(999)` submits two jobs for the same action, but they are executed one after another.
* In most cases, the `val thirdJobId = eventually(timeout(10.seconds)) {` block will detect the start of the first of those jobs and return its id.
* Subsequently, the last `eventually` starts and immediately matches `sc.statusTracker.getJobIdsForTag("tag2").toSet should be (Set(secondJobId, thirdJobId))`, since the second job for `takeAsync` has not yet started.

In some corner cases, however, the second `takeAsync` job has already started before the last `eventually` runs, so we end up with 3 jobs tagged with `tag2` instead of 2 (see the sketch below).
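For context, the racy sequence looks roughly like this. This is a condensed sketch reconstructed from the snippets quoted above; the RDD, its partitioning, and the earlier `firstJobId`/`secondJobId` setup are placeholders, not the actual test code, and `sc`, `eventually`, and the matchers come from the surrounding test suite:

```
// Placeholder setup: earlier jobs are assumed to have produced firstJobId
// (tagged "tag1") and secondJobId (tagged "tag1" and "tag2").
sc.addJobTag("tag2")
// takeAsync(999) submits two jobs, one after the other.
val thirdJobFuture = sc.parallelize(1 to 1000, 2).takeAsync(999)

// Usually only the first of the two jobs has started by the time this
// succeeds, so thirdJobId is that first job's id.
val thirdJobId = eventually(timeout(10.seconds)) {
  thirdJobFuture.jobIds.head
}

eventually(timeout(10.seconds)) {
  // Racy: if the second takeAsync job has already started, "tag2" now has
  // three job ids and this assertion can never pass.
  sc.statusTracker.getJobIdsForTag("tag2").toSet should be (
    Set(secondJobId, thirdJobId))
}
```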
To fix this, IMO we should do the following:
```
val thirdJobIds = eventually(timeout(10.seconds)) {
  if (thirdJobFuture.jobIds.size != 2) {
    throw new IllegalStateException("Both jobs have not started yet")
  }
  thirdJobFuture.jobIds
}
eventually(timeout(10.seconds)) {
  sc.statusTracker.getJobIdsForTag("tag1").toSet should be (
    Set(firstJobId, secondJobId))
  sc.statusTracker.getJobIdsForTag("tag2").toSet should be (
    Set(secondJobId) ++ thirdJobIds)
}
```
(Paraphrasing - something along those lines.)
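The key point is that the first `eventually` now blocks until both `takeAsync` jobs are visible in `thirdJobFuture.jobIds`, so the final assertions can no longer observe the intermediate state where only one of the two jobs is tagged with `tag2`.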