GitHub user victor-wong opened a pull request:

    https://github.com/apache/spark/pull/19824

    Revert "[SPARK-18905][STREAMING] Fix the issue of removing a failed jobset 
from JobScheduler.jobSets"

    ## What changes were proposed in this pull request?
    
    The code changes in PR https://github.com/apache/spark/pull/16542 are very 
confusing to me:
    
https://github.com/apache/spark/blob/5a02e3a2ac8a25d92d98d3b3b0d1173dddb9cc91/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala#L203
    
    ```scala
    private def handleJobCompletion(job: Job, completedTime: Long) {
      val jobSet = jobSets.get(job.time)
      jobSet.handleJobCompletion(job)
      job.setEndTime(completedTime)
      listenerBus.post(StreamingListenerOutputOperationCompleted(job.toOutputOperationInfo))
      logInfo("Finished job " + job.id + " from job set of time " + jobSet.time)
      if (jobSet.hasCompleted) {
        listenerBus.post(StreamingListenerBatchCompleted(jobSet.toBatchInfo))
      }
      job.result match {
        case Failure(e) =>
          reportError("Error running job " + job, e)
        case _ =>
          if (jobSet.hasCompleted) {
            jobSets.remove(jobSet.time)
            jobGenerator.onBatchCompletion(jobSet.time)
            logInfo("Total delay: %.3f s for time %s (execution: %.3f s)".format(
              jobSet.totalDelay / 1000.0, jobSet.time.toString,
              jobSet.processingDelay / 1000.0
            ))
          }
      }
    }
    ```
    
    If a Job fails and the JobSet containing it has completed, listenerBus 
posts a StreamingListenerBatchCompleted, but jobGenerator never invokes 
onBatchCompletion. So is the batch completed or not?
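    
    For example, a listener registered for this event cannot rely on it meaning 
success; it has to inspect each output operation's `failureReason` by hand. A 
minimal sketch (the class name `BatchOutcomeListener` is hypothetical, not from 
this PR; the StreamingListener API is from Spark Streaming):
    
    ```scala
    import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}
    
    // With the current code, onBatchCompleted can fire for a batch that
    // contains a failed job, even though jobGenerator.onBatchCompletion
    // was never invoked for that batch.
    class BatchOutcomeListener extends StreamingListener {
      override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
        // Collect the failure reasons of all output operations in the batch.
        val failures = batchCompleted.batchInfo.outputOperationInfos.values
          .flatMap(_.failureReason)
        if (failures.isEmpty) {
          println(s"Batch ${batchCompleted.batchInfo.batchTime} succeeded")
        } else {
          println(s"Batch ${batchCompleted.batchInfo.batchTime} was reported " +
            s"completed with ${failures.size} failed job(s)")
        }
      }
    }
    ```
    
    Such a listener would be registered with 
`ssc.addStreamingListener(new BatchOutcomeListener)` on a StreamingContext.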
    
    The key point is whether we should consider a Batch completed when one of 
its Jobs has failed.
    
    I think anyone who registers a listener for StreamingListenerBatchCompleted 
wants to be notified only when the batch finishes without any error. So if a 
Job fails, we should not remove it from its JobSet, and thus the JobSet never 
completes; see the sketch below.
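    
    A minimal sketch of that behavior (my illustration of the idea, not the 
exact diff in this PR): mark a job as completed in its JobSet only on success, 
so StreamingListenerBatchCompleted and `jobGenerator.onBatchCompletion` fire 
under the same all-jobs-succeeded condition:
    
    ```scala
    private def handleJobCompletion(job: Job, completedTime: Long) {
      val jobSet = jobSets.get(job.time)
      job.setEndTime(completedTime)
      listenerBus.post(StreamingListenerOutputOperationCompleted(job.toOutputOperationInfo))
      logInfo("Finished job " + job.id + " from job set of time " + jobSet.time)
      job.result match {
        case Failure(e) =>
          // The failed job is never marked completed in its JobSet, so
          // jobSet.hasCompleted stays false and neither completion signal fires.
          reportError("Error running job " + job, e)
        case _ =>
          jobSet.handleJobCompletion(job)
          if (jobSet.hasCompleted) {
            // Both completion signals now fire under the same condition.
            listenerBus.post(StreamingListenerBatchCompleted(jobSet.toBatchInfo))
            jobSets.remove(jobSet.time)
            jobGenerator.onBatchCompletion(jobSet.time)
          }
      }
    }
    ```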
    
    ## How was this patch tested?
    
    Existing tests.
    


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/victor-wong/spark revert-job-completion

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/19824.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #19824
    
----
commit aafe7b62f80ff1e86f6c528711d24e7de54536c5
Author: wangjiasheng <[email protected]>
Date:   2017-11-27T08:28:48Z

    Revert "[SPARK-18905][STREAMING] Fix the issue of removing a failed jobset 
from JobScheduler.jobSets"
    
    This reverts commit f8db8945f25cb884278ff8841bac5f6f28f0dec6.

commit e4e57cca9b0d21db8ad6292f8fcbde2dd316d7b7
Author: wangjiasheng <[email protected]>
Date:   2017-11-27T08:31:24Z

    [SPARK][STREAMING] Invoke onBatchCompletion() only when all jobs in the 
JobSet are Success

----


---
