Github user CodingCat commented on the pull request:
https://github.com/apache/spark/pull/1313#issuecomment-50886564
hey @mateiz I tried different implementations locally; it seems the
failed test cases are caused by the unnecessary delay for NO_PREF tasks.
In the latest submission I simply avoid any delay for NO_PREF tasks in
resourceOffer. If I instead add a condition check like
`if (maxLocality != NO_PREF || hasNodeLocalOnlyTasks)`, we will see
```
======
Set()
NODE_LOCAL,NODE_LOCAL
task 1, ArrayBuffer()
task 0, ArrayBuffer(TaskLocation(localhost, None))
find task in NODE_LOCAL
======
Set()
NODE_LOCAL,NODE_LOCAL
task 1, ArrayBuffer()
task 0, ArrayBuffer(TaskLocation(localhost, None))
miss task
======
Set()
NO_PREF,NODE_LOCAL
task 1, ArrayBuffer()
task 0, ArrayBuffer(TaskLocation(localhost, None))
miss task
======
Set()
ANY,NODE_LOCAL
task 1, ArrayBuffer()
task 0, ArrayBuffer(TaskLocation(localhost, None))
miss task
======
Set()
NODE_LOCAL,NODE_LOCAL
task 1, ArrayBuffer()
task 0, ArrayBuffer(TaskLocation(localhost, None))
miss task
======
Set()
NO_PREF,NODE_LOCAL
task 1, ArrayBuffer()
task 0, ArrayBuffer(TaskLocation(localhost, None))
miss task
======
Set()
ANY,NODE_LOCAL
task 1, ArrayBuffer()
task 0, ArrayBuffer(TaskLocation(localhost, None))
miss task
======
Exception in thread "pool-100-thread-1" java.lang.Error: java.lang.InterruptedException
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1151)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:744)
Caused by: java.lang.InterruptedException
    at java.lang.Object.wait(Native Method)
    at java.lang.Object.wait(Object.java:503)
    at org.apache.spark.scheduler.JobWaiter.awaitResult(JobWaiter.scala:73)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:430)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1062)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1081)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1095)
    at org.apache.spark.rdd.RDD.take(RDD.scala:1080)
    at org.apache.spark.rdd.RDD.first(RDD.scala:1092)
    at org.apache.spark.streaming.StreamingContextSuite$$anonfun$14$$anonfun$apply$mcV$sp$3$$anonfun$apply$mcVI$sp$2.apply(StreamingContextSuite.scala:170)
    at org.apache.spark.streaming.StreamingContextSuite$$anonfun$14$$anonfun$apply$mcV$sp$3$$anonfun$apply$mcVI$sp$2.apply(StreamingContextSuite.scala:169)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:527)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:527)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:41)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
    at scala.util.Try$.apply(Try.scala:161)
    at org.apache.spark.streaming.scheduler.Job.run(Job.scala:32)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:172)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    ... 2 more
```
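To make the behavior in the log concrete, here is a toy model of just that guard. This is not the actual Spark TaskSetManager code; only the boolean condition is taken from my comment above, everything else is illustrative:

```scala
// Toy model of the condition check discussed above -- illustrative only,
// not the real scheduler. The guard decides whether the delay-scheduling
// wait is applied to an offer at the given max locality level.
object GuardSketch {
  sealed trait Locality
  case object NODE_LOCAL extends Locality
  case object NO_PREF extends Locality
  case object ANY extends Locality

  // With the extra `hasNodeLocalOnlyTasks` disjunct, the delay also
  // applies to NO_PREF-level offers whenever node-local-only tasks are
  // pending, so a pending NO_PREF task keeps being skipped ("miss task"
  // in the log above).
  def delayApplies(maxLocality: Locality, hasNodeLocalOnlyTasks: Boolean): Boolean =
    maxLocality != NO_PREF || hasNodeLocalOnlyTasks
}
```

Note in the log that once the node-local task is scheduled, every subsequent round (NODE_LOCAL, NO_PREF, ANY) misses the remaining task.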
then it fails on StreamingContextSuite (stop gracefully)
```
[info] - stop gracefully *** FAILED ***
[info]   org.scalatest.exceptions.TestFailedException was thrown. (StreamingContextSuite.scala:179)
```
here is StreamingContextSuite.scala:179 and the surrounding code:
```
input.count.foreachRDD(rdd => {
  val count = rdd.first()
  runningCount += count.toInt
  logInfo("Count = " + count + ", Running count = " + runningCount)
})
ssc.start()
ssc.awaitTermination(500)
ssc.stop(stopSparkContext = false, stopGracefully = true)
logInfo("Running count = " + runningCount)
logInfo("TestReceiver.counter = " + TestReceiver.counter.get())
assert(runningCount > 0) // this is line 179
```
so, my impression is that we need either fine-grained tracking of
NO_PREF tasks or to discard the delay for NO_PREF tasks entirely
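A minimal sketch of the "no delay for NO_PREF" option; the queue names and the `Task` shape here are assumptions for illustration, not the real TaskSetManager internals:

```scala
import scala.collection.mutable

// Sketch of dispatching NO_PREF tasks without any locality delay --
// illustrative only; the real scheduler keeps far more state than this.
object NoPrefSketch {
  case class Task(id: Int, preferredNodes: Set[String])

  val nodeLocalTasks = mutable.Queue[Task]()
  val noPrefTasks   = mutable.Queue[Task]()

  // Prefer a node-local task for this host, but fall back to a NO_PREF
  // task immediately instead of waiting out the locality delay.
  def resourceOffer(host: String): Option[Task] =
    nodeLocalTasks
      .dequeueFirst(_.preferredNodes.contains(host))
      .orElse(noPrefTasks.dequeueFirst(_ => true))
}
```

With this shape, an offer on any host drains pending NO_PREF tasks right away, which is what keeps the streaming test's single free slot from sitting idle.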