We made a small change to the cluster scheduler so that it runs tasks in
random order instead of round-robin, and confirmed that all the tests pass.
Please review the attached patch; we would like to hear your thoughts on
our approach.
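
One caveat we are aware of: the patch draws a random index on every
iteration, so the same offer can be picked more than once while others are
skipped. Below is a minimal sketch of the difference between that and
shuffling the offer indices once. The Offer case class, the OfferOrder
object and both method names are hypothetical stand-ins for illustration,
not the real Spark classes.

```scala
import scala.util.Random

// Hypothetical stand-in for the scheduler's worker offers; not the real Spark class.
case class Offer(executorId: String, host: String)

object OfferOrder {
  // What the attached patch does: draw an index with replacement on every
  // iteration, so some offers may be visited twice and others not at all.
  def sampledWithReplacement(n: Int, rng: Random): Seq[Int] =
    Seq.fill(n)(rng.nextInt(n))

  // Possible alternative: shuffle the indices once, so every offer is
  // visited exactly once, in random order.
  def shuffledOnce(n: Int, rng: Random): Seq[Int] =
    rng.shuffle((0 until n).toList)

  def main(args: Array[String]): Unit = {
    val offers = Seq(
      Offer("exec-0", "host-a"),
      Offer("exec-1", "host-b"),
      Offer("exec-2", "host-c"))
    val rng = new Random()
    // Hosts in the order each strategy would visit them.
    println(sampledWithReplacement(offers.size, rng).map(i => offers(i).host))
    println(shuffledOnce(offers.size, rng).map(i => offers(i).host))
  }
}
```

The shuffled variant keeps the random spread across workers while still
giving every offer one chance per pass; whether that matters in practice is
something we would like feedback on.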
On Tue, Nov 26, 2013 at 9:53 PM, Praveen Rachabattuni <[email protected]> wrote:
> We measured response times while running a sample query 50 times
> concurrently and saw that the reduce operation (as per the Shark Stages
> dashboard) always runs on the same machine, even when that machine is
> already occupied. We think concurrent queries could perform better if the
> Spark worker were selected at random instead of in round-robin fashion.
>
> The resourceOffers code in ClusterScheduler.scala seems to be causing
> the round-robin execution of tasks. We would like to hear whether we are
> headed in the right direction.
>
> --
> Thanks,
> Praveen R
>
--
Thanks,
Praveen R
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala
index c1e65a3..f002873 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala
@@ -20,6 +20,7 @@ package org.apache.spark.scheduler.cluster
import java.nio.ByteBuffer
import java.util.concurrent.atomic.AtomicLong
import java.util.{TimerTask, Timer}
+import util.Random
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.HashMap
@@ -223,7 +224,10 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
for (taskSet <- sortedTaskSets; maxLocality <- TaskLocality.values) {
do {
launchedTask = false
- for (i <- 0 until offers.size) {
+ for (offer <- 0 until offers.size) {
+ // Pick a random offer from the full set on each iteration;
+ // the same offer may be picked more than once.
+ val i = Random.nextInt(offers.size)
val execId = offers(i).executorId
val host = offers(i).host
for (task <- taskSet.resourceOffer(execId, host, availableCpus(i), maxLocality)) {