[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...
Github user zhzhan closed the pull request at: https://github.com/apache/spark/pull/15541 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...
Github user zhzhan commented on a diff in the pull request: https://github.com/apache/spark/pull/15541#discussion_r85985739 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala --- @@ -0,0 +1,232 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import scala.collection.mutable.ArrayBuffer +import scala.collection.mutable.PriorityQueue +import scala.util.Random + +import org.apache.spark.{SparkConf, SparkException} +import org.apache.spark.internal.{config, Logging} +import org.apache.spark.util.Utils + +/** Tracks the current state of the workers with available cores and assigned task list. */ +private[scheduler] class OfferState(val workOffer: WorkerOffer) { + /** The current remaining cores that can be allocated to tasks. */ + var coresAvailable: Int = workOffer.cores + /** The list of tasks that are assigned to this WorkerOffer. 
*/ + val tasks = new ArrayBuffer[TaskDescription](coresAvailable) + + def assignTask(task: TaskDescription, cpu: Int): Unit = { +if (coresAvailable < cpu) { + throw new SparkException(s"Available cores are less than cpu per task" + +s" ($coresAvailable < $cpu)") +} +tasks += task +coresAvailable -= cpu + } +} + +/** + * TaskAssigner is the base class for all task assigner implementations, and can be + * extended to implement different task scheduling algorithms. + * Together with [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]], TaskAssigner + * is used to assign tasks to workers with available cores. Internally, when TaskScheduler + * performs task assignment given available workers, it first sorts the candidate tasksets, + * and then for each taskset, it takes multiple rounds to request TaskAssigner for task + * assignment with different locality restrictions until there is either no qualified + * workers or no valid tasks to be assigned. + * + * TaskAssigner is responsible to maintain the worker availability state and task assignment + * information. The contract between [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]] + * and TaskAssigner is as follows. + * + * First, TaskScheduler invokes construct() of TaskAssigner to initialize the its internal + * worker states at the beginning of resource offering. + * + * Second, before each round of task assignment for a taskset, TaskScheduler invokes the init() + * of TaskAssigner to initialize the data structure for the round. + * + * Third, when performing real task assignment, hasNext/next() is used by TaskScheduler + * to check the worker availability and retrieve current offering from TaskAssigner. + * + * Fourth, TaskScheduler calls offerAccepted() to notify the TaskAssigner so that + * TaskAssigner can decide whether the current offer is valid or not for the next request. 
+ * + * Fifth, after task assignment is done, TaskScheduler invokes the function tasks to + * retrieve all the task assignment information. + */ + +private[scheduler] sealed abstract class TaskAssigner { + protected var offer: Seq[OfferState] = _ + protected var cpuPerTask = 1 + + protected def withCpuPerTask(cpuPerTask: Int): TaskAssigner = { +this.cpuPerTask = cpuPerTask +this + } + + /** The currently assigned offers. */ + final def tasks: Seq[ArrayBuffer[TaskDescription]] = offer.map(_.tasks) + + /** + * Invoked at the beginning of resource offering to construct the offer with the workoffers. + * By default, offers is randomly shuffled to avoid always placing tasks on the same set of + * workers. + */ + def construct(workOffer: Seq[WorkerOffer]): Unit = { +offer = Random.shuffle(workOffer.map(o => new OfferState(o))) + } + + /** Invoked at each round of Taskset assignment to initialize the internal structure. */ + def init(): Unit +
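The five-step contract in the quoted Scaladoc (construct, init, hasNext/next, offerAccepted, tasks) is easier to follow as a small, self-contained sketch. `SimpleOffer`, `SimpleOfferState`, `RoundRobinSketch`, and `ContractDemo` are hypothetical names, not Spark classes. Task ids stand in for `TaskDescription`. The outer loop, which repeats rounds until a round launches nothing, is an assumption based on the "multiple rounds" wording.

```scala
import scala.collection.mutable.ArrayBuffer
import scala.util.Random

// Hypothetical stand-in for WorkerOffer (not Spark's class).
final case class SimpleOffer(executorId: String, host: String, cores: Int)

// Hypothetical stand-in for OfferState; task ids replace TaskDescription.
// In this sketch the caller checks capacity before calling assignTask.
final class SimpleOfferState(val offer: SimpleOffer) {
  var coresAvailable: Int = offer.cores
  val tasks: ArrayBuffer[Long] = ArrayBuffer.empty[Long]

  def assignTask(taskId: Long, cpu: Int): Unit = {
    tasks += taskId
    coresAvailable -= cpu
  }
}

// Hypothetical round-robin assigner following the Scaladoc's call order.
final class RoundRobinSketch(seed: Long) {
  private var offers: IndexedSeq[SimpleOfferState] = IndexedSeq.empty
  private var cursor = 0

  // Step 1: build per-worker state once per resource offer, shuffled.
  def construct(workOffers: Seq[SimpleOffer]): Unit = {
    offers = new Random(seed).shuffle(workOffers.map(new SimpleOfferState(_))).toIndexedSeq
  }

  // Step 2: reset the per-round iteration state.
  def init(): Unit = { cursor = 0 }

  // Step 3: visit every worker once per round.
  def hasNext: Boolean = cursor < offers.size
  def next(): SimpleOfferState = {
    val current = offers(cursor)
    cursor += 1
    current
  }

  // Step 4: round robin moves on regardless; other strategies could reorder here.
  def offerAccepted(isAccepted: Boolean): Unit = ()

  // Step 5: per-worker assignments, in shuffled worker order.
  def tasks: Seq[Seq[Long]] = offers.map(_.tasks.toList)
}

object ContractDemo {
  // Assigns numTasks hypothetical tasks, repeating rounds until a round launches nothing.
  def schedule(workers: Seq[SimpleOffer], numTasks: Int, cpuPerTask: Int): Seq[Seq[Long]] = {
    val assigner = new RoundRobinSketch(seed = 42L)
    assigner.construct(workers)
    var nextTask = 0L
    var launched = true
    while (launched) {
      launched = false
      assigner.init()
      while (assigner.hasNext) {
        val current = assigner.next()
        val accepted = nextTask < numTasks && current.coresAvailable >= cpuPerTask
        if (accepted) {
          current.assignTask(nextTask, cpuPerTask)
          nextTask += 1
          launched = true
        }
        assigner.offerAccepted(accepted)
      }
    }
    assigner.tasks
  }
}
```

With one 4-core and one 2-core worker and five single-core tasks, each round hands at most one task to each worker. The 4-core worker therefore ends up with three tasks and the 2-core worker with two.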
[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...
Github user lins05 commented on a diff in the pull request: https://github.com/apache/spark/pull/15541#discussion_r85924864 --- Diff: docs/configuration.md --- @@ -1350,6 +1350,20 @@ Apart from these, the following properties are also available, and may be useful Should be greater than or equal to 1. Number of allowed retries = this value - 1. + + spark.scheduler.taskAssigner + roundrobin + +The strategy of how to allocate tasks among workers with free cores. Three task +assigners (roundrobin, packed, and balanced) are supported currently. By default, roundrobin --- End diff -- Nit: I suggest double-quoting the keywords "roundrobin", "packed", and "balanced" in this paragraph. For example, `the "balanced" task assigner` sounds better to me than `the balanced task assigner`. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA.
[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...
Github user lins05 commented on a diff in the pull request: https://github.com/apache/spark/pull/15541#discussion_r85956857 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala --- @@ -305,12 +307,8 @@ private[spark] class TaskSchedulerImpl( hostsByRack.getOrElseUpdate(rack, new HashSet[String]()) += o.host } } +taskAssigner.construct(offers) --- End diff -- The comment on the `resourceOffers` method should be updated. It still says `We fill each node with tasks in a round-robin manner so that tasks are balanced across the cluster.`
[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/15541#discussion_r85882792 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala --- @@ -0,0 +1,232 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import scala.collection.mutable.ArrayBuffer +import scala.collection.mutable.PriorityQueue +import scala.util.Random + +import org.apache.spark.{SparkConf, SparkException} +import org.apache.spark.internal.{config, Logging} +import org.apache.spark.util.Utils + +/** Tracks the current state of the workers with available cores and assigned task list. */ +private[scheduler] class OfferState(val workOffer: WorkerOffer) { + /** The current remaining cores that can be allocated to tasks. */ + var coresAvailable: Int = workOffer.cores + /** The list of tasks that are assigned to this WorkerOffer. 
*/ + val tasks = new ArrayBuffer[TaskDescription](coresAvailable) + + def assignTask(task: TaskDescription, cpu: Int): Unit = { +if (coresAvailable < cpu) { + throw new SparkException(s"Available cores are less than cpu per task" + +s" ($coresAvailable < $cpu)") +} +tasks += task +coresAvailable -= cpu + } +} + +/** + * TaskAssigner is the base class for all task assigner implementations, and can be + * extended to implement different task scheduling algorithms. + * Together with [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]], TaskAssigner + * is used to assign tasks to workers with available cores. Internally, when TaskScheduler + * performs task assignment given available workers, it first sorts the candidate tasksets, + * and then for each taskset, it takes multiple rounds to request TaskAssigner for task + * assignment with different locality restrictions until there is either no qualified + * workers or no valid tasks to be assigned. + * + * TaskAssigner is responsible to maintain the worker availability state and task assignment + * information. The contract between [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]] + * and TaskAssigner is as follows. + * + * First, TaskScheduler invokes construct() of TaskAssigner to initialize the its internal + * worker states at the beginning of resource offering. + * + * Second, before each round of task assignment for a taskset, TaskScheduler invokes the init() + * of TaskAssigner to initialize the data structure for the round. + * + * Third, when performing real task assignment, hasNext/next() is used by TaskScheduler + * to check the worker availability and retrieve current offering from TaskAssigner. + * + * Fourth, TaskScheduler calls offerAccepted() to notify the TaskAssigner so that + * TaskAssigner can decide whether the current offer is valid or not for the next request. 
+ * + * Fifth, after task assignment is done, TaskScheduler invokes the function tasks to + * retrieve all the task assignment information. + */ + +private[scheduler] sealed abstract class TaskAssigner { + protected var offer: Seq[OfferState] = _ + protected var cpuPerTask = 1 + + protected def withCpuPerTask(cpuPerTask: Int): TaskAssigner = { +this.cpuPerTask = cpuPerTask +this + } + + /** The currently assigned offers. */ + final def tasks: Seq[ArrayBuffer[TaskDescription]] = offer.map(_.tasks) + + /** + * Invoked at the beginning of resource offering to construct the offer with the workoffers. + * By default, offers is randomly shuffled to avoid always placing tasks on the same set of + * workers. + */ + def construct(workOffer: Seq[WorkerOffer]): Unit = { +offer = Random.shuffle(workOffer.map(o => new OfferState(o))) + } + + /** Invoked at each round of Taskset assignment to initialize the internal structure. */ + def init(): Unit +
[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/15541#discussion_r84621271 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala --- @@ -250,24 +251,24 @@ private[spark] class TaskSchedulerImpl( private def resourceOfferSingleTaskSet( taskSet: TaskSetManager, maxLocality: TaskLocality, - shuffledOffers: Seq[WorkerOffer], - availableCpus: Array[Int], - tasks: IndexedSeq[ArrayBuffer[TaskDescription]]) : Boolean = { + taskAssigner: TaskAssigner) : Boolean = { var launchedTask = false -for (i <- 0 until shuffledOffers.size) { - val execId = shuffledOffers(i).executorId - val host = shuffledOffers(i).host - if (availableCpus(i) >= CPUS_PER_TASK) { +taskAssigner.init() +while (taskAssigner.hasNext) { + var isAccepted = false + val currentOffer = taskAssigner.next() + val execId = currentOffer.workOffer.executorId + val host = currentOffer.workOffer.host + if (currentOffer.coresAvailable >= CPUS_PER_TASK) { try { for (task <- taskSet.resourceOffer(execId, host, maxLocality)) { -tasks(i) += task +currentOffer.assignTask(task, CPUS_PER_TASK) val tid = task.taskId taskIdToTaskSetManager(tid) = taskSet taskIdToExecutorId(tid) = execId executorIdToTaskCount(execId) += 1 -availableCpus(i) -= CPUS_PER_TASK -assert(availableCpus(i) >= 0) --- End diff -- @zhzhan Ah, thanks for the explanation! I didn't notice that it returns an Option. :)
[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...
Github user zhzhan commented on a diff in the pull request: https://github.com/apache/spark/pull/15541#discussion_r84621076 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala --- @@ -250,24 +251,24 @@ private[spark] class TaskSchedulerImpl( private def resourceOfferSingleTaskSet( taskSet: TaskSetManager, maxLocality: TaskLocality, - shuffledOffers: Seq[WorkerOffer], - availableCpus: Array[Int], - tasks: IndexedSeq[ArrayBuffer[TaskDescription]]) : Boolean = { + taskAssigner: TaskAssigner) : Boolean = { var launchedTask = false -for (i <- 0 until shuffledOffers.size) { - val execId = shuffledOffers(i).executorId - val host = shuffledOffers(i).host - if (availableCpus(i) >= CPUS_PER_TASK) { +taskAssigner.init() +while (taskAssigner.hasNext) { + var isAccepted = false + val currentOffer = taskAssigner.next() + val execId = currentOffer.workOffer.executorId + val host = currentOffer.workOffer.host + if (currentOffer.coresAvailable >= CPUS_PER_TASK) { try { for (task <- taskSet.resourceOffer(execId, host, maxLocality)) { -tasks(i) += task +currentOffer.assignTask(task, CPUS_PER_TASK) val tid = task.taskId taskIdToTaskSetManager(tid) = taskSet taskIdToExecutorId(tid) = execId executorIdToTaskCount(execId) += 1 -availableCpus(i) -= CPUS_PER_TASK -assert(availableCpus(i) >= 0) --- End diff -- @viirya The assert will not fail even in the legacy code: `taskSet.resourceOffer(execId, host, maxLocality)` returns an Option, so the for loop runs at most once.
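The `Option` argument is easy to check directly. A `for` loop over an `Option` runs its body zero or one time. In the legacy code, the decrement therefore happens at most once per offer, and only after the `availableCpus(i) >= CPUS_PER_TASK` guard has passed. In the sketch below, `fakeResourceOffer` is a hypothetical stand-in for `TaskSetManager.resourceOffer` and does not reproduce its real signature.

```scala
object OptionLoopSketch {
  // Hypothetical stand-in for TaskSetManager.resourceOffer: Some(task) or None.
  def fakeResourceOffer(hasPendingTask: Boolean): Option[String] =
    if (hasPendingTask) Some("task-0") else None

  // Mirrors the legacy pattern: guard on free CPUs, then decrement inside the Option loop.
  def remainingCpus(available: Int, cpusPerTask: Int, hasPendingTask: Boolean): Int = {
    var cpus = available
    if (cpus >= cpusPerTask) {
      for (_ <- fakeResourceOffer(hasPendingTask)) {
        cpus -= cpusPerTask // runs at most once, so cpus never drops below 0
      }
    }
    cpus
  }
}
```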
[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...
Github user zhzhan commented on a diff in the pull request: https://github.com/apache/spark/pull/15541#discussion_r84619879 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala --- @@ -250,24 +251,24 @@ private[spark] class TaskSchedulerImpl( private def resourceOfferSingleTaskSet( taskSet: TaskSetManager, maxLocality: TaskLocality, - shuffledOffers: Seq[WorkerOffer], - availableCpus: Array[Int], - tasks: IndexedSeq[ArrayBuffer[TaskDescription]]) : Boolean = { + taskAssigner: TaskAssigner) : Boolean = { var launchedTask = false -for (i <- 0 until shuffledOffers.size) { - val execId = shuffledOffers(i).executorId - val host = shuffledOffers(i).host - if (availableCpus(i) >= CPUS_PER_TASK) { +taskAssigner.init() +while (taskAssigner.hasNext) { + var isAccepted = false + val currentOffer = taskAssigner.next() + val execId = currentOffer.workOffer.executorId + val host = currentOffer.workOffer.host + if (currentOffer.coresAvailable >= CPUS_PER_TASK) { try { for (task <- taskSet.resourceOffer(execId, host, maxLocality)) { -tasks(i) += task +currentOffer.assignTask(task, CPUS_PER_TASK) val tid = task.taskId taskIdToTaskSetManager(tid) = taskSet taskIdToExecutorId(tid) = execId executorIdToTaskCount(execId) += 1 -availableCpus(i) -= CPUS_PER_TASK -assert(availableCpus(i) >= 0) --- End diff -- Thanks @viirya for the comments. Actually, I was thinking of removing the check, although it is part of the legacy code. Now the check has moved into OfferState, which makes more sense. IMHO the assertion should never fail in practice, but OfferState itself should still guarantee this invariant.
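Two versions of `OfferState.assignTask` are quoted in this thread. The earlier one appends the task and decrements the cores, then asserts. The later one checks first and throws `SparkException`. The sketch below illustrates the check-first version. `CheckedOfferState` is a hypothetical name, and it throws `IllegalStateException` in place of `SparkException` so that it runs without Spark on the classpath. Because the check happens first, a rejected assignment leaves `tasks` and `coresAvailable` unchanged. The assert-after version would already have appended the task and driven the core count negative before failing.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical simplified OfferState: task ids stand in for TaskDescription,
// and IllegalStateException stands in for SparkException.
final class CheckedOfferState(initialCores: Int) {
  var coresAvailable: Int = initialCores
  val tasks: ArrayBuffer[Long] = ArrayBuffer.empty[Long]

  // Validate before mutating, so a rejected assignment leaves the state untouched.
  def assignTask(taskId: Long, cpu: Int): Unit = {
    if (coresAvailable < cpu) {
      throw new IllegalStateException(
        s"Available cores are less than cpu per task ($coresAvailable < $cpu)")
    }
    tasks += taskId
    coresAvailable -= cpu
  }
}
```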
[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...
Github user zhzhan commented on a diff in the pull request: https://github.com/apache/spark/pull/15541#discussion_r84619023 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala --- @@ -0,0 +1,229 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import scala.collection.mutable.ArrayBuffer +import scala.collection.mutable.PriorityQueue +import scala.util.Random + +import org.apache.spark.internal.{config, Logging} +import org.apache.spark.SparkConf +import org.apache.spark.util.Utils + +/** Tracks the current state of the workers with available cores and assigned task list. */ +private[scheduler] class OfferState(val workOffer: WorkerOffer) { + /** The current remaining cores that can be allocated to tasks. */ + var coresAvailable: Int = workOffer.cores + /** The list of tasks that are assigned to this WorkerOffer. 
*/ + val tasks = new ArrayBuffer[TaskDescription](coresAvailable) + + def assignTask(task: TaskDescription, cpu: Int): Unit = { +tasks += task +coresAvailable -= cpu +assert(coresAvailable >= 0) + } +} + +/** + * TaskAssigner is the base class for all task assigner implementations, and can be + * extended to implement different task scheduling algorithms. + * Together with [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]], TaskAssigner + * is used to assign tasks to workers with available cores. Internally, when TaskScheduler + * performs task assignment given available workers, it first sorts the candidate tasksets, + * and then for each taskset, it takes multiple rounds to request TaskAssigner for task + * assignment with different locality restrictions until there is either no qualified + * workers or no valid tasks to be assigned. + * + * TaskAssigner is responsible to maintain the worker availability state and task assignment + * information. The contract between [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]] + * and TaskAssigner is as follows. + * + * First, TaskScheduler invokes construct() of TaskAssigner to initialize the its internal + * worker states at the beginning of resource offering. + * + * Second, before each round of task assignment for a taskset, TaskScheduler invokes the init() + * of TaskAssigner to initialize the data structure for the round. + * + * Third, when performing real task assignment, hasNext/next() is used by TaskScheduler + * to check the worker availability and retrieve current offering from TaskAssigner. + * + * Fourth, TaskScheduler calls offerAccepted() to notify the TaskAssigner so that + * TaskAssigner can decide whether the current offer is valid or not for the next request. + * + * Fifth, after task assignment is done, TaskScheduler invokes the function tasks to + * retrieve all the task assignment information. 
+ */ + + private[scheduler] sealed abstract class TaskAssigner { + protected var offer: Seq[OfferState] = _ + protected var cpuPerTask = 1 + + protected def withCpuPerTask(cpuPerTask: Int): TaskAssigner = { +this.cpuPerTask = cpuPerTask --- End diff -- You mean cpuPerTask >= 1? I don't think we need this check.
[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/15541#discussion_r84618189 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala --- @@ -0,0 +1,229 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import scala.collection.mutable.ArrayBuffer +import scala.collection.mutable.PriorityQueue +import scala.util.Random + +import org.apache.spark.internal.{config, Logging} +import org.apache.spark.SparkConf +import org.apache.spark.util.Utils + +/** Tracks the current state of the workers with available cores and assigned task list. */ +private[scheduler] class OfferState(val workOffer: WorkerOffer) { + /** The current remaining cores that can be allocated to tasks. */ + var coresAvailable: Int = workOffer.cores + /** The list of tasks that are assigned to this WorkerOffer. 
*/ + val tasks = new ArrayBuffer[TaskDescription](coresAvailable) + + def assignTask(task: TaskDescription, cpu: Int): Unit = { +tasks += task +coresAvailable -= cpu +assert(coresAvailable >= 0) + } +} + +/** + * TaskAssigner is the base class for all task assigner implementations, and can be + * extended to implement different task scheduling algorithms. + * Together with [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]], TaskAssigner + * is used to assign tasks to workers with available cores. Internally, when TaskScheduler + * performs task assignment given available workers, it first sorts the candidate tasksets, + * and then for each taskset, it takes multiple rounds to request TaskAssigner for task + * assignment with different locality restrictions until there is either no qualified + * workers or no valid tasks to be assigned. + * + * TaskAssigner is responsible to maintain the worker availability state and task assignment + * information. The contract between [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]] + * and TaskAssigner is as follows. + * + * First, TaskScheduler invokes construct() of TaskAssigner to initialize the its internal + * worker states at the beginning of resource offering. + * + * Second, before each round of task assignment for a taskset, TaskScheduler invokes the init() + * of TaskAssigner to initialize the data structure for the round. + * + * Third, when performing real task assignment, hasNext/next() is used by TaskScheduler + * to check the worker availability and retrieve current offering from TaskAssigner. + * + * Fourth, TaskScheduler calls offerAccepted() to notify the TaskAssigner so that + * TaskAssigner can decide whether the current offer is valid or not for the next request. + * + * Fifth, after task assignment is done, TaskScheduler invokes the function tasks to + * retrieve all the task assignment information. 
+ */ + + private[scheduler] sealed abstract class TaskAssigner { + protected var offer: Seq[OfferState] = _ + protected var cpuPerTask = 1 + + protected def withCpuPerTask(cpuPerTask: Int): TaskAssigner = { +this.cpuPerTask = cpuPerTask --- End diff -- Do we need to add a sanity check for `cpuPerTask`?
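For reference, the sanity check under discussion could look like the sketch below. It uses Scala's `require`, which throws `IllegalArgumentException`. `AssignerConfigSketch` is a hypothetical name. The diff's `withCpuPerTask` has no such check, and zhzhan's reply above suggests one is not needed. Without it, however, a value of 0 would make `coresAvailable >= cpuPerTask` hold for every offer while never consuming any cores.

```scala
// Hypothetical builder mirroring withCpuPerTask from the diff, plus the proposed check.
final class AssignerConfigSketch {
  private var cpuPerTask = 1

  def withCpuPerTask(cpus: Int): AssignerConfigSketch = {
    // Reject non-positive values up front instead of letting them skew core accounting.
    require(cpus >= 1, s"cpuPerTask must be at least 1, got $cpus")
    cpuPerTask = cpus
    this
  }

  def current: Int = cpuPerTask
}
```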
[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/15541#discussion_r84616602 --- Diff: docs/configuration.md --- @@ -1350,6 +1350,20 @@ Apart from these, the following properties are also available, and may be useful Should be greater than or equal to 1. Number of allowed retries = this value - 1. + + spark.scheduler.taskAssigner + roundrobin + +The strategy of how to allocate tasks among workers with free cores. Three task +assigners (roundrobin, packed, and balanced) are supported currently. By default, roundrobin +with randomness is used to allocate task to workers with available cores in a --- End diff -- `allocate task` -> `allocate tasks`
[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/15541#discussion_r84616347 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala --- @@ -0,0 +1,229 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import scala.collection.mutable.ArrayBuffer +import scala.collection.mutable.PriorityQueue +import scala.util.Random + +import org.apache.spark.internal.{config, Logging} +import org.apache.spark.SparkConf +import org.apache.spark.util.Utils + +/** Tracks the current state of the workers with available cores and assigned task list. */ +private[scheduler] class OfferState(val workOffer: WorkerOffer) { + /** The current remaining cores that can be allocated to tasks. */ + var coresAvailable: Int = workOffer.cores + /** The list of tasks that are assigned to this WorkerOffer. 
*/ + val tasks = new ArrayBuffer[TaskDescription](coresAvailable) + + def assignTask(task: TaskDescription, cpu: Int): Unit = { +tasks += task +coresAvailable -= cpu +assert(coresAvailable >= 0) + } +} + +/** + * TaskAssigner is the base class for all task assigner implementations, and can be + * extended to implement different task scheduling algorithms. + * Together with [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]], TaskAssigner + * is used to assign tasks to workers with available cores. Internally, when TaskScheduler + * performs task assignment given available workers, it first sorts the candidate tasksets, + * and then for each taskset, it takes multiple rounds to request TaskAssigner for task + * assignment with different locality restrictions until there is either no qualified + * workers or no valid tasks to be assigned. + * + * TaskAssigner is responsible to maintain the worker availability state and task assignment + * information. The contract between [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]] + * and TaskAssigner is as follows. + * + * First, TaskScheduler invokes construct() of TaskAssigner to initialize the its internal + * worker states at the beginning of resource offering. + * + * Second, before each round of task assignment for a taskset, TaskScheduler invokes the init() + * of TaskAssigner to initialize the data structure for the round. + * + * Third, when performing real task assignment, hasNext/next() is used by TaskScheduler + * to check the worker availability and retrieve current offering from TaskAssigner. + * + * Fourth, TaskScheduler calls offerAccepted() to notify the TaskAssigner so that + * TaskAssigner can decide whether the current offer is valid or not for the next request. + * + * Fifth, after task assignment is done, TaskScheduler invokes the function tasks to + * retrieve all the task assignment information. 
+ */ + +private[scheduler] sealed abstract class TaskAssigner { + protected var offer: Seq[OfferState] = _ + protected var cpuPerTask = 1 + + protected def withCpuPerTask(cpuPerTask: Int): TaskAssigner = { +this.cpuPerTask = cpuPerTask +this + } + + /** The currently assigned offers. */ + final def tasks: Seq[ArrayBuffer[TaskDescription]] = offer.map(_.tasks) + + /** + * Invoked at the beginning of resource offering to construct the offer with the workoffers. + * By default, offers is randomly shuffled to avoid always placing tasks on the same set of + * workers. + */ + def construct(workOffer: Seq[WorkerOffer]): Unit = { +offer = Random.shuffle(workOffer.map(o => new OfferState(o))) + } + + /** Invoked at each round of Taskset assignment to initialize the internal structure. */ + def init(): Unit + + /** + * Tests whether there is offer available to be used inside of one round of Taskset assignment. + * @return `true` if a subseq
[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/15541#discussion_r84591196 --- Diff: docs/configuration.md --- @@ -1342,6 +1342,20 @@ Apart from these, the following properties are also available, and may be useful Should be greater than or equal to 1. Number of allowed retries = this value - 1. + + spark.scheduler.taskAssigner + roundrobin + +The strategy of how to allocate tasks among workers with free cores. There are three task +assigners (roundrobin, packed, and balanced) are supported currently. By default, roundrobin +with randomness is used, which tries to allocate task to workers with available cores in +roundrobin manner. The packed task assigner tries to allocate tasks to workers with the least +free cores, resulting in tasks assigned to few workers, which may help driver to release the +reserved idle workers when dynamic allocation(spark.dynamicAllocation.enabled) is enabled. +The balanced task assigner tries to assign tasks across workers in a balance way (allocating --- End diff -- `balance ` -> `balanced ` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
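The proposed `spark.scheduler.taskAssigner` property takes one of three string values, with `roundrobin` as the default. The following is a minimal sketch of mapping such a value to a strategy. `AssignerKind`, `fromConf` and the error text are hypothetical, and this thread does not show how the PR itself parses the value. In real code the input might come from `conf.getOption("spark.scheduler.taskAssigner")` on a `SparkConf`, although the key exists only in this proposal.

```scala
import java.util.Locale

// Hypothetical enumeration of the three documented strategies.
sealed trait AssignerKind
object AssignerKind {
  case object RoundRobin extends AssignerKind
  case object Packed extends AssignerKind
  case object Balanced extends AssignerKind

  // Missing value falls back to the documented default, "roundrobin".
  // Matching is case-insensitive and ignores surrounding whitespace.
  def fromConf(value: Option[String]): AssignerKind =
    value.map(_.trim.toLowerCase(Locale.ROOT)).getOrElse("roundrobin") match {
      case "roundrobin" => RoundRobin
      case "packed"     => Packed
      case "balanced"   => Balanced
      case other =>
        throw new IllegalArgumentException(
          s"Unsupported spark.scheduler.taskAssigner value: $other " +
            "(expected roundrobin, packed or balanced)")
    }
}
```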
[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/15541#discussion_r84591185 --- Diff: docs/configuration.md --- @@ -1342,6 +1342,20 @@ Apart from these, the following properties are also available, and may be useful Should be greater than or equal to 1. Number of allowed retries = this value - 1. + + spark.scheduler.taskAssigner + roundrobin + +The strategy of how to allocate tasks among workers with free cores. There are three task +assigners (roundrobin, packed, and balanced) are supported currently. By default, roundrobin +with randomness is used, which tries to allocate task to workers with available cores in +roundrobin manner. The packed task assigner tries to allocate tasks to workers with the least +free cores, resulting in tasks assigned to few workers, which may help driver to release the +reserved idle workers when dynamic allocation(spark.dynamicAllocation.enabled) is enabled. +The balanced task assigner tries to assign tasks across workers in a balance way (allocating +tasks to workers with most free cores). --- End diff -- `most` -> `the most`
[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/15541#discussion_r84591178 --- Diff: docs/configuration.md --- @@ -1342,6 +1342,20 @@ Apart from these, the following properties are also available, and may be useful Should be greater than or equal to 1. Number of allowed retries = this value - 1. + + spark.scheduler.taskAssigner + roundrobin + +The strategy of how to allocate tasks among workers with free cores. There are three task +assigners (roundrobin, packed, and balanced) are supported currently. By default, roundrobin +with randomness is used, which tries to allocate task to workers with available cores in +roundrobin manner. The packed task assigner tries to allocate tasks to workers with the least +free cores, resulting in tasks assigned to few workers, which may help driver to release the --- End diff -- `few` -> `fewer`
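The packed and balanced assigners described in the documentation above differ only in which worker is offered next: the one with the fewest free cores that can still fit a task, or the one with the most. The diff imports `scala.collection.mutable.PriorityQueue`, so a heap is a natural fit. The following is a hypothetical illustration of that idea, and of how `offerAccepted` can re-insert a worker after its free cores change. It is not the PR's implementation. `Slot`, `HeapAssignerSketch` and `HeapDemo` are made-up names, and locality is ignored.

```scala
import scala.collection.mutable

// Hypothetical minimal worker state standing in for OfferState.
final class Slot(val id: String, var free: Int)

// One round of a heap-driven assigner. mostFreeFirst = true gives "balanced";
// false gives "packed" (fewest free cores that can still fit a task).
final class HeapAssignerSketch(slots: Seq[Slot], cpuPerTask: Int, mostFreeFirst: Boolean) {
  private val byFree: Ordering[Slot] = Ordering.by[Slot, Int](_.free)
  // mutable.PriorityQueue dequeues the maximum element under its Ordering.
  private val queue =
    new mutable.PriorityQueue[Slot]()(if (mostFreeFirst) byFree else byFree.reverse)
  private var current: Slot = null

  def init(): Unit = {
    queue.clear()
    slots.filter(_.free >= cpuPerTask).foreach(s => queue += s)
  }
  def hasNext: Boolean = queue.nonEmpty
  // The slot leaves the heap here, so the caller may change `free` safely.
  def next(): Slot = { current = queue.dequeue(); current }
  // Re-insert only if it took a task and still fits another; its priority is
  // recomputed on insertion. A rejected slot is dropped for the rest of the round.
  def offerAccepted(assigned: Boolean): Unit =
    if (assigned && current.free >= cpuPerTask) queue += current
}

object HeapDemo {
  // Offers up to numTasks tasks in one round; returns the task count per slot id.
  def oneRound(slots: Seq[Slot], cpuPerTask: Int, numTasks: Int,
               mostFreeFirst: Boolean): Map[String, Int] = {
    val a = new HeapAssignerSketch(slots, cpuPerTask, mostFreeFirst)
    val placed = mutable.Map.empty[String, Int]
    var remaining = numTasks
    a.init()
    while (a.hasNext) {
      val s = a.next()
      val assigned = remaining > 0 && s.free >= cpuPerTask
      if (assigned) {
        s.free -= cpuPerTask
        placed(s.id) = placed.getOrElse(s.id, 0) + 1
        remaining -= 1
      }
      a.offerAccepted(assigned)
    }
    placed.toMap
  }
}
```

Consider three idle 4-core workers and four single-core tasks. Balanced spreads the tasks 2/1/1. Packed keeps returning the same worker until it is full, so the other two stay idle. That is the behavior the documentation links to releasing idle workers under dynamic allocation.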
[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/15541#discussion_r84591169 --- Diff: docs/configuration.md --- @@ -1342,6 +1342,20 @@ Apart from these, the following properties are also available, and may be useful Should be greater than or equal to 1. Number of allowed retries = this value - 1. + + spark.scheduler.taskAssigner + roundrobin + +The strategy of how to allocate tasks among workers with free cores. There are three task +assigners (roundrobin, packed, and balanced) are supported currently. By default, roundrobin +with randomness is used, which tries to allocate task to workers with available cores in --- End diff -- `is used, which tries to allocate task` -> `is used to allocate tasks`
[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/15541#discussion_r84591160 --- Diff: docs/configuration.md --- @@ -1342,6 +1342,20 @@ Apart from these, the following properties are also available, and may be useful Should be greater than or equal to 1. Number of allowed retries = this value - 1. + + spark.scheduler.taskAssigner + roundrobin + +The strategy of how to allocate tasks among workers with free cores. There are three task +assigners (roundrobin, packed, and balanced) are supported currently. By default, roundrobin +with randomness is used, which tries to allocate task to workers with available cores in +roundrobin manner. The packed task assigner tries to allocate tasks to workers with the least --- End diff -- `in roundrobin manner` -> `in a round-robin manner `
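The effect of round-robin over a randomly shuffled worker order can be shown with a toy that deals tasks one per worker per pass. This is a simplification. `RoundRobinSketch.deal` is hypothetical and ignores core limits and locality. It shows only that per-worker task counts differ by at most one, while the shuffle decides which workers receive the extra tasks.

```scala
import scala.util.Random

object RoundRobinSketch {
  // Hypothetical: deal numTasks tasks over the workers in a random order,
  // one task per worker per pass, ignoring core limits.
  def deal(workers: Seq[String], numTasks: Int, rng: Random): Map[String, Int] = {
    require(workers.nonEmpty, "need at least one worker")
    val order = rng.shuffle(workers).toVector
    (0 until numTasks)
      .map(t => order(t % order.size))
      .groupBy(identity)
      .map { case (w, ts) => w -> ts.size }
  }
}
```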
[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/15541#discussion_r84591137 --- Diff: docs/configuration.md --- @@ -1342,6 +1342,20 @@ Apart from these, the following properties are also available, and may be useful Should be greater than or equal to 1. Number of allowed retries = this value - 1. + + spark.scheduler.taskAssigner + roundrobin + +The strategy of how to allocate tasks among workers with free cores. There are three task --- End diff -- `There are three task` -> `Three task`
[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/15541#discussion_r84591093 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala --- @@ -305,12 +309,8 @@ private[spark] class TaskSchedulerImpl( hostsByRack.getOrElseUpdate(rack, new HashSet[String]()) += o.host } } +taskAssigner.construct(offers) -// Randomly shuffle offers to avoid always placing tasks on the same set of workers. --- End diff -- Could we add this comment to [the code in `construct`](https://github.com/zhzhan/spark/blob/dd2b2077430bbb07047e928d20c1ad8fe940827a/core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala#L79)?
[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/15541#discussion_r84591073 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala --- @@ -250,24 +251,26 @@ private[spark] class TaskSchedulerImpl( private def resourceOfferSingleTaskSet( taskSet: TaskSetManager, maxLocality: TaskLocality, - shuffledOffers: Seq[WorkerOffer], - availableCpus: Array[Int], - tasks: IndexedSeq[ArrayBuffer[TaskDescription]]) : Boolean = { + taskAssigner: TaskAssigner) : Boolean = { var launchedTask = false -for (i <- 0 until shuffledOffers.size) { - val execId = shuffledOffers(i).executorId - val host = shuffledOffers(i).host - if (availableCpus(i) >= CPUS_PER_TASK) { +taskAssigner.init() +while (taskAssigner.hasNext) { + var assigned = false + val currentOffer = taskAssigner.next() + val execId = currentOffer.workOffer.executorId + val host = currentOffer.workOffer.host + if (currentOffer.coresAvailable >= CPUS_PER_TASK) { try { for (task <- taskSet.resourceOffer(execId, host, maxLocality)) { -tasks(i) += task +currentOffer.tasks += task val tid = task.taskId taskIdToTaskSetManager(tid) = taskSet taskIdToExecutorId(tid) = execId executorIdToTaskCount(execId) += 1 -availableCpus(i) -= CPUS_PER_TASK -assert(availableCpus(i) >= 0) +currentOffer.coresAvailable -= CPUS_PER_TASK +assert(currentOffer.coresAvailable >= 0) --- End diff -- Could we add a function in `class OfferState` for including the following three lines of code?
```Scala
currentOffer.tasks += task
currentOffer.coresAvailable -= CPUS_PER_TASK
assert(currentOffer.coresAvailable >= 0)
```
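The following is a minimal sketch of the helper the reviewer asks for, modeled on the `assignTask` method in zhzhan's later revision of `OfferState` quoted earlier in this thread. `TaskDesc` and `OfferStateSketch` are hypothetical stand-ins, because the real `TaskDescription` and `OfferState` are private to `org.apache.spark.scheduler`.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for TaskDescription.
final case class TaskDesc(taskId: Long)

// OfferState with the suggested helper: the three bookkeeping lines
// (append the task, deduct cores, check the invariant) live in one method.
final class OfferStateSketch(val executorId: String, cores: Int) {
  var coresAvailable: Int = cores
  val tasks = new ArrayBuffer[TaskDesc](cores)

  def assignTask(task: TaskDesc, cpu: Int): Unit = {
    tasks += task
    coresAvailable -= cpu
    // As in the diff, the check runs after the update, so a failure means
    // the caller offered a task to a worker without enough free cores.
    assert(coresAvailable >= 0, s"over-committed executor $executorId")
  }
}
```

With the helper, the loop body in `resourceOfferSingleTaskSet` reduces to a single call such as `currentOffer.assignTask(task, CPUS_PER_TASK)`.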
[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/15541#discussion_r84590978 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala --- @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import scala.collection.mutable.ArrayBuffer +import scala.collection.mutable.PriorityQueue +import scala.util.Random + +import org.apache.spark.internal.{config, Logging} +import org.apache.spark.SparkConf +import org.apache.spark.util.Utils + +/** Tracks the current state of the workers with available cores and assigned task list. */ +private[scheduler] class OfferState(val workOffer: WorkerOffer) { + /** The current remaining cores that can be allocated to tasks. */ + var coresAvailable: Int = workOffer.cores + /** The list of tasks that are assigned to this WorkerOffer. */ + val tasks = new ArrayBuffer[TaskDescription](coresAvailable) +} + +/** + * TaskAssigner is the base class for all task assigner implementations, and can be + * extended to implement different task scheduling algorithms. 
+ * Together with [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]], TaskAssigner + * is used to assign tasks to workers with available cores. Internally, when TaskScheduler + * performs task assignment given available workers, it first sorts the candidate tasksets, + * and then for each taskset, it takes a number of rounds to request TaskAssigner for task + * assignment with different locality restrictions until there is either no qualified + * workers or no valid tasks to be assigned. + * + * TaskAssigner is responsible to maintain the worker availability state and task assignment + * information. The contract between [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]] + * and TaskAssigner is as follows. + * + * First, TaskScheduler invokes construct() of TaskAssigner to initialize the its internal + * worker states at the beginning of resource offering. + * + * Second, before each round of task assignment for a taskset, TaskScheduler invokes the init() + * of TaskAssigner to initialize the data structure for the round. + * + * Third, when performing real task assignment, hasNext()/getNext() is used by TaskScheduler + * to check the worker availability and retrieve current offering from TaskAssigner. + * + * Fourth, then offerAccepted is used by TaskScheduler to notify the TaskAssigner so that + * TaskAssigner can decide whether the current offer is valid or not for the next request. + * + * Fifth, After task assignment is done, TaskScheduler invokes the tasks() to + * retrieve all the task assignment information. + */ + +private[scheduler] abstract class TaskAssigner { + protected var offer: Seq[OfferState] = _ + protected var cpuPerTask = 1 + + protected def withCpuPerTask(cpuPerTask: Int): Unit = { +this.cpuPerTask = cpuPerTask + } + + /** The final assigned offer returned to TaskScheduler. 
*/ + final def tasks: Seq[ArrayBuffer[TaskDescription]] = offer.map(_.tasks) + + /** Invoked at the beginning of resource offering to construct the offer with the workoffers. */ + def construct(workOffer: Seq[WorkerOffer]): Unit = { +offer = Random.shuffle(workOffer.map(o => new OfferState(o))) + } + + /** Invoked at each round of Taskset assignment to initialize the internal structure. */ + def init(): Unit + + /** + * Tests whether there is offer available to be used inside of one round of Taskset assignment. + * @return `true` if a subsequent call to `next` will yield an element, + * `false` otherwise. + */ + def hasNext: Boolean + + /** + * Produces next worker offer based on the task assignment strategy. + * @return the next available offer, if `hasNext` is `true`, + * undefi
[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/15541#discussion_r84590948 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala --- @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import scala.collection.mutable.ArrayBuffer +import scala.collection.mutable.PriorityQueue +import scala.util.Random + +import org.apache.spark.internal.{config, Logging} +import org.apache.spark.SparkConf +import org.apache.spark.util.Utils + +/** Tracks the current state of the workers with available cores and assigned task list. */ +private[scheduler] class OfferState(val workOffer: WorkerOffer) { + /** The current remaining cores that can be allocated to tasks. */ + var coresAvailable: Int = workOffer.cores + /** The list of tasks that are assigned to this WorkerOffer. */ + val tasks = new ArrayBuffer[TaskDescription](coresAvailable) +} + +/** + * TaskAssigner is the base class for all task assigner implementations, and can be + * extended to implement different task scheduling algorithms. 
+ * Together with [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]], TaskAssigner + * is used to assign tasks to workers with available cores. Internally, when TaskScheduler + * performs task assignment given available workers, it first sorts the candidate tasksets, + * and then for each taskset, it takes a number of rounds to request TaskAssigner for task + * assignment with different locality restrictions until there is either no qualified + * workers or no valid tasks to be assigned. + * + * TaskAssigner is responsible to maintain the worker availability state and task assignment + * information. The contract between [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]] + * and TaskAssigner is as follows. + * + * First, TaskScheduler invokes construct() of TaskAssigner to initialize the its internal + * worker states at the beginning of resource offering. + * + * Second, before each round of task assignment for a taskset, TaskScheduler invokes the init() + * of TaskAssigner to initialize the data structure for the round. + * + * Third, when performing real task assignment, hasNext()/getNext() is used by TaskScheduler + * to check the worker availability and retrieve current offering from TaskAssigner. + * + * Fourth, then offerAccepted is used by TaskScheduler to notify the TaskAssigner so that + * TaskAssigner can decide whether the current offer is valid or not for the next request. + * + * Fifth, After task assignment is done, TaskScheduler invokes the tasks() to + * retrieve all the task assignment information. + */ + +private[scheduler] abstract class TaskAssigner { + protected var offer: Seq[OfferState] = _ + protected var cpuPerTask = 1 + + protected def withCpuPerTask(cpuPerTask: Int): Unit = { +this.cpuPerTask = cpuPerTask + } + + /** The final assigned offer returned to TaskScheduler. 
*/ + final def tasks: Seq[ArrayBuffer[TaskDescription]] = offer.map(_.tasks) + + /** Invoked at the beginning of resource offering to construct the offer with the workoffers. */ + def construct(workOffer: Seq[WorkerOffer]): Unit = { +offer = Random.shuffle(workOffer.map(o => new OfferState(o))) + } + + /** Invoked at each round of Taskset assignment to initialize the internal structure. */ + def init(): Unit + + /** + * Tests whether there is offer available to be used inside of one round of Taskset assignment. + * @return `true` if a subsequent call to `next` will yield an element, + * `false` otherwise. + */ + def hasNext: Boolean + + /** + * Produces next worker offer based on the task assignment strategy. + * @return the next available offer, if `hasNext` is `true`, + * undefi
[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/15541#discussion_r84590835 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala --- @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import scala.collection.mutable.ArrayBuffer +import scala.collection.mutable.PriorityQueue +import scala.util.Random + +import org.apache.spark.internal.{config, Logging} +import org.apache.spark.SparkConf +import org.apache.spark.util.Utils + +/** Tracks the current state of the workers with available cores and assigned task list. */ +private[scheduler] class OfferState(val workOffer: WorkerOffer) { + /** The current remaining cores that can be allocated to tasks. */ + var coresAvailable: Int = workOffer.cores + /** The list of tasks that are assigned to this WorkerOffer. */ + val tasks = new ArrayBuffer[TaskDescription](coresAvailable) +} + +/** + * TaskAssigner is the base class for all task assigner implementations, and can be + * extended to implement different task scheduling algorithms. 
+ * Together with [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]], TaskAssigner + * is used to assign tasks to workers with available cores. Internally, when TaskScheduler + * performs task assignment given available workers, it first sorts the candidate tasksets, + * and then for each taskset, it takes a number of rounds to request TaskAssigner for task + * assignment with different locality restrictions until there is either no qualified + * workers or no valid tasks to be assigned. + * + * TaskAssigner is responsible to maintain the worker availability state and task assignment + * information. The contract between [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]] + * and TaskAssigner is as follows. + * + * First, TaskScheduler invokes construct() of TaskAssigner to initialize the its internal + * worker states at the beginning of resource offering. + * + * Second, before each round of task assignment for a taskset, TaskScheduler invokes the init() + * of TaskAssigner to initialize the data structure for the round. + * + * Third, when performing real task assignment, hasNext()/getNext() is used by TaskScheduler + * to check the worker availability and retrieve current offering from TaskAssigner. + * + * Fourth, then offerAccepted is used by TaskScheduler to notify the TaskAssigner so that + * TaskAssigner can decide whether the current offer is valid or not for the next request. + * + * Fifth, After task assignment is done, TaskScheduler invokes the tasks() to + * retrieve all the task assignment information. + */ + +private[scheduler] abstract class TaskAssigner { + protected var offer: Seq[OfferState] = _ + protected var cpuPerTask = 1 + + protected def withCpuPerTask(cpuPerTask: Int): Unit = { +this.cpuPerTask = cpuPerTask + } + + /** The final assigned offer returned to TaskScheduler. */ --- End diff -- Here, the implementation of this function does not guarantee the offer assignment is final or not. 
Thus, maybe we can change the description to `The currently assigned offers`
[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/15541#discussion_r84590773 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala --- @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import scala.collection.mutable.ArrayBuffer +import scala.collection.mutable.PriorityQueue +import scala.util.Random + +import org.apache.spark.internal.{config, Logging} +import org.apache.spark.SparkConf +import org.apache.spark.util.Utils + +/** Tracks the current state of the workers with available cores and assigned task list. */ +private[scheduler] class OfferState(val workOffer: WorkerOffer) { + /** The current remaining cores that can be allocated to tasks. */ + var coresAvailable: Int = workOffer.cores + /** The list of tasks that are assigned to this WorkerOffer. */ + val tasks = new ArrayBuffer[TaskDescription](coresAvailable) +} + +/** + * TaskAssigner is the base class for all task assigner implementations, and can be + * extended to implement different task scheduling algorithms. 
+ * Together with [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]], TaskAssigner + * is used to assign tasks to workers with available cores. Internally, when TaskScheduler + * performs task assignment given available workers, it first sorts the candidate tasksets, + * and then for each taskset, it takes a number of rounds to request TaskAssigner for task + * assignment with different locality restrictions until there is either no qualified + * workers or no valid tasks to be assigned. + * + * TaskAssigner is responsible to maintain the worker availability state and task assignment + * information. The contract between [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]] + * and TaskAssigner is as follows. + * + * First, TaskScheduler invokes construct() of TaskAssigner to initialize the its internal + * worker states at the beginning of resource offering. + * + * Second, before each round of task assignment for a taskset, TaskScheduler invokes the init() + * of TaskAssigner to initialize the data structure for the round. + * + * Third, when performing real task assignment, hasNext()/getNext() is used by TaskScheduler + * to check the worker availability and retrieve current offering from TaskAssigner. + * + * Fourth, then offerAccepted is used by TaskScheduler to notify the TaskAssigner so that + * TaskAssigner can decide whether the current offer is valid or not for the next request. + * + * Fifth, After task assignment is done, TaskScheduler invokes the tasks() to --- End diff -- `After` -> `after`
[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/15541#discussion_r84590747 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala --- @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import scala.collection.mutable.ArrayBuffer +import scala.collection.mutable.PriorityQueue +import scala.util.Random + +import org.apache.spark.internal.{config, Logging} +import org.apache.spark.SparkConf +import org.apache.spark.util.Utils + +/** Tracks the current state of the workers with available cores and assigned task list. */ +private[scheduler] class OfferState(val workOffer: WorkerOffer) { + /** The current remaining cores that can be allocated to tasks. */ + var coresAvailable: Int = workOffer.cores + /** The list of tasks that are assigned to this WorkerOffer. */ + val tasks = new ArrayBuffer[TaskDescription](coresAvailable) +} + +/** + * TaskAssigner is the base class for all task assigner implementations, and can be + * extended to implement different task scheduling algorithms. 
+ * Together with [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]], TaskAssigner + * is used to assign tasks to workers with available cores. Internally, when TaskScheduler + * performs task assignment given available workers, it first sorts the candidate tasksets, + * and then for each taskset, it takes a number of rounds to request TaskAssigner for task + * assignment with different locality restrictions until there is either no qualified + * workers or no valid tasks to be assigned. + * + * TaskAssigner is responsible to maintain the worker availability state and task assignment + * information. The contract between [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]] + * and TaskAssigner is as follows. + * + * First, TaskScheduler invokes construct() of TaskAssigner to initialize the its internal + * worker states at the beginning of resource offering. + * + * Second, before each round of task assignment for a taskset, TaskScheduler invokes the init() + * of TaskAssigner to initialize the data structure for the round. + * + * Third, when performing real task assignment, hasNext()/getNext() is used by TaskScheduler --- End diff -- `getNext()` -> `next()`
[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/15541#discussion_r84590726 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala --- @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import scala.collection.mutable.ArrayBuffer +import scala.collection.mutable.PriorityQueue +import scala.util.Random + +import org.apache.spark.internal.{config, Logging} +import org.apache.spark.SparkConf +import org.apache.spark.util.Utils + +/** Tracks the current state of the workers with available cores and assigned task list. */ +private[scheduler] class OfferState(val workOffer: WorkerOffer) { + /** The current remaining cores that can be allocated to tasks. */ + var coresAvailable: Int = workOffer.cores + /** The list of tasks that are assigned to this WorkerOffer. */ + val tasks = new ArrayBuffer[TaskDescription](coresAvailable) +} + +/** + * TaskAssigner is the base class for all task assigner implementations, and can be + * extended to implement different task scheduling algorithms. 
+ * Together with [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]], TaskAssigner + * is used to assign tasks to workers with available cores. Internally, when TaskScheduler + * performs task assignment given available workers, it first sorts the candidate tasksets, + * and then for each taskset, it takes a number of rounds to request TaskAssigner for task + * assignment with different locality restrictions until there is either no qualified + * workers or no valid tasks to be assigned. + * + * TaskAssigner is responsible to maintain the worker availability state and task assignment + * information. The contract between [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]] + * and TaskAssigner is as follows. + * + * First, TaskScheduler invokes construct() of TaskAssigner to initialize the its internal + * worker states at the beginning of resource offering. + * + * Second, before each round of task assignment for a taskset, TaskScheduler invokes the init() + * of TaskAssigner to initialize the data structure for the round. + * + * Third, when performing real task assignment, hasNext()/getNext() is used by TaskScheduler --- End diff -- `hasNext()` -> `hasNext`
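The two renames suggested here (`getNext()` -> `next()` and `hasNext()` -> `hasNext`) give the contract the same shape as `scala.collection.Iterator`: a side-effect-free query is declared without parentheses, and a call that advances state keeps `()`. A minimal sketch of that shape, assuming hypothetical names (`Slot`, `OfferCursor`, `RoundRobinCursor`) that stand in for the PR's `OfferState`/`TaskAssigner` and are not its actual classes:

```scala
// Hypothetical stand-in for OfferState: an executor id and its remaining cores.
final class Slot(val executorId: String, var coresAvailable: Int)

// Same shape as scala.collection.Iterator: `hasNext` is a pure query, so it is
// declared without parentheses; `next()` advances internal state, so it keeps them.
abstract class OfferCursor {
  def hasNext: Boolean
  def next(): Slot
}

// Cycles through slots that still have at least `cpuPerTask` free cores.
// The caller is expected to decrement `coresAvailable` after placing a task.
final class RoundRobinCursor(slots: IndexedSeq[Slot], cpuPerTask: Int) extends OfferCursor {
  private var idx = 0
  private def fits(s: Slot): Boolean = s.coresAvailable >= cpuPerTask

  def hasNext: Boolean = slots.exists(fits)

  def next(): Slot = {
    if (!hasNext) throw new NoSuchElementException("no worker has enough free cores")
    // Skip full slots; hasNext guarantees at least one slot fits.
    while (!fits(slots(idx))) idx = (idx + 1) % slots.size
    val chosen = slots(idx)
    idx = (idx + 1) % slots.size
    chosen
  }
}
```

Calling `hasNext` repeatedly without an intervening `next()` always gives the same answer, which is what the parameterless form signals to callers.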
[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/15541#discussion_r84590702 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala --- @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import scala.collection.mutable.ArrayBuffer +import scala.collection.mutable.PriorityQueue +import scala.util.Random + +import org.apache.spark.internal.{config, Logging} +import org.apache.spark.SparkConf +import org.apache.spark.util.Utils + +/** Tracks the current state of the workers with available cores and assigned task list. */ +private[scheduler] class OfferState(val workOffer: WorkerOffer) { + /** The current remaining cores that can be allocated to tasks. */ + var coresAvailable: Int = workOffer.cores + /** The list of tasks that are assigned to this WorkerOffer. */ + val tasks = new ArrayBuffer[TaskDescription](coresAvailable) +} + +/** + * TaskAssigner is the base class for all task assigner implementations, and can be + * extended to implement different task scheduling algorithms. 
+ * Together with [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]], TaskAssigner + * is used to assign tasks to workers with available cores. Internally, when TaskScheduler + * performs task assignment given available workers, it first sorts the candidate tasksets, + * and then for each taskset, it takes a number of rounds to request TaskAssigner for task + * assignment with different locality restrictions until there is either no qualified + * workers or no valid tasks to be assigned. + * + * TaskAssigner is responsible to maintain the worker availability state and task assignment + * information. The contract between [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]] + * and TaskAssigner is as follows. + * + * First, TaskScheduler invokes construct() of TaskAssigner to initialize the its internal + * worker states at the beginning of resource offering. + * + * Second, before each round of task assignment for a taskset, TaskScheduler invokes the init() + * of TaskAssigner to initialize the data structure for the round. + * + * Third, when performing real task assignment, hasNext()/getNext() is used by TaskScheduler + * to check the worker availability and retrieve current offering from TaskAssigner. + * + * Fourth, then offerAccepted is used by TaskScheduler to notify the TaskAssigner so that + * TaskAssigner can decide whether the current offer is valid or not for the next request. + * + * Fifth, After task assignment is done, TaskScheduler invokes the tasks() to + * retrieve all the task assignment information. + */ + +private[scheduler] abstract class TaskAssigner { --- End diff -- `abstract class` -> `sealed abstract class`
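Marking the base class `sealed` limits its direct subclasses to the file that declares it, so the set of assigners is closed and the compiler can warn when a `match` over them misses a case. A minimal sketch of that effect, assuming a hypothetical `AssignerKind` hierarchy and `Assigners` helper keyed on the three values the PR documents for `spark.scheduler.taskAssigner` (`roundrobin`, `packed`, `balanced`); none of these names come from the PR itself:

```scala
import java.util.Locale

// Hypothetical closed hierarchy: no other file can add a subclass.
sealed abstract class AssignerKind
case object RoundRobinKind extends AssignerKind
case object PackedKind extends AssignerKind
case object BalancedKind extends AssignerKind

object Assigners {
  // Parses a config value; unknown names are rejected instead of silently defaulting.
  def fromConf(value: String): AssignerKind = value.trim.toLowerCase(Locale.ROOT) match {
    case "roundrobin" => RoundRobinKind
    case "packed"     => PackedKind
    case "balanced"   => BalancedKind
    case other        => throw new IllegalArgumentException(s"Unsupported task assigner: $other")
  }

  // Because AssignerKind is sealed, deleting any case below makes scalac
  // report that the match may not be exhaustive.
  def confName(kind: AssignerKind): String = kind match {
    case RoundRobinKind => "roundrobin"
    case PackedKind     => "packed"
    case BalancedKind   => "balanced"
  }
}
```

With a plain `abstract class`, the compiler cannot perform that exhaustiveness check, because a subclass could be defined anywhere in the package.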
[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/15541#discussion_r84590664 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala --- @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import scala.collection.mutable.ArrayBuffer +import scala.collection.mutable.PriorityQueue +import scala.util.Random + +import org.apache.spark.internal.{config, Logging} +import org.apache.spark.SparkConf +import org.apache.spark.util.Utils + +/** Tracks the current state of the workers with available cores and assigned task list. */ +private[scheduler] class OfferState(val workOffer: WorkerOffer) { + /** The current remaining cores that can be allocated to tasks. */ + var coresAvailable: Int = workOffer.cores + /** The list of tasks that are assigned to this WorkerOffer. */ + val tasks = new ArrayBuffer[TaskDescription](coresAvailable) +} + +/** + * TaskAssigner is the base class for all task assigner implementations, and can be + * extended to implement different task scheduling algorithms. 
+ * Together with [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]], TaskAssigner + * is used to assign tasks to workers with available cores. Internally, when TaskScheduler + * performs task assignment given available workers, it first sorts the candidate tasksets, + * and then for each taskset, it takes a number of rounds to request TaskAssigner for task --- End diff -- `a number of` -> `multiple`
[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...
Github user wangmiao1981 commented on a diff in the pull request: https://github.com/apache/spark/pull/15541#discussion_r84424097 --- Diff: docs/configuration.md --- @@ -1342,6 +1342,20 @@ Apart from these, the following properties are also available, and may be useful Should be greater than or equal to 1. Number of allowed retries = this value - 1. + + spark.scheduler.taskAssigner + roundrobin + +The strategy of how to allocate tasks among workers with free cores. There are three task +assigners (roundrobin, packed, and balanced) are supported currently. By default, roundrobin +with randomness is used, which tries to allocate task to workers with available cores in +roundrobin manner.The packed task assigner tries to allocate tasks to workers with the least --- End diff -- Missing space between `.` and `The`.
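The documented strategies differ mainly in which worker is offered the next task. The quoted text breaks off after "workers with the least", so the following sketch rests on two assumptions. First, it reads `packed` as "fewest free cores first". Second, it reads `balanced` as "most free cores first". `roundrobin` is not modeled. The sketch uses `scala.collection.mutable.PriorityQueue`, which the PR's imports include. `Worker` and `PickOrder.assign` are hypothetical names, and each task is assumed to need one core:

```scala
import scala.collection.mutable.PriorityQueue

final case class Worker(id: String, freeCores: Int)

object PickOrder {
  // Assigns `numTasks` single-core tasks and returns the chosen worker id per task.
  // packed = true:  always take the worker with the FEWEST free cores (assumption).
  // packed = false: always take the worker with the MOST free cores (assumption).
  def assign(workers: Seq[Worker], numTasks: Int, packed: Boolean): List[String] = {
    val byCores: Ordering[Worker] = Ordering.by[Worker, Int](_.freeCores)
    // PriorityQueue dequeues the maximum element, so reverse the ordering for "packed".
    val queue = PriorityQueue.empty[Worker](if (packed) byCores.reverse else byCores)
    queue ++= workers.filter(_.freeCores > 0)
    val picks = List.newBuilder[String]
    var remaining = numTasks
    while (remaining > 0 && queue.nonEmpty) {
      val w = queue.dequeue()
      picks += w.id
      remaining -= 1
      // Put the worker back with one core fewer if it still has capacity.
      if (w.freeCores > 1) queue.enqueue(w.copy(freeCores = w.freeCores - 1))
    }
    picks.result()
  }
}
```

For workers `a` (1 core), `b` (4), and `c` (2), the packed order fills `a` and then `c` before it touches `b`. This tends to leave whole workers idle. The balanced order draws from `b` first. `PriorityQueue` breaks ties in no guaranteed order, so a real assigner would likely need a secondary key if it must be deterministic.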
[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...
Github user zhzhan commented on a diff in the pull request: https://github.com/apache/spark/pull/15541#discussion_r84424034 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala --- @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import scala.collection.mutable.ArrayBuffer +import scala.collection.mutable.PriorityQueue +import scala.util.Random + +import org.apache.spark.internal.{config, Logging} +import org.apache.spark.SparkConf +import org.apache.spark.util.Utils + +/** Tracks the current state of the workers with available cores and assigned task list. */ +class OfferState(val workOffer: WorkerOffer) { + /** The current remaining cores that can be allocated to tasks. */ + var coresAvailable: Int = workOffer.cores + /** The list of tasks that are assigned to this WorkerOffer. */ + val tasks = new ArrayBuffer[TaskDescription](coresAvailable) +} + +/** + * TaskAssigner is the base class for all task assigner implementations, and can be + * extended to implement different task scheduling algorithms. 
+ * Together with [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]], TaskAssigner + * is used to assign tasks to workers with available cores. Internally, when TaskScheduler + * perform task assignment given available workers, it first sorts the candidate tasksets, + * and then for each taskset, it takes a number of rounds to request TaskAssigner for task + * assignment with different locality restrictions until there is either no qualified + * workers or no valid tasks to be assigned. + * + * TaskAssigner is responsible to maintain the worker availability state and task assignment + * information. The contract between [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]] + * and TaskAssigner is as follows. + * + * First, TaskScheduler invokes construct() of TaskAssigner to initialize the its internal + * worker states at the beginning of resource offering. + * + * Second, before each round of task assignment for a taskset, TaskScheduler invoke the init() + * of TaskAssigner to initialize the data structure for the round. + * + * Third, when performing real task assignment, hasNext()/getNext() is used by TaskScheduler + * to check the worker availability and retrieve current offering from TaskAssigner. + * + * Fourth, then offerAccepted is used by TaskScheduler to notify the TaskAssigner so that + * TaskAssigner can decide whether the current offer is valid or not for the next request. + * + * Fifth, After task assignment is done, TaskScheduler invokes the tasks() to + * retrieve all the task assignment information. + */ + +private[scheduler] abstract class TaskAssigner { + protected var offer: Seq[OfferState] = _ + protected var cpuPerTask = 1 + + protected def withCpuPerTask(cpuPerTask: Int): Unit = { +this.cpuPerTask = cpuPerTask + } + + /** The final assigned offer returned to TaskScheduler. 
*/ + final def tasks: Seq[ArrayBuffer[TaskDescription]] = offer.map(_.tasks) + + /** Invoked at the beginning of resource offering to construct the offer with the workoffers. */ + def construct(workOffer: Seq[WorkerOffer]): Unit = { +offer = Random.shuffle(workOffer.map(o => new OfferState(o))) + } + + /** Invoked at each round of Taskset assignment to initialize the internal structure. */ + def init(): Unit + + /** + * Tests Whether there is offer available to be used inside of one round of Taskset assignment. + * @return `true` if a subsequent call to `next` will yield an element, + * `false` otherwise. + */ + def hasNext: Boolean + + /** + * Produces next worker offer based on the task assignment strategy. + * @return the next available offer, if `hasNext` is `true`, + * undefined behavior otherwise.
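The five-step contract in the quoted doc comment is construct, init, hasNext/next, offerAccepted, and tasks. A toy driver can exercise all five steps end to end. This is a sketch under stated assumptions. `Offer`, `OfferSlot`, `ToyAssigner`, `RoundRobinToy`, and `ToyScheduler.runRound` are hypothetical names. Tasks are plain strings instead of `TaskDescription`. The `canRun` predicate stands in for a locality restriction. The iterator methods follow the reviewers' suggested `hasNext`/`next()` naming:

```scala
import scala.collection.mutable
import scala.util.Random

final case class Offer(executorId: String, cores: Int)

// Per-worker state, playing the role of OfferState in the diff.
final class OfferSlot(val offer: Offer) {
  var coresAvailable: Int = offer.cores
  val tasks = mutable.ArrayBuffer.empty[String]
}

abstract class ToyAssigner(val cpuPerTask: Int) {
  protected var slots: IndexedSeq[OfferSlot] = IndexedSeq.empty
  /** Step 1: build worker state once per resource offer (shuffled, as in the diff). */
  def construct(offers: Seq[Offer]): Unit =
    slots = Random.shuffle(offers.map(o => new OfferSlot(o))).toIndexedSeq
  /** Step 2: reset per-round state. */
  def init(): Unit
  /** Step 3: iterator-style access to the next candidate worker. */
  def hasNext: Boolean
  def next(): OfferSlot
  /** Step 4: feedback on whether the last offer received a task. */
  def offerAccepted(assigned: Boolean): Unit
  /** Step 5: final assignments, keyed by executor id here for readability. */
  final def tasks: Map[String, List[String]] =
    slots.map(s => s.offer.executorId -> s.tasks.toList).toMap
}

// Round-robin; a worker that gets no task is skipped for the rest of the round.
final class RoundRobinToy(cores: Int) extends ToyAssigner(cores) {
  private var idx = 0
  private val refused = mutable.Set.empty[Int]
  private def usable(i: Int): Boolean = !refused(i) && slots(i).coresAvailable >= cpuPerTask
  def init(): Unit = { idx = 0; refused.clear() }
  def hasNext: Boolean = slots.indices.exists(usable)
  def next(): OfferSlot = {
    while (!usable(idx)) idx = (idx + 1) % slots.size
    slots(idx)
  }
  def offerAccepted(assigned: Boolean): Unit = {
    if (!assigned) refused += idx
    idx = (idx + 1) % slots.size
  }
}

object ToyScheduler {
  /** One round: place pending tasks wherever `canRun(task, executorId)` allows. */
  def runRound(a: ToyAssigner, pending: mutable.Queue[String],
               canRun: (String, String) => Boolean): Unit = {
    a.init()
    while (a.hasNext && pending.nonEmpty) {
      val slot = a.next()
      pending.find(t => canRun(t, slot.offer.executorId)) match {
        case Some(t) =>
          pending.dequeueFirst(_ == t)
          slot.tasks += t
          slot.coresAvailable -= a.cpuPerTask
          a.offerAccepted(assigned = true)
        case None =>
          a.offerAccepted(assigned = false)
      }
    }
  }
}
```

Each loop iteration either consumes cores or marks a worker as refused for the round, so a round always terminates. This is the role the doc comment gives `offerAccepted`: it lets the assigner decide whether the current offer is still valid for the next request.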
[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...
Github user wangmiao1981 commented on a diff in the pull request: https://github.com/apache/spark/pull/15541#discussion_r84423237 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala --- @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import scala.collection.mutable.ArrayBuffer +import scala.collection.mutable.PriorityQueue +import scala.util.Random + +import org.apache.spark.internal.{config, Logging} +import org.apache.spark.SparkConf +import org.apache.spark.util.Utils + +/** Tracks the current state of the workers with available cores and assigned task list. */ +class OfferState(val workOffer: WorkerOffer) { + /** The current remaining cores that can be allocated to tasks. */ + var coresAvailable: Int = workOffer.cores + /** The list of tasks that are assigned to this WorkerOffer. */ + val tasks = new ArrayBuffer[TaskDescription](coresAvailable) +} + +/** + * TaskAssigner is the base class for all task assigner implementations, and can be + * extended to implement different task scheduling algorithms. 
+ * Together with [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]], TaskAssigner + * is used to assign tasks to workers with available cores. Internally, when TaskScheduler + * perform task assignment given available workers, it first sorts the candidate tasksets, + * and then for each taskset, it takes a number of rounds to request TaskAssigner for task + * assignment with different locality restrictions until there is either no qualified + * workers or no valid tasks to be assigned. + * + * TaskAssigner is responsible to maintain the worker availability state and task assignment + * information. The contract between [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]] + * and TaskAssigner is as follows. + * + * First, TaskScheduler invokes construct() of TaskAssigner to initialize the its internal + * worker states at the beginning of resource offering. + * + * Second, before each round of task assignment for a taskset, TaskScheduler invoke the init() + * of TaskAssigner to initialize the data structure for the round. + * + * Third, when performing real task assignment, hasNext()/getNext() is used by TaskScheduler + * to check the worker availability and retrieve current offering from TaskAssigner. + * + * Fourth, then offerAccepted is used by TaskScheduler to notify the TaskAssigner so that + * TaskAssigner can decide whether the current offer is valid or not for the next request. + * + * Fifth, After task assignment is done, TaskScheduler invokes the tasks() to + * retrieve all the task assignment information. + */ + +private[scheduler] abstract class TaskAssigner { + protected var offer: Seq[OfferState] = _ + protected var cpuPerTask = 1 + + protected def withCpuPerTask(cpuPerTask: Int): Unit = { +this.cpuPerTask = cpuPerTask + } + + /** The final assigned offer returned to TaskScheduler. 
*/ + final def tasks: Seq[ArrayBuffer[TaskDescription]] = offer.map(_.tasks) + + /** Invoked at the beginning of resource offering to construct the offer with the workoffers. */ + def construct(workOffer: Seq[WorkerOffer]): Unit = { +offer = Random.shuffle(workOffer.map(o => new OfferState(o))) + } + + /** Invoked at each round of Taskset assignment to initialize the internal structure. */ + def init(): Unit + + /** + * Tests Whether there is offer available to be used inside of one round of Taskset assignment. + * @return `true` if a subsequent call to `next` will yield an element, + * `false` otherwise. + */ + def hasNext: Boolean + + /** + * Produces next worker offer based on the task assignment strategy. + * @return the next available offer, if `hasNext` is `true`, + * undefined behavior othe
[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...
Github user wangmiao1981 commented on a diff in the pull request: https://github.com/apache/spark/pull/15541#discussion_r84422908 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala --- @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import scala.collection.mutable.ArrayBuffer +import scala.collection.mutable.PriorityQueue +import scala.util.Random + +import org.apache.spark.internal.{config, Logging} +import org.apache.spark.SparkConf +import org.apache.spark.util.Utils + +/** Tracks the current state of the workers with available cores and assigned task list. */ +class OfferState(val workOffer: WorkerOffer) { + /** The current remaining cores that can be allocated to tasks. */ + var coresAvailable: Int = workOffer.cores + /** The list of tasks that are assigned to this WorkerOffer. */ + val tasks = new ArrayBuffer[TaskDescription](coresAvailable) +} + +/** + * TaskAssigner is the base class for all task assigner implementations, and can be + * extended to implement different task scheduling algorithms. 
+ * Together with [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]], TaskAssigner + * is used to assign tasks to workers with available cores. Internally, when TaskScheduler + * perform task assignment given available workers, it first sorts the candidate tasksets, + * and then for each taskset, it takes a number of rounds to request TaskAssigner for task + * assignment with different locality restrictions until there is either no qualified + * workers or no valid tasks to be assigned. + * + * TaskAssigner is responsible to maintain the worker availability state and task assignment + * information. The contract between [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]] + * and TaskAssigner is as follows. + * + * First, TaskScheduler invokes construct() of TaskAssigner to initialize the its internal + * worker states at the beginning of resource offering. + * + * Second, before each round of task assignment for a taskset, TaskScheduler invoke the init() + * of TaskAssigner to initialize the data structure for the round. + * + * Third, when performing real task assignment, hasNext()/getNext() is used by TaskScheduler + * to check the worker availability and retrieve current offering from TaskAssigner. + * + * Fourth, then offerAccepted is used by TaskScheduler to notify the TaskAssigner so that + * TaskAssigner can decide whether the current offer is valid or not for the next request. + * + * Fifth, After task assignment is done, TaskScheduler invokes the tasks() to + * retrieve all the task assignment information. + */ + +private[scheduler] abstract class TaskAssigner { + protected var offer: Seq[OfferState] = _ + protected var cpuPerTask = 1 + + protected def withCpuPerTask(cpuPerTask: Int): Unit = { +this.cpuPerTask = cpuPerTask + } + + /** The final assigned offer returned to TaskScheduler. 
*/ + final def tasks: Seq[ArrayBuffer[TaskDescription]] = offer.map(_.tasks) + + /** Invoked at the beginning of resource offering to construct the offer with the workoffers. */ + def construct(workOffer: Seq[WorkerOffer]): Unit = { +offer = Random.shuffle(workOffer.map(o => new OfferState(o))) + } + + /** Invoked at each round of Taskset assignment to initialize the internal structure. */ + def init(): Unit + + /** + * Tests Whether there is offer available to be used inside of one round of Taskset assignment. + * @return `true` if a subsequent call to `next` will yield an element, + * `false` otherwise. + */ + def hasNext: Boolean + + /** + * Produces next worker offer based on the task assignment strategy. + * @return the next available offer, if `hasNext` is `true`, + * undefined behavior othe
[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...
Github user wangmiao1981 commented on a diff in the pull request: https://github.com/apache/spark/pull/15541#discussion_r84422647 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala --- @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import scala.collection.mutable.ArrayBuffer +import scala.collection.mutable.PriorityQueue +import scala.util.Random + +import org.apache.spark.internal.{config, Logging} +import org.apache.spark.SparkConf +import org.apache.spark.util.Utils + +/** Tracks the current state of the workers with available cores and assigned task list. */ +class OfferState(val workOffer: WorkerOffer) { + /** The current remaining cores that can be allocated to tasks. */ + var coresAvailable: Int = workOffer.cores + /** The list of tasks that are assigned to this WorkerOffer. */ + val tasks = new ArrayBuffer[TaskDescription](coresAvailable) +} + +/** + * TaskAssigner is the base class for all task assigner implementations, and can be + * extended to implement different task scheduling algorithms. 
+ * Together with [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]], TaskAssigner + * is used to assign tasks to workers with available cores. Internally, when TaskScheduler + * perform task assignment given available workers, it first sorts the candidate tasksets, + * and then for each taskset, it takes a number of rounds to request TaskAssigner for task + * assignment with different locality restrictions until there is either no qualified + * workers or no valid tasks to be assigned. + * + * TaskAssigner is responsible to maintain the worker availability state and task assignment + * information. The contract between [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]] + * and TaskAssigner is as follows. + * + * First, TaskScheduler invokes construct() of TaskAssigner to initialize the its internal + * worker states at the beginning of resource offering. + * + * Second, before each round of task assignment for a taskset, TaskScheduler invoke the init() + * of TaskAssigner to initialize the data structure for the round. + * + * Third, when performing real task assignment, hasNext()/getNext() is used by TaskScheduler + * to check the worker availability and retrieve current offering from TaskAssigner. + * + * Fourth, then offerAccepted is used by TaskScheduler to notify the TaskAssigner so that + * TaskAssigner can decide whether the current offer is valid or not for the next request. + * + * Fifth, After task assignment is done, TaskScheduler invokes the tasks() to + * retrieve all the task assignment information. + */ + +private[scheduler] abstract class TaskAssigner { + protected var offer: Seq[OfferState] = _ + protected var cpuPerTask = 1 + + protected def withCpuPerTask(cpuPerTask: Int): Unit = { +this.cpuPerTask = cpuPerTask + } + + /** The final assigned offer returned to TaskScheduler. 
*/ + final def tasks: Seq[ArrayBuffer[TaskDescription]] = offer.map(_.tasks) + + /** Invoked at the beginning of resource offering to construct the offer with the workoffers. */ + def construct(workOffer: Seq[WorkerOffer]): Unit = { +offer = Random.shuffle(workOffer.map(o => new OfferState(o))) + } + + /** Invoked at each round of Taskset assignment to initialize the internal structure. */ + def init(): Unit + + /** + * Tests Whether there is offer available to be used inside of one round of Taskset assignment. + * @return `true` if a subsequent call to `next` will yield an element, --- End diff -- `@return` is not aligned with the line above.
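One common Scaladoc layout aligns every continuation `*` under the first `*` of `/**`. It also indents a wrapped `@return` description so that it lines up with the text after the tag. A sketch of the two members in this hunk laid out that way. `OfferSource` and the `String` return type are hypothetical simplifications, and the wording follows the quoted doc comments:

```scala
trait OfferSource {
  /**
   * Tests whether an offer is available within the current round of taskset assignment.
   *
   * @return `true` if a subsequent call to `next` will yield an element,
   *         `false` otherwise.
   */
  def hasNext: Boolean

  /**
   * Produces the next worker offer according to the task assignment strategy.
   *
   * @return the next available offer if `hasNext` is `true`;
   *         undefined behavior otherwise.
   */
  def next(): String
}
```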
[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...
Github user wangmiao1981 commented on a diff in the pull request: https://github.com/apache/spark/pull/15541#discussion_r84422357 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala --- @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import scala.collection.mutable.ArrayBuffer +import scala.collection.mutable.PriorityQueue +import scala.util.Random + +import org.apache.spark.internal.{config, Logging} +import org.apache.spark.SparkConf +import org.apache.spark.util.Utils + +/** Tracks the current state of the workers with available cores and assigned task list. */ +class OfferState(val workOffer: WorkerOffer) { + /** The current remaining cores that can be allocated to tasks. */ + var coresAvailable: Int = workOffer.cores + /** The list of tasks that are assigned to this WorkerOffer. */ + val tasks = new ArrayBuffer[TaskDescription](coresAvailable) +} + +/** + * TaskAssigner is the base class for all task assigner implementations, and can be + * extended to implement different task scheduling algorithms. 
+ * Together with [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]], TaskAssigner + * is used to assign tasks to workers with available cores. Internally, when TaskScheduler + * perform task assignment given available workers, it first sorts the candidate tasksets, + * and then for each taskset, it takes a number of rounds to request TaskAssigner for task + * assignment with different locality restrictions until there is either no qualified + * workers or no valid tasks to be assigned. + * + * TaskAssigner is responsible to maintain the worker availability state and task assignment + * information. The contract between [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]] + * and TaskAssigner is as follows. + * + * First, TaskScheduler invokes construct() of TaskAssigner to initialize the its internal + * worker states at the beginning of resource offering. + * + * Second, before each round of task assignment for a taskset, TaskScheduler invoke the init() --- End diff -- `invoke` -> `invokes`
[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...
Github user wangmiao1981 commented on a diff in the pull request: https://github.com/apache/spark/pull/15541#discussion_r84422079 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala --- @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import scala.collection.mutable.ArrayBuffer +import scala.collection.mutable.PriorityQueue +import scala.util.Random + +import org.apache.spark.internal.{config, Logging} +import org.apache.spark.SparkConf +import org.apache.spark.util.Utils + +/** Tracks the current state of the workers with available cores and assigned task list. */ +class OfferState(val workOffer: WorkerOffer) { + /** The current remaining cores that can be allocated to tasks. */ + var coresAvailable: Int = workOffer.cores + /** The list of tasks that are assigned to this WorkerOffer. */ + val tasks = new ArrayBuffer[TaskDescription](coresAvailable) +} + +/** + * TaskAssigner is the base class for all task assigner implementations, and can be + * extended to implement different task scheduling algorithms. 
+ * Together with [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]], TaskAssigner + * is used to assign tasks to workers with available cores. Internally, when TaskScheduler + * perform task assignment given available workers, it first sorts the candidate tasksets, --- End diff -- `perform` -> `performs`
[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...
Github user wangmiao1981 commented on a diff in the pull request: https://github.com/apache/spark/pull/15541#discussion_r84421949 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala --- @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import scala.collection.mutable.ArrayBuffer +import scala.collection.mutable.PriorityQueue +import scala.util.Random + +import org.apache.spark.internal.{config, Logging} +import org.apache.spark.SparkConf +import org.apache.spark.util.Utils + +/** Tracks the current state of the workers with available cores and assigned task list. */ +class OfferState(val workOffer: WorkerOffer) { --- End diff -- Is this class private to `scheduler`?
[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/15541#discussion_r84225583 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala --- @@ -0,0 +1,226 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import scala.collection.mutable.ArrayBuffer +import scala.collection.mutable.PriorityQueue +import scala.util.Random + +import org.apache.spark.internal.{config, Logging} +import org.apache.spark.SparkConf +import org.apache.spark.util.Utils + +/** Tracks the current state of the workers with available cores and assigned task list. */ +class OfferState(val workOffer: WorkerOffer) { + /** The current remaining cores that can be allocated to tasks. */ + var coresAvailable: Int = workOffer.cores + /** The list of tasks that are assigned to this WorkerOffer. */ + val tasks = new ArrayBuffer[TaskDescription](coresAvailable) +} + +/** + * TaskAssigner is the base class for all task assigner implementations, and can be + * extended to implement different task scheduling algorithms. 
+ * Together with [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]], TaskAssigner + * is used to assign tasks to workers with available cores. Internally, when TaskScheduler + * perform task assignment given available workers, it first sorts the candidate tasksets, + * and then for each taskset, it takes a number of rounds to request TaskAssigner for task + * assignment with different locality restrictions until there is either no qualified + * workers or no valid tasks to be assigned. + * + * TaskAssigner is responsible to maintain the worker availability state and task assignment + * information. The contract between [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]] + * and TaskAssigner is as follows. + * + * First, TaskScheduler invokes construct() of TaskAssigner to initialize the its internal + * worker states at the beginning of resource offering. + * + * Second, before each round of task assignment for a taskset, TaskScheduler invoke the init() + * of TaskAssigner to initialize the data structure for the round. + * + * Third, when performing real task assignment, hasNext()/getNext() is used by TaskScheduler + * to check the worker availability and retrieve current offering from TaskAssigner. + * + * Fourth, then offerAccepted is used by TaskScheduler to notify the TaskAssigner so that + * TaskAssigner can decide whether the current offer is valid or not for the next request. + * + * Fifth, After task assignment is done, TaskScheduler invokes the tasks() to + * retrieve all the task assignment information. + */ + +private[scheduler] abstract class TaskAssigner { + protected var offer: Seq[OfferState] = _ + protected var cpuPerTask = 1 + + protected def withCpuPerTask(cpuPerTask: Int): Unit = { +this.cpuPerTask = cpuPerTask + } + + /** The final assigned offer returned to TaskScheduler. 
*/ + final def tasks: Seq[ArrayBuffer[TaskDescription]] = offer.map(_.tasks) + + /** Invoked at the beginning of resource offering to construct the offer with the workoffers. */ + def construct(workOffer: Seq[WorkerOffer]): Unit = { +offer = workOffer.map(o => new OfferState(o)) + } + + /** Invoked at each round of Taskset assignment to initialize the internal structure. */ + def init(): Unit + + /** + * Tests Whether there is offer available to be used inside of one round of Taskset assignment. + * @return `true` if a subsequent call to `next` will yield an element, + * `false` otherwise. + */ + def hasNext: Boolean + + /** + * Produces next worker offer based on the task assignment strategy. + * @return the next available offer, if `hasNext` is `true`, + * undefined behavior otherwise. + */
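The scaladoc quoted above describes a call sequence: construct, then init, then hasNext/getNext, then offerAccepted, then tasks. A simplified round-robin assigner makes that sequence concrete. This is a sketch, not the PR's implementation, and it rests on several assumptions:

- `Offer`, `Slot`, `RoundRobinSketch` and `RoundRobinDemo` are hypothetical names.
- Tasks are plain strings rather than `TaskDescription`.
- `RoundRobinDemo.run` plays the TaskScheduler's role.
- Locality levels are ignored, and every offered slot is accepted.

```scala
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-ins for WorkerOffer / OfferState.
final case class Offer(executorId: String, cores: Int)

final class Slot(val offer: Offer) {
  var coresAvailable: Int = offer.cores
  val tasks = new ArrayBuffer[String](coresAvailable)
}

// Round-robin assigner following the construct/init/hasNext/getNext/offerAccepted/tasks contract.
final class RoundRobinSketch(cpuPerTask: Int) {
  private var slots: Seq[Slot] = Seq.empty
  private var idx = 0

  def construct(offers: Seq[Offer]): Unit = { slots = offers.map(new Slot(_)) }

  // Each round starts again from the first worker.
  def init(): Unit = { idx = 0 }

  // Skips workers that can no longer fit a task.
  def hasNext: Boolean = {
    while (idx < slots.size && slots(idx).coresAvailable < cpuPerTask) idx += 1
    idx < slots.size
  }

  def getNext(): Slot = slots(idx)

  // Round-robin: move on to the next worker whether or not the offer was used.
  def offerAccepted(assigned: Boolean): Unit = { idx += 1 }

  def tasks: Seq[ArrayBuffer[String]] = slots.map(_.tasks)
}

object RoundRobinDemo {
  // Plays the TaskScheduler role: repeats rounds until no task can be placed.
  def run(offers: Seq[Offer], pending: Seq[String], cpuPerTask: Int): Seq[Seq[String]] = {
    val assigner = new RoundRobinSketch(cpuPerTask)
    assigner.construct(offers)
    val queue = mutable.Queue.empty[String] ++= pending
    var progress = true
    while (queue.nonEmpty && progress) {
      progress = false
      assigner.init()
      while (queue.nonEmpty && assigner.hasNext) {
        val slot = assigner.getNext()
        slot.tasks += queue.dequeue()
        slot.coresAvailable -= cpuPerTask
        progress = true
        assigner.offerAccepted(assigned = true)
      }
    }
    assigner.tasks.map(_.toList)
  }
}
```

Consider two workers, one with 2 cores and one with 1 core, four one-core tasks, and `cpuPerTask = 1`. Under those inputs, `RoundRobinDemo.run` spreads the work as `List(t0, t2)` and `List(t1)`, and `t3` stays unassigned.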
[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/15541#discussion_r84222924 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala --- @@ -0,0 +1,226 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import scala.collection.mutable.ArrayBuffer +import scala.collection.mutable.PriorityQueue +import scala.util.Random + +import org.apache.spark.internal.{config, Logging} +import org.apache.spark.SparkConf +import org.apache.spark.util.Utils + +/** Tracks the current state of the workers with available cores and assigned task list. */ +class OfferState(val workOffer: WorkerOffer) { + /** The current remaining cores that can be allocated to tasks. */ + var coresAvailable: Int = workOffer.cores + /** The list of tasks that are assigned to this WorkerOffer. */ + val tasks = new ArrayBuffer[TaskDescription](coresAvailable) +} + +/** + * TaskAssigner is the base class for all task assigner implementations, and can be + * extended to implement different task scheduling algorithms. 
+ * Together with [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]], TaskAssigner + * is used to assign tasks to workers with available cores. Internally, when TaskScheduler + * perform task assignment given available workers, it first sorts the candidate tasksets, + * and then for each taskset, it takes a number of rounds to request TaskAssigner for task + * assignment with different locality restrictions until there is either no qualified + * workers or no valid tasks to be assigned. + * + * TaskAssigner is responsible to maintain the worker availability state and task assignment + * information. The contract between [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]] + * and TaskAssigner is as follows. + * + * First, TaskScheduler invokes construct() of TaskAssigner to initialize the its internal + * worker states at the beginning of resource offering. + * + * Second, before each round of task assignment for a taskset, TaskScheduler invoke the init() + * of TaskAssigner to initialize the data structure for the round. + * + * Third, when performing real task assignment, hasNext()/getNext() is used by TaskScheduler + * to check the worker availability and retrieve current offering from TaskAssigner. + * + * Fourth, then offerAccepted is used by TaskScheduler to notify the TaskAssigner so that + * TaskAssigner can decide whether the current offer is valid or not for the next request. + * + * Fifth, After task assignment is done, TaskScheduler invokes the tasks() to + * retrieve all the task assignment information. + */ + +private[scheduler] abstract class TaskAssigner { + protected var offer: Seq[OfferState] = _ + protected var cpuPerTask = 1 + + protected def withCpuPerTask(cpuPerTask: Int): Unit = { +this.cpuPerTask = cpuPerTask + } + + /** The final assigned offer returned to TaskScheduler. 
*/ + final def tasks: Seq[ArrayBuffer[TaskDescription]] = offer.map(_.tasks) + + /** Invoked at the beginning of resource offering to construct the offer with the workoffers. */ + def construct(workOffer: Seq[WorkerOffer]): Unit = { +offer = workOffer.map(o => new OfferState(o)) + } + + /** Invoked at each round of Taskset assignment to initialize the internal structure. */ + def init(): Unit + + /** + * Tests Whether there is offer available to be used inside of one round of Taskset assignment. + * @return `true` if a subsequent call to `next` will yield an element, + * `false` otherwise. + */ + def hasNext: Boolean + + /** + * Produces next worker offer based on the task assignment strategy. + * @return the next available offer, if `hasNext` is `true`, + * undefined behavior otherwise. + */
[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/15541#discussion_r84160226 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala --- @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import scala.collection.mutable.ArrayBuffer +import scala.collection.mutable.PriorityQueue +import scala.util.Random + +import org.apache.spark.internal.{config, Logging} +import org.apache.spark.SparkConf +import org.apache.spark.util.Utils + +/** Tracking the current state of the workers with available cores and assigned task list. */ +class OfferState(val workOffer: WorkerOffer) { + // The current remaining cores that can be allocated to tasks. + var coresAvailable: Int = workOffer.cores + // The list of tasks that are assigned to this worker. + val tasks = new ArrayBuffer[TaskDescription](coresAvailable) +} + +/** + * TaskAssigner is the base class for all task assigner implementations, and can be + * extended to implement different task scheduling algorithms. 
+ * Together with [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]], TaskAssigner + * is used to assign tasks to workers with available cores. Internally, TaskScheduler, requested + * to perform task assignment given available workers, first sorts the candidate tasksets, + * and then for each taskset, it takes a number of rounds to request TaskAssigner for task + * assignment with different the locality restrictions until there is either no qualified + * workers or no valid tasks to be assigned. + * + * TaskAssigner is responsible to maintain the worker availability state and task assignment + * information. The contract between [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]] + * and TaskAssigner is as follows. First, TaskScheduler invokes construct() of TaskAssigner to + * initialize the its internal worker states at the beginning of resource offering. Before each + * round of task assignment for a taskset, TaskScheduler invoke the init() of TaskAssigner to + * initialize the data structure for the round. When performing real task assignment, + * hasNext()/getNext() is used by TaskScheduler to check the worker availability and retrieve + * current offering from TaskAssigner. Then offerAccepted is used by TaskScheduler to notify + * the TaskAssigner so that TaskAssigner can decide whether the current offer is valid or not for + * the next request. After task assignment is done, TaskScheduler invokes the tasks() to + * retrieve all the task assignment information, and eventually, invokes reset() method so that + * TaskAssigner can cleanup its internal maintained resources. + */ + +private[scheduler] abstract class TaskAssigner { + var offer: Seq[OfferState] = _ + var CPUS_PER_TASK = 1 + + def withCpuPerTask(CPUS_PER_TASK: Int): Unit = { +this.CPUS_PER_TASK = CPUS_PER_TASK + } + + // The final assigned offer returned to TaskScheduler. 
+ final def tasks: Seq[ArrayBuffer[TaskDescription]] = offer.map(_.tasks) + + // Invoked at the beginning of resource offering to construct the offer with the workoffers. + def construct(workOffer: Seq[WorkerOffer]): Unit = { +offer = workOffer.map(o => new OfferState(o)) + } + + // Invoked at each round of Taskset assignment to initialize the internal structure. + def init(): Unit + + // Whether there is offer available to be used inside of one round of Taskset assignment. + def hasNext: Boolean + + // Returned the next assigned offer based on the task assignment strategy. + def getNext(): OfferState + + // Invoked by the TaskScheduler to indicate whether the current offer is accepted or not so that + // the assigner can decide whether the current worker is valid for the next offering. + def offerAccepted(assigned: Boolean): U
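`getNext()` leaves the choice of worker to the concrete strategy. The PR's packed implementation is not shown in this excerpt. The sketch below assumes that "packed" means the following: among workers that can still fit a task, prefer the one with the fewest free cores, so that tasks concentrate on fewer executors. Under that assumption, the selection could be built on `scala.collection.mutable.PriorityQueue`, which the quoted file already imports. `Offer` and `PackedPick` are hypothetical names.

```scala
import scala.collection.mutable

// Hypothetical stand-in for WorkerOffer.
final case class Offer(executorId: String, cores: Int)

object PackedPick {
  // Assigns `numTasks` tasks of `cpuPerTask` cores each, always to the worker
  // with the fewest free cores that can still fit one more task.
  // Returns the number of tasks placed on each executor.
  def assign(offers: Seq[Offer], numTasks: Int, cpuPerTask: Int): Map[String, Int] = {
    // PriorityQueue is a max-heap, so reverse the ordering to dequeue the
    // smallest free-core count first (ties broken by executor id).
    val heap = mutable.PriorityQueue.empty[(Int, String)](Ordering[(Int, String)].reverse)
    offers.filter(_.cores >= cpuPerTask).foreach(o => heap.enqueue((o.cores, o.executorId)))
    val counts = mutable.Map.empty[String, Int].withDefaultValue(0)
    var remaining = numTasks
    while (remaining > 0 && heap.nonEmpty) {
      val (free, id) = heap.dequeue()
      counts(id) += 1
      remaining -= 1
      val left = free - cpuPerTask
      // Put the worker back only if it can still take another task.
      if (left >= cpuPerTask) heap.enqueue((left, id))
    }
    counts.toMap
  }
}
```

Reversing the ordering instead (most free cores first) would give the balanced behaviour that one of the test names refers to.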
[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...
Github user zhzhan commented on a diff in the pull request: https://github.com/apache/spark/pull/15541#discussion_r84158910 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala --- @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import scala.collection.mutable.ArrayBuffer +import scala.collection.mutable.PriorityQueue +import scala.util.Random + +import org.apache.spark.internal.{config, Logging} +import org.apache.spark.SparkConf +import org.apache.spark.util.Utils + +/** Tracking the current state of the workers with available cores and assigned task list. */ +class OfferState(val workOffer: WorkerOffer) { + // The current remaining cores that can be allocated to tasks. + var coresAvailable: Int = workOffer.cores + // The list of tasks that are assigned to this worker. + val tasks = new ArrayBuffer[TaskDescription](coresAvailable) +} + +/** + * TaskAssigner is the base class for all task assigner implementations, and can be + * extended to implement different task scheduling algorithms. 
+ * Together with [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]], TaskAssigner + * is used to assign tasks to workers with available cores. Internally, TaskScheduler, requested + * to perform task assignment given available workers, first sorts the candidate tasksets, + * and then for each taskset, it takes a number of rounds to request TaskAssigner for task + * assignment with different the locality restrictions until there is either no qualified + * workers or no valid tasks to be assigned. + * + * TaskAssigner is responsible to maintain the worker availability state and task assignment + * information. The contract between [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]] + * and TaskAssigner is as follows. First, TaskScheduler invokes construct() of TaskAssigner to + * initialize the its internal worker states at the beginning of resource offering. Before each + * round of task assignment for a taskset, TaskScheduler invoke the init() of TaskAssigner to + * initialize the data structure for the round. When performing real task assignment, + * hasNext()/getNext() is used by TaskScheduler to check the worker availability and retrieve + * current offering from TaskAssigner. Then offerAccepted is used by TaskScheduler to notify + * the TaskAssigner so that TaskAssigner can decide whether the current offer is valid or not for + * the next request. After task assignment is done, TaskScheduler invokes the tasks() to + * retrieve all the task assignment information, and eventually, invokes reset() method so that + * TaskAssigner can cleanup its internal maintained resources. + */ + +private[scheduler] abstract class TaskAssigner { + var offer: Seq[OfferState] = _ + var CPUS_PER_TASK = 1 + + def withCpuPerTask(CPUS_PER_TASK: Int): Unit = { +this.CPUS_PER_TASK = CPUS_PER_TASK + } + + // The final assigned offer returned to TaskScheduler. 
+ final def tasks: Seq[ArrayBuffer[TaskDescription]] = offer.map(_.tasks) + + // Invoked at the beginning of resource offering to construct the offer with the workoffers. + def construct(workOffer: Seq[WorkerOffer]): Unit = { +offer = workOffer.map(o => new OfferState(o)) + } + + // Invoked at each round of Taskset assignment to initialize the internal structure. + def init(): Unit + + // Whether there is offer available to be used inside of one round of Taskset assignment. + def hasNext: Boolean + + // Returned the next assigned offer based on the task assignment strategy. + def getNext(): OfferState + + // Invoked by the TaskScheduler to indicate whether the current offer is accepted or not so that + // the assigner can decide whether the current worker is valid for the next offering. + def offerAccepted(assigned: Boolean): Un
[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/15541#discussion_r84154979 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala --- @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import scala.collection.mutable.ArrayBuffer +import scala.collection.mutable.PriorityQueue +import scala.util.Random + +import org.apache.spark.internal.{config, Logging} +import org.apache.spark.SparkConf +import org.apache.spark.util.Utils + +/** Tracking the current state of the workers with available cores and assigned task list. */ +class OfferState(val workOffer: WorkerOffer) { + // The current remaining cores that can be allocated to tasks. + var coresAvailable: Int = workOffer.cores + // The list of tasks that are assigned to this worker. + val tasks = new ArrayBuffer[TaskDescription](coresAvailable) +} + +/** + * TaskAssigner is the base class for all task assigner implementations, and can be + * extended to implement different task scheduling algorithms. 
+ * Together with [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]], TaskAssigner + * is used to assign tasks to workers with available cores. Internally, TaskScheduler, requested + * to perform task assignment given available workers, first sorts the candidate tasksets, + * and then for each taskset, it takes a number of rounds to request TaskAssigner for task + * assignment with different the locality restrictions until there is either no qualified + * workers or no valid tasks to be assigned. + * + * TaskAssigner is responsible to maintain the worker availability state and task assignment + * information. The contract between [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]] + * and TaskAssigner is as follows. First, TaskScheduler invokes construct() of TaskAssigner to + * initialize the its internal worker states at the beginning of resource offering. Before each + * round of task assignment for a taskset, TaskScheduler invoke the init() of TaskAssigner to + * initialize the data structure for the round. When performing real task assignment, + * hasNext()/getNext() is used by TaskScheduler to check the worker availability and retrieve + * current offering from TaskAssigner. Then offerAccepted is used by TaskScheduler to notify + * the TaskAssigner so that TaskAssigner can decide whether the current offer is valid or not for + * the next request. After task assignment is done, TaskScheduler invokes the tasks() to + * retrieve all the task assignment information, and eventually, invokes reset() method so that + * TaskAssigner can cleanup its internal maintained resources. + */ + +private[scheduler] abstract class TaskAssigner { + var offer: Seq[OfferState] = _ + var CPUS_PER_TASK = 1 + + def withCpuPerTask(CPUS_PER_TASK: Int): Unit = { +this.CPUS_PER_TASK = CPUS_PER_TASK + } + + // The final assigned offer returned to TaskScheduler. 
+ final def tasks: Seq[ArrayBuffer[TaskDescription]] = offer.map(_.tasks) + + // Invoked at the beginning of resource offering to construct the offer with the workoffers. + def construct(workOffer: Seq[WorkerOffer]): Unit = { +offer = workOffer.map(o => new OfferState(o)) + } + + // Invoked at each round of Taskset assignment to initialize the internal structure. + def init(): Unit + + // Whether there is offer available to be used inside of one round of Taskset assignment. + def hasNext: Boolean + + // Returned the next assigned offer based on the task assignment strategy. + def getNext(): OfferState + + // Invoked by the TaskScheduler to indicate whether the current offer is accepted or not so that + // the assigner can decide whether the current worker is valid for the next offering. + def offerAccepted(assigned: Boolean): U
[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...
Github user zhzhan commented on a diff in the pull request: https://github.com/apache/spark/pull/15541#discussion_r84129486 --- Diff: core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala --- @@ -109,6 +108,85 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B assert(!failedTaskSet) } + test("Scheduler does not always schedule tasks on the same workers") { +val taskScheduler = setupScheduler() +roundrobin(taskScheduler) + } + + test("User can specify the roundrobin task assigner") { +val taskScheduler = setupScheduler(("spark.scheduler.taskAssigner", "RoUndrObin")) +roundrobin(taskScheduler) + } + + test("Fallback to roundrobin when the task assigner provided is not valid") { +val taskScheduler = setupScheduler("spark.scheduler.taskAssigner" -> "invalid") +roundrobin(taskScheduler) + } + + test("Scheduler balance the assignment to the worker with more free cores") { --- End diff -- Will change the test case name to make it more explicit.
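Two of the quoted tests constrain how the `spark.scheduler.taskAssigner` value is read. "RoUndrObin" must select round-robin, so the lookup is case-insensitive, and "invalid" must fall back to round-robin. A sketch of resolution logic that would satisfy them is below. Only "roundrobin" appears in the tests. The "balanced" and "packed" value names and the `AssignerKind`/`AssignerResolver` types are assumptions for illustration.

```scala
import java.util.Locale

// Hypothetical strategy kinds; only "roundrobin" is confirmed by the quoted tests.
sealed trait AssignerKind
case object RoundRobin extends AssignerKind
case object Balanced extends AssignerKind
case object Packed extends AssignerKind

object AssignerResolver {
  // Case-insensitive lookup of the configured value; unknown or missing
  // values fall back to round-robin, as the tests above expect.
  def resolve(setting: Option[String]): AssignerKind =
    setting.map(_.trim.toLowerCase(Locale.ROOT)) match {
      case Some("balanced") => Balanced // assumed value name
      case Some("packed") => Packed // assumed value name
      case _ => RoundRobin
    }
}
```

Using `Locale.ROOT` keeps the comparison independent of the JVM's default locale.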
[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...
Github user zhzhan commented on a diff in the pull request:

https://github.com/apache/spark/pull/15541#discussion_r84119714

--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala ---
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.PriorityQueue
+import scala.util.Random
+
+import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.SparkConf
+import org.apache.spark.util.Utils
+
+/** Tracking the current state of the workers with available cores and assigned task list. */
+class OfferState(val workOffer: WorkerOffer) {
+  // The current remaining cores that can be allocated to tasks.
+  var coresAvailable: Int = workOffer.cores
+  // The list of tasks that are assigned to this worker.
+  val tasks = new ArrayBuffer[TaskDescription](coresAvailable)
+}
+
+/**
+ * TaskAssigner is the base class for all task assigner implementations, and can be
+ * extended to implement different task scheduling algorithms.
+ * Together with [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]], TaskAssigner
+ * is used to assign tasks to workers with available cores. Internally, TaskScheduler, requested
+ * to perform task assignment given available workers, first sorts the candidate tasksets,
+ * and then for each taskset, it takes a number of rounds to request TaskAssigner for task
+ * assignment with different the locality restrictions until there is either no qualified
+ * workers or no valid tasks to be assigned.
+ *
+ * TaskAssigner is responsible to maintain the worker availability state and task assignment
+ * information. The contract between [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]]
+ * and TaskAssigner is as follows. First, TaskScheduler invokes construct() of TaskAssigner to
+ * initialize the its internal worker states at the beginning of resource offering. Before each
+ * round of task assignment for a taskset, TaskScheduler invoke the init() of TaskAssigner to
+ * initialize the data structure for the round. When performing real task assignment,
+ * hasNext()/getNext() is used by TaskScheduler to check the worker availability and retrieve
+ * current offering from TaskAssigner. Then offerAccepted is used by TaskScheduler to notify
+ * the TaskAssigner so that TaskAssigner can decide whether the current offer is valid or not for
+ * the next request. After task assignment is done, TaskScheduler invokes the tasks() to
+ * retrieve all the task assignment information, and eventually, invokes reset() method so that
+ * TaskAssigner can cleanup its internal maintained resources.
+ */
+
+private[scheduler] abstract class TaskAssigner {
+  var offer: Seq[OfferState] = _
+  var CPUS_PER_TASK = 1
+
+  def withCpuPerTask(CPUS_PER_TASK: Int): Unit = {
+    this.CPUS_PER_TASK = CPUS_PER_TASK
+  }
+
+  // The final assigned offer returned to TaskScheduler.
+  final def tasks: Seq[ArrayBuffer[TaskDescription]] = offer.map(_.tasks)
+
+  // Invoked at the beginning of resource offering to construct the offer with the workoffers.
+  def construct(workOffer: Seq[WorkerOffer]): Unit = {
+    offer = workOffer.map(o => new OfferState(o))
+  }
+
+  // Invoked at each round of Taskset assignment to initialize the internal structure.
+  def init(): Unit
+
+  // Whether there is offer available to be used inside of one round of Taskset assignment.
+  def hasNext: Boolean
+
+  // Returned the next assigned offer based on the task assignment strategy.
+  def getNext(): OfferState
+
+  // Invoked by the TaskScheduler to indicate whether the current offer is accepted or not so that
+  // the assigner can decide whether the current worker is valid for the next offering.
+  def offerAccepted(assigned: Boolean): Un
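The TaskAssigner doc comment in this diff describes a call sequence:

- construct() once at the start of resource offering;
- init() before each round;
- hasNext/getNext() and offerAccepted() during a round;
- tasks and reset() at the end.

The sketch below is a simplified, hypothetical Scala model of that contract with a round-robin policy. Offer, State, Assigner, RoundRobinAssigner and Driver.assignAll are illustrative stand-ins, not Spark's WorkerOffer, OfferState or TaskSchedulerImpl. The sketch assumes that the driver, not the assigner, books cores and tasks. It omits offer shuffling and locality levels.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-ins for WorkerOffer / OfferState; task ids replace TaskDescription.
final case class Offer(executorId: String, cores: Int)

final class State(val offer: Offer) {
  var coresAvailable: Int = offer.cores
  val tasks: ArrayBuffer[Long] = ArrayBuffer.empty[Long]
}

// Simplified version of the contract described in the TaskAssigner doc comment.
abstract class Assigner(val cpusPerTask: Int) {
  protected var states: IndexedSeq[State] = IndexedSeq.empty
  def construct(offers: Seq[Offer]): Unit = { states = offers.map(new State(_)).toIndexedSeq }
  def init(): Unit                           // start a new round
  def hasNext: Boolean                       // is another worker available in this round?
  def getNext(): State                       // next worker chosen by the policy
  def offerAccepted(assigned: Boolean): Unit // feedback on the worker just returned
  final def tasks: Seq[Seq[Long]] = states.map(_.tasks.toList)
  def reset(): Unit = { states = IndexedSeq.empty }
}

// Round-robin: each round visits every worker once, in order (no shuffling here).
final class RoundRobinAssigner(cpus: Int) extends Assigner(cpus) {
  private var i = 0
  def init(): Unit = { i = 0 }
  def hasNext: Boolean = i < states.length
  def getNext(): State = { val s = states(i); i += 1; s }
  def offerAccepted(assigned: Boolean): Unit = () // round-robin ignores the feedback
}

object Driver {
  // Hypothetical driver standing in for the scheduler: repeat rounds until
  // all tasks are placed or a whole round places nothing.
  def assignAll(assigner: Assigner, offers: Seq[Offer], numTasks: Int): Seq[Seq[Long]] = {
    assigner.construct(offers)
    var next = 0L
    var progress = true
    while (progress && next < numTasks) {
      progress = false
      assigner.init()
      while (assigner.hasNext && next < numTasks) {
        val worker = assigner.getNext()
        val fits = worker.coresAvailable >= assigner.cpusPerTask
        if (fits) {
          worker.tasks += next
          worker.coresAvailable -= assigner.cpusPerTask
          next += 1
          progress = true
        }
        assigner.offerAccepted(fits)
      }
    }
    val assigned = assigner.tasks
    assigner.reset()
    assigned
  }
}
```

Take two workers with 2 and 4 cores and one CPU per task. Five tasks come out as [0, 2] on the first worker and [1, 3, 4] on the second. The first two rounds alternate between the workers; in the third round the first worker is full, so only the second one takes a task.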
[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...
Github user zhzhan commented on a diff in the pull request:

https://github.com/apache/spark/pull/15541#discussion_r84002685

--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala ---
@@ -0,0 +1,233 @@
[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...
Github user zhzhan commented on a diff in the pull request:

https://github.com/apache/spark/pull/15541#discussion_r84002480

--- Diff: core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala ---
@@ -109,6 +108,85 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
     assert(!failedTaskSet)
   }

+  test("Scheduler does not always schedule tasks on the same workers") {
+    val taskScheduler = setupScheduler()
+    roundrobin(taskScheduler)
+  }
+
+  test("User can specify the roundrobin task assigner") {
+    val taskScheduler = setupScheduler(("spark.scheduler.taskAssigner", "RoUndrObin"))
+    roundrobin(taskScheduler)
+  }
+
+  test("Fallback to roundrobin when the task assigner provided is not valid") {
+    val taskScheduler = setupScheduler("spark.scheduler.taskAssigner" -> "invalid")
+    roundrobin(taskScheduler)
+  }
+
+  test("Scheduler balance the assignment to the worker with more free cores") {
--- End diff --

Can you please clarify?
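The test under discussion, "Scheduler balance the assignment to the worker with more free cores", describes a balanced policy, and the TaskAssigner diff imports scala.collection.mutable.PriorityQueue. The exact algorithm in the PR is not visible here, so this is a hedged illustration only. A max-heap keyed on free cores gives the described behaviour: each task goes to whichever worker currently has the most free cores. BalancedSketch and its (id, freeCores) pairs are hypothetical, not Spark's OfferState.

```scala
import scala.collection.mutable

// Hypothetical "balanced" policy: every task goes to the worker with the most free cores.
// Workers are (id, freeCores) pairs, not Spark's WorkerOffer/OfferState.
object BalancedSketch {
  def assign(workers: Seq[(String, Int)], numTasks: Int, cpusPerTask: Int = 1): Map[String, Int] = {
    // Max-heap on free cores; on ties the lexicographically smaller id wins (for determinism).
    val heap = mutable.PriorityQueue.empty[(Int, String)](
      Ordering.Tuple2(Ordering.Int, Ordering.String.reverse))
    workers.foreach { case (id, free) => heap.enqueue((free, id)) }

    val counts = mutable.Map.empty[String, Int]
    var remaining = numTasks
    while (remaining > 0 && heap.nonEmpty && heap.head._1 >= cpusPerTask) {
      val (free, id) = heap.dequeue()
      counts(id) = counts.getOrElse(id, 0) + 1
      remaining -= 1
      heap.enqueue((free - cpusPerTask, id)) // put the worker back with fewer free cores
    }
    counts.toMap
  }
}
```

Take worker a with 4 free cores and worker b with 2. Four one-core tasks land as a -> 3 and b -> 1, which leaves one free core on each worker.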
[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...
Github user zhzhan commented on a diff in the pull request:

https://github.com/apache/spark/pull/15541#discussion_r84002353

--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala ---
@@ -0,0 +1,233 @@
[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...
Github user zhzhan commented on a diff in the pull request:

https://github.com/apache/spark/pull/15541#discussion_r84002236

--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala ---
@@ -0,0 +1,233 @@
[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...
Github user tejasapatil commented on a diff in the pull request:

https://github.com/apache/spark/pull/15541#discussion_r83997822

--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala ---
@@ -0,0 +1,233 @@
[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...
Github user tejasapatil commented on a diff in the pull request:

https://github.com/apache/spark/pull/15541#discussion_r83996049

--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala ---
@@ -0,0 +1,233 @@
[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...
Github user tejasapatil commented on a diff in the pull request:

https://github.com/apache/spark/pull/15541#discussion_r84000957

--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala ---
@@ -0,0 +1,233 @@
[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...
Github user tejasapatil commented on a diff in the pull request: https://github.com/apache/spark/pull/15541#discussion_r83998311 --- Diff: core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala --- @@ -109,6 +108,85 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B assert(!failedTaskSet) } + test("Scheduler does not always schedule tasks on the same workers") { +val taskScheduler = setupScheduler() +roundrobin(taskScheduler) + } + + test("User can specify the roundrobin task assigner") { +val taskScheduler = setupScheduler(("spark.scheduler.taskAssigner", "RoUndrObin")) +roundrobin(taskScheduler) + } + + test("Fallback to roundrobin when the task assigner provided is not valid") { +val taskScheduler = setupScheduler("spark.scheduler.taskAssigner" -> "invalid") +roundrobin(taskScheduler) + } + + test("Scheduler balance the assignment to the worker with more free cores") { --- End diff -- I think you should structure this so that tests for the same task assigner are packed together. Right now, one has to look into each test case to figure out which policy is being tested. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
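The quoted tests check two things. The assigner name is matched case-insensitively ("RoUndrObin"), and an unknown value ("invalid") falls back to round-robin. The sketch below captures that behaviour and tags each case with the policy it exercises, so the cases can be grouped per assigner as the comment suggests. Only "roundrobin" is confirmed by the quoted tests. `Packed`, `Balanced`, their lowercase names, and `PolicyResolution` are hypothetical and used only for illustration.

```scala
import java.util.Locale

// Hypothetical policy set: only "roundrobin" appears in the quoted tests;
// "packed" and "balanced" are assumed names for illustration.
sealed trait AssignerPolicy
case object RoundRobin extends AssignerPolicy
case object Packed extends AssignerPolicy
case object Balanced extends AssignerPolicy

object PolicyResolution {
  // Case-insensitive lookup that falls back to RoundRobin for a missing or
  // unrecognized value, the behavior the "RoUndrObin" and "invalid" tests check.
  def resolvePolicy(value: Option[String]): AssignerPolicy =
    value.map(_.toLowerCase(Locale.ROOT)) match {
      case Some("packed")   => Packed
      case Some("balanced") => Balanced
      case _                => RoundRobin
    }

  // Each case is tagged with the policy it exercises.
  val cases: Seq[(Option[String], AssignerPolicy)] = Seq(
    None -> RoundRobin,
    Some("RoUndrObin") -> RoundRobin,
    Some("invalid") -> RoundRobin,
    Some("packed") -> Packed,
    Some("balanced") -> Balanced
  )

  // Cases grouped per policy, so a suite can register them together.
  val casesByPolicy: Map[AssignerPolicy, Seq[Option[String]]] =
    cases.groupBy(_._2).map { case (policy, cs) => policy -> cs.map(_._1) }
}
```

In a ScalaTest `FunSuite`, a suite could loop over `casesByPolicy` and call `test(s"$policy: ...")` for each entry. The test report would then list cases per assigner, with no need to read each body.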
[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...
Github user tejasapatil commented on a diff in the pull request: https://github.com/apache/spark/pull/15541#discussion_r83996692 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala --- @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import scala.collection.mutable.ArrayBuffer +import scala.collection.mutable.PriorityQueue +import scala.util.Random + +import org.apache.spark.internal.{config, Logging} +import org.apache.spark.SparkConf +import org.apache.spark.util.Utils + +/** Tracking the current state of the workers with available cores and assigned task list. */ +class OfferState(val workOffer: WorkerOffer) { + // The current remaining cores that can be allocated to tasks. + var coresAvailable: Int = workOffer.cores + // The list of tasks that are assigned to this worker. + val tasks = new ArrayBuffer[TaskDescription](coresAvailable) +} + +/** + * TaskAssigner is the base class for all task assigner implementations, and can be + * extended to implement different task scheduling algorithms. 
[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...
Github user tejasapatil commented on a diff in the pull request: https://github.com/apache/spark/pull/15541#discussion_r83998150 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala --- @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import scala.collection.mutable.ArrayBuffer +import scala.collection.mutable.PriorityQueue +import scala.util.Random + +import org.apache.spark.internal.{config, Logging} +import org.apache.spark.SparkConf +import org.apache.spark.util.Utils + +/** Tracking the current state of the workers with available cores and assigned task list. */ +class OfferState(val workOffer: WorkerOffer) { + // The current remaining cores that can be allocated to tasks. + var coresAvailable: Int = workOffer.cores + // The list of tasks that are assigned to this worker. + val tasks = new ArrayBuffer[TaskDescription](coresAvailable) +} + +/** + * TaskAssigner is the base class for all task assigner implementations, and can be + * extended to implement different task scheduling algorithms. 
[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...
Github user tejasapatil commented on a diff in the pull request: https://github.com/apache/spark/pull/15541#discussion_r83996326 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala --- @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import scala.collection.mutable.ArrayBuffer +import scala.collection.mutable.PriorityQueue +import scala.util.Random + +import org.apache.spark.internal.{config, Logging} +import org.apache.spark.SparkConf +import org.apache.spark.util.Utils + +/** Tracking the current state of the workers with available cores and assigned task list. */ +class OfferState(val workOffer: WorkerOffer) { + // The current remaining cores that can be allocated to tasks. + var coresAvailable: Int = workOffer.cores + // The list of tasks that are assigned to this worker. + val tasks = new ArrayBuffer[TaskDescription](coresAvailable) +} + +/** + * TaskAssigner is the base class for all task assigner implementations, and can be + * extended to implement different task scheduling algorithms. 
[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...
Github user tejasapatil commented on a diff in the pull request: https://github.com/apache/spark/pull/15541#discussion_r83995317 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala --- @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import scala.collection.mutable.ArrayBuffer +import scala.collection.mutable.PriorityQueue +import scala.util.Random + +import org.apache.spark.internal.{config, Logging} +import org.apache.spark.SparkConf +import org.apache.spark.util.Utils + +/** Tracking the current state of the workers with available cores and assigned task list. */ +class OfferState(val workOffer: WorkerOffer) { + // The current remaining cores that can be allocated to tasks. + var coresAvailable: Int = workOffer.cores + // The list of tasks that are assigned to this worker. + val tasks = new ArrayBuffer[TaskDescription](coresAvailable) +} + +/** + * TaskAssigner is the base class for all task assigner implementations, and can be + * extended to implement different task scheduling algorithms. 
+ * Together with [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]], TaskAssigner + * is used to assign tasks to workers with available cores. Internally, TaskScheduler, requested + * to perform task assignment given available workers, first sorts the candidate tasksets, + * and then for each taskset, it takes a number of rounds to request TaskAssigner for task + * assignment with different the locality restrictions until there is either no qualified --- End diff -- nit: `different the locality restrictions` => `different locality restrictions`
[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...
Github user tejasapatil commented on a diff in the pull request: https://github.com/apache/spark/pull/15541#discussion_r83998358 --- Diff: docs/configuration.md --- @@ -1342,6 +1342,19 @@ Apart from these, the following properties are also available, and may be useful Should be greater than or equal to 1. Number of allowed retries = this value - 1. + + spark.scheduler.taskAssigner + roundrobin + +The strategy of how to allocate tasks among workers with free cores. By default, roundrobin --- End diff -- nit: create a list for each policy and explain inline instead of saying `former`, `latter` below.
[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...
Github user tejasapatil commented on a diff in the pull request: https://github.com/apache/spark/pull/15541#discussion_r83995649 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala --- @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import scala.collection.mutable.ArrayBuffer +import scala.collection.mutable.PriorityQueue +import scala.util.Random + +import org.apache.spark.internal.{config, Logging} +import org.apache.spark.SparkConf +import org.apache.spark.util.Utils + +/** Tracking the current state of the workers with available cores and assigned task list. */ +class OfferState(val workOffer: WorkerOffer) { + // The current remaining cores that can be allocated to tasks. + var coresAvailable: Int = workOffer.cores + // The list of tasks that are assigned to this worker. + val tasks = new ArrayBuffer[TaskDescription](coresAvailable) +} + +/** + * TaskAssigner is the base class for all task assigner implementations, and can be + * extended to implement different task scheduling algorithms. 
+ * Together with [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]], TaskAssigner + * is used to assign tasks to workers with available cores. Internally, TaskScheduler, requested + * to perform task assignment given available workers, first sorts the candidate tasksets, + * and then for each taskset, it takes a number of rounds to request TaskAssigner for task + * assignment with different the locality restrictions until there is either no qualified + * workers or no valid tasks to be assigned. + * + * TaskAssigner is responsible to maintain the worker availability state and task assignment + * information. The contract between [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]] + * and TaskAssigner is as follows. First, TaskScheduler invokes construct() of TaskAssigner to + * initialize the its internal worker states at the beginning of resource offering. Before each + * round of task assignment for a taskset, TaskScheduler invoke the init() of TaskAssigner to + * initialize the data structure for the round. When performing real task assignment, + * hasNext()/getNext() is used by TaskScheduler to check the worker availability and retrieve + * current offering from TaskAssigner. Then offerAccepted is used by TaskScheduler to notify + * the TaskAssigner so that TaskAssigner can decide whether the current offer is valid or not for + * the next request. After task assignment is done, TaskScheduler invokes the tasks() to + * retrieve all the task assignment information, and eventually, invokes reset() method so that + * TaskAssigner can cleanup its internal maintained resources. + */ + +private[scheduler] abstract class TaskAssigner { + var offer: Seq[OfferState] = _ + var CPUS_PER_TASK = 1 + + def withCpuPerTask(CPUS_PER_TASK: Int): Unit = { --- End diff -- This feels like something which is set once for object generation and not supposed to be mutated later. I am not able to come up with better ways to do that. 
(Maybe pass it as a constructor param?)
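The constructor-parameter alternative suggested in this comment could look like the sketch below. The value is checked once, when the assigner is built, and cannot change afterwards. `CpuAwareAssigner`, `FixedCpuAssigner`, `AssignerFactory` and `canHost` are hypothetical names used only for illustration, and the `>= 1` check is an assumed constraint.

```scala
// Hypothetical names. The CPUs-per-task value is fixed when the assigner is built,
// so there is no setter that could change it between rounds.
abstract class CpuAwareAssigner(val cpusPerTask: Int) {
  require(cpusPerTask >= 1, s"cpusPerTask must be >= 1, got $cpusPerTask")

  // Whether a worker with this many free cores can take one more task.
  def canHost(coresAvailable: Int): Boolean = coresAvailable >= cpusPerTask
}

class FixedCpuAssigner(cpus: Int) extends CpuAwareAssigner(cpus)

object AssignerFactory {
  // The caller (the TaskScheduler, in the PR's design) would read the
  // configured value once and pass it in here.
  def create(cpusPerTask: Int): CpuAwareAssigner = new FixedCpuAssigner(cpusPerTask)
}
```

`cpusPerTask` is a `val`, so the compiler rejects any later reassignment. This removes the need for a mutable field named like a constant.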
[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...
Github user tejasapatil commented on a diff in the pull request: https://github.com/apache/spark/pull/15541#discussion_r83995590 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala --- @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import scala.collection.mutable.ArrayBuffer +import scala.collection.mutable.PriorityQueue +import scala.util.Random + +import org.apache.spark.internal.{config, Logging} +import org.apache.spark.SparkConf +import org.apache.spark.util.Utils + +/** Tracking the current state of the workers with available cores and assigned task list. */ +class OfferState(val workOffer: WorkerOffer) { + // The current remaining cores that can be allocated to tasks. + var coresAvailable: Int = workOffer.cores + // The list of tasks that are assigned to this worker. + val tasks = new ArrayBuffer[TaskDescription](coresAvailable) +} + +/** + * TaskAssigner is the base class for all task assigner implementations, and can be + * extended to implement different task scheduling algorithms. 
+ * Together with [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]], TaskAssigner + * is used to assign tasks to workers with available cores. Internally, TaskScheduler, requested + * to perform task assignment given available workers, first sorts the candidate tasksets, + * and then for each taskset, it takes a number of rounds to request TaskAssigner for task + * assignment with different the locality restrictions until there is either no qualified + * workers or no valid tasks to be assigned. + * + * TaskAssigner is responsible to maintain the worker availability state and task assignment + * information. The contract between [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]] + * and TaskAssigner is as follows. First, TaskScheduler invokes construct() of TaskAssigner to + * initialize the its internal worker states at the beginning of resource offering. Before each + * round of task assignment for a taskset, TaskScheduler invoke the init() of TaskAssigner to + * initialize the data structure for the round. When performing real task assignment, + * hasNext()/getNext() is used by TaskScheduler to check the worker availability and retrieve + * current offering from TaskAssigner. Then offerAccepted is used by TaskScheduler to notify + * the TaskAssigner so that TaskAssigner can decide whether the current offer is valid or not for + * the next request. After task assignment is done, TaskScheduler invokes the tasks() to + * retrieve all the task assignment information, and eventually, invokes reset() method so that + * TaskAssigner can cleanup its internal maintained resources. + */ + +private[scheduler] abstract class TaskAssigner { + var offer: Seq[OfferState] = _ + var CPUS_PER_TASK = 1 --- End diff -- - nit: use `protected` - nit: Why camel case? This looks like a regular class var and not a constant.
I know you want this to be treated as a constant, but it should probably have better guarding against that.
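If a setter such as `withCpuPerTask` is kept, the guarding asked for above could look like the sketch below. The value is readable only by subclasses (`protected`). It uses a lowerCamelCase name because it is an ordinary field, not a constant. It can be assigned at most once, and the default of 1 mirrors the quoted `var CPUS_PER_TASK = 1`. `GuardedAssigner` and `GuardedDemo` are hypothetical names. Rejecting a second call with `require` is an assumed policy, not something the PR specifies.

```scala
// Hypothetical sketch: keeps a withCpuPerTask-style setter but makes the value
// readable only by subclasses and assignable at most once.
abstract class GuardedAssigner {
  private var cpusPerTaskOpt: Option[Int] = None

  // Subclasses read the value; the default of 1 mirrors the quoted code.
  protected def cpusPerTask: Int = cpusPerTaskOpt.getOrElse(1)

  def withCpuPerTask(cpus: Int): this.type = {
    require(cpus >= 1, s"cpus must be >= 1, got $cpus")
    require(cpusPerTaskOpt.isEmpty, "cpusPerTask has already been set")
    cpusPerTaskOpt = Some(cpus)
    this
  }
}

// Exposes the protected value only so the example can inspect it.
class GuardedDemo extends GuardedAssigner {
  def currentCpus: Int = cpusPerTask
}
```

A second `withCpuPerTask` call throws `IllegalArgumentException` and leaves the first value in place. Code that accidentally reconfigures an assigner mid-scheduling therefore fails loudly instead of silently changing behavior.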
[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...
Github user tejasapatil commented on a diff in the pull request: https://github.com/apache/spark/pull/15541#discussion_r83995463 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala --- @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import scala.collection.mutable.ArrayBuffer +import scala.collection.mutable.PriorityQueue +import scala.util.Random + +import org.apache.spark.internal.{config, Logging} +import org.apache.spark.SparkConf +import org.apache.spark.util.Utils + +/** Tracking the current state of the workers with available cores and assigned task list. */ +class OfferState(val workOffer: WorkerOffer) { + // The current remaining cores that can be allocated to tasks. + var coresAvailable: Int = workOffer.cores + // The list of tasks that are assigned to this worker. + val tasks = new ArrayBuffer[TaskDescription](coresAvailable) +} + +/** + * TaskAssigner is the base class for all task assigner implementations, and can be + * extended to implement different task scheduling algorithms. 
+ * Together with [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]], TaskAssigner + * is used to assign tasks to workers with available cores. Internally, TaskScheduler, requested + * to perform task assignment given available workers, first sorts the candidate tasksets, + * and then for each taskset, it takes a number of rounds to request TaskAssigner for task + * assignment with different the locality restrictions until there is either no qualified + * workers or no valid tasks to be assigned. + * + * TaskAssigner is responsible to maintain the worker availability state and task assignment + * information. The contract between [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]] + * and TaskAssigner is as follows. First, TaskScheduler invokes construct() of TaskAssigner to --- End diff -- It might be better to put these into separate points and not as a paragraph. Also, I am not sure what the protocol is about putting details like method names in the doc. As things stand, it will serve well for people trying to read the code, but as the codebase evolves, things might get out of sync if this comment is not updated.
[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...
Github user tejasapatil commented on a diff in the pull request: https://github.com/apache/spark/pull/15541#discussion_r83996865 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala --- @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import scala.collection.mutable.ArrayBuffer +import scala.collection.mutable.PriorityQueue +import scala.util.Random + +import org.apache.spark.internal.{config, Logging} +import org.apache.spark.SparkConf +import org.apache.spark.util.Utils + +/** Tracking the current state of the workers with available cores and assigned task list. */ +class OfferState(val workOffer: WorkerOffer) { + // The current remaining cores that can be allocated to tasks. + var coresAvailable: Int = workOffer.cores + // The list of tasks that are assigned to this worker. + val tasks = new ArrayBuffer[TaskDescription](coresAvailable) +} + +/** + * TaskAssigner is the base class for all task assigner implementations, and can be + * extended to implement different task scheduling algorithms. 
+ * Together with [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]], TaskAssigner + * is used to assign tasks to workers with available cores. Internally, TaskScheduler, requested + * to perform task assignment given available workers, first sorts the candidate tasksets, + * and then for each taskset, it takes a number of rounds to request TaskAssigner for task + * assignment with different the locality restrictions until there is either no qualified + * workers or no valid tasks to be assigned. + * + * TaskAssigner is responsible to maintain the worker availability state and task assignment + * information. The contract between [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]] + * and TaskAssigner is as follows. First, TaskScheduler invokes construct() of TaskAssigner to + * initialize the its internal worker states at the beginning of resource offering. Before each + * round of task assignment for a taskset, TaskScheduler invoke the init() of TaskAssigner to + * initialize the data structure for the round. When performing real task assignment, + * hasNext()/getNext() is used by TaskScheduler to check the worker availability and retrieve + * current offering from TaskAssigner. Then offerAccepted is used by TaskScheduler to notify + * the TaskAssigner so that TaskAssigner can decide whether the current offer is valid or not for + * the next request. After task assignment is done, TaskScheduler invokes the tasks() to + * retrieve all the task assignment information, and eventually, invokes reset() method so that + * TaskAssigner can cleanup its internal maintained resources. + */ + +private[scheduler] abstract class TaskAssigner { + var offer: Seq[OfferState] = _ + var CPUS_PER_TASK = 1 + + def withCpuPerTask(CPUS_PER_TASK: Int): Unit = { +this.CPUS_PER_TASK = CPUS_PER_TASK + } + + // The final assigned offer returned to TaskScheduler. 
+ final def tasks: Seq[ArrayBuffer[TaskDescription]] = offer.map(_.tasks) + + // Invoked at the beginning of resource offering to construct the offer with the workoffers. + def construct(workOffer: Seq[WorkerOffer]): Unit = { +offer = workOffer.map(o => new OfferState(o)) + } + + // Invoked at each round of Taskset assignment to initialize the internal structure. + def init(): Unit + + // Whether there is offer available to be used inside of one round of Taskset assignment. + def hasNext: Boolean + + // Returned the next assigned offer based on the task assignment strategy. + def getNext(): OfferState + + // Invoked by the TaskScheduler to indicate whether the current offer is accepted or not so that + // the assigner can decide whether the current worker is valid for the next offering. + def offerAccepted(assigned: Boolean
[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...
Github user tejasapatil commented on a diff in the pull request: https://github.com/apache/spark/pull/15541#discussion_r83998964 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala --- @@ -250,24 +248,26 @@ private[spark] class TaskSchedulerImpl( private def resourceOfferSingleTaskSet( taskSet: TaskSetManager, maxLocality: TaskLocality, - shuffledOffers: Seq[WorkerOffer], - availableCpus: Array[Int], - tasks: IndexedSeq[ArrayBuffer[TaskDescription]]) : Boolean = { + taskAssigner: TaskAssigner) : Boolean = { var launchedTask = false -for (i <- 0 until shuffledOffers.size) { - val execId = shuffledOffers(i).executorId - val host = shuffledOffers(i).host - if (availableCpus(i) >= CPUS_PER_TASK) { +taskAssigner.init() +while(taskAssigner.hasNext) { --- End diff -- nit: space after `while`
[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...
Github user tejasapatil commented on a diff in the pull request: https://github.com/apache/spark/pull/15541#discussion_r83995119 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala --- @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import scala.collection.mutable.ArrayBuffer +import scala.collection.mutable.PriorityQueue +import scala.util.Random + +import org.apache.spark.internal.{config, Logging} +import org.apache.spark.SparkConf +import org.apache.spark.util.Utils + +/** Tracking the current state of the workers with available cores and assigned task list. */ +class OfferState(val workOffer: WorkerOffer) { + // The current remaining cores that can be allocated to tasks. + var coresAvailable: Int = workOffer.cores + // The list of tasks that are assigned to this worker. --- End diff -- "this worker"? `this` won't represent a worker here.
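One way to address the "this worker" comment is to describe the fields in terms of the `workOffer` they track rather than `this`. The sketch below is a hypothetical illustration only: `WorkerOfferStub` and `OfferStateSketch` are made-up names, the stub carries just the fields needed here, and task IDs stand in for `TaskDescription`.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical, simplified stand-in for Spark's WorkerOffer (illustration only).
final case class WorkerOfferStub(executorId: String, host: String, cores: Int)

/** Tracks the remaining cores and assigned tasks for the worker described by `workOffer`. */
class OfferStateSketch(val workOffer: WorkerOfferStub) {
  /** Cores on `workOffer` that can still be allocated to tasks. */
  var coresAvailable: Int = workOffer.cores
  /** Tasks assigned so far to `workOffer`; task IDs stand in for TaskDescription. */
  val tasks = new ArrayBuffer[String](coresAvailable)
}
```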
[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...
Github user tejasapatil commented on a diff in the pull request: https://github.com/apache/spark/pull/15541#discussion_r83999715 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala --- @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import scala.collection.mutable.ArrayBuffer +import scala.collection.mutable.PriorityQueue +import scala.util.Random + +import org.apache.spark.internal.{config, Logging} +import org.apache.spark.SparkConf +import org.apache.spark.util.Utils + +/** Tracking the current state of the workers with available cores and assigned task list. */ +class OfferState(val workOffer: WorkerOffer) { + // The current remaining cores that can be allocated to tasks. + var coresAvailable: Int = workOffer.cores + // The list of tasks that are assigned to this worker. + val tasks = new ArrayBuffer[TaskDescription](coresAvailable) +} + +/** + * TaskAssigner is the base class for all task assigner implementations, and can be + * extended to implement different task scheduling algorithms. 
+ * Together with [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]], TaskAssigner + * is used to assign tasks to workers with available cores. Internally, TaskScheduler, requested + * to perform task assignment given available workers, first sorts the candidate tasksets, + * and then for each taskset, it takes a number of rounds to request TaskAssigner for task + * assignment with different the locality restrictions until there is either no qualified + * workers or no valid tasks to be assigned. + * + * TaskAssigner is responsible to maintain the worker availability state and task assignment + * information. The contract between [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]] + * and TaskAssigner is as follows. First, TaskScheduler invokes construct() of TaskAssigner to + * initialize the its internal worker states at the beginning of resource offering. Before each + * round of task assignment for a taskset, TaskScheduler invoke the init() of TaskAssigner to + * initialize the data structure for the round. When performing real task assignment, + * hasNext()/getNext() is used by TaskScheduler to check the worker availability and retrieve + * current offering from TaskAssigner. Then offerAccepted is used by TaskScheduler to notify + * the TaskAssigner so that TaskAssigner can decide whether the current offer is valid or not for + * the next request. After task assignment is done, TaskScheduler invokes the tasks() to + * retrieve all the task assignment information, and eventually, invokes reset() method so that + * TaskAssigner can cleanup its internal maintained resources. + */ + +private[scheduler] abstract class TaskAssigner { + var offer: Seq[OfferState] = _ + var CPUS_PER_TASK = 1 + + def withCpuPerTask(CPUS_PER_TASK: Int): Unit = { +this.CPUS_PER_TASK = CPUS_PER_TASK + } + + // The final assigned offer returned to TaskScheduler. 
+ final def tasks: Seq[ArrayBuffer[TaskDescription]] = offer.map(_.tasks) + + // Invoked at the beginning of resource offering to construct the offer with the workoffers. + def construct(workOffer: Seq[WorkerOffer]): Unit = { +offer = workOffer.map(o => new OfferState(o)) + } + + // Invoked at each round of Taskset assignment to initialize the internal structure. + def init(): Unit + + // Whether there is offer available to be used inside of one round of Taskset assignment. + def hasNext: Boolean + + // Returned the next assigned offer based on the task assignment strategy. + def getNext(): OfferState + + // Invoked by the TaskScheduler to indicate whether the current offer is accepted or not so that + // the assigner can decide whether the current worker is valid for the next offering. + def offerAccepted(assigned: Boolean
[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...
Github user tejasapatil commented on a diff in the pull request: https://github.com/apache/spark/pull/15541#discussion_r83996351 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala --- @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import scala.collection.mutable.ArrayBuffer +import scala.collection.mutable.PriorityQueue +import scala.util.Random + +import org.apache.spark.internal.{config, Logging} +import org.apache.spark.SparkConf +import org.apache.spark.util.Utils + +/** Tracking the current state of the workers with available cores and assigned task list. */ +class OfferState(val workOffer: WorkerOffer) { + // The current remaining cores that can be allocated to tasks. + var coresAvailable: Int = workOffer.cores + // The list of tasks that are assigned to this worker. + val tasks = new ArrayBuffer[TaskDescription](coresAvailable) +} + +/** + * TaskAssigner is the base class for all task assigner implementations, and can be + * extended to implement different task scheduling algorithms. 
+ * Together with [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]], TaskAssigner + * is used to assign tasks to workers with available cores. Internally, TaskScheduler, requested + * to perform task assignment given available workers, first sorts the candidate tasksets, + * and then for each taskset, it takes a number of rounds to request TaskAssigner for task + * assignment with different the locality restrictions until there is either no qualified + * workers or no valid tasks to be assigned. + * + * TaskAssigner is responsible to maintain the worker availability state and task assignment + * information. The contract between [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]] + * and TaskAssigner is as follows. First, TaskScheduler invokes construct() of TaskAssigner to + * initialize the its internal worker states at the beginning of resource offering. Before each + * round of task assignment for a taskset, TaskScheduler invoke the init() of TaskAssigner to + * initialize the data structure for the round. When performing real task assignment, + * hasNext()/getNext() is used by TaskScheduler to check the worker availability and retrieve + * current offering from TaskAssigner. Then offerAccepted is used by TaskScheduler to notify + * the TaskAssigner so that TaskAssigner can decide whether the current offer is valid or not for + * the next request. After task assignment is done, TaskScheduler invokes the tasks() to + * retrieve all the task assignment information, and eventually, invokes reset() method so that + * TaskAssigner can cleanup its internal maintained resources. + */ + +private[scheduler] abstract class TaskAssigner { + var offer: Seq[OfferState] = _ + var CPUS_PER_TASK = 1 + + def withCpuPerTask(CPUS_PER_TASK: Int): Unit = { +this.CPUS_PER_TASK = CPUS_PER_TASK + } + + // The final assigned offer returned to TaskScheduler. 
+ final def tasks: Seq[ArrayBuffer[TaskDescription]] = offer.map(_.tasks) + + // Invoked at the beginning of resource offering to construct the offer with the workoffers. + def construct(workOffer: Seq[WorkerOffer]): Unit = { +offer = workOffer.map(o => new OfferState(o)) + } + + // Invoked at each round of Taskset assignment to initialize the internal structure. + def init(): Unit + + // Whether there is offer available to be used inside of one round of Taskset assignment. + def hasNext: Boolean + + // Returned the next assigned offer based on the task assignment strategy. + def getNext(): OfferState + + // Invoked by the TaskScheduler to indicate whether the current offer is accepted or not so that + // the assigner can decide whether the current worker is valid for the next offering. + def offerAccepted(assigned: Boolean
[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...
Github user tejasapatil commented on a diff in the pull request: https://github.com/apache/spark/pull/15541#discussion_r83998916 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala --- @@ -60,7 +58,7 @@ private[spark] class TaskSchedulerImpl( def this(sc: SparkContext) = this(sc, sc.conf.get(config.MAX_TASK_FAILURES)) val conf = sc.conf - + private val taskAssigner: TaskAssigner = TaskAssigner.init(conf) --- End diff -- nit: extra space in `private val`
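The quoted line builds the assigner once from the configuration via `TaskAssigner.init(conf)`. The thread does not show how `init` picks an implementation, so the following is only a hypothetical sketch of a config-driven factory: `AssignerStrategy`, `AssignerFactory`, and the key `example.scheduler.taskAssigner` are invented for illustration, and a plain `Map` stands in for `SparkConf`.

```scala
// Hypothetical strategy hierarchy; names are illustrative, not Spark's.
sealed trait AssignerStrategy
case object RoundRobinStrategy extends AssignerStrategy
case object PackedStrategy extends AssignerStrategy

object AssignerFactory {
  // Hypothetical config key; the key actually used by the PR is not shown in this thread.
  val StrategyKey = "example.scheduler.taskAssigner"

  // Picks a strategy from a plain key/value config; this sketch defaults to round-robin
  // and rejects unknown names instead of silently falling back.
  def fromConf(conf: Map[String, String]): AssignerStrategy =
    conf.getOrElse(StrategyKey, "roundrobin").trim.toLowerCase match {
      case "roundrobin" => RoundRobinStrategy
      case "packed"     => PackedStrategy
      case other        => throw new IllegalArgumentException(s"Unknown task assigner: $other")
    }
}
```

Failing fast on an unknown name surfaces configuration typos at scheduler startup rather than as unexpected placement later.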
[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...
Github user tejasapatil commented on a diff in the pull request: https://github.com/apache/spark/pull/15541#discussion_r83995208 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala --- @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import scala.collection.mutable.ArrayBuffer +import scala.collection.mutable.PriorityQueue +import scala.util.Random + +import org.apache.spark.internal.{config, Logging} +import org.apache.spark.SparkConf +import org.apache.spark.util.Utils + +/** Tracking the current state of the workers with available cores and assigned task list. */ --- End diff -- nit: `Tracking` => `Tracks`
[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...
Github user tejasapatil commented on a diff in the pull request: https://github.com/apache/spark/pull/15541#discussion_r83995189 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala --- @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import scala.collection.mutable.ArrayBuffer +import scala.collection.mutable.PriorityQueue +import scala.util.Random + +import org.apache.spark.internal.{config, Logging} +import org.apache.spark.SparkConf +import org.apache.spark.util.Utils + +/** Tracking the current state of the workers with available cores and assigned task list. */ +class OfferState(val workOffer: WorkerOffer) { + // The current remaining cores that can be allocated to tasks. + var coresAvailable: Int = workOffer.cores + // The list of tasks that are assigned to this worker. + val tasks = new ArrayBuffer[TaskDescription](coresAvailable) +} + +/** + * TaskAssigner is the base class for all task assigner implementations, and can be + * extended to implement different task scheduling algorithms. 
+ * Together with [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]], TaskAssigner + * is used to assign tasks to workers with available cores. Internally, TaskScheduler, requested --- End diff -- nit: `requested` => `requests`
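The quoted doc comment says TaskAssigner can be extended to implement different scheduling algorithms. As one illustration of a packing-style strategy, the hypothetical sketch below places each task on the eligible worker with the fewest free cores (a best-fit-style heuristic). This is an assumption for illustration: the thread does not show the PR's actual packing logic, and `Worker` and `PackedSketch` are made-up names.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical worker record: an ID, its free cores, and the tasks placed on it.
final class Worker(val id: String, var freeCores: Int) {
  val tasks: ArrayBuffer[String] = ArrayBuffer.empty[String]
}

object PackedSketch {
  // Places each task on the eligible worker with the fewest free cores, so
  // partially used workers fill up before idle ones receive work.
  // Returns the tasks that did not fit on any worker.
  def assign(workers: Seq[Worker], pending: Seq[String], cpusPerTask: Int): List[String] = {
    val leftover = ArrayBuffer.empty[String]
    for (task <- pending) {
      val eligible = workers.filter(_.freeCores >= cpusPerTask)
      if (eligible.isEmpty) {
        leftover += task
      } else {
        val target = eligible.minBy(_.freeCores) // ties go to the earliest worker
        target.tasks += task
        target.freeCores -= cpusPerTask
      }
    }
    leftover.toList
  }
}
```

With two equally idle workers, this heuristic keeps filling the first one before touching the second, which is the opposite of spreading offers round-robin across workers.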
[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/15541#discussion_r8416 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala --- @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import scala.collection.mutable.ArrayBuffer +import scala.collection.mutable.PriorityQueue +import scala.util.Random + +import org.apache.spark.internal.{config, Logging} +import org.apache.spark.SparkConf +import org.apache.spark.util.Utils + +/** Tracking the current state of the workers with available cores and assigned task list. */ +class OfferState(val workOffer: WorkerOffer) { + // The current remaining cores that can be allocated to tasks. + var coresAvailable: Int = workOffer.cores + // The list of tasks that are assigned to this worker. + val tasks = new ArrayBuffer[TaskDescription](coresAvailable) +} + +/** + * TaskAssigner is the base class for all task assigner implementations, and can be + * extended to implement different task scheduling algorithms. + * Together with [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]], TaskAssigner + * is used to assign tasks to workers with available cores. 
Internally, TaskScheduler, requested + * to perform task assignment given available workers, first sorts the candidate tasksets, + * and then for each taskset, it takes a number of rounds to request TaskAssigner for task + * assignment with different the locality restrictions until there is either no qualified + * workers or no valid tasks to be assigned. + * + * TaskAssigner is responsible to maintain the worker availability state and task assignment + * information. The contract between [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]] + * and TaskAssigner is as follows. First, TaskScheduler invokes construct() of TaskAssigner to + * initialize the its internal worker states at the beginning of resource offering. Before each + * round of task assignment for a taskset, TaskScheduler invoke the init() of TaskAssigner to + * initialize the data structure for the round. When performing real task assignment, + * hasNext()/getNext() is used by TaskScheduler to check the worker availability and retrieve + * current offering from TaskAssigner. Then offerAccepted is used by TaskScheduler to notify + * the TaskAssigner so that TaskAssigner can decide whether the current offer is valid or not for + * the next request. After task assignment is done, TaskScheduler invokes the tasks() to + * retrieve all the task assignment information, and eventually, invokes reset() method so that + * TaskAssigner can cleanup its internal maintained resources. + */ + +private[scheduler] abstract class TaskAssigner { + var offer: Seq[OfferState] = _ + var CPUS_PER_TASK = 1 + + def withCpuPerTask(CPUS_PER_TASK: Int): Unit = { +this.CPUS_PER_TASK = CPUS_PER_TASK + } + + // The final assigned offer returned to TaskScheduler. + final def tasks: Seq[ArrayBuffer[TaskDescription]] = offer.map(_.tasks) + + // Invoked at the beginning of resource offering to construct the offer with the workoffers. 
+ def construct(workOffer: Seq[WorkerOffer]): Unit = { +offer = workOffer.map(o => new OfferState(o)) + } + + // Invoked at each round of Taskset assignment to initialize the internal structure. + def init(): Unit + + // Whether there is offer available to be used inside of one round of Taskset assignment. + def hasNext: Boolean + + // Returned the next assigned offer based on the task assignment strategy. + def getNext(): OfferState + + // Invoked by the TaskScheduler to indicate whether the current offer is accepted or not so that + // the assigner can decide whether the current worker is valid for the next offering. + def offerAccepted(assigned: Boolean): Un
[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/15541#discussion_r83999827 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala --- @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import scala.collection.mutable.ArrayBuffer +import scala.collection.mutable.PriorityQueue +import scala.util.Random + +import org.apache.spark.internal.{config, Logging} +import org.apache.spark.SparkConf +import org.apache.spark.util.Utils + +/** Tracking the current state of the workers with available cores and assigned task list. */ +class OfferState(val workOffer: WorkerOffer) { + // The current remaining cores that can be allocated to tasks. + var coresAvailable: Int = workOffer.cores + // The list of tasks that are assigned to this worker. + val tasks = new ArrayBuffer[TaskDescription](coresAvailable) +} + +/** + * TaskAssigner is the base class for all task assigner implementations, and can be + * extended to implement different task scheduling algorithms. 
+ * Together with [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]], TaskAssigner + * is used to assign tasks to workers with available cores. Internally, TaskScheduler, requested + * to perform task assignment given available workers, first sorts the candidate tasksets, + * and then for each taskset, it takes a number of rounds to request TaskAssigner for task + * assignment with different the locality restrictions until there is either no qualified + * workers or no valid tasks to be assigned. + * + * TaskAssigner is responsible to maintain the worker availability state and task assignment + * information. The contract between [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]] + * and TaskAssigner is as follows. First, TaskScheduler invokes construct() of TaskAssigner to + * initialize the its internal worker states at the beginning of resource offering. Before each + * round of task assignment for a taskset, TaskScheduler invoke the init() of TaskAssigner to + * initialize the data structure for the round. When performing real task assignment, + * hasNext()/getNext() is used by TaskScheduler to check the worker availability and retrieve + * current offering from TaskAssigner. Then offerAccepted is used by TaskScheduler to notify + * the TaskAssigner so that TaskAssigner can decide whether the current offer is valid or not for + * the next request. After task assignment is done, TaskScheduler invokes the tasks() to + * retrieve all the task assignment information, and eventually, invokes reset() method so that + * TaskAssigner can cleanup its internal maintained resources. + */ + +private[scheduler] abstract class TaskAssigner { + var offer: Seq[OfferState] = _ + var CPUS_PER_TASK = 1 + + def withCpuPerTask(CPUS_PER_TASK: Int): Unit = { +this.CPUS_PER_TASK = CPUS_PER_TASK + } + + // The final assigned offer returned to TaskScheduler. 
+ final def tasks: Seq[ArrayBuffer[TaskDescription]] = offer.map(_.tasks) + + // Invoked at the beginning of resource offering to construct the offer with the workoffers. + def construct(workOffer: Seq[WorkerOffer]): Unit = { +offer = workOffer.map(o => new OfferState(o)) + } + + // Invoked at each round of Taskset assignment to initialize the internal structure. + def init(): Unit + + // Whether there is offer available to be used inside of one round of Taskset assignment. + def hasNext: Boolean + + // Returned the next assigned offer based on the task assignment strategy. + def getNext(): OfferState + + // Invoked by the TaskScheduler to indicate whether the current offer is accepted or not so that + // the assigner can decide whether the current worker is valid for the next offering. + def offerAccepted(assigned: Boolean): Un
[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...
Github user zhzhan commented on a diff in the pull request: https://github.com/apache/spark/pull/15541#discussion_r83999756 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala --- @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *    http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import scala.collection.mutable.ArrayBuffer +import scala.collection.mutable.PriorityQueue +import scala.util.Random + +import org.apache.spark.internal.{config, Logging} +import org.apache.spark.SparkConf +import org.apache.spark.util.Utils + +/** Tracks the current state of the workers with available cores and assigned task list. */ +class OfferState(val workOffer: WorkerOffer) { + // The current remaining cores that can be allocated to tasks. + var coresAvailable: Int = workOffer.cores + // The list of tasks that are assigned to this worker. + val tasks = new ArrayBuffer[TaskDescription](coresAvailable) +} + +/** + * TaskAssigner is the base class for all task assigner implementations, and can be + * extended to implement different task scheduling algorithms.
+ * Together with [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]], TaskAssigner
 + * is used to assign tasks to workers with available cores. When TaskScheduler is asked to
 + * assign tasks to the available workers, it first sorts the candidate tasksets and then, for
 + * each taskset, runs a number of rounds that request task assignments from TaskAssigner
 + * under different locality restrictions, until there are either no qualified workers or
 + * no valid tasks left to assign.
 + *
 + * TaskAssigner is responsible for maintaining the worker availability state and the task
 + * assignment information. The contract between
 + * [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]] and TaskAssigner is as follows.
 + * First, TaskScheduler invokes construct() to initialize the internal worker states at the
 + * beginning of resource offering. Before each round of task assignment for a taskset,
 + * TaskScheduler invokes init() to initialize the data structures for the round. During the
 + * actual task assignment, TaskScheduler uses hasNext/getNext() to check worker availability
 + * and retrieve the current offer, and then calls offerAccepted() to report whether the offer
 + * was used, so that TaskAssigner can decide whether the current offer is still valid for the
 + * next request. After task assignment is done, TaskScheduler reads tasks to retrieve all the
 + * task assignment information, and finally invokes reset() so that TaskAssigner can clean
 + * up its internally maintained resources.
 + */
 +
 +private[scheduler] abstract class TaskAssigner {
 + var offer: Seq[OfferState] = _
 + var CPUS_PER_TASK = 1
 +
 + def withCpuPerTask(CPUS_PER_TASK: Int): Unit = {
 +    this.CPUS_PER_TASK = CPUS_PER_TASK
 + }
 +
 + // The final task assignments, one buffer per worker offer, returned to TaskScheduler.
+ final def tasks: Seq[ArrayBuffer[TaskDescription]] = offer.map(_.tasks)
 +
 + // Invoked at the beginning of resource offering to build the offer states from workOffer.
 + def construct(workOffer: Seq[WorkerOffer]): Unit = {
 +    offer = workOffer.map(o => new OfferState(o))
 + }
 +
 + // Invoked at the start of each round of taskset assignment to initialize internal structures.
 + def init(): Unit
 +
 + // Whether an offer is available within the current round of taskset assignment.
 + def hasNext: Boolean
 +
 + // Returns the next offer, chosen according to the task assignment strategy.
 + def getNext(): OfferState
 +
 + // Invoked by the TaskScheduler to indicate whether or not the current offer was accepted, so
 + // that the assigner can decide whether the current worker is valid for the next offering.
 + def offerAccepted(assigned: Boolean): Un
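Because the doc comment leaves the ordering policy to subclasses, a packed strategy in the spirit of the pull request title can be sketched on the same contract. This is a hypothetical illustration, not the implementation under review: PackedAssigner and PackedDemo are made-up names, the types are trimmed-down stand-ins, and the policy shown (offer the worker with the fewest free cores first, and keep offering it until it is full or rejects the task) is one plausible reading of "packed", chosen so that tasks concentrate on as few workers as possible.

```scala
import scala.collection.mutable.ArrayBuffer

// Simplified stand-in (assumption): Spark's own WorkerOffer may carry more fields.
final case class WorkerOffer(executorId: String, host: String, cores: Int)

/** Remaining cores and assigned task ids for one worker. */
class OfferState(val workOffer: WorkerOffer) {
  var coresAvailable: Int = workOffer.cores
  val tasks = new ArrayBuffer[Long](coresAvailable)
}

/**
 * Hypothetical packed assigner: each round offers the worker with the fewest free cores
 * first and keeps offering that worker until it is full or rejects the task.
 */
class PackedAssigner(cpusPerTask: Int) {
  var offer: Seq[OfferState] = Seq.empty
  private var sorted: IndexedSeq[OfferState] = IndexedSeq.empty
  private var idx = 0

  def construct(workOffers: Seq[WorkerOffer]): Unit = { offer = workOffers.map(new OfferState(_)) }

  def init(): Unit = {
    // sortBy is stable, so workers with equal free cores keep their original order.
    sorted = offer.filter(_.coresAvailable >= cpusPerTask).sortBy(_.coresAvailable).toIndexedSeq
    idx = 0
  }

  def hasNext: Boolean = idx < sorted.size

  // getNext() does not advance; offerAccepted() decides when to move to the next worker.
  def getNext(): OfferState = sorted(idx)

  def offerAccepted(assigned: Boolean): Unit = {
    if (!assigned || sorted(idx).coresAvailable < cpusPerTask) idx += 1
  }
}

object PackedDemo {
  /** One round in which every offer is accepted; returns task ids per executor. */
  def schedule(workers: Seq[WorkerOffer], numTasks: Int, cpusPerTask: Int): Map[String, List[Long]] = {
    val assigner = new PackedAssigner(cpusPerTask)
    assigner.construct(workers)
    assigner.init()
    var nextTaskId = 0L
    while (assigner.hasNext && nextTaskId < numTasks) {
      val current = assigner.getNext() // init() and offerAccepted() ensure one more task fits
      current.tasks += nextTaskId
      current.coresAvailable -= cpusPerTask
      nextTaskId += 1
      assigner.offerAccepted(assigned = true)
    }
    assigner.offer.map(o => o.workOffer.executorId -> o.tasks.toList).toMap
  }
}
```

With workers e1 (4 cores), e2 (2 cores) and e3 (4 cores), two CPUs per task and three tasks, this sketch places task 0 on e2 and tasks 1 and 2 on e1, leaving e3 idle; a strategy that visits workers in round-robin order would instead put one task on each worker.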
[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/15541#discussion_r83999452 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala --- @@ -0,0 +1,233 @@
[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/15541#discussion_r83998771 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala --- @@ -0,0 +1,233 @@
[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/15541#discussion_r83998540 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala --- @@ -0,0 +1,233 @@
[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...
Github user zhzhan commented on a diff in the pull request: https://github.com/apache/spark/pull/15541#discussion_r83998058 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala --- @@ -0,0 +1,233 @@
[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...
Github user zhzhan commented on a diff in the pull request: https://github.com/apache/spark/pull/15541#discussion_r83997070 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala --- @@ -0,0 +1,233 @@
[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/15541#discussion_r83996232 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala --- @@ -0,0 +1,233 @@
[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/15541#discussion_r83995415 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala --- @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import scala.collection.mutable.ArrayBuffer +import scala.collection.mutable.PriorityQueue +import scala.util.Random + +import org.apache.spark.internal.{config, Logging} +import org.apache.spark.SparkConf +import org.apache.spark.util.Utils + +/** Tracks the current state of the workers with available cores and assigned task list. */ +class OfferState(val workOffer: WorkerOffer) { + // The current remaining cores that can be allocated to tasks. + var coresAvailable: Int = workOffer.cores + // The list of tasks that are assigned to this worker. + val tasks = new ArrayBuffer[TaskDescription](coresAvailable) +} + +/** + * TaskAssigner is the base class for all task assigner implementations, and can be + * extended to implement different task scheduling algorithms.
+ * Together with [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]], TaskAssigner + * is used to assign tasks to workers with available cores. Internally, when TaskScheduler is asked + * to perform task assignment given the available workers, it first sorts the candidate tasksets, + * and then, for each taskset, it makes a number of rounds of requests to TaskAssigner for task + * assignment under different locality restrictions until there are either no qualified + * workers or no valid tasks left to assign. + * + * TaskAssigner is responsible for maintaining the worker availability state and task assignment + * information. The contract between [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]] + * and TaskAssigner is as follows. First, TaskScheduler invokes construct() of TaskAssigner to + * initialize its internal worker states at the beginning of resource offering. Before each + * round of task assignment for a taskset, TaskScheduler invokes init() of TaskAssigner to + * initialize the data structure for the round. When performing the actual task assignment, + * TaskScheduler uses hasNext/getNext() to check worker availability and retrieve the + * current offer from TaskAssigner. Then TaskScheduler calls offerAccepted() to notify + * TaskAssigner, so that TaskAssigner can decide whether the current offer is still valid for + * the next request. After task assignment is done, TaskScheduler invokes tasks to + * retrieve all the task assignment information, and finally invokes reset() so that + * TaskAssigner can clean up its internally maintained resources. + */ + +private[scheduler] abstract class TaskAssigner { + var offer: Seq[OfferState] = _ + var CPUS_PER_TASK = 1 + + def withCpuPerTask(CPUS_PER_TASK: Int): Unit = { + this.CPUS_PER_TASK = CPUS_PER_TASK + } + + // The final task assignments, one buffer per worker offer, returned to TaskScheduler.
+ final def tasks: Seq[ArrayBuffer[TaskDescription]] = offer.map(_.tasks) + + // Invoked at the beginning of resource offering to build the offer state from the worker offers. + def construct(workOffer: Seq[WorkerOffer]): Unit = { + offer = workOffer.map(o => new OfferState(o)) + } + + // Invoked at the start of each round of taskset assignment to initialize the internal structures. + def init(): Unit + + // Whether an offer is still available within the current round of taskset assignment. + def hasNext: Boolean + + // Returns the next offer to assign, according to the task assignment strategy. + def getNext(): OfferState + + // Invoked by TaskScheduler to indicate whether the current offer was accepted, so that + // the assigner can decide whether the current worker is still valid for the next offer. + def offerAccepted(assigned: Boolean)
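The doc comment quoted above says each taskset is offered in several rounds under different locality restrictions. The loop stops when no qualified workers remain or no valid tasks are left. That round structure can be sketched on its own as a simplified model.

Several parts of this sketch are assumptions. The two levels `NodeLocal` and `AnyHost`, the `Worker` and `PendingTask` types, and `LocalityRounds.assign` are all hypothetical. Real Spark tracks more locality levels and applies delay scheduling, which this sketch ignores.

```scala
import scala.collection.mutable.ArrayBuffer

object LocalityRounds {
  // Hypothetical simplified types, not Spark's scheduler classes.
  final class Worker(val host: String, var freeCores: Int)
  final case class PendingTask(id: Int, preferredHost: Option[String])

  // Locality restrictions, tried from most to least strict.
  sealed trait Level
  case object NodeLocal extends Level
  case object AnyHost extends Level
  private val levels: Seq[Level] = Seq(NodeLocal, AnyHost)

  private def allowed(t: PendingTask, w: Worker, level: Level): Boolean = level match {
    case NodeLocal => t.preferredHost.contains(w.host)
    case AnyHost   => true
  }

  /** Returns (taskId, host) pairs in launch order. */
  def assign(workers: Seq[Worker], tasks: Seq[PendingTask], cpusPerTask: Int): List[(Int, String)] = {
    val pending = ArrayBuffer(tasks: _*)
    val launched = ArrayBuffer.empty[(Int, String)]
    for (level <- levels) {
      var progress = true
      // One round is one pass over the workers. Stop at this level when a round
      // launches nothing (no qualified worker or no valid task) or nothing is pending.
      while (progress && pending.nonEmpty) {
        progress = false
        for (w <- workers if w.freeCores >= cpusPerTask && pending.nonEmpty) {
          val idx = pending.indexWhere(allowed(_, w, level))
          if (idx >= 0) {
            val t = pending.remove(idx)
            w.freeCores -= cpusPerTask
            launched += ((t.id, w.host))
            progress = true
          }
        }
      }
    }
    launched.toList
  }
}
```

Take hosts `h1` and `h2` with two free cores each, three tasks preferring `h2`, and one task without a preference. The node-local rounds place two tasks on `h2` and then stop. The remaining preferred task and the unconstrained one run on `h1` only once the restriction is relaxed to `AnyHost`.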
[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/15541#discussion_r83995272 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala --- @@ -0,0 +1,233 @@
[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...
GitHub user zhzhan opened a pull request:

https://github.com/apache/spark/pull/15541

[SPARK-17637][Scheduler]Packed scheduling for Spark tasks across executors

## What changes were proposed in this pull request?

Restructure the code and implement two new task assigners.

PackedAssigner: tries to allocate tasks to the executors with the fewest available cores, so that Spark can release reserved executors when dynamic allocation is enabled.

BalancedAssigner: tries to allocate tasks to the executors with the most available cores, in order to balance the workload across all executors.

By default, the original round-robin assigner is used. We tested a pipeline, and the new PackedAssigner saved around 45% of the reserved CPU and memory with dynamic allocation enabled.

## How was this patch tested?

Both unit tests in TaskSchedulerImplSuite and manual tests in a production pipeline.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zhzhan/spark TaskAssigner

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/15541.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #15541

commit 75cdd1a77a227fa492a09e93794d4ea7be8a020f
Author: Zhan Zhang
Date: 2016-10-19T01:20:48Z

TaskAssigner to support different scheduling algorithms

---
If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA.
---
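The practical difference between PackedAssigner and BalancedAssigner, as described in the PR text, is which executor is offered the next task. The sketch below illustrates only that ordering, using a `scala.collection.mutable.PriorityQueue` keyed on free cores. The quoted `TaskAssigner.scala` does import `PriorityQueue`, but whether the PR's assigners use it this way is an assumption.

`PackedVsBalanced`, `Slot`, and `assign` are hypothetical names. The real assigners may also break ties differently and interact with locality rounds, which this sketch leaves out.

```scala
import scala.collection.mutable.PriorityQueue

object PackedVsBalanced {
  // Hypothetical per-executor state: only the free-core count matters here.
  final class Slot(val executorId: String, var coresAvailable: Int)

  /** Places `numTasks` tasks one at a time and returns the task count per executor.
    * packed = true: pick the executor with the fewest free cores that still fits a task.
    * packed = false: pick the executor with the most free cores. */
  def assign(cores: Seq[(String, Int)], numTasks: Int, cpusPerTask: Int, packed: Boolean): Map[String, Int] = {
    val byFree: Ordering[Slot] = Ordering.by((s: Slot) => s.coresAvailable)
    // PriorityQueue dequeues the maximum, so reverse the ordering for "fewest free cores first".
    val queue = PriorityQueue.empty[Slot](if (packed) byFree.reverse else byFree)
    queue ++= cores.map { case (id, c) => new Slot(id, c) }.filter(_.coresAvailable >= cpusPerTask)

    val counts = scala.collection.mutable.Map.empty[String, Int]
    var remaining = numTasks
    while (remaining > 0 && queue.nonEmpty) {
      // Only a dequeued slot is mutated, so keys of slots still in the heap never change.
      val s = queue.dequeue()
      s.coresAvailable -= cpusPerTask
      counts(s.executorId) = counts.getOrElse(s.executorId, 0) + 1
      remaining -= 1
      // Re-insert with its updated key only if it can still take another task.
      if (s.coresAvailable >= cpusPerTask) queue.enqueue(s)
    }
    counts.toMap
  }
}
```

Take executors `e1` (6 cores) and `e2` (3 cores) and two single-core tasks. The packed ordering puts both tasks on `e2` and leaves `e1` idle, which is the property that would let dynamic allocation release `e1`. The balanced ordering puts both tasks on `e1`, the executor with the most free cores.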