[GitHub] spark pull request #15544: [SPARK-17997] [SQL] Add an aggregation function f...
Github user wzhfy commented on a diff in the pull request: https://github.com/apache/spark/pull/15544#discussion_r84006557 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/HyperLogLogPlusPlus.scala --- @@ -142,319 +84,37 @@ case class HyperLogLogPlusPlus( override def inputTypes: Seq[AbstractDataType] = Seq(AnyDataType) - override def aggBufferSchema: StructType = StructType.fromAttributes(aggBufferAttributes) - - /** Allocate enough words to store all registers. */ - override val aggBufferAttributes: Seq[AttributeReference] = Seq.tabulate(numWords) { i => -AttributeReference(s"MS[$i]", LongType)() + override def createAggregationBuffer(): HyperLogLogPlusPlusAlgo = { +new HyperLogLogPlusPlusAlgo(relativeSD) --- End diff -- @hvanhovell Do you mean we should not use `TypedImperativeAggregate` instead of `ImperativeAggregate`? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #15532: [SPARK-17989][SQL] Check ascendingOrder type in sort_arr...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/15532 **[Test build #67176 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/67176/consoleFull)** for PR 15532 at commit [`6ff7c7d`](https://github.com/apache/spark/commit/6ff7c7dc15694ab8f95e2d5f9c1fd30850b83114). --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #15505: [SPARK-17931][CORE] taskScheduler has some unneeded seri...
Github user witgo commented on the issue: https://github.com/apache/spark/pull/15505 cc @rxin --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #15548: [SPARK-17985][CORE] Bump commons-lang3 version to 3.5.
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/15548 Merged build finished. Test PASSed. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #15548: [SPARK-17985][CORE] Bump commons-lang3 version to 3.5.
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/15548 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/67168/ Test PASSed. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #15377: [SPARK-17802] Improved caller context logging.
Github user weiqingy commented on the issue: https://github.com/apache/spark/pull/15377 @lins05 Maybe something like this: ``` test("Set Spark CallerContext") { val context = "test" try { new CallerContext(context).setCurrentContext() val callerContext = Utils.classForName("org.apache.hadoop.ipc.CallerContext") assert(CallerContext.callerContextSupported) assert(s"SPARK_$context" === callerContext.getMethod("getCurrent").invoke(null).toString) } catch { case e: ClassNotFoundException => assert(!CallerContext.callerContextSupported) } } ``` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #15548: [SPARK-17985][CORE] Bump commons-lang3 version to 3.5.
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/15548 **[Test build #67168 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/67168/consoleFull)** for PR 15548 at commit [`f318dff`](https://github.com/apache/spark/commit/f318dffd4137c20bdc67ac054e345d55703d96de). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #15550: [SPARK-18003][Spark Core] Fix bug of RDD zipWithIndex ge...
Github user tejasapatil commented on the issue: https://github.com/apache/spark/pull/15550 Have you looked for other places in the codebase which would also produce wrong result by using scala's `zipWithIndex()` ? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #15550: [SPARK-18003][Spark Core] Fix bug of RDD zipWithI...
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/15550#discussion_r84004989 --- Diff: core/src/main/scala/org/apache/spark/rdd/ZippedWithIndexRDD.scala --- @@ -64,8 +64,14 @@ class ZippedWithIndexRDD[T: ClassTag](prev: RDD[T]) extends RDD[(T, Long)](prev) override def compute(splitIn: Partition, context: TaskContext): Iterator[(T, Long)] = { val split = splitIn.asInstanceOf[ZippedWithIndexRDDPartition] -firstParent[T].iterator(split.prev, context).zipWithIndex.map { x => - (x._1, split.startIndex + x._2) +val parentIter = firstParent[T].iterator(split.prev, context) +new Iterator[(T, Long)] { + var idxAcc: Long = -1L --- End diff -- or rather split.startIndex - 1 given the current code for idxAcc. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #15532: [SPARK-17989][SQL] Check ascendingOrder type in s...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/15532#discussion_r84004834 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- @@ -124,7 +124,15 @@ case class SortArray(base: Expression, ascendingOrder: Expression) override def checkInputDataTypes(): TypeCheckResult = base.dataType match { case ArrayType(dt, _) if RowOrdering.isOrderable(dt) => - TypeCheckResult.TypeCheckSuccess + ascendingOrder match { +case Literal(_: Boolean, BooleanType) => + TypeCheckResult.TypeCheckSuccess +case _ => + TypeCheckResult.TypeCheckFailure( +s"Sort order in second argument requires ${BooleanType.simpleString} type as " + --- End diff -- Actually, I wrote this as I was worried of the case `cast(NULL as boolean)`. > Sort order in second argument requires boolean type as non-null, however, it is 'CAST(NULL AS BOOLEAN)' as boolean type. I will just make it short. Actually, that sentence was copied from the default one. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #15532: [SPARK-17989][SQL] Check ascendingOrder type in s...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/15532#discussion_r84004551 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- @@ -124,7 +124,15 @@ case class SortArray(base: Expression, ascendingOrder: Expression) override def checkInputDataTypes(): TypeCheckResult = base.dataType match { case ArrayType(dt, _) if RowOrdering.isOrderable(dt) => - TypeCheckResult.TypeCheckSuccess + ascendingOrder match { +case Literal(_: Boolean, BooleanType) => + TypeCheckResult.TypeCheckSuccess +case _ => + TypeCheckResult.TypeCheckFailure( +s"Sort order in second argument requires ${BooleanType.simpleString} type as " + --- End diff -- (also technically your current english sentence is grammatically incorrect; you can't use comma to separate however here) --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #15532: [SPARK-17989][SQL] Check ascendingOrder type in s...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/15532#discussion_r84004506 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- @@ -124,7 +124,15 @@ case class SortArray(base: Expression, ascendingOrder: Expression) override def checkInputDataTypes(): TypeCheckResult = base.dataType match { case ArrayType(dt, _) if RowOrdering.isOrderable(dt) => - TypeCheckResult.TypeCheckSuccess + ascendingOrder match { +case Literal(_: Boolean, BooleanType) => + TypeCheckResult.TypeCheckSuccess +case _ => + TypeCheckResult.TypeCheckFailure( +s"Sort order in second argument requires ${BooleanType.simpleString} type as " + --- End diff -- maybe even ignore the actual value to keep it short. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #15532: [SPARK-17989][SQL] Check ascendingOrder type in s...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/15532#discussion_r84004457 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- @@ -124,7 +124,15 @@ case class SortArray(base: Expression, ascendingOrder: Expression) override def checkInputDataTypes(): TypeCheckResult = base.dataType match { case ArrayType(dt, _) if RowOrdering.isOrderable(dt) => - TypeCheckResult.TypeCheckSuccess + ascendingOrder match { +case Literal(_: Boolean, BooleanType) => + TypeCheckResult.TypeCheckSuccess +case _ => + TypeCheckResult.TypeCheckFailure( +s"Sort order in second argument requires ${BooleanType.simpleString} type as " + --- End diff -- Even better, say "requires a boolean literal" --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #15547: [SPARK-18002][SQL] Pruning unnecessary IsNotNull predica...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/15547 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/67169/ Test PASSed. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #15532: [SPARK-17989][SQL] Check ascendingOrder type in s...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/15532#discussion_r84004431 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- @@ -124,7 +124,15 @@ case class SortArray(base: Expression, ascendingOrder: Expression) override def checkInputDataTypes(): TypeCheckResult = base.dataType match { case ArrayType(dt, _) if RowOrdering.isOrderable(dt) => - TypeCheckResult.TypeCheckSuccess + ascendingOrder match { +case Literal(_: Boolean, BooleanType) => + TypeCheckResult.TypeCheckSuccess +case _ => + TypeCheckResult.TypeCheckFailure( +s"Sort order in second argument requires ${BooleanType.simpleString} type as " + --- End diff -- i think you should just say "requires boolean type" and remove "as non-null". I don't know what that was saying ... --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #15547: [SPARK-18002][SQL] Pruning unnecessary IsNotNull predica...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/15547 Merged build finished. Test PASSed. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #15297: [WIP][SPARK-9862]Handling data skew
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/15297 **[Test build #67175 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/67175/consoleFull)** for PR 15297 at commit [`3682a9e`](https://github.com/apache/spark/commit/3682a9ecf72caebc186cc1539dfa0ebe8683aef2). --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #15532: [SPARK-17989][SQL] Check ascendingOrder type in sort_arr...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/15532 **[Test build #67174 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/67174/consoleFull)** for PR 15532 at commit [`eafe4d6`](https://github.com/apache/spark/commit/eafe4d66121c8e22fb149b8071403506013e90a8). --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #15547: [SPARK-18002][SQL] Pruning unnecessary IsNotNull predica...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/15547 **[Test build #67169 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/67169/consoleFull)** for PR 15547 at commit [`a264eba`](https://github.com/apache/spark/commit/a264ebac6924ae7184abe6fc75fa0dfb659b7d54). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #15532: [SPARK-17989][SQL] Check ascendingOrder type in s...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/15532#discussion_r84004039 --- Diff: sql/core/src/test/resources/sql-tests/results/array.sql.out --- @@ -124,8 +124,23 @@ struct,sort_array(tinyint_array, -- !query 8 output [true] [1,2] [1,2] [1,2] [1,2] [9223372036854775808,9223372036854775809] [1.0,2.0] [1.0,2.0] [2016-03-13,2016-03-14] [2016-11-12 20:54:00.0,2016-11-15 20:54:00.0] - -- !query 9 +select sort_array(array('b', 'd'), '1') +-- !query 9 schema +struct<> +-- !query 9 output +org.apache.spark.sql.AnalysisException +cannot resolve 'sort_array(array('b', 'd'), '1')' due to data type mismatch: Sort order in second argument of sort_array() requires boolean type as non-null, however, it is ''1'' as string type.; line 1 pos 7 --- End diff -- Thank you for confirming @tejasapatil --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #15532: [SPARK-17989][SQL] Check ascendingOrder type in s...
Github user tejasapatil commented on a diff in the pull request: https://github.com/apache/spark/pull/15532#discussion_r84003975 --- Diff: sql/core/src/test/resources/sql-tests/results/array.sql.out --- @@ -124,8 +124,23 @@ struct,sort_array(tinyint_array, -- !query 8 output [true] [1,2] [1,2] [1,2] [1,2] [9223372036854775808,9223372036854775809] [1.0,2.0] [1.0,2.0] [2016-03-13,2016-03-14] [2016-11-12 20:54:00.0,2016-11-15 20:54:00.0] - -- !query 9 +select sort_array(array('b', 'd'), '1') +-- !query 9 schema +struct<> +-- !query 9 output +org.apache.spark.sql.AnalysisException +cannot resolve 'sort_array(array('b', 'd'), '1')' due to data type mismatch: Sort order in second argument of sort_array() requires boolean type as non-null, however, it is ''1'' as string type.; line 1 pos 7 --- End diff -- Yes. Now that I see the final error message, its fine to omit that. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #15481: [SPARK-17929] [CORE] Fix deadlock when CoarseGrainedSche...
Github user mridulm commented on the issue: https://github.com/apache/spark/pull/15481 BTW, it was interesting that the earlier change did not trigger a test failure (the issue @viirya pointed out - about needing to move RemoveExecutor to receive) --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #15481: [SPARK-17929] [CORE] Fix deadlock when CoarseGrainedSche...
Github user mridulm commented on the issue: https://github.com/apache/spark/pull/15481 LGTM, @zsxwing any comments ? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...
Github user zhzhan commented on a diff in the pull request: https://github.com/apache/spark/pull/15541#discussion_r84002685 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala --- @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import scala.collection.mutable.ArrayBuffer +import scala.collection.mutable.PriorityQueue +import scala.util.Random + +import org.apache.spark.internal.{config, Logging} +import org.apache.spark.SparkConf +import org.apache.spark.util.Utils + +/** Tracking the current state of the workers with available cores and assigned task list. */ +class OfferState(val workOffer: WorkerOffer) { + // The current remaining cores that can be allocated to tasks. + var coresAvailable: Int = workOffer.cores + // The list of tasks that are assigned to this worker. + val tasks = new ArrayBuffer[TaskDescription](coresAvailable) +} + +/** + * TaskAssigner is the base class for all task assigner implementations, and can be + * extended to implement different task scheduling algorithms. + * Together with [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]], TaskAssigner + * is used to assign tasks to workers with available cores. Internally, TaskScheduler, requested + * to perform task assignment given available workers, first sorts the candidate tasksets, + * and then for each taskset, it takes a number of rounds to request TaskAssigner for task + * assignment with different the locality restrictions until there is either no qualified + * workers or no valid tasks to be assigned. + * + * TaskAssigner is responsible to maintain the worker availability state and task assignment + * information. The contract between [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]] + * and TaskAssigner is as follows. First, TaskScheduler invokes construct() of TaskAssigner to + * initialize the its internal worker states at the beginning of resource offering. Before each + * round of task assignment for a taskset, TaskScheduler invoke the init() of TaskAssigner to + * initialize the data structure for the round. When performing real task assignment, + * hasNext()/getNext() is used by TaskScheduler to check the worker availability and retrieve + * current offering from TaskAssigner. Then offerAccepted is used by TaskScheduler to notify + * the TaskAssigner so that TaskAssigner can decide whether the current offer is valid or not for + * the next request. After task assignment is done, TaskScheduler invokes the tasks() to + * retrieve all the task assignment information, and eventually, invokes reset() method so that + * TaskAssigner can cleanup its internal maintained resources. + */ + +private[scheduler] abstract class TaskAssigner { + var offer: Seq[OfferState] = _ + var CPUS_PER_TASK = 1 + + def withCpuPerTask(CPUS_PER_TASK: Int): Unit = { +this.CPUS_PER_TASK = CPUS_PER_TASK + } + + // The final assigned offer returned to TaskScheduler. + final def tasks: Seq[ArrayBuffer[TaskDescription]] = offer.map(_.tasks) + + // Invoked at the beginning of resource offering to construct the offer with the workoffers. + def construct(workOffer: Seq[WorkerOffer]): Unit = { +offer = workOffer.map(o => new OfferState(o)) + } + + // Invoked at each round of Taskset assignment to initialize the internal structure. + def init(): Unit + + // Whether there is offer available to be used inside of one round of Taskset assignment. + def hasNext: Boolean + + // Returned the next assigned offer based on the task assignment strategy. + def getNext(): OfferState + + // Invoked by the TaskScheduler to indicate whether the current offer is accepted or not so that + // the assigner can decide whether the current worker is valid for the next offering. + def offerAccepted(assigned: Boolean): Un
[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...
Github user zhzhan commented on a diff in the pull request: https://github.com/apache/spark/pull/15541#discussion_r84002480 --- Diff: core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala --- @@ -109,6 +108,85 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B assert(!failedTaskSet) } + test("Scheduler does not always schedule tasks on the same workers") { +val taskScheduler = setupScheduler() +roundrobin(taskScheduler) + } + + test("User can specify the roundrobin task assigner") { +val taskScheduler = setupScheduler(("spark.scheduler.taskAssigner", "RoUndrObin")) +roundrobin(taskScheduler) + } + + test("Fallback to roundrobin when the task assigner provided is not valid") { +val taskScheduler = setupScheduler("spark.scheduler.taskAssigner" -> "invalid") +roundrobin(taskScheduler) + } + + test("Scheduler balance the assignment to the worker with more free cores") { --- End diff -- Can you please clarify? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...
Github user zhzhan commented on a diff in the pull request: https://github.com/apache/spark/pull/15541#discussion_r84002353 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala --- @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import scala.collection.mutable.ArrayBuffer +import scala.collection.mutable.PriorityQueue +import scala.util.Random + +import org.apache.spark.internal.{config, Logging} +import org.apache.spark.SparkConf +import org.apache.spark.util.Utils + +/** Tracking the current state of the workers with available cores and assigned task list. */ +class OfferState(val workOffer: WorkerOffer) { + // The current remaining cores that can be allocated to tasks. + var coresAvailable: Int = workOffer.cores + // The list of tasks that are assigned to this worker. + val tasks = new ArrayBuffer[TaskDescription](coresAvailable) +} + +/** + * TaskAssigner is the base class for all task assigner implementations, and can be + * extended to implement different task scheduling algorithms. + * Together with [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]], TaskAssigner + * is used to assign tasks to workers with available cores. Internally, TaskScheduler, requested + * to perform task assignment given available workers, first sorts the candidate tasksets, + * and then for each taskset, it takes a number of rounds to request TaskAssigner for task + * assignment with different the locality restrictions until there is either no qualified + * workers or no valid tasks to be assigned. + * + * TaskAssigner is responsible to maintain the worker availability state and task assignment + * information. The contract between [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]] + * and TaskAssigner is as follows. First, TaskScheduler invokes construct() of TaskAssigner to + * initialize the its internal worker states at the beginning of resource offering. Before each + * round of task assignment for a taskset, TaskScheduler invoke the init() of TaskAssigner to + * initialize the data structure for the round. When performing real task assignment, + * hasNext()/getNext() is used by TaskScheduler to check the worker availability and retrieve + * current offering from TaskAssigner. Then offerAccepted is used by TaskScheduler to notify + * the TaskAssigner so that TaskAssigner can decide whether the current offer is valid or not for + * the next request. After task assignment is done, TaskScheduler invokes the tasks() to + * retrieve all the task assignment information, and eventually, invokes reset() method so that + * TaskAssigner can cleanup its internal maintained resources. + */ + +private[scheduler] abstract class TaskAssigner { + var offer: Seq[OfferState] = _ + var CPUS_PER_TASK = 1 + + def withCpuPerTask(CPUS_PER_TASK: Int): Unit = { +this.CPUS_PER_TASK = CPUS_PER_TASK + } + + // The final assigned offer returned to TaskScheduler. + final def tasks: Seq[ArrayBuffer[TaskDescription]] = offer.map(_.tasks) + + // Invoked at the beginning of resource offering to construct the offer with the workoffers. + def construct(workOffer: Seq[WorkerOffer]): Unit = { +offer = workOffer.map(o => new OfferState(o)) + } + + // Invoked at each round of Taskset assignment to initialize the internal structure. + def init(): Unit + + // Whether there is offer available to be used inside of one round of Taskset assignment. + def hasNext: Boolean + + // Returned the next assigned offer based on the task assignment strategy. + def getNext(): OfferState + + // Invoked by the TaskScheduler to indicate whether the current offer is accepted or not so that + // the assigner can decide whether the current worker is valid for the next offering. + def offerAccepted(assigned: Boolean): Un
[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...
Github user zhzhan commented on a diff in the pull request: https://github.com/apache/spark/pull/15541#discussion_r84002236 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala --- @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import scala.collection.mutable.ArrayBuffer +import scala.collection.mutable.PriorityQueue +import scala.util.Random + +import org.apache.spark.internal.{config, Logging} +import org.apache.spark.SparkConf +import org.apache.spark.util.Utils + +/** Tracking the current state of the workers with available cores and assigned task list. */ +class OfferState(val workOffer: WorkerOffer) { + // The current remaining cores that can be allocated to tasks. + var coresAvailable: Int = workOffer.cores + // The list of tasks that are assigned to this worker. + val tasks = new ArrayBuffer[TaskDescription](coresAvailable) +} + +/** + * TaskAssigner is the base class for all task assigner implementations, and can be + * extended to implement different task scheduling algorithms. + * Together with [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]], TaskAssigner + * is used to assign tasks to workers with available cores. Internally, TaskScheduler, requested + * to perform task assignment given available workers, first sorts the candidate tasksets, + * and then for each taskset, it takes a number of rounds to request TaskAssigner for task + * assignment with different the locality restrictions until there is either no qualified + * workers or no valid tasks to be assigned. + * + * TaskAssigner is responsible to maintain the worker availability state and task assignment + * information. The contract between [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]] + * and TaskAssigner is as follows. First, TaskScheduler invokes construct() of TaskAssigner to + * initialize the its internal worker states at the beginning of resource offering. Before each + * round of task assignment for a taskset, TaskScheduler invoke the init() of TaskAssigner to + * initialize the data structure for the round. When performing real task assignment, + * hasNext()/getNext() is used by TaskScheduler to check the worker availability and retrieve + * current offering from TaskAssigner. Then offerAccepted is used by TaskScheduler to notify + * the TaskAssigner so that TaskAssigner can decide whether the current offer is valid or not for + * the next request. After task assignment is done, TaskScheduler invokes the tasks() to + * retrieve all the task assignment information, and eventually, invokes reset() method so that + * TaskAssigner can cleanup its internal maintained resources. + */ + +private[scheduler] abstract class TaskAssigner { + var offer: Seq[OfferState] = _ + var CPUS_PER_TASK = 1 + + def withCpuPerTask(CPUS_PER_TASK: Int): Unit = { +this.CPUS_PER_TASK = CPUS_PER_TASK + } + + // The final assigned offer returned to TaskScheduler. + final def tasks: Seq[ArrayBuffer[TaskDescription]] = offer.map(_.tasks) + + // Invoked at the beginning of resource offering to construct the offer with the workoffers. + def construct(workOffer: Seq[WorkerOffer]): Unit = { +offer = workOffer.map(o => new OfferState(o)) + } + + // Invoked at each round of Taskset assignment to initialize the internal structure. + def init(): Unit + + // Whether there is offer available to be used inside of one round of Taskset assignment. + def hasNext: Boolean + + // Returned the next assigned offer based on the task assignment strategy. + def getNext(): OfferState + + // Invoked by the TaskScheduler to indicate whether the current offer is accepted or not so that + // the assigner can decide whether the current worker is valid for the next offering. + def offerAccepted(assigned: Boolean): Un
[GitHub] spark issue #15377: [SPARK-17802] Improved caller context logging.
Github user lins05 commented on the issue: https://github.com/apache/spark/pull/15377 @weiqingy I agree that's a problem. But i don't see how to unit test the `callerContextSupported` method without repeating the same logic in the test code. Do you have any suggestion? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #15532: [SPARK-17989][SQL] Check ascendingOrder type in s...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/15532#discussion_r84001844 --- Diff: sql/core/src/test/resources/sql-tests/inputs/array.sql --- @@ -71,6 +71,10 @@ select sort_array(timestamp_array) from primitive_arrays; +select sort_array(array('b', 'd'), '1'); --- End diff -- Sure, I will. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #15546: [SPARK-17982][SQL] SQLBuilder should wrap the generated ...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/15546 **[Test build #67172 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/67172/consoleFull)** for PR 15546 at commit [`105c36a`](https://github.com/apache/spark/commit/105c36ab7ee014171f215114e640d27eaf3ece42). --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #15481: [SPARK-17929] [CORE] Fix deadlock when CoarseGrainedSche...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/15481 **[Test build #67173 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/67173/consoleFull)** for PR 15481 at commit [`7d86054`](https://github.com/apache/spark/commit/7d86054cf97c81abfcbf3737a555bbd91c77e8a4). --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #15532: [SPARK-17989][SQL] Check ascendingOrder type in s...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/15532#discussion_r84001226 --- Diff: sql/core/src/test/resources/sql-tests/inputs/array.sql --- @@ -71,6 +71,10 @@ select sort_array(timestamp_array) from primitive_arrays; +select sort_array(array('b', 'd'), '1'); --- End diff -- add some comment explaining what this is testing? (error reporting) --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #15481: [SPARK-17929] [CORE] Fix deadlock when CoarseGrainedSche...
Github user scwf commented on the issue: https://github.com/apache/spark/pull/15481 Updated, can you review again? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #15532: [SPARK-17989][SQL] Check ascendingOrder type in sort_arr...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/15532 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/67167/ Test FAILed. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #15532: [SPARK-17989][SQL] Check ascendingOrder type in sort_arr...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/15532 Merged build finished. Test FAILed. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...
Github user tejasapatil commented on a diff in the pull request: https://github.com/apache/spark/pull/15541#discussion_r83997822 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala --- @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import scala.collection.mutable.ArrayBuffer +import scala.collection.mutable.PriorityQueue +import scala.util.Random + +import org.apache.spark.internal.{config, Logging} +import org.apache.spark.SparkConf +import org.apache.spark.util.Utils + +/** Tracking the current state of the workers with available cores and assigned task list. */ +class OfferState(val workOffer: WorkerOffer) { + // The current remaining cores that can be allocated to tasks. + var coresAvailable: Int = workOffer.cores + // The list of tasks that are assigned to this worker. + val tasks = new ArrayBuffer[TaskDescription](coresAvailable) +} + +/** + * TaskAssigner is the base class for all task assigner implementations, and can be + * extended to implement different task scheduling algorithms. + * Together with [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]], TaskAssigner + * is used to assign tasks to workers with available cores. Internally, TaskScheduler, requested + * to perform task assignment given available workers, first sorts the candidate tasksets, + * and then for each taskset, it takes a number of rounds to request TaskAssigner for task + * assignment with different the locality restrictions until there is either no qualified + * workers or no valid tasks to be assigned. + * + * TaskAssigner is responsible to maintain the worker availability state and task assignment + * information. The contract between [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]] + * and TaskAssigner is as follows. First, TaskScheduler invokes construct() of TaskAssigner to + * initialize the its internal worker states at the beginning of resource offering. Before each + * round of task assignment for a taskset, TaskScheduler invoke the init() of TaskAssigner to + * initialize the data structure for the round. When performing real task assignment, + * hasNext()/getNext() is used by TaskScheduler to check the worker availability and retrieve + * current offering from TaskAssigner. Then offerAccepted is used by TaskScheduler to notify + * the TaskAssigner so that TaskAssigner can decide whether the current offer is valid or not for + * the next request. After task assignment is done, TaskScheduler invokes the tasks() to + * retrieve all the task assignment information, and eventually, invokes reset() method so that + * TaskAssigner can cleanup its internal maintained resources. + */ + +private[scheduler] abstract class TaskAssigner { + var offer: Seq[OfferState] = _ + var CPUS_PER_TASK = 1 + + def withCpuPerTask(CPUS_PER_TASK: Int): Unit = { +this.CPUS_PER_TASK = CPUS_PER_TASK + } + + // The final assigned offer returned to TaskScheduler. + final def tasks: Seq[ArrayBuffer[TaskDescription]] = offer.map(_.tasks) + + // Invoked at the beginning of resource offering to construct the offer with the workoffers. + def construct(workOffer: Seq[WorkerOffer]): Unit = { +offer = workOffer.map(o => new OfferState(o)) + } + + // Invoked at each round of Taskset assignment to initialize the internal structure. + def init(): Unit + + // Whether there is offer available to be used inside of one round of Taskset assignment. + def hasNext: Boolean + + // Returned the next assigned offer based on the task assignment strategy. + def getNext(): OfferState + + // Invoked by the TaskScheduler to indicate whether the current offer is accepted or not so that + // the assigner can decide whether the current worker is valid for the next offering. + def offerAccepted(assigned: Boolean
[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...
Github user tejasapatil commented on a diff in the pull request: https://github.com/apache/spark/pull/15541#discussion_r83996049 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala --- @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import scala.collection.mutable.ArrayBuffer +import scala.collection.mutable.PriorityQueue +import scala.util.Random + +import org.apache.spark.internal.{config, Logging} +import org.apache.spark.SparkConf +import org.apache.spark.util.Utils + +/** Tracking the current state of the workers with available cores and assigned task list. */ +class OfferState(val workOffer: WorkerOffer) { + // The current remaining cores that can be allocated to tasks. + var coresAvailable: Int = workOffer.cores + // The list of tasks that are assigned to this worker. + val tasks = new ArrayBuffer[TaskDescription](coresAvailable) +} + +/** + * TaskAssigner is the base class for all task assigner implementations, and can be + * extended to implement different task scheduling algorithms. + * Together with [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]], TaskAssigner + * is used to assign tasks to workers with available cores. Internally, TaskScheduler, requested + * to perform task assignment given available workers, first sorts the candidate tasksets, + * and then for each taskset, it takes a number of rounds to request TaskAssigner for task + * assignment with different the locality restrictions until there is either no qualified + * workers or no valid tasks to be assigned. + * + * TaskAssigner is responsible to maintain the worker availability state and task assignment + * information. The contract between [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]] + * and TaskAssigner is as follows. First, TaskScheduler invokes construct() of TaskAssigner to + * initialize the its internal worker states at the beginning of resource offering. Before each + * round of task assignment for a taskset, TaskScheduler invoke the init() of TaskAssigner to + * initialize the data structure for the round. When performing real task assignment, + * hasNext()/getNext() is used by TaskScheduler to check the worker availability and retrieve + * current offering from TaskAssigner. Then offerAccepted is used by TaskScheduler to notify + * the TaskAssigner so that TaskAssigner can decide whether the current offer is valid or not for + * the next request. After task assignment is done, TaskScheduler invokes the tasks() to + * retrieve all the task assignment information, and eventually, invokes reset() method so that + * TaskAssigner can cleanup its internal maintained resources. + */ + +private[scheduler] abstract class TaskAssigner { + var offer: Seq[OfferState] = _ + var CPUS_PER_TASK = 1 + + def withCpuPerTask(CPUS_PER_TASK: Int): Unit = { +this.CPUS_PER_TASK = CPUS_PER_TASK + } + + // The final assigned offer returned to TaskScheduler. + final def tasks: Seq[ArrayBuffer[TaskDescription]] = offer.map(_.tasks) + + // Invoked at the beginning of resource offering to construct the offer with the workoffers. + def construct(workOffer: Seq[WorkerOffer]): Unit = { +offer = workOffer.map(o => new OfferState(o)) + } + + // Invoked at each round of Taskset assignment to initialize the internal structure. + def init(): Unit + + // Whether there is offer available to be used inside of one round of Taskset assignment. + def hasNext: Boolean + + // Returned the next assigned offer based on the task assignment strategy. + def getNext(): OfferState + + // Invoked by the TaskScheduler to indicate whether the current offer is accepted or not so that + // the assigner can decide whether the current worker is valid for the next offering. + def offerAccepted(assigned: Boolean
[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...
Github user tejasapatil commented on a diff in the pull request: https://github.com/apache/spark/pull/15541#discussion_r84000957 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala --- @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import scala.collection.mutable.ArrayBuffer +import scala.collection.mutable.PriorityQueue +import scala.util.Random + +import org.apache.spark.internal.{config, Logging} +import org.apache.spark.SparkConf +import org.apache.spark.util.Utils + +/** Tracking the current state of the workers with available cores and assigned task list. */ +class OfferState(val workOffer: WorkerOffer) { + // The current remaining cores that can be allocated to tasks. + var coresAvailable: Int = workOffer.cores + // The list of tasks that are assigned to this worker. + val tasks = new ArrayBuffer[TaskDescription](coresAvailable) +} + +/** + * TaskAssigner is the base class for all task assigner implementations, and can be + * extended to implement different task scheduling algorithms. + * Together with [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]], TaskAssigner + * is used to assign tasks to workers with available cores. Internally, TaskScheduler, requested + * to perform task assignment given available workers, first sorts the candidate tasksets, + * and then for each taskset, it takes a number of rounds to request TaskAssigner for task + * assignment with different the locality restrictions until there is either no qualified + * workers or no valid tasks to be assigned. + * + * TaskAssigner is responsible to maintain the worker availability state and task assignment + * information. The contract between [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]] + * and TaskAssigner is as follows. First, TaskScheduler invokes construct() of TaskAssigner to + * initialize the its internal worker states at the beginning of resource offering. Before each + * round of task assignment for a taskset, TaskScheduler invoke the init() of TaskAssigner to + * initialize the data structure for the round. When performing real task assignment, + * hasNext()/getNext() is used by TaskScheduler to check the worker availability and retrieve + * current offering from TaskAssigner. Then offerAccepted is used by TaskScheduler to notify + * the TaskAssigner so that TaskAssigner can decide whether the current offer is valid or not for + * the next request. After task assignment is done, TaskScheduler invokes the tasks() to + * retrieve all the task assignment information, and eventually, invokes reset() method so that + * TaskAssigner can cleanup its internal maintained resources. + */ + +private[scheduler] abstract class TaskAssigner { + var offer: Seq[OfferState] = _ + var CPUS_PER_TASK = 1 + + def withCpuPerTask(CPUS_PER_TASK: Int): Unit = { +this.CPUS_PER_TASK = CPUS_PER_TASK + } + + // The final assigned offer returned to TaskScheduler. + final def tasks: Seq[ArrayBuffer[TaskDescription]] = offer.map(_.tasks) + + // Invoked at the beginning of resource offering to construct the offer with the workoffers. + def construct(workOffer: Seq[WorkerOffer]): Unit = { +offer = workOffer.map(o => new OfferState(o)) + } + + // Invoked at each round of Taskset assignment to initialize the internal structure. + def init(): Unit + + // Whether there is offer available to be used inside of one round of Taskset assignment. + def hasNext: Boolean + + // Returned the next assigned offer based on the task assignment strategy. + def getNext(): OfferState + + // Invoked by the TaskScheduler to indicate whether the current offer is accepted or not so that + // the assigner can decide whether the current worker is valid for the next offering. + def offerAccepted(assigned: Boolean
[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...
Github user tejasapatil commented on a diff in the pull request: https://github.com/apache/spark/pull/15541#discussion_r83998311 --- Diff: core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala --- @@ -109,6 +108,85 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B assert(!failedTaskSet) } + test("Scheduler does not always schedule tasks on the same workers") { +val taskScheduler = setupScheduler() +roundrobin(taskScheduler) + } + + test("User can specify the roundrobin task assigner") { +val taskScheduler = setupScheduler(("spark.scheduler.taskAssigner", "RoUndrObin")) +roundrobin(taskScheduler) + } + + test("Fallback to roundrobin when the task assigner provided is not valid") { +val taskScheduler = setupScheduler("spark.scheduler.taskAssigner" -> "invalid") +roundrobin(taskScheduler) + } + + test("Scheduler balance the assignment to the worker with more free cores") { --- End diff -- I think you should structure this so that tests for the same task assigner are packed together. Right now, one has the look into each test case to figure out which policy is being tested. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...
Github user tejasapatil commented on a diff in the pull request: https://github.com/apache/spark/pull/15541#discussion_r83996692 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala --- @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import scala.collection.mutable.ArrayBuffer +import scala.collection.mutable.PriorityQueue +import scala.util.Random + +import org.apache.spark.internal.{config, Logging} +import org.apache.spark.SparkConf +import org.apache.spark.util.Utils + +/** Tracking the current state of the workers with available cores and assigned task list. */ +class OfferState(val workOffer: WorkerOffer) { + // The current remaining cores that can be allocated to tasks. + var coresAvailable: Int = workOffer.cores + // The list of tasks that are assigned to this worker. + val tasks = new ArrayBuffer[TaskDescription](coresAvailable) +} + +/** + * TaskAssigner is the base class for all task assigner implementations, and can be + * extended to implement different task scheduling algorithms. + * Together with [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]], TaskAssigner + * is used to assign tasks to workers with available cores. Internally, TaskScheduler, requested + * to perform task assignment given available workers, first sorts the candidate tasksets, + * and then for each taskset, it takes a number of rounds to request TaskAssigner for task + * assignment with different the locality restrictions until there is either no qualified + * workers or no valid tasks to be assigned. + * + * TaskAssigner is responsible to maintain the worker availability state and task assignment + * information. The contract between [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]] + * and TaskAssigner is as follows. First, TaskScheduler invokes construct() of TaskAssigner to + * initialize the its internal worker states at the beginning of resource offering. Before each + * round of task assignment for a taskset, TaskScheduler invoke the init() of TaskAssigner to + * initialize the data structure for the round. When performing real task assignment, + * hasNext()/getNext() is used by TaskScheduler to check the worker availability and retrieve + * current offering from TaskAssigner. Then offerAccepted is used by TaskScheduler to notify + * the TaskAssigner so that TaskAssigner can decide whether the current offer is valid or not for + * the next request. After task assignment is done, TaskScheduler invokes the tasks() to + * retrieve all the task assignment information, and eventually, invokes reset() method so that + * TaskAssigner can cleanup its internal maintained resources. + */ + +private[scheduler] abstract class TaskAssigner { + var offer: Seq[OfferState] = _ + var CPUS_PER_TASK = 1 + + def withCpuPerTask(CPUS_PER_TASK: Int): Unit = { +this.CPUS_PER_TASK = CPUS_PER_TASK + } + + // The final assigned offer returned to TaskScheduler. + final def tasks: Seq[ArrayBuffer[TaskDescription]] = offer.map(_.tasks) + + // Invoked at the beginning of resource offering to construct the offer with the workoffers. + def construct(workOffer: Seq[WorkerOffer]): Unit = { +offer = workOffer.map(o => new OfferState(o)) + } + + // Invoked at each round of Taskset assignment to initialize the internal structure. + def init(): Unit + + // Whether there is offer available to be used inside of one round of Taskset assignment. + def hasNext: Boolean + + // Returned the next assigned offer based on the task assignment strategy. + def getNext(): OfferState + + // Invoked by the TaskScheduler to indicate whether the current offer is accepted or not so that + // the assigner can decide whether the current worker is valid for the next offering. + def offerAccepted(assigned: Boolean
[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...
Github user tejasapatil commented on a diff in the pull request: https://github.com/apache/spark/pull/15541#discussion_r83998150 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala --- @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import scala.collection.mutable.ArrayBuffer +import scala.collection.mutable.PriorityQueue +import scala.util.Random + +import org.apache.spark.internal.{config, Logging} +import org.apache.spark.SparkConf +import org.apache.spark.util.Utils + +/** Tracking the current state of the workers with available cores and assigned task list. */ +class OfferState(val workOffer: WorkerOffer) { + // The current remaining cores that can be allocated to tasks. + var coresAvailable: Int = workOffer.cores + // The list of tasks that are assigned to this worker. + val tasks = new ArrayBuffer[TaskDescription](coresAvailable) +} + +/** + * TaskAssigner is the base class for all task assigner implementations, and can be + * extended to implement different task scheduling algorithms. + * Together with [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]], TaskAssigner + * is used to assign tasks to workers with available cores. Internally, TaskScheduler, requested + * to perform task assignment given available workers, first sorts the candidate tasksets, + * and then for each taskset, it takes a number of rounds to request TaskAssigner for task + * assignment with different the locality restrictions until there is either no qualified + * workers or no valid tasks to be assigned. + * + * TaskAssigner is responsible to maintain the worker availability state and task assignment + * information. The contract between [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]] + * and TaskAssigner is as follows. First, TaskScheduler invokes construct() of TaskAssigner to + * initialize the its internal worker states at the beginning of resource offering. Before each + * round of task assignment for a taskset, TaskScheduler invoke the init() of TaskAssigner to + * initialize the data structure for the round. When performing real task assignment, + * hasNext()/getNext() is used by TaskScheduler to check the worker availability and retrieve + * current offering from TaskAssigner. Then offerAccepted is used by TaskScheduler to notify + * the TaskAssigner so that TaskAssigner can decide whether the current offer is valid or not for + * the next request. After task assignment is done, TaskScheduler invokes the tasks() to + * retrieve all the task assignment information, and eventually, invokes reset() method so that + * TaskAssigner can cleanup its internal maintained resources. + */ + +private[scheduler] abstract class TaskAssigner { + var offer: Seq[OfferState] = _ + var CPUS_PER_TASK = 1 + + def withCpuPerTask(CPUS_PER_TASK: Int): Unit = { +this.CPUS_PER_TASK = CPUS_PER_TASK + } + + // The final assigned offer returned to TaskScheduler. + final def tasks: Seq[ArrayBuffer[TaskDescription]] = offer.map(_.tasks) + + // Invoked at the beginning of resource offering to construct the offer with the workoffers. + def construct(workOffer: Seq[WorkerOffer]): Unit = { +offer = workOffer.map(o => new OfferState(o)) + } + + // Invoked at each round of Taskset assignment to initialize the internal structure. + def init(): Unit + + // Whether there is offer available to be used inside of one round of Taskset assignment. + def hasNext: Boolean + + // Returned the next assigned offer based on the task assignment strategy. + def getNext(): OfferState + + // Invoked by the TaskScheduler to indicate whether the current offer is accepted or not so that + // the assigner can decide whether the current worker is valid for the next offering. + def offerAccepted(assigned: Boolean
[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...
Github user tejasapatil commented on a diff in the pull request: https://github.com/apache/spark/pull/15541#discussion_r83996326 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala --- @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import scala.collection.mutable.ArrayBuffer +import scala.collection.mutable.PriorityQueue +import scala.util.Random + +import org.apache.spark.internal.{config, Logging} +import org.apache.spark.SparkConf +import org.apache.spark.util.Utils + +/** Tracking the current state of the workers with available cores and assigned task list. */ +class OfferState(val workOffer: WorkerOffer) { + // The current remaining cores that can be allocated to tasks. + var coresAvailable: Int = workOffer.cores + // The list of tasks that are assigned to this worker. + val tasks = new ArrayBuffer[TaskDescription](coresAvailable) +} + +/** + * TaskAssigner is the base class for all task assigner implementations, and can be + * extended to implement different task scheduling algorithms. + * Together with [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]], TaskAssigner + * is used to assign tasks to workers with available cores. Internally, TaskScheduler, requested + * to perform task assignment given available workers, first sorts the candidate tasksets, + * and then for each taskset, it takes a number of rounds to request TaskAssigner for task + * assignment with different the locality restrictions until there is either no qualified + * workers or no valid tasks to be assigned. + * + * TaskAssigner is responsible to maintain the worker availability state and task assignment + * information. The contract between [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]] + * and TaskAssigner is as follows. First, TaskScheduler invokes construct() of TaskAssigner to + * initialize the its internal worker states at the beginning of resource offering. Before each + * round of task assignment for a taskset, TaskScheduler invoke the init() of TaskAssigner to + * initialize the data structure for the round. When performing real task assignment, + * hasNext()/getNext() is used by TaskScheduler to check the worker availability and retrieve + * current offering from TaskAssigner. Then offerAccepted is used by TaskScheduler to notify + * the TaskAssigner so that TaskAssigner can decide whether the current offer is valid or not for + * the next request. After task assignment is done, TaskScheduler invokes the tasks() to + * retrieve all the task assignment information, and eventually, invokes reset() method so that + * TaskAssigner can cleanup its internal maintained resources. + */ + +private[scheduler] abstract class TaskAssigner { + var offer: Seq[OfferState] = _ + var CPUS_PER_TASK = 1 + + def withCpuPerTask(CPUS_PER_TASK: Int): Unit = { +this.CPUS_PER_TASK = CPUS_PER_TASK + } + + // The final assigned offer returned to TaskScheduler. + final def tasks: Seq[ArrayBuffer[TaskDescription]] = offer.map(_.tasks) + + // Invoked at the beginning of resource offering to construct the offer with the workoffers. + def construct(workOffer: Seq[WorkerOffer]): Unit = { +offer = workOffer.map(o => new OfferState(o)) + } + + // Invoked at each round of Taskset assignment to initialize the internal structure. + def init(): Unit + + // Whether there is offer available to be used inside of one round of Taskset assignment. + def hasNext: Boolean + + // Returned the next assigned offer based on the task assignment strategy. + def getNext(): OfferState + + // Invoked by the TaskScheduler to indicate whether the current offer is accepted or not so that + // the assigner can decide whether the current worker is valid for the next offering. + def offerAccepted(assigned: Boolean
[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...
Github user tejasapatil commented on a diff in the pull request: https://github.com/apache/spark/pull/15541#discussion_r83995317 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala --- @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import scala.collection.mutable.ArrayBuffer +import scala.collection.mutable.PriorityQueue +import scala.util.Random + +import org.apache.spark.internal.{config, Logging} +import org.apache.spark.SparkConf +import org.apache.spark.util.Utils + +/** Tracking the current state of the workers with available cores and assigned task list. */ +class OfferState(val workOffer: WorkerOffer) { + // The current remaining cores that can be allocated to tasks. + var coresAvailable: Int = workOffer.cores + // The list of tasks that are assigned to this worker. + val tasks = new ArrayBuffer[TaskDescription](coresAvailable) +} + +/** + * TaskAssigner is the base class for all task assigner implementations, and can be + * extended to implement different task scheduling algorithms. + * Together with [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]], TaskAssigner + * is used to assign tasks to workers with available cores. Internally, TaskScheduler, requested + * to perform task assignment given available workers, first sorts the candidate tasksets, + * and then for each taskset, it takes a number of rounds to request TaskAssigner for task + * assignment with different the locality restrictions until there is either no qualified --- End diff -- nit: `different the locality restrictions` => `different locality restrictions` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...
Github user tejasapatil commented on a diff in the pull request: https://github.com/apache/spark/pull/15541#discussion_r83998358 --- Diff: docs/configuration.md --- @@ -1342,6 +1342,19 @@ Apart from these, the following properties are also available, and may be useful Should be greater than or equal to 1. Number of allowed retries = this value - 1. + + spark.scheduler.taskAssigner + roundrobin + +The strategy of how to allocate tasks among workers with free cores. By default, roundrobin --- End diff -- nit: create a list for each policy and explain inline instead of saying `former`, `latter` below. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...
Github user tejasapatil commented on a diff in the pull request: https://github.com/apache/spark/pull/15541#discussion_r83995649 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala --- @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import scala.collection.mutable.ArrayBuffer +import scala.collection.mutable.PriorityQueue +import scala.util.Random + +import org.apache.spark.internal.{config, Logging} +import org.apache.spark.SparkConf +import org.apache.spark.util.Utils + +/** Tracking the current state of the workers with available cores and assigned task list. */ +class OfferState(val workOffer: WorkerOffer) { + // The current remaining cores that can be allocated to tasks. + var coresAvailable: Int = workOffer.cores + // The list of tasks that are assigned to this worker. + val tasks = new ArrayBuffer[TaskDescription](coresAvailable) +} + +/** + * TaskAssigner is the base class for all task assigner implementations, and can be + * extended to implement different task scheduling algorithms. + * Together with [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]], TaskAssigner + * is used to assign tasks to workers with available cores. Internally, TaskScheduler, requested + * to perform task assignment given available workers, first sorts the candidate tasksets, + * and then for each taskset, it takes a number of rounds to request TaskAssigner for task + * assignment with different the locality restrictions until there is either no qualified + * workers or no valid tasks to be assigned. + * + * TaskAssigner is responsible to maintain the worker availability state and task assignment + * information. The contract between [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]] + * and TaskAssigner is as follows. First, TaskScheduler invokes construct() of TaskAssigner to + * initialize the its internal worker states at the beginning of resource offering. Before each + * round of task assignment for a taskset, TaskScheduler invoke the init() of TaskAssigner to + * initialize the data structure for the round. When performing real task assignment, + * hasNext()/getNext() is used by TaskScheduler to check the worker availability and retrieve + * current offering from TaskAssigner. Then offerAccepted is used by TaskScheduler to notify + * the TaskAssigner so that TaskAssigner can decide whether the current offer is valid or not for + * the next request. After task assignment is done, TaskScheduler invokes the tasks() to + * retrieve all the task assignment information, and eventually, invokes reset() method so that + * TaskAssigner can cleanup its internal maintained resources. + */ + +private[scheduler] abstract class TaskAssigner { + var offer: Seq[OfferState] = _ + var CPUS_PER_TASK = 1 + + def withCpuPerTask(CPUS_PER_TASK: Int): Unit = { --- End diff -- This feels like something which is set once for object generation and not supposed to be mutated later. I am not able to come up with better ways to do that. (Maybe pass it as a constructor param ?) --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...
Github user tejasapatil commented on a diff in the pull request: https://github.com/apache/spark/pull/15541#discussion_r83995590 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala --- @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import scala.collection.mutable.ArrayBuffer +import scala.collection.mutable.PriorityQueue +import scala.util.Random + +import org.apache.spark.internal.{config, Logging} +import org.apache.spark.SparkConf +import org.apache.spark.util.Utils + +/** Tracking the current state of the workers with available cores and assigned task list. */ +class OfferState(val workOffer: WorkerOffer) { + // The current remaining cores that can be allocated to tasks. + var coresAvailable: Int = workOffer.cores + // The list of tasks that are assigned to this worker. + val tasks = new ArrayBuffer[TaskDescription](coresAvailable) +} + +/** + * TaskAssigner is the base class for all task assigner implementations, and can be + * extended to implement different task scheduling algorithms. + * Together with [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]], TaskAssigner + * is used to assign tasks to workers with available cores. Internally, TaskScheduler, requested + * to perform task assignment given available workers, first sorts the candidate tasksets, + * and then for each taskset, it takes a number of rounds to request TaskAssigner for task + * assignment with different the locality restrictions until there is either no qualified + * workers or no valid tasks to be assigned. + * + * TaskAssigner is responsible to maintain the worker availability state and task assignment + * information. The contract between [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]] + * and TaskAssigner is as follows. First, TaskScheduler invokes construct() of TaskAssigner to + * initialize the its internal worker states at the beginning of resource offering. Before each + * round of task assignment for a taskset, TaskScheduler invoke the init() of TaskAssigner to + * initialize the data structure for the round. When performing real task assignment, + * hasNext()/getNext() is used by TaskScheduler to check the worker availability and retrieve + * current offering from TaskAssigner. Then offerAccepted is used by TaskScheduler to notify + * the TaskAssigner so that TaskAssigner can decide whether the current offer is valid or not for + * the next request. After task assignment is done, TaskScheduler invokes the tasks() to + * retrieve all the task assignment information, and eventually, invokes reset() method so that + * TaskAssigner can cleanup its internal maintained resources. + */ + +private[scheduler] abstract class TaskAssigner { + var offer: Seq[OfferState] = _ + var CPUS_PER_TASK = 1 --- End diff -- - nit: use `protected` - nit: Why camel case ? This looks like an regular class var and not a constant. I know you want this to be treated as a constant but probably should have better guarding against that --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...
Github user tejasapatil commented on a diff in the pull request: https://github.com/apache/spark/pull/15541#discussion_r83995463 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala --- @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import scala.collection.mutable.ArrayBuffer +import scala.collection.mutable.PriorityQueue +import scala.util.Random + +import org.apache.spark.internal.{config, Logging} +import org.apache.spark.SparkConf +import org.apache.spark.util.Utils + +/** Tracking the current state of the workers with available cores and assigned task list. */ +class OfferState(val workOffer: WorkerOffer) { + // The current remaining cores that can be allocated to tasks. + var coresAvailable: Int = workOffer.cores + // The list of tasks that are assigned to this worker. + val tasks = new ArrayBuffer[TaskDescription](coresAvailable) +} + +/** + * TaskAssigner is the base class for all task assigner implementations, and can be + * extended to implement different task scheduling algorithms. + * Together with [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]], TaskAssigner + * is used to assign tasks to workers with available cores. Internally, TaskScheduler, requested + * to perform task assignment given available workers, first sorts the candidate tasksets, + * and then for each taskset, it takes a number of rounds to request TaskAssigner for task + * assignment with different the locality restrictions until there is either no qualified + * workers or no valid tasks to be assigned. + * + * TaskAssigner is responsible to maintain the worker availability state and task assignment + * information. The contract between [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]] + * and TaskAssigner is as follows. First, TaskScheduler invokes construct() of TaskAssigner to --- End diff -- It might be better to put these into separate points and not as a paragraph. Also, I am not sure what the protocol is about putting details like method names in the doc. As things stand, it will serve good for people trying to read the code but as the codebase evolves, things might get out of sync if this comment is not updated. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...
Github user tejasapatil commented on a diff in the pull request: https://github.com/apache/spark/pull/15541#discussion_r83996865 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala --- @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import scala.collection.mutable.ArrayBuffer +import scala.collection.mutable.PriorityQueue +import scala.util.Random + +import org.apache.spark.internal.{config, Logging} +import org.apache.spark.SparkConf +import org.apache.spark.util.Utils + +/** Tracking the current state of the workers with available cores and assigned task list. */ +class OfferState(val workOffer: WorkerOffer) { + // The current remaining cores that can be allocated to tasks. + var coresAvailable: Int = workOffer.cores + // The list of tasks that are assigned to this worker. + val tasks = new ArrayBuffer[TaskDescription](coresAvailable) +} + +/** + * TaskAssigner is the base class for all task assigner implementations, and can be + * extended to implement different task scheduling algorithms. + * Together with [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]], TaskAssigner + * is used to assign tasks to workers with available cores. Internally, TaskScheduler, requested + * to perform task assignment given available workers, first sorts the candidate tasksets, + * and then for each taskset, it takes a number of rounds to request TaskAssigner for task + * assignment with different the locality restrictions until there is either no qualified + * workers or no valid tasks to be assigned. + * + * TaskAssigner is responsible to maintain the worker availability state and task assignment + * information. The contract between [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]] + * and TaskAssigner is as follows. First, TaskScheduler invokes construct() of TaskAssigner to + * initialize the its internal worker states at the beginning of resource offering. Before each + * round of task assignment for a taskset, TaskScheduler invoke the init() of TaskAssigner to + * initialize the data structure for the round. When performing real task assignment, + * hasNext()/getNext() is used by TaskScheduler to check the worker availability and retrieve + * current offering from TaskAssigner. Then offerAccepted is used by TaskScheduler to notify + * the TaskAssigner so that TaskAssigner can decide whether the current offer is valid or not for + * the next request. After task assignment is done, TaskScheduler invokes the tasks() to + * retrieve all the task assignment information, and eventually, invokes reset() method so that + * TaskAssigner can cleanup its internal maintained resources. + */ + +private[scheduler] abstract class TaskAssigner { + var offer: Seq[OfferState] = _ + var CPUS_PER_TASK = 1 + + def withCpuPerTask(CPUS_PER_TASK: Int): Unit = { +this.CPUS_PER_TASK = CPUS_PER_TASK + } + + // The final assigned offer returned to TaskScheduler. + final def tasks: Seq[ArrayBuffer[TaskDescription]] = offer.map(_.tasks) + + // Invoked at the beginning of resource offering to construct the offer with the workoffers. + def construct(workOffer: Seq[WorkerOffer]): Unit = { +offer = workOffer.map(o => new OfferState(o)) + } + + // Invoked at each round of Taskset assignment to initialize the internal structure. + def init(): Unit + + // Whether there is offer available to be used inside of one round of Taskset assignment. + def hasNext: Boolean + + // Returned the next assigned offer based on the task assignment strategy. + def getNext(): OfferState + + // Invoked by the TaskScheduler to indicate whether the current offer is accepted or not so that + // the assigner can decide whether the current worker is valid for the next offering. + def offerAccepted(assigned: Boolean
[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...
Github user tejasapatil commented on a diff in the pull request: https://github.com/apache/spark/pull/15541#discussion_r83998964 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala --- @@ -250,24 +248,26 @@ private[spark] class TaskSchedulerImpl( private def resourceOfferSingleTaskSet( taskSet: TaskSetManager, maxLocality: TaskLocality, - shuffledOffers: Seq[WorkerOffer], - availableCpus: Array[Int], - tasks: IndexedSeq[ArrayBuffer[TaskDescription]]) : Boolean = { + taskAssigner: TaskAssigner) : Boolean = { var launchedTask = false -for (i <- 0 until shuffledOffers.size) { - val execId = shuffledOffers(i).executorId - val host = shuffledOffers(i).host - if (availableCpus(i) >= CPUS_PER_TASK) { +taskAssigner.init() +while(taskAssigner.hasNext) { --- End diff -- nit: space after `while` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...
Github user tejasapatil commented on a diff in the pull request: https://github.com/apache/spark/pull/15541#discussion_r83995119 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala --- @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import scala.collection.mutable.ArrayBuffer +import scala.collection.mutable.PriorityQueue +import scala.util.Random + +import org.apache.spark.internal.{config, Logging} +import org.apache.spark.SparkConf +import org.apache.spark.util.Utils + +/** Tracking the current state of the workers with available cores and assigned task list. */ +class OfferState(val workOffer: WorkerOffer) { + // The current remaining cores that can be allocated to tasks. + var coresAvailable: Int = workOffer.cores + // The list of tasks that are assigned to this worker. --- End diff -- "this worker" ? `this` wont represent a worker here. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...
Github user tejasapatil commented on a diff in the pull request: https://github.com/apache/spark/pull/15541#discussion_r83999715 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala --- @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import scala.collection.mutable.ArrayBuffer +import scala.collection.mutable.PriorityQueue +import scala.util.Random + +import org.apache.spark.internal.{config, Logging} +import org.apache.spark.SparkConf +import org.apache.spark.util.Utils + +/** Tracking the current state of the workers with available cores and assigned task list. */ +class OfferState(val workOffer: WorkerOffer) { + // The current remaining cores that can be allocated to tasks. + var coresAvailable: Int = workOffer.cores + // The list of tasks that are assigned to this worker. + val tasks = new ArrayBuffer[TaskDescription](coresAvailable) +} + +/** + * TaskAssigner is the base class for all task assigner implementations, and can be + * extended to implement different task scheduling algorithms. + * Together with [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]], TaskAssigner + * is used to assign tasks to workers with available cores. Internally, TaskScheduler, requested + * to perform task assignment given available workers, first sorts the candidate tasksets, + * and then for each taskset, it takes a number of rounds to request TaskAssigner for task + * assignment with different the locality restrictions until there is either no qualified + * workers or no valid tasks to be assigned. + * + * TaskAssigner is responsible to maintain the worker availability state and task assignment + * information. The contract between [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]] + * and TaskAssigner is as follows. First, TaskScheduler invokes construct() of TaskAssigner to + * initialize the its internal worker states at the beginning of resource offering. Before each + * round of task assignment for a taskset, TaskScheduler invoke the init() of TaskAssigner to + * initialize the data structure for the round. When performing real task assignment, + * hasNext()/getNext() is used by TaskScheduler to check the worker availability and retrieve + * current offering from TaskAssigner. Then offerAccepted is used by TaskScheduler to notify + * the TaskAssigner so that TaskAssigner can decide whether the current offer is valid or not for + * the next request. After task assignment is done, TaskScheduler invokes the tasks() to + * retrieve all the task assignment information, and eventually, invokes reset() method so that + * TaskAssigner can cleanup its internal maintained resources. + */ + +private[scheduler] abstract class TaskAssigner { + var offer: Seq[OfferState] = _ + var CPUS_PER_TASK = 1 + + def withCpuPerTask(CPUS_PER_TASK: Int): Unit = { +this.CPUS_PER_TASK = CPUS_PER_TASK + } + + // The final assigned offer returned to TaskScheduler. + final def tasks: Seq[ArrayBuffer[TaskDescription]] = offer.map(_.tasks) + + // Invoked at the beginning of resource offering to construct the offer with the workoffers. + def construct(workOffer: Seq[WorkerOffer]): Unit = { +offer = workOffer.map(o => new OfferState(o)) + } + + // Invoked at each round of Taskset assignment to initialize the internal structure. + def init(): Unit + + // Whether there is offer available to be used inside of one round of Taskset assignment. + def hasNext: Boolean + + // Returned the next assigned offer based on the task assignment strategy. + def getNext(): OfferState + + // Invoked by the TaskScheduler to indicate whether the current offer is accepted or not so that + // the assigner can decide whether the current worker is valid for the next offering. + def offerAccepted(assigned: Boolean
[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...
Github user tejasapatil commented on a diff in the pull request: https://github.com/apache/spark/pull/15541#discussion_r83996351 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala --- @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import scala.collection.mutable.ArrayBuffer +import scala.collection.mutable.PriorityQueue +import scala.util.Random + +import org.apache.spark.internal.{config, Logging} +import org.apache.spark.SparkConf +import org.apache.spark.util.Utils + +/** Tracking the current state of the workers with available cores and assigned task list. */ +class OfferState(val workOffer: WorkerOffer) { + // The current remaining cores that can be allocated to tasks. + var coresAvailable: Int = workOffer.cores + // The list of tasks that are assigned to this worker. + val tasks = new ArrayBuffer[TaskDescription](coresAvailable) +} + +/** + * TaskAssigner is the base class for all task assigner implementations, and can be + * extended to implement different task scheduling algorithms. + * Together with [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]], TaskAssigner + * is used to assign tasks to workers with available cores. Internally, TaskScheduler, requested + * to perform task assignment given available workers, first sorts the candidate tasksets, + * and then for each taskset, it takes a number of rounds to request TaskAssigner for task + * assignment with different the locality restrictions until there is either no qualified + * workers or no valid tasks to be assigned. + * + * TaskAssigner is responsible to maintain the worker availability state and task assignment + * information. The contract between [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]] + * and TaskAssigner is as follows. First, TaskScheduler invokes construct() of TaskAssigner to + * initialize the its internal worker states at the beginning of resource offering. Before each + * round of task assignment for a taskset, TaskScheduler invoke the init() of TaskAssigner to + * initialize the data structure for the round. When performing real task assignment, + * hasNext()/getNext() is used by TaskScheduler to check the worker availability and retrieve + * current offering from TaskAssigner. Then offerAccepted is used by TaskScheduler to notify + * the TaskAssigner so that TaskAssigner can decide whether the current offer is valid or not for + * the next request. After task assignment is done, TaskScheduler invokes the tasks() to + * retrieve all the task assignment information, and eventually, invokes reset() method so that + * TaskAssigner can cleanup its internal maintained resources. + */ + +private[scheduler] abstract class TaskAssigner { + var offer: Seq[OfferState] = _ + var CPUS_PER_TASK = 1 + + def withCpuPerTask(CPUS_PER_TASK: Int): Unit = { +this.CPUS_PER_TASK = CPUS_PER_TASK + } + + // The final assigned offer returned to TaskScheduler. + final def tasks: Seq[ArrayBuffer[TaskDescription]] = offer.map(_.tasks) + + // Invoked at the beginning of resource offering to construct the offer with the workoffers. + def construct(workOffer: Seq[WorkerOffer]): Unit = { +offer = workOffer.map(o => new OfferState(o)) + } + + // Invoked at each round of Taskset assignment to initialize the internal structure. + def init(): Unit + + // Whether there is offer available to be used inside of one round of Taskset assignment. + def hasNext: Boolean + + // Returned the next assigned offer based on the task assignment strategy. + def getNext(): OfferState + + // Invoked by the TaskScheduler to indicate whether the current offer is accepted or not so that + // the assigner can decide whether the current worker is valid for the next offering. + def offerAccepted(assigned: Boolean
[GitHub] spark issue #15532: [SPARK-17989][SQL] Check ascendingOrder type in sort_arr...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/15532 **[Test build #67167 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/67167/consoleFull)** for PR 15532 at commit [`55c0804`](https://github.com/apache/spark/commit/55c0804880fbcdddb1a57431ce9825e0a9016961). * This patch **fails Spark unit tests**. * This patch merges cleanly. * This patch adds no public classes. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...
Github user tejasapatil commented on a diff in the pull request: https://github.com/apache/spark/pull/15541#discussion_r83998916 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala --- @@ -60,7 +58,7 @@ private[spark] class TaskSchedulerImpl( def this(sc: SparkContext) = this(sc, sc.conf.get(config.MAX_TASK_FAILURES)) val conf = sc.conf - + private val taskAssigner: TaskAssigner = TaskAssigner.init(conf) --- End diff -- nit: extra space in `private val` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...
Github user tejasapatil commented on a diff in the pull request: https://github.com/apache/spark/pull/15541#discussion_r83995208 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala --- @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import scala.collection.mutable.ArrayBuffer +import scala.collection.mutable.PriorityQueue +import scala.util.Random + +import org.apache.spark.internal.{config, Logging} +import org.apache.spark.SparkConf +import org.apache.spark.util.Utils + +/** Tracking the current state of the workers with available cores and assigned task list. */ --- End diff -- nit: `Tracking` => `Tracks` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...
Github user tejasapatil commented on a diff in the pull request: https://github.com/apache/spark/pull/15541#discussion_r83995189 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala --- @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import scala.collection.mutable.ArrayBuffer +import scala.collection.mutable.PriorityQueue +import scala.util.Random + +import org.apache.spark.internal.{config, Logging} +import org.apache.spark.SparkConf +import org.apache.spark.util.Utils + +/** Tracking the current state of the workers with available cores and assigned task list. */ +class OfferState(val workOffer: WorkerOffer) { + // The current remaining cores that can be allocated to tasks. + var coresAvailable: Int = workOffer.cores + // The list of tasks that are assigned to this worker. + val tasks = new ArrayBuffer[TaskDescription](coresAvailable) +} + +/** + * TaskAssigner is the base class for all task assigner implementations, and can be + * extended to implement different task scheduling algorithms. + * Together with [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]], TaskAssigner + * is used to assign tasks to workers with available cores. Internally, TaskScheduler, requested --- End diff -- nit: `requested` => `requests` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #15542: [SPARK-17996][SQL] Fix unqualified catalog.getFunction(....
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/15542 **[Test build #67171 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/67171/consoleFull)** for PR 15542 at commit [`5597d25`](https://github.com/apache/spark/commit/5597d254ff36d18adf775b50f18bc255c9bbee97). --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #15505: [SPARK-17931][CORE] taskScheduler has some unneeded seri...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/15505 **[Test build #67159 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/67159/consoleFull)** for PR 15505 at commit [`589f3bb`](https://github.com/apache/spark/commit/589f3bba59be5c34a031584c8c9b53b0f48533be). * This patch **fails from timeout after a configured wait of \`250m\`**. * This patch merges cleanly. * This patch adds no public classes. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #15505: [SPARK-17931][CORE] taskScheduler has some unneeded seri...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/15505 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/67159/ Test FAILed. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #15505: [SPARK-17931][CORE] taskScheduler has some unneeded seri...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/15505 Merged build finished. Test FAILed. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/15541#discussion_r8416 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala --- @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import scala.collection.mutable.ArrayBuffer +import scala.collection.mutable.PriorityQueue +import scala.util.Random + +import org.apache.spark.internal.{config, Logging} +import org.apache.spark.SparkConf +import org.apache.spark.util.Utils + +/** Tracking the current state of the workers with available cores and assigned task list. */ +class OfferState(val workOffer: WorkerOffer) { + // The current remaining cores that can be allocated to tasks. + var coresAvailable: Int = workOffer.cores + // The list of tasks that are assigned to this worker. + val tasks = new ArrayBuffer[TaskDescription](coresAvailable) +} + +/** + * TaskAssigner is the base class for all task assigner implementations, and can be + * extended to implement different task scheduling algorithms. + * Together with [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]], TaskAssigner + * is used to assign tasks to workers with available cores. Internally, TaskScheduler, requested + * to perform task assignment given available workers, first sorts the candidate tasksets, + * and then for each taskset, it takes a number of rounds to request TaskAssigner for task + * assignment with different the locality restrictions until there is either no qualified + * workers or no valid tasks to be assigned. + * + * TaskAssigner is responsible to maintain the worker availability state and task assignment + * information. The contract between [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]] + * and TaskAssigner is as follows. First, TaskScheduler invokes construct() of TaskAssigner to + * initialize the its internal worker states at the beginning of resource offering. Before each + * round of task assignment for a taskset, TaskScheduler invoke the init() of TaskAssigner to + * initialize the data structure for the round. When performing real task assignment, + * hasNext()/getNext() is used by TaskScheduler to check the worker availability and retrieve + * current offering from TaskAssigner. Then offerAccepted is used by TaskScheduler to notify + * the TaskAssigner so that TaskAssigner can decide whether the current offer is valid or not for + * the next request. After task assignment is done, TaskScheduler invokes the tasks() to + * retrieve all the task assignment information, and eventually, invokes reset() method so that + * TaskAssigner can cleanup its internal maintained resources. + */ + +private[scheduler] abstract class TaskAssigner { + var offer: Seq[OfferState] = _ + var CPUS_PER_TASK = 1 + + def withCpuPerTask(CPUS_PER_TASK: Int): Unit = { +this.CPUS_PER_TASK = CPUS_PER_TASK + } + + // The final assigned offer returned to TaskScheduler. + final def tasks: Seq[ArrayBuffer[TaskDescription]] = offer.map(_.tasks) + + // Invoked at the beginning of resource offering to construct the offer with the workoffers. + def construct(workOffer: Seq[WorkerOffer]): Unit = { +offer = workOffer.map(o => new OfferState(o)) + } + + // Invoked at each round of Taskset assignment to initialize the internal structure. + def init(): Unit + + // Whether there is offer available to be used inside of one round of Taskset assignment. + def hasNext: Boolean + + // Returned the next assigned offer based on the task assignment strategy. + def getNext(): OfferState + + // Invoked by the TaskScheduler to indicate whether the current offer is accepted or not so that + // the assigner can decide whether the current worker is valid for the next offering. + def offerAccepted(assigned: Boolean): Un
[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/15541#discussion_r83999827 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala --- @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import scala.collection.mutable.ArrayBuffer +import scala.collection.mutable.PriorityQueue +import scala.util.Random + +import org.apache.spark.internal.{config, Logging} +import org.apache.spark.SparkConf +import org.apache.spark.util.Utils + +/** Tracking the current state of the workers with available cores and assigned task list. */ +class OfferState(val workOffer: WorkerOffer) { + // The current remaining cores that can be allocated to tasks. + var coresAvailable: Int = workOffer.cores + // The list of tasks that are assigned to this worker. + val tasks = new ArrayBuffer[TaskDescription](coresAvailable) +} + +/** + * TaskAssigner is the base class for all task assigner implementations, and can be + * extended to implement different task scheduling algorithms. + * Together with [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]], TaskAssigner + * is used to assign tasks to workers with available cores. Internally, TaskScheduler, requested + * to perform task assignment given available workers, first sorts the candidate tasksets, + * and then for each taskset, it takes a number of rounds to request TaskAssigner for task + * assignment with different the locality restrictions until there is either no qualified + * workers or no valid tasks to be assigned. + * + * TaskAssigner is responsible to maintain the worker availability state and task assignment + * information. The contract between [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]] + * and TaskAssigner is as follows. First, TaskScheduler invokes construct() of TaskAssigner to + * initialize the its internal worker states at the beginning of resource offering. Before each + * round of task assignment for a taskset, TaskScheduler invoke the init() of TaskAssigner to + * initialize the data structure for the round. When performing real task assignment, + * hasNext()/getNext() is used by TaskScheduler to check the worker availability and retrieve + * current offering from TaskAssigner. Then offerAccepted is used by TaskScheduler to notify + * the TaskAssigner so that TaskAssigner can decide whether the current offer is valid or not for + * the next request. After task assignment is done, TaskScheduler invokes the tasks() to + * retrieve all the task assignment information, and eventually, invokes reset() method so that + * TaskAssigner can cleanup its internal maintained resources. + */ + +private[scheduler] abstract class TaskAssigner { + var offer: Seq[OfferState] = _ + var CPUS_PER_TASK = 1 + + def withCpuPerTask(CPUS_PER_TASK: Int): Unit = { +this.CPUS_PER_TASK = CPUS_PER_TASK + } + + // The final assigned offer returned to TaskScheduler. + final def tasks: Seq[ArrayBuffer[TaskDescription]] = offer.map(_.tasks) + + // Invoked at the beginning of resource offering to construct the offer with the workoffers. + def construct(workOffer: Seq[WorkerOffer]): Unit = { +offer = workOffer.map(o => new OfferState(o)) + } + + // Invoked at each round of Taskset assignment to initialize the internal structure. + def init(): Unit + + // Whether there is offer available to be used inside of one round of Taskset assignment. + def hasNext: Boolean + + // Returned the next assigned offer based on the task assignment strategy. + def getNext(): OfferState + + // Invoked by the TaskScheduler to indicate whether the current offer is accepted or not so that + // the assigner can decide whether the current worker is valid for the next offering. + def offerAccepted(assigned: Boolean): Un
[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...
Github user zhzhan commented on a diff in the pull request: https://github.com/apache/spark/pull/15541#discussion_r83999756 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala --- @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import scala.collection.mutable.ArrayBuffer +import scala.collection.mutable.PriorityQueue +import scala.util.Random + +import org.apache.spark.internal.{config, Logging} +import org.apache.spark.SparkConf +import org.apache.spark.util.Utils + +/** Tracking the current state of the workers with available cores and assigned task list. */ +class OfferState(val workOffer: WorkerOffer) { + // The current remaining cores that can be allocated to tasks. + var coresAvailable: Int = workOffer.cores + // The list of tasks that are assigned to this worker. + val tasks = new ArrayBuffer[TaskDescription](coresAvailable) +} + +/** + * TaskAssigner is the base class for all task assigner implementations, and can be + * extended to implement different task scheduling algorithms. + * Together with [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]], TaskAssigner + * is used to assign tasks to workers with available cores. Internally, TaskScheduler, requested + * to perform task assignment given available workers, first sorts the candidate tasksets, + * and then for each taskset, it takes a number of rounds to request TaskAssigner for task + * assignment with different the locality restrictions until there is either no qualified + * workers or no valid tasks to be assigned. + * + * TaskAssigner is responsible to maintain the worker availability state and task assignment + * information. The contract between [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]] + * and TaskAssigner is as follows. First, TaskScheduler invokes construct() of TaskAssigner to + * initialize the its internal worker states at the beginning of resource offering. Before each + * round of task assignment for a taskset, TaskScheduler invoke the init() of TaskAssigner to + * initialize the data structure for the round. When performing real task assignment, + * hasNext()/getNext() is used by TaskScheduler to check the worker availability and retrieve + * current offering from TaskAssigner. Then offerAccepted is used by TaskScheduler to notify + * the TaskAssigner so that TaskAssigner can decide whether the current offer is valid or not for + * the next request. After task assignment is done, TaskScheduler invokes the tasks() to + * retrieve all the task assignment information, and eventually, invokes reset() method so that + * TaskAssigner can cleanup its internal maintained resources. + */ + +private[scheduler] abstract class TaskAssigner { + var offer: Seq[OfferState] = _ + var CPUS_PER_TASK = 1 + + def withCpuPerTask(CPUS_PER_TASK: Int): Unit = { +this.CPUS_PER_TASK = CPUS_PER_TASK + } + + // The final assigned offer returned to TaskScheduler. + final def tasks: Seq[ArrayBuffer[TaskDescription]] = offer.map(_.tasks) + + // Invoked at the beginning of resource offering to construct the offer with the workoffers. + def construct(workOffer: Seq[WorkerOffer]): Unit = { +offer = workOffer.map(o => new OfferState(o)) + } + + // Invoked at each round of Taskset assignment to initialize the internal structure. + def init(): Unit + + // Whether there is offer available to be used inside of one round of Taskset assignment. + def hasNext: Boolean + + // Returned the next assigned offer based on the task assignment strategy. + def getNext(): OfferState + + // Invoked by the TaskScheduler to indicate whether the current offer is accepted or not so that + // the assigner can decide whether the current worker is valid for the next offering. + def offerAccepted(assigned: Boolean): Un
[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/15541#discussion_r83999452 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala --- @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import scala.collection.mutable.ArrayBuffer +import scala.collection.mutable.PriorityQueue +import scala.util.Random + +import org.apache.spark.internal.{config, Logging} +import org.apache.spark.SparkConf +import org.apache.spark.util.Utils + +/** Tracking the current state of the workers with available cores and assigned task list. */ +class OfferState(val workOffer: WorkerOffer) { + // The current remaining cores that can be allocated to tasks. + var coresAvailable: Int = workOffer.cores + // The list of tasks that are assigned to this worker. + val tasks = new ArrayBuffer[TaskDescription](coresAvailable) +} + +/** + * TaskAssigner is the base class for all task assigner implementations, and can be + * extended to implement different task scheduling algorithms. + * Together with [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]], TaskAssigner + * is used to assign tasks to workers with available cores. Internally, TaskScheduler, requested + * to perform task assignment given available workers, first sorts the candidate tasksets, + * and then for each taskset, it takes a number of rounds to request TaskAssigner for task + * assignment with different the locality restrictions until there is either no qualified + * workers or no valid tasks to be assigned. + * + * TaskAssigner is responsible to maintain the worker availability state and task assignment + * information. The contract between [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]] + * and TaskAssigner is as follows. First, TaskScheduler invokes construct() of TaskAssigner to + * initialize the its internal worker states at the beginning of resource offering. Before each + * round of task assignment for a taskset, TaskScheduler invoke the init() of TaskAssigner to + * initialize the data structure for the round. When performing real task assignment, + * hasNext()/getNext() is used by TaskScheduler to check the worker availability and retrieve + * current offering from TaskAssigner. Then offerAccepted is used by TaskScheduler to notify + * the TaskAssigner so that TaskAssigner can decide whether the current offer is valid or not for + * the next request. After task assignment is done, TaskScheduler invokes the tasks() to + * retrieve all the task assignment information, and eventually, invokes reset() method so that + * TaskAssigner can cleanup its internal maintained resources. + */ + +private[scheduler] abstract class TaskAssigner { + var offer: Seq[OfferState] = _ + var CPUS_PER_TASK = 1 + + def withCpuPerTask(CPUS_PER_TASK: Int): Unit = { +this.CPUS_PER_TASK = CPUS_PER_TASK + } + + // The final assigned offer returned to TaskScheduler. + final def tasks: Seq[ArrayBuffer[TaskDescription]] = offer.map(_.tasks) + + // Invoked at the beginning of resource offering to construct the offer with the workoffers. + def construct(workOffer: Seq[WorkerOffer]): Unit = { +offer = workOffer.map(o => new OfferState(o)) + } + + // Invoked at each round of Taskset assignment to initialize the internal structure. + def init(): Unit + + // Whether there is offer available to be used inside of one round of Taskset assignment. + def hasNext: Boolean + + // Returned the next assigned offer based on the task assignment strategy. + def getNext(): OfferState + + // Invoked by the TaskScheduler to indicate whether the current offer is accepted or not so that + // the assigner can decide whether the current worker is valid for the next offering. + def offerAccepted(assigned: Boolean): Un
[GitHub] spark pull request #15550: [SPARK-18003][Spark Core] Fix bug of RDD zipWithI...
Github user WeichenXu123 commented on a diff in the pull request: https://github.com/apache/spark/pull/15550#discussion_r83999445 --- Diff: core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala --- @@ -833,6 +833,30 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext { } } + test("zipWithIndex with partition size exceeding MaxInt") { +val result = sc.parallelize(Seq(1), 1).mapPartitions( --- End diff -- all right. I'll update the testcase. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #15550: [SPARK-18003][Spark Core] Fix bug of RDD zipWithI...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/15550#discussion_r83999283 --- Diff: core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala --- @@ -833,6 +833,30 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext { } } + test("zipWithIndex with partition size exceeding MaxInt") { +val result = sc.parallelize(Seq(1), 1).mapPartitions( --- End diff -- hm that's about 299s longer than I'd like for a unit test ... --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #15546: [SPARK-17982][SQL] SQLBuilder should wrap the generated ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/15546 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/67165/ Test FAILed. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #15546: [SPARK-17982][SQL] SQLBuilder should wrap the generated ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/15546 Merged build finished. Test FAILed. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #15546: [SPARK-17982][SQL] SQLBuilder should wrap the generated ...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/15546 **[Test build #67165 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/67165/consoleFull)** for PR 15546 at commit [`85c0686`](https://github.com/apache/spark/commit/85c0686a90395f3a7b56f22d78654e83e7ede7a6). * This patch **fails Spark unit tests**. * This patch merges cleanly. * This patch adds no public classes. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #15550: [SPARK-18003][Spark Core] Fix bug of RDD zipWithI...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/15550#discussion_r83998814 --- Diff: core/src/main/scala/org/apache/spark/rdd/ZippedWithIndexRDD.scala --- @@ -64,8 +64,14 @@ class ZippedWithIndexRDD[T: ClassTag](prev: RDD[T]) extends RDD[(T, Long)](prev) override def compute(splitIn: Partition, context: TaskContext): Iterator[(T, Long)] = { val split = splitIn.asInstanceOf[ZippedWithIndexRDDPartition] -firstParent[T].iterator(split.prev, context).zipWithIndex.map { x => - (x._1, split.startIndex + x._2) +val parentIter = firstParent[T].iterator(split.prev, context) +new Iterator[(T, Long)] { + var idxAcc: Long = -1L --- End diff -- also why don't we initialize this to split.startIndex? I'd also rename this just "index" --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/15541#discussion_r83998771 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala --- @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import scala.collection.mutable.ArrayBuffer +import scala.collection.mutable.PriorityQueue +import scala.util.Random + +import org.apache.spark.internal.{config, Logging} +import org.apache.spark.SparkConf +import org.apache.spark.util.Utils + +/** Tracking the current state of the workers with available cores and assigned task list. */ +class OfferState(val workOffer: WorkerOffer) { + // The current remaining cores that can be allocated to tasks. + var coresAvailable: Int = workOffer.cores + // The list of tasks that are assigned to this worker. + val tasks = new ArrayBuffer[TaskDescription](coresAvailable) +} + +/** + * TaskAssigner is the base class for all task assigner implementations, and can be + * extended to implement different task scheduling algorithms. + * Together with [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]], TaskAssigner + * is used to assign tasks to workers with available cores. Internally, TaskScheduler, requested + * to perform task assignment given available workers, first sorts the candidate tasksets, + * and then for each taskset, it takes a number of rounds to request TaskAssigner for task + * assignment with different the locality restrictions until there is either no qualified + * workers or no valid tasks to be assigned. + * + * TaskAssigner is responsible to maintain the worker availability state and task assignment + * information. The contract between [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]] + * and TaskAssigner is as follows. First, TaskScheduler invokes construct() of TaskAssigner to + * initialize the its internal worker states at the beginning of resource offering. Before each + * round of task assignment for a taskset, TaskScheduler invoke the init() of TaskAssigner to + * initialize the data structure for the round. When performing real task assignment, + * hasNext()/getNext() is used by TaskScheduler to check the worker availability and retrieve + * current offering from TaskAssigner. Then offerAccepted is used by TaskScheduler to notify + * the TaskAssigner so that TaskAssigner can decide whether the current offer is valid or not for + * the next request. After task assignment is done, TaskScheduler invokes the tasks() to + * retrieve all the task assignment information, and eventually, invokes reset() method so that + * TaskAssigner can cleanup its internal maintained resources. + */ + +private[scheduler] abstract class TaskAssigner { + var offer: Seq[OfferState] = _ + var CPUS_PER_TASK = 1 + + def withCpuPerTask(CPUS_PER_TASK: Int): Unit = { +this.CPUS_PER_TASK = CPUS_PER_TASK + } + + // The final assigned offer returned to TaskScheduler. + final def tasks: Seq[ArrayBuffer[TaskDescription]] = offer.map(_.tasks) + + // Invoked at the beginning of resource offering to construct the offer with the workoffers. + def construct(workOffer: Seq[WorkerOffer]): Unit = { +offer = workOffer.map(o => new OfferState(o)) + } + + // Invoked at each round of Taskset assignment to initialize the internal structure. + def init(): Unit + + // Whether there is offer available to be used inside of one round of Taskset assignment. + def hasNext: Boolean + + // Returned the next assigned offer based on the task assignment strategy. + def getNext(): OfferState + + // Invoked by the TaskScheduler to indicate whether the current offer is accepted or not so that + // the assigner can decide whether the current worker is valid for the next offering. + def offerAccepted(assigned: Boolean): Un
[GitHub] spark pull request #15550: [SPARK-18003][Spark Core] Fix bug of RDD zipWithI...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/15550#discussion_r83998765 --- Diff: core/src/main/scala/org/apache/spark/rdd/ZippedWithIndexRDD.scala --- @@ -64,8 +64,14 @@ class ZippedWithIndexRDD[T: ClassTag](prev: RDD[T]) extends RDD[(T, Long)](prev) override def compute(splitIn: Partition, context: TaskContext): Iterator[(T, Long)] = { val split = splitIn.asInstanceOf[ZippedWithIndexRDDPartition] -firstParent[T].iterator(split.prev, context).zipWithIndex.map { x => - (x._1, split.startIndex + x._2) +val parentIter = firstParent[T].iterator(split.prev, context) +new Iterator[(T, Long)] { + var idxAcc: Long = -1L --- End diff -- private[this] --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #15297: [WIP][SPARK-9862]Handling data skew
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/15297 Merged build finished. Test FAILed. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/15541#discussion_r83998540 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala --- @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import scala.collection.mutable.ArrayBuffer +import scala.collection.mutable.PriorityQueue +import scala.util.Random + +import org.apache.spark.internal.{config, Logging} +import org.apache.spark.SparkConf +import org.apache.spark.util.Utils + +/** Tracking the current state of the workers with available cores and assigned task list. */ +class OfferState(val workOffer: WorkerOffer) { + // The current remaining cores that can be allocated to tasks. + var coresAvailable: Int = workOffer.cores + // The list of tasks that are assigned to this worker. + val tasks = new ArrayBuffer[TaskDescription](coresAvailable) +} + +/** + * TaskAssigner is the base class for all task assigner implementations, and can be + * extended to implement different task scheduling algorithms. + * Together with [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]], TaskAssigner + * is used to assign tasks to workers with available cores. Internally, TaskScheduler, requested + * to perform task assignment given available workers, first sorts the candidate tasksets, + * and then for each taskset, it takes a number of rounds to request TaskAssigner for task + * assignment with different the locality restrictions until there is either no qualified + * workers or no valid tasks to be assigned. + * + * TaskAssigner is responsible to maintain the worker availability state and task assignment + * information. The contract between [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]] + * and TaskAssigner is as follows. First, TaskScheduler invokes construct() of TaskAssigner to + * initialize the its internal worker states at the beginning of resource offering. Before each + * round of task assignment for a taskset, TaskScheduler invoke the init() of TaskAssigner to + * initialize the data structure for the round. When performing real task assignment, + * hasNext()/getNext() is used by TaskScheduler to check the worker availability and retrieve + * current offering from TaskAssigner. Then offerAccepted is used by TaskScheduler to notify + * the TaskAssigner so that TaskAssigner can decide whether the current offer is valid or not for + * the next request. After task assignment is done, TaskScheduler invokes the tasks() to + * retrieve all the task assignment information, and eventually, invokes reset() method so that + * TaskAssigner can cleanup its internal maintained resources. + */ + +private[scheduler] abstract class TaskAssigner { + var offer: Seq[OfferState] = _ + var CPUS_PER_TASK = 1 + + def withCpuPerTask(CPUS_PER_TASK: Int): Unit = { +this.CPUS_PER_TASK = CPUS_PER_TASK + } + + // The final assigned offer returned to TaskScheduler. + final def tasks: Seq[ArrayBuffer[TaskDescription]] = offer.map(_.tasks) + + // Invoked at the beginning of resource offering to construct the offer with the workoffers. + def construct(workOffer: Seq[WorkerOffer]): Unit = { +offer = workOffer.map(o => new OfferState(o)) + } + + // Invoked at each round of Taskset assignment to initialize the internal structure. + def init(): Unit + + // Whether there is offer available to be used inside of one round of Taskset assignment. + def hasNext: Boolean + + // Returned the next assigned offer based on the task assignment strategy. + def getNext(): OfferState + + // Invoked by the TaskScheduler to indicate whether the current offer is accepted or not so that + // the assigner can decide whether the current worker is valid for the next offering. + def offerAccepted(assigned: Boolean): Un
[GitHub] spark issue #15297: [WIP][SPARK-9862]Handling data skew
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/15297 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/67162/ Test FAILed. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #15297: [WIP][SPARK-9862]Handling data skew
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/15297 **[Test build #67162 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/67162/consoleFull)** for PR 15297 at commit [`36624ec`](https://github.com/apache/spark/commit/36624ec94fdc3a2eeeadea1d86cdf2e794198434). * This patch **fails Spark unit tests**. * This patch merges cleanly. * This patch adds no public classes. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #15546: [SPARK-17982][SQL] SQLBuilder should wrap the generated ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/15546 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/67166/ Test FAILed. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #15546: [SPARK-17982][SQL] SQLBuilder should wrap the generated ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/15546 Merged build finished. Test FAILed. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #15546: [SPARK-17982][SQL] SQLBuilder should wrap the generated ...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/15546 **[Test build #67166 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/67166/consoleFull)** for PR 15546 at commit [`2af55c8`](https://github.com/apache/spark/commit/2af55c844a570e7c32af363842562a03c570e5e5). * This patch **fails Spark unit tests**. * This patch merges cleanly. * This patch adds no public classes. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #15550: [SPARK-18003][Spark Core] Fix bug of RDD zipWithIndex ge...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/15550 **[Test build #67170 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/67170/consoleFull)** for PR 15550 at commit [`18c3f49`](https://github.com/apache/spark/commit/18c3f4914ca3fb5800dd68ac9d689b6f11cb2e92). --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #15550: [SPARK-18003][Spark Core] Fix bug of RDD zipWithI...
GitHub user WeichenXu123 opened a pull request: https://github.com/apache/spark/pull/15550 [SPARK-18003][Spark Core] Fix bug of RDD zipWithIndex generating wrong result when one partition contains more than 2147483647 records ## What changes were proposed in this pull request? Fix bug of RDD zipWithIndex generating wrong result when one partition contains more than 2147483647 records. ## How was this patch tested? test added. You can merge this pull request into a Git repository by running: $ git pull https://github.com/WeichenXu123/spark fix_rdd_zipWithIndex_overflow Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/15550.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #15550 commit 18c3f4914ca3fb5800dd68ac9d689b6f11cb2e92 Author: WeichenXu Date: 2016-10-18T01:59:24Z update. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...
Github user zhzhan commented on a diff in the pull request: https://github.com/apache/spark/pull/15541#discussion_r83998058 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala --- @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import scala.collection.mutable.ArrayBuffer +import scala.collection.mutable.PriorityQueue +import scala.util.Random + +import org.apache.spark.internal.{config, Logging} +import org.apache.spark.SparkConf +import org.apache.spark.util.Utils + +/** Tracking the current state of the workers with available cores and assigned task list. */ +class OfferState(val workOffer: WorkerOffer) { + // The current remaining cores that can be allocated to tasks. + var coresAvailable: Int = workOffer.cores + // The list of tasks that are assigned to this worker. + val tasks = new ArrayBuffer[TaskDescription](coresAvailable) +} + +/** + * TaskAssigner is the base class for all task assigner implementations, and can be + * extended to implement different task scheduling algorithms. + * Together with [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]], TaskAssigner + * is used to assign tasks to workers with available cores. Internally, TaskScheduler, requested + * to perform task assignment given available workers, first sorts the candidate tasksets, + * and then for each taskset, it takes a number of rounds to request TaskAssigner for task + * assignment with different the locality restrictions until there is either no qualified + * workers or no valid tasks to be assigned. + * + * TaskAssigner is responsible to maintain the worker availability state and task assignment + * information. The contract between [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]] + * and TaskAssigner is as follows. First, TaskScheduler invokes construct() of TaskAssigner to + * initialize the its internal worker states at the beginning of resource offering. Before each + * round of task assignment for a taskset, TaskScheduler invoke the init() of TaskAssigner to + * initialize the data structure for the round. When performing real task assignment, + * hasNext()/getNext() is used by TaskScheduler to check the worker availability and retrieve + * current offering from TaskAssigner. Then offerAccepted is used by TaskScheduler to notify + * the TaskAssigner so that TaskAssigner can decide whether the current offer is valid or not for + * the next request. After task assignment is done, TaskScheduler invokes the tasks() to + * retrieve all the task assignment information, and eventually, invokes reset() method so that + * TaskAssigner can cleanup its internal maintained resources. + */ + +private[scheduler] abstract class TaskAssigner { + var offer: Seq[OfferState] = _ + var CPUS_PER_TASK = 1 + + def withCpuPerTask(CPUS_PER_TASK: Int): Unit = { +this.CPUS_PER_TASK = CPUS_PER_TASK + } + + // The final assigned offer returned to TaskScheduler. + final def tasks: Seq[ArrayBuffer[TaskDescription]] = offer.map(_.tasks) + + // Invoked at the beginning of resource offering to construct the offer with the workoffers. + def construct(workOffer: Seq[WorkerOffer]): Unit = { +offer = workOffer.map(o => new OfferState(o)) + } + + // Invoked at each round of Taskset assignment to initialize the internal structure. + def init(): Unit + + // Whether there is offer available to be used inside of one round of Taskset assignment. + def hasNext: Boolean + + // Returned the next assigned offer based on the task assignment strategy. + def getNext(): OfferState + + // Invoked by the TaskScheduler to indicate whether the current offer is accepted or not so that + // the assigner can decide whether the current worker is valid for the next offering. + def offerAccepted(assigned: Boolean): Un
[GitHub] spark issue #15534: [SPARK-17917][Scheduler] Fire SparkListenerTasksStarved ...
Github user mariobriggs commented on the issue: https://github.com/apache/spark/pull/15534 thanks. Sounds possible. Let me try it that way with my app and i can then close this as required. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #15544: [SPARK-17997] [SQL] Add an aggregation function f...
Github user wzhfy commented on a diff in the pull request: https://github.com/apache/spark/pull/15544#discussion_r83997249 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/HyperLogLogPlusPlus.scala --- @@ -142,319 +84,37 @@ case class HyperLogLogPlusPlus( override def inputTypes: Seq[AbstractDataType] = Seq(AnyDataType) - override def aggBufferSchema: StructType = StructType.fromAttributes(aggBufferAttributes) - - /** Allocate enough words to store all registers. */ - override val aggBufferAttributes: Seq[AttributeReference] = Seq.tabulate(numWords) { i => -AttributeReference(s"MS[$i]", LongType)() + override def createAggregationBuffer(): HyperLogLogPlusPlusAlgo = { +new HyperLogLogPlusPlusAlgo(relativeSD) --- End diff -- @hvanhovell Do you mean we put some digest object (like how PercentileDigest does in ApproximatePercentile) in the buffer instead of putting HLL++ directly in it? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...
Github user zhzhan commented on a diff in the pull request: https://github.com/apache/spark/pull/15541#discussion_r83997070 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala --- @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import scala.collection.mutable.ArrayBuffer +import scala.collection.mutable.PriorityQueue +import scala.util.Random + +import org.apache.spark.internal.{config, Logging} +import org.apache.spark.SparkConf +import org.apache.spark.util.Utils + +/** Tracking the current state of the workers with available cores and assigned task list. */ +class OfferState(val workOffer: WorkerOffer) { + // The current remaining cores that can be allocated to tasks. + var coresAvailable: Int = workOffer.cores + // The list of tasks that are assigned to this worker. + val tasks = new ArrayBuffer[TaskDescription](coresAvailable) +} + +/** + * TaskAssigner is the base class for all task assigner implementations, and can be + * extended to implement different task scheduling algorithms. + * Together with [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]], TaskAssigner + * is used to assign tasks to workers with available cores. Internally, TaskScheduler, requested + * to perform task assignment given available workers, first sorts the candidate tasksets, + * and then for each taskset, it takes a number of rounds to request TaskAssigner for task + * assignment with different the locality restrictions until there is either no qualified + * workers or no valid tasks to be assigned. + * + * TaskAssigner is responsible to maintain the worker availability state and task assignment + * information. The contract between [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]] + * and TaskAssigner is as follows. First, TaskScheduler invokes construct() of TaskAssigner to + * initialize the its internal worker states at the beginning of resource offering. Before each + * round of task assignment for a taskset, TaskScheduler invoke the init() of TaskAssigner to + * initialize the data structure for the round. When performing real task assignment, + * hasNext()/getNext() is used by TaskScheduler to check the worker availability and retrieve + * current offering from TaskAssigner. Then offerAccepted is used by TaskScheduler to notify + * the TaskAssigner so that TaskAssigner can decide whether the current offer is valid or not for + * the next request. After task assignment is done, TaskScheduler invokes the tasks() to + * retrieve all the task assignment information, and eventually, invokes reset() method so that + * TaskAssigner can cleanup its internal maintained resources. + */ + +private[scheduler] abstract class TaskAssigner { + var offer: Seq[OfferState] = _ + var CPUS_PER_TASK = 1 + + def withCpuPerTask(CPUS_PER_TASK: Int): Unit = { +this.CPUS_PER_TASK = CPUS_PER_TASK + } + + // The final assigned offer returned to TaskScheduler. + final def tasks: Seq[ArrayBuffer[TaskDescription]] = offer.map(_.tasks) + + // Invoked at the beginning of resource offering to construct the offer with the workoffers. + def construct(workOffer: Seq[WorkerOffer]): Unit = { +offer = workOffer.map(o => new OfferState(o)) + } + + // Invoked at each round of Taskset assignment to initialize the internal structure. + def init(): Unit + + // Whether there is offer available to be used inside of one round of Taskset assignment. + def hasNext: Boolean + + // Returned the next assigned offer based on the task assignment strategy. + def getNext(): OfferState + + // Invoked by the TaskScheduler to indicate whether the current offer is accepted or not so that + // the assigner can decide whether the current worker is valid for the next offering. + def offerAccepted(assigned: Boolean): Un
[GitHub] spark issue #15534: [SPARK-17917][Scheduler] Fire SparkListenerTasksStarved ...
Github user rxin commented on the issue: https://github.com/apache/spark/pull/15534 If a job is submitted and there is no task start update for a while? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #15481: [SPARK-17929] [CORE] Fix deadlock when CoarseGrai...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/15481#discussion_r83996798 --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala --- @@ -393,7 +393,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp // Remove all the lingering executors that should be removed but not yet. The reason might be // because (1) disconnected event is not yet received; (2) executors die silently. executorDataMap.toMap.foreach { case (eid, _) => - driverEndpoint.askWithRetry[Boolean]( + driverEndpoint.send( --- End diff -- Good catch. I think here we can just call `removeExecutor(...)`. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #15549: Bootstrap perf
Github user ahshahid closed the pull request at: https://github.com/apache/spark/pull/15549 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #15534: [SPARK-17917][Scheduler] Fire SparkListenerTasksStarved ...
Github user mariobriggs commented on the issue: https://github.com/apache/spark/pull/15534 >> The user can track this pretty easily themselves, can't they? << Can you explain a little more on your line of thinking here? I am more than happy to not have this code added if there exists other ways to track this. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #15539: [SPARK-17994] [SQL] Add back a file status cache for cat...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/15539 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/67160/ Test PASSed. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #15549: Bootstrap perf
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/15549 Can one of the admins verify this patch? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #15539: [SPARK-17994] [SQL] Add back a file status cache for cat...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/15539 Merged build finished. Test PASSed. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #15539: [SPARK-17994] [SQL] Add back a file status cache for cat...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/15539 **[Test build #67160 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/67160/consoleFull)** for PR 15539 at commit [`b9272c2`](https://github.com/apache/spark/commit/b9272c2a51ab981a6343a43931f57d03e931aab1). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #15549: Bootstrap perf
GitHub user ahshahid opened a pull request: https://github.com/apache/spark/pull/15549 Bootstrap perf reducing the generated code for struct, if the field elements of the struct are of same primitive data type. (Please fill in changes proposed in this fix) ## How was this patch tested? (Please explain how this patch was tested. E.g. unit tests, integration tests, manual tests) (If this patch involves UI changes, please attach a screenshot; otherwise, remove this) Please review https://cwiki.apache.org/confluence/display/SPARK/Contributing+to+Spark before opening a pull request. You can merge this pull request into a Git repository by running: $ git pull https://github.com/SnappyDataInc/spark bootstrap_perf Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/15549.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #15549 commit 0e9333b275c8307a24bb6c7e8409ea48d4bac3d6 Author: Ryan Blue Date: 2016-07-08T19:37:26Z [SPARK-16420] Ensure compression streams are closed. ## What changes were proposed in this pull request? This uses the try/finally pattern to ensure streams are closed after use. `UnsafeShuffleWriter` wasn't closing compression streams, causing them to leak resources until garbage collected. This was causing a problem with codecs that use off-heap memory. ## How was this patch tested? Current tests are sufficient. This should not change behavior. Author: Ryan Blue Closes #14093 from rdblue/SPARK-16420-unsafe-shuffle-writer-leak. (cherry picked from commit 67e085ef6dd62774095f3187844c091db1a6a72c) Signed-off-by: Reynold Xin commit e3424fd7716d0c3f6ce82acd200bda704e42d3eb Author: wujian Date: 2016-07-08T21:38:05Z [SPARK-16281][SQL] Implement parse_url SQL function ## What changes were proposed in this pull request? This PR adds parse_url SQL functions in order to remove Hive fallback. A new implementation of #13999 ## How was this patch tested? Pass the exist tests including new testcases. Author: wujian Closes #14008 from janplus/SPARK-16281. (cherry picked from commit f5fef69143b2a83bb8b168b7417e92659af0c72c) Signed-off-by: Reynold Xin commit 07f562f5881f1896a41077a367c31af704551d78 Author: Yin Huai Date: 2016-07-08T22:56:46Z [SPARK-16453][BUILD] release-build.sh is missing hive-thriftserver for scala 2.10 ## What changes were proposed in this pull request? This PR adds hive-thriftserver profile to scala 2.10 build created by release-build.sh. Author: Yin Huai Closes #14108 from yhuai/SPARK-16453. (cherry picked from commit 60ba436b7010436c77dfe5219a9662accc25bffa) Signed-off-by: Yin Huai commit 463cbf72fd6db1d0646df432f56cd121b0eed625 Author: Dongjoon Hyun Date: 2016-07-08T23:07:12Z [SPARK-16387][SQL] JDBC Writer should use dialect to quote field names. ## What changes were proposed in this pull request? Currently, JDBC Writer uses dialects to get datatypes, but doesn't to quote field names. This PR uses dialects to quote the field names, too. **Reported Error Scenario (MySQL case)** ```scala scala> val url="jdbc:mysql://localhost:3306/temp" scala> val prop = new java.util.Properties scala> prop.setProperty("user","root") scala> spark.createDataset(Seq("a","b","c")).toDF("order") scala> df.write.mode("overwrite").jdbc(url, "temptable", prop) ...MySQLSyntaxErrorException: ... near 'order TEXT ) ``` ## How was this patch tested? Pass the Jenkins tests and manually do the above case. Author: Dongjoon Hyun Closes #14107 from dongjoon-hyun/SPARK-16387. (cherry picked from commit 3b22291b5f0317609cd71ce7af78e4c5063d66e8) Signed-off-by: Reynold Xin commit c425230fdf1654aecaa84aba02b6844923c56d61 Author: cody koeninger Date: 2016-07-09T00:47:58Z [SPARK-13569][STREAMING][KAFKA] pattern based topic subscription ## What changes were proposed in this pull request? Allow for kafka topic subscriptions based on a regex pattern. ## How was this patch tested? Unit tests, manual tests Author: cody koeninger Closes #14026 from koeninger/SPARK-13569. (cherry picked from commit fd6e8f0e2269a2e7f24f79d5c2041816ea308c86) Signed-off-by: Tathagata Das commit 16202ba684eae8d200e063abfe154c3d1b8106a5 Author: Sean Owen Date: 2016-07-09T03:17:50Z [SPARK-16376][WEBUI][SPARK WEB UI][APP-ID] HTTP ERROR 500 when using rest api "/applications//jobs" if array "stageIds" is empty ## What changes were proposed in this pull request? Avoid error finding max of empty Seq when
[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/15541#discussion_r83996232 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala --- @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import scala.collection.mutable.ArrayBuffer +import scala.collection.mutable.PriorityQueue +import scala.util.Random + +import org.apache.spark.internal.{config, Logging} +import org.apache.spark.SparkConf +import org.apache.spark.util.Utils + +/** Tracking the current state of the workers with available cores and assigned task list. */ +class OfferState(val workOffer: WorkerOffer) { + // The current remaining cores that can be allocated to tasks. + var coresAvailable: Int = workOffer.cores + // The list of tasks that are assigned to this worker. + val tasks = new ArrayBuffer[TaskDescription](coresAvailable) +} + +/** + * TaskAssigner is the base class for all task assigner implementations, and can be + * extended to implement different task scheduling algorithms. + * Together with [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]], TaskAssigner + * is used to assign tasks to workers with available cores. Internally, TaskScheduler, requested + * to perform task assignment given available workers, first sorts the candidate tasksets, + * and then for each taskset, it takes a number of rounds to request TaskAssigner for task + * assignment with different the locality restrictions until there is either no qualified + * workers or no valid tasks to be assigned. + * + * TaskAssigner is responsible to maintain the worker availability state and task assignment + * information. The contract between [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]] + * and TaskAssigner is as follows. First, TaskScheduler invokes construct() of TaskAssigner to + * initialize the its internal worker states at the beginning of resource offering. Before each + * round of task assignment for a taskset, TaskScheduler invoke the init() of TaskAssigner to + * initialize the data structure for the round. When performing real task assignment, + * hasNext()/getNext() is used by TaskScheduler to check the worker availability and retrieve + * current offering from TaskAssigner. Then offerAccepted is used by TaskScheduler to notify + * the TaskAssigner so that TaskAssigner can decide whether the current offer is valid or not for + * the next request. After task assignment is done, TaskScheduler invokes the tasks() to + * retrieve all the task assignment information, and eventually, invokes reset() method so that + * TaskAssigner can cleanup its internal maintained resources. + */ + +private[scheduler] abstract class TaskAssigner { + var offer: Seq[OfferState] = _ + var CPUS_PER_TASK = 1 + + def withCpuPerTask(CPUS_PER_TASK: Int): Unit = { +this.CPUS_PER_TASK = CPUS_PER_TASK + } + + // The final assigned offer returned to TaskScheduler. + final def tasks: Seq[ArrayBuffer[TaskDescription]] = offer.map(_.tasks) + + // Invoked at the beginning of resource offering to construct the offer with the workoffers. + def construct(workOffer: Seq[WorkerOffer]): Unit = { +offer = workOffer.map(o => new OfferState(o)) + } + + // Invoked at each round of Taskset assignment to initialize the internal structure. + def init(): Unit + + // Whether there is offer available to be used inside of one round of Taskset assignment. + def hasNext: Boolean + + // Returned the next assigned offer based on the task assignment strategy. + def getNext(): OfferState + + // Invoked by the TaskScheduler to indicate whether the current offer is accepted or not so that + // the assigner can decide whether the current worker is valid for the next offering. + def offerAccepted(assigned: Boolean)
[GitHub] spark issue #15525: [SPARK-17985][CORE] Bump commons-lang3 version to 3.5.
Github user ueshin commented on the issue: https://github.com/apache/spark/pull/15525 @rxin I submitted a new pr #15548. Thanks. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #15547: [SPARK-18002][SQL] Pruning unnecessary IsNotNull predica...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/15547 **[Test build #67169 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/67169/consoleFull)** for PR 15547 at commit [`a264eba`](https://github.com/apache/spark/commit/a264ebac6924ae7184abe6fc75fa0dfb659b7d54). --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #15548: [SPARK-17985][CORE] Bump commons-lang3 version to 3.5.
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/15548 **[Test build #67168 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/67168/consoleFull)** for PR 15548 at commit [`f318dff`](https://github.com/apache/spark/commit/f318dffd4137c20bdc67ac054e345d55703d96de). --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #15548: [SPARK-17985][CORE] Bump commons-lang3 version to...
GitHub user ueshin opened a pull request: https://github.com/apache/spark/pull/15548 [SPARK-17985][CORE] Bump commons-lang3 version to 3.5. ## What changes were proposed in this pull request? `SerializationUtils.clone()` of commons-lang3 (<3.5) has a bug that breaks thread safety, which gets stack sometimes caused by race condition of initializing hash map. See https://issues.apache.org/jira/browse/LANG-1251. ## How was this patch tested? Existing tests. You can merge this pull request into a Git repository by running: $ git pull https://github.com/ueshin/apache-spark issues/SPARK-17985 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/15548.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #15548 commit f318dffd4137c20bdc67ac054e345d55703d96de Author: Takuya UESHIN Date: 2016-10-18T02:42:14Z Bump commons-lang3 version to 3.5. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #15547: [SPARK-18002][SQL] Pruning unnecessary IsNotNull ...
GitHub user viirya opened a pull request: https://github.com/apache/spark/pull/15547 [SPARK-18002][SQL] Pruning unnecessary IsNotNull predicates from Filter ## What changes were proposed in this pull request? In `PruneFilters` rule, we can prune unnecessary `IsNotNull` predicates if the predicate in `IsNotNull` is not nullable. ## How was this patch tested? Jenkins tests. Please review https://cwiki.apache.org/confluence/display/SPARK/Contributing+to+Spark before opening a pull request. You can merge this pull request into a Git repository by running: $ git pull https://github.com/viirya/spark-1 prune-isnotnull Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/15547.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #15547 commit a264ebac6924ae7184abe6fc75fa0dfb659b7d54 Author: Liang-Chi Hsieh Date: 2016-10-19T04:11:54Z Pruning unnecessary IsNotNull predicates from Filter. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org