[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...
Github user zhzhan closed the pull request at: https://github.com/apache/spark/pull/15541 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20480: [Spark-23306] Fix the oom caused by contention
GitHub user zhzhan opened a pull request: https://github.com/apache/spark/pull/20480 [Spark-23306] Fix the OOM caused by contention ## What changes were proposed in this pull request? There is a race condition in TaskMemoryManager, which may cause OOM. Because there is a gap between releaseMemory and acquireMemory (e.g., in UnifiedMemoryManager), the memory released may be taken by another task, causing OOM if the current task is the only one that can perform the spill. This can happen to BytesToBytesMap, as it only spills the required bytes. The fix loops on the current consumer while it still has memory to release. ## How was this patch tested? The race contention is hard to reproduce, but the current logic appears to cause the issue. Please review http://spark.apache.org/contributing.html before opening a pull request. You can merge this pull request into a Git repository by running: $ git pull https://github.com/zhzhan/spark oom Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/20480.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #20480 commit df96f0c126833b0e812cd715ae1538dbd38afac4 Author: Zhan Zhang <zhanzhang@...> Date: 2018-01-12T19:51:19Z fix the oom caused by contention
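The "loop on the current consumer" idea above can be sketched as follows. This is a hypothetical single-threaded model, not Spark's actual TaskMemoryManager API (FakePool, FakeConsumer, and acquireWithSelfSpill are all illustrative names): spilling the current consumer's own memory and immediately re-acquiring avoids the window in which another task could take the bytes freed between releaseMemory and acquireMemory.

```java
// Hypothetical sketch, not Spark's real TaskMemoryManager.
class FakePool {
    long free;
    FakePool(long free) { this.free = free; }
    long acquire(long n) { long g = Math.min(n, free); free -= g; return g; }
    void release(long n) { free += n; }
}

class FakeConsumer {
    final FakePool pool;
    long used;
    FakeConsumer(FakePool pool, long used) { this.pool = pool; this.used = used; }
    // Spilling frees this consumer's own pages back to the pool.
    long spill(long required) {
        long freed = Math.min(required, used);
        used -= freed;
        pool.release(freed);
        return freed;
    }
}

class SelfSpill {
    // Keep spilling the current consumer while it still has memory to
    // release, retrying the acquire each time, instead of giving up after
    // a single release/acquire round that another task could win.
    static long acquire(FakeConsumer c, long required) {
        long got = c.pool.acquire(required);
        while (got < required && c.used > 0) {
            c.spill(required - got);
            got += c.pool.acquire(required - got);
        }
        return got;
    }
}
```

In the real multi-threaded setting the loop matters because each iteration frees only the consumer's own bytes, so retrying bounds the damage a competing task can do between the spill and the acquire.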
[GitHub] spark issue #17180: [SPARK-19839][Core]release longArray in BytesToBytesMap
Github user zhzhan commented on the issue: https://github.com/apache/spark/pull/17180 retest this please. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #17180: [SPARK-19839][Core]release longArray in BytesToBytesMap
Github user zhzhan commented on the issue: https://github.com/apache/spark/pull/17180 Will fix the unit test.
[GitHub] spark pull request #18694: [SPARK-21492][SQL] Memory leak in SortMergeJoin
Github user zhzhan closed the pull request at: https://github.com/apache/spark/pull/18694
[GitHub] spark issue #18694: [SPARK-21492][SQL] Memory leak in SortMergeJoin
Github user zhzhan commented on the issue: https://github.com/apache/spark/pull/18694 Closing the PR; will work on adding a close() interface for the iterator used in Spark SQL to remove the extra overhead.
[GitHub] spark issue #17180: [SPARK-19839][Core]release longArray in BytesToBytesMap
Github user zhzhan commented on the issue: https://github.com/apache/spark/pull/17180 The test failure is caused by calling a method on the map after `destructiveIterator()` has been called, which is illegal by definition. https://github.com/apache/spark/blob/master/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java#L417 We should remove this test case, as it does not follow that restriction. Please let me know your feedback.
[GitHub] spark issue #17180: [SPARK-19839][Core]release longArray in BytesToBytesMap
Github user zhzhan commented on the issue: https://github.com/apache/spark/pull/17180 Per review comments, release the longArray on destructive iterator creation.
[GitHub] spark issue #18694: [SPARK-21492][SQL] Memory leak in SortMergeJoin
Github user zhzhan commented on the issue: https://github.com/apache/spark/pull/18694 Currently the patch helps scenarios such as Join(A, Join(B, C)). It is critical for us because we have some internal development in which each stage may consist of tens of sort operators. We found that each operator takes memory without releasing its current page, causing a lot of spills. Such a memory leak becomes critical (ShuffledHashJoin has similar issues, and we did not hit issues caused by Limit). To me, the leak itself is a bug. If it is agreed that we should fix this type of leak, we can find a more elegant way, such as a new close() interface, to avoid the overhead.
[GitHub] spark issue #18694: [SPARK-21492][SQL] Memory leak in SortMergeJoin
Github user zhzhan commented on the issue: https://github.com/apache/spark/pull/18694 If it is assumed that the pipeline is as simple as one stage with only one operator that needs to spill, you are right. But if the pipeline is more complex, for example with multiple operators that need to spill, this leak can cause serious issues. A more elegant way is to expose a new interface, e.g., close() for RowIterator. If agreed, we can implement that and solve the issue without the overhead.
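One possible shape for the close() interface proposed in this thread is sketched below. This is an assumption for illustration, not an existing Spark API (CloseableRowIterator and BufferedRows are made-up names): close() releases held pages immediately, so a partially consumed iterator does not need to be drained row by row.

```java
// Hypothetical interface; not an actual Spark class.
interface CloseableRowIterator {
    boolean advanceNext();
    void close(); // release held memory without exhausting the rows
}

class BufferedRows implements CloseableRowIterator {
    private final int n;
    private int i = 0;
    boolean pagesReleased = false;
    BufferedRows(int n) { this.n = n; }
    public boolean advanceNext() {
        if (i < n) { i++; return true; }
        return false;
    }
    // Free resources directly instead of advancing through the
    // remaining (n - i) rows, avoiding the drain overhead.
    public void close() { pagesReleased = true; }
}
```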
[GitHub] spark issue #18694: [SPARK-21492][SQL] Memory leak in SortMergeJoin
Github user zhzhan commented on the issue: https://github.com/apache/spark/pull/18694 The cleanup hook is used after the task is done. The diff solves the leak for SortMergeJoin only and does not apply to the limit case. Limit is another special case and needs to be taken care of separately.
[GitHub] spark pull request #18694: [SPARK-21492][SQL] Memory leak in SortMergeJoin
Github user zhzhan commented on a diff in the pull request: https://github.com/apache/spark/pull/18694#discussion_r128683903 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala --- @@ -649,6 +660,11 @@ private[joins] class SortMergeJoinScanner( // Initialization (note: do _not_ want to advance streamed here). advancedBufferedToRowWithNullFreeJoinKey() + def destruct(): Unit = { + while (streamedIter.advanceNext()) {} + while (bufferedIter.advanceNext()) {} --- End diff -- Details are explained below.
[GitHub] spark issue #18694: [SPARK-21492][SQL] Memory leak in SortMergeJoin
Github user zhzhan commented on the issue: https://github.com/apache/spark/pull/18694 The memory leak happens in the following scenario: in an inner join, when the left side is exhausted, we stop advancing the right side. Because the right side has not reached the end, the memory it holds is not released and cannot be used by any other operator, for example UnsafeShuffleWriter, causing more spills. Will locate the code logic in UnsafeExternalSorter that prevents the memory from being released.
[GitHub] spark pull request #18694: [SPARK-21492][SQL] Memory leak in SortMergeJoin
Github user zhzhan commented on a diff in the pull request: https://github.com/apache/spark/pull/18694#discussion_r128679491 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala --- @@ -649,6 +660,11 @@ private[joins] class SortMergeJoinScanner( // Initialization (note: do _not_ want to advance streamed here). advancedBufferedToRowWithNullFreeJoinKey() + def destruct(): Unit = { + while (streamedIter.advanceNext()) {} + while (bufferedIter.advanceNext()) {} --- End diff -- It does introduce extra overhead. The other way is to introduce a new interface for RowIterator to destruct itself. But a memory leak is worse than extra overhead, because it causes more spills.
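The destruct() workaround in this diff can be modeled with a toy iterator. This is a sketch, loosely inspired by the discussion about UnsafeExternalSorter holding its last page; PagedIterator and its pageHeld flag are illustrative, not Spark's API. The iterator holds a memory page until fully consumed, so draining it to exhaustion is what finally releases the page.

```java
// Toy model: the page is only released when the last row is consumed.
class PagedIterator {
    private final int n;
    private int i = 0;
    boolean pageHeld;
    PagedIterator(int n) { this.n = n; this.pageHeld = n > 0; }
    boolean advanceNext() {
        if (i >= n) return false;
        i++;
        if (i == n) pageHeld = false; // last row consumed: page released
        return true;
    }
}

class Destruct {
    // Drain the remaining rows to force the page release -- this scan of
    // rows nobody needs is the extra overhead discussed in the thread.
    static void destruct(PagedIterator it) { while (it.advanceNext()) {} }
}
```

A close()-style interface would set pageHeld to false directly, which is why the thread treats draining as a stopgap rather than the final design.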
[GitHub] spark pull request #18694: [SPARK-21492][SQL] Memory leak in SortMergeJoin
GitHub user zhzhan opened a pull request: https://github.com/apache/spark/pull/18694 [SPARK-21492][SQL] Memory leak in SortMergeJoin ## What changes were proposed in this pull request? Fix the memory leak in SortMergeJoin. ## How was this patch tested? Relies on existing unit tests. Tested in a production job, and the memory leak is fixed by the diff. You can merge this pull request into a Git repository by running: $ git pull https://github.com/zhzhan/spark leak Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/18694.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #18694 commit b8acae2d35c342a117222a3a0cc111f31bd4b4c4 Author: Zhan Zhang <zhanzh...@fb.com> Date: 2017-07-20T20:25:16Z fix memory leak on SortMergeJoin
[GitHub] spark pull request #17180: [SPARK-19839][Core]release longArray in BytesToBy...
GitHub user zhzhan opened a pull request: https://github.com/apache/spark/pull/17180 [SPARK-19839][Core]release longArray in BytesToBytesMap ## What changes were proposed in this pull request? When BytesToBytesMap spills, its longArray should be released. Otherwise, it may not be released until the task completes. This array may take a significant amount of memory, which cannot be used by later operators, such as UnsafeShuffleExternalSorter, resulting in more frequent spills in the sorter. This patch releases the array, as the destructive iterator will not use it anymore. ## How was this patch tested? Manual test in production. You can merge this pull request into a Git repository by running: $ git pull https://github.com/zhzhan/spark memory Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/17180.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #17180 commit 562625621f701c77e7755a82c9d9551688f97684 Author: Zhan Zhang <zhanzh...@fb.com> Date: 2017-03-06T20:02:01Z release longArray in BytesToBytesMap
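The eager-release idea in this PR can be shown with a minimal sketch. This is not the real BytesToBytesMap (TinyMap is a made-up stand-in): once the destructive iterator is created, the lookup array is no longer needed for point lookups, so releasing it immediately frees that memory for later operators instead of holding it until the task ends.

```java
import java.util.Arrays;
import java.util.Iterator;

// Toy stand-in for a map whose lookup array becomes dead weight once
// a destructive iterator takes over the data.
class TinyMap {
    private long[] longArray; // lookup structure, released on destruction
    TinyMap(long[] entries) { this.longArray = entries.clone(); }
    boolean arrayReleased() { return longArray == null; }

    Iterator<Long> destructiveIterator() {
        long[] data = longArray;
        longArray = null; // release eagerly; the iterator owns the data now
        return Arrays.stream(data).boxed().iterator();
    }
}
```

This also illustrates why calling map methods after `destructiveIterator()` is illegal, as discussed earlier in this thread: the backing array is gone.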
[GitHub] spark issue #17155: [SPARK-19815][SQL] Not orderable should be applied to ri...
Github user zhzhan commented on the issue: https://github.com/apache/spark/pull/17155 @gatorsmile Thanks for reviewing this. I thought through the logic again. On the surface, the logic may be correct, since in the join the left and right keys should be the same type. Will close the PR.
[GitHub] spark pull request #17155: [SPARK-19815][SQL] Not orderable should be applie...
Github user zhzhan closed the pull request at: https://github.com/apache/spark/pull/17155
[GitHub] spark pull request #17155: [SPARK-19815][SQL] Not orderable should be applie...
GitHub user zhzhan opened a pull request: https://github.com/apache/spark/pull/17155 [SPARK-19815][SQL] Not orderable should be applied to right key instead of left key ## What changes were proposed in this pull request? Change the orderable condition. ## How was this patch tested? Relies on existing tests. Please review http://spark.apache.org/contributing.html before opening a pull request. You can merge this pull request into a Git repository by running: $ git pull https://github.com/zhzhan/spark hashjoin Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/17155.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #17155 commit 91a61658923418e648bd8960feb6b7e09ef6f915 Author: Zhan Zhang <zhanzh...@fb.com> Date: 2017-03-03T23:35:03Z the rightkey should not be orderable instead of left key
[GitHub] spark issue #16909: [SPARK-13450] Introduce ExternalAppendOnlyUnsafeRowArray...
Github user zhzhan commented on the issue: https://github.com/apache/spark/pull/16909 @hvanhovell @davies Correct me if I am wrong. My understanding is that the following code will go through all matching rows on the right side and put them into the BufferedRowIterator. If there is an OOM caused by the ArrayList matches in SortMergeJoinExec, the memory usage will be doubled in currentRows in BufferedRowIterator (assuming the left and right have the same size). There are two ways to solve it. One is to consume one row at a time, and the other is to make BufferedRowIterator spillable (which should be much easier based on this PR). https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala#L556
[GitHub] spark issue #16909: [SPARK-13450] Introduce ExternalAppendOnlyUnsafeRowArray...
Github user zhzhan commented on the issue: https://github.com/apache/spark/pull/16909 @tejasapatil Do you want to fix the BufferedRowIterator for WholeStageCodegenExec as well? For inner join, the LinkedList currentRows would cause the same issue, as it buffers the rows from the inner join and takes more memory (probably double if left and right have similar sizes). Also, they could share a similar iterator data structure.
[GitHub] spark pull request #16068: [SPARK-18637][SQL]Stateful UDF should be consider...
Github user zhzhan commented on a diff in the pull request: https://github.com/apache/spark/pull/16068#discussion_r91570259 --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala --- @@ -487,6 +489,26 @@ class HiveUDFSuite extends QueryTest with TestHiveSingleton with SQLTestUtils { assert(count4 == 1) sql("DROP TABLE parquet_tmp") } + + test("Hive Stateful UDF") { +withUserDefinedFunction("statefulUDF" -> true, "statelessUDF" -> true) { + sql(s"CREATE TEMPORARY FUNCTION statefulUDF AS '${classOf[StatefulUDF].getName}'") + sql(s"CREATE TEMPORARY FUNCTION statelessUDF AS '${classOf[StatelessUDF].getName}'") + val testData = spark.range(10).repartition(1) + + // Expected Max(s) is 10 as statefulUDF returns the sequence number starting from 1. + checkAnswer(testData.selectExpr("statefulUDF() as s").agg(max($"s")), Row(10)) + + // Expected Max(s) is 5 as statefulUDF returns the sequence number starting from 1, + // and the data is evenly distributed into 2 partitions. --- End diff --

case logical.Repartition(numPartitions, shuffle, child) =>
  if (shuffle) {
    ShuffleExchange(RoundRobinPartitioning(numPartitions), planLater(child)) :: Nil
  } else {
    execution.CoalesceExec(numPartitions, planLater(child)) :: Nil
  }
case logical.RepartitionByExpression(expressions, child, nPartitions) =>
  exchange.ShuffleExchange(HashPartitioning(
    expressions, nPartitions.getOrElse(numPartitions)), planLater(child)) :: Nil
[GitHub] spark pull request #16068: [SPARK-18637][SQL]Stateful UDF should be consider...
Github user zhzhan commented on a diff in the pull request: https://github.com/apache/spark/pull/16068#discussion_r91569919 --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala --- @@ -487,6 +489,26 @@ class HiveUDFSuite extends QueryTest with TestHiveSingleton with SQLTestUtils { assert(count4 == 1) sql("DROP TABLE parquet_tmp") } + + test("Hive Stateful UDF") { +withUserDefinedFunction("statefulUDF" -> true, "statelessUDF" -> true) { + sql(s"CREATE TEMPORARY FUNCTION statefulUDF AS '${classOf[StatefulUDF].getName}'") + sql(s"CREATE TEMPORARY FUNCTION statelessUDF AS '${classOf[StatelessUDF].getName}'") + val testData = spark.range(10).repartition(1) + + // Expected Max(s) is 10 as statefulUDF returns the sequence number starting from 1. + checkAnswer(testData.selectExpr("statefulUDF() as s").agg(max($"s")), Row(10)) + + // Expected Max(s) is 5 as statefulUDF returns the sequence number starting from 1, + // and the data is evenly distributed into 2 partitions. --- End diff -- My understanding is that repartition uses RoundRobinPartitioning, while repartition(2, $"id" < 5) uses the hash implementation. In that case, I found all rows are shuffled to one partition.
[GitHub] spark pull request #16068: [SPARK-18637][SQL]Stateful UDF should be consider...
Github user zhzhan commented on a diff in the pull request: https://github.com/apache/spark/pull/16068#discussion_r91142141 --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala --- @@ -487,6 +488,52 @@ class HiveUDFSuite extends QueryTest with TestHiveSingleton with SQLTestUtils { assert(count4 == 1) sql("DROP TABLE parquet_tmp") } + + test("Hive Stateful UDF") { +withUserDefinedFunction("statefulUDF" -> true, "statelessUDF" -> true) { + sql(s"CREATE TEMPORARY FUNCTION statefulUDF AS '${classOf[StatefulUDF].getName}'") + sql(s"CREATE TEMPORARY FUNCTION statelessUDF AS '${classOf[StatelessUDF].getName}'") + withTempView("inputTable") { --- End diff -- The test cannot resolve the function and throws the error if I use: test("Hive Stateful UDF") { withUserDefinedFunction("statefulUDF" -> true, "statelessUDF" -> true) { sql(s"CREATE TEMPORARY FUNCTION statefulUDF AS '${classOf[StatefulUDF].getName}'") sql(s"CREATE TEMPORARY FUNCTION statelessUDF AS '${classOf[StatelessUDF].getName}'") val testData = spark.range(10).repartition(1) println(s"start session: $spark") val m = testData.select("statefulUDF() as s, ") checkAnswer(testData.select("statefulUDF() as s").agg(max($"s")), Row(10)) ... Am I missing anything, or is it a bug? I will investigate why this happens. 
Failed to analyze query: org.apache.spark.sql.AnalysisException: cannot resolve '`statefulUDF() as s`' given input columns: [id];; 'Project ['statefulUDF() as s] +- Repartition 1, true +- Range (0, 10, step=1, splits=Some(1)) org.apache.spark.sql.AnalysisException: cannot resolve '`statefulUDF() as s`' given input columns: [id];; 'Project ['statefulUDF() as s] +- Repartition 1, true +- Range (0, 10, step=1, splits=Some(1)) at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42) at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:77) at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:74) at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:314) at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:314) at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:74) at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:313) at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionUp$1(QueryPlan.scala:282) at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$2(QueryPlan.scala:292) at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$2$1.apply(QueryPlan.scala:296) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at scala.collection.AbstractTraversable.map(Traversable.scala:104) at 
org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$2(QueryPlan.scala:296) at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$7.apply(QueryPlan.scala:301) at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:192) at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsUp(QueryPlan.scala:301) at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:74) at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:67) at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:132) at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:67) at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:57) at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:48) at org.apache.spark.sql.Dataset$.ofRows(Da
[GitHub] spark pull request #16068: [SPARK-18637][SQL]Stateful UDF should be consider...
Github user zhzhan commented on a diff in the pull request: https://github.com/apache/spark/pull/16068#discussion_r91026585 --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala --- @@ -487,6 +488,52 @@ class HiveUDFSuite extends QueryTest with TestHiveSingleton with SQLTestUtils { assert(count4 == 1) sql("DROP TABLE parquet_tmp") } + + test("Hive Stateful UDF") { +withUserDefinedFunction("statefulUDF" -> true, "statelessUDF" -> true) { + sql(s"CREATE TEMPORARY FUNCTION statefulUDF AS '${classOf[StatefulUDF].getName}'") + sql(s"CREATE TEMPORARY FUNCTION statelessUDF AS '${classOf[StatelessUDF].getName}'") + withTempView("inputTable") { +val testData = spark.sparkContext.parallelize( + (0 until 10) map (x => IntegerCaseClass(1)), 2).toDF() +testData.createOrReplaceTempView("inputTable") +// Distribute all rows to one partition (all rows have the same content), --- End diff -- @cloud-fan Thanks for the review. Because all rows contain only IntegerCaseClass(1), RepartitionByExpression will assign all rows to one partition, which has 10 records.
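The point about RepartitionByExpression can be seen with a toy hash partitioner (an illustrative sketch, not Spark's actual HashPartitioning code): hash partitioning routes equal keys to the same partition, so ten rows that all carry the key 1 land in a single partition with 10 records.

```java
import java.util.HashSet;
import java.util.Set;

// Toy hash partitioner: equal keys always map to the same partition.
class ToyHashPartitioner {
    static int partition(Object key, int numPartitions) {
        int mod = key.hashCode() % numPartitions;
        return mod < 0 ? mod + numPartitions : mod; // keep result non-negative
    }
}
```

Round-robin partitioning (plain repartition(n)) ignores the key entirely, which is why the other test in this thread sees the rows split evenly across partitions instead.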
[GitHub] spark pull request #16068: [SPARK-18637][SQL]Stateful UDF should be consider...
Github user zhzhan commented on a diff in the pull request: https://github.com/apache/spark/pull/16068#discussion_r91026433 --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala --- @@ -487,6 +488,52 @@ class HiveUDFSuite extends QueryTest with TestHiveSingleton with SQLTestUtils { assert(count4 == 1) sql("DROP TABLE parquet_tmp") } + + test("Hive Stateful UDF") { +withUserDefinedFunction("statefulUDF" -> true, "statelessUDF" -> true) { + sql(s"CREATE TEMPORARY FUNCTION statefulUDF AS '${classOf[StatefulUDF].getName}'") + sql(s"CREATE TEMPORARY FUNCTION statelessUDF AS '${classOf[StatelessUDF].getName}'") + withTempView("inputTable") { +val testData = spark.sparkContext.parallelize( + (0 until 10) map (x => IntegerCaseClass(1)), 2).toDF() +testData.createOrReplaceTempView("inputTable") +// Distribute all rows to one partition (all rows have the same content), +// and expected Max(s) is 10 as statefulUDF returns the sequence number starting from 1. +checkAnswer( + sql( +""" +|SELECT MAX(s) FROM +| (SELECT statefulUDF() as s FROM +|(SELECT i from inputTable DISTRIBUTE by i) a +|) b + """.stripMargin), + Row(10)) + +// Expected Max(s) is 5, as there are 2 partitions with 5 rows each, and statefulUDF +// returns the sequence number of the rows in the partition starting from 1. +checkAnswer( + sql( +""" + |SELECT MAX(s) FROM + | (SELECT statefulUDF() as s FROM + |(SELECT i from inputTable) a + |) b +""".stripMargin), + Row(5)) + +// Expected Max(s) is 1, as stateless UDF is deterministic and replaced by constant 1. 
--- End diff -- StatelessUDF is foldable: override def foldable: Boolean = isUDFDeterministic && children.forall(_.foldable). The ConstantFolding optimizer will replace it with a constant: case e if e.foldable => Literal.create(e.eval(EmptyRow), e.dataType). Here is the explain(true) output:

== Parsed Logical Plan ==
'Project [unresolvedalias('MAX('s), None)]
+- 'SubqueryAlias b
   +- 'Project ['statelessUDF() AS s#39]
      +- 'SubqueryAlias a
         +- 'RepartitionByExpression ['i]
            +- 'Project ['i]
               +- 'UnresolvedRelation `inputTable`

== Analyzed Logical Plan ==
max(s): bigint
Aggregate [max(s#39L) AS max(s)#46L]
+- SubqueryAlias b
   +- Project [HiveSimpleUDF#org.apache.spark.sql.hive.execution.StatelessUDF() AS s#39L]
      +- SubqueryAlias a
         +- RepartitionByExpression [i#4]
            +- Project [i#4]
               +- SubqueryAlias inputtable
                  +- SerializeFromObject [assertnotnull(assertnotnull(input[0, org.apache.spark.sql.hive.execution.IntegerCaseClass, true], top level Product input object), - root class: "org.apache.spark.sql.hive.execution.IntegerCaseClass").i AS i#4]
                     +- ExternalRDD [obj#3]

== Optimized Logical Plan ==
Aggregate [max(s#39L) AS max(s)#46L]
+- Project [1 AS s#39L]
   +- RepartitionByExpression [i#4]
      +- SerializeFromObject [assertnotnull(assertnotnull(input[0, org.apache.spark.sql.hive.execution.IntegerCaseClass, true], top level Product input object), - root class: "org.apache.spark.sql.hive.execution.IntegerCaseClass").i AS i#4]
         +- ExternalRDD [obj#3]

== Physical Plan ==
*HashAggregate(keys=[], functions=[max(s#39L)], output=[max(s)#46L])
+- Exchange SinglePartition
   +- *HashAggregate(keys=[], functions=[partial_max(s#39L)], output=[max#48L])
      +- *Project [1 AS s#39L]
         +- Exchange hashpartitioning(i#4, 5)
            +- *SerializeFromObject [assertnotnull(assertnotnull(input[0, org.apache.spark.sql.hive.execution.IntegerCaseClass, true], top level Product input object), - root class: "org.apache.spark.sql.hive.execution.IntegerCaseClass").i AS i#4]
               +- Scan ExternalRDDScan[obj#3]
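The ConstantFolding rule quoted above can be sketched in a few lines outside of Spark: if an expression reports itself foldable, the optimizer evaluates it once at planning time and substitutes a literal. The tiny expression classes below are hypothetical stand-ins for illustration, not Spark's Catalyst API.

```python
class Expr:
    foldable = False
    def eval(self):
        raise NotImplementedError

class Literal(Expr):
    foldable = True
    def __init__(self, value):
        self.value = value
    def eval(self):
        return self.value

class StatelessUDFCall(Expr):
    # Deterministic zero-argument call: safe to fold at planning time.
    foldable = True
    def eval(self):
        return 1

def constant_fold(e):
    """Mimics the rule: case e if e.foldable => Literal.create(e.eval(...), ...)."""
    return Literal(e.eval()) if e.foldable and not isinstance(e, Literal) else e

folded = constant_fold(StatelessUDFCall())
assert isinstance(folded, Literal) and folded.value == 1
```

This is exactly why the optimized plan shows `Project [1 AS s#39L]`: the deterministic, foldable UDF call was replaced by the constant 1 before execution.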
[GitHub] spark pull request #16068: [SPARK-18637][SQL]Stateful UDF should be consider...
Github user zhzhan commented on a diff in the pull request: https://github.com/apache/spark/pull/16068#discussion_r90763121 --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala --- @@ -487,6 +488,29 @@ class HiveUDFSuite extends QueryTest with TestHiveSingleton with SQLTestUtils { assert(count4 == 1) sql("DROP TABLE parquet_tmp") } + + test("Hive Stateful UDF") { +sql(s"CREATE TEMPORARY FUNCTION statefulUDF AS '${classOf[StatefulUDF].getName}'") +sql(s"CREATE TEMPORARY FUNCTION statelessUDF AS '${classOf[StatelessUDF].getName}'") +val testData = spark.sparkContext.parallelize( + (0 until 10) map(x => IntegerCaseClass(1)), 2).toDF() +testData.createOrReplaceTempView("inputTable") +val max1 = + sql("SELECT MAX(s) FROM (" + +"SELECT statefulUDF() as s FROM (SELECT i from inputTable DISTRIBUTE by i) a" + +") b").head().getLong(0) --- End diff -- Will rewrite it after gathering feedback from others.
[GitHub] spark issue #16068: [SPARK-18637][SQL]Stateful UDF should be considered as n...
Github user zhzhan commented on the issue: https://github.com/apache/spark/pull/16068 @gatorsmile we cannot use deterministic = true/false alone, as there are existing UDFs that report deterministic = true but are stateful as well.
[GitHub] spark issue #16068: [SPARK-18637][SQL]Stateful UDF should be considered as n...
Github user zhzhan commented on the issue: https://github.com/apache/spark/pull/16068 My understanding is that a non-deterministic UDF does not need to be stateful, but a stateful UDF has to be non-deterministic. Here is the comment in Hive regarding this property: /** If a UDF stores state based on the sequence of records it has processed, it is stateful. A stateful UDF cannot be used in certain expressions such as case statements, and certain optimizations such as AND/OR short-circuiting don't apply for such UDFs, as they need to be invoked for each record. row_sequence is an example of a stateful UDF. A stateful UDF is considered to be non-deterministic, irrespective of what deterministic() returns. * @return true */
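A stateful UDF in the sense of that Hive comment can be mimicked in a few lines: the same input yields a different output on each call, because the output depends on how many records were processed before. That is exactly why it must be treated as non-deterministic regardless of its declared determinism. This row_sequence-style counter is an illustrative stand-in, not Hive's implementation.

```python
class RowSequence:
    """Stateful: output depends on the sequence of records processed so far."""
    def __init__(self):
        self.seq = 0

    def evaluate(self, _row=None):
        self.seq += 1
        return self.seq

udf = RowSequence()
# Identical inputs, different outputs -> cannot be constant-folded or reordered.
assert [udf.evaluate("same"), udf.evaluate("same"), udf.evaluate("same")] == [1, 2, 3]
```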
[GitHub] spark issue #16068: [SPARK-18637][SQL]Stateful UDF should be considered as n...
Github user zhzhan commented on the issue: https://github.com/apache/spark/pull/16068 @hvanhovell Would you like to take a look and let me know if you have any concern?
[GitHub] spark issue #16068: [SPARK-18637][SQL]Stateful UDF should be considered as n...
Github user zhzhan commented on the issue: https://github.com/apache/spark/pull/16068 @hvanhovell Thanks for looking at this. We have a large number of UDFs that have this issue. For example, a UDF gives different results with different partitioning/sorting, but the UDF is pushed down before the partition/sort, resulting in unexpected behavior. I will work on finding some test cases for it.
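The hazard described above can be simulated without Spark. In this hedged sketch (partition sizes follow the test data elsewhere in this thread; everything else is hypothetical), a row_sequence-style stateful counter run before the repartition, as an over-eager pushdown would do, sees the original 2 partitions of 5 rows, while running it after the repartition sees one partition of 10 rows, so MAX(s) differs.

```python
def with_sequence(partitions):
    """Apply a row_sequence-style stateful UDF independently within each partition."""
    return [[i + 1 for i, _ in enumerate(p)] for p in partitions]

rows = [1] * 10                  # ten identical rows
before = [rows[:5], rows[5:]]    # 2 partitions of 5, as parallelize(..., 2)
after = [rows]                   # DISTRIBUTE BY i: one key -> one partition

max_if_pushed_down = max(max(p) for p in with_sequence(before))  # UDF ran too early
max_if_correct = max(max(p) for p in with_sequence(after))

assert max_if_pushed_down == 5
assert max_if_correct == 10
```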
[GitHub] spark issue #16068: [SPARK-18637][SQL]Stateful UDF should be considered as n...
Github user zhzhan commented on the issue: https://github.com/apache/spark/pull/16068 retest it please
[GitHub] spark pull request #16068: stateful udf should be nondeterministic
GitHub user zhzhan opened a pull request: https://github.com/apache/spark/pull/16068 stateful udf should be nondeterministic ## What changes were proposed in this pull request? Mark stateful UDFs as nondeterministic. ## How was this patch tested? Mainly relies on existing queries. We also manually checked queries with a stateful UDF in the filter. Without the patch, the UDF is mistakenly pushed down for efficiency. After the patch, the physical plan is generated correctly. You can merge this pull request into a Git repository by running: $ git pull https://github.com/zhzhan/spark state Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/16068.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #16068 commit a4d8b4af648e53c355bee16fe371137d0b349331 Author: Zhan Zhang <zhanzh...@fb.com> Date: 2016-11-29T23:32:45Z stateful udf should be nondeterministic
[GitHub] spark issue #15541: [SPARK-17637][Scheduler]Packed scheduling for Spark task...
Github user zhzhan commented on the issue: https://github.com/apache/spark/pull/15541 @rxin Thanks for the feedback regarding the TaskAssigner API. The current API is designed based on the current logic of TaskSchedulerImpl, where the scheduler takes many rounds to assign the tasks for each task set. I have not figured out a better way yet. Any suggestions are welcome.
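The packed-scheduling idea behind this PR can be sketched without Spark's scheduler. In this hypothetical sketch (worker and core counts are arbitrary assumptions), a packed assigner keeps filling the busiest worker that still has a free core, so tasks concentrate on few executors and the idle ones can be released, whereas the default strategy spreads tasks round-robin.

```python
def packed_assign(tasks, cores):
    """Pack tasks tightly; cores[i] = free cores on worker i."""
    assignment = [[] for _ in cores]
    free = list(cores)
    for t in tasks:
        # Among workers with a free core, pick the one with the fewest
        # remaining cores (i.e. the busiest), so tasks pile up on it.
        candidates = [i for i, c in enumerate(free) if c > 0]
        w = min(candidates, key=lambda i: free[i])
        assignment[w].append(t)
        free[w] -= 1
    return assignment

# 4 tasks, 3 workers with 4 cores each: packing uses a single worker.
packed = packed_assign(range(4), [4, 4, 4])
assert sorted(len(a) for a in packed) == [0, 0, 4]
```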
[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...
Github user zhzhan commented on a diff in the pull request: https://github.com/apache/spark/pull/15541#discussion_r85985739 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala --- @@ -0,0 +1,232 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import scala.collection.mutable.ArrayBuffer +import scala.collection.mutable.PriorityQueue +import scala.util.Random + +import org.apache.spark.{SparkConf, SparkException} +import org.apache.spark.internal.{config, Logging} +import org.apache.spark.util.Utils + +/** Tracks the current state of the workers with available cores and assigned task list. */ +private[scheduler] class OfferState(val workOffer: WorkerOffer) { + /** The current remaining cores that can be allocated to tasks. */ + var coresAvailable: Int = workOffer.cores + /** The list of tasks that are assigned to this WorkerOffer. 
*/ + val tasks = new ArrayBuffer[TaskDescription](coresAvailable) + + def assignTask(task: TaskDescription, cpu: Int): Unit = { +if (coresAvailable < cpu) { + throw new SparkException(s"Available cores are less than cpu per task" + +s" ($coresAvailable < $cpu)") +} +tasks += task +coresAvailable -= cpu + } +} + +/** + * TaskAssigner is the base class for all task assigner implementations, and can be + * extended to implement different task scheduling algorithms. + * Together with [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]], TaskAssigner + * is used to assign tasks to workers with available cores. Internally, when TaskScheduler + * performs task assignment given available workers, it first sorts the candidate tasksets, + * and then for each taskset, it takes multiple rounds to request TaskAssigner for task + * assignment with different locality restrictions until there is either no qualified + * workers or no valid tasks to be assigned. + * + * TaskAssigner is responsible to maintain the worker availability state and task assignment + * information. The contract between [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]] + * and TaskAssigner is as follows. + * + * First, TaskScheduler invokes construct() of TaskAssigner to initialize the its internal + * worker states at the beginning of resource offering. + * + * Second, before each round of task assignment for a taskset, TaskScheduler invokes the init() + * of TaskAssigner to initialize the data structure for the round. + * + * Third, when performing real task assignment, hasNext/next() is used by TaskScheduler + * to check the worker availability and retrieve current offering from TaskAssigner. + * + * Fourth, TaskScheduler calls offerAccepted() to notify the TaskAssigner so that + * TaskAssigner can decide whether the current offer is valid or not for the next request. 
+ * + * Fifth, after task assignment is done, TaskScheduler invokes the function tasks to + * retrieve all the task assignment information. + */ + +private[scheduler] sealed abstract class TaskAssigner { + protected var offer: Seq[OfferState] = _ + protected var cpuPerTask = 1 + + protected def withCpuPerTask(cpuPerTask: Int): TaskAssigner = { +this.cpuPerTask = cpuPerTask +this + } + + /** The currently assigned offers. */ + final def tasks: Seq[ArrayBuffer[TaskDescription]] = offer.map(_.tasks) + + /** + * Invoked at the beginning of resource offering to construct the offer with the workoffers. + * By default, offers is randomly shuffled to avoid always placing tasks on the same set of + * workers. + */ + def construct(workOffer: Seq[WorkerOffer]): Unit = { +offer = Random.shuffle(workOffer.map(o => new OfferState(o))) + } + + /** Invoked at each round of Taskset assignment to initialize th
[GitHub] spark issue #15541: [SPARK-17637][Scheduler]Packed scheduling for Spark task...
Github user zhzhan commented on the issue: https://github.com/apache/spark/pull/15541 @rxin Would you like to take a look and let me know if you have any concern? Thanks.
[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...
Github user zhzhan commented on a diff in the pull request: https://github.com/apache/spark/pull/15541#discussion_r84621076 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala --- @@ -250,24 +251,24 @@ private[spark] class TaskSchedulerImpl( private def resourceOfferSingleTaskSet( taskSet: TaskSetManager, maxLocality: TaskLocality, - shuffledOffers: Seq[WorkerOffer], - availableCpus: Array[Int], - tasks: IndexedSeq[ArrayBuffer[TaskDescription]]) : Boolean = { + taskAssigner: TaskAssigner) : Boolean = { var launchedTask = false -for (i <- 0 until shuffledOffers.size) { - val execId = shuffledOffers(i).executorId - val host = shuffledOffers(i).host - if (availableCpus(i) >= CPUS_PER_TASK) { +taskAssigner.init() +while (taskAssigner.hasNext) { + var isAccepted = false + val currentOffer = taskAssigner.next() + val execId = currentOffer.workOffer.executorId + val host = currentOffer.workOffer.host + if (currentOffer.coresAvailable >= CPUS_PER_TASK) { try { for (task <- taskSet.resourceOffer(execId, host, maxLocality)) { -tasks(i) += task +currentOffer.assignTask(task, CPUS_PER_TASK) val tid = task.taskId taskIdToTaskSetManager(tid) = taskSet taskIdToExecutorId(tid) = execId executorIdToTaskCount(execId) += 1 -availableCpus(i) -= CPUS_PER_TASK -assert(availableCpus(i) >= 0) --- End diff -- @viirya The assert will not fail even in the legacy code, because taskSet.resourceOffer(execId, host, maxLocality) returns an Option. The for loop runs at most once.
[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...
Github user zhzhan commented on a diff in the pull request: https://github.com/apache/spark/pull/15541#discussion_r84619879 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala --- @@ -250,24 +251,24 @@ private[spark] class TaskSchedulerImpl( private def resourceOfferSingleTaskSet( taskSet: TaskSetManager, maxLocality: TaskLocality, - shuffledOffers: Seq[WorkerOffer], - availableCpus: Array[Int], - tasks: IndexedSeq[ArrayBuffer[TaskDescription]]) : Boolean = { + taskAssigner: TaskAssigner) : Boolean = { var launchedTask = false -for (i <- 0 until shuffledOffers.size) { - val execId = shuffledOffers(i).executorId - val host = shuffledOffers(i).host - if (availableCpus(i) >= CPUS_PER_TASK) { +taskAssigner.init() +while (taskAssigner.hasNext) { + var isAccepted = false + val currentOffer = taskAssigner.next() + val execId = currentOffer.workOffer.executorId + val host = currentOffer.workOffer.host + if (currentOffer.coresAvailable >= CPUS_PER_TASK) { try { for (task <- taskSet.resourceOffer(execId, host, maxLocality)) { -tasks(i) += task +currentOffer.assignTask(task, CPUS_PER_TASK) val tid = task.taskId taskIdToTaskSetManager(tid) = taskSet taskIdToExecutorId(tid) = execId executorIdToTaskCount(execId) += 1 -availableCpus(i) -= CPUS_PER_TASK -assert(availableCpus(i) >= 0) --- End diff -- Thanks @viirya for the comments. Actually I was thinking of removing the check although it is part of the legacy code. Now the check is moved into OfferState, which makes more sense. IMHO, the assertion should typically never fail, but from OfferState's perspective, it should guarantee such a restriction.
[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...
Github user zhzhan commented on a diff in the pull request: https://github.com/apache/spark/pull/15541#discussion_r84619023 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala --- @@ -0,0 +1,229 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import scala.collection.mutable.ArrayBuffer +import scala.collection.mutable.PriorityQueue +import scala.util.Random + +import org.apache.spark.internal.{config, Logging} +import org.apache.spark.SparkConf +import org.apache.spark.util.Utils + +/** Tracks the current state of the workers with available cores and assigned task list. */ +private[scheduler] class OfferState(val workOffer: WorkerOffer) { + /** The current remaining cores that can be allocated to tasks. */ + var coresAvailable: Int = workOffer.cores + /** The list of tasks that are assigned to this WorkerOffer. 
*/ + val tasks = new ArrayBuffer[TaskDescription](coresAvailable) + + def assignTask(task: TaskDescription, cpu: Int): Unit = { +tasks += task +coresAvailable -= cpu +assert(coresAvailable >= 0) + } +} + +/** + * TaskAssigner is the base class for all task assigner implementations, and can be + * extended to implement different task scheduling algorithms. + * Together with [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]], TaskAssigner + * is used to assign tasks to workers with available cores. Internally, when TaskScheduler + * performs task assignment given available workers, it first sorts the candidate tasksets, + * and then for each taskset, it takes multiple rounds to request TaskAssigner for task + * assignment with different locality restrictions until there is either no qualified + * workers or no valid tasks to be assigned. + * + * TaskAssigner is responsible to maintain the worker availability state and task assignment + * information. The contract between [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]] + * and TaskAssigner is as follows. + * + * First, TaskScheduler invokes construct() of TaskAssigner to initialize the its internal + * worker states at the beginning of resource offering. + * + * Second, before each round of task assignment for a taskset, TaskScheduler invokes the init() + * of TaskAssigner to initialize the data structure for the round. + * + * Third, when performing real task assignment, hasNext/next() is used by TaskScheduler + * to check the worker availability and retrieve current offering from TaskAssigner. + * + * Fourth, TaskScheduler calls offerAccepted() to notify the TaskAssigner so that + * TaskAssigner can decide whether the current offer is valid or not for the next request. + * + * Fifth, after task assignment is done, TaskScheduler invokes the function tasks to + * retrieve all the task assignment information. 
+ */ + +private[scheduler] sealed abstract class TaskAssigner { + protected var offer: Seq[OfferState] = _ + protected var cpuPerTask = 1 + + protected def withCpuPerTask(cpuPerTask: Int): TaskAssigner = { +this.cpuPerTask = cpuPerTask --- End diff -- You mean cpuPerTask >= 1? I don't think we need this check.
[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...
Github user zhzhan commented on a diff in the pull request: https://github.com/apache/spark/pull/15541#discussion_r84424034 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala --- @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import scala.collection.mutable.ArrayBuffer +import scala.collection.mutable.PriorityQueue +import scala.util.Random + +import org.apache.spark.internal.{config, Logging} +import org.apache.spark.SparkConf +import org.apache.spark.util.Utils + +/** Tracks the current state of the workers with available cores and assigned task list. */ +class OfferState(val workOffer: WorkerOffer) { + /** The current remaining cores that can be allocated to tasks. */ + var coresAvailable: Int = workOffer.cores + /** The list of tasks that are assigned to this WorkerOffer. */ + val tasks = new ArrayBuffer[TaskDescription](coresAvailable) +} + +/** + * TaskAssigner is the base class for all task assigner implementations, and can be + * extended to implement different task scheduling algorithms. 
+ * Together with [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]], TaskAssigner + * is used to assign tasks to workers with available cores. Internally, when TaskScheduler + * perform task assignment given available workers, it first sorts the candidate tasksets, + * and then for each taskset, it takes a number of rounds to request TaskAssigner for task + * assignment with different locality restrictions until there is either no qualified + * workers or no valid tasks to be assigned. + * + * TaskAssigner is responsible to maintain the worker availability state and task assignment + * information. The contract between [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]] + * and TaskAssigner is as follows. + * + * First, TaskScheduler invokes construct() of TaskAssigner to initialize the its internal + * worker states at the beginning of resource offering. + * + * Second, before each round of task assignment for a taskset, TaskScheduler invoke the init() + * of TaskAssigner to initialize the data structure for the round. + * + * Third, when performing real task assignment, hasNext()/getNext() is used by TaskScheduler + * to check the worker availability and retrieve current offering from TaskAssigner. + * + * Fourth, then offerAccepted is used by TaskScheduler to notify the TaskAssigner so that + * TaskAssigner can decide whether the current offer is valid or not for the next request. + * + * Fifth, After task assignment is done, TaskScheduler invokes the tasks() to + * retrieve all the task assignment information. + */ + +private[scheduler] abstract class TaskAssigner { + protected var offer: Seq[OfferState] = _ + protected var cpuPerTask = 1 + + protected def withCpuPerTask(cpuPerTask: Int): Unit = { +this.cpuPerTask = cpuPerTask + } + + /** The final assigned offer returned to TaskScheduler. 
*/ + final def tasks: Seq[ArrayBuffer[TaskDescription]] = offer.map(_.tasks) + + /** Invoked at the beginning of resource offering to construct the offer with the workoffers. */ + def construct(workOffer: Seq[WorkerOffer]): Unit = { +offer = Random.shuffle(workOffer.map(o => new OfferState(o))) + } + + /** Invoked at each round of Taskset assignment to initialize the internal structure. */ + def init(): Unit + + /** + * Tests Whether there is offer available to be used inside of one round of Taskset assignment. + * @return `true` if a subsequent call to `next` will yield an element, + * `false` otherwise. + */ + def hasNext: Boolean + + /** + * Produces next worker offer based on the task assignment strategy. + * @return the next available offer, if `hasNext` is `true`, + * undef
[GitHub] spark issue #15541: [SPARK-17637][Scheduler]Packed scheduling for Spark task...
Github user zhzhan commented on the issue: https://github.com/apache/spark/pull/15541 @gatorsmile I didn't see your new comments
[GitHub] spark issue #15541: [SPARK-17637][Scheduler]Packed scheduling for Spark task...
Github user zhzhan commented on the issue: https://github.com/apache/spark/pull/15541 @rxin Can you please take a look, and let me know if you have any concern?
[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...
Github user zhzhan commented on a diff in the pull request: https://github.com/apache/spark/pull/15541#discussion_r84158910 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala --- @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import scala.collection.mutable.ArrayBuffer +import scala.collection.mutable.PriorityQueue +import scala.util.Random + +import org.apache.spark.internal.{config, Logging} +import org.apache.spark.SparkConf +import org.apache.spark.util.Utils + +/** Tracking the current state of the workers with available cores and assigned task list. */ +class OfferState(val workOffer: WorkerOffer) { + // The current remaining cores that can be allocated to tasks. + var coresAvailable: Int = workOffer.cores + // The list of tasks that are assigned to this worker. + val tasks = new ArrayBuffer[TaskDescription](coresAvailable) +} + +/** + * TaskAssigner is the base class for all task assigner implementations, and can be + * extended to implement different task scheduling algorithms. 
+ * Together with [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]], TaskAssigner + * is used to assign tasks to workers with available cores. Internally, TaskScheduler, when + * requested to perform task assignment given the available workers, first sorts the candidate + * tasksets, and then, for each taskset, takes a number of rounds to request task assignment + * from TaskAssigner under different locality restrictions, until there are either no qualified + * workers or no valid tasks left to assign. + * + * TaskAssigner is responsible for maintaining the worker availability state and task assignment + * information. The contract between [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]] + * and TaskAssigner is as follows. First, TaskScheduler invokes construct() of TaskAssigner to + * initialize its internal worker states at the beginning of resource offering. Before each + * round of task assignment for a taskset, TaskScheduler invokes the init() of TaskAssigner to + * initialize the data structures for the round. When performing the actual task assignment, + * hasNext()/getNext() is used by TaskScheduler to check worker availability and retrieve the + * current offer from TaskAssigner. Then offerAccepted is used by TaskScheduler to notify + * TaskAssigner so that it can decide whether the current offer is still valid for + * the next request. After task assignment is done, TaskScheduler invokes tasks() to + * retrieve all the task assignment information and eventually invokes the reset() method so that + * TaskAssigner can clean up its internally maintained resources. + */ + +private[scheduler] abstract class TaskAssigner { + var offer: Seq[OfferState] = _ + var CPUS_PER_TASK = 1 + + def withCpuPerTask(CPUS_PER_TASK: Int): Unit = { + this.CPUS_PER_TASK = CPUS_PER_TASK + } + + // The final assigned offer returned to TaskScheduler.
+ final def tasks: Seq[ArrayBuffer[TaskDescription]] = offer.map(_.tasks) + + // Invoked at the beginning of resource offering to construct the offers from the work offers. + def construct(workOffer: Seq[WorkerOffer]): Unit = { + offer = workOffer.map(o => new OfferState(o)) + } + + // Invoked at each round of taskset assignment to initialize the internal structures. + def init(): Unit + + // Whether there is an offer available to be used within one round of taskset assignment. + def hasNext: Boolean + + // Returns the next assigned offer based on the task assignment strategy. + def getNext(): OfferState + + // Invoked by the TaskScheduler to indicate whether the current offer is accepted so that + // the assigner can decide whether the current worker is valid for the next offering. + def offerAcc
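The contract quoted in the diff above (construct → init → hasNext/getNext → offerAccepted → tasks → reset) can be sketched end to end. This is a self-contained, illustrative sketch only: `WorkerOffer` and `TaskDescription` are simplified stand-ins for the real Spark classes, and the round-robin strategy and its termination rule are assumptions, not the PR's actual implementation.

```scala
import scala.collection.mutable.ArrayBuffer

// Simplified stand-ins for the real Spark scheduler classes.
case class WorkerOffer(executorId: String, host: String, cores: Int)
case class TaskDescription(taskId: Long)

class OfferState(val workOffer: WorkerOffer) {
  var coresAvailable: Int = workOffer.cores      // cores still free on this worker
  val tasks = ArrayBuffer.empty[TaskDescription] // tasks assigned to this worker
}

abstract class TaskAssigner {
  var offer: Seq[OfferState] = Seq.empty
  var cpusPerTask = 1
  final def tasks: Seq[Seq[TaskDescription]] = offer.map(_.tasks.toSeq)
  def construct(workOffers: Seq[WorkerOffer]): Unit =
    offer = workOffers.map(new OfferState(_))
  def init(): Unit                           // called before each assignment round
  def hasNext: Boolean                       // any worker left to try this round?
  def getNext(): OfferState                  // next worker, per strategy
  def offerAccepted(assigned: Boolean): Unit // feedback from the scheduler
  def reset(): Unit = offer = Seq.empty
}

// Round-robin: cycle through workers; stop once a full pass makes no progress.
class RoundRobinAssigner extends TaskAssigner {
  private var idx = 0
  private var rejectedInARow = 0
  def init(): Unit = { idx = 0; rejectedInARow = 0 }
  def hasNext: Boolean = offer.nonEmpty && rejectedInARow < offer.size
  def getNext(): OfferState = {
    val o = offer(idx)
    idx = (idx + 1) % offer.size
    o
  }
  def offerAccepted(assigned: Boolean): Unit =
    rejectedInARow = if (assigned) 0 else rejectedInARow + 1
}

// Drive the contract the way the doc comment says TaskScheduler would.
val assigner = new RoundRobinAssigner
assigner.construct(Seq(WorkerOffer("exec-1", "hostA", 2),
                       WorkerOffer("exec-2", "hostB", 2)))
assigner.init()
var nextTaskId = 0L
while (assigner.hasNext) {
  val state = assigner.getNext()
  if (state.coresAvailable >= assigner.cpusPerTask) {
    state.tasks += TaskDescription(nextTaskId)
    nextTaskId += 1
    state.coresAvailable -= assigner.cpusPerTask
    assigner.offerAccepted(true)
  } else {
    assigner.offerAccepted(false)
  }
}
println(assigner.tasks.map(_.size)) // tasks spread evenly across both workers
```

With two 2-core workers and 1 CPU per task, the loop alternates between workers until all four cores are consumed, then terminates after one full unproductive pass.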
Github user zhzhan commented on a diff in the pull request: https://github.com/apache/spark/pull/15541#discussion_r84129486 --- Diff: core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala --- @@ -109,6 +108,85 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B assert(!failedTaskSet) } + test("Scheduler does not always schedule tasks on the same workers") { + val taskScheduler = setupScheduler() + roundrobin(taskScheduler) + } + + test("User can specify the roundrobin task assigner") { + val taskScheduler = setupScheduler(("spark.scheduler.taskAssigner", "RoUndrObin")) + roundrobin(taskScheduler) + } + + test("Fallback to roundrobin when the task assigner provided is not valid") { + val taskScheduler = setupScheduler("spark.scheduler.taskAssigner" -> "invalid") + roundrobin(taskScheduler) + } + + test("Scheduler balance the assignment to the worker with more free cores") { --- End diff -- Will change the test case name to make it more explicit.
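The tests quoted here imply that the assigner name is matched case-insensitively ("RoUndrObin" works) and that an unrecognized value falls back to round-robin. A minimal sketch of such a lookup, with the caveat that the `assignerFor` helper and the concrete assigner class names are hypothetical stand-ins, not the PR's actual code:

```scala
// Hypothetical sketch of config-driven assigner selection with fallback.
// The class names and lookup logic are illustrative assumptions only.
abstract class TaskAssigner
class RoundRobinAssigner extends TaskAssigner
class PackedAssigner extends TaskAssigner
class BalancedAssigner extends TaskAssigner

object TaskAssigner {
  // Match case-insensitively; unknown names fall back to round-robin,
  // which is what the "invalid" test above expects.
  def assignerFor(name: String): TaskAssigner = name.toLowerCase match {
    case "packed"   => new PackedAssigner
    case "balanced" => new BalancedAssigner
    case _          => new RoundRobinAssigner
  }
}
```

Under this scheme both `assignerFor("RoUndrObin")` and `assignerFor("invalid")` yield a `RoundRobinAssigner`, matching the behavior the two tests assert.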
Github user zhzhan commented on a diff in the pull request: https://github.com/apache/spark/pull/15541#discussion_r84119714 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala --- (quotes the same TaskAssigner.scala diff context as above)
Github user zhzhan commented on a diff in the pull request: https://github.com/apache/spark/pull/15541#discussion_r84002685 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala --- (quotes the same TaskAssigner.scala diff context as above)
Github user zhzhan commented on a diff in the pull request: https://github.com/apache/spark/pull/15541#discussion_r84002480 --- Diff: core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala --- (quotes the same TaskSchedulerImplSuite.scala diff context as above, at the test "Scheduler balance the assignment to the worker with more free cores") --- End diff -- Can you please clarify?
Github user zhzhan commented on a diff in the pull request: https://github.com/apache/spark/pull/15541#discussion_r84002353 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala --- (quotes the same TaskAssigner.scala diff context as above)
Github user zhzhan commented on a diff in the pull request: https://github.com/apache/spark/pull/15541#discussion_r84002236 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala --- (quotes the same TaskAssigner.scala diff context as above)
Github user zhzhan commented on a diff in the pull request: https://github.com/apache/spark/pull/15541#discussion_r83999756 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala --- (quotes the same TaskAssigner.scala diff context as above)
[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...
Github user zhzhan commented on a diff in the pull request: https://github.com/apache/spark/pull/15541#discussion_r83998058 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala --- @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import scala.collection.mutable.ArrayBuffer +import scala.collection.mutable.PriorityQueue +import scala.util.Random + +import org.apache.spark.internal.{config, Logging} +import org.apache.spark.SparkConf +import org.apache.spark.util.Utils + +/** Tracking the current state of the workers with available cores and assigned task list. */ +class OfferState(val workOffer: WorkerOffer) { + // The current remaining cores that can be allocated to tasks. + var coresAvailable: Int = workOffer.cores + // The list of tasks that are assigned to this worker. + val tasks = new ArrayBuffer[TaskDescription](coresAvailable) +} + +/** + * TaskAssigner is the base class for all task assigner implementations, and can be + * extended to implement different task scheduling algorithms. 
+ * Together with [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]], TaskAssigner + * is used to assign tasks to workers with available cores. Internally, TaskScheduler, requested + * to perform task assignment given available workers, first sorts the candidate tasksets, + * and then for each taskset, it takes a number of rounds to request TaskAssigner for task + * assignment with different the locality restrictions until there is either no qualified + * workers or no valid tasks to be assigned. + * + * TaskAssigner is responsible to maintain the worker availability state and task assignment + * information. The contract between [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]] + * and TaskAssigner is as follows. First, TaskScheduler invokes construct() of TaskAssigner to + * initialize the its internal worker states at the beginning of resource offering. Before each + * round of task assignment for a taskset, TaskScheduler invoke the init() of TaskAssigner to + * initialize the data structure for the round. When performing real task assignment, + * hasNext()/getNext() is used by TaskScheduler to check the worker availability and retrieve + * current offering from TaskAssigner. Then offerAccepted is used by TaskScheduler to notify + * the TaskAssigner so that TaskAssigner can decide whether the current offer is valid or not for + * the next request. After task assignment is done, TaskScheduler invokes the tasks() to + * retrieve all the task assignment information, and eventually, invokes reset() method so that + * TaskAssigner can cleanup its internal maintained resources. + */ + +private[scheduler] abstract class TaskAssigner { + var offer: Seq[OfferState] = _ + var CPUS_PER_TASK = 1 + + def withCpuPerTask(CPUS_PER_TASK: Int): Unit = { +this.CPUS_PER_TASK = CPUS_PER_TASK + } + + // The final assigned offer returned to TaskScheduler. 
+ final def tasks: Seq[ArrayBuffer[TaskDescription]] = offer.map(_.tasks) + + // Invoked at the beginning of resource offering to construct the offer with the workoffers. + def construct(workOffer: Seq[WorkerOffer]): Unit = { +offer = workOffer.map(o => new OfferState(o)) + } + + // Invoked at each round of Taskset assignment to initialize the internal structure. + def init(): Unit + + // Whether there is offer available to be used inside of one round of Taskset assignment. + def hasNext: Boolean + + // Returned the next assigned offer based on the task assignment strategy. + def getNext(): OfferState + + // Invoked by the TaskScheduler to indicate whether the current offer is accepted or not so that + // the assigner can decide whether the current worker is valid for the next offering. + def offerAcc
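The construct()/init()/hasNext()/getNext()/offerAccepted()/tasks()/reset() contract described in the Scaladoc above can be sketched as a scheduler-side driving loop. This is a hedged, self-contained illustration: `SimpleAssigner`, `Offer`, and the task strings below are stand-ins, not the PR's actual classes.

```scala
import scala.collection.mutable.ArrayBuffer

// Simplified stand-in for the PR's WorkerOffer/OfferState pair.
case class Offer(executorId: String, var coresAvailable: Int) {
  val tasks = new ArrayBuffer[String]()
}

// A minimal assigner honoring the construct/init/hasNext/getNext/offerAccepted/
// tasks/reset contract described above (round-robin behavior).
class SimpleAssigner {
  private var offers: Seq[Offer] = Seq.empty
  private var i = 0
  def construct(workOffers: Seq[Offer]): Unit = { offers = workOffers }
  def init(): Unit = { i = 0 }
  def hasNext: Boolean = i < offers.size
  def getNext(): Offer = offers(i)
  // Whether or not the offer was accepted, advance to the next worker.
  def offerAccepted(assigned: Boolean): Unit = { i += 1 }
  def tasks: Seq[ArrayBuffer[String]] = offers.map(_.tasks)
  def reset(): Unit = { offers = Seq.empty }
}

// The scheduler-side driving loop: one round over all workers.
val assigner = new SimpleAssigner
assigner.construct(Seq(Offer("exec-1", 2), Offer("exec-2", 1)))
assigner.init()
while (assigner.hasNext) {
  val offer = assigner.getNext()
  val accepted = offer.coresAvailable >= 1
  if (accepted) {
    offer.tasks += s"task-on-${offer.executorId}"
    offer.coresAvailable -= 1
  }
  assigner.offerAccepted(accepted)
}
val counts = assigner.tasks.map(_.size) // tasks assigned per worker this round
println(counts)
assigner.reset()
```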
[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...
Github user zhzhan commented on a diff in the pull request: https://github.com/apache/spark/pull/15541#discussion_r83997070 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala ---
[GitHub] spark issue #15541: [SPARK-17637][Scheduler]Packed scheduling for Spark task...
Github user zhzhan commented on the issue: https://github.com/apache/spark/pull/15541 @rxin @gatorsmile Can you please take a look, and kindly provide your comments. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...
GitHub user zhzhan opened a pull request: https://github.com/apache/spark/pull/15541 [SPARK-17637][Scheduler]Packed scheduling for Spark tasks across executors ## What changes were proposed in this pull request? Restructure the code and implement two new task assigners. PackedAssigner: tries to allocate tasks to the executors with the fewest available cores, so that Spark can release reserved executors when dynamic allocation is enabled. BalancedAssigner: tries to allocate tasks to the executors with the most available cores in order to balance the workload across all executors. By default, the original round-robin assigner is used. We tested a pipeline, and the new PackedAssigner saved around 45% of the reserved CPU and memory with dynamic allocation enabled. ## How was this patch tested? Both unit tests in TaskSchedulerImplSuite and manual tests in a production pipeline. You can merge this pull request into a Git repository by running: $ git pull https://github.com/zhzhan/spark TaskAssigner Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/15541.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #15541 commit 75cdd1a77a227fa492a09e93794d4ea7be8a020f Author: Zhan Zhang <zhanzh...@fb.com> Date: 2016-10-19T01:20:48Z TaskAssigner to support different scheduling algorithms
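The two placement policies described in the PR summary can be sketched as simple selection rules. This is an illustrative sketch only; the real PR implements them as TaskAssigner subclasses, not these helper functions.

```scala
// Each pair is (executorId, coresAvailable).
val workerOffers = Seq(("exec-1", 4), ("exec-2", 1), ("exec-3", 2))

// Packed: prefer the executor with the fewest free cores that can still fit
// the task, so lightly used executors drain and can be released.
def packedPick(offers: Seq[(String, Int)], cpusPerTask: Int): Option[String] =
  offers.filter(_._2 >= cpusPerTask).sortBy(_._2).headOption.map(_._1)

// Balanced: prefer the executor with the most free cores, spreading load.
def balancedPick(offers: Seq[(String, Int)], cpusPerTask: Int): Option[String] =
  offers.filter(_._2 >= cpusPerTask).sortBy(o => -o._2).headOption.map(_._1)

println(packedPick(workerOffers, 1))   // busiest executor that still fits the task
println(balancedPick(workerOffers, 1)) // emptiest executor
```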
[GitHub] spark issue #15218: [SPARK-17637][Scheduler]Packed scheduling for Spark task...
Github user zhzhan commented on the issue: https://github.com/apache/spark/pull/15218 @wangmiao1981 Thanks for reviewing this. I will open another PR addressing these comments soon.
[GitHub] spark issue #15218: [SPARK-17637][Scheduler]Packed scheduling for Spark task...
Github user zhzhan commented on the issue: https://github.com/apache/spark/pull/15218 @rxin Thanks a lot for the detailed review. I will update the patch.
[GitHub] spark issue #15218: [SPARK-17637][Scheduler]Packed scheduling for Spark task...
Github user zhzhan commented on the issue: https://github.com/apache/spark/pull/15218 @mridulm Thanks for reviewing this.
[GitHub] spark issue #15218: [SPARK-17637][Scheduler]Packed scheduling for Spark task...
Github user zhzhan commented on the issue: https://github.com/apache/spark/pull/15218 @mridulm You are right. This patch is mainly for jobs with multiple stages, which are very common in production pipelines. As you mentioned, if there is a shuffle involved, getLocationsWithLargestOutputs in MapOutputTracker typically returns None for ShuffledRowRDD and ShuffledRDD because of the threshold REDUCER_PREF_LOCS_FRACTION (20%). A ShuffledRowRDD/ShuffledRDD can easily have more than 10 partitions (even hundreds) in a real production pipeline, so the patch helps a lot with CPU reservation time.
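The 20% threshold mentioned above can be sketched as follows. This is a simplified model of the MapOutputTracker behavior, not the actual method, which works per reduce partition over map statuses.

```scala
// Simplified model: bytes of a reducer's input contributed by each host.
// A host is a preferred location only if it holds at least `fraction`
// (default 20%, mirroring REDUCER_PREF_LOCS_FRACTION) of the total output.
def locationsWithLargestOutputs(
    bytesByHost: Map[String, Long],
    fraction: Double = 0.2): Option[Seq[String]] = {
  val total = bytesByHost.values.sum
  if (total == 0) return None
  val qualified = bytesByHost.collect {
    case (host, bytes) if bytes.toDouble / total >= fraction => host
  }.toSeq
  if (qualified.nonEmpty) Some(qualified) else None
}

// With map outputs spread evenly over six hosts, no host crosses 20%, the
// scheduler gets no locality preference, and a placement policy such as
// PackedAssigner decides among the equally qualified executors.
val even = locationsWithLargestOutputs(
  Map("h1" -> 100L, "h2" -> 100L, "h3" -> 100L,
      "h4" -> 100L, "h5" -> 100L, "h6" -> 100L))
println(even)
```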
[GitHub] spark issue #15218: [SPARK-17637][Scheduler]Packed scheduling for Spark task...
Github user zhzhan commented on the issue: https://github.com/apache/spark/pull/15218 @mridulm Thanks for the comments. Your concern regarding locality is right. The patch does not change that behavior, which gives priority to the locality preference. But if multiple executors satisfy the locality restriction, the policy will be applied. In our production pipeline, we do see a big gain with respect to reserved CPU resources when dynamic allocation is enabled. @kayousterhout Would you like to take a look and provide your comments?
[GitHub] spark pull request #15218: [SPARK-17637][Scheduler]Packed scheduling for Spa...
Github user zhzhan commented on a diff in the pull request: https://github.com/apache/spark/pull/15218#discussion_r82321008 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala --- @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import scala.collection.mutable.ArrayBuffer +import scala.collection.mutable.PriorityQueue +import scala.util.Random + +import org.apache.spark.SparkConf + +case class OfferState(workOffer: WorkerOffer, var cores: Int) { + // Build a list of tasks to assign to each worker. + val tasks = new ArrayBuffer[TaskDescription](cores) +} + +abstract class TaskAssigner(conf: SparkConf) { + var offer: Seq[OfferState] = _ + val CPUS_PER_TASK = conf.getInt("spark.task.cpus", 1) + + // The final assigned offer returned to TaskScheduler. + def tasks(): Seq[ArrayBuffer[TaskDescription]] = offer.map(_.tasks) + + // construct the assigner by the workoffer. + def construct(workOffer: Seq[WorkerOffer]): Unit = { +offer = workOffer.map(o => OfferState(o, o.cores)) + } + + // Invoked in each round of Taskset assignment to initialize the internal structure. 
+ def init(): Unit + + // Indicating whether there is offer available to be used by one round of Taskset assignment. + def hasNext(): Boolean + + // Next available offer returned to one round of Taskset assignment. + def getNext(): OfferState + + // Called by the TaskScheduler to indicate whether the current offer is accepted + // In order to decide whether the current is valid for the next offering. + def taskAssigned(assigned: Boolean): Unit + + // Release internally maintained resources. Subclass is responsible to + // release its own private resources. + def reset: Unit = { +offer = null + } +} + +class RoundRobinAssigner(conf: SparkConf) extends TaskAssigner(conf) { + var i = 0 + override def construct(workOffer: Seq[WorkerOffer]): Unit = { +offer = Random.shuffle(workOffer.map(o => OfferState(o, o.cores))) + } + override def init(): Unit = { +i = 0 + } + override def hasNext: Boolean = { +i < offer.size + } + override def getNext(): OfferState = { +offer(i) + } + override def taskAssigned(assigned: Boolean): Unit = { +i += 1 + } + override def reset: Unit = { +super.reset +i = 0 + } +} + +class BalancedAssigner(conf: SparkConf) extends TaskAssigner(conf) { --- End diff -- @mridulm Thanks for the comments. But I am lost here. My understanding is that, Ordering-wise, x is equal to y if x.cores == y.cores. This ordering is used by the priority queue to construct the data structure. The following is an example from trait Ordering: PersonA will be equal to PersonB if they are the same age. Am I missing anything? import scala.util.Sorting case class Person(name: String, age: Int) val people = Array(Person("bob", 30), Person("ann", 32), Person("carl", 19)) // sort by age object AgeOrdering extends Ordering[Person] { def compare(a: Person, b: Person) = a.age compare b.age } Sorting.quickSort(people)(AgeOrdering)
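The point about equal elements under an Ordering can be checked directly. This is a small self-contained demo of the behavior under discussion, not code from the PR: when the Ordering compares only one field, elements that tie on that field are "equal" for queue purposes, and which of them dequeues first is unspecified.

```scala
import scala.collection.mutable.PriorityQueue

case class Exec(id: String, cores: Int)

// The ordering compares only `cores`, so Exec("b", 4) and Exec("c", 4)
// are equal as far as the priority queue is concerned.
implicit val byCores: Ordering[Exec] = Ordering.by((e: Exec) => e.cores)

val pq = PriorityQueue(Exec("a", 2), Exec("b", 4), Exec("c", 4))
val first = pq.dequeue() // one of the cores == 4 elements; "b" or "c", unspecified
println(first)
```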
[GitHub] spark pull request #15218: [SPARK-17637][Scheduler]Packed scheduling for Spa...
Github user zhzhan commented on a diff in the pull request: https://github.com/apache/spark/pull/15218#discussion_r82290564 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala --- +class BalancedAssigner(conf: SparkConf) extends TaskAssigner(conf) { --- End diff -- BTW, I don't think we need to handle the case of x.cores == y.cores, which means they are equal; we can depend on the priority queue's algorithm to decide the behavior.
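For context, the BalancedAssigner idea being reviewed here can be sketched with a max-heap keyed on available cores. This is only a loose, hedged sketch of the approach, with stand-in types rather than the PR's classes; note that when two workers tie on free cores, the heap's choice between them is unspecified, which is the equality behavior discussed in this thread.

```scala
import scala.collection.mutable.{ArrayBuffer, PriorityQueue}

case class WorkerState(id: String, var coresAvailable: Int) {
  val tasks = new ArrayBuffer[String]()
}

// Max-heap: the worker with the most free cores surfaces first.
val maxFree: Ordering[WorkerState] = Ordering.by((w: WorkerState) => w.coresAvailable)
val pq = PriorityQueue(WorkerState("exec-1", 2), WorkerState("exec-2", 3))(maxFree)

// Assign three 1-core tasks, always to the currently emptiest worker.
for (t <- 1 to 3) {
  val w = pq.dequeue()
  if (w.coresAvailable >= 1) {
    w.tasks += s"task-$t"
    w.coresAvailable -= 1
  }
  pq.enqueue(w) // re-insert so the heap re-evaluates the worker's priority
}
val assignedTotal = pq.toSeq.map(_.tasks.size).sum
println(assignedTotal)
```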
[GitHub] spark issue #15218: [SPARK-17637][Scheduler]Packed scheduling for Spark task...
Github user zhzhan commented on the issue: https://github.com/apache/spark/pull/15218 @mridulm Thanks for reviewing this. I will wait a while in case there are more comments before addressing them.
[GitHub] spark issue #15218: [SPARK-17637][Scheduler]Packed scheduling for Spark task...
Github user zhzhan commented on the issue: https://github.com/apache/spark/pull/15218 @gatorsmile Thanks. #65832 is the latest one which does not have the same failure.
[GitHub] spark issue #15218: [SPARK-17637][Scheduler]Packed scheduling for Spark task...
Github user zhzhan commented on the issue: https://github.com/apache/spark/pull/15218 retest please
[GitHub] spark issue #15218: [SPARK-17637][Scheduler]Packed scheduling for Spark task...
Github user zhzhan commented on the issue: https://github.com/apache/spark/pull/15218 Failed in DirectKafkaStreamSuite. It should have nothing to do with the patch.
[GitHub] spark pull request #15218: [Spark-17637][Scheduler]Packed scheduling for Spa...
GitHub user zhzhan opened a pull request: https://github.com/apache/spark/pull/15218 [Spark-17637][Scheduler]Packed scheduling for Spark tasks across executors ## What changes were proposed in this pull request? Restructure the code and implement two new task assigners. PackedAssigner: tries to allocate tasks to the executors with the fewest available cores, so that Spark can release reserved executors when dynamic allocation is enabled. BalancedAssigner: tries to allocate tasks to the executors with the most available cores in order to balance the workload across all executors. By default, the original round-robin assigner is used. We tested a pipeline, and the new PackedAssigner saved around 45% of the reserved CPU and memory with dynamic allocation enabled. ## How was this patch tested? Both unit tests in TaskSchedulerImplSuite and manual tests in a production pipeline. You can merge this pull request into a Git repository by running: $ git pull https://github.com/zhzhan/spark packed-scheduler Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/15218.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #15218 commit 97ee760f8acdacf73e7b8c9a1c65578821efb05c Author: Zhan Zhang <zhanzh...@fb.com> Date: 2016-09-18T23:16:22Z enable multiple task-worker allocation scheduling commit 3f094cf25a6bb7cb50365d47cd00fb84340d8c6c Author: Zhan Zhang <zhanzh...@fb.com> Date: 2016-09-18T23:21:09Z fix the configuration.md commit c3ebf9ca84f23d7c150cd1abc69955a7a62678ba Author: Zhan Zhang <zhanzh...@fb.com> Date: 2016-09-23T03:23:38Z formatting and change test cases
[GitHub] spark issue #15080: [SPARK-17526][Web UI]: Display the executor log links wi...
Github user zhzhan commented on the issue: https://github.com/apache/spark/pull/15080 @srowen Thanks for reviewing this. Any suggestions to improve it are welcome. Not being able to locate the debug log quickly has bothered us a lot in production.
[GitHub] spark pull request #15080: SPARK-17526: add log links in job failures
GitHub user zhzhan opened a pull request: https://github.com/apache/spark/pull/15080 SPARK-17526: add log links in job failures ## What changes were proposed in this pull request? Add the executor log links to the job failure message on the Spark UI and console. ## How was this patch tested? The patch was manually tested. In the console: ERROR TaskSetManager: Task 0 in stage 0.0 failed 4 times; stdout: stdout_log_link stderr: stderr_log_link; aborting job ... In the WebUI: ![screen shot 2016-09-13 at 9 33 06 am](https://cloud.githubusercontent.com/assets/4451616/18482552/72dd572a-7995-11e6-98be-9324f3750305.png) You can merge this pull request into a Git repository by running: $ git pull https://github.com/zhzhan/spark debug-log Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/15080.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #15080 commit 19fa189acfcaadcb1410564d1cb178076ac60406 Author: Zhan Zhang <zhanzh...@fb.com> Date: 2016-09-13T16:30:45Z SPARK-17526: add log links in job failures
[GitHub] spark pull request: [SPARK-15441][SQL] support null object in oute...
Github user zhzhan commented on the pull request: https://github.com/apache/spark/pull/13322#issuecomment-222432192 My understanding is that this newly added hidden column is mainly for serializing/deserializing objects to/from rows. How would you leverage it to solve the outer join case, where the null object is actually added during query execution?
[GitHub] spark pull request: SPARK-12417. [SQL] Orc bloom filter options ar...
Github user zhzhan commented on the pull request: https://github.com/apache/spark/pull/10375#issuecomment-165844369 Are there any test cases to make sure it works as expected? Would you mind enabling ORC PPD by default, either here or in another JIRA?
[GitHub] spark pull request: [SPARK-11562][SQL] Provide user an option to i...
Github user zhzhan commented on a diff in the pull request: https://github.com/apache/spark/pull/9553#discussion_r44243045 --- Diff: repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala --- @@ -78,16 +79,21 @@ object Main extends Logging { } sparkContext = new SparkContext(conf) logInfo("Created spark context..") +sqlContextWithHive = conf.getBoolean("spark.sql.hive.context", true) sparkContext } def createSQLContext(): SQLContext = { -val name = "org.apache.spark.sql.hive.HiveContext" +val name = sqlContextWithHive match { + case true => "org.apache.spark.sql.hive.HiveContext" + case false => "org.apache.spark.sql.SQLContext" +} + val loader = Utils.getContextOrSparkClassLoader try { sqlContext = loader.loadClass(name).getConstructor(classOf[SparkContext]) .newInstance(sparkContext).asInstanceOf[SQLContext] - logInfo("Created sql context (with Hive support)..") + logInfo("Created sql context with " + name) } catch { case _: java.lang.ClassNotFoundException | _: java.lang.NoClassDefFoundError => sqlContext = new SQLContext(sparkContext) --- End diff -- I don't know how to fix this, but the logic seems weird: if SQLContext is requested, the context is created again in the exception handler.
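The reviewer's point is that the plain context gets constructed reflectively in the try block and then again in the catch block. One hedged way to restructure it is to attempt reflection only when the Hive variant was requested. The stub classes below (`Ctx`, `PlainCtx`, `HiveCtx`) are illustrative stand-ins, not Spark's real SQLContext/HiveContext.

```scala
// Stand-in stubs for the illustration (not Spark's real classes).
class Ctx
class PlainCtx extends Ctx
class HiveCtx extends Ctx

// Only attempt reflective loading when the Hive variant was requested;
// fall back to the plain context exactly once, instead of constructing
// it both in the try and in the catch as the quoted diff effectively does.
def createCtx(withHive: Boolean): Ctx =
  if (!withHive) new PlainCtx
  else
    try Class.forName("HiveCtx").getDeclaredConstructor().newInstance().asInstanceOf[Ctx]
    catch {
      case _: ClassNotFoundException | _: NoClassDefFoundError => new PlainCtx
    }
```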
[GitHub] spark pull request: [SPARK-11562][SQL] Provide user an option to i...
Github user zhzhan commented on the pull request: https://github.com/apache/spark/pull/9553#issuecomment-154943558 The documentation needs to be updated for this new configuration.
[GitHub] spark pull request: [SPARK-11562][SQL] Provide user an option to i...
Github user zhzhan commented on a diff in the pull request: https://github.com/apache/spark/pull/9553#discussion_r44242983

--- Diff: repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoop.scala ---
@@ -132,6 +132,7 @@ class SparkILoop(
   @DeveloperApi
   var sparkContext: SparkContext = _
   var sqlContext: SQLContext = _
+  var sqlContextWithHive: Boolean = _
--- End diff --

I don't think we need an extra var for this; we can read the config directly, since it is not used anywhere else.
[GitHub] spark pull request: [SPARK-11265] [YARN] YarnClient can't get toke...
Github user zhzhan commented on a diff in the pull request: https://github.com/apache/spark/pull/9232#discussion_r43204781

--- Diff: yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala ---
@@ -142,6 +145,97 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil {
     val containerIdString = System.getenv(ApplicationConstants.Environment.CONTAINER_ID.name())
     ConverterUtils.toContainerId(containerIdString)
   }
+
+  /**
+   * Obtains token for the Hive metastore, using the current user as the principal.
+   * Some exceptions are caught and downgraded to a log message.
+   * @param conf hadoop configuration; the Hive configuration will be based on this
+   * @return a token, or `None` if there's no need for a token (no metastore URI or principal
+   *         in the config), or if a binding exception was caught and downgraded.
+   */
+  def obtainTokenForHiveMetastore(conf: Configuration): Option[Token[DelegationTokenIdentifier]] = {
+    try {
+      obtainTokenForHiveMetastoreInner(conf, UserGroupInformation.getCurrentUser().getUserName)
+    } catch {
+      case e: Exception => {
+        handleTokenIntrospectionFailure("Hive", e)
+        None
+      }
+    }
+  }
+
+  /**
+   * Handle failures to obtain a token through introspection. Failures to load the class are
+   * not treated as errors: anything else is.
+   * @param service service name for error messages
+   * @param thrown exception caught
+   * @throws Exception if the `thrown` exception isn't one that is to be ignored
+   */
+  private[yarn] def handleTokenIntrospectionFailure(service: String, thrown: Throwable): Unit = {
+    thrown match {
+      case e: ClassNotFoundException =>
+        logInfo(s"$service class not found $e")
+        logDebug("Hive Class not found", e)
+      case t: Throwable => {
+        throw t
--- End diff --

Here the exception is rethrown. I know swallowing exceptions is bad, but what happens if the user does not want to access the Hive metastore, and wants to use Spark even though the token cannot be acquired?
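The behaviour the comment asks for can be sketched as a small guard. This is a hypothetical illustration (the names are not the actual YarnSparkHadoopUtil API): make the token fetch conditional on whether metastore access is actually required, so a failure is downgraded to `None` rather than failing the whole application.

```scala
// Hypothetical sketch: when the user does not require Hive metastore access,
// a token-acquisition failure is swallowed and the application continues
// without a token; when access is required, the failure still propagates.
def obtainTokenIfNeeded[T](required: Boolean)(fetch: => Option[T]): Option[T] =
  try fetch
  catch {
    case e: Exception if !required =>
      // Metastore not needed: downgrade the failure and carry on.
      None
  }
```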
[GitHub] spark pull request: [SPARK-10623] [SQL] Fixes ORC predicate push-d...
Github user zhzhan commented on the pull request: https://github.com/apache/spark/pull/8799#issuecomment-141355072

LGTM. Thanks for fixing this.
[GitHub] spark pull request: [SPARK-10623][SQL]: fix the predicate pushdown...
Github user zhzhan closed the pull request at: https://github.com/apache/spark/pull/8783
[GitHub] spark pull request: [SPARK-10623][SQL]: fix the predicate pushdown...
Github user zhzhan commented on the pull request: https://github.com/apache/spark/pull/8783#issuecomment-141215436

@liancheng Thanks for the review. Since https://github.com/apache/spark/pull/8799 has been opened and also fixes another issue, I will close this one.
[GitHub] spark pull request: [SPARK-10623][SQL]: fix the predicate pushdown...
GitHub user zhzhan opened a pull request: https://github.com/apache/spark/pull/8783

[SPARK-10623][SQL]: fix the predicate pushdown construction

The predicate pushdown is not working because the search argument is constructed incorrectly. Fix it with startAnd/end.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/zhzhan/spark SPARK-10623

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/8783.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #8783

commit 6fcc9b1e8c1cac7a020fbb79fd9662b0126804d1
Author: Zhan Zhang <zzhang@hw11188.local>
Date: 2015-09-16T21:44:15Z

    SPARK-10623: fix the predicate pushdown construction
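The fix described above relies on ORC's search-argument builder, where every `startAnd()` must be closed by a matching `end()` before `build()` is called. The toy builder below is a hypothetical stand-in, not the Hive SearchArgument API, sketching why an unbalanced construction cannot produce a valid predicate.

```scala
// Minimal stack-based expression builder (illustrative only).
sealed trait Expr
case class Leaf(value: String) extends Expr
case class And(children: List[Expr]) extends Expr

class PredicateBuilder {
  // Each stack frame collects the children of one open startAnd().
  private var stack: List[List[Expr]] = List(Nil)

  def startAnd(): this.type = { stack = Nil :: stack; this }

  def leaf(value: String): this.type = {
    stack = (stack.head :+ Leaf(value)) :: stack.tail
    this
  }

  def end(): this.type = {
    require(stack.size >= 2, "end() without a matching startAnd()")
    val completed = And(stack.head)
    // Fold the completed conjunction into the enclosing frame.
    stack = (stack.tail.head :+ completed) :: stack.tail.tail
    this
  }

  def build(): Expr = {
    require(stack.size == 1 && stack.head.size == 1,
      "unbalanced startAnd()/end(): not reduced to a single expression")
    stack.head.head
  }
}
```

A missing `end()` leaves an extra frame on the stack, so `build()` rejects the predicate instead of silently producing a malformed one.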
[GitHub] spark pull request: [SPARK-10623][SQL]: fix the predicate pushdown...
Github user zhzhan commented on the pull request: https://github.com/apache/spark/pull/8783#issuecomment-140941698

@liancheng @marmbrus Could you help review this?
[GitHub] spark pull request: [SPARK-10304][SQL]: throw error when the table...
Github user zhzhan commented on the pull request: https://github.com/apache/spark/pull/8547#issuecomment-136610247

Adding a PartitionValues.empty does not cover all the problems. I will close this PR and investigate other approaches.
[GitHub] spark pull request: [SPARK-10304][SQL]: throw error when the table...
Github user zhzhan closed the pull request at: https://github.com/apache/spark/pull/8547
[GitHub] spark pull request: [SPARK-10304][SQL]: throw error when the table...
Github user zhzhan commented on a diff in the pull request: https://github.com/apache/spark/pull/8547#discussion_r38391122

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala ---
@@ -436,7 +436,8 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
       Try(fs.listStatus(qualified)).getOrElse(Array.empty)
     }.filterNot { status =>
       val name = status.getPath.getName
-      name.toLowerCase == "_temporary" || name.startsWith(".")
+      // Is it safe to replace "_temporary" to "_"?
--- End diff --

Thanks for the comments. It seems there are a lot of corner cases to be covered by the test case. For example, the 1st layout is valid, but the 2nd is not:

1st: "hdfs://host:9000/path/_temporary", "hdfs://host:9000/path/a=10/b=20", "hdfs://host:9000/path/_temporary/path"
2nd: "hdfs://host:9000/path/_temporary", "hdfs://host:9000/path/a=10/b=20", "hdfs://host:9000/path/path1"

Adding a PartitionValues.empty does not solve the problem. I will close this PR and investigate other approaches.
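The two example layouts above can be checked with an all-or-none rule. The helper below is a hedged sketch (a hypothetical function, not HadoopFsRelation code): after dropping paths under `_temporary` or hidden directories, the remaining leaves must either all look like `k=v` partition directories or none of them may.

```scala
// Hypothetical all-or-none layout check for the examples in the comment.
def isConsistentTableLayout(paths: Seq[String]): Boolean = {
  // Ignore anything under a _temporary or hidden directory.
  val data = paths.filterNot(_.split('/').exists { seg =>
    seg == "_temporary" || seg.startsWith(".")
  })
  // A leaf counts as partitioned when its last segment looks like k=v.
  val partitioned = data.map(_.split('/').last.contains('='))
  // Valid only when all remaining leaves agree.
  partitioned.distinct.size <= 1
}
```

Under this rule the first layout is valid (only the partitioned leaf survives filtering) and the second is not (a partitioned and an unpartitioned leaf coexist), matching the comment's examples.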
[GitHub] spark pull request: [SPARK-10304][SQL]: throw error when the table...
GitHub user zhzhan opened a pull request: https://github.com/apache/spark/pull/8547

[SPARK-10304][SQL]: throw error when the table directory is invalid

Throw an error if the directory of a table is invalid. A directory is validated by requiring that either all files in it are partitioned, or none of them are.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/zhzhan/spark SPARK-10304

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/8547.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #8547

commit be5522d24a39da20fc8d08b13e15def8e9139cc3
Author: Zhan Zhang <zzhang@hw11188.local>
Date: 2015-09-01T05:01:18Z

    SPARK-10304: throw error when the table directory is invalid
[GitHub] spark pull request: [SPARK-9170][SQL] Instead of StandardStructObj...
Github user zhzhan commented on the pull request: https://github.com/apache/spark/pull/7520#issuecomment-135323382

@viirya I took a quick second look at the issue. As @chenghao-intel mentioned, normalizing the name (to lower case) is the default behavior. Should we fix it in the following place (in StructType.scala) instead? Probably I am missing something.

def apply(name: String): StructField = {
  nameToField.getOrElse(name.toLowerCase,
    throw new IllegalArgumentException(s"Field $name does not exist."))
}
[GitHub] spark pull request: [SPARK-9170][SQL] Instead of StandardStructObj...
Github user zhzhan commented on the pull request: https://github.com/apache/spark/pull/7520#issuecomment-135324209

Also we need to change:

private lazy val nameToField: Map[String, StructField] = fields.map(f => f.name.toLowerCase -> f).toMap
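Taken together, the two changes suggested above amount to lower-casing the key on both sides of the map. The sketch below uses simplified stand-ins for the real StructType/StructField (only the lookup logic is modeled): keys are normalized once when the map is built, and again at lookup time, so the two sides stay consistent.

```scala
// Simplified stand-ins illustrating the case-insensitive lookup.
case class StructField(name: String)

case class StructType(fields: Array[StructField]) {
  // Keys are lower-cased once when the map is built...
  private lazy val nameToField: Map[String, StructField] =
    fields.map(f => f.name.toLowerCase -> f).toMap

  // ...and the lookup lower-cases again, so "Foo" and "foo" resolve alike.
  def apply(name: String): StructField =
    nameToField.getOrElse(name.toLowerCase,
      throw new IllegalArgumentException(s"Field $name does not exist."))
}
```

Note the original field name is preserved in the value, so the schema still reports user-provided casing; only the lookup key is normalized.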
[GitHub] spark pull request: [SPARK-9170][SQL] User-provided columns should...
Github user zhzhan commented on the pull request: https://github.com/apache/spark/pull/7520#issuecomment-135332239

@liancheng has more insight into this part.
[GitHub] spark pull request: [SPARK-9613] [CORE] [WIP] Ban use of JavaConve...
Github user zhzhan commented on the pull request: https://github.com/apache/spark/pull/8033#issuecomment-131959546

@srowen It seems that the mapping got messed up. I don't have a clue yet and didn't find any obvious reason why the patch could break the test. I will dig more and let you know if I have any new findings.
[GitHub] spark pull request: [SPARK-9613] [CORE] [WIP] Ban use of JavaConve...
Github user zhzhan commented on the pull request: https://github.com/apache/spark/pull/8033#issuecomment-131960610

@srowen You could probably revert the change in sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
[GitHub] spark pull request: [SPARK-9170][SQL] Instead of StandardStructObj...
Github user zhzhan commented on the pull request: https://github.com/apache/spark/pull/7520#issuecomment-127465813

LGTM. Will let @liancheng take a final look.
[GitHub] spark pull request: [SPARK-9170][SQL] Instead of StandardStructObj...
Github user zhzhan commented on a diff in the pull request: https://github.com/apache/spark/pull/7520#discussion_r35396110

--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala ---
@@ -86,19 +86,10 @@ private[orc] class OrcOutputWriter(
     TypeInfoUtils.getTypeInfoFromTypeString(
       HiveMetastoreTypes.toMetastoreType(dataSchema))

-    TypeInfoUtils
-      .getStandardJavaObjectInspectorFromTypeInfo(typeInfo)
-      .asInstanceOf[StructObjectInspector]
+    OrcStruct.createObjectInspector(typeInfo.asInstanceOf[StructTypeInfo])
+      .asInstanceOf[SettableStructObjectInspector]
--- End diff --

Is it safe to cast to SettableStructObjectInspector? The declared return type of createObjectInspector is ObjectInspector, although the current implementations may all return a SettableStructObjectInspector.
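A defensive alternative to the unchecked cast raised above is to pattern-match on the returned inspector and fail with a descriptive message when a non-settable implementation appears. The traits below are stand-ins, not the Hive ObjectInspector hierarchy:

```scala
// Stand-in types for illustration only.
trait ObjectInspector
trait SettableStructObjectInspector extends ObjectInspector

// Match instead of asInstanceOf: same success path, but a clear error
// if a future implementation returns a non-settable inspector.
def asSettable(oi: ObjectInspector): SettableStructObjectInspector = oi match {
  case s: SettableStructObjectInspector => s
  case other => throw new IllegalStateException(
    s"Expected a settable struct inspector, got ${other.getClass.getName}")
}
```

Functionally this is equivalent to the cast today, but a ClassCastException deep inside the writer is replaced by an error that names the offending type.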
[GitHub] spark pull request: [SPARK-9170][SQL] Instead of StandardStructObj...
Github user zhzhan commented on a diff in the pull request: https://github.com/apache/spark/pull/7520#discussion_r35395830

--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala ---
@@ -120,15 +111,11 @@ private[orc] class OrcOutputWriter(
   }

   override def write(row: Row): Unit = {
-    var i = 0
-    while (i < row.length) {
-      reusableOutputBuffer(i) = wrappers(i)(row(i))
-      i += 1
-    }
+    val orcRow = wrap(row, structOI)
--- End diff --

Looks like this call will create a new object for each row written instead of reusing reusableOutputBuffer. Is that a concern?
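The allocation concern can be made concrete with a small sketch (the types below are illustrative, not the real OrcOutputWriter): the buffer-reusing path fills one array allocated up front, so writing N rows creates no per-row output objects, whereas a wrap-per-row approach allocates on every call.

```scala
// Hypothetical writer that reuses one output buffer across rows.
class ReusingWriter(wrappers: Array[Any => Any]) {
  private val buffer = new Array[Any](wrappers.length) // allocated once

  def write(row: Array[Any]): Array[Any] = {
    var i = 0
    while (i < row.length) {
      buffer(i) = wrappers(i)(row(i)) // overwrite in place
      i += 1
    }
    buffer // the same array instance on every call
  }
}
```

Returning the same instance is safe here only because the consumer serializes the row before the next `write`, which is the invariant the original reusableOutputBuffer relied on.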
[GitHub] spark pull request: [SPARK-9170][SQL] Instead of StandardStructObj...
Github user zhzhan commented on the pull request: https://github.com/apache/spark/pull/7520#issuecomment-124335510

LGTM with the comments answered or resolved.
[GitHub] spark pull request: [SPARK-9170][SQL] Instead of StandardStructObj...
Github user zhzhan commented on a diff in the pull request: https://github.com/apache/spark/pull/7520#discussion_r35337326

--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala ---
@@ -85,18 +85,11 @@ private[orc] class OrcOutputWriter(
     TypeInfoUtils.getTypeInfoFromTypeString(
       HiveMetastoreTypes.toMetastoreType(dataSchema))

-    TypeInfoUtils
-      .getStandardJavaObjectInspectorFromTypeInfo(typeInfo)
-      .asInstanceOf[StructObjectInspector]
+    OrcStruct.createObjectInspector(typeInfo.asInstanceOf[StructTypeInfo])
+      .asInstanceOf[SettableStructObjectInspector]
   }

-  // Used to hold temporary `Writable` fields of the next row to be written.
-  private val reusableOutputBuffer = new Array[Any](dataSchema.length)
-
-  // Used to convert Catalyst values into Hadoop `Writable`s.
-  private val wrappers = structOI.getAllStructFieldRefs.map { ref =>
-    wrapperFor(ref.getFieldObjectInspector)
-  }.toArray
+  private val allStructFieldRefs = structOI.getAllStructFieldRefs
--- End diff --

I don't see any usage of this variable.
[GitHub] spark pull request: [SPARK-8501] [SQL] Avoids reading schema from ...
Github user zhzhan commented on the pull request: https://github.com/apache/spark/pull/7200#issuecomment-118190051

Some minor comments; overall, LGTM.
[GitHub] spark pull request: [SPARK-8501] [SQL] Avoids reading schema from ...
Github user zhzhan commented on the pull request: https://github.com/apache/spark/pull/7200#issuecomment-118187344

@liancheng In Spark we do not create the ORC file if the record set is empty, so this only happens with ORC files created by Hive, right?
[GitHub] spark pull request: [SPARK-8501] [SQL] Avoids reading schema from ...
Github user zhzhan commented on a diff in the pull request: https://github.com/apache/spark/pull/7200#discussion_r33831074

--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala ---
@@ -24,30 +24,58 @@ import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector

 import org.apache.spark.Logging
 import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.hive.HiveMetastoreTypes
 import org.apache.spark.sql.types.StructType

-private[orc] object OrcFileOperator extends Logging{
-  def getFileReader(pathStr: String, config: Option[Configuration] = None ): Reader = {
+private[orc] object OrcFileOperator extends Logging {
+  // TODO Needs to consider all files when schema evolution is taken into account.
+  def getFileReader(basePath: String, config: Option[Configuration] = None): Option[Reader] = {
+    def isWithNonEmptySchema(path: Path, reader: Reader): Boolean = {
+      reader.getObjectInspector match {
+        case oi: StructObjectInspector if oi.getAllStructFieldRefs.size() > 0 =>
+          true
+        case oi: StructObjectInspector if oi.getAllStructFieldRefs.size() == 0 =>
+          logInfo(
+            s"ORC file $path has empty schema, it probably contains no rows. " +
+              "Trying to read another ORC file to figure out the schema.")
+          false
+        case _ => false
--- End diff --

In what situation will the third case happen? If it cannot happen, can we collapse the 2nd and 3rd cases?
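The collapsed form the comment suggests can be sketched with stand-in types (not the Hive ObjectInspector hierarchy, and with the logging dropped for brevity): one guarded case for the non-empty struct, with the empty struct and every non-struct inspector falling through to the same default.

```scala
// Stand-in inspector hierarchy for illustration.
sealed trait Inspector
case class StructInspector(fieldCount: Int) extends Inspector
case object ScalarInspector extends Inspector

// Collapsed match: the 2nd and 3rd cases of the original become one default.
def hasNonEmptySchema(oi: Inspector): Boolean = oi match {
  case StructInspector(n) if n > 0 => true
  case _                           => false // empty struct, or not a struct at all
}
```

The trade-off is that the collapsed default can no longer log a distinct message for the empty-schema case, which is the one behaviour the original 2nd case adds.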
[GitHub] spark pull request: [Spark-5111][SQL]HiveContext and Thriftserver ...
Github user zhzhan commented on the pull request: https://github.com/apache/spark/pull/4064#issuecomment-112875139

@WangTaoTheTonic The problem happens with spark-1.3 and hadoop-2.6 in a Kerberos cluster. With hive-0.14 support, I suppose the problem may be gone, but I have not verified it yet. I will close this since hive-0.14 is supported.
[GitHub] spark pull request: [Spark-5111][SQL]HiveContext and Thriftserver ...
Github user zhzhan closed the pull request at: https://github.com/apache/spark/pull/4064
[GitHub] spark pull request: [SPARK-7009] repackaging spark assembly jar wi...
Github user zhzhan closed the pull request at: https://github.com/apache/spark/pull/5637
[GitHub] spark pull request: [SPARK-7009] repackaging spark assembly jar wi...
Github user zhzhan commented on the pull request: https://github.com/apache/spark/pull/5637#issuecomment-112615055

Closing this PR, as it may be outdated relative to the latest Spark upstream and no longer working.