Github user JoshRosen commented on a diff in the pull request:
https://github.com/apache/spark/pull/7385#discussion_r34538350
--- Diff: core/src/main/scala/org/apache/spark/FutureAction.scala ---
@@ -116,57 +121,32 @@ class SimpleFutureAction[T] private[spark](jobWaiter:
JobWaiter[_], resultFunc:
}
override def ready(atMost: Duration)(implicit permit: CanAwait):
SimpleFutureAction.this.type = {
- if (!atMost.isFinite()) {
- awaitResult()
- } else jobWaiter.synchronized {
- val finishTime = System.currentTimeMillis() + atMost.toMillis
- while (!isCompleted) {
- val time = System.currentTimeMillis()
- if (time >= finishTime) {
- throw new TimeoutException
- } else {
- jobWaiter.wait(finishTime - time)
- }
- }
- }
+ // This call to the JobWaiter's future will throw an exception if the
job failed.
+ jobWaiter.toFuture.ready(atMost)(permit)
this
}
@throws(classOf[Exception])
override def result(atMost: Duration)(implicit permit: CanAwait): T = {
- ready(atMost)(permit)
- awaitResult() match {
- case scala.util.Success(res) => res
- case scala.util.Failure(e) => throw e
- }
+ // This call to the JobWaiter's future will throw an exception if the
job failed.
+ jobWaiter.toFuture.result(atMost)(permit)
+ // At this point, we know that the job succeeded so it's safe to
evaluate this function:
+ resultFunc
}
- override def onComplete[U](func: (Try[T]) => U)(implicit executor:
ExecutionContext) {
- executor.execute(new Runnable {
- override def run() {
- func(awaitResult())
- }
- })
+ override def onComplete[U](func: (Try[T]) => U)(implicit executor:
ExecutionContext): Unit = {
+ jobWaiter.toFuture.onComplete { (jobWaiterResult: Try[Unit]) =>
+ // If the job succeeded, then evaluate the result function;
otherwise, preserve the exception.
+ _value = jobWaiterResult.map(_ => resultFunc)
--- End diff --
@tdas, I think there's a bug here because we'll re-assign to `_value` if
there are multiple `onCompletes`. There's also a race in allowing `_value` to
be assigned here, since there's a lag between when the `jobWaiter` future
completes and when this callback runs. Fixing this now...
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]