GitHub user CodingCat commented on the pull request:

    https://github.com/apache/spark/pull/1313#issuecomment-50886564
  
    Hey @mateiz, I tried several implementations locally. It seems the failing
    test cases are caused simply by the unnecessary delay for no-pref tasks.
    
    In the latest submission I simply skip the delay for all NO_PREF tasks in
    resourceOffer. If I instead add a condition check such as
    `if (maxLocality != NO_PREF || hasNodeLocalOnlyTasks)`, we see:
    
    ```
    ======
    Set()
    NODE_LOCAL,NODE_LOCAL
    task 1, ArrayBuffer()
    task 0, ArrayBuffer(TaskLocation(localhost, None))
    find task in NODE_LOCAL
    ======
    Set()
    NODE_LOCAL,NODE_LOCAL
    task 1, ArrayBuffer()
    task 0, ArrayBuffer(TaskLocation(localhost, None))
    miss task
    ======
    Set()
    NO_PREF,NODE_LOCAL
    task 1, ArrayBuffer()
    task 0, ArrayBuffer(TaskLocation(localhost, None))
    miss task
    ======
    Set()
    ANY,NODE_LOCAL
    task 1, ArrayBuffer()
    task 0, ArrayBuffer(TaskLocation(localhost, None))
    miss task
    ======
    Set()
    NODE_LOCAL,NODE_LOCAL
    task 1, ArrayBuffer()
    task 0, ArrayBuffer(TaskLocation(localhost, None))
    miss task
    ======
    Set()
    NO_PREF,NODE_LOCAL
    task 1, ArrayBuffer()
    task 0, ArrayBuffer(TaskLocation(localhost, None))
    miss task
    ======
    Set()
    ANY,NODE_LOCAL
    task 1, ArrayBuffer()
    task 0, ArrayBuffer(TaskLocation(localhost, None))
    miss task
    ======
    Exception in thread "pool-100-thread-1" java.lang.Error: java.lang.InterruptedException
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1151)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:744)
    Caused by: java.lang.InterruptedException
        at java.lang.Object.wait(Native Method)
        at java.lang.Object.wait(Object.java:503)
        at org.apache.spark.scheduler.JobWaiter.awaitResult(JobWaiter.scala:73)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:430)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1062)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1081)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1095)
        at org.apache.spark.rdd.RDD.take(RDD.scala:1080)
        at org.apache.spark.rdd.RDD.first(RDD.scala:1092)
        at org.apache.spark.streaming.StreamingContextSuite$$anonfun$14$$anonfun$apply$mcV$sp$3$$anonfun$apply$mcVI$sp$2.apply(StreamingContextSuite.scala:170)
        at org.apache.spark.streaming.StreamingContextSuite$$anonfun$14$$anonfun$apply$mcV$sp$3$$anonfun$apply$mcVI$sp$2.apply(StreamingContextSuite.scala:169)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:527)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:527)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:41)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
        at scala.util.Try$.apply(Try.scala:161)
        at org.apache.spark.streaming.scheduler.Job.run(Job.scala:32)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:172)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        ... 2 more
    
    ```
    
    Then it fails on StreamingContextSuite (stop gracefully):
    
    ```
    [info] - stop gracefully *** FAILED ***
    [info]   org.scalatest.exceptions.TestFailedException was thrown. (StreamingContextSuite.scala:179)
    ```
    
    This is StreamingContextSuite.scala:179 with the surrounding code:
    
    ```
    input.count.foreachRDD(rdd => {
      val count = rdd.first()
      runningCount += count.toInt
      logInfo("Count = " + count + ", Running count = " + runningCount)
    })
    ssc.start()
    ssc.awaitTermination(500)
    ssc.stop(stopSparkContext = false, stopGracefully = true)
    logInfo("Running count = " + runningCount)
    logInfo("TestReceiver.counter = " + TestReceiver.counter.get())
    assert(runningCount > 0) // this is line 179
    ```
    So my impression is that we either need finer-grained tracking or should
    drop the wait for no-pref tasks entirely.
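
    To make the second option concrete, here is a rough, simplified sketch of
    the difference between gating no-pref tasks on the allowed locality level
    and never delaying them. It is not the actual TaskSetManager code: the
    `NoPrefDelaySketch` object, the `Task` class, `pickGated`, `pickUngated`
    and the `otherhost` offer are all hypothetical names made up for
    illustration, and the gate shown is only one guess at how such a check
    could behave.

    ```scala
    // Hypothetical, simplified model; not Spark's real scheduler code.
    object NoPrefDelaySketch {
      // Locality levels in the order they appear in the log above.
      object Locality extends Enumeration {
        val NODE_LOCAL, NO_PREF, ANY = Value
      }

      // A pending task and the hosts it prefers (empty means no preference).
      final case class Task(id: Int, preferredHosts: Seq[String])

      // Gated variant: a no-pref task launches only once the allowed
      // level has been relaxed to NO_PREF or beyond.
      def pickGated(tasks: Seq[Task], host: String,
                    allowed: Locality.Value): Option[Task] =
        tasks.find(_.preferredHosts.contains(host))
          .orElse(
            if (allowed >= Locality.NO_PREF) tasks.find(_.preferredHosts.isEmpty)
            else None)
          .orElse(if (allowed >= Locality.ANY) tasks.headOption else None)

      // Ungated variant: no-pref tasks never wait on the locality delay.
      def pickUngated(tasks: Seq[Task], host: String,
                      allowed: Locality.Value): Option[Task] =
        tasks.find(_.preferredHosts.contains(host))
          .orElse(tasks.find(_.preferredHosts.isEmpty))
          .orElse(if (allowed >= Locality.ANY) tasks.headOption else None)
    }
    ```

    With the two tasks from the log (task 0 prefers localhost, task 1 has no
    preference) and an assumed offer from `otherhost` while the allowed level
    is still NODE_LOCAL, `pickGated` returns nothing, which would correspond
    to a "miss task" line, while `pickUngated` returns task 1.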

