[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...

2018-10-21 Thread zhzhan
Github user zhzhan closed the pull request at:

https://github.com/apache/spark/pull/15541


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...

2016-11-01 Thread zhzhan
Github user zhzhan commented on a diff in the pull request:

https://github.com/apache/spark/pull/15541#discussion_r85985739
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala ---
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.PriorityQueue
+import scala.util.Random
+
+import org.apache.spark.{SparkConf, SparkException}
+import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.util.Utils
+
+/** Tracks the current state of the workers with available cores and 
assigned task list. */
+private[scheduler] class OfferState(val workOffer: WorkerOffer) {
+  /** The current remaining cores that can be allocated to tasks. */
+  var coresAvailable: Int = workOffer.cores
+  /** The list of tasks that are assigned to this WorkerOffer. */
+  val tasks = new ArrayBuffer[TaskDescription](coresAvailable)
+
+  def assignTask(task: TaskDescription, cpu: Int): Unit = {
+if (coresAvailable < cpu) {
+  throw new SparkException(s"Available cores are less than cpu per 
task" +
+s" ($coresAvailable < $cpu)")
+}
+tasks += task
+coresAvailable -= cpu
+  }
+}
+
+/**
+ * TaskAssigner is the base class for all task assigner implementations, 
and can be
+ * extended to implement different task scheduling algorithms.
+ * Together with [[org.apache.spark.scheduler.TaskScheduler 
TaskScheduler]], TaskAssigner
+ * is used to assign tasks to workers with available cores. Internally, 
when TaskScheduler
+ * performs task assignment given available workers, it first sorts the 
candidate tasksets,
+ * and then for each taskset, it takes multiple rounds to request 
TaskAssigner for task
+ * assignment with different locality restrictions until there is either 
no qualified
+ * workers or no valid tasks to be assigned.
+ *
+ * TaskAssigner is responsible to maintain the worker availability state 
and task assignment
+ * information. The contract between 
[[org.apache.spark.scheduler.TaskScheduler TaskScheduler]]
+ * and TaskAssigner is as follows.
+ *
+ * First, TaskScheduler invokes construct() of TaskAssigner to initialize 
the its internal
+ * worker states at the beginning of resource offering.
+ *
+ * Second, before each round of task assignment for a taskset, 
TaskScheduler invokes the init()
+ * of TaskAssigner to initialize the data structure for the round.
+ *
+ * Third, when performing real task assignment, hasNext/next() is used by 
TaskScheduler
+ * to check the worker availability and retrieve current offering from 
TaskAssigner.
+ *
+ * Fourth, TaskScheduler calls offerAccepted() to notify the TaskAssigner 
so that
+ * TaskAssigner can decide whether the current offer is valid or not for 
the next request.
+ *
+ * Fifth, after task assignment is done, TaskScheduler invokes the 
function tasks to
+ * retrieve all the task assignment information.
+ */
+
+private[scheduler] sealed abstract class TaskAssigner {
+  protected var offer: Seq[OfferState] = _
+  protected var cpuPerTask = 1
+
+  protected def withCpuPerTask(cpuPerTask: Int): TaskAssigner = {
+this.cpuPerTask = cpuPerTask
+this
+  }
+
+  /** The currently assigned offers. */
+  final def tasks: Seq[ArrayBuffer[TaskDescription]] = offer.map(_.tasks)
+
+  /**
+   * Invoked at the beginning of resource offering to construct the offer 
with the workoffers.
+   * By default, offers is randomly shuffled to avoid always placing tasks 
on the same set of
+   * workers.
+   */
+  def construct(workOffer: Seq[WorkerOffer]): Unit = {
+offer = Random.shuffle(workOffer.map(o => new OfferState(o)))
+  }
+
+  /** Invoked at each round of Taskset assignment to initialize the 
internal structure. */
+  def init(): Unit
+
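
The five-step contract in the Scaladoc above can be sketched as a small driver loop. This is only an illustration under stated assumptions, not the PR's implementation: `Offer`, `SimpleAssigner`, and `schedule` are hypothetical names, tasks are plain strings, the Boolean parameter of `offerAccepted` is assumed, and the random shuffle done by `construct` in the diff is left out so the result is deterministic.

```scala
import scala.collection.mutable

object TaskAssignerContractSketch {
  /** Simplified stand-in for OfferState: a worker id, its free cores, and assigned task names. */
  final class Offer(val workerId: String, var coresAvailable: Int) {
    val tasks = mutable.ArrayBuffer.empty[String]
  }

  /** Hypothetical assigner that walks the offers in order, once per round. */
  final class SimpleAssigner {
    private var offers: Seq[Offer] = Seq.empty
    private var index = 0

    // Step 1: build the per-worker state at the start of resource offering.
    def construct(workers: Seq[(String, Int)]): Unit = {
      offers = workers.map { case (id, cores) => new Offer(id, cores) }
    }
    // Step 2: reset the cursor before each round.
    def init(): Unit = { index = 0 }
    // Step 3: iterate over the candidate offers.
    def hasNext: Boolean = index < offers.size
    def next(): Offer = { val o = offers(index); index += 1; o }
    // Step 4: feedback hook; this sketch ignores it (the parameter is an assumption).
    def offerAccepted(accepted: Boolean): Unit = ()
    // Step 5: expose the final assignment, one list per worker.
    def tasks: Seq[List[String]] = offers.map(_.tasks.toList)
  }

  /** Drives the assigner through the contract and returns the tasks placed on each worker. */
  def schedule(
      workers: Seq[(String, Int)],
      pending: Seq[String],
      cpusPerTask: Int): Seq[List[String]] = {
    require(cpusPerTask >= 1, "cpusPerTask must be positive")
    val queue = mutable.Queue.empty[String]
    queue ++= pending
    val assigner = new SimpleAssigner
    assigner.construct(workers)
    var launched = true
    // Keep starting new rounds until a round places nothing or no tasks remain.
    while (launched && queue.nonEmpty) {
      launched = false
      assigner.init()
      while (assigner.hasNext && queue.nonEmpty) {
        val offer = assigner.next()
        val accepted = offer.coresAvailable >= cpusPerTask
        if (accepted) {
          offer.tasks += queue.dequeue()
          offer.coresAvailable -= cpusPerTask
          launched = true
        }
        assigner.offerAccepted(accepted)
      }
    }
    assigner.tasks
  }
}
```

With workers `a` (1 core) and `b` (2 cores) and four pending tasks at one core each, the sketch places one task on `a`, two on `b`, and leaves the fourth unassigned because no cores remain.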

[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...

2016-11-01 Thread lins05
Github user lins05 commented on a diff in the pull request:

https://github.com/apache/spark/pull/15541#discussion_r85924864
  
--- Diff: docs/configuration.md ---
@@ -1350,6 +1350,20 @@ Apart from these, the following properties are also 
available, and may be useful
 Should be greater than or equal to 1. Number of allowed retries = this 
value - 1.
   
 
+
+  spark.scheduler.taskAssigner
+  roundrobin
+  
+The strategy of how to allocate tasks among workers with free cores. 
Three task
+assigners (roundrobin, packed, and balanced) are supported currently. 
By default, roundrobin
--- End diff --

Nit: I suggest double-quoting the keywords "roundrobin", "packed", and 
"balanced" in this paragraph. For example, `the "balanced" task assigner` reads 
better to me than `the balanced task assigner`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---




[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...

2016-11-01 Thread lins05
Github user lins05 commented on a diff in the pull request:

https://github.com/apache/spark/pull/15541#discussion_r85956857
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---
@@ -305,12 +307,8 @@ private[spark] class TaskSchedulerImpl(
 hostsByRack.getOrElseUpdate(rack, new HashSet[String]()) += o.host
   }
 }
+taskAssigner.construct(offers)
--- End diff --

The comment on the `resourceOffers` method should be updated. It still says 
`We fill each node with tasks in a round-robin manner so that tasks are 
balanced across the cluster.`





[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...

2016-11-01 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/15541#discussion_r85882792
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala ---
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.PriorityQueue
+import scala.util.Random
+
+import org.apache.spark.{SparkConf, SparkException}
+import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.util.Utils
+
+/** Tracks the current state of the workers with available cores and 
assigned task list. */
+private[scheduler] class OfferState(val workOffer: WorkerOffer) {
+  /** The current remaining cores that can be allocated to tasks. */
+  var coresAvailable: Int = workOffer.cores
+  /** The list of tasks that are assigned to this WorkerOffer. */
+  val tasks = new ArrayBuffer[TaskDescription](coresAvailable)
+
+  def assignTask(task: TaskDescription, cpu: Int): Unit = {
+if (coresAvailable < cpu) {
+  throw new SparkException(s"Available cores are less than cpu per 
task" +
+s" ($coresAvailable < $cpu)")
+}
+tasks += task
+coresAvailable -= cpu
+  }
+}
+
+/**
+ * TaskAssigner is the base class for all task assigner implementations, 
and can be
+ * extended to implement different task scheduling algorithms.
+ * Together with [[org.apache.spark.scheduler.TaskScheduler 
TaskScheduler]], TaskAssigner
+ * is used to assign tasks to workers with available cores. Internally, 
when TaskScheduler
+ * performs task assignment given available workers, it first sorts the 
candidate tasksets,
+ * and then for each taskset, it takes multiple rounds to request 
TaskAssigner for task
+ * assignment with different locality restrictions until there is either 
no qualified
+ * workers or no valid tasks to be assigned.
+ *
+ * TaskAssigner is responsible to maintain the worker availability state 
and task assignment
+ * information. The contract between 
[[org.apache.spark.scheduler.TaskScheduler TaskScheduler]]
+ * and TaskAssigner is as follows.
+ *
+ * First, TaskScheduler invokes construct() of TaskAssigner to initialize 
the its internal
+ * worker states at the beginning of resource offering.
+ *
+ * Second, before each round of task assignment for a taskset, 
TaskScheduler invokes the init()
+ * of TaskAssigner to initialize the data structure for the round.
+ *
+ * Third, when performing real task assignment, hasNext/next() is used by 
TaskScheduler
+ * to check the worker availability and retrieve current offering from 
TaskAssigner.
+ *
+ * Fourth, TaskScheduler calls offerAccepted() to notify the TaskAssigner 
so that
+ * TaskAssigner can decide whether the current offer is valid or not for 
the next request.
+ *
+ * Fifth, after task assignment is done, TaskScheduler invokes the 
function tasks to
+ * retrieve all the task assignment information.
+ */
+
+private[scheduler] sealed abstract class TaskAssigner {
+  protected var offer: Seq[OfferState] = _
+  protected var cpuPerTask = 1
+
+  protected def withCpuPerTask(cpuPerTask: Int): TaskAssigner = {
+this.cpuPerTask = cpuPerTask
+this
+  }
+
+  /** The currently assigned offers. */
+  final def tasks: Seq[ArrayBuffer[TaskDescription]] = offer.map(_.tasks)
+
+  /**
+   * Invoked at the beginning of resource offering to construct the offer 
with the workoffers.
+   * By default, offers is randomly shuffled to avoid always placing tasks 
on the same set of
+   * workers.
+   */
+  def construct(workOffer: Seq[WorkerOffer]): Unit = {
+offer = Random.shuffle(workOffer.map(o => new OfferState(o)))
+  }
+
+  /** Invoked at each round of Taskset assignment to initialize the 
internal structure. */
+  def init(): Unit
+
 

[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...

2016-10-23 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/15541#discussion_r84621271
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---
@@ -250,24 +251,24 @@ private[spark] class TaskSchedulerImpl(
   private def resourceOfferSingleTaskSet(
   taskSet: TaskSetManager,
   maxLocality: TaskLocality,
-  shuffledOffers: Seq[WorkerOffer],
-  availableCpus: Array[Int],
-  tasks: IndexedSeq[ArrayBuffer[TaskDescription]]) : Boolean = {
+  taskAssigner: TaskAssigner) : Boolean = {
 var launchedTask = false
-for (i <- 0 until shuffledOffers.size) {
-  val execId = shuffledOffers(i).executorId
-  val host = shuffledOffers(i).host
-  if (availableCpus(i) >= CPUS_PER_TASK) {
+taskAssigner.init()
+while (taskAssigner.hasNext) {
+  var isAccepted = false
+  val currentOffer = taskAssigner.next()
+  val execId = currentOffer.workOffer.executorId
+  val host = currentOffer.workOffer.host
+  if (currentOffer.coresAvailable >= CPUS_PER_TASK) {
 try {
   for (task <- taskSet.resourceOffer(execId, host, maxLocality)) {
-tasks(i) += task
+currentOffer.assignTask(task, CPUS_PER_TASK)
 val tid = task.taskId
 taskIdToTaskSetManager(tid) = taskSet
 taskIdToExecutorId(tid) = execId
 executorIdToTaskCount(execId) += 1
-availableCpus(i) -= CPUS_PER_TASK
-assert(availableCpus(i) >= 0)
--- End diff --

@zhzhan Ah, thanks for the explanation! I didn't notice that it returns an Option. :)





[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...

2016-10-23 Thread zhzhan
Github user zhzhan commented on a diff in the pull request:

https://github.com/apache/spark/pull/15541#discussion_r84621076
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---
@@ -250,24 +251,24 @@ private[spark] class TaskSchedulerImpl(
   private def resourceOfferSingleTaskSet(
   taskSet: TaskSetManager,
   maxLocality: TaskLocality,
-  shuffledOffers: Seq[WorkerOffer],
-  availableCpus: Array[Int],
-  tasks: IndexedSeq[ArrayBuffer[TaskDescription]]) : Boolean = {
+  taskAssigner: TaskAssigner) : Boolean = {
 var launchedTask = false
-for (i <- 0 until shuffledOffers.size) {
-  val execId = shuffledOffers(i).executorId
-  val host = shuffledOffers(i).host
-  if (availableCpus(i) >= CPUS_PER_TASK) {
+taskAssigner.init()
+while (taskAssigner.hasNext) {
+  var isAccepted = false
+  val currentOffer = taskAssigner.next()
+  val execId = currentOffer.workOffer.executorId
+  val host = currentOffer.workOffer.host
+  if (currentOffer.coresAvailable >= CPUS_PER_TASK) {
 try {
   for (task <- taskSet.resourceOffer(execId, host, maxLocality)) {
-tasks(i) += task
+currentOffer.assignTask(task, CPUS_PER_TASK)
 val tid = task.taskId
 taskIdToTaskSetManager(tid) = taskSet
 taskIdToExecutorId(tid) = execId
 executorIdToTaskCount(execId) += 1
-availableCpus(i) -= CPUS_PER_TASK
-assert(availableCpus(i) >= 0)
--- End diff --

@viirya The assert will not fail even in the legacy code, because 
`taskSet.resourceOffer(execId, host, maxLocality)` returns an Option, so the 
for loop runs at most once after the `availableCpus(i) >= CPUS_PER_TASK` check.
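
To illustrate the point about `Option`, here is a minimal sketch in plain Scala. `OptionLoopSketch` and `iterations` are hypothetical names used only for this example; nothing here calls Spark.

```scala
object OptionLoopSketch {
  /** Counts how many times the body of a `for` over an Option executes. */
  def iterations[A](maybe: Option[A]): Int = {
    var count = 0
    // An Option holds zero or one element, so this body runs at most once.
    for (_ <- maybe) {
      count += 1
    }
    count
  }
}
```

`OptionLoopSketch.iterations(Some("task"))` evaluates to 1 and `OptionLoopSketch.iterations(None)` to 0, so a single guarded subtraction inside such a loop cannot be repeated for the same offer.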





[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...

2016-10-23 Thread zhzhan
Github user zhzhan commented on a diff in the pull request:

https://github.com/apache/spark/pull/15541#discussion_r84619879
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---
@@ -250,24 +251,24 @@ private[spark] class TaskSchedulerImpl(
   private def resourceOfferSingleTaskSet(
   taskSet: TaskSetManager,
   maxLocality: TaskLocality,
-  shuffledOffers: Seq[WorkerOffer],
-  availableCpus: Array[Int],
-  tasks: IndexedSeq[ArrayBuffer[TaskDescription]]) : Boolean = {
+  taskAssigner: TaskAssigner) : Boolean = {
 var launchedTask = false
-for (i <- 0 until shuffledOffers.size) {
-  val execId = shuffledOffers(i).executorId
-  val host = shuffledOffers(i).host
-  if (availableCpus(i) >= CPUS_PER_TASK) {
+taskAssigner.init()
+while (taskAssigner.hasNext) {
+  var isAccepted = false
+  val currentOffer = taskAssigner.next()
+  val execId = currentOffer.workOffer.executorId
+  val host = currentOffer.workOffer.host
+  if (currentOffer.coresAvailable >= CPUS_PER_TASK) {
 try {
   for (task <- taskSet.resourceOffer(execId, host, maxLocality)) {
-tasks(i) += task
+currentOffer.assignTask(task, CPUS_PER_TASK)
 val tid = task.taskId
 taskIdToTaskSetManager(tid) = taskSet
 taskIdToExecutorId(tid) = execId
 executorIdToTaskCount(execId) += 1
-availableCpus(i) -= CPUS_PER_TASK
-assert(availableCpus(i) >= 0)
--- End diff --

Thanks @viirya for the comments. Actually, I was thinking of removing the check, 
although it is part of the legacy code. The check has now moved into OfferState, 
which makes more sense. IMHO, the assertion should typically never fail, but 
from OfferState's perspective, it should guarantee this restriction.
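
One way to read "OfferState should guarantee such restriction" is that the check runs before any state changes, so a rejected assignment leaves the offer untouched. The sketch below assumes a simplified `CheckedOffer` class (a hypothetical name) that tracks only a core count and a task count, and it throws `IllegalStateException` as a stand-in for the exception type used in the PR. An explicit check also stays active when assertions are disabled at compile time, which a bare `assert` would not.

```scala
object OfferStateSketch {
  /** Simplified stand-in for OfferState: tracks free cores and the number of assigned tasks. */
  final class CheckedOffer(initialCores: Int) {
    private var cores = initialCores
    private var assigned = 0

    def coresAvailable: Int = cores
    def taskCount: Int = assigned

    /** Validates first, then mutates, so a failed call does not change the state. */
    def assignTask(cpu: Int): Unit = {
      if (cores < cpu) {
        throw new IllegalStateException(
          s"Available cores are less than cpu per task ($cores < $cpu)")
      }
      assigned += 1
      cores -= cpu
    }
  }
}
```

By contrast, the earlier version in the diff appended the task and subtracted the cores before asserting, so a failing assertion would have fired only after the state had already been modified.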





[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...

2016-10-23 Thread zhzhan
Github user zhzhan commented on a diff in the pull request:

https://github.com/apache/spark/pull/15541#discussion_r84619023
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala ---
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.PriorityQueue
+import scala.util.Random
+
+import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.SparkConf
+import org.apache.spark.util.Utils
+
+/** Tracks the current state of the workers with available cores and 
assigned task list. */
+private[scheduler] class OfferState(val workOffer: WorkerOffer) {
+  /** The current remaining cores that can be allocated to tasks. */
+  var coresAvailable: Int = workOffer.cores
+  /** The list of tasks that are assigned to this WorkerOffer. */
+  val tasks = new ArrayBuffer[TaskDescription](coresAvailable)
+
+  def assignTask(task: TaskDescription, cpu: Int): Unit = {
+tasks += task
+coresAvailable -= cpu
+assert(coresAvailable >= 0)
+  }
+}
+
+/**
+ * TaskAssigner is the base class for all task assigner implementations, 
and can be
+ * extended to implement different task scheduling algorithms.
+ * Together with [[org.apache.spark.scheduler.TaskScheduler 
TaskScheduler]], TaskAssigner
+ * is used to assign tasks to workers with available cores. Internally, 
when TaskScheduler
+ * performs task assignment given available workers, it first sorts the 
candidate tasksets,
+ * and then for each taskset, it takes multiple rounds to request 
TaskAssigner for task
+ * assignment with different locality restrictions until there is either 
no qualified
+ * workers or no valid tasks to be assigned.
+ *
+ * TaskAssigner is responsible to maintain the worker availability state 
and task assignment
+ * information. The contract between 
[[org.apache.spark.scheduler.TaskScheduler TaskScheduler]]
+ * and TaskAssigner is as follows.
+ *
+ * First, TaskScheduler invokes construct() of TaskAssigner to initialize 
the its internal
+ * worker states at the beginning of resource offering.
+ *
+ * Second, before each round of task assignment for a taskset, 
TaskScheduler invokes the init()
+ * of TaskAssigner to initialize the data structure for the round.
+ *
+ * Third, when performing real task assignment, hasNext/next() is used by 
TaskScheduler
+ * to check the worker availability and retrieve current offering from 
TaskAssigner.
+ *
+ * Fourth, TaskScheduler calls offerAccepted() to notify the TaskAssigner 
so that
+ * TaskAssigner can decide whether the current offer is valid or not for 
the next request.
+ *
+ * Fifth, after task assignment is done, TaskScheduler invokes the 
function tasks to
+ * retrieve all the task assignment information.
+ */
+
+private[scheduler] sealed abstract class TaskAssigner {
+  protected var offer: Seq[OfferState] = _
+  protected var cpuPerTask = 1
+
+  protected def withCpuPerTask(cpuPerTask: Int): TaskAssigner = {
+this.cpuPerTask = cpuPerTask
--- End diff --

You mean cpuPerTask >= 1? I don't think we need this check.
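
For reference, the check under discussion could look like the sketch below. It is not part of the PR: `Assigner` is a hypothetical stand-in for the builder-style setter in the diff, and `require` is just one possible way to express the precondition.

```scala
object CpuPerTaskSketch {
  /** Hypothetical holder mirroring the builder-style setter shown in the diff. */
  final class Assigner {
    private var cpuPerTask = 1

    def cpus: Int = cpuPerTask

    /** Rejects non-positive values before storing them. */
    def withCpuPerTask(cpuPerTask: Int): Assigner = {
      require(cpuPerTask >= 1, s"cpuPerTask must be >= 1, got $cpuPerTask")
      this.cpuPerTask = cpuPerTask
      this
    }
  }
}
```

Whether the guard is worth having depends on whether the value is already validated upstream, which is the question the thread leaves open.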





[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...

2016-10-23 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/15541#discussion_r84618189
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala ---
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.PriorityQueue
+import scala.util.Random
+
+import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.SparkConf
+import org.apache.spark.util.Utils
+
+/** Tracks the current state of the workers with available cores and 
assigned task list. */
+private[scheduler] class OfferState(val workOffer: WorkerOffer) {
+  /** The current remaining cores that can be allocated to tasks. */
+  var coresAvailable: Int = workOffer.cores
+  /** The list of tasks that are assigned to this WorkerOffer. */
+  val tasks = new ArrayBuffer[TaskDescription](coresAvailable)
+
+  def assignTask(task: TaskDescription, cpu: Int): Unit = {
+tasks += task
+coresAvailable -= cpu
+assert(coresAvailable >= 0)
+  }
+}
+
+/**
+ * TaskAssigner is the base class for all task assigner implementations, 
and can be
+ * extended to implement different task scheduling algorithms.
+ * Together with [[org.apache.spark.scheduler.TaskScheduler 
TaskScheduler]], TaskAssigner
+ * is used to assign tasks to workers with available cores. Internally, 
when TaskScheduler
+ * performs task assignment given available workers, it first sorts the 
candidate tasksets,
+ * and then for each taskset, it takes multiple rounds to request 
TaskAssigner for task
+ * assignment with different locality restrictions until there is either 
no qualified
+ * workers or no valid tasks to be assigned.
+ *
+ * TaskAssigner is responsible to maintain the worker availability state 
and task assignment
+ * information. The contract between 
[[org.apache.spark.scheduler.TaskScheduler TaskScheduler]]
+ * and TaskAssigner is as follows.
+ *
+ * First, TaskScheduler invokes construct() of TaskAssigner to initialize 
the its internal
+ * worker states at the beginning of resource offering.
+ *
+ * Second, before each round of task assignment for a taskset, 
TaskScheduler invokes the init()
+ * of TaskAssigner to initialize the data structure for the round.
+ *
+ * Third, when performing real task assignment, hasNext/next() is used by 
TaskScheduler
+ * to check the worker availability and retrieve current offering from 
TaskAssigner.
+ *
+ * Fourth, TaskScheduler calls offerAccepted() to notify the TaskAssigner 
so that
+ * TaskAssigner can decide whether the current offer is valid or not for 
the next request.
+ *
+ * Fifth, after task assignment is done, TaskScheduler invokes the 
function tasks to
+ * retrieve all the task assignment information.
+ */
+
+private[scheduler] sealed abstract class TaskAssigner {
+  protected var offer: Seq[OfferState] = _
+  protected var cpuPerTask = 1
+
+  protected def withCpuPerTask(cpuPerTask: Int): TaskAssigner = {
+this.cpuPerTask = cpuPerTask
--- End diff --

Do we need to add a sanity check for `cpuPerTask`?





[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...

2016-10-23 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/15541#discussion_r84616602
  
--- Diff: docs/configuration.md ---
@@ -1350,6 +1350,20 @@ Apart from these, the following properties are also 
available, and may be useful
 Should be greater than or equal to 1. Number of allowed retries = this 
value - 1.
   
 
+
+  spark.scheduler.taskAssigner
+  roundrobin
+  
+The strategy of how to allocate tasks among workers with free cores. 
Three task
+assigners (roundrobin, packed, and balanced) are supported currently. 
By default, roundrobin
+with randomness is used to allocate task to workers with available 
cores in a
--- End diff --

`allocate task` -> `allocate tasks`





[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...

2016-10-23 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/15541#discussion_r84616347
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala ---
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.PriorityQueue
+import scala.util.Random
+
+import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.SparkConf
+import org.apache.spark.util.Utils
+
+/** Tracks the current state of the workers with available cores and 
assigned task list. */
+private[scheduler] class OfferState(val workOffer: WorkerOffer) {
+  /** The current remaining cores that can be allocated to tasks. */
+  var coresAvailable: Int = workOffer.cores
+  /** The list of tasks that are assigned to this WorkerOffer. */
+  val tasks = new ArrayBuffer[TaskDescription](coresAvailable)
+
+  def assignTask(task: TaskDescription, cpu: Int): Unit = {
+tasks += task
+coresAvailable -= cpu
+assert(coresAvailable >= 0)
+  }
+}
+
+/**
+ * TaskAssigner is the base class for all task assigner implementations, and can be
+ * extended to implement different task scheduling algorithms.
+ * Together with [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]], TaskAssigner
+ * is used to assign tasks to workers with available cores. Internally, when TaskScheduler
+ * performs task assignment given available workers, it first sorts the candidate tasksets,
+ * and then for each taskset, it takes multiple rounds to request TaskAssigner for task
+ * assignment with different locality restrictions until there is either no qualified
+ * workers or no valid tasks to be assigned.
+ *
+ * TaskAssigner is responsible to maintain the worker availability state and task assignment
+ * information. The contract between [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]]
+ * and TaskAssigner is as follows.
+ *
+ * First, TaskScheduler invokes construct() of TaskAssigner to initialize the its internal
+ * worker states at the beginning of resource offering.
+ *
+ * Second, before each round of task assignment for a taskset, TaskScheduler invokes the init()
+ * of TaskAssigner to initialize the data structure for the round.
+ *
+ * Third, when performing real task assignment, hasNext/next() is used by TaskScheduler
+ * to check the worker availability and retrieve current offering from TaskAssigner.
+ *
+ * Fourth, TaskScheduler calls offerAccepted() to notify the TaskAssigner so that
+ * TaskAssigner can decide whether the current offer is valid or not for the next request.
+ *
+ * Fifth, after task assignment is done, TaskScheduler invokes the function tasks to
+ * retrieve all the task assignment information.
+ */
+
+private[scheduler] sealed abstract class TaskAssigner {
+  protected var offer: Seq[OfferState] = _
+  protected var cpuPerTask = 1
+
+  protected def withCpuPerTask(cpuPerTask: Int): TaskAssigner = {
+    this.cpuPerTask = cpuPerTask
+    this
+  }
+
+  /** The currently assigned offers. */
+  final def tasks: Seq[ArrayBuffer[TaskDescription]] = offer.map(_.tasks)
+
+  /**
+   * Invoked at the beginning of resource offering to construct the offer with the workoffers.
+   * By default, offers is randomly shuffled to avoid always placing tasks on the same set of
+   * workers.
+   */
+  def construct(workOffer: Seq[WorkerOffer]): Unit = {
+    offer = Random.shuffle(workOffer.map(o => new OfferState(o)))
+  }
+
+  /** Invoked at each round of Taskset assignment to initialize the internal structure. */
+  def init(): Unit
+
+  /**
+   * Tests whether there is offer available to be used inside of one round of Taskset assignment.
+   * @return  `true` if a subseq

[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...

2016-10-22 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/15541#discussion_r84591196
  
--- Diff: docs/configuration.md ---
@@ -1342,6 +1342,20 @@ Apart from these, the following properties are also 
available, and may be useful
 Should be greater than or equal to 1. Number of allowed retries = this 
value - 1.
   
 
+
+  spark.scheduler.taskAssigner
+  roundrobin
+  
+The strategy of how to allocate tasks among workers with free cores. 
There are three task
+assigners (roundrobin, packed, and balanced) are supported currently. 
By default, roundrobin
+with randomness is used, which tries to allocate task to workers with 
available cores in
+roundrobin manner. The packed task assigner tries to allocate tasks to 
workers with the least
+free cores, resulting in tasks assigned to few workers, which may help 
driver to release the
+reserved idle workers when dynamic 
allocation(spark.dynamicAllocation.enabled) is enabled.
+The balanced task assigner tries to assign tasks across workers in a 
balance way (allocating
--- End diff --

`balance ` -> `balanced `


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...

2016-10-22 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/15541#discussion_r84591185
  
--- Diff: docs/configuration.md ---
@@ -1342,6 +1342,20 @@ Apart from these, the following properties are also 
available, and may be useful
 Should be greater than or equal to 1. Number of allowed retries = this 
value - 1.
   
 
+
+  spark.scheduler.taskAssigner
+  roundrobin
+  
+The strategy of how to allocate tasks among workers with free cores. 
There are three task
+assigners (roundrobin, packed, and balanced) are supported currently. 
By default, roundrobin
+with randomness is used, which tries to allocate task to workers with 
available cores in
+roundrobin manner. The packed task assigner tries to allocate tasks to 
workers with the least
+free cores, resulting in tasks assigned to few workers, which may help 
driver to release the
+reserved idle workers when dynamic 
allocation(spark.dynamicAllocation.enabled) is enabled.
+The balanced task assigner tries to assign tasks across workers in a 
balance way (allocating
+tasks to workers with most free cores).
--- End diff --

`most` -> `the most`
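
For readers comparing the strategies described in the quoted documentation, here is a minimal sketch of how the packed and balanced orderings could differ. It is not the code from this pull request: `Worker`, `AssignerOrderSketch`, `packedOrder`, and `balancedOrder` are hypothetical names, and the sketch only assumes what the description states (packed prefers the workers with the least free cores, balanced prefers the workers with the most).

```scala
// Hypothetical stand-in for a worker offer; not a Spark class.
final case class Worker(id: String, freeCores: Int)

object AssignerOrderSketch {
  // Packed: try workers with the fewest free cores first, so tasks
  // concentrate on fewer workers and the rest can stay idle.
  def packedOrder(workers: Seq[Worker], cpusPerTask: Int): Seq[Worker] =
    workers.filter(_.freeCores >= cpusPerTask).sortBy(_.freeCores)

  // Balanced: try workers with the most free cores first, so load
  // spreads across workers.
  def balancedOrder(workers: Seq[Worker], cpusPerTask: Int): Seq[Worker] =
    workers.filter(_.freeCores >= cpusPerTask).sortBy(w => -w.freeCores)
}
```

Both helpers drop workers that cannot fit a single task, which mirrors the `coresAvailable >= CPUS_PER_TASK` check in the scheduler diff quoted elsewhere in this thread.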





[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...

2016-10-22 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/15541#discussion_r84591178
  
--- Diff: docs/configuration.md ---
@@ -1342,6 +1342,20 @@ Apart from these, the following properties are also 
available, and may be useful
 Should be greater than or equal to 1. Number of allowed retries = this 
value - 1.
   
 
+
+  spark.scheduler.taskAssigner
+  roundrobin
+  
+The strategy of how to allocate tasks among workers with free cores. 
There are three task
+assigners (roundrobin, packed, and balanced) are supported currently. 
By default, roundrobin
+with randomness is used, which tries to allocate task to workers with 
available cores in
+roundrobin manner. The packed task assigner tries to allocate tasks to 
workers with the least
+free cores, resulting in tasks assigned to few workers, which may help 
driver to release the
--- End diff --

`few` -> `fewer`






[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...

2016-10-22 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/15541#discussion_r84591169
  
--- Diff: docs/configuration.md ---
@@ -1342,6 +1342,20 @@ Apart from these, the following properties are also 
available, and may be useful
 Should be greater than or equal to 1. Number of allowed retries = this 
value - 1.
   
 
+
+  spark.scheduler.taskAssigner
+  roundrobin
+  
+The strategy of how to allocate tasks among workers with free cores. 
There are three task
+assigners (roundrobin, packed, and balanced) are supported currently. 
By default, roundrobin
+with randomness is used, which tries to allocate task to workers with 
available cores in
--- End diff --

`is used, which tries to allocate task` -> `is used to allocate tasks`





[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...

2016-10-22 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/15541#discussion_r84591160
  
--- Diff: docs/configuration.md ---
@@ -1342,6 +1342,20 @@ Apart from these, the following properties are also 
available, and may be useful
 Should be greater than or equal to 1. Number of allowed retries = this 
value - 1.
   
 
+
+  spark.scheduler.taskAssigner
+  roundrobin
+  
+The strategy of how to allocate tasks among workers with free cores. 
There are three task
+assigners (roundrobin, packed, and balanced) are supported currently. 
By default, roundrobin
+with randomness is used, which tries to allocate task to workers with 
available cores in
+roundrobin manner. The packed task assigner tries to allocate tasks to 
workers with the least
--- End diff --

`in roundrobin manner` ->  `in a round-robin manner `





[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...

2016-10-22 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/15541#discussion_r84591137
  
--- Diff: docs/configuration.md ---
@@ -1342,6 +1342,20 @@ Apart from these, the following properties are also 
available, and may be useful
 Should be greater than or equal to 1. Number of allowed retries = this 
value - 1.
   
 
+
+  spark.scheduler.taskAssigner
+  roundrobin
+  
+The strategy of how to allocate tasks among workers with free cores. 
There are three task
--- End diff --

`There are three task` -> `Three task`





[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...

2016-10-22 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/15541#discussion_r84591093
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---
@@ -305,12 +309,8 @@ private[spark] class TaskSchedulerImpl(
 hostsByRack.getOrElseUpdate(rack, new HashSet[String]()) += o.host
   }
 }
+taskAssigner.construct(offers)
 
-// Randomly shuffle offers to avoid always placing tasks on the same 
set of workers.
--- End diff --

Could we add this comment to [the code in 
`construct`](https://github.com/zhzhan/spark/blob/dd2b2077430bbb07047e928d20c1ad8fe940827a/core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala#L79)?
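
A minimal sketch of what the relocated comment would document, assuming simplified types: `ShuffleSketch` and `Offer` are hypothetical stand-ins, and only `scala.util.Random.shuffle` is a real library call.

```scala
import scala.util.Random

object ShuffleSketch {
  // Hypothetical stand-in for a worker offer; not a Spark class.
  final case class Offer(executorId: String, cores: Int)

  // Randomly shuffle offers to avoid always placing tasks on the same set of workers.
  def construct(offers: Seq[Offer], rng: Random): Seq[Offer] = rng.shuffle(offers)
}
```

Passing the `Random` instance in, rather than using the global one, is a choice made here only so that the behaviour can be tested with a fixed seed.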





[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...

2016-10-22 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/15541#discussion_r84591073
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---
@@ -250,24 +251,26 @@ private[spark] class TaskSchedulerImpl(
   private def resourceOfferSingleTaskSet(
   taskSet: TaskSetManager,
   maxLocality: TaskLocality,
-  shuffledOffers: Seq[WorkerOffer],
-  availableCpus: Array[Int],
-  tasks: IndexedSeq[ArrayBuffer[TaskDescription]]) : Boolean = {
+  taskAssigner: TaskAssigner) : Boolean = {
 var launchedTask = false
-for (i <- 0 until shuffledOffers.size) {
-  val execId = shuffledOffers(i).executorId
-  val host = shuffledOffers(i).host
-  if (availableCpus(i) >= CPUS_PER_TASK) {
+taskAssigner.init()
+while (taskAssigner.hasNext) {
+  var assigned = false
+  val currentOffer = taskAssigner.next()
+  val execId = currentOffer.workOffer.executorId
+  val host = currentOffer.workOffer.host
+  if (currentOffer.coresAvailable >= CPUS_PER_TASK) {
 try {
   for (task <- taskSet.resourceOffer(execId, host, maxLocality)) {
-tasks(i) += task
+currentOffer.tasks += task
 val tid = task.taskId
 taskIdToTaskSetManager(tid) = taskSet
 taskIdToExecutorId(tid) = execId
 executorIdToTaskCount(execId) += 1
-availableCpus(i) -= CPUS_PER_TASK
-assert(availableCpus(i) >= 0)
+currentOffer.coresAvailable -= CPUS_PER_TASK
+assert(currentOffer.coresAvailable >= 0)
--- End diff --

Could we add a function in `class OfferState` that wraps the following 
three lines of code? 
```Scala
currentOffer.tasks += task
currentOffer.coresAvailable -= CPUS_PER_TASK
assert(currentOffer.coresAvailable >= 0)
```
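
One way the requested helper might look, as a self-contained sketch. `TaskStub`, `OfferStub`, and `OfferStateSketch` are simplified stand-ins introduced here for illustration (assumptions, not Spark's `TaskDescription`, `WorkerOffer`, or `OfferState`); the method body is the three lines quoted above.

```scala
import scala.collection.mutable.ArrayBuffer

// Simplified stand-ins for Spark's TaskDescription and WorkerOffer (assumptions).
final case class TaskStub(taskId: Long)
final case class OfferStub(executorId: String, host: String, cores: Int)

class OfferStateSketch(val workOffer: OfferStub) {
  // Cores still available for new tasks on this worker.
  var coresAvailable: Int = workOffer.cores
  // Tasks assigned to this worker so far.
  val tasks: ArrayBuffer[TaskStub] = ArrayBuffer.empty[TaskStub]

  // Records the task and deducts its cores in one place, so callers
  // cannot update one field and forget the other.
  def assignTask(task: TaskStub, cpu: Int): Unit = {
    tasks += task
    coresAvailable -= cpu
    assert(coresAvailable >= 0)
  }
}
```

With such a helper, the call site in `resourceOfferSingleTaskSet` would shrink to a single `currentOffer.assignTask(task, CPUS_PER_TASK)` call.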






[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...

2016-10-22 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/15541#discussion_r84590978
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala 
---
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.PriorityQueue
+import scala.util.Random
+
+import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.SparkConf
+import org.apache.spark.util.Utils
+
+/** Tracks the current state of the workers with available cores and 
assigned task list. */
+private[scheduler] class OfferState(val workOffer: WorkerOffer) {
+  /** The current remaining cores that can be allocated to tasks. */
+  var coresAvailable: Int = workOffer.cores
+  /** The list of tasks that are assigned to this WorkerOffer. */
+  val tasks = new ArrayBuffer[TaskDescription](coresAvailable)
+}
+
+/**
+ * TaskAssigner is the base class for all task assigner implementations, 
and can be
+ * extended to implement different task scheduling algorithms.
+ * Together with [[org.apache.spark.scheduler.TaskScheduler 
TaskScheduler]], TaskAssigner
+ * is used to assign tasks to workers with available cores. Internally, 
when TaskScheduler
+ * performs task assignment given available workers, it first sorts the 
candidate tasksets,
+ * and then for each taskset, it takes a number of rounds to request 
TaskAssigner for task
+ * assignment with different locality restrictions until there is either 
no qualified
+ * workers or no valid tasks to be assigned.
+ *
+ * TaskAssigner is responsible to maintain the worker availability state 
and task assignment
+ * information. The contract between 
[[org.apache.spark.scheduler.TaskScheduler TaskScheduler]]
+ * and TaskAssigner is as follows.
+ *
+ * First, TaskScheduler invokes construct() of TaskAssigner to initialize 
the its internal
+ * worker states at the beginning of resource offering.
+ *
+ * Second, before each round of task assignment for a taskset, 
TaskScheduler invokes the init()
+ * of TaskAssigner to initialize the data structure for the round.
+ *
+ * Third, when performing real task assignment, hasNext()/getNext() is 
used by TaskScheduler
+ * to check the worker availability and retrieve current offering from 
TaskAssigner.
+ *
+ * Fourth, then offerAccepted is used by TaskScheduler to notify the 
TaskAssigner so that
+ * TaskAssigner can decide whether the current offer is valid or not for 
the next request.
+ *
+ * Fifth, After task assignment is done, TaskScheduler invokes the tasks() 
to
+ * retrieve all the task assignment information.
+ */
+
+private[scheduler] abstract class TaskAssigner {
+  protected var offer: Seq[OfferState] = _
+  protected var cpuPerTask = 1
+
+  protected def withCpuPerTask(cpuPerTask: Int): Unit = {
+this.cpuPerTask = cpuPerTask
+  }
+
+  /** The final assigned offer returned to TaskScheduler. */
+  final def tasks: Seq[ArrayBuffer[TaskDescription]] = offer.map(_.tasks)
+
+  /** Invoked at the beginning of resource offering to construct the offer 
with the workoffers. */
+  def construct(workOffer: Seq[WorkerOffer]): Unit = {
+offer = Random.shuffle(workOffer.map(o => new OfferState(o)))
+  }
+
+  /** Invoked at each round of Taskset assignment to initialize the 
internal structure. */
+  def init(): Unit
+
+  /**
+   * Tests whether there is offer available to be used inside of one round 
of Taskset assignment.
+   * @return  `true` if a subsequent call to `next` will yield an element,
+   *  `false` otherwise.
+   */
+  def hasNext: Boolean
+
+  /**
+   * Produces next worker offer based on the task assignment strategy.
+   * @return  the next available offer, if `hasNext` is `true`,
+   *  undefi

[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...

2016-10-22 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/15541#discussion_r84590948
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala 
---
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.PriorityQueue
+import scala.util.Random
+
+import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.SparkConf
+import org.apache.spark.util.Utils
+
+/** Tracks the current state of the workers with available cores and 
assigned task list. */
+private[scheduler] class OfferState(val workOffer: WorkerOffer) {
+  /** The current remaining cores that can be allocated to tasks. */
+  var coresAvailable: Int = workOffer.cores
+  /** The list of tasks that are assigned to this WorkerOffer. */
+  val tasks = new ArrayBuffer[TaskDescription](coresAvailable)
+}
+
+/**
+ * TaskAssigner is the base class for all task assigner implementations, 
and can be
+ * extended to implement different task scheduling algorithms.
+ * Together with [[org.apache.spark.scheduler.TaskScheduler 
TaskScheduler]], TaskAssigner
+ * is used to assign tasks to workers with available cores. Internally, 
when TaskScheduler
+ * performs task assignment given available workers, it first sorts the 
candidate tasksets,
+ * and then for each taskset, it takes a number of rounds to request 
TaskAssigner for task
+ * assignment with different locality restrictions until there is either 
no qualified
+ * workers or no valid tasks to be assigned.
+ *
+ * TaskAssigner is responsible to maintain the worker availability state 
and task assignment
+ * information. The contract between 
[[org.apache.spark.scheduler.TaskScheduler TaskScheduler]]
+ * and TaskAssigner is as follows.
+ *
+ * First, TaskScheduler invokes construct() of TaskAssigner to initialize 
the its internal
+ * worker states at the beginning of resource offering.
+ *
+ * Second, before each round of task assignment for a taskset, 
TaskScheduler invokes the init()
+ * of TaskAssigner to initialize the data structure for the round.
+ *
+ * Third, when performing real task assignment, hasNext()/getNext() is 
used by TaskScheduler
+ * to check the worker availability and retrieve current offering from 
TaskAssigner.
+ *
+ * Fourth, then offerAccepted is used by TaskScheduler to notify the 
TaskAssigner so that
+ * TaskAssigner can decide whether the current offer is valid or not for 
the next request.
+ *
+ * Fifth, After task assignment is done, TaskScheduler invokes the tasks() 
to
+ * retrieve all the task assignment information.
+ */
+
+private[scheduler] abstract class TaskAssigner {
+  protected var offer: Seq[OfferState] = _
+  protected var cpuPerTask = 1
+
+  protected def withCpuPerTask(cpuPerTask: Int): Unit = {
+this.cpuPerTask = cpuPerTask
+  }
+
+  /** The final assigned offer returned to TaskScheduler. */
+  final def tasks: Seq[ArrayBuffer[TaskDescription]] = offer.map(_.tasks)
+
+  /** Invoked at the beginning of resource offering to construct the offer 
with the workoffers. */
+  def construct(workOffer: Seq[WorkerOffer]): Unit = {
+offer = Random.shuffle(workOffer.map(o => new OfferState(o)))
+  }
+
+  /** Invoked at each round of Taskset assignment to initialize the 
internal structure. */
+  def init(): Unit
+
+  /**
+   * Tests whether there is offer available to be used inside of one round 
of Taskset assignment.
+   * @return  `true` if a subsequent call to `next` will yield an element,
+   *  `false` otherwise.
+   */
+  def hasNext: Boolean
+
+  /**
+   * Produces next worker offer based on the task assignment strategy.
+   * @return  the next available offer, if `hasNext` is `true`,
+   *  undefi

[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...

2016-10-22 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/15541#discussion_r84590835
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala 
---
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.PriorityQueue
+import scala.util.Random
+
+import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.SparkConf
+import org.apache.spark.util.Utils
+
+/** Tracks the current state of the workers with available cores and 
assigned task list. */
+private[scheduler] class OfferState(val workOffer: WorkerOffer) {
+  /** The current remaining cores that can be allocated to tasks. */
+  var coresAvailable: Int = workOffer.cores
+  /** The list of tasks that are assigned to this WorkerOffer. */
+  val tasks = new ArrayBuffer[TaskDescription](coresAvailable)
+}
+
+/**
+ * TaskAssigner is the base class for all task assigner implementations, 
and can be
+ * extended to implement different task scheduling algorithms.
+ * Together with [[org.apache.spark.scheduler.TaskScheduler 
TaskScheduler]], TaskAssigner
+ * is used to assign tasks to workers with available cores. Internally, 
when TaskScheduler
+ * performs task assignment given available workers, it first sorts the 
candidate tasksets,
+ * and then for each taskset, it takes a number of rounds to request 
TaskAssigner for task
+ * assignment with different locality restrictions until there is either 
no qualified
+ * workers or no valid tasks to be assigned.
+ *
+ * TaskAssigner is responsible to maintain the worker availability state 
and task assignment
+ * information. The contract between 
[[org.apache.spark.scheduler.TaskScheduler TaskScheduler]]
+ * and TaskAssigner is as follows.
+ *
+ * First, TaskScheduler invokes construct() of TaskAssigner to initialize 
the its internal
+ * worker states at the beginning of resource offering.
+ *
+ * Second, before each round of task assignment for a taskset, 
TaskScheduler invokes the init()
+ * of TaskAssigner to initialize the data structure for the round.
+ *
+ * Third, when performing real task assignment, hasNext()/getNext() is 
used by TaskScheduler
+ * to check the worker availability and retrieve current offering from 
TaskAssigner.
+ *
+ * Fourth, then offerAccepted is used by TaskScheduler to notify the 
TaskAssigner so that
+ * TaskAssigner can decide whether the current offer is valid or not for 
the next request.
+ *
+ * Fifth, After task assignment is done, TaskScheduler invokes the tasks() 
to
+ * retrieve all the task assignment information.
+ */
+
+private[scheduler] abstract class TaskAssigner {
+  protected var offer: Seq[OfferState] = _
+  protected var cpuPerTask = 1
+
+  protected def withCpuPerTask(cpuPerTask: Int): Unit = {
+this.cpuPerTask = cpuPerTask
+  }
+
+  /** The final assigned offer returned to TaskScheduler. */
--- End diff --

The implementation of this function does not guarantee that the offer 
assignment is final. Maybe we can change the description to `The 
currently assigned offers`.
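
The point can be illustrated with a small sketch, assuming simplified types: `BufferHolder` and `TasksViewSketch` are hypothetical names, not classes from the pull request. Because `offer.map(_.tasks)` returns references to the same mutable buffers, a result obtained early still reflects assignments made later.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for OfferState, reduced to its task buffer.
final class BufferHolder {
  val tasks: ArrayBuffer[String] = ArrayBuffer.empty[String]
}

object TasksViewSketch {
  val offer: Seq[BufferHolder] = Seq(new BufferHolder, new BufferHolder)

  // Same shape as the reviewed method: a new Seq that shares the underlying buffers.
  def tasks: Seq[ArrayBuffer[String]] = offer.map(_.tasks)
}
```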





[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...

2016-10-22 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/15541#discussion_r84590773
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala 
---
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.PriorityQueue
+import scala.util.Random
+
+import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.SparkConf
+import org.apache.spark.util.Utils
+
+/** Tracks the current state of the workers with available cores and 
assigned task list. */
+private[scheduler] class OfferState(val workOffer: WorkerOffer) {
+  /** The current remaining cores that can be allocated to tasks. */
+  var coresAvailable: Int = workOffer.cores
+  /** The list of tasks that are assigned to this WorkerOffer. */
+  val tasks = new ArrayBuffer[TaskDescription](coresAvailable)
+}
+
+/**
+ * TaskAssigner is the base class for all task assigner implementations, 
and can be
+ * extended to implement different task scheduling algorithms.
+ * Together with [[org.apache.spark.scheduler.TaskScheduler 
TaskScheduler]], TaskAssigner
+ * is used to assign tasks to workers with available cores. Internally, 
when TaskScheduler
+ * performs task assignment given available workers, it first sorts the 
candidate tasksets,
+ * and then for each taskset, it takes a number of rounds to request 
TaskAssigner for task
+ * assignment with different locality restrictions until there is either 
no qualified
+ * workers or no valid tasks to be assigned.
+ *
+ * TaskAssigner is responsible to maintain the worker availability state 
and task assignment
+ * information. The contract between 
[[org.apache.spark.scheduler.TaskScheduler TaskScheduler]]
+ * and TaskAssigner is as follows.
+ *
+ * First, TaskScheduler invokes construct() of TaskAssigner to initialize 
the its internal
+ * worker states at the beginning of resource offering.
+ *
+ * Second, before each round of task assignment for a taskset, 
TaskScheduler invokes the init()
+ * of TaskAssigner to initialize the data structure for the round.
+ *
+ * Third, when performing real task assignment, hasNext()/getNext() is 
used by TaskScheduler
+ * to check the worker availability and retrieve current offering from 
TaskAssigner.
+ *
+ * Fourth, then offerAccepted is used by TaskScheduler to notify the 
TaskAssigner so that
+ * TaskAssigner can decide whether the current offer is valid or not for 
the next request.
+ *
+ * Fifth, After task assignment is done, TaskScheduler invokes the tasks() 
to
--- End diff --

`After` -> `after`
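
The five-step contract in the quoted Scaladoc can be sketched as a driver loop. This is an illustration under stated assumptions, not the code from the pull request: `SlotState`, `RoundRobinSketch`, and `ContractSketch` are hypothetical, tasks are plain strings, locality is ignored, shuffling is omitted so the result is deterministic, and the accessor is named `next()` as in the later revision of the diff rather than `getNext()`.

```scala
import scala.collection.mutable.{ArrayBuffer, Queue}

// Hypothetical, simplified worker state; not a Spark class.
final class SlotState(val executorId: String, var coresAvailable: Int) {
  val tasks: ArrayBuffer[String] = ArrayBuffer.empty[String]
}

// A round-robin assigner exposing the methods named in the contract.
final class RoundRobinSketch {
  private var offer: Seq[SlotState] = Seq.empty
  private var index = 0

  def construct(slots: Seq[SlotState]): Unit = { offer = slots }
  def init(): Unit = { index = 0 }
  def hasNext: Boolean = index < offer.size
  def next(): SlotState = { val s = offer(index); index += 1; s }
  def offerAccepted(accepted: Boolean): Unit = () // round-robin ignores the feedback
  def tasks: Seq[ArrayBuffer[String]] = offer.map(_.tasks)
}

object ContractSketch {
  def run(slots: Seq[SlotState], pending: Seq[String], cpuPerTask: Int): Seq[List[String]] = {
    val assigner = new RoundRobinSketch
    val queue = Queue.empty[String]
    queue ++= pending
    assigner.construct(slots)                      // first: build worker state once
    var launched = true
    while (launched && queue.nonEmpty) {           // each iteration is one round
      launched = false
      assigner.init()                              // second: reset per-round state
      while (assigner.hasNext && queue.nonEmpty) { // third: walk the offers
        val current = assigner.next()
        val accepted = current.coresAvailable >= cpuPerTask
        if (accepted) {
          current.tasks += queue.dequeue()
          current.coresAvailable -= cpuPerTask
          launched = true
        }
        assigner.offerAccepted(accepted)           // fourth: report the outcome
      }
    }
    assigner.tasks.map(_.toList)                   // fifth: collect the assignments
  }
}
```

The loop stops when a full round launches nothing or when no tasks remain, which corresponds to the "no qualified workers or no valid tasks" condition in the Scaladoc.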





[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...

2016-10-22 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/15541#discussion_r84590747
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala ---
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.PriorityQueue
+import scala.util.Random
+
+import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.SparkConf
+import org.apache.spark.util.Utils
+
+/** Tracks the current state of the workers with available cores and assigned task list. */
+private[scheduler] class OfferState(val workOffer: WorkerOffer) {
+  /** The current remaining cores that can be allocated to tasks. */
+  var coresAvailable: Int = workOffer.cores
+  /** The list of tasks that are assigned to this WorkerOffer. */
+  val tasks = new ArrayBuffer[TaskDescription](coresAvailable)
+}
+
+/**
+ * TaskAssigner is the base class for all task assigner implementations, and can be
+ * extended to implement different task scheduling algorithms.
+ * Together with [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]], TaskAssigner
+ * is used to assign tasks to workers with available cores. Internally, when TaskScheduler
+ * performs task assignment given available workers, it first sorts the candidate tasksets,
+ * and then for each taskset, it takes a number of rounds to request TaskAssigner for task
+ * assignment with different locality restrictions until there is either no qualified
+ * workers or no valid tasks to be assigned.
+ *
+ * TaskAssigner is responsible to maintain the worker availability state and task assignment
+ * information. The contract between [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]]
+ * and TaskAssigner is as follows.
+ *
+ * First, TaskScheduler invokes construct() of TaskAssigner to initialize the its internal
+ * worker states at the beginning of resource offering.
+ *
+ * Second, before each round of task assignment for a taskset, TaskScheduler invokes the init()
+ * of TaskAssigner to initialize the data structure for the round.
+ *
+ * Third, when performing real task assignment, hasNext()/getNext() is used by TaskScheduler
--- End diff --

`getNext()` -> `next()`
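
For context, the five-step contract in the quoted Scaladoc can be sketched roughly as below, using the `hasNext` / `next()` naming that matches `scala.collection.Iterator`. This is only an illustration under assumptions: `WorkerOffer` and `TaskDescription` are simplified stand-ins, `SequentialAssigner` and `ContractDemo` are hypothetical names that are not part of the PR, and the `offerAccepted(assigned: Boolean)` signature is assumed rather than taken from the diff.

```scala
import scala.collection.mutable.{ArrayBuffer, Queue}

// Simplified stand-ins for Spark's WorkerOffer and TaskDescription (assumed shapes).
final case class WorkerOffer(executorId: String, host: String, cores: Int)
final case class TaskDescription(taskId: Long)

// Mirrors OfferState from the diff: remaining cores plus the tasks assigned so far.
class OfferState(val workOffer: WorkerOffer) {
  var coresAvailable: Int = workOffer.cores
  val tasks = new ArrayBuffer[TaskDescription](coresAvailable)
}

// Hypothetical assigner that walks the offers in order; not one of the PR's strategies.
class SequentialAssigner(cpuPerTask: Int) {
  private var offer: Seq[OfferState] = Seq.empty
  private var idx = 0

  // Step 1: build the worker state once per resource offering.
  def construct(workOffer: Seq[WorkerOffer]): Unit = {
    offer = workOffer.map(o => new OfferState(o))
  }
  // Step 2: reset per-round state.
  def init(): Unit = { idx = 0 }
  // Step 3: Iterator-style accessors (no parentheses on the side-effect-free one).
  def hasNext: Boolean = idx < offer.size
  def next(): OfferState = offer(idx)
  // Step 4: move on when the worker declined a task or has too few cores left.
  def offerAccepted(assigned: Boolean): Unit = {
    if (!assigned || offer(idx).coresAvailable < cpuPerTask) idx += 1
  }
  // Step 5: expose the final assignment.
  def tasks: Seq[ArrayBuffer[TaskDescription]] = offer.map(_.tasks)
}

object ContractDemo {
  // Plays the scheduler's role for one round and returns the task count per worker.
  def run(): Seq[Int] = {
    val cpuPerTask = 1
    val assigner = new SequentialAssigner(cpuPerTask)
    assigner.construct(Seq(WorkerOffer("exec-1", "host-1", 2), WorkerOffer("exec-2", "host-2", 1)))
    val pending = Queue(TaskDescription(1L), TaskDescription(2L), TaskDescription(3L))

    assigner.init()
    while (assigner.hasNext) {
      val current = assigner.next()
      val launched = pending.nonEmpty && current.coresAvailable >= cpuPerTask
      if (launched) {
        current.tasks += pending.dequeue()
        current.coresAvailable -= cpuPerTask
      }
      assigner.offerAccepted(launched)
    }
    assigner.tasks.map(_.size)
  }
}
```

With two cores on the first worker and one on the second, `ContractDemo.run()` places two tasks on the first worker and one on the second.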





[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...

2016-10-22 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/15541#discussion_r84590726
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala ---
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.PriorityQueue
+import scala.util.Random
+
+import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.SparkConf
+import org.apache.spark.util.Utils
+
+/** Tracks the current state of the workers with available cores and assigned task list. */
+private[scheduler] class OfferState(val workOffer: WorkerOffer) {
+  /** The current remaining cores that can be allocated to tasks. */
+  var coresAvailable: Int = workOffer.cores
+  /** The list of tasks that are assigned to this WorkerOffer. */
+  val tasks = new ArrayBuffer[TaskDescription](coresAvailable)
+}
+
+/**
+ * TaskAssigner is the base class for all task assigner implementations, and can be
+ * extended to implement different task scheduling algorithms.
+ * Together with [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]], TaskAssigner
+ * is used to assign tasks to workers with available cores. Internally, when TaskScheduler
+ * performs task assignment given available workers, it first sorts the candidate tasksets,
+ * and then for each taskset, it takes a number of rounds to request TaskAssigner for task
+ * assignment with different locality restrictions until there is either no qualified
+ * workers or no valid tasks to be assigned.
+ *
+ * TaskAssigner is responsible to maintain the worker availability state and task assignment
+ * information. The contract between [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]]
+ * and TaskAssigner is as follows.
+ *
+ * First, TaskScheduler invokes construct() of TaskAssigner to initialize the its internal
+ * worker states at the beginning of resource offering.
+ *
+ * Second, before each round of task assignment for a taskset, TaskScheduler invokes the init()
+ * of TaskAssigner to initialize the data structure for the round.
+ *
+ * Third, when performing real task assignment, hasNext()/getNext() is used by TaskScheduler
--- End diff --

`hasNext()` -> `hasNext`





[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...

2016-10-22 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/15541#discussion_r84590702
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala ---
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.PriorityQueue
+import scala.util.Random
+
+import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.SparkConf
+import org.apache.spark.util.Utils
+
+/** Tracks the current state of the workers with available cores and assigned task list. */
+private[scheduler] class OfferState(val workOffer: WorkerOffer) {
+  /** The current remaining cores that can be allocated to tasks. */
+  var coresAvailable: Int = workOffer.cores
+  /** The list of tasks that are assigned to this WorkerOffer. */
+  val tasks = new ArrayBuffer[TaskDescription](coresAvailable)
+}
+
+/**
+ * TaskAssigner is the base class for all task assigner implementations, and can be
+ * extended to implement different task scheduling algorithms.
+ * Together with [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]], TaskAssigner
+ * is used to assign tasks to workers with available cores. Internally, when TaskScheduler
+ * performs task assignment given available workers, it first sorts the candidate tasksets,
+ * and then for each taskset, it takes a number of rounds to request TaskAssigner for task
+ * assignment with different locality restrictions until there is either no qualified
+ * workers or no valid tasks to be assigned.
+ *
+ * TaskAssigner is responsible to maintain the worker availability state and task assignment
+ * information. The contract between [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]]
+ * and TaskAssigner is as follows.
+ *
+ * First, TaskScheduler invokes construct() of TaskAssigner to initialize the its internal
+ * worker states at the beginning of resource offering.
+ *
+ * Second, before each round of task assignment for a taskset, TaskScheduler invokes the init()
+ * of TaskAssigner to initialize the data structure for the round.
+ *
+ * Third, when performing real task assignment, hasNext()/getNext() is used by TaskScheduler
+ * to check the worker availability and retrieve current offering from TaskAssigner.
+ *
+ * Fourth, then offerAccepted is used by TaskScheduler to notify the TaskAssigner so that
+ * TaskAssigner can decide whether the current offer is valid or not for the next request.
+ *
+ * Fifth, After task assignment is done, TaskScheduler invokes the tasks() to
+ * retrieve all the task assignment information.
+ */
+
+private[scheduler] abstract class TaskAssigner {
--- End diff --

`abstract class` -> `sealed abstract class`
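
To illustrate what `sealed` would buy here, below is a minimal sketch with a hypothetical, trimmed-down hierarchy. `Assigner`, `RoundRobin`, `Packed`, `Balanced`, and `describe` are illustrative names only, chosen to echo the three strategies listed in the PR's configuration docs; they are not the PR's classes. With a sealed parent, all subclasses must live in the same source file, so the compiler can warn when a `match` misses one of them.

```scala
// Hypothetical hierarchy; sealed restricts direct subclasses to this file.
sealed abstract class Assigner { def name: String }
final class RoundRobin extends Assigner { val name = "roundrobin" }
final class Packed extends Assigner { val name = "packed" }
final class Balanced extends Assigner { val name = "balanced" }

object Assigner {
  // Removing any case below makes the compiler report a non-exhaustive match.
  def describe(a: Assigner): String = a match {
    case _: RoundRobin => "default"
    case _: Packed     => "opt-in"
    case _: Balanced   => "opt-in"
  }
}
```

One trade-off to weigh: the Scaladoc says TaskAssigner "can be extended to implement different task scheduling algorithms", and sealing the class would limit such extensions to the file that declares it.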





[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...

2016-10-22 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/15541#discussion_r84590664
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala ---
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.PriorityQueue
+import scala.util.Random
+
+import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.SparkConf
+import org.apache.spark.util.Utils
+
+/** Tracks the current state of the workers with available cores and assigned task list. */
+private[scheduler] class OfferState(val workOffer: WorkerOffer) {
+  /** The current remaining cores that can be allocated to tasks. */
+  var coresAvailable: Int = workOffer.cores
+  /** The list of tasks that are assigned to this WorkerOffer. */
+  val tasks = new ArrayBuffer[TaskDescription](coresAvailable)
+}
+
+/**
+ * TaskAssigner is the base class for all task assigner implementations, and can be
+ * extended to implement different task scheduling algorithms.
+ * Together with [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]], TaskAssigner
+ * is used to assign tasks to workers with available cores. Internally, when TaskScheduler
+ * performs task assignment given available workers, it first sorts the candidate tasksets,
+ * and then for each taskset, it takes a number of rounds to request TaskAssigner for task
--- End diff --

`a number of` -> `multiple`





[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...

2016-10-20 Thread wangmiao1981
Github user wangmiao1981 commented on a diff in the pull request:

https://github.com/apache/spark/pull/15541#discussion_r84424097
  
--- Diff: docs/configuration.md ---
@@ -1342,6 +1342,20 @@ Apart from these, the following properties are also available, and may be useful
 Should be greater than or equal to 1. Number of allowed retries = this value - 1.
   
 
+
+  spark.scheduler.taskAssigner
+  roundrobin
+  
+The strategy of how to allocate tasks among workers with free cores. There are three task
+assigners (roundrobin, packed, and balanced) are supported currently. By default, roundrobin
+with randomness is used, which tries to allocate task to workers with available cores in
+roundrobin manner.The packed task assigner tries to allocate tasks to workers with the least

Missing space between `.` and `The`.





[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...

2016-10-20 Thread zhzhan
Github user zhzhan commented on a diff in the pull request:

https://github.com/apache/spark/pull/15541#discussion_r84424034
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala ---
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.PriorityQueue
+import scala.util.Random
+
+import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.SparkConf
+import org.apache.spark.util.Utils
+
+/** Tracks the current state of the workers with available cores and 
assigned task list. */
+class OfferState(val workOffer: WorkerOffer) {
+  /** The current remaining cores that can be allocated to tasks. */
+  var coresAvailable: Int = workOffer.cores
+  /** The list of tasks that are assigned to this WorkerOffer. */
+  val tasks = new ArrayBuffer[TaskDescription](coresAvailable)
+}
+
+/**
+ * TaskAssigner is the base class for all task assigner implementations, 
and can be
+ * extended to implement different task scheduling algorithms.
+ * Together with [[org.apache.spark.scheduler.TaskScheduler 
TaskScheduler]], TaskAssigner
+ * is used to assign tasks to workers with available cores. Internally, 
when TaskScheduler
+ * perform task assignment given available workers, it first sorts the 
candidate tasksets,
+ * and then for each taskset, it takes a number of rounds to request 
TaskAssigner for task
+ * assignment with different locality restrictions until there is either 
no qualified
+ * workers or no valid tasks to be assigned.
+ *
+ * TaskAssigner is responsible to maintain the worker availability state 
and task assignment
+ * information. The contract between 
[[org.apache.spark.scheduler.TaskScheduler TaskScheduler]]
+ * and TaskAssigner is as follows.
+ *
+ * First, TaskScheduler invokes construct() of TaskAssigner to initialize 
the its internal
+ * worker states at the beginning of resource offering.
+ *
+ * Second, before each round of task assignment for a taskset, 
TaskScheduler invoke the init()
+ * of TaskAssigner to initialize the data structure for the round.
+ *
+ * Third, when performing real task assignment, hasNext()/getNext() is 
used by TaskScheduler
+ * to check the worker availability and retrieve current offering from 
TaskAssigner.
+ *
+ * Fourth, then offerAccepted is used by TaskScheduler to notify the 
TaskAssigner so that
+ * TaskAssigner can decide whether the current offer is valid or not for 
the next request.
+ *
+ * Fifth, After task assignment is done, TaskScheduler invokes the tasks() 
to
+ * retrieve all the task assignment information.
+ */
+
+private[scheduler] abstract class TaskAssigner {
+  protected var offer: Seq[OfferState] = _
+  protected var cpuPerTask = 1
+
+  protected def withCpuPerTask(cpuPerTask: Int): Unit = {
+this.cpuPerTask = cpuPerTask
+  }
+
+  /** The final assigned offer returned to TaskScheduler. */
+  final def tasks: Seq[ArrayBuffer[TaskDescription]] = offer.map(_.tasks)
+
+  /** Invoked at the beginning of resource offering to construct the offer 
with the workoffers. */
+  def construct(workOffer: Seq[WorkerOffer]): Unit = {
+offer = Random.shuffle(workOffer.map(o => new OfferState(o)))
+  }
+
+  /** Invoked at each round of Taskset assignment to initialize the 
internal structure. */
+  def init(): Unit
+
+  /**
+   * Tests Whether there is offer available to be used inside of one round 
of Taskset assignment.
+   *  @return  `true` if a subsequent call to `next` will yield an element,
+   *   `false` otherwise.
+   */
+  def hasNext: Boolean
+
+  /**
+   * Produces next worker offer based on the task assignment strategy.
+   * @return  the next available offer, if `hasNext` is `true`,
+   *  undefined behavior otherwise.

[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...

2016-10-20 Thread wangmiao1981
Github user wangmiao1981 commented on a diff in the pull request:

https://github.com/apache/spark/pull/15541#discussion_r84423237
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala ---
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.PriorityQueue
+import scala.util.Random
+
+import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.SparkConf
+import org.apache.spark.util.Utils
+
+/** Tracks the current state of the workers with available cores and 
assigned task list. */
+class OfferState(val workOffer: WorkerOffer) {
+  /** The current remaining cores that can be allocated to tasks. */
+  var coresAvailable: Int = workOffer.cores
+  /** The list of tasks that are assigned to this WorkerOffer. */
+  val tasks = new ArrayBuffer[TaskDescription](coresAvailable)
+}
+
+/**
+ * TaskAssigner is the base class for all task assigner implementations, 
and can be
+ * extended to implement different task scheduling algorithms.
+ * Together with [[org.apache.spark.scheduler.TaskScheduler 
TaskScheduler]], TaskAssigner
+ * is used to assign tasks to workers with available cores. Internally, 
when TaskScheduler
+ * perform task assignment given available workers, it first sorts the 
candidate tasksets,
+ * and then for each taskset, it takes a number of rounds to request 
TaskAssigner for task
+ * assignment with different locality restrictions until there is either 
no qualified
+ * workers or no valid tasks to be assigned.
+ *
+ * TaskAssigner is responsible to maintain the worker availability state 
and task assignment
+ * information. The contract between 
[[org.apache.spark.scheduler.TaskScheduler TaskScheduler]]
+ * and TaskAssigner is as follows.
+ *
+ * First, TaskScheduler invokes construct() of TaskAssigner to initialize 
the its internal
+ * worker states at the beginning of resource offering.
+ *
+ * Second, before each round of task assignment for a taskset, 
TaskScheduler invoke the init()
+ * of TaskAssigner to initialize the data structure for the round.
+ *
+ * Third, when performing real task assignment, hasNext()/getNext() is 
used by TaskScheduler
+ * to check the worker availability and retrieve current offering from 
TaskAssigner.
+ *
+ * Fourth, then offerAccepted is used by TaskScheduler to notify the 
TaskAssigner so that
+ * TaskAssigner can decide whether the current offer is valid or not for 
the next request.
+ *
+ * Fifth, After task assignment is done, TaskScheduler invokes the tasks() 
to
+ * retrieve all the task assignment information.
+ */
+
+private[scheduler] abstract class TaskAssigner {
+  protected var offer: Seq[OfferState] = _
+  protected var cpuPerTask = 1
+
+  protected def withCpuPerTask(cpuPerTask: Int): Unit = {
+this.cpuPerTask = cpuPerTask
+  }
+
+  /** The final assigned offer returned to TaskScheduler. */
+  final def tasks: Seq[ArrayBuffer[TaskDescription]] = offer.map(_.tasks)
+
+  /** Invoked at the beginning of resource offering to construct the offer 
with the workoffers. */
+  def construct(workOffer: Seq[WorkerOffer]): Unit = {
+offer = Random.shuffle(workOffer.map(o => new OfferState(o)))
+  }
+
+  /** Invoked at each round of Taskset assignment to initialize the 
internal structure. */
+  def init(): Unit
+
+  /**
+   * Tests Whether there is offer available to be used inside of one round 
of Taskset assignment.
+   *  @return  `true` if a subsequent call to `next` will yield an element,
+   *   `false` otherwise.
+   */
+  def hasNext: Boolean
+
+  /**
+   * Produces next worker offer based on the task assignment strategy.
+   * @return  the next available offer, if `hasNext` is `true`,
+   *  undefined behavior othe

[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...

2016-10-20 Thread wangmiao1981
Github user wangmiao1981 commented on a diff in the pull request:

https://github.com/apache/spark/pull/15541#discussion_r84422908
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala ---
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.PriorityQueue
+import scala.util.Random
+
+import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.SparkConf
+import org.apache.spark.util.Utils
+
+/** Tracks the current state of the workers with available cores and 
assigned task list. */
+class OfferState(val workOffer: WorkerOffer) {
+  /** The current remaining cores that can be allocated to tasks. */
+  var coresAvailable: Int = workOffer.cores
+  /** The list of tasks that are assigned to this WorkerOffer. */
+  val tasks = new ArrayBuffer[TaskDescription](coresAvailable)
+}
+
+/**
+ * TaskAssigner is the base class for all task assigner implementations, 
and can be
+ * extended to implement different task scheduling algorithms.
+ * Together with [[org.apache.spark.scheduler.TaskScheduler 
TaskScheduler]], TaskAssigner
+ * is used to assign tasks to workers with available cores. Internally, 
when TaskScheduler
+ * perform task assignment given available workers, it first sorts the 
candidate tasksets,
+ * and then for each taskset, it takes a number of rounds to request 
TaskAssigner for task
+ * assignment with different locality restrictions until there is either 
no qualified
+ * workers or no valid tasks to be assigned.
+ *
+ * TaskAssigner is responsible to maintain the worker availability state 
and task assignment
+ * information. The contract between 
[[org.apache.spark.scheduler.TaskScheduler TaskScheduler]]
+ * and TaskAssigner is as follows.
+ *
+ * First, TaskScheduler invokes construct() of TaskAssigner to initialize 
the its internal
+ * worker states at the beginning of resource offering.
+ *
+ * Second, before each round of task assignment for a taskset, 
TaskScheduler invoke the init()
+ * of TaskAssigner to initialize the data structure for the round.
+ *
+ * Third, when performing real task assignment, hasNext()/getNext() is 
used by TaskScheduler
+ * to check the worker availability and retrieve current offering from 
TaskAssigner.
+ *
+ * Fourth, then offerAccepted is used by TaskScheduler to notify the 
TaskAssigner so that
+ * TaskAssigner can decide whether the current offer is valid or not for 
the next request.
+ *
+ * Fifth, After task assignment is done, TaskScheduler invokes the tasks() 
to
+ * retrieve all the task assignment information.
+ */
+
+private[scheduler] abstract class TaskAssigner {
+  protected var offer: Seq[OfferState] = _
+  protected var cpuPerTask = 1
+
+  protected def withCpuPerTask(cpuPerTask: Int): Unit = {
+this.cpuPerTask = cpuPerTask
+  }
+
+  /** The final assigned offer returned to TaskScheduler. */
+  final def tasks: Seq[ArrayBuffer[TaskDescription]] = offer.map(_.tasks)
+
+  /** Invoked at the beginning of resource offering to construct the offer 
with the workoffers. */
+  def construct(workOffer: Seq[WorkerOffer]): Unit = {
+offer = Random.shuffle(workOffer.map(o => new OfferState(o)))
+  }
+
+  /** Invoked at each round of Taskset assignment to initialize the 
internal structure. */
+  def init(): Unit
+
+  /**
+   * Tests Whether there is offer available to be used inside of one round 
of Taskset assignment.
+   *  @return  `true` if a subsequent call to `next` will yield an element,
+   *   `false` otherwise.
+   */
+  def hasNext: Boolean
+
+  /**
+   * Produces next worker offer based on the task assignment strategy.
+   * @return  the next available offer, if `hasNext` is `true`,
+   *  undefined behavior othe

[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...

2016-10-20 Thread wangmiao1981
Github user wangmiao1981 commented on a diff in the pull request:

https://github.com/apache/spark/pull/15541#discussion_r84422647
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala ---
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.PriorityQueue
+import scala.util.Random
+
+import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.SparkConf
+import org.apache.spark.util.Utils
+
+/** Tracks the current state of the workers with available cores and assigned task list. */
+class OfferState(val workOffer: WorkerOffer) {
+  /** The current remaining cores that can be allocated to tasks. */
+  var coresAvailable: Int = workOffer.cores
+  /** The list of tasks that are assigned to this WorkerOffer. */
+  val tasks = new ArrayBuffer[TaskDescription](coresAvailable)
+}
+
+/**
+ * TaskAssigner is the base class for all task assigner implementations, and can be
+ * extended to implement different task scheduling algorithms.
+ * Together with [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]], TaskAssigner
+ * is used to assign tasks to workers with available cores. Internally, when TaskScheduler
+ * perform task assignment given available workers, it first sorts the candidate tasksets,
+ * and then for each taskset, it takes a number of rounds to request TaskAssigner for task
+ * assignment with different locality restrictions until there is either no qualified
+ * workers or no valid tasks to be assigned.
+ *
+ * TaskAssigner is responsible to maintain the worker availability state and task assignment
+ * information. The contract between [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]]
+ * and TaskAssigner is as follows.
+ *
+ * First, TaskScheduler invokes construct() of TaskAssigner to initialize the its internal
+ * worker states at the beginning of resource offering.
+ *
+ * Second, before each round of task assignment for a taskset, TaskScheduler invoke the init()
+ * of TaskAssigner to initialize the data structure for the round.
+ *
+ * Third, when performing real task assignment, hasNext()/getNext() is used by TaskScheduler
+ * to check the worker availability and retrieve current offering from TaskAssigner.
+ *
+ * Fourth, then offerAccepted is used by TaskScheduler to notify the TaskAssigner so that
+ * TaskAssigner can decide whether the current offer is valid or not for the next request.
+ *
+ * Fifth, After task assignment is done, TaskScheduler invokes the tasks() to
+ * retrieve all the task assignment information.
+ */
+
+private[scheduler] abstract class TaskAssigner {
+  protected var offer: Seq[OfferState] = _
+  protected var cpuPerTask = 1
+
+  protected def withCpuPerTask(cpuPerTask: Int): Unit = {
+    this.cpuPerTask = cpuPerTask
+  }
+
+  /** The final assigned offer returned to TaskScheduler. */
+  final def tasks: Seq[ArrayBuffer[TaskDescription]] = offer.map(_.tasks)
+
+  /** Invoked at the beginning of resource offering to construct the offer with the workoffers. */
+  def construct(workOffer: Seq[WorkerOffer]): Unit = {
+    offer = Random.shuffle(workOffer.map(o => new OfferState(o)))
+  }
+
+  /** Invoked at each round of Taskset assignment to initialize the internal structure. */
+  def init(): Unit
+
+  /**
+   * Tests Whether there is offer available to be used inside of one round of Taskset assignment.
+   *  @return  `true` if a subsequent call to `next` will yield an element,
--- End diff --

`@return` is not aligned with the line above.
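
For illustration only, an aligned layout might look like the sketch below. `OfferIterator` is a hypothetical stand-in for the class under review, not a name from the PR; only the comment layout matters here.

```scala
// Hypothetical trait (assumption) used only to carry the Scaladoc example.
trait OfferIterator {
  /**
   * Tests whether there is an offer available within one round of taskset assignment.
   * @return `true` if a subsequent call to `next` will yield an element,
   *         `false` otherwise.
   */
  def hasNext: Boolean
}
```

The `@return` tag starts in the same column as the description text, and its continuation line is indented to sit under the tag's text.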


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enable

[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...

2016-10-20 Thread wangmiao1981
Github user wangmiao1981 commented on a diff in the pull request:

https://github.com/apache/spark/pull/15541#discussion_r84422357
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala 
---
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.PriorityQueue
+import scala.util.Random
+
+import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.SparkConf
+import org.apache.spark.util.Utils
+
+/** Tracks the current state of the workers with available cores and assigned task list. */
+class OfferState(val workOffer: WorkerOffer) {
+  /** The current remaining cores that can be allocated to tasks. */
+  var coresAvailable: Int = workOffer.cores
+  /** The list of tasks that are assigned to this WorkerOffer. */
+  val tasks = new ArrayBuffer[TaskDescription](coresAvailable)
+}
+
+/**
+ * TaskAssigner is the base class for all task assigner implementations, and can be
+ * extended to implement different task scheduling algorithms.
+ * Together with [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]], TaskAssigner
+ * is used to assign tasks to workers with available cores. Internally, when TaskScheduler
+ * perform task assignment given available workers, it first sorts the candidate tasksets,
+ * and then for each taskset, it takes a number of rounds to request TaskAssigner for task
+ * assignment with different locality restrictions until there is either no qualified
+ * workers or no valid tasks to be assigned.
+ *
+ * TaskAssigner is responsible to maintain the worker availability state and task assignment
+ * information. The contract between [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]]
+ * and TaskAssigner is as follows.
+ *
+ * First, TaskScheduler invokes construct() of TaskAssigner to initialize the its internal
+ * worker states at the beginning of resource offering.
+ *
+ * Second, before each round of task assignment for a taskset, TaskScheduler invoke the init()
--- End diff --

`invoke` -> `invokes`
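
The five-step contract in the quoted Scaladoc (construct, init, hasNext/getNext, offerAccepted, tasks) can be sketched roughly as follows. This is a minimal, self-contained illustration and not the PR's implementation: `WorkerOffer` and `TaskDescription` are simplified stand-ins for the Spark classes, and `InOrderAssigner` and `ContractDemo` are hypothetical names introduced only for this example.

```scala
import scala.collection.mutable

// Simplified stand-ins (assumptions) for Spark's WorkerOffer and TaskDescription.
final case class WorkerOffer(executorId: String, host: String, cores: Int)
final case class TaskDescription(name: String)

class OfferState(val workOffer: WorkerOffer) {
  var coresAvailable: Int = workOffer.cores
  val tasks: mutable.ArrayBuffer[TaskDescription] = mutable.ArrayBuffer.empty
}

abstract class TaskAssigner {
  protected var offer: Seq[OfferState] = _
  final def tasks: Seq[mutable.ArrayBuffer[TaskDescription]] = offer.map(_.tasks)
  def construct(workOffer: Seq[WorkerOffer]): Unit = {
    offer = workOffer.map(o => new OfferState(o))
  }
  def init(): Unit
  def hasNext: Boolean
  def getNext(): OfferState
  def offerAccepted(assigned: Boolean): Unit
}

// Hypothetical strategy: visit every worker once per round, in offer order.
class InOrderAssigner extends TaskAssigner {
  private var i = 0
  override def init(): Unit = { i = 0 }
  override def hasNext: Boolean = i < offer.size
  override def getNext(): OfferState = { val o = offer(i); i += 1; o }
  override def offerAccepted(assigned: Boolean): Unit = ()
}

object ContractDemo {
  def run(): Seq[Seq[String]] = {
    val assigner = new InOrderAssigner
    val pending = mutable.Queue(
      TaskDescription("t0"), TaskDescription("t1"), TaskDescription("t2"))
    val cpusPerTask = 1
    // Step 1: build the worker state once per resource offer.
    assigner.construct(Seq(WorkerOffer("exec-1", "host-a", 2), WorkerOffer("exec-2", "host-b", 1)))
    var launched = true
    while (launched && pending.nonEmpty) {
      launched = false
      // Step 2: reset the per-round state.
      assigner.init()
      // Step 3: walk the offers in the order the assigner chooses.
      while (assigner.hasNext && pending.nonEmpty) {
        val state = assigner.getNext()
        val fits = state.coresAvailable >= cpusPerTask
        if (fits) {
          state.tasks += pending.dequeue()
          state.coresAvailable -= cpusPerTask
          launched = true
        }
        // Step 4: tell the assigner whether the offer was used.
        assigner.offerAccepted(fits)
      }
    }
    // Step 5: collect the per-worker assignments.
    assigner.tasks.map(_.map(_.name).toList)
  }
}
```

With the two offers above, `ContractDemo.run()` places `t0` and `t2` on the first worker and `t1` on the second. A packed or balanced strategy would differ only in how `init`, `hasNext`, `getNext` and `offerAccepted` order and filter the offers.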


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...

2016-10-20 Thread wangmiao1981
Github user wangmiao1981 commented on a diff in the pull request:

https://github.com/apache/spark/pull/15541#discussion_r84422079
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala 
---
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.PriorityQueue
+import scala.util.Random
+
+import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.SparkConf
+import org.apache.spark.util.Utils
+
+/** Tracks the current state of the workers with available cores and assigned task list. */
+class OfferState(val workOffer: WorkerOffer) {
+  /** The current remaining cores that can be allocated to tasks. */
+  var coresAvailable: Int = workOffer.cores
+  /** The list of tasks that are assigned to this WorkerOffer. */
+  val tasks = new ArrayBuffer[TaskDescription](coresAvailable)
+}
+
+/**
+ * TaskAssigner is the base class for all task assigner implementations, and can be
+ * extended to implement different task scheduling algorithms.
+ * Together with [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]], TaskAssigner
+ * is used to assign tasks to workers with available cores. Internally, when TaskScheduler
+ * perform task assignment given available workers, it first sorts the candidate tasksets,
--- End diff --

`perform` -> `performs`





[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...

2016-10-20 Thread wangmiao1981
Github user wangmiao1981 commented on a diff in the pull request:

https://github.com/apache/spark/pull/15541#discussion_r84421949
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala 
---
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.PriorityQueue
+import scala.util.Random
+
+import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.SparkConf
+import org.apache.spark.util.Utils
+
+/** Tracks the current state of the workers with available cores and assigned task list. */
+class OfferState(val workOffer: WorkerOffer) {
--- End diff --

Is this class private to `scheduler`?
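
As a sketch of what that restriction would look like: the snippet below uses an enclosing object named `scheduler` as a stand-in for the `org.apache.spark.scheduler` package so that it runs as a single file, and `WorkerOffer` is a simplified assumption rather than the Spark class. `remainingCores` is a hypothetical helper added only to exercise the class.

```scala
// The object stands in for the package (assumption, for a single-file example).
object scheduler {
  // Simplified stand-in for Spark's WorkerOffer.
  final case class WorkerOffer(executorId: String, host: String, cores: Int)

  // Qualified private: usable anywhere inside `scheduler`, hidden outside it.
  private[scheduler] class OfferState(val workOffer: WorkerOffer) {
    var coresAvailable: Int = workOffer.cores
  }

  // Hypothetical public helper that uses the restricted class internally.
  def remainingCores(offer: WorkerOffer, used: Int): Int = {
    val state = new OfferState(offer)
    state.coresAvailable -= used
    state.coresAvailable
  }
}
```

Code outside `scheduler` can still call `scheduler.remainingCores(...)`, but a direct `new scheduler.OfferState(...)` there would be rejected by the compiler.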





[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...

2016-10-20 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/15541#discussion_r84225583
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala 
---
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.PriorityQueue
+import scala.util.Random
+
+import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.SparkConf
+import org.apache.spark.util.Utils
+
+/** Tracks the current state of the workers with available cores and assigned task list. */
+class OfferState(val workOffer: WorkerOffer) {
+  /** The current remaining cores that can be allocated to tasks. */
+  var coresAvailable: Int = workOffer.cores
+  /** The list of tasks that are assigned to this WorkerOffer. */
+  val tasks = new ArrayBuffer[TaskDescription](coresAvailable)
+}
+
+/**
+ * TaskAssigner is the base class for all task assigner implementations, and can be
+ * extended to implement different task scheduling algorithms.
+ * Together with [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]], TaskAssigner
+ * is used to assign tasks to workers with available cores. Internally, when TaskScheduler
+ * perform task assignment given available workers, it first sorts the candidate tasksets,
+ * and then for each taskset, it takes a number of rounds to request TaskAssigner for task
+ * assignment with different locality restrictions until there is either no qualified
+ * workers or no valid tasks to be assigned.
+ *
+ * TaskAssigner is responsible to maintain the worker availability state and task assignment
+ * information. The contract between [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]]
+ * and TaskAssigner is as follows.
+ *
+ * First, TaskScheduler invokes construct() of TaskAssigner to initialize the its internal
+ * worker states at the beginning of resource offering.
+ *
+ * Second, before each round of task assignment for a taskset, TaskScheduler invoke the init()
+ * of TaskAssigner to initialize the data structure for the round.
+ *
+ * Third, when performing real task assignment, hasNext()/getNext() is used by TaskScheduler
+ * to check the worker availability and retrieve current offering from TaskAssigner.
+ *
+ * Fourth, then offerAccepted is used by TaskScheduler to notify the TaskAssigner so that
+ * TaskAssigner can decide whether the current offer is valid or not for the next request.
+ *
+ * Fifth, After task assignment is done, TaskScheduler invokes the tasks() to
+ * retrieve all the task assignment information.
+ */
+
+private[scheduler] abstract class TaskAssigner {
+  protected var offer: Seq[OfferState] = _
+  protected var cpuPerTask = 1
+
+  protected def withCpuPerTask(cpuPerTask: Int): Unit = {
+    this.cpuPerTask = cpuPerTask
+  }
+
+  /** The final assigned offer returned to TaskScheduler. */
+  final def tasks: Seq[ArrayBuffer[TaskDescription]] = offer.map(_.tasks)
+
+  /** Invoked at the beginning of resource offering to construct the offer with the workoffers. */
+  def construct(workOffer: Seq[WorkerOffer]): Unit = {
+    offer = workOffer.map(o => new OfferState(o))
+  }
+
+  /** Invoked at each round of Taskset assignment to initialize the internal structure. */
+  def init(): Unit
+
+  /**
+   * Tests Whether there is offer available to be used inside of one round of Taskset assignment.
+   *  @return  `true` if a subsequent call to `next` will yield an element,
+   *   `false` otherwise.
+   */
+  def hasNext: Boolean
+
+  /**
+   * Produces next worker offer based on the task assignment strategy.
+   * @return  the next available offer, if `hasNext` is `true`,
+   *  undefined behavior otherwise.
+   */
   

[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...

2016-10-20 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/15541#discussion_r84222924
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala 
---
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.PriorityQueue
+import scala.util.Random
+
+import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.SparkConf
+import org.apache.spark.util.Utils
+
+/** Tracks the current state of the workers with available cores and assigned task list. */
+class OfferState(val workOffer: WorkerOffer) {
+  /** The current remaining cores that can be allocated to tasks. */
+  var coresAvailable: Int = workOffer.cores
+  /** The list of tasks that are assigned to this WorkerOffer. */
+  val tasks = new ArrayBuffer[TaskDescription](coresAvailable)
+}
+
+/**
+ * TaskAssigner is the base class for all task assigner implementations, and can be
+ * extended to implement different task scheduling algorithms.
+ * Together with [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]], TaskAssigner
+ * is used to assign tasks to workers with available cores. Internally, when TaskScheduler
+ * perform task assignment given available workers, it first sorts the candidate tasksets,
+ * and then for each taskset, it takes a number of rounds to request TaskAssigner for task
+ * assignment with different locality restrictions until there is either no qualified
+ * workers or no valid tasks to be assigned.
+ *
+ * TaskAssigner is responsible to maintain the worker availability state and task assignment
+ * information. The contract between [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]]
+ * and TaskAssigner is as follows.
+ *
+ * First, TaskScheduler invokes construct() of TaskAssigner to initialize the its internal
+ * worker states at the beginning of resource offering.
+ *
+ * Second, before each round of task assignment for a taskset, TaskScheduler invoke the init()
+ * of TaskAssigner to initialize the data structure for the round.
+ *
+ * Third, when performing real task assignment, hasNext()/getNext() is used by TaskScheduler
+ * to check the worker availability and retrieve current offering from TaskAssigner.
+ *
+ * Fourth, then offerAccepted is used by TaskScheduler to notify the TaskAssigner so that
+ * TaskAssigner can decide whether the current offer is valid or not for the next request.
+ *
+ * Fifth, After task assignment is done, TaskScheduler invokes the tasks() to
+ * retrieve all the task assignment information.
+ */
+
+private[scheduler] abstract class TaskAssigner {
+  protected var offer: Seq[OfferState] = _
+  protected var cpuPerTask = 1
+
+  protected def withCpuPerTask(cpuPerTask: Int): Unit = {
+    this.cpuPerTask = cpuPerTask
+  }
+
+  /** The final assigned offer returned to TaskScheduler. */
+  final def tasks: Seq[ArrayBuffer[TaskDescription]] = offer.map(_.tasks)
+
+  /** Invoked at the beginning of resource offering to construct the offer with the workoffers. */
+  def construct(workOffer: Seq[WorkerOffer]): Unit = {
+    offer = workOffer.map(o => new OfferState(o))
+  }
+
+  /** Invoked at each round of Taskset assignment to initialize the internal structure. */
+  def init(): Unit
+
+  /**
+   * Tests Whether there is offer available to be used inside of one round of Taskset assignment.
+   *  @return  `true` if a subsequent call to `next` will yield an element,
+   *   `false` otherwise.
+   */
+  def hasNext: Boolean
+
+  /**
+   * Produces next worker offer based on the task assignment strategy.
+   * @return  the next available offer, if `hasNext` is `true`,
+   *  undefined behavior otherwise.
+   */

[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...

2016-10-19 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/15541#discussion_r84160226
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala 
---
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.PriorityQueue
+import scala.util.Random
+
+import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.SparkConf
+import org.apache.spark.util.Utils
+
+/** Tracking the current state of the workers with available cores and assigned task list. */
+class OfferState(val workOffer: WorkerOffer) {
+  // The current remaining cores that can be allocated to tasks.
+  var coresAvailable: Int = workOffer.cores
+  // The list of tasks that are assigned to this worker.
+  val tasks = new ArrayBuffer[TaskDescription](coresAvailable)
+}
+
+/**
+ * TaskAssigner is the base class for all task assigner implementations, and can be
+ * extended to implement different task scheduling algorithms.
+ * Together with [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]], TaskAssigner
+ * is used to assign tasks to workers with available cores. Internally, TaskScheduler, requested
+ * to perform task assignment given available workers, first sorts the candidate tasksets,
+ * and then for each taskset, it takes a number of rounds to request TaskAssigner for task
+ * assignment with different the locality restrictions until there is either no qualified
+ * workers or no valid tasks to be assigned.
+ *
+ * TaskAssigner is responsible to maintain the worker availability state and task assignment
+ * information. The contract between [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]]
+ * and TaskAssigner is as follows. First, TaskScheduler invokes construct() of TaskAssigner to
+ * initialize the its internal worker states at the beginning of resource offering. Before each
+ * round of task assignment for a taskset, TaskScheduler invoke the init() of TaskAssigner to
+ * initialize the data structure for the round. When performing real task assignment,
+ * hasNext()/getNext() is used by TaskScheduler to check the worker availability and retrieve
+ * current offering from TaskAssigner. Then offerAccepted is used by TaskScheduler to notify
+ * the TaskAssigner so that TaskAssigner can decide whether the current offer is valid or not for
+ * the next request. After task assignment is done, TaskScheduler invokes the tasks() to
+ * retrieve all the task assignment information, and eventually, invokes reset() method so that
+ * TaskAssigner can cleanup its internal maintained resources.
+ */
+
+private[scheduler] abstract class TaskAssigner {
+  var offer: Seq[OfferState] = _
+  var CPUS_PER_TASK = 1
+
+  def withCpuPerTask(CPUS_PER_TASK: Int): Unit = {
+    this.CPUS_PER_TASK = CPUS_PER_TASK
+  }
+
+  // The final assigned offer returned to TaskScheduler.
+  final def tasks: Seq[ArrayBuffer[TaskDescription]] = offer.map(_.tasks)
+
+  // Invoked at the beginning of resource offering to construct the offer with the workoffers.
+  def construct(workOffer: Seq[WorkerOffer]): Unit = {
+    offer = workOffer.map(o => new OfferState(o))
+  }
+
+  // Invoked at each round of Taskset assignment to initialize the internal structure.
+  def init(): Unit
+
+  // Whether there is offer available to be used inside of one round of Taskset assignment.
+  def hasNext: Boolean
+
+  // Returned the next assigned offer based on the task assignment strategy.
+  def getNext(): OfferState
+
+  // Invoked by the TaskScheduler to indicate whether the current offer is accepted or not so that
+  // the assigner can decide whether the current worker is valid for the next offering.
+  def offerAccepted(assigned: Boolean): U

[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...

2016-10-19 Thread zhzhan
Github user zhzhan commented on a diff in the pull request:

https://github.com/apache/spark/pull/15541#discussion_r84158910
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala 
---
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.PriorityQueue
+import scala.util.Random
+
+import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.SparkConf
+import org.apache.spark.util.Utils
+
+/** Tracking the current state of the workers with available cores and assigned task list. */
+class OfferState(val workOffer: WorkerOffer) {
+  // The current remaining cores that can be allocated to tasks.
+  var coresAvailable: Int = workOffer.cores
+  // The list of tasks that are assigned to this worker.
+  val tasks = new ArrayBuffer[TaskDescription](coresAvailable)
+}
+
+/**
+ * TaskAssigner is the base class for all task assigner implementations, and can be
+ * extended to implement different task scheduling algorithms.
+ * Together with [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]], TaskAssigner
+ * is used to assign tasks to workers with available cores. Internally, TaskScheduler, requested
+ * to perform task assignment given available workers, first sorts the candidate tasksets,
+ * and then for each taskset, it takes a number of rounds to request TaskAssigner for task
+ * assignment with different the locality restrictions until there is either no qualified
+ * workers or no valid tasks to be assigned.
+ *
+ * TaskAssigner is responsible to maintain the worker availability state and task assignment
+ * information. The contract between [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]]
+ * and TaskAssigner is as follows. First, TaskScheduler invokes construct() of TaskAssigner to
+ * initialize the its internal worker states at the beginning of resource offering. Before each
+ * round of task assignment for a taskset, TaskScheduler invoke the init() of TaskAssigner to
+ * initialize the data structure for the round. When performing real task assignment,
+ * hasNext()/getNext() is used by TaskScheduler to check the worker availability and retrieve
+ * current offering from TaskAssigner. Then offerAccepted is used by TaskScheduler to notify
+ * the TaskAssigner so that TaskAssigner can decide whether the current offer is valid or not for
+ * the next request. After task assignment is done, TaskScheduler invokes the tasks() to
+ * retrieve all the task assignment information, and eventually, invokes reset() method so that
+ * TaskAssigner can cleanup its internal maintained resources.
+ */
+
+private[scheduler] abstract class TaskAssigner {
+  var offer: Seq[OfferState] = _
+  var CPUS_PER_TASK = 1
+
+  def withCpuPerTask(CPUS_PER_TASK: Int): Unit = {
+    this.CPUS_PER_TASK = CPUS_PER_TASK
+  }
+
+  // The final assigned offer returned to TaskScheduler.
+  final def tasks: Seq[ArrayBuffer[TaskDescription]] = offer.map(_.tasks)
+
+  // Invoked at the beginning of resource offering to construct the offer with the workoffers.
+  def construct(workOffer: Seq[WorkerOffer]): Unit = {
+    offer = workOffer.map(o => new OfferState(o))
+  }
+
+  // Invoked at each round of Taskset assignment to initialize the internal structure.
+  def init(): Unit
+
+  // Whether there is offer available to be used inside of one round of Taskset assignment.
+  def hasNext: Boolean
+
+  // Returned the next assigned offer based on the task assignment strategy.
+  def getNext(): OfferState
+
+  // Invoked by the TaskScheduler to indicate whether the current offer is accepted or not so that
+  // the assigner can decide whether the current worker is valid for the next offering.
+  def offerAccepted(assigned: Boolean): Un

[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...

2016-10-19 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/15541#discussion_r84154979
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala 
---
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.PriorityQueue
+import scala.util.Random
+
+import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.SparkConf
+import org.apache.spark.util.Utils
+
+/** Tracking the current state of the workers with available cores and 
assigned task list. */
+class OfferState(val workOffer: WorkerOffer) {
+  // The current remaining cores that can be allocated to tasks.
+  var coresAvailable: Int = workOffer.cores
+  // The list of tasks that are assigned to this worker.
+  val tasks = new ArrayBuffer[TaskDescription](coresAvailable)
+}
+
+/**
+ * TaskAssigner is the base class for all task assigner implementations, and can be
+ * extended to implement different task scheduling algorithms.
+ * Together with [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]], TaskAssigner
+ * is used to assign tasks to workers with available cores. Internally, TaskScheduler, when
+ * requested to perform task assignment given available workers, first sorts the candidate
+ * tasksets, and then for each taskset, it takes a number of rounds to request task assignment
+ * from TaskAssigner with different locality restrictions until there are either no qualified
+ * workers or no valid tasks to be assigned.
+ *
+ * TaskAssigner is responsible for maintaining the worker availability state and task assignment
+ * information. The contract between [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]]
+ * and TaskAssigner is as follows. First, TaskScheduler invokes construct() of TaskAssigner to
+ * initialize its internal worker states at the beginning of resource offering. Before each
+ * round of task assignment for a taskset, TaskScheduler invokes init() of TaskAssigner to
+ * initialize the data structure for the round. When performing the actual task assignment,
+ * TaskScheduler uses hasNext()/getNext() to check worker availability and retrieve the
+ * current offer from TaskAssigner. TaskScheduler then calls offerAccepted() to notify
+ * TaskAssigner, so that TaskAssigner can decide whether the current offer is still valid for
+ * the next request. After task assignment is done, TaskScheduler invokes tasks() to
+ * retrieve all the task assignment information, and finally invokes reset() so that
+ * TaskAssigner can clean up its internally maintained resources.
+ */
+
+private[scheduler] abstract class TaskAssigner {
+  var offer: Seq[OfferState] = _
+  var CPUS_PER_TASK = 1
+
+  def withCpuPerTask(CPUS_PER_TASK: Int): Unit = {
+    this.CPUS_PER_TASK = CPUS_PER_TASK
+  }
+
+  // The final assigned offer returned to TaskScheduler.
+  final def tasks: Seq[ArrayBuffer[TaskDescription]] = offer.map(_.tasks)
+
+  // Invoked at the beginning of resource offering to construct the offer with the workoffers.
+  def construct(workOffer: Seq[WorkerOffer]): Unit = {
+    offer = workOffer.map(o => new OfferState(o))
+  }
+
+  // Invoked at each round of Taskset assignment to initialize the internal structure.
+  def init(): Unit
+
+  // Whether there is an offer available to be used inside of one round of Taskset assignment.
+  def hasNext: Boolean
+
+  // Returns the next assigned offer based on the task assignment strategy.
+  def getNext(): OfferState
+
+  // Invoked by the TaskScheduler to indicate whether the current offer is accepted or not so that
+  // the assigner can decide whether the current worker is valid for the next offering.
+  def offerAccepted(assigned: Boolean): U
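
The call order described in the Scaladoc above (construct, then init per round, then hasNext/getNext with offerAccepted, then tasks and reset) can be sketched as follows. This is only an illustration, not the code from the PR: `WorkerOffer` and `TaskDescription` are simplified stand-ins for the Spark classes, `reset()` is given an assumed default body, and `InOrderAssigner` and `SchedulerSketch.assignRound` are hypothetical names introduced here.

```scala
import scala.collection.mutable.ArrayBuffer

// Simplified stand-ins for Spark's WorkerOffer and TaskDescription (assumptions).
final case class WorkerOffer(executorId: String, host: String, cores: Int)
final case class TaskDescription(taskId: Long, executorId: String)

// Mirrors OfferState from the diff: remaining cores plus the tasks assigned so far.
final class OfferState(val workOffer: WorkerOffer) {
  var coresAvailable: Int = workOffer.cores
  val tasks: ArrayBuffer[TaskDescription] = ArrayBuffer.empty[TaskDescription]
}

// Trimmed version of the abstract contract quoted in the diff.
abstract class TaskAssigner {
  protected var offer: Seq[OfferState] = Seq.empty
  final def tasks: Seq[ArrayBuffer[TaskDescription]] = offer.map(_.tasks)
  def construct(workOffer: Seq[WorkerOffer]): Unit = {
    offer = workOffer.map(o => new OfferState(o))
  }
  def init(): Unit
  def hasNext: Boolean
  def getNext(): OfferState
  def offerAccepted(assigned: Boolean): Unit
  // Assumed behaviour: drop all per-offer state.
  def reset(): Unit = { offer = Seq.empty }
}

// Hypothetical assigner that walks the workers once per round, in offer order.
final class InOrderAssigner extends TaskAssigner {
  private var index = 0
  override def init(): Unit = { index = 0 }
  override def hasNext: Boolean = index < offer.size
  override def getNext(): OfferState = {
    val current = offer(index)
    index += 1
    current
  }
  // This simple strategy ignores the feedback; others may use it to retry a worker.
  override def offerAccepted(assigned: Boolean): Unit = ()
}

object SchedulerSketch {
  // Plays the TaskScheduler role for one round of assignment.
  def assignRound(assigner: TaskAssigner, pending: Iterator[Long], cpusPerTask: Int): Unit = {
    assigner.init()
    while (assigner.hasNext) {
      val current = assigner.getNext()
      val canRun = current.coresAvailable >= cpusPerTask && pending.hasNext
      if (canRun) {
        current.tasks += TaskDescription(pending.next(), current.workOffer.executorId)
        current.coresAvailable -= cpusPerTask
      }
      assigner.offerAccepted(canRun)
    }
  }
}
```

A caller would invoke `construct` once per resource offer, run `assignRound` once per locality level, read `tasks`, and then call `reset()`. Because the scheduler side only sees the five methods, a different strategy can be swapped in without touching the driver loop.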

[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...

2016-10-19 Thread zhzhan
Github user zhzhan commented on a diff in the pull request:

https://github.com/apache/spark/pull/15541#discussion_r84129486
  
--- Diff: core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala ---
@@ -109,6 +108,85 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
     assert(!failedTaskSet)
   }
 
+  test("Scheduler does not always schedule tasks on the same workers") {
+    val taskScheduler = setupScheduler()
+    roundrobin(taskScheduler)
+  }
+
+  test("User can specify the roundrobin task assigner") {
+    val taskScheduler = setupScheduler(("spark.scheduler.taskAssigner", "RoUndrObin"))
+    roundrobin(taskScheduler)
+  }
+
+  test("Fallback to roundrobin when the task assigner provided is not valid") {
+    val taskScheduler = setupScheduler("spark.scheduler.taskAssigner" -> "invalid")
+    roundrobin(taskScheduler)
+  }
+
+  test("Scheduler balance the assignment to the worker with more free cores") {
--- End diff --

Will change the test case name to make it more explicit.
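
For context, the first three tests quoted above imply that the value of `spark.scheduler.taskAssigner` is matched case-insensitively and that an unrecognized value falls back to round-robin. A minimal sketch of that lookup, assuming a hypothetical `AssignerNames` helper and a guessed set of names (only "roundrobin" is confirmed by the quoted tests; "packed" and "balanced" are assumptions taken from the PR title and the last test name):

```scala
import java.util.Locale

object AssignerNames {
  // Fallback used when the setting is absent or not recognized.
  val Default: String = "roundrobin"

  // Hypothetical list of supported names; only "roundrobin" appears in the quoted tests.
  val Known: Set[String] = Set("roundrobin", "packed", "balanced")

  // Normalizes the configured value and falls back to the default when it is unknown.
  def resolve(configured: Option[String]): String =
    configured
      .map(_.trim.toLowerCase(Locale.ROOT))
      .filter(Known.contains)
      .getOrElse(Default)
}
```

Lower-casing with `Locale.ROOT` keeps the comparison independent of the JVM's default locale, which matters for inputs such as "RoUndrObin".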





[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...

2016-10-19 Thread zhzhan
Github user zhzhan commented on a diff in the pull request:

https://github.com/apache/spark/pull/15541#discussion_r84119714
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala 
---
@@ -0,0 +1,233 @@

[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...

2016-10-18 Thread zhzhan
Github user zhzhan commented on a diff in the pull request:

https://github.com/apache/spark/pull/15541#discussion_r84002685
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala 
---
@@ -0,0 +1,233 @@

[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...

2016-10-18 Thread zhzhan
Github user zhzhan commented on a diff in the pull request:

https://github.com/apache/spark/pull/15541#discussion_r84002480
  
--- Diff: core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala ---
@@ -109,6 +108,85 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
     assert(!failedTaskSet)
   }
 
+  test("Scheduler does not always schedule tasks on the same workers") {
+    val taskScheduler = setupScheduler()
+    roundrobin(taskScheduler)
+  }
+
+  test("User can specify the roundrobin task assigner") {
+    val taskScheduler = setupScheduler(("spark.scheduler.taskAssigner", "RoUndrObin"))
+    roundrobin(taskScheduler)
+  }
+
+  test("Fallback to roundrobin when the task assigner provided is not valid") {
+    val taskScheduler = setupScheduler("spark.scheduler.taskAssigner" -> "invalid")
+    roundrobin(taskScheduler)
+  }
+
+  test("Scheduler balance the assignment to the worker with more free cores") {
--- End diff --

Can you please clarify?





[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...

2016-10-18 Thread zhzhan
Github user zhzhan commented on a diff in the pull request:

https://github.com/apache/spark/pull/15541#discussion_r84002353
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala 
---
@@ -0,0 +1,233 @@

[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...

2016-10-18 Thread zhzhan
Github user zhzhan commented on a diff in the pull request:

https://github.com/apache/spark/pull/15541#discussion_r84002236
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala 
---
@@ -0,0 +1,233 @@

[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...

2016-10-18 Thread tejasapatil
Github user tejasapatil commented on a diff in the pull request:

https://github.com/apache/spark/pull/15541#discussion_r83997822
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala 
---
@@ -0,0 +1,233 @@

[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...

2016-10-18 Thread tejasapatil
Github user tejasapatil commented on a diff in the pull request:

https://github.com/apache/spark/pull/15541#discussion_r83996049
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala 
---
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.PriorityQueue
+import scala.util.Random
+
+import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.SparkConf
+import org.apache.spark.util.Utils
+
+/** Tracking the current state of the workers with available cores and assigned task list. */
+class OfferState(val workOffer: WorkerOffer) {
+  // The current remaining cores that can be allocated to tasks.
+  var coresAvailable: Int = workOffer.cores
+  // The list of tasks that are assigned to this worker.
+  val tasks = new ArrayBuffer[TaskDescription](coresAvailable)
+}
+
+/**
+ * TaskAssigner is the base class for all task assigner implementations, and can be
+ * extended to implement different task scheduling algorithms.
+ * Together with [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]], TaskAssigner
+ * is used to assign tasks to workers with available cores. Internally, TaskScheduler, requested
+ * to perform task assignment given available workers, first sorts the candidate tasksets,
+ * and then for each taskset, it takes a number of rounds to request TaskAssigner for task
+ * assignment with different the locality restrictions until there is either no qualified
+ * workers or no valid tasks to be assigned.
+ *
+ * TaskAssigner is responsible to maintain the worker availability state and task assignment
+ * information. The contract between [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]]
+ * and TaskAssigner is as follows. First, TaskScheduler invokes construct() of TaskAssigner to
+ * initialize the its internal worker states at the beginning of resource offering. Before each
+ * round of task assignment for a taskset, TaskScheduler invoke the init() of TaskAssigner to
+ * initialize the data structure for the round. When performing real task assignment,
+ * hasNext()/getNext() is used by TaskScheduler to check the worker availability and retrieve
+ * current offering from TaskAssigner. Then offerAccepted is used by TaskScheduler to notify
+ * the TaskAssigner so that TaskAssigner can decide whether the current offer is valid or not for
+ * the next request. After task assignment is done, TaskScheduler invokes the tasks() to
+ * retrieve all the task assignment information, and eventually, invokes reset() method so that
+ * TaskAssigner can cleanup its internal maintained resources.
+ */
+
+private[scheduler] abstract class TaskAssigner {
+  var offer: Seq[OfferState] = _
+  var CPUS_PER_TASK = 1
+
+  def withCpuPerTask(CPUS_PER_TASK: Int): Unit = {
+    this.CPUS_PER_TASK = CPUS_PER_TASK
+  }
+
+  // The final assigned offer returned to TaskScheduler.
+  final def tasks: Seq[ArrayBuffer[TaskDescription]] = offer.map(_.tasks)
+
+  // Invoked at the beginning of resource offering to construct the offer with the workoffers.
+  def construct(workOffer: Seq[WorkerOffer]): Unit = {
+    offer = workOffer.map(o => new OfferState(o))
+  }
+
+  // Invoked at each round of Taskset assignment to initialize the internal structure.
+  def init(): Unit
+
+  // Whether there is offer available to be used inside of one round of Taskset assignment.
+  def hasNext: Boolean
+
+  // Returned the next assigned offer based on the task assignment strategy.
+  def getNext(): OfferState
+
+  // Invoked by the TaskScheduler to indicate whether the current offer is accepted or not so that
+  // the assigner can decide whether the current worker is valid for the next offering.
+  def offerAccepted(assigned: Boolean
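
The call order described in the Scaladoc quoted above (construct once, init before each round, hasNext/getNext with offerAccepted after every offer, then tasks and reset) can be sketched roughly as follows. This is an illustration under stated assumptions, not the PR's code: `WorkerOffer` and `TaskDescription` are simplified stand-ins, `InOrderAssigner` and `ContractDemo.assign` are hypothetical names, the `Unit` return type of `offerAccepted` and the body of `reset` are guesses because the quoted diff is cut off before them, and locality levels are left out entirely.

```scala
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

// Simplified stand-ins for the Spark types in the diff (assumptions, not the real classes).
case class WorkerOffer(executorId: String, host: String, cores: Int)
case class TaskDescription(taskId: Long)

class OfferState(val workOffer: WorkerOffer) {
  var coresAvailable: Int = workOffer.cores
  val tasks = new ArrayBuffer[TaskDescription]()
}

abstract class TaskAssigner {
  protected var offer: Seq[OfferState] = Seq.empty

  def construct(workOffers: Seq[WorkerOffer]): Unit = {
    offer = workOffers.map(o => new OfferState(o))
  }
  final def tasks: Seq[ArrayBuffer[TaskDescription]] = offer.map(_.tasks)

  def init(): Unit
  def hasNext: Boolean
  def getNext(): OfferState
  def offerAccepted(assigned: Boolean): Unit // return type assumed
  def reset(): Unit = { offer = Seq.empty }  // assumed cleanup
}

// Hypothetical strategy: offer every worker once per round, in the given order.
class InOrderAssigner extends TaskAssigner {
  private var index = 0
  override def init(): Unit = { index = 0 }
  override def hasNext: Boolean = index < offer.size
  override def getNext(): OfferState = {
    val state = offer(index)
    index += 1
    state
  }
  override def offerAccepted(assigned: Boolean): Unit = () // nothing to track here
}

object ContractDemo {
  // Plays the TaskScheduler role for a single taskset; returns task ids per worker.
  def assign(
      assigner: TaskAssigner,
      workers: Seq[WorkerOffer],
      pendingTaskIds: Seq[Long],
      cpusPerTask: Int): Seq[List[Long]] = {
    val pending = mutable.Queue.empty[Long]
    pending ++= pendingTaskIds
    assigner.construct(workers)
    var launched = true
    while (launched && pending.nonEmpty) { // one iteration is one round
      launched = false
      assigner.init()
      while (assigner.hasNext && pending.nonEmpty) {
        val state = assigner.getNext()
        val fits = state.coresAvailable >= cpusPerTask
        if (fits) {
          state.tasks += TaskDescription(pending.dequeue())
          state.coresAvailable -= cpusPerTask
          launched = true
        }
        assigner.offerAccepted(fits)
      }
    }
    val result = assigner.tasks.map(buf => buf.map(_.taskId).toList)
    assigner.reset()
    result
  }
}
```

With two workers offering 2 and 1 cores, four pending tasks, and one CPU per task, this sketch places tasks 1 and 3 on the first worker and task 2 on the second; task 4 stays unassigned because no cores are left.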

[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...

2016-10-18 Thread tejasapatil
Github user tejasapatil commented on a diff in the pull request:

https://github.com/apache/spark/pull/15541#discussion_r84000957
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala ---
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.PriorityQueue
+import scala.util.Random
+
+import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.SparkConf
+import org.apache.spark.util.Utils
+
+/** Tracking the current state of the workers with available cores and assigned task list. */
+class OfferState(val workOffer: WorkerOffer) {
+  // The current remaining cores that can be allocated to tasks.
+  var coresAvailable: Int = workOffer.cores
+  // The list of tasks that are assigned to this worker.
+  val tasks = new ArrayBuffer[TaskDescription](coresAvailable)
+}
+
+/**
+ * TaskAssigner is the base class for all task assigner implementations, and can be
+ * extended to implement different task scheduling algorithms.
+ * Together with [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]], TaskAssigner
+ * is used to assign tasks to workers with available cores. Internally, TaskScheduler, requested
+ * to perform task assignment given available workers, first sorts the candidate tasksets,
+ * and then for each taskset, it takes a number of rounds to request TaskAssigner for task
+ * assignment with different the locality restrictions until there is either no qualified
+ * workers or no valid tasks to be assigned.
+ *
+ * TaskAssigner is responsible to maintain the worker availability state and task assignment
+ * information. The contract between [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]]
+ * and TaskAssigner is as follows. First, TaskScheduler invokes construct() of TaskAssigner to
+ * initialize the its internal worker states at the beginning of resource offering. Before each
+ * round of task assignment for a taskset, TaskScheduler invoke the init() of TaskAssigner to
+ * initialize the data structure for the round. When performing real task assignment,
+ * hasNext()/getNext() is used by TaskScheduler to check the worker availability and retrieve
+ * current offering from TaskAssigner. Then offerAccepted is used by TaskScheduler to notify
+ * the TaskAssigner so that TaskAssigner can decide whether the current offer is valid or not for
+ * the next request. After task assignment is done, TaskScheduler invokes the tasks() to
+ * retrieve all the task assignment information, and eventually, invokes reset() method so that
+ * TaskAssigner can cleanup its internal maintained resources.
+ */
+
+private[scheduler] abstract class TaskAssigner {
+  var offer: Seq[OfferState] = _
+  var CPUS_PER_TASK = 1
+
+  def withCpuPerTask(CPUS_PER_TASK: Int): Unit = {
+    this.CPUS_PER_TASK = CPUS_PER_TASK
+  }
+
+  // The final assigned offer returned to TaskScheduler.
+  final def tasks: Seq[ArrayBuffer[TaskDescription]] = offer.map(_.tasks)
+
+  // Invoked at the beginning of resource offering to construct the offer with the workoffers.
+  def construct(workOffer: Seq[WorkerOffer]): Unit = {
+    offer = workOffer.map(o => new OfferState(o))
+  }
+
+  // Invoked at each round of Taskset assignment to initialize the internal structure.
+  def init(): Unit
+
+  // Whether there is offer available to be used inside of one round of Taskset assignment.
+  def hasNext: Boolean
+
+  // Returned the next assigned offer based on the task assignment strategy.
+  def getNext(): OfferState
+
+  // Invoked by the TaskScheduler to indicate whether the current offer is accepted or not so that
+  // the assigner can decide whether the current worker is valid for the next offering.
+  def offerAccepted(assigned: Boolean

[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...

2016-10-18 Thread tejasapatil
Github user tejasapatil commented on a diff in the pull request:

https://github.com/apache/spark/pull/15541#discussion_r83998311
  
--- Diff: core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala ---
@@ -109,6 +108,85 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
     assert(!failedTaskSet)
   }
 
+  test("Scheduler does not always schedule tasks on the same workers") {
+    val taskScheduler = setupScheduler()
+    roundrobin(taskScheduler)
+  }
+
+  test("User can specify the roundrobin task assigner") {
+    val taskScheduler = setupScheduler(("spark.scheduler.taskAssigner", "RoUndrObin"))
+    roundrobin(taskScheduler)
+  }
+
+  test("Fallback to roundrobin when the task assigner provided is not valid") {
+    val taskScheduler = setupScheduler("spark.scheduler.taskAssigner" -> "invalid")
+    roundrobin(taskScheduler)
+  }
+
+  test("Scheduler balance the assignment to the worker with more free cores") {
--- End diff --

I think you should structure this so that tests for the same task assigner 
are grouped together. Right now, one has to look into each test case to figure 
out which policy is being tested.
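
The two configuration tests quoted above suggest that the assigner name is matched case-insensitively and that an unknown name falls back to roundrobin. A minimal sketch of such a lookup, assuming nothing about how the PR actually implements it: `TaskAssignerNames` and `resolve` are hypothetical names, and the policy names `packed` and `balanced` are guesses based on the PR title and the last test name in the diff.

```scala
import java.util.Locale

object TaskAssignerNames {
  // "roundrobin" is the value the quoted tests expect as the fallback.
  val Default = "roundrobin"
  // "packed" and "balanced" are assumed names, not confirmed by the quoted diff.
  val Known: Set[String] = Set(Default, "packed", "balanced")

  // Case-insensitive lookup that falls back to the default for unknown values.
  def resolve(configured: String): String = {
    val key = configured.trim.toLowerCase(Locale.ROOT)
    if (Known.contains(key)) key else Default
  }
}
```

Grouping the tests by the value that `resolve` returns would give one block of tests per policy, which is the layout the comment asks for.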


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---




[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...

2016-10-18 Thread tejasapatil
Github user tejasapatil commented on a diff in the pull request:

https://github.com/apache/spark/pull/15541#discussion_r83996692
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala ---
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.PriorityQueue
+import scala.util.Random
+
+import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.SparkConf
+import org.apache.spark.util.Utils
+
+/** Tracking the current state of the workers with available cores and assigned task list. */
+class OfferState(val workOffer: WorkerOffer) {
+  // The current remaining cores that can be allocated to tasks.
+  var coresAvailable: Int = workOffer.cores
+  // The list of tasks that are assigned to this worker.
+  val tasks = new ArrayBuffer[TaskDescription](coresAvailable)
+}
+
+/**
+ * TaskAssigner is the base class for all task assigner implementations, and can be
+ * extended to implement different task scheduling algorithms.
+ * Together with [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]], TaskAssigner
+ * is used to assign tasks to workers with available cores. Internally, TaskScheduler, requested
+ * to perform task assignment given available workers, first sorts the candidate tasksets,
+ * and then for each taskset, it takes a number of rounds to request TaskAssigner for task
+ * assignment with different the locality restrictions until there is either no qualified
+ * workers or no valid tasks to be assigned.
+ *
+ * TaskAssigner is responsible to maintain the worker availability state and task assignment
+ * information. The contract between [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]]
+ * and TaskAssigner is as follows. First, TaskScheduler invokes construct() of TaskAssigner to
+ * initialize the its internal worker states at the beginning of resource offering. Before each
+ * round of task assignment for a taskset, TaskScheduler invoke the init() of TaskAssigner to
+ * initialize the data structure for the round. When performing real task assignment,
+ * hasNext()/getNext() is used by TaskScheduler to check the worker availability and retrieve
+ * current offering from TaskAssigner. Then offerAccepted is used by TaskScheduler to notify
+ * the TaskAssigner so that TaskAssigner can decide whether the current offer is valid or not for
+ * the next request. After task assignment is done, TaskScheduler invokes the tasks() to
+ * retrieve all the task assignment information, and eventually, invokes reset() method so that
+ * TaskAssigner can cleanup its internal maintained resources.
+ */
+
+private[scheduler] abstract class TaskAssigner {
+  var offer: Seq[OfferState] = _
+  var CPUS_PER_TASK = 1
+
+  def withCpuPerTask(CPUS_PER_TASK: Int): Unit = {
+    this.CPUS_PER_TASK = CPUS_PER_TASK
+  }
+
+  // The final assigned offer returned to TaskScheduler.
+  final def tasks: Seq[ArrayBuffer[TaskDescription]] = offer.map(_.tasks)
+
+  // Invoked at the beginning of resource offering to construct the offer with the workoffers.
+  def construct(workOffer: Seq[WorkerOffer]): Unit = {
+    offer = workOffer.map(o => new OfferState(o))
+  }
+
+  // Invoked at each round of Taskset assignment to initialize the internal structure.
+  def init(): Unit
+
+  // Whether there is offer available to be used inside of one round of Taskset assignment.
+  def hasNext: Boolean
+
+  // Returned the next assigned offer based on the task assignment strategy.
+  def getNext(): OfferState
+
+  // Invoked by the TaskScheduler to indicate whether the current offer is accepted or not so that
+  // the assigner can decide whether the current worker is valid for the next offering.
+  def offerAccepted(assigned: Boolean

[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...

2016-10-18 Thread tejasapatil
Github user tejasapatil commented on a diff in the pull request:

https://github.com/apache/spark/pull/15541#discussion_r83998150
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala ---
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.PriorityQueue
+import scala.util.Random
+
+import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.SparkConf
+import org.apache.spark.util.Utils
+
+/** Tracking the current state of the workers with available cores and assigned task list. */
+class OfferState(val workOffer: WorkerOffer) {
+  // The current remaining cores that can be allocated to tasks.
+  var coresAvailable: Int = workOffer.cores
+  // The list of tasks that are assigned to this worker.
+  val tasks = new ArrayBuffer[TaskDescription](coresAvailable)
+}
+
+/**
+ * TaskAssigner is the base class for all task assigner implementations, and can be
+ * extended to implement different task scheduling algorithms.
+ * Together with [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]], TaskAssigner
+ * is used to assign tasks to workers with available cores. Internally, TaskScheduler, requested
+ * to perform task assignment given available workers, first sorts the candidate tasksets,
+ * and then for each taskset, it takes a number of rounds to request TaskAssigner for task
+ * assignment with different the locality restrictions until there is either no qualified
+ * workers or no valid tasks to be assigned.
+ *
+ * TaskAssigner is responsible to maintain the worker availability state and task assignment
+ * information. The contract between [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]]
+ * and TaskAssigner is as follows. First, TaskScheduler invokes construct() of TaskAssigner to
+ * initialize the its internal worker states at the beginning of resource offering. Before each
+ * round of task assignment for a taskset, TaskScheduler invoke the init() of TaskAssigner to
+ * initialize the data structure for the round. When performing real task assignment,
+ * hasNext()/getNext() is used by TaskScheduler to check the worker availability and retrieve
+ * current offering from TaskAssigner. Then offerAccepted is used by TaskScheduler to notify
+ * the TaskAssigner so that TaskAssigner can decide whether the current offer is valid or not for
+ * the next request. After task assignment is done, TaskScheduler invokes the tasks() to
+ * retrieve all the task assignment information, and eventually, invokes reset() method so that
+ * TaskAssigner can cleanup its internal maintained resources.
+ */
+
+private[scheduler] abstract class TaskAssigner {
+  var offer: Seq[OfferState] = _
+  var CPUS_PER_TASK = 1
+
+  def withCpuPerTask(CPUS_PER_TASK: Int): Unit = {
+    this.CPUS_PER_TASK = CPUS_PER_TASK
+  }
+
+  // The final assigned offer returned to TaskScheduler.
+  final def tasks: Seq[ArrayBuffer[TaskDescription]] = offer.map(_.tasks)
+
+  // Invoked at the beginning of resource offering to construct the offer with the workoffers.
+  def construct(workOffer: Seq[WorkerOffer]): Unit = {
+    offer = workOffer.map(o => new OfferState(o))
+  }
+
+  // Invoked at each round of Taskset assignment to initialize the internal structure.
+  def init(): Unit
+
+  // Whether there is offer available to be used inside of one round of Taskset assignment.
+  def hasNext: Boolean
+
+  // Returned the next assigned offer based on the task assignment strategy.
+  def getNext(): OfferState
+
+  // Invoked by the TaskScheduler to indicate whether the current offer is accepted or not so that
+  // the assigner can decide whether the current worker is valid for the next offering.
+  def offerAccepted(assigned: Boolean

[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...

2016-10-18 Thread tejasapatil
Github user tejasapatil commented on a diff in the pull request:

https://github.com/apache/spark/pull/15541#discussion_r83996326
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala ---
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.PriorityQueue
+import scala.util.Random
+
+import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.SparkConf
+import org.apache.spark.util.Utils
+
+/** Tracking the current state of the workers with available cores and assigned task list. */
+class OfferState(val workOffer: WorkerOffer) {
+  // The current remaining cores that can be allocated to tasks.
+  var coresAvailable: Int = workOffer.cores
+  // The list of tasks that are assigned to this worker.
+  val tasks = new ArrayBuffer[TaskDescription](coresAvailable)
+}
+
+/**
+ * TaskAssigner is the base class for all task assigner implementations, and can be
+ * extended to implement different task scheduling algorithms.
+ * Together with [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]], TaskAssigner
+ * is used to assign tasks to workers with available cores. Internally, TaskScheduler, requested
+ * to perform task assignment given available workers, first sorts the candidate tasksets,
+ * and then for each taskset, it takes a number of rounds to request TaskAssigner for task
+ * assignment with different the locality restrictions until there is either no qualified
+ * workers or no valid tasks to be assigned.
+ *
+ * TaskAssigner is responsible to maintain the worker availability state and task assignment
+ * information. The contract between [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]]
+ * and TaskAssigner is as follows. First, TaskScheduler invokes construct() of TaskAssigner to
+ * initialize the its internal worker states at the beginning of resource offering. Before each
+ * round of task assignment for a taskset, TaskScheduler invoke the init() of TaskAssigner to
+ * initialize the data structure for the round. When performing real task assignment,
+ * hasNext()/getNext() is used by TaskScheduler to check the worker availability and retrieve
+ * current offering from TaskAssigner. Then offerAccepted is used by TaskScheduler to notify
+ * the TaskAssigner so that TaskAssigner can decide whether the current offer is valid or not for
+ * the next request. After task assignment is done, TaskScheduler invokes the tasks() to
+ * retrieve all the task assignment information, and eventually, invokes reset() method so that
+ * TaskAssigner can cleanup its internal maintained resources.
+ */
+
+private[scheduler] abstract class TaskAssigner {
+  var offer: Seq[OfferState] = _
+  var CPUS_PER_TASK = 1
+
+  def withCpuPerTask(CPUS_PER_TASK: Int): Unit = {
+    this.CPUS_PER_TASK = CPUS_PER_TASK
+  }
+
+  // The final assigned offer returned to TaskScheduler.
+  final def tasks: Seq[ArrayBuffer[TaskDescription]] = offer.map(_.tasks)
+
+  // Invoked at the beginning of resource offering to construct the offer with the workoffers.
+  def construct(workOffer: Seq[WorkerOffer]): Unit = {
+    offer = workOffer.map(o => new OfferState(o))
+  }
+
+  // Invoked at each round of Taskset assignment to initialize the internal structure.
+  def init(): Unit
+
+  // Whether there is offer available to be used inside of one round of Taskset assignment.
+  def hasNext: Boolean
+
+  // Returned the next assigned offer based on the task assignment strategy.
+  def getNext(): OfferState
+
+  // Invoked by the TaskScheduler to indicate whether the current offer is accepted or not so that
+  // the assigner can decide whether the current worker is valid for the next offering.
+  def offerAccepted(assigned: Boolean

[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...

2016-10-18 Thread tejasapatil
Github user tejasapatil commented on a diff in the pull request:

https://github.com/apache/spark/pull/15541#discussion_r83995317
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala ---
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.PriorityQueue
+import scala.util.Random
+
+import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.SparkConf
+import org.apache.spark.util.Utils
+
+/** Tracking the current state of the workers with available cores and assigned task list. */
+class OfferState(val workOffer: WorkerOffer) {
+  // The current remaining cores that can be allocated to tasks.
+  var coresAvailable: Int = workOffer.cores
+  // The list of tasks that are assigned to this worker.
+  val tasks = new ArrayBuffer[TaskDescription](coresAvailable)
+}
+
+/**
+ * TaskAssigner is the base class for all task assigner implementations, and can be
+ * extended to implement different task scheduling algorithms.
+ * Together with [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]], TaskAssigner
+ * is used to assign tasks to workers with available cores. Internally, TaskScheduler, requested
+ * to perform task assignment given available workers, first sorts the candidate tasksets,
+ * and then for each taskset, it takes a number of rounds to request TaskAssigner for task
+ * assignment with different the locality restrictions until there is either no qualified
--- End diff --

nit: `different the locality restrictions` => `different locality 
restrictions`





[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...

2016-10-18 Thread tejasapatil
Github user tejasapatil commented on a diff in the pull request:

https://github.com/apache/spark/pull/15541#discussion_r83998358
  
--- Diff: docs/configuration.md ---
@@ -1342,6 +1342,19 @@ Apart from these, the following properties are also available, and may be useful
     Should be greater than or equal to 1. Number of allowed retries = this value - 1.
   
 
+
+  spark.scheduler.taskAssigner
+  roundrobin
+  
+    The strategy of how to allocate tasks among workers with free cores. By default, roundrobin
--- End diff --

nit: create a list for each policy and explain inline instead of saying 
`former`, `latter` below.





[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...

2016-10-18 Thread tejasapatil
Github user tejasapatil commented on a diff in the pull request:

https://github.com/apache/spark/pull/15541#discussion_r83995649
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala ---
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.PriorityQueue
+import scala.util.Random
+
+import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.SparkConf
+import org.apache.spark.util.Utils
+
+/** Tracking the current state of the workers with available cores and assigned task list. */
+class OfferState(val workOffer: WorkerOffer) {
+  // The current remaining cores that can be allocated to tasks.
+  var coresAvailable: Int = workOffer.cores
+  // The list of tasks that are assigned to this worker.
+  val tasks = new ArrayBuffer[TaskDescription](coresAvailable)
+}
+
+/**
+ * TaskAssigner is the base class for all task assigner implementations, and can be
+ * extended to implement different task scheduling algorithms.
+ * Together with [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]], TaskAssigner
+ * is used to assign tasks to workers with available cores. Internally, TaskScheduler, requested
+ * to perform task assignment given available workers, first sorts the candidate tasksets,
+ * and then for each taskset, it takes a number of rounds to request TaskAssigner for task
+ * assignment with different the locality restrictions until there is either no qualified
+ * workers or no valid tasks to be assigned.
+ *
+ * TaskAssigner is responsible to maintain the worker availability state and task assignment
+ * information. The contract between [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]]
+ * and TaskAssigner is as follows. First, TaskScheduler invokes construct() of TaskAssigner to
+ * initialize the its internal worker states at the beginning of resource offering. Before each
+ * round of task assignment for a taskset, TaskScheduler invoke the init() of TaskAssigner to
+ * initialize the data structure for the round. When performing real task assignment,
+ * hasNext()/getNext() is used by TaskScheduler to check the worker availability and retrieve
+ * current offering from TaskAssigner. Then offerAccepted is used by TaskScheduler to notify
+ * the TaskAssigner so that TaskAssigner can decide whether the current offer is valid or not for
+ * the next request. After task assignment is done, TaskScheduler invokes the tasks() to
+ * retrieve all the task assignment information, and eventually, invokes reset() method so that
+ * TaskAssigner can cleanup its internal maintained resources.
+ */
+
+private[scheduler] abstract class TaskAssigner {
+  var offer: Seq[OfferState] = _
+  var CPUS_PER_TASK = 1
+
+  def withCpuPerTask(CPUS_PER_TASK: Int): Unit = {
--- End diff --

This feels like something that is set once when the object is created and is not 
supposed to be mutated later. I can't come up with a better way to do that. 
(Maybe pass it as a constructor param?)
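
One possible shape for the constructor-parameter idea is sketched below. It is only an illustration, not code from the PR: `WorkerOffer` and `TaskDescription` are simplified stand-ins, `cpusPerTask` replaces the mutable `CPUS_PER_TASK` field, and `NoopAssigner` is a hypothetical subclass that exists only so the abstract class can be instantiated.

```scala
import scala.collection.mutable.ArrayBuffer

// Simplified stand-ins for the Spark types in the diff (assumptions, not the real classes).
case class WorkerOffer(executorId: String, host: String, cores: Int)
case class TaskDescription(taskId: Long)

class OfferState(val workOffer: WorkerOffer) {
  var coresAvailable: Int = workOffer.cores
  val tasks = new ArrayBuffer[TaskDescription](coresAvailable)
}

// cpusPerTask is fixed at construction time, so withCpuPerTask is no longer needed.
abstract class TaskAssigner(val cpusPerTask: Int) {
  require(cpusPerTask > 0, "cpusPerTask must be positive")

  protected var offer: Seq[OfferState] = Seq.empty

  def construct(workOffers: Seq[WorkerOffer]): Unit = {
    offer = workOffers.map(o => new OfferState(o))
  }

  final def tasks: Seq[ArrayBuffer[TaskDescription]] = offer.map(_.tasks)
}

// Hypothetical concrete subclass, used only to instantiate the abstract class.
class NoopAssigner(cpusPerTask: Int) extends TaskAssigner(cpusPerTask)
```

A caller would then write `new NoopAssigner(2)` and the value could not change afterwards. The trade-off is that every subclass has to forward the parameter, which matters if assigners are created reflectively from a configured class name.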





[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...

2016-10-18 Thread tejasapatil
Github user tejasapatil commented on a diff in the pull request:

https://github.com/apache/spark/pull/15541#discussion_r83995590
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala ---
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.PriorityQueue
+import scala.util.Random
+
+import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.SparkConf
+import org.apache.spark.util.Utils
+
+/** Tracking the current state of the workers with available cores and 
assigned task list. */
+class OfferState(val workOffer: WorkerOffer) {
+  // The current remaining cores that can be allocated to tasks.
+  var coresAvailable: Int = workOffer.cores
+  // The list of tasks that are assigned to this worker.
+  val tasks = new ArrayBuffer[TaskDescription](coresAvailable)
+}
+
+/**
+ * TaskAssigner is the base class for all task assigner implementations, 
and can be
+ * extended to implement different task scheduling algorithms.
+ * Together with [[org.apache.spark.scheduler.TaskScheduler 
TaskScheduler]], TaskAssigner
+ * is used to assign tasks to workers with available cores. Internally, 
TaskScheduler, requested
+ * to perform task assignment given available workers, first sorts the 
candidate tasksets,
+ * and then for each taskset, it takes a number of rounds to request 
TaskAssigner for task
+ * assignment with different the locality restrictions until there is 
either no qualified
+ * workers or no valid tasks to be assigned.
+ *
+ * TaskAssigner is responsible to maintain the worker availability state 
and task assignment
+ * information. The contract between 
[[org.apache.spark.scheduler.TaskScheduler TaskScheduler]]
+ * and TaskAssigner is as follows. First, TaskScheduler invokes 
construct() of TaskAssigner to
+ * initialize the its internal worker states at the beginning of resource 
offering. Before each
+ * round of task assignment for a taskset, TaskScheduler invoke the init() 
of TaskAssigner to
+ * initialize the data structure for the round. When performing real task 
assignment,
+ * hasNext()/getNext() is used by TaskScheduler to check the worker 
availability and retrieve
+ * current offering from TaskAssigner. Then offerAccepted is used by 
TaskScheduler to notify
+ * the TaskAssigner so that TaskAssigner can decide whether the current 
offer is valid or not for
+ * the next request. After task assignment is done, TaskScheduler invokes 
the tasks() to
+ * retrieve all the task assignment information, and eventually, invokes 
reset() method so that
+ * TaskAssigner can cleanup its internal maintained resources.
+ */
+
+private[scheduler] abstract class TaskAssigner {
+  var offer: Seq[OfferState] = _
+  var CPUS_PER_TASK = 1
--- End diff --

- nit: use `protected`
- nit: Why upper snake case? This looks like a regular class var and not a
constant. I know you want this to be treated as a constant, but it should
probably be better guarded against mutation.
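
One possible guard, sketched under assumptions and not taken from the PR: keep
the setter but let it succeed only once, and expose the value to subclasses
through a `protected` accessor with a lower camel case name. The names
`withCpusPerTask`, `cpusPerTask`, `DemoAssigner` and `tasksThatFit` are
hypothetical.

```scala
abstract class TaskAssigner {
  // Lower camel case signals an ordinary field rather than a constant.
  private var cpusPerTaskOpt: Option[Int] = None

  // May be called at most once; a second call fails fast.
  def withCpusPerTask(n: Int): this.type = {
    require(n > 0, "cpusPerTask must be positive")
    require(cpusPerTaskOpt.isEmpty, "cpusPerTask is already set")
    cpusPerTaskOpt = Some(n)
    this
  }

  // Visible to subclasses only; defaults to 1 when never set.
  protected def cpusPerTask: Int = cpusPerTaskOpt.getOrElse(1)
}

// Hypothetical subclass used only to read the guarded value.
class DemoAssigner extends TaskAssigner {
  def tasksThatFit(cores: Int): Int = cores / cpusPerTask
}
```

A constructor parameter would give the same guarantee at compile time; the
set-once check is only a fallback if the value is not known at construction.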






[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...

2016-10-18 Thread tejasapatil
Github user tejasapatil commented on a diff in the pull request:

https://github.com/apache/spark/pull/15541#discussion_r83995463
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala 
---
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.PriorityQueue
+import scala.util.Random
+
+import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.SparkConf
+import org.apache.spark.util.Utils
+
+/** Tracking the current state of the workers with available cores and 
assigned task list. */
+class OfferState(val workOffer: WorkerOffer) {
+  // The current remaining cores that can be allocated to tasks.
+  var coresAvailable: Int = workOffer.cores
+  // The list of tasks that are assigned to this worker.
+  val tasks = new ArrayBuffer[TaskDescription](coresAvailable)
+}
+
+/**
+ * TaskAssigner is the base class for all task assigner implementations, 
and can be
+ * extended to implement different task scheduling algorithms.
+ * Together with [[org.apache.spark.scheduler.TaskScheduler 
TaskScheduler]], TaskAssigner
+ * is used to assign tasks to workers with available cores. Internally, 
TaskScheduler, requested
+ * to perform task assignment given available workers, first sorts the 
candidate tasksets,
+ * and then for each taskset, it takes a number of rounds to request 
TaskAssigner for task
+ * assignment with different the locality restrictions until there is 
either no qualified
+ * workers or no valid tasks to be assigned.
+ *
+ * TaskAssigner is responsible to maintain the worker availability state 
and task assignment
+ * information. The contract between 
[[org.apache.spark.scheduler.TaskScheduler TaskScheduler]]
+ * and TaskAssigner is as follows. First, TaskScheduler invokes construct() of TaskAssigner to
--- End diff --

It might be better to list these as separate points rather than as a
paragraph. Also, I am not sure what the protocol is for putting details like
method names in the doc. As things stand, it will serve people reading the
code well, but as the codebase evolves the comment might get out of sync if
it is not updated.
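
As a sketch of the "separate points" layout, the contract could be written as
a numbered Scaladoc list. The method names come from the quoted comment; the
pared-down `Offer` class, the `AssignerContract` trait and the toy
`InOrderAssigner` are assumptions made only so the example compiles on its own.

```scala
import scala.collection.mutable.ArrayBuffer

// Pared-down stand-in for OfferState (assumption, illustration only).
class Offer(val cores: Int) {
  var coresAvailable: Int = cores
  val tasks = ArrayBuffer.empty[String]
}

/**
 * Contract between TaskScheduler and TaskAssigner, one step per point:
 *
 *  1. `construct()` is invoked once at the beginning of resource offering.
 *  1. `init()` is invoked before each round of task assignment for a taskset.
 *  1. `hasNext` / `getNext()` check worker availability and return the current offer.
 *  1. `offerAccepted()` reports whether the current offer was accepted.
 *  1. `tasks` returns the assignment information once assignment is done.
 *  1. `reset()` lets the assigner clean up its internal state.
 */
trait AssignerContract {
  def construct(workerCores: Seq[Int]): Unit
  def init(): Unit
  def hasNext: Boolean
  def getNext(): Offer
  def offerAccepted(assigned: Boolean): Unit
  def tasks: Seq[Seq[String]]
  def reset(): Unit
}

// Hypothetical assigner that visits each offer once, in order.
class InOrderAssigner extends AssignerContract {
  private var offers: Seq[Offer] = Seq.empty
  private var index = 0

  def construct(workerCores: Seq[Int]): Unit = { offers = workerCores.map(new Offer(_)) }
  def init(): Unit = { index = 0 }
  def hasNext: Boolean = index < offers.size
  def getNext(): Offer = offers(index)
  // This toy strategy moves on regardless of whether the offer was accepted.
  def offerAccepted(assigned: Boolean): Unit = { index += 1 }
  def tasks: Seq[Seq[String]] = offers.map(_.tasks.toSeq)
  def reset(): Unit = { offers = Seq.empty; index = 0 }
}
```

The list keeps the call order visible at a glance, though it still names
methods and so carries the same risk of drifting out of sync.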





[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...

2016-10-18 Thread tejasapatil
Github user tejasapatil commented on a diff in the pull request:

https://github.com/apache/spark/pull/15541#discussion_r83996865
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala 
---
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.PriorityQueue
+import scala.util.Random
+
+import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.SparkConf
+import org.apache.spark.util.Utils
+
+/** Tracking the current state of the workers with available cores and 
assigned task list. */
+class OfferState(val workOffer: WorkerOffer) {
+  // The current remaining cores that can be allocated to tasks.
+  var coresAvailable: Int = workOffer.cores
+  // The list of tasks that are assigned to this worker.
+  val tasks = new ArrayBuffer[TaskDescription](coresAvailable)
+}
+
+/**
+ * TaskAssigner is the base class for all task assigner implementations, 
and can be
+ * extended to implement different task scheduling algorithms.
+ * Together with [[org.apache.spark.scheduler.TaskScheduler 
TaskScheduler]], TaskAssigner
+ * is used to assign tasks to workers with available cores. Internally, 
TaskScheduler, requested
+ * to perform task assignment given available workers, first sorts the 
candidate tasksets,
+ * and then for each taskset, it takes a number of rounds to request 
TaskAssigner for task
+ * assignment with different the locality restrictions until there is 
either no qualified
+ * workers or no valid tasks to be assigned.
+ *
+ * TaskAssigner is responsible to maintain the worker availability state 
and task assignment
+ * information. The contract between 
[[org.apache.spark.scheduler.TaskScheduler TaskScheduler]]
+ * and TaskAssigner is as follows. First, TaskScheduler invokes 
construct() of TaskAssigner to
+ * initialize the its internal worker states at the beginning of resource 
offering. Before each
+ * round of task assignment for a taskset, TaskScheduler invoke the init() 
of TaskAssigner to
+ * initialize the data structure for the round. When performing real task 
assignment,
+ * hasNext()/getNext() is used by TaskScheduler to check the worker 
availability and retrieve
+ * current offering from TaskAssigner. Then offerAccepted is used by 
TaskScheduler to notify
+ * the TaskAssigner so that TaskAssigner can decide whether the current 
offer is valid or not for
+ * the next request. After task assignment is done, TaskScheduler invokes 
the tasks() to
+ * retrieve all the task assignment information, and eventually, invokes 
reset() method so that
+ * TaskAssigner can cleanup its internal maintained resources.
+ */
+
+private[scheduler] abstract class TaskAssigner {
+  var offer: Seq[OfferState] = _
+  var CPUS_PER_TASK = 1
+
+  def withCpuPerTask(CPUS_PER_TASK: Int): Unit = {
+this.CPUS_PER_TASK = CPUS_PER_TASK
+  }
+
+  // The final assigned offer returned to TaskScheduler.
+  final def tasks: Seq[ArrayBuffer[TaskDescription]] = offer.map(_.tasks)
+
+  // Invoked at the beginning of resource offering to construct the offer 
with the workoffers.
+  def construct(workOffer: Seq[WorkerOffer]): Unit = {
+offer = workOffer.map(o => new OfferState(o))
+  }
+
+  // Invoked at each round of Taskset assignment to initialize the 
internal structure.
+  def init(): Unit
+
+  // Whether there is offer available to be used inside of one round of 
Taskset assignment.
+  def hasNext: Boolean
+
+  // Returned the next assigned offer based on the task assignment 
strategy.
+  def getNext(): OfferState
+
+  // Invoked by the TaskScheduler to indicate whether the current offer is 
accepted or not so that
+  // the assigner can decide whether the current worker is valid for the 
next offering.
+  def offerAccepted(assigned: Boolean

[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...

2016-10-18 Thread tejasapatil
Github user tejasapatil commented on a diff in the pull request:

https://github.com/apache/spark/pull/15541#discussion_r83998964
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---
@@ -250,24 +248,26 @@ private[spark] class TaskSchedulerImpl(
   private def resourceOfferSingleTaskSet(
   taskSet: TaskSetManager,
   maxLocality: TaskLocality,
-  shuffledOffers: Seq[WorkerOffer],
-  availableCpus: Array[Int],
-  tasks: IndexedSeq[ArrayBuffer[TaskDescription]]) : Boolean = {
+  taskAssigner: TaskAssigner) : Boolean = {
 var launchedTask = false
-for (i <- 0 until shuffledOffers.size) {
-  val execId = shuffledOffers(i).executorId
-  val host = shuffledOffers(i).host
-  if (availableCpus(i) >= CPUS_PER_TASK) {
+taskAssigner.init()
+while(taskAssigner.hasNext) {
--- End diff --

nit: space after `while`





[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...

2016-10-18 Thread tejasapatil
Github user tejasapatil commented on a diff in the pull request:

https://github.com/apache/spark/pull/15541#discussion_r83995119
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala 
---
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.PriorityQueue
+import scala.util.Random
+
+import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.SparkConf
+import org.apache.spark.util.Utils
+
+/** Tracking the current state of the workers with available cores and 
assigned task list. */
+class OfferState(val workOffer: WorkerOffer) {
+  // The current remaining cores that can be allocated to tasks.
+  var coresAvailable: Int = workOffer.cores
+  // The list of tasks that are assigned to this worker.
--- End diff --

"this worker"? `this` won't represent a worker here.





[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...

2016-10-18 Thread tejasapatil
Github user tejasapatil commented on a diff in the pull request:

https://github.com/apache/spark/pull/15541#discussion_r83999715
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala 
---
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.PriorityQueue
+import scala.util.Random
+
+import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.SparkConf
+import org.apache.spark.util.Utils
+
+/** Tracking the current state of the workers with available cores and 
assigned task list. */
+class OfferState(val workOffer: WorkerOffer) {
+  // The current remaining cores that can be allocated to tasks.
+  var coresAvailable: Int = workOffer.cores
+  // The list of tasks that are assigned to this worker.
+  val tasks = new ArrayBuffer[TaskDescription](coresAvailable)
+}
+
+/**
+ * TaskAssigner is the base class for all task assigner implementations, 
and can be
+ * extended to implement different task scheduling algorithms.
+ * Together with [[org.apache.spark.scheduler.TaskScheduler 
TaskScheduler]], TaskAssigner
+ * is used to assign tasks to workers with available cores. Internally, 
TaskScheduler, requested
+ * to perform task assignment given available workers, first sorts the 
candidate tasksets,
+ * and then for each taskset, it takes a number of rounds to request 
TaskAssigner for task
+ * assignment with different the locality restrictions until there is 
either no qualified
+ * workers or no valid tasks to be assigned.
+ *
+ * TaskAssigner is responsible to maintain the worker availability state 
and task assignment
+ * information. The contract between 
[[org.apache.spark.scheduler.TaskScheduler TaskScheduler]]
+ * and TaskAssigner is as follows. First, TaskScheduler invokes 
construct() of TaskAssigner to
+ * initialize the its internal worker states at the beginning of resource 
offering. Before each
+ * round of task assignment for a taskset, TaskScheduler invoke the init() 
of TaskAssigner to
+ * initialize the data structure for the round. When performing real task 
assignment,
+ * hasNext()/getNext() is used by TaskScheduler to check the worker 
availability and retrieve
+ * current offering from TaskAssigner. Then offerAccepted is used by 
TaskScheduler to notify
+ * the TaskAssigner so that TaskAssigner can decide whether the current 
offer is valid or not for
+ * the next request. After task assignment is done, TaskScheduler invokes 
the tasks() to
+ * retrieve all the task assignment information, and eventually, invokes 
reset() method so that
+ * TaskAssigner can cleanup its internal maintained resources.
+ */
+
+private[scheduler] abstract class TaskAssigner {
+  var offer: Seq[OfferState] = _
+  var CPUS_PER_TASK = 1
+
+  def withCpuPerTask(CPUS_PER_TASK: Int): Unit = {
+this.CPUS_PER_TASK = CPUS_PER_TASK
+  }
+
+  // The final assigned offer returned to TaskScheduler.
+  final def tasks: Seq[ArrayBuffer[TaskDescription]] = offer.map(_.tasks)
+
+  // Invoked at the beginning of resource offering to construct the offer 
with the workoffers.
+  def construct(workOffer: Seq[WorkerOffer]): Unit = {
+offer = workOffer.map(o => new OfferState(o))
+  }
+
+  // Invoked at each round of Taskset assignment to initialize the 
internal structure.
+  def init(): Unit
+
+  // Whether there is offer available to be used inside of one round of 
Taskset assignment.
+  def hasNext: Boolean
+
+  // Returned the next assigned offer based on the task assignment 
strategy.
+  def getNext(): OfferState
+
+  // Invoked by the TaskScheduler to indicate whether the current offer is 
accepted or not so that
+  // the assigner can decide whether the current worker is valid for the 
next offering.
+  def offerAccepted(assigned: Boolean

[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...

2016-10-18 Thread tejasapatil
Github user tejasapatil commented on a diff in the pull request:

https://github.com/apache/spark/pull/15541#discussion_r83996351
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala 
---
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.PriorityQueue
+import scala.util.Random
+
+import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.SparkConf
+import org.apache.spark.util.Utils
+
+/** Tracking the current state of the workers with available cores and 
assigned task list. */
+class OfferState(val workOffer: WorkerOffer) {
+  // The current remaining cores that can be allocated to tasks.
+  var coresAvailable: Int = workOffer.cores
+  // The list of tasks that are assigned to this worker.
+  val tasks = new ArrayBuffer[TaskDescription](coresAvailable)
+}
+
+/**
+ * TaskAssigner is the base class for all task assigner implementations, 
and can be
+ * extended to implement different task scheduling algorithms.
+ * Together with [[org.apache.spark.scheduler.TaskScheduler 
TaskScheduler]], TaskAssigner
+ * is used to assign tasks to workers with available cores. Internally, 
TaskScheduler, requested
+ * to perform task assignment given available workers, first sorts the 
candidate tasksets,
+ * and then for each taskset, it takes a number of rounds to request 
TaskAssigner for task
+ * assignment with different the locality restrictions until there is 
either no qualified
+ * workers or no valid tasks to be assigned.
+ *
+ * TaskAssigner is responsible to maintain the worker availability state 
and task assignment
+ * information. The contract between 
[[org.apache.spark.scheduler.TaskScheduler TaskScheduler]]
+ * and TaskAssigner is as follows. First, TaskScheduler invokes 
construct() of TaskAssigner to
+ * initialize the its internal worker states at the beginning of resource 
offering. Before each
+ * round of task assignment for a taskset, TaskScheduler invoke the init() 
of TaskAssigner to
+ * initialize the data structure for the round. When performing real task 
assignment,
+ * hasNext()/getNext() is used by TaskScheduler to check the worker 
availability and retrieve
+ * current offering from TaskAssigner. Then offerAccepted is used by 
TaskScheduler to notify
+ * the TaskAssigner so that TaskAssigner can decide whether the current 
offer is valid or not for
+ * the next request. After task assignment is done, TaskScheduler invokes 
the tasks() to
+ * retrieve all the task assignment information, and eventually, invokes 
reset() method so that
+ * TaskAssigner can cleanup its internal maintained resources.
+ */
+
+private[scheduler] abstract class TaskAssigner {
+  var offer: Seq[OfferState] = _
+  var CPUS_PER_TASK = 1
+
+  def withCpuPerTask(CPUS_PER_TASK: Int): Unit = {
+this.CPUS_PER_TASK = CPUS_PER_TASK
+  }
+
+  // The final assigned offer returned to TaskScheduler.
+  final def tasks: Seq[ArrayBuffer[TaskDescription]] = offer.map(_.tasks)
+
+  // Invoked at the beginning of resource offering to construct the offer 
with the workoffers.
+  def construct(workOffer: Seq[WorkerOffer]): Unit = {
+offer = workOffer.map(o => new OfferState(o))
+  }
+
+  // Invoked at each round of Taskset assignment to initialize the 
internal structure.
+  def init(): Unit
+
+  // Whether there is offer available to be used inside of one round of 
Taskset assignment.
+  def hasNext: Boolean
+
+  // Returned the next assigned offer based on the task assignment 
strategy.
+  def getNext(): OfferState
+
+  // Invoked by the TaskScheduler to indicate whether the current offer is 
accepted or not so that
+  // the assigner can decide whether the current worker is valid for the 
next offering.
+  def offerAccepted(assigned: Boolean

[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...

2016-10-18 Thread tejasapatil
Github user tejasapatil commented on a diff in the pull request:

https://github.com/apache/spark/pull/15541#discussion_r83998916
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---
@@ -60,7 +58,7 @@ private[spark] class TaskSchedulerImpl(
   def this(sc: SparkContext) = this(sc, sc.conf.get(config.MAX_TASK_FAILURES))
 
   val conf = sc.conf
-
+  private  val taskAssigner: TaskAssigner = TaskAssigner.init(conf)
--- End diff --

nit: extra space in `private  val`





[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...

2016-10-18 Thread tejasapatil
Github user tejasapatil commented on a diff in the pull request:

https://github.com/apache/spark/pull/15541#discussion_r83995208
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala 
---
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.PriorityQueue
+import scala.util.Random
+
+import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.SparkConf
+import org.apache.spark.util.Utils
+
+/** Tracking the current state of the workers with available cores and assigned task list. */
--- End diff --

nit: `Tracking` => `Tracks`





[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...

2016-10-18 Thread tejasapatil
Github user tejasapatil commented on a diff in the pull request:

https://github.com/apache/spark/pull/15541#discussion_r83995189
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala 
---
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.PriorityQueue
+import scala.util.Random
+
+import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.SparkConf
+import org.apache.spark.util.Utils
+
+/** Tracking the current state of the workers with available cores and 
assigned task list. */
+class OfferState(val workOffer: WorkerOffer) {
+  // The current remaining cores that can be allocated to tasks.
+  var coresAvailable: Int = workOffer.cores
+  // The list of tasks that are assigned to this worker.
+  val tasks = new ArrayBuffer[TaskDescription](coresAvailable)
+}
+
+/**
+ * TaskAssigner is the base class for all task assigner implementations, 
and can be
+ * extended to implement different task scheduling algorithms.
+ * Together with [[org.apache.spark.scheduler.TaskScheduler 
TaskScheduler]], TaskAssigner
+ * is used to assign tasks to workers with available cores. Internally, TaskScheduler, requested
--- End diff --

nit: `requested` => `requests`





[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...

2016-10-18 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/15541#discussion_r8416
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala 
---
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.PriorityQueue
+import scala.util.Random
+
+import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.SparkConf
+import org.apache.spark.util.Utils
+
+/** Tracking the current state of the workers with available cores and assigned task list. */
+class OfferState(val workOffer: WorkerOffer) {
+  // The current remaining cores that can be allocated to tasks.
+  var coresAvailable: Int = workOffer.cores
+  // The list of tasks that are assigned to this worker.
+  val tasks = new ArrayBuffer[TaskDescription](coresAvailable)
+}
+
+/**
+ * TaskAssigner is the base class for all task assigner implementations, and can be
+ * extended to implement different task scheduling algorithms.
+ * Together with [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]], TaskAssigner
+ * is used to assign tasks to workers with available cores. Internally, TaskScheduler, requested
+ * to perform task assignment given available workers, first sorts the candidate tasksets,
+ * and then for each taskset, it takes a number of rounds to request TaskAssigner for task
+ * assignment with different the locality restrictions until there is either no qualified
+ * workers or no valid tasks to be assigned.
+ *
+ * TaskAssigner is responsible to maintain the worker availability state and task assignment
+ * information. The contract between [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]]
+ * and TaskAssigner is as follows. First, TaskScheduler invokes construct() of TaskAssigner to
+ * initialize the its internal worker states at the beginning of resource offering. Before each
+ * round of task assignment for a taskset, TaskScheduler invoke the init() of TaskAssigner to
+ * initialize the data structure for the round. When performing real task assignment,
+ * hasNext()/getNext() is used by TaskScheduler to check the worker availability and retrieve
+ * current offering from TaskAssigner. Then offerAccepted is used by TaskScheduler to notify
+ * the TaskAssigner so that TaskAssigner can decide whether the current offer is valid or not for
+ * the next request. After task assignment is done, TaskScheduler invokes the tasks() to
+ * retrieve all the task assignment information, and eventually, invokes reset() method so that
+ * TaskAssigner can cleanup its internal maintained resources.
+ */
+
+private[scheduler] abstract class TaskAssigner {
+  var offer: Seq[OfferState] = _
+  var CPUS_PER_TASK = 1
+
+  def withCpuPerTask(CPUS_PER_TASK: Int): Unit = {
+    this.CPUS_PER_TASK = CPUS_PER_TASK
+  }
+
+  // The final assigned offer returned to TaskScheduler.
+  final def tasks: Seq[ArrayBuffer[TaskDescription]] = offer.map(_.tasks)
+
+  // Invoked at the beginning of resource offering to construct the offer with the workoffers.
+  def construct(workOffer: Seq[WorkerOffer]): Unit = {
+    offer = workOffer.map(o => new OfferState(o))
+  }
+
+  // Invoked at each round of Taskset assignment to initialize the internal structure.
+  def init(): Unit
+
+  // Whether there is offer available to be used inside of one round of Taskset assignment.
+  def hasNext: Boolean
+
+  // Returned the next assigned offer based on the task assignment strategy.
+  def getNext(): OfferState
+
+  // Invoked by the TaskScheduler to indicate whether the current offer is accepted or not so that
+  // the assigner can decide whether the current worker is valid for the next offering.
+  def offerAccepted(assigned: Boolean): Un
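
The call sequence described in the quoted Scaladoc (construct, then init per round, then hasNext/getNext with offerAccepted, then tasks and reset) can be sketched roughly as below. This is only an illustration under stated assumptions, not the code from the pull request: `WorkerOffer` and `TaskDescription` are simplified stand-ins, `InOrderAssigner` and the `assign` driver are hypothetical names, the signature of `reset()` is guessed from the Scaladoc because the quoted diff is cut off before it, and having the driver decrement `coresAvailable` is an assumption about who owns that bookkeeping.

```scala
import scala.collection.mutable.ArrayBuffer

object TaskAssignerSketch {
  // Simplified stand-ins for Spark's WorkerOffer and TaskDescription (assumptions).
  final case class WorkerOffer(executorId: String, host: String, cores: Int)
  final case class TaskDescription(taskId: Long, executorId: String)

  // Mirrors the OfferState class shown in the quoted diff.
  class OfferState(val workOffer: WorkerOffer) {
    var coresAvailable: Int = workOffer.cores
    val tasks = new ArrayBuffer[TaskDescription](coresAvailable)
  }

  // Method names follow the quoted diff; reset() is assumed from the Scaladoc.
  abstract class TaskAssigner {
    protected var offer: Seq[OfferState] = Seq.empty
    def construct(workOffer: Seq[WorkerOffer]): Unit = {
      offer = workOffer.map(o => new OfferState(o))
    }
    final def tasks: Seq[ArrayBuffer[TaskDescription]] = offer.map(_.tasks)
    def init(): Unit
    def hasNext: Boolean
    def getNext(): OfferState
    def offerAccepted(assigned: Boolean): Unit
    def reset(): Unit = { offer = Seq.empty }
  }

  // Hypothetical strategy: visit every worker once per round, in offer order.
  class InOrderAssigner extends TaskAssigner {
    private var index = 0
    override def init(): Unit = { index = 0 }
    override def hasNext: Boolean = index < offer.size
    override def getNext(): OfferState = {
      val current = offer(index)
      index += 1
      current
    }
    // This simple strategy ignores the feedback; others could use it to re-rank workers.
    override def offerAccepted(assigned: Boolean): Unit = ()
  }

  // Hypothetical driver playing the TaskScheduler role for a single taskset.
  def assign(assigner: TaskAssigner, offers: Seq[WorkerOffer],
             pendingTasks: Int, cpusPerTask: Int): Seq[Seq[TaskDescription]] = {
    assigner.construct(offers)
    var nextId = 0L
    var launched = true
    // Keep running rounds until a round launches nothing or no tasks remain.
    while (launched && nextId < pendingTasks) {
      launched = false
      assigner.init()
      while (assigner.hasNext && nextId < pendingTasks) {
        val current = assigner.getNext()
        val accepted = current.coresAvailable >= cpusPerTask
        if (accepted) {
          current.tasks += TaskDescription(nextId, current.workOffer.executorId)
          current.coresAvailable -= cpusPerTask
          nextId += 1
          launched = true
        }
        assigner.offerAccepted(accepted)
      }
    }
    // Copy the assignments out before reset() drops the internal state.
    val result = assigner.tasks.map(_.toList)
    assigner.reset()
    result
  }
}
```

For instance, calling `TaskAssignerSketch.assign` with an `InOrderAssigner`, two offers of 2 cores and 1 core, three pending tasks and one CPU per task would need two rounds: the first places one task on each worker, the second places the remaining task on the worker that still has a free core.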

[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...

2016-10-18 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/15541#discussion_r83999827
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala 
---
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.PriorityQueue
+import scala.util.Random
+
+import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.SparkConf
+import org.apache.spark.util.Utils
+
+/** Tracking the current state of the workers with available cores and assigned task list. */
+class OfferState(val workOffer: WorkerOffer) {
+  // The current remaining cores that can be allocated to tasks.
+  var coresAvailable: Int = workOffer.cores
+  // The list of tasks that are assigned to this worker.
+  val tasks = new ArrayBuffer[TaskDescription](coresAvailable)
+}
+
+/**
+ * TaskAssigner is the base class for all task assigner implementations, and can be
+ * extended to implement different task scheduling algorithms.
+ * Together with [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]], TaskAssigner
+ * is used to assign tasks to workers with available cores. Internally, TaskScheduler, requested
+ * to perform task assignment given available workers, first sorts the candidate tasksets,
+ * and then for each taskset, it takes a number of rounds to request TaskAssigner for task
+ * assignment with different the locality restrictions until there is either no qualified
+ * workers or no valid tasks to be assigned.
+ *
+ * TaskAssigner is responsible to maintain the worker availability state and task assignment
+ * information. The contract between [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]]
+ * and TaskAssigner is as follows. First, TaskScheduler invokes construct() of TaskAssigner to
+ * initialize the its internal worker states at the beginning of resource offering. Before each
+ * round of task assignment for a taskset, TaskScheduler invoke the init() of TaskAssigner to
+ * initialize the data structure for the round. When performing real task assignment,
+ * hasNext()/getNext() is used by TaskScheduler to check the worker availability and retrieve
+ * current offering from TaskAssigner. Then offerAccepted is used by TaskScheduler to notify
+ * the TaskAssigner so that TaskAssigner can decide whether the current offer is valid or not for
+ * the next request. After task assignment is done, TaskScheduler invokes the tasks() to
+ * retrieve all the task assignment information, and eventually, invokes reset() method so that
+ * TaskAssigner can cleanup its internal maintained resources.
+ */
+
+private[scheduler] abstract class TaskAssigner {
+  var offer: Seq[OfferState] = _
+  var CPUS_PER_TASK = 1
+
+  def withCpuPerTask(CPUS_PER_TASK: Int): Unit = {
+    this.CPUS_PER_TASK = CPUS_PER_TASK
+  }
+
+  // The final assigned offer returned to TaskScheduler.
+  final def tasks: Seq[ArrayBuffer[TaskDescription]] = offer.map(_.tasks)
+
+  // Invoked at the beginning of resource offering to construct the offer with the workoffers.
+  def construct(workOffer: Seq[WorkerOffer]): Unit = {
+    offer = workOffer.map(o => new OfferState(o))
+  }
+
+  // Invoked at each round of Taskset assignment to initialize the internal structure.
+  def init(): Unit
+
+  // Whether there is offer available to be used inside of one round of Taskset assignment.
+  def hasNext: Boolean
+
+  // Returned the next assigned offer based on the task assignment strategy.
+  def getNext(): OfferState
+
+  // Invoked by the TaskScheduler to indicate whether the current offer is accepted or not so that
+  // the assigner can decide whether the current worker is valid for the next offering.
+  def offerAccepted(assigned: Boolean): Un

[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...

2016-10-18 Thread zhzhan
Github user zhzhan commented on a diff in the pull request:

https://github.com/apache/spark/pull/15541#discussion_r83999756
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala 
---
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.PriorityQueue
+import scala.util.Random
+
+import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.SparkConf
+import org.apache.spark.util.Utils
+
+/** Tracking the current state of the workers with available cores and assigned task list. */
+class OfferState(val workOffer: WorkerOffer) {
+  // The current remaining cores that can be allocated to tasks.
+  var coresAvailable: Int = workOffer.cores
+  // The list of tasks that are assigned to this worker.
+  val tasks = new ArrayBuffer[TaskDescription](coresAvailable)
+}
+
+/**
+ * TaskAssigner is the base class for all task assigner implementations, and can be
+ * extended to implement different task scheduling algorithms.
+ * Together with [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]], TaskAssigner
+ * is used to assign tasks to workers with available cores. Internally, TaskScheduler, requested
+ * to perform task assignment given available workers, first sorts the candidate tasksets,
+ * and then for each taskset, it takes a number of rounds to request TaskAssigner for task
+ * assignment with different the locality restrictions until there is either no qualified
+ * workers or no valid tasks to be assigned.
+ *
+ * TaskAssigner is responsible to maintain the worker availability state and task assignment
+ * information. The contract between [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]]
+ * and TaskAssigner is as follows. First, TaskScheduler invokes construct() of TaskAssigner to
+ * initialize the its internal worker states at the beginning of resource offering. Before each
+ * round of task assignment for a taskset, TaskScheduler invoke the init() of TaskAssigner to
+ * initialize the data structure for the round. When performing real task assignment,
+ * hasNext()/getNext() is used by TaskScheduler to check the worker availability and retrieve
+ * current offering from TaskAssigner. Then offerAccepted is used by TaskScheduler to notify
+ * the TaskAssigner so that TaskAssigner can decide whether the current offer is valid or not for
+ * the next request. After task assignment is done, TaskScheduler invokes the tasks() to
+ * retrieve all the task assignment information, and eventually, invokes reset() method so that
+ * TaskAssigner can cleanup its internal maintained resources.
+ */
+
+private[scheduler] abstract class TaskAssigner {
+  var offer: Seq[OfferState] = _
+  var CPUS_PER_TASK = 1
+
+  def withCpuPerTask(CPUS_PER_TASK: Int): Unit = {
+    this.CPUS_PER_TASK = CPUS_PER_TASK
+  }
+
+  // The final assigned offer returned to TaskScheduler.
+  final def tasks: Seq[ArrayBuffer[TaskDescription]] = offer.map(_.tasks)
+
+  // Invoked at the beginning of resource offering to construct the offer with the workoffers.
+  def construct(workOffer: Seq[WorkerOffer]): Unit = {
+    offer = workOffer.map(o => new OfferState(o))
+  }
+
+  // Invoked at each round of Taskset assignment to initialize the internal structure.
+  def init(): Unit
+
+  // Whether there is offer available to be used inside of one round of Taskset assignment.
+  def hasNext: Boolean
+
+  // Returned the next assigned offer based on the task assignment strategy.
+  def getNext(): OfferState
+
+  // Invoked by the TaskScheduler to indicate whether the current offer is accepted or not so that
+  // the assigner can decide whether the current worker is valid for the next offering.
+  def offerAccepted(assigned: Boolean): Un

[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...

2016-10-18 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/15541#discussion_r83999452
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala 
---
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.PriorityQueue
+import scala.util.Random
+
+import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.SparkConf
+import org.apache.spark.util.Utils
+
+/** Tracking the current state of the workers with available cores and assigned task list. */
+class OfferState(val workOffer: WorkerOffer) {
+  // The current remaining cores that can be allocated to tasks.
+  var coresAvailable: Int = workOffer.cores
+  // The list of tasks that are assigned to this worker.
+  val tasks = new ArrayBuffer[TaskDescription](coresAvailable)
+}
+
+/**
+ * TaskAssigner is the base class for all task assigner implementations, and can be
+ * extended to implement different task scheduling algorithms.
+ * Together with [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]], TaskAssigner
+ * is used to assign tasks to workers with available cores. Internally, TaskScheduler, requested
+ * to perform task assignment given available workers, first sorts the candidate tasksets,
+ * and then for each taskset, it takes a number of rounds to request TaskAssigner for task
+ * assignment with different the locality restrictions until there is either no qualified
+ * workers or no valid tasks to be assigned.
+ *
+ * TaskAssigner is responsible to maintain the worker availability state and task assignment
+ * information. The contract between [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]]
+ * and TaskAssigner is as follows. First, TaskScheduler invokes construct() of TaskAssigner to
+ * initialize the its internal worker states at the beginning of resource offering. Before each
+ * round of task assignment for a taskset, TaskScheduler invoke the init() of TaskAssigner to
+ * initialize the data structure for the round. When performing real task assignment,
+ * hasNext()/getNext() is used by TaskScheduler to check the worker availability and retrieve
+ * current offering from TaskAssigner. Then offerAccepted is used by TaskScheduler to notify
+ * the TaskAssigner so that TaskAssigner can decide whether the current offer is valid or not for
+ * the next request. After task assignment is done, TaskScheduler invokes the tasks() to
+ * retrieve all the task assignment information, and eventually, invokes reset() method so that
+ * TaskAssigner can cleanup its internal maintained resources.
+ */
+
+private[scheduler] abstract class TaskAssigner {
+  var offer: Seq[OfferState] = _
+  var CPUS_PER_TASK = 1
+
+  def withCpuPerTask(CPUS_PER_TASK: Int): Unit = {
+    this.CPUS_PER_TASK = CPUS_PER_TASK
+  }
+
+  // The final assigned offer returned to TaskScheduler.
+  final def tasks: Seq[ArrayBuffer[TaskDescription]] = offer.map(_.tasks)
+
+  // Invoked at the beginning of resource offering to construct the offer with the workoffers.
+  def construct(workOffer: Seq[WorkerOffer]): Unit = {
+    offer = workOffer.map(o => new OfferState(o))
+  }
+
+  // Invoked at each round of Taskset assignment to initialize the internal structure.
+  def init(): Unit
+
+  // Whether there is offer available to be used inside of one round of Taskset assignment.
+  def hasNext: Boolean
+
+  // Returned the next assigned offer based on the task assignment strategy.
+  def getNext(): OfferState
+
+  // Invoked by the TaskScheduler to indicate whether the current offer is accepted or not so that
+  // the assigner can decide whether the current worker is valid for the next offering.
+  def offerAccepted(assigned: Boolean): Un

[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...

2016-10-18 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/15541#discussion_r83998771
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala 
---
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.PriorityQueue
+import scala.util.Random
+
+import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.SparkConf
+import org.apache.spark.util.Utils
+
+/** Tracking the current state of the workers with available cores and assigned task list. */
+class OfferState(val workOffer: WorkerOffer) {
+  // The current remaining cores that can be allocated to tasks.
+  var coresAvailable: Int = workOffer.cores
+  // The list of tasks that are assigned to this worker.
+  val tasks = new ArrayBuffer[TaskDescription](coresAvailable)
+}
+
+/**
+ * TaskAssigner is the base class for all task assigner implementations, and can be
+ * extended to implement different task scheduling algorithms.
+ * Together with [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]], TaskAssigner
+ * is used to assign tasks to workers with available cores. Internally, TaskScheduler, requested
+ * to perform task assignment given available workers, first sorts the candidate tasksets,
+ * and then for each taskset, it takes a number of rounds to request TaskAssigner for task
+ * assignment with different the locality restrictions until there is either no qualified
+ * workers or no valid tasks to be assigned.
+ *
+ * TaskAssigner is responsible to maintain the worker availability state and task assignment
+ * information. The contract between [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]]
+ * and TaskAssigner is as follows. First, TaskScheduler invokes construct() of TaskAssigner to
+ * initialize the its internal worker states at the beginning of resource offering. Before each
+ * round of task assignment for a taskset, TaskScheduler invoke the init() of TaskAssigner to
+ * initialize the data structure for the round. When performing real task assignment,
+ * hasNext()/getNext() is used by TaskScheduler to check the worker availability and retrieve
+ * current offering from TaskAssigner. Then offerAccepted is used by TaskScheduler to notify
+ * the TaskAssigner so that TaskAssigner can decide whether the current offer is valid or not for
+ * the next request. After task assignment is done, TaskScheduler invokes the tasks() to
+ * retrieve all the task assignment information, and eventually, invokes reset() method so that
+ * TaskAssigner can cleanup its internal maintained resources.
+ */
+
+private[scheduler] abstract class TaskAssigner {
+  var offer: Seq[OfferState] = _
+  var CPUS_PER_TASK = 1
+
+  def withCpuPerTask(CPUS_PER_TASK: Int): Unit = {
+    this.CPUS_PER_TASK = CPUS_PER_TASK
+  }
+
+  // The final assigned offer returned to TaskScheduler.
+  final def tasks: Seq[ArrayBuffer[TaskDescription]] = offer.map(_.tasks)
+
+  // Invoked at the beginning of resource offering to construct the offer with the workoffers.
+  def construct(workOffer: Seq[WorkerOffer]): Unit = {
+    offer = workOffer.map(o => new OfferState(o))
+  }
+
+  // Invoked at each round of Taskset assignment to initialize the internal structure.
+  def init(): Unit
+
+  // Whether there is offer available to be used inside of one round of Taskset assignment.
+  def hasNext: Boolean
+
+  // Returned the next assigned offer based on the task assignment strategy.
+  def getNext(): OfferState
+
+  // Invoked by the TaskScheduler to indicate whether the current offer is accepted or not so that
+  // the assigner can decide whether the current worker is valid for the next offering.
+  def offerAccepted(assigned: Boolean): Un

[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...

2016-10-18 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/15541#discussion_r83998540
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala 
---
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.PriorityQueue
+import scala.util.Random
+
+import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.SparkConf
+import org.apache.spark.util.Utils
+
+/** Tracking the current state of the workers with available cores and assigned task list. */
+class OfferState(val workOffer: WorkerOffer) {
+  // The current remaining cores that can be allocated to tasks.
+  var coresAvailable: Int = workOffer.cores
+  // The list of tasks that are assigned to this worker.
+  val tasks = new ArrayBuffer[TaskDescription](coresAvailable)
+}
+
+/**
+ * TaskAssigner is the base class for all task assigner implementations, and can be
+ * extended to implement different task scheduling algorithms.
+ * Together with [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]], TaskAssigner
+ * is used to assign tasks to workers with available cores. Internally, TaskScheduler, requested
+ * to perform task assignment given available workers, first sorts the candidate tasksets,
+ * and then for each taskset, it takes a number of rounds to request TaskAssigner for task
+ * assignment with different the locality restrictions until there is either no qualified
+ * workers or no valid tasks to be assigned.
+ *
+ * TaskAssigner is responsible to maintain the worker availability state and task assignment
+ * information. The contract between [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]]
+ * and TaskAssigner is as follows. First, TaskScheduler invokes construct() of TaskAssigner to
+ * initialize the its internal worker states at the beginning of resource offering. Before each
+ * round of task assignment for a taskset, TaskScheduler invoke the init() of TaskAssigner to
+ * initialize the data structure for the round. When performing real task assignment,
+ * hasNext()/getNext() is used by TaskScheduler to check the worker availability and retrieve
+ * current offering from TaskAssigner. Then offerAccepted is used by TaskScheduler to notify
+ * the TaskAssigner so that TaskAssigner can decide whether the current offer is valid or not for
+ * the next request. After task assignment is done, TaskScheduler invokes the tasks() to
+ * retrieve all the task assignment information, and eventually, invokes reset() method so that
+ * TaskAssigner can cleanup its internal maintained resources.
+ */
+
+private[scheduler] abstract class TaskAssigner {
+  var offer: Seq[OfferState] = _
+  var CPUS_PER_TASK = 1
+
+  def withCpuPerTask(CPUS_PER_TASK: Int): Unit = {
+    this.CPUS_PER_TASK = CPUS_PER_TASK
+  }
+
+  // The final assigned offer returned to TaskScheduler.
+  final def tasks: Seq[ArrayBuffer[TaskDescription]] = offer.map(_.tasks)
+
+  // Invoked at the beginning of resource offering to construct the offer with the workoffers.
+  def construct(workOffer: Seq[WorkerOffer]): Unit = {
+    offer = workOffer.map(o => new OfferState(o))
+  }
+
+  // Invoked at each round of Taskset assignment to initialize the internal structure.
+  def init(): Unit
+
+  // Whether there is offer available to be used inside of one round of Taskset assignment.
+  def hasNext: Boolean
+
+  // Returned the next assigned offer based on the task assignment strategy.
+  def getNext(): OfferState
+
+  // Invoked by the TaskScheduler to indicate whether the current offer is accepted or not so that
+  // the assigner can decide whether the current worker is valid for the next offering.
+  def offerAccepted(assigned: Boolean): Un

[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...

2016-10-18 Thread zhzhan
Github user zhzhan commented on a diff in the pull request:

https://github.com/apache/spark/pull/15541#discussion_r83998058
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala 
---
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.PriorityQueue
+import scala.util.Random
+
+import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.SparkConf
+import org.apache.spark.util.Utils
+
+/** Tracking the current state of the workers with available cores and assigned task list. */
+class OfferState(val workOffer: WorkerOffer) {
+  // The current remaining cores that can be allocated to tasks.
+  var coresAvailable: Int = workOffer.cores
+  // The list of tasks that are assigned to this worker.
+  val tasks = new ArrayBuffer[TaskDescription](coresAvailable)
+}
+
+/**
+ * TaskAssigner is the base class for all task assigner implementations, and can be
+ * extended to implement different task scheduling algorithms.
+ * Together with [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]], TaskAssigner
+ * is used to assign tasks to workers with available cores. Internally, TaskScheduler, requested
+ * to perform task assignment given available workers, first sorts the candidate tasksets,
+ * and then for each taskset, it takes a number of rounds to request TaskAssigner for task
+ * assignment with different the locality restrictions until there is either no qualified
+ * workers or no valid tasks to be assigned.
+ *
+ * TaskAssigner is responsible to maintain the worker availability state and task assignment
+ * information. The contract between [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]]
+ * and TaskAssigner is as follows. First, TaskScheduler invokes construct() of TaskAssigner to
+ * initialize the its internal worker states at the beginning of resource offering. Before each
+ * round of task assignment for a taskset, TaskScheduler invoke the init() of TaskAssigner to
+ * initialize the data structure for the round. When performing real task assignment,
+ * hasNext()/getNext() is used by TaskScheduler to check the worker availability and retrieve
+ * current offering from TaskAssigner. Then offerAccepted is used by TaskScheduler to notify
+ * the TaskAssigner so that TaskAssigner can decide whether the current offer is valid or not for
+ * the next request. After task assignment is done, TaskScheduler invokes the tasks() to
+ * retrieve all the task assignment information, and eventually, invokes reset() method so that
+ * TaskAssigner can cleanup its internal maintained resources.
+ */
+
+private[scheduler] abstract class TaskAssigner {
+  var offer: Seq[OfferState] = _
+  var CPUS_PER_TASK = 1
+
+  def withCpuPerTask(CPUS_PER_TASK: Int): Unit = {
+    this.CPUS_PER_TASK = CPUS_PER_TASK
+  }
+
+  // The final assigned offer returned to TaskScheduler.
+  final def tasks: Seq[ArrayBuffer[TaskDescription]] = offer.map(_.tasks)
+
+  // Invoked at the beginning of resource offering to construct the offer with the workoffers.
+  def construct(workOffer: Seq[WorkerOffer]): Unit = {
+    offer = workOffer.map(o => new OfferState(o))
+  }
+
+  // Invoked at each round of Taskset assignment to initialize the internal structure.
+  def init(): Unit
+
+  // Whether there is offer available to be used inside of one round of Taskset assignment.
+  def hasNext: Boolean
+
+  // Returned the next assigned offer based on the task assignment strategy.
+  def getNext(): OfferState
+
+  // Invoked by the TaskScheduler to indicate whether the current offer is accepted or not so that
+  // the assigner can decide whether the current worker is valid for the next offering.
+  def offerAccepted(assigned: Boolean): Un

[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...

2016-10-18 Thread zhzhan
Github user zhzhan commented on a diff in the pull request:

https://github.com/apache/spark/pull/15541#discussion_r83997070
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala 
---
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.PriorityQueue
+import scala.util.Random
+
+import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.SparkConf
+import org.apache.spark.util.Utils
+
+/** Tracks the current state of the workers with available cores and assigned task list. */
+class OfferState(val workOffer: WorkerOffer) {
+  // The current remaining cores that can be allocated to tasks.
+  var coresAvailable: Int = workOffer.cores
+  // The list of tasks that are assigned to this worker.
+  val tasks = new ArrayBuffer[TaskDescription](coresAvailable)
+}
+
+/**
+ * TaskAssigner is the base class for all task assigner implementations, and can be
+ * extended to implement different task scheduling algorithms.
+ * Together with [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]], TaskAssigner
+ * is used to assign tasks to workers with available cores. Internally, TaskScheduler, requested
+ * to perform task assignment given available workers, first sorts the candidate tasksets,
+ * and then for each taskset, it takes a number of rounds to request TaskAssigner for task
+ * assignment with different locality restrictions until there are either no qualified
+ * workers or no valid tasks to be assigned.
+ *
+ * TaskAssigner is responsible for maintaining the worker availability state and task assignment
+ * information. The contract between [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]]
+ * and TaskAssigner is as follows. First, TaskScheduler invokes construct() of TaskAssigner to
+ * initialize its internal worker states at the beginning of resource offering. Before each
+ * round of task assignment for a taskset, TaskScheduler invokes the init() of TaskAssigner to
+ * initialize the data structure for the round. When performing real task assignment,
+ * hasNext and getNext() are used by TaskScheduler to check the worker availability and retrieve
+ * the current offering from TaskAssigner. Then offerAccepted is used by TaskScheduler to notify
+ * the TaskAssigner so that TaskAssigner can decide whether the current offer is valid or not for
+ * the next request. After task assignment is done, TaskScheduler invokes tasks to retrieve
+ * all the task assignment information, and eventually invokes the reset() method so that
+ * TaskAssigner can clean up its internally maintained resources.
+ */
+
+private[scheduler] abstract class TaskAssigner {
+  var offer: Seq[OfferState] = _
+  var CPUS_PER_TASK = 1
+
+  def withCpuPerTask(CPUS_PER_TASK: Int): Unit = {
+    this.CPUS_PER_TASK = CPUS_PER_TASK
+  }
+
+  // The final assigned offer returned to TaskScheduler.
+  final def tasks: Seq[ArrayBuffer[TaskDescription]] = offer.map(_.tasks)
+
+  // Invoked at the beginning of resource offering to construct the offer with the work offers.
+  def construct(workOffer: Seq[WorkerOffer]): Unit = {
+    offer = workOffer.map(o => new OfferState(o))
+  }
+
+  // Invoked at each round of taskset assignment to initialize the internal structure.
+  def init(): Unit
+
+  // Whether there is an offer available to be used within one round of taskset assignment.
+  def hasNext: Boolean
+
+  // Returns the next assigned offer based on the task assignment strategy.
+  def getNext(): OfferState
+
+  // Invoked by the TaskScheduler to indicate whether the current offer is accepted or not so that
+  // the assigner can decide whether the current worker is valid for the next offering.
+  def offerAccepted(assigned: Boolean): Un

[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...

2016-10-18 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/15541#discussion_r83996232
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala 
---
@@ -0,0 +1,233 @@

[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...

2016-10-18 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/15541#discussion_r83995415
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala 
---
@@ -0,0 +1,233 @@

[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...

2016-10-18 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/15541#discussion_r83995272
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala 
---
@@ -0,0 +1,233 @@

[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...

2016-10-18 Thread zhzhan
GitHub user zhzhan opened a pull request:

https://github.com/apache/spark/pull/15541

[SPARK-17637][Scheduler]Packed scheduling for Spark tasks across executors

## What changes were proposed in this pull request?

Restructure the code and implement two new task assigners.

PackedAssigner: tries to allocate tasks to the executors with the fewest 
available cores, so that Spark can release reserved executors when dynamic 
allocation is enabled.

BalancedAssigner: tries to allocate tasks to the executors with the most 
available cores in order to balance the workload across all executors.

By default, the original round-robin assigner is used.
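
As a rough illustration of the difference between the two strategies (this is not the code in this patch), the sketch below places tasks one at a time. `Executor`, `pickPacked`, `pickBalanced` and `assign` are hypothetical names made up for the example; it ignores locality and task sets, and ties are broken by whatever `minBy`/`maxBy` return, whereas the real assigners may break ties differently.

```scala
// Hypothetical, simplified model for illustration only; not the classes in this patch.
final case class Executor(id: String, var freeCores: Int)

object AssignerSketch {
  // Packed: among executors that can fit a task, prefer the one with the fewest free cores.
  def pickPacked(executors: Seq[Executor], cpusPerTask: Int): Option[Executor] = {
    val fit = executors.filter(_.freeCores >= cpusPerTask)
    if (fit.isEmpty) None else Some(fit.minBy(_.freeCores))
  }

  // Balanced: among executors that can fit a task, prefer the one with the most free cores.
  def pickBalanced(executors: Seq[Executor], cpusPerTask: Int): Option[Executor] = {
    val fit = executors.filter(_.freeCores >= cpusPerTask)
    if (fit.isEmpty) None else Some(fit.maxBy(_.freeCores))
  }

  // Places up to `numTasks` tasks and returns the executor id chosen for each placed task.
  // Stops early when no executor has enough free cores left.
  def assign(
      executors: Seq[Executor],
      numTasks: Int,
      cpusPerTask: Int,
      pick: (Seq[Executor], Int) => Option[Executor]): Seq[String] = {
    val placed = Seq.newBuilder[String]
    var remaining = numTasks
    var exhausted = false
    while (remaining > 0 && !exhausted) {
      pick(executors, cpusPerTask) match {
        case Some(e) =>
          e.freeCores -= cpusPerTask // reserve the cores for this task
          placed += e.id
          remaining -= 1
        case None =>
          exhausted = true
      }
    }
    placed.result()
  }
}
```

With executors A (4 free cores), B (2) and C (3) and three single-core tasks, the packed choice fills B first and then moves on to C, leaving A untouched so it could be released. The balanced choice spreads the same tasks over A and C and leaves every executor with 2 free cores.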

We tested a pipeline, and the new PackedAssigner saves around 45% of the 
reserved CPU and memory with dynamic allocation enabled.

## How was this patch tested?

Unit tests in TaskSchedulerImplSuite and manual tests in a production 
pipeline.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zhzhan/spark TaskAssigner

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/15541.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #15541


commit 75cdd1a77a227fa492a09e93794d4ea7be8a020f
Author: Zhan Zhang 
Date:   2016-10-19T01:20:48Z

TaskAssigner to support different scheduling algorithms




