Github user reggert commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9264#discussion_r44864838
  
    --- Diff: core/src/main/scala/org/apache/spark/FutureAction.scala ---
    @@ -177,80 +150,50 @@ class SimpleFutureAction[T] private[spark](jobWaiter: JobWaiter[_], resultFunc:
      * takeSample. Cancellation works by setting the cancelled flag to true and interrupting the
      * action thread if it is being blocked by a job.
      */
    +@DeveloperApi
     class ComplexFutureAction[T] extends FutureAction[T] {
     
    -  // Pointer to the thread that is executing the action. It is set when the action is run.
    -  @volatile private var thread: Thread = _
    +  @volatile private var _cancelled = false
     
    -  // A flag indicating whether the future has been cancelled. This is used in case the future
    -  // is cancelled before the action was even run (and thus we have no thread to interrupt).
    -  @volatile private var _cancelled: Boolean = false
    -
    -  @volatile private var jobs: Seq[Int] = Nil
    +  @volatile private var subActions: List[FutureAction[_]] = Nil
     
       // A promise used to signal the future.
    -  private val p = promise[T]()
    +  private val p = Promise[T]()
     
    -  override def cancel(): Unit = this.synchronized {
    +  override def cancel(): Unit = synchronized {
         _cancelled = true
    -    if (thread != null) {
    -      thread.interrupt()
    -    }
    +    p.tryFailure(new SparkException("Action has been cancelled"))
    +    subActions.foreach {_.cancel()}
       }
     
       /**
        * Executes some action enclosed in the closure. To properly enable cancellation, the closure
        * should use runJob implementation in this promise. See takeAsync for example.
        */
    -  def run(func: => T)(implicit executor: ExecutionContext): this.type = {
    -    scala.concurrent.future {
    -      thread = Thread.currentThread
    -      try {
    -        p.success(func)
    -      } catch {
    -        case e: Exception => p.failure(e)
    -      } finally {
    -        // This lock guarantees when calling `thread.interrupt()` in `cancel`,
    -        // thread won't be set to null.
    -        ComplexFutureAction.this.synchronized {
    -          thread = null
    -        }
    -      }
    -    }
    +  def run(func: => Future[T])(implicit executor: ExecutionContext): this.type = {
    +    p.tryCompleteWith(func)
         this
       }
     
       /**
    -   * Runs a Spark job. This is a wrapper around the same functionality provided by SparkContext
    +   * Submit a job for execution and return a FutureAction holding the result.
    +   * This is a wrapper around the same functionality provided by SparkContext
        * to enable cancellation.
        */
    -  def runJob[T, U, R](
    +  def submitJob[T, U, R](
           rdd: RDD[T],
           processPartition: Iterator[T] => U,
           partitions: Seq[Int],
           resultHandler: (Int, U) => Unit,
    -      resultFunc: => R) {
    +      resultFunc: => R): FutureAction[R] = synchronized {
         // If the action hasn't been cancelled yet, submit the job. The check and the submitJob
         // command need to be in an atomic block.
    -    val job = this.synchronized {
    -      if (!isCancelled) {
    -        rdd.context.submitJob(rdd, processPartition, partitions, resultHandler, resultFunc)
    -      } else {
    -        throw new SparkException("Action has been cancelled")
    -      }
    -    }
    -
    -    this.jobs = jobs ++ job.jobIds
    -
    -    // Wait for the job to complete. If the action is cancelled (with an interrupt),
    -    // cancel the job and stop the execution. This is not in a synchronized block because
    -    // Await.ready eventually waits on the monitor in FutureJob.jobWaiter.
    -    try {
    -      Await.ready(job, Duration.Inf)
    -    } catch {
    -      case e: InterruptedException =>
    -        job.cancel()
    -        throw new SparkException("Action has been cancelled")
    +    if (!isCancelled) {
    +      val job = rdd.context.submitJob(rdd, processPartition, partitions, resultHandler, resultFunc)
    +      subActions = job::subActions
    --- End diff --
    
    I haven't actually seen any Spark code that uses `::` (not that it
    doesn't exist; I just haven't looked at any files that use it).
    My understanding is that using `::` as the "cons" operator is a notation
    inherited from ML. All of the (S)ML examples I have ever seen write it
    without spaces, though Scala examples don't necessarily follow the same
    style convention. To me, it reads better without the spaces, since it
    usually looks like a list of elements chained together, but it doesn't
    really matter to me either way.
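
    For illustration only, here is a minimal sketch showing that the two
    spellings are equivalent to the compiler, so the choice is purely
    stylistic. The object name `ConsStyle` and the string values are
    hypothetical stand-ins, not taken from the Spark source; only
    `job::subActions` mirrors the line in the diff.

```scala
// Hypothetical example; plain strings stand in for FutureAction instances.
object ConsStyle {
  val subActions: List[String] = List("job2", "job1")
  val job: String = "job3"

  // `::` prepends an element to a List; it is right-associative,
  // so `a :: b :: Nil` parses as `a :: (b :: Nil)`.
  val spaced: List[String] = job :: subActions

  // Same expression without surrounding spaces, as written in the diff.
  val tight: List[String] = job::subActions

  // The chained form the comment alludes to.
  val chained: List[Int] = 1 :: 2 :: 3 :: Nil

  def main(args: Array[String]): Unit = {
    println(spaced == tight)
    println(chained)
  }
}
```

    Both forms build the same list with the new job at the head, so
    whichever spelling is chosen, the behaviour of `subActions` is unchanged.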

