We made a small change in the cluster scheduler to run tasks in random order
instead of round-robin, and all the tests pass. Please review the attached
patch; we would like to hear your thoughts on our approach.
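For reviewers skimming the thread, the difference between the two selection
strategies can be sketched as follows. This is a minimal illustration, not the
actual scheduler code: `Offer` and the method names are hypothetical stand-ins
for Spark's real WorkerOffer handling.

```scala
import scala.util.Random

// Hypothetical stand-in for Spark's WorkerOffer: just an executor id.
case class Offer(executorId: String)

object SelectionSketch {
  // Round-robin: visit offers in index order, so each offer is used exactly once.
  def roundRobinOrder(offers: Seq[Offer]): Seq[Offer] =
    offers.indices.map(i => offers(i))

  // Random: draw a fresh index on each step, so the same offer may be
  // picked twice while another is skipped entirely (as in the patch below).
  def randomOrder(offers: Seq[Offer], rng: Random): Seq[Offer] =
    offers.indices.map(_ => offers(rng.nextInt(offers.size)))
}
```

The random variant trades the strict fairness of round-robin for spreading
concurrent work away from a single machine, at the cost of occasionally
offering the same executor twice in one pass.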

On Tue, Nov 26, 2013 at 9:53 PM, Praveen Rachabattuni <[email protected]> wrote:

> We ran a sample query 50 times concurrently to check the response time, and
> we see the reduce operation (as per the Shark Stages dashboard) runs on the
> same machine even when it is already occupied. We think better performance
> for concurrent querying could be achieved by selecting a Spark worker at
> random instead of in round-robin fashion.
>
> The resourceOffers code in ClusterScheduler.scala seems to be what causes
> the round-robin execution of tasks. We would like to hear whether we are
> headed in the right direction.
>
> --
> Thanks,
> Praveen R



-- 
Thanks,
Praveen R
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala
index c1e65a3..f002873 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala
@@ -20,6 +20,7 @@ package org.apache.spark.scheduler.cluster
 import java.nio.ByteBuffer
 import java.util.concurrent.atomic.AtomicLong
 import java.util.{TimerTask, Timer}
+import scala.util.Random
 
 import scala.collection.mutable.ArrayBuffer
 import scala.collection.mutable.HashMap
@@ -223,7 +224,10 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
     for (taskSet <- sortedTaskSets; maxLocality <- TaskLocality.values) {
       do {
         launchedTask = false
-        for (i <- 0 until offers.size) {
+        for (_ <- 0 until offers.size) {
+          // Draw a random index from the full offers set on each step;
+          // this can cause the same offer to be used more than once
+          val i = Random.nextInt(offers.size)
           val execId = offers(i).executorId
           val host = offers(i).host
           for (task <- taskSet.resourceOffer(execId, host, availableCpus(i), maxLocality)) {
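As the comment in the patch notes, drawing a fresh random index on every
iteration can reuse one offer and skip another entirely. If that turns out to
matter, an alternative is to shuffle the index sequence once per pass: the
order is still random, but each offer is visited exactly once. A minimal
sketch of that idea, outside the actual scheduler code:

```scala
import scala.util.Random

object ShuffleSketch {
  // Shuffle the index sequence once up front: random visiting order,
  // but every offer index appears exactly once per pass.
  def shuffledIndices(size: Int, rng: Random): List[Int] =
    rng.shuffle((0 until size).toList)
}
```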
